1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::error::CamelError;
5use crate::exchange::Exchange;
6
7pub type AggregationFn = Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>;
9
10pub enum CorrelationStrategy {
12 HeaderName(String),
14 Expression { expr: String, language: String },
16 #[allow(clippy::type_complexity)]
18 Fn(Arc<dyn Fn(&Exchange) -> Option<String> + Send + Sync>),
19}
20
21impl Clone for CorrelationStrategy {
22 fn clone(&self) -> Self {
23 match self {
24 CorrelationStrategy::HeaderName(h) => CorrelationStrategy::HeaderName(h.clone()),
25 CorrelationStrategy::Expression { expr, language } => CorrelationStrategy::Expression {
26 expr: expr.clone(),
27 language: language.clone(),
28 },
29 CorrelationStrategy::Fn(f) => CorrelationStrategy::Fn(Arc::clone(f)),
30 }
31 }
32}
33
34impl std::fmt::Debug for CorrelationStrategy {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 CorrelationStrategy::HeaderName(h) => f.debug_tuple("HeaderName").field(h).finish(),
38 CorrelationStrategy::Expression { expr, language } => f
39 .debug_struct("Expression")
40 .field("expr", expr)
41 .field("language", language)
42 .finish(),
43 CorrelationStrategy::Fn(_) => f.write_str("Fn(..)"),
44 }
45 }
46}
47
48#[derive(Clone)]
50pub enum AggregationStrategy {
51 CollectAll,
53 Custom(AggregationFn),
55}
56
57#[derive(Clone)]
59pub enum CompletionCondition {
60 Size(usize),
62 #[allow(clippy::type_complexity)]
64 Predicate(Arc<dyn Fn(&[Exchange]) -> bool + Send + Sync>),
65 Timeout(Duration),
67}
68
69#[derive(Clone)]
72pub enum CompletionMode {
73 Single(CompletionCondition),
74 Any(Vec<CompletionCondition>),
75}
76
/// Label for why a bucket was completed.
///
/// The first three variants share their names with the variants of
/// `CompletionCondition`. The code that assigns a reason lives outside this file.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CompletionReason {
    /// Rendered as `"size"` by [`CompletionReason::as_str`].
    Size,
    /// Rendered as `"predicate"` by [`CompletionReason::as_str`].
    Predicate,
    /// Rendered as `"timeout"` by [`CompletionReason::as_str`].
    Timeout,
    /// Rendered as `"stop"` by [`CompletionReason::as_str`].
    /// NOTE(review): presumably tied to `force_completion_on_stop` — confirm.
    Stop,
}
84
85impl CompletionReason {
86 pub fn as_str(&self) -> &'static str {
87 match self {
88 CompletionReason::Size => "size",
89 CompletionReason::Predicate => "predicate",
90 CompletionReason::Timeout => "timeout",
91 CompletionReason::Stop => "stop",
92 }
93 }
94}
95
96#[derive(Clone)]
98pub struct AggregatorConfig {
99 pub header_name: String,
101 pub completion: CompletionMode,
103 pub correlation: CorrelationStrategy,
105 pub strategy: AggregationStrategy,
107 pub max_buckets: Option<usize>,
110 pub bucket_ttl: Option<Duration>,
113 pub force_completion_on_stop: bool,
115 pub discard_on_timeout: bool,
117}
118
119impl AggregatorConfig {
120 pub fn correlate_by(header: impl Into<String>) -> AggregatorConfigBuilder {
122 let header_name = header.into();
123 AggregatorConfigBuilder {
124 header_name: header_name.clone(),
125 completion: None,
126 correlation: CorrelationStrategy::HeaderName(header_name),
127 strategy: AggregationStrategy::CollectAll,
128 max_buckets: None,
129 bucket_ttl: None,
130 force_completion_on_stop: false,
131 discard_on_timeout: false,
132 }
133 }
134}
135
136pub struct AggregatorConfigBuilder {
138 header_name: String,
139 completion: Option<CompletionMode>,
140 correlation: CorrelationStrategy,
141 strategy: AggregationStrategy,
142 max_buckets: Option<usize>,
143 bucket_ttl: Option<Duration>,
144 force_completion_on_stop: bool,
145 discard_on_timeout: bool,
146}
147
148impl AggregatorConfigBuilder {
149 pub fn complete_when_size(mut self, n: usize) -> Self {
151 self.completion = Some(CompletionMode::Single(CompletionCondition::Size(n)));
152 self
153 }
154
155 pub fn complete_when<F>(mut self, predicate: F) -> Self
157 where
158 F: Fn(&[Exchange]) -> bool + Send + Sync + 'static,
159 {
160 self.completion = Some(CompletionMode::Single(CompletionCondition::Predicate(
161 Arc::new(predicate),
162 )));
163 self
164 }
165
166 pub fn complete_on_timeout(mut self, duration: Duration) -> Self {
168 self.completion = Some(CompletionMode::Single(CompletionCondition::Timeout(
169 duration,
170 )));
171 self
172 }
173
174 pub fn complete_on_size_or_timeout(mut self, size: usize, timeout: Duration) -> Self {
176 self.completion = Some(CompletionMode::Any(vec![
177 CompletionCondition::Size(size),
178 CompletionCondition::Timeout(timeout),
179 ]));
180 self
181 }
182
183 pub fn force_completion_on_stop(mut self, enabled: bool) -> Self {
185 self.force_completion_on_stop = enabled;
186 self
187 }
188
189 pub fn discard_on_timeout(mut self, enabled: bool) -> Self {
191 self.discard_on_timeout = enabled;
192 self
193 }
194
195 pub fn correlate_by(mut self, header: impl Into<String>) -> Self {
197 let header = header.into();
198 self.header_name = header.clone();
199 self.correlation = CorrelationStrategy::HeaderName(header);
200 self
201 }
202
203 pub fn strategy(mut self, strategy: AggregationStrategy) -> Self {
205 self.strategy = strategy;
206 self
207 }
208
209 pub fn max_buckets(mut self, max: usize) -> Self {
212 self.max_buckets = Some(max);
213 self
214 }
215
216 pub fn bucket_ttl(mut self, ttl: Duration) -> Self {
219 self.bucket_ttl = Some(ttl);
220 self
221 }
222
223 pub fn try_build(self) -> Result<AggregatorConfig, CamelError> {
224 let completion = self.completion.ok_or_else(|| {
225 CamelError::ProcessorError("completion condition required for AggregatorConfig".into())
226 })?;
227 Ok(AggregatorConfig {
228 header_name: self.header_name,
229 completion,
230 correlation: self.correlation,
231 strategy: self.strategy,
232 max_buckets: self.max_buckets,
233 bucket_ttl: self.bucket_ttl,
234 force_completion_on_stop: self.force_completion_on_stop,
235 discard_on_timeout: self.discard_on_timeout,
236 })
237 }
238
239 pub fn build(self) -> AggregatorConfig {
241 self.try_build().expect("completion condition required") }
243}
244
#[cfg(test)]
mod tests {
    use super::*;

