use crate::jobs::JobId;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashSet};
use std::time::Duration;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuedJob {
pub id: JobId,
pub job_type: String,
pub payload: Vec<u8>,
pub priority: i32,
pub max_retries: u32,
pub timeout: Duration,
pub enqueued_at: DateTime<Utc>,
pub attempt: u32,
}
#[derive(Debug, Clone)]
struct QueueEntry {
job: QueuedJob,
}
impl PartialEq for QueueEntry {
fn eq(&self, other: &Self) -> bool {
self.job.priority == other.job.priority
&& self.job.enqueued_at == other.job.enqueued_at
}
}
impl Eq for QueueEntry {}
impl PartialOrd for QueueEntry {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for QueueEntry {
fn cmp(&self, other: &Self) -> Ordering {
match other.job.priority.cmp(&self.job.priority) {
Ordering::Equal => {
self.job.enqueued_at.cmp(&other.job.enqueued_at)
}
ord => ord,
}
}
}
#[derive(Debug)]
pub(super) struct JobQueue {
heap: BinaryHeap<QueueEntry>,
ids: HashSet<JobId>,
max_size: usize,
}
impl JobQueue {
#[must_use]
pub(super) fn new(max_size: usize) -> Self {
Self {
heap: BinaryHeap::new(),
ids: HashSet::new(),
max_size,
}
}
pub(super) fn enqueue(&mut self, job: QueuedJob) -> Result<(), String> {
if self.heap.len() >= self.max_size {
return Err(format!("Queue is full (max: {})", self.max_size));
}
if self.ids.contains(&job.id) {
return Err(format!("Job {} is already queued", job.id));
}
self.ids.insert(job.id);
self.heap.push(QueueEntry { job });
Ok(())
}
#[must_use]
pub(super) fn contains(&self, id: &JobId) -> bool {
self.ids.contains(id)
}
pub(super) fn remove(&mut self, id: &JobId) -> Option<QueuedJob> {
if !self.ids.contains(id) {
return None;
}
self.ids.remove(id);
let jobs: Vec<QueueEntry> = std::mem::take(&mut self.heap).into_vec();
let (removed, remaining): (Vec<_>, Vec<_>) = jobs.into_iter().partition(|entry| entry.job.id == *id);
self.heap = remaining.into_iter().collect();
removed.into_iter().next().map(|entry| entry.job)
}
#[must_use]
#[allow(dead_code)] pub(super) fn len(&self) -> usize {
self.heap.len()
}
#[must_use]
#[allow(dead_code)] pub(super) fn is_empty(&self) -> bool {
self.heap.is_empty()
}
}