use core::time::Duration;
use tracing::{error, trace};
const RTT_EWMA_ALPHA: f64 = 1.0 / 12.0;
const RTT_DEV_EWMA_BETA: f64 = 1.0 / 6.0;
const OUTLIER_STDDEV_FACTOR: f64 = 3.0;
const MAX_RTT_RELATIVE_INCREASE_FACTOR: f64 = 3.0;
const MAX_RTT_ABSOLUTE_INCREASE_SECS: f64 = 0.5;
#[derive(Debug, Clone, Default, PartialEq)]
pub struct FinalStats {
pub rtt: Duration,
pub jitter: Duration,
}
#[derive(Debug, Default)]
pub struct RttEstimatorEwma {
smoothed_rtt: Option<Duration>,
rtt_abs_deviation: Option<Duration>,
pub final_stats: FinalStats,
samples_processed: u64,
}
impl RttEstimatorEwma {
pub fn new() -> Self {
RttEstimatorEwma {
smoothed_rtt: None,
rtt_abs_deviation: None,
final_stats: FinalStats::default(),
samples_processed: 0,
}
}
pub fn update_with_new_sample(&mut self, new_rtt_sample: Duration) {
let mut rtt_sample_secs = new_rtt_sample.as_secs_f64();
if rtt_sample_secs < 0.0 {
error!(
"Received negative RTT sample, ignoring: {:?}",
new_rtt_sample
);
return;
}
self.samples_processed += 1;
let (prev_srtt_secs, prev_rtt_abs_dev_secs) = (
self.smoothed_rtt.map(|d| d.as_secs_f64()),
self.rtt_abs_deviation.map(|d| d.as_secs_f64()),
);
if self.samples_processed > 2
&& let Some(prev_srtt_secs) = prev_srtt_secs
&& let Some(prev_rtt_abs_dev_secs) = prev_rtt_abs_dev_secs
{
let dev_based_upper_bound =
prev_srtt_secs + OUTLIER_STDDEV_FACTOR * prev_rtt_abs_dev_secs;
let relative_upper_bound = prev_srtt_secs * MAX_RTT_RELATIVE_INCREASE_FACTOR;
let absolute_increase_upper_bound = prev_srtt_secs + MAX_RTT_ABSOLUTE_INCREASE_SECS;
let mut clamp_upper_bound = dev_based_upper_bound
.min(relative_upper_bound)
.min(absolute_increase_upper_bound);
clamp_upper_bound = clamp_upper_bound.max(prev_srtt_secs * 1.2);
if rtt_sample_secs > clamp_upper_bound {
trace!(
original_sample_ms = rtt_sample_secs * 1000.0,
clamped_to_ms = clamp_upper_bound * 1000.0,
prev_srtt_ms = prev_srtt_secs * 1000.0,
prev_dev_ms = prev_rtt_abs_dev_secs * 1000.0,
"RTT sample clamped as outlier."
);
rtt_sample_secs = clamp_upper_bound;
}
}
let (current_srtt_secs, current_rtt_abs_dev_secs) =
match (self.smoothed_rtt, self.rtt_abs_deviation) {
(Some(prev_srtt_duration), Some(prev_rtt_abs_dev_duration)) => {
let prev_srtt_secs = prev_srtt_duration.as_secs_f64();
let prev_rtt_abs_dev_secs = prev_rtt_abs_dev_duration.as_secs_f64();
let rtt_error_secs = (rtt_sample_secs - prev_srtt_secs).abs();
let updated_srtt_secs =
(1.0 - RTT_EWMA_ALPHA) * prev_srtt_secs + RTT_EWMA_ALPHA * rtt_sample_secs;
let updated_rtt_abs_dev_secs = (1.0 - RTT_DEV_EWMA_BETA)
* prev_rtt_abs_dev_secs
+ RTT_DEV_EWMA_BETA * rtt_error_secs;
(updated_srtt_secs, updated_rtt_abs_dev_secs)
}
_ => {
let initial_srtt_secs = rtt_sample_secs;
let initial_rtt_abs_dev_secs = rtt_sample_secs / 2.0;
(initial_srtt_secs, initial_rtt_abs_dev_secs)
}
};
self.smoothed_rtt = Some(Duration::from_secs_f64(current_srtt_secs.max(0.0)));
self.rtt_abs_deviation = Some(Duration::from_secs_f64(current_rtt_abs_dev_secs.max(0.0)));
let final_rtt = self.smoothed_rtt.unwrap();
let rtt_deviation_for_jitter_secs = self.rtt_abs_deviation.unwrap().as_secs_f64();
let final_jitter = Duration::from_secs_f64((rtt_deviation_for_jitter_secs / 2.0).max(0.0));
self.final_stats = FinalStats {
rtt: final_rtt,
jitter: final_jitter,
};
trace!(
rtt = ?self.final_stats.rtt,
jitter = ?self.final_stats.jitter,
new_sample_ms = rtt_sample_secs * 1000.0,
"RTT stats updated!"
);
}
pub fn get_current_stats(&self) -> &FinalStats {
&self.final_stats
}
pub fn reset(&mut self) {
self.smoothed_rtt = None;
self.rtt_abs_deviation = None;
self.final_stats = FinalStats::default();
self.samples_processed = 0;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_ewma_rtt_estimator_initialization() {
let estimator = RttEstimatorEwma::new();
assert_eq!(estimator.get_current_stats().rtt, Duration::ZERO);
assert_eq!(estimator.get_current_stats().jitter, Duration::ZERO);
assert!(estimator.smoothed_rtt.is_none());
assert!(estimator.rtt_abs_deviation.is_none());
}
#[test]
fn test_ewma_first_sample() {
let mut estimator = RttEstimatorEwma::new();
let sample1 = Duration::from_millis(100);
estimator.update_with_new_sample(sample1);
let stats = estimator.get_current_stats();
assert_eq!(stats.rtt, Duration::from_millis(100)); assert_eq!(stats.jitter, Duration::from_millis(25));
assert_eq!(estimator.smoothed_rtt, Some(Duration::from_millis(100)));
assert_eq!(estimator.rtt_abs_deviation, Some(Duration::from_millis(50)));
}
#[test]
#[ignore = "Broken on main"]
fn test_ewma_multiple_samples_stable() {
let mut estimator = RttEstimatorEwma::new();
estimator.update_with_new_sample(Duration::from_millis(100)); estimator.update_with_new_sample(Duration::from_millis(100)); estimator.update_with_new_sample(Duration::from_millis(100));
let stats1 = estimator.get_current_stats().clone(); assert_eq!(stats1.rtt, Duration::from_millis(100));
assert_eq!(stats1.jitter, Duration::from_secs_f64(0.0375 / 2.0));
estimator.update_with_new_sample(Duration::from_millis(100));
let stats2 = estimator.get_current_stats();
assert_eq!(stats2.rtt, Duration::from_millis(100));
assert_eq!(stats2.jitter, Duration::from_secs_f64(0.028125 / 2.0)); }
}