Skip to main content

camel_processor/
aggregator.rs

1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6use std::time::{Duration, Instant};
7
8use tokio::sync::mpsc;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tower::Service;
12
13use camel_api::{
14    CamelError,
15    aggregator::{
16        AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode,
17        CompletionReason, CorrelationStrategy,
18    },
19    body::Body,
20    exchange::Exchange,
21    message::Message,
22};
23use camel_language_api::Language;
24
25pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
26
/// Exchange property set to `true` on the empty placeholder exchange returned while a
/// bucket is still incomplete.
pub const CAMEL_AGGREGATOR_PENDING: &str = "CamelAggregatorPending";
/// Exchange property holding the number of exchanges merged into an aggregated result.
pub const CAMEL_AGGREGATED_SIZE: &str = "CamelAggregatedSize";
/// Exchange property holding the correlation value of the completed bucket.
pub const CAMEL_AGGREGATED_KEY: &str = "CamelAggregatedKey";
/// Exchange property holding `CompletionReason::as_str()` for the completion that emitted
/// the aggregated result.
pub const CAMEL_AGGREGATED_COMPLETION_REASON: &str = "CamelAggregatedCompletionReason";
31
32/// Internal bucket structure with timestamp tracking for TTL eviction.
33struct Bucket {
34    exchanges: Vec<Exchange>,
35    #[allow(dead_code)]
36    created_at: Instant,
37    last_updated: Instant,
38}
39
40impl Bucket {
41    fn new() -> Self {
42        let now = Instant::now();
43        Self {
44            exchanges: Vec::new(),
45            created_at: now,
46            last_updated: now,
47        }
48    }
49
50    fn push(&mut self, exchange: Exchange) {
51        self.exchanges.push(exchange);
52        self.last_updated = Instant::now();
53    }
54
55    #[allow(dead_code)]
56    fn len(&self) -> usize {
57        self.exchanges.len()
58    }
59
60    fn is_expired(&self, ttl: Duration) -> bool {
61        Instant::now().duration_since(self.last_updated) >= ttl
62    }
63}
64
65#[derive(Clone)]
66pub struct AggregatorService {
67    config: AggregatorConfig,
68    buckets: Arc<Mutex<HashMap<String, Bucket>>>,
69    timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
70    timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
71    late_tx: mpsc::Sender<Exchange>,
72    language_registry: SharedLanguageRegistry,
73    route_cancel: CancellationToken,
74}
75
76impl AggregatorService {
77    pub fn new(
78        config: AggregatorConfig,
79        late_tx: mpsc::Sender<Exchange>,
80        language_registry: SharedLanguageRegistry,
81        route_cancel: CancellationToken,
82    ) -> Self {
83        Self {
84            config,
85            buckets: Arc::new(Mutex::new(HashMap::new())),
86            timeout_tasks: Arc::new(Mutex::new(HashMap::new())),
87            timeout_handles: Arc::new(Mutex::new(HashMap::new())),
88            late_tx,
89            language_registry,
90            route_cancel,
91        }
92    }
93
94    pub fn has_timeout(&self) -> bool {
95        has_timeout_condition(&self.config.completion)
96    }
97
98    pub fn force_complete_all(&self) {
99        let mut buckets_guard = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
100        let keys: Vec<String> = buckets_guard.keys().cloned().collect();
101
102        for key in keys {
103            if let Some(bucket) = buckets_guard.remove(&key) {
104                if self.config.force_completion_on_stop {
105                    cancel_timeout_task_with_handle(
106                        &key,
107                        &self.timeout_tasks,
108                        &self.timeout_handles,
109                    );
110                    match aggregate(bucket.exchanges, &self.config.strategy) {
111                        Ok(mut result) => {
112                            result.set_property(
113                                CAMEL_AGGREGATED_COMPLETION_REASON,
114                                serde_json::json!(CompletionReason::Stop.as_str()),
115                            );
116                            if self.late_tx.try_send(result).is_err() {
117                                tracing::warn!(
118                                    key = %key,
119                                    "aggregator force-complete emit dropped: late channel full"
120                                );
121                            }
122                        }
123                        Err(e) => {
124                            tracing::error!(
125                                key = %key,
126                                error = %e,
127                                "aggregation failed in force_complete_all"
128                            );
129                        }
130                    }
131                } else {
132                    cancel_timeout_task_with_handle(
133                        &key,
134                        &self.timeout_tasks,
135                        &self.timeout_handles,
136                    );
137                }
138            }
139        }
140    }
141
142    /// Graceful shutdown: cancel all outstanding timeout tasks and await their
143    /// JoinHandles (with a 5s deadline) so that no tasks are leaked.
144    pub async fn shutdown(&self) {
145        // Cancel all timeout cancellation tokens.
146        {
147            let mut guard = self.timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
148            for token in guard.values() {
149                token.cancel();
150            }
151            guard.clear();
152        };
153
154        // Remove and collect all JoinHandles.
155        let handles: Vec<JoinHandle<()>> = {
156            let mut guard = self
157                .timeout_handles
158                .lock()
159                .unwrap_or_else(|e| e.into_inner());
160            guard.drain().map(|(_, handle)| handle).collect()
161        };
162
163        if handles.is_empty() {
164            return;
165        }
166
167        // Await all handles with a deadline.
168        let _ = tokio::time::timeout(Duration::from_secs(5), async {
169            for handle in handles {
170                let _ = handle.await;
171            }
172        })
173        .await;
174    }
175}
176
177pub fn has_timeout_condition(mode: &CompletionMode) -> bool {
178    match mode {
179        CompletionMode::Single(CompletionCondition::Timeout(_)) => true,
180        CompletionMode::Any(conditions) => conditions
181            .iter()
182            .any(|c| matches!(c, CompletionCondition::Timeout(_))),
183        _ => false,
184    }
185}
186
187impl Service<Exchange> for AggregatorService {
188    type Response = Exchange;
189    type Error = CamelError;
190    type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
191
192    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
193        Poll::Ready(Ok(()))
194    }
195
196    fn call(&mut self, exchange: Exchange) -> Self::Future {
197        let config = self.config.clone();
198        let buckets = Arc::clone(&self.buckets);
199        let timeout_tasks = Arc::clone(&self.timeout_tasks);
200        let timeout_handles = Arc::clone(&self.timeout_handles);
201        let late_tx = self.late_tx.clone();
202        let language_registry = Arc::clone(&self.language_registry);
203        let route_cancel = self.route_cancel.clone();
204
205        Box::pin(async move {
206            let key_value =
207                extract_correlation_key(&exchange, &config.correlation, &language_registry).await?;
208
209            let key_str = serde_json::to_string(&key_value)
210                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
211
212            let completed_bucket = {
213                let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
214
215                if let Some(ttl) = config.bucket_ttl {
216                    guard.retain(|_, bucket| !bucket.is_expired(ttl));
217                }
218
219                if let Some(max) = config.max_buckets
220                    && !guard.contains_key(&key_str)
221                    && guard.len() >= max
222                {
223                    tracing::warn!(
224                        max_buckets = max,
225                        correlation_key = %key_str,
226                        "Aggregator reached max buckets limit, rejecting new correlation key"
227                    );
228                    return Err(CamelError::ProcessorError(format!(
229                        "Aggregator reached maximum {} buckets",
230                        max
231                    )));
232                }
233
234                let bucket = guard.entry(key_str.clone()).or_insert_with(Bucket::new);
235                bucket.push(exchange);
236
237                let (is_complete, reason) =
238                    check_sync_completion(&config.completion, &bucket.exchanges);
239
240                if is_complete {
241                    let exchanges = guard.remove(&key_str).map(|b| b.exchanges);
242                    (exchanges, reason)
243                } else {
244                    (None, CompletionReason::Size) // placeholder; reason unused when None
245                }
246            };
247
248            if completed_bucket.0.is_none() && has_timeout_condition(&config.completion) {
249                let cancel = {
250                    let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
251                    // Cancel and remove old handle for this key (if any).
252                    if let Some(existing) = tt_guard.get(&key_str) {
253                        existing.cancel();
254                    }
255                    let token = CancellationToken::new();
256                    tt_guard.insert(key_str.clone(), token.clone());
257                    token
258                };
259
260                let timeout_dur = extract_timeout_duration(&config.completion);
261                if let Some(timeout) = timeout_dur {
262                    // Remove old handle if present.
263                    {
264                        let mut hh = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
265                        if let Some(old) = hh.remove(&key_str) {
266                            old.abort();
267                        }
268                    }
269                    let handle = spawn_timeout_task(
270                        key_str.clone(),
271                        timeout,
272                        cancel,
273                        buckets.clone(),
274                        timeout_tasks.clone(),
275                        timeout_handles.clone(),
276                        late_tx,
277                        config.strategy.clone(),
278                        config.discard_on_timeout,
279                        route_cancel,
280                    );
281                    timeout_handles
282                        .lock()
283                        .unwrap_or_else(|e| e.into_inner())
284                        .insert(key_str.clone(), handle);
285                }
286            }
287
288            if let Some(exchanges) = completed_bucket.0 {
289                cancel_timeout_task_with_handle(&key_str, &timeout_tasks, &timeout_handles);
290                let reason = completed_bucket.1;
291                let size = exchanges.len();
292                let mut result = aggregate(exchanges, &config.strategy)?;
293                result.set_property(CAMEL_AGGREGATED_SIZE, serde_json::json!(size as u64));
294                result.set_property(CAMEL_AGGREGATED_KEY, key_value);
295                result.set_property(
296                    CAMEL_AGGREGATED_COMPLETION_REASON,
297                    serde_json::json!(reason.as_str()),
298                );
299                Ok(result)
300            } else {
301                let mut pending = Exchange::new(Message {
302                    headers: Default::default(),
303                    body: Body::Empty,
304                });
305                pending.set_property(CAMEL_AGGREGATOR_PENDING, serde_json::json!(true));
306                Ok(pending)
307            }
308        })
309    }
310}
311
312async fn extract_correlation_key(
313    exchange: &Exchange,
314    strategy: &CorrelationStrategy,
315    registry: &SharedLanguageRegistry,
316) -> Result<serde_json::Value, CamelError> {
317    match strategy {
318        CorrelationStrategy::HeaderName(h) => {
319            exchange.input.headers.get(h).cloned().ok_or_else(|| {
320                CamelError::ProcessorError(format!(
321                    "Aggregator: missing correlation key header '{}'",
322                    h
323                ))
324            })
325        }
326        CorrelationStrategy::Expression { expr, language } => {
327            let expression = {
328                let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
329                let lang = reg.get(language).ok_or_else(|| {
330                    CamelError::ProcessorError(format!(
331                        "Aggregator: language '{}' not found in registry",
332                        language
333                    ))
334                })?;
335                lang.create_expression(expr)
336                    .map_err(|e| CamelError::ProcessorError(e.to_string()))?
337            };
338            let value = expression
339                .evaluate(exchange)
340                .await
341                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
342            if value.is_null() {
343                return Err(CamelError::ProcessorError(format!(
344                    "Aggregator: correlation expression '{}' evaluated to null",
345                    expr
346                )));
347            }
348            Ok(value)
349        }
350        CorrelationStrategy::Fn(f) => f(exchange).map(serde_json::Value::String).ok_or_else(|| {
351            CamelError::ProcessorError("Aggregator: correlation function returned None".to_string())
352        }),
353    }
354}
355
356fn check_sync_completion(
357    mode: &CompletionMode,
358    exchanges: &[Exchange],
359) -> (bool, CompletionReason) {
360    match mode {
361        CompletionMode::Single(cond) => check_single(cond, exchanges),
362        CompletionMode::Any(conditions) => {
363            for cond in conditions {
364                if let CompletionCondition::Timeout(_) = cond {
365                    continue;
366                }
367                let (done, reason) = check_single(cond, exchanges);
368                if done {
369                    return (true, reason);
370                }
371            }
372            (false, CompletionReason::Size)
373        }
374    }
375}
376
377fn check_single(cond: &CompletionCondition, exchanges: &[Exchange]) -> (bool, CompletionReason) {
378    match cond {
379        CompletionCondition::Size(n) => (exchanges.len() >= *n, CompletionReason::Size),
380        CompletionCondition::Predicate(pred) => (pred(exchanges), CompletionReason::Predicate),
381        CompletionCondition::Timeout(_) => (false, CompletionReason::Timeout),
382    }
383}
384
385fn extract_timeout_duration(mode: &CompletionMode) -> Option<Duration> {
386    match mode {
387        CompletionMode::Single(CompletionCondition::Timeout(d)) => Some(*d),
388        CompletionMode::Any(conditions) => conditions.iter().find_map(|c| {
389            if let CompletionCondition::Timeout(d) = c {
390                Some(*d)
391            } else {
392                None
393            }
394        }),
395        _ => None,
396    }
397}
398
399fn cancel_timeout_task(key: &str, timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>) {
400    let mut guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
401    if let Some(token) = guard.remove(key) {
402        token.cancel();
403    }
404}
405
406/// Also removes the stored JoinHandle for a cancelled/completed timeout task.
407fn cancel_timeout_task_with_handle(
408    key: &str,
409    timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>,
410    timeout_handles: &Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
411) {
412    cancel_timeout_task(key, timeout_tasks);
413    let mut guard = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
414    guard.remove(key);
415}
416
417#[allow(clippy::too_many_arguments)]
418fn spawn_timeout_task(
419    key: String,
420    timeout: Duration,
421    cancel: CancellationToken,
422    buckets: Arc<Mutex<HashMap<String, Bucket>>>,
423    timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
424    _timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
425    late_tx: mpsc::Sender<Exchange>,
426    strategy: AggregationStrategy,
427    discard: bool,
428    _route_cancel: CancellationToken,
429) -> JoinHandle<()> {
430    let cancel_clone = cancel.clone();
431    tokio::spawn(async move {
432        tokio::select! {
433            _ = tokio::time::sleep(timeout) => {
434                let should_proceed = {
435                    let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
436                    if cancel_clone.is_cancelled() {
437                        false
438                    } else {
439                        tt_guard.remove(&key);
440                        true
441                    }
442                };
443                if !should_proceed {
444                    return;
445                }
446                let bucket_exchanges = {
447                    let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
448                    guard.remove(&key).map(|b| b.exchanges)
449                };
450                if let Some(exchanges) = bucket_exchanges
451                    && !discard
452                {
453                    match aggregate(exchanges, &strategy) {
454                        Ok(mut result) => {
455                            result.set_property(
456                                CAMEL_AGGREGATED_COMPLETION_REASON,
457                                serde_json::json!(CompletionReason::Timeout.as_str()),
458                            );
459                            if late_tx.try_send(result).is_err() {
460                                tracing::warn!(
461                                    key = %key,
462                                    "aggregator timeout emit dropped: late channel full"
463                                );
464                            }
465                        }
466                        Err(e) => {
467                            tracing::error!(
468                                key = %key,
469                                error = %e,
470                                "aggregation failed in timeout task"
471                            );
472                        }
473                    }
474                }
475            }
476            _ = cancel_clone.cancelled() => {}
477        }
478    })
479}
480
481fn aggregate(
482    exchanges: Vec<Exchange>,
483    strategy: &AggregationStrategy,
484) -> Result<Exchange, CamelError> {
485    match strategy {
486        AggregationStrategy::CollectAll => {
487            let bodies: Vec<serde_json::Value> = exchanges
488                .into_iter()
489                .map(|e| match e.input.body {
490                    Body::Json(v) => v,
491                    Body::Text(s) => serde_json::Value::String(s),
492                    Body::Xml(s) => serde_json::Value::String(s),
493                    Body::Bytes(b) => {
494                        serde_json::Value::String(String::from_utf8_lossy(&b).into_owned())
495                    }
496                    Body::Empty => serde_json::Value::Null,
497                    Body::Stream(s) => serde_json::json!({
498                        "_stream": {
499                            "origin": s.metadata.origin,
500                            "placeholder": true,
501                            "hint": "Materialize exchange body with .into_bytes() before aggregation if content needed"
502                        }
503                    }),
504                })
505                .collect();
506            Ok(Exchange::new(Message {
507                headers: Default::default(),
508                body: Body::Json(serde_json::Value::Array(bodies)),
509            }))
510        }
511        AggregationStrategy::Custom(f) => {
512            let mut iter = exchanges.into_iter();
513            let first = iter.next().ok_or_else(|| {
514                CamelError::ProcessorError("Aggregator: empty bucket".to_string())
515            })?;
516            Ok(iter.fold(first, |acc, next| f(acc, next)))
517        }
518    }
519}
520
521#[cfg(test)]
522mod tests {
523    use super::*;
524    use std::collections::HashMap;
525
526    use camel_api::{
527        aggregator::{AggregationStrategy, AggregatorConfig},
528        body::Body,
529        exchange::Exchange,
530        message::Message,
531    };
532    use tokio::sync::mpsc;
533    use tokio_util::sync::CancellationToken;
534    use tower::ServiceExt;
535
536    fn make_exchange(header: &str, value: &str, body: &str) -> Exchange {
537        let mut msg = Message {
538            headers: Default::default(),
539            body: Body::Text(body.to_string()),
540        };
541        msg.headers
542            .insert(header.to_string(), serde_json::json!(value));
543        Exchange::new(msg)
544    }
545
546    fn config_size(n: usize) -> AggregatorConfig {
547        AggregatorConfig::correlate_by("orderId")
548            .complete_when_size(n)
549            .build()
550            .unwrap()
551    }
552
553    fn new_test_svc(config: AggregatorConfig) -> AggregatorService {
554        let (tx, _rx) = mpsc::channel(256);
555        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
556        let cancel = CancellationToken::new();
557        AggregatorService::new(config, tx, registry, cancel)
558    }
559
560    #[tokio::test]
561    async fn test_pending_exchange_not_yet_complete() {
562        let mut svc = new_test_svc(config_size(3));
563        let ex = make_exchange("orderId", "A", "first");
564        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
565        assert!(matches!(result.input.body, Body::Empty));
566        assert_eq!(
567            result.property(CAMEL_AGGREGATOR_PENDING),
568            Some(&serde_json::json!(true))
569        );
570    }
571
572    #[tokio::test]
573    async fn test_completes_on_size() {
574        let mut svc = new_test_svc(config_size(3));
575        for _ in 0..2 {
576            let ex = make_exchange("orderId", "A", "item");
577            let r = svc.ready().await.unwrap().call(ex).await.unwrap();
578            assert!(matches!(r.input.body, Body::Empty));
579        }
580        let ex = make_exchange("orderId", "A", "last");
581        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
582        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
583        assert_eq!(
584            result.property(CAMEL_AGGREGATED_SIZE),
585            Some(&serde_json::json!(3u64))
586        );
587    }
588
589    #[tokio::test]
590    async fn test_collect_all_produces_json_array() {
591        let mut svc = new_test_svc(config_size(2));
592        svc.ready()
593            .await
594            .unwrap()
595            .call(make_exchange("orderId", "A", "alpha"))
596            .await
597            .unwrap();
598        let result = svc
599            .ready()
600            .await
601            .unwrap()
602            .call(make_exchange("orderId", "A", "beta"))
603            .await
604            .unwrap();
605        let Body::Json(v) = &result.input.body else {
606            panic!("expected Body::Json")
607        };
608        let arr = v.as_array().unwrap();
609        assert_eq!(arr.len(), 2);
610        assert_eq!(arr[0], serde_json::json!("alpha"));
611        assert_eq!(arr[1], serde_json::json!("beta"));
612    }
613
614    #[tokio::test]
615    async fn test_two_keys_independent_buckets() {
616        // completionSize=3 so we can test that A and B accumulate independently.
617        let mut svc = new_test_svc(config_size(3));
618        svc.ready()
619            .await
620            .unwrap()
621            .call(make_exchange("orderId", "A", "a1"))
622            .await
623            .unwrap();
624        svc.ready()
625            .await
626            .unwrap()
627            .call(make_exchange("orderId", "B", "b1"))
628            .await
629            .unwrap();
630        svc.ready()
631            .await
632            .unwrap()
633            .call(make_exchange("orderId", "A", "a2"))
634            .await
635            .unwrap();
636        // A has 2 items, B has 1 item — neither complete yet
637        let ra = svc
638            .ready()
639            .await
640            .unwrap()
641            .call(make_exchange("orderId", "A", "a3"))
642            .await
643            .unwrap();
644        // A now has 3 → completes
645        assert!(matches!(ra.input.body, Body::Json(_)));
646        // B only has 1 → still pending
647        let rb = svc
648            .ready()
649            .await
650            .unwrap()
651            .call(make_exchange("orderId", "B", "b_check"))
652            .await
653            .unwrap();
654        assert!(matches!(rb.input.body, Body::Empty));
655    }
656
657    #[tokio::test]
658    async fn test_bucket_resets_after_completion() {
659        let mut svc = new_test_svc(config_size(2));
660        svc.ready()
661            .await
662            .unwrap()
663            .call(make_exchange("orderId", "A", "x"))
664            .await
665            .unwrap();
666        svc.ready()
667            .await
668            .unwrap()
669            .call(make_exchange("orderId", "A", "x"))
670            .await
671            .unwrap(); // completes
672        // New bucket starts
673        let r = svc
674            .ready()
675            .await
676            .unwrap()
677            .call(make_exchange("orderId", "A", "new"))
678            .await
679            .unwrap();
680        assert!(matches!(r.input.body, Body::Empty)); // pending again
681    }
682
683    #[tokio::test]
684    async fn test_completion_size_1_emits_immediately() {
685        let mut svc = new_test_svc(config_size(1));
686        let ex = make_exchange("orderId", "A", "solo");
687        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
688        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
689    }
690
691    #[tokio::test]
692    async fn test_custom_aggregation_strategy() {
693        use camel_api::aggregator::AggregationFn;
694        use std::sync::Arc;
695
696        let f: AggregationFn = Arc::new(|mut acc: Exchange, next: Exchange| {
697            let combined = format!(
698                "{}+{}",
699                acc.input.body.as_text().unwrap_or(""),
700                next.input.body.as_text().unwrap_or("")
701            );
702            acc.input.body = Body::Text(combined);
703            acc
704        });
705        let config = AggregatorConfig::correlate_by("key")
706            .complete_when_size(2)
707            .strategy(AggregationStrategy::Custom(f))
708            .build()
709            .unwrap();
710        let mut svc = new_test_svc(config);
711        svc.ready()
712            .await
713            .unwrap()
714            .call(make_exchange("key", "X", "hello"))
715            .await
716            .unwrap();
717        let result = svc
718            .ready()
719            .await
720            .unwrap()
721            .call(make_exchange("key", "X", "world"))
722            .await
723            .unwrap();
724        assert_eq!(result.input.body.as_text(), Some("hello+world"));
725    }
726
727    #[tokio::test]
728    async fn test_completion_predicate() {
729        let config = AggregatorConfig::correlate_by("key")
730            .complete_when(|bucket| {
731                bucket
732                    .iter()
733                    .any(|e| e.input.body.as_text() == Some("DONE"))
734            })
735            .build()
736            .unwrap();
737        let mut svc = new_test_svc(config);
738        svc.ready()
739            .await
740            .unwrap()
741            .call(make_exchange("key", "K", "first"))
742            .await
743            .unwrap();
744        svc.ready()
745            .await
746            .unwrap()
747            .call(make_exchange("key", "K", "second"))
748            .await
749            .unwrap();
750        let result = svc
751            .ready()
752            .await
753            .unwrap()
754            .call(make_exchange("key", "K", "DONE"))
755            .await
756            .unwrap();
757        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
758    }
759
760    #[tokio::test]
761    async fn test_missing_header_returns_error() {
762        let mut svc = new_test_svc(config_size(2));
763        let msg = Message {
764            headers: Default::default(),
765            body: Body::Text("no key".into()),
766        };
767        let ex = Exchange::new(msg);
768        let result = svc.ready().await.unwrap().call(ex).await;
769        assert!(result.is_err());
770        assert!(matches!(
771            result.unwrap_err(),
772            camel_api::CamelError::ProcessorError(_)
773        ));
774    }
775
776    #[tokio::test]
777    async fn test_cloned_service_shares_state() {
778        let svc1 = new_test_svc(config_size(2));
779        let mut svc2 = svc1.clone();
780        // send first exchange via svc1
781        svc1.clone()
782            .ready()
783            .await
784            .unwrap()
785            .call(make_exchange("orderId", "A", "from-svc1"))
786            .await
787            .unwrap();
788        // send second exchange via svc2 — should complete because same Arc<Mutex>
789        let result = svc2
790            .ready()
791            .await
792            .unwrap()
793            .call(make_exchange("orderId", "A", "from-svc2"))
794            .await
795            .unwrap();
796        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
797    }
798
799    #[tokio::test]
800    async fn test_camel_aggregated_key_property_set() {
801        let mut svc = new_test_svc(config_size(1));
802        let ex = make_exchange("orderId", "ORDER-42", "body");
803        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
804        assert_eq!(
805            result.property(CAMEL_AGGREGATED_KEY),
806            Some(&serde_json::json!("ORDER-42"))
807        );
808    }
809
810    #[tokio::test]
811    async fn test_aggregator_enforces_max_buckets() {
812        let config = AggregatorConfig::correlate_by("orderId")
813            .complete_when_size(2)
814            .max_buckets(3)
815            .build()
816            .unwrap();
817
818        let mut svc = new_test_svc(config);
819
820        // Create 3 different correlation keys (fills limit)
821        for i in 0..3 {
822            let ex = make_exchange("orderId", &format!("key-{}", i), "body");
823            let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
824        }
825
826        // 4th key should be rejected
827        let ex = make_exchange("orderId", "key-4", "body");
828        let result = svc.ready().await.unwrap().call(ex).await;
829
830        assert!(result.is_err(), "Should reject when max buckets reached");
831        let err = result.unwrap_err().to_string();
832        assert!(
833            err.contains("maximum"),
834            "Error message should contain 'maximum': {}",
835            err
836        );
837    }
838
839    #[tokio::test]
840    async fn test_max_buckets_allows_existing_key() {
841        let config = AggregatorConfig::correlate_by("orderId")
842            .complete_when_size(5) // Large size so bucket doesn't complete
843            .max_buckets(2)
844            .build()
845            .unwrap();
846
847        let mut svc = new_test_svc(config);
848
849        // Create 2 different correlation keys (fills limit)
850        let ex1 = make_exchange("orderId", "key-A", "body1");
851        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
852        let ex2 = make_exchange("orderId", "key-B", "body2");
853        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
854
855        // Should still allow adding to existing key
856        let ex3 = make_exchange("orderId", "key-A", "body3");
857        let result = svc.ready().await.unwrap().call(ex3).await;
858        assert!(
859            result.is_ok(),
860            "Should allow adding to existing bucket even at max limit"
861        );
862    }
863
864    #[tokio::test]
865    async fn test_bucket_ttl_eviction() {
866        let config = AggregatorConfig::correlate_by("orderId")
867            .complete_when_size(10) // Large size so bucket doesn't complete normally
868            .bucket_ttl(Duration::from_millis(50))
869            .build()
870            .unwrap();
871
872        let mut svc = new_test_svc(config);
873
874        // Create a bucket
875        let ex1 = make_exchange("orderId", "key-A", "body1");
876        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
877
878        // Wait for TTL to expire
879        tokio::time::sleep(Duration::from_millis(100)).await;
880
881        // Create a new bucket - this should trigger eviction of the old one
882        let ex2 = make_exchange("orderId", "key-B", "body2");
883        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
884
885        // The expired bucket should have been evicted, so we should be able to
886        // add a new key-A bucket again
887        let ex3 = make_exchange("orderId", "key-A", "body3");
888        let result = svc.ready().await.unwrap().call(ex3).await;
889        assert!(result.is_ok(), "Should be able to recreate evicted bucket");
890    }
891
892    #[tokio::test(start_paused = true)]
893    async fn test_timeout_completes_bucket() {
894        let config = AggregatorConfig::correlate_by("key")
895            .complete_on_timeout(Duration::from_millis(100))
896            .build()
897            .unwrap();
898        let mut svc = new_test_svc(config);
899        let ex = make_exchange("key", "A", "data");
900        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
901        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_some());
902
903        tokio::time::sleep(Duration::from_millis(200)).await;
904
905        assert_eq!(
906            svc.buckets.lock().unwrap().len(),
907            0,
908            "bucket should be removed after timeout"
909        );
910    }
911
912    #[tokio::test(start_paused = true)]
913    async fn test_timeout_resets_on_new_exchange() {
914        let config = AggregatorConfig::correlate_by("key")
915            .complete_on_timeout(Duration::from_millis(150))
916            .build()
917            .unwrap();
918        let mut svc = new_test_svc(config);
919
920        let ex1 = make_exchange("key", "A", "first");
921        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
922
923        tokio::time::sleep(Duration::from_millis(100)).await;
924
925        let ex2 = make_exchange("key", "A", "second");
926        let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
927
928        tokio::time::sleep(Duration::from_millis(100)).await;
929
930        assert_eq!(
931            svc.buckets.lock().unwrap().len(),
932            1,
933            "bucket should still exist — timeout was reset"
934        );
935
936        tokio::time::sleep(Duration::from_millis(100)).await;
937
938        assert_eq!(
939            svc.buckets.lock().unwrap().len(),
940            0,
941            "bucket should be gone after timeout fires"
942        );
943    }
944
945    #[tokio::test]
946    async fn test_composable_size_and_timeout() {
947        let config = AggregatorConfig::correlate_by("key")
948            .complete_on_size_or_timeout(2, Duration::from_millis(200))
949            .build()
950            .unwrap();
951        let mut svc = new_test_svc(config);
952
953        let ex1 = make_exchange("key", "A", "first");
954        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
955        assert!(svc.buckets.lock().unwrap().contains_key("\"A\""));
956
957        let ex2 = make_exchange("key", "A", "second");
958        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
959        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
960        assert_eq!(
961            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
962            Some(&serde_json::json!("size"))
963        );
964    }
965
966    #[tokio::test(start_paused = true)]
967    async fn test_discard_on_timeout() {
968        let config = AggregatorConfig::correlate_by("key")
969            .complete_on_timeout(Duration::from_millis(50))
970            .discard_on_timeout(true)
971            .build()
972            .unwrap();
973        let (tx, mut rx) = mpsc::channel(256);
974        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
975        let cancel = CancellationToken::new();
976        let mut svc = AggregatorService::new(config, tx, registry, cancel);
977
978        let ex = make_exchange("key", "A", "data");
979        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
980
981        tokio::time::sleep(Duration::from_millis(100)).await;
982
983        assert!(
984            rx.try_recv().is_err(),
985            "no emit expected with discard_on_timeout"
986        );
987        assert_eq!(svc.buckets.lock().unwrap().len(), 0);
988        assert!(
989            svc.timeout_tasks.lock().unwrap().is_empty(),
990            "timeout task should be cleaned up"
991        );
992    }
993
994    #[tokio::test]
995    async fn test_force_completion_on_stop() {
996        let config = AggregatorConfig::correlate_by("key")
997            .complete_when_size(10)
998            .force_completion_on_stop(true)
999            .build()
1000            .unwrap();
1001        let (tx, mut rx) = mpsc::channel(256);
1002        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1003        let cancel = CancellationToken::new();
1004        let svc = AggregatorService::new(config, tx, registry, cancel);
1005
1006        let mut call_svc = svc.clone();
1007        let ex = make_exchange("key", "A", "data");
1008        let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1009
1010        svc.force_complete_all();
1011
1012        let result = rx.try_recv().expect("should emit on force-complete");
1013        assert!(
1014            result.input.body.as_text().is_some() || matches!(result.input.body, Body::Json(_))
1015        );
1016        assert_eq!(
1017            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1018            Some(&serde_json::json!("stop"))
1019        );
1020    }
1021
1022    #[tokio::test]
1023    async fn test_completion_reason_property_size() {
1024        let config = AggregatorConfig::correlate_by("key")
1025            .complete_when_size(1)
1026            .build()
1027            .unwrap();
1028        let mut svc = new_test_svc(config);
1029        let ex = make_exchange("key", "X", "body");
1030        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1031        assert_eq!(
1032            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1033            Some(&serde_json::json!("size"))
1034        );
1035    }
1036
1037    #[tokio::test]
1038    async fn test_completion_reason_property_predicate() {
1039        let config = AggregatorConfig::correlate_by("key")
1040            .complete_when(|_| true)
1041            .build()
1042            .unwrap();
1043        let mut svc = new_test_svc(config);
1044        let ex = make_exchange("key", "X", "body");
1045        let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1046        assert_eq!(
1047            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1048            Some(&serde_json::json!("predicate"))
1049        );
1050    }
1051
1052    #[tokio::test(start_paused = true)]
1053    async fn test_size_completes_before_timeout() {
1054        let config = AggregatorConfig::correlate_by("key")
1055            .complete_on_size_or_timeout(2, Duration::from_millis(200))
1056            .build()
1057            .unwrap();
1058        let mut svc = new_test_svc(config);
1059
1060        let ex1 = make_exchange("key", "A", "first");
1061        let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
1062
1063        let ex2 = make_exchange("key", "A", "second");
1064        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1065
1066        assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
1067        assert_eq!(
1068            result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1069            Some(&serde_json::json!("size"))
1070        );
1071        assert_eq!(svc.buckets.lock().unwrap().len(), 0);
1072
1073        tokio::time::sleep(Duration::from_millis(300)).await;
1074        assert_eq!(
1075            svc.buckets.lock().unwrap().len(),
1076            0,
1077            "no re-fire after timeout"
1078        );
1079    }
1080
1081    #[tokio::test(start_paused = true)]
1082    async fn test_concurrent_timeout_fire_and_new_exchange() {
1083        let config = AggregatorConfig::correlate_by("key")
1084            .complete_on_size_or_timeout(2, Duration::from_millis(100))
1085            .build()
1086            .unwrap();
1087        let (tx, mut rx) = mpsc::channel(256);
1088        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1089        let cancel = CancellationToken::new();
1090        let mut svc = AggregatorService::new(config, tx, registry, cancel);
1091
1092        let ex = make_exchange("key", "A", "data");
1093        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1094
1095        // Advance time past timeout — timeout task fires and removes bucket
1096        tokio::time::sleep(Duration::from_millis(150)).await;
1097
1098        // New exchange arrives after timeout — starts a fresh bucket
1099        let ex2 = make_exchange("key", "A", "data2");
1100        let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1101        assert!(
1102            result.property(CAMEL_AGGREGATOR_PENDING).is_some(),
1103            "should be pending in new bucket"
1104        );
1105
1106        // Drain late emits from timeout
1107        let mut late_count = 0;
1108        while rx.try_recv().is_ok() {
1109            late_count += 1;
1110        }
1111        assert_eq!(
1112            late_count, 1,
1113            "exactly 1 late emit from the timed-out bucket"
1114        );
1115    }
1116
1117    #[tokio::test(start_paused = true)]
1118    async fn test_late_channel_full_drops_with_warning() {
1119        let config = AggregatorConfig::correlate_by("key")
1120            .complete_on_timeout(Duration::from_millis(50))
1121            .build()
1122            .unwrap();
1123        let (tx, mut rx) = mpsc::channel(1);
1124        rx.close();
1125        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1126        let cancel = CancellationToken::new();
1127        let mut svc = AggregatorService::new(config, tx, registry, cancel);
1128
1129        let ex = make_exchange("key", "A", "data");
1130        let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1131
1132        tokio::time::sleep(Duration::from_millis(100)).await;
1133        assert_eq!(
1134            svc.buckets.lock().unwrap().len(),
1135            0,
1136            "bucket removed despite channel closed"
1137        );
1138    }
1139
1140    #[tokio::test]
1141    async fn test_aggregate_stream_bodies_creates_valid_json() {
1142        use bytes::Bytes;
1143        use camel_api::{Body, StreamBody, StreamMetadata};
1144        use futures::stream;
1145        use tokio::sync::Mutex;
1146
1147        let chunks = vec![Ok(Bytes::from("test"))];
1148        let stream_body = StreamBody {
1149            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1150            metadata: StreamMetadata {
1151                origin: Some("file:///test.txt".to_string()),
1152                ..Default::default()
1153            },
1154        };
1155
1156        let ex1 = Exchange::new(Message {
1157            headers: Default::default(),
1158            body: Body::Stream(stream_body),
1159        });
1160
1161        let exchanges = vec![ex1];
1162        let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1163
1164        let exchange = result.expect("Expected Ok result");
1165        assert!(
1166            matches!(exchange.input.body, Body::Json(_)),
1167            "Expected Json body"
1168        );
1169
1170        if let Body::Json(value) = exchange.input.body {
1171            let json_str = serde_json::to_string(&value).unwrap();
1172            let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1173
1174            assert!(parsed.is_array(), "Result should be an array");
1175            let arr = parsed.as_array().unwrap();
1176            assert!(arr[0].is_object(), "First element should be an object");
1177            assert!(
1178                arr[0]["_stream"].is_object(),
1179                "Should contain _stream object"
1180            );
1181            assert_eq!(arr[0]["_stream"]["origin"], "file:///test.txt");
1182            assert_eq!(
1183                arr[0]["_stream"]["placeholder"], true,
1184                "placeholder flag should be true"
1185            );
1186        }
1187    }
1188
1189    #[tokio::test]
1190    async fn test_aggregate_stream_bodies_with_none_origin() {
1191        use bytes::Bytes;
1192        use camel_api::{Body, StreamBody, StreamMetadata};
1193        use futures::stream;
1194        use tokio::sync::Mutex;
1195
1196        let chunks = vec![Ok(Bytes::from("test"))];
1197        let stream_body = StreamBody {
1198            stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1199            metadata: StreamMetadata {
1200                origin: None,
1201                ..Default::default()
1202            },
1203        };
1204
1205        let ex1 = Exchange::new(Message {
1206            headers: Default::default(),
1207            body: Body::Stream(stream_body),
1208        });
1209
1210        let exchanges = vec![ex1];
1211        let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1212
1213        let exchange = result.expect("Expected Ok result");
1214        assert!(
1215            matches!(exchange.input.body, Body::Json(_)),
1216            "Expected Json body"
1217        );
1218
1219        if let Body::Json(value) = exchange.input.body {
1220            let json_str = serde_json::to_string(&value).unwrap();
1221            let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1222
1223            assert!(parsed.is_array(), "Result should be an array");
1224            let arr = parsed.as_array().unwrap();
1225            assert!(arr[0].is_object(), "First element should be an object");
1226            assert!(
1227                arr[0]["_stream"].is_object(),
1228                "Should contain _stream object"
1229            );
1230            assert_eq!(
1231                arr[0]["_stream"]["origin"],
1232                serde_json::Value::Null,
1233                "origin should be null when None"
1234            );
1235            assert_eq!(
1236                arr[0]["_stream"]["placeholder"], true,
1237                "placeholder flag should be true"
1238            );
1239        }
1240    }
1241
1242    #[tokio::test(start_paused = true)]
1243    async fn test_shutdown_awaits_timeout_handles() {
1244        let config = AggregatorConfig::correlate_by("key")
1245            .complete_on_timeout(Duration::from_millis(100))
1246            .build()
1247            .unwrap();
1248        let (tx, _rx) = mpsc::channel(256);
1249        let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1250        let cancel = CancellationToken::new();
1251        let svc = AggregatorService::new(config, tx, registry, cancel);
1252
1253        // Send an exchange to create a pending bucket with a timeout task.
1254        let mut call_svc = svc.clone();
1255        let ex = make_exchange("key", "A", "data");
1256        let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1257
1258        // Verify timeout handle exists.
1259        assert!(
1260            !svc.timeout_handles.lock().unwrap().is_empty(),
1261            "should have a timeout handle"
1262        );
1263
1264        // Shutdown should complete within the 5s deadline (the timeout task
1265        // gets cancelled so it won't wait for the full 100ms sleep).
1266        svc.shutdown().await;
1267
1268        assert!(
1269            svc.timeout_handles.lock().unwrap().is_empty(),
1270            "all handles should be cleaned up after shutdown"
1271        );
1272    }
1273}