use std::path::{Path, PathBuf};
use std::process::{Command, ExitStatus, Output};
use std::{env, error, fmt, fs, io};
use lock::LockDo;
use postgres;
use runtime;
use shell_escape::escape;
/// Errors that can arise while inspecting or controlling a cluster.
#[derive(Debug)]
pub enum ClusterError {
    /// A path could not be represented as UTF-8.
    PathEncodingError,
    /// An input/output operation failed.
    IoError(io::Error),
    /// A UNIX-level operation (reported through `nix`) failed.
    UnixError(nix::Error),
    /// The PostgreSQL version in use is not supported.
    UnsupportedVersion(runtime::Version),
    /// The PostgreSQL version could not be determined.
    UnknownVersion(runtime::VersionError),
    /// The database reported an error.
    DatabaseError(postgres::error::Error),
    /// An external command failed; carries everything it produced.
    Other(Output),
}
impl fmt::Display for ClusterError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use ClusterError::*;
match *self {
PathEncodingError => write!(fmt, "path is not UTF-8"),
IoError(ref e) => write!(fmt, "input/output error: {}", e),
UnixError(ref e) => write!(fmt, "UNIX error: {}", e),
UnsupportedVersion(ref e) => write!(fmt, "PostgreSQL version not supported: {}", e),
UnknownVersion(ref e) => write!(fmt, "PostgreSQL version not known: {}", e),
DatabaseError(ref e) => write!(fmt, "database error: {}", e),
Other(ref e) => write!(fmt, "external command failed: {:?}", e),
}
}
}
impl error::Error for ClusterError {
fn cause(&self) -> Option<&dyn error::Error> {
match *self {
ClusterError::PathEncodingError => None,
ClusterError::IoError(ref error) => Some(error),
ClusterError::UnixError(ref error) => Some(error),
ClusterError::UnsupportedVersion(_) => None,
ClusterError::UnknownVersion(ref error) => Some(error),
ClusterError::DatabaseError(ref error) => Some(error),
ClusterError::Other(_) => None,
}
}
}
impl From<io::Error> for ClusterError {
fn from(error: io::Error) -> ClusterError {
ClusterError::IoError(error)
}
}
impl From<nix::Error> for ClusterError {
fn from(error: nix::Error) -> ClusterError {
ClusterError::UnixError(error)
}
}
/// Wraps a version-detection error as [`ClusterError::UnknownVersion`].
impl From<runtime::VersionError> for ClusterError {
    fn from(err: runtime::VersionError) -> Self {
        Self::UnknownVersion(err)
    }
}
impl From<postgres::error::Error> for ClusterError {
fn from(error: postgres::error::Error) -> ClusterError {
ClusterError::DatabaseError(error)
}
}
/// A PostgreSQL cluster identified by its data directory.
pub struct Cluster {
    /// The cluster's data directory; also exported as `PGDATA` and `PGHOST`
    /// to the commands this type runs.
    datadir: PathBuf,
    /// File opened and locked exclusively around create/start/stop/destroy.
    lockfile: PathBuf,
    /// Used to launch the PostgreSQL executables (`pg_ctl`, `psql`) and to
    /// discover the PostgreSQL version.
    runtime: runtime::Runtime,
}
impl Cluster {
pub fn new<P: AsRef<Path>>(datadir: P, runtime: runtime::Runtime) -> Self {
let datadir = datadir.as_ref();
Self {
datadir: datadir.to_path_buf(),
lockfile: datadir
.parent()
.unwrap_or(datadir)
.join(".cluster.lock")
.to_path_buf(),
runtime: runtime,
}
}
fn ctl(&self) -> Command {
let mut command = self.runtime.execute("pg_ctl");
command.env("PGDATA", &self.datadir);
command.env("PGHOST", &self.datadir);
command
}
pub fn exists(&self) -> bool {
self.datadir.is_dir() && self.datadir.join("PG_VERSION").is_file()
}
pub fn running(&self) -> Result<bool, ClusterError> {
let output = self.ctl().arg("status").output()?;
let code = match output.status.code() {
None => return Err(ClusterError::Other(output)),
Some(code) if code == 0 => return Ok(true),
Some(code) => code,
};
let version = self.runtime.version()?;
let running = match (version.major >= 10, version.major) {
(true, _) => {
match code {
3 => Some(false),
4 if !self.exists() => Some(false),
_ => None,
}
}
(false, 9) => {
if version.minor >= 4 {
match code {
3 => Some(false),
4 if !self.exists() => Some(false),
_ => None,
}
}
else if version.minor >= 2 {
match code {
3 => Some(false),
_ => None,
}
}
else {
match code {
1 => Some(false),
_ => None,
}
}
}
(_, _) => None,
};
match running {
Some(running) => Ok(running),
None => Err(ClusterError::UnsupportedVersion(version)),
}
}
pub fn pidfile(&self) -> PathBuf {
self.datadir.join("postmaster.pid")
}
pub fn logfile(&self) -> PathBuf {
self.datadir.join("backend.log")
}
fn lock(&self) -> io::Result<fs::File> {
fs::OpenOptions::new()
.append(true)
.create(true)
.open(&self.lockfile)
}
pub fn create(&self) -> Result<bool, ClusterError> {
self.lock()?.do_exclusive(|| self._create())?
}
fn _create(&self) -> Result<bool, ClusterError> {
match self.datadir.join("PG_VERSION").is_file() {
true => Ok(false),
false => {
fs::create_dir_all(&self.datadir)?;
self.ctl()
.arg("init")
.arg("-s")
.arg("-o")
.arg("-E utf8 -A trust")
.output()?;
Ok(true)
}
}
}
pub fn start(&self) -> Result<bool, ClusterError> {
self.lock()?.do_exclusive(|| self._start())?
}
fn _start(&self) -> Result<bool, ClusterError> {
if self.running()? {
return Ok(false);
}
self._create()?;
let datadir = self
.datadir
.as_path()
.as_os_str()
.to_str()
.ok_or(ClusterError::PathEncodingError)?;
self.ctl()
.arg("start")
.arg("-l")
.arg(self.logfile())
.arg("-s")
.arg("-w")
.arg("-o")
.arg(format!("-h '' -F -k {}", escape(datadir.into())))
.output()?;
Ok(true)
}
pub fn connect(&self, database: &str) -> Result<postgres::Client, ClusterError> {
let user = &env::var("USER").unwrap_or("USER-not-set".to_string());
let host = self.datadir.to_string_lossy(); let client = postgres::Client::configure()
.user(user)
.dbname(database)
.host(&host)
.connect(postgres::NoTls)?;
Ok(client)
}
pub fn shell(&self, database: &str) -> Result<ExitStatus, ClusterError> {
let mut command = self.runtime.execute("psql");
command.arg("--quiet").arg("--").arg(database);
command.env("PGDATA", &self.datadir);
command.env("PGHOST", &self.datadir);
Ok(command.spawn()?.wait()?)
}
pub fn databases(&self) -> Result<Vec<String>, ClusterError> {
let mut conn = self.connect("template1")?;
let rows = conn.query("SELECT datname FROM pg_catalog.pg_database", &[])?;
let datnames: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
Ok(datnames)
}
pub fn createdb(&self, database: &str) -> Result<bool, ClusterError> {
let statement = format!("CREATE DATABASE {}", &database);
self.connect("template1")?
.execute(statement.as_str(), &[])?;
Ok(true)
}
pub fn dropdb(&self, database: &str) -> Result<bool, ClusterError> {
let statement = format!("DROP DATABASE {}", &database);
self.connect("template1")?
.execute(statement.as_str(), &[])?;
Ok(true)
}
pub fn stop(&self) -> Result<bool, ClusterError> {
self.lock()?.do_exclusive(|| self._stop())?
}
fn _stop(&self) -> Result<bool, ClusterError> {
if !self.running()? {
return Ok(false);
}
self.ctl()
.arg("stop")
.arg("-s")
.arg("-w")
.arg("-m")
.arg("fast")
.output()?;
Ok(true)
}
pub fn destroy(&self) -> Result<bool, ClusterError> {
self.lock()?.do_exclusive(|| self._destroy())?
}
fn _destroy(&self) -> Result<bool, ClusterError> {
if self._stop()? || self.datadir.is_dir() {
fs::remove_dir_all(&self.datadir)?;
Ok(true)
} else {
Ok(false)
}
}
}
#[cfg(test)]
mod tests {
    extern crate tempdir;
    use super::Cluster;
    use runtime::Runtime;
    use std::collections::HashSet;
    use std::fs::File;
    use std::path::{Path, PathBuf};

