1use std::collections::HashMap;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Arc, Mutex};
5use std::task::{Context, Poll};
6use std::time::{Duration, Instant};
7
8use tokio::sync::mpsc;
9use tokio::task::JoinHandle;
10use tokio_util::sync::CancellationToken;
11use tower::Service;
12
13use camel_api::{
14 CamelError,
15 aggregator::{
16 AggregationStrategy, AggregatorConfig, CompletionCondition, CompletionMode,
17 CompletionReason, CorrelationStrategy,
18 },
19 body::Body,
20 exchange::Exchange,
21 message::Message,
22};
23use camel_language_api::Language;
24
25pub type SharedLanguageRegistry = Arc<std::sync::Mutex<HashMap<String, Arc<dyn Language>>>>;
26
/// Property set to `true` on the placeholder exchange returned while a bucket is still open.
pub const CAMEL_AGGREGATOR_PENDING: &str = "CamelAggregatorPending";
/// Property holding the number of exchanges that were folded into a completed aggregate.
pub const CAMEL_AGGREGATED_SIZE: &str = "CamelAggregatedSize";
/// Property holding the correlation key value of a completed aggregate.
pub const CAMEL_AGGREGATED_KEY: &str = "CamelAggregatedKey";
/// Property holding the string form of the `CompletionReason` that closed the bucket.
pub const CAMEL_AGGREGATED_COMPLETION_REASON: &str = "CamelAggregatedCompletionReason";
31
32struct Bucket {
34 exchanges: Vec<Exchange>,
35 #[allow(dead_code)]
36 created_at: Instant,
37 last_updated: Instant,
38}
39
40impl Bucket {
41 fn new() -> Self {
42 let now = Instant::now();
43 Self {
44 exchanges: Vec::new(),
45 created_at: now,
46 last_updated: now,
47 }
48 }
49
50 fn push(&mut self, exchange: Exchange) {
51 self.exchanges.push(exchange);
52 self.last_updated = Instant::now();
53 }
54
55 #[allow(dead_code)]
56 fn len(&self) -> usize {
57 self.exchanges.len()
58 }
59
60 fn is_expired(&self, ttl: Duration) -> bool {
61 Instant::now().duration_since(self.last_updated) >= ttl
62 }
63}
64
65#[derive(Clone)]
66pub struct AggregatorService {
67 config: AggregatorConfig,
68 buckets: Arc<Mutex<HashMap<String, Bucket>>>,
69 timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
70 timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
71 late_tx: mpsc::Sender<Exchange>,
72 language_registry: SharedLanguageRegistry,
73 route_cancel: CancellationToken,
74}
75
76impl AggregatorService {
77 pub fn new(
78 config: AggregatorConfig,
79 late_tx: mpsc::Sender<Exchange>,
80 language_registry: SharedLanguageRegistry,
81 route_cancel: CancellationToken,
82 ) -> Self {
83 Self {
84 config,
85 buckets: Arc::new(Mutex::new(HashMap::new())),
86 timeout_tasks: Arc::new(Mutex::new(HashMap::new())),
87 timeout_handles: Arc::new(Mutex::new(HashMap::new())),
88 late_tx,
89 language_registry,
90 route_cancel,
91 }
92 }
93
94 pub fn has_timeout(&self) -> bool {
95 has_timeout_condition(&self.config.completion)
96 }
97
98 pub fn force_complete_all(&self) {
99 let mut buckets_guard = self.buckets.lock().unwrap_or_else(|e| e.into_inner());
100 let keys: Vec<String> = buckets_guard.keys().cloned().collect();
101
102 for key in keys {
103 if let Some(bucket) = buckets_guard.remove(&key) {
104 if self.config.force_completion_on_stop {
105 cancel_timeout_task_with_handle(
106 &key,
107 &self.timeout_tasks,
108 &self.timeout_handles,
109 );
110 match aggregate(bucket.exchanges, &self.config.strategy) {
111 Ok(mut result) => {
112 result.set_property(
113 CAMEL_AGGREGATED_COMPLETION_REASON,
114 serde_json::json!(CompletionReason::Stop.as_str()),
115 );
116 if self.late_tx.try_send(result).is_err() {
117 tracing::warn!(
118 key = %key,
119 "aggregator force-complete emit dropped: late channel full"
120 );
121 }
122 }
123 Err(e) => {
124 tracing::error!(
125 key = %key,
126 error = %e,
127 "aggregation failed in force_complete_all"
128 );
129 }
130 }
131 } else {
132 cancel_timeout_task_with_handle(
133 &key,
134 &self.timeout_tasks,
135 &self.timeout_handles,
136 );
137 }
138 }
139 }
140 }
141
142 pub async fn shutdown(&self) {
145 {
147 let mut guard = self.timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
148 for token in guard.values() {
149 token.cancel();
150 }
151 guard.clear();
152 };
153
154 let handles: Vec<JoinHandle<()>> = {
156 let mut guard = self
157 .timeout_handles
158 .lock()
159 .unwrap_or_else(|e| e.into_inner());
160 guard.drain().map(|(_, handle)| handle).collect()
161 };
162
163 if handles.is_empty() {
164 return;
165 }
166
167 let _ = tokio::time::timeout(Duration::from_secs(5), async {
169 for handle in handles {
170 let _ = handle.await;
171 }
172 })
173 .await;
174 }
175}
176
177pub fn has_timeout_condition(mode: &CompletionMode) -> bool {
178 match mode {
179 CompletionMode::Single(CompletionCondition::Timeout(_)) => true,
180 CompletionMode::Any(conditions) => conditions
181 .iter()
182 .any(|c| matches!(c, CompletionCondition::Timeout(_))),
183 _ => false,
184 }
185}
186
187impl Service<Exchange> for AggregatorService {
188 type Response = Exchange;
189 type Error = CamelError;
190 type Future = Pin<Box<dyn Future<Output = Result<Exchange, CamelError>> + Send>>;
191
192 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), CamelError>> {
193 Poll::Ready(Ok(()))
194 }
195
196 fn call(&mut self, exchange: Exchange) -> Self::Future {
197 let config = self.config.clone();
198 let buckets = Arc::clone(&self.buckets);
199 let timeout_tasks = Arc::clone(&self.timeout_tasks);
200 let timeout_handles = Arc::clone(&self.timeout_handles);
201 let late_tx = self.late_tx.clone();
202 let language_registry = Arc::clone(&self.language_registry);
203 let route_cancel = self.route_cancel.clone();
204
205 Box::pin(async move {
206 let key_value =
207 extract_correlation_key(&exchange, &config.correlation, &language_registry).await?;
208
209 let key_str = serde_json::to_string(&key_value)
210 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
211
212 let completed_bucket = {
213 let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
214
215 if let Some(ttl) = config.bucket_ttl {
216 guard.retain(|_, bucket| !bucket.is_expired(ttl));
217 }
218
219 if let Some(max) = config.max_buckets
220 && !guard.contains_key(&key_str)
221 && guard.len() >= max
222 {
223 tracing::warn!(
224 max_buckets = max,
225 correlation_key = %key_str,
226 "Aggregator reached max buckets limit, rejecting new correlation key"
227 );
228 return Err(CamelError::ProcessorError(format!(
229 "Aggregator reached maximum {} buckets",
230 max
231 )));
232 }
233
234 let bucket = guard.entry(key_str.clone()).or_insert_with(Bucket::new);
235 bucket.push(exchange);
236
237 let (is_complete, reason) =
238 check_sync_completion(&config.completion, &bucket.exchanges);
239
240 if is_complete {
241 let exchanges = guard.remove(&key_str).map(|b| b.exchanges);
242 (exchanges, reason)
243 } else {
244 (None, CompletionReason::Size) }
246 };
247
248 if completed_bucket.0.is_none() && has_timeout_condition(&config.completion) {
249 let cancel = {
250 let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
251 if let Some(existing) = tt_guard.get(&key_str) {
253 existing.cancel();
254 }
255 let token = CancellationToken::new();
256 tt_guard.insert(key_str.clone(), token.clone());
257 token
258 };
259
260 let timeout_dur = extract_timeout_duration(&config.completion);
261 if let Some(timeout) = timeout_dur {
262 {
264 let mut hh = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
265 if let Some(old) = hh.remove(&key_str) {
266 old.abort();
267 }
268 }
269 let handle = spawn_timeout_task(
270 key_str.clone(),
271 timeout,
272 cancel,
273 buckets.clone(),
274 timeout_tasks.clone(),
275 timeout_handles.clone(),
276 late_tx,
277 config.strategy.clone(),
278 config.discard_on_timeout,
279 route_cancel,
280 );
281 timeout_handles
282 .lock()
283 .unwrap_or_else(|e| e.into_inner())
284 .insert(key_str.clone(), handle);
285 }
286 }
287
288 if let Some(exchanges) = completed_bucket.0 {
289 cancel_timeout_task_with_handle(&key_str, &timeout_tasks, &timeout_handles);
290 let reason = completed_bucket.1;
291 let size = exchanges.len();
292 let mut result = aggregate(exchanges, &config.strategy)?;
293 result.set_property(CAMEL_AGGREGATED_SIZE, serde_json::json!(size as u64));
294 result.set_property(CAMEL_AGGREGATED_KEY, key_value);
295 result.set_property(
296 CAMEL_AGGREGATED_COMPLETION_REASON,
297 serde_json::json!(reason.as_str()),
298 );
299 Ok(result)
300 } else {
301 let mut pending = Exchange::new(Message {
302 headers: Default::default(),
303 body: Body::Empty,
304 });
305 pending.set_property(CAMEL_AGGREGATOR_PENDING, serde_json::json!(true));
306 Ok(pending)
307 }
308 })
309 }
310}
311
312async fn extract_correlation_key(
313 exchange: &Exchange,
314 strategy: &CorrelationStrategy,
315 registry: &SharedLanguageRegistry,
316) -> Result<serde_json::Value, CamelError> {
317 match strategy {
318 CorrelationStrategy::HeaderName(h) => {
319 exchange.input.headers.get(h).cloned().ok_or_else(|| {
320 CamelError::ProcessorError(format!(
321 "Aggregator: missing correlation key header '{}'",
322 h
323 ))
324 })
325 }
326 CorrelationStrategy::Expression { expr, language } => {
327 let expression = {
328 let reg = registry.lock().unwrap_or_else(|e| e.into_inner());
329 let lang = reg.get(language).ok_or_else(|| {
330 CamelError::ProcessorError(format!(
331 "Aggregator: language '{}' not found in registry",
332 language
333 ))
334 })?;
335 lang.create_expression(expr)
336 .map_err(|e| CamelError::ProcessorError(e.to_string()))?
337 };
338 let value = expression
339 .evaluate(exchange)
340 .await
341 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
342 if value.is_null() {
343 return Err(CamelError::ProcessorError(format!(
344 "Aggregator: correlation expression '{}' evaluated to null",
345 expr
346 )));
347 }
348 Ok(value)
349 }
350 CorrelationStrategy::Fn(f) => f(exchange).map(serde_json::Value::String).ok_or_else(|| {
351 CamelError::ProcessorError("Aggregator: correlation function returned None".to_string())
352 }),
353 }
354}
355
356fn check_sync_completion(
357 mode: &CompletionMode,
358 exchanges: &[Exchange],
359) -> (bool, CompletionReason) {
360 match mode {
361 CompletionMode::Single(cond) => check_single(cond, exchanges),
362 CompletionMode::Any(conditions) => {
363 for cond in conditions {
364 if let CompletionCondition::Timeout(_) = cond {
365 continue;
366 }
367 let (done, reason) = check_single(cond, exchanges);
368 if done {
369 return (true, reason);
370 }
371 }
372 (false, CompletionReason::Size)
373 }
374 }
375}
376
377fn check_single(cond: &CompletionCondition, exchanges: &[Exchange]) -> (bool, CompletionReason) {
378 match cond {
379 CompletionCondition::Size(n) => (exchanges.len() >= *n, CompletionReason::Size),
380 CompletionCondition::Predicate(pred) => (pred(exchanges), CompletionReason::Predicate),
381 CompletionCondition::Timeout(_) => (false, CompletionReason::Timeout),
382 }
383}
384
385fn extract_timeout_duration(mode: &CompletionMode) -> Option<Duration> {
386 match mode {
387 CompletionMode::Single(CompletionCondition::Timeout(d)) => Some(*d),
388 CompletionMode::Any(conditions) => conditions.iter().find_map(|c| {
389 if let CompletionCondition::Timeout(d) = c {
390 Some(*d)
391 } else {
392 None
393 }
394 }),
395 _ => None,
396 }
397}
398
399fn cancel_timeout_task(key: &str, timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>) {
400 let mut guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
401 if let Some(token) = guard.remove(key) {
402 token.cancel();
403 }
404}
405
406fn cancel_timeout_task_with_handle(
408 key: &str,
409 timeout_tasks: &Arc<Mutex<HashMap<String, CancellationToken>>>,
410 timeout_handles: &Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
411) {
412 cancel_timeout_task(key, timeout_tasks);
413 let mut guard = timeout_handles.lock().unwrap_or_else(|e| e.into_inner());
414 guard.remove(key);
415}
416
417#[allow(clippy::too_many_arguments)]
418fn spawn_timeout_task(
419 key: String,
420 timeout: Duration,
421 cancel: CancellationToken,
422 buckets: Arc<Mutex<HashMap<String, Bucket>>>,
423 timeout_tasks: Arc<Mutex<HashMap<String, CancellationToken>>>,
424 _timeout_handles: Arc<Mutex<HashMap<String, JoinHandle<()>>>>,
425 late_tx: mpsc::Sender<Exchange>,
426 strategy: AggregationStrategy,
427 discard: bool,
428 _route_cancel: CancellationToken,
429) -> JoinHandle<()> {
430 let cancel_clone = cancel.clone();
431 tokio::spawn(async move {
432 tokio::select! {
433 _ = tokio::time::sleep(timeout) => {
434 let should_proceed = {
435 let mut tt_guard = timeout_tasks.lock().unwrap_or_else(|e| e.into_inner());
436 if cancel_clone.is_cancelled() {
437 false
438 } else {
439 tt_guard.remove(&key);
440 true
441 }
442 };
443 if !should_proceed {
444 return;
445 }
446 let bucket_exchanges = {
447 let mut guard = buckets.lock().unwrap_or_else(|e| e.into_inner());
448 guard.remove(&key).map(|b| b.exchanges)
449 };
450 if let Some(exchanges) = bucket_exchanges
451 && !discard
452 {
453 match aggregate(exchanges, &strategy) {
454 Ok(mut result) => {
455 result.set_property(
456 CAMEL_AGGREGATED_COMPLETION_REASON,
457 serde_json::json!(CompletionReason::Timeout.as_str()),
458 );
459 if late_tx.try_send(result).is_err() {
460 tracing::warn!(
461 key = %key,
462 "aggregator timeout emit dropped: late channel full"
463 );
464 }
465 }
466 Err(e) => {
467 tracing::error!(
468 key = %key,
469 error = %e,
470 "aggregation failed in timeout task"
471 );
472 }
473 }
474 }
475 }
476 _ = cancel_clone.cancelled() => {}
477 }
478 })
479}
480
481fn aggregate(
482 exchanges: Vec<Exchange>,
483 strategy: &AggregationStrategy,
484) -> Result<Exchange, CamelError> {
485 match strategy {
486 AggregationStrategy::CollectAll => {
487 let bodies: Vec<serde_json::Value> = exchanges
488 .into_iter()
489 .map(|e| match e.input.body {
490 Body::Json(v) => v,
491 Body::Text(s) => serde_json::Value::String(s),
492 Body::Xml(s) => serde_json::Value::String(s),
493 Body::Bytes(b) => {
494 serde_json::Value::String(String::from_utf8_lossy(&b).into_owned())
495 }
496 Body::Empty => serde_json::Value::Null,
497 Body::Stream(s) => serde_json::json!({
498 "_stream": {
499 "origin": s.metadata.origin,
500 "placeholder": true,
501 "hint": "Materialize exchange body with .into_bytes() before aggregation if content needed"
502 }
503 }),
504 })
505 .collect();
506 Ok(Exchange::new(Message {
507 headers: Default::default(),
508 body: Body::Json(serde_json::Value::Array(bodies)),
509 }))
510 }
511 AggregationStrategy::Custom(f) => {
512 let mut iter = exchanges.into_iter();
513 let first = iter.next().ok_or_else(|| {
514 CamelError::ProcessorError("Aggregator: empty bucket".to_string())
515 })?;
516 Ok(iter.fold(first, |acc, next| f(acc, next)))
517 }
518 }
519}
520
521#[cfg(test)]
522mod tests {
523 use super::*;
524 use std::collections::HashMap;
525
526 use camel_api::{
527 aggregator::{AggregationStrategy, AggregatorConfig},
528 body::Body,
529 exchange::Exchange,
530 message::Message,
531 };
532 use tokio::sync::mpsc;
533 use tokio_util::sync::CancellationToken;
534 use tower::ServiceExt;
535
536 fn make_exchange(header: &str, value: &str, body: &str) -> Exchange {
537 let mut msg = Message {
538 headers: Default::default(),
539 body: Body::Text(body.to_string()),
540 };
541 msg.headers
542 .insert(header.to_string(), serde_json::json!(value));
543 Exchange::new(msg)
544 }
545
546 fn config_size(n: usize) -> AggregatorConfig {
547 AggregatorConfig::correlate_by("orderId")
548 .complete_when_size(n)
549 .build()
550 .unwrap()
551 }
552
553 fn new_test_svc(config: AggregatorConfig) -> AggregatorService {
554 let (tx, _rx) = mpsc::channel(256);
555 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
556 let cancel = CancellationToken::new();
557 AggregatorService::new(config, tx, registry, cancel)
558 }
559
560 #[tokio::test]
561 async fn test_pending_exchange_not_yet_complete() {
562 let mut svc = new_test_svc(config_size(3));
563 let ex = make_exchange("orderId", "A", "first");
564 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
565 assert!(matches!(result.input.body, Body::Empty));
566 assert_eq!(
567 result.property(CAMEL_AGGREGATOR_PENDING),
568 Some(&serde_json::json!(true))
569 );
570 }
571
572 #[tokio::test]
573 async fn test_completes_on_size() {
574 let mut svc = new_test_svc(config_size(3));
575 for _ in 0..2 {
576 let ex = make_exchange("orderId", "A", "item");
577 let r = svc.ready().await.unwrap().call(ex).await.unwrap();
578 assert!(matches!(r.input.body, Body::Empty));
579 }
580 let ex = make_exchange("orderId", "A", "last");
581 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
582 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
583 assert_eq!(
584 result.property(CAMEL_AGGREGATED_SIZE),
585 Some(&serde_json::json!(3u64))
586 );
587 }
588
589 #[tokio::test]
590 async fn test_collect_all_produces_json_array() {
591 let mut svc = new_test_svc(config_size(2));
592 svc.ready()
593 .await
594 .unwrap()
595 .call(make_exchange("orderId", "A", "alpha"))
596 .await
597 .unwrap();
598 let result = svc
599 .ready()
600 .await
601 .unwrap()
602 .call(make_exchange("orderId", "A", "beta"))
603 .await
604 .unwrap();
605 let Body::Json(v) = &result.input.body else {
606 panic!("expected Body::Json")
607 };
608 let arr = v.as_array().unwrap();
609 assert_eq!(arr.len(), 2);
610 assert_eq!(arr[0], serde_json::json!("alpha"));
611 assert_eq!(arr[1], serde_json::json!("beta"));
612 }
613
614 #[tokio::test]
615 async fn test_two_keys_independent_buckets() {
616 let mut svc = new_test_svc(config_size(3));
618 svc.ready()
619 .await
620 .unwrap()
621 .call(make_exchange("orderId", "A", "a1"))
622 .await
623 .unwrap();
624 svc.ready()
625 .await
626 .unwrap()
627 .call(make_exchange("orderId", "B", "b1"))
628 .await
629 .unwrap();
630 svc.ready()
631 .await
632 .unwrap()
633 .call(make_exchange("orderId", "A", "a2"))
634 .await
635 .unwrap();
636 let ra = svc
638 .ready()
639 .await
640 .unwrap()
641 .call(make_exchange("orderId", "A", "a3"))
642 .await
643 .unwrap();
644 assert!(matches!(ra.input.body, Body::Json(_)));
646 let rb = svc
648 .ready()
649 .await
650 .unwrap()
651 .call(make_exchange("orderId", "B", "b_check"))
652 .await
653 .unwrap();
654 assert!(matches!(rb.input.body, Body::Empty));
655 }
656
657 #[tokio::test]
658 async fn test_bucket_resets_after_completion() {
659 let mut svc = new_test_svc(config_size(2));
660 svc.ready()
661 .await
662 .unwrap()
663 .call(make_exchange("orderId", "A", "x"))
664 .await
665 .unwrap();
666 svc.ready()
667 .await
668 .unwrap()
669 .call(make_exchange("orderId", "A", "x"))
670 .await
671 .unwrap(); let r = svc
674 .ready()
675 .await
676 .unwrap()
677 .call(make_exchange("orderId", "A", "new"))
678 .await
679 .unwrap();
680 assert!(matches!(r.input.body, Body::Empty)); }
682
683 #[tokio::test]
684 async fn test_completion_size_1_emits_immediately() {
685 let mut svc = new_test_svc(config_size(1));
686 let ex = make_exchange("orderId", "A", "solo");
687 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
688 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
689 }
690
691 #[tokio::test]
692 async fn test_custom_aggregation_strategy() {
693 use camel_api::aggregator::AggregationFn;
694 use std::sync::Arc;
695
696 let f: AggregationFn = Arc::new(|mut acc: Exchange, next: Exchange| {
697 let combined = format!(
698 "{}+{}",
699 acc.input.body.as_text().unwrap_or(""),
700 next.input.body.as_text().unwrap_or("")
701 );
702 acc.input.body = Body::Text(combined);
703 acc
704 });
705 let config = AggregatorConfig::correlate_by("key")
706 .complete_when_size(2)
707 .strategy(AggregationStrategy::Custom(f))
708 .build()
709 .unwrap();
710 let mut svc = new_test_svc(config);
711 svc.ready()
712 .await
713 .unwrap()
714 .call(make_exchange("key", "X", "hello"))
715 .await
716 .unwrap();
717 let result = svc
718 .ready()
719 .await
720 .unwrap()
721 .call(make_exchange("key", "X", "world"))
722 .await
723 .unwrap();
724 assert_eq!(result.input.body.as_text(), Some("hello+world"));
725 }
726
727 #[tokio::test]
728 async fn test_completion_predicate() {
729 let config = AggregatorConfig::correlate_by("key")
730 .complete_when(|bucket| {
731 bucket
732 .iter()
733 .any(|e| e.input.body.as_text() == Some("DONE"))
734 })
735 .build()
736 .unwrap();
737 let mut svc = new_test_svc(config);
738 svc.ready()
739 .await
740 .unwrap()
741 .call(make_exchange("key", "K", "first"))
742 .await
743 .unwrap();
744 svc.ready()
745 .await
746 .unwrap()
747 .call(make_exchange("key", "K", "second"))
748 .await
749 .unwrap();
750 let result = svc
751 .ready()
752 .await
753 .unwrap()
754 .call(make_exchange("key", "K", "DONE"))
755 .await
756 .unwrap();
757 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
758 }
759
760 #[tokio::test]
761 async fn test_missing_header_returns_error() {
762 let mut svc = new_test_svc(config_size(2));
763 let msg = Message {
764 headers: Default::default(),
765 body: Body::Text("no key".into()),
766 };
767 let ex = Exchange::new(msg);
768 let result = svc.ready().await.unwrap().call(ex).await;
769 assert!(result.is_err());
770 assert!(matches!(
771 result.unwrap_err(),
772 camel_api::CamelError::ProcessorError(_)
773 ));
774 }
775
776 #[tokio::test]
777 async fn test_cloned_service_shares_state() {
778 let svc1 = new_test_svc(config_size(2));
779 let mut svc2 = svc1.clone();
780 svc1.clone()
782 .ready()
783 .await
784 .unwrap()
785 .call(make_exchange("orderId", "A", "from-svc1"))
786 .await
787 .unwrap();
788 let result = svc2
790 .ready()
791 .await
792 .unwrap()
793 .call(make_exchange("orderId", "A", "from-svc2"))
794 .await
795 .unwrap();
796 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
797 }
798
799 #[tokio::test]
800 async fn test_camel_aggregated_key_property_set() {
801 let mut svc = new_test_svc(config_size(1));
802 let ex = make_exchange("orderId", "ORDER-42", "body");
803 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
804 assert_eq!(
805 result.property(CAMEL_AGGREGATED_KEY),
806 Some(&serde_json::json!("ORDER-42"))
807 );
808 }
809
810 #[tokio::test]
811 async fn test_aggregator_enforces_max_buckets() {
812 let config = AggregatorConfig::correlate_by("orderId")
813 .complete_when_size(2)
814 .max_buckets(3)
815 .build()
816 .unwrap();
817
818 let mut svc = new_test_svc(config);
819
820 for i in 0..3 {
822 let ex = make_exchange("orderId", &format!("key-{}", i), "body");
823 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
824 }
825
826 let ex = make_exchange("orderId", "key-4", "body");
828 let result = svc.ready().await.unwrap().call(ex).await;
829
830 assert!(result.is_err(), "Should reject when max buckets reached");
831 let err = result.unwrap_err().to_string();
832 assert!(
833 err.contains("maximum"),
834 "Error message should contain 'maximum': {}",
835 err
836 );
837 }
838
839 #[tokio::test]
840 async fn test_max_buckets_allows_existing_key() {
841 let config = AggregatorConfig::correlate_by("orderId")
842 .complete_when_size(5) .max_buckets(2)
844 .build()
845 .unwrap();
846
847 let mut svc = new_test_svc(config);
848
849 let ex1 = make_exchange("orderId", "key-A", "body1");
851 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
852 let ex2 = make_exchange("orderId", "key-B", "body2");
853 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
854
855 let ex3 = make_exchange("orderId", "key-A", "body3");
857 let result = svc.ready().await.unwrap().call(ex3).await;
858 assert!(
859 result.is_ok(),
860 "Should allow adding to existing bucket even at max limit"
861 );
862 }
863
864 #[tokio::test]
865 async fn test_bucket_ttl_eviction() {
866 let config = AggregatorConfig::correlate_by("orderId")
867 .complete_when_size(10) .bucket_ttl(Duration::from_millis(50))
869 .build()
870 .unwrap();
871
872 let mut svc = new_test_svc(config);
873
874 let ex1 = make_exchange("orderId", "key-A", "body1");
876 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
877
878 tokio::time::sleep(Duration::from_millis(100)).await;
880
881 let ex2 = make_exchange("orderId", "key-B", "body2");
883 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
884
885 let ex3 = make_exchange("orderId", "key-A", "body3");
888 let result = svc.ready().await.unwrap().call(ex3).await;
889 assert!(result.is_ok(), "Should be able to recreate evicted bucket");
890 }
891
892 #[tokio::test(start_paused = true)]
893 async fn test_timeout_completes_bucket() {
894 let config = AggregatorConfig::correlate_by("key")
895 .complete_on_timeout(Duration::from_millis(100))
896 .build()
897 .unwrap();
898 let mut svc = new_test_svc(config);
899 let ex = make_exchange("key", "A", "data");
900 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
901 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_some());
902
903 tokio::time::sleep(Duration::from_millis(200)).await;
904
905 assert_eq!(
906 svc.buckets.lock().unwrap().len(),
907 0,
908 "bucket should be removed after timeout"
909 );
910 }
911
912 #[tokio::test(start_paused = true)]
913 async fn test_timeout_resets_on_new_exchange() {
914 let config = AggregatorConfig::correlate_by("key")
915 .complete_on_timeout(Duration::from_millis(150))
916 .build()
917 .unwrap();
918 let mut svc = new_test_svc(config);
919
920 let ex1 = make_exchange("key", "A", "first");
921 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
922
923 tokio::time::sleep(Duration::from_millis(100)).await;
924
925 let ex2 = make_exchange("key", "A", "second");
926 let _ = svc.ready().await.unwrap().call(ex2).await.unwrap();
927
928 tokio::time::sleep(Duration::from_millis(100)).await;
929
930 assert_eq!(
931 svc.buckets.lock().unwrap().len(),
932 1,
933 "bucket should still exist — timeout was reset"
934 );
935
936 tokio::time::sleep(Duration::from_millis(100)).await;
937
938 assert_eq!(
939 svc.buckets.lock().unwrap().len(),
940 0,
941 "bucket should be gone after timeout fires"
942 );
943 }
944
945 #[tokio::test]
946 async fn test_composable_size_and_timeout() {
947 let config = AggregatorConfig::correlate_by("key")
948 .complete_on_size_or_timeout(2, Duration::from_millis(200))
949 .build()
950 .unwrap();
951 let mut svc = new_test_svc(config);
952
953 let ex1 = make_exchange("key", "A", "first");
954 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
955 assert!(svc.buckets.lock().unwrap().contains_key("\"A\""));
956
957 let ex2 = make_exchange("key", "A", "second");
958 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
959 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
960 assert_eq!(
961 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
962 Some(&serde_json::json!("size"))
963 );
964 }
965
966 #[tokio::test(start_paused = true)]
967 async fn test_discard_on_timeout() {
968 let config = AggregatorConfig::correlate_by("key")
969 .complete_on_timeout(Duration::from_millis(50))
970 .discard_on_timeout(true)
971 .build()
972 .unwrap();
973 let (tx, mut rx) = mpsc::channel(256);
974 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
975 let cancel = CancellationToken::new();
976 let mut svc = AggregatorService::new(config, tx, registry, cancel);
977
978 let ex = make_exchange("key", "A", "data");
979 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
980
981 tokio::time::sleep(Duration::from_millis(100)).await;
982
983 assert!(
984 rx.try_recv().is_err(),
985 "no emit expected with discard_on_timeout"
986 );
987 assert_eq!(svc.buckets.lock().unwrap().len(), 0);
988 assert!(
989 svc.timeout_tasks.lock().unwrap().is_empty(),
990 "timeout task should be cleaned up"
991 );
992 }
993
994 #[tokio::test]
995 async fn test_force_completion_on_stop() {
996 let config = AggregatorConfig::correlate_by("key")
997 .complete_when_size(10)
998 .force_completion_on_stop(true)
999 .build()
1000 .unwrap();
1001 let (tx, mut rx) = mpsc::channel(256);
1002 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1003 let cancel = CancellationToken::new();
1004 let svc = AggregatorService::new(config, tx, registry, cancel);
1005
1006 let mut call_svc = svc.clone();
1007 let ex = make_exchange("key", "A", "data");
1008 let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1009
1010 svc.force_complete_all();
1011
1012 let result = rx.try_recv().expect("should emit on force-complete");
1013 assert!(
1014 result.input.body.as_text().is_some() || matches!(result.input.body, Body::Json(_))
1015 );
1016 assert_eq!(
1017 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1018 Some(&serde_json::json!("stop"))
1019 );
1020 }
1021
1022 #[tokio::test]
1023 async fn test_completion_reason_property_size() {
1024 let config = AggregatorConfig::correlate_by("key")
1025 .complete_when_size(1)
1026 .build()
1027 .unwrap();
1028 let mut svc = new_test_svc(config);
1029 let ex = make_exchange("key", "X", "body");
1030 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1031 assert_eq!(
1032 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1033 Some(&serde_json::json!("size"))
1034 );
1035 }
1036
1037 #[tokio::test]
1038 async fn test_completion_reason_property_predicate() {
1039 let config = AggregatorConfig::correlate_by("key")
1040 .complete_when(|_| true)
1041 .build()
1042 .unwrap();
1043 let mut svc = new_test_svc(config);
1044 let ex = make_exchange("key", "X", "body");
1045 let result = svc.ready().await.unwrap().call(ex).await.unwrap();
1046 assert_eq!(
1047 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1048 Some(&serde_json::json!("predicate"))
1049 );
1050 }
1051
1052 #[tokio::test(start_paused = true)]
1053 async fn test_size_completes_before_timeout() {
1054 let config = AggregatorConfig::correlate_by("key")
1055 .complete_on_size_or_timeout(2, Duration::from_millis(200))
1056 .build()
1057 .unwrap();
1058 let mut svc = new_test_svc(config);
1059
1060 let ex1 = make_exchange("key", "A", "first");
1061 let _ = svc.ready().await.unwrap().call(ex1).await.unwrap();
1062
1063 let ex2 = make_exchange("key", "A", "second");
1064 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1065
1066 assert!(result.property(CAMEL_AGGREGATOR_PENDING).is_none());
1067 assert_eq!(
1068 result.property(CAMEL_AGGREGATED_COMPLETION_REASON),
1069 Some(&serde_json::json!("size"))
1070 );
1071 assert_eq!(svc.buckets.lock().unwrap().len(), 0);
1072
1073 tokio::time::sleep(Duration::from_millis(300)).await;
1074 assert_eq!(
1075 svc.buckets.lock().unwrap().len(),
1076 0,
1077 "no re-fire after timeout"
1078 );
1079 }
1080
1081 #[tokio::test(start_paused = true)]
1082 async fn test_concurrent_timeout_fire_and_new_exchange() {
1083 let config = AggregatorConfig::correlate_by("key")
1084 .complete_on_size_or_timeout(2, Duration::from_millis(100))
1085 .build()
1086 .unwrap();
1087 let (tx, mut rx) = mpsc::channel(256);
1088 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1089 let cancel = CancellationToken::new();
1090 let mut svc = AggregatorService::new(config, tx, registry, cancel);
1091
1092 let ex = make_exchange("key", "A", "data");
1093 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1094
1095 tokio::time::sleep(Duration::from_millis(150)).await;
1097
1098 let ex2 = make_exchange("key", "A", "data2");
1100 let result = svc.ready().await.unwrap().call(ex2).await.unwrap();
1101 assert!(
1102 result.property(CAMEL_AGGREGATOR_PENDING).is_some(),
1103 "should be pending in new bucket"
1104 );
1105
1106 let mut late_count = 0;
1108 while rx.try_recv().is_ok() {
1109 late_count += 1;
1110 }
1111 assert_eq!(
1112 late_count, 1,
1113 "exactly 1 late emit from the timed-out bucket"
1114 );
1115 }
1116
1117 #[tokio::test(start_paused = true)]
1118 async fn test_late_channel_full_drops_with_warning() {
1119 let config = AggregatorConfig::correlate_by("key")
1120 .complete_on_timeout(Duration::from_millis(50))
1121 .build()
1122 .unwrap();
1123 let (tx, mut rx) = mpsc::channel(1);
1124 rx.close();
1125 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1126 let cancel = CancellationToken::new();
1127 let mut svc = AggregatorService::new(config, tx, registry, cancel);
1128
1129 let ex = make_exchange("key", "A", "data");
1130 let _ = svc.ready().await.unwrap().call(ex).await.unwrap();
1131
1132 tokio::time::sleep(Duration::from_millis(100)).await;
1133 assert_eq!(
1134 svc.buckets.lock().unwrap().len(),
1135 0,
1136 "bucket removed despite channel closed"
1137 );
1138 }
1139
1140 #[tokio::test]
1141 async fn test_aggregate_stream_bodies_creates_valid_json() {
1142 use bytes::Bytes;
1143 use camel_api::{Body, StreamBody, StreamMetadata};
1144 use futures::stream;
1145 use tokio::sync::Mutex;
1146
1147 let chunks = vec![Ok(Bytes::from("test"))];
1148 let stream_body = StreamBody {
1149 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1150 metadata: StreamMetadata {
1151 origin: Some("file:///test.txt".to_string()),
1152 ..Default::default()
1153 },
1154 };
1155
1156 let ex1 = Exchange::new(Message {
1157 headers: Default::default(),
1158 body: Body::Stream(stream_body),
1159 });
1160
1161 let exchanges = vec![ex1];
1162 let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1163
1164 let exchange = result.expect("Expected Ok result");
1165 assert!(
1166 matches!(exchange.input.body, Body::Json(_)),
1167 "Expected Json body"
1168 );
1169
1170 if let Body::Json(value) = exchange.input.body {
1171 let json_str = serde_json::to_string(&value).unwrap();
1172 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1173
1174 assert!(parsed.is_array(), "Result should be an array");
1175 let arr = parsed.as_array().unwrap();
1176 assert!(arr[0].is_object(), "First element should be an object");
1177 assert!(
1178 arr[0]["_stream"].is_object(),
1179 "Should contain _stream object"
1180 );
1181 assert_eq!(arr[0]["_stream"]["origin"], "file:///test.txt");
1182 assert_eq!(
1183 arr[0]["_stream"]["placeholder"], true,
1184 "placeholder flag should be true"
1185 );
1186 }
1187 }
1188
1189 #[tokio::test]
1190 async fn test_aggregate_stream_bodies_with_none_origin() {
1191 use bytes::Bytes;
1192 use camel_api::{Body, StreamBody, StreamMetadata};
1193 use futures::stream;
1194 use tokio::sync::Mutex;
1195
1196 let chunks = vec![Ok(Bytes::from("test"))];
1197 let stream_body = StreamBody {
1198 stream: Arc::new(Mutex::new(Some(Box::pin(stream::iter(chunks))))),
1199 metadata: StreamMetadata {
1200 origin: None,
1201 ..Default::default()
1202 },
1203 };
1204
1205 let ex1 = Exchange::new(Message {
1206 headers: Default::default(),
1207 body: Body::Stream(stream_body),
1208 });
1209
1210 let exchanges = vec![ex1];
1211 let result = aggregate(exchanges, &AggregationStrategy::CollectAll);
1212
1213 let exchange = result.expect("Expected Ok result");
1214 assert!(
1215 matches!(exchange.input.body, Body::Json(_)),
1216 "Expected Json body"
1217 );
1218
1219 if let Body::Json(value) = exchange.input.body {
1220 let json_str = serde_json::to_string(&value).unwrap();
1221 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
1222
1223 assert!(parsed.is_array(), "Result should be an array");
1224 let arr = parsed.as_array().unwrap();
1225 assert!(arr[0].is_object(), "First element should be an object");
1226 assert!(
1227 arr[0]["_stream"].is_object(),
1228 "Should contain _stream object"
1229 );
1230 assert_eq!(
1231 arr[0]["_stream"]["origin"],
1232 serde_json::Value::Null,
1233 "origin should be null when None"
1234 );
1235 assert_eq!(
1236 arr[0]["_stream"]["placeholder"], true,
1237 "placeholder flag should be true"
1238 );
1239 }
1240 }
1241
1242 #[tokio::test(start_paused = true)]
1243 async fn test_shutdown_awaits_timeout_handles() {
1244 let config = AggregatorConfig::correlate_by("key")
1245 .complete_on_timeout(Duration::from_millis(100))
1246 .build()
1247 .unwrap();
1248 let (tx, _rx) = mpsc::channel(256);
1249 let registry: SharedLanguageRegistry = Arc::new(std::sync::Mutex::new(HashMap::new()));
1250 let cancel = CancellationToken::new();
1251 let svc = AggregatorService::new(config, tx, registry, cancel);
1252
1253 let mut call_svc = svc.clone();
1255 let ex = make_exchange("key", "A", "data");
1256 let _ = call_svc.ready().await.unwrap().call(ex).await.unwrap();
1257
1258 assert!(
1260 !svc.timeout_handles.lock().unwrap().is_empty(),
1261 "should have a timeout handle"
1262 );
1263
1264 svc.shutdown().await;
1267
1268 assert!(
1269 svc.timeout_handles.lock().unwrap().is_empty(),
1270 "all handles should be cleaned up after shutdown"
1271 );
1272 }
1273}