use std::{
collections::HashMap, future::Future, panic::AssertUnwindSafe, pin::Pin, sync::Arc,
time::Duration,
};
use super::{BackgroundWorker, JobStatus, Queue};
use crate::{config::SqliteQueueConfig, Error, Result};
use chrono::{DateTime, Utc};
use futures_util::FutureExt;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
pub use sqlx::SqlitePool;
use sqlx::{
sqlite::{SqliteConnectOptions, SqlitePoolOptions, SqliteRow},
ConnectOptions, QueryBuilder, Row,
};
use std::fmt::Write;
use tokio::{task::JoinHandle, time::sleep};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, trace};
use ulid::Ulid;
/// Unique identifier of a queued job (a ULID rendered as a string).
type JobId = String;
/// Arbitrary JSON payload carried by a job and handed to its worker.
type JobData = JsonValue;
/// Type-erased async job handler: receives the job id and its JSON payload,
/// and resolves to `Ok(())` on success or an error that marks the job failed.
type JobHandler = Box<
    dyn Fn(
            JobId,
            JobData,
        ) -> Pin<Box<dyn std::future::Future<Output = Result<(), crate::Error>> + Send>>
        + Send
        + Sync,
>;
/// A single background job as stored in the `sqlt_loco_queue` table.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Job {
    /// Unique job identifier (ULID string).
    pub id: JobId,
    /// Job name; used to look up the registered handler.
    pub name: String,
    /// JSON payload passed to the worker. Serialized as `task_data` to match
    /// the database column name.
    #[serde(rename = "task_data")]
    pub data: JobData,
    /// Current lifecycle state of the job.
    pub status: JobStatus,
    /// Earliest time at which the job may be dequeued.
    pub run_at: DateTime<Utc>,
    /// Re-run interval in milliseconds for recurring jobs; `None` = one-shot.
    pub interval: Option<i64>,
    /// Row creation timestamp, when present in the row.
    pub created_at: Option<DateTime<Utc>>,
    /// Last row update timestamp, when present in the row.
    pub updated_at: Option<DateTime<Utc>>,
    /// Optional routing tags; workers only dequeue jobs matching their tags.
    pub tags: Option<Vec<String>>,
}
/// Registry mapping job names to their type-erased async handlers.
pub struct JobRegistry {
    // Shared with every worker task spawned by `run`; `Arc::get_mut` in
    // `register_worker` therefore only succeeds while no workers hold a clone.
    handlers: Arc<HashMap<String, JobHandler>>,
}
impl JobRegistry {
    /// Creates an empty registry with no registered handlers.
    #[must_use]
    pub fn new() -> Self {
        Self {
            handlers: Arc::new(HashMap::new()),
        }
    }

    /// Registers `worker` under `name`.
    ///
    /// The worker is wrapped in a type-erased handler that deserializes the
    /// job's JSON payload into `Args` and catches panics, so a misbehaving
    /// worker fails its job instead of killing the polling task.
    ///
    /// # Errors
    ///
    /// Fails if the handler map is already shared (i.e. `run` has cloned the
    /// `Arc`), since registration needs exclusive access to the map.
    pub fn register_worker<Args, W>(&mut self, name: String, worker: W) -> Result<()>
    where
        Args: Send + Serialize + Sync + 'static,
        W: BackgroundWorker<Args> + 'static,
        for<'de> Args: Deserialize<'de>,
    {
        let worker = Arc::new(worker);
        let wrapped_handler = move |_job_id: String, job_data: JobData| {
            let w = worker.clone();
            Box::pin(async move {
                let args = serde_json::from_value::<Args>(job_data);
                match args {
                    Ok(args) => {
                        // AssertUnwindSafe: after a panic the worker state is
                        // never reused, so unwind safety is not a concern here.
                        match AssertUnwindSafe(w.perform(args)).catch_unwind().await {
                            Ok(result) => result,
                            Err(panic) => {
                                // Panic payloads are typically `String` or `&str`.
                                let panic_msg = panic
                                    .downcast_ref::<String>()
                                    .map(String::as_str)
                                    .or_else(|| panic.downcast_ref::<&str>().copied())
                                    .unwrap_or("Unknown panic occurred");
                                error!(error = panic_msg, "Worker panicked during execution");
                                Err(Error::string(panic_msg))
                            }
                        }
                    }
                    // Payload did not deserialize into `Args`: fail the job.
                    Err(err) => Err(err.into()),
                }
            }) as Pin<Box<dyn Future<Output = Result<(), crate::Error>> + Send>>
        };
        Arc::get_mut(&mut self.handlers)
            .ok_or_else(|| Error::string("cannot register worker"))?
            .insert(name, Box::new(wrapped_handler));
        Ok(())
    }

    /// Returns the shared map of registered handlers, keyed by job name.
    #[must_use]
    pub fn handlers(&self) -> &Arc<HashMap<String, JobHandler>> {
        &self.handlers
    }

