pgdo/cluster.rs
1//! Create, start, introspect, stop, and destroy PostgreSQL clusters.
2
3pub mod backup;
4pub mod config;
5pub mod resource;
6
7mod error;
8
9use std::ffi::{OsStr, OsString};
10use std::io::{self, Read, Write};
11use std::os::unix::prelude::{OsStrExt, OsStringExt};
12use std::path::{Path, PathBuf};
13use std::process::{Command, ExitStatus, Output};
14use std::{fmt, fs};
15
16use postgres;
17use shell_quote::{QuoteExt, Sh};
18pub use sqlx;
19
20use crate::runtime::{
21 strategy::{Strategy, StrategyLike},
22 Runtime,
23};
24use crate::{
25 coordinate::{
26 self,
27 State::{self, *},
28 },
29 version,
30};
31pub use error::ClusterError;
32
/// The name of the `template0` database, which is always present in a
/// PostgreSQL cluster.
///
/// This database is a template database, though it's used to a lesser extent
/// than `template1`.
///
/// `template0` should never be modified, so it's rare to connect to this
/// database, even as a convenient default – see [`DATABASE_TEMPLATE1`] for an
/// explanation as to why.
pub static DATABASE_TEMPLATE0: &str = "template0";
42
/// The name of the `template1` database, which is always present in a
/// PostgreSQL cluster.
///
/// This database is used as the default template for creating new databases.
///
/// Connecting to a database prevents other sessions from creating new databases
/// using that database as a template; see PostgreSQL's [Template Databases][]
/// page to learn more about this limitation. Since `template1` is the default
/// template, connecting to this database prevents other sessions from using a
/// plain `CREATE DATABASE` command. In other words, it may be a good idea to
/// connect to this database _only_ when modifying it, not as a default.
///
/// [Template Databases]:
///     https://www.postgresql.org/docs/current/manage-ag-templatedbs.html
pub static DATABASE_TEMPLATE1: &str = "template1";
57
/// The name of the `postgres` database, which is always created by `initdb`
/// when building a PostgreSQL cluster.
///
/// From `initdb(1)`:
/// > The postgres database is a default database meant for use by users,
/// > utilities and third party applications.
///
/// Given that it can be problematic to connect to `template0` and `template1` –
/// see [`DATABASE_TEMPLATE1`] for an explanation – `postgres` is a convenient
/// default. This module falls back to `postgres` whenever a caller does not
/// name a database, and uses it for administrative tasks such as listing,
/// creating, and dropping databases.
///
/// Unfortunately, `postgres` can be dropped, in which case some of the
/// functionality of this crate will be broken. Ideally we could connect to a
/// PostgreSQL cluster without specifying a database, but that is presently not
/// possible.
pub static DATABASE_POSTGRES: &str = "postgres";
74
/// The state of a cluster as reported by [`Cluster::status`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ClusterStatus {
    /// The server is definitely running.
    Running,
    /// The cluster exists on disk but the server is not running.
    Stopped,
    /// No cluster was found at the data directory's location.
    Missing,
}

