pg_embedded_setup_unpriv/cluster/
mod.rs

1//! RAII wrapper that boots an embedded `PostgreSQL` instance for tests.
2//!
3//! The cluster starts during [`TestCluster::new`] and shuts down automatically when the
4//! value drops out of scope.
5//!
6//! # Examples
7//! ```no_run
8//! use pg_embedded_setup_unpriv::TestCluster;
9//!
10//! # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
11//! let cluster = TestCluster::new()?;
12//! let url = cluster.settings().url("my_database");
13//! // Perform test database work here.
14//! drop(cluster); // `PostgreSQL` stops automatically.
15//! # Ok(())
16//! # }
17//! ```
18
19mod connection;
20mod delegation;
21mod lifecycle;
22mod runtime;
23mod temporary_database;
24mod worker_invoker;
25mod worker_operation;
26
27pub use self::connection::{ConnectionMetadata, TestClusterConnection};
28pub use self::lifecycle::DatabaseName;
29pub use self::temporary_database::TemporaryDatabase;
30#[cfg(any(doc, test, feature = "cluster-unit-tests", feature = "dev-worker"))]
31pub use self::worker_invoker::WorkerInvoker;
32#[doc(hidden)]
33pub use self::worker_operation::WorkerOperation;
34
35use self::runtime::build_runtime;
36use self::worker_invoker::WorkerInvoker as ClusterWorkerInvoker;
37use crate::bootstrap_for_tests;
38use crate::env::ScopedEnv;
39use crate::error::BootstrapResult;
40use crate::observability::LOG_TARGET;
41use crate::{ExecutionPrivileges, TestBootstrapEnvironment, TestBootstrapSettings};
42use postgresql_embedded::{PostgreSQL, Settings};
43use std::fmt::Display;
44use tokio::runtime::Runtime;
45use tokio::time;
46use tracing::{info, info_span};
47
48/// Embedded `PostgreSQL` instance whose lifecycle follows Rust's drop semantics.
49#[derive(Debug)]
50pub struct TestCluster {
51    runtime: Runtime,
52    postgres: Option<PostgreSQL>,
53    bootstrap: TestBootstrapSettings,
54    is_managed_via_worker: bool,
55    env_vars: Vec<(String, Option<String>)>,
56    worker_guard: Option<ScopedEnv>,
57    _env_guard: ScopedEnv,
58    // Keeps the cluster span alive for the lifetime of the guard.
59    _cluster_span: tracing::Span,
60}
61
62struct StartupOutcome {
63    bootstrap: TestBootstrapSettings,
64    postgres: Option<PostgreSQL>,
65    is_managed_via_worker: bool,
66}
67
68impl TestCluster {
69    /// Boots a `PostgreSQL` instance configured by [`bootstrap_for_tests`].
70    ///
71    /// The constructor blocks until the underlying server process is running and returns an
72    /// error when startup fails.
73    ///
74    /// # Errors
75    /// Returns an error if the bootstrap configuration cannot be prepared or if starting the
76    /// embedded cluster fails.
77    pub fn new() -> BootstrapResult<Self> {
78        let span = info_span!(target: LOG_TARGET, "test_cluster");
79        let (runtime, env_vars, env_guard, outcome) = {
80            let _entered = span.enter();
81            let initial_bootstrap = bootstrap_for_tests()?;
82            let runtime = build_runtime()?;
83            let env_vars = initial_bootstrap.environment.to_env();
84            let env_guard = ScopedEnv::apply(&env_vars);
85            let outcome = Self::start_postgres(&runtime, initial_bootstrap, &env_vars)?;
86            (runtime, env_vars, env_guard, outcome)
87        };
88
89        Ok(Self {
90            runtime,
91            postgres: outcome.postgres,
92            bootstrap: outcome.bootstrap,
93            is_managed_via_worker: outcome.is_managed_via_worker,
94            env_vars,
95            worker_guard: None,
96            _env_guard: env_guard,
97            _cluster_span: span,
98        })
99    }
100
101    #[expect(
102        clippy::cognitive_complexity,
103        reason = "privilege-aware lifecycle setup requires explicit branching for observability"
104    )]
105    fn start_postgres(
106        runtime: &Runtime,
107        mut bootstrap: TestBootstrapSettings,
108        env_vars: &[(String, Option<String>)],
109    ) -> BootstrapResult<StartupOutcome> {
110        let privileges = bootstrap.privileges;
111        let mut embedded = PostgreSQL::new(bootstrap.settings.clone());
112        info!(
113            target: LOG_TARGET,
114            privileges = ?privileges,
115            mode = ?bootstrap.execution_mode,
116            "starting embedded postgres lifecycle"
117        );
118
119        let invoker = ClusterWorkerInvoker::new(runtime, &bootstrap, env_vars);
120        Self::invoke_lifecycle(&invoker, &mut embedded)?;
121
122        let is_managed_via_worker = matches!(privileges, ExecutionPrivileges::Root);
123        let postgres =
124            Self::prepare_postgres_handle(is_managed_via_worker, &mut bootstrap, embedded);
125
126        info!(
127            target: LOG_TARGET,
128            privileges = ?privileges,
129            worker_managed = is_managed_via_worker,
130            "embedded postgres started"
131        );
132        Ok(StartupOutcome {
133            bootstrap,
134            postgres,
135            is_managed_via_worker,
136        })
137    }
138
139    fn prepare_postgres_handle(
140        is_managed_via_worker: bool,
141        bootstrap: &mut TestBootstrapSettings,
142        embedded: PostgreSQL,
143    ) -> Option<PostgreSQL> {
144        if is_managed_via_worker {
145            None
146        } else {
147            bootstrap.settings = embedded.settings().clone();
148            Some(embedded)
149        }
150    }
151
152    fn invoke_lifecycle(
153        invoker: &ClusterWorkerInvoker<'_>,
154        embedded: &mut PostgreSQL,
155    ) -> BootstrapResult<()> {
156        invoker.invoke(worker_operation::WorkerOperation::Setup, async {
157            embedded.setup().await
158        })?;
159        invoker.invoke(worker_operation::WorkerOperation::Start, async {
160            embedded.start().await
161        })
162    }
163
164    /// Extends the cluster lifetime to cover additional scoped environment guards.
165    ///
166    /// Primarily used by fixtures that need to ensure `PG_EMBEDDED_WORKER` remains set for the
167    /// duration of the cluster lifetime.
168    #[doc(hidden)]
169    #[must_use]
170    pub fn with_worker_guard(mut self, worker_guard: Option<ScopedEnv>) -> Self {
171        self.worker_guard = worker_guard;
172        self
173    }
174
175    /// Returns the prepared `PostgreSQL` settings for the running cluster.
176    pub const fn settings(&self) -> &Settings {
177        &self.bootstrap.settings
178    }
179
180    /// Returns the environment required for clients to interact with the cluster.
181    pub const fn environment(&self) -> &TestBootstrapEnvironment {
182        &self.bootstrap.environment
183    }
184
185    /// Returns the bootstrap metadata captured when the cluster was started.
186    pub const fn bootstrap(&self) -> &TestBootstrapSettings {
187        &self.bootstrap
188    }
189
190    /// Returns helper methods for constructing connection artefacts.
191    ///
192    /// # Examples
193    /// ```no_run
194    /// use pg_embedded_setup_unpriv::TestCluster;
195    ///
196    /// # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
197    /// let cluster = TestCluster::new()?;
198    /// let metadata = cluster.connection().metadata();
199    /// println!(
200    ///     "postgresql://{}:***@{}:{}/postgres",
201    ///     metadata.superuser(),
202    ///     metadata.host(),
203    ///     metadata.port(),
204    /// );
205    /// # Ok(())
206    /// # }
207    /// ```
208    #[must_use]
209    pub fn connection(&self) -> TestClusterConnection {
210        TestClusterConnection::new(&self.bootstrap)
211    }
212
213    fn stop_context(settings: &Settings) -> String {
214        let data_dir = settings.data_dir.display();
215        let version = settings.version.to_string();
216        format!("version {version}, data_dir {data_dir}")
217    }
218
219    fn warn_stop_failure(context: &str, err: &impl Display) {
220        tracing::warn!(
221            "SKIP-TEST-CLUSTER: failed to stop embedded postgres instance ({}): {}",
222            context,
223            err
224        );
225    }
226
227    fn warn_stop_timeout(timeout_secs: u64, context: &str) {
228        tracing::warn!(
229            "SKIP-TEST-CLUSTER: stop() timed out after {timeout_secs}s ({context}); proceeding with drop"
230        );
231    }
232}
233
234#[cfg(test)]
235mod tests {
236    use std::ffi::OsString;
237
238    use super::*;
239    use crate::ExecutionPrivileges;
240    use crate::test_support::{dummy_settings, scoped_env};
241
242    #[test]
243    fn with_worker_guard_restores_environment() {
244        const KEY: &str = "PG_EMBEDDED_WORKER_GUARD_TEST";
245        let baseline = std::env::var(KEY).ok();
246        let guard = scoped_env(vec![(OsString::from(KEY), Some(OsString::from("guarded")))]);
247        let cluster = dummy_cluster().with_worker_guard(Some(guard));
248        assert_eq!(
249            std::env::var(KEY).as_deref(),
250            Ok("guarded"),
251            "worker guard should remain active whilst the cluster runs",
252        );
253        drop(cluster);
254        match baseline {
255            Some(value) => assert_eq!(
256                std::env::var(KEY).as_deref(),
257                Ok(value.as_str()),
258                "worker guard should restore the previous value"
259            ),
260            None => assert!(
261                std::env::var(KEY).is_err(),
262                "worker guard should unset the variable once the cluster drops"
263            ),
264        }
265    }
266
267    fn dummy_cluster() -> TestCluster {
268        let runtime = tokio::runtime::Builder::new_current_thread()
269            .enable_all()
270            .build()
271            .expect("test runtime");
272        let span = info_span!(target: LOG_TARGET, "test_cluster");
273        let bootstrap = dummy_settings(ExecutionPrivileges::Unprivileged);
274        let env_vars = bootstrap.environment.to_env();
275        let env_guard = ScopedEnv::apply(&env_vars);
276        TestCluster {
277            runtime,
278            postgres: None,
279            bootstrap,
280            is_managed_via_worker: false,
281            env_vars,
282            worker_guard: None,
283            _env_guard: env_guard,
284            _cluster_span: span,
285        }
286    }
287}
288
289impl Drop for TestCluster {
290    #[expect(
291        clippy::cognitive_complexity,
292        reason = "drop path must branch between worker and in-process shutdown with logging"
293    )]
294    fn drop(&mut self) {
295        let context = Self::stop_context(&self.bootstrap.settings);
296        info!(
297            target: LOG_TARGET,
298            context = %context,
299            worker_managed = self.is_managed_via_worker,
300            "stopping embedded postgres cluster"
301        );
302
303        if self.is_managed_via_worker {
304            let invoker = ClusterWorkerInvoker::new(&self.runtime, &self.bootstrap, &self.env_vars);
305            if let Err(err) = invoker.invoke_as_root(worker_operation::WorkerOperation::Stop) {
306                Self::warn_stop_failure(&context, &err);
307            }
308        } else if let Some(postgres) = self.postgres.take() {
309            let timeout = self.bootstrap.shutdown_timeout;
310            let timeout_secs = timeout.as_secs();
311            let outcome = self
312                .runtime
313                .block_on(async { time::timeout(timeout, postgres.stop()).await });
314
315            match outcome {
316                Ok(Ok(())) => {}
317                Ok(Err(err)) => Self::warn_stop_failure(&context, &err),
318                Err(_) => Self::warn_stop_timeout(timeout_secs, &context),
319            }
320        }
321        // Environment guards drop after this block, restoring the process state.
322    }
323}
324
325#[cfg(all(test, feature = "cluster-unit-tests"))]
326mod drop_logging_tests {
327    use super::*;
328    use crate::test_support::capture_warn_logs;
329
330    #[test]
331    fn warn_stop_timeout_emits_warning() {
332        let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_timeout(5, "ctx"));
333        assert!(
334            logs.iter()
335                .any(|line| line.contains("stop() timed out after 5s (ctx)")),
336            "expected timeout warning, got {logs:?}"
337        );
338    }
339
340    #[test]
341    fn warn_stop_failure_emits_warning() {
342        let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_failure("ctx", &"boom"));
343        assert!(
344            logs.iter()
345                .any(|line| line.contains("failed to stop embedded postgres instance")),
346            "expected failure warning, got {logs:?}"
347        );
348    }
349}
350
351#[cfg(all(test, not(feature = "cluster-unit-tests")))]
352#[path = "../../tests/test_cluster.rs"]
353mod tests;