1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6use std::time::{Duration, Instant};
7
8use tokio::sync::mpsc;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tower::Service;
12
13use camel_api::{
14 CamelError,
15 aggregator::{
16 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode,
17 CompletionReason, CorrelationStrategy,
18 },
19 body::Body,
20 exchange::Exchange,
21 message::Message,
22};
23use camel_language_api::Language;
24
25pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
26
27pub const CAMEL_AGGREGATOR_PENDING: &str = "CamelAggregatorPending";
28pub const CAMEL_AGGREGATED_SIZE: &str = "CamelAggregatedSize";
29pub const CAMEL_AGGREGATED_KEY: &str = "CamelAggregatedKey";
30pub const CAMEL_AGGREGATED_COMPLETION_REASON: &str = "CamelAggregatedCompletionReason";
31
32struct Bucket {
34 exchanges: Vec<Exchange>,
35 last_updated: Instant,
36}
37
38impl Bucket {
39 fn new() -> Self {
40 Self {
41 exchanges: Vec::new(),
42 last_updated: Instant::now(),
43 }
44 }
45
46 fn push(&mut self, exchange: Exchange) {
47 self.exchanges.push(exchange);
48 self.last_updated = Instant::now();
49 }
50
51 fn is_expired(&self, ttl: Duration) -> bool {
52 Instant::now().duration_since(self.last_updated) >= ttl
53 }
54}
55
56#[derive(Clone)]
57pub struct AggregatorService {
58 config: AggregatorConfig,
59 buckets: Arc<Mutex<HashMap<String, Bucket>>>,
60 timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
61 timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
62 late_tx: mpsc::Sender<Exchange>,
63 language_registry: SharedLanguageRegistry,
64 route_cancel: CancellationToken,
65}
66
67impl AggregatorService {
68 pub fn new(
69 config: AggregatorConfig,
70 late_tx: mpsc::Sender<Exchange>,
71 language_registry: SharedLanguageRegistry,
72 route_cancel: CancellationToken,
73 ) -> Self {
74 Self {
75 config,
76 buckets: Arc::new(Mutex::new(HashMap::new())),
77 timeout_tasks: Arc::new(Mutex::new(HashMap::new())),
78 timeout_handles: Arc::new(Mutex::new(HashMap::new())),
79 late_tx,
80 language_registry,
81 route_cancel,
82 }
83 }
84
85 pub fn has_timeout(&self) -> bool {
86 has_timeout_condition(&self.config.completion)
87 }
88
89 pub fn force_complete_all(&self) {
90 let mut buckets_guard = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
91 let keys: Vec<String> = buckets_guard.keys().cloned().collect();
92
93 for key in keys {
94 if let Some(bucket) = buckets_guard.remove(&key) {
95 if self.config.force_completion_on_stop {
96 cancel_timeout_task_with_handle(
97 &key,
98 &self.timeout_tasks,
99 &self.timeout_handles,
100 );
101 match aggregate(bucket.exchanges, &self.config.strategy) {
102 Ok(mut result) => {
103 result.set_property(
104 CAMEL_AGGREGATED_COMPLETION_REASON,
105 serde_json::json!(CompletionReason::Stop.as_str()),
106 );
107 if self.late_tx.try_send(result).is_err() {
108 tracing::warn!(
109 key = %key,
110 "aggregator force-complete emit dropped: late channel full"
111 );
112 }
113 }
114 Err(e) => {
115 tracing::error!(
116 key = %key,
117 error = %e,
118 "aggregation failed in force_complete_all"
119 );
120 }
121 }
122 } else {
123 cancel_timeout_task_with_handle(
124 &key,
125 &self.timeout_tasks,
126 &self.timeout_handles,
127 );
128 }
129 }
130 }
131 }
132
133 pub async fn shutdown(&self) {
136 {
138 let mut guard = self.timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
139 for token in guard.values() {
140 token.cancel();
141 }
142 guard.clear();
143 };
144
145 let handles: Vec<JoinHandle<()>> = {
147 let mut guard = self
148 .timeout_handles
149 .lock()
150 .unwrap_or_else(|e| e.into_inner());
151 guard.drain().map(|(_, handle)| handle).collect()
152 };
153
154 if handles.is_empty() {
155 return;
156 }
157
158 let _ = tokio::time::timeout(Duration::from_secs(5), async {
160 for handle in handles {
161 let _ = handle.await;
162 }
163 })
164 .await;
165 }
166}
167
168pub fn has_timeout_condition(mode: &CompletionMode) -> bool {
169 match mode {
170 CompletionMode::Single(CompletionCondition::Timeout(_)) => true,
171 CompletionMode::Any(conditions) => conditions
172 .iter()
173 .any(|c| matches!(c, CompletionCondition::Timeout(_))),
174 _ => false,
175 }
176}
177
178impl Service<Exchange> for AggregatorService {
179 type Response = Exchange;
180 type Error = CamelError;
181 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
182
183 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
184 Poll::Ready(Ok(()))
185 }
186
187 fn call(&mut self, exchange: Exchange) -> Self::Future {
188 let config = self.config.clone();
189 let buckets = Arc::clone(&self.buckets);
190 let timeout_tasks = Arc::clone(&self.timeout_tasks);
191 let timeout_handles = Arc::clone(&self.timeout_handles);
192 let late_tx = self.late_tx.clone();
193 let language_registry = Arc::clone(&self.language_registry);
194 let route_cancel = self.route_cancel.clone();
195
196 Box::pin(async move {
197 let key_value =
198 extract_correlation_key(&exchange, &config.correlation, &language_registry).await?;
199
200 let key_str = serde_json::to_string(&key_value)
201 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
202
203 let completed_bucket = {
204 let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
205
206 if let Some(ttl) = config.bucket_ttl {
207 guard.retain(|_, bucket| !bucket.is_expired(ttl));
208 }
209
210 if let Some(max) = config.max_buckets
211 && !guard.contains_key(&key_str)
212 && guard.len() >= max
213 {
214 tracing::warn!(
215 max_buckets = max,
216 correlation_key = %key_str,
217 "Aggregator reached max buckets limit, rejecting new correlation key"
218 );
219 return Err(CamelError::ProcessorError(format!(
220 "Aggregator reached maximum {} buckets",
221 max
222 )));
223 }
224
225 let bucket = guard.entry(key_str.clone()).or_insert_with(Bucket::new);
226 bucket.push(exchange);
227
228 let (is_complete, reason) =
229 check_sync_completion(&config.completion, &bucket.exchanges);
230
231 if is_complete {
232 let exchanges = guard.remove(&key_str).map(|b| b.exchanges);
233 (exchanges, reason)
234 } else {
235 (None, CompletionReason::Size) }
237 };
238
239 if completed_bucket.0.is_none() && has_timeout_condition(&config.completion) {
240 let cancel = {
241 let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
242 if let Some(existing) = tt_guard.get(&key_str) {
244 existing.cancel();
245 }
246 let token = CancellationToken::new();
247 tt_guard.insert(key_str.clone(), token.clone());
248 token
249 };
250
251 let timeout_dur = extract_timeout_duration(&config.completion);
252 if let Some(timeout) = timeout_dur {
253 {
255 let mut hh = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
256 if let Some(old) = hh.remove(&key_str) {
257 old.abort();
258 }
259 }
260 let handle = spawn_timeout_task(
261 key_str.clone(),
262 timeout,
263 cancel,
264 buckets.clone(),
265 timeout_tasks.clone(),
266 timeout_handles.clone(),
267 late_tx,
268 config.strategy.clone(),
269 config.discard_on_timeout,
270 route_cancel,
271 );
272 timeout_handles
273 .lock()
274 .unwrap_or_else(|e| e.into_inner())
275 .insert(key_str.clone(), handle);
276 }
277 }
278
279 if let Some(exchanges) = completed_bucket.0 {
280 cancel_timeout_task_with_handle(&key_str, &timeout_tasks, &timeout_handles);
281 let reason = completed_bucket.1;
282 let size = exchanges.len();
283 let mut result = aggregate(exchanges, &config.strategy)?;
284 result.set_property(CAMEL_AGGREGATED_SIZE, serde_json::json!(size as u64));
285 result.set_property(CAMEL_AGGREGATED_KEY, key_value);
286 result.set_property(
287 CAMEL_AGGREGATED_COMPLETION_REASON,
288 serde_json::json!(reason.as_str()),
289 );
290 Ok(result)
291 } else {
292 let mut pending = Exchange::new(Message {
293 headers: Default::default(),
294 body: Body::Empty,
295 });
296 pending.set_property(CAMEL_AGGREGATOR_PENDING, serde_json::json!(true));
297 Ok(pending)
298 }
299 })
300 }
301}
302
303async fn extract_correlation_key(
304 exchange: &Exchange,
305 strategy: &CorrelationStrategy,
306 registry: &SharedLanguageRegistry,
307) -> Result<serde_json::Value, CamelError> {
308 match strategy {
309 CorrelationStrategy::HeaderName(h) => {
310 exchange.input.headers.get(h).cloned().ok_or_else(|| {
311 CamelError::ProcessorError(format!(
312 "Aggregator: missing correlation key header '{}'",
313 h
314 ))
315 })
316 }
317 CorrelationStrategy::Expression { expr, language } => {
318 let expression = {
319 let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
320 let lang = reg.get(language).ok_or_else(|| {
321 CamelError::ProcessorError(format!(
322 "Aggregator: language '{}' not found in registry",
323 language
324 ))
325 })?;
326 lang.create_expression(expr)
327 .map_err(|e| CamelError::ProcessorError(e.to_string()))?
328 };
329 let value = expression
330 .evaluate(exchange)
331 .await
332 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
333 if value.is_null() {
334 return Err(CamelError::ProcessorError(format!(
335 "Aggregator: correlation expression '{}' evaluated to null",
336 expr
337 )));
338 }
339 Ok(value)
340 }
341 CorrelationStrategy::Fn(f) => f(exchange).map(serde_json::Value::String).ok_or_else(|| {
342 CamelError::ProcessorError("Aggregator: correlation function returned None".to_string())
343 }),
344 }
345}
346
347fn check_sync_completion(
348 mode: &CompletionMode,
349 exchanges: &[Exchange],
350) -> (bool, CompletionReason) {
351 match mode {
352 CompletionMode::Single(cond) => check_single(cond, exchanges),
353 CompletionMode::Any(conditions) => {
354 for cond in conditions {
355 if let CompletionCondition::Timeout(_) = cond {
356 continue;
357 }
358 let (done, reason) = check_single(cond, exchanges);
359 if done {
360 return (true, reason);
361 }
362 }
363 (false, CompletionReason::Size)
364 }
365 }
366}
367
368fn check_single(cond: &CompletionCondition, exchanges: &[Exchange]) -> (bool, CompletionReason) {
369 match cond {
370 CompletionCondition::Size(n) => (exchanges.len() >= *n, CompletionReason::Size),
371 CompletionCondition::Predicate(pred) => (pred(exchanges), CompletionReason::Predicate),
372 CompletionCondition::Timeout(_) => (false, CompletionReason::Timeout),
373 }
374}
375
376fn extract_timeout_duration(mode: &CompletionMode) -> Option<Duration> {
377 match mode {
378 CompletionMode::Single(CompletionCondition::Timeout(d)) => Some(*d),
379 CompletionMode::Any(conditions) => conditions.iter().find_map(|c| {
380 if let CompletionCondition::Timeout(d) = c {
381 Some(*d)
382 } else {
383 None
384 }
385 }),
386 _ => None,
387 }
388}
389
390fn cancel_timeout_task(key: &str, timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>) {
391 let mut guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
392 if let Some(token) = guard.remove(key) {
393 token.cancel();
394 }
395}
396
397fn cancel_timeout_task_with_handle(
399 key: &str,
400 timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>,
401 timeout_handles: &Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
402) {
403 cancel_timeout_task(key, timeout_tasks);
404 let mut guard = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
405 guard.remove(key);
406}
407
408#[allow(clippy::too_many_arguments)]
409fn spawn_timeout_task(
410 key: String,
411 timeout: Duration,
412 cancel: CancellationToken,
413 buckets: Arc<Mutex<HashMap<String, Bucket>>>,
414 timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
415 _timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
416 late_tx: mpsc::Sender<Exchange>,
417 strategy: AggregationStrategy,
418 discard: bool,
419 _route_cancel: CancellationToken,
420) -> JoinHandle<()> {
421 let cancel_clone = cancel.clone();
422 tokio::spawn(async move {
423 tokio::select! {
424 _ = tokio::time::sleep(timeout) => {
425 let should_proceed = {
426 let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
427 if cancel_clone.is_cancelled() {
428 false
429 } else {
430 tt_guard.remove(&key);
431 true
432 }
433 };
434 if !should_proceed {
435 return;
436 }
437 let bucket_exchanges = {
438 let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
439 guard.remove(&key).map(|b| b.exchanges)
440 };
441 if let Some(exchanges) = bucket_exchanges
442 && !discard
443 {
444 match aggregate(exchanges, &strategy) {
445 Ok(mut result) => {
446 result.set_property(
447 CAMEL_AGGREGATED_COMPLETION_REASON,
448 serde_json::json!(CompletionReason::Timeout.as_str()),
449 );
450 if late_tx.try_send(result).is_err() {
451 tracing::warn!(
452 key = %key,
453 "aggregator timeout emit dropped: late channel full"
454 );
455 }
456 }
457 Err(e) => {
458 tracing::error!(
459 key = %key,
460 error = %e,
461 "aggregation failed in timeout task"
462 );
463 }
464 }
465 }
466 }
467 _ = cancel_clone.cancelled() => {}
468 }
469 })
470}
471
472fn aggregate(
473 exchanges: Vec<Exchange>,
474 strategy: &AggregationStrategy,
475) -> Result<Exchange, CamelError> {
476 match strategy {
477 AggregationStrategy::CollectAll => {
478 let bodies: Vec<serde_json::Value> = exchanges
479 .into_iter()
480 .map(|e| match e.input.body {
481 Body::Json(v) => v,
482 Body::Text(s) => serde_json::Value::String(s),
483 Body::Xml(s) => serde_json::Value::String(s),
484 Body::Bytes(b) => {
485 serde_json::Value::String(String::from_utf8_lossy(&b).into_owned())
486 }
487 Body::Empty => serde_json::Value::Null,
488 Body::Stream(s) => serde_json::json!({
489 "_stream": {
490 "origin": s.metadata.origin,
491 "placeholder": true,
492 "hint": "Materialize exchange body with .into_bytes() before aggregation if content needed"
493 }
494 }),
495 })
496 .collect();
497 Ok(Exchange::new(Message {
498 headers: Default::default(),
499 body: Body::Json(serde_json::Value::Array(bodies)),
500 }))
501 }
502 AggregationStrategy::Custom(f) => {
503 let mut iter = exchanges.into_iter();
504 let first = iter.next().ok_or_else(|| {
505 CamelError::ProcessorError("Aggregator: empty bucket".to_string())
506 })?;
507 Ok(iter.fold(first, |acc, next| f(acc, next)))
508 }
509 }
510}
511
512#[cfg(test)]
513mod tests {
514 use super::*;
515 use std::collections::HashMap;
516
517 use camel_api::{
518 aggregator::{AggregationStrategy, AggregatorConfig},
519 body::Body,
520 exchange::Exchange,
521 message::Message,
522 };
523 use tokio::sync::mpsc;
524 use tokio_util::sync::CancellationToken;
525 use tower::ServiceExt;
526
527 fn make_exchange(header: &str, value: &str, body: &str) -> Exchange {
528 let mut msg = Message {
529 headers: Default::default(),
530 body: Body::Text(body.to_string()),
531 };
532 msg.headers
533 .insert(header.to_string(), serde_json::json!(value));
534 Exchange::new(msg)
535 }
536
537 fn config_size(n: usize) -> AggregatorConfig {
538 AggregatorConfig::correlate_by("orderId")
539 .complete_when_size(n)
540 .build()
541 .unwrap()
542 }
543
544 fn new_test_svc(config: AggregatorConfig) -> AggregatorService {
545 let (tx, _rx) = mpsc::channel(256);
546 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
547 let cancel = CancellationToken::new();
548 AggregatorService::new(config, tx, registry, cancel)
549 }
550
551 #[tokio::test]
552 async fn test_pending_exchange_not_yet_complete() {
553 let mut svc = new_test_svc(config_size(3));
554 let ex = make_exchange("orderId", "A", "first");
555 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
556 assert!(matches!(result.input.body, Body::Empty));
557 assert_eq!(
558 result.property(CAMEL_AGGREGATOR_PENDING),
559 Some(&serde_json::json!(true))
560 );
561 }
562
563 #[tokio::test]
564 async fn test_completes_on_size() {
565 let mut svc = new_test_svc(config_size(3));
566 for _ in 0..2 {
567 let ex = make_exchange("orderId", "A", "item");
568 let r = svc.ready().await.unwrap().call(ex).await.unwrap();
569 assert!(matches!(r.input.body, Body::Empty));
570 }
571 let ex = make_exchange("orderId", "A", "last");
572 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
573 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
574 assert_eq!(
575 result.property(CAMEL_AGGREGATED_SIZE),
576 Some(&serde_json::json!(3u64))
577 );
578 }
579
580 #[tokio::test]
581 async fn test_collect_all_produces_json_array() {
582 let mut svc = new_test_svc(config_size(2));
583 svc.ready()
584 .await
585 .unwrap()
586 .call(make_exchange("orderId", "A", "alpha"))
587 .await
588 .unwrap();
589 let result = svc
590 .ready()
591 .await
592 .unwrap()
593 .call(make_exchange("orderId", "A", "beta"))
594 .await
595 .unwrap();
596 let Body::Json(v) = &result.input.body else {
597 panic!("expected Body::Json")
598 };
599 let arr = v.as_array().unwrap();
600 assert_eq!(arr.len(), 2);
601 assert_eq!(arr[0], serde_json::json!("alpha"));
602 assert_eq!(arr[1], serde_json::json!("beta"));
603 }
604
605 #[tokio::test]
606 async fn test_two_keys_independent_buckets() {
607 let mut svc = new_test_svc(config_size(3));
609 svc.ready()
610 .await
611 .unwrap()
612 .call(make_exchange("orderId", "A", "a1"))
613 .await
614 .unwrap();
615 svc.ready()
616 .await
617 .unwrap()
618 .call(make_exchange("orderId", "B", "b1"))
619 .await
620 .unwrap();
621 svc.ready()
622 .await
623 .unwrap()
624 .call(make_exchange("orderId", "A", "a2"))
625 .await
626 .unwrap();
627 let ra = svc
629 .ready()
630 .await
631 .unwrap()
632 .call(make_exchange("orderId", "A", "a3"))
633 .await
634 .unwrap();
635 assert!(matches!(ra.input.body, Body::Json(_)));
637 let rb = svc
639 .ready()
640 .await
641 .unwrap()
642 .call(make_exchange("orderId", "B", "b_check"))
643 .await
644 .unwrap();
645 assert!(matches!(rb.input.body, Body::Empty));
646 }
647
648 #[tokio::test]
649 async fn test_bucket_resets_after_completion() {
650 let mut svc = new_test_svc(config_size(2));
651 svc.ready()
652 .await
653 .unwrap()
654 .call(make_exchange("orderId", "A", "x"))
655 .await
656 .unwrap();
657 svc.ready()
658 .await
659 .unwrap()
660 .call(make_exchange("orderId", "A", "x"))
661 .await
662 .unwrap(); let r = svc
665 .ready()
666 .await
667 .unwrap()
668 .call(make_exchange("orderId", "A", "new"))
669 .await
670 .unwrap();
671 assert!(matches!(r.input.body, Body::Empty)); }
673
674 #[tokio::test]
675 async fn test_completion_size_1_emits_immediately() {
676 let mut svc = new_test_svc(config_size(1));
677 let ex = make_exchange("orderId", "A", "solo");
678 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
679 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
680 }
681
682 #[tokio::test]
683 async fn test_custom_aggregation_strategy() {
684 use camel_api::aggregator::AggregationFn;
685 use std::sync::Arc;
686
687 let f: AggregationFn = Arc::new(|mut acc: Exchange, next: Exchange| {
688 let combined = format!(
689 "{}+{}",
690 acc.input.body.as_text().unwrap_or(""),
691 next.input.body.as_text().unwrap_or("")
692 );
693 acc.input.body = Body::Text(combined);
694 acc
695 });
696 let config = AggregatorConfig::correlate_by("key")
697 .complete_when_size(2)
698 .strategy(AggregationStrategy::Custom(f))
699 .build()
700 .unwrap();
701 let mut svc = new_test_svc(config);
702 svc.ready()
703 .await
704 .unwrap()
705 .call(make_exchange("key", "X", "hello"))
706 .await
707 .unwrap();
708 let result = svc
709 .ready()
710 .await
711 .unwrap()
712 .call(make_exchange("key", "X", "world"))
713 .await
714 .unwrap();
715 assert_eq!(result.input.body.as_text(), Some("hello+world"));
716 }
717
718 #[tokio::test]
719 async fn test_completion_predicate() {
720 let config = AggregatorConfig::correlate_by("key")
721 .complete_when(|bucket| {
722 bucket
723 .iter()
724 .any(|e| e.input.body.as_text() == Some("DONE"))
725 })
726 .build()
727 .unwrap();
728 let mut svc = new_test_svc(config);
729 svc.ready()
730 .await
731 .unwrap()
732 .call(make_exchange("key", "K", "first"))
733 .await
734 .unwrap();
735 svc.ready()
736 .await
737 .unwrap()
738 .call(make_exchange("key", "K", "second"))
739 .await
740 .unwrap();
741 let result = svc
742 .ready()
743 .await
744 .unwrap()
745 .call(make_exchange("key", "K", "DONE"))
746 .await
747 .unwrap();
748 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
749 }
750
751 #[tokio::test]
752 async fn test_missing_header_returns_error() {
753 let mut svc = new_test_svc(config_size(2));
754 let msg = Message {
755 headers: Default::default(),
756 body: Body::Text("no key".into()),
757 };
758 let ex = Exchange::new(msg);
759 let result = svc.ready().await.unwrap().call(ex).await;
760 assert!(result.is_err());
761 assert!(matches!(
762 result.unwrap_err(),
763 camel_api::CamelError::ProcessorError(_)
764 ));
765 }
766
767 #[tokio::test]
768 async fn test_cloned_service_shares_state() {
769 let svc1 = new_test_svc(config_size(2));
770 let mut svc2 = svc1.clone();
771 svc1.clone()
773 .ready()
774 .await
775 .unwrap()
776 .call(make_exchange("orderId", "A", "from-svc1"))
777 .await
778 .unwrap();
779 let result = svc2
781 .ready()
782 .await
783 .unwrap()
784 .call(make_exchange("orderId", "A", "from-svc2"))
785 .await
786 .unwrap();
787 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
788 }
789
790 #[tokio::test]
791 async fn test_camel_aggregated_key_property_set() {
792 let mut svc = new_test_svc(config_size(1));
793 let ex = make_exchange("orderId", "ORDER-42", "body");
794 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
795 assert_eq!(
796 result.property(CAMEL_AGGREGATED_KEY),
797 Some(&serde_json::json!("ORDER-42"))
798 );
799 }
800
801 #[tokio::test]
802 async fn test_aggregator_enforces_max_buckets() {
803 let config = AggregatorConfig::correlate_by("orderId")
804 .complete_when_size(2)
805 .max_buckets(3)
806 .build()
807 .unwrap();
808
809 let mut svc = new_test_svc(config);
810
811 for i in 0..3 {
813 let ex = make_exchange("orderId", &format!("key-{}", i), "body");
814 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
815 }
816
817 let ex = make_exchange("orderId", "key-4", "body");
819 let result = svc.ready().await.unwrap().call(ex).await;
820
821 assert!(result.is_err(), "Should reject when max buckets reached");
822 let err = result.unwrap_err().to_string();
823 assert!(
824 err.contains("maximum"),
825 "Error message should contain 'maximum': {}",
826 err
827 );
828 }
829
830 #[tokio::test]
831 async fn test_max_buckets_allows_existing_key() {
832 let config = AggregatorConfig::correlate_by("orderId")
833 .complete_when_size(5) .max_buckets(2)
835 .build()
836 .unwrap();
837
838 let mut svc = new_test_svc(config);
839
840 let ex1 = make_exchange("orderId", "key-A", "body1");
842 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
843 let ex2 = make_exchange("orderId", "key-B", "body2");
844 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
845
846 let ex3 = make_exchange("orderId", "key-A", "body3");
848 let result = svc.ready().await.unwrap().call(ex3).await;
849 assert!(
850 result.is_ok(),
851 "Should allow adding to existing bucket even at max limit"
852 );
853 }
854
855 #[tokio::test]
856 async fn test_bucket_ttl_eviction() {
857 let config = AggregatorConfig::correlate_by("orderId")
858 .complete_when_size(10) .bucket_ttl(Duration::from_millis(50))
860 .build()
861 .unwrap();
862
863 let mut svc = new_test_svc(config);
864
865 let ex1 = make_exchange("orderId", "key-A", "body1");
867 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
868
869 tokio::time::sleep(Duration::from_millis(100)).await;
871
872 let ex2 = make_exchange("orderId", "key-B", "body2");
874 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
875
876 let ex3 = make_exchange("orderId", "key-A", "body3");
879 let result = svc.ready().await.unwrap().call(ex3).await;
880 assert!(result.is_ok(), "Should be able to recreate evicted bucket");
881 }
882
883 #[tokio::test(start_paused = true)]
884 async fn test_timeout_completes_bucket() {
885 let config = AggregatorConfig::correlate_by("key")
886 .complete_on_timeout(Duration::from_millis(100))
887 .build()
888 .unwrap();
889 let mut svc = new_test_svc(config);
890 let ex = make_exchange("key", "A", "data");
891 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
892 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_some());
893
894 tokio::time::sleep(Duration::from_millis(200)).await;
895
896 assert_eq!(
897 svc.buckets.lock().unwrap().len(),
898 0,
899 "bucket should be removed after timeout"
900 );
901 }
902
903 #[tokio::test(start_paused = true)]
904 async fn test_timeout_resets_on_new_exchange() {
905 let config = AggregatorConfig::correlate_by("key")
906 .complete_on_timeout(Duration::from_millis(150))
907 .build()
908 .unwrap();
909 let mut svc = new_test_svc(config);
910
911 let ex1 = make_exchange("key", "A", "first");
912 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
913
914 tokio::time::sleep(Duration::from_millis(100)).await;
915
916 let ex2 = make_exchange("key", "A", "second");
917 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
918
919 tokio::time::sleep(Duration::from_millis(100)).await;
920
921 assert_eq!(
922 svc.buckets.lock().unwrap().len(),
923 1,
924 "bucket should still exist — timeout was reset"
925 );
926
927 tokio::time::sleep(Duration::from_millis(100)).await;
928
929 assert_eq!(
930 svc.buckets.lock().unwrap().len(),
931 0,
932 "bucket should be gone after timeout fires"
933 );
934 }
935
936 #[tokio::test]
937 async fn test_composable_size_and_timeout() {
938 let config = AggregatorConfig::correlate_by("key")
939 .complete_on_size_or_timeout(2, Duration::from_millis(200))
940 .build()
941 .unwrap();
942 let mut svc = new_test_svc(config);
943
944 let ex1 = make_exchange("key", "A", "first");
945 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
946 assert!(svc.buckets.lock().unwrap().contains_key("\"A\""));
947
948 let ex2 = make_exchange("key", "A", "second");
949 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
950 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
951 assert_eq!(
952 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
953 Some(&serde_json::json!("size"))
954 );
955 }
956
957 #[tokio::test(start_paused = true)]
958 async fn test_discard_on_timeout() {
959 let config = AggregatorConfig::correlate_by("key")
960 .complete_on_timeout(Duration::from_millis(50))
961 .discard_on_timeout(true)
962 .build()
963 .unwrap();
964 let (tx, mut rx) = mpsc::channel(256);
965 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
966 let cancel = CancellationToken::new();
967 let mut svc = AggregatorService::new(config, tx, registry, cancel);
968
969 let ex = make_exchange("key", "A", "data");
970 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
971
972 tokio::time::sleep(Duration::from_millis(100)).await;
973
974 assert!(
975 rx.try_recv().is_err(),
976 "no emit expected with discard_on_timeout"
977 );
978 assert_eq!(svc.buckets.lock().unwrap().len(), 0);
979 assert!(
980 svc.timeout_tasks.lock().unwrap().is_empty(),
981 "timeout task should be cleaned up"
982 );
983 }
984
985 #[tokio::test]
986 async fn test_force_completion_on_stop() {
987 let config = AggregatorConfig::correlate_by("key")
988 .complete_when_size(10)
989 .force_completion_on_stop(true)
990 .build()
991 .unwrap();
992 let (tx, mut rx) = mpsc::channel(256);
993 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
994 let cancel = CancellationToken::new();
995 let svc = AggregatorService::new(config, tx, registry, cancel);
996
997 let mut call_svc = svc.clone();
998 let ex = make_exchange("key", "A", "data");
999 let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1000
1001 svc.force_complete_all();
1002
1003 let result = rx.try_recv().expect("should emit on force-complete");
1004 assert!(
1005 result.input.body.as_text().is_some() || matches!(result.input.body, Body::Json(_))
1006 );
1007 assert_eq!(
1008 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1009 Some(&serde_json::json!("stop"))
1010 );
1011 }
1012
1013 #[tokio::test]
1014 async fn test_completion_reason_property_size() {
1015 let config = AggregatorConfig::correlate_by("key")
1016 .complete_when_size(1)
1017 .build()
1018 .unwrap();
1019 let mut svc = new_test_svc(config);
1020 let ex = make_exchange("key", "X", "body");
1021 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1022 assert_eq!(
1023 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1024 Some(&serde_json::json!("size"))
1025 );
1026 }
1027
1028 #[tokio::test]
1029 async fn test_completion_reason_property_predicate() {
1030 let config = AggregatorConfig::correlate_by("key")
1031 .complete_when(|_| true)
1032 .build()
1033 .unwrap();
1034 let mut svc = new_test_svc(config);
1035 let ex = make_exchange("key", "X", "body");
1036 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1037 assert_eq!(
1038 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1039 Some(&serde_json::json!("predicate"))
1040 );
1041 }
1042
1043 #[tokio::test(start_paused = true)]
1044 async fn test_size_completes_before_timeout() {
1045 let config = AggregatorConfig::correlate_by("key")
1046 .complete_on_size_or_timeout(2, Duration::from_millis(200))
1047 .build()
1048 .unwrap();
1049 let mut svc = new_test_svc(config);
1050
1051 let ex1 = make_exchange("key", "A", "first");
1052 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
1053
1054 let ex2 = make_exchange("key", "A", "second");
1055 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1056
1057 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
1058 assert_eq!(
1059 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1060 Some(&serde_json::json!("size"))
1061 );
1062 assert_eq!(svc.buckets.lock().unwrap().len(), 0);
1063
1064 tokio::time::sleep(Duration::from_millis(300)).await;
1065 assert_eq!(
1066 svc.buckets.lock().unwrap().len(),
1067 0,
1068 "no re-fire after timeout"
1069 );
1070 }
1071
1072 #[tokio::test(start_paused = true)]
1073 async fn test_concurrent_timeout_fire_and_new_exchange() {
1074 let config = AggregatorConfig::correlate_by("key")
1075 .complete_on_size_or_timeout(2, Duration::from_millis(100))
1076 .build()
1077 .unwrap();
1078 let (tx, mut rx) = mpsc::channel(256);
1079 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1080 let cancel = CancellationToken::new();
1081 let mut svc = AggregatorService::new(config, tx, registry, cancel);
1082
1083 let ex = make_exchange("key", "A", "data");
1084 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1085
1086 tokio::time::sleep(Duration::from_millis(150)).await;
1088
1089 let ex2 = make_exchange("key", "A", "data2");
1091 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1092 assert!(
1093 result.property(CAMEL_AGGREGATOR_PENDING).is_some(),
1094 "should be pending in new bucket"
1095 );
1096
1097 let mut late_count = 0;
1099 while rx.try_recv().is_ok() {
1100 late_count += 1;
1101 }
1102 assert_eq!(
1103 late_count, 1,
1104 "exactly 1 late emit from the timed-out bucket"
1105 );
1106 }
1107
1108 #[tokio::test(start_paused = true)]
1109 async fn test_late_channel_full_drops_with_warning() {
1110 let config = AggregatorConfig::correlate_by("key")
1111 .complete_on_timeout(Duration::from_millis(50))
1112 .build()
1113 .unwrap();
1114 let (tx, mut rx) = mpsc::channel(1);
1115 rx.close();
1116 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1117 let cancel = CancellationToken::new();
1118 let mut svc = AggregatorService::new(config, tx, registry, cancel);
1119
1120 let ex = make_exchange("key", "A", "data");
1121 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1122
1123 tokio::time::sleep(Duration::from_millis(100)).await;
1124 assert_eq!(
1125 svc.buckets.lock().unwrap().len(),
1126 0,
1127 "bucket removed despite channel closed"
1128 );
1129 }
1130
1131 #[tokio::test]
1132 async fn test_aggregate_stream_bodies_creates_valid_json() {
1133 use bytes::Bytes;
1134 use camel_api::{Body, StreamBody, StreamMetadata};
1135 use futures::stream;
1136 use tokio::sync::Mutex;
1137
1138 let chunks = vec![Ok(Bytes::from("test"))];
1139 let stream_body = StreamBody {
1140 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1141 metadata: StreamMetadata {
1142 origin: Some("file:///test.txt".to_string()),
1143 ..Default::default()
1144 },
1145 };
1146
1147 let ex1 = Exchange::new(Message {
1148 headers: Default::default(),
1149 body: Body::Stream(stream_body),
1150 });
1151
1152 let exchanges = vec![ex1];
1153 let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1154
1155 let exchange = result.expect("Expected Ok result");
1156 assert!(
1157 matches!(exchange.input.body, Body::Json(_)),
1158 "Expected Json body"
1159 );
1160
1161 if let Body::Json(value) = exchange.input.body {
1162 let json_str = serde_json::to_string(&value).unwrap();
1163 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1164
1165 assert!(parsed.is_array(), "Result should be an array");
1166 let arr = parsed.as_array().unwrap();
1167 assert!(arr[0].is_object(), "First element should be an object");
1168 assert!(
1169 arr[0]["_stream"].is_object(),
1170 "Should contain _stream object"
1171 );
1172 assert_eq!(arr[0]["_stream"]["origin"], "file:///test.txt");
1173 assert_eq!(
1174 arr[0]["_stream"]["placeholder"], true,
1175 "placeholder flag should be true"
1176 );
1177 }
1178 }
1179
1180 #[tokio::test]
1181 async fn test_aggregate_stream_bodies_with_none_origin() {
1182 use bytes::Bytes;
1183 use camel_api::{Body, StreamBody, StreamMetadata};
1184 use futures::stream;
1185 use tokio::sync::Mutex;
1186
1187 let chunks = vec![Ok(Bytes::from("test"))];
1188 let stream_body = StreamBody {
1189 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1190 metadata: StreamMetadata {
1191 origin: None,
1192 ..Default::default()
1193 },
1194 };
1195
1196 let ex1 = Exchange::new(Message {
1197 headers: Default::default(),
1198 body: Body::Stream(stream_body),
1199 });
1200
1201 let exchanges = vec![ex1];
1202 let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1203
1204 let exchange = result.expect("Expected Ok result");
1205 assert!(
1206 matches!(exchange.input.body, Body::Json(_)),
1207 "Expected Json body"
1208 );
1209
1210 if let Body::Json(value) = exchange.input.body {
1211 let json_str = serde_json::to_string(&value).unwrap();
1212 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1213
1214 assert!(parsed.is_array(), "Result should be an array");
1215 let arr = parsed.as_array().unwrap();
1216 assert!(arr[0].is_object(), "First element should be an object");
1217 assert!(
1218 arr[0]["_stream"].is_object(),
1219 "Should contain _stream object"
1220 );
1221 assert_eq!(
1222 arr[0]["_stream"]["origin"],
1223 serde_json::Value::Null,
1224 "origin should be null when None"
1225 );
1226 assert_eq!(
1227 arr[0]["_stream"]["placeholder"], true,
1228 "placeholder flag should be true"
1229 );
1230 }
1231 }
1232
1233 #[tokio::test(start_paused = true)]
1234 async fn test_shutdown_awaits_timeout_handles() {
1235 let config = AggregatorConfig::correlate_by("key")
1236 .complete_on_timeout(Duration::from_millis(100))
1237 .build()
1238 .unwrap();
1239 let (tx, _rx) = mpsc::channel(256);
1240 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1241 let cancel = CancellationToken::new();
1242 let svc = AggregatorService::new(config, tx, registry, cancel);
1243
1244 let mut call_svc = svc.clone();
1246 let ex = make_exchange("key", "A", "data");
1247 let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1248
1249 assert!(
1251 !svc.timeout_handles.lock().unwrap().is_empty(),
1252 "should have a timeout handle"
1253 );
1254
1255 svc.shutdown().await;
1258
1259 assert!(
1260 svc.timeout_handles.lock().unwrap().is_empty(),
1261 "all handles should be cleaned up after shutdown"
1262 );
1263 }
1264}