use async_trait::async_trait;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use crate::core::{Job, JobState, JobStateKind, RecurringJob, ServerInfo};
pub mod config;
pub mod database_init;
pub mod error;
pub mod memory;
#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "redis")]
pub mod redis;
pub mod settings;
#[cfg(test)]
mod test_locking;
#[cfg(feature = "postgres")]
pub use config::PostgresConfig;
#[cfg(feature = "redis")]
pub use config::RedisConfig;
pub use config::{MemoryConfig, StorageConfig};
#[cfg(feature = "postgres")]
pub use database_init::{DatabaseInitError, DatabaseInitializer};
pub use error::StorageError;
pub use memory::MemoryStorage;
#[cfg(feature = "postgres")]
pub use postgres::PostgresStorage;
#[cfg(feature = "redis")]
pub use redis::RedisStorage;
/// Backend contract for job storage: queueing, scheduled/retry polling, job
/// locking, recurring jobs, server bookkeeping, and named locks.
///
/// Implementors must also implement [`MonitoringApi`] and be `Send + Sync`.
/// Every method is async and reports failure as a [`StorageError`].
///
/// NOTE(review): the per-method notes below are inferred from names and
/// signatures only. The authoritative behavior lives in the individual backend
/// implementations — confirm there before relying on any detail.
#[async_trait]
pub trait Storage: MonitoringApi + Send + Sync {
/// Submits `job` to the backend for queueing.
async fn enqueue(&self, job: &Job) -> Result<(), StorageError>;
/// Returns jobs the backend considers available. `limit` presumably caps the
/// number of results, with `None` meaning "no cap" — TODO confirm.
async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError>;
/// Returns scheduled jobs that are due relative to `now`; `limit` is
/// presumably the maximum number returned — TODO confirm.
async fn fetch_due_scheduled_jobs(
&self,
now: DateTime<Utc>,
limit: usize,
) -> Result<Vec<Job>, StorageError>;
/// Returns jobs awaiting retry that are due relative to `now`; `limit` is
/// presumably the maximum number returned — TODO confirm.
async fn fetch_due_retry_jobs(
&self,
now: DateTime<Utc>,
limit: usize,
) -> Result<Vec<Job>, StorageError>;
/// Presumably puts jobs that have been stuck since before `stale_before` back
/// on their queue and returns how many were affected — TODO confirm.
async fn requeue_stranded_jobs(
&self,
stale_before: DateTime<Utc>,
) -> Result<usize, StorageError>;
/// Presumably claims a single job on behalf of `worker_id`, optionally
/// restricted to `queues`; `Ok(None)` would then mean nothing was claimed —
/// TODO confirm.
async fn fetch_and_lock_job(
&self,
worker_id: &str,
queues: Option<&[String]>,
) -> Result<Option<Job>, StorageError>;
/// Attempts to lock job `job_id` for `worker_id`. The `bool` presumably
/// reports whether the lock was obtained, and `timeout_seconds` presumably
/// bounds how long the lock is held — TODO confirm against the backends.
async fn try_acquire_job_lock(
&self,
job_id: &str,
worker_id: &str,
timeout_seconds: u64,
) -> Result<bool, StorageError>;
/// Releases the lock on job `job_id` for `worker_id`. The `bool` presumably
/// reports whether a lock was actually released — TODO confirm.
async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError>;
/// Batch variant of job claiming for `worker_id`, optionally capped by `limit`
/// and restricted to `queues`. NOTE(review): the name suggests the fetch and
/// claim happen atomically; verify per backend.
async fn fetch_available_jobs_atomic(
&self,
worker_id: &str,
limit: Option<usize>,
queues: Option<&[String]>,
) -> Result<Vec<Job>, StorageError>;
/// Inserts or updates the recurring job definition `job`.
async fn upsert_recurring_job(&self, job: &RecurringJob) -> Result<(), StorageError>;
/// Removes the recurring job with the given `id`. The `bool` presumably
/// reports whether such a job existed — TODO confirm.
async fn remove_recurring_job(&self, id: &str) -> Result<bool, StorageError>;
/// Returns the recurring job definitions known to the backend.
async fn list_recurring_jobs(&self) -> Result<Vec<RecurringJob>, StorageError>;
/// Returns recurring jobs that are due relative to `now`; `limit` is
/// presumably the maximum number returned — TODO confirm.
async fn fetch_due_recurring_jobs(
&self,
now: DateTime<Utc>,
limit: usize,
) -> Result<Vec<RecurringJob>, StorageError>;
/// Presumably deletes jobs whose expiry lies before `now` and returns the
/// number removed — TODO confirm.
async fn delete_expired_jobs(&self, now: DateTime<Utc>) -> Result<usize, StorageError>;
/// Records the server described by `info` in the backend.
async fn register_server(&self, info: &ServerInfo) -> Result<(), StorageError>;
/// Records a heartbeat for `server_id` at time `now`. The `bool` presumably
/// reports whether the server was known to the backend — TODO confirm.
async fn heartbeat_server(
&self,
server_id: &str,
now: DateTime<Utc>,
) -> Result<bool, StorageError>;
/// Removes the registration for `server_id`. The `bool` presumably reports
/// whether a registration was removed — TODO confirm.
async fn deregister_server(&self, server_id: &str) -> Result<bool, StorageError>;
/// Returns servers considered dead relative to `stale_before` (presumably
/// those whose last heartbeat is older than that instant — TODO confirm).
async fn list_dead_servers(
&self,
stale_before: DateTime<Utc>,
) -> Result<Vec<ServerInfo>, StorageError>;
/// Presumably returns the jobs held by `server_id` to the queue and reports
/// how many were reclaimed — TODO confirm.
async fn reclaim_jobs_from_server(&self, server_id: &str) -> Result<usize, StorageError>;
/// Attempts to take the named lock `resource` for `owner`. `ttl` is
/// presumably the lock's lifetime and the `bool` presumably reports whether
/// the lock was obtained — TODO confirm.
async fn try_acquire_lock(
&self,
resource: &str,
owner: &str,
ttl: std::time::Duration,
) -> Result<bool, StorageError>;
/// Releases the named lock `resource` for `owner`. The `bool` presumably
/// reports whether a lock was actually released — TODO confirm.
async fn release_lock(&self, resource: &str, owner: &str) -> Result<bool, StorageError>;
}
/// Enum wrapper over the storage backends compiled into this build.
///
/// The `Redis` and `Postgres` variants exist only when the cargo feature of the
/// same name is enabled; `Memory` is always present.
pub enum StorageInstance {
/// Holds a [`MemoryStorage`].
Memory(MemoryStorage),
/// Holds a [`RedisStorage`]; compiled only with the `redis` feature.
#[cfg(feature = "redis")]
Redis(RedisStorage),
/// Holds a [`PostgresStorage`]; compiled only with the `postgres` feature.
#[cfg(feature = "postgres")]
Postgres(PostgresStorage),
}
impl StorageInstance {
    /// Builds the backend variant selected by `config`.
    ///
    /// Each arm hands off to the matching named constructor on this type, so
    /// the construction logic for every backend lives in exactly one place.
    ///
    /// # Errors
    ///
    /// Returns the [`StorageError`] produced while constructing the Redis or
    /// Postgres backend. The in-memory arm always returns `Ok`.
    pub async fn from_config(config: StorageConfig) -> Result<Self, StorageError> {
        match config {
            StorageConfig::Memory(cfg) => Ok(Self::memory_with_config(cfg)),
            #[cfg(feature = "redis")]
            StorageConfig::Redis(cfg) => Self::redis(cfg).await,
            #[cfg(feature = "postgres")]
            StorageConfig::Postgres(cfg) => Self::postgres(cfg).await,
        }
    }

