1#![allow(dead_code)]
59#![allow(clippy::cast_precision_loss)]
60
61use std::collections::HashMap;
62
63use serde::{Deserialize, Serialize};
64
/// A single file entry as listed in a node's manifest.
///
/// Carries the two identifiers used for cross-node comparison: a content
/// digest for exact matching and an optional 64-bit perceptual hash for
/// near-duplicate matching.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileRecord {
    /// Location of the file. `node_name` treats the text before the first
    /// `:` as the node component (e.g. `"node-a:/movie.mp4"`).
    pub uri: String,
    /// Content digest as a hex string; `has_valid_digest` expects exactly
    /// 64 ASCII hex digits. Presumably a BLAKE3 hash, going by the name.
    pub blake3_hex: String,
    /// 64-bit perceptual hash, compared bit-by-bit by `phash_distance`;
    /// `None` when no hash is available.
    pub phash: Option<u64>,
    /// Media duration, presumably in seconds (per the `_s` suffix);
    /// `None` when unknown.
    pub duration_s: Option<f64>,
    /// File size, presumably in bytes (TODO confirm); `None` when unknown.
    pub file_size: Option<u64>,
}
83
84impl FileRecord {
85 #[must_use]
87 pub fn new(
88 uri: String,
89 blake3_hex: String,
90 phash: Option<u64>,
91 duration_s: Option<f64>,
92 file_size: Option<u64>,
93 ) -> Self {
94 Self {
95 uri,
96 blake3_hex,
97 phash,
98 duration_s,
99 file_size,
100 }
101 }
102
103 #[must_use]
107 pub fn has_valid_digest(&self) -> bool {
108 self.blake3_hex.len() == 64 && self.blake3_hex.chars().all(|c| c.is_ascii_hexdigit())
109 }
110
111 #[must_use]
115 pub fn phash_distance(&self, other: &Self) -> Option<u32> {
116 match (self.phash, other.phash) {
117 (Some(a), Some(b)) => Some((a ^ b).count_ones()),
118 _ => None,
119 }
120 }
121
122 #[must_use]
125 pub fn node_name(&self) -> Option<&str> {
126 self.uri.split(':').next()
127 }
128}
129
/// The list of files reported by one node.
///
/// Serialisable with serde; see `to_json` / `from_json` for the JSON form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeManifest {
    /// Identifier of the node that owns these records. The dedup engine
    /// compares this value to decide whether two records live on
    /// different nodes.
    pub node_id: String,
    /// File records belonging to this node, in insertion order.
    pub records: Vec<FileRecord>,
    /// Creation timestamp. `NodeManifest::new` initialises it to `0`; the
    /// unit is not established in this file (presumably Unix seconds,
    /// TODO confirm).
    pub created_at: u64,
}
147
148impl NodeManifest {
149 #[must_use]
151 pub fn new(node_id: String) -> Self {
152 Self {
153 node_id,
154 records: Vec::new(),
155 created_at: 0,
156 }
157 }
158
159 pub fn add_file(&mut self, record: FileRecord) {
161 self.records.push(record);
162 }
163
164 #[must_use]
166 pub fn len(&self) -> usize {
167 self.records.len()
168 }
169
170 #[must_use]
172 pub fn is_empty(&self) -> bool {
173 self.records.is_empty()
174 }
175
176 pub fn to_json(&self) -> Result<String, serde_json::Error> {
183 serde_json::to_string(self)
184 }
185
186 pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
193 serde_json::from_str(json)
194 }
195}
196
/// Tuning knobs for the perceptual (near-duplicate) stage of cross-node
/// detection. Exact digest matching is not affected by these values.
#[derive(Debug, Clone)]
pub struct NetworkDedupConfig {
    /// Largest Hamming distance (inclusive) between two perceptual hashes
    /// that still counts as a match.
    pub phash_max_distance: u32,
    /// When both records have a duration, the pair is rejected if the
    /// absolute difference exceeds this value. Pairs where either duration
    /// is missing skip this check.
    pub duration_tolerance_s: f64,
    /// Records with a known size below this value are left out of the
    /// perceptual stage. Records with an unknown size are still considered.
    pub min_file_size: u64,
}
213
214impl Default for NetworkDedupConfig {
215 fn default() -> Self {
216 Self {
217 phash_max_distance: 10,
218 duration_tolerance_s: 5.0,
219 min_file_size: 65_536, }
221 }
222}
223
/// One set of files judged to be duplicates of each other across nodes.
#[derive(Debug, Clone)]
pub struct CrossNodeGroup {
    /// URIs of every file in the group.
    pub uris: Vec<String>,
    /// How the group was detected.
    pub method: DuplicateMethod,
    /// For exact-hash groups this is `Some(0)`. For perceptual groups it is
    /// the smallest Hamming distance observed between the group's first
    /// record and any other member.
    pub phash_distance: Option<u32>,
}
238
/// The detection strategy that produced a [`CrossNodeGroup`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DuplicateMethod {
    /// The files share an identical `blake3_hex` digest string.
    ExactHash,
    /// The files' perceptual hashes are within the configured Hamming
    /// distance.
    PerceptualHash,
}
247
/// Collects node manifests and finds files duplicated across nodes.
///
/// Manifests are added with `add_manifest`; detection runs on demand in
/// `find_cross_node_duplicates`.
#[derive(Debug)]
pub struct NetworkDedupEngine {
    /// Thresholds used by the perceptual stage.
    config: NetworkDedupConfig,
    /// Manifests in the order they were added.
    manifests: Vec<NodeManifest>,
}
261
262impl NetworkDedupEngine {
263 #[must_use]
265 pub fn new(config: NetworkDedupConfig) -> Self {
266 Self {
267 config,
268 manifests: Vec::new(),
269 }
270 }
271
272 pub fn add_manifest(&mut self, manifest: NodeManifest) {
274 self.manifests.push(manifest);
275 }
276
277 #[must_use]
279 pub fn manifest_count(&self) -> usize {
280 self.manifests.len()
281 }
282
283 #[must_use]
285 pub fn total_records(&self) -> usize {
286 self.manifests.iter().map(|m| m.records.len()).sum()
287 }
288
289 #[must_use]
299 pub fn find_cross_node_duplicates(&self) -> Vec<CrossNodeGroup> {
300 let mut groups = Vec::new();
301
302 let all: Vec<(&str, &FileRecord)> = self
304 .manifests
305 .iter()
306 .flat_map(|m| m.records.iter().map(move |r| (m.node_id.as_str(), r)))
307 .collect();
308
309 let mut by_digest: HashMap<&str, Vec<(&str, &FileRecord)>> = HashMap::new();
311 for &(node, rec) in &all {
312 by_digest
313 .entry(rec.blake3_hex.as_str())
314 .or_default()
315 .push((node, rec));
316 }
317
318 let mut exact_uris: std::collections::HashSet<String> = std::collections::HashSet::new();
319
320 for (_digest, records) in &by_digest {
321 if records.len() < 2 {
322 continue;
323 }
324 let nodes: std::collections::HashSet<&str> = records.iter().map(|(n, _)| *n).collect();
326 if nodes.len() < 2 {
327 continue;
328 }
329 let uris: Vec<String> = records.iter().map(|(_, r)| r.uri.clone()).collect();
330 for u in &uris {
331 exact_uris.insert(u.clone());
332 }
333 groups.push(CrossNodeGroup {
334 uris,
335 method: DuplicateMethod::ExactHash,
336 phash_distance: Some(0),
337 });
338 }
339
340 let phash_candidates: Vec<(&str, &FileRecord)> = all
342 .iter()
343 .filter(|(_, r)| {
344 r.phash.is_some()
345 && !exact_uris.contains(&r.uri)
346 && r.file_size
347 .map(|s| s >= self.config.min_file_size)
348 .unwrap_or(true)
349 })
350 .copied()
351 .collect();
352
353 let n = phash_candidates.len();
354 let mut grouped = vec![false; n];
355
356 for i in 0..n {
357 if grouped[i] {
358 continue;
359 }
360 let (node_i, rec_i) = phash_candidates[i];
361 let mut grp_uris = vec![rec_i.uri.clone()];
362 let mut min_dist = u32::MAX;
363
364 for j in (i + 1)..n {
365 if grouped[j] {
366 continue;
367 }
368 let (node_j, rec_j) = phash_candidates[j];
369 if node_i == node_j {
371 continue;
372 }
373 if let (Some(d1), Some(d2)) = (rec_i.duration_s, rec_j.duration_s) {
375 if (d1 - d2).abs() > self.config.duration_tolerance_s {
376 continue;
377 }
378 }
379 if let Some(dist) = rec_i.phash_distance(rec_j) {
380 if dist <= self.config.phash_max_distance {
381 grp_uris.push(rec_j.uri.clone());
382 grouped[j] = true;
383 if dist < min_dist {
384 min_dist = dist;
385 }
386 }
387 }
388 }
389
390 if grp_uris.len() >= 2 {
391 grouped[i] = true;
392 groups.push(CrossNodeGroup {
393 uris: grp_uris,
394 method: DuplicateMethod::PerceptualHash,
395 phash_distance: if min_dist == u32::MAX {
396 None
397 } else {
398 Some(min_dist)
399 },
400 });
401 }
402 }
403
404 groups
405 }
406
407 #[must_use]
409 pub fn cross_node_summary(&self) -> CrossNodeSummary {
410 let groups = self.find_cross_node_duplicates();
411 let exact_groups = groups
412 .iter()
413 .filter(|g| g.method == DuplicateMethod::ExactHash)
414 .count();
415 let perceptual_groups = groups
416 .iter()
417 .filter(|g| g.method == DuplicateMethod::PerceptualHash)
418 .count();
419 let total_duplicate_files: usize = groups.iter().map(|g| g.uris.len()).sum();
420 CrossNodeSummary {
421 total_groups: groups.len(),
422 exact_groups,
423 perceptual_groups,
424 total_duplicate_files,
425 }
426 }
427}
428
/// Aggregate counts produced by `NetworkDedupEngine::cross_node_summary`.
#[derive(Debug, Clone)]
pub struct CrossNodeSummary {
    /// Number of duplicate groups of any kind.
    pub total_groups: usize,
    /// Number of groups detected by exact digest match.
    pub exact_groups: usize,
    /// Number of groups detected by perceptual hash.
    pub perceptual_groups: usize,
    /// Sum of the URI counts of all groups (every member is counted,
    /// including the first file of each group).
    pub total_duplicate_files: usize,
}
441
442impl Default for NetworkDedupEngine {
443 fn default() -> Self {
444 Self::new(NetworkDedupConfig::default())
445 }
446}
447
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record with a fixed size of 1_000_000, which is above the
    /// default `min_file_size`, so it always reaches the perceptual stage.
    fn make_record(uri: &str, digest: &str, phash: Option<u64>, dur: Option<f64>) -> FileRecord {
        let size = Some(1_000_000);
        FileRecord::new(uri.to_owned(), digest.to_owned(), phash, dur, size)
    }

    /// Builds a record with an all-zero digest and no duration or size.
    fn bare_record(uri: &str, phash: Option<u64>) -> FileRecord {
        FileRecord::new(uri.to_owned(), "0".repeat(64), phash, None, None)
    }

    /// Builds a manifest for `node` holding `records` in order.
    fn manifest_of(node: &str, records: Vec<FileRecord>) -> NodeManifest {
        let mut manifest = NodeManifest::new(node.to_owned());
        for record in records {
            manifest.add_file(record);
        }
        manifest
    }

    /// Builds a default-configured engine holding `manifests` in order.
    fn engine_with(manifests: Vec<NodeManifest>) -> NetworkDedupEngine {
        let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
        for manifest in manifests {
            engine.add_manifest(manifest);
        }
        engine
    }

    /// Keeps only the groups detected by perceptual hash.
    fn perceptual_groups(groups: &[CrossNodeGroup]) -> Vec<&CrossNodeGroup> {
        groups
            .iter()
            .filter(|g| g.method == DuplicateMethod::PerceptualHash)
            .collect()
    }

    /// Two nodes that each hold one file with the same digest.
    fn two_node_exact() -> NetworkDedupEngine {
        let digest = "a".repeat(64);
        let on_a = make_record("node-a:/movie.mp4", &digest, None, Some(3600.0));
        let on_b = make_record("node-b:/backup/movie.mp4", &digest, None, Some(3600.0));
        engine_with(vec![
            manifest_of("node-a", vec![on_a]),
            manifest_of("node-b", vec![on_b]),
        ])
    }

    #[test]
    fn test_exact_cross_node_duplicate() {
        let groups = two_node_exact().find_cross_node_duplicates();
        assert_eq!(groups.len(), 1);
        let only = &groups[0];
        assert_eq!(only.method, DuplicateMethod::ExactHash);
        assert_eq!(only.uris.len(), 2);
    }

    #[test]
    fn test_no_duplicate_same_node() {
        // Both copies live on the same node, so no cross-node group exists.
        let digest = "b".repeat(64);
        let copies = vec![
            make_record("node-a:/v1.mp4", &digest, None, None),
            make_record("node-a:/v2.mp4", &digest, None, None),
        ];
        let engine = engine_with(vec![manifest_of("node-a", copies)]);

        let groups = engine.find_cross_node_duplicates();
        assert!(groups.is_empty(), "same-node duplicates must be excluded");
    }

    #[test]
    fn test_perceptual_cross_node_match() {
        let base: u64 = 0xFF00_FF00_FF00_FF00;
        // Two flipped bits: Hamming distance 2 from `base`.
        let close: u64 = base ^ 0b11;

        let original = make_record("node-a:/clip.mp4", &"c".repeat(64), Some(base), Some(60.0));
        let reencode = make_record(
            "node-b:/clip_re.mp4",
            &"d".repeat(64),
            Some(close),
            Some(60.0),
        );
        let engine = engine_with(vec![
            manifest_of("node-a", vec![original]),
            manifest_of("node-b", vec![reencode]),
        ]);

        let groups = engine.find_cross_node_duplicates();
        let perceptual = perceptual_groups(&groups);
        assert_eq!(perceptual.len(), 1);
        assert_eq!(perceptual[0].phash_distance, Some(2));
    }

    #[test]
    fn test_duration_guard_excludes_mismatch() {
        let base: u64 = 0xAAAA_AAAA_AAAA_AAAA;

        // Hashes differ by one bit, but the durations are 60 apart, which
        // is beyond the default tolerance.
        let short = make_record("node-a:/short.mp4", &"e".repeat(64), Some(base), Some(30.0));
        let long = make_record(
            "node-b:/long.mp4",
            &"f".repeat(64),
            Some(base ^ 1),
            Some(90.0),
        );
        let engine = engine_with(vec![
            manifest_of("node-a", vec![short]),
            manifest_of("node-b", vec![long]),
        ]);

        let groups = engine.find_cross_node_duplicates();
        let perceptual = perceptual_groups(&groups);
        assert!(
            perceptual.is_empty(),
            "duration guard should exclude this pair"
        );
    }

    #[test]
    fn test_manifest_serialise_roundtrip() {
        let record = FileRecord::new(
            "node-z:/test.mp4".to_string(),
            "0".repeat(64),
            Some(12345),
            Some(99.9),
            Some(1024),
        );
        let manifest = manifest_of("node-z", vec![record]);

        let json = manifest.to_json().expect("serialise should succeed");
        let restored = NodeManifest::from_json(&json).expect("deserialise should succeed");
        assert_eq!(restored.node_id, "node-z");
        assert_eq!(restored.records.len(), 1);
        assert_eq!(restored.records[0].phash, Some(12345));
    }

    #[test]
    fn test_file_record_valid_digest() {
        // 11 repetitions of a 6-character chunk: 66 characters, not 64.
        let too_long = FileRecord::new(
            "n:/f.mp4".to_string(),
            "a1b2c3".repeat(10) + "a1b2c3",
            None,
            None,
            None,
        );
        assert!(!too_long.has_valid_digest());

        let valid = bare_record("n:/f.mp4", None);
        assert!(valid.has_valid_digest());
    }

    #[test]
    fn test_phash_distance_calculation() {
        // 0xFF and 0xFE differ in exactly one bit.
        let a = bare_record("n:/a.mp4", Some(0xFF));
        let b = bare_record("n:/b.mp4", Some(0xFE));
        assert_eq!(a.phash_distance(&b), Some(1));

        let no_hash = bare_record("n:/c.mp4", None);
        assert_eq!(a.phash_distance(&no_hash), None);
    }

    #[test]
    fn test_cross_node_summary() {
        let summary = two_node_exact().cross_node_summary();
        assert_eq!(summary.total_groups, 1);
        assert_eq!(summary.exact_groups, 1);
        assert_eq!(summary.perceptual_groups, 0);
        assert_eq!(summary.total_duplicate_files, 2);
    }

    #[test]
    fn test_empty_engine() {
        let engine = engine_with(Vec::new());
        assert_eq!(engine.manifest_count(), 0);
        assert_eq!(engine.total_records(), 0);
        let groups = engine.find_cross_node_duplicates();
        assert!(groups.is_empty());
    }

    #[test]
    fn test_node_name_extraction() {
        let rec = bare_record("node-alpha:/path/to/file.mp4", None);
        assert_eq!(rec.node_name(), Some("node-alpha"));
    }

    #[test]
    fn test_three_node_perceptual_cluster() {
        let base: u64 = 0x0F0F_0F0F_0F0F_0F0F;

        // One clip per node; the hashes differ from `base` by 0, 1 and 2 bits.
        let manifests: Vec<NodeManifest> = [("n1", 0u64), ("n2", 0b1), ("n3", 0b11)]
            .iter()
            .map(|&(node, delta)| {
                let clip = make_record(
                    &format!("{node}:/clip.mp4"),
                    &format!("{node:0>64}"),
                    Some(base ^ delta),
                    Some(120.0),
                );
                manifest_of(node, vec![clip])
            })
            .collect();
        let engine = engine_with(manifests);

        let groups = engine.find_cross_node_duplicates();
        let perceptual_total: usize = perceptual_groups(&groups)
            .iter()
            .map(|g| g.uris.len())
            .sum();
        assert!(
            perceptual_total >= 2,
            "expected at least 2 files in perceptual groups, got {perceptual_total}"
        );
    }
}