    /// Spawns `opts.num_workers` polling tasks against `pool` and returns
    /// their join handles.
    ///
    /// Each task loops: dequeue one job matching `tags`, run its handler, and
    /// mark it completed/failed; when the queue is empty it sleeps for
    /// `opts.poll_interval_sec` seconds. `token` cancellation stops every
    /// worker, both between jobs and during the idle sleep.
    #[must_use]
    pub fn run(
        &self,
        pool: &SqlitePool,
        opts: &RunOpts,
        token: &CancellationToken,
        tags: &[String],
    ) -> Vec<JoinHandle<()>> {
        let mut jobs = Vec::new();
        let interval = opts.poll_interval_sec;
        for idx in 0..opts.num_workers {
            let handlers = self.handlers.clone();
            let worker_token = token.clone();
            let worker_tags = tags.to_vec();
            let pool = pool.clone();
            let job = tokio::spawn(async move {
                loop {
                    // Checked once per iteration so a cancel between jobs is
                    // honored promptly.
                    if worker_token.is_cancelled() {
                        trace!(worker_id = idx, "Cancellation received, stopping worker");
                        break;
                    }
                    trace!(
                        pool_size = pool.num_idle(),
                        worker_id = idx,
                        "Connection pool stats"
                    );
                    // A dequeue error is logged and treated like "no job", so
                    // the worker backs off and retries instead of dying.
                    let job_opt = match dequeue(&pool, &worker_tags).await {
                        Ok(t) => t,
                        Err(err) => {
                            error!(error = %err, "Failed to fetch job from queue");
                            None
                        }
                    };
                    if let Some(job) = job_opt {
                        debug!(job_id = %job.id, job_name = %job.name, "Processing job");
                        if let Some(handler) = handlers.get(&job.name) {
                            match handler(job.id.clone(), job.data.clone()).await {
                                Ok(()) => {
                                    if let Err(err) =
                                        complete_job(&pool, &job.id, job.interval).await
                                    {
                                        error!(
                                            error = %err,
                                            job_id = %job.id,
                                            job_name = %job.name,
                                            "Failed to mark job as completed"
                                        );
                                    } else {
                                        debug!(job_id = %job.id, "Job completed successfully");
                                    }
                                }
                                Err(err) => {
                                    if let Err(fail_err) = fail_job(&pool, &job.id, &err).await {
                                        error!(
                                            error = %fail_err,
                                            job_id = %job.id,
                                            job_name = %job.name,
                                            "Failed to mark job as failed"
                                        );
                                    } else {
                                        debug!(job_id = %job.id, error = %err, "Job execution failed");
                                    }
                                }
                            }
                        } else {
                            // NOTE(review): a dequeued job with no registered
                            // handler stays in `processing` until `requeue`
                            // reclaims it — confirm this is intended.
                            error!(job_name = %job.name, "No handler registered for job");
                        }
                    } else {
                        // Queue empty: sleep, but wake immediately on cancel.
                        tokio::select! {
                            biased;
                            () = worker_token.cancelled() => {
                                trace!(worker_id = idx, "Cancellation received during sleep, stopping worker");
                                break;
                            }
                            () = sleep(Duration::from_secs(interval.into())) => {
                            }
                        }
                    }
                }
            });
            jobs.push(job);
        }
        jobs
    }
}
impl Default for JobRegistry {
fn default() -> Self {
Self::new()
}
}
/// Opens a SQLite connection pool using the queue configuration.
async fn connect(cfg: &SqliteQueueConfig) -> Result<SqlitePool> {
    let options: SqliteConnectOptions = cfg.uri.parse()?;
    // Silence per-statement sqlx logs unless explicitly enabled.
    let options = if cfg.enable_logging {
        options
    } else {
        options.disable_statement_logging()
    };
    Ok(SqlitePoolOptions::new()
        .min_connections(cfg.min_connections)
        .max_connections(cfg.max_connections)
        .idle_timeout(Duration::from_millis(cfg.idle_timeout))
        .acquire_timeout(Duration::from_millis(cfg.connect_timeout))
        .connect_with(options)
        .await?)
}
/// Creates the queue and queue-lock tables (idempotently) plus the status/run_at
/// index, and seeds the single lock row.
///
/// # Errors
///
/// Returns an error if the DDL statements fail to execute.
pub async fn initialize_database(pool: &SqlitePool) -> Result<()> {
    debug!("Initializing job database tables");
    // New rows default to the "queued" status.
    let ddl = format!(
        r"
            CREATE TABLE IF NOT EXISTS sqlt_loco_queue (
                id TEXT NOT NULL,
                name TEXT NOT NULL,
                task_data JSON NOT NULL,
                status TEXT NOT NULL DEFAULT '{}',
                run_at TIMESTAMP NOT NULL,
                interval INTEGER,
                created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
                tags JSON
            );
            CREATE TABLE IF NOT EXISTS sqlt_loco_queue_lock (
                id INTEGER PRIMARY KEY CHECK (id = 1),
                is_locked BOOLEAN NOT NULL DEFAULT FALSE,
                locked_at TIMESTAMP NULL
            );
            INSERT OR IGNORE INTO sqlt_loco_queue_lock (id, is_locked) VALUES (1, FALSE);
            CREATE INDEX IF NOT EXISTS idx_sqlt_queue_status_run_at ON sqlt_loco_queue(status, run_at);
        ",
        JobStatus::Queued
    );
    sqlx::query(&ddl).execute(pool).await?;
    Ok(())
}
/// Inserts a new job into the queue and returns its generated id.
///
/// `interval` (if given) makes the job recurring, stored in milliseconds;
/// `tags` restrict which workers may pick the job up.
///
/// # Errors
///
/// Fails if the payload or tags cannot be serialized, or the insert fails.
pub async fn enqueue(
    pool: &SqlitePool,
    name: &str,
    data: JobData,
    run_at: DateTime<Utc>,
    interval: Option<Duration>,
    tags: Option<Vec<String>>,
) -> Result<JobId> {
    let data = serde_json::to_value(data)?;
    // Tags are persisted as a JSON array (or NULL when absent).
    let tags_json = tags.as_ref().map(serde_json::to_value).transpose()?;
    #[allow(clippy::cast_possible_truncation)]
    let interval_ms: Option<i64> = interval.map(|i| i.as_millis() as i64);
    let id = Ulid::new().to_string();
    debug!(job_id = %id, job_name = %name, run_at = %run_at, tags = ?tags, "Enqueueing job");
    sqlx::query(
        "INSERT INTO sqlt_loco_queue (id, task_data, name, run_at, interval, tags) VALUES ($1, $2, $3, \
         DATETIME($4), $5, $6)",
    )
    .bind(id.as_str())
    .bind(data)
    .bind(name)
    .bind(run_at)
    .bind(interval_ms)
    .bind(tags_json)
    .execute(pool)
    .await?;
    Ok(id)
}
/// Atomically claims the next runnable job matching `worker_tags`, or returns
/// `Ok(None)` when the queue is empty or the queue lock is held elsewhere.
///
/// Dequeues are serialized across processes through the single-row
/// `sqlt_loco_queue_lock` table: the lock is taken, the oldest eligible job is
/// flipped to `processing`, the lock is released, and the transaction commits.
async fn dequeue(client: &SqlitePool, worker_tags: &[String]) -> Result<Option<Job>> {
    let mut tx = client.begin().await?;
    // Try to take the advisory lock; zero rows affected means another worker
    // holds it right now.
    let acquired_write_lock = sqlx::query(
        "UPDATE sqlt_loco_queue_lock SET
            is_locked = TRUE,
            locked_at = CURRENT_TIMESTAMP
        WHERE id = 1 AND is_locked = FALSE",
    )
    .execute(&mut *tx)
    .await?;
    if acquired_write_lock.rows_affected() == 0 {
        trace!("Unable to acquire queue lock, skipping job fetch");
        tx.rollback().await?;
        return Ok(None);
    }
    let mut query = String::from(
        "SELECT id, name, task_data, status, run_at, interval, tags
        FROM sqlt_loco_queue
        WHERE
            status = ? AND
            run_at <= CURRENT_TIMESTAMP",
    );
    if worker_tags.is_empty() {
        // Untagged workers only take untagged jobs.
        query.push_str(" AND (tags IS NULL)");
    } else {
        // Tagged workers only take tagged jobs whose JSON tag array contains
        // at least one of the worker's tags (matched via LIKE below).
        query.push_str(" AND (tags IS NOT NULL)");
        let mut conditions = Vec::new();
        for _ in worker_tags {
            conditions.push("json_extract(tags, '$') LIKE ?".to_string());
        }
        if !conditions.is_empty() {
            query.push_str(" AND (");
            query.push_str(&conditions.join(" OR "));
            query.push(')');
        }
    }
    // Oldest scheduled job first; claim a single job per call.
    query.push_str(" ORDER BY run_at LIMIT 1");
    let mut db_query = sqlx::query(&query).bind(JobStatus::Queued.to_string());
    for tag in worker_tags {
        // Match the quoted tag anywhere inside the serialized JSON array.
        db_query = db_query.bind(format!("%\"{tag}\"%"));
    }
    let row = db_query
        .map(|row: SqliteRow| to_job(&row).ok())
        .fetch_optional(&mut *tx)
        .await?
        .flatten();
    if let Some(job) = row {
        trace!(job_id = %job.id, job_name = %job.name, job_tags = ?job.tags, "Dequeueing job for processing");
        // Mark the job as claimed before releasing the lock.
        sqlx::query(
            "UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE id = $2",
        )
        .bind(JobStatus::Processing.to_string())
        .bind(&job.id)
        .execute(&mut *tx)
        .await?;
        sqlx::query(
            "UPDATE sqlt_loco_queue_lock
            SET is_locked = FALSE,
                locked_at = NULL
            WHERE id = 1",
        )
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(Some(job))
    } else {
        trace!("No jobs available for processing");
        // Nothing to claim: still release the lock and commit that release.
        sqlx::query(
            "UPDATE sqlt_loco_queue_lock
            SET is_locked = FALSE,
                locked_at = NULL
            WHERE id = 1",
        )
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(None)
    }
}
/// Finishes a job: recurring jobs (with an interval) are re-queued at
/// `now + interval_ms`; one-shot jobs are marked completed.
async fn complete_job(pool: &SqlitePool, id: &JobId, interval_ms: Option<i64>) -> Result<()> {
    match interval_ms {
        // Recurring job: push it back into the queue with a fresh run time.
        Some(ms) => {
            let next_run_at = Utc::now() + chrono::Duration::milliseconds(ms);
            trace!(
                job_id = %id,
                status = "queued",
                next_run_at = %next_run_at,
                "Rescheduling recurring job"
            );
            sqlx::query(
                "UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP, run_at = \
                 DATETIME($2) WHERE id = $3",
            )
            .bind(JobStatus::Queued.to_string())
            .bind(next_run_at)
            .bind(id)
            .execute(pool)
            .await?;
        }
        // One-shot job: terminal completed state.
        None => {
            trace!(job_id = %id, status = "completed", "Marking job as completed");
            sqlx::query(
                "UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE id = $2",
            )
            .bind(JobStatus::Completed.to_string())
            .bind(id)
            .execute(pool)
            .await?;
        }
    }
    Ok(())
}
async fn fail_job(pool: &SqlitePool, id: &JobId, error: &crate::Error) -> Result<()> {
let msg = error.to_string();
debug!(job_id = %id, error = %msg, "Marking job as failed");
let error_json = serde_json::json!({ "error": msg });
sqlx::query(
"UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP, task_data = \
json_patch(task_data, $2) WHERE id = $3",
)
.bind(JobStatus::Failed.to_string())
.bind(error_json)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
/// Cancels every still-queued job with the given name; jobs in other states
/// are left untouched.
///
/// # Errors
///
/// Returns an error if the update statement fails.
pub async fn cancel_jobs_by_name(pool: &SqlitePool, name: &str) -> Result<()> {
    debug!(job_name = %name, "Cancelling queued jobs by name");
    let sql = "UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE name = $2 \
         AND status = $3";
    sqlx::query(sql)
        .bind(JobStatus::Cancelled.to_string())
        .bind(name)
        .bind(JobStatus::Queued.to_string())
        .execute(pool)
        .await?;
    Ok(())
}
/// Deletes every row from both the queue table and the lock table.
///
/// # Errors
///
/// Returns an error if either delete fails.
pub async fn clear(pool: &SqlitePool) -> Result<()> {
    let sql = "
        DELETE FROM sqlt_loco_queue;
        DELETE FROM sqlt_loco_queue_lock;
        ";
    sqlx::query(sql).execute(pool).await?;
    Ok(())
}
/// Deletes all jobs whose status is in the given list.
///
/// The IN-clause is built from the `JobStatus` enum's `Display` output, so no
/// untrusted input reaches the SQL text.
///
/// # Errors
///
/// Returns an error if the delete fails.
pub async fn clear_by_status(pool: &SqlitePool, status: Vec<JobStatus>) -> Result<()> {
    debug!(status = ?status, "Clearing jobs by status");
    let quoted: Vec<String> = status.iter().map(|s| format!("'{s}'")).collect();
    let sql = format!(
        "DELETE FROM sqlt_loco_queue WHERE status IN ({})",
        quoted.join(",")
    );
    sqlx::query(&sql).execute(pool).await?;
    Ok(())
}
/// Returns stalled jobs to the queue: any job stuck in `processing` whose
/// `updated_at` is at least `age_minutes` old is flipped back to `queued`.
///
/// # Errors
///
/// Returns an error if the update fails.
pub async fn requeue(pool: &SqlitePool, age_minutes: &i64) -> Result<()> {
    debug!(age_minutes = age_minutes, "Requeueing stalled jobs");
    // `age_minutes` is an i64, so interpolating it into the date modifier is
    // injection-safe.
    let sql = format!(
        "UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE status = $2 AND updated_at <= DATETIME('now', '-{age_minutes} minute')"
    );
    sqlx::query(&sql)
        .bind(JobStatus::Queued.to_string())
        .bind(JobStatus::Processing.to_string())
        .execute(pool)
        .await?;
    Ok(())
}
/// Deletes jobs created more than `age_days` days ago, optionally restricted
/// to the given statuses (an empty status list means no status filter).
///
/// # Errors
///
/// Returns an error if the delete fails.
pub async fn clear_jobs_older_than(
    pool: &SqlitePool,
    age_days: i64,
    status: Option<&Vec<JobStatus>>,
) -> Result<()> {
    debug!(age_days = age_days, status = ?status, "Clearing older jobs");
    // RFC 3339 / ISO 8601 ("%+") cutoff, bound as a parameter.
    let threshold_date = (Utc::now() - chrono::Duration::days(age_days))
        .format("%+")
        .to_string();
    let mut query_builder =
        QueryBuilder::<sqlx::Sqlite>::new("DELETE FROM sqlt_loco_queue WHERE created_at <= ");
    query_builder.push_bind(threshold_date);
    if let Some(status_list) = status.filter(|list| !list.is_empty()) {
        let quoted = status_list
            .iter()
            .map(|s| format!("'{s}'"))
            .collect::<Vec<String>>()
            .join(",");
        query_builder.push(format!(" AND status IN ({quoted})"));
    }
    query_builder.build().execute(pool).await?;
    Ok(())
}
/// Health check: verifies the queue table is reachable with a cheap query.
///
/// # Errors
///
/// Returns an error if the query cannot be executed.
pub async fn ping(pool: &SqlitePool) -> Result<()> {
    trace!("Pinging job queue database");
    let _ = sqlx::query("SELECT id from sqlt_loco_queue LIMIT 1")
        .execute(pool)
        .await?;
    Ok(())
}
/// Tuning options for the background worker pool.
#[derive(Debug)]
pub struct RunOpts {
    /// Number of concurrent polling tasks to spawn.
    pub num_workers: u32,
    /// Seconds a worker sleeps between polls when the queue is empty.
    pub poll_interval_sec: u32,
}
/// Builds a SQLite-backed [`Queue`] provider from the given configuration:
/// connects the pool and pairs it with an empty registry, run options, and a
/// fresh cancellation token.
///
/// # Errors
///
/// Returns an error if the database connection cannot be established.
pub async fn create_provider(qcfg: &SqliteQueueConfig) -> Result<Queue> {
    debug!(
        num_workers = qcfg.num_workers,
        poll_interval = qcfg.poll_interval_sec,
        "Creating job queue provider"
    );
    let pool = connect(qcfg).await.map_err(Box::from)?;
    let registry = Arc::new(tokio::sync::Mutex::new(JobRegistry::new()));
    let opts = RunOpts {
        num_workers: qcfg.num_workers,
        poll_interval_sec: qcfg.poll_interval_sec,
    };
    Ok(Queue::Sqlite(pool, registry, opts, CancellationToken::new()))
}
/// Fetches jobs, optionally filtered by status and by minimum age in days.
///
/// Filter fragments come from the `JobStatus` enum and a formatted timestamp,
/// so no untrusted input reaches the SQL text. Rows that fail to decode are
/// silently skipped.
///
/// # Errors
///
/// Returns an error if the query fails.
pub async fn get_jobs(
    pool: &SqlitePool,
    status: Option<&Vec<JobStatus>>,
    age_days: Option<i64>,
) -> Result<Vec<Job>> {
    debug!(status = ?status, age_days = ?age_days, "Retrieving jobs");
    let mut sql = String::from("SELECT * FROM sqlt_loco_queue WHERE 1 = 1 ");
    if let Some(list) = status {
        let quoted: Vec<String> = list.iter().map(|s| format!("'{s}'")).collect();
        let _ = write!(sql, " AND status IN ({})", quoted.join(","));
    }
    if let Some(days) = age_days {
        // RFC 3339 / ISO 8601 ("%+") cutoff compared against `created_at`.
        let threshold = (Utc::now() - chrono::Duration::days(days))
            .format("%+")
            .to_string();
        let _ = write!(sql, " AND created_at <= '{threshold}' ");
    }
    let rows = sqlx::query(&sql).fetch_all(pool).await?;
    debug!(job_count = rows.len(), "Retrieved jobs from database");
    Ok(rows.iter().filter_map(|row| to_job(row).ok()).collect())
}
fn to_job(row: &SqliteRow) -> Result<Job> {
let tags_json: Option<serde_json::Value> = row.try_get("tags").unwrap_or_default();
let tags = tags_json.and_then(|json_val| {
if json_val.is_array() {
let tags_vec: Vec<String> =
serde_json::from_value(json_val).unwrap_or_else(|_| Vec::new());
if tags_vec.is_empty() {
None
} else {
Some(tags_vec)
}
} else {
None
}
});
Ok(Job {
id: row.get("id"),
name: row.get("name"),
data: row.get("task_data"),
status: row.get::<String, _>("status").parse().map_err(|err| {
let status: String = row.get("status");
tracing::error!(status, err = %err, "Unsupported job status in database");
Error::string("invalid job status")
})?,
run_at: row.get("run_at"),
interval: row.get("interval"),
created_at: row.try_get("created_at").unwrap_or_default(),
updated_at: row.try_get("updated_at").unwrap_or_default(),
tags,
})
}
#[cfg(test)]
mod tests {
    use std::path::Path;

    use chrono::{NaiveDate, NaiveTime, TimeZone};
    use insta::{assert_debug_snapshot, with_settings};
    use sqlx::{query_as, FromRow, Pool, Sqlite};

    use super::*;
    use crate::tests_cfg;

    /// Row shape returned by SQLite's `PRAGMA table_info`.
    #[derive(Debug, Serialize, FromRow)]
    pub struct TableInfo {
        cid: i32,
        name: String,
        #[sqlx(rename = "type")]
        _type: String,
        notnull: bool,
        dflt_value: Option<String>,
        pk: bool,
    }

    /// Row shape of the single-row `sqlt_loco_queue_lock` table.
    #[derive(Debug, Serialize, FromRow)]
    struct JobQueueLock {
        id: i32,
        is_locked: bool,
        locked_at: Option<DateTime<Utc>>,
    }

    // Snapshot filters: redact ULIDs and RFC 3339 timestamps so insta
    // snapshots stay stable across runs.
    fn reduction() -> &'static [(&'static str, &'static str)] {
        &[
            ("[A-Z0-9]{26}", "<REDACTED>"),
            (r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", "<REDACTED>"),
        ]
    }

    // Builds a single-connection pool backed by a file under `db_path` and
    // drops any leftover queue tables, so each test starts from a clean slate.
    async fn init(db_path: &Path) -> Pool<Sqlite> {
        let qcfg = SqliteQueueConfig {
            uri: format!(
                "sqlite://{}?mode=rwc",
                db_path.join("sample.sqlite").display()
            ),
            dangerously_flush: false,
            enable_logging: false,
            max_connections: 1,
            min_connections: 1,
            connect_timeout: 500,
            idle_timeout: 500,
            poll_interval_sec: 1,
            num_workers: 1,
        };
        let pool = connect(&qcfg).await.unwrap();
        sqlx::raw_sql(
            r"
            DROP TABLE IF EXISTS sqlt_loco_queue;
            DROP TABLE IF EXISTS sqlt_loco_queue_lock;
            ",
        )
        .execute(&pool)
        .await
        .expect("drop table if exists");
        pool
    }

    // Reads every row from the queue, skipping rows that fail to decode.
    async fn get_all_jobs(pool: &SqlitePool) -> Vec<Job> {
        sqlx::query("select * from sqlt_loco_queue")
            .fetch_all(pool)
            .await
            .expect("get jobs")
            .iter()
            .filter_map(|row| to_job(row).ok())
            .collect()
    }

    // Fetches a single job by id; panics if it is missing or undecodable.
    async fn get_job(pool: &SqlitePool, id: &str) -> Job {
        sqlx::query(&format!("select * from sqlt_loco_queue where id = '{id}'"))
            .fetch_all(pool)
            .await
            .expect("get jobs")
            .first()
            .and_then(|row| to_job(row).ok())
            .expect("job not found")
    }

    // Schema smoke test: both tables exist with the expected columns.
    #[tokio::test]
    async fn can_initialize_database() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        for table in ["sqlt_loco_queue", "sqlt_loco_queue_lock"] {
            let table_info: Vec<TableInfo> =
                query_as::<_, TableInfo>(&format!("PRAGMA table_info({table})"))
                    .fetch_all(&pool)
                    .await
                    .unwrap();
            assert_debug_snapshot!(table, table_info);
        }
    }

    // Enqueue inserts one row and leaves the queue lock released.
    #[tokio::test]
    async fn can_enqueue() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        let jobs = get_all_jobs(&pool).await;
        assert_eq!(jobs.len(), 0);
        let run_at = Utc.from_utc_datetime(
            &NaiveDate::from_ymd_opt(2023, 1, 15)
                .unwrap()
                .and_time(NaiveTime::from_hms_opt(12, 30, 0).unwrap()),
        );
        let job_data = serde_json::json!({"user_id": 1});
        let tags = Some(vec!["email".to_string(), "notification".to_string()]);
        assert!(enqueue(
            &pool,
            "PasswordChangeNotification",
            job_data,
            run_at,
            None,
            tags
        )
        .await
        .is_ok());
        let jobs = get_all_jobs(&pool).await;
        assert_eq!(jobs.len(), 1);
        with_settings!({
            filters => reduction().iter().map(|&(pattern, replacement)| (pattern, replacement)),
        }, {
            assert_debug_snapshot!(jobs);
        });
        let job_lock: JobQueueLock =
            query_as::<_, JobQueueLock>("select * from sqlt_loco_queue_lock")
                .fetch_one(&pool)
                .await
                .unwrap();
        assert!(!job_lock.is_locked);
    }

    // Dequeue flips the job to processing and bumps `updated_at`.
    #[tokio::test]
    async fn can_dequeue() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        let jobs = get_all_jobs(&pool).await;
        assert_eq!(jobs.len(), 0);
        let run_at = Utc.from_utc_datetime(
            &NaiveDate::from_ymd_opt(2023, 1, 15)
                .unwrap()
                .and_time(NaiveTime::from_hms_opt(12, 30, 0).unwrap()),
        );
        let job_data = serde_json::json!({"user_id": 1});
        assert!(enqueue(
            &pool,
            "PasswordChangeNotification",
            job_data,
            run_at,
            None,
            None
        )
        .await
        .is_ok());
        let job_before_dequeue = get_all_jobs(&pool)
            .await
            .first()
            .cloned()
            .expect("gets first job");
        assert_eq!(job_before_dequeue.status, JobStatus::Queued);
        // Sleep one second so CURRENT_TIMESTAMP (second resolution) differs.
        std::thread::sleep(std::time::Duration::from_secs(1));
        assert!(dequeue(&pool, &[]).await.is_ok());
        let job_after_dequeue = get_all_jobs(&pool)
            .await
            .first()
            .cloned()
            .expect("gets first job");
        assert_ne!(job_after_dequeue.updated_at, job_before_dequeue.updated_at);
        with_settings!({
            filters => reduction().iter().map(|&(pattern, replacement)| (pattern, replacement)),
        }, {
            assert_debug_snapshot!(job_after_dequeue);
        });
    }

    // One-shot jobs go to the terminal `completed` state.
    #[tokio::test]
    async fn can_complete_job_without_interval() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        tests_cfg::queue::sqlite_seed_data(&pool).await;
        let job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA99").await;
        assert_eq!(job.status, JobStatus::Queued);
        assert!(complete_job(&pool, &job.id, None).await.is_ok());
        let job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA99").await;
        assert_eq!(job.status, JobStatus::Completed);
    }

    // Jobs with an interval are rescheduled instead of completed.
    #[tokio::test]
    async fn can_complete_job_with_interval() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        tests_cfg::queue::sqlite_seed_data(&pool).await;
        let before_complete_job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA98").await;
        assert_eq!(before_complete_job.status, JobStatus::Completed);
        // Sleep one second so CURRENT_TIMESTAMP (second resolution) differs.
        std::thread::sleep(std::time::Duration::from_secs(1));
        assert!(complete_job(&pool, &before_complete_job.id, Some(10))
            .await
            .is_ok());
        let after_complete_job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA98").await;
        assert_ne!(
            after_complete_job.updated_at,
            before_complete_job.updated_at
        );
        with_settings!({
            filters => reduction().iter().map(|&(pattern, replacement)| (pattern, replacement)),
        }, {
            assert_debug_snapshot!(after_complete_job);
        });
    }

    // Failing a job records the error and bumps `updated_at`.
    #[tokio::test]
    async fn can_fail_job() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        tests_cfg::queue::sqlite_seed_data(&pool).await;
        let before_fail_job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA97").await;
        // Sleep one second so CURRENT_TIMESTAMP (second resolution) differs.
        std::thread::sleep(std::time::Duration::from_secs(1));
        assert!(fail_job(
            &pool,
            &before_fail_job.id,
            &crate::Error::string("some error")
        )
        .await
        .is_ok());
        let after_fail_job = get_job(&pool, "01JDM0X8EVAM823JZBGKYNBA97").await;
        assert_ne!(after_fail_job.updated_at, before_fail_job.updated_at);
        with_settings!({
            filters => reduction().iter().map(|&(pattern, replacement)| (pattern, replacement)),
        }, {
            assert_debug_snapshot!(after_fail_job);
        });
    }

    // Cancelling by name only affects queued jobs with that name.
    #[tokio::test]
    async fn can_cancel_job_by_name() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        tests_cfg::queue::sqlite_seed_data(&pool).await;
        let count_cancelled_jobs = get_all_jobs(&pool)
            .await
            .iter()
            .filter(|j| j.status == JobStatus::Cancelled)
            .count();
        assert_eq!(count_cancelled_jobs, 1);
        assert!(cancel_jobs_by_name(&pool, "UserAccountActivation")
            .await
            .is_ok());
        let count_cancelled_jobs = get_all_jobs(&pool)
            .await
            .iter()
            .filter(|j| j.status == JobStatus::Cancelled)
            .count();
        assert_eq!(count_cancelled_jobs, 2);
    }

    // `clear` empties both the queue table and the lock table.
    #[tokio::test]
    async fn can_clear() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        tests_cfg::queue::sqlite_seed_data(&pool).await;
        let job_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sqlt_loco_queue")
            .fetch_one(&pool)
            .await
            .unwrap();
        let lock_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sqlt_loco_queue_lock")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_ne!(job_count, 0);
        assert_ne!(lock_count, 0);
        assert!(clear(&pool).await.is_ok());
        let job_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sqlt_loco_queue")
            .fetch_one(&pool)
            .await
            .unwrap();
        let lock_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM sqlt_loco_queue_lock")
            .fetch_one(&pool)
            .await
            .unwrap();
        assert_eq!(job_count, 0);
        assert_eq!(lock_count, 0);
    }

    // `clear_by_status` removes only the listed statuses.
    #[tokio::test]
    async fn can_clear_by_status() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        tests_cfg::queue::sqlite_seed_data(&pool).await;
        let jobs = get_all_jobs(&pool).await;
        assert_eq!(jobs.len(), 14);
        assert_eq!(
            jobs.iter()
                .filter(|j| j.status == JobStatus::Completed)
                .count(),
            3
        );
        assert_eq!(
            jobs.iter()
                .filter(|j| j.status == JobStatus::Failed)
                .count(),
            2
        );
        assert!(
            clear_by_status(&pool, vec![JobStatus::Completed, JobStatus::Failed])
                .await
                .is_ok()
        );
        let jobs = get_all_jobs(&pool).await;
        assert_eq!(jobs.len(), 9);
        assert_eq!(
            jobs.iter()
                .filter(|j| j.status == JobStatus::Completed)
                .count(),
            0
        );
        assert_eq!(
            jobs.iter()
                .filter(|j| j.status == JobStatus::Failed)
                .count(),
            0
        );
    }

    // Age-based cleanup without a status filter.
    #[tokio::test]
    async fn can_clear_jobs_older_than() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        sqlx::query(
            r"INSERT INTO sqlt_loco_queue (id, name, task_data, status,run_at, created_at, updated_at) VALUES
            ('job1', 'Test Job 1', '{}', 'queued', CURRENT_TIMESTAMP,DATETIME('now', '-15 days'), CURRENT_TIMESTAMP),
            ('job2', 'Test Job 2', '{}', 'queued', CURRENT_TIMESTAMP, DATETIME('now', '-5 days'), CURRENT_TIMESTAMP),
            ('job3', 'Test Job 3', '{}', 'queued', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)",
        )
        .execute(&pool)
        .await
        .unwrap();
        assert_eq!(get_all_jobs(&pool).await.len(), 3);
        assert!(clear_jobs_older_than(&pool, 10, None).await.is_ok());
        assert_eq!(get_all_jobs(&pool).await.len(), 2);
    }

    // Age-based cleanup restricted to specific statuses.
    #[tokio::test]
    async fn can_clear_jobs_older_than_with_status() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        sqlx::query(
            r"INSERT INTO sqlt_loco_queue (id, name, task_data, status,run_at, created_at, updated_at) VALUES
            ('job1', 'Test Job 1', '{}', 'completed', CURRENT_TIMESTAMP,DATETIME('now', '-20 days'), CURRENT_TIMESTAMP),
            ('job2', 'Test Job 2', '{}', 'failed', CURRENT_TIMESTAMP,DATETIME('now', '-15 days'), CURRENT_TIMESTAMP),
            ('job3', 'Test Job 3', '{}', 'completed', CURRENT_TIMESTAMP, DATETIME('now', '-5 days'), CURRENT_TIMESTAMP),
            ('job4', 'Test Job 4', '{}', 'cancelled', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)",
        )
        .execute(&pool)
        .await
        .unwrap();
        assert_eq!(get_all_jobs(&pool).await.len(), 4);
        assert!(clear_jobs_older_than(
            &pool,
            10,
            Some(&vec![JobStatus::Cancelled, JobStatus::Completed])
        )
        .await
        .is_ok());
        assert_eq!(get_all_jobs(&pool).await.len(), 3);
    }

    // Status filtering in `get_jobs` (no age filter).
    #[tokio::test]
    async fn can_get_jobs() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        tests_cfg::queue::sqlite_seed_data(&pool).await;
        assert_eq!(
            get_jobs(&pool, Some(&vec![JobStatus::Failed]), None)
                .await
                .expect("get jobs")
                .len(),
            2
        );
        assert_eq!(
            get_jobs(
                &pool,
                Some(&vec![JobStatus::Failed, JobStatus::Completed]),
                None
            )
            .await
            .expect("get jobs")
            .len(),
            5
        );
        assert_eq!(
            get_jobs(&pool, None, None).await.expect("get jobs").len(),
            14
        );
    }

    // Combined status + age filtering in `get_jobs`.
    #[tokio::test]
    async fn can_get_jobs_with_age() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        sqlx::query(
            r"INSERT INTO sqlt_loco_queue (id, name, task_data, status,run_at, created_at, updated_at) VALUES
            ('job1', 'Test Job 1', '{}', 'completed', CURRENT_TIMESTAMP,DATETIME('now', '-20 days'), CURRENT_TIMESTAMP),
            ('job2', 'Test Job 2', '{}', 'failed', CURRENT_TIMESTAMP,DATETIME('now', '-15 days'), CURRENT_TIMESTAMP),
            ('job3', 'Test Job 3', '{}', 'completed', CURRENT_TIMESTAMP, DATETIME('now', '-5 days'), CURRENT_TIMESTAMP),
            ('job4', 'Test Job 4', '{}', 'cancelled', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)",
        )
        .execute(&pool)
        .await
        .unwrap();
        assert_eq!(
            get_jobs(
                &pool,
                Some(&vec![JobStatus::Failed, JobStatus::Completed]),
                Some(10)
            )
            .await
            .expect("get jobs")
            .len(),
            2
        );
    }

    // Only `processing` jobs older than the cutoff are requeued.
    #[tokio::test]
    async fn can_requeue() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        sqlx::query(
            r"INSERT INTO sqlt_loco_queue (id, name, task_data, status,run_at, created_at, updated_at) VALUES
            ('job1', 'Test Job 1', '{}', 'processing', CURRENT_TIMESTAMP,CURRENT_TIMESTAMP, DATETIME('now', '-20 minute')),
            ('job2', 'Test Job 2', '{}', 'processing', CURRENT_TIMESTAMP,CURRENT_TIMESTAMP, DATETIME('now', '-5 minute')),
            ('job3', 'Test Job 3', '{}', 'completed', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, DATETIME('now', '-5 minute')),
            ('job4', 'Test Job 4', '{}', 'queued', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP),
            ('job5', 'Test Job 5', '{}', 'processing', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)",
        )
        .execute(&pool)
        .await
        .unwrap();
        let jobs = get_all_jobs(&pool).await;
        let processing_job_count = jobs
            .iter()
            .filter(|job| job.status == JobStatus::Processing)
            .count();
        let queued_job_count = jobs
            .iter()
            .filter(|job| job.status == JobStatus::Queued)
            .count();
        assert_eq!(processing_job_count, 3);
        assert_eq!(queued_job_count, 1);
        assert!(requeue(&pool, &10).await.is_ok());
        let jobs = get_all_jobs(&pool).await;
        let processing_job_count = jobs
            .iter()
            .filter(|job| job.status == JobStatus::Processing)
            .count();
        let queued_job_count = jobs
            .iter()
            .filter(|job| job.status == JobStatus::Queued)
            .count();
        assert_eq!(processing_job_count, 2);
        assert_eq!(queued_job_count, 2);
    }

    // A panicking worker fails its job (with the panic message recorded)
    // instead of killing the polling task.
    #[tokio::test]
    async fn can_handle_worker_panic() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        let job_data = serde_json::json!(null);
        let job_id = enqueue(&pool, "PanicJob", job_data, Utc::now(), None, None)
            .await
            .expect("Failed to enqueue job");
        struct PanicWorker;
        #[async_trait::async_trait]
        impl BackgroundWorker<()> for PanicWorker {
            fn build(_ctx: &crate::app::AppContext) -> Self {
                Self
            }
            async fn perform(&self, _args: ()) -> crate::Result<()> {
                panic!("intentional panic for testing");
            }
        }
        let mut registry = JobRegistry::new();
        assert!(registry
            .register_worker("PanicJob".to_string(), PanicWorker)
            .is_ok());
        let job = get_job(&pool, &job_id).await;
        assert_eq!(job.status, JobStatus::Queued);
        let opts = RunOpts {
            num_workers: 1,
            poll_interval_sec: 1,
        };
        let token = CancellationToken::new();
        let handles = registry.run(&pool, &opts, &token, &[]);
        // Give the worker a moment to pick up and fail the job.
        sleep(Duration::from_secs(1)).await;
        for handle in handles {
            handle.abort();
        }
        let failed_job = get_job(&pool, &job_id).await;
        assert_eq!(failed_job.status, JobStatus::Failed);
        println!("Job data: {:?}", failed_job.data);
        let error_msg = failed_job
            .data
            .as_object()
            .and_then(|obj| obj.get("error"))
            .and_then(|v| v.as_str())
            .expect("Expected error message in job data");
        assert!(
            error_msg.contains("intentional panic for testing"),
            "Error message '{error_msg}' did not contain expected text"
        );
    }

    // Tag routing: untagged workers get untagged jobs; tagged workers only
    // get jobs carrying at least one matching tag.
    #[tokio::test]
    async fn can_dequeue_with_tags() {
        let tree_fs = tree_fs::TreeBuilder::default()
            .drop(true)
            .create()
            .expect("create temp folder");
        let pool = init(&tree_fs.root).await;
        assert!(initialize_database(&pool).await.is_ok());
        // Schedule in the past so every job is immediately runnable.
        let run_at = Utc::now() - chrono::Duration::minutes(5);
        let job_data = serde_json::json!({"user_id": 1});
        let email_tags = Some(vec!["email".to_string()]);
        let email_id = enqueue(
            &pool,
            "EmailNotification",
            job_data.clone(),
            run_at,
            None,
            email_tags,
        )
        .await
        .expect("Failed to enqueue email job");
        let sms_tags = Some(vec!["sms".to_string()]);
        let sms_id = enqueue(
            &pool,
            "SmsNotification",
            job_data.clone(),
            run_at,
            None,
            sms_tags,
        )
        .await
        .expect("Failed to enqueue sms job");
        let multi_tags = Some(vec!["email".to_string(), "priority".to_string()]);
        let multi_id = enqueue(
            &pool,
            "PriorityEmail",
            job_data.clone(),
            run_at,
            None,
            multi_tags,
        )
        .await
        .expect("Failed to enqueue multi-tag job");
        let no_tag_id = enqueue(
            &pool,
            "GenericNotification",
            job_data.clone(),
            run_at,
            None,
            None,
        )
        .await
        .expect("Failed to enqueue untagged job");
        let all_jobs = get_all_jobs(&pool).await;
        assert_eq!(all_jobs.len(), 4);
        let job = dequeue(&pool, &[]).await.expect("dequeue failed");
        assert!(job.is_some());
        let job = job.unwrap();
        assert_eq!(job.id, no_tag_id);
        assert!(job.tags.is_none());
        complete_job(&pool, &job.id, None)
            .await
            .expect("Failed to complete job");
        let job = dequeue(&pool, &["email".to_string()])
            .await
            .expect("dequeue failed");
        assert!(job.is_some());
        let job = job.unwrap();
        assert!(
            job.id == email_id || job.id == multi_id,
            "Expected either email job or multi-tag job"
        );
        assert!(job.tags.is_some());
        assert!(job.tags.as_ref().unwrap().contains(&"email".to_string()));
        complete_job(&pool, &job.id, None)
            .await
            .expect("Failed to complete job");
        let job = dequeue(&pool, &["email".to_string()])
            .await
            .expect("dequeue failed");
        assert!(job.is_some());
        let job = job.unwrap();
        assert!(
            job.id == email_id || job.id == multi_id,
            "Expected either email job or multi-tag job"
        );
        assert!(job.tags.is_some());
        assert!(job.tags.as_ref().unwrap().contains(&"email".to_string()));
        complete_job(&pool, &job.id, None)
            .await
            .expect("Failed to complete job");
        let job = dequeue(&pool, &["sms".to_string()])
            .await
            .expect("dequeue failed");
        assert!(job.is_some());
        let job = job.unwrap();
        assert_eq!(job.id, sms_id);
        assert!(job.tags.is_some());
        assert_eq!(job.tags.as_ref().unwrap(), &vec!["sms".to_string()]);
        complete_job(&pool, &job.id, None)
            .await
            .expect("Failed to complete job");
        let job = dequeue(&pool, &["email".to_string()])
            .await
            .expect("dequeue failed");
        assert!(job.is_none());
        let job = dequeue(&pool, &[]).await.expect("dequeue failed");
        assert!(job.is_none());
    }
}