mod error;
#[cfg(test)]
mod tests;
use std::ffi::{OsStr, OsString};
use std::os::unix::prelude::OsStringExt;
use std::path::{Path, PathBuf};
use std::process::{Command, ExitStatus};
use std::{env, fs, io};
use nix::errno::Errno;
use shell_quote::sh::escape_into;
use crate::runtime;
use crate::version;
pub use error::ClusterError;
/// Handle for a PostgreSQL cluster identified by its data directory.
///
/// The methods on this type drive the cluster by running `pg_ctl` (and
/// `psql`, or arbitrary commands) from a runtime chosen via `strategy`.
pub struct Cluster {
/// Root of the cluster on disk. It is also exported as `PGDATA` and
/// `PGHOST` to the commands this type runs.
datadir: PathBuf,
/// Consulted to select a runtime for the version recorded in the data
/// directory, or to supply a fallback when no version is recorded.
strategy: Box<dyn runtime::strategy::RuntimeStrategy>,
}
impl Cluster {
/// Builds a handle for the cluster stored in `datadir`.
///
/// Nothing on disk is touched: the path is copied and the strategy is boxed
/// for later use when a runtime has to be selected. As written, this
/// constructor always returns `Ok`.
pub fn new<P: AsRef<Path>, S: runtime::strategy::RuntimeStrategy>(
    datadir: P,
    strategy: S,
) -> Result<Self, ClusterError> {
    let datadir = datadir.as_ref().to_path_buf();
    let strategy: Box<dyn runtime::strategy::RuntimeStrategy> = Box::new(strategy);
    Ok(Self { datadir, strategy })
}
/// Resolves the runtime to use with this cluster.
///
/// When the data directory records a version, the strategy is asked to
/// select a runtime for it; otherwise the strategy's fallback is used.
///
/// # Errors
///
/// * Whatever reading or parsing the recorded version fails with.
/// * `ClusterError::RuntimeNotFound` when no runtime matches the recorded
///   version.
/// * `ClusterError::RuntimeDefaultNotFound` when there is no recorded version
///   and the strategy has no fallback.
fn runtime(&self) -> Result<runtime::Runtime, ClusterError> {
    if let Some(wanted) = version(self)? {
        self.strategy
            .select(&wanted)
            .ok_or_else(|| ClusterError::RuntimeNotFound(wanted))
    } else {
        self.strategy
            .fallback()
            .ok_or(ClusterError::RuntimeDefaultNotFound)
    }
}
/// Prepares a `pg_ctl` invocation aimed at this cluster.
///
/// The returned command has `PGDATA` and `PGHOST` both set to the data
/// directory; callers add the sub-command and its flags.
fn ctl(&self) -> Result<Command, ClusterError> {
    let runtime = self.runtime()?;
    let mut pg_ctl = runtime.execute("pg_ctl");
    pg_ctl
        .env("PGDATA", &self.datadir)
        .env("PGHOST", &self.datadir);
    Ok(pg_ctl)
}
/// Reports whether the cluster's server is running, based on the exit code
/// of `pg_ctl status`.
///
/// Exit code 0 means running. Any other code is interpreted according to the
/// runtime's version, because the codes differ between PostgreSQL releases
/// (see the `pg_ctl` documentation for each release):
///
/// * 10 and later, and 9.4 and later: 3 means not running; 4 means not
///   running only when no cluster exists in the data directory.
/// * 9.2 and 9.3: 3 means not running.
/// * 9.0 and 9.1: 1 means not running.
///
/// # Errors
///
/// * `ClusterError::Other` with the captured output when `pg_ctl` reports no
///   exit code at all (on Unix: it was terminated by a signal).
/// * `ClusterError::UnsupportedVersion` for every exit code / version
///   combination not listed above, including all versions before 9.0.
pub fn running(&self) -> Result<bool, ClusterError> {
    let output = self.ctl()?.arg("status").output()?;
    let code = match output.status.code() {
        Some(0) => return Ok(true),
        Some(code) => code,
        None => return Err(ClusterError::Other(output)),
    };
    let runtime = self.runtime()?;
    // Shared by 9.4+ and 10+: code 4 only counts as "stopped" when there is
    // no cluster on disk, otherwise the state cannot be determined.
    let stopped_or_absent = |code: i32| code == 3 || (code == 4 && !exists(self));
    let stopped = match runtime.version {
        version::Version::Post10(..) => stopped_or_absent(code),
        version::Version::Pre10(9, point, _) if point >= 4 => stopped_or_absent(code),
        version::Version::Pre10(9, point, _) if point >= 2 => code == 3,
        version::Version::Pre10(9, _, _) => code == 1,
        version::Version::Pre10(..) => false,
    };
    if stopped {
        Ok(false)
    } else {
        Err(ClusterError::UnsupportedVersion(runtime.version))
    }
}
pub fn pidfile(&self) -> PathBuf {
self.datadir.join("postmaster.pid")
}
pub fn logfile(&self) -> PathBuf {
self.datadir.join("postmaster.log")
}
/// Creates the cluster if it does not exist yet.
///
/// Returns `Ok(true)` when the cluster was created by this call and
/// `Ok(false)` when it was already present.
///
/// # Errors
///
/// An `EAGAIN` Unix error from the underlying operation is translated: it
/// becomes `Ok(false)` if a cluster exists by now, and
/// `ClusterError::InUse` otherwise. Every other error is passed through.
pub fn create(&self) -> Result<bool, ClusterError> {
    let outcome = self._create();
    if let Err(ClusterError::UnixError(Errno::EAGAIN)) = outcome {
        return if exists(self) {
            Ok(false)
        } else {
            Err(ClusterError::InUse)
        };
    }
    outcome
}
fn _create(&self) -> Result<bool, ClusterError> {
if exists(self) {
Ok(false)
} else {
fs::create_dir_all(&self.datadir)?;
#[allow(clippy::suspicious_command_arg_space)]
self.ctl()?
.arg("init")
.arg("-s")
.arg("-o")
.arg("-E utf8 --locale C -A trust")
.env("TZ", "UTC")
.output()?;
Ok(true)
}
}
/// Starts the cluster, creating it first if necessary.
///
/// Returns `Ok(true)` when this call started the server and `Ok(false)`
/// when it was already running.
///
/// # Errors
///
/// An `EAGAIN` Unix error from the underlying operation is translated: it
/// becomes `Ok(false)` if the server turns out to be running, and
/// `ClusterError::InUse` otherwise (a failure of that running check is
/// returned as is). Every other error is passed through.
pub fn start(&self) -> Result<bool, ClusterError> {
    let outcome = self._start();
    if let Err(ClusterError::UnixError(Errno::EAGAIN)) = outcome {
        return if self.running()? {
            Ok(false)
        } else {
            Err(ClusterError::InUse)
        };
    }
    outcome
}
fn _start(&self) -> Result<bool, ClusterError> {
self._create()?;
if self.running()? {
return Ok(false);
}
self.ctl()?
.arg("start")
.arg("-l")
.arg(self.logfile())
.arg("-s")
.arg("-w")
.arg("-o")
.arg({
let mut arg = b"-h '' -k "[..].into();
escape_into(&self.datadir, &mut arg);
OsString::from_vec(arg)
})
.output()?;
Ok(true)
}
/// Opens an unencrypted client connection to `database` in this cluster.
///
/// The user name comes from the `USER` environment variable, falling back
/// to the literal `USER-not-set` when it cannot be read. The host is the
/// data directory path.
///
/// NOTE(review): the path is converted with `to_string_lossy`, so a data
/// directory whose path is not valid UTF-8 would be passed on in altered
/// form.
///
/// # Errors
///
/// Any error from establishing the connection, converted to `ClusterError`.
pub fn connect(&self, database: &str) -> Result<postgres::Client, ClusterError> {
    let user = match env::var("USER") {
        Ok(name) => name,
        Err(_) => "USER-not-set".to_string(),
    };
    let host = self.datadir.to_string_lossy();
    let mut config = postgres::Client::configure();
    config.user(&user).dbname(database).host(&host);
    let client = config.connect(postgres::NoTls)?;
    Ok(client)
}
/// Runs `psql --quiet` against `database` and waits for it to exit.
///
/// The child gets `PGDATA` and `PGHOST` set to the data directory and
/// `PGDATABASE` set to `database`. Its exit status is returned to the
/// caller unexamined.
///
/// # Errors
///
/// Failures to resolve the runtime, to spawn the process, or to wait for it.
pub fn shell(&self, database: &str) -> Result<ExitStatus, ClusterError> {
    let runtime = self.runtime()?;
    let mut psql = runtime.execute("psql");
    psql.arg("--quiet")
        .env("PGDATA", &self.datadir)
        .env("PGHOST", &self.datadir)
        .env("PGDATABASE", database);
    let mut child = psql.spawn()?;
    let status = child.wait()?;
    Ok(status)
}
/// Runs `command` with `args` in an environment pointing at this cluster and
/// waits for it to exit.
///
/// The child gets `PGDATA` and `PGHOST` set to the data directory and
/// `PGDATABASE` set to `database`. Its exit status is returned to the
/// caller unexamined.
///
/// # Errors
///
/// Failures to resolve the runtime, to spawn the process, or to wait for it.
pub fn exec<T: AsRef<OsStr>>(
    &self,
    database: &str,
    command: T,
    args: &[T],
) -> Result<ExitStatus, ClusterError> {
    let runtime = self.runtime()?;
    let mut process = runtime.command(command);
    process
        .args(args)
        .env("PGDATA", &self.datadir)
        .env("PGHOST", &self.datadir)
        .env("PGDATABASE", database);
    let mut child = process.spawn()?;
    let status = child.wait()?;
    Ok(status)
}
/// Lists the names of all databases in the cluster, ordered by name.
///
/// The names are read from `pg_catalog.pg_database` over a connection to
/// `template1`.
///
/// # Errors
///
/// Any connection or query error, converted to `ClusterError`.
pub fn databases(&self) -> Result<Vec<String>, ClusterError> {
    let mut client = self.connect("template1")?;
    let rows = client.query(
        "SELECT datname FROM pg_catalog.pg_database ORDER BY datname",
        &[],
    )?;
    let mut names = Vec::with_capacity(rows.len());
    for row in &rows {
        let name: String = row.get(0);
        names.push(name);
    }
    Ok(names)
}
/// Creates a database called `database`.
///
/// The name is escaped as an SQL identifier before being placed in the
/// statement, which is executed over a connection to `template1`. Returns
/// `Ok(true)` whenever the statement succeeds.
///
/// # Errors
///
/// Any connection or execution error, converted to `ClusterError`.
pub fn createdb(&self, database: &str) -> Result<bool, ClusterError> {
    let identifier = postgres_protocol::escape::escape_identifier(database);
    let mut statement = String::from("CREATE DATABASE ");
    statement.push_str(&identifier);
    let mut client = self.connect("template1")?;
    client.execute(statement.as_str(), &[])?;
    Ok(true)
}
/// Drops the database called `database`.
///
/// The name is escaped as an SQL identifier before being placed in the
/// statement, which is executed over a connection to `template1`. Returns
/// `Ok(true)` whenever the statement succeeds.
///
/// # Errors
///
/// Any connection or execution error, converted to `ClusterError`.
pub fn dropdb(&self, database: &str) -> Result<bool, ClusterError> {
    let identifier = postgres_protocol::escape::escape_identifier(database);
    let mut statement = String::from("DROP DATABASE ");
    statement.push_str(&identifier);
    let mut client = self.connect("template1")?;
    client.execute(statement.as_str(), &[])?;
    Ok(true)
}
/// Stops the cluster's server if it is running.
///
/// Returns `Ok(true)` when this call stopped the server and `Ok(false)`
/// when it was not running.
///
/// # Errors
///
/// An `EAGAIN` Unix error from the underlying operation is translated: it
/// becomes `Ok(false)` if the server turns out not to be running, and
/// `ClusterError::InUse` otherwise (a failure of that running check is
/// returned as is). Every other error is passed through.
pub fn stop(&self) -> Result<bool, ClusterError> {
    let outcome = self._stop();
    if let Err(ClusterError::UnixError(Errno::EAGAIN)) = outcome {
        return if self.running()? {
            Err(ClusterError::InUse)
        } else {
            Ok(false)
        };
    }
    outcome
}
fn _stop(&self) -> Result<bool, ClusterError> {
if !self.running()? {
return Ok(false);
}
self.ctl()?
.arg("stop")
.arg("-s")
.arg("-w")
.arg("-m")
.arg("fast")
.output()?;
Ok(true)
}
/// Stops the cluster if necessary and removes its data directory.
///
/// Returns `Ok(true)` when the directory was removed and `Ok(false)` when
/// there was nothing to remove.
///
/// # Errors
///
/// An `EAGAIN` Unix error from the underlying operation is reported as
/// `ClusterError::InUse`; every other error is passed through.
pub fn destroy(&self) -> Result<bool, ClusterError> {
    self._destroy().map_err(|err| match err {
        ClusterError::UnixError(Errno::EAGAIN) => ClusterError::InUse,
        other => other,
    })
}
/// Stops the server, then deletes the data directory recursively.
///
/// The directory is removed when the server had to be stopped by this call
/// or when the directory exists; in that case `Ok(true)` is returned.
/// Otherwise nothing is done and the result is `Ok(false)`.
///
/// # Errors
///
/// Anything `_stop` fails with, and I/O errors from removing the directory.
fn _destroy(&self) -> Result<bool, ClusterError> {
    let stopped_now = self._stop()?;
    // The directory is only probed when the server was not stopped by us.
    if !stopped_now && !self.datadir.is_dir() {
        return Ok(false);
    }
    fs::remove_dir_all(&self.datadir)?;
    Ok(true)
}
}
/// A `Cluster` can stand in for the path of its data directory, which is
/// what allows it to be passed to `exists` and `version`.
impl AsRef<Path> for Cluster {
    fn as_ref(&self) -> &Path {
        self.datadir.as_path()
    }
}
/// Tells whether `datadir` looks like an existing cluster.
///
/// That is the case when `datadir` is a directory and directly contains a
/// regular file named `PG_VERSION`. The file's contents are not inspected.
pub fn exists<P: AsRef<Path>>(datadir: P) -> bool {
    let root = datadir.as_ref();
    if !root.is_dir() {
        return false;
    }
    root.join("PG_VERSION").is_file()
}
pub fn version<P: AsRef<Path>>(
datadir: P,
) -> Result<Option<version::PartialVersion>, ClusterError> {
let version_file = datadir.as_ref().join("PG_VERSION");
match std::fs::read_to_string(version_file) {
Ok(version) => Ok(Some(version.parse()?)),
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err)?,
}
}