    // A size rule is stored as `Single(Size(n))`; untouched settings keep their defaults.
    #[test]
    fn test_aggregator_config_complete_when_size() {
        let config = AggregatorConfig::correlate_by("orderId")
            .complete_when_size(3)
            .build();

        assert_eq!(config.header_name, "orderId");
        match &config.completion {
            CompletionMode::Single(CompletionCondition::Size(n)) => assert_eq!(*n, 3),
            _ => panic!("expected CompletionMode::Single(CompletionCondition::Size(3))"),
        }
        match &config.strategy {
            AggregationStrategy::CollectAll => {}
            AggregationStrategy::Custom(_) => panic!("expected AggregationStrategy::CollectAll"),
        }
    }

    // A predicate rule is stored as `Single(Predicate(_))`.
    #[test]
    fn test_aggregator_config_complete_when_predicate() {
        let config = AggregatorConfig::correlate_by("key")
            .complete_when(|bucket| bucket.len() >= 2)
            .build();

        let is_single_predicate = matches!(
            config.completion,
            CompletionMode::Single(CompletionCondition::Predicate(_))
        );
        assert!(is_single_predicate);
    }

    // `strategy(..)` replaces the default `CollectAll`.
    #[test]
    fn test_aggregator_config_custom_strategy() {
        let keep_first: AggregationFn = Arc::new(|acc, _next| acc);
        let config = AggregatorConfig::correlate_by("key")
            .complete_when_size(1)
            .strategy(AggregationStrategy::Custom(keep_first))
            .build();

        match &config.strategy {
            AggregationStrategy::Custom(_) => {}
            AggregationStrategy::CollectAll => panic!("expected AggregationStrategy::Custom"),
        }
    }

    // `build()` panics when no completion rule was chosen.
    #[test]
    #[should_panic(expected = "completion condition required")]
    fn test_aggregator_config_missing_completion_panics() {
        let builder = AggregatorConfig::correlate_by("key");
        let _ = builder.build();
    }

    // The combined setter stores two conditions under `Any`.
    #[test]
    fn test_complete_on_size_or_timeout() {
        let config = AggregatorConfig::correlate_by("key")
            .complete_on_size_or_timeout(3, Duration::from_secs(5))
            .build();

        match &config.completion {
            CompletionMode::Any(conditions) => assert_eq!(conditions.len(), 2),
            CompletionMode::Single(_) => panic!("expected CompletionMode::Any"),
        }
    }

    // Both boolean flags default to `false`.
    #[test]
    fn test_force_completion_on_stop_default() {
        let config = AggregatorConfig::correlate_by("key")
            .complete_when_size(1)
            .build();

        let AggregatorConfig {
            force_completion_on_stop,
            discard_on_timeout,
            ..
        } = config;
        assert!(!force_completion_on_stop);
        assert!(!discard_on_timeout);
    }

