use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use slatedb::object_store::ObjectStore;
use slatedb::{Db, IsolationLevel};
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tracing::{debug, instrument, warn};
use ulid::Ulid;
use crate::error::{Error, Result};
use crate::job::{JobRecord, JobStatus};
use crate::reaper::{reap_expired, sweep_dead, sweep_done};
use crate::scheduler::{promote_due_jobs, schedule_loop};
use crate::stats::{CounterMergeOperator, QueueStats, read_stats, update_stats};
/// Attempt limit used by `QueueConfig::default()`.
const DEFAULT_MAX_ATTEMPTS: u32 = 3;
/// Lease length used by `QueueConfig::default()`.
const DEFAULT_LEASE_DURATION: Duration = Duration::from_secs(30);
/// Smallest of the three priority presets.
///
/// `pending_key` zero-pads the priority into the key and `Queue::claim` takes the first key of
/// a pending-prefix scan, so numerically smaller priorities are presumably served first
/// (assumes the store scans keys in byte order — confirm against the storage engine).
pub const PRIORITY_HIGH: u32 = 100;
/// Priority preset used by `QueueConfig::default()`.
pub const PRIORITY_NORMAL: u32 = 1_000;
/// Largest of the three priority presets.
pub const PRIORITY_LOW: u32 = 10_000;
/// Returns the current wall-clock time as whole milliseconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
pub(crate) fn now_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system time before epoch");
    // `as_millis` yields a u128; timestamps are handled as u64 throughout this file.
    since_epoch.as_millis() as u64
}
/// Builds the key of a pending job: `pending:<queue>:<priority>:<id>`.
///
/// The priority is zero-padded to ten digits so keys of one queue compare by priority first
/// and by id second when compared as strings.
pub(crate) fn pending_key(queue: &str, priority: u32, id: &str) -> String {
    format!("pending:{queue}:{priority:010}:{id}")
}
/// Returns the key prefix shared by every pending job of `queue`: `pending:<queue>:`.
pub(crate) fn pending_prefix(queue: &str) -> String {
    let mut prefix = String::with_capacity("pending:".len() + queue.len() + 1);
    prefix.push_str("pending:");
    prefix.push_str(queue);
    prefix.push(':');
    prefix
}
/// Builds the key of a dead-lettered job: `dead:<queue>:<id>`.
pub(crate) fn dead_key(queue: &str, id: &str) -> String {
    ["dead", queue, id].join(":")
}
/// Builds the key of a claimed job: `claimed:<lease_expires_at>:<queue>:<id>`.
///
/// The expiry is zero-padded to twenty digits (the width of `u64::MAX`), so claimed keys
/// compare by lease expiry first when compared as strings.
pub(crate) fn claimed_key(queue: &str, lease_expires_at: u64, id: &str) -> String {
    format!("claimed:{lease_expires_at:020}:{queue}:{id}")
}
/// Builds the key of a completed job: `done:<queue>:<id>`.
pub(crate) fn done_key(queue: &str, id: &str) -> String {
    ["done", queue, id].join(":")
}
/// Builds the key of a scheduled job: `scheduled:<run_at>:<queue>:<id>`.
///
/// `run_at` is zero-padded to twenty digits, so scheduled keys compare by due time first when
/// compared as strings.
pub(crate) fn scheduled_key(queue: &str, run_at: u64, id: &str) -> String {
    format!("scheduled:{run_at:020}:{queue}:{id}")
}
/// Builds the key of the per-job index entry: `jobindex:<id>`.
///
/// Elsewhere in this file the value stored under this key is the job's current record key.
pub(crate) fn job_index_key(id: &str) -> String {
    const PREFIX: &str = "jobindex:";
    let mut key = String::with_capacity(PREFIX.len() + id.len());
    key.push_str(PREFIX);
    key.push_str(id);
    key
}
/// Builds the key of a deduplication index entry: `dedup:<queue>:<key>`.
pub(crate) fn dedup_index_key(queue: &str, key: &str) -> String {
    ["dedup", queue, key].join(":")
}
/// Computes the exponential retry delay for a job that has been attempted `attempts` times.
///
/// The delay is `base * 2^(attempts - 1)`, capped at `max`. Attempt counts of 0 and 1 both
/// yield `base`. A zero `base` disables backoff and always returns `Duration::ZERO`. All
/// arithmetic saturates, so large attempt counts cannot overflow.
pub(crate) fn backoff_delay(attempts: u32, base: Duration, max: Duration) -> Duration {
    if base.is_zero() {
        return Duration::ZERO;
    }
    let exponent = attempts.saturating_sub(1);
    // For base 2, a failed checked power is exactly the saturated value.
    let factor = 2u32.checked_pow(exponent).unwrap_or(u32::MAX);
    let uncapped = base.saturating_mul(factor);
    std::cmp::min(uncapped, max)
}
/// Per-queue behaviour settings.
///
/// `Queue` looks these up by queue name and falls back to its default config for queues
/// without an explicit entry.
#[derive(Debug, Clone)]
pub struct QueueConfig {
/// Attempt limit applied to jobs enqueued without an explicit `max_attempts`; `Queue::nack`
/// dead-letters a job once its attempt count reaches the job's limit.
pub max_attempts: u32,
/// Lease length used by `Queue::claim_next`.
pub lease_duration: Duration,
/// Priority applied to jobs enqueued without an explicit priority.
pub default_priority: u32,
/// Base delay passed to `backoff_delay` when a job is nacked; zero re-queues immediately.
pub retry_backoff_base: Duration,
/// Upper bound passed to `backoff_delay` for the retry delay.
pub retry_backoff_max: Duration,
}
impl Default for QueueConfig {
fn default() -> Self {
Self {
max_attempts: DEFAULT_MAX_ATTEMPTS,
lease_duration: DEFAULT_LEASE_DURATION,
default_priority: PRIORITY_NORMAL,
retry_backoff_base: Duration::from_secs(1),
retry_backoff_max: Duration::from_secs(300),
}
}
}
/// Options accepted by `Queue::open_with_options`.
///
/// Derives `Debug` and `Clone` like the other public option structs in this file, so a
/// configuration can be logged and reused for several queues.
#[derive(Debug, Clone)]
pub struct OpenOptions {
    /// Interval handed to the background reaper task.
    pub reaper_interval: Duration,
    /// Interval handed to the background scheduler task.
    pub scheduler_interval: Duration,
    /// Configuration used for queues that have no entry in `queue_configs`.
    pub default_queue_config: QueueConfig,
    /// Per-queue configuration overrides, keyed by queue name.
    pub queue_configs: HashMap<String, QueueConfig>,
    /// When `Some`, `Queue::ack` keeps a "done" record for acknowledged jobs; the duration is
    /// also handed to the reaper task (presumably as the retention period — confirm in the
    /// reaper module).
    pub keep_done_jobs: Option<Duration>,
    /// Handed to the reaper task (presumably the retention period for dead-lettered jobs —
    /// confirm in the reaper module).
    pub dead_retention: Option<Duration>,
}
impl Default for OpenOptions {
fn default() -> Self {
Self {
reaper_interval: Duration::from_secs(5),
scheduler_interval: Duration::from_secs(1),
default_queue_config: QueueConfig::default(),
queue_configs: HashMap::new(),
keep_done_jobs: None,
dead_retention: Some(Duration::from_secs(7 * 24 * 3600)),
}
}
}
/// Per-job overrides accepted by `Queue::enqueue_with`.
#[derive(Debug, Clone, Default)]
pub struct EnqueueOptions {
/// Attempt limit for this job; `None` uses the queue's configured `max_attempts`.
pub max_attempts: Option<u32>,
/// Priority for this job; `None` uses the queue's configured `default_priority`.
pub priority: Option<u32>,
/// Earliest time the job should run. A time strictly in the future stores the job as
/// scheduled; `None` or a time that is not in the future stores it as pending.
pub run_at: Option<std::time::SystemTime>,
/// When set, the job is only written if no dedup index entry exists for this key in the
/// queue; otherwise the id recorded under the existing entry is returned instead.
pub dedup_key: Option<String>,
/// Headers copied onto the job record as-is.
pub headers: HashMap<String, String>,
}
/// Handle to a job queue stored in a SlateDB database, plus the background tasks serving it.
pub struct Queue {
// Shared database handle; also cloned into the reaper and scheduler tasks.
db: Arc<Db>,
// Set to `true` by `close` to ask the reaper task to stop.
reaper_shutdown: watch::Sender<bool>,
// Join handle of the spawned reaper task; awaited by `close`.
reaper_handle: JoinHandle<()>,
// Set to `true` by `close` to ask the scheduler task to stop.
scheduler_shutdown: watch::Sender<bool>,
// Join handle of the spawned scheduler task; awaited by `close`.
scheduler_handle: JoinHandle<()>,
// Fallback configuration for queues without an entry in `queue_configs`.
default_queue_config: QueueConfig,
// Per-queue configuration overrides, keyed by queue name.
queue_configs: HashMap<String, QueueConfig>,
// When `Some`, `ack` writes a "done" record instead of dropping the job entirely.
keep_done_jobs: Option<Duration>,
// Notified whenever this handle makes a job claimable; clones are also handed to the
// reaper and scheduler tasks.
job_available: Arc<tokio::sync::Notify>,
}
impl Queue {
/// Opens the queue stored at `path` in `object_store` using `OpenOptions::default()`.
///
/// # Errors
///
/// Returns whatever `Queue::open_with_options` returns.
pub async fn open(object_store: Arc<dyn ObjectStore>, path: &str) -> Result<Self> {
    let defaults = OpenOptions::default();
    Self::open_with_options(object_store, path, defaults).await
}
pub async fn open_with_options(
object_store: Arc<dyn ObjectStore>,
path: &str,
opts: OpenOptions,
) -> Result<Self> {
let db = Arc::new(
Db::builder(path, object_store)
.with_merge_operator(Arc::new(CounterMergeOperator))
.build()
.await?,
);
let job_available = Arc::new(tokio::sync::Notify::new());
let (reaper_shutdown, reaper_rx) = watch::channel(false);
let reaper_handle = tokio::spawn(crate::reaper::reap_loop(
db.clone(),
opts.reaper_interval,
opts.keep_done_jobs,
opts.dead_retention,
job_available.clone(),
reaper_rx,
));
let (scheduler_shutdown, scheduler_rx) = watch::channel(false);
let scheduler_handle = tokio::spawn(schedule_loop(
db.clone(),
opts.scheduler_interval,
job_available.clone(),
scheduler_rx,
));
Ok(Self {
db,
reaper_shutdown,
reaper_handle,
scheduler_shutdown,
scheduler_handle,
default_queue_config: opts.default_queue_config,
queue_configs: opts.queue_configs,
keep_done_jobs: opts.keep_done_jobs,
job_available,
})
}
/// Returns the configuration for `queue`, falling back to the default configuration when the
/// queue has no explicit entry.
fn queue_config(&self, queue: &str) -> &QueueConfig {
    match self.queue_configs.get(queue) {
        Some(specific) => specific,
        None => &self.default_queue_config,
    }
}
/// Returns the lease duration configured for `queue` (or the default one).
pub fn queue_lease_duration(&self, queue: &str) -> Duration {
    let cfg = self.queue_config(queue);
    cfg.lease_duration
}
/// Enqueues `payload` on `queue` with default options and returns the new job id.
///
/// # Errors
///
/// Returns whatever `Queue::enqueue_with` returns.
pub async fn enqueue(&self, queue: &str, payload: Vec<u8>) -> Result<String> {
    let opts = EnqueueOptions::default();
    self.enqueue_with(queue, payload, opts).await
}
#[instrument(skip(self, payload), fields(queue, job_id))]
pub async fn enqueue_with(
&self,
queue: &str,
payload: Vec<u8>,
opts: EnqueueOptions,
) -> Result<String> {
let cfg = self.queue_config(queue);
let max_attempts = opts.max_attempts.unwrap_or(cfg.max_attempts);
let priority = opts.priority.unwrap_or(cfg.default_priority);
let run_at = opts.run_at.and_then(|when| {
let ms = when
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
(ms > now_ms()).then_some(ms)
});
let id = Ulid::new().to_string();
let (status, key) = match run_at {
Some(ms) => (JobStatus::Scheduled, scheduled_key(queue, ms, &id)),
None => (JobStatus::Pending, pending_key(queue, priority, &id)),
};
let job = JobRecord {
id,
queue: queue.to_string(),
payload,
headers: opts.headers,
status,
attempts: 0,
max_attempts,
enqueued_at: now_ms(),
claimed_at: None,
lease_expires_at: None,
run_at,
priority,
last_error: None,
dedup_key: opts.dedup_key.clone(),
completed_at: None,
failed_at: None,
};
match opts.dedup_key {
Some(dk) => self.write_unique(job, key, dk).await,
None => self.write_new(job, key).await,
}
}
/// Writes a brand-new job under `key` in one transaction and returns its id.
///
/// The transaction stores the serialized record, points the job index at `key`, and bumps the
/// queue's counter for the job's status. Waiters are notified only when the job is pending.
async fn write_new(&self, job: JobRecord, key: String) -> Result<String> {
    let encoded = rmp_serde::to_vec_named(&job)?;
    let JobRecord {
        id,
        queue,
        status,
        priority,
        run_at,
        ..
    } = job;
    let index_key = job_index_key(&id);

    let txn = self.db.begin(IsolationLevel::Snapshot).await?;
    txn.put(key.as_bytes(), &encoded)?;
    txn.put(index_key.as_bytes(), key.as_bytes())?;
    update_stats(&txn, &queue, &[(status, 1)])?;
    txn.commit().await?;

    if let JobStatus::Pending = status {
        self.job_available.notify_waiters();
    }
    debug!(queue = %queue, job_id = %id, priority, ?run_at, "job enqueued");
    Ok(id)
}
async fn write_unique(&self, job: JobRecord, key: String, dedup_key: String) -> Result<String> {
let dkey = dedup_index_key(&job.queue, &dedup_key);
let value = rmp_serde::to_vec_named(&job)?;
let JobRecord {
id, queue, status, ..
} = job;
loop {
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
if let Some(bytes) = txn.get(dkey.as_bytes()).await? {
txn.rollback();
return String::from_utf8(bytes.to_vec()).map_err(|_| Error::InvalidState);
}
txn.put(key.as_bytes(), &value)?;
txn.put(job_index_key(&id).as_bytes(), key.as_bytes())?;
txn.put(dkey.as_bytes(), id.as_bytes())?;
update_stats(&txn, &queue, &[(status, 1)])?;
match txn.commit().await {
Ok(_) => {
if matches!(status, JobStatus::Pending) {
self.job_available.notify_waiters();
}
debug!(queue = %queue, job_id = %id, dedup_key, "unique job enqueued");
return Ok(id);
}
Err(e) if e.kind() == slatedb::ErrorKind::Transaction => continue,
Err(e) => return Err(e.into()),
}
}
}
/// Claims the next pending job of `queue` using the queue's configured lease duration.
///
/// Returns `Ok(None)` when the queue has no pending job.
pub async fn claim_next(&self, queue: &str) -> Result<Option<JobRecord>> {
    let lease = self.queue_lease_duration(queue);
    self.claim(queue, lease).await
}
/// Waits until a "job available" notification arrives or `max_wait` elapses, whichever is
/// first. The caller cannot tell which of the two happened.
pub async fn wait_for_jobs(&self, max_wait: Duration) {
    let wakeup = self.job_available.notified();
    let deadline = tokio::time::sleep(max_wait);
    tokio::pin!(wakeup);
    tokio::select! {
        _ = &mut wakeup => {}
        _ = deadline => {}
    }
}
/// Tries to claim a job and, if none is pending, waits up to `max_wait` for a notification
/// before trying exactly once more.
///
/// The notification future is created before the first claim attempt, so a job enqueued
/// between that attempt and the wait is not missed. Returns `Ok(None)` if the wait times out,
/// or if the second attempt finds nothing.
pub async fn claim_with_wait(
    &self,
    queue: &str,
    lease_duration: Duration,
    max_wait: Duration,
) -> Result<Option<JobRecord>> {
    let wakeup = self.job_available.notified();
    tokio::pin!(wakeup);

    let first_try = self.claim(queue, lease_duration).await?;
    if first_try.is_some() {
        return Ok(first_try);
    }

    tokio::select! {
        _ = &mut wakeup => {}
        _ = tokio::time::sleep(max_wait) => return Ok(None),
    }
    self.claim(queue, lease_duration).await
}
/// Claims the first pending job of `queue` for `lease_duration`.
///
/// In one transaction the pending record is replaced by a claimed record keyed by its lease
/// expiry, the job index is repointed, the job's dedup entry (if any) is released, and the
/// pending/claimed counters are adjusted. A commit that fails with a transaction error is
/// retried with a fresh scan.
///
/// Returns `Ok(None)` when no pending job exists. The returned record has `status` set to
/// `Claimed`, `attempts` incremented, and `dedup_key` cleared.
///
/// The span records `queue` explicitly; a bare `fields(queue)` would declare an empty field
/// and suppress the automatic recording of the argument.
///
/// # Errors
///
/// Propagates storage and (de)serialization errors.
#[instrument(skip(self), fields(queue = %queue))]
pub async fn claim(&self, queue: &str, lease_duration: Duration) -> Result<Option<JobRecord>> {
    let prefix = pending_prefix(queue);
    // Clamp rather than truncate so an oversized lease cannot wrap into a tiny one.
    let lease_ms = lease_duration.as_millis().min(u128::from(u64::MAX)) as u64;
    loop {
        let txn = self.db.begin(IsolationLevel::Snapshot).await?;
        let mut iter = txn.scan_prefix(prefix.as_bytes()).await?;
        let kv = match iter.next().await? {
            Some(kv) => kv,
            None => return Ok(None),
        };
        drop(iter);
        let pending_key_bytes = kv.key.clone();
        let mut job: JobRecord = rmp_serde::from_slice(&kv.value)?;
        let now = now_ms();
        // Saturate so `now + lease` can never overflow; u64::MAX still fits the 20-digit key.
        let lease_expires_at = now.saturating_add(lease_ms);
        job.status = JobStatus::Claimed;
        job.claimed_at = Some(now);
        job.lease_expires_at = Some(lease_expires_at);
        job.attempts = job.attempts.saturating_add(1);
        // The dedup entry is released at claim time: the key is taken off the record and its
        // index entry is deleted in the same transaction.
        let dedup_key_to_release = job.dedup_key.take();
        let claimed = claimed_key(&job.queue, lease_expires_at, &job.id);
        let value = rmp_serde::to_vec_named(&job)?;
        txn.delete(&pending_key_bytes)?;
        txn.put(claimed.as_bytes(), &value)?;
        txn.put(job_index_key(&job.id).as_bytes(), claimed.as_bytes())?;
        if let Some(dk) = dedup_key_to_release.as_deref() {
            txn.delete(dedup_index_key(&job.queue, dk).as_bytes())?;
        }
        update_stats(
            &txn,
            queue,
            &[(JobStatus::Pending, -1), (JobStatus::Claimed, 1)],
        )?;
        match txn.commit().await {
            Ok(_) => {
                debug!(queue = queue, job_id = %job.id, attempt = job.attempts, "job claimed");
                return Ok(Some(job));
            }
            Err(e) if e.kind() == slatedb::ErrorKind::Transaction => {
                warn!(queue = queue, "claim transaction conflict, retrying");
                continue;
            }
            Err(e) => return Err(e.into()),
        }
    }
}
#[instrument(skip(self, job), fields(queue = %job.queue, job_id = %job.id))]
pub async fn ack(&self, job: &JobRecord) -> Result<()> {
let lease_expires_at = job.lease_expires_at.ok_or(Error::InvalidState)?;
let claimed = claimed_key(&job.queue, lease_expires_at, &job.id);
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
txn.delete(claimed.as_bytes())?;
if self.keep_done_jobs.is_some() {
let mut done_job = job.clone();
done_job.status = JobStatus::Done;
done_job.completed_at = Some(now_ms());
let value = rmp_serde::to_vec_named(&done_job)?;
let done = done_key(&job.queue, &job.id);
txn.put(done.as_bytes(), &value)?;
txn.put(job_index_key(&job.id).as_bytes(), done.as_bytes())?;
} else {
txn.delete(job_index_key(&job.id).as_bytes())?;
}
update_stats(
&txn,
&job.queue,
&[(JobStatus::Claimed, -1), (JobStatus::Done, 1)],
)?;
txn.commit().await?;
debug!(queue = %job.queue, job_id = %job.id, "job acked");
Ok(())
}
#[instrument(skip(self, job), fields(queue = %job.queue, job_id = %job.id))]
pub async fn nack(&self, mut job: JobRecord, error: &str) -> Result<()> {
let lease_expires_at = job.lease_expires_at.ok_or(Error::InvalidState)?;
let claimed = claimed_key(&job.queue, lease_expires_at, &job.id);
job.last_error = Some(error.to_string());
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
txn.delete(claimed.as_bytes())?;
if job.attempts >= job.max_attempts {
job.status = JobStatus::Dead;
job.failed_at = Some(now_ms());
let dead = dead_key(&job.queue, &job.id);
let value = rmp_serde::to_vec_named(&job)?;
txn.put(dead.as_bytes(), &value)?;
txn.put(job_index_key(&job.id).as_bytes(), dead.as_bytes())?;
update_stats(
&txn,
&job.queue,
&[(JobStatus::Claimed, -1), (JobStatus::Dead, 1)],
)?;
warn!(
queue = %job.queue,
job_id = %job.id,
attempts = job.attempts,
"job dead-lettered"
);
} else {
let cfg = self.queue_config(&job.queue);
let backoff =
backoff_delay(job.attempts, cfg.retry_backoff_base, cfg.retry_backoff_max);
job.claimed_at = None;
job.lease_expires_at = None;
if backoff.is_zero() {
job.status = JobStatus::Pending;
let priority = job.priority;
let pending = pending_key(&job.queue, priority, &job.id);
let value = rmp_serde::to_vec_named(&job)?;
txn.put(pending.as_bytes(), &value)?;
txn.put(job_index_key(&job.id).as_bytes(), pending.as_bytes())?;
update_stats(
&txn,
&job.queue,
&[(JobStatus::Pending, 1), (JobStatus::Claimed, -1)],
)?;
debug!(
queue = %job.queue,
job_id = %job.id,
attempts = job.attempts,
"job re-queued"
);
} else {
let run_at = now_ms() + backoff.as_millis() as u64;
job.status = JobStatus::Scheduled;
job.run_at = Some(run_at);
let scheduled = scheduled_key(&job.queue, run_at, &job.id);
let value = rmp_serde::to_vec_named(&job)?;
txn.put(scheduled.as_bytes(), &value)?;
txn.put(job_index_key(&job.id).as_bytes(), scheduled.as_bytes())?;
update_stats(
&txn,
&job.queue,
&[(JobStatus::Claimed, -1), (JobStatus::Scheduled, 1)],
)?;
debug!(
queue = %job.queue,
job_id = %job.id,
attempts = job.attempts,
backoff_ms = backoff.as_millis() as u64,
"job scheduled for retry"
);
}
}
let immediate_retry = matches!(job.status, JobStatus::Pending);
txn.commit().await?;
if immediate_retry {
self.job_available.notify_waiters();
}
Ok(())
}
/// Reads the per-status counters of `queue`.
pub async fn stats(&self, queue: &str) -> Result<QueueStats> {
    let db = &self.db;
    read_stats(db, queue).await
}
/// Lists the names of all queues that have at least one `stats:` entry, in scan order and
/// without duplicates.
///
/// Each key is expected to look like `stats:<queue>:<suffix>`; the queue name is everything
/// between the prefix and the last `:`. Keys that are not valid UTF-8, or that have no `:`
/// after the prefix, are ignored.
pub async fn list_queues(&self) -> Result<Vec<String>> {
    let mut known = std::collections::HashSet::new();
    let mut names = Vec::new();
    let mut entries = self.db.scan_prefix(b"stats:").await?;
    while let Some(entry) = entries.next().await? {
        let Ok(key) = std::str::from_utf8(&entry.key) else {
            continue;
        };
        let tail = key.strip_prefix("stats:").unwrap_or(key);
        let name = match tail.rsplit_once(':') {
            Some((name, _suffix)) => name,
            None => continue,
        };
        if !known.contains(name) {
            known.insert(name.to_string());
            names.push(name.to_string());
        }
    }
    Ok(names)
}
/// Returns up to `limit` dead-lettered jobs of `queue`, in key order.
///
/// When `after` is given, only jobs whose id is strictly greater than it (string comparison)
/// are returned, so passing the last id of one page fetches the next. A `limit` of zero
/// returns an empty list without touching the store.
///
/// The result vector is pre-sized to at most a small fixed capacity. `limit` comes from the
/// caller, and reserving it verbatim would panic with a capacity overflow (or reserve an
/// absurd amount of memory) for "give me everything" values such as `usize::MAX`.
///
/// # Errors
///
/// Propagates storage and deserialization errors.
pub async fn dead_jobs(
    &self,
    queue: &str,
    after: Option<&str>,
    limit: usize,
) -> Result<Vec<JobRecord>> {
    const MAX_PREALLOCATED_JOBS: usize = 1024;
    if limit == 0 {
        return Ok(Vec::new());
    }
    let prefix = format!("dead:{}:", queue);
    let mut jobs = Vec::with_capacity(limit.min(MAX_PREALLOCATED_JOBS));
    let mut iter = self.db.scan_prefix(prefix.as_bytes()).await?;
    while let Some(kv) = iter.next().await? {
        if let Some(after_id) = after {
            // The id is the last `:`-separated component; a non-UTF-8 key yields "" and is
            // therefore skipped by the comparison below.
            let key_str = std::str::from_utf8(&kv.key).unwrap_or("");
            let id = key_str.rsplit(':').next().unwrap_or("");
            if id <= after_id {
                continue;
            }
        }
        let job: JobRecord = rmp_serde::from_slice(&kv.value)?;
        jobs.push(job);
        if jobs.len() >= limit {
            break;
        }
    }
    Ok(jobs)
}
/// Moves a dead-lettered job back to pending with a clean slate.
///
/// Attempts, last error, claim/lease timestamps and `failed_at` are reset; priority is kept.
/// The dead record is replaced by a pending record, the job index repointed and the
/// pending/dead counters adjusted in one transaction; waiters are notified afterwards.
///
/// The dead record must still exist in the store. Without that check, re-queuing the same
/// `JobRecord` twice (or after it was swept) would create a duplicate job and drive the dead
/// counter negative.
///
/// # Errors
///
/// Returns `Error::InvalidState` if `job.status` is not `Dead` or the dead record no longer
/// exists. Propagates storage and serialization errors.
#[instrument(skip(self, job), fields(queue = %job.queue, job_id = %job.id))]
pub async fn requeue_dead_job(&self, mut job: JobRecord) -> Result<()> {
    if job.status != JobStatus::Dead {
        return Err(Error::InvalidState);
    }
    let dead = dead_key(&job.queue, &job.id);
    let priority = job.priority;
    job.status = JobStatus::Pending;
    job.attempts = 0;
    job.last_error = None;
    job.claimed_at = None;
    job.lease_expires_at = None;
    job.failed_at = None;
    let pending = pending_key(&job.queue, priority, &job.id);
    let value = rmp_serde::to_vec_named(&job)?;
    let txn = self.db.begin(IsolationLevel::Snapshot).await?;
    // Only resurrect a job that is actually still dead-lettered.
    if txn.get(dead.as_bytes()).await?.is_none() {
        txn.rollback();
        return Err(Error::InvalidState);
    }
    txn.delete(dead.as_bytes())?;
    txn.put(pending.as_bytes(), &value)?;
    txn.put(job_index_key(&job.id).as_bytes(), pending.as_bytes())?;
    update_stats(
        &txn,
        &job.queue,
        &[(JobStatus::Pending, 1), (JobStatus::Dead, -1)],
    )?;
    txn.commit().await?;
    self.job_available.notify_waiters();
    debug!(queue = %job.queue, job_id = %job.id, "dead job re-queued");
    Ok(())
}
#[instrument(skip(self, job), fields(queue = %job.queue, job_id = %job.id))]
pub async fn renew_lease(&self, job: &mut JobRecord, extension: Duration) -> Result<()> {
let old_expiry = job.lease_expires_at.ok_or(Error::InvalidState)?;
let old_claimed = claimed_key(&job.queue, old_expiry, &job.id);
let new_expiry = now_ms() + extension.as_millis() as u64;
job.lease_expires_at = Some(new_expiry);
let new_claimed = claimed_key(&job.queue, new_expiry, &job.id);
let value = rmp_serde::to_vec_named(job)?;
let txn = self.db.begin(IsolationLevel::Snapshot).await?;
txn.delete(old_claimed.as_bytes())?;
txn.put(new_claimed.as_bytes(), &value)?;
txn.put(job_index_key(&job.id).as_bytes(), new_claimed.as_bytes())?;
txn.commit().await?;
debug!(queue = %job.queue, job_id = %job.id, new_expiry, "lease renewed");
Ok(())
}
pub async fn get_job(&self, id: &str) -> Result<Option<JobRecord>> {
let index_key = job_index_key(id);
let current_key = match self.db.get(index_key.as_bytes()).await? {
None => return Ok(None),
Some(bytes) => match String::from_utf8(bytes.to_vec()) {
Ok(s) => s,
Err(_) => return Err(Error::InvalidState),
},
};
match self.db.get(current_key.as_bytes()).await? {
None => Ok(None),
Some(bytes) => Ok(Some(rmp_serde::from_slice(&bytes)?)),
}
}
/// Cancels a job that has not started yet.
///
/// Only pending and scheduled jobs can be cancelled. Their record, job index entry and dedup
/// entry (if any) are deleted and the matching counter is decremented in one transaction. A
/// commit that fails with a transaction error is retried from a fresh read.
///
/// Returns `Ok(true)` if the job was cancelled, and `Ok(false)` if the id is unknown, the
/// record is missing, or the job is in any other state.
///
/// # Errors
///
/// Returns `Error::InvalidState` if the index entry is not valid UTF-8. Propagates storage and
/// deserialization errors.
pub async fn cancel(&self, id: &str) -> Result<bool> {
    loop {
        let txn = self.db.begin(IsolationLevel::Snapshot).await?;
        let index_key = job_index_key(id);

        let pointer = match txn.get(index_key.as_bytes()).await? {
            Some(bytes) => bytes,
            None => {
                txn.rollback();
                return Ok(false);
            }
        };
        let record_key = match String::from_utf8(pointer.to_vec()) {
            Ok(key) => key,
            Err(_) => {
                txn.rollback();
                return Err(Error::InvalidState);
            }
        };
        let job: JobRecord = match txn.get(record_key.as_bytes()).await? {
            Some(bytes) => rmp_serde::from_slice(&bytes)?,
            None => {
                txn.rollback();
                return Ok(false);
            }
        };

        // Remember which counter to decrement; anything already started is left alone.
        let counted_as = match job.status {
            JobStatus::Scheduled => JobStatus::Scheduled,
            JobStatus::Pending => JobStatus::Pending,
            _ => {
                txn.rollback();
                return Ok(false);
            }
        };

        txn.delete(record_key.as_bytes())?;
        txn.delete(index_key.as_bytes())?;
        if let Some(dk) = job.dedup_key.as_ref() {
            txn.delete(dedup_index_key(&job.queue, dk).as_bytes())?;
        }
        update_stats(&txn, &job.queue, &[(counted_as, -1)])?;

        match txn.commit().await {
            Ok(_) => {
                debug!(job_id = %id, "job cancelled");
                return Ok(true);
            }
            Err(e) if e.kind() == slatedb::ErrorKind::Transaction => continue,
            Err(e) => return Err(e.into()),
        }
    }
}
/// Enqueues every payload on `queue` as a pending job in a single transaction and returns the
/// new ids in input order.
///
/// All jobs use the queue's configured attempt limit and default priority, share one
/// `enqueued_at` timestamp, and carry no headers or dedup key. Ids come from one monotonic
/// ULID generator. An empty input returns an empty list without touching the store.
///
/// # Panics
///
/// Panics if the ULID generator reports an overflow.
///
/// # Errors
///
/// Propagates storage and serialization errors.
pub async fn enqueue_batch(&self, queue: &str, payloads: Vec<Vec<u8>>) -> Result<Vec<String>> {
    if payloads.is_empty() {
        return Ok(Vec::new());
    }
    let cfg = self.queue_config(queue);
    let (max_attempts, priority) = (cfg.max_attempts, cfg.default_priority);
    let enqueued_at = now_ms();
    let mut ids = Vec::with_capacity(payloads.len());

    let txn = self.db.begin(IsolationLevel::Snapshot).await?;
    let mut ulids = ulid::Generator::new();
    for payload in payloads {
        let id = ulids
            .generate()
            .expect("monotonic ULID generator overflowed within one ms")
            .to_string();
        let key = pending_key(queue, priority, &id);
        let record = JobRecord {
            id: id.clone(),
            queue: queue.to_string(),
            payload,
            headers: HashMap::new(),
            status: JobStatus::Pending,
            attempts: 0,
            max_attempts,
            enqueued_at,
            claimed_at: None,
            lease_expires_at: None,
            run_at: None,
            priority,
            last_error: None,
            dedup_key: None,
            completed_at: None,
            failed_at: None,
        };
        let encoded = rmp_serde::to_vec_named(&record)?;
        txn.put(key.as_bytes(), &encoded)?;
        txn.put(job_index_key(&id).as_bytes(), key.as_bytes())?;
        ids.push(id);
    }
    let added = ids.len() as i64;
    update_stats(&txn, queue, &[(JobStatus::Pending, added)])?;
    txn.commit().await?;

    self.job_available.notify_waiters();
    debug!(queue = queue, count = ids.len(), "batch enqueued");
    Ok(ids)
}
/// Runs one reaper pass immediately and notifies waiters if it reports a non-zero count.
pub async fn reap_now(&self) -> Result<()> {
    let reaped = reap_expired(&self.db).await?;
    let anything_reaped = reaped > 0;
    if anything_reaped {
        self.job_available.notify_waiters();
    }
    Ok(())
}
/// Runs one scheduler pass immediately and notifies waiters if it reports a non-zero count.
pub async fn promote_scheduled_now(&self) -> Result<()> {
    let promoted = promote_due_jobs(&self.db).await?;
    let anything_promoted = promoted > 0;
    if anything_promoted {
        self.job_available.notify_waiters();
    }
    Ok(())
}
/// Runs the done-job sweep immediately with the given `retention`.
pub async fn sweep_done_now(&self, retention: Duration) -> Result<()> {
    let db = &self.db;
    sweep_done(db, retention).await
}
/// Runs the dead-job sweep immediately with the given `retention`.
pub async fn sweep_dead_now(&self, retention: Duration) -> Result<()> {
    let db = &self.db;
    sweep_dead(db, retention).await
}
/// Stops both background tasks and closes the database.
///
/// Both shutdown signals are sent before either task is awaited, so the reaper and the
/// scheduler wind down concurrently. Signalling the scheduler only after the reaper had fully
/// stopped made `close` wait for the two tasks back to back.
///
/// Failures to signal or join a task are ignored on purpose: a task that has already exited
/// must not prevent the database from being closed.
///
/// # Errors
///
/// Returns an error if closing the database fails.
pub async fn close(self) -> Result<()> {
    let _ = self.reaper_shutdown.send(true);
    let _ = self.scheduler_shutdown.send(true);
    let _ = self.reaper_handle.await;
    let _ = self.scheduler_handle.await;
    self.db.close().await?;
    Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use slatedb::object_store::memory::InMemory;
/// Builds a fresh in-memory object store, so every test starts from an empty database.
fn make_store() -> Arc<dyn ObjectStore> {
    let store = InMemory::new();
    Arc::new(store)
}
/// Default open options with retry backoff disabled, so a nacked job is immediately pending
/// again instead of being scheduled for later.
fn no_backoff_opts() -> OpenOptions {
    let mut opts = OpenOptions::default();
    opts.default_queue_config = QueueConfig {
        retry_backoff_base: Duration::ZERO,
        retry_backoff_max: Duration::ZERO,
        ..QueueConfig::default()
    };
    opts
}
/// A freshly enqueued job can be claimed and comes back fully populated.
#[tokio::test]
async fn test_enqueue_and_claim() {
    let lease = Duration::from_secs(30);
    let q = Queue::open(make_store(), "test").await.unwrap();
    let id = q.enqueue("email", b"hello".to_vec()).await.unwrap();

    let claimed = q.claim("email", lease).await.unwrap().unwrap();

    assert_eq!(claimed.id, id);
    assert_eq!(claimed.queue, "email");
    assert_eq!(claimed.payload, b"hello");
    assert_eq!(claimed.status, JobStatus::Claimed);
    assert_eq!(claimed.attempts, 1);
    assert!(claimed.claimed_at.is_some());
    assert!(claimed.lease_expires_at.is_some());
    q.close().await.unwrap();
}
/// Claiming from a queue with no jobs yields `None` rather than an error.
#[tokio::test]
async fn test_claim_empty_queue_returns_none() {
    let q = Queue::open(make_store(), "test").await.unwrap();
    let nothing = q.claim("email", Duration::from_secs(30)).await.unwrap();
    assert!(nothing.is_none());
    q.close().await.unwrap();
}
/// An acked job is no longer claimable.
#[tokio::test]
async fn test_ack_moves_job_to_done() {
    let lease = Duration::from_secs(30);
    let q = Queue::open(make_store(), "test").await.unwrap();
    q.enqueue("email", b"hello".to_vec()).await.unwrap();

    let claimed = q.claim("email", lease).await.unwrap().unwrap();
    q.ack(&claimed).await.unwrap();

    let next = q.claim("email", lease).await.unwrap();
    assert!(next.is_none());
    q.close().await.unwrap();
}
/// With backoff disabled, a nacked job is immediately claimable again and remembers the error.
#[tokio::test]
async fn test_nack_requeues_job() {
    let lease = Duration::from_secs(30);
    let q = Queue::open_with_options(make_store(), "test", no_backoff_opts())
        .await
        .unwrap();
    let enqueue_opts = EnqueueOptions {
        max_attempts: Some(3),
        ..Default::default()
    };
    q.enqueue_with("email", b"hello".to_vec(), enqueue_opts)
        .await
        .unwrap();

    let first = q.claim("email", lease).await.unwrap().unwrap();
    assert_eq!(first.attempts, 1);
    q.nack(first, "transient error").await.unwrap();

    let second = q.claim("email", lease).await.unwrap().unwrap();
    assert_eq!(second.attempts, 2);
    assert_eq!(second.last_error.as_deref(), Some("transient error"));
    assert_eq!(second.status, JobStatus::Claimed);
    q.close().await.unwrap();
}
/// Once the attempt limit is used up, a nacked job stops being claimable.
#[tokio::test]
async fn test_nack_dead_letters_after_max_attempts() {
    let lease = Duration::from_secs(30);
    let q = Queue::open_with_options(make_store(), "test", no_backoff_opts())
        .await
        .unwrap();
    let enqueue_opts = EnqueueOptions {
        max_attempts: Some(2),
        ..Default::default()
    };
    q.enqueue_with("email", b"hello".to_vec(), enqueue_opts)
        .await
        .unwrap();

    for _attempt in 0..2 {
        let claimed = q.claim("email", lease).await.unwrap().unwrap();
        q.nack(claimed, "persistent error").await.unwrap();
    }

    let next = q.claim("email", lease).await.unwrap();
    assert!(next.is_none());
    q.close().await.unwrap();
}
/// Jobs of equal priority are claimed in the order they were enqueued.
#[tokio::test]
async fn test_fifo_ordering() {
    let lease = Duration::from_secs(30);
    let q = Queue::open(make_store(), "test").await.unwrap();
    let id_a = q.enqueue("work", b"first".to_vec()).await.unwrap();
    let id_b = q.enqueue("work", b"second".to_vec()).await.unwrap();
    let id_c = q.enqueue("work", b"third".to_vec()).await.unwrap();

    let first = q.claim("work", lease).await.unwrap().unwrap();
    let second = q.claim("work", lease).await.unwrap().unwrap();
    let third = q.claim("work", lease).await.unwrap().unwrap();

    assert_eq!(first.id, id_a);
    assert_eq!(second.id, id_b);
    assert_eq!(third.id, id_c);
    q.close().await.unwrap();
}
/// Each named queue only ever hands out its own jobs.
#[tokio::test]
async fn test_queue_isolation() {
    let lease = Duration::from_secs(30);
    let q = Queue::open(make_store(), "test").await.unwrap();
    let id_email = q.enqueue("email", b"email job".to_vec()).await.unwrap();
    let id_resize = q.enqueue("resize", b"resize job".to_vec()).await.unwrap();

    let email_job = q.claim("email", lease).await.unwrap().unwrap();
    let resize_job = q.claim("resize", lease).await.unwrap().unwrap();
    assert_eq!(email_job.id, id_email);
    assert_eq!(resize_job.id, id_resize);

    let email_rest = q.claim("email", lease).await.unwrap();
    assert!(email_rest.is_none());
    let resize_rest = q.claim("resize", lease).await.unwrap();
    assert!(resize_rest.is_none());
    q.close().await.unwrap();
}
/// A job whose lease has expired becomes claimable again after a reaper pass.
#[tokio::test]
async fn test_reaper_requeues_expired_job() {
    let lease = Duration::from_secs(30);
    let q = Queue::open(make_store(), "test").await.unwrap();
    let enqueue_opts = EnqueueOptions {
        max_attempts: Some(3),
        ..Default::default()
    };
    q.enqueue_with("work", b"payload".to_vec(), enqueue_opts)
        .await
        .unwrap();

    // A zero-length lease is expired as soon as it is taken.
    let expired = q
        .claim("work", Duration::from_millis(0))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(expired.attempts, 1);
    let before_reap = q.claim("work", lease).await.unwrap();
    assert!(before_reap.is_none());

    q.reap_now().await.unwrap();

    let reclaimed = q.claim("work", lease).await.unwrap().unwrap();
    assert_eq!(reclaimed.id, expired.id);
    assert_eq!(reclaimed.attempts, 2);
    assert_eq!(reclaimed.status, JobStatus::Claimed);
    q.close().await.unwrap();
}
/// A job that keeps timing out stops being claimable once its attempts are exhausted.
#[tokio::test]
async fn test_reaper_dead_letters_after_max_attempts() {
    let no_lease = Duration::from_millis(0);
    let q = Queue::open(make_store(), "test").await.unwrap();
    let enqueue_opts = EnqueueOptions {
        max_attempts: Some(2),
        ..Default::default()
    };
    q.enqueue_with("work", b"payload".to_vec(), enqueue_opts)
        .await
        .unwrap();

    let _first = q.claim("work", no_lease).await.unwrap().unwrap();
    q.reap_now().await.unwrap();
    let _second = q.claim("work", no_lease).await.unwrap().unwrap();
    q.reap_now().await.unwrap();

    let next = q.claim("work", Duration::from_secs(30)).await.unwrap();
    assert!(next.is_none());
    q.close().await.unwrap();
}
/// A reaper pass leaves a job alone while its lease is still running.
#[tokio::test]
async fn test_reaper_skips_active_leases() {
    let long_lease = Duration::from_secs(300);
    let q = Queue::open(make_store(), "test").await.unwrap();
    q.enqueue("work", b"payload".to_vec()).await.unwrap();
    let held = q.claim("work", long_lease).await.unwrap().unwrap();

    q.reap_now().await.unwrap();

    let next = q.claim("work", long_lease).await.unwrap();
    assert!(next.is_none());
    q.ack(&held).await.unwrap();
    q.close().await.unwrap();
}
/// A job acked before the reaper runs is not brought back, even though its lease had expired.
#[tokio::test]
async fn test_reaper_ignores_already_acked_job() {
    let q = Queue::open(make_store(), "test").await.unwrap();
    q.enqueue("work", b"payload".to_vec()).await.unwrap();
    let finished = q
        .claim("work", Duration::from_millis(0))
        .await
        .unwrap()
        .unwrap();
    q.ack(&finished).await.unwrap();

    q.reap_now().await.unwrap();

    let next = q.claim("work", Duration::from_secs(30)).await.unwrap();
    assert!(next.is_none());
    q.close().await.unwrap();
}
/// Counters follow a job from pending through claimed to done.
#[tokio::test]
async fn test_stats_track_job_lifecycle() {
    let q = Queue::open(make_store(), "test").await.unwrap();
    q.enqueue("email", b"a".to_vec()).await.unwrap();
    q.enqueue("email", b"b".to_vec()).await.unwrap();

    let after_enqueue = q.stats("email").await.unwrap();
    assert_eq!(after_enqueue.pending, 2);
    assert_eq!(after_enqueue.claimed, 0);

    let claimed = q
        .claim("email", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    let after_claim = q.stats("email").await.unwrap();
    assert_eq!(after_claim.pending, 1);
    assert_eq!(after_claim.claimed, 1);

    q.ack(&claimed).await.unwrap();
    let after_ack = q.stats("email").await.unwrap();
    assert_eq!(after_ack.pending, 1);
    assert_eq!(after_ack.claimed, 0);
    assert_eq!(after_ack.done, 1);
    q.close().await.unwrap();
}
/// Dead-lettering through `nack` moves the count from claimed to dead.
#[tokio::test]
async fn test_stats_nack_dead_letter() {
    let q = Queue::open(make_store(), "test").await.unwrap();
    let enqueue_opts = EnqueueOptions {
        max_attempts: Some(1),
        ..Default::default()
    };
    q.enqueue_with("email", b"x".to_vec(), enqueue_opts)
        .await
        .unwrap();
    let claimed = q
        .claim("email", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();

    q.nack(claimed, "fail").await.unwrap();

    let counters = q.stats("email").await.unwrap();
    assert_eq!(counters.pending, 0);
    assert_eq!(counters.claimed, 0);
    assert_eq!(counters.dead, 1);
    q.close().await.unwrap();
}
/// Every queue that has received a job shows up in `list_queues`.
#[tokio::test]
async fn test_list_queues() {
    let q = Queue::open(make_store(), "test").await.unwrap();
    q.enqueue("alpha", b"1".to_vec()).await.unwrap();
    q.enqueue("beta", b"2".to_vec()).await.unwrap();
    q.enqueue("gamma", b"3".to_vec()).await.unwrap();

    let mut names = q.list_queues().await.unwrap();
    names.sort();

    assert_eq!(names, vec!["alpha", "beta", "gamma"]);
    q.close().await.unwrap();
}
/// A dead-lettered job is listed by `dead_jobs` and can be revived with a clean slate.
#[tokio::test]
async fn test_dead_jobs_and_requeue() {
    let lease = Duration::from_secs(30);
    let q = Queue::open(make_store(), "test").await.unwrap();
    let enqueue_opts = EnqueueOptions {
        max_attempts: Some(1),
        ..Default::default()
    };
    let id = q
        .enqueue_with("work", b"payload".to_vec(), enqueue_opts)
        .await
        .unwrap();
    let claimed = q.claim("work", lease).await.unwrap().unwrap();
    q.nack(claimed, "fatal").await.unwrap();

    let dead = q.dead_jobs("work", None, 100).await.unwrap();
    assert_eq!(dead.len(), 1);
    assert_eq!(dead[0].id, id);
    assert_eq!(dead[0].status, JobStatus::Dead);

    let corpse = dead.into_iter().next().unwrap();
    q.requeue_dead_job(corpse).await.unwrap();

    let revived = q.claim("work", lease).await.unwrap().unwrap();
    assert_eq!(revived.id, id);
    assert_eq!(revived.attempts, 1);
    assert!(revived.last_error.is_none());
    q.close().await.unwrap();
}
/// A per-queue configuration overrides the attempt limit and the lease length.
#[tokio::test]
async fn test_per_queue_config() {
    let fast_cfg = QueueConfig {
        max_attempts: 1,
        lease_duration: Duration::from_secs(5),
        ..QueueConfig::default()
    };
    let mut opts = OpenOptions::default();
    opts.queue_configs.insert("fast".to_string(), fast_cfg);
    let q = Queue::open_with_options(make_store(), "test", opts)
        .await
        .unwrap();
    q.enqueue("fast", b"x".to_vec()).await.unwrap();

    let claimed = q.claim_next("fast").await.unwrap().unwrap();

    assert_eq!(claimed.max_attempts, 1);
    let lease_expires_at = claimed.lease_expires_at.unwrap();
    let claimed_at = claimed.claimed_at.unwrap();
    assert!(lease_expires_at - claimed_at <= 5_001);
    q.ack(&claimed).await.unwrap();
    q.close().await.unwrap();
}
/// Jobs are claimed by priority (high, normal, low) regardless of enqueue order.
#[tokio::test]
async fn test_priority_ordering() {
    let lease = Duration::from_secs(30);
    let q = Queue::open(make_store(), "test").await.unwrap();

    let low_opts = EnqueueOptions {
        priority: Some(PRIORITY_LOW),
        ..Default::default()
    };
    let id_low = q
        .enqueue_with("jobs", b"low".to_vec(), low_opts)
        .await
        .unwrap();
    let normal_opts = EnqueueOptions {
        priority: Some(PRIORITY_NORMAL),
        ..Default::default()
    };
    let id_normal = q
        .enqueue_with("jobs", b"normal".to_vec(), normal_opts)
        .await
        .unwrap();
    let high_opts = EnqueueOptions {
        priority: Some(PRIORITY_HIGH),
        ..Default::default()
    };
    let id_high = q
        .enqueue_with("jobs", b"high".to_vec(), high_opts)
        .await
        .unwrap();

    let first = q.claim("jobs", lease).await.unwrap().unwrap();
    let second = q.claim("jobs", lease).await.unwrap().unwrap();
    let third = q.claim("jobs", lease).await.unwrap().unwrap();

    assert_eq!(first.id, id_high);
    assert_eq!(second.id, id_normal);
    assert_eq!(third.id, id_low);
    q.close().await.unwrap();
}
/// Within one priority level, jobs keep their enqueue order.
#[tokio::test]
async fn test_priority_fifo_within_same_priority() {
    let lease = Duration::from_secs(30);
    let q = Queue::open(make_store(), "test").await.unwrap();
    let normal = || EnqueueOptions {
        priority: Some(PRIORITY_NORMAL),
        ..Default::default()
    };
    let id_first = q
        .enqueue_with("jobs", b"first".to_vec(), normal())
        .await
        .unwrap();
    let id_second = q
        .enqueue_with("jobs", b"second".to_vec(), normal())
        .await
        .unwrap();

    let earlier = q.claim("jobs", lease).await.unwrap().unwrap();
    let later = q.claim("jobs", lease).await.unwrap().unwrap();

    assert_eq!(earlier.id, id_first);
    assert_eq!(later.id, id_second);
    q.close().await.unwrap();
}
/// A nacked job keeps its priority, so it is still claimed ahead of lower-priority work.
#[tokio::test]
async fn test_priority_preserved_after_nack() {
    let lease = Duration::from_secs(30);
    let q = Queue::open_with_options(make_store(), "test", no_backoff_opts())
        .await
        .unwrap();
    let high_opts = EnqueueOptions {
        priority: Some(PRIORITY_HIGH),
        ..Default::default()
    };
    let id_high = q
        .enqueue_with("jobs", b"high".to_vec(), high_opts)
        .await
        .unwrap();
    let normal_opts = EnqueueOptions {
        priority: Some(PRIORITY_NORMAL),
        ..Default::default()
    };
    let _id_normal = q
        .enqueue_with("jobs", b"normal".to_vec(), normal_opts)
        .await
        .unwrap();

    let first = q.claim("jobs", lease).await.unwrap().unwrap();
    assert_eq!(first.id, id_high);
    q.nack(first, "retry me").await.unwrap();

    let reclaimed = q.claim("jobs", lease).await.unwrap().unwrap();
    assert_eq!(reclaimed.id, id_high);
    assert_eq!(reclaimed.priority, PRIORITY_HIGH);
    q.close().await.unwrap();
}
/// The priority given at enqueue time is visible on the claimed record.
#[tokio::test]
async fn test_priority_stored_on_job_record() {
    let q = Queue::open(make_store(), "test").await.unwrap();
    let high_opts = EnqueueOptions {
        priority: Some(PRIORITY_HIGH),
        ..Default::default()
    };
    q.enqueue_with("jobs", b"x".to_vec(), high_opts)
        .await
        .unwrap();

    let claimed = q
        .claim("jobs", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();

    assert_eq!(claimed.priority, PRIORITY_HIGH);
    q.close().await.unwrap();
}
/// A job whose `run_at` is an hour away must not be claimable and must be counted as
/// scheduled rather than pending.
#[tokio::test]
async fn test_enqueue_at_future_not_immediately_claimable() {
    let queue = Queue::open(make_store(), "test").await.unwrap();

    let one_hour_ahead = std::time::SystemTime::now() + Duration::from_secs(3600);
    let opts = EnqueueOptions {
        run_at: Some(one_hour_ahead),
        ..Default::default()
    };
    queue
        .enqueue_with("jobs", b"future".to_vec(), opts)
        .await
        .unwrap();

    let claim_attempt = queue.claim("jobs", Duration::from_secs(30)).await.unwrap();
    assert!(claim_attempt.is_none());

    let stats = queue.stats("jobs").await.unwrap();
    assert_eq!(stats.scheduled, 1);
    assert_eq!(stats.pending, 0);
    queue.close().await.unwrap();
}
/// A `run_at` that already lies in the past must leave the job claimable right away.
#[tokio::test]
async fn test_enqueue_at_past_is_immediately_pending() {
    let queue = Queue::open(make_store(), "test").await.unwrap();

    let one_second_ago = std::time::SystemTime::now() - Duration::from_secs(1);
    let opts = EnqueueOptions {
        run_at: Some(one_second_ago),
        ..Default::default()
    };
    queue
        .enqueue_with("jobs", b"past".to_vec(), opts)
        .await
        .unwrap();

    let claimed = queue.claim("jobs", Duration::from_secs(30)).await.unwrap();
    assert!(claimed.is_some());
    queue.close().await.unwrap();
}
/// A scheduled job must stay unclaimable until its `run_at` has passed, and
/// `promote_scheduled_now` must then move it from scheduled to pending.
#[tokio::test]
async fn test_promote_scheduled_now() {
    // The margin has to comfortably exceed enqueue latency: a `run_at` that is already in
    // the past by the time the queue looks at it is made pending immediately, which would
    // make the "not yet claimable" assertion below fail spuriously.
    let delay = Duration::from_millis(100);

    let q = Queue::open(make_store(), "test").await.unwrap();
    let run_at = std::time::SystemTime::now() + delay;
    let id = q
        .enqueue_with(
            "jobs",
            b"soon".to_vec(),
            EnqueueOptions {
                run_at: Some(run_at),
                ..Default::default()
            },
        )
        .await
        .unwrap();

    assert!(
        q.claim("jobs", Duration::from_secs(30))
            .await
            .unwrap()
            .is_none(),
        "job must not be claimable before its run_at"
    );

    // Sleeping for the full delay plus slack guarantees `run_at` is in the past,
    // because the sleep starts after `run_at` was computed.
    tokio::time::sleep(delay + Duration::from_millis(20)).await;
    q.promote_scheduled_now().await.unwrap();

    let s = q.stats("jobs").await.unwrap();
    assert_eq!(s.scheduled, 0);
    assert_eq!(s.pending, 1);

    let job = q
        .claim("jobs", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(job.id, id);
    q.close().await.unwrap();
}
/// A job delayed by one hour must be counted as scheduled, not pending.
///
/// Despite the test name, the delay is expressed through `enqueue_with` and an explicit
/// `run_at`.
#[tokio::test]
async fn test_enqueue_in_convenience() {
    let queue = Queue::open(make_store(), "test").await.unwrap();

    let delayed_opts = EnqueueOptions {
        run_at: Some(std::time::SystemTime::now() + Duration::from_secs(3600)),
        ..Default::default()
    };
    queue
        .enqueue_with("jobs", b"delayed".to_vec(), delayed_opts)
        .await
        .unwrap();

    let stats = queue.stats("jobs").await.unwrap();
    assert_eq!(stats.scheduled, 1);
    assert_eq!(stats.pending, 0);
    queue.close().await.unwrap();
}
/// After promotion, a job enqueued with a `run_at` and no explicit priority must still be
/// claimed after a `PRIORITY_HIGH` job that was enqueued later.
#[tokio::test]
async fn test_scheduled_job_preserves_priority() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    let lease = Duration::from_secs(30);

    // First job: no explicit priority, due 1ms from now.
    let due_soon = std::time::SystemTime::now() + Duration::from_millis(1);
    let delayed_opts = EnqueueOptions {
        run_at: Some(due_soon),
        ..Default::default()
    };
    queue
        .enqueue_with("jobs", b"normal".to_vec(), delayed_opts)
        .await
        .unwrap();

    // Second job: high priority, no delay.
    let high_opts = EnqueueOptions {
        priority: Some(PRIORITY_HIGH),
        ..Default::default()
    };
    queue
        .enqueue_with("jobs", b"high".to_vec(), high_opts)
        .await
        .unwrap();

    // Let the first job's run_at elapse before forcing a promotion pass.
    tokio::time::sleep(Duration::from_millis(5)).await;
    queue.promote_scheduled_now().await.unwrap();

    let first_claim = queue.claim("jobs", lease).await.unwrap().unwrap();
    assert_eq!(first_claim.payload, b"high");

    let second_claim = queue.claim("jobs", lease).await.unwrap().unwrap();
    assert_eq!(second_claim.payload, b"normal");
    queue.close().await.unwrap();
}
/// A trivial `Worker` driven by `run_worker` must drain the queue: once the worker has
/// been shut down, nothing is left to claim.
#[tokio::test]
async fn test_worker_trait() {
    use crate::worker::{Worker, WorkerError, run_worker};

    /// Worker that accepts every job without doing any work.
    struct EchoWorker;
    impl Worker for EchoWorker {
        async fn process(&self, _job: &JobRecord) -> std::result::Result<(), WorkerError> {
            Ok(())
        }
    }

    let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
    q.enqueue("work", b"hello".to_vec()).await.unwrap();

    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    let q2 = q.clone();
    let handle = tokio::spawn(async move {
        run_worker(
            &q2,
            "work",
            &EchoWorker,
            Duration::from_millis(10),
            async move {
                let _ = shutdown_rx.await;
            },
        )
        .await
    });

    // Bound the wait so a worker that never makes progress fails the test instead of
    // hanging the whole suite.
    tokio::time::timeout(Duration::from_secs(10), async {
        loop {
            let s = q.stats("work").await.unwrap();
            if s.pending == 0 && s.claimed == 0 {
                break;
            }
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    })
    .await
    .expect("worker did not drain the queue within 10s");

    let _ = shutdown_tx.send(());
    // A panic inside the worker task surfaces as a JoinError; do not swallow it.
    let _ = handle.await.expect("worker task panicked");

    assert!(
        q.claim("work", Duration::from_secs(30))
            .await
            .unwrap()
            .is_none()
    );
}
/// With done-job retention enabled, `get_job` must report Pending, then Claimed, then
/// Done as the job moves through enqueue, claim and ack.
#[tokio::test]
async fn test_get_job_tracks_lifecycle() {
    let retention_opts = OpenOptions {
        keep_done_jobs: Some(Duration::from_secs(60)),
        ..OpenOptions::default()
    };
    let queue = Queue::open_with_options(make_store(), "test", retention_opts)
        .await
        .unwrap();
    let id = queue.enqueue("work", b"payload".to_vec()).await.unwrap();

    let after_enqueue = queue.get_job(&id).await.unwrap().unwrap();
    assert_eq!(after_enqueue.status, JobStatus::Pending);

    let lease_holder = queue
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    let after_claim = queue.get_job(&id).await.unwrap().unwrap();
    assert_eq!(after_claim.status, JobStatus::Claimed);

    queue.ack(&lease_holder).await.unwrap();
    let after_ack = queue.get_job(&id).await.unwrap().unwrap();
    assert_eq!(after_ack.status, JobStatus::Done);
    queue.close().await.unwrap();
}
/// With default options, ack must make the job unreachable through `get_job` while the
/// `done` counter still increases.
#[tokio::test]
async fn test_ack_deletes_job_by_default() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    let id = queue.enqueue("work", b"payload".to_vec()).await.unwrap();

    let claimed = queue
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    queue.ack(&claimed).await.unwrap();

    let lookup = queue.get_job(&id).await.unwrap();
    assert!(lookup.is_none(), "ack must drop the index by default");

    let stats = queue.stats("work").await.unwrap();
    assert_eq!(stats.done, 1, "done counter still tracks throughput");
    assert_eq!(stats.pending, 0);
    assert_eq!(stats.claimed, 0);
    queue.close().await.unwrap();
}
/// A done job kept under a 20ms retention must survive ack and then be purged by a sweep
/// run after the retention window has elapsed.
#[tokio::test]
async fn test_done_retention_sweeps_old_jobs() {
    // The same window is used for the queue option and for the explicit sweep.
    let retention = Duration::from_millis(20);
    let opts = OpenOptions {
        keep_done_jobs: Some(retention),
        ..OpenOptions::default()
    };
    let queue = Queue::open_with_options(make_store(), "test", opts)
        .await
        .unwrap();
    let id = queue.enqueue("work", b"payload".to_vec()).await.unwrap();

    let claimed = queue
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    queue.ack(&claimed).await.unwrap();
    let right_after_ack = queue.get_job(&id).await.unwrap();
    assert!(right_after_ack.is_some());

    // Wait out the retention window before sweeping.
    tokio::time::sleep(Duration::from_millis(30)).await;
    queue.sweep_done_now(retention).await.unwrap();

    let after_sweep = queue.get_job(&id).await.unwrap();
    assert!(
        after_sweep.is_none(),
        "retention sweep must purge expired done jobs"
    );
    queue.close().await.unwrap();
}
/// The done-retention window must be measured from completion: a job enqueued more than
/// 200ms earlier but acked just now survives a 500ms sweep, and is purged only after a
/// further 550ms.
#[tokio::test]
async fn test_done_retention_uses_completion_time_not_enqueue_time() {
    let retention = Duration::from_millis(500);
    let opts = OpenOptions {
        keep_done_jobs: Some(retention),
        ..OpenOptions::default()
    };
    let queue = Queue::open_with_options(make_store(), "test", opts)
        .await
        .unwrap();

    // Schedule 200ms ahead so the job is measurably old by the time it is acked.
    let delayed_opts = EnqueueOptions {
        run_at: Some(std::time::SystemTime::now() + Duration::from_millis(200)),
        ..Default::default()
    };
    let id = queue
        .enqueue_with("work", b"weekly".to_vec(), delayed_opts)
        .await
        .unwrap();

    tokio::time::sleep(Duration::from_millis(220)).await;
    queue.promote_scheduled_now().await.unwrap();

    let claimed = queue
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    let elapsed_since_enqueue = now_ms().saturating_sub(claimed.enqueued_at);
    assert!(
        elapsed_since_enqueue > 200,
        "enqueued_at should be well over 200ms old (was {elapsed_since_enqueue}ms)"
    );

    queue.ack(&claimed).await.unwrap();
    queue.sweep_done_now(retention).await.unwrap();
    let survivor = queue.get_job(&id).await.unwrap().expect(
        "fresh completion must survive the sweep regardless of how long ago the job was enqueued",
    );
    assert!(
        survivor.completed_at.is_some(),
        "ack must stamp completed_at when keep_done_jobs is set"
    );

    // Once the retention window has passed since completion, the sweep removes the job.
    tokio::time::sleep(Duration::from_millis(550)).await;
    queue.sweep_done_now(retention).await.unwrap();
    let after_expiry = queue.get_job(&id).await.unwrap();
    assert!(after_expiry.is_none());
    queue.close().await.unwrap();
}
/// A dead job must survive a sweep whose retention has not elapsed and be removed (along
/// with its counter and its `get_job` entry) by a sweep whose retention has elapsed.
#[tokio::test]
async fn test_dead_retention_sweep_boundary() {
    let queue = Queue::open(make_store(), "test").await.unwrap();

    // A single allowed attempt means the first nack sends the job to the dead set.
    let single_attempt = EnqueueOptions {
        max_attempts: Some(1),
        ..Default::default()
    };
    queue
        .enqueue_with("work", b"payload".to_vec(), single_attempt)
        .await
        .unwrap();

    let claimed = queue
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    let id = claimed.id.clone();
    queue.nack(claimed, "fatal").await.unwrap();

    let dead = queue.dead_jobs("work", None, 100).await.unwrap();
    assert_eq!(dead.len(), 1);
    assert!(dead[0].failed_at.is_some(), "failed_at must be stamped");
    assert_eq!(queue.stats("work").await.unwrap().dead, 1);

    // Retention far longer than the job's age: nothing may be removed.
    queue.sweep_dead_now(Duration::from_secs(3600)).await.unwrap();
    let still_dead = queue.dead_jobs("work", None, 100).await.unwrap();
    assert_eq!(still_dead.len(), 1);

    // Let the job age past a 20ms retention, then sweep again.
    tokio::time::sleep(Duration::from_millis(30)).await;
    queue.sweep_dead_now(Duration::from_millis(20)).await.unwrap();
    let remaining = queue.dead_jobs("work", None, 100).await.unwrap();
    assert!(remaining.is_empty());
    assert_eq!(
        queue.stats("work").await.unwrap().dead,
        0,
        "dead counter must reflect the sweep"
    );
    let lookup = queue.get_job(&id).await.unwrap();
    assert!(lookup.is_none());
    queue.close().await.unwrap();
}
/// Requeueing a dead job must clear its `failed_at` stamp before it is claimed again.
#[tokio::test]
async fn test_requeue_dead_resets_failed_at() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    let lease = Duration::from_secs(30);

    let single_attempt = EnqueueOptions {
        max_attempts: Some(1),
        ..Default::default()
    };
    queue
        .enqueue_with("work", b"payload".to_vec(), single_attempt)
        .await
        .unwrap();

    let claimed = queue.claim("work", lease).await.unwrap().unwrap();
    queue.nack(claimed, "fatal").await.unwrap();

    let mut dead_list = queue.dead_jobs("work", None, 100).await.unwrap();
    let dead_job = dead_list.pop().unwrap();
    assert!(dead_job.failed_at.is_some());

    queue.requeue_dead_job(dead_job).await.unwrap();
    let reclaimed = queue.claim("work", lease).await.unwrap().unwrap();
    assert!(
        reclaimed.failed_at.is_none(),
        "requeue must clear failed_at so a re-fail starts a fresh retention window"
    );
    queue.close().await.unwrap();
}
/// Looking up an id that was never enqueued must yield `Ok(None)`, not an error.
#[tokio::test]
async fn test_get_job_returns_none_for_unknown_id() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    let lookup = queue.get_job("nonexistent").await.unwrap();
    assert!(lookup.is_none());
    queue.close().await.unwrap();
}
/// A job that exhausts its single attempt must be reported as `Dead` by `get_job`.
#[tokio::test]
async fn test_get_job_after_nack_to_dead() {
    let queue = Queue::open(make_store(), "test").await.unwrap();

    let single_attempt = EnqueueOptions {
        max_attempts: Some(1),
        ..Default::default()
    };
    queue
        .enqueue_with("work", b"x".to_vec(), single_attempt)
        .await
        .unwrap();

    let claimed = queue
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    let id = claimed.id.clone();
    queue.nack(claimed, "fatal").await.unwrap();

    let record = queue.get_job(&id).await.unwrap().unwrap();
    assert_eq!(record.status, JobStatus::Dead);
    queue.close().await.unwrap();
}
/// Renewing a 1ms lease to 30s must push the expiry later, keep the job out of reach of
/// `reap_now`, and be reflected in the record returned by `get_job`.
#[tokio::test]
async fn test_renew_lease() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    queue.enqueue("work", b"payload".to_vec()).await.unwrap();

    // Start with a lease short enough that it would expire without the renewal.
    let mut leased = queue
        .claim("work", Duration::from_millis(1))
        .await
        .unwrap()
        .unwrap();
    let original_expiry = leased.lease_expires_at.unwrap();

    queue
        .renew_lease(&mut leased, Duration::from_secs(30))
        .await
        .unwrap();
    let new_expiry = leased.lease_expires_at.unwrap();
    assert!(new_expiry > original_expiry, "renewed expiry must be later");

    // The reaper must not hand the job back: nothing is claimable afterwards.
    queue.reap_now().await.unwrap();
    let claim_attempt = queue.claim("work", Duration::from_secs(30)).await.unwrap();
    assert!(claim_attempt.is_none());

    let stored = queue.get_job(&leased.id).await.unwrap().unwrap();
    assert_eq!(stored.status, JobStatus::Claimed);
    assert_eq!(stored.lease_expires_at.unwrap(), new_expiry);
    queue.close().await.unwrap();
}
/// Cancelling a pending job must return `true` and remove it from claiming, from
/// `get_job`, and from the pending counter.
#[tokio::test]
async fn test_cancel_pending_job() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    let id = queue.enqueue("work", b"payload".to_vec()).await.unwrap();

    let was_cancelled = queue.cancel(&id).await.unwrap();
    assert!(was_cancelled);

    let claim_attempt = queue.claim("work", Duration::from_secs(30)).await.unwrap();
    assert!(claim_attempt.is_none());

    let lookup = queue.get_job(&id).await.unwrap();
    assert!(lookup.is_none());

    let stats = queue.stats("work").await.unwrap();
    assert_eq!(stats.pending, 0);
    queue.close().await.unwrap();
}
/// Cancelling a job that is still scheduled must return `true`, drop the scheduled
/// counter back to zero, and make the job unreachable through `get_job`.
#[tokio::test]
async fn test_cancel_scheduled_job() {
    let queue = Queue::open(make_store(), "test").await.unwrap();

    let delayed_opts = EnqueueOptions {
        run_at: Some(std::time::SystemTime::now() + Duration::from_secs(3600)),
        ..Default::default()
    };
    let id = queue
        .enqueue_with("work", b"payload".to_vec(), delayed_opts)
        .await
        .unwrap();

    let before = queue.stats("work").await.unwrap();
    assert_eq!(before.scheduled, 1);

    let was_cancelled = queue.cancel(&id).await.unwrap();
    assert!(was_cancelled);

    let after = queue.stats("work").await.unwrap();
    assert_eq!(after.scheduled, 0);

    let lookup = queue.get_job(&id).await.unwrap();
    assert!(lookup.is_none());
    queue.close().await.unwrap();
}
/// Cancelling a job that is currently claimed must return `false`; the claim holder can
/// still ack it afterwards.
#[tokio::test]
async fn test_cancel_claimed_job_returns_false() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    queue.enqueue("work", b"payload".to_vec()).await.unwrap();

