Skip to main content

rns_core/channel/
mod.rs

1pub mod envelope;
2pub mod types;
3
4use alloc::collections::VecDeque;
5use alloc::vec::Vec;
6
7#[cfg(test)]
8use crate::constants::CHANNEL_SEQ_MAX;
9use crate::constants::{
10    CHANNEL_ENVELOPE_OVERHEAD, CHANNEL_FAST_RATE_THRESHOLD, CHANNEL_MAX_TRIES, CHANNEL_RTT_FAST,
11    CHANNEL_RTT_MEDIUM, CHANNEL_RTT_SLOW, CHANNEL_SEQ_MODULUS, CHANNEL_WINDOW,
12    CHANNEL_WINDOW_FLEXIBILITY, CHANNEL_WINDOW_MAX_FAST, CHANNEL_WINDOW_MAX_MEDIUM,
13    CHANNEL_WINDOW_MAX_SLOW, CHANNEL_WINDOW_MIN, CHANNEL_WINDOW_MIN_LIMIT_FAST,
14    CHANNEL_WINDOW_MIN_LIMIT_MEDIUM,
15};
16
17pub use types::{ChannelAction, ChannelError, MessageType, Sequence};
18
19use envelope::{pack_envelope, unpack_envelope};
20
21/// Internal envelope tracking state.
22struct Envelope {
23    sequence: Sequence,
24    raw: Vec<u8>,
25    tries: u8,
26    sent_at: f64,
27    delivered: bool,
28}
29
30/// Window-based reliable messaging channel.
31///
32/// Follows the action-queue model: `send`/`receive`/`tick` return
33/// `Vec<ChannelAction>`. The caller dispatches actions.
34pub struct Channel {
35    tx_ring: VecDeque<Envelope>,
36    rx_ring: VecDeque<Envelope>,
37    next_sequence: u16,
38    next_rx_sequence: u16,
39    window: u16,
40    window_max: u16,
41    window_min: u16,
42    window_flexibility: u16,
43    fast_rate_rounds: u16,
44    medium_rate_rounds: u16,
45    max_tries: u8,
46    rtt: f64,
47}
48
49impl Channel {
50    /// Create a new Channel with initial RTT.
51    pub fn new(initial_rtt: f64) -> Self {
52        let (window, window_max, window_min, window_flexibility) = if initial_rtt > CHANNEL_RTT_SLOW
53        {
54            (1, 1, 1, 1)
55        } else {
56            (
57                CHANNEL_WINDOW,
58                CHANNEL_WINDOW_MAX_SLOW,
59                CHANNEL_WINDOW_MIN,
60                CHANNEL_WINDOW_FLEXIBILITY,
61            )
62        };
63
64        Channel {
65            tx_ring: VecDeque::new(),
66            rx_ring: VecDeque::new(),
67            next_sequence: 0,
68            next_rx_sequence: 0,
69            window,
70            window_max,
71            window_min,
72            window_flexibility,
73            fast_rate_rounds: 0,
74            medium_rate_rounds: 0,
75            max_tries: CHANNEL_MAX_TRIES,
76            rtt: initial_rtt,
77        }
78    }
79
80    /// Update the RTT value.
81    pub fn set_rtt(&mut self, rtt: f64) {
82        self.rtt = rtt;
83    }
84
85    /// Maximum data unit available for message payload.
86    pub fn mdu(&self, link_mdu: usize) -> usize {
87        let mdu = link_mdu.saturating_sub(CHANNEL_ENVELOPE_OVERHEAD);
88        mdu.min(0xFFFF)
89    }
90
91    /// Check if channel is ready to send (has window capacity).
92    pub fn is_ready_to_send(&self) -> bool {
93        let outstanding = self.tx_ring.iter().filter(|e| !e.delivered).count() as u16;
94        outstanding < self.window
95    }
96
97    /// Send a message. Returns `SendOnLink` action with packed envelope.
98    pub fn send(
99        &mut self,
100        msgtype: u16,
101        payload: &[u8],
102        now: f64,
103        link_mdu: usize,
104    ) -> Result<Vec<ChannelAction>, ChannelError> {
105        if !self.is_ready_to_send() {
106            return Err(ChannelError::NotReady);
107        }
108
109        let sequence = self.next_sequence;
110        self.next_sequence = ((self.next_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
111
112        let raw = pack_envelope(msgtype, sequence, payload);
113        if raw.len() > link_mdu {
114            return Err(ChannelError::MessageTooBig);
115        }
116
117        self.tx_ring.push_back(Envelope {
118            sequence,
119            raw: raw.clone(),
120            tries: 1,
121            sent_at: now,
122            delivered: false,
123        });
124
125        Ok(alloc::vec![ChannelAction::SendOnLink { raw }])
126    }
127
128    /// Receive decrypted envelope bytes.
129    ///
130    /// Returns `MessageReceived` for contiguous sequences starting from
131    /// `next_rx_sequence`.
132    pub fn receive(&mut self, raw: &[u8], _now: f64) -> Vec<ChannelAction> {
133        let (_msgtype, sequence, _payload) = match unpack_envelope(raw) {
134            Ok(r) => r,
135            Err(_) => return Vec::new(),
136        };
137
138        // Reject sequences behind our window
139        if self.is_behind_rx_window(sequence) {
140            return Vec::new();
141        }
142
143        // Reject duplicates
144        if self.rx_ring.iter().any(|e| e.sequence == sequence) {
145            return Vec::new();
146        }
147
148        // Emplace in sorted order
149        let envelope = Envelope {
150            sequence,
151            raw: raw.to_vec(),
152            tries: 0,
153            sent_at: 0.0,
154            delivered: false,
155        };
156        self.emplace_rx(envelope);
157
158        // Collect contiguous messages
159        self.collect_contiguous()
160    }
161
162    /// Clear all outstanding TX entries, restoring the window to full capacity.
163    /// Used after holepunch completion where signaling messages are fire-and-forget.
164    pub fn flush_tx(&mut self) {
165        self.tx_ring.clear();
166    }
167
168    /// Notify that a packet with given sequence was delivered (acknowledged).
169    pub fn packet_delivered(&mut self, sequence: Sequence) -> Vec<ChannelAction> {
170        if let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) {
171            self.tx_ring.remove(pos);
172
173            if self.window < self.window_max {
174                self.window += 1;
175            }
176
177            // Adapt window based on RTT
178            self.adapt_window_on_delivery();
179        }
180        Vec::new()
181    }
182
183    /// Notify that a packet with given sequence timed out.
184    pub fn packet_timeout(&mut self, sequence: Sequence, now: f64) -> Vec<ChannelAction> {
185        let pos = match self.tx_ring.iter().position(|e| e.sequence == sequence) {
186            Some(p) => p,
187            None => return Vec::new(),
188        };
189
190        let envelope = &self.tx_ring[pos];
191        if envelope.tries >= self.max_tries {
192            self.tx_ring.clear();
193            self.rx_ring.clear();
194            return alloc::vec![ChannelAction::TeardownLink];
195        }
196
197        // Retry
198        let envelope = &mut self.tx_ring[pos];
199        envelope.tries += 1;
200        envelope.sent_at = now;
201        let raw = envelope.raw.clone();
202
203        // Shrink window (Python nests window_max shrink inside window shrink)
204        if self.window > self.window_min {
205            self.window -= 1;
206            if self.window_max > self.window_min + self.window_flexibility {
207                self.window_max -= 1;
208            }
209        }
210
211        alloc::vec![ChannelAction::SendOnLink { raw }]
212    }
213
214    /// Compute timeout duration for the given try count.
215    ///
216    /// Formula: `1.5^(tries-1) * max(rtt*2.5, 0.025) * (tx_ring.len() + 1.5)`
217    pub fn get_packet_timeout(&self, tries: u8) -> f64 {
218        let base = 1.5_f64.powi((tries as i32) - 1);
219        let rtt_factor = (self.rtt * 2.5).max(0.025);
220        let ring_factor = (self.tx_ring.len() as f64) + 1.5;
221        base * rtt_factor * ring_factor
222    }
223
224    /// Get the current try count for a given sequence.
225    pub fn get_tries(&self, sequence: Sequence) -> Option<u8> {
226        self.tx_ring
227            .iter()
228            .find(|e| e.sequence == sequence)
229            .map(|e| e.tries)
230    }
231
232    /// Shut down the channel, clearing all rings.
233    pub fn shutdown(&mut self) {
234        self.tx_ring.clear();
235        self.rx_ring.clear();
236    }
237
238    /// Current window size.
239    pub fn window(&self) -> u16 {
240        self.window
241    }
242
243    /// Current maximum window size.
244    pub fn window_max(&self) -> u16 {
245        self.window_max
246    }
247
248    /// Number of outstanding (undelivered) envelopes in TX ring.
249    pub fn outstanding(&self) -> usize {
250        self.tx_ring.iter().filter(|e| !e.delivered).count()
251    }
252
253    // --- Internal ---
254
255    fn is_behind_rx_window(&self, sequence: Sequence) -> bool {
256        if sequence < self.next_rx_sequence {
257            let window_overflow = (self.next_rx_sequence as u32 + CHANNEL_WINDOW_MAX_FAST as u32)
258                % CHANNEL_SEQ_MODULUS;
259            let overflow = window_overflow as u16;
260            if overflow < self.next_rx_sequence {
261                // Wrapped around — sequence is valid if > overflow
262                if sequence > overflow {
263                    return true; // actually behind
264                }
265                return false; // valid wrap-around sequence
266            }
267            return true;
268        }
269        false
270    }
271
272    fn emplace_rx(&mut self, envelope: Envelope) {
273        // Use modular distance from next_rx_sequence for correct wrap-boundary ordering.
274        // wrapping_sub gives the unsigned distance in sequence space.
275        let env_dist = envelope.sequence.wrapping_sub(self.next_rx_sequence);
276        let mut i = 0;
277        for existing in self.rx_ring.iter() {
278            if envelope.sequence == existing.sequence {
279                return; // duplicate
280            }
281            let exist_dist = existing.sequence.wrapping_sub(self.next_rx_sequence);
282            if env_dist < exist_dist {
283                self.rx_ring.insert(i, envelope);
284                return;
285            }
286            i += 1;
287        }
288        self.rx_ring.push_back(envelope);
289    }
290
291    fn collect_contiguous(&mut self) -> Vec<ChannelAction> {
292        let mut actions = Vec::new();
293
294        loop {
295            let front_match = self
296                .rx_ring
297                .front()
298                .map(|e| e.sequence == self.next_rx_sequence)
299                .unwrap_or(false);
300
301            if !front_match {
302                break;
303            }
304
305            let envelope = self.rx_ring.pop_front().unwrap();
306
307            // Re-parse the envelope to get payload
308            if let Ok((msgtype, _seq, payload)) = unpack_envelope(&envelope.raw) {
309                actions.push(ChannelAction::MessageReceived {
310                    msgtype,
311                    payload: payload.to_vec(),
312                    sequence: envelope.sequence,
313                });
314            }
315
316            self.next_rx_sequence =
317                ((self.next_rx_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
318
319            // After wrapping to 0, check if 0 is also in the ring
320            if self.next_rx_sequence == 0 {
321                // Continue the loop — it will check front again
322            }
323        }
324
325        actions
326    }
327
328    fn adapt_window_on_delivery(&mut self) {
329        if self.rtt == 0.0 {
330            return;
331        }
332
333        if self.rtt > CHANNEL_RTT_FAST {
334            self.fast_rate_rounds = 0;
335
336            if self.rtt > CHANNEL_RTT_MEDIUM {
337                self.medium_rate_rounds = 0;
338            } else {
339                self.medium_rate_rounds += 1;
340                if self.window_max < CHANNEL_WINDOW_MAX_MEDIUM
341                    && self.medium_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
342                {
343                    self.window_max = CHANNEL_WINDOW_MAX_MEDIUM;
344                    self.window_min = CHANNEL_WINDOW_MIN_LIMIT_MEDIUM;
345                }
346            }
347        } else {
348            self.fast_rate_rounds += 1;
349            if self.window_max < CHANNEL_WINDOW_MAX_FAST
350                && self.fast_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
351            {
352                self.window_max = CHANNEL_WINDOW_MAX_FAST;
353                self.window_min = CHANNEL_WINDOW_MIN_LIMIT_FAST;
354            }
355        }
356    }
357}
358
359#[cfg(test)]
360mod tests {
361    use super::*;
362
363    #[test]
364    fn test_new_default() {
365        let ch = Channel::new(0.5);
366        assert_eq!(ch.window, CHANNEL_WINDOW);
367        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_SLOW);
368        assert!(ch.is_ready_to_send());
369    }
370
371    #[test]
372    fn test_new_very_slow() {
373        let ch = Channel::new(2.0);
374        assert_eq!(ch.window, 1);
375        assert_eq!(ch.window_max, 1);
376    }
377
378    #[test]
379    fn test_send_receive() {
380        let mut ch = Channel::new(0.1);
381        let actions = ch.send(0x01, b"hello", 1.0, 500).unwrap();
382        assert_eq!(actions.len(), 1);
383        match &actions[0] {
384            ChannelAction::SendOnLink { raw } => {
385                // Simulate receive on the other side
386                let mut ch2 = Channel::new(0.1);
387                let recv_actions = ch2.receive(raw, 1.1);
388                assert_eq!(recv_actions.len(), 1);
389                match &recv_actions[0] {
390                    ChannelAction::MessageReceived {
391                        msgtype,
392                        payload,
393                        sequence,
394                    } => {
395                        assert_eq!(*msgtype, 0x01);
396                        assert_eq!(payload, b"hello");
397                        assert_eq!(*sequence, 0);
398                    }
399                    _ => panic!("Expected MessageReceived"),
400                }
401            }
402            _ => panic!("Expected SendOnLink"),
403        }
404    }
405
406    #[test]
407    fn test_send_not_ready() {
408        let mut ch = Channel::new(0.1);
409        // Fill the window
410        ch.send(0x01, b"a", 1.0, 500).unwrap();
411        ch.send(0x01, b"b", 1.0, 500).unwrap();
412        // Window = 2, both outstanding
413        assert!(!ch.is_ready_to_send());
414        assert_eq!(ch.send(0x01, b"c", 1.0, 500), Err(ChannelError::NotReady));
415    }
416
417    #[test]
418    fn test_packet_delivered_grows_window() {
419        let mut ch = Channel::new(0.1);
420        ch.send(0x01, b"a", 1.0, 500).unwrap();
421        ch.send(0x01, b"b", 1.0, 500).unwrap();
422
423        assert_eq!(ch.window, 2);
424        ch.packet_delivered(0);
425        assert_eq!(ch.window, 3);
426    }
427
428    #[test]
429    fn test_packet_timeout_shrinks_window() {
430        let mut ch = Channel::new(0.1);
431        ch.send(0x01, b"a", 1.0, 500).unwrap();
432        ch.send(0x01, b"b", 1.0, 500).unwrap();
433
434        // Deliver one to grow window
435        ch.packet_delivered(0);
436        assert_eq!(ch.window, 3);
437
438        // Timeout on seq 1
439        let actions = ch.packet_timeout(1, 2.0);
440        assert_eq!(actions.len(), 1); // resend
441        assert_eq!(ch.window, 2);
442    }
443
444    #[test]
445    fn test_max_retries_teardown() {
446        let mut ch = Channel::new(0.1);
447        ch.send(0x01, b"a", 1.0, 500).unwrap();
448
449        // Time out until max_tries exceeded
450        for i in 0..4 {
451            let actions = ch.packet_timeout(0, 2.0 + i as f64);
452            assert_eq!(actions.len(), 1);
453            match &actions[0] {
454                ChannelAction::SendOnLink { .. } => {}
455                _ => panic!("Expected SendOnLink"),
456            }
457        }
458
459        // One more timeout → teardown
460        let actions = ch.packet_timeout(0, 10.0);
461        assert_eq!(actions.len(), 1);
462        match &actions[0] {
463            ChannelAction::TeardownLink => {}
464            _ => panic!("Expected TeardownLink"),
465        }
466    }
467
468    #[test]
469    fn test_sequence_wrapping() {
470        let mut ch = Channel::new(0.1);
471        ch.next_sequence = CHANNEL_SEQ_MAX;
472
473        ch.send(0x01, b"wrap", 1.0, 500).unwrap();
474        assert_eq!(ch.next_sequence, 0);
475
476        ch.send(0x01, b"after", 1.0, 500).unwrap();
477        assert_eq!(ch.next_sequence, 1);
478    }
479
480    #[test]
481    fn test_out_of_order_buffering() {
482        let mut ch = Channel::new(0.1);
483
484        // Send messages out of order (simulate): sequence 1 arrives before 0
485        let raw0 = pack_envelope(0x01, 0, b"first");
486        let raw1 = pack_envelope(0x01, 1, b"second");
487
488        // Receive seq 1 first
489        let actions = ch.receive(&raw1, 1.0);
490        assert!(actions.is_empty()); // buffered, waiting for 0
491
492        // Receive seq 0
493        let actions = ch.receive(&raw0, 1.1);
494        assert_eq!(actions.len(), 2); // both delivered in order
495        match &actions[0] {
496            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
497            _ => panic!("Expected MessageReceived"),
498        }
499        match &actions[1] {
500            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 1),
501            _ => panic!("Expected MessageReceived"),
502        }
503    }
504
505    #[test]
506    fn test_duplicate_rejection() {
507        let mut ch = Channel::new(0.1);
508        let raw = pack_envelope(0x01, 0, b"hello");
509
510        let actions = ch.receive(&raw, 1.0);
511        assert_eq!(actions.len(), 1);
512
513        // Duplicate
514        let actions = ch.receive(&raw, 1.1);
515        assert!(actions.is_empty());
516    }
517
518    #[test]
519    fn test_get_packet_timeout() {
520        let ch = Channel::new(0.1);
521        let t1 = ch.get_packet_timeout(1);
522        let t2 = ch.get_packet_timeout(2);
523        assert!(t2 > t1); // exponential backoff
524    }
525
526    #[test]
527    fn test_mdu() {
528        let ch = Channel::new(0.1);
529        assert_eq!(ch.mdu(431), 431 - CHANNEL_ENVELOPE_OVERHEAD);
530    }
531
532    #[test]
533    fn test_window_upgrade_fast() {
534        let mut ch = Channel::new(0.05); // fast RTT
535        ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
536
537        // Deliver FAST_RATE_THRESHOLD messages
538        for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
539            ch.send(0x01, b"x", i as f64, 500).unwrap();
540            ch.packet_delivered(i);
541        }
542
543        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_FAST);
544        assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_FAST);
545    }
546
547    #[test]
548    fn test_window_upgrade_medium() {
549        let mut ch = Channel::new(0.5); // medium RTT
550        ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
551
552        for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
553            ch.send(0x01, b"x", i as f64, 500).unwrap();
554            ch.packet_delivered(i);
555        }
556
557        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_MEDIUM);
558        assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM);
559    }
560
561    #[test]
562    fn test_shutdown() {
563        let mut ch = Channel::new(0.1);
564        ch.send(0x01, b"a", 1.0, 500).unwrap();
565        ch.shutdown();
566        assert_eq!(ch.outstanding(), 0);
567    }
568
569    #[test]
570    fn test_message_too_big() {
571        let mut ch = Channel::new(0.1);
572        let big = vec![0u8; 500];
573        // link_mdu = 10, message + header won't fit
574        assert_eq!(
575            ch.send(0x01, &big, 1.0, 10),
576            Err(ChannelError::MessageTooBig)
577        );
578    }
579
580    #[test]
581    fn test_receive_sequence_wrap_at_boundary() {
582        let mut ch = Channel::new(0.1);
583        ch.next_rx_sequence = CHANNEL_SEQ_MAX;
584
585        let raw_max = pack_envelope(0x01, CHANNEL_SEQ_MAX, b"last");
586        let raw_zero = pack_envelope(0x01, 0, b"first_after_wrap");
587
588        let actions = ch.receive(&raw_max, 1.0);
589        assert_eq!(actions.len(), 1);
590        assert_eq!(ch.next_rx_sequence, 0);
591
592        let actions = ch.receive(&raw_zero, 1.1);
593        assert_eq!(actions.len(), 1);
594        match &actions[0] {
595            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
596            _ => panic!("Expected MessageReceived"),
597        }
598    }
599
600    #[test]
601    fn test_receive_wrap_boundary_out_of_order() {
602        // Test that out-of-order messages at the wrap boundary (0xFFFF→0) are sorted correctly.
603        let mut ch = Channel::new(0.1);
604        ch.next_rx_sequence = 0xFFFE;
605
606        let raw_fffe = pack_envelope(0x01, 0xFFFE, b"a");
607        let raw_ffff = pack_envelope(0x01, 0xFFFF, b"b");
608        let raw_0000 = pack_envelope(0x01, 0x0000, b"c");
609
610        // Deliver in reverse order: 0, 0xFFFF, 0xFFFE
611        let actions = ch.receive(&raw_0000, 1.0);
612        assert!(actions.is_empty()); // waiting for 0xFFFE
613
614        let actions = ch.receive(&raw_ffff, 1.1);
615        assert!(actions.is_empty()); // still waiting for 0xFFFE
616
617        let actions = ch.receive(&raw_fffe, 1.2);
618        assert_eq!(actions.len(), 3); // all three delivered in order
619        match &actions[0] {
620            ChannelAction::MessageReceived {
621                sequence, payload, ..
622            } => {
623                assert_eq!(*sequence, 0xFFFE);
624                assert_eq!(payload, b"a");
625            }
626            _ => panic!("Expected MessageReceived"),
627        }
628        match &actions[1] {
629            ChannelAction::MessageReceived {
630                sequence, payload, ..
631            } => {
632                assert_eq!(*sequence, 0xFFFF);
633                assert_eq!(payload, b"b");
634            }
635            _ => panic!("Expected MessageReceived"),
636        }
637        match &actions[2] {
638            ChannelAction::MessageReceived {
639                sequence, payload, ..
640            } => {
641                assert_eq!(*sequence, 0x0000);
642                assert_eq!(payload, b"c");
643            }
644            _ => panic!("Expected MessageReceived"),
645        }
646    }
647
648    #[test]
649    fn test_many_messages_in_order() {
650        let mut sender = Channel::new(0.05);
651        let mut receiver = Channel::new(0.05);
652
653        for i in 0..20u16 {
654            // Deliver previous to make window available
655            if i >= 2 {
656                sender.packet_delivered(i - 2);
657            }
658
659            let actions = sender.send(0x01, &[i as u8], i as f64, 500).unwrap();
660            let raw = match &actions[0] {
661                ChannelAction::SendOnLink { raw } => raw.clone(),
662                _ => panic!("Expected SendOnLink"),
663            };
664
665            let recv_actions = receiver.receive(&raw, i as f64 + 0.1);
666            assert_eq!(recv_actions.len(), 1);
667            match &recv_actions[0] {
668                ChannelAction::MessageReceived {
669                    payload, sequence, ..
670                } => {
671                    assert_eq!(*sequence, i);
672                    assert_eq!(payload, &[i as u8]);
673                }
674                _ => panic!("Expected MessageReceived"),
675            }
676        }
677    }
678}