pg_embedded_setup_unpriv/cluster/
mod.rs

1//! RAII wrapper that boots an embedded `PostgreSQL` instance for tests.
2//!
3//! The cluster starts during [`TestCluster::new`] and shuts down automatically when the
4//! value drops out of scope.
5//!
6//! # Synchronous API
7//!
8//! Use [`TestCluster::new`] from synchronous contexts or when you want the cluster to
9//! own its own Tokio runtime:
10//!
11//! ```no_run
12//! use pg_embedded_setup_unpriv::TestCluster;
13//!
14//! # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
15//! let cluster = TestCluster::new()?;
16//! let url = cluster.settings().url("my_database");
17//! // Perform test database work here.
18//! drop(cluster); // `PostgreSQL` stops automatically.
19//! # Ok(())
20//! # }
21//! ```
22//!
23//! # Async API
24//!
25//! When running within an existing async runtime (e.g., `#[tokio::test]`), use
26//! [`TestCluster::start_async`] to avoid the "Cannot start a runtime from within a
27//! runtime" panic that occurs when nesting Tokio runtimes:
28//!
29//! ```ignore
30//! use pg_embedded_setup_unpriv::TestCluster;
31//!
32//! #[tokio::test]
33//! async fn test_with_embedded_postgres() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
34//!     let cluster = TestCluster::start_async().await?;
35//!     let url = cluster.settings().url("my_database");
36//!     // ... async database operations ...
37//!     cluster.stop_async().await?;
38//!     Ok(())
39//! }
40//! ```
41//!
42//! The async API requires the `async-api` feature flag:
43//!
44//! ```toml
45//! [dependencies]
46//! pg-embedded-setup-unpriv = { version = "...", features = ["async-api"] }
47//! ```
48
49mod connection;
50mod delegation;
51mod lifecycle;
52mod runtime;
53mod temporary_database;
54mod worker_invoker;
55mod worker_operation;
56
57pub use self::connection::{ConnectionMetadata, TestClusterConnection};
58pub use self::lifecycle::DatabaseName;
59pub use self::temporary_database::TemporaryDatabase;
60#[cfg(any(doc, test, feature = "cluster-unit-tests", feature = "dev-worker"))]
61pub use self::worker_invoker::WorkerInvoker;
62#[doc(hidden)]
63pub use self::worker_operation::WorkerOperation;
64
65use self::runtime::build_runtime;
66#[cfg(feature = "async-api")]
67use self::worker_invoker::AsyncInvoker;
68use self::worker_invoker::WorkerInvoker as ClusterWorkerInvoker;
69use crate::bootstrap_for_tests;
70use crate::env::ScopedEnv;
71use crate::error::BootstrapResult;
72use crate::observability::LOG_TARGET;
73use crate::{ExecutionPrivileges, TestBootstrapEnvironment, TestBootstrapSettings};
74use postgresql_embedded::{PostgreSQL, Settings};
75use std::fmt::Display;
76use tokio::runtime::Runtime;
77use tokio::time;
78use tracing::{info, info_span};
79
80/// Encodes the runtime mode for a `TestCluster`.
81///
82/// This enum eliminates the need for separate `runtime: Option<Runtime>` and
83/// `is_async_mode: bool` fields, preventing invalid states where the two could
84/// disagree.
85#[derive(Debug)]
86enum ClusterRuntime {
87    /// Synchronous mode: the cluster owns its own Tokio runtime.
88    Sync(Runtime),
89    /// Async mode: the cluster runs on the caller's runtime.
90    #[cfg_attr(
91        not(feature = "async-api"),
92        expect(dead_code, reason = "used when async-api feature is enabled")
93    )]
94    Async,
95}
96
97impl ClusterRuntime {
98    /// Returns `true` if this is async mode.
99    const fn is_async(&self) -> bool {
100        matches!(self, Self::Async)
101    }
102}
103
104/// Embedded `PostgreSQL` instance whose lifecycle follows Rust's drop semantics.
105#[derive(Debug)]
106pub struct TestCluster {
107    /// Runtime mode: either owns a runtime (sync) or runs on caller's runtime (async).
108    runtime: ClusterRuntime,
109    postgres: Option<PostgreSQL>,
110    bootstrap: TestBootstrapSettings,
111    is_managed_via_worker: bool,
112    env_vars: Vec<(String, Option<String>)>,
113    worker_guard: Option<ScopedEnv>,
114    _env_guard: ScopedEnv,
115    // Keeps the cluster span alive for the lifetime of the guard.
116    _cluster_span: tracing::Span,
117}
118
119struct StartupOutcome {
120    bootstrap: TestBootstrapSettings,
121    postgres: Option<PostgreSQL>,
122    is_managed_via_worker: bool,
123}
124
125impl TestCluster {
126    /// Boots a `PostgreSQL` instance configured by [`bootstrap_for_tests`].
127    ///
128    /// The constructor blocks until the underlying server process is running and returns an
129    /// error when startup fails.
130    ///
131    /// # Errors
132    /// Returns an error if the bootstrap configuration cannot be prepared or if starting the
133    /// embedded cluster fails.
134    pub fn new() -> BootstrapResult<Self> {
135        let span = info_span!(target: LOG_TARGET, "test_cluster");
136        let (runtime, env_vars, env_guard, outcome) = {
137            let _entered = span.enter();
138            let initial_bootstrap = bootstrap_for_tests()?;
139            let runtime = build_runtime()?;
140            let env_vars = initial_bootstrap.environment.to_env();
141            let env_guard = ScopedEnv::apply(&env_vars);
142            let outcome = Self::start_postgres(&runtime, initial_bootstrap, &env_vars)?;
143            (runtime, env_vars, env_guard, outcome)
144        };
145
146        Ok(Self {
147            runtime: ClusterRuntime::Sync(runtime),
148            postgres: outcome.postgres,
149            bootstrap: outcome.bootstrap,
150            is_managed_via_worker: outcome.is_managed_via_worker,
151            env_vars,
152            worker_guard: None,
153            _env_guard: env_guard,
154            _cluster_span: span,
155        })
156    }
157
158    #[expect(
159        clippy::cognitive_complexity,
160        reason = "privilege-aware lifecycle setup requires explicit branching for observability"
161    )]
162    fn start_postgres(
163        runtime: &Runtime,
164        mut bootstrap: TestBootstrapSettings,
165        env_vars: &[(String, Option<String>)],
166    ) -> BootstrapResult<StartupOutcome> {
167        let privileges = bootstrap.privileges;
168        let mut embedded = PostgreSQL::new(bootstrap.settings.clone());
169        info!(
170            target: LOG_TARGET,
171            privileges = ?privileges,
172            mode = ?bootstrap.execution_mode,
173            "starting embedded postgres lifecycle"
174        );
175
176        let invoker = ClusterWorkerInvoker::new(runtime, &bootstrap, env_vars);
177        Self::invoke_lifecycle(&invoker, &mut embedded)?;
178
179        let is_managed_via_worker = matches!(privileges, ExecutionPrivileges::Root);
180        let postgres =
181            Self::prepare_postgres_handle(is_managed_via_worker, &mut bootstrap, embedded);
182
183        info!(
184            target: LOG_TARGET,
185            privileges = ?privileges,
186            worker_managed = is_managed_via_worker,
187            "embedded postgres started"
188        );
189        Ok(StartupOutcome {
190            bootstrap,
191            postgres,
192            is_managed_via_worker,
193        })
194    }
195
196    fn prepare_postgres_handle(
197        is_managed_via_worker: bool,
198        bootstrap: &mut TestBootstrapSettings,
199        embedded: PostgreSQL,
200    ) -> Option<PostgreSQL> {
201        if is_managed_via_worker {
202            None
203        } else {
204            bootstrap.settings = embedded.settings().clone();
205            Some(embedded)
206        }
207    }
208
209    fn invoke_lifecycle(
210        invoker: &ClusterWorkerInvoker<'_>,
211        embedded: &mut PostgreSQL,
212    ) -> BootstrapResult<()> {
213        invoker.invoke(worker_operation::WorkerOperation::Setup, async {
214            embedded.setup().await
215        })?;
216        invoker.invoke(worker_operation::WorkerOperation::Start, async {
217            embedded.start().await
218        })
219    }
220
221    /// Boots a `PostgreSQL` instance asynchronously for use in `#[tokio::test]` contexts.
222    ///
223    /// Unlike [`TestCluster::new`], this constructor does not create its own Tokio runtime.
224    /// Instead, it runs on the caller's async runtime, making it safe to call from within
225    /// `#[tokio::test]` and other async contexts.
226    ///
227    /// **Important:** Clusters created with `start_async()` should be shut down explicitly
228    /// using [`stop_async()`](Self::stop_async). The `Drop` implementation will attempt
229    /// best-effort cleanup but may not succeed if the runtime is no longer available.
230    ///
231    /// # Errors
232    ///
233    /// Returns an error if the bootstrap configuration cannot be prepared or if starting
234    /// the embedded cluster fails.
235    ///
236    /// # Examples
237    ///
238    /// ```no_run
239    /// use pg_embedded_setup_unpriv::TestCluster;
240    ///
241    /// #[tokio::test]
242    /// async fn test_with_embedded_postgres() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
243    ///     let cluster = TestCluster::start_async().await?;
244    ///     let url = cluster.settings().url("my_database");
245    ///     // ... async database operations ...
246    ///     cluster.stop_async().await?;
247    ///     Ok(())
248    /// }
249    /// ```
250    #[cfg(feature = "async-api")]
251    pub async fn start_async() -> BootstrapResult<Self> {
252        use tracing::Instrument;
253
254        let span = info_span!(target: LOG_TARGET, "test_cluster", async_mode = true);
255
256        // Sync bootstrap preparation (no await needed).
257        let initial_bootstrap = bootstrap_for_tests()?;
258        let env_vars = initial_bootstrap.environment.to_env();
259        let env_guard = ScopedEnv::apply(&env_vars);
260
261        // Async postgres startup, instrumented with the span.
262        // Box::pin to avoid large future on the stack.
263        let outcome = Box::pin(Self::start_postgres_async(initial_bootstrap, &env_vars))
264            .instrument(span.clone())
265            .await?;
266
267        Ok(Self {
268            runtime: ClusterRuntime::Async,
269            postgres: outcome.postgres,
270            bootstrap: outcome.bootstrap,
271            is_managed_via_worker: outcome.is_managed_via_worker,
272            env_vars,
273            worker_guard: None,
274            _env_guard: env_guard,
275            _cluster_span: span,
276        })
277    }
278
279    /// Async variant of `start_postgres` that runs on the caller's runtime.
280    #[cfg(feature = "async-api")]
281    async fn start_postgres_async(
282        mut bootstrap: TestBootstrapSettings,
283        env_vars: &[(String, Option<String>)],
284    ) -> BootstrapResult<StartupOutcome> {
285        let privileges = bootstrap.privileges;
286        let mut embedded = PostgreSQL::new(bootstrap.settings.clone());
287        Self::log_lifecycle_start(privileges, &bootstrap);
288
289        let invoker = AsyncInvoker::new(&bootstrap, env_vars);
290        Box::pin(Self::invoke_lifecycle_async(&invoker, &mut embedded)).await?;
291
292        let is_managed_via_worker = matches!(privileges, ExecutionPrivileges::Root);
293        let postgres =
294            Self::prepare_postgres_handle(is_managed_via_worker, &mut bootstrap, embedded);
295
296        Self::log_lifecycle_complete(privileges, is_managed_via_worker);
297        Ok(StartupOutcome {
298            bootstrap,
299            postgres,
300            is_managed_via_worker,
301        })
302    }
303
304    #[cfg(feature = "async-api")]
305    fn log_lifecycle_start(privileges: ExecutionPrivileges, bootstrap: &TestBootstrapSettings) {
306        info!(
307            target: LOG_TARGET,
308            privileges = ?privileges,
309            mode = ?bootstrap.execution_mode,
310            async_mode = true,
311            "starting embedded postgres lifecycle"
312        );
313    }
314
315    #[cfg(feature = "async-api")]
316    fn log_lifecycle_complete(privileges: ExecutionPrivileges, is_managed_via_worker: bool) {
317        info!(
318            target: LOG_TARGET,
319            privileges = ?privileges,
320            worker_managed = is_managed_via_worker,
321            async_mode = true,
322            "embedded postgres started"
323        );
324    }
325
326    /// Async variant of `invoke_lifecycle`.
327    #[cfg(feature = "async-api")]
328    async fn invoke_lifecycle_async(
329        invoker: &AsyncInvoker<'_>,
330        embedded: &mut PostgreSQL,
331    ) -> BootstrapResult<()> {
332        Box::pin(
333            invoker.invoke(worker_operation::WorkerOperation::Setup, async {
334                embedded.setup().await
335            }),
336        )
337        .await?;
338        Box::pin(
339            invoker.invoke(worker_operation::WorkerOperation::Start, async {
340                embedded.start().await
341            }),
342        )
343        .await
344    }
345
346    /// Extends the cluster lifetime to cover additional scoped environment guards.
347    ///
348    /// Primarily used by fixtures that need to ensure `PG_EMBEDDED_WORKER` remains set for the
349    /// duration of the cluster lifetime.
350    #[doc(hidden)]
351    #[must_use]
352    pub fn with_worker_guard(mut self, worker_guard: Option<ScopedEnv>) -> Self {
353        self.worker_guard = worker_guard;
354        self
355    }
356
357    /// Explicitly shuts down an async cluster.
358    ///
359    /// This method should be called for clusters created with [`start_async()`](Self::start_async)
360    /// to ensure proper cleanup. It consumes `self` to prevent the `Drop` implementation from
361    /// attempting duplicate shutdown.
362    ///
363    /// For worker-managed clusters (root privileges), the worker subprocess is invoked
364    /// synchronously via `spawn_blocking`.
365    ///
366    /// # Errors
367    ///
368    /// Returns an error if the shutdown operation fails. The cluster resources are released
369    /// regardless of whether shutdown succeeds.
370    ///
371    /// # Examples
372    ///
373    /// ```no_run
374    /// use pg_embedded_setup_unpriv::TestCluster;
375    ///
376    /// #[tokio::test]
377    /// async fn test_explicit_shutdown() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
378    ///     let cluster = TestCluster::start_async().await?;
379    ///     // ... use cluster ...
380    ///     cluster.stop_async().await?;
381    ///     Ok(())
382    /// }
383    /// ```
384    #[cfg(feature = "async-api")]
385    pub async fn stop_async(mut self) -> BootstrapResult<()> {
386        let context = Self::stop_context(&self.bootstrap.settings);
387        Self::log_async_stop(&context, self.is_managed_via_worker);
388
389        if self.is_managed_via_worker {
390            Self::stop_worker_managed_async(&self.bootstrap, &self.env_vars, &context).await
391        } else if let Some(postgres) = self.postgres.take() {
392            Self::stop_in_process_async(postgres, self.bootstrap.shutdown_timeout, &context).await
393        } else {
394            Ok(())
395        }
396    }
397
398    #[cfg(feature = "async-api")]
399    fn log_async_stop(context: &str, is_managed_via_worker: bool) {
400        info!(
401            target: LOG_TARGET,
402            context = %context,
403            worker_managed = is_managed_via_worker,
404            async_mode = true,
405            "stopping embedded postgres cluster"
406        );
407    }
408
409    #[cfg(feature = "async-api")]
410    async fn stop_worker_managed_async(
411        bootstrap: &TestBootstrapSettings,
412        env_vars: &[(String, Option<String>)],
413        context: &str,
414    ) -> BootstrapResult<()> {
415        let owned_bootstrap = bootstrap.clone();
416        let owned_env_vars = env_vars.to_vec();
417        let owned_context = context.to_owned();
418        tokio::task::spawn_blocking(move || {
419            Self::stop_via_worker_sync(&owned_bootstrap, &owned_env_vars, &owned_context)
420        })
421        .await
422        .map_err(|err| {
423            crate::error::BootstrapError::from(color_eyre::eyre::eyre!(
424                "worker stop task panicked: {err}"
425            ))
426        })?
427    }
428
429    #[cfg(feature = "async-api")]
430    async fn stop_in_process_async(
431        postgres: PostgreSQL,
432        timeout: std::time::Duration,
433        context: &str,
434    ) -> BootstrapResult<()> {
435        match time::timeout(timeout, postgres.stop()).await {
436            Ok(Ok(())) => Ok(()),
437            Ok(Err(err)) => {
438                Self::warn_stop_failure(context, &err);
439                Err(crate::error::BootstrapError::from(color_eyre::eyre::eyre!(
440                    "failed to stop postgres: {err}"
441                )))
442            }
443            Err(_) => {
444                let timeout_secs = timeout.as_secs();
445                Self::warn_stop_timeout(timeout_secs, context);
446                Err(crate::error::BootstrapError::from(color_eyre::eyre::eyre!(
447                    "stop timed out after {timeout_secs}s"
448                )))
449            }
450        }
451    }
452
453    /// Synchronous worker stop for use with `spawn_blocking`.
454    #[cfg(feature = "async-api")]
455    fn stop_via_worker_sync(
456        bootstrap: &TestBootstrapSettings,
457        env_vars: &[(String, Option<String>)],
458        context: &str,
459    ) -> BootstrapResult<()> {
460        let runtime = build_runtime()?;
461        let invoker = ClusterWorkerInvoker::new(&runtime, bootstrap, env_vars);
462        invoker
463            .invoke_as_root(worker_operation::WorkerOperation::Stop)
464            .inspect_err(|err| Self::warn_stop_failure(context, err))
465    }
466
467    /// Returns the prepared `PostgreSQL` settings for the running cluster.
468    pub const fn settings(&self) -> &Settings {
469        &self.bootstrap.settings
470    }
471
472    /// Returns the environment required for clients to interact with the cluster.
473    pub const fn environment(&self) -> &TestBootstrapEnvironment {
474        &self.bootstrap.environment
475    }
476
477    /// Returns the bootstrap metadata captured when the cluster was started.
478    pub const fn bootstrap(&self) -> &TestBootstrapSettings {
479        &self.bootstrap
480    }
481
482    /// Returns helper methods for constructing connection artefacts.
483    ///
484    /// # Examples
485    /// ```no_run
486    /// use pg_embedded_setup_unpriv::TestCluster;
487    ///
488    /// # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
489    /// let cluster = TestCluster::new()?;
490    /// let metadata = cluster.connection().metadata();
491    /// println!(
492    ///     "postgresql://{}:***@{}:{}/postgres",
493    ///     metadata.superuser(),
494    ///     metadata.host(),
495    ///     metadata.port(),
496    /// );
497    /// # Ok(())
498    /// # }
499    /// ```
500    #[must_use]
501    pub fn connection(&self) -> TestClusterConnection {
502        TestClusterConnection::new(&self.bootstrap)
503    }
504
505    fn stop_context(settings: &Settings) -> String {
506        let data_dir = settings.data_dir.display();
507        let version = settings.version.to_string();
508        format!("version {version}, data_dir {data_dir}")
509    }
510
511    /// Best-effort cleanup for async clusters dropped without `stop_async()`.
512    ///
513    /// Attempts to spawn cleanup on the current runtime handle if available.
514    /// For worker-managed clusters, attempts to invoke the worker stop operation.
515    fn drop_async_cluster(&mut self, context: &str) {
516        Self::warn_async_drop_without_stop(context);
517
518        if self.is_managed_via_worker {
519            self.drop_async_worker_managed(context);
520        } else if let Some(postgres) = self.postgres.take() {
521            self.drop_async_in_process(context, postgres);
522        }
523        // If neither worker-managed nor has postgres handle, already cleaned up via stop_async().
524    }
525
526    /// Best-effort worker stop for async clusters dropped without `stop_async()`.
527    fn drop_async_worker_managed(&self, context: &str) {
528        let Ok(handle) = tokio::runtime::Handle::try_current() else {
529            Self::error_no_runtime_for_cleanup(context);
530            return;
531        };
532
533        let bootstrap = self.bootstrap.clone();
534        let env_vars = self.env_vars.clone();
535        let owned_context = context.to_owned();
536
537        drop(handle.spawn(spawn_worker_stop_task(bootstrap, env_vars, owned_context)));
538    }
539
540    /// Best-effort in-process stop for async clusters dropped without `stop_async()`.
541    fn drop_async_in_process(&self, context: &str, postgres: PostgreSQL) {
542        let Ok(handle) = tokio::runtime::Handle::try_current() else {
543            Self::error_no_runtime_for_cleanup(context);
544            return;
545        };
546
547        spawn_async_cleanup(&handle, postgres, self.bootstrap.shutdown_timeout);
548    }
549
550    fn warn_async_drop_without_stop(context: &str) {
551        tracing::warn!(
552            target: LOG_TARGET,
553            context = %context,
554            concat!(
555                "async TestCluster dropped without calling stop_async(); ",
556                "attempting best-effort cleanup"
557            )
558        );
559    }
560
561    fn error_no_runtime_for_cleanup(context: &str) {
562        tracing::error!(
563            target: LOG_TARGET,
564            context = %context,
565            "no async runtime available for cleanup; resources may leak"
566        );
567    }
568
569    fn warn_stop_failure(context: &str, err: &impl Display) {
570        tracing::warn!(
571            "SKIP-TEST-CLUSTER: failed to stop embedded postgres instance ({}): {}",
572            context,
573            err
574        );
575    }
576
577    fn warn_stop_timeout(timeout_secs: u64, context: &str) {
578        tracing::warn!(
579            "SKIP-TEST-CLUSTER: stop() timed out after {timeout_secs}s ({context}); proceeding with drop"
580        );
581    }
582}
583
584#[cfg(test)]
585mod tests {
586    use std::ffi::OsString;
587
588    use super::*;
589    use crate::ExecutionPrivileges;
590    use crate::test_support::{dummy_settings, scoped_env};
591
592    #[test]
593    fn with_worker_guard_restores_environment() {
594        const KEY: &str = "PG_EMBEDDED_WORKER_GUARD_TEST";
595        let baseline = std::env::var(KEY).ok();
596        let guard = scoped_env(vec![(OsString::from(KEY), Some(OsString::from("guarded")))]);
597        let cluster = dummy_cluster().with_worker_guard(Some(guard));
598        assert_eq!(
599            std::env::var(KEY).as_deref(),
600            Ok("guarded"),
601            "worker guard should remain active whilst the cluster runs",
602        );
603        drop(cluster);
604        match baseline {
605            Some(value) => assert_eq!(
606                std::env::var(KEY).as_deref(),
607                Ok(value.as_str()),
608                "worker guard should restore the previous value"
609            ),
610            None => assert!(
611                std::env::var(KEY).is_err(),
612                "worker guard should unset the variable once the cluster drops"
613            ),
614        }
615    }
616
617    fn dummy_cluster() -> TestCluster {
618        let runtime = tokio::runtime::Builder::new_current_thread()
619            .enable_all()
620            .build()
621            .expect("test runtime");
622        let span = info_span!(target: LOG_TARGET, "test_cluster");
623        let bootstrap = dummy_settings(ExecutionPrivileges::Unprivileged);
624        let env_vars = bootstrap.environment.to_env();
625        let env_guard = ScopedEnv::apply(&env_vars);
626        TestCluster {
627            runtime: ClusterRuntime::Sync(runtime),
628            postgres: None,
629            bootstrap,
630            is_managed_via_worker: false,
631            env_vars,
632            worker_guard: None,
633            _env_guard: env_guard,
634            _cluster_span: span,
635        }
636    }
637}
638
639/// Spawns async cleanup of a `PostgreSQL` instance on the provided runtime handle.
640///
641/// The task is fire-and-forget; errors during shutdown are logged at debug level.
642fn spawn_async_cleanup(
643    handle: &tokio::runtime::Handle,
644    postgres: PostgreSQL,
645    timeout: std::time::Duration,
646) {
647    drop(handle.spawn(async move {
648        match time::timeout(timeout, postgres.stop()).await {
649            Ok(Ok(())) => {
650                tracing::debug!(target: LOG_TARGET, "async cleanup completed successfully");
651            }
652            Ok(Err(err)) => {
653                tracing::debug!(
654                    target: LOG_TARGET,
655                    error = %err,
656                    "async cleanup failed during postgres stop"
657                );
658            }
659            Err(_) => {
660                tracing::debug!(
661                    target: LOG_TARGET,
662                    timeout_secs = timeout.as_secs(),
663                    "async cleanup timed out"
664                );
665            }
666        }
667    }));
668}
669
670/// Spawns a blocking task to stop a worker-managed cluster.
671///
672/// Used by the async drop path to invoke the worker stop operation without
673/// blocking the current async context.
674#[expect(
675    clippy::cognitive_complexity,
676    reason = "complexity is from spawn_blocking + error! macro expansion, not logic"
677)]
678async fn spawn_worker_stop_task(
679    bootstrap: TestBootstrapSettings,
680    env_vars: Vec<(String, Option<String>)>,
681    context: String,
682) {
683    let result =
684        tokio::task::spawn_blocking(move || worker_stop_sync(&bootstrap, &env_vars, &context))
685            .await;
686
687    if let Err(err) = result {
688        tracing::error!(
689            target: LOG_TARGET,
690            error = %err,
691            "worker stop task panicked during async drop"
692        );
693    }
694}
695
696/// Synchronous worker stop for async drop cleanup.
697///
698/// Builds a temporary runtime to invoke the worker stop operation.
699fn worker_stop_sync(
700    bootstrap: &TestBootstrapSettings,
701    env_vars: &[(String, Option<String>)],
702    context: &str,
703) {
704    let Ok(runtime) = build_runtime() else {
705        tracing::error!(
706            target: LOG_TARGET,
707            "failed to build runtime for worker stop during async drop"
708        );
709        return;
710    };
711
712    let invoker = ClusterWorkerInvoker::new(&runtime, bootstrap, env_vars);
713    if let Err(err) = invoker.invoke_as_root(worker_operation::WorkerOperation::Stop) {
714        TestCluster::warn_stop_failure(context, &err);
715    }
716}
717
718impl Drop for TestCluster {
719    fn drop(&mut self) {
720        let context = Self::stop_context(&self.bootstrap.settings);
721        let is_async = self.runtime.is_async();
722        info!(
723            target: LOG_TARGET,
724            context = %context,
725            worker_managed = self.is_managed_via_worker,
726            async_mode = is_async,
727            "stopping embedded postgres cluster"
728        );
729
730        if is_async {
731            // Async clusters should use stop_async() explicitly; attempt best-effort cleanup.
732            self.drop_async_cluster(&context);
733        } else {
734            self.drop_sync_cluster(&context);
735        }
736        // Environment guards drop after this block, restoring the process state.
737    }
738}
739
740impl TestCluster {
741    /// Synchronous drop path: stops the cluster using the owned runtime.
742    fn drop_sync_cluster(&mut self, context: &str) {
743        let ClusterRuntime::Sync(ref runtime) = self.runtime else {
744            // Should never happen: drop_sync_cluster is only called for sync mode.
745            return;
746        };
747
748        if self.is_managed_via_worker {
749            let invoker = ClusterWorkerInvoker::new(runtime, &self.bootstrap, &self.env_vars);
750            if let Err(err) = invoker.invoke_as_root(worker_operation::WorkerOperation::Stop) {
751                Self::warn_stop_failure(context, &err);
752            }
753            return;
754        }
755
756        let Some(postgres) = self.postgres.take() else {
757            return;
758        };
759
760        let timeout = self.bootstrap.shutdown_timeout;
761        let timeout_secs = timeout.as_secs();
762        let outcome = runtime.block_on(async { time::timeout(timeout, postgres.stop()).await });
763
764        match outcome {
765            Ok(Ok(())) => {}
766            Ok(Err(err)) => Self::warn_stop_failure(context, &err),
767            Err(_) => Self::warn_stop_timeout(timeout_secs, context),
768        }
769    }
770}
771
772#[cfg(all(test, feature = "cluster-unit-tests"))]
773mod drop_logging_tests {
774    use super::*;
775    use crate::test_support::capture_warn_logs;
776
777    #[test]
778    fn warn_stop_timeout_emits_warning() {
779        let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_timeout(5, "ctx"));
780        assert!(
781            logs.iter()
782                .any(|line| line.contains("stop() timed out after 5s (ctx)")),
783            "expected timeout warning, got {logs:?}"
784        );
785    }
786
787    #[test]
788    fn warn_stop_failure_emits_warning() {
789        let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_failure("ctx", &"boom"));
790        assert!(
791            logs.iter()
792                .any(|line| line.contains("failed to stop embedded postgres instance")),
793            "expected failure warning, got {logs:?}"
794        );
795    }
796}
797
798#[cfg(all(test, not(feature = "cluster-unit-tests")))]
799#[path = "../../tests/test_cluster.rs"]
800mod tests;