1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6use std::time::{Duration, Instant};
7
8use tokio::sync::mpsc;
9use tokio_util::sync::CancellationToken;
10use tower::Service;
11
12use camel_api::{
13 CamelError,
14 aggregator::{
15 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode,
16 CompletionReason, CorrelationStrategy,
17 },
18 body::Body,
19 exchange::Exchange,
20 message::Message,
21};
22use camel_language_api::Language;
23
24pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
25
26pub const CAMEL_AGGREGATOR_PENDING: &str = "CamelAggregatorPending";
27pub const CAMEL_AGGREGATED_SIZE: &str = "CamelAggregatedSize";
28pub const CAMEL_AGGREGATED_KEY: &str = "CamelAggregatedKey";
29pub const CAMEL_AGGREGATED_COMPLETION_REASON: &str = "CamelAggregatedCompletionReason";
30
31struct Bucket {
33 exchanges: Vec<Exchange>,
34 #[allow(dead_code)]
35 created_at: Instant,
36 last_updated: Instant,
37}
38
39impl Bucket {
40 fn new() -> Self {
41 let now = Instant::now();
42 Self {
43 exchanges: Vec::new(),
44 created_at: now,
45 last_updated: now,
46 }
47 }
48
49 fn push(&mut self, exchange: Exchange) {
50 self.exchanges.push(exchange);
51 self.last_updated = Instant::now();
52 }
53
54 #[allow(dead_code)]
55 fn len(&self) -> usize {
56 self.exchanges.len()
57 }
58
59 fn is_expired(&self, ttl: Duration) -> bool {
60 Instant::now().duration_since(self.last_updated) >= ttl
61 }
62}
63
64#[derive(Clone)]
65pub struct AggregatorService {
66 config: AggregatorConfig,
67 buckets: Arc<Mutex<HashMap<String, Bucket>>>,
68 timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
69 late_tx: mpsc::Sender<Exchange>,
70 language_registry: SharedLanguageRegistry,
71 route_cancel: CancellationToken,
72}
73
74impl AggregatorService {
75 pub fn new(
76 config: AggregatorConfig,
77 late_tx: mpsc::Sender<Exchange>,
78 language_registry: SharedLanguageRegistry,
79 route_cancel: CancellationToken,
80 ) -> Self {
81 Self {
82 config,
83 buckets: Arc::new(Mutex::new(HashMap::new())),
84 timeout_tasks: Arc::new(Mutex::new(HashMap::new())),
85 late_tx,
86 language_registry,
87 route_cancel,
88 }
89 }
90
91 pub fn has_timeout(&self) -> bool {
92 has_timeout_condition(&self.config.completion)
93 }
94
95 pub fn force_complete_all(&self) {
96 let mut buckets_guard = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
97 let keys: Vec<String> = buckets_guard.keys().cloned().collect();
98
99 for key in keys {
100 if let Some(bucket) = buckets_guard.remove(&key) {
101 if self.config.force_completion_on_stop {
102 cancel_timeout_task(&key, &self.timeout_tasks);
103 match aggregate(bucket.exchanges, &self.config.strategy) {
104 Ok(mut result) => {
105 result.set_property(
106 CAMEL_AGGREGATED_COMPLETION_REASON,
107 serde_json::json!(CompletionReason::Stop.as_str()),
108 );
109 if self.late_tx.try_send(result).is_err() {
110 tracing::warn!(
111 key = %key,
112 "aggregator force-complete emit dropped: late channel full"
113 );
114 }
115 }
116 Err(e) => {
117 tracing::error!(
118 key = %key,
119 error = %e,
120 "aggregation failed in force_complete_all"
121 );
122 }
123 }
124 } else {
125 cancel_timeout_task(&key, &self.timeout_tasks);
126 }
127 }
128 }
129 }
130}
131
132pub fn has_timeout_condition(mode: &CompletionMode) -> bool {
133 match mode {
134 CompletionMode::Single(CompletionCondition::Timeout(_)) => true,
135 CompletionMode::Any(conditions) => conditions
136 .iter()
137 .any(|c| matches!(c, CompletionCondition::Timeout(_))),
138 _ => false,
139 }
140}
141
142impl Service<Exchange> for AggregatorService {
143 type Response = Exchange;
144 type Error = CamelError;
145 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
146
147 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
148 Poll::Ready(Ok(()))
149 }
150
151 fn call(&mut self, exchange: Exchange) -> Self::Future {
152 let config = self.config.clone();
153 let buckets = Arc::clone(&self.buckets);
154 let timeout_tasks = Arc::clone(&self.timeout_tasks);
155 let late_tx = self.late_tx.clone();
156 let language_registry = Arc::clone(&self.language_registry);
157 let route_cancel = self.route_cancel.clone();
158
159 Box::pin(async move {
160 let key_value =
161 extract_correlation_key(&exchange, &config.correlation, &language_registry)?;
162
163 let key_str = serde_json::to_string(&key_value)
164 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
165
166 let completed_bucket = {
167 let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
168
169 if let Some(ttl) = config.bucket_ttl {
170 guard.retain(|_, bucket| !bucket.is_expired(ttl));
171 }
172
173 if let Some(max) = config.max_buckets
174 && !guard.contains_key(&key_str)
175 && guard.len() >= max
176 {
177 tracing::warn!(
178 max_buckets = max,
179 correlation_key = %key_str,
180 "Aggregator reached max buckets limit, rejecting new correlation key"
181 );
182 return Err(CamelError::ProcessorError(format!(
183 "Aggregator reached maximum {} buckets",
184 max
185 )));
186 }
187
188 let bucket = guard.entry(key_str.clone()).or_insert_with(Bucket::new);
189 bucket.push(exchange);
190
191 let (is_complete, reason) =
192 check_sync_completion(&config.completion, &bucket.exchanges);
193
194 if is_complete {
195 let exchanges = guard.remove(&key_str).map(|b| b.exchanges);
196 (exchanges, reason)
197 } else {
198 (None, CompletionReason::Size) }
200 };
201
202 if completed_bucket.0.is_none() && has_timeout_condition(&config.completion) {
203 let cancel = {
204 let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
205 if let Some(existing) = tt_guard.get(&key_str) {
206 existing.cancel();
207 }
208 let token = CancellationToken::new();
209 tt_guard.insert(key_str.clone(), token.clone());
210 token
211 };
212
213 let timeout_dur = extract_timeout_duration(&config.completion);
214 if let Some(timeout) = timeout_dur {
215 spawn_timeout_task(
216 key_str.clone(),
217 timeout,
218 cancel,
219 buckets.clone(),
220 timeout_tasks.clone(),
221 late_tx,
222 config.strategy.clone(),
223 config.discard_on_timeout,
224 route_cancel,
225 );
226 }
227 }
228
229 if let Some(exchanges) = completed_bucket.0 {
230 cancel_timeout_task(&key_str, &timeout_tasks);
231 let reason = completed_bucket.1;
232 let size = exchanges.len();
233 let mut result = aggregate(exchanges, &config.strategy)?;
234 result.set_property(CAMEL_AGGREGATED_SIZE, serde_json::json!(size as u64));
235 result.set_property(CAMEL_AGGREGATED_KEY, key_value);
236 result.set_property(
237 CAMEL_AGGREGATED_COMPLETION_REASON,
238 serde_json::json!(reason.as_str()),
239 );
240 Ok(result)
241 } else {
242 let mut pending = Exchange::new(Message {
243 headers: Default::default(),
244 body: Body::Empty,
245 });
246 pending.set_property(CAMEL_AGGREGATOR_PENDING, serde_json::json!(true));
247 Ok(pending)
248 }
249 })
250 }
251}
252
253fn extract_correlation_key(
254 exchange: &Exchange,
255 strategy: &CorrelationStrategy,
256 registry: &SharedLanguageRegistry,
257) -> Result<serde_json::Value, CamelError> {
258 match strategy {
259 CorrelationStrategy::HeaderName(h) => {
260 exchange.input.headers.get(h).cloned().ok_or_else(|| {
261 CamelError::ProcessorError(format!(
262 "Aggregator: missing correlation key header '{}'",
263 h
264 ))
265 })
266 }
267 CorrelationStrategy::Expression { expr, language } => {
268 let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
269 let lang = reg.get(language).ok_or_else(|| {
270 CamelError::ProcessorError(format!(
271 "Aggregator: language '{}' not found in registry",
272 language
273 ))
274 })?;
275 let expression = lang
276 .create_expression(expr)
277 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
278 let value = expression
279 .evaluate(exchange)
280 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
281 if value.is_null() {
282 return Err(CamelError::ProcessorError(format!(
283 "Aggregator: correlation expression '{}' evaluated to null",
284 expr
285 )));
286 }
287 Ok(value)
288 }
289 CorrelationStrategy::Fn(f) => f(exchange).map(serde_json::Value::String).ok_or_else(|| {
290 CamelError::ProcessorError("Aggregator: correlation function returned None".to_string())
291 }),
292 }
293}
294
295fn check_sync_completion(
296 mode: &CompletionMode,
297 exchanges: &[Exchange],
298) -> (bool, CompletionReason) {
299 match mode {
300 CompletionMode::Single(cond) => check_single(cond, exchanges),
301 CompletionMode::Any(conditions) => {
302 for cond in conditions {
303 if let CompletionCondition::Timeout(_) = cond {
304 continue;
305 }
306 let (done, reason) = check_single(cond, exchanges);
307 if done {
308 return (true, reason);
309 }
310 }
311 (false, CompletionReason::Size)
312 }
313 }
314}
315
316fn check_single(cond: &CompletionCondition, exchanges: &[Exchange]) -> (bool, CompletionReason) {
317 match cond {
318 CompletionCondition::Size(n) => (exchanges.len() >= *n, CompletionReason::Size),
319 CompletionCondition::Predicate(pred) => (pred(exchanges), CompletionReason::Predicate),
320 CompletionCondition::Timeout(_) => (false, CompletionReason::Timeout),
321 }
322}
323
324fn extract_timeout_duration(mode: &CompletionMode) -> Option<Duration> {
325 match mode {
326 CompletionMode::Single(CompletionCondition::Timeout(d)) => Some(*d),
327 CompletionMode::Any(conditions) => conditions.iter().find_map(|c| {
328 if let CompletionCondition::Timeout(d) = c {
329 Some(*d)
330 } else {
331 None
332 }
333 }),
334 _ => None,
335 }
336}
337
338fn cancel_timeout_task(key: &str, timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>) {
339 let mut guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
340 if let Some(token) = guard.remove(key) {
341 token.cancel();
342 }
343}
344
345#[allow(clippy::too_many_arguments)]
346fn spawn_timeout_task(
347 key: String,
348 timeout: Duration,
349 cancel: CancellationToken,
350 buckets: Arc<Mutex<HashMap<String, Bucket>>>,
351 timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
352 late_tx: mpsc::Sender<Exchange>,
353 strategy: AggregationStrategy,
354 discard: bool,
355 _route_cancel: CancellationToken,
356) {
357 let cancel_clone = cancel.clone();
358 tokio::spawn(async move {
359 tokio::select! {
360 _ = tokio::time::sleep(timeout) => {
361 let should_proceed = {
362 let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
363 if cancel_clone.is_cancelled() {
364 false
365 } else {
366 tt_guard.remove(&key);
367 true
368 }
369 };
370 if !should_proceed {
371 return;
372 }
373 let bucket_exchanges = {
374 let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
375 guard.remove(&key).map(|b| b.exchanges)
376 };
377 if let Some(exchanges) = bucket_exchanges
378 && !discard
379 {
380 match aggregate(exchanges, &strategy) {
381 Ok(mut result) => {
382 result.set_property(
383 CAMEL_AGGREGATED_COMPLETION_REASON,
384 serde_json::json!(CompletionReason::Timeout.as_str()),
385 );
386 if late_tx.try_send(result).is_err() {
387 tracing::warn!(
388 key = %key,
389 "aggregator timeout emit dropped: late channel full"
390 );
391 }
392 }
393 Err(e) => {
394 tracing::error!(
395 key = %key,
396 error = %e,
397 "aggregation failed in timeout task"
398 );
399 }
400 }
401 }
402 }
403 _ = cancel_clone.cancelled() => {}
404 }
405 });
406}
407
408fn aggregate(
409 exchanges: Vec<Exchange>,
410 strategy: &AggregationStrategy,
411) -> Result<Exchange, CamelError> {
412 match strategy {
413 AggregationStrategy::CollectAll => {
414 let bodies: Vec<serde_json::Value> = exchanges
415 .into_iter()
416 .map(|e| match e.input.body {
417 Body::Json(v) => v,
418 Body::Text(s) => serde_json::Value::String(s),
419 Body::Xml(s) => serde_json::Value::String(s),
420 Body::Bytes(b) => {
421 serde_json::Value::String(String::from_utf8_lossy(&b).into_owned())
422 }
423 Body::Empty => serde_json::Value::Null,
424 Body::Stream(s) => serde_json::json!({
425 "_stream": {
426 "origin": s.metadata.origin,
427 "placeholder": true,
428 "hint": "Materialize exchange body with .into_bytes() before aggregation if content needed"
429 }
430 }),
431 })
432 .collect();
433 Ok(Exchange::new(Message {
434 headers: Default::default(),
435 body: Body::Json(serde_json::Value::Array(bodies)),
436 }))
437 }
438 AggregationStrategy::Custom(f) => {
439 let mut iter = exchanges.into_iter();
440 let first = iter.next().ok_or_else(|| {
441 CamelError::ProcessorError("Aggregator: empty bucket".to_string())
442 })?;
443 Ok(iter.fold(first, |acc, next| f(acc, next)))
444 }
445 }
446}
447
448#[cfg(test)]
449mod tests {
450 use super::*;
451 use std::collections::HashMap;
452
453 use camel_api::{
454 aggregator::{AggregationStrategy, AggregatorConfig},
455 body::Body,
456 exchange::Exchange,
457 message::Message,
458 };
459 use tokio::sync::mpsc;
460 use tokio_util::sync::CancellationToken;
461 use tower::ServiceExt;
462
463 fn make_exchange(header: &str, value: &str, body: &str) -> Exchange {
464 let mut msg = Message {
465 headers: Default::default(),
466 body: Body::Text(body.to_string()),
467 };
468 msg.headers
469 .insert(header.to_string(), serde_json::json!(value));
470 Exchange::new(msg)
471 }
472
473 fn config_size(n: usize) -> AggregatorConfig {
474 AggregatorConfig::correlate_by("orderId")
475 .complete_when_size(n)
476 .build()
477 }
478
479 fn new_test_svc(config: AggregatorConfig) -> AggregatorService {
480 let (tx, _rx) = mpsc::channel(256);
481 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
482 let cancel = CancellationToken::new();
483 AggregatorService::new(config, tx, registry, cancel)
484 }
485
486 #[tokio::test]
487 async fn test_pending_exchange_not_yet_complete() {
488 let mut svc = new_test_svc(config_size(3));
489 let ex = make_exchange("orderId", "A", "first");
490 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
491 assert!(matches!(result.input.body, Body::Empty));
492 assert_eq!(
493 result.property(CAMEL_AGGREGATOR_PENDING),
494 Some(&serde_json::json!(true))
495 );
496 }
497
498 #[tokio::test]
499 async fn test_completes_on_size() {
500 let mut svc = new_test_svc(config_size(3));
501 for _ in 0..2 {
502 let ex = make_exchange("orderId", "A", "item");
503 let r = svc.ready().await.unwrap().call(ex).await.unwrap();
504 assert!(matches!(r.input.body, Body::Empty));
505 }
506 let ex = make_exchange("orderId", "A", "last");
507 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
508 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
509 assert_eq!(
510 result.property(CAMEL_AGGREGATED_SIZE),
511 Some(&serde_json::json!(3u64))
512 );
513 }
514
515 #[tokio::test]
516 async fn test_collect_all_produces_json_array() {
517 let mut svc = new_test_svc(config_size(2));
518 svc.ready()
519 .await
520 .unwrap()
521 .call(make_exchange("orderId", "A", "alpha"))
522 .await
523 .unwrap();
524 let result = svc
525 .ready()
526 .await
527 .unwrap()
528 .call(make_exchange("orderId", "A", "beta"))
529 .await
530 .unwrap();
531 let Body::Json(v) = &result.input.body else {
532 panic!("expected Body::Json")
533 };
534 let arr = v.as_array().unwrap();
535 assert_eq!(arr.len(), 2);
536 assert_eq!(arr[0], serde_json::json!("alpha"));
537 assert_eq!(arr[1], serde_json::json!("beta"));
538 }
539
540 #[tokio::test]
541 async fn test_two_keys_independent_buckets() {
542 let mut svc = new_test_svc(config_size(3));
544 svc.ready()
545 .await
546 .unwrap()
547 .call(make_exchange("orderId", "A", "a1"))
548 .await
549 .unwrap();
550 svc.ready()
551 .await
552 .unwrap()
553 .call(make_exchange("orderId", "B", "b1"))
554 .await
555 .unwrap();
556 svc.ready()
557 .await
558 .unwrap()
559 .call(make_exchange("orderId", "A", "a2"))
560 .await
561 .unwrap();
562 let ra = svc
564 .ready()
565 .await
566 .unwrap()
567 .call(make_exchange("orderId", "A", "a3"))
568 .await
569 .unwrap();
570 assert!(matches!(ra.input.body, Body::Json(_)));
572 let rb = svc
574 .ready()
575 .await
576 .unwrap()
577 .call(make_exchange("orderId", "B", "b_check"))
578 .await
579 .unwrap();
580 assert!(matches!(rb.input.body, Body::Empty));
581 }
582
583 #[tokio::test]
584 async fn test_bucket_resets_after_completion() {
585 let mut svc = new_test_svc(config_size(2));
586 svc.ready()
587 .await
588 .unwrap()
589 .call(make_exchange("orderId", "A", "x"))
590 .await
591 .unwrap();
592 svc.ready()
593 .await
594 .unwrap()
595 .call(make_exchange("orderId", "A", "x"))
596 .await
597 .unwrap(); let r = svc
600 .ready()
601 .await
602 .unwrap()
603 .call(make_exchange("orderId", "A", "new"))
604 .await
605 .unwrap();
606 assert!(matches!(r.input.body, Body::Empty)); }
608
609 #[tokio::test]
610 async fn test_completion_size_1_emits_immediately() {
611 let mut svc = new_test_svc(config_size(1));
612 let ex = make_exchange("orderId", "A", "solo");
613 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
614 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
615 }
616
617 #[tokio::test]
618 async fn test_custom_aggregation_strategy() {
619 use camel_api::aggregator::AggregationFn;
620 use std::sync::Arc;
621
622 let f: AggregationFn = Arc::new(|mut acc: Exchange, next: Exchange| {
623 let combined = format!(
624 "{}+{}",
625 acc.input.body.as_text().unwrap_or(""),
626 next.input.body.as_text().unwrap_or("")
627 );
628 acc.input.body = Body::Text(combined);
629 acc
630 });
631 let config = AggregatorConfig::correlate_by("key")
632 .complete_when_size(2)
633 .strategy(AggregationStrategy::Custom(f))
634 .build();
635 let mut svc = new_test_svc(config);
636 svc.ready()
637 .await
638 .unwrap()
639 .call(make_exchange("key", "X", "hello"))
640 .await
641 .unwrap();
642 let result = svc
643 .ready()
644 .await
645 .unwrap()
646 .call(make_exchange("key", "X", "world"))
647 .await
648 .unwrap();
649 assert_eq!(result.input.body.as_text(), Some("hello+world"));
650 }
651
652 #[tokio::test]
653 async fn test_completion_predicate() {
654 let config = AggregatorConfig::correlate_by("key")
655 .complete_when(|bucket| {
656 bucket
657 .iter()
658 .any(|e| e.input.body.as_text() == Some("DONE"))
659 })
660 .build();
661 let mut svc = new_test_svc(config);
662 svc.ready()
663 .await
664 .unwrap()
665 .call(make_exchange("key", "K", "first"))
666 .await
667 .unwrap();
668 svc.ready()
669 .await
670 .unwrap()
671 .call(make_exchange("key", "K", "second"))
672 .await
673 .unwrap();
674 let result = svc
675 .ready()
676 .await
677 .unwrap()
678 .call(make_exchange("key", "K", "DONE"))
679 .await
680 .unwrap();
681 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
682 }
683
684 #[tokio::test]
685 async fn test_missing_header_returns_error() {
686 let mut svc = new_test_svc(config_size(2));
687 let msg = Message {
688 headers: Default::default(),
689 body: Body::Text("no key".into()),
690 };
691 let ex = Exchange::new(msg);
692 let result = svc.ready().await.unwrap().call(ex).await;
693 assert!(result.is_err());
694 assert!(matches!(
695 result.unwrap_err(),
696 camel_api::CamelError::ProcessorError(_)
697 ));
698 }
699
700 #[tokio::test]
701 async fn test_cloned_service_shares_state() {
702 let svc1 = new_test_svc(config_size(2));
703 let mut svc2 = svc1.clone();
704 svc1.clone()
706 .ready()
707 .await
708 .unwrap()
709 .call(make_exchange("orderId", "A", "from-svc1"))
710 .await
711 .unwrap();
712 let result = svc2
714 .ready()
715 .await
716 .unwrap()
717 .call(make_exchange("orderId", "A", "from-svc2"))
718 .await
719 .unwrap();
720 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
721 }
722
723 #[tokio::test]
724 async fn test_camel_aggregated_key_property_set() {
725 let mut svc = new_test_svc(config_size(1));
726 let ex = make_exchange("orderId", "ORDER-42", "body");
727 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
728 assert_eq!(
729 result.property(CAMEL_AGGREGATED_KEY),
730 Some(&serde_json::json!("ORDER-42"))
731 );
732 }
733
734 #[tokio::test]
735 async fn test_aggregator_enforces_max_buckets() {
736 let config = AggregatorConfig::correlate_by("orderId")
737 .complete_when_size(2)
738 .max_buckets(3)
739 .build();
740
741 let mut svc = new_test_svc(config);
742
743 for i in 0..3 {
745 let ex = make_exchange("orderId", &format!("key-{}", i), "body");
746 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
747 }
748
749 let ex = make_exchange("orderId", "key-4", "body");
751 let result = svc.ready().await.unwrap().call(ex).await;
752
753 assert!(result.is_err(), "Should reject when max buckets reached");
754 let err = result.unwrap_err().to_string();
755 assert!(
756 err.contains("maximum"),
757 "Error message should contain 'maximum': {}",
758 err
759 );
760 }
761
762 #[tokio::test]
763 async fn test_max_buckets_allows_existing_key() {
764 let config = AggregatorConfig::correlate_by("orderId")
765 .complete_when_size(5) .max_buckets(2)
767 .build();
768
769 let mut svc = new_test_svc(config);
770
771 let ex1 = make_exchange("orderId", "key-A", "body1");
773 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
774 let ex2 = make_exchange("orderId", "key-B", "body2");
775 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
776
777 let ex3 = make_exchange("orderId", "key-A", "body3");
779 let result = svc.ready().await.unwrap().call(ex3).await;
780 assert!(
781 result.is_ok(),
782 "Should allow adding to existing bucket even at max limit"
783 );
784 }
785
786 #[tokio::test]
787 async fn test_bucket_ttl_eviction() {
788 let config = AggregatorConfig::correlate_by("orderId")
789 .complete_when_size(10) .bucket_ttl(Duration::from_millis(50))
791 .build();
792
793 let mut svc = new_test_svc(config);
794
795 let ex1 = make_exchange("orderId", "key-A", "body1");
797 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
798
799 tokio::time::sleep(Duration::from_millis(100)).await;
801
802 let ex2 = make_exchange("orderId", "key-B", "body2");
804 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
805
806 let ex3 = make_exchange("orderId", "key-A", "body3");
809 let result = svc.ready().await.unwrap().call(ex3).await;
810 assert!(result.is_ok(), "Should be able to recreate evicted bucket");
811 }
812
813 #[tokio::test(start_paused = true)]
814 async fn test_timeout_completes_bucket() {
815 let config = AggregatorConfig::correlate_by("key")
816 .complete_on_timeout(Duration::from_millis(100))
817 .build();
818 let mut svc = new_test_svc(config);
819 let ex = make_exchange("key", "A", "data");
820 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
821 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_some());
822
823 tokio::time::sleep(Duration::from_millis(200)).await;
824
825 assert_eq!(
826 svc.buckets.lock().unwrap().len(),
827 0,
828 "bucket should be removed after timeout"
829 );
830 }
831
832 #[tokio::test(start_paused = true)]
833 async fn test_timeout_resets_on_new_exchange() {
834 let config = AggregatorConfig::correlate_by("key")
835 .complete_on_timeout(Duration::from_millis(150))
836 .build();
837 let mut svc = new_test_svc(config);
838
839 let ex1 = make_exchange("key", "A", "first");
840 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
841
842 tokio::time::sleep(Duration::from_millis(100)).await;
843
844 let ex2 = make_exchange("key", "A", "second");
845 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
846
847 tokio::time::sleep(Duration::from_millis(100)).await;
848
849 assert_eq!(
850 svc.buckets.lock().unwrap().len(),
851 1,
852 "bucket should still exist — timeout was reset"
853 );
854
855 tokio::time::sleep(Duration::from_millis(100)).await;
856
857 assert_eq!(
858 svc.buckets.lock().unwrap().len(),
859 0,
860 "bucket should be gone after timeout fires"
861 );
862 }
863
864 #[tokio::test]
865 async fn test_composable_size_and_timeout() {
866 let config = AggregatorConfig::correlate_by("key")
867 .complete_on_size_or_timeout(2, Duration::from_millis(200))
868 .build();
869 let mut svc = new_test_svc(config);
870
871 let ex1 = make_exchange("key", "A", "first");
872 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
873 assert!(svc.buckets.lock().unwrap().contains_key("\"A\""));
874
875 let ex2 = make_exchange("key", "A", "second");
876 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
877 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
878 assert_eq!(
879 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
880 Some(&serde_json::json!("size"))
881 );
882 }
883
884 #[tokio::test(start_paused = true)]
885 async fn test_discard_on_timeout() {
886 let config = AggregatorConfig::correlate_by("key")
887 .complete_on_timeout(Duration::from_millis(50))
888 .discard_on_timeout(true)
889 .build();
890 let (tx, mut rx) = mpsc::channel(256);
891 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
892 let cancel = CancellationToken::new();
893 let mut svc = AggregatorService::new(config, tx, registry, cancel);
894
895 let ex = make_exchange("key", "A", "data");
896 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
897
898 tokio::time::sleep(Duration::from_millis(100)).await;
899
900 assert!(
901 rx.try_recv().is_err(),
902 "no emit expected with discard_on_timeout"
903 );
904 assert_eq!(svc.buckets.lock().unwrap().len(), 0);
905 assert!(
906 svc.timeout_tasks.lock().unwrap().is_empty(),
907 "timeout task should be cleaned up"
908 );
909 }
910
911 #[tokio::test]
912 async fn test_force_completion_on_stop() {
913 let config = AggregatorConfig::correlate_by("key")
914 .complete_when_size(10)
915 .force_completion_on_stop(true)
916 .build();
917 let (tx, mut rx) = mpsc::channel(256);
918 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
919 let cancel = CancellationToken::new();
920 let svc = AggregatorService::new(config, tx, registry, cancel);
921
922 let mut call_svc = svc.clone();
923 let ex = make_exchange("key", "A", "data");
924 let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
925
926 svc.force_complete_all();
927
928 let result = rx.try_recv().expect("should emit on force-complete");
929 assert!(
930 result.input.body.as_text().is_some() || matches!(result.input.body, Body::Json(_))
931 );
932 assert_eq!(
933 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
934 Some(&serde_json::json!("stop"))
935 );
936 }
937
938 #[tokio::test]
939 async fn test_completion_reason_property_size() {
940 let config = AggregatorConfig::correlate_by("key")
941 .complete_when_size(1)
942 .build();
943 let mut svc = new_test_svc(config);
944 let ex = make_exchange("key", "X", "body");
945 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
946 assert_eq!(
947 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
948 Some(&serde_json::json!("size"))
949 );
950 }
951
952 #[tokio::test]
953 async fn test_completion_reason_property_predicate() {
954 let config = AggregatorConfig::correlate_by("key")
955 .complete_when(|_| true)
956 .build();
957 let mut svc = new_test_svc(config);
958 let ex = make_exchange("key", "X", "body");
959 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
960 assert_eq!(
961 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
962 Some(&serde_json::json!("predicate"))
963 );
964 }
965
966 #[tokio::test(start_paused = true)]
967 async fn test_size_completes_before_timeout() {
968 let config = AggregatorConfig::correlate_by("key")
969 .complete_on_size_or_timeout(2, Duration::from_millis(200))
970 .build();
971 let mut svc = new_test_svc(config);
972
973 let ex1 = make_exchange("key", "A", "first");
974 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
975
976 let ex2 = make_exchange("key", "A", "second");
977 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
978
979 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
980 assert_eq!(
981 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
982 Some(&serde_json::json!("size"))
983 );
984 assert_eq!(svc.buckets.lock().unwrap().len(), 0);
985
986 tokio::time::sleep(Duration::from_millis(300)).await;
987 assert_eq!(
988 svc.buckets.lock().unwrap().len(),
989 0,
990 "no re-fire after timeout"
991 );
992 }
993
994 #[tokio::test(start_paused = true)]
995 async fn test_concurrent_timeout_fire_and_new_exchange() {
996 let config = AggregatorConfig::correlate_by("key")
997 .complete_on_size_or_timeout(2, Duration::from_millis(100))
998 .build();
999 let (tx, mut rx) = mpsc::channel(256);
1000 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1001 let cancel = CancellationToken::new();
1002 let mut svc = AggregatorService::new(config, tx, registry, cancel);
1003
1004 let ex = make_exchange("key", "A", "data");
1005 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1006
1007 tokio::time::sleep(Duration::from_millis(150)).await;
1009
1010 let ex2 = make_exchange("key", "A", "data2");
1012 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1013 assert!(
1014 result.property(CAMEL_AGGREGATOR_PENDING).is_some(),
1015 "should be pending in new bucket"
1016 );
1017
1018 let mut late_count = 0;
1020 while rx.try_recv().is_ok() {
1021 late_count += 1;
1022 }
1023 assert_eq!(
1024 late_count, 1,
1025 "exactly 1 late emit from the timed-out bucket"
1026 );
1027 }
1028
1029 #[tokio::test(start_paused = true)]
1030 async fn test_late_channel_full_drops_with_warning() {
1031 let config = AggregatorConfig::correlate_by("key")
1032 .complete_on_timeout(Duration::from_millis(50))
1033 .build();
1034 let (tx, mut rx) = mpsc::channel(1);
1035 rx.close();
1036 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1037 let cancel = CancellationToken::new();
1038 let mut svc = AggregatorService::new(config, tx, registry, cancel);
1039
1040 let ex = make_exchange("key", "A", "data");
1041 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1042
1043 tokio::time::sleep(Duration::from_millis(100)).await;
1044 assert_eq!(
1045 svc.buckets.lock().unwrap().len(),
1046 0,
1047 "bucket removed despite channel closed"
1048 );
1049 }
1050
1051 #[tokio::test]
1052 async fn test_aggregate_stream_bodies_creates_valid_json() {
1053 use bytes::Bytes;
1054 use camel_api::{Body, StreamBody, StreamMetadata};
1055 use futures::stream;
1056 use tokio::sync::Mutex;
1057
1058 let chunks = vec![Ok(Bytes::from("test"))];
1059 let stream_body = StreamBody {
1060 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1061 metadata: StreamMetadata {
1062 origin: Some("file:///test.txt".to_string()),
1063 ..Default::default()
1064 },
1065 };
1066
1067 let ex1 = Exchange::new(Message {
1068 headers: Default::default(),
1069 body: Body::Stream(stream_body),
1070 });
1071
1072 let exchanges = vec![ex1];
1073 let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1074
1075 let exchange = result.expect("Expected Ok result");
1076 assert!(
1077 matches!(exchange.input.body, Body::Json(_)),
1078 "Expected Json body"
1079 );
1080
1081 if let Body::Json(value) = exchange.input.body {
1082 let json_str = serde_json::to_string(&value).unwrap();
1083 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1084
1085 assert!(parsed.is_array(), "Result should be an array");
1086 let arr = parsed.as_array().unwrap();
1087 assert!(arr[0].is_object(), "First element should be an object");
1088 assert!(
1089 arr[0]["_stream"].is_object(),
1090 "Should contain _stream object"
1091 );
1092 assert_eq!(arr[0]["_stream"]["origin"], "file:///test.txt");
1093 assert_eq!(
1094 arr[0]["_stream"]["placeholder"], true,
1095 "placeholder flag should be true"
1096 );
1097 }
1098 }
1099
1100 #[tokio::test]
1101 async fn test_aggregate_stream_bodies_with_none_origin() {
1102 use bytes::Bytes;
1103 use camel_api::{Body, StreamBody, StreamMetadata};
1104 use futures::stream;
1105 use tokio::sync::Mutex;
1106
1107 let chunks = vec![Ok(Bytes::from("test"))];
1108 let stream_body = StreamBody {
1109 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1110 metadata: StreamMetadata {
1111 origin: None,
1112 ..Default::default()
1113 },
1114 };
1115
1116 let ex1 = Exchange::new(Message {
1117 headers: Default::default(),
1118 body: Body::Stream(stream_body),
1119 });
1120
1121 let exchanges = vec![ex1];
1122 let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1123
1124 let exchange = result.expect("Expected Ok result");
1125 assert!(
1126 matches!(exchange.input.body, Body::Json(_)),
1127 "Expected Json body"
1128 );
1129
1130 if let Body::Json(value) = exchange.input.body {
1131 let json_str = serde_json::to_string(&value).unwrap();
1132 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1133
1134 assert!(parsed.is_array(), "Result should be an array");
1135 let arr = parsed.as_array().unwrap();
1136 assert!(arr[0].is_object(), "First element should be an object");
1137 assert!(
1138 arr[0]["_stream"].is_object(),
1139 "Should contain _stream object"
1140 );
1141 assert_eq!(
1142 arr[0]["_stream"]["origin"],
1143 serde_json::Value::Null,
1144 "origin should be null when None"
1145 );
1146 assert_eq!(
1147 arr[0]["_stream"]["placeholder"], true,
1148 "placeholder flag should be true"
1149 );
1150 }
1151 }
1152}