use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use parking_lot::Mutex;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct JobId(pub u64);
#[derive(Debug, Clone, Default)]
pub struct PushOptions {
pub dedup_key: Option<String>,
pub run_after: Option<Instant>,
pub attempt: u32,
}
#[derive(Debug, Clone)]
pub struct ReservedJob {
pub id: JobId,
pub queue: String,
pub payload: Vec<u8>,
pub attempt: u32,
}
#[async_trait]
pub trait QueueBackend: Send + Sync + 'static {
async fn push(
&self,
queue: &str,
payload: &[u8],
opts: PushOptions,
) -> Result<JobId, BackendError>;
async fn reserve(&self, queue: &str) -> Result<Option<ReservedJob>, BackendError>;
async fn complete(&self, id: JobId) -> Result<(), BackendError>;
async fn fail(&self, id: JobId, retry_at: Option<Instant>) -> Result<(), BackendError>;
async fn dead_letter(&self, id: JobId) -> Result<(), BackendError>;
}
#[derive(Debug, Clone)]
pub enum BackendError {
Transport(String),
NotFound,
}
impl std::fmt::Display for BackendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Transport(e) => write!(f, "queue transport error: {e}"),
Self::NotFound => write!(f, "job not found"),
}
}
}
impl std::error::Error for BackendError {}
#[derive(Default)]
pub struct MemoryBackend {
inner: Arc<Mutex<MemoryInner>>,
next_id: std::sync::atomic::AtomicU64,
max_pending: Option<usize>,
}
#[derive(Default)]
struct MemoryInner {
pending: Vec<MemoryJob>,
reserved: Vec<MemoryJob>,
dlq: Vec<MemoryJob>,
dedup: std::collections::HashSet<String>,
}
#[derive(Clone)]
struct MemoryJob {
id: JobId,
queue: String,
payload: Vec<u8>,
attempt: u32,
run_after: Option<Instant>,
dedup_key: Option<String>,
}
impl MemoryBackend {
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_max_pending(mut self, max: usize) -> Self {
self.max_pending = Some(max);
self
}
}
#[async_trait]
impl QueueBackend for MemoryBackend {
async fn push(
&self,
queue: &str,
payload: &[u8],
opts: PushOptions,
) -> Result<JobId, BackendError> {
let mut inner = self.inner.lock();
if let Some(cap) = self.max_pending
&& inner.pending.len() >= cap
{
return Err(BackendError::Transport("queue full".into()));
}
if let Some(key) = opts.dedup_key.as_ref()
&& !inner.dedup.insert(key.clone())
{
return Ok(JobId(0));
}
let id = JobId(
self
.next_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
+ 1,
);
inner.pending.push(MemoryJob {
id,
queue: queue.to_string(),
payload: payload.to_vec(),
attempt: opts.attempt,
run_after: opts.run_after,
dedup_key: opts.dedup_key,
});
Ok(id)
}
async fn reserve(&self, queue: &str) -> Result<Option<ReservedJob>, BackendError> {
let mut inner = self.inner.lock();
let now = Instant::now();
let pos = inner
.pending
.iter()
.position(|j| j.queue == queue && j.run_after.is_none_or(|t| now >= t));
let Some(idx) = pos else {
return Ok(None);
};
let job = inner.pending.remove(idx);
let reserved = ReservedJob {
id: job.id,
queue: job.queue.clone(),
payload: job.payload.clone(),
attempt: job.attempt,
};
inner.reserved.push(job);
Ok(Some(reserved))
}
async fn complete(&self, id: JobId) -> Result<(), BackendError> {
let mut inner = self.inner.lock();
let pos = inner.reserved.iter().position(|j| j.id == id);
let Some(idx) = pos else {
return Err(BackendError::NotFound);
};
let job = inner.reserved.remove(idx);
if let Some(key) = job.dedup_key {
inner.dedup.remove(&key);
}
Ok(())
}
async fn fail(&self, id: JobId, retry_at: Option<Instant>) -> Result<(), BackendError> {
let mut inner = self.inner.lock();
let pos = inner.reserved.iter().position(|j| j.id == id);
let Some(idx) = pos else {
return Err(BackendError::NotFound);
};
let mut job = inner.reserved.remove(idx);
job.attempt = job.attempt.saturating_add(1);
job.run_after = retry_at;
inner.pending.push(job);
Ok(())
}
async fn dead_letter(&self, id: JobId) -> Result<(), BackendError> {
let mut inner = self.inner.lock();
let pos = inner.reserved.iter().position(|j| j.id == id);
let Some(idx) = pos else {
return Err(BackendError::NotFound);
};
let job = inner.reserved.remove(idx);
if let Some(key) = job.dedup_key.as_ref() {
inner.dedup.remove(key);
}
inner.dlq.push(job);
Ok(())
}
}
impl MemoryBackend {
pub fn dead_letters(&self) -> Vec<(JobId, String, Vec<u8>, u32)> {
self
.inner
.lock()
.dlq
.iter()
.map(|j| (j.id, j.queue.clone(), j.payload.clone(), j.attempt))
.collect()
}
pub fn pending_count(&self) -> usize {
self.inner.lock().pending.len()
}
pub fn reserved_count(&self) -> usize {
self.inner.lock().reserved.len()
}
pub fn dedup_size(&self) -> usize {
self.inner.lock().dedup.len()
}
pub fn purge_dedup_key(&self, key: &str) -> bool {
self.inner.lock().dedup.remove(key)
}
}