pgdo/
cluster.rs

1//! Create, start, introspect, stop, and destroy PostgreSQL clusters.
2
3pub mod backup;
4pub mod config;
5pub mod resource;
6
7mod error;
8
9use std::ffi::{OsStr, OsString};
10use std::io::{self, Read, Write};
11use std::os::unix::prelude::{OsStrExt, OsStringExt};
12use std::path::{Path, PathBuf};
13use std::process::{Command, ExitStatus, Output};
14use std::{fmt, fs};
15
16use postgres;
17use shell_quote::{QuoteExt, Sh};
18pub use sqlx;
19
20use crate::runtime::{
21    strategy::{Strategy, StrategyLike},
22    Runtime,
23};
24use crate::{
25    coordinate::{
26        self,
27        State::{self, *},
28    },
29    version,
30};
31pub use error::ClusterError;
32
/// `template0` is always present in a PostgreSQL cluster.
///
/// This database is a template database, though it's used to a lesser extent
/// than `template1`.
///
/// `template0` should never be modified, so it's rare to connect to this
/// database, even as a convenient default. See [`DATABASE_TEMPLATE1`] for why
/// connecting to a template database is best avoided, and
/// [`DATABASE_POSTGRES`] for the default this crate uses instead.
pub static DATABASE_TEMPLATE0: &str = "template0";
42
/// `template1` is always present in a PostgreSQL cluster.
///
/// This database is used as the default template for creating new databases.
///
/// While any session is connected to a database, other sessions cannot create
/// new databases using it as a template. See PostgreSQL's
/// [Template Databases][] page to learn more about this limitation.
///
/// Because `template1` is the default template, connecting to it prevents
/// other sessions from using a plain `CREATE DATABASE` command. It is
/// therefore best to connect to this database _only_ when modifying it, not
/// as a default.
///
/// [Template Databases]:
///     https://www.postgresql.org/docs/current/manage-ag-templatedbs.html
pub static DATABASE_TEMPLATE1: &str = "template1";
57
/// `postgres` is always created by `initdb` when building a PostgreSQL cluster.
///
/// From `initdb(1)`:
/// > The postgres database is a default database meant for use by users,
/// > utilities and third party applications.
///
/// Connecting to `template0` or `template1` can be problematic; see
/// [`DATABASE_TEMPLATE1`] for an explanation. `postgres` is therefore a
/// convenient default, and this library uses it as the database from which to
/// perform administrative tasks. For example, [`Cluster::databases`],
/// [`Cluster::createdb`], and [`Cluster::dropdb`] all connect to it. It is
/// also the default database for [`Cluster::shell`], [`Cluster::exec`], and
/// [`Cluster::pool`].
///
/// Unfortunately, `postgres` can be dropped, in which case that functionality
/// of this crate will be broken. Ideally we could connect to a PostgreSQL
/// cluster without specifying a database, but that is presently not possible.
pub static DATABASE_POSTGRES: &str = "postgres";
74
/// The state of a cluster, as determined by [`Cluster::status`].
///
/// Displays as a lowercase word: `running`, `stopped`, or `missing`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ClusterStatus {
    /// The server is definitely running.
    Running,
    /// The data directory is present but the server is not running.
    Stopped,
    /// The data directory does not exist, or does not contain `PG_VERSION`.
    Missing,
}

