Skip to main content

oximedia_dedup/
parallel_indexer.rs

1//! Parallel bulk file indexing for large media libraries.
2//!
3//! Sequential file hashing is the bottleneck when building a deduplication
4//! index over thousands of media files.  This module uses **rayon** to hash
5//! files in parallel, then aggregates the results into a compact in-memory
6//! index that can be queried for exact duplicates.
7//!
8//! # Design
9//!
10//! - [`ParallelIndexer`] coordinates parallel hashing with a configurable
11//!   thread pool (defaults to the number of logical CPUs).
12//! - [`IndexEntry`] stores the path, file size, BLAKE3 hash, and (optional)
13//!   perceptual hash for one file.
14//! - [`IndexResult`] aggregates all entries and derives duplicate groups on
15//!   demand by grouping entries on their BLAKE3 digest.
16//! - Progress is reported via an optional callback (see [`ProgressFn`]).
17//!
18//! # Example
19//!
20//! ```no_run
21//! use oximedia_dedup::parallel_indexer::{ParallelIndexer, IndexConfig};
22//! use std::path::PathBuf;
23//!
24//! let paths: Vec<PathBuf> = vec![
25//!     PathBuf::from("/media/video1.mp4"),
26//!     PathBuf::from("/media/video2.mp4"),
27//! ];
28//!
29//! let config = IndexConfig::default();
30//! let indexer = ParallelIndexer::new(config);
31//! let result = indexer.index_files(&paths);
32//!
33//! println!("Exact duplicates: {}", result.exact_duplicate_groups().len());
34//! ```
35
36#![allow(dead_code)]
37#![allow(clippy::cast_precision_loss)]
38
39use std::collections::HashMap;
40use std::path::{Path, PathBuf};
41use std::sync::{Arc, Mutex};
42use std::time::Instant;
43
44use rayon::prelude::*;
45
46use crate::hash::{compute_file_hash, FileHash};
47use crate::visual::{compute_phash, Image, PerceptualHash};
48
49// ─────────────────────────────────────────────────────────────────────────────
50// ProgressFn
51// ─────────────────────────────────────────────────────────────────────────────
52
/// Shared, thread-safe progress callback.
///
/// Invoked as `callback(files_completed, total_files)`.  Worker threads call
/// it directly, which is why the closure has to be both `Send` and `Sync`.
pub type ProgressFn = Arc<dyn Fn(usize, usize) + Sync + Send>;
58
59// ─────────────────────────────────────────────────────────────────────────────
60// IndexConfig
61// ─────────────────────────────────────────────────────────────────────────────
62
/// Configuration for [`ParallelIndexer`].
///
/// The derived [`Default`] is the most permissive setup: rayon's default
/// thread count, no perceptual hashing, no size limits, and every extension
/// accepted.
#[derive(Debug, Clone, Default)]
pub struct IndexConfig {
    /// Maximum number of rayon worker threads to use.
    ///
    /// `0` means "use rayon's default" (typically one per logical CPU).
    pub max_threads: usize,

    /// Whether to also compute a 64-bit perceptual hash (pHash) for each file.
    ///
    /// This module does not decode image data: the value is computed from a
    /// synthetic 8×8 grayscale image derived from the file's content hash, so
    /// it is deterministic for a given file content.
    pub compute_phash: bool,

    /// Skip files larger than this threshold (bytes).  `0` means no limit.
    pub max_file_size: u64,

    /// Skip files smaller than this threshold (bytes).  `0` means no limit.
    ///
    /// Useful for ignoring tiny sidecar / thumbnail files.
    pub min_file_size: u64,

    /// File extensions to include (e.g. `["mp4", "mkv"]`), without the
    /// leading dot.
    ///
    /// An empty list means *all* extensions are accepted.
    pub allowed_extensions: Vec<String>,
}
102
103// ─────────────────────────────────────────────────────────────────────────────
104// IndexEntry
105// ─────────────────────────────────────────────────────────────────────────────
106
/// A single indexed file entry.
#[derive(Debug, Clone)]
pub struct IndexEntry {
    /// Path of the file, exactly as it was handed to the indexer.
    pub path: PathBuf,
    /// File size in bytes.
    pub size_bytes: u64,
    /// BLAKE3 digest (hex-encoded).
    pub blake3_hex: String,
    /// Optional perceptual hash.
    pub phash: Option<u64>,
}

