pg_embedded_setup_unpriv/cluster/
mod.rs1mod connection;
20mod worker_invoker;
21
22pub use self::connection::{ConnectionMetadata, TestClusterConnection};
23#[cfg(any(doc, test, feature = "cluster-unit-tests", feature = "dev-worker"))]
24pub use self::worker_invoker::WorkerInvoker;
25
26use self::worker_invoker::WorkerInvoker as ClusterWorkerInvoker;
27use crate::bootstrap_for_tests;
28use crate::env::ScopedEnv;
29use crate::error::{BootstrapError, BootstrapResult};
30use crate::{ExecutionPrivileges, TestBootstrapEnvironment, TestBootstrapSettings};
31use color_eyre::eyre::Context;
32use postgresql_embedded::{PostgreSQL, Settings};
33use std::fmt::Display;
34use std::time::Duration;
35use tokio::runtime::{Builder, Runtime};
36use tokio::time;
37
38#[derive(Debug)]
40pub struct TestCluster {
41 runtime: Runtime,
42 postgres: Option<PostgreSQL>,
43 bootstrap: TestBootstrapSettings,
44 is_managed_via_worker: bool,
45 env_vars: Vec<(String, Option<String>)>,
46 worker_guard: Option<ScopedEnv>,
47 _env_guard: ScopedEnv,
48}
49
50impl TestCluster {
51 pub fn new() -> BootstrapResult<Self> {
60 let mut bootstrap = bootstrap_for_tests()?;
61 let runtime = Builder::new_current_thread()
62 .enable_all()
63 .build()
64 .context("failed to create Tokio runtime for TestCluster")
65 .map_err(BootstrapError::from)?;
66
67 let env_vars = bootstrap.environment.to_env();
68 let env_guard = ScopedEnv::apply(&env_vars);
69 let privileges = bootstrap.privileges;
70 let mut embedded = PostgreSQL::new(bootstrap.settings.clone());
71
72 let invoker = ClusterWorkerInvoker::new(&runtime, &bootstrap, &env_vars);
73
74 invoker.invoke(WorkerOperation::Setup, async { embedded.setup().await })?;
75 invoker.invoke(WorkerOperation::Start, async { embedded.start().await })?;
76
77 let is_managed_via_worker = matches!(privileges, ExecutionPrivileges::Root);
78 if !is_managed_via_worker {
79 bootstrap.settings = embedded.settings().clone();
82 }
83 let postgres = if is_managed_via_worker {
84 None
85 } else {
86 Some(embedded)
87 };
88
89 Ok(Self {
90 runtime,
91 postgres,
92 bootstrap,
93 is_managed_via_worker,
94 env_vars,
95 worker_guard: None,
96 _env_guard: env_guard,
97 })
98 }
99
100 #[doc(hidden)]
105 #[must_use]
106 pub fn with_worker_guard(mut self, worker_guard: Option<ScopedEnv>) -> Self {
107 self.worker_guard = worker_guard;
108 self
109 }
110
111 pub const fn settings(&self) -> &Settings {
113 &self.bootstrap.settings
114 }
115
116 pub const fn environment(&self) -> &TestBootstrapEnvironment {
118 &self.bootstrap.environment
119 }
120
121 pub const fn bootstrap(&self) -> &TestBootstrapSettings {
123 &self.bootstrap
124 }
125
126 #[must_use]
145 pub fn connection(&self) -> TestClusterConnection {
146 TestClusterConnection::new(&self.bootstrap)
147 }
148
149 fn stop_context(settings: &Settings) -> String {
150 let data_dir = settings.data_dir.display();
151 let version = settings.version.to_string();
152 format!("version {version}, data_dir {data_dir}")
153 }
154
155 fn warn_stop_failure(context: &str, err: &impl Display) {
156 tracing::warn!(
157 "SKIP-TEST-CLUSTER: failed to stop embedded postgres instance ({}): {}",
158 context,
159 err
160 );
161 }
162
163 fn warn_stop_timeout(timeout_secs: u64, context: &str) {
164 tracing::warn!(
165 "SKIP-TEST-CLUSTER: stop() timed out after {timeout_secs}s ({context}); proceeding with drop"
166 );
167 }
168}
169
// Named `worker_guard_tests` rather than `tests`: this file also declares a
// `#[path]`-based `mod tests;` that is active in test builds without the
// `cluster-unit-tests` feature, and two modules with the same name in one
// scope do not compile.
#[cfg(test)]
mod worker_guard_tests {
    //! Unit tests for the worker-guard handling of [`TestCluster`].

    use std::ffi::OsString;

    use super::*;
    use crate::ExecutionPrivileges;
    use crate::test_support::{dummy_settings, scoped_env};

