// postgresfixture/coordinate.rs

1//! Safely coordinate use of [`Cluster`].
2//!
3//! For example, if many concurrent processes want to make use of the same
4//! cluster, e.g. as part of a test suite, you can use [`run_and_stop`] to
5//! safely start and use the cluster, then stop it when it's no longer needed:
6//!
7//! ```rust
8//! use postgresfixture::prelude::*;
9//! let cluster_dir = tempdir::TempDir::new("cluster")?;
10//! let data_dir = cluster_dir.path().join("data");
11//! let runtime = strategy::default();
12//! let cluster = Cluster::new(&data_dir, runtime)?;
13//! let lock_file = cluster_dir.path().join("lock");
14//! let lock = lock::UnlockedFile::try_from(lock_file.as_path())?;
15//! assert!(coordinate::run_and_stop(&cluster, lock, cluster::exists)?);
16//! # Ok::<(), ClusterError>(())
17//! ```
18
19use std::time::Duration;
20
21use either::Either::{Left, Right};
22use rand::RngCore;
23
24use crate::cluster::{Cluster, ClusterError};
25use crate::lock;
26
27/// Perform `action` in `cluster`.
28///
29/// Using the given lock for synchronisation, this creates the cluster if it
30/// does not exist, starts it if it's not running, performs the `action`, then
31/// (maybe) stops the cluster again, and finally returns the result of `action`.
32/// If there are other users of the cluster – i.e. if an exclusive lock cannot
33/// be acquired during the shutdown phase – then the cluster is left running.
34pub fn run_and_stop<'a, F, T>(
35    cluster: &'a Cluster,
36    lock: lock::UnlockedFile,
37    action: F,
38) -> Result<T, ClusterError>
39where
40    F: std::panic::UnwindSafe + FnOnce(&'a Cluster) -> T,
41{
42    let lock = startup(cluster, lock)?;
43    let action_res = std::panic::catch_unwind(|| action(cluster));
44    let _: Option<bool> = shutdown(cluster, lock, Cluster::stop)?;
45    match action_res {
46        Ok(result) => Ok(result),
47        Err(err) => std::panic::resume_unwind(err),
48    }
49}
50
51/// Perform `action` in `cluster`, destroying the cluster before returning.
52///
53/// Similar to [`run_and_stop`] except this attempts to destroy the cluster
54/// – i.e. stop the cluster and completely delete its data directory – before
55/// returning. If there are other users of the cluster – i.e. if an exclusive
56/// lock cannot be acquired during the shutdown phase – then the cluster is left
57/// running and is **not** destroyed.
58pub fn run_and_destroy<'a, F, T>(
59    cluster: &'a Cluster,
60    lock: lock::UnlockedFile,
61    action: F,
62) -> Result<T, ClusterError>
63where
64    F: std::panic::UnwindSafe + FnOnce(&'a Cluster) -> T,
65{
66    let lock = startup(cluster, lock)?;
67    let action_res = std::panic::catch_unwind(|| action(cluster));
68    let shutdown_res = shutdown(cluster, lock, Cluster::destroy);
69    match action_res {
70        Ok(result) => shutdown_res.map(|_| result),
71        Err(err) => std::panic::resume_unwind(err),
72    }
73}
74
75fn startup(
76    cluster: &Cluster,
77    mut lock: lock::UnlockedFile,
78) -> Result<lock::LockedFileShared, ClusterError> {
79    loop {
80        lock = match lock.try_lock_exclusive() {
81            Ok(Left(lock)) => {
82                // The cluster is locked exclusively. Switch to a shared lock
83                // optimistically.
84                let lock = lock.lock_shared()?;
85                // The cluster may have been stopped while held in that
86                // exclusive lock, so we must check if the cluster is running
87                // _now_, else loop back to the top again.
88                if cluster.running()? {
89                    return Ok(lock);
90                }
91                // Release all locks then sleep for a random time between 200ms
92                // and 1000ms in an attempt to make sure that when there are
93                // many competing processes one of them rapidly acquires an
94                // exclusive lock and is able to create and start the cluster.
95                let lock = lock.unlock()?;
96                let delay = rand::thread_rng().next_u32();
97                let delay = 200 + (delay % 800);
98                let delay = Duration::from_millis(u64::from(delay));
99                std::thread::sleep(delay);
100                lock
101            }
102            Ok(Right(lock)) => {
103                // We have an exclusive lock, so try to start the cluster.
104                cluster.start()?;
105                // Once started, downgrade to a shared log.
106                return Ok(lock.lock_shared()?);
107            }
108            Err(err) => return Err(err.into()),
109        };
110    }
111}
112
113fn shutdown<F, T>(
114    cluster: &Cluster,
115    lock: lock::LockedFileShared,
116    action: F,
117) -> Result<Option<T>, ClusterError>
118where
119    F: FnOnce(&Cluster) -> Result<T, ClusterError>,
120{
121    match lock.try_lock_exclusive() {
122        Ok(Left(lock)) => {
123            lock.unlock()?;
124            Ok(None)
125        }
126        Ok(Right(lock)) => match action(cluster) {
127            Ok(result) => {
128                lock.unlock()?;
129                Ok(Some(result))
130            }
131            Err(err) => Err(err),
132        },
133        Err(err) => Err(err.into()),
134    }
135}