    // NOTE(review): the tests that call `running`, `create`, `start`, `stop`
    // or `destroy` launch `pg_ctl`, so they presumably need a PostgreSQL
    // installation that the runtime can find — confirm for the CI setup.

    /// A new cluster remembers its data directory and is not running.
    #[test]
    fn cluster_new() {
        let cluster = Cluster::new("some/path", Runtime { bindir: None });
        assert_eq!(Path::new("some/path"), cluster.datadir);
        assert!(!cluster.running().unwrap());
    }

    /// A data directory that is not on disk does not count as a cluster.
    #[test]
    fn cluster_does_not_exist() {
        let cluster = Cluster::new("some/path", Runtime { bindir: None });
        assert!(!cluster.exists());
    }

    /// A directory containing `PG_VERSION` counts as an existing cluster.
    #[test]
    fn cluster_does_exist() {
        let tmp = tempdir::TempDir::new("data").unwrap();
        File::create(tmp.path().join("PG_VERSION")).unwrap();
        let cluster = Cluster::new(&tmp, Runtime { bindir: None });
        assert!(cluster.exists());
    }

    /// The PID file lives directly inside the data directory.
    #[test]
    fn cluster_has_pid_file() {
        let cluster = Cluster::new("/some/where", Runtime { bindir: None });
        let expected = PathBuf::from("/some/where/postmaster.pid");
        assert_eq!(expected, cluster.pidfile());
    }

