use std::collections::{BTreeMap, HashSet};
use std::fmt::Write as _;
use std::fs;
use std::path::{Path, PathBuf};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use sha2::{Digest, Sha256};
/// Handle to a single persisted "seen" set.
///
/// The handle stores nothing but the location of the backing JSON file; the
/// file's contents are re-read on every operation, so cloning a handle is
/// cheap and clones address the same file.
#[derive(Clone, Debug)]
pub struct DedupStore {
    /// Path of the JSON file that holds the persisted hashes.
    path: PathBuf,
}
/// On-disk representation of a store: every hash seen so far, with the time it
/// was recorded.
#[derive(Default, Debug, Deserialize, Serialize)]
struct SeenSet {
    /// Hex content hash -> RFC 3339 timestamp string.
    ///
    /// `serde(default)` lets a document without a `hashes` key deserialize as
    /// an empty set instead of failing.
    #[serde(default)]
    hashes: BTreeMap<String, String>,
}
/// Drops every entry whose timestamp is strictly older than `cutoff` and
/// returns how many entries were dropped.
///
/// Entries whose timestamp does not parse as RFC 3339 are never dropped: an
/// unreadable timestamp is treated as "keep" rather than "expired".
fn prune_in_place(set: &mut SeenSet, cutoff: chrono::DateTime<chrono::Utc>) -> usize {
    let mut dropped = 0usize;
    set.hashes.retain(|_hash, stamp| {
        let expired = chrono::DateTime::parse_from_rfc3339(stamp)
            .map(|seen| seen.with_timezone(&chrono::Utc) < cutoff)
            .unwrap_or(false);
        if expired {
            dropped += 1;
        }
        !expired
    });
    dropped
}
impl DedupStore {
    /// Opens (or prepares) the store named `store` inside `dir`.
    ///
    /// `dir` is created if it does not exist. The backing file is
    /// `<dir>/<sanitized store>.json`; it is not created or read here.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from creating `dir`.
    pub fn open(dir: impl AsRef<Path>, store: &str) -> std::io::Result<Self> {
        let dir = dir.as_ref();
        fs::create_dir_all(dir)?;
        Ok(Self {
            path: dir.join(format!("{}.json", sanitize(store))),
        })
    }

    /// Loads the persisted set; a missing file is treated as an empty set.
    ///
    /// # Errors
    ///
    /// Malformed JSON is reported as `InvalidData`; any other read failure is
    /// returned unchanged.
    fn read(&self) -> std::io::Result<SeenSet> {
        match fs::read_to_string(&self.path) {
            Ok(json) => serde_json::from_str(&json)
                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(SeenSet::default()),
            Err(e) => Err(e),
        }
    }

    /// Persists `set` by writing a uniquely named temp file next to the target
    /// and renaming it over the target.
    ///
    /// # Errors
    ///
    /// Returns the serialization, write, or rename error. On a write/rename
    /// failure the temp file is removed (best effort) so failed attempts do
    /// not accumulate `.dedup-*.tmp` files in the store directory.
    fn write(&self, set: &SeenSet) -> std::io::Result<()> {
        let json = serde_json::to_string_pretty(set)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        let parent = self.path.parent().unwrap_or_else(|| Path::new("."));
        let tmp = parent.join(format!(".dedup-{}.tmp", uuid::Uuid::new_v4().simple()));
        let outcome = fs::write(&tmp, json).and_then(|()| fs::rename(&tmp, &self.path));
        if outcome.is_err() {
            // Cleanup failure is ignored on purpose: the caller needs the
            // original error, not a secondary one.
            let _ = fs::remove_file(&tmp);
        }
        outcome
    }

    /// Opens (creating if needed) the sidecar file `<path>.lock` and locks it.
    ///
    /// The returned handle must be kept alive for as long as the lock is
    /// needed; callers bind it to a local that lives until they return.
    fn lock(&self) -> std::io::Result<fs::File> {
        let mut p = self.path.as_os_str().to_owned();
        p.push(".lock");
        let f = fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(PathBuf::from(p))?;
        f.lock()?;
        Ok(f)
    }

