Skip to main content

camel_processor/
aggregator.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6use std::time::{Duration, Instant};
7
8use tokio::sync::mpsc;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tower::Service;
12
13use camel_api::{
14    CamelError,
15    aggregator::{
16        AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode,
17        CompletionReason, CorrelationStrategy,
18    },
19    body::Body,
20    exchange::Exchange,
21    message::Message,
22};
23use camel_language_api::Language;
24
25pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
26
27pub const CAMEL_AGGREGATOR_PENDING: &str = "CamelAggregatorPending";
28pub const CAMEL_AGGREGATED_SIZE: &str = "CamelAggregatedSize";
29pub const CAMEL_AGGREGATED_KEY: &str = "CamelAggregatedKey";
30pub const CAMEL_AGGREGATED_COMPLETION_REASON: &str = "CamelAggregatedCompletionReason";
31
32/// Internal bucket structure with timestamp tracking for TTL eviction.
33struct Bucket {
34    exchanges: Vec<Exchange>,
35    last_updated: Instant,
36}
37
38impl Bucket {
39    fn new() -> Self {
40        Self {
41            exchanges: Vec::new(),
42            last_updated: Instant::now(),
43        }
44    }
45
46    fn push(&mut self, exchange: Exchange) {
47        self.exchanges.push(exchange);
48        self.last_updated = Instant::now();
49    }
50
51    fn is_expired(&self, ttl: Duration) -> bool {
52        Instant::now().duration_since(self.last_updated) >= ttl
53    }
54}
55
56#[derive(Clone)]
57pub struct AggregatorService {
58    config: AggregatorConfig,
59    buckets: Arc<Mutex<HashMap<String, Bucket>>>,
60    timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
61    timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
62    late_tx: mpsc::Sender<Exchange>,
63    language_registry: SharedLanguageRegistry,
64    route_cancel: CancellationToken,
65}
66
67impl AggregatorService {
68    pub fn new(
69        config: AggregatorConfig,
70        late_tx: mpsc::Sender<Exchange>,
71        language_registry: SharedLanguageRegistry,
72        route_cancel: CancellationToken,
73    ) -> Self {
74        Self {
75            config,
76            buckets: Arc::new(Mutex::new(HashMap::new())),
77            timeout_tasks: Arc::new(Mutex::new(HashMap::new())),
78            timeout_handles: Arc::new(Mutex::new(HashMap::new())),
79            late_tx,
80            language_registry,
81            route_cancel,
82        }
83    }
84
85    pub fn config(&self) -> &AggregatorConfig {
86        &self.config
87    }
88
89    pub fn has_timeout(&self) -> bool {
90        has_timeout_condition(&self.config.completion)
91    }
92
93    pub fn force_complete_all(&self) {
94        let mut buckets_guard = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
95        let keys: Vec<String> = buckets_guard.keys().cloned().collect();
96
97        for key in keys {
98            if let Some(bucket) = buckets_guard.remove(&key) {
99                if self.config.force_completion_on_stop {
100                    cancel_timeout_task_with_handle(
101                        &key,
102                        &self.timeout_tasks,
103                        &self.timeout_handles,
104                    );
105                    match aggregate(bucket.exchanges, &self.config.strategy) {
106                        Ok(mut result) => {
107                            result.set_property(
108                                CAMEL_AGGREGATED_COMPLETION_REASON,
109                                serde_json::json!(CompletionReason::Stop.as_str()),
110                            );
111                            if self.late_tx.try_send(result).is_err() {
112                                tracing::warn!(
113                                    key = %key,
114                                    "aggregator force-complete emit dropped: late channel full"
115                                );
116                            }
117                        }
118                        Err(e) => {
119                            // log-policy: handler-owned
120                            tracing::warn!(
121                                key = %key,
122                                error = %e,
123                                "aggregation failed in force_complete_all"
124                            );
125                        }
126                    }
127                } else {
128                    cancel_timeout_task_with_handle(
129                        &key,
130                        &self.timeout_tasks,
131                        &self.timeout_handles,
132                    );
133                }
134            }
135        }
136    }
137
138    /// Graceful shutdown: cancel all outstanding timeout tasks and await their
139    /// JoinHandles (with a 5s deadline) so that no tasks are leaked.
140    pub async fn shutdown(&self) {
141        // Cancel all timeout cancellation tokens.
142        {
143            let mut guard = self.timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
144            for token in guard.values() {
145                token.cancel();
146            }
147            guard.clear();
148        };
149
150        // Remove and collect all JoinHandles.
151        let handles: Vec<JoinHandle<()>> = {
152            let mut guard = self
153                .timeout_handles
154                .lock()
155                .unwrap_or_else(|e| e.into_inner());
156            guard.drain().map(|(_, handle)| handle).collect()
157        };
158
159        if handles.is_empty() {
160            return;
161        }
162
163        // Await all handles with a deadline.
164        let _ = tokio::time::timeout(Duration::from_secs(5), async {
165            for handle in handles {
166                let _ = handle.await;
167            }
168        })
169        .await;
170    }
171}
172
173pub fn has_timeout_condition(mode: &CompletionMode) -> bool {
174    match mode {
175        CompletionMode::Single(CompletionCondition::Timeout(_)) => true,
176        CompletionMode::Any(conditions) => conditions
177            .iter()
178            .any(|c| matches!(c, CompletionCondition::Timeout(_))),
179        _ => false,
180    }
181}
182
183impl Service<Exchange> for AggregatorService {
184    type Response = Exchange;
185    type Error = CamelError;
186    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
187
188    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
189        Poll::Ready(Ok(()))
190    }
191
192    fn call(&mut self, exchange: Exchange) -> Self::Future {
193        let config = self.config.clone();
194        let buckets = Arc::clone(&self.buckets);
195        let timeout_tasks = Arc::clone(&self.timeout_tasks);
196        let timeout_handles = Arc::clone(&self.timeout_handles);
197        let late_tx = self.late_tx.clone();
198        let language_registry = Arc::clone(&self.language_registry);
199        let route_cancel = self.route_cancel.clone();
200
201        Box::pin(async move {
202            let key_value =
203                extract_correlation_key(&exchange, &config.correlation, &language_registry).await?;
204
205            let key_str = serde_json::to_string(&key_value)
206                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
207
208            let completed_bucket = {
209                let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
210
211                if let Some(ttl) = config.bucket_ttl {
212                    guard.retain(|_, bucket| !bucket.is_expired(ttl));
213                }
214
215                if let Some(max) = config.max_buckets
216                    && !guard.contains_key(&key_str)
217                    && guard.len() >= max
218                {
219                    tracing::warn!(
220                        max_buckets = max,
221                        correlation_key = %key_str,
222                        "Aggregator reached max buckets limit, rejecting new correlation key"
223                    );
224                    return Err(CamelError::ProcessorError(format!(
225                        "Aggregator reached maximum {} buckets",
226                        max
227                    )));
228                }
229
230                let bucket = guard.entry(key_str.clone()).or_insert_with(Bucket::new);
231                bucket.push(exchange);
232
233                let (is_complete, reason) =
234                    check_sync_completion(&config.completion, &bucket.exchanges);
235
236                if is_complete {
237                    let exchanges = guard.remove(&key_str).map(|b| b.exchanges);
238                    (exchanges, reason)
239                } else {
240                    (None, CompletionReason::Size) // placeholder; reason unused when None
241                }
242            };
243
244            if completed_bucket.0.is_none() && has_timeout_condition(&config.completion) {
245                let cancel = {
246                    let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
247                    // Cancel and remove old handle for this key (if any).
248                    if let Some(existing) = tt_guard.get(&key_str) {
249                        existing.cancel();
250                    }
251                    let token = CancellationToken::new();
252                    tt_guard.insert(key_str.clone(), token.clone());
253                    token
254                };
255
256                let timeout_dur = extract_timeout_duration(&config.completion);
257                if let Some(timeout) = timeout_dur {
258                    // Remove old handle if present.
259                    {
260                        let mut hh = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
261                        if let Some(old) = hh.remove(&key_str) {
262                            old.abort();
263                        }
264                    }
265                    let handle = spawn_timeout_task(
266                        key_str.clone(),
267                        timeout,
268                        cancel,
269                        buckets.clone(),
270                        timeout_tasks.clone(),
271                        timeout_handles.clone(),
272                        late_tx,
273                        config.strategy.clone(),
274                        config.discard_on_timeout,
275                        route_cancel,
276                    );
277                    timeout_handles
278                        .lock()
279                        .unwrap_or_else(|e| e.into_inner())
280                        .insert(key_str.clone(), handle);
281                }
282            }
283
284            if let Some(exchanges) = completed_bucket.0 {
285                cancel_timeout_task_with_handle(&key_str, &timeout_tasks, &timeout_handles);
286                let reason = completed_bucket.1;
287                let size = exchanges.len();
288                let mut result = aggregate(exchanges, &config.strategy)?;
289                result.set_property(CAMEL_AGGREGATED_SIZE, serde_json::json!(size as u64));
290                result.set_property(CAMEL_AGGREGATED_KEY, key_value);
291                result.set_property(
292                    CAMEL_AGGREGATED_COMPLETION_REASON,
293                    serde_json::json!(reason.as_str()),
294                );
295                Ok(result)
296            } else {
297                let mut pending = Exchange::new(Message {
298                    headers: Default::default(),
299                    body: Body::Empty,
300                });
301                pending.set_property(CAMEL_AGGREGATOR_PENDING, serde_json::json!(true));
302                Ok(pending)
303            }
304        })
305    }
306}
307
308async fn extract_correlation_key(
309    exchange: &Exchange,
310    strategy: &CorrelationStrategy,
311    registry: &SharedLanguageRegistry,
312) -> Result<serde_json::Value, CamelError> {
313    match strategy {
314        CorrelationStrategy::HeaderName(h) => {
315            exchange.input.headers.get(h).cloned().ok_or_else(|| {
316                CamelError::ProcessorError(format!(
317                    "Aggregator: missing correlation key header '{}'",
318                    h
319                ))
320            })
321        }
322        CorrelationStrategy::Expression { expr, language } => {
323            let expression = {
324                let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
325                let lang = reg.get(language).ok_or_else(|| {
326                    CamelError::ProcessorError(format!(
327                        "Aggregator: language '{}' not found in registry",
328                        language
329                    ))
330                })?;
331                lang.create_expression(expr)
332                    .map_err(|e| CamelError::ProcessorError(e.to_string()))?
333            };
334            let value = expression
335                .evaluate(exchange)
336                .await
337                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
338            if value.is_null() {
339                return Err(CamelError::ProcessorError(format!(
340                    "Aggregator: correlation expression '{}' evaluated to null",
341                    expr
342                )));
343            }
344            Ok(value)
345        }
346        CorrelationStrategy::Fn(f) => f(exchange).map(serde_json::Value::String).ok_or_else(|| {
347            CamelError::ProcessorError("Aggregator: correlation function returned None".to_string())
348        }),
349    }
350}
351
352fn check_sync_completion(
353    mode: &CompletionMode,
354    exchanges: &[Exchange],
355) -> (bool, CompletionReason) {
356    match mode {
357        CompletionMode::Single(cond) => check_single(cond, exchanges),
358        CompletionMode::Any(conditions) => {
359            for cond in conditions {
360                if let CompletionCondition::Timeout(_) = cond {
361                    continue;
362                }
363                let (done, reason) = check_single(cond, exchanges);
364                if done {
365                    return (true, reason);
366                }
367            }
368            (false, CompletionReason::Size)
369        }
370    }
371}
372
373fn check_single(cond: &CompletionCondition, exchanges: &[Exchange]) -> (bool, CompletionReason) {
374    match cond {
375        CompletionCondition::Size(n) => (exchanges.len() >= *n, CompletionReason::Size),
376        CompletionCondition::Predicate(pred) => (pred(exchanges), CompletionReason::Predicate),
377        CompletionCondition::Timeout(_) => (false, CompletionReason::Timeout),
378    }
379}
380
381fn extract_timeout_duration(mode: &CompletionMode) -> Option<Duration> {
382    match mode {
383        CompletionMode::Single(CompletionCondition::Timeout(d)) => Some(*d),
384        CompletionMode::Any(conditions) => conditions.iter().find_map(|c| {
385            if let CompletionCondition::Timeout(d) = c {
386                Some(*d)
387            } else {
388                None
389            }
390        }),
391        _ => None,
392    }
393}
394
395fn cancel_timeout_task(key: &str, timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>) {
396    let mut guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
397    if let Some(token) = guard.remove(key) {
398        token.cancel();
399    }
400}
401
402/// Also removes the stored JoinHandle for a cancelled/completed timeout task.
403fn cancel_timeout_task_with_handle(
404    key: &str,
405    timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>,
406    timeout_handles: &Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
407) {
408    cancel_timeout_task(key, timeout_tasks);
409    let mut guard = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
410    guard.remove(key);
411}
412
413#[allow(clippy::too_many_arguments)]
414fn spawn_timeout_task(
415    key: String,
416    timeout: Duration,
417    cancel: CancellationToken,
418    buckets: Arc<Mutex<HashMap<String, Bucket>>>,
419    timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
420    timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
421    late_tx: mpsc::Sender<Exchange>,
422    strategy: AggregationStrategy,
423    discard: bool,
424    _route_cancel: CancellationToken,
425) -> JoinHandle<()> {
426    let cancel_clone = cancel.clone();
427    tokio::spawn(async move {
428        tokio::select! {
429            _ = tokio::time::sleep(timeout) => {
430                let should_proceed = {
431                    let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
432                    if cancel_clone.is_cancelled() {
433                        false
434                    } else {
435                        tt_guard.remove(&key);
436                        true
437                    }
438                };
439                if !should_proceed {
440                    return;
441                }
442                // Clean up our own JoinHandle from the map on natural completion.
443                // Without this, the handle entry leaks until route shutdown.
444                {
445                    let mut hh = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
446                    hh.remove(&key);
447                }
448                let bucket_exchanges = {
449                    let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
450                    guard.remove(&key).map(|b| b.exchanges)
451                };
452                if let Some(exchanges) = bucket_exchanges
453                    && !discard
454                {
455                    match aggregate(exchanges, &strategy) {
456                        Ok(mut result) => {
457                            result.set_property(
458                                CAMEL_AGGREGATED_COMPLETION_REASON,
459                                serde_json::json!(CompletionReason::Timeout.as_str()),
460                            );
461                            if late_tx.try_send(result).is_err() {
462                                tracing::warn!(
463                                    key = %key,
464                                    "aggregator timeout emit dropped: late channel full"
465                                );
466                            }
467                        }
468                        Err(e) => {
469                            // log-policy: handler-owned
470                            tracing::warn!(
471                                key = %key,
472                                error = %e,
473                                "aggregation failed in timeout task"
474                            );
475                        }
476                    }
477                }
478            }
479            _ = cancel_clone.cancelled() => {}
480        }
481    })
482}
483
484fn aggregate(
485    exchanges: Vec<Exchange>,
486    strategy: &AggregationStrategy,
487) -> Result<Exchange, CamelError> {
488    match strategy {
489        AggregationStrategy::CollectAll => {
490            let bodies: Vec<serde_json::Value> = exchanges
491                .into_iter()
492                .map(|e| match e.input.body {
493                    Body::Json(v) => v,
494                    Body::Text(s) => serde_json::Value::String(s),
495                    Body::Xml(s) => serde_json::Value::String(s),
496                    Body::Bytes(b) => {
497                        serde_json::Value::String(String::from_utf8_lossy(&b).into_owned())
498                    }
499                    Body::Empty => serde_json::Value::Null,
500                    Body::Stream(s) => serde_json::json!({
501                        "_stream": {
502                            "origin": s.metadata.origin,
503                            "placeholder": true,
504                            "hint": "Materialize exchange body with .into_bytes() before aggregation if content needed"
505                        }
506                    }),
507                })
508                .collect();
509            Ok(Exchange::new(Message {
510                headers: Default::default(),
511                body: Body::Json(serde_json::Value::Array(bodies)),
512            }))
513        }
514        AggregationStrategy::Custom(f) => {
515            let mut iter = exchanges.into_iter();
516            let first = iter.next().ok_or_else(|| {
517                CamelError::ProcessorError("Aggregator: empty bucket".to_string())
518            })?;
519            Ok(iter.fold(first, |acc, next| f(acc, next)))
520        }
521    }
522}
523
524#[cfg(test)]
525mod tests {
526    use super::*;
527    use std::collections::HashMap;
528
529    use camel_api::{
530        aggregator::{AggregationStrategy, AggregatorConfig},
531        body::Body,
532        exchange::Exchange,
533        message::Message,
534    };
535    use tokio::sync::mpsc;
536    use tokio_util::sync::CancellationToken;
537    use tower::ServiceExt;
538
539    fn make_exchange(header: &str, value: &str, body: &str) -> Exchange {
540        let mut msg = Message {
541            headers: Default::default(),
542            body: Body::Text(body.to_string()),
543        };
544        msg.headers
545            .insert(header.to_string(), serde_json::json!(value));
546        Exchange::new(msg)
547    }
548
549    fn config_size(n: usize) -> AggregatorConfig {
550        AggregatorConfig::correlate_by("orderId")
551            .complete_when_size(n)
552            .build()
553            .unwrap()
554    }
555
556    fn new_test_svc(config: AggregatorConfig) -> AggregatorService {
557        let (tx, _rx) = mpsc::channel(256);
558        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
559        let cancel = CancellationToken::new();
560        AggregatorService::new(config, tx, registry, cancel)
561    }
562
563    #[tokio::test]
564    async fn test_pending_exchange_not_yet_complete() {
565        let mut svc = new_test_svc(config_size(3));
566        let ex = make_exchange("orderId", "A", "first");
567        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
568        assert!(matches!(result.input.body, Body::Empty));
569        assert_eq!(
570            result.property(CAMEL_AGGREGATOR_PENDING),
571            Some(&serde_json::json!(true))
572        );
573    }
574
575    #[tokio::test]
576    async fn test_completes_on_size() {
577        let mut svc = new_test_svc(config_size(3));
578        for _ in 0..2 {
579            let ex = make_exchange("orderId", "A", "item");
580            let r = svc.ready().await.unwrap().call(ex).await.unwrap();
581            assert!(matches!(r.input.body, Body::Empty));
582        }
583        let ex = make_exchange("orderId", "A", "last");
584        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
585        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
586        assert_eq!(
587            result.property(CAMEL_AGGREGATED_SIZE),
588            Some(&serde_json::json!(3u64))
589        );
590    }
591
592    #[tokio::test]
593    async fn test_collect_all_produces_json_array() {
594        let mut svc = new_test_svc(config_size(2));
595        svc.ready()
596            .await
597            .unwrap()
598            .call(make_exchange("orderId", "A", "alpha"))
599            .await
600            .unwrap();
601        let result = svc
602            .ready()
603            .await
604            .unwrap()
605            .call(make_exchange("orderId", "A", "beta"))
606            .await
607            .unwrap();
608        let Body::Json(v) = &result.input.body else {
609            panic!("expected Body::Json")
610        };
611        let arr = v.as_array().unwrap();
612        assert_eq!(arr.len(), 2);
613        assert_eq!(arr[0], serde_json::json!("alpha"));
614        assert_eq!(arr[1], serde_json::json!("beta"));
615    }
616
617    #[tokio::test]
618    async fn test_two_keys_independent_buckets() {
619        // completionSize=3 so we can test that A and B accumulate independently.
620        let mut svc = new_test_svc(config_size(3));
621        svc.ready()
622            .await
623            .unwrap()
624            .call(make_exchange("orderId", "A", "a1"))
625            .await
626            .unwrap();
627        svc.ready()
628            .await
629            .unwrap()
630            .call(make_exchange("orderId", "B", "b1"))
631            .await
632            .unwrap();
633        svc.ready()
634            .await
635            .unwrap()
636            .call(make_exchange("orderId", "A", "a2"))
637            .await
638            .unwrap();
639        // A has 2 items, B has 1 item — neither complete yet
640        let ra = svc
641            .ready()
642            .await
643            .unwrap()
644            .call(make_exchange("orderId", "A", "a3"))
645            .await
646            .unwrap();
647        // A now has 3 → completes
648        assert!(matches!(ra.input.body, Body::Json(_)));
649        // B only has 1 → still pending
650        let rb = svc
651            .ready()
652            .await
653            .unwrap()
654            .call(make_exchange("orderId", "B", "b_check"))
655            .await
656            .unwrap();
657        assert!(matches!(rb.input.body, Body::Empty));
658    }
659
660    #[tokio::test]
661    async fn test_bucket_resets_after_completion() {
662        let mut svc = new_test_svc(config_size(2));
663        svc.ready()
664            .await
665            .unwrap()
666            .call(make_exchange("orderId", "A", "x"))
667            .await
668            .unwrap();
669        svc.ready()
670            .await
671            .unwrap()
672            .call(make_exchange("orderId", "A", "x"))
673            .await
674            .unwrap(); // completes
675        // New bucket starts
676        let r = svc
677            .ready()
678            .await
679            .unwrap()
680            .call(make_exchange("orderId", "A", "new"))
681            .await
682            .unwrap();
683        assert!(matches!(r.input.body, Body::Empty)); // pending again
684    }
685
686    #[tokio::test]
687    async fn test_completion_size_1_emits_immediately() {
688        let mut svc = new_test_svc(config_size(1));
689        let ex = make_exchange("orderId", "A", "solo");
690        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
691        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
692    }
693
694    #[tokio::test]
695    async fn test_custom_aggregation_strategy() {
696        use camel_api::aggregator::AggregationFn;
697        use std::sync::Arc;
698
699        let f: AggregationFn = Arc::new(|mut acc: Exchange, next: Exchange| {
700            let combined = format!(
701                "{}+{}",
702                acc.input.body.as_text().unwrap_or(""),
703                next.input.body.as_text().unwrap_or("")
704            );
705            acc.input.body = Body::Text(combined);
706            acc
707        });
708        let config = AggregatorConfig::correlate_by("key")
709            .complete_when_size(2)
710            .strategy(AggregationStrategy::Custom(f))
711            .build()
712            .unwrap();
713        let mut svc = new_test_svc(config);
714        svc.ready()
715            .await
716            .unwrap()
717            .call(make_exchange("key", "X", "hello"))
718            .await
719            .unwrap();
720        let result = svc
721            .ready()
722            .await
723            .unwrap()
724            .call(make_exchange("key", "X", "world"))
725            .await
726            .unwrap();
727        assert_eq!(result.input.body.as_text(), Some("hello+world"));
728    }
729
730    #[tokio::test]
731    async fn test_completion_predicate() {
732        let config = AggregatorConfig::correlate_by("key")
733            .complete_when(|bucket| {
734                bucket
735                    .iter()
736                    .any(|e| e.input.body.as_text() == Some("DONE"))
737            })
738            .build()
739            .unwrap();
740        let mut svc = new_test_svc(config);
741        svc.ready()
742            .await
743            .unwrap()
744            .call(make_exchange("key", "K", "first"))
745            .await
746            .unwrap();
747        svc.ready()
748            .await
749            .unwrap()
750            .call(make_exchange("key", "K", "second"))
751            .await
752            .unwrap();
753        let result = svc
754            .ready()
755            .await
756            .unwrap()
757            .call(make_exchange("key", "K", "DONE"))
758            .await
759            .unwrap();
760        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
761    }
762
763    #[tokio::test]
764    async fn test_missing_header_returns_error() {
765        let mut svc = new_test_svc(config_size(2));
766        let msg = Message {
767            headers: Default::default(),
768            body: Body::Text("no key".into()),
769        };
770        let ex = Exchange::new(msg);
771        let result = svc.ready().await.unwrap().call(ex).await;
772        assert!(result.is_err());
773        assert!(matches!(
774            result.unwrap_err(),
775            camel_api::CamelError::ProcessorError(_)
776        ));
777    }
778
779    #[tokio::test]
780    async fn test_cloned_service_shares_state() {
781        let svc1 = new_test_svc(config_size(2));
782        let mut svc2 = svc1.clone();
783        // send first exchange via svc1
784        svc1.clone()
785            .ready()
786            .await
787            .unwrap()
788            .call(make_exchange("orderId", "A", "from-svc1"))
789            .await
790            .unwrap();
791        // send second exchange via svc2 — should complete because same Arc<Mutex>
792        let result = svc2
793            .ready()
794            .await
795            .unwrap()
796            .call(make_exchange("orderId", "A", "from-svc2"))
797            .await
798            .unwrap();
799        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
800    }
801
802    #[tokio::test]
803    async fn test_camel_aggregated_key_property_set() {
804        let mut svc = new_test_svc(config_size(1));
805        let ex = make_exchange("orderId", "ORDER-42", "body");
806        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
807        assert_eq!(
808            result.property(CAMEL_AGGREGATED_KEY),
809            Some(&serde_json::json!("ORDER-42"))
810        );
811    }
812
813    #[tokio::test]
814    async fn test_aggregator_enforces_max_buckets() {
815        let config = AggregatorConfig::correlate_by("orderId")
816            .complete_when_size(2)
817            .max_buckets(3)
818            .build()
819            .unwrap();
820
821        let mut svc = new_test_svc(config);
822
823        // Create 3 different correlation keys (fills limit)
824        for i in 0..3 {
825            let ex = make_exchange("orderId", &format!("key-{}", i), "body");
826            let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
827        }
828
829        // 4th key should be rejected
830        let ex = make_exchange("orderId", "key-4", "body");
831        let result = svc.ready().await.unwrap().call(ex).await;
832
833        assert!(result.is_err(), "Should reject when max buckets reached");
834        let err = result.unwrap_err().to_string();
835        assert!(
836            err.contains("maximum"),
837            "Error message should contain 'maximum': {}",
838            err
839        );
840    }
841
842    #[tokio::test]
843    async fn test_max_buckets_allows_existing_key() {
844        let config = AggregatorConfig::correlate_by("orderId")
845            .complete_when_size(5) // Large size so bucket doesn't complete
846            .max_buckets(2)
847            .build()
848            .unwrap();
849
850        let mut svc = new_test_svc(config);
851
852        // Create 2 different correlation keys (fills limit)
853        let ex1 = make_exchange("orderId", "key-A", "body1");
854        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
855        let ex2 = make_exchange("orderId", "key-B", "body2");
856        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
857
858        // Should still allow adding to existing key
859        let ex3 = make_exchange("orderId", "key-A", "body3");
860        let result = svc.ready().await.unwrap().call(ex3).await;
861        assert!(
862            result.is_ok(),
863            "Should allow adding to existing bucket even at max limit"
864        );
865    }
866
867    #[tokio::test]
868    async fn test_bucket_ttl_eviction() {
869        let config = AggregatorConfig::correlate_by("orderId")
870            .complete_when_size(10) // Large size so bucket doesn't complete normally
871            .bucket_ttl(Duration::from_millis(50))
872            .build()
873            .unwrap();
874
875        let mut svc = new_test_svc(config);
876
877        // Create a bucket
878        let ex1 = make_exchange("orderId", "key-A", "body1");
879        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
880
881        // Wait for TTL to expire
882        tokio::time::sleep(Duration::from_millis(100)).await;
883
884        // Create a new bucket - this should trigger eviction of the old one
885        let ex2 = make_exchange("orderId", "key-B", "body2");
886        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
887
888        // The expired bucket should have been evicted, so we should be able to
889        // add a new key-A bucket again
890        let ex3 = make_exchange("orderId", "key-A", "body3");
891        let result = svc.ready().await.unwrap().call(ex3).await;
892        assert!(result.is_ok(), "Should be able to recreate evicted bucket");
893    }
894
895    #[tokio::test(start_paused = true)]
896    async fn test_timeout_completes_bucket() {
897        let config = AggregatorConfig::correlate_by("key")
898            .complete_on_timeout(Duration::from_millis(100))
899            .build()
900            .unwrap();
901        let mut svc = new_test_svc(config);
902        let ex = make_exchange("key", "A", "data");
903        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
904        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_some());
905
906        tokio::time::sleep(Duration::from_millis(200)).await;
907
908        assert_eq!(
909            svc.buckets.lock().unwrap().len(),
910            0,
911            "bucket should be removed after timeout"
912        );
913    }
914
915    #[tokio::test(start_paused = true)]
916    async fn test_timeout_resets_on_new_exchange() {
917        let config = AggregatorConfig::correlate_by("key")
918            .complete_on_timeout(Duration::from_millis(150))
919            .build()
920            .unwrap();
921        let mut svc = new_test_svc(config);
922
923        let ex1 = make_exchange("key", "A", "first");
924        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
925
926        tokio::time::sleep(Duration::from_millis(100)).await;
927
928        let ex2 = make_exchange("key", "A", "second");
929        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
930
931        tokio::time::sleep(Duration::from_millis(100)).await;
932
933        assert_eq!(
934            svc.buckets.lock().unwrap().len(),
935            1,
936            "bucket should still exist — timeout was reset"
937        );
938
939        tokio::time::sleep(Duration::from_millis(100)).await;
940
941        assert_eq!(
942            svc.buckets.lock().unwrap().len(),
943            0,
944            "bucket should be gone after timeout fires"
945        );
946    }
947
948    #[tokio::test]
949    async fn test_composable_size_and_timeout() {
950        let config = AggregatorConfig::correlate_by("key")
951            .complete_on_size_or_timeout(2, Duration::from_millis(200))
952            .build()
953            .unwrap();
954        let mut svc = new_test_svc(config);
955
956        let ex1 = make_exchange("key", "A", "first");
957        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
958        assert!(svc.buckets.lock().unwrap().contains_key("\"A\""));
959
960        let ex2 = make_exchange("key", "A", "second");
961        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
962        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
963        assert_eq!(
964            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
965            Some(&serde_json::json!("size"))
966        );
967    }
968
969    #[tokio::test(start_paused = true)]
970    async fn test_discard_on_timeout() {
971        let config = AggregatorConfig::correlate_by("key")
972            .complete_on_timeout(Duration::from_millis(50))
973            .discard_on_timeout(true)
974            .build()
975            .unwrap();
976        let (tx, mut rx) = mpsc::channel(256);
977        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
978        let cancel = CancellationToken::new();
979        let mut svc = AggregatorService::new(config, tx, registry, cancel);
980
981        let ex = make_exchange("key", "A", "data");
982        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
983
984        tokio::time::sleep(Duration::from_millis(100)).await;
985
986        assert!(
987            rx.try_recv().is_err(),
988            "no emit expected with discard_on_timeout"
989        );
990        assert_eq!(svc.buckets.lock().unwrap().len(), 0);
991        assert!(
992            svc.timeout_tasks.lock().unwrap().is_empty(),
993            "timeout task should be cleaned up"
994        );
995    }
996
997    #[tokio::test]
998    async fn test_force_completion_on_stop() {
999        let config = AggregatorConfig::correlate_by("key")
1000            .complete_when_size(10)
1001            .force_completion_on_stop(true)
1002            .build()
1003            .unwrap();
1004        let (tx, mut rx) = mpsc::channel(256);
1005        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1006        let cancel = CancellationToken::new();
1007        let svc = AggregatorService::new(config, tx, registry, cancel);
1008
1009        let mut call_svc = svc.clone();
1010        let ex = make_exchange("key", "A", "data");
1011        let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1012
1013        svc.force_complete_all();
1014
1015        let result = rx.try_recv().expect("should emit on force-complete");
1016        assert!(
1017            result.input.body.as_text().is_some() || matches!(result.input.body, Body::Json(_))
1018        );
1019        assert_eq!(
1020            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1021            Some(&serde_json::json!("stop"))
1022        );
1023    }
1024
1025    #[tokio::test]
1026    async fn test_completion_reason_property_size() {
1027        let config = AggregatorConfig::correlate_by("key")
1028            .complete_when_size(1)
1029            .build()
1030            .unwrap();
1031        let mut svc = new_test_svc(config);
1032        let ex = make_exchange("key", "X", "body");
1033        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1034        assert_eq!(
1035            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1036            Some(&serde_json::json!("size"))
1037        );
1038    }
1039
1040    #[tokio::test]
1041    async fn test_completion_reason_property_predicate() {
1042        let config = AggregatorConfig::correlate_by("key")
1043            .complete_when(|_| true)
1044            .build()
1045            .unwrap();
1046        let mut svc = new_test_svc(config);
1047        let ex = make_exchange("key", "X", "body");
1048        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1049        assert_eq!(
1050            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1051            Some(&serde_json::json!("predicate"))
1052        );
1053    }
1054
1055    #[tokio::test(start_paused = true)]
1056    async fn test_size_completes_before_timeout() {
1057        let config = AggregatorConfig::correlate_by("key")
1058            .complete_on_size_or_timeout(2, Duration::from_millis(200))
1059            .build()
1060            .unwrap();
1061        let mut svc = new_test_svc(config);
1062
1063        let ex1 = make_exchange("key", "A", "first");
1064        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
1065
1066        let ex2 = make_exchange("key", "A", "second");
1067        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1068
1069        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
1070        assert_eq!(
1071            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1072            Some(&serde_json::json!("size"))
1073        );
1074        assert_eq!(svc.buckets.lock().unwrap().len(), 0);
1075
1076        tokio::time::sleep(Duration::from_millis(300)).await;
1077        assert_eq!(
1078            svc.buckets.lock().unwrap().len(),
1079            0,
1080            "no re-fire after timeout"
1081        );
1082    }
1083
1084    #[tokio::test(start_paused = true)]
1085    async fn test_concurrent_timeout_fire_and_new_exchange() {
1086        let config = AggregatorConfig::correlate_by("key")
1087            .complete_on_size_or_timeout(2, Duration::from_millis(100))
1088            .build()
1089            .unwrap();
1090        let (tx, mut rx) = mpsc::channel(256);
1091        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1092        let cancel = CancellationToken::new();
1093        let mut svc = AggregatorService::new(config, tx, registry, cancel);
1094
1095        let ex = make_exchange("key", "A", "data");
1096        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1097
1098        // Advance time past timeout — timeout task fires and removes bucket
1099        tokio::time::sleep(Duration::from_millis(150)).await;
1100
1101        // New exchange arrives after timeout — starts a fresh bucket
1102        let ex2 = make_exchange("key", "A", "data2");
1103        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1104        assert!(
1105            result.property(CAMEL_AGGREGATOR_PENDING).is_some(),
1106            "should be pending in new bucket"
1107        );
1108
1109        // Drain late emits from timeout
1110        let mut late_count = 0;
1111        while rx.try_recv().is_ok() {
1112            late_count += 1;
1113        }
1114        assert_eq!(
1115            late_count, 1,
1116            "exactly 1 late emit from the timed-out bucket"
1117        );
1118    }
1119
1120    #[tokio::test(start_paused = true)]
1121    async fn test_late_channel_full_drops_with_warning() {
1122        let config = AggregatorConfig::correlate_by("key")
1123            .complete_on_timeout(Duration::from_millis(50))
1124            .build()
1125            .unwrap();
1126        let (tx, mut rx) = mpsc::channel(1);
1127        rx.close();
1128        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1129        let cancel = CancellationToken::new();
1130        let mut svc = AggregatorService::new(config, tx, registry, cancel);
1131
1132        let ex = make_exchange("key", "A", "data");
1133        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1134
1135        tokio::time::sleep(Duration::from_millis(100)).await;
1136        assert_eq!(
1137            svc.buckets.lock().unwrap().len(),
1138            0,
1139            "bucket removed despite channel closed"
1140        );
1141    }
1142
1143    #[tokio::test]
1144    async fn test_aggregate_stream_bodies_creates_valid_json() {
1145        use bytes::Bytes;
1146        use camel_api::{Body, StreamBody, StreamMetadata};
1147        use futures::stream;
1148        use tokio::sync::Mutex;
1149
1150        let chunks = vec![Ok(Bytes::from("test"))];
1151        let stream_body = StreamBody {
1152            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1153            metadata: StreamMetadata {
1154                origin: Some("file:///test.txt".to_string()),
1155                ..Default::default()
1156            },
1157        };
1158
1159        let ex1 = Exchange::new(Message {
1160            headers: Default::default(),
1161            body: Body::Stream(stream_body),
1162        });
1163
1164        let exchanges = vec![ex1];
1165        let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1166
1167        let exchange = result.expect("Expected Ok result");
1168        assert!(
1169            matches!(exchange.input.body, Body::Json(_)),
1170            "Expected Json body"
1171        );
1172
1173        if let Body::Json(value) = exchange.input.body {
1174            let json_str = serde_json::to_string(&value).unwrap();
1175            let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1176
1177            assert!(parsed.is_array(), "Result should be an array");
1178            let arr = parsed.as_array().unwrap();
1179            assert!(arr[0].is_object(), "First element should be an object");
1180            assert!(
1181                arr[0]["_stream"].is_object(),
1182                "Should contain _stream object"
1183            );
1184            assert_eq!(arr[0]["_stream"]["origin"], "file:///test.txt");
1185            assert_eq!(
1186                arr[0]["_stream"]["placeholder"], true,
1187                "placeholder flag should be true"
1188            );
1189        }
1190    }
1191
1192    #[tokio::test]
1193    async fn test_aggregate_stream_bodies_with_none_origin() {
1194        use bytes::Bytes;
1195        use camel_api::{Body, StreamBody, StreamMetadata};
1196        use futures::stream;
1197        use tokio::sync::Mutex;
1198
1199        let chunks = vec![Ok(Bytes::from("test"))];
1200        let stream_body = StreamBody {
1201            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1202            metadata: StreamMetadata {
1203                origin: None,
1204                ..Default::default()
1205            },
1206        };
1207
1208        let ex1 = Exchange::new(Message {
1209            headers: Default::default(),
1210            body: Body::Stream(stream_body),
1211        });
1212
1213        let exchanges = vec![ex1];
1214        let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1215
1216        let exchange = result.expect("Expected Ok result");
1217        assert!(
1218            matches!(exchange.input.body, Body::Json(_)),
1219            "Expected Json body"
1220        );
1221
1222        if let Body::Json(value) = exchange.input.body {
1223            let json_str = serde_json::to_string(&value).unwrap();
1224            let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1225
1226            assert!(parsed.is_array(), "Result should be an array");
1227            let arr = parsed.as_array().unwrap();
1228            assert!(arr[0].is_object(), "First element should be an object");
1229            assert!(
1230                arr[0]["_stream"].is_object(),
1231                "Should contain _stream object"
1232            );
1233            assert_eq!(
1234                arr[0]["_stream"]["origin"],
1235                serde_json::Value::Null,
1236                "origin should be null when None"
1237            );
1238            assert_eq!(
1239                arr[0]["_stream"]["placeholder"], true,
1240                "placeholder flag should be true"
1241            );
1242        }
1243    }
1244
1245    #[tokio::test]
1246    async fn timeout_completion_clears_handle_from_map() {
1247        // Regression: Oracle audit found that natural timeout completion removed
1248        // the bucket but left the JoinHandle in `timeout_handles`, leaking the
1249        // entry until route shutdown. After fix, the timeout task itself cleans
1250        // its handle from the map on natural completion.
1251        let config = AggregatorConfig::correlate_by("key")
1252            .complete_on_timeout(Duration::from_millis(50))
1253            .build()
1254            .unwrap();
1255        let (tx, _rx) = mpsc::channel(256);
1256        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1257        let cancel = CancellationToken::new();
1258        let svc = AggregatorService::new(config, tx, registry, cancel);
1259
1260        // Send an exchange to create a pending bucket with a timeout task.
1261        let mut call_svc = svc.clone();
1262        let ex = make_exchange("key", "A", "data");
1263        let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1264        assert!(
1265            !svc.timeout_handles.lock().unwrap().is_empty(),
1266            "handle should exist while timeout pending"
1267        );
1268
1269        // Wait real time for the 50ms timeout to fire + spawned task to complete.
1270        tokio::time::sleep(Duration::from_millis(200)).await;
1271
1272        assert!(
1273            svc.timeout_handles.lock().unwrap().is_empty(),
1274            "handle should be cleared from map after natural timeout completion (was leak)"
1275        );
1276    }
1277
1278    #[tokio::test(start_paused = true)]
1279    async fn test_shutdown_awaits_timeout_handles() {
1280        let config = AggregatorConfig::correlate_by("key")
1281            .complete_on_timeout(Duration::from_millis(100))
1282            .build()
1283            .unwrap();
1284        let (tx, _rx) = mpsc::channel(256);
1285        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1286        let cancel = CancellationToken::new();
1287        let svc = AggregatorService::new(config, tx, registry, cancel);
1288
1289        // Send an exchange to create a pending bucket with a timeout task.
1290        let mut call_svc = svc.clone();
1291        let ex = make_exchange("key", "A", "data");
1292        let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1293
1294        // Verify timeout handle exists.
1295        assert!(
1296            !svc.timeout_handles.lock().unwrap().is_empty(),
1297            "should have a timeout handle"
1298        );
1299
1300        // Shutdown should complete within the 5s deadline (the timeout task
1301        // gets cancelled so it won't wait for the full 100ms sleep).
1302        svc.shutdown().await;
1303
1304        assert!(
1305            svc.timeout_handles.lock().unwrap().is_empty(),
1306            "all handles should be cleaned up after shutdown"
1307        );
1308    }
1309}