    // Timeout rule, limits and flags all reach the finished config.
    #[test]
    fn test_builder_sets_timeout_and_flags_and_limits() {
        let timeout = Duration::from_secs(2);
        let ttl = Duration::from_secs(10);

        let config = AggregatorConfig::correlate_by("key")
            .complete_on_timeout(timeout)
            .max_buckets(7)
            .bucket_ttl(ttl)
            .force_completion_on_stop(true)
            .discard_on_timeout(true)
            .build();

        match &config.completion {
            CompletionMode::Single(CompletionCondition::Timeout(d)) => assert_eq!(*d, timeout),
            _ => panic!("expected CompletionMode::Single(CompletionCondition::Timeout(2s))"),
        }
        assert_eq!(config.max_buckets, Some(7));
        assert_eq!(config.bucket_ttl, Some(ttl));
        assert!(config.force_completion_on_stop);
        assert!(config.discard_on_timeout);
    }

    // The builder's `correlate_by` overwrites both the header name and the strategy.
    #[test]
    fn test_builder_correlate_by_overrides_header_and_strategy() {
        let config = AggregatorConfig::correlate_by("original")
            .correlate_by("override")
            .complete_when_size(1)
            .build();

        assert_eq!(config.header_name, "override");
        match &config.correlation {
            CorrelationStrategy::HeaderName(h) => assert_eq!(h.as_str(), "override"),
            _ => panic!("expected CorrelationStrategy::HeaderName"),
        }
    }

    // Every reason maps to its lower-case label.
    #[test]
    fn test_completion_reason_as_str_all_variants() {
        let expected = [
            (CompletionReason::Size, "size"),
            (CompletionReason::Predicate, "predicate"),
            (CompletionReason::Timeout, "timeout"),
            (CompletionReason::Stop, "stop"),
        ];
        for (reason, label) in expected.iter() {
            assert_eq!(reason.as_str(), *label);
        }
    }

    // Cloning keeps the expression payload; the closure variant prints as `Fn(..)`.
    #[test]
    fn test_correlation_strategy_clone_and_debug() {
        let strategy = CorrelationStrategy::Expression {
            expr: "${header.orderId}".to_string(),
            language: "simple".to_string(),
        };
        match strategy.clone() {
            CorrelationStrategy::Expression { expr, language } => {
                assert_eq!(expr, "${header.orderId}");
                assert_eq!(language, "simple");
            }
            _ => panic!("expected CorrelationStrategy::Expression"),
        }

        let opaque = CorrelationStrategy::Fn(Arc::new(|_| Some("k".to_string())));
        assert_eq!(format!("{:?}", opaque), "Fn(..)");
    }

    // The size condition comes first and the timeout second.
    #[test]
    fn test_complete_on_size_or_timeout_contains_both_conditions() {
        let config = AggregatorConfig::correlate_by("k")
            .complete_on_size_or_timeout(4, Duration::from_millis(250))
            .build();

        let conditions = match config.completion {
            CompletionMode::Any(conditions) => conditions,
            _ => panic!("expected CompletionMode::Any"),
        };
        assert!(matches!(
            conditions.first(),
            Some(CompletionCondition::Size(4))
        ));
        assert!(matches!(
            conditions.get(1),
            Some(CompletionCondition::Timeout(d)) if *d == Duration::from_millis(250)
        ));
    }

    // A cloned `Fn` strategy points at the same closure allocation.
    #[test]
    fn test_correlation_strategy_fn_clone_shares_same_arc() {
        let f: Arc<dyn Fn(&Exchange) -> Option<String> + Send + Sync> =
            Arc::new(|_| Some("shared".to_string()));
        let strategy = CorrelationStrategy::Fn(f.clone());

        if let CorrelationStrategy::Fn(cloned_fn) = strategy.clone() {
            assert!(Arc::ptr_eq(&f, &cloned_fn));
        } else {
            panic!("expected fn strategy");
        }
    }

    // The last `correlate_by` call wins.
    #[test]
    fn test_builder_correlate_by_overrides_previous() {
        let config = AggregatorConfig::correlate_by("first")
            .correlate_by("second")
            .complete_when_size(2)
            .build();

        assert_eq!(config.header_name, "second");
        match &config.correlation {
            CorrelationStrategy::HeaderName(h) => assert_eq!(h.as_str(), "second"),
            _ => panic!("expected CorrelationStrategy::HeaderName"),
        }
    }

    // `try_build()` reports the missing completion rule as an error instead of panicking.
    #[test]
    fn test_aggregator_try_build_missing_completion_returns_error() {
        let outcome = AggregatorConfig::correlate_by("key").try_build();
        assert!(outcome.is_err());
    }
}