pg_embedded_setup_unpriv/cluster/mod.rs
1//! RAII wrapper that boots an embedded `PostgreSQL` instance for tests.
2//!
3//! The cluster starts during [`TestCluster::new`] and shuts down automatically when the
4//! value drops out of scope.
5//!
6//! # Synchronous API
7//!
8//! Use [`TestCluster::new`] from synchronous contexts or when you want the cluster to
9//! own its own Tokio runtime:
10//!
11//! ```no_run
12//! use pg_embedded_setup_unpriv::TestCluster;
13//!
14//! # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
15//! let cluster = TestCluster::new()?;
16//! let url = cluster.settings().url("my_database");
17//! // Perform test database work here.
18//! drop(cluster); // `PostgreSQL` stops automatically.
19//! # Ok(())
20//! # }
21//! ```
22//!
23//! # Async API
24//!
25//! When running within an existing async runtime (e.g., `#[tokio::test]`), use
26//! [`TestCluster::start_async`] to avoid the "Cannot start a runtime from within a
27//! runtime" panic that occurs when nesting Tokio runtimes:
28//!
29//! ```ignore
30//! use pg_embedded_setup_unpriv::TestCluster;
31//!
32//! #[tokio::test]
33//! async fn test_with_embedded_postgres() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
34//! let cluster = TestCluster::start_async().await?;
35//! let url = cluster.settings().url("my_database");
36//! // ... async database operations ...
37//! cluster.stop_async().await?;
38//! Ok(())
39//! }
40//! ```
41//!
42//! The async API requires the `async-api` feature flag:
43//!
44//! ```toml
45//! [dependencies]
46//! pg-embedded-setup-unpriv = { version = "...", features = ["async-api"] }
47//! ```
48
49mod cache_integration;
50mod cleanup;
51mod connection;
52mod delegation;
53mod guard;
54mod handle;
55mod installation;
56mod lifecycle;
57mod runtime;
58mod runtime_mode;
59mod shutdown;
60mod startup;
61mod temporary_database;
62mod worker_invoker;
63mod worker_operation;
64
65pub use self::connection::{ConnectionMetadata, TestClusterConnection};
66pub use self::guard::ClusterGuard;
67pub use self::handle::ClusterHandle;
68pub use self::lifecycle::DatabaseName;
69pub use self::temporary_database::TemporaryDatabase;
70#[cfg(any(doc, test, feature = "cluster-unit-tests", feature = "dev-worker"))]
71pub use self::worker_invoker::WorkerInvoker;
72#[doc(hidden)]
73pub use self::worker_operation::WorkerOperation;
74
75use self::runtime::build_runtime;
76use self::runtime_mode::ClusterRuntime;
77#[cfg(feature = "async-api")]
78use self::startup::start_postgres_async;
79use self::startup::{cache_config_from_bootstrap, start_postgres};
80use crate::bootstrap_for_tests;
81use crate::env::ScopedEnv;
82use crate::error::BootstrapResult;
83use crate::observability::LOG_TARGET;
84use std::ops::Deref;
85use tracing::info_span;
86
87/// Embedded `PostgreSQL` instance whose lifecycle follows Rust's drop semantics.
88///
89/// `TestCluster` combines a [`ClusterHandle`] (for cluster access) with a
90/// [`ClusterGuard`] (for lifecycle management). For most use cases, this
91/// combined type is the simplest option.
92///
93/// # Send-Safe Patterns
94///
95/// `TestCluster` is `!Send` because it contains environment guards that must
96/// be dropped on the creating thread. For patterns requiring `Send` (such as
97/// `OnceLock` or rstest timeouts), use [`new_split()`](Self::new_split) to
98/// obtain a `Send`-safe [`ClusterHandle`]:
99///
100/// ```no_run
101/// use std::sync::OnceLock;
102/// use pg_embedded_setup_unpriv::{ClusterHandle, TestCluster};
103///
104/// static SHARED: OnceLock<ClusterHandle> = OnceLock::new();
105///
106/// fn shared_cluster() -> &'static ClusterHandle {
107/// SHARED.get_or_init(|| {
108/// let (handle, guard) = TestCluster::new_split()
109/// .expect("cluster bootstrap failed");
110/// // Forget the guard to prevent shutdown on drop
111/// std::mem::forget(guard);
112/// handle
113/// })
114/// }
115/// ```
116#[derive(Debug)]
117pub struct TestCluster {
118 /// Send-safe handle providing cluster access.
119 pub(crate) handle: ClusterHandle,
120 /// Lifecycle guard managing shutdown and environment restoration.
121 pub(crate) guard: ClusterGuard,
122}
123
124#[cfg(feature = "async-api")]
125enum StopAsyncPath {
126 WorkerManaged,
127 InProcess(Box<postgresql_embedded::PostgreSQL>),
128 Noop,
129}
130
131impl TestCluster {
132 /// Boots a `PostgreSQL` instance configured by [`bootstrap_for_tests`].
133 ///
134 /// The constructor blocks until the underlying server process is running and returns an
135 /// error when startup fails.
136 ///
137 /// # Errors
138 /// Returns an error if the bootstrap configuration cannot be prepared or if starting the
139 /// embedded cluster fails.
140 pub fn new() -> BootstrapResult<Self> {
141 let (handle, guard) = Self::new_split()?;
142 Ok(Self { handle, guard })
143 }
144
145 /// Boots a `PostgreSQL` instance and returns a separate handle and guard.
146 ///
147 /// This constructor is useful for patterns requiring `Send`, such as shared
148 /// cluster fixtures with [`OnceLock`](std::sync::OnceLock) or rstest fixtures
149 /// with timeouts.
150 ///
151 /// # Returns
152 ///
153 /// A tuple of:
154 /// - [`ClusterHandle`]: `Send + Sync` handle for accessing the cluster
155 /// - [`ClusterGuard`]: `!Send` guard managing shutdown and environment
156 ///
157 /// # Errors
158 ///
159 /// Returns an error if the bootstrap configuration cannot be prepared or if
160 /// starting the embedded cluster fails.
161 ///
162 /// # Examples
163 ///
164 /// ## Shared Cluster with `OnceLock`
165 ///
166 /// For shared clusters that run for the entire process lifetime, forget
167 /// the guard to prevent shutdown:
168 ///
169 /// ```no_run
170 /// use std::sync::OnceLock;
171 /// use pg_embedded_setup_unpriv::{ClusterHandle, TestCluster};
172 ///
173 /// static SHARED: OnceLock<ClusterHandle> = OnceLock::new();
174 ///
175 /// fn shared_cluster() -> &'static ClusterHandle {
176 /// SHARED.get_or_init(|| {
177 /// let (handle, guard) = TestCluster::new_split()
178 /// .expect("cluster bootstrap failed");
179 /// // Forget the guard to prevent shutdown on drop
180 /// std::mem::forget(guard);
181 /// handle
182 /// })
183 /// }
184 /// ```
185 ///
186 /// **Warning**: Dropping the guard shuts down the cluster. Do not use the
187 /// handle after the guard has been dropped unless the guard was forgotten.
188 pub fn new_split() -> BootstrapResult<(ClusterHandle, ClusterGuard)> {
189 let span = info_span!(target: LOG_TARGET, "test_cluster");
190 // Resolve cache directory BEFORE applying test environment.
191 // Otherwise, the test sandbox's XDG_CACHE_HOME would be used.
192 let (runtime, env_vars, env_guard, outcome) = {
193 let _entered = span.enter();
194 let initial_bootstrap = bootstrap_for_tests()?;
195 let cache_config = cache_config_from_bootstrap(&initial_bootstrap);
196 let runtime = build_runtime()?;
197 let env_vars = initial_bootstrap.environment.to_env();
198 let env_guard = ScopedEnv::apply(&env_vars);
199 let outcome = start_postgres(&runtime, initial_bootstrap, &env_vars, &cache_config)?;
200 (runtime, env_vars, env_guard, outcome)
201 };
202
203 let handle = ClusterHandle::new(outcome.bootstrap.clone());
204 let guard = ClusterGuard {
205 runtime: ClusterRuntime::Sync(runtime),
206 postgres: outcome.postgres,
207 bootstrap: outcome.bootstrap,
208 is_managed_via_worker: outcome.is_managed_via_worker,
209 env_vars,
210 worker_guard: None,
211 _env_guard: env_guard,
212 _cluster_span: span,
213 };
214
215 Ok((handle, guard))
216 }
217
218 /// Boots a `PostgreSQL` instance asynchronously for use in `#[tokio::test]` contexts.
219 ///
220 /// Unlike [`TestCluster::new`], this constructor does not create its own Tokio runtime.
221 /// Instead, it runs on the caller's async runtime, making it safe to call from within
222 /// `#[tokio::test]` and other async contexts.
223 ///
224 /// **Important:** Clusters created with `start_async()` should be shut down explicitly
225 /// using [`stop_async()`](Self::stop_async). The `Drop` implementation will attempt
226 /// best-effort cleanup but may not succeed if the runtime is no longer available.
227 ///
228 /// # Errors
229 ///
230 /// Returns an error if the bootstrap configuration cannot be prepared or if starting
231 /// the embedded cluster fails.
232 ///
233 /// # Examples
234 ///
235 /// ```no_run
236 /// use pg_embedded_setup_unpriv::TestCluster;
237 ///
238 /// #[tokio::test]
239 /// async fn test_with_embedded_postgres() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
240 /// let cluster = TestCluster::start_async().await?;
241 /// let url = cluster.settings().url("my_database");
242 /// // ... async database operations ...
243 /// cluster.stop_async().await?;
244 /// Ok(())
245 /// }
246 /// ```
247 #[cfg(feature = "async-api")]
248 pub async fn start_async() -> BootstrapResult<Self> {
249 let (handle, guard) = Self::start_async_split().await?;
250 Ok(Self { handle, guard })
251 }
252
253 /// Boots a `PostgreSQL` instance asynchronously and returns a separate handle and guard.
254 ///
255 /// This is the async equivalent of [`new_split()`](Self::new_split).
256 ///
257 /// # Returns
258 ///
259 /// A tuple of:
260 /// - [`ClusterHandle`]: `Send + Sync` handle for accessing the cluster
261 /// - [`ClusterGuard`]: `!Send` guard managing shutdown and environment
262 ///
263 /// # Errors
264 ///
265 /// Returns an error if the bootstrap configuration cannot be prepared or if
266 /// starting the embedded cluster fails.
267 #[cfg(feature = "async-api")]
268 pub async fn start_async_split() -> BootstrapResult<(ClusterHandle, ClusterGuard)> {
269 use tracing::Instrument;
270
271 let span = info_span!(target: LOG_TARGET, "test_cluster", async_mode = true);
272
273 // Sync bootstrap preparation (no await needed).
274 // Resolve cache directory BEFORE applying test environment.
275 // Otherwise, the test sandbox's XDG_CACHE_HOME would be used.
276 let initial_bootstrap = bootstrap_for_tests()?;
277 let cache_config = cache_config_from_bootstrap(&initial_bootstrap);
278 let env_vars = initial_bootstrap.environment.to_env();
279 let env_guard = ScopedEnv::apply(&env_vars);
280
281 // Async postgres startup, instrumented with the span.
282 // Box::pin to avoid large future on the stack.
283 let outcome = Box::pin(start_postgres_async(
284 initial_bootstrap,
285 &env_vars,
286 &cache_config,
287 ))
288 .instrument(span.clone())
289 .await?;
290
291 let handle = ClusterHandle::new(outcome.bootstrap.clone());
292 let guard = ClusterGuard {
293 runtime: ClusterRuntime::Async,
294 postgres: outcome.postgres,
295 bootstrap: outcome.bootstrap,
296 is_managed_via_worker: outcome.is_managed_via_worker,
297 env_vars,
298 worker_guard: None,
299 _env_guard: env_guard,
300 _cluster_span: span,
301 };
302
303 Ok((handle, guard))
304 }
305
306 /// Extends the cluster lifetime to cover additional scoped environment guards.
307 ///
308 /// Primarily used by fixtures that need to ensure `PG_EMBEDDED_WORKER` remains set for the
309 /// duration of the cluster lifetime.
310 #[doc(hidden)]
311 #[must_use]
312 pub fn with_worker_guard(self, worker_guard: Option<ScopedEnv>) -> Self {
313 Self {
314 handle: self.handle,
315 guard: self.guard.with_worker_guard(worker_guard),
316 }
317 }
318
319 #[cfg(feature = "async-api")]
320 fn stop_async_path(&mut self) -> StopAsyncPath {
321 if self.guard.is_managed_via_worker {
322 StopAsyncPath::WorkerManaged
323 } else if let Some(postgres) = self.guard.postgres.take() {
324 StopAsyncPath::InProcess(Box::new(postgres))
325 } else {
326 StopAsyncPath::Noop
327 }
328 }
329
330 /// Explicitly shuts down an async cluster.
331 ///
332 /// This method should be called for clusters created with [`start_async()`](Self::start_async)
333 /// to ensure proper cleanup. It consumes `self` to prevent the `Drop` implementation from
334 /// attempting duplicate shutdown.
335 ///
336 /// For worker-managed clusters (root privileges), the worker subprocess is invoked
337 /// synchronously via `spawn_blocking`.
338 ///
339 /// # Errors
340 ///
341 /// Returns an error if the shutdown operation fails. The cluster resources are released
342 /// regardless of whether shutdown succeeds.
343 ///
344 /// # Examples
345 ///
346 /// ```no_run
347 /// use pg_embedded_setup_unpriv::TestCluster;
348 ///
349 /// #[tokio::test]
350 /// async fn test_explicit_shutdown() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
351 /// let cluster = TestCluster::start_async().await?;
352 /// // ... use cluster ...
353 /// cluster.stop_async().await?;
354 /// Ok(())
355 /// }
356 /// ```
357 #[cfg(feature = "async-api")]
358 pub async fn stop_async(mut self) -> BootstrapResult<()> {
359 let context = shutdown::stop_context(self.handle.settings());
360 shutdown::log_async_stop(&context, self.guard.is_managed_via_worker);
361
362 match self.stop_async_path() {
363 StopAsyncPath::WorkerManaged => {
364 shutdown::stop_worker_managed_async(
365 &self.guard.bootstrap,
366 &self.guard.env_vars,
367 &context,
368 )
369 .await
370 }
371 StopAsyncPath::InProcess(postgres) => {
372 let cleanup = shutdown::InProcessCleanup {
373 cleanup_mode: self.guard.bootstrap.cleanup_mode,
374 settings: &self.guard.bootstrap.settings,
375 context: &context,
376 };
377 shutdown::stop_in_process_async(
378 *postgres,
379 self.guard.bootstrap.shutdown_timeout,
380 cleanup,
381 )
382 .await
383 }
384 StopAsyncPath::Noop => Ok(()),
385 }
386 }
387}
388
389/// Provides transparent access to [`ClusterHandle`] methods.
390///
391/// This allows `TestCluster` to be used interchangeably with `ClusterHandle`
392/// for all read-only operations like `settings()`, `connection()`, etc.
393impl Deref for TestCluster {
394 type Target = ClusterHandle;
395
396 fn deref(&self) -> &Self::Target {
397 &self.handle
398 }
399}
400
401// Note: TestCluster does NOT implement Drop because the ClusterGuard handles shutdown.
402// When TestCluster drops, its _guard field drops, which triggers ClusterGuard::Drop.
403
404#[cfg(test)]
405mod mod_tests;
406
407#[cfg(all(test, feature = "cluster-unit-tests"))]
408mod drop_logging_tests;
409
410#[cfg(all(test, not(feature = "cluster-unit-tests")))]
411#[path = "../../tests/test_cluster.rs"]
412mod test_cluster_tests;