1pub mod envelope;
2pub mod types;
3
4use alloc::collections::VecDeque;
5use alloc::vec::Vec;
6
7#[cfg(test)]
8use crate::constants::CHANNEL_SEQ_MAX;
9use crate::constants::{
10 CHANNEL_ENVELOPE_OVERHEAD, CHANNEL_FAST_RATE_THRESHOLD, CHANNEL_MAX_TRIES, CHANNEL_RTT_FAST,
11 CHANNEL_RTT_MEDIUM, CHANNEL_RTT_SLOW, CHANNEL_SEQ_MODULUS, CHANNEL_WINDOW,
12 CHANNEL_WINDOW_FLEXIBILITY, CHANNEL_WINDOW_MAX_FAST, CHANNEL_WINDOW_MAX_MEDIUM,
13 CHANNEL_WINDOW_MAX_SLOW, CHANNEL_WINDOW_MIN, CHANNEL_WINDOW_MIN_LIMIT_FAST,
14 CHANNEL_WINDOW_MIN_LIMIT_MEDIUM,
15};
16
17pub use types::{ChannelAction, ChannelError, MessageType, Sequence};
18
19use envelope::{pack_envelope, unpack_envelope};
20
21struct Envelope {
23 sequence: Sequence,
24 raw: Vec<u8>,
25 tries: u8,
26 sent_at: f64,
27 delivered: bool,
28}
29
30pub struct Channel {
35 tx_ring: VecDeque<Envelope>,
36 rx_ring: VecDeque<Envelope>,
37 next_sequence: u16,
38 next_rx_sequence: u16,
39 window: u16,
40 window_max: u16,
41 window_min: u16,
42 window_flexibility: u16,
43 fast_rate_rounds: u16,
44 medium_rate_rounds: u16,
45 max_tries: u8,
46 rtt: f64,
47}
48
49impl Channel {
50 pub fn new(initial_rtt: f64) -> Self {
52 let (window, window_max, window_min, window_flexibility) = if initial_rtt > CHANNEL_RTT_SLOW
53 {
54 (1, 1, 1, 1)
55 } else {
56 (
57 CHANNEL_WINDOW,
58 CHANNEL_WINDOW_MAX_SLOW,
59 CHANNEL_WINDOW_MIN,
60 CHANNEL_WINDOW_FLEXIBILITY,
61 )
62 };
63
64 Channel {
65 tx_ring: VecDeque::new(),
66 rx_ring: VecDeque::new(),
67 next_sequence: 0,
68 next_rx_sequence: 0,
69 window,
70 window_max,
71 window_min,
72 window_flexibility,
73 fast_rate_rounds: 0,
74 medium_rate_rounds: 0,
75 max_tries: CHANNEL_MAX_TRIES,
76 rtt: initial_rtt,
77 }
78 }
79
80 pub fn set_rtt(&mut self, rtt: f64) {
82 self.rtt = rtt;
83 }
84
85 pub fn mdu(&self, link_mdu: usize) -> usize {
87 let mdu = link_mdu.saturating_sub(CHANNEL_ENVELOPE_OVERHEAD);
88 mdu.min(0xFFFF)
89 }
90
91 pub fn is_ready_to_send(&self) -> bool {
93 let outstanding = self.tx_ring.iter().filter(|e| !e.delivered).count() as u16;
94 outstanding < self.window
95 }
96
97 pub fn send(
99 &mut self,
100 msgtype: u16,
101 payload: &[u8],
102 now: f64,
103 link_mdu: usize,
104 ) -> Result<Vec<ChannelAction>, ChannelError> {
105 if !self.is_ready_to_send() {
106 return Err(ChannelError::NotReady);
107 }
108
109 let sequence = self.next_sequence;
110 self.next_sequence = ((self.next_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
111
112 let raw = pack_envelope(msgtype, sequence, payload);
113 if raw.len() > link_mdu {
114 return Err(ChannelError::MessageTooBig);
115 }
116
117 self.tx_ring.push_back(Envelope {
118 sequence,
119 raw: raw.clone(),
120 tries: 1,
121 sent_at: now,
122 delivered: false,
123 });
124
125 Ok(alloc::vec![ChannelAction::SendOnLink { raw }])
126 }
127
128 pub fn receive(&mut self, raw: &[u8], _now: f64) -> Vec<ChannelAction> {
133 let (_msgtype, sequence, _payload) = match unpack_envelope(raw) {
134 Ok(r) => r,
135 Err(_) => return Vec::new(),
136 };
137
138 if self.is_behind_rx_window(sequence) {
140 return Vec::new();
141 }
142
143 if self.rx_ring.iter().any(|e| e.sequence == sequence) {
145 return Vec::new();
146 }
147
148 let envelope = Envelope {
150 sequence,
151 raw: raw.to_vec(),
152 tries: 0,
153 sent_at: 0.0,
154 delivered: false,
155 };
156 self.emplace_rx(envelope);
157
158 self.collect_contiguous()
160 }
161
162 pub fn flush_tx(&mut self) {
165 self.tx_ring.clear();
166 }
167
168 pub fn packet_delivered(&mut self, sequence: Sequence) -> Vec<ChannelAction> {
170 if let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) {
171 self.tx_ring.remove(pos);
172
173 if self.window < self.window_max {
174 self.window += 1;
175 }
176
177 self.adapt_window_on_delivery();
179 }
180 Vec::new()
181 }
182
183 pub fn packet_timeout(&mut self, sequence: Sequence, now: f64) -> Vec<ChannelAction> {
185 let pos = match self.tx_ring.iter().position(|e| e.sequence == sequence) {
186 Some(p) => p,
187 None => return Vec::new(),
188 };
189
190 let envelope = &self.tx_ring[pos];
191 if envelope.tries >= self.max_tries {
192 self.tx_ring.clear();
193 self.rx_ring.clear();
194 return alloc::vec![ChannelAction::TeardownLink];
195 }
196
197 let envelope = &mut self.tx_ring[pos];
199 envelope.tries += 1;
200 envelope.sent_at = now;
201 let raw = envelope.raw.clone();
202
203 if self.window > self.window_min {
205 self.window -= 1;
206 if self.window_max > self.window_min + self.window_flexibility {
207 self.window_max -= 1;
208 }
209 }
210
211 alloc::vec![ChannelAction::SendOnLink { raw }]
212 }
213
214 pub fn get_packet_timeout(&self, tries: u8) -> f64 {
218 let base = 1.5_f64.powi((tries as i32) - 1);
219 let rtt_factor = (self.rtt * 2.5).max(0.025);
220 let ring_factor = (self.tx_ring.len() as f64) + 1.5;
221 base * rtt_factor * ring_factor
222 }
223
224 pub fn get_tries(&self, sequence: Sequence) -> Option<u8> {
226 self.tx_ring
227 .iter()
228 .find(|e| e.sequence == sequence)
229 .map(|e| e.tries)
230 }
231
232 pub fn shutdown(&mut self) {
234 self.tx_ring.clear();
235 self.rx_ring.clear();
236 }
237
238 pub fn window(&self) -> u16 {
240 self.window
241 }
242
243 pub fn window_max(&self) -> u16 {
245 self.window_max
246 }
247
248 pub fn outstanding(&self) -> usize {
250 self.tx_ring.iter().filter(|e| !e.delivered).count()
251 }
252
253 fn is_behind_rx_window(&self, sequence: Sequence) -> bool {
256 if sequence < self.next_rx_sequence {
257 let window_overflow = (self.next_rx_sequence as u32 + CHANNEL_WINDOW_MAX_FAST as u32)
258 % CHANNEL_SEQ_MODULUS;
259 let overflow = window_overflow as u16;
260 if overflow < self.next_rx_sequence {
261 if sequence > overflow {
263 return true; }
265 return false; }
267 return true;
268 }
269 false
270 }
271
272 fn emplace_rx(&mut self, envelope: Envelope) {
273 let env_dist = envelope.sequence.wrapping_sub(self.next_rx_sequence);
276 for (i, existing) in self.rx_ring.iter().enumerate() {
277 if envelope.sequence == existing.sequence {
278 return; }
280 let exist_dist = existing.sequence.wrapping_sub(self.next_rx_sequence);
281 if env_dist < exist_dist {
282 self.rx_ring.insert(i, envelope);
283 return;
284 }
285 }
286 self.rx_ring.push_back(envelope);
287 }
288
289 fn collect_contiguous(&mut self) -> Vec<ChannelAction> {
290 let mut actions = Vec::new();
291
292 loop {
293 let front_match = self
294 .rx_ring
295 .front()
296 .map(|e| e.sequence == self.next_rx_sequence)
297 .unwrap_or(false);
298
299 if !front_match {
300 break;
301 }
302
303 let envelope = self.rx_ring.pop_front().unwrap();
304
305 if let Ok((msgtype, _seq, payload)) = unpack_envelope(&envelope.raw) {
307 actions.push(ChannelAction::MessageReceived {
308 msgtype,
309 payload: payload.to_vec(),
310 sequence: envelope.sequence,
311 });
312 }
313
314 self.next_rx_sequence =
315 ((self.next_rx_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
316
317 if self.next_rx_sequence == 0 {
319 }
321 }
322
323 actions
324 }
325
326 fn adapt_window_on_delivery(&mut self) {
327 if self.rtt == 0.0 {
328 return;
329 }
330
331 if self.rtt > CHANNEL_RTT_FAST {
332 self.fast_rate_rounds = 0;
333
334 if self.rtt > CHANNEL_RTT_MEDIUM {
335 self.medium_rate_rounds = 0;
336 } else {
337 self.medium_rate_rounds += 1;
338 if self.window_max < CHANNEL_WINDOW_MAX_MEDIUM
339 && self.medium_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
340 {
341 self.window_max = CHANNEL_WINDOW_MAX_MEDIUM;
342 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_MEDIUM;
343 }
344 }
345 } else {
346 self.fast_rate_rounds += 1;
347 if self.window_max < CHANNEL_WINDOW_MAX_FAST
348 && self.fast_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
349 {
350 self.window_max = CHANNEL_WINDOW_MAX_FAST;
351 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_FAST;
352 }
353 }
354 }
355}
356
357#[cfg(test)]
358mod tests {
359 use super::*;
360
361 #[test]
362 fn test_new_default() {
363 let ch = Channel::new(0.5);
364 assert_eq!(ch.window, CHANNEL_WINDOW);
365 assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_SLOW);
366 assert!(ch.is_ready_to_send());
367 }
368
369 #[test]
370 fn test_new_very_slow() {
371 let ch = Channel::new(2.0);
372 assert_eq!(ch.window, 1);
373 assert_eq!(ch.window_max, 1);
374 }
375
376 #[test]
377 fn test_send_receive() {
378 let mut ch = Channel::new(0.1);
379 let actions = ch.send(0x01, b"hello", 1.0, 500).unwrap();
380 assert_eq!(actions.len(), 1);
381 match &actions[0] {
382 ChannelAction::SendOnLink { raw } => {
383 let mut ch2 = Channel::new(0.1);
385 let recv_actions = ch2.receive(raw, 1.1);
386 assert_eq!(recv_actions.len(), 1);
387 match &recv_actions[0] {
388 ChannelAction::MessageReceived {
389 msgtype,
390 payload,
391 sequence,
392 } => {
393 assert_eq!(*msgtype, 0x01);
394 assert_eq!(payload, b"hello");
395 assert_eq!(*sequence, 0);
396 }
397 _ => panic!("Expected MessageReceived"),
398 }
399 }
400 _ => panic!("Expected SendOnLink"),
401 }
402 }
403
404 #[test]
405 fn test_send_not_ready() {
406 let mut ch = Channel::new(0.1);
407 ch.send(0x01, b"a", 1.0, 500).unwrap();
409 ch.send(0x01, b"b", 1.0, 500).unwrap();
410 assert!(!ch.is_ready_to_send());
412 assert_eq!(ch.send(0x01, b"c", 1.0, 500), Err(ChannelError::NotReady));
413 }
414
415 #[test]
416 fn test_packet_delivered_grows_window() {
417 let mut ch = Channel::new(0.1);
418 ch.send(0x01, b"a", 1.0, 500).unwrap();
419 ch.send(0x01, b"b", 1.0, 500).unwrap();
420
421 assert_eq!(ch.window, 2);
422 ch.packet_delivered(0);
423 assert_eq!(ch.window, 3);
424 }
425
426 #[test]
427 fn test_packet_timeout_shrinks_window() {
428 let mut ch = Channel::new(0.1);
429 ch.send(0x01, b"a", 1.0, 500).unwrap();
430 ch.send(0x01, b"b", 1.0, 500).unwrap();
431
432 ch.packet_delivered(0);
434 assert_eq!(ch.window, 3);
435
436 let actions = ch.packet_timeout(1, 2.0);
438 assert_eq!(actions.len(), 1); assert_eq!(ch.window, 2);
440 }
441
442 #[test]
443 fn test_max_retries_teardown() {
444 let mut ch = Channel::new(0.1);
445 ch.send(0x01, b"a", 1.0, 500).unwrap();
446
447 for i in 0..4 {
449 let actions = ch.packet_timeout(0, 2.0 + i as f64);
450 assert_eq!(actions.len(), 1);
451 match &actions[0] {
452 ChannelAction::SendOnLink { .. } => {}
453 _ => panic!("Expected SendOnLink"),
454 }
455 }
456
457 let actions = ch.packet_timeout(0, 10.0);
459 assert_eq!(actions.len(), 1);
460 match &actions[0] {
461 ChannelAction::TeardownLink => {}
462 _ => panic!("Expected TeardownLink"),
463 }
464 }
465
466 #[test]
467 fn test_sequence_wrapping() {
468 let mut ch = Channel::new(0.1);
469 ch.next_sequence = CHANNEL_SEQ_MAX;
470
471 ch.send(0x01, b"wrap", 1.0, 500).unwrap();
472 assert_eq!(ch.next_sequence, 0);
473
474 ch.send(0x01, b"after", 1.0, 500).unwrap();
475 assert_eq!(ch.next_sequence, 1);
476 }
477
478 #[test]
479 fn test_out_of_order_buffering() {
480 let mut ch = Channel::new(0.1);
481
482 let raw0 = pack_envelope(0x01, 0, b"first");
484 let raw1 = pack_envelope(0x01, 1, b"second");
485
486 let actions = ch.receive(&raw1, 1.0);
488 assert!(actions.is_empty()); let actions = ch.receive(&raw0, 1.1);
492 assert_eq!(actions.len(), 2); match &actions[0] {
494 ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
495 _ => panic!("Expected MessageReceived"),
496 }
497 match &actions[1] {
498 ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 1),
499 _ => panic!("Expected MessageReceived"),
500 }
501 }
502
503 #[test]
504 fn test_duplicate_rejection() {
505 let mut ch = Channel::new(0.1);
506 let raw = pack_envelope(0x01, 0, b"hello");
507
508 let actions = ch.receive(&raw, 1.0);
509 assert_eq!(actions.len(), 1);
510
511 let actions = ch.receive(&raw, 1.1);
513 assert!(actions.is_empty());
514 }
515
516 #[test]
517 fn test_get_packet_timeout() {
518 let ch = Channel::new(0.1);
519 let t1 = ch.get_packet_timeout(1);
520 let t2 = ch.get_packet_timeout(2);
521 assert!(t2 > t1); }
523
524 #[test]
525 fn test_mdu() {
526 let ch = Channel::new(0.1);
527 assert_eq!(ch.mdu(431), 431 - CHANNEL_ENVELOPE_OVERHEAD);
528 }
529
530 #[test]
531 fn test_window_upgrade_fast() {
532 let mut ch = Channel::new(0.05); ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
534
535 for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
537 ch.send(0x01, b"x", i as f64, 500).unwrap();
538 ch.packet_delivered(i);
539 }
540
541 assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_FAST);
542 assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_FAST);
543 }
544
545 #[test]
546 fn test_window_upgrade_medium() {
547 let mut ch = Channel::new(0.5); ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
549
550 for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
551 ch.send(0x01, b"x", i as f64, 500).unwrap();
552 ch.packet_delivered(i);
553 }
554
555 assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_MEDIUM);
556 assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM);
557 }
558
559 #[test]
560 fn test_shutdown() {
561 let mut ch = Channel::new(0.1);
562 ch.send(0x01, b"a", 1.0, 500).unwrap();
563 ch.shutdown();
564 assert_eq!(ch.outstanding(), 0);
565 }
566
567 #[test]
568 fn test_message_too_big() {
569 let mut ch = Channel::new(0.1);
570 let big = vec![0u8; 500];
571 assert_eq!(
573 ch.send(0x01, &big, 1.0, 10),
574 Err(ChannelError::MessageTooBig)
575 );
576 }
577
578 #[test]
579 fn test_receive_sequence_wrap_at_boundary() {
580 let mut ch = Channel::new(0.1);
581 ch.next_rx_sequence = CHANNEL_SEQ_MAX;
582
583 let raw_max = pack_envelope(0x01, CHANNEL_SEQ_MAX, b"last");
584 let raw_zero = pack_envelope(0x01, 0, b"first_after_wrap");
585
586 let actions = ch.receive(&raw_max, 1.0);
587 assert_eq!(actions.len(), 1);
588 assert_eq!(ch.next_rx_sequence, 0);
589
590 let actions = ch.receive(&raw_zero, 1.1);
591 assert_eq!(actions.len(), 1);
592 match &actions[0] {
593 ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
594 _ => panic!("Expected MessageReceived"),
595 }
596 }
597
598 #[test]
599 fn test_receive_wrap_boundary_out_of_order() {
600 let mut ch = Channel::new(0.1);
602 ch.next_rx_sequence = 0xFFFE;
603
604 let raw_fffe = pack_envelope(0x01, 0xFFFE, b"a");
605 let raw_ffff = pack_envelope(0x01, 0xFFFF, b"b");
606 let raw_0000 = pack_envelope(0x01, 0x0000, b"c");
607
608 let actions = ch.receive(&raw_0000, 1.0);
610 assert!(actions.is_empty()); let actions = ch.receive(&raw_ffff, 1.1);
613 assert!(actions.is_empty()); let actions = ch.receive(&raw_fffe, 1.2);
616 assert_eq!(actions.len(), 3); match &actions[0] {
618 ChannelAction::MessageReceived {
619 sequence, payload, ..
620 } => {
621 assert_eq!(*sequence, 0xFFFE);
622 assert_eq!(payload, b"a");
623 }
624 _ => panic!("Expected MessageReceived"),
625 }
626 match &actions[1] {
627 ChannelAction::MessageReceived {
628 sequence, payload, ..
629 } => {
630 assert_eq!(*sequence, 0xFFFF);
631 assert_eq!(payload, b"b");
632 }
633 _ => panic!("Expected MessageReceived"),
634 }
635 match &actions[2] {
636 ChannelAction::MessageReceived {
637 sequence, payload, ..
638 } => {
639 assert_eq!(*sequence, 0x0000);
640 assert_eq!(payload, b"c");
641 }
642 _ => panic!("Expected MessageReceived"),
643 }
644 }
645
646 #[test]
647 fn test_many_messages_in_order() {
648 let mut sender = Channel::new(0.05);
649 let mut receiver = Channel::new(0.05);
650
651 for i in 0..20u16 {
652 if i >= 2 {
654 sender.packet_delivered(i - 2);
655 }
656
657 let actions = sender.send(0x01, &[i as u8], i as f64, 500).unwrap();
658 let raw = match &actions[0] {
659 ChannelAction::SendOnLink { raw } => raw.clone(),
660 _ => panic!("Expected SendOnLink"),
661 };
662
663 let recv_actions = receiver.receive(&raw, i as f64 + 0.1);
664 assert_eq!(recv_actions.len(), 1);
665 match &recv_actions[0] {
666 ChannelAction::MessageReceived {
667 payload, sequence, ..
668 } => {
669 assert_eq!(*sequence, i);
670 assert_eq!(payload, &[i as u8]);
671 }
672 _ => panic!("Expected MessageReceived"),
673 }
674 }
675 }
676}