use async_trait::async_trait;
use coreon_core::{Exchange, Processor, Result};
use std::{
collections::HashSet,
sync::Arc,
};
use tokio::sync::Mutex;
pub type KeyFn = dyn Fn(&Exchange) -> Option<String> + Send + Sync;
pub struct IdempotentConsumer {
key: Arc<KeyFn>,
seen: Mutex<HashSet<String>>,
then: Arc<dyn Processor>,
}
impl IdempotentConsumer {
pub fn new<F>(key: F, then: Arc<dyn Processor>) -> Arc<Self>
where
F: Fn(&Exchange) -> Option<String> + Send + Sync + 'static,
{
Arc::new(Self {
key: Arc::new(key),
seen: Mutex::new(HashSet::new()),
then,
})
}
}
#[async_trait]
impl Processor for IdempotentConsumer {
async fn process(&self, exchange: &mut Exchange) -> Result<()> {
let Some(k) = (self.key)(exchange) else {
return self.then.process(exchange).await;
};
let inserted = {
let mut set = self.seen.lock().await;
set.insert(k)
};
if inserted {
self.then.process(exchange).await
} else {
exchange
.properties
.insert("camel.idempotent.duplicate".into(), "true".into());
Ok(())
}
}
}