1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::{event, event::IntoEvent, recovery::congestion_controller::Publisher, time::Timestamp};
use core::{
    cmp::{max, Ordering},
    time::Duration,
};
use num_rational::Ratio;
use num_traits::Inv;

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
/// Bandwidth-related data tracked for each sent packet
///
/// A `PacketInfo` is a snapshot of the `Estimator` counters captured by
/// `Estimator::on_packet_sent`. The snapshot belonging to the newest acknowledged
/// packet is later passed back to `Estimator::on_ack`, which uses it as the
/// starting point of the rate sample.
pub struct PacketInfo {
    /// [Estimator::delivered_bytes] at the time this packet was sent.
    pub delivered_bytes: u64,
    /// `Estimator::delivered_time` at the time this packet was sent.
    pub delivered_time: Timestamp,
    /// `Estimator::lost_bytes` at the time this packet was sent.
    pub lost_bytes: u64,
    /// `Estimator::ecn_ce_count` at the time this packet was sent.
    pub ecn_ce_count: u64,
    /// `Estimator::first_sent_time` at the time this packet was sent.
    pub first_sent_time: Timestamp,
    /// The volume of data that was estimated to be in flight at the
    /// time of the transmission of this packet, including the bytes of
    /// this packet itself
    pub bytes_in_flight: u32,
    /// Whether the path send rate was limited by the application rather
    /// than congestion control at the time this packet was sent
    pub is_app_limited: bool,
}

/// Represents a rate at which data is transferred
///
/// While bandwidth is typically thought of as an amount of data over a fixed
/// amount of time (bytes per second, for example), in this case we internally
/// represent bandwidth as the inverse: an amount of time to send a fixed amount
/// of data (nanoseconds per kibibyte or 1024 bytes, in this case). This allows for
/// some of the math operations needed on `Bandwidth` to avoid division, while
/// reducing the likelihood of panicking due to overflow.
///
/// The maximum (non-infinite) value that can be represented is ~1 TB/second.
/// Rates above that collapse to [`Bandwidth::INFINITY`], and rates too low to be
/// represented collapse to [`Bandwidth::ZERO`].
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Bandwidth {
    /// Time in nanoseconds needed to transfer 1024 bytes. A larger value means
    /// a lower bandwidth.
    nanos_per_kibibyte: u64,
}

// 2^10 = 1024 bytes in kibibyte
const KIBIBYTE_SHIFT: u8 = 10;

impl Bandwidth {
    /// No bandwidth at all: a kibibyte takes the longest representable time
    pub const ZERO: Bandwidth = Bandwidth {
        nanos_per_kibibyte: u64::MAX,
    };

    /// Unlimited bandwidth: a kibibyte takes no time
    pub const INFINITY: Bandwidth = Bandwidth {
        nanos_per_kibibyte: 0,
    };

    /// Constructs a new `Bandwidth` with the given bytes per interval
    ///
    /// Returns [`Bandwidth::ZERO`] if either `bytes` or `interval` is zero, or if the
    /// resulting rate is too low to be represented. Rates higher than 1024 bytes per
    /// nanosecond result in [`Bandwidth::INFINITY`].
    pub const fn new(bytes: u64, interval: Duration) -> Self {
        if bytes == 0 || interval.is_zero() {
            return Bandwidth::ZERO;
        }

        // Perform the scaling in 128 bits: a `Duration` holds more nanoseconds than fit
        // in a `u64`, and shifting a `u64` left would silently drop the high bits for
        // intervals longer than 2^54 nanoseconds, producing a wildly inflated rate.
        let nanos = interval.as_nanos() << KIBIBYTE_SHIFT;
        let nanos_per_kibibyte = nanos / bytes as u128;

        if nanos_per_kibibyte > u64::MAX as u128 {
            // The rate is lower than the lowest representable rate
            Bandwidth::ZERO
        } else {
            Self {
                nanos_per_kibibyte: nanos_per_kibibyte as u64,
            }
        }
    }

    /// Represents the bandwidth as bytes per second
    ///
    /// [`Bandwidth::INFINITY`] is reported as `u64::MAX`.
    pub fn as_bytes_per_second(&self) -> u64 {
        const ONE_SECOND_IN_NANOS: u64 = Duration::from_secs(1).as_nanos() as u64;

        // `INFINITY` has a `nanos_per_kibibyte` of zero, so it must not reach the division
        if *self == Bandwidth::INFINITY {
            return u64::MAX;
        }

        (ONE_SECOND_IN_NANOS << KIBIBYTE_SHIFT) / self.nanos_per_kibibyte
    }
}

