1use std::future::Future;
24use std::pin::Pin;
25use std::sync::Arc;
26
27use camel_api::{Exchange, IdempotentRepository, OutcomePipeline, OutcomeSegment, PipelineOutcome};
28
/// Shared, thread-safe closure that derives the idempotency key for an [`Exchange`].
///
/// Returning `None` means no key is available for the exchange; the idempotent
/// segment then forwards the exchange to its child without consulting the repository.
pub type MessageIdExpression = Arc<dyn Fn(&Exchange) -> Option<String> + Send + Sync>;
35
/// Pipeline segment that runs its child pipeline only for exchanges whose
/// message id is not yet recorded in an [`IdempotentRepository`].
pub struct IdempotentConsumerSegment {
    /// Store of message ids that have already been seen.
    repository: Arc<dyn IdempotentRepository>,
    /// Extracts the message id (the repository key) from an exchange.
    message_id: MessageIdExpression,
    /// Pipeline executed for exchanges that are not duplicates.
    child_pipeline: OutcomeSegment,
    /// When `true` the key is added before the child runs; otherwise it is
    /// added only after the child completes.
    eager: bool,
    /// In eager mode, remove the key again when the child fails.
    remove_on_failure: bool,
}
68
impl IdempotentConsumerSegment {
    /// Assembles a segment from its collaborators and mode flags.
    ///
    /// * `repository` - store consulted for already-seen message ids.
    /// * `message_id` - expression producing the key for an exchange.
    /// * `child_pipeline` - pipeline run for non-duplicate exchanges.
    /// * `eager` - add the key before running the child instead of after it completes.
    /// * `remove_on_failure` - in eager mode, remove the key when the child fails.
    pub fn new(
        repository: Arc<dyn IdempotentRepository>,
        message_id: MessageIdExpression,
        child_pipeline: OutcomeSegment,
        eager: bool,
        remove_on_failure: bool,
    ) -> Self {
        Self {
            repository,
            message_id,
            child_pipeline,
            eager,
            remove_on_failure,
        }
    }
}
88
89impl Clone for IdempotentConsumerSegment {
90 fn clone(&self) -> Self {
91 Self {
92 repository: Arc::clone(&self.repository),
93 message_id: Arc::clone(&self.message_id),
94 child_pipeline: self.child_pipeline.clone(),
95 eager: self.eager,
96 remove_on_failure: self.remove_on_failure,
97 }
98 }
99}
100
101impl OutcomePipeline for IdempotentConsumerSegment {
102 fn clone_box(&self) -> Box<dyn OutcomePipeline> {
103 Box::new(self.clone())
104 }
105
106 fn run<'a>(
107 &'a mut self,
108 exchange: Exchange,
109 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
110 Box::pin(async move {
111 let key = match (self.message_id)(&exchange) {
113 Some(k) => k,
114 None => return self.child_pipeline.run(exchange).await,
115 };
116
117 match self.repository.contains(&key).await {
119 Ok(true) => return PipelineOutcome::Completed(exchange),
120 Ok(false) => {}
121 Err(e) => return PipelineOutcome::Failed(e),
122 }
123
124 if self.eager {
126 match self.repository.add(&key).await {
127 Ok(true) => {} Ok(false) => return PipelineOutcome::Completed(exchange), Err(e) => return PipelineOutcome::Failed(e),
130 }
131 }
132
133 let outcome = self.child_pipeline.run(exchange).await;
135
136 match &outcome {
138 PipelineOutcome::Completed(_) if !self.eager => {
139 if let Err(e) = self.repository.add(&key).await {
140 tracing::warn!(
141 error = %e,
142 repository = %self.repository.name(),
143 key = %key,
144 "idempotent repository add failed post-success; next delivery may re-process"
145 );
146 }
147 }
148 PipelineOutcome::Failed(_) if self.eager && self.remove_on_failure => {
149 if let Err(e) = self.repository.remove(&key).await {
150 tracing::warn!(
151 error = %e,
152 repository = %self.repository.name(),
153 key = %key,
154 "idempotent repository remove failed on failure rollback"
155 );
156 }
157 }
158 _ => {}
159 }
160
161 outcome
162 })
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169 use async_trait::async_trait;
170 use camel_api::{CamelError, Exchange, Message, Value};
171 use std::collections::HashSet;
172 use tokio::sync::Mutex;
173
    /// In-memory [`IdempotentRepository`] test double backed by a `HashSet`.
    #[derive(Debug, Default)]
    struct MockRepo {
        /// Keys currently registered in the repository.
        keys: Mutex<HashSet<String>>,
        /// When `true`, `contains` returns a synthetic error instead of an answer.
        fail_contains: bool,
    }
183
184 impl MockRepo {
185 fn new() -> Self {
186 Self::default()
187 }
188
189 fn failing_contains() -> Self {
190 Self {
191 keys: Mutex::new(HashSet::new()),
192 fail_contains: true,
193 }
194 }
195
196 async fn pre_add(&self, key: &str) {
197 self.keys.lock().await.insert(key.to_string());
198 }
199
200 async fn contains_key(&self, key: &str) -> bool {
201 self.keys.lock().await.contains(key)
202 }
203 }
204
205 #[async_trait]
206 impl IdempotentRepository for MockRepo {
207 fn name(&self) -> &str {
208 "mock"
209 }
210
211 async fn contains(&self, key: &str) -> Result<bool, CamelError> {
212 if self.fail_contains {
213 return Err(CamelError::ProcessorError(
214 "synthetic contains failure".into(),
215 ));
216 }
217 Ok(self.keys.lock().await.contains(key))
218 }
219
220 async fn add(&self, key: &str) -> Result<bool, CamelError> {
221 let mut guard = self.keys.lock().await;
222 Ok(guard.insert(key.to_string()))
223 }
224
225 async fn remove(&self, key: &str) -> Result<(), CamelError> {
226 self.keys.lock().await.remove(key);
227 Ok(())
228 }
229
230 async fn clear(&self) -> Result<(), CamelError> {
231 self.keys.lock().await.clear();
232 Ok(())
233 }
234 }
235
    /// Child pipeline test double that yields a pre-scripted outcome and
    /// records that it was invoked.
    struct ScriptedChild {
        /// Outcome whose variant is reported by the next `run` call.
        outcome: PipelineOutcome,
        /// Set to `true` as soon as `run` is called.
        invoked: Arc<std::sync::atomic::AtomicBool>,
    }
242
243 impl OutcomePipeline for ScriptedChild {
244 fn clone_box(&self) -> Box<dyn OutcomePipeline> {
245 unreachable!("clone_box not used in idempotent_consumer tests")
248 }
249
250 fn run<'a>(
251 &'a mut self,
252 exchange: Exchange,
253 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
254 self.invoked
255 .store(true, std::sync::atomic::Ordering::SeqCst);
256 let outcome = std::mem::replace(
257 &mut self.outcome,
258 PipelineOutcome::Completed(Exchange::new(Message::new(""))),
259 );
260 Box::pin(async move { outcome_with_exchange(outcome, exchange) })
261 }
262 }
263
264 fn outcome_with_exchange(outcome: PipelineOutcome, exchange: Exchange) -> PipelineOutcome {
267 match outcome {
268 PipelineOutcome::Completed(_) => PipelineOutcome::Completed(exchange),
269 PipelineOutcome::Stopped(_) => PipelineOutcome::Stopped(exchange),
270 PipelineOutcome::Failed(e) => PipelineOutcome::Failed(e),
271 }
272 }
273
274 fn exchange_with_id(id: &str) -> Exchange {
275 let mut ex = Exchange::new(Message::new("payload"));
276 ex.input.set_header("messageId", Value::String(id.into()));
277 ex
278 }
279
280 fn header_message_id() -> MessageIdExpression {
281 Arc::new(|ex: &Exchange| {
282 ex.input
283 .header("messageId")
284 .and_then(|v| v.as_str().map(|s| s.to_string()))
285 })
286 }
287
288 fn build_segment(
289 repo: Arc<MockRepo>,
290 child_outcome: PipelineOutcome,
291 eager: bool,
292 remove_on_failure: bool,
293 ) -> (
294 IdempotentConsumerSegment,
295 Arc<std::sync::atomic::AtomicBool>,
296 ) {
297 let invoked = Arc::new(std::sync::atomic::AtomicBool::new(false));
298 let child = ScriptedChild {
299 outcome: child_outcome,
300 invoked: invoked.clone(),
301 };
302 let segment = IdempotentConsumerSegment::new(
303 repo,
304 header_message_id(),
305 OutcomeSegment::new(Box::new(child)),
306 eager,
307 remove_on_failure,
308 );
309 (segment, invoked)
310 }
311
312 #[tokio::test]
314 async fn duplicate_key_returns_completed_without_running_child() {
315 let repo = Arc::new(MockRepo::new());
316 repo.pre_add("dup-1").await;
317 let (mut segment, child_invoked) = build_segment(
318 repo.clone(),
319 PipelineOutcome::Failed(stub_error()),
320 false,
321 false,
322 );
323
324 let ex = exchange_with_id("dup-1");
325 let outcome = segment.run(ex).await;
326
327 assert!(matches!(outcome, PipelineOutcome::Completed(_)));
328 assert!(
329 !child_invoked.load(std::sync::atomic::Ordering::SeqCst),
330 "child must NOT run when key is a duplicate"
331 );
332 assert!(repo.contains_key("dup-1").await);
334 }
335
336 #[tokio::test]
338 async fn new_key_runs_child_and_returns_child_outcome() {
339 let repo = Arc::new(MockRepo::new());
340 let (mut segment, child_invoked) = build_segment(
341 repo.clone(),
342 PipelineOutcome::Completed(Exchange::new(Message::new(""))),
343 false,
344 false,
345 );
346
347 let ex = exchange_with_id("new-1");
348 let outcome = segment.run(ex).await;
349
350 assert!(matches!(outcome, PipelineOutcome::Completed(_)));
351 assert!(
352 child_invoked.load(std::sync::atomic::Ordering::SeqCst),
353 "child MUST run when key is new"
354 );
355 assert!(
357 repo.contains_key("new-1").await,
358 "non-eager mode must add key after successful child run"
359 );
360 }
361
362 #[tokio::test]
364 async fn failed_repo_read_propagates_error() {
365 let repo = Arc::new(MockRepo::failing_contains());
366 let (mut segment, child_invoked) = build_segment(
367 repo,
368 PipelineOutcome::Completed(stub_exchange()),
369 false,
370 false,
371 );
372
373 let ex = exchange_with_id("any");
374 let outcome = segment.run(ex).await;
375
376 match outcome {
377 PipelineOutcome::Failed(e) => {
378 let msg = e.to_string();
379 assert!(
380 msg.contains("synthetic contains failure"),
381 "expected synthetic failure in error, got: {msg}"
382 );
383 }
384 other => panic!("expected Failed, got {other:?}"),
385 }
386 assert!(
387 !child_invoked.load(std::sync::atomic::Ordering::SeqCst),
388 "child must NOT run when repo read fails"
389 );
390 }
391
392 #[tokio::test]
394 async fn stopped_child_propagates_stopped() {
395 let repo = Arc::new(MockRepo::new());
396 let (mut segment, child_invoked) = build_segment(
397 repo.clone(),
398 PipelineOutcome::Stopped(stub_exchange()),
399 false,
400 false,
401 );
402
403 let ex = exchange_with_id("stop-1");
404 let outcome = segment.run(ex).await;
405
406 assert!(
407 matches!(outcome, PipelineOutcome::Stopped(_)),
408 "Stopped from child MUST propagate as Stopped (segment-mode contract)"
409 );
410 assert!(
411 child_invoked.load(std::sync::atomic::Ordering::SeqCst),
412 "child must run before its Stopped can propagate"
413 );
414 assert!(
416 !repo.contains_key("stop-1").await,
417 "Stopped outcome must not register the key"
418 );
419 }
420
421 #[tokio::test]
423 async fn eager_mode_removes_key_on_failure_when_configured() {
424 let repo = Arc::new(MockRepo::new());
425 let (mut segment, _child_invoked) = build_segment(
426 repo.clone(),
427 PipelineOutcome::Failed(stub_error()),
428 true, true, );
431
432 let ex = exchange_with_id("eager-fail");
433 let outcome = segment.run(ex).await;
434
435 assert!(matches!(outcome, PipelineOutcome::Failed(_)));
436 assert!(
437 !repo.contains_key("eager-fail").await,
438 "eager + remove_on_failure must roll back the key on failure"
439 );
440 }
441
442 #[tokio::test]
444 async fn missing_message_id_forwards_to_child() {
445 let repo = Arc::new(MockRepo::new());
446 let (mut segment, child_invoked) = build_segment(
447 repo.clone(),
448 PipelineOutcome::Completed(stub_exchange()),
449 false,
450 false,
451 );
452
453 let ex = Exchange::new(Message::new("no-id"));
455 let outcome = segment.run(ex).await;
456
457 assert!(matches!(outcome, PipelineOutcome::Completed(_)));
458 assert!(
459 child_invoked.load(std::sync::atomic::Ordering::SeqCst),
460 "child must run when message-id cannot be extracted"
461 );
462 }
463
    /// Placeholder exchange built from an empty-string message, used where the
    /// payload is irrelevant to the test.
    fn stub_exchange() -> Exchange {
        Exchange::new(Message::new(""))
    }
467
    /// Error used to script a failing child pipeline.
    fn stub_error() -> CamelError {
        CamelError::ProcessorError("child failed".into())
    }
471}