1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::error::CamelError;
5use crate::exchange::Exchange;
6
7pub type AggregationFn = Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>;
9
10pub enum CorrelationStrategy {
12 HeaderName(String),
14 Expression { expr: String, language: String },
16 #[allow(clippy::type_complexity)]
18 Fn(Arc<dyn Fn(&Exchange) -> Option<String> + Send + Sync>),
19}
20
21impl Clone for CorrelationStrategy {
22 fn clone(&self) -> Self {
23 match self {
24 CorrelationStrategy::HeaderName(h) => CorrelationStrategy::HeaderName(h.clone()),
25 CorrelationStrategy::Expression { expr, language } => CorrelationStrategy::Expression {
26 expr: expr.clone(),
27 language: language.clone(),
28 },
29 CorrelationStrategy::Fn(f) => CorrelationStrategy::Fn(Arc::clone(f)),
30 }
31 }
32}
33
34impl std::fmt::Debug for CorrelationStrategy {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 CorrelationStrategy::HeaderName(h) => f.debug_tuple("HeaderName").field(h).finish(),
38 CorrelationStrategy::Expression { expr, language } => f
39 .debug_struct("Expression")
40 .field("expr", expr)
41 .field("language", language)
42 .finish(),
43 CorrelationStrategy::Fn(_) => f.write_str("Fn(..)"),
44 }
45 }
46}
47
48#[derive(Clone)]
50pub enum AggregationStrategy {
51 CollectAll,
53 Custom(AggregationFn),
55}
56
57#[derive(Clone)]
59pub enum CompletionCondition {
60 Size(usize),
62 #[allow(clippy::type_complexity)]
64 Predicate(Arc<dyn Fn(&[Exchange]) -> bool + Send + Sync>),
65 Timeout(Duration),
67}
68
69#[derive(Clone)]
72pub enum CompletionMode {
73 Single(CompletionCondition),
74 Any(Vec<CompletionCondition>),
75}
76
77#[derive(Debug, Clone, PartialEq, Eq)]
78pub enum CompletionReason {
79 Size,
80 Predicate,
81 Timeout,
82 Stop,
83}
84
85impl CompletionReason {
86 pub fn as_str(&self) -> &'static str {
87 match self {
88 CompletionReason::Size => "size",
89 CompletionReason::Predicate => "predicate",
90 CompletionReason::Timeout => "timeout",
91 CompletionReason::Stop => "stop",
92 }
93 }
94}
95
96#[derive(Clone)]
98pub struct AggregatorConfig {
99 pub header_name: String,
101 pub completion: CompletionMode,
103 pub correlation: CorrelationStrategy,
105 pub strategy: AggregationStrategy,
107 pub max_buckets: Option<usize>,
110 pub bucket_ttl: Option<Duration>,
113 pub force_completion_on_stop: bool,
115 pub discard_on_timeout: bool,
117}
118
119impl AggregatorConfig {
120 pub fn correlate_by(header: impl Into<String>) -> AggregatorConfigBuilder {
122 let header_name = header.into();
123 AggregatorConfigBuilder {
124 header_name: header_name.clone(),
125 completion: None,
126 correlation: CorrelationStrategy::HeaderName(header_name),
127 strategy: AggregationStrategy::CollectAll,
128 max_buckets: None,
129 bucket_ttl: None,
130 force_completion_on_stop: false,
131 discard_on_timeout: false,
132 }
133 }
134}
135
136pub struct AggregatorConfigBuilder {
138 header_name: String,
139 completion: Option<CompletionMode>,
140 correlation: CorrelationStrategy,
141 strategy: AggregationStrategy,
142 max_buckets: Option<usize>,
143 bucket_ttl: Option<Duration>,
144 force_completion_on_stop: bool,
145 discard_on_timeout: bool,
146}
147
148impl AggregatorConfigBuilder {
149 pub fn complete_when_size(mut self, n: usize) -> Self {
151 self.completion = Some(CompletionMode::Single(CompletionCondition::Size(n)));
152 self
153 }
154
155 pub fn complete_when<F>(mut self, predicate: F) -> Self
157 where
158 F: Fn(&[Exchange]) -> bool + Send + Sync + 'static,
159 {
160 self.completion = Some(CompletionMode::Single(CompletionCondition::Predicate(
161 Arc::new(predicate),
162 )));
163 self
164 }
165
166 pub fn complete_on_timeout(mut self, duration: Duration) -> Self {
168 self.completion = Some(CompletionMode::Single(CompletionCondition::Timeout(
169 duration,
170 )));
171 self
172 }
173
174 pub fn complete_on_size_or_timeout(mut self, size: usize, timeout: Duration) -> Self {
176 self.completion = Some(CompletionMode::Any(vec![
177 CompletionCondition::Size(size),
178 CompletionCondition::Timeout(timeout),
179 ]));
180 self
181 }
182
183 pub fn force_completion_on_stop(mut self, enabled: bool) -> Self {
185 self.force_completion_on_stop = enabled;
186 self
187 }
188
189 pub fn discard_on_timeout(mut self, enabled: bool) -> Self {
191 self.discard_on_timeout = enabled;
192 self
193 }
194
195 pub fn correlate_by(mut self, header: impl Into<String>) -> Self {
197 let header = header.into();
198 self.header_name = header.clone();
199 self.correlation = CorrelationStrategy::HeaderName(header);
200 self
201 }
202
203 pub fn strategy(mut self, strategy: AggregationStrategy) -> Self {
205 self.strategy = strategy;
206 self
207 }
208
209 pub fn max_buckets(mut self, max: usize) -> Self {
212 self.max_buckets = Some(max);
213 self
214 }
215
216 pub fn bucket_ttl(mut self, ttl: Duration) -> Self {
219 self.bucket_ttl = Some(ttl);
220 self
221 }
222
223 pub fn try_build(self) -> Result<AggregatorConfig, CamelError> {
224 let completion = self.completion.ok_or_else(|| {
225 CamelError::Config("completion condition required for AggregatorConfig".into())
226 })?;
227 Ok(AggregatorConfig {
228 header_name: self.header_name,
229 completion,
230 correlation: self.correlation,
231 strategy: self.strategy,
232 max_buckets: self.max_buckets,
233 bucket_ttl: self.bucket_ttl,
234 force_completion_on_stop: self.force_completion_on_stop,
235 discard_on_timeout: self.discard_on_timeout,
236 })
237 }
238
239 pub fn build(self) -> Result<AggregatorConfig, CamelError> {
241 self.try_build()
242 }
243}
244
245#[cfg(test)]
246mod tests {
247 use super::*;
248
249 #[test]
250 fn test_aggregator_config_complete_when_size() {
251 let config = AggregatorConfig::correlate_by("orderId")
252 .complete_when_size(3)
253 .build()
254 .unwrap();
255 assert_eq!(config.header_name, "orderId");
256 assert!(matches!(
257 config.completion,
258 CompletionMode::Single(CompletionCondition::Size(3))
259 ));
260 assert!(matches!(config.strategy, AggregationStrategy::CollectAll));
261 }
262
263 #[test]
264 fn test_aggregator_config_complete_when_predicate() {
265 let config = AggregatorConfig::correlate_by("key")
266 .complete_when(|bucket| bucket.len() >= 2)
267 .build()
268 .unwrap();
269 assert!(matches!(
270 config.completion,
271 CompletionMode::Single(CompletionCondition::Predicate(_))
272 ));
273 }
274
275 #[test]
276 fn test_aggregator_config_custom_strategy() {
277 use std::sync::Arc;
278 let f: AggregationFn = Arc::new(|acc, _next| acc);
279 let config = AggregatorConfig::correlate_by("key")
280 .complete_when_size(1)
281 .strategy(AggregationStrategy::Custom(f))
282 .build()
283 .unwrap();
284 assert!(matches!(config.strategy, AggregationStrategy::Custom(_)));
285 }
286
287 #[test]
288 fn test_aggregator_config_missing_completion_returns_err() {
289 let result = AggregatorConfig::correlate_by("key").build();
290 let err = match result {
291 Err(e) => e,
292 Ok(_) => panic!("expected error, got Ok"),
293 };
294 assert!(
295 err.to_string().contains("completion"),
296 "error message should mention 'completion': {err}"
297 );
298 }
299
300 #[test]
301 fn test_complete_on_size_or_timeout() {
302 let config = AggregatorConfig::correlate_by("key")
303 .complete_on_size_or_timeout(3, Duration::from_secs(5))
304 .build()
305 .unwrap();
306 assert!(matches!(config.completion, CompletionMode::Any(v) if v.len() == 2));
307 }
308
309 #[test]
310 fn test_force_completion_on_stop_default() {
311 let config = AggregatorConfig::correlate_by("key")
312 .complete_when_size(1)
313 .build()
314 .unwrap();
315 assert!(!config.force_completion_on_stop);
316 assert!(!config.discard_on_timeout);
317 }
318
319 #[test]
320 fn test_builder_sets_timeout_and_flags_and_limits() {
321 let config = AggregatorConfig::correlate_by("key")
322 .complete_on_timeout(Duration::from_secs(2))
323 .max_buckets(7)
324 .bucket_ttl(Duration::from_secs(10))
325 .force_completion_on_stop(true)
326 .discard_on_timeout(true)
327 .build()
328 .unwrap();
329
330 assert!(matches!(
331 config.completion,
332 CompletionMode::Single(CompletionCondition::Timeout(d)) if d == Duration::from_secs(2)
333 ));
334 assert_eq!(config.max_buckets, Some(7));
335 assert_eq!(config.bucket_ttl, Some(Duration::from_secs(10)));
336 assert!(config.force_completion_on_stop);
337 assert!(config.discard_on_timeout);
338 }
339
340 #[test]
341 fn test_builder_correlate_by_overrides_header_and_strategy() {
342 let config = AggregatorConfig::correlate_by("original")
343 .correlate_by("override")
344 .complete_when_size(1)
345 .build()
346 .unwrap();
347
348 assert_eq!(config.header_name, "override");
349 assert!(matches!(
350 config.correlation,
351 CorrelationStrategy::HeaderName(ref h) if h == "override"
352 ));
353 }
354
355 #[test]
356 fn test_completion_reason_as_str_all_variants() {
357 assert_eq!(CompletionReason::Size.as_str(), "size");
358 assert_eq!(CompletionReason::Predicate.as_str(), "predicate");
359 assert_eq!(CompletionReason::Timeout.as_str(), "timeout");
360 assert_eq!(CompletionReason::Stop.as_str(), "stop");
361 }
362
363 #[test]
364 fn test_correlation_strategy_clone_and_debug() {
365 let strategy = CorrelationStrategy::Expression {
366 expr: "${header.orderId}".to_string(),
367 language: "simple".to_string(),
368 };
369 let cloned = strategy.clone();
370 assert!(matches!(
371 cloned,
372 CorrelationStrategy::Expression { ref expr, ref language }
373 if expr == "${header.orderId}" && language == "simple"
374 ));
375
376 let f = CorrelationStrategy::Fn(Arc::new(|_| Some("k".to_string())));
377 assert_eq!(format!("{:?}", f), "Fn(..)");
378 }
379
380 #[test]
381 fn test_complete_on_size_or_timeout_contains_both_conditions() {
382 let config = AggregatorConfig::correlate_by("k")
383 .complete_on_size_or_timeout(4, Duration::from_millis(250))
384 .build()
385 .unwrap();
386
387 match config.completion {
388 CompletionMode::Any(conditions) => {
389 assert!(matches!(conditions[0], CompletionCondition::Size(4)));
390 assert!(matches!(
391 conditions[1],
392 CompletionCondition::Timeout(d) if d == Duration::from_millis(250)
393 ));
394 }
395 _ => panic!("expected CompletionMode::Any"),
396 }
397 }
398
399 #[test]
400 fn test_correlation_strategy_fn_clone_shares_same_arc() {
401 let f: Arc<dyn Fn(&Exchange) -> Option<String> + Send + Sync> =
402 Arc::new(|_| Some("shared".to_string()));
403 let strategy = CorrelationStrategy::Fn(f.clone());
404 let cloned = strategy.clone();
405
406 match cloned {
407 CorrelationStrategy::Fn(cloned_fn) => assert!(Arc::ptr_eq(&f, &cloned_fn)),
408 _ => panic!("expected fn strategy"),
409 }
410 }
411
412 #[test]
413 fn test_builder_correlate_by_overrides_previous() {
414 let config = AggregatorConfig::correlate_by("first")
415 .correlate_by("second")
416 .complete_when_size(2)
417 .build()
418 .unwrap();
419
420 assert_eq!(config.header_name, "second");
421 assert!(
422 matches!(config.correlation, CorrelationStrategy::HeaderName(ref h) if h == "second")
423 );
424 }
425
426 #[test]
427 fn test_aggregator_try_build_missing_completion_returns_error() {
428 let result = AggregatorConfig::correlate_by("key").try_build();
429 assert!(result.is_err());
430 }
431}