1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248
//! Safely coordinate use of things that can be controlled.
//!
//! For example, if many concurrent processes want to make use of the same
//! cluster, e.g. as part of a test suite, you can use [`run_and_stop`] to
//! safely start and use the cluster, then stop it when it's no longer needed:
//!
//! ```rust
//! # use pgdo::{runtime, coordinate, cluster, lock};
//! let cluster_dir = tempfile::tempdir()?;
//! let data_dir = cluster_dir.path().join("data");
//! let strategy = runtime::strategy::Strategy::default();
//! let cluster = cluster::Cluster::new(&data_dir, strategy)?;
//! let lock_file = cluster_dir.path().join("lock");
//! let lock = lock::UnlockedFile::try_from(lock_file.as_path())?;
//! assert!(coordinate::run_and_stop(&cluster, &[], lock, || cluster::exists(&cluster))?);
//! # Ok::<(), Box<dyn std::error::Error>>(())
//! ```
pub mod cleanup;
mod error;
pub mod finally;
pub mod guard;
pub mod resource;
#[cfg(test)]
mod tests;
use std::time::Duration;
use either::Either::{Left, Right};
use rand::RngCore;
use crate::lock;
pub use error::CoordinateError;
use self::finally::with_finally;
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum State {
/// The action we requested was performed from this process, e.g. we tried
/// to create the subject, and we did indeed create the subject.
Modified,
/// The action we requested was performed by another process, or was not
/// necessary, e.g. we tried to stop the subject but it was already stopped.
Unmodified,
}
/// The trait that these coordinate functions work with.
pub trait Subject {
type Error: std::error::Error + Send + Sync;
type Options<'a>: Default;
fn start(&self, options: Self::Options<'_>) -> Result<State, Self::Error>;
fn stop(&self) -> Result<State, Self::Error>;
fn destroy(&self) -> Result<State, Self::Error>;
fn exists(&self) -> Result<bool, Self::Error>;
fn running(&self) -> Result<bool, Self::Error>;
}
/// Perform `action` in `subject`.
///
/// Using the given lock for synchronisation, this creates the subject if it
/// does not exist, starts it if it's not running, performs the `action`, then
/// (maybe) stops the subject again, and finally returns the result of `action`.
/// If there are other users of the subject – i.e. if an exclusive lock cannot
/// be acquired during the shutdown phase – then the subject is left running.
pub fn run_and_stop<S, F, T>(
subject: &S,
options: S::Options<'_>,
lock: lock::UnlockedFile,
action: F,
) -> Result<T, CoordinateError<S::Error>>
where
S: std::panic::RefUnwindSafe + Subject,
F: std::panic::UnwindSafe + FnOnce() -> T,
{
let lock = startup(lock, subject, options)?;
with_finally(
|| shutdown::<S, _, _>(lock, || subject.stop()),
|| -> Result<T, CoordinateError<S::Error>> { Ok(action()) },
)
}
/// Perform `action` in `subject` **if it exists**.
///
/// Using the given lock for synchronisation, this starts the subject it if it's
/// not running, performs the `action`, then (maybe) stops the subject again,
/// and finally returns the result of `action`. If there are other users of the
/// subject – i.e. if an exclusive lock cannot be acquired during the shutdown
/// phase – then the subject is left running.
pub fn run_and_stop_if_exists<S, F, T>(
subject: &S,
options: S::Options<'_>,
lock: lock::UnlockedFile,
action: F,
) -> Result<T, CoordinateError<S::Error>>
where
S: std::panic::RefUnwindSafe + Subject,
F: std::panic::UnwindSafe + FnOnce() -> T,
{
let lock = startup_if_exists(lock, subject, options)?;
with_finally(
|| shutdown::<S, _, _>(lock, || subject.stop()),
|| -> Result<T, CoordinateError<S::Error>> { Ok(action()) },
)
}
/// Perform `action` in `subject`, destroying the subject before returning.
///
/// Similar to [`run_and_stop`] except this attempts to destroy the subject
/// – i.e. stop the subject and completely delete its data directory – before
/// returning. If there are other users of the subject – i.e. if an exclusive
/// lock cannot be acquired during the shutdown phase – then the subject is left
/// running and is **not** destroyed.
pub fn run_and_destroy<S, F, T>(
subject: &S,
options: S::Options<'_>,
lock: lock::UnlockedFile,
action: F,
) -> Result<T, CoordinateError<S::Error>>
where
S: std::panic::RefUnwindSafe + Subject,
F: std::panic::UnwindSafe + FnOnce() -> T,
{
let lock = startup(lock, subject, options)?;
with_finally(
|| shutdown::<S, _, _>(lock, || subject.destroy()),
|| -> Result<T, CoordinateError<S::Error>> { Ok(action()) },
)
}
// ----------------------------------------------------------------------------
fn startup<S: Subject>(
mut lock: lock::UnlockedFile,
control: &S,
options: S::Options<'_>,
) -> Result<lock::LockedFileShared, CoordinateError<S::Error>> {
loop {
lock = match lock.try_lock_exclusive() {
Ok(Left(lock)) => {
// The subject is locked exclusively by someone/something else.
// Switch to a shared lock optimistically. This blocks until we
// get the shared lock.
let lock = lock.lock_shared()?;
// The subject may have been started while that exclusive lock
// was held, so we must check if the subject is running now –
// otherwise we loop back to the top again.
if control.running().map_err(CoordinateError::ControlError)? {
return Ok(lock);
}
// Release all locks then sleep for a random time between 200ms
// and 1000ms in an attempt to make sure that when there are
// many competing processes one of them rapidly acquires an
// exclusive lock and is able to create and start the subject.
let lock = lock.unlock()?;
let delay = rand::thread_rng().next_u32();
let delay = 200 + (delay % 800);
let delay = Duration::from_millis(u64::from(delay));
std::thread::sleep(delay);
lock
}
Ok(Right(lock)) => {
// We have an exclusive lock, so try to start the subject.
control
.start(options)
.map_err(CoordinateError::ControlError)?;
// Once started, downgrade to a shared log.
return Ok(lock.lock_shared()?);
}
Err(err) => return Err(err.into()),
};
}
}
fn startup_if_exists<S: Subject>(
mut lock: lock::UnlockedFile,
subject: &S,
options: S::Options<'_>,
) -> Result<lock::LockedFileShared, CoordinateError<S::Error>> {
loop {
lock = match lock.try_lock_exclusive() {
Ok(Left(lock)) => {
// The subject is locked exclusively by someone/something else.
// Switch to a shared lock optimistically. This blocks until we
// get the shared lock.
let lock = lock.lock_shared()?;
// The subject may have been started while that exclusive lock
// was held, so we must check if the subject is running now –
// otherwise we loop back to the top again.
if subject.running().map_err(CoordinateError::ControlError)? {
return Ok(lock);
}
// Release all locks then sleep for a random time between 200ms
// and 1000ms in an attempt to make sure that when there are
// many competing processes one of them rapidly acquires an
// exclusive lock and is able to create and start the subject.
let lock = lock.unlock()?;
let delay = rand::thread_rng().next_u32();
let delay = 200 + (delay % 800);
let delay = Duration::from_millis(u64::from(delay));
std::thread::sleep(delay);
lock
}
Ok(Right(lock)) => {
// We have an exclusive lock, so try to start the subject.
if subject.exists().map_err(CoordinateError::ControlError)? {
subject
.start(options)
.map_err(CoordinateError::ControlError)?;
} else {
return Err(CoordinateError::DoesNotExist);
}
// Once started, downgrade to a shared log.
return Ok(lock.lock_shared()?);
}
Err(err) => return Err(err.into()),
};
}
}
fn shutdown<S, F, T>(
lock: lock::LockedFileShared,
action: F,
) -> Result<Option<T>, CoordinateError<S::Error>>
where
S: Subject,
F: FnOnce() -> Result<T, S::Error>,
{
match lock.try_lock_exclusive() {
Ok(Left(lock)) => {
// The subject is in use by someone/something else. There's nothing
// more we can do here.
lock.unlock()?;
Ok(None)
}
Ok(Right(lock)) => {
// We have an exclusive lock, so we can mutate the subject.
match action() {
Ok(result) => {
lock.unlock()?;
Ok(Some(result))
}
Err(err) => Err(CoordinateError::ControlError(err)),
}
}
Err(err) => Err(err.into()),
}
}