1pub mod types;
2pub mod envelope;
3
4use alloc::collections::VecDeque;
5use alloc::vec::Vec;
6
7use crate::constants::{
8 CHANNEL_ENVELOPE_OVERHEAD, CHANNEL_FAST_RATE_THRESHOLD, CHANNEL_MAX_TRIES,
9 CHANNEL_RTT_FAST, CHANNEL_RTT_MEDIUM, CHANNEL_RTT_SLOW,
10 CHANNEL_SEQ_MODULUS, CHANNEL_WINDOW, CHANNEL_WINDOW_FLEXIBILITY, CHANNEL_WINDOW_MAX_FAST,
11 CHANNEL_WINDOW_MAX_MEDIUM, CHANNEL_WINDOW_MAX_SLOW, CHANNEL_WINDOW_MIN,
12 CHANNEL_WINDOW_MIN_LIMIT_FAST, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM,
13};
14#[cfg(test)]
15use crate::constants::CHANNEL_SEQ_MAX;
16
17pub use types::{ChannelAction, ChannelError, MessageType, Sequence};
18
19use envelope::{pack_envelope, unpack_envelope};
20
21struct Envelope {
23 sequence: Sequence,
24 raw: Vec<u8>,
25 tries: u8,
26 sent_at: f64,
27 delivered: bool,
28}
29
30pub struct Channel {
35 tx_ring: VecDeque<Envelope>,
36 rx_ring: VecDeque<Envelope>,
37 next_sequence: u16,
38 next_rx_sequence: u16,
39 window: u16,
40 window_max: u16,
41 window_min: u16,
42 window_flexibility: u16,
43 fast_rate_rounds: u16,
44 medium_rate_rounds: u16,
45 max_tries: u8,
46 rtt: f64,
47}
48
49impl Channel {
50 pub fn new(initial_rtt: f64) -> Self {
52 let (window, window_max, window_min, window_flexibility) = if initial_rtt > CHANNEL_RTT_SLOW {
53 (1, 1, 1, 1)
54 } else {
55 (
56 CHANNEL_WINDOW,
57 CHANNEL_WINDOW_MAX_SLOW,
58 CHANNEL_WINDOW_MIN,
59 CHANNEL_WINDOW_FLEXIBILITY,
60 )
61 };
62
63 Channel {
64 tx_ring: VecDeque::new(),
65 rx_ring: VecDeque::new(),
66 next_sequence: 0,
67 next_rx_sequence: 0,
68 window,
69 window_max,
70 window_min,
71 window_flexibility,
72 fast_rate_rounds: 0,
73 medium_rate_rounds: 0,
74 max_tries: CHANNEL_MAX_TRIES,
75 rtt: initial_rtt,
76 }
77 }
78
79 pub fn set_rtt(&mut self, rtt: f64) {
81 self.rtt = rtt;
82 }
83
84 pub fn mdu(&self, link_mdu: usize) -> usize {
86 let mdu = link_mdu.saturating_sub(CHANNEL_ENVELOPE_OVERHEAD);
87 mdu.min(0xFFFF)
88 }
89
90 pub fn is_ready_to_send(&self) -> bool {
92 let outstanding = self.tx_ring.iter()
93 .filter(|e| !e.delivered)
94 .count() as u16;
95 outstanding < self.window
96 }
97
98 pub fn send(
100 &mut self,
101 msgtype: u16,
102 payload: &[u8],
103 now: f64,
104 link_mdu: usize,
105 ) -> Result<Vec<ChannelAction>, ChannelError> {
106 if !self.is_ready_to_send() {
107 return Err(ChannelError::NotReady);
108 }
109
110 let sequence = self.next_sequence;
111 self.next_sequence = ((self.next_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
112
113 let raw = pack_envelope(msgtype, sequence, payload);
114 if raw.len() > link_mdu {
115 return Err(ChannelError::MessageTooBig);
116 }
117
118 self.tx_ring.push_back(Envelope {
119 sequence,
120 raw: raw.clone(),
121 tries: 1,
122 sent_at: now,
123 delivered: false,
124 });
125
126 Ok(alloc::vec![ChannelAction::SendOnLink { raw }])
127 }
128
129 pub fn receive(&mut self, raw: &[u8], _now: f64) -> Vec<ChannelAction> {
134 let (_msgtype, sequence, _payload) = match unpack_envelope(raw) {
135 Ok(r) => r,
136 Err(_) => return Vec::new(),
137 };
138
139 if self.is_behind_rx_window(sequence) {
141 return Vec::new();
142 }
143
144 if self.rx_ring.iter().any(|e| e.sequence == sequence) {
146 return Vec::new();
147 }
148
149 let envelope = Envelope {
151 sequence,
152 raw: raw.to_vec(),
153 tries: 0,
154 sent_at: 0.0,
155 delivered: false,
156 };
157 self.emplace_rx(envelope);
158
159 self.collect_contiguous()
161 }
162
163 pub fn flush_tx(&mut self) {
166 self.tx_ring.clear();
167 }
168
169 pub fn packet_delivered(&mut self, sequence: Sequence) -> Vec<ChannelAction> {
171 if let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) {
172 self.tx_ring.remove(pos);
173
174 if self.window < self.window_max {
175 self.window += 1;
176 }
177
178 self.adapt_window_on_delivery();
180 }
181 Vec::new()
182 }
183
184 pub fn packet_timeout(&mut self, sequence: Sequence, now: f64) -> Vec<ChannelAction> {
186 let pos = match self.tx_ring.iter().position(|e| e.sequence == sequence) {
187 Some(p) => p,
188 None => return Vec::new(),
189 };
190
191 let envelope = &self.tx_ring[pos];
192 if envelope.tries >= self.max_tries {
193 self.tx_ring.clear();
194 self.rx_ring.clear();
195 return alloc::vec![ChannelAction::TeardownLink];
196 }
197
198 let envelope = &mut self.tx_ring[pos];
200 envelope.tries += 1;
201 envelope.sent_at = now;
202 let raw = envelope.raw.clone();
203
204 if self.window > self.window_min {
206 self.window -= 1;
207 if self.window_max > self.window_min + self.window_flexibility {
208 self.window_max -= 1;
209 }
210 }
211
212 alloc::vec![ChannelAction::SendOnLink { raw }]
213 }
214
215 pub fn get_packet_timeout(&self, tries: u8) -> f64 {
219 let base = 1.5_f64.powi((tries as i32) - 1);
220 let rtt_factor = (self.rtt * 2.5).max(0.025);
221 let ring_factor = (self.tx_ring.len() as f64) + 1.5;
222 base * rtt_factor * ring_factor
223 }
224
225 pub fn get_tries(&self, sequence: Sequence) -> Option<u8> {
227 self.tx_ring.iter()
228 .find(|e| e.sequence == sequence)
229 .map(|e| e.tries)
230 }
231
232 pub fn shutdown(&mut self) {
234 self.tx_ring.clear();
235 self.rx_ring.clear();
236 }
237
238 pub fn window(&self) -> u16 {
240 self.window
241 }
242
243 pub fn window_max(&self) -> u16 {
245 self.window_max
246 }
247
248 pub fn outstanding(&self) -> usize {
250 self.tx_ring.iter().filter(|e| !e.delivered).count()
251 }
252
253 fn is_behind_rx_window(&self, sequence: Sequence) -> bool {
256 if sequence < self.next_rx_sequence {
257 let window_overflow = (self.next_rx_sequence as u32 + CHANNEL_WINDOW_MAX_FAST as u32) % CHANNEL_SEQ_MODULUS;
258 let overflow = window_overflow as u16;
259 if overflow < self.next_rx_sequence {
260 if sequence > overflow {
262 return true; }
264 return false; }
266 return true;
267 }
268 false
269 }
270
271 fn emplace_rx(&mut self, envelope: Envelope) {
272 let env_dist = envelope.sequence.wrapping_sub(self.next_rx_sequence);
275 let mut i = 0;
276 for existing in self.rx_ring.iter() {
277 if envelope.sequence == existing.sequence {
278 return; }
280 let exist_dist = existing.sequence.wrapping_sub(self.next_rx_sequence);
281 if env_dist < exist_dist {
282 self.rx_ring.insert(i, envelope);
283 return;
284 }
285 i += 1;
286 }
287 self.rx_ring.push_back(envelope);
288 }
289
290 fn collect_contiguous(&mut self) -> Vec<ChannelAction> {
291 let mut actions = Vec::new();
292
293 loop {
294 let front_match = self.rx_ring.front()
295 .map(|e| e.sequence == self.next_rx_sequence)
296 .unwrap_or(false);
297
298 if !front_match {
299 break;
300 }
301
302 let envelope = self.rx_ring.pop_front().unwrap();
303
304 if let Ok((msgtype, _seq, payload)) = unpack_envelope(&envelope.raw) {
306 actions.push(ChannelAction::MessageReceived {
307 msgtype,
308 payload: payload.to_vec(),
309 sequence: envelope.sequence,
310 });
311 }
312
313 self.next_rx_sequence = ((self.next_rx_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
314
315 if self.next_rx_sequence == 0 {
317 }
319 }
320
321 actions
322 }
323
324 fn adapt_window_on_delivery(&mut self) {
325 if self.rtt == 0.0 {
326 return;
327 }
328
329 if self.rtt > CHANNEL_RTT_FAST {
330 self.fast_rate_rounds = 0;
331
332 if self.rtt > CHANNEL_RTT_MEDIUM {
333 self.medium_rate_rounds = 0;
334 } else {
335 self.medium_rate_rounds += 1;
336 if self.window_max < CHANNEL_WINDOW_MAX_MEDIUM
337 && self.medium_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
338 {
339 self.window_max = CHANNEL_WINDOW_MAX_MEDIUM;
340 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_MEDIUM;
341 }
342 }
343 } else {
344 self.fast_rate_rounds += 1;
345 if self.window_max < CHANNEL_WINDOW_MAX_FAST
346 && self.fast_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
347 {
348 self.window_max = CHANNEL_WINDOW_MAX_FAST;
349 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_FAST;
350 }
351 }
352 }
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the bytes of a `SendOnLink` action, panicking on any other
    /// action.
    fn sent_raw(action: &ChannelAction) -> &[u8] {
        match action {
            ChannelAction::SendOnLink { raw } => raw.as_slice(),
            _ => panic!("Expected SendOnLink"),
        }
    }

    /// Returns the sequence of a `MessageReceived` action, panicking on any
    /// other action.
    fn received_sequence(action: &ChannelAction) -> Sequence {
        match action {
            ChannelAction::MessageReceived { sequence, .. } => *sequence,
            _ => panic!("Expected MessageReceived"),
        }
    }

    #[test]
    fn test_new_default() {
        let channel = Channel::new(0.5);
        assert_eq!(channel.window, CHANNEL_WINDOW);
        assert_eq!(channel.window_max, CHANNEL_WINDOW_MAX_SLOW);
        assert!(channel.is_ready_to_send());
    }

    #[test]
    fn test_new_very_slow() {
        let channel = Channel::new(2.0);
        assert_eq!(channel.window, 1);
        assert_eq!(channel.window_max, 1);
    }

    #[test]
    fn test_send_receive() {
        let mut sender = Channel::new(0.1);
        let outbound = sender.send(0x01, b"hello", 1.0, 500).unwrap();
        assert_eq!(outbound.len(), 1);
        let raw = sent_raw(&outbound[0]);

        // Feed the packed bytes into an independent receiving channel.
        let mut receiver = Channel::new(0.1);
        let inbound = receiver.receive(raw, 1.1);
        assert_eq!(inbound.len(), 1);
        match &inbound[0] {
            ChannelAction::MessageReceived {
                msgtype,
                payload,
                sequence,
            } => {
                assert_eq!(*msgtype, 0x01);
                assert_eq!(payload, b"hello");
                assert_eq!(*sequence, 0);
            }
            _ => panic!("Expected MessageReceived"),
        }
    }

    #[test]
    fn test_send_not_ready() {
        let mut channel = Channel::new(0.1);
        // Two sends fill the window.
        channel.send(0x01, b"a", 1.0, 500).unwrap();
        channel.send(0x01, b"b", 1.0, 500).unwrap();
        assert!(!channel.is_ready_to_send());

        let third = channel.send(0x01, b"c", 1.0, 500);
        assert_eq!(third, Err(ChannelError::NotReady));
    }

    #[test]
    fn test_packet_delivered_grows_window() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 1.0, 500).unwrap();
        channel.send(0x01, b"b", 1.0, 500).unwrap();

        assert_eq!(channel.window, 2);
        channel.packet_delivered(0);
        assert_eq!(channel.window, 3);
    }

    #[test]
    fn test_packet_timeout_shrinks_window() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 1.0, 500).unwrap();
        channel.send(0x01, b"b", 1.0, 500).unwrap();

        // Grow the window first so there is room to shrink.
        channel.packet_delivered(0);
        assert_eq!(channel.window, 3);

        let resend = channel.packet_timeout(1, 2.0);
        assert_eq!(resend.len(), 1);
        assert_eq!(channel.window, 2);
    }

    #[test]
    fn test_max_retries_teardown() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 1.0, 500).unwrap();

        // The first four timeouts each trigger a resend.
        for attempt in 0..4u32 {
            let actions = channel.packet_timeout(0, 2.0 + f64::from(attempt));
            assert_eq!(actions.len(), 1);
            sent_raw(&actions[0]);
        }

        // The next timeout gives up on the link.
        let actions = channel.packet_timeout(0, 10.0);
        assert_eq!(actions.len(), 1);
        match &actions[0] {
            ChannelAction::TeardownLink => {}
            _ => panic!("Expected TeardownLink"),
        }
    }

    #[test]
    fn test_sequence_wrapping() {
        let mut channel = Channel::new(0.1);
        channel.next_sequence = CHANNEL_SEQ_MAX;

        channel.send(0x01, b"wrap", 1.0, 500).unwrap();
        assert_eq!(channel.next_sequence, 0);

        channel.send(0x01, b"after", 1.0, 500).unwrap();
        assert_eq!(channel.next_sequence, 1);
    }

    #[test]
    fn test_out_of_order_buffering() {
        let mut channel = Channel::new(0.1);

        let first = pack_envelope(0x01, 0, b"first");
        let second = pack_envelope(0x01, 1, b"second");

        // Sequence 1 arrives early and is held back.
        let held = channel.receive(&second, 1.0);
        assert!(held.is_empty());

        // Sequence 0 releases both, in order.
        let released = channel.receive(&first, 1.1);
        assert_eq!(released.len(), 2);
        assert_eq!(received_sequence(&released[0]), 0);
        assert_eq!(received_sequence(&released[1]), 1);
    }

    #[test]
    fn test_duplicate_rejection() {
        let mut channel = Channel::new(0.1);
        let raw = pack_envelope(0x01, 0, b"hello");

        let first = channel.receive(&raw, 1.0);
        assert_eq!(first.len(), 1);

        let repeat = channel.receive(&raw, 1.1);
        assert!(repeat.is_empty());
    }

    #[test]
    fn test_get_packet_timeout() {
        let channel = Channel::new(0.1);
        let first_try = channel.get_packet_timeout(1);
        let second_try = channel.get_packet_timeout(2);
        assert!(second_try > first_try);
    }

    #[test]
    fn test_mdu() {
        let channel = Channel::new(0.1);
        assert_eq!(channel.mdu(431), 431 - CHANNEL_ENVELOPE_OVERHEAD);
    }

    #[test]
    fn test_window_upgrade_fast() {
        let mut channel = Channel::new(0.05);
        channel.window_max = CHANNEL_WINDOW_MAX_SLOW;

        for seq in 0..CHANNEL_FAST_RATE_THRESHOLD {
            channel.send(0x01, b"x", seq as f64, 500).unwrap();
            channel.packet_delivered(seq);
        }

        assert_eq!(channel.window_max, CHANNEL_WINDOW_MAX_FAST);
        assert_eq!(channel.window_min, CHANNEL_WINDOW_MIN_LIMIT_FAST);
    }

    #[test]
    fn test_window_upgrade_medium() {
        let mut channel = Channel::new(0.5);
        channel.window_max = CHANNEL_WINDOW_MAX_SLOW;

        for seq in 0..CHANNEL_FAST_RATE_THRESHOLD {
            channel.send(0x01, b"x", seq as f64, 500).unwrap();
            channel.packet_delivered(seq);
        }

        assert_eq!(channel.window_max, CHANNEL_WINDOW_MAX_MEDIUM);
        assert_eq!(channel.window_min, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM);
    }

    #[test]
    fn test_shutdown() {
        let mut channel = Channel::new(0.1);
        channel.send(0x01, b"a", 1.0, 500).unwrap();
        channel.shutdown();
        assert_eq!(channel.outstanding(), 0);
    }

    #[test]
    fn test_message_too_big() {
        let mut channel = Channel::new(0.1);
        let oversized = [0u8; 500];
        let outcome = channel.send(0x01, &oversized, 1.0, 10);
        assert_eq!(outcome, Err(ChannelError::MessageTooBig));
    }

    #[test]
    fn test_receive_sequence_wrap_at_boundary() {
        let mut channel = Channel::new(0.1);
        channel.next_rx_sequence = CHANNEL_SEQ_MAX;

        let last_before_wrap = pack_envelope(0x01, CHANNEL_SEQ_MAX, b"last");
        let first_after_wrap = pack_envelope(0x01, 0, b"first_after_wrap");

        let delivered = channel.receive(&last_before_wrap, 1.0);
        assert_eq!(delivered.len(), 1);
        assert_eq!(channel.next_rx_sequence, 0);

        let delivered = channel.receive(&first_after_wrap, 1.1);
        assert_eq!(delivered.len(), 1);
        assert_eq!(received_sequence(&delivered[0]), 0);
    }

    #[test]
    fn test_receive_wrap_boundary_out_of_order() {
        let mut channel = Channel::new(0.1);
        channel.next_rx_sequence = 0xFFFE;

        let raw_fffe = pack_envelope(0x01, 0xFFFE, b"a");
        let raw_ffff = pack_envelope(0x01, 0xFFFF, b"b");
        let raw_0000 = pack_envelope(0x01, 0x0000, b"c");

        // Deliver in reverse order across the wrap point.
        let held = channel.receive(&raw_0000, 1.0);
        assert!(held.is_empty());
        let held = channel.receive(&raw_ffff, 1.1);
        assert!(held.is_empty());

        let released = channel.receive(&raw_fffe, 1.2);
        assert_eq!(released.len(), 3);

        let expected: [(Sequence, &[u8]); 3] = [
            (0xFFFE, &b"a"[..]),
            (0xFFFF, &b"b"[..]),
            (0x0000, &b"c"[..]),
        ];
        for (action, (want_sequence, want_payload)) in released.iter().zip(expected.iter()) {
            match action {
                ChannelAction::MessageReceived {
                    sequence, payload, ..
                } => {
                    assert_eq!(sequence, want_sequence);
                    assert_eq!(payload.as_slice(), *want_payload);
                }
                _ => panic!("Expected MessageReceived"),
            }
        }
    }

    #[test]
    fn test_many_messages_in_order() {
        let mut sender = Channel::new(0.05);
        let mut receiver = Channel::new(0.05);

        for seq in 0..20u16 {
            // Acknowledge with a lag of two so the window never fills.
            if seq >= 2 {
                sender.packet_delivered(seq - 2);
            }

            let outbound = sender.send(0x01, &[seq as u8], seq as f64, 500).unwrap();
            let raw = sent_raw(&outbound[0]);

            let inbound = receiver.receive(raw, seq as f64 + 0.1);
            assert_eq!(inbound.len(), 1);
            match &inbound[0] {
                ChannelAction::MessageReceived {
                    payload, sequence, ..
                } => {
                    assert_eq!(*sequence, seq);
                    assert_eq!(payload, &[seq as u8]);
                }
                _ => panic!("Expected MessageReceived"),
            }
        }
    }
}