use std::{
path::{Path, PathBuf},
process::ExitCode,
sync::{PoisonError, RwLock},
};
use either::{Left, Right};
use miette::IntoDiagnostic;
use shell_quote::{QuoteRefExt, Sh};
use super::ExitResult;
use crate::{args, runner};
use pgdo::{
cluster::{self, backup, resource, ClusterError},
coordinate::{cleanup::with_cleanup, finally::with_finally, State},
};
// Command-line arguments for the backup command.
// NOTE(review): plain `//` comments are used here on purpose; clap's derive
// picks up `///` doc comments as help text, which would change `--help` output.
#[derive(clap::Args)]
#[clap(next_help_heading = Some("Options for backup"))]
pub struct Backup {
// Cluster arguments, flattened into this command's own options; `invoke`
// uses `cluster.dir` to find and lock the cluster to back up.
#[clap(flatten)]
pub cluster: args::ClusterArgs,
// Directory to back up into; given on the command line as `--into BACKUP_DIR`.
#[clap(long = "into", value_name = "BACKUP_DIR", display_order = 100)]
pub backup_dir: PathBuf,
}
impl Backup {
    /// Run the backup command.
    ///
    /// Locks the cluster directory named by the arguments, builds a
    /// `Cluster` for it using the strategy chosen by
    /// `runner::determine_strategy`, and hands the resulting free resource to
    /// `backup` together with the destination directory.
    ///
    /// Returns `ExitCode::SUCCESS` when the backup completes; any error along
    /// the way is propagated to the caller.
    pub fn invoke(self) -> ExitResult {
        let (data_dir, dir_lock) = runner::lock_for(self.cluster.dir)?;
        let target_cluster = cluster::Cluster::new(data_dir, runner::determine_strategy(None)?)?;
        backup(
            resource::ResourceFree::new(dir_lock, target_cluster),
            self.backup_dir,
        )?;
        Ok(ExitCode::SUCCESS)
    }
}
impl From<Backup> for super::Command {
    /// Wrap parsed `Backup` arguments in the `Backup` variant of `Command`.
    fn from(parsed: Backup) -> Self {
        super::Command::Backup(parsed)
    }
}
// Command-line arguments for the backup tools command; the tool to run is
// selected by a subcommand (see `BackupTool`).
// NOTE(review): plain `//` comments are used here on purpose; clap's derive
// picks up `///` doc comments as help text, which would change `--help` output.
#[derive(clap::Args)]
#[clap(next_help_heading = Some("Options for backup:tools"))]
pub struct BackupTools {
// The selected tool, parsed as a clap subcommand.
#[clap(subcommand)]
command: BackupTool,
}
impl BackupTools {
    /// Run the selected backup tool and report its exit code.
    ///
    /// This never returns `Err`: the tool's own success or failure is carried
    /// in the returned `ExitCode`.
    pub fn invoke(self) -> ExitResult {
        let exit_code = match self.command {
            BackupTool::WalArchive { source, target } => copy_wal_archive(source, target),
        };
        Ok(exit_code)
    }
}
impl From<BackupTools> for super::Command {
    /// Wrap parsed `BackupTools` arguments in the `BackupTools` variant of `Command`.
    fn from(parsed: BackupTools) -> Self {
        super::Command::BackupTools(parsed)
    }
}
// Tools available beneath `BackupTools`.
// NOTE(review): plain `//` comments are used here on purpose; clap's derive
// picks up `///` doc comments as help text, which would change `--help` output.
#[derive(clap::Subcommand)]
pub(crate) enum BackupTool {
// `wal:archive SOURCE TARGET`: copy one WAL file from `source` to `target`;
// dispatched to `copy_wal_archive`. This is the tool named in the archive
// command string that `backup` builds.
#[clap(name = "wal:archive", display_order = 1)]
WalArchive {
// File to copy from.
source: PathBuf,
// File to copy to.
target: PathBuf,
},
}
/// Back up the cluster held by `resource` into `backup_dir`.
///
/// Steps, in order:
/// 1. Prepare the backup destination (`backup::Backup::prepare`).
/// 2. Start the cluster if necessary (`resource::startup_if_exists`).
/// 3. Configure WAL archiving so that it invokes this executable's
///    `backup:tools wal:archive` tool.
/// 4. Restart the cluster if the archiving configuration requires it. This is
///    only possible when the resource is held exclusively; otherwise an error
///    is returned asking for a manual restart.
/// 5. Perform the base backup.
/// 6. Release the resource.
///
/// # Errors
///
/// Any failure in the steps above is returned as a `miette` report.
///
/// # Panics
///
/// Panics with "Could not acquire resource" if the internal `RwLock` cannot be
/// read, which for `std::sync::RwLock` means it has been poisoned.
fn backup<D: AsRef<Path>>(resource: resource::ResourceFree, backup_dir: D) -> miette::Result<()> {
// Prepare the backup on a short-lived Tokio runtime; this function itself is
// synchronous.
let backup = {
let rt = tokio::runtime::Runtime::new().into_diagnostic()?;
rt.block_on(async { backup::Backup::prepare(&backup_dir).await })
.into_diagnostic()?
};
log::info!("Starting cluster (if not already started)…");
// `started` records whether this call changed the cluster's state; it decides
// below whether cleanup should shut the cluster down again.
let (started, resource) = resource::startup_if_exists(resource, &[])?;
// From here on `resource` is an `Either`: `Left` is a shared resource and
// `Right` an exclusive one (see the `either(...)` release call at the end).
let resource = RwLock::new(resource);
// Cleanup action handed to `with_cleanup` / `with_finally` below: stop the
// cluster again, but only if it was started here AND it is held exclusively.
let do_cleanup = || -> Result<State, ClusterError> {
match (started, resource.read().as_deref()) {
(State::Modified, Ok(Right(resource))) => {
log::info!("Shutting down cluster…");
resource.facet().stop()
}
// Started here, but only held as shared (or the lock is unreadable): it
// cannot be stopped from here, so warn instead of failing.
(State::Modified, Ok(Left(_)) | Err(_)) => {
log::warn!(concat!(
"Cluster was started for backup, but it cannot now be shut down; ",
"please shut it down manually."
));
Ok(State::Unmodified)
}
// The cluster was not started here, so there is nothing to undo.
(State::Unmodified, Ok(_)) => {
Ok(State::Unmodified)
}
(State::Unmodified, Err(_)) => {
Ok(State::Unmodified)
}
}
};
// Build the shell command used for WAL archiving: this executable (shell
// quoted), the `wal:archive` tool, then the source and target arguments.
// `%p` and `%f` are left in place for the consumer of the command to expand —
// presumably PostgreSQL's `archive_command` placeholders for the WAL file's
// path and file name; confirm against `do_configure_archiving`.
let archive_command = {
let pgdo_exe = std::env::current_exe().into_diagnostic()?;
let pgdo_exe_shell = String::from_utf8(pgdo_exe.quoted(Sh)).into_diagnostic()?;
let destination_wal_shell =
String::from_utf8(backup.backup_wal_dir.quoted(Sh)).into_diagnostic()?;
format!("{pgdo_exe_shell} backup:tools wal:archive %p {destination_wal_shell}/%f")
};
// Configure archiving. The result says whether a restart is needed for the new
// configuration to take effect.
// NOTE(review): `with_cleanup` presumably runs `do_cleanup` only when the
// closure fails — confirm against `pgdo::coordinate::cleanup`.
let needs_restart = with_cleanup(do_cleanup, || {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
match resource.read().as_deref() {
Ok(resource) => {
backup
.do_configure_archiving(resource, &archive_command)
.await
}
Err(err) => panic!("Could not acquire resource: {err}"),
}
})
})?;
if needs_restart {
log::info!("The cluster must be restarted so that configuration changes come into effect.");
match resource.read().as_deref() {
// Shared: the cluster is not exclusively held, so refuse to restart it and
// return an error instead.
Ok(Left(_)) => {
Err(backup::BackupError::GeneralError(
concat!(
"The cluster is in use, and so cannot be restarted automatically. ",
"Please restart the cluster manually then try this backup again.",
)
.into(),
))?;
}
// Exclusive: stop, then start again with no extra options.
Ok(Right(resource)) => {
let facet = resource.facet();
with_cleanup(do_cleanup, || {
log::info!("Restarting cluster; stopping…");
facet.stop().and_then(|_| {
log::info!("Restarting cluster; starting up again…");
facet.start(&[])
})
})?;
}
Err(err) => panic!("Could not acquire resource: {err}"),
};
}
log::info!("Performing base backup…");
// NOTE(review): `with_finally` presumably runs `do_cleanup` whether or not the
// base backup succeeds — confirm against `pgdo::coordinate::finally`.
let destination_data = match resource.read().as_deref() {
Ok(resource) => with_finally(do_cleanup, || {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async { backup.do_base_backup(resource).await })
}),
Err(err) => panic!("Could not acquire resource: {err}"),
}?;
log::info!("Base backup complete; find it at {destination_data:?}");
// Take the resource back out of the lock — recovering the value even if the
// lock was poisoned — and release it according to how it is held.
resource
.into_inner()
.unwrap_or_else(PoisonError::into_inner)
.either(
resource::ResourceShared::release,
resource::ResourceExclusive::release,
)?;
Ok(())
}
fn copy_wal_archive(source: PathBuf, target: PathBuf) -> ExitCode {
use std::{
fs::File,
io::{self, BufRead, ErrorKind::AlreadyExists, Write},
};
match File::open(&source) {
Ok(file_source) => {
match File::options().write(true).create_new(true).open(&target) {
Ok(file_target) => {
log::info!("WAL archiving from {source:?} to {target:?}");
let mut reader = io::BufReader::new(&file_source);
let mut writer = io::BufWriter::new(&file_target);
match io::copy(&mut reader, &mut writer)
.and_then(|_| writer.flush())
.and_then(|_| file_target.sync_all())
{
Ok(()) => ExitCode::SUCCESS,
Err(err) => {
log::error!("WAL archive failure; error while copying: {err}");
ExitCode::FAILURE
}
}
}
Err(err) if err.kind() == AlreadyExists => {
match File::open(&target) {
Ok(file_target) => {
let mut reader_source = io::BufReader::new(&file_source);
let mut reader_target = io::BufReader::new(&file_target);
loop {
let (bytes_source, bytes_target) = {
let buf_source = match reader_source.fill_buf() {
Ok(buf) => buf,
Err(err) => {
log::error!("WAL archive failure; error reading {source:?}: {err}");
break ExitCode::FAILURE;
}
};
let buf_target = match reader_target.fill_buf() {
Ok(buf) => buf,
Err(err) => {
log::error!("WAL archive failure; error reading {target:?}: {err}");
break ExitCode::FAILURE;
}
};
if buf_source.is_empty() && buf_target.is_empty() {
log::info!("WAL file {source:?} already archived okay");
break ExitCode::SUCCESS;
} else if buf_source != buf_target {
log::error!("WAL file {source:?} already archived to {target:?} **BUT CONTENTS DIFFER**");
break ExitCode::FAILURE;
};
(buf_source.len(), buf_target.len())
};
reader_source.consume(bytes_source);
reader_target.consume(bytes_target);
}
}
Err(err) => {
log::error!("WAL archive failure; error accessing {target:?}: {err}");
ExitCode::FAILURE
}
}
}
Err(err) => {
log::error!("WAL archive failure; error accessing {target:?}: {err}");
ExitCode::FAILURE
}
}
}
Err(err) => {
log::error!("WAL archive failure; error accessing {source:?}: {err}");
ExitCode::FAILURE
}
}
}