// s2n_quic_core/recovery/bandwidth/estimator.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{event, event::IntoEvent, recovery::congestion_controller::Publisher, time::Timestamp};
5use core::{
6    cmp::{max, Ordering},
7    time::Duration,
8};
9use num_rational::Ratio;
10use num_traits::Inv;
11
12#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
13/// Bandwidth-related data tracked for each sent packet
14pub struct PacketInfo {
15    /// [Estimator::delivered_bytes] at the time this packet was sent.
16    pub delivered_bytes: u64,
17    /// `Estimator::delivered_time` at the time this packet was sent.
18    pub delivered_time: Timestamp,
19    /// `Estimator::lost_bytes` at the time this packet was sent.
20    pub lost_bytes: u64,
21    /// `Estimator::ecn_ce_count` at the time this packet was sent.
22    pub ecn_ce_count: u64,
23    /// `Estimator::first_sent_time` at the time this packet was sent.
24    pub first_sent_time: Timestamp,
25    /// The volume of data that was estimated to be in flight at the
26    /// time of the transmission of this packet
27    pub bytes_in_flight: u32,
28    /// Whether the path send rate was limited by the application rather
29    /// than congestion control at the time this packet was sent
30    pub is_app_limited: bool,
31}
32
/// A data transfer rate
///
/// Although bandwidth is usually expressed as data per unit of time (e.g. bytes per
/// second), `Bandwidth` stores the reciprocal: the time needed to send a fixed amount of
/// data, measured in nanoseconds per kibibyte (1024 bytes). Storing the rate this way lets
/// several arithmetic operations on `Bandwidth` avoid division and makes overflow panics
/// less likely.
///
/// The largest finite rate that can be represented is roughly 1 TB/second.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Bandwidth {
    /// Nanoseconds needed to transfer one kibibyte; larger values mean lower bandwidth
    nanos_per_kibibyte: u64,
}

