pgdo/cluster/resource.rs
1//! A [resource][`crate::coordinate::resource`] for a [`Cluster`].
2
3use std::time::Duration;
4use std::{ffi::OsStr, process::ExitStatus};
5
6use either::{Either, Left, Right};
7use rand::RngCore;
8
9use super::{
10 coordinate::{resource, CoordinateError, State},
11 exists, Cluster, ClusterError,
12};
13
14// ----------------------------------------------------------------------------
15
16pub type ResourceFree = resource::ResourceFree<Cluster>;
17pub type ResourceShared = resource::ResourceShared<Cluster>;
18pub type ResourceExclusive = resource::ResourceExclusive<Cluster>;
19
20// ----------------------------------------------------------------------------
21
22pub type Error = CoordinateError<ClusterError>;
23
24impl From<ClusterError> for Error {
25 fn from(err: ClusterError) -> Self {
26 Self::ControlError(err)
27 }
28}
29
30// ----------------------------------------------------------------------------
31
32impl resource::FacetFree for Cluster {
33 type FacetFree<'a> = ClusterFree<'a>;
34
35 fn facet_free(&self) -> Self::FacetFree<'_> {
36 ClusterFree { cluster: self }
37 }
38}
39
40impl resource::FacetShared for Cluster {
41 type FacetShared<'a> = ClusterShared<'a>;
42
43 fn facet_shared(&self) -> Self::FacetShared<'_> {
44 ClusterShared { cluster: self }
45 }
46}
47
48impl resource::FacetExclusive for Cluster {
49 type FacetExclusive<'a> = ClusterExclusive<'a>;
50
51 fn facet_exclusive(&self) -> Self::FacetExclusive<'_> {
52 ClusterExclusive { cluster: self }
53 }
54}
55
56// ----------------------------------------------------------------------------
57
58pub struct ClusterFree<'a> {
59 cluster: &'a Cluster,
60}
61
62/// When the cluster is not locked, all one can do is check for its existence
63/// and if it is running. However, be careful of TOCTOU errors if you're using
64/// this for more than informational purposes.
65///
66/// [TOCTOU]: https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
67impl ClusterFree<'_> {
68 pub fn exists(&self) -> Result<bool, ClusterError> {
69 Ok(exists(self.cluster))
70 }
71
72 pub fn running(&self) -> Result<bool, ClusterError> {
73 self.cluster.running()
74 }
75}
76
77// ----------------------------------------------------------------------------
78
79pub struct ClusterShared<'a> {
80 cluster: &'a Cluster,
81}
82
83/// When the cluster is shared, one can connect to the cluster, and execute
84/// processes. It is possible to abuse this and shutdown the cluster, for
85/// example, but that's on you; there's only so much that this library can do to
86/// prevent misuse.
87impl ClusterShared<'_> {
88 pub fn exists(&self) -> Result<bool, ClusterError> {
89 Ok(exists(self.cluster))
90 }
91
92 pub fn running(&self) -> Result<bool, ClusterError> {
93 self.cluster.running()
94 }
95
96 /// Forwards to [`Cluster::pool`].
97 pub fn pool(&self, database: Option<&str>) -> Result<sqlx::PgPool, ClusterError> {
98 self.cluster.pool(database)
99 }
100
101 /// Forwards to [`Cluster::exec`].
102 pub fn exec<T: AsRef<OsStr>>(
103 &self,
104 database: Option<&str>,
105 command: T,
106 args: &[T],
107 ) -> Result<ExitStatus, ClusterError> {
108 self.cluster.exec(database, command, args)
109 }
110}
111
112// ----------------------------------------------------------------------------
113
114pub struct ClusterExclusive<'a> {
115 cluster: &'a Cluster,
116}
117
118/// When you have exclusive control of a cluster, you can start, stop, destroy,
119/// reconfigure it – anything.
120impl ClusterExclusive<'_> {
121 pub fn start(&self, options: super::Options<'_>) -> Result<State, ClusterError> {
122 self.cluster.start(options)
123 }
124
125 pub fn stop(&self) -> Result<State, ClusterError> {
126 self.cluster.stop()
127 }
128
129 pub fn destroy(&self) -> Result<State, ClusterError> {
130 self.cluster.destroy()
131 }
132
133 pub fn exists(&self) -> Result<bool, ClusterError> {
134 Ok(exists(self.cluster))
135 }
136
137 pub fn running(&self) -> Result<bool, ClusterError> {
138 self.cluster.running()
139 }
140
141 /// Forwards to [`Cluster::pool`].
142 pub fn pool(&self, database: Option<&str>) -> Result<sqlx::PgPool, ClusterError> {
143 self.cluster.pool(database)
144 }
145
146 /// Forwards to [`Cluster::exec`].
147 pub fn exec<T: AsRef<OsStr>>(
148 &self,
149 database: Option<&str>,
150 command: T,
151 args: &[T],
152 ) -> Result<ExitStatus, ClusterError> {
153 self.cluster.exec(database, command, args)
154 }
155}
156
157// ----------------------------------------------------------------------------
158
159/// A [`ResourceShared`] or a [`ResourceExclusive`].
160pub type HeldResource = Either<ResourceShared, ResourceExclusive>;
161
162/// Creates the cluster, if it doesn't already exist, and starts it in a
163/// cooperative manner.
164///
165/// The return value has two parts: the state, [`State`], and the resource,
166/// [`HeldResource`].
167///
168/// The state is [`State::Unmodified`] if the cluster was already running, else
169/// [`State::Modified`] if the cluster was created or started by this function.
170///
171/// The resource is [`Left(ResourceShared)`] if the cluster is already in use,
172/// or [`Right(ResourceExclusive)`] otherwise. Typically one would drop the
173/// exclusive hold down to shared as soon as possible, but the option is there
174/// to do maintenance, for example, that requires an exclusive hold.
175pub fn startup(
176 mut resource: ResourceFree,
177 options: super::Options<'_>,
178) -> Result<(State, HeldResource), Error> {
179 loop {
180 resource = match resource.try_exclusive() {
181 Ok(Left(resource)) => {
182 // The resource is locked exclusively by someone/something else.
183 // Switch to a shared lock optimistically. This blocks until we
184 // get the shared lock.
185 let resource = resource.shared()?;
186 // The resource may have been started while that exclusive lock
187 // was held, so we must check if the resource is running now –
188 // otherwise we loop back to the top again.
189 if resource.facet().running()? {
190 return Ok((State::Unmodified, Left(resource)));
191 }
192 // Release all locks then sleep for a random time between 200ms
193 // and 1000ms in an attempt to make sure that when there are
194 // many competing processes one of them rapidly acquires an
195 // exclusive lock and is able to create and start the resource.
196 let resource = resource.release()?;
197 let delay = rand::rng().next_u32();
198 let delay = 200 + (delay % 800);
199 let delay = Duration::from_millis(u64::from(delay));
200 std::thread::sleep(delay);
201 resource
202 }
203 Ok(Right(resource)) => {
204 // We have an exclusive lock, so try to start the resource.
205 let state = resource.facet().start(options)?;
206 return Ok((state, Right(resource)));
207 }
208 Err(err) => return Err(err),
209 };
210 }
211}
212
213/// Similar to [`startup`] but does not create the cluster, and thus only
214/// succeeds if the cluster already exists.
215pub fn startup_if_exists(
216 mut resource: ResourceFree,
217 options: super::Options<'_>,
218) -> Result<(State, HeldResource), Error> {
219 loop {
220 resource = match resource.try_exclusive() {
221 Ok(Left(resource)) => {
222 // The resource is locked exclusively by someone/something else.
223 // Switch to a shared lock optimistically. This blocks until we
224 // get the shared lock.
225 let resource = resource.shared()?;
226 // The resource may have been started while that exclusive lock
227 // was held, so we must check if the resource is running now –
228 // otherwise we loop back to the top again.
229 if resource.facet().running()? {
230 return Ok((State::Unmodified, Left(resource)));
231 }
232 // Release all locks then sleep for a random time between 200ms
233 // and 1000ms in an attempt to make sure that when there are
234 // many competing processes one of them rapidly acquires an
235 // exclusive lock and is able to create and start the resource.
236 let resource = resource.release()?;
237 let delay = rand::rng().next_u32();
238 let delay = 200 + (delay % 800);
239 let delay = Duration::from_millis(u64::from(delay));
240 std::thread::sleep(delay);
241 resource
242 }
243 Ok(Right(resource)) => {
244 // We have an exclusive lock, so try to start the resource.
245 let facet = resource.facet();
246 let state = if facet.exists()? {
247 facet.start(options)?
248 } else {
249 return Err(CoordinateError::DoesNotExist);
250 };
251 return Ok((state, Right(resource)));
252 }
253 Err(err) => return Err(err),
254 };
255 }
256}
257
258/// Shuts down the cluster if it is running and if there are no other concurrent
259/// users.
260///
261/// The return value has two parts: the state, [`State`], and the resource.
262///
263/// The state is [`State::Unmodified`] if the cluster could not be shut down or
264/// if it was already shut down, else [`State::Modified`].
265///
266/// The resource is [`Left(ResourceShared)`] if the cluster is already in use –
267/// i.e. the resource passed in is returned – else [`Right(ResourceExclusive)`]
268/// otherwise.
269pub fn shutdown(resource: ResourceShared) -> Result<(State, HeldResource), Error> {
270 match resource.try_exclusive() {
271 Ok(Left(resource)) => {
272 // The resource is in use by someone/something else. There's nothing
273 // more we can do here.
274 Ok((State::Unmodified, Left(resource)))
275 }
276 Ok(Right(resource)) => {
277 // We have an exclusive lock, so we can mutate the resource.
278 match resource.facet().stop() {
279 Ok(state) => Ok((state, Right(resource))),
280 Err(err) => {
281 resource.release()?;
282 Err(err)?
283 }
284 }
285 }
286 Err(err) => Err(err),
287 }
288}
289
290/// Similar to [`shutdown`] but also attempts to destroy the cluster, i.e.
291/// remove it entirely from the filesystem.
292pub fn destroy(resource: ResourceShared) -> Result<(State, HeldResource), Error> {
293 match resource.try_exclusive() {
294 Ok(Left(resource)) => {
295 // The resource is in use by someone/something else. There's nothing
296 // more we can do here.
297 Ok((State::Unmodified, Left(resource)))
298 }
299 Ok(Right(resource)) => {
300 // We have an exclusive lock, so we can mutate the resource.
301 match resource.facet().destroy() {
302 Ok(state) => Ok((state, Right(resource))),
303 Err(err) => {
304 resource.release()?;
305 Err(err)?
306 }
307 }
308 }
309 Err(err) => Err(err),
310 }
311}