Skip to main content

oximedia_dedup/
dedup_queue.rs

1#![allow(dead_code)]
2//! Deduplication work queue with priority scheduling.
3//!
4//! Provides a priority queue for scheduling deduplication tasks across
5//! media files, allowing high-priority items (large files, user requests)
6//! to be processed before low-priority background scans.
7
8use std::cmp::Ordering;
9use std::collections::BinaryHeap;
10use std::time::{SystemTime, UNIX_EPOCH};
11
/// Urgency level of a deduplication task.
///
/// Priorities are totally ordered by urgency: `Critical` compares greatest and
/// `Background` least, so a max-heap yields the most urgent work first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DedupPriority {
    /// Most urgent: a user-requested immediate scan.
    Critical,
    /// Newly ingested files.
    High,
    /// Scheduled background scan.
    Normal,
    /// Periodic re-verification.
    Low,
    /// Least urgent: idle-time processing.
    Background,
}

impl DedupPriority {
    /// Urgency rank used for ordering, from `Background` = 0 up to `Critical` = 4.
    fn numeric(&self) -> u8 {
        match *self {
            DedupPriority::Background => 0,
            DedupPriority::Low => 1,
            DedupPriority::Normal => 2,
            DedupPriority::High => 3,
            DedupPriority::Critical => 4,
        }
    }

    /// Lower-case label for this priority, e.g. `"critical"`.
    pub fn name(&self) -> &'static str {
        match *self {
            DedupPriority::Background => "background",
            DedupPriority::Low => "low",
            DedupPriority::Normal => "normal",
            DedupPriority::High => "high",
            DedupPriority::Critical => "critical",
        }
    }
}

impl PartialOrd for DedupPriority {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        // Delegate to `Ord` so the two orderings can never disagree.
        Some(Ord::cmp(self, other))
    }
}

impl Ord for DedupPriority {
    /// Compares by urgency rank; the more urgent priority is the greater one.
    fn cmp(&self, other: &Self) -> Ordering {
        let (mine, theirs) = (self.numeric(), other.numeric());
        mine.cmp(&theirs)
    }
}
62
/// The kind of work a deduplication task performs on its target(s).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DedupTaskKind {
    /// Hash a file for exact-duplicate detection.
    HashFile,
    /// Compute a perceptual hash for visual-similarity detection.
    PerceptualHash,
    /// Run a full similarity comparison between two items.
    Compare,
    /// Re-verify an existing duplicate entry.
    Verify,
    /// Clean up stale entries.
    Cleanup,
}