    /// Returns clones of the items in `items` that have not been seen before,
    /// in input order, and records them as seen at `now`.
    ///
    /// * `hash_fields` - passed to [`content_hash`]; selects which fields
    ///   identify an item (empty means the whole item).
    /// * `ttl` - when `Some`, entries recorded before `now - ttl` are evicted
    ///   first, so items older than the TTL are reported as unseen again.
    ///
    /// Repeats within `items` are reported once. The store file is rewritten
    /// only when something was added or evicted.
    ///
    /// # Errors
    ///
    /// Returns any error from locking, reading, or writing the store.
    pub fn filter_unseen(
        &self,
        items: &[Value],
        hash_fields: &[String],
        now: chrono::DateTime<chrono::Utc>,
        ttl: Option<chrono::Duration>,
    ) -> std::io::Result<Vec<Value>> {
        let _lock = self.lock()?;
        let mut set = self.read()?;
        let evicted = ttl.map_or(0, |ttl| {
            // `now - ttl` panics when the result is not representable (e.g. a
            // "never expire" TTL of `Duration::MAX`). Clamp instead: a cutoff
            // below the representable range expires nothing, one above it
            // expires everything with a parseable timestamp.
            let cutoff = now
                .checked_sub_signed(ttl)
                .unwrap_or(if ttl < chrono::Duration::zero() {
                    chrono::DateTime::<chrono::Utc>::MAX_UTC
                } else {
                    chrono::DateTime::<chrono::Utc>::MIN_UTC
                });
            prune_in_place(&mut set, cutoff)
        });
        let now_str = now.to_rfc3339();
        let mut unseen = Vec::new();
        // NOTE(review): new hashes are inserted into `set.hashes` immediately,
        // so the persisted map already catches in-batch repeats; `batch` is
        // kept as an explicit second guard.
        let mut batch: HashSet<String> = HashSet::new();
        for item in items {
            let h = content_hash(item, hash_fields);
            if set.hashes.contains_key(&h) || !batch.insert(h.clone()) {
                continue;
            }
            set.hashes.insert(h, now_str.clone());
            unseen.push(item.clone());
        }
        if !unseen.is_empty() || evicted > 0 {
            self.write(&set)?;
        }
        Ok(unseen)
    }

