use std::collections::VecDeque;
use std::time::{Duration, Instant};
use super::{ProbeClusterConfig, ProbeKind};
use crate::rtp_::{Bitrate, TwccClusterId};
use crate::util::{already_happened, not_happening};
const MAX_WAITING_TIME_FOR_PROBING_RESULT: Duration = Duration::from_secs(1);
const BITRATE_DROP_THRESHOLD: f64 = 0.66;
const BITRATE_DROP_TIMEOUT: Duration = Duration::from_secs(5);
const PROBE_FRACTION_AFTER_DROP: f64 = 0.85;
const PROBE_UNCERTAINTY: f64 = 0.05;
const ALR_ENDED_TIMEOUT: Duration = Duration::from_secs(3);
const MIN_TIME_BETWEEN_ALR_PROBES: Duration = Duration::from_secs(5);
const MAX_PROBE_BITRATE_FACTOR: f64 = 2.0;
const MIN_TIME_BETWEEN_STAGNANT_PROBES: Duration = Duration::from_secs(15);
const ESTIMATE_CHANGE_THRESHOLD: f64 = 0.05;
const STAGNANT_PROBE_SCALE: f64 = 2.0;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BandwidthLimitedCause {
LossLimitedBweIncreasing,
LossLimitedBwe,
DelayBasedLimited,
DelayBasedLimitedDelayIncreased,
}
pub struct ProbeControl {
config: Config,
next_timeout: Instant,
enabled: bool,
desired_bitrate: Option<Bitrate>,
prev_desired: Option<Bitrate>,
last_estimate: Option<Bitrate>,
last_estimate_change: Option<Instant>,
last_cause: BandwidthLimitedCause,
prev_estimate: Option<Bitrate>,
alr_start: Option<Instant>,
alr_stop: Option<Instant>,
last_probe: Option<LastProbe>,
large_drop: Option<LargeDrop>,
last_stagnant: Option<Instant>,
next_cluster_id: TwccClusterId,
pending: VecDeque<ProbeClusterConfig>,
scheduled_exponential: Option<Instant>,
scheduled_periodic_alr: Option<Instant>,
scheduled_stagnant: Option<Instant>,
}
#[derive(Debug, Clone, Copy, PartialEq)]
struct LastProbe {
when: Instant,
kind: ProbeKind,
further: Bitrate,
was_estimate: Option<Bitrate>,
}
struct LargeDrop {
when: Instant,
bitrate_before: Bitrate,
}
impl Default for ProbeControl {
fn default() -> Self {
Self {
config: Config::default(),
enabled: false,
next_timeout: not_happening(),
desired_bitrate: None,
prev_desired: None,
last_estimate: None,
last_estimate_change: None,
last_cause: BandwidthLimitedCause::DelayBasedLimited,
prev_estimate: None,
alr_start: None,
alr_stop: None,
next_cluster_id: 0.into(),
last_probe: None,
large_drop: None,
last_stagnant: None,
pending: VecDeque::new(),
scheduled_exponential: None,
scheduled_periodic_alr: None,
scheduled_stagnant: None,
}
}
}
impl ProbeControl {
pub fn new() -> Self {
Self::default()
}
pub fn enable(&mut self, v: bool) {
if !self.enabled && v {
self.enabled = true;
self.request_immediate();
} else if self.enabled && !v {
self.enabled = false;
self.pending.clear();
self.last_estimate = None;
self.desired_bitrate = None;
self.last_estimate_change = None;
self.last_stagnant = None;
self.last_probe = None;
self.prev_estimate = None;
self.scheduled_exponential = None;
self.scheduled_periodic_alr = None;
self.scheduled_stagnant = None;
self.next_timeout = not_happening();
}
}
pub fn set_desired_bitrate(&mut self, v: Bitrate) {
if self.desired_bitrate.is_none() && v.is_zero() {
return;
}
self.desired_bitrate = Some(v);
self.request_immediate();
}
pub fn set_estimated_bitrate(&mut self, v: Bitrate, cause: BandwidthLimitedCause) {
if self.last_estimate.is_none() && v.is_zero() {
return;
}
let dominated_by_last = self.last_estimate.is_some_and(|last| {
let upper = last * (1.0 + ESTIMATE_CHANGE_THRESHOLD);
let lower = last * (1.0 - ESTIMATE_CHANGE_THRESHOLD);
v <= upper && v >= lower
});
if dominated_by_last && self.last_cause == cause {
return;
}
self.last_estimate = Some(v);
self.last_cause = cause;
self.request_immediate();
}
pub fn set_alr_start_time(&mut self, t: Instant) {
if self.alr_start.is_some() {
return;
}
self.alr_start = Some(t);
self.alr_stop = None;
self.request_immediate();
}
pub fn set_alr_stop_time(&mut self, t: Instant) {
if self.alr_start.is_none() || self.alr_stop.is_some() {
return;
}
self.alr_start = None;
self.alr_stop = Some(t);
self.request_immediate();
}
fn request_immediate(&mut self) {
self.next_timeout = already_happened();
self.scheduled_exponential = None;
self.scheduled_periodic_alr = None;
self.scheduled_stagnant = None;
}
pub fn poll_timeout(&self) -> Instant {
self.next_timeout
}
pub fn handle_timeout(&mut self, now: Instant) -> Option<ProbeClusterConfig> {
if now < self.next_timeout {
return None;
}
self.next_timeout = not_happening();
if !self.enabled {
return None;
}
let desired = self.desired_bitrate?;
let estimate = self.last_estimate?;
if let Some(config) = self.pending.pop_front() {
self.request_immediate();
return Some(config);
}
if !self.can_probe(estimate) {
return None;
}
let _ = self.maybe_initial(now, desired, estimate)
|| self.maybe_exponential(now, desired, estimate)
|| self.maybe_increase_alr(now, desired, estimate)
|| self.maybe_large_drop(now, desired, estimate)
|| self.maybe_periodic_alr(now, desired)
|| self.maybe_stagnant(now, desired, estimate);
self.update_estimate_change(now, estimate);
self.prev_estimate = Some(estimate);
self.next_timeout = self.compute_next_timeout(now);
if !self.pending.is_empty() {
self.request_immediate();
}
self.pending.pop_front()
}
fn update_estimate_change(&mut self, now: Instant, estimate: Bitrate) {
if let Some(prev) = self.prev_estimate {
if estimate != prev {
self.last_estimate_change = Some(now);
}
}
if self.last_estimate_change.is_none() {
self.last_estimate_change = Some(now);
}
}
fn maybe_initial(&mut self, now: Instant, desired: Bitrate, estimate: Bitrate) -> bool {
if self.last_probe.is_some() {
return false;
}
let p1 = estimate * self.config.first_exponential_probe_scale;
let p2 = estimate * self.config.second_exponential_probe_scale;
self.queue_probe(p1, ProbeKind::Initial, desired, now);
self.queue_probe(p2, ProbeKind::Initial, desired, now);
true
}
fn maybe_exponential(&mut self, now: Instant, desired: Bitrate, estimate: Bitrate) -> bool {
if !self.pending.is_empty() {
return false;
}
let Some(last) = self.last_probe else {
return false;
};
if estimate < last.further {
return false;
}
let is_same = Some(estimate) == last.was_estimate;
let time_since = self.time_since_last_probe(now);
if is_same && time_since < MAX_WAITING_TIME_FOR_PROBING_RESULT {
return false;
}
let scale = self.last_cause.probe_scale(&self.config);
let target = estimate * scale;
let max = desired * MAX_PROBE_BITRATE_FACTOR;
if target >= max && last.further >= max * self.config.further_probe_threshold {
return false;
}
self.queue_probe(target, ProbeKind::Exponential, desired, now);
true
}
fn maybe_increase_alr(&mut self, now: Instant, desired: Bitrate, estimate: Bitrate) -> bool {
if self.is_during_initial(now) {
return false;
}
if !self.in_alr() {
return false;
}
let prev = self.prev_desired;
self.prev_desired = Some(desired);
let Some(prev) = prev else {
return false;
};
if desired <= prev {
return false;
}
if desired <= estimate {
return false;
}
let current_bwe_limit = estimate * self.config.allocation_probe_limit_by_current_scale;
let p1 = (desired * self.config.first_allocation_probe_scale).min(current_bwe_limit);
self.queue_probe(p1, ProbeKind::IncreaseAlr, desired, now);
let p2 = desired * self.config.second_allocation_probe_scale;
if p2 <= current_bwe_limit && p2 > p1 {
self.queue_probe(p2, ProbeKind::IncreaseAlr, desired, now);
}
true
}
fn maybe_periodic_alr(&mut self, now: Instant, desired: Bitrate) -> bool {
if self.is_during_initial(now) {
return false;
}
if !self.in_alr() {
return false;
}
if self.time_since_last_probe(now) < MIN_TIME_BETWEEN_ALR_PROBES {
return false;
}
let target = desired * self.config.further_exponential_probe_scale;
self.queue_probe(target, ProbeKind::PeriodicAlr, desired, now);
true
}
fn maybe_stagnant(&mut self, now: Instant, desired: Bitrate, estimate: Bitrate) -> bool {
if self.is_during_initial(now) {
return false;
}
if self.in_alr() {
return false;
}
let Some(last_change) = self.last_estimate_change else {
return false;
};
if now.saturating_duration_since(last_change) < MIN_TIME_BETWEEN_STAGNANT_PROBES {
return false;
}
if desired <= estimate {
return false;
}
if let Some(last_probe) = self.last_stagnant {
if now.saturating_duration_since(last_probe) < MIN_TIME_BETWEEN_STAGNANT_PROBES {
return false;
}
}
let probe_rate = estimate * STAGNANT_PROBE_SCALE;
self.queue_probe(probe_rate, ProbeKind::Stagnant, desired, now);
self.last_stagnant = Some(now);
true
}
fn maybe_large_drop(&mut self, now: Instant, desired: Bitrate, estimate: Bitrate) -> bool {
if self.is_during_initial(now) {
return false;
}
if self.large_drop.is_none() {
if let Some(prev) = self.prev_estimate {
if estimate < prev * BITRATE_DROP_THRESHOLD {
self.large_drop = Some(LargeDrop {
when: now,
bitrate_before: prev,
});
}
}
}
let Some(drop) = &self.large_drop else {
return false;
};
if now.saturating_duration_since(drop.when) > BITRATE_DROP_TIMEOUT {
self.large_drop = None;
return false;
}
if !self.in_alr() && !self.alr_ended_recently(now) {
return false;
}
if self.time_since_last_probe(now) < MIN_TIME_BETWEEN_ALR_PROBES {
return false;
}
let target = drop.bitrate_before * PROBE_FRACTION_AFTER_DROP;
self.queue_probe(target, ProbeKind::LargeDrop, desired, now);
self.large_drop = None;
true
}
fn queue_probe(&mut self, bitrate: Bitrate, kind: ProbeKind, desired: Bitrate, now: Instant) {
let max = desired * MAX_PROBE_BITRATE_FACTOR;
let bitrate = bitrate.min(max);
if bitrate < Bitrate::kbps(5) {
return;
}
let cluster_id = self.next_cluster_id.inc();
let config = ProbeClusterConfig::new(cluster_id, bitrate, kind)
.with_min_packet_count(self.config.min_probe_packets_sent)
.with_duration(self.config.min_probe_duration)
.with_min_probe_delta(self.config.min_probe_delta);
let probe_further = bitrate * self.config.further_probe_threshold;
self.pending.push_back(config);
self.last_probe = Some(LastProbe {
when: now,
kind,
further: probe_further,
was_estimate: self.last_estimate,
});
}
fn compute_next_timeout(&mut self, now: Instant) -> Instant {
if let Some(last) = &self.last_probe {
if matches!(last.kind, ProbeKind::Initial | ProbeKind::Exponential) {
if self.scheduled_exponential.is_none() {
self.scheduled_exponential = Some(now + MAX_WAITING_TIME_FOR_PROBING_RESULT);
}
return self.scheduled_exponential.unwrap();
}
}
if self.in_alr() {
if self.scheduled_periodic_alr.is_none() {
self.scheduled_periodic_alr = Some(now + MIN_TIME_BETWEEN_ALR_PROBES);
}
return self.scheduled_periodic_alr.unwrap();
}
if !self.in_alr() {
if self.scheduled_stagnant.is_none() {
self.scheduled_stagnant = Some(now + MIN_TIME_BETWEEN_STAGNANT_PROBES);
}
return self.scheduled_stagnant.unwrap();
}
not_happening()
}
fn can_probe(&self, estimate: Bitrate) -> bool {
if estimate == Bitrate::INFINITY {
return false;
}
matches!(
self.last_cause,
BandwidthLimitedCause::LossLimitedBweIncreasing
| BandwidthLimitedCause::DelayBasedLimited
)
}
fn in_alr(&self) -> bool {
self.alr_start.is_some() && self.alr_stop.is_none()
}
fn alr_ended_recently(&self, now: Instant) -> bool {
self.alr_stop
.map(|stop| now.saturating_duration_since(stop) < ALR_ENDED_TIMEOUT)
.unwrap_or(false)
}
fn is_during_initial(&self, now: Instant) -> bool {
let is_initial = matches!(
self.last_probe.map(|p| p.kind),
Some(ProbeKind::Initial) | Some(ProbeKind::Exponential)
);
is_initial && self.time_since_last_probe(now) <= MAX_WAITING_TIME_FOR_PROBING_RESULT
}
fn last_when(&self) -> Option<Instant> {
self.last_probe.map(|p| p.when)
}
fn time_since_last_probe(&self, now: Instant) -> Duration {
self.last_when()
.map(|t| now.saturating_duration_since(t))
.unwrap_or(Duration::MAX)
}
}
#[derive(Debug, Clone, Copy)]
struct Config {
first_exponential_probe_scale: f64, second_exponential_probe_scale: f64, further_exponential_probe_scale: f64, further_probe_threshold: f64,
first_allocation_probe_scale: f64, second_allocation_probe_scale: f64, allocation_probe_limit_by_current_scale: f64,
min_probe_packets_sent: usize, min_probe_duration: Duration, min_probe_delta: Duration,
loss_limited_probe_scale: f64, }
impl Default for Config {
fn default() -> Self {
Self {
first_exponential_probe_scale: 3.0,
second_exponential_probe_scale: 6.0,
further_exponential_probe_scale: 2.0,
further_probe_threshold: 0.7,
first_allocation_probe_scale: 1.0,
second_allocation_probe_scale: 2.0,
allocation_probe_limit_by_current_scale: 2.0,
min_probe_packets_sent: 5,
min_probe_duration: Duration::from_millis(15),
min_probe_delta: Duration::from_millis(2),
loss_limited_probe_scale: 1.5,
}
}
}
impl BandwidthLimitedCause {
fn probe_scale(&self, config: &Config) -> f64 {
match self {
BandwidthLimitedCause::LossLimitedBweIncreasing => {
config.loss_limited_probe_scale * (1.0 + PROBE_UNCERTAINTY)
}
_ => config.further_exponential_probe_scale,
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn initial_exponential_probes_are_queued_and_emitted_one_per_tick() {
let mut pc = ProbeControl::new();
pc.enable(true);
let now = Instant::now();
pc.set_desired_bitrate(Bitrate::mbps(50));
pc.set_estimated_bitrate(Bitrate::kbps(300), BandwidthLimitedCause::DelayBasedLimited);
let p1 = pc.handle_timeout(now).unwrap();
assert_eq!(pc.poll_timeout(), already_happened());
let p2 = pc.handle_timeout(now).unwrap();
assert_eq!(p1.target_bitrate(), Bitrate::kbps(900));
assert_eq!(p2.target_bitrate(), Bitrate::kbps(1800));
assert_eq!(p1.min_packet_count(), 5);
assert_eq!(p1.min_probe_delta(), Duration::from_millis(2));
assert!(!p1.is_alr_probe());
assert!(pc.handle_timeout(now).is_none());
}
#[test]
fn further_probe_is_triggered_when_probe_result_is_high_enough() {
let mut pc = ProbeControl::new();
pc.enable(true);
let now = Instant::now();
pc.enable(true);
pc.set_desired_bitrate(Bitrate::mbps(50));
pc.set_estimated_bitrate(Bitrate::mbps(1), BandwidthLimitedCause::DelayBasedLimited);
let _ = pc.handle_timeout(now).unwrap();
let _ = pc.handle_timeout(now).unwrap();
pc.set_estimated_bitrate(Bitrate::mbps(5), BandwidthLimitedCause::DelayBasedLimited);
let p = pc.handle_timeout(now + Duration::from_millis(10)).unwrap();
assert_eq!(p.target_bitrate(), Bitrate::mbps(10));
}
#[test]
fn allocation_probe_is_triggered_in_alr_when_allocation_increases() {
let mut pc = ProbeControl::new();
pc.enable(true);
let now = Instant::now();
pc.set_desired_bitrate(Bitrate::mbps(1));
pc.set_estimated_bitrate(Bitrate::mbps(1), BandwidthLimitedCause::DelayBasedLimited);
let _ = pc.handle_timeout(now).unwrap();
let _ = pc.handle_timeout(now).unwrap();
assert!(pc.handle_timeout(now + Duration::from_secs(2)).is_none());
pc.set_alr_start_time(now + Duration::from_secs(2));
assert!(pc.handle_timeout(now + Duration::from_secs(2)).is_none());
pc.set_desired_bitrate(Bitrate::mbps(4));
let p = pc.handle_timeout(now + Duration::from_secs(2)).unwrap();
assert_eq!(p.target_bitrate(), Bitrate::mbps(2));
}
#[test]
fn handles_bitrate_infinity_without_panic() {
let mut pc = ProbeControl::new();
pc.enable(true);
let now = Instant::now();
pc.set_desired_bitrate(Bitrate::mbps(50));
pc.set_estimated_bitrate(Bitrate::INFINITY, BandwidthLimitedCause::DelayBasedLimited);
assert!(pc.handle_timeout(now).is_none());
}
#[test]
fn handles_clock_skew_gracefully() {
let mut pc = ProbeControl::new();
pc.enable(true);
let now = Instant::now();
pc.set_desired_bitrate(Bitrate::mbps(50));
pc.set_estimated_bitrate(Bitrate::kbps(300), BandwidthLimitedCause::DelayBasedLimited);
let _ = pc.handle_timeout(now);
let _ = pc.handle_timeout(now);
let earlier = now - Duration::from_secs(5);
let _ = pc.handle_timeout(earlier);
let _ = pc.handle_timeout(now + Duration::from_secs(1));
}
#[test]
fn handles_max_bitrate_zero() {
let mut pc = ProbeControl::new();
pc.enable(true);
let now = Instant::now();
pc.set_desired_bitrate(Bitrate::ZERO);
pc.set_estimated_bitrate(Bitrate::kbps(300), BandwidthLimitedCause::DelayBasedLimited);
let p1 = pc.handle_timeout(now);
assert!(p1.is_none(), "Should not create probes with zero desired");
}
#[test]
fn allocation_probe_fires_when_desired_increases_in_alr() {
let mut pc = ProbeControl::new();
pc.enable(true);
let now = Instant::now();
pc.set_desired_bitrate(Bitrate::kbps(500));
pc.set_estimated_bitrate(Bitrate::kbps(500), BandwidthLimitedCause::DelayBasedLimited);
let _ = pc.handle_timeout(now);
let _ = pc.handle_timeout(now);
assert!(pc.handle_timeout(now + Duration::from_secs(2)).is_none());
pc.set_alr_start_time(now + Duration::from_secs(3));
assert!(pc.handle_timeout(now + Duration::from_secs(3)).is_none());
pc.set_desired_bitrate(Bitrate::mbps(4));
let probe = pc.handle_timeout(now + Duration::from_secs(3));
assert!(
probe.is_some(),
"Allocation probe should trigger when desired increases in ALR"
);
}
#[test]
fn large_drop_probing_after_alr_ended() {
let mut pc = ProbeControl::new();
pc.enable(true);
let now = Instant::now();
pc.set_desired_bitrate(Bitrate::mbps(5));
pc.set_estimated_bitrate(Bitrate::mbps(5), BandwidthLimitedCause::DelayBasedLimited);
let _ = pc.handle_timeout(now);
let _ = pc.handle_timeout(now);
assert!(pc.handle_timeout(now + Duration::from_secs(2)).is_none());
pc.set_alr_start_time(now + Duration::from_secs(2));
pc.set_alr_stop_time(now + Duration::from_secs(3));
pc.set_estimated_bitrate(Bitrate::mbps(3), BandwidthLimitedCause::DelayBasedLimited);
let later = now + Duration::from_secs(5);
let p = pc.handle_timeout(later);
assert!(p.is_some(), "Large-drop recovery should schedule probe");
if let Some(probe) = p {
assert!(probe.target_bitrate() >= Bitrate::mbps(4));
assert!(probe.target_bitrate() <= Bitrate::mbps(5));
}
}
#[test]
fn allocation_probe_requires_desired_increase_in_alr() {
let mut pc = ProbeControl::new();
pc.enable(true);
let now = Instant::now();
pc.set_desired_bitrate(Bitrate::mbps(5));
pc.set_estimated_bitrate(Bitrate::mbps(1), BandwidthLimitedCause::DelayBasedLimited);
let _ = pc.handle_timeout(now);
let _ = pc.handle_timeout(now);
assert!(pc.handle_timeout(now + Duration::from_secs(2)).is_none());
pc.set_alr_start_time(now + Duration::from_secs(2));
let probe = pc.handle_timeout(now + Duration::from_secs(2));
assert!(
probe.is_none(),
"Should NOT trigger allocation probe on ALR entry alone"
);
pc.set_desired_bitrate(Bitrate::mbps(10));
let probe = pc.handle_timeout(now + Duration::from_secs(2));
assert!(
probe.is_some(),
"Should trigger allocation probe when desired increases in ALR"
);
}
#[test]
fn periodic_alr_probing() {
let mut pc = ProbeControl::new();
pc.enable(true);
let now = Instant::now();
pc.set_desired_bitrate(Bitrate::mbps(5));
pc.set_estimated_bitrate(Bitrate::mbps(1), BandwidthLimitedCause::DelayBasedLimited);
let _ = pc.handle_timeout(now);
let _ = pc.handle_timeout(now);
assert!(pc.handle_timeout(now + Duration::from_secs(2)).is_none());
pc.set_alr_start_time(now + Duration::from_secs(2));
assert!(pc.handle_timeout(now + Duration::from_secs(2)).is_none());
let probe = pc.handle_timeout(now + Duration::from_secs(7));
assert!(
probe.is_some(),
"Should trigger periodic ALR probe after 5 seconds in ALR"
);
assert!(probe.unwrap().is_alr_probe());
}
#[test]
fn periodic_alr_probing_continues_even_when_estimate_reaches_max() {
let mut pc = ProbeControl::new();
pc.enable(true);
let now = Instant::now();
pc.set_desired_bitrate(Bitrate::mbps(2));
pc.set_estimated_bitrate(Bitrate::mbps(1), BandwidthLimitedCause::DelayBasedLimited);
let _ = pc.handle_timeout(now);
let _ = pc.handle_timeout(now);
assert!(pc.handle_timeout(now + Duration::from_secs(2)).is_none());
pc.set_alr_start_time(now + Duration::from_secs(2));
assert!(pc.handle_timeout(now + Duration::from_secs(2)).is_none());
pc.set_estimated_bitrate(Bitrate::mbps(2), BandwidthLimitedCause::DelayBasedLimited);
let probe = pc.handle_timeout(now + Duration::from_secs(7));
assert!(
probe.is_some(),
"Should continue periodic probing in ALR even when estimate >= max_bitrate"
);
}
}