Skip to main content

rns_core/channel/
mod.rs

1pub mod types;
2pub mod envelope;
3
4use alloc::collections::VecDeque;
5use alloc::vec::Vec;
6
7use crate::constants::{
8    CHANNEL_ENVELOPE_OVERHEAD, CHANNEL_FAST_RATE_THRESHOLD, CHANNEL_MAX_TRIES,
9    CHANNEL_RTT_FAST, CHANNEL_RTT_MEDIUM, CHANNEL_RTT_SLOW,
10    CHANNEL_SEQ_MODULUS, CHANNEL_WINDOW, CHANNEL_WINDOW_FLEXIBILITY, CHANNEL_WINDOW_MAX_FAST,
11    CHANNEL_WINDOW_MAX_MEDIUM, CHANNEL_WINDOW_MAX_SLOW, CHANNEL_WINDOW_MIN,
12    CHANNEL_WINDOW_MIN_LIMIT_FAST, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM,
13};
14#[cfg(test)]
15use crate::constants::CHANNEL_SEQ_MAX;
16
17pub use types::{ChannelAction, ChannelError, MessageType, Sequence};
18
19use envelope::{pack_envelope, unpack_envelope};
20
21/// Internal envelope tracking state.
22struct Envelope {
23    sequence: Sequence,
24    raw: Vec<u8>,
25    tries: u8,
26    sent_at: f64,
27    delivered: bool,
28}
29
30/// Window-based reliable messaging channel.
31///
32/// Follows the action-queue model: `send`/`receive`/`tick` return
33/// `Vec<ChannelAction>`. The caller dispatches actions.
34pub struct Channel {
35    tx_ring: VecDeque<Envelope>,
36    rx_ring: VecDeque<Envelope>,
37    next_sequence: u16,
38    next_rx_sequence: u16,
39    window: u16,
40    window_max: u16,
41    window_min: u16,
42    window_flexibility: u16,
43    fast_rate_rounds: u16,
44    medium_rate_rounds: u16,
45    max_tries: u8,
46    rtt: f64,
47}
48
49impl Channel {
50    /// Create a new Channel with initial RTT.
51    pub fn new(initial_rtt: f64) -> Self {
52        let (window, window_max, window_min, window_flexibility) = if initial_rtt > CHANNEL_RTT_SLOW {
53            (1, 1, 1, 1)
54        } else {
55            (
56                CHANNEL_WINDOW,
57                CHANNEL_WINDOW_MAX_SLOW,
58                CHANNEL_WINDOW_MIN,
59                CHANNEL_WINDOW_FLEXIBILITY,
60            )
61        };
62
63        Channel {
64            tx_ring: VecDeque::new(),
65            rx_ring: VecDeque::new(),
66            next_sequence: 0,
67            next_rx_sequence: 0,
68            window,
69            window_max,
70            window_min,
71            window_flexibility,
72            fast_rate_rounds: 0,
73            medium_rate_rounds: 0,
74            max_tries: CHANNEL_MAX_TRIES,
75            rtt: initial_rtt,
76        }
77    }
78
79    /// Update the RTT value.
80    pub fn set_rtt(&mut self, rtt: f64) {
81        self.rtt = rtt;
82    }
83
84    /// Maximum data unit available for message payload.
85    pub fn mdu(&self, link_mdu: usize) -> usize {
86        let mdu = link_mdu.saturating_sub(CHANNEL_ENVELOPE_OVERHEAD);
87        mdu.min(0xFFFF)
88    }
89
90    /// Check if channel is ready to send (has window capacity).
91    pub fn is_ready_to_send(&self) -> bool {
92        let outstanding = self.tx_ring.iter()
93            .filter(|e| !e.delivered)
94            .count() as u16;
95        outstanding < self.window
96    }
97
98    /// Send a message. Returns `SendOnLink` action with packed envelope.
99    pub fn send(
100        &mut self,
101        msgtype: u16,
102        payload: &[u8],
103        now: f64,
104        link_mdu: usize,
105    ) -> Result<Vec<ChannelAction>, ChannelError> {
106        if !self.is_ready_to_send() {
107            return Err(ChannelError::NotReady);
108        }
109
110        let sequence = self.next_sequence;
111        self.next_sequence = ((self.next_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
112
113        let raw = pack_envelope(msgtype, sequence, payload);
114        if raw.len() > link_mdu {
115            return Err(ChannelError::MessageTooBig);
116        }
117
118        self.tx_ring.push_back(Envelope {
119            sequence,
120            raw: raw.clone(),
121            tries: 1,
122            sent_at: now,
123            delivered: false,
124        });
125
126        Ok(alloc::vec![ChannelAction::SendOnLink { raw }])
127    }
128
129    /// Receive decrypted envelope bytes.
130    ///
131    /// Returns `MessageReceived` for contiguous sequences starting from
132    /// `next_rx_sequence`.
133    pub fn receive(&mut self, raw: &[u8], _now: f64) -> Vec<ChannelAction> {
134        let (_msgtype, sequence, _payload) = match unpack_envelope(raw) {
135            Ok(r) => r,
136            Err(_) => return Vec::new(),
137        };
138
139        // Reject sequences behind our window
140        if self.is_behind_rx_window(sequence) {
141            return Vec::new();
142        }
143
144        // Reject duplicates
145        if self.rx_ring.iter().any(|e| e.sequence == sequence) {
146            return Vec::new();
147        }
148
149        // Emplace in sorted order
150        let envelope = Envelope {
151            sequence,
152            raw: raw.to_vec(),
153            tries: 0,
154            sent_at: 0.0,
155            delivered: false,
156        };
157        self.emplace_rx(envelope);
158
159        // Collect contiguous messages
160        self.collect_contiguous()
161    }
162
163    /// Clear all outstanding TX entries, restoring the window to full capacity.
164    /// Used after holepunch completion where signaling messages are fire-and-forget.
165    pub fn flush_tx(&mut self) {
166        self.tx_ring.clear();
167    }
168
169    /// Notify that a packet with given sequence was delivered (acknowledged).
170    pub fn packet_delivered(&mut self, sequence: Sequence) -> Vec<ChannelAction> {
171        if let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) {
172            self.tx_ring.remove(pos);
173
174            if self.window < self.window_max {
175                self.window += 1;
176            }
177
178            // Adapt window based on RTT
179            self.adapt_window_on_delivery();
180        }
181        Vec::new()
182    }
183
184    /// Notify that a packet with given sequence timed out.
185    pub fn packet_timeout(&mut self, sequence: Sequence, now: f64) -> Vec<ChannelAction> {
186        let pos = match self.tx_ring.iter().position(|e| e.sequence == sequence) {
187            Some(p) => p,
188            None => return Vec::new(),
189        };
190
191        let envelope = &self.tx_ring[pos];
192        if envelope.tries >= self.max_tries {
193            self.tx_ring.clear();
194            self.rx_ring.clear();
195            return alloc::vec![ChannelAction::TeardownLink];
196        }
197
198        // Retry
199        let envelope = &mut self.tx_ring[pos];
200        envelope.tries += 1;
201        envelope.sent_at = now;
202        let raw = envelope.raw.clone();
203
204        // Shrink window (Python nests window_max shrink inside window shrink)
205        if self.window > self.window_min {
206            self.window -= 1;
207            if self.window_max > self.window_min + self.window_flexibility {
208                self.window_max -= 1;
209            }
210        }
211
212        alloc::vec![ChannelAction::SendOnLink { raw }]
213    }
214
215    /// Compute timeout duration for the given try count.
216    ///
217    /// Formula: `1.5^(tries-1) * max(rtt*2.5, 0.025) * (tx_ring.len() + 1.5)`
218    pub fn get_packet_timeout(&self, tries: u8) -> f64 {
219        let base = 1.5_f64.powi((tries as i32) - 1);
220        let rtt_factor = (self.rtt * 2.5).max(0.025);
221        let ring_factor = (self.tx_ring.len() as f64) + 1.5;
222        base * rtt_factor * ring_factor
223    }
224
225    /// Get the current try count for a given sequence.
226    pub fn get_tries(&self, sequence: Sequence) -> Option<u8> {
227        self.tx_ring.iter()
228            .find(|e| e.sequence == sequence)
229            .map(|e| e.tries)
230    }
231
232    /// Shut down the channel, clearing all rings.
233    pub fn shutdown(&mut self) {
234        self.tx_ring.clear();
235        self.rx_ring.clear();
236    }
237
238    /// Current window size.
239    pub fn window(&self) -> u16 {
240        self.window
241    }
242
243    /// Current maximum window size.
244    pub fn window_max(&self) -> u16 {
245        self.window_max
246    }
247
248    /// Number of outstanding (undelivered) envelopes in TX ring.
249    pub fn outstanding(&self) -> usize {
250        self.tx_ring.iter().filter(|e| !e.delivered).count()
251    }
252
253    // --- Internal ---
254
255    fn is_behind_rx_window(&self, sequence: Sequence) -> bool {
256        if sequence < self.next_rx_sequence {
257            let window_overflow = (self.next_rx_sequence as u32 + CHANNEL_WINDOW_MAX_FAST as u32) % CHANNEL_SEQ_MODULUS;
258            let overflow = window_overflow as u16;
259            if overflow < self.next_rx_sequence {
260                // Wrapped around — sequence is valid if > overflow
261                if sequence > overflow {
262                    return true; // actually behind
263                }
264                return false; // valid wrap-around sequence
265            }
266            return true;
267        }
268        false
269    }
270
271    fn emplace_rx(&mut self, envelope: Envelope) {
272        // Use modular distance from next_rx_sequence for correct wrap-boundary ordering.
273        // wrapping_sub gives the unsigned distance in sequence space.
274        let env_dist = envelope.sequence.wrapping_sub(self.next_rx_sequence);
275        let mut i = 0;
276        for existing in self.rx_ring.iter() {
277            if envelope.sequence == existing.sequence {
278                return; // duplicate
279            }
280            let exist_dist = existing.sequence.wrapping_sub(self.next_rx_sequence);
281            if env_dist < exist_dist {
282                self.rx_ring.insert(i, envelope);
283                return;
284            }
285            i += 1;
286        }
287        self.rx_ring.push_back(envelope);
288    }
289
290    fn collect_contiguous(&mut self) -> Vec<ChannelAction> {
291        let mut actions = Vec::new();
292
293        loop {
294            let front_match = self.rx_ring.front()
295                .map(|e| e.sequence == self.next_rx_sequence)
296                .unwrap_or(false);
297
298            if !front_match {
299                break;
300            }
301
302            let envelope = self.rx_ring.pop_front().unwrap();
303
304            // Re-parse the envelope to get payload
305            if let Ok((msgtype, _seq, payload)) = unpack_envelope(&envelope.raw) {
306                actions.push(ChannelAction::MessageReceived {
307                    msgtype,
308                    payload: payload.to_vec(),
309                    sequence: envelope.sequence,
310                });
311            }
312
313            self.next_rx_sequence = ((self.next_rx_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
314
315            // After wrapping to 0, check if 0 is also in the ring
316            if self.next_rx_sequence == 0 {
317                // Continue the loop — it will check front again
318            }
319        }
320
321        actions
322    }
323
324    fn adapt_window_on_delivery(&mut self) {
325        if self.rtt == 0.0 {
326            return;
327        }
328
329        if self.rtt > CHANNEL_RTT_FAST {
330            self.fast_rate_rounds = 0;
331
332            if self.rtt > CHANNEL_RTT_MEDIUM {
333                self.medium_rate_rounds = 0;
334            } else {
335                self.medium_rate_rounds += 1;
336                if self.window_max < CHANNEL_WINDOW_MAX_MEDIUM
337                    && self.medium_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
338                {
339                    self.window_max = CHANNEL_WINDOW_MAX_MEDIUM;
340                    self.window_min = CHANNEL_WINDOW_MIN_LIMIT_MEDIUM;
341                }
342            }
343        } else {
344            self.fast_rate_rounds += 1;
345            if self.window_max < CHANNEL_WINDOW_MAX_FAST
346                && self.fast_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
347            {
348                self.window_max = CHANNEL_WINDOW_MAX_FAST;
349                self.window_min = CHANNEL_WINDOW_MIN_LIMIT_FAST;
350            }
351        }
352    }
353}
354
355#[cfg(test)]
356mod tests {
357    use super::*;
358
359    #[test]
360    fn test_new_default() {
361        let ch = Channel::new(0.5);
362        assert_eq!(ch.window, CHANNEL_WINDOW);
363        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_SLOW);
364        assert!(ch.is_ready_to_send());
365    }
366
367    #[test]
368    fn test_new_very_slow() {
369        let ch = Channel::new(2.0);
370        assert_eq!(ch.window, 1);
371        assert_eq!(ch.window_max, 1);
372    }
373
374    #[test]
375    fn test_send_receive() {
376        let mut ch = Channel::new(0.1);
377        let actions = ch.send(0x01, b"hello", 1.0, 500).unwrap();
378        assert_eq!(actions.len(), 1);
379        match &actions[0] {
380            ChannelAction::SendOnLink { raw } => {
381                // Simulate receive on the other side
382                let mut ch2 = Channel::new(0.1);
383                let recv_actions = ch2.receive(raw, 1.1);
384                assert_eq!(recv_actions.len(), 1);
385                match &recv_actions[0] {
386                    ChannelAction::MessageReceived { msgtype, payload, sequence } => {
387                        assert_eq!(*msgtype, 0x01);
388                        assert_eq!(payload, b"hello");
389                        assert_eq!(*sequence, 0);
390                    }
391                    _ => panic!("Expected MessageReceived"),
392                }
393            }
394            _ => panic!("Expected SendOnLink"),
395        }
396    }
397
398    #[test]
399    fn test_send_not_ready() {
400        let mut ch = Channel::new(0.1);
401        // Fill the window
402        ch.send(0x01, b"a", 1.0, 500).unwrap();
403        ch.send(0x01, b"b", 1.0, 500).unwrap();
404        // Window = 2, both outstanding
405        assert!(!ch.is_ready_to_send());
406        assert_eq!(ch.send(0x01, b"c", 1.0, 500), Err(ChannelError::NotReady));
407    }
408
409    #[test]
410    fn test_packet_delivered_grows_window() {
411        let mut ch = Channel::new(0.1);
412        ch.send(0x01, b"a", 1.0, 500).unwrap();
413        ch.send(0x01, b"b", 1.0, 500).unwrap();
414
415        assert_eq!(ch.window, 2);
416        ch.packet_delivered(0);
417        assert_eq!(ch.window, 3);
418    }
419
420    #[test]
421    fn test_packet_timeout_shrinks_window() {
422        let mut ch = Channel::new(0.1);
423        ch.send(0x01, b"a", 1.0, 500).unwrap();
424        ch.send(0x01, b"b", 1.0, 500).unwrap();
425
426        // Deliver one to grow window
427        ch.packet_delivered(0);
428        assert_eq!(ch.window, 3);
429
430        // Timeout on seq 1
431        let actions = ch.packet_timeout(1, 2.0);
432        assert_eq!(actions.len(), 1); // resend
433        assert_eq!(ch.window, 2);
434    }
435
436    #[test]
437    fn test_max_retries_teardown() {
438        let mut ch = Channel::new(0.1);
439        ch.send(0x01, b"a", 1.0, 500).unwrap();
440
441        // Time out until max_tries exceeded
442        for i in 0..4 {
443            let actions = ch.packet_timeout(0, 2.0 + i as f64);
444            assert_eq!(actions.len(), 1);
445            match &actions[0] {
446                ChannelAction::SendOnLink { .. } => {}
447                _ => panic!("Expected SendOnLink"),
448            }
449        }
450
451        // One more timeout → teardown
452        let actions = ch.packet_timeout(0, 10.0);
453        assert_eq!(actions.len(), 1);
454        match &actions[0] {
455            ChannelAction::TeardownLink => {}
456            _ => panic!("Expected TeardownLink"),
457        }
458    }
459
460    #[test]
461    fn test_sequence_wrapping() {
462        let mut ch = Channel::new(0.1);
463        ch.next_sequence = CHANNEL_SEQ_MAX;
464
465        ch.send(0x01, b"wrap", 1.0, 500).unwrap();
466        assert_eq!(ch.next_sequence, 0);
467
468        ch.send(0x01, b"after", 1.0, 500).unwrap();
469        assert_eq!(ch.next_sequence, 1);
470    }
471
472    #[test]
473    fn test_out_of_order_buffering() {
474        let mut ch = Channel::new(0.1);
475
476        // Send messages out of order (simulate): sequence 1 arrives before 0
477        let raw0 = pack_envelope(0x01, 0, b"first");
478        let raw1 = pack_envelope(0x01, 1, b"second");
479
480        // Receive seq 1 first
481        let actions = ch.receive(&raw1, 1.0);
482        assert!(actions.is_empty()); // buffered, waiting for 0
483
484        // Receive seq 0
485        let actions = ch.receive(&raw0, 1.1);
486        assert_eq!(actions.len(), 2); // both delivered in order
487        match &actions[0] {
488            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
489            _ => panic!("Expected MessageReceived"),
490        }
491        match &actions[1] {
492            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 1),
493            _ => panic!("Expected MessageReceived"),
494        }
495    }
496
497    #[test]
498    fn test_duplicate_rejection() {
499        let mut ch = Channel::new(0.1);
500        let raw = pack_envelope(0x01, 0, b"hello");
501
502        let actions = ch.receive(&raw, 1.0);
503        assert_eq!(actions.len(), 1);
504
505        // Duplicate
506        let actions = ch.receive(&raw, 1.1);
507        assert!(actions.is_empty());
508    }
509
510    #[test]
511    fn test_get_packet_timeout() {
512        let ch = Channel::new(0.1);
513        let t1 = ch.get_packet_timeout(1);
514        let t2 = ch.get_packet_timeout(2);
515        assert!(t2 > t1); // exponential backoff
516    }
517
518    #[test]
519    fn test_mdu() {
520        let ch = Channel::new(0.1);
521        assert_eq!(ch.mdu(431), 431 - CHANNEL_ENVELOPE_OVERHEAD);
522    }
523
524    #[test]
525    fn test_window_upgrade_fast() {
526        let mut ch = Channel::new(0.05); // fast RTT
527        ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
528
529        // Deliver FAST_RATE_THRESHOLD messages
530        for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
531            ch.send(0x01, b"x", i as f64, 500).unwrap();
532            ch.packet_delivered(i);
533        }
534
535        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_FAST);
536        assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_FAST);
537    }
538
539    #[test]
540    fn test_window_upgrade_medium() {
541        let mut ch = Channel::new(0.5); // medium RTT
542        ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
543
544        for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
545            ch.send(0x01, b"x", i as f64, 500).unwrap();
546            ch.packet_delivered(i);
547        }
548
549        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_MEDIUM);
550        assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM);
551    }
552
553    #[test]
554    fn test_shutdown() {
555        let mut ch = Channel::new(0.1);
556        ch.send(0x01, b"a", 1.0, 500).unwrap();
557        ch.shutdown();
558        assert_eq!(ch.outstanding(), 0);
559    }
560
561    #[test]
562    fn test_message_too_big() {
563        let mut ch = Channel::new(0.1);
564        let big = vec![0u8; 500];
565        // link_mdu = 10, message + header won't fit
566        assert_eq!(ch.send(0x01, &big, 1.0, 10), Err(ChannelError::MessageTooBig));
567    }
568
569    #[test]
570    fn test_receive_sequence_wrap_at_boundary() {
571        let mut ch = Channel::new(0.1);
572        ch.next_rx_sequence = CHANNEL_SEQ_MAX;
573
574        let raw_max = pack_envelope(0x01, CHANNEL_SEQ_MAX, b"last");
575        let raw_zero = pack_envelope(0x01, 0, b"first_after_wrap");
576
577        let actions = ch.receive(&raw_max, 1.0);
578        assert_eq!(actions.len(), 1);
579        assert_eq!(ch.next_rx_sequence, 0);
580
581        let actions = ch.receive(&raw_zero, 1.1);
582        assert_eq!(actions.len(), 1);
583        match &actions[0] {
584            ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
585            _ => panic!("Expected MessageReceived"),
586        }
587    }
588
589    #[test]
590    fn test_receive_wrap_boundary_out_of_order() {
591        // Test that out-of-order messages at the wrap boundary (0xFFFF→0) are sorted correctly.
592        let mut ch = Channel::new(0.1);
593        ch.next_rx_sequence = 0xFFFE;
594
595        let raw_fffe = pack_envelope(0x01, 0xFFFE, b"a");
596        let raw_ffff = pack_envelope(0x01, 0xFFFF, b"b");
597        let raw_0000 = pack_envelope(0x01, 0x0000, b"c");
598
599        // Deliver in reverse order: 0, 0xFFFF, 0xFFFE
600        let actions = ch.receive(&raw_0000, 1.0);
601        assert!(actions.is_empty()); // waiting for 0xFFFE
602
603        let actions = ch.receive(&raw_ffff, 1.1);
604        assert!(actions.is_empty()); // still waiting for 0xFFFE
605
606        let actions = ch.receive(&raw_fffe, 1.2);
607        assert_eq!(actions.len(), 3); // all three delivered in order
608        match &actions[0] {
609            ChannelAction::MessageReceived { sequence, payload, .. } => {
610                assert_eq!(*sequence, 0xFFFE);
611                assert_eq!(payload, b"a");
612            }
613            _ => panic!("Expected MessageReceived"),
614        }
615        match &actions[1] {
616            ChannelAction::MessageReceived { sequence, payload, .. } => {
617                assert_eq!(*sequence, 0xFFFF);
618                assert_eq!(payload, b"b");
619            }
620            _ => panic!("Expected MessageReceived"),
621        }
622        match &actions[2] {
623            ChannelAction::MessageReceived { sequence, payload, .. } => {
624                assert_eq!(*sequence, 0x0000);
625                assert_eq!(payload, b"c");
626            }
627            _ => panic!("Expected MessageReceived"),
628        }
629    }
630
631    #[test]
632    fn test_many_messages_in_order() {
633        let mut sender = Channel::new(0.05);
634        let mut receiver = Channel::new(0.05);
635
636        for i in 0..20u16 {
637            // Deliver previous to make window available
638            if i >= 2 {
639                sender.packet_delivered(i - 2);
640            }
641
642            let actions = sender.send(0x01, &[i as u8], i as f64, 500).unwrap();
643            let raw = match &actions[0] {
644                ChannelAction::SendOnLink { raw } => raw.clone(),
645                _ => panic!("Expected SendOnLink"),
646            };
647
648            let recv_actions = receiver.receive(&raw, i as f64 + 0.1);
649            assert_eq!(recv_actions.len(), 1);
650            match &recv_actions[0] {
651                ChannelAction::MessageReceived { payload, sequence, .. } => {
652                    assert_eq!(*sequence, i);
653                    assert_eq!(payload, &[i as u8]);
654                }
655                _ => panic!("Expected MessageReceived"),
656            }
657        }
658    }
659}