Skip to main content

rns_core/channel/
mod.rs

1pub mod envelope;
2pub mod types;
3
4use alloc::collections::VecDeque;
5use alloc::vec::Vec;
6
7#[cfg(test)]
8use crate::constants::CHANNEL_SEQ_MAX;
9use crate::constants::{
10    CHANNEL_ENVELOPE_OVERHEAD, CHANNEL_FAST_RATE_THRESHOLD, CHANNEL_MAX_TRIES, CHANNEL_RTT_FAST,
11    CHANNEL_RTT_MEDIUM, CHANNEL_RTT_SLOW, CHANNEL_SEQ_MODULUS, CHANNEL_WINDOW,
12    CHANNEL_WINDOW_FLEXIBILITY, CHANNEL_WINDOW_MAX_FAST, CHANNEL_WINDOW_MAX_MEDIUM,
13    CHANNEL_WINDOW_MAX_SLOW, CHANNEL_WINDOW_MIN, CHANNEL_WINDOW_MIN_LIMIT_FAST,
14    CHANNEL_WINDOW_MIN_LIMIT_MEDIUM,
15};
16
17pub use types::{ChannelAction, ChannelError, MessageType, Sequence};
18
19use envelope::{pack_envelope, unpack_envelope};
20
21/// Internal envelope tracking state.
22struct Envelope {
23    sequence: Sequence,
24    raw: Vec<u8>,
25    tries: u8,
26    sent_at: f64,
27    delivered: bool,
28}
29
30/// Window-based reliable messaging channel.
31///
32/// Follows the action-queue model: `send`/`receive`/`tick` return
33/// `Vec<ChannelAction>`. The caller dispatches actions.
34pub struct Channel {
35    tx_ring: VecDeque<Envelope>,
36    rx_ring: VecDeque<Envelope>,
37    next_sequence: u16,
38    next_rx_sequence: u16,
39    window: u16,
40    window_max: u16,
41    window_min: u16,
42    window_flexibility: u16,
43    fast_rate_rounds: u16,
44    medium_rate_rounds: u16,
45    max_tries: u8,
46    rtt: f64,
47}
48
49impl Channel {
50    /// Create a new Channel with initial RTT.
51    pub fn new(initial_rtt: f64) -> Self {
52        let (window, window_max, window_min, window_flexibility) = if initial_rtt > CHANNEL_RTT_SLOW
53        {
54            (1, 1, 1, 1)
55        } else {
56            (
57                CHANNEL_WINDOW,
58                CHANNEL_WINDOW_MAX_SLOW,
59                CHANNEL_WINDOW_MIN,
60                CHANNEL_WINDOW_FLEXIBILITY,
61            )
62        };
63
64        Channel {
65            tx_ring: VecDeque::new(),
66            rx_ring: VecDeque::new(),
67            next_sequence: 0,
68            next_rx_sequence: 0,
69            window,
70            window_max,
71            window_min,
72            window_flexibility,
73            fast_rate_rounds: 0,
74            medium_rate_rounds: 0,
75            max_tries: CHANNEL_MAX_TRIES,
76            rtt: initial_rtt,
77        }
78    }
79
80    /// Update the RTT value.
81    pub fn set_rtt(&mut self, rtt: f64) {
82        self.rtt = rtt;
83    }
84
85    /// Maximum data unit available for message payload.
86    pub fn mdu(&self, link_mdu: usize) -> usize {
87        let mdu = link_mdu.saturating_sub(CHANNEL_ENVELOPE_OVERHEAD);
88        mdu.min(0xFFFF)
89    }
90
91    /// Check if channel is ready to send (has window capacity).
92    pub fn is_ready_to_send(&self) -> bool {
93        let outstanding = self.tx_ring.iter().filter(|e| !e.delivered).count() as u16;
94        outstanding < self.window
95    }
96
97    /// Send a message. Returns `SendOnLink` action with packed envelope.
98    pub fn send(
99        &mut self,
100        msgtype: u16,
101        payload: &[u8],
102        now: f64,
103        link_mdu: usize,
104    ) -> Result<Vec<ChannelAction>, ChannelError> {
105        if !self.is_ready_to_send() {
106            return Err(ChannelError::NotReady);
107        }
108
109        let sequence = self.next_sequence;
110        self.next_sequence = ((self.next_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
111
112        let raw = pack_envelope(msgtype, sequence, payload);
113        if raw.len() > link_mdu {
114            return Err(ChannelError::MessageTooBig);
115        }
116
117        self.tx_ring.push_back(Envelope {
118            sequence,
119            raw: raw.clone(),
120            tries: 1,
121            sent_at: now,
122            delivered: false,
123        });
124
125        Ok(alloc::vec![ChannelAction::SendOnLink { raw }])
126    }
127
128    /// Receive decrypted envelope bytes.
129    ///
130    /// Returns `MessageReceived` for contiguous sequences starting from
131    /// `next_rx_sequence`.
132    pub fn receive(&mut self, raw: &[u8], _now: f64) -> Vec<ChannelAction> {
133        let (_msgtype, sequence, _payload) = match unpack_envelope(raw) {
134            Ok(r) => r,
135            Err(_) => return Vec::new(),
136        };
137
138        // Reject sequences behind our window
139        if self.is_behind_rx_window(sequence) {
140            return Vec::new();
141        }
142
143        // Reject duplicates
144        if self.rx_ring.iter().any(|e| e.sequence == sequence) {
145            return Vec::new();
146        }
147
148        // Emplace in sorted order
149        let envelope = Envelope {
150            sequence,
151            raw: raw.to_vec(),
152            tries: 0,
153            sent_at: 0.0,
154            delivered: false,
155        };
156        self.emplace_rx(envelope);
157
158        // Collect contiguous messages
159        self.collect_contiguous()
160    }
161
162    /// Clear all outstanding TX entries, restoring the window to full capacity.
163    /// Used after holepunch completion where signaling messages are fire-and-forget.
164    pub fn flush_tx(&mut self) {
165        self.tx_ring.clear();
166    }
167
168    /// Notify that a packet with given sequence was delivered (acknowledged).
169    pub fn packet_delivered(&mut self, sequence: Sequence) -> Vec<ChannelAction> {
170        if let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) {
171            self.tx_ring.remove(pos);
172
173            if self.window < self.window_max {
174                self.window += 1;
175            }
176
177            // Adapt window based on RTT
178            self.adapt_window_on_delivery();
179        }
180        Vec::new()
181    }
182
183    /// Notify that a packet with given sequence timed out.
184    pub fn packet_timeout(&mut self, sequence: Sequence, now: f64) -> Vec<ChannelAction> {
185        let pos = match self.tx_ring.iter().position(|e| e.sequence == sequence) {
186            Some(p) => p,
187            None => return Vec::new(),
188        };
189
190        let envelope = &self.tx_ring[pos];
191        if envelope.tries >= self.max_tries {
192            self.tx_ring.clear();
193            self.rx_ring.clear();
194            return alloc::vec![ChannelAction::TeardownLink];
195        }
196
197        // Retry
198        let envelope = &mut self.tx_ring[pos];
199        envelope.tries += 1;
200        envelope.sent_at = now;
201        let raw = envelope.raw.clone();
202
203        // Shrink window (Python nests window_max shrink inside window shrink)
204        if self.window > self.window_min {
205            self.window -= 1;
206            if self.window_max > self.window_min + self.window_flexibility {
207                self.window_max -= 1;
208            }
209        }
210
211        alloc::vec![ChannelAction::SendOnLink { raw }]
212    }
213
214    /// Compute timeout duration for the given try count.
215    ///
216    /// Formula: `1.5^(tries-1) * max(rtt*2.5, 0.025) * (tx_ring.len() + 1.5)`
217    pub fn get_packet_timeout(&self, tries: u8) -> f64 {
218        let base = 1.5_f64.powi((tries as i32) - 1);
219        let rtt_factor = (self.rtt * 2.5).max(0.025);
220        let ring_factor = (self.tx_ring.len() as f64) + 1.5;
221        base * rtt_factor * ring_factor
222    }
223
224    /// Get the current try count for a given sequence.
225    pub fn get_tries(&self, sequence: Sequence) -> Option<u8> {
226        self.tx_ring
227            .iter()
228            .find(|e| e.sequence == sequence)
229            .map(|e| e.tries)
230    }
231
232    /// Shut down the channel, clearing all rings.
233    pub fn shutdown(&mut self) {
234        self.tx_ring.clear();
235        self.rx_ring.clear();
236    }
237
238    /// Current window size.
239    pub fn window(&self) -> u16 {
240        self.window
241    }
242
243    /// Current maximum window size.
244    pub fn window_max(&self) -> u16 {
245        self.window_max
246    }
247
248    /// Number of outstanding (undelivered) envelopes in TX ring.
249    pub fn outstanding(&self) -> usize {
250        self.tx_ring.iter().filter(|e| !e.delivered).count()
251    }
252
253    // --- Internal ---
254
255    fn is_behind_rx_window(&self, sequence: Sequence) -> bool {
256        if sequence < self.next_rx_sequence {
257            let window_overflow = (self.next_rx_sequence as u32 + CHANNEL_WINDOW_MAX_FAST as u32)
258                % CHANNEL_SEQ_MODULUS;
259            let overflow = window_overflow as u16;
260            if overflow < self.next_rx_sequence {
261                // Wrapped around — sequence is valid if > overflow
262                if sequence > overflow {
263                    return true; // actually behind
264                }
265                return false; // valid wrap-around sequence
266            }
267            return true;
268        }
269        false
270    }
271
272    fn emplace_rx(&mut self, envelope: Envelope) {
273        // Use modular distance from next_rx_sequence for correct wrap-boundary ordering.
274        // wrapping_sub gives the unsigned distance in sequence space.
275        let env_dist = envelope.sequence.wrapping_sub(self.next_rx_sequence);
276        for (i, existing) in self.rx_ring.iter().enumerate() {
277            if envelope.sequence == existing.sequence {
278                return; // duplicate
279            }
280            let exist_dist = existing.sequence.wrapping_sub(self.next_rx_sequence);
281            if env_dist < exist_dist {
282                self.rx_ring.insert(i, envelope);
283                return;
284            }
285        }
286        self.rx_ring.push_back(envelope);
287    }
288
289    fn collect_contiguous(&mut self) -> Vec<ChannelAction> {
290        let mut actions = Vec::new();
291
292        loop {
293            let front_match = self
294                .rx_ring
295                .front()
296                .map(|e| e.sequence == self.next_rx_sequence)
297                .unwrap_or(false);
298
299            if !front_match {
300                break;
301            }
302
303            let envelope = self.rx_ring.pop_front().unwrap();
304
305            // Re-parse the envelope to get payload
306            if let Ok((msgtype, _seq, payload)) = unpack_envelope(&envelope.raw) {
307                actions.push(ChannelAction::MessageReceived {
308                    msgtype,
309                    payload: payload.to_vec(),
310                    sequence: envelope.sequence,
311                });
312            }
313
314            self.next_rx_sequence =
315                ((self.next_rx_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
316
317            // After wrapping to 0, check if 0 is also in the ring
318            if self.next_rx_sequence == 0 {
319                // Continue the loop — it will check front again
320            }
321        }
322
323        actions
324    }
325
326    fn adapt_window_on_delivery(&mut self) {
327        if self.rtt == 0.0 {
328            return;
329        }
330
331        if self.rtt > CHANNEL_RTT_FAST {
332            self.fast_rate_rounds = 0;
333
334            if self.rtt > CHANNEL_RTT_MEDIUM {
335                self.medium_rate_rounds = 0;
336            } else {
337                self.medium_rate_rounds += 1;
338                if self.window_max < CHANNEL_WINDOW_MAX_MEDIUM
339                    && self.medium_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
340                {
341                    self.window_max = CHANNEL_WINDOW_MAX_MEDIUM;
342                    self.window_min = CHANNEL_WINDOW_MIN_LIMIT_MEDIUM;
343                }
344            }
345        } else {
346            self.fast_rate_rounds += 1;
347            if self.window_max < CHANNEL_WINDOW_MAX_FAST
348                && self.fast_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
349            {
350                self.window_max = CHANNEL_WINDOW_MAX_FAST;
351                self.window_min = CHANNEL_WINDOW_MIN_LIMIT_FAST;
352            }
353        }
354    }
355}
356
357#[cfg(test)]
358mod tests {
359    use super::*;
360
361    #[test]
362    fn test_new_default() {
363        let ch = Channel::new(0.5);
364        assert_eq!(ch.window, CHANNEL_WINDOW);
365        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_SLOW);
366        assert!(ch.is_ready_to_send());
367    }
368
369    #[test]
370    fn test_new_very_slow() {
371        let ch = Channel::new(2.0);
372        assert_eq!(ch.window, 1);
373        assert_eq!(ch.window_max, 1);
374    }
375
376    #[test]
377    fn test_send_receive() {
378        let mut ch = Channel::new(0.1);
379        let actions = ch.send(0x01, b"hello", 1.0, 500).unwrap();
380        assert_eq!(actions.len(), 1);
381        match &actions[0] {
382            ChannelAction::SendOnLink { raw } => {
383                // Simulate receive on the other side
384                let mut ch2 = Channel::new(0.1);
385                let recv_actions = ch2.receive(raw, 1.1);
386                assert_eq!(recv_actions.len(), 1);
387                match &recv_actions[0] {
388                    ChannelAction::MessageReceived {
389                        msgtype,
390                        payload,
391                        sequence,
392                    } => {
393                        assert_eq!(*msgtype, 0x01);
394                        assert_eq!(payload, b"hello");
395                        assert_eq!(*sequence, 0);
396                    }
397                    _ => panic!("Expected MessageReceived"),
398                }
399            }
400            _ => panic!("Expected SendOnLink"),
401        }
402    }
403
404    #[test]
405    fn test_send_not_ready() {
406        let mut ch = Channel::new(0.1);
407        // Fill the window
408        ch.send(0x01, b"a", 1.0, 500).unwrap();
409        ch.send(0x01, b"b", 1.0, 500).unwrap();
410        // Window = 2, both outstanding
411        assert!(!ch.is_ready_to_send());
412        assert_eq!(ch.send(0x01, b"c", 1.0, 500), Err(ChannelError::NotReady));
413    }
414
415    #[test]
416    fn test_packet_delivered_grows_window() {
417        let mut ch = Channel::new(0.1);
418        ch.send(0x01, b"a", 1.0, 500).unwrap();
419        ch.send(0x01, b"b", 1.0, 500).unwrap();
420
421        assert_eq!(ch.window, 2);
422        ch.packet_delivered(0);
423        assert_eq!(ch.window, 3);
424    }
425
426    #[test]
427    fn test_packet_timeout_shrinks_window() {
428        let mut ch = Channel::new(0.1);
429        ch.send(0x01, b"a", 1.0, 500).unwrap();
430        ch.send(0x01, b"b", 1.0, 500).unwrap();
431
432        // Deliver one to grow window
433        ch.packet_delivered(0);
434        assert_eq!(ch.window, 3);
435
436        // Timeout on seq 1
437        let actions = ch.packet_timeout(1, 2.0);
438        assert_eq!(actions.len(), 1); // resend
439        assert_eq!(ch.window, 2);
440    }
441
442    #[test]
443    fn test_max_retries_teardown() {
444        let mut ch = Channel::new(0.1);
445        ch.send(0x01, b"a", 1.0, 500).unwrap();
446
447        // Time out until max_tries exceeded
448        for i in 0..4 {
449            let actions = ch.packet_timeout(0, 2.0 + i as f64);
450            assert_eq!(actions.len(), 1);
451            match &actions[0] {
452                ChannelAction::SendOnLink { .. } => {}
453                _ => panic!("Expected SendOnLink"),
454            }
455        }
456
457        // One more timeout → teardown
458        let actions = ch.packet_timeout(0, 10.0);
459        assert_eq!(actions.len(), 1);
460        match &actions[0] {
461            ChannelAction::TeardownLink => {}
462            _ => panic!("Expected TeardownLink"),
463        }
464    }
465
466    #[test]
467    fn test_sequence_wrapping() {
468        let mut ch = Channel::new(0.1);
469        ch.next_sequence = CHANNEL_SEQ_MAX;
470
471        ch.send(0x01, b"wrap", 1.0, 500).unwrap();
472        assert_eq!(ch.next_sequence, 0);
473
474        ch.send(0x01, b"after", 1.0, 500).unwrap();
475        assert_eq!(ch.next_sequence, 1);
476    }
477
478    #[test]
479    fn test_out_of_order_buffering() {
480        let mut ch = Channel::new(0.1);
481
482        // Send messages out of order (simulate): sequence 1 arrives before 0
483        let raw0 = pack_envelope(0x01, 0, b"first");
484        let raw1 = pack_envelope(0x01, 1, b"second");
485
486        // Receive seq 1 first
487        let actions = ch.receive(&raw1, 1.0);
488        assert!(actions.is_empty()); // buffered, waiting for 0
489
490        // Receive seq 0
491        let actions = ch.receive(&raw0, 1.1);
492        assert_eq!(actions.len(), 2); // both delivered in order
493        match &actions[0] {
494            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
495            _ => panic!("Expected MessageReceived"),
496        }
497        match &actions[1] {
498            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 1),
499            _ => panic!("Expected MessageReceived"),
500        }
501    }
502
503    #[test]
504    fn test_duplicate_rejection() {
505        let mut ch = Channel::new(0.1);
506        let raw = pack_envelope(0x01, 0, b"hello");
507
508        let actions = ch.receive(&raw, 1.0);
509        assert_eq!(actions.len(), 1);
510
511        // Duplicate
512        let actions = ch.receive(&raw, 1.1);
513        assert!(actions.is_empty());
514    }
515
516    #[test]
517    fn test_get_packet_timeout() {
518        let ch = Channel::new(0.1);
519        let t1 = ch.get_packet_timeout(1);
520        let t2 = ch.get_packet_timeout(2);
521        assert!(t2 > t1); // exponential backoff
522    }
523
524    #[test]
525    fn test_mdu() {
526        let ch = Channel::new(0.1);
527        assert_eq!(ch.mdu(431), 431 - CHANNEL_ENVELOPE_OVERHEAD);
528    }
529
530    #[test]
531    fn test_window_upgrade_fast() {
532        let mut ch = Channel::new(0.05); // fast RTT
533        ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
534
535        // Deliver FAST_RATE_THRESHOLD messages
536        for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
537            ch.send(0x01, b"x", i as f64, 500).unwrap();
538            ch.packet_delivered(i);
539        }
540
541        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_FAST);
542        assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_FAST);
543    }
544
545    #[test]
546    fn test_window_upgrade_medium() {
547        let mut ch = Channel::new(0.5); // medium RTT
548        ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
549
550        for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
551            ch.send(0x01, b"x", i as f64, 500).unwrap();
552            ch.packet_delivered(i);
553        }
554
555        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_MEDIUM);
556        assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM);
557    }
558
559    #[test]
560    fn test_shutdown() {
561        let mut ch = Channel::new(0.1);
562        ch.send(0x01, b"a", 1.0, 500).unwrap();
563        ch.shutdown();
564        assert_eq!(ch.outstanding(), 0);
565    }
566
567    #[test]
568    fn test_message_too_big() {
569        let mut ch = Channel::new(0.1);
570        let big = vec![0u8; 500];
571        // link_mdu = 10, message + header won't fit
572        assert_eq!(
573            ch.send(0x01, &big, 1.0, 10),
574            Err(ChannelError::MessageTooBig)
575        );
576    }
577
578    #[test]
579    fn test_receive_sequence_wrap_at_boundary() {
580        let mut ch = Channel::new(0.1);
581        ch.next_rx_sequence = CHANNEL_SEQ_MAX;
582
583        let raw_max = pack_envelope(0x01, CHANNEL_SEQ_MAX, b"last");
584        let raw_zero = pack_envelope(0x01, 0, b"first_after_wrap");
585
586        let actions = ch.receive(&raw_max, 1.0);
587        assert_eq!(actions.len(), 1);
588        assert_eq!(ch.next_rx_sequence, 0);
589
590        let actions = ch.receive(&raw_zero, 1.1);
591        assert_eq!(actions.len(), 1);
592        match &actions[0] {
593            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
594            _ => panic!("Expected MessageReceived"),
595        }
596    }
597
598    #[test]
599    fn test_receive_wrap_boundary_out_of_order() {
600        // Test that out-of-order messages at the wrap boundary (0xFFFF→0) are sorted correctly.
601        let mut ch = Channel::new(0.1);
602        ch.next_rx_sequence = 0xFFFE;
603
604        let raw_fffe = pack_envelope(0x01, 0xFFFE, b"a");
605        let raw_ffff = pack_envelope(0x01, 0xFFFF, b"b");
606        let raw_0000 = pack_envelope(0x01, 0x0000, b"c");
607
608        // Deliver in reverse order: 0, 0xFFFF, 0xFFFE
609        let actions = ch.receive(&raw_0000, 1.0);
610        assert!(actions.is_empty()); // waiting for 0xFFFE
611
612        let actions = ch.receive(&raw_ffff, 1.1);
613        assert!(actions.is_empty()); // still waiting for 0xFFFE
614
615        let actions = ch.receive(&raw_fffe, 1.2);
616        assert_eq!(actions.len(), 3); // all three delivered in order
617        match &actions[0] {
618            ChannelAction::MessageReceived {
619                sequence, payload, ..
620            } => {
621                assert_eq!(*sequence, 0xFFFE);
622                assert_eq!(payload, b"a");
623            }
624            _ => panic!("Expected MessageReceived"),
625        }
626        match &actions[1] {
627            ChannelAction::MessageReceived {
628                sequence, payload, ..
629            } => {
630                assert_eq!(*sequence, 0xFFFF);
631                assert_eq!(payload, b"b");
632            }
633            _ => panic!("Expected MessageReceived"),
634        }
635        match &actions[2] {
636            ChannelAction::MessageReceived {
637                sequence, payload, ..
638            } => {
639                assert_eq!(*sequence, 0x0000);
640                assert_eq!(payload, b"c");
641            }
642            _ => panic!("Expected MessageReceived"),
643        }
644    }
645
646    #[test]
647    fn test_many_messages_in_order() {
648        let mut sender = Channel::new(0.05);
649        let mut receiver = Channel::new(0.05);
650
651        for i in 0..20u16 {
652            // Deliver previous to make window available
653            if i >= 2 {
654                sender.packet_delivered(i - 2);
655            }
656
657            let actions = sender.send(0x01, &[i as u8], i as f64, 500).unwrap();
658            let raw = match &actions[0] {
659                ChannelAction::SendOnLink { raw } => raw.clone(),
660                _ => panic!("Expected SendOnLink"),
661            };
662
663            let recv_actions = receiver.receive(&raw, i as f64 + 0.1);
664            assert_eq!(recv_actions.len(), 1);
665            match &recv_actions[0] {
666                ChannelAction::MessageReceived {
667                    payload, sequence, ..
668                } => {
669                    assert_eq!(*sequence, i);
670                    assert_eq!(payload, &[i as u8]);
671                }
672                _ => panic!("Expected MessageReceived"),
673            }
674        }
675    }
676}