1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6use std::time::{Duration, Instant};
7
8use tokio::sync::mpsc;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tower::Service;
12
13use camel_api::{
14 CamelError,
15 aggregator::{
16 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode,
17 CompletionReason, CorrelationStrategy,
18 },
19 body::Body,
20 exchange::Exchange,
21 message::Message,
22};
23use camel_language_api::Language;
24
/// Shared, mutex-protected map from a language name to its [`Language`] implementation.
pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;

/// Property set to `true` on the empty placeholder exchange returned while a bucket is still open.
pub const CAMEL_AGGREGATOR_PENDING: &str = "CamelAggregatorPending";
/// Property carrying the number of exchanges that were folded into a completed aggregate.
pub const CAMEL_AGGREGATED_SIZE: &str = "CamelAggregatedSize";
/// Property carrying the correlation key value of a completed aggregate.
pub const CAMEL_AGGREGATED_KEY: &str = "CamelAggregatedKey";
/// Property carrying the completion reason (the `CompletionReason::as_str()` value).
pub const CAMEL_AGGREGATED_COMPLETION_REASON: &str = "CamelAggregatedCompletionReason";
31
/// Exchanges collected so far for a single correlation key.
struct Bucket {
    /// Collected exchanges, in the order they were pushed.
    exchanges: Vec<Exchange>,
    /// Instant of creation or of the most recent `push`; compared against the bucket TTL.
    last_updated: Instant,
}
37
38impl Bucket {
39 fn new() -> Self {
40 Self {
41 exchanges: Vec::new(),
42 last_updated: Instant::now(),
43 }
44 }
45
46 fn push(&mut self, exchange: Exchange) {
47 self.exchanges.push(exchange);
48 self.last_updated = Instant::now();
49 }
50
51 fn is_expired(&self, ttl: Duration) -> bool {
52 Instant::now().duration_since(self.last_updated) >= ttl
53 }
54}
55
/// Tower service that groups exchanges into per-key buckets and emits one aggregate per bucket.
///
/// Cloning is cheap: all mutable state lives behind `Arc`s, so clones share the
/// same buckets and timeout bookkeeping.
#[derive(Clone)]
pub struct AggregatorService {
    /// Correlation, completion and strategy settings.
    config: AggregatorConfig,
    /// Open buckets, keyed by the JSON-serialized correlation key.
    buckets: Arc<Mutex<HashMap<String, Bucket>>>,
    /// Cancellation token of the pending timeout task for each key.
    timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
    /// Join handle of the pending timeout task for each key (awaited by `shutdown`).
    timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
    /// Channel for aggregates completed outside of `call` (timeout / forced completion).
    late_tx: mpsc::Sender<Exchange>,
    /// Languages available to expression-based correlation strategies.
    language_registry: SharedLanguageRegistry,
    /// Route-level cancellation token; forwarded to every spawned timeout task.
    route_cancel: CancellationToken,
}
66
67impl AggregatorService {
68 pub fn new(
69 config: AggregatorConfig,
70 late_tx: mpsc::Sender<Exchange>,
71 language_registry: SharedLanguageRegistry,
72 route_cancel: CancellationToken,
73 ) -> Self {
74 Self {
75 config,
76 buckets: Arc::new(Mutex::new(HashMap::new())),
77 timeout_tasks: Arc::new(Mutex::new(HashMap::new())),
78 timeout_handles: Arc::new(Mutex::new(HashMap::new())),
79 late_tx,
80 language_registry,
81 route_cancel,
82 }
83 }
84
85 pub fn config(&self) -> &AggregatorConfig {
86 &self.config
87 }
88
89 pub fn has_timeout(&self) -> bool {
90 has_timeout_condition(&self.config.completion)
91 }
92
93 pub fn force_complete_all(&self) {
94 let mut buckets_guard = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
95 let keys: Vec<String> = buckets_guard.keys().cloned().collect();
96
97 for key in keys {
98 if let Some(bucket) = buckets_guard.remove(&key) {
99 if self.config.force_completion_on_stop {
100 cancel_timeout_task_with_handle(
101 &key,
102 &self.timeout_tasks,
103 &self.timeout_handles,
104 );
105 match aggregate(bucket.exchanges, &self.config.strategy) {
106 Ok(mut result) => {
107 result.set_property(
108 CAMEL_AGGREGATED_COMPLETION_REASON,
109 serde_json::json!(CompletionReason::Stop.as_str()),
110 );
111 if self.late_tx.try_send(result).is_err() {
112 tracing::warn!(
113 key = %key,
114 "aggregator force-complete emit dropped: late channel full"
115 );
116 }
117 }
118 Err(e) => {
119 tracing::warn!(
121 key = %key,
122 error = %e,
123 "aggregation failed in force_complete_all"
124 );
125 }
126 }
127 } else {
128 cancel_timeout_task_with_handle(
129 &key,
130 &self.timeout_tasks,
131 &self.timeout_handles,
132 );
133 }
134 }
135 }
136 }
137
138 pub async fn shutdown(&self) {
141 {
143 let mut guard = self.timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
144 for token in guard.values() {
145 token.cancel();
146 }
147 guard.clear();
148 };
149
150 let handles: Vec<JoinHandle<()>> = {
152 let mut guard = self
153 .timeout_handles
154 .lock()
155 .unwrap_or_else(|e| e.into_inner());
156 guard.drain().map(|(_, handle)| handle).collect()
157 };
158
159 if handles.is_empty() {
160 return;
161 }
162
163 let _ = tokio::time::timeout(Duration::from_secs(5), async {
165 for handle in handles {
166 let _ = handle.await;
167 }
168 })
169 .await;
170 }
171}
172
173pub fn has_timeout_condition(mode: &CompletionMode) -> bool {
174 match mode {
175 CompletionMode::Single(CompletionCondition::Timeout(_)) => true,
176 CompletionMode::Any(conditions) => conditions
177 .iter()
178 .any(|c| matches!(c, CompletionCondition::Timeout(_))),
179 _ => false,
180 }
181}
182
183impl Service<Exchange> for AggregatorService {
184 type Response = Exchange;
185 type Error = CamelError;
186 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
187
188 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
189 Poll::Ready(Ok(()))
190 }
191
192 fn call(&mut self, exchange: Exchange) -> Self::Future {
193 let config = self.config.clone();
194 let buckets = Arc::clone(&self.buckets);
195 let timeout_tasks = Arc::clone(&self.timeout_tasks);
196 let timeout_handles = Arc::clone(&self.timeout_handles);
197 let late_tx = self.late_tx.clone();
198 let language_registry = Arc::clone(&self.language_registry);
199 let route_cancel = self.route_cancel.clone();
200
201 Box::pin(async move {
202 let key_value =
203 extract_correlation_key(&exchange, &config.correlation, &language_registry).await?;
204
205 let key_str = serde_json::to_string(&key_value)
206 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
207
208 let completed_bucket = {
209 let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
210
211 if let Some(ttl) = config.bucket_ttl {
212 guard.retain(|_, bucket| !bucket.is_expired(ttl));
213 }
214
215 if let Some(max) = config.max_buckets
216 && !guard.contains_key(&key_str)
217 && guard.len() >= max
218 {
219 tracing::warn!(
220 max_buckets = max,
221 correlation_key = %key_str,
222 "Aggregator reached max buckets limit, rejecting new correlation key"
223 );
224 return Err(CamelError::ProcessorError(format!(
225 "Aggregator reached maximum {} buckets",
226 max
227 )));
228 }
229
230 let bucket = guard.entry(key_str.clone()).or_insert_with(Bucket::new);
231 bucket.push(exchange);
232
233 let (is_complete, reason) =
234 check_sync_completion(&config.completion, &bucket.exchanges);
235
236 if is_complete {
237 let exchanges = guard.remove(&key_str).map(|b| b.exchanges);
238 (exchanges, reason)
239 } else {
240 (None, CompletionReason::Size) }
242 };
243
244 if completed_bucket.0.is_none() && has_timeout_condition(&config.completion) {
245 let cancel = {
246 let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
247 if let Some(existing) = tt_guard.get(&key_str) {
249 existing.cancel();
250 }
251 let token = CancellationToken::new();
252 tt_guard.insert(key_str.clone(), token.clone());
253 token
254 };
255
256 let timeout_dur = extract_timeout_duration(&config.completion);
257 if let Some(timeout) = timeout_dur {
258 {
260 let mut hh = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
261 if let Some(old) = hh.remove(&key_str) {
262 old.abort();
263 }
264 }
265 let handle = spawn_timeout_task(
266 key_str.clone(),
267 timeout,
268 cancel,
269 buckets.clone(),
270 timeout_tasks.clone(),
271 timeout_handles.clone(),
272 late_tx,
273 config.strategy.clone(),
274 config.discard_on_timeout,
275 route_cancel,
276 );
277 timeout_handles
278 .lock()
279 .unwrap_or_else(|e| e.into_inner())
280 .insert(key_str.clone(), handle);
281 }
282 }
283
284 if let Some(exchanges) = completed_bucket.0 {
285 cancel_timeout_task_with_handle(&key_str, &timeout_tasks, &timeout_handles);
286 let reason = completed_bucket.1;
287 let size = exchanges.len();
288 let mut result = aggregate(exchanges, &config.strategy)?;
289 result.set_property(CAMEL_AGGREGATED_SIZE, serde_json::json!(size as u64));
290 result.set_property(CAMEL_AGGREGATED_KEY, key_value);
291 result.set_property(
292 CAMEL_AGGREGATED_COMPLETION_REASON,
293 serde_json::json!(reason.as_str()),
294 );
295 Ok(result)
296 } else {
297 let mut pending = Exchange::new(Message {
298 headers: Default::default(),
299 body: Body::Empty,
300 });
301 pending.set_property(CAMEL_AGGREGATOR_PENDING, serde_json::json!(true));
302 Ok(pending)
303 }
304 })
305 }
306}
307
308async fn extract_correlation_key(
309 exchange: &Exchange,
310 strategy: &CorrelationStrategy,
311 registry: &SharedLanguageRegistry,
312) -> Result<serde_json::Value, CamelError> {
313 match strategy {
314 CorrelationStrategy::HeaderName(h) => {
315 exchange.input.headers.get(h).cloned().ok_or_else(|| {
316 CamelError::ProcessorError(format!(
317 "Aggregator: missing correlation key header '{}'",
318 h
319 ))
320 })
321 }
322 CorrelationStrategy::Expression { expr, language } => {
323 let expression = {
324 let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
325 let lang = reg.get(language).ok_or_else(|| {
326 CamelError::ProcessorError(format!(
327 "Aggregator: language '{}' not found in registry",
328 language
329 ))
330 })?;
331 lang.create_expression(expr)
332 .map_err(|e| CamelError::ProcessorError(e.to_string()))?
333 };
334 let value = expression
335 .evaluate(exchange)
336 .await
337 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
338 if value.is_null() {
339 return Err(CamelError::ProcessorError(format!(
340 "Aggregator: correlation expression '{}' evaluated to null",
341 expr
342 )));
343 }
344 Ok(value)
345 }
346 CorrelationStrategy::Fn(f) => f(exchange).map(serde_json::Value::String).ok_or_else(|| {
347 CamelError::ProcessorError("Aggregator: correlation function returned None".to_string())
348 }),
349 }
350}
351
352fn check_sync_completion(
353 mode: &CompletionMode,
354 exchanges: &[Exchange],
355) -> (bool, CompletionReason) {
356 match mode {
357 CompletionMode::Single(cond) => check_single(cond, exchanges),
358 CompletionMode::Any(conditions) => {
359 for cond in conditions {
360 if let CompletionCondition::Timeout(_) = cond {
361 continue;
362 }
363 let (done, reason) = check_single(cond, exchanges);
364 if done {
365 return (true, reason);
366 }
367 }
368 (false, CompletionReason::Size)
369 }
370 }
371}
372
373fn check_single(cond: &CompletionCondition, exchanges: &[Exchange]) -> (bool, CompletionReason) {
374 match cond {
375 CompletionCondition::Size(n) => (exchanges.len() >= *n, CompletionReason::Size),
376 CompletionCondition::Predicate(pred) => (pred(exchanges), CompletionReason::Predicate),
377 CompletionCondition::Timeout(_) => (false, CompletionReason::Timeout),
378 }
379}
380
381fn extract_timeout_duration(mode: &CompletionMode) -> Option<Duration> {
382 match mode {
383 CompletionMode::Single(CompletionCondition::Timeout(d)) => Some(*d),
384 CompletionMode::Any(conditions) => conditions.iter().find_map(|c| {
385 if let CompletionCondition::Timeout(d) = c {
386 Some(*d)
387 } else {
388 None
389 }
390 }),
391 _ => None,
392 }
393}
394
395fn cancel_timeout_task(key: &str, timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>) {
396 let mut guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
397 if let Some(token) = guard.remove(key) {
398 token.cancel();
399 }
400}
401
402fn cancel_timeout_task_with_handle(
404 key: &str,
405 timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>,
406 timeout_handles: &Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
407) {
408 cancel_timeout_task(key, timeout_tasks);
409 let mut guard = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
410 guard.remove(key);
411}
412
413#[allow(clippy::too_many_arguments)]
414fn spawn_timeout_task(
415 key: String,
416 timeout: Duration,
417 cancel: CancellationToken,
418 buckets: Arc<Mutex<HashMap<String, Bucket>>>,
419 timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
420 timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
421 late_tx: mpsc::Sender<Exchange>,
422 strategy: AggregationStrategy,
423 discard: bool,
424 _route_cancel: CancellationToken,
425) -> JoinHandle<()> {
426 let cancel_clone = cancel.clone();
427 tokio::spawn(async move {
428 tokio::select! {
429 _ = tokio::time::sleep(timeout) => {
430 let should_proceed = {
431 let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
432 if cancel_clone.is_cancelled() {
433 false
434 } else {
435 tt_guard.remove(&key);
436 true
437 }
438 };
439 if !should_proceed {
440 return;
441 }
442 {
445 let mut hh = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
446 hh.remove(&key);
447 }
448 let bucket_exchanges = {
449 let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
450 guard.remove(&key).map(|b| b.exchanges)
451 };
452 if let Some(exchanges) = bucket_exchanges
453 && !discard
454 {
455 match aggregate(exchanges, &strategy) {
456 Ok(mut result) => {
457 result.set_property(
458 CAMEL_AGGREGATED_COMPLETION_REASON,
459 serde_json::json!(CompletionReason::Timeout.as_str()),
460 );
461 if late_tx.try_send(result).is_err() {
462 tracing::warn!(
463 key = %key,
464 "aggregator timeout emit dropped: late channel full"
465 );
466 }
467 }
468 Err(e) => {
469 tracing::warn!(
471 key = %key,
472 error = %e,
473 "aggregation failed in timeout task"
474 );
475 }
476 }
477 }
478 }
479 _ = cancel_clone.cancelled() => {}
480 }
481 })
482}
483
484fn aggregate(
485 exchanges: Vec<Exchange>,
486 strategy: &AggregationStrategy,
487) -> Result<Exchange, CamelError> {
488 match strategy {
489 AggregationStrategy::CollectAll => {
490 let bodies: Vec<serde_json::Value> = exchanges
491 .into_iter()
492 .map(|e| match e.input.body {
493 Body::Json(v) => v,
494 Body::Text(s) => serde_json::Value::String(s),
495 Body::Xml(s) => serde_json::Value::String(s),
496 Body::Bytes(b) => {
497 serde_json::Value::String(String::from_utf8_lossy(&b).into_owned())
498 }
499 Body::Empty => serde_json::Value::Null,
500 Body::Stream(s) => serde_json::json!({
501 "_stream": {
502 "origin": s.metadata.origin,
503 "placeholder": true,
504 "hint": "Materialize exchange body with .into_bytes() before aggregation if content needed"
505 }
506 }),
507 })
508 .collect();
509 Ok(Exchange::new(Message {
510 headers: Default::default(),
511 body: Body::Json(serde_json::Value::Array(bodies)),
512 }))
513 }
514 AggregationStrategy::Custom(f) => {
515 let mut iter = exchanges.into_iter();
516 let first = iter.next().ok_or_else(|| {
517 CamelError::ProcessorError("Aggregator: empty bucket".to_string())
518 })?;
519 Ok(iter.fold(first, |acc, next| f(acc, next)))
520 }
521 }
522}
523
524#[cfg(test)]
525mod tests {
526 use super::*;
527 use std::collections::HashMap;
528
529 use camel_api::{
530 aggregator::{AggregationStrategy, AggregatorConfig},
531 body::Body,
532 exchange::Exchange,
533 message::Message,
534 };
535 use tokio::sync::mpsc;
536 use tokio_util::sync::CancellationToken;
537 use tower::ServiceExt;
538
539 fn make_exchange(header: &str, value: &str, body: &str) -> Exchange {
540 let mut msg = Message {
541 headers: Default::default(),
542 body: Body::Text(body.to_string()),
543 };
544 msg.headers
545 .insert(header.to_string(), serde_json::json!(value));
546 Exchange::new(msg)
547 }
548
549 fn config_size(n: usize) -> AggregatorConfig {
550 AggregatorConfig::correlate_by("orderId")
551 .complete_when_size(n)
552 .build()
553 .unwrap()
554 }
555
556 fn new_test_svc(config: AggregatorConfig) -> AggregatorService {
557 let (tx, _rx) = mpsc::channel(256);
558 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
559 let cancel = CancellationToken::new();
560 AggregatorService::new(config, tx, registry, cancel)
561 }
562
563 #[tokio::test]
564 async fn test_pending_exchange_not_yet_complete() {
565 let mut svc = new_test_svc(config_size(3));
566 let ex = make_exchange("orderId", "A", "first");
567 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
568 assert!(matches!(result.input.body, Body::Empty));
569 assert_eq!(
570 result.property(CAMEL_AGGREGATOR_PENDING),
571 Some(&serde_json::json!(true))
572 );
573 }
574
575 #[tokio::test]
576 async fn test_completes_on_size() {
577 let mut svc = new_test_svc(config_size(3));
578 for _ in 0..2 {
579 let ex = make_exchange("orderId", "A", "item");
580 let r = svc.ready().await.unwrap().call(ex).await.unwrap();
581 assert!(matches!(r.input.body, Body::Empty));
582 }
583 let ex = make_exchange("orderId", "A", "last");
584 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
585 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
586 assert_eq!(
587 result.property(CAMEL_AGGREGATED_SIZE),
588 Some(&serde_json::json!(3u64))
589 );
590 }
591
592 #[tokio::test]
593 async fn test_collect_all_produces_json_array() {
594 let mut svc = new_test_svc(config_size(2));
595 svc.ready()
596 .await
597 .unwrap()
598 .call(make_exchange("orderId", "A", "alpha"))
599 .await
600 .unwrap();
601 let result = svc
602 .ready()
603 .await
604 .unwrap()
605 .call(make_exchange("orderId", "A", "beta"))
606 .await
607 .unwrap();
608 let Body::Json(v) = &result.input.body else {
609 panic!("expected Body::Json")
610 };
611 let arr = v.as_array().unwrap();
612 assert_eq!(arr.len(), 2);
613 assert_eq!(arr[0], serde_json::json!("alpha"));
614 assert_eq!(arr[1], serde_json::json!("beta"));
615 }
616
617 #[tokio::test]
618 async fn test_two_keys_independent_buckets() {
619 let mut svc = new_test_svc(config_size(3));
621 svc.ready()
622 .await
623 .unwrap()
624 .call(make_exchange("orderId", "A", "a1"))
625 .await
626 .unwrap();
627 svc.ready()
628 .await
629 .unwrap()
630 .call(make_exchange("orderId", "B", "b1"))
631 .await
632 .unwrap();
633 svc.ready()
634 .await
635 .unwrap()
636 .call(make_exchange("orderId", "A", "a2"))
637 .await
638 .unwrap();
639 let ra = svc
641 .ready()
642 .await
643 .unwrap()
644 .call(make_exchange("orderId", "A", "a3"))
645 .await
646 .unwrap();
647 assert!(matches!(ra.input.body, Body::Json(_)));
649 let rb = svc
651 .ready()
652 .await
653 .unwrap()
654 .call(make_exchange("orderId", "B", "b_check"))
655 .await
656 .unwrap();
657 assert!(matches!(rb.input.body, Body::Empty));
658 }
659
660 #[tokio::test]
661 async fn test_bucket_resets_after_completion() {
662 let mut svc = new_test_svc(config_size(2));
663 svc.ready()
664 .await
665 .unwrap()
666 .call(make_exchange("orderId", "A", "x"))
667 .await
668 .unwrap();
669 svc.ready()
670 .await
671 .unwrap()
672 .call(make_exchange("orderId", "A", "x"))
673 .await
674 .unwrap(); let r = svc
677 .ready()
678 .await
679 .unwrap()
680 .call(make_exchange("orderId", "A", "new"))
681 .await
682 .unwrap();
683 assert!(matches!(r.input.body, Body::Empty)); }
685
686 #[tokio::test]
687 async fn test_completion_size_1_emits_immediately() {
688 let mut svc = new_test_svc(config_size(1));
689 let ex = make_exchange("orderId", "A", "solo");
690 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
691 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
692 }
693
694 #[tokio::test]
695 async fn test_custom_aggregation_strategy() {
696 use camel_api::aggregator::AggregationFn;
697 use std::sync::Arc;
698
699 let f: AggregationFn = Arc::new(|mut acc: Exchange, next: Exchange| {
700 let combined = format!(
701 "{}+{}",
702 acc.input.body.as_text().unwrap_or(""),
703 next.input.body.as_text().unwrap_or("")
704 );
705 acc.input.body = Body::Text(combined);
706 acc
707 });
708 let config = AggregatorConfig::correlate_by("key")
709 .complete_when_size(2)
710 .strategy(AggregationStrategy::Custom(f))
711 .build()
712 .unwrap();
713 let mut svc = new_test_svc(config);
714 svc.ready()
715 .await
716 .unwrap()
717 .call(make_exchange("key", "X", "hello"))
718 .await
719 .unwrap();
720 let result = svc
721 .ready()
722 .await
723 .unwrap()
724 .call(make_exchange("key", "X", "world"))
725 .await
726 .unwrap();
727 assert_eq!(result.input.body.as_text(), Some("hello+world"));
728 }
729
730 #[tokio::test]
731 async fn test_completion_predicate() {
732 let config = AggregatorConfig::correlate_by("key")
733 .complete_when(|bucket| {
734 bucket
735 .iter()
736 .any(|e| e.input.body.as_text() == Some("DONE"))
737 })
738 .build()
739 .unwrap();
740 let mut svc = new_test_svc(config);
741 svc.ready()
742 .await
743 .unwrap()
744 .call(make_exchange("key", "K", "first"))
745 .await
746 .unwrap();
747 svc.ready()
748 .await
749 .unwrap()
750 .call(make_exchange("key", "K", "second"))
751 .await
752 .unwrap();
753 let result = svc
754 .ready()
755 .await
756 .unwrap()
757 .call(make_exchange("key", "K", "DONE"))
758 .await
759 .unwrap();
760 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
761 }
762
763 #[tokio::test]
764 async fn test_missing_header_returns_error() {
765 let mut svc = new_test_svc(config_size(2));
766 let msg = Message {
767 headers: Default::default(),
768 body: Body::Text("no key".into()),
769 };
770 let ex = Exchange::new(msg);
771 let result = svc.ready().await.unwrap().call(ex).await;
772 assert!(result.is_err());
773 assert!(matches!(
774 result.unwrap_err(),
775 camel_api::CamelError::ProcessorError(_)
776 ));
777 }
778
779 #[tokio::test]
780 async fn test_cloned_service_shares_state() {
781 let svc1 = new_test_svc(config_size(2));
782 let mut svc2 = svc1.clone();
783 svc1.clone()
785 .ready()
786 .await
787 .unwrap()
788 .call(make_exchange("orderId", "A", "from-svc1"))
789 .await
790 .unwrap();
791 let result = svc2
793 .ready()
794 .await
795 .unwrap()
796 .call(make_exchange("orderId", "A", "from-svc2"))
797 .await
798 .unwrap();
799 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
800 }
801
802 #[tokio::test]
803 async fn test_camel_aggregated_key_property_set() {
804 let mut svc = new_test_svc(config_size(1));
805 let ex = make_exchange("orderId", "ORDER-42", "body");
806 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
807 assert_eq!(
808 result.property(CAMEL_AGGREGATED_KEY),
809 Some(&serde_json::json!("ORDER-42"))
810 );
811 }
812
813 #[tokio::test]
814 async fn test_aggregator_enforces_max_buckets() {
815 let config = AggregatorConfig::correlate_by("orderId")
816 .complete_when_size(2)
817 .max_buckets(3)
818 .build()
819 .unwrap();
820
821 let mut svc = new_test_svc(config);
822
823 for i in 0..3 {
825 let ex = make_exchange("orderId", &format!("key-{}", i), "body");
826 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
827 }
828
829 let ex = make_exchange("orderId", "key-4", "body");
831 let result = svc.ready().await.unwrap().call(ex).await;
832
833 assert!(result.is_err(), "Should reject when max buckets reached");
834 let err = result.unwrap_err().to_string();
835 assert!(
836 err.contains("maximum"),
837 "Error message should contain 'maximum': {}",
838 err
839 );
840 }
841
842 #[tokio::test]
843 async fn test_max_buckets_allows_existing_key() {
844 let config = AggregatorConfig::correlate_by("orderId")
845 .complete_when_size(5) .max_buckets(2)
847 .build()
848 .unwrap();
849
850 let mut svc = new_test_svc(config);
851
852 let ex1 = make_exchange("orderId", "key-A", "body1");
854 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
855 let ex2 = make_exchange("orderId", "key-B", "body2");
856 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
857
858 let ex3 = make_exchange("orderId", "key-A", "body3");
860 let result = svc.ready().await.unwrap().call(ex3).await;
861 assert!(
862 result.is_ok(),
863 "Should allow adding to existing bucket even at max limit"
864 );
865 }
866
867 #[tokio::test]
868 async fn test_bucket_ttl_eviction() {
869 let config = AggregatorConfig::correlate_by("orderId")
870 .complete_when_size(10) .bucket_ttl(Duration::from_millis(50))
872 .build()
873 .unwrap();
874
875 let mut svc = new_test_svc(config);
876
877 let ex1 = make_exchange("orderId", "key-A", "body1");
879 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
880
881 tokio::time::sleep(Duration::from_millis(100)).await;
883
884 let ex2 = make_exchange("orderId", "key-B", "body2");
886 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
887
888 let ex3 = make_exchange("orderId", "key-A", "body3");
891 let result = svc.ready().await.unwrap().call(ex3).await;
892 assert!(result.is_ok(), "Should be able to recreate evicted bucket");
893 }
894
895 #[tokio::test(start_paused = true)]
896 async fn test_timeout_completes_bucket() {
897 let config = AggregatorConfig::correlate_by("key")
898 .complete_on_timeout(Duration::from_millis(100))
899 .build()
900 .unwrap();
901 let mut svc = new_test_svc(config);
902 let ex = make_exchange("key", "A", "data");
903 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
904 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_some());
905
906 tokio::time::sleep(Duration::from_millis(200)).await;
907
908 assert_eq!(
909 svc.buckets.lock().unwrap().len(),
910 0,
911 "bucket should be removed after timeout"
912 );
913 }
914
915 #[tokio::test(start_paused = true)]
916 async fn test_timeout_resets_on_new_exchange() {
917 let config = AggregatorConfig::correlate_by("key")
918 .complete_on_timeout(Duration::from_millis(150))
919 .build()
920 .unwrap();
921 let mut svc = new_test_svc(config);
922
923 let ex1 = make_exchange("key", "A", "first");
924 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
925
926 tokio::time::sleep(Duration::from_millis(100)).await;
927
928 let ex2 = make_exchange("key", "A", "second");
929 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
930
931 tokio::time::sleep(Duration::from_millis(100)).await;
932
933 assert_eq!(
934 svc.buckets.lock().unwrap().len(),
935 1,
936 "bucket should still exist — timeout was reset"
937 );
938
939 tokio::time::sleep(Duration::from_millis(100)).await;
940
941 assert_eq!(
942 svc.buckets.lock().unwrap().len(),
943 0,
944 "bucket should be gone after timeout fires"
945 );
946 }
947
948 #[tokio::test]
949 async fn test_composable_size_and_timeout() {
950 let config = AggregatorConfig::correlate_by("key")
951 .complete_on_size_or_timeout(2, Duration::from_millis(200))
952 .build()
953 .unwrap();
954 let mut svc = new_test_svc(config);
955
956 let ex1 = make_exchange("key", "A", "first");
957 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
958 assert!(svc.buckets.lock().unwrap().contains_key("\"A\""));
959
960 let ex2 = make_exchange("key", "A", "second");
961 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
962 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
963 assert_eq!(
964 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
965 Some(&serde_json::json!("size"))
966 );
967 }
968
969 #[tokio::test(start_paused = true)]
970 async fn test_discard_on_timeout() {
971 let config = AggregatorConfig::correlate_by("key")
972 .complete_on_timeout(Duration::from_millis(50))
973 .discard_on_timeout(true)
974 .build()
975 .unwrap();
976 let (tx, mut rx) = mpsc::channel(256);
977 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
978 let cancel = CancellationToken::new();
979 let mut svc = AggregatorService::new(config, tx, registry, cancel);
980
981 let ex = make_exchange("key", "A", "data");
982 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
983
984 tokio::time::sleep(Duration::from_millis(100)).await;
985
986 assert!(
987 rx.try_recv().is_err(),
988 "no emit expected with discard_on_timeout"
989 );
990 assert_eq!(svc.buckets.lock().unwrap().len(), 0);
991 assert!(
992 svc.timeout_tasks.lock().unwrap().is_empty(),
993 "timeout task should be cleaned up"
994 );
995 }
996
997 #[tokio::test]
998 async fn test_force_completion_on_stop() {
999 let config = AggregatorConfig::correlate_by("key")
1000 .complete_when_size(10)
1001 .force_completion_on_stop(true)
1002 .build()
1003 .unwrap();
1004 let (tx, mut rx) = mpsc::channel(256);
1005 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1006 let cancel = CancellationToken::new();
1007 let svc = AggregatorService::new(config, tx, registry, cancel);
1008
1009 let mut call_svc = svc.clone();
1010 let ex = make_exchange("key", "A", "data");
1011 let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1012
1013 svc.force_complete_all();
1014
1015 let result = rx.try_recv().expect("should emit on force-complete");
1016 assert!(
1017 result.input.body.as_text().is_some() || matches!(result.input.body, Body::Json(_))
1018 );
1019 assert_eq!(
1020 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1021 Some(&serde_json::json!("stop"))
1022 );
1023 }
1024
1025 #[tokio::test]
1026 async fn test_completion_reason_property_size() {
1027 let config = AggregatorConfig::correlate_by("key")
1028 .complete_when_size(1)
1029 .build()
1030 .unwrap();
1031 let mut svc = new_test_svc(config);
1032 let ex = make_exchange("key", "X", "body");
1033 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1034 assert_eq!(
1035 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1036 Some(&serde_json::json!("size"))
1037 );
1038 }
1039
1040 #[tokio::test]
1041 async fn test_completion_reason_property_predicate() {
1042 let config = AggregatorConfig::correlate_by("key")
1043 .complete_when(|_| true)
1044 .build()
1045 .unwrap();
1046 let mut svc = new_test_svc(config);
1047 let ex = make_exchange("key", "X", "body");
1048 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1049 assert_eq!(
1050 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1051 Some(&serde_json::json!("predicate"))
1052 );
1053 }
1054
1055 #[tokio::test(start_paused = true)]
1056 async fn test_size_completes_before_timeout() {
1057 let config = AggregatorConfig::correlate_by("key")
1058 .complete_on_size_or_timeout(2, Duration::from_millis(200))
1059 .build()
1060 .unwrap();
1061 let mut svc = new_test_svc(config);
1062
1063 let ex1 = make_exchange("key", "A", "first");
1064 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
1065
1066 let ex2 = make_exchange("key", "A", "second");
1067 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1068
1069 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
1070 assert_eq!(
1071 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1072 Some(&serde_json::json!("size"))
1073 );
1074 assert_eq!(svc.buckets.lock().unwrap().len(), 0);
1075
1076 tokio::time::sleep(Duration::from_millis(300)).await;
1077 assert_eq!(
1078 svc.buckets.lock().unwrap().len(),
1079 0,
1080 "no re-fire after timeout"
1081 );
1082 }
1083
1084 #[tokio::test(start_paused = true)]
1085 async fn test_concurrent_timeout_fire_and_new_exchange() {
1086 let config = AggregatorConfig::correlate_by("key")
1087 .complete_on_size_or_timeout(2, Duration::from_millis(100))
1088 .build()
1089 .unwrap();
1090 let (tx, mut rx) = mpsc::channel(256);
1091 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1092 let cancel = CancellationToken::new();
1093 let mut svc = AggregatorService::new(config, tx, registry, cancel);
1094
1095 let ex = make_exchange("key", "A", "data");
1096 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1097
1098 tokio::time::sleep(Duration::from_millis(150)).await;
1100
1101 let ex2 = make_exchange("key", "A", "data2");
1103 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1104 assert!(
1105 result.property(CAMEL_AGGREGATOR_PENDING).is_some(),
1106 "should be pending in new bucket"
1107 );
1108
1109 let mut late_count = 0;
1111 while rx.try_recv().is_ok() {
1112 late_count += 1;
1113 }
1114 assert_eq!(
1115 late_count, 1,
1116 "exactly 1 late emit from the timed-out bucket"
1117 );
1118 }
1119
1120 #[tokio::test(start_paused = true)]
1121 async fn test_late_channel_full_drops_with_warning() {
1122 let config = AggregatorConfig::correlate_by("key")
1123 .complete_on_timeout(Duration::from_millis(50))
1124 .build()
1125 .unwrap();
1126 let (tx, mut rx) = mpsc::channel(1);
1127 rx.close();
1128 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1129 let cancel = CancellationToken::new();
1130 let mut svc = AggregatorService::new(config, tx, registry, cancel);
1131
1132 let ex = make_exchange("key", "A", "data");
1133 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1134
1135 tokio::time::sleep(Duration::from_millis(100)).await;
1136 assert_eq!(
1137 svc.buckets.lock().unwrap().len(),
1138 0,
1139 "bucket removed despite channel closed"
1140 );
1141 }
1142
1143 #[tokio::test]
1144 async fn test_aggregate_stream_bodies_creates_valid_json() {
1145 use bytes::Bytes;
1146 use camel_api::{Body, StreamBody, StreamMetadata};
1147 use futures::stream;
1148 use tokio::sync::Mutex;
1149
1150 let chunks = vec![Ok(Bytes::from("test"))];
1151 let stream_body = StreamBody {
1152 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1153 metadata: StreamMetadata {
1154 origin: Some("file:///test.txt".to_string()),
1155 ..Default::default()
1156 },
1157 };
1158
1159 let ex1 = Exchange::new(Message {
1160 headers: Default::default(),
1161 body: Body::Stream(stream_body),
1162 });
1163
1164 let exchanges = vec![ex1];
1165 let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1166
1167 let exchange = result.expect("Expected Ok result");
1168 assert!(
1169 matches!(exchange.input.body, Body::Json(_)),
1170 "Expected Json body"
1171 );
1172
1173 if let Body::Json(value) = exchange.input.body {
1174 let json_str = serde_json::to_string(&value).unwrap();
1175 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1176
1177 assert!(parsed.is_array(), "Result should be an array");
1178 let arr = parsed.as_array().unwrap();
1179 assert!(arr[0].is_object(), "First element should be an object");
1180 assert!(
1181 arr[0]["_stream"].is_object(),
1182 "Should contain _stream object"
1183 );
1184 assert_eq!(arr[0]["_stream"]["origin"], "file:///test.txt");
1185 assert_eq!(
1186 arr[0]["_stream"]["placeholder"], true,
1187 "placeholder flag should be true"
1188 );
1189 }
1190 }
1191
1192 #[tokio::test]
1193 async fn test_aggregate_stream_bodies_with_none_origin() {
1194 use bytes::Bytes;
1195 use camel_api::{Body, StreamBody, StreamMetadata};
1196 use futures::stream;
1197 use tokio::sync::Mutex;
1198
1199 let chunks = vec![Ok(Bytes::from("test"))];
1200 let stream_body = StreamBody {
1201 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1202 metadata: StreamMetadata {
1203 origin: None,
1204 ..Default::default()
1205 },
1206 };
1207
1208 let ex1 = Exchange::new(Message {
1209 headers: Default::default(),
1210 body: Body::Stream(stream_body),
1211 });
1212
1213 let exchanges = vec![ex1];
1214 let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1215
1216 let exchange = result.expect("Expected Ok result");
1217 assert!(
1218 matches!(exchange.input.body, Body::Json(_)),
1219 "Expected Json body"
1220 );
1221
1222 if let Body::Json(value) = exchange.input.body {
1223 let json_str = serde_json::to_string(&value).unwrap();
1224 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1225
1226 assert!(parsed.is_array(), "Result should be an array");
1227 let arr = parsed.as_array().unwrap();
1228 assert!(arr[0].is_object(), "First element should be an object");
1229 assert!(
1230 arr[0]["_stream"].is_object(),
1231 "Should contain _stream object"
1232 );
1233 assert_eq!(
1234 arr[0]["_stream"]["origin"],
1235 serde_json::Value::Null,
1236 "origin should be null when None"
1237 );
1238 assert_eq!(
1239 arr[0]["_stream"]["placeholder"], true,
1240 "placeholder flag should be true"
1241 );
1242 }
1243 }
1244
1245 #[tokio::test]
1246 async fn timeout_completion_clears_handle_from_map() {
1247 let config = AggregatorConfig::correlate_by("key")
1252 .complete_on_timeout(Duration::from_millis(50))
1253 .build()
1254 .unwrap();
1255 let (tx, _rx) = mpsc::channel(256);
1256 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1257 let cancel = CancellationToken::new();
1258 let svc = AggregatorService::new(config, tx, registry, cancel);
1259
1260 let mut call_svc = svc.clone();
1262 let ex = make_exchange("key", "A", "data");
1263 let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1264 assert!(
1265 !svc.timeout_handles.lock().unwrap().is_empty(),
1266 "handle should exist while timeout pending"
1267 );
1268
1269 tokio::time::sleep(Duration::from_millis(200)).await;
1271
1272 assert!(
1273 svc.timeout_handles.lock().unwrap().is_empty(),
1274 "handle should be cleared from map after natural timeout completion (was leak)"
1275 );
1276 }
1277
1278 #[tokio::test(start_paused = true)]
1279 async fn test_shutdown_awaits_timeout_handles() {
1280 let config = AggregatorConfig::correlate_by("key")
1281 .complete_on_timeout(Duration::from_millis(100))
1282 .build()
1283 .unwrap();
1284 let (tx, _rx) = mpsc::channel(256);
1285 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1286 let cancel = CancellationToken::new();
1287 let svc = AggregatorService::new(config, tx, registry, cancel);
1288
1289 let mut call_svc = svc.clone();
1291 let ex = make_exchange("key", "A", "data");
1292 let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1293
1294 assert!(
1296 !svc.timeout_handles.lock().unwrap().is_empty(),
1297 "should have a timeout handle"
1298 );
1299
1300 svc.shutdown().await;
1303
1304 assert!(
1305 svc.timeout_handles.lock().unwrap().is_empty(),
1306 "all handles should be cleaned up after shutdown"
1307 );
1308 }
1309}