1use std::pin::Pin;
2use std::sync::Arc;
3
4use futures::Stream;
5
6use crate::body::Body;
7use crate::error::CamelError;
8use crate::exchange::Exchange;
9use crate::message::Message;
10
11pub type SplitExpression = Arc<dyn Fn(&Exchange) -> Vec<Exchange> + Send + Sync>;
13
14pub type StreamingSplitExpression = Arc<
21 dyn Fn(Exchange) -> Pin<Box<dyn Stream<Item = Result<Exchange, CamelError>> + Send>>
22 + Send
23 + Sync,
24>;
25
26#[derive(Clone, Default)]
28pub enum AggregationStrategy {
29 #[default]
31 LastWins,
32 CollectAll,
34 Original,
36 Custom(Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>),
38}
39
40#[derive(
42 Clone,
43 Debug,
44 Default,
45 PartialEq,
46 Eq,
47 serde::Serialize,
48 serde::Deserialize,
49 schemars::JsonSchema,
50 ts_rs::TS,
51)]
52#[serde(rename_all = "snake_case")]
53#[ts(rename_all = "snake_case")]
54pub enum StreamSplitFormat {
55 #[default]
57 Auto,
58 Ndjson,
60 Lines,
62 Chunks,
64}
65
66#[derive(
71 Clone,
72 Debug,
73 PartialEq,
74 Eq,
75 serde::Serialize,
76 serde::Deserialize,
77 schemars::JsonSchema,
78 ts_rs::TS,
79)]
80#[serde(rename_all = "snake_case")]
81#[ts(rename_all = "snake_case")]
82pub struct StreamSplitConfig {
83 pub format: StreamSplitFormat,
85 pub max_record_bytes: usize,
87 pub batch_size: usize,
89 pub chunk_size: Option<usize>,
91 pub include_origin: bool,
93}
94
95impl Default for StreamSplitConfig {
96 fn default() -> Self {
97 Self {
98 format: StreamSplitFormat::Auto,
99 max_record_bytes: 1024 * 1024,
100 batch_size: 1,
101 chunk_size: None,
102 include_origin: true,
103 }
104 }
105}
106
107impl StreamSplitConfig {
108 pub fn validate(&self) -> Result<(), CamelError> {
118 if self.batch_size == 0 {
119 return Err(CamelError::Config(
120 "stream split batch_size must be > 0".into(),
121 ));
122 }
123 if self.max_record_bytes == 0 {
124 return Err(CamelError::Config(
125 "stream split max_record_bytes must be > 0".into(),
126 ));
127 }
128 if self.format == StreamSplitFormat::Chunks && self.chunk_size.is_none() {
129 return Err(CamelError::Config(
130 "stream split format=Chunks requires chunk_size".into(),
131 ));
132 }
133 if let Some(cs) = self.chunk_size
134 && cs == 0
135 {
136 return Err(CamelError::Config(
137 "stream split chunk_size must be > 0".into(),
138 ));
139 }
140 if self.format == StreamSplitFormat::Chunks
141 && let Some(cs) = self.chunk_size
142 && cs > self.max_record_bytes
143 {
144 return Err(CamelError::Config(
145 "stream split chunk_size must be <= max_record_bytes".into(),
146 ));
147 }
148 Ok(())
149 }
150}
151
152pub struct SplitterConfig {
154 pub expression: SplitExpression,
156 pub aggregation: AggregationStrategy,
158 pub parallel: bool,
160 pub parallel_limit: Option<usize>,
162 pub stop_on_exception: bool,
168}
169
170impl SplitterConfig {
171 pub fn new(expression: SplitExpression) -> Self {
173 Self {
174 expression,
175 aggregation: AggregationStrategy::default(),
176 parallel: false,
177 parallel_limit: None,
178 stop_on_exception: true,
179 }
180 }
181
182 pub fn aggregation(mut self, strategy: AggregationStrategy) -> Self {
184 self.aggregation = strategy;
185 self
186 }
187
188 pub fn parallel(mut self, parallel: bool) -> Self {
190 self.parallel = parallel;
191 self
192 }
193
194 pub fn parallel_limit(mut self, limit: usize) -> Self {
196 self.parallel_limit = Some(limit);
197 self
198 }
199
200 pub fn stop_on_exception(mut self, stop: bool) -> Self {
205 self.stop_on_exception = stop;
206 self
207 }
208
209 pub fn validate(&self) -> Result<(), CamelError> {
214 if self.parallel && self.parallel_limit == Some(0) {
215 return Err(CamelError::Config(
216 "splitter parallel_limit must be > 0".to_string(),
217 ));
218 }
219 Ok(())
220 }
221}
222
223pub fn fragment_exchange(parent: &Exchange, body: Body) -> Exchange {
248 let mut msg = Message::new(body);
249 msg.headers = parent.input.headers.clone();
250 let mut ex = Exchange::new(msg);
251 ex.properties = parent.properties.clone();
252 ex.pattern = parent.pattern;
253 ex.otel_context = parent.otel_context.clone();
255 ex
256}
257
258pub fn split_body_lines() -> SplitExpression {
261 Arc::new(|exchange: &Exchange| {
262 let text = match &exchange.input.body {
263 Body::Text(s) => s.as_str(),
264 _ => return Vec::new(),
265 };
266 text.lines()
267 .map(|line| fragment_exchange(exchange, Body::Text(line.to_string())))
268 .collect()
269 })
270}
271
272pub fn split_body_json_array() -> SplitExpression {
275 Arc::new(|exchange: &Exchange| {
276 let arr = match &exchange.input.body {
277 Body::Json(serde_json::Value::Array(arr)) => arr,
278 _ => return Vec::new(),
279 };
280 arr.iter()
281 .map(|val| fragment_exchange(exchange, Body::Json(val.clone())))
282 .collect()
283 })
284}
285
286pub fn split_body<F>(f: F) -> SplitExpression
288where
289 F: Fn(&Body) -> Vec<Body> + Send + Sync + 'static,
290{
291 Arc::new(move |exchange: &Exchange| {
292 f(&exchange.input.body)
293 .into_iter()
294 .map(|body| fragment_exchange(exchange, body))
295 .collect()
296 })
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::Value;

    use opentelemetry::Context;
    use opentelemetry::trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId};

    /// Builds a context carrying a sampled, remote span context with the given ids.
    fn remote_context(trace_id: TraceId, span_id: SpanId) -> Context {
        let span_context = SpanContext::new(
            trace_id,
            span_id,
            TraceFlags::SAMPLED,
            true,
            Default::default(),
        );
        Context::current().with_remote_span_context(span_context)
    }

    /// Validates `config`, expecting a failure, and returns the error text.
    fn validation_error(config: StreamSplitConfig) -> String {
        config.validate().unwrap_err().to_string()
    }

    #[test]
    fn test_split_body_lines() {
        let mut parent = Exchange::new(Message::new("a\nb\nc"));
        parent
            .input
            .set_header("source", Value::String("test".into()));
        parent.set_property("trace", Value::Bool(true));

        let fragments = split_body_lines()(&parent);
        let bodies: Vec<_> = fragments
            .iter()
            .map(|frag| frag.input.body.as_text())
            .collect();
        assert_eq!(bodies, vec![Some("a"), Some("b"), Some("c")]);

        // Headers and properties must be inherited by every fragment.
        let expected_source = Value::String("test".into());
        let expected_trace = Value::Bool(true);
        for frag in &fragments {
            assert_eq!(frag.input.header("source"), Some(&expected_source));
            assert_eq!(frag.property("trace"), Some(&expected_trace));
        }
    }

    #[test]
    fn test_split_body_lines_empty() {
        let ex = Exchange::new(Message::default());
        let fragments = split_body_lines()(&ex);
        assert!(fragments.is_empty());
    }

    #[test]
    fn test_split_body_json_array() {
        let ex = Exchange::new(Message::new(serde_json::json!([1, 2, 3])));

        let fragments = split_body_json_array()(&ex);
        assert_eq!(fragments.len(), 3);
        for (frag, expected) in fragments.iter().zip([1, 2, 3]) {
            assert!(
                matches!(&frag.input.body, Body::Json(v) if *v == serde_json::json!(expected))
            );
        }
    }

    #[test]
    fn test_split_body_json_array_not_array() {
        let ex = Exchange::new(Message::new(serde_json::json!({"not": "array"})));

        assert!(split_body_json_array()(&ex).is_empty());
    }

    #[test]
    fn test_split_body_custom() {
        // Comma-separated splitter that trims each part.
        let splitter = split_body(|body: &Body| {
            let Body::Text(text) = body else {
                return Vec::new();
            };
            text.split(',')
                .map(|part| Body::Text(part.trim().to_string()))
                .collect()
        });

        let mut parent = Exchange::new(Message::new("x, y, z"));
        parent.set_property("id", Value::from(42));

        let fragments = splitter(&parent);
        let bodies: Vec<_> = fragments
            .iter()
            .map(|frag| frag.input.body.as_text())
            .collect();
        assert_eq!(bodies, vec![Some("x"), Some("y"), Some("z")]);

        let expected_id = Value::from(42);
        for frag in &fragments {
            assert_eq!(frag.property("id"), Some(&expected_id));
        }
    }

    #[test]
    fn test_splitter_config_defaults() {
        let SplitterConfig {
            aggregation,
            parallel,
            parallel_limit,
            stop_on_exception,
            ..
        } = SplitterConfig::new(split_body_lines());

        assert!(matches!(aggregation, AggregationStrategy::LastWins));
        assert!(!parallel);
        assert!(parallel_limit.is_none());
        assert!(stop_on_exception);
    }

    #[test]
    fn test_splitter_config_builder() {
        let SplitterConfig {
            aggregation,
            parallel,
            parallel_limit,
            stop_on_exception,
            ..
        } = SplitterConfig::new(split_body_lines())
            .aggregation(AggregationStrategy::CollectAll)
            .parallel(true)
            .parallel_limit(4)
            .stop_on_exception(false);

        assert!(matches!(aggregation, AggregationStrategy::CollectAll));
        assert!(parallel);
        assert_eq!(parallel_limit, Some(4));
        assert!(!stop_on_exception);
    }

    #[test]
    fn test_fragment_exchange_inherits_otel_context() {
        let expected_trace_id =
            TraceId::from_bytes([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 123]);
        let span_id = SpanId::from_bytes([0, 0, 0, 0, 0, 0, 1, 200]);

        let mut parent = Exchange::new(Message::new("test"));
        parent.otel_context = remote_context(expected_trace_id, span_id);

        let fragments = split_body_lines()(&parent);
        assert!(!fragments.is_empty(), "Should have at least one fragment");

        for fragment in &fragments {
            let span = fragment.otel_context.span();
            let frag_span_ctx = span.span_context();
            assert!(
                frag_span_ctx.is_valid(),
                "Fragment should have valid span context"
            );
            assert_eq!(
                frag_span_ctx.trace_id(),
                expected_trace_id,
                "Fragment should have same trace ID as parent"
            );
        }
    }

    #[test]
    fn test_stream_split_config_defaults_valid() {
        assert!(StreamSplitConfig::default().validate().is_ok());
    }

    #[test]
    fn test_stream_split_config_batch_size_zero_rejected() {
        let message = validation_error(StreamSplitConfig {
            batch_size: 0,
            ..Default::default()
        });
        assert!(message.contains("batch_size"));
    }

    #[test]
    fn test_stream_split_config_max_record_bytes_zero_rejected() {
        let message = validation_error(StreamSplitConfig {
            max_record_bytes: 0,
            ..Default::default()
        });
        assert!(message.contains("max_record_bytes"));
    }

    #[test]
    fn test_stream_split_config_chunks_requires_chunk_size() {
        let message = validation_error(StreamSplitConfig {
            format: StreamSplitFormat::Chunks,
            chunk_size: None,
            ..Default::default()
        });
        assert!(message.contains("Chunks requires chunk_size"));
    }

    #[test]
    fn test_stream_split_config_chunk_size_zero_rejected() {
        let message = validation_error(StreamSplitConfig {
            format: StreamSplitFormat::Chunks,
            chunk_size: Some(0),
            ..Default::default()
        });
        assert!(message.contains("chunk_size must be > 0"));
    }

    #[test]
    fn test_stream_split_config_chunk_size_exceeds_max_record_bytes() {
        let message = validation_error(StreamSplitConfig {
            format: StreamSplitFormat::Chunks,
            chunk_size: Some(2000),
            max_record_bytes: 1000,
            ..Default::default()
        });
        assert!(message.contains("chunk_size must be <= max_record_bytes"));
    }

    #[test]
    fn test_all_fragments_share_same_trace_context() {
        let trace_id =
            TraceId::from_bytes([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0x3B, 0x9A, 0xCA, 0x09]);
        let span_id = SpanId::from_bytes([0, 0, 0, 0, 0, 0, 0, 111]);

        let mut parent = Exchange::new(Message::new("line1\nline2\nline3"));
        parent.otel_context = remote_context(trace_id, span_id);

        let fragments = split_body_lines()(&parent);
        assert_eq!(fragments.len(), 3);

        let trace_ids: Vec<_> = fragments
            .iter()
            .map(|frag| frag.otel_context.span().span_context().trace_id())
            .collect();

        assert!(
            trace_ids.iter().all(|&id| id == trace_id),
            "All fragments should have the same trace ID"
        );
    }
}