impl DedupTaskKind {
    /// Snake-case label for this kind, e.g. `"hash_file"`.
    pub fn name(&self) -> &'static str {
        match *self {
            DedupTaskKind::Cleanup => "cleanup",
            DedupTaskKind::Compare => "compare",
            DedupTaskKind::HashFile => "hash_file",
            DedupTaskKind::PerceptualHash => "perceptual_hash",
            DedupTaskKind::Verify => "verify",
        }
    }
}
90
/// A single deduplication work item held in a [`DedupQueue`].
///
/// Build one with [`DedupTask::new`] plus the `with_*` builder methods, or let
/// [`DedupQueue::enqueue`] build it. Equality between tasks is by `id` only.
#[derive(Debug, Clone)]
pub struct DedupTask {
    /// Task identifier. [`DedupQueue::enqueue`] assigns these from a counter;
    /// tasks passed to `enqueue_task` keep whatever id they were built with.
    pub id: u64,
    /// Scheduling priority; the primary key of the dequeue order.
    pub priority: DedupPriority,
    /// What work the task performs.
    pub kind: DedupTaskKind,
    /// File path or identifier the task operates on.
    pub target: String,
    /// Optional second target, used by comparison tasks.
    pub compare_target: Option<String>,
    /// File size hint (for scheduling); `0` unless set via `with_size_hint`.
    ///
    /// NOTE(review): the `Ord` impl does not read this field, so it currently
    /// has no effect on dequeue order.
    pub size_hint: u64,
    /// Creation time in milliseconds since the Unix epoch (`0` if the system
    /// clock reads earlier than the epoch). Among equal priorities, older
    /// tasks are dequeued first.
    pub created_at: u64,
    /// Number of retry attempts made so far.
    pub retries: u32,
    /// Maximum number of retries allowed (`3` unless overridden).
    pub max_retries: u32,
}
113
114impl DedupTask {
115    /// Create a new dedup task.
116    pub fn new(id: u64, priority: DedupPriority, kind: DedupTaskKind, target: String) -> Self {
117        let created_at = SystemTime::now()
118            .duration_since(UNIX_EPOCH)
119            .map(|d| d.as_millis() as u64)
120            .unwrap_or(0);
121        Self {
122            id,
123            priority,
124            kind,
125            target,
126            compare_target: None,
127            size_hint: 0,
128            created_at,
129            retries: 0,
130            max_retries: 3,
131        }
132    }
133
134    /// Set the size hint.
135    pub fn with_size_hint(mut self, size: u64) -> Self {
136        self.size_hint = size;
137        self
138    }
139
140    /// Set the compare target.
141    pub fn with_compare_target(mut self, target: String) -> Self {
142        self.compare_target = Some(target);
143        self
144    }
145
146    /// Set the maximum retries.
147    pub fn with_max_retries(mut self, max: u32) -> Self {
148        self.max_retries = max;
149        self
150    }
151
152    /// Check if this task can be retried.
153    pub fn can_retry(&self) -> bool {
154        self.retries < self.max_retries
155    }
156
157    /// Increment the retry counter and return a new task for retry.
158    pub fn retry(&self) -> Option<Self> {
159        if !self.can_retry() {
160            return None;
161        }
162        let mut task = self.clone();
163        task.retries += 1;
164        Some(task)
165    }
166}
167
168impl PartialEq for DedupTask {
169    fn eq(&self, other: &Self) -> bool {
170        self.id == other.id
171    }
172}
173
174impl Eq for DedupTask {}
175
176impl PartialOrd for DedupTask {
177    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
178        Some(self.cmp(other))
179    }
180}
181
182impl Ord for DedupTask {
183    fn cmp(&self, other: &Self) -> Ordering {
184        // Higher priority first, then older tasks first (lower created_at)
185        self.priority
186            .cmp(&other.priority)
187            .then_with(|| other.created_at.cmp(&self.created_at))
188    }
189}
190
/// Priority queue of pending [`DedupTask`]s plus running counters.
///
/// Tasks are popped greatest-first according to `DedupTask`'s `Ord` impl
/// (highest priority, then oldest). The queue does not observe task outcomes
/// itself; callers report them via `record_completed` / `record_failed`.
#[derive(Debug)]
pub struct DedupQueue {
    /// Pending tasks; `BinaryHeap` pops the greatest element first.
    heap: BinaryHeap<DedupTask>,
    /// Id that the next call to `enqueue` will assign (starts at 1).
    next_id: u64,
    /// Total tasks ever enqueued; not reduced by dequeuing or `clear`.
    total_enqueued: u64,
    /// Total tasks reported via `record_completed`.
    total_completed: u64,
    /// Total tasks reported via `record_failed`.
    total_failed: u64,
}
205
206impl DedupQueue {
207    /// Create a new empty dedup queue.
208    pub fn new() -> Self {
209        Self {
210            heap: BinaryHeap::new(),
211            next_id: 1,
212            total_enqueued: 0,
213            total_completed: 0,
214            total_failed: 0,
215        }
216    }
217
218    /// Enqueue a task, returning the assigned ID.
219    pub fn enqueue(&mut self, priority: DedupPriority, kind: DedupTaskKind, target: String) -> u64 {
220        let id = self.next_id;
221        self.next_id += 1;
222        let task = DedupTask::new(id, priority, kind, target);
223        self.heap.push(task);
224        self.total_enqueued += 1;
225        id
226    }
227
228    /// Enqueue a pre-built task.
229    pub fn enqueue_task(&mut self, task: DedupTask) {
230        self.heap.push(task);
231        self.total_enqueued += 1;
232    }
233
234    /// Dequeue the highest-priority task.
235    pub fn dequeue(&mut self) -> Option<DedupTask> {
236        self.heap.pop()
237    }
238
239    /// Peek at the highest-priority task without removing it.
240    pub fn peek(&self) -> Option<&DedupTask> {
241        self.heap.peek()
242    }
243
244    /// Get the number of pending tasks.
245    pub fn len(&self) -> usize {
246        self.heap.len()
247    }
248
249    /// Check if the queue is empty.
250    pub fn is_empty(&self) -> bool {
251        self.heap.is_empty()
252    }
253
254    /// Record a task completion.
255    pub fn record_completed(&mut self) {
256        self.total_completed += 1;
257    }
258
259    /// Record a task failure.
260    pub fn record_failed(&mut self) {
261        self.total_failed += 1;
262    }
263
264    /// Get queue statistics.
265    pub fn stats(&self) -> QueueStats {
266        QueueStats {
267            pending: self.heap.len(),
268            total_enqueued: self.total_enqueued,
269            total_completed: self.total_completed,
270            total_failed: self.total_failed,
271        }
272    }
273
274    /// Clear all pending tasks.
275    pub fn clear(&mut self) {
276        self.heap.clear();
277    }
278
279    /// Drain up to `n` tasks from the queue.
280    pub fn drain_batch(&mut self, n: usize) -> Vec<DedupTask> {
281        let mut batch = Vec::with_capacity(n);
282        for _ in 0..n {
283            if let Some(task) = self.heap.pop() {
284                batch.push(task);
285            } else {
286                break;
287            }
288        }
289        batch
290    }
291}
292
293impl Default for DedupQueue {
294    fn default() -> Self {
295        Self::new()
296    }
297}
298
/// Point-in-time snapshot of a queue's counters, as produced by
/// `DedupQueue::stats`.
#[derive(Debug, Clone)]
pub struct QueueStats {
    /// Number of tasks still waiting in the queue.
    pub pending: usize,
    /// Total tasks ever enqueued.
    pub total_enqueued: u64,
    /// Total tasks reported as completed.
    pub total_completed: u64,
    /// Total tasks reported as failed.
    pub total_failed: u64,
}