impl fmt::Display for ClusterStatus {
    /// Write the lowercase name of the status: `running`, `stopped`, or
    /// `missing`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Running => "running",
            Self::Stopped => "stopped",
            Self::Missing => "missing",
        };
        f.write_str(label)
    }
}
91
/// Representation of a PostgreSQL cluster.
///
/// The cluster may not yet exist on disk. It may exist but be stopped, or it
/// may be running. The methods here can be used to create, start, introspect,
/// stop, and destroy the cluster. There's no protection against concurrent
/// changes to the cluster made by other processes, but the functions in the
/// [`coordinate`][`crate::coordinate`] module may help.
#[derive(Debug)]
pub struct Cluster {
    /// The data directory of the cluster.
    ///
    /// Corresponds to the `PGDATA` environment variable. The methods of this
    /// type also use it as `PGHOST`, i.e. as the directory holding the
    /// server's Unix socket.
    pub datadir: PathBuf,
    /// How to select the PostgreSQL installation to use with this cluster.
    pub strategy: Strategy,
}
108
109impl Cluster {
110 /// Represent a cluster at the given path.
111 pub fn new<P: AsRef<Path>, S: Into<Strategy>>(
112 datadir: P,
113 strategy: S,
114 ) -> Result<Self, ClusterError> {
115 Ok(Self {
116 datadir: datadir.as_ref().to_owned(),
117 strategy: strategy.into(),
118 })
119 }
120
121 /// Determine the runtime to use with this cluster.
122 fn runtime(&self) -> Result<Runtime, ClusterError> {
123 match version(self)? {
124 None => self
125 .strategy
126 .fallback()
127 .ok_or_else(|| ClusterError::RuntimeDefaultNotFound),
128 Some(version) => self
129 .strategy
130 .select(&version.into())
131 .ok_or_else(|| ClusterError::RuntimeNotFound(version)),
132 }
133 }
134
135 /// Return a [`Command`] that will invoke `pg_ctl` with the environment
136 /// referring to this cluster.
137 fn ctl(&self) -> Result<Command, ClusterError> {
138 let mut command = self.runtime()?.execute("pg_ctl");
139 command.env("PGDATA", &self.datadir);
140 command.env("PGHOST", &self.datadir);
141 Ok(command)
142 }
143
144 /// Check if this cluster is running.
145 ///
146 /// Convenient call-through to [`status`][`Self::status`]; only returns
147 /// `true` when the cluster is definitely running.
148 pub fn running(&self) -> Result<bool, ClusterError> {
149 self.status().map(|status| status == ClusterStatus::Running)
150 }
151
152 /// Check the status of this cluster.
153 ///
154 /// Tries to distinguish carefully between "definitely running", "definitely
155 /// not running", "missing", and "don't know". The latter results in
156 /// [`ClusterError`].
157 pub fn status(&self) -> Result<ClusterStatus, ClusterError> {
158 let output = self.ctl()?.arg("status").output()?;
159 let code = match output.status.code() {
160 // Killed by signal; return early.
161 None => return Err(ClusterError::CommandError(output)),
162 // Success; return early (the server is running).
163 Some(0) => return Ok(ClusterStatus::Running),
164 // More work required to decode what this means.
165 Some(code) => code,
166 };
167 let runtime = self.runtime()?;
168 // PostgreSQL has evolved to return different error codes in
169 // later versions, so here we check for specific codes to avoid
170 // masking errors from insufficient permissions or missing
171 // executables, for example.
172 let status = match runtime.version {
173 // PostgreSQL 10.x and later.
174 version::Version::Post10(_major, _minor) => {
175 // PostgreSQL 10
176 // https://www.postgresql.org/docs/10/static/app-pg-ctl.html
177 match code {
178 // 3 means that the data directory is present and
179 // accessible but that the server is not running.
180 3 => Some(ClusterStatus::Stopped),
181 // 4 means that the data directory is not present or is
182 // not accessible. If it's missing, then the server is
183 // not running. If it is present but not accessible
184 // then crash because we can't know if the server is
185 // running or not.
186 4 if !exists(self) => Some(ClusterStatus::Missing),
187 // For anything else we don't know.
188 _ => None,
189 }
190 }
191 // PostgreSQL 9.x only.
192 version::Version::Pre10(9, point, _minor) => {
193 // PostgreSQL 9.4+
194 // https://www.postgresql.org/docs/9.4/static/app-pg-ctl.html
195 // https://www.postgresql.org/docs/9.5/static/app-pg-ctl.html
196 // https://www.postgresql.org/docs/9.6/static/app-pg-ctl.html
197 if point >= 4 {
198 match code {
199 // 3 means that the data directory is present and
200 // accessible but that the server is not running.
201 3 => Some(ClusterStatus::Stopped),
202 // 4 means that the data directory is not present or is
203 // not accessible. If it's missing, then the server is
204 // not running. If it is present but not accessible
205 // then crash because we can't know if the server is
206 // running or not.
207 4 if !exists(self) => Some(ClusterStatus::Missing),
208 // For anything else we don't know.
209 _ => None,
210 }
211 }
212 // PostgreSQL 9.2+
213 // https://www.postgresql.org/docs/9.2/static/app-pg-ctl.html
214 // https://www.postgresql.org/docs/9.3/static/app-pg-ctl.html
215 else if point >= 2 {
216 match code {
217 // 3 means that the data directory is present and
218 // accessible but that the server is not running OR
219 // that the data directory is not present.
220 3 if !exists(self) => Some(ClusterStatus::Missing),
221 3 => Some(ClusterStatus::Stopped),
222 // For anything else we don't know.
223 _ => None,
224 }
225 }
226 // PostgreSQL 9.0+
227 // https://www.postgresql.org/docs/9.0/static/app-pg-ctl.html
228 // https://www.postgresql.org/docs/9.1/static/app-pg-ctl.html
229 else {
230 match code {
231 // 1 means that the server is not running OR the data
232 // directory is not present OR that the data directory
233 // is not accessible.
234 1 if !exists(self) => Some(ClusterStatus::Missing),
235 1 => Some(ClusterStatus::Stopped),
236 // For anything else we don't know.
237 _ => None,
238 }
239 }
240 }
241 // All other versions.
242 version::Version::Pre10(_major, _point, _minor) => None,
243 };
244
245 match status {
246 Some(running) => Ok(running),
247 // TODO: Perhaps include the exit code from `pg_ctl status` in the
248 // error message, and whatever it printed out.
249 None => Err(ClusterError::UnsupportedVersion(runtime.version)),
250 }
251 }
252
253 /// Return the path to the PID file used in this cluster.
254 ///
255 /// The PID file does not necessarily exist.
256 pub fn pidfile(&self) -> PathBuf {
257 self.datadir.join("postmaster.pid")
258 }
259
260 /// Return the path to the log file used in this cluster.
261 ///
262 /// The log file does not necessarily exist.
263 pub fn logfile(&self) -> PathBuf {
264 self.datadir.join("postmaster.log")
265 }
266
267 /// Create the cluster if it does not already exist.
268 pub fn create(&self) -> Result<State, ClusterError> {
269 if exists(self) {
270 // Nothing more to do; the cluster is already in place.
271 Ok(Unmodified)
272 } else {
273 // Create the cluster and report back that we did so.
274 fs::create_dir_all(&self.datadir)?;
275
276 // Construct the `pg_ctl init` command.
277 let mut command = self.ctl()?;
278 #[allow(clippy::suspicious_command_arg_space)]
279 command
280 .arg("init")
281 // Silent; `--silent` flag accepted only in PostgreSQL >=9.2.
282 .arg("-s")
283 // Options for `initdb`; `--options` flag accepted only in PostgreSQL >=10.
284 .arg("-o")
285 // Passing multiple flags in a single `arg(...)` is intentional.
286 // These constitute the single value for the `-o` flag above.
287 .arg("-E utf8 --locale C -A trust")
288 .env("TZ", "UTC");
289
290 bugs::retry_pg_ctl(&mut command, |_| Ok(()))
291 }
292 }
293
294 /// Start the cluster if it's not already running, with the given options.
295 ///
296 /// Returns [`State::Unmodified`] if the cluster is already running, meaning
297 /// the given options were **NOT** applied.
298 pub fn start(
299 &self,
300 options: &[(config::Parameter, config::Value)],
301 ) -> Result<State, ClusterError> {
302 // Ensure that the cluster has been created.
303 self.create()?;
304 // Check if we're running already.
305 if self.running()? {
306 // We didn't start this cluster; say so.
307 return Ok(Unmodified);
308 }
309 // Construct the options that `pg_ctl` will pass through to `postgres`.
310 // These have to be carefully escaped for the target shell – which is
311 // likely to be `sh`. Here's what they mean:
312 // -h <arg> -- host name; empty arg means Unix socket only.
313 // -k -- socket directory.
314 // -c name=value -- set a configuration parameter.
315 let options = {
316 let mut arg: Vec<u8> = b"-h '' -k ".into();
317 arg.push_quoted(Sh, &self.datadir);
318 for (parameter, value) in options {
319 arg.extend(b" -c ");
320 arg.push_quoted(Sh, &format!("{parameter}={value}",));
321 }
322 OsString::from_vec(arg)
323 };
324
325 // Track the logs so that we can see if there's a retryable failure.
326 let logfile = self.logfile();
327 let logfile_name = logfile
328 .file_name()
329 .unwrap_or_else(|| "<unknown.log>".as_ref())
330 .display();
331 let mut log: logfile::LogFile = logfile.as_path().try_into()?;
332
333 // Next, invoke `pg_ctl` to start the cluster.
334 let mut command = self.ctl()?;
335 // -l <file> -- log file.
336 // -s -- no informational messages.
337 // -w -- wait until startup is complete.
338 // -o <string> -- options to pass through to `postgres`.
339 command
340 .arg("start")
341 .arg("-l")
342 .arg(&logfile)
343 .arg("-s")
344 .arg("-w")
345 .arg("-o")
346 .arg(options);
347
348 // Append new logs to the command's `Output` so that the retry machinery
349 // has visibility of it.
350 let append_logs_to_stderr = |output: &mut Output| {
351 writeln!(&mut output.stderr)?;
352 writeln!(&mut output.stderr, "-- {logfile_name} --")?;
353 log.read_to_end(&mut output.stderr)?;
354 Ok(())
355 };
356
357 bugs::retry_pg_ctl(&mut command, append_logs_to_stderr)
358 }
359
360 /// Connect to this cluster.
361 ///
362 /// When the database is not specified, connects to [`DATABASE_POSTGRES`].
363 fn connect(&self, database: Option<&str>) -> Result<postgres::Client, ClusterError> {
364 let user = crate::util::current_user()?;
365 let host = self.datadir.to_string_lossy(); // postgres crate API limitation.
366 let client = postgres::Client::configure()
367 .host(&host)
368 .dbname(database.unwrap_or(DATABASE_POSTGRES))
369 .user(&user)
370 .connect(postgres::NoTls)?;
371 Ok(client)
372 }
373
374 /// Create a lazy SQLx pool for this cluster.
375 ///
376 /// Although it's possible to call this anywhere, at runtime it needs a
377 /// Tokio context to work, e.g.:
378 ///
379 /// ```rust,no_run
380 /// # use pgdo::cluster::ClusterError;
381 /// # let runtime = pgdo::runtime::strategy::Strategy::default();
382 /// # let cluster = pgdo::cluster::Cluster::new("some/where", runtime)?;
383 /// let tokio = tokio::runtime::Runtime::new()?;
384 /// let rows = tokio.block_on(async {
385 /// let pool = cluster.pool(None)?;
386 /// let rows = sqlx::query("SELECT 1").fetch_all(&pool).await?;
387 /// Ok::<_, ClusterError>(rows)
388 /// })?;
389 /// # Ok::<(), ClusterError>(())
390 /// ```
391 ///
392 /// When the database is not specified, connects to [`DATABASE_POSTGRES`].
393 pub fn pool(&self, database: Option<&str>) -> Result<sqlx::PgPool, ClusterError> {
394 Ok(sqlx::PgPool::connect_lazy_with(
395 sqlx::postgres::PgConnectOptions::new()
396 .socket(&self.datadir)
397 .database(database.unwrap_or(DATABASE_POSTGRES))
398 .username(&crate::util::current_user()?)
399 .application_name("pgdo"),
400 ))
401 }
402
403 /// Return a URL for this cluster, if possible.
404 ///
405 /// It is not possible to return a URL for a cluster when `self.datadir` is
406 /// not valid UTF-8, in which case `Ok(None)` is returned.
407 fn url(&self, database: &str) -> Result<Option<url::Url>, url::ParseError> {
408 match self.datadir.to_str() {
409 Some(datadir) => url::Url::parse_with_params(
410 "postgresql://",
411 [("host", datadir), ("dbname", database)],
412 )
413 .map(Some),
414 None => Ok(None),
415 }
416 }
417
418 /// Run `psql` against this cluster, in the given database.
419 ///
420 /// When the database is not specified, connects to [`DATABASE_POSTGRES`].
421 pub fn shell(&self, database: Option<&str>) -> Result<ExitStatus, ClusterError> {
422 let mut command = self.runtime()?.execute("psql");
423 self.set_env(command.arg("--quiet"), database)?;
424 Ok(command.spawn()?.wait()?)
425 }
426
427 /// Run the given command against this cluster.
428 ///
429 /// The command is run with the `PGDATA`, `PGHOST`, and `PGDATABASE`
430 /// environment variables set appropriately.
431 ///
432 /// When the database is not specified, uses [`DATABASE_POSTGRES`].
433 pub fn exec<T: AsRef<OsStr>>(
434 &self,
435 database: Option<&str>,
436 command: T,
437 args: &[T],
438 ) -> Result<ExitStatus, ClusterError> {
439 let mut command = self.runtime()?.command(command);
440 self.set_env(command.args(args), database)?;
441 Ok(command.spawn()?.wait()?)
442 }
443
444 /// Set the environment variables for this cluster.
445 fn set_env(&self, command: &mut Command, database: Option<&str>) -> Result<(), ClusterError> {
446 let database = database.unwrap_or(DATABASE_POSTGRES);
447
448 // Set a few standard PostgreSQL environment variables.
449 command.env("PGDATA", &self.datadir);
450 command.env("PGHOST", &self.datadir);
451 command.env("PGDATABASE", database);
452
453 // Set `DATABASE_URL` if `self.datadir` is valid UTF-8, otherwise ensure
454 // that `DATABASE_URL` is erased from the command's environment.
455 match self.url(database)? {
456 Some(url) => command.env("DATABASE_URL", url.as_str()),
457 None => command.env_remove("DATABASE_URL"),
458 };
459
460 Ok(())
461 }
462
463 /// The names of databases in this cluster.
464 pub fn databases(&self) -> Result<Vec<String>, ClusterError> {
465 let mut conn = self.connect(None)?;
466 let rows = conn.query(
467 "SELECT datname FROM pg_catalog.pg_database ORDER BY datname",
468 &[],
469 )?;
470 let datnames: Vec<String> = rows.iter().map(|row| row.get(0)).collect();
471 Ok(datnames)
472 }
473
474 /// Create the named database.
475 ///
476 /// Returns [`Unmodified`] if the database already exists, otherwise it
477 /// returns [`Modified`].
478 pub fn createdb(&self, database: &str) -> Result<State, ClusterError> {
479 use postgres::error::SqlState;
480 let statement = format!(
481 "CREATE DATABASE {}",
482 postgres_protocol::escape::escape_identifier(database)
483 );
484 match self.connect(None)?.execute(statement.as_str(), &[]) {
485 Err(err) if err.code() == Some(&SqlState::DUPLICATE_DATABASE) => Ok(Unmodified),
486 Err(err) => Err(err)?,
487 Ok(_) => Ok(Modified),
488 }
489 }
490
491 /// Drop the named database.
492 ///
493 /// Returns [`Unmodified`] if the database does not exist, otherwise it
494 /// returns [`Modified`].
495 pub fn dropdb(&self, database: &str) -> Result<State, ClusterError> {
496 use postgres::error::SqlState;
497 let statement = format!(
498 "DROP DATABASE {}",
499 postgres_protocol::escape::escape_identifier(database)
500 );
501 match self.connect(None)?.execute(statement.as_str(), &[]) {
502 Err(err) if err.code() == Some(&SqlState::UNDEFINED_DATABASE) => Ok(Unmodified),
503 Err(err) => Err(err)?,
504 Ok(_) => Ok(Modified),
505 }
506 }
507
508 /// Stop the cluster if it's running.
509 pub fn stop(&self) -> Result<State, ClusterError> {
510 // If the cluster's not already running, don't do anything.
511 if !self.running()? {
512 return Ok(Unmodified);
513 }
514 // pg_ctl options:
515 // -w -- wait for shutdown to complete.
516 // -m <mode> -- shutdown mode.
517 let output = self
518 .ctl()?
519 .arg("stop")
520 .arg("-s")
521 .arg("-w")
522 .arg("-m")
523 .arg("fast")
524 .output()?;
525
526 if output.status.success() {
527 Ok(Modified) // We did actually stop the cluster; say so.
528 } else {
529 Err(ClusterError::CommandError(output))
530 }
531 }
532
533 /// Destroy the cluster if it exists, after stopping it.
534 pub fn destroy(&self) -> Result<State, ClusterError> {
535 self.stop()?;
536 match fs::remove_dir_all(&self.datadir) {
537 Ok(()) => Ok(Modified),
538 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(Unmodified),
539 Err(err) => Err(err)?,
540 }
541 }
542}
543
544impl AsRef<Path> for Cluster {
545 fn as_ref(&self) -> &Path {
546 &self.datadir
547 }
548}
549
/// A quick and fairly simplistic check for a PostgreSQL cluster: is `datadir`
/// a directory, and does it contain a file named `PG_VERSION`?
///
/// [`version()`] provides a more reliable measure, plus yields the version of
/// PostgreSQL required to use the cluster.
pub fn exists<P: AsRef<Path>>(datadir: P) -> bool {
    let root = datadir.as_ref();
    if !root.is_dir() {
        return false;
    }
    root.join("PG_VERSION").is_file()
}
560
561/// Yields the version of PostgreSQL required to use a cluster.
562///
563/// This returns the version from the file named `PG_VERSION` in the data
564/// directory if it exists, otherwise this returns `None`. For PostgreSQL
565/// versions before 10 this is typically (maybe always) the major and point
566/// version, e.g. 9.4 rather than 9.4.26. For version 10 and above it appears to
567/// be just the major number, e.g. 14 rather than 14.2.
568pub fn version<P: AsRef<Path>>(
569 datadir: P,
570) -> Result<Option<version::PartialVersion>, ClusterError> {
571 let version_file = datadir.as_ref().join("PG_VERSION");
572 match std::fs::read_to_string(version_file) {
573 Ok(version) => Ok(Some(version.parse()?)),
574 Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
575 Err(err) => Err(err)?,
576 }
577}
578
579/// Determine the names of superuser roles in a cluster (that can log in).
580///
581/// It may not be possible to even connect to a running cluster when you don't
582/// know a role to use.
583///
584/// This gets around the problem by launching the cluster in single-user mode
585/// and matching the output of a single query of the `pg_roles` table. It's
586/// hacky and fragile but it may work for you.
587///
588/// If no superusers are found, this returns an error containing the output from
589/// the `postgres` process.
590///
591/// # Panics
592///
593/// This function panics if the regular expression used to match the output does
594/// not compile; that's a bug and should never occur in a release build.
595///
596/// It can also panic if the thread that writes to the single-user `postgres`
597/// process itself panics, but under normal circumstances that also should never
598/// happen.
599///
600pub fn determine_superuser_role_names(
601 cluster: &Cluster,
602) -> Result<std::collections::HashSet<String>, ClusterError> {
603 use regex::Regex;
604 use std::io::Write;
605 use std::panic::panic_any;
606 use std::process::Stdio;
607 use std::sync::LazyLock;
608
609 static QUERY: &[u8] = b"select rolname from pg_roles where rolsuper and rolcanlogin\n";
610 static RE: LazyLock<Regex> = LazyLock::new(|| {
611 Regex::new(r#"\brolname\s*=\s*"(.+)""#)
612 .expect("invalid regex (for matching single-user role names)")
613 });
614
615 let mut child = cluster
616 .runtime()?
617 .execute("postgres")
618 .arg("--single")
619 .arg("-D")
620 .arg(&cluster.datadir)
621 .arg("postgres")
622 .stdin(Stdio::piped())
623 .stdout(Stdio::piped())
624 .stderr(Stdio::piped())
625 .spawn()?;
626
627 let mut stdin = child.stdin.take().expect("could not take stdin");
628 let writer = std::thread::spawn(move || stdin.write_all(QUERY));
629 let output = child.wait_with_output()?;
630 let stdout = String::from_utf8_lossy(&output.stdout);
631 let superusers: std::collections::HashSet<_> = RE
632 .captures_iter(&stdout)
633 .filter_map(|capture| capture.get(1))
634 .map(|m| m.as_str().to_owned())
635 .collect();
636
637 match writer.join() {
638 Err(err) => panic_any(err),
639 Ok(result) => result?,
640 }
641
642 if superusers.is_empty() {
643 return Err(ClusterError::CommandError(output));
644 }
645
646 Ok(superusers)
647}
648
/// Configuration to apply when starting a cluster: a slice of
/// parameter/value pairs, as accepted by [`Cluster::start`].
pub type Options<'a> = &'a [(config::Parameter<'a>, config::Value)];
650
651/// [`Cluster`] can be coordinated.
652impl coordinate::Subject for Cluster {
653 type Error = ClusterError;
654 type Options<'a> = Options<'a>;
655
656 fn start(&self, options: Self::Options<'_>) -> Result<State, Self::Error> {
657 self.start(options)
658 }
659
660 fn stop(&self) -> Result<State, Self::Error> {
661 self.stop()
662 }
663
664 fn destroy(&self) -> Result<State, Self::Error> {
665 self.destroy()
666 }
667
668 fn exists(&self) -> Result<bool, Self::Error> {
669 Ok(exists(self))
670 }
671
672 fn running(&self) -> Result<bool, Self::Error> {
673 self.running()
674 }
675}
676
/// Namespace for the version 5 UUIDs that [`run`] derives from a cluster's
/// canonicalized data directory path in order to name its lock file.
// The literal is an opaque identifier, so digit separators would not make it
// any more readable.
#[allow(clippy::unreadable_literal)]
const UUID_NS: uuid::Uuid = uuid::Uuid::from_u128(93875103436633470414348750305797058811);
679
/// A [`coordinate::guard::Guard`] for a [`Cluster`], as returned by [`run`].
pub type ClusterGuard = coordinate::guard::Guard<Cluster>;
681
682/// Create and start a cluster at the given path, with the given options.
683///
684/// Uses the default runtime strategy. Returns a guard which will stop the
685/// cluster when it's dropped.
686pub fn run<P: AsRef<Path>>(
687 path: P,
688 options: Options<'_>,
689) -> Result<ClusterGuard, coordinate::CoordinateError<ClusterError>> {
690 let path = path.as_ref();
691 // We have to create the data directory so that we can canonicalize its
692 // location. This is because we use the data directory's path as the basis
693 // for the lock file's name. This is duplicative – `Cluster::create` also
694 // creates the data directory – but necessary.
695 fs::create_dir_all(path)?;
696 let path = path.canonicalize()?;
697
698 let strategy = crate::runtime::strategy::Strategy::default();
699 let cluster = crate::cluster::Cluster::new(&path, strategy)?;
700
701 let lock_name = path.as_os_str().as_bytes();
702 let lock_uuid = uuid::Uuid::new_v5(&UUID_NS, lock_name);
703 let lock = crate::lock::UnlockedFile::try_from(&lock_uuid)?;
704
705 ClusterGuard::startup(lock, cluster, options)
706}
707
708// ----------------------------------------------------------------------
709
mod logfile {
    use std::fs::File;
    use std::io::{self, ErrorKind::NotFound, Read, Seek};
    use std::path::{Path, PathBuf};

    /// Reader for a log file that may be appended to repeatedly.
    ///
    /// For example, `postmaster.log` may be written to each time we call
    /// `pg_ctl start`. If it succeeds, a spawned `postgres` process will
    /// continue writing to the log indefinitely, even after `pg_ctl` finishes.
    /// If it fails, however, it will write out the cause to this log. It's this
    /// information that we want to capture so that we can use it for deciding
    /// whether or not to retry a call to `pg_ctl`.
    ///
    /// If the log file exists when a [`LogFile`] is created, reading starts at
    /// the file's current end, so only content appended afterwards is seen.
    /// The log file does not need to exist, though, and [`LogFile`] never
    /// creates it: [`Read::read`] returns `Ok(0)` for as long as the file is
    /// missing, and once it appears it is read from its beginning.
    ///
    /// Limitations of [`LogFile`]:
    /// - It keeps the file handle open, meaning that if the log file is deleted
    ///   and reopened, this will not see new logs.
    /// - It will not detect if the log is truncated; again, this will not see
    ///   new logs.
    pub struct LogFile {
        /// Where the log file is, or will be.
        path: PathBuf,
        /// The open log file, once it has been found to exist.
        file: Option<File>,
    }

    impl TryFrom<&Path> for LogFile {
        type Error = io::Error;

        /// Start following the log at `path`, skipping what it already holds.
        fn try_from(path: &Path) -> Result<Self, Self::Error> {
            let file = match File::open(path) {
                Ok(mut file) => {
                    // Existing content predates us; start from the end.
                    file.seek(io::SeekFrom::End(0))?;
                    Some(file)
                }
                Err(err) if err.kind() == NotFound => None,
                Err(err) => return Err(err),
            };
            Ok(Self {
                path: path.to_path_buf(),
                file,
            })
        }
    }

    impl Read for LogFile {
        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
            if self.file.is_none() {
                // Not opened yet; see whether the log has appeared since.
                match File::open(&self.path) {
                    Ok(file) => self.file = Some(file),
                    Err(err) if err.kind() == NotFound => return Ok(0),
                    Err(err) => return Err(err),
                }
            }
            self.file.as_mut().map_or(Ok(0), |file| file.read(buf))
        }
    }
}
772
773mod bugs {
774 use super::{ClusterError, State, State::Modified};
775 use regex::bytes::Regex;
776 use std::process::{Command, Output};
777 use std::{fmt::Write, sync, time::Duration};
778
779 /// Work around bugs in `pg_ctl`.
780 ///
781 /// In all presently released versions of PostgreSQL, `pg_ctl init` can fail
782 /// on some platforms (macOS, some BSDs) due to a semget(2) failure. This is
783 /// a particular problem in PostgreSQL 17.x (see the linked bug report below
784 /// to understand why). We try to work around this by retrying the command a
785 /// few times with a short delay between each attempt.
786 ///
787 /// We detect whether we want to retry by checking `pg_ctl`'s output for log
788 /// messages like these:
789 ///
790 /// FATAL: could not create semaphores: Invalid argument DETAIL: Failed
791 /// system call was semget(189980241, 20, 03600).
792 ///
793 /// See the [initial bug
794 /// report](https://www.postgresql.org/message-id/CALL7chmzY3eXHA7zHnODUVGZLSvK3wYCSP0RmcDFHJY8f28Q3g@mail.gmail.com)
795 /// and its thread for _much_ more detail.
796 pub fn retry_pg_ctl(
797 command: &mut Command,
798 mut supplement: impl FnMut(&mut Output) -> Result<(), ClusterError>,
799 ) -> Result<State, ClusterError> {
800 static SEMGET_BUG_LOG: sync::Once = sync::Once::new();
801 static SEMGET_BUG_RE1: sync::LazyLock<Regex> = sync::LazyLock::new(|| {
802 Regex::new(r"\bFATAL:\s+could not create semaphores: Invalid argument\b")
803 .expect("invalid regex (for matching semaphore errors #1)")
804 });
805 static SEMGET_BUG_RE2: sync::LazyLock<Regex> = sync::LazyLock::new(|| {
806 Regex::new(r"\bDETAIL:\s+Failed system call was semget\b")
807 .expect("invalid regex (for matching semaphore errors #2)")
808 });
809
810 fn is_retryable(logs: &[u8]) -> bool {
811 SEMGET_BUG_RE1.is_match(logs) && SEMGET_BUG_RE2.is_match(logs)
812 }
813
814 // Capture which `pg_ctl` command we're running – assume it's the first
815 // argument – for use with `notify` later on.
816 let command_summary = command.get_args().take(1).fold(
817 command.get_program().display().to_string(),
818 |mut summary, arg| {
819 write!(&mut summary, " {}", arg.display()).ok();
820 summary
821 },
822 );
823
824 // Function that attempts to run `command`.
825 let run = || {
826 use backoff::Error;
827 use ClusterError::{CommandError, IoError};
828 match command.output() {
829 Ok(output) if output.status.success() => Ok(Modified),
830 Ok(mut output) => {
831 supplement(&mut output)?;
832 if is_retryable(output.stderr.as_slice()) {
833 Err(Error::transient(CommandError(output)))
834 } else {
835 Err(Error::permanent(CommandError(output)))
836 }
837 }
838 Err(err) => Err(Error::permanent(IoError(err))),
839 }
840 };
841
842 // Function that notifies when `pg_ctl` fails transiently.
843 let notify = move |_, delay: Duration| {
844 SEMGET_BUG_LOG.call_once(|| {
845 log::info!(concat!(
846 "In all presently released versions of PostgreSQL, `pg_ctl` can fail on ",
847 "some platforms (macOS, some BSDs) due to a semget(2) failure. This is a ",
848 "particular problem in PostgreSQL 17.x (the linked bug report has more ",
849 "information). As an imperfect workaround, `pgdo` retries the command a ",
850 "few times when it detects this specific error. Original bug report: ",
851 "https://www.postgresql.org/message-id/CALL7chmzY3eXHA7zHnODUVGZLSvK3wYCSP0RmcDFHJY8f28Q3g@mail.gmail.com.",
852 ));
853 });
854 log::warn!("`{command_summary}` failed; retrying in {delay:?}…",);
855 };
856
857 // Retry with exponential backoff + jitter.
858 let state = backoff::retry_notify(
859 backoff::ExponentialBackoffBuilder::new()
860 .with_initial_interval(Duration::from_millis(200))
861 .with_max_elapsed_time(Some(Duration::from_secs(60)))
862 .with_max_interval(Duration::from_millis(10000))
863 .build(),
864 run,
865 notify,
866 )
867 .map_err(|err| match err {
868 backoff::Error::Permanent(err) => err,
869 backoff::Error::Transient { err, .. } => err,
870 })?;
871
872 Ok(state)
873 }
874}