pgdo/cluster/
backup.rs

1use std::{
2    ffi::OsStr,
3    io,
4    path::{Path, PathBuf},
5    process::ExitStatus,
6};
7
8use either::{Left, Right};
9use tempfile::TempDir;
10use tokio::{fs, task::block_in_place};
11use tokio_stream::{wrappers::ReadDirStream, StreamExt};
12
13use super::{config, resource::HeldResource};
14use crate::{cluster, coordinate, lock};
15
16// ----------------------------------------------------------------------------
17
/// Locations used when backing up a cluster.
///
/// Construct with [`Backup::prepare`], which creates both directories.
#[derive(Debug)]
pub struct Backup {
    /// Destination directory for backups. Base backups are created as
    /// subdirectories of this directory.
    pub backup_dir: PathBuf,
    /// WAL archive directory. [`Backup::prepare`] sets this to the `wal`
    /// subdirectory of `backup_dir`.
    pub backup_wal_dir: PathBuf,
}
23
24impl Backup {
25    /// Creates the destination directory and the WAL archive directory if these
26    /// do not exist, and allocates a temporary location for the base backup.
27    pub async fn prepare<D: AsRef<Path>>(backup_dir: D) -> Result<Self, BackupError> {
28        fs::create_dir_all(&backup_dir).await?;
29        let backup_dir = backup_dir.as_ref().canonicalize()?;
30        let backup_wal_dir = backup_dir.join("wal");
31        fs::create_dir_all(&backup_wal_dir).await?;
32        Ok(Self { backup_dir, backup_wal_dir })
33    }
34
35    /// Configures the cluster for continuous archiving.
36    ///
37    /// Returns a flag indicating if the cluster must be restarted for changes
38    /// to take effect. If the cluster is already configured appropriately, this
39    /// does nothing (and returns `false`).
40    pub async fn do_configure_archiving(
41        &self,
42        resource: &'_ HeldResource,
43        archive_command: &str,
44    ) -> Result<bool, BackupError> {
45        let pool = match resource {
46            Left(resource) => resource.facet().pool(None),
47            Right(resource) => resource.facet().pool(None),
48        }?;
49        let mut restart: bool = false;
50
51        // Ensure that `wal_level` is set to `replica` or `logical`. If not,
52        // set it to `replica`.
53        match WAL_LEVEL.get(&pool).await? {
54            Some(config::Value::String(level)) if level == "replica" || level == "logical" => {
55                log::debug!("{WAL_LEVEL:?} already set to {level:?}");
56            }
57            Some(_) => {
58                log::info!("Setting {WAL_LEVEL:?} to 'replica'");
59                WAL_LEVEL.set(&pool, "replica").await?;
60                restart = true;
61            }
62            None => {
63                return Err(BackupError::ConfigError(
64                    "WAL is not supported; cannot proceed".into(),
65                ))
66            }
67        }
68
69        // Ensure that `archive_mode` is set to `on` or `always`. If not,
70        // set it to `on`.
71        match ARCHIVE_MODE.get(&pool).await? {
72            Some(config::Value::String(level)) if level == "on" || level == "always" => {
73                log::debug!("{ARCHIVE_MODE:?} already set to {level:?}");
74            }
75            Some(_) => {
76                log::info!("Setting {ARCHIVE_MODE:?} to 'on'");
77                ARCHIVE_MODE.set(&pool, "on").await?;
78                restart = true;
79            }
80            None => {
81                return Err(BackupError::ConfigError(
82                    "Archiving is not supported; cannot proceed".into(),
83                ))
84            }
85        }
86
87        // We can't set `archive_command` if `archive_library` is already set.
88        match ARCHIVE_LIBRARY.get(&pool).await? {
89            Some(config::Value::String(library)) if library.is_empty() => {
90                log::debug!("{ARCHIVE_LIBRARY:?} not set (good for us)");
91            }
92            Some(archive_library) => {
93                return Err(BackupError::ConfigError(format!(
94                    "{ARCHIVE_LIBRARY:?} is already set to {archive_library:?}; cannot proceed"
95                )))
96            }
97            None => {
98                log::debug!("{ARCHIVE_LIBRARY:?} is not supported (good for us)");
99            }
100        }
101
102        match ARCHIVE_COMMAND.get(&pool).await? {
103            Some(config::Value::String(command)) if command == archive_command => {
104                log::debug!("{ARCHIVE_COMMAND:?} already set to {archive_command:?}");
105            }
106            // Re. "(disabled)", see `show_archive_command` in xlog.c.
107            Some(config::Value::String(command))
108                if command.is_empty() || command == "(disabled)" =>
109            {
110                log::info!("Setting {ARCHIVE_COMMAND:?} to {archive_command:?}");
111                ARCHIVE_COMMAND.set(&pool, archive_command).await?;
112            }
113            Some(archive_command) => {
114                return Err(BackupError::ConfigError(format!(
115                    "{ARCHIVE_COMMAND:?} is already set to {archive_command:?}; cannot proceed"
116                )))
117            }
118            None => {
119                return Err(BackupError::ConfigError(
120                    "Archiving is not supported; cannot proceed".into(),
121                ))
122            }
123        }
124
125        Ok(restart)
126    }
127
128    /// Performs a "base backup" of the cluster.
129    ///
130    /// Returns the directory into which the backup has been created. This is
131    /// always a subdirectory of [`self.backup_dir`].
132    ///
133    /// This must be performed _after_ configuring continuous archiving (see
134    /// [`Backup::do_configure_archiving`]).
135    pub async fn do_base_backup(&self, resource: &'_ HeldResource) -> Result<PathBuf, BackupError> {
136        // Temporary location into which we'll make the base backup.
137        let backup_tmp_dir =
138            block_in_place(|| TempDir::with_prefix_in(BACKUP_DATA_PREFIX_TMP, &self.backup_dir))?;
139
140        let args: &[&OsStr] = &[
141            "--pgdata".as_ref(),
142            backup_tmp_dir.path().as_ref(),
143            "--format".as_ref(),
144            "plain".as_ref(),
145            "--progress".as_ref(),
146        ];
147        let status = block_in_place(|| match resource {
148            Left(resource) => resource.facet().exec(None, "pg_basebackup".as_ref(), args),
149            Right(resource) => resource.facet().exec(None, "pg_basebackup".as_ref(), args),
150        })?;
151        if !status.success() {
152            Err(status)?;
153        }
154        // Before calculating the target directory name or doing the actual
155        // rename, take out a coordinating lock in `backup_dir`.
156        let backup_lock = block_in_place(|| {
157            lock::UnlockedFile::try_from(&self.backup_dir.join(BACKUP_LOCK_NAME))?
158                .lock_exclusive()
159                .map_err(coordinate::CoordinateError::UnixError)
160        })?;
161
162        // Where we're going to move the new backup to. This is always a
163        // directory named `{BACKUP_DATA_PREFIX}.NNNNNNNNNN` where NNNNNNNNNN is
164        // a zero-padded integer, the next available in `destination`.
165        let backup_data_dir = self.backup_dir.join(format!(
166            "{BACKUP_DATA_PREFIX}{:010}",
167            ReadDirStream::new(fs::read_dir(&self.backup_dir).await?)
168                .filter_map(Result::ok)
169                .filter_map(|entry| match entry.file_name().to_str() {
170                    Some(name) if name.starts_with(BACKUP_DATA_PREFIX) =>
171                        name[BACKUP_DATA_PREFIX.len()..].parse::<u32>().ok(),
172                    Some(_) | None => None,
173                })
174                .fold(0, Ord::max)
175                .await
176                + 1
177        ));
178
179        // Do the rename.
180        fs::rename(&backup_tmp_dir, &backup_data_dir).await?;
181        drop(backup_lock);
182
183        Ok(backup_data_dir)
184    }
185}
186
187// ----------------------------------------------------------------------------
188
/// PostgreSQL's `archive_mode` configuration parameter.
static ARCHIVE_MODE: config::Parameter = config::Parameter("archive_mode");
/// PostgreSQL's `archive_command` configuration parameter.
static ARCHIVE_COMMAND: config::Parameter = config::Parameter("archive_command");
/// PostgreSQL's `archive_library` configuration parameter.
static ARCHIVE_LIBRARY: config::Parameter = config::Parameter("archive_library");
/// PostgreSQL's `wal_level` configuration parameter.
static WAL_LEVEL: config::Parameter = config::Parameter("wal_level");

