1#![allow(dead_code)]
37#![allow(clippy::cast_precision_loss)]
38
39use std::collections::HashMap;
40use std::path::{Path, PathBuf};
41use std::sync::{Arc, Mutex};
42use std::time::Instant;
43
44use rayon::prelude::*;
45
46use crate::hash::{compute_file_hash, FileHash};
47use crate::visual::{compute_phash, Image, PerceptualHash};
48
/// Progress callback invoked as `(completed, total)` while files are indexed.
///
/// Shared through an `Arc` and required to be `Send + Sync` because the
/// indexer invokes it from rayon worker threads.
pub type ProgressFn = Arc<dyn Fn(usize, usize) + Sync + Send>;
58
/// Settings that control which files a `ParallelIndexer` processes and how.
///
/// The default (all zeros / empty) accepts every file with no size limits,
/// lets rayon choose the worker count, and skips the `phash` computation.
#[derive(Debug, Clone, Default)]
pub struct IndexConfig {
    /// Worker threads for the indexing pool; `0` lets rayon decide.
    pub max_threads: usize,

    /// When `true`, each indexed entry also carries a 64-bit `phash` value.
    pub compute_phash: bool,

    /// Largest accepted file size in bytes (inclusive); `0` disables the check.
    pub max_file_size: u64,

    /// Smallest accepted file size in bytes (inclusive); `0` disables the check.
    pub min_file_size: u64,

    /// File extensions to accept, written without the leading dot and in
    /// lowercase (e.g. `"mp4"`). An empty list accepts every extension.
    pub allowed_extensions: Vec<String>,
}
102
/// One successfully indexed file.
#[derive(Debug, Clone)]
pub struct IndexEntry {
    /// The path exactly as it was passed to the indexer.
    pub path: PathBuf,
    /// Size in bytes, read from filesystem metadata at indexing time.
    pub size_bytes: u64,
    /// Hex-encoded content hash; entries with equal values are treated as
    /// exact duplicates by [`IndexResult::exact_duplicate_groups`].
    pub blake3_hex: String,
    /// 64-bit hash, present only when `phash` computation was requested.
    pub phash: Option<u64>,
}

impl IndexEntry {
    /// The path's extension without the leading dot, or `""` when the path has
    /// no extension or it is not valid UTF-8. Case is preserved.
    #[must_use]
    pub fn extension(&self) -> &str {
        self.path.extension().and_then(|e| e.to_str()).unwrap_or("")
    }
}

/// A file that could not be indexed, together with a human-readable reason.
#[derive(Debug, Clone)]
pub struct IndexError {
    /// The path that failed.
    pub path: PathBuf,
    /// Why it failed (e.g. which step, plus the underlying error text).
    pub message: String,
}

impl IndexError {
    fn new(path: PathBuf, message: impl Into<String>) -> Self {
        Self {
            path,
            message: message.into(),
        }
    }
}

impl std::fmt::Display for IndexError {
    /// Formats as `"<path>: <message>"`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}: {}", self.path.display(), self.message)
    }
}

// Lets callers propagate an `IndexError` with `?` into `Box<dyn Error>` etc.
impl std::error::Error for IndexError {}

/// Outcome of one indexing run: successes, failures and wall-clock time.
#[derive(Debug)]
pub struct IndexResult {
    /// Successfully indexed files.
    pub entries: Vec<IndexEntry>,
    /// Files that could not be indexed.
    pub errors: Vec<IndexError>,
    /// Wall-clock duration of the run in seconds.
    pub elapsed_secs: f64,
}

impl IndexResult {
    /// Groups of entries that share the same content hash (only groups with
    /// at least two members).
    ///
    /// Output is deterministic: groups are ordered by the position of their
    /// first member in `entries`, and members keep their `entries` order.
    #[must_use]
    pub fn exact_duplicate_groups(&self) -> Vec<Vec<&IndexEntry>> {
        // hash -> index into `groups`; a plain HashMap alone would yield the
        // groups in randomized iteration order.
        let mut slot_of: HashMap<&str, usize> = HashMap::new();
        let mut groups: Vec<Vec<&IndexEntry>> = Vec::new();
        for entry in &self.entries {
            let slot = *slot_of
                .entry(entry.blake3_hex.as_str())
                .or_insert_with(|| {
                    groups.push(Vec::new());
                    groups.len() - 1
                });
            groups[slot].push(entry);
        }
        groups.retain(|g| g.len() >= 2);
        groups
    }

