Skip to main content

camel_processor/resequencer/
stream.rs

1//! Stream resequencing policy — bounded priority queue keyed by sequence
2//! number, gap detection with per-gap timeout, capacity cap, opt-in dedup.
3
4use std::collections::{BTreeMap, HashMap};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex, Weak};
7use std::time::Duration;
8
9use async_trait::async_trait;
10use camel_api::exchange::Exchange;
11use camel_api::resequencer::{CapacityPolicy, GapPolicy};
12use camel_language_api::Expression;
13use tokio::sync::mpsc;
14use tokio::task::JoinHandle;
15use tokio_util::sync::CancellationToken;
16
17use super::ResequencePolicy;
18
19/// Stream resequencing policy.
20///
21/// Uses a `BTreeMap<u64, Exchange>` as a priority queue keyed by sequence
22/// number. Tracks `next_expected` and emits the contiguous run whenever the
23/// expected sequence arrives. Gap timers fire when a missing sequence is not
24/// received within `gap_timeout`.
25pub struct StreamPolicy {
26    sequence_expr: Arc<dyn Expression>,
27    capacity: usize,
28    gap_timeout: Duration,
29    on_gap: GapPolicy,
30    on_capacity_exceeded: CapacityPolicy,
31    dedup: bool,
32
33    /// Weak self-reference so gap timer tasks can upgrade to `Arc<Self>`.
34    weak_self: Weak<Self>,
35
36    /// Priority queue keyed by sequence number (min-first via BTreeMap).
37    queue: Mutex<BTreeMap<u64, Exchange>>,
38
39    /// Next expected sequence number.
40    next_expected: Mutex<u64>,
41
42    /// Gap cancellation tokens, keyed by the missing sequence number.
43    gap_tokens: Mutex<HashMap<u64, CancellationToken>>,
44
45    /// Gap timer task handles, keyed by the missing sequence number.
46    gap_handles: Mutex<HashMap<u64, JoinHandle<()>>>,
47
48    /// Channel to the post-driver for gap-timer-triggered emissions.
49    driver_tx: Mutex<Option<mpsc::Sender<Exchange>>>,
50
51    /// Shutdown guard — gap tasks check this before sending.
52    shutdown_started: AtomicBool,
53}
54
55impl StreamPolicy {
56    /// Create a new `Arc<StreamPolicy>` using `Arc::new_cyclic`.
57    pub fn new_cyclic(
58        sequence_expr: Arc<dyn Expression>,
59        capacity: usize,
60        gap_timeout_ms: u64,
61        on_gap: GapPolicy,
62        on_capacity_exceeded: CapacityPolicy,
63        dedup: bool,
64    ) -> Arc<Self> {
65        Arc::new_cyclic(|weak| Self {
66            sequence_expr,
67            capacity,
68            gap_timeout: Duration::from_millis(gap_timeout_ms),
69            on_gap,
70            on_capacity_exceeded,
71            dedup,
72            weak_self: weak.clone(),
73            queue: Mutex::new(BTreeMap::new()),
74            next_expected: Mutex::new(1),
75            gap_tokens: Mutex::new(HashMap::new()),
76            gap_handles: Mutex::new(HashMap::new()),
77            driver_tx: Mutex::new(None),
78            shutdown_started: AtomicBool::new(false),
79        })
80    }
81
82    /// Set the driver channel (via `set_timeout_tx` trait method).
83    fn set_driver_tx(&self, tx: mpsc::Sender<Exchange>) {
84        let mut guard = self.driver_tx.lock().unwrap_or_else(|e| e.into_inner());
85        *guard = Some(tx);
86    }
87
88    /// Evaluate the sequence expression against an exchange.
89    async fn eval_seq(&self, exchange: &Exchange) -> Result<u64, String> {
90        let val = self
91            .sequence_expr
92            .evaluate(exchange)
93            .await
94            .map_err(|e| format!("sequence expression evaluation failed: {e}"))?;
95        match val {
96            serde_json::Value::Number(n) => n
97                .as_u64()
98                .ok_or_else(|| format!("sequence value must be a non-negative integer, got {n}")),
99            _ => Err(format!(
100                "sequence expression must evaluate to a number, got {}",
101                val
102            )),
103        }
104    }
105
106    /// Get the next expected sequence number.
107    fn next_expected(&self) -> u64 {
108        *self.next_expected.lock().unwrap_or_else(|e| e.into_inner())
109    }
110
111    /// Set the next expected sequence number.
112    fn set_next_expected(&self, v: u64) {
113        *self.next_expected.lock().unwrap_or_else(|e| e.into_inner()) = v;
114    }
115
116    /// Drain the contiguous tail from the queue, starting at `next_expected`.
117    /// Returns all consecutive exchanges in sequence order and advances `next_expected`.
118    fn drain_contiguous(&self) -> Vec<Exchange> {
119        let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
120        let mut expected = *self.next_expected.lock().unwrap_or_else(|e| e.into_inner());
121        let mut emitted = Vec::new();
122
123        while let Some(ex) = queue.remove(&expected) {
124            // Cancel any gap timer for this sequence
125            self.cancel_gap_timer(expected);
126            emitted.push(ex);
127            expected += 1;
128        }
129
130        *self.next_expected.lock().unwrap_or_else(|e| e.into_inner()) = expected;
131        emitted
132    }
133
134    /// Drain ALL held exchanges from the queue in sequence order, returning
135    /// `(held_exchanges, max_seq)` where `max_seq` is the largest sequence
136    /// number drained (0 if empty).
137    fn drain_all_with_max(&self) -> (Vec<Exchange>, u64) {
138        let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
139        let keys: Vec<u64> = queue.keys().copied().collect();
140        let max_seq = keys.iter().max().copied().unwrap_or(0);
141        let mut held = Vec::new();
142        for k in keys {
143            if let Some(ex) = queue.remove(&k) {
144                self.cancel_gap_timer(k);
145                held.push(ex);
146            }
147        }
148        (held, max_seq)
149    }
150
151    /// Check if a gap timer exists for a sequence number.
152    fn has_gap_timer(&self, seq: u64) -> bool {
153        let tokens = self.gap_tokens.lock().unwrap_or_else(|e| e.into_inner());
154        tokens.contains_key(&seq)
155    }
156
157    /// Cancel and remove a gap timer for a sequence number.
158    fn cancel_gap_timer(&self, seq: u64) {
159        {
160            let mut tokens = self.gap_tokens.lock().unwrap_or_else(|e| e.into_inner());
161            if let Some(token) = tokens.remove(&seq) {
162                token.cancel();
163            }
164        }
165        {
166            let mut handles = self.gap_handles.lock().unwrap_or_else(|e| e.into_inner());
167            handles.remove(&seq);
168        }
169    }
170
171    /// Cancel all gap timers.
172    fn cancel_all_gap_timers(&self) {
173        let tokens: HashMap<u64, CancellationToken> = {
174            let mut guard = self.gap_tokens.lock().unwrap_or_else(|e| e.into_inner());
175            std::mem::take(&mut *guard)
176        };
177        for (_, token) in tokens {
178            token.cancel();
179        }
180        {
181            let mut handles = self.gap_handles.lock().unwrap_or_else(|e| e.into_inner());
182            handles.clear();
183        }
184    }
185
186    /// Spawn a gap timer for the given missing sequence.
187    fn spawn_gap_timer(&self, missing_seq: u64) {
188        let cancel = CancellationToken::new();
189        let cancel_clone = cancel.clone();
190
191        {
192            let mut tokens = self.gap_tokens.lock().unwrap_or_else(|e| e.into_inner());
193            tokens.insert(missing_seq, cancel);
194        }
195
196        let weak = self.weak_self.clone();
197        let gap_timeout = self.gap_timeout;
198        let on_gap = self.on_gap;
199        let driver_tx_opt = {
200            let guard = self.driver_tx.lock().unwrap_or_else(|e| e.into_inner());
201            guard.clone()
202        };
203
204        let handle = tokio::spawn(async move {
205            tokio::select! {
206                _ = tokio::time::sleep(gap_timeout) => {
207                    if cancel_clone.is_cancelled() {
208                        return;
209                    }
210                }
211                _ = cancel_clone.cancelled() => {
212                    return;
213                }
214            }
215
216            let Some(policy) = weak.upgrade() else {
217                return;
218            };
219
220            if policy.shutdown_started.load(Ordering::SeqCst) {
221                return;
222            }
223
224            match on_gap {
225                GapPolicy::EmitPartial => {
226                    // Advance past the missing sequence, then drain only the
227                    // contiguous run from the new position. Preserves
228                    // resequencing purity: intermediate gaps stay held.
229                    policy.set_next_expected(missing_seq + 1);
230                    let emitted = policy.drain_contiguous();
231
232                    if emitted.is_empty() {
233                        return;
234                    }
235
236                    if let Some(tx) = &driver_tx_opt {
237                        for ex in emitted {
238                            if tx.send(ex).await.is_err() {
239                                break;
240                            }
241                        }
242                    }
243                }
244                GapPolicy::DropAndLog => {
245                    let (held, max_seq) = policy.drain_all_with_max();
246
247                    if held.is_empty() {
248                        return;
249                    }
250
251                    policy.set_next_expected(max_seq + 1);
252
253                    // Drop-and-log: log and drop (best-effort, no dead-letter sink)
254                    for ex in &held {
255                        tracing::warn!(
256                            correlation_id = %ex.correlation_id(),
257                            "stream resequencer: gap timeout — dropping held exchange (no dead-letter sink wired)"
258                        );
259                    }
260                    // Held exchanges are dropped
261                    let _ = held;
262                }
263            }
264
265            // Clean up handle and token entries (C2 fix: remove token too)
266            {
267                let mut handles = policy.gap_handles.lock().unwrap_or_else(|e| e.into_inner());
268                handles.remove(&missing_seq);
269            }
270            {
271                let mut tokens = policy.gap_tokens.lock().unwrap_or_else(|e| e.into_inner());
272                tokens.remove(&missing_seq);
273            }
274        });
275
276        {
277            let mut handles = self.gap_handles.lock().unwrap_or_else(|e| e.into_inner());
278            handles.insert(missing_seq, handle);
279        }
280    }
281}
282
283#[async_trait]
284impl ResequencePolicy for StreamPolicy {
285    async fn accept(&self, input: Exchange) -> Vec<Exchange> {
286        let seq = match self.eval_seq(&input).await {
287            Ok(s) => s,
288            Err(e) => {
289                tracing::warn!(
290                    error = %e,
291                    correlation_id = %input.correlation_id(),
292                    "StreamPolicy: sequence expression failed, dropping exchange"
293                );
294                return vec![];
295            }
296        };
297
298        let expected = self.next_expected();
299
300        if seq == expected {
301            // C1: Cancel any gap timer that was armed for this expected sequence
302            // (e.g. next_expected=1, seq 2 arrived and armed timer for 1,
303            //  then seq 1 arrives — cancel the stale timer before draining).
304            self.cancel_gap_timer(expected);
305            // Advance next_expected past this sequence BEFORE draining
306            self.set_next_expected(seq + 1);
307            let mut emitted = vec![input];
308            emitted.append(&mut self.drain_contiguous());
309            emitted
310        } else if seq < expected {
311            // Late or duplicate
312            if self.dedup {
313                // Ignore duplicate
314                tracing::debug!(
315                    seq = seq,
316                    expected = expected,
317                    "StreamPolicy: ignoring duplicate/late sequence (dedup enabled)"
318                );
319                return vec![];
320            }
321            // dedup=false: insert anyway (Camel 4.x behavior)
322            {
323                let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
324                queue.insert(seq, input);
325            }
326            vec![]
327        } else {
328            // seq > expected: insert into queue + arm gap timer
329            {
330                let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
331                let queue_len = queue.len();
332
333                // I1: dedup check for held future seqs — redelivered held seq
334                // should be ignored when dedup is enabled.
335                if self.dedup && queue.contains_key(&seq) {
336                    tracing::debug!(
337                        seq = seq,
338                        "StreamPolicy: ignoring redelivered held sequence (dedup enabled)"
339                    );
340                    return vec![];
341                }
342
343                // Capacity overflow check
344                if queue_len >= self.capacity {
345                    match self.on_capacity_exceeded {
346                        CapacityPolicy::LogAndDrop => {
347                            tracing::warn!(
348                                seq = seq,
349                                capacity = self.capacity,
350                                "StreamPolicy: capacity exceeded, dropping incoming exchange"
351                            );
352                            return vec![];
353                        }
354                        CapacityPolicy::DropOldest => {
355                            // Remove the smallest seq in the queue
356                            let oldest_key = queue.keys().next().copied();
357                            if let Some(oldest) = oldest_key {
358                                let dropped = queue.remove(&oldest);
359                                self.cancel_gap_timer(oldest);
360                                tracing::debug!(
361                                    dropped_seq = oldest,
362                                    "StreamPolicy: capacity exceeded, dropped oldest exchange"
363                                );
364                                let _ = dropped;
365                            }
366                        }
367                    }
368                }
369
370                queue.insert(seq, input);
371            }
372
373            // Arm a gap timer for the expected (missing) sequence if not already armed
374            if !self.has_gap_timer(expected) {
375                self.spawn_gap_timer(expected);
376            }
377
378            vec![]
379        }
380    }
381
382    async fn flush(&self) -> Vec<Exchange> {
383        self.shutdown_started.store(true, Ordering::SeqCst);
384        self.cancel_all_gap_timers();
385
386        let mut queue = self.queue.lock().unwrap_or_else(|e| e.into_inner());
387        let keys: Vec<u64> = queue.keys().copied().collect();
388        let mut held = Vec::new();
389        for k in keys {
390            if let Some(ex) = queue.remove(&k) {
391                held.push(ex);
392            }
393        }
394        held
395    }
396
397    fn name(&self) -> &'static str {
398        "stream-resequencer"
399    }
400
401    fn set_timeout_tx(&self, tx: mpsc::Sender<Exchange>) {
402        self.set_driver_tx(tx);
403    }
404}
405
406// ── Tests ──
407
408#[cfg(test)]
409mod tests {
410    use super::*;
411    use camel_api::exchange::ExchangePattern;
412    use camel_api::message::Message;
413
414    /// Mock expression that reads a property by name.
415    struct PropExpr(String);
416
417    #[async_trait::async_trait]
418    impl Expression for PropExpr {
419        async fn evaluate(
420            &self,
421            exchange: &Exchange,
422        ) -> Result<serde_json::Value, camel_language_api::LanguageError> {
423            Ok(exchange
424                .property(&self.0)
425                .cloned()
426                .unwrap_or(serde_json::Value::Null))
427        }
428    }
429
430    fn mk_exchange(seq: u64) -> Exchange {
431        let mut ex = Exchange::new(Message::new(camel_api::body::Body::Text(format!(
432            "msg-{seq}"
433        ))));
434        ex.set_property("seq", serde_json::json!(seq));
435        ex.pattern = ExchangePattern::InOnly;
436        ex
437    }
438
439    fn default_policy() -> Arc<StreamPolicy> {
440        StreamPolicy::new_cyclic(
441            Arc::new(PropExpr("seq".into())),
442            100,
443            5000,
444            GapPolicy::EmitPartial,
445            CapacityPolicy::LogAndDrop,
446            false,
447        )
448    }
449
450    fn seq_of(ex: &Exchange) -> u64 {
451        ex.property("seq").and_then(|v| v.as_u64()).unwrap_or(0)
452    }
453
454    /// S1: expected=1; receive 2 (held), 1 (emit 1 then drain 2),
455    ///     4 (held), 3 (emit 3 then drain 4)
456    #[tokio::test]
457    async fn stream_emit_contiguous_run() {
458        let policy = default_policy();
459
460        // Receive 2 (held: next_expected=1, seq=2 > 1 → held)
461        let result2 = policy.accept(mk_exchange(2)).await;
462        assert!(
463            result2.is_empty(),
464            "seq 2 should be held (not yet contiguous)"
465        );
466
467        // Receive 1 (expected! → emit 1, then drain 2)
468        let result1 = policy.accept(mk_exchange(1)).await;
469        assert_eq!(result1.len(), 2, "should emit 1 then drain 2");
470        let seqs1: Vec<u64> = result1.iter().map(seq_of).collect();
471        assert_eq!(seqs1, vec![1, 2]);
472
473        // Receive 4 (held: next_expected=3, seq=4 > 3 → held)
474        let result4 = policy.accept(mk_exchange(4)).await;
475        assert!(result4.is_empty(), "seq 4 should be held");
476
477        // Receive 3 (expected! → emit 3, then drain 4)
478        let result3 = policy.accept(mk_exchange(3)).await;
479        assert_eq!(result3.len(), 2, "should emit 3 then drain 4");
480        let seqs3: Vec<u64> = result3.iter().map(seq_of).collect();
481        assert_eq!(seqs3, vec![3, 4]);
482    }
483
484    /// S2: expected=1; receive 2,3 (held); gap timer fires →
485    ///     emit held [2,3], advance expected to 4
486    #[tokio::test]
487    async fn stream_gap_timeout_emit_partial() {
488        let policy = StreamPolicy::new_cyclic(
489            Arc::new(PropExpr("seq".into())),
490            100,
491            50, // 50ms gap timeout
492            GapPolicy::EmitPartial,
493            CapacityPolicy::LogAndDrop,
494            false,
495        );
496
497        let (tx, mut rx) = mpsc::channel::<Exchange>(16);
498        policy.set_timeout_tx(tx);
499
500        // Receive 2 and 3 (held)
501        assert!(policy.accept(mk_exchange(2)).await.is_empty());
502        assert!(policy.accept(mk_exchange(3)).await.is_empty());
503
504        // Wait for gap timer to fire (seq 1 never arrives)
505        let emitted: Vec<Exchange> = tokio::time::timeout(Duration::from_millis(500), async {
506            let mut out = Vec::new();
507            out.push(rx.recv().await.unwrap());
508            out.push(rx.recv().await.unwrap());
509            out
510        })
511        .await
512        .expect("gap timer should fire within 500ms");
513
514        assert_eq!(emitted.len(), 2, "should emit all held exchanges");
515        let seqs: Vec<u64> = emitted.iter().map(seq_of).collect();
516        assert_eq!(seqs, vec![2, 3], "should emit in sequence order");
517
518        // next_expected should have advanced past the gap to max(2,3) + 1 = 4
519        assert_eq!(
520            policy.next_expected(),
521            4,
522            "next_expected should advance past gap to max(drained)+1"
523        );
524    }
525
526    /// S3: queue fills at capacity; next input triggers LogAndDrop
527    #[tokio::test]
528    async fn stream_capacity_exceeded_log_and_drop() {
529        let policy = StreamPolicy::new_cyclic(
530            Arc::new(PropExpr("seq".into())),
531            2, // tiny capacity
532            5000,
533            GapPolicy::EmitPartial,
534            CapacityPolicy::LogAndDrop,
535            false,
536        );
537
538        // Fill queue: seq 3, 4 (next_expected=1)
539        assert!(policy.accept(mk_exchange(3)).await.is_empty());
540        assert!(policy.accept(mk_exchange(4)).await.is_empty());
541
542        {
543            let queue = policy.queue.lock().unwrap();
544            assert_eq!(queue.len(), 2, "queue should be full");
545        }
546
547        // Next input (seq 5) should trigger LogAndDrop
548        let result = policy.accept(mk_exchange(5)).await;
549        assert!(
550            result.is_empty(),
551            "overflow exchange should be dead-lettered (empty result)"
552        );
553
554        {
555            let queue = policy.queue.lock().unwrap();
556            assert_eq!(queue.len(), 2, "queue should stay at capacity");
557        }
558    }
559
560    /// S4: queue fills; next input drops oldest
561    #[tokio::test]
562    async fn stream_capacity_exceeded_drop_oldest() {
563        let policy = StreamPolicy::new_cyclic(
564            Arc::new(PropExpr("seq".into())),
565            2,
566            5000,
567            GapPolicy::EmitPartial,
568            CapacityPolicy::DropOldest,
569            false,
570        );
571
572        // Fill queue: seq 3, 4
573        assert!(policy.accept(mk_exchange(3)).await.is_empty());
574        assert!(policy.accept(mk_exchange(4)).await.is_empty());
575
576        // Next input (seq 5) should drop oldest (seq 3), insert seq 5
577        let result = policy.accept(mk_exchange(5)).await;
578        assert!(
579            result.is_empty(),
580            "overflow with DropOldest should not emit"
581        );
582
583        {
584            let queue = policy.queue.lock().unwrap();
585            assert_eq!(queue.len(), 2, "queue should still be at capacity");
586            assert!(!queue.contains_key(&3), "oldest seq 3 should be dropped");
587            assert!(queue.contains_key(&4), "seq 4 should remain");
588            assert!(queue.contains_key(&5), "seq 5 should be inserted");
589        }
590    }
591
592    /// S5: with dedup=true, duplicate seq is ignored
593    #[tokio::test]
594    async fn stream_dedup_on_ignores_duplicate() {
595        let policy = StreamPolicy::new_cyclic(
596            Arc::new(PropExpr("seq".into())),
597            100,
598            5000,
599            GapPolicy::EmitPartial,
600            CapacityPolicy::LogAndDrop,
601            true, // dedup on
602        );
603
604        // Receive 1 (emit)
605        let result1 = policy.accept(mk_exchange(1)).await;
606        assert_eq!(result1.len(), 1, "seq 1 should be emitted");
607
608        // Receive 1 again — should be ignored (duplicate with dedup=true)
609        let result2 = policy.accept(mk_exchange(1)).await;
610        assert!(
611            result2.is_empty(),
612            "duplicate seq 1 should be ignored with dedup on"
613        );
614    }
615
616    /// S6: with dedup=false (default), duplicate seq is inserted
617    ///     (Camel 4.x behavior)
618    #[tokio::test]
619    async fn stream_dedup_off_inserts_duplicate() {
620        let policy = StreamPolicy::new_cyclic(
621            Arc::new(PropExpr("seq".into())),
622            100,
623            5000,
624            GapPolicy::EmitPartial,
625            CapacityPolicy::LogAndDrop,
626            false, // dedup off (default)
627        );
628
629        // Receive 1 (emit)
630        let result1 = policy.accept(mk_exchange(1)).await;
631        assert_eq!(result1.len(), 1, "seq 1 should be emitted");
632
633        // Receive 1 again — with dedup=false, duplicate is inserted
634        let result2 = policy.accept(mk_exchange(1)).await;
635        assert!(
636            result2.is_empty(),
637            "duplicate seq 1 with dedup off should be inserted, not emitted"
638        );
639
640        {
641            let queue = policy.queue.lock().unwrap();
642            assert!(
643                queue.contains_key(&1),
644                "duplicate seq 1 should be in queue (dedup off)"
645            );
646        }
647    }
648
649    /// S7: flush() emits remaining in seq order
650    #[tokio::test]
651    async fn stream_flush_emits_remaining_sorted() {
652        let policy = default_policy();
653
654        // Emit 1 first
655        assert!(!policy.accept(mk_exchange(1)).await.is_empty());
656
657        // Now next_expected=2. Hold 5, 3
658        assert!(policy.accept(mk_exchange(5)).await.is_empty());
659        assert!(policy.accept(mk_exchange(3)).await.is_empty());
660
661        // Flush
662        let flushed = policy.flush().await;
663        assert_eq!(flushed.len(), 2, "should emit all remaining held exchanges");
664        let seqs: Vec<u64> = flushed.iter().map(seq_of).collect();
665        assert_eq!(seqs, vec![3, 5], "should be in sequence order");
666    }
667
668    /// S8: seq < next_expected after advance → dedup behavior
669    #[tokio::test]
670    async fn stream_late_sequence_after_advance() {
671        // Policy with dedup=true
672        let policy_dedup = StreamPolicy::new_cyclic(
673            Arc::new(PropExpr("seq".into())),
674            100,
675            50,
676            GapPolicy::EmitPartial,
677            CapacityPolicy::LogAndDrop,
678            true,
679        );
680
681        // Advance past seq 1,2,3 by setting next_expected to 5
682        {
683            let mut ne = policy_dedup.next_expected.lock().unwrap();
684            *ne = 5;
685        }
686
687        // seq 3 < 5, dedup=true → ignored
688        let result = policy_dedup.accept(mk_exchange(3)).await;
689        assert!(
690            result.is_empty(),
691            "late seq with dedup=true should be ignored"
692        );
693
694        // Policy with dedup=false
695        let policy_no_dedup = StreamPolicy::new_cyclic(
696            Arc::new(PropExpr("seq".into())),
697            100,
698            50,
699            GapPolicy::EmitPartial,
700            CapacityPolicy::LogAndDrop,
701            false,
702        );
703
704        {
705            let mut ne = policy_no_dedup.next_expected.lock().unwrap();
706            *ne = 5;
707        }
708
709        // seq 3 < 5, dedup=false → inserted into queue
710        let result = policy_no_dedup.accept(mk_exchange(3)).await;
711        assert!(
712            result.is_empty(),
713            "late seq with dedup=false should be inserted"
714        );
715
716        {
717            let queue = policy_no_dedup.queue.lock().unwrap();
718            assert!(
719                queue.contains_key(&3),
720                "late seq should be in queue with dedup off"
721            );
722        }
723    }
724
725    /// M4/C1: next_expected=1 → seq 2 arrives (arms timer for 1) →
726    /// seq 1 arrives within timeout → timer 1 must NOT fire → no corruption
727    #[tokio::test]
728    async fn stream_stale_gap_timer_cancelled_on_normal_arrival() {
729        let policy = StreamPolicy::new_cyclic(
730            Arc::new(PropExpr("seq".into())),
731            100,
732            50, // 50ms gap timeout
733            GapPolicy::DropAndLog,
734            CapacityPolicy::LogAndDrop,
735            false,
736        );
737
738        let (tx, mut rx) = mpsc::channel::<Exchange>(16);
739        policy.set_timeout_tx(tx);
740
741        // next_expected=1, seq 2 arrives → held, gap timer armed for seq 1
742        assert!(policy.accept(mk_exchange(2)).await.is_empty());
743        assert!(
744            policy.has_gap_timer(1),
745            "gap timer should be armed for seq 1"
746        );
747
748        // seq 1 arrives within timeout → cancels timer, emits 1 and drains 2
749        let result = policy.accept(mk_exchange(1)).await;
750        assert_eq!(result.len(), 2, "should emit seq 1 and drained seq 2");
751        let seqs: Vec<u64> = result.iter().map(seq_of).collect();
752        assert_eq!(seqs, vec![1, 2]);
753
754        // Verify gap timer for seq 1 was cancelled (C1 fix)
755        assert!(
756            !policy.has_gap_timer(1),
757            "gap timer for seq 1 should be cancelled after normal arrival"
758        );
759
760        // Verify no stale timer fire: the mpsc channel should NOT receive a
761        // corrupt re-emission from the old gap timer.
762        tokio::time::sleep(Duration::from_millis(200)).await;
763        match rx.try_recv() {
764            Ok(ex) => {
765                panic!(
766                    "stale gap timer fired and sent exchange with seq={} — corruption!",
767                    seq_of(&ex)
768                );
769            }
770            Err(tokio::sync::mpsc::error::TryRecvError::Empty) => {
771                // Expected: no stale emission
772            }
773            Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => {}
774        }
775    }
776
777    /// M4/I1: dedup=true, seq 5 held, redeliver seq 5 → ignored (original preserved)
778    #[tokio::test]
779    async fn stream_dedup_held_future_seq() {
780        let policy = StreamPolicy::new_cyclic(
781            Arc::new(PropExpr("seq".into())),
782            100,
783            5000,
784            GapPolicy::EmitPartial,
785            CapacityPolicy::LogAndDrop,
786            true, // dedup on
787        );
788
789        // next_expected=1, send seq 5 → held
790        assert!(policy.accept(mk_exchange(5)).await.is_empty());
791
792        // Redeliver seq 5 → should be ignored with dedup=true
793        let result = policy.accept(mk_exchange(5)).await;
794        assert!(result.is_empty(), "redelivered held seq should be ignored");
795
796        // Queue should still contain only one seq 5
797        {
798            let queue = policy.queue.lock().unwrap();
799            assert_eq!(queue.len(), 1, "queue should still have exactly one entry");
800            assert!(queue.contains_key(&5), "seq 5 should still be in queue");
801        }
802    }
803}