pg_embedded_setup_unpriv/cluster/
mod.rs

1//! RAII wrapper that boots an embedded `PostgreSQL` instance for tests.
2//!
3//! The cluster starts during [`TestCluster::new`] and shuts down automatically when the
4//! value drops out of scope.
5//!
6//! # Synchronous API
7//!
8//! Use [`TestCluster::new`] from synchronous contexts or when you want the cluster to
9//! own its own Tokio runtime:
10//!
11//! ```no_run
12//! use pg_embedded_setup_unpriv::TestCluster;
13//!
14//! # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
15//! let cluster = TestCluster::new()?;
16//! let url = cluster.settings().url("my_database");
17//! // Perform test database work here.
18//! drop(cluster); // `PostgreSQL` stops automatically.
19//! # Ok(())
20//! # }
21//! ```
22//!
23//! # Async API
24//!
25//! When running within an existing async runtime (e.g., `#[tokio::test]`), use
26//! [`TestCluster::start_async`] to avoid the "Cannot start a runtime from within a
27//! runtime" panic that occurs when nesting Tokio runtimes:
28//!
29//! ```ignore
30//! use pg_embedded_setup_unpriv::TestCluster;
31//!
32//! #[tokio::test]
33//! async fn test_with_embedded_postgres() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
34//!     let cluster = TestCluster::start_async().await?;
35//!     let url = cluster.settings().url("my_database");
36//!     // ... async database operations ...
37//!     cluster.stop_async().await?;
38//!     Ok(())
39//! }
40//! ```
41//!
42//! The async API requires the `async-api` feature flag:
43//!
44//! ```toml
45//! [dependencies]
46//! pg-embedded-setup-unpriv = { version = "...", features = ["async-api"] }
47//! ```
48
49mod cache_integration;
50mod connection;
51mod delegation;
52mod installation;
53mod lifecycle;
54mod runtime;
55mod runtime_mode;
56mod shutdown;
57mod startup;
58mod temporary_database;
59mod worker_invoker;
60mod worker_operation;
61
62pub use self::connection::{ConnectionMetadata, TestClusterConnection};
63pub use self::lifecycle::DatabaseName;
64pub use self::temporary_database::TemporaryDatabase;
65#[cfg(any(doc, test, feature = "cluster-unit-tests", feature = "dev-worker"))]
66pub use self::worker_invoker::WorkerInvoker;
67#[doc(hidden)]
68pub use self::worker_operation::WorkerOperation;
69
70use self::runtime::build_runtime;
71use self::runtime_mode::ClusterRuntime;
72#[cfg(feature = "async-api")]
73use self::startup::start_postgres_async;
74use self::startup::{cache_config_from_bootstrap, start_postgres};
75use crate::bootstrap_for_tests;
76use crate::env::ScopedEnv;
77use crate::error::BootstrapResult;
78use crate::observability::LOG_TARGET;
79use crate::{TestBootstrapEnvironment, TestBootstrapSettings};
80use postgresql_embedded::{PostgreSQL, Settings};
81use tracing::{info, info_span};
82
83/// Embedded `PostgreSQL` instance whose lifecycle follows Rust's drop semantics.
84#[derive(Debug)]
85pub struct TestCluster {
86    /// Runtime mode: either owns a runtime (sync) or runs on caller's runtime (async).
87    runtime: ClusterRuntime,
88    postgres: Option<PostgreSQL>,
89    bootstrap: TestBootstrapSettings,
90    is_managed_via_worker: bool,
91    env_vars: Vec<(String, Option<String>)>,
92    worker_guard: Option<ScopedEnv>,
93    _env_guard: ScopedEnv,
94    // Keeps the cluster span alive for the lifetime of the guard.
95    _cluster_span: tracing::Span,
96}
97
98impl TestCluster {
99    /// Boots a `PostgreSQL` instance configured by [`bootstrap_for_tests`].
100    ///
101    /// The constructor blocks until the underlying server process is running and returns an
102    /// error when startup fails.
103    ///
104    /// # Errors
105    /// Returns an error if the bootstrap configuration cannot be prepared or if starting the
106    /// embedded cluster fails.
107    pub fn new() -> BootstrapResult<Self> {
108        let span = info_span!(target: LOG_TARGET, "test_cluster");
109        // Resolve cache directory BEFORE applying test environment.
110        // Otherwise, the test sandbox's XDG_CACHE_HOME would be used.
111        let (runtime, env_vars, env_guard, outcome) = {
112            let _entered = span.enter();
113            let initial_bootstrap = bootstrap_for_tests()?;
114            let cache_config = cache_config_from_bootstrap(&initial_bootstrap);
115            let runtime = build_runtime()?;
116            let env_vars = initial_bootstrap.environment.to_env();
117            let env_guard = ScopedEnv::apply(&env_vars);
118            let outcome = start_postgres(&runtime, initial_bootstrap, &env_vars, &cache_config)?;
119            (runtime, env_vars, env_guard, outcome)
120        };
121
122        Ok(Self {
123            runtime: ClusterRuntime::Sync(runtime),
124            postgres: outcome.postgres,
125            bootstrap: outcome.bootstrap,
126            is_managed_via_worker: outcome.is_managed_via_worker,
127            env_vars,
128            worker_guard: None,
129            _env_guard: env_guard,
130            _cluster_span: span,
131        })
132    }
133
134    /// Boots a `PostgreSQL` instance asynchronously for use in `#[tokio::test]` contexts.
135    ///
136    /// Unlike [`TestCluster::new`], this constructor does not create its own Tokio runtime.
137    /// Instead, it runs on the caller's async runtime, making it safe to call from within
138    /// `#[tokio::test]` and other async contexts.
139    ///
140    /// **Important:** Clusters created with `start_async()` should be shut down explicitly
141    /// using [`stop_async()`](Self::stop_async). The `Drop` implementation will attempt
142    /// best-effort cleanup but may not succeed if the runtime is no longer available.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if the bootstrap configuration cannot be prepared or if starting
147    /// the embedded cluster fails.
148    ///
149    /// # Examples
150    ///
151    /// ```no_run
152    /// use pg_embedded_setup_unpriv::TestCluster;
153    ///
154    /// #[tokio::test]
155    /// async fn test_with_embedded_postgres() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
156    ///     let cluster = TestCluster::start_async().await?;
157    ///     let url = cluster.settings().url("my_database");
158    ///     // ... async database operations ...
159    ///     cluster.stop_async().await?;
160    ///     Ok(())
161    /// }
162    /// ```
163    #[cfg(feature = "async-api")]
164    pub async fn start_async() -> BootstrapResult<Self> {
165        use tracing::Instrument;
166
167        let span = info_span!(target: LOG_TARGET, "test_cluster", async_mode = true);
168
169        // Sync bootstrap preparation (no await needed).
170        // Resolve cache directory BEFORE applying test environment.
171        // Otherwise, the test sandbox's XDG_CACHE_HOME would be used.
172        let initial_bootstrap = bootstrap_for_tests()?;
173        let cache_config = cache_config_from_bootstrap(&initial_bootstrap);
174        let env_vars = initial_bootstrap.environment.to_env();
175        let env_guard = ScopedEnv::apply(&env_vars);
176
177        // Async postgres startup, instrumented with the span.
178        // Box::pin to avoid large future on the stack.
179        let outcome = Box::pin(start_postgres_async(
180            initial_bootstrap,
181            &env_vars,
182            &cache_config,
183        ))
184        .instrument(span.clone())
185        .await?;
186
187        Ok(Self {
188            runtime: ClusterRuntime::Async,
189            postgres: outcome.postgres,
190            bootstrap: outcome.bootstrap,
191            is_managed_via_worker: outcome.is_managed_via_worker,
192            env_vars,
193            worker_guard: None,
194            _env_guard: env_guard,
195            _cluster_span: span,
196        })
197    }
198
199    /// Extends the cluster lifetime to cover additional scoped environment guards.
200    ///
201    /// Primarily used by fixtures that need to ensure `PG_EMBEDDED_WORKER` remains set for the
202    /// duration of the cluster lifetime.
203    #[doc(hidden)]
204    #[must_use]
205    pub fn with_worker_guard(mut self, worker_guard: Option<ScopedEnv>) -> Self {
206        self.worker_guard = worker_guard;
207        self
208    }
209
210    /// Explicitly shuts down an async cluster.
211    ///
212    /// This method should be called for clusters created with [`start_async()`](Self::start_async)
213    /// to ensure proper cleanup. It consumes `self` to prevent the `Drop` implementation from
214    /// attempting duplicate shutdown.
215    ///
216    /// For worker-managed clusters (root privileges), the worker subprocess is invoked
217    /// synchronously via `spawn_blocking`.
218    ///
219    /// # Errors
220    ///
221    /// Returns an error if the shutdown operation fails. The cluster resources are released
222    /// regardless of whether shutdown succeeds.
223    ///
224    /// # Examples
225    ///
226    /// ```no_run
227    /// use pg_embedded_setup_unpriv::TestCluster;
228    ///
229    /// #[tokio::test]
230    /// async fn test_explicit_shutdown() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
231    ///     let cluster = TestCluster::start_async().await?;
232    ///     // ... use cluster ...
233    ///     cluster.stop_async().await?;
234    ///     Ok(())
235    /// }
236    /// ```
237    #[cfg(feature = "async-api")]
238    pub async fn stop_async(mut self) -> BootstrapResult<()> {
239        let context = shutdown::stop_context(&self.bootstrap.settings);
240        shutdown::log_async_stop(&context, self.is_managed_via_worker);
241
242        if self.is_managed_via_worker {
243            shutdown::stop_worker_managed_async(&self.bootstrap, &self.env_vars, &context).await
244        } else if let Some(postgres) = self.postgres.take() {
245            shutdown::stop_in_process_async(postgres, self.bootstrap.shutdown_timeout, &context)
246                .await
247        } else {
248            Ok(())
249        }
250    }
251
252    /// Returns the prepared `PostgreSQL` settings for the running cluster.
253    pub const fn settings(&self) -> &Settings {
254        &self.bootstrap.settings
255    }
256
257    /// Returns the environment required for clients to interact with the cluster.
258    pub const fn environment(&self) -> &TestBootstrapEnvironment {
259        &self.bootstrap.environment
260    }
261
262    /// Returns the bootstrap metadata captured when the cluster was started.
263    pub const fn bootstrap(&self) -> &TestBootstrapSettings {
264        &self.bootstrap
265    }
266
267    /// Returns helper methods for constructing connection artefacts.
268    ///
269    /// # Examples
270    /// ```no_run
271    /// use pg_embedded_setup_unpriv::TestCluster;
272    ///
273    /// # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
274    /// let cluster = TestCluster::new()?;
275    /// let metadata = cluster.connection().metadata();
276    /// println!(
277    ///     "postgresql://{}:***@{}:{}/postgres",
278    ///     metadata.superuser(),
279    ///     metadata.host(),
280    ///     metadata.port(),
281    /// );
282    /// # Ok(())
283    /// # }
284    /// ```
285    #[must_use]
286    pub fn connection(&self) -> TestClusterConnection {
287        TestClusterConnection::new(&self.bootstrap)
288    }
289}
290
291impl Drop for TestCluster {
292    fn drop(&mut self) {
293        let context = shutdown::stop_context(&self.bootstrap.settings);
294        let is_async = self.runtime.is_async();
295        info!(
296            target: LOG_TARGET,
297            context = %context,
298            worker_managed = self.is_managed_via_worker,
299            async_mode = is_async,
300            "stopping embedded postgres cluster"
301        );
302
303        if is_async {
304            // Async clusters should use stop_async() explicitly; attempt best-effort cleanup.
305            shutdown::drop_async_cluster(shutdown::DropContext {
306                is_managed_via_worker: self.is_managed_via_worker,
307                postgres: &mut self.postgres,
308                bootstrap: &self.bootstrap,
309                env_vars: &self.env_vars,
310                context: &context,
311            });
312        } else {
313            self.drop_sync_cluster(&context);
314        }
315        // Environment guards drop after this block, restoring the process state.
316    }
317}
318
319impl TestCluster {
320    /// Synchronous drop path: stops the cluster using the owned runtime.
321    fn drop_sync_cluster(&mut self, context: &str) {
322        let ClusterRuntime::Sync(ref runtime) = self.runtime else {
323            // Should never happen: drop_sync_cluster is only called for sync mode.
324            return;
325        };
326
327        shutdown::drop_sync_cluster(
328            runtime,
329            shutdown::DropContext {
330                is_managed_via_worker: self.is_managed_via_worker,
331                postgres: &mut self.postgres,
332                bootstrap: &self.bootstrap,
333                env_vars: &self.env_vars,
334                context,
335            },
336        );
337    }
338}
339
340#[cfg(test)]
341mod mod_tests;
342
343#[cfg(all(test, feature = "cluster-unit-tests"))]
344mod drop_logging_tests {
345    use crate::test_support::capture_warn_logs;
346
347    use super::shutdown;
348
349    #[test]
350    fn warn_stop_timeout_emits_warning() {
351        let (logs, ()) = capture_warn_logs(|| shutdown::warn_stop_timeout(5, "ctx"));
352        assert!(
353            logs.iter()
354                .any(|line| line.contains("stop() timed out after 5s (ctx)")),
355            "expected timeout warning, got {logs:?}"
356        );
357    }
358
359    #[test]
360    fn warn_stop_failure_emits_warning() {
361        let (logs, ()) = capture_warn_logs(|| shutdown::warn_stop_failure("ctx", &"boom"));
362        assert!(
363            logs.iter()
364                .any(|line| line.contains("failed to stop embedded postgres instance")),
365            "expected failure warning, got {logs:?}"
366        );
367    }
368}
369
370#[cfg(all(test, not(feature = "cluster-unit-tests")))]
371#[path = "../../tests/test_cluster.rs"]
372mod test_cluster_tests;