oxpulse-sfu-kit 0.11.0

Reusable multi-client SFU kit built on top of str0m. Simulcast, fanout, per-peer event routing.
Documentation
//! Per-subscriber state combining Kalman delay, loss, native GCC, and client hint.
//!
//! Ported from `oxpulse-partner-edge/crates/sfu/src/bandwidth/subscriber.rs`.

use std::collections::HashMap;
use std::time::{Duration, Instant};

#[cfg(feature = "googcc-bwe")]
use super::googcc::GoogCcEstimator;
use super::kalman::DelayEstimator;
use super::loss::LossEstimator;

/// Initial bitrate assigned to a new subscriber (bps).
const INITIAL_BITRATE_BPS: f64 = 300_000.0;

/// Maximum age of a client-reported budget hint before it is discarded.
const CLIENT_HINT_MAX_AGE: Duration = Duration::from_secs(5);

/// Browser-reported bandwidth budget from DataChannel `{"type":"budget","bps":N}`.
#[derive(Debug, Clone, Copy)]
pub struct ClientHint {
    /// Budget ceiling in bits per second.
    pub bps: u64,
    /// Monotonic timestamp when the hint was received.
    pub received_at: Instant,
}

/// Per-subscriber BWE state: delay estimate, loss estimate, native GCC ceiling,
/// browser hint ceiling, and send-time map for TWCC gradient computation.
#[derive(Debug)]
pub struct PerSubscriber {
    /// Map from extended RTP seq number -> send Instant, used to compute
    /// inter-send deltas for the TWCC gradient.
    pub send_times: HashMap<u64, Instant>,
    /// Arrival time of the last successfully received packet for gradient delta.
    pub last_arrival: Option<Instant>,
    /// Send time corresponding to `last_arrival` (i.e. the send time of the last
    /// *received* packet). Tracked separately from `send_times` so that a lost
    /// packet does not corrupt the inter-send delta on the next received packet.
    pub last_send_for_received: Option<Instant>,
    /// Kalman-filtered delay-based rate estimator.
    pub delay: DelayEstimator,
    /// Loss-window-based rate estimator.
    pub loss: LossEstimator,
    /// Estimated round-trip time (from RTCP SR/RR, if available).
    pub rtt: Option<Duration>,
    /// Native GCC estimate from str0m EgressBitrateEstimate event (ceiling).
    pub native_estimate_bps: Option<f64>,
    /// Browser-reported budget hint (additional ceiling, expires after 5s).
    pub client_hint: Option<ClientHint>,
    /// GoogCC v2 per-subscriber estimator (trendline + AIMD).
    ///
    /// When `Some`, its estimate is included as an additional ceiling in
    /// [`Self::combined_bps`]. Enabled via the `googcc-bwe` feature.
    #[cfg(feature = "googcc-bwe")]
    pub googcc: Option<GoogCcEstimator>,
}

impl PerSubscriber {
    /// Create new subscriber state at INITIAL_BITRATE_BPS.
    pub fn new() -> Self {
        Self {
            send_times: HashMap::new(),
            last_arrival: None,
            last_send_for_received: None,
            delay: DelayEstimator::new(INITIAL_BITRATE_BPS),
            loss: LossEstimator::new(INITIAL_BITRATE_BPS),
            rtt: None,
            native_estimate_bps: None,
            client_hint: None,
            #[cfg(feature = "googcc-bwe")]
            googcc: None,
        }
    }

    /// Combined bitrate estimate: min(kalman, loss) then apply GCC and hint ceilings.
    ///
    /// Returns at least 0; the result is not further clamped to MIN_BITRATE_BPS
    /// here so callers can distinguish "no estimate yet" from a floor-constrained value.
    #[must_use]
    pub fn combined_bps(&self, now: Instant) -> f64 {
        let base = self.delay.bitrate_bps().min(self.loss.bitrate_bps());

        let after_native = match self.native_estimate_bps {
            Some(native) => base.min(native),
            None => base,
        };

        #[cfg(feature = "googcc-bwe")]
        let after_native = match &self.googcc {
            Some(gcc) => after_native.min(gcc.current_bps() as f64),
            None => after_native,
        };

        let after_hint = match self.client_hint {
            Some(h) if now.duration_since(h.received_at) < CLIENT_HINT_MAX_AGE => {
                after_native.min(h.bps as f64)
            }
            _ => after_native,
        };

        after_hint.max(0.0)
    }
}