    let claimed = queue
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();

    let was_cancelled = queue.cancel(&claimed.id).await.unwrap();
    assert!(!was_cancelled);

    queue.ack(&claimed).await.unwrap();
    queue.close().await.unwrap();
}
/// Cancelling an id that was never enqueued must return `Ok(false)`.
#[tokio::test]
async fn test_cancel_nonexistent_returns_false() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    let was_cancelled = queue.cancel("does-not-exist").await.unwrap();
    assert!(!was_cancelled);
    queue.close().await.unwrap();
}
/// A three-payload batch must yield three ids, show up as three pending jobs, and be
/// claimed in the same order as the returned ids.
#[tokio::test]
async fn test_enqueue_batch_atomic() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    let lease = Duration::from_secs(30);

    let payloads = vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()];
    let ids = queue.enqueue_batch("work", payloads).await.unwrap();
    assert_eq!(ids.len(), 3);

    let stats = queue.stats("work").await.unwrap();
    assert_eq!(stats.pending, 3);

    // Claim all three jobs first, then compare against the batch ids position by position.
    let first = queue.claim("work", lease).await.unwrap().unwrap();
    let second = queue.claim("work", lease).await.unwrap().unwrap();
    let third = queue.claim("work", lease).await.unwrap().unwrap();

    assert_eq!(first.id, ids[0]);
    assert_eq!(second.id, ids[1]);
    assert_eq!(third.id, ids[2]);
    queue.close().await.unwrap();
}
/// An empty batch must return no ids and leave the pending counter at zero.
#[tokio::test]
async fn test_enqueue_batch_empty_is_noop() {
    let queue = Queue::open(make_store(), "test").await.unwrap();

    let ids = queue.enqueue_batch("work", vec![]).await.unwrap();
    assert!(ids.is_empty());

    let stats = queue.stats("work").await.unwrap();
    assert_eq!(stats.pending, 0);
    queue.close().await.unwrap();
}
/// Enqueueing twice with the same dedup key must return the same id and leave exactly one
/// pending job.
#[tokio::test]
async fn test_enqueue_unique_deduplicates() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    // Fresh options per call, all carrying the same dedup key.
    let keyed = || EnqueueOptions {
        dedup_key: Some("my-key".to_string()),
        ..Default::default()
    };

    let first_id = queue
        .enqueue_with("work", b"first".to_vec(), keyed())
        .await
        .unwrap();
    let second_id = queue
        .enqueue_with("work", b"second".to_vec(), keyed())
        .await
        .unwrap();

    assert_eq!(first_id, second_id);
    let stats = queue.stats("work").await.unwrap();
    assert_eq!(stats.pending, 1);
    queue.close().await.unwrap();
}
/// Once a deduplicated job has been claimed, enqueueing with the same key must create a
/// new job with a different id.
#[tokio::test]
async fn test_enqueue_unique_allows_reenqueue_after_claim() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    let keyed = || EnqueueOptions {
        dedup_key: Some("my-key".to_string()),
        ..Default::default()
    };

    let first_id = queue
        .enqueue_with("work", b"payload".to_vec(), keyed())
        .await
        .unwrap();

    let claimed = queue
        .claim("work", Duration::from_secs(30))
        .await
        .unwrap()
        .unwrap();
    assert_eq!(claimed.id, first_id);

    // The first job is now claimed, so the key no longer blocks a new enqueue.
    let second_id = queue
        .enqueue_with("work", b"payload".to_vec(), keyed())
        .await
        .unwrap();
    assert_ne!(first_id, second_id);

    let stats = queue.stats("work").await.unwrap();
    assert_eq!(stats.pending, 1);
    queue.close().await.unwrap();
}
/// Sequence under test: enqueue with a dedup key, claim, nack, enqueue again with the same
/// key (new id expected), claim once more, then enqueue a third time. The third enqueue
/// must return the second id, i.e. the second job's dedup entry is still in force.
#[tokio::test]
async fn test_enqueue_unique_nack_then_reenqueue_does_not_corrupt_dedup() {
    let queue = Queue::open_with_options(make_store(), "test", no_backoff_opts())
        .await
        .unwrap();
    let lease = Duration::from_secs(30);
    let keyed = || EnqueueOptions {
        dedup_key: Some("user-42".to_string()),
        ..Default::default()
    };

    let id1 = queue
        .enqueue_with("work", b"payload".to_vec(), keyed())
        .await
        .unwrap();

    // The claimed record must no longer carry the dedup key.
    let first_claim = queue.claim("work", lease).await.unwrap().unwrap();
    assert!(first_claim.dedup_key.is_none());
    queue.nack(first_claim, "transient").await.unwrap();

    let id2 = queue
        .enqueue_with("work", b"payload".to_vec(), keyed())
        .await
        .unwrap();
    assert_ne!(id1, id2);

    let j1 = queue.claim("work", lease).await.unwrap().unwrap();

    let id3 = queue
        .enqueue_with("work", b"payload".to_vec(), keyed())
        .await
        .unwrap();
    assert_eq!(
        id3, id2,
        "id2's dedup index must still block the third enqueue while id2 is pending"
    );

    // Drain both jobs so the pending counter can be checked at zero.
    queue.ack(&j1).await.unwrap();
    let j2 = queue.claim("work", lease).await.unwrap().unwrap();
    queue.ack(&j2).await.unwrap();

    let stats = queue.stats("work").await.unwrap();
    assert_eq!(stats.pending, 0);
    queue.close().await.unwrap();
}
/// With default queue options, a nacked job that still has attempts left must be counted
/// as scheduled (not pending or claimed) and must not be claimable right away.
#[tokio::test]
async fn test_nack_with_backoff_parks_in_scheduled() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    let lease = Duration::from_secs(30);

    let three_attempts = EnqueueOptions {
        max_attempts: Some(3),
        ..Default::default()
    };
    queue
        .enqueue_with("work", b"payload".to_vec(), three_attempts)
        .await
        .unwrap();

    let claimed = queue.claim("work", lease).await.unwrap().unwrap();
    queue.nack(claimed, "transient").await.unwrap();

    let stats = queue.stats("work").await.unwrap();
    assert_eq!(stats.pending, 0, "must not be pending immediately");
    assert_eq!(stats.claimed, 0);
    assert_eq!(stats.scheduled, 1, "must be parked in scheduled");

    let claim_attempt = queue.claim("work", lease).await.unwrap();
    assert!(claim_attempt.is_none());
    queue.close().await.unwrap();
}
/// With a fixed 10ms backoff, a nacked job must become claimable again after the backoff
/// elapses and a promotion pass runs, carrying attempt count 2 and the nack reason.
#[tokio::test]
async fn test_nack_backoff_promoted_after_run_at() {
    // Base and max are equal, pinning the retry delay at 10ms.
    let backoff = Duration::from_millis(10);
    let queue_config = QueueConfig {
        retry_backoff_base: backoff,
        retry_backoff_max: backoff,
        ..QueueConfig::default()
    };
    let opts = OpenOptions {
        default_queue_config: queue_config,
        ..OpenOptions::default()
    };
    let queue = Queue::open_with_options(make_store(), "test", opts)
        .await
        .unwrap();
    let lease = Duration::from_secs(30);

    let five_attempts = EnqueueOptions {
        max_attempts: Some(5),
        ..Default::default()
    };
    queue
        .enqueue_with("work", b"payload".to_vec(), five_attempts)
        .await
        .unwrap();

    let first_claim = queue.claim("work", lease).await.unwrap().unwrap();
    let id = first_claim.id.clone();
    queue.nack(first_claim, "boom").await.unwrap();

    // Wait longer than the backoff, then force a promotion pass.
    tokio::time::sleep(Duration::from_millis(20)).await;
    queue.promote_scheduled_now().await.unwrap();

    let retried = queue.claim("work", lease).await.unwrap().unwrap();
    assert_eq!(retried.id, id);
    assert_eq!(retried.attempts, 2);
    assert_eq!(retried.last_error.as_deref(), Some("boom"));
    queue.close().await.unwrap();
}
/// Pins the exponential backoff schedule of `backoff_delay`: the delay doubles per attempt
/// starting from `base`, is capped at `max`, and is zero whenever `base` is zero.
///
/// `backoff_delay` is synchronous and pure, so this is a plain `#[test]` with no async
/// runtime.
#[test]
fn test_backoff_delay_calculation() {
    let base = Duration::from_secs(1);
    let max = Duration::from_secs(60);

    // Doubling schedule: base * 2^(attempts - 1).
    assert_eq!(backoff_delay(1, base, max), Duration::from_secs(1));
    assert_eq!(backoff_delay(2, base, max), Duration::from_secs(2));
    assert_eq!(backoff_delay(3, base, max), Duration::from_secs(4));
    assert_eq!(backoff_delay(4, base, max), Duration::from_secs(8));

    // Large attempt counts are clamped to `max`.
    assert_eq!(backoff_delay(20, base, max), max);

    // A zero base disables backoff regardless of attempts or cap.
    assert_eq!(
        backoff_delay(5, Duration::ZERO, Duration::from_secs(10)),
        Duration::ZERO
    );

    // Boundaries: attempt 0 behaves like the first attempt (the exponent saturates at 0),
    // the largest attempt count must saturate rather than overflow, and a base above the
    // cap is still clamped.
    assert_eq!(backoff_delay(0, base, max), base);
    assert_eq!(backoff_delay(u32::MAX, base, max), max);
    assert_eq!(backoff_delay(1, Duration::from_secs(120), max), max);
}
/// Five dead jobs must page out as 2 + 2 + 1 in enqueue order when the last id of each
/// page is passed as the cursor; a limit of zero must return nothing.
#[tokio::test]
async fn test_dead_jobs_pagination() {
    let queue = Queue::open(make_store(), "test").await.unwrap();
    let lease = Duration::from_secs(30);

    // Kill five jobs one at a time: a single attempt plus a nack sends each to the dead set.
    let mut ids = Vec::new();
    for _ in 0..5 {
        let single_attempt = EnqueueOptions {
            max_attempts: Some(1),
            ..Default::default()
        };
        let id = queue
            .enqueue_with("work", b"x".to_vec(), single_attempt)
            .await
            .unwrap();
        let claimed = queue.claim("work", lease).await.unwrap().unwrap();
        queue.nack(claimed, "fail").await.unwrap();
        ids.push(id);
    }

    let first_page = queue.dead_jobs("work", None, 2).await.unwrap();
    assert_eq!(first_page.len(), 2);
    assert_eq!(first_page[0].id, ids[0]);
    assert_eq!(first_page[1].id, ids[1]);

    let second_page = queue
        .dead_jobs("work", Some(&first_page[1].id), 2)
        .await
        .unwrap();
    assert_eq!(second_page.len(), 2);
    assert_eq!(second_page[0].id, ids[2]);
    assert_eq!(second_page[1].id, ids[3]);

    let third_page = queue
        .dead_jobs("work", Some(&second_page[1].id), 2)
        .await
        .unwrap();
    assert_eq!(third_page.len(), 1);
    assert_eq!(third_page[0].id, ids[4]);

    let zero_limit = queue.dead_jobs("work", None, 0).await.unwrap();
    assert!(zero_limit.is_empty());
    queue.close().await.unwrap();
}
/// Shutting down `run_worker` while a job is being processed must let that job finish:
/// after the worker task returns, the completion flag is set, nothing is claimed, and the
/// done counter is 1.
#[tokio::test]
async fn test_worker_finishes_in_flight_job_on_shutdown() {
    use crate::worker::{Worker, WorkerError, run_worker};
    use std::sync::atomic::{AtomicBool, Ordering};

    /// Worker that takes 100ms per job and records when it has finished.
    struct SlowWorker {
        finished: Arc<AtomicBool>,
    }
    impl Worker for SlowWorker {
        async fn process(&self, _job: &JobRecord) -> std::result::Result<(), WorkerError> {
            tokio::time::sleep(Duration::from_millis(100)).await;
            self.finished.store(true, Ordering::SeqCst);
            Ok(())
        }
    }

    let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
    q.enqueue("work", b"x".to_vec()).await.unwrap();

    let finished = Arc::new(AtomicBool::new(false));
    let worker = SlowWorker {
        finished: finished.clone(),
    };
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    let q2 = q.clone();
    let handle = tokio::spawn(async move {
        run_worker(
            &q2,
            "work",
            &worker,
            Duration::from_millis(50),
            async move {
                let _ = shutdown_rx.await;
            },
        )
        .await
    });

    // Wait until the job is in flight. The wait is bounded: if the claimed window is
    // never observed, the test fails instead of spinning forever.
    tokio::time::timeout(Duration::from_secs(10), async {
        loop {
            if q.stats("work").await.unwrap().claimed == 1 {
                break;
            }
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    })
    .await
    .expect("worker never had the job in flight within 10s");

    let _ = shutdown_tx.send(());
    // A panic inside the worker task surfaces as a JoinError; do not swallow it.
    let _ = handle.await.expect("worker task panicked");

    assert!(
        finished.load(Ordering::SeqCst),
        "in-flight job must finish before shutdown returns"
    );
    assert_eq!(q.stats("work").await.unwrap().claimed, 0);
    assert_eq!(q.stats("work").await.unwrap().done, 1);
}
#[tokio::test]
async fn test_claim_with_wait_wakes_or_times_out() {
let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
let timed_out = q
.claim_with_wait("work", Duration::from_secs(30), Duration::from_millis(50))
.await
.unwrap();
assert!(timed_out.is_none());
let q2 = q.clone();
let waiter = tokio::spawn(async move {
let start = std::time::Instant::now();
let job = q2
.claim_with_wait("work", Duration::from_secs(30), Duration::from_secs(10))
.await
.unwrap();
(start.elapsed(), job)
});
tokio::time::sleep(Duration::from_millis(20)).await;
q.enqueue("work", b"hello".to_vec()).await.unwrap();
let (elapsed, job) = waiter.await.unwrap();
assert!(job.is_some(), "claim_with_wait must wake on enqueue");
assert!(
elapsed < Duration::from_millis(500),
"expected fast wake; took {elapsed:?}"
);
}
/// `run_worker_concurrent` with a concurrency of 3 must process a batch of five jobs, so
/// that the done counter reads 5 once the worker has been shut down.
#[tokio::test]
async fn test_concurrent_worker() {
    use crate::worker::{Worker, WorkerError, run_worker_concurrent};

    /// Worker that succeeds after a 5ms pause.
    struct EchoWorker;
    impl Worker for EchoWorker {
        async fn process(&self, _job: &JobRecord) -> std::result::Result<(), WorkerError> {
            tokio::time::sleep(Duration::from_millis(5)).await;
            Ok(())
        }
    }

    let q = Arc::new(Queue::open(make_store(), "test").await.unwrap());
    let ids = q
        .enqueue_batch(
            "work",
            vec![
                b"a".to_vec(),
                b"b".to_vec(),
                b"c".to_vec(),
                b"d".to_vec(),
                b"e".to_vec(),
            ],
        )
        .await
        .unwrap();
    assert_eq!(ids.len(), 5);

    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    let q2 = q.clone();
    let handle = tokio::spawn(async move {
        run_worker_concurrent(
            &q2,
            "work",
            Arc::new(EchoWorker),
            3,
            Duration::from_millis(10),
            async move {
                let _ = shutdown_rx.await;
            },
        )
        .await
    });

    // Bound the wait so a stalled worker pool fails the test instead of hanging the suite.
    tokio::time::timeout(Duration::from_secs(10), async {
        loop {
            let s = q.stats("work").await.unwrap();
            if s.pending == 0 && s.claimed == 0 {
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
    })
    .await
    .expect("workers did not drain the queue within 10s");

    let _ = shutdown_tx.send(());
    // A panic inside the worker task surfaces as a JoinError; do not swallow it.
    let _ = handle.await.expect("worker task panicked");

    assert_eq!(q.stats("work").await.unwrap().done, 5);
}
}