pgdo/
coordinate.rs

//! Safely coordinate use of things that can be controlled, i.e. implementors
//! of the [`Subject`] trait.
//!
//! For example, if many concurrent processes want to make use of the same
//! cluster, e.g. as part of a test suite, you can use [`run_and_stop`] to
//! safely start and use the cluster, then stop it when it's no longer needed:
//!
//! ```rust
//! # use pgdo::{runtime, coordinate, cluster, lock};
//! let cluster_dir = tempfile::tempdir()?;
//! let data_dir = cluster_dir.path().join("data");
//! let strategy = runtime::strategy::Strategy::default();
//! let cluster = cluster::Cluster::new(&data_dir, strategy)?;
//! let lock_file = cluster_dir.path().join("lock");
//! let lock = lock::UnlockedFile::try_from(lock_file.as_path())?;
//! assert!(coordinate::run_and_stop(&cluster, &[], lock, || cluster::exists(&cluster))?);
//! # Ok::<(), Box<dyn std::error::Error>>(())
//! ```
18
19pub mod cleanup;
20mod error;
21pub mod finally;
22pub mod guard;
23pub mod resource;
24
25#[cfg(test)]
26mod tests;
27
28use std::time::Duration;
29
30use either::Either::{Left, Right};
31use rand::RngCore;
32
33use crate::lock;
34pub use error::CoordinateError;
35
36use self::finally::with_finally;
37
/// Outcome reported by a [`Subject`] after being asked to perform an action.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum State {
    /// This process carried out the requested action itself – e.g. we asked
    /// for the subject to be created, and it was this call that created it.
    Modified,
    /// The requested action was carried out by some other process, or was not
    /// needed at all – e.g. we asked for the subject to be stopped, but it was
    /// already stopped.
    Unmodified,
}
47
48/// The trait that these coordinate functions work with.
49pub trait Subject {
50    type Error: std::error::Error + Send + Sync;
51    type Options<'a>: Default;
52    fn start(&self, options: Self::Options<'_>) -> Result<State, Self::Error>;
53    fn stop(&self) -> Result<State, Self::Error>;
54    fn destroy(&self) -> Result<State, Self::Error>;
55    fn exists(&self) -> Result<bool, Self::Error>;
56    fn running(&self) -> Result<bool, Self::Error>;
57}
58
59/// Perform `action` in `subject`.
60///
61/// Using the given lock for synchronisation, this creates the subject if it
62/// does not exist, starts it if it's not running, performs the `action`, then
63/// (maybe) stops the subject again, and finally returns the result of `action`.
64/// If there are other users of the subject – i.e. if an exclusive lock cannot
65/// be acquired during the shutdown phase – then the subject is left running.
66pub fn run_and_stop<S, F, T>(
67    subject: &S,
68    options: S::Options<'_>,
69    lock: lock::UnlockedFile,
70    action: F,
71) -> Result<T, CoordinateError<S::Error>>
72where
73    S: std::panic::RefUnwindSafe + Subject,
74    F: std::panic::UnwindSafe + FnOnce() -> T,
75{
76    let lock = startup(lock, subject, options)?;
77    with_finally(
78        || shutdown::<S, _, _>(lock, || subject.stop()),
79        || -> Result<T, CoordinateError<S::Error>> { Ok(action()) },
80    )
81}
82
83/// Perform `action` in `subject` **if it exists**.
84///
85/// Using the given lock for synchronisation, this starts the subject it if it's
86/// not running, performs the `action`, then (maybe) stops the subject again,
87/// and finally returns the result of `action`. If there are other users of the
88/// subject – i.e. if an exclusive lock cannot be acquired during the shutdown
89/// phase – then the subject is left running.
90pub fn run_and_stop_if_exists<S, F, T>(
91    subject: &S,
92    options: S::Options<'_>,
93    lock: lock::UnlockedFile,
94    action: F,
95) -> Result<T, CoordinateError<S::Error>>
96where
97    S: std::panic::RefUnwindSafe + Subject,
98    F: std::panic::UnwindSafe + FnOnce() -> T,
99{
100    let lock = startup_if_exists(lock, subject, options)?;
101    with_finally(
102        || shutdown::<S, _, _>(lock, || subject.stop()),
103        || -> Result<T, CoordinateError<S::Error>> { Ok(action()) },
104    )
105}
106
107/// Perform `action` in `subject`, destroying the subject before returning.
108///
109/// Similar to [`run_and_stop`] except this attempts to destroy the subject
110/// – i.e. stop the subject and completely delete its data directory – before
111/// returning. If there are other users of the subject – i.e. if an exclusive
112/// lock cannot be acquired during the shutdown phase – then the subject is left
113/// running and is **not** destroyed.
114pub fn run_and_destroy<S, F, T>(
115    subject: &S,
116    options: S::Options<'_>,
117    lock: lock::UnlockedFile,
118    action: F,
119) -> Result<T, CoordinateError<S::Error>>
120where
121    S: std::panic::RefUnwindSafe + Subject,
122    F: std::panic::UnwindSafe + FnOnce() -> T,
123{
124    let lock = startup(lock, subject, options)?;
125    with_finally(
126        || shutdown::<S, _, _>(lock, || subject.destroy()),
127        || -> Result<T, CoordinateError<S::Error>> { Ok(action()) },
128    )
129}
130
131// ----------------------------------------------------------------------------
132
133fn startup<S: Subject>(
134    mut lock: lock::UnlockedFile,
135    control: &S,
136    options: S::Options<'_>,
137) -> Result<lock::LockedFileShared, CoordinateError<S::Error>> {
138    loop {
139        lock = match lock.try_lock_exclusive() {
140            Ok(Left(lock)) => {
141                // The subject is locked elsewhere, shared or exclusively. We
142                // optimistically take a shared lock. If the other lock is also
143                // shared, this will not block. If the other lock is exclusive,
144                // this will block until that lock is released (or changed to a
145                // shared lock).
146                let lock = lock.lock_shared()?;
147                // If obtaining the lock blocked, i.e. the lock elsewhere was
148                // exclusive, then the subject may have been started by the
149                // process that held that exclusive lock. We should check.
150                if control.running().map_err(CoordinateError::ControlError)? {
151                    return Ok(lock);
152                }
153                // Release all locks then sleep for a random time between 200ms
154                // and 1000ms in an attempt to make sure that when there are
155                // many competing processes one of them rapidly acquires an
156                // exclusive lock and is able to create and start the subject.
157                let lock = lock.unlock()?;
158                let delay = rand::rng().next_u32();
159                let delay = 200 + (delay % 800);
160                let delay = Duration::from_millis(u64::from(delay));
161                std::thread::sleep(delay);
162                lock
163            }
164            Ok(Right(lock)) => {
165                // We have an exclusive lock, so try to start the subject.
166                control
167                    .start(options)
168                    .map_err(CoordinateError::ControlError)?;
169                // Once started, downgrade to a shared log.
170                return Ok(lock.lock_shared()?);
171            }
172            Err(err) => return Err(err.into()),
173        };
174    }
175}
176
177fn startup_if_exists<S: Subject>(
178    mut lock: lock::UnlockedFile,
179    subject: &S,
180    options: S::Options<'_>,
181) -> Result<lock::LockedFileShared, CoordinateError<S::Error>> {
182    loop {
183        lock = match lock.try_lock_exclusive() {
184            Ok(Left(lock)) => {
185                // The subject is locked elsewhere, shared or exclusively. We
186                // optimistically take a shared lock. If the other lock is also
187                // shared, this will not block. If the other lock is exclusive,
188                // this will block until that lock is released (or changed to a
189                // shared lock).
190                let lock = lock.lock_shared()?;
191                // If obtaining the lock blocked, i.e. the lock elsewhere was
192                // exclusive, then the subject may have been started by the
193                // process that held that exclusive lock. We should check.
194                if subject.running().map_err(CoordinateError::ControlError)? {
195                    return Ok(lock);
196                }
197                // Release all locks then sleep for a random time between 200ms
198                // and 1000ms in an attempt to make sure that when there are
199                // many competing processes one of them rapidly acquires an
200                // exclusive lock and is able to create and start the subject.
201                let lock = lock.unlock()?;
202                let delay = rand::rng().next_u32();
203                let delay = 200 + (delay % 800);
204                let delay = Duration::from_millis(u64::from(delay));
205                std::thread::sleep(delay);
206                lock
207            }
208            Ok(Right(lock)) => {
209                // We have an exclusive lock, so try to start the subject.
210                if subject.exists().map_err(CoordinateError::ControlError)? {
211                    subject
212                        .start(options)
213                        .map_err(CoordinateError::ControlError)?;
214                } else {
215                    return Err(CoordinateError::DoesNotExist);
216                }
217                // Once started, downgrade to a shared log.
218                return Ok(lock.lock_shared()?);
219            }
220            Err(err) => return Err(err.into()),
221        };
222    }
223}
224
225fn shutdown<S, F, T>(
226    lock: lock::LockedFileShared,
227    action: F,
228) -> Result<Option<T>, CoordinateError<S::Error>>
229where
230    S: Subject,
231    F: FnOnce() -> Result<T, S::Error>,
232{
233    match lock.try_lock_exclusive() {
234        Ok(Left(lock)) => {
235            // The subject is in use elsewhere. There's nothing more we can do
236            // here.
237            lock.unlock()?;
238            Ok(None)
239        }
240        Ok(Right(lock)) => {
241            // We have an exclusive lock, so we can mutate the subject.
242            match action() {
243                Ok(result) => {
244                    lock.unlock()?;
245                    Ok(Some(result))
246                }
247                Err(err) => Err(CoordinateError::ControlError(err)),
248            }
249        }
250        Err(err) => Err(err.into()),
251    }
252}