impl IndexEntry {
    /// Return the file extension without the leading dot.
    ///
    /// The extension is returned exactly as it appears in the path — its case
    /// is **not** normalised, because the borrowed `&str` cannot be lowercased
    /// without allocating.  Call `to_lowercase()` on the result when a
    /// case-insensitive comparison is needed.
    ///
    /// Returns `""` when the path has no extension or the extension is not
    /// valid UTF-8.
    #[must_use]
    pub fn extension(&self) -> &str {
        self.path
            .extension()
            .and_then(std::ffi::OsStr::to_str)
            .unwrap_or("")
    }
}
127
128// ─────────────────────────────────────────────────────────────────────────────
129// IndexError
130// ─────────────────────────────────────────────────────────────────────────────
131
/// An error encountered while indexing a single file.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`] so it can be
/// logged directly or propagated through `Box<dyn Error>`-style error chains.
#[derive(Debug, Clone)]
pub struct IndexError {
    /// Path that failed.
    pub path: PathBuf,
    /// Error description.
    pub message: String,
}

impl IndexError {
    /// Build an error for `path` with a human-readable `message`.
    fn new(path: PathBuf, message: impl Into<String>) -> Self {
        Self {
            path,
            message: message.into(),
        }
    }
}

impl std::fmt::Display for IndexError {
    /// Formats the error as `<path>: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}: {}", self.path.display(), self.message)
    }
}

