1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
//! A [resource][`crate::coordinate::resource`] for a [`Cluster`].
use std::time::Duration;
use std::{ffi::OsStr, process::ExitStatus};
use either::{Either, Left, Right};
use rand::RngCore;
use super::{
coordinate::{resource, CoordinateError, State},
exists, Cluster, ClusterError,
};
// ----------------------------------------------------------------------------
/// [`resource::ResourceFree`] specialised to [`Cluster`]; this is the form
/// that [`startup`] and [`startup_if_exists`] accept.
pub type ResourceFree<'a> = resource::ResourceFree<'a, Cluster>;
/// [`resource::ResourceShared`] specialised to [`Cluster`]; this is the form
/// that [`shutdown`] and [`destroy`] accept.
pub type ResourceShared<'a> = resource::ResourceShared<'a, Cluster>;
/// [`resource::ResourceExclusive`] specialised to [`Cluster`]; its facet is
/// the one used in this module to start, stop and destroy the cluster.
pub type ResourceExclusive<'a> = resource::ResourceExclusive<'a, Cluster>;
// ----------------------------------------------------------------------------
pub type Error = CoordinateError<ClusterError>;
impl From<ClusterError> for Error {
fn from(err: ClusterError) -> Self {
Self::ControlError(err)
}
}
// ----------------------------------------------------------------------------
/// Names the facet type that a [`Cluster`] exposes for each of the three
/// resource states. Each facet is nothing more than a wrapper around a borrow
/// of the cluster itself.
impl<'a> resource::Faceted<'a> for Cluster {
    type FacetFree = ClusterFree<'a>;
    type FacetShared = ClusterShared<'a>;
    type FacetExclusive = ClusterExclusive<'a>;

    /// Builds the [`ClusterFree`] view of this cluster.
    fn facet_free(&'a self) -> Self::FacetFree {
        let cluster = self;
        ClusterFree { cluster }
    }

    /// Builds the [`ClusterShared`] view of this cluster.
    fn facet_shared(&'a self) -> Self::FacetShared {
        let cluster = self;
        ClusterShared { cluster }
    }

    /// Builds the [`ClusterExclusive`] view of this cluster.
    fn facet_exclusive(&'a self) -> Self::FacetExclusive {
        let cluster = self;
        ClusterExclusive { cluster }
    }
}
// ----------------------------------------------------------------------------
/// The facet of a [`Cluster`] produced by `facet_free`: a read-only view that
/// borrows the cluster.
pub struct ClusterFree<'a> {
// The cluster that every method on this facet forwards to.
cluster: &'a Cluster,
}
/// Without a lock on the cluster, the only operations offered are checking
/// whether it exists and whether it is running. Treat the answers as
/// informational: acting on them invites [TOCTOU] errors, since nothing stops
/// the cluster from changing between the check and the use.
///
/// [TOCTOU]: https://en.wikipedia.org/wiki/Time-of-check_to_time-of-use
impl<'a> ClusterFree<'a> {
    /// Reports whether the cluster exists, as judged by the module-level
    /// `exists` check. This wrapper itself never returns `Err`.
    pub fn exists(&self) -> Result<bool, ClusterError> {
        let present = exists(self.cluster);
        Ok(present)
    }

    /// Reports whether the cluster is running; forwards to the cluster's own
    /// `running` method and passes its result through unchanged.
    pub fn running(&self) -> Result<bool, ClusterError> {
        let cluster = self.cluster;
        cluster.running()
    }
}
// ----------------------------------------------------------------------------
/// The facet of a [`Cluster`] produced by `facet_shared`: a view that borrows
/// the cluster and exposes status checks, connection pools and command
/// execution.
pub struct ClusterShared<'a> {
// The cluster that every method on this facet forwards to.
cluster: &'a Cluster,
}
/// With a shared hold on the cluster one may connect to it and execute
/// processes against it. Nothing here prevents abuse – shutting the cluster
/// down via an executed command, say – but that is the caller's
/// responsibility; this library can only do so much to prevent misuse.
impl<'a> ClusterShared<'a> {
    /// Reports whether the cluster exists, as judged by the module-level
    /// `exists` check. This wrapper itself never returns `Err`.
    pub fn exists(&self) -> Result<bool, ClusterError> {
        let present = exists(self.cluster);
        Ok(present)
    }

    /// Reports whether the cluster is running; forwards to the cluster's own
    /// `running` method and passes its result through unchanged.
    pub fn running(&self) -> Result<bool, ClusterError> {
        let cluster = self.cluster;
        cluster.running()
    }