    /// Bytes that could be freed by keeping one file per duplicate group:
    /// for each group, its total size minus its smallest member.
    #[must_use]
    pub fn reclaimable_bytes(&self) -> u64 {
        self.exact_duplicate_groups()
            .iter()
            .map(|group| {
                let total: u64 = group.iter().map(|e| e.size_bytes).sum();
                let min_size = group.iter().map(|e| e.size_bytes).min().unwrap_or(0);
                total.saturating_sub(min_size)
            })
            .sum()
    }

    /// Number of successfully indexed files.
    #[must_use]
    pub fn indexed_count(&self) -> usize {
        self.entries.len()
    }

    /// Number of files that failed to index.
    #[must_use]
    pub fn error_count(&self) -> usize {
        self.errors.len()
    }

    /// Successfully indexed files per second (errors are not counted);
    /// `0.0` when the elapsed time is effectively zero.
    #[must_use]
    pub fn files_per_second(&self) -> f64 {
        if self.elapsed_secs < f64::EPSILON {
            return 0.0;
        }
        self.entries.len() as f64 / self.elapsed_secs
    }
}
218
219pub struct ParallelIndexer {
228 config: IndexConfig,
229 progress_fn: Option<ProgressFn>,
230}
231
232impl ParallelIndexer {
233 #[must_use]
235 pub fn new(config: IndexConfig) -> Self {
236 Self {
237 config,
238 progress_fn: None,
239 }
240 }
241
242 #[must_use]
246 pub fn with_progress(mut self, f: ProgressFn) -> Self {
247 self.progress_fn = Some(f);
248 self
249 }
250
251 pub fn index_files(&self, paths: &[PathBuf]) -> IndexResult {
257 let start = Instant::now();
258
259 let filtered: Vec<&PathBuf> = paths.iter().filter(|p| self.passes_filter(p)).collect();
261
262 let total = filtered.len();
263 let completed = Arc::new(Mutex::new(0usize));
264
265 let pool = build_pool(self.config.max_threads);
267
268 let progress = self.progress_fn.clone();
269 let config = self.config.clone();
270
271 let results: Vec<Result<IndexEntry, IndexError>> = pool.install(|| {
272 filtered
273 .par_iter()
274 .map(|path| {
275 let r = process_file(path, &config);
276
277 if let Some(ref cb) = progress {
279 let mut guard = completed.lock().unwrap_or_else(|e| e.into_inner());
280 *guard += 1;
281 cb(*guard, total);
282 }
283
284 r
285 })
286 .collect()
287 });
288
289 let elapsed_secs = start.elapsed().as_secs_f64();
290
291 let mut entries = Vec::new();
292 let mut errors = Vec::new();
293 for r in results {
294 match r {
295 Ok(entry) => entries.push(entry),
296 Err(err) => errors.push(err),
297 }
298 }
299
300 IndexResult {
301 entries,
302 errors,
303 elapsed_secs,
304 }
305 }
306
307 fn passes_filter(&self, path: &Path) -> bool {
309 if !self.config.allowed_extensions.is_empty() {
311 let ext = path
312 .extension()
313 .and_then(|e| e.to_str())
314 .unwrap_or("")
315 .to_lowercase();
316 if !self.config.allowed_extensions.contains(&ext) {
317 return false;
318 }
319 }
320
321 if self.config.max_file_size > 0 || self.config.min_file_size > 0 {
323 if let Ok(meta) = std::fs::metadata(path) {
324 let size = meta.len();
325 if self.config.min_file_size > 0 && size < self.config.min_file_size {
326 return false;
327 }
328 if self.config.max_file_size > 0 && size > self.config.max_file_size {
329 return false;
330 }
331 }
332 }
333
334 true
335 }
336}
337
338fn process_file(path: &Path, config: &IndexConfig) -> Result<IndexEntry, IndexError> {
344 let meta = std::fs::metadata(path)
345 .map_err(|e| IndexError::new(path.to_path_buf(), format!("stat failed: {e}")))?;
346 let size_bytes = meta.len();
347
348 let file_hash: FileHash = compute_file_hash(path)
349 .map_err(|e| IndexError::new(path.to_path_buf(), format!("hash failed: {e}")))?;
350
351 let blake3_hex = file_hash.to_hex();
352
353 let phash_val = if config.compute_phash {
359 Some(derive_phash_from_hash(file_hash.as_bytes()))
360 } else {
361 None
362 };
363
364 Ok(IndexEntry {
365 path: path.to_path_buf(),
366 size_bytes,
367 blake3_hex,
368 phash: phash_val,
369 })
370}
371
372fn derive_phash_from_hash(hash_bytes: &[u8; 32]) -> u64 {
375 let mut pixels = [0u8; 64];
377 for i in 0..64 {
378 pixels[i] = hash_bytes[i % 32];
379 }
380 let image = Image {
381 width: 8,
382 height: 8,
383 data: pixels.to_vec(),
384 channels: 1,
385 };
386 let ph: PerceptualHash = compute_phash(&image);
387 ph.hash()
388}
389
390fn build_pool(max_threads: usize) -> rayon::ThreadPool {
399 let primary = if max_threads == 0 {
400 rayon::ThreadPoolBuilder::new().build()
401 } else {
402 rayon::ThreadPoolBuilder::new()
403 .num_threads(max_threads)
404 .build()
405 };
406
407 primary
409 .or_else(|_| rayon::ThreadPoolBuilder::new().num_threads(1).build())
410 .or_else(|_| rayon::ThreadPoolBuilder::new().build())
412 .unwrap_or_else(|e| {
413 panic!("oximedia-dedup: rayon failed to create any thread pool: {e}")
417 })
418}
419
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Write `content` to a fresh file in the system temp dir and return its path.
    ///
    /// Names combine the process id with a per-process counter. Every call
    /// therefore yields a distinct file, even for identical `content`, and
    /// concurrently running tests (threads or processes) never share a path.
    /// The previous pointer-based naming returned the SAME path for repeated
    /// calls with one literal, so "duplicate" tests indexed one file twice.
    fn write_temp_file(content: &[u8]) -> PathBuf {
        static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        let path = std::env::temp_dir().join(format!(
            "oxidedup_test_{}_{}_{}.tmp",
            std::process::id(),
            id,
            content.len()
        ));
        let mut f = std::fs::File::create(&path).expect("create temp file");
        f.write_all(content).expect("write temp file");
        path
    }

    #[test]
    fn test_index_single_file() {
        let path = write_temp_file(b"hello world");
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(std::slice::from_ref(&path));
        assert_eq!(result.indexed_count(), 1);
        assert_eq!(result.error_count(), 0);
        assert_eq!(result.entries[0].size_bytes, 11);
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_index_detects_exact_duplicates() {
        let content = b"duplicate content for testing";
        let p1 = write_temp_file(content);
        let p2 = write_temp_file(content);
        let p3 = write_temp_file(b"different content");
        // Two distinct files must exist on disk, or this test proves nothing.
        assert_ne!(p1, p2, "identical content must still produce distinct files");

        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[p1.clone(), p2.clone(), p3.clone()]);

        assert_eq!(result.indexed_count(), 3);
        let groups = result.exact_duplicate_groups();
        assert_eq!(groups.len(), 1, "expected exactly one duplicate group");
        assert_eq!(groups[0].len(), 2);

        let _ = std::fs::remove_file(&p1);
        let _ = std::fs::remove_file(&p2);
        let _ = std::fs::remove_file(&p3);
    }

    #[test]
    fn test_index_no_duplicates() {
        let p1 = write_temp_file(b"alpha content");
        let p2 = write_temp_file(b"beta content");
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[p1.clone(), p2.clone()]);
        assert!(result.exact_duplicate_groups().is_empty());
        let _ = std::fs::remove_file(&p1);
        let _ = std::fs::remove_file(&p2);
    }

    #[test]
    fn test_reclaimable_bytes() {
        let content = b"same bytes here";
        let p1 = write_temp_file(content);
        let p2 = write_temp_file(content);
        assert_ne!(p1, p2, "identical content must still produce distinct files");
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[p1.clone(), p2.clone()]);
        assert_eq!(result.reclaimable_bytes(), content.len() as u64);
        let _ = std::fs::remove_file(&p1);
        let _ = std::fs::remove_file(&p2);
    }

    #[test]
    fn test_nonexistent_file_goes_to_errors() {
        let bad_path =
            std::env::temp_dir().join("oximedia-dedup-parallel-nonexistent_12345678.tmp");
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[bad_path]);
        assert_eq!(result.error_count(), 1);
        assert_eq!(result.indexed_count(), 0);
    }

    #[test]
    fn test_extension_filter() {
        let p_mp4 = write_temp_file(b"fake mp4 data");
        let mut mp4_path = p_mp4.clone();
        mp4_path.set_extension("mp4");
        std::fs::rename(&p_mp4, &mp4_path).expect("rename temp file to .mp4");

        let p_txt = write_temp_file(b"text file data");
        let mut txt_path = p_txt.clone();
        txt_path.set_extension("txt");
        std::fs::rename(&p_txt, &txt_path).expect("rename temp file to .txt");

        let config = IndexConfig {
            allowed_extensions: vec!["mp4".to_string()],
            ..Default::default()
        };
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[mp4_path.clone(), txt_path.clone()]);
        assert_eq!(result.indexed_count(), 1);
        assert_eq!(
            result.entries[0]
                .path
                .extension()
                .and_then(|e| e.to_str())
                .unwrap_or(""),
            "mp4"
        );

        let _ = std::fs::remove_file(&mp4_path);
        let _ = std::fs::remove_file(&txt_path);
    }

    #[test]
    fn test_phash_computation() {
        let path = write_temp_file(b"some media bytes for phash");
        let config = IndexConfig {
            compute_phash: true,
            ..Default::default()
        };
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(std::slice::from_ref(&path));
        assert_eq!(result.indexed_count(), 1);
        assert!(result.entries[0].phash.is_some());
        let _ = std::fs::remove_file(&path);
    }

    #[test]
    fn test_empty_input() {
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[]);
        assert_eq!(result.indexed_count(), 0);
        assert_eq!(result.error_count(), 0);
        assert!(result.exact_duplicate_groups().is_empty());
    }

    #[test]
    fn test_progress_callback_fires() {
        let p1 = write_temp_file(b"progress test a");
        let p2 = write_temp_file(b"progress test b");

        let counter = Arc::new(Mutex::new(0usize));
        let counter_clone = Arc::clone(&counter);
        let cb: ProgressFn = Arc::new(move |_completed, _total| {
            let mut c = counter_clone.lock().unwrap_or_else(|e| e.into_inner());
            *c += 1;
        });

        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config).with_progress(cb);
        let _ = indexer.index_files(&[p1.clone(), p2.clone()]);

        let fired = *counter.lock().unwrap_or_else(|e| e.into_inner());
        assert_eq!(fired, 2, "progress callback should fire once per file");

        let _ = std::fs::remove_file(&p1);
        let _ = std::fs::remove_file(&p2);
    }

    #[test]
    fn test_files_per_second_positive() {
        let p = write_temp_file(b"throughput test");
        let config = IndexConfig::default();
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(std::slice::from_ref(&p));
        assert!(result.files_per_second() >= 0.0);
        let _ = std::fs::remove_file(&p);
    }

    #[test]
    fn test_size_filter_min() {
        let small = write_temp_file(b"tiny");
        let large = write_temp_file(&[0u8; 200]);

        let config = IndexConfig {
            min_file_size: 100,
            ..Default::default()
        };
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[small.clone(), large.clone()]);
        assert_eq!(result.indexed_count(), 1, "only large file should pass");

        let _ = std::fs::remove_file(&small);
        let _ = std::fs::remove_file(&large);
    }

    #[test]
    fn test_size_filter_max() {
        let small = write_temp_file(b"tiny file data");
        let large = write_temp_file(&[0u8; 500]);

        let config = IndexConfig {
            max_file_size: 100,
            ..Default::default()
        };
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&[small.clone(), large.clone()]);
        assert_eq!(result.indexed_count(), 1, "only small file should pass");

        let _ = std::fs::remove_file(&small);
        let _ = std::fs::remove_file(&large);
    }

    #[test]
    fn test_multi_threaded_correctness() {
        let contents: &[&[u8]] = &[b"alpha-multi", b"beta-multi", b"gamma-multi"];
        let mut paths = Vec::new();
        for content in contents {
            paths.push(write_temp_file(content));
            paths.push(write_temp_file(content));
        }
        // Six real files on disk, not three files listed twice.
        let unique: std::collections::HashSet<&PathBuf> = paths.iter().collect();
        assert_eq!(unique.len(), paths.len(), "temp paths must be distinct");

        let config = IndexConfig {
            max_threads: 4,
            ..Default::default()
        };
        let indexer = ParallelIndexer::new(config);
        let result = indexer.index_files(&paths);

        assert_eq!(result.indexed_count(), 6);
        let groups = result.exact_duplicate_groups();
        assert_eq!(groups.len(), 3);
        assert!(groups.iter().all(|g| g.len() == 2));

        for p in &paths {
            let _ = std::fs::remove_file(p);
        }
    }
}