Skip to main content

camel_api/
splitter.rs

1use std::pin::Pin;
2use std::sync::Arc;
3
4use futures::Stream;
5
6use crate::body::Body;
7use crate::error::CamelError;
8use crate::exchange::Exchange;
9use crate::message::Message;
10
11/// A function that splits a single exchange into multiple fragment exchanges.
12pub type SplitExpression = Arc<dyn Fn(&Exchange) -> Vec<Exchange> + Send + Sync>;
13
14/// A function that lazily produces a stream of exchange fragments.
15///
16/// Used by [`StreamingSplitterService`] for v1 sequential streaming split
17/// (e.g., ZIP entry extraction, CSV/JSON streaming in future work).
18///
19/// Each call returns a `Stream` that yields fragments one at a time.
20pub type StreamingSplitExpression = Arc<
21    dyn Fn(Exchange) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>>
22        + Send
23        + Sync,
24>;
25
26/// Strategy for aggregating fragment results back into a single exchange.
27#[derive(Clone, Default)]
28pub enum AggregationStrategy {
29    /// Result is the last fragment's exchange (default).
30    #[default]
31    LastWins,
32    /// Collects all fragment bodies into a JSON array.
33    CollectAll,
34    /// Returns the original exchange unchanged.
35    Original,
36    /// Custom aggregation function: `(accumulated, next) -> merged`.
37    Custom(Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>),
38}
39
40/// The streaming format to use when splitting a stream body.
41#[derive(
42    Clone,
43    Debug,
44    Default,
45    PartialEq,
46    Eq,
47    serde::Serialize,
48    serde::Deserialize,
49    schemars::JsonSchema,
50    ts_rs::TS,
51)]
52#[serde(rename_all = "snake_case")]
53#[ts(rename_all = "snake_case")]
54pub enum StreamSplitFormat {
55    /// Auto-detect the format from the body content.
56    #[default]
57    Auto,
58    /// Newline-delimited JSON — each line is a complete JSON value.
59    Ndjson,
60    /// Split by newlines, each line becomes a text fragment.
61    Lines,
62    /// Split into fixed-size byte chunks.
63    Chunks,
64}
65
66/// Configuration for splitting a streaming body into fragments.
67///
68/// Controls how the stream splitter processes the body, including format
69/// detection, sizing limits, and metadata propagation.
70#[derive(
71    Clone,
72    Debug,
73    PartialEq,
74    Eq,
75    serde::Serialize,
76    serde::Deserialize,
77    schemars::JsonSchema,
78    ts_rs::TS,
79)]
80#[serde(rename_all = "snake_case")]
81#[ts(rename_all = "snake_case")]
82pub struct StreamSplitConfig {
83    /// The streaming format to use.
84    pub format: StreamSplitFormat,
85    /// Maximum size (in bytes) of a single record or chunk.
86    pub max_record_bytes: usize,
87    /// Number of records/chunks to collect into a single exchange batch.
88    pub batch_size: usize,
89    /// Explicit chunk size in bytes (required when format is [`Chunks`](StreamSplitFormat::Chunks)).
90    pub chunk_size: Option<usize>,
91    /// Whether to include origin metadata in each fragment.
92    pub include_origin: bool,
93}
94
95impl Default for StreamSplitConfig {
96    fn default() -> Self {
97        Self {
98            format: StreamSplitFormat::Auto,
99            max_record_bytes: 1024 * 1024,
100            batch_size: 1,
101            chunk_size: None,
102            include_origin: true,
103        }
104    }
105}
106
107impl StreamSplitConfig {
108    /// Validates the configuration.
109    ///
110    /// # Errors
111    ///
112    /// Returns [`CamelError::Config`] if:
113    /// - `batch_size` is `0`
114    /// - `max_record_bytes` is `0`
115    /// - `format` is [`Chunks`](StreamSplitFormat::Chunks) but `chunk_size` is `None`
116    /// - `chunk_size` is `Some(0)`
117    pub fn validate(&self) -> Result<(), CamelError> {
118        if self.batch_size == 0 {
119            return Err(CamelError::Config(
120                "stream split batch_size must be > 0".into(),
121            ));
122        }
123        if self.max_record_bytes == 0 {
124            return Err(CamelError::Config(
125                "stream split max_record_bytes must be > 0".into(),
126            ));
127        }
128        if self.format == StreamSplitFormat::Chunks && self.chunk_size.is_none() {
129            return Err(CamelError::Config(
130                "stream split format=Chunks requires chunk_size".into(),
131            ));
132        }
133        if let Some(cs) = self.chunk_size
134            && cs == 0
135        {
136            return Err(CamelError::Config(
137                "stream split chunk_size must be > 0".into(),
138            ));
139        }
140        if self.format == StreamSplitFormat::Chunks
141            && let Some(cs) = self.chunk_size
142            && cs > self.max_record_bytes
143        {
144            return Err(CamelError::Config(
145                "stream split chunk_size must be <= max_record_bytes".into(),
146            ));
147        }
148        Ok(())
149    }
150}
151
152/// Configuration for the Splitter EIP.
153pub struct SplitterConfig {
154    /// Expression that splits an exchange into fragments.
155    pub expression: SplitExpression,
156    /// How to aggregate fragment results.
157    pub aggregation: AggregationStrategy,
158    /// Whether to process fragments in parallel.
159    pub parallel: bool,
160    /// Maximum number of parallel fragments (None = unlimited).
161    pub parallel_limit: Option<usize>,
162    /// Whether to stop processing on the first exception.
163    ///
164    /// In parallel mode this only affects aggregation (the first error is
165    /// propagated), **not** in-flight futures — `join_all` cannot cancel
166    /// already-spawned work.
167    pub stop_on_exception: bool,
168}
169
170impl SplitterConfig {
171    /// Create a new splitter config with the given split expression.
172    pub fn new(expression: SplitExpression) -> Self {
173        Self {
174            expression,
175            aggregation: AggregationStrategy::default(),
176            parallel: false,
177            parallel_limit: None,
178            stop_on_exception: true,
179        }
180    }
181
182    /// Set the aggregation strategy for combining fragment results.
183    pub fn aggregation(mut self, strategy: AggregationStrategy) -> Self {
184        self.aggregation = strategy;
185        self
186    }
187
188    /// Enable or disable parallel fragment processing.
189    pub fn parallel(mut self, parallel: bool) -> Self {
190        self.parallel = parallel;
191        self
192    }
193
194    /// Set the maximum number of concurrent fragments in parallel mode.
195    pub fn parallel_limit(mut self, limit: usize) -> Self {
196        self.parallel_limit = Some(limit);
197        self
198    }
199
200    /// Control whether processing stops on the first fragment error.
201    ///
202    /// In parallel mode this only affects aggregation — see the field-level
203    /// doc comment for details.
204    pub fn stop_on_exception(mut self, stop: bool) -> Self {
205        self.stop_on_exception = stop;
206        self
207    }
208
209    /// Validates the configuration.
210    ///
211    /// Returns `Err(CamelError::Config)` if `parallel_limit` is set to 0,
212    /// which would cause a `Semaphore::new(0)` panic at runtime.
213    pub fn validate(&self) -> Result<(), CamelError> {
214        if self.parallel && self.parallel_limit == Some(0) {
215            return Err(CamelError::Config(
216                "splitter parallel_limit must be > 0".to_string(),
217            ));
218        }
219        Ok(())
220    }
221}
222
223// ---------------------------------------------------------------------------
224// Helpers
225// ---------------------------------------------------------------------------
226
227/// Create a fragment exchange that inherits headers, properties, and OTel context
228/// from the parent, but with a new body.
229///
230/// # OpenTelemetry Trace Propagation
231///
232/// Each fragment inherits the parent's `otel_context`, which carries the active span
233/// context. When TracingProcessor processes a fragment, it will create a child span
234/// linked to the parent span. This creates a natural fan-out relationship in the
235/// distributed trace:
236///
237/// ```text
238/// ParentExchange (span A)
239///   ├─ Fragment 1 (span B, child of A)
240///   ├─ Fragment 2 (span C, child of A)
241///   └─ Fragment N (span N, child of A)
242/// ```
243///
244/// This parent-child relationship is the correct semantic for message splitting,
245/// as fragments are logical subdivisions of the parent message, not independent
246/// operations that merely reference the parent (which would warrant span links).
247pub fn fragment_exchange(parent: &Exchange, body: Body) -> Exchange {
248    let mut msg = Message::new(body);
249    msg.headers = parent.input.headers.clone();
250    let mut ex = Exchange::new(msg);
251    ex.properties = parent.properties.clone();
252    ex.pattern = parent.pattern;
253    // Inherit OTel context so fragment spans are children of the parent span
254    ex.otel_context = parent.otel_context.clone();
255    ex
256}
257
258/// Split the exchange body by newlines. Returns one fragment per line.
259/// Non-text bodies produce an empty vec.
260pub fn split_body_lines() -> SplitExpression {
261    Arc::new(|exchange: &Exchange| {
262        let text = match &exchange.input.body {
263            Body::Text(s) => s.as_str(),
264            _ => return Vec::new(),
265        };
266        text.lines()
267            .map(|line| fragment_exchange(exchange, Body::Text(line.to_string())))
268            .collect()
269    })
270}
271
272/// Split a JSON array body into one fragment per element.
273/// Non-array bodies produce an empty vec.
274pub fn split_body_json_array() -> SplitExpression {
275    Arc::new(|exchange: &Exchange| {
276        let arr = match &exchange.input.body {
277            Body::Json(serde_json::Value::Array(arr)) => arr,
278            _ => return Vec::new(),
279        };
280        arr.iter()
281            .map(|val| fragment_exchange(exchange, Body::Json(val.clone())))
282            .collect()
283    })
284}
285
286/// Split the exchange body using a custom function that operates on the body.
287pub fn split_body<F>(f: F) -> SplitExpression
288where
289    F: Fn(&Body) -> Vec<Body> + Send + Sync + 'static,
290{
291    Arc::new(move |exchange: &Exchange| {
292        f(&exchange.input.body)
293            .into_iter()
294            .map(|body| fragment_exchange(exchange, body))
295            .collect()
296    })
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::Value;

    /// Collects the text body of every fragment, in order.
    fn text_bodies(fragments: &[Exchange]) -> Vec<Option<&str>> {
        fragments
            .iter()
            .map(|fragment| fragment.input.body.as_text())
            .collect()
    }

    /// Validates `config`, expecting a failure, and returns the error text.
    fn stream_config_error(config: StreamSplitConfig) -> String {
        config.validate().unwrap_err().to_string()
    }

    /// Builds a parent exchange whose OTel context carries a sampled remote
    /// span with the given trace and span ID bytes. Returns the exchange
    /// together with the trace ID that fragments are expected to inherit.
    fn parent_with_remote_span(
        body: &'static str,
        trace_bytes: [u8; 16],
        span_bytes: [u8; 8],
    ) -> (Exchange, opentelemetry::trace::TraceId) {
        use opentelemetry::Context;
        use opentelemetry::trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId};

        let trace_id = TraceId::from_bytes(trace_bytes);
        let remote = SpanContext::new(
            trace_id,
            SpanId::from_bytes(span_bytes),
            TraceFlags::SAMPLED,
            true,
            Default::default(),
        );

        let mut parent = Exchange::new(Message::new(body));
        parent.otel_context = Context::current().with_remote_span_context(remote);
        (parent, trace_id)
    }

    #[test]
    fn test_split_body_lines() {
        let mut parent = Exchange::new(Message::new("a\nb\nc"));
        parent
            .input
            .set_header("source", Value::String("test".into()));
        parent.set_property("trace", Value::Bool(true));

        let fragments = split_body_lines()(&parent);

        assert_eq!(fragments.len(), 3);
        assert_eq!(
            text_bodies(&fragments),
            vec![Some("a"), Some("b"), Some("c")]
        );

        // Headers and properties must be carried over to every fragment.
        let expected_header = Value::String("test".into());
        let expected_property = Value::Bool(true);
        for fragment in &fragments {
            assert_eq!(fragment.input.header("source"), Some(&expected_header));
            assert_eq!(fragment.property("trace"), Some(&expected_property));
        }
    }

    #[test]
    fn test_split_body_lines_empty() {
        // A default message has an empty (non-text) body.
        let parent = Exchange::new(Message::default());
        assert!(split_body_lines()(&parent).is_empty());
    }

    #[test]
    fn test_split_body_json_array() {
        let parent = Exchange::new(Message::new(serde_json::json!([1, 2, 3])));

        let fragments = split_body_json_array()(&parent);

        assert_eq!(fragments.len(), 3);
        for (fragment, expected) in fragments.iter().zip([1, 2, 3]) {
            assert!(matches!(
                &fragment.input.body,
                Body::Json(v) if *v == serde_json::json!(expected)
            ));
        }
    }

    #[test]
    fn test_split_body_json_array_not_array() {
        let parent = Exchange::new(Message::new(serde_json::json!({"not": "array"})));
        assert!(split_body_json_array()(&parent).is_empty());
    }

    #[test]
    fn test_split_body_custom() {
        let splitter = split_body(|body: &Body| {
            let Body::Text(csv) = body else {
                return Vec::new();
            };
            csv.split(',')
                .map(str::trim)
                .map(|part| Body::Text(part.to_string()))
                .collect()
        });

        let mut parent = Exchange::new(Message::new("x, y, z"));
        parent.set_property("id", Value::from(42));

        let fragments = splitter(&parent);

        assert_eq!(fragments.len(), 3);
        assert_eq!(
            text_bodies(&fragments),
            vec![Some("x"), Some("y"), Some("z")]
        );

        // Properties must be carried over to every fragment.
        let expected_id = Value::from(42);
        for fragment in &fragments {
            assert_eq!(fragment.property("id"), Some(&expected_id));
        }
    }

    #[test]
    fn test_splitter_config_defaults() {
        let SplitterConfig {
            aggregation,
            parallel,
            parallel_limit,
            stop_on_exception,
            ..
        } = SplitterConfig::new(split_body_lines());

        assert!(matches!(aggregation, AggregationStrategy::LastWins));
        assert!(!parallel);
        assert!(parallel_limit.is_none());
        assert!(stop_on_exception);
    }

    #[test]
    fn test_splitter_config_builder() {
        let SplitterConfig {
            aggregation,
            parallel,
            parallel_limit,
            stop_on_exception,
            ..
        } = SplitterConfig::new(split_body_lines())
            .aggregation(AggregationStrategy::CollectAll)
            .parallel(true)
            .parallel_limit(4)
            .stop_on_exception(false);

        assert!(matches!(aggregation, AggregationStrategy::CollectAll));
        assert!(parallel);
        assert_eq!(parallel_limit, Some(4));
        assert!(!stop_on_exception);
    }

    #[test]
    fn test_fragment_exchange_inherits_otel_context() {
        use opentelemetry::trace::TraceContextExt;

        // Parent exchange with a valid span context.
        let (parent, expected_trace_id) = parent_with_remote_span(
            "test",
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 123],
            [0, 0, 0, 0, 0, 0, 1, 200],
        );

        // Fragments are produced through split_body_lines.
        let fragments = split_body_lines()(&parent);
        assert!(!fragments.is_empty(), "Should have at least one fragment");

        // Every fragment must expose the parent's span context.
        for fragment in &fragments {
            let span = fragment.otel_context.span();
            let fragment_ctx = span.span_context();
            assert!(
                fragment_ctx.is_valid(),
                "Fragment should have valid span context"
            );
            assert_eq!(
                fragment_ctx.trace_id(),
                expected_trace_id,
                "Fragment should have same trace ID as parent"
            );
        }
    }

    #[test]
    fn test_stream_split_config_defaults_valid() {
        assert!(StreamSplitConfig::default().validate().is_ok());
    }

    #[test]
    fn test_stream_split_config_batch_size_zero_rejected() {
        let message = stream_config_error(StreamSplitConfig {
            batch_size: 0,
            ..Default::default()
        });
        assert!(message.contains("batch_size"));
    }

    #[test]
    fn test_stream_split_config_max_record_bytes_zero_rejected() {
        let message = stream_config_error(StreamSplitConfig {
            max_record_bytes: 0,
            ..Default::default()
        });
        assert!(message.contains("max_record_bytes"));
    }

    #[test]
    fn test_stream_split_config_chunks_requires_chunk_size() {
        let message = stream_config_error(StreamSplitConfig {
            format: StreamSplitFormat::Chunks,
            chunk_size: None,
            ..Default::default()
        });
        assert!(message.contains("Chunks requires chunk_size"));
    }

    #[test]
    fn test_stream_split_config_chunk_size_zero_rejected() {
        let message = stream_config_error(StreamSplitConfig {
            format: StreamSplitFormat::Chunks,
            chunk_size: Some(0),
            ..Default::default()
        });
        assert!(message.contains("chunk_size must be > 0"));
    }

    #[test]
    fn test_stream_split_config_chunk_size_exceeds_max_record_bytes() {
        let message = stream_config_error(StreamSplitConfig {
            format: StreamSplitFormat::Chunks,
            chunk_size: Some(2000),
            max_record_bytes: 1000,
            ..Default::default()
        });
        assert!(message.contains("chunk_size must be <= max_record_bytes"));
    }

    #[test]
    fn test_all_fragments_share_same_trace_context() {
        use opentelemetry::trace::TraceContextExt;

        // Parent with a specific trace ID.
        let (parent, trace_id) = parent_with_remote_span(
            "line1\nline2\nline3",
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x3B, 0x9A, 0xCA, 0x09],
            [0, 0, 0, 0, 0, 0, 0, 111],
        );

        let fragments = split_body_lines()(&parent);
        assert_eq!(fragments.len(), 3);

        // Every fragment must report the parent's trace ID.
        let all_match = fragments.iter().all(|fragment| {
            let span = fragment.otel_context.span();
            span.span_context().trace_id() == trace_id
        });
        assert!(all_match, "All fragments should have the same trace ID");
    }
}