    /// Forwards to [`Cluster::pool`], passing `database` through untouched.
    pub fn pool(&self, database: Option<&str>) -> Result<sqlx::PgPool, ClusterError> {
        let cluster = self.cluster;
        cluster.pool(database)
    }

    /// Forwards to [`Cluster::exec`], passing `database`, `command` and `args`
    /// through untouched.
    pub fn exec<T: AsRef<OsStr>>(
        &self,
        database: Option<&str>,
        command: T,
        args: &[T],
    ) -> Result<ExitStatus, ClusterError> {
        let cluster = self.cluster;
        cluster.exec(database, command, args)
    }
}
// ----------------------------------------------------------------------------
/// The facet of a [`Cluster`] produced by `facet_exclusive`: a view that
/// borrows the cluster and additionally exposes start, stop and destroy.
pub struct ClusterExclusive<'a> {
// The cluster that every method on this facet forwards to.
cluster: &'a Cluster,
}
/// With exclusive control of a cluster everything is permitted: starting,
/// stopping, destroying, reconfiguring – anything.
impl<'a> ClusterExclusive<'a> {
    /// Forwards to the cluster's `start` method with the given `options` and
    /// returns the [`State`] it reports.
    pub fn start(&self, options: super::Options<'_>) -> Result<State, ClusterError> {
        let cluster = self.cluster;
        cluster.start(options)
    }

    /// Forwards to the cluster's `stop` method and returns the [`State`] it
    /// reports.
    pub fn stop(&self) -> Result<State, ClusterError> {
        let cluster = self.cluster;
        cluster.stop()
    }

    /// Forwards to the cluster's `destroy` method and returns the [`State`] it
    /// reports.
    pub fn destroy(&self) -> Result<State, ClusterError> {
        let cluster = self.cluster;
        cluster.destroy()
    }

    /// Reports whether the cluster exists, as judged by the module-level
    /// `exists` check. This wrapper itself never returns `Err`.
    pub fn exists(&self) -> Result<bool, ClusterError> {
        let present = exists(self.cluster);
        Ok(present)
    }

    /// Reports whether the cluster is running; forwards to the cluster's own
    /// `running` method and passes its result through unchanged.
    pub fn running(&self) -> Result<bool, ClusterError> {
        let cluster = self.cluster;
        cluster.running()
    }

    /// Forwards to [`Cluster::pool`], passing `database` through untouched.
    pub fn pool(&self, database: Option<&str>) -> Result<sqlx::PgPool, ClusterError> {
        let cluster = self.cluster;
        cluster.pool(database)
    }

