1use std::{
2 ffi::OsStr,
3 io,
4 path::{Path, PathBuf},
5 process::ExitStatus,
6};
7
8use either::{Left, Right};
9use tempfile::TempDir;
10use tokio::{fs, task::block_in_place};
11use tokio_stream::{wrappers::ReadDirStream, StreamExt};
12
13use super::{config, resource::HeldResource};
14use crate::{cluster, coordinate, lock};
15
/// Locations on disk that make up a backup destination.
///
/// Values are normally obtained from `Backup::prepare`, which creates both
/// directories before returning.
#[derive(Debug)]
pub struct Backup {
    /// Root directory of the backup destination.
    pub backup_dir: PathBuf,
    /// Directory for WAL files; `Backup::prepare` sets this to `backup_dir/wal`.
    pub backup_wal_dir: PathBuf,
}
23
24impl Backup {
25 pub async fn prepare<D: AsRef<Path>>(backup_dir: D) -> Result<Self, BackupError> {
28 fs::create_dir_all(&backup_dir).await?;
29 let backup_dir = backup_dir.as_ref().canonicalize()?;
30 let backup_wal_dir = backup_dir.join("wal");
31 fs::create_dir_all(&backup_wal_dir).await?;
32 Ok(Self { backup_dir, backup_wal_dir })
33 }
34
35 pub async fn do_configure_archiving<'a>(
41 &self,
42 resource: &'a HeldResource,
43 archive_command: &str,
44 ) -> Result<bool, BackupError> {
45 let pool = match resource {
46 Left(resource) => resource.facet().pool(None),
47 Right(resource) => resource.facet().pool(None),
48 }?;
49 let mut restart: bool = false;
50
51 match WAL_LEVEL.get(&pool).await? {
54 Some(config::Value::String(level)) if level == "replica" || level == "logical" => {
55 log::debug!("{WAL_LEVEL:?} already set to {level:?}");
56 }
57 Some(_) => {
58 log::info!("Setting {WAL_LEVEL:?} to 'replica'");
59 WAL_LEVEL.set(&pool, "replica").await?;
60 restart = true;
61 }
62 None => {
63 return Err(BackupError::ConfigError(
64 "WAL is not supported; cannot proceed".into(),
65 ))
66 }
67 }
68
69 match ARCHIVE_MODE.get(&pool).await? {
72 Some(config::Value::String(level)) if level == "on" || level == "always" => {
73 log::debug!("{ARCHIVE_MODE:?} already set to {level:?}");
74 }
75 Some(_) => {
76 log::info!("Setting {ARCHIVE_MODE:?} to 'on'");
77 ARCHIVE_MODE.set(&pool, "on").await?;
78 restart = true;
79 }
80 None => {
81 return Err(BackupError::ConfigError(
82 "Archiving is not supported; cannot proceed".into(),
83 ))
84 }
85 }
86
87 match ARCHIVE_LIBRARY.get(&pool).await? {
89 Some(config::Value::String(library)) if library.is_empty() => {
90 log::debug!("{ARCHIVE_LIBRARY:?} not set (good for us)");
91 }
92 Some(archive_library) => {
93 return Err(BackupError::ConfigError(format!(
94 "{ARCHIVE_LIBRARY:?} is already set to {archive_library:?}; cannot proceed"
95 )))
96 }
97 None => {
98 log::debug!("{ARCHIVE_LIBRARY:?} is not supported (good for us)");
99 }
100 }
101
102 match ARCHIVE_COMMAND.get(&pool).await? {
103 Some(config::Value::String(command)) if command == archive_command => {
104 log::debug!("{ARCHIVE_COMMAND:?} already set to {archive_command:?}");
105 }
106 Some(config::Value::String(command))
108 if command.is_empty() || command == "(disabled)" =>
109 {
110 log::info!("Setting {ARCHIVE_COMMAND:?} to {archive_command:?}");
111 ARCHIVE_COMMAND.set(&pool, archive_command).await?;
112 }
113 Some(archive_command) => {
114 return Err(BackupError::ConfigError(format!(
115 "{ARCHIVE_COMMAND:?} is already set to {archive_command:?}; cannot proceed"
116 )))
117 }
118 None => {
119 return Err(BackupError::ConfigError(
120 "Archiving is not supported; cannot proceed".into(),
121 ))
122 }
123 }
124
125 Ok(restart)
126 }
127
128 pub async fn do_base_backup<'a>(
136 &self,
137 resource: &'a HeldResource,
138 ) -> Result<PathBuf, BackupError> {
139 let backup_tmp_dir =
141 block_in_place(|| TempDir::with_prefix_in(BACKUP_DATA_PREFIX_TMP, &self.backup_dir))?;
142
143 let args: &[&OsStr] = &[
144 "--pgdata".as_ref(),
145 backup_tmp_dir.path().as_ref(),
146 "--format".as_ref(),
147 "plain".as_ref(),
148 "--progress".as_ref(),
149 ];
150 let status = block_in_place(|| match resource {
151 Left(resource) => resource.facet().exec(None, "pg_basebackup".as_ref(), args),
152 Right(resource) => resource.facet().exec(None, "pg_basebackup".as_ref(), args),
153 })?;
154 if !status.success() {
155 Err(status)?;
156 }
157 let backup_lock = block_in_place(|| {
160 lock::UnlockedFile::try_from(&self.backup_dir.join(BACKUP_LOCK_NAME))?
161 .lock_exclusive()
162 .map_err(coordinate::CoordinateError::UnixError)
163 })?;
164
165 let backup_data_dir = self.backup_dir.join(format!(
169 "{BACKUP_DATA_PREFIX}{:010}",
170 ReadDirStream::new(fs::read_dir(&self.backup_dir).await?)
171 .filter_map(Result::ok)
172 .filter_map(|entry| match entry.file_name().to_str() {
173 Some(name) if name.starts_with(BACKUP_DATA_PREFIX) =>
174 name[BACKUP_DATA_PREFIX.len()..].parse::<u32>().ok(),
175 Some(_) | None => None,
176 })
177 .fold(0, Ord::max)
178 .await
179 + 1
180 ));
181
182 fs::rename(&backup_tmp_dir, &backup_data_dir).await?;
184 drop(backup_lock);
185
186 Ok(backup_data_dir)
187 }
188}
189
190static ARCHIVE_MODE: config::Parameter = config::Parameter("archive_mode");
193static ARCHIVE_COMMAND: config::Parameter = config::Parameter("archive_command");
194static ARCHIVE_LIBRARY: config::Parameter = config::Parameter("archive_library");
195static WAL_LEVEL: config::Parameter = config::Parameter("wal_level");
196
/// Name prefix of completed base backup directories; the backup number is
/// appended to it.
pub static BACKUP_DATA_PREFIX: &str = "data.";

/// Name prefix of the temporary directory a base backup is written to before
/// it is renamed into place.
static BACKUP_DATA_PREFIX_TMP: &str = ".tmp.data.";

/// Name of the lock file, inside the backup directory, that is locked
/// exclusively while a new backup is numbered and renamed.
static BACKUP_LOCK_NAME: &str = ".lock";
205
206#[derive(thiserror::Error, miette::Diagnostic, Debug)]
209pub enum BackupError {
210 #[error("Input/output error")]
211 IoError(#[from] io::Error),
212 #[error("Shell error: {0}")]
213 GeneralError(String),
214 #[error("Configuration error: {0}")]
215 ConfigError(String),
216 #[error(transparent)]
217 CoordinateError(#[from] coordinate::CoordinateError<cluster::ClusterError>),
218 #[error(transparent)]
219 ClusterError(#[from] cluster::ClusterError),
220 #[error("External command failed: {0:?}")]
221 CommandError(ExitStatus),
222 #[error("Database error")]
223 SqlxError(#[from] cluster::sqlx::Error),
224}
225
226impl From<ExitStatus> for BackupError {
227 fn from(error: ExitStatus) -> BackupError {
228 Self::CommandError(error)
229 }
230}