1mod connection;
50mod delegation;
51mod lifecycle;
52mod runtime;
53mod temporary_database;
54mod worker_invoker;
55mod worker_operation;
56
57pub use self::connection::{ConnectionMetadata, TestClusterConnection};
58pub use self::lifecycle::DatabaseName;
59pub use self::temporary_database::TemporaryDatabase;
60#[cfg(any(doc, test, feature = "cluster-unit-tests", feature = "dev-worker"))]
61pub use self::worker_invoker::WorkerInvoker;
62#[doc(hidden)]
63pub use self::worker_operation::WorkerOperation;
64
65use self::runtime::build_runtime;
66#[cfg(feature = "async-api")]
67use self::worker_invoker::AsyncInvoker;
68use self::worker_invoker::WorkerInvoker as ClusterWorkerInvoker;
69use crate::bootstrap_for_tests;
70use crate::env::ScopedEnv;
71use crate::error::BootstrapResult;
72use crate::observability::LOG_TARGET;
73use crate::{ExecutionPrivileges, TestBootstrapEnvironment, TestBootstrapSettings};
74use postgresql_embedded::{PostgreSQL, Settings};
75use std::fmt::Display;
76use tokio::runtime::Runtime;
77use tokio::time;
78use tracing::{info, info_span};
79
80#[derive(Debug)]
86enum ClusterRuntime {
87 Sync(Runtime),
89 #[cfg_attr(
91 not(feature = "async-api"),
92 expect(dead_code, reason = "used when async-api feature is enabled")
93 )]
94 Async,
95}
96
97impl ClusterRuntime {
98 const fn is_async(&self) -> bool {
100 matches!(self, Self::Async)
101 }
102}
103
104#[derive(Debug)]
106pub struct TestCluster {
107 runtime: ClusterRuntime,
109 postgres: Option<PostgreSQL>,
110 bootstrap: TestBootstrapSettings,
111 is_managed_via_worker: bool,
112 env_vars: Vec<(String, Option<String>)>,
113 worker_guard: Option<ScopedEnv>,
114 _env_guard: ScopedEnv,
115 _cluster_span: tracing::Span,
117}
118
119struct StartupOutcome {
120 bootstrap: TestBootstrapSettings,
121 postgres: Option<PostgreSQL>,
122 is_managed_via_worker: bool,
123}
124
125impl TestCluster {
126 pub fn new() -> BootstrapResult<Self> {
135 let span = info_span!(target: LOG_TARGET, "test_cluster");
136 let (runtime, env_vars, env_guard, outcome) = {
137 let _entered = span.enter();
138 let initial_bootstrap = bootstrap_for_tests()?;
139 let runtime = build_runtime()?;
140 let env_vars = initial_bootstrap.environment.to_env();
141 let env_guard = ScopedEnv::apply(&env_vars);
142 let outcome = Self::start_postgres(&runtime, initial_bootstrap, &env_vars)?;
143 (runtime, env_vars, env_guard, outcome)
144 };
145
146 Ok(Self {
147 runtime: ClusterRuntime::Sync(runtime),
148 postgres: outcome.postgres,
149 bootstrap: outcome.bootstrap,
150 is_managed_via_worker: outcome.is_managed_via_worker,
151 env_vars,
152 worker_guard: None,
153 _env_guard: env_guard,
154 _cluster_span: span,
155 })
156 }
157
158 #[expect(
159 clippy::cognitive_complexity,
160 reason = "privilege-aware lifecycle setup requires explicit branching for observability"
161 )]
162 fn start_postgres(
163 runtime: &Runtime,
164 mut bootstrap: TestBootstrapSettings,
165 env_vars: &[(String, Option<String>)],
166 ) -> BootstrapResult<StartupOutcome> {
167 let privileges = bootstrap.privileges;
168 let mut embedded = PostgreSQL::new(bootstrap.settings.clone());
169 info!(
170 target: LOG_TARGET,
171 privileges = ?privileges,
172 mode = ?bootstrap.execution_mode,
173 "starting embedded postgres lifecycle"
174 );
175
176 let invoker = ClusterWorkerInvoker::new(runtime, &bootstrap, env_vars);
177 Self::invoke_lifecycle(&invoker, &mut embedded)?;
178
179 let is_managed_via_worker = matches!(privileges, ExecutionPrivileges::Root);
180 let postgres =
181 Self::prepare_postgres_handle(is_managed_via_worker, &mut bootstrap, embedded);
182
183 info!(
184 target: LOG_TARGET,
185 privileges = ?privileges,
186 worker_managed = is_managed_via_worker,
187 "embedded postgres started"
188 );
189 Ok(StartupOutcome {
190 bootstrap,
191 postgres,
192 is_managed_via_worker,
193 })
194 }
195
196 fn prepare_postgres_handle(
197 is_managed_via_worker: bool,
198 bootstrap: &mut TestBootstrapSettings,
199 embedded: PostgreSQL,
200 ) -> Option<PostgreSQL> {
201 if is_managed_via_worker {
202 None
203 } else {
204 bootstrap.settings = embedded.settings().clone();
205 Some(embedded)
206 }
207 }
208
209 fn invoke_lifecycle(
210 invoker: &ClusterWorkerInvoker<'_>,
211 embedded: &mut PostgreSQL,
212 ) -> BootstrapResult<()> {
213 invoker.invoke(worker_operation::WorkerOperation::Setup, async {
214 embedded.setup().await
215 })?;
216 invoker.invoke(worker_operation::WorkerOperation::Start, async {
217 embedded.start().await
218 })
219 }
220
/// Bootstraps and starts an embedded PostgreSQL cluster on the caller's runtime.
///
/// The start-up future is instrumented with the `test_cluster` span. The
/// resulting cluster stores no runtime of its own.
///
/// # Errors
///
/// Returns an error when bootstrapping or the setup/start lifecycle fails.
#[cfg(feature = "async-api")]
pub async fn start_async() -> BootstrapResult<Self> {
    use tracing::Instrument;

    let cluster_span = info_span!(target: LOG_TARGET, "test_cluster", async_mode = true);

    let initial_bootstrap = bootstrap_for_tests()?;
    let env_vars = initial_bootstrap.environment.to_env();
    let env_guard = ScopedEnv::apply(&env_vars);

    let startup = Box::pin(Self::start_postgres_async(initial_bootstrap, &env_vars))
        .instrument(cluster_span.clone());
    let StartupOutcome {
        bootstrap,
        postgres,
        is_managed_via_worker,
    } = startup.await?;

    Ok(Self {
        runtime: ClusterRuntime::Async,
        postgres,
        bootstrap,
        is_managed_via_worker,
        env_vars,
        worker_guard: None,
        _env_guard: env_guard,
        _cluster_span: cluster_span,
    })
}
278
/// Async counterpart of `start_postgres`: runs the setup/start lifecycle
/// through the async invoker and packages the result.
#[cfg(feature = "async-api")]
async fn start_postgres_async(
    mut bootstrap: TestBootstrapSettings,
    env_vars: &[(String, Option<String>)],
) -> BootstrapResult<StartupOutcome> {
    let privileges = bootstrap.privileges;
    let mut server = PostgreSQL::new(bootstrap.settings.clone());
    Self::log_lifecycle_start(privileges, &bootstrap);

    let lifecycle_invoker = AsyncInvoker::new(&bootstrap, env_vars);
    Box::pin(Self::invoke_lifecycle_async(&lifecycle_invoker, &mut server)).await?;

    let is_managed_via_worker = matches!(privileges, ExecutionPrivileges::Root);
    let postgres = Self::prepare_postgres_handle(is_managed_via_worker, &mut bootstrap, server);
    Self::log_lifecycle_complete(privileges, is_managed_via_worker);

    let outcome = StartupOutcome {
        bootstrap,
        postgres,
        is_managed_via_worker,
    };
    Ok(outcome)
}
303
/// Emits the info event announcing the start of the async lifecycle.
#[cfg(feature = "async-api")]
fn log_lifecycle_start(privileges: ExecutionPrivileges, bootstrap: &TestBootstrapSettings) {
    let execution_mode = &bootstrap.execution_mode;
    info!(
        target: LOG_TARGET,
        privileges = ?privileges,
        mode = ?execution_mode,
        async_mode = true,
        "starting embedded postgres lifecycle"
    );
}
314
/// Emits the info event recording that the async lifecycle finished.
#[cfg(feature = "async-api")]
fn log_lifecycle_complete(privileges: ExecutionPrivileges, is_managed_via_worker: bool) {
    let worker_managed = is_managed_via_worker;
    info!(
        target: LOG_TARGET,
        privileges = ?privileges,
        worker_managed,
        async_mode = true,
        "embedded postgres started"
    );
}
325
/// Awaits `Setup` and then `Start` through the async invoker.
///
/// `Start` is only attempted when `Setup` succeeded. Both invocations are
/// boxed and pinned before being awaited.
#[cfg(feature = "async-api")]
async fn invoke_lifecycle_async(
    invoker: &AsyncInvoker<'_>,
    embedded: &mut PostgreSQL,
) -> BootstrapResult<()> {
    let setup = async { embedded.setup().await };
    Box::pin(invoker.invoke(worker_operation::WorkerOperation::Setup, setup)).await?;

    let start = async { embedded.start().await };
    Box::pin(invoker.invoke(worker_operation::WorkerOperation::Start, start)).await
}
345
346 #[doc(hidden)]
351 #[must_use]
352 pub fn with_worker_guard(mut self, worker_guard: Option<ScopedEnv>) -> Self {
353 self.worker_guard = worker_guard;
354 self
355 }
356
/// Stops the cluster explicitly, consuming the handle.
///
/// Worker-managed clusters are stopped through the worker on a blocking
/// task. In-process clusters are stopped directly, bounded by the bootstrap
/// shutdown timeout. A cluster with nothing left to stop returns `Ok(())`.
///
/// # Errors
///
/// Returns an error when the worker stop fails or its task cannot be joined,
/// when the in-process stop fails, or when it exceeds the shutdown timeout.
#[cfg(feature = "async-api")]
pub async fn stop_async(mut self) -> BootstrapResult<()> {
    let context = Self::stop_context(&self.bootstrap.settings);
    Self::log_async_stop(&context, self.is_managed_via_worker);

    if self.is_managed_via_worker {
        let outcome =
            Self::stop_worker_managed_async(&self.bootstrap, &self.env_vars, &context).await;
        // `self` is dropped when this function returns. Clear the flag now that
        // a stop has been attempted so `Drop` does not dispatch a second worker
        // stop, mirroring the in-process branch which takes its handle. The
        // flag is only cleared after the await, so a cancelled stop still
        // falls back to the drop-time cleanup.
        self.is_managed_via_worker = false;
        outcome
    } else if let Some(postgres) = self.postgres.take() {
        Self::stop_in_process_async(postgres, self.bootstrap.shutdown_timeout, &context).await
    } else {
        Ok(())
    }
}
397
/// Emits the info event announcing an explicit async stop.
#[cfg(feature = "async-api")]
fn log_async_stop(context: &str, is_managed_via_worker: bool) {
    let worker_managed = is_managed_via_worker;
    info!(
        target: LOG_TARGET,
        context = %context,
        worker_managed,
        async_mode = true,
        "stopping embedded postgres cluster"
    );
}
408
/// Stops a worker-managed cluster without blocking the async executor.
///
/// The inputs are cloned into owned values so the synchronous stop can run
/// on a blocking task. A join failure is converted into a bootstrap error.
#[cfg(feature = "async-api")]
async fn stop_worker_managed_async(
    bootstrap: &TestBootstrapSettings,
    env_vars: &[(String, Option<String>)],
    context: &str,
) -> BootstrapResult<()> {
    let owned_bootstrap = bootstrap.clone();
    let owned_env_vars = env_vars.to_vec();
    let owned_context = context.to_owned();

    let joined = tokio::task::spawn_blocking(move || {
        Self::stop_via_worker_sync(&owned_bootstrap, &owned_env_vars, &owned_context)
    })
    .await;

    match joined {
        Ok(stop_result) => stop_result,
        Err(err) => Err(crate::error::BootstrapError::from(color_eyre::eyre::eyre!(
            "worker stop task panicked: {err}"
        ))),
    }
}
428
/// Stops an in-process server, waiting at most `timeout`.
///
/// A stop failure and a timeout are each logged as a warning and then
/// returned as a bootstrap error.
#[cfg(feature = "async-api")]
async fn stop_in_process_async(
    postgres: PostgreSQL,
    timeout: std::time::Duration,
    context: &str,
) -> BootstrapResult<()> {
    let Ok(stop_result) = time::timeout(timeout, postgres.stop()).await else {
        // The timer elapsed before the server reported a result.
        let timeout_secs = timeout.as_secs();
        Self::warn_stop_timeout(timeout_secs, context);
        return Err(crate::error::BootstrapError::from(color_eyre::eyre::eyre!(
            "stop timed out after {timeout_secs}s"
        )));
    };

    stop_result.map_err(|err| {
        Self::warn_stop_failure(context, &err);
        crate::error::BootstrapError::from(color_eyre::eyre::eyre!(
            "failed to stop postgres: {err}"
        ))
    })
}
452
/// Synchronously asks the worker to stop the server as root.
///
/// Builds a fresh runtime for the invoker. A failed stop is logged as a
/// warning and the error is still returned to the caller.
#[cfg(feature = "async-api")]
fn stop_via_worker_sync(
    bootstrap: &TestBootstrapSettings,
    env_vars: &[(String, Option<String>)],
    context: &str,
) -> BootstrapResult<()> {
    let runtime = build_runtime()?;
    let outcome = ClusterWorkerInvoker::new(&runtime, bootstrap, env_vars)
        .invoke_as_root(worker_operation::WorkerOperation::Stop);
    if let Err(err) = &outcome {
        Self::warn_stop_failure(context, err);
    }
    outcome
}
466
467 pub const fn settings(&self) -> &Settings {
469 &self.bootstrap.settings
470 }
471
472 pub const fn environment(&self) -> &TestBootstrapEnvironment {
474 &self.bootstrap.environment
475 }
476
477 pub const fn bootstrap(&self) -> &TestBootstrapSettings {
479 &self.bootstrap
480 }
481
482 #[must_use]
501 pub fn connection(&self) -> TestClusterConnection {
502 TestClusterConnection::new(&self.bootstrap)
503 }
504
505 fn stop_context(settings: &Settings) -> String {
506 let data_dir = settings.data_dir.display();
507 let version = settings.version.to_string();
508 format!("version {version}, data_dir {data_dir}")
509 }
510
511 fn drop_async_cluster(&mut self, context: &str) {
516 Self::warn_async_drop_without_stop(context);
517
518 if self.is_managed_via_worker {
519 self.drop_async_worker_managed(context);
520 } else if let Some(postgres) = self.postgres.take() {
521 self.drop_async_in_process(context, postgres);
522 }
523 }
525
526 fn drop_async_worker_managed(&self, context: &str) {
528 let Ok(handle) = tokio::runtime::Handle::try_current() else {
529 Self::error_no_runtime_for_cleanup(context);
530 return;
531 };
532
533 let bootstrap = self.bootstrap.clone();
534 let env_vars = self.env_vars.clone();
535 let owned_context = context.to_owned();
536
537 drop(handle.spawn(spawn_worker_stop_task(bootstrap, env_vars, owned_context)));
538 }
539
540 fn drop_async_in_process(&self, context: &str, postgres: PostgreSQL) {
542 let Ok(handle) = tokio::runtime::Handle::try_current() else {
543 Self::error_no_runtime_for_cleanup(context);
544 return;
545 };
546
547 spawn_async_cleanup(&handle, postgres, self.bootstrap.shutdown_timeout);
548 }
549
550 fn warn_async_drop_without_stop(context: &str) {
551 tracing::warn!(
552 target: LOG_TARGET,
553 context = %context,
554 concat!(
555 "async TestCluster dropped without calling stop_async(); ",
556 "attempting best-effort cleanup"
557 )
558 );
559 }
560
561 fn error_no_runtime_for_cleanup(context: &str) {
562 tracing::error!(
563 target: LOG_TARGET,
564 context = %context,
565 "no async runtime available for cleanup; resources may leak"
566 );
567 }
568
569 fn warn_stop_failure(context: &str, err: &impl Display) {
570 tracing::warn!(
571 "SKIP-TEST-CLUSTER: failed to stop embedded postgres instance ({}): {}",
572 context,
573 err
574 );
575 }
576
577 fn warn_stop_timeout(timeout_secs: u64, context: &str) {
578 tracing::warn!(
579 "SKIP-TEST-CLUSTER: stop() timed out after {timeout_secs}s ({context}); proceeding with drop"
580 );
581 }
582}
583
// Gated on `cluster-unit-tests` because a second `mod tests` is declared in
// this file for test builds without that feature. With a plain `cfg(test)`
// gate both modules are active at once and the name `tests` is defined twice.
#[cfg(all(test, feature = "cluster-unit-tests"))]
mod tests {
    use std::ffi::OsString;

