Skip to main content

camel_processor/
aggregator.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6use std::time::{Duration, Instant};
7
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10use tower::Service;
11
12use camel_api::{
13    CamelError,
14    aggregator::{
15        AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode,
16        CompletionReason, CorrelationStrategy,
17    },
18    body::Body,
19    exchange::Exchange,
20    message::Message,
21};
22use camel_language_api::Language;
23
24pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
25
26pub const CAMEL_AGGREGATOR_PENDING: &str = "CamelAggregatorPending";
27pub const CAMEL_AGGREGATED_SIZE: &str = "CamelAggregatedSize";
28pub const CAMEL_AGGREGATED_KEY: &str = "CamelAggregatedKey";
29pub const CAMEL_AGGREGATED_COMPLETION_REASON: &str = "CamelAggregatedCompletionReason";
30
31/// Internal bucket structure with timestamp tracking for TTL eviction.
32struct Bucket {
33    exchanges: Vec<Exchange>,
34    #[allow(dead_code)]
35    created_at: Instant,
36    last_updated: Instant,
37}
38
39impl Bucket {
40    fn new() -> Self {
41        let now = Instant::now();
42        Self {
43            exchanges: Vec::new(),
44            created_at: now,
45            last_updated: now,
46        }
47    }
48
49    fn push(&mut self, exchange: Exchange) {
50        self.exchanges.push(exchange);
51        self.last_updated = Instant::now();
52    }
53
54    #[allow(dead_code)]
55    fn len(&self) -> usize {
56        self.exchanges.len()
57    }
58
59    fn is_expired(&self, ttl: Duration) -> bool {
60        Instant::now().duration_since(self.last_updated) >= ttl
61    }
62}
63
64#[derive(Clone)]
65pub struct AggregatorService {
66    config: AggregatorConfig,
67    buckets: Arc<Mutex<HashMap<String, Bucket>>>,
68    timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
69    late_tx: mpsc::Sender<Exchange>,
70    language_registry: SharedLanguageRegistry,
71    route_cancel: CancellationToken,
72}
73
74impl AggregatorService {
75    pub fn new(
76        config: AggregatorConfig,
77        late_tx: mpsc::Sender<Exchange>,
78        language_registry: SharedLanguageRegistry,
79        route_cancel: CancellationToken,
80    ) -> Self {
81        Self {
82            config,
83            buckets: Arc::new(Mutex::new(HashMap::new())),
84            timeout_tasks: Arc::new(Mutex::new(HashMap::new())),
85            late_tx,
86            language_registry,
87            route_cancel,
88        }
89    }
90
91    pub fn has_timeout(&self) -> bool {
92        has_timeout_condition(&self.config.completion)
93    }
94
95    pub fn force_complete_all(&self) {
96        let mut buckets_guard = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
97        let keys: Vec<String> = buckets_guard.keys().cloned().collect();
98
99        for key in keys {
100            if let Some(bucket) = buckets_guard.remove(&key) {
101                if self.config.force_completion_on_stop {
102                    cancel_timeout_task(&key, &self.timeout_tasks);
103                    match aggregate(bucket.exchanges, &self.config.strategy) {
104                        Ok(mut result) => {
105                            result.set_property(
106                                CAMEL_AGGREGATED_COMPLETION_REASON,
107                                serde_json::json!(CompletionReason::Stop.as_str()),
108                            );
109                            if self.late_tx.try_send(result).is_err() {
110                                tracing::warn!(
111                                    key = %key,
112                                    "aggregator force-complete emit dropped: late channel full"
113                                );
114                            }
115                        }
116                        Err(e) => {
117                            tracing::error!(
118                                key = %key,
119                                error = %e,
120                                "aggregation failed in force_complete_all"
121                            );
122                        }
123                    }
124                } else {
125                    cancel_timeout_task(&key, &self.timeout_tasks);
126                }
127            }
128        }
129    }
130}
131
132pub fn has_timeout_condition(mode: &CompletionMode) -> bool {
133    match mode {
134        CompletionMode::Single(CompletionCondition::Timeout(_)) => true,
135        CompletionMode::Any(conditions) => conditions
136            .iter()
137            .any(|c| matches!(c, CompletionCondition::Timeout(_))),
138        _ => false,
139    }
140}
141
142impl Service<Exchange> for AggregatorService {
143    type Response = Exchange;
144    type Error = CamelError;
145    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
146
147    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
148        Poll::Ready(Ok(()))
149    }
150
151    fn call(&mut self, exchange: Exchange) -> Self::Future {
152        let config = self.config.clone();
153        let buckets = Arc::clone(&self.buckets);
154        let timeout_tasks = Arc::clone(&self.timeout_tasks);
155        let late_tx = self.late_tx.clone();
156        let language_registry = Arc::clone(&self.language_registry);
157        let route_cancel = self.route_cancel.clone();
158
159        Box::pin(async move {
160            let key_value =
161                extract_correlation_key(&exchange, &config.correlation, &language_registry)?;
162
163            let key_str = serde_json::to_string(&key_value)
164                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
165
166            let completed_bucket = {
167                let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
168
169                if let Some(ttl) = config.bucket_ttl {
170                    guard.retain(|_, bucket| !bucket.is_expired(ttl));
171                }
172
173                if let Some(max) = config.max_buckets
174                    && !guard.contains_key(&key_str)
175                    && guard.len() >= max
176                {
177                    tracing::warn!(
178                        max_buckets = max,
179                        correlation_key = %key_str,
180                        "Aggregator reached max buckets limit, rejecting new correlation key"
181                    );
182                    return Err(CamelError::ProcessorError(format!(
183                        "Aggregator reached maximum {} buckets",
184                        max
185                    )));
186                }
187
188                let bucket = guard.entry(key_str.clone()).or_insert_with(Bucket::new);
189                bucket.push(exchange);
190
191                let (is_complete, reason) =
192                    check_sync_completion(&config.completion, &bucket.exchanges);
193
194                if is_complete {
195                    let exchanges = guard.remove(&key_str).map(|b| b.exchanges);
196                    (exchanges, reason)
197                } else {
198                    (None, CompletionReason::Size) // placeholder; reason unused when None
199                }
200            };
201
202            if completed_bucket.0.is_none() && has_timeout_condition(&config.completion) {
203                let cancel = {
204                    let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
205                    if let Some(existing) = tt_guard.get(&key_str) {
206                        existing.cancel();
207                    }
208                    let token = CancellationToken::new();
209                    tt_guard.insert(key_str.clone(), token.clone());
210                    token
211                };
212
213                let timeout_dur = extract_timeout_duration(&config.completion);
214                if let Some(timeout) = timeout_dur {
215                    spawn_timeout_task(
216                        key_str.clone(),
217                        timeout,
218                        cancel,
219                        buckets.clone(),
220                        timeout_tasks.clone(),
221                        late_tx,
222                        config.strategy.clone(),
223                        config.discard_on_timeout,
224                        route_cancel,
225                    );
226                }
227            }
228
229            if let Some(exchanges) = completed_bucket.0 {
230                cancel_timeout_task(&key_str, &timeout_tasks);
231                let reason = completed_bucket.1;
232                let size = exchanges.len();
233                let mut result = aggregate(exchanges, &config.strategy)?;
234                result.set_property(CAMEL_AGGREGATED_SIZE, serde_json::json!(size as u64));
235                result.set_property(CAMEL_AGGREGATED_KEY, key_value);
236                result.set_property(
237                    CAMEL_AGGREGATED_COMPLETION_REASON,
238                    serde_json::json!(reason.as_str()),
239                );
240                Ok(result)
241            } else {
242                let mut pending = Exchange::new(Message {
243                    headers: Default::default(),
244                    body: Body::Empty,
245                });
246                pending.set_property(CAMEL_AGGREGATOR_PENDING, serde_json::json!(true));
247                Ok(pending)
248            }
249        })
250    }
251}
252
253fn extract_correlation_key(
254    exchange: &Exchange,
255    strategy: &CorrelationStrategy,
256    registry: &SharedLanguageRegistry,
257) -> Result<serde_json::Value, CamelError> {
258    match strategy {
259        CorrelationStrategy::HeaderName(h) => {
260            exchange.input.headers.get(h).cloned().ok_or_else(|| {
261                CamelError::ProcessorError(format!(
262                    "Aggregator: missing correlation key header '{}'",
263                    h
264                ))
265            })
266        }
267        CorrelationStrategy::Expression { expr, language } => {
268            let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
269            let lang = reg.get(language).ok_or_else(|| {
270                CamelError::ProcessorError(format!(
271                    "Aggregator: language '{}' not found in registry",
272                    language
273                ))
274            })?;
275            let expression = lang
276                .create_expression(expr)
277                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
278            let value = expression
279                .evaluate(exchange)
280                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
281            if value.is_null() {
282                return Err(CamelError::ProcessorError(format!(
283                    "Aggregator: correlation expression '{}' evaluated to null",
284                    expr
285                )));
286            }
287            Ok(value)
288        }
289        CorrelationStrategy::Fn(f) => f(exchange).map(serde_json::Value::String).ok_or_else(|| {
290            CamelError::ProcessorError("Aggregator: correlation function returned None".to_string())
291        }),
292    }
293}
294
295fn check_sync_completion(
296    mode: &CompletionMode,
297    exchanges: &[Exchange],
298) -> (bool, CompletionReason) {
299    match mode {
300        CompletionMode::Single(cond) => check_single(cond, exchanges),
301        CompletionMode::Any(conditions) => {
302            for cond in conditions {
303                if let CompletionCondition::Timeout(_) = cond {
304                    continue;
305                }
306                let (done, reason) = check_single(cond, exchanges);
307                if done {
308                    return (true, reason);
309                }
310            }
311            (false, CompletionReason::Size)
312        }
313    }
314}
315
316fn check_single(cond: &CompletionCondition, exchanges: &[Exchange]) -> (bool, CompletionReason) {
317    match cond {
318        CompletionCondition::Size(n) => (exchanges.len() >= *n, CompletionReason::Size),
319        CompletionCondition::Predicate(pred) => (pred(exchanges), CompletionReason::Predicate),
320        CompletionCondition::Timeout(_) => (false, CompletionReason::Timeout),
321    }
322}
323
324fn extract_timeout_duration(mode: &CompletionMode) -> Option<Duration> {
325    match mode {
326        CompletionMode::Single(CompletionCondition::Timeout(d)) => Some(*d),
327        CompletionMode::Any(conditions) => conditions.iter().find_map(|c| {
328            if let CompletionCondition::Timeout(d) = c {
329                Some(*d)
330            } else {
331                None
332            }
333        }),
334        _ => None,
335    }
336}
337
338fn cancel_timeout_task(key: &str, timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>) {
339    let mut guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
340    if let Some(token) = guard.remove(key) {
341        token.cancel();
342    }
343}
344
345#[allow(clippy::too_many_arguments)]
346fn spawn_timeout_task(
347    key: String,
348    timeout: Duration,
349    cancel: CancellationToken,
350    buckets: Arc<Mutex<HashMap<String, Bucket>>>,
351    timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
352    late_tx: mpsc::Sender<Exchange>,
353    strategy: AggregationStrategy,
354    discard: bool,
355    _route_cancel: CancellationToken,
356) {
357    let cancel_clone = cancel.clone();
358    tokio::spawn(async move {
359        tokio::select! {
360            _ = tokio::time::sleep(timeout) => {
361                let should_proceed = {
362                    let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
363                    if cancel_clone.is_cancelled() {
364                        false
365                    } else {
366                        tt_guard.remove(&key);
367                        true
368                    }
369                };
370                if !should_proceed {
371                    return;
372                }
373                let bucket_exchanges = {
374                    let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
375                    guard.remove(&key).map(|b| b.exchanges)
376                };
377                if let Some(exchanges) = bucket_exchanges
378                    && !discard
379                {
380                    match aggregate(exchanges, &strategy) {
381                        Ok(mut result) => {
382                            result.set_property(
383                                CAMEL_AGGREGATED_COMPLETION_REASON,
384                                serde_json::json!(CompletionReason::Timeout.as_str()),
385                            );
386                            if late_tx.try_send(result).is_err() {
387                                tracing::warn!(
388                                    key = %key,
389                                    "aggregator timeout emit dropped: late channel full"
390                                );
391                            }
392                        }
393                        Err(e) => {
394                            tracing::error!(
395                                key = %key,
396                                error = %e,
397                                "aggregation failed in timeout task"
398                            );
399                        }
400                    }
401                }
402            }
403            _ = cancel_clone.cancelled() => {}
404        }
405    });
406}
407
408fn aggregate(
409    exchanges: Vec<Exchange>,
410    strategy: &AggregationStrategy,
411) -> Result<Exchange, CamelError> {
412    match strategy {
413        AggregationStrategy::CollectAll => {
414            let bodies: Vec<serde_json::Value> = exchanges
415                .into_iter()
416                .map(|e| match e.input.body {
417                    Body::Json(v) => v,
418                    Body::Text(s) => serde_json::Value::String(s),
419                    Body::Xml(s) => serde_json::Value::String(s),
420                    Body::Bytes(b) => {
421                        serde_json::Value::String(String::from_utf8_lossy(&b).into_owned())
422                    }
423                    Body::Empty => serde_json::Value::Null,
424                    Body::Stream(s) => serde_json::json!({
425                        "_stream": {
426                            "origin": s.metadata.origin,
427                            "placeholder": true,
428                            "hint": "Materialize exchange body with .into_bytes() before aggregation if content needed"
429                        }
430                    }),
431                })
432                .collect();
433            Ok(Exchange::new(Message {
434                headers: Default::default(),
435                body: Body::Json(serde_json::Value::Array(bodies)),
436            }))
437        }
438        AggregationStrategy::Custom(f) => {
439            let mut iter = exchanges.into_iter();
440            let first = iter.next().ok_or_else(|| {
441                CamelError::ProcessorError("Aggregator: empty bucket".to_string())
442            })?;
443            Ok(iter.fold(first, |acc, next| f(acc, next)))
444        }
445    }
446}
447
448#[cfg(test)]
449mod tests {
450    use super::*;
451    use std::collections::HashMap;
452
453    use camel_api::{
454        aggregator::{AggregationStrategy, AggregatorConfig},
455        body::Body,
456        exchange::Exchange,
457        message::Message,
458    };
459    use tokio::sync::mpsc;
460    use tokio_util::sync::CancellationToken;
461    use tower::ServiceExt;
462
463    fn make_exchange(header: &str, value: &str, body: &str) -> Exchange {
464        let mut msg = Message {
465            headers: Default::default(),
466            body: Body::Text(body.to_string()),
467        };
468        msg.headers
469            .insert(header.to_string(), serde_json::json!(value));
470        Exchange::new(msg)
471    }
472
473    fn config_size(n: usize) -> AggregatorConfig {
474        AggregatorConfig::correlate_by("orderId")
475            .complete_when_size(n)
476            .build()
477    }
478
479    fn new_test_svc(config: AggregatorConfig) -> AggregatorService {
480        let (tx, _rx) = mpsc::channel(256);
481        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
482        let cancel = CancellationToken::new();
483        AggregatorService::new(config, tx, registry, cancel)
484    }
485
486    #[tokio::test]
487    async fn test_pending_exchange_not_yet_complete() {
488        let mut svc = new_test_svc(config_size(3));
489        let ex = make_exchange("orderId", "A", "first");
490        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
491        assert!(matches!(result.input.body, Body::Empty));
492        assert_eq!(
493            result.property(CAMEL_AGGREGATOR_PENDING),
494            Some(&serde_json::json!(true))
495        );
496    }
497
498    #[tokio::test]
499    async fn test_completes_on_size() {
500        let mut svc = new_test_svc(config_size(3));
501        for _ in 0..2 {
502            let ex = make_exchange("orderId", "A", "item");
503            let r = svc.ready().await.unwrap().call(ex).await.unwrap();
504            assert!(matches!(r.input.body, Body::Empty));
505        }
506        let ex = make_exchange("orderId", "A", "last");
507        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
508        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
509        assert_eq!(
510            result.property(CAMEL_AGGREGATED_SIZE),
511            Some(&serde_json::json!(3u64))
512        );
513    }
514
515    #[tokio::test]
516    async fn test_collect_all_produces_json_array() {
517        let mut svc = new_test_svc(config_size(2));
518        svc.ready()
519            .await
520            .unwrap()
521            .call(make_exchange("orderId", "A", "alpha"))
522            .await
523            .unwrap();
524        let result = svc
525            .ready()
526            .await
527            .unwrap()
528            .call(make_exchange("orderId", "A", "beta"))
529            .await
530            .unwrap();
531        let Body::Json(v) = &result.input.body else {
532            panic!("expected Body::Json")
533        };
534        let arr = v.as_array().unwrap();
535        assert_eq!(arr.len(), 2);
536        assert_eq!(arr[0], serde_json::json!("alpha"));
537        assert_eq!(arr[1], serde_json::json!("beta"));
538    }
539
540    #[tokio::test]
541    async fn test_two_keys_independent_buckets() {
542        // completionSize=3 so we can test that A and B accumulate independently.
543        let mut svc = new_test_svc(config_size(3));
544        svc.ready()
545            .await
546            .unwrap()
547            .call(make_exchange("orderId", "A", "a1"))
548            .await
549            .unwrap();
550        svc.ready()
551            .await
552            .unwrap()
553            .call(make_exchange("orderId", "B", "b1"))
554            .await
555            .unwrap();
556        svc.ready()
557            .await
558            .unwrap()
559            .call(make_exchange("orderId", "A", "a2"))
560            .await
561            .unwrap();
562        // A has 2 items, B has 1 item — neither complete yet
563        let ra = svc
564            .ready()
565            .await
566            .unwrap()
567            .call(make_exchange("orderId", "A", "a3"))
568            .await
569            .unwrap();
570        // A now has 3 → completes
571        assert!(matches!(ra.input.body, Body::Json(_)));
572        // B only has 1 → still pending
573        let rb = svc
574            .ready()
575            .await
576            .unwrap()
577            .call(make_exchange("orderId", "B", "b_check"))
578            .await
579            .unwrap();
580        assert!(matches!(rb.input.body, Body::Empty));
581    }
582
583    #[tokio::test]
584    async fn test_bucket_resets_after_completion() {
585        let mut svc = new_test_svc(config_size(2));
586        svc.ready()
587            .await
588            .unwrap()
589            .call(make_exchange("orderId", "A", "x"))
590            .await
591            .unwrap();
592        svc.ready()
593            .await
594            .unwrap()
595            .call(make_exchange("orderId", "A", "x"))
596            .await
597            .unwrap(); // completes
598        // New bucket starts
599        let r = svc
600            .ready()
601            .await
602            .unwrap()
603            .call(make_exchange("orderId", "A", "new"))
604            .await
605            .unwrap();
606        assert!(matches!(r.input.body, Body::Empty)); // pending again
607    }
608
609    #[tokio::test]
610    async fn test_completion_size_1_emits_immediately() {
611        let mut svc = new_test_svc(config_size(1));
612        let ex = make_exchange("orderId", "A", "solo");
613        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
614        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
615    }
616
617    #[tokio::test]
618    async fn test_custom_aggregation_strategy() {
619        use camel_api::aggregator::AggregationFn;
620        use std::sync::Arc;
621
622        let f: AggregationFn = Arc::new(|mut acc: Exchange, next: Exchange| {
623            let combined = format!(
624                "{}+{}",
625                acc.input.body.as_text().unwrap_or(""),
626                next.input.body.as_text().unwrap_or("")
627            );
628            acc.input.body = Body::Text(combined);
629            acc
630        });
631        let config = AggregatorConfig::correlate_by("key")
632            .complete_when_size(2)
633            .strategy(AggregationStrategy::Custom(f))
634            .build();
635        let mut svc = new_test_svc(config);
636        svc.ready()
637            .await
638            .unwrap()
639            .call(make_exchange("key", "X", "hello"))
640            .await
641            .unwrap();
642        let result = svc
643            .ready()
644            .await
645            .unwrap()
646            .call(make_exchange("key", "X", "world"))
647            .await
648            .unwrap();
649        assert_eq!(result.input.body.as_text(), Some("hello+world"));
650    }
651
652    #[tokio::test]
653    async fn test_completion_predicate() {
654        let config = AggregatorConfig::correlate_by("key")
655            .complete_when(|bucket| {
656                bucket
657                    .iter()
658                    .any(|e| e.input.body.as_text() == Some("DONE"))
659            })
660            .build();
661        let mut svc = new_test_svc(config);
662        svc.ready()
663            .await
664            .unwrap()
665            .call(make_exchange("key", "K", "first"))
666            .await
667            .unwrap();
668        svc.ready()
669            .await
670            .unwrap()
671            .call(make_exchange("key", "K", "second"))
672            .await
673            .unwrap();
674        let result = svc
675            .ready()
676            .await
677            .unwrap()
678            .call(make_exchange("key", "K", "DONE"))
679            .await
680            .unwrap();
681        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
682    }
683
684    #[tokio::test]
685    async fn test_missing_header_returns_error() {
686        let mut svc = new_test_svc(config_size(2));
687        let msg = Message {
688            headers: Default::default(),
689            body: Body::Text("no key".into()),
690        };
691        let ex = Exchange::new(msg);
692        let result = svc.ready().await.unwrap().call(ex).await;
693        assert!(result.is_err());
694        assert!(matches!(
695            result.unwrap_err(),
696            camel_api::CamelError::ProcessorError(_)
697        ));
698    }
699
700    #[tokio::test]
701    async fn test_cloned_service_shares_state() {
702        let svc1 = new_test_svc(config_size(2));
703        let mut svc2 = svc1.clone();
704        // send first exchange via svc1
705        svc1.clone()
706            .ready()
707            .await
708            .unwrap()
709            .call(make_exchange("orderId", "A", "from-svc1"))
710            .await
711            .unwrap();
712        // send second exchange via svc2 — should complete because same Arc<Mutex>
713        let result = svc2
714            .ready()
715            .await
716            .unwrap()
717            .call(make_exchange("orderId", "A", "from-svc2"))
718            .await
719            .unwrap();
720        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
721    }
722
723    #[tokio::test]
724    async fn test_camel_aggregated_key_property_set() {
725        let mut svc = new_test_svc(config_size(1));
726        let ex = make_exchange("orderId", "ORDER-42", "body");
727        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
728        assert_eq!(
729            result.property(CAMEL_AGGREGATED_KEY),
730            Some(&serde_json::json!("ORDER-42"))
731        );
732    }
733
734    #[tokio::test]
735    async fn test_aggregator_enforces_max_buckets() {
736        let config = AggregatorConfig::correlate_by("orderId")
737            .complete_when_size(2)
738            .max_buckets(3)
739            .build();
740
741        let mut svc = new_test_svc(config);
742
743        // Create 3 different correlation keys (fills limit)
744        for i in 0..3 {
745            let ex = make_exchange("orderId", &format!("key-{}", i), "body");
746            let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
747        }
748
749        // 4th key should be rejected
750        let ex = make_exchange("orderId", "key-4", "body");
751        let result = svc.ready().await.unwrap().call(ex).await;
752
753        assert!(result.is_err(), "Should reject when max buckets reached");
754        let err = result.unwrap_err().to_string();
755        assert!(
756            err.contains("maximum"),
757            "Error message should contain 'maximum': {}",
758            err
759        );
760    }
761
762    #[tokio::test]
763    async fn test_max_buckets_allows_existing_key() {
764        let config = AggregatorConfig::correlate_by("orderId")
765            .complete_when_size(5) // Large size so bucket doesn't complete
766            .max_buckets(2)
767            .build();
768
769        let mut svc = new_test_svc(config);
770
771        // Create 2 different correlation keys (fills limit)
772        let ex1 = make_exchange("orderId", "key-A", "body1");
773        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
774        let ex2 = make_exchange("orderId", "key-B", "body2");
775        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
776
777        // Should still allow adding to existing key
778        let ex3 = make_exchange("orderId", "key-A", "body3");
779        let result = svc.ready().await.unwrap().call(ex3).await;
780        assert!(
781            result.is_ok(),
782            "Should allow adding to existing bucket even at max limit"
783        );
784    }
785
786    #[tokio::test]
787    async fn test_bucket_ttl_eviction() {
788        let config = AggregatorConfig::correlate_by("orderId")
789            .complete_when_size(10) // Large size so bucket doesn't complete normally
790            .bucket_ttl(Duration::from_millis(50))
791            .build();
792
793        let mut svc = new_test_svc(config);
794
795        // Create a bucket
796        let ex1 = make_exchange("orderId", "key-A", "body1");
797        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
798
799        // Wait for TTL to expire
800        tokio::time::sleep(Duration::from_millis(100)).await;
801
802        // Create a new bucket - this should trigger eviction of the old one
803        let ex2 = make_exchange("orderId", "key-B", "body2");
804        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
805
806        // The expired bucket should have been evicted, so we should be able to
807        // add a new key-A bucket again
808        let ex3 = make_exchange("orderId", "key-A", "body3");
809        let result = svc.ready().await.unwrap().call(ex3).await;
810        assert!(result.is_ok(), "Should be able to recreate evicted bucket");
811    }
812
813    #[tokio::test(start_paused = true)]
814    async fn test_timeout_completes_bucket() {
815        let config = AggregatorConfig::correlate_by("key")
816            .complete_on_timeout(Duration::from_millis(100))
817            .build();
818        let mut svc = new_test_svc(config);
819        let ex = make_exchange("key", "A", "data");
820        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
821        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_some());
822
823        tokio::time::sleep(Duration::from_millis(200)).await;
824
825        assert_eq!(
826            svc.buckets.lock().unwrap().len(),
827            0,
828            "bucket should be removed after timeout"
829        );
830    }
831
832    #[tokio::test(start_paused = true)]
833    async fn test_timeout_resets_on_new_exchange() {
834        let config = AggregatorConfig::correlate_by("key")
835            .complete_on_timeout(Duration::from_millis(150))
836            .build();
837        let mut svc = new_test_svc(config);
838
839        let ex1 = make_exchange("key", "A", "first");
840        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
841
842        tokio::time::sleep(Duration::from_millis(100)).await;
843
844        let ex2 = make_exchange("key", "A", "second");
845        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
846
847        tokio::time::sleep(Duration::from_millis(100)).await;
848
849        assert_eq!(
850            svc.buckets.lock().unwrap().len(),
851            1,
852            "bucket should still exist — timeout was reset"
853        );
854
855        tokio::time::sleep(Duration::from_millis(100)).await;
856
857        assert_eq!(
858            svc.buckets.lock().unwrap().len(),
859            0,
860            "bucket should be gone after timeout fires"
861        );
862    }
863
864    #[tokio::test]
865    async fn test_composable_size_and_timeout() {
866        let config = AggregatorConfig::correlate_by("key")
867            .complete_on_size_or_timeout(2, Duration::from_millis(200))
868            .build();
869        let mut svc = new_test_svc(config);
870
871        let ex1 = make_exchange("key", "A", "first");
872        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
873        assert!(svc.buckets.lock().unwrap().contains_key("\"A\""));
874
875        let ex2 = make_exchange("key", "A", "second");
876        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
877        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
878        assert_eq!(
879            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
880            Some(&serde_json::json!("size"))
881        );
882    }
883
884    #[tokio::test(start_paused = true)]
885    async fn test_discard_on_timeout() {
886        let config = AggregatorConfig::correlate_by("key")
887            .complete_on_timeout(Duration::from_millis(50))
888            .discard_on_timeout(true)
889            .build();
890        let (tx, mut rx) = mpsc::channel(256);
891        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
892        let cancel = CancellationToken::new();
893        let mut svc = AggregatorService::new(config, tx, registry, cancel);
894
895        let ex = make_exchange("key", "A", "data");
896        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
897
898        tokio::time::sleep(Duration::from_millis(100)).await;
899
900        assert!(
901            rx.try_recv().is_err(),
902            "no emit expected with discard_on_timeout"
903        );
904        assert_eq!(svc.buckets.lock().unwrap().len(), 0);
905        assert!(
906            svc.timeout_tasks.lock().unwrap().is_empty(),
907            "timeout task should be cleaned up"
908        );
909    }
910
911    #[tokio::test]
912    async fn test_force_completion_on_stop() {
913        let config = AggregatorConfig::correlate_by("key")
914            .complete_when_size(10)
915            .force_completion_on_stop(true)
916            .build();
917        let (tx, mut rx) = mpsc::channel(256);
918        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
919        let cancel = CancellationToken::new();
920        let svc = AggregatorService::new(config, tx, registry, cancel);
921
922        let mut call_svc = svc.clone();
923        let ex = make_exchange("key", "A", "data");
924        let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
925
926        svc.force_complete_all();
927
928        let result = rx.try_recv().expect("should emit on force-complete");
929        assert!(
930            result.input.body.as_text().is_some() || matches!(result.input.body, Body::Json(_))
931        );
932        assert_eq!(
933            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
934            Some(&serde_json::json!("stop"))
935        );
936    }
937
938    #[tokio::test]
939    async fn test_completion_reason_property_size() {
940        let config = AggregatorConfig::correlate_by("key")
941            .complete_when_size(1)
942            .build();
943        let mut svc = new_test_svc(config);
944        let ex = make_exchange("key", "X", "body");
945        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
946        assert_eq!(
947            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
948            Some(&serde_json::json!("size"))
949        );
950    }
951
952    #[tokio::test]
953    async fn test_completion_reason_property_predicate() {
954        let config = AggregatorConfig::correlate_by("key")
955            .complete_when(|_| true)
956            .build();
957        let mut svc = new_test_svc(config);
958        let ex = make_exchange("key", "X", "body");
959        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
960        assert_eq!(
961            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
962            Some(&serde_json::json!("predicate"))
963        );
964    }
965
966    #[tokio::test(start_paused = true)]
967    async fn test_size_completes_before_timeout() {
968        let config = AggregatorConfig::correlate_by("key")
969            .complete_on_size_or_timeout(2, Duration::from_millis(200))
970            .build();
971        let mut svc = new_test_svc(config);
972
973        let ex1 = make_exchange("key", "A", "first");
974        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
975
976        let ex2 = make_exchange("key", "A", "second");
977        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
978
979        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
980        assert_eq!(
981            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
982            Some(&serde_json::json!("size"))
983        );
984        assert_eq!(svc.buckets.lock().unwrap().len(), 0);
985
986        tokio::time::sleep(Duration::from_millis(300)).await;
987        assert_eq!(
988            svc.buckets.lock().unwrap().len(),
989            0,
990            "no re-fire after timeout"
991        );
992    }
993
994    #[tokio::test(start_paused = true)]
995    async fn test_concurrent_timeout_fire_and_new_exchange() {
996        let config = AggregatorConfig::correlate_by("key")
997            .complete_on_size_or_timeout(2, Duration::from_millis(100))
998            .build();
999        let (tx, mut rx) = mpsc::channel(256);
1000        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1001        let cancel = CancellationToken::new();
1002        let mut svc = AggregatorService::new(config, tx, registry, cancel);
1003
1004        let ex = make_exchange("key", "A", "data");
1005        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1006
1007        // Advance time past timeout — timeout task fires and removes bucket
1008        tokio::time::sleep(Duration::from_millis(150)).await;
1009
1010        // New exchange arrives after timeout — starts a fresh bucket
1011        let ex2 = make_exchange("key", "A", "data2");
1012        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1013        assert!(
1014            result.property(CAMEL_AGGREGATOR_PENDING).is_some(),
1015            "should be pending in new bucket"
1016        );
1017
1018        // Drain late emits from timeout
1019        let mut late_count = 0;
1020        while rx.try_recv().is_ok() {
1021            late_count += 1;
1022        }
1023        assert_eq!(
1024            late_count, 1,
1025            "exactly 1 late emit from the timed-out bucket"
1026        );
1027    }
1028
1029    #[tokio::test(start_paused = true)]
1030    async fn test_late_channel_full_drops_with_warning() {
1031        let config = AggregatorConfig::correlate_by("key")
1032            .complete_on_timeout(Duration::from_millis(50))
1033            .build();
1034        let (tx, mut rx) = mpsc::channel(1);
1035        rx.close();
1036        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1037        let cancel = CancellationToken::new();
1038        let mut svc = AggregatorService::new(config, tx, registry, cancel);
1039
1040        let ex = make_exchange("key", "A", "data");
1041        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1042
1043        tokio::time::sleep(Duration::from_millis(100)).await;
1044        assert_eq!(
1045            svc.buckets.lock().unwrap().len(),
1046            0,
1047            "bucket removed despite channel closed"
1048        );
1049    }
1050
1051    #[tokio::test]
1052    async fn test_aggregate_stream_bodies_creates_valid_json() {
1053        use bytes::Bytes;
1054        use camel_api::{Body, StreamBody, StreamMetadata};
1055        use futures::stream;
1056        use tokio::sync::Mutex;
1057
1058        let chunks = vec![Ok(Bytes::from("test"))];
1059        let stream_body = StreamBody {
1060            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1061            metadata: StreamMetadata {
1062                origin: Some("file:///test.txt".to_string()),
1063                ..Default::default()
1064            },
1065        };
1066
1067        let ex1 = Exchange::new(Message {
1068            headers: Default::default(),
1069            body: Body::Stream(stream_body),
1070        });
1071
1072        let exchanges = vec![ex1];
1073        let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1074
1075        let exchange = result.expect("Expected Ok result");
1076        assert!(
1077            matches!(exchange.input.body, Body::Json(_)),
1078            "Expected Json body"
1079        );
1080
1081        if let Body::Json(value) = exchange.input.body {
1082            let json_str = serde_json::to_string(&value).unwrap();
1083            let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1084
1085            assert!(parsed.is_array(), "Result should be an array");
1086            let arr = parsed.as_array().unwrap();
1087            assert!(arr[0].is_object(), "First element should be an object");
1088            assert!(
1089                arr[0]["_stream"].is_object(),
1090                "Should contain _stream object"
1091            );
1092            assert_eq!(arr[0]["_stream"]["origin"], "file:///test.txt");
1093            assert_eq!(
1094                arr[0]["_stream"]["placeholder"], true,
1095                "placeholder flag should be true"
1096            );
1097        }
1098    }
1099
1100    #[tokio::test]
1101    async fn test_aggregate_stream_bodies_with_none_origin() {
1102        use bytes::Bytes;
1103        use camel_api::{Body, StreamBody, StreamMetadata};
1104        use futures::stream;
1105        use tokio::sync::Mutex;
1106
1107        let chunks = vec![Ok(Bytes::from("test"))];
1108        let stream_body = StreamBody {
1109            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1110            metadata: StreamMetadata {
1111                origin: None,
1112                ..Default::default()
1113            },
1114        };
1115
1116        let ex1 = Exchange::new(Message {
1117            headers: Default::default(),
1118            body: Body::Stream(stream_body),
1119        });
1120
1121        let exchanges = vec![ex1];
1122        let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1123
1124        let exchange = result.expect("Expected Ok result");
1125        assert!(
1126            matches!(exchange.input.body, Body::Json(_)),
1127            "Expected Json body"
1128        );
1129
1130        if let Body::Json(value) = exchange.input.body {
1131            let json_str = serde_json::to_string(&value).unwrap();
1132            let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1133
1134            assert!(parsed.is_array(), "Result should be an array");
1135            let arr = parsed.as_array().unwrap();
1136            assert!(arr[0].is_object(), "First element should be an object");
1137            assert!(
1138                arr[0]["_stream"].is_object(),
1139                "Should contain _stream object"
1140            );
1141            assert_eq!(
1142                arr[0]["_stream"]["origin"],
1143                serde_json::Value::Null,
1144                "origin should be null when None"
1145            );
1146            assert_eq!(
1147                arr[0]["_stream"]["placeholder"], true,
1148                "placeholder flag should be true"
1149            );
1150        }
1151    }
1152}