1use std::{
2 ffi::OsStr,
3 io,
4 path::{Path, PathBuf},
5 process::ExitStatus,
6};
7
8use either::{Left, Right};
9use tempfile::TempDir;
10use tokio::{fs, task::block_in_place};
11use tokio_stream::{wrappers::ReadDirStream, StreamExt};
12
13use super::{config, resource::HeldResource};
14use crate::{cluster, coordinate, lock};
15
/// On-disk location that base backups and WAL files are stored in.
///
/// When constructed by `Backup::prepare`, both paths are canonical and both
/// directories exist.
#[derive(Debug)]
pub struct Backup {
    /// Root of the backup location; numbered `data.*` base-backup directories
    /// and the `.lock` file live directly inside it.
    pub backup_dir: PathBuf,
    /// The `wal` subdirectory of `backup_dir` (presumably the destination of
    /// archived WAL segments — confirm against the configured archive command).
    pub backup_wal_dir: PathBuf,
}
23
24impl Backup {
25 pub async fn prepare<D: AsRef<Path>>(backup_dir: D) -> Result<Self, BackupError> {
28 fs::create_dir_all(&backup_dir).await?;
29 let backup_dir = backup_dir.as_ref().canonicalize()?;
30 let backup_wal_dir = backup_dir.join("wal");
31 fs::create_dir_all(&backup_wal_dir).await?;
32 Ok(Self { backup_dir, backup_wal_dir })
33 }
34
35 pub async fn do_configure_archiving(
41 &self,
42 resource: &'_ HeldResource,
43 archive_command: &str,
44 ) -> Result<bool, BackupError> {
45 let pool = match resource {
46 Left(resource) => resource.facet().pool(None),
47 Right(resource) => resource.facet().pool(None),
48 }?;
49 let mut restart: bool = false;
50
51 match WAL_LEVEL.get(&pool).await? {
54 Some(config::Value::String(level)) if level == "replica" || level == "logical" => {
55 log::debug!("{WAL_LEVEL:?} already set to {level:?}");
56 }
57 Some(_) => {
58 log::info!("Setting {WAL_LEVEL:?} to 'replica'");
59 WAL_LEVEL.set(&pool, "replica").await?;
60 restart = true;
61 }
62 None => {
63 return Err(BackupError::ConfigError(
64 "WAL is not supported; cannot proceed".into(),
65 ))
66 }
67 }
68
69 match ARCHIVE_MODE.get(&pool).await? {
72 Some(config::Value::String(level)) if level == "on" || level == "always" => {
73 log::debug!("{ARCHIVE_MODE:?} already set to {level:?}");
74 }
75 Some(_) => {
76 log::info!("Setting {ARCHIVE_MODE:?} to 'on'");
77 ARCHIVE_MODE.set(&pool, "on").await?;
78 restart = true;
79 }
80 None => {
81 return Err(BackupError::ConfigError(
82 "Archiving is not supported; cannot proceed".into(),
83 ))
84 }
85 }
86
87 match ARCHIVE_LIBRARY.get(&pool).await? {
89 Some(config::Value::String(library)) if library.is_empty() => {
90 log::debug!("{ARCHIVE_LIBRARY:?} not set (good for us)");
91 }
92 Some(archive_library) => {
93 return Err(BackupError::ConfigError(format!(
94 "{ARCHIVE_LIBRARY:?} is already set to {archive_library:?}; cannot proceed"
95 )))
96 }
97 None => {
98 log::debug!("{ARCHIVE_LIBRARY:?} is not supported (good for us)");
99 }
100 }
101
102 match ARCHIVE_COMMAND.get(&pool).await? {
103 Some(config::Value::String(command)) if command == archive_command => {
104 log::debug!("{ARCHIVE_COMMAND:?} already set to {archive_command:?}");
105 }
106 Some(config::Value::String(command))
108 if command.is_empty() || command == "(disabled)" =>
109 {
110 log::info!("Setting {ARCHIVE_COMMAND:?} to {archive_command:?}");
111 ARCHIVE_COMMAND.set(&pool, archive_command).await?;
112 }
113 Some(archive_command) => {
114 return Err(BackupError::ConfigError(format!(
115 "{ARCHIVE_COMMAND:?} is already set to {archive_command:?}; cannot proceed"
116 )))
117 }
118 None => {
119 return Err(BackupError::ConfigError(
120 "Archiving is not supported; cannot proceed".into(),
121 ))
122 }
123 }
124
125 Ok(restart)
126 }
127
128 pub async fn do_base_backup(&self, resource: &'_ HeldResource) -> Result<PathBuf, BackupError> {
136 let backup_tmp_dir =
138 block_in_place(|| TempDir::with_prefix_in(BACKUP_DATA_PREFIX_TMP, &self.backup_dir))?;
139
140 let args: &[&OsStr] = &[
141 "--pgdata".as_ref(),
142 backup_tmp_dir.path().as_ref(),
143 "--format".as_ref(),
144 "plain".as_ref(),
145 "--progress".as_ref(),
146 ];
147 let status = block_in_place(|| match resource {
148 Left(resource) => resource.facet().exec(None, "pg_basebackup".as_ref(), args),
149 Right(resource) => resource.facet().exec(None, "pg_basebackup".as_ref(), args),
150 })?;
151 if !status.success() {
152 Err(status)?;
153 }
154 let backup_lock = block_in_place(|| {
157 lock::UnlockedFile::try_from(&self.backup_dir.join(BACKUP_LOCK_NAME))?
158 .lock_exclusive()
159 .map_err(coordinate::CoordinateError::UnixError)
160 })?;
161
162 let backup_data_dir = self.backup_dir.join(format!(
166 "{BACKUP_DATA_PREFIX}{:010}",
167 ReadDirStream::new(fs::read_dir(&self.backup_dir).await?)
168 .filter_map(Result::ok)
169 .filter_map(|entry| match entry.file_name().to_str() {
170 Some(name) if name.starts_with(BACKUP_DATA_PREFIX) =>
171 name[BACKUP_DATA_PREFIX.len()..].parse::<u32>().ok(),
172 Some(_) | None => None,
173 })
174 .fold(0, Ord::max)
175 .await
176 + 1
177 ));
178
179 fs::rename(&backup_tmp_dir, &backup_data_dir).await?;
181 drop(backup_lock);
182
183 Ok(backup_data_dir)
184 }
185}
186
187static ARCHIVE_MODE: config::Parameter = config::Parameter("archive_mode");
190static ARCHIVE_COMMAND: config::Parameter = config::Parameter("archive_command");
191static ARCHIVE_LIBRARY: config::Parameter = config::Parameter("archive_library");
192static WAL_LEVEL: config::Parameter = config::Parameter("wal_level");
193
/// Name prefix of every completed base-backup directory in the backup root.
/// It is followed by a zero-padded, 10-digit sequence number, e.g.
/// `data.0000000001`.
pub static BACKUP_DATA_PREFIX: &'static str = "data.";

