1use std::sync::Arc;
2
3use crate::body::Body;
4use crate::exchange::Exchange;
5use crate::message::Message;
6
7pub type SplitExpression = Arc<dyn Fn(&Exchange) -> Vec<Exchange> + Send + Sync>;
9
10#[derive(Clone, Default)]
12pub enum AggregationStrategy {
13 #[default]
15 LastWins,
16 CollectAll,
18 Original,
20 Custom(Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>),
22}
23
24pub struct SplitterConfig {
26 pub expression: SplitExpression,
28 pub aggregation: AggregationStrategy,
30 pub parallel: bool,
32 pub parallel_limit: Option<usize>,
34 pub stop_on_exception: bool,
40}
41
42impl SplitterConfig {
43 pub fn new(expression: SplitExpression) -> Self {
45 Self {
46 expression,
47 aggregation: AggregationStrategy::default(),
48 parallel: false,
49 parallel_limit: None,
50 stop_on_exception: true,
51 }
52 }
53
54 pub fn aggregation(mut self, strategy: AggregationStrategy) -> Self {
56 self.aggregation = strategy;
57 self
58 }
59
60 pub fn parallel(mut self, parallel: bool) -> Self {
62 self.parallel = parallel;
63 self
64 }
65
66 pub fn parallel_limit(mut self, limit: usize) -> Self {
68 self.parallel_limit = Some(limit);
69 self
70 }
71
72 pub fn stop_on_exception(mut self, stop: bool) -> Self {
77 self.stop_on_exception = stop;
78 self
79 }
80}
81
82fn fragment_exchange(parent: &Exchange, body: Body) -> Exchange {
89 let mut msg = Message::new(body);
90 msg.headers = parent.input.headers.clone();
91 let mut ex = Exchange::new(msg);
92 ex.properties = parent.properties.clone();
93 ex.pattern = parent.pattern;
94 ex
95}
96
97pub fn split_body_lines() -> SplitExpression {
100 Arc::new(|exchange: &Exchange| {
101 let text = match &exchange.input.body {
102 Body::Text(s) => s.as_str(),
103 _ => return Vec::new(),
104 };
105 text.lines()
106 .map(|line| fragment_exchange(exchange, Body::Text(line.to_string())))
107 .collect()
108 })
109}
110
111pub fn split_body_json_array() -> SplitExpression {
114 Arc::new(|exchange: &Exchange| {
115 let arr = match &exchange.input.body {
116 Body::Json(serde_json::Value::Array(arr)) => arr,
117 _ => return Vec::new(),
118 };
119 arr.iter()
120 .map(|val| fragment_exchange(exchange, Body::Json(val.clone())))
121 .collect()
122 })
123}
124
125pub fn split_body<F>(f: F) -> SplitExpression
127where
128 F: Fn(&Body) -> Vec<Body> + Send + Sync + 'static,
129{
130 Arc::new(move |exchange: &Exchange| {
131 f(&exchange.input.body)
132 .into_iter()
133 .map(|body| fragment_exchange(exchange, body))
134 .collect()
135 })
136}
137
138#[cfg(test)]
139mod tests {
140 use super::*;
141 use crate::value::Value;
142
143 #[test]
144 fn test_split_body_lines() {
145 let mut ex = Exchange::new(Message::new("a\nb\nc"));
146 ex.input.set_header("source", Value::String("test".into()));
147 ex.set_property("trace", Value::Bool(true));
148
149 let fragments = split_body_lines()(&ex);
150 assert_eq!(fragments.len(), 3);
151 assert_eq!(fragments[0].input.body.as_text(), Some("a"));
152 assert_eq!(fragments[1].input.body.as_text(), Some("b"));
153 assert_eq!(fragments[2].input.body.as_text(), Some("c"));
154
155 for frag in &fragments {
157 assert_eq!(
158 frag.input.header("source"),
159 Some(&Value::String("test".into()))
160 );
161 assert_eq!(frag.property("trace"), Some(&Value::Bool(true)));
162 }
163 }
164
165 #[test]
166 fn test_split_body_lines_empty() {
167 let ex = Exchange::new(Message::default()); let fragments = split_body_lines()(&ex);
169 assert!(fragments.is_empty());
170 }
171
172 #[test]
173 fn test_split_body_json_array() {
174 let arr = serde_json::json!([1, 2, 3]);
175 let ex = Exchange::new(Message::new(arr));
176
177 let fragments = split_body_json_array()(&ex);
178 assert_eq!(fragments.len(), 3);
179 assert!(matches!(&fragments[0].input.body, Body::Json(v) if *v == serde_json::json!(1)));
180 assert!(matches!(&fragments[1].input.body, Body::Json(v) if *v == serde_json::json!(2)));
181 assert!(matches!(&fragments[2].input.body, Body::Json(v) if *v == serde_json::json!(3)));
182 }
183
184 #[test]
185 fn test_split_body_json_array_not_array() {
186 let obj = serde_json::json!({"not": "array"});
187 let ex = Exchange::new(Message::new(obj));
188
189 let fragments = split_body_json_array()(&ex);
190 assert!(fragments.is_empty());
191 }
192
193 #[test]
194 fn test_split_body_custom() {
195 let splitter = split_body(|body: &Body| match body {
196 Body::Text(s) => s
197 .split(',')
198 .map(|part| Body::Text(part.trim().to_string()))
199 .collect(),
200 _ => Vec::new(),
201 });
202
203 let mut ex = Exchange::new(Message::new("x, y, z"));
204 ex.set_property("id", Value::from(42));
205
206 let fragments = splitter(&ex);
207 assert_eq!(fragments.len(), 3);
208 assert_eq!(fragments[0].input.body.as_text(), Some("x"));
209 assert_eq!(fragments[1].input.body.as_text(), Some("y"));
210 assert_eq!(fragments[2].input.body.as_text(), Some("z"));
211
212 for frag in &fragments {
214 assert_eq!(frag.property("id"), Some(&Value::from(42)));
215 }
216 }
217
218 #[test]
219 fn test_splitter_config_defaults() {
220 let config = SplitterConfig::new(split_body_lines());
221 assert!(matches!(config.aggregation, AggregationStrategy::LastWins));
222 assert!(!config.parallel);
223 assert!(config.parallel_limit.is_none());
224 assert!(config.stop_on_exception);
225 }
226
227 #[test]
228 fn test_splitter_config_builder() {
229 let config = SplitterConfig::new(split_body_lines())
230 .aggregation(AggregationStrategy::CollectAll)
231 .parallel(true)
232 .parallel_limit(4)
233 .stop_on_exception(false);
234
235 assert!(matches!(
236 config.aggregation,
237 AggregationStrategy::CollectAll
238 ));
239 assert!(config.parallel);
240 assert_eq!(config.parallel_limit, Some(4));
241 assert!(!config.stop_on_exception);
242 }
243}