Skip to main content

camel_api/
aggregator.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use crate::error::CamelError;
5use crate::exchange::Exchange;
6
7/// Aggregation function — left-fold binary: (accumulated, next) -> merged.
8pub type AggregationFn = Arc<dyn Fn(Exchange, Exchange) -> Exchange + Send + Sync>;
9
10/// Strategy for correlating exchanges into aggregation buckets.
11pub enum CorrelationStrategy {
12    /// Correlate by the value of a named header.
13    HeaderName(String),
14    /// Correlate by evaluating an expression using a language registry.
15    Expression { expr: String, language: String },
16    /// Correlate using a custom function.
17    #[allow(clippy::type_complexity)]
18    Fn(Arc<dyn Fn(&Exchange) -> Option<String> + Send + Sync>),
19}
20
21impl Clone for CorrelationStrategy {
22    fn clone(&self) -> Self {
23        match self {
24            CorrelationStrategy::HeaderName(h) => CorrelationStrategy::HeaderName(h.clone()),
25            CorrelationStrategy::Expression { expr, language } => CorrelationStrategy::Expression {
26                expr: expr.clone(),
27                language: language.clone(),
28            },
29            CorrelationStrategy::Fn(f) => CorrelationStrategy::Fn(Arc::clone(f)),
30        }
31    }
32}
33
34impl std::fmt::Debug for CorrelationStrategy {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            CorrelationStrategy::HeaderName(h) => f.debug_tuple("HeaderName").field(h).finish(),
38            CorrelationStrategy::Expression { expr, language } => f
39                .debug_struct("Expression")
40                .field("expr", expr)
41                .field("language", language)
42                .finish(),
43            CorrelationStrategy::Fn(_) => f.write_str("Fn(..)"),
44        }
45    }
46}
47
48/// How to combine collected exchanges into one.
49#[derive(Clone)]
50pub enum AggregationStrategy {
51    /// Collects all bodies into Body::Json([body1, body2, ...]).
52    CollectAll,
53    /// Left-fold: f(f(ex1, ex2), ex3), ...
54    Custom(AggregationFn),
55}
56
57/// When the bucket is considered complete and should be emitted.
58#[derive(Clone)]
59pub enum CompletionCondition {
60    /// Emit when bucket reaches exactly N exchanges.
61    Size(usize),
62    /// Emit when predicate returns true for current bucket.
63    #[allow(clippy::type_complexity)]
64    Predicate(Arc<dyn Fn(&[Exchange]) -> bool + Send + Sync>),
65    /// Emit when the bucket has been inactive for the given duration.
66    Timeout(Duration),
67}
68
69/// Determines how a bucket's completion is evaluated.
70/// `Single` wraps one condition; `Any` completes when the first condition triggers.
71#[derive(Clone)]
72pub enum CompletionMode {
73    Single(CompletionCondition),
74    Any(Vec<CompletionCondition>),
75}
76
77#[derive(Debug, Clone, PartialEq, Eq)]
78pub enum CompletionReason {
79    Size,
80    Predicate,
81    Timeout,
82    Stop,
83}
84
85impl CompletionReason {
86    pub fn as_str(&self) -> &'static str {
87        match self {
88            CompletionReason::Size => "size",
89            CompletionReason::Predicate => "predicate",
90            CompletionReason::Timeout => "timeout",
91            CompletionReason::Stop => "stop",
92        }
93    }
94}
95
96/// Configuration for the Aggregator EIP.
97#[derive(Clone)]
98pub struct AggregatorConfig {
99    /// Name of the header used as correlation key.
100    pub header_name: String,
101    /// When to emit the aggregated exchange.
102    pub completion: CompletionMode,
103    /// Strategy for determining correlation keys.
104    pub correlation: CorrelationStrategy,
105    /// How to combine the bucket into one exchange.
106    pub strategy: AggregationStrategy,
107    /// Maximum number of correlation key buckets (memory protection).
108    /// When limit is reached, new correlation keys are rejected.
109    pub max_buckets: Option<usize>,
110    /// Time-to-live for inactive buckets (memory protection).
111    /// Buckets not updated for this duration are evicted.
112    pub bucket_ttl: Option<Duration>,
113    /// Force-complete all pending buckets when the route is stopped.
114    pub force_completion_on_stop: bool,
115    /// Discard bucket contents on timeout instead of emitting.
116    pub discard_on_timeout: bool,
117}
118
119impl AggregatorConfig {
120    /// Start building config with correlation key extracted from the named header.
121    pub fn correlate_by(header: impl Into<String>) -> AggregatorConfigBuilder {
122        let header_name = header.into();
123        AggregatorConfigBuilder {
124            header_name: header_name.clone(),
125            completion: None,
126            correlation: CorrelationStrategy::HeaderName(header_name),
127            strategy: AggregationStrategy::CollectAll,
128            max_buckets: None,
129            bucket_ttl: None,
130            force_completion_on_stop: false,
131            discard_on_timeout: false,
132        }
133    }
134}
135
136/// Builder for `AggregatorConfig`.
137pub struct AggregatorConfigBuilder {
138    header_name: String,
139    completion: Option<CompletionMode>,
140    correlation: CorrelationStrategy,
141    strategy: AggregationStrategy,
142    max_buckets: Option<usize>,
143    bucket_ttl: Option<Duration>,
144    force_completion_on_stop: bool,
145    discard_on_timeout: bool,
146}
147
148impl AggregatorConfigBuilder {
149    /// Emit when bucket has N exchanges.
150    pub fn complete_when_size(mut self, n: usize) -> Self {
151        self.completion = Some(CompletionMode::Single(CompletionCondition::Size(n)));
152        self
153    }
154
155    /// Emit when predicate returns true for the current bucket.
156    pub fn complete_when<F>(mut self, predicate: F) -> Self
157    where
158        F: Fn(&[Exchange]) -> bool + Send + Sync + 'static,
159    {
160        self.completion = Some(CompletionMode::Single(CompletionCondition::Predicate(
161            Arc::new(predicate),
162        )));
163        self
164    }
165
166    /// Emit when the bucket has been inactive for the given duration.
167    pub fn complete_on_timeout(mut self, duration: Duration) -> Self {
168        self.completion = Some(CompletionMode::Single(CompletionCondition::Timeout(
169            duration,
170        )));
171        self
172    }
173
174    /// Emit when the bucket reaches `size` OR has been inactive for `timeout`.
175    pub fn complete_on_size_or_timeout(mut self, size: usize, timeout: Duration) -> Self {
176        self.completion = Some(CompletionMode::Any(vec![
177            CompletionCondition::Size(size),
178            CompletionCondition::Timeout(timeout),
179        ]));
180        self
181    }
182
183    /// Enable force-completion of pending buckets when the route is stopped.
184    pub fn force_completion_on_stop(mut self, enabled: bool) -> Self {
185        self.force_completion_on_stop = enabled;
186        self
187    }
188
189    /// Discard bucket contents on timeout instead of emitting the aggregated exchange.
190    pub fn discard_on_timeout(mut self, enabled: bool) -> Self {
191        self.discard_on_timeout = enabled;
192        self
193    }
194
195    /// Override the correlation strategy with a header-based key.
196    pub fn correlate_by(mut self, header: impl Into<String>) -> Self {
197        let header = header.into();
198        self.header_name = header.clone();
199        self.correlation = CorrelationStrategy::HeaderName(header);
200        self
201    }
202
203    /// Override the default `CollectAll` aggregation strategy.
204    pub fn strategy(mut self, strategy: AggregationStrategy) -> Self {
205        self.strategy = strategy;
206        self
207    }
208
209    /// Set the maximum number of correlation key buckets.
210    /// When the limit is reached, new correlation keys are rejected with an error.
211    pub fn max_buckets(mut self, max: usize) -> Self {
212        self.max_buckets = Some(max);
213        self
214    }
215
216    /// Set the time-to-live for inactive buckets.
217    /// Buckets that haven't been updated for this duration will be evicted.
218    pub fn bucket_ttl(mut self, ttl: Duration) -> Self {
219        self.bucket_ttl = Some(ttl);
220        self
221    }
222
223    pub fn try_build(self) -> Result<AggregatorConfig, CamelError> {
224        let completion = self.completion.ok_or_else(|| {
225            CamelError::ProcessorError("completion condition required for AggregatorConfig".into())
226        })?;
227        Ok(AggregatorConfig {
228            header_name: self.header_name,
229            completion,
230            correlation: self.correlation,
231            strategy: self.strategy,
232            max_buckets: self.max_buckets,
233            bucket_ttl: self.bucket_ttl,
234            force_completion_on_stop: self.force_completion_on_stop,
235            discard_on_timeout: self.discard_on_timeout,
236        })
237    }
238
239    /// Build the config. Panics if no completion condition was set.
240    pub fn build(self) -> AggregatorConfig {
241        self.try_build().expect("completion condition required") // allow-unwrap
242    }
243}
244
245#[cfg(test)]
246mod tests {
247    use super::*;
248
249    #[test]
250    fn test_aggregator_config_complete_when_size() {
251        let config = AggregatorConfig::correlate_by("orderId")
252            .complete_when_size(3)
253            .build();
254        assert_eq!(config.header_name, "orderId");
255        assert!(matches!(
256            config.completion,
257            CompletionMode::Single(CompletionCondition::Size(3))
258        ));
259        assert!(matches!(config.strategy, AggregationStrategy::CollectAll));
260    }
261
262    #[test]
263    fn test_aggregator_config_complete_when_predicate() {
264        let config = AggregatorConfig::correlate_by("key")
265            .complete_when(|bucket| bucket.len() >= 2)
266            .build();
267        assert!(matches!(
268            config.completion,
269            CompletionMode::Single(CompletionCondition::Predicate(_))
270        ));
271    }
272
273    #[test]
274    fn test_aggregator_config_custom_strategy() {
275        use std::sync::Arc;
276        let f: AggregationFn = Arc::new(|acc, _next| acc);
277        let config = AggregatorConfig::correlate_by("key")
278            .complete_when_size(1)
279            .strategy(AggregationStrategy::Custom(f))
280            .build();
281        assert!(matches!(config.strategy, AggregationStrategy::Custom(_)));
282    }
283
284    #[test]
285    #[should_panic(expected = "completion condition required")]
286    fn test_aggregator_config_missing_completion_panics() {
287        AggregatorConfig::correlate_by("key").build();
288    }
289
290    #[test]
291    fn test_complete_on_size_or_timeout() {
292        let config = AggregatorConfig::correlate_by("key")
293            .complete_on_size_or_timeout(3, Duration::from_secs(5))
294            .build();
295        assert!(matches!(config.completion, CompletionMode::Any(v) if v.len() == 2));
296    }
297
298    #[test]
299    fn test_force_completion_on_stop_default() {
300        let config = AggregatorConfig::correlate_by("key")
301            .complete_when_size(1)
302            .build();
303        assert!(!config.force_completion_on_stop);
304        assert!(!config.discard_on_timeout);
305    }
306
307    #[test]
308    fn test_builder_sets_timeout_and_flags_and_limits() {
309        let config = AggregatorConfig::correlate_by("key")
310            .complete_on_timeout(Duration::from_secs(2))
311            .max_buckets(7)
312            .bucket_ttl(Duration::from_secs(10))
313            .force_completion_on_stop(true)
314            .discard_on_timeout(true)
315            .build();
316
317        assert!(matches!(
318            config.completion,
319            CompletionMode::Single(CompletionCondition::Timeout(d)) if d == Duration::from_secs(2)
320        ));
321        assert_eq!(config.max_buckets, Some(7));
322        assert_eq!(config.bucket_ttl, Some(Duration::from_secs(10)));
323        assert!(config.force_completion_on_stop);
324        assert!(config.discard_on_timeout);
325    }
326
327    #[test]
328    fn test_builder_correlate_by_overrides_header_and_strategy() {
329        let config = AggregatorConfig::correlate_by("original")
330            .correlate_by("override")
331            .complete_when_size(1)
332            .build();
333
334        assert_eq!(config.header_name, "override");
335        assert!(matches!(
336            config.correlation,
337            CorrelationStrategy::HeaderName(ref h) if h == "override"
338        ));
339    }
340
341    #[test]
342    fn test_completion_reason_as_str_all_variants() {
343        assert_eq!(CompletionReason::Size.as_str(), "size");
344        assert_eq!(CompletionReason::Predicate.as_str(), "predicate");
345        assert_eq!(CompletionReason::Timeout.as_str(), "timeout");
346        assert_eq!(CompletionReason::Stop.as_str(), "stop");
347    }
348
349    #[test]
350    fn test_correlation_strategy_clone_and_debug() {
351        let strategy = CorrelationStrategy::Expression {
352            expr: "${header.orderId}".to_string(),
353            language: "simple".to_string(),
354        };
355        let cloned = strategy.clone();
356        assert!(matches!(
357            cloned,
358            CorrelationStrategy::Expression { ref expr, ref language }
359                if expr == "${header.orderId}" && language == "simple"
360        ));
361
362        let f = CorrelationStrategy::Fn(Arc::new(|_| Some("k".to_string())));
363        assert_eq!(format!("{:?}", f), "Fn(..)");
364    }
365
366    #[test]
367    fn test_complete_on_size_or_timeout_contains_both_conditions() {
368        let config = AggregatorConfig::correlate_by("k")
369            .complete_on_size_or_timeout(4, Duration::from_millis(250))
370            .build();
371
372        match config.completion {
373            CompletionMode::Any(conditions) => {
374                assert!(matches!(conditions[0], CompletionCondition::Size(4)));
375                assert!(matches!(
376                    conditions[1],
377                    CompletionCondition::Timeout(d) if d == Duration::from_millis(250)
378                ));
379            }
380            _ => panic!("expected CompletionMode::Any"),
381        }
382    }
383
384    #[test]
385    fn test_correlation_strategy_fn_clone_shares_same_arc() {
386        let f: Arc<dyn Fn(&Exchange) -> Option<String> + Send + Sync> =
387            Arc::new(|_| Some("shared".to_string()));
388        let strategy = CorrelationStrategy::Fn(f.clone());
389        let cloned = strategy.clone();
390
391        match cloned {
392            CorrelationStrategy::Fn(cloned_fn) => assert!(Arc::ptr_eq(&f, &cloned_fn)),
393            _ => panic!("expected fn strategy"),
394        }
395    }
396
397    #[test]
398    fn test_builder_correlate_by_overrides_previous() {
399        let config = AggregatorConfig::correlate_by("first")
400            .correlate_by("second")
401            .complete_when_size(2)
402            .build();
403
404        assert_eq!(config.header_name, "second");
405        assert!(
406            matches!(config.correlation, CorrelationStrategy::HeaderName(ref h) if h == "second")
407        );
408    }
409
410    #[test]
411    fn test_aggregator_try_build_missing_completion_returns_error() {
412        let result = AggregatorConfig::correlate_by("key").try_build();
413        assert!(result.is_err());
414    }
415}