1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6use std::time::{Duration, Instant};
7
8use tokio::sync::mpsc;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tower::Service;
12
13use camel_api::{
14 CamelError,
15 aggregator::{
16 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode,
17 CompletionReason, CorrelationStrategy,
18 },
19 body::Body,
20 exchange::Exchange,
21 message::Message,
22};
23use camel_language_api::Language;
24
25pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
26
27pub const CAMEL_AGGREGATOR_PENDING: &str = "CamelAggregatorPending";
28pub const CAMEL_AGGREGATED_SIZE: &str = "CamelAggregatedSize";
29pub const CAMEL_AGGREGATED_KEY: &str = "CamelAggregatedKey";
30pub const CAMEL_AGGREGATED_COMPLETION_REASON: &str = "CamelAggregatedCompletionReason";
31
32struct Bucket {
34 exchanges: Vec<Exchange>,
35 last_updated: Instant,
36}
37
38impl Bucket {
39 fn new() -> Self {
40 Self {
41 exchanges: Vec::new(),
42 last_updated: Instant::now(),
43 }
44 }
45
46 fn push(&mut self, exchange: Exchange) {
47 self.exchanges.push(exchange);
48 self.last_updated = Instant::now();
49 }
50
51 fn is_expired(&self, ttl: Duration) -> bool {
52 Instant::now().duration_since(self.last_updated) >= ttl
53 }
54}
55
56#[derive(Clone)]
57pub struct AggregatorService {
58 config: AggregatorConfig,
59 buckets: Arc<Mutex<HashMap<String, Bucket>>>,
60 timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
61 timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
62 late_tx: mpsc::Sender<Exchange>,
63 language_registry: SharedLanguageRegistry,
64 route_cancel: CancellationToken,
65}
66
67impl AggregatorService {
68 pub fn new(
69 config: AggregatorConfig,
70 late_tx: mpsc::Sender<Exchange>,
71 language_registry: SharedLanguageRegistry,
72 route_cancel: CancellationToken,
73 ) -> Self {
74 Self {
75 config,
76 buckets: Arc::new(Mutex::new(HashMap::new())),
77 timeout_tasks: Arc::new(Mutex::new(HashMap::new())),
78 timeout_handles: Arc::new(Mutex::new(HashMap::new())),
79 late_tx,
80 language_registry,
81 route_cancel,
82 }
83 }
84
85 pub fn config(&self) -> &AggregatorConfig {
86 &self.config
87 }
88
89 pub fn has_timeout(&self) -> bool {
90 has_timeout_condition(&self.config.completion)
91 }
92
93 pub fn force_complete_all(&self) {
94 let mut buckets_guard = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
95 let keys: Vec<String> = buckets_guard.keys().cloned().collect();
96
97 for key in keys {
98 if let Some(bucket) = buckets_guard.remove(&key) {
99 if self.config.force_completion_on_stop {
100 cancel_timeout_task_with_handle(
101 &key,
102 &self.timeout_tasks,
103 &self.timeout_handles,
104 );
105 match aggregate(bucket.exchanges, &self.config.strategy) {
106 Ok(mut result) => {
107 result.set_property(
108 CAMEL_AGGREGATED_COMPLETION_REASON,
109 serde_json::json!(CompletionReason::Stop.as_str()),
110 );
111 if self.late_tx.try_send(result).is_err() {
112 tracing::warn!(
113 key = %key,
114 "aggregator force-complete emit dropped: late channel full"
115 );
116 }
117 }
118 Err(e) => {
119 tracing::warn!(
121 key = %key,
122 error = %e,
123 "aggregation failed in force_complete_all"
124 );
125 }
126 }
127 } else {
128 cancel_timeout_task_with_handle(
129 &key,
130 &self.timeout_tasks,
131 &self.timeout_handles,
132 );
133 }
134 }
135 }
136 }
137
138 pub async fn shutdown(&self) {
141 {
143 let mut guard = self.timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
144 for token in guard.values() {
145 token.cancel();
146 }
147 guard.clear();
148 };
149
150 let handles: Vec<JoinHandle<()>> = {
152 let mut guard = self
153 .timeout_handles
154 .lock()
155 .unwrap_or_else(|e| e.into_inner());
156 guard.drain().map(|(_, handle)| handle).collect()
157 };
158
159 if handles.is_empty() {
160 return;
161 }
162
163 let _ = tokio::time::timeout(Duration::from_secs(5), async {
165 for handle in handles {
166 let _ = handle.await;
167 }
168 })
169 .await;
170 }
171}
172
173pub fn has_timeout_condition(mode: &CompletionMode) -> bool {
174 match mode {
175 CompletionMode::Single(CompletionCondition::Timeout(_)) => true,
176 CompletionMode::Any(conditions) => conditions
177 .iter()
178 .any(|c| matches!(c, CompletionCondition::Timeout(_))),
179 _ => false,
180 }
181}
182
183impl Service<Exchange> for AggregatorService {
184 type Response = Exchange;
185 type Error = CamelError;
186 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
187
188 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
189 Poll::Ready(Ok(()))
190 }
191
192 fn call(&mut self, exchange: Exchange) -> Self::Future {
193 let config = self.config.clone();
194 let buckets = Arc::clone(&self.buckets);
195 let timeout_tasks = Arc::clone(&self.timeout_tasks);
196 let timeout_handles = Arc::clone(&self.timeout_handles);
197 let late_tx = self.late_tx.clone();
198 let language_registry = Arc::clone(&self.language_registry);
199 let route_cancel = self.route_cancel.clone();
200
201 Box::pin(async move {
202 let key_value =
203 extract_correlation_key(&exchange, &config.correlation, &language_registry).await?;
204
205 let key_str = serde_json::to_string(&key_value)
206 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
207
208 let completed_bucket = {
209 let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
210
211 if let Some(ttl) = config.bucket_ttl {
212 guard.retain(|_, bucket| !bucket.is_expired(ttl));
213 }
214
215 if let Some(max) = config.max_buckets
216 && !guard.contains_key(&key_str)
217 && guard.len() >= max
218 {
219 tracing::warn!(
220 max_buckets = max,
221 correlation_key = %key_str,
222 "Aggregator reached max buckets limit, rejecting new correlation key"
223 );
224 return Err(CamelError::ProcessorError(format!(
225 "Aggregator reached maximum {} buckets",
226 max
227 )));
228 }
229
230 let bucket = guard.entry(key_str.clone()).or_insert_with(Bucket::new);
231 bucket.push(exchange);
232
233 let (is_complete, reason) =
234 check_sync_completion(&config.completion, &bucket.exchanges);
235
236 if is_complete {
237 let exchanges = guard.remove(&key_str).map(|b| b.exchanges);
238 (exchanges, reason)
239 } else {
240 (None, CompletionReason::Size) }
242 };
243
244 if completed_bucket.0.is_none() && has_timeout_condition(&config.completion) {
245 let cancel = {
246 let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
247 if let Some(existing) = tt_guard.get(&key_str) {
249 existing.cancel();
250 }
251 let token = CancellationToken::new();
252 tt_guard.insert(key_str.clone(), token.clone());
253 token
254 };
255
256 let timeout_dur = extract_timeout_duration(&config.completion);
257 if let Some(timeout) = timeout_dur {
258 {
260 let mut hh = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
261 if let Some(old) = hh.remove(&key_str) {
262 old.abort();
263 }
264 }
265 let handle = spawn_timeout_task(
266 key_str.clone(),
267 timeout,
268 cancel,
269 buckets.clone(),
270 timeout_tasks.clone(),
271 timeout_handles.clone(),
272 late_tx,
273 config.strategy.clone(),
274 config.discard_on_timeout,
275 route_cancel,
276 );
277 timeout_handles
278 .lock()
279 .unwrap_or_else(|e| e.into_inner())
280 .insert(key_str.clone(), handle);
281 }
282 }
283
284 if let Some(exchanges) = completed_bucket.0 {
285 cancel_timeout_task_with_handle(&key_str, &timeout_tasks, &timeout_handles);
286 let reason = completed_bucket.1;
287 let size = exchanges.len();
288 let mut result = aggregate(exchanges, &config.strategy)?;
289 result.set_property(CAMEL_AGGREGATED_SIZE, serde_json::json!(size as u64));
290 result.set_property(CAMEL_AGGREGATED_KEY, key_value);
291 result.set_property(
292 CAMEL_AGGREGATED_COMPLETION_REASON,
293 serde_json::json!(reason.as_str()),
294 );
295 Ok(result)
296 } else {
297 let mut pending = Exchange::new(Message {
298 headers: Default::default(),
299 body: Body::Empty,
300 });
301 pending.set_property(CAMEL_AGGREGATOR_PENDING, serde_json::json!(true));
302 Ok(pending)
303 }
304 })
305 }
306}
307
308async fn extract_correlation_key(
309 exchange: &Exchange,
310 strategy: &CorrelationStrategy,
311 registry: &SharedLanguageRegistry,
312) -> Result<serde_json::Value, CamelError> {
313 match strategy {
314 CorrelationStrategy::HeaderName(h) => {
315 exchange.input.headers.get(h).cloned().ok_or_else(|| {
316 CamelError::ProcessorError(format!(
317 "Aggregator: missing correlation key header '{}'",
318 h
319 ))
320 })
321 }
322 CorrelationStrategy::Expression { expr, language } => {
323 let expression = {
324 let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
325 let lang = reg.get(language).ok_or_else(|| {
326 CamelError::ProcessorError(format!(
327 "Aggregator: language '{}' not found in registry",
328 language
329 ))
330 })?;
331 lang.create_expression(expr)
332 .map_err(|e| CamelError::ProcessorError(e.to_string()))?
333 };
334 let value = expression
335 .evaluate(exchange)
336 .await
337 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
338 if value.is_null() {
339 return Err(CamelError::ProcessorError(format!(
340 "Aggregator: correlation expression '{}' evaluated to null",
341 expr
342 )));
343 }
344 Ok(value)
345 }
346 CorrelationStrategy::Fn(f) => f(exchange).map(serde_json::Value::String).ok_or_else(|| {
347 CamelError::ProcessorError("Aggregator: correlation function returned None".to_string())
348 }),
349 }
350}
351
352fn check_sync_completion(
353 mode: &CompletionMode,
354 exchanges: &[Exchange],
355) -> (bool, CompletionReason) {
356 match mode {
357 CompletionMode::Single(cond) => check_single(cond, exchanges),
358 CompletionMode::Any(conditions) => {
359 for cond in conditions {
360 if let CompletionCondition::Timeout(_) = cond {
361 continue;
362 }
363 let (done, reason) = check_single(cond, exchanges);
364 if done {
365 return (true, reason);
366 }
367 }
368 (false, CompletionReason::Size)
369 }
370 }
371}
372
373fn check_single(cond: &CompletionCondition, exchanges: &[Exchange]) -> (bool, CompletionReason) {
374 match cond {
375 CompletionCondition::Size(n) => (exchanges.len() >= *n, CompletionReason::Size),
376 CompletionCondition::Predicate(pred) => (pred(exchanges), CompletionReason::Predicate),
377 CompletionCondition::Timeout(_) => (false, CompletionReason::Timeout),
378 }
379}
380
381fn extract_timeout_duration(mode: &CompletionMode) -> Option<Duration> {
382 match mode {
383 CompletionMode::Single(CompletionCondition::Timeout(d)) => Some(*d),
384 CompletionMode::Any(conditions) => conditions.iter().find_map(|c| {
385 if let CompletionCondition::Timeout(d) = c {
386 Some(*d)
387 } else {
388 None
389 }
390 }),
391 _ => None,
392 }
393}
394
395fn cancel_timeout_task(key: &str, timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>) {
396 let mut guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
397 if let Some(token) = guard.remove(key) {
398 token.cancel();
399 }
400}
401
402fn cancel_timeout_task_with_handle(
404 key: &str,
405 timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>,
406 timeout_handles: &Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
407) {
408 cancel_timeout_task(key, timeout_tasks);
409 let mut guard = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
410 guard.remove(key);
411}
412
413#[allow(clippy::too_many_arguments)]
414fn spawn_timeout_task(
415 key: String,
416 timeout: Duration,
417 cancel: CancellationToken,
418 buckets: Arc<Mutex<HashMap<String, Bucket>>>,
419 timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
420 _timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
421 late_tx: mpsc::Sender<Exchange>,
422 strategy: AggregationStrategy,
423 discard: bool,
424 _route_cancel: CancellationToken,
425) -> JoinHandle<()> {
426 let cancel_clone = cancel.clone();
427 tokio::spawn(async move {
428 tokio::select! {
429 _ = tokio::time::sleep(timeout) => {
430 let should_proceed = {
431 let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
432 if cancel_clone.is_cancelled() {
433 false
434 } else {
435 tt_guard.remove(&key);
436 true
437 }
438 };
439 if !should_proceed {
440 return;
441 }
442 let bucket_exchanges = {
443 let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
444 guard.remove(&key).map(|b| b.exchanges)
445 };
446 if let Some(exchanges) = bucket_exchanges
447 && !discard
448 {
449 match aggregate(exchanges, &strategy) {
450 Ok(mut result) => {
451 result.set_property(
452 CAMEL_AGGREGATED_COMPLETION_REASON,
453 serde_json::json!(CompletionReason::Timeout.as_str()),
454 );
455 if late_tx.try_send(result).is_err() {
456 tracing::warn!(
457 key = %key,
458 "aggregator timeout emit dropped: late channel full"
459 );
460 }
461 }
462 Err(e) => {
463 tracing::warn!(
465 key = %key,
466 error = %e,
467 "aggregation failed in timeout task"
468 );
469 }
470 }
471 }
472 }
473 _ = cancel_clone.cancelled() => {}
474 }
475 })
476}
477
478fn aggregate(
479 exchanges: Vec<Exchange>,
480 strategy: &AggregationStrategy,
481) -> Result<Exchange, CamelError> {
482 match strategy {
483 AggregationStrategy::CollectAll => {
484 let bodies: Vec<serde_json::Value> = exchanges
485 .into_iter()
486 .map(|e| match e.input.body {
487 Body::Json(v) => v,
488 Body::Text(s) => serde_json::Value::String(s),
489 Body::Xml(s) => serde_json::Value::String(s),
490 Body::Bytes(b) => {
491 serde_json::Value::String(String::from_utf8_lossy(&b).into_owned())
492 }
493 Body::Empty => serde_json::Value::Null,
494 Body::Stream(s) => serde_json::json!({
495 "_stream": {
496 "origin": s.metadata.origin,
497 "placeholder": true,
498 "hint": "Materialize exchange body with .into_bytes() before aggregation if content needed"
499 }
500 }),
501 })
502 .collect();
503 Ok(Exchange::new(Message {
504 headers: Default::default(),
505 body: Body::Json(serde_json::Value::Array(bodies)),
506 }))
507 }
508 AggregationStrategy::Custom(f) => {
509 let mut iter = exchanges.into_iter();
510 let first = iter.next().ok_or_else(|| {
511 CamelError::ProcessorError("Aggregator: empty bucket".to_string())
512 })?;
513 Ok(iter.fold(first, |acc, next| f(acc, next)))
514 }
515 }
516}
517
518#[cfg(test)]
519mod tests {
520 use super::*;
521 use std::collections::HashMap;
522
523 use camel_api::{
524 aggregator::{AggregationStrategy, AggregatorConfig},
525 body::Body,
526 exchange::Exchange,
527 message::Message,
528 };
529 use tokio::sync::mpsc;
530 use tokio_util::sync::CancellationToken;
531 use tower::ServiceExt;
532
533 fn make_exchange(header: &str, value: &str, body: &str) -> Exchange {
534 let mut msg = Message {
535 headers: Default::default(),
536 body: Body::Text(body.to_string()),
537 };
538 msg.headers
539 .insert(header.to_string(), serde_json::json!(value));
540 Exchange::new(msg)
541 }
542
543 fn config_size(n: usize) -> AggregatorConfig {
544 AggregatorConfig::correlate_by("orderId")
545 .complete_when_size(n)
546 .build()
547 .unwrap()
548 }
549
550 fn new_test_svc(config: AggregatorConfig) -> AggregatorService {
551 let (tx, _rx) = mpsc::channel(256);
552 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
553 let cancel = CancellationToken::new();
554 AggregatorService::new(config, tx, registry, cancel)
555 }
556
557 #[tokio::test]
558 async fn test_pending_exchange_not_yet_complete() {
559 let mut svc = new_test_svc(config_size(3));
560 let ex = make_exchange("orderId", "A", "first");
561 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
562 assert!(matches!(result.input.body, Body::Empty));
563 assert_eq!(
564 result.property(CAMEL_AGGREGATOR_PENDING),
565 Some(&serde_json::json!(true))
566 );
567 }
568
569 #[tokio::test]
570 async fn test_completes_on_size() {
571 let mut svc = new_test_svc(config_size(3));
572 for _ in 0..2 {
573 let ex = make_exchange("orderId", "A", "item");
574 let r = svc.ready().await.unwrap().call(ex).await.unwrap();
575 assert!(matches!(r.input.body, Body::Empty));
576 }
577 let ex = make_exchange("orderId", "A", "last");
578 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
579 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
580 assert_eq!(
581 result.property(CAMEL_AGGREGATED_SIZE),
582 Some(&serde_json::json!(3u64))
583 );
584 }
585
586 #[tokio::test]
587 async fn test_collect_all_produces_json_array() {
588 let mut svc = new_test_svc(config_size(2));
589 svc.ready()
590 .await
591 .unwrap()
592 .call(make_exchange("orderId", "A", "alpha"))
593 .await
594 .unwrap();
595 let result = svc
596 .ready()
597 .await
598 .unwrap()
599 .call(make_exchange("orderId", "A", "beta"))
600 .await
601 .unwrap();
602 let Body::Json(v) = &result.input.body else {
603 panic!("expected Body::Json")
604 };
605 let arr = v.as_array().unwrap();
606 assert_eq!(arr.len(), 2);
607 assert_eq!(arr[0], serde_json::json!("alpha"));
608 assert_eq!(arr[1], serde_json::json!("beta"));
609 }
610
611 #[tokio::test]
612 async fn test_two_keys_independent_buckets() {
613 let mut svc = new_test_svc(config_size(3));
615 svc.ready()
616 .await
617 .unwrap()
618 .call(make_exchange("orderId", "A", "a1"))
619 .await
620 .unwrap();
621 svc.ready()
622 .await
623 .unwrap()
624 .call(make_exchange("orderId", "B", "b1"))
625 .await
626 .unwrap();
627 svc.ready()
628 .await
629 .unwrap()
630 .call(make_exchange("orderId", "A", "a2"))
631 .await
632 .unwrap();
633 let ra = svc
635 .ready()
636 .await
637 .unwrap()
638 .call(make_exchange("orderId", "A", "a3"))
639 .await
640 .unwrap();
641 assert!(matches!(ra.input.body, Body::Json(_)));
643 let rb = svc
645 .ready()
646 .await
647 .unwrap()
648 .call(make_exchange("orderId", "B", "b_check"))
649 .await
650 .unwrap();
651 assert!(matches!(rb.input.body, Body::Empty));
652 }
653
654 #[tokio::test]
655 async fn test_bucket_resets_after_completion() {
656 let mut svc = new_test_svc(config_size(2));
657 svc.ready()
658 .await
659 .unwrap()
660 .call(make_exchange("orderId", "A", "x"))
661 .await
662 .unwrap();
663 svc.ready()
664 .await
665 .unwrap()
666 .call(make_exchange("orderId", "A", "x"))
667 .await
668 .unwrap(); let r = svc
671 .ready()
672 .await
673 .unwrap()
674 .call(make_exchange("orderId", "A", "new"))
675 .await
676 .unwrap();
677 assert!(matches!(r.input.body, Body::Empty)); }
679
680 #[tokio::test]
681 async fn test_completion_size_1_emits_immediately() {
682 let mut svc = new_test_svc(config_size(1));
683 let ex = make_exchange("orderId", "A", "solo");
684 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
685 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
686 }
687
688 #[tokio::test]
689 async fn test_custom_aggregation_strategy() {
690 use camel_api::aggregator::AggregationFn;
691 use std::sync::Arc;
692
693 let f: AggregationFn = Arc::new(|mut acc: Exchange, next: Exchange| {
694 let combined = format!(
695 "{}+{}",
696 acc.input.body.as_text().unwrap_or(""),
697 next.input.body.as_text().unwrap_or("")
698 );
699 acc.input.body = Body::Text(combined);
700 acc
701 });
702 let config = AggregatorConfig::correlate_by("key")
703 .complete_when_size(2)
704 .strategy(AggregationStrategy::Custom(f))
705 .build()
706 .unwrap();
707 let mut svc = new_test_svc(config);
708 svc.ready()
709 .await
710 .unwrap()
711 .call(make_exchange("key", "X", "hello"))
712 .await
713 .unwrap();
714 let result = svc
715 .ready()
716 .await
717 .unwrap()
718 .call(make_exchange("key", "X", "world"))
719 .await
720 .unwrap();
721 assert_eq!(result.input.body.as_text(), Some("hello+world"));
722 }
723
724 #[tokio::test]
725 async fn test_completion_predicate() {
726 let config = AggregatorConfig::correlate_by("key")
727 .complete_when(|bucket| {
728 bucket
729 .iter()
730 .any(|e| e.input.body.as_text() == Some("DONE"))
731 })
732 .build()
733 .unwrap();
734 let mut svc = new_test_svc(config);
735 svc.ready()
736 .await
737 .unwrap()
738 .call(make_exchange("key", "K", "first"))
739 .await
740 .unwrap();
741 svc.ready()
742 .await
743 .unwrap()
744 .call(make_exchange("key", "K", "second"))
745 .await
746 .unwrap();
747 let result = svc
748 .ready()
749 .await
750 .unwrap()
751 .call(make_exchange("key", "K", "DONE"))
752 .await
753 .unwrap();
754 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
755 }
756
757 #[tokio::test]
758 async fn test_missing_header_returns_error() {
759 let mut svc = new_test_svc(config_size(2));
760 let msg = Message {
761 headers: Default::default(),
762 body: Body::Text("no key".into()),
763 };
764 let ex = Exchange::new(msg);
765 let result = svc.ready().await.unwrap().call(ex).await;
766 assert!(result.is_err());
767 assert!(matches!(
768 result.unwrap_err(),
769 camel_api::CamelError::ProcessorError(_)
770 ));
771 }
772
773 #[tokio::test]
774 async fn test_cloned_service_shares_state() {
775 let svc1 = new_test_svc(config_size(2));
776 let mut svc2 = svc1.clone();
777 svc1.clone()
779 .ready()
780 .await
781 .unwrap()
782 .call(make_exchange("orderId", "A", "from-svc1"))
783 .await
784 .unwrap();
785 let result = svc2
787 .ready()
788 .await
789 .unwrap()
790 .call(make_exchange("orderId", "A", "from-svc2"))
791 .await
792 .unwrap();
793 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
794 }
795
796 #[tokio::test]
797 async fn test_camel_aggregated_key_property_set() {
798 let mut svc = new_test_svc(config_size(1));
799 let ex = make_exchange("orderId", "ORDER-42", "body");
800 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
801 assert_eq!(
802 result.property(CAMEL_AGGREGATED_KEY),
803 Some(&serde_json::json!("ORDER-42"))
804 );
805 }
806
807 #[tokio::test]
808 async fn test_aggregator_enforces_max_buckets() {
809 let config = AggregatorConfig::correlate_by("orderId")
810 .complete_when_size(2)
811 .max_buckets(3)
812 .build()
813 .unwrap();
814
815 let mut svc = new_test_svc(config);
816
817 for i in 0..3 {
819 let ex = make_exchange("orderId", &format!("key-{}", i), "body");
820 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
821 }
822
823 let ex = make_exchange("orderId", "key-4", "body");
825 let result = svc.ready().await.unwrap().call(ex).await;
826
827 assert!(result.is_err(), "Should reject when max buckets reached");
828 let err = result.unwrap_err().to_string();
829 assert!(
830 err.contains("maximum"),
831 "Error message should contain 'maximum': {}",
832 err
833 );
834 }
835
836 #[tokio::test]
837 async fn test_max_buckets_allows_existing_key() {
838 let config = AggregatorConfig::correlate_by("orderId")
839 .complete_when_size(5) .max_buckets(2)
841 .build()
842 .unwrap();
843
844 let mut svc = new_test_svc(config);
845
846 let ex1 = make_exchange("orderId", "key-A", "body1");
848 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
849 let ex2 = make_exchange("orderId", "key-B", "body2");
850 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
851
852 let ex3 = make_exchange("orderId", "key-A", "body3");
854 let result = svc.ready().await.unwrap().call(ex3).await;
855 assert!(
856 result.is_ok(),
857 "Should allow adding to existing bucket even at max limit"
858 );
859 }
860
861 #[tokio::test]
862 async fn test_bucket_ttl_eviction() {
863 let config = AggregatorConfig::correlate_by("orderId")
864 .complete_when_size(10) .bucket_ttl(Duration::from_millis(50))
866 .build()
867 .unwrap();
868
869 let mut svc = new_test_svc(config);
870
871 let ex1 = make_exchange("orderId", "key-A", "body1");
873 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
874
875 tokio::time::sleep(Duration::from_millis(100)).await;
877
878 let ex2 = make_exchange("orderId", "key-B", "body2");
880 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
881
882 let ex3 = make_exchange("orderId", "key-A", "body3");
885 let result = svc.ready().await.unwrap().call(ex3).await;
886 assert!(result.is_ok(), "Should be able to recreate evicted bucket");
887 }
888
889 #[tokio::test(start_paused = true)]
890 async fn test_timeout_completes_bucket() {
891 let config = AggregatorConfig::correlate_by("key")
892 .complete_on_timeout(Duration::from_millis(100))
893 .build()
894 .unwrap();
895 let mut svc = new_test_svc(config);
896 let ex = make_exchange("key", "A", "data");
897 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
898 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_some());
899
900 tokio::time::sleep(Duration::from_millis(200)).await;
901
902 assert_eq!(
903 svc.buckets.lock().unwrap().len(),
904 0,
905 "bucket should be removed after timeout"
906 );
907 }
908
909 #[tokio::test(start_paused = true)]
910 async fn test_timeout_resets_on_new_exchange() {
911 let config = AggregatorConfig::correlate_by("key")
912 .complete_on_timeout(Duration::from_millis(150))
913 .build()
914 .unwrap();
915 let mut svc = new_test_svc(config);
916
917 let ex1 = make_exchange("key", "A", "first");
918 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
919
920 tokio::time::sleep(Duration::from_millis(100)).await;
921
922 let ex2 = make_exchange("key", "A", "second");
923 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
924
925 tokio::time::sleep(Duration::from_millis(100)).await;
926
927 assert_eq!(
928 svc.buckets.lock().unwrap().len(),
929 1,
930 "bucket should still exist — timeout was reset"
931 );
932
933 tokio::time::sleep(Duration::from_millis(100)).await;
934
935 assert_eq!(
936 svc.buckets.lock().unwrap().len(),
937 0,
938 "bucket should be gone after timeout fires"
939 );
940 }
941
942 #[tokio::test]
943 async fn test_composable_size_and_timeout() {
944 let config = AggregatorConfig::correlate_by("key")
945 .complete_on_size_or_timeout(2, Duration::from_millis(200))
946 .build()
947 .unwrap();
948 let mut svc = new_test_svc(config);
949
950 let ex1 = make_exchange("key", "A", "first");
951 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
952 assert!(svc.buckets.lock().unwrap().contains_key("\"A\""));
953
954 let ex2 = make_exchange("key", "A", "second");
955 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
956 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
957 assert_eq!(
958 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
959 Some(&serde_json::json!("size"))
960 );
961 }
962
963 #[tokio::test(start_paused = true)]
964 async fn test_discard_on_timeout() {
965 let config = AggregatorConfig::correlate_by("key")
966 .complete_on_timeout(Duration::from_millis(50))
967 .discard_on_timeout(true)
968 .build()
969 .unwrap();
970 let (tx, mut rx) = mpsc::channel(256);
971 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
972 let cancel = CancellationToken::new();
973 let mut svc = AggregatorService::new(config, tx, registry, cancel);
974
975 let ex = make_exchange("key", "A", "data");
976 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
977
978 tokio::time::sleep(Duration::from_millis(100)).await;
979
980 assert!(
981 rx.try_recv().is_err(),
982 "no emit expected with discard_on_timeout"
983 );
984 assert_eq!(svc.buckets.lock().unwrap().len(), 0);
985 assert!(
986 svc.timeout_tasks.lock().unwrap().is_empty(),
987 "timeout task should be cleaned up"
988 );
989 }
990
991 #[tokio::test]
992 async fn test_force_completion_on_stop() {
993 let config = AggregatorConfig::correlate_by("key")
994 .complete_when_size(10)
995 .force_completion_on_stop(true)
996 .build()
997 .unwrap();
998 let (tx, mut rx) = mpsc::channel(256);
999 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1000 let cancel = CancellationToken::new();
1001 let svc = AggregatorService::new(config, tx, registry, cancel);
1002
1003 let mut call_svc = svc.clone();
1004 let ex = make_exchange("key", "A", "data");
1005 let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1006
1007 svc.force_complete_all();
1008
1009 let result = rx.try_recv().expect("should emit on force-complete");
1010 assert!(
1011 result.input.body.as_text().is_some() || matches!(result.input.body, Body::Json(_))
1012 );
1013 assert_eq!(
1014 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1015 Some(&serde_json::json!("stop"))
1016 );
1017 }
1018
1019 #[tokio::test]
1020 async fn test_completion_reason_property_size() {
1021 let config = AggregatorConfig::correlate_by("key")
1022 .complete_when_size(1)
1023 .build()
1024 .unwrap();
1025 let mut svc = new_test_svc(config);
1026 let ex = make_exchange("key", "X", "body");
1027 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1028 assert_eq!(
1029 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1030 Some(&serde_json::json!("size"))
1031 );
1032 }
1033
1034 #[tokio::test]
1035 async fn test_completion_reason_property_predicate() {
1036 let config = AggregatorConfig::correlate_by("key")
1037 .complete_when(|_| true)
1038 .build()
1039 .unwrap();
1040 let mut svc = new_test_svc(config);
1041 let ex = make_exchange("key", "X", "body");
1042 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1043 assert_eq!(
1044 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1045 Some(&serde_json::json!("predicate"))
1046 );
1047 }
1048
1049 #[tokio::test(start_paused = true)]
1050 async fn test_size_completes_before_timeout() {
1051 let config = AggregatorConfig::correlate_by("key")
1052 .complete_on_size_or_timeout(2, Duration::from_millis(200))
1053 .build()
1054 .unwrap();
1055 let mut svc = new_test_svc(config);
1056
1057 let ex1 = make_exchange("key", "A", "first");
1058 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
1059
1060 let ex2 = make_exchange("key", "A", "second");
1061 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1062
1063 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
1064 assert_eq!(
1065 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1066 Some(&serde_json::json!("size"))
1067 );
1068 assert_eq!(svc.buckets.lock().unwrap().len(), 0);
1069
1070 tokio::time::sleep(Duration::from_millis(300)).await;
1071 assert_eq!(
1072 svc.buckets.lock().unwrap().len(),
1073 0,
1074 "no re-fire after timeout"
1075 );
1076 }
1077
1078 #[tokio::test(start_paused = true)]
1079 async fn test_concurrent_timeout_fire_and_new_exchange() {
1080 let config = AggregatorConfig::correlate_by("key")
1081 .complete_on_size_or_timeout(2, Duration::from_millis(100))
1082 .build()
1083 .unwrap();
1084 let (tx, mut rx) = mpsc::channel(256);
1085 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1086 let cancel = CancellationToken::new();
1087 let mut svc = AggregatorService::new(config, tx, registry, cancel);
1088
1089 let ex = make_exchange("key", "A", "data");
1090 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1091
1092 tokio::time::sleep(Duration::from_millis(150)).await;
1094
1095 let ex2 = make_exchange("key", "A", "data2");
1097 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1098 assert!(
1099 result.property(CAMEL_AGGREGATOR_PENDING).is_some(),
1100 "should be pending in new bucket"
1101 );
1102
1103 let mut late_count = 0;
1105 while rx.try_recv().is_ok() {
1106 late_count += 1;
1107 }
1108 assert_eq!(
1109 late_count, 1,
1110 "exactly 1 late emit from the timed-out bucket"
1111 );
1112 }
1113
1114 #[tokio::test(start_paused = true)]
1115 async fn test_late_channel_full_drops_with_warning() {
1116 let config = AggregatorConfig::correlate_by("key")
1117 .complete_on_timeout(Duration::from_millis(50))
1118 .build()
1119 .unwrap();
1120 let (tx, mut rx) = mpsc::channel(1);
1121 rx.close();
1122 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1123 let cancel = CancellationToken::new();
1124 let mut svc = AggregatorService::new(config, tx, registry, cancel);
1125
1126 let ex = make_exchange("key", "A", "data");
1127 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1128
1129 tokio::time::sleep(Duration::from_millis(100)).await;
1130 assert_eq!(
1131 svc.buckets.lock().unwrap().len(),
1132 0,
1133 "bucket removed despite channel closed"
1134 );
1135 }
1136
1137 #[tokio::test]
1138 async fn test_aggregate_stream_bodies_creates_valid_json() {
1139 use bytes::Bytes;
1140 use camel_api::{Body, StreamBody, StreamMetadata};
1141 use futures::stream;
1142 use tokio::sync::Mutex;
1143
1144 let chunks = vec![Ok(Bytes::from("test"))];
1145 let stream_body = StreamBody {
1146 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1147 metadata: StreamMetadata {
1148 origin: Some("file:///test.txt".to_string()),
1149 ..Default::default()
1150 },
1151 };
1152
1153 let ex1 = Exchange::new(Message {
1154 headers: Default::default(),
1155 body: Body::Stream(stream_body),
1156 });
1157
1158 let exchanges = vec![ex1];
1159 let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1160
1161 let exchange = result.expect("Expected Ok result");
1162 assert!(
1163 matches!(exchange.input.body, Body::Json(_)),
1164 "Expected Json body"
1165 );
1166
1167 if let Body::Json(value) = exchange.input.body {
1168 let json_str = serde_json::to_string(&value).unwrap();
1169 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1170
1171 assert!(parsed.is_array(), "Result should be an array");
1172 let arr = parsed.as_array().unwrap();
1173 assert!(arr[0].is_object(), "First element should be an object");
1174 assert!(
1175 arr[0]["_stream"].is_object(),
1176 "Should contain _stream object"
1177 );
1178 assert_eq!(arr[0]["_stream"]["origin"], "file:///test.txt");
1179 assert_eq!(
1180 arr[0]["_stream"]["placeholder"], true,
1181 "placeholder flag should be true"
1182 );
1183 }
1184 }
1185
1186 #[tokio::test]
1187 async fn test_aggregate_stream_bodies_with_none_origin() {
1188 use bytes::Bytes;
1189 use camel_api::{Body, StreamBody, StreamMetadata};
1190 use futures::stream;
1191 use tokio::sync::Mutex;
1192
1193 let chunks = vec![Ok(Bytes::from("test"))];
1194 let stream_body = StreamBody {
1195 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1196 metadata: StreamMetadata {
1197 origin: None,
1198 ..Default::default()
1199 },
1200 };
1201
1202 let ex1 = Exchange::new(Message {
1203 headers: Default::default(),
1204 body: Body::Stream(stream_body),
1205 });
1206
1207 let exchanges = vec![ex1];
1208 let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1209
1210 let exchange = result.expect("Expected Ok result");
1211 assert!(
1212 matches!(exchange.input.body, Body::Json(_)),
1213 "Expected Json body"
1214 );
1215
1216 if let Body::Json(value) = exchange.input.body {
1217 let json_str = serde_json::to_string(&value).unwrap();
1218 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1219
1220 assert!(parsed.is_array(), "Result should be an array");
1221 let arr = parsed.as_array().unwrap();
1222 assert!(arr[0].is_object(), "First element should be an object");
1223 assert!(
1224 arr[0]["_stream"].is_object(),
1225 "Should contain _stream object"
1226 );
1227 assert_eq!(
1228 arr[0]["_stream"]["origin"],
1229 serde_json::Value::Null,
1230 "origin should be null when None"
1231 );
1232 assert_eq!(
1233 arr[0]["_stream"]["placeholder"], true,
1234 "placeholder flag should be true"
1235 );
1236 }
1237 }
1238
1239 #[tokio::test(start_paused = true)]
1240 async fn test_shutdown_awaits_timeout_handles() {
1241 let config = AggregatorConfig::correlate_by("key")
1242 .complete_on_timeout(Duration::from_millis(100))
1243 .build()
1244 .unwrap();
1245 let (tx, _rx) = mpsc::channel(256);
1246 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1247 let cancel = CancellationToken::new();
1248 let svc = AggregatorService::new(config, tx, registry, cancel);
1249
1250 let mut call_svc = svc.clone();
1252 let ex = make_exchange("key", "A", "data");
1253 let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1254
1255 assert!(
1257 !svc.timeout_handles.lock().unwrap().is_empty(),
1258 "should have a timeout handle"
1259 );
1260
1261 svc.shutdown().await;
1264
1265 assert!(
1266 svc.timeout_handles.lock().unwrap().is_empty(),
1267 "all handles should be cleaned up after shutdown"
1268 );
1269 }
1270}