temp_postgres/
temp_postgres.rs1use std::ffi::{OsStr, OsString};
2use std::path::{Path, PathBuf};
3use std::time::Duration;
4
5use crate::error::ErrorInner;
6use crate::util::{KillOnDrop, TempDir};
7use crate::Error;
8use std::process::Command;
9
10pub struct TempPostgres {
15 tempdir: TempDir,
16 server: KillOnDrop,
17 log_path: PathBuf,
18}
19
20impl std::fmt::Debug for TempPostgres {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 f.debug_struct("TempPostgres")
23 .field("tempdir", &self.tempdir.path())
24 .field("server_pid", &self.server.id())
25 .finish_non_exhaustive()
26 }
27}
28
29impl TempPostgres {
30 pub async fn new() -> Result<Self, Error> {
32 Self::from_builder(&TempPostgresBuilder::new()).await
33 }
34
35 pub fn builder() -> TempPostgresBuilder {
39 TempPostgresBuilder::new()
40 }
41
42 pub fn process_id(&self) -> u32 {
44 self.server.id()
45 }
46
47 pub fn directory(&self) -> &Path {
49 self.tempdir.path()
50 }
51
52 pub fn log_path(&self) -> &Path {
54 &self.log_path
55 }
56
57 pub async fn read_log(&self) -> Result<String, Error> {
59 use tokio::io::AsyncReadExt;
60
61 let mut file = tokio::fs::File::open(&self.log_path)
62 .await
63 .map_err(|e| ErrorInner::Open(self.log_path.clone(), e))?;
64
65 let mut buffer = String::new();
66 file.read_to_string(&mut buffer)
67 .await
68 .map_err(|e| ErrorInner::Read(self.log_path.clone(), e))?;
69 Ok(buffer)
70 }
71
72 pub async fn client(&self) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
74 let (client, connection) = tokio_postgres::Config::new()
75 .dbname("postgres")
76 .host_path(self.directory())
77 .connect_timeout(Duration::from_millis(10))
78 .connect(tokio_postgres::NoTls)
79 .await?;
80
81 let directory = self.directory().to_owned();
82 tokio::spawn(async move {
83 if let Err(e) = connection.await {
84 panic!("Error in connection with postgres on {}: {e}", directory.display());
85 }
86 });
87
88 Ok(client)
89 }
90
91 pub fn set_clean_on_drop(&mut self, clean_on_drop: bool) {
93 self.tempdir.set_clean_on_drop(clean_on_drop);
94 }
95
96 pub async fn kill_and_clean(mut self) -> Result<(), Error> {
104 self.server.kill()
105 .map_err(ErrorInner::KillServer)?;
106
107 let path = self.tempdir.path().to_owned();
108 self.tempdir.close()
109 .map_err(|e| ErrorInner::CleanDir(path, e))?;
110 Ok(())
111 }
112
113 pub async fn kill_no_clean(mut self) -> Result<(), Error> {
120 let _path = self.tempdir.into_path();
121 self.server.kill()
122 .map_err(ErrorInner::KillServer)?;
123 Ok(())
124 }
125
126 async fn from_builder(builder: &TempPostgresBuilder) -> Result<Self, Error> {
128 let tempdir = builder.make_temp_dir().map_err(ErrorInner::MakeTempDir)?;
129
130 let data_dir = tempdir.path().join("data");
131 let pid_file = data_dir.join("postmaster.pid");
132 let log_path = tempdir.path().join("output.log");
133
134 let stderr = std::fs::File::create(&log_path)
135 .map_err(|e| ErrorInner::Create(log_path.clone(), e))?;
136 let stdout = stderr.try_clone()
137 .map_err(|e| ErrorInner::Duplicate(log_path.clone(), e))?;
138
139 crate::util::run_command(
140 "pgctl initdb",
141 tokio::process::Command::new(builder.get_pg_ctl_command())
142 .env("TZ", builder.get_default_timezone())
143 .arg("initdb")
144 .arg("-D")
145 .arg(&data_dir)
146 .arg("-o")
147 .arg("--locale")
148 .arg("-o")
149 .arg(builder.get_default_locale())
150 .arg("-o")
151 .arg("--encoding=UTF8")
152 .arg("-o")
153 .arg("--no-sync")
154 .arg("-o")
155 .arg("--no-instructions")
156 .arg("-o")
157 .arg("--auth=trust")
158 ).await?;
159
160 let server = Command::new(builder.get_postgres_command())
161 .stderr(stderr)
162 .stdout(stdout)
163 .arg("-D")
164 .arg(&data_dir)
165 .arg("-k")
166 .arg(tempdir.path())
167 .arg("-c")
168 .arg("listen_addresses=")
169 .arg("-F")
170 .spawn()
171 .map_err(|e| ErrorInner::SpawnServer(builder.get_postgres_command_string(), e))?;
172 let server = KillOnDrop::new(server);
173
174 wait_ready(&pid_file, Duration::from_millis(500)).await?;
175
176 Ok(Self {
177 tempdir,
178 server,
179 log_path,
180 })
181 }
182}
183
184async fn wait_ready(pid_file: &Path, timeout: Duration) -> Result<(), Error> {
186 let deadline = std::time::Instant::now() + timeout;
187 loop {
188 if poll_ready(pid_file).await? {
189 return Ok(())
190 } else if std::time::Instant::now() > deadline {
191 return Err(ErrorInner::ServerReadyTimeout.into());
192 } else {
193 tokio::time::sleep(Duration::from_millis(10)).await;
194 }
195 }
196}
197
198async fn poll_ready(pid_file: &Path) -> Result<bool, Error> {
200 use tokio::io::AsyncReadExt;
201 let mut file = match tokio::fs::File::open(&pid_file).await {
202 Ok(x) => x,
203 Err(e) => {
204 if e.kind() == std::io::ErrorKind::NotFound {
205 return Ok(false);
206 } else {
207 return Err(ErrorInner::Open(pid_file.to_owned(), e).into());
208 }
209 }
210 };
211
212 let mut data = Vec::new();
213 file.read_to_end(&mut data)
214 .await
215 .map_err(|e| ErrorInner::Read(pid_file.to_owned(), e))?;
216
217 while data.last().is_some_and(|&b| b == b'\n' || b == b'\r' || b == b' ') {
219 data.pop();
220 };
221 Ok(data.ends_with(b"\nready"))
222}
223
/// Builder for customizing how a [`TempPostgres`] instance is created.
///
/// Every `Option` field falls back to a built-in default while it is `None`.
#[derive(Debug)]
pub struct TempPostgresBuilder {
    /// Directory in which the temporary directory is created.
    ///
    /// When `None`, the temporary directory is created without an explicit parent.
    parent_directory: Option<PathBuf>,

