1use std::future::Future;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use tokio::task::JoinSet;
10
11use camel_api::{
12 AggregationStrategy, Body, CamelError, Exchange, OutcomeSegment, PipelineOutcome,
13 SplitExpression, Value,
14};
15
16pub(crate) fn aggregate_completed(
23 completed: Vec<Exchange>,
24 original: Exchange,
25 strategy: AggregationStrategy,
26) -> Exchange {
27 match strategy {
28 AggregationStrategy::LastWins => completed.into_iter().last().unwrap_or(original),
29 AggregationStrategy::CollectAll => {
30 let mut bodies = Vec::new();
31 for ex in &completed {
32 let value = match &ex.input.body {
33 Body::Text(s) => Value::String(s.clone()),
34 Body::Json(v) => v.clone(),
35 Body::Xml(s) => Value::String(s.clone()),
36 Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
37 Body::Empty => Value::Null,
38 Body::Stream(s) => serde_json::json!({
39 "_stream": {
40 "origin": s.metadata.origin,
41 "placeholder": true,
42 "hint": "Materialize exchange body with .into_bytes() before aggregation if content needed"
43 }
44 }),
45 };
46 bodies.push(value);
47 }
48 let mut out = original;
49 out.input.body = Body::Json(Value::Array(bodies));
50 out
51 }
52 AggregationStrategy::Original => original,
53 AggregationStrategy::Custom(fold_fn) => {
54 let mut iter = completed.into_iter();
55 let first = iter.next().unwrap_or(original);
56 iter.fold(first, |acc, next| fold_fn(acc, next))
57 }
58 }
59}
60
/// Pipeline segment that splits an exchange into fragments, runs `body` on each
/// fragment and aggregates the completed results into a single exchange.
pub struct SplitSegment {
    /// Expression called with the incoming exchange to produce the fragments.
    /// An empty result makes the segment complete with the incoming exchange as-is.
    pub splitter: SplitExpression,
    /// Sub-pipeline executed once per fragment.
    pub body: OutcomeSegment,
    /// `true` runs every fragment in its own spawned task; `false` runs the
    /// fragments one after another in order.
    pub parallel: bool,
    /// Maximum number of fragments processed at once in parallel mode.
    /// `None` and `Some(0)` both mean "no limit".
    pub parallel_limit: Option<usize>,
    /// Sequential mode: return the first failure immediately and skip the
    /// remaining fragments. Parallel mode: every fragment still runs, and the
    /// failure with the lowest fragment index is reported. When `false`, all
    /// fragments run and the last failure (by fragment order) is reported.
    pub stop_on_exception: bool,
    /// How the completed fragment exchanges are merged into the final exchange.
    pub aggregation: AggregationStrategy,
}
103
104impl Clone for SplitSegment {
105 fn clone(&self) -> Self {
106 Self {
107 splitter: Arc::clone(&self.splitter),
108 body: self.body.clone(),
109 parallel: self.parallel,
110 parallel_limit: self.parallel_limit,
111 stop_on_exception: self.stop_on_exception,
112 aggregation: self.aggregation.clone(),
113 }
114 }
115}
116
117impl camel_api::OutcomePipeline for SplitSegment {
118 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
119 Box::new(self.clone())
120 }
121
122 fn run<'a>(
123 &'a mut self,
124 exchange: camel_api::Exchange,
125 ) -> Pin<Box<dyn Future<Output = camel_api::PipelineOutcome> + Send + 'a>> {
126 let splitter = Arc::clone(&self.splitter);
127 let aggregation = self.aggregation.clone();
128 let parallel = self.parallel;
129 let parallel_limit = self.parallel_limit;
130 let stop_on_exception = self.stop_on_exception;
131 let body = &mut self.body;
132
133 Box::pin(async move {
134 let original = exchange;
135 let fragments = splitter(&original);
136
137 if fragments.is_empty() {
138 return PipelineOutcome::Completed(original);
139 }
140
141 if parallel {
142 parallel_split(
143 fragments,
144 original,
145 body,
146 &aggregation,
147 parallel_limit,
148 stop_on_exception,
149 )
150 .await
151 } else {
152 sequential_split(fragments, original, body, &aggregation, stop_on_exception).await
153 }
154 })
155 }
156}
157
158async fn sequential_split(
161 fragments: Vec<Exchange>,
162 original: Exchange,
163 body: &mut OutcomeSegment,
164 aggregation: &AggregationStrategy,
165 stop_on_exception: bool,
166) -> PipelineOutcome {
167 let mut outputs = Vec::new();
168 let mut last_error: Option<CamelError> = None;
169 for frag in fragments {
170 match body.run(frag).await {
171 PipelineOutcome::Completed(ex) => outputs.push(ex),
172 PipelineOutcome::Stopped(ex) => return PipelineOutcome::Stopped(ex),
173 PipelineOutcome::Failed(err) => {
174 if stop_on_exception {
175 return PipelineOutcome::Failed(err);
176 }
177 last_error = Some(err);
179 }
180 }
181 }
182 if let Some(err) = last_error {
183 return PipelineOutcome::Failed(err);
184 }
185 PipelineOutcome::Completed(aggregate_completed(outputs, original, aggregation.clone()))
186}
187
188async fn parallel_split(
195 fragments: Vec<Exchange>,
196 original: Exchange,
197 body: &mut OutcomeSegment,
198 aggregation: &AggregationStrategy,
199 parallel_limit: Option<usize>,
200 stop_on_exception: bool,
201) -> PipelineOutcome {
202 use tokio::sync::Semaphore;
203
204 let stopped_seen = Arc::new(AtomicBool::new(false));
205 let stopped_idx = Arc::new(AtomicUsize::new(usize::MAX));
206 let aggregation = aggregation.clone();
207 let semaphore = parallel_limit
208 .filter(|&limit| limit > 0)
209 .map(|limit| Arc::new(Semaphore::new(limit)));
210
211 let mut set: JoinSet<(usize, Option<PipelineOutcome>)> = JoinSet::new();
212
213 for (idx, frag) in fragments.into_iter().enumerate() {
214 let mut body = body.clone();
215 let stopped_seen = Arc::clone(&stopped_seen);
216 let stopped_idx = Arc::clone(&stopped_idx);
217 let sem = semaphore.clone();
218 set.spawn(async move {
219 if stopped_seen.load(Ordering::SeqCst) {
224 return (idx, None);
225 }
226 let _permit: Option<tokio::sync::OwnedSemaphorePermit> = match &sem {
228 Some(s) => match std::sync::Arc::clone(s).acquire_owned().await {
229 Ok(p) => Some(p),
230 Err(_) => {
231 return (
232 idx,
233 Some(PipelineOutcome::Failed(CamelError::ProcessorError(
234 "semaphore closed".into(),
235 ))),
236 );
237 }
238 },
239 None => None,
240 };
241 if stopped_seen.load(Ordering::SeqCst) {
244 return (idx, None);
245 }
246 let outcome = body.run(frag).await;
247 if let PipelineOutcome::Stopped(_) = &outcome {
248 loop {
253 let cur = stopped_idx.load(Ordering::SeqCst);
254 if idx >= cur {
255 break; }
257 match stopped_idx.compare_exchange_weak(
258 cur,
259 idx,
260 Ordering::SeqCst,
261 Ordering::SeqCst,
262 ) {
263 Ok(_) => break,
264 Err(actual) => {
265 if actual <= idx {
268 break;
269 }
270 }
271 }
272 }
273 stopped_seen.store(true, Ordering::SeqCst);
274 }
275 (idx, Some(outcome))
276 });
277 }
278
279 let mut results: Vec<(usize, PipelineOutcome)> = Vec::new();
283 while let Some(res) = set.join_next().await {
284 if let Ok((idx, Some(o))) = res {
285 results.push((idx, o));
286 }
287 }
288
289 if stopped_seen.load(Ordering::SeqCst) {
291 let winning_idx = stopped_idx.load(Ordering::SeqCst);
292 if winning_idx == usize::MAX {
293 tracing::warn!(
294 target: "camel.phase4.split",
295 "stopped_seen=true but stopped_idx=usize::MAX — race condition; falling back to pre-split exchange"
296 );
297 return PipelineOutcome::Stopped(original);
298 }
299 let stopped_ex = results
300 .iter()
301 .find(|(idx, _)| *idx == winning_idx)
302 .and_then(|(_, o)| match o {
303 PipelineOutcome::Stopped(ex) => Some(ex.clone()),
304 _ => None,
305 });
306 if let Some(ex) = stopped_ex {
307 return PipelineOutcome::Stopped(ex);
308 }
309 tracing::warn!(
310 target: "camel.phase4.split",
311 winning_idx = winning_idx,
312 "winning_idx not found in results — falling back to pre-split exchange"
313 );
314 return PipelineOutcome::Stopped(original);
315 }
316
317 results.sort_by_key(|(idx, _)| *idx);
321 if stop_on_exception {
322 let mut first_failed: Option<(usize, CamelError)> = None;
323 for (idx, o) in &results {
324 if let PipelineOutcome::Failed(err) = o
325 && first_failed
326 .as_ref()
327 .map(|(i, _)| *i > *idx)
328 .unwrap_or(true)
329 {
330 first_failed = Some((*idx, err.clone()));
331 }
332 }
333 if let Some((_, err)) = first_failed {
334 return PipelineOutcome::Failed(err);
335 }
336 } else {
337 let mut last_error: Option<CamelError> = None;
339 for (_, o) in &results {
340 if let PipelineOutcome::Failed(err) = o {
341 last_error = Some(err.clone());
342 }
343 }
344 if let Some(err) = last_error {
345 return PipelineOutcome::Failed(err);
346 }
347 }
348
349 let completed: Vec<Exchange> = results
351 .into_iter()
352 .filter_map(|(_, o)| match o {
353 PipelineOutcome::Completed(ex) => Some(ex),
354 _ => None,
355 })
356 .collect();
357 PipelineOutcome::Completed(aggregate_completed(completed, original, aggregation))
358}
359
360#[cfg(test)]
363mod tests {
364 use super::*;
365 use camel_api::Message;
366
367 #[derive(Clone)]
371 struct CompletedBody;
372 impl camel_api::OutcomePipeline for CompletedBody {
373 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
374 Box::new(CompletedBody)
375 }
376 fn run<'a>(
377 &'a mut self,
378 exchange: Exchange,
379 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
380 Box::pin(async move { PipelineOutcome::Completed(exchange) })
381 }
382 }
383
384 #[derive(Clone)]
386 struct StopBody;
387 impl camel_api::OutcomePipeline for StopBody {
388 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
389 Box::new(StopBody)
390 }
391 fn run<'a>(
392 &'a mut self,
393 exchange: Exchange,
394 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
395 Box::pin(async move { PipelineOutcome::Stopped(exchange) })
396 }
397 }
398
399 #[derive(Clone)]
401 struct StopOnNthBody {
402 counter: Arc<AtomicUsize>,
403 stop_at: usize,
404 }
405 impl camel_api::OutcomePipeline for StopOnNthBody {
406 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
407 Box::new(self.clone())
408 }
409 fn run<'a>(
410 &'a mut self,
411 exchange: Exchange,
412 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
413 let count = self.counter.fetch_add(1, Ordering::SeqCst);
414 let stop_at = self.stop_at;
415 Box::pin(async move {
416 if count >= stop_at {
417 PipelineOutcome::Stopped(exchange)
418 } else {
419 PipelineOutcome::Completed(exchange)
420 }
421 })
422 }
423 }
424
425 #[derive(Clone)]
427 struct MutateAndStopBody;
428 impl camel_api::OutcomePipeline for MutateAndStopBody {
429 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
430 Box::new(MutateAndStopBody)
431 }
432 fn run<'a>(
433 &'a mut self,
434 mut exchange: Exchange,
435 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
436 Box::pin(async move {
437 exchange.input.body = Body::Text("mutated-by-body".to_string());
438 PipelineOutcome::Stopped(exchange)
439 })
440 }
441 }
442
443 #[tokio::test]
446 async fn stop_inside_split_sequential_halts_remaining_fragments() {
447 let invocations = Arc::new(AtomicUsize::new(0));
448 let body = StopOnNthBody {
449 counter: Arc::clone(&invocations),
450 stop_at: 1, };
452
453 let mut seg = SplitSegment {
454 splitter: camel_api::split_body_lines(),
455 body: OutcomeSegment::new(Box::new(body)),
456 parallel: false,
457 parallel_limit: None,
458 stop_on_exception: true,
459 aggregation: AggregationStrategy::LastWins,
460 };
461
462 let ex = Exchange::new(Message::new("a\nb\nc"));
463 let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
464
465 assert!(matches!(result, PipelineOutcome::Stopped(_)));
466 assert_eq!(invocations.load(Ordering::SeqCst), 2);
468 }
469
470 #[tokio::test]
473 async fn stop_inside_split_sequential_preserves_exchange_mutations() {
474 let mut seg = SplitSegment {
475 splitter: camel_api::split_body_lines(),
476 body: OutcomeSegment::new(Box::new(MutateAndStopBody)),
477 parallel: false,
478 parallel_limit: None,
479 stop_on_exception: true,
480 aggregation: AggregationStrategy::LastWins,
481 };
482
483 let ex = Exchange::new(Message::new("hello"));
484 let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
485
486 match result {
487 PipelineOutcome::Stopped(ex) => {
488 assert_eq!(
489 ex.input.body.as_text(),
490 Some("mutated-by-body"),
491 "Stopped exchange should carry body mutation"
492 );
493 }
494 other => panic!("Expected Stopped, got {other:?}"),
495 }
496 }
497
498 #[tokio::test(flavor = "multi_thread")]
509 async fn stop_inside_split_parallel_cancels_pending_and_waits_inflight() {
510 use tokio::sync::Barrier;
511
512 let barrier = Arc::new(Barrier::new(3));
513 let fragment1_completed = Arc::new(AtomicBool::new(false));
514 let fragment2_completed = Arc::new(AtomicBool::new(false));
515 let frag1_ok = Arc::clone(&fragment1_completed);
516 let frag2_ok = Arc::clone(&fragment2_completed);
517 let bar = Arc::clone(&barrier);
518
519 let splitter: SplitExpression = Arc::new(|ex: &Exchange| {
521 (0..3)
522 .map(|i| {
523 let mut frag = ex.clone();
524 frag.input.body = Body::Text(format!("frag-{i}"));
525 frag
526 })
527 .collect()
528 });
529
530 struct BarrierDispatchBody {
536 barrier: Arc<Barrier>,
537 f1_completed: Arc<AtomicBool>,
538 f2_completed: Arc<AtomicBool>,
539 }
540 impl Clone for BarrierDispatchBody {
541 fn clone(&self) -> Self {
542 Self {
543 barrier: Arc::clone(&self.barrier),
544 f1_completed: Arc::clone(&self.f1_completed),
545 f2_completed: Arc::clone(&self.f2_completed),
546 }
547 }
548 }
549 impl camel_api::OutcomePipeline for BarrierDispatchBody {
550 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
551 Box::new(self.clone())
552 }
553 fn run<'a>(
554 &'a mut self,
555 exchange: Exchange,
556 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
557 let bar = Arc::clone(&self.barrier);
558 let f1c = Arc::clone(&self.f1_completed);
559 let f2c = Arc::clone(&self.f2_completed);
560 Box::pin(async move {
561 let body_text = exchange.input.body.as_text().unwrap_or("").to_string();
562
563 bar.wait().await;
567
568 match body_text.as_str() {
569 "frag-0" => PipelineOutcome::Stopped(exchange),
570 "frag-1" => {
571 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
574 f1c.store(true, Ordering::SeqCst);
575 PipelineOutcome::Completed(exchange)
576 }
577 "frag-2" => {
578 f2c.store(true, Ordering::SeqCst);
579 PipelineOutcome::Completed(exchange)
580 }
581 _ => PipelineOutcome::Completed(exchange),
582 }
583 })
584 }
585 }
586
587 let body = BarrierDispatchBody {
588 barrier: bar,
589 f1_completed: frag1_ok,
590 f2_completed: frag2_ok,
591 };
592
593 let mut seg = SplitSegment {
594 splitter,
595 body: OutcomeSegment::new(Box::new(body)),
596 parallel: true,
597 parallel_limit: None,
598 stop_on_exception: true,
599 aggregation: AggregationStrategy::LastWins,
600 };
601
602 let ex = Exchange::new(Message::new("test"));
603 let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
604
605 assert!(
606 matches!(result, PipelineOutcome::Stopped(_)),
607 "Expected Stopped, got {result:?}"
608 );
609 assert!(
611 fragment1_completed.load(Ordering::SeqCst),
612 "fragment 1 should have completed despite Stop"
613 );
614 assert!(
616 fragment2_completed.load(Ordering::SeqCst),
617 "fragment 2 should have completed despite Stop"
618 );
619 }
620
621 #[tokio::test(flavor = "multi_thread")]
624 async fn stop_inside_split_parallel_lowest_stopped_index_wins() {
625 let splitter: SplitExpression = Arc::new(|ex: &Exchange| {
627 (0..3)
628 .map(|i| {
629 let mut frag = ex.clone();
630 frag.input.body = Body::Text(format!("from-fragment-{i}"));
631 frag
632 })
633 .collect()
634 });
635
636 struct DualStopBody;
638 impl Clone for DualStopBody {
639 fn clone(&self) -> Self {
640 DualStopBody
641 }
642 }
643 impl camel_api::OutcomePipeline for DualStopBody {
644 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
645 Box::new(DualStopBody)
646 }
647 fn run<'a>(
648 &'a mut self,
649 exchange: Exchange,
650 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
651 let is_frag0 = exchange
652 .input
653 .body
654 .as_text()
655 .map(|s| s == "from-fragment-0")
656 .unwrap_or(false);
657 let is_frag2 = exchange
658 .input
659 .body
660 .as_text()
661 .map(|s| s == "from-fragment-2")
662 .unwrap_or(false);
663 Box::pin(async move {
664 if is_frag0 {
665 return PipelineOutcome::Stopped(exchange);
666 }
667 if is_frag2 {
668 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
671 return PipelineOutcome::Stopped(exchange);
672 }
673 PipelineOutcome::Completed(exchange)
675 })
676 }
677 }
678
679 let mut seg = SplitSegment {
680 splitter,
681 body: OutcomeSegment::new(Box::new(DualStopBody)),
682 parallel: true,
683 parallel_limit: None,
684 stop_on_exception: true,
685 aggregation: AggregationStrategy::LastWins,
686 };
687
688 let ex = Exchange::new(Message::new("test"));
689 let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
690
691 match result {
692 PipelineOutcome::Stopped(ex) => {
693 assert_eq!(
694 ex.input.body.as_text(),
695 Some("from-fragment-0"),
696 "Lowest stopped index (0) should win, got body {:?}",
697 ex.input.body.as_text()
698 );
699 }
700 other => panic!("Expected Stopped with fragment-0 body, got {other:?}"),
701 }
702 }
703
704 #[tokio::test(flavor = "multi_thread")]
707 async fn split_parallel_limit_enforces_concurrency_cap() {
708 let concurrent = Arc::new(AtomicUsize::new(0));
709 let max_concurrent = Arc::new(AtomicUsize::new(0));
710
711 let splitter: SplitExpression = Arc::new(|ex: &Exchange| {
713 (0..6)
714 .map(|i| {
715 let mut frag = ex.clone();
716 frag.input.body = Body::Text(format!("frag-{i}"));
717 frag
718 })
719 .collect()
720 });
721
722 let c = Arc::clone(&concurrent);
723 let mc = Arc::clone(&max_concurrent);
724 struct LimitedBody {
725 concurrent: Arc<AtomicUsize>,
726 max_concurrent: Arc<AtomicUsize>,
727 }
728 impl Clone for LimitedBody {
729 fn clone(&self) -> Self {
730 Self {
731 concurrent: Arc::clone(&self.concurrent),
732 max_concurrent: Arc::clone(&self.max_concurrent),
733 }
734 }
735 }
736 impl camel_api::OutcomePipeline for LimitedBody {
737 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
738 Box::new(self.clone())
739 }
740 fn run<'a>(
741 &'a mut self,
742 exchange: Exchange,
743 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
744 let c = Arc::clone(&self.concurrent);
745 let mc = Arc::clone(&self.max_concurrent);
746 Box::pin(async move {
747 let current = c.fetch_add(1, Ordering::SeqCst) + 1;
748 mc.fetch_max(current, Ordering::SeqCst);
749 tokio::task::yield_now().await;
750 c.fetch_sub(1, Ordering::SeqCst);
751 PipelineOutcome::Completed(exchange)
752 })
753 }
754 }
755
756 let mut seg = SplitSegment {
757 splitter,
758 body: OutcomeSegment::new(Box::new(LimitedBody {
759 concurrent: c,
760 max_concurrent: mc,
761 })),
762 parallel: true,
763 parallel_limit: Some(2),
764 stop_on_exception: true,
765 aggregation: AggregationStrategy::LastWins,
766 };
767
768 let ex = Exchange::new(Message::new("test"));
769 let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
770 assert!(
771 matches!(result, PipelineOutcome::Completed(_)),
772 "Expected Completed, got {result:?}"
773 );
774
775 let observed_max = max_concurrent.load(Ordering::SeqCst);
776 assert!(
777 observed_max <= 2,
778 "parallel_limit=2 but max concurrency was {observed_max}"
779 );
780 }
781
782 #[tokio::test]
785 async fn split_sequential_stop_on_exception_true() {
786 fn make_fail_body(
788 fail_at: usize,
789 counter: Arc<AtomicUsize>,
790 ) -> impl camel_api::OutcomePipeline + Clone {
791 #[derive(Clone)]
792 struct FailAtBody {
793 fail_at: usize,
794 counter: Arc<AtomicUsize>,
795 }
796 impl camel_api::OutcomePipeline for FailAtBody {
797 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
798 Box::new(self.clone())
799 }
800 fn run<'a>(
801 &'a mut self,
802 exchange: Exchange,
803 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
804 let count = self.counter.fetch_add(1, Ordering::SeqCst);
805 let fail_at = self.fail_at;
806 Box::pin(async move {
807 if count == fail_at {
808 PipelineOutcome::Failed(CamelError::ProcessorError(format!(
809 "fail at {count}"
810 )))
811 } else {
812 PipelineOutcome::Completed(exchange)
813 }
814 })
815 }
816 }
817 FailAtBody { fail_at, counter }
818 }
819
820 let invocations = Arc::new(AtomicUsize::new(0));
821 let body = make_fail_body(1, Arc::clone(&invocations));
822 let mut seg = SplitSegment {
823 splitter: camel_api::split_body_lines(),
824 body: OutcomeSegment::new(Box::new(body)),
825 parallel: false,
826 parallel_limit: None,
827 stop_on_exception: true,
828 aggregation: AggregationStrategy::LastWins,
829 };
830
831 let ex = Exchange::new(Message::new("a\nb\nc\nd\ne"));
832 let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
833
834 assert!(
835 matches!(result, PipelineOutcome::Failed(_)),
836 "stop_on_exception=true should propagate first failure"
837 );
838 assert_eq!(
841 invocations.load(Ordering::SeqCst),
842 2,
843 "should stop after 2 fragments (0 pass, 1 fail)"
844 );
845 }
846
847 #[tokio::test]
850 async fn split_sequential_stop_on_exception_false() {
851 fn make_fail_body(
853 fail_at: usize,
854 counter: Arc<AtomicUsize>,
855 ) -> impl camel_api::OutcomePipeline + Clone {
856 #[derive(Clone)]
857 struct FailAtBody {
858 fail_at: usize,
859 counter: Arc<AtomicUsize>,
860 }
861 impl camel_api::OutcomePipeline for FailAtBody {
862 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
863 Box::new(self.clone())
864 }
865 fn run<'a>(
866 &'a mut self,
867 exchange: Exchange,
868 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
869 let count = self.counter.fetch_add(1, Ordering::SeqCst);
870 let fail_at = self.fail_at;
871 Box::pin(async move {
872 if count == fail_at {
873 PipelineOutcome::Failed(CamelError::ProcessorError(format!(
874 "fail at {count}"
875 )))
876 } else {
877 PipelineOutcome::Completed(exchange)
878 }
879 })
880 }
881 }
882 FailAtBody { fail_at, counter }
883 }
884
885 let invocations = Arc::new(AtomicUsize::new(0));
886 let body = make_fail_body(1, Arc::clone(&invocations));
887 let mut seg = SplitSegment {
888 splitter: camel_api::split_body_lines(),
889 body: OutcomeSegment::new(Box::new(body)),
890 parallel: false,
891 parallel_limit: None,
892 stop_on_exception: false,
893 aggregation: AggregationStrategy::LastWins,
894 };
895
896 let ex = Exchange::new(Message::new("a\nb\nc\nd\ne"));
897 let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
898
899 assert!(
902 matches!(result, PipelineOutcome::Failed(_)),
903 "stop_on_exception=false should still propagate error at end"
904 );
905 assert_eq!(
907 invocations.load(Ordering::SeqCst),
908 5,
909 "all fragments should be processed when stop_on_exception=false"
910 );
911 }
912
913 #[tokio::test(flavor = "multi_thread")]
916 async fn split_parallel_stop_on_exception_true() {
917 let splitter: SplitExpression = Arc::new(|ex: &Exchange| {
918 (0..5)
919 .map(|i| {
920 let mut frag = ex.clone();
921 frag.input.body = Body::Text(format!("frag-{i}"));
922 frag
923 })
924 .collect()
925 });
926
927 let invocations = Arc::new(AtomicUsize::new(0));
929 struct FailBody {
930 counter: Arc<AtomicUsize>,
931 }
932 impl Clone for FailBody {
933 fn clone(&self) -> Self {
934 Self {
935 counter: Arc::clone(&self.counter),
936 }
937 }
938 }
939 impl camel_api::OutcomePipeline for FailBody {
940 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
941 Box::new(self.clone())
942 }
943 fn run<'a>(
944 &'a mut self,
945 exchange: Exchange,
946 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
947 let count = self.counter.fetch_add(1, Ordering::SeqCst);
948 Box::pin(async move {
949 PipelineOutcome::Failed(CamelError::ProcessorError(format!("fail {count}")))
950 })
951 }
952 }
953
954 let mut seg = SplitSegment {
955 splitter,
956 body: OutcomeSegment::new(Box::new(FailBody {
957 counter: Arc::clone(&invocations),
958 })),
959 parallel: true,
960 parallel_limit: None,
961 stop_on_exception: true,
962 aggregation: AggregationStrategy::LastWins,
963 };
964
965 let ex = Exchange::new(Message::new("test"));
966 let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
967
968 assert!(
969 matches!(result, PipelineOutcome::Failed(_)),
970 "stop_on_exception=true should propagate first failure"
971 );
972 assert_eq!(
974 invocations.load(Ordering::SeqCst),
975 5,
976 "all fragments should be spawned"
977 );
978 }
979
980 #[tokio::test(flavor = "multi_thread")]
983 async fn split_parallel_stop_on_exception_false() {
984 let splitter: SplitExpression = Arc::new(|ex: &Exchange| {
985 (0..5)
986 .map(|i| {
987 let mut frag = ex.clone();
988 frag.input.body = Body::Text(format!("frag-{i}"));
989 frag
990 })
991 .collect()
992 });
993
994 let invocations = Arc::new(AtomicUsize::new(0));
996 struct MixedBody {
997 counter: Arc<AtomicUsize>,
998 }
999 impl Clone for MixedBody {
1000 fn clone(&self) -> Self {
1001 Self {
1002 counter: Arc::clone(&self.counter),
1003 }
1004 }
1005 }
1006 impl camel_api::OutcomePipeline for MixedBody {
1007 fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
1008 Box::new(self.clone())
1009 }
1010 fn run<'a>(
1011 &'a mut self,
1012 exchange: Exchange,
1013 ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
1014 let count = self.counter.fetch_add(1, Ordering::SeqCst);
1015 Box::pin(async move {
1016 if count == 1 {
1017 PipelineOutcome::Failed(CamelError::ProcessorError("fail 1".into()))
1018 } else {
1019 PipelineOutcome::Completed(exchange)
1020 }
1021 })
1022 }
1023 }
1024
1025 let mut seg = SplitSegment {
1026 splitter,
1027 body: OutcomeSegment::new(Box::new(MixedBody {
1028 counter: Arc::clone(&invocations),
1029 })),
1030 parallel: true,
1031 parallel_limit: None,
1032 stop_on_exception: false,
1033 aggregation: AggregationStrategy::LastWins,
1034 };
1035
1036 let ex = Exchange::new(Message::new("test"));
1037 let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
1038
1039 assert!(
1041 matches!(result, PipelineOutcome::Failed(_)),
1042 "stop_on_exception=false should propagate failure at end; got {result:?}"
1043 );
1044 assert_eq!(
1045 invocations.load(Ordering::SeqCst),
1046 5,
1047 "all fragments should be spawned"
1048 );
1049 }
1050}