    /// Forwards to [`Cluster::exec`], passing `database`, `command` and `args`
    /// through untouched.
    pub fn exec<T: AsRef<OsStr>>(
        &self,
        database: Option<&str>,
        command: T,
        args: &[T],
    ) -> Result<ExitStatus, ClusterError> {
        let cluster = self.cluster;
        cluster.exec(database, command, args)
    }
}
// ----------------------------------------------------------------------------
/// The resource half of what [`startup`] and [`startup_if_exists`] return:
/// `Left` carries a [`ResourceShared`], `Right` carries a
/// [`ResourceExclusive`].
pub type StartupResource<'a> = Either<ResourceShared<'a>, ResourceExclusive<'a>>;
/// Cooperatively creates the cluster (when it does not exist yet) and starts
/// it.
///
/// On success the return value is a pair of [`State`] and
/// [`StartupResource`]:
///
/// - The state is [`State::Unmodified`] when the cluster was found to be
///   running already, otherwise it is whatever starting the cluster reported
///   – [`State::Modified`] if this call created or started it.
/// - The resource is `Left(`[`ResourceShared`]`)` when the cluster is already
///   in use, or `Right(`[`ResourceExclusive`]`)` when this call took the
///   exclusive hold and started the cluster itself. Callers would typically
///   drop from exclusive down to shared as soon as possible, but the
///   exclusive hold is handed back so that maintenance requiring it can be
///   done first.
///
/// # Errors
///
/// Any failure to acquire or release a lock, to query whether the cluster is
/// running, or to start it, is returned as an [`Error`].
pub fn startup<'res>(
    mut resource: ResourceFree<'res>,
    options: super::Options<'_>,
) -> Result<(State, StartupResource<'res>), Error> {
    loop {
        let contended = match resource.try_exclusive() {
            Ok(Right(exclusive)) => {
                // The exclusive lock is ours, so try to start the cluster and
                // hand the exclusive hold back to the caller.
                let state = exclusive.facet().start(options)?;
                return Ok((state, Right(exclusive)));
            }
            // The exclusive lock is held by someone/something else; we get
            // the unlocked resource back and fall through to the shared path.
            Ok(Left(unlocked)) => unlocked,
            Err(err) => return Err(err),
        };

        // Optimistically switch to a shared lock. This blocks until the
        // shared lock has been obtained.
        let shared = contended.shared()?;

        // Whoever held the exclusive lock may have started the cluster in the
        // meantime, so check before going around again.
        if shared.facet().running()? {
            return Ok((State::Unmodified, Left(shared)));
        }

        // Not running: let go of every lock, then back off for a random
        // interval of 200ms up to (but excluding) 1000ms. With many competing
        // processes this gives one of them the chance to grab the exclusive
        // lock quickly and create and start the cluster.
        resource = shared.release()?;
        let jitter = rand::thread_rng().next_u32() % 800;
        let pause = Duration::from_millis(u64::from(200 + jitter));
        std::thread::sleep(pause);
    }
}
/// Like [`startup`], except that the cluster is never created: starting only
/// succeeds when the cluster already exists.
///
/// The returned pair has the same meaning as for [`startup`].
///
/// # Errors
///
/// Returns [`CoordinateError::DoesNotExist`] when the exclusive lock was
/// obtained but the cluster does not exist. Any failure to acquire or release
/// a lock, to query the cluster, or to start it, is also returned as an
/// [`Error`].
pub fn startup_if_exists<'res>(
    mut resource: ResourceFree<'res>,
    options: super::Options<'_>,
) -> Result<(State, StartupResource<'res>), Error> {
    loop {
        let contended = match resource.try_exclusive() {
            Ok(Right(exclusive)) => {
                // The exclusive lock is ours. Refuse to go any further unless
                // the cluster is already there; otherwise start it.
                let facet = exclusive.facet();
                if !facet.exists()? {
                    return Err(CoordinateError::DoesNotExist);
                }
                let state = facet.start(options)?;
                return Ok((state, Right(exclusive)));
            }
            // The exclusive lock is held by someone/something else; we get
            // the unlocked resource back and fall through to the shared path.
            Ok(Left(unlocked)) => unlocked,
            Err(err) => return Err(err),
        };

        // Optimistically switch to a shared lock. This blocks until the
        // shared lock has been obtained.
        let shared = contended.shared()?;

        // Whoever held the exclusive lock may have started the cluster in the
        // meantime, so check before going around again.
        if shared.facet().running()? {
            return Ok((State::Unmodified, Left(shared)));
        }

        // Not running: let go of every lock, then back off for a random
        // interval of 200ms up to (but excluding) 1000ms. With many competing
        // processes this gives one of them the chance to grab the exclusive
        // lock quickly and start the cluster.
        resource = shared.release()?;
        let jitter = rand::thread_rng().next_u32() % 800;
        let pause = Duration::from_millis(u64::from(200 + jitter));
        std::thread::sleep(pause);
    }
}
/// Shuts down the cluster if it is running and if there are no other concurrent
/// users.
///
/// The return value has two parts: the state, [`State`], and the resource.
///
/// The state is [`State::Unmodified`] if the cluster could not be shut down or
/// if it was already shut down, else [`State::Modified`].
///
/// The resource is [`Left(ResourceShared)`] if the cluster is already in use –
/// i.e. the resource passed in is returned – else [`Right(ResourceExclusive)`]
/// otherwise.
pub fn shutdown(
resource: ResourceShared,
) -> Result<(State, Either<ResourceShared, ResourceExclusive>), Error> {
match resource.try_exclusive() {
Ok(Left(resource)) => {
// The resource is in use by someone/something else. There's nothing
// more we can do here.
Ok((State::Unmodified, Left(resource)))
}
Ok(Right(resource)) => {
// We have an exclusive lock, so we can mutate the resource.
match resource.facet().stop() {
Ok(state) => Ok((state, Right(resource))),
Err(err) => {
resource.release()?;
Err(err)?
}
}
}
Err(err) => Err(err),
}
}
/// Similar to [`shutdown`] but also attempts to destroy the cluster, i.e.
/// remove it entirely from the filesystem.
pub fn destroy(
resource: ResourceShared,
) -> Result<(State, Either<ResourceShared, ResourceExclusive>), Error> {
match resource.try_exclusive() {
Ok(Left(resource)) => {
// The resource is in use by someone/something else. There's nothing
// more we can do here.
Ok((State::Unmodified, Left(resource)))
}
Ok(Right(resource)) => {
// We have an exclusive lock, so we can mutate the resource.
match resource.facet().destroy() {
Ok(state) => Ok((state, Right(resource))),
Err(err) => {
resource.release()?;
Err(err)?
}
}
}
Err(err) => Err(err),
}
}