impl Default for Bandwidth {
    fn default() -> Self {
        Bandwidth::ZERO
    }
}

impl core::cmp::PartialOrd for Bandwidth {
    /// `Bandwidth` is totally ordered, so this always defers to [`Ord::cmp`]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl core::cmp::Ord for Bandwidth {
    /// Orders by transfer rate: a higher bandwidth compares as greater
    fn cmp(&self, other: &Self) -> Ordering {
        // A larger nanos_per_kibibyte means a lower bandwidth, so the operands
        // are swapped to invert the natural ordering of the stored value
        other.nanos_per_kibibyte.cmp(&self.nanos_per_kibibyte)
    }
}

/// Scales a `Bandwidth` by the given ratio
///
/// A ratio greater than one yields a higher bandwidth and a ratio less than one
/// yields a lower bandwidth. Multiplying by a zero ratio, or scaling down to a rate
/// too low to be represented, yields [`Bandwidth::ZERO`].
impl core::ops::Mul<Ratio<u64>> for Bandwidth {
    type Output = Bandwidth;

    fn mul(self, rhs: Ratio<u64>) -> Self::Output {
        // Zero bandwidth stays zero and anything scaled by zero is zero. Checking the
        // numerator up front also keeps `rhs.inv()` from panicking on a zero ratio.
        if self == Bandwidth::ZERO || *rhs.numer() == 0 {
            return Bandwidth::ZERO;
        }

        // Since `Bandwidth` is represented as time/byte and not bytes/time, we should divide
        // by the given ratio to result in a higher nanos_per_kibibyte value (lower bandwidth).
        // To avoid division, we can multiply by the inverse of the ratio instead
        let inverse = rhs.inv();

        // Multiply in 128 bits so that scaling a low bandwidth (large nanos_per_kibibyte)
        // cannot overflow; the quotient is truncated toward zero
        let nanos_per_kibibyte = u128::from(self.nanos_per_kibibyte)
            * u128::from(*inverse.numer())
            / u128::from(*inverse.denom());

        Bandwidth {
            // Saturating at `u64::MAX` is equivalent to returning `Bandwidth::ZERO`
            nanos_per_kibibyte: nanos_per_kibibyte.min(u128::from(u64::MAX)) as u64,
        }
    }
}

/// Multiplies a `Bandwidth` by a `Duration`
///
/// The result is the number of bytes a path with the given `Bandwidth` can
/// transfer over the given `Duration`, saturating at `u64::MAX`.
impl core::ops::Mul<Duration> for Bandwidth {
    type Output = u64;

    fn mul(self, rhs: Duration) -> Self::Output {
        // `INFINITY` has a `nanos_per_kibibyte` of zero, so it must not reach the division
        if self == Bandwidth::INFINITY {
            return u64::MAX;
        }

        // No bandwidth or no time means no bytes
        if self == Bandwidth::ZERO || rhs.is_zero() {
            return 0;
        }

        // Scale in 128 bits: a `Duration` holds more nanoseconds than fit in a `u64`,
        // and shifting a `u64` left would silently drop the high bits for durations
        // longer than 2^54 nanoseconds
        let interval = rhs.as_nanos() << KIBIBYTE_SHIFT;
        let bytes = interval / u128::from(self.nanos_per_kibibyte);

        bytes.min(u128::from(u64::MAX)) as u64
    }
}

/// Divides a byte count (`u64`) by a `Bandwidth`
///
/// `Bandwidth` is a rate of bytes over time, so the quotient is a `Duration`:
/// the time a path with the given `Bandwidth` would need to transmit that
/// many bytes.
impl core::ops::Div<Bandwidth> for u64 {
    type Output = Duration;

    // The rate is stored inverted (time per kibibyte), so this "division" is
    // implemented as a multiplication followed by a shift
    #[allow(clippy::suspicious_arithmetic_impl)]
    fn div(self, rhs: Bandwidth) -> Self::Output {
        // nanos/KiB * bytes, saturating instead of overflowing
        let scaled_nanos = rhs.nanos_per_kibibyte.saturating_mul(self);
        // drop the kibibyte scaling to get plain nanoseconds
        let nanos = scaled_nanos >> KIBIBYTE_SHIFT;

        Duration::from_nanos(nanos)
    }
}

