use crate::Config;
use async_trait::async_trait;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConcurrencyModel {
MultiProcess,
SingleProcess,
}
pub mod any;
pub mod dblock;
pub(crate) mod dialect;
#[cfg(feature = "postgres")]
pub mod postgres;
pub(crate) mod query;
#[cfg(feature = "s3")]
pub mod s3;
#[cfg(feature = "sqlite")]
pub mod sqlite;
pub(crate) mod tables;
#[cfg(feature = "turso")]
pub mod turso;
pub use crate::tables::*;
pub use crate::workers::*;
pub use any::AnyStore;
pub use dblock::{DbLock, DbOpFuture, DbTables, SerializedLock, Tables};
#[cfg(any(feature = "sqlite", feature = "turso", feature = "s3"))]
pub(crate) mod sqlite_utils {
use crate::error::{Error, Result};
use chrono::{DateTime, NaiveDateTime, Utc};
pub fn parse_timestamp(s: &str) -> Result<DateTime<Utc>> {
const TIMESTAMP_FORMATS: [&str; 2] = ["%Y-%m-%d %H:%M:%S%.f", "%Y-%m-%d %H:%M:%S"];
TIMESTAMP_FORMATS
.iter()
.find_map(|fmt| NaiveDateTime::parse_from_str(s, fmt).ok())
.map(|dt| dt.and_utc())
.ok_or_else(|| Error::Internal {
message: format!("Invalid timestamp: {s}"),
})
}
pub fn format_timestamp(dt: &DateTime<Utc>) -> String {
dt.format("%Y-%m-%d %H:%M:%S%.6f").to_string()
}
}
#[async_trait]
pub trait Store: Send + Sync + 'static {
async fn execute_raw(&self, sql: &str) -> crate::error::Result<()>;
async fn execute_raw_with_i64(&self, sql: &str, param: i64) -> crate::error::Result<()>;
async fn execute_raw_with_two_i64(
&self,
sql: &str,
param1: i64,
param2: i64,
) -> crate::error::Result<()>;
async fn query_int(&self, sql: &str) -> crate::error::Result<i64>;
async fn query_string(&self, sql: &str) -> crate::error::Result<String>;
async fn query_bool(&self, sql: &str) -> crate::error::Result<bool>;
fn config(&self) -> &Config;
fn queues(&self) -> &dyn QueueTable;
fn messages(&self) -> &dyn MessageTable;
fn workers(&self) -> &dyn WorkerTable;
fn db_state(&self) -> &dyn DbStateTable;
fn workflows(&self) -> &dyn WorkflowTable;
fn workflow_runs(&self) -> &dyn RunRecordTable;
fn workflow_steps(&self) -> &dyn StepRecordTable;
async fn bootstrap(&self) -> crate::error::Result<()>;
async fn admin(
&self,
name: &str,
config: &Config,
) -> crate::error::Result<crate::workers::Admin>;
async fn admin_ephemeral(&self, config: &Config)
-> crate::error::Result<crate::workers::Admin>;
async fn producer(
&self,
queue: &str,
name: &str,
config: &Config,
) -> crate::error::Result<Producer>;
async fn consumer(
&self,
queue: &str,
name: &str,
config: &Config,
) -> crate::error::Result<Consumer>;
async fn queue(&self, name: &str) -> crate::error::Result<crate::types::QueueRecord>;
async fn workflow(&self, name: &str) -> crate::error::Result<crate::types::WorkflowRecord>;
async fn run(&self, message: crate::types::QueueMessage) -> crate::error::Result<Run>;
async fn worker(&self, id: i64) -> crate::error::Result<Box<dyn Worker>>;
fn concurrency_model(&self) -> ConcurrencyModel;
fn backend_name(&self) -> &'static str;
async fn producer_ephemeral(
&self,
queue: &str,
config: &Config,
) -> crate::error::Result<Producer>;
async fn consumer_ephemeral(
&self,
queue: &str,
config: &Config,
) -> crate::error::Result<Consumer>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackendType {
#[cfg(feature = "postgres")]
Postgres,
#[cfg(feature = "s3")]
S3,
#[cfg(feature = "sqlite")]
Sqlite,
#[cfg(feature = "turso")]
Turso,
}
impl BackendType {
const POSTGRES_PREFIXES: &'static [&'static str] =
&["postgres://", "postgresql://", "postgres", "pg"];
const SQLITE_PREFIXES: &'static [&'static str] = &["sqlite://", "sqlite:", "sqlite"];
#[cfg(feature = "s3")]
const S3_PREFIXES: &'static [&'static str] = &["s3://", "s3:", "s3"];
const TURSO_PREFIXES: &'static [&'static str] = &["turso://", "turso:", "turso"];
pub fn detect(dsn: &str) -> crate::error::Result<Self> {
if Self::POSTGRES_PREFIXES.iter().any(|p| dsn.starts_with(p)) {
#[cfg(feature = "postgres")]
return Ok(Self::Postgres);
#[cfg(not(feature = "postgres"))]
return Err(crate::error::Error::InvalidConfig {
field: "dsn".to_string(),
message: "Postgres backend is not enabled".to_string(),
});
}
#[cfg(feature = "s3")]
if Self::S3_PREFIXES.iter().any(|p| dsn.starts_with(p)) {
return Ok(Self::S3);
}
if Self::SQLITE_PREFIXES.iter().any(|p| dsn.starts_with(p)) {
#[cfg(feature = "sqlite")]
return Ok(Self::Sqlite);
#[cfg(not(feature = "sqlite"))]
return Err(crate::error::Error::InvalidConfig {
field: "dsn".to_string(),
message: "Sqlite backend is not enabled".to_string(),
});
}
if Self::TURSO_PREFIXES.iter().any(|p| dsn.starts_with(p)) {
#[cfg(feature = "turso")]
return Ok(Self::Turso);
#[cfg(not(feature = "turso"))]
return Err(crate::error::Error::InvalidConfig {
field: "dsn".to_string(),
message: "Turso backend is not enabled".to_string(),
});
}
Err(crate::error::Error::InvalidConfig {
field: "dsn".to_string(),
message: format!("Unsupported DSN format: {}", dsn),
})
}
}
#[cfg(test)]
mod tests {
use super::BackendType;
#[test]
fn detect_rejects_unsupported_dsn() {
let err = BackendType::detect("invalid://dsn").unwrap_err();
assert!(err.to_string().contains("Unsupported DSN format"));
}
#[cfg(feature = "s3")]
#[test]
fn detect_s3_dsn_returns_s3_backend() {
assert_eq!(
BackendType::detect("s3://bucket/queue.sqlite").unwrap(),
BackendType::S3
);
assert_eq!(BackendType::detect("s3:").unwrap(), BackendType::S3);
assert_eq!(BackendType::detect("s3").unwrap(), BackendType::S3);
}
}