use std::collections::HashMap;
use std::sync::RwLock;
use anyhow::Result;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use crate::engine::models::{Job, JobId, JobState, QueueCounts, Schedule};
use crate::storage::StorageBackend;
/// In-memory [`StorageBackend`] implementation.
///
/// Jobs are kept in a concurrent sharded map keyed by [`JobId`];
/// schedules live in a plain `HashMap` behind an `RwLock`, keyed by
/// schedule name. Nothing is persisted — all state is lost on drop.
pub struct MemoryStorage {
// every job regardless of state; DashMap provides per-shard locking
jobs: DashMap<JobId, Job>,
// schedule name -> schedule
schedules: RwLock<HashMap<String, Schedule>>,
}
impl MemoryStorage {
pub fn new() -> Self {
Self {
jobs: DashMap::new(),
schedules: RwLock::new(HashMap::new()),
}
}
}
impl Default for MemoryStorage {
fn default() -> Self {
Self::new()
}
}
impl MemoryStorage {
/// Deletes every job for which `pred` returns true and returns how
/// many were removed. Matching ids are collected in a first pass and
/// removed in a second, so no map iterator is alive while mutating.
fn remove_jobs_where(&self, pred: impl Fn(&Job) -> bool) -> u64 {
let doomed: Vec<JobId> = self
.jobs
.iter()
.filter(|entry| pred(entry.value()))
.map(|entry| *entry.key())
.collect();
let count = doomed.len() as u64;
for id in doomed {
self.jobs.remove(&id);
}
count
}
}
#[async_trait]
impl StorageBackend for MemoryStorage {
/// Stores `job` under its own id (replacing any previous entry with
/// the same id) and returns that id.
async fn insert_job(&self, job: &Job) -> Result<JobId> {
let id = job.id;
self.jobs.insert(id, job.clone());
Ok(id)
}
/// Looks up a job by id; `Ok(None)` when no such job exists.
async fn get_job(&self, id: JobId) -> Result<Option<Job>> {
Ok(self.jobs.get(&id).map(|r| r.value().clone()))
}
/// Replaces the stored copy of `job` (upsert: inserts if absent).
async fn update_job(&self, job: &Job) -> Result<()> {
self.jobs.insert(job.id, job.clone());
Ok(())
}
/// Removes the job with `id`; removing a missing id is a no-op.
async fn delete_job(&self, id: JobId) -> Result<()> {
self.jobs.remove(&id);
Ok(())
}
/// Claims up to `count` `Waiting` jobs from `queue`: higher `priority`
/// first, FIFO by `created_at` within a priority. Claimed jobs
/// transition to `Active` with `started_at` and `updated_at` stamped
/// with one shared timestamp.
///
/// The scan and the claim are separate passes, so a job taken by a
/// concurrent caller in between is skipped (state is re-checked under
/// the entry lock); in that race fewer than `count` jobs may be
/// returned even though other waiting jobs exist.
async fn dequeue(&self, queue: &str, count: u32) -> Result<Vec<Job>> {
// Pass 1: snapshot (id, priority, created_at) of every waiting job
// in this queue so we can sort without holding map guards.
let mut candidates: Vec<(JobId, i32, DateTime<Utc>)> = Vec::new();
for entry in self.jobs.iter() {
let j = entry.value();
if j.queue == queue && j.state == JobState::Waiting {
candidates.push((j.id, j.priority, j.created_at));
}
}
// Highest priority first; ties broken by earliest creation time.
candidates.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.2.cmp(&b.2)));
let now = Utc::now();
let mut selected = Vec::new();
// Pass 2: claim each chosen job, re-checking state under the lock.
for (id, _, _) in candidates.into_iter().take(count as usize) {
if let Some(mut entry) = self.jobs.get_mut(&id) {
let job = entry.value_mut();
if job.state == JobState::Waiting {
job.state = JobState::Active;
job.started_at = Some(now);
job.updated_at = now;
selected.push(job.clone());
}
}
}
Ok(selected)
}
/// Tallies the jobs in `queue` by state. `Created` jobs count as
/// waiting; states not listed below are intentionally not counted.
async fn get_queue_counts(&self, queue: &str) -> Result<QueueCounts> {
let mut counts = QueueCounts::default();
for entry in self.jobs.iter() {
let job = entry.value();
if job.queue != queue {
continue;
}
match job.state {
JobState::Waiting | JobState::Created => counts.waiting += 1,
JobState::Active => counts.active += 1,
JobState::Delayed => counts.delayed += 1,
JobState::Completed => counts.completed += 1,
JobState::Failed => counts.failed += 1,
JobState::Dlq => counts.dlq += 1,
JobState::Blocked => counts.blocked += 1,
// remaining states (e.g. Cancelled) are deliberately uncounted
_ => {}
}
}
Ok(counts)
}
/// Returns delayed jobs whose `delay_until` is at or before `now`.
/// Delayed jobs with no `delay_until` are never considered ready.
async fn get_ready_scheduled(&self, now: DateTime<Utc>) -> Result<Vec<Job>> {
let ready = self
.jobs
.iter()
.filter(|entry| {
let j = entry.value();
j.state == JobState::Delayed
&& j.delay_until
.map(|delay_until| delay_until <= now)
.unwrap_or(false)
})
.map(|entry| entry.value().clone())
.collect();
Ok(ready)
}
/// Stores a copy of `job` marked `Dlq`, with `reason` recorded in
/// `last_error` and `updated_at` refreshed.
async fn move_to_dlq(&self, job: &Job, reason: &str) -> Result<()> {
let mut updated = job.clone();
updated.state = JobState::Dlq;
updated.last_error = Some(reason.to_string());
updated.updated_at = Utc::now();
self.jobs.insert(updated.id, updated);
Ok(())
}
/// Returns up to `limit` dead-lettered jobs in `queue`. Map iteration
/// order is unspecified, so no particular ordering is guaranteed.
async fn get_dlq_jobs(&self, queue: &str, limit: u32) -> Result<Vec<Job>> {
let dlq_jobs: Vec<Job> = self
.jobs
.iter()
.filter(|entry| {
let j = entry.value();
j.queue == queue && j.state == JobState::Dlq
})
.take(limit as usize)
.map(|entry| entry.value().clone())
.collect();
Ok(dlq_jobs)
}
/// Purges `Completed` jobs whose `completed_at` is strictly before
/// `before` (jobs missing the timestamp are kept); returns the count.
async fn remove_completed_before(&self, before: DateTime<Utc>) -> Result<u64> {
Ok(self.remove_jobs_where(|j| {
j.state == JobState::Completed
&& j.completed_at
.map(|completed_at| completed_at < before)
.unwrap_or(false)
}))
}
/// Purges `Failed` jobs last touched (`updated_at`) strictly before
/// `before`; returns the count.
async fn remove_failed_before(&self, before: DateTime<Utc>) -> Result<u64> {
Ok(self.remove_jobs_where(|j| j.state == JobState::Failed && j.updated_at < before))
}
/// Purges dead-lettered jobs last touched strictly before `before`;
/// returns the count.
async fn remove_dlq_before(&self, before: DateTime<Utc>) -> Result<u64> {
Ok(self.remove_jobs_where(|j| j.state == JobState::Dlq && j.updated_at < before))
}
/// Inserts or replaces a schedule, keyed by its `name`.
async fn upsert_schedule(&self, schedule: &Schedule) -> Result<()> {
let mut schedules = self.schedules.write().expect("schedules lock poisoned");
schedules.insert(schedule.name.clone(), schedule.clone());
Ok(())
}
/// Returns all schedules that are not paused.
async fn get_active_schedules(&self) -> Result<Vec<Schedule>> {
let schedules = self.schedules.read().expect("schedules lock poisoned");
let active = schedules.values().filter(|s| !s.paused).cloned().collect();
Ok(active)
}
/// Deletes the schedule named `name`; unknown names are a no-op.
async fn delete_schedule(&self, name: &str) -> Result<()> {
let mut schedules = self.schedules.write().expect("schedules lock poisoned");
schedules.remove(name);
Ok(())
}
/// Looks up a schedule by name.
async fn get_schedule(&self, name: &str) -> Result<Option<Schedule>> {
let schedules = self.schedules.read().expect("schedules lock poisoned");
Ok(schedules.get(name).cloned())
}
/// Returns every schedule, paused or not.
async fn list_all_schedules(&self) -> Result<Vec<Schedule>> {
let schedules = self.schedules.read().expect("schedules lock poisoned");
Ok(schedules.values().cloned().collect())
}
/// Lists the distinct queue names across all stored jobs, sorted
/// ascending (deduplicated via `BTreeSet`).
async fn list_queue_names(&self) -> Result<Vec<String>> {
let mut names: std::collections::BTreeSet<String> = std::collections::BTreeSet::new();
for entry in self.jobs.iter() {
names.insert(entry.value().queue.clone());
}
Ok(names.into_iter().collect())
}
/// Finds a job in `queue` with the given unique key whose state is
/// still "live" (not `Completed`, `Dlq`, or `Cancelled`). Returns the
/// first match in unspecified map order — presumably used for
/// enqueue-time deduplication; confirm against callers.
async fn get_job_by_unique_key(&self, queue: &str, key: &str) -> Result<Option<Job>> {
for entry in self.jobs.iter() {
let job = entry.value();
if job.queue == queue
&& job.unique_key.as_deref() == Some(key)
&& !matches!(
job.state,
JobState::Completed | JobState::Dlq | JobState::Cancelled
)
{
return Ok(Some(job.clone()));
}
}
Ok(None)
}
/// Returns every job currently in the `Active` state, in any queue.
async fn get_active_jobs(&self) -> Result<Vec<Job>> {
Ok(self
.jobs
.iter()
.filter(|entry| entry.value().state == JobState::Active)
.map(|entry| entry.value().clone())
.collect())
}
/// Returns every job belonging to the flow identified by `flow_id`.
async fn get_jobs_by_flow_id(&self, flow_id: &str) -> Result<Vec<Job>> {
Ok(self
.jobs
.iter()
.filter(|entry| entry.value().flow_id.as_deref() == Some(flow_id))
.map(|entry| entry.value().clone())
.collect())
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Duration;
use serde_json::json;
/// Builds a bare job on `queue` with a canned name and payload.
fn test_job(queue: &str) -> Job {
Job::new(queue, "test-job", json!({"key": "value"}))
}
#[tokio::test]
async fn test_insert_and_get_job() {
let storage = MemoryStorage::new();
let original = test_job("emails");
let inserted_id = storage.insert_job(&original).await.unwrap();
assert_eq!(inserted_id, original.id);
let fetched = storage
.get_job(inserted_id)
.await
.unwrap()
.expect("job should exist");
// Round-trip preserves identity and all defaulted fields.
assert_eq!(fetched.id, original.id);
assert_eq!(fetched.queue, "emails");
assert_eq!(fetched.name, "test-job");
assert_eq!(fetched.state, JobState::Waiting);
assert_eq!(fetched.data, json!({"key": "value"}));
assert_eq!(fetched.priority, 0);
assert_eq!(fetched.max_attempts, 3);
}
#[tokio::test]
async fn test_get_nonexistent_returns_none() {
let storage = MemoryStorage::new();
let unknown = uuid::Uuid::now_v7();
assert!(storage.get_job(unknown).await.unwrap().is_none());
}
#[tokio::test]
async fn test_dequeue_fifo_and_priority() {
let storage = MemoryStorage::new();
let mut early_low = test_job("work");
early_low.priority = 1;
let mut late_low = test_job("work");
late_low.priority = 1;
late_low.created_at = early_low.created_at + Duration::seconds(1);
let mut urgent = test_job("work");
urgent.priority = 10;
urgent.created_at = early_low.created_at + Duration::seconds(5);
for job in [&early_low, &late_low, &urgent] {
storage.insert_job(job).await.unwrap();
}
let claimed = storage.dequeue("work", 3).await.unwrap();
assert_eq!(claimed.len(), 3);
assert_eq!(
claimed[0].id, urgent.id,
"highest priority job should come first"
);
assert_eq!(
claimed[1].id, early_low.id,
"among same priority, earlier job should come first (FIFO)"
);
assert_eq!(
claimed[2].id, late_low.id,
"among same priority, later job should come last"
);
// Every claimed job must have been flipped to Active with a start time.
for job in &claimed {
assert_eq!(job.state, JobState::Active);
assert!(job.started_at.is_some());
}
// The stored copy reflects the state transition as well.
let persisted = storage.get_job(urgent.id).await.unwrap().unwrap();
assert_eq!(persisted.state, JobState::Active);
}
}