    /// The guard attached via `with_worker_guard` must stay active while the
    /// cluster lives and restore the previous value once the cluster drops.
    #[test]
    fn with_worker_guard_restores_environment() {
        const KEY: &str = "PG_EMBEDDED_WORKER_GUARD_TEST";
        // Remember the pre-test value so the final assertion matches either starting state.
        let baseline = std::env::var(KEY).ok();
        let guard = scoped_env(vec![(OsString::from(KEY), Some(OsString::from("guarded")))]);
        let cluster = dummy_cluster().with_worker_guard(Some(guard));
        assert_eq!(
            std::env::var(KEY).as_deref(),
            Ok("guarded"),
            "worker guard should remain active whilst the cluster runs",
        );
        drop(cluster);
        match baseline {
            Some(value) => assert_eq!(
                std::env::var(KEY).as_deref(),
                Ok(value.as_str()),
                "worker guard should restore the previous value"
            ),
            None => assert!(
                std::env::var(KEY).is_err(),
                "worker guard should unset the variable once the cluster drops"
            ),
        }
    }

    /// Builds a cluster with no server handle, so dropping it performs no stop call.
    fn dummy_cluster() -> TestCluster {
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .expect("test runtime");
        let bootstrap = dummy_settings(ExecutionPrivileges::Unprivileged);
        let env_vars = bootstrap.environment.to_env();
        let env_guard = ScopedEnv::apply(&env_vars);
        TestCluster {
            runtime,
            postgres: None,
            bootstrap,
            is_managed_via_worker: false,
            env_vars,
            worker_guard: None,
            _env_guard: env_guard,
        }
    }
}
222
/// Lifecycle operation requested for the embedded PostgreSQL instance.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone`/`Copy` so values can be
/// logged with `{:?}` and compared in assertions.
#[doc(hidden)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WorkerOperation {
    /// Prepare the instance (`setup`).
    Setup,
    /// Start the instance (`start`).
    Start,
    /// Stop the instance (`stop`).
    Stop,
}
231
232impl WorkerOperation {
233 #[must_use]
234 pub const fn as_str(self) -> &'static str {
235 match self {
236 Self::Setup => "setup",
237 Self::Start => "start",
238 Self::Stop => "stop",
239 }
240 }
241
242 #[must_use]
243 pub const fn error_context(self) -> &'static str {
244 match self {
245 Self::Setup => "postgresql_embedded::setup() failed",
246 Self::Start => "postgresql_embedded::start() failed",
247 Self::Stop => "postgresql_embedded::stop() failed",
248 }
249 }
250
251 #[must_use]
252 pub const fn timeout(self, bootstrap: &TestBootstrapSettings) -> Duration {
253 match self {
254 Self::Setup => bootstrap.setup_timeout,
255 Self::Start => bootstrap.start_timeout,
256 Self::Stop => bootstrap.shutdown_timeout,
257 }
258 }
259}
260
261impl Drop for TestCluster {
262 fn drop(&mut self) {
263 let context = Self::stop_context(&self.bootstrap.settings);
264
265 if self.is_managed_via_worker {
266 let invoker = ClusterWorkerInvoker::new(&self.runtime, &self.bootstrap, &self.env_vars);
267 if let Err(err) = invoker.invoke_as_root(WorkerOperation::Stop) {
268 Self::warn_stop_failure(&context, &err);
269 }
270 } else if let Some(postgres) = self.postgres.take() {
271 let timeout = self.bootstrap.shutdown_timeout;
272 let timeout_secs = timeout.as_secs();
273 let outcome = self
274 .runtime
275 .block_on(async { time::timeout(timeout, postgres.stop()).await });
276 match outcome {
277 Ok(Ok(())) => {}
278 Ok(Err(err)) => Self::warn_stop_failure(&context, &err),
279 Err(_) => Self::warn_stop_timeout(timeout_secs, &context),
280 }
281 }
282 }
284}
285
#[cfg(all(test, feature = "cluster-unit-tests"))]
mod drop_logging_tests {
    //! Checks that the stop-warning helpers emit the expected log lines.

    use super::*;
    use crate::test_support::capture_warn_logs;

    /// The timeout helper must report both the duration and the context.
    #[test]
    fn warn_stop_timeout_emits_warning() {
        let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_timeout(5, "ctx"));
        let found = logs
            .iter()
            .any(|entry| entry.contains("stop() timed out after 5s (ctx)"));
        assert!(found, "expected timeout warning, got {logs:?}");
    }

    /// The failure helper must report that stopping the instance failed.
    #[test]
    fn warn_stop_failure_emits_warning() {
        let (logs, ()) = capture_warn_logs(|| TestCluster::warn_stop_failure("ctx", &"boom"));
        let found = logs
            .iter()
            .any(|entry| entry.contains("failed to stop embedded postgres instance"));
        assert!(found, "expected failure warning, got {logs:?}");
    }
}
311
312#[cfg(all(test, not(feature = "cluster-unit-tests")))]
313#[path = "../../tests/test_cluster.rs"]
314mod tests;