coreon-eip 0.1.0

Enterprise Integration Pattern processors for camel-rs.
Documentation
//! IdempotentConsumer — suppress exchanges whose extracted key has been seen.
//!
//! MVP uses an in-memory `HashSet`; a future revision can plug in pluggable
//! repositories (redis, etc.) behind a trait.

use async_trait::async_trait;
use coreon_core::{Exchange, Processor, Result};
use std::{
    collections::HashSet,
    sync::Arc,
};
use tokio::sync::Mutex;

pub type KeyFn = dyn Fn(&Exchange) -> Option<String> + Send + Sync;

pub struct IdempotentConsumer {
    key: Arc<KeyFn>,
    seen: Mutex<HashSet<String>>,
    then: Arc<dyn Processor>,
}

impl IdempotentConsumer {
    pub fn new<F>(key: F, then: Arc<dyn Processor>) -> Arc<Self>
    where
        F: Fn(&Exchange) -> Option<String> + Send + Sync + 'static,
    {
        Arc::new(Self {
            key: Arc::new(key),
            seen: Mutex::new(HashSet::new()),
            then,
        })
    }
}

#[async_trait]
impl Processor for IdempotentConsumer {
    async fn process(&self, exchange: &mut Exchange) -> Result<()> {
        let Some(k) = (self.key)(exchange) else {
            // No key → pass through (matches Camel's skipDuplicate=false default).
            return self.then.process(exchange).await;
        };
        let inserted = {
            let mut set = self.seen.lock().await;
            set.insert(k)
        };
        if inserted {
            self.then.process(exchange).await
        } else {
            exchange
                .properties
                .insert("camel.idempotent.duplicate".into(), "true".into());
            Ok(())
        }
    }
}