#[allow(unreachable_pub)]
pub(crate) mod database_config;
#[allow(unreachable_pub)]
pub(crate) mod db_timing;
#[allow(unreachable_pub)]
pub(crate) mod error;
mod event_buffer;
#[allow(unreachable_pub)]
pub(crate) mod paths;
#[cfg(feature = "postgres")]
#[allow(unreachable_pub)]
pub(crate) mod postgres;
mod retry;
#[allow(unreachable_pub)]
pub(crate) mod sqlite;
#[allow(unreachable_pub)]
pub(crate) mod types;
pub use paths::{PathsError, QueuePaths};
pub(crate) use retry::with_transient_retry;
pub use database_config::{DatabaseConfig, PostgresConfig, SqliteConfig};
pub use db_timing::DrainedSamples;
pub use error::{Result, StorageError};
#[cfg(feature = "postgres")]
pub use postgres::PostgresStorage;
pub use sqlite::SqliteStorage;
pub use types::{
CronScheduleRecord, EnqueueOutcome, EnqueueRequest, FinalizeOutcome, JobId, JobLatency,
JobRecord, JobStatus, MetricBucket, NewCronSchedule, NewJob, PROCESS_WIDE_QUEUE, PodRecord,
ProcessRecord, QueueConfigRow, QueueCounts, SlotAssignment, TimelineEvent, TimelineEventType,
metric,
};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
pub(crate) const ERROR_HISTORY_CAP: usize = 32;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct StorageInfo {
pub backend: String,
pub fields: Vec<(String, String)>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DeleteOutcome {
Deleted,
CancelRequested,
NotFound,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum HeartbeatStatus {
Active,
CancelRequested,
Lost,
}
#[async_trait]
pub trait JobQueue: Send + Sync + std::fmt::Debug {
async fn enqueue(&self, req: EnqueueRequest) -> Result<EnqueueOutcome>;
async fn enqueue_bulk(&self, reqs: Vec<EnqueueRequest>) -> Result<Vec<EnqueueOutcome>>;
#[must_use = "claim_next has already transitioned the row to in_progress; dropping the result strands the claim until the reaper revives it"]
async fn claim_next(&self, queue: &str, process_id: &str) -> Result<Option<JobRecord>>;
async fn finalize(
&self,
job_id: &JobId,
owner: Option<&str>,
outcome: FinalizeOutcome,
) -> Result<()>;
async fn heartbeat_job(&self, job_id: &JobId, process_id: &str) -> Result<HeartbeatStatus>;
async fn revive_stale(&self, stale_before: DateTime<Utc>) -> Result<u64>;
async fn cleanup_aged(
&self,
queue: &str,
status: JobStatus,
threshold: DateTime<Utc>,
) -> Result<u64>;
#[must_use]
async fn get_job(&self, job_id: &JobId) -> Result<Option<JobRecord>>;
async fn list_by_status(
&self,
queue: Option<&str>,
status: JobStatus,
limit: usize,
) -> Result<Vec<JobRecord>>;
async fn count_by_status(&self, queue: &str) -> Result<QueueCounts>;
async fn oldest_ready_at(&self, queue: &str) -> Result<Option<DateTime<Utc>>>;
async fn distinct_kinds(&self, queue: Option<&str>) -> Result<Vec<String>>;
async fn list_for_timeline(
&self,
from: DateTime<Utc>,
to: DateTime<Utc>,
) -> Result<Vec<TimelineEvent>>;
async fn completed_latencies(
&self,
queue: Option<&str>,
from: DateTime<Utc>,
to: DateTime<Utc>,
limit: usize,
) -> Result<Vec<JobLatency>>;
async fn upsert_metric_buckets(&self, rows: &[MetricBucket]) -> Result<()>;
async fn metric_buckets(
&self,
queue: Option<&str>,
metrics: &[&str],
from: DateTime<Utc>,
to: DateTime<Utc>,
) -> Result<Vec<MetricBucket>>;
async fn delete_metric_buckets_before(&self, before: DateTime<Utc>) -> Result<u64>;
async fn delete(&self, job_id: &JobId) -> Result<DeleteOutcome>;
async fn requeue(&self, job_id: &JobId) -> Result<bool>;
async fn delete_batch_by_status(
&self,
queue: Option<&str>,
status: JobStatus,
batch_size: usize,
) -> Result<u64>;
async fn requeue_batch_by_status(
&self,
queue: Option<&str>,
status: JobStatus,
batch_size: usize,
) -> Result<u64>;
async fn cleanup_superseded_retries(&self) -> Result<u64>;
async fn list_scheduled_after(
&self,
queue: Option<&str>,
now: DateTime<Utc>,
limit: usize,
) -> Result<Vec<JobRecord>>;
async fn run_now(&self, job_id: &JobId) -> Result<bool>;
async fn wait_for_work(&self, queue: &str, timeout: Duration) -> Result<bool>;
async fn notify(&self, _queue: &str) -> Result<()> {
Ok(())
}
async fn describe(&self) -> Result<StorageInfo>;
fn drain_op_samples(&self) -> db_timing::DrainedSamples {
db_timing::DrainedSamples::default()
}
async fn flush_event_buffer(&self) -> Result<u64> {
Ok(0)
}
async fn db_health_snapshot(&self) -> Vec<(&'static str, f64)> {
Vec::new()
}
}
#[async_trait]
pub trait ProcessRegistry: Send + Sync + std::fmt::Debug {
async fn register(&self, process_id: &str, queue: &str, host: &str) -> Result<()>;
async fn heartbeat(&self, process_id: &str, current_job: Option<JobId>) -> Result<()>;
async fn deregister(&self, process_id: &str) -> Result<()>;
async fn reap_stale(&self, stale_before: DateTime<Utc>) -> Result<u64>;
async fn list(&self, queue: Option<&str>) -> Result<Vec<ProcessRecord>>;
async fn delete_for_host(&self, host: &str) -> Result<u64>;
async fn pod_heartbeat(
&self,
host: &str,
worker_name: Option<&str>,
queues: &[String],
) -> Result<()>;
async fn list_live_pods(&self, stale_before: DateTime<Utc>) -> Result<Vec<PodRecord>>;
async fn list_slot_assignments(&self) -> Result<Vec<SlotAssignment>>;
async fn set_slots(&self, queue: &str, host: &str, slots: i32) -> Result<()>;
async fn get_slots(&self, queue: &str, host: &str) -> Result<Option<i32>>;
}
#[async_trait]
pub trait QueueConfig: Send + Sync + std::fmt::Debug {
async fn ensure_queue(&self, name: &str, default_max_workers: i32) -> Result<()>;
#[must_use]
async fn get_queue(&self, name: &str) -> Result<Option<QueueConfigRow>>;
async fn list_queues(&self) -> Result<Vec<QueueConfigRow>>;
async fn set_max_workers(&self, name: &str, n: i32) -> Result<()>;
async fn set_paused(&self, name: &str, paused: bool) -> Result<()>;
async fn set_retention(&self, name: &str, done_days: i32, dead_days: i32) -> Result<()>;
async fn set_backoff(
&self,
name: &str,
enabled: bool,
base_seconds: i32,
max_seconds: i32,
) -> Result<()>;
}
#[async_trait]
pub trait CronStorage: Send + Sync + std::fmt::Debug {
async fn ensure_schedule(&self, schedule: NewCronSchedule) -> Result<()>;
async fn list_schedules(&self) -> Result<Vec<CronScheduleRecord>>;
async fn record_fire(
&self,
name: &str,
fired_at: DateTime<Utc>,
next_at: DateTime<Utc>,
) -> Result<()>;
async fn try_advance_fire(
&self,
name: &str,
expected: DateTime<Utc>,
fired_at: DateTime<Utc>,
next_at: DateTime<Utc>,
) -> Result<bool>;
async fn record_parse_error(&self, name: &str, message: &str) -> Result<()>;
async fn set_enabled(&self, name: &str, enabled: bool) -> Result<()>;
async fn set_expr(&self, name: &str, expr: &str) -> Result<()>;
async fn set_dedupe_key(&self, name: &str, dedupe_key: Option<String>) -> Result<()>;
async fn delete_schedule(&self, name: &str) -> Result<()>;
#[must_use]
async fn get_schedule(&self, name: &str) -> Result<Option<CronScheduleRecord>>;
async fn try_cron_lease(&self, holder: &str, ttl: std::time::Duration) -> Result<bool>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum RateLimitOutcome {
Granted,
Throttled,
}
#[async_trait]
pub trait RateLimitStorage: Send + Sync + std::fmt::Debug {
async fn acquire(&self, scope: &str) -> Result<RateLimitOutcome>;
async fn drain(&self, scope: &str) -> Result<()>;
async fn ensure_default(&self, scope: &str, capacity: i64, refill_per_sec: f64) -> Result<()>;
}
#[derive(Clone)]
#[non_exhaustive]
pub struct Storage {
pub jobs: Arc<dyn JobQueue>,
pub procs: Arc<dyn ProcessRegistry>,
pub config: Arc<dyn QueueConfig>,
pub cron: Arc<dyn CronStorage>,
pub rate_limit: Arc<dyn RateLimitStorage>,
}
impl std::fmt::Debug for Storage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Storage").finish_non_exhaustive()
}
}
impl Storage {
#[must_use]
pub fn from_one<T>(inner: Arc<T>) -> Self
where
T: JobQueue + ProcessRegistry + QueueConfig + CronStorage + RateLimitStorage + 'static,
{
Self {
jobs: inner.clone(),
procs: inner.clone(),
config: inner.clone(),
cron: inner.clone(),
rate_limit: inner,
}
}
}