Skip to main content

camel_processor/
idempotent_consumer.rs

1//! Idempotent Consumer EIP — outcome-aware Segment implementation.
2//!
3//! Implements the Idempotent Consumer pattern at the `OutcomePipeline` layer
4//! (one layer above Tower), so that `PipelineOutcome::Stopped` from the child
5//! sub-pipeline propagates with the Exchange intact (ADR-0024, ADR-0025).
6//!
7//! # Why Segment-mode (NOT Process-mode)
8//!
9//! `compose_pipeline()` translates `PipelineOutcome::Stopped → Ok(ex)` for
10//! Tower `Service<Exchange>` consumers. If this EIP were a Tower Service,
11//! a duplicate-detected `Stopped` would become `Ok(ex)` and downstream steps
12//! would re-process the duplicate. By implementing `OutcomePipeline` directly,
13//! the segment returns `PipelineOutcome::Completed(ex)` for duplicates and
14//! skips the child sub-pipeline entirely.
15//!
16//! # Contract C1
17//!
18//! `IdempotentRepository::contains` returns `Result<bool, CamelError>` because
19//! backends (Redis, JDBC) can have transient read failures. The segment
20//! propagates `Err` as `PipelineOutcome::Failed` — it NEVER treats a failed
21//! read as "not a duplicate."
22
23use std::future::Future;
24use std::pin::Pin;
25use std::sync::Arc;
26
27use camel_api::{Exchange, IdempotentRepository, OutcomePipeline, OutcomeSegment, PipelineOutcome};
28
/// Synchronous closure that extracts the message-id key from an Exchange.
///
/// Returns `None` when no key can be extracted. In that case
/// `IdempotentConsumerSegment::run` forwards the exchange to the child
/// sub-pipeline without consulting the repository, so no duplicate detection
/// happens for that exchange.
///
/// NOTE(review): Apache Camel's own `IdempotentConsumer` appears to fail the
/// exchange with `NoMessageIdException` when the id expression evaluates to
/// null, so forwarding here looks like a deliberate divergence from Camel
/// rather than a match — confirm the intended semantics.
///
/// The closure is shared behind an `Arc` and must be `Send + Sync` so that
/// cloned segments can use it from any thread.
pub type MessageIdExpression = Arc<dyn Fn(&Exchange) -> Option<String> + Send + Sync>;
35
/// Outcome-aware Idempotent Consumer segment.
///
/// Wraps a child sub-pipeline (`OutcomeSegment`) and a named
/// `IdempotentRepository`. On each exchange:
///
/// 1. Extract message-id via `message_id(&exchange)`. `None` → forward to child.
/// 2. Check `repo.contains(&key)`:
///    - `Ok(true)` → duplicate, return `Completed(exchange)` (skip child).
///    - `Ok(false)` → proceed.
///    - `Err(e)` → `Failed(e)` (contract C1).
/// 3. If `eager`: `repo.add(&key)` before child. `Ok(false)` = race duplicate
///    → `Completed`. `Err` → `Failed`.
/// 4. Run child pipeline.
/// 5. Post-child:
///    - `Completed` + non-eager → `repo.add(&key)` (best-effort; an error is
///      only logged and the child's outcome is still returned).
///    - `Stopped` → no add (pipeline terminating).
///    - `Failed` + `eager` + `remove_on_failure` → `repo.remove(&key)`
///      (best-effort; an error is only logged).
///
/// # Eager mode + Stopped
///
/// In eager mode, the key is added BEFORE the child pipeline runs. If the child
/// returns `Stopped`, the key remains registered — subsequent deliveries of the
/// same message ID are treated as duplicates and return `Completed` immediately.
/// This is intentional: a Stop indicates the route chose to terminate, and
/// re-processing the same message would be incorrect.
pub struct IdempotentConsumerSegment {
    /// Store of already-seen message ids, shared between clones of the segment.
    repository: Arc<dyn IdempotentRepository>,
    /// Extracts the idempotency key from an exchange; `None` bypasses the check.
    message_id: MessageIdExpression,
    /// Sub-pipeline executed for exchanges that are not detected as duplicates.
    child_pipeline: OutcomeSegment,
    /// `true`: register the key before the child runs; `false`: register it
    /// only after the child returns `Completed`.
    eager: bool,
    /// In eager mode, remove the key again when the child returns `Failed`.
    /// Has no effect when `eager` is `false`.
    remove_on_failure: bool,
}
68
impl IdempotentConsumerSegment {
    /// Build a new segment from a resolved repository, message-id extractor,
    /// child sub-pipeline, and behaviour flags.
    ///
    /// * `repository` – store queried with `contains` and updated with
    ///   `add` / `remove` while the segment runs.
    /// * `message_id` – extracts the idempotency key; when it yields `None`
    ///   the exchange goes straight to the child.
    /// * `child_pipeline` – sub-pipeline run for non-duplicate exchanges.
    /// * `eager` – register the key before (`true`) or after (`false`) the
    ///   child runs.
    /// * `remove_on_failure` – in eager mode, remove the key again when the
    ///   child returns `Failed`.
    ///
    /// The constructor only stores its arguments; it performs no repository
    /// access.
    pub fn new(
        repository: Arc<dyn IdempotentRepository>,
        message_id: MessageIdExpression,
        child_pipeline: OutcomeSegment,
        eager: bool,
        remove_on_failure: bool,
    ) -> Self {
        Self {
            repository,
            message_id,
            child_pipeline,
            eager,
            remove_on_failure,
        }
    }
}
88
89impl Clone for IdempotentConsumerSegment {
90    fn clone(&self) -> Self {
91        Self {
92            repository: Arc::clone(&self.repository),
93            message_id: Arc::clone(&self.message_id),
94            child_pipeline: self.child_pipeline.clone(),
95            eager: self.eager,
96            remove_on_failure: self.remove_on_failure,
97        }
98    }
99}
100
101impl OutcomePipeline for IdempotentConsumerSegment {
102    fn clone_box(&self) -> Box<dyn OutcomePipeline> {
103        Box::new(self.clone())
104    }
105
106    fn run<'a>(
107        &'a mut self,
108        exchange: Exchange,
109    ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
110        Box::pin(async move {
111            // 1. Extract message-id. None → no idempotency possible, forward to child.
112            let key = match (self.message_id)(&exchange) {
113                Some(k) => k,
114                None => return self.child_pipeline.run(exchange).await,
115            };
116
117            // 2. Check repository (contract C1: propagate Err, never treat as non-duplicate).
118            match self.repository.contains(&key).await {
119                Ok(true) => return PipelineOutcome::Completed(exchange),
120                Ok(false) => {}
121                Err(e) => return PipelineOutcome::Failed(e),
122            }
123
124            // 3. Eager mode: reserve the key before running the child.
125            if self.eager {
126                match self.repository.add(&key).await {
127                    Ok(true) => {}                                            // reserved, proceed
128                    Ok(false) => return PipelineOutcome::Completed(exchange), // race duplicate
129                    Err(e) => return PipelineOutcome::Failed(e),
130                }
131            }
132
133            // 4. Run child sub-pipeline.
134            let outcome = self.child_pipeline.run(exchange).await;
135
136            // 5. Post-child bookkeeping (best-effort; log transient errors).
137            match &outcome {
138                PipelineOutcome::Completed(_) if !self.eager => {
139                    if let Err(e) = self.repository.add(&key).await {
140                        tracing::warn!(
141                            error = %e,
142                            repository = %self.repository.name(),
143                            key = %key,
144                            "idempotent repository add failed post-success; next delivery may re-process"
145                        );
146                    }
147                }
148                PipelineOutcome::Failed(_) if self.eager && self.remove_on_failure => {
149                    if let Err(e) = self.repository.remove(&key).await {
150                        tracing::warn!(
151                            error = %e,
152                            repository = %self.repository.name(),
153                            key = %key,
154                            "idempotent repository remove failed on failure rollback"
155                        );
156                    }
157                }
158                _ => {}
159            }
160
161            outcome
162        })
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169    use async_trait::async_trait;
170    use camel_api::{CamelError, Exchange, Message, Value};
171    use std::collections::HashSet;
172    use tokio::sync::Mutex;
173
174    // ── Test helpers ──
175
    /// In-memory mock repository backed by a `Mutex<HashSet>`. Allows tests
    /// to pre-populate keys and inspect state after the segment runs.
    #[derive(Debug, Default)]
    struct MockRepo {
        /// Keys currently registered. The async `Mutex` lets the `&self`
        /// trait methods mutate the set.
        keys: Mutex<HashSet<String>>,
        /// When `true`, `contains` returns a synthetic error instead of
        /// consulting `keys`.
        fail_contains: bool,
    }
183
184    impl MockRepo {
185        fn new() -> Self {
186            Self::default()
187        }
188
189        fn failing_contains() -> Self {
190            Self {
191                keys: Mutex::new(HashSet::new()),
192                fail_contains: true,
193            }
194        }
195
196        async fn pre_add(&self, key: &str) {
197            self.keys.lock().await.insert(key.to_string());
198        }
199
200        async fn contains_key(&self, key: &str) -> bool {
201            self.keys.lock().await.contains(key)
202        }
203    }
204
205    #[async_trait]
206    impl IdempotentRepository for MockRepo {
207        fn name(&self) -> &str {
208            "mock"
209        }
210
211        async fn contains(&self, key: &str) -> Result<bool, CamelError> {
212            if self.fail_contains {
213                return Err(CamelError::ProcessorError(
214                    "synthetic contains failure".into(),
215                ));
216            }
217            Ok(self.keys.lock().await.contains(key))
218        }
219
220        async fn add(&self, key: &str) -> Result<bool, CamelError> {
221            let mut guard = self.keys.lock().await;
222            Ok(guard.insert(key.to_string()))
223        }
224
225        async fn remove(&self, key: &str) -> Result<(), CamelError> {
226            self.keys.lock().await.remove(key);
227            Ok(())
228        }
229
230        async fn clear(&self) -> Result<(), CamelError> {
231            self.keys.lock().await.clear();
232            Ok(())
233        }
234    }
235
    /// Test segment that returns a configurable outcome and records whether
    /// it was invoked. Used to verify the segment's child-dispatch behaviour.
    struct ScriptedChild {
        /// Outcome variant handed back by the first `run`; `run` swaps it for
        /// a `Completed` placeholder, so later calls return `Completed`.
        outcome: PipelineOutcome,
        /// Set to `true` as soon as `run` is called; shared with the test body.
        invoked: Arc<std::sync::atomic::AtomicBool>,
    }
242
243    impl OutcomePipeline for ScriptedChild {
244        fn clone_box(&self) -> Box<dyn OutcomePipeline> {
245            // clone_box is required by the trait but unused by these tests
246            // (each test constructs a fresh segment).
247            unreachable!("clone_box not used in idempotent_consumer tests")
248        }
249
250        fn run<'a>(
251            &'a mut self,
252            exchange: Exchange,
253        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
254            self.invoked
255                .store(true, std::sync::atomic::Ordering::SeqCst);
256            let outcome = std::mem::replace(
257                &mut self.outcome,
258                PipelineOutcome::Completed(Exchange::new(Message::new(""))),
259            );
260            Box::pin(async move { outcome_with_exchange(outcome, exchange) })
261        }
262    }
263
264    /// Replace the placeholder exchange inside a scripted outcome with the
265    /// actual exchange passed to `run`. Keeps the variant, swaps the payload.
266    fn outcome_with_exchange(outcome: PipelineOutcome, exchange: Exchange) -> PipelineOutcome {
267        match outcome {
268            PipelineOutcome::Completed(_) => PipelineOutcome::Completed(exchange),
269            PipelineOutcome::Stopped(_) => PipelineOutcome::Stopped(exchange),
270            PipelineOutcome::Failed(e) => PipelineOutcome::Failed(e),
271        }
272    }
273
274    fn exchange_with_id(id: &str) -> Exchange {
275        let mut ex = Exchange::new(Message::new("payload"));
276        ex.input.set_header("messageId", Value::String(id.into()));
277        ex
278    }
279
280    fn header_message_id() -> MessageIdExpression {
281        Arc::new(|ex: &Exchange| {
282            ex.input
283                .header("messageId")
284                .and_then(|v| v.as_str().map(|s| s.to_string()))
285        })
286    }
287
288    fn build_segment(
289        repo: Arc<MockRepo>,
290        child_outcome: PipelineOutcome,
291        eager: bool,
292        remove_on_failure: bool,
293    ) -> (
294        IdempotentConsumerSegment,
295        Arc<std::sync::atomic::AtomicBool>,
296    ) {
297        let invoked = Arc::new(std::sync::atomic::AtomicBool::new(false));
298        let child = ScriptedChild {
299            outcome: child_outcome,
300            invoked: invoked.clone(),
301        };
302        let segment = IdempotentConsumerSegment::new(
303            repo,
304            header_message_id(),
305            OutcomeSegment::new(Box::new(child)),
306            eager,
307            remove_on_failure,
308        );
309        (segment, invoked)
310    }
311
312    // ── Test 1: duplicate key short-circuits to Completed without touching child ──
313    #[tokio::test]
314    async fn duplicate_key_returns_completed_without_running_child() {
315        let repo = Arc::new(MockRepo::new());
316        repo.pre_add("dup-1").await;
317        let (mut segment, child_invoked) = build_segment(
318            repo.clone(),
319            PipelineOutcome::Failed(stub_error()),
320            false,
321            false,
322        );
323
324        let ex = exchange_with_id("dup-1");
325        let outcome = segment.run(ex).await;
326
327        assert!(matches!(outcome, PipelineOutcome::Completed(_)));
328        assert!(
329            !child_invoked.load(std::sync::atomic::Ordering::SeqCst),
330            "child must NOT run when key is a duplicate"
331        );
332        // key still present
333        assert!(repo.contains_key("dup-1").await);
334    }
335
336    // ── Test 2: new key runs the child and returns its outcome ──
337    #[tokio::test]
338    async fn new_key_runs_child_and_returns_child_outcome() {
339        let repo = Arc::new(MockRepo::new());
340        let (mut segment, child_invoked) = build_segment(
341            repo.clone(),
342            PipelineOutcome::Completed(Exchange::new(Message::new(""))),
343            false,
344            false,
345        );
346
347        let ex = exchange_with_id("new-1");
348        let outcome = segment.run(ex).await;
349
350        assert!(matches!(outcome, PipelineOutcome::Completed(_)));
351        assert!(
352            child_invoked.load(std::sync::atomic::Ordering::SeqCst),
353            "child MUST run when key is new"
354        );
355        // Non-eager mode: key added after child Completed.
356        assert!(
357            repo.contains_key("new-1").await,
358            "non-eager mode must add key after successful child run"
359        );
360    }
361
362    // ── Test 3: failed repo read propagates as Failed (contract C1) ──
363    #[tokio::test]
364    async fn failed_repo_read_propagates_error() {
365        let repo = Arc::new(MockRepo::failing_contains());
366        let (mut segment, child_invoked) = build_segment(
367            repo,
368            PipelineOutcome::Completed(stub_exchange()),
369            false,
370            false,
371        );
372
373        let ex = exchange_with_id("any");
374        let outcome = segment.run(ex).await;
375
376        match outcome {
377            PipelineOutcome::Failed(e) => {
378                let msg = e.to_string();
379                assert!(
380                    msg.contains("synthetic contains failure"),
381                    "expected synthetic failure in error, got: {msg}"
382                );
383            }
384            other => panic!("expected Failed, got {other:?}"),
385        }
386        assert!(
387            !child_invoked.load(std::sync::atomic::Ordering::SeqCst),
388            "child must NOT run when repo read fails"
389        );
390    }
391
392    // ── Test 4: Stopped from child propagates as Stopped (segment-mode contract) ──
393    #[tokio::test]
394    async fn stopped_child_propagates_stopped() {
395        let repo = Arc::new(MockRepo::new());
396        let (mut segment, child_invoked) = build_segment(
397            repo.clone(),
398            PipelineOutcome::Stopped(stub_exchange()),
399            false,
400            false,
401        );
402
403        let ex = exchange_with_id("stop-1");
404        let outcome = segment.run(ex).await;
405
406        assert!(
407            matches!(outcome, PipelineOutcome::Stopped(_)),
408            "Stopped from child MUST propagate as Stopped (segment-mode contract)"
409        );
410        assert!(
411            child_invoked.load(std::sync::atomic::Ordering::SeqCst),
412            "child must run before its Stopped can propagate"
413        );
414        // Stopped path must NOT register the key (pipeline terminating).
415        assert!(
416            !repo.contains_key("stop-1").await,
417            "Stopped outcome must not register the key"
418        );
419    }
420
421    // ── Test 5: eager mode + remove_on_failure rolls back key on Failed ──
422    #[tokio::test]
423    async fn eager_mode_removes_key_on_failure_when_configured() {
424        let repo = Arc::new(MockRepo::new());
425        let (mut segment, _child_invoked) = build_segment(
426            repo.clone(),
427            PipelineOutcome::Failed(stub_error()),
428            true, // eager
429            true, // remove_on_failure
430        );
431
432        let ex = exchange_with_id("eager-fail");
433        let outcome = segment.run(ex).await;
434
435        assert!(matches!(outcome, PipelineOutcome::Failed(_)));
436        assert!(
437            !repo.contains_key("eager-fail").await,
438            "eager + remove_on_failure must roll back the key on failure"
439        );
440    }
441
442    // ── Test 6: missing message-id forwards to child without idempotency ──
443    #[tokio::test]
444    async fn missing_message_id_forwards_to_child() {
445        let repo = Arc::new(MockRepo::new());
446        let (mut segment, child_invoked) = build_segment(
447            repo.clone(),
448            PipelineOutcome::Completed(stub_exchange()),
449            false,
450            false,
451        );
452
453        // Exchange WITHOUT a messageId header.
454        let ex = Exchange::new(Message::new("no-id"));
455        let outcome = segment.run(ex).await;
456
457        assert!(matches!(outcome, PipelineOutcome::Completed(_)));
458        assert!(
459            child_invoked.load(std::sync::atomic::Ordering::SeqCst),
460            "child must run when message-id cannot be extracted"
461        );
462    }
463
464    fn stub_exchange() -> Exchange {
465        Exchange::new(Message::new(""))
466    }
467
468    fn stub_error() -> CamelError {
469        CamelError::ProcessorError("child failed".into())
470    }
471}