/// Bit shift converting between bytes and kibibytes (2^10 = 1024)
const KIBIBYTE_SHIFT: u8 = 10;
50
51impl Bandwidth {
52    pub const ZERO: Bandwidth = Bandwidth {
53        nanos_per_kibibyte: u64::MAX,
54    };
55
56    pub const INFINITY: Bandwidth = Bandwidth {
57        nanos_per_kibibyte: 0,
58    };
59
60    /// Constructs a new `Bandwidth` with the given bytes per interval
61    pub const fn new(bytes: u64, interval: Duration) -> Self {
62        let interval = (interval.as_nanos() as u64) << KIBIBYTE_SHIFT;
63        if interval == 0 || bytes == 0 {
64            Bandwidth::ZERO
65        } else {
66            Self {
67                nanos_per_kibibyte: interval / bytes,
68            }
69        }
70    }
71
72    #[inline]
73    pub fn serialize(self) -> u64 {
74        self.nanos_per_kibibyte
75    }
76
77    #[inline]
78    pub fn deserialize(value: u64) -> Self {
79        Self {
80            nanos_per_kibibyte: value,
81        }
82    }
83
84    /// Represents the bandwidth as bytes per second
85    pub fn as_bytes_per_second(&self) -> u64 {
86        const ONE_SECOND_IN_NANOS: u64 = Duration::from_secs(1).as_nanos() as u64;
87
88        if *self == Bandwidth::INFINITY {
89            return u64::MAX;
90        }
91
92        (ONE_SECOND_IN_NANOS << KIBIBYTE_SHIFT) / self.nanos_per_kibibyte
93    }
94}
95
96impl Default for Bandwidth {
97    fn default() -> Self {
98        Bandwidth::ZERO
99    }
100}
101
102impl core::cmp::PartialOrd for Bandwidth {
103    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
104        Some(self.cmp(other))
105    }
106}
107
108impl core::cmp::Ord for Bandwidth {
109    fn cmp(&self, other: &Self) -> Ordering {
110        // The higher the nanos_per_kibibyte, the lower the bandwidth,
111        // so reverse the ordering when comparing
112        self.nanos_per_kibibyte
113            .cmp(&other.nanos_per_kibibyte)
114            .reverse()
115    }
116}
117
118impl core::ops::Mul<Ratio<u64>> for Bandwidth {
119    type Output = Bandwidth;
120
121    fn mul(self, rhs: Ratio<u64>) -> Self::Output {
122        if self == Bandwidth::ZERO {
123            return Bandwidth::ZERO;
124        }
125
126        Bandwidth {
127            // Since `Bandwidth` is represented as time/byte and not bytes/time, we should divide
128            // by the given ratio to result in a higher nanos_per_kibibyte value (lower bandwidth).
129            // To avoid division, we can multiply by the inverse of the ratio instead
130            nanos_per_kibibyte: (rhs.inv() * self.nanos_per_kibibyte).to_integer(),
131        }
132    }
133}
134
135impl core::ops::Mul<Duration> for Bandwidth {
136    type Output = u64;
137
138    fn mul(self, rhs: Duration) -> Self::Output {
139        if self == Bandwidth::INFINITY {
140            return u64::MAX;
141        } else if rhs.is_zero() {
142            return 0;
143        }
144
145        let interval = (rhs.as_nanos() as u64) << KIBIBYTE_SHIFT;
146
147        if interval == 0 {
148            return u64::MAX;
149        }
150
151        interval / self.nanos_per_kibibyte
152    }
153}
154
155/// Divides a count of bytes represented as a u64 by the given `Bandwidth`
156///
157/// Since `Bandwidth` is a rate of bytes over a time period, this division
158/// results in a `Duration` being returned, representing how long a path
159/// with the given `Bandwidth` would take to transmit the given number of
160/// bytes.
161impl core::ops::Div<Bandwidth> for u64 {
162    type Output = Duration;
163
164    fn div(self, rhs: Bandwidth) -> Self::Output {
165        #[allow(clippy::suspicious_arithmetic_impl)]
166        Duration::from_nanos(rhs.nanos_per_kibibyte.saturating_mul(self) >> KIBIBYTE_SHIFT)
167    }
168}
169
/// A sample of the path's delivery rate, along with the data used to compute it
#[derive(Clone, Copy, Debug, Default)]
pub struct RateSample {
    /// Duration over which this sample was taken
    pub interval: Duration,
    /// Bytes marked as delivered during the sampling interval
    pub delivered_bytes: u64,
    /// Bytes marked as lost during the sampling interval
    pub lost_bytes: u64,
    /// Packets marked as ECN congestion experienced during the sampling interval
    pub ecn_ce_count: u64,
    /// The [PacketInfo::is_app_limited] value of the newest acknowledged packet
    pub is_app_limited: bool,
    /// The [PacketInfo::delivered_bytes] value of the newest acknowledged packet
    pub prior_delivered_bytes: u64,
    /// The [PacketInfo::bytes_in_flight] value of the newest acknowledged packet
    pub bytes_in_flight: u32,
    /// The [PacketInfo::lost_bytes] value of the newest acknowledged packet
    pub prior_lost_bytes: u64,
    /// The [PacketInfo::ecn_ce_count] value of the newest acknowledged packet
    pub prior_ecn_ce_count: u64,
}
192
193impl RateSample {
194    /// Updates the rate sample with the most recent acknowledged packet
195    fn on_ack(&mut self, packet_info: PacketInfo) {
196        debug_assert!(
197            packet_info.delivered_bytes >= self.prior_delivered_bytes,
198            "on_ack should only be called with the newest acknowledged packet"
199        );
200
201        self.is_app_limited = packet_info.is_app_limited;
202        self.prior_delivered_bytes = packet_info.delivered_bytes;
203        self.prior_lost_bytes = packet_info.lost_bytes;
204        self.prior_ecn_ce_count = packet_info.ecn_ce_count;
205        self.bytes_in_flight = packet_info.bytes_in_flight;
206    }
207
208    /// Gets the delivery rate of this rate sample
209    pub fn delivery_rate(&self) -> Bandwidth {
210        Bandwidth::new(self.delivered_bytes, self.interval)
211    }
212}
213
214impl IntoEvent<event::builder::RateSample> for RateSample {
215    #[inline]
216    fn into_event(self) -> event::builder::RateSample {
217        event::builder::RateSample {
218            interval: self.interval.into_event(),
219            delivered_bytes: self.delivered_bytes.into_event(),
220            lost_bytes: self.lost_bytes.into_event(),
221            ecn_ce_count: self.ecn_ce_count.into_event(),
222            is_app_limited: self.is_app_limited.into_event(),
223            prior_delivered_bytes: self.prior_delivered_bytes.into_event(),
224            bytes_in_flight: self.bytes_in_flight.into_event(),
225            prior_lost_bytes: self.prior_lost_bytes.into_event(),
226            prior_ecn_ce_count: self.prior_ecn_ce_count.into_event(),
227            delivery_rate_bytes_per_second: self.delivery_rate().as_bytes_per_second().into_event(),
228        }
229    }
230}
231
232/// Bandwidth estimator as defined in [Delivery Rate Estimation](https://datatracker.ietf.org/doc/draft-cheng-iccrg-delivery-rate-estimation/)
233/// and [BBR Congestion Control](https://datatracker.ietf.org/doc/draft-cardwell-iccrg-bbr-congestion-control/).
234#[derive(Clone, Debug, Default)]
235pub struct Estimator {
236    //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#2.2
237    //# The amount of data delivered MAY be tracked in units of either octets or packets.
238    /// The total amount of data in bytes delivered so far over the lifetime of the path, not including
239    /// non-congestion-controlled packets such as pure ACK packets.
240    delivered_bytes: u64,
241    /// The timestamp when delivered_bytes was last updated, or if the connection
242    /// was recently idle, the send time of the first packet sent after resuming from idle.
243    delivered_time: Option<Timestamp>,
244    /// The total amount of data in bytes declared lost so far over the lifetime of the path, not including
245    /// non-congestion-controlled packets such as pure ACK packets.
246    lost_bytes: u64,
247    /// The total amount of explicit congestion experienced packets over the lifetime of the path
248    ecn_ce_count: u64,
249    /// The send time of the packet that was most recently marked as delivered, or if the connection
250    /// was recently idle, the send time of the first packet sent after resuming from idle.
251    first_sent_time: Option<Timestamp>,
252    /// The `delivered_bytes` that marks the end of the current application-limited period, or
253    /// `None` if the connection is not currently application-limited.
254    app_limited_delivered_bytes: Option<u64>,
255    rate_sample: RateSample,
256}
257
258impl Estimator {
259    /// Gets the total amount of data in bytes delivered so far over the lifetime of the path, not including
260    /// non-congestion-controlled packets such as pure ACK packets.
261    pub fn delivered_bytes(&self) -> u64 {
262        self.delivered_bytes
263    }
264
265    /// Gets the total amount of data in bytes lost so far over the lifetime of the path
266    pub fn lost_bytes(&self) -> u64 {
267        self.lost_bytes
268    }
269
270    /// Gets the latest [RateSample]
271    pub fn rate_sample(&self) -> RateSample {
272        self.rate_sample
273    }
274
275    /// Returns true if the path is currently in an application-limited period
276    pub fn is_app_limited(&self) -> bool {
277        self.app_limited_delivered_bytes.is_some()
278    }
279
280    /// Called when a packet is transmitted
281    pub fn on_packet_sent(
282        &mut self,
283        prior_bytes_in_flight: u32,
284        sent_bytes: usize,
285        app_limited: Option<bool>,
286        now: Timestamp,
287    ) -> PacketInfo {
288        //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.2
289        //# If there are no packets in flight yet, then we can start the delivery rate interval
290        //# at the current time, since we know that any ACKs after now indicate that the network
291        //# was able to deliver those packets completely in the sampling interval between now
292        //# and the next ACK.
293
294        //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.2
295        //# Upon each packet transmission, the sender executes the following steps:
296        //#
297        //# SendPacket(Packet P):
298        //#   if (SND.NXT == SND.UNA)  /* no packets in flight yet? */
299        //#     C.first_sent_time  = C.delivered_time = Now()
300        //#   P.first_sent_time = C.first_sent_time
301        //#   P.delivered_time  = C.delivered_time
302        //#   P.delivered       = C.delivered
303        //#   P.is_app_limited  = (C.app_limited != 0)
304        if prior_bytes_in_flight == 0 {
305            self.first_sent_time = Some(now);
306            self.delivered_time = Some(now);
307        }
308
309        let bytes_in_flight = prior_bytes_in_flight.saturating_add(sent_bytes as u32);
310
311        if app_limited.unwrap_or(true) {
312            self.on_app_limited(bytes_in_flight);
313        }
314
315        PacketInfo {
316            delivered_bytes: self.delivered_bytes,
317            delivered_time: self
318                .delivered_time
319                .expect("initialized on first sent packet"),
320            lost_bytes: self.lost_bytes,
321            ecn_ce_count: self.ecn_ce_count,
322            first_sent_time: self
323                .first_sent_time
324                .expect("initialized on first sent packet"),
325            bytes_in_flight,
326            is_app_limited: self.app_limited_delivered_bytes.is_some(),
327        }
328    }
329
330    //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.3
331    //# For each packet that was newly SACKed or ACKed, UpdateRateSample() updates the
332    //# rate sample based on a snapshot of connection delivery information from the time
333    //# at which the packet was last transmitted.
334    /// Called for each acknowledgement of one or more packets
335    pub fn on_ack<Pub: Publisher>(
336        &mut self,
337        bytes_acknowledged: usize,
338        newest_acked_time_sent: Timestamp,
339        newest_acked_packet_info: PacketInfo,
340        now: Timestamp,
341        publisher: &mut Pub,
342    ) {
343        self.delivered_bytes += bytes_acknowledged as u64;
344        self.delivered_time = Some(now);
345
346        let is_app_limited_period_over =
347            |app_limited_bytes| self.delivered_bytes > app_limited_bytes;
348        if self
349            .app_limited_delivered_bytes
350            .is_some_and(is_app_limited_period_over)
351        {
352            // Clear app-limited field if bubble is ACKed and gone
353            self.app_limited_delivered_bytes = None;
354        }
355
356        //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.3
357        //# UpdateRateSample() is invoked multiple times when a stretched ACK acknowledges
358        //# multiple data packets. In this case we use the information from the most recently
359        //# sent packet, i.e., the packet with the highest "P.delivered" value.
360        if self.rate_sample.prior_delivered_bytes == 0
361            || newest_acked_packet_info.delivered_bytes > self.rate_sample.prior_delivered_bytes
362        {
363            // Update info using the newest packet
364            self.rate_sample.on_ack(newest_acked_packet_info);
365            self.first_sent_time = Some(newest_acked_time_sent);
366
367            debug_assert!(
368                newest_acked_time_sent >= newest_acked_packet_info.first_sent_time,
369                "newest_acked_time_sent={newest_acked_time_sent}, first_sent_time={}",
370                newest_acked_packet_info.first_sent_time
371            );
372            let send_elapsed = newest_acked_time_sent - newest_acked_packet_info.first_sent_time;
373            debug_assert!(
374                now >= newest_acked_packet_info.delivered_time,
375                "now={now}, delivered_time={}",
376                newest_acked_packet_info.delivered_time
377            );
378            let ack_elapsed = now - newest_acked_packet_info.delivered_time;
379
380            //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#2.2.4
381            //# Since it is physically impossible to have data delivered faster than it is sent
382            //# in a sustained fashion, when the estimator notices that the ack_rate for a flight
383            //# is faster than the send rate for the flight, it filters out the implausible ack_rate
384            //# by capping the delivery rate sample to be no higher than the send rate.
385            self.rate_sample.interval = max(send_elapsed, ack_elapsed);
386        }
387
388        self.rate_sample.delivered_bytes =
389            self.delivered_bytes - self.rate_sample.prior_delivered_bytes;
390        // Lost bytes and ECN CE Count are updated here as well as in `on_loss` and `on_explicit_congestion`
391        // so the values are up to date even when no loss or ECN CE markings are received.
392        self.rate_sample.lost_bytes = self.lost_bytes - self.rate_sample.prior_lost_bytes;
393        self.rate_sample.ecn_ce_count = self.ecn_ce_count - self.rate_sample.prior_ecn_ce_count;
394
395        publisher.on_delivery_rate_sampled(self.rate_sample);
396    }
397
398    /// Called when packets are declared lost
399    #[inline]
400    pub fn on_loss(&mut self, lost_bytes: usize) {
401        self.lost_bytes += lost_bytes as u64;
402        self.rate_sample.lost_bytes = self.lost_bytes - self.rate_sample.prior_lost_bytes;
403
404        // Move the app-limited period earlier as the lost bytes will not be delivered
405        if let Some(ref mut app_limited_delivered_bytes) = self.app_limited_delivered_bytes {
406            *app_limited_delivered_bytes =
407                app_limited_delivered_bytes.saturating_sub(lost_bytes as u64)
408        }
409    }
410
411    /// Called when packets are discarded
412    #[inline]
413    pub fn on_packet_discarded(&mut self, bytes_sent: usize) {
414        // Move the app-limited period earlier as the discarded bytes will not be delivered
415        if let Some(ref mut app_limited_delivered_bytes) = self.app_limited_delivered_bytes {
416            *app_limited_delivered_bytes =
417                app_limited_delivered_bytes.saturating_sub(bytes_sent as u64)
418        }
419    }
420
421    /// Called each time explicit congestion is recorded
422    #[inline]
423    pub fn on_explicit_congestion(&mut self, ce_count: u64) {
424        self.ecn_ce_count += ce_count;
425        self.rate_sample.ecn_ce_count = self.ecn_ce_count - self.rate_sample.prior_ecn_ce_count;
426    }
427
428    /// Mark the path as app limited until the given `bytes_in_flight` are acknowledged
429    pub fn on_app_limited(&mut self, bytes_in_flight: u32) {
430        //= https://tools.ietf.org/id/draft-cardwell-iccrg-bbr-congestion-control-02#4.3.4.4
431        //# MarkConnectionAppLimited():
432        //#   C.app_limited =
433        //#     (C.delivered + packets_in_flight) ? : 1
434        self.app_limited_delivered_bytes = Some(self.delivered_bytes + bytes_in_flight as u64);
435    }
436
437    #[cfg(test)]
438    pub fn set_rate_sample_for_test(&mut self, rate_sample: RateSample) {
439        self.rate_sample = rate_sample
440    }
441
442    #[cfg(test)]
443    pub fn set_delivered_bytes_for_test(&mut self, delivered_bytes: u64) {
444        self.delivered_bytes = delivered_bytes
445    }
446}
447
// Unit tests for this module live in a separate `tests` module file
#[cfg(test)]
mod tests;