use std::{
borrow::Cow,
io::Write,
path::{Path, PathBuf},
process::ExitCode,
};
use either::Either::Right;
use shell_quote::{QuoteRefExt, Sh};
use crate::runner;
use super::ExitResult;
use pgdo::{
cluster::{self, backup},
coordinate::{finally::with_finally, State},
};
// Command-line arguments for restoring a cluster from a backup directory.
//
// NOTE: this header is a plain comment on purpose; only the per-option doc
// comments below are intended to show up in the generated `--help` output.
#[derive(clap::Args)]
#[clap(next_help_heading = Some("Options for restore"))]
pub struct Restore {
    /// Directory containing the backup to restore from.
    #[clap(long = "from", value_name = "BACKUP_DIR", display_order = 100)]
    pub backup_dir: PathBuf,

    /// Directory to restore into; it is created if missing and must be empty.
    #[clap(long = "to", value_name = "RESTORE_DIR", display_order = 200)]
    pub restore_dir: PathBuf,
}
impl Restore {
    /// Runs the restore described by these arguments.
    ///
    /// Returns `ExitCode::SUCCESS` when the restore completes; any failure
    /// from `restore` is propagated to the caller via `?`.
    pub fn invoke(self) -> ExitResult {
        restore(self.backup_dir, self.restore_dir)?;
        Ok(ExitCode::SUCCESS)
    }
}
/// Wraps the restore arguments in the corresponding top-level command variant.
impl From<Restore> for super::Command {
    fn from(restore: Restore) -> Self {
        super::Command::Restore(restore)
    }
}
/// Everything that can go wrong while restoring a cluster from a backup.
///
/// Most variants wrap an underlying error via `#[from]`, so `?` converts into
/// this type automatically inside `restore`.
#[derive(thiserror::Error, miette::Diagnostic, Debug)]
enum RestoreError {
/// A filesystem or terminal I/O operation failed.
#[error("Input/output error")]
IoError(#[from] std::io::Error),
/// Copying the base backup into the restore directory failed.
#[error("File copy error")]
FileCopyError(#[from] fs_extra::error::Error),
/// A shell-quoted path or role name was not valid UTF-8.
#[error("Shell error")]
ShellError(#[from] std::string::FromUtf8Error),
/// An error reported by the cluster itself; displayed transparently.
#[error(transparent)]
ClusterError(#[from] pgdo::cluster::ClusterError),
/// Determining the runtime strategy failed; displayed transparently.
#[error(transparent)]
StrategyError(#[from] runner::StrategyError),
/// Obtaining the lock for the data directory failed; displayed transparently.
#[error(transparent)]
LockForError(#[from] runner::LockForError),
/// An error from the cluster resource layer (startup, release, ...);
/// displayed transparently.
#[error(transparent)]
ResourceError(#[from] cluster::resource::Error),
/// A free-form message, built from either a static or an owned string.
#[error("{0}")]
Other(Cow<'static, str>),
}
impl From<&'static str> for RestoreError {
fn from(s: &'static str) -> Self {
Self::Other(s.into())
}
}
impl From<String> for RestoreError {
fn from(s: String) -> Self {
Self::Other(s.into())
}
}
/// Restores the most recent base backup found in `backup_dir` into
/// `restore_dir` and recovers the cluster from the archived WAL.
///
/// The steps, in order:
///
/// 1. Select the highest-numbered base backup directly inside `backup_dir`.
/// 2. Create `restore_dir` if necessary, require it to be empty, and set its
///    mode to `0o700`.
/// 3. Copy the base backup into `restore_dir`, showing a progress bar.
/// 4. Empty `pg_wal`, create `recovery.signal`, and start the cluster with a
///    `restore_command` that copies WAL files from `<backup_dir>/wal`.
/// 5. Wait until the cluster is no longer running, then remove
///    `recovery.signal`.
/// 6. Start the cluster once more with archiving off, reset the archive
///    settings, and stop it again.
/// 7. Print a hint describing how to start the restored cluster.
///
/// # Errors
///
/// Returns a [`RestoreError`] if no base backup is found, if the restore
/// directory is not empty, if the restored cluster is already running, or if
/// any underlying I/O, copy, cluster, or database operation fails.
fn restore<D: AsRef<Path>>(backup_dir: D, restore_dir: D) -> Result<(), RestoreError> {
    let term = console::Term::stdout();

    let backup_dir = backup_dir.as_ref().canonicalize()?;
    let backup_wal_dir = backup_dir.join("wal");
    let backup_data_dir = latest_base_backup(&backup_dir)?;
    let restore_dir = prepare_restore_dir(restore_dir.as_ref())?;

    copy_backup_data(&backup_data_dir, &restore_dir, &term)?;

    // WAL copied along with the base backup is discarded; recovery fetches
    // what it needs through `restore_command` below.
    write!(&term, "Removing WAL from restored cluster…")?;
    empty_out_dir(restore_dir.join("pg_wal"))?;
    writeln!(&term, " done.")?;

    // Present only while recovering; removed again once recovery has finished.
    std::fs::write(restore_dir.join("recovery.signal"), "")?;

    // PostgreSQL expands `%f` and `%p` in `restore_command` and reads `%%` as a
    // literal `%`, so any `%` in the backup path itself has to be doubled.
    let backup_wal_dir_sh = String::from_utf8(backup_wal_dir.quoted(Sh))?.replace('%', "%%");
    let restore_command = format!("cp {backup_wal_dir_sh}/%f %p");

    let (datadir, lock) = runner::lock_for(&restore_dir)?;
    let strategy = runner::determine_strategy(None)?;
    let cluster = cluster::Cluster::new(datadir, strategy)?;
    let resource = cluster::resource::ResourceFree::new(lock, cluster);

    let (State::Modified, Right(resource)) = cluster::resource::startup(
        resource,
        &[
            (ARCHIVE_MODE, "off".into()),
            (RESTORE_COMMAND, restore_command.into()),
            (RECOVERY_TARGET, "immediate".into()),
            (RECOVERY_TARGET_ACTION, "shutdown".into()),
        ],
    )?
    else {
        // Any other outcome means this call did not start the cluster itself.
        return Err(format!("Restored cluster is already running in {restore_dir:?}!").into());
    };

    // The cluster was started with `recovery_target_action = shutdown`, so
    // poll until it is no longer running, updating a one-line status message.
    {
        let start = std::time::Instant::now();
        let interval = std::time::Duration::from_secs(1);
        let message = "Waiting for database recovery…";
        term.write_line(message)?;
        while resource.facet().running()? {
            std::thread::sleep(interval);
            term.clear_last_lines(1)?;
            writeln!(
                &term,
                "{message} ({} elapsed)",
                indicatif::HumanDuration(start.elapsed())
            )?;
        }
        term.clear_last_lines(1)?;
    }

    std::fs::remove_file(restore_dir.join("recovery.signal"))?;

    // Start again with archiving forced off, reset the archive settings, and
    // make sure the cluster is stopped afterwards whether or not that worked.
    writeln!(&term, "Disabling archiving…")?;
    resource.facet().start(&[(ARCHIVE_MODE, "off".into())])?;
    with_finally(
        || resource.facet().stop(),
        || {
            let rt = tokio::runtime::Runtime::new()?;
            rt.block_on(async {
                let pool = resource.facet().pool(None)?;
                write!(&term, "Resetting {ARCHIVE_MODE}…")?;
                ARCHIVE_MODE.reset(&pool).await?;
                writeln!(&term, " done.")?;
                write!(&term, "Resetting {ARCHIVE_COMMAND}…")?;
                ARCHIVE_COMMAND.reset(&pool).await?;
                writeln!(&term, " done.")?;
                write!(&term, "Resetting {ARCHIVE_LIBRARY}…")?;
                match ARCHIVE_LIBRARY.reset(&pool).await {
                    Ok(_) => writeln!(&term, " done.")?,
                    Err(err) => {
                        match err.as_database_error() {
                            // SQLSTATE 42704: the server does not recognise
                            // this parameter, so report it as unsupported
                            // instead of failing the whole restore.
                            Some(err) if err.code() == Some("42704".into()) => {
                                writeln!(&term, " not supported.")?;
                                Ok(())
                            }
                            _ => Err(err),
                        }?;
                    }
                };
                Ok::<_, cluster::ClusterError>(())
            })?;
            Ok::<_, RestoreError>(())
        },
    )?;
    writeln!(&term, "Archiving disabled in restored cluster.")?;

    let (_lock, cluster) = resource.release()?.into_parts();
    let superusers = cluster::determine_superuser_role_names(&cluster)?;

    let restore_dir_sh: String = String::from_utf8(restore_dir.quoted(Sh))?;
    let start_command = format!("pgdo -D {restore_dir_sh}");
    let title = console::style("Restore/recovery complete!")
        .bold()
        .bright()
        .white();
    let warning = console::style("WARNING").bold().yellow();
    let code = console::Style::new().bold().cyan();

    match pgdo::util::current_user() {
        Ok(user) if superusers.contains(&user) => {
            writeln!(
                &term,
                "{title} Use {} to start the cluster.",
                code.apply_to(&start_command),
            )?;
        }
        // The current user is unknown or is not a superuser in the restored
        // cluster: suggest one of the cluster's superuser roles instead.
        Ok(_) | Err(_) => match superusers.iter().min() {
            Some(user) => {
                let user_sh: String = String::from_utf8(user.quoted(Sh))?;
                writeln!(&term, "{title}")?;
                writeln!(&term, "{warning}: Current user does not match any superuser role in the restored cluster.")?;
                writeln!(
                    &term,
                    "Try {} to start the cluster.",
                    code.apply_to(format!("PGUSER={user_sh} {start_command}")),
                )?;
            }
            None => {
                writeln!(
                    &term,
                    "{title} Use {} to start the cluster.",
                    code.apply_to(&start_command),
                )?;
                writeln!(
                    &term,
                    "{warning}: No superuser role was found in the restored cluster!"
                )?;
            }
        },
    }

    Ok(())
}

/// Returns the path of the highest-numbered base backup directly inside `backup_dir`.
fn latest_base_backup(backup_dir: &Path) -> Result<PathBuf, RestoreError> {
    backup_dir
        .read_dir()?
        // Entries that cannot be read are skipped rather than treated as fatal.
        .filter_map(Result::ok)
        .filter_map(|entry| {
            // Only names of the form `<prefix><u32>` count as base backups.
            let file_name = entry.file_name();
            let index = file_name
                .to_str()?
                .strip_prefix(backup::BACKUP_DATA_PREFIX)?
                .parse::<u32>()
                .ok()?;
            Some((index, entry))
        })
        .max_by_key(|(index, _)| *index)
        .map(|(_, entry)| entry.path())
        .ok_or_else(|| RestoreError::from(format!("No base backup found in {backup_dir:?}")))
}

/// Creates `restore_dir` if needed, requires it to be empty, and sets its mode to `0o700`.
fn prepare_restore_dir(restore_dir: &Path) -> Result<PathBuf, RestoreError> {
    std::fs::create_dir_all(restore_dir)?;
    let restore_dir = restore_dir.canonicalize()?;
    if restore_dir.read_dir()?.next().is_some() {
        return Err(RestoreError::from("Restore directory is not empty"));
    }
    let mut perms = restore_dir.metadata()?.permissions();
    std::os::unix::fs::PermissionsExt::set_mode(&mut perms, 0o700);
    std::fs::set_permissions(&restore_dir, perms)?;
    Ok(restore_dir)
}

/// Copies the contents of `from` into `to`, drawing a progress bar on `term`.
fn copy_backup_data(from: &Path, to: &Path, term: &console::Term) -> Result<(), RestoreError> {
    let progress_bar = indicatif::ProgressBar::hidden();
    progress_bar.set_draw_target(indicatif::ProgressDrawTarget::term(term.clone(), 20));
    progress_bar.set_style(
        indicatif::ProgressStyle::with_template(
            "{wide_bar} {percent}% complete; {msg}; {eta} remaining",
        )
        .expect("invalid progress bar template"),
    );
    fs_extra::dir::copy_with_progress(
        from,
        to,
        &fs_extra::dir::CopyOptions::new().content_only(true),
        |progress| match progress.state {
            // The destination was verified to be empty, so a clash or an
            // access problem is unexpected: abort instead of guessing.
            fs_extra::dir::TransitState::Exists | fs_extra::dir::TransitState::NoAccess => {
                fs_extra::dir::TransitProcessResult::Abort
            }
            fs_extra::dir::TransitState::Normal => {
                progress_bar.set_length(progress.total_bytes);
                progress_bar.set_position(progress.copied_bytes);
                progress_bar.set_message(format!(
                    "{count} of {total} copied",
                    count = indicatif::HumanBytes(progress.copied_bytes),
                    total = indicatif::HumanBytes(progress.total_bytes),
                ));
                fs_extra::dir::TransitProcessResult::ContinueOrAbort
            }
        },
    )?;
    progress_bar.finish_and_clear();
    Ok(())
}
// Configuration parameters, identified by name, that `restore` either passes
// when starting the restored cluster or resets afterwards.

// Passed as "off" on both starts of the restored cluster, then reset.
static ARCHIVE_MODE: cluster::config::Parameter = cluster::config::Parameter("archive_mode");
// Reset after recovery.
static ARCHIVE_COMMAND: cluster::config::Parameter = cluster::config::Parameter("archive_command");
// Reset after recovery; a database error with code 42704 is reported as
// "not supported" instead of failing.
static ARCHIVE_LIBRARY: cluster::config::Parameter = cluster::config::Parameter("archive_library");
// Set to a `cp` command that copies WAL files out of the backup's `wal` directory.
static RESTORE_COMMAND: cluster::config::Parameter = cluster::config::Parameter("restore_command");
// Set to "immediate" for the recovery run.
static RECOVERY_TARGET: cluster::config::Parameter = cluster::config::Parameter("recovery_target");
// Set to "shutdown" for the recovery run; `restore` then waits for the cluster
// to stop running.
static RECOVERY_TARGET_ACTION: cluster::config::Parameter =
cluster::config::Parameter("recovery_target_action");
/// Deletes everything inside `dir` while leaving `dir` itself in place.
///
/// Sub-directories are removed recursively; every other kind of entry is
/// removed as a file.
///
/// # Errors
///
/// Returns the first I/O error hit while listing `dir`, inspecting an entry,
/// or removing it. Entries removed before the failure stay removed.
fn empty_out_dir<P: AsRef<Path>>(dir: P) -> Result<(), std::io::Error> {
    for item in dir.as_ref().read_dir()? {
        let item = item?;
        let target = item.path();
        match item.file_type()?.is_dir() {
            true => std::fs::remove_dir_all(target)?,
            false => std::fs::remove_file(target)?,
        }
    }
    Ok(())
}