    /// Wraps a `MemoryStorage::new()` in the `Memory` variant.
    pub fn memory() -> Self {
        Self::Memory(MemoryStorage::new())
    }

    /// Wraps a `MemoryStorage::with_config(config)` in the `Memory` variant.
    pub fn memory_with_config(config: MemoryConfig) -> Self {
        Self::Memory(MemoryStorage::with_config(config))
    }

    /// Awaits `RedisStorage::with_config(config)` and wraps the result in the
    /// `Redis` variant.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the Redis backend constructor as a
    /// [`StorageError`].
    #[cfg(feature = "redis")]
    pub async fn redis(config: RedisConfig) -> Result<Self, StorageError> {
        Ok(Self::Redis(RedisStorage::with_config(config).await?))
    }

    /// Awaits `PostgresStorage::new(config)` and wraps the result in the
    /// `Postgres` variant.
    ///
    /// # Errors
    ///
    /// Propagates any failure from the Postgres backend constructor as a
    /// [`StorageError`].
    #[cfg(feature = "postgres")]
    pub async fn postgres(config: PostgresConfig) -> Result<Self, StorageError> {
        Ok(Self::Postgres(PostgresStorage::new(config).await?))
    }
}
/// Job inspection and maintenance operations: read, update, delete, list, and
/// count jobs.
///
/// Every method is async and reports failure as a [`StorageError`].
///
/// NOTE(review): the per-method notes below are inferred from names and
/// signatures only; confirm the exact semantics against each backend.
#[async_trait]
pub trait MonitoringApi: Send + Sync {
/// Looks up the job with id `job_id`. `Ok(None)` presumably means no such job
/// exists — TODO confirm.
async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError>;
/// Writes the current contents of `job` back to the backend.
async fn update(&self, job: &Job) -> Result<(), StorageError>;
/// Deletes the job with id `job_id`. The `bool` presumably reports whether a
/// job was actually removed — TODO confirm.
async fn delete(&self, job_id: &str) -> Result<bool, StorageError>;
/// Lists jobs, optionally narrowed by `state_filter`. `limit` and `offset`
/// presumably provide pagination (maximum results / results to skip) — TODO
/// confirm, including how `state_filter` is compared.
async fn list(
&self,
state_filter: Option<&JobState>,
limit: Option<usize>,
offset: Option<usize>,
) -> Result<Vec<Job>, StorageError>;
/// Returns a map from job state kind to a count, presumably the number of
/// jobs currently in that state — TODO confirm.
async fn get_job_counts(&self) -> Result<HashMap<JobStateKind, usize>, StorageError>;
}
/// [`MonitoringApi`] for the enum wrapper: every method forwards its arguments
/// unchanged to the same-named method on whichever backend is wrapped and
/// returns that backend's result as-is.
#[async_trait]
impl MonitoringApi for StorageInstance {
    /// Forwards to the wrapped backend's `get`.
    async fn get(&self, job_id: &str) -> Result<Option<Job>, StorageError> {
        match self {
            Self::Memory(backend) => backend.get(job_id).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.get(job_id).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.get(job_id).await,
        }
    }

