#![allow(dead_code)]
#![allow(clippy::cast_precision_loss)]
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
/// Metadata for a single file as listed in a [`NodeManifest`].
///
/// The type derives `Serialize`/`Deserialize`, so the field names below are part of
/// the serialized representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FileRecord {
/// Location of the file. `FileRecord::node_name` reads the text before the first
/// `:` as the node, e.g. `node-a:/movie.mp4` in this file's tests.
pub uri: String,
/// Content digest as a hex string; `FileRecord::has_valid_digest` accepts exactly
/// 64 ASCII hex digits. Presumably a BLAKE3 hash, judging by the field name.
pub blake3_hex: String,
/// Optional 64-bit perceptual hash. Two hashes are compared by counting the bits
/// in which they differ (see `FileRecord::phash_distance`).
pub phash: Option<u64>,
/// Optional media duration, presumably in seconds (from the `_s` suffix).
pub duration_s: Option<f64>,
/// Optional file size, presumably in bytes — TODO confirm the unit with the producer.
pub file_size: Option<u64>,
}
impl FileRecord {
    /// Builds a record from its individual fields.
    ///
    /// No validation is performed; use [`FileRecord::has_valid_digest`] to check the
    /// digest afterwards.
    #[must_use]
    pub fn new(
        uri: String,
        blake3_hex: String,
        phash: Option<u64>,
        duration_s: Option<f64>,
        file_size: Option<u64>,
    ) -> Self {
        Self {
            uri,
            blake3_hex,
            phash,
            duration_s,
            file_size,
        }
    }

    /// Returns `true` when `blake3_hex` is exactly 64 ASCII hex digits.
    ///
    /// Both upper- and lower-case digits are accepted.
    #[must_use]
    pub fn has_valid_digest(&self) -> bool {
        // A 64-byte string containing any non-ASCII character fails the per-byte
        // check, so comparing bytes is equivalent to comparing chars here.
        self.blake3_hex.len() == 64 && self.blake3_hex.bytes().all(|b| b.is_ascii_hexdigit())
    }

    /// Returns the Hamming distance (number of differing bits) between the two
    /// perceptual hashes, or `None` when either record has no perceptual hash.
    #[must_use]
    pub fn phash_distance(&self, other: &Self) -> Option<u32> {
        let (mine, theirs) = (self.phash?, other.phash?);
        Some((mine ^ theirs).count_ones())
    }