    /// Removes every entry recorded strictly before `cutoff` and returns how
    /// many were removed. The store file is rewritten only if something was
    /// removed.
    ///
    /// # Errors
    ///
    /// Returns any error from locking, reading, or writing the store.
    pub fn prune_older_than(
        &self,
        cutoff: chrono::DateTime<chrono::Utc>,
    ) -> std::io::Result<usize> {
        let _lock = self.lock()?;
        let mut set = self.read()?;
        let removed = prune_in_place(&mut set, cutoff);
        if removed > 0 {
            self.write(&set)?;
        }
        Ok(removed)
    }
}
/// Computes the lowercase hex SHA-256 of an item's canonical JSON form.
///
/// With an empty `hash_fields` the whole `item` is hashed. Otherwise only the
/// listed fields are hashed, packed into an object; a field that `item` does
/// not have contributes `null`.
pub fn content_hash(item: &Value, hash_fields: &[String]) -> String {
    let canonical = if hash_fields.is_empty() {
        canonicalize(item)
    } else {
        let projected: serde_json::Map<String, Value> = hash_fields
            .iter()
            .map(|field| {
                let value = item.get(field).cloned().unwrap_or(Value::Null);
                (field.clone(), value)
            })
            .collect();
        canonicalize(&Value::Object(projected))
    };
    to_hex(&Sha256::digest(canonical.as_bytes()))
}
/// Renders `v` as compact JSON text with object keys in sorted order, so two
/// values that differ only in key order produce the same string.
fn canonicalize(v: &Value) -> String {
    /// Appends the canonical text of `node` to `out`.
    fn emit(node: &Value, out: &mut String) {
        match node {
            Value::Array(elems) => {
                out.push('[');
                for (idx, elem) in elems.iter().enumerate() {
                    if idx > 0 {
                        out.push(',');
                    }
                    emit(elem, out);
                }
                out.push(']');
            }
            Value::Object(fields) => {
                let mut pairs: Vec<(&String, &Value)> = fields.iter().collect();
                pairs.sort_by(|(left, _), (right, _)| left.cmp(right));
                out.push('{');
                for (idx, (key, value)) in pairs.into_iter().enumerate() {
                    if idx > 0 {
                        out.push(',');
                    }
                    // Keys go through the JSON serializer so they are quoted
                    // and escaped.
                    out.push_str(&serde_json::to_string(key).unwrap_or_default());
                    out.push(':');
                    emit(value, out);
                }
                out.push('}');
            }
            scalar => out.push_str(&scalar.to_string()),
        }
    }

    let mut rendered = String::new();
    emit(v, &mut rendered);
    rendered
}
/// Encodes `bytes` as lowercase hexadecimal, two digits per byte.
fn to_hex(bytes: &[u8]) -> String {
    bytes
        .iter()
        .fold(String::with_capacity(bytes.len() * 2), |mut hex, byte| {
            // Formatting into a `String` cannot fail, so the result is ignored.
            let _ = write!(hex, "{byte:02x}");
            hex
        })
}
/// Turns a store name into a safe file stem.
///
/// ASCII letters, digits, `-` and `_` are kept; every other character
/// (including path separators and dots) becomes `_`. An empty name yields
/// `"default"`.
fn sanitize(store: &str) -> String {
    // Each input character maps to exactly one output character, so the result
    // is empty only when the input is.
    if store.is_empty() {
        return "default".to_string();
    }
    let mut stem = String::with_capacity(store.len());
    for ch in store.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        stem.push(if keep { ch } else { '_' });
    }
    stem
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{Duration, TimeZone, Utc};
    use serde_json::json;

    /// Returns a unique, not-yet-created directory path under the system temp
    /// dir; `tag` makes leftovers attributable to a test.
    fn tmp_dir(tag: &str) -> PathBuf {
        std::env::temp_dir().join(format!(
            "car-dedup-{tag}-{}",
            &uuid::Uuid::new_v4().simple().to_string()[..8]
        ))
    }

    /// UTC instant `secs` seconds after the Unix epoch.
    fn at(secs: i64) -> chrono::DateTime<Utc> {
        Utc.timestamp_opt(secs, 0).unwrap()
    }

    #[test]
    fn first_run_passes_all_then_skips_seen() {
        let dir = tmp_dir("basic");
        let store = DedupStore::open(&dir, "src").unwrap();
        let items = vec![json!({"id": 1}), json!({"id": 2})];
        let first = store.filter_unseen(&items, &[], at(0), None).unwrap();
        assert_eq!(first.len(), 2, "all new on first run");
        // A second handle proves the state is read back from disk.
        let store2 = DedupStore::open(&dir, "src").unwrap();
        let second = store2.filter_unseen(&items, &[], at(1), None).unwrap();
        assert!(second.is_empty(), "nothing new on second run");
        let mut more = items.clone();
        more.push(json!({"id": 3}));
        let third = store2.filter_unseen(&more, &[], at(2), None).unwrap();
        assert_eq!(third, vec![json!({"id": 3})]);
        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn within_batch_duplicates_collapse() {
        let dir = tmp_dir("batch");
        let store = DedupStore::open(&dir, "s").unwrap();
        let items = vec![json!({"id": 1}), json!({"id": 1}), json!({"id": 2})];
        let unseen = store.filter_unseen(&items, &[], at(0), None).unwrap();
        assert_eq!(unseen, vec![json!({"id": 1}), json!({"id": 2})]);
        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn hash_fields_key_on_identity_not_full_content() {
        let dir = tmp_dir("fields");
        let store = DedupStore::open(&dir, "s").unwrap();
        let v1 = vec![json!({"id": 7, "title": "old"})];
        assert_eq!(store.filter_unseen(&v1, &["id".into()], at(0), None).unwrap().len(), 1);
        // Same `id`, different `title`: still counts as already seen.
        let v2 = vec![json!({"id": 7, "title": "new"})];
        assert!(store.filter_unseen(&v2, &["id".into()], at(1), None).unwrap().is_empty());
        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn key_order_does_not_affect_hash() {
        assert_eq!(
            content_hash(&json!({"a": 1, "b": 2}), &[]),
            content_hash(&json!({"b": 2, "a": 1}), &[]),
        );
    }

    #[test]
    fn no_write_when_nothing_new() {
        let dir = tmp_dir("nowrite");
        let store = DedupStore::open(&dir, "s").unwrap();
        assert!(store.filter_unseen(&[], &[], at(0), None).unwrap().is_empty());
        assert!(!dir.join("s.json").exists(), "no write on empty input");
        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn namespace_is_sanitized() {
        let dir = tmp_dir("sanitize");
        let store = DedupStore::open(&dir, "../escape/../x").unwrap();
        store.filter_unseen(&[json!(1)], &[], at(0), None).unwrap();
        // Listing `dir` can only ever yield paths under `dir`, so checking that
        // proves nothing. Assert instead that the data file landed at the
        // sanitized name directly inside `dir` ...
        assert!(
            dir.join("___escape____x.json").is_file(),
            "store file must be created under the sanitized name inside dir"
        );
        // ... and that the traversal components did not create any directory.
        let entries: Vec<_> = fs::read_dir(&dir).unwrap().filter_map(|e| e.ok()).collect();
        assert!(
            entries
                .iter()
                .all(|e| e.file_type().map(|t| t.is_file()).unwrap_or(false)),
            "store name must not create subdirectories"
        );
        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn ttl_evicts_expired_and_allows_reprocess() {
        let dir = tmp_dir("ttl");
        let store = DedupStore::open(&dir, "s").unwrap();
        let ttl = Duration::seconds(3600);
        let items = vec![json!({"id": 1})];
        assert_eq!(store.filter_unseen(&items, &[], at(1_000_000), Some(ttl)).unwrap().len(), 1);
        // 30 minutes later: still within the one-hour TTL.
        assert!(store
            .filter_unseen(&items, &[], at(1_001_800), Some(ttl))
            .unwrap()
            .is_empty());
        // 2 hours after the first sighting: evicted, so reported again.
        assert_eq!(
            store.filter_unseen(&items, &[], at(1_007_200), Some(ttl)).unwrap().len(),
            1
        );
        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn prune_older_than_removes_only_expired() {
        let dir = tmp_dir("prune");
        let store = DedupStore::open(&dir, "s").unwrap();
        store.filter_unseen(&[json!({"id": 1})], &[], at(1_000_000), None).unwrap();
        store.filter_unseen(&[json!({"id": 2})], &[], at(2_000_000), None).unwrap();
        assert_eq!(store.prune_older_than(at(1_500_000)).unwrap(), 1);
        assert!(store
            .filter_unseen(&[json!({"id": 2})], &[], at(2_000_001), None)
            .unwrap()
            .is_empty());
        assert_eq!(
            store.filter_unseen(&[json!({"id": 1})], &[], at(2_000_002), None).unwrap().len(),
            1
        );
        let _ = fs::remove_dir_all(&dir);
    }

    #[test]
    fn prune_keeps_unparseable_timestamps() {
        let mut set = SeenSet::default();
        set.hashes.insert("good".into(), at(0).to_rfc3339());
        set.hashes.insert("weird".into(), "not-a-timestamp".into());
        let removed = prune_in_place(&mut set, at(1_000_000));
        assert_eq!(removed, 1);
        assert!(set.hashes.contains_key("weird"));
        assert!(!set.hashes.contains_key("good"));
    }
}