use crate::error::Result;
use crate::traits::job::{Job, JobData, JobQueue};
use async_trait::async_trait;
#[cfg(feature = "jobs")]
use chrono::{DateTime, Duration, Utc};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::{Mutex, Notify};
use uuid::Uuid;
/// Default cap on the number of jobs retained in each of the `completed`
/// and `failed` history queues (see `with_history_limit`).
const DEFAULT_MAX_HISTORY_SIZE: usize = 10_000;
/// A purely in-memory `JobQueue` implementation backed by Tokio primitives.
///
/// Every field is behind an `Arc`, so `Clone` is shallow: all clones share
/// the same queues, background scheduler task, and shutdown flag.
#[derive(Clone)]
pub struct InMemoryJobQueue {
// Jobs ready for `dequeue`, in FIFO order.
pending: Arc<Mutex<VecDeque<JobData>>>,
// Jobs currently being worked on, keyed by job id.
processing: Arc<Mutex<HashMap<String, JobData>>>,
// Bounded history of successfully completed jobs (oldest evicted first).
completed: Arc<Mutex<VecDeque<JobData>>>,
// Bounded history of permanently failed jobs (oldest evicted first).
failed: Arc<Mutex<VecDeque<JobData>>>,
// Jobs due at a future instant, keyed and ordered by run time; drained
// into `pending` by the background scheduler task.
// NOTE(review): `DateTime<Utc>` is used unconditionally here, but the
// chrono import above is gated on the "jobs" feature — confirm this
// module is only compiled with that feature enabled.
scheduled: Arc<Mutex<BTreeMap<DateTime<Utc>, Vec<JobData>>>>,
// Retry budget handed to every `JobData` created by this queue.
max_retries: u32,
// Base delay (seconds) for the exponential retry backoff in `fail`.
retry_backoff_seconds: u64,
// Size cap applied to both the `completed` and `failed` histories.
max_history_size: usize,
// Health flag reported by `is_healthy`.
health_status: Arc<AtomicBool>,
// Set by `shutdown`; the scheduler task polls it to know when to exit.
shutdown: Arc<AtomicBool>,
// Signals consumers blocked in `wait_for_job` that a job may be available.
job_available: Arc<Notify>,
// Wakes the scheduler early when a new scheduled job (or shutdown) arrives.
scheduler_wakeup: Arc<Notify>,
// Handle of the background scheduler task, taken and awaited by `shutdown`.
scheduler_handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}
impl InMemoryJobQueue {
/// Creates a queue with the default history cap
/// (`DEFAULT_MAX_HISTORY_SIZE` entries per history queue).
pub fn new(max_retries: u32, retry_backoff_seconds: u64) -> Self {
Self::with_history_limit(max_retries, retry_backoff_seconds, DEFAULT_MAX_HISTORY_SIZE)
}
/// Creates a queue with an explicit history cap and spawns its background
/// scheduler task.
///
/// NOTE(review): `start_scheduler_task` calls `tokio::spawn`, so this
/// constructor must be called from within a Tokio runtime — confirm all
/// call sites satisfy that.
pub fn with_history_limit(
max_retries: u32,
retry_backoff_seconds: u64,
max_history_size: usize,
) -> Self {
let shutdown = Arc::new(AtomicBool::new(false));
let queue = Self {
pending: Arc::new(Mutex::new(VecDeque::new())),
processing: Arc::new(Mutex::new(HashMap::new())),
completed: Arc::new(Mutex::new(VecDeque::new())),
failed: Arc::new(Mutex::new(VecDeque::new())),
scheduled: Arc::new(Mutex::new(BTreeMap::new())),
max_retries,
retry_backoff_seconds,
max_history_size,
health_status: Arc::new(AtomicBool::new(true)),
shutdown,
job_available: Arc::new(Notify::new()),
scheduler_wakeup: Arc::new(Notify::new()),
scheduler_handle: Arc::new(Mutex::new(None)),
};
queue.start_scheduler_task();
queue
}
/// Signals the scheduler task to stop and waits up to five seconds for it
/// to finish. Also wakes all waiters so nobody sleeps through the flag.
pub async fn shutdown(&self) {
self.shutdown.store(true, Ordering::Release);
// Wake both consumers (`wait_for_job`) and the scheduler so each can
// observe the shutdown flag instead of sleeping indefinitely.
self.job_available.notify_waiters();
self.scheduler_wakeup.notify_waiters();
let mut handle_guard = self.scheduler_handle.lock().await;
// `take()` makes shutdown idempotent: a second call finds `None`.
if let Some(handle) = handle_guard.take() {
match tokio::time::timeout(tokio::time::Duration::from_secs(5), handle).await {
Ok(_) => tracing::debug!("In-memory job queue scheduler stopped cleanly"),
Err(_) => {
tracing::warn!("In-memory job queue scheduler did not stop within timeout")
}
}
}
}
/// Appends `job` to a history queue, evicting the oldest entry once the
/// queue already holds `max_size` items.
///
/// NOTE(review): with `max_size == 0` this still retains one entry (the
/// pop happens before the push) — confirm whether zero should mean
/// "keep no history".
fn push_to_bounded_history(history: &mut VecDeque<JobData>, job: JobData, max_size: usize) {
if history.len() >= max_size {
history.pop_front(); }
history.push_back(job);
}
/// Spawns the background task that moves due `scheduled` jobs into
/// `pending` and sleeps until the next scheduled run time (or until
/// `scheduler_wakeup` fires because a new job/shutdown arrived).
fn start_scheduler_task(&self) {
let scheduled = self.scheduled.clone();
let pending = self.pending.clone();
let shutdown = self.shutdown.clone();
let job_available = self.job_available.clone();
let scheduler_wakeup = self.scheduler_wakeup.clone();
let scheduler_handle = self.scheduler_handle.clone();
let handle = tokio::spawn(async move {
loop {
if shutdown.load(Ordering::Acquire) {
tracing::debug!("In-memory job queue scheduler shutting down");
break;
}
let now = Utc::now();
let mut moved_due_jobs = false;
let next_wait = {
// Lock order here is scheduled -> pending; no other code path
// takes these two locks in the opposite order.
let mut scheduled_guard = scheduled.lock().await;
let mut pending_guard = pending.lock().await;
// BTreeMap iterates in key (run-time) order, so `take_while`
// collects exactly the keys that are due.
let keys_to_remove: Vec<DateTime<Utc>> = scheduled_guard
.iter()
.take_while(|(time, _)| **time <= now)
.map(|(time, _)| *time)
.collect();
for key in keys_to_remove {
if let Some(jobs) = scheduled_guard.remove(&key) {
moved_due_jobs = true;
for job in jobs {
pending_guard.push_back(job);
}
}
}
// Time until the earliest remaining scheduled job, or `None`
// when nothing is scheduled; clamped to zero if already past.
scheduled_guard.iter().next().map(|(run_at, _)| {
(*run_at - now)
.to_std()
.unwrap_or(std::time::Duration::ZERO)
})
};
if moved_due_jobs {
// Wake every blocked consumer, then loop again immediately in
// case more jobs became due while we held the locks.
job_available.notify_waiters();
continue;
}
match next_wait {
Some(wait) => {
// Sleep until the next job is due, but wake early if a new
// scheduled job (or shutdown) arrives.
tokio::select! {
_ = tokio::time::sleep(wait) => {},
_ = scheduler_wakeup.notified() => {},
}
}
None => {
// Nothing scheduled: park until woken. `notify_one` stores
// a permit, so a wakeup sent just before this await is not
// lost.
scheduler_wakeup.notified().await;
}
}
}
});
// Called from the constructor before the queue is shared, so this
// `try_lock` is expected to succeed; the abort branch is defensive.
if let Ok(mut guard) = scheduler_handle.try_lock() {
*guard = Some(handle);
} else {
handle.abort();
tracing::error!("Failed to store scheduler handle");
}
}
}
#[async_trait]
impl JobQueue for InMemoryJobQueue {
    /// Enqueues `job` for immediate processing and returns its generated id.
    ///
    /// # Errors
    /// Propagates serialization failures from `job.serialize()`.
    async fn enqueue(&self, job: &dyn Job) -> Result<String> {
        let job_id = Uuid::new_v4().to_string();
        let payload = job.serialize()?;
        let job_data = JobData::new(
            job_id.clone(),
            job.job_type().to_string(),
            payload,
            self.max_retries,
        );
        self.pending.lock().await.push_back(job_data);
        // Wake one consumer blocked in `wait_for_job`. `notify_one` stores a
        // permit, so the signal is not lost if nobody is waiting yet.
        self.job_available.notify_one();
        Ok(job_id)
    }

