1#![allow(dead_code)]
29
30use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
31use std::sync::Arc;
32
/// A progress notification handed to a reporter.
///
/// Text payloads are borrowed for `'a`; a reporter that wants to keep an
/// event beyond the call must copy the data it needs. Events compare by
/// value, which makes them easy to assert on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProgressEvent<'a> {
    /// A named phase has started.
    PhaseStarted {
        /// Name of the phase.
        phase: &'a str,
        /// Number of items the phase was set up with.
        total_items: usize,
    },

    /// One more item has been processed.
    ItemProcessed {
        /// Running count of processed items.
        current: usize,
        /// Total the count is measured against.
        total: usize,
    },

    /// A batch of items has been processed in one step.
    BatchProcessed {
        /// Number of items in this batch.
        batch_size: usize,
        /// Running count of processed items, this batch included.
        cumulative: usize,
        /// Total the count is measured against.
        total: usize,
    },

    /// A named phase has finished.
    PhaseCompleted {
        /// Name of the phase.
        phase: &'a str,
        /// Number of groups reported for the phase.
        groups_found: usize,
        /// Duration of the phase in milliseconds.
        elapsed_ms: u64,
    },

    /// A whole run has finished.
    RunCompleted {
        /// Number of groups reported for the run.
        total_groups: usize,
        /// Duration of the run in milliseconds.
        total_elapsed_ms: u64,
    },

    /// Processing of a single item failed.
    ItemError {
        /// Identifier of the item that failed.
        item_id: &'a str,
        /// Description of the failure.
        error: &'a str,
    },
}
92
93impl ProgressEvent<'_> {
94 #[must_use]
96 pub fn percentage(&self) -> Option<f64> {
97 match self {
98 Self::ItemProcessed { current, total } if *total > 0 => {
99 Some(*current as f64 / *total as f64 * 100.0)
100 }
101 Self::BatchProcessed {
102 cumulative, total, ..
103 } if *total > 0 => Some(*cumulative as f64 / *total as f64 * 100.0),
104 _ => None,
105 }
106 }
107}
108
109pub trait ProgressReporter: Send + Sync {
118 fn on_event(&self, event: &ProgressEvent<'_>);
120
121 fn is_cancelled(&self) -> bool {
126 false
127 }
128}
129
130#[derive(Debug, Clone, Copy, Default)]
138pub struct NullReporter;
139
140impl NullReporter {
141 #[must_use]
143 pub fn new() -> Self {
144 Self
145 }
146}
147
148impl ProgressReporter for NullReporter {
149 fn on_event(&self, _event: &ProgressEvent<'_>) {
150 }
152}
153
154#[derive(Debug, Default)]
163pub struct LoggingReporter {
164 messages: std::sync::Mutex<Vec<String>>,
166}
167
168impl LoggingReporter {
169 #[must_use]
171 pub fn new() -> Self {
172 Self::default()
173 }
174
175 pub fn messages(&self) -> Vec<String> {
177 self.messages
178 .lock()
179 .map(|msgs| msgs.clone())
180 .unwrap_or_default()
181 }
182
183 pub fn message_count(&self) -> usize {
185 self.messages.lock().map(|m| m.len()).unwrap_or(0)
186 }
187}
188
189impl ProgressReporter for LoggingReporter {
190 fn on_event(&self, event: &ProgressEvent<'_>) {
191 let msg = match event {
192 ProgressEvent::PhaseStarted { phase, total_items } => {
193 format!("[START] {phase}: {total_items} items")
194 }
195 ProgressEvent::ItemProcessed { current, total } => {
196 format!("[ITEM] {current}/{total}")
197 }
198 ProgressEvent::BatchProcessed {
199 batch_size,
200 cumulative,
201 total,
202 } => {
203 format!("[BATCH] +{batch_size} ({cumulative}/{total})")
204 }
205 ProgressEvent::PhaseCompleted {
206 phase,
207 groups_found,
208 elapsed_ms,
209 } => {
210 format!("[DONE] {phase}: {groups_found} groups in {elapsed_ms}ms")
211 }
212 ProgressEvent::RunCompleted {
213 total_groups,
214 total_elapsed_ms,
215 } => {
216 format!("[COMPLETE] {total_groups} groups in {total_elapsed_ms}ms")
217 }
218 ProgressEvent::ItemError { item_id, error } => {
219 format!("[ERROR] {item_id}: {error}")
220 }
221 };
222
223 if let Ok(mut msgs) = self.messages.lock() {
224 msgs.push(msg);
225 }
226 }
227}
228
229pub struct CallbackReporter<F>
235where
236 F: Fn(&ProgressEvent<'_>) + Send + Sync,
237{
238 callback: F,
239 cancelled: AtomicBool,
240}
241
242impl<F> CallbackReporter<F>
243where
244 F: Fn(&ProgressEvent<'_>) + Send + Sync,
245{
246 pub fn new(callback: F) -> Self {
248 Self {
249 callback,
250 cancelled: AtomicBool::new(false),
251 }
252 }
253
254 pub fn cancel(&self) {
256 self.cancelled.store(true, Ordering::Relaxed);
257 }
258}
259
260impl<F> ProgressReporter for CallbackReporter<F>
261where
262 F: Fn(&ProgressEvent<'_>) + Send + Sync,
263{
264 fn on_event(&self, event: &ProgressEvent<'_>) {
265 (self.callback)(event);
266 }
267
268 fn is_cancelled(&self) -> bool {
269 self.cancelled.load(Ordering::Relaxed)
270 }
271}
272
273pub struct ThrottledReporter<R: ProgressReporter> {
282 inner: R,
283 interval_ms: u64,
284 last_emit_ms: AtomicU64,
285}
286
287impl<R: ProgressReporter> ThrottledReporter<R> {
288 pub fn new(inner: R, interval_ms: u64) -> Self {
294 Self {
295 inner,
296 interval_ms,
297 last_emit_ms: AtomicU64::new(0),
298 }
299 }
300
301 fn now_ms(&self) -> u64 {
303 std::time::SystemTime::now()
304 .duration_since(std::time::UNIX_EPOCH)
305 .unwrap_or_default()
306 .as_millis() as u64
307 }
308}
309
310impl<R: ProgressReporter> ProgressReporter for ThrottledReporter<R> {
311 fn on_event(&self, event: &ProgressEvent<'_>) {
312 let should_throttle = matches!(
313 event,
314 ProgressEvent::ItemProcessed { .. } | ProgressEvent::BatchProcessed { .. }
315 );
316
317 if should_throttle {
318 let now = self.now_ms();
319 let last = self.last_emit_ms.load(Ordering::Relaxed);
320 if now.saturating_sub(last) < self.interval_ms {
321 return;
322 }
323 self.last_emit_ms.store(now, Ordering::Relaxed);
324 }
325
326 self.inner.on_event(event);
327 }
328
329 fn is_cancelled(&self) -> bool {
330 self.inner.is_cancelled()
331 }
332}
333
334pub struct MultiReporter {
340 reporters: Vec<Arc<dyn ProgressReporter>>,
341}
342
343impl MultiReporter {
344 #[must_use]
346 pub fn new(reporters: Vec<Arc<dyn ProgressReporter>>) -> Self {
347 Self { reporters }
348 }
349}
350
351impl ProgressReporter for MultiReporter {
352 fn on_event(&self, event: &ProgressEvent<'_>) {
353 for r in &self.reporters {
354 r.on_event(event);
355 }
356 }
357
358 fn is_cancelled(&self) -> bool {
359 self.reporters.iter().any(|r| r.is_cancelled())
360 }
361}
362
363pub struct ProgressTracker<'a> {
369 reporter: &'a dyn ProgressReporter,
370 phase: String,
371 total: usize,
372 processed: usize,
373 start_time_ms: u64,
374}
375
376impl<'a> ProgressTracker<'a> {
377 pub fn new(reporter: &'a dyn ProgressReporter, phase: &str, total: usize) -> Self {
379 let start = std::time::SystemTime::now()
380 .duration_since(std::time::UNIX_EPOCH)
381 .unwrap_or_default()
382 .as_millis() as u64;
383
384 reporter.on_event(&ProgressEvent::PhaseStarted {
385 phase,
386 total_items: total,
387 });
388
389 Self {
390 reporter,
391 phase: phase.to_string(),
392 total,
393 processed: 0,
394 start_time_ms: start,
395 }
396 }
397
398 pub fn tick(&mut self) {
400 self.processed += 1;
401 self.reporter.on_event(&ProgressEvent::ItemProcessed {
402 current: self.processed,
403 total: self.total,
404 });
405 }
406
407 pub fn tick_batch(&mut self, batch_size: usize) {
409 self.processed += batch_size;
410 self.reporter.on_event(&ProgressEvent::BatchProcessed {
411 batch_size,
412 cumulative: self.processed,
413 total: self.total,
414 });
415 }
416
417 pub fn report_error(&self, item_id: &str, error: &str) {
419 self.reporter
420 .on_event(&ProgressEvent::ItemError { item_id, error });
421 }
422
423 pub fn is_cancelled(&self) -> bool {
425 self.reporter.is_cancelled()
426 }
427
428 pub fn complete(self, groups_found: usize) {
430 let now = std::time::SystemTime::now()
431 .duration_since(std::time::UNIX_EPOCH)
432 .unwrap_or_default()
433 .as_millis() as u64;
434 let elapsed = now.saturating_sub(self.start_time_ms);
435
436 self.reporter.on_event(&ProgressEvent::PhaseCompleted {
437 phase: &self.phase,
438 groups_found,
439 elapsed_ms: elapsed,
440 });
441 }
442}
443
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a `PhaseStarted` event.
    fn started(phase: &str, total_items: usize) -> ProgressEvent<'_> {
        ProgressEvent::PhaseStarted { phase, total_items }
    }

    /// Shorthand for an `ItemProcessed` event.
    fn item(current: usize, total: usize) -> ProgressEvent<'static> {
        ProgressEvent::ItemProcessed { current, total }
    }

    /// Shorthand for a `PhaseCompleted` event.
    fn completed(phase: &str, groups_found: usize, elapsed_ms: u64) -> ProgressEvent<'_> {
        ProgressEvent::PhaseCompleted {
            phase,
            groups_found,
            elapsed_ms,
        }
    }

    #[test]
    fn test_null_reporter() {
        let sink = NullReporter::new();
        sink.on_event(&started("test", 100));
        assert!(!sink.is_cancelled());
    }

    #[test]
    fn test_logging_reporter_captures_events() {
        let log = LoggingReporter::new();
        log.on_event(&started("exact_hash", 500));
        log.on_event(&item(1, 500));
        log.on_event(&completed("exact_hash", 10, 250));

        assert_eq!(log.message_count(), 3);
        let lines = log.messages();
        assert!(lines[0].contains("[START]"));
        assert!(lines[0].contains("exact_hash"));
        assert!(lines[1].contains("[ITEM]"));
        assert!(lines[2].contains("[DONE]"));
    }

    #[test]
    fn test_logging_reporter_error_event() {
        let log = LoggingReporter::new();
        log.on_event(&ProgressEvent::ItemError {
            item_id: "bad_file.mp4",
            error: "Permission denied",
        });

        let lines = log.messages();
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("[ERROR]"));
        assert!(lines[0].contains("bad_file.mp4"));
    }

    #[test]
    fn test_callback_reporter() {
        let calls = Arc::new(AtomicU64::new(0));
        let reporter = {
            let calls = calls.clone();
            CallbackReporter::new(move |_event| {
                calls.fetch_add(1, Ordering::Relaxed);
            })
        };

        reporter.on_event(&started("test", 10));
        reporter.on_event(&item(1, 10));

        assert_eq!(calls.load(Ordering::Relaxed), 2);
        assert!(!reporter.is_cancelled());
    }

    #[test]
    fn test_callback_reporter_cancellation() {
        let reporter = CallbackReporter::new(|_| {});
        assert!(!reporter.is_cancelled());
        reporter.cancel();
        assert!(reporter.is_cancelled());
    }

    #[test]
    fn test_progress_event_percentage() {
        let pct = item(50, 200)
            .percentage()
            .expect("should have percentage");
        assert!((pct - 25.0).abs() < 0.01);
    }

    #[test]
    fn test_progress_event_percentage_batch() {
        let batch = ProgressEvent::BatchProcessed {
            batch_size: 10,
            cumulative: 75,
            total: 100,
        };
        let pct = batch.percentage().expect("should have percentage");
        assert!((pct - 75.0).abs() < 0.01);
    }

    #[test]
    fn test_progress_event_no_percentage_for_phase() {
        assert!(started("test", 100).percentage().is_none());
    }

    #[test]
    fn test_multi_reporter() {
        let first = Arc::new(LoggingReporter::new());
        let second = Arc::new(LoggingReporter::new());

        let multi = MultiReporter::new(vec![first.clone(), second.clone()]);
        multi.on_event(&started("test", 50));

        assert_eq!(first.message_count(), 1);
        assert_eq!(second.message_count(), 1);
        assert!(!multi.is_cancelled());
    }

    #[test]
    fn test_progress_tracker_lifecycle() {
        let log = LoggingReporter::new();
        let mut tracker = ProgressTracker::new(&log, "scan", 3);

        for _ in 0..3 {
            tracker.tick();
        }
        assert!(!tracker.is_cancelled());
        tracker.complete(1);

        // One start line, three item lines, one done line.
        let lines = log.messages();
        assert_eq!(lines.len(), 5);
        assert!(lines[0].contains("[START]"));
        assert!(lines[4].contains("[DONE]"));
    }

    #[test]
    fn test_progress_tracker_batch() {
        let log = LoggingReporter::new();
        let mut tracker = ProgressTracker::new(&log, "index", 100);

        for _ in 0..2 {
            tracker.tick_batch(25);
        }
        tracker.complete(5);

        // One start line, two batch lines, one done line.
        let lines = log.messages();
        assert_eq!(lines.len(), 4);
        assert!(lines[1].contains("[BATCH]"));
    }

    #[test]
    fn test_progress_tracker_error_reporting() {
        let log = LoggingReporter::new();
        let tracker = ProgressTracker::new(&log, "scan", 10);

        tracker.report_error("corrupt.mp4", "invalid header");
        tracker.complete(0);

        let lines = log.messages();
        assert!(lines.iter().any(|line| line.contains("[ERROR]")));
        assert!(lines.iter().any(|line| line.contains("corrupt.mp4")));
    }

    #[test]
    fn test_throttled_reporter_forwards_phase_events() {
        let throttled = ThrottledReporter::new(LoggingReporter::new(), 1000);

        // Phase events are not subject to throttling.
        throttled.on_event(&started("test", 100));
        throttled.on_event(&completed("test", 5, 500));

        assert_eq!(throttled.inner.message_count(), 2);
    }

    #[test]
    fn test_throttled_reporter_throttles_items() {
        // The interval is long enough that only the first item gets through.
        let throttled = ThrottledReporter::new(LoggingReporter::new(), 60_000);

        for current in 1..=3 {
            throttled.on_event(&item(current, 100));
        }

        assert_eq!(throttled.inner.message_count(), 1);
    }

    #[test]
    fn test_run_completed_event() {
        let log = LoggingReporter::new();
        log.on_event(&ProgressEvent::RunCompleted {
            total_groups: 15,
            total_elapsed_ms: 5000,
        });

        let lines = log.messages();
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("[COMPLETE]"));
        assert!(lines[0].contains("15 groups"));
    }
}