    /// Forwards to the wrapped backend's `update`.
    async fn update(&self, job: &Job) -> Result<(), StorageError> {
        match self {
            Self::Memory(backend) => backend.update(job).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.update(job).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.update(job).await,
        }
    }

    /// Forwards to the wrapped backend's `delete`.
    async fn delete(&self, job_id: &str) -> Result<bool, StorageError> {
        match self {
            Self::Memory(backend) => backend.delete(job_id).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.delete(job_id).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.delete(job_id).await,
        }
    }

    /// Forwards to the wrapped backend's `list`.
    async fn list(
        &self,
        state_filter: Option<&JobState>,
        limit: Option<usize>,
        offset: Option<usize>,
    ) -> Result<Vec<Job>, StorageError> {
        match self {
            Self::Memory(backend) => backend.list(state_filter, limit, offset).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.list(state_filter, limit, offset).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.list(state_filter, limit, offset).await,
        }
    }

    /// Forwards to the wrapped backend's `get_job_counts`.
    async fn get_job_counts(&self) -> Result<HashMap<JobStateKind, usize>, StorageError> {
        match self {
            Self::Memory(backend) => backend.get_job_counts().await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.get_job_counts().await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.get_job_counts().await,
        }
    }
}
/// [`Storage`] for the enum wrapper: every method forwards its arguments
/// unchanged to the same-named method on whichever backend is wrapped and
/// returns that backend's result as-is. No logic is added at this layer.
#[async_trait]
impl Storage for StorageInstance {
    /// Forwards to the wrapped backend's `enqueue`.
    async fn enqueue(&self, job: &Job) -> Result<(), StorageError> {
        match self {
            Self::Memory(backend) => backend.enqueue(job).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.enqueue(job).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.enqueue(job).await,
        }
    }

