sosistab2/multiplex/stream/inflight/
mod.rs1use std::{
2 collections::{btree_map::Entry, BTreeMap},
3 time::{Duration, Instant},
4};
5
6use crate::frame::Seqno;
7
8use self::rtt_calc::{BwCalculator, RttCalculator};
9
10use super::StreamMessage;
11
12mod rtt_calc;
13
14#[derive(Debug, Clone)]
15pub struct InflightEntry {
17 send_time: Instant,
18 retrans: u64,
19 payload: StreamMessage,
20
21 retrans_time: Instant,
22 delivered: u64,
23}
24
25pub struct Inflight {
27 segments: BTreeMap<Seqno, InflightEntry>,
28 rtos: BTreeMap<Instant, Vec<Seqno>>,
29
30 rtt: RttCalculator,
31 bw: BwCalculator,
32
33 sent: u64,
34 retrans: u64,
35}
36
37impl Inflight {
38 pub fn new() -> Self {
40 Inflight {
41 segments: Default::default(),
42 rtos: Default::default(),
43 rtt: Default::default(),
44 bw: Default::default(),
45
46 sent: 0,
47 retrans: 0,
48 }
49 }
50
51 pub fn inflight(&self) -> usize {
52 self.segments.len()
54 }
55
56 pub fn lost_at(&self, now: Instant) -> usize {
57 self.rtos
58 .iter()
59 .take_while(|(&retrans_time, _)| retrans_time <= now)
60 .map(|(_, seqnos)| seqnos.len())
61 .sum()
62 }
63
64 pub fn mark_acked_lt(&mut self, seqno: Seqno) -> usize {
66 let mut to_remove = vec![];
67 for (k, _) in self.segments.iter() {
68 if *k < seqno {
69 to_remove.push(*k);
70 } else {
71 break;
73 }
74 }
75 let mut sum = 0;
76 for seqno in to_remove {
77 if self.mark_acked(seqno) {
78 sum += 1;
79 }
80 }
81 sum
82 }
83
84 pub fn mark_acked(&mut self, acked_seqno: Seqno) -> bool {
86 let mut to_remove = vec![];
87 let now_rto = Instant::now();
88 for (seqno, entry) in self.segments.iter_mut() {
89 if acked_seqno > seqno + 5 && entry.retrans == 0 && entry.retrans_time > now_rto {
90 log::debug!(
91 "fast retransmit triggered, acked_seqno = {acked_seqno}; seqno = {seqno}"
92 );
93
94 to_remove.push((entry.retrans_time, *seqno));
95 entry.retrans_time = now_rto;
96 self.rtos.entry(now_rto).or_default().push(*seqno);
97 } else {
98 break;
99 }
100 }
101
102 for (a, b) in to_remove {
103 self.remove_rto(a, b)
104 }
105
106 let now = Instant::now();
107
108 if let Some(acked_seg) = self.segments.remove(&acked_seqno) {
109 if acked_seg.retrans == 0 {
111 self.rtt
112 .record_sample(now.saturating_duration_since(acked_seg.send_time));
113 }
114 self.bw.on_ack(acked_seg.delivered, acked_seg.send_time);
116 self.remove_rto(acked_seg.retrans_time, acked_seqno);
118
119 true
120 } else {
121 false
122 }
123 }
124
125 pub fn insert(&mut self, msg: StreamMessage) {
127 let seqno = msg.seqno();
128 let now = Instant::now();
129 let rto_duration = self.rtt.rto();
130 let rto = now + rto_duration;
131 let prev = self.segments.insert(
132 seqno,
133 InflightEntry {
134 send_time: now,
135 payload: msg,
136 retrans: 0,
137 retrans_time: rto,
138
139 delivered: self.bw.delivered(),
140 },
141 );
142 assert!(prev.is_none());
143 self.rtos.entry(rto).or_default().push(seqno);
145 self.sent += 1;
146 }
147
148 pub fn first_rto(&self) -> Option<(Seqno, Instant)> {
150 self.rtos
151 .iter()
152 .next()
153 .map(|(instant, seqno)| (seqno[0], *instant))
154 }
155
156 pub fn retransmit(&mut self, seqno: Seqno) -> Option<StreamMessage> {
158 let rto = self.rtt.rto();
159 let (payload, old_retrans, new_retrans) = {
160 let entry = self.segments.get_mut(&seqno);
161 entry.map(|entry| {
162 let old_retrans = entry.retrans_time;
163 entry.retrans += 1;
164
165 entry.retrans_time =
166 Instant::now() + rto.mul_f64(2.0f64.powi(entry.retrans as i32).min(60.0));
167
168 (entry.payload.clone(), old_retrans, entry.retrans_time)
169 })?
170 };
171 self.remove_rto(old_retrans, seqno);
173 self.rtos.entry(new_retrans).or_default().push(seqno);
174 self.sent += 1;
175 self.retrans += 1;
176 log::debug!(
177 "retransmission {:.2}% ({}/{})",
178 (self.retrans as f64) / (self.sent as f64) * 100.0,
179 self.retrans,
180 self.sent
181 );
182 Some(payload)
183 }
184
185 fn remove_rto(&mut self, retrans_time: Instant, seqno: Seqno) {
186 let rto_entry = self.rtos.entry(retrans_time);
187 if let Entry::Occupied(mut o) = rto_entry {
188 o.get_mut().retain(|v| *v != seqno);
189 if o.get().is_empty() {
190 o.remove();
191 }
192 }
193 }
194
195 pub fn bdp(&self) -> usize {
197 (self.bw.delivery_rate() * self.rtt.min_rtt().as_secs_f64()) as usize
198 }
199
200 pub fn min_rtt(&self) -> Duration {
202 self.rtt.min_rtt()
203 }
204
205 pub fn delivery_rate(&self) -> f64 {
207 self.bw.delivery_rate()
208 }
209}