1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
//! A [resource][`crate::coordinate::resource`] for a [`Cluster`].

use std::time::Duration;
use std::{ffi::OsStr, process::ExitStatus};

use either::{Either, Left, Right};
use rand::RngCore;

use super::{
    coordinate::{resource, CoordinateError, State},
    exists, Cluster, ClusterError,
};

// ----------------------------------------------------------------------------

/// A [`Cluster`] resource on which no lock is currently held.
pub type ResourceFree<'res> = resource::ResourceFree<'res, Cluster>;
/// A [`Cluster`] resource held under a shared lock.
pub type ResourceShared<'res> = resource::ResourceShared<'res, Cluster>;
/// A [`Cluster`] resource held under an exclusive lock.
pub type ResourceExclusive<'res> = resource::ResourceExclusive<'res, Cluster>;

// ----------------------------------------------------------------------------

pub type Error = CoordinateError<ClusterError>;

impl From<ClusterError> for Error {
    fn from(err: ClusterError) -> Self {
        Self::ControlError(err)
    }
}

// ----------------------------------------------------------------------------

/// Exposes a different view of a [`Cluster`] for each lock level:
/// [`ClusterFree`] when unlocked, [`ClusterShared`] under a shared lock, and
/// [`ClusterExclusive`] under an exclusive lock. Each view just borrows the
/// cluster.
impl<'c> resource::Faceted<'c> for Cluster {
    type FacetFree = ClusterFree<'c>;
    type FacetShared = ClusterShared<'c>;
    type FacetExclusive = ClusterExclusive<'c>;

    fn facet_free(&'c self) -> Self::FacetFree {
        let cluster = self;
        ClusterFree { cluster }
    }

    fn facet_shared(&'c self) -> Self::FacetShared {
        let cluster = self;
        ClusterShared { cluster }
    }

    fn facet_exclusive(&'c self) -> Self::FacetExclusive {
        let cluster = self;
        ClusterExclusive { cluster }
    }
}

// ----------------------------------------------------------------------------

/// The facet of a [`Cluster`] that is available without holding any lock.
pub struct ClusterFree<'a> {
    cluster: &'a Cluster,
}

/// When the cluster is not locked, all one can do is check for its existence
/// and if it is running. However, be careful of [TOCTOU] errors if you're using
/// this for more than informational purposes.
///
/// [TOCTOU]: https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
impl ClusterFree<'_> {
    /// Checks whether the cluster exists.
    ///
    /// This never returns `Err`; the `Result` keeps the signature in line with
    /// the other facets.
    pub fn exists(&self) -> Result<bool, ClusterError> {
        Ok(exists(self.cluster))
    }

    /// Forwards to [`Cluster::running`].
    pub fn running(&self) -> Result<bool, ClusterError> {
        self.cluster.running()
    }
}

// ----------------------------------------------------------------------------

/// The facet of a [`Cluster`] that is available while holding a shared lock.
pub struct ClusterShared<'a> {
    cluster: &'a Cluster,
}

/// A shared hold lets you connect to the cluster and run processes against it.
/// Nothing stops you from using that access to, say, shut the cluster down.
/// That would be misuse, and this library can only do so much to prevent it.
impl ClusterShared<'_> {
    /// Reports whether the cluster exists. Always returns `Ok`.
    pub fn exists(&self) -> Result<bool, ClusterError> {
        let found = exists(self.cluster);
        Ok(found)
    }

    /// Reports whether the cluster is running; see [`Cluster::running`].
    pub fn running(&self) -> Result<bool, ClusterError> {
        let cluster = self.cluster;
        cluster.running()
    }

    /// Forwards to [`Cluster::pool`].
    pub fn pool(&self, database: Option<&str>) -> Result<sqlx::PgPool, ClusterError> {
        let cluster = self.cluster;
        cluster.pool(database)
    }

    /// Forwards to [`Cluster::exec`].
    pub fn exec<T>(
        &self,
        database: Option<&str>,
        command: T,
        args: &[T],
    ) -> Result<ExitStatus, ClusterError>
    where
        T: AsRef<OsStr>,
    {
        let cluster = self.cluster;
        cluster.exec(database, command, args)
    }
}

