Skip to main content

camel_processor/
aggregator.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6use std::time::{Duration, Instant};
7
8use tokio::sync::mpsc;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tower::Service;
12
13use camel_api::{
14    CamelError,
15    aggregator::{
16        AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode,
17        CompletionReason, CorrelationStrategy,
18    },
19    body::Body,
20    exchange::Exchange,
21    message::Message,
22};
23use camel_language_api::Language;
24
25pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
26
/// Exchange property set to `true` on the empty placeholder exchange returned while the
/// bucket for its correlation key is still incomplete.
pub const CAMEL_AGGREGATOR_PENDING: &str = "CamelAggregatorPending";
/// Exchange property holding how many exchanges were merged into the aggregate.
pub const CAMEL_AGGREGATED_SIZE: &str = "CamelAggregatedSize";
/// Exchange property holding the correlation key of the completed bucket.
pub const CAMEL_AGGREGATED_KEY: &str = "CamelAggregatedKey";
/// Exchange property naming why the bucket completed (a `CompletionReason::as_str` value).
pub const CAMEL_AGGREGATED_COMPLETION_REASON: &str = "CamelAggregatedCompletionReason";
31
32/// Internal bucket structure with timestamp tracking for TTL eviction.
33struct Bucket {
34    exchanges: Vec<Exchange>,
35    last_updated: Instant,
36}
37
38impl Bucket {
39    fn new() -> Self {
40        Self {
41            exchanges: Vec::new(),
42            last_updated: Instant::now(),
43        }
44    }
45
46    fn push(&mut self, exchange: Exchange) {
47        self.exchanges.push(exchange);
48        self.last_updated = Instant::now();
49    }
50
51    fn is_expired(&self, ttl: Duration) -> bool {
52        Instant::now().duration_since(self.last_updated) >= ttl
53    }
54}
55
56#[derive(Clone)]
57pub struct AggregatorService {
58    config: AggregatorConfig,
59    buckets: Arc<Mutex<HashMap<String, Bucket>>>,
60    timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
61    timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
62    late_tx: mpsc::Sender<Exchange>,
63    language_registry: SharedLanguageRegistry,
64    route_cancel: CancellationToken,
65}
66
67impl AggregatorService {
68    pub fn new(
69        config: AggregatorConfig,
70        late_tx: mpsc::Sender<Exchange>,
71        language_registry: SharedLanguageRegistry,
72        route_cancel: CancellationToken,
73    ) -> Self {
74        Self {
75            config,
76            buckets: Arc::new(Mutex::new(HashMap::new())),
77            timeout_tasks: Arc::new(Mutex::new(HashMap::new())),
78            timeout_handles: Arc::new(Mutex::new(HashMap::new())),
79            late_tx,
80            language_registry,
81            route_cancel,
82        }
83    }
84
85    pub fn has_timeout(&self) -> bool {
86        has_timeout_condition(&self.config.completion)
87    }
88
89    pub fn force_complete_all(&self) {
90        let mut buckets_guard = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
91        let keys: Vec<String> = buckets_guard.keys().cloned().collect();
92
93        for key in keys {
94            if let Some(bucket) = buckets_guard.remove(&key) {
95                if self.config.force_completion_on_stop {
96                    cancel_timeout_task_with_handle(
97                        &key,
98                        &self.timeout_tasks,
99                        &self.timeout_handles,
100                    );
101                    match aggregate(bucket.exchanges, &self.config.strategy) {
102                        Ok(mut result) => {
103                            result.set_property(
104                                CAMEL_AGGREGATED_COMPLETION_REASON,
105                                serde_json::json!(CompletionReason::Stop.as_str()),
106                            );
107                            if self.late_tx.try_send(result).is_err() {
108                                tracing::warn!(
109                                    key = %key,
110                                    "aggregator force-complete emit dropped: late channel full"
111                                );
112                            }
113                        }
114                        Err(e) => {
115                            tracing::error!(
116                                key = %key,
117                                error = %e,
118                                "aggregation failed in force_complete_all"
119                            );
120                        }
121                    }
122                } else {
123                    cancel_timeout_task_with_handle(
124                        &key,
125                        &self.timeout_tasks,
126                        &self.timeout_handles,
127                    );
128                }
129            }
130        }
131    }
132
133    /// Graceful shutdown: cancel all outstanding timeout tasks and await their
134    /// JoinHandles (with a 5s deadline) so that no tasks are leaked.
135    pub async fn shutdown(&self) {
136        // Cancel all timeout cancellation tokens.
137        {
138            let mut guard = self.timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
139            for token in guard.values() {
140                token.cancel();
141            }
142            guard.clear();
143        };
144
145        // Remove and collect all JoinHandles.
146        let handles: Vec<JoinHandle<()>> = {
147            let mut guard = self
148                .timeout_handles
149                .lock()
150                .unwrap_or_else(|e| e.into_inner());
151            guard.drain().map(|(_, handle)| handle).collect()
152        };
153
154        if handles.is_empty() {
155            return;
156        }
157
158        // Await all handles with a deadline.
159        let _ = tokio::time::timeout(Duration::from_secs(5), async {
160            for handle in handles {
161                let _ = handle.await;
162            }
163        })
164        .await;
165    }
166}
167
168pub fn has_timeout_condition(mode: &CompletionMode) -> bool {
169    match mode {
170        CompletionMode::Single(CompletionCondition::Timeout(_)) => true,
171        CompletionMode::Any(conditions) => conditions
172            .iter()
173            .any(|c| matches!(c, CompletionCondition::Timeout(_))),
174        _ => false,
175    }
176}
177
178impl Service<Exchange> for AggregatorService {
179    type Response = Exchange;
180    type Error = CamelError;
181    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
182
183    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
184        Poll::Ready(Ok(()))
185    }
186
187    fn call(&mut self, exchange: Exchange) -> Self::Future {
188        let config = self.config.clone();
189        let buckets = Arc::clone(&self.buckets);
190        let timeout_tasks = Arc::clone(&self.timeout_tasks);
191        let timeout_handles = Arc::clone(&self.timeout_handles);
192        let late_tx = self.late_tx.clone();
193        let language_registry = Arc::clone(&self.language_registry);
194        let route_cancel = self.route_cancel.clone();
195
196        Box::pin(async move {
197            let key_value =
198                extract_correlation_key(&exchange, &config.correlation, &language_registry).await?;
199
200            let key_str = serde_json::to_string(&key_value)
201                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
202
203            let completed_bucket = {
204                let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
205
206                if let Some(ttl) = config.bucket_ttl {
207                    guard.retain(|_, bucket| !bucket.is_expired(ttl));
208                }
209
210                if let Some(max) = config.max_buckets
211                    && !guard.contains_key(&key_str)
212                    && guard.len() >= max
213                {
214                    tracing::warn!(
215                        max_buckets = max,
216                        correlation_key = %key_str,
217                        "Aggregator reached max buckets limit, rejecting new correlation key"
218                    );
219                    return Err(CamelError::ProcessorError(format!(
220                        "Aggregator reached maximum {} buckets",
221                        max
222                    )));
223                }
224
225                let bucket = guard.entry(key_str.clone()).or_insert_with(Bucket::new);
226                bucket.push(exchange);
227
228                let (is_complete, reason) =
229                    check_sync_completion(&config.completion, &bucket.exchanges);
230
231                if is_complete {
232                    let exchanges = guard.remove(&key_str).map(|b| b.exchanges);
233                    (exchanges, reason)
234                } else {
235                    (None, CompletionReason::Size) // placeholder; reason unused when None
236                }
237            };
238
239            if completed_bucket.0.is_none() && has_timeout_condition(&config.completion) {
240                let cancel = {
241                    let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
242                    // Cancel and remove old handle for this key (if any).
243                    if let Some(existing) = tt_guard.get(&key_str) {
244                        existing.cancel();
245                    }
246                    let token = CancellationToken::new();
247                    tt_guard.insert(key_str.clone(), token.clone());
248                    token
249                };
250
251                let timeout_dur = extract_timeout_duration(&config.completion);
252                if let Some(timeout) = timeout_dur {
253                    // Remove old handle if present.
254                    {
255                        let mut hh = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
256                        if let Some(old) = hh.remove(&key_str) {
257                            old.abort();
258                        }
259                    }
260                    let handle = spawn_timeout_task(
261                        key_str.clone(),
262                        timeout,
263                        cancel,
264                        buckets.clone(),
265                        timeout_tasks.clone(),
266                        timeout_handles.clone(),
267                        late_tx,
268                        config.strategy.clone(),
269                        config.discard_on_timeout,
270                        route_cancel,
271                    );
272                    timeout_handles
273                        .lock()
274                        .unwrap_or_else(|e| e.into_inner())
275                        .insert(key_str.clone(), handle);
276                }
277            }
278
279            if let Some(exchanges) = completed_bucket.0 {
280                cancel_timeout_task_with_handle(&key_str, &timeout_tasks, &timeout_handles);
281                let reason = completed_bucket.1;
282                let size = exchanges.len();
283                let mut result = aggregate(exchanges, &config.strategy)?;
284                result.set_property(CAMEL_AGGREGATED_SIZE, serde_json::json!(size as u64));
285                result.set_property(CAMEL_AGGREGATED_KEY, key_value);
286                result.set_property(
287                    CAMEL_AGGREGATED_COMPLETION_REASON,
288                    serde_json::json!(reason.as_str()),
289                );
290                Ok(result)
291            } else {
292                let mut pending = Exchange::new(Message {
293                    headers: Default::default(),
294                    body: Body::Empty,
295                });
296                pending.set_property(CAMEL_AGGREGATOR_PENDING, serde_json::json!(true));
297                Ok(pending)
298            }
299        })
300    }
301}
302
303async fn extract_correlation_key(
304    exchange: &Exchange,
305    strategy: &CorrelationStrategy,
306    registry: &SharedLanguageRegistry,
307) -> Result<serde_json::Value, CamelError> {
308    match strategy {
309        CorrelationStrategy::HeaderName(h) => {
310            exchange.input.headers.get(h).cloned().ok_or_else(|| {
311                CamelError::ProcessorError(format!(
312                    "Aggregator: missing correlation key header '{}'",
313                    h
314                ))
315            })
316        }
317        CorrelationStrategy::Expression { expr, language } => {
318            let expression = {
319                let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
320                let lang = reg.get(language).ok_or_else(|| {
321                    CamelError::ProcessorError(format!(
322                        "Aggregator: language '{}' not found in registry",
323                        language
324                    ))
325                })?;
326                lang.create_expression(expr)
327                    .map_err(|e| CamelError::ProcessorError(e.to_string()))?
328            };
329            let value = expression
330                .evaluate(exchange)
331                .await
332                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
333            if value.is_null() {
334                return Err(CamelError::ProcessorError(format!(
335                    "Aggregator: correlation expression '{}' evaluated to null",
336                    expr
337                )));
338            }
339            Ok(value)
340        }
341        CorrelationStrategy::Fn(f) => f(exchange).map(serde_json::Value::String).ok_or_else(|| {
342            CamelError::ProcessorError("Aggregator: correlation function returned None".to_string())
343        }),
344    }
345}
346
347fn check_sync_completion(
348    mode: &CompletionMode,
349    exchanges: &[Exchange],
350) -> (bool, CompletionReason) {
351    match mode {
352        CompletionMode::Single(cond) => check_single(cond, exchanges),
353        CompletionMode::Any(conditions) => {
354            for cond in conditions {
355                if let CompletionCondition::Timeout(_) = cond {
356                    continue;
357                }
358                let (done, reason) = check_single(cond, exchanges);
359                if done {
360                    return (true, reason);
361                }
362            }
363            (false, CompletionReason::Size)
364        }
365    }
366}
367
368fn check_single(cond: &CompletionCondition, exchanges: &[Exchange]) -> (bool, CompletionReason) {
369    match cond {
370        CompletionCondition::Size(n) => (exchanges.len() >= *n, CompletionReason::Size),
371        CompletionCondition::Predicate(pred) => (pred(exchanges), CompletionReason::Predicate),
372        CompletionCondition::Timeout(_) => (false, CompletionReason::Timeout),
373    }
374}
375
376fn extract_timeout_duration(mode: &CompletionMode) -> Option<Duration> {
377    match mode {
378        CompletionMode::Single(CompletionCondition::Timeout(d)) => Some(*d),
379        CompletionMode::Any(conditions) => conditions.iter().find_map(|c| {
380            if let CompletionCondition::Timeout(d) = c {
381                Some(*d)
382            } else {
383                None
384            }
385        }),
386        _ => None,
387    }
388}
389
390fn cancel_timeout_task(key: &str, timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>) {
391    let mut guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
392    if let Some(token) = guard.remove(key) {
393        token.cancel();
394    }
395}
396
397/// Also removes the stored JoinHandle for a cancelled/completed timeout task.
398fn cancel_timeout_task_with_handle(
399    key: &str,
400    timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>,
401    timeout_handles: &Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
402) {
403    cancel_timeout_task(key, timeout_tasks);
404    let mut guard = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
405    guard.remove(key);
406}
407
408#[allow(clippy::too_many_arguments)]
409fn spawn_timeout_task(
410    key: String,
411    timeout: Duration,
412    cancel: CancellationToken,
413    buckets: Arc<Mutex<HashMap<String, Bucket>>>,
414    timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
415    _timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
416    late_tx: mpsc::Sender<Exchange>,
417    strategy: AggregationStrategy,
418    discard: bool,
419    _route_cancel: CancellationToken,
420) -> JoinHandle<()> {
421    let cancel_clone = cancel.clone();
422    tokio::spawn(async move {
423        tokio::select! {
424            _ = tokio::time::sleep(timeout) => {
425                let should_proceed = {
426                    let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
427                    if cancel_clone.is_cancelled() {
428                        false
429                    } else {
430                        tt_guard.remove(&key);
431                        true
432                    }
433                };
434                if !should_proceed {
435                    return;
436                }
437                let bucket_exchanges = {
438                    let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
439                    guard.remove(&key).map(|b| b.exchanges)
440                };
441                if let Some(exchanges) = bucket_exchanges
442                    && !discard
443                {
444                    match aggregate(exchanges, &strategy) {
445                        Ok(mut result) => {
446                            result.set_property(
447                                CAMEL_AGGREGATED_COMPLETION_REASON,
448                                serde_json::json!(CompletionReason::Timeout.as_str()),
449                            );
450                            if late_tx.try_send(result).is_err() {
451                                tracing::warn!(
452                                    key = %key,
453                                    "aggregator timeout emit dropped: late channel full"
454                                );
455                            }
456                        }
457                        Err(e) => {
458                            tracing::error!(
459                                key = %key,
460                                error = %e,
461                                "aggregation failed in timeout task"
462                            );
463                        }
464                    }
465                }
466            }
467            _ = cancel_clone.cancelled() => {}
468        }
469    })
470}
471
472fn aggregate(
473    exchanges: Vec<Exchange>,
474    strategy: &AggregationStrategy,
475) -> Result<Exchange, CamelError> {
476    match strategy {
477        AggregationStrategy::CollectAll => {
478            let bodies: Vec<serde_json::Value> = exchanges
479                .into_iter()
480                .map(|e| match e.input.body {
481                    Body::Json(v) => v,
482                    Body::Text(s) => serde_json::Value::String(s),
483                    Body::Xml(s) => serde_json::Value::String(s),
484                    Body::Bytes(b) => {
485                        serde_json::Value::String(String::from_utf8_lossy(&b).into_owned())
486                    }
487                    Body::Empty => serde_json::Value::Null,
488                    Body::Stream(s) => serde_json::json!({
489                        "_stream": {
490                            "origin": s.metadata.origin,
491                            "placeholder": true,
492                            "hint": "Materialize exchange body with .into_bytes() before aggregation if content needed"
493                        }
494                    }),
495                })
496                .collect();
497            Ok(Exchange::new(Message {
498                headers: Default::default(),
499                body: Body::Json(serde_json::Value::Array(bodies)),
500            }))
501        }
502        AggregationStrategy::Custom(f) => {
503            let mut iter = exchanges.into_iter();
504            let first = iter.next().ok_or_else(|| {
505                CamelError::ProcessorError("Aggregator: empty bucket".to_string())
506            })?;
507            Ok(iter.fold(first, |acc, next| f(acc, next)))
508        }
509    }
510}
511
512#[cfg(test)]
513mod tests {
514    use super::*;
515    use std::collections::HashMap;
516
517    use camel_api::{
518        aggregator::{AggregationStrategy, AggregatorConfig},
519        body::Body,
520        exchange::Exchange,
521        message::Message,
522    };
523    use tokio::sync::mpsc;
524    use tokio_util::sync::CancellationToken;
525    use tower::ServiceExt;
526
527    fn make_exchange(header: &str, value: &str, body: &str) -> Exchange {
528        let mut msg = Message {
529            headers: Default::default(),
530            body: Body::Text(body.to_string()),
531        };
532        msg.headers
533            .insert(header.to_string(), serde_json::json!(value));
534        Exchange::new(msg)
535    }
536
537    fn config_size(n: usize) -> AggregatorConfig {
538        AggregatorConfig::correlate_by("orderId")
539            .complete_when_size(n)
540            .build()
541            .unwrap()
542    }
543
544    fn new_test_svc(config: AggregatorConfig) -> AggregatorService {
545        let (tx, _rx) = mpsc::channel(256);
546        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
547        let cancel = CancellationToken::new();
548        AggregatorService::new(config, tx, registry, cancel)
549    }
550
551    #[tokio::test]
552    async fn test_pending_exchange_not_yet_complete() {
553        let mut svc = new_test_svc(config_size(3));
554        let ex = make_exchange("orderId", "A", "first");
555        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
556        assert!(matches!(result.input.body, Body::Empty));
557        assert_eq!(
558            result.property(CAMEL_AGGREGATOR_PENDING),
559            Some(&serde_json::json!(true))
560        );
561    }
562
563    #[tokio::test]
564    async fn test_completes_on_size() {
565        let mut svc = new_test_svc(config_size(3));
566        for _ in 0..2 {
567            let ex = make_exchange("orderId", "A", "item");
568            let r = svc.ready().await.unwrap().call(ex).await.unwrap();
569            assert!(matches!(r.input.body, Body::Empty));
570        }
571        let ex = make_exchange("orderId", "A", "last");
572        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
573        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
574        assert_eq!(
575            result.property(CAMEL_AGGREGATED_SIZE),
576            Some(&serde_json::json!(3u64))
577        );
578    }
579
580    #[tokio::test]
581    async fn test_collect_all_produces_json_array() {
582        let mut svc = new_test_svc(config_size(2));
583        svc.ready()
584            .await
585            .unwrap()
586            .call(make_exchange("orderId", "A", "alpha"))
587            .await
588            .unwrap();
589        let result = svc
590            .ready()
591            .await
592            .unwrap()
593            .call(make_exchange("orderId", "A", "beta"))
594            .await
595            .unwrap();
596        let Body::Json(v) = &result.input.body else {
597            panic!("expected Body::Json")
598        };
599        let arr = v.as_array().unwrap();
600        assert_eq!(arr.len(), 2);
601        assert_eq!(arr[0], serde_json::json!("alpha"));
602        assert_eq!(arr[1], serde_json::json!("beta"));
603    }
604
605    #[tokio::test]
606    async fn test_two_keys_independent_buckets() {
607        // completionSize=3 so we can test that A and B accumulate independently.
608        let mut svc = new_test_svc(config_size(3));
609        svc.ready()
610            .await
611            .unwrap()
612            .call(make_exchange("orderId", "A", "a1"))
613            .await
614            .unwrap();
615        svc.ready()
616            .await
617            .unwrap()
618            .call(make_exchange("orderId", "B", "b1"))
619            .await
620            .unwrap();
621        svc.ready()
622            .await
623            .unwrap()
624            .call(make_exchange("orderId", "A", "a2"))
625            .await
626            .unwrap();
627        // A has 2 items, B has 1 item — neither complete yet
628        let ra = svc
629            .ready()
630            .await
631            .unwrap()
632            .call(make_exchange("orderId", "A", "a3"))
633            .await
634            .unwrap();
635        // A now has 3 → completes
636        assert!(matches!(ra.input.body, Body::Json(_)));
637        // B only has 1 → still pending
638        let rb = svc
639            .ready()
640            .await
641            .unwrap()
642            .call(make_exchange("orderId", "B", "b_check"))
643            .await
644            .unwrap();
645        assert!(matches!(rb.input.body, Body::Empty));
646    }
647
648    #[tokio::test]
649    async fn test_bucket_resets_after_completion() {
650        let mut svc = new_test_svc(config_size(2));
651        svc.ready()
652            .await
653            .unwrap()
654            .call(make_exchange("orderId", "A", "x"))
655            .await
656            .unwrap();
657        svc.ready()
658            .await
659            .unwrap()
660            .call(make_exchange("orderId", "A", "x"))
661            .await
662            .unwrap(); // completes
663        // New bucket starts
664        let r = svc
665            .ready()
666            .await
667            .unwrap()
668            .call(make_exchange("orderId", "A", "new"))
669            .await
670            .unwrap();
671        assert!(matches!(r.input.body, Body::Empty)); // pending again
672    }
673
674    #[tokio::test]
675    async fn test_completion_size_1_emits_immediately() {
676        let mut svc = new_test_svc(config_size(1));
677        let ex = make_exchange("orderId", "A", "solo");
678        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
679        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
680    }
681
682    #[tokio::test]
683    async fn test_custom_aggregation_strategy() {
684        use camel_api::aggregator::AggregationFn;
685        use std::sync::Arc;
686
687        let f: AggregationFn = Arc::new(|mut acc: Exchange, next: Exchange| {
688            let combined = format!(
689                "{}+{}",
690                acc.input.body.as_text().unwrap_or(""),
691                next.input.body.as_text().unwrap_or("")
692            );
693            acc.input.body = Body::Text(combined);
694            acc
695        });
696        let config = AggregatorConfig::correlate_by("key")
697            .complete_when_size(2)
698            .strategy(AggregationStrategy::Custom(f))
699            .build()
700            .unwrap();
701        let mut svc = new_test_svc(config);
702        svc.ready()
703            .await
704            .unwrap()
705            .call(make_exchange("key", "X", "hello"))
706            .await
707            .unwrap();
708        let result = svc
709            .ready()
710            .await
711            .unwrap()
712            .call(make_exchange("key", "X", "world"))
713            .await
714            .unwrap();
715        assert_eq!(result.input.body.as_text(), Some("hello+world"));
716    }
717
718    #[tokio::test]
719    async fn test_completion_predicate() {
720        let config = AggregatorConfig::correlate_by("key")
721            .complete_when(|bucket| {
722                bucket
723                    .iter()
724                    .any(|e| e.input.body.as_text() == Some("DONE"))
725            })
726            .build()
727            .unwrap();
728        let mut svc = new_test_svc(config);
729        svc.ready()
730            .await
731            .unwrap()
732            .call(make_exchange("key", "K", "first"))
733            .await
734            .unwrap();
735        svc.ready()
736            .await
737            .unwrap()
738            .call(make_exchange("key", "K", "second"))
739            .await
740            .unwrap();
741        let result = svc
742            .ready()
743            .await
744            .unwrap()
745            .call(make_exchange("key", "K", "DONE"))
746            .await
747            .unwrap();
748        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
749    }
750
751    #[tokio::test]
752    async fn test_missing_header_returns_error() {
753        let mut svc = new_test_svc(config_size(2));
754        let msg = Message {
755            headers: Default::default(),
756            body: Body::Text("no key".into()),
757        };
758        let ex = Exchange::new(msg);
759        let result = svc.ready().await.unwrap().call(ex).await;
760        assert!(result.is_err());
761        assert!(matches!(
762            result.unwrap_err(),
763            camel_api::CamelError::ProcessorError(_)
764        ));
765    }
766
767    #[tokio::test]
768    async fn test_cloned_service_shares_state() {
769        let svc1 = new_test_svc(config_size(2));
770        let mut svc2 = svc1.clone();
771        // send first exchange via svc1
772        svc1.clone()
773            .ready()
774            .await
775            .unwrap()
776            .call(make_exchange("orderId", "A", "from-svc1"))
777            .await
778            .unwrap();
779        // send second exchange via svc2 — should complete because same Arc<Mutex>
780        let result = svc2
781            .ready()
782            .await
783            .unwrap()
784            .call(make_exchange("orderId", "A", "from-svc2"))
785            .await
786            .unwrap();
787        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
788    }
789
790    #[tokio::test]
791    async fn test_camel_aggregated_key_property_set() {
792        let mut svc = new_test_svc(config_size(1));
793        let ex = make_exchange("orderId", "ORDER-42", "body");
794        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
795        assert_eq!(
796            result.property(CAMEL_AGGREGATED_KEY),
797            Some(&serde_json::json!("ORDER-42"))
798        );
799    }
800
801    #[tokio::test]
802    async fn test_aggregator_enforces_max_buckets() {
803        let config = AggregatorConfig::correlate_by("orderId")
804            .complete_when_size(2)
805            .max_buckets(3)
806            .build()
807            .unwrap();
808
809        let mut svc = new_test_svc(config);
810
811        // Create 3 different correlation keys (fills limit)
812        for i in 0..3 {
813            let ex = make_exchange("orderId", &format!("key-{}", i), "body");
814            let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
815        }
816
817        // 4th key should be rejected
818        let ex = make_exchange("orderId", "key-4", "body");
819        let result = svc.ready().await.unwrap().call(ex).await;
820
821        assert!(result.is_err(), "Should reject when max buckets reached");
822        let err = result.unwrap_err().to_string();
823        assert!(
824            err.contains("maximum"),
825            "Error message should contain 'maximum': {}",
826            err
827        );
828    }
829
830    #[tokio::test]
831    async fn test_max_buckets_allows_existing_key() {
832        let config = AggregatorConfig::correlate_by("orderId")
833            .complete_when_size(5) // Large size so bucket doesn't complete
834            .max_buckets(2)
835            .build()
836            .unwrap();
837
838        let mut svc = new_test_svc(config);
839
840        // Create 2 different correlation keys (fills limit)
841        let ex1 = make_exchange("orderId", "key-A", "body1");
842        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
843        let ex2 = make_exchange("orderId", "key-B", "body2");
844        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
845
846        // Should still allow adding to existing key
847        let ex3 = make_exchange("orderId", "key-A", "body3");
848        let result = svc.ready().await.unwrap().call(ex3).await;
849        assert!(
850            result.is_ok(),
851            "Should allow adding to existing bucket even at max limit"
852        );
853    }
854
855    #[tokio::test]
856    async fn test_bucket_ttl_eviction() {
857        let config = AggregatorConfig::correlate_by("orderId")
858            .complete_when_size(10) // Large size so bucket doesn't complete normally
859            .bucket_ttl(Duration::from_millis(50))
860            .build()
861            .unwrap();
862
863        let mut svc = new_test_svc(config);
864
865        // Create a bucket
866        let ex1 = make_exchange("orderId", "key-A", "body1");
867        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
868
869        // Wait for TTL to expire
870        tokio::time::sleep(Duration::from_millis(100)).await;
871
872        // Create a new bucket - this should trigger eviction of the old one
873        let ex2 = make_exchange("orderId", "key-B", "body2");
874        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
875
876        // The expired bucket should have been evicted, so we should be able to
877        // add a new key-A bucket again
878        let ex3 = make_exchange("orderId", "key-A", "body3");
879        let result = svc.ready().await.unwrap().call(ex3).await;
880        assert!(result.is_ok(), "Should be able to recreate evicted bucket");
881    }
882
883    #[tokio::test(start_paused = true)]
884    async fn test_timeout_completes_bucket() {
885        let config = AggregatorConfig::correlate_by("key")
886            .complete_on_timeout(Duration::from_millis(100))
887            .build()
888            .unwrap();
889        let mut svc = new_test_svc(config);
890        let ex = make_exchange("key", "A", "data");
891        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
892        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_some());
893
894        tokio::time::sleep(Duration::from_millis(200)).await;
895
896        assert_eq!(
897            svc.buckets.lock().unwrap().len(),
898            0,
899            "bucket should be removed after timeout"
900        );
901    }
902
903    #[tokio::test(start_paused = true)]
904    async fn test_timeout_resets_on_new_exchange() {
905        let config = AggregatorConfig::correlate_by("key")
906            .complete_on_timeout(Duration::from_millis(150))
907            .build()
908            .unwrap();
909        let mut svc = new_test_svc(config);
910
911        let ex1 = make_exchange("key", "A", "first");
912        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
913
914        tokio::time::sleep(Duration::from_millis(100)).await;
915
916        let ex2 = make_exchange("key", "A", "second");
917        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
918
919        tokio::time::sleep(Duration::from_millis(100)).await;
920
921        assert_eq!(
922            svc.buckets.lock().unwrap().len(),
923            1,
924            "bucket should still exist — timeout was reset"
925        );
926
927        tokio::time::sleep(Duration::from_millis(100)).await;
928
929        assert_eq!(
930            svc.buckets.lock().unwrap().len(),
931            0,
932            "bucket should be gone after timeout fires"
933        );
934    }
935
936    #[tokio::test]
937    async fn test_composable_size_and_timeout() {
938        let config = AggregatorConfig::correlate_by("key")
939            .complete_on_size_or_timeout(2, Duration::from_millis(200))
940            .build()
941            .unwrap();
942        let mut svc = new_test_svc(config);
943
944        let ex1 = make_exchange("key", "A", "first");
945        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
946        assert!(svc.buckets.lock().unwrap().contains_key("\"A\""));
947
948        let ex2 = make_exchange("key", "A", "second");
949        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
950        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
951        assert_eq!(
952            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
953            Some(&serde_json::json!("size"))
954        );
955    }
956
957    #[tokio::test(start_paused = true)]
958    async fn test_discard_on_timeout() {
959        let config = AggregatorConfig::correlate_by("key")
960            .complete_on_timeout(Duration::from_millis(50))
961            .discard_on_timeout(true)
962            .build()
963            .unwrap();
964        let (tx, mut rx) = mpsc::channel(256);
965        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
966        let cancel = CancellationToken::new();
967        let mut svc = AggregatorService::new(config, tx, registry, cancel);
968
969        let ex = make_exchange("key", "A", "data");
970        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
971
972        tokio::time::sleep(Duration::from_millis(100)).await;
973
974        assert!(
975            rx.try_recv().is_err(),
976            "no emit expected with discard_on_timeout"
977        );
978        assert_eq!(svc.buckets.lock().unwrap().len(), 0);
979        assert!(
980            svc.timeout_tasks.lock().unwrap().is_empty(),
981            "timeout task should be cleaned up"
982        );
983    }
984
985    #[tokio::test]
986    async fn test_force_completion_on_stop() {
987        let config = AggregatorConfig::correlate_by("key")
988            .complete_when_size(10)
989            .force_completion_on_stop(true)
990            .build()
991            .unwrap();
992        let (tx, mut rx) = mpsc::channel(256);
993        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
994        let cancel = CancellationToken::new();
995        let svc = AggregatorService::new(config, tx, registry, cancel);
996
997        let mut call_svc = svc.clone();
998        let ex = make_exchange("key", "A", "data");
999        let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1000
1001        svc.force_complete_all();
1002
1003        let result = rx.try_recv().expect("should emit on force-complete");
1004        assert!(
1005            result.input.body.as_text().is_some() || matches!(result.input.body, Body::Json(_))
1006        );
1007        assert_eq!(
1008            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1009            Some(&serde_json::json!("stop"))
1010        );
1011    }
1012
1013    #[tokio::test]
1014    async fn test_completion_reason_property_size() {
1015        let config = AggregatorConfig::correlate_by("key")
1016            .complete_when_size(1)
1017            .build()
1018            .unwrap();
1019        let mut svc = new_test_svc(config);
1020        let ex = make_exchange("key", "X", "body");
1021        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1022        assert_eq!(
1023            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1024            Some(&serde_json::json!("size"))
1025        );
1026    }
1027
1028    #[tokio::test]
1029    async fn test_completion_reason_property_predicate() {
1030        let config = AggregatorConfig::correlate_by("key")
1031            .complete_when(|_| true)
1032            .build()
1033            .unwrap();
1034        let mut svc = new_test_svc(config);
1035        let ex = make_exchange("key", "X", "body");
1036        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1037        assert_eq!(
1038            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1039            Some(&serde_json::json!("predicate"))
1040        );
1041    }
1042
1043    #[tokio::test(start_paused = true)]
1044    async fn test_size_completes_before_timeout() {
1045        let config = AggregatorConfig::correlate_by("key")
1046            .complete_on_size_or_timeout(2, Duration::from_millis(200))
1047            .build()
1048            .unwrap();
1049        let mut svc = new_test_svc(config);
1050
1051        let ex1 = make_exchange("key", "A", "first");
1052        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
1053
1054        let ex2 = make_exchange("key", "A", "second");
1055        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1056
1057        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
1058        assert_eq!(
1059            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1060            Some(&serde_json::json!("size"))
1061        );
1062        assert_eq!(svc.buckets.lock().unwrap().len(), 0);
1063
1064        tokio::time::sleep(Duration::from_millis(300)).await;
1065        assert_eq!(
1066            svc.buckets.lock().unwrap().len(),
1067            0,
1068            "no re-fire after timeout"
1069        );
1070    }
1071
1072    #[tokio::test(start_paused = true)]
1073    async fn test_concurrent_timeout_fire_and_new_exchange() {
1074        let config = AggregatorConfig::correlate_by("key")
1075            .complete_on_size_or_timeout(2, Duration::from_millis(100))
1076            .build()
1077            .unwrap();
1078        let (tx, mut rx) = mpsc::channel(256);
1079        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1080        let cancel = CancellationToken::new();
1081        let mut svc = AggregatorService::new(config, tx, registry, cancel);
1082
1083        let ex = make_exchange("key", "A", "data");
1084        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1085
1086        // Advance time past timeout — timeout task fires and removes bucket
1087        tokio::time::sleep(Duration::from_millis(150)).await;
1088
1089        // New exchange arrives after timeout — starts a fresh bucket
1090        let ex2 = make_exchange("key", "A", "data2");
1091        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1092        assert!(
1093            result.property(CAMEL_AGGREGATOR_PENDING).is_some(),
1094            "should be pending in new bucket"
1095        );
1096
1097        // Drain late emits from timeout
1098        let mut late_count = 0;
1099        while rx.try_recv().is_ok() {
1100            late_count += 1;
1101        }
1102        assert_eq!(
1103            late_count, 1,
1104            "exactly 1 late emit from the timed-out bucket"
1105        );
1106    }
1107
1108    #[tokio::test(start_paused = true)]
1109    async fn test_late_channel_full_drops_with_warning() {
1110        let config = AggregatorConfig::correlate_by("key")
1111            .complete_on_timeout(Duration::from_millis(50))
1112            .build()
1113            .unwrap();
1114        let (tx, mut rx) = mpsc::channel(1);
1115        rx.close();
1116        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1117        let cancel = CancellationToken::new();
1118        let mut svc = AggregatorService::new(config, tx, registry, cancel);
1119
1120        let ex = make_exchange("key", "A", "data");
1121        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1122
1123        tokio::time::sleep(Duration::from_millis(100)).await;
1124        assert_eq!(
1125            svc.buckets.lock().unwrap().len(),
1126            0,
1127            "bucket removed despite channel closed"
1128        );
1129    }
1130
1131    #[tokio::test]
1132    async fn test_aggregate_stream_bodies_creates_valid_json() {
1133        use bytes::Bytes;
1134        use camel_api::{Body, StreamBody, StreamMetadata};
1135        use futures::stream;
1136        use tokio::sync::Mutex;
1137
1138        let chunks = vec![Ok(Bytes::from("test"))];
1139        let stream_body = StreamBody {
1140            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1141            metadata: StreamMetadata {
1142                origin: Some("file:///test.txt".to_string()),
1143                ..Default::default()
1144            },
1145        };
1146
1147        let ex1 = Exchange::new(Message {
1148            headers: Default::default(),
1149            body: Body::Stream(stream_body),
1150        });
1151
1152        let exchanges = vec![ex1];
1153        let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1154
1155        let exchange = result.expect("Expected Ok result");
1156        assert!(
1157            matches!(exchange.input.body, Body::Json(_)),
1158            "Expected Json body"
1159        );
1160
1161        if let Body::Json(value) = exchange.input.body {
1162            let json_str = serde_json::to_string(&value).unwrap();
1163            let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1164
1165            assert!(parsed.is_array(), "Result should be an array");
1166            let arr = parsed.as_array().unwrap();
1167            assert!(arr[0].is_object(), "First element should be an object");
1168            assert!(
1169                arr[0]["_stream"].is_object(),
1170                "Should contain _stream object"
1171            );
1172            assert_eq!(arr[0]["_stream"]["origin"], "file:///test.txt");
1173            assert_eq!(
1174                arr[0]["_stream"]["placeholder"], true,
1175                "placeholder flag should be true"
1176            );
1177        }
1178    }
1179
1180    #[tokio::test]
1181    async fn test_aggregate_stream_bodies_with_none_origin() {
1182        use bytes::Bytes;
1183        use camel_api::{Body, StreamBody, StreamMetadata};
1184        use futures::stream;
1185        use tokio::sync::Mutex;
1186
1187        let chunks = vec![Ok(Bytes::from("test"))];
1188        let stream_body = StreamBody {
1189            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1190            metadata: StreamMetadata {
1191                origin: None,
1192                ..Default::default()
1193            },
1194        };
1195
1196        let ex1 = Exchange::new(Message {
1197            headers: Default::default(),
1198            body: Body::Stream(stream_body),
1199        });
1200
1201        let exchanges = vec![ex1];
1202        let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1203
1204        let exchange = result.expect("Expected Ok result");
1205        assert!(
1206            matches!(exchange.input.body, Body::Json(_)),
1207            "Expected Json body"
1208        );
1209
1210        if let Body::Json(value) = exchange.input.body {
1211            let json_str = serde_json::to_string(&value).unwrap();
1212            let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1213
1214            assert!(parsed.is_array(), "Result should be an array");
1215            let arr = parsed.as_array().unwrap();
1216            assert!(arr[0].is_object(), "First element should be an object");
1217            assert!(
1218                arr[0]["_stream"].is_object(),
1219                "Should contain _stream object"
1220            );
1221            assert_eq!(
1222                arr[0]["_stream"]["origin"],
1223                serde_json::Value::Null,
1224                "origin should be null when None"
1225            );
1226            assert_eq!(
1227                arr[0]["_stream"]["placeholder"], true,
1228                "placeholder flag should be true"
1229            );
1230        }
1231    }
1232
1233    #[tokio::test(start_paused = true)]
1234    async fn test_shutdown_awaits_timeout_handles() {
1235        let config = AggregatorConfig::correlate_by("key")
1236            .complete_on_timeout(Duration::from_millis(100))
1237            .build()
1238            .unwrap();
1239        let (tx, _rx) = mpsc::channel(256);
1240        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1241        let cancel = CancellationToken::new();
1242        let svc = AggregatorService::new(config, tx, registry, cancel);
1243
1244        // Send an exchange to create a pending bucket with a timeout task.
1245        let mut call_svc = svc.clone();
1246        let ex = make_exchange("key", "A", "data");
1247        let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1248
1249        // Verify timeout handle exists.
1250        assert!(
1251            !svc.timeout_handles.lock().unwrap().is_empty(),
1252            "should have a timeout handle"
1253        );
1254
1255        // Shutdown should complete within the 5s deadline (the timeout task
1256        // gets cancelled so it won't wait for the full 100ms sleep).
1257        svc.shutdown().await;
1258
1259        assert!(
1260            svc.timeout_handles.lock().unwrap().is_empty(),
1261            "all handles should be cleaned up after shutdown"
1262        );
1263    }
1264}