1use std::collections::{BTreeMap, HashMap};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex, Weak};
7use std::time::Duration;
8
9use async_trait::async_trait;
10use camel_api::exchange::Exchange;
11use camel_api::resequencer::{CapacityPolicy, GapPolicy};
12use camel_language_api::Expression;
13use tokio::sync::mpsc;
14use tokio::task::JoinHandle;
15use tokio_util::sync::CancellationToken;
16
17use super::ResequencePolicy;
18
/// Resequencing policy that buffers out-of-order exchanges in a sorted queue
/// and releases them when the next expected sequence number arrives or a gap
/// timer fires.
pub struct StreamPolicy {
    /// Expression evaluated against each exchange to obtain its sequence number.
    sequence_expr: Arc<dyn Expression>,
    /// Queue length at which `on_capacity_exceeded` is applied to
    /// ahead-of-sequence arrivals.
    capacity: usize,
    /// How long a gap timer sleeps before acting on a missing sequence number.
    gap_timeout: Duration,
    /// Action a gap timer takes once it fires.
    on_gap: GapPolicy,
    /// Action taken when an ahead-of-sequence exchange arrives at capacity.
    on_capacity_exceeded: CapacityPolicy,
    /// When true, late and already-held sequence numbers are ignored.
    dedup: bool,

    /// Weak back-reference cloned into gap-timer tasks; a timer whose policy
    /// has been dropped fails to upgrade it and exits.
    weak_self: Weak<Self>,

    /// Held exchanges keyed (and therefore ordered) by sequence number.
    queue: Mutex<BTreeMap<u64, Exchange>>,

    /// Next sequence number to emit; initialised to 1 by `new_cyclic`.
    next_expected: Mutex<u64>,

    /// Cancellation tokens of armed gap timers, keyed by the missing sequence number.
    gap_tokens: Mutex<HashMap<u64, CancellationToken>>,

    /// Join handles of spawned gap-timer tasks, keyed by the missing sequence number.
    gap_handles: Mutex<HashMap<u64, JoinHandle<()>>>,

    /// Sender gap-timer tasks use to forward released exchanges; `None` until
    /// `set_driver_tx` is called.
    driver_tx: Mutex<Option<mpsc::Sender<Exchange>>>,

