use std::collections::VecDeque;
use std::time::{Duration, Instant};
use super::super::macros::{log_bitrate_estimate, log_delay_variation};
use super::super::{AckedPacket, BandwidthUsage};
use super::arrival_group::ArrivalGroupAccumulator;
use super::rate_control::RateControl;
use super::trendline::TrendlineEstimator;
use crate::rtp_::Bitrate;
use crate::util::{MovingAverage, already_happened};
/// Upper bound on the number of RTT samples retained in `max_rtt_history`.
const MAX_RTT_HISTORY_WINDOW: usize = 32;
/// Delay until the next timeout after an estimate update. Also used as the cap on the retry
/// delay in `handle_timeout` and, doubled, as the cap on the hypothesis validity window.
const UPDATE_INTERVAL: Duration = Duration::from_millis(25);
/// Fallback duration used where an RTT-derived value is needed but no smoothed RTT exists yet.
/// Both use sites subsequently cap the value with `UPDATE_INTERVAL` (or twice that), so the
/// cap is what takes effect in practice.
const MAX_TWCC_GAP: Duration = Duration::from_millis(500);
/// Factor handed to `MovingAverage::new` for the RTT average.
/// NOTE(review): presumably the weight given to each new sample — confirm against
/// `MovingAverage`.
const RTT_SMOOTHING_FACTOR: f64 = 0.125;
/// Delay-based bandwidth estimation controller.
///
/// Acked packets are run through an arrival group accumulator; the delay variations it yields
/// are fed to a trendline estimator whose hypothesis, together with an observed or probed
/// bitrate, drives the rate controller that produces the estimate.
pub struct DelayController {
/// Consumes acked packets one at a time and, for some of them, yields a delay variation.
arrival_group_accumulator: ArrivalGroupAccumulator,
/// Receives delay observations and exposes the current `BandwidthUsage` hypothesis.
trendline_estimator: TrendlineEstimator,
/// Holds the estimated bitrate; updated from the hypothesis and observed bitrate, or from a
/// probe result.
rate_control: RateControl,
/// Most recent estimate read back from `rate_control`; starts out as the initial bitrate.
last_estimate: Option<Bitrate>,
/// Moving average of the RTT samples, stored in seconds.
smoothed_rtt: MovingAverage,
/// Per-update maximum RTTs, bounded by `MAX_RTT_HISTORY_WINDOW`. Only read as a fallback when
/// `smoothed_rtt` has no value.
max_rtt_history: VecDeque<Duration>,
/// Instant reported by `poll_timeout`.
next_timeout: Instant,
/// The `now` passed to the most recent `update` call.
last_twcc_report: Instant,
}
impl DelayController {
/// Creates a controller whose rate control and last estimate both start at `initial_bitrate`.
///
/// Both `next_timeout` and `last_twcc_report` start at `already_happened()`.
pub fn new(initial_bitrate: Bitrate) -> Self {
    // Sub-components are built up front, in the same order as the struct fields.
    let arrival_group_accumulator = ArrivalGroupAccumulator::default();
    // NOTE(review): `20` is presumably the trendline window size — confirm against
    // `TrendlineEstimator::new`.
    let trendline_estimator = TrendlineEstimator::new(20);
    let rate_control = RateControl::new(initial_bitrate, Bitrate::kbps(40), Bitrate::gbps(10));
    let smoothed_rtt = MovingAverage::new(RTT_SMOOTHING_FACTOR);
    let max_rtt_history = VecDeque::new();
    let next_timeout = already_happened();
    let last_twcc_report = already_happened();

    Self {
        arrival_group_accumulator,
        trendline_estimator,
        rate_control,
        last_estimate: Some(initial_bitrate),
        smoothed_rtt,
        max_rtt_history,
        next_timeout,
        last_twcc_report,
    }
}
/// Processes a batch of acked packets and returns the current bitrate estimate.
///
/// Each packet is offered to the arrival group accumulator; every delay variation it yields
/// is logged and added to the trendline estimator. The largest RTT in the batch (if any)
/// becomes a new RTT sample. The estimate is then refreshed from `probe_bitrate` when given,
/// otherwise from `acked_bitrate`; with neither, the previous estimate is returned unchanged.
///
/// `now` is recorded as the time of the latest report and is used to schedule the next
/// timeout.
pub fn update(
    &mut self,
    acked: &[AckedPacket],
    acked_bitrate: Option<Bitrate>,
    probe_bitrate: Option<Bitrate>,
    now: Instant,
) -> Option<Bitrate> {
    for packet in acked {
        let produced = self.arrival_group_accumulator.accumulate_packet(packet);
        if let Some(variation) = produced {
            log_delay_variation!(variation.arrival_delta);
            // NOTE(review): the second argument is the variation's remote receive time rather
            // than `now` — confirm this is the clock the estimator expects.
            self.trendline_estimator
                .add_delay_observation(variation, variation.last_remote_recv_time);
        }
    }

    // Only the largest RTT of the batch is used as a sample.
    let batch_max_rtt = acked.iter().map(|packet| packet.rtt()).max();
    if let Some(rtt) = batch_max_rtt {
        self.update_rtt(rtt);
    }

    let hypothesis = self.trendline_estimator.hypothesis();
    let smoothed_rtt = self.get_smoothed_rtt();
    self.update_estimate(hypothesis, acked_bitrate, probe_bitrate, smoothed_rtt, now);

    self.last_twcc_report = now;
    self.last_estimate
}
pub fn poll_timeout(&self) -> Instant {
self.next_timeout
}
pub fn handle_timeout(&mut self, acked_bitrate: Option<Bitrate>, now: Instant) {
if !self.trendline_hypothesis_valid(now) {
let next_timeout_in = self
.get_smoothed_rtt()
.unwrap_or(MAX_TWCC_GAP)
.min(UPDATE_INTERVAL);
self.next_timeout = now + next_timeout_in;
return;
}
self.update_estimate(
self.trendline_estimator.hypothesis(),
acked_bitrate,
None,
self.get_smoothed_rtt(),
now,
);
}
pub fn last_estimate(&self) -> Option<Bitrate> {
self.last_estimate
}
/// Returns `true` when the trendline estimator's current hypothesis is
/// `BandwidthUsage::Overuse`.
pub fn is_overusing(&self) -> bool {
    matches!(
        self.trendline_estimator.hypothesis(),
        BandwidthUsage::Overuse
    )
}
/// Records a new RTT sample in both the bounded history and the moving average.
///
/// The oldest history entries are evicted first so that, after the push, the history holds at
/// most `MAX_RTT_HISTORY_WINDOW` samples.
fn update_rtt(&mut self, rtt: Duration) {
    let history = &mut self.max_rtt_history;

    // Number of entries that must go to leave room for exactly one more sample.
    let to_evict = (history.len() + 1).saturating_sub(MAX_RTT_HISTORY_WINDOW);
    for _ in 0..to_evict {
        history.pop_front();
    }
    history.push_back(rtt);

    // The moving average works on seconds as `f64`.
    let rtt_secs = rtt.as_secs_f64();
    self.smoothed_rtt.update(rtt_secs);
}
/// Returns the current RTT estimate, or `None` if no RTT information is available.
///
/// The moving average is preferred. If it has no value, the arithmetic mean of the RTT history
/// is used instead, and `None` is returned when that history is empty too.
///
/// `Duration::from_secs_f64` panics on a negative or non-finite input, so this relies on the
/// moving average being a non-negative finite number of seconds.
fn get_smoothed_rtt(&self) -> Option<Duration> {
    self.smoothed_rtt
        .get()
        .map(Duration::from_secs_f64)
        .or_else(|| {
            let sample_count = self.max_rtt_history.len();
            if sample_count == 0 {
                return None;
            }
            let total: Duration = self.max_rtt_history.iter().sum();
            // The history is bounded by `MAX_RTT_HISTORY_WINDOW`, so the cast cannot truncate.
            Some(total / sample_count as u32)
        })
}
/// Feeds the rate controller and publishes its estimate, then schedules the next timeout.
///
/// A probe result takes precedence: when `probe_bitrate` is present it is applied and
/// `hypothesis`/`observed_bitrate` are ignored. Otherwise, when `observed_bitrate` is present,
/// the rate controller is updated from the hypothesis, that bitrate and `mean_max_rtt`. With
/// neither input the estimate is left untouched.
///
/// `mean_max_rtt` is forwarded to the rate controller as-is; the callers in this type pass the
/// smoothed RTT. The next timeout is always set to `now + UPDATE_INTERVAL`, whether or not an
/// estimate was produced.
fn update_estimate(
    &mut self,
    hypothesis: BandwidthUsage,
    observed_bitrate: Option<Bitrate>,
    probe_bitrate: Option<Bitrate>,
    mean_max_rtt: Option<Duration>,
    now: Instant,
) {
    let rate_control_changed = match (probe_bitrate, observed_bitrate) {
        (Some(probe_rate), _) => {
            self.rate_control.set_probe_result(probe_rate, now);
            true
        }
        (None, Some(observed)) => {
            self.rate_control
                .update(hypothesis.into(), observed, mean_max_rtt, now);
            true
        }
        (None, None) => false,
    };

    if rate_control_changed {
        let estimate = self.rate_control.estimated_bitrate();
        log_bitrate_estimate!(estimate.as_f64());
        self.last_estimate = Some(estimate);
    }

    self.next_timeout = now + UPDATE_INTERVAL;
}
/// Reports whether the trendline hypothesis is recent enough to act on at `now`.
///
/// The hypothesis is valid while the time since the last `update` does not exceed the
/// validity window: twice the smoothed RTT (or `MAX_TWCC_GAP` when no RTT is known), capped at
/// twice `UPDATE_INTERVAL`.
fn trendline_hypothesis_valid(&self, now: Instant) -> bool {
    let since_last_report = now.duration_since(self.last_twcc_report);

    let uncapped_window = match self.get_smoothed_rtt() {
        Some(rtt) => rtt * 2,
        None => MAX_TWCC_GAP,
    };
    let validity_window = uncapped_window.min(UPDATE_INTERVAL * 2);

    since_last_report <= validity_window
}
}