impl QueueStats {
    /// Returns the fraction of finished tasks that succeeded,
    /// `completed / (completed + failed)`, in `[0.0, 1.0]`.
    ///
    /// Returns `1.0` when no task has finished yet (both counters are zero).
    // Counts above 2^53 lose precision when converted to f64; that is
    // acceptable for a ratio.
    #[allow(clippy::cast_precision_loss)]
    pub fn success_rate(&self) -> f64 {
        if self.total_completed == 0 && self.total_failed == 0 {
            return 1.0;
        }
        // Sum in f64 rather than u64. The fields are public, and a u64 sum near
        // `u64::MAX` would overflow (a panic in debug builds).
        let completed = self.total_completed as f64;
        completed / (completed + self.total_failed as f64)
    }
}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// Enqueues a `HashFile` task for `target` at `priority` and returns its id.
    fn push_hash(queue: &mut DedupQueue, priority: DedupPriority, target: &str) -> u64 {
        queue.enqueue(priority, DedupTaskKind::HashFile, target.to_string())
    }

    /// Builds a standalone task with id `1`.
    fn make_task(priority: DedupPriority, kind: DedupTaskKind, target: &str) -> DedupTask {
        DedupTask::new(1, priority, kind, target.to_string())
    }

    #[test]
    fn test_priority_ordering() {
        // Listed from most to least urgent; each entry must outrank the next.
        let descending = [
            DedupPriority::Critical,
            DedupPriority::High,
            DedupPriority::Normal,
            DedupPriority::Low,
            DedupPriority::Background,
        ];
        for pair in descending.windows(2) {
            assert!(pair[0] > pair[1]);
        }
    }

    #[test]
    fn test_priority_name() {
        assert_eq!(DedupPriority::Critical.name(), "critical");
        assert_eq!(DedupPriority::Background.name(), "background");
    }

    #[test]
    fn test_task_kind_name() {
        assert_eq!(DedupTaskKind::HashFile.name(), "hash_file");
        assert_eq!(DedupTaskKind::Cleanup.name(), "cleanup");
    }

    #[test]
    fn test_task_creation() {
        let task = make_task(DedupPriority::Normal, DedupTaskKind::HashFile, "test.mp4");
        assert_eq!(task.id, 1);
        assert_eq!(task.priority, DedupPriority::Normal);
        assert_eq!(task.kind, DedupTaskKind::HashFile);
        assert_eq!(task.target, "test.mp4");
        assert!(task.compare_target.is_none());
        assert_eq!(task.retries, 0);
    }

    #[test]
    fn test_task_builders() {
        let task = make_task(DedupPriority::High, DedupTaskKind::Compare, "a.mp4")
            .with_size_hint(1024)
            .with_compare_target("b.mp4".to_string())
            .with_max_retries(5);
        assert_eq!(task.size_hint, 1024);
        assert_eq!(task.compare_target.as_deref(), Some("b.mp4"));
        assert_eq!(task.max_retries, 5);
    }

    #[test]
    fn test_task_retry() {
        let original =
            make_task(DedupPriority::Normal, DedupTaskKind::HashFile, "x").with_max_retries(2);
        assert!(original.can_retry());

        let first = original.retry().expect("operation should succeed");
        assert_eq!(first.retries, 1);
        assert!(first.can_retry());

        let second = first.retry().expect("operation should succeed");
        assert_eq!(second.retries, 2);
        assert!(!second.can_retry());
        assert!(second.retry().is_none());
    }

    #[test]
    fn test_queue_enqueue_dequeue() {
        let mut queue = DedupQueue::new();
        assert!(queue.is_empty());

        push_hash(&mut queue, DedupPriority::Normal, "a.mp4");
        push_hash(&mut queue, DedupPriority::High, "b.mp4");
        push_hash(&mut queue, DedupPriority::Low, "c.mp4");
        assert_eq!(queue.len(), 3);

        // Tasks come back most urgent first, regardless of insertion order.
        let expected = [DedupPriority::High, DedupPriority::Normal, DedupPriority::Low];
        for want in expected.iter() {
            let got = queue.dequeue().expect("operation should succeed");
            assert_eq!(got.priority, *want);
        }
        assert!(queue.dequeue().is_none());
    }

    #[test]
    fn test_queue_peek() {
        let mut queue = DedupQueue::new();
        assert!(queue.peek().is_none());

        push_hash(&mut queue, DedupPriority::Normal, "x");
        assert!(queue.peek().is_some());
        // Peeking must not consume the task.
        assert_eq!(queue.len(), 1);
    }

    #[test]
    fn test_queue_stats() {
        let mut queue = DedupQueue::new();
        push_hash(&mut queue, DedupPriority::Normal, "a");
        push_hash(&mut queue, DedupPriority::High, "b");
        let _ = queue.dequeue();
        queue.record_completed();
        queue.record_failed();

        let stats = queue.stats();
        assert_eq!(stats.pending, 1);
        assert_eq!(stats.total_enqueued, 2);
        assert_eq!(stats.total_completed, 1);
        assert_eq!(stats.total_failed, 1);
    }

    #[test]
    fn test_queue_clear() {
        let mut queue = DedupQueue::new();
        push_hash(&mut queue, DedupPriority::Normal, "a");
        push_hash(&mut queue, DedupPriority::Normal, "b");
        queue.clear();
        assert!(queue.is_empty());
    }

    #[test]
    fn test_queue_drain_batch() {
        let mut queue = DedupQueue::new();
        for i in 0..5 {
            push_hash(&mut queue, DedupPriority::Normal, &format!("f{i}"));
        }
        let batch = queue.drain_batch(3);
        assert_eq!(batch.len(), 3);
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn test_queue_drain_batch_more_than_available() {
        let mut queue = DedupQueue::new();
        push_hash(&mut queue, DedupPriority::Normal, "a");
        let batch = queue.drain_batch(10);
        assert_eq!(batch.len(), 1);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_success_rate() {
        let stats = QueueStats {
            pending: 0,
            total_enqueued: 10,
            total_completed: 8,
            total_failed: 2,
        };
        assert!((stats.success_rate() - 0.8).abs() < f64::EPSILON);

        let empty_stats = QueueStats {
            pending: 0,
            total_enqueued: 0,
            total_completed: 0,
            total_failed: 0,
        };
        assert!((empty_stats.success_rate() - 1.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_queue_id_autoincrement() {
        let mut queue = DedupQueue::new();
        let first = push_hash(&mut queue, DedupPriority::Normal, "a");
        let second = push_hash(&mut queue, DedupPriority::Normal, "b");
        assert_eq!(first, 1);
        assert_eq!(second, 2);
    }
}
552}