Skip to main content

camel_processor/
split_segment.rs

1//! ## Stop semantics (ADR-0025)
2//!
3//! This segment implements `OutcomePipeline` and propagates `PipelineOutcome::Stopped(ex)` with the exchange state intact (including mutations made inside the segment body before Stop fired). See ADR-0025 §3 (stopped-exchange-state-preservation invariant).
4
5use std::future::Future;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
9use tokio::task::JoinSet;
10
11use camel_api::{
12    AggregationStrategy, Body, CamelError, Exchange, OutcomeSegment, PipelineOutcome,
13    SplitExpression, Value,
14};
15
16// ── aggregate_completed (SplitSegment helper) ─────────────────────────
17
18/// Aggregate completed fragment outputs into a single Exchange.
19///
20/// Unlike `aggregate` (which works on `Vec<Result<Exchange, CamelError>>`),
21/// this operates on `Vec<Exchange>` where all entries are `Completed` outcomes.
22pub(crate) fn aggregate_completed(
23    completed: Vec<Exchange>,
24    original: Exchange,
25    strategy: AggregationStrategy,
26) -> Exchange {
27    match strategy {
28        AggregationStrategy::LastWins => completed.into_iter().last().unwrap_or(original),
29        AggregationStrategy::CollectAll => {
30            let mut bodies = Vec::new();
31            for ex in &completed {
32                let value = match &ex.input.body {
33                    Body::Text(s) => Value::String(s.clone()),
34                    Body::Json(v) => v.clone(),
35                    Body::Xml(s) => Value::String(s.clone()),
36                    Body::Bytes(b) => Value::String(String::from_utf8_lossy(b).into_owned()),
37                    Body::Empty => Value::Null,
38                    Body::Stream(s) => serde_json::json!({
39                        "_stream": {
40                            "origin": s.metadata.origin,
41                            "placeholder": true,
42                            "hint": "Materialize exchange body with .into_bytes() before aggregation if content needed"
43                        }
44                    }),
45                };
46                bodies.push(value);
47            }
48            let mut out = original;
49            out.input.body = Body::Json(Value::Array(bodies));
50            out
51        }
52        AggregationStrategy::Original => original,
53        AggregationStrategy::Custom(fold_fn) => {
54            let mut iter = completed.into_iter();
55            let first = iter.next().unwrap_or(original);
56            iter.fold(first, |acc, next| fold_fn(acc, next))
57        }
58    }
59}
60
61// ── SplitSegment (ADR-0025 OutcomePipeline) ────────────────────────────
62
/// Outcome-aware structural EIP segment for the Split pattern.
///
/// Splits an incoming exchange into fragments, processes each fragment through
/// `body`, and aggregates the results. Supports sequential and parallel modes.
///
/// In sequential mode, fragments are processed in order. A `Stopped` outcome
/// from any fragment halts processing immediately and propagates; a `Failed`
/// outcome does the same when `stop_on_exception` is `true`.
///
/// In parallel mode, all fragments are spawned as tokio tasks. The first
/// `Stopped` outcome (lowest fragment index wins via CAS) propagates as the
/// outer `Stopped`. In-flight tasks run to completion (spec §5.6: no abrupt
/// abort — child sub-pipelines may have HTTP/SQL side effects). Tasks that
/// have not started yet are short-circuited via a pre-start gate.
///
/// Unlike `SplitterService` (which operates at the Tower layer and cannot
/// preserve `Stopped(ex)` with mutations), `SplitSegment` operates at the
/// `PipelineOutcome` layer and preserves the exchange including all mutations
/// at the Stop point.
pub struct SplitSegment {
    /// Splits an exchange into fragment exchanges.
    pub splitter: SplitExpression,
    /// The sub-pipeline executed for each fragment.
    pub body: OutcomeSegment,
    /// Whether to process fragments in parallel.
    pub parallel: bool,
    /// Maximum number of concurrent fragments in parallel mode.
    ///
    /// `None` means unlimited; `Some(0)` is also treated as unlimited.
    pub parallel_limit: Option<usize>,
    /// Whether to stop processing on the first exception.
    ///
    /// Sequential mode: when `true`, a `Failed` outcome from any fragment
    /// halts processing immediately. When `false`, the error is recorded and
    /// processing continues; after all fragments complete, the last-seen
    /// error is propagated (last-wins, matching legacy
    /// `multicast.rs::process_parallel`).
    ///
    /// Parallel mode: every spawned fragment is awaited either way; `true`
    /// propagates the failure with the lowest fragment index, `false` the
    /// failure with the highest fragment index.
    ///
    /// `Stopped` outcomes always propagate immediately regardless of this
    /// flag (per ADR-0025 §7 — Stop is successful control flow).
    pub stop_on_exception: bool,
    /// Strategy for aggregating fragment results.
    pub aggregation: AggregationStrategy,
}
103
104impl Clone for SplitSegment {
105    fn clone(&self) -> Self {
106        Self {
107            splitter: Arc::clone(&self.splitter),
108            body: self.body.clone(),
109            parallel: self.parallel,
110            parallel_limit: self.parallel_limit,
111            stop_on_exception: self.stop_on_exception,
112            aggregation: self.aggregation.clone(),
113        }
114    }
115}
116
117impl camel_api::OutcomePipeline for SplitSegment {
118    fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
119        Box::new(self.clone())
120    }
121
122    fn run<'a>(
123        &'a mut self,
124        exchange: camel_api::Exchange,
125    ) -> Pin<Box<dyn Future<Output = camel_api::PipelineOutcome> + Send + 'a>> {
126        let splitter = Arc::clone(&self.splitter);
127        let aggregation = self.aggregation.clone();
128        let parallel = self.parallel;
129        let parallel_limit = self.parallel_limit;
130        let stop_on_exception = self.stop_on_exception;
131        let body = &mut self.body;
132
133        Box::pin(async move {
134            let original = exchange;
135            let fragments = splitter(&original);
136
137            if fragments.is_empty() {
138                return PipelineOutcome::Completed(original);
139            }
140
141            if parallel {
142                parallel_split(
143                    fragments,
144                    original,
145                    body,
146                    &aggregation,
147                    parallel_limit,
148                    stop_on_exception,
149                )
150                .await
151            } else {
152                sequential_split(fragments, original, body, &aggregation, stop_on_exception).await
153            }
154        })
155    }
156}
157
158// ── Sequential split ────────────────────────────────────────────────────
159
160async fn sequential_split(
161    fragments: Vec<Exchange>,
162    original: Exchange,
163    body: &mut OutcomeSegment,
164    aggregation: &AggregationStrategy,
165    stop_on_exception: bool,
166) -> PipelineOutcome {
167    let mut outputs = Vec::new();
168    let mut last_error: Option<CamelError> = None;
169    for frag in fragments {
170        match body.run(frag).await {
171            PipelineOutcome::Completed(ex) => outputs.push(ex),
172            PipelineOutcome::Stopped(ex) => return PipelineOutcome::Stopped(ex),
173            PipelineOutcome::Failed(err) => {
174                if stop_on_exception {
175                    return PipelineOutcome::Failed(err);
176                }
177                // stop_on_exception=false: collect the error and continue.
178                last_error = Some(err);
179            }
180        }
181    }
182    if let Some(err) = last_error {
183        return PipelineOutcome::Failed(err);
184    }
185    PipelineOutcome::Completed(aggregate_completed(outputs, original, aggregation.clone()))
186}
187
188// ── Parallel split ──────────────────────────────────────────────────────
189
190/// Parallel split with lowest-index-wins CAS semantics.
191///
192/// See spec §5.2.2 line 497 for the CAS guarantee and §5.6 line 544 for the
193/// "no abrupt abort" in-flight task policy (pre-start gate + run-to-completion).
194async fn parallel_split(
195    fragments: Vec<Exchange>,
196    original: Exchange,
197    body: &mut OutcomeSegment,
198    aggregation: &AggregationStrategy,
199    parallel_limit: Option<usize>,
200    stop_on_exception: bool,
201) -> PipelineOutcome {
202    use tokio::sync::Semaphore;
203
204    let stopped_seen = Arc::new(AtomicBool::new(false));
205    let stopped_idx = Arc::new(AtomicUsize::new(usize::MAX));
206    let aggregation = aggregation.clone();
207    let semaphore = parallel_limit
208        .filter(|&limit| limit > 0)
209        .map(|limit| Arc::new(Semaphore::new(limit)));
210
211    let mut set: JoinSet<(usize, Option<PipelineOutcome>)> = JoinSet::new();
212
213    for (idx, frag) in fragments.into_iter().enumerate() {
214        let mut body = body.clone();
215        let stopped_seen = Arc::clone(&stopped_seen);
216        let stopped_idx = Arc::clone(&stopped_idx);
217        let sem = semaphore.clone();
218        set.spawn(async move {
219            // Pre-start gate: a lower-index branch already stopped.
220            // This is the ONLY cancellation check — once a body starts
221            // running, it runs to completion (spec §5.6: "in-flight futures
222            // MUST NOT be abruptly aborted").
223            if stopped_seen.load(Ordering::SeqCst) {
224                return (idx, None);
225            }
226            // Acquire semaphore permit if parallel_limit is set.
227            let _permit: Option<tokio::sync::OwnedSemaphorePermit> = match &sem {
228                Some(s) => match std::sync::Arc::clone(s).acquire_owned().await {
229                    Ok(p) => Some(p),
230                    Err(_) => {
231                        return (
232                            idx,
233                            Some(PipelineOutcome::Failed(CamelError::ProcessorError(
234                                "semaphore closed".into(),
235                            ))),
236                        );
237                    }
238                },
239                None => None,
240            };
241            // Re-check pre-start gate after permit acquisition
242            // (another branch may have stopped while we were waiting).
243            if stopped_seen.load(Ordering::SeqCst) {
244                return (idx, None);
245            }
246            let outcome = body.run(frag).await;
247            if let PipelineOutcome::Stopped(_) = &outcome {
248                // Lower-the-value CAS: ensures lowest-branch-index wins
249                // even under simultaneous Stop (spec §5.2.2 line 497).
250                // Loop until our idx is recorded or a lower idx has already
251                // claimed it.
252                loop {
253                    let cur = stopped_idx.load(Ordering::SeqCst);
254                    if idx >= cur {
255                        break; // a lower index already won
256                    }
257                    match stopped_idx.compare_exchange_weak(
258                        cur,
259                        idx,
260                        Ordering::SeqCst,
261                        Ordering::SeqCst,
262                    ) {
263                        Ok(_) => break,
264                        Err(actual) => {
265                            // CAS failed — `actual` is the new cur; loop and
266                            // retry with updated value.
267                            if actual <= idx {
268                                break;
269                            }
270                        }
271                    }
272                }
273                stopped_seen.store(true, Ordering::SeqCst);
274            }
275            (idx, Some(outcome))
276        });
277    }
278
279    // Wait for ALL in-flight branches to finish (spec §5.6: no abrupt
280    // abort). Post-stop outputs are discarded at aggregation, but the
281    // branches DO complete.
282    let mut results: Vec<(usize, PipelineOutcome)> = Vec::new();
283    while let Some(res) = set.join_next().await {
284        if let Ok((idx, Some(o))) = res {
285            results.push((idx, o));
286        }
287    }
288
289    // Deterministic lowest-branch-index wins (spec §5.2.2 line 497).
290    if stopped_seen.load(Ordering::SeqCst) {
291        let winning_idx = stopped_idx.load(Ordering::SeqCst);
292        if winning_idx == usize::MAX {
293            tracing::warn!(
294                target: "camel.phase4.split",
295                "stopped_seen=true but stopped_idx=usize::MAX — race condition; falling back to pre-split exchange"
296            );
297            return PipelineOutcome::Stopped(original);
298        }
299        let stopped_ex = results
300            .iter()
301            .find(|(idx, _)| *idx == winning_idx)
302            .and_then(|(_, o)| match o {
303                PipelineOutcome::Stopped(ex) => Some(ex.clone()),
304                _ => None,
305            });
306        if let Some(ex) = stopped_ex {
307            return PipelineOutcome::Stopped(ex);
308        }
309        tracing::warn!(
310            target: "camel.phase4.split",
311            winning_idx = winning_idx,
312            "winning_idx not found in results — falling back to pre-split exchange"
313        );
314        return PipelineOutcome::Stopped(original);
315    }
316
317    // No Stop — check for Failed.
318    // stop_on_exception=true: propagate first Failed (lowest branch index).
319    // stop_on_exception=false: collect last error (last-wins) and propagate at end.
320    results.sort_by_key(|(idx, _)| *idx);
321    if stop_on_exception {
322        let mut first_failed: Option<(usize, CamelError)> = None;
323        for (idx, o) in &results {
324            if let PipelineOutcome::Failed(err) = o
325                && first_failed
326                    .as_ref()
327                    .map(|(i, _)| *i > *idx)
328                    .unwrap_or(true)
329            {
330                first_failed = Some((*idx, err.clone()));
331            }
332        }
333        if let Some((_, err)) = first_failed {
334            return PipelineOutcome::Failed(err);
335        }
336    } else {
337        // Collect last error (last-wins, matching MulticastSegment semantics).
338        let mut last_error: Option<CamelError> = None;
339        for (_, o) in &results {
340            if let PipelineOutcome::Failed(err) = o {
341                last_error = Some(err.clone());
342            }
343        }
344        if let Some(err) = last_error {
345            return PipelineOutcome::Failed(err);
346        }
347    }
348
349    // All Completed — aggregate.
350    let completed: Vec<Exchange> = results
351        .into_iter()
352        .filter_map(|(_, o)| match o {
353            PipelineOutcome::Completed(ex) => Some(ex),
354            _ => None,
355        })
356        .collect();
357    PipelineOutcome::Completed(aggregate_completed(completed, original, aggregation))
358}
359
360// ── Tests ──────────────────────────────────────────────────────────────
361
362#[cfg(test)]
363mod tests {
364    use super::*;
365    use camel_api::Message;
366
367    // ── Test helpers ───────────────────────────────────────────────────
368
369    /// Helper: OutcomePipeline body that always returns `Completed(exchange)`.
370    #[derive(Clone)]
371    struct CompletedBody;
372    impl camel_api::OutcomePipeline for CompletedBody {
373        fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
374            Box::new(CompletedBody)
375        }
376        fn run<'a>(
377            &'a mut self,
378            exchange: Exchange,
379        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
380            Box::pin(async move { PipelineOutcome::Completed(exchange) })
381        }
382    }
383
384    /// Helper: OutcomePipeline body that always returns `Stopped(exchange)`.
385    #[derive(Clone)]
386    struct StopBody;
387    impl camel_api::OutcomePipeline for StopBody {
388        fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
389            Box::new(StopBody)
390        }
391        fn run<'a>(
392            &'a mut self,
393            exchange: Exchange,
394        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
395            Box::pin(async move { PipelineOutcome::Stopped(exchange) })
396        }
397    }
398
399    /// Helper: OutcomePipeline body that stops on the nth invocation (0-indexed).
400    #[derive(Clone)]
401    struct StopOnNthBody {
402        counter: Arc<AtomicUsize>,
403        stop_at: usize,
404    }
405    impl camel_api::OutcomePipeline for StopOnNthBody {
406        fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
407            Box::new(self.clone())
408        }
409        fn run<'a>(
410            &'a mut self,
411            exchange: Exchange,
412        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
413            let count = self.counter.fetch_add(1, Ordering::SeqCst);
414            let stop_at = self.stop_at;
415            Box::pin(async move {
416                if count >= stop_at {
417                    PipelineOutcome::Stopped(exchange)
418                } else {
419                    PipelineOutcome::Completed(exchange)
420                }
421            })
422        }
423    }
424
425    /// Helper: OutcomePipeline body that mutates the exchange body then stops.
426    #[derive(Clone)]
427    struct MutateAndStopBody;
428    impl camel_api::OutcomePipeline for MutateAndStopBody {
429        fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
430            Box::new(MutateAndStopBody)
431        }
432        fn run<'a>(
433            &'a mut self,
434            mut exchange: Exchange,
435        ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
436            Box::pin(async move {
437                exchange.input.body = Body::Text("mutated-by-body".to_string());
438                PipelineOutcome::Stopped(exchange)
439            })
440        }
441    }
442
443    // ── Test 1: Sequential split — Stop halts remaining fragments ──
444
445    #[tokio::test]
446    async fn stop_inside_split_sequential_halts_remaining_fragments() {
447        let invocations = Arc::new(AtomicUsize::new(0));
448        let body = StopOnNthBody {
449            counter: Arc::clone(&invocations),
450            stop_at: 1, // stop on the 2nd fragment (index 1)
451        };
452
453        let mut seg = SplitSegment {
454            splitter: camel_api::split_body_lines(),
455            body: OutcomeSegment::new(Box::new(body)),
456            parallel: false,
457            parallel_limit: None,
458            stop_on_exception: true,
459            aggregation: AggregationStrategy::LastWins,
460        };
461
462        let ex = Exchange::new(Message::new("a\nb\nc"));
463        let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
464
465        assert!(matches!(result, PipelineOutcome::Stopped(_)));
466        // Fragments 0 (pass) + 1 (stop) = 2 invocations; fragment 2 never runs.
467        assert_eq!(invocations.load(Ordering::SeqCst), 2);
468    }
469
470    // ── Test 2: Sequential split — Stop preserves exchange mutations ──
471
472    #[tokio::test]
473    async fn stop_inside_split_sequential_preserves_exchange_mutations() {
474        let mut seg = SplitSegment {
475            splitter: camel_api::split_body_lines(),
476            body: OutcomeSegment::new(Box::new(MutateAndStopBody)),
477            parallel: false,
478            parallel_limit: None,
479            stop_on_exception: true,
480            aggregation: AggregationStrategy::LastWins,
481        };
482
483        let ex = Exchange::new(Message::new("hello"));
484        let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
485
486        match result {
487            PipelineOutcome::Stopped(ex) => {
488                assert_eq!(
489                    ex.input.body.as_text(),
490                    Some("mutated-by-body"),
491                    "Stopped exchange should carry body mutation"
492                );
493            }
494            other => panic!("Expected Stopped, got {other:?}"),
495        }
496    }
497
498    // ── Test 3: Parallel split — Stop cancels pending, waits in-flight ──
499    //
500    // NOTE: With JoinSet::spawn, all tasks are eagerly created. The
501    // pre-start gate only stops fragments whose spawned closure hasn't
502    // been polled yet. This test uses a tokio::sync::Barrier inside the
503    // body to ensure ALL fragments pass the pre-start gate, then verifies
504    // that in-flight (frag-1) completes even though Stop fires. The
505    // "cancels pending" invariant (frag-2 not started) is best-effort;
506    // the true invariant is: fragments that DO start MUST run to completion.
507
508    #[tokio::test(flavor = "multi_thread")]
509    async fn stop_inside_split_parallel_cancels_pending_and_waits_inflight() {
510        use tokio::sync::Barrier;
511
512        let barrier = Arc::new(Barrier::new(3));
513        let fragment1_completed = Arc::new(AtomicBool::new(false));
514        let fragment2_completed = Arc::new(AtomicBool::new(false));
515        let frag1_ok = Arc::clone(&fragment1_completed);
516        let frag2_ok = Arc::clone(&fragment2_completed);
517        let bar = Arc::clone(&barrier);
518
519        // Custom splitter producing 3 fragments.
520        let splitter: SplitExpression = Arc::new(|ex: &Exchange| {
521            (0..3)
522                .map(|i| {
523                    let mut frag = ex.clone();
524                    frag.input.body = Body::Text(format!("frag-{i}"));
525                    frag
526                })
527                .collect()
528        });
529
530        /// Body that uses a barrier to synchronize all fragments past the
531        /// pre-start gate, then dispatches:
532        ///   - frag-0: Stop
533        ///   - frag-1: slow (100ms) Completed
534        ///   - frag-2: fast Completed (asserts it started, proving no abort)
535        struct BarrierDispatchBody {
536            barrier: Arc<Barrier>,
537            f1_completed: Arc<AtomicBool>,
538            f2_completed: Arc<AtomicBool>,
539        }
540        impl Clone for BarrierDispatchBody {
541            fn clone(&self) -> Self {
542                Self {
543                    barrier: Arc::clone(&self.barrier),
544                    f1_completed: Arc::clone(&self.f1_completed),
545                    f2_completed: Arc::clone(&self.f2_completed),
546                }
547            }
548        }
549        impl camel_api::OutcomePipeline for BarrierDispatchBody {
550            fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
551                Box::new(self.clone())
552            }
553            fn run<'a>(
554                &'a mut self,
555                exchange: Exchange,
556            ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
557                let bar = Arc::clone(&self.barrier);
558                let f1c = Arc::clone(&self.f1_completed);
559                let f2c = Arc::clone(&self.f2_completed);
560                Box::pin(async move {
561                    let body_text = exchange.input.body.as_text().unwrap_or("").to_string();
562
563                    // All fragments synchronize AFTER passing the pre-start gate
564                    // (the gate is checked before body.run()). This ensures all
565                    // three fragments are in-flight when Stop fires.
566                    bar.wait().await;
567
568                    match body_text.as_str() {
569                        "frag-0" => PipelineOutcome::Stopped(exchange),
570                        "frag-1" => {
571                            // Slow in-flight — fragment 0's Stop is recorded and
572                            // propagates, but we still complete (spec §5.6).
573                            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
574                            f1c.store(true, Ordering::SeqCst);
575                            PipelineOutcome::Completed(exchange)
576                        }
577                        "frag-2" => {
578                            f2c.store(true, Ordering::SeqCst);
579                            PipelineOutcome::Completed(exchange)
580                        }
581                        _ => PipelineOutcome::Completed(exchange),
582                    }
583                })
584            }
585        }
586
587        let body = BarrierDispatchBody {
588            barrier: bar,
589            f1_completed: frag1_ok,
590            f2_completed: frag2_ok,
591        };
592
593        let mut seg = SplitSegment {
594            splitter,
595            body: OutcomeSegment::new(Box::new(body)),
596            parallel: true,
597            parallel_limit: None,
598            stop_on_exception: true,
599            aggregation: AggregationStrategy::LastWins,
600        };
601
602        let ex = Exchange::new(Message::new("test"));
603        let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
604
605        assert!(
606            matches!(result, PipelineOutcome::Stopped(_)),
607            "Expected Stopped, got {result:?}"
608        );
609        // Fragment 1 was in-flight and completed (no abrupt abort per §5.6).
610        assert!(
611            fragment1_completed.load(Ordering::SeqCst),
612            "fragment 1 should have completed despite Stop"
613        );
614        // Fragment 2 was also in-flight (barrier ensures all start) and completed.
615        assert!(
616            fragment2_completed.load(Ordering::SeqCst),
617            "fragment 2 should have completed despite Stop"
618        );
619    }
620
621    // ── Test 4: Parallel split — lowest stopped index wins ──
622
623    #[tokio::test(flavor = "multi_thread")]
624    async fn stop_inside_split_parallel_lowest_stopped_index_wins() {
625        // Custom splitter producing 3 fragments with index-identifiable body.
626        let splitter: SplitExpression = Arc::new(|ex: &Exchange| {
627            (0..3)
628                .map(|i| {
629                    let mut frag = ex.clone();
630                    frag.input.body = Body::Text(format!("from-fragment-{i}"));
631                    frag
632                })
633                .collect()
634        });
635
636        // Body that stops for fragments 0 and 2; fragment 1 completes.
637        struct DualStopBody;
638        impl Clone for DualStopBody {
639            fn clone(&self) -> Self {
640                DualStopBody
641            }
642        }
643        impl camel_api::OutcomePipeline for DualStopBody {
644            fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
645                Box::new(DualStopBody)
646            }
647            fn run<'a>(
648                &'a mut self,
649                exchange: Exchange,
650            ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
651                let is_frag0 = exchange
652                    .input
653                    .body
654                    .as_text()
655                    .map(|s| s == "from-fragment-0")
656                    .unwrap_or(false);
657                let is_frag2 = exchange
658                    .input
659                    .body
660                    .as_text()
661                    .map(|s| s == "from-fragment-2")
662                    .unwrap_or(false);
663                Box::pin(async move {
664                    if is_frag0 {
665                        return PipelineOutcome::Stopped(exchange);
666                    }
667                    if is_frag2 {
668                        // Delay slightly to ensure fragment 0's Stop is recorded first
669                        // in the CAS, then verify that lowest index wins.
670                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
671                        return PipelineOutcome::Stopped(exchange);
672                    }
673                    // frag-1: completed
674                    PipelineOutcome::Completed(exchange)
675                })
676            }
677        }
678
679        let mut seg = SplitSegment {
680            splitter,
681            body: OutcomeSegment::new(Box::new(DualStopBody)),
682            parallel: true,
683            parallel_limit: None,
684            stop_on_exception: true,
685            aggregation: AggregationStrategy::LastWins,
686        };
687
688        let ex = Exchange::new(Message::new("test"));
689        let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
690
691        match result {
692            PipelineOutcome::Stopped(ex) => {
693                assert_eq!(
694                    ex.input.body.as_text(),
695                    Some("from-fragment-0"),
696                    "Lowest stopped index (0) should win, got body {:?}",
697                    ex.input.body.as_text()
698                );
699            }
700            other => panic!("Expected Stopped with fragment-0 body, got {other:?}"),
701        }
702    }
703
704    // ── Test 5: parallel_limit enforcement ─────────────────────────────
705
706    #[tokio::test(flavor = "multi_thread")]
707    async fn split_parallel_limit_enforces_concurrency_cap() {
708        let concurrent = Arc::new(AtomicUsize::new(0));
709        let max_concurrent = Arc::new(AtomicUsize::new(0));
710
711        // Split into 6 fragments. parallel_limit=2.
712        let splitter: SplitExpression = Arc::new(|ex: &Exchange| {
713            (0..6)
714                .map(|i| {
715                    let mut frag = ex.clone();
716                    frag.input.body = Body::Text(format!("frag-{i}"));
717                    frag
718                })
719                .collect()
720        });
721
722        let c = Arc::clone(&concurrent);
723        let mc = Arc::clone(&max_concurrent);
724        struct LimitedBody {
725            concurrent: Arc<AtomicUsize>,
726            max_concurrent: Arc<AtomicUsize>,
727        }
728        impl Clone for LimitedBody {
729            fn clone(&self) -> Self {
730                Self {
731                    concurrent: Arc::clone(&self.concurrent),
732                    max_concurrent: Arc::clone(&self.max_concurrent),
733                }
734            }
735        }
736        impl camel_api::OutcomePipeline for LimitedBody {
737            fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
738                Box::new(self.clone())
739            }
740            fn run<'a>(
741                &'a mut self,
742                exchange: Exchange,
743            ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
744                let c = Arc::clone(&self.concurrent);
745                let mc = Arc::clone(&self.max_concurrent);
746                Box::pin(async move {
747                    let current = c.fetch_add(1, Ordering::SeqCst) + 1;
748                    mc.fetch_max(current, Ordering::SeqCst);
749                    tokio::task::yield_now().await;
750                    c.fetch_sub(1, Ordering::SeqCst);
751                    PipelineOutcome::Completed(exchange)
752                })
753            }
754        }
755
756        let mut seg = SplitSegment {
757            splitter,
758            body: OutcomeSegment::new(Box::new(LimitedBody {
759                concurrent: c,
760                max_concurrent: mc,
761            })),
762            parallel: true,
763            parallel_limit: Some(2),
764            stop_on_exception: true,
765            aggregation: AggregationStrategy::LastWins,
766        };
767
768        let ex = Exchange::new(Message::new("test"));
769        let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
770        assert!(
771            matches!(result, PipelineOutcome::Completed(_)),
772            "Expected Completed, got {result:?}"
773        );
774
775        let observed_max = max_concurrent.load(Ordering::SeqCst);
776        assert!(
777            observed_max <= 2,
778            "parallel_limit=2 but max concurrency was {observed_max}"
779        );
780    }
781
782    // ── Test 6: stop_on_exception=true (sequential) ────────────────────
783
784    #[tokio::test]
785    async fn split_sequential_stop_on_exception_true() {
786        // 5 fragments, fail on 2nd (index 1). stop_on_exception=true → stops.
787        fn make_fail_body(
788            fail_at: usize,
789            counter: Arc<AtomicUsize>,
790        ) -> impl camel_api::OutcomePipeline + Clone {
791            #[derive(Clone)]
792            struct FailAtBody {
793                fail_at: usize,
794                counter: Arc<AtomicUsize>,
795            }
796            impl camel_api::OutcomePipeline for FailAtBody {
797                fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
798                    Box::new(self.clone())
799                }
800                fn run<'a>(
801                    &'a mut self,
802                    exchange: Exchange,
803                ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
804                    let count = self.counter.fetch_add(1, Ordering::SeqCst);
805                    let fail_at = self.fail_at;
806                    Box::pin(async move {
807                        if count == fail_at {
808                            PipelineOutcome::Failed(CamelError::ProcessorError(format!(
809                                "fail at {count}"
810                            )))
811                        } else {
812                            PipelineOutcome::Completed(exchange)
813                        }
814                    })
815                }
816            }
817            FailAtBody { fail_at, counter }
818        }
819
820        let invocations = Arc::new(AtomicUsize::new(0));
821        let body = make_fail_body(1, Arc::clone(&invocations));
822        let mut seg = SplitSegment {
823            splitter: camel_api::split_body_lines(),
824            body: OutcomeSegment::new(Box::new(body)),
825            parallel: false,
826            parallel_limit: None,
827            stop_on_exception: true,
828            aggregation: AggregationStrategy::LastWins,
829        };
830
831        let ex = Exchange::new(Message::new("a\nb\nc\nd\ne"));
832        let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
833
834        assert!(
835            matches!(result, PipelineOutcome::Failed(_)),
836            "stop_on_exception=true should propagate first failure"
837        );
838        // Only 2 fragments processed (index 0 passed, index 1 failed);
839        // fragments 2-4 never run.
840        assert_eq!(
841            invocations.load(Ordering::SeqCst),
842            2,
843            "should stop after 2 fragments (0 pass, 1 fail)"
844        );
845    }
846
847    // ── Test 7: stop_on_exception=false (sequential) ───────────────────
848
849    #[tokio::test]
850    async fn split_sequential_stop_on_exception_false() {
851        // 5 fragments, fail on 2nd (index 1). stop_on_exception=false → continues.
852        fn make_fail_body(
853            fail_at: usize,
854            counter: Arc<AtomicUsize>,
855        ) -> impl camel_api::OutcomePipeline + Clone {
856            #[derive(Clone)]
857            struct FailAtBody {
858                fail_at: usize,
859                counter: Arc<AtomicUsize>,
860            }
861            impl camel_api::OutcomePipeline for FailAtBody {
862                fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
863                    Box::new(self.clone())
864                }
865                fn run<'a>(
866                    &'a mut self,
867                    exchange: Exchange,
868                ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
869                    let count = self.counter.fetch_add(1, Ordering::SeqCst);
870                    let fail_at = self.fail_at;
871                    Box::pin(async move {
872                        if count == fail_at {
873                            PipelineOutcome::Failed(CamelError::ProcessorError(format!(
874                                "fail at {count}"
875                            )))
876                        } else {
877                            PipelineOutcome::Completed(exchange)
878                        }
879                    })
880                }
881            }
882            FailAtBody { fail_at, counter }
883        }
884
885        let invocations = Arc::new(AtomicUsize::new(0));
886        let body = make_fail_body(1, Arc::clone(&invocations));
887        let mut seg = SplitSegment {
888            splitter: camel_api::split_body_lines(),
889            body: OutcomeSegment::new(Box::new(body)),
890            parallel: false,
891            parallel_limit: None,
892            stop_on_exception: false,
893            aggregation: AggregationStrategy::LastWins,
894        };
895
896        let ex = Exchange::new(Message::new("a\nb\nc\nd\ne"));
897        let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
898
899        // With stop_on_exception=false, processing continues after failure;
900        // last error is propagated.
901        assert!(
902            matches!(result, PipelineOutcome::Failed(_)),
903            "stop_on_exception=false should still propagate error at end"
904        );
905        // All 5 fragments processed.
906        assert_eq!(
907            invocations.load(Ordering::SeqCst),
908            5,
909            "all fragments should be processed when stop_on_exception=false"
910        );
911    }
912
913    // ── Test 8: stop_on_exception=true (parallel) ──────────────────────
914
915    #[tokio::test(flavor = "multi_thread")]
916    async fn split_parallel_stop_on_exception_true() {
917        let splitter: SplitExpression = Arc::new(|ex: &Exchange| {
918            (0..5)
919                .map(|i| {
920                    let mut frag = ex.clone();
921                    frag.input.body = Body::Text(format!("frag-{i}"));
922                    frag
923                })
924                .collect()
925        });
926
927        // All fragments fail. stop_on_exception=true → first Failed propagated.
928        let invocations = Arc::new(AtomicUsize::new(0));
929        struct FailBody {
930            counter: Arc<AtomicUsize>,
931        }
932        impl Clone for FailBody {
933            fn clone(&self) -> Self {
934                Self {
935                    counter: Arc::clone(&self.counter),
936                }
937            }
938        }
939        impl camel_api::OutcomePipeline for FailBody {
940            fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
941                Box::new(self.clone())
942            }
943            fn run<'a>(
944                &'a mut self,
945                exchange: Exchange,
946            ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
947                let count = self.counter.fetch_add(1, Ordering::SeqCst);
948                Box::pin(async move {
949                    PipelineOutcome::Failed(CamelError::ProcessorError(format!("fail {count}")))
950                })
951            }
952        }
953
954        let mut seg = SplitSegment {
955            splitter,
956            body: OutcomeSegment::new(Box::new(FailBody {
957                counter: Arc::clone(&invocations),
958            })),
959            parallel: true,
960            parallel_limit: None,
961            stop_on_exception: true,
962            aggregation: AggregationStrategy::LastWins,
963        };
964
965        let ex = Exchange::new(Message::new("test"));
966        let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
967
968        assert!(
969            matches!(result, PipelineOutcome::Failed(_)),
970            "stop_on_exception=true should propagate first failure"
971        );
972        // All 5 spawned (JoinSet), all completed.
973        assert_eq!(
974            invocations.load(Ordering::SeqCst),
975            5,
976            "all fragments should be spawned"
977        );
978    }
979
980    // ── Test 9: stop_on_exception=false (parallel) ─────────────────────
981
982    #[tokio::test(flavor = "multi_thread")]
983    async fn split_parallel_stop_on_exception_false() {
984        let splitter: SplitExpression = Arc::new(|ex: &Exchange| {
985            (0..5)
986                .map(|i| {
987                    let mut frag = ex.clone();
988                    frag.input.body = Body::Text(format!("frag-{i}"));
989                    frag
990                })
991                .collect()
992        });
993
994        // Fragment 0 passes, 1 fails, 2-4 pass.
995        let invocations = Arc::new(AtomicUsize::new(0));
996        struct MixedBody {
997            counter: Arc<AtomicUsize>,
998        }
999        impl Clone for MixedBody {
1000            fn clone(&self) -> Self {
1001                Self {
1002                    counter: Arc::clone(&self.counter),
1003                }
1004            }
1005        }
1006        impl camel_api::OutcomePipeline for MixedBody {
1007            fn clone_box(&self) -> Box<dyn camel_api::OutcomePipeline> {
1008                Box::new(self.clone())
1009            }
1010            fn run<'a>(
1011                &'a mut self,
1012                exchange: Exchange,
1013            ) -> Pin<Box<dyn Future<Output = PipelineOutcome> + Send + 'a>> {
1014                let count = self.counter.fetch_add(1, Ordering::SeqCst);
1015                Box::pin(async move {
1016                    if count == 1 {
1017                        PipelineOutcome::Failed(CamelError::ProcessorError("fail 1".into()))
1018                    } else {
1019                        PipelineOutcome::Completed(exchange)
1020                    }
1021                })
1022            }
1023        }
1024
1025        let mut seg = SplitSegment {
1026            splitter,
1027            body: OutcomeSegment::new(Box::new(MixedBody {
1028                counter: Arc::clone(&invocations),
1029            })),
1030            parallel: true,
1031            parallel_limit: None,
1032            stop_on_exception: false,
1033            aggregation: AggregationStrategy::LastWins,
1034        };
1035
1036        let ex = Exchange::new(Message::new("test"));
1037        let result = camel_api::OutcomePipeline::run(&mut seg, ex).await;
1038
1039        // stop_on_exception=false → last error propagated at end.
1040        assert!(
1041            matches!(result, PipelineOutcome::Failed(_)),
1042            "stop_on_exception=false should propagate failure at end; got {result:?}"
1043        );
1044        assert_eq!(
1045            invocations.load(Ordering::SeqCst),
1046            5,
1047            "all fragments should be spawned"
1048        );
1049    }
1050}