use std::ffi::{OsStr, OsString};
use std::path::{Path, PathBuf};
use std::time::Duration;
use crate::error::ErrorInner;
use crate::util::{KillOnDrop, TempDir};
use crate::Error;
use std::process::Command;
/// A temporary PostgreSQL server together with the temporary directory it runs in.
///
/// Field order is significant: Rust drops struct fields in declaration order.
/// `server` is declared before `tempdir` so that the server handle is dropped
/// before the directory holding its data and socket, which is the same order
/// `kill_and_clean()` applies explicitly.
pub struct TempPostgres {
    /// Handle to the spawned server process.
    ///
    /// NOTE(review): going by the type name this kills the process on drop;
    /// confirm in `crate::util`.
    server: KillOnDrop,
    /// Temporary directory containing the data directory, the socket and the log file.
    tempdir: TempDir,
    /// Path of the file that receives the server's stdout and stderr.
    log_path: PathBuf,
}
impl std::fmt::Debug for TempPostgres {
    /// Show the temporary directory and the server process ID.
    ///
    /// The output is marked as non-exhaustive because `log_path` is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("TempPostgres");
        repr.field("tempdir", &self.tempdir.path());
        repr.field("server_pid", &self.server.id());
        repr.finish_non_exhaustive()
    }
}
impl TempPostgres {
    /// Spawn a temporary PostgreSQL instance using the default builder settings.
    ///
    /// # Errors
    /// Returns whatever error spawning from a default [`TempPostgresBuilder`] produces.
    pub async fn new() -> Result<Self, Error> {
        let defaults = TempPostgresBuilder::new();
        Self::from_builder(&defaults).await
    }

    /// Create a builder to customize the instance before spawning it.
    pub fn builder() -> TempPostgresBuilder {
        TempPostgresBuilder::new()
    }

    /// Get the process ID of the server, as reported by the server handle.
    pub fn process_id(&self) -> u32 {
        self.server.id()
    }

    /// Get the path of the temporary directory of this instance.
    ///
    /// The server is started with this directory as its socket directory (`-k`),
    /// and the data directory and log file are created inside it.
    pub fn directory(&self) -> &Path {
        self.tempdir.path()
    }

    /// Get the path of the file that receives the server's stdout and stderr.
    pub fn log_path(&self) -> &Path {
        self.log_path.as_path()
    }

    /// Read the current contents of the server log file as a string.
    ///
    /// # Errors
    /// Fails if the log file cannot be opened or if reading it into a `String` fails.
    pub async fn read_log(&self) -> Result<String, Error> {
        use tokio::io::AsyncReadExt;

        let path = &self.log_path;
        let mut log_file = match tokio::fs::File::open(path).await {
            Ok(file) => file,
            Err(e) => return Err(ErrorInner::Open(path.clone(), e).into()),
        };

        let mut contents = String::new();
        if let Err(e) = log_file.read_to_string(&mut contents).await {
            return Err(ErrorInner::Read(path.clone(), e).into());
        }
        Ok(contents)
    }

    /// Connect a new client to the `postgres` database of this instance.
    ///
    /// The connection is made through the socket directory of the instance.
    /// The connection object is driven by a spawned Tokio task.
    ///
    /// # Panics
    /// The spawned task panics if the connection reports an error.
    pub async fn client(&self) -> Result<tokio_postgres::Client, tokio_postgres::Error> {
        let socket_dir = self.directory();

        let mut config = tokio_postgres::Config::new();
        config
            .dbname("postgres")
            .host_path(socket_dir)
            .connect_timeout(Duration::from_millis(10));
        let (client, connection) = config.connect(tokio_postgres::NoTls).await?;

        // The task must own its data, so give it its own copy of the path for the message.
        let directory = socket_dir.to_owned();
        tokio::spawn(async move {
            if let Err(e) = connection.await {
                panic!("Error in connection with postgres on {}: {e}", directory.display());
            }
        });

        Ok(client)
    }

    /// Enable or disable cleaning of the temporary directory on drop.
    ///
    /// The flag is forwarded to the temporary directory handle.
    pub fn set_clean_on_drop(&mut self, clean_on_drop: bool) {
        self.tempdir.set_clean_on_drop(clean_on_drop);
    }

