pub mod backup;
pub mod config;
pub mod resource;
mod error;
use std::ffi::{OsStr, OsString};
use std::io::{self, Read, Write};
use std::os::unix::prelude::{OsStrExt, OsStringExt};
use std::path::{Path, PathBuf};
use std::process::{Command, ExitStatus, Output};
use std::{fmt, fs};
use postgres;
use shell_quote::{QuoteExt, Sh};
pub use sqlx;
use crate::runtime::{
strategy::{Strategy, StrategyLike},
Runtime,
};
use crate::{
coordinate::{
self,
State::{self, *},
},
version,
};
pub use error::ClusterError;
/// Name of the `template0` database.
pub static DATABASE_TEMPLATE0: &str = "template0";
/// Name of the `template1` database.
pub static DATABASE_TEMPLATE1: &str = "template1";
/// Name of the `postgres` database; used whenever no database is specified.
pub static DATABASE_POSTGRES: &str = "postgres";

/// The state of a cluster as determined from `pg_ctl status`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ClusterStatus {
    /// The server is running.
    Running,
    /// The cluster exists but the server is not running.
    Stopped,
    /// There is no cluster: the data directory is absent or has no `PG_VERSION` file.
    Missing,
}

impl fmt::Display for ClusterStatus {
    /// Renders the status as a lowercase word, e.g. `running`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Running => "running",
            Self::Stopped => "stopped",
            Self::Missing => "missing",
        };
        f.write_str(label)
    }
}
#[derive(Debug)]
pub struct Cluster {
pub datadir: PathBuf,
pub strategy: Strategy,
}
impl Cluster {
pub fn new<P: AsRef<Path>, S: Into<Strategy>>(
datadir: P,
strategy: S,
) -> Result<Self, ClusterError> {
Ok(Self {
datadir: datadir.as_ref().to_owned(),
strategy: strategy.into(),
})
}
fn runtime(&self) -> Result<Runtime, ClusterError> {
match version(self)? {
None => self
.strategy
.fallback()
.ok_or_else(|| ClusterError::RuntimeDefaultNotFound),
Some(version) => self
.strategy
.select(&version.into())
.ok_or_else(|| ClusterError::RuntimeNotFound(version)),
}
}
fn ctl(&self) -> Result<Command, ClusterError> {
let mut command = self.runtime()?.execute("pg_ctl");
command.env("PGDATA", &self.datadir);
command.env("PGHOST", &self.datadir);
Ok(command)
}
pub fn running(&self) -> Result<bool, ClusterError> {
self.status().map(|status| status == ClusterStatus::Running)
}
pub fn status(&self) -> Result<ClusterStatus, ClusterError> {
let output = self.ctl()?.arg("status").output()?;
let code = match output.status.code() {
None => return Err(ClusterError::CommandError(output)),
Some(0) => return Ok(ClusterStatus::Running),
Some(code) => code,
};
let runtime = self.runtime()?;
let status = match runtime.version {
version::Version::Post10(_major, _minor) => {
match code {
3 => Some(ClusterStatus::Stopped),
4 if !exists(self) => Some(ClusterStatus::Missing),
_ => None,
}
}
version::Version::Pre10(9, point, _minor) => {
if point >= 4 {
match code {
3 => Some(ClusterStatus::Stopped),
4 if !exists(self) => Some(ClusterStatus::Missing),
_ => None,
}
}
else if point >= 2 {
match code {
3 if !exists(self) => Some(ClusterStatus::Missing),
3 => Some(ClusterStatus::Stopped),
_ => None,
}
}
else {
match code {
1 if !exists(self) => Some(ClusterStatus::Missing),
1 => Some(ClusterStatus::Stopped),
_ => None,
}
}
}
version::Version::Pre10(_major, _point, _minor) => None,
};
match status {
Some(running) => Ok(running),
None => Err(ClusterError::UnsupportedVersion(runtime.version)),
}
}
pub fn pidfile(&self) -> PathBuf {
self.datadir.join("postmaster.pid")
}
pub fn logfile(&self) -> PathBuf {
self.datadir.join("postmaster.log")
}
pub fn create(&self) -> Result<State, ClusterError> {
if exists(self) {
Ok(Unmodified)
} else {
fs::create_dir_all(&self.datadir)?;
let mut command = self.ctl()?;
#[allow(clippy::suspicious_command_arg_space)]
command
.arg("init")
.arg("-s")
.arg("-o")
.arg("-E utf8 --locale C -A trust")
.env("TZ", "UTC");
bugs::retry_pg_ctl(&mut command, |_| Ok(()))
}
}
pub fn start(
&self,
options: &[(config::Parameter, config::Value)],
) -> Result<State, ClusterError> {
self.create()?;
if self.running()? {
return Ok(Unmodified);
}
let options = {
let mut arg: Vec<u8> = b"-h '' -k ".into();
arg.push_quoted(Sh, &self.datadir);
for (parameter, value) in options {
arg.extend(b" -c ");
arg.push_quoted(Sh, &format!("{parameter}={value}"));
}
OsString::from_vec(arg)
};
let logfile = self.logfile();
let logfile_name = logfile
.file_name()
.unwrap_or_else(|| "<unknown.log>".as_ref())
.display();
let mut log: logfile::LogFile = logfile.as_path().try_into()?;
let mut command = self.ctl()?;
command
.arg("start")
.arg("-l")
.arg(&logfile)
.arg("-s")
.arg("-w")
.arg("-o")
.arg(options);
let append_logs_to_stderr = |output: &mut Output| {
writeln!(&mut output.stderr)?;
writeln!(&mut output.stderr, "-- {logfile_name} --")?;
log.read_to_end(&mut output.stderr)?;
Ok(())
};
bugs::retry_pg_ctl(&mut command, append_logs_to_stderr)
}
fn connect(&self, database: Option<&str>) -> Result<postgres::Client, ClusterError> {
let user = crate::util::current_user()?;
let host = self.datadir.to_string_lossy(); let client = postgres::Client::configure()
.host(&host)
.dbname(database.unwrap_or(DATABASE_POSTGRES))
.user(&user)
.connect(postgres::NoTls)?;
Ok(client)
}
pub fn pool(&self, database: Option<&str>) -> Result<sqlx::PgPool, ClusterError> {
Ok(sqlx::PgPool::connect_lazy_with(
sqlx::postgres::PgConnectOptions::new()
.socket(&self.datadir)
.database(database.unwrap_or(DATABASE_POSTGRES))
.username(&crate::util::current_user()?)
.application_name("pgdo"),
))
}
fn url(&self, database: &str) -> Result<Option<url::Url>, url::ParseError> {
match self.datadir.to_str() {
Some(datadir) => url::Url::parse_with_params(
"postgresql://",
[("host", datadir), ("dbname", database)],
)
.map(Some),
None => Ok(None),
}
}
pub fn shell(&self, database: Option<&str>) -> Result<ExitStatus, ClusterError> {
let mut command = self.runtime()?.execute("psql");
self.set_env(command.arg("--quiet"), database)?;
Ok(command.spawn()?.wait()?)
}
pub fn exec<T: AsRef<OsStr>>(
&self,
database: Option<&str>,
command: T,
args: &[T],
) -> Result<ExitStatus, ClusterError> {
let mut command = self.runtime()?.command(command);
self.set_env(command.args(args), database)?;
Ok(command.spawn()?.wait()?)
}
fn set_env(&self, command: &mut Command, database: Option<&str>) -> Result<(), ClusterError> {
let database = database.unwrap_or(DATABASE_POSTGRES);
command.env("PGDATA", &self.datadir);
command.env("PGHOST", &self.datadir);
command.env("PGDATABASE", database);
match self.url(database)? {
Some(url) => command.env("DATABASE_URL", url.as_str()),
None => command.env_remove("DATABASE_URL"),
};
Ok(())
}
pub fn databases(&self) -> Result<Vec<String>, ClusterError> {
let mut conn = self.connect(None)?;
let rows = conn.query(
"SELECT datname FROM pg_catalog.pg_database ORDER BY datname",
&[],
)?;
let datnames: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
Ok(datnames)
}
pub fn createdb(&self, database: &str) -> Result<State, ClusterError> {
use postgres::error::SqlState;
let statement = format!(
"CREATE DATABASE {}",
postgres_protocol::escape::escape_identifier(database)
);
match self.connect(None)?.execute(statement.as_str(), &[]) {
Err(err) if err.code() == Some(&SqlState::DUPLICATE_DATABASE) => Ok(Unmodified),
Err(err) => Err(err)?,
Ok(_) => Ok(Modified),
}
}
pub fn dropdb(&self, database: &str) -> Result<State, ClusterError> {
use postgres::error::SqlState;
let statement = format!(
"DROP DATABASE {}",
postgres_protocol::escape::escape_identifier(database)
);
match self.connect(None)?.execute(statement.as_str(), &[]) {
Err(err) if err.code() == Some(&SqlState::UNDEFINED_DATABASE) => Ok(Unmodified),
Err(err) => Err(err)?,
Ok(_) => Ok(Modified),
}
}
pub fn stop(&self) -> Result<State, ClusterError> {
if !self.running()? {
return Ok(Unmodified);
}
let output = self
.ctl()?
.arg("stop")
.arg("-s")
.arg("-w")
.arg("-m")
.arg("fast")
.output()?;
if output.status.success() {
Ok(Modified) } else {
Err(ClusterError::CommandError(output))
}
}
pub fn destroy(&self) -> Result<State, ClusterError> {
self.stop()?;
match fs::remove_dir_all(&self.datadir) {
Ok(()) => Ok(Modified),
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(Unmodified),
Err(err) => Err(err)?,
}
}
}
/// Lets a [`Cluster`] stand in for its data directory wherever a path is
/// expected, e.g. `exists(&cluster)` or `version(&cluster)`.
impl AsRef<Path> for Cluster {
    fn as_ref(&self) -> &Path {
        self.datadir.as_path()
    }
}
/// Returns `true` if `datadir` looks like an initialised cluster.
///
/// That means `datadir` is a directory and contains a regular file named
/// `PG_VERSION`.
pub fn exists<P: AsRef<Path>>(datadir: P) -> bool {
    let dir = datadir.as_ref();
    if !dir.is_dir() {
        return false;
    }
    dir.join("PG_VERSION").is_file()
}
pub fn version<P: AsRef<Path>>(
datadir: P,
) -> Result<Option<version::PartialVersion>, ClusterError> {
let version_file = datadir.as_ref().join("PG_VERSION");
match std::fs::read_to_string(version_file) {
Ok(version) => Ok(Some(version.parse()?)),
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
Err(err) => Err(err)?,
}
}
/// Finds the names of superuser roles that can log in.
///
/// Runs `postgres --single -D <datadir> postgres` and feeds it a query on
/// `pg_roles` through stdin. It then collects every `rolname = "…"` value
/// printed to stdout. Single-user mode is used, so presumably the server must
/// not be running; if it is, the command fails and `CommandError` results.
///
/// # Errors
///
/// - `ClusterError::CommandError`, with the command's full output, if no
///   superuser names were found. This is preferred over a stdin write
///   error, because the output explains what went wrong.
/// - An I/O error if the process cannot be spawned or awaited, or the query
///   could not be written even though names were found.
///
/// # Panics
///
/// Re-raises a panic from the thread that writes the query, with its
/// original payload.
pub fn determine_superuser_role_names(
    cluster: &Cluster,
) -> Result<std::collections::HashSet<String>, ClusterError> {
    use regex::Regex;
    use std::io::Write;
    use std::panic::resume_unwind;
    use std::process::Stdio;
    use std::sync::LazyLock;

    static QUERY: &[u8] = b"select rolname from pg_roles where rolsuper and rolcanlogin\n";
    static RE: LazyLock<Regex> = LazyLock::new(|| {
        Regex::new(r#"\brolname\s*=\s*"(.+)""#)
            .expect("invalid regex (for matching single-user role names)")
    });

    let mut child = cluster
        .runtime()?
        .execute("postgres")
        .arg("--single")
        .arg("-D")
        .arg(&cluster.datadir)
        .arg("postgres")
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    // Write the query on its own thread while stdout/stderr are collected, so
    // no pipe can fill up and block either side. The thread drops `stdin`
    // when it finishes, closing the pipe (EOF) for the child.
    let mut stdin = child.stdin.take().expect("could not take stdin");
    let writer = std::thread::spawn(move || stdin.write_all(QUERY));
    let output = child.wait_with_output()?;

    // `resume_unwind` re-raises the writer's original panic payload;
    // `panic_any` would have boxed it again and lost the message.
    let written = writer
        .join()
        .unwrap_or_else(|payload| resume_unwind(payload));

    let superusers: std::collections::HashSet<String> = RE
        .captures_iter(&String::from_utf8_lossy(&output.stdout))
        .filter_map(|capture| capture.get(1))
        .map(|m| m.as_str().to_owned())
        .collect();

    if superusers.is_empty() {
        // If `postgres` exited early, the write may have failed with a broken
        // pipe. The command's stderr is the useful diagnostic, so report it.
        return Err(ClusterError::CommandError(output));
    }
    written?;
    Ok(superusers)
}
pub type Options<'a> = &'a [(config::Parameter<'a>, config::Value)];
impl coordinate::Subject for Cluster {
type Error = ClusterError;
type Options<'a> = Options<'a>;
fn start(&self, options: Self::Options<'_>) -> Result<State, Self::Error> {
self.start(options)
}
fn stop(&self) -> Result<State, Self::Error> {
self.stop()
}
fn destroy(&self) -> Result<State, Self::Error> {
self.destroy()
}
fn exists(&self) -> Result<bool, Self::Error> {
Ok(exists(self))
}
fn running(&self) -> Result<bool, Self::Error> {
self.running()
}
}
#[allow(clippy::unreadable_literal)]
const UUID_NS: uuid::Uuid = uuid::Uuid::from_u128(93875103436633470414348750305797058811);
pub type ClusterGuard = coordinate::guard::Guard<Cluster>;
pub fn run<P: AsRef<Path>>(
path: P,
options: Options<'_>,
) -> Result<ClusterGuard, coordinate::CoordinateError<ClusterError>> {
let path = path.as_ref();
fs::create_dir_all(path)?;
let path = path.canonicalize()?;
let strategy = crate::runtime::strategy::Strategy::default();
let cluster = crate::cluster::Cluster::new(&path, strategy)?;
let lock_name = path.as_os_str().as_bytes();
let lock_uuid = uuid::Uuid::new_v5(&UUID_NS, lock_name);
let lock = crate::lock::UnlockedFile::try_from(&lock_uuid)?;
ClusterGuard::startup(lock, cluster, options)
}
mod logfile {
    use std::fs::File;
    use std::io::{self, ErrorKind::NotFound, Read, Seek};
    use std::path::{Path, PathBuf};

    /// A reader over a log file that yields only content written after the
    /// reader was created.
    ///
    /// If the file exists at creation, reading starts from its end at that
    /// moment. If it does not exist yet, it is opened on the first read that
    /// finds it, and read from the beginning. Until then, reads return 0 bytes.
    pub struct LogFile {
        path: PathBuf,
        file: Option<File>,
    }

    impl LogFile {
        /// Opens `path` for reading, returning `Ok(None)` if it does not exist.
        fn open_if_present(path: &Path) -> io::Result<Option<File>> {
            match File::open(path) {
                Ok(file) => Ok(Some(file)),
                Err(err) if err.kind() == NotFound => Ok(None),
                Err(err) => Err(err),
            }
        }
    }

    impl TryFrom<&Path> for LogFile {
        type Error = io::Error;

        fn try_from(path: &Path) -> Result<Self, Self::Error> {
            let mut file = Self::open_if_present(path)?;
            // Skip anything already in the file; only new output is wanted.
            if let Some(handle) = file.as_mut() {
                handle.seek(io::SeekFrom::End(0))?;
            }
            Ok(LogFile {
                path: path.to_path_buf(),
                file,
            })
        }
    }

    impl Read for LogFile {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            if self.file.is_none() {
                self.file = Self::open_if_present(&self.path)?;
            }
            match self.file.as_mut() {
                Some(file) => file.read(buf),
                None => Ok(0),
            }
        }
    }
}
mod bugs {
use super::{ClusterError, State, State::Modified};
use regex::bytes::Regex;
use std::process::{Command, Output};
use std::{fmt::Write, sync, time::Duration};
pub fn retry_pg_ctl(
command: &mut Command,
mut supplement: impl FnMut(&mut Output) -> Result<(), ClusterError>,
) -> Result<State, ClusterError> {
static SEMGET_BUG_LOG: sync::Once = sync::Once::new();
static SEMGET_BUG_RE1: sync::LazyLock<Regex> = sync::LazyLock::new(|| {
Regex::new(r"\bFATAL:\s+could not create semaphores: Invalid argument\b")
.expect("invalid regex (for matching semaphore errors #1)")
});
static SEMGET_BUG_RE2: sync::LazyLock<Regex> = sync::LazyLock::new(|| {
Regex::new(r"\bDETAIL:\s+Failed system call was semget\b")
.expect("invalid regex (for matching semaphore errors #2)")
});
fn is_retryable(logs: &[u8]) -> bool {
SEMGET_BUG_RE1.is_match(logs) && SEMGET_BUG_RE2.is_match(logs)
}
let command_summary = command.get_args().take(1).fold(
command.get_program().display().to_string(),
|mut summary, arg| {
write!(&mut summary, " {}", arg.display()).ok();
summary
},
);
let run = || {
use backoff::Error;
use ClusterError::{CommandError, IoError};
match command.output() {
Ok(output) if output.status.success() => Ok(Modified),
Ok(mut output) => {
supplement(&mut output)?;
if is_retryable(output.stderr.as_slice()) {
Err(Error::transient(CommandError(output)))
} else {
Err(Error::permanent(CommandError(output)))
}
}
Err(err) => Err(Error::permanent(IoError(err))),
}
};
let notify = move |_, delay: Duration| {
SEMGET_BUG_LOG.call_once(|| {
log::info!(concat!(
"In all presently released versions of PostgreSQL, `pg_ctl` can fail on ",
"some platforms (macOS, some BSDs) due to a semget(2) failure. This is a ",
"particular problem in PostgreSQL 17.x (the linked bug report has more ",
"information). As an imperfect workaround, `pgdo` retries the command a ",
"few times when it detects this specific error. Original bug report: ",
"https://www.postgresql.org/message-id/CALL7chmzY3eXHA7zHnODUVGZLSvK3wYCSP0RmcDFHJY8f28Q3g@mail.gmail.com.",
));
});
log::warn!("`{command_summary}` failed; retrying in {delay:?}…");
};
let state = backoff::retry_notify(
backoff::ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(200))
.with_max_elapsed_time(Some(Duration::from_mins(1)))
.with_max_interval(Duration::from_secs(10))
.build(),
run,
notify,
)
.map_err(|err| match err {
backoff::Error::Permanent(err) => err,
backoff::Error::Transient { err, .. } => err,
})?;
Ok(state)
}
}