// quiche 0.24.4
//
// 🥧 Savoury implementation of the QUIC transport protocol and HTTP/3
// Documentation
// Copyright (C) 2022, Cloudflare, Inc.
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright notice,
//       this list of conditions and the following disclaimer.
//
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

//! Pacer provides the timestamp for the next packet to be sent based on the
//! current send_quantum, pacing rate and last updated time.
//!
//! It's a kind of leaky bucket algorithm (RFC9002, 7.7 Pacing), but it also
//! takes the max burst (send_quantum, in bytes) into account and provides the
//! same timestamp for same-sized packets (except the last one) to be GSO
//! friendly, assuming we send packets using multiple sendmsg(), a sendmmsg(),
//! or sendmsg() with GSO without waiting for new I/O events.
//!
//! After sending a burst of packets, the next timestamp is updated based on
//! the current pacing rate. This keeps the actual send time and the recorded
//! timestamp (Sent.time_sent) as close as possible. If GSO is not used, the
//! pacer still tries to provide a close timestamp, as long as sending in
//! bursts is implemented.

use std::time::Duration;
use std::time::Instant;

/// Burst-aware packet pacer.
///
/// Packets are grouped into bursts of up to `capacity` bytes. All packets in
/// a burst share the same `next_time`; once a burst ends (the bucket is full
/// or the packet size changes), the time the burst takes at the current
/// pacing rate is added to `next_time` when the following packet is sent.
#[derive(Debug)]
pub struct Pacer {
    /// Whether pacing is enabled.
    ///
    /// The pacer only stores and reports this flag (see `enabled()`);
    /// `send()` does not consult it.
    enabled: bool,

    /// Bucket capacity (bytes), rounded down to a multiple of
    /// `max_datagram_size`.
    capacity: usize,

    /// Bucket used (bytes) in the current burst.
    used: usize,

    /// Sending pacing rate (bytes/sec), already capped by `max_pacing_rate`.
    rate: u64,

    /// Timestamp of the last packet sent time update.
    last_update: Instant,

    /// Timestamp of the next packet to be sent.
    next_time: Instant,

    /// Current MSS.
    max_datagram_size: usize,

    /// Last packet size, or `None` at the start of a burst.
    last_packet_size: Option<usize>,

    /// Interval to be added to `next_time` when the next burst starts.
    iv: Duration,

    /// Max pacing rate (bytes/sec).
    max_pacing_rate: Option<u64>,
}

impl Pacer {
    /// Creates a pacer.
    ///
    /// `capacity` is rounded down to a multiple of `max_datagram_size`, and
    /// `rate` is capped by `max_pacing_rate` when one is given. Both
    /// timestamps start at the current time.
    pub fn new(
        enabled: bool, capacity: usize, rate: u64, max_datagram_size: usize,
        max_pacing_rate: Option<u64>,
    ) -> Self {
        // Read the clock once so both timestamps start out identical.
        let now = Instant::now();

        Pacer {
            enabled,

            capacity: Self::round_capacity(capacity, max_datagram_size),

            used: 0,

            rate: Self::cap_rate(rate, max_pacing_rate),

            last_update: now,

            next_time: now,

            max_datagram_size,

            last_packet_size: None,

            iv: Duration::ZERO,

            max_pacing_rate,
        }
    }

    /// Rounds `capacity` down to a whole number of datagrams.
    ///
    /// A zero `max_datagram_size` leaves `capacity` untouched instead of
    /// panicking with a division by zero.
    fn round_capacity(capacity: usize, max_datagram_size: usize) -> usize {
        match max_datagram_size {
            0 => capacity,
            mss => capacity / mss * mss,
        }
    }

    /// Caps `rate` by `max_pacing_rate`, if a maximum is configured.
    fn cap_rate(rate: u64, max_pacing_rate: Option<u64>) -> u64 {
        max_pacing_rate.map_or(rate, |max_rate| max_rate.min(rate))
    }

    /// Returns whether pacing is enabled.
    pub fn enabled(&self) -> bool {
        self.enabled
    }

    /// Returns the current pacing rate.
    pub fn rate(&self) -> u64 {
        self.rate
    }

    /// Returns max pacing rate.
    pub fn max_pacing_rate(&self) -> Option<u64> {
        self.max_pacing_rate
    }

    /// Updates the bucket capacity or pacing_rate.
    ///
    /// The burst state is reset only when the rounded capacity differs from
    /// the current one; a pure rate change takes effect on the next interval
    /// computation in `send()`.
    pub fn update(&mut self, capacity: usize, rate: u64, now: Instant) {
        let capacity = Self::round_capacity(capacity, self.max_datagram_size);

        if self.capacity != capacity {
            self.reset(now);
        }

        self.capacity = capacity;

        self.rate = Self::cap_rate(rate, self.max_pacing_rate);
    }

    /// Resets the pacer for the next burst.
    ///
    /// `next_time` is never moved backwards: it becomes the later of its
    /// current value and `now`.
    fn reset(&mut self, now: Instant) {
        self.used = 0;

        self.last_update = now;

        self.next_time = self.next_time.max(now);

        self.last_packet_size = None;

        self.iv = Duration::ZERO;
    }