    /// The log file lives directly inside the data directory.
    #[test]
    fn cluster_has_log_file() {
        let cluster = Cluster::new("/some/where", Runtime { bindir: None });
        let expected = PathBuf::from("/some/where/backend.log");
        assert_eq!(expected, cluster.logfile());
    }

    /// `create` brings a cluster into existence and says that it did so.
    #[test]
    fn cluster_create_creates_cluster() {
        let tmp = tempdir::TempDir::new("data").unwrap();
        let cluster = Cluster::new(&tmp, Runtime::default());
        assert!(!cluster.exists());
        assert!(cluster.create().unwrap());
        assert!(cluster.exists());
    }

    /// A second `create` reports that there was nothing to do.
    #[test]
    fn cluster_create_does_nothing_when_it_already_exists() {
        let tmp = tempdir::TempDir::new("data").unwrap();
        let cluster = Cluster::new(&tmp, Runtime::default());
        assert!(!cluster.exists());
        assert!(cluster.create().unwrap());
        assert!(cluster.exists());
        assert!(!cluster.create().unwrap());
    }

    /// `start` and `stop` toggle what `running` reports.
    #[test]
    fn cluster_start_stop_starts_and_stops_cluster() {
        let tmp = tempdir::TempDir::new("data").unwrap();
        let cluster = Cluster::new(&tmp, Runtime::default());
        cluster.create().unwrap();
        assert!(!cluster.running().unwrap());
        cluster.start().unwrap();
        assert!(cluster.running().unwrap());
        cluster.stop().unwrap();
        assert!(!cluster.running().unwrap());
    }

    /// `destroy` leaves no cluster behind, even if it was running.
    #[test]
    fn cluster_destroy_stops_and_removes_cluster() {
        let tmp = tempdir::TempDir::new("data").unwrap();
        let cluster = Cluster::new(&tmp, Runtime::default());
        cluster.create().unwrap();
        cluster.start().unwrap();
        assert!(cluster.exists());
        cluster.destroy().unwrap();
        assert!(!cluster.exists());
    }

    /// A started cluster accepts connections to `template1`.
    #[test]
    fn cluster_connect_connects() {
        let tmp = tempdir::TempDir::new("data").unwrap();
        let cluster = Cluster::new(&tmp, Runtime::default());
        cluster.start().unwrap();
        cluster.connect("template1").unwrap();
        cluster.destroy().unwrap();
    }

    /// A freshly started cluster contains exactly the three stock databases.
    #[test]
    fn cluster_databases_returns_vec_of_database_names() {
        let tmp = tempdir::TempDir::new("data").unwrap();
        let cluster = Cluster::new(&tmp, Runtime::default());
        cluster.start().unwrap();
        let expected: HashSet<String> = ["postgres", "template0", "template1"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        let observed: HashSet<String> = cluster.databases().unwrap().into_iter().collect();
        assert_eq!(expected, observed);
        cluster.destroy().unwrap();
    }
}