#[derive(Clone, Copy, Debug, Default)]
/// A bandwidth delivery rate estimate with associated metadata
///
/// The `prior_*` fields and `bytes_in_flight` are copied from the [PacketInfo] of the
/// most recent acknowledged packet, while `delivered_bytes`, `lost_bytes` and
/// `ecn_ce_count` hold the change in the `Estimator` totals since that packet was sent.
pub struct RateSample {
    /// The length of the sampling interval
    ///
    /// `Estimator::on_ack` sets this to the longer of the send-side and ack-side elapsed time.
    pub interval: Duration,
    /// The amount of data in bytes marked as delivered over the sampling interval
    pub delivered_bytes: u64,
    /// The amount of data in bytes marked as lost over the sampling interval
    pub lost_bytes: u64,
    /// The number of packets marked as explicit congestion experienced over the sampling interval
    pub ecn_ce_count: u64,
    /// [PacketInfo::is_app_limited] from the most recent acknowledged packet
    pub is_app_limited: bool,
    /// [PacketInfo::delivered_bytes] from the most recent acknowledged packet
    pub prior_delivered_bytes: u64,
    /// [PacketInfo::bytes_in_flight] from the most recent acknowledged packet
    pub bytes_in_flight: u32,
    /// [PacketInfo::lost_bytes] from the most recent acknowledged packet
    pub prior_lost_bytes: u64,
    /// [PacketInfo::ecn_ce_count] from the most recent acknowledged packet
    pub prior_ecn_ce_count: u64,
}

impl RateSample {
    /// Updates the rate sample with the most recent acknowledged packet
    fn on_ack(&mut self, packet_info: PacketInfo) {
        debug_assert!(
            packet_info.delivered_bytes >= self.prior_delivered_bytes,
            "on_ack should only be called with the newest acknowledged packet"
        );

        self.is_app_limited = packet_info.is_app_limited;
        self.prior_delivered_bytes = packet_info.delivered_bytes;
        self.prior_lost_bytes = packet_info.lost_bytes;
        self.prior_ecn_ce_count = packet_info.ecn_ce_count;
        self.bytes_in_flight = packet_info.bytes_in_flight;
    }

    /// Gets the delivery rate of this rate sample
    pub fn delivery_rate(&self) -> Bandwidth {
        Bandwidth::new(self.delivered_bytes, self.interval)
    }
}

impl IntoEvent<event::builder::RateSample> for RateSample {
    #[inline]
    fn into_event(self) -> event::builder::RateSample {
        event::builder::RateSample {
            interval: self.interval.into_event(),
            delivered_bytes: self.delivered_bytes.into_event(),
            lost_bytes: self.lost_bytes.into_event(),
            ecn_ce_count: self.ecn_ce_count.into_event(),
            is_app_limited: self.is_app_limited.into_event(),
            prior_delivered_bytes: self.prior_delivered_bytes.into_event(),
            bytes_in_flight: self.bytes_in_flight.into_event(),
            prior_lost_bytes: self.prior_lost_bytes.into_event(),
            prior_ecn_ce_count: self.prior_ecn_ce_count.into_event(),
            delivery_rate_bytes_per_second: self.delivery_rate().as_bytes_per_second().into_event(),
        }
    }
}

/// Bandwidth estimator as defined in [Delivery Rate Estimation](https://datatracker.ietf.org/doc/draft-cheng-iccrg-delivery-rate-estimation/)
/// and [BBR Congestion Control](https://datatracker.ietf.org/doc/draft-cardwell-iccrg-bbr-congestion-control/).
///
/// The estimator is driven by `on_packet_sent`, `on_ack`, `on_loss`, `on_packet_discarded`
/// and `on_explicit_congestion`, and exposes its latest result through `rate_sample`.
#[derive(Clone, Debug, Default)]
pub struct Estimator {
    //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#2.2
    //# The amount of data delivered MAY be tracked in units of either octets or packets.
    /// The total amount of data in bytes delivered so far over the lifetime of the path, not including
    /// non-congestion-controlled packets such as pure ACK packets.
    delivered_bytes: u64,
    /// The timestamp when delivered_bytes was last updated, or if the connection
    /// was recently idle, the send time of the first packet sent after resuming from idle.
    /// `None` until the first packet has been sent.
    delivered_time: Option<Timestamp>,
    /// The total amount of data in bytes declared lost so far over the lifetime of the path, not including
    /// non-congestion-controlled packets such as pure ACK packets.
    lost_bytes: u64,
    /// The total amount of explicit congestion experienced packets over the lifetime of the path
    ecn_ce_count: u64,
    /// The send time of the packet that was most recently marked as delivered, or if the connection
    /// was recently idle, the send time of the first packet sent after resuming from idle.
    /// `None` until the first packet has been sent.
    first_sent_time: Option<Timestamp>,
    /// The `delivered_bytes` that marks the end of the current application-limited period, or
    /// `None` if the connection is not currently application-limited.
    app_limited_delivered_bytes: Option<u64>,
    /// The most recent rate sample, refreshed by `on_ack`, `on_loss` and `on_explicit_congestion`
    rate_sample: RateSample,
}

