Skip to main content

rns_core/channel/
mod.rs

1pub mod envelope;
2pub mod types;
3
4use alloc::collections::VecDeque;
5use alloc::vec::Vec;
6
7#[cfg(test)]
8use crate::constants::CHANNEL_SEQ_MAX;
9use crate::constants::{
10    CHANNEL_ENVELOPE_OVERHEAD, CHANNEL_FAST_RATE_THRESHOLD, CHANNEL_MAX_TRIES, CHANNEL_RTT_FAST,
11    CHANNEL_RTT_MEDIUM, CHANNEL_RTT_SLOW, CHANNEL_SEQ_MODULUS, CHANNEL_WINDOW,
12    CHANNEL_WINDOW_FLEXIBILITY, CHANNEL_WINDOW_MAX_FAST, CHANNEL_WINDOW_MAX_MEDIUM,
13    CHANNEL_WINDOW_MAX_SLOW, CHANNEL_WINDOW_MIN, CHANNEL_WINDOW_MIN_LIMIT_FAST,
14    CHANNEL_WINDOW_MIN_LIMIT_MEDIUM,
15};
16
17pub use types::{ChannelAction, ChannelError, MessageType, Sequence};
18
19use envelope::{pack_envelope, unpack_envelope};
20
21/// Internal envelope tracking state.
22struct Envelope {
23    sequence: Sequence,
24    raw: Vec<u8>,
25    tries: u8,
26    sent_at: f64,
27    delivered: bool,
28}
29
30/// Window-based reliable messaging channel.
31///
32/// Follows the action-queue model: `send`/`receive`/`tick` return
33/// `Vec<ChannelAction>`. The caller dispatches actions.
34pub struct Channel {
35    tx_ring: VecDeque<Envelope>,
36    rx_ring: VecDeque<Envelope>,
37    next_sequence: u16,
38    next_rx_sequence: u16,
39    window: u16,
40    window_max: u16,
41    window_min: u16,
42    window_flexibility: u16,
43    fast_rate_rounds: u16,
44    medium_rate_rounds: u16,
45    max_tries: u8,
46    rtt: f64,
47}
48
49impl Channel {
50    /// Create a new Channel with initial RTT.
51    pub fn new(initial_rtt: f64) -> Self {
52        let (window, window_max, window_min, window_flexibility) = if initial_rtt > CHANNEL_RTT_SLOW
53        {
54            (1, 1, 1, 1)
55        } else {
56            (
57                CHANNEL_WINDOW,
58                CHANNEL_WINDOW_MAX_SLOW,
59                CHANNEL_WINDOW_MIN,
60                CHANNEL_WINDOW_FLEXIBILITY,
61            )
62        };
63
64        Channel {
65            tx_ring: VecDeque::new(),
66            rx_ring: VecDeque::new(),
67            next_sequence: 0,
68            next_rx_sequence: 0,
69            window,
70            window_max,
71            window_min,
72            window_flexibility,
73            fast_rate_rounds: 0,
74            medium_rate_rounds: 0,
75            max_tries: CHANNEL_MAX_TRIES,
76            rtt: initial_rtt,
77        }
78    }
79
80    /// Update the RTT value.
81    pub fn set_rtt(&mut self, rtt: f64) {
82        self.rtt = rtt;
83    }
84
85    /// Maximum data unit available for message payload.
86    pub fn mdu(&self, link_mdu: usize) -> usize {
87        let mdu = link_mdu.saturating_sub(CHANNEL_ENVELOPE_OVERHEAD);
88        mdu.min(0xFFFF)
89    }
90
91    /// Check if channel is ready to send (has window capacity).
92    pub fn is_ready_to_send(&self) -> bool {
93        let outstanding = self.tx_ring.iter().filter(|e| !e.delivered).count() as u16;
94        outstanding < self.window
95    }
96
97    /// Send a message. Returns `SendOnLink` action with packed envelope.
98    pub fn send(
99        &mut self,
100        msgtype: u16,
101        payload: &[u8],
102        now: f64,
103        link_mdu: usize,
104    ) -> Result<Vec<ChannelAction>, ChannelError> {
105        if !self.is_ready_to_send() {
106            return Err(ChannelError::NotReady);
107        }
108
109        let sequence = self.next_sequence;
110        let raw = pack_envelope(msgtype, sequence, payload);
111        if raw.len() > link_mdu {
112            return Err(ChannelError::MessageTooBig);
113        }
114
115        self.next_sequence = ((self.next_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
116        self.tx_ring.push_back(Envelope {
117            sequence,
118            raw: raw.clone(),
119            tries: 1,
120            sent_at: now,
121            delivered: false,
122        });
123
124        Ok(alloc::vec![ChannelAction::SendOnLink { raw, sequence }])
125    }
126
127    /// Receive decrypted envelope bytes.
128    ///
129    /// Returns `MessageReceived` for contiguous sequences starting from
130    /// `next_rx_sequence`.
131    pub fn receive(&mut self, raw: &[u8], _now: f64) -> Vec<ChannelAction> {
132        let (_msgtype, sequence, _payload) = match unpack_envelope(raw) {
133            Ok(r) => r,
134            Err(_) => return Vec::new(),
135        };
136
137        // Reject sequences behind our window
138        if self.is_behind_rx_window(sequence) {
139            return Vec::new();
140        }
141
142        // Reject duplicates
143        if self.rx_ring.iter().any(|e| e.sequence == sequence) {
144            return Vec::new();
145        }
146
147        // Emplace in sorted order
148        let envelope = Envelope {
149            sequence,
150            raw: raw.to_vec(),
151            tries: 0,
152            sent_at: 0.0,
153            delivered: false,
154        };
155        self.emplace_rx(envelope);
156
157        // Collect contiguous messages
158        self.collect_contiguous()
159    }
160
161    /// Clear all outstanding TX entries, restoring the window to full capacity.
162    /// Used after holepunch completion where signaling messages are fire-and-forget.
163    pub fn flush_tx(&mut self) {
164        self.tx_ring.clear();
165    }
166
167    /// Cancel a send that did not reach the link layer.
168    pub fn cancel_send(&mut self, sequence: Sequence) -> bool {
169        let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) else {
170            return false;
171        };
172        self.tx_ring.remove(pos);
173        let expected_next = ((sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
174        if self.next_sequence == expected_next {
175            self.next_sequence = sequence;
176        }
177        true
178    }
179
180    /// Notify that a packet with given sequence was delivered (acknowledged).
181    pub fn packet_delivered(&mut self, sequence: Sequence) -> Vec<ChannelAction> {
182        if let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) {
183            self.tx_ring.remove(pos);
184
185            if self.window < self.window_max {
186                self.window += 1;
187            }
188
189            // Adapt window based on RTT
190            self.adapt_window_on_delivery();
191        }
192        Vec::new()
193    }
194
195    /// Notify that a packet with given sequence timed out.
196    pub fn packet_timeout(&mut self, sequence: Sequence, now: f64) -> Vec<ChannelAction> {
197        let pos = match self.tx_ring.iter().position(|e| e.sequence == sequence) {
198            Some(p) => p,
199            None => return Vec::new(),
200        };
201
202        let envelope = &self.tx_ring[pos];
203        if envelope.tries >= self.max_tries {
204            self.tx_ring.clear();
205            self.rx_ring.clear();
206            return alloc::vec![ChannelAction::TeardownLink];
207        }
208
209        // Retry
210        let envelope = &mut self.tx_ring[pos];
211        envelope.tries += 1;
212        envelope.sent_at = now;
213        let raw = envelope.raw.clone();
214
215        // Shrink window (Python nests window_max shrink inside window shrink)
216        if self.window > self.window_min {
217            self.window -= 1;
218            if self.window_max > self.window_min + self.window_flexibility {
219                self.window_max -= 1;
220            }
221        }
222
223        alloc::vec![ChannelAction::SendOnLink { raw, sequence }]
224    }
225
226    /// Compute timeout duration for the given try count.
227    ///
228    /// Formula: `1.5^(tries-1) * max(rtt*2.5, 0.025) * (tx_ring.len() + 1.5)`
229    pub fn get_packet_timeout(&self, tries: u8) -> f64 {
230        let base = 1.5_f64.powi((tries as i32) - 1);
231        let rtt_factor = (self.rtt * 2.5).max(0.025);
232        let ring_factor = (self.tx_ring.len() as f64) + 1.5;
233        base * rtt_factor * ring_factor
234    }
235
236    /// Get the current try count for a given sequence.
237    pub fn get_tries(&self, sequence: Sequence) -> Option<u8> {
238        self.tx_ring
239            .iter()
240            .find(|e| e.sequence == sequence)
241            .map(|e| e.tries)
242    }
243
244    /// Periodic maintenance for retransmissions and timeout handling.
245    pub fn tick(&mut self, now: f64) -> Vec<ChannelAction> {
246        let timed_out: Vec<Sequence> = self
247            .tx_ring
248            .iter()
249            .filter(|e| !e.delivered && now - e.sent_at >= self.get_packet_timeout(e.tries))
250            .map(|e| e.sequence)
251            .collect();
252
253        let mut actions = Vec::new();
254        for sequence in timed_out {
255            actions.extend(self.packet_timeout(sequence, now));
256        }
257        actions
258    }
259
260    /// Shut down the channel, clearing all rings.
261    pub fn shutdown(&mut self) {
262        self.tx_ring.clear();
263        self.rx_ring.clear();
264    }
265
266    /// Current window size.
267    pub fn window(&self) -> u16 {
268        self.window
269    }
270
271    /// Current maximum window size.
272    pub fn window_max(&self) -> u16 {
273        self.window_max
274    }
275
276    /// Number of outstanding (undelivered) envelopes in TX ring.
277    pub fn outstanding(&self) -> usize {
278        self.tx_ring.iter().filter(|e| !e.delivered).count()
279    }
280
281    // --- Internal ---
282
283    fn is_behind_rx_window(&self, sequence: Sequence) -> bool {
284        if sequence < self.next_rx_sequence {
285            let window_overflow = (self.next_rx_sequence as u32 + CHANNEL_WINDOW_MAX_FAST as u32)
286                % CHANNEL_SEQ_MODULUS;
287            let overflow = window_overflow as u16;
288            if overflow < self.next_rx_sequence {
289                // Wrapped around — sequence is valid if > overflow
290                if sequence > overflow {
291                    return true; // actually behind
292                }
293                return false; // valid wrap-around sequence
294            }
295            return true;
296        }
297        false
298    }
299
300    fn emplace_rx(&mut self, envelope: Envelope) {
301        // Use modular distance from next_rx_sequence for correct wrap-boundary ordering.
302        // wrapping_sub gives the unsigned distance in sequence space.
303        let env_dist = envelope.sequence.wrapping_sub(self.next_rx_sequence);
304        for (i, existing) in self.rx_ring.iter().enumerate() {
305            if envelope.sequence == existing.sequence {
306                return; // duplicate
307            }
308            let exist_dist = existing.sequence.wrapping_sub(self.next_rx_sequence);
309            if env_dist < exist_dist {
310                self.rx_ring.insert(i, envelope);
311                return;
312            }
313        }
314        self.rx_ring.push_back(envelope);
315    }
316
317    fn collect_contiguous(&mut self) -> Vec<ChannelAction> {
318        let mut actions = Vec::new();
319
320        loop {
321            let front_match = self
322                .rx_ring
323                .front()
324                .map(|e| e.sequence == self.next_rx_sequence)
325                .unwrap_or(false);
326
327            if !front_match {
328                break;
329            }
330
331            let envelope = self.rx_ring.pop_front().unwrap();
332
333            // Re-parse the envelope to get payload
334            if let Ok((msgtype, _seq, payload)) = unpack_envelope(&envelope.raw) {
335                actions.push(ChannelAction::MessageReceived {
336                    msgtype,
337                    payload: payload.to_vec(),
338                    sequence: envelope.sequence,
339                });
340            }
341
342            self.next_rx_sequence =
343                ((self.next_rx_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
344
345            // After wrapping to 0, check if 0 is also in the ring
346            if self.next_rx_sequence == 0 {
347                // Continue the loop — it will check front again
348            }
349        }
350
351        actions
352    }
353
354    fn adapt_window_on_delivery(&mut self) {
355        if self.rtt == 0.0 {
356            return;
357        }
358
359        if self.rtt > CHANNEL_RTT_FAST {
360            self.fast_rate_rounds = 0;
361
362            if self.rtt > CHANNEL_RTT_MEDIUM {
363                self.medium_rate_rounds = 0;
364            } else {
365                self.medium_rate_rounds += 1;
366                if self.window_max < CHANNEL_WINDOW_MAX_MEDIUM
367                    && self.medium_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
368                {
369                    self.window_max = CHANNEL_WINDOW_MAX_MEDIUM;
370                    self.window_min = CHANNEL_WINDOW_MIN_LIMIT_MEDIUM;
371                }
372            }
373        } else {
374            self.fast_rate_rounds += 1;
375            if self.window_max < CHANNEL_WINDOW_MAX_FAST
376                && self.fast_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
377            {
378                self.window_max = CHANNEL_WINDOW_MAX_FAST;
379                self.window_min = CHANNEL_WINDOW_MIN_LIMIT_FAST;
380            }
381        }
382    }
383}
384
385#[cfg(test)]
386mod tests {
387    use super::*;
388
389    #[test]
390    fn test_new_default() {
391        let ch = Channel::new(0.5);
392        assert_eq!(ch.window, CHANNEL_WINDOW);
393        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_SLOW);
394        assert!(ch.is_ready_to_send());
395    }
396
397    #[test]
398    fn test_new_very_slow() {
399        let ch = Channel::new(2.0);
400        assert_eq!(ch.window, 1);
401        assert_eq!(ch.window_max, 1);
402    }
403
404    #[test]
405    fn test_send_receive() {
406        let mut ch = Channel::new(0.1);
407        let actions = ch.send(0x01, b"hello", 1.0, 500).unwrap();
408        assert_eq!(actions.len(), 1);
409        match &actions[0] {
410            ChannelAction::SendOnLink { raw, sequence } => {
411                assert_eq!(*sequence, 0);
412                // Simulate receive on the other side
413                let mut ch2 = Channel::new(0.1);
414                let recv_actions = ch2.receive(raw, 1.1);
415                assert_eq!(recv_actions.len(), 1);
416                match &recv_actions[0] {
417                    ChannelAction::MessageReceived {
418                        msgtype,
419                        payload,
420                        sequence,
421                    } => {
422                        assert_eq!(*msgtype, 0x01);
423                        assert_eq!(payload, b"hello");
424                        assert_eq!(*sequence, 0);
425                    }
426                    _ => panic!("Expected MessageReceived"),
427                }
428            }
429            _ => panic!("Expected SendOnLink"),
430        }
431    }
432
433    #[test]
434    fn test_send_not_ready() {
435        let mut ch = Channel::new(0.1);
436        // Fill the window
437        ch.send(0x01, b"a", 1.0, 500).unwrap();
438        ch.send(0x01, b"b", 1.0, 500).unwrap();
439        // Window = 2, both outstanding
440        assert!(!ch.is_ready_to_send());
441        assert_eq!(ch.send(0x01, b"c", 1.0, 500), Err(ChannelError::NotReady));
442    }
443
444    #[test]
445    fn test_message_too_big_does_not_consume_sequence() {
446        let mut ch = Channel::new(0.1);
447        assert_eq!(
448            ch.send(0x01, b"hello", 1.0, 2),
449            Err(ChannelError::MessageTooBig)
450        );
451
452        let actions = ch.send(0x01, b"ok", 2.0, 500).unwrap();
453        match &actions[0] {
454            ChannelAction::SendOnLink { sequence, .. } => assert_eq!(*sequence, 0),
455            _ => panic!("Expected SendOnLink"),
456        }
457    }
458
459    #[test]
460    fn test_cancel_send_rewinds_sequence_and_frees_window() {
461        let mut ch = Channel::new(CHANNEL_RTT_SLOW + 1.0);
462        let actions = ch.send(0x01, b"first", 1.0, 500).unwrap();
463        let sequence = match &actions[0] {
464            ChannelAction::SendOnLink { sequence, .. } => *sequence,
465            _ => panic!("Expected SendOnLink"),
466        };
467        assert!(!ch.is_ready_to_send());
468
469        assert!(ch.cancel_send(sequence));
470        assert!(ch.is_ready_to_send());
471        let actions = ch.send(0x01, b"retry", 2.0, 500).unwrap();
472        match &actions[0] {
473            ChannelAction::SendOnLink { sequence, .. } => assert_eq!(*sequence, 0),
474            _ => panic!("Expected SendOnLink"),
475        }
476    }
477
478    #[test]
479    fn test_packet_delivered_grows_window() {
480        let mut ch = Channel::new(0.1);
481        ch.send(0x01, b"a", 1.0, 500).unwrap();
482        ch.send(0x01, b"b", 1.0, 500).unwrap();
483
484        assert_eq!(ch.window, 2);
485        ch.packet_delivered(0);
486        assert_eq!(ch.window, 3);
487    }
488
489    #[test]
490    fn test_packet_timeout_shrinks_window() {
491        let mut ch = Channel::new(0.1);
492        ch.send(0x01, b"a", 1.0, 500).unwrap();
493        ch.send(0x01, b"b", 1.0, 500).unwrap();
494
495        // Deliver one to grow window
496        ch.packet_delivered(0);
497        assert_eq!(ch.window, 3);
498
499        // Timeout on seq 1
500        let actions = ch.packet_timeout(1, 2.0);
501        assert_eq!(actions.len(), 1); // resend
502        assert_eq!(ch.window, 2);
503    }
504
505    #[test]
506    fn test_tick_retransmits_timed_out_packets() {
507        let mut ch = Channel::new(0.1);
508        ch.send(0x01, b"a", 0.0, 500).unwrap();
509
510        let timeout = ch.get_packet_timeout(1);
511        let actions = ch.tick(timeout + 0.01);
512        assert_eq!(actions.len(), 1);
513        match &actions[0] {
514            ChannelAction::SendOnLink { sequence, .. } => assert_eq!(*sequence, 0),
515            _ => panic!("Expected SendOnLink"),
516        }
517        assert_eq!(ch.get_tries(0), Some(2));
518    }
519
520    #[test]
521    fn test_max_retries_teardown() {
522        let mut ch = Channel::new(0.1);
523        ch.send(0x01, b"a", 1.0, 500).unwrap();
524
525        // Time out until max_tries exceeded
526        for i in 0..4 {
527            let actions = ch.packet_timeout(0, 2.0 + i as f64);
528            assert_eq!(actions.len(), 1);
529            match &actions[0] {
530                ChannelAction::SendOnLink { .. } => {}
531                _ => panic!("Expected SendOnLink"),
532            }
533        }
534
535        // One more timeout → teardown
536        let actions = ch.packet_timeout(0, 10.0);
537        assert_eq!(actions.len(), 1);
538        match &actions[0] {
539            ChannelAction::TeardownLink => {}
540            _ => panic!("Expected TeardownLink"),
541        }
542    }
543
544    #[test]
545    fn test_sequence_wrapping() {
546        let mut ch = Channel::new(0.1);
547        ch.next_sequence = CHANNEL_SEQ_MAX;
548
549        ch.send(0x01, b"wrap", 1.0, 500).unwrap();
550        assert_eq!(ch.next_sequence, 0);
551
552        ch.send(0x01, b"after", 1.0, 500).unwrap();
553        assert_eq!(ch.next_sequence, 1);
554    }
555
556    #[test]
557    fn test_out_of_order_buffering() {
558        let mut ch = Channel::new(0.1);
559
560        // Send messages out of order (simulate): sequence 1 arrives before 0
561        let raw0 = pack_envelope(0x01, 0, b"first");
562        let raw1 = pack_envelope(0x01, 1, b"second");
563
564        // Receive seq 1 first
565        let actions = ch.receive(&raw1, 1.0);
566        assert!(actions.is_empty()); // buffered, waiting for 0
567
568        // Receive seq 0
569        let actions = ch.receive(&raw0, 1.1);
570        assert_eq!(actions.len(), 2); // both delivered in order
571        match &actions[0] {
572            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
573            _ => panic!("Expected MessageReceived"),
574        }
575        match &actions[1] {
576            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 1),
577            _ => panic!("Expected MessageReceived"),
578        }
579    }
580
581    #[test]
582    fn test_duplicate_rejection() {
583        let mut ch = Channel::new(0.1);
584        let raw = pack_envelope(0x01, 0, b"hello");
585
586        let actions = ch.receive(&raw, 1.0);
587        assert_eq!(actions.len(), 1);
588
589        // Duplicate
590        let actions = ch.receive(&raw, 1.1);
591        assert!(actions.is_empty());
592    }
593
594    #[test]
595    fn test_get_packet_timeout() {
596        let ch = Channel::new(0.1);
597        let t1 = ch.get_packet_timeout(1);
598        let t2 = ch.get_packet_timeout(2);
599        assert!(t2 > t1); // exponential backoff
600    }
601
602    #[test]
603    fn test_mdu() {
604        let ch = Channel::new(0.1);
605        assert_eq!(ch.mdu(431), 431 - CHANNEL_ENVELOPE_OVERHEAD);
606    }
607
608    #[test]
609    fn test_window_upgrade_fast() {
610        let mut ch = Channel::new(0.05); // fast RTT
611        ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
612
613        // Deliver FAST_RATE_THRESHOLD messages
614        for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
615            ch.send(0x01, b"x", i as f64, 500).unwrap();
616            ch.packet_delivered(i);
617        }
618
619        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_FAST);
620        assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_FAST);
621    }
622
623    #[test]
624    fn test_window_upgrade_medium() {
625        let mut ch = Channel::new(0.5); // medium RTT
626        ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
627
628        for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
629            ch.send(0x01, b"x", i as f64, 500).unwrap();
630            ch.packet_delivered(i);
631        }
632
633        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_MEDIUM);
634        assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM);
635    }
636
637    #[test]
638    fn test_shutdown() {
639        let mut ch = Channel::new(0.1);
640        ch.send(0x01, b"a", 1.0, 500).unwrap();
641        ch.shutdown();
642        assert_eq!(ch.outstanding(), 0);
643    }
644
645    #[test]
646    fn test_message_too_big() {
647        let mut ch = Channel::new(0.1);
648        let big = vec![0u8; 500];
649        // link_mdu = 10, message + header won't fit
650        assert_eq!(
651            ch.send(0x01, &big, 1.0, 10),
652            Err(ChannelError::MessageTooBig)
653        );
654    }
655
656    #[test]
657    fn test_receive_sequence_wrap_at_boundary() {
658        let mut ch = Channel::new(0.1);
659        ch.next_rx_sequence = CHANNEL_SEQ_MAX;
660
661        let raw_max = pack_envelope(0x01, CHANNEL_SEQ_MAX, b"last");
662        let raw_zero = pack_envelope(0x01, 0, b"first_after_wrap");
663
664        let actions = ch.receive(&raw_max, 1.0);
665        assert_eq!(actions.len(), 1);
666        assert_eq!(ch.next_rx_sequence, 0);
667
668        let actions = ch.receive(&raw_zero, 1.1);
669        assert_eq!(actions.len(), 1);
670        match &actions[0] {
671            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
672            _ => panic!("Expected MessageReceived"),
673        }
674    }
675
676    #[test]
677    fn test_receive_wrap_boundary_out_of_order() {
678        // Test that out-of-order messages at the wrap boundary (0xFFFF→0) are sorted correctly.
679        let mut ch = Channel::new(0.1);
680        ch.next_rx_sequence = 0xFFFE;
681
682        let raw_fffe = pack_envelope(0x01, 0xFFFE, b"a");
683        let raw_ffff = pack_envelope(0x01, 0xFFFF, b"b");
684        let raw_0000 = pack_envelope(0x01, 0x0000, b"c");
685
686        // Deliver in reverse order: 0, 0xFFFF, 0xFFFE
687        let actions = ch.receive(&raw_0000, 1.0);
688        assert!(actions.is_empty()); // waiting for 0xFFFE
689
690        let actions = ch.receive(&raw_ffff, 1.1);
691        assert!(actions.is_empty()); // still waiting for 0xFFFE
692
693        let actions = ch.receive(&raw_fffe, 1.2);
694        assert_eq!(actions.len(), 3); // all three delivered in order
695        match &actions[0] {
696            ChannelAction::MessageReceived {
697                sequence, payload, ..
698            } => {
699                assert_eq!(*sequence, 0xFFFE);
700                assert_eq!(payload, b"a");
701            }
702            _ => panic!("Expected MessageReceived"),
703        }
704        match &actions[1] {
705            ChannelAction::MessageReceived {
706                sequence, payload, ..
707            } => {
708                assert_eq!(*sequence, 0xFFFF);
709                assert_eq!(payload, b"b");
710            }
711            _ => panic!("Expected MessageReceived"),
712        }
713        match &actions[2] {
714            ChannelAction::MessageReceived {
715                sequence, payload, ..
716            } => {
717                assert_eq!(*sequence, 0x0000);
718                assert_eq!(payload, b"c");
719            }
720            _ => panic!("Expected MessageReceived"),
721        }
722    }
723
724    #[test]
725    fn test_many_messages_in_order() {
726        let mut sender = Channel::new(0.05);
727        let mut receiver = Channel::new(0.05);
728
729        for i in 0..20u16 {
730            // Deliver previous to make window available
731            if i >= 2 {
732                sender.packet_delivered(i - 2);
733            }
734
735            let actions = sender.send(0x01, &[i as u8], i as f64, 500).unwrap();
736            let raw = match &actions[0] {
737                ChannelAction::SendOnLink { raw, .. } => raw.clone(),
738                _ => panic!("Expected SendOnLink"),
739            };
740
741            let recv_actions = receiver.receive(&raw, i as f64 + 0.1);
742            assert_eq!(recv_actions.len(), 1);
743            match &recv_actions[0] {
744                ChannelAction::MessageReceived {
745                    payload, sequence, ..
746                } => {
747                    assert_eq!(*sequence, i);
748                    assert_eq!(payload, &[i as u8]);
749                }
750                _ => panic!("Expected MessageReceived"),
751            }
752        }
753    }
754}