use priority_queue::PriorityQueue;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::time::Duration;
use tracing::{debug, warn};
use super::task::{Priority, UpdateTask};
const MAX_RETRIES: u32 = 3;
const BASE_BACKOFF_SECS: u64 = 1;
/// Priority-ordered queue of file-update tasks with per-path de-duplication,
/// retry bookkeeping, and a dead-letter list for permanently failed tasks.
pub struct UpdateQueue {
// Max-heap of pending paths, ordered by task priority.
queue: PriorityQueue<PathBuf, Priority>,
// Full task payload for each pending path (parallel to `queue`).
tasks: HashMap<PathBuf, UpdateTask>,
// Paths currently dequeued and being worked on.
processing: HashSet<PathBuf>,
// Tasks that exceeded MAX_RETRIES; kept for inspection / manual retry.
dead_letter: Vec<UpdateTask>,
}
impl UpdateQueue {
pub fn new() -> Self {
Self {
queue: PriorityQueue::new(),
tasks: HashMap::new(),
processing: HashSet::new(),
dead_letter: Vec::new(),
}
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
queue: PriorityQueue::with_capacity(capacity),
tasks: HashMap::with_capacity(capacity),
processing: HashSet::with_capacity(capacity / 2), dead_letter: Vec::new(),
}
}
/// Add a task, merging it with any pending task for the same path.
///
/// If a task for `task.path` is already pending, the two are merged via
/// `UpdateTask::merge` and the heap entry is re-prioritized when the merge
/// changed the stored priority. Otherwise the task is inserted as new.
pub fn enqueue(&mut self, task: UpdateTask) {
    let path = task.path.clone();
    let priority = task.priority;
    if let Some(existing) = self.tasks.get_mut(&path) {
        debug!(
            "Merging task for path: {} (existing priority: {:?}, new priority: {:?})",
            path.display(),
            existing.priority,
            priority
        );
        // Snapshot the priority the heap currently holds for this path.
        let old_priority = existing.priority;
        existing.merge(task);
        // BUG FIX: the old check was `existing.priority != priority`, i.e.
        // merged-vs-incoming. When the incoming task *raised* the priority,
        // the merged value equals the incoming one, the check was false, and
        // the heap kept ordering this path by its stale, lower priority.
        // The heap entry must be refreshed exactly when the stored (pre-merge)
        // priority changed.
        if existing.priority != old_priority {
            self.queue.change_priority(&path, existing.priority);
        }
    } else {
        debug!(
            "Enqueueing new task for path: {} (priority: {:?})",
            path.display(),
            priority
        );
        self.queue.push(path.clone(), priority);
        self.tasks.insert(path, task);
    }
}
/// Pop the highest-priority pending task and mark its path as in-flight.
///
/// Returns `None` once the heap is exhausted. Paths already in the
/// processing set are skipped.
pub fn dequeue(&mut self) -> Option<UpdateTask> {
    loop {
        // `?` terminates the loop when the heap is empty.
        let (path, _priority) = self.queue.pop()?;
        if self.processing.contains(&path) {
            // NOTE(review): the heap entry is discarded here while any
            // matching entry in `tasks` survives, so a task re-enqueued for
            // an in-flight path may never be scheduled again — confirm this
            // is intended.
            warn!(
                "Task for {} already in processing set, skipping",
                path.display()
            );
            continue;
        }
        match self.tasks.remove(&path) {
            Some(task) => {
                self.processing.insert(path.clone());
                debug!(
                    "Dequeued task for path: {} (priority: {:?})",
                    path.display(),
                    task.priority
                );
                return Some(task);
            }
            // Heap/map out of sync for this path; drop the entry and keep going.
            None => continue,
        }
    }
}
/// Remove `path` from the in-flight set after successful processing.
/// A no-op (and no log line) when the path was not being processed.
pub fn mark_completed(&mut self, path: &Path) {
    let was_in_flight = self.processing.remove(path);
    if was_in_flight {
        debug!("Marked task as completed: {}", path.display());
    }
}
/// Record a processing failure: bump the retry counter, then either
/// re-enqueue the task or, once MAX_RETRIES is exceeded, move it to the
/// dead letter queue. `error` is only used for logging.
pub fn mark_failed(&mut self, mut task: UpdateTask, error: &str) {
    self.processing.remove(&task.path);
    task.increment_retry();
    // Common case first: the task still has retries left.
    if !task.has_exceeded_retries(MAX_RETRIES) {
        debug!(
            "Re-enqueueing failed task: {} (retry {}/{}) - error: {}",
            task.path.display(),
            task.retry_count,
            MAX_RETRIES,
            error
        );
        self.enqueue(task);
        return;
    }
    warn!(
        "Task exceeded max retries ({}), moving to dead letter queue: {} - last error: {}",
        MAX_RETRIES,
        task.path.display(),
        error
    );
    self.dead_letter.push(task);
}
/// Read-only view of tasks that exhausted their retries.
pub fn get_dead_letter_queue(&self) -> &[UpdateTask] {
    self.dead_letter.as_slice()
}
/// Number of tasks waiting to be dequeued.
pub fn queue_size(&self) -> usize {
    self.queue.len()
}
/// Number of tasks currently in flight.
pub fn processing_size(&self) -> usize {
    self.processing.len()
}
/// Number of permanently failed tasks.
pub fn dead_letter_size(&self) -> usize {
    self.dead_letter.len()
}
/// True when nothing is pending (in-flight tasks are not counted).
pub fn is_empty(&self) -> bool {
    self.queue.is_empty()
}
/// Drop every pending task and its bookkeeping entry.
///
/// NOTE(review): `processing` and `dead_letter` are deliberately left
/// untouched, so in-flight tasks can still be completed or failed
/// afterwards — confirm that is the intended contract.
pub fn clear(&mut self) {
    debug!("Clearing {} tasks from queue", self.queue.len());
    self.tasks.clear();
    self.queue.clear();
}
/// Empty the dead letter queue without re-enqueueing anything.
pub fn clear_dead_letter(&mut self) {
    debug!(
        "Clearing {} tasks from dead letter queue",
        self.dead_letter.len()
    );
    self.dead_letter.clear();
}
/// Exponential backoff for a failed task's next attempt.
///
/// Retries 0 and 1 both wait `BASE_BACKOFF_SECS`; each later retry doubles
/// the delay (1s, 1s, 2s, 4s, 8s, ...).
pub fn calculate_backoff(retry_count: u32) -> Duration {
    // ROBUSTNESS: the previous `2_u64.pow(retry_count.saturating_sub(1))`
    // overflows (panicking in debug builds) once the exponent reaches 64.
    // Clamp the exponent and saturate the multiply so absurd retry counts
    // yield a huge-but-valid delay instead of a panic. All values in the
    // tested range (0..=4) are unchanged.
    let exponent = retry_count.saturating_sub(1).min(62);
    let delay_secs = BASE_BACKOFF_SECS.saturating_mul(1u64 << exponent);
    Duration::from_secs(delay_secs)
}
/// Re-enqueue every dead-lettered task with a reset retry counter.
///
/// Returns how many tasks were moved back into the pending queue.
pub fn retry_dead_letter(&mut self) -> usize {
    let retried = self.dead_letter.len();
    debug!("Retrying {} tasks from dead letter queue", retried);
    // Take ownership of the whole Vec first: `enqueue` needs `&mut self`,
    // which we cannot borrow while iterating `self.dead_letter`.
    let failed_tasks = std::mem::take(&mut self.dead_letter);
    for mut task in failed_tasks {
        task.retry_count = 0;
        self.enqueue(task);
    }
    retried
}
/// Snapshot of current queue occupancy (pending / in-flight / dead-lettered).
pub fn stats(&self) -> QueueStats {
    QueueStats {
        pending: self.queue.len(),
        processing: self.processing.len(),
        dead_letter: self.dead_letter.len(),
    }
}
}
impl Default for UpdateQueue {
fn default() -> Self {
Self::new()
}
}
/// Point-in-time occupancy counters returned by [`UpdateQueue::stats`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct QueueStats {
/// Tasks waiting to be dequeued.
pub pending: usize,
/// Tasks currently being processed.
pub processing: usize,
/// Tasks that exceeded the retry limit.
pub dead_letter: usize,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::incremental::detector::ChangeType;
use crate::incremental::hash::FileHasher;
use crate::incremental::task::Trigger;
// A fresh queue reports zero everywhere and is empty.
#[test]
fn test_queue_new() {
let queue = UpdateQueue::new();
assert_eq!(queue.queue_size(), 0);
assert_eq!(queue.processing_size(), 0);
assert_eq!(queue.dead_letter_size(), 0);
assert!(queue.is_empty());
}
// Round-trip: enqueue one task, dequeue it, and verify it moved from
// the pending queue into the processing set.
#[test]
fn test_enqueue_dequeue() {
let mut queue = UpdateQueue::new();
let path = PathBuf::from("/test/file.rs");
let hash = FileHasher::hash_bytes(b"test");
let task = UpdateTask::new(path.clone(), ChangeType::New(hash), Trigger::Save);
queue.enqueue(task);
assert_eq!(queue.queue_size(), 1);
let dequeued = queue.dequeue();
assert!(dequeued.is_some());
assert_eq!(dequeued.unwrap().path, path);
assert_eq!(queue.queue_size(), 0);
assert_eq!(queue.processing_size(), 1);
}
// Tasks come out in priority order (User > Save > Auto), not insertion order.
#[test]
fn test_priority_ordering() {
let mut queue = UpdateQueue::new();
let hash = FileHasher::hash_bytes(b"test");
let low_task = UpdateTask::new(
PathBuf::from("/test/low.rs"),
ChangeType::New(hash),
Trigger::Auto,
);
let high_task = UpdateTask::new(
PathBuf::from("/test/high.rs"),
ChangeType::New(hash),
Trigger::User,
);
let medium_task = UpdateTask::new(
PathBuf::from("/test/medium.rs"),
ChangeType::New(hash),
Trigger::Save,
);
queue.enqueue(low_task);
queue.enqueue(high_task);
queue.enqueue(medium_task);
assert_eq!(queue.queue_size(), 3);
let task1 = queue.dequeue().unwrap();
assert_eq!(task1.priority, Priority::High);
let task2 = queue.dequeue().unwrap();
assert_eq!(task2.priority, Priority::Medium);
let task3 = queue.dequeue().unwrap();
assert_eq!(task3.priority, Priority::Low);
assert!(queue.is_empty());
}
// Two tasks for the same path merge into one entry; the merged task
// carries the escalated (High) priority.
#[test]
fn test_task_deduplication() {
let mut queue = UpdateQueue::new();
let path = PathBuf::from("/test/file.rs");
let hash1 = FileHasher::hash_bytes(b"v1");
let hash2 = FileHasher::hash_bytes(b"v2");
let task1 = UpdateTask::new(path.clone(), ChangeType::New(hash1), Trigger::Auto);
queue.enqueue(task1);
assert_eq!(queue.queue_size(), 1);
let task2 = UpdateTask::new(
path.clone(),
ChangeType::Modified {
old: hash1,
new: hash2,
},
Trigger::User,
);
queue.enqueue(task2);
assert_eq!(queue.queue_size(), 1);
let dequeued = queue.dequeue().unwrap();
assert_eq!(dequeued.priority, Priority::High);
}
// mark_completed removes the path from the processing set.
#[test]
fn test_mark_completed() {
let mut queue = UpdateQueue::new();
let path = PathBuf::from("/test/file.rs");
let hash = FileHasher::hash_bytes(b"test");
let task = UpdateTask::new(path.clone(), ChangeType::New(hash), Trigger::Save);
queue.enqueue(task);
let dequeued = queue.dequeue().unwrap();
assert_eq!(queue.processing_size(), 1);
queue.mark_completed(&dequeued.path);
assert_eq!(queue.processing_size(), 0);
}
// A first failure re-enqueues the task with retry_count bumped to 1.
#[test]
fn test_mark_failed_with_retry() {
let mut queue = UpdateQueue::new();
let path = PathBuf::from("/test/file.rs");
let hash = FileHasher::hash_bytes(b"test");
let task = UpdateTask::new(path.clone(), ChangeType::New(hash), Trigger::Save);
queue.enqueue(task);
let dequeued = queue.dequeue().unwrap();
assert_eq!(queue.queue_size(), 0);
assert_eq!(queue.processing_size(), 1);
queue.mark_failed(dequeued, "test error");
assert_eq!(queue.queue_size(), 1);
assert_eq!(queue.processing_size(), 0);
assert_eq!(queue.dead_letter_size(), 0);
let retry_task = queue.dequeue().unwrap();
assert_eq!(retry_task.retry_count, 1);
}
// After MAX_RETRIES consecutive failures the task lands in the dead
// letter queue with its final retry count intact.
#[test]
fn test_dead_letter_queue() {
let mut queue = UpdateQueue::new();
let path = PathBuf::from("/test/file.rs");
let hash = FileHasher::hash_bytes(b"test");
let task = UpdateTask::new(path.clone(), ChangeType::New(hash), Trigger::Save);
queue.enqueue(task);
for i in 0..MAX_RETRIES {
let dequeued = queue.dequeue().unwrap();
assert_eq!(dequeued.retry_count, i);
queue.mark_failed(dequeued, "persistent error");
}
assert_eq!(queue.queue_size(), 0);
assert_eq!(queue.dead_letter_size(), 1);
let dead_letter = queue.get_dead_letter_queue();
assert_eq!(dead_letter[0].path, path);
assert_eq!(dead_letter[0].retry_count, MAX_RETRIES);
}
// Backoff schedule: retries 0 and 1 both wait 1s, then the delay doubles.
#[test]
fn test_calculate_backoff() {
assert_eq!(UpdateQueue::calculate_backoff(0), Duration::from_secs(1));
assert_eq!(UpdateQueue::calculate_backoff(1), Duration::from_secs(1));
assert_eq!(UpdateQueue::calculate_backoff(2), Duration::from_secs(2));
assert_eq!(UpdateQueue::calculate_backoff(3), Duration::from_secs(4));
assert_eq!(UpdateQueue::calculate_backoff(4), Duration::from_secs(8));
}
// retry_dead_letter moves dead-lettered tasks back to pending with
// retry_count reset to 0.
#[test]
fn test_retry_dead_letter() {
let mut queue = UpdateQueue::new();
let path = PathBuf::from("/test/file.rs");
let hash = FileHasher::hash_bytes(b"test");
let task = UpdateTask::new(path.clone(), ChangeType::New(hash), Trigger::Save);
queue.enqueue(task);
for _ in 0..MAX_RETRIES {
let dequeued = queue.dequeue().unwrap();
queue.mark_failed(dequeued, "error");
}
assert_eq!(queue.dead_letter_size(), 1);
assert_eq!(queue.queue_size(), 0);
let count = queue.retry_dead_letter();
assert_eq!(count, 1);
assert_eq!(queue.dead_letter_size(), 0);
assert_eq!(queue.queue_size(), 1);
let task = queue.dequeue().unwrap();
assert_eq!(task.retry_count, 0);
}
// clear() empties the pending queue.
#[test]
fn test_clear() {
let mut queue = UpdateQueue::new();
let hash = FileHasher::hash_bytes(b"test");
for i in 0..5 {
let task = UpdateTask::new(
PathBuf::from(format!("/test/file{}.rs", i)),
ChangeType::New(hash),
Trigger::Auto,
);
queue.enqueue(task);
}
assert_eq!(queue.queue_size(), 5);
queue.clear();
assert_eq!(queue.queue_size(), 0);
assert!(queue.is_empty());
}
// stats() reflects all three buckets: 2 pending, 1 in flight (never
// completed), 1 dead-lettered after exhausting retries.
#[test]
fn test_stats() {
let mut queue = UpdateQueue::new();
let hash = FileHasher::hash_bytes(b"test");
for i in 0..3 {
let task = UpdateTask::new(
PathBuf::from(format!("/test/file{}.rs", i)),
ChangeType::New(hash),
Trigger::Auto,
);
queue.enqueue(task);
}
let _processing_task = queue.dequeue();
let task = UpdateTask::new(
PathBuf::from("/test/failed.rs"),
ChangeType::New(hash),
Trigger::User, );
queue.enqueue(task);
// The High-priority task is dequeued first on every iteration, so it
// alone absorbs all MAX_RETRIES failures.
for _ in 0..MAX_RETRIES {
let dequeued = queue.dequeue().unwrap();
queue.mark_failed(dequeued, "error");
}
let stats = queue.stats();
assert_eq!(stats.pending, 2); assert_eq!(stats.processing, 1); assert_eq!(stats.dead_letter, 1);
}
}