use std::cmp::Ordering;
use std::fmt;
use std::time::{Duration, Instant};
use crate::Reason;
use crate::rtp_::{Bitrate, DataSize, TwccClusterId, TwccSendRecord, TwccSeq};
use crate::util::Soonest;
mod acked_bitrate_estimator;
mod alr_detector;
pub(crate) mod api;
mod delay;
mod link_capacity_estimator;
mod loss_controller;
mod macros;
mod probe;
mod smoother;
mod time;
use acked_bitrate_estimator::AckedBitrateEstimator;
use alr_detector::AlrDetector;
use delay::DelayController;
use link_capacity_estimator::LinkCapacityEstimator;
use loss_controller::{LossController, LossControllerState};
use macros::log_loss;
use smoother::EstimateSmoother;
pub(crate) use macros::{log_pacer_media_debt, log_pacer_padding_debt};
pub(crate) use probe::{BandwidthLimitedCause, ProbeEstimator};
pub(crate) use probe::{ProbeClusterState, ProbeControl};
#[cfg(feature = "_internal_test_exports")]
pub use probe::ProbeClusterConfig;
#[cfg(not(feature = "_internal_test_exports"))]
pub(crate) use probe::ProbeClusterConfig;
const INITIAL_BITRATE_WINDOW: Duration = Duration::from_millis(500);
const BITRATE_WINDOW: Duration = Duration::from_millis(150);
const STARTUP_PHASE: Duration = Duration::from_secs(2);
pub struct Bwe {
bwe: SendSideBandwidthEstimator,
desired_bitrate: Bitrate,
smoother: EstimateSmoother,
}
impl Bwe {
pub fn new(initial: Bitrate) -> Self {
let send_side_bwe = SendSideBandwidthEstimator::new(initial);
Bwe {
bwe: send_side_bwe,
desired_bitrate: Bitrate::ZERO,
smoother: EstimateSmoother::new(),
}
}
pub fn handle_timeout(&mut self, now: Instant, do_probe: bool) -> Option<ProbeClusterConfig> {
let result = self.bwe.handle_timeout(self.desired_bitrate, do_probe, now);
if let Some(estimate) = self.bwe.last_estimate() {
self.smoother.record(now, estimate);
}
result
}
pub fn start_probe(&mut self, config: ProbeClusterConfig, now: Instant) -> bool {
self.bwe.start_probe(config, now)
}
pub fn end_probe(&mut self, now: Instant, cluster_id: TwccClusterId) {
self.bwe.end_probe(now, cluster_id);
}
pub fn reset(&mut self, init_bitrate: Bitrate) {
self.bwe.reset(init_bitrate);
}
pub fn update<'t>(
&mut self,
records: impl Iterator<Item = &'t crate::rtp_::TwccSendRecord>,
now: Instant,
) {
self.bwe.update(records, now);
}
pub fn poll_estimate(&mut self) -> Option<Bitrate> {
self.smoother.poll()
}
pub fn poll_timeout(&self) -> (Option<Instant>, Reason) {
self.bwe.poll_timeout()
}
pub fn last_estimate(&self) -> Option<Bitrate> {
self.bwe.last_estimate()
}
pub fn on_media_sent(&mut self, payload_size: DataSize, is_padding: bool, now: Instant) {
if !is_padding {
self.bwe.on_media_sent(payload_size, now);
}
}
pub fn is_overusing(&self) -> bool {
self.bwe.is_overusing()
}
pub fn set_desired_bitrate(&mut self, v: Bitrate) {
self.desired_bitrate = v;
}
}
struct SendSideBandwidthEstimator {
delay_controller: DelayController,
loss_controller: LossController,
acked_bitrate_estimator: AckedBitrateEstimator,
probe_control: ProbeControl,
probe_estimator: ProbeEstimator,
started_at: Option<Instant>,
alr_detector: AlrDetector,
link_capacity_estimator: LinkCapacityEstimator,
last_updated_estimate: Option<Bitrate>,
}
impl SendSideBandwidthEstimator {
pub fn new(initial_bitrate: Bitrate) -> Self {
let mut alr_detector = AlrDetector::new();
alr_detector.set_estimated_bitrate(initial_bitrate);
let mut loss_controller = LossController::new();
loss_controller.set_bandwidth_estimate(initial_bitrate);
Self {
delay_controller: DelayController::new(initial_bitrate),
loss_controller,
acked_bitrate_estimator: AckedBitrateEstimator::new(
INITIAL_BITRATE_WINDOW,
BITRATE_WINDOW,
),
probe_control: ProbeControl::new(),
probe_estimator: ProbeEstimator::new(),
started_at: None,
alr_detector,
link_capacity_estimator: LinkCapacityEstimator::new(),
last_updated_estimate: None,
}
}
pub fn is_overusing(&self) -> bool {
self.delay_controller.is_overusing()
}
pub fn on_media_sent(&mut self, bytes: DataSize, now: Instant) {
self.alr_detector.on_bytes_sent(bytes, now);
}
pub fn update<'t>(&mut self, records: impl Iterator<Item = &'t TwccSendRecord>, now: Instant) {
let _ = self.started_at.get_or_insert(now);
let send_records: Vec<_> = records.collect();
let mut latest_probe_result = None;
for (config, bitrate) in self.probe_estimator.update(send_records.iter().copied()) {
latest_probe_result = Some(bitrate);
if config.is_alr_probe() {
self.link_capacity_estimator.update_from_probe(bitrate, now);
}
}
let mut acked_packets = vec![];
let mut max_rtt = None;
let mut count = 0;
let mut lost = 0;
for record in send_records.iter() {
count += 1;
let Ok(acked_packet) = (*record).try_into() else {
lost += 1;
continue;
};
acked_packets.push(acked_packet);
max_rtt = max_rtt.max(record.rtt());
}
acked_packets.sort_by(AckedPacket::order_by_receive_time);
for acked_packet in acked_packets.iter() {
self.acked_bitrate_estimator
.update(acked_packet.remote_recv_time, acked_packet.size);
}
let acked_bitrate = self.acked_bitrate_estimator.current_estimate();
let probe_result = latest_probe_result;
let is_probe_result = probe_result.is_some();
let maybe_estimate =
self.delay_controller
.update(&acked_packets, acked_bitrate, probe_result, now);
let Some(delay_estimate) = maybe_estimate else {
return;
};
let loss = if count == 0 {
0.0
} else {
lost as f64 / count as f64
};
log_loss!(loss);
if in_startup_phase(self.started_at, now) && loss <= 0.001 {
self.loss_controller.set_bandwidth_estimate(delay_estimate);
return;
}
if is_probe_result {
self.loss_controller.set_bandwidth_estimate(delay_estimate);
}
if let Some(acked_bitrate) = acked_bitrate {
self.loss_controller.set_acknowledged_bitrate(acked_bitrate);
}
self.loss_controller
.update_bandwidth_estimate(&send_records, delay_estimate);
let loss_result = self.loss_controller.loss_based_result();
if let Some(loss_estimate) = loss_result.bandwidth_estimate {
if loss_estimate > delay_estimate {
self.loss_controller.set_bandwidth_estimate(delay_estimate);
}
}
self.propagate_estimate();
}
pub fn poll_timeout(&self) -> (Option<Instant>, Reason) {
let delay_timeout = Some(self.delay_controller.poll_timeout());
let probe_timeout = Some(self.probe_control.poll_timeout());
let probe_estimator_timeout = Some(self.probe_estimator.poll_timeout());
(delay_timeout, Reason::BweDelayControl)
.soonest((probe_timeout, Reason::BweProbeControl))
.soonest((probe_estimator_timeout, Reason::BweProbeEstimator))
}
pub fn handle_timeout(
&mut self,
desired_bitrate: Bitrate,
do_probe: bool,
now: Instant,
) -> Option<ProbeClusterConfig> {
self.delay_controller
.handle_timeout(self.acked_bitrate_estimator.current_estimate(), now);
self.probe_control.set_desired_bitrate(desired_bitrate);
let alr_start_time = self.alr_detector.alr_start_time();
if let Some(t) = alr_start_time {
self.probe_control.set_alr_start_time(t);
} else {
self.probe_control.set_alr_stop_time(now);
}
self.loss_controller.set_alr_start_time(alr_start_time);
let link_capacity = self.link_capacity_estimator.capacity_estimate(now);
self.loss_controller
.set_link_capacity_estimate(link_capacity);
self.probe_estimator.handle_timeout(now);
self.propagate_estimate();
if !do_probe {
self.probe_estimator.clear_probes();
}
self.probe_control.enable(do_probe);
self.probe_control.handle_timeout(now)
}
fn propagate_estimate(&mut self) {
let Some(estimate) = self.last_estimate() else {
return;
};
if self.last_updated_estimate == Some(estimate) {
return;
}
let cause = self.bandwidth_limited_cause();
self.probe_control.set_estimated_bitrate(estimate, cause);
self.alr_detector.set_estimated_bitrate(estimate);
self.last_updated_estimate = Some(estimate);
}
fn bandwidth_limited_cause(&self) -> BandwidthLimitedCause {
if self.delay_controller.is_overusing() {
return BandwidthLimitedCause::DelayBasedLimitedDelayIncreased;
}
match self.loss_controller.loss_based_result().state {
LossControllerState::DelayBased => BandwidthLimitedCause::DelayBasedLimited,
LossControllerState::Increasing => BandwidthLimitedCause::LossLimitedBweIncreasing,
LossControllerState::Decreasing => BandwidthLimitedCause::LossLimitedBwe,
}
}
pub fn last_estimate(&self) -> Option<Bitrate> {
let delay_estimate = self.delay_controller.last_estimate();
let loss_result = self.loss_controller.loss_based_result();
match loss_result.state {
LossControllerState::DelayBased => {
delay_estimate
}
LossControllerState::Decreasing | LossControllerState::Increasing => {
match (delay_estimate, loss_result.bandwidth_estimate) {
(Some(de), Some(le)) => Some(de.min(le)),
(None, le @ Some(_)) => le,
(de @ Some(_), None) => de,
(None, None) => None,
}
}
}
}
pub fn start_probe(&mut self, config: ProbeClusterConfig, now: Instant) -> bool {
self.probe_estimator.probe_start(config, now)
}
pub fn end_probe(&mut self, now: Instant, cluster_id: TwccClusterId) {
self.probe_estimator.end_probe(now, cluster_id);
}
pub fn reset(&mut self, init_bitrate: Bitrate) {
*self = Self::new(init_bitrate);
}
}
#[derive(Debug, Copy, Clone)]
pub struct AckedPacket {
seq_no: TwccSeq,
size: DataSize,
local_send_time: Instant,
remote_recv_time: Instant,
local_recv_time: Instant,
}
impl AckedPacket {
fn rtt(&self) -> Duration {
self.local_recv_time - self.local_send_time
}
fn order_by_receive_time(lhs: &Self, rhs: &Self) -> Ordering {
if lhs.remote_recv_time != rhs.remote_recv_time {
lhs.remote_recv_time.cmp(&rhs.remote_recv_time)
} else if lhs.local_send_time != rhs.local_send_time {
lhs.local_send_time.cmp(&rhs.local_send_time)
} else {
lhs.seq_no.cmp(&rhs.seq_no)
}
}
}
fn in_startup_phase(started_at: Option<Instant>, now: Instant) -> bool {
started_at
.map(|s| now.duration_since(s) <= STARTUP_PHASE)
.unwrap_or(false)
}
impl TryFrom<&TwccSendRecord> for AckedPacket {
type Error = ();
fn try_from(value: &TwccSendRecord) -> Result<Self, Self::Error> {
let Some(remote_recv_time) = value.remote_recv_time() else {
return Err(());
};
let Some(local_recv_time) = value.local_recv_time() else {
return Err(());
};
Ok(Self {
seq_no: value.seq(),
size: value.size().into(),
local_send_time: value.local_send_time(),
remote_recv_time,
local_recv_time,
})
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BandwidthUsage {
Overuse,
Normal,
Underuse,
}
impl fmt::Display for BandwidthUsage {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
BandwidthUsage::Overuse => write!(f, "overuse"),
BandwidthUsage::Normal => write!(f, "normal"),
BandwidthUsage::Underuse => write!(f, "underuse"),
}
}
}