//! `camel_api/splitter.rs` — Splitter EIP configuration, aggregation
//! strategies, and ready-made split expressions.

use std::fmt;
use std::sync::Arc;

use crate::body::Body;
use crate::exchange::Exchange;
use crate::message::Message;
6
/// A function that splits a single exchange into multiple fragment exchanges.
///
/// The callable borrows the parent [`Exchange`] and returns the fragments as
/// owned exchanges. It is stored behind an [`Arc`] and bounded by
/// `Send + Sync`, so one expression can be cloned cheaply and shared.
pub type SplitExpression = Arc<dyn Fn(&Exchange) -> Vec<Exchange> + Send + Sync>;
9
10/// Strategy for aggregating fragment results back into a single exchange.
11#[derive(Clone, Default)]
12pub enum AggregationStrategy {
13    /// Result is the last fragment's exchange (default).
14    #[default]
15    LastWins,
16    /// Collects all fragment bodies into a JSON array.
17    CollectAll,
18    /// Returns the original exchange unchanged.
19    Original,
20    /// Custom aggregation function: `(accumulated, next) -> merged`.
21    Custom(Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>),
22}
23
24/// Configuration for the Splitter EIP.
25pub struct SplitterConfig {
26    /// Expression that splits an exchange into fragments.
27    pub expression: SplitExpression,
28    /// How to aggregate fragment results.
29    pub aggregation: AggregationStrategy,
30    /// Whether to process fragments in parallel.
31    pub parallel: bool,
32    /// Maximum number of parallel fragments (None = unlimited).
33    pub parallel_limit: Option<usize>,
34    /// Whether to stop processing on the first exception.
35    ///
36    /// In parallel mode this only affects aggregation (the first error is
37    /// propagated), **not** in-flight futures — `join_all` cannot cancel
38    /// already-spawned work.
39    pub stop_on_exception: bool,
40}
41
42impl SplitterConfig {
43    /// Create a new splitter config with the given split expression.
44    pub fn new(expression: SplitExpression) -> Self {
45        Self {
46            expression,
47            aggregation: AggregationStrategy::default(),
48            parallel: false,
49            parallel_limit: None,
50            stop_on_exception: true,
51        }
52    }
53
54    /// Set the aggregation strategy for combining fragment results.
55    pub fn aggregation(mut self, strategy: AggregationStrategy) -> Self {
56        self.aggregation = strategy;
57        self
58    }
59
60    /// Enable or disable parallel fragment processing.
61    pub fn parallel(mut self, parallel: bool) -> Self {
62        self.parallel = parallel;
63        self
64    }
65
66    /// Set the maximum number of concurrent fragments in parallel mode.
67    pub fn parallel_limit(mut self, limit: usize) -> Self {
68        self.parallel_limit = Some(limit);
69        self
70    }
71
72    /// Control whether processing stops on the first fragment error.
73    ///
74    /// In parallel mode this only affects aggregation — see the field-level
75    /// doc comment for details.
76    pub fn stop_on_exception(mut self, stop: bool) -> Self {
77        self.stop_on_exception = stop;
78        self
79    }
80}
81
82// ---------------------------------------------------------------------------
83// Helpers
84// ---------------------------------------------------------------------------
85
86/// Create a fragment exchange that inherits headers and properties from the
87/// parent, but with a new body.
88fn fragment_exchange(parent: &Exchange, body: Body) -> Exchange {
89    let mut msg = Message::new(body);
90    msg.headers = parent.input.headers.clone();
91    let mut ex = Exchange::new(msg);
92    ex.properties = parent.properties.clone();
93    ex.pattern = parent.pattern;
94    ex
95}
96
97/// Split the exchange body by newlines. Returns one fragment per line.
98/// Non-text bodies produce an empty vec.
99pub fn split_body_lines() -> SplitExpression {
100    Arc::new(|exchange: &Exchange| {
101        let text = match &exchange.input.body {
102            Body::Text(s) => s.as_str(),
103            _ => return Vec::new(),
104        };
105        text.lines()
106            .map(|line| fragment_exchange(exchange, Body::Text(line.to_string())))
107            .collect()
108    })
109}
110
111/// Split a JSON array body into one fragment per element.
112/// Non-array bodies produce an empty vec.
113pub fn split_body_json_array() -> SplitExpression {
114    Arc::new(|exchange: &Exchange| {
115        let arr = match &exchange.input.body {
116            Body::Json(serde_json::Value::Array(arr)) => arr,
117            _ => return Vec::new(),
118        };
119        arr.iter()
120            .map(|val| fragment_exchange(exchange, Body::Json(val.clone())))
121            .collect()
122    })
123}
124
125/// Split the exchange body using a custom function that operates on the body.
126pub fn split_body<F>(f: F) -> SplitExpression
127where
128    F: Fn(&Body) -> Vec<Body> + Send + Sync + 'static,
129{
130    Arc::new(move |exchange: &Exchange| {
131        f(&exchange.input.body)
132            .into_iter()
133            .map(|body| fragment_exchange(exchange, body))
134            .collect()
135    })
136}
137
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value::Value;

    /// Assert that `fragments` carry exactly the text bodies in `expected`,
    /// in the same order.
    fn assert_text_bodies(fragments: &[Exchange], expected: &[&str]) {
        assert_eq!(fragments.len(), expected.len());
        for (fragment, want) in fragments.iter().zip(expected) {
            assert_eq!(fragment.input.body.as_text(), Some(*want));
        }
    }

    #[test]
    fn test_split_body_lines() {
        let mut parent = Exchange::new(Message::new("a\nb\nc"));
        parent.input.set_header("source", Value::String("test".into()));
        parent.set_property("trace", Value::Bool(true));

        let split = split_body_lines();
        let fragments = split(&parent);
        assert_text_bodies(&fragments, &["a", "b", "c"]);

        // Every fragment must carry the parent's header and property.
        let expected_header = Value::String("test".into());
        let expected_property = Value::Bool(true);
        for fragment in &fragments {
            assert_eq!(fragment.input.header("source"), Some(&expected_header));
            assert_eq!(fragment.property("trace"), Some(&expected_property));
        }
    }

    #[test]
    fn test_split_body_lines_empty() {
        // `Message::default()` carries `Body::Empty`, which is not text.
        let parent = Exchange::new(Message::default());
        let split = split_body_lines();
        assert!(split(&parent).is_empty());
    }

    #[test]
    fn test_split_body_json_array() {
        let parent = Exchange::new(Message::new(serde_json::json!([1, 2, 3])));

        let split = split_body_json_array();
        let fragments = split(&parent);

        let expected = [
            serde_json::json!(1),
            serde_json::json!(2),
            serde_json::json!(3),
        ];
        assert_eq!(fragments.len(), expected.len());
        for (fragment, want) in fragments.iter().zip(&expected) {
            assert!(matches!(&fragment.input.body, Body::Json(got) if got == want));
        }
    }

    #[test]
    fn test_split_body_json_array_not_array() {
        let parent = Exchange::new(Message::new(serde_json::json!({"not": "array"})));

        let split = split_body_json_array();
        assert!(split(&parent).is_empty());
    }

    #[test]
    fn test_split_body_custom() {
        // Comma-separated splitter that trims each part.
        let splitter = split_body(|body: &Body| {
            if let Body::Text(text) = body {
                text.split(',')
                    .map(|part| Body::Text(part.trim().to_string()))
                    .collect()
            } else {
                Vec::new()
            }
        });

        let mut parent = Exchange::new(Message::new("x, y, z"));
        parent.set_property("id", Value::from(42));

        let fragments = splitter(&parent);
        assert_text_bodies(&fragments, &["x", "y", "z"]);

        // Properties are inherited by every fragment.
        let expected_id = Value::from(42);
        for fragment in &fragments {
            assert_eq!(fragment.property("id"), Some(&expected_id));
        }
    }

    #[test]
    fn test_splitter_config_defaults() {
        let SplitterConfig {
            aggregation,
            parallel,
            parallel_limit,
            stop_on_exception,
            ..
        } = SplitterConfig::new(split_body_lines());

        assert!(matches!(aggregation, AggregationStrategy::LastWins));
        assert!(!parallel);
        assert!(parallel_limit.is_none());
        assert!(stop_on_exception);
    }

    #[test]
    fn test_splitter_config_builder() {
        let SplitterConfig {
            aggregation,
            parallel,
            parallel_limit,
            stop_on_exception,
            ..
        } = SplitterConfig::new(split_body_lines())
            .aggregation(AggregationStrategy::CollectAll)
            .parallel(true)
            .parallel_limit(4)
            .stop_on_exception(false);

        assert!(matches!(aggregation, AggregationStrategy::CollectAll));
        assert!(parallel);
        assert_eq!(parallel_limit, Some(4));
        assert!(!stop_on_exception);
    }
}