// camel_api/aggregator.rs

use std::fmt;
use std::sync::Arc;
use std::time::Duration;

use crate::error::CamelError;
use crate::exchange::Exchange;
6
7/// Aggregation function — left-fold binary: (accumulated, next) -> merged.
8pub type AggregationFn = Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>;
9
10/// Strategy for correlating exchanges into aggregation buckets.
11pub enum CorrelationStrategy {
12    /// Correlate by the value of a named header.
13    HeaderName(String),
14    /// Correlate by evaluating an expression using a language registry.
15    Expression { expr: String, language: String },
16    /// Correlate using a custom function.
17    #[allow(clippy::type_complexity)]
18    Fn(Arc<dyn Fn(&Exchange) -> Option<String> + Send + Sync>),
19}
20
21impl Clone for CorrelationStrategy {
22    fn clone(&self) -> Self {
23        match self {
24            CorrelationStrategy::HeaderName(h) => CorrelationStrategy::HeaderName(h.clone()),
25            CorrelationStrategy::Expression { expr, language } => CorrelationStrategy::Expression {
26                expr: expr.clone(),
27                language: language.clone(),
28            },
29            CorrelationStrategy::Fn(f) => CorrelationStrategy::Fn(Arc::clone(f)),
30        }
31    }
32}
33
34impl std::fmt::Debug for CorrelationStrategy {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            CorrelationStrategy::HeaderName(h) => f.debug_tuple("HeaderName").field(h).finish(),
38            CorrelationStrategy::Expression { expr, language } => f
39                .debug_struct("Expression")
40                .field("expr", expr)
41                .field("language", language)
42                .finish(),
43            CorrelationStrategy::Fn(_) => f.write_str("Fn(..)"),
44        }
45    }
46}
47
48/// How to combine collected exchanges into one.
49#[derive(Clone)]
50pub enum AggregationStrategy {
51    /// Collects all bodies into Body::Json([body1, body2, ...]).
52    CollectAll,
53    /// Left-fold: f(f(ex1, ex2), ex3), ...
54    Custom(AggregationFn),
55}
56
57/// When the bucket is considered complete and should be emitted.
58#[derive(Clone)]
59pub enum CompletionCondition {
60    /// Emit when bucket reaches exactly N exchanges.
61    Size(usize),
62    /// Emit when predicate returns true for current bucket.
63    #[allow(clippy::type_complexity)]
64    Predicate(Arc<dyn Fn(&[Exchange]) -> bool + Send + Sync>),
65    /// Emit when the bucket has been inactive for the given duration.
66    Timeout(Duration),
67}
68
69/// Determines how a bucket's completion is evaluated.
70/// `Single` wraps one condition; `Any` completes when the first condition triggers.
71#[derive(Clone)]
72pub enum CompletionMode {
73    Single(CompletionCondition),
74    Any(Vec<CompletionCondition>),
75}
76
/// Why a bucket was completed.
///
/// `Size`, `Predicate` and `Timeout` share their names with the `CompletionCondition`
/// variants. `Stop` presumably corresponds to force-completion when the route is
/// stopped (see `AggregatorConfig::force_completion_on_stop`) — confirm against the
/// aggregator implementation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompletionReason {
    Size,
    Predicate,
    Timeout,
    Stop,
}

impl CompletionReason {
    /// Returns the stable lowercase name of this reason
    /// (`"size"`, `"predicate"`, `"timeout"` or `"stop"`).
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Size => "size",
            Self::Predicate => "predicate",
            Self::Timeout => "timeout",
            Self::Stop => "stop",
        }
    }
}

