use std::{
collections::HashSet,
fs::File,
io::{BufRead, BufReader, BufWriter, Write},
path::PathBuf,
};
use async_trait::async_trait;
use tokio::sync::Mutex;
use crate::error::KumoError;
use super::Pipeline;
/// Mutable dedup state, guarded by the lock held in [`DropDuplicates`].
struct DropDuplicatesInner {
    /// Every dedup key already let through, including keys loaded from the persistence file.
    seen: std::collections::HashSet<String>,
    /// Append handle for the persistence file; `None` when deduplicating in memory only.
    writer: Option<std::io::BufWriter<std::fs::File>>,
}
pub struct DropDuplicates {
field: String,
inner: Mutex<DropDuplicatesInner>,
}
impl DropDuplicates {
pub fn by_field(field: impl Into<String>) -> Self {
Self {
field: field.into(),
inner: Mutex::new(DropDuplicatesInner {
seen: HashSet::new(),
writer: None,
}),
}
}
pub fn with_persistence(
field: impl Into<String>,
path: impl Into<PathBuf>,
) -> Result<Self, crate::error::KumoError> {
let path = path.into();
if let Some(parent) = path.parent()
&& !parent.as_os_str().is_empty()
{
std::fs::create_dir_all(parent)
.map_err(|e| crate::error::KumoError::store("dedup persistence", e))?;
}
let seen: HashSet<String> = if path.exists() {
let file = File::open(&path)
.map_err(|e| crate::error::KumoError::store("dedup persistence", e))?;
BufReader::new(file)
.lines()
.map_while(|l| l.ok())
.filter(|l| !l.is_empty())
.collect()
} else {
HashSet::new()
};
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.map_err(|e| crate::error::KumoError::store("dedup persistence", e))?;
Ok(Self {
field: field.into(),
inner: Mutex::new(DropDuplicatesInner {
seen,
writer: Some(BufWriter::new(file)),
}),
})
}
}
#[async_trait]
impl Pipeline for DropDuplicates {
    /// Passes `item` through the first time its dedup key is seen and drops it afterwards.
    ///
    /// The key is the value of the top-level `self.field` when that value is a JSON string.
    /// Otherwise the key is the item's full serialized JSON. Dropped items are logged at
    /// debug level under the `kumo::item` target.
    ///
    /// When persistence is enabled, a new key is appended to the file and flushed *before*
    /// it is recorded as seen. If that fails, the key stays unseen and the error is
    /// returned instead of the item. A line buffered before a failed flush may still reach
    /// the file on a later flush.
    ///
    /// # Errors
    ///
    /// Returns a `"dedup persistence"` store error if writing or flushing the key fails.
    async fn process(
        &self,
        item: serde_json::Value,
    ) -> Result<Option<serde_json::Value>, KumoError> {
        let key = item
            .get(&self.field)
            .and_then(serde_json::Value::as_str)
            .map(str::to_owned)
            .unwrap_or_else(|| item.to_string());

        let mut inner = self.inner.lock().await;
        if inner.seen.contains(&key) {
            tracing::debug!(
                target: "kumo::item",
                field = %self.field,
                key = %key,
                reason = "duplicate",
                "item.drop"
            );
            return Ok(None);
        }

        if let Some(writer) = inner.writer.as_mut() {
            // Flush per key. Otherwise keys reach disk only when the buffer fills or on
            // drop, and drop ignores errors. An abrupt exit would then lose recent keys
            // and re-emit those items after a restart.
            writeln!(writer, "{key}")
                .and_then(|()| writer.flush())
                .map_err(|e| KumoError::store("dedup persistence", e))?;
        }
        inner.seen.insert(key);
        Ok(Some(item))
    }
}