impl fmt::Display for ClusterStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Running => "running",
            Self::Stopped => "stopped",
            Self::Missing => "missing",
        };
        f.write_str(label)
    }
}
91
/// Representation of a PostgreSQL cluster.
///
/// The cluster may not yet exist on disk. It may exist but be stopped, or it
/// may be running. The methods here can be used to create, start, introspect,
/// stop, and destroy the cluster.
///
/// There's no protection against concurrent changes to the cluster made by
/// other processes, but the functions in the
/// [`coordinate`][`crate::coordinate`] module may help.
#[derive(Debug)]
pub struct Cluster {
    /// The data directory of the cluster.
    ///
    /// Corresponds to the `PGDATA` environment variable. It is also where the
    /// server is told to put its Unix socket, which is why `PGHOST` is set to
    /// this same path for commands run against the cluster.
    pub datadir: PathBuf,
    /// How to select the PostgreSQL installation to use with this cluster.
    ///
    /// This is consulted each time an operation needs to run a PostgreSQL
    /// program. If the data directory records a version, it guides the
    /// selection.
    pub strategy: Strategy,
}
108
109impl Cluster {
110    /// Represent a cluster at the given path.
111    pub fn new<P: AsRef<Path>, S: Into<Strategy>>(
112        datadir: P,
113        strategy: S,
114    ) -> Result<Self, ClusterError> {
115        Ok(Self {
116            datadir: datadir.as_ref().to_owned(),
117            strategy: strategy.into(),
118        })
119    }
120
121    /// Determine the runtime to use with this cluster.
122    fn runtime(&self) -> Result<Runtime, ClusterError> {
123        match version(self)? {
124            None => self
125                .strategy
126                .fallback()
127                .ok_or_else(|| ClusterError::RuntimeDefaultNotFound),
128            Some(version) => self
129                .strategy
130                .select(&version.into())
131                .ok_or_else(|| ClusterError::RuntimeNotFound(version)),
132        }
133    }
134
135    /// Return a [`Command`] that will invoke `pg_ctl` with the environment
136    /// referring to this cluster.
137    fn ctl(&self) -> Result<Command, ClusterError> {
138        let mut command = self.runtime()?.execute("pg_ctl");
139        command.env("PGDATA", &self.datadir);
140        command.env("PGHOST", &self.datadir);
141        Ok(command)
142    }
143
144    /// Check if this cluster is running.
145    ///
146    /// Convenient call-through to [`status`][`Self::status`]; only returns
147    /// `true` when the cluster is definitely running.
148    pub fn running(&self) -> Result<bool, ClusterError> {
149        self.status().map(|status| status == ClusterStatus::Running)
150    }
151
152    /// Check the status of this cluster.
153    ///
154    /// Tries to distinguish carefully between "definitely running", "definitely
155    /// not running", "missing", and "don't know". The latter results in
156    /// [`ClusterError`].
157    pub fn status(&self) -> Result<ClusterStatus, ClusterError> {
158        let output = self.ctl()?.arg("status").output()?;
159        let code = match output.status.code() {
160            // Killed by signal; return early.
161            None => return Err(ClusterError::CommandError(output)),
162            // Success; return early (the server is running).
163            Some(0) => return Ok(ClusterStatus::Running),
164            // More work required to decode what this means.
165            Some(code) => code,
166        };
167        let runtime = self.runtime()?;
168        // PostgreSQL has evolved to return different error codes in
169        // later versions, so here we check for specific codes to avoid
170        // masking errors from insufficient permissions or missing
171        // executables, for example.
172        let status = match runtime.version {
173            // PostgreSQL 10.x and later.
174            version::Version::Post10(_major, _minor) => {
175                // PostgreSQL 10
176                // https://www.postgresql.org/docs/10/static/app-pg-ctl.html
177                match code {
178                    // 3 means that the data directory is present and
179                    // accessible but that the server is not running.
180                    3 => Some(ClusterStatus::Stopped),
181                    // 4 means that the data directory is not present or is
182                    // not accessible. If it's missing, then the server is
183                    // not running. If it is present but not accessible
184                    // then crash because we can't know if the server is
185                    // running or not.
186                    4 if !exists(self) => Some(ClusterStatus::Missing),
187                    // For anything else we don't know.
188                    _ => None,
189                }
190            }
191            // PostgreSQL 9.x only.
192            version::Version::Pre10(9, point, _minor) => {
193                // PostgreSQL 9.4+
194                // https://www.postgresql.org/docs/9.4/static/app-pg-ctl.html
195                // https://www.postgresql.org/docs/9.5/static/app-pg-ctl.html
196                // https://www.postgresql.org/docs/9.6/static/app-pg-ctl.html
197                if point >= 4 {
198                    match code {
199                        // 3 means that the data directory is present and
200                        // accessible but that the server is not running.
201                        3 => Some(ClusterStatus::Stopped),
202                        // 4 means that the data directory is not present or is
203                        // not accessible. If it's missing, then the server is
204                        // not running. If it is present but not accessible
205                        // then crash because we can't know if the server is
206                        // running or not.
207                        4 if !exists(self) => Some(ClusterStatus::Missing),
208                        // For anything else we don't know.
209                        _ => None,
210                    }
211                }
212                // PostgreSQL 9.2+
213                // https://www.postgresql.org/docs/9.2/static/app-pg-ctl.html
214                // https://www.postgresql.org/docs/9.3/static/app-pg-ctl.html
215                else if point >= 2 {
216                    match code {
217                        // 3 means that the data directory is present and
218                        // accessible but that the server is not running OR
219                        // that the data directory is not present.
220                        3 if !exists(self) => Some(ClusterStatus::Missing),
221                        3 => Some(ClusterStatus::Stopped),
222                        // For anything else we don't know.
223                        _ => None,
224                    }
225                }
226                // PostgreSQL 9.0+
227                // https://www.postgresql.org/docs/9.0/static/app-pg-ctl.html
228                // https://www.postgresql.org/docs/9.1/static/app-pg-ctl.html
229                else {
230                    match code {
231                        // 1 means that the server is not running OR the data
232                        // directory is not present OR that the data directory
233                        // is not accessible.
234                        1 if !exists(self) => Some(ClusterStatus::Missing),
235                        1 => Some(ClusterStatus::Stopped),
236                        // For anything else we don't know.
237                        _ => None,
238                    }
239                }
240            }
241            // All other versions.
242            version::Version::Pre10(_major, _point, _minor) => None,
243        };
244
245        match status {
246            Some(running) => Ok(running),
247            // TODO: Perhaps include the exit code from `pg_ctl status` in the
248            // error message, and whatever it printed out.
249            None => Err(ClusterError::UnsupportedVersion(runtime.version)),
250        }
251    }
252
253    /// Return the path to the PID file used in this cluster.
254    ///
255    /// The PID file does not necessarily exist.
256    pub fn pidfile(&self) -> PathBuf {
257        self.datadir.join("postmaster.pid")
258    }
259
260    /// Return the path to the log file used in this cluster.
261    ///
262    /// The log file does not necessarily exist.
263    pub fn logfile(&self) -> PathBuf {
264        self.datadir.join("postmaster.log")
265    }
266
267    /// Create the cluster if it does not already exist.
268    pub fn create(&self) -> Result<State, ClusterError> {
269        if exists(self) {
270            // Nothing more to do; the cluster is already in place.
271            Ok(Unmodified)
272        } else {
273            // Create the cluster and report back that we did so.
274            fs::create_dir_all(&self.datadir)?;
275
276            // Construct the `pg_ctl init` command.
277            let mut command = self.ctl()?;
278            #[allow(clippy::suspicious_command_arg_space)]
279            command
280                .arg("init")
281                // Silent; `--silent` flag accepted only in PostgreSQL >=9.2.
282                .arg("-s")
283                // Options for `initdb`; `--options` flag accepted only in PostgreSQL >=10.
284                .arg("-o")
285                // Passing multiple flags in a single `arg(...)` is intentional.
286                // These constitute the single value for the `-o` flag above.
287                .arg("-E utf8 --locale C -A trust")
288                .env("TZ", "UTC");
289
290            bugs::retry_pg_ctl(&mut command, |_| Ok(()))
291        }
292    }
293
294    /// Start the cluster if it's not already running, with the given options.
295    ///
296    /// Returns [`State::Unmodified`] if the cluster is already running, meaning
297    /// the given options were **NOT** applied.
298    pub fn start(
299        &self,
300        options: &[(config::Parameter, config::Value)],
301    ) -> Result<State, ClusterError> {
302        // Ensure that the cluster has been created.
303        self.create()?;
304        // Check if we're running already.
305        if self.running()? {
306            // We didn't start this cluster; say so.
307            return Ok(Unmodified);
308        }
309        // Construct the options that `pg_ctl` will pass through to `postgres`.
310        // These have to be carefully escaped for the target shell – which is
311        // likely to be `sh`. Here's what they mean:
312        //  -h <arg> -- host name; empty arg means Unix socket only.
313        //  -k -- socket directory.
314        //  -c name=value -- set a configuration parameter.
315        let options = {
316            let mut arg: Vec<u8> = b"-h '' -k ".into();
317            arg.push_quoted(Sh, &self.datadir);
318            for (parameter, value) in options {
319                arg.extend(b" -c ");
320                arg.push_quoted(Sh, &format!("{parameter}={value}",));
321            }
322            OsString::from_vec(arg)
323        };
324
325        // Track the logs so that we can see if there's a retryable failure.
326        let logfile = self.logfile();
327        let logfile_name = logfile
328            .file_name()
329            .unwrap_or_else(|| "<unknown.log>".as_ref())
330            .display();
331        let mut log: logfile::LogFile = logfile.as_path().try_into()?;
332
333        // Next, invoke `pg_ctl` to start the cluster.
334        let mut command = self.ctl()?;
335        //  -l <file> -- log file.
336        //  -s -- no informational messages.
337        //  -w -- wait until startup is complete.
338        //  -o <string> -- options to pass through to `postgres`.
339        command
340            .arg("start")
341            .arg("-l")
342            .arg(&logfile)
343            .arg("-s")
344            .arg("-w")
345            .arg("-o")
346            .arg(options);
347
348        // Append new logs to the command's `Output` so that the retry machinery
349        // has visibility of it.
350        let append_logs_to_stderr = |output: &mut Output| {
351            writeln!(&mut output.stderr)?;
352            writeln!(&mut output.stderr, "-- {logfile_name} --")?;
353            log.read_to_end(&mut output.stderr)?;
354            Ok(())
355        };
356
357        bugs::retry_pg_ctl(&mut command, append_logs_to_stderr)
358    }
359
360    /// Connect to this cluster.
361    ///
362    /// When the database is not specified, connects to [`DATABASE_POSTGRES`].
363    fn connect(&self, database: Option<&str>) -> Result<postgres::Client, ClusterError> {
364        let user = crate::util::current_user()?;
365        let host = self.datadir.to_string_lossy(); // postgres crate API limitation.
366        let client = postgres::Client::configure()
367            .host(&host)
368            .dbname(database.unwrap_or(DATABASE_POSTGRES))
369            .user(&user)
370            .connect(postgres::NoTls)?;
371        Ok(client)
372    }
373
374    /// Create a lazy SQLx pool for this cluster.
375    ///
376    /// Although it's possible to call this anywhere, at runtime it needs a
377    /// Tokio context to work, e.g.:
378    ///
379    /// ```rust,no_run
380    /// # use pgdo::cluster::ClusterError;
381    /// # let runtime = pgdo::runtime::strategy::Strategy::default();
382    /// # let cluster = pgdo::cluster::Cluster::new("some/where", runtime)?;
383    /// let tokio = tokio::runtime::Runtime::new()?;
384    /// let rows = tokio.block_on(async {
385    ///   let pool = cluster.pool(None)?;
386    ///   let rows = sqlx::query("SELECT 1").fetch_all(&pool).await?;
387    ///   Ok::<_, ClusterError>(rows)
388    /// })?;
389    /// # Ok::<(), ClusterError>(())
390    /// ```
391    ///
392    /// When the database is not specified, connects to [`DATABASE_POSTGRES`].
393    pub fn pool(&self, database: Option<&str>) -> Result<sqlx::PgPool, ClusterError> {
394        Ok(sqlx::PgPool::connect_lazy_with(
395            sqlx::postgres::PgConnectOptions::new()
396                .socket(&self.datadir)
397                .database(database.unwrap_or(DATABASE_POSTGRES))
398                .username(&crate::util::current_user()?)
399                .application_name("pgdo"),
400        ))
401    }
402
403    /// Return a URL for this cluster, if possible.
404    ///
405    /// It is not possible to return a URL for a cluster when `self.datadir` is
406    /// not valid UTF-8, in which case `Ok(None)` is returned.
407    fn url(&self, database: &str) -> Result<Option<url::Url>, url::ParseError> {
408        match self.datadir.to_str() {
409            Some(datadir) => url::Url::parse_with_params(
410                "postgresql://",
411                [("host", datadir), ("dbname", database)],
412            )
413            .map(Some),
414            None => Ok(None),
415        }
416    }
417
418    /// Run `psql` against this cluster, in the given database.
419    ///
420    /// When the database is not specified, connects to [`DATABASE_POSTGRES`].
421    pub fn shell(&self, database: Option<&str>) -> Result<ExitStatus, ClusterError> {
422        let mut command = self.runtime()?.execute("psql");
423        self.set_env(command.arg("--quiet"), database)?;
424        Ok(command.spawn()?.wait()?)
425    }
426
427    /// Run the given command against this cluster.
428    ///
429    /// The command is run with the `PGDATA`, `PGHOST`, and `PGDATABASE`
430    /// environment variables set appropriately.
431    ///
432    /// When the database is not specified, uses [`DATABASE_POSTGRES`].
433    pub fn exec<T: AsRef<OsStr>>(
434        &self,
435        database: Option<&str>,
436        command: T,
437        args: &[T],
438    ) -> Result<ExitStatus, ClusterError> {
439        let mut command = self.runtime()?.command(command);
440        self.set_env(command.args(args), database)?;
441        Ok(command.spawn()?.wait()?)
442    }
443
444    /// Set the environment variables for this cluster.
445    fn set_env(&self, command: &mut Command, database: Option<&str>) -> Result<(), ClusterError> {
446        let database = database.unwrap_or(DATABASE_POSTGRES);
447
448        // Set a few standard PostgreSQL environment variables.
449        command.env("PGDATA", &self.datadir);
450        command.env("PGHOST", &self.datadir);
451        command.env("PGDATABASE", database);
452
453        // Set `DATABASE_URL` if `self.datadir` is valid UTF-8, otherwise ensure
454        // that `DATABASE_URL` is erased from the command's environment.
455        match self.url(database)? {
456            Some(url) => command.env("DATABASE_URL", url.as_str()),
457            None => command.env_remove("DATABASE_URL"),
458        };
459
460        Ok(())
461    }
462
463    /// The names of databases in this cluster.
464    pub fn databases(&self) -> Result<Vec<String>, ClusterError> {
465        let mut conn = self.connect(None)?;
466        let rows = conn.query(
467            "SELECT datname FROM pg_catalog.pg_database ORDER BY datname",
468            &[],
469        )?;
470        let datnames: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
471        Ok(datnames)
472    }
473
474    /// Create the named database.
475    ///
476    /// Returns [`Unmodified`] if the database already exists, otherwise it
477    /// returns [`Modified`].
478    pub fn createdb(&self, database: &str) -> Result<State, ClusterError> {
479        use postgres::error::SqlState;
480        let statement = format!(
481            "CREATE DATABASE {}",
482            postgres_protocol::escape::escape_identifier(database)
483        );
484        match self.connect(None)?.execute(statement.as_str(), &[]) {
485            Err(err) if err.code() == Some(&SqlState::DUPLICATE_DATABASE) => Ok(Unmodified),
486            Err(err) => Err(err)?,
487            Ok(_) => Ok(Modified),
488        }
489    }
490
491    /// Drop the named database.
492    ///
493    /// Returns [`Unmodified`] if the database does not exist, otherwise it
494    /// returns [`Modified`].
495    pub fn dropdb(&self, database: &str) -> Result<State, ClusterError> {
496        use postgres::error::SqlState;
497        let statement = format!(
498            "DROP DATABASE {}",
499            postgres_protocol::escape::escape_identifier(database)
500        );
501        match self.connect(None)?.execute(statement.as_str(), &[]) {
502            Err(err) if err.code() == Some(&SqlState::UNDEFINED_DATABASE) => Ok(Unmodified),
503            Err(err) => Err(err)?,
504            Ok(_) => Ok(Modified),
505        }
506    }
507
508    /// Stop the cluster if it's running.
509    pub fn stop(&self) -> Result<State, ClusterError> {
510        // If the cluster's not already running, don't do anything.
511        if !self.running()? {
512            return Ok(Unmodified);
513        }
514        // pg_ctl options:
515        //  -w -- wait for shutdown to complete.
516        //  -m <mode> -- shutdown mode.
517        let output = self
518            .ctl()?
519            .arg("stop")
520            .arg("-s")
521            .arg("-w")
522            .arg("-m")
523            .arg("fast")
524            .output()?;
525
526        if output.status.success() {
527            Ok(Modified) // We did actually stop the cluster; say so.
528        } else {
529            Err(ClusterError::CommandError(output))
530        }
531    }
532
533    /// Destroy the cluster if it exists, after stopping it.
534    pub fn destroy(&self) -> Result<State, ClusterError> {
535        self.stop()?;
536        match fs::remove_dir_all(&self.datadir) {
537            Ok(()) => Ok(Modified),
538            Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(Unmodified),
539            Err(err) => Err(err)?,
540        }
541    }
542}
543
544impl AsRef<Path> for Cluster {
545    fn as_ref(&self) -> &Path {
546        &self.datadir
547    }
548}
549
/// Quick, simplistic check for whether `datadir` looks like a PostgreSQL data
/// directory.
///
/// It must be a directory that contains a regular file named `PG_VERSION`.
///
/// [`version()`] is a more reliable measure, and also yields the version of
/// PostgreSQL required to use the cluster.
pub fn exists<P: AsRef<Path>>(datadir: P) -> bool {
    let dir: &Path = datadir.as_ref();
    if !dir.is_dir() {
        return false;
    }
    dir.join("PG_VERSION").is_file()
}
560
561/// Yields the version of PostgreSQL required to use a cluster.
562///
563/// This returns the version from the file named `PG_VERSION` in the data
564/// directory if it exists, otherwise this returns `None`. For PostgreSQL
565/// versions before 10 this is typically (maybe always) the major and point
566/// version, e.g. 9.4 rather than 9.4.26. For version 10 and above it appears to
567/// be just the major number, e.g. 14 rather than 14.2.
568pub fn version<P: AsRef<Path>>(
569    datadir: P,
570) -> Result<Option<version::PartialVersion>, ClusterError> {
571    let version_file = datadir.as_ref().join("PG_VERSION");
572    match std::fs::read_to_string(version_file) {
573        Ok(version) => Ok(Some(version.parse()?)),
574        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
575        Err(err) => Err(err)?,
576    }
577}
578
579/// Determine the names of superuser roles in a cluster (that can log in).
580///
581/// It may not be possible to even connect to a running cluster when you don't
582/// know a role to use.
583///
584/// This gets around the problem by launching the cluster in single-user mode
585/// and matching the output of a single query of the `pg_roles` table. It's
586/// hacky and fragile but it may work for you.
587///
588/// If no superusers are found, this returns an error containing the output from
589/// the `postgres` process.
590///
591/// # Panics
592///
593/// This function panics if the regular expression used to match the output does
594/// not compile; that's a bug and should never occur in a release build.
595///
596/// It can also panic if the thread that writes to the single-user `postgres`
597/// process itself panics, but under normal circumstances that also should never
598/// happen.
599///
600pub fn determine_superuser_role_names(
601    cluster: &Cluster,
602) -> Result<std::collections::HashSet<String>, ClusterError> {
603    use regex::Regex;
604    use std::io::Write;
605    use std::panic::panic_any;
606    use std::process::Stdio;
607    use std::sync::LazyLock;
608
609    static QUERY: &[u8] = b"select rolname from pg_roles where rolsuper and rolcanlogin\n";
610    static RE: LazyLock<Regex> = LazyLock::new(|| {
611        Regex::new(r#"\brolname\s*=\s*"(.+)""#)
612            .expect("invalid regex (for matching single-user role names)")
613    });
614
615    let mut child = cluster
616        .runtime()?
617        .execute("postgres")
618        .arg("--single")
619        .arg("-D")
620        .arg(&cluster.datadir)
621        .arg("postgres")
622        .stdin(Stdio::piped())
623        .stdout(Stdio::piped())
624        .stderr(Stdio::piped())
625        .spawn()?;
626
627    let mut stdin = child.stdin.take().expect("could not take stdin");
628    let writer = std::thread::spawn(move || stdin.write_all(QUERY));
629    let output = child.wait_with_output()?;
630    let stdout = String::from_utf8_lossy(&output.stdout);
631    let superusers: std::collections::HashSet<_> = RE
632        .captures_iter(&stdout)
633        .filter_map(|capture| capture.get(1))
634        .map(|m| m.as_str().to_owned())
635        .collect();
636
637    match writer.join() {
638        Err(err) => panic_any(err),
639        Ok(result) => result?,
640    }
641
642    if superusers.is_empty() {
643        return Err(ClusterError::CommandError(output));
644    }
645
646    Ok(superusers)
647}
648
/// Configuration parameters, with their values, to apply when starting a
/// cluster.
///
/// This is the options type accepted by [`run`] and by the
/// [`coordinate::Subject`] implementation for [`Cluster`]. That
/// implementation hands the options to [`Cluster::start`].
pub type Options<'a> = &'a [(config::Parameter<'a>, config::Value)];
650
651/// [`Cluster`] can be coordinated.
652impl coordinate::Subject for Cluster {
653    type Error = ClusterError;
654    type Options<'a> = Options<'a>;
655
656    fn start(&self, options: Self::Options<'_>) -> Result<State, Self::Error> {
657        self.start(options)
658    }
659
660    fn stop(&self) -> Result<State, Self::Error> {
661        self.stop()
662    }
663
664    fn destroy(&self) -> Result<State, Self::Error> {
665        self.destroy()
666    }
667
668    fn exists(&self) -> Result<bool, Self::Error> {
669        Ok(exists(self))
670    }
671
672    fn running(&self) -> Result<bool, Self::Error> {
673        self.running()
674    }
675}
676
/// Namespace for the version 5 UUIDs that [`run`] derives from a cluster's
/// canonical data directory path, which in turn name the cluster's lock file.
///
/// Changing this value would change the lock file used for every data
/// directory.
// The literal is an opaque identifier, not a quantity; digit grouping adds
// nothing, hence the lint is allowed.
#[allow(clippy::unreadable_literal)]
const UUID_NS: uuid::Uuid = uuid::Uuid::from_u128(93875103436633470414348750305797058811);
679
/// A coordination guard around a [`Cluster`], as returned by [`run`].
pub type ClusterGuard = coordinate::guard::Guard<Cluster>;
681
682/// Create and start a cluster at the given path, with the given options.
683///
684/// Uses the default runtime strategy. Returns a guard which will stop the
685/// cluster when it's dropped.
686pub fn run<P: AsRef<Path>>(
687    path: P,
688    options: Options<'_>,
689) -> Result<ClusterGuard, coordinate::CoordinateError<ClusterError>> {
690    let path = path.as_ref();
691    // We have to create the data directory so that we can canonicalize its
692    // location. This is because we use the data directory's path as the basis
693    // for the lock file's name. This is duplicative – `Cluster::create` also
694    // creates the data directory – but necessary.
695    fs::create_dir_all(path)?;
696    let path = path.canonicalize()?;
697
698    let strategy = crate::runtime::strategy::Strategy::default();
699    let cluster = crate::cluster::Cluster::new(&path, strategy)?;
700
701    let lock_name = path.as_os_str().as_bytes();
702    let lock_uuid = uuid::Uuid::new_v5(&UUID_NS, lock_name);
703    let lock = crate::lock::UnlockedFile::try_from(&lock_uuid)?;
704
705    ClusterGuard::startup(lock, cluster, options)
706}
707
708// ----------------------------------------------------------------------
709
mod logfile {
    use std::fs::File;
    use std::io::{self, ErrorKind::NotFound, Read, Seek};
    use std::path::{Path, PathBuf};

    /// Incremental reader for a log file that may be appended to repeatedly.
    ///
    /// `postmaster.log`, for instance, may gain new output on every call to
    /// `pg_ctl start`. When startup succeeds, the spawned `postgres` process
    /// keeps writing to the log indefinitely, even after `pg_ctl` finishes.
    /// When startup fails, the cause is written to this log. Capturing that
    /// output is what lets us decide whether or not to retry `pg_ctl`.
    ///
    /// Creating a [`LogFile`] neither requires the file to exist nor creates
    /// it. While the file is absent, [`Read::read`] returns `Ok(0)`. If the
    /// file exists at construction, reading starts from its end at that
    /// moment. If it appears later, reading starts from its beginning.
    ///
    /// Limitations of [`LogFile`]:
    /// - The file handle stays open, so if the log file is deleted and
    ///   recreated, new logs will not be seen.
    /// - Truncation of the log is not detected; again, new logs will not be
    ///   seen.
    pub struct LogFile {
        path: PathBuf,
        file: Option<File>,
    }

    impl TryFrom<&Path> for LogFile {
        type Error = io::Error;

        fn try_from(path: &Path) -> Result<Self, Self::Error> {
            let file = match File::open(path) {
                Ok(mut file) => {
                    // Skip everything logged so far; only new output matters.
                    file.seek(io::SeekFrom::End(0))?;
                    Some(file)
                }
                Err(err) if err.kind() == NotFound => None,
                Err(err) => return Err(err),
            };
            Ok(Self {
                path: path.to_path_buf(),
                file,
            })
        }
    }

    impl Read for LogFile {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            // Lazily open the log once it exists; until then there's nothing
            // to read.
            if self.file.is_none() {
                match File::open(&self.path) {
                    Ok(opened) => self.file = Some(opened),
                    Err(err) if err.kind() == NotFound => return Ok(0),
                    Err(err) => return Err(err),
                }
            }
            match self.file.as_mut() {
                Some(file) => file.read(buf),
                None => Ok(0),
            }
        }
    }
}
772
773mod bugs {
774    use super::{ClusterError, State, State::Modified};
775    use regex::bytes::Regex;
776    use std::process::{Command, Output};
777    use std::{fmt::Write, sync, time::Duration};
778
779    /// Work around bugs in `pg_ctl`.
780    ///
781    /// In all presently released versions of PostgreSQL, `pg_ctl init` can fail
782    /// on some platforms (macOS, some BSDs) due to a semget(2) failure. This is
783    /// a particular problem in PostgreSQL 17.x (see the linked bug report below
784    /// to understand why). We try to work around this by retrying the command a
785    /// few times with a short delay between each attempt.
786    ///
787    /// We detect whether we want to retry by checking `pg_ctl`'s output for log
788    /// messages like these:
789    ///
790    ///   FATAL:  could not create semaphores: Invalid argument DETAIL: Failed
791    ///   system call was semget(189980241, 20, 03600).
792    ///
793    /// See the [initial bug
794    /// report](https://www.postgresql.org/message-id/CALL7chmzY3eXHA7zHnODUVGZLSvK3wYCSP0RmcDFHJY8f28Q3g@mail.gmail.com)
795    /// and its thread for _much_ more detail.
796    pub fn retry_pg_ctl(
797        command: &mut Command,
798        mut supplement: impl FnMut(&mut Output) -> Result<(), ClusterError>,
799    ) -> Result<State, ClusterError> {
800        static SEMGET_BUG_LOG: sync::Once = sync::Once::new();
801        static SEMGET_BUG_RE1: sync::LazyLock<Regex> = sync::LazyLock::new(|| {
802            Regex::new(r"\bFATAL:\s+could not create semaphores: Invalid argument\b")
803                .expect("invalid regex (for matching semaphore errors #1)")
804        });
805        static SEMGET_BUG_RE2: sync::LazyLock<Regex> = sync::LazyLock::new(|| {
806            Regex::new(r"\bDETAIL:\s+Failed system call was semget\b")
807                .expect("invalid regex (for matching semaphore errors #2)")
808        });
809
810        fn is_retryable(logs: &[u8]) -> bool {
811            SEMGET_BUG_RE1.is_match(logs) && SEMGET_BUG_RE2.is_match(logs)
812        }
813
814        // Capture which `pg_ctl` command we're running – assume it's the first
815        // argument – for use with `notify` later on.
816        let command_summary = command.get_args().take(1).fold(
817            command.get_program().display().to_string(),
818            |mut summary, arg| {
819                write!(&mut summary, " {}", arg.display()).ok();
820                summary
821            },
822        );
823
824        // Function that attempts to run `command`.
825        let run = || {
826            use backoff::Error;
827            use ClusterError::{CommandError, IoError};
828            match command.output() {
829                Ok(output) if output.status.success() => Ok(Modified),
830                Ok(mut output) => {
831                    supplement(&mut output)?;
832                    if is_retryable(output.stderr.as_slice()) {
833                        Err(Error::transient(CommandError(output)))
834                    } else {
835                        Err(Error::permanent(CommandError(output)))
836                    }
837                }
838                Err(err) => Err(Error::permanent(IoError(err))),
839            }
840        };
841
842        // Function that notifies when `pg_ctl` fails transiently.
843        let notify = move |_, delay: Duration| {
844            SEMGET_BUG_LOG.call_once(|| {
845                log::info!(concat!(
846                    "In all presently released versions of PostgreSQL, `pg_ctl` can fail on ",
847                    "some platforms (macOS, some BSDs) due to a semget(2) failure. This is a ",
848                    "particular problem in PostgreSQL 17.x (the linked bug report has more ",
849                    "information). As an imperfect workaround, `pgdo` retries the command a ",
850                    "few times when it detects this specific error. Original bug report: ",
851                    "https://www.postgresql.org/message-id/CALL7chmzY3eXHA7zHnODUVGZLSvK3wYCSP0RmcDFHJY8f28Q3g@mail.gmail.com.",
852                ));
853            });
854            log::warn!("`{command_summary}` failed; retrying in {delay:?}…",);
855        };
856
857        // Retry with exponential backoff + jitter.
858        let state = backoff::retry_notify(
859            backoff::ExponentialBackoffBuilder::new()
860                .with_initial_interval(Duration::from_millis(200))
861                .with_max_elapsed_time(Some(Duration::from_secs(60)))
862                .with_max_interval(Duration::from_millis(10000))
863                .build(),
864            run,
865            notify,
866        )
867        .map_err(|err| match err {
868            backoff::Error::Permanent(err) => err,
869            backoff::Error::Transient { err, .. } => err,
870        })?;
871
872        Ok(state)
873    }
874}