/// Formats the reason as the same lowercase name returned by [`CompletionReason::as_str`].
impl fmt::Display for CompletionReason {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` (rather than `write_str`) so width/alignment flags such as `{:>10}` work.
        f.pad(self.as_str())
    }
}
95
96/// Configuration for the Aggregator EIP.
97#[derive(Clone)]
98pub struct AggregatorConfig {
99    /// Name of the header used as correlation key.
100    pub header_name: String,
101    /// When to emit the aggregated exchange.
102    pub completion: CompletionMode,
103    /// Strategy for determining correlation keys.
104    pub correlation: CorrelationStrategy,
105    /// How to combine the bucket into one exchange.
106    pub strategy: AggregationStrategy,
107    /// Maximum number of correlation key buckets (memory protection).
108    /// When limit is reached, new correlation keys are rejected.
109    pub max_buckets: Option<usize>,
110    /// Time-to-live for inactive buckets (memory protection).
111    /// Buckets not updated for this duration are evicted.
112    pub bucket_ttl: Option<Duration>,
113    /// Force-complete all pending buckets when the route is stopped.
114    pub force_completion_on_stop: bool,
115    /// Discard bucket contents on timeout instead of emitting.
116    pub discard_on_timeout: bool,
117}
118
119impl AggregatorConfig {
120    /// Start building config with correlation key extracted from the named header.
121    pub fn correlate_by(header: impl Into<String>) -> AggregatorConfigBuilder {
122        let header_name = header.into();
123        AggregatorConfigBuilder {
124            header_name: header_name.clone(),
125            completion: None,
126            correlation: CorrelationStrategy::HeaderName(header_name),
127            strategy: AggregationStrategy::CollectAll,
128            max_buckets: None,
129            bucket_ttl: None,
130            force_completion_on_stop: false,
131            discard_on_timeout: false,
132        }
133    }
134}
135
136/// Builder for `AggregatorConfig`.
137pub struct AggregatorConfigBuilder {
138    header_name: String,
139    completion: Option<CompletionMode>,
140    correlation: CorrelationStrategy,
141    strategy: AggregationStrategy,
142    max_buckets: Option<usize>,
143    bucket_ttl: Option<Duration>,
144    force_completion_on_stop: bool,
145    discard_on_timeout: bool,
146}
147
148impl AggregatorConfigBuilder {
149    /// Emit when bucket has N exchanges.
150    pub fn complete_when_size(mut self, n: usize) -> Self {
151        self.completion = Some(CompletionMode::Single(CompletionCondition::Size(n)));
152        self
153    }
154
155    /// Emit when predicate returns true for the current bucket.
156    pub fn complete_when<F>(mut self, predicate: F) -> Self
157    where
158        F: Fn(&[Exchange]) -> bool + Send + Sync + 'static,
159    {
160        self.completion = Some(CompletionMode::Single(CompletionCondition::Predicate(
161            Arc::new(predicate),
162        )));
163        self
164    }
165
166    /// Emit when the bucket has been inactive for the given duration.
167    pub fn complete_on_timeout(mut self, duration: Duration) -> Self {
168        self.completion = Some(CompletionMode::Single(CompletionCondition::Timeout(
169            duration,
170        )));
171        self
172    }
173
174    /// Emit when the bucket reaches `size` OR has been inactive for `timeout`.
175    pub fn complete_on_size_or_timeout(mut self, size: usize, timeout: Duration) -> Self {
176        self.completion = Some(CompletionMode::Any(vec![
177            CompletionCondition::Size(size),
178            CompletionCondition::Timeout(timeout),
179        ]));
180        self
181    }
182
183    /// Enable force-completion of pending buckets when the route is stopped.
184    pub fn force_completion_on_stop(mut self, enabled: bool) -> Self {
185        self.force_completion_on_stop = enabled;
186        self
187    }
188
189    /// Discard bucket contents on timeout instead of emitting the aggregated exchange.
190    pub fn discard_on_timeout(mut self, enabled: bool) -> Self {
191        self.discard_on_timeout = enabled;
192        self
193    }
194
195    /// Override the correlation strategy with a header-based key.
196    pub fn correlate_by(mut self, header: impl Into<String>) -> Self {
197        let header = header.into();
198        self.header_name = header.clone();
199        self.correlation = CorrelationStrategy::HeaderName(header);
200        self
201    }
202
203    /// Override the default `CollectAll` aggregation strategy.
204    pub fn strategy(mut self, strategy: AggregationStrategy) -> Self {
205        self.strategy = strategy;
206        self
207    }
208
209    /// Set the maximum number of correlation key buckets.
210    /// When the limit is reached, new correlation keys are rejected with an error.
211    pub fn max_buckets(mut self, max: usize) -> Self {
212        self.max_buckets = Some(max);
213        self
214    }
215
216    /// Set the time-to-live for inactive buckets.
217    /// Buckets that haven't been updated for this duration will be evicted.
218    pub fn bucket_ttl(mut self, ttl: Duration) -> Self {
219        self.bucket_ttl = Some(ttl);
220        self
221    }
222
223    pub fn try_build(self) -> Result<AggregatorConfig, CamelError> {
224        let completion = self.completion.ok_or_else(|| {
225            CamelError::Config("completion condition required for AggregatorConfig".into())
226        })?;
227        Ok(AggregatorConfig {
228            header_name: self.header_name,
229            completion,
230            correlation: self.correlation,
231            strategy: self.strategy,
232            max_buckets: self.max_buckets,
233            bucket_ttl: self.bucket_ttl,
234            force_completion_on_stop: self.force_completion_on_stop,
235            discard_on_timeout: self.discard_on_timeout,
236        })
237    }
238
239    /// Build the config. Returns an error if no completion condition was set.
240    pub fn build(self) -> Result<AggregatorConfig, CamelError> {
241        self.try_build()
242    }
243}
244
#[cfg(test)]
mod tests {
    // `Arc`, `Duration`, `Exchange` and `CamelError` come into scope through the
    // parent module's imports.
    use super::*;

    /// Size-based completion keeps the header name and the default strategy.
    #[test]
    fn test_aggregator_config_complete_when_size() {
        let config = AggregatorConfig::correlate_by("orderId")
            .complete_when_size(3)
            .build()
            .unwrap();
        assert_eq!(config.header_name, "orderId");
        assert!(matches!(
            config.completion,
            CompletionMode::Single(CompletionCondition::Size(3))
        ));
        assert!(matches!(config.strategy, AggregationStrategy::CollectAll));
    }

    /// Predicate-based completion is stored as a single `Predicate` condition.
    #[test]
    fn test_aggregator_config_complete_when_predicate() {
        let config = AggregatorConfig::correlate_by("key")
            .complete_when(|bucket| bucket.len() >= 2)
            .build()
            .unwrap();
        assert!(matches!(
            config.completion,
            CompletionMode::Single(CompletionCondition::Predicate(_))
        ));
    }

    /// A custom aggregation function replaces the default strategy.
    #[test]
    fn test_aggregator_config_custom_strategy() {
        let f: AggregationFn = Arc::new(|acc, _next| acc);
        let config = AggregatorConfig::correlate_by("key")
            .complete_when_size(1)
            .strategy(AggregationStrategy::Custom(f))
            .build()
            .unwrap();
        assert!(matches!(config.strategy, AggregationStrategy::Custom(_)));
    }

    /// Building without a completion condition fails with a descriptive message.
    #[test]
    fn test_aggregator_config_missing_completion_returns_err() {
        let result = AggregatorConfig::correlate_by("key").build();
        // `AggregatorConfig` is not `Debug`, so `unwrap_err()` is not available here.
        let err = match result {
            Err(e) => e,
            Ok(_) => panic!("expected error, got Ok"),
        };
        assert!(
            err.to_string().contains("completion"),
            "error message should mention 'completion': {err}"
        );
    }

    /// Size-or-timeout completion is stored as an `Any` mode with two conditions.
    #[test]
    fn test_complete_on_size_or_timeout() {
        let config = AggregatorConfig::correlate_by("key")
            .complete_on_size_or_timeout(3, Duration::from_secs(5))
            .build()
            .unwrap();
        assert!(matches!(config.completion, CompletionMode::Any(v) if v.len() == 2));
    }

    /// Both boolean flags default to `false`.
    #[test]
    fn test_force_completion_on_stop_default() {
        let config = AggregatorConfig::correlate_by("key")
            .complete_when_size(1)
            .build()
            .unwrap();
        assert!(!config.force_completion_on_stop);
        assert!(!config.discard_on_timeout);
    }

    /// Timeout, limits and flags set on the builder all reach the built config.
    #[test]
    fn test_builder_sets_timeout_and_flags_and_limits() {
        let config = AggregatorConfig::correlate_by("key")
            .complete_on_timeout(Duration::from_secs(2))
            .max_buckets(7)
            .bucket_ttl(Duration::from_secs(10))
            .force_completion_on_stop(true)
            .discard_on_timeout(true)
            .build()
            .unwrap();

        assert!(matches!(
            config.completion,
            CompletionMode::Single(CompletionCondition::Timeout(d)) if d == Duration::from_secs(2)
        ));
        assert_eq!(config.max_buckets, Some(7));
        assert_eq!(config.bucket_ttl, Some(Duration::from_secs(10)));
        assert!(config.force_completion_on_stop);
        assert!(config.discard_on_timeout);
    }

    /// The builder-level `correlate_by` replaces both the header name and the strategy.
    #[test]
    fn test_builder_correlate_by_overrides_header_and_strategy() {
        let config = AggregatorConfig::correlate_by("original")
            .correlate_by("override")
            .complete_when_size(1)
            .build()
            .unwrap();

        assert_eq!(config.header_name, "override");
        assert!(matches!(
            config.correlation,
            CorrelationStrategy::HeaderName(ref h) if h == "override"
        ));
    }

    /// `as_str` returns the lowercase name for every variant.
    #[test]
    fn test_completion_reason_as_str_all_variants() {
        assert_eq!(CompletionReason::Size.as_str(), "size");
        assert_eq!(CompletionReason::Predicate.as_str(), "predicate");
        assert_eq!(CompletionReason::Timeout.as_str(), "timeout");
        assert_eq!(CompletionReason::Stop.as_str(), "stop");
    }

    /// Cloning preserves `Expression` fields; `Fn` is rendered opaquely by `Debug`.
    #[test]
    fn test_correlation_strategy_clone_and_debug() {
        let strategy = CorrelationStrategy::Expression {
            expr: "${header.orderId}".to_string(),
            language: "simple".to_string(),
        };
        let cloned = strategy.clone();
        assert!(matches!(
            cloned,
            CorrelationStrategy::Expression { ref expr, ref language }
                if expr == "${header.orderId}" && language == "simple"
        ));

        let f = CorrelationStrategy::Fn(Arc::new(|_| Some("k".to_string())));
        assert_eq!(format!("{:?}", f), "Fn(..)");
    }

    /// The `Any` mode holds the size condition first and the timeout second.
    #[test]
    fn test_complete_on_size_or_timeout_contains_both_conditions() {
        let config = AggregatorConfig::correlate_by("k")
            .complete_on_size_or_timeout(4, Duration::from_millis(250))
            .build()
            .unwrap();

        match config.completion {
            CompletionMode::Any(conditions) => {
                assert!(matches!(conditions[0], CompletionCondition::Size(4)));
                assert!(matches!(
                    conditions[1],
                    CompletionCondition::Timeout(d) if d == Duration::from_millis(250)
                ));
            }
            _ => panic!("expected CompletionMode::Any"),
        }
    }

    /// Cloning the `Fn` variant shares the closure instead of duplicating it.
    #[test]
    fn test_correlation_strategy_fn_clone_shares_same_arc() {
        let f: Arc<dyn Fn(&Exchange) -> Option<String> + Send + Sync> =
            Arc::new(|_| Some("shared".to_string()));
        let strategy = CorrelationStrategy::Fn(f.clone());
        let cloned = strategy.clone();

        match cloned {
            CorrelationStrategy::Fn(cloned_fn) => assert!(Arc::ptr_eq(&f, &cloned_fn)),
            _ => panic!("expected fn strategy"),
        }
    }

    /// A later `correlate_by` call wins over an earlier one.
    #[test]
    fn test_builder_correlate_by_overrides_previous() {
        let config = AggregatorConfig::correlate_by("first")
            .correlate_by("second")
            .complete_when_size(2)
            .build()
            .unwrap();

        assert_eq!(config.header_name, "second");
        assert!(
            matches!(config.correlation, CorrelationStrategy::HeaderName(ref h) if h == "second")
        );
    }

    /// `try_build` without a completion condition yields a configuration error.
    #[test]
    fn test_aggregator_try_build_missing_completion_returns_error() {
        let result = AggregatorConfig::correlate_by("key").try_build();
        // Pin the error variant, not merely the fact that an error occurred.
        assert!(matches!(result, Err(CamelError::Config(_))));
    }
}