1pub mod types;
2pub mod envelope;
3
4use alloc::collections::VecDeque;
5use alloc::vec::Vec;
6
7use crate::constants::{
8 CHANNEL_ENVELOPE_OVERHEAD, CHANNEL_FAST_RATE_THRESHOLD, CHANNEL_MAX_TRIES,
9 CHANNEL_RTT_FAST, CHANNEL_RTT_MEDIUM, CHANNEL_RTT_SLOW,
10 CHANNEL_SEQ_MODULUS, CHANNEL_WINDOW, CHANNEL_WINDOW_FLEXIBILITY, CHANNEL_WINDOW_MAX_FAST,
11 CHANNEL_WINDOW_MAX_MEDIUM, CHANNEL_WINDOW_MAX_SLOW, CHANNEL_WINDOW_MIN,
12 CHANNEL_WINDOW_MIN_LIMIT_FAST, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM,
13};
14#[cfg(test)]
15use crate::constants::CHANNEL_SEQ_MAX;
16
17pub use types::{ChannelAction, ChannelError, MessageType, Sequence};
18
19use envelope::{pack_envelope, unpack_envelope};
20
21struct Envelope {
23 sequence: Sequence,
24 msgtype: MessageType,
25 raw: Vec<u8>,
26 tries: u8,
27 sent_at: f64,
28 delivered: bool,
29}
30
31pub struct Channel {
36 tx_ring: VecDeque<Envelope>,
37 rx_ring: VecDeque<Envelope>,
38 next_sequence: u16,
39 next_rx_sequence: u16,
40 window: u16,
41 window_max: u16,
42 window_min: u16,
43 window_flexibility: u16,
44 fast_rate_rounds: u16,
45 medium_rate_rounds: u16,
46 max_tries: u8,
47 rtt: f64,
48}
49
50impl Channel {
51 pub fn new(initial_rtt: f64) -> Self {
53 let (window, window_max, window_min, window_flexibility) = if initial_rtt > CHANNEL_RTT_SLOW {
54 (1, 1, 1, 1)
55 } else {
56 (
57 CHANNEL_WINDOW,
58 CHANNEL_WINDOW_MAX_SLOW,
59 CHANNEL_WINDOW_MIN,
60 CHANNEL_WINDOW_FLEXIBILITY,
61 )
62 };
63
64 Channel {
65 tx_ring: VecDeque::new(),
66 rx_ring: VecDeque::new(),
67 next_sequence: 0,
68 next_rx_sequence: 0,
69 window,
70 window_max,
71 window_min,
72 window_flexibility,
73 fast_rate_rounds: 0,
74 medium_rate_rounds: 0,
75 max_tries: CHANNEL_MAX_TRIES,
76 rtt: initial_rtt,
77 }
78 }
79
80 pub fn set_rtt(&mut self, rtt: f64) {
82 self.rtt = rtt;
83 }
84
85 pub fn mdu(&self, link_mdu: usize) -> usize {
87 let mdu = link_mdu.saturating_sub(CHANNEL_ENVELOPE_OVERHEAD);
88 mdu.min(0xFFFF)
89 }
90
91 pub fn is_ready_to_send(&self) -> bool {
93 let outstanding = self.tx_ring.iter()
94 .filter(|e| !e.delivered)
95 .count() as u16;
96 outstanding < self.window
97 }
98
99 pub fn send(
101 &mut self,
102 msgtype: u16,
103 payload: &[u8],
104 now: f64,
105 link_mdu: usize,
106 ) -> Result<Vec<ChannelAction>, ChannelError> {
107 if !self.is_ready_to_send() {
108 return Err(ChannelError::NotReady);
109 }
110
111 let sequence = self.next_sequence;
112 self.next_sequence = ((self.next_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
113
114 let raw = pack_envelope(msgtype, sequence, payload);
115 if raw.len() > link_mdu {
116 return Err(ChannelError::MessageTooBig);
117 }
118
119 self.tx_ring.push_back(Envelope {
120 sequence,
121 msgtype,
122 raw: raw.clone(),
123 tries: 1,
124 sent_at: now,
125 delivered: false,
126 });
127
128 Ok(alloc::vec![ChannelAction::SendOnLink { raw }])
129 }
130
131 pub fn receive(&mut self, raw: &[u8], _now: f64) -> Vec<ChannelAction> {
136 let (msgtype, sequence, _payload) = match unpack_envelope(raw) {
137 Ok(r) => r,
138 Err(_) => return Vec::new(),
139 };
140
141 if self.is_behind_rx_window(sequence) {
143 return Vec::new();
144 }
145
146 if self.rx_ring.iter().any(|e| e.sequence == sequence) {
148 return Vec::new();
149 }
150
151 let envelope = Envelope {
153 sequence,
154 msgtype,
155 raw: raw.to_vec(),
156 tries: 0,
157 sent_at: 0.0,
158 delivered: false,
159 };
160 self.emplace_rx(envelope);
161
162 self.collect_contiguous()
164 }
165
166 pub fn packet_delivered(&mut self, sequence: Sequence) -> Vec<ChannelAction> {
168 if let Some(pos) = self.tx_ring.iter().position(|e| e.sequence == sequence) {
169 self.tx_ring.remove(pos);
170
171 if self.window < self.window_max {
172 self.window += 1;
173 }
174
175 self.adapt_window_on_delivery();
177 }
178 Vec::new()
179 }
180
181 pub fn packet_timeout(&mut self, sequence: Sequence, now: f64) -> Vec<ChannelAction> {
183 let pos = match self.tx_ring.iter().position(|e| e.sequence == sequence) {
184 Some(p) => p,
185 None => return Vec::new(),
186 };
187
188 let envelope = &self.tx_ring[pos];
189 if envelope.tries >= self.max_tries {
190 self.tx_ring.clear();
191 self.rx_ring.clear();
192 return alloc::vec![ChannelAction::TeardownLink];
193 }
194
195 let envelope = &mut self.tx_ring[pos];
197 envelope.tries += 1;
198 envelope.sent_at = now;
199 let raw = envelope.raw.clone();
200
201 if self.window > self.window_min {
203 self.window -= 1;
204 if self.window_max > self.window_min + self.window_flexibility {
205 self.window_max -= 1;
206 }
207 }
208
209 alloc::vec![ChannelAction::SendOnLink { raw }]
210 }
211
212 pub fn get_packet_timeout(&self, tries: u8) -> f64 {
216 let base = 1.5_f64.powi((tries as i32) - 1);
217 let rtt_factor = (self.rtt * 2.5).max(0.025);
218 let ring_factor = (self.tx_ring.len() as f64) + 1.5;
219 base * rtt_factor * ring_factor
220 }
221
222 pub fn get_tries(&self, sequence: Sequence) -> Option<u8> {
224 self.tx_ring.iter()
225 .find(|e| e.sequence == sequence)
226 .map(|e| e.tries)
227 }
228
229 pub fn shutdown(&mut self) {
231 self.tx_ring.clear();
232 self.rx_ring.clear();
233 }
234
235 pub fn window(&self) -> u16 {
237 self.window
238 }
239
240 pub fn window_max(&self) -> u16 {
242 self.window_max
243 }
244
245 pub fn outstanding(&self) -> usize {
247 self.tx_ring.iter().filter(|e| !e.delivered).count()
248 }
249
250 fn is_behind_rx_window(&self, sequence: Sequence) -> bool {
253 if sequence < self.next_rx_sequence {
254 let window_overflow = (self.next_rx_sequence as u32 + CHANNEL_WINDOW_MAX_FAST as u32) % CHANNEL_SEQ_MODULUS;
255 let overflow = window_overflow as u16;
256 if overflow < self.next_rx_sequence {
257 if sequence > overflow {
259 return true; }
261 return false; }
263 return true;
264 }
265 false
266 }
267
268 fn emplace_rx(&mut self, envelope: Envelope) {
269 let env_dist = envelope.sequence.wrapping_sub(self.next_rx_sequence);
272 let mut i = 0;
273 for existing in self.rx_ring.iter() {
274 if envelope.sequence == existing.sequence {
275 return; }
277 let exist_dist = existing.sequence.wrapping_sub(self.next_rx_sequence);
278 if env_dist < exist_dist {
279 self.rx_ring.insert(i, envelope);
280 return;
281 }
282 i += 1;
283 }
284 self.rx_ring.push_back(envelope);
285 }
286
287 fn collect_contiguous(&mut self) -> Vec<ChannelAction> {
288 let mut actions = Vec::new();
289
290 loop {
291 let front_match = self.rx_ring.front()
292 .map(|e| e.sequence == self.next_rx_sequence)
293 .unwrap_or(false);
294
295 if !front_match {
296 break;
297 }
298
299 let envelope = self.rx_ring.pop_front().unwrap();
300
301 if let Ok((msgtype, _seq, payload)) = unpack_envelope(&envelope.raw) {
303 actions.push(ChannelAction::MessageReceived {
304 msgtype,
305 payload: payload.to_vec(),
306 sequence: envelope.sequence,
307 });
308 }
309
310 self.next_rx_sequence = ((self.next_rx_sequence as u32 + 1) % CHANNEL_SEQ_MODULUS) as u16;
311
312 if self.next_rx_sequence == 0 {
314 }
316 }
317
318 actions
319 }
320
321 fn adapt_window_on_delivery(&mut self) {
322 if self.rtt == 0.0 {
323 return;
324 }
325
326 if self.rtt > CHANNEL_RTT_FAST {
327 self.fast_rate_rounds = 0;
328
329 if self.rtt > CHANNEL_RTT_MEDIUM {
330 self.medium_rate_rounds = 0;
331 } else {
332 self.medium_rate_rounds += 1;
333 if self.window_max < CHANNEL_WINDOW_MAX_MEDIUM
334 && self.medium_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
335 {
336 self.window_max = CHANNEL_WINDOW_MAX_MEDIUM;
337 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_MEDIUM;
338 }
339 }
340 } else {
341 self.fast_rate_rounds += 1;
342 if self.window_max < CHANNEL_WINDOW_MAX_FAST
343 && self.fast_rate_rounds == CHANNEL_FAST_RATE_THRESHOLD
344 {
345 self.window_max = CHANNEL_WINDOW_MAX_FAST;
346 self.window_min = CHANNEL_WINDOW_MIN_LIMIT_FAST;
347 }
348 }
349 }
350}
351
352#[cfg(test)]
353mod tests {
354 use super::*;
355
356 #[test]
357 fn test_new_default() {
358 let ch = Channel::new(0.5);
359 assert_eq!(ch.window, CHANNEL_WINDOW);
360 assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_SLOW);
361 assert!(ch.is_ready_to_send());
362 }
363
364 #[test]
365 fn test_new_very_slow() {
366 let ch = Channel::new(2.0);
367 assert_eq!(ch.window, 1);
368 assert_eq!(ch.window_max, 1);
369 }
370
371 #[test]
372 fn test_send_receive() {
373 let mut ch = Channel::new(0.1);
374 let actions = ch.send(0x01, b"hello", 1.0, 500).unwrap();
375 assert_eq!(actions.len(), 1);
376 match &actions[0] {
377 ChannelAction::SendOnLink { raw } => {
378 let mut ch2 = Channel::new(0.1);
380 let recv_actions = ch2.receive(raw, 1.1);
381 assert_eq!(recv_actions.len(), 1);
382 match &recv_actions[0] {
383 ChannelAction::MessageReceived { msgtype, payload, sequence } => {
384 assert_eq!(*msgtype, 0x01);
385 assert_eq!(payload, b"hello");
386 assert_eq!(*sequence, 0);
387 }
388 _ => panic!("Expected MessageReceived"),
389 }
390 }
391 _ => panic!("Expected SendOnLink"),
392 }
393 }
394
395 #[test]
396 fn test_send_not_ready() {
397 let mut ch = Channel::new(0.1);
398 ch.send(0x01, b"a", 1.0, 500).unwrap();
400 ch.send(0x01, b"b", 1.0, 500).unwrap();
401 assert!(!ch.is_ready_to_send());
403 assert_eq!(ch.send(0x01, b"c", 1.0, 500), Err(ChannelError::NotReady));
404 }
405
406 #[test]
407 fn test_packet_delivered_grows_window() {
408 let mut ch = Channel::new(0.1);
409 ch.send(0x01, b"a", 1.0, 500).unwrap();
410 ch.send(0x01, b"b", 1.0, 500).unwrap();
411
412 assert_eq!(ch.window, 2);
413 ch.packet_delivered(0);
414 assert_eq!(ch.window, 3);
415 }
416
417 #[test]
418 fn test_packet_timeout_shrinks_window() {
419 let mut ch = Channel::new(0.1);
420 ch.send(0x01, b"a", 1.0, 500).unwrap();
421 ch.send(0x01, b"b", 1.0, 500).unwrap();
422
423 ch.packet_delivered(0);
425 assert_eq!(ch.window, 3);
426
427 let actions = ch.packet_timeout(1, 2.0);
429 assert_eq!(actions.len(), 1); assert_eq!(ch.window, 2);
431 }
432
433 #[test]
434 fn test_max_retries_teardown() {
435 let mut ch = Channel::new(0.1);
436 ch.send(0x01, b"a", 1.0, 500).unwrap();
437
438 for i in 0..4 {
440 let actions = ch.packet_timeout(0, 2.0 + i as f64);
441 assert_eq!(actions.len(), 1);
442 match &actions[0] {
443 ChannelAction::SendOnLink { .. } => {}
444 _ => panic!("Expected SendOnLink"),
445 }
446 }
447
448 let actions = ch.packet_timeout(0, 10.0);
450 assert_eq!(actions.len(), 1);
451 match &actions[0] {
452 ChannelAction::TeardownLink => {}
453 _ => panic!("Expected TeardownLink"),
454 }
455 }
456
457 #[test]
458 fn test_sequence_wrapping() {
459 let mut ch = Channel::new(0.1);
460 ch.next_sequence = CHANNEL_SEQ_MAX;
461
462 ch.send(0x01, b"wrap", 1.0, 500).unwrap();
463 assert_eq!(ch.next_sequence, 0);
464
465 ch.send(0x01, b"after", 1.0, 500).unwrap();
466 assert_eq!(ch.next_sequence, 1);
467 }
468
469 #[test]
470 fn test_out_of_order_buffering() {
471 let mut ch = Channel::new(0.1);
472
473 let raw0 = pack_envelope(0x01, 0, b"first");
475 let raw1 = pack_envelope(0x01, 1, b"second");
476
477 let actions = ch.receive(&raw1, 1.0);
479 assert!(actions.is_empty()); let actions = ch.receive(&raw0, 1.1);
483 assert_eq!(actions.len(), 2); match &actions[0] {
485 ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
486 _ => panic!("Expected MessageReceived"),
487 }
488 match &actions[1] {
489 ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 1),
490 _ => panic!("Expected MessageReceived"),
491 }
492 }
493
494 #[test]
495 fn test_duplicate_rejection() {
496 let mut ch = Channel::new(0.1);
497 let raw = pack_envelope(0x01, 0, b"hello");
498
499 let actions = ch.receive(&raw, 1.0);
500 assert_eq!(actions.len(), 1);
501
502 let actions = ch.receive(&raw, 1.1);
504 assert!(actions.is_empty());
505 }
506
507 #[test]
508 fn test_get_packet_timeout() {
509 let ch = Channel::new(0.1);
510 let t1 = ch.get_packet_timeout(1);
511 let t2 = ch.get_packet_timeout(2);
512 assert!(t2 > t1); }
514
515 #[test]
516 fn test_mdu() {
517 let ch = Channel::new(0.1);
518 assert_eq!(ch.mdu(431), 431 - CHANNEL_ENVELOPE_OVERHEAD);
519 }
520
521 #[test]
522 fn test_window_upgrade_fast() {
523 let mut ch = Channel::new(0.05); ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
525
526 for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
528 ch.send(0x01, b"x", i as f64, 500).unwrap();
529 ch.packet_delivered(i);
530 }
531
532 assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_FAST);
533 assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_FAST);
534 }
535
536 #[test]
537 fn test_window_upgrade_medium() {
538 let mut ch = Channel::new(0.5); ch.window_max = CHANNEL_WINDOW_MAX_SLOW;
540
541 for i in 0..CHANNEL_FAST_RATE_THRESHOLD {
542 ch.send(0x01, b"x", i as f64, 500).unwrap();
543 ch.packet_delivered(i);
544 }
545
546 assert_eq!(ch.window_max, CHANNEL_WINDOW_MAX_MEDIUM);
547 assert_eq!(ch.window_min, CHANNEL_WINDOW_MIN_LIMIT_MEDIUM);
548 }
549
550 #[test]
551 fn test_shutdown() {
552 let mut ch = Channel::new(0.1);
553 ch.send(0x01, b"a", 1.0, 500).unwrap();
554 ch.shutdown();
555 assert_eq!(ch.outstanding(), 0);
556 }
557
558 #[test]
559 fn test_message_too_big() {
560 let mut ch = Channel::new(0.1);
561 let big = vec![0u8; 500];
562 assert_eq!(ch.send(0x01, &big, 1.0, 10), Err(ChannelError::MessageTooBig));
564 }
565
566 #[test]
567 fn test_receive_sequence_wrap_at_boundary() {
568 let mut ch = Channel::new(0.1);
569 ch.next_rx_sequence = CHANNEL_SEQ_MAX;
570
571 let raw_max = pack_envelope(0x01, CHANNEL_SEQ_MAX, b"last");
572 let raw_zero = pack_envelope(0x01, 0, b"first_after_wrap");
573
574 let actions = ch.receive(&raw_max, 1.0);
575 assert_eq!(actions.len(), 1);
576 assert_eq!(ch.next_rx_sequence, 0);
577
578 let actions = ch.receive(&raw_zero, 1.1);
579 assert_eq!(actions.len(), 1);
580 match &actions[0] {
581 ChannelAction::MessageReceived { sequence, .. } => assert_eq!(*sequence, 0),
582 _ => panic!("Expected MessageReceived"),
583 }
584 }
585
586 #[test]
587 fn test_receive_wrap_boundary_out_of_order() {
588 let mut ch = Channel::new(0.1);
590 ch.next_rx_sequence = 0xFFFE;
591
592 let raw_fffe = pack_envelope(0x01, 0xFFFE, b"a");
593 let raw_ffff = pack_envelope(0x01, 0xFFFF, b"b");
594 let raw_0000 = pack_envelope(0x01, 0x0000, b"c");
595
596 let actions = ch.receive(&raw_0000, 1.0);
598 assert!(actions.is_empty()); let actions = ch.receive(&raw_ffff, 1.1);
601 assert!(actions.is_empty()); let actions = ch.receive(&raw_fffe, 1.2);
604 assert_eq!(actions.len(), 3); match &actions[0] {
606 ChannelAction::MessageReceived { sequence, payload, .. } => {
607 assert_eq!(*sequence, 0xFFFE);
608 assert_eq!(payload, b"a");
609 }
610 _ => panic!("Expected MessageReceived"),
611 }
612 match &actions[1] {
613 ChannelAction::MessageReceived { sequence, payload, .. } => {
614 assert_eq!(*sequence, 0xFFFF);
615 assert_eq!(payload, b"b");
616 }
617 _ => panic!("Expected MessageReceived"),
618 }
619 match &actions[2] {
620 ChannelAction::MessageReceived { sequence, payload, .. } => {
621 assert_eq!(*sequence, 0x0000);
622 assert_eq!(payload, b"c");
623 }
624 _ => panic!("Expected MessageReceived"),
625 }
626 }
627
628 #[test]
629 fn test_many_messages_in_order() {
630 let mut sender = Channel::new(0.05);
631 let mut receiver = Channel::new(0.05);
632
633 for i in 0..20u16 {
634 if i >= 2 {
636 sender.packet_delivered(i - 2);
637 }
638
639 let actions = sender.send(0x01, &[i as u8], i as f64, 500).unwrap();
640 let raw = match &actions[0] {
641 ChannelAction::SendOnLink { raw } => raw.clone(),
642 _ => panic!("Expected SendOnLink"),
643 };
644
645 let recv_actions = receiver.receive(&raw, i as f64 + 0.1);
646 assert_eq!(recv_actions.len(), 1);
647 match &recv_actions[0] {
648 ChannelAction::MessageReceived { payload, sequence, .. } => {
649 assert_eq!(*sequence, i);
650 assert_eq!(payload, &[i as u8]);
651 }
652 _ => panic!("Expected MessageReceived"),
653 }
654 }
655 }
656}