// ----------------------------------------------------------------------------

/// The facet of a [`Cluster`] that is available while holding an exclusive
/// lock.
pub struct ClusterExclusive<'a> {
    cluster: &'a Cluster,
}

/// An exclusive hold grants full control of the cluster. On top of everything
/// a shared hold offers, you can start, stop, destroy, or reconfigure it.
impl ClusterExclusive<'_> {
    /// Forwards to [`Cluster::start`].
    pub fn start(&self, options: super::Options<'_>) -> Result<State, ClusterError> {
        let cluster = self.cluster;
        cluster.start(options)
    }

    /// Forwards to [`Cluster::stop`].
    pub fn stop(&self) -> Result<State, ClusterError> {
        let cluster = self.cluster;
        cluster.stop()
    }

    /// Forwards to [`Cluster::destroy`].
    pub fn destroy(&self) -> Result<State, ClusterError> {
        let cluster = self.cluster;
        cluster.destroy()
    }

    /// Reports whether the cluster exists. Always returns `Ok`.
    pub fn exists(&self) -> Result<bool, ClusterError> {
        let found = exists(self.cluster);
        Ok(found)
    }

    /// Reports whether the cluster is running; see [`Cluster::running`].
    pub fn running(&self) -> Result<bool, ClusterError> {
        let cluster = self.cluster;
        cluster.running()
    }

    /// Forwards to [`Cluster::pool`].
    pub fn pool(&self, database: Option<&str>) -> Result<sqlx::PgPool, ClusterError> {
        let cluster = self.cluster;
        cluster.pool(database)
    }

    /// Forwards to [`Cluster::exec`].
    pub fn exec<T>(
        &self,
        database: Option<&str>,
        command: T,
        args: &[T],
    ) -> Result<ExitStatus, ClusterError>
    where
        T: AsRef<OsStr>,
    {
        let cluster = self.cluster;
        cluster.exec(database, command, args)
    }
}

// ----------------------------------------------------------------------------

/// The hold on a cluster returned by [`startup`] and [`startup_if_exists`]:
/// either shared ([`Left`]) or exclusive ([`Right`]).
pub type StartupResource<'res> = Either<ResourceShared<'res>, ResourceExclusive<'res>>;

/// Creates the cluster, if it doesn't already exist, and starts it in a
/// cooperative manner.
///
/// The return value has two parts: the state, [`State`], and the resource,
/// [`StartupResource`].
///
/// The state is [`State::Unmodified`] if the cluster was already running, else
/// [`State::Modified`] if the cluster was created or started by this function.
///
/// The resource is [`Left(ResourceShared)`] if the cluster is already in use,
/// or [`Right(ResourceExclusive)`] otherwise. Typically one would drop the
/// exclusive hold down to shared as soon as possible, but the option is there
/// to do maintenance, for example, that requires an exclusive hold.
pub fn startup<'res>(
    mut resource: ResourceFree<'res>,
    options: super::Options<'_>,
) -> Result<(State, StartupResource<'res>), Error> {
    loop {
        resource = match resource.try_exclusive() {
            Ok(Left(resource)) => {
                // The resource is locked exclusively by someone/something else.
                // Switch to a shared lock optimistically. This blocks until we
                // get the shared lock.
                let resource = resource.shared()?;
                // The resource may have been started while that exclusive lock
                // was held, so we must check if the resource is running now –
                // otherwise we loop back to the top again.
                if resource.facet().running()? {
                    return Ok((State::Unmodified, Left(resource)));
                }
                // Release all locks then sleep for a random time between 200ms
                // and 1000ms in an attempt to make sure that when there are
                // many competing processes one of them rapidly acquires an
                // exclusive lock and is able to create and start the resource.
                let resource = resource.release()?;
                let delay = rand::thread_rng().next_u32();
                let delay = 200 + (delay % 800);
                let delay = Duration::from_millis(u64::from(delay));
                std::thread::sleep(delay);
                resource
            }
            Ok(Right(resource)) => {
                // We have an exclusive lock, so try to start the resource.
                let state = resource.facet().start(options)?;
                return Ok((state, Right(resource)));
            }
            Err(err) => return Err(err),
        };
    }
}

