Skip to main content

oximedia_dedup/
network_dedup.rs

1//! Network-aware deduplication for distributed media libraries.
2//!
3//! This module provides mechanisms to deduplicate media across multiple nodes in
4//! a distributed system.  Rather than requiring every node to download every file,
5//! nodes exchange compact **fingerprint manifests** and only transfer content when
6//! necessary.
7//!
8//! # Design
9//!
10//! Each node maintains a local [`NodeManifest`] containing fingerprint summaries
11//! (Blake3 hex digest, perceptual hash bits, duration, file size) for its local
12//! media files.  Manifests are serialisable as JSON so they can be transmitted over
13//! HTTP or any byte channel without coupling to a particular transport.
14//!
15//! The [`NetworkDedupEngine`] accepts manifests from multiple remote nodes and
16//! computes cross-node duplicate groups by:
17//!
18//! 1. **Exact match** – identical Blake3 digests → definite duplicate.
19//! 2. **Perceptual match** – Hamming distance on 64-bit pHash ≤ configured
20//!    threshold → near-duplicate candidate.
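//! 3. **Duration guard** – pairs whose durations differ by more than
//!    `duration_tolerance_s` seconds are excluded from perceptual matching to
//!    reduce false positives (the guard is skipped when either duration is unknown).
//! 4. **Size floor** – files smaller than `min_file_size` bytes are excluded from
//!    perceptual matching; files with an unknown size are still considered.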
23//!
24//! # Example
25//!
26//! ```rust
27//! use oximedia_dedup::network_dedup::{
28//!     NetworkDedupEngine, NetworkDedupConfig, NodeManifest, FileRecord,
29//! };
30//!
31//! let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
32//!
33//! let mut manifest_a = NodeManifest::new("node-a".to_string());
34//! manifest_a.add_file(FileRecord::new(
35//!     "node-a:/videos/movie.mp4".to_string(),
36//!     "abcdef01".repeat(8),
37//!     Some(0xDEAD_BEEF_1234_5678),
38//!     Some(7200.0),
39//!     Some(4_000_000_000),
40//! ));
41//!
42//! let mut manifest_b = NodeManifest::new("node-b".to_string());
43//! manifest_b.add_file(FileRecord::new(
44//!     "node-b:/archive/movie_copy.mp4".to_string(),
45//!     "abcdef01".repeat(8),
46//!     Some(0xDEAD_BEEF_1234_5678),
47//!     Some(7200.0),
48//!     Some(4_000_000_000),
49//! ));
50//!
51//! engine.add_manifest(manifest_a);
52//! engine.add_manifest(manifest_b);
53//!
54//! let groups = engine.find_cross_node_duplicates();
55//! assert!(!groups.is_empty());
56//! ```
57
58#![allow(dead_code)]
59#![allow(clippy::cast_precision_loss)]
60
61use std::collections::HashMap;
62
63use serde::{Deserialize, Serialize};
64
65// ---------------------------------------------------------------------------
66// FileRecord
67// ---------------------------------------------------------------------------
68
69/// A single file entry within a node's manifest.
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct FileRecord {
72    /// Logical URI for this file (e.g. `"node-a:/path/to/file.mp4"`).
73    pub uri: String,
74    /// Lower-case hexadecimal BLAKE3 digest (64 hex characters).
75    pub blake3_hex: String,
76    /// Optional 64-bit perceptual hash.
77    pub phash: Option<u64>,
78    /// Optional duration in seconds.
79    pub duration_s: Option<f64>,
80    /// Optional file size in bytes.
81    pub file_size: Option<u64>,
82}
83
84impl FileRecord {
85    /// Create a new `FileRecord`.
86    #[must_use]
87    pub fn new(
88        uri: String,
89        blake3_hex: String,
90        phash: Option<u64>,
91        duration_s: Option<f64>,
92        file_size: Option<u64>,
93    ) -> Self {
94        Self {
95            uri,
96            blake3_hex,
97            phash,
98            duration_s,
99            file_size,
100        }
101    }
102
103    /// Return `true` if this record has a valid-looking Blake3 hex digest.
104    ///
105    /// BLAKE3 produces 32 bytes → 64 hex characters.
106    #[must_use]
107    pub fn has_valid_digest(&self) -> bool {
108        self.blake3_hex.len() == 64 && self.blake3_hex.chars().all(|c| c.is_ascii_hexdigit())
109    }
110
111    /// Compute the Hamming distance to another record's perceptual hash.
112    ///
113    /// Returns `None` if either record lacks a perceptual hash.
114    #[must_use]
115    pub fn phash_distance(&self, other: &Self) -> Option<u32> {
116        match (self.phash, other.phash) {
117            (Some(a), Some(b)) => Some((a ^ b).count_ones()),
118            _ => None,
119        }
120    }
121
122    /// Return the logical node name extracted from the URI prefix
123    /// (`"node-a:/foo"` → `"node-a"`).
124    #[must_use]
125    pub fn node_name(&self) -> Option<&str> {
126        self.uri.split(':').next()
127    }
128}
129
130// ---------------------------------------------------------------------------
131// NodeManifest
132// ---------------------------------------------------------------------------
133
134/// Fingerprint manifest for a single node.
135///
136/// A manifest holds all [`FileRecord`]s known to a particular node and can be
137/// serialised/deserialised as JSON for transport.
138#[derive(Debug, Clone, Serialize, Deserialize)]
139pub struct NodeManifest {
140    /// Human-readable node identifier.
141    pub node_id: String,
142    /// The file records.
143    pub records: Vec<FileRecord>,
144    /// Creation timestamp (Unix seconds) — informational only.
145    pub created_at: u64,
146}
147
148impl NodeManifest {
149    /// Create an empty manifest for `node_id`.
150    #[must_use]
151    pub fn new(node_id: String) -> Self {
152        Self {
153            node_id,
154            records: Vec::new(),
155            created_at: 0,
156        }
157    }
158
159    /// Add a file record to the manifest.
160    pub fn add_file(&mut self, record: FileRecord) {
161        self.records.push(record);
162    }
163
164    /// Return the number of records.
165    #[must_use]
166    pub fn len(&self) -> usize {
167        self.records.len()
168    }
169
170    /// Return `true` if the manifest has no records.
171    #[must_use]
172    pub fn is_empty(&self) -> bool {
173        self.records.is_empty()
174    }
175
176    /// Serialise the manifest to a JSON string.
177    ///
178    /// # Errors
179    ///
180    /// Returns a `serde_json::Error` if serialisation fails (which should never
181    /// happen for this type).
182    pub fn to_json(&self) -> Result<String, serde_json::Error> {
183        serde_json::to_string(self)
184    }
185
186    /// Deserialise a manifest from a JSON string.
187    ///
188    /// # Errors
189    ///
190    /// Returns a `serde_json::Error` if the JSON is malformed or the schema
191    /// doesn't match.
192    pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
193        serde_json::from_str(json)
194    }
195}
196
197// ---------------------------------------------------------------------------
198// Configuration
199// ---------------------------------------------------------------------------
200
/// Tuning knobs for the [`NetworkDedupEngine`].
///
/// All three settings apply to the perceptual pass only.
#[derive(Debug, Clone)]
pub struct NetworkDedupConfig {
    /// Largest pHash Hamming distance (in bits) still treated as a match.
    pub phash_max_distance: u32,
    /// Largest duration difference, in seconds, allowed between two files
    /// before they are ruled out as perceptual near-duplicates.
    pub duration_tolerance_s: f64,
    /// Files smaller than this many bytes are skipped by perceptual matching,
    /// since tiny files tend to produce spurious pHash matches.
    pub min_file_size: u64,
}
213
214impl Default for NetworkDedupConfig {
215    fn default() -> Self {
216        Self {
217            phash_max_distance: 10,
218            duration_tolerance_s: 5.0,
219            min_file_size: 65_536, // 64 KiB
220        }
221    }
222}
223
224// ---------------------------------------------------------------------------
225// DuplicateGroup
226// ---------------------------------------------------------------------------
227
228/// A group of cross-node duplicate files.
229#[derive(Debug, Clone)]
230pub struct CrossNodeGroup {
231    /// The URIs of all files in this duplicate group.
232    pub uris: Vec<String>,
233    /// How the duplicates were detected.
234    pub method: DuplicateMethod,
235    /// Perceptual hash distance (0 for exact duplicates, None if method != Perceptual).
236    pub phash_distance: Option<u32>,
237}
238
/// How a cross-node duplicate group was detected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DuplicateMethod {
    /// The files share the same BLAKE3 cryptographic digest.
    ExactHash,
    /// The files' 64-bit perceptual hashes fall within the configured
    /// Hamming-distance threshold.
    PerceptualHash,
}
247
248// ---------------------------------------------------------------------------
249// NetworkDedupEngine
250// ---------------------------------------------------------------------------
251
252/// Engine for detecting duplicates across distributed media nodes.
253///
254/// Add manifests from each remote node then call [`find_cross_node_duplicates`]
255/// to get a list of [`CrossNodeGroup`]s.
256#[derive(Debug)]
257pub struct NetworkDedupEngine {
258    config: NetworkDedupConfig,
259    manifests: Vec<NodeManifest>,
260}
261
262impl NetworkDedupEngine {
263    /// Create a new engine with the given configuration.
264    #[must_use]
265    pub fn new(config: NetworkDedupConfig) -> Self {
266        Self {
267            config,
268            manifests: Vec::new(),
269        }
270    }
271
272    /// Add a [`NodeManifest`] to the engine.
273    pub fn add_manifest(&mut self, manifest: NodeManifest) {
274        self.manifests.push(manifest);
275    }
276
277    /// Return the number of manifests registered.
278    #[must_use]
279    pub fn manifest_count(&self) -> usize {
280        self.manifests.len()
281    }
282
283    /// Return the total number of file records across all manifests.
284    #[must_use]
285    pub fn total_records(&self) -> usize {
286        self.manifests.iter().map(|m| m.records.len()).sum()
287    }
288
289    /// Find duplicate groups across all registered node manifests.
290    ///
291    /// The algorithm runs two passes:
292    ///
293    /// 1. **Exact pass** – groups records by Blake3 hex digest.  Only groups
294    ///    that span at least two *different* nodes are returned.
295    /// 2. **Perceptual pass** – for records with pHash and not already grouped
296    ///    as exact duplicates, applies Hamming-distance comparison with the
297    ///    configured threshold and duration guard.
298    #[must_use]
299    pub fn find_cross_node_duplicates(&self) -> Vec<CrossNodeGroup> {
300        let mut groups = Vec::new();
301
302        // Flatten all records with their node id attached.
303        let all: Vec<(&str, &FileRecord)> = self
304            .manifests
305            .iter()
306            .flat_map(|m| m.records.iter().map(move |r| (m.node_id.as_str(), r)))
307            .collect();
308
309        // --- Pass 1: exact hash ---
310        let mut by_digest: HashMap<&str, Vec<(&str, &FileRecord)>> = HashMap::new();
311        for &(node, rec) in &all {
312            by_digest
313                .entry(rec.blake3_hex.as_str())
314                .or_default()
315                .push((node, rec));
316        }
317
318        let mut exact_uris: std::collections::HashSet<String> = std::collections::HashSet::new();
319
320        for (_digest, records) in &by_digest {
321            if records.len() < 2 {
322                continue;
323            }
324            // Check that at least two *different* nodes are represented.
325            let nodes: std::collections::HashSet<&str> = records.iter().map(|(n, _)| *n).collect();
326            if nodes.len() < 2 {
327                continue;
328            }
329            let uris: Vec<String> = records.iter().map(|(_, r)| r.uri.clone()).collect();
330            for u in &uris {
331                exact_uris.insert(u.clone());
332            }
333            groups.push(CrossNodeGroup {
334                uris,
335                method: DuplicateMethod::ExactHash,
336                phash_distance: Some(0),
337            });
338        }
339
340        // --- Pass 2: perceptual hash ---
341        let phash_candidates: Vec<(&str, &FileRecord)> = all
342            .iter()
343            .filter(|(_, r)| {
344                r.phash.is_some()
345                    && !exact_uris.contains(&r.uri)
346                    && r.file_size
347                        .map(|s| s >= self.config.min_file_size)
348                        .unwrap_or(true)
349            })
350            .copied()
351            .collect();
352
353        let n = phash_candidates.len();
354        let mut grouped = vec![false; n];
355
356        for i in 0..n {
357            if grouped[i] {
358                continue;
359            }
360            let (node_i, rec_i) = phash_candidates[i];
361            let mut grp_uris = vec![rec_i.uri.clone()];
362            let mut min_dist = u32::MAX;
363
364            for j in (i + 1)..n {
365                if grouped[j] {
366                    continue;
367                }
368                let (node_j, rec_j) = phash_candidates[j];
369                // Only match across different nodes.
370                if node_i == node_j {
371                    continue;
372                }
373                // Duration guard.
374                if let (Some(d1), Some(d2)) = (rec_i.duration_s, rec_j.duration_s) {
375                    if (d1 - d2).abs() > self.config.duration_tolerance_s {
376                        continue;
377                    }
378                }
379                if let Some(dist) = rec_i.phash_distance(rec_j) {
380                    if dist <= self.config.phash_max_distance {
381                        grp_uris.push(rec_j.uri.clone());
382                        grouped[j] = true;
383                        if dist < min_dist {
384                            min_dist = dist;
385                        }
386                    }
387                }
388            }
389
390            if grp_uris.len() >= 2 {
391                grouped[i] = true;
392                groups.push(CrossNodeGroup {
393                    uris: grp_uris,
394                    method: DuplicateMethod::PerceptualHash,
395                    phash_distance: if min_dist == u32::MAX {
396                        None
397                    } else {
398                        Some(min_dist)
399                    },
400                });
401            }
402        }
403
404        groups
405    }
406
407    /// Return a summary of how many duplicates span how many nodes.
408    #[must_use]
409    pub fn cross_node_summary(&self) -> CrossNodeSummary {
410        let groups = self.find_cross_node_duplicates();
411        let exact_groups = groups
412            .iter()
413            .filter(|g| g.method == DuplicateMethod::ExactHash)
414            .count();
415        let perceptual_groups = groups
416            .iter()
417            .filter(|g| g.method == DuplicateMethod::PerceptualHash)
418            .count();
419        let total_duplicate_files: usize = groups.iter().map(|g| g.uris.len()).sum();
420        CrossNodeSummary {
421            total_groups: groups.len(),
422            exact_groups,
423            perceptual_groups,
424            total_duplicate_files,
425        }
426    }
427}
428
/// Aggregate counts describing one cross-node deduplication run.
#[derive(Debug, Clone)]
pub struct CrossNodeSummary {
    /// Number of duplicate groups of any kind.
    pub total_groups: usize,
    /// Number of groups found by exact BLAKE3 digest match.
    pub exact_groups: usize,
    /// Number of groups found by perceptual-hash match.
    pub perceptual_groups: usize,
    /// Sum of group sizes, i.e. how many file URIs appear across all groups.
    pub total_duplicate_files: usize,
}
441
442impl Default for NetworkDedupEngine {
443    fn default() -> Self {
444        Self::new(NetworkDedupConfig::default())
445    }
446}
447
448// ---------------------------------------------------------------------------
449// Tests
450// ---------------------------------------------------------------------------
451
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a 1 MB record (above the default `min_file_size`), so only the
    /// pHash and duration decide perceptual matches.
    fn make_record(uri: &str, digest: &str, phash: Option<u64>, dur: Option<f64>) -> FileRecord {
        FileRecord::new(
            uri.to_string(),
            digest.to_string(),
            phash,
            dur,
            Some(1_000_000),
        )
    }

    /// Build a manifest for `node` holding `records`, in order.
    fn manifest(node: &str, records: Vec<FileRecord>) -> NodeManifest {
        let mut m = NodeManifest::new(node.to_string());
        for r in records {
            m.add_file(r);
        }
        m
    }

    /// Keep only the perceptual-hash groups from `groups`.
    fn perceptual_groups(groups: &[CrossNodeGroup]) -> Vec<&CrossNodeGroup> {
        groups
            .iter()
            .filter(|g| g.method == DuplicateMethod::PerceptualHash)
            .collect()
    }

    /// Build an engine with two nodes that share one exact duplicate.
    fn two_node_exact() -> NetworkDedupEngine {
        let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
        let digest = "a".repeat(64);
        engine.add_manifest(manifest(
            "node-a",
            vec![make_record("node-a:/movie.mp4", &digest, None, Some(3600.0))],
        ));
        engine.add_manifest(manifest(
            "node-b",
            vec![make_record("node-b:/backup/movie.mp4", &digest, None, Some(3600.0))],
        ));
        engine
    }

    #[test]
    fn test_exact_cross_node_duplicate() {
        let engine = two_node_exact();
        let groups = engine.find_cross_node_duplicates();
        assert_eq!(groups.len(), 1);
        assert_eq!(groups[0].method, DuplicateMethod::ExactHash);
        assert_eq!(groups[0].uris.len(), 2);
    }

    #[test]
    fn test_no_duplicate_same_node() {
        // Same digest but same node — should NOT appear in cross-node groups.
        let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
        let digest = "b".repeat(64);
        engine.add_manifest(manifest(
            "node-a",
            vec![
                make_record("node-a:/v1.mp4", &digest, None, None),
                make_record("node-a:/v2.mp4", &digest, None, None),
            ],
        ));

        let groups = engine.find_cross_node_duplicates();
        assert!(groups.is_empty(), "same-node duplicates must be excluded");
    }

    #[test]
    fn test_perceptual_cross_node_match() {
        let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
        // pHash distance of 2 — well within default threshold of 10.
        let base: u64 = 0xFF00_FF00_FF00_FF00;
        let close: u64 = base ^ 0b11;

        engine.add_manifest(manifest(
            "node-a",
            vec![make_record("node-a:/clip.mp4", &"c".repeat(64), Some(base), Some(60.0))],
        ));
        engine.add_manifest(manifest(
            "node-b",
            vec![make_record("node-b:/clip_re.mp4", &"d".repeat(64), Some(close), Some(60.0))],
        ));

        let groups = engine.find_cross_node_duplicates();
        let perceptual = perceptual_groups(&groups);
        assert_eq!(perceptual.len(), 1);
        assert_eq!(perceptual[0].phash_distance, Some(2));
    }

    #[test]
    fn test_duration_guard_excludes_mismatch() {
        let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
        let base: u64 = 0xAAAA_AAAA_AAAA_AAAA;

        engine.add_manifest(manifest(
            "node-a",
            vec![make_record("node-a:/short.mp4", &"e".repeat(64), Some(base), Some(30.0))],
        ));
        // Distance 1, but durations differ by 60 s > tolerance 5 s.
        engine.add_manifest(manifest(
            "node-b",
            vec![make_record("node-b:/long.mp4", &"f".repeat(64), Some(base ^ 1), Some(90.0))],
        ));

        let groups = engine.find_cross_node_duplicates();
        assert!(
            perceptual_groups(&groups).is_empty(),
            "duration guard should exclude this pair"
        );
    }

    #[test]
    fn test_duration_guard_skipped_when_unknown() {
        let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
        let base: u64 = 0xAAAA_AAAA_AAAA_AAAA;

        engine.add_manifest(manifest(
            "node-a",
            vec![make_record("node-a:/a.mp4", &"e".repeat(64), Some(base), None)],
        ));
        engine.add_manifest(manifest(
            "node-b",
            vec![make_record("node-b:/b.mp4", &"f".repeat(64), Some(base ^ 1), Some(90.0))],
        ));

        let groups = engine.find_cross_node_duplicates();
        assert_eq!(perceptual_groups(&groups).len(), 1);
    }

    #[test]
    fn test_min_file_size_excludes_small_files() {
        let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
        let base: u64 = 0x1234_5678_9ABC_DEF0;

        // 1 KiB is below the default 64 KiB floor.
        let tiny = FileRecord::new(
            "node-b:/tiny.mp4".to_string(),
            "2".repeat(64),
            Some(base),
            Some(10.0),
            Some(1024),
        );
        engine.add_manifest(manifest(
            "node-a",
            vec![make_record("node-a:/big.mp4", &"1".repeat(64), Some(base), Some(10.0))],
        ));
        engine.add_manifest(manifest("node-b", vec![tiny]));

        assert!(engine.find_cross_node_duplicates().is_empty());
    }

    #[test]
    fn test_exact_duplicates_not_reported_twice() {
        // Identical digest AND identical pHash: reported once, as an exact group.
        let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
        let digest = "9".repeat(64);
        let phash = Some(0x0123_4567_89AB_CDEF);
        engine.add_manifest(manifest(
            "node-a",
            vec![make_record("node-a:/x.mp4", &digest, phash, Some(5.0))],
        ));
        engine.add_manifest(manifest(
            "node-b",
            vec![make_record("node-b:/x.mp4", &digest, phash, Some(5.0))],
        ));

        let groups = engine.find_cross_node_duplicates();
        assert_eq!(groups.len(), 1);
        assert_eq!(groups[0].method, DuplicateMethod::ExactHash);
    }

    #[test]
    fn test_manifest_serialise_roundtrip() {
        let mut m = NodeManifest::new("node-z".to_string());
        m.add_file(FileRecord::new(
            "node-z:/test.mp4".to_string(),
            "0".repeat(64),
            Some(12345),
            Some(99.9),
            Some(1024),
        ));
        let json = m.to_json().expect("serialise should succeed");
        let m2 = NodeManifest::from_json(&json).expect("deserialise should succeed");
        assert_eq!(m2.node_id, "node-z");
        assert_eq!(m2.records.len(), 1);
        assert_eq!(m2.records[0].phash, Some(12345));
        assert_eq!(m2.records[0].duration_s, Some(99.9));
        assert_eq!(m2.records[0].file_size, Some(1024));
    }

    #[test]
    fn test_file_record_valid_digest() {
        // 66 hex chars is not 64 → invalid.
        let too_long = FileRecord::new(
            "n:/f.mp4".to_string(),
            "a1b2c3".repeat(10) + "a1b2c3",
            None,
            None,
            None,
        );
        assert!(!too_long.has_valid_digest());

        let valid = FileRecord::new("n:/f.mp4".to_string(), "0".repeat(64), None, None, None);
        assert!(valid.has_valid_digest());
    }

    #[test]
    fn test_phash_distance_calculation() {
        let a = FileRecord::new(
            "n:/a.mp4".to_string(),
            "0".repeat(64),
            Some(0xFF),
            None,
            None,
        );
        let b = FileRecord::new(
            "n:/b.mp4".to_string(),
            "0".repeat(64),
            Some(0xFE),
            None,
            None,
        );
        // 0xFF ^ 0xFE = 0x01 → 1 bit
        assert_eq!(a.phash_distance(&b), Some(1));

        let no_hash = FileRecord::new("n:/c.mp4".to_string(), "0".repeat(64), None, None, None);
        assert_eq!(a.phash_distance(&no_hash), None);
    }

    #[test]
    fn test_cross_node_summary() {
        let engine = two_node_exact();
        let summary = engine.cross_node_summary();
        assert_eq!(summary.total_groups, 1);
        assert_eq!(summary.exact_groups, 1);
        assert_eq!(summary.perceptual_groups, 0);
        assert_eq!(summary.total_duplicate_files, 2);
    }

    #[test]
    fn test_empty_engine() {
        let engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
        assert_eq!(engine.manifest_count(), 0);
        assert_eq!(engine.total_records(), 0);
        assert!(engine.find_cross_node_duplicates().is_empty());
    }

    #[test]
    fn test_node_name_extraction() {
        let rec = FileRecord::new(
            "node-alpha:/path/to/file.mp4".to_string(),
            "0".repeat(64),
            None,
            None,
            None,
        );
        assert_eq!(rec.node_name(), Some("node-alpha"));
    }

    #[test]
    fn test_three_node_perceptual_cluster() {
        // Pairwise pHash distances: n1–n2 = 1, n1–n3 = 2, n2–n3 = 1, all well
        // below the default threshold of 10. The greedy pass seeds a group with
        // n1 (first in manifest order), which then absorbs both n2 and n3.
        let base: u64 = 0x0F0F_0F0F_0F0F_0F0F;
        let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());

        for (node, delta) in [("n1", 0u64), ("n2", 0b1), ("n3", 0b11)] {
            engine.add_manifest(manifest(
                node,
                vec![make_record(
                    &format!("{node}:/clip.mp4"),
                    // Use distinct digests so the exact-hash pass doesn't fire.
                    &format!("{:0>64}", node),
                    Some(base ^ delta),
                    Some(120.0),
                )],
            ));
        }

        let groups = engine.find_cross_node_duplicates();
        let perceptual = perceptual_groups(&groups);
        assert_eq!(perceptual.len(), 1);
        assert_eq!(
            perceptual[0].uris,
            vec![
                "n1:/clip.mp4".to_string(),
                "n2:/clip.mp4".to_string(),
                "n3:/clip.mp4".to_string(),
            ]
        );
        assert_eq!(perceptual[0].phash_distance, Some(1));
    }
}