    /// Kill the server and then clean up the temporary directory.
    ///
    /// # Errors
    /// Fails if killing the server fails (the directory is then not explicitly closed)
    /// or if closing the temporary directory fails.
    pub async fn kill_and_clean(mut self) -> Result<(), Error> {
        if let Err(e) = self.server.kill() {
            return Err(ErrorInner::KillServer(e).into());
        }

        // Remember the path first: closing consumes the directory handle.
        let path = self.tempdir.path().to_owned();
        if let Err(e) = self.tempdir.close() {
            return Err(ErrorInner::CleanDir(path, e).into());
        }
        Ok(())
    }

    /// Kill the server without cleaning up the temporary directory.
    ///
    /// The directory handle is consumed with `into_path()` before the server is killed.
    /// NOTE(review): presumably that leaves the directory on disk; confirm in `crate::util`.
    ///
    /// # Errors
    /// Fails if killing the server fails.
    pub async fn kill_no_clean(mut self) -> Result<(), Error> {
        let _kept_directory = self.tempdir.into_path();
        match self.server.kill() {
            Ok(_) => Ok(()),
            Err(e) => Err(ErrorInner::KillServer(e).into()),
        }
    }

    /// Create the temporary directory, initialize a database cluster and start the server.
    ///
    /// Steps, in order:
    /// 1. create the temporary directory and the log file inside it,
    /// 2. run `pg_ctl initdb` for the `data` sub-directory,
    /// 3. spawn the server with its output redirected to the log file,
    /// 4. wait up to 500 ms for `postmaster.pid` to report readiness.
    async fn from_builder(builder: &TempPostgresBuilder) -> Result<Self, Error> {
        let tempdir = builder.make_temp_dir().map_err(ErrorInner::MakeTempDir)?;
        let data_dir = tempdir.path().join("data");
        let log_path = tempdir.path().join("output.log");
        let pid_file = data_dir.join("postmaster.pid");

        // One file receives both output streams of the server: create it once, then duplicate it.
        let log_file = std::fs::File::create(&log_path)
            .map_err(|e| ErrorInner::Create(log_path.clone(), e))?;
        let log_file_dup = log_file
            .try_clone()
            .map_err(|e| ErrorInner::Duplicate(log_path.clone(), e))?;

        {
            // Every entry is handed to `pg_ctl` behind its own `-o` flag, in this order.
            let initdb_options: [&OsStr; 6] = [
                OsStr::new("--locale"),
                builder.get_default_locale(),
                OsStr::new("--encoding=UTF8"),
                OsStr::new("--no-sync"),
                OsStr::new("--no-instructions"),
                OsStr::new("--auth=trust"),
            ];

            let mut initdb = tokio::process::Command::new(builder.get_pg_ctl_command());
            initdb
                .env("TZ", builder.get_default_timezone())
                .arg("initdb")
                .arg("-D")
                .arg(&data_dir);
            for option in initdb_options {
                initdb.arg("-o").arg(option);
            }
            crate::util::run_command("pgctl initdb", &mut initdb).await?;
        }

        // Kept as a single expression so the `Command` (and the log file handles it owns)
        // is released as soon as the child has been spawned.
        let child = Command::new(builder.get_postgres_command())
            .stderr(log_file)
            .stdout(log_file_dup)
            .arg("-D")
            .arg(&data_dir)
            // Put the socket in the temporary directory.
            .arg("-k")
            .arg(tempdir.path())
            // Set `listen_addresses` to the empty string.
            .arg("-c")
            .arg("listen_addresses=")
            .arg("-F")
            .spawn()
            .map_err(|e| ErrorInner::SpawnServer(builder.get_postgres_command_string(), e))?;
        let server = KillOnDrop::new(child);

        wait_ready(&pid_file, Duration::from_millis(500)).await?;

        Ok(Self {
            tempdir,
            server,
            log_path,
        })
    }
}
/// Wait until the PID file reports that the server is ready.
///
/// Polls with `poll_ready()`, sleeping 10 ms between attempts.
///
/// # Errors
/// Returns `ErrorInner::ServerReadyTimeout` when a poll comes back negative after the
/// deadline (`timeout` from the moment of the call) has passed, and forwards any error
/// from `poll_ready()`.
async fn wait_ready(pid_file: &Path, timeout: Duration) -> Result<(), Error> {
    let deadline = std::time::Instant::now() + timeout;

    while !poll_ready(pid_file).await? {
        // The deadline is only checked after a failed poll, so at least one poll always happens.
        if std::time::Instant::now() > deadline {
            return Err(ErrorInner::ServerReadyTimeout.into());
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }

    Ok(())
}
/// Check once whether the PID file reports that the server is ready.
///
/// Returns `Ok(false)` when the file does not exist. Otherwise the whole file is read,
/// trailing newlines, carriage returns and spaces are ignored, and the result is `true`
/// exactly when the remaining data ends with a line consisting of `ready`.
///
/// # Errors
/// Fails if the file cannot be opened for a reason other than "not found",
/// or if reading it fails.
async fn poll_ready(pid_file: &Path) -> Result<bool, Error> {
    use tokio::io::AsyncReadExt;

    let mut file = match tokio::fs::File::open(pid_file).await {
        Ok(file) => file,
        // A missing file is not an error here: it is reported as "not ready".
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
        Err(e) => return Err(ErrorInner::Open(pid_file.to_owned(), e).into()),
    };

    let mut contents = Vec::new();
    if let Err(e) = file.read_to_end(&mut contents).await {
        return Err(ErrorInner::Read(pid_file.to_owned(), e).into());
    }

    // Length of the data once trailing '\n', '\r' and ' ' bytes are cut off.
    let trimmed_len = contents
        .iter()
        .rposition(|&byte| !matches!(byte, b'\n' | b'\r' | b' '))
        .map_or(0, |last| last + 1);

    Ok(contents[..trimmed_len].ends_with(b"\nready"))
}
/// Builder for customizing and spawning a [`TempPostgres`] instance.
///
/// Every optional field starts out as `None`; the private getters of the builder
/// substitute a default value for the ones that are still unset.
#[derive(Debug)]
pub struct TempPostgresBuilder {
/// Directory in which the temporary directory is created.
/// With `None`, `TempDir::new` is used instead of `TempDir::new_in`.
parent_directory: Option<PathBuf>,
/// Locale override; the getter falls back to `"C"` when unset.
default_locale: Option<OsString>,
/// Timezone override stored by `default_timezone()`.
default_timezone: Option<OsString>,
/// Command used to start the server; the getter falls back to `"postgres"` when unset.
postgres_command: Option<OsString>,
/// Command used to run `pg_ctl`; the getter falls back to `"pg_ctl"` when unset.
pg_ctl_command: Option<OsString>,
/// Flag handed to `TempDir` when the temporary directory is created.
/// NOTE(review): presumably decides whether the directory is removed on drop; confirm in `crate::util`.
clean_on_drop: bool,
}
impl TempPostgresBuilder {
pub fn new() -> Self {
Self {
parent_directory: None,
default_locale: None,
default_timezone: None,
postgres_command: None,
pg_ctl_command: None,
clean_on_drop: true,
}
}
pub async fn spawn(&self) -> Result<TempPostgres, Error> {
TempPostgres::from_builder(self).await
}
pub fn clean_on_drop(mut self, clean_on_drop: bool) -> Self {
self.clean_on_drop = clean_on_drop;
self
}
pub fn default_locale(mut self, locale: impl Into<OsString>) -> Self {
self.default_locale = Some(locale.into());
self
}
pub fn default_timezone(mut self, timezone: impl Into<OsString>) -> Self {
self.default_timezone = Some(timezone.into());
self
}
pub fn postgres_command(mut self, command: impl Into<OsString>) -> Self {
self.postgres_command = Some(command.into());
self
}
pub fn pg_ctl_command(mut self, command: impl Into<OsString>) -> Self {
self.pg_ctl_command = Some(command.into());
self
}
fn get_default_locale(&self) -> &OsStr {
self.default_locale
.as_deref()
.unwrap_or("C".as_ref())
}
fn get_default_timezone(&self) -> &OsStr {
self.default_locale
.as_deref()
.unwrap_or("UTC".as_ref())
}
fn get_postgres_command(&self) -> &OsStr {
self.postgres_command
.as_deref()
.unwrap_or("postgres".as_ref())
}
fn get_postgres_command_string(&self) -> String {
self.get_postgres_command().to_string_lossy().into()
}
fn get_pg_ctl_command(&self) -> &OsStr {
self.pg_ctl_command
.as_deref()
.unwrap_or("pg_ctl".as_ref())
}
fn make_temp_dir(&self) -> std::io::Result<TempDir> {
match &self.parent_directory {
Some(dir) => TempDir::new_in(dir, self.clean_on_drop),
None => TempDir::new(self.clean_on_drop),
}
}
}
impl Default for TempPostgresBuilder {
fn default() -> Self {
Self::new()
}
}