    /// Returns the node portion of the URI: the text before the first `:`.
    ///
    /// Returns `None` when the URI contains no `:` separator. (The previous
    /// `split(':').next()` form could never return `None`; it handed back the whole
    /// URI as if it were a node name.)
    #[must_use]
    pub fn node_name(&self) -> Option<&str> {
        self.uri.split_once(':').map(|(node, _rest)| node)
    }
}
/// The list of [`FileRecord`]s reported by one node.
///
/// The type derives `Serialize`/`Deserialize`, so the field names below are part of
/// the serialized representation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeManifest {
/// Identifier of the node that produced this manifest. The dedup engine compares
/// these identifiers to decide whether two records live on different nodes.
pub node_id: String,
/// Records in the order they were added.
pub records: Vec<FileRecord>,
/// Presumably a creation timestamp; the unit is not visible in this file.
/// `NodeManifest::new` initialises it to 0 — TODO confirm who sets the real value.
pub created_at: u64,
}
impl NodeManifest {
/// Creates an empty manifest for `node_id`.
///
/// NOTE(review): `created_at` is set to 0 here and nothing in this block assigns a
/// real timestamp — confirm that callers populate it when it matters.
#[must_use]
pub fn new(node_id: String) -> Self {
Self {
node_id,
records: Vec::new(),
created_at: 0,
}
}
/// Appends `record` to the manifest. No de-duplication or validation is done.
pub fn add_file(&mut self, record: FileRecord) {
self.records.push(record);
}
/// Number of records in the manifest.
#[must_use]
pub fn len(&self) -> usize {
self.records.len()
}
/// Returns `true` when the manifest holds no records.
#[must_use]
pub fn is_empty(&self) -> bool {
self.records.is_empty()
}
/// Serialises the manifest to a JSON string via `serde_json::to_string`.
///
/// # Errors
///
/// Returns the `serde_json::Error` reported by the serialiser.
pub fn to_json(&self) -> Result<String, serde_json::Error> {
serde_json::to_string(self)
}
/// Parses a manifest from a JSON string via `serde_json::from_str`.
///
/// # Errors
///
/// Returns the `serde_json::Error` reported by the deserialiser, e.g. when `json`
/// is malformed or does not match the manifest's fields.
pub fn from_json(json: &str) -> Result<Self, serde_json::Error> {
serde_json::from_str(json)
}
}
/// Thresholds used by [`NetworkDedupEngine`] when matching files perceptually.
#[derive(Debug, Clone)]
pub struct NetworkDedupConfig {
/// Largest perceptual-hash bit distance (inclusive) at which two files are still
/// treated as duplicates.
pub phash_max_distance: u32,
/// When both files report a duration, the pair is rejected if the absolute
/// difference exceeds this value. Pairs with a missing duration skip the check.
pub duration_tolerance_s: f64,
/// Files whose known size is below this value are left out of perceptual
/// matching; files with an unknown size are kept.
pub min_file_size: u64,
}
impl Default for NetworkDedupConfig {
    /// Stock thresholds: up to 10 differing hash bits, a duration tolerance of 5.0,
    /// and a minimum file size of 65 536.
    fn default() -> Self {
        let phash_max_distance = 10;
        let duration_tolerance_s = 5.0;
        // 64 * 1024 == 65_536.
        let min_file_size = 64 * 1024;
        Self {
            phash_max_distance,
            duration_tolerance_s,
            min_file_size,
        }
    }
}
/// One set of files, spread over at least two nodes, that were judged to be duplicates.
#[derive(Debug, Clone)]
pub struct CrossNodeGroup {
/// URIs of every file in the group.
pub uris: Vec<String>,
/// How the group was detected.
pub method: DuplicateMethod,
/// `Some(0)` for exact-hash groups. For perceptual groups, the smallest bit
/// distance between the group's first file and any other member.
pub phash_distance: Option<u32>,
}
/// The kind of evidence that put files into the same [`CrossNodeGroup`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DuplicateMethod {
/// The files share an identical `blake3_hex` digest string.
ExactHash,
/// The files' perceptual hashes are within the configured bit distance.
PerceptualHash,
}
/// Collects manifests from several nodes and reports files duplicated across them.
#[derive(Debug)]
pub struct NetworkDedupEngine {
// Thresholds applied during perceptual matching.
config: NetworkDedupConfig,
// Manifests in the order they were added via `add_manifest`.
manifests: Vec<NodeManifest>,
}
impl NetworkDedupEngine {
    /// Creates an engine with the given thresholds and no manifests.
    #[must_use]
    pub fn new(config: NetworkDedupConfig) -> Self {
        Self {
            config,
            manifests: Vec::new(),
        }
    }

    /// Registers one node's manifest. Manifests are kept in insertion order.
    pub fn add_manifest(&mut self, manifest: NodeManifest) {
        self.manifests.push(manifest);
    }

    /// Number of manifests registered so far.
    #[must_use]
    pub fn manifest_count(&self) -> usize {
        self.manifests.len()
    }

    /// Total number of file records across all registered manifests.
    #[must_use]
    pub fn total_records(&self) -> usize {
        self.manifests.iter().map(|m| m.records.len()).sum()
    }

