mod cron;
mod jobs;
mod notify;
mod procs;
mod queue_config;
mod rate_limit;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use sqlx::ConnectOptions;
use sqlx::sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePool, SqlitePoolOptions};
use super::db_timing::DbRecorder;
use super::error::{Result, StorageError};
use super::event_buffer::EventBuffer;
pub(super) const WRITE_POOL_MAX: u32 = 1;
pub(super) const READ_POOL_MAX: u32 = 8;
#[derive(Clone)]
pub struct SqliteStorage {
write_pool: SqlitePool,
read_pool: SqlitePool,
notify: Arc<notify::NotifyHub>,
ulid_gen: Arc<tokio::sync::Mutex<ulid::Generator>>,
pub(super) db_recorder: Arc<DbRecorder>,
pub(super) events: Arc<EventBuffer>,
pub(super) db_path: Option<PathBuf>,
}
impl std::fmt::Debug for SqliteStorage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SqliteStorage").finish_non_exhaustive()
}
}
impl SqliteStorage {
pub async fn open_default(paths: &dyn super::QueuePaths) -> Result<Self> {
let dir = paths.data_dir()?;
std::fs::create_dir_all(&dir)?;
let path = dir.join("queue.sqlite");
Self::open_file(&path).await
}
pub async fn open_file(path: &Path) -> Result<Self> {
let opts = SqliteConnectOptions::new()
.filename(path)
.create_if_missing(true)
.journal_mode(SqliteJournalMode::Wal)
.busy_timeout(Duration::from_secs(30))
.foreign_keys(false)
.log_statements(tracing::log::LevelFilter::Debug)
.log_slow_statements(tracing::log::LevelFilter::Warn, Duration::from_millis(500));
Self::finish(opts, Some(path.to_path_buf())).await
}
pub async fn open_in_memory() -> Result<Self> {
let opts = SqliteConnectOptions::new()
.in_memory(true)
.journal_mode(SqliteJournalMode::Memory)
.foreign_keys(false);
let pool = SqlitePoolOptions::new()
.max_connections(1)
.min_connections(1)
.acquire_timeout(Duration::from_secs(5))
.connect_with(opts)
.await
.map_err(map_sqlx_err)?;
sqlx::migrate!("src/storage/sqlite/migrations")
.run(&pool)
.await
.map_err(|e| StorageError::Migration {
version: 0,
message: e.to_string(),
})?;
Ok(Self {
write_pool: pool.clone(),
read_pool: pool,
notify: Arc::new(notify::NotifyHub::default()),
ulid_gen: Arc::new(tokio::sync::Mutex::new(ulid::Generator::new())),
db_recorder: Arc::new(DbRecorder::default()),
events: Arc::new(EventBuffer::default()),
db_path: None,
})
}
async fn finish(opts: SqliteConnectOptions, db_path: Option<PathBuf>) -> Result<Self> {
let write_pool = SqlitePoolOptions::new()
.max_connections(WRITE_POOL_MAX)
.min_connections(1)
.acquire_timeout(Duration::from_secs(30))
.connect_with(opts.clone())
.await
.map_err(map_sqlx_err)?;
sqlx::migrate!("src/storage/sqlite/migrations")
.run(&write_pool)
.await
.map_err(|e| StorageError::Migration {
version: 0,
message: e.to_string(),
})?;
let read_pool = SqlitePoolOptions::new()
.max_connections(READ_POOL_MAX)
.min_connections(1)
.acquire_timeout(Duration::from_secs(10))
.connect_with(opts)
.await
.map_err(map_sqlx_err)?;
Ok(Self {
write_pool,
read_pool,
notify: Arc::new(notify::NotifyHub::default()),
ulid_gen: Arc::new(tokio::sync::Mutex::new(ulid::Generator::new())),
db_recorder: Arc::new(DbRecorder::default()),
events: Arc::new(EventBuffer::default()),
db_path,
})
}
pub(crate) async fn next_ulid(&self) -> ulid::Ulid {
let mut generator = self.ulid_gen.lock().await;
generator.generate().unwrap_or_else(|_| ulid::Ulid::new())
}
#[must_use]
pub const fn write_pool(&self) -> &SqlitePool {
&self.write_pool
}
#[must_use]
pub const fn read_pool(&self) -> &SqlitePool {
&self.read_pool
}
}
fn map_sqlx_err(e: sqlx::Error) -> StorageError {
use sqlx::Error as E;
match e {
E::RowNotFound => StorageError::NotFound("row not found".into()),
E::Database(db) => {
let code = db.code().unwrap_or_default();
if code == "5" || code == "6" || code == "517" {
StorageError::Conflict(db.message().to_owned())
} else if code == "1555" || code == "2067" {
StorageError::Conflict(db.message().to_owned())
} else {
StorageError::Backend(format!("sqlite [{code}]: {db}"))
}
}
other => StorageError::Backend(other.to_string()),
}
}