pgdo/coordinate.rs
1//! Safely coordinate use of things that can be controlled.
2//!
3//! For example, if many concurrent processes want to make use of the same
4//! cluster, e.g. as part of a test suite, you can use [`run_and_stop`] to
5//! safely start and use the cluster, then stop it when it's no longer needed:
6//!
7//! ```rust
8//! # use pgdo::{runtime, coordinate, cluster, lock};
9//! let cluster_dir = tempfile::tempdir()?;
10//! let data_dir = cluster_dir.path().join("data");
11//! let strategy = runtime::strategy::Strategy::default();
12//! let cluster = cluster::Cluster::new(&data_dir, strategy)?;
13//! let lock_file = cluster_dir.path().join("lock");
14//! let lock = lock::UnlockedFile::try_from(lock_file.as_path())?;
15//! assert!(coordinate::run_and_stop(&cluster, &[], lock, || cluster::exists(&cluster))?);
16//! # Ok::<(), Box<dyn std::error::Error>>(())
17//! ```
18
19pub mod cleanup;
20mod error;
21pub mod finally;
22pub mod guard;
23pub mod resource;
24
25#[cfg(test)]
26mod tests;
27
28use std::time::Duration;
29
30use either::Either::{Left, Right};
31use rand::RngCore;
32
33use crate::lock;
34pub use error::CoordinateError;
35
36use self::finally::with_finally;
37
/// Outcome of asking a [`Subject`] to perform an action.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum State {
    /// This process performed the requested action itself; for example, we
    /// asked for the subject to be created and this call did create it.
    Modified,
    /// The requested action was carried out by another process or was not
    /// needed at all; for example, we asked for the subject to be stopped but
    /// it was already stopped.
    Unmodified,
}
47
48/// The trait that these coordinate functions work with.
49pub trait Subject {
50 type Error: std::error::Error + Send + Sync;
51 type Options<'a>: Default;
52 fn start(&self, options: Self::Options<'_>) -> Result<State, Self::Error>;
53 fn stop(&self) -> Result<State, Self::Error>;
54 fn destroy(&self) -> Result<State, Self::Error>;
55 fn exists(&self) -> Result<bool, Self::Error>;
56 fn running(&self) -> Result<bool, Self::Error>;
57}
58
59/// Perform `action` in `subject`.
60///
61/// Using the given lock for synchronisation, this creates the subject if it
62/// does not exist, starts it if it's not running, performs the `action`, then
63/// (maybe) stops the subject again, and finally returns the result of `action`.
64/// If there are other users of the subject – i.e. if an exclusive lock cannot
65/// be acquired during the shutdown phase – then the subject is left running.
66pub fn run_and_stop<S, F, T>(
67 subject: &S,
68 options: S::Options<'_>,
69 lock: lock::UnlockedFile,
70 action: F,
71) -> Result<T, CoordinateError<S::Error>>
72where
73 S: std::panic::RefUnwindSafe + Subject,
74 F: std::panic::UnwindSafe + FnOnce() -> T,
75{
76 let lock = startup(lock, subject, options)?;
77 with_finally(
78 || shutdown::<S, _, _>(lock, || subject.stop()),
79 || -> Result<T, CoordinateError<S::Error>> { Ok(action()) },
80 )
81}
82
83/// Perform `action` in `subject` **if it exists**.
84///
85/// Using the given lock for synchronisation, this starts the subject it if it's
86/// not running, performs the `action`, then (maybe) stops the subject again,
87/// and finally returns the result of `action`. If there are other users of the
88/// subject – i.e. if an exclusive lock cannot be acquired during the shutdown
89/// phase – then the subject is left running.
90pub fn run_and_stop_if_exists<S, F, T>(
91 subject: &S,
92 options: S::Options<'_>,
93 lock: lock::UnlockedFile,
94 action: F,
95) -> Result<T, CoordinateError<S::Error>>
96where
97 S: std::panic::RefUnwindSafe + Subject,
98 F: std::panic::UnwindSafe + FnOnce() -> T,
99{
100 let lock = startup_if_exists(lock, subject, options)?;
101 with_finally(
102 || shutdown::<S, _, _>(lock, || subject.stop()),
103 || -> Result<T, CoordinateError<S::Error>> { Ok(action()) },
104 )
105}
106
107/// Perform `action` in `subject`, destroying the subject before returning.
108///
109/// Similar to [`run_and_stop`] except this attempts to destroy the subject
110/// – i.e. stop the subject and completely delete its data directory – before
111/// returning. If there are other users of the subject – i.e. if an exclusive
112/// lock cannot be acquired during the shutdown phase – then the subject is left
113/// running and is **not** destroyed.
114pub fn run_and_destroy<S, F, T>(
115 subject: &S,
116 options: S::Options<'_>,
117 lock: lock::UnlockedFile,
118 action: F,
119) -> Result<T, CoordinateError<S::Error>>
120where
121 S: std::panic::RefUnwindSafe + Subject,
122 F: std::panic::UnwindSafe + FnOnce() -> T,
123{
124 let lock = startup(lock, subject, options)?;
125 with_finally(
126 || shutdown::<S, _, _>(lock, || subject.destroy()),
127 || -> Result<T, CoordinateError<S::Error>> { Ok(action()) },
128 )
129}
130
131// ----------------------------------------------------------------------------
132
133fn startup<S: Subject>(
134 mut lock: lock::UnlockedFile,
135 control: &S,
136 options: S::Options<'_>,
137) -> Result<lock::LockedFileShared, CoordinateError<S::Error>> {
138 loop {
139 lock = match lock.try_lock_exclusive() {
140 Ok(Left(lock)) => {
141 // The subject is locked elsewhere, shared or exclusively. We
142 // optimistically take a shared lock. If the other lock is also
143 // shared, this will not block. If the other lock is exclusive,
144 // this will block until that lock is released (or changed to a
145 // shared lock).
146 let lock = lock.lock_shared()?;
147 // If obtaining the lock blocked, i.e. the lock elsewhere was
148 // exclusive, then the subject may have been started by the
149 // process that held that exclusive lock. We should check.
150 if control.running().map_err(CoordinateError::ControlError)? {
151 return Ok(lock);
152 }
153 // Release all locks then sleep for a random time between 200ms
154 // and 1000ms in an attempt to make sure that when there are
155 // many competing processes one of them rapidly acquires an
156 // exclusive lock and is able to create and start the subject.
157 let lock = lock.unlock()?;
158 let delay = rand::rng().next_u32();
159 let delay = 200 + (delay % 800);
160 let delay = Duration::from_millis(u64::from(delay));
161 std::thread::sleep(delay);
162 lock
163 }
164 Ok(Right(lock)) => {
165 // We have an exclusive lock, so try to start the subject.
166 control
167 .start(options)
168 .map_err(CoordinateError::ControlError)?;
169 // Once started, downgrade to a shared log.
170 return Ok(lock.lock_shared()?);
171 }
172 Err(err) => return Err(err.into()),
173 };
174 }
175}
176
177fn startup_if_exists<S: Subject>(
178 mut lock: lock::UnlockedFile,
179 subject: &S,
180 options: S::Options<'_>,
181) -> Result<lock::LockedFileShared, CoordinateError<S::Error>> {
182 loop {
183 lock = match lock.try_lock_exclusive() {
184 Ok(Left(lock)) => {
185 // The subject is locked elsewhere, shared or exclusively. We
186 // optimistically take a shared lock. If the other lock is also
187 // shared, this will not block. If the other lock is exclusive,
188 // this will block until that lock is released (or changed to a
189 // shared lock).
190 let lock = lock.lock_shared()?;
191 // If obtaining the lock blocked, i.e. the lock elsewhere was
192 // exclusive, then the subject may have been started by the
193 // process that held that exclusive lock. We should check.
194 if subject.running().map_err(CoordinateError::ControlError)? {
195 return Ok(lock);
196 }
197 // Release all locks then sleep for a random time between 200ms
198 // and 1000ms in an attempt to make sure that when there are
199 // many competing processes one of them rapidly acquires an
200 // exclusive lock and is able to create and start the subject.
201 let lock = lock.unlock()?;
202 let delay = rand::rng().next_u32();
203 let delay = 200 + (delay % 800);
204 let delay = Duration::from_millis(u64::from(delay));
205 std::thread::sleep(delay);
206 lock
207 }
208 Ok(Right(lock)) => {
209 // We have an exclusive lock, so try to start the subject.
210 if subject.exists().map_err(CoordinateError::ControlError)? {
211 subject
212 .start(options)
213 .map_err(CoordinateError::ControlError)?;
214 } else {
215 return Err(CoordinateError::DoesNotExist);
216 }
217 // Once started, downgrade to a shared log.
218 return Ok(lock.lock_shared()?);
219 }
220 Err(err) => return Err(err.into()),
221 };
222 }
223}
224
225fn shutdown<S, F, T>(
226 lock: lock::LockedFileShared,
227 action: F,
228) -> Result<Option<T>, CoordinateError<S::Error>>
229where
230 S: Subject,
231 F: FnOnce() -> Result<T, S::Error>,
232{
233 match lock.try_lock_exclusive() {
234 Ok(Left(lock)) => {
235 // The subject is in use elsewhere. There's nothing more we can do
236 // here.
237 lock.unlock()?;
238 Ok(None)
239 }
240 Ok(Right(lock)) => {
241 // We have an exclusive lock, so we can mutate the subject.
242 match action() {
243 Ok(result) => {
244 lock.unlock()?;
245 Ok(Some(result))
246 }
247 Err(err) => Err(CoordinateError::ControlError(err)),
248 }
249 }
250 Err(err) => Err(err.into()),
251 }
252}