mod mailboxes;
mod messages;
mod metadata;
mod schema;
#[cfg(test)]
mod tests;
use crate::traits::{MailboxStore, MessageStore, MetadataStore, StorageBackend};
use mailboxes::PostgresMailboxStore;
use messages::PostgresMessageStore;
use metadata::PostgresMetadataStore;
use sqlx::postgres::{PgPool, PgPoolOptions};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
/// Tunables for the PostgreSQL connection pool and message storage.
///
/// The pool-related fields are forwarded to `sqlx`'s `PgPoolOptions` when a
/// `PostgresBackend` is constructed.
#[derive(Debug, Clone)]
pub struct PostgresConfig {
/// Upper bound on pooled connections (passed to `PgPoolOptions::max_connections`).
pub max_connections: u32,
/// Lower bound on pooled connections (passed to `PgPoolOptions::min_connections`).
pub min_connections: u32,
/// Timeout handed to `PgPoolOptions::acquire_timeout` when the pool is built.
pub connect_timeout: Duration,
/// Passed to `PgPoolOptions::idle_timeout` when `Some`; `None` leaves the
/// pool option untouched.
pub idle_timeout: Option<Duration>,
/// Passed to `PgPoolOptions::max_lifetime` when `Some`; `None` leaves the
/// pool option untouched.
pub max_lifetime: Option<Duration>,
/// Forwarded unchanged to the message store.
/// NOTE(review): presumably a size in bytes up to which message bodies are
/// stored inline; confirm against `PostgresMessageStore`.
pub inline_threshold: usize,
}
impl Default for PostgresConfig {
fn default() -> Self {
Self {
max_connections: 20,
min_connections: 5,
connect_timeout: Duration::from_secs(30),
idle_timeout: Some(Duration::from_secs(600)),
max_lifetime: Some(Duration::from_secs(1800)),
inline_threshold: 100 * 1024, }
}
}
/// Storage backend backed by a PostgreSQL connection pool.
///
/// Construction also spawns a background task that periodically runs
/// `VACUUM (ANALYZE)`; `shutdown` signals that task to stop.
pub struct PostgresBackend {
// Shared connection pool; cloned into every store handed out.
pool: PgPool,
// Configuration the backend was built with (read for `inline_threshold`).
config: PostgresConfig,
// Sending `true` on this channel tells the background vacuum task to exit.
shutdown_tx: watch::Sender<bool>,
}
impl PostgresBackend {
pub async fn new(database_url: &str) -> anyhow::Result<Self> {
Self::with_config(database_url, PostgresConfig::default()).await
}
pub async fn with_config(database_url: &str, config: PostgresConfig) -> anyhow::Result<Self> {
Self::with_config_and_vacuum(
database_url,
config,
Duration::from_secs(86_400), )
.await
}
pub async fn with_config_and_vacuum(
database_url: &str,
config: PostgresConfig,
vacuum_interval: Duration,
) -> anyhow::Result<Self> {
use sqlx::Executor;
let mut pool_options = PgPoolOptions::new()
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.acquire_timeout(config.connect_timeout);
if let Some(idle_timeout) = config.idle_timeout {
pool_options = pool_options.idle_timeout(idle_timeout);
}
if let Some(max_lifetime) = config.max_lifetime {
pool_options = pool_options.max_lifetime(max_lifetime);
}
let pool = pool_options.connect(database_url).await?;
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let vacuum_pool = pool.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(vacuum_interval);
let mut rx = shutdown_rx;
loop {
tokio::select! {
_ = interval.tick() => {
if let Err(e) = vacuum_pool.execute("VACUUM (ANALYZE)").await {
tracing::warn!("Background VACUUM failed: {}", e);
} else {
tracing::debug!("Background VACUUM (ANALYZE) completed");
}
}
_ = rx.changed() => {
if *rx.borrow() {
tracing::debug!("PostgresBackend vacuum task shutting down");
break;
}
}
}
}
});
Ok(Self {
pool,
config,
shutdown_tx,
})
}
pub fn shutdown(&self) {
let _ = self.shutdown_tx.send(true);
}
pub async fn init_schema(&self) -> anyhow::Result<()> {
schema::init_schema(&self.pool).await
}
pub async fn vacuum(&self) -> anyhow::Result<()> {
use sqlx::Executor;
tracing::info!("Running VACUUM on PostgreSQL database");
self.pool.execute("VACUUM ANALYZE").await?;
Ok(())
}
pub async fn reindex(&self) -> anyhow::Result<()> {
use sqlx::Executor;
tracing::info!("Running REINDEX on PostgreSQL database");
self.pool.execute("REINDEX DATABASE CONCURRENTLY").await?;
Ok(())
}
pub fn pool_size(&self) -> u32 {
self.pool.size()
}
pub fn idle_connections(&self) -> usize {
self.pool.num_idle()
}
pub fn pool(&self) -> &PgPool {
&self.pool
}
}
impl StorageBackend for PostgresBackend {
    /// Returns a new mailbox store that shares this backend's connection pool.
    fn mailbox_store(&self) -> Arc<dyn MailboxStore> {
        let pool = self.pool.clone();
        let store = PostgresMailboxStore { pool };
        Arc::new(store)
    }

    /// Returns a new message store that shares this backend's connection pool
    /// and carries the configured inline threshold.
    fn message_store(&self) -> Arc<dyn MessageStore> {
        let pool = self.pool.clone();
        let inline_threshold = self.config.inline_threshold;
        let store = PostgresMessageStore {
            pool,
            inline_threshold,
        };
        Arc::new(store)
    }

    /// Returns a new metadata store that shares this backend's connection pool.
    fn metadata_store(&self) -> Arc<dyn MetadataStore> {
        let pool = self.pool.clone();
        let store = PostgresMetadataStore { pool };
        Arc::new(store)
    }
}