sosistab2/multiplex/stream/inflight/
mod.rs

1use std::{
2    collections::{btree_map::Entry, BTreeMap},
3    time::{Duration, Instant},
4};
5
6use crate::frame::Seqno;
7
8use self::rtt_calc::{BwCalculator, RttCalculator};
9
10use super::StreamMessage;
11
12mod rtt_calc;
13
14#[derive(Debug, Clone)]
15/// An element of Inflight.
16pub struct InflightEntry {
17    send_time: Instant,
18    retrans: u64,
19    payload: StreamMessage,
20
21    retrans_time: Instant,
22    delivered: u64,
23}
24
25/// A data structure that tracks in-flight packets.
26pub struct Inflight {
27    segments: BTreeMap<Seqno, InflightEntry>,
28    rtos: BTreeMap<Instant, Vec<Seqno>>,
29
30    rtt: RttCalculator,
31    bw: BwCalculator,
32
33    sent: u64,
34    retrans: u64,
35}
36
37impl Inflight {
38    /// Creates a new Inflight.
39    pub fn new() -> Self {
40        Inflight {
41            segments: Default::default(),
42            rtos: Default::default(),
43            rtt: Default::default(),
44            bw: Default::default(),
45
46            sent: 0,
47            retrans: 0,
48        }
49    }
50
51    pub fn inflight(&self) -> usize {
52        // all segments that are still in flight
53        self.segments.len()
54    }
55
56    pub fn lost_at(&self, now: Instant) -> usize {
57        self.rtos
58            .iter()
59            .take_while(|(&retrans_time, _)| retrans_time <= now)
60            .map(|(_, seqnos)| seqnos.len())
61            .sum()
62    }
63
64    /// Mark all inflight packets less than a certain sequence number as acknowledged.
65    pub fn mark_acked_lt(&mut self, seqno: Seqno) -> usize {
66        let mut to_remove = vec![];
67        for (k, _) in self.segments.iter() {
68            if *k < seqno {
69                to_remove.push(*k);
70            } else {
71                // we can rely on iteration order
72                break;
73            }
74        }
75        let mut sum = 0;
76        for seqno in to_remove {
77            if self.mark_acked(seqno) {
78                sum += 1;
79            }
80        }
81        sum
82    }
83
84    /// Marks a particular inflight packet as acknowledged. Returns whether or not there was actually such an inflight packet.
85    pub fn mark_acked(&mut self, acked_seqno: Seqno) -> bool {
86        let mut to_remove = vec![];
87        let now_rto = Instant::now();
88        for (seqno, entry) in self.segments.iter_mut() {
89            if acked_seqno > seqno + 5 && entry.retrans == 0 && entry.retrans_time > now_rto {
90                log::debug!(
91                    "fast retransmit triggered, acked_seqno = {acked_seqno}; seqno = {seqno}"
92                );
93
94                to_remove.push((entry.retrans_time, *seqno));
95                entry.retrans_time = now_rto;
96                self.rtos.entry(now_rto).or_default().push(*seqno);
97            } else {
98                break;
99            }
100        }
101
102        for (a, b) in to_remove {
103            self.remove_rto(a, b)
104        }
105
106        let now = Instant::now();
107
108        if let Some(acked_seg) = self.segments.remove(&acked_seqno) {
109            // record RTT
110            if acked_seg.retrans == 0 {
111                self.rtt
112                    .record_sample(now.saturating_duration_since(acked_seg.send_time));
113            }
114            // record bandwidth
115            self.bw.on_ack(acked_seg.delivered, acked_seg.send_time);
116            // remove from rtos
117            self.remove_rto(acked_seg.retrans_time, acked_seqno);
118
119            true
120        } else {
121            false
122        }
123    }
124
125    /// Inserts a packet to the inflight.
126    pub fn insert(&mut self, msg: StreamMessage) {
127        let seqno = msg.seqno();
128        let now = Instant::now();
129        let rto_duration = self.rtt.rto();
130        let rto = now + rto_duration;
131        let prev = self.segments.insert(
132            seqno,
133            InflightEntry {
134                send_time: now,
135                payload: msg,
136                retrans: 0,
137                retrans_time: rto,
138
139                delivered: self.bw.delivered(),
140            },
141        );
142        assert!(prev.is_none());
143        // we insert into RTOs.
144        self.rtos.entry(rto).or_default().push(seqno);
145        self.sent += 1;
146    }
147
148    /// Returns the retransmission time of the first possibly retransmitted packet, as well as its seqno. This skips all known-lost packets.
149    pub fn first_rto(&self) -> Option<(Seqno, Instant)> {
150        self.rtos
151            .iter()
152            .next()
153            .map(|(instant, seqno)| (seqno[0], *instant))
154    }
155
156    /// Retransmits a particular seqno
157    pub fn retransmit(&mut self, seqno: Seqno) -> Option<StreamMessage> {
158        let rto = self.rtt.rto();
159        let (payload, old_retrans, new_retrans) = {
160            let entry = self.segments.get_mut(&seqno);
161            entry.map(|entry| {
162                let old_retrans = entry.retrans_time;
163                entry.retrans += 1;
164
165                entry.retrans_time =
166                    Instant::now() + rto.mul_f64(2.0f64.powi(entry.retrans as i32).min(60.0));
167
168                (entry.payload.clone(), old_retrans, entry.retrans_time)
169            })?
170        };
171        // eprintln!("retransmit {}", seqno);
172        self.remove_rto(old_retrans, seqno);
173        self.rtos.entry(new_retrans).or_default().push(seqno);
174        self.sent += 1;
175        self.retrans += 1;
176        log::debug!(
177            "retransmission {:.2}% ({}/{})",
178            (self.retrans as f64) / (self.sent as f64) * 100.0,
179            self.retrans,
180            self.sent
181        );
182        Some(payload)
183    }
184
185    fn remove_rto(&mut self, retrans_time: Instant, seqno: Seqno) {
186        let rto_entry = self.rtos.entry(retrans_time);
187        if let Entry::Occupied(mut o) = rto_entry {
188            o.get_mut().retain(|v| *v != seqno);
189            if o.get().is_empty() {
190                o.remove();
191            }
192        }
193    }
194
195    /// The total bdp of the link, in packets
196    pub fn bdp(&self) -> usize {
197        (self.bw.delivery_rate() * self.rtt.min_rtt().as_secs_f64()) as usize
198    }
199
200    /// Minimum RTT
201    pub fn min_rtt(&self) -> Duration {
202        self.rtt.min_rtt()
203    }
204
205    /// The estimated delivery rate of the link
206    pub fn delivery_rate(&self) -> f64 {
207        self.bw.delivery_rate()
208    }
209}