use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use bytes::Bytes;
use slatedb::object_store::ObjectStore;
use slatedb::{Db, IsolationLevel};
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tracing::{debug, instrument, warn};
use ulid::Ulid;
use crate::error::{Error, Result};
use crate::job::{JobRecord, JobStatus};
use crate::reaper::{reap_expired, sweep_dead, sweep_done};
use crate::scheduler::{promote_due_jobs, schedule_loop};
use crate::stats::{CounterMergeOperator, QueueStats, read_stats, update_stats};
// Fallback values used by `QueueConfig::default()`.
const DEFAULT_MAX_ATTEMPTS: u32 = 3;
const DEFAULT_LEASE_DURATION: Duration = Duration::from_secs(30);
/// Result of a cancel call on a job id.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CancelOutcome {
    /// The job was still pending/scheduled and has been deleted outright.
    Removed,
    /// The job was claimed; cancellation was flagged for the running worker.
    Requested,
    /// No cancellable job with that id exists (unknown, done, or dead).
    NotFound,
}

impl CancelOutcome {
    /// True when the cancel call had any effect (`Removed` or `Requested`).
    pub fn acted(self) -> bool {
        match self {
            CancelOutcome::Removed | CancelOutcome::Requested => true,
            CancelOutcome::NotFound => false,
        }
    }
}
// Convenience priority presets. Lower numeric value is claimed first:
// `pending_key` zero-pads the priority so lexicographic key order equals
// claim order.
pub const PRIORITY_HIGH: u32 = 100;
pub const PRIORITY_NORMAL: u32 = 1_000;
pub const PRIORITY_LOW: u32 = 10_000;
/// Current wall-clock time as milliseconds since the Unix epoch.
/// Panics only if the system clock reads earlier than the epoch.
pub(crate) fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time before epoch");
    since_epoch.as_millis() as u64
}
/// Key for a job awaiting claim: `pending:{queue}:{priority:010}:{id}`.
/// The zero-padded priority makes lexicographic scan order equal claim
/// order: lower priority value first, then id (ULIDs sort by creation time).
pub(crate) fn pending_key(queue: &str, prio: u32, job_id: &str) -> String {
    format!("pending:{}:{:010}:{}", queue, prio, job_id)
}

/// Scan prefix covering every pending job on `queue`.
pub(crate) fn pending_prefix(queue: &str) -> String {
    format!("pending:{}:", queue)
}

/// Key for a leased job: `claimed:{expiry:020}:{queue}:{id}`. The expiry
/// comes first so the reaper can scan all queues' expired leases in order.
pub(crate) fn claimed_key(queue: &str, lease_expires_at: u64, job_id: &str) -> String {
    format!("claimed:{:020}:{}:{}", lease_expires_at, queue, job_id)
}

/// Key for a delayed job: `scheduled:{run_at:020}:{queue}:{id}`, ordered by
/// due time so the scheduler can promote due jobs with a prefix scan.
pub(crate) fn scheduled_key(queue: &str, run_at: u64, job_id: &str) -> String {
    format!("scheduled:{:020}:{}:{}", run_at, queue, job_id)
}

/// Key for a dead-lettered job record.
pub(crate) fn dead_key(queue: &str, job_id: &str) -> String {
    format!("dead:{}:{}", queue, job_id)
}

/// Key for a retained completed-job record.
pub(crate) fn done_key(queue: &str, job_id: &str) -> String {
    format!("done:{}:{}", queue, job_id)
}

/// Index entry mapping a job id to its current primary key.
pub(crate) fn job_index_key(job_id: &str) -> String {
    format!("jobindex:{}", job_id)
}

/// Index entry mapping a (queue, dedup key) pair to the live job id.
pub(crate) fn dedup_index_key(queue: &str, dedup: &str) -> String {
    format!("dedup:{}:{}", queue, dedup)
}
// Prefix that separates caller key/value data from the queue's own records.
const USR_PREFIX: &[u8] = b"usr:";
/// Largest value accepted for a single KV write in `enqueue_with_kv` (256 KiB).
pub const MAX_KV_VALUE_SIZE: usize = 256 * 1024;

/// Namespaces a caller-supplied key under `usr:` so user data can never
/// collide with internal `pending:`/`claimed:`/`stats:`/... keys.
fn user_scoped_key(key: &[u8]) -> Vec<u8> {
    [USR_PREFIX, key].concat()
}
/// Outcome of a dedup-aware enqueue.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EnqueueResult {
    /// A fresh job was written; carries the new job id.
    New(String),
    /// A live job with the same dedup key already exists; carries its id.
    AlreadyEnqueued(String),
}