impl std::error::Error for IndexError {}
149
150// ─────────────────────────────────────────────────────────────────────────────
151// IndexResult
152// ─────────────────────────────────────────────────────────────────────────────
153
154/// Aggregated result of a parallel indexing run.
155#[derive(Debug)]
156pub struct IndexResult {
157    /// Successfully indexed entries.
158    pub entries: Vec<IndexEntry>,
159    /// Files that could not be indexed (I/O error, access denied, etc.).
160    pub errors: Vec<IndexError>,
161    /// Wall-clock time taken for the entire run.
162    pub elapsed_secs: f64,
163}
164
165impl IndexResult {
166    /// Return groups of entries that share the same BLAKE3 hash.
167    ///
168    /// Only groups with two or more files are returned (exact duplicates).
169    #[must_use]
170    pub fn exact_duplicate_groups(&self) -> Vec<Vec<&IndexEntry>> {
171        let mut by_hash: HashMap<&str, Vec<&IndexEntry>> = HashMap::new();
172        for entry in &self.entries {
173            by_hash
174                .entry(entry.blake3_hex.as_str())
175                .or_default()
176                .push(entry);
177        }
178        by_hash.into_values().filter(|g| g.len() >= 2).collect()
179    }
180
181    /// Total bytes occupied by redundant copies (exact duplicates only).
182    ///
183    /// For each group, the smallest file is considered canonical; all others
184    /// are redundant.
185    #[must_use]
186    pub fn reclaimable_bytes(&self) -> u64 {
187        self.exact_duplicate_groups()
188            .iter()
189            .map(|group| {
190                let total: u64 = group.iter().map(|e| e.size_bytes).sum();
191                let min_size = group.iter().map(|e| e.size_bytes).min().unwrap_or(0);
192                total.saturating_sub(min_size)
193            })
194            .sum()
195    }
196
197    /// Number of files successfully indexed.
198    #[must_use]
199    pub fn indexed_count(&self) -> usize {
200        self.entries.len()
201    }
202
203    /// Number of files that failed to index.
204    #[must_use]
205    pub fn error_count(&self) -> usize {
206        self.errors.len()
207    }
208
209    /// Throughput in files per second.
210    #[must_use]
211    pub fn files_per_second(&self) -> f64 {
212        if self.elapsed_secs < f64::EPSILON {
213            return 0.0;
214        }
215        self.entries.len() as f64 / self.elapsed_secs
216    }
217}
218
219// ─────────────────────────────────────────────────────────────────────────────
220// ParallelIndexer
221// ─────────────────────────────────────────────────────────────────────────────
222
223/// Parallel bulk file indexer.
224///
225/// Uses rayon to hash files concurrently.  Create via [`ParallelIndexer::new`]
226/// and call [`ParallelIndexer::index_files`] to start.
227pub struct ParallelIndexer {
228    config: IndexConfig,
229    progress_fn: Option<ProgressFn>,
230}
231
232impl ParallelIndexer {
233    /// Create a new indexer with the given configuration.
234    #[must_use]
235    pub fn new(config: IndexConfig) -> Self {
236        Self {
237            config,
238            progress_fn: None,
239        }
240    }
241
242    /// Attach a progress callback.
243    ///
244    /// The callback receives `(completed, total)` counts.
245    #[must_use]
246    pub fn with_progress(mut self, f: ProgressFn) -> Self {
247        self.progress_fn = Some(f);
248        self
249    }
250
251    /// Index a batch of file paths in parallel.
252    ///
253    /// Files that do not pass the configured filters (size limits, extension
254    /// whitelist) are silently skipped.  Files that cannot be read are added
255    /// to [`IndexResult::errors`].
256    pub fn index_files(&self, paths: &[PathBuf]) -> IndexResult {
257        let start = Instant::now();
258
259        // Pre-filter paths according to configuration.
260        let filtered: Vec<&PathBuf> = paths.iter().filter(|p| self.passes_filter(p)).collect();
261
262        let total = filtered.len();
263        let completed = Arc::new(Mutex::new(0usize));
264
265        // Configure rayon thread pool if a thread limit was requested.
266        let pool = build_pool(self.config.max_threads);
267
268        let progress = self.progress_fn.clone();
269        let config = self.config.clone();
270
271        let results: Vec<Result<IndexEntry, IndexError>> = pool.install(|| {
272            filtered
273                .par_iter()
274                .map(|path| {
275                    let r = process_file(path, &config);
276
277                    // Report progress.
278                    if let Some(ref cb) = progress {
279                        let mut guard = completed.lock().unwrap_or_else(|e| e.into_inner());
280                        *guard += 1;
281                        cb(*guard, total);
282                    }
283
284                    r
285                })
286                .collect()
287        });
288
289        let elapsed_secs = start.elapsed().as_secs_f64();
290
291        let mut entries = Vec::new();
292        let mut errors = Vec::new();
293        for r in results {
294            match r {
295                Ok(entry) => entries.push(entry),
296                Err(err) => errors.push(err),
297            }
298        }
299
300        IndexResult {
301            entries,
302            errors,
303            elapsed_secs,
304        }
305    }
306
307    /// Check whether a path passes the configured filters.
308    fn passes_filter(&self, path: &Path) -> bool {
309        // Extension whitelist (empty = allow all).
310        if !self.config.allowed_extensions.is_empty() {
311            let ext = path
312                .extension()
313                .and_then(|e| e.to_str())
314                .unwrap_or("")
315                .to_lowercase();
316            if !self.config.allowed_extensions.contains(&ext) {
317                return false;
318            }
319        }
320
321        // Size filters — only check if the file exists and is readable.
322        if self.config.max_file_size > 0 || self.config.min_file_size > 0 {
323            if let Ok(meta) = std::fs::metadata(path) {
324                let size = meta.len();
325                if self.config.min_file_size > 0 && size < self.config.min_file_size {
326                    return false;
327                }
328                if self.config.max_file_size > 0 && size > self.config.max_file_size {
329                    return false;
330                }
331            }
332        }
333
334        true
335    }
336}
337
338// ─────────────────────────────────────────────────────────────────────────────
339// Internal helpers
340// ─────────────────────────────────────────────────────────────────────────────
341
342/// Process a single file: hash it and optionally compute a perceptual hash.
343fn process_file(path: &Path, config: &IndexConfig) -> Result<IndexEntry, IndexError> {
344    let meta = std::fs::metadata(path)
345        .map_err(|e| IndexError::new(path.to_path_buf(), format!("stat failed: {e}")))?;
346    let size_bytes = meta.len();
347
348    let file_hash: FileHash = compute_file_hash(path)
349        .map_err(|e| IndexError::new(path.to_path_buf(), format!("hash failed: {e}")))?;
350
351    let blake3_hex = file_hash.to_hex();
352
353    // Derive an optional perceptual hash from the BLAKE3 digest bytes when
354    // `compute_phash` is enabled.  For real media files a production
355    // implementation would decode an actual thumbnail; here we construct a
356    // synthetic 8×8 grayscale image from the hash bytes so the function
357    // always produces a deterministic, consistent value in tests.
358    let phash_val = if config.compute_phash {
359        Some(derive_phash_from_hash(file_hash.as_bytes()))
360    } else {
361        None
362    };
363
364    Ok(IndexEntry {
365        path: path.to_path_buf(),
366        size_bytes,
367        blake3_hex,
368        phash: phash_val,
369    })
370}
371
372/// Derive a perceptual-style hash from 32 BLAKE3 bytes by constructing a
373/// synthetic 8×8 grayscale image from the first 64 bytes of a stretched hash.
374fn derive_phash_from_hash(hash_bytes: &[u8; 32]) -> u64 {
375    // Expand 32 bytes to 64 by repeating.
376    let mut pixels = [0u8; 64];
377    for i in 0..64 {
378        pixels[i] = hash_bytes[i % 32];
379    }
380    let image = Image {
381        width: 8,
382        height: 8,
383        data: pixels.to_vec(),
384        channels: 1,
385    };
386    let ph: PerceptualHash = compute_phash(&image);
387    ph.hash()
388}
389
390/// Build a rayon thread pool with the configured maximum.
391///
392/// When `max_threads` is 0, the global rayon pool (which defaults to the
393/// number of logical CPUs) is returned as a thin wrapper.
394///
395/// On failure the function falls back to a single-threaded pool, which is
396/// essentially unconditionally buildable.  If even that fails (a scenario that
397/// is practically impossible on any OS), a default-configuration pool is used.
398fn build_pool(max_threads: usize) -> rayon::ThreadPool {
399    let primary = if max_threads == 0 {
400        rayon::ThreadPoolBuilder::new().build()
401    } else {
402        rayon::ThreadPoolBuilder::new()
403            .num_threads(max_threads)
404            .build()
405    };
406
407    // Single-threaded fallback (all platforms: always succeeds).
408    primary
409        .or_else(|_| rayon::ThreadPoolBuilder::new().num_threads(1).build())
410        // Default-configuration fallback as a last resort.
411        .or_else(|_| rayon::ThreadPoolBuilder::new().build())
412        .unwrap_or_else(|e| {
413            // This branch is statistically impossible: rayon cannot build
414            // *any* thread pool, which indicates a severe OS-level failure.
415            // Log descriptively rather than using a bare unwrap.
416            panic!("oximedia-dedup: rayon failed to create any thread pool: {e}")
417        })
418}
419
420// ─────────────────────────────────────────────────────────────────────────────
421// Tests
422// ─────────────────────────────────────────────────────────────────────────────
423
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Monotonic counter that keeps temp-file names unique within this process.
    static TEMP_FILE_SEQ: AtomicUsize = AtomicUsize::new(0);

    /// Write `content` to a freshly named temp file and return its path.
    ///
    /// The name combines the process id with a per-process sequence number, so
    /// repeated calls with the same content — and tests running in parallel —
    /// never end up sharing a file.
    fn write_temp_file(content: &[u8]) -> PathBuf {
        let seq = TEMP_FILE_SEQ.fetch_add(1, Ordering::Relaxed);
        let name = format!(
            "oxidedup_test_{}_{}_{}.tmp",
            std::process::id(),
            seq,
            content.len()
        );
        let path = std::env::temp_dir().join(name);
        let mut f = std::fs::File::create(&path).expect("create temp file");
        f.write_all(content).expect("write temp file");
        path
    }

    #[test]
    fn test_index_single_file() {
        let path = write_temp_file(b"hello world");
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(std::slice::from_ref(&path));
        assert_eq!(result.indexed_count(), 1);
        assert_eq!(result.error_count(), 0);
        assert_eq!(result.entries[0].size_bytes, 11);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_index_detects_exact_duplicates() {
        let content = b"duplicate content for testing";
        let p1 = write_temp_file(content);
        let p2 = write_temp_file(content);
        let p3 = write_temp_file(b"different content");
        // The duplicates must be two distinct files, not one file listed twice.
        assert_ne!(p1, p2);

        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[p1.clone(), p2.clone(), p3.clone()]);

        assert_eq!(result.indexed_count(), 3);
        let groups = result.exact_duplicate_groups();
        assert_eq!(groups.len(), 1, "expected exactly one duplicate group");
        assert_eq!(groups[0].len(), 2);

        let _ = std::fs::remove_file(&p1);
        let _ = std::fs::remove_file(&p2);
        let _ = std::fs::remove_file(&p3);
    }

    #[test]
    fn test_index_no_duplicates() {
        let p1 = write_temp_file(b"alpha content");
        let p2 = write_temp_file(b"beta content");
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[p1.clone(), p2.clone()]);
        assert!(result.exact_duplicate_groups().is_empty());
        let _ = std::fs::remove_file(&p1);
        let _ = std::fs::remove_file(&p2);
    }

    #[test]
    fn test_reclaimable_bytes() {
        let content = b"same bytes here";
        let p1 = write_temp_file(content);
        let p2 = write_temp_file(content);
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[p1.clone(), p2.clone()]);
        assert_eq!(result.reclaimable_bytes(), content.len() as u64);
        let _ = std::fs::remove_file(&p1);
        let _ = std::fs::remove_file(&p2);
    }

    #[test]
    fn test_nonexistent_file_goes_to_errors() {
        let bad_path =
            std::env::temp_dir().join("oximedia-dedup-parallel-nonexistent_12345678.tmp");
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[bad_path]);
        assert_eq!(result.error_count(), 1);
        assert_eq!(result.indexed_count(), 0);
    }

    #[test]
    fn test_extension_filter() {
        let p_mp4 = write_temp_file(b"fake mp4 data");
        // Rename to .mp4
        let mut mp4_path = p_mp4.clone();
        mp4_path.set_extension("mp4");
        std::fs::rename(&p_mp4, &mp4_path).expect("rename temp file to .mp4");

        let p_txt = write_temp_file(b"text file data");
        let mut txt_path = p_txt.clone();
        txt_path.set_extension("txt");
        std::fs::rename(&p_txt, &txt_path).expect("rename temp file to .txt");

        let config = IndexConfig {
            allowed_extensions: vec!["mp4".to_string()],
            ..Default::default()
        };
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[mp4_path.clone(), txt_path.clone()]);
        // Only the .mp4 file should be indexed; .txt is filtered out.
        assert_eq!(result.indexed_count(), 1);
        assert_eq!(
            result.entries[0]
                .path
                .extension()
                .and_then(|e| e.to_str())
                .unwrap_or(""),
            "mp4"
        );

        let _ = std::fs::remove_file(&mp4_path);
        let _ = std::fs::remove_file(&txt_path);
    }

    #[test]
    fn test_phash_computation() {
        let path = write_temp_file(b"some media bytes for phash");
        let config = IndexConfig {
            compute_phash: true,
            ..Default::default()
        };
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(std::slice::from_ref(&path));
        assert_eq!(result.indexed_count(), 1);
        assert!(result.entries[0].phash.is_some());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_empty_input() {
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[]);
        assert_eq!(result.indexed_count(), 0);
        assert_eq!(result.error_count(), 0);
        assert!(result.exact_duplicate_groups().is_empty());
    }

    #[test]
    fn test_progress_callback_fires() {
        let p1 = write_temp_file(b"progress test a");
        let p2 = write_temp_file(b"progress test b");

        let counter = Arc::new(Mutex::new(0usize));
        let counter_clone = Arc::clone(&counter);
        let cb: ProgressFn = Arc::new(move |_completed, _total| {
            let mut c = counter_clone.lock().unwrap_or_else(|e| e.into_inner());
            *c += 1;
        });

        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config).with_progress(cb);
        let _ = indexer.index_files(&[p1.clone(), p2.clone()]);

        let fired = *counter.lock().unwrap_or_else(|e| e.into_inner());
        assert_eq!(fired, 2, "progress callback should fire once per file");

        let _ = std::fs::remove_file(&p1);
        let _ = std::fs::remove_file(&p2);
    }

    #[test]
    fn test_files_per_second_positive() {
        let p = write_temp_file(b"throughput test");
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(std::slice::from_ref(&p));
        // When the elapsed time is effectively zero, `files_per_second`
        // reports 0.0, so the only portable guarantee is a non-negative value.
        assert!(result.files_per_second() >= 0.0);
        let _ = std::fs::remove_file(&p);
    }

    #[test]
    fn test_size_filter_min() {
        let small = write_temp_file(b"tiny");
        let large = write_temp_file(&[0u8; 200]);

        let config = IndexConfig {
            min_file_size: 100,
            ..Default::default()
        };
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[small.clone(), large.clone()]);
        assert_eq!(result.indexed_count(), 1, "only large file should pass");

        let _ = std::fs::remove_file(&small);
        let _ = std::fs::remove_file(&large);
    }

    #[test]
    fn test_size_filter_max() {
        let small = write_temp_file(b"tiny file data");
        let large = write_temp_file(&[0u8; 500]);

        let config = IndexConfig {
            max_file_size: 100,
            ..Default::default()
        };
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[small.clone(), large.clone()]);
        assert_eq!(result.indexed_count(), 1, "only small file should pass");

        let _ = std::fs::remove_file(&small);
        let _ = std::fs::remove_file(&large);
    }

    #[test]
    fn test_multi_threaded_correctness() {
        // Index 6 files (3 unique contents × 2 copies each) with 4 threads.
        let contents: &[&[u8]] = &[b"alpha-multi", b"beta-multi", b"gamma-multi"];
        let mut paths = Vec::new();
        for content in contents {
            paths.push(write_temp_file(content));
            paths.push(write_temp_file(content));
        }

        let config = IndexConfig {
            max_threads: 4,
            ..Default::default()
        };
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&paths);

        assert_eq!(result.indexed_count(), 6);
        assert_eq!(result.exact_duplicate_groups().len(), 3);

        for p in &paths {
            let _ = std::fs::remove_file(p);
        }
    }
}