use std::collections::{BTreeMap, VecDeque};
use std::time::Instant;
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use tokio::sync::Mutex;
/// Storage backend for named job queues carrying opaque `Bytes` payloads.
///
/// NOTE(review): the per-method notes below are derived from the signatures and from the
/// in-memory implementation in this file; confirm them against any other drivers.
#[async_trait]
pub trait Driver: Send + Sync + 'static {
/// Takes the next payload from `queue`.
///
/// The `Option` in the return type leaves room for "nothing available"; the in-memory
/// driver in this file instead waits until a payload arrives.
async fn pop(&self, queue: &str) -> Result<Option<Bytes>>;
/// Appends `payload` to `queue` so it can be popped.
async fn push(&self, queue: &str, payload: Bytes) -> Result<()>;
/// Schedules `payload` for `queue`, to become eligible after `delay_secs` seconds.
///
/// The in-memory driver treats a delay of `0` as an immediate `push`.
async fn push_delayed(&self, queue: &str, payload: Bytes, delay_secs: u64) -> Result<()>;
/// Moves delayed payloads of `queue` that are due onto the ready queue and returns how
/// many were moved.
async fn promote_delayed(&self, queue: &str) -> Result<u64>;
/// Returns the number of payloads waiting in `queue`.
///
/// The in-memory driver counts only ready payloads, not delayed ones.
async fn depth(&self, queue: &str) -> Result<u64>;
}
/// A payload held back until its `ready_at` instant has been reached.
struct DelayedJob {
// Monotonic-clock instant from which the job may be promoted to the ready queue.
ready_at: Instant,
// Opaque job body, handed back unchanged when the job is eventually popped.
payload: Bytes,
}
/// In-process `Driver` that keeps every queue in memory behind async mutexes.
pub struct MemoryDriver {
// Ready payloads per queue name, consumed front-to-back.
queues: Mutex<std::collections::HashMap<String, VecDeque<Bytes>>>,
// Delayed jobs per queue name, keyed by an insertion id taken from `next_id`.
delayed: Mutex<std::collections::HashMap<String, BTreeMap<u64, Vec<DelayedJob>>>>,
// Counter incremented for every delayed job; supplies the `BTreeMap` keys above.
next_id: Mutex<u64>,
// Signalled whenever payloads are added to a ready queue, to wake tasks waiting in `pop`.
notify: tokio::sync::Notify,
}
impl MemoryDriver {
    /// Creates an empty driver (no queues, no delayed jobs, id counter at zero) and hands
    /// it back behind an `Arc` so it can be shared between tasks.
    pub fn new() -> std::sync::Arc<Self> {
        let driver = Self {
            queues: Mutex::default(),
            delayed: Mutex::default(),
            next_id: Mutex::new(0),
            notify: tokio::sync::Notify::new(),
        };
        std::sync::Arc::new(driver)
    }
}
#[async_trait]
impl Driver for MemoryDriver {
async fn pop(&self, queue: &str) -> Result<Option<Bytes>> {
loop {
{
let mut qs = self.queues.lock().await;
if let Some(item) = qs.entry(queue.to_string()).or_default().pop_front() {
return Ok(Some(item));
}
}
self.notify.notified().await;
}
}
async fn push(&self, queue: &str, payload: Bytes) -> Result<()> {
let mut qs = self.queues.lock().await;
qs.entry(queue.to_string()).or_default().push_back(payload);
self.notify.notify_one();
Ok(())
}
async fn push_delayed(&self, queue: &str, payload: Bytes, delay_secs: u64) -> Result<()> {
if delay_secs == 0 {
return self.push(queue, payload).await;
}
let ready_at = Instant::now() + std::time::Duration::from_secs(delay_secs);
let mut delayed = self.delayed.lock().await;
let mut id = self.next_id.lock().await;
*id += 1;
delayed
.entry(queue.to_string())
.or_default()
.entry(*id)
.or_default()
.push(DelayedJob { ready_at, payload });
Ok(())
}
async fn promote_delayed(&self, queue: &str) -> Result<u64> {
let now = Instant::now();
let ready_payloads = {
let mut delayed = self.delayed.lock().await;
let mut ready = Vec::new();
if let Some(jobs) = delayed.get_mut(queue) {
let keys: Vec<u64> = jobs.keys().copied().collect();
for key in keys {
let entry = jobs.get_mut(&key).unwrap();
let mut remaining = Vec::new();
for job in entry.drain(..) {
if job.ready_at <= now {
ready.push(job.payload);
} else {
remaining.push(job);
}
}
if remaining.is_empty() {
jobs.remove(&key);
} else {
*jobs.get_mut(&key).unwrap() = remaining;
}
}
}
ready
};
let count = ready_payloads.len() as u64;
if count > 0 {
let mut qs = self.queues.lock().await;
let q = qs.entry(queue.to_string()).or_default();
for payload in ready_payloads {
q.push_back(payload);
}
drop(qs);
self.notify.notify_one();
}
Ok(count)
}
async fn depth(&self, queue: &str) -> Result<u64> {
Ok(self
.queues
.lock()
.await
.get(queue)
.map(|q| q.len() as u64)
.unwrap_or(0))
}
}