use crate::Error;
use async_trait::async_trait;
#[async_trait]
/// Tracks event ids that have been marked as processed.
///
/// The `Send + Sync` supertraits allow implementors to be shared across threads/tasks.
pub trait ProcessedEventLog: Send + Sync {
/// Tries to mark `event_id` as processed.
///
/// Judging by the in-memory implementation and the tests in this file, `Ok(true)` means the
/// id was newly recorded by this call and `Ok(false)` means it had already been recorded.
///
/// # Errors
///
/// Error conditions are implementation-defined; presumably they cover a failing backing
/// store (TODO confirm against the other implementors).
async fn try_mark_processed(&self, event_id: &str) -> Result<bool, Error>;
}
/// In-memory log of processed event ids, stored in a `dashmap::DashMap`.
///
/// Nothing in this file removes entries, so the map keeps every distinct id it has been given.
pub struct MemoryProcessedLog {
// Used as a set: only the presence of the key matters, hence the `()` value.
seen: dashmap::DashMap<String, ()>,
}
impl MemoryProcessedLog {
    /// Builds a log that has not recorded any event id yet.
    pub fn new() -> Self {
        let seen = dashmap::DashMap::new();
        Self { seen }
    }
}
impl Default for MemoryProcessedLog {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl ProcessedEventLog for MemoryProcessedLog {
    /// Records `event_id` in the map and reports whether it was absent beforehand.
    ///
    /// Returns `Ok(true)` when the id was not yet present and `Ok(false)` when it was.
    /// This implementation never returns `Err`.
    async fn try_mark_processed(&self, event_id: &str) -> Result<bool, Error> {
        // `insert` hands back the value previously stored under the key, if any;
        // `None` therefore means this call is the one that recorded the id.
        let previous = self.seen.insert(event_id.to_owned(), ());
        Ok(previous.is_none())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh id yields `true`, repeating it yields `false`, and an unrelated id is unaffected.
    #[tokio::test]
    async fn memory_log_true_then_false() {
        let processed = MemoryProcessedLog::new();

        let first = processed.try_mark_processed("evt_001").await.unwrap();
        assert!(first, "first call with a new id must return Ok(true)");

        let repeated = processed.try_mark_processed("evt_001").await.unwrap();
        assert!(
            !repeated,
            "second call with the same id must return Ok(false)"
        );

        let other = processed.try_mark_processed("evt_002").await.unwrap();
        assert!(
            other,
            "different id must return Ok(true) even after an earlier id was seen"
        );
    }

    /// Two spawned tasks mark the same id; exactly one of them must observe `true`.
    ///
    /// NOTE(review): a plain `#[tokio::test]` uses the current-thread runtime, so the two
    /// tasks are scheduled on a single thread rather than in parallel. Consider the
    /// `multi_thread` flavor if real parallelism is intended and the tokio feature is enabled.
    #[tokio::test]
    async fn memory_log_concurrent_insert_applies_once() {
        use std::sync::Arc;

        let shared = Arc::new(MemoryProcessedLog::new());

        let first_task = tokio::spawn({
            let handle = Arc::clone(&shared);
            async move { handle.try_mark_processed("evt_race_001").await }
        });
        let second_task =
            tokio::spawn(async move { shared.try_mark_processed("evt_race_001").await });

        let (first_join, second_join) = tokio::join!(first_task, second_task);
        // Outer unwrap: the task did not panic; inner unwrap: the call returned `Ok`.
        let v1 = first_join.unwrap().unwrap();
        let v2 = second_join.unwrap().unwrap();

        assert_ne!(
            v1,
            v2,
            "concurrent inserts must apply exactly once: one Ok(true), one Ok(false), got ({v1}, {v2})"
        );
    }
}