impl Estimator {
    /// Gets the total amount of data in bytes delivered so far over the lifetime of the path, not including
    /// non-congestion-controlled packets such as pure ACK packets.
    pub fn delivered_bytes(&self) -> u64 {
        self.delivered_bytes
    }

    /// Gets the total amount of data in bytes lost so far over the lifetime of the path
    pub fn lost_bytes(&self) -> u64 {
        self.lost_bytes
    }

    /// Gets the latest [RateSample]
    pub fn rate_sample(&self) -> RateSample {
        self.rate_sample
    }

    /// Returns true if the path is currently in an application-limited period
    pub fn is_app_limited(&self) -> bool {
        self.app_limited_delivered_bytes.is_some()
    }

    /// Called when a packet is transmitted
    ///
    /// Returns the [PacketInfo] snapshot that should be stored with the packet and
    /// passed back to [`Estimator::on_ack`] when the packet is acknowledged.
    ///
    /// * `prior_bytes_in_flight` - bytes in flight before this packet was sent
    /// * `sent_bytes` - size of the packet being sent
    /// * `app_limited` - whether the sender is application-limited; `None` is
    ///   treated as application-limited
    /// * `now` - the time the packet is sent
    ///
    /// # Panics
    ///
    /// Panics if the estimator has never observed a packet sent with a
    /// `prior_bytes_in_flight` of zero, as the interval start times are only
    /// initialized by such a packet.
    pub fn on_packet_sent(
        &mut self,
        prior_bytes_in_flight: u32,
        sent_bytes: usize,
        app_limited: Option<bool>,
        now: Timestamp,
    ) -> PacketInfo {
        //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.2
        //# If there are no packets in flight yet, then we can start the delivery rate interval
        //# at the current time, since we know that any ACKs after now indicate that the network
        //# was able to deliver those packets completely in the sampling interval between now
        //# and the next ACK.

        //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.2
        //# Upon each packet transmission, the sender executes the following steps:
        //#
        //# SendPacket(Packet P):
        //#   if (SND.NXT == SND.UNA)  /* no packets in flight yet? */
        //#     C.first_sent_time  = C.delivered_time = Now()
        //#   P.first_sent_time = C.first_sent_time
        //#   P.delivered_time  = C.delivered_time
        //#   P.delivered       = C.delivered
        //#   P.is_app_limited  = (C.app_limited != 0)
        if prior_bytes_in_flight == 0 {
            self.first_sent_time = Some(now);
            self.delivered_time = Some(now);
        }

        // Clamp instead of truncating so that an oversized `sent_bytes` saturates
        // the in-flight count rather than wrapping around to a small value
        let sent_bytes = sent_bytes.min(u32::MAX as usize) as u32;
        let bytes_in_flight = prior_bytes_in_flight.saturating_add(sent_bytes);

        // An unknown app-limited state is conservatively treated as app-limited
        if app_limited.unwrap_or(true) {
            self.on_app_limited(bytes_in_flight);
        }

        PacketInfo {
            delivered_bytes: self.delivered_bytes,
            delivered_time: self
                .delivered_time
                .expect("initialized on first sent packet"),
            lost_bytes: self.lost_bytes,
            ecn_ce_count: self.ecn_ce_count,
            first_sent_time: self
                .first_sent_time
                .expect("initialized on first sent packet"),
            bytes_in_flight,
            is_app_limited: self.is_app_limited(),
        }
    }

