s2n_quic_core/recovery/bandwidth/estimator.rs
1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::{event, event::IntoEvent, recovery::congestion_controller::Publisher, time::Timestamp};
5use core::{
6 cmp::{max, Ordering},
7 time::Duration,
8};
9use num_rational::Ratio;
10use num_traits::Inv;
11
12#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
13/// Bandwidth-related data tracked for each sent packet
14pub struct PacketInfo {
15 /// [Estimator::delivered_bytes] at the time this packet was sent.
16 pub delivered_bytes: u64,
17 /// `Estimator::delivered_time` at the time this packet was sent.
18 pub delivered_time: Timestamp,
19 /// `Estimator::lost_bytes` at the time this packet was sent.
20 pub lost_bytes: u64,
21 /// `Estimator::ecn_ce_count` at the time this packet was sent.
22 pub ecn_ce_count: u64,
23 /// `Estimator::first_sent_time` at the time this packet was sent.
24 pub first_sent_time: Timestamp,
25 /// The volume of data that was estimated to be in flight at the
26 /// time of the transmission of this packet
27 pub bytes_in_flight: u32,
28 /// Whether the path send rate was limited by the application rather
29 /// than congestion control at the time this packet was sent
30 pub is_app_limited: bool,
31}
32
/// A rate of data transfer
///
/// Bandwidth is usually expressed as data per unit of time (bytes per second,
/// for example). Internally this type stores the inverse instead: the time
/// needed to send a fixed amount of data, measured in nanoseconds per kibibyte
/// (1024 bytes). Storing the inverse lets several of the operations on
/// `Bandwidth` avoid division and makes overflow-related panics less likely.
///
/// As a consequence, a *smaller* stored value means a *higher* bandwidth.
///
/// The maximum (non-infinite) value that can be represented is ~1 TB/second.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Bandwidth {
    /// Time, in nanoseconds, required to transfer one kibibyte
    nanos_per_kibibyte: u64,
}
47
/// Number of bits to shift by to scale a value by one kibibyte (2^10 = 1024 bytes)
const KIBIBYTE_SHIFT: u8 = 10;
50
51impl Bandwidth {
52 pub const ZERO: Bandwidth = Bandwidth {
53 nanos_per_kibibyte: u64::MAX,
54 };
55
56 pub const INFINITY: Bandwidth = Bandwidth {
57 nanos_per_kibibyte: 0,
58 };
59
60 /// Constructs a new `Bandwidth` with the given bytes per interval
61 pub const fn new(bytes: u64, interval: Duration) -> Self {
62 let interval = (interval.as_nanos() as u64) << KIBIBYTE_SHIFT;
63 if interval == 0 || bytes == 0 {
64 Bandwidth::ZERO
65 } else {
66 Self {
67 nanos_per_kibibyte: interval / bytes,
68 }
69 }
70 }
71
72 #[inline]
73 pub fn serialize(self) -> u64 {
74 self.nanos_per_kibibyte
75 }
76
77 #[inline]
78 pub fn deserialize(value: u64) -> Self {
79 Self {
80 nanos_per_kibibyte: value,
81 }
82 }
83
84 /// Represents the bandwidth as bytes per second
85 pub fn as_bytes_per_second(&self) -> u64 {
86 const ONE_SECOND_IN_NANOS: u64 = Duration::from_secs(1).as_nanos() as u64;
87
88 if *self == Bandwidth::INFINITY {
89 return u64::MAX;
90 }
91
92 (ONE_SECOND_IN_NANOS << KIBIBYTE_SHIFT) / self.nanos_per_kibibyte
93 }
94}
95
96impl Default for Bandwidth {
97 fn default() -> Self {
98 Bandwidth::ZERO
99 }
100}
101
102impl core::cmp::PartialOrd for Bandwidth {
103 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
104 Some(self.cmp(other))
105 }
106}
107
108impl core::cmp::Ord for Bandwidth {
109 fn cmp(&self, other: &Self) -> Ordering {
110 // The higher the nanos_per_kibibyte, the lower the bandwidth,
111 // so reverse the ordering when comparing
112 self.nanos_per_kibibyte
113 .cmp(&other.nanos_per_kibibyte)
114 .reverse()
115 }
116}
117
118impl core::ops::Mul<Ratio<u64>> for Bandwidth {
119 type Output = Bandwidth;
120
121 fn mul(self, rhs: Ratio<u64>) -> Self::Output {
122 if self == Bandwidth::ZERO {
123 return Bandwidth::ZERO;
124 }
125
126 Bandwidth {
127 // Since `Bandwidth` is represented as time/byte and not bytes/time, we should divide
128 // by the given ratio to result in a higher nanos_per_kibibyte value (lower bandwidth).
129 // To avoid division, we can multiply by the inverse of the ratio instead
130 nanos_per_kibibyte: (rhs.inv() * self.nanos_per_kibibyte).to_integer(),
131 }
132 }
133}
134
135impl core::ops::Mul<Duration> for Bandwidth {
136 type Output = u64;
137
138 fn mul(self, rhs: Duration) -> Self::Output {
139 if self == Bandwidth::INFINITY {
140 return u64::MAX;
141 } else if rhs.is_zero() {
142 return 0;
143 }
144
145 let interval = (rhs.as_nanos() as u64) << KIBIBYTE_SHIFT;
146
147 if interval == 0 {
148 return u64::MAX;
149 }
150
151 interval / self.nanos_per_kibibyte
152 }
153}
154
155/// Divides a count of bytes represented as a u64 by the given `Bandwidth`
156///
157/// Since `Bandwidth` is a rate of bytes over a time period, this division
158/// results in a `Duration` being returned, representing how long a path
159/// with the given `Bandwidth` would take to transmit the given number of
160/// bytes.
161impl core::ops::Div<Bandwidth> for u64 {
162 type Output = Duration;
163
164 fn div(self, rhs: Bandwidth) -> Self::Output {
165 #[allow(clippy::suspicious_arithmetic_impl)]
166 Duration::from_nanos(rhs.nanos_per_kibibyte.saturating_mul(self) >> KIBIBYTE_SHIFT)
167 }
168}
169
/// A bandwidth delivery rate estimate with associated metadata
///
/// The `prior_*` fields and `bytes_in_flight` are copied from the [PacketInfo] of
/// the most recent acknowledged packet. The remaining counters describe what
/// happened over the sampling interval.
#[derive(Clone, Copy, Debug, Default)]
pub struct RateSample {
    /// The length of the sampling interval
    pub interval: Duration,
    /// The number of bytes marked as delivered over the sampling interval
    pub delivered_bytes: u64,
    /// The number of bytes marked as lost over the sampling interval
    pub lost_bytes: u64,
    /// The number of packets marked as explicit congestion experienced over the sampling interval
    pub ecn_ce_count: u64,
    /// [PacketInfo::is_app_limited] from the most recent acknowledged packet
    pub is_app_limited: bool,
    /// [PacketInfo::delivered_bytes] from the most recent acknowledged packet
    pub prior_delivered_bytes: u64,
    /// [PacketInfo::bytes_in_flight] from the most recent acknowledged packet
    pub bytes_in_flight: u32,
    /// [PacketInfo::lost_bytes] from the most recent acknowledged packet
    pub prior_lost_bytes: u64,
    /// [PacketInfo::ecn_ce_count] from the most recent acknowledged packet
    pub prior_ecn_ce_count: u64,
}
192
193impl RateSample {
194 /// Updates the rate sample with the most recent acknowledged packet
195 fn on_ack(&mut self, packet_info: PacketInfo) {
196 debug_assert!(
197 packet_info.delivered_bytes >= self.prior_delivered_bytes,
198 "on_ack should only be called with the newest acknowledged packet"
199 );
200
201 self.is_app_limited = packet_info.is_app_limited;
202 self.prior_delivered_bytes = packet_info.delivered_bytes;
203 self.prior_lost_bytes = packet_info.lost_bytes;
204 self.prior_ecn_ce_count = packet_info.ecn_ce_count;
205 self.bytes_in_flight = packet_info.bytes_in_flight;
206 }
207
208 /// Gets the delivery rate of this rate sample
209 pub fn delivery_rate(&self) -> Bandwidth {
210 Bandwidth::new(self.delivered_bytes, self.interval)
211 }
212}
213
214impl IntoEvent<event::builder::RateSample> for RateSample {
215 #[inline]
216 fn into_event(self) -> event::builder::RateSample {
217 event::builder::RateSample {
218 interval: self.interval.into_event(),
219 delivered_bytes: self.delivered_bytes.into_event(),
220 lost_bytes: self.lost_bytes.into_event(),
221 ecn_ce_count: self.ecn_ce_count.into_event(),
222 is_app_limited: self.is_app_limited.into_event(),
223 prior_delivered_bytes: self.prior_delivered_bytes.into_event(),
224 bytes_in_flight: self.bytes_in_flight.into_event(),
225 prior_lost_bytes: self.prior_lost_bytes.into_event(),
226 prior_ecn_ce_count: self.prior_ecn_ce_count.into_event(),
227 delivery_rate_bytes_per_second: self.delivery_rate().as_bytes_per_second().into_event(),
228 }
229 }
230}
231
232/// Bandwidth estimator as defined in [Delivery Rate Estimation](https://datatracker.ietf.org/doc/draft-cheng-iccrg-delivery-rate-estimation/)
233/// and [BBR Congestion Control](https://datatracker.ietf.org/doc/draft-cardwell-iccrg-bbr-congestion-control/).
234#[derive(Clone, Debug, Default)]
235pub struct Estimator {
236 //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#2.2
237 //# The amount of data delivered MAY be tracked in units of either octets or packets.
238 /// The total amount of data in bytes delivered so far over the lifetime of the path, not including
239 /// non-congestion-controlled packets such as pure ACK packets.
240 delivered_bytes: u64,
241 /// The timestamp when delivered_bytes was last updated, or if the connection
242 /// was recently idle, the send time of the first packet sent after resuming from idle.
243 delivered_time: Option<Timestamp>,
244 /// The total amount of data in bytes declared lost so far over the lifetime of the path, not including
245 /// non-congestion-controlled packets such as pure ACK packets.
246 lost_bytes: u64,
247 /// The total amount of explicit congestion experienced packets over the lifetime of the path
248 ecn_ce_count: u64,
249 /// The send time of the packet that was most recently marked as delivered, or if the connection
250 /// was recently idle, the send time of the first packet sent after resuming from idle.
251 first_sent_time: Option<Timestamp>,
252 /// The `delivered_bytes` that marks the end of the current application-limited period, or
253 /// `None` if the connection is not currently application-limited.
254 app_limited_delivered_bytes: Option<u64>,
255 rate_sample: RateSample,
256}
257
258impl Estimator {
259 /// Gets the total amount of data in bytes delivered so far over the lifetime of the path, not including
260 /// non-congestion-controlled packets such as pure ACK packets.
261 pub fn delivered_bytes(&self) -> u64 {
262 self.delivered_bytes
263 }
264
265 /// Gets the total amount of data in bytes lost so far over the lifetime of the path
266 pub fn lost_bytes(&self) -> u64 {
267 self.lost_bytes
268 }
269
270 /// Gets the latest [RateSample]
271 pub fn rate_sample(&self) -> RateSample {
272 self.rate_sample
273 }
274
275 /// Returns true if the path is currently in an application-limited period
276 pub fn is_app_limited(&self) -> bool {
277 self.app_limited_delivered_bytes.is_some()
278 }
279
280 /// Called when a packet is transmitted
281 pub fn on_packet_sent(
282 &mut self,
283 prior_bytes_in_flight: u32,
284 sent_bytes: usize,
285 app_limited: Option<bool>,
286 now: Timestamp,
287 ) -> PacketInfo {
288 //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.2
289 //# If there are no packets in flight yet, then we can start the delivery rate interval
290 //# at the current time, since we know that any ACKs after now indicate that the network
291 //# was able to deliver those packets completely in the sampling interval between now
292 //# and the next ACK.
293
294 //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.2
295 //# Upon each packet transmission, the sender executes the following steps:
296 //#
297 //# SendPacket(Packet P):
298 //# if (SND.NXT == SND.UNA) /* no packets in flight yet? */
299 //# C.first_sent_time = C.delivered_time = Now()
300 //# P.first_sent_time = C.first_sent_time
301 //# P.delivered_time = C.delivered_time
302 //# P.delivered = C.delivered
303 //# P.is_app_limited = (C.app_limited != 0)
304 if prior_bytes_in_flight == 0 {
305 self.first_sent_time = Some(now);
306 self.delivered_time = Some(now);
307 }
308
309 let bytes_in_flight = prior_bytes_in_flight.saturating_add(sent_bytes as u32);
310
311 if app_limited.unwrap_or(true) {
312 self.on_app_limited(bytes_in_flight);
313 }
314
315 PacketInfo {
316 delivered_bytes: self.delivered_bytes,
317 delivered_time: self
318 .delivered_time
319 .expect("initialized on first sent packet"),
320 lost_bytes: self.lost_bytes,
321 ecn_ce_count: self.ecn_ce_count,
322 first_sent_time: self
323 .first_sent_time
324 .expect("initialized on first sent packet"),
325 bytes_in_flight,
326 is_app_limited: self.app_limited_delivered_bytes.is_some(),
327 }
328 }
329
330 //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.3
331 //# For each packet that was newly SACKed or ACKed, UpdateRateSample() updates the
332 //# rate sample based on a snapshot of connection delivery information from the time
333 //# at which the packet was last transmitted.
334 /// Called for each acknowledgement of one or more packets
335 pub fn on_ack<Pub: Publisher>(
336 &mut self,
337 bytes_acknowledged: usize,
338 newest_acked_time_sent: Timestamp,
339 newest_acked_packet_info: PacketInfo,
340 now: Timestamp,
341 publisher: &mut Pub,
342 ) {
343 self.delivered_bytes += bytes_acknowledged as u64;
344 self.delivered_time = Some(now);
345
346 let is_app_limited_period_over =
347 |app_limited_bytes| self.delivered_bytes > app_limited_bytes;
348 if self
349 .app_limited_delivered_bytes
350 .is_some_and(is_app_limited_period_over)
351 {
352 // Clear app-limited field if bubble is ACKed and gone
353 self.app_limited_delivered_bytes = None;
354 }
355
356 //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.3
357 //# UpdateRateSample() is invoked multiple times when a stretched ACK acknowledges
358 //# multiple data packets. In this case we use the information from the most recently
359 //# sent packet, i.e., the packet with the highest "P.delivered" value.
360 if self.rate_sample.prior_delivered_bytes == 0
361 || newest_acked_packet_info.delivered_bytes > self.rate_sample.prior_delivered_bytes
362 {
363 // Update info using the newest packet
364 self.rate_sample.on_ack(newest_acked_packet_info);
365 self.first_sent_time = Some(newest_acked_time_sent);
366
367 debug_assert!(
368 newest_acked_time_sent >= newest_acked_packet_info.first_sent_time,
369 "newest_acked_time_sent={newest_acked_time_sent}, first_sent_time={}",
370 newest_acked_packet_info.first_sent_time
371 );
372 let send_elapsed = newest_acked_time_sent - newest_acked_packet_info.first_sent_time;
373 debug_assert!(
374 now >= newest_acked_packet_info.delivered_time,
375 "now={now}, delivered_time={}",
376 newest_acked_packet_info.delivered_time
377 );
378 let ack_elapsed = now - newest_acked_packet_info.delivered_time;
379
380 //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#2.2.4
381 //# Since it is physically impossible to have data delivered faster than it is sent
382 //# in a sustained fashion, when the estimator notices that the ack_rate for a flight
383 //# is faster than the send rate for the flight, it filters out the implausible ack_rate
384 //# by capping the delivery rate sample to be no higher than the send rate.
385 self.rate_sample.interval = max(send_elapsed, ack_elapsed);
386 }
387
388 self.rate_sample.delivered_bytes =
389 self.delivered_bytes - self.rate_sample.prior_delivered_bytes;
390 // Lost bytes and ECN CE Count are updated here as well as in `on_loss` and `on_explicit_congestion`
391 // so the values are up to date even when no loss or ECN CE markings are received.
392 self.rate_sample.lost_bytes = self.lost_bytes - self.rate_sample.prior_lost_bytes;
393 self.rate_sample.ecn_ce_count = self.ecn_ce_count - self.rate_sample.prior_ecn_ce_count;
394
395 publisher.on_delivery_rate_sampled(self.rate_sample);
396 }
397
398 /// Called when packets are declared lost
399 #[inline]
400 pub fn on_loss(&mut self, lost_bytes: usize) {
401 self.lost_bytes += lost_bytes as u64;
402 self.rate_sample.lost_bytes = self.lost_bytes - self.rate_sample.prior_lost_bytes;
403
404 // Move the app-limited period earlier as the lost bytes will not be delivered
405 if let Some(ref mut app_limited_delivered_bytes) = self.app_limited_delivered_bytes {
406 *app_limited_delivered_bytes =
407 app_limited_delivered_bytes.saturating_sub(lost_bytes as u64)
408 }
409 }
410
411 /// Called when packets are discarded
412 #[inline]
413 pub fn on_packet_discarded(&mut self, bytes_sent: usize) {
414 // Move the app-limited period earlier as the discarded bytes will not be delivered
415 if let Some(ref mut app_limited_delivered_bytes) = self.app_limited_delivered_bytes {
416 *app_limited_delivered_bytes =
417 app_limited_delivered_bytes.saturating_sub(bytes_sent as u64)
418 }
419 }
420
421 /// Called each time explicit congestion is recorded
422 #[inline]
423 pub fn on_explicit_congestion(&mut self, ce_count: u64) {
424 self.ecn_ce_count += ce_count;
425 self.rate_sample.ecn_ce_count = self.ecn_ce_count - self.rate_sample.prior_ecn_ce_count;
426 }
427
428 /// Mark the path as app limited until the given `bytes_in_flight` are acknowledged
429 pub fn on_app_limited(&mut self, bytes_in_flight: u32) {
430 //= https://tools.ietf.org/id/draft-cardwell-iccrg-bbr-congestion-control-02#4.3.4.4
431 //# MarkConnectionAppLimited():
432 //# C.app_limited =
433 //# (C.delivered + packets_in_flight) ? : 1
434 self.app_limited_delivered_bytes = Some(self.delivered_bytes + bytes_in_flight as u64);
435 }
436
437 #[cfg(test)]
438 pub fn set_rate_sample_for_test(&mut self, rate_sample: RateSample) {
439 self.rate_sample = rate_sample
440 }
441
442 #[cfg(test)]
443 pub fn set_delivered_bytes_for_test(&mut self, delivered_bytes: u64) {
444 self.delivered_bytes = delivered_bytes
445 }
446}
447
448#[cfg(test)]
449mod tests;