Skip to main content

pg_embedded_setup_unpriv/cluster/
mod.rs

1//! RAII wrapper that boots an embedded `PostgreSQL` instance for tests.
2//!
3//! The cluster starts during [`TestCluster::new`] and shuts down automatically when the
4//! value drops out of scope.
5//!
6//! # Synchronous API
7//!
8//! Use [`TestCluster::new`] from synchronous contexts or when you want the cluster to
9//! own its own Tokio runtime:
10//!
11//! ```no_run
12//! use pg_embedded_setup_unpriv::TestCluster;
13//!
14//! # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
15//! let cluster = TestCluster::new()?;
16//! let url = cluster.settings().url("my_database");
17//! // Perform test database work here.
18//! drop(cluster); // `PostgreSQL` stops automatically.
19//! # Ok(())
20//! # }
21//! ```
22//!
23//! # Async API
24//!
25//! When running within an existing async runtime (e.g., `#[tokio::test]`), use
26//! [`TestCluster::start_async`] to avoid the "Cannot start a runtime from within a
27//! runtime" panic that occurs when nesting Tokio runtimes:
28//!
29//! ```ignore
30//! use pg_embedded_setup_unpriv::TestCluster;
31//!
32//! #[tokio::test]
33//! async fn test_with_embedded_postgres() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
34//!     let cluster = TestCluster::start_async().await?;
35//!     let url = cluster.settings().url("my_database");
36//!     // ... async database operations ...
37//!     cluster.stop_async().await?;
38//!     Ok(())
39//! }
40//! ```
41//!
42//! The async API requires the `async-api` feature flag:
43//!
44//! ```toml
45//! [dependencies]
46//! pg-embedded-setup-unpriv = { version = "...", features = ["async-api"] }
47//! ```
48
49mod cache_integration;
50mod cleanup;
51mod connection;
52mod delegation;
53mod guard;
54mod handle;
55mod installation;
56mod lifecycle;
57mod runtime;
58mod runtime_mode;
59mod shutdown;
60mod startup;
61mod temporary_database;
62mod worker_invoker;
63mod worker_operation;
64
65pub use self::connection::{ConnectionMetadata, TestClusterConnection};
66pub use self::guard::ClusterGuard;
67pub use self::handle::ClusterHandle;
68pub use self::lifecycle::DatabaseName;
69pub use self::temporary_database::TemporaryDatabase;
70#[cfg(any(doc, test, feature = "cluster-unit-tests", feature = "dev-worker"))]
71pub use self::worker_invoker::WorkerInvoker;
72#[doc(hidden)]
73pub use self::worker_operation::WorkerOperation;
74
75use self::runtime::build_runtime;
76use self::runtime_mode::ClusterRuntime;
77#[cfg(feature = "async-api")]
78use self::startup::start_postgres_async;
79use self::startup::{cache_config_from_bootstrap, start_postgres};
80use crate::bootstrap_for_tests;
81use crate::env::ScopedEnv;
82use crate::error::BootstrapResult;
83use crate::observability::LOG_TARGET;
84use std::ops::Deref;
85use tracing::info_span;
86
87/// Embedded `PostgreSQL` instance whose lifecycle follows Rust's drop semantics.
88///
89/// `TestCluster` combines a [`ClusterHandle`] (for cluster access) with a
90/// [`ClusterGuard`] (for lifecycle management). For most use cases, this
91/// combined type is the simplest option.
92///
93/// # Send-Safe Patterns
94///
95/// `TestCluster` is `!Send` because it contains environment guards that must
96/// be dropped on the creating thread. For patterns requiring `Send` (such as
97/// `OnceLock` or rstest timeouts), use [`new_split()`](Self::new_split) to
98/// obtain a `Send`-safe [`ClusterHandle`]:
99///
100/// ```no_run
101/// use std::sync::OnceLock;
102/// use pg_embedded_setup_unpriv::{ClusterHandle, TestCluster};
103///
104/// static SHARED: OnceLock<ClusterHandle> = OnceLock::new();
105///
106/// fn shared_cluster() -> &'static ClusterHandle {
107///     SHARED.get_or_init(|| {
108///         let (handle, guard) = TestCluster::new_split()
109///             .expect("cluster bootstrap failed");
110///         // Forget the guard to prevent shutdown on drop
111///         std::mem::forget(guard);
112///         handle
113///     })
114/// }
115/// ```
116#[derive(Debug)]
117pub struct TestCluster {
118    /// Send-safe handle providing cluster access.
119    pub(crate) handle: ClusterHandle,
120    /// Lifecycle guard managing shutdown and environment restoration.
121    pub(crate) guard: ClusterGuard,
122}
123
124#[cfg(feature = "async-api")]
125enum StopAsyncPath {
126    WorkerManaged,
127    InProcess(Box<postgresql_embedded::PostgreSQL>),
128    Noop,
129}
130
131impl TestCluster {
132    /// Boots a `PostgreSQL` instance configured by [`bootstrap_for_tests`].
133    ///
134    /// The constructor blocks until the underlying server process is running and returns an
135    /// error when startup fails.
136    ///
137    /// # Errors
138    /// Returns an error if the bootstrap configuration cannot be prepared or if starting the
139    /// embedded cluster fails.
140    pub fn new() -> BootstrapResult<Self> {
141        let (handle, guard) = Self::new_split()?;
142        Ok(Self { handle, guard })
143    }
144
145    /// Boots a `PostgreSQL` instance and returns a separate handle and guard.
146    ///
147    /// This constructor is useful for patterns requiring `Send`, such as shared
148    /// cluster fixtures with [`OnceLock`](std::sync::OnceLock) or rstest fixtures
149    /// with timeouts.
150    ///
151    /// # Returns
152    ///
153    /// A tuple of:
154    /// - [`ClusterHandle`]: `Send + Sync` handle for accessing the cluster
155    /// - [`ClusterGuard`]: `!Send` guard managing shutdown and environment
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the bootstrap configuration cannot be prepared or if
160    /// starting the embedded cluster fails.
161    ///
162    /// # Examples
163    ///
164    /// ## Shared Cluster with `OnceLock`
165    ///
166    /// For shared clusters that run for the entire process lifetime, forget
167    /// the guard to prevent shutdown:
168    ///
169    /// ```no_run
170    /// use std::sync::OnceLock;
171    /// use pg_embedded_setup_unpriv::{ClusterHandle, TestCluster};
172    ///
173    /// static SHARED: OnceLock<ClusterHandle> = OnceLock::new();
174    ///
175    /// fn shared_cluster() -> &'static ClusterHandle {
176    ///     SHARED.get_or_init(|| {
177    ///         let (handle, guard) = TestCluster::new_split()
178    ///             .expect("cluster bootstrap failed");
179    ///         // Forget the guard to prevent shutdown on drop
180    ///         std::mem::forget(guard);
181    ///         handle
182    ///     })
183    /// }
184    /// ```
185    ///
186    /// **Warning**: Dropping the guard shuts down the cluster. Do not use the
187    /// handle after the guard has been dropped unless the guard was forgotten.
188    pub fn new_split() -> BootstrapResult<(ClusterHandle, ClusterGuard)> {
189        let span = info_span!(target: LOG_TARGET, "test_cluster");
190        // Resolve cache directory BEFORE applying test environment.
191        // Otherwise, the test sandbox's XDG_CACHE_HOME would be used.
192        let (runtime, env_vars, env_guard, outcome) = {
193            let _entered = span.enter();
194            let initial_bootstrap = bootstrap_for_tests()?;
195            let cache_config = cache_config_from_bootstrap(&initial_bootstrap);
196            let runtime = build_runtime()?;
197            let env_vars = initial_bootstrap.environment.to_env();
198            let env_guard = ScopedEnv::apply(&env_vars);
199            let outcome = start_postgres(&runtime, initial_bootstrap, &env_vars, &cache_config)?;
200            (runtime, env_vars, env_guard, outcome)
201        };
202
203        let handle = ClusterHandle::new(outcome.bootstrap.clone());
204        let guard = ClusterGuard {
205            runtime: ClusterRuntime::Sync(runtime),
206            postgres: outcome.postgres,
207            bootstrap: outcome.bootstrap,
208            is_managed_via_worker: outcome.is_managed_via_worker,
209            env_vars,
210            worker_guard: None,
211            _env_guard: env_guard,
212            _cluster_span: span,
213        };
214
215        Ok((handle, guard))
216    }
217
218    /// Boots a `PostgreSQL` instance asynchronously for use in `#[tokio::test]` contexts.
219    ///
220    /// Unlike [`TestCluster::new`], this constructor does not create its own Tokio runtime.
221    /// Instead, it runs on the caller's async runtime, making it safe to call from within
222    /// `#[tokio::test]` and other async contexts.
223    ///
224    /// **Important:** Clusters created with `start_async()` should be shut down explicitly
225    /// using [`stop_async()`](Self::stop_async). The `Drop` implementation will attempt
226    /// best-effort cleanup but may not succeed if the runtime is no longer available.
227    ///
228    /// # Errors
229    ///
230    /// Returns an error if the bootstrap configuration cannot be prepared or if starting
231    /// the embedded cluster fails.
232    ///
233    /// # Examples
234    ///
235    /// ```no_run
236    /// use pg_embedded_setup_unpriv::TestCluster;
237    ///
238    /// #[tokio::test]
239    /// async fn test_with_embedded_postgres() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
240    ///     let cluster = TestCluster::start_async().await?;
241    ///     let url = cluster.settings().url("my_database");
242    ///     // ... async database operations ...
243    ///     cluster.stop_async().await?;
244    ///     Ok(())
245    /// }
246    /// ```
247    #[cfg(feature = "async-api")]
248    pub async fn start_async() -> BootstrapResult<Self> {
249        let (handle, guard) = Self::start_async_split().await?;
250        Ok(Self { handle, guard })
251    }
252
253    /// Boots a `PostgreSQL` instance asynchronously and returns a separate handle and guard.
254    ///
255    /// This is the async equivalent of [`new_split()`](Self::new_split).
256    ///
257    /// # Returns
258    ///
259    /// A tuple of:
260    /// - [`ClusterHandle`]: `Send + Sync` handle for accessing the cluster
261    /// - [`ClusterGuard`]: `!Send` guard managing shutdown and environment
262    ///
263    /// # Errors
264    ///
265    /// Returns an error if the bootstrap configuration cannot be prepared or if
266    /// starting the embedded cluster fails.
267    #[cfg(feature = "async-api")]
268    pub async fn start_async_split() -> BootstrapResult<(ClusterHandle, ClusterGuard)> {
269        use tracing::Instrument;
270
271        let span = info_span!(target: LOG_TARGET, "test_cluster", async_mode = true);
272
273        // Sync bootstrap preparation (no await needed).
274        // Resolve cache directory BEFORE applying test environment.
275        // Otherwise, the test sandbox's XDG_CACHE_HOME would be used.
276        let initial_bootstrap = bootstrap_for_tests()?;
277        let cache_config = cache_config_from_bootstrap(&initial_bootstrap);
278        let env_vars = initial_bootstrap.environment.to_env();
279        let env_guard = ScopedEnv::apply(&env_vars);
280
281        // Async postgres startup, instrumented with the span.
282        // Box::pin to avoid large future on the stack.
283        let outcome = Box::pin(start_postgres_async(
284            initial_bootstrap,
285            &env_vars,
286            &cache_config,
287        ))
288        .instrument(span.clone())
289        .await?;
290
291        let handle = ClusterHandle::new(outcome.bootstrap.clone());
292        let guard = ClusterGuard {
293            runtime: ClusterRuntime::Async,
294            postgres: outcome.postgres,
295            bootstrap: outcome.bootstrap,
296            is_managed_via_worker: outcome.is_managed_via_worker,
297            env_vars,
298            worker_guard: None,
299            _env_guard: env_guard,
300            _cluster_span: span,
301        };
302
303        Ok((handle, guard))
304    }
305
306    /// Extends the cluster lifetime to cover additional scoped environment guards.
307    ///
308    /// Primarily used by fixtures that need to ensure `PG_EMBEDDED_WORKER` remains set for the
309    /// duration of the cluster lifetime.
310    #[doc(hidden)]
311    #[must_use]
312    pub fn with_worker_guard(self, worker_guard: Option<ScopedEnv>) -> Self {
313        Self {
314            handle: self.handle,
315            guard: self.guard.with_worker_guard(worker_guard),
316        }
317    }
318
319    #[cfg(feature = "async-api")]
320    fn stop_async_path(&mut self) -> StopAsyncPath {
321        if self.guard.is_managed_via_worker {
322            StopAsyncPath::WorkerManaged
323        } else if let Some(postgres) = self.guard.postgres.take() {
324            StopAsyncPath::InProcess(Box::new(postgres))
325        } else {
326            StopAsyncPath::Noop
327        }
328    }
329
330    /// Explicitly shuts down an async cluster.
331    ///
332    /// This method should be called for clusters created with [`start_async()`](Self::start_async)
333    /// to ensure proper cleanup. It consumes `self` to prevent the `Drop` implementation from
334    /// attempting duplicate shutdown.
335    ///
336    /// For worker-managed clusters (root privileges), the worker subprocess is invoked
337    /// synchronously via `spawn_blocking`.
338    ///
339    /// # Errors
340    ///
341    /// Returns an error if the shutdown operation fails. The cluster resources are released
342    /// regardless of whether shutdown succeeds.
343    ///
344    /// # Examples
345    ///
346    /// ```no_run
347    /// use pg_embedded_setup_unpriv::TestCluster;
348    ///
349    /// #[tokio::test]
350    /// async fn test_explicit_shutdown() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
351    ///     let cluster = TestCluster::start_async().await?;
352    ///     // ... use cluster ...
353    ///     cluster.stop_async().await?;
354    ///     Ok(())
355    /// }
356    /// ```
357    #[cfg(feature = "async-api")]
358    pub async fn stop_async(mut self) -> BootstrapResult<()> {
359        let context = shutdown::stop_context(self.handle.settings());
360        shutdown::log_async_stop(&context, self.guard.is_managed_via_worker);
361
362        match self.stop_async_path() {
363            StopAsyncPath::WorkerManaged => {
364                shutdown::stop_worker_managed_async(
365                    &self.guard.bootstrap,
366                    &self.guard.env_vars,
367                    &context,
368                )
369                .await
370            }
371            StopAsyncPath::InProcess(postgres) => {
372                let cleanup = shutdown::InProcessCleanup {
373                    cleanup_mode: self.guard.bootstrap.cleanup_mode,
374                    settings: &self.guard.bootstrap.settings,
375                    context: &context,
376                };
377                shutdown::stop_in_process_async(
378                    *postgres,
379                    self.guard.bootstrap.shutdown_timeout,
380                    cleanup,
381                )
382                .await
383            }
384            StopAsyncPath::Noop => Ok(()),
385        }
386    }
387}
388
389/// Provides transparent access to [`ClusterHandle`] methods.
390///
391/// This allows `TestCluster` to be used interchangeably with `ClusterHandle`
392/// for all read-only operations like `settings()`, `connection()`, etc.
393impl Deref for TestCluster {
394    type Target = ClusterHandle;
395
396    fn deref(&self) -> &Self::Target {
397        &self.handle
398    }
399}
400
401// Note: TestCluster does NOT implement Drop because the ClusterGuard handles shutdown.
402// When TestCluster drops, its _guard field drops, which triggers ClusterGuard::Drop.
403
404#[cfg(test)]
405mod mod_tests;
406
407#[cfg(all(test, feature = "cluster-unit-tests"))]
408mod drop_logging_tests;
409
410#[cfg(all(test, not(feature = "cluster-unit-tests")))]
411#[path = "../../tests/test_cluster.rs"]
412mod test_cluster_tests;