impl Default for PerSubscriber {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Instant;

    #[test]
    fn combined_bps_takes_minimum_of_delay_and_loss() {
        let now = Instant::now();
        let mut sub = PerSubscriber::new();
        sub.delay = DelayEstimator::new(2_000_000.0);
        sub.loss = LossEstimator::new(1_000_000.0);
        let combined = sub.combined_bps(now);
        // Should take the minimum ~1_000_000
        assert!(
            combined <= 1_100_000.0,
            "expected approx 1Mbps (min of 2M and 1M), got {combined}"
        );
    }

    #[test]
    fn native_estimate_acts_as_ceiling() {
        let now = Instant::now();
        let mut sub = PerSubscriber::new();
        sub.delay = DelayEstimator::new(2_000_000.0);
        sub.loss = LossEstimator::new(2_000_000.0);
        sub.native_estimate_bps = Some(500_000.0);
        let combined = sub.combined_bps(now);
        assert!(
            combined <= 500_100.0,
            "native GCC ceiling not applied: {combined}"
        );
    }

    #[test]
    fn client_hint_acts_as_ceiling_when_fresh() {
        let now = Instant::now();
        let mut sub = PerSubscriber::new();
        sub.delay = DelayEstimator::new(2_000_000.0);
        sub.loss = LossEstimator::new(2_000_000.0);
        sub.client_hint = Some(ClientHint {
            bps: 400_000,
            received_at: now,
        });
        let combined = sub.combined_bps(now);
        assert!(
            combined <= 400_100.0,
            "client hint ceiling not applied: {combined}"
        );
    }

    #[test]
    fn stale_client_hint_is_ignored() {
        let past = Instant::now() - Duration::from_secs(10); // older than CLIENT_HINT_MAX_AGE
        let now = Instant::now();
        let mut sub = PerSubscriber::new();
        sub.delay = DelayEstimator::new(2_000_000.0);
        sub.loss = LossEstimator::new(2_000_000.0);
        sub.client_hint = Some(ClientHint {
            bps: 100,
            received_at: past,
        }); // absurdly small
        let combined = sub.combined_bps(now);
        // Hint is stale -> should not cap at 100 bps
        assert!(
            combined > 1_000.0,
            "stale client hint should be ignored, got {combined}"
        );
    }
}

#[cfg(all(test, feature = "googcc-bwe"))]
mod googcc_integration_tests {
    use super::*;
    use crate::bwe::googcc::GoogCcEstimator;

    #[test]
    fn googcc_acts_as_ceiling_when_set() {
        let now = Instant::now();
        let mut sub = PerSubscriber::new();
        // Force Kalman/loss high
        sub.delay = DelayEstimator::new(5_000_000.0);
        sub.loss = LossEstimator::new(5_000_000.0);
        // GoogCC at a lower estimate
        let mut gcc = GoogCcEstimator::new();
        gcc.force_bps_for_tests(300_000);
        sub.googcc = Some(gcc);

        let combined = sub.combined_bps(now);
        assert!(
            combined <= 300_100.0,
            "GoogCC ceiling not applied: {combined}"
        );
    }

    #[test]
    fn googcc_none_does_not_affect_combined() {
        let now = Instant::now();
        let mut sub = PerSubscriber::new();
        sub.delay = DelayEstimator::new(1_000_000.0);
        sub.loss = LossEstimator::new(1_000_000.0);
        // googcc = None (default)
        let combined = sub.combined_bps(now);
        // Should be ~1Mbps (from delay/loss), not artificially capped
        assert!(
            combined > 900_000.0,
            "missing GoogCC should not cap estimate: {combined}"
        );
    }
}