    use super::*;
    use crate::ExecutionPrivileges;
    use crate::test_support::{dummy_settings, scoped_env};

    /// A guard attached with `with_worker_guard` stays active while the
    /// cluster lives and restores the previous value once it is dropped.
    #[test]
    fn with_worker_guard_restores_environment() {
        const KEY: &str = "PG_EMBEDDED_WORKER_GUARD_TEST";
        let baseline = std::env::var(KEY).ok();
        let guard = scoped_env(vec![(OsString::from(KEY), Some(OsString::from("guarded")))]);
        let cluster = dummy_cluster().with_worker_guard(Some(guard));
        assert_eq!(
            std::env::var(KEY).as_deref(),
            Ok("guarded"),
            "worker guard should remain active whilst the cluster runs",
        );
        drop(cluster);
        match baseline {
            Some(value) => assert_eq!(
                std::env::var(KEY).as_deref(),
                Ok(value.as_str()),
                "worker guard should restore the previous value"
            ),
            None => assert!(
                std::env::var(KEY).is_err(),
                "worker guard should unset the variable once the cluster drops"
            ),
        }
    }

    /// Builds a sync-mode cluster with no server attached, so dropping it
    /// performs no stop work.
    fn dummy_cluster() -> TestCluster {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("test runtime");
        let span = info_span!(target: LOG_TARGET, "test_cluster");
        let bootstrap = dummy_settings(ExecutionPrivileges::Unprivileged);
        let env_vars = bootstrap.environment.to_env();
        let env_guard = ScopedEnv::apply(&env_vars);
        TestCluster {
            runtime: ClusterRuntime::Sync(runtime),
            postgres: None,
            bootstrap,
            is_managed_via_worker: false,
            env_vars,
            worker_guard: None,
            _env_guard: env_guard,
            _cluster_span: span,
        }
    }
}
638
639fn spawn_async_cleanup(
643 handle: &tokio::runtime::Handle,
644 postgres: PostgreSQL,
645 timeout: std::time::Duration,
646) {
647 drop(handle.spawn(async move {
648 match time::timeout(timeout, postgres.stop()).await {
649 Ok(Ok(())) => {
650 tracing::debug!(target: LOG_TARGET, "async cleanup completed successfully");
651 }
652 Ok(Err(err)) => {
653 tracing::debug!(
654 target: LOG_TARGET,
655 error = %err,
656 "async cleanup failed during postgres stop"
657 );
658 }
659 Err(_) => {
660 tracing::debug!(
661 target: LOG_TARGET,
662 timeout_secs = timeout.as_secs(),
663 "async cleanup timed out"
664 );
665 }
666 }
667 }));
668}
669
670#[expect(
675 clippy::cognitive_complexity,
676 reason = "complexity is from spawn_blocking + error! macro expansion, not logic"
677)]
678async fn spawn_worker_stop_task(
679 bootstrap: TestBootstrapSettings,
680 env_vars: Vec<(String, Option<String>)>,
681 context: String,
682) {
683 let result =
684 tokio::task::spawn_blocking(move || worker_stop_sync(&bootstrap, &env_vars, &context))
685 .await;
686
687 if let Err(err) = result {
688 tracing::error!(
689 target: LOG_TARGET,
690 error = %err,
691 "worker stop task panicked during async drop"
692 );
693 }
694}
695
696fn worker_stop_sync(
700 bootstrap: &TestBootstrapSettings,
701 env_vars: &[(String, Option<String>)],
702 context: &str,
703) {
704 let Ok(runtime) = build_runtime() else {
705 tracing::error!(
706 target: LOG_TARGET,
707 "failed to build runtime for worker stop during async drop"
708 );
709 return;
710 };
711
712 let invoker = ClusterWorkerInvoker::new(&runtime, bootstrap, env_vars);
713 if let Err(err) = invoker.invoke_as_root(worker_operation::WorkerOperation::Stop) {
714 TestCluster::warn_stop_failure(context, &err);
715 }
716}
717
718impl Drop for TestCluster {
719 fn drop(&mut self) {
720 let context = Self::stop_context(&self.bootstrap.settings);
721 let is_async = self.runtime.is_async();
722 info!(
723 target: LOG_TARGET,
724 context = %context,
725 worker_managed = self.is_managed_via_worker,
726 async_mode = is_async,
727 "stopping embedded postgres cluster"
728 );
729
730 if is_async {
731 self.drop_async_cluster(&context);
733 } else {
734 self.drop_sync_cluster(&context);
735 }
736 }
738}
739
740impl TestCluster {
741 fn drop_sync_cluster(&mut self, context: &str) {
743 let ClusterRuntime::Sync(ref runtime) = self.runtime else {
744 return;
746 };
747
748 if self.is_managed_via_worker {
749 let invoker = ClusterWorkerInvoker::new(runtime, &self.bootstrap, &self.env_vars);
750 if let Err(err) = invoker.invoke_as_root(worker_operation::WorkerOperation::Stop) {
751 Self::warn_stop_failure(context, &err);
752 }
753 return;
754 }
755
756 let Some(postgres) = self.postgres.take() else {
757 return;
758 };
759
760 let timeout = self.bootstrap.shutdown_timeout;
761 let timeout_secs = timeout.as_secs();
762 let outcome = runtime.block_on(async { time::timeout(timeout, postgres.stop()).await });
763
764 match outcome {
765 Ok(Ok(())) => {}
766 Ok(Err(err)) => Self::warn_stop_failure(context, &err),
767 Err(_) => Self::warn_stop_timeout(timeout_secs, context),
768 }
769 }
770}
771
#[cfg(all(test, feature = "cluster-unit-tests"))]
mod drop_logging_tests {
    use super::*;
    use crate::test_support::capture_warn_logs;

    /// The timeout helper should produce a warning naming the duration and context.
    #[test]
    fn warn_stop_timeout_emits_warning() {
        let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_timeout(5, "ctx"));
        let found = logs
            .iter()
            .any(|line| line.contains("stop() timed out after 5s (ctx)"));
        assert!(found, "expected timeout warning, got {logs:?}");
    }

    /// The failure helper should produce a warning describing the failed stop.
    #[test]
    fn warn_stop_failure_emits_warning() {
        let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_failure("ctx", &"boom"));
        let found = logs
            .iter()
            .any(|line| line.contains("failed to stop embedded postgres instance"));
        assert!(found, "expected failure warning, got {logs:?}");
    }
}
797
798#[cfg(all(test, not(feature = "cluster-unit-tests")))]
799#[path = "../../tests/test_cluster.rs"]
800mod tests;