    /// Forwards to the wrapped backend's `get_available_jobs`.
    async fn get_available_jobs(&self, limit: Option<usize>) -> Result<Vec<Job>, StorageError> {
        match self {
            Self::Memory(backend) => backend.get_available_jobs(limit).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.get_available_jobs(limit).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.get_available_jobs(limit).await,
        }
    }

    /// Forwards to the wrapped backend's `fetch_due_scheduled_jobs`.
    async fn fetch_due_scheduled_jobs(
        &self,
        now: DateTime<Utc>,
        limit: usize,
    ) -> Result<Vec<Job>, StorageError> {
        match self {
            Self::Memory(backend) => backend.fetch_due_scheduled_jobs(now, limit).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.fetch_due_scheduled_jobs(now, limit).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.fetch_due_scheduled_jobs(now, limit).await,
        }
    }

    /// Forwards to the wrapped backend's `fetch_due_retry_jobs`.
    async fn fetch_due_retry_jobs(
        &self,
        now: DateTime<Utc>,
        limit: usize,
    ) -> Result<Vec<Job>, StorageError> {
        match self {
            Self::Memory(backend) => backend.fetch_due_retry_jobs(now, limit).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.fetch_due_retry_jobs(now, limit).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.fetch_due_retry_jobs(now, limit).await,
        }
    }

    /// Forwards to the wrapped backend's `requeue_stranded_jobs`.
    async fn requeue_stranded_jobs(
        &self,
        stale_before: DateTime<Utc>,
    ) -> Result<usize, StorageError> {
        match self {
            Self::Memory(backend) => backend.requeue_stranded_jobs(stale_before).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.requeue_stranded_jobs(stale_before).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.requeue_stranded_jobs(stale_before).await,
        }
    }

    /// Forwards to the wrapped backend's `fetch_and_lock_job`.
    async fn fetch_and_lock_job(
        &self,
        worker_id: &str,
        queues: Option<&[String]>,
    ) -> Result<Option<Job>, StorageError> {
        match self {
            Self::Memory(backend) => backend.fetch_and_lock_job(worker_id, queues).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.fetch_and_lock_job(worker_id, queues).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.fetch_and_lock_job(worker_id, queues).await,
        }
    }

    /// Forwards to the wrapped backend's `try_acquire_job_lock`.
    async fn try_acquire_job_lock(
        &self,
        job_id: &str,
        worker_id: &str,
        timeout_seconds: u64,
    ) -> Result<bool, StorageError> {
        match self {
            Self::Memory(backend) => {
                backend
                    .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
                    .await
            }
            #[cfg(feature = "redis")]
            Self::Redis(backend) => {
                backend
                    .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
                    .await
            }
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => {
                backend
                    .try_acquire_job_lock(job_id, worker_id, timeout_seconds)
                    .await
            }
        }
    }

    /// Forwards to the wrapped backend's `release_job_lock`.
    async fn release_job_lock(&self, job_id: &str, worker_id: &str) -> Result<bool, StorageError> {
        match self {
            Self::Memory(backend) => backend.release_job_lock(job_id, worker_id).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.release_job_lock(job_id, worker_id).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.release_job_lock(job_id, worker_id).await,
        }
    }

    /// Forwards to the wrapped backend's `fetch_available_jobs_atomic`.
    async fn fetch_available_jobs_atomic(
        &self,
        worker_id: &str,
        limit: Option<usize>,
        queues: Option<&[String]>,
    ) -> Result<Vec<Job>, StorageError> {
        match self {
            Self::Memory(backend) => {
                backend
                    .fetch_available_jobs_atomic(worker_id, limit, queues)
                    .await
            }
            #[cfg(feature = "redis")]
            Self::Redis(backend) => {
                backend
                    .fetch_available_jobs_atomic(worker_id, limit, queues)
                    .await
            }
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => {
                backend
                    .fetch_available_jobs_atomic(worker_id, limit, queues)
                    .await
            }
        }
    }

    /// Forwards to the wrapped backend's `upsert_recurring_job`.
    async fn upsert_recurring_job(&self, job: &RecurringJob) -> Result<(), StorageError> {
        match self {
            Self::Memory(backend) => backend.upsert_recurring_job(job).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.upsert_recurring_job(job).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.upsert_recurring_job(job).await,
        }
    }

