use super::subscriber::PerSubscriber;
use std::time::Instant;
#[derive(Debug, Clone)]
pub struct TwccSample {
pub seq: u64,
pub arrival: Option<Instant>,
}
pub struct TwccFeedback {
pub samples: Vec<TwccSample>,
}
pub fn ingest_twcc(sub: &mut PerSubscriber, feedback: &TwccFeedback, now: Instant) {
for sample in &feedback.samples {
sub.loss.record(sample.arrival.is_some());
if let Some(arrival) = sample.arrival {
if let Some(send_t) = sub.send_times.get(&sample.seq).copied() {
if let (Some(prev_arr), Some(prev_send)) =
(sub.last_arrival, sub.last_send_for_received)
{
let recv_delta_us = arrival.duration_since(prev_arr).as_micros() as f64;
let send_delta_us = send_t.duration_since(prev_send).as_micros() as f64;
let gradient_us = recv_delta_us - send_delta_us;
sub.delay.update_kalman(gradient_us);
}
sub.last_arrival = Some(arrival);
sub.last_send_for_received = Some(send_t);
let one_way = arrival.duration_since(send_t);
sub.rtt = Some(one_way * 2);
}
}
}
sub.delay.apply_rate_control(now);
sub.loss.apply_rate_control(now);
}
#[cfg(test)]
mod tests {
use super::super::subscriber::PerSubscriber;
use super::*;
use std::time::{Duration, Instant};
fn make_send_times(base: Instant, seqs: &[u64], interval_ms: u64) -> PerSubscriber {
let mut sub = PerSubscriber::new();
for (i, &seq) in seqs.iter().enumerate() {
sub.send_times
.insert(seq, base + Duration::from_millis(i as u64 * interval_ms));
}
sub
}
#[test]
fn positive_gradient_feeds_kalman() {
let base = Instant::now();
let seqs = [1u64, 2, 3, 4, 5];
let mut sub = make_send_times(base, &seqs, 10);
let feedback = TwccFeedback {
samples: seqs
.iter()
.enumerate()
.map(|(i, &seq)| TwccSample {
seq,
arrival: Some(base + Duration::from_millis(15 + i as u64 * 15)),
})
.collect(),
};
let before = sub.delay.filtered_gradient_us();
ingest_twcc(&mut sub, &feedback, base + Duration::from_millis(90));
let after = sub.delay.filtered_gradient_us();
assert!(
after > before || after > 0.0,
"expected Kalman to pick up positive gradient; before={before}, after={after}"
);
}
#[test]
fn loss_recorded_for_missing_packets() {
let base = Instant::now();
let mut sub = PerSubscriber::new();
sub.send_times.insert(1u64, base);
sub.send_times
.insert(2u64, base + Duration::from_millis(10));
let feedback = TwccFeedback {
samples: vec![
TwccSample {
seq: 1,
arrival: Some(base + Duration::from_millis(15)),
},
TwccSample {
seq: 2,
arrival: None,
}, ],
};
ingest_twcc(&mut sub, &feedback, base + Duration::from_millis(30));
assert!(
sub.loss.loss_fraction() > 0.0,
"lost packet should be recorded"
);
}
#[test]
fn zero_gradient_on_uniform_spacing() {
let base = Instant::now();
let seqs = [10u64, 11, 12, 13, 14];
let mut sub = make_send_times(base, &seqs, 10);
let feedback = TwccFeedback {
samples: seqs
.iter()
.enumerate()
.map(|(i, &seq)| TwccSample {
seq,
arrival: Some(base + Duration::from_millis(20 + i as u64 * 10)),
})
.collect(),
};
ingest_twcc(&mut sub, &feedback, base + Duration::from_millis(70));
let gradient = sub.delay.filtered_gradient_us();
assert!(
gradient.abs() < 5_000.0,
"uniform spacing should give ~0 gradient, got {gradient}"
);
}
#[test]
fn loss_in_middle_does_not_corrupt_gradient() {
let base = Instant::now();
let seqs = [1u64, 2, 3, 4, 5];
let mut sub = make_send_times(base, &seqs, 10);
let feedback = TwccFeedback {
samples: vec![
TwccSample {
seq: 1,
arrival: Some(base + Duration::from_millis(5)),
},
TwccSample {
seq: 2,
arrival: Some(base + Duration::from_millis(15)),
},
TwccSample {
seq: 3,
arrival: None,
}, TwccSample {
seq: 4,
arrival: Some(base + Duration::from_millis(35)),
},
TwccSample {
seq: 5,
arrival: Some(base + Duration::from_millis(45)),
},
],
};
ingest_twcc(&mut sub, &feedback, base + Duration::from_millis(60));
let gradient = sub.delay.filtered_gradient_us();
assert!(
gradient.abs() < 5_000.0,
"lost packet should not corrupt gradient; got {gradient} us"
);
}
}