/// Successful backups have this directory name prefix, followed by a
/// zero-padded number.
pub static BACKUP_DATA_PREFIX: &str = "data.";

/// In-progress backups have this directory name prefix.
static BACKUP_DATA_PREFIX_TMP: &str = ".tmp.data.";

/// Name of the coordinating lock file for working in the backup directory.
static BACKUP_LOCK_NAME: &str = ".lock";
202
203// ----------------------------------------------------------------------------
204
/// Errors that can arise while preparing for, configuring, or performing a
/// backup.
#[derive(thiserror::Error, miette::Diagnostic, Debug)]
pub enum BackupError {
    /// An underlying I/O operation failed, e.g. creating a directory or
    /// renaming a finished backup.
    #[error("Input/output error")]
    IoError(#[from] io::Error),
    /// A free-form error message, rendered with a "Shell error" prefix.
    // NOTE(review): not constructed anywhere in the visible part of this file.
    #[error("Shell error: {0}")]
    GeneralError(String),
    /// The cluster's configuration does not permit archiving to be set up;
    /// the message explains which setting is the obstacle.
    #[error("Configuration error: {0}")]
    ConfigError(String),
    /// An error from the coordination layer, e.g. while taking the lock in
    /// the backup directory.
    #[error(transparent)]
    CoordinateError(#[from] coordinate::CoordinateError<cluster::ClusterError>),
    /// An error reported by the cluster itself.
    #[error(transparent)]
    ClusterError(#[from] cluster::ClusterError),
    /// An external command finished with the contained exit status.
    #[error("External command failed: {0:?}")]
    CommandError(ExitStatus),
    /// A database operation failed.
    #[error("Database error")]
    SqlxError(#[from] cluster::sqlx::Error),
}
222
223impl From<ExitStatus> for BackupError {
224    fn from(error: ExitStatus) -> BackupError {
225        Self::CommandError(error)
226    }
227}