/// Similar to [`startup`] but does not create the cluster, and thus only
/// succeeds if the cluster already exists.
pub fn startup_if_exists<'res>(
    mut resource: ResourceFree<'res>,
    options: super::Options<'_>,
) -> Result<(State, StartupResource<'res>), Error> {
    loop {
        resource = match resource.try_exclusive() {
            Ok(Left(resource)) => {
                // The resource is locked exclusively by someone/something else.
                // Switch to a shared lock optimistically. This blocks until we
                // get the shared lock.
                let resource = resource.shared()?;
                // The resource may have been started while that exclusive lock
                // was held, so we must check if the resource is running now –
                // otherwise we loop back to the top again.
                if resource.facet().running()? {
                    return Ok((State::Unmodified, Left(resource)));
                }
                // Release all locks then sleep for a random time between 200ms
                // and 1000ms in an attempt to make sure that when there are
                // many competing processes one of them rapidly acquires an
                // exclusive lock and is able to create and start the resource.
                let resource = resource.release()?;
                let delay = rand::thread_rng().next_u32();
                let delay = 200 + (delay % 800);
                let delay = Duration::from_millis(u64::from(delay));
                std::thread::sleep(delay);
                resource
            }
            Ok(Right(resource)) => {
                // We have an exclusive lock, so try to start the resource.
                let facet = resource.facet();
                let state = if facet.exists()? {
                    facet.start(options)?
                } else {
                    return Err(CoordinateError::DoesNotExist);
                };
                return Ok((state, Right(resource)));
            }
            Err(err) => return Err(err),
        };
    }
}

/// Shuts down the cluster if it is running and if there are no other concurrent
/// users.
///
/// The return value has two parts: the state, [`State`], and the resource.
///
/// The state is [`State::Unmodified`] if the cluster could not be shut down or
/// if it was already shut down, else [`State::Modified`].
///
/// The resource is [`Left(ResourceShared)`] if the cluster is already in use –
/// i.e. the resource passed in is returned – else [`Right(ResourceExclusive)`]
/// otherwise.
pub fn shutdown(
    resource: ResourceShared,
) -> Result<(State, Either<ResourceShared, ResourceExclusive>), Error> {
    match resource.try_exclusive() {
        Ok(Left(resource)) => {
            // The resource is in use by someone/something else. There's nothing
            // more we can do here.
            Ok((State::Unmodified, Left(resource)))
        }
        Ok(Right(resource)) => {
            // We have an exclusive lock, so we can mutate the resource.
            match resource.facet().stop() {
                Ok(state) => Ok((state, Right(resource))),
                Err(err) => {
                    resource.release()?;
                    Err(err)?
                }
            }
        }
        Err(err) => Err(err),
    }
}

/// Similar to [`shutdown`] but also attempts to destroy the cluster, i.e.
/// remove it entirely from the filesystem.
pub fn destroy(
    resource: ResourceShared,
) -> Result<(State, Either<ResourceShared, ResourceExclusive>), Error> {
    match resource.try_exclusive() {
        Ok(Left(resource)) => {
            // The resource is in use by someone/something else. There's nothing
            // more we can do here.
            Ok((State::Unmodified, Left(resource)))
        }
        Ok(Right(resource)) => {
            // We have an exclusive lock, so we can mutate the resource.
            match resource.facet().destroy() {
                Ok(state) => Ok((state, Right(resource))),
                Err(err) => {
                    resource.release()?;
                    Err(err)?
                }
            }
        }
        Err(err) => Err(err),
    }
}