pg_embedded_setup_unpriv/cluster/
mod.rs1mod connection;
20mod delegation;
21mod lifecycle;
22mod runtime;
23mod temporary_database;
24mod worker_invoker;
25mod worker_operation;
26
27pub use self::connection::{ConnectionMetadata, TestClusterConnection};
28pub use self::lifecycle::DatabaseName;
29pub use self::temporary_database::TemporaryDatabase;
30#[cfg(any(doc, test, feature = "cluster-unit-tests", feature = "dev-worker"))]
31pub use self::worker_invoker::WorkerInvoker;
32#[doc(hidden)]
33pub use self::worker_operation::WorkerOperation;
34
35use self::runtime::build_runtime;
36use self::worker_invoker::WorkerInvoker as ClusterWorkerInvoker;
37use crate::bootstrap_for_tests;
38use crate::env::ScopedEnv;
39use crate::error::BootstrapResult;
40use crate::observability::LOG_TARGET;
41use crate::{ExecutionPrivileges, TestBootstrapEnvironment, TestBootstrapSettings};
42use postgresql_embedded::{PostgreSQL, Settings};
43use std::fmt::Display;
44use tokio::runtime::Runtime;
45use tokio::time;
46use tracing::{info, info_span};
47
48#[derive(Debug)]
50pub struct TestCluster {
51 runtime: Runtime,
52 postgres: Option<PostgreSQL>,
53 bootstrap: TestBootstrapSettings,
54 is_managed_via_worker: bool,
55 env_vars: Vec<(String, Option<String>)>,
56 worker_guard: Option<ScopedEnv>,
57 _env_guard: ScopedEnv,
58 _cluster_span: tracing::Span,
60}
61
62struct StartupOutcome {
63 bootstrap: TestBootstrapSettings,
64 postgres: Option<PostgreSQL>,
65 is_managed_via_worker: bool,
66}
67
68impl TestCluster {
69 pub fn new() -> BootstrapResult<Self> {
78 let span = info_span!(target: LOG_TARGET, "test_cluster");
79 let (runtime, env_vars, env_guard, outcome) = {
80 let _entered = span.enter();
81 let initial_bootstrap = bootstrap_for_tests()?;
82 let runtime = build_runtime()?;
83 let env_vars = initial_bootstrap.environment.to_env();
84 let env_guard = ScopedEnv::apply(&env_vars);
85 let outcome = Self::start_postgres(&runtime, initial_bootstrap, &env_vars)?;
86 (runtime, env_vars, env_guard, outcome)
87 };
88
89 Ok(Self {
90 runtime,
91 postgres: outcome.postgres,
92 bootstrap: outcome.bootstrap,
93 is_managed_via_worker: outcome.is_managed_via_worker,
94 env_vars,
95 worker_guard: None,
96 _env_guard: env_guard,
97 _cluster_span: span,
98 })
99 }
100
101 #[expect(
102 clippy::cognitive_complexity,
103 reason = "privilege-aware lifecycle setup requires explicit branching for observability"
104 )]
105 fn start_postgres(
106 runtime: &Runtime,
107 mut bootstrap: TestBootstrapSettings,
108 env_vars: &[(String, Option<String>)],
109 ) -> BootstrapResult<StartupOutcome> {
110 let privileges = bootstrap.privileges;
111 let mut embedded = PostgreSQL::new(bootstrap.settings.clone());
112 info!(
113 target: LOG_TARGET,
114 privileges = ?privileges,
115 mode = ?bootstrap.execution_mode,
116 "starting embedded postgres lifecycle"
117 );
118
119 let invoker = ClusterWorkerInvoker::new(runtime, &bootstrap, env_vars);
120 Self::invoke_lifecycle(&invoker, &mut embedded)?;
121
122 let is_managed_via_worker = matches!(privileges, ExecutionPrivileges::Root);
123 let postgres =
124 Self::prepare_postgres_handle(is_managed_via_worker, &mut bootstrap, embedded);
125
126 info!(
127 target: LOG_TARGET,
128 privileges = ?privileges,
129 worker_managed = is_managed_via_worker,
130 "embedded postgres started"
131 );
132 Ok(StartupOutcome {
133 bootstrap,
134 postgres,
135 is_managed_via_worker,
136 })
137 }
138
139 fn prepare_postgres_handle(
140 is_managed_via_worker: bool,
141 bootstrap: &mut TestBootstrapSettings,
142 embedded: PostgreSQL,
143 ) -> Option<PostgreSQL> {
144 if is_managed_via_worker {
145 None
146 } else {
147 bootstrap.settings = embedded.settings().clone();
148 Some(embedded)
149 }
150 }
151
152 fn invoke_lifecycle(
153 invoker: &ClusterWorkerInvoker<'_>,
154 embedded: &mut PostgreSQL,
155 ) -> BootstrapResult<()> {
156 invoker.invoke(worker_operation::WorkerOperation::Setup, async {
157 embedded.setup().await
158 })?;
159 invoker.invoke(worker_operation::WorkerOperation::Start, async {
160 embedded.start().await
161 })
162 }
163
164 #[doc(hidden)]
169 #[must_use]
170 pub fn with_worker_guard(mut self, worker_guard: Option<ScopedEnv>) -> Self {
171 self.worker_guard = worker_guard;
172 self
173 }
174
175 pub const fn settings(&self) -> &Settings {
177 &self.bootstrap.settings
178 }
179
180 pub const fn environment(&self) -> &TestBootstrapEnvironment {
182 &self.bootstrap.environment
183 }
184
185 pub const fn bootstrap(&self) -> &TestBootstrapSettings {
187 &self.bootstrap
188 }
189
190 #[must_use]
209 pub fn connection(&self) -> TestClusterConnection {
210 TestClusterConnection::new(&self.bootstrap)
211 }
212
213 fn stop_context(settings: &Settings) -> String {
214 let data_dir = settings.data_dir.display();
215 let version = settings.version.to_string();
216 format!("version {version}, data_dir {data_dir}")
217 }
218
219 fn warn_stop_failure(context: &str, err: &impl Display) {
220 tracing::warn!(
221 "SKIP-TEST-CLUSTER: failed to stop embedded postgres instance ({}): {}",
222 context,
223 err
224 );
225 }
226
227 fn warn_stop_timeout(timeout_secs: u64, context: &str) {
228 tracing::warn!(
229 "SKIP-TEST-CLUSTER: stop() timed out after {timeout_secs}s ({context}); proceeding with drop"
230 );
231 }
232}
233
234#[cfg(test)]
235mod tests {
236 use std::ffi::OsString;
237
238 use super::*;
239 use crate::ExecutionPrivileges;
240 use crate::test_support::{dummy_settings, scoped_env};
241
242 #[test]
243 fn with_worker_guard_restores_environment() {
244 const KEY: &str = "PG_EMBEDDED_WORKER_GUARD_TEST";
245 let baseline = std::env::var(KEY).ok();
246 let guard = scoped_env(vec![(OsString::from(KEY), Some(OsString::from("guarded")))]);
247 let cluster = dummy_cluster().with_worker_guard(Some(guard));
248 assert_eq!(
249 std::env::var(KEY).as_deref(),
250 Ok("guarded"),
251 "worker guard should remain active whilst the cluster runs",
252 );
253 drop(cluster);
254 match baseline {
255 Some(value) => assert_eq!(
256 std::env::var(KEY).as_deref(),
257 Ok(value.as_str()),
258 "worker guard should restore the previous value"
259 ),
260 None => assert!(
261 std::env::var(KEY).is_err(),
262 "worker guard should unset the variable once the cluster drops"
263 ),
264 }
265 }
266
267 fn dummy_cluster() -> TestCluster {
268 let runtime = tokio::runtime::Builder::new_current_thread()
269 .enable_all()
270 .build()
271 .expect("test runtime");
272 let span = info_span!(target: LOG_TARGET, "test_cluster");
273 let bootstrap = dummy_settings(ExecutionPrivileges::Unprivileged);
274 let env_vars = bootstrap.environment.to_env();
275 let env_guard = ScopedEnv::apply(&env_vars);
276 TestCluster {
277 runtime,
278 postgres: None,
279 bootstrap,
280 is_managed_via_worker: false,
281 env_vars,
282 worker_guard: None,
283 _env_guard: env_guard,
284 _cluster_span: span,
285 }
286 }
287}
288
289impl Drop for TestCluster {
290 #[expect(
291 clippy::cognitive_complexity,
292 reason = "drop path must branch between worker and in-process shutdown with logging"
293 )]
294 fn drop(&mut self) {
295 let context = Self::stop_context(&self.bootstrap.settings);
296 info!(
297 target: LOG_TARGET,
298 context = %context,
299 worker_managed = self.is_managed_via_worker,
300 "stopping embedded postgres cluster"
301 );
302
303 if self.is_managed_via_worker {
304 let invoker = ClusterWorkerInvoker::new(&self.runtime, &self.bootstrap, &self.env_vars);
305 if let Err(err) = invoker.invoke_as_root(worker_operation::WorkerOperation::Stop) {
306 Self::warn_stop_failure(&context, &err);
307 }
308 } else if let Some(postgres) = self.postgres.take() {
309 let timeout = self.bootstrap.shutdown_timeout;
310 let timeout_secs = timeout.as_secs();
311 let outcome = self
312 .runtime
313 .block_on(async { time::timeout(timeout, postgres.stop()).await });
314
315 match outcome {
316 Ok(Ok(())) => {}
317 Ok(Err(err)) => Self::warn_stop_failure(&context, &err),
318 Err(_) => Self::warn_stop_timeout(timeout_secs, &context),
319 }
320 }
321 }
323}
324
325#[cfg(all(test, feature = "cluster-unit-tests"))]
326mod drop_logging_tests {
327 use super::*;
328 use crate::test_support::capture_warn_logs;
329
330 #[test]
331 fn warn_stop_timeout_emits_warning() {
332 let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_timeout(5, "ctx"));
333 assert!(
334 logs.iter()
335 .any(|line| line.contains("stop() timed out after 5s (ctx)")),
336 "expected timeout warning, got {logs:?}"
337 );
338 }
339
340 #[test]
341 fn warn_stop_failure_emits_warning() {
342 let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_failure("ctx", &"boom"));
343 assert!(
344 logs.iter()
345 .any(|line| line.contains("failed to stop embedded postgres instance")),
346 "expected failure warning, got {logs:?}"
347 );
348 }
349}
350
351#[cfg(all(test, not(feature = "cluster-unit-tests")))]
352#[path = "../../tests/test_cluster.rs"]
353mod tests;