    /// Set by `flush`; gap timers that fire afterwards return without acting.
    shutdown_started: AtomicBool,
}
54
55impl StreamPolicy {
56 pub fn new_cyclic(
58 sequence_expr: Arc<dyn Expression>,
59 capacity: usize,
60 gap_timeout_ms: u64,
61 on_gap: GapPolicy,
62 on_capacity_exceeded: CapacityPolicy,
63 dedup: bool,
64 ) -> Arc<Self> {
65 Arc::new_cyclic(|weak| Self {
66 sequence_expr,
67 capacity,
68 gap_timeout: Duration::from_millis(gap_timeout_ms),
69 on_gap,
70 on_capacity_exceeded,
71 dedup,
72 weak_self: weak.clone(),
73 queue: Mutex::new(BTreeMap::new()),
74 next_expected: Mutex::new(1),
75 gap_tokens: Mutex::new(HashMap::new()),
76 gap_handles: Mutex::new(HashMap::new()),
77 driver_tx: Mutex::new(None),
78 shutdown_started: AtomicBool::new(false),
79 })
80 }
81
82 fn set_driver_tx(&self, tx: mpsc::Sender<Exchange>) {
84 let mut guard = self.driver_tx.lock().unwrap_or_else(|e| e.into_inner());
85 *guard = Some(tx);
86 }
87
88 async fn eval_seq(&self, exchange: &Exchange) -> Result<u64, String> {
90 let val = self
91 .sequence_expr
92 .evaluate(exchange)
93 .await
94 .map_err(|e| format!("sequence expression evaluation failed: {e}"))?;
95 match val {
96 serde_json::Value::Number(n) => n
97 .as_u64()
98 .ok_or_else(|| format!("sequence value must be a non-negative integer, got {n}")),
99 _ => Err(format!(
100 "sequence expression must evaluate to a number, got {}",
101 val
102 )),
103 }
104 }
105
106 fn next_expected(&self) -> u64 {
108 *self.next_expected.lock().unwrap_or_else(|e| e.into_inner())
109 }
110
111 fn set_next_expected(&self, v: u64) {
113 *self.next_expected.lock().unwrap_or_else(|e| e.into_inner()) = v;
114 }
115
116 fn drain_contiguous(&self) -> Vec<Exchange> {
119 let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
120 let mut expected = *self.next_expected.lock().unwrap_or_else(|e| e.into_inner());
121 let mut emitted = Vec::new();
122
123 while let Some(ex) = queue.remove(&expected) {
124 self.cancel_gap_timer(expected);
126 emitted.push(ex);
127 expected += 1;
128 }
129
130 *self.next_expected.lock().unwrap_or_else(|e| e.into_inner()) = expected;
131 emitted
132 }
133
134 fn drain_all_with_max(&self) -> (Vec<Exchange>, u64) {
138 let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
139 let keys: Vec<u64> = queue.keys().copied().collect();
140 let max_seq = keys.iter().max().copied().unwrap_or(0);
141 let mut held = Vec::new();
142 for k in keys {
143 if let Some(ex) = queue.remove(&k) {
144 self.cancel_gap_timer(k);
145 held.push(ex);
146 }
147 }
148 (held, max_seq)
149 }
150
151 fn has_gap_timer(&self, seq: u64) -> bool {
153 let tokens = self.gap_tokens.lock().unwrap_or_else(|e| e.into_inner());
154 tokens.contains_key(&seq)
155 }
156
157 fn cancel_gap_timer(&self, seq: u64) {
159 {
160 let mut tokens = self.gap_tokens.lock().unwrap_or_else(|e| e.into_inner());
161 if let Some(token) = tokens.remove(&seq) {
162 token.cancel();
163 }
164 }
165 {
166 let mut handles = self.gap_handles.lock().unwrap_or_else(|e| e.into_inner());
167 handles.remove(&seq);
168 }
169 }
170
171 fn cancel_all_gap_timers(&self) {
173 let tokens: HashMap<u64, CancellationToken> = {
174 let mut guard = self.gap_tokens.lock().unwrap_or_else(|e| e.into_inner());
175 std::mem::take(&mut *guard)
176 };
177 for (_, token) in tokens {
178 token.cancel();
179 }
180 {
181 let mut handles = self.gap_handles.lock().unwrap_or_else(|e| e.into_inner());
182 handles.clear();
183 }
184 }
185
186 fn spawn_gap_timer(&self, missing_seq: u64) {
188 let cancel = CancellationToken::new();
189 let cancel_clone = cancel.clone();
190
191 {
192 let mut tokens = self.gap_tokens.lock().unwrap_or_else(|e| e.into_inner());
193 tokens.insert(missing_seq, cancel);
194 }
195
196 let weak = self.weak_self.clone();
197 let gap_timeout = self.gap_timeout;
198 let on_gap = self.on_gap;
199 let driver_tx_opt = {
200 let guard = self.driver_tx.lock().unwrap_or_else(|e| e.into_inner());
201 guard.clone()
202 };
203
204 let handle = tokio::spawn(async move {
205 tokio::select! {
206 _ = tokio::time::sleep(gap_timeout) => {
207 if cancel_clone.is_cancelled() {
208 return;
209 }
210 }
211 _ = cancel_clone.cancelled() => {
212 return;
213 }
214 }
215
216 let Some(policy) = weak.upgrade() else {
217 return;
218 };
219
220 if policy.shutdown_started.load(Ordering::SeqCst) {
221 return;
222 }
223
224 match on_gap {
225 GapPolicy::EmitPartial => {
226 policy.set_next_expected(missing_seq + 1);
230 let emitted = policy.drain_contiguous();
231
232 if emitted.is_empty() {
233 return;
234 }
235
236 if let Some(tx) = &driver_tx_opt {
237 for ex in emitted {
238 if tx.send(ex).await.is_err() {
239 break;
240 }
241 }
242 }
243 }
244 GapPolicy::DropAndLog => {
245 let (held, max_seq) = policy.drain_all_with_max();
246
247 if held.is_empty() {
248 return;
249 }
250
251 policy.set_next_expected(max_seq + 1);
252
253 for ex in &held {
255 tracing::warn!(
256 correlation_id = %ex.correlation_id(),
257 "stream resequencer: gap timeout — dropping held exchange (no dead-letter sink wired)"
258 );
259 }
260 let _ = held;
262 }
263 }
264
265 {
267 let mut handles = policy.gap_handles.lock().unwrap_or_else(|e| e.into_inner());
268 handles.remove(&missing_seq);
269 }
270 {
271 let mut tokens = policy.gap_tokens.lock().unwrap_or_else(|e| e.into_inner());
272 tokens.remove(&missing_seq);
273 }
274 });
275
276 {
277 let mut handles = self.gap_handles.lock().unwrap_or_else(|e| e.into_inner());
278 handles.insert(missing_seq, handle);
279 }
280 }
281}
282
283#[async_trait]
284impl ResequencePolicy for StreamPolicy {
285 async fn accept(&self, input: Exchange) -> Vec<Exchange> {
286 let seq = match self.eval_seq(&input).await {
287 Ok(s) => s,
288 Err(e) => {
289 tracing::warn!(
290 error = %e,
291 correlation_id = %input.correlation_id(),
292 "StreamPolicy: sequence expression failed, dropping exchange"
293 );
294 return vec![];
295 }
296 };
297
298 let expected = self.next_expected();
299
300 if seq == expected {
301 self.cancel_gap_timer(expected);
305 self.set_next_expected(seq + 1);
307 let mut emitted = vec![input];
308 emitted.append(&mut self.drain_contiguous());
309 emitted
310 } else if seq < expected {
311 if self.dedup {
313 tracing::debug!(
315 seq = seq,
316 expected = expected,
317 "StreamPolicy: ignoring duplicate/late sequence (dedup enabled)"
318 );
319 return vec![];
320 }
321 {
323 let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
324 queue.insert(seq, input);
325 }
326 vec![]
327 } else {
328 {
330 let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
331 let queue_len = queue.len();
332
333 if self.dedup && queue.contains_key(&seq) {
336 tracing::debug!(
337 seq = seq,
338 "StreamPolicy: ignoring redelivered held sequence (dedup enabled)"
339 );
340 return vec![];
341 }
342
343 if queue_len >= self.capacity {
345 match self.on_capacity_exceeded {
346 CapacityPolicy::LogAndDrop => {
347 tracing::warn!(
348 seq = seq,
349 capacity = self.capacity,
350 "StreamPolicy: capacity exceeded, dropping incoming exchange"
351 );
352 return vec![];
353 }
354 CapacityPolicy::DropOldest => {
355 let oldest_key = queue.keys().next().copied();
357 if let Some(oldest) = oldest_key {
358 let dropped = queue.remove(&oldest);
359 self.cancel_gap_timer(oldest);
360 tracing::debug!(
361 dropped_seq = oldest,
362 "StreamPolicy: capacity exceeded, dropped oldest exchange"
363 );
364 let _ = dropped;
365 }
366 }
367 }
368 }
369
370 queue.insert(seq, input);
371 }
372
373 if !self.has_gap_timer(expected) {
375 self.spawn_gap_timer(expected);
376 }
377
378 vec![]
379 }
380 }
381
382 async fn flush(&self) -> Vec<Exchange> {
383 self.shutdown_started.store(true, Ordering::SeqCst);
384 self.cancel_all_gap_timers();
385
386 let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
387 let keys: Vec<u64> = queue.keys().copied().collect();
388 let mut held = Vec::new();
389 for k in keys {
390 if let Some(ex) = queue.remove(&k) {
391 held.push(ex);
392 }
393 }
394 held
395 }
396
397 fn name(&self) -> &'static str {
398 "stream-resequencer"
399 }
400
401 fn set_timeout_tx(&self, tx: mpsc::Sender<Exchange>) {
402 self.set_driver_tx(tx);
403 }
404}
405
406#[cfg(test)]
409mod tests {
410 use super::*;
411 use camel_api::exchange::ExchangePattern;
412 use camel_api::message::Message;
413
414 struct PropExpr(String);
416
417 #[async_trait::async_trait]
418 impl Expression for PropExpr {
419 async fn evaluate(
420 &self,
421 exchange: &Exchange,
422 ) -> Result<serde_json::Value, camel_language_api::LanguageError> {
423 Ok(exchange
424 .property(&self.0)
425 .cloned()
426 .unwrap_or(serde_json::Value::Null))
427 }
428 }
429
430 fn mk_exchange(seq: u64) -> Exchange {
431 let mut ex = Exchange::new(Message::new(camel_api::body::Body::Text(format!(
432 "msg-{seq}"
433 ))));
434 ex.set_property("seq", serde_json::json!(seq));
435 ex.pattern = ExchangePattern::InOnly;
436 ex
437 }
438
439 fn default_policy() -> Arc<StreamPolicy> {
440 StreamPolicy::new_cyclic(
441 Arc::new(PropExpr("seq".into())),
442 100,
443 5000,
444 GapPolicy::EmitPartial,
445 CapacityPolicy::LogAndDrop,
446 false,
447 )
448 }
449
450 fn seq_of(ex: &Exchange) -> u64 {
451 ex.property("seq").and_then(|v| v.as_u64()).unwrap_or(0)
452 }
453
454 #[tokio::test]
457 async fn stream_emit_contiguous_run() {
458 let policy = default_policy();
459
460 let result2 = policy.accept(mk_exchange(2)).await;
462 assert!(
463 result2.is_empty(),
464 "seq 2 should be held (not yet contiguous)"
465 );
466
467 let result1 = policy.accept(mk_exchange(1)).await;
469 assert_eq!(result1.len(), 2, "should emit 1 then drain 2");
470 let seqs1: Vec<u64> = result1.iter().map(seq_of).collect();
471 assert_eq!(seqs1, vec![1, 2]);
472
473 let result4 = policy.accept(mk_exchange(4)).await;
475 assert!(result4.is_empty(), "seq 4 should be held");
476
477 let result3 = policy.accept(mk_exchange(3)).await;
479 assert_eq!(result3.len(), 2, "should emit 3 then drain 4");
480 let seqs3: Vec<u64> = result3.iter().map(seq_of).collect();
481 assert_eq!(seqs3, vec![3, 4]);
482 }
483
484 #[tokio::test]
487 async fn stream_gap_timeout_emit_partial() {
488 let policy = StreamPolicy::new_cyclic(
489 Arc::new(PropExpr("seq".into())),
490 100,
491 50, GapPolicy::EmitPartial,
493 CapacityPolicy::LogAndDrop,
494 false,
495 );
496
497 let (tx, mut rx) = mpsc::channel::<Exchange>(16);
498 policy.set_timeout_tx(tx);
499
500 assert!(policy.accept(mk_exchange(2)).await.is_empty());
502 assert!(policy.accept(mk_exchange(3)).await.is_empty());
503
504 let emitted: Vec<Exchange> = tokio::time::timeout(Duration::from_millis(500), async {
506 let mut out = Vec::new();
507 out.push(rx.recv().await.unwrap());
508 out.push(rx.recv().await.unwrap());
509 out
510 })
511 .await
512 .expect("gap timer should fire within 500ms");
513
514 assert_eq!(emitted.len(), 2, "should emit all held exchanges");
515 let seqs: Vec<u64> = emitted.iter().map(seq_of).collect();
516 assert_eq!(seqs, vec![2, 3], "should emit in sequence order");
517
518 assert_eq!(
520 policy.next_expected(),
521 4,
522 "next_expected should advance past gap to max(drained)+1"
523 );
524 }
525
526 #[tokio::test]
528 async fn stream_capacity_exceeded_log_and_drop() {
529 let policy = StreamPolicy::new_cyclic(
530 Arc::new(PropExpr("seq".into())),
531 2, 5000,
533 GapPolicy::EmitPartial,
534 CapacityPolicy::LogAndDrop,
535 false,
536 );
537
538 assert!(policy.accept(mk_exchange(3)).await.is_empty());
540 assert!(policy.accept(mk_exchange(4)).await.is_empty());
541
542 {
543 let queue = policy.queue.lock().unwrap();
544 assert_eq!(queue.len(), 2, "queue should be full");
545 }
546
547 let result = policy.accept(mk_exchange(5)).await;
549 assert!(
550 result.is_empty(),
551 "overflow exchange should be dead-lettered (empty result)"
552 );
553
554 {
555 let queue = policy.queue.lock().unwrap();
556 assert_eq!(queue.len(), 2, "queue should stay at capacity");
557 }
558 }
559
560 #[tokio::test]
562 async fn stream_capacity_exceeded_drop_oldest() {
563 let policy = StreamPolicy::new_cyclic(
564 Arc::new(PropExpr("seq".into())),
565 2,
566 5000,
567 GapPolicy::EmitPartial,
568 CapacityPolicy::DropOldest,
569 false,
570 );
571
572 assert!(policy.accept(mk_exchange(3)).await.is_empty());
574 assert!(policy.accept(mk_exchange(4)).await.is_empty());
575
576 let result = policy.accept(mk_exchange(5)).await;
578 assert!(
579 result.is_empty(),
580 "overflow with DropOldest should not emit"
581 );
582
583 {
584 let queue = policy.queue.lock().unwrap();
585 assert_eq!(queue.len(), 2, "queue should still be at capacity");
586 assert!(!queue.contains_key(&3), "oldest seq 3 should be dropped");
587 assert!(queue.contains_key(&4), "seq 4 should remain");
588 assert!(queue.contains_key(&5), "seq 5 should be inserted");
589 }
590 }
591
592 #[tokio::test]
594 async fn stream_dedup_on_ignores_duplicate() {
595 let policy = StreamPolicy::new_cyclic(
596 Arc::new(PropExpr("seq".into())),
597 100,
598 5000,
599 GapPolicy::EmitPartial,
600 CapacityPolicy::LogAndDrop,
601 true, );
603
604 let result1 = policy.accept(mk_exchange(1)).await;
606 assert_eq!(result1.len(), 1, "seq 1 should be emitted");
607
608 let result2 = policy.accept(mk_exchange(1)).await;
610 assert!(
611 result2.is_empty(),
612 "duplicate seq 1 should be ignored with dedup on"
613 );
614 }
615
616 #[tokio::test]
619 async fn stream_dedup_off_inserts_duplicate() {
620 let policy = StreamPolicy::new_cyclic(
621 Arc::new(PropExpr("seq".into())),
622 100,
623 5000,
624 GapPolicy::EmitPartial,
625 CapacityPolicy::LogAndDrop,
626 false, );
628
629 let result1 = policy.accept(mk_exchange(1)).await;
631 assert_eq!(result1.len(), 1, "seq 1 should be emitted");
632
633 let result2 = policy.accept(mk_exchange(1)).await;
635 assert!(
636 result2.is_empty(),
637 "duplicate seq 1 with dedup off should be inserted, not emitted"
638 );
639
640 {
641 let queue = policy.queue.lock().unwrap();
642 assert!(
643 queue.contains_key(&1),
644 "duplicate seq 1 should be in queue (dedup off)"
645 );
646 }
647 }
648
649 #[tokio::test]
651 async fn stream_flush_emits_remaining_sorted() {
652 let policy = default_policy();
653
654 assert!(!policy.accept(mk_exchange(1)).await.is_empty());
656
657 assert!(policy.accept(mk_exchange(5)).await.is_empty());
659 assert!(policy.accept(mk_exchange(3)).await.is_empty());
660
661 let flushed = policy.flush().await;
663 assert_eq!(flushed.len(), 2, "should emit all remaining held exchanges");
664 let seqs: Vec<u64> = flushed.iter().map(seq_of).collect();
665 assert_eq!(seqs, vec![3, 5], "should be in sequence order");
666 }
667
668 #[tokio::test]
670 async fn stream_late_sequence_after_advance() {
671 let policy_dedup = StreamPolicy::new_cyclic(
673 Arc::new(PropExpr("seq".into())),
674 100,
675 50,
676 GapPolicy::EmitPartial,
677 CapacityPolicy::LogAndDrop,
678 true,
679 );
680
681 {
683 let mut ne = policy_dedup.next_expected.lock().unwrap();
684 *ne = 5;
685 }
686
687 let result = policy_dedup.accept(mk_exchange(3)).await;
689 assert!(
690 result.is_empty(),
691 "late seq with dedup=true should be ignored"
692 );
693
694 let policy_no_dedup = StreamPolicy::new_cyclic(
696 Arc::new(PropExpr("seq".into())),
697 100,
698 50,
699 GapPolicy::EmitPartial,
700 CapacityPolicy::LogAndDrop,
701 false,
702 );
703
704 {
705 let mut ne = policy_no_dedup.next_expected.lock().unwrap();
706 *ne = 5;
707 }
708
709 let result = policy_no_dedup.accept(mk_exchange(3)).await;
711 assert!(
712 result.is_empty(),
713 "late seq with dedup=false should be inserted"
714 );
715
716 {
717 let queue = policy_no_dedup.queue.lock().unwrap();
718 assert!(
719 queue.contains_key(&3),
720 "late seq should be in queue with dedup off"
721 );
722 }
723 }
724
725 #[tokio::test]
728 async fn stream_stale_gap_timer_cancelled_on_normal_arrival() {
729 let policy = StreamPolicy::new_cyclic(
730 Arc::new(PropExpr("seq".into())),
731 100,
732 50, GapPolicy::DropAndLog,
734 CapacityPolicy::LogAndDrop,
735 false,
736 );
737
738 let (tx, mut rx) = mpsc::channel::<Exchange>(16);
739 policy.set_timeout_tx(tx);
740
741 assert!(policy.accept(mk_exchange(2)).await.is_empty());
743 assert!(
744 policy.has_gap_timer(1),
745 "gap timer should be armed for seq 1"
746 );
747
748 let result = policy.accept(mk_exchange(1)).await;
750 assert_eq!(result.len(), 2, "should emit seq 1 and drained seq 2");
751 let seqs: Vec<u64> = result.iter().map(seq_of).collect();
752 assert_eq!(seqs, vec![1, 2]);
753
754 assert!(
756 !policy.has_gap_timer(1),
757 "gap timer for seq 1 should be cancelled after normal arrival"
758 );
759
760 tokio::time::sleep(Duration::from_millis(200)).await;
763 match rx.try_recv() {
764 Ok(ex) => {
765 panic!(
766 "stale gap timer fired and sent exchange with seq={} — corruption!",
767 seq_of(&ex)
768 );
769 }
770 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
771 }
773 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {}
774 }
775 }
776
777 #[tokio::test]
779 async fn stream_dedup_held_future_seq() {
780 let policy = StreamPolicy::new_cyclic(
781 Arc::new(PropExpr("seq".into())),
782 100,
783 5000,
784 GapPolicy::EmitPartial,
785 CapacityPolicy::LogAndDrop,
786 true, );
788
789 assert!(policy.accept(mk_exchange(5)).await.is_empty());
791
792 let result = policy.accept(mk_exchange(5)).await;
794 assert!(result.is_empty(), "redelivered held seq should be ignored");
795
796 {
798 let queue = policy.queue.lock().unwrap();
799 assert_eq!(queue.len(), 1, "queue should still have exactly one entry");
800 assert!(queue.contains_key(&5), "seq 5 should still be in queue");
801 }
802 }
803}