// rns_core/channel/mod.rs

1pub mod types;
2pub mod envelope;
3
4use alloc::collections::VecDeque;
5use alloc::vec::Vec;
6
7use crate::constants::{
8    CHANNEL_ENVELOPE_OVERHEAD, CHANNEL_FAST_RATE_THRESHOLD, CHANNEL_MAX_TRIES,
9    CHANNEL_RTT_FAST, CHANNEL_RTT_MEDIUM, CHANNEL_RTT_SLOW,
10    CHANNEL_SEQ_MODULUS, CHANNEL_WINDOW, CHANNEL_WINDOW_FLEXIBILITY, CHANNEL_WINDOW_MAX_FAST,
11    CHANNEL_WINDOW_MAX_MEDIUM, CHANNEL_WINDOW_MAX_SLOW, CHANNEL_WINDOW_MIN,
12    CHANNEL_WINDOW_MIN_LIMIT_FAST, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM,
13};
14#[cfg(test)]
15use crate::constants::CHANNEL_SEQ_MAX;
16
17pub use types::{ChannelAction, ChannelError, MessageType, Sequence};
18
19use envelope::{pack_envelope, unpack_envelope};
20
21/// Internal envelope tracking state.
22struct Envelope {
23    sequence: Sequence,
24    msgtype: MessageType,
25    raw: Vec<u8>,
26    tries: u8,
27    sent_at: f64,
28    delivered: bool,
29}
30
31/// Window-based reliable messaging channel.
32///
33/// Follows the action-queue model: `send`/`receive`/`tick` return
34/// `Vec<ChannelAction>`. The caller dispatches actions.
35pub struct Channel {
36    tx_ring: VecDeque<Envelope>,
37    rx_ring: VecDeque<Envelope>,
38    next_sequence: u16,
39    next_rx_sequence: u16,
40    window: u16,
41    window_max: u16,
42    window_min: u16,
43    window_flexibility: u16,
44    fast_rate_rounds: u16,
45    medium_rate_rounds: u16,
46    max_tries: u8,
47    rtt: f64,
48}
49
50impl Channel {
51    /// Create a new Channel with initial RTT.
52    pub fn new(initial_rtt: f64) -> Self {
53        let (window, window_max, window_min, window_flexibility) = if initial_rtt > CHANNEL_RTT_SLOW {
54            (1, 1, 1, 1)
55        } else {
56            (
57                CHANNEL_WINDOW,
58                CHANNEL_WINDOW_MAX_SLOW,
59                CHANNEL_WINDOW_MIN,
60                CHANNEL_WINDOW_FLEXIBILITY,
61            )
62        };
63
64        Channel {
65            tx_ring: VecDeque::new(),
66            rx_ring: VecDeque::new(),
67            next_sequence: 0,
68            next_rx_sequence: 0,
69            window,
70            window_max,
71            window_min,
72            window_flexibility,
73            fast_rate_rounds: 0,
74            medium_rate_rounds: 0,
75            max_tries: CHANNEL_MAX_TRIES,
76            rtt: initial_rtt,
77        }
78    }
79
80    /// Update the RTT value.
81    pub fn set_rtt(&mut self, rtt: f64) {
82        self.rtt = rtt;
83    }
84
85    /// Maximum data unit available for message payload.
86    pub fn mdu(&self, link_mdu: usize) -> usize {
87        let mdu = link_mdu.saturating_sub(CHANNEL_ENVELOPE_OVERHEAD);
88        mdu.min(0xFFFF)
89    }
90
91    /// Check if channel is ready to send (has window capacity).
92    pub fn is_ready_to_send(&self) -> bool {
93        let outstanding = self.tx_ring.iter()
94            .filter(|e| !e.delivered)
95            .count() as u16;
96        outstanding < self.window
97    }
98
99    /// Send a message. Returns `SendOnLink` action with packed envelope.
100    pub fn send(
101        &mut self,
102        msgtype: u16,
103        payload: &[u8],
104        now: f64,
105        link_mdu: usize,
106    ) -> Result<Vec<ChannelAction>, ChannelError> {
107        if !self.is_ready_to_send() {
108            return Err(ChannelError::NotReady);
109        }
110
111        let sequence = self.next_sequence;
112        self.next_sequence = ((self.next_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
113
114        let raw = pack_envelope(msgtype, sequence, payload);
115        if raw.len() > link_mdu {
116            return Err(ChannelError::MessageTooBig);
117        }
118
119        self.tx_ring.push_back(Envelope {
120            sequence,
121            msgtype,
122            raw: raw.clone(),
123            tries: 1,
124            sent_at: now,
125            delivered: false,
126        });
127
128        Ok(alloc::vec![ChannelAction::SendOnLink { raw }])
129    }
130
131    /// Receive decrypted envelope bytes.
132    ///
133    /// Returns `MessageReceived` for contiguous sequences starting from
134    /// `next_rx_sequence`.
135    pub fn receive(&mut self, raw: &[u8], _now: f64) -> Vec<ChannelAction> {
136        let (msgtype, sequence, _payload) = match unpack_envelope(raw) {
137            Ok(r) => r,
138            Err(_) => return Vec::new(),
139        };
140
141        // Reject sequences behind our window
142        if self.is_behind_rx_window(sequence) {
143            return Vec::new();
144        }
145
146        // Reject duplicates
147        if self.rx_ring.iter().any(|e| e.sequence == sequence) {
148            return Vec::new();
149        }
150
151        // Emplace in sorted order
152        let envelope = Envelope {
153            sequence,
154            msgtype,
155            raw: raw.to_vec(),
156            tries: 0,
157            sent_at: 0.0,
158            delivered: false,
159        };
160        self.emplace_rx(envelope);
161
162        // Collect contiguous messages
163        self.collect_contiguous()
164    }
165
166    /// Notify that a packet with given sequence was delivered (acknowledged).
167    pub fn packet_delivered(&mut self, sequence: Sequence) -> Vec<ChannelAction> {
168        if let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) {
169            self.tx_ring.remove(pos);
170
171            if self.window < self.window_max {
172                self.window += 1;
173            }
174
175            // Adapt window based on RTT
176            self.adapt_window_on_delivery();
177        }
178        Vec::new()
179    }
180
181    /// Notify that a packet with given sequence timed out.
182    pub fn packet_timeout(&mut self, sequence: Sequence, now: f64) -> Vec<ChannelAction> {
183        let pos = match self.tx_ring.iter().position(|e| e.sequence == sequence) {
184            Some(p) => p,
185            None => return Vec::new(),
186        };
187
188        let envelope = &self.tx_ring[pos];
189        if envelope.tries >= self.max_tries {
190            self.tx_ring.clear();
191            self.rx_ring.clear();
192            return alloc::vec![ChannelAction::TeardownLink];
193        }
194
195        // Retry
196        let envelope = &mut self.tx_ring[pos];
197        envelope.tries += 1;
198        envelope.sent_at = now;
199        let raw = envelope.raw.clone();
200
201        // Shrink window (Python nests window_max shrink inside window shrink)
202        if self.window > self.window_min {
203            self.window -= 1;
204            if self.window_max > self.window_min + self.window_flexibility {
205                self.window_max -= 1;
206            }
207        }
208
209        alloc::vec![ChannelAction::SendOnLink { raw }]
210    }
211
212    /// Compute timeout duration for the given try count.
213    ///
214    /// Formula: `1.5^(tries-1) * max(rtt*2.5, 0.025) * (tx_ring.len() + 1.5)`
215    pub fn get_packet_timeout(&self, tries: u8) -> f64 {
216        let base = 1.5_f64.powi((tries as i32) - 1);
217        let rtt_factor = (self.rtt * 2.5).max(0.025);
218        let ring_factor = (self.tx_ring.len() as f64) + 1.5;
219        base * rtt_factor * ring_factor
220    }
221
222    /// Get the current try count for a given sequence.
223    pub fn get_tries(&self, sequence: Sequence) -> Option<u8> {
224        self.tx_ring.iter()
225            .find(|e| e.sequence == sequence)
226            .map(|e| e.tries)
227    }
228
229    /// Shut down the channel, clearing all rings.
230    pub fn shutdown(&mut self) {
231        self.tx_ring.clear();
232        self.rx_ring.clear();
233    }
234
235    /// Current window size.
236    pub fn window(&self) -> u16 {
237        self.window
238    }
239
240    /// Current maximum window size.
241    pub fn window_max(&self) -> u16 {
242        self.window_max
243    }
244
245    /// Number of outstanding (undelivered) envelopes in TX ring.
246    pub fn outstanding(&self) -> usize {
247        self.tx_ring.iter().filter(|e| !e.delivered).count()
248    }
249
250    // --- Internal ---
251
252    fn is_behind_rx_window(&self, sequence: Sequence) -> bool {
253        if sequence < self.next_rx_sequence {
254            let window_overflow = (self.next_rx_sequence as u32 + CHANNEL_WINDOW_MAX_FAST as u32) % CHANNEL_SEQ_MODULUS;
255            let overflow = window_overflow as u16;
256            if overflow < self.next_rx_sequence {
257                // Wrapped around — sequence is valid if > overflow
258                if sequence > overflow {
259                    return true; // actually behind
260                }
261                return false; // valid wrap-around sequence
262            }
263            return true;
264        }
265        false
266    }
267
268    fn emplace_rx(&mut self, envelope: Envelope) {
269        // Use modular distance from next_rx_sequence for correct wrap-boundary ordering.
270        // wrapping_sub gives the unsigned distance in sequence space.
271        let env_dist = envelope.sequence.wrapping_sub(self.next_rx_sequence);
272        let mut i = 0;
273        for existing in self.rx_ring.iter() {
274            if envelope.sequence == existing.sequence {
275                return; // duplicate
276            }
277            let exist_dist = existing.sequence.wrapping_sub(self.next_rx_sequence);
278            if env_dist < exist_dist {
279                self.rx_ring.insert(i, envelope);
280                return;
281            }
282            i += 1;
283        }
284        self.rx_ring.push_back(envelope);
285    }
286
287    fn collect_contiguous(&mut self) -> Vec<ChannelAction> {
288        let mut actions = Vec::new();
289
290        loop {
291            let front_match = self.rx_ring.front()
292                .map(|e| e.sequence == self.next_rx_sequence)
293                .unwrap_or(false);
294
295            if !front_match {
296                break;
297            }
298
299            let envelope = self.rx_ring.pop_front().unwrap();
300
301            // Re-parse the envelope to get payload
302            if let Ok((msgtype, _seq, payload)) = unpack_envelope(&envelope.raw) {
303                actions.push(ChannelAction::MessageReceived {
304                    msgtype,
305                    payload: payload.to_vec(),
306                    sequence: envelope.sequence,
307                });
308            }
309
310            self.next_rx_sequence = ((self.next_rx_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
311
312            // After wrapping to 0, check if 0 is also in the ring
313            if self.next_rx_sequence == 0 {
314                // Continue the loop — it will check front again
315            }
316        }
317
318        actions
319    }
320
321    fn adapt_window_on_delivery(&mut self) {
322        if self.rtt == 0.0 {
323            return;
324        }
325
326        if self.rtt > CHANNEL_RTT_FAST {
327            self.fast_rate_rounds = 0;
328
329            if self.rtt > CHANNEL_RTT_MEDIUM {
330                self.medium_rate_rounds = 0;
331            } else {
332                self.medium_rate_rounds += 1;
333                if self.window_max < CHANNEL_WINDOW_MAX_MEDIUM
334                    && self.medium_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
335                {
336                    self.window_max = CHANNEL_WINDOW_MAX_MEDIUM;
337                    self.window_min = CHANNEL_WINDOW_MIN_LIMIT_MEDIUM;
338                }
339            }
340        } else {
341            self.fast_rate_rounds += 1;
342            if self.window_max < CHANNEL_WINDOW_MAX_FAST
343                && self.fast_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
344            {
345                self.window_max = CHANNEL_WINDOW_MAX_FAST;
346                self.window_min = CHANNEL_WINDOW_MIN_LIMIT_FAST;
347            }
348        }
349    }
350}
351
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the packed bytes of a `SendOnLink` action; panics on any
    /// other action.
    fn sent_bytes(action: &ChannelAction) -> &[u8] {
        match action {
            ChannelAction::SendOnLink { raw } => raw.as_slice(),
            _ => panic!("Expected SendOnLink"),
        }
    }

    /// Returns `(msgtype, payload, sequence)` of a `MessageReceived`
    /// action; panics on any other action.
    fn received(action: &ChannelAction) -> (MessageType, &[u8], Sequence) {
        match action {
            ChannelAction::MessageReceived { msgtype, payload, sequence } => {
                (*msgtype, payload.as_slice(), *sequence)
            }
            _ => panic!("Expected MessageReceived"),
        }
    }

    /// Sends and immediately acknowledges `rounds` one-byte messages,
    /// using sequences `0..rounds`.
    fn run_delivery_rounds(ch: &mut Channel, rounds: u16) {
        for seq in 0..rounds {
            ch.send(0x01, b"x", f64::from(seq), 500).unwrap();
            ch.packet_delivered(seq);
        }
    }

    #[test]
    fn test_new_default() {
        let ch = Channel::new(0.5);
        assert_eq!(ch.window, CHANNEL_WINDOW);
        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_SLOW);
        assert!(ch.is_ready_to_send());
    }

    #[test]
    fn test_new_very_slow() {
        let ch = Channel::new(2.0);
        assert_eq!(ch.window, 1);
        assert_eq!(ch.window_max, 1);
    }

    #[test]
    fn test_send_receive() {
        let mut sender = Channel::new(0.1);
        let outbound = sender.send(0x01, b"hello", 1.0, 500).unwrap();
        assert_eq!(outbound.len(), 1);

        // Feed the packed bytes into a second channel acting as the peer.
        let mut peer = Channel::new(0.1);
        let inbound = peer.receive(sent_bytes(&outbound[0]), 1.1);
        assert_eq!(inbound.len(), 1);

        let (msgtype, payload, sequence) = received(&inbound[0]);
        assert_eq!(msgtype, 0x01);
        assert_eq!(payload, &b"hello"[..]);
        assert_eq!(sequence, 0);
    }

    #[test]
    fn test_send_not_ready() {
        let mut ch = Channel::new(0.1);
        // Two sends fill the initial window.
        ch.send(0x01, b"a", 1.0, 500).unwrap();
        ch.send(0x01, b"b", 1.0, 500).unwrap();
        assert!(!ch.is_ready_to_send());
        assert_eq!(ch.send(0x01, b"c", 1.0, 500), Err(ChannelError::NotReady));
    }

    #[test]
    fn test_packet_delivered_grows_window() {
        let mut ch = Channel::new(0.1);
        ch.send(0x01, b"a", 1.0, 500).unwrap();
        ch.send(0x01, b"b", 1.0, 500).unwrap();

        assert_eq!(ch.window, 2);
        ch.packet_delivered(0);
        assert_eq!(ch.window, 3);
    }

    #[test]
    fn test_packet_timeout_shrinks_window() {
        let mut ch = Channel::new(0.1);
        ch.send(0x01, b"a", 1.0, 500).unwrap();
        ch.send(0x01, b"b", 1.0, 500).unwrap();

        // One delivery grows the window first.
        ch.packet_delivered(0);
        assert_eq!(ch.window, 3);

        // A timeout on sequence 1 triggers a resend and shrinks it again.
        let actions = ch.packet_timeout(1, 2.0);
        assert_eq!(actions.len(), 1);
        assert_eq!(ch.window, 2);
    }

    #[test]
    fn test_max_retries_teardown() {
        let mut ch = Channel::new(0.1);
        ch.send(0x01, b"a", 1.0, 500).unwrap();

        // The initial send counts as try 1, so the next
        // CHANNEL_MAX_TRIES - 1 timeouts must each produce a resend.
        for attempt in 1..CHANNEL_MAX_TRIES {
            let actions = ch.packet_timeout(0, 1.0 + f64::from(attempt));
            assert_eq!(actions.len(), 1);
            let _ = sent_bytes(&actions[0]);
        }

        // The try budget is now exhausted: one more timeout tears down.
        let actions = ch.packet_timeout(0, 10.0);
        assert_eq!(actions.len(), 1);
        match &actions[0] {
            ChannelAction::TeardownLink => {}
            _ => panic!("Expected TeardownLink"),
        }
    }

    #[test]
    fn test_sequence_wrapping() {
        let mut ch = Channel::new(0.1);
        ch.next_sequence = CHANNEL_SEQ_MAX;

        ch.send(0x01, b"wrap", 1.0, 500).unwrap();
        assert_eq!(ch.next_sequence, 0);

        ch.send(0x01, b"after", 1.0, 500).unwrap();
        assert_eq!(ch.next_sequence, 1);
    }

    #[test]
    fn test_out_of_order_buffering() {
        let mut ch = Channel::new(0.1);

        let raw0 = pack_envelope(0x01, 0, b"first");
        let raw1 = pack_envelope(0x01, 1, b"second");

        // Sequence 1 arrives first and is held back.
        assert!(ch.receive(&raw1, 1.0).is_empty());

        // Sequence 0 unblocks both, released in order.
        let actions = ch.receive(&raw0, 1.1);
        let order: Vec<Sequence> = actions.iter().map(|a| received(a).2).collect();
        let expected: Vec<Sequence> = vec![0, 1];
        assert_eq!(order, expected);
    }

    #[test]
    fn test_duplicate_rejection() {
        let mut ch = Channel::new(0.1);
        let raw = pack_envelope(0x01, 0, b"hello");

        assert_eq!(ch.receive(&raw, 1.0).len(), 1);

        // The same envelope again is behind the window and is ignored.
        assert!(ch.receive(&raw, 1.1).is_empty());
    }

    #[test]
    fn test_get_packet_timeout() {
        let ch = Channel::new(0.1);
        let first = ch.get_packet_timeout(1);
        let second = ch.get_packet_timeout(2);
        assert!(second > first); // exponential backoff
    }

    #[test]
    fn test_mdu() {
        let ch = Channel::new(0.1);
        assert_eq!(ch.mdu(431), 431 - CHANNEL_ENVELOPE_OVERHEAD);
    }

    #[test]
    fn test_window_upgrade_fast() {
        let mut ch = Channel::new(0.05); // fast RTT
        ch.window_max = CHANNEL_WINDOW_MAX_SLOW;

        run_delivery_rounds(&mut ch, CHANNEL_FAST_RATE_THRESHOLD);

        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_FAST);
        assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_FAST);
    }

    #[test]
    fn test_window_upgrade_medium() {
        let mut ch = Channel::new(0.5); // medium RTT
        ch.window_max = CHANNEL_WINDOW_MAX_SLOW;

        run_delivery_rounds(&mut ch, CHANNEL_FAST_RATE_THRESHOLD);

        assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_MEDIUM);
        assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM);
    }

    #[test]
    fn test_shutdown() {
        let mut ch = Channel::new(0.1);
        ch.send(0x01, b"a", 1.0, 500).unwrap();
        ch.shutdown();
        assert_eq!(ch.outstanding(), 0);
    }

    #[test]
    fn test_message_too_big() {
        let mut ch = Channel::new(0.1);
        let big = vec![0u8; 500];
        // With link_mdu = 10 the packed envelope cannot fit.
        assert_eq!(ch.send(0x01, &big, 1.0, 10), Err(ChannelError::MessageTooBig));
    }

    #[test]
    fn test_receive_sequence_wrap_at_boundary() {
        let mut ch = Channel::new(0.1);
        ch.next_rx_sequence = CHANNEL_SEQ_MAX;

        let raw_max = pack_envelope(0x01, CHANNEL_SEQ_MAX, b"last");
        let raw_zero = pack_envelope(0x01, 0, b"first_after_wrap");

        let actions = ch.receive(&raw_max, 1.0);
        assert_eq!(actions.len(), 1);
        assert_eq!(ch.next_rx_sequence, 0);

        let actions = ch.receive(&raw_zero, 1.1);
        assert_eq!(actions.len(), 1);
        assert_eq!(received(&actions[0]).2, 0);
    }

    #[test]
    fn test_receive_wrap_boundary_out_of_order() {
        // Out-of-order arrivals around the 0xFFFF -> 0 wrap must still be
        // released in sequence order.
        let mut ch = Channel::new(0.1);
        ch.next_rx_sequence = 0xFFFE;

        let raw_fffe = pack_envelope(0x01, 0xFFFE, b"a");
        let raw_ffff = pack_envelope(0x01, 0xFFFF, b"b");
        let raw_0000 = pack_envelope(0x01, 0x0000, b"c");

        // Arrival order is reversed: 0, 0xFFFF, 0xFFFE.
        assert!(ch.receive(&raw_0000, 1.0).is_empty()); // waiting for 0xFFFE
        assert!(ch.receive(&raw_ffff, 1.1).is_empty()); // still waiting

        let actions = ch.receive(&raw_fffe, 1.2);
        let released: Vec<(Sequence, &[u8])> = actions
            .iter()
            .map(|a| {
                let (_, payload, sequence) = received(a);
                (sequence, payload)
            })
            .collect();
        let expected: Vec<(Sequence, &[u8])> = vec![
            (0xFFFE, &b"a"[..]),
            (0xFFFF, &b"b"[..]),
            (0x0000, &b"c"[..]),
        ];
        assert_eq!(released, expected);
    }

    #[test]
    fn test_many_messages_in_order() {
        let mut sender = Channel::new(0.05);
        let mut receiver = Channel::new(0.05);

        for i in 0..20u16 {
            // Acknowledge an earlier envelope so the window has room.
            if i >= 2 {
                sender.packet_delivered(i - 2);
            }

            let actions = sender.send(0x01, &[i as u8], i as f64, 500).unwrap();
            let raw = sent_bytes(&actions[0]);

            let recv_actions = receiver.receive(raw, i as f64 + 0.1);
            assert_eq!(recv_actions.len(), 1);

            let (_, payload, sequence) = received(&recv_actions[0]);
            assert_eq!(sequence, i);
            assert_eq!(payload, &[i as u8][..]);
        }
    }
}