    /// Finds files that are duplicated on two or more different nodes.
    ///
    /// Matching happens in two passes:
    ///
    /// 1. **Exact** – records sharing the same non-empty `blake3_hex` string form a
    ///    group when they come from at least two distinct `node_id`s. Empty digests
    ///    are skipped, because two empty strings say nothing about file contents.
    /// 2. **Perceptual** – among the remaining records that carry a perceptual hash
    ///    and are not below `min_file_size`, each ungrouped record seeds a group and
    ///    pulls in later records from *other* nodes whose hash is within
    ///    `phash_max_distance` bits of the seed and, when both durations are known,
    ///    whose duration is within `duration_tolerance_s` of the seed's.
    ///
    /// The result is deterministic: exact groups come first, in the order their
    /// digest is first encountered (manifest order, then record order), followed by
    /// perceptual groups in seed order. The perceptual pass compares candidates
    /// pairwise, so its cost grows quadratically with the number of candidates.
    #[must_use]
    pub fn find_cross_node_duplicates(&self) -> Vec<CrossNodeGroup> {
        // Flatten every manifest into (node id, record) pairs, preserving order.
        let all: Vec<(&str, &FileRecord)> = self
            .manifests
            .iter()
            .flat_map(|m| m.records.iter().map(move |r| (m.node_id.as_str(), r)))
            .collect();

        let mut groups = Vec::new();

        // Pass 1: bucket by digest. The map only stores each bucket's index so the
        // buckets themselves stay in first-seen order instead of hash order.
        let mut slot_of: HashMap<&str, usize> = HashMap::new();
        let mut buckets: Vec<Vec<(&str, &FileRecord)>> = Vec::new();
        for &(node, rec) in &all {
            let digest = rec.blake3_hex.as_str();
            if digest.is_empty() {
                continue;
            }
            let slot = *slot_of.entry(digest).or_insert_with(|| {
                buckets.push(Vec::new());
                buckets.len() - 1
            });
            buckets[slot].push((node, rec));
        }

        // URIs already reported as exact duplicates; borrowed, not cloned.
        let mut exact_uris: std::collections::HashSet<&str> = std::collections::HashSet::new();
        for bucket in &buckets {
            // A bucket counts only if some record lives on a different node than
            // the first one.
            let mut nodes = bucket.iter().map(|&(node, _)| node);
            let first_node = nodes.next();
            if !nodes.any(|node| Some(node) != first_node) {
                continue;
            }
            exact_uris.extend(bucket.iter().map(|&(_, rec)| rec.uri.as_str()));
            groups.push(CrossNodeGroup {
                uris: bucket.iter().map(|&(_, rec)| rec.uri.clone()).collect(),
                method: DuplicateMethod::ExactHash,
                phash_distance: Some(0),
            });
        }

        // Pass 2: perceptual matching over whatever was not matched exactly.
        let min_size = self.config.min_file_size;
        let candidates: Vec<(&str, &FileRecord)> = all
            .iter()
            .copied()
            .filter(|&(_, rec)| {
                rec.phash.is_some()
                    && !exact_uris.contains(rec.uri.as_str())
                    // Unknown sizes are kept; only known-small files are dropped.
                    && rec.file_size.map_or(true, |size| size >= min_size)
            })
            .collect();

        let mut grouped = vec![false; candidates.len()];
        for (i, &(seed_node, seed)) in candidates.iter().enumerate() {
            if grouped[i] {
                continue;
            }
            let mut uris = vec![seed.uri.clone()];
            let mut min_dist: Option<u32> = None;

            for (j, &(node, rec)) in candidates.iter().enumerate().skip(i + 1) {
                if grouped[j] || node == seed_node {
                    continue;
                }
                // The duration guard applies only when both sides report a duration.
                if let (Some(seed_dur), Some(dur)) = (seed.duration_s, rec.duration_s) {
                    if (seed_dur - dur).abs() > self.config.duration_tolerance_s {
                        continue;
                    }
                }
                if let Some(dist) = seed.phash_distance(rec) {
                    if dist <= self.config.phash_max_distance {
                        uris.push(rec.uri.clone());
                        grouped[j] = true;
                        min_dist = Some(min_dist.map_or(dist, |best| best.min(dist)));
                    }
                }
            }

            if uris.len() >= 2 {
                grouped[i] = true;
                groups.push(CrossNodeGroup {
                    uris,
                    method: DuplicateMethod::PerceptualHash,
                    phash_distance: min_dist,
                });
            }
        }

        groups
    }