    /// Updates the timestamp for the packet to send.
    ///
    /// `packet_size` is the size in bytes of the packet being sent at `now`.
    /// With a pacing rate of zero the pacer is simply reset.
    pub fn send(&mut self, packet_size: usize, now: Instant) {
        if self.rate == 0 {
            self.reset(now);

            return;
        }

        // The previous burst has ended: push the send time out by the
        // interval that burst accounted for.
        if !self.iv.is_zero() {
            self.next_time = self.next_time.max(now) + self.iv;

            self.iv = Duration::ZERO;
        }

        // Time a full bucket takes at the current rate.
        let interval =
            Duration::from_secs_f64(self.capacity as f64 / self.rate as f64);

        let elapsed = now.saturating_duration_since(self.last_update);

        // If too old, reset it.
        if elapsed > interval {
            self.reset(now);
        }

        // Saturate so an oversized `packet_size` cannot overflow the counter.
        self.used = self.used.saturating_add(packet_size);

        // The first packet of a burst never counts as a size change.
        let size_changed =
            matches!(self.last_packet_size, Some(last) if last != packet_size);

        self.last_packet_size = Some(packet_size);

        // The burst ends when the bucket is full or the packet size changes;
        // remember how long the bytes sent so far take at the current rate.
        if self.used >= self.capacity || size_changed {
            self.iv =
                Duration::from_secs_f64(self.used as f64 / self.rate as f64);

            self.used = 0;

            self.last_update = now;

            self.last_packet_size = None;
        }
    }

    /// Returns the timestamp for the next packet.
    pub fn next_time(&self) -> Instant {
        self.next_time
    }
}

// Unit tests for `Pacer`. All tests use synthetic `Instant` values derived
// from a single `Instant::now()` reading, so no real sleeping is involved.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    /// Fills one burst, then checks that the first packet of the next burst
    /// pushes next_time out by one full-burst interval.
    fn pacer_update() {
        let datagram_size = 1200;
        let max_burst = datagram_size * 10;
        let pacing_rate = 100_000;

        let mut p = Pacer::new(true, max_burst, pacing_rate, datagram_size, None);

        let now = Instant::now();

        // Send 6000 (half of max_burst) -> no timestamp change yet.
        p.send(6000, now);

        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));

        // Send 6000 bytes -> max_burst filled.
        p.send(6000, now);

        // The interval for the filled burst is only applied on the next
        // send(), so next_time is still unchanged here.
        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));

        // Start of a new burst.
        let now = now + Duration::from_millis(5);

        // Send 1000 bytes and next_time is updated.
        p.send(1000, now);

        // Time needed to send max_burst bytes at pacing_rate, in seconds.
        let interval = max_burst as f64 / pacing_rate as f64;

        assert_eq!(p.next_time() - now, Duration::from_secs_f64(interval));
    }

    #[test]
    /// Same as pacer_update() but adds some idle time between transfers to
    /// trigger a reset.
    fn pacer_idle() {
        let datagram_size = 1200;
        let max_burst = datagram_size * 10;
        let pacing_rate = 100_000;

        let mut p = Pacer::new(true, max_burst, pacing_rate, datagram_size, None);

        let now = Instant::now();

        // Send 6000 (half of max_burst) -> no timestamp change yet.
        p.send(6000, now);

        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));

        // Advance the clock by 200ms to reset the idle pacer (the burst
        // interval is max_burst / pacing_rate = 120ms).
        let now = now + Duration::from_millis(200);

        // Send 6000 bytes -> idle reset and a new burst is started.
        p.send(6000, now);

        // The reset moved next_time forward to the current time.
        assert_eq!(p.next_time(), now);
    }

    #[test]
    /// Checks that max_pacing_rate caps the pacing rate, both at construction
    /// and after a later update() that asks for a higher rate.
    fn pacer_set_max_pacing_rate() {
        let datagram_size = 1200;
        let max_burst = datagram_size * 10;
        let pacing_rate = 100_000;
        let max_pacing_rate = 50_000;

        // Use the max_pacing_rate.
        let mut p = Pacer::new(
            true,
            max_burst,
            pacing_rate,
            datagram_size,
            Some(max_pacing_rate),
        );

        let now = Instant::now();

        // Send 6000 (half of max_burst) -> no timestamp change yet.
        p.send(6000, now);

        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));

        // Send 6000 bytes -> max_burst filled.
        p.send(6000, now);

        assert!(now.duration_since(p.next_time()) < Duration::from_millis(1));

        // Start of a second burst.
        let now = now + Duration::from_millis(5);
        p.send(12000, now);

        let second_burst_send_time = p.next_time();

        // The interval is derived from max_pacing_rate, not pacing_rate,
        // because the requested rate exceeds the maximum.
        let interval = max_burst as f64 / max_pacing_rate as f64;

        assert_eq!(
            second_burst_send_time - now,
            Duration::from_secs_f64(interval)
        );

        // Start of third burst
        let now = now + Duration::from_millis(5);

        // Update pacer rate. 75_000 is still above max_pacing_rate, so the
        // effective rate stays at 50_000 and the interval is unchanged.
        p.update(max_burst, 75_000, now);

        p.send(12000, now);

        let third_burst_send_time = p.next_time();

        assert_eq!(
            third_burst_send_time - second_burst_send_time,
            Duration::from_secs_f64(interval)
        );
    }
}