/// Name prefix of the staging directory a base backup is written into before
/// being renamed. It must not begin with [`BACKUP_DATA_PREFIX`], so that
/// in-progress backups are never counted as completed ones.
static BACKUP_DATA_PREFIX_TMP: &'static str = ".tmp.data.";

/// File name, within the backup root, of the lock that serializes assigning
/// sequence numbers to new backups.
static BACKUP_LOCK_NAME: &'static str = ".lock";
202
203#[derive(thiserror::Error, miette::Diagnostic, Debug)]
206pub enum BackupError {
207 #[error("Input/output error")]
208 IoError(#[from] io::Error),
209 #[error("Shell error: {0}")]
210 GeneralError(String),
211 #[error("Configuration error: {0}")]
212 ConfigError(String),
213 #[error(transparent)]
214 CoordinateError(#[from] coordinate::CoordinateError<cluster::ClusterError>),
215 #[error(transparent)]
216 ClusterError(#[from] cluster::ClusterError),
217 #[error("External command failed: {0:?}")]
218 CommandError(ExitStatus),
219 #[error("Database error")]
220 SqlxError(#[from] cluster::sqlx::Error),
221}
222
223impl From<ExitStatus> for BackupError {
224 fn from(error: ExitStatus) -> BackupError {
225 Self::CommandError(error)
226 }
227}