    /// Pops the oldest pending job, marks it as processing, and returns it.
    /// Returns `Ok(None)` when no job is pending.
    async fn dequeue(&self) -> Result<Option<JobData>> {
        // Lock order (pending -> processing) matches `retry`; no path takes
        // these locks in the opposite order.
        let mut pending = self.pending.lock().await;
        match pending.pop_front() {
            Some(job_data) => {
                self.processing
                    .lock()
                    .await
                    .insert(job_data.job_id.clone(), job_data.clone());
                Ok(Some(job_data))
            }
            None => Ok(None),
        }
    }

    /// Moves `job_id` from processing into the bounded `completed` history.
    /// Unknown ids are silently ignored.
    async fn complete(&self, job_id: &str) -> Result<()> {
        let mut processing = self.processing.lock().await;
        if let Some(job_data) = processing.remove(job_id) {
            let mut completed = self.completed.lock().await;
            Self::push_to_bounded_history(&mut completed, job_data, self.max_history_size);
        }
        Ok(())
    }

    /// Records a failure for `job_id`. If retries remain, the job is
    /// re-scheduled with exponential backoff; otherwise it moves to the
    /// bounded `failed` history. Unknown ids are silently ignored.
    /// The error text is currently discarded (`JobData` has no field for it).
    async fn fail(&self, job_id: &str, _error: String) -> Result<()> {
        let mut processing = self.processing.lock().await;
        if let Some(mut job_data) = processing.remove(job_id) {
            if job_data.should_retry() {
                // Exponential backoff: base * 2^retry_count. `checked_shl`
                // plus `saturating_mul` guarantee no overflow panic for large
                // retry counts, and the delay is capped at one year so the
                // cast and chrono's `Duration::seconds` stay in range.
                const MAX_BACKOFF_SECONDS: u64 = 365 * 24 * 60 * 60;
                let factor = 1u64.checked_shl(job_data.retry_count).unwrap_or(u64::MAX);
                let backoff_seconds = self
                    .retry_backoff_seconds
                    .saturating_mul(factor)
                    .min(MAX_BACKOFF_SECONDS);
                let retry_at = Utc::now() + Duration::seconds(backoff_seconds as i64);
                job_data.increment_retry();
                self.scheduled
                    .lock()
                    .await
                    .entry(retry_at)
                    .or_insert_with(Vec::new)
                    .push(job_data);
                // Let the scheduler recompute its sleep deadline.
                self.scheduler_wakeup.notify_one();
            } else {
                let mut failed = self.failed.lock().await;
                Self::push_to_bounded_history(&mut failed, job_data, self.max_history_size);
            }
        }
        Ok(())
    }

