1pub mod envelope;
2pub mod types;
3
4use alloc::collections::VecDeque;
5use alloc::vec::Vec;
6
7#[cfg(test)]
8use crate::constants::CHANNEL_SEQ_MAX;
9use crate::constants::{
10 CHANNEL_ENVELOPE_OVERHEAD, CHANNEL_FAST_RATE_THRESHOLD, CHANNEL_MAX_TRIES, CHANNEL_RTT_FAST,
11 CHANNEL_RTT_MEDIUM, CHANNEL_RTT_SLOW, CHANNEL_SEQ_MODULUS, CHANNEL_WINDOW,
12 CHANNEL_WINDOW_FLEXIBILITY, CHANNEL_WINDOW_MAX_FAST, CHANNEL_WINDOW_MAX_MEDIUM,
13 CHANNEL_WINDOW_MAX_SLOW, CHANNEL_WINDOW_MIN, CHANNEL_WINDOW_MIN_LIMIT_FAST,
14 CHANNEL_WINDOW_MIN_LIMIT_MEDIUM,
15};
16
17pub use types::{ChannelAction, ChannelError, MessageType, Sequence};
18
19use envelope::{pack_envelope, unpack_envelope};
20
21struct Envelope {
23 sequence: Sequence,
24 raw: Vec<u8>,
25 tries: u8,
26 sent_at: f64,
27 delivered: bool,
28}
29
30pub struct Channel {
35 tx_ring: VecDeque<Envelope>,
36 rx_ring: VecDeque<Envelope>,
37 next_sequence: u16,
38 next_rx_sequence: u16,
39 window: u16,
40 window_max: u16,
41 window_min: u16,
42 window_flexibility: u16,
43 fast_rate_rounds: u16,
44 medium_rate_rounds: u16,
45 max_tries: u8,
46 rtt: f64,
47}
48
49impl Channel {
50 pub fn new(initial_rtt: f64) -> Self {
52 let (window, window_max, window_min, window_flexibility) = if initial_rtt > CHANNEL_RTT_SLOW
53 {
54 (1, 1, 1, 1)
55 } else {
56 (
57 CHANNEL_WINDOW,
58 CHANNEL_WINDOW_MAX_SLOW,
59 CHANNEL_WINDOW_MIN,
60 CHANNEL_WINDOW_FLEXIBILITY,
61 )
62 };
63
64 Channel {
65 tx_ring: VecDeque::new(),
66 rx_ring: VecDeque::new(),
67 next_sequence: 0,
68 next_rx_sequence: 0,
69 window,
70 window_max,
71 window_min,
72 window_flexibility,
73 fast_rate_rounds: 0,
74 medium_rate_rounds: 0,
75 max_tries: CHANNEL_MAX_TRIES,
76 rtt: initial_rtt,
77 }
78 }
79
80 pub fn set_rtt(&mut self, rtt: f64) {
82 self.rtt = rtt;
83 }
84
85 pub fn mdu(&self, link_mdu: usize) -> usize {
87 let mdu = link_mdu.saturating_sub(CHANNEL_ENVELOPE_OVERHEAD);
88 mdu.min(0xFFFF)
89 }
90
91 pub fn is_ready_to_send(&self) -> bool {
93 let outstanding = self.tx_ring.iter().filter(|e| !e.delivered).count() as u16;
94 outstanding < self.window
95 }
96
97 pub fn send(
99 &mut self,
100 msgtype: u16,
101 payload: &[u8],
102 now: f64,
103 link_mdu: usize,
104 ) -> Result<Vec<ChannelAction>, ChannelError> {
105 if !self.is_ready_to_send() {
106 return Err(ChannelError::NotReady);
107 }
108
109 let sequence = self.next_sequence;
110 self.next_sequence = ((self.next_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
111
112 let raw = pack_envelope(msgtype, sequence, payload);
113 if raw.len() > link_mdu {
114 return Err(ChannelError::MessageTooBig);
115 }
116
117 self.tx_ring.push_back(Envelope {
118 sequence,
119 raw: raw.clone(),
120 tries: 1,
121 sent_at: now,
122 delivered: false,
123 });
124
125 Ok(alloc::vec![ChannelAction::SendOnLink { raw, sequence }])
126 }
127
128 pub fn receive(&mut self, raw: &[u8], _now: f64) -> Vec<ChannelAction> {
133 let (_msgtype, sequence, _payload) = match unpack_envelope(raw) {
134 Ok(r) => r,
135 Err(_) => return Vec::new(),
136 };
137
138 if self.is_behind_rx_window(sequence) {
140 return Vec::new();
141 }
142
143 if self.rx_ring.iter().any(|e| e.sequence == sequence) {
145 return Vec::new();
146 }
147
148 let envelope = Envelope {
150 sequence,
151 raw: raw.to_vec(),
152 tries: 0,
153 sent_at: 0.0,
154 delivered: false,
155 };
156 self.emplace_rx(envelope);
157
158 self.collect_contiguous()
160 }
161
162 pub fn flush_tx(&mut self) {
165 self.tx_ring.clear();
166 }
167
168 pub fn packet_delivered(&mut self, sequence: Sequence) -> Vec<ChannelAction> {
170 if let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) {
171 self.tx_ring.remove(pos);
172
173 if self.window < self.window_max {
174 self.window += 1;
175 }
176
177 self.adapt_window_on_delivery();
179 }
180 Vec::new()
181 }
182
183 pub fn packet_timeout(&mut self, sequence: Sequence, now: f64) -> Vec<ChannelAction> {
185 let pos = match self.tx_ring.iter().position(|e| e.sequence == sequence) {
186 Some(p) => p,
187 None => return Vec::new(),
188 };
189
190 let envelope = &self.tx_ring[pos];
191 if envelope.tries >= self.max_tries {
192 self.tx_ring.clear();
193 self.rx_ring.clear();
194 return alloc::vec![ChannelAction::TeardownLink];
195 }
196
197 let envelope = &mut self.tx_ring[pos];
199 envelope.tries += 1;
200 envelope.sent_at = now;
201 let raw = envelope.raw.clone();
202
203 if self.window > self.window_min {
205 self.window -= 1;
206 if self.window_max > self.window_min + self.window_flexibility {
207 self.window_max -= 1;
208 }
209 }
210
211 alloc::vec![ChannelAction::SendOnLink { raw, sequence }]
212 }
213
214 pub fn get_packet_timeout(&self, tries: u8) -> f64 {
218 let base = 1.5_f64.powi((tries as i32) - 1);
219 let rtt_factor = (self.rtt * 2.5).max(0.025);
220 let ring_factor = (self.tx_ring.len() as f64) + 1.5;
221 base * rtt_factor * ring_factor
222 }
223
224 pub fn get_tries(&self, sequence: Sequence) -> Option<u8> {
226 self.tx_ring
227 .iter()
228 .find(|e| e.sequence == sequence)
229 .map(|e| e.tries)
230 }
231
232 pub fn tick(&mut self, now: f64) -> Vec<ChannelAction> {
234 let timed_out: Vec<Sequence> = self
235 .tx_ring
236 .iter()
237 .filter(|e| !e.delivered && now - e.sent_at >= self.get_packet_timeout(e.tries))
238 .map(|e| e.sequence)
239 .collect();
240
241 let mut actions = Vec::new();
242 for sequence in timed_out {
243 actions.extend(self.packet_timeout(sequence, now));
244 }
245 actions
246 }
247
248 pub fn shutdown(&mut self) {
250 self.tx_ring.clear();
251 self.rx_ring.clear();
252 }
253
254 pub fn window(&self) -> u16 {
256 self.window
257 }
258
259 pub fn window_max(&self) -> u16 {
261 self.window_max
262 }
263
264 pub fn outstanding(&self) -> usize {
266 self.tx_ring.iter().filter(|e| !e.delivered).count()
267 }
268
269 fn is_behind_rx_window(&self, sequence: Sequence) -> bool {
272 if sequence < self.next_rx_sequence {
273 let window_overflow = (self.next_rx_sequence as u32 + CHANNEL_WINDOW_MAX_FAST as u32)
274 % CHANNEL_SEQ_MODULUS;
275 let overflow = window_overflow as u16;
276 if overflow < self.next_rx_sequence {
277 if sequence > overflow {
279 return true; }
281 return false; }
283 return true;
284 }
285 false
286 }
287
288 fn emplace_rx(&mut self, envelope: Envelope) {
289 let env_dist = envelope.sequence.wrapping_sub(self.next_rx_sequence);
292 for (i, existing) in self.rx_ring.iter().enumerate() {
293 if envelope.sequence == existing.sequence {
294 return; }
296 let exist_dist = existing.sequence.wrapping_sub(self.next_rx_sequence);
297 if env_dist < exist_dist {
298 self.rx_ring.insert(i, envelope);
299 return;
300 }
301 }
302 self.rx_ring.push_back(envelope);
303 }
304
305 fn collect_contiguous(&mut self) -> Vec<ChannelAction> {
306 let mut actions = Vec::new();
307
308 loop {
309 let front_match = self
310 .rx_ring
311 .front()
312 .map(|e| e.sequence == self.next_rx_sequence)
313 .unwrap_or(false);
314
315 if !front_match {
316 break;
317 }
318
319 let envelope = self.rx_ring.pop_front().unwrap();
320
321 if let Ok((msgtype, _seq, payload)) = unpack_envelope(&envelope.raw) {
323 actions.push(ChannelAction::MessageReceived {
324 msgtype,
325 payload: payload.to_vec(),
326 sequence: envelope.sequence,
327 });
328 }
329
330 self.next_rx_sequence =
331 ((self.next_rx_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
332
333 if self.next_rx_sequence == 0 {
335 }
337 }
338
339 actions
340 }
341
342 fn adapt_window_on_delivery(&mut self) {
343 if self.rtt == 0.0 {
344 return;
345 }
346
347 if self.rtt > CHANNEL_RTT_FAST {
348 self.fast_rate_rounds = 0;
349
350 if self.rtt > CHANNEL_RTT_MEDIUM {
351 self.medium_rate_rounds = 0;
352 } else {
353 self.medium_rate_rounds += 1;
354 if self.window_max < CHANNEL_WINDOW_MAX_MEDIUM
355 && self.medium_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
356 {
357 self.window_max = CHANNEL_WINDOW_MAX_MEDIUM;
358 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_MEDIUM;
359 }
360 }
361 } else {
362 self.fast_rate_rounds += 1;
363 if self.window_max < CHANNEL_WINDOW_MAX_FAST
364 && self.fast_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
365 {
366 self.window_max = CHANNEL_WINDOW_MAX_FAST;
367 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_FAST;
368 }
369 }
370 }
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Unwraps a `SendOnLink` action into its bytes and sequence number.
    fn expect_send(action: &ChannelAction) -> (&[u8], Sequence) {
        match action {
            ChannelAction::SendOnLink { raw, sequence } => (raw.as_slice(), *sequence),
            _ => panic!("Expected SendOnLink"),
        }
    }

    /// Unwraps a `MessageReceived` action into its sequence number and payload.
    fn expect_received(action: &ChannelAction) -> (Sequence, &[u8]) {
        match action {
            ChannelAction::MessageReceived {
                sequence, payload, ..
            } => (*sequence, payload.as_slice()),
            _ => panic!("Expected MessageReceived"),
        }
    }

    #[test]
    fn test_new_default() {
        let channel = Channel::new(0.5);
        assert_eq!(channel.window, CHANNEL_WINDOW);
        assert_eq!(channel.window_max, CHANNEL_WINDOW_MAX_SLOW);
        assert!(channel.is_ready_to_send());
    }

    #[test]
    fn test_new_very_slow() {
        let channel = Channel::new(2.0);
        assert_eq!(channel.window, 1);
        assert_eq!(channel.window_max, 1);
    }

    #[test]
    fn test_send_receive() {
        let mut sender = Channel::new(0.1);
        let sent = sender.send(0x01, b"hello", 1.0, 500).unwrap();
        assert_eq!(sent.len(), 1);

        let (raw, sent_sequence) = expect_send(&sent[0]);
        assert_eq!(sent_sequence, 0);

        let mut receiver = Channel::new(0.1);
        let delivered = receiver.receive(raw, 1.1);
        assert_eq!(delivered.len(), 1);

        match &delivered[0] {
            ChannelAction::MessageReceived {
                msgtype,
                payload,
                sequence,
            } => {
                assert_eq!(*msgtype, 0x01);
                assert_eq!(payload, b"hello");
                assert_eq!(*sequence, 0);
            }
            _ => panic!("Expected MessageReceived"),
        }
    }

    #[test]
    fn test_send_not_ready() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 1.0, 500).unwrap();
        channel.send(0x01, b"b", 1.0, 500).unwrap();

        assert!(!channel.is_ready_to_send());
        assert_eq!(
            channel.send(0x01, b"c", 1.0, 500),
            Err(ChannelError::NotReady)
        );
    }

    #[test]
    fn test_packet_delivered_grows_window() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 1.0, 500).unwrap();
        channel.send(0x01, b"b", 1.0, 500).unwrap();

        assert_eq!(channel.window, 2);
        channel.packet_delivered(0);
        assert_eq!(channel.window, 3);
    }

    #[test]
    fn test_packet_timeout_shrinks_window() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 1.0, 500).unwrap();
        channel.send(0x01, b"b", 1.0, 500).unwrap();

        channel.packet_delivered(0);
        assert_eq!(channel.window, 3);

        let resend = channel.packet_timeout(1, 2.0);
        assert_eq!(resend.len(), 1);
        assert_eq!(channel.window, 2);
    }

    #[test]
    fn test_tick_retransmits_timed_out_packets() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 0.0, 500).unwrap();

        let deadline = channel.get_packet_timeout(1);
        let resend = channel.tick(deadline + 0.01);
        assert_eq!(resend.len(), 1);

        let (_, sequence) = expect_send(&resend[0]);
        assert_eq!(sequence, 0);
        assert_eq!(channel.get_tries(0), Some(2));
    }

    #[test]
    fn test_max_retries_teardown() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 1.0, 500).unwrap();

        for attempt in 0..4 {
            let resend = channel.packet_timeout(0, 2.0 + attempt as f64);
            assert_eq!(resend.len(), 1);
            assert!(
                matches!(&resend[0], ChannelAction::SendOnLink { .. }),
                "Expected SendOnLink"
            );
        }

        let last = channel.packet_timeout(0, 10.0);
        assert_eq!(last.len(), 1);
        assert!(
            matches!(&last[0], ChannelAction::TeardownLink),
            "Expected TeardownLink"
        );
    }

    #[test]
    fn test_sequence_wrapping() {
        let mut channel = Channel::new(0.1);
        channel.next_sequence = CHANNEL_SEQ_MAX;

        channel.send(0x01, b"wrap", 1.0, 500).unwrap();
        assert_eq!(channel.next_sequence, 0);

        channel.send(0x01, b"after", 1.0, 500).unwrap();
        assert_eq!(channel.next_sequence, 1);
    }

    #[test]
    fn test_out_of_order_buffering() {
        let mut channel = Channel::new(0.1);

        let first = pack_envelope(0x01, 0, b"first");
        let second = pack_envelope(0x01, 1, b"second");

        // The later envelope arrives first and must be held back.
        let held = channel.receive(&second, 1.0);
        assert!(held.is_empty());

        // The missing envelope releases both, in sequence order.
        let released = channel.receive(&first, 1.1);
        assert_eq!(released.len(), 2);

        let (sequence_a, _) = expect_received(&released[0]);
        assert_eq!(sequence_a, 0);
        let (sequence_b, _) = expect_received(&released[1]);
        assert_eq!(sequence_b, 1);
    }

    #[test]
    fn test_duplicate_rejection() {
        let mut channel = Channel::new(0.1);
        let packed = pack_envelope(0x01, 0, b"hello");

        let first_time = channel.receive(&packed, 1.0);
        assert_eq!(first_time.len(), 1);

        let second_time = channel.receive(&packed, 1.1);
        assert!(second_time.is_empty());
    }

    #[test]
    fn test_get_packet_timeout() {
        let channel = Channel::new(0.1);
        let first_try = channel.get_packet_timeout(1);
        let second_try = channel.get_packet_timeout(2);
        assert!(second_try > first_try);
    }

    #[test]
    fn test_mdu() {
        let channel = Channel::new(0.1);
        assert_eq!(channel.mdu(431), 431 - CHANNEL_ENVELOPE_OVERHEAD);
    }

    #[test]
    fn test_window_upgrade_fast() {
        let mut channel = Channel::new(0.05);
        channel.window_max = CHANNEL_WINDOW_MAX_SLOW;

        for round in 0..CHANNEL_FAST_RATE_THRESHOLD {
            channel.send(0x01, b"x", round as f64, 500).unwrap();
            channel.packet_delivered(round);
        }

        assert_eq!(channel.window_max, CHANNEL_WINDOW_MAX_FAST);
        assert_eq!(channel.window_min, CHANNEL_WINDOW_MIN_LIMIT_FAST);
    }

    #[test]
    fn test_window_upgrade_medium() {
        let mut channel = Channel::new(0.5);
        channel.window_max = CHANNEL_WINDOW_MAX_SLOW;

        for round in 0..CHANNEL_FAST_RATE_THRESHOLD {
            channel.send(0x01, b"x", round as f64, 500).unwrap();
            channel.packet_delivered(round);
        }

        assert_eq!(channel.window_max, CHANNEL_WINDOW_MAX_MEDIUM);
        assert_eq!(channel.window_min, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM);
    }

    #[test]
    fn test_shutdown() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 1.0, 500).unwrap();
        channel.shutdown();
        assert_eq!(channel.outstanding(), 0);
    }

    #[test]
    fn test_message_too_big() {
        let mut channel = Channel::new(0.1);
        let oversized = vec![0u8; 500];
        assert_eq!(
            channel.send(0x01, &oversized, 1.0, 10),
            Err(ChannelError::MessageTooBig)
        );
    }

    #[test]
    fn test_receive_sequence_wrap_at_boundary() {
        let mut channel = Channel::new(0.1);
        channel.next_rx_sequence = CHANNEL_SEQ_MAX;

        let last_before_wrap = pack_envelope(0x01, CHANNEL_SEQ_MAX, b"last");
        let first_after_wrap = pack_envelope(0x01, 0, b"first_after_wrap");

        let delivered = channel.receive(&last_before_wrap, 1.0);
        assert_eq!(delivered.len(), 1);
        assert_eq!(channel.next_rx_sequence, 0);

        let delivered = channel.receive(&first_after_wrap, 1.1);
        assert_eq!(delivered.len(), 1);
        let (sequence, _) = expect_received(&delivered[0]);
        assert_eq!(sequence, 0);
    }

    #[test]
    fn test_receive_wrap_boundary_out_of_order() {
        let mut channel = Channel::new(0.1);
        channel.next_rx_sequence = 0xFFFE;

        let packed_fffe = pack_envelope(0x01, 0xFFFE, b"a");
        let packed_ffff = pack_envelope(0x01, 0xFFFF, b"b");
        let packed_0000 = pack_envelope(0x01, 0x0000, b"c");

        // Feed the three envelopes in reverse order across the wrap point.
        let held = channel.receive(&packed_0000, 1.0);
        assert!(held.is_empty());

        let held = channel.receive(&packed_ffff, 1.1);
        assert!(held.is_empty());

        let released = channel.receive(&packed_fffe, 1.2);
        assert_eq!(released.len(), 3);

        let (sequence, payload) = expect_received(&released[0]);
        assert_eq!(sequence, 0xFFFE);
        assert_eq!(payload, b"a");

        let (sequence, payload) = expect_received(&released[1]);
        assert_eq!(sequence, 0xFFFF);
        assert_eq!(payload, b"b");

        let (sequence, payload) = expect_received(&released[2]);
        assert_eq!(sequence, 0x0000);
        assert_eq!(payload, b"c");
    }

    #[test]
    fn test_many_messages_in_order() {
        let mut sender = Channel::new(0.05);
        let mut receiver = Channel::new(0.05);

        for i in 0..20u16 {
            // Confirm older envelopes so the send window never fills up.
            if i >= 2 {
                sender.packet_delivered(i - 2);
            }

            let sent = sender.send(0x01, &[i as u8], i as f64, 500).unwrap();
            let (raw, _) = expect_send(&sent[0]);

            let delivered = receiver.receive(raw, i as f64 + 0.1);
            assert_eq!(delivered.len(), 1);

            let (sequence, payload) = expect_received(&delivered[0]);
            assert_eq!(sequence, i);
            assert_eq!(payload, &[i as u8]);
        }
    }
}