1pub mod envelope;
2pub mod types;
3
4use alloc::collections::VecDeque;
5use alloc::vec::Vec;
6
7#[cfg(test)]
8use crate::constants::CHANNEL_SEQ_MAX;
9use crate::constants::{
10 CHANNEL_ENVELOPE_OVERHEAD, CHANNEL_FAST_RATE_THRESHOLD, CHANNEL_MAX_TRIES, CHANNEL_RTT_FAST,
11 CHANNEL_RTT_MEDIUM, CHANNEL_RTT_SLOW, CHANNEL_SEQ_MODULUS, CHANNEL_WINDOW,
12 CHANNEL_WINDOW_FLEXIBILITY, CHANNEL_WINDOW_MAX_FAST, CHANNEL_WINDOW_MAX_MEDIUM,
13 CHANNEL_WINDOW_MAX_SLOW, CHANNEL_WINDOW_MIN, CHANNEL_WINDOW_MIN_LIMIT_FAST,
14 CHANNEL_WINDOW_MIN_LIMIT_MEDIUM,
15};
16
17pub use types::{ChannelAction, ChannelError, MessageType, Sequence};
18
19use envelope::{pack_envelope, unpack_envelope};
20
21struct Envelope {
23 sequence: Sequence,
24 raw: Vec<u8>,
25 tries: u8,
26 sent_at: f64,
27 delivered: bool,
28}
29
30pub struct Channel {
35 tx_ring: VecDeque<Envelope>,
36 rx_ring: VecDeque<Envelope>,
37 next_sequence: u16,
38 next_rx_sequence: u16,
39 window: u16,
40 window_max: u16,
41 window_min: u16,
42 window_flexibility: u16,
43 fast_rate_rounds: u16,
44 medium_rate_rounds: u16,
45 max_tries: u8,
46 rtt: f64,
47}
48
49impl Channel {
50 pub fn new(initial_rtt: f64) -> Self {
52 let (window, window_max, window_min, window_flexibility) = if initial_rtt > CHANNEL_RTT_SLOW
53 {
54 (1, 1, 1, 1)
55 } else {
56 (
57 CHANNEL_WINDOW,
58 CHANNEL_WINDOW_MAX_SLOW,
59 CHANNEL_WINDOW_MIN,
60 CHANNEL_WINDOW_FLEXIBILITY,
61 )
62 };
63
64 Channel {
65 tx_ring: VecDeque::new(),
66 rx_ring: VecDeque::new(),
67 next_sequence: 0,
68 next_rx_sequence: 0,
69 window,
70 window_max,
71 window_min,
72 window_flexibility,
73 fast_rate_rounds: 0,
74 medium_rate_rounds: 0,
75 max_tries: CHANNEL_MAX_TRIES,
76 rtt: initial_rtt,
77 }
78 }
79
80 pub fn set_rtt(&mut self, rtt: f64) {
82 self.rtt = rtt;
83 }
84
85 pub fn mdu(&self, link_mdu: usize) -> usize {
87 let mdu = link_mdu.saturating_sub(CHANNEL_ENVELOPE_OVERHEAD);
88 mdu.min(0xFFFF)
89 }
90
91 pub fn is_ready_to_send(&self) -> bool {
93 let outstanding = self.tx_ring.iter().filter(|e| !e.delivered).count() as u16;
94 outstanding < self.window
95 }
96
97 pub fn send(
99 &mut self,
100 msgtype: u16,
101 payload: &[u8],
102 now: f64,
103 link_mdu: usize,
104 ) -> Result<Vec<ChannelAction>, ChannelError> {
105 if !self.is_ready_to_send() {
106 return Err(ChannelError::NotReady);
107 }
108
109 let sequence = self.next_sequence;
110 self.next_sequence = ((self.next_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
111
112 let raw = pack_envelope(msgtype, sequence, payload);
113 if raw.len() > link_mdu {
114 return Err(ChannelError::MessageTooBig);
115 }
116
117 self.tx_ring.push_back(Envelope {
118 sequence,
119 raw: raw.clone(),
120 tries: 1,
121 sent_at: now,
122 delivered: false,
123 });
124
125 Ok(alloc::vec![ChannelAction::SendOnLink { raw }])
126 }
127
128 pub fn receive(&mut self, raw: &[u8], _now: f64) -> Vec<ChannelAction> {
133 let (_msgtype, sequence, _payload) = match unpack_envelope(raw) {
134 Ok(r) => r,
135 Err(_) => return Vec::new(),
136 };
137
138 if self.is_behind_rx_window(sequence) {
140 return Vec::new();
141 }
142
143 if self.rx_ring.iter().any(|e| e.sequence == sequence) {
145 return Vec::new();
146 }
147
148 let envelope = Envelope {
150 sequence,
151 raw: raw.to_vec(),
152 tries: 0,
153 sent_at: 0.0,
154 delivered: false,
155 };
156 self.emplace_rx(envelope);
157
158 self.collect_contiguous()
160 }
161
162 pub fn flush_tx(&mut self) {
165 self.tx_ring.clear();
166 }
167
168 pub fn packet_delivered(&mut self, sequence: Sequence) -> Vec<ChannelAction> {
170 if let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) {
171 self.tx_ring.remove(pos);
172
173 if self.window < self.window_max {
174 self.window += 1;
175 }
176
177 self.adapt_window_on_delivery();
179 }
180 Vec::new()
181 }
182
183 pub fn packet_timeout(&mut self, sequence: Sequence, now: f64) -> Vec<ChannelAction> {
185 let pos = match self.tx_ring.iter().position(|e| e.sequence == sequence) {
186 Some(p) => p,
187 None => return Vec::new(),
188 };
189
190 let envelope = &self.tx_ring[pos];
191 if envelope.tries >= self.max_tries {
192 self.tx_ring.clear();
193 self.rx_ring.clear();
194 return alloc::vec![ChannelAction::TeardownLink];
195 }
196
197 let envelope = &mut self.tx_ring[pos];
199 envelope.tries += 1;
200 envelope.sent_at = now;
201 let raw = envelope.raw.clone();
202
203 if self.window > self.window_min {
205 self.window -= 1;
206 if self.window_max > self.window_min + self.window_flexibility {
207 self.window_max -= 1;
208 }
209 }
210
211 alloc::vec![ChannelAction::SendOnLink { raw }]
212 }
213
214 pub fn get_packet_timeout(&self, tries: u8) -> f64 {
218 let base = 1.5_f64.powi((tries as i32) - 1);
219 let rtt_factor = (self.rtt * 2.5).max(0.025);
220 let ring_factor = (self.tx_ring.len() as f64) + 1.5;
221 base * rtt_factor * ring_factor
222 }
223
224 pub fn get_tries(&self, sequence: Sequence) -> Option<u8> {
226 self.tx_ring
227 .iter()
228 .find(|e| e.sequence == sequence)
229 .map(|e| e.tries)
230 }
231
232 pub fn shutdown(&mut self) {
234 self.tx_ring.clear();
235 self.rx_ring.clear();
236 }
237
238 pub fn window(&self) -> u16 {
240 self.window
241 }
242
243 pub fn window_max(&self) -> u16 {
245 self.window_max
246 }
247
248 pub fn outstanding(&self) -> usize {
250 self.tx_ring.iter().filter(|e| !e.delivered).count()
251 }
252
253 fn is_behind_rx_window(&self, sequence: Sequence) -> bool {
256 if sequence < self.next_rx_sequence {
257 let window_overflow = (self.next_rx_sequence as u32 + CHANNEL_WINDOW_MAX_FAST as u32)
258 % CHANNEL_SEQ_MODULUS;
259 let overflow = window_overflow as u16;
260 if overflow < self.next_rx_sequence {
261 if sequence > overflow {
263 return true; }
265 return false; }
267 return true;
268 }
269 false
270 }
271
272 fn emplace_rx(&mut self, envelope: Envelope) {
273 let env_dist = envelope.sequence.wrapping_sub(self.next_rx_sequence);
276 let mut i = 0;
277 for existing in self.rx_ring.iter() {
278 if envelope.sequence == existing.sequence {
279 return; }
281 let exist_dist = existing.sequence.wrapping_sub(self.next_rx_sequence);
282 if env_dist < exist_dist {
283 self.rx_ring.insert(i, envelope);
284 return;
285 }
286 i += 1;
287 }
288 self.rx_ring.push_back(envelope);
289 }
290
291 fn collect_contiguous(&mut self) -> Vec<ChannelAction> {
292 let mut actions = Vec::new();
293
294 loop {
295 let front_match = self
296 .rx_ring
297 .front()
298 .map(|e| e.sequence == self.next_rx_sequence)
299 .unwrap_or(false);
300
301 if !front_match {
302 break;
303 }
304
305 let envelope = self.rx_ring.pop_front().unwrap();
306
307 if let Ok((msgtype, _seq, payload)) = unpack_envelope(&envelope.raw) {
309 actions.push(ChannelAction::MessageReceived {
310 msgtype,
311 payload: payload.to_vec(),
312 sequence: envelope.sequence,
313 });
314 }
315
316 self.next_rx_sequence =
317 ((self.next_rx_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
318
319 if self.next_rx_sequence == 0 {
321 }
323 }
324
325 actions
326 }
327
328 fn adapt_window_on_delivery(&mut self) {
329 if self.rtt == 0.0 {
330 return;
331 }
332
333 if self.rtt > CHANNEL_RTT_FAST {
334 self.fast_rate_rounds = 0;
335
336 if self.rtt > CHANNEL_RTT_MEDIUM {
337 self.medium_rate_rounds = 0;
338 } else {
339 self.medium_rate_rounds += 1;
340 if self.window_max < CHANNEL_WINDOW_MAX_MEDIUM
341 && self.medium_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
342 {
343 self.window_max = CHANNEL_WINDOW_MAX_MEDIUM;
344 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_MEDIUM;
345 }
346 }
347 } else {
348 self.fast_rate_rounds += 1;
349 if self.window_max < CHANNEL_WINDOW_MAX_FAST
350 && self.fast_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
351 {
352 self.window_max = CHANNEL_WINDOW_MAX_FAST;
353 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_FAST;
354 }
355 }
356 }
357}
358
359#[cfg(test)]
360mod tests {
361 use super::*;
362
363 #[test]
364 fn test_new_default() {
365 let ch = Channel::new(0.5);
366 assert_eq!(ch.window, CHANNEL_WINDOW);
367 assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_SLOW);
368 assert!(ch.is_ready_to_send());
369 }
370
371 #[test]
372 fn test_new_very_slow() {
373 let ch = Channel::new(2.0);
374 assert_eq!(ch.window, 1);
375 assert_eq!(ch.window_max, 1);
376 }
377
378 #[test]
379 fn test_send_receive() {
380 let mut ch = Channel::new(0.1);
381 let actions = ch.send(0x01, b"hello", 1.0, 500).unwrap();
382 assert_eq!(actions.len(), 1);
383 match &actions[0] {
384 ChannelAction::SendOnLink { raw } => {
385 let mut ch2 = Channel::new(0.1);
387 let recv_actions = ch2.receive(raw, 1.1);
388 assert_eq!(recv_actions.len(), 1);
389 match &recv_actions[0] {
390 ChannelAction::MessageReceived {
391 msgtype,
392 payload,
393 sequence,
394 } => {
395 assert_eq!(*msgtype, 0x01);
396 assert_eq!(payload, b"hello");
397 assert_eq!(*sequence, 0);
398 }
399 _ => panic!("Expected MessageReceived"),
400 }
401 }
402 _ => panic!("Expected SendOnLink"),
403 }
404 }
405
406 #[test]
407 fn test_send_not_ready() {
408 let mut ch = Channel::new(0.1);
409 ch.send(0x01, b"a", 1.0, 500).unwrap();
411 ch.send(0x01, b"b", 1.0, 500).unwrap();
412 assert!(!ch.is_ready_to_send());
414 assert_eq!(ch.send(0x01, b"c", 1.0, 500), Err(ChannelError::NotReady));
415 }
416
417 #[test]
418 fn test_packet_delivered_grows_window() {
419 let mut ch = Channel::new(0.1);
420 ch.send(0x01, b"a", 1.0, 500).unwrap();
421 ch.send(0x01, b"b", 1.0, 500).unwrap();
422
423 assert_eq!(ch.window, 2);
424 ch.packet_delivered(0);
425 assert_eq!(ch.window, 3);
426 }
427
428 #[test]
429 fn test_packet_timeout_shrinks_window() {
430 let mut ch = Channel::new(0.1);
431 ch.send(0x01, b"a", 1.0, 500).unwrap();
432 ch.send(0x01, b"b", 1.0, 500).unwrap();
433
434 ch.packet_delivered(0);
436 assert_eq!(ch.window, 3);
437
438 let actions = ch.packet_timeout(1, 2.0);
440 assert_eq!(actions.len(), 1); assert_eq!(ch.window, 2);
442 }
443
444 #[test]
445 fn test_max_retries_teardown() {
446 let mut ch = Channel::new(0.1);
447 ch.send(0x01, b"a", 1.0, 500).unwrap();
448
449 for i in 0..4 {
451 let actions = ch.packet_timeout(0, 2.0 + i as f64);
452 assert_eq!(actions.len(), 1);
453 match &actions[0] {
454 ChannelAction::SendOnLink { .. } => {}
455 _ => panic!("Expected SendOnLink"),
456 }
457 }
458
459 let actions = ch.packet_timeout(0, 10.0);
461 assert_eq!(actions.len(), 1);
462 match &actions[0] {
463 ChannelAction::TeardownLink => {}
464 _ => panic!("Expected TeardownLink"),
465 }
466 }
467
468 #[test]
469 fn test_sequence_wrapping() {
470 let mut ch = Channel::new(0.1);
471 ch.next_sequence = CHANNEL_SEQ_MAX;
472
473 ch.send(0x01, b"wrap", 1.0, 500).unwrap();
474 assert_eq!(ch.next_sequence, 0);
475
476 ch.send(0x01, b"after", 1.0, 500).unwrap();
477 assert_eq!(ch.next_sequence, 1);
478 }
479
480 #[test]
481 fn test_out_of_order_buffering() {
482 let mut ch = Channel::new(0.1);
483
484 let raw0 = pack_envelope(0x01, 0, b"first");
486 let raw1 = pack_envelope(0x01, 1, b"second");
487
488 let actions = ch.receive(&raw1, 1.0);
490 assert!(actions.is_empty()); let actions = ch.receive(&raw0, 1.1);
494 assert_eq!(actions.len(), 2); match &actions[0] {
496 ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
497 _ => panic!("Expected MessageReceived"),
498 }
499 match &actions[1] {
500 ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 1),
501 _ => panic!("Expected MessageReceived"),
502 }
503 }
504
505 #[test]
506 fn test_duplicate_rejection() {
507 let mut ch = Channel::new(0.1);
508 let raw = pack_envelope(0x01, 0, b"hello");
509
510 let actions = ch.receive(&raw, 1.0);
511 assert_eq!(actions.len(), 1);
512
513 let actions = ch.receive(&raw, 1.1);
515 assert!(actions.is_empty());
516 }
517
518 #[test]
519 fn test_get_packet_timeout() {
520 let ch = Channel::new(0.1);
521 let t1 = ch.get_packet_timeout(1);
522 let t2 = ch.get_packet_timeout(2);
523 assert!(t2 > t1); }
525
526 #[test]
527 fn test_mdu() {
528 let ch = Channel::new(0.1);
529 assert_eq!(ch.mdu(431), 431 - CHANNEL_ENVELOPE_OVERHEAD);
530 }
531
532 #[test]
533 fn test_window_upgrade_fast() {
534 let mut ch = Channel::new(0.05); ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
536
537 for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
539 ch.send(0x01, b"x", i as f64, 500).unwrap();
540 ch.packet_delivered(i);
541 }
542
543 assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_FAST);
544 assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_FAST);
545 }
546
547 #[test]
548 fn test_window_upgrade_medium() {
549 let mut ch = Channel::new(0.5); ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
551
552 for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
553 ch.send(0x01, b"x", i as f64, 500).unwrap();
554 ch.packet_delivered(i);
555 }
556
557 assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_MEDIUM);
558 assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM);
559 }
560
561 #[test]
562 fn test_shutdown() {
563 let mut ch = Channel::new(0.1);
564 ch.send(0x01, b"a", 1.0, 500).unwrap();
565 ch.shutdown();
566 assert_eq!(ch.outstanding(), 0);
567 }
568
569 #[test]
570 fn test_message_too_big() {
571 let mut ch = Channel::new(0.1);
572 let big = vec![0u8; 500];
573 assert_eq!(
575 ch.send(0x01, &big, 1.0, 10),
576 Err(ChannelError::MessageTooBig)
577 );
578 }
579
580 #[test]
581 fn test_receive_sequence_wrap_at_boundary() {
582 let mut ch = Channel::new(0.1);
583 ch.next_rx_sequence = CHANNEL_SEQ_MAX;
584
585 let raw_max = pack_envelope(0x01, CHANNEL_SEQ_MAX, b"last");
586 let raw_zero = pack_envelope(0x01, 0, b"first_after_wrap");
587
588 let actions = ch.receive(&raw_max, 1.0);
589 assert_eq!(actions.len(), 1);
590 assert_eq!(ch.next_rx_sequence, 0);
591
592 let actions = ch.receive(&raw_zero, 1.1);
593 assert_eq!(actions.len(), 1);
594 match &actions[0] {
595 ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
596 _ => panic!("Expected MessageReceived"),
597 }
598 }
599
600 #[test]
601 fn test_receive_wrap_boundary_out_of_order() {
602 let mut ch = Channel::new(0.1);
604 ch.next_rx_sequence = 0xFFFE;
605
606 let raw_fffe = pack_envelope(0x01, 0xFFFE, b"a");
607 let raw_ffff = pack_envelope(0x01, 0xFFFF, b"b");
608 let raw_0000 = pack_envelope(0x01, 0x0000, b"c");
609
610 let actions = ch.receive(&raw_0000, 1.0);
612 assert!(actions.is_empty()); let actions = ch.receive(&raw_ffff, 1.1);
615 assert!(actions.is_empty()); let actions = ch.receive(&raw_fffe, 1.2);
618 assert_eq!(actions.len(), 3); match &actions[0] {
620 ChannelAction::MessageReceived {
621 sequence, payload, ..
622 } => {
623 assert_eq!(*sequence, 0xFFFE);
624 assert_eq!(payload, b"a");
625 }
626 _ => panic!("Expected MessageReceived"),
627 }
628 match &actions[1] {
629 ChannelAction::MessageReceived {
630 sequence, payload, ..
631 } => {
632 assert_eq!(*sequence, 0xFFFF);
633 assert_eq!(payload, b"b");
634 }
635 _ => panic!("Expected MessageReceived"),
636 }
637 match &actions[2] {
638 ChannelAction::MessageReceived {
639 sequence, payload, ..
640 } => {
641 assert_eq!(*sequence, 0x0000);
642 assert_eq!(payload, b"c");
643 }
644 _ => panic!("Expected MessageReceived"),
645 }
646 }
647
648 #[test]
649 fn test_many_messages_in_order() {
650 let mut sender = Channel::new(0.05);
651 let mut receiver = Channel::new(0.05);
652
653 for i in 0..20u16 {
654 if i >= 2 {
656 sender.packet_delivered(i - 2);
657 }
658
659 let actions = sender.send(0x01, &[i as u8], i as f64, 500).unwrap();
660 let raw = match &actions[0] {
661 ChannelAction::SendOnLink { raw } => raw.clone(),
662 _ => panic!("Expected SendOnLink"),
663 };
664
665 let recv_actions = receiver.receive(&raw, i as f64 + 0.1);
666 assert_eq!(recv_actions.len(), 1);
667 match &recv_actions[0] {
668 ChannelAction::MessageReceived {
669 payload, sequence, ..
670 } => {
671 assert_eq!(*sequence, i);
672 assert_eq!(payload, &[i as u8]);
673 }
674 _ => panic!("Expected MessageReceived"),
675 }
676 }
677 }
678}