    /// Forwards to the wrapped backend's `remove_recurring_job`.
    async fn remove_recurring_job(&self, id: &str) -> Result<bool, StorageError> {
        match self {
            Self::Memory(backend) => backend.remove_recurring_job(id).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.remove_recurring_job(id).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.remove_recurring_job(id).await,
        }
    }

    /// Forwards to the wrapped backend's `list_recurring_jobs`.
    async fn list_recurring_jobs(&self) -> Result<Vec<RecurringJob>, StorageError> {
        match self {
            Self::Memory(backend) => backend.list_recurring_jobs().await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.list_recurring_jobs().await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.list_recurring_jobs().await,
        }
    }

    /// Forwards to the wrapped backend's `fetch_due_recurring_jobs`.
    async fn fetch_due_recurring_jobs(
        &self,
        now: DateTime<Utc>,
        limit: usize,
    ) -> Result<Vec<RecurringJob>, StorageError> {
        match self {
            Self::Memory(backend) => backend.fetch_due_recurring_jobs(now, limit).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.fetch_due_recurring_jobs(now, limit).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.fetch_due_recurring_jobs(now, limit).await,
        }
    }

    /// Forwards to the wrapped backend's `delete_expired_jobs`.
    async fn delete_expired_jobs(&self, now: DateTime<Utc>) -> Result<usize, StorageError> {
        match self {
            Self::Memory(backend) => backend.delete_expired_jobs(now).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.delete_expired_jobs(now).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.delete_expired_jobs(now).await,
        }
    }

    /// Forwards to the wrapped backend's `register_server`.
    async fn register_server(&self, info: &ServerInfo) -> Result<(), StorageError> {
        match self {
            Self::Memory(backend) => backend.register_server(info).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.register_server(info).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.register_server(info).await,
        }
    }

    /// Forwards to the wrapped backend's `heartbeat_server`.
    async fn heartbeat_server(
        &self,
        server_id: &str,
        now: DateTime<Utc>,
    ) -> Result<bool, StorageError> {
        match self {
            Self::Memory(backend) => backend.heartbeat_server(server_id, now).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.heartbeat_server(server_id, now).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.heartbeat_server(server_id, now).await,
        }
    }

    /// Forwards to the wrapped backend's `deregister_server`.
    async fn deregister_server(&self, server_id: &str) -> Result<bool, StorageError> {
        match self {
            Self::Memory(backend) => backend.deregister_server(server_id).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.deregister_server(server_id).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.deregister_server(server_id).await,
        }
    }

    /// Forwards to the wrapped backend's `list_dead_servers`.
    async fn list_dead_servers(
        &self,
        stale_before: DateTime<Utc>,
    ) -> Result<Vec<ServerInfo>, StorageError> {
        match self {
            Self::Memory(backend) => backend.list_dead_servers(stale_before).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.list_dead_servers(stale_before).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.list_dead_servers(stale_before).await,
        }
    }

    /// Forwards to the wrapped backend's `reclaim_jobs_from_server`.
    async fn reclaim_jobs_from_server(&self, server_id: &str) -> Result<usize, StorageError> {
        match self {
            Self::Memory(backend) => backend.reclaim_jobs_from_server(server_id).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.reclaim_jobs_from_server(server_id).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.reclaim_jobs_from_server(server_id).await,
        }
    }

    /// Forwards to the wrapped backend's `try_acquire_lock`.
    async fn try_acquire_lock(
        &self,
        resource: &str,
        owner: &str,
        ttl: std::time::Duration,
    ) -> Result<bool, StorageError> {
        match self {
            Self::Memory(backend) => backend.try_acquire_lock(resource, owner, ttl).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.try_acquire_lock(resource, owner, ttl).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.try_acquire_lock(resource, owner, ttl).await,
        }
    }

    /// Forwards to the wrapped backend's `release_lock`.
    async fn release_lock(&self, resource: &str, owner: &str) -> Result<bool, StorageError> {
        match self {
            Self::Memory(backend) => backend.release_lock(resource, owner).await,
            #[cfg(feature = "redis")]
            Self::Redis(backend) => backend.release_lock(resource, owner).await,
            #[cfg(feature = "postgres")]
            Self::Postgres(backend) => backend.release_lock(resource, owner).await,
        }
    }
}