1pub mod envelope;
2pub mod types;
3
4use alloc::collections::VecDeque;
5use alloc::vec::Vec;
6
7#[cfg(test)]
8use crate::constants::CHANNEL_SEQ_MAX;
9use crate::constants::{
10 CHANNEL_ENVELOPE_OVERHEAD, CHANNEL_FAST_RATE_THRESHOLD, CHANNEL_MAX_TRIES, CHANNEL_RTT_FAST,
11 CHANNEL_RTT_MEDIUM, CHANNEL_RTT_SLOW, CHANNEL_SEQ_MODULUS, CHANNEL_WINDOW,
12 CHANNEL_WINDOW_FLEXIBILITY, CHANNEL_WINDOW_MAX_FAST, CHANNEL_WINDOW_MAX_MEDIUM,
13 CHANNEL_WINDOW_MAX_SLOW, CHANNEL_WINDOW_MIN, CHANNEL_WINDOW_MIN_LIMIT_FAST,
14 CHANNEL_WINDOW_MIN_LIMIT_MEDIUM,
15};
16
17pub use types::{ChannelAction, ChannelError, MessageType, Sequence};
18
19use envelope::{pack_envelope, unpack_envelope};
20
21struct Envelope {
23 sequence: Sequence,
24 raw: Vec<u8>,
25 tries: u8,
26 sent_at: f64,
27 delivered: bool,
28}
29
30pub struct Channel {
35 tx_ring: VecDeque<Envelope>,
36 rx_ring: VecDeque<Envelope>,
37 next_sequence: u16,
38 next_rx_sequence: u16,
39 window: u16,
40 window_max: u16,
41 window_min: u16,
42 window_flexibility: u16,
43 fast_rate_rounds: u16,
44 medium_rate_rounds: u16,
45 max_tries: u8,
46 rtt: f64,
47}
48
49impl Channel {
50 pub fn new(initial_rtt: f64) -> Self {
52 let (window, window_max, window_min, window_flexibility) = if initial_rtt > CHANNEL_RTT_SLOW
53 {
54 (1, 1, 1, 1)
55 } else {
56 (
57 CHANNEL_WINDOW,
58 CHANNEL_WINDOW_MAX_SLOW,
59 CHANNEL_WINDOW_MIN,
60 CHANNEL_WINDOW_FLEXIBILITY,
61 )
62 };
63
64 Channel {
65 tx_ring: VecDeque::new(),
66 rx_ring: VecDeque::new(),
67 next_sequence: 0,
68 next_rx_sequence: 0,
69 window,
70 window_max,
71 window_min,
72 window_flexibility,
73 fast_rate_rounds: 0,
74 medium_rate_rounds: 0,
75 max_tries: CHANNEL_MAX_TRIES,
76 rtt: initial_rtt,
77 }
78 }
79
80 pub fn set_rtt(&mut self, rtt: f64) {
82 self.rtt = rtt;
83 }
84
85 pub fn mdu(&self, link_mdu: usize) -> usize {
87 let mdu = link_mdu.saturating_sub(CHANNEL_ENVELOPE_OVERHEAD);
88 mdu.min(0xFFFF)
89 }
90
91 pub fn is_ready_to_send(&self) -> bool {
93 let outstanding = self.tx_ring.iter().filter(|e| !e.delivered).count() as u16;
94 outstanding < self.window
95 }
96
97 pub fn send(
99 &mut self,
100 msgtype: u16,
101 payload: &[u8],
102 now: f64,
103 link_mdu: usize,
104 ) -> Result<Vec<ChannelAction>, ChannelError> {
105 if !self.is_ready_to_send() {
106 return Err(ChannelError::NotReady);
107 }
108
109 let sequence = self.next_sequence;
110 let raw = pack_envelope(msgtype, sequence, payload);
111 if raw.len() > link_mdu {
112 return Err(ChannelError::MessageTooBig);
113 }
114
115 self.next_sequence = ((self.next_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
116 self.tx_ring.push_back(Envelope {
117 sequence,
118 raw: raw.clone(),
119 tries: 1,
120 sent_at: now,
121 delivered: false,
122 });
123
124 Ok(alloc::vec![ChannelAction::SendOnLink { raw, sequence }])
125 }
126
127 pub fn receive(&mut self, raw: &[u8], _now: f64) -> Vec<ChannelAction> {
132 let (_msgtype, sequence, _payload) = match unpack_envelope(raw) {
133 Ok(r) => r,
134 Err(_) => return Vec::new(),
135 };
136
137 if self.is_behind_rx_window(sequence) {
139 return Vec::new();
140 }
141
142 if self.rx_ring.iter().any(|e| e.sequence == sequence) {
144 return Vec::new();
145 }
146
147 let envelope = Envelope {
149 sequence,
150 raw: raw.to_vec(),
151 tries: 0,
152 sent_at: 0.0,
153 delivered: false,
154 };
155 self.emplace_rx(envelope);
156
157 self.collect_contiguous()
159 }
160
161 pub fn flush_tx(&mut self) {
164 self.tx_ring.clear();
165 }
166
167 pub fn cancel_send(&mut self, sequence: Sequence) -> bool {
169 let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) else {
170 return false;
171 };
172 self.tx_ring.remove(pos);
173 let expected_next = ((sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
174 if self.next_sequence == expected_next {
175 self.next_sequence = sequence;
176 }
177 true
178 }
179
180 pub fn packet_delivered(&mut self, sequence: Sequence) -> Vec<ChannelAction> {
182 if let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) {
183 self.tx_ring.remove(pos);
184
185 if self.window < self.window_max {
186 self.window += 1;
187 }
188
189 self.adapt_window_on_delivery();
191 }
192 Vec::new()
193 }
194
195 pub fn packet_timeout(&mut self, sequence: Sequence, now: f64) -> Vec<ChannelAction> {
197 let pos = match self.tx_ring.iter().position(|e| e.sequence == sequence) {
198 Some(p) => p,
199 None => return Vec::new(),
200 };
201
202 let envelope = &self.tx_ring[pos];
203 if envelope.tries >= self.max_tries {
204 self.tx_ring.clear();
205 self.rx_ring.clear();
206 return alloc::vec![ChannelAction::TeardownLink];
207 }
208
209 let envelope = &mut self.tx_ring[pos];
211 envelope.tries += 1;
212 envelope.sent_at = now;
213 let raw = envelope.raw.clone();
214
215 if self.window > self.window_min {
217 self.window -= 1;
218 if self.window_max > self.window_min + self.window_flexibility {
219 self.window_max -= 1;
220 }
221 }
222
223 alloc::vec![ChannelAction::SendOnLink { raw, sequence }]
224 }
225
226 pub fn get_packet_timeout(&self, tries: u8) -> f64 {
230 let base = 1.5_f64.powi((tries as i32) - 1);
231 let rtt_factor = (self.rtt * 2.5).max(0.025);
232 let ring_factor = (self.tx_ring.len() as f64) + 1.5;
233 base * rtt_factor * ring_factor
234 }
235
236 pub fn get_tries(&self, sequence: Sequence) -> Option<u8> {
238 self.tx_ring
239 .iter()
240 .find(|e| e.sequence == sequence)
241 .map(|e| e.tries)
242 }
243
244 pub fn tick(&mut self, now: f64) -> Vec<ChannelAction> {
246 let timed_out: Vec<Sequence> = self
247 .tx_ring
248 .iter()
249 .filter(|e| !e.delivered && now - e.sent_at >= self.get_packet_timeout(e.tries))
250 .map(|e| e.sequence)
251 .collect();
252
253 let mut actions = Vec::new();
254 for sequence in timed_out {
255 actions.extend(self.packet_timeout(sequence, now));
256 }
257 actions
258 }
259
260 pub fn shutdown(&mut self) {
262 self.tx_ring.clear();
263 self.rx_ring.clear();
264 }
265
266 pub fn window(&self) -> u16 {
268 self.window
269 }
270
271 pub fn window_max(&self) -> u16 {
273 self.window_max
274 }
275
276 pub fn outstanding(&self) -> usize {
278 self.tx_ring.iter().filter(|e| !e.delivered).count()
279 }
280
281 fn is_behind_rx_window(&self, sequence: Sequence) -> bool {
284 if sequence < self.next_rx_sequence {
285 let window_overflow = (self.next_rx_sequence as u32 + CHANNEL_WINDOW_MAX_FAST as u32)
286 % CHANNEL_SEQ_MODULUS;
287 let overflow = window_overflow as u16;
288 if overflow < self.next_rx_sequence {
289 if sequence > overflow {
291 return true; }
293 return false; }
295 return true;
296 }
297 false
298 }
299
300 fn emplace_rx(&mut self, envelope: Envelope) {
301 let env_dist = envelope.sequence.wrapping_sub(self.next_rx_sequence);
304 for (i, existing) in self.rx_ring.iter().enumerate() {
305 if envelope.sequence == existing.sequence {
306 return; }
308 let exist_dist = existing.sequence.wrapping_sub(self.next_rx_sequence);
309 if env_dist < exist_dist {
310 self.rx_ring.insert(i, envelope);
311 return;
312 }
313 }
314 self.rx_ring.push_back(envelope);
315 }
316
317 fn collect_contiguous(&mut self) -> Vec<ChannelAction> {
318 let mut actions = Vec::new();
319
320 loop {
321 let front_match = self
322 .rx_ring
323 .front()
324 .map(|e| e.sequence == self.next_rx_sequence)
325 .unwrap_or(false);
326
327 if !front_match {
328 break;
329 }
330
331 let envelope = self.rx_ring.pop_front().unwrap();
332
333 if let Ok((msgtype, _seq, payload)) = unpack_envelope(&envelope.raw) {
335 actions.push(ChannelAction::MessageReceived {
336 msgtype,
337 payload: payload.to_vec(),
338 sequence: envelope.sequence,
339 });
340 }
341
342 self.next_rx_sequence =
343 ((self.next_rx_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
344
345 if self.next_rx_sequence == 0 {
347 }
349 }
350
351 actions
352 }
353
354 fn adapt_window_on_delivery(&mut self) {
355 if self.rtt == 0.0 {
356 return;
357 }
358
359 if self.rtt > CHANNEL_RTT_FAST {
360 self.fast_rate_rounds = 0;
361
362 if self.rtt > CHANNEL_RTT_MEDIUM {
363 self.medium_rate_rounds = 0;
364 } else {
365 self.medium_rate_rounds += 1;
366 if self.window_max < CHANNEL_WINDOW_MAX_MEDIUM
367 && self.medium_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
368 {
369 self.window_max = CHANNEL_WINDOW_MAX_MEDIUM;
370 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_MEDIUM;
371 }
372 }
373 } else {
374 self.fast_rate_rounds += 1;
375 if self.window_max < CHANNEL_WINDOW_MAX_FAST
376 && self.fast_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
377 {
378 self.window_max = CHANNEL_WINDOW_MAX_FAST;
379 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_FAST;
380 }
381 }
382 }
383}
384
#[cfg(test)]
mod tests {
    use super::*;

    /// Link MDU used by tests that are not about size limits.
    const LINK_MDU: usize = 500;

    /// Returns the sequence of a `SendOnLink` action, panicking on any other variant.
    fn sent_sequence(action: &ChannelAction) -> Sequence {
        let ChannelAction::SendOnLink { sequence, .. } = action else {
            panic!("Expected SendOnLink");
        };
        *sequence
    }

    /// Returns the sequence of a `MessageReceived` action, panicking on any other variant.
    fn received_sequence(action: &ChannelAction) -> Sequence {
        let ChannelAction::MessageReceived { sequence, .. } = action else {
            panic!("Expected MessageReceived");
        };
        *sequence
    }

    /// Sends and immediately confirms `CHANNEL_FAST_RATE_THRESHOLD` messages.
    fn deliver_threshold_rounds(channel: &mut Channel) {
        for sequence in 0..CHANNEL_FAST_RATE_THRESHOLD {
            channel
                .send(0x01, b"x", f64::from(sequence), LINK_MDU)
                .unwrap();
            channel.packet_delivered(sequence);
        }
    }

    // A moderate RTT starts with the default window parameters.
    #[test]
    fn test_new_default() {
        let channel = Channel::new(0.5);
        assert_eq!(channel.window, CHANNEL_WINDOW);
        assert_eq!(channel.window_max, CHANNEL_WINDOW_MAX_SLOW);
        assert!(channel.is_ready_to_send());
    }

    // An RTT above the slow threshold pins the window to a single envelope.
    #[test]
    fn test_new_very_slow() {
        let channel = Channel::new(2.0);
        assert_eq!(channel.window, 1);
        assert_eq!(channel.window_max, 1);
    }

    // What one channel sends, a fresh channel receives intact.
    #[test]
    fn test_send_receive() {
        let mut sender = Channel::new(0.1);
        let sent = sender.send(0x01, b"hello", 1.0, LINK_MDU).unwrap();
        assert_eq!(sent.len(), 1);

        let ChannelAction::SendOnLink { raw, sequence } = &sent[0] else {
            panic!("Expected SendOnLink");
        };
        assert_eq!(*sequence, 0);

        let mut receiver = Channel::new(0.1);
        let received = receiver.receive(raw, 1.1);
        assert_eq!(received.len(), 1);

        let ChannelAction::MessageReceived {
            msgtype,
            payload,
            sequence,
        } = &received[0]
        else {
            panic!("Expected MessageReceived");
        };
        assert_eq!(*msgtype, 0x01);
        assert_eq!(payload, b"hello");
        assert_eq!(*sequence, 0);
    }

    // Once the window is full, further sends are refused.
    #[test]
    fn test_send_not_ready() {
        let mut channel = Channel::new(0.1);
        for payload in [b"a", b"b"] {
            channel.send(0x01, payload, 1.0, LINK_MDU).unwrap();
        }

        assert!(!channel.is_ready_to_send());
        assert_eq!(
            channel.send(0x01, b"c", 1.0, LINK_MDU),
            Err(ChannelError::NotReady)
        );
    }

    // A rejected oversize message must not burn a sequence number.
    #[test]
    fn test_message_too_big_does_not_consume_sequence() {
        let mut channel = Channel::new(0.1);

        let rejected = channel.send(0x01, b"hello", 1.0, 2);
        assert_eq!(rejected, Err(ChannelError::MessageTooBig));

        let accepted = channel.send(0x01, b"ok", 2.0, LINK_MDU).unwrap();
        assert_eq!(sent_sequence(&accepted[0]), 0);
    }

    // Cancelling the latest send frees the window and reuses its sequence.
    #[test]
    fn test_cancel_send_rewinds_sequence_and_frees_window() {
        let mut channel = Channel::new(CHANNEL_RTT_SLOW + 1.0);

        let first = channel.send(0x01, b"first", 1.0, LINK_MDU).unwrap();
        let sequence = sent_sequence(&first[0]);
        assert!(!channel.is_ready_to_send());

        assert!(channel.cancel_send(sequence));
        assert!(channel.is_ready_to_send());

        let retry = channel.send(0x01, b"retry", 2.0, LINK_MDU).unwrap();
        assert_eq!(sent_sequence(&retry[0]), 0);
    }

    // A confirmed delivery widens the window by one.
    #[test]
    fn test_packet_delivered_grows_window() {
        let mut channel = Channel::new(0.1);
        for payload in [b"a", b"b"] {
            channel.send(0x01, payload, 1.0, LINK_MDU).unwrap();
        }

        assert_eq!(channel.window, 2);
        channel.packet_delivered(0);
        assert_eq!(channel.window, 3);
    }

    // A timeout retransmits and narrows the window again.
    #[test]
    fn test_packet_timeout_shrinks_window() {
        let mut channel = Channel::new(0.1);
        for payload in [b"a", b"b"] {
            channel.send(0x01, payload, 1.0, LINK_MDU).unwrap();
        }

        channel.packet_delivered(0);
        assert_eq!(channel.window, 3);

        let retransmit = channel.packet_timeout(1, 2.0);
        assert_eq!(retransmit.len(), 1);
        assert_eq!(channel.window, 2);
    }

    // `tick` retransmits an envelope once its timeout has elapsed.
    #[test]
    fn test_tick_retransmits_timed_out_packets() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 0.0, LINK_MDU).unwrap();

        let deadline = channel.get_packet_timeout(1);
        let actions = channel.tick(deadline + 0.01);

        assert_eq!(actions.len(), 1);
        assert_eq!(sent_sequence(&actions[0]), 0);
        assert_eq!(channel.get_tries(0), Some(2));
    }

    // Four retries are allowed; the fifth timeout tears the link down.
    #[test]
    fn test_max_retries_teardown() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 1.0, LINK_MDU).unwrap();

        for attempt in 0..4 {
            let actions = channel.packet_timeout(0, 2.0 + attempt as f64);
            assert_eq!(actions.len(), 1);
            assert!(
                matches!(actions[0], ChannelAction::SendOnLink { .. }),
                "Expected SendOnLink"
            );
        }

        let actions = channel.packet_timeout(0, 10.0);
        assert_eq!(actions.len(), 1);
        assert!(
            matches!(actions[0], ChannelAction::TeardownLink),
            "Expected TeardownLink"
        );
    }

    // The transmit sequence wraps from the maximum back to zero.
    #[test]
    fn test_sequence_wrapping() {
        let mut channel = Channel::new(0.1);
        channel.next_sequence = CHANNEL_SEQ_MAX;

        channel.send(0x01, b"wrap", 1.0, LINK_MDU).unwrap();
        assert_eq!(channel.next_sequence, 0);

        channel.send(0x01, b"after", 1.0, LINK_MDU).unwrap();
        assert_eq!(channel.next_sequence, 1);
    }

    // An early arrival is held back until the gap before it is filled.
    #[test]
    fn test_out_of_order_buffering() {
        let mut channel = Channel::new(0.1);
        let first = pack_envelope(0x01, 0, b"first");
        let second = pack_envelope(0x01, 1, b"second");

        assert!(channel.receive(&second, 1.0).is_empty());

        let released = channel.receive(&first, 1.1);
        assert_eq!(released.len(), 2);
        assert_eq!(received_sequence(&released[0]), 0);
        assert_eq!(received_sequence(&released[1]), 1);
    }

    // Receiving the same envelope twice yields it only once.
    #[test]
    fn test_duplicate_rejection() {
        let mut channel = Channel::new(0.1);
        let raw = pack_envelope(0x01, 0, b"hello");

        assert_eq!(channel.receive(&raw, 1.0).len(), 1);
        assert!(channel.receive(&raw, 1.1).is_empty());
    }

    // The timeout grows with the number of tries.
    #[test]
    fn test_get_packet_timeout() {
        let channel = Channel::new(0.1);
        let first_try = channel.get_packet_timeout(1);
        let second_try = channel.get_packet_timeout(2);
        assert!(second_try > first_try);
    }

    // The channel MDU is the link MDU minus the envelope overhead.
    #[test]
    fn test_mdu() {
        let channel = Channel::new(0.1);
        assert_eq!(channel.mdu(431), 431 - CHANNEL_ENVELOPE_OVERHEAD);
    }

    // Enough deliveries at a fast RTT unlock the fast window limits.
    #[test]
    fn test_window_upgrade_fast() {
        let mut channel = Channel::new(0.05);
        channel.window_max = CHANNEL_WINDOW_MAX_SLOW;

        deliver_threshold_rounds(&mut channel);

        assert_eq!(channel.window_max, CHANNEL_WINDOW_MAX_FAST);
        assert_eq!(channel.window_min, CHANNEL_WINDOW_MIN_LIMIT_FAST);
    }

    // Enough deliveries at a medium RTT unlock the medium window limits.
    #[test]
    fn test_window_upgrade_medium() {
        let mut channel = Channel::new(0.5);
        channel.window_max = CHANNEL_WINDOW_MAX_SLOW;

        deliver_threshold_rounds(&mut channel);

        assert_eq!(channel.window_max, CHANNEL_WINDOW_MAX_MEDIUM);
        assert_eq!(channel.window_min, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM);
    }

    // Shutdown leaves nothing outstanding.
    #[test]
    fn test_shutdown() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 1.0, LINK_MDU).unwrap();
        channel.shutdown();
        assert_eq!(channel.outstanding(), 0);
    }

    // A payload far larger than the link MDU is rejected.
    #[test]
    fn test_message_too_big() {
        let mut channel = Channel::new(0.1);
        let oversized = vec![0u8; 500];
        let outcome = channel.send(0x01, &oversized, 1.0, 10);
        assert_eq!(outcome, Err(ChannelError::MessageTooBig));
    }

    // The receive sequence wraps from the maximum to zero in order.
    #[test]
    fn test_receive_sequence_wrap_at_boundary() {
        let mut channel = Channel::new(0.1);
        channel.next_rx_sequence = CHANNEL_SEQ_MAX;

        let last = pack_envelope(0x01, CHANNEL_SEQ_MAX, b"last");
        let wrapped = pack_envelope(0x01, 0, b"first_after_wrap");

        assert_eq!(channel.receive(&last, 1.0).len(), 1);
        assert_eq!(channel.next_rx_sequence, 0);

        let released = channel.receive(&wrapped, 1.1);
        assert_eq!(released.len(), 1);
        assert_eq!(received_sequence(&released[0]), 0);
    }

    // Out-of-order arrivals straddling the wrap point are released in order.
    #[test]
    fn test_receive_wrap_boundary_out_of_order() {
        let mut channel = Channel::new(0.1);
        channel.next_rx_sequence = 0xFFFE;

        let raw_fffe = pack_envelope(0x01, 0xFFFE, b"a");
        let raw_ffff = pack_envelope(0x01, 0xFFFF, b"b");
        let raw_0000 = pack_envelope(0x01, 0x0000, b"c");

        assert!(channel.receive(&raw_0000, 1.0).is_empty());
        assert!(channel.receive(&raw_ffff, 1.1).is_empty());

        let released = channel.receive(&raw_fffe, 1.2);
        assert_eq!(released.len(), 3);

        let expected: [(Sequence, u8); 3] = [(0xFFFE, b'a'), (0xFFFF, b'b'), (0x0000, b'c')];
        for (action, (want_sequence, want_byte)) in released.iter().zip(expected) {
            let ChannelAction::MessageReceived {
                sequence, payload, ..
            } = action
            else {
                panic!("Expected MessageReceived");
            };
            assert_eq!(*sequence, want_sequence);
            assert_eq!(payload, &[want_byte]);
        }
    }

    // A steady stream with timely confirmations arrives one by one, in order.
    #[test]
    fn test_many_messages_in_order() {
        let mut sender = Channel::new(0.05);
        let mut receiver = Channel::new(0.05);

        for expected in 0u16..20 {
            // Confirm with a lag of two so the sender's window never fills.
            if let Some(confirmed) = expected.checked_sub(2) {
                sender.packet_delivered(confirmed);
            }

            let byte = expected as u8;
            let sent = sender
                .send(0x01, &[byte], f64::from(expected), LINK_MDU)
                .unwrap();
            let ChannelAction::SendOnLink { raw, .. } = &sent[0] else {
                panic!("Expected SendOnLink");
            };

            let delivered = receiver.receive(raw, f64::from(expected) + 0.1);
            assert_eq!(delivered.len(), 1);

            let ChannelAction::MessageReceived {
                payload, sequence, ..
            } = &delivered[0]
            else {
                panic!("Expected MessageReceived");
            };
            assert_eq!(*sequence, expected);
            assert_eq!(payload, &[byte]);
        }
    }
}