    /// Locale passed to `initdb` with `--locale` (falls back to `C`).
    default_locale: Option<OsString>,

    /// Time zone used when initializing the database (falls back to `UTC`).
    default_timezone: Option<OsString>,

    /// Command used to start the server (falls back to `postgres`).
    postgres_command: Option<OsString>,

    /// Command used to run `pg_ctl` (falls back to `pg_ctl`).
    pg_ctl_command: Option<OsString>,

    /// Whether the temporary directory is cleaned up when the instance is dropped.
    clean_on_drop: bool,
}
249
250impl TempPostgresBuilder {
251 pub fn new() -> Self {
253 Self {
254 parent_directory: None,
255 default_locale: None,
256 default_timezone: None,
257 postgres_command: None,
258 pg_ctl_command: None,
259 clean_on_drop: true,
260 }
261 }
262
263 pub async fn spawn(&self) -> Result<TempPostgres, Error> {
265 TempPostgres::from_builder(self).await
266 }
267
268 pub fn clean_on_drop(mut self, clean_on_drop: bool) -> Self {
272 self.clean_on_drop = clean_on_drop;
273 self
274 }
275
276 pub fn default_locale(mut self, locale: impl Into<OsString>) -> Self {
282 self.default_locale = Some(locale.into());
283 self
284 }
285
286 pub fn default_timezone(mut self, timezone: impl Into<OsString>) -> Self {
292 self.default_timezone = Some(timezone.into());
293 self
294 }
295
296 pub fn postgres_command(mut self, command: impl Into<OsString>) -> Self {
300 self.postgres_command = Some(command.into());
301 self
302 }
303
304 pub fn pg_ctl_command(mut self, command: impl Into<OsString>) -> Self {
308 self.pg_ctl_command = Some(command.into());
309 self
310 }
311
312 fn get_default_locale(&self) -> &OsStr {
314 self.default_locale
315 .as_deref()
316 .unwrap_or("C".as_ref())
317 }
318
319 fn get_default_timezone(&self) -> &OsStr {
321 self.default_locale
322 .as_deref()
323 .unwrap_or("UTC".as_ref())
324 }
325
326 fn get_postgres_command(&self) -> &OsStr {
328 self.postgres_command
329 .as_deref()
330 .unwrap_or("postgres".as_ref())
331 }
332
333 fn get_postgres_command_string(&self) -> String {
335 self.get_postgres_command().to_string_lossy().into()
336 }
337
338 fn get_pg_ctl_command(&self) -> &OsStr {
340 self.pg_ctl_command
341 .as_deref()
342 .unwrap_or("pg_ctl".as_ref())
343 }
344
345 fn make_temp_dir(&self) -> std::io::Result<TempDir> {
347 match &self.parent_directory {
348 Some(dir) => TempDir::new_in(dir, self.clean_on_drop),
349 None => TempDir::new(self.clean_on_drop),
350 }
351 }
352}
353
354impl Default for TempPostgresBuilder {
355 fn default() -> Self {
356 Self::new()
357 }
358}