use std::{
ffi::OsStr,
io,
path::{Path, PathBuf},
process::ExitStatus,
};
use either::{Left, Right};
use tempfile::TempDir;
use tokio::{fs, task::block_in_place};
use tokio_stream::{wrappers::ReadDirStream, StreamExt};
use super::{config, resource::StartupResource};
use crate::{cluster, coordinate, lock};
/// Locations of an on-disk backup repository: a root directory for base
/// backups plus a subdirectory for archived WAL segments.
#[derive(Debug)]
pub struct Backup {
    // Root directory; base backups land here as numbered `data.NNNNNNNNNN`
    // directories (see `do_base_backup`).
    pub backup_dir: PathBuf,
    // The `wal` subdirectory of `backup_dir`, receiving archived WAL.
    pub backup_wal_dir: PathBuf,
}
impl Backup {
    /// Create (if necessary) the backup directory and its `wal`
    /// subdirectory, returning a [`Backup`] holding canonical (absolute,
    /// symlink-resolved) paths to both.
    ///
    /// # Errors
    ///
    /// Returns [`BackupError::IoError`] if any directory cannot be created
    /// or canonicalized.
    pub async fn prepare<D: AsRef<Path>>(backup_dir: D) -> Result<Self, BackupError> {
        fs::create_dir_all(&backup_dir).await?;
        // Use tokio's async canonicalize; `Path::canonicalize` performs
        // blocking filesystem I/O and must not run directly on the async
        // executor.
        let backup_dir = fs::canonicalize(backup_dir.as_ref()).await?;
        let backup_wal_dir = backup_dir.join("wal");
        fs::create_dir_all(&backup_wal_dir).await?;
        Ok(Self { backup_dir, backup_wal_dir })
    }