    //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.3
    //# For each packet that was newly SACKed or ACKed, UpdateRateSample() updates the
    //# rate sample based on a snapshot of connection delivery information from the time
    //# at which the packet was last transmitted.
    /// Called for each acknowledgement of one or more packets
    ///
    /// * `bytes_acknowledged` - total bytes newly acknowledged
    /// * `newest_acked_time_sent` - send time of the newest acknowledged packet
    /// * `newest_acked_packet_info` - the [PacketInfo] returned by
    ///   [`Estimator::on_packet_sent`] for the newest acknowledged packet
    /// * `now` - the time the acknowledgement was received
    /// * `publisher` - receives the updated [RateSample]
    pub fn on_ack<Pub: Publisher>(
        &mut self,
        bytes_acknowledged: usize,
        newest_acked_time_sent: Timestamp,
        newest_acked_packet_info: PacketInfo,
        now: Timestamp,
        publisher: &mut Pub,
    ) {
        self.delivered_bytes += bytes_acknowledged as u64;
        self.delivered_time = Some(now);

        if matches!(
            self.app_limited_delivered_bytes,
            Some(app_limited_bytes) if self.delivered_bytes > app_limited_bytes
        ) {
            // Clear app-limited field if bubble is ACKed and gone
            self.app_limited_delivered_bytes = None;
        }

        //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#3.3
        //# UpdateRateSample() is invoked multiple times when a stretched ACK acknowledges
        //# multiple data packets. In this case we use the information from the most recently
        //# sent packet, i.e., the packet with the highest "P.delivered" value.
        if self.rate_sample.prior_delivered_bytes == 0
            || newest_acked_packet_info.delivered_bytes > self.rate_sample.prior_delivered_bytes
        {
            // Update info using the newest packet
            self.rate_sample.on_ack(newest_acked_packet_info);
            self.first_sent_time = Some(newest_acked_time_sent);

            let send_elapsed = newest_acked_time_sent - newest_acked_packet_info.first_sent_time;
            let ack_elapsed = now - newest_acked_packet_info.delivered_time;

            //= https://tools.ietf.org/id/draft-cheng-iccrg-delivery-rate-estimation-02#2.2.4
            //# Since it is physically impossible to have data delivered faster than it is sent
            //# in a sustained fashion, when the estimator notices that the ack_rate for a flight
            //# is faster than the send rate for the flight, it filters out the implausible ack_rate
            //# by capping the delivery rate sample to be no higher than the send rate.
            self.rate_sample.interval = max(send_elapsed, ack_elapsed);
        }

        self.rate_sample.delivered_bytes =
            self.delivered_bytes - self.rate_sample.prior_delivered_bytes;
        // Lost bytes and ECN CE Count are updated here as well as in `on_loss` and `on_explicit_congestion`
        // so the values are up to date even when no loss or ECN CE markings are received.
        self.rate_sample.lost_bytes = self.lost_bytes - self.rate_sample.prior_lost_bytes;
        self.rate_sample.ecn_ce_count = self.ecn_ce_count - self.rate_sample.prior_ecn_ce_count;

        publisher.on_delivery_rate_sampled(self.rate_sample);
    }

    /// Called when packets are declared lost
    #[inline]
    pub fn on_loss(&mut self, lost_bytes: usize) {
        self.lost_bytes += lost_bytes as u64;
        self.rate_sample.lost_bytes = self.lost_bytes - self.rate_sample.prior_lost_bytes;

        // Move the app-limited period earlier as the lost bytes will not be delivered
        self.end_app_limited_period_earlier(lost_bytes);
    }

    /// Called when packets are discarded
    #[inline]
    pub fn on_packet_discarded(&mut self, bytes_sent: usize) {
        // Move the app-limited period earlier as the discarded bytes will not be delivered
        self.end_app_limited_period_earlier(bytes_sent);
    }

    /// Called each time explicit congestion is recorded
    #[inline]
    pub fn on_explicit_congestion(&mut self, ce_count: u64) {
        self.ecn_ce_count += ce_count;
        self.rate_sample.ecn_ce_count = self.ecn_ce_count - self.rate_sample.prior_ecn_ce_count;
    }

    /// Mark the path as app limited until the given `bytes_in_flight` are acknowledged
    pub fn on_app_limited(&mut self, bytes_in_flight: u32) {
        //= https://tools.ietf.org/id/draft-cardwell-iccrg-bbr-congestion-control-02#4.3.4.4
        //# MarkConnectionAppLimited():
        //#   C.app_limited =
        //#     (C.delivered + packets_in_flight) ? : 1
        self.app_limited_delivered_bytes = Some(self.delivered_bytes + u64::from(bytes_in_flight));
    }

    /// Lowers the end of the current app-limited period (if any) by `bytes`,
    /// saturating at zero
    #[inline]
    fn end_app_limited_period_earlier(&mut self, bytes: usize) {
        if let Some(app_limited_delivered_bytes) = self.app_limited_delivered_bytes.as_mut() {
            *app_limited_delivered_bytes =
                app_limited_delivered_bytes.saturating_sub(bytes as u64);
        }
    }

    /// Overrides the current rate sample (test only)
    #[cfg(test)]
    pub fn set_rate_sample_for_test(&mut self, rate_sample: RateSample) {
        self.rate_sample = rate_sample
    }

    /// Overrides the total delivered bytes (test only)
    #[cfg(test)]
    pub fn set_delivered_bytes_for_test(&mut self, delivered_bytes: u64) {
        self.delivered_bytes = delivered_bytes
    }
}

#[cfg(test)]
mod tests;