// pgdo/cluster/resource.rs

//! A [resource][`crate::coordinate::resource`] for a [`Cluster`].
2
3use std::time::Duration;
4use std::{ffi::OsStr, process::ExitStatus};
5
6use either::{Either, Left, Right};
7use rand::RngCore;
8
9use super::{
10    coordinate::{resource, CoordinateError, State},
11    exists, Cluster, ClusterError,
12};
13
14// ----------------------------------------------------------------------------
15
16pub type ResourceFree = resource::ResourceFree<Cluster>;
17pub type ResourceShared = resource::ResourceShared<Cluster>;
18pub type ResourceExclusive = resource::ResourceExclusive<Cluster>;
19
20// ----------------------------------------------------------------------------
21
22pub type Error = CoordinateError<ClusterError>;
23
24impl From<ClusterError> for Error {
25    fn from(err: ClusterError) -> Self {
26        Self::ControlError(err)
27    }
28}
29
30// ----------------------------------------------------------------------------
31
32impl resource::FacetFree for Cluster {
33    type FacetFree<'a> = ClusterFree<'a>;
34
35    fn facet_free(&self) -> Self::FacetFree<'_> {
36        ClusterFree { cluster: self }
37    }
38}
39
40impl resource::FacetShared for Cluster {
41    type FacetShared<'a> = ClusterShared<'a>;
42
43    fn facet_shared(&self) -> Self::FacetShared<'_> {
44        ClusterShared { cluster: self }
45    }
46}
47
48impl resource::FacetExclusive for Cluster {
49    type FacetExclusive<'a> = ClusterExclusive<'a>;
50
51    fn facet_exclusive(&self) -> Self::FacetExclusive<'_> {
52        ClusterExclusive { cluster: self }
53    }
54}
55
56// ----------------------------------------------------------------------------
57
58pub struct ClusterFree<'a> {
59    cluster: &'a Cluster,
60}
61
62/// When the cluster is not locked, all one can do is check for its existence
63/// and if it is running. However, be careful of TOCTOU errors if you're using
64/// this for more than informational purposes.
65///
66/// [TOCTOU]: https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
67impl ClusterFree<'_> {
68    pub fn exists(&self) -> Result<bool, ClusterError> {
69        Ok(exists(self.cluster))
70    }
71
72    pub fn running(&self) -> Result<bool, ClusterError> {
73        self.cluster.running()
74    }
75}
76
77// ----------------------------------------------------------------------------
78
79pub struct ClusterShared<'a> {
80    cluster: &'a Cluster,
81}
82
83/// When the cluster is shared, one can connect to the cluster, and execute
84/// processes. It is possible to abuse this and shutdown the cluster, for
85/// example, but that's on you; there's only so much that this library can do to
86/// prevent misuse.
87impl ClusterShared<'_> {
88    pub fn exists(&self) -> Result<bool, ClusterError> {
89        Ok(exists(self.cluster))
90    }
91
92    pub fn running(&self) -> Result<bool, ClusterError> {
93        self.cluster.running()
94    }
95
96    /// Forwards to [`Cluster::pool`].
97    pub fn pool(&self, database: Option<&str>) -> Result<sqlx::PgPool, ClusterError> {
98        self.cluster.pool(database)
99    }
100
101    /// Forwards to [`Cluster::exec`].
102    pub fn exec<T: AsRef<OsStr>>(
103        &self,
104        database: Option<&str>,
105        command: T,
106        args: &[T],
107    ) -> Result<ExitStatus, ClusterError> {
108        self.cluster.exec(database, command, args)
109    }
110}
111
112// ----------------------------------------------------------------------------
113
114pub struct ClusterExclusive<'a> {
115    cluster: &'a Cluster,
116}
117
118/// When you have exclusive control of a cluster, you can start, stop, destroy,
119/// reconfigure it – anything.
120impl ClusterExclusive<'_> {
121    pub fn start(&self, options: super::Options<'_>) -> Result<State, ClusterError> {
122        self.cluster.start(options)
123    }
124
125    pub fn stop(&self) -> Result<State, ClusterError> {
126        self.cluster.stop()
127    }
128
129    pub fn destroy(&self) -> Result<State, ClusterError> {
130        self.cluster.destroy()
131    }
132
133    pub fn exists(&self) -> Result<bool, ClusterError> {
134        Ok(exists(self.cluster))
135    }
136
137    pub fn running(&self) -> Result<bool, ClusterError> {
138        self.cluster.running()
139    }
140
141    /// Forwards to [`Cluster::pool`].
142    pub fn pool(&self, database: Option<&str>) -> Result<sqlx::PgPool, ClusterError> {
143        self.cluster.pool(database)
144    }
145
146    /// Forwards to [`Cluster::exec`].
147    pub fn exec<T: AsRef<OsStr>>(
148        &self,
149        database: Option<&str>,
150        command: T,
151        args: &[T],
152    ) -> Result<ExitStatus, ClusterError> {
153        self.cluster.exec(database, command, args)
154    }
155}
156
157// ----------------------------------------------------------------------------
158
159/// A [`ResourceShared`] or a [`ResourceExclusive`].
160pub type HeldResource = Either<ResourceShared, ResourceExclusive>;
161
162/// Creates the cluster, if it doesn't already exist, and starts it in a
163/// cooperative manner.
164///
165/// The return value has two parts: the state, [`State`], and the resource,
166/// [`HeldResource`].
167///
168/// The state is [`State::Unmodified`] if the cluster was already running, else
169/// [`State::Modified`] if the cluster was created or started by this function.
170///
171/// The resource is [`Left(ResourceShared)`] if the cluster is already in use,
172/// or [`Right(ResourceExclusive)`] otherwise. Typically one would drop the
173/// exclusive hold down to shared as soon as possible, but the option is there
174/// to do maintenance, for example, that requires an exclusive hold.
175pub fn startup(
176    mut resource: ResourceFree,
177    options: super::Options<'_>,
178) -> Result<(State, HeldResource), Error> {
179    loop {
180        resource = match resource.try_exclusive() {
181            Ok(Left(resource)) => {
182                // The resource is locked exclusively by someone/something else.
183                // Switch to a shared lock optimistically. This blocks until we
184                // get the shared lock.
185                let resource = resource.shared()?;
186                // The resource may have been started while that exclusive lock
187                // was held, so we must check if the resource is running now –
188                // otherwise we loop back to the top again.
189                if resource.facet().running()? {
190                    return Ok((State::Unmodified, Left(resource)));
191                }
192                // Release all locks then sleep for a random time between 200ms
193                // and 1000ms in an attempt to make sure that when there are
194                // many competing processes one of them rapidly acquires an
195                // exclusive lock and is able to create and start the resource.
196                let resource = resource.release()?;
197                let delay = rand::rng().next_u32();
198                let delay = 200 + (delay % 800);
199                let delay = Duration::from_millis(u64::from(delay));
200                std::thread::sleep(delay);
201                resource
202            }
203            Ok(Right(resource)) => {
204                // We have an exclusive lock, so try to start the resource.
205                let state = resource.facet().start(options)?;
206                return Ok((state, Right(resource)));
207            }
208            Err(err) => return Err(err),
209        };
210    }
211}
212
213/// Similar to [`startup`] but does not create the cluster, and thus only
214/// succeeds if the cluster already exists.
215pub fn startup_if_exists(
216    mut resource: ResourceFree,
217    options: super::Options<'_>,
218) -> Result<(State, HeldResource), Error> {
219    loop {
220        resource = match resource.try_exclusive() {
221            Ok(Left(resource)) => {
222                // The resource is locked exclusively by someone/something else.
223                // Switch to a shared lock optimistically. This blocks until we
224                // get the shared lock.
225                let resource = resource.shared()?;
226                // The resource may have been started while that exclusive lock
227                // was held, so we must check if the resource is running now –
228                // otherwise we loop back to the top again.
229                if resource.facet().running()? {
230                    return Ok((State::Unmodified, Left(resource)));
231                }
232                // Release all locks then sleep for a random time between 200ms
233                // and 1000ms in an attempt to make sure that when there are
234                // many competing processes one of them rapidly acquires an
235                // exclusive lock and is able to create and start the resource.
236                let resource = resource.release()?;
237                let delay = rand::rng().next_u32();
238                let delay = 200 + (delay % 800);
239                let delay = Duration::from_millis(u64::from(delay));
240                std::thread::sleep(delay);
241                resource
242            }
243            Ok(Right(resource)) => {
244                // We have an exclusive lock, so try to start the resource.
245                let facet = resource.facet();
246                let state = if facet.exists()? {
247                    facet.start(options)?
248                } else {
249                    return Err(CoordinateError::DoesNotExist);
250                };
251                return Ok((state, Right(resource)));
252            }
253            Err(err) => return Err(err),
254        };
255    }
256}
257
258/// Shuts down the cluster if it is running and if there are no other concurrent
259/// users.
260///
261/// The return value has two parts: the state, [`State`], and the resource.
262///
263/// The state is [`State::Unmodified`] if the cluster could not be shut down or
264/// if it was already shut down, else [`State::Modified`].
265///
266/// The resource is [`Left(ResourceShared)`] if the cluster is already in use –
267/// i.e. the resource passed in is returned – else [`Right(ResourceExclusive)`]
268/// otherwise.
269pub fn shutdown(resource: ResourceShared) -> Result<(State, HeldResource), Error> {
270    match resource.try_exclusive() {
271        Ok(Left(resource)) => {
272            // The resource is in use by someone/something else. There's nothing
273            // more we can do here.
274            Ok((State::Unmodified, Left(resource)))
275        }
276        Ok(Right(resource)) => {
277            // We have an exclusive lock, so we can mutate the resource.
278            match resource.facet().stop() {
279                Ok(state) => Ok((state, Right(resource))),
280                Err(err) => {
281                    resource.release()?;
282                    Err(err)?
283                }
284            }
285        }
286        Err(err) => Err(err),
287    }
288}
289
290/// Similar to [`shutdown`] but also attempts to destroy the cluster, i.e.
291/// remove it entirely from the filesystem.
292pub fn destroy(resource: ResourceShared) -> Result<(State, HeldResource), Error> {
293    match resource.try_exclusive() {
294        Ok(Left(resource)) => {
295            // The resource is in use by someone/something else. There's nothing
296            // more we can do here.
297            Ok((State::Unmodified, Left(resource)))
298        }
299        Ok(Right(resource)) => {
300            // We have an exclusive lock, so we can mutate the resource.
301            match resource.facet().destroy() {
302                Ok(state) => Ok((state, Right(resource))),
303                Err(err) => {
304                    resource.release()?;
305                    Err(err)?
306                }
307            }
308        }
309        Err(err) => Err(err),
310    }
311}