impl EnqueueResult {
    /// The job id, regardless of which variant this is.
    pub fn id(&self) -> &str {
        match self {
            Self::New(id) => id,
            Self::AlreadyEnqueued(id) => id,
        }
    }
}
/// Exponential retry backoff: `base * 2^(attempts - 1)`, clamped to `max`.
/// attempts 1 -> base, 2 -> 2*base, 3 -> 4*base, ... A zero `base`
/// disables backoff entirely (immediate requeue). All arithmetic
/// saturates, so huge attempt counts simply clamp to `max`.
pub(crate) fn backoff_delay(attempts: u32, base: Duration, max: Duration) -> Duration {
    if base.is_zero() {
        return Duration::ZERO;
    }
    let exponent = attempts.saturating_sub(1);
    let factor = 2u32.saturating_pow(exponent);
    let delay = base.saturating_mul(factor);
    delay.min(max)
}
/// Per-queue tuning knobs; override per queue via `OpenOptions::queue_configs`.
#[derive(Debug, Clone)]
pub struct QueueConfig {
// Attempt count at which a nacked job is dead-lettered.
pub max_attempts: u32,
// How long a claimed job's lease lasts before the reaper may reclaim it.
pub lease_duration: Duration,
// Priority used when `EnqueueOptions::priority` is `None` (lower sorts first).
pub default_priority: u32,
// First retry delay; doubles per attempt (see `backoff_delay`). Zero disables backoff.
pub retry_backoff_base: Duration,
// Upper bound on the computed retry delay.
pub retry_backoff_max: Duration,
}
impl Default for QueueConfig {
// Defaults: 3 attempts, 30s lease, normal priority, 1s..300s exponential backoff.
fn default() -> Self {
Self {
max_attempts: DEFAULT_MAX_ATTEMPTS,
lease_duration: DEFAULT_LEASE_DURATION,
default_priority: PRIORITY_NORMAL,
retry_backoff_base: Duration::from_secs(1),
retry_backoff_max: Duration::from_secs(300),
}
}
}
/// Settings for `Queue::open_with_options`.
pub struct OpenOptions {
// How often the background reaper loop runs (lease expiry + sweeps).
pub reaper_interval: Duration,
// How often the background scheduler promotes due `scheduled:` jobs.
pub scheduler_interval: Duration,
// Config for queues with no entry in `queue_configs`.
pub default_queue_config: QueueConfig,
// Per-queue overrides, keyed by queue name.
pub queue_configs: HashMap<String, QueueConfig>,
// `Some` => acked jobs are archived under `done:` (reaper sweeps them after
// this retention); `None` => ack deletes the job record outright.
pub keep_done_jobs: Option<Duration>,
// Retention passed to the reaper for dead-lettered jobs; `None` keeps them forever.
pub dead_retention: Option<Duration>,
}
impl Default for OpenOptions {
// Defaults: reap every 5s, promote scheduled jobs every 1s, drop done jobs
// immediately on ack, keep dead jobs for 7 days.
fn default() -> Self {
Self {
reaper_interval: Duration::from_secs(5),
scheduler_interval: Duration::from_secs(1),
default_queue_config: QueueConfig::default(),
queue_configs: HashMap::new(),
keep_done_jobs: None,
dead_retention: Some(Duration::from_secs(7 * 24 * 3600)),
}
}
}
/// Optional per-job settings for `enqueue_with` / `enqueue_with_kv`.
#[derive(Debug, Clone, Default)]
pub struct EnqueueOptions {
// Overrides the queue's `max_attempts` when `Some`.
pub max_attempts: Option<u32>,
// Overrides the queue's `default_priority` when `Some` (lower runs first).
pub priority: Option<u32>,
// Earliest run time: a strictly-future instant enqueues as `Scheduled`;
// a past or unset value enqueues as `Pending` immediately.
pub run_at: Option<std::time::SystemTime>,
// Dedup key: while a job with this key is live on the same queue, another
// enqueue with the key returns the existing job's id instead of inserting.
pub dedup_key: Option<String>,
// Arbitrary caller metadata stored on the job record.
pub headers: HashMap<String, String>,
}
/// A persistent job queue backed by SlateDB, with background lease reaping
/// and scheduled-job promotion.
pub struct Queue {
db: Arc<Db>,
// Shutdown signal + handle for the background lease reaper (see `close`).
reaper_shutdown: watch::Sender<bool>,
reaper_handle: JoinHandle<()>,
// Shutdown signal + handle for the scheduled-job promoter (see `close`).
scheduler_shutdown: watch::Sender<bool>,
scheduler_handle: JoinHandle<()>,
default_queue_config: QueueConfig,
queue_configs: HashMap<String, QueueConfig>,
// `Some` => acked jobs are archived under `done:`; `None` => deleted on ack.
keep_done_jobs: Option<Duration>,
// Woken whenever a job becomes claimable (enqueue, retry, promote, reap).
job_available: Arc<tokio::sync::Notify>,
// Live cancellation tokens for currently-claimed jobs, keyed by job id.
claimed_tokens: Arc<std::sync::Mutex<HashMap<String, tokio_util::sync::CancellationToken>>>,
// Woken whenever a job reaches a terminal state (acked, dead, cancelled).
completion_notify: Arc<tokio::sync::Notify>,
}
/// Result of `Queue::wait_for_completion`.
#[derive(Debug)]
pub enum WaitOutcome {
// Job reached `Done`/`Dead` (boxed record), or its record disappeared
// after having been observed (`None`).
Completed(Option<Box<JobRecord>>),
// The timeout elapsed before the job finished.
TimedOut,
// No job with that id existed on the first lookup.
NotFound,
}
impl Queue {
/// Opens (or creates) a queue at `path` using all-default `OpenOptions`.
pub async fn open(object_store: Arc<dyn ObjectStore>, path: &str) -> Result<Self> {
    let opts = OpenOptions::default();
    Self::open_with_options(object_store, path, opts).await
}
/// Opens (or creates) the queue database at `path` and starts the two
/// background workers: the reaper loop (lease expiry plus done/dead
/// sweeping, ticked by `reaper_interval`) and the scheduler loop
/// (promotion of due `scheduled:` jobs, ticked by `scheduler_interval`).
pub async fn open_with_options(
object_store: Arc<dyn ObjectStore>,
path: &str,
opts: OpenOptions,
) -> Result<Self> {
// Stats counters are maintained through a merge operator — presumably so
// concurrent transactions can accumulate deltas without conflicting; see
// the stats module to confirm.
let db = Arc::new(
Db::builder(path, object_store)
.with_merge_operator(Arc::new(CounterMergeOperator))
.build()
.await?,
);
let job_available = Arc::new(tokio::sync::Notify::new());
let completion_notify = Arc::new(tokio::sync::Notify::new());
// Each background loop gets a watch receiver; `close` flips it to `true`.
let (reaper_shutdown, reaper_rx) = watch::channel(false);
let reaper_handle = tokio::spawn(crate::reaper::reap_loop(
db.clone(),
opts.reaper_interval,
opts.keep_done_jobs,
opts.dead_retention,
job_available.clone(),
completion_notify.clone(),
reaper_rx,
));
let (scheduler_shutdown, scheduler_rx) = watch::channel(false);
let scheduler_handle = tokio::spawn(schedule_loop(
db.clone(),
opts.scheduler_interval,
job_available.clone(),
scheduler_rx,
));
Ok(Self {
db,
reaper_shutdown,
reaper_handle,
scheduler_shutdown,
scheduler_handle,
default_queue_config: opts.default_queue_config,
queue_configs: opts.queue_configs,
keep_done_jobs: opts.keep_done_jobs,
job_available,
claimed_tokens: Arc::new(std::sync::Mutex::new(HashMap::new())),
completion_notify,
})
}
// Creates and registers a cancellation token for a freshly claimed job.
// If cancellation was already requested while the job sat in the backlog,
// the token is pre-cancelled so the worker observes it immediately.
fn install_cancel_token(&self, job: &mut JobRecord) {
let token = tokio_util::sync::CancellationToken::new();
if job.cancel_requested {
token.cancel();
}
self.claimed_tokens
.lock()
.expect("claimed_tokens mutex poisoned")
.insert(job.id.clone(), token.clone());
job.cancel_token = Some(token);
}
// Drops the cancellation token registered for `job_id`, if any.
fn clear_cancel_token(&self, job_id: &str) {
    let mut tokens = self
        .claimed_tokens
        .lock()
        .expect("claimed_tokens mutex poisoned");
    tokens.remove(job_id);
}
// Per-queue config, falling back to the default when no override exists.
fn queue_config(&self, queue: &str) -> &QueueConfig {
    match self.queue_configs.get(queue) {
        Some(cfg) => cfg,
        None => &self.default_queue_config,
    }
}
/// Lease duration that `claim_next` will use for `queue`.
pub fn queue_lease_duration(&self, queue: &str) -> Duration {
    let cfg = self.queue_config(queue);
    cfg.lease_duration
}
/// Enqueues `payload` on `queue` with all-default options; returns the job id.
pub async fn enqueue(&self, queue: &str, payload: Vec<u8>) -> Result<String> {
    let opts = EnqueueOptions::default();
    self.enqueue_with(queue, payload, opts).await
}
/// Enqueues a job with explicit options; returns the job id.
///
/// A strictly-future `opts.run_at` writes the job under `scheduled:` (the
/// scheduler promotes it when due); otherwise it goes straight to
/// `pending:`. With `opts.dedup_key` set, an existing live job with the
/// same key wins and its id is returned (see `write_unique`).
#[instrument(skip(self, payload), fields(queue, job_id))]
pub async fn enqueue_with(
&self,
queue: &str,
payload: Vec<u8>,
opts: EnqueueOptions,
) -> Result<String> {
let cfg = self.queue_config(queue);
let max_attempts = opts.max_attempts.unwrap_or(cfg.max_attempts);
let priority = opts.priority.unwrap_or(cfg.default_priority);
// Only a strictly-future run_at schedules the job; a past instant (or a
// pre-epoch time, which collapses to 0) degrades to immediate enqueue.
let run_at = opts.run_at.and_then(|when| {
let ms = when
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
(ms > now_ms()).then_some(ms)
});
let id = Ulid::new().to_string();
let (status, key) = match run_at {
Some(ms) => (JobStatus::Scheduled, scheduled_key(queue, ms, &id)),
None => (JobStatus::Pending, pending_key(queue, priority, &id)),
};
let job = JobRecord {
id,
queue: queue.to_string(),
payload,
headers: opts.headers,
status,
attempts: 0,
max_attempts,
enqueued_at: now_ms(),
claimed_at: None,
lease_expires_at: None,
run_at,
priority,
last_error: None,
dedup_key: opts.dedup_key.clone(),
completed_at: None,
failed_at: None,
cancel_requested: false,
cancel_token: None,
};
// Dedup'd jobs need the check-then-insert loop in `write_unique`.
match opts.dedup_key {
Some(dk) => self.write_unique(job, key, dk).await,
None => self.write_new(job, key).await,
}
}
async fn write_new(&self, job: JobRecord, key: String) -> Result<String> {
let value = rmp_serde::to_vec_named(&job)?;
let JobRecord {
id,
queue,
status,
priority,
run_at,
..
} = job;
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
txn.put(key.as_bytes(), &value)?;
txn.put(job_index_key(&id).as_bytes(), key.as_bytes())?;
update_stats(&txn, &queue, &[(status, 1)])?;
txn.commit().await?;
if matches!(status, JobStatus::Pending) {
self.job_available.notify_waiters();
}
debug!(queue = %queue, job_id = %id, priority, ?run_at, "job enqueued");
Ok(id)
}
/// Enqueues a job and atomically applies caller key/value writes (stored
/// under the `usr:` namespace) in the same transaction, so the job and its
/// side data become visible together.
///
/// Values larger than `MAX_KV_VALUE_SIZE` are rejected up front. With a
/// `dedup_key`, an existing live job short-circuits the whole call —
/// including the kv writes — returning `AlreadyEnqueued`.
#[instrument(skip(self, payload, kv_writes), fields(queue, job_id))]
pub async fn enqueue_with_kv(
&self,
queue: &str,
payload: Vec<u8>,
opts: EnqueueOptions,
kv_writes: HashMap<Vec<u8>, Vec<u8>>,
) -> Result<EnqueueResult> {
// Validate sizes before opening a transaction.
for value in kv_writes.values() {
if value.len() > MAX_KV_VALUE_SIZE {
return Err(Error::KvValueTooLarge {
size: value.len(),
max: MAX_KV_VALUE_SIZE,
});
}
}
let cfg = self.queue_config(queue);
let max_attempts = opts.max_attempts.unwrap_or(cfg.max_attempts);
let priority = opts.priority.unwrap_or(cfg.default_priority);
// Only a strictly-future run_at schedules the job (same rule as `enqueue_with`).
let run_at = opts.run_at.and_then(|when| {
let ms = when
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
(ms > now_ms()).then_some(ms)
});
let id = Ulid::new().to_string();
let (status, key) = match run_at {
Some(ms) => (JobStatus::Scheduled, scheduled_key(queue, ms, &id)),
None => (JobStatus::Pending, pending_key(queue, priority, &id)),
};
let job = JobRecord {
id: id.clone(),
queue: queue.to_string(),
payload,
headers: opts.headers,
status,
attempts: 0,
max_attempts,
enqueued_at: now_ms(),
claimed_at: None,
lease_expires_at: None,
run_at,
priority,
last_error: None,
dedup_key: opts.dedup_key.clone(),
completed_at: None,
failed_at: None,
cancel_requested: false,
cancel_token: None,
};
let value = rmp_serde::to_vec_named(&job)?;
let dkey = opts.dedup_key.as_ref().map(|dk| dedup_index_key(queue, dk));
// Retry on commit conflicts; the dedup check must be re-evaluated each time.
loop {
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
if let Some(ref dkey) = dkey {
// An existing dedup entry wins: report the already-live job's id.
if let Some(bytes) = txn.get(dkey.as_bytes()).await? {
txn.rollback();
let existing =
String::from_utf8(bytes.to_vec()).map_err(|_| Error::InvalidState)?;
return Ok(EnqueueResult::AlreadyEnqueued(existing));
}
}
txn.put(key.as_bytes(), &value)?;
txn.put(job_index_key(&id).as_bytes(), key.as_bytes())?;
if let Some(ref dkey) = dkey {
txn.put(dkey.as_bytes(), id.as_bytes())?;
}
update_stats(&txn, queue, &[(status, 1)])?;
// Caller data rides in the same transaction, namespaced under `usr:`.
for (k, v) in &kv_writes {
txn.put(user_scoped_key(k), v)?;
}
match txn.commit().await {
Ok(_) => {
if matches!(status, JobStatus::Pending) {
self.job_available.notify_waiters();
}
debug!(queue = %queue, job_id = %id, "job enqueued with kv");
return Ok(EnqueueResult::New(id));
}
Err(e) if e.kind() == slatedb::ErrorKind::Transaction => continue,
Err(e) => return Err(e.into()),
}
}
}
/// Reads a caller key previously written via `enqueue_with_kv` (stored
/// under the `usr:` namespace).
pub async fn kv_get(&self, key: &[u8]) -> Result<Option<Bytes>> {
    let scoped = user_scoped_key(key);
    Ok(self.db.get(scoped).await?)
}
/// Deletes a caller key from the `usr:` namespace.
pub async fn kv_delete(&self, key: &[u8]) -> Result<()> {
    let scoped = user_scoped_key(key);
    self.db.delete(scoped).await?;
    Ok(())
}
// Inserts a job guarded by a `dedup:{queue}:{key}` index entry. If the
// entry already points at a live job, nothing is written and that job's id
// is returned instead. The check-then-insert is kept atomic by retrying
// the whole transaction on commit conflicts.
async fn write_unique(&self, job: JobRecord, key: String, dedup_key: String) -> Result<String> {
let dkey = dedup_index_key(&job.queue, &dedup_key);
let value = rmp_serde::to_vec_named(&job)?;
let JobRecord {
id, queue, status, ..
} = job;
loop {
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
// Existing dedup entry wins: return the already-enqueued job's id.
if let Some(bytes) = txn.get(dkey.as_bytes()).await? {
txn.rollback();
return String::from_utf8(bytes.to_vec()).map_err(|_| Error::InvalidState);
}
txn.put(key.as_bytes(), &value)?;
txn.put(job_index_key(&id).as_bytes(), key.as_bytes())?;
txn.put(dkey.as_bytes(), id.as_bytes())?;
update_stats(&txn, &queue, &[(status, 1)])?;
match txn.commit().await {
Ok(_) => {
if matches!(status, JobStatus::Pending) {
self.job_available.notify_waiters();
}
debug!(queue = %queue, job_id = %id, dedup_key, "unique job enqueued");
return Ok(id);
}
// Conflict: a concurrent writer touched our keys; re-check the dedup entry.
Err(e) if e.kind() == slatedb::ErrorKind::Transaction => continue,
Err(e) => return Err(e.into()),
}
}
}
/// Claims the next job on `queue` using the queue's configured lease duration.
pub async fn claim_next(&self, queue: &str) -> Result<Option<JobRecord>> {
    let lease = self.queue_config(queue).lease_duration;
    self.claim(queue, lease).await
}
/// Blocks until a job-available notification arrives or `max_wait`
/// elapses, whichever comes first. Gives no indication of which happened;
/// callers are expected to re-poll with `claim`.
pub async fn wait_for_jobs(&self, max_wait: Duration) {
    // timeout(..) resolves on either the notification or the deadline,
    // exactly like the original pin + select! pair.
    let _ = tokio::time::timeout(max_wait, self.job_available.notified()).await;
}
/// Claims a job, waiting up to `max_wait` for one to become available.
///
/// Interest in the job-available notification is registered *before* each
/// claim attempt so a job enqueued in between cannot be missed.
///
/// Fix: keep retrying after each wake-up until the deadline passes. A
/// wake-up does not guarantee a job — another worker may win the race —
/// and the previous version gave up after a single post-wake-up attempt,
/// returning `None` with wait budget still remaining.
pub async fn claim_with_wait(
    &self,
    queue: &str,
    lease_duration: Duration,
    max_wait: Duration,
) -> Result<Option<JobRecord>> {
    let deadline = tokio::time::Instant::now() + max_wait;
    loop {
        let notified = self.job_available.notified();
        tokio::pin!(notified);
        // Register for wake-ups before polling the queue.
        notified.as_mut().enable();
        if let Some(job) = self.claim(queue, lease_duration).await? {
            return Ok(Some(job));
        }
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            return Ok(None);
        }
        tokio::select! {
            _ = &mut notified => {}
            _ = tokio::time::sleep(remaining) => return Ok(None),
        }
    }
}
/// Claims the lowest-sorting pending job on `queue` (best priority first,
/// then oldest id), moving it from `pending:` to `claimed:` with a lease
/// of `lease_duration`. Returns `None` when no pending job exists.
#[instrument(skip(self), fields(queue))]
pub async fn claim(&self, queue: &str, lease_duration: Duration) -> Result<Option<JobRecord>> {
let prefix = pending_prefix(queue);
// Retry loop: a commit conflict means another worker claimed the same
// job first, so re-scan and take the next candidate.
loop {
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
let mut iter = txn.scan_prefix(prefix.as_bytes()).await?;
// First key in prefix order is the next job to run.
let kv = match iter.next().await? {
Some(kv) => kv,
None => return Ok(None),
};
// Done reading; release the iterator before writing through the txn.
drop(iter);
let pending_key_bytes = kv.key.clone();
let mut job: JobRecord = rmp_serde::from_slice(&kv.value)?;
let now = now_ms();
let lease_expires_at = now + lease_duration.as_millis() as u64;
job.status = JobStatus::Claimed;
job.claimed_at = Some(now);
job.lease_expires_at = Some(lease_expires_at);
job.attempts += 1;
// The dedup slot is released at claim time: from here on, a duplicate
// enqueue creates a fresh job rather than deduplicating against this one.
let dedup_key_to_release = job.dedup_key.take();
let claimed = claimed_key(&job.queue, lease_expires_at, &job.id);
let value = rmp_serde::to_vec_named(&job)?;
txn.delete(&pending_key_bytes)?;
txn.put(claimed.as_bytes(), &value)?;
txn.put(job_index_key(&job.id).as_bytes(), claimed.as_bytes())?;
if let Some(dk) = dedup_key_to_release.as_deref() {
txn.delete(dedup_index_key(&job.queue, dk).as_bytes())?;
}
update_stats(
&txn,
queue,
&[(JobStatus::Pending, -1), (JobStatus::Claimed, 1)],
)?;
// Token is registered optimistically and rolled back if the commit fails.
self.install_cancel_token(&mut job);
match txn.commit().await {
Ok(_) => {
debug!(queue = queue, job_id = %job.id, attempt = job.attempts, "job claimed");
return Ok(Some(job));
}
Err(e) if e.kind() == slatedb::ErrorKind::Transaction => {
warn!(queue = queue, "claim transaction conflict, retrying");
self.clear_cancel_token(&job.id);
continue;
}
Err(e) => {
self.clear_cancel_token(&job.id);
return Err(e.into());
}
}
}
}
/// Acknowledges successful completion of a claimed job.
///
/// Deletes the `claimed:` record; with `keep_done_jobs` set, the job is
/// archived under `done:` (status `Done`, `completed_at` stamped),
/// otherwise its index entry is deleted too. Wakes `wait_for_completion`
/// waiters afterwards.
#[instrument(skip(self, job), fields(queue = %job.queue, job_id = %job.id))]
pub async fn ack(&self, job: &JobRecord) -> Result<()> {
// The claimed key embeds the lease expiry; a record without one was
// never claimed (or the caller's copy is stale).
let lease_expires_at = job.lease_expires_at.ok_or(Error::InvalidState)?;
let claimed = claimed_key(&job.queue, lease_expires_at, &job.id);
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
txn.delete(claimed.as_bytes())?;
if self.keep_done_jobs.is_some() {
let mut done_job = job.clone();
done_job.status = JobStatus::Done;
done_job.completed_at = Some(now_ms());
let value = rmp_serde::to_vec_named(&done_job)?;
let done = done_key(&job.queue, &job.id);
txn.put(done.as_bytes(), &value)?;
txn.put(job_index_key(&job.id).as_bytes(), done.as_bytes())?;
} else {
txn.delete(job_index_key(&job.id).as_bytes())?;
}
// The Done counter is bumped even when the record itself is not retained.
update_stats(
&txn,
&job.queue,
&[(JobStatus::Claimed, -1), (JobStatus::Done, 1)],
)?;
txn.commit().await?;
self.clear_cancel_token(&job.id);
self.completion_notify.notify_waiters();
debug!(queue = %job.queue, job_id = %job.id, "job acked");
Ok(())
}
/// Reports a failed attempt for a claimed job.
///
/// Exhausted jobs (`attempts >= max_attempts`) move to `dead:`. Otherwise
/// the job is retried: immediately back onto `pending:` when the queue's
/// backoff is zero, or parked under `scheduled:` after an exponential
/// backoff (see `backoff_delay`). `error` is stored as `last_error`.
///
/// Fix: the dead-letter branch now clears `claimed_at`/`lease_expires_at`
/// before persisting, so the stored dead record matches the one written by
/// `dead_letter` (which clears both fields) instead of retaining stale
/// lease data.
#[instrument(skip(self, job), fields(queue = %job.queue, job_id = %job.id))]
pub async fn nack(&self, mut job: JobRecord, error: &str) -> Result<()> {
    // The claimed key embeds the lease expiry; capture it before clearing.
    let lease_expires_at = job.lease_expires_at.ok_or(Error::InvalidState)?;
    let claimed = claimed_key(&job.queue, lease_expires_at, &job.id);
    job.last_error = Some(error.to_string());
    let txn = self.db.begin(IsolationLevel::Snapshot).await?;
    txn.delete(claimed.as_bytes())?;
    if job.attempts >= job.max_attempts {
        job.status = JobStatus::Dead;
        job.failed_at = Some(now_ms());
        // Consistent with `dead_letter`: a dead record carries no lease.
        job.claimed_at = None;
        job.lease_expires_at = None;
        let dead = dead_key(&job.queue, &job.id);
        let value = rmp_serde::to_vec_named(&job)?;
        txn.put(dead.as_bytes(), &value)?;
        txn.put(job_index_key(&job.id).as_bytes(), dead.as_bytes())?;
        update_stats(
            &txn,
            &job.queue,
            &[(JobStatus::Claimed, -1), (JobStatus::Dead, 1)],
        )?;
        warn!(
            queue = %job.queue,
            job_id = %job.id,
            attempts = job.attempts,
            "job dead-lettered"
        );
    } else {
        let cfg = self.queue_config(&job.queue);
        let backoff =
            backoff_delay(job.attempts, cfg.retry_backoff_base, cfg.retry_backoff_max);
        job.claimed_at = None;
        job.lease_expires_at = None;
        if backoff.is_zero() {
            // No backoff configured: straight back onto the pending queue.
            job.status = JobStatus::Pending;
            let priority = job.priority;
            let pending = pending_key(&job.queue, priority, &job.id);
            let value = rmp_serde::to_vec_named(&job)?;
            txn.put(pending.as_bytes(), &value)?;
            txn.put(job_index_key(&job.id).as_bytes(), pending.as_bytes())?;
            update_stats(
                &txn,
                &job.queue,
                &[(JobStatus::Pending, 1), (JobStatus::Claimed, -1)],
            )?;
            debug!(
                queue = %job.queue,
                job_id = %job.id,
                attempts = job.attempts,
                "job re-queued"
            );
        } else {
            // Park under `scheduled:`; the scheduler promotes it when due.
            let run_at = now_ms() + backoff.as_millis() as u64;
            job.status = JobStatus::Scheduled;
            job.run_at = Some(run_at);
            let scheduled = scheduled_key(&job.queue, run_at, &job.id);
            let value = rmp_serde::to_vec_named(&job)?;
            txn.put(scheduled.as_bytes(), &value)?;
            txn.put(job_index_key(&job.id).as_bytes(), scheduled.as_bytes())?;
            update_stats(
                &txn,
                &job.queue,
                &[(JobStatus::Claimed, -1), (JobStatus::Scheduled, 1)],
            )?;
            debug!(
                queue = %job.queue,
                job_id = %job.id,
                attempts = job.attempts,
                backoff_ms = backoff.as_millis() as u64,
                "job scheduled for retry"
            );
        }
    }
    let immediate_retry = matches!(job.status, JobStatus::Pending);
    let became_dead = matches!(job.status, JobStatus::Dead);
    let job_id = job.id.clone();
    txn.commit().await?;
    self.clear_cancel_token(&job_id);
    if immediate_retry {
        self.job_available.notify_waiters();
    }
    // Dead-lettering is terminal; wake `wait_for_completion` waiters.
    if became_dead {
        self.completion_notify.notify_waiters();
    }
    Ok(())
}
/// Moves a claimed job straight to `dead:` regardless of remaining
/// attempts (permanent failure); `reason` is stored as `last_error`.
/// Wakes `wait_for_completion` waiters since death is terminal.
#[instrument(skip(self, job), fields(queue = %job.queue, job_id = %job.id))]
pub async fn dead_letter(&self, mut job: JobRecord, reason: &str) -> Result<()> {
// Capture the claimed key before clearing the lease fields below.
let lease_expires_at = job.lease_expires_at.ok_or(Error::InvalidState)?;
let claimed = claimed_key(&job.queue, lease_expires_at, &job.id);
job.last_error = Some(reason.to_string());
job.status = JobStatus::Dead;
job.failed_at = Some(now_ms());
job.claimed_at = None;
job.lease_expires_at = None;
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
txn.delete(claimed.as_bytes())?;
let dead = dead_key(&job.queue, &job.id);
let value = rmp_serde::to_vec_named(&job)?;
txn.put(dead.as_bytes(), &value)?;
txn.put(job_index_key(&job.id).as_bytes(), dead.as_bytes())?;
update_stats(
&txn,
&job.queue,
&[(JobStatus::Claimed, -1), (JobStatus::Dead, 1)],
)?;
txn.commit().await?;
self.clear_cancel_token(&job.id);
self.completion_notify.notify_waiters();
warn!(
queue = %job.queue,
job_id = %job.id,
attempts = job.attempts,
"job dead-lettered (permanent failure)"
);
Ok(())
}
/// Reads the persisted lifecycle counters for `queue`.
pub async fn stats(&self, queue: &str) -> Result<QueueStats> {
read_stats(&self.db, queue).await
}
/// Lists every queue name that has recorded stats counters, deduplicated,
/// in the order the `stats:` keys are first encountered.
pub async fn list_queues(&self) -> Result<Vec<String>> {
    let mut seen = std::collections::HashSet::new();
    let mut queues = Vec::new();
    let mut iter = self.db.scan_prefix(b"stats:").await?;
    while let Some(kv) = iter.next().await? {
        // Non-UTF-8 keys cannot be stats keys; skip them.
        if let Ok(key_str) = std::str::from_utf8(&kv.key) {
            let trimmed = key_str.strip_prefix("stats:").unwrap_or(key_str);
            // Keys look like `stats:<queue>:<counter>`; drop the counter part.
            if let Some(split_at) = trimmed.rfind(':') {
                let queue = trimmed[..split_at].to_string();
                if seen.insert(queue.clone()) {
                    queues.push(queue);
                }
            }
        }
    }
    Ok(queues)
}
/// Pages through dead-lettered jobs on `queue` in key (id) order.
///
/// `after` is an exclusive cursor: only jobs whose id sorts strictly after
/// it are returned. At most `limit` records are returned; pass the last
/// returned id as `after` to fetch the next page.
pub async fn dead_jobs(
&self,
queue: &str,
after: Option<&str>,
limit: usize,
) -> Result<Vec<JobRecord>> {
if limit == 0 {
return Ok(Vec::new());
}
let prefix = format!("dead:{}:", queue);
let mut jobs = Vec::with_capacity(limit);
let mut iter = self.db.scan_prefix(prefix.as_bytes()).await?;
while let Some(kv) = iter.next().await? {
if let Some(after_id) = after {
// Keys look like `dead:{queue}:{id}`; the id is the final segment.
let key_str = std::str::from_utf8(&kv.key).unwrap_or("");
let id = key_str.rsplit(':').next().unwrap_or("");
if id <= after_id {
continue;
}
}
let job: JobRecord = rmp_serde::from_slice(&kv.value)?;
jobs.push(job);
if jobs.len() >= limit {
break;
}
}
Ok(jobs)
}
/// Moves a dead-lettered job back onto `pending:` with its attempt counter
/// and failure fields reset, then wakes claim waiters. Errors with
/// `InvalidState` if the supplied record is not actually `Dead`.
#[instrument(skip(self, job), fields(queue = %job.queue, job_id = %job.id))]
pub async fn requeue_dead_job(&self, mut job: JobRecord) -> Result<()> {
if job.status != JobStatus::Dead {
return Err(Error::InvalidState);
}
let dead = dead_key(&job.queue, &job.id);
let priority = job.priority;
// Fresh start: the job will get a full `max_attempts` budget again.
job.status = JobStatus::Pending;
job.attempts = 0;
job.last_error = None;
job.claimed_at = None;
job.lease_expires_at = None;
job.failed_at = None;
job.cancel_requested = false;
let pending = pending_key(&job.queue, priority, &job.id);
let value = rmp_serde::to_vec_named(&job)?;
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
txn.delete(dead.as_bytes())?;
txn.put(pending.as_bytes(), &value)?;
txn.put(job_index_key(&job.id).as_bytes(), pending.as_bytes())?;
update_stats(
&txn,
&job.queue,
&[(JobStatus::Pending, 1), (JobStatus::Dead, -1)],
)?;
txn.commit().await?;
self.job_available.notify_waiters();
debug!(queue = %job.queue, job_id = %job.id, "dead job re-queued");
Ok(())
}
#[instrument(skip(self, job), fields(queue = %job.queue, job_id = %job.id))]
pub async fn renew_lease(&self, job: &mut JobRecord, extension: Duration) -> Result<()> {
let old_expiry = job.lease_expires_at.ok_or(Error::InvalidState)?;
let old_claimed = claimed_key(&job.queue, old_expiry, &job.id);
let new_expiry = now_ms() + extension.as_millis() as u64;
job.lease_expires_at = Some(new_expiry);
let new_claimed = claimed_key(&job.queue, new_expiry, &job.id);
let value = rmp_serde::to_vec_named(job)?;
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
txn.delete(old_claimed.as_bytes())?;
txn.put(new_claimed.as_bytes(), &value)?;
txn.put(job_index_key(&job.id).as_bytes(), new_claimed.as_bytes())?;
txn.commit().await?;
debug!(queue = %job.queue, job_id = %job.id, new_expiry, "lease renewed");
Ok(())
}
/// Waits until the job reaches a terminal state (`Done`/`Dead`), its
/// record disappears, or `timeout` elapses.
///
/// The completion notification is enabled before each lookup so a state
/// change landing between the lookup and the wait is not missed.
pub async fn wait_for_completion(&self, id: &str, timeout: Duration) -> Result<WaitOutcome> {
let deadline = tokio::time::Instant::now() + timeout;
let mut first = true;
loop {
let notified = self.completion_notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
match self.get_job(id).await? {
// Unknown id on the very first check: report NotFound rather than
// treating it as "completed and already swept".
None if first => return Ok(WaitOutcome::NotFound),
// Record vanished after we had seen it: completed and cleaned up.
None => return Ok(WaitOutcome::Completed(None)),
Some(job) if matches!(job.status, JobStatus::Done | JobStatus::Dead) => {
return Ok(WaitOutcome::Completed(Some(Box::new(job))));
}
// Still in flight; fall through and wait for a notification.
Some(_) => {}
}
first = false;
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
return Ok(WaitOutcome::TimedOut);
}
tokio::select! {
_ = &mut notified => {}
_ = tokio::time::sleep(remaining) => return Ok(WaitOutcome::TimedOut),
}
}
}
pub async fn get_job(&self, id: &str) -> Result<Option<JobRecord>> {
let index_key = job_index_key(id);
let current_key = match self.db.get(index_key.as_bytes()).await? {
None => return Ok(None),
Some(bytes) => match String::from_utf8(bytes.to_vec()) {
Ok(s) => s,
Err(_) => return Err(Error::InvalidState),
},
};
match self.db.get(current_key.as_bytes()).await? {
None => Ok(None),
Some(bytes) => Ok(Some(rmp_serde::from_slice(&bytes)?)),
}
}
/// Cancels a job by id.
///
/// * Pending/Scheduled: the record plus its index and dedup entries are
///   deleted outright (`Removed`).
/// * Claimed: `cancel_requested` is persisted and the worker's live
///   cancellation token is triggered (`Requested`); the worker decides how
///   to finish the job.
/// * Done/Dead/unknown id: nothing to cancel (`NotFound`).
///
/// The whole read-modify-write retries on transaction conflicts.
pub async fn cancel(&self, id: &str) -> Result<CancelOutcome> {
loop {
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
let index_key = job_index_key(id);
// Resolve the job's current primary key through the index.
let current_key = match txn.get(index_key.as_bytes()).await? {
None => {
txn.rollback();
return Ok(CancelOutcome::NotFound);
}
Some(bytes) => match String::from_utf8(bytes.to_vec()) {
Ok(s) => s,
Err(_) => {
txn.rollback();
return Err(Error::InvalidState);
}
},
};
let mut job: JobRecord = match txn.get(current_key.as_bytes()).await? {
None => {
txn.rollback();
return Ok(CancelOutcome::NotFound);
}
Some(bytes) => rmp_serde::from_slice(&bytes)?,
};
let (msg, outcome, remove_from_registry) = match job.status {
JobStatus::Pending | JobStatus::Scheduled => {
// Backlog job: delete it and every entry pointing at it.
let is_scheduled = matches!(job.status, JobStatus::Scheduled);
txn.delete(current_key.as_bytes())?;
txn.delete(index_key.as_bytes())?;
if let Some(ref dk) = job.dedup_key {
txn.delete(dedup_index_key(&job.queue, dk).as_bytes())?;
}
if is_scheduled {
update_stats(&txn, &job.queue, &[(JobStatus::Scheduled, -1)])?;
} else {
update_stats(&txn, &job.queue, &[(JobStatus::Pending, -1)])?;
}
(
"pending/scheduled job cancelled",
CancelOutcome::Removed,
true,
)
}
JobStatus::Claimed => {
// Already flagged: nothing to persist, just re-fire the token.
if job.cancel_requested {
txn.rollback();
if let Some(token) = self
.claimed_tokens
.lock()
.expect("claimed_tokens mutex poisoned")
.get(id)
.cloned()
{
token.cancel();
}
debug!(job_id = %id, "cancel re-requested on claimed job");
return Ok(CancelOutcome::Requested);
}
job.cancel_requested = true;
let value = rmp_serde::to_vec_named(&job)?;
txn.put(current_key.as_bytes(), &value)?;
(
"claimed job cancellation requested",
CancelOutcome::Requested,
false,
)
}
// Terminal states cannot be cancelled.
JobStatus::Done | JobStatus::Dead => {
txn.rollback();
return Ok(CancelOutcome::NotFound);
}
};
match txn.commit().await {
Ok(_) => {
// Removed jobs drop their token entirely; claimed jobs keep the
// entry (the worker clears it at ack/nack) but get it cancelled.
let token = {
let mut guard = self
.claimed_tokens
.lock()
.expect("claimed_tokens mutex poisoned");
if remove_from_registry {
guard.remove(id)
} else {
guard.get(id).cloned()
}
};
if let Some(token) = token {
token.cancel();
}
if matches!(outcome, CancelOutcome::Removed) {
self.completion_notify.notify_waiters();
}
debug!(job_id = %id, "{msg}");
return Ok(outcome);
}
Err(e) if e.kind() == slatedb::ErrorKind::Transaction => continue,
Err(e) => return Err(e.into()),
}
}
}
/// Enqueues many payloads on `queue` in a single transaction, using the
/// queue's default priority and attempt budget (no per-job options, no
/// dedup). Ids come from a monotonic ULID generator so claim order within
/// the batch matches input order. Returns the ids in input order.
pub async fn enqueue_batch(&self, queue: &str, payloads: Vec<Vec<u8>>) -> Result<Vec<String>> {
if payloads.is_empty() {
return Ok(Vec::new());
}
let cfg = self.queue_config(queue);
let max_attempts = cfg.max_attempts;
let priority = cfg.default_priority;
let now = now_ms();
let mut ids = Vec::with_capacity(payloads.len());
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
// Monotonic generator keeps ids strictly increasing within the same ms.
let mut id_gen = ulid::Generator::new();
for payload in payloads {
let id = id_gen
.generate()
.expect("monotonic ULID generator overflowed within one ms")
.to_string();
let job = JobRecord {
id: id.clone(),
queue: queue.to_string(),
payload,
headers: HashMap::new(),
status: JobStatus::Pending,
attempts: 0,
max_attempts,
enqueued_at: now,
claimed_at: None,
lease_expires_at: None,
run_at: None,
priority,
last_error: None,
dedup_key: None,
completed_at: None,
failed_at: None,
cancel_requested: false,
cancel_token: None,
};
let key = pending_key(queue, priority, &id);
let value = rmp_serde::to_vec_named(&job)?;
txn.put(key.as_bytes(), &value)?;
txn.put(job_index_key(&id).as_bytes(), key.as_bytes())?;
ids.push(id);
}
// Single stats bump for the whole batch.
update_stats(&txn, queue, &[(JobStatus::Pending, ids.len() as i64)])?;
txn.commit().await?;
self.job_available.notify_waiters();
debug!(queue = queue, count = ids.len(), "batch enqueued");
Ok(ids)
}
/// Runs one reaper pass immediately (normally done by the background
/// loop); wakes claim waiters if any expired lease was processed.
pub async fn reap_now(&self) -> Result<()> {
    if reap_expired(&self.db, &self.completion_notify).await? > 0 {
        self.job_available.notify_waiters();
    }
    Ok(())
}
/// Promotes due `scheduled:` jobs immediately (normally done by the
/// background scheduler); wakes claim waiters if anything was promoted.
pub async fn promote_scheduled_now(&self) -> Result<()> {
    if promote_due_jobs(&self.db).await? > 0 {
        self.job_available.notify_waiters();
    }
    Ok(())
}
/// Runs the done-record sweeper once with the given retention
/// (see `reaper::sweep_done`).
pub async fn sweep_done_now(&self, retention: Duration) -> Result<()> {
sweep_done(&self.db, retention).await
}
/// Runs the dead-record sweeper once with the given retention
/// (see `reaper::sweep_dead`).
pub async fn sweep_dead_now(&self, retention: Duration) -> Result<()> {
sweep_dead(&self.db, retention).await
}
/// Gracefully shuts down: signals both background loops, waits for them
/// to exit, then closes the underlying database.
pub async fn close(self) -> Result<()> {
// Send errors are ignored: a dropped receiver means the loop already exited.
let _ = self.reaper_shutdown.send(true);
let _ = self.reaper_handle.await;
let _ = self.scheduler_shutdown.send(true);
let _ = self.scheduler_handle.await;
self.db.close().await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use slatedb::object_store::memory::InMemory;
fn make_store() -> Arc<dyn ObjectStore> {
Arc::new(InMemory::new())
}
fn no_backoff_opts() -> OpenOptions {
OpenOptions {
default_queue_config: QueueConfig {
retry_backoff_base: Duration::ZERO,
retry_backoff_max: Duration::ZERO,
..QueueConfig::default()
},
..OpenOptions::default()
}
}
#[tokio::test]
async fn test_enqueue_and_claim() {
let q = Queue::open(make_store(), "test").await.unwrap();
let id = q.enqueue("email", b"hello".to_vec()).await.unwrap();
let job = q
.claim("email", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(job.id, id);
assert_eq!(job.queue, "email");
assert_eq!(job.payload, b"hello");
assert_eq!(job.status, JobStatus::Claimed);
assert_eq!(job.attempts, 1);
assert!(job.claimed_at.is_some());
assert!(job.lease_expires_at.is_some());
q.close().await.unwrap();
}
#[tokio::test]
async fn test_claim_empty_queue_returns_none() {
let q = Queue::open(make_store(), "test").await.unwrap();
assert!(
q.claim("email", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
q.close().await.unwrap();
}
#[tokio::test]
async fn test_ack_moves_job_to_done() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue("email", b"hello".to_vec()).await.unwrap();
let job = q
.claim("email", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.ack(&job).await.unwrap();
assert!(
q.claim("email", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
q.close().await.unwrap();
}
#[tokio::test]
async fn test_nack_requeues_job() {
let q = Queue::open_with_options(make_store(), "test", no_backoff_opts())
.await
.unwrap();
q.enqueue_with(
"email",
b"hello".to_vec(),
EnqueueOptions {
max_attempts: Some(3),
..Default::default()
},
)
.await
.unwrap();
let job = q
.claim("email", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(job.attempts, 1);
q.nack(job, "transient error").await.unwrap();
let retried = q
.claim("email", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(retried.attempts, 2);
assert_eq!(retried.last_error.as_deref(), Some("transient error"));
assert_eq!(retried.status, JobStatus::Claimed);
q.close().await.unwrap();
}
#[tokio::test]
async fn test_nack_dead_letters_after_max_attempts() {
let q = Queue::open_with_options(make_store(), "test", no_backoff_opts())
.await
.unwrap();
q.enqueue_with(
"email",
b"hello".to_vec(),
EnqueueOptions {
max_attempts: Some(2),
..Default::default()
},
)
.await
.unwrap();
for _ in 0..2 {
let job = q
.claim("email", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.nack(job, "persistent error").await.unwrap();
}
assert!(
q.claim("email", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
q.close().await.unwrap();
}
#[tokio::test]
async fn test_fifo_ordering() {
let q = Queue::open(make_store(), "test").await.unwrap();
let id_a = q.enqueue("work", b"first".to_vec()).await.unwrap();
let id_b = q.enqueue("work", b"second".to_vec()).await.unwrap();
let id_c = q.enqueue("work", b"third".to_vec()).await.unwrap();
let j1 = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let j2 = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let j3 = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(j1.id, id_a);
assert_eq!(j2.id, id_b);
assert_eq!(j3.id, id_c);
q.close().await.unwrap();
}
#[tokio::test]
async fn test_queue_isolation() {
let q = Queue::open(make_store(), "test").await.unwrap();
let id_email = q.enqueue("email", b"email job".to_vec()).await.unwrap();
let id_resize = q.enqueue("resize", b"resize job".to_vec()).await.unwrap();
let email_job = q
.claim("email", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let resize_job = q
.claim("resize", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(email_job.id, id_email);
assert_eq!(resize_job.id, id_resize);
assert!(
q.claim("email", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
assert!(
q.claim("resize", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
q.close().await.unwrap();
}
// A claim with a zero-length lease expires immediately; the reaper requeues it
// so the job becomes claimable again with attempts bumped.
#[tokio::test]
async fn test_reaper_requeues_expired_job() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue_with(
"work",
b"payload".to_vec(),
EnqueueOptions {
max_attempts: Some(3),
..Default::default()
},
)
.await
.unwrap();
// Zero-duration lease: already expired when claimed.
let job = q
.claim("work", Duration::from_millis(0))
.await
.unwrap()
.unwrap();
assert_eq!(job.attempts, 1);
// Still claimed (not pending) until the reaper runs.
assert!(
q.claim("work", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
q.reap_now().await.unwrap();
let reclaimed = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(reclaimed.id, job.id);
assert_eq!(reclaimed.attempts, 2);
assert_eq!(reclaimed.status, JobStatus::Claimed);
q.close().await.unwrap();
}
// After max_attempts lease expirations the reaper stops requeueing and the job
// is no longer claimable (dead-lettered).
#[tokio::test]
async fn test_reaper_dead_letters_after_max_attempts() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue_with(
"work",
b"payload".to_vec(),
EnqueueOptions {
max_attempts: Some(2),
..Default::default()
},
)
.await
.unwrap();
// Attempt 1: claim with an already-expired lease, then reap.
let _job = q
.claim("work", Duration::from_millis(0))
.await
.unwrap()
.unwrap();
q.reap_now().await.unwrap();
// Attempt 2: same again — this exhausts max_attempts.
let _job = q
.claim("work", Duration::from_millis(0))
.await
.unwrap()
.unwrap();
q.reap_now().await.unwrap();
assert!(
q.claim("work", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
q.close().await.unwrap();
}
// The reaper must not touch a job whose lease is still active.
#[tokio::test]
async fn test_reaper_skips_active_leases() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue("work", b"payload".to_vec()).await.unwrap();
// Long lease (300s) — well beyond the reap below.
let job = q
.claim("work", Duration::from_secs(300))
.await
.unwrap()
.unwrap();
q.reap_now().await.unwrap();
// Job stays claimed, so nothing is claimable.
assert!(
q.claim("work", Duration::from_secs(300))
.await
.unwrap()
.is_none()
);
q.ack(&job).await.unwrap();
q.close().await.unwrap();
}
// Acking a job whose lease has expired must win over the reaper: the reaper
// must not resurrect an already-acked job.
#[tokio::test]
async fn test_reaper_ignores_already_acked_job() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue("work", b"payload".to_vec()).await.unwrap();
let job = q
.claim("work", Duration::from_millis(0))
.await
.unwrap()
.unwrap();
q.ack(&job).await.unwrap();
q.reap_now().await.unwrap();
assert!(
q.claim("work", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
q.close().await.unwrap();
}
// stats() counters (pending/claimed/done) track enqueue -> claim -> ack.
#[tokio::test]
async fn test_stats_track_job_lifecycle() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue("email", b"a".to_vec()).await.unwrap();
q.enqueue("email", b"b".to_vec()).await.unwrap();
let s = q.stats("email").await.unwrap();
assert_eq!(s.pending, 2);
assert_eq!(s.claimed, 0);
let job = q
.claim("email", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let s = q.stats("email").await.unwrap();
assert_eq!(s.pending, 1);
assert_eq!(s.claimed, 1);
q.ack(&job).await.unwrap();
let s = q.stats("email").await.unwrap();
assert_eq!(s.pending, 1);
assert_eq!(s.claimed, 0);
assert_eq!(s.done, 1);
q.close().await.unwrap();
}
// With max_attempts = 1, a nack moves the job straight to dead and the stats
// counters reflect it (claimed back to 0, dead incremented).
#[tokio::test]
async fn test_stats_nack_dead_letter() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue_with(
"email",
b"x".to_vec(),
EnqueueOptions {
max_attempts: Some(1),
..Default::default()
},
)
.await
.unwrap();
let job = q
.claim("email", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.nack(job, "fail").await.unwrap();
let s = q.stats("email").await.unwrap();
assert_eq!(s.pending, 0);
assert_eq!(s.claimed, 0);
assert_eq!(s.dead, 1);
q.close().await.unwrap();
}
// list_queues() returns every queue name that has received at least one job.
#[tokio::test]
async fn test_list_queues() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue("alpha", b"1".to_vec()).await.unwrap();
q.enqueue("beta", b"2".to_vec()).await.unwrap();
q.enqueue("gamma", b"3".to_vec()).await.unwrap();
let mut queues = q.list_queues().await.unwrap();
// Sort because list_queues() ordering is not asserted by this test.
queues.sort();
assert_eq!(queues, vec!["alpha", "beta", "gamma"]);
q.close().await.unwrap();
}
// A nack that exhausts max_attempts dead-letters the job; dead_jobs() lists it,
// and requeue_dead_job() revives it as claimable with attempts and last_error
// reset.
#[tokio::test]
async fn test_dead_jobs_and_requeue() {
    let q = Queue::open(make_store(), "test").await.unwrap();
    let id = q
        .enqueue_with(
            "work",
            b"payload".to_vec(),
            EnqueueOptions {
                max_attempts: Some(1),
                ..Default::default()
            },
        )
        .await
        .unwrap();
    let job = q
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    // Single attempt allowed, so this nack is terminal.
    q.nack(job, "fatal").await.unwrap();
    let dead = q.dead_jobs("work", None, 100).await.unwrap();
    assert_eq!(dead.len(), 1);
    assert_eq!(dead[0].id, id);
    assert_eq!(dead[0].status, JobStatus::Dead);
    q.requeue_dead_job(dead.into_iter().next().unwrap())
        .await
        .unwrap();
    let revived = q
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(revived.id, id);
    // Requeue resets the retry bookkeeping: this claim is attempt 1 again and
    // the previous failure reason is cleared.
    assert_eq!(revived.attempts, 1);
    assert!(revived.last_error.is_none());
    q.close().await.unwrap();
}
// Per-queue QueueConfig overrides apply: max_attempts and lease_duration from
// the "fast" config are used by claim_next().
#[tokio::test]
async fn test_per_queue_config() {
let mut opts = OpenOptions::default();
opts.queue_configs.insert(
"fast".to_string(),
QueueConfig {
max_attempts: 1,
lease_duration: Duration::from_secs(5),
..QueueConfig::default()
},
);
let q = Queue::open_with_options(make_store(), "test", opts)
.await
.unwrap();
q.enqueue("fast", b"x".to_vec()).await.unwrap();
let job = q.claim_next("fast").await.unwrap().unwrap();
assert_eq!(job.max_attempts, 1);
let lease_expires_at = job.lease_expires_at.unwrap();
let claimed_at = job.claimed_at.unwrap();
// 5s lease, with 1ms of slack for ms-precision timestamps.
assert!(lease_expires_at - claimed_at <= 5_001);
q.ack(&job).await.unwrap();
q.close().await.unwrap();
}
// Jobs are claimed strictly by priority: high before normal before low
// (lower numeric value = higher priority, per the PRIORITY_* constants).
#[tokio::test]
async fn test_priority_ordering() {
let q = Queue::open(make_store(), "test").await.unwrap();
let id_low = q
.enqueue_with(
"jobs",
b"low".to_vec(),
EnqueueOptions {
priority: Some(PRIORITY_LOW),
..Default::default()
},
)
.await
.unwrap();
let id_normal = q
.enqueue_with(
"jobs",
b"normal".to_vec(),
EnqueueOptions {
priority: Some(PRIORITY_NORMAL),
..Default::default()
},
)
.await
.unwrap();
let id_high = q
.enqueue_with(
"jobs",
b"high".to_vec(),
EnqueueOptions {
priority: Some(PRIORITY_HIGH),
..Default::default()
},
)
.await
.unwrap();
let j1 = q
.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let j2 = q
.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let j3 = q
.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
// Enqueued low, normal, high — claimed high, normal, low.
assert_eq!(j1.id, id_high);
assert_eq!(j2.id, id_normal);
assert_eq!(j3.id, id_low);
q.close().await.unwrap();
}
// Within a single priority band, claims still follow FIFO enqueue order.
#[tokio::test]
async fn test_priority_fifo_within_same_priority() {
let q = Queue::open(make_store(), "test").await.unwrap();
let id_first = q
.enqueue_with(
"jobs",
b"first".to_vec(),
EnqueueOptions {
priority: Some(PRIORITY_NORMAL),
..Default::default()
},
)
.await
.unwrap();
let id_second = q
.enqueue_with(
"jobs",
b"second".to_vec(),
EnqueueOptions {
priority: Some(PRIORITY_NORMAL),
..Default::default()
},
)
.await
.unwrap();
let j1 = q
.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let j2 = q
.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(j1.id, id_first);
assert_eq!(j2.id, id_second);
q.close().await.unwrap();
}
// A retryable nack re-queues the job at its ORIGINAL priority, so a high
// priority job still beats a waiting normal-priority one on the next claim.
// Uses no_backoff_opts() so the nacked job is immediately claimable again.
#[tokio::test]
async fn test_priority_preserved_after_nack() {
let q = Queue::open_with_options(make_store(), "test", no_backoff_opts())
.await
.unwrap();
let id_high = q
.enqueue_with(
"jobs",
b"high".to_vec(),
EnqueueOptions {
priority: Some(PRIORITY_HIGH),
..Default::default()
},
)
.await
.unwrap();
let _id_normal = q
.enqueue_with(
"jobs",
b"normal".to_vec(),
EnqueueOptions {
priority: Some(PRIORITY_NORMAL),
..Default::default()
},
)
.await
.unwrap();
let job = q
.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(job.id, id_high);
q.nack(job, "retry me").await.unwrap();
let reclaimed = q
.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
// The high-priority job comes back first, priority intact.
assert_eq!(reclaimed.id, id_high);
assert_eq!(reclaimed.priority, PRIORITY_HIGH);
q.close().await.unwrap();
}
// The enqueue-time priority is persisted on the JobRecord surfaced by claim().
#[tokio::test]
async fn test_priority_stored_on_job_record() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue_with(
"jobs",
b"x".to_vec(),
EnqueueOptions {
priority: Some(PRIORITY_HIGH),
..Default::default()
},
)
.await
.unwrap();
let job = q
.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(job.priority, PRIORITY_HIGH);
q.close().await.unwrap();
}
// A job scheduled an hour in the future is counted as scheduled, not pending,
// and cannot be claimed yet.
#[tokio::test]
async fn test_enqueue_at_future_not_immediately_claimable() {
let q = Queue::open(make_store(), "test").await.unwrap();
let run_at = std::time::SystemTime::now() + Duration::from_secs(3600);
q.enqueue_with(
"jobs",
b"future".to_vec(),
EnqueueOptions {
run_at: Some(run_at),
..Default::default()
},
)
.await
.unwrap();
assert!(
q.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
let s = q.stats("jobs").await.unwrap();
assert_eq!(s.scheduled, 1);
assert_eq!(s.pending, 0);
q.close().await.unwrap();
}
// A run_at in the past behaves like a plain enqueue: immediately claimable.
#[tokio::test]
async fn test_enqueue_at_past_is_immediately_pending() {
let q = Queue::open(make_store(), "test").await.unwrap();
let run_at = std::time::SystemTime::now() - Duration::from_secs(1);
q.enqueue_with(
"jobs",
b"past".to_vec(),
EnqueueOptions {
run_at: Some(run_at),
..Default::default()
},
)
.await
.unwrap();
let job = q.claim("jobs", Duration::from_secs(30)).await.unwrap();
assert!(job.is_some());
q.close().await.unwrap();
}
// promote_scheduled_now() moves due scheduled jobs into the pending set, after
// which they can be claimed.
#[tokio::test]
async fn test_promote_scheduled_now() {
let q = Queue::open(make_store(), "test").await.unwrap();
let run_at = std::time::SystemTime::now() + Duration::from_millis(1);
let id = q
.enqueue_with(
"jobs",
b"soon".to_vec(),
EnqueueOptions {
run_at: Some(run_at),
..Default::default()
},
)
.await
.unwrap();
// Not claimable while still in the scheduled set.
assert!(
q.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
// Let the 1ms run_at pass before promoting.
tokio::time::sleep(Duration::from_millis(5)).await;
q.promote_scheduled_now().await.unwrap();
let s = q.stats("jobs").await.unwrap();
assert_eq!(s.scheduled, 0);
assert_eq!(s.pending, 1);
let job = q
.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(job.id, id);
q.close().await.unwrap();
}
// Enqueueing with a future run_at lands the job in the scheduled counter, not
// pending.
#[tokio::test]
async fn test_enqueue_in_convenience() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue_with(
"jobs",
b"delayed".to_vec(),
EnqueueOptions {
run_at: Some(std::time::SystemTime::now() + Duration::from_secs(3600)),
..Default::default()
},
)
.await
.unwrap();
let s = q.stats("jobs").await.unwrap();
assert_eq!(s.scheduled, 1);
assert_eq!(s.pending, 0);
q.close().await.unwrap();
}
// A scheduled job keeps its (default/normal) priority when promoted: a
// directly-enqueued high-priority job is still claimed before it.
#[tokio::test]
async fn test_scheduled_job_preserves_priority() {
let q = Queue::open(make_store(), "test").await.unwrap();
let run_at = std::time::SystemTime::now() + Duration::from_millis(1);
q.enqueue_with(
"jobs",
b"normal".to_vec(),
EnqueueOptions {
run_at: Some(run_at),
..Default::default()
},
)
.await
.unwrap();
q.enqueue_with(
"jobs",
b"high".to_vec(),
EnqueueOptions {
priority: Some(PRIORITY_HIGH),
..Default::default()
},
)
.await
.unwrap();
// Wait out the 1ms run_at, then promote the scheduled job to pending.
tokio::time::sleep(Duration::from_millis(5)).await;
q.promote_scheduled_now().await.unwrap();
let j1 = q
.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(j1.payload, b"high");
let j2 = q
.claim("jobs", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert_eq!(j2.payload, b"normal");
q.close().await.unwrap();
}
// dead_letter() moves a claimed job straight to the dead set without consuming
// a retry attempt, even when max_attempts (5 here) would allow more retries.
#[tokio::test]
async fn test_dead_letter_skips_attempts_check() {
    let q = Queue::open_with_options(
        make_store(),
        "test",
        OpenOptions {
            queue_configs: HashMap::from([(
                "work".to_string(),
                QueueConfig {
                    max_attempts: 5,
                    ..QueueConfig::default()
                },
            )]),
            ..OpenOptions::default()
        },
    )
    .await
    .unwrap();
    let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
    let claimed = q
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(claimed.attempts, 1);
    // Explicit dead-letter despite four remaining attempts.
    q.dead_letter(claimed, "permanent failure").await.unwrap();
    let job = q.get_job(&id).await.unwrap().unwrap();
    assert_eq!(job.status, JobStatus::Dead);
    assert_eq!(job.attempts, 1, "attempts should not be incremented");
    assert_eq!(job.last_error.as_deref(), Some("permanent failure"));
    assert!(job.failed_at.is_some());
    let stats = q.stats("work").await.unwrap();
    assert_eq!(stats.dead, 1);
    assert_eq!(stats.claimed, 0);
    // Close the queue like every sibling test so background tasks shut down
    // (was previously missing here).
    q.close().await.unwrap();
}
// run_worker dead-letters a job whose Worker::process returns PermanentFailure,
// without consuming additional retry attempts.
#[tokio::test]
async fn test_run_worker_dead_letters_on_permanent_failure() {
use crate::worker::{PermanentFailure, Worker, WorkerError, run_worker};
// Worker stub that always fails permanently.
struct PermanentFailWorker;
impl Worker for PermanentFailWorker {
async fn process(&self, _job: &JobRecord) -> std::result::Result<(), WorkerError> {
Err(PermanentFailure::new("HTTP 410 Gone").into())
}
}
let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
// Oneshot channel used as the worker's shutdown signal.
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let q2 = q.clone();
let handle = tokio::spawn(async move {
run_worker(
&q2,
"work",
&PermanentFailWorker,
Duration::from_millis(10),
async move {
let _ = shutdown_rx.await;
},
)
.await
});
// Poll stats until the worker has dead-lettered the job.
loop {
let s = q.stats("work").await.unwrap();
if s.dead > 0 {
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
let _ = shutdown_tx.send(());
let _ = handle.await;
let job = q.get_job(&id).await.unwrap().unwrap();
assert_eq!(job.status, JobStatus::Dead);
assert_eq!(
job.attempts, 1,
"PermanentFailure should not consume retries"
);
assert_eq!(job.last_error.as_deref(), Some("HTTP 410 Gone"));
}
// run_worker drives a Worker impl end to end: a successful process() acks the
// job, draining pending and claimed to zero.
#[tokio::test]
async fn test_worker_trait() {
use crate::worker::{Worker, WorkerError, run_worker};
// Worker stub that always succeeds.
struct EchoWorker;
impl Worker for EchoWorker {
async fn process(&self, _job: &JobRecord) -> std::result::Result<(), WorkerError> {
Ok(())
}
}
let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
q.enqueue("work", b"hello".to_vec()).await.unwrap();
// Oneshot channel used as the worker's shutdown signal.
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let q2 = q.clone();
let handle = tokio::spawn(async move {
run_worker(
&q2,
"work",
&EchoWorker,
Duration::from_millis(10),
async move {
let _ = shutdown_rx.await;
},
)
.await
});
// Poll stats until the worker has processed and acked the job.
loop {
let s = q.stats("work").await.unwrap();
if s.pending == 0 && s.claimed == 0 {
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
let _ = shutdown_tx.send(());
let _ = handle.await;
assert!(
q.claim("work", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
}
// get_job() reflects status transitions Pending -> Claimed -> Done.
// keep_done_jobs is set so the Done record survives the ack.
#[tokio::test]
async fn test_get_job_tracks_lifecycle() {
let opts = OpenOptions {
keep_done_jobs: Some(Duration::from_secs(60)),
..OpenOptions::default()
};
let q = Queue::open_with_options(make_store(), "test", opts)
.await
.unwrap();
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
let job = q.get_job(&id).await.unwrap().unwrap();
assert_eq!(job.status, JobStatus::Pending);
let claimed = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let job = q.get_job(&id).await.unwrap().unwrap();
assert_eq!(job.status, JobStatus::Claimed);
q.ack(&claimed).await.unwrap();
let job = q.get_job(&id).await.unwrap().unwrap();
assert_eq!(job.status, JobStatus::Done);
q.close().await.unwrap();
}
// Without keep_done_jobs, ack() removes the job record entirely while the done
// counter still records throughput.
#[tokio::test]
async fn test_ack_deletes_job_by_default() {
let q = Queue::open(make_store(), "test").await.unwrap();
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.ack(&job).await.unwrap();
assert!(
q.get_job(&id).await.unwrap().is_none(),
"ack must drop the index by default"
);
let s = q.stats("work").await.unwrap();
assert_eq!(s.done, 1, "done counter still tracks throughput");
assert_eq!(s.pending, 0);
assert_eq!(s.claimed, 0);
q.close().await.unwrap();
}
// With keep_done_jobs set, a done record survives the ack but is purged by
// sweep_done_now() once its retention window has elapsed.
#[tokio::test]
async fn test_done_retention_sweeps_old_jobs() {
let opts = OpenOptions {
keep_done_jobs: Some(Duration::from_millis(20)),
..OpenOptions::default()
};
let q = Queue::open_with_options(make_store(), "test", opts)
.await
.unwrap();
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.ack(&job).await.unwrap();
assert!(q.get_job(&id).await.unwrap().is_some());
// Let the 20ms retention window elapse, then sweep.
tokio::time::sleep(Duration::from_millis(30)).await;
q.sweep_done_now(Duration::from_millis(20)).await.unwrap();
assert!(
q.get_job(&id).await.unwrap().is_none(),
"retention sweep must purge expired done jobs"
);
q.close().await.unwrap();
}
// The done-retention window is measured from completed_at, not enqueued_at:
// a job enqueued long ago but only just completed must survive the sweep.
#[tokio::test]
async fn test_done_retention_uses_completion_time_not_enqueue_time() {
let opts = OpenOptions {
keep_done_jobs: Some(Duration::from_millis(500)),
..OpenOptions::default()
};
let q = Queue::open_with_options(make_store(), "test", opts)
.await
.unwrap();
// Schedule 200ms out so enqueued_at is well in the past by completion time.
let id = q
.enqueue_with(
"work",
b"weekly".to_vec(),
EnqueueOptions {
run_at: Some(std::time::SystemTime::now() + Duration::from_millis(200)),
..Default::default()
},
)
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(220)).await;
q.promote_scheduled_now().await.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let elapsed_since_enqueue = now_ms().saturating_sub(job.enqueued_at);
assert!(
elapsed_since_enqueue > 200,
"enqueued_at should be well over 200ms old (was {elapsed_since_enqueue}ms)"
);
q.ack(&job).await.unwrap();
// Sweep immediately: completion is fresh, so the record must be kept.
q.sweep_done_now(Duration::from_millis(500)).await.unwrap();
let kept = q.get_job(&id).await.unwrap().expect(
"fresh completion must survive the sweep regardless of how long ago the job was enqueued",
);
assert!(
kept.completed_at.is_some(),
"ack must stamp completed_at when keep_done_jobs is set"
);
// After the 500ms window passes, the same sweep purges it.
tokio::time::sleep(Duration::from_millis(550)).await;
q.sweep_done_now(Duration::from_millis(500)).await.unwrap();
assert!(q.get_job(&id).await.unwrap().is_none());
q.close().await.unwrap();
}
// sweep_dead_now() only purges dead jobs older than the given retention: a
// 1-hour retention keeps a fresh dead job, a 20ms retention removes it and
// decrements the dead counter.
#[tokio::test]
async fn test_dead_retention_sweep_boundary() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue_with(
"work",
b"payload".to_vec(),
EnqueueOptions {
max_attempts: Some(1),
..Default::default()
},
)
.await
.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let id = job.id.clone();
q.nack(job, "fatal").await.unwrap();
let dead = q.dead_jobs("work", None, 100).await.unwrap();
assert_eq!(dead.len(), 1);
assert!(dead[0].failed_at.is_some(), "failed_at must be stamped");
assert_eq!(q.stats("work").await.unwrap().dead, 1);
// Long retention: the fresh dead job survives.
q.sweep_dead_now(Duration::from_secs(3600)).await.unwrap();
assert_eq!(q.dead_jobs("work", None, 100).await.unwrap().len(), 1);
// Short retention after 30ms: the job is now past the window.
tokio::time::sleep(Duration::from_millis(30)).await;
q.sweep_dead_now(Duration::from_millis(20)).await.unwrap();
assert!(q.dead_jobs("work", None, 100).await.unwrap().is_empty());
assert_eq!(
q.stats("work").await.unwrap().dead,
0,
"dead counter must reflect the sweep"
);
assert!(q.get_job(&id).await.unwrap().is_none());
q.close().await.unwrap();
}
// requeue_dead_job() clears failed_at so a subsequent failure starts a fresh
// dead-retention window.
#[tokio::test]
async fn test_requeue_dead_resets_failed_at() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue_with(
"work",
b"payload".to_vec(),
EnqueueOptions {
max_attempts: Some(1),
..Default::default()
},
)
.await
.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.nack(job, "fatal").await.unwrap();
let dead = q.dead_jobs("work", None, 100).await.unwrap().pop().unwrap();
assert!(dead.failed_at.is_some());
q.requeue_dead_job(dead).await.unwrap();
let pending = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
assert!(
pending.failed_at.is_none(),
"requeue must clear failed_at so a re-fail starts a fresh retention window"
);
q.close().await.unwrap();
}
// Looking up an id that was never enqueued yields Ok(None), not an error.
#[tokio::test]
async fn test_get_job_returns_none_for_unknown_id() {
let q = Queue::open(make_store(), "test").await.unwrap();
assert!(q.get_job("nonexistent").await.unwrap().is_none());
q.close().await.unwrap();
}
// After a terminal nack (max_attempts = 1) the record is still retrievable via
// get_job() with status Dead.
#[tokio::test]
async fn test_get_job_after_nack_to_dead() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue_with(
"work",
b"x".to_vec(),
EnqueueOptions {
max_attempts: Some(1),
..Default::default()
},
)
.await
.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let id = job.id.clone();
q.nack(job, "fatal").await.unwrap();
let dead = q.get_job(&id).await.unwrap().unwrap();
assert_eq!(dead.status, JobStatus::Dead);
q.close().await.unwrap();
}
// renew_lease() extends an about-to-expire lease: the new expiry is later, the
// reaper leaves the job alone, and the stored record reflects the new expiry.
#[tokio::test]
async fn test_renew_lease() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue("work", b"payload".to_vec()).await.unwrap();
// 1ms lease would expire almost immediately without renewal.
let mut job = q
.claim("work", Duration::from_millis(1))
.await
.unwrap()
.unwrap();
let original_expiry = job.lease_expires_at.unwrap();
q.renew_lease(&mut job, Duration::from_secs(30))
.await
.unwrap();
let new_expiry = job.lease_expires_at.unwrap();
assert!(new_expiry > original_expiry, "renewed expiry must be later");
// The reaper must not requeue the renewed (still-leased) job.
q.reap_now().await.unwrap();
assert!(
q.claim("work", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
let fetched = q.get_job(&job.id).await.unwrap().unwrap();
assert_eq!(fetched.status, JobStatus::Claimed);
assert_eq!(fetched.lease_expires_at.unwrap(), new_expiry);
q.close().await.unwrap();
}
// Cancelling a pending job removes it outright: not claimable, not in the
// index, and the pending counter drops.
#[tokio::test]
async fn test_cancel_pending_job() {
let q = Queue::open(make_store(), "test").await.unwrap();
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
assert_eq!(q.cancel(&id).await.unwrap(), CancelOutcome::Removed);
assert!(
q.claim("work", Duration::from_secs(30))
.await
.unwrap()
.is_none()
);
assert!(q.get_job(&id).await.unwrap().is_none());
assert_eq!(q.stats("work").await.unwrap().pending, 0);
q.close().await.unwrap();
}
// Cancelling a scheduled (not-yet-due) job removes it and decrements the
// scheduled counter.
#[tokio::test]
async fn test_cancel_scheduled_job() {
let q = Queue::open(make_store(), "test").await.unwrap();
let id = q
.enqueue_with(
"work",
b"payload".to_vec(),
EnqueueOptions {
run_at: Some(std::time::SystemTime::now() + Duration::from_secs(3600)),
..Default::default()
},
)
.await
.unwrap();
assert_eq!(q.stats("work").await.unwrap().scheduled, 1);
assert_eq!(q.cancel(&id).await.unwrap(), CancelOutcome::Removed);
assert_eq!(q.stats("work").await.unwrap().scheduled, 0);
assert!(q.get_job(&id).await.unwrap().is_none());
q.close().await.unwrap();
}
// Cancelling a claimed job returns Requested (not Removed) and fires the
// cancel token handed out at claim time.
#[tokio::test]
async fn test_cancel_claimed_job_fires_token() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue("work", b"payload".to_vec()).await.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let token = job.cancel_token.clone().expect("claim returned a token");
assert!(!token.is_cancelled());
assert_eq!(q.cancel(&job.id).await.unwrap(), CancelOutcome::Requested);
assert!(token.is_cancelled());
q.ack(&job).await.unwrap();
q.close().await.unwrap();
}
// Cancelling a job that already completed (acked) reports NotFound.
#[tokio::test]
async fn test_cancel_terminal_job_is_not_found() {
let q = Queue::open(make_store(), "test").await.unwrap();
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.ack(&job).await.unwrap();
assert_eq!(q.cancel(&id).await.unwrap(), CancelOutcome::NotFound);
q.close().await.unwrap();
}
// A cancel request made while a job is claimed must survive a lease expiry and
// reaper requeue: the re-claimed job carries cancel_requested and a token that
// is already cancelled. reaper_interval is huge so only reap_now() reaps.
#[tokio::test]
async fn test_cancel_persists_across_reaper_requeue() {
    let opts = OpenOptions {
        reaper_interval: Duration::from_secs(3600),
        ..no_backoff_opts()
    };
    let q = Queue::open_with_options(make_store(), "test", opts)
        .await
        .unwrap();
    q.enqueue("work", b"payload".to_vec()).await.unwrap();
    // Short lease so the reaper can expire it below.
    let job1 = q
        .claim("work", Duration::from_millis(50))
        .await
        .unwrap()
        .unwrap();
    let first_token = job1.cancel_token.clone().unwrap();
    assert_eq!(q.cancel(&job1.id).await.unwrap(), CancelOutcome::Requested);
    assert!(first_token.is_cancelled());
    assert!(
        q.get_job(&job1.id).await.unwrap().unwrap().cancel_requested,
        "cancel_requested must persist on the claimed record",
    );
    // Let the 50ms lease lapse, then reap so the job is requeued.
    tokio::time::sleep(Duration::from_millis(100)).await;
    q.reap_now().await.unwrap();
    let job2 = q
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(job1.id, job2.id);
    assert!(job2.cancel_requested);
    let second_token = job2
        .cancel_token
        .clone()
        .expect("re-claim returned a token");
    assert!(
        second_token.is_cancelled(),
        "re-claim should surface a pre-cancelled token",
    );
    q.ack(&job2).await.unwrap();
    q.close().await.unwrap();
}
// The cancel token integrates with tokio::select!: after cancel(), the
// token.cancelled() branch wins over a long sleep.
#[tokio::test]
async fn test_cancel_token_used_in_worker_select() {
let q = Queue::open(make_store(), "test").await.unwrap();
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let token = job.cancel_token.clone().unwrap();
assert_eq!(q.cancel(&id).await.unwrap(), CancelOutcome::Requested);
// `biased` makes select! poll branches in order, so the already-fired
// cancellation is taken deterministically.
let took_path = tokio::select! {
biased;
_ = token.cancelled() => "cancelled",
_ = tokio::time::sleep(Duration::from_secs(5)) => "slept",
};
assert_eq!(took_path, "cancelled");
q.ack(&job).await.unwrap();
q.close().await.unwrap();
}
// Waiting on an id that was never enqueued returns WaitOutcome::NotFound.
#[tokio::test]
async fn test_wait_for_completion_unknown_id_is_not_found() {
let q = Queue::open(make_store(), "test").await.unwrap();
let outcome = q
.wait_for_completion("does-not-exist", Duration::from_millis(50))
.await
.unwrap();
assert!(matches!(outcome, WaitOutcome::NotFound), "{outcome:?}");
q.close().await.unwrap();
}
// Waiting on a job that stays pending returns TimedOut after the deadline.
#[tokio::test]
async fn test_wait_for_completion_pending_times_out() {
let q = Queue::open(make_store(), "test").await.unwrap();
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
let outcome = q
.wait_for_completion(&id, Duration::from_millis(100))
.await
.unwrap();
assert!(matches!(outcome, WaitOutcome::TimedOut), "{outcome:?}");
q.close().await.unwrap();
}
// A waiter is woken by ack(); with default options the done record is deleted,
// so the outcome is Completed(None).
#[tokio::test]
async fn test_wait_for_completion_wakes_on_ack() {
let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
let waiter_q = q.clone();
let waiter_id = id.clone();
let waiter = tokio::spawn(async move {
waiter_q
.wait_for_completion(&waiter_id, Duration::from_secs(5))
.await
.unwrap()
});
// Give the waiter time to register before completing the job.
tokio::time::sleep(Duration::from_millis(50)).await;
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.ack(&job).await.unwrap();
assert!(
matches!(waiter.await.unwrap(), WaitOutcome::Completed(None)),
"expected Completed(None) on default ack",
);
assert!(q.get_job(&id).await.unwrap().is_none());
}
// With keep_done_jobs set, the waiter receives the retained Done record:
// Completed(Some(record)).
#[tokio::test]
async fn test_wait_for_completion_with_kept_done_jobs() {
let opts = OpenOptions {
keep_done_jobs: Some(Duration::from_secs(60)),
..no_backoff_opts()
};
let q = Arc::new(
Queue::open_with_options(make_store(), "test", opts)
.await
.unwrap(),
);
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
let waiter_q = q.clone();
let waiter_id = id.clone();
let waiter = tokio::spawn(async move {
waiter_q
.wait_for_completion(&waiter_id, Duration::from_secs(5))
.await
.unwrap()
});
// Give the waiter time to register before completing the job.
tokio::time::sleep(Duration::from_millis(50)).await;
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.ack(&job).await.unwrap();
match waiter.await.unwrap() {
WaitOutcome::Completed(Some(record)) => {
assert_eq!(record.id, id);
assert_eq!(record.status, JobStatus::Done);
}
other => panic!("expected Completed(Some(Done)), got {other:?}"),
}
}
// Dead-lettering also counts as completion: the waiter gets the Dead record
// including the error string.
#[tokio::test]
async fn test_wait_for_completion_wakes_on_dead_letter() {
let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
let waiter_q = q.clone();
let waiter_id = id.clone();
let waiter = tokio::spawn(async move {
waiter_q
.wait_for_completion(&waiter_id, Duration::from_secs(5))
.await
.unwrap()
});
// Give the waiter time to register before dead-lettering.
tokio::time::sleep(Duration::from_millis(50)).await;
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.dead_letter(job, "permanent").await.unwrap();
match waiter.await.unwrap() {
WaitOutcome::Completed(Some(record)) => {
assert_eq!(record.id, id);
assert_eq!(record.status, JobStatus::Dead);
assert_eq!(record.last_error.as_deref(), Some("permanent"));
}
other => panic!("expected Completed(Some(Dead)), got {other:?}"),
}
}
// Cancelling a pending job (outcome Removed) wakes the waiter with
// Completed(None) since the record is gone.
#[tokio::test]
async fn test_wait_for_completion_wakes_on_cancel_removed() {
let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
let waiter_q = q.clone();
let waiter_id = id.clone();
let waiter = tokio::spawn(async move {
waiter_q
.wait_for_completion(&waiter_id, Duration::from_secs(5))
.await
.unwrap()
});
// Give the waiter time to register before cancelling.
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(q.cancel(&id).await.unwrap(), CancelOutcome::Removed);
assert!(
matches!(waiter.await.unwrap(), WaitOutcome::Completed(None)),
"expected Completed(None) after Pending cancel",
);
assert!(q.get_job(&id).await.unwrap().is_none());
}
// Cancel on a CLAIMED job is only a request, not completion: the waiter must
// keep waiting and eventually time out.
#[tokio::test]
async fn test_wait_for_completion_does_not_wake_on_cancel_requested() {
let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
q.enqueue("work", b"payload".to_vec()).await.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let id = job.id.clone();
let waiter_q = q.clone();
let waiter_id = id.clone();
let waiter = tokio::spawn(async move {
waiter_q
.wait_for_completion(&waiter_id, Duration::from_millis(200))
.await
.unwrap()
});
// Give the waiter time to register before requesting cancellation.
tokio::time::sleep(Duration::from_millis(50)).await;
assert_eq!(q.cancel(&id).await.unwrap(), CancelOutcome::Requested);
assert!(
matches!(waiter.await.unwrap(), WaitOutcome::TimedOut),
"claimed cancel should not wake the completion waiter",
);
q.ack(&job).await.unwrap();
}
// If the job is already terminal (Dead) when the wait starts, even a zero
// timeout returns Completed immediately.
#[tokio::test]
async fn test_wait_for_completion_returns_immediately_when_already_terminal() {
let q = Queue::open(make_store(), "test").await.unwrap();
q.enqueue("work", b"payload".to_vec()).await.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let id = job.id.clone();
q.dead_letter(job, "permanent").await.unwrap();
match q
.wait_for_completion(&id, Duration::from_millis(0))
.await
.unwrap()
{
WaitOutcome::Completed(Some(record)) => {
assert_eq!(record.id, id);
assert_eq!(record.status, JobStatus::Dead);
}
other => panic!("expected Completed(Some(Dead)), got {other:?}"),
}
q.close().await.unwrap();
}
// A single completion fans out to every concurrent waiter on the same job id.
#[tokio::test]
async fn test_wait_for_completion_fan_out_to_multiple_waiters() {
let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
let id = q.enqueue("work", b"payload".to_vec()).await.unwrap();
let mut waiters = Vec::new();
// Four independent waiters on the same job.
for _ in 0..4 {
let q = q.clone();
let id = id.clone();
waiters.push(tokio::spawn(async move {
q.wait_for_completion(&id, Duration::from_secs(5))
.await
.unwrap()
}));
}
// Give the waiters time to register before dead-lettering.
tokio::time::sleep(Duration::from_millis(50)).await;
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.dead_letter(job, "permanent").await.unwrap();
for waiter in waiters {
match waiter.await.unwrap() {
WaitOutcome::Completed(Some(record)) => {
assert_eq!(record.id, id);
assert_eq!(record.status, JobStatus::Dead);
}
other => panic!("waiter saw {other:?}, expected Completed(Some(Dead))"),
}
}
}
// When the REAPER dead-letters an expired job (max_attempts = 1, no backoff),
// waiters are woken with the Dead record carrying the "lease expired" error.
// reaper_interval is huge so only the explicit reap_now() triggers it.
#[tokio::test]
async fn test_wait_for_completion_wakes_on_reaper_dead_letter() {
let opts = OpenOptions {
reaper_interval: Duration::from_secs(3600),
default_queue_config: QueueConfig {
max_attempts: 1,
retry_backoff_base: Duration::ZERO,
retry_backoff_max: Duration::ZERO,
..QueueConfig::default()
},
..OpenOptions::default()
};
let q = Arc::new(
Queue::open_with_options(make_store(), "test", opts)
.await
.unwrap(),
);
q.enqueue("work", b"payload".to_vec()).await.unwrap();
// 10ms lease so the job's lease expires before reap_now() below.
let job = q
.claim("work", Duration::from_millis(10))
.await
.unwrap()
.unwrap();
let id = job.id.clone();
drop(job);
let waiter_q = q.clone();
let waiter_id = id.clone();
let waiter = tokio::spawn(async move {
waiter_q
.wait_for_completion(&waiter_id, Duration::from_secs(5))
.await
.unwrap()
});
// Let the lease expire and the waiter register, then reap.
tokio::time::sleep(Duration::from_millis(50)).await;
q.reap_now().await.unwrap();
match waiter.await.unwrap() {
WaitOutcome::Completed(Some(record)) => {
assert_eq!(record.id, id);
assert_eq!(record.status, JobStatus::Dead);
assert_eq!(record.last_error.as_deref(), Some("lease expired"));
}
other => panic!("expected Completed(Some(Dead)), got {other:?}"),
}
}
// Cancelling an unknown id reports NotFound rather than erroring.
#[tokio::test]
async fn test_cancel_nonexistent_is_not_found() {
let q = Queue::open(make_store(), "test").await.unwrap();
assert_eq!(
q.cancel("does-not-exist").await.unwrap(),
CancelOutcome::NotFound,
);
q.close().await.unwrap();
}
// enqueue_batch() enqueues all payloads, returns one id per payload, and the
// jobs are claimable in batch order.
#[tokio::test]
async fn test_enqueue_batch_atomic() {
let q = Queue::open(make_store(), "test").await.unwrap();
let payloads = vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()];
let ids = q.enqueue_batch("work", payloads).await.unwrap();
assert_eq!(ids.len(), 3);
let s = q.stats("work").await.unwrap();
assert_eq!(s.pending, 3);
let j1 = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let j2 = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
let j3 = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
// Claim order matches the batch's payload order.
assert_eq!(j1.id, ids[0]);
assert_eq!(j2.id, ids[1]);
assert_eq!(j3.id, ids[2]);
q.close().await.unwrap();
}
// An empty batch is a no-op: no ids returned, pending counter untouched.
#[tokio::test]
async fn test_enqueue_batch_empty_is_noop() {
let q = Queue::open(make_store(), "test").await.unwrap();
let ids = q.enqueue_batch("work", vec![]).await.unwrap();
assert!(ids.is_empty());
assert_eq!(q.stats("work").await.unwrap().pending, 0);
q.close().await.unwrap();
}
#[tokio::test]
async fn test_enqueue_unique_deduplicates() {
    // Two enqueues carrying the same dedup key must collapse into a single
    // pending job, with the second call returning the first job's id.
    let q = Queue::open(make_store(), "test").await.unwrap();
    let keyed = || EnqueueOptions {
        dedup_key: Some("my-key".to_string()),
        ..Default::default()
    };
    let id1 = q
        .enqueue_with("work", b"first".to_vec(), keyed())
        .await
        .unwrap();
    let id2 = q
        .enqueue_with("work", b"second".to_vec(), keyed())
        .await
        .unwrap();
    assert_eq!(id1, id2);
    assert_eq!(q.stats("work").await.unwrap().pending, 1);
    q.close().await.unwrap();
}
#[tokio::test]
async fn test_enqueue_unique_allows_reenqueue_after_claim() {
    // Once the deduplicated job has been claimed, the dedup key is released,
    // so a fresh enqueue under the same key must produce a new job id.
    let q = Queue::open(make_store(), "test").await.unwrap();
    let keyed = || EnqueueOptions {
        dedup_key: Some("my-key".to_string()),
        ..Default::default()
    };
    let first = q
        .enqueue_with("work", b"payload".to_vec(), keyed())
        .await
        .unwrap();
    let claimed = q
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(claimed.id, first);
    let second = q
        .enqueue_with("work", b"payload".to_vec(), keyed())
        .await
        .unwrap();
    assert_ne!(first, second);
    assert_eq!(q.stats("work").await.unwrap().pending, 1);
    q.close().await.unwrap();
}
// Regression scenario: claiming strips the dedup_key from the job record
// (releasing the dedup index entry). When the same key is re-enqueued as a
// *new* job while the first job is still cycling through nack/retry/ack,
// the first job's lifecycle must not delete or overwrite the second job's
// dedup index entry.
#[tokio::test]
async fn test_enqueue_unique_nack_then_reenqueue_does_not_corrupt_dedup() {
// no_backoff_opts: nacked jobs return to pending immediately instead of
// being parked in scheduled, keeping the test synchronous.
let q = Queue::open_with_options(make_store(), "test", no_backoff_opts())
.await
.unwrap();
let id1 = q
.enqueue_with(
"work",
b"payload".to_vec(),
EnqueueOptions {
dedup_key: Some("user-42".to_string()),
..Default::default()
},
)
.await
.unwrap();
let job = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
// Claiming must clear the dedup key on the record — the index is released.
assert!(job.dedup_key.is_none());
q.nack(job, "transient").await.unwrap();
// The key is free, so this enqueue creates a brand-new job (and a fresh
// dedup index entry pointing at id2).
let id2 = q
.enqueue_with(
"work",
b"payload".to_vec(),
EnqueueOptions {
dedup_key: Some("user-42".to_string()),
..Default::default()
},
)
.await
.unwrap();
assert_ne!(id1, id2);
// Claim the retried id1 (it carries no dedup key anymore).
let j1 = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
// While id2 is still pending, its dedup entry must dedupe a third enqueue.
let id3 = q
.enqueue_with(
"work",
b"payload".to_vec(),
EnqueueOptions {
dedup_key: Some("user-42".to_string()),
..Default::default()
},
)
.await
.unwrap();
assert_eq!(
id3, id2,
"id2's dedup index must still block the third enqueue while id2 is pending"
);
// Acking the stripped id1 must not touch id2's dedup entry.
q.ack(&j1).await.unwrap();
let j2 = q
.claim("work", Duration::from_secs(30))
.await
.unwrap()
.unwrap();
q.ack(&j2).await.unwrap();
// Both jobs fully drained; nothing left pending.
assert_eq!(q.stats("work").await.unwrap().pending, 0);
q.close().await.unwrap();
}
#[tokio::test]
async fn test_nack_with_backoff_parks_in_scheduled() {
    // Under the default (non-zero) backoff config, a nacked job must land
    // in the scheduled set and not be immediately claimable again.
    let q = Queue::open(make_store(), "test").await.unwrap();
    let opts = EnqueueOptions {
        max_attempts: Some(3),
        ..Default::default()
    };
    q.enqueue_with("work", b"payload".to_vec(), opts)
        .await
        .unwrap();
    let job = q
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    q.nack(job, "transient").await.unwrap();
    let stats = q.stats("work").await.unwrap();
    assert_eq!(stats.pending, 0, "must not be pending immediately");
    assert_eq!(stats.claimed, 0);
    assert_eq!(stats.scheduled, 1, "must be parked in scheduled");
    // A fresh claim attempt must come up empty while the job is parked.
    let reclaim = q.claim("work", Duration::from_secs(30)).await.unwrap();
    assert!(reclaim.is_none());
    q.close().await.unwrap();
}
#[tokio::test]
async fn test_nack_backoff_promoted_after_run_at() {
    // After the (tiny) backoff window elapses, a promotion pass must move
    // the nacked job back to pending carrying its attempt count and error.
    let opts = OpenOptions {
        default_queue_config: QueueConfig {
            retry_backoff_base: Duration::from_millis(10),
            retry_backoff_max: Duration::from_millis(10),
            ..QueueConfig::default()
        },
        ..OpenOptions::default()
    };
    let q = Queue::open_with_options(make_store(), "test", opts)
        .await
        .unwrap();
    q.enqueue_with(
        "work",
        b"payload".to_vec(),
        EnqueueOptions {
            max_attempts: Some(5),
            ..Default::default()
        },
    )
    .await
    .unwrap();
    let first = q
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    let original_id = first.id.clone();
    q.nack(first, "boom").await.unwrap();
    // Sleep past run_at, then force a promotion sweep.
    tokio::time::sleep(Duration::from_millis(20)).await;
    q.promote_scheduled_now().await.unwrap();
    let retried = q
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(retried.id, original_id);
    assert_eq!(retried.attempts, 2);
    assert_eq!(retried.last_error.as_deref(), Some("boom"));
    q.close().await.unwrap();
}
// `backoff_delay` is a pure, synchronous function — nothing here awaits —
// so a plain #[test] is correct; the previous #[tokio::test] spun up a
// Tokio runtime for no reason.
#[test]
fn test_backoff_delay_calculation() {
    let base = Duration::from_secs(1);
    let max = Duration::from_secs(60);
    // Exponential growth: delay doubles with each attempt (base * 2^(n-1)).
    assert_eq!(backoff_delay(1, base, max), Duration::from_secs(1));
    assert_eq!(backoff_delay(2, base, max), Duration::from_secs(2));
    assert_eq!(backoff_delay(3, base, max), Duration::from_secs(4));
    assert_eq!(backoff_delay(4, base, max), Duration::from_secs(8));
    // Large attempt counts must saturate at the configured maximum.
    assert_eq!(backoff_delay(20, base, max), max);
    // A zero base disables backoff entirely regardless of attempt number.
    assert_eq!(
        backoff_delay(5, Duration::ZERO, Duration::from_secs(10)),
        Duration::ZERO
    );
}
#[tokio::test]
async fn test_dead_jobs_pagination() {
    // Kill five jobs (max_attempts = 1, so a single nack buries each one),
    // then page through the dead set two at a time using the cursor API.
    let q = Queue::open(make_store(), "test").await.unwrap();
    let mut dead_ids = Vec::with_capacity(5);
    for _ in 0..5 {
        let id = q
            .enqueue_with(
                "work",
                b"x".to_vec(),
                EnqueueOptions {
                    max_attempts: Some(1),
                    ..Default::default()
                },
            )
            .await
            .unwrap();
        let job = q
            .claim("work", Duration::from_secs(30))
            .await
            .unwrap()
            .unwrap();
        q.nack(job, "fail").await.unwrap();
        dead_ids.push(id);
    }
    // Page 1: first two dead jobs, in id order.
    let page1 = q.dead_jobs("work", None, 2).await.unwrap();
    let got1: Vec<_> = page1.iter().map(|j| j.id.clone()).collect();
    assert_eq!(got1, &dead_ids[0..2]);
    // Page 2: cursor is the last id of the previous page.
    let page2 = q.dead_jobs("work", Some(&page1[1].id), 2).await.unwrap();
    let got2: Vec<_> = page2.iter().map(|j| j.id.clone()).collect();
    assert_eq!(got2, &dead_ids[2..4]);
    // Page 3: the single remaining job.
    let page3 = q.dead_jobs("work", Some(&page2[1].id), 2).await.unwrap();
    let got3: Vec<_> = page3.iter().map(|j| j.id.clone()).collect();
    assert_eq!(got3, &dead_ids[4..5]);
    // A zero limit must yield an empty page.
    assert!(q.dead_jobs("work", None, 0).await.unwrap().is_empty());
    q.close().await.unwrap();
}
// Graceful shutdown: a worker that has already claimed a job must finish
// processing it (and ack it) before run_worker returns, rather than
// abandoning the claim.
#[tokio::test]
async fn test_worker_finishes_in_flight_job_on_shutdown() {
use crate::worker::{Worker, WorkerError, run_worker};
use std::sync::atomic::{AtomicBool, Ordering};
// Worker whose process() is slow enough (100ms) that shutdown can be
// signalled while the job is still in flight; it flips `finished` when done.
struct SlowWorker {
finished: Arc<AtomicBool>,
}
impl Worker for SlowWorker {
async fn process(&self, _job: &JobRecord) -> std::result::Result<(), WorkerError> {
tokio::time::sleep(Duration::from_millis(100)).await;
self.finished.store(true, Ordering::SeqCst);
Ok(())
}
}
let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
q.enqueue("work", b"x".to_vec()).await.unwrap();
let finished = Arc::new(AtomicBool::new(false));
let worker = SlowWorker {
finished: finished.clone(),
};
// Oneshot channel drives the worker's shutdown future.
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let q2 = q.clone();
let handle = tokio::spawn(async move {
run_worker(
&q2,
"work",
&worker,
Duration::from_millis(50),
async move {
let _ = shutdown_rx.await;
},
)
.await
});
// Poll until the worker has actually claimed the job, so the shutdown
// signal is guaranteed to arrive while processing is in flight.
loop {
if q.stats("work").await.unwrap().claimed == 1 {
break;
}
tokio::time::sleep(Duration::from_millis(5)).await;
}
let _ = shutdown_tx.send(());
let _ = handle.await;
assert!(
finished.load(Ordering::SeqCst),
"in-flight job must finish before shutdown returns"
);
// The job must have been acked: nothing claimed, one done.
assert_eq!(q.stats("work").await.unwrap().claimed, 0);
assert_eq!(q.stats("work").await.unwrap().done, 1);
}
#[tokio::test]
async fn test_claim_with_wait_wakes_or_times_out() {
    // On an empty queue, claim_with_wait must give up after its wait window.
    let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
    let timed_out = q
        .claim_with_wait("work", Duration::from_secs(30), Duration::from_millis(50))
        .await
        .unwrap();
    assert!(timed_out.is_none());
    // A parked waiter must be woken promptly by a concurrent enqueue —
    // well before its 10s wait budget runs out.
    let waiter_q = q.clone();
    let waiter = tokio::spawn(async move {
        let started = std::time::Instant::now();
        let claimed = waiter_q
            .claim_with_wait("work", Duration::from_secs(30), Duration::from_secs(10))
            .await
            .unwrap();
        (started.elapsed(), claimed)
    });
    tokio::time::sleep(Duration::from_millis(20)).await;
    q.enqueue("work", b"hello".to_vec()).await.unwrap();
    let (elapsed, job) = waiter.await.unwrap();
    assert!(job.is_some(), "claim_with_wait must wake on enqueue");
    assert!(
        elapsed < Duration::from_millis(500),
        "expected fast wake; took {elapsed:?}"
    );
}
// Concurrency smoke test: a pool of 3 concurrent workers must drain five
// jobs and ack all of them, with the drain observed via polled stats.
#[tokio::test]
async fn test_concurrent_worker() {
use crate::worker::{Worker, WorkerError, run_worker_concurrent};
// Trivial worker: sleeps briefly (to force claim overlap) then succeeds.
struct EchoWorker;
impl Worker for EchoWorker {
async fn process(&self, _job: &JobRecord) -> std::result::Result<(), WorkerError> {
tokio::time::sleep(Duration::from_millis(5)).await;
Ok(())
}
}
let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
let ids = q
.enqueue_batch(
"work",
vec![
b"a".to_vec(),
b"b".to_vec(),
b"c".to_vec(),
b"d".to_vec(),
b"e".to_vec(),
],
)
.await
.unwrap();
assert_eq!(ids.len(), 5);
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let q2 = q.clone();
// Concurrency of 3 over 5 jobs guarantees overlapping claims.
let handle = tokio::spawn(async move {
run_worker_concurrent(
&q2,
"work",
Arc::new(EchoWorker),
3,
Duration::from_millis(10),
async move {
let _ = shutdown_rx.await;
},
)
.await
});
// Poll stats until the queue is fully drained (nothing pending or claimed)
// before signalling shutdown, so no job is abandoned mid-flight.
loop {
let s = q.stats("work").await.unwrap();
if s.pending == 0 && s.claimed == 0 {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
let _ = shutdown_tx.send(());
let _ = handle.await;
// All five jobs must have been acked exactly once.
assert_eq!(q.stats("work").await.unwrap().done, 5);
}
#[tokio::test]
async fn test_enqueue_with_kv_new_writes_apply() {
    // enqueue_with_kv must atomically write both the job and the caller's
    // KV pairs; a New outcome means both are visible.
    let q = Queue::open(make_store(), "test").await.unwrap();
    let kv = HashMap::from([(b"runs/abc".to_vec(), b"submitted".to_vec())]);
    let outcome = q
        .enqueue_with_kv("work", b"payload".to_vec(), EnqueueOptions::default(), kv)
        .await
        .unwrap();
    let id = match outcome {
        EnqueueResult::New(id) => id,
        other => panic!("expected New, got {other:?}"),
    };
    assert_eq!(q.stats("work").await.unwrap().pending, 1);
    let claimed = q
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(claimed.id, id);
    assert_eq!(claimed.payload, b"payload");
    // The user KV write must have landed alongside the job.
    let stored = q.kv_get(b"runs/abc").await.unwrap();
    assert_eq!(stored.as_deref(), Some(b"submitted".as_slice()));
    q.close().await.unwrap();
}
#[tokio::test]
async fn test_enqueue_with_kv_dedup_hit_skips_kv_writes() {
    // On a dedup hit, the second call's payload AND its KV writes must be
    // discarded — the first write wins.
    let q = Queue::open(make_store(), "test").await.unwrap();
    let keyed = || EnqueueOptions {
        dedup_key: Some("run-abc".into()),
        ..Default::default()
    };
    let first_outcome = q
        .enqueue_with_kv(
            "work",
            b"first".to_vec(),
            keyed(),
            HashMap::from([(b"runs/abc".to_vec(), b"first-record".to_vec())]),
        )
        .await
        .unwrap();
    let first_id = match first_outcome {
        EnqueueResult::New(id) => id,
        other => panic!("expected New, got {other:?}"),
    };
    let second_outcome = q
        .enqueue_with_kv(
            "work",
            b"second".to_vec(),
            keyed(),
            HashMap::from([(b"runs/abc".to_vec(), b"second-record".to_vec())]),
        )
        .await
        .unwrap();
    match second_outcome {
        EnqueueResult::AlreadyEnqueued(id) => assert_eq!(id, first_id),
        other => panic!("expected AlreadyEnqueued, got {other:?}"),
    }
    assert_eq!(q.stats("work").await.unwrap().pending, 1);
    // The dedup hit must not have clobbered the first KV record.
    assert_eq!(
        q.kv_get(b"runs/abc").await.unwrap().as_deref(),
        Some(b"first-record".as_slice())
    );
    q.close().await.unwrap();
}
#[tokio::test]
async fn test_enqueue_with_kv_rejects_oversized_value() {
    // A KV value one byte over the cap must fail the whole enqueue up
    // front — nothing may land in pending.
    let q = Queue::open(make_store(), "test").await.unwrap();
    let too_big = vec![0u8; MAX_KV_VALUE_SIZE + 1];
    let err = q
        .enqueue_with_kv(
            "work",
            b"x".to_vec(),
            EnqueueOptions::default(),
            HashMap::from([(b"big".to_vec(), too_big)]),
        )
        .await
        .unwrap_err();
    match err {
        Error::KvValueTooLarge { size, max } => {
            assert_eq!(size, MAX_KV_VALUE_SIZE + 1);
            assert_eq!(max, MAX_KV_VALUE_SIZE);
        }
        other => panic!("expected KvValueTooLarge, got {other:?}"),
    }
    assert_eq!(q.stats("work").await.unwrap().pending, 0);
    q.close().await.unwrap();
}
#[tokio::test]
async fn test_kv_keys_cannot_collide_with_internal_layout() {
    // A user KV key crafted to look like an internal pending-key must not
    // inject a fake job or disturb the real one (user keys are namespaced).
    let q = Queue::open(make_store(), "test").await.unwrap();
    q.enqueue("work", b"payload".to_vec()).await.unwrap();
    let hostile_key = b"pending:work:0000000001:fake-id".to_vec();
    q.enqueue_with_kv(
        "other",
        b"sentinel".to_vec(),
        EnqueueOptions::default(),
        HashMap::from([(hostile_key.clone(), b"trickery".to_vec())]),
    )
    .await
    .unwrap();
    // "work" still holds exactly its one genuine job.
    assert_eq!(q.stats("work").await.unwrap().pending, 1);
    let claimed = q
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(claimed.payload, b"payload");
    // The hostile key remains readable as plain user data.
    let stored = q.kv_get(&hostile_key).await.unwrap();
    assert_eq!(stored.as_deref(), Some(b"trickery".as_slice()));
    q.close().await.unwrap();
}
#[tokio::test]
async fn test_kv_delete_removes_value() {
    // kv_delete must remove a value previously written via enqueue_with_kv.
    let q = Queue::open(make_store(), "test").await.unwrap();
    q.enqueue_with_kv(
        "work",
        b"x".to_vec(),
        EnqueueOptions::default(),
        HashMap::from([(b"runs/xyz".to_vec(), b"active".to_vec())]),
    )
    .await
    .unwrap();
    let before = q.kv_get(b"runs/xyz").await.unwrap();
    assert_eq!(before.as_deref(), Some(b"active".as_slice()));
    q.kv_delete(b"runs/xyz").await.unwrap();
    let after = q.kv_get(b"runs/xyz").await.unwrap();
    assert!(after.is_none());
    q.close().await.unwrap();
}
}