use std::fs;
use std::io;
use std::os::unix::prelude::OsStrExt;
use std::path::{Path, PathBuf};
use std::process::ExitCode;
use std::process::ExitStatus;
use miette::{bail, IntoDiagnostic, Result, WrapErr};
use crate::{args, ExitResult};
use pgdo::{
cluster, coordinate, lock,
runtime::{
self,
constraint::Constraint,
strategy::{Strategy, StrategyLike},
},
};
pub(crate) fn check_exit(status: ExitStatus) -> ExitResult {
match status.code() {
None => bail!("Command terminated: {status}"),
Some(code) => Ok(u8::try_from(code)
.map(ExitCode::from)
.unwrap_or(ExitCode::FAILURE)),
}
}
#[derive(thiserror::Error, miette::Diagnostic, Debug)]
pub(crate) enum StrategyError {
#[error("No runtime matches constraint {0:?}")]
#[diagnostic(help("Use `runtimes` to see available runtimes"))]
ConstraintNotSatisfied(runtime::constraint::Constraint),
}
pub(crate) fn determine_strategy(fallback: Option<Constraint>) -> Result<Strategy, StrategyError> {
let strategy = runtime::strategy::Strategy::default();
let fallback: Option<_> = match fallback {
Some(constraint) => match strategy.select(&constraint) {
Some(runtime) => Some(runtime),
None => return Err(StrategyError::ConstraintNotSatisfied(constraint)),
},
None => None,
};
let strategy = match fallback {
Some(fallback) => strategy.push_front(fallback),
None => strategy,
};
Ok(strategy)
}
pub(crate) fn ensure_database(cluster: &cluster::Cluster, database_name: &str) -> Result<()> {
cluster
.createdb(database_name)
.wrap_err_with(|| "Could not create database")
.wrap_err_with(|| format!("Database: {database_name}"))?;
Ok(())
}
const UUID_NS: uuid::Uuid = uuid::Uuid::from_u128(93875103436633470414348750305797058811);
#[derive(thiserror::Error, miette::Diagnostic, Debug)]
pub(crate) enum LockForError {
#[error("Could not canonicalize cluster directory ({1})")]
ClusterDirectoryError(#[source] std::io::Error, PathBuf),
#[error("Could not create UUID-based lock file (uuid = {1})")]
UuidLockError(#[source] std::io::Error, uuid::Uuid),
}
pub(crate) fn lock_for<P: AsRef<Path>>(
path: P,
) -> Result<(PathBuf, lock::UnlockedFile), LockForError> {
let path = path.as_ref();
let path = path
.canonicalize()
.map_err(|err| LockForError::ClusterDirectoryError(err, path.into()))?;
let name = path.as_os_str().as_bytes();
let lock_uuid = uuid::Uuid::new_v5(&UUID_NS, name);
let lock = lock::UnlockedFile::try_from(&lock_uuid)
.map_err(|err| LockForError::UuidLockError(err, lock_uuid))?;
Ok((path, lock))
}
#[allow(clippy::enum_variant_names)]
pub(crate) enum Runner {
RunAndStop,
RunAndStopIfExists,
RunAndDestroy,
}
pub(crate) fn run<ACTION>(
runner: Runner,
args::ClusterArgs { dir: cluster_dir }: args::ClusterArgs,
args::ClusterModeArgs { mode: cluster_mode }: args::ClusterModeArgs,
args::RuntimeArgs { fallback }: args::RuntimeArgs,
action: ACTION,
) -> ExitResult
where
ACTION: FnOnce(&cluster::Cluster) -> ExitResult + std::panic::UnwindSafe,
{
match runner {
Runner::RunAndStop | Runner::RunAndDestroy => {
match fs::create_dir(&cluster_dir) {
Err(err) if err.kind() == io::ErrorKind::AlreadyExists => (),
err @ Err(_) => err
.into_diagnostic()
.wrap_err_with(|| "Could not create cluster directory")
.wrap_err_with(|| format!("Cluster directory: {}", cluster_dir.display()))?,
_ => (),
}
}
Runner::RunAndStopIfExists => {
}
};
let (datadir, lock) = lock_for(&cluster_dir)?;
let strategy = determine_strategy(fallback)?;
let cluster = cluster::Cluster::new(datadir, strategy)?;
let act = || {
if let Some(cluster_mode) = cluster_mode {
let rt = tokio::runtime::Runtime::new().into_diagnostic()?;
rt.block_on(set_cluster_mode(cluster_mode, &cluster))?;
}
ctrlc::set_handler(|| ())
.into_diagnostic()
.context("Could not set signal handler")?;
action(&cluster)
};
use coordinate::{run_and_destroy, run_and_stop, run_and_stop_if_exists};
match runner {
Runner::RunAndStop => run_and_stop(&cluster, &[], lock, act),
Runner::RunAndStopIfExists => run_and_stop_if_exists(&cluster, &[], lock, act),
Runner::RunAndDestroy => run_and_destroy(&cluster, &[], lock, act),
}?
}
async fn set_cluster_mode(
mode: args::ClusterMode,
cluster: &cluster::Cluster,
) -> Result<(), cluster::ClusterError> {
use pgdo::cluster::config::{self, Parameter};
static FSYNC: Parameter = Parameter("fsync");
static FULL_PAGE_WRITES: Parameter = Parameter("full_page_writes");
static SYNCHRONOUS_COMMIT: Parameter = Parameter("synchronous_commit");
match mode {
args::ClusterMode::Fast => {
let pool = cluster.pool(None)?;
FSYNC.set(&pool, false).await?;
FULL_PAGE_WRITES.set(&pool, false).await?;
SYNCHRONOUS_COMMIT.set(&pool, false).await?;
config::reload(&pool).await?;
Ok(())
}
args::ClusterMode::Slow => {
let pool = cluster.pool(None)?;
FSYNC.reset(&pool).await?;
FULL_PAGE_WRITES.reset(&pool).await?;
SYNCHRONOUS_COMMIT.reset(&pool).await?;
config::reload(&pool).await?;
Ok(())
}
}
}