Skip to main content

oximedia_dedup/
progress.rs

1//! Progress reporting callbacks for long-running deduplication operations.
2//!
3//! Provides a [`ProgressReporter`] trait and concrete implementations for
4//! monitoring the progress of `DuplicateDetector::find_duplicates()` and
5//! similar batch operations in large media libraries.
6//!
7//! # Example
8//!
9//! ```
10//! use oximedia_dedup::progress::{ProgressReporter, ProgressEvent, LoggingReporter};
11//!
12//! let reporter = LoggingReporter::new();
13//! reporter.on_event(&ProgressEvent::PhaseStarted {
14//!     phase: "exact_hash",
15//!     total_items: 1000,
16//! });
17//! reporter.on_event(&ProgressEvent::ItemProcessed {
18//!     current: 500,
19//!     total: 1000,
20//! });
21//! reporter.on_event(&ProgressEvent::PhaseCompleted {
22//!     phase: "exact_hash",
23//!     groups_found: 42,
24//!     elapsed_ms: 1234,
25//! });
26//! ```
27
28#![allow(dead_code)]
29
30use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
31use std::sync::Arc;
32
33// ---------------------------------------------------------------------------
34// Progress events
35// ---------------------------------------------------------------------------
36
/// An event emitted while a deduplication run is in progress.
///
/// Events borrow their string payloads (phase names, item ids, error text)
/// from the emitter, so they are cheap to build and are handed to reporters by
/// reference. `PartialEq`/`Eq` are derived so callers and tests can compare
/// events directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProgressEvent<'a> {
    /// A detection phase has started.
    PhaseStarted {
        /// Name of the phase (e.g., "exact_hash", "perceptual_hash", "ssim").
        phase: &'a str,
        /// Total number of items to process in this phase.
        total_items: usize,
    },

    /// A single item has been processed.
    ItemProcessed {
        /// Current item index (1-based).
        current: usize,
        /// Total items in this phase.
        total: usize,
    },

    /// A batch of items has been processed.
    BatchProcessed {
        /// Number of items in this batch.
        batch_size: usize,
        /// Cumulative items processed so far, including this batch.
        cumulative: usize,
        /// Total items in this phase.
        total: usize,
    },

    /// A detection phase has completed.
    PhaseCompleted {
        /// Name of the phase.
        phase: &'a str,
        /// Number of duplicate groups found in this phase.
        groups_found: usize,
        /// Time spent in the phase, in milliseconds.
        elapsed_ms: u64,
    },

    /// The entire deduplication run has completed.
    RunCompleted {
        /// Total duplicate groups found across all phases.
        total_groups: usize,
        /// Total time for the run, in milliseconds.
        total_elapsed_ms: u64,
    },

    /// An error occurred while processing one item (non-fatal; processing continues).
    ItemError {
        /// Item identifier (e.g., file path).
        item_id: &'a str,
        /// Error description.
        error: &'a str,
    },
}
92
93impl ProgressEvent<'_> {
94    /// Returns the progress percentage (0.0 - 100.0) if applicable.
95    #[must_use]
96    pub fn percentage(&self) -> Option<f64> {
97        match self {
98            Self::ItemProcessed { current, total } if *total > 0 => {
99                Some(*current as f64 / *total as f64 * 100.0)
100            }
101            Self::BatchProcessed {
102                cumulative, total, ..
103            } if *total > 0 => Some(*cumulative as f64 / *total as f64 * 100.0),
104            _ => None,
105        }
106    }
107}
108
109// ---------------------------------------------------------------------------
110// ProgressReporter trait
111// ---------------------------------------------------------------------------
112
113/// Trait for receiving progress updates during deduplication.
114///
115/// Implementations should be efficient -- `on_event` may be called thousands
116/// of times per second for large libraries.
117pub trait ProgressReporter: Send + Sync {
118    /// Called when a progress event occurs.
119    fn on_event(&self, event: &ProgressEvent<'_>);
120
121    /// Returns `true` if the operation should be cancelled.
122    ///
123    /// Implementations can use this to allow user-initiated cancellation.
124    /// The default returns `false` (never cancel).
125    fn is_cancelled(&self) -> bool {
126        false
127    }
128}
129
130// ---------------------------------------------------------------------------
131// Null reporter (no-op)
132// ---------------------------------------------------------------------------
133
134/// A no-op progress reporter that discards all events.
135///
136/// This is the default when no progress reporting is needed.
137#[derive(Debug, Clone, Copy, Default)]
138pub struct NullReporter;
139
140impl NullReporter {
141    /// Create a new null reporter.
142    #[must_use]
143    pub fn new() -> Self {
144        Self
145    }
146}
147
148impl ProgressReporter for NullReporter {
149    fn on_event(&self, _event: &ProgressEvent<'_>) {
150        // intentionally empty
151    }
152}
153
154// ---------------------------------------------------------------------------
155// Logging reporter
156// ---------------------------------------------------------------------------
157
158/// A progress reporter that logs events to a Vec for later inspection.
159///
160/// Useful for testing and batch processing where you want to review
161/// progress after the fact.
162#[derive(Debug, Default)]
163pub struct LoggingReporter {
164    /// Logged messages.
165    messages: std::sync::Mutex<Vec<String>>,
166}
167
168impl LoggingReporter {
169    /// Create a new logging reporter.
170    #[must_use]
171    pub fn new() -> Self {
172        Self::default()
173    }
174
175    /// Return all logged messages.
176    pub fn messages(&self) -> Vec<String> {
177        self.messages
178            .lock()
179            .map(|msgs| msgs.clone())
180            .unwrap_or_default()
181    }
182
183    /// Return the number of logged messages.
184    pub fn message_count(&self) -> usize {
185        self.messages.lock().map(|m| m.len()).unwrap_or(0)
186    }
187}
188
189impl ProgressReporter for LoggingReporter {
190    fn on_event(&self, event: &ProgressEvent<'_>) {
191        let msg = match event {
192            ProgressEvent::PhaseStarted { phase, total_items } => {
193                format!("[START] {phase}: {total_items} items")
194            }
195            ProgressEvent::ItemProcessed { current, total } => {
196                format!("[ITEM] {current}/{total}")
197            }
198            ProgressEvent::BatchProcessed {
199                batch_size,
200                cumulative,
201                total,
202            } => {
203                format!("[BATCH] +{batch_size} ({cumulative}/{total})")
204            }
205            ProgressEvent::PhaseCompleted {
206                phase,
207                groups_found,
208                elapsed_ms,
209            } => {
210                format!("[DONE] {phase}: {groups_found} groups in {elapsed_ms}ms")
211            }
212            ProgressEvent::RunCompleted {
213                total_groups,
214                total_elapsed_ms,
215            } => {
216                format!("[COMPLETE] {total_groups} groups in {total_elapsed_ms}ms")
217            }
218            ProgressEvent::ItemError { item_id, error } => {
219                format!("[ERROR] {item_id}: {error}")
220            }
221        };
222
223        if let Ok(mut msgs) = self.messages.lock() {
224            msgs.push(msg);
225        }
226    }
227}
228
229// ---------------------------------------------------------------------------
230// Callback reporter
231// ---------------------------------------------------------------------------
232
233/// A progress reporter backed by a user-supplied closure.
234pub struct CallbackReporter<F>
235where
236    F: Fn(&ProgressEvent<'_>) + Send + Sync,
237{
238    callback: F,
239    cancelled: AtomicBool,
240}
241
242impl<F> CallbackReporter<F>
243where
244    F: Fn(&ProgressEvent<'_>) + Send + Sync,
245{
246    /// Create a new callback reporter.
247    pub fn new(callback: F) -> Self {
248        Self {
249            callback,
250            cancelled: AtomicBool::new(false),
251        }
252    }
253
254    /// Signal that the operation should be cancelled.
255    pub fn cancel(&self) {
256        self.cancelled.store(true, Ordering::Relaxed);
257    }
258}
259
260impl<F> ProgressReporter for CallbackReporter<F>
261where
262    F: Fn(&ProgressEvent<'_>) + Send + Sync,
263{
264    fn on_event(&self, event: &ProgressEvent<'_>) {
265        (self.callback)(event);
266    }
267
268    fn is_cancelled(&self) -> bool {
269        self.cancelled.load(Ordering::Relaxed)
270    }
271}
272
273// ---------------------------------------------------------------------------
274// Throttled reporter
275// ---------------------------------------------------------------------------
276
277/// A progress reporter that throttles `ItemProcessed` and `BatchProcessed`
278/// events to at most one per `interval_ms` milliseconds.
279///
280/// Phase start/complete and error events are always forwarded immediately.
281pub struct ThrottledReporter<R: ProgressReporter> {
282    inner: R,
283    interval_ms: u64,
284    last_emit_ms: AtomicU64,
285}
286
287impl<R: ProgressReporter> ThrottledReporter<R> {
288    /// Create a new throttled reporter wrapping `inner`.
289    ///
290    /// # Arguments
291    /// * `inner` - The underlying reporter to forward events to.
292    /// * `interval_ms` - Minimum milliseconds between forwarded progress events.
293    pub fn new(inner: R, interval_ms: u64) -> Self {
294        Self {
295            inner,
296            interval_ms,
297            last_emit_ms: AtomicU64::new(0),
298        }
299    }
300
301    /// Current wall-clock time in milliseconds since an arbitrary epoch.
302    fn now_ms(&self) -> u64 {
303        std::time::SystemTime::now()
304            .duration_since(std::time::UNIX_EPOCH)
305            .unwrap_or_default()
306            .as_millis() as u64
307    }
308}
309
310impl<R: ProgressReporter> ProgressReporter for ThrottledReporter<R> {
311    fn on_event(&self, event: &ProgressEvent<'_>) {
312        let should_throttle = matches!(
313            event,
314            ProgressEvent::ItemProcessed { .. } | ProgressEvent::BatchProcessed { .. }
315        );
316
317        if should_throttle {
318            let now = self.now_ms();
319            let last = self.last_emit_ms.load(Ordering::Relaxed);
320            if now.saturating_sub(last) < self.interval_ms {
321                return;
322            }
323            self.last_emit_ms.store(now, Ordering::Relaxed);
324        }
325
326        self.inner.on_event(event);
327    }
328
329    fn is_cancelled(&self) -> bool {
330        self.inner.is_cancelled()
331    }
332}
333
334// ---------------------------------------------------------------------------
335// Multi-reporter (fan-out)
336// ---------------------------------------------------------------------------
337
338/// Forwards events to multiple reporters.
339pub struct MultiReporter {
340    reporters: Vec<Arc<dyn ProgressReporter>>,
341}
342
343impl MultiReporter {
344    /// Create a new multi-reporter.
345    #[must_use]
346    pub fn new(reporters: Vec<Arc<dyn ProgressReporter>>) -> Self {
347        Self { reporters }
348    }
349}
350
351impl ProgressReporter for MultiReporter {
352    fn on_event(&self, event: &ProgressEvent<'_>) {
353        for r in &self.reporters {
354            r.on_event(event);
355        }
356    }
357
358    fn is_cancelled(&self) -> bool {
359        self.reporters.iter().any(|r| r.is_cancelled())
360    }
361}
362
363// ---------------------------------------------------------------------------
364// Progress tracker (helper for emitting events)
365// ---------------------------------------------------------------------------
366
367/// Helper struct for tracking and emitting progress events from dedup algorithms.
368pub struct ProgressTracker<'a> {
369    reporter: &'a dyn ProgressReporter,
370    phase: String,
371    total: usize,
372    processed: usize,
373    start_time_ms: u64,
374}
375
376impl<'a> ProgressTracker<'a> {
377    /// Create a new tracker for a phase.
378    pub fn new(reporter: &'a dyn ProgressReporter, phase: &str, total: usize) -> Self {
379        let start = std::time::SystemTime::now()
380            .duration_since(std::time::UNIX_EPOCH)
381            .unwrap_or_default()
382            .as_millis() as u64;
383
384        reporter.on_event(&ProgressEvent::PhaseStarted {
385            phase,
386            total_items: total,
387        });
388
389        Self {
390            reporter,
391            phase: phase.to_string(),
392            total,
393            processed: 0,
394            start_time_ms: start,
395        }
396    }
397
398    /// Report that one item has been processed.
399    pub fn tick(&mut self) {
400        self.processed += 1;
401        self.reporter.on_event(&ProgressEvent::ItemProcessed {
402            current: self.processed,
403            total: self.total,
404        });
405    }
406
407    /// Report that a batch of items has been processed.
408    pub fn tick_batch(&mut self, batch_size: usize) {
409        self.processed += batch_size;
410        self.reporter.on_event(&ProgressEvent::BatchProcessed {
411            batch_size,
412            cumulative: self.processed,
413            total: self.total,
414        });
415    }
416
417    /// Report an error for an item.
418    pub fn report_error(&self, item_id: &str, error: &str) {
419        self.reporter
420            .on_event(&ProgressEvent::ItemError { item_id, error });
421    }
422
423    /// Returns `true` if cancellation has been requested.
424    pub fn is_cancelled(&self) -> bool {
425        self.reporter.is_cancelled()
426    }
427
428    /// Complete the phase and emit a `PhaseCompleted` event.
429    pub fn complete(self, groups_found: usize) {
430        let now = std::time::SystemTime::now()
431            .duration_since(std::time::UNIX_EPOCH)
432            .unwrap_or_default()
433            .as_millis() as u64;
434        let elapsed = now.saturating_sub(self.start_time_ms);
435
436        self.reporter.on_event(&ProgressEvent::PhaseCompleted {
437            phase: &self.phase,
438            groups_found,
439            elapsed_ms: elapsed,
440        });
441    }
442}
443
444// ---------------------------------------------------------------------------
445// Tests
446// ---------------------------------------------------------------------------
447
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_null_reporter() {
        let reporter = NullReporter::new();
        reporter.on_event(&ProgressEvent::PhaseStarted {
            phase: "test",
            total_items: 100,
        });
        assert!(!reporter.is_cancelled());
    }

    #[test]
    fn test_logging_reporter_captures_events() {
        let reporter = LoggingReporter::new();
        reporter.on_event(&ProgressEvent::PhaseStarted {
            phase: "exact_hash",
            total_items: 500,
        });
        reporter.on_event(&ProgressEvent::ItemProcessed {
            current: 1,
            total: 500,
        });
        reporter.on_event(&ProgressEvent::PhaseCompleted {
            phase: "exact_hash",
            groups_found: 10,
            elapsed_ms: 250,
        });

        assert_eq!(reporter.message_count(), 3);
        let msgs = reporter.messages();
        assert!(msgs[0].contains("[START]"));
        assert!(msgs[0].contains("exact_hash"));
        assert!(msgs[1].contains("[ITEM]"));
        assert!(msgs[2].contains("[DONE]"));
    }

    #[test]
    fn test_logging_reporter_error_event() {
        let reporter = LoggingReporter::new();
        reporter.on_event(&ProgressEvent::ItemError {
            item_id: "bad_file.mp4",
            error: "Permission denied",
        });
        let msgs = reporter.messages();
        assert_eq!(msgs.len(), 1);
        assert!(msgs[0].contains("[ERROR]"));
        assert!(msgs[0].contains("bad_file.mp4"));
    }

    #[test]
    fn test_logging_reporter_batch_format() {
        let reporter = LoggingReporter::new();
        reporter.on_event(&ProgressEvent::BatchProcessed {
            batch_size: 10,
            cumulative: 75,
            total: 100,
        });
        assert_eq!(reporter.messages(), vec!["[BATCH] +10 (75/100)".to_string()]);
    }

    #[test]
    fn test_callback_reporter() {
        let counter = Arc::new(AtomicU64::new(0));
        let counter_clone = counter.clone();

        let reporter = CallbackReporter::new(move |_event| {
            counter_clone.fetch_add(1, Ordering::Relaxed);
        });

        reporter.on_event(&ProgressEvent::PhaseStarted {
            phase: "test",
            total_items: 10,
        });
        reporter.on_event(&ProgressEvent::ItemProcessed {
            current: 1,
            total: 10,
        });

        assert_eq!(counter.load(Ordering::Relaxed), 2);
        assert!(!reporter.is_cancelled());
    }

    #[test]
    fn test_callback_reporter_cancellation() {
        let reporter = CallbackReporter::new(|_| {});
        assert!(!reporter.is_cancelled());
        reporter.cancel();
        assert!(reporter.is_cancelled());
    }

    #[test]
    fn test_progress_event_percentage() {
        let event = ProgressEvent::ItemProcessed {
            current: 50,
            total: 200,
        };
        let pct = event.percentage().expect("should have percentage");
        assert!((pct - 25.0).abs() < 0.01);
    }

    #[test]
    fn test_progress_event_percentage_batch() {
        let event = ProgressEvent::BatchProcessed {
            batch_size: 10,
            cumulative: 75,
            total: 100,
        };
        let pct = event.percentage().expect("should have percentage");
        assert!((pct - 75.0).abs() < 0.01);
    }

    #[test]
    fn test_progress_event_no_percentage_for_phase() {
        let event = ProgressEvent::PhaseStarted {
            phase: "test",
            total_items: 100,
        };
        assert!(event.percentage().is_none());
    }

    #[test]
    fn test_progress_event_no_percentage_for_zero_total() {
        let item = ProgressEvent::ItemProcessed {
            current: 0,
            total: 0,
        };
        assert!(item.percentage().is_none());

        let batch = ProgressEvent::BatchProcessed {
            batch_size: 0,
            cumulative: 0,
            total: 0,
        };
        assert!(batch.percentage().is_none());
    }

    #[test]
    fn test_multi_reporter() {
        let r1 = Arc::new(LoggingReporter::new());
        let r2 = Arc::new(LoggingReporter::new());

        let multi = MultiReporter::new(vec![r1.clone(), r2.clone()]);
        multi.on_event(&ProgressEvent::PhaseStarted {
            phase: "test",
            total_items: 50,
        });

        assert_eq!(r1.message_count(), 1);
        assert_eq!(r2.message_count(), 1);
        assert!(!multi.is_cancelled());
    }

    #[test]
    fn test_multi_reporter_propagates_cancellation() {
        let cancellable = Arc::new(CallbackReporter::new(|_| {}));
        let multi = MultiReporter::new(vec![
            Arc::new(NullReporter::new()) as Arc<dyn ProgressReporter>,
            Arc::clone(&cancellable) as Arc<dyn ProgressReporter>,
        ]);

        assert!(!multi.is_cancelled());
        cancellable.cancel();
        assert!(multi.is_cancelled());
    }

    #[test]
    fn test_progress_tracker_lifecycle() {
        let reporter = LoggingReporter::new();
        let mut tracker = ProgressTracker::new(&reporter, "scan", 3);

        tracker.tick();
        tracker.tick();
        tracker.tick();
        assert!(!tracker.is_cancelled());
        tracker.complete(1);

        let msgs = reporter.messages();
        // 1 start + 3 items + 1 complete = 5 messages
        assert_eq!(msgs.len(), 5);
        assert!(msgs[0].contains("[START]"));
        assert!(msgs[4].contains("[DONE]"));
    }

    #[test]
    fn test_progress_tracker_batch() {
        let reporter = LoggingReporter::new();
        let mut tracker = ProgressTracker::new(&reporter, "index", 100);

        tracker.tick_batch(25);
        tracker.tick_batch(25);
        tracker.complete(5);

        let msgs = reporter.messages();
        assert_eq!(msgs.len(), 4); // start + 2 batches + complete
        assert!(msgs[1].contains("[BATCH]"));
    }

    #[test]
    fn test_progress_tracker_error_reporting() {
        let reporter = LoggingReporter::new();
        let tracker = ProgressTracker::new(&reporter, "scan", 10);

        tracker.report_error("corrupt.mp4", "invalid header");
        tracker.complete(0);

        let msgs = reporter.messages();
        assert!(msgs.iter().any(|m| m.contains("[ERROR]")));
        assert!(msgs.iter().any(|m| m.contains("corrupt.mp4")));
    }

    #[test]
    fn test_progress_tracker_reports_cancellation() {
        let reporter = CallbackReporter::new(|_| {});
        let tracker = ProgressTracker::new(&reporter, "scan", 1);

        assert!(!tracker.is_cancelled());
        reporter.cancel();
        assert!(tracker.is_cancelled());
    }

    #[test]
    fn test_progress_tracker_complete_names_phase() {
        let reporter = LoggingReporter::new();
        ProgressTracker::new(&reporter, "ssim", 0).complete(7);

        let msgs = reporter.messages();
        assert_eq!(msgs.len(), 2);
        assert_eq!(msgs[0], "[START] ssim: 0 items");
        assert!(msgs[1].starts_with("[DONE] ssim: 7 groups in "));
    }

    #[test]
    fn test_throttled_reporter_forwards_phase_events() {
        let inner = LoggingReporter::new();
        let throttled = ThrottledReporter::new(inner, 1000);

        // Phase events should always be forwarded
        throttled.on_event(&ProgressEvent::PhaseStarted {
            phase: "test",
            total_items: 100,
        });
        throttled.on_event(&ProgressEvent::PhaseCompleted {
            phase: "test",
            groups_found: 5,
            elapsed_ms: 500,
        });

        assert_eq!(throttled.inner.message_count(), 2);
    }

    #[test]
    fn test_throttled_reporter_throttles_items() {
        let inner = LoggingReporter::new();
        // Very long interval to ensure throttling
        let throttled = ThrottledReporter::new(inner, 60_000);

        // First item should be forwarded
        throttled.on_event(&ProgressEvent::ItemProcessed {
            current: 1,
            total: 100,
        });
        // Subsequent items within the interval should be throttled
        throttled.on_event(&ProgressEvent::ItemProcessed {
            current: 2,
            total: 100,
        });
        throttled.on_event(&ProgressEvent::ItemProcessed {
            current: 3,
            total: 100,
        });

        // Only 1 item event should have been forwarded
        assert_eq!(throttled.inner.message_count(), 1);
    }

    #[test]
    fn test_throttled_reporter_never_throttles_non_progress_events() {
        let throttled = ThrottledReporter::new(LoggingReporter::new(), 60_000);

        throttled.on_event(&ProgressEvent::ItemProcessed {
            current: 1,
            total: 10,
        });
        throttled.on_event(&ProgressEvent::ItemError {
            item_id: "a.mp4",
            error: "unreadable",
        });
        throttled.on_event(&ProgressEvent::RunCompleted {
            total_groups: 0,
            total_elapsed_ms: 1,
        });

        assert_eq!(throttled.inner.message_count(), 3);
    }

    #[test]
    fn test_throttled_reporter_zero_interval_forwards_everything() {
        let throttled = ThrottledReporter::new(LoggingReporter::new(), 0);

        for current in 1..=5 {
            throttled.on_event(&ProgressEvent::ItemProcessed { current, total: 5 });
        }

        assert_eq!(throttled.inner.message_count(), 5);
    }

    #[test]
    fn test_throttled_reporter_forwards_cancellation() {
        let throttled = ThrottledReporter::new(CallbackReporter::new(|_| {}), 1000);

        assert!(!throttled.is_cancelled());
        throttled.inner.cancel();
        assert!(throttled.is_cancelled());
    }

    #[test]
    fn test_run_completed_event() {
        let reporter = LoggingReporter::new();
        reporter.on_event(&ProgressEvent::RunCompleted {
            total_groups: 15,
            total_elapsed_ms: 5000,
        });
        let msgs = reporter.messages();
        assert_eq!(msgs.len(), 1);
        assert!(msgs[0].contains("[COMPLETE]"));
        assert!(msgs[0].contains("15 groups"));
    }
}