    /// Runs [`Self::find_cross_node_duplicates`] and tallies the result.
    ///
    /// `total_duplicate_files` is the sum of the group sizes.
    #[must_use]
    pub fn cross_node_summary(&self) -> CrossNodeSummary {
        let groups = self.find_cross_node_duplicates();
        let mut summary = CrossNodeSummary {
            total_groups: groups.len(),
            exact_groups: 0,
            perceptual_groups: 0,
            total_duplicate_files: 0,
        };
        for group in &groups {
            match group.method {
                DuplicateMethod::ExactHash => summary.exact_groups += 1,
                DuplicateMethod::PerceptualHash => summary.perceptual_groups += 1,
            }
            summary.total_duplicate_files += group.uris.len();
        }
        summary
    }
}
/// Aggregate counts produced by `NetworkDedupEngine::cross_node_summary`.
#[derive(Debug, Clone)]
pub struct CrossNodeSummary {
/// Number of duplicate groups of either kind.
pub total_groups: usize,
/// Number of groups detected via [`DuplicateMethod::ExactHash`].
pub exact_groups: usize,
/// Number of groups detected via [`DuplicateMethod::PerceptualHash`].
pub perceptual_groups: usize,
/// Sum of the number of URIs in every group.
pub total_duplicate_files: usize,
}
impl Default for NetworkDedupEngine {
/// Builds an engine with `NetworkDedupConfig::default()` by delegating to
/// `NetworkDedupEngine::new`.
fn default() -> Self {
Self::new(NetworkDedupConfig::default())
}
}
// Unit tests for record helpers, manifest (de)serialisation and the dedup engine.
#[cfg(test)]
mod tests {
use super::*;
// Builds a record with a fixed file size of 1_000_000, which is above the default
// `min_file_size` (65_536), so the size filter never excludes these fixtures.
fn make_record(uri: &str, digest: &str, phash: Option<u64>, dur: Option<f64>) -> FileRecord {
FileRecord::new(
uri.to_string(),
digest.to_string(),
phash,
dur,
Some(1_000_000),
)
}
// Engine with two nodes that each hold one file carrying the same digest and no
// perceptual hash: exactly one exact-hash group is expected from it.
fn two_node_exact() -> NetworkDedupEngine {
let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
let digest = "a".repeat(64);
let mut ma = NodeManifest::new("node-a".to_string());
ma.add_file(make_record("node-a:/movie.mp4", &digest, None, Some(3600.0)));
let mut mb = NodeManifest::new("node-b".to_string());
mb.add_file(make_record(
"node-b:/backup/movie.mp4",
&digest,
None,
Some(3600.0),
));
engine.add_manifest(ma);
engine.add_manifest(mb);
engine
}
// Identical digests on two different nodes form a single exact-hash group.
#[test]
fn test_exact_cross_node_duplicate() {
let engine = two_node_exact();
let groups = engine.find_cross_node_duplicates();
assert_eq!(groups.len(), 1);
assert_eq!(groups[0].method, DuplicateMethod::ExactHash);
assert_eq!(groups[0].uris.len(), 2);
}
// Identical digests that live on the same node are not reported.
#[test]
fn test_no_duplicate_same_node() {
let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
let digest = "b".repeat(64);
let mut ma = NodeManifest::new("node-a".to_string());
ma.add_file(make_record("node-a:/v1.mp4", &digest, None, None));
ma.add_file(make_record("node-a:/v2.mp4", &digest, None, None));
engine.add_manifest(ma);
let groups = engine.find_cross_node_duplicates();
assert!(groups.is_empty(), "same-node duplicates must be excluded");
}
// Different digests but perceptual hashes two bits apart (within the default
// limit of 10) on different nodes form one perceptual group with distance 2.
#[test]
fn test_perceptual_cross_node_match() {
let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
let base: u64 = 0xFF00_FF00_FF00_FF00;
let close: u64 = base ^ 0b11;
let mut ma = NodeManifest::new("node-a".to_string());
ma.add_file(make_record(
"node-a:/clip.mp4",
&"c".repeat(64),
Some(base),
Some(60.0),
));
let mut mb = NodeManifest::new("node-b".to_string());
mb.add_file(make_record(
"node-b:/clip_re.mp4",
&"d".repeat(64),
Some(close),
Some(60.0),
));
engine.add_manifest(ma);
engine.add_manifest(mb);
let groups = engine.find_cross_node_duplicates();
let perceptual: Vec<_> = groups
.iter()
.filter(|g| g.method == DuplicateMethod::PerceptualHash)
.collect();
assert_eq!(perceptual.len(), 1);
assert_eq!(perceptual[0].phash_distance, Some(2));
}
// Hashes one bit apart would match, but the durations differ by 60, which
// exceeds the default tolerance of 5.0, so no perceptual group is produced.
#[test]
fn test_duration_guard_excludes_mismatch() {
let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
let base: u64 = 0xAAAA_AAAA_AAAA_AAAA;
let mut ma = NodeManifest::new("node-a".to_string());
ma.add_file(make_record(
"node-a:/short.mp4",
&"e".repeat(64),
Some(base),
Some(30.0),
));
let mut mb = NodeManifest::new("node-b".to_string());
mb.add_file(make_record(
"node-b:/long.mp4",
&"f".repeat(64),
Some(base ^ 1), Some(90.0),
));
engine.add_manifest(ma);
engine.add_manifest(mb);
let groups = engine.find_cross_node_duplicates();
let perceptual: Vec<_> = groups
.iter()
.filter(|g| g.method == DuplicateMethod::PerceptualHash)
.collect();
assert!(perceptual.is_empty(), "duration guard should exclude this pair");
}
// A manifest survives a JSON round trip with its node id, record count and
// perceptual hash intact.
#[test]
fn test_manifest_serialise_roundtrip() {
let mut m = NodeManifest::new("node-z".to_string());
m.add_file(FileRecord::new(
"node-z:/test.mp4".to_string(),
"0".repeat(64),
Some(12345),
Some(99.9),
Some(1024),
));
let json = m.to_json().expect("serialise should succeed");
let m2 = NodeManifest::from_json(&json).expect("deserialise should succeed");
assert_eq!(m2.node_id, "node-z");
assert_eq!(m2.records.len(), 1);
assert_eq!(m2.records[0].phash, Some(12345));
}
// Digest validation: a wrong-length hex string is rejected, 64 hex digits pass.
#[test]
fn test_file_record_valid_digest() {
// NOTE: despite its name, `good` carries an INVALID digest — eleven copies of a
// six-character chunk give 66 characters, so the length check fails.
let good = FileRecord::new(
"n:/f.mp4".to_string(),
"a1b2c3".repeat(10) + "a1b2c3", None,
None,
None,
);
assert!(!good.has_valid_digest());
let valid = FileRecord::new(
"n:/f.mp4".to_string(),
"0".repeat(64),
None,
None,
None,
);
assert!(valid.has_valid_digest());
}
// 0xFF and 0xFE differ in one bit; a missing hash on either side yields `None`.
#[test]
fn test_phash_distance_calculation() {
let a = FileRecord::new("n:/a.mp4".to_string(), "0".repeat(64), Some(0xFF), None, None);
let b = FileRecord::new("n:/b.mp4".to_string(), "0".repeat(64), Some(0xFE), None, None);
assert_eq!(a.phash_distance(&b), Some(1));
let no_hash = FileRecord::new("n:/c.mp4".to_string(), "0".repeat(64), None, None, None);
assert_eq!(a.phash_distance(&no_hash), None);
}
// The summary of the two-node exact fixture counts one exact group of two files.
#[test]
fn test_cross_node_summary() {
let engine = two_node_exact();
let summary = engine.cross_node_summary();
assert_eq!(summary.total_groups, 1);
assert_eq!(summary.exact_groups, 1);
assert_eq!(summary.perceptual_groups, 0);
assert_eq!(summary.total_duplicate_files, 2);
}
// An engine without manifests reports zero counts and no groups.
#[test]
fn test_empty_engine() {
let engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
assert_eq!(engine.manifest_count(), 0);
assert_eq!(engine.total_records(), 0);
let groups = engine.find_cross_node_duplicates();
assert!(groups.is_empty());
}
// The node name is the part of the URI before the first ':'.
#[test]
fn test_node_name_extraction() {
let rec = FileRecord::new(
"node-alpha:/path/to/file.mp4".to_string(),
"0".repeat(64),
None,
None,
None,
);
assert_eq!(rec.node_name(), Some("node-alpha"));
}
// Three nodes whose hashes are 0, 1 and 2 bits away from `base`. Each digest is the
// node name left-padded with zeros, so digests differ and exact matching never
// fires. The assertion only requires that at least two files end up in
// perceptual groups.
#[test]
fn test_three_node_perceptual_cluster() {
let base: u64 = 0x0F0F_0F0F_0F0F_0F0F;
let mut engine = NetworkDedupEngine::new(NetworkDedupConfig::default());
for (node, delta) in [("n1", 0u64), ("n2", 0b1), ("n3", 0b11)] {
let mut m = NodeManifest::new(node.to_string());
m.add_file(make_record(
&format!("{node}:/clip.mp4"),
&format!("{:0>64}", node),
Some(base ^ delta),
Some(120.0),
));
engine.add_manifest(m);
}
let groups = engine.find_cross_node_duplicates();
let perceptual_total: usize = groups
.iter()
.filter(|g| g.method == DuplicateMethod::PerceptualHash)
.map(|g| g.uris.len())
.sum();
assert!(
perceptual_total >= 2,
"expected at least 2 files in perceptual groups, got {perceptual_total}"
);
}
}