pg_embedded_setup_unpriv/cluster/
mod.rs

1//! RAII wrapper that boots an embedded `PostgreSQL` instance for tests.
2//!
3//! The cluster starts during [`TestCluster::new`] and shuts down automatically when the
4//! value drops out of scope.
5//!
6//! # Examples
7//! ```no_run
8//! use pg_embedded_setup_unpriv::TestCluster;
9//!
10//! # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
11//! let cluster = TestCluster::new()?;
12//! let url = cluster.settings().url("my_database");
13//! // Perform test database work here.
14//! drop(cluster); // `PostgreSQL` stops automatically.
15//! # Ok(())
16//! # }
17//! ```
18
19mod connection;
20mod worker_invoker;
21
22pub use self::connection::{ConnectionMetadata, TestClusterConnection};
23#[cfg(any(doc, test, feature = "cluster-unit-tests", feature = "dev-worker"))]
24pub use self::worker_invoker::WorkerInvoker;
25
26use self::worker_invoker::WorkerInvoker as ClusterWorkerInvoker;
27use crate::bootstrap_for_tests;
28use crate::env::ScopedEnv;
29use crate::error::{BootstrapError, BootstrapResult};
30use crate::{ExecutionPrivileges, TestBootstrapEnvironment, TestBootstrapSettings};
31use color_eyre::eyre::Context;
32use postgresql_embedded::{PostgreSQL, Settings};
33use std::fmt::Display;
34use std::time::Duration;
35use tokio::runtime::{Builder, Runtime};
36use tokio::time;
37
38/// Embedded `PostgreSQL` instance whose lifecycle follows Rust's drop semantics.
39#[derive(Debug)]
40pub struct TestCluster {
41    runtime: Runtime,
42    postgres: Option<PostgreSQL>,
43    bootstrap: TestBootstrapSettings,
44    is_managed_via_worker: bool,
45    env_vars: Vec<(String, Option<String>)>,
46    worker_guard: Option<ScopedEnv>,
47    _env_guard: ScopedEnv,
48}
49
50impl TestCluster {
51    /// Boots a `PostgreSQL` instance configured by [`bootstrap_for_tests`].
52    ///
53    /// The constructor blocks until the underlying server process is running and returns an
54    /// error when startup fails.
55    ///
56    /// # Errors
57    /// Returns an error if the bootstrap configuration cannot be prepared or if starting the
58    /// embedded cluster fails.
59    pub fn new() -> BootstrapResult<Self> {
60        let mut bootstrap = bootstrap_for_tests()?;
61        let runtime = Builder::new_current_thread()
62            .enable_all()
63            .build()
64            .context("failed to create Tokio runtime for TestCluster")
65            .map_err(BootstrapError::from)?;
66
67        let env_vars = bootstrap.environment.to_env();
68        let env_guard = ScopedEnv::apply(&env_vars);
69        let privileges = bootstrap.privileges;
70        let mut embedded = PostgreSQL::new(bootstrap.settings.clone());
71
72        let invoker = ClusterWorkerInvoker::new(&runtime, &bootstrap, &env_vars);
73
74        invoker.invoke(WorkerOperation::Setup, async { embedded.setup().await })?;
75        invoker.invoke(WorkerOperation::Start, async { embedded.start().await })?;
76
77        let is_managed_via_worker = matches!(privileges, ExecutionPrivileges::Root);
78        if !is_managed_via_worker {
79            // Capture runtime mutations such as dynamically assigned ports so connection
80            // metadata reflects the live cluster rather than the pre-bootstrap defaults.
81            bootstrap.settings = embedded.settings().clone();
82        }
83        let postgres = if is_managed_via_worker {
84            None
85        } else {
86            Some(embedded)
87        };
88
89        Ok(Self {
90            runtime,
91            postgres,
92            bootstrap,
93            is_managed_via_worker,
94            env_vars,
95            worker_guard: None,
96            _env_guard: env_guard,
97        })
98    }
99
100    /// Extends the cluster lifetime to cover additional scoped environment guards.
101    ///
102    /// Primarily used by fixtures that need to ensure `PG_EMBEDDED_WORKER` remains set for the
103    /// duration of the cluster lifetime.
104    #[doc(hidden)]
105    #[must_use]
106    pub fn with_worker_guard(mut self, worker_guard: Option<ScopedEnv>) -> Self {
107        self.worker_guard = worker_guard;
108        self
109    }
110
111    /// Returns the prepared `PostgreSQL` settings for the running cluster.
112    pub const fn settings(&self) -> &Settings {
113        &self.bootstrap.settings
114    }
115
116    /// Returns the environment required for clients to interact with the cluster.
117    pub const fn environment(&self) -> &TestBootstrapEnvironment {
118        &self.bootstrap.environment
119    }
120
121    /// Returns the bootstrap metadata captured when the cluster was started.
122    pub const fn bootstrap(&self) -> &TestBootstrapSettings {
123        &self.bootstrap
124    }
125
126    /// Returns helper methods for constructing connection artefacts.
127    ///
128    /// # Examples
129    /// ```no_run
130    /// use pg_embedded_setup_unpriv::TestCluster;
131    ///
132    /// # fn main() -> pg_embedded_setup_unpriv::BootstrapResult<()> {
133    /// let cluster = TestCluster::new()?;
134    /// let metadata = cluster.connection().metadata();
135    /// println!(
136    ///     "postgresql://{}:***@{}:{}/postgres",
137    ///     metadata.superuser(),
138    ///     metadata.host(),
139    ///     metadata.port(),
140    /// );
141    /// # Ok(())
142    /// # }
143    /// ```
144    #[must_use]
145    pub fn connection(&self) -> TestClusterConnection {
146        TestClusterConnection::new(&self.bootstrap)
147    }
148
149    fn stop_context(settings: &Settings) -> String {
150        let data_dir = settings.data_dir.display();
151        let version = settings.version.to_string();
152        format!("version {version}, data_dir {data_dir}")
153    }
154
155    fn warn_stop_failure(context: &str, err: &impl Display) {
156        tracing::warn!(
157            "SKIP-TEST-CLUSTER: failed to stop embedded postgres instance ({}): {}",
158            context,
159            err
160        );
161    }
162
163    fn warn_stop_timeout(timeout_secs: u64, context: &str) {
164        tracing::warn!(
165            "SKIP-TEST-CLUSTER: stop() timed out after {timeout_secs}s ({context}); proceeding with drop"
166        );
167    }
168}
169
170#[cfg(test)]
171mod tests {
172    use std::ffi::OsString;
173
174    use super::*;
175    use crate::ExecutionPrivileges;
176    use crate::test_support::{dummy_settings, scoped_env};
177
178    #[test]
179    fn with_worker_guard_restores_environment() {
180        const KEY: &str = "PG_EMBEDDED_WORKER_GUARD_TEST";
181        let baseline = std::env::var(KEY).ok();
182        let guard = scoped_env(vec![(OsString::from(KEY), Some(OsString::from("guarded")))]);
183        let cluster = dummy_cluster().with_worker_guard(Some(guard));
184        assert_eq!(
185            std::env::var(KEY).as_deref(),
186            Ok("guarded"),
187            "worker guard should remain active whilst the cluster runs",
188        );
189        drop(cluster);
190        match baseline {
191            Some(value) => assert_eq!(
192                std::env::var(KEY).as_deref(),
193                Ok(value.as_str()),
194                "worker guard should restore the previous value"
195            ),
196            None => assert!(
197                std::env::var(KEY).is_err(),
198                "worker guard should unset the variable once the cluster drops"
199            ),
200        }
201    }
202
203    fn dummy_cluster() -> TestCluster {
204        let runtime = tokio::runtime::Builder::new_current_thread()
205            .enable_all()
206            .build()
207            .expect("test runtime");
208        let bootstrap = dummy_settings(ExecutionPrivileges::Unprivileged);
209        let env_vars = bootstrap.environment.to_env();
210        let env_guard = ScopedEnv::apply(&env_vars);
211        TestCluster {
212            runtime,
213            postgres: None,
214            bootstrap,
215            is_managed_via_worker: false,
216            env_vars,
217            worker_guard: None,
218            _env_guard: env_guard,
219        }
220    }
221}
222
223#[doc(hidden)]
224/// Identifies worker lifecycle operations executed via the helper binary.
225#[derive(Clone, Copy)]
226pub enum WorkerOperation {
227    Setup,
228    Start,
229    Stop,
230}
231
232impl WorkerOperation {
233    #[must_use]
234    pub const fn as_str(self) -> &'static str {
235        match self {
236            Self::Setup => "setup",
237            Self::Start => "start",
238            Self::Stop => "stop",
239        }
240    }
241
242    #[must_use]
243    pub const fn error_context(self) -> &'static str {
244        match self {
245            Self::Setup => "postgresql_embedded::setup() failed",
246            Self::Start => "postgresql_embedded::start() failed",
247            Self::Stop => "postgresql_embedded::stop() failed",
248        }
249    }
250
251    #[must_use]
252    pub const fn timeout(self, bootstrap: &TestBootstrapSettings) -> Duration {
253        match self {
254            Self::Setup => bootstrap.setup_timeout,
255            Self::Start => bootstrap.start_timeout,
256            Self::Stop => bootstrap.shutdown_timeout,
257        }
258    }
259}
260
261impl Drop for TestCluster {
262    fn drop(&mut self) {
263        let context = Self::stop_context(&self.bootstrap.settings);
264
265        if self.is_managed_via_worker {
266            let invoker = ClusterWorkerInvoker::new(&self.runtime, &self.bootstrap, &self.env_vars);
267            if let Err(err) = invoker.invoke_as_root(WorkerOperation::Stop) {
268                Self::warn_stop_failure(&context, &err);
269            }
270        } else if let Some(postgres) = self.postgres.take() {
271            let timeout = self.bootstrap.shutdown_timeout;
272            let timeout_secs = timeout.as_secs();
273            let outcome = self
274                .runtime
275                .block_on(async { time::timeout(timeout, postgres.stop()).await });
276            match outcome {
277                Ok(Ok(())) => {}
278                Ok(Err(err)) => Self::warn_stop_failure(&context, &err),
279                Err(_) => Self::warn_stop_timeout(timeout_secs, &context),
280            }
281        }
282        // Environment guards drop after this block, restoring the process state.
283    }
284}
285
286#[cfg(all(test, feature = "cluster-unit-tests"))]
287mod drop_logging_tests {
288    use super::*;
289    use crate::test_support::capture_warn_logs;
290
291    #[test]
292    fn warn_stop_timeout_emits_warning() {
293        let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_timeout(5, "ctx"));
294        assert!(
295            logs.iter()
296                .any(|line| line.contains("stop() timed out after 5s (ctx)")),
297            "expected timeout warning, got {logs:?}"
298        );
299    }
300
301    #[test]
302    fn warn_stop_failure_emits_warning() {
303        let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_failure("ctx", &"boom"));
304        assert!(
305            logs.iter()
306                .any(|line| line.contains("failed to stop embedded postgres instance")),
307            "expected failure warning, got {logs:?}"
308        );
309    }
310}
311
312#[cfg(all(test, not(feature = "cluster-unit-tests")))]
313#[path = "../../tests/test_cluster.rs"]
314mod tests;