    /// Immediately re-queues `job_id` (no backoff) if retries remain;
    /// otherwise moves it to the `failed` history. Unknown ids are ignored.
    async fn retry(&self, job_id: &str) -> Result<()> {
        let mut processing = self.processing.lock().await;
        if let Some(mut job_data) = processing.remove(job_id) {
            if job_data.should_retry() {
                job_data.increment_retry();
                self.pending.lock().await.push_back(job_data);
                self.job_available.notify_one();
            } else {
                let mut failed = self.failed.lock().await;
                Self::push_to_bounded_history(&mut failed, job_data, self.max_history_size);
            }
        }
        Ok(())
    }

    /// Registers `job` to run at `run_at` and returns its generated id.
    /// Jobs scheduled in the past are picked up on the scheduler's next pass.
    ///
    /// # Errors
    /// Propagates serialization failures from `job.serialize()`.
    #[cfg(feature = "jobs")]
    async fn schedule(&self, job: &dyn Job, run_at: DateTime<Utc>) -> Result<String> {
        let job_id = Uuid::new_v4().to_string();
        let payload = job.serialize()?;
        let job_data = JobData::scheduled(
            job_id.clone(),
            job.job_type().to_string(),
            payload,
            self.max_retries,
            run_at,
        );
        self.scheduled
            .lock()
            .await
            .entry(run_at)
            .or_insert_with(Vec::new)
            .push(job_data);
        // Wake the scheduler so it can adopt the (possibly earlier) deadline.
        self.scheduler_wakeup.notify_one();
        Ok(job_id)
    }

    /// Reports the queue's health flag (always `true` unless cleared
    /// elsewhere).
    fn is_healthy(&self) -> bool {
        self.health_status.load(Ordering::Acquire)
    }

    /// Blocks until a job may be available or `timeout` elapses.
    ///
    /// The `Notified` future is pinned and *enabled* before the emptiness
    /// check: the scheduler announces moved jobs via `notify_waiters`, which
    /// only wakes waiters that are already registered. Registering first
    /// closes the race where a notification lands between the check and the
    /// await and would otherwise be lost until the timeout expires.
    async fn wait_for_job(&self, timeout: std::time::Duration) {
        let notified = self.job_available.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();
        if !self.pending.lock().await.is_empty() {
            return;
        }
        let _ = tokio::time::timeout(timeout, notified).await;
    }
}
impl Default for InMemoryJobQueue {
fn default() -> Self {
Self::new(3, 60)
}
}