pg_embedded_setup_unpriv/cluster/mod.rs
1//! RAII wrapper that boots an embedded `PostgreSQL` instance for tests.
2//!
3//! The cluster starts during [`TestCluster::new`] and shuts down automatically when the
4//! value drops out of scope.
5//!
6//! # Synchronous API
7//!
8//! Use [`TestCluster::new`] from synchronous contexts or when you want the cluster to
9//! own its own Tokio runtime:
10//!
11//! ```no_run
12//! use pg_embedded_setup_unpriv::TestCluster;
13//!
14//! # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
15//! let cluster = TestCluster::new()?;
16//! let url = cluster.settings().url("my_database");
17//! // Perform test database work here.
18//! drop(cluster); // `PostgreSQL` stops automatically.
19//! # Ok(())
20//! # }
21//! ```
22//!
23//! # Async API
24//!
25//! When running within an existing async runtime (e.g., `#[tokio::test]`), use
26//! [`TestCluster::start_async`] to avoid the "Cannot start a runtime from within a
27//! runtime" panic that occurs when nesting Tokio runtimes:
28//!
29//! ```ignore
30//! use pg_embedded_setup_unpriv::TestCluster;
31//!
32//! #[tokio::test]
33//! async fn test_with_embedded_postgres() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
34//! let cluster = TestCluster::start_async().await?;
35//! let url = cluster.settings().url("my_database");
36//! // ... async database operations ...
37//! cluster.stop_async().await?;
38//! Ok(())
39//! }
40//! ```
41//!
42//! The async API requires the `async-api` feature flag:
43//!
44//! ```toml
45//! [dependencies]
46//! pg-embedded-setup-unpriv = { version = "...", features = ["async-api"] }
47//! ```
48
49mod cache_integration;
50mod connection;
51mod delegation;
52mod installation;
53mod lifecycle;
54mod runtime;
55mod runtime_mode;
56mod shutdown;
57mod startup;
58mod temporary_database;
59mod worker_invoker;
60mod worker_operation;
61
62pub use self::connection::{ConnectionMetadata, TestClusterConnection};
63pub use self::lifecycle::DatabaseName;
64pub use self::temporary_database::TemporaryDatabase;
65#[cfg(any(doc, test, feature = "cluster-unit-tests", feature = "dev-worker"))]
66pub use self::worker_invoker::WorkerInvoker;
67#[doc(hidden)]
68pub use self::worker_operation::WorkerOperation;
69
70use self::runtime::build_runtime;
71use self::runtime_mode::ClusterRuntime;
72#[cfg(feature = "async-api")]
73use self::startup::start_postgres_async;
74use self::startup::{cache_config_from_bootstrap, start_postgres};
75use crate::bootstrap_for_tests;
76use crate::env::ScopedEnv;
77use crate::error::BootstrapResult;
78use crate::observability::LOG_TARGET;
79use crate::{TestBootstrapEnvironment, TestBootstrapSettings};
80use postgresql_embedded::{PostgreSQL, Settings};
81use tracing::{info, info_span};
82
83/// Embedded `PostgreSQL` instance whose lifecycle follows Rust's drop semantics.
84#[derive(Debug)]
85pub struct TestCluster {
86 /// Runtime mode: either owns a runtime (sync) or runs on caller's runtime (async).
87 runtime: ClusterRuntime,
88 postgres: Option<PostgreSQL>,
89 bootstrap: TestBootstrapSettings,
90 is_managed_via_worker: bool,
91 env_vars: Vec<(String, Option<String>)>,
92 worker_guard: Option<ScopedEnv>,
93 _env_guard: ScopedEnv,
94 // Keeps the cluster span alive for the lifetime of the guard.
95 _cluster_span: tracing::Span,
96}
97
98impl TestCluster {
99 /// Boots a `PostgreSQL` instance configured by [`bootstrap_for_tests`].
100 ///
101 /// The constructor blocks until the underlying server process is running and returns an
102 /// error when startup fails.
103 ///
104 /// # Errors
105 /// Returns an error if the bootstrap configuration cannot be prepared or if starting the
106 /// embedded cluster fails.
107 pub fn new() -> BootstrapResult<Self> {
108 let span = info_span!(target: LOG_TARGET, "test_cluster");
109 // Resolve cache directory BEFORE applying test environment.
110 // Otherwise, the test sandbox's XDG_CACHE_HOME would be used.
111 let (runtime, env_vars, env_guard, outcome) = {
112 let _entered = span.enter();
113 let initial_bootstrap = bootstrap_for_tests()?;
114 let cache_config = cache_config_from_bootstrap(&initial_bootstrap);
115 let runtime = build_runtime()?;
116 let env_vars = initial_bootstrap.environment.to_env();
117 let env_guard = ScopedEnv::apply(&env_vars);
118 let outcome = start_postgres(&runtime, initial_bootstrap, &env_vars, &cache_config)?;
119 (runtime, env_vars, env_guard, outcome)
120 };
121
122 Ok(Self {
123 runtime: ClusterRuntime::Sync(runtime),
124 postgres: outcome.postgres,
125 bootstrap: outcome.bootstrap,
126 is_managed_via_worker: outcome.is_managed_via_worker,
127 env_vars,
128 worker_guard: None,
129 _env_guard: env_guard,
130 _cluster_span: span,
131 })
132 }
133
134 /// Boots a `PostgreSQL` instance asynchronously for use in `#[tokio::test]` contexts.
135 ///
136 /// Unlike [`TestCluster::new`], this constructor does not create its own Tokio runtime.
137 /// Instead, it runs on the caller's async runtime, making it safe to call from within
138 /// `#[tokio::test]` and other async contexts.
139 ///
140 /// **Important:** Clusters created with `start_async()` should be shut down explicitly
141 /// using [`stop_async()`](Self::stop_async). The `Drop` implementation will attempt
142 /// best-effort cleanup but may not succeed if the runtime is no longer available.
143 ///
144 /// # Errors
145 ///
146 /// Returns an error if the bootstrap configuration cannot be prepared or if starting
147 /// the embedded cluster fails.
148 ///
149 /// # Examples
150 ///
151 /// ```no_run
152 /// use pg_embedded_setup_unpriv::TestCluster;
153 ///
154 /// #[tokio::test]
155 /// async fn test_with_embedded_postgres() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
156 /// let cluster = TestCluster::start_async().await?;
157 /// let url = cluster.settings().url("my_database");
158 /// // ... async database operations ...
159 /// cluster.stop_async().await?;
160 /// Ok(())
161 /// }
162 /// ```
163 #[cfg(feature = "async-api")]
164 pub async fn start_async() -> BootstrapResult<Self> {
165 use tracing::Instrument;
166
167 let span = info_span!(target: LOG_TARGET, "test_cluster", async_mode = true);
168
169 // Sync bootstrap preparation (no await needed).
170 // Resolve cache directory BEFORE applying test environment.
171 // Otherwise, the test sandbox's XDG_CACHE_HOME would be used.
172 let initial_bootstrap = bootstrap_for_tests()?;
173 let cache_config = cache_config_from_bootstrap(&initial_bootstrap);
174 let env_vars = initial_bootstrap.environment.to_env();
175 let env_guard = ScopedEnv::apply(&env_vars);
176
177 // Async postgres startup, instrumented with the span.
178 // Box::pin to avoid large future on the stack.
179 let outcome = Box::pin(start_postgres_async(
180 initial_bootstrap,
181 &env_vars,
182 &cache_config,
183 ))
184 .instrument(span.clone())
185 .await?;
186
187 Ok(Self {
188 runtime: ClusterRuntime::Async,
189 postgres: outcome.postgres,
190 bootstrap: outcome.bootstrap,
191 is_managed_via_worker: outcome.is_managed_via_worker,
192 env_vars,
193 worker_guard: None,
194 _env_guard: env_guard,
195 _cluster_span: span,
196 })
197 }
198
199 /// Extends the cluster lifetime to cover additional scoped environment guards.
200 ///
201 /// Primarily used by fixtures that need to ensure `PG_EMBEDDED_WORKER` remains set for the
202 /// duration of the cluster lifetime.
203 #[doc(hidden)]
204 #[must_use]
205 pub fn with_worker_guard(mut self, worker_guard: Option<ScopedEnv>) -> Self {
206 self.worker_guard = worker_guard;
207 self
208 }
209
210 /// Explicitly shuts down an async cluster.
211 ///
212 /// This method should be called for clusters created with [`start_async()`](Self::start_async)
213 /// to ensure proper cleanup. It consumes `self` to prevent the `Drop` implementation from
214 /// attempting duplicate shutdown.
215 ///
216 /// For worker-managed clusters (root privileges), the worker subprocess is invoked
217 /// synchronously via `spawn_blocking`.
218 ///
219 /// # Errors
220 ///
221 /// Returns an error if the shutdown operation fails. The cluster resources are released
222 /// regardless of whether shutdown succeeds.
223 ///
224 /// # Examples
225 ///
226 /// ```no_run
227 /// use pg_embedded_setup_unpriv::TestCluster;
228 ///
229 /// #[tokio::test]
230 /// async fn test_explicit_shutdown() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
231 /// let cluster = TestCluster::start_async().await?;
232 /// // ... use cluster ...
233 /// cluster.stop_async().await?;
234 /// Ok(())
235 /// }
236 /// ```
237 #[cfg(feature = "async-api")]
238 pub async fn stop_async(mut self) -> BootstrapResult<()> {
239 let context = shutdown::stop_context(&self.bootstrap.settings);
240 shutdown::log_async_stop(&context, self.is_managed_via_worker);
241
242 if self.is_managed_via_worker {
243 shutdown::stop_worker_managed_async(&self.bootstrap, &self.env_vars, &context).await
244 } else if let Some(postgres) = self.postgres.take() {
245 shutdown::stop_in_process_async(postgres, self.bootstrap.shutdown_timeout, &context)
246 .await
247 } else {
248 Ok(())
249 }
250 }
251
252 /// Returns the prepared `PostgreSQL` settings for the running cluster.
253 pub const fn settings(&self) -> &Settings {
254 &self.bootstrap.settings
255 }
256
257 /// Returns the environment required for clients to interact with the cluster.
258 pub const fn environment(&self) -> &TestBootstrapEnvironment {
259 &self.bootstrap.environment
260 }
261
262 /// Returns the bootstrap metadata captured when the cluster was started.
263 pub const fn bootstrap(&self) -> &TestBootstrapSettings {
264 &self.bootstrap
265 }
266
267 /// Returns helper methods for constructing connection artefacts.
268 ///
269 /// # Examples
270 /// ```no_run
271 /// use pg_embedded_setup_unpriv::TestCluster;
272 ///
273 /// # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
274 /// let cluster = TestCluster::new()?;
275 /// let metadata = cluster.connection().metadata();
276 /// println!(
277 /// "postgresql://{}:***@{}:{}/postgres",
278 /// metadata.superuser(),
279 /// metadata.host(),
280 /// metadata.port(),
281 /// );
282 /// # Ok(())
283 /// # }
284 /// ```
285 #[must_use]
286 pub fn connection(&self) -> TestClusterConnection {
287 TestClusterConnection::new(&self.bootstrap)
288 }
289}
290
291impl Drop for TestCluster {
292 fn drop(&mut self) {
293 let context = shutdown::stop_context(&self.bootstrap.settings);
294 let is_async = self.runtime.is_async();
295 info!(
296 target: LOG_TARGET,
297 context = %context,
298 worker_managed = self.is_managed_via_worker,
299 async_mode = is_async,
300 "stopping embedded postgres cluster"
301 );
302
303 if is_async {
304 // Async clusters should use stop_async() explicitly; attempt best-effort cleanup.
305 shutdown::drop_async_cluster(shutdown::DropContext {
306 is_managed_via_worker: self.is_managed_via_worker,
307 postgres: &mut self.postgres,
308 bootstrap: &self.bootstrap,
309 env_vars: &self.env_vars,
310 context: &context,
311 });
312 } else {
313 self.drop_sync_cluster(&context);
314 }
315 // Environment guards drop after this block, restoring the process state.
316 }
317}
318
319impl TestCluster {
320 /// Synchronous drop path: stops the cluster using the owned runtime.
321 fn drop_sync_cluster(&mut self, context: &str) {
322 let ClusterRuntime::Sync(ref runtime) = self.runtime else {
323 // Should never happen: drop_sync_cluster is only called for sync mode.
324 return;
325 };
326
327 shutdown::drop_sync_cluster(
328 runtime,
329 shutdown::DropContext {
330 is_managed_via_worker: self.is_managed_via_worker,
331 postgres: &mut self.postgres,
332 bootstrap: &self.bootstrap,
333 env_vars: &self.env_vars,
334 context,
335 },
336 );
337 }
338}
339
340#[cfg(test)]
341mod mod_tests;
342
#[cfg(all(test, feature = "cluster-unit-tests"))]
mod drop_logging_tests {
    //! Checks that the shutdown warning helpers emit the expected log lines.

    use super::shutdown;
    use crate::test_support::capture_warn_logs;

    /// A stop timeout is reported with its duration and context.
    #[test]
    fn warn_stop_timeout_emits_warning() {
        let expected = "stop() timed out after 5s (ctx)";
        let (logs, ()) = capture_warn_logs(|| shutdown::warn_stop_timeout(5, "ctx"));
        let found = logs.iter().any(|line| line.contains(expected));
        assert!(found, "expected timeout warning, got {logs:?}");
    }

    /// A stop failure is reported with the generic failure message.
    #[test]
    fn warn_stop_failure_emits_warning() {
        let expected = "failed to stop embedded postgres instance";
        let (logs, ()) = capture_warn_logs(|| shutdown::warn_stop_failure("ctx", &"boom"));
        let found = logs.iter().any(|line| line.contains(expected));
        assert!(found, "expected failure warning, got {logs:?}");
    }
}
369
370#[cfg(all(test, not(feature = "cluster-unit-tests")))]
371#[path = "../../tests/test_cluster.rs"]
372mod test_cluster_tests;