    /// Ensure the cluster's configuration supports continuous WAL
    /// archiving, adjusting settings where that can be done safely.
    ///
    /// Returns `Ok(true)` when a setting was changed that requires a
    /// cluster restart to take effect (`wal_level` or `archive_mode`);
    /// `archive_command` alone does not set the restart flag.
    ///
    /// # Errors
    ///
    /// Returns [`BackupError::ConfigError`] when a required setting is
    /// unsupported by the server, or is already set to a conflicting value
    /// that this code refuses to override (`archive_library`, or a foreign
    /// `archive_command`).
    pub async fn do_configure_archiving<'a>(
        &self,
        resource: &'a StartupResource<'a>,
        archive_command: &str,
    ) -> Result<bool, BackupError> {
        // Both `Either` variants expose the same facet/pool API; match to
        // reach whichever side we were given.
        let pool = match resource {
            Left(resource) => resource.facet().pool(None),
            Right(resource) => resource.facet().pool(None),
        }?;
        let mut restart: bool = false;
        // `wal_level` must be `replica` or `logical` for archiving to work.
        match WAL_LEVEL.get(&pool).await? {
            Some(config::Value::String(level)) if level == "replica" || level == "logical" => {
                log::debug!("{WAL_LEVEL:?} already set to {level:?}");
            }
            Some(_) => {
                log::info!("Setting {WAL_LEVEL:?} to 'replica'");
                WAL_LEVEL.set(&pool, "replica").await?;
                restart = true;
            }
            None => {
                return Err(BackupError::ConfigError(
                    "WAL is not supported; cannot proceed".into(),
                ))
            }
        }
        // `archive_mode` must be `on` or `always`.
        match ARCHIVE_MODE.get(&pool).await? {
            Some(config::Value::String(level)) if level == "on" || level == "always" => {
                log::debug!("{ARCHIVE_MODE:?} already set to {level:?}");
            }
            Some(_) => {
                log::info!("Setting {ARCHIVE_MODE:?} to 'on'");
                ARCHIVE_MODE.set(&pool, "on").await?;
                restart = true;
            }
            None => {
                return Err(BackupError::ConfigError(
                    "Archiving is not supported; cannot proceed".into(),
                ))
            }
        }
        // `archive_library` takes precedence over `archive_command`, so it
        // must be unset (or unsupported) for our command to be used.
        match ARCHIVE_LIBRARY.get(&pool).await? {
            Some(config::Value::String(library)) if library.is_empty() => {
                log::debug!("{ARCHIVE_LIBRARY:?} not set (good for us)");
            }
            Some(archive_library) => {
                return Err(BackupError::ConfigError(format!(
                    "{ARCHIVE_LIBRARY:?} is already set to {archive_library:?}; cannot proceed"
                )))
            }
            None => {
                log::debug!("{ARCHIVE_LIBRARY:?} is not supported (good for us)");
            }
        }
        // Install our `archive_command`, but only over an empty or disabled
        // value — never clobber someone else's archiving setup.
        match ARCHIVE_COMMAND.get(&pool).await? {
            Some(config::Value::String(command)) if command == archive_command => {
                log::debug!("{ARCHIVE_COMMAND:?} already set to {archive_command:?}");
            }
            Some(config::Value::String(command))
                if command.is_empty() || command == "(disabled)" =>
            {
                log::info!("Setting {ARCHIVE_COMMAND:?} to {archive_command:?}");
                ARCHIVE_COMMAND.set(&pool, archive_command).await?;
            }
            Some(archive_command) => {
                return Err(BackupError::ConfigError(format!(
                    "{ARCHIVE_COMMAND:?} is already set to {archive_command:?}; cannot proceed"
                )))
            }
            None => {
                return Err(BackupError::ConfigError(
                    "Archiving is not supported; cannot proceed".into(),
                ))
            }
        }
        Ok(restart)
    }

    /// Take a base backup with `pg_basebackup` into a fresh, numbered
    /// `data.NNNNNNNNNN` directory under `backup_dir`, returning its path.
    ///
    /// The backup is first written into a hidden temporary directory and
    /// only renamed into place once complete, so a partially written backup
    /// never appears under [`BACKUP_DATA_PREFIX`].
    ///
    /// # Errors
    ///
    /// Returns [`BackupError::CommandError`] when `pg_basebackup` exits
    /// with a non-success status, plus I/O and lock errors from the
    /// filesystem work.
    pub async fn do_base_backup<'a>(
        &self,
        resource: &'a StartupResource<'a>,
    ) -> Result<PathBuf, BackupError> {
        // Temporary directory creation is blocking filesystem work.
        let backup_tmp_dir =
            block_in_place(|| TempDir::with_prefix_in(BACKUP_DATA_PREFIX_TMP, &self.backup_dir))?;
        let args: &[&OsStr] = &[
            "--pgdata".as_ref(),
            backup_tmp_dir.path().as_ref(),
            "--format".as_ref(),
            "plain".as_ref(),
            "--progress".as_ref(),
        ];
        // `exec` blocks until `pg_basebackup` finishes; keep it off the
        // async executor.
        let status = block_in_place(|| match resource {
            Left(resource) => resource.facet().exec(None, "pg_basebackup".as_ref(), args),
            Right(resource) => resource.facet().exec(None, "pg_basebackup".as_ref(), args),
        })?;
        if !status.success() {
            return Err(status.into());
        }
        // Hold an exclusive lock while choosing the next backup number and
        // renaming the new backup into place, so concurrent backups cannot
        // race for the same name.
        let backup_lock = block_in_place(|| {
            lock::UnlockedFile::try_from(&self.backup_dir.join(BACKUP_LOCK_NAME))?
                .lock_exclusive()
                .map_err(coordinate::CoordinateError::UnixError)
        })?;
        // Scan existing `data.*` directories for the highest number, then
        // use the next one (zero-padded to 10 digits).
        let backup_data_dir = self.backup_dir.join(format!(
            "{BACKUP_DATA_PREFIX}{:010}",
            ReadDirStream::new(fs::read_dir(&self.backup_dir).await?)
                .filter_map(Result::ok)
                .filter_map(|entry| match entry.file_name().to_str() {
                    Some(name) if name.starts_with(BACKUP_DATA_PREFIX) =>
                        name[BACKUP_DATA_PREFIX.len()..].parse::<u32>().ok(),
                    Some(_) | None => None,
                })
                .fold(0, Ord::max)
                .await
                + 1
        ));
        fs::rename(&backup_tmp_dir, &backup_data_dir).await?;
        // Dropping the guard releases the lock; the rename has already
        // moved the temporary directory out from under `backup_tmp_dir`.
        drop(backup_lock);
        Ok(backup_data_dir)
    }
}
// PostgreSQL configuration parameters manipulated by `do_configure_archiving`.
static ARCHIVE_MODE: config::Parameter = config::Parameter("archive_mode");
static ARCHIVE_COMMAND: config::Parameter = config::Parameter("archive_command");
static ARCHIVE_LIBRARY: config::Parameter = config::Parameter("archive_library");
static WAL_LEVEL: config::Parameter = config::Parameter("wal_level");
/// Prefix for completed base backup directories (`data.NNNNNNNNNN`).
pub static BACKUP_DATA_PREFIX: &str = "data.";
// Prefix for hidden, in-progress backup directories; renamed to
// `BACKUP_DATA_PREFIX` form only once `pg_basebackup` succeeds.
static BACKUP_DATA_PREFIX_TMP: &str = ".tmp.data.";
// Lock file name (inside `backup_dir`) serializing backup-number selection.
static BACKUP_LOCK_NAME: &str = ".lock";
/// Errors arising while preparing, configuring, or taking backups.
#[derive(thiserror::Error, miette::Diagnostic, Debug)]
pub enum BackupError {
    /// Filesystem or other I/O failure.
    #[error("Input/output error")]
    IoError(#[from] io::Error),
    /// Generic shell-level failure, with a human-readable message.
    #[error("Shell error: {0}")]
    GeneralError(String),
    /// The cluster's configuration is unsupported or conflicts with
    /// archiving requirements.
    #[error("Configuration error: {0}")]
    ConfigError(String),
    /// Failure coordinating with/locking against other processes.
    #[error(transparent)]
    CoordinateError(#[from] coordinate::CoordinateError<cluster::ClusterError>),
    /// Failure from cluster management operations.
    #[error(transparent)]
    ClusterError(#[from] cluster::ClusterError),
    /// An external command (e.g. `pg_basebackup`) exited unsuccessfully.
    #[error("External command failed: {0:?}")]
    CommandError(ExitStatus),
    /// Error from the database driver.
    #[error("Database error")]
    SqlxError(#[from] cluster::sqlx::Error),
}
impl From<ExitStatus> for BackupError {
fn from(error: ExitStatus) -> BackupError {
Self::CommandError(error)
}
}