use std::time::Instant;
use super::subscriber::PerSubscriber;
/// One per-packet entry of a TWCC feedback report.
#[derive(Debug, Clone)]
pub struct TwccSample {
/// Sequence number of the reported packet; `ingest_twcc` uses it as the key
/// when looking up the packet's recorded send time.
pub seq: u64,
/// Arrival time of the packet, or `None` when no arrival was reported
/// (`ingest_twcc` records such a sample as a loss).
pub arrival: Option<Instant>,
}
/// A batch of per-packet receive reports handed to `ingest_twcc`.
///
/// Derives `Debug` and `Clone` so it matches [`TwccSample`] and can be logged
/// or duplicated like the samples it contains.
#[derive(Debug, Clone)]
pub struct TwccFeedback {
    /// Reported packets; `ingest_twcc` processes them in the order stored here.
    pub samples: Vec<TwccSample>,
}
/// Returns `later - earlier` in microseconds as a signed value.
///
/// `Instant::duration_since` clamps to zero when `later` precedes `earlier`
/// (and panicked on toolchains older than Rust 1.60), which would silently
/// drop the sign of a delta. This helper keeps the sign instead.
fn signed_delta_us(later: Instant, earlier: Instant) -> f64 {
    match later.checked_duration_since(earlier) {
        Some(forward) => forward.as_micros() as f64,
        None => -(earlier.saturating_duration_since(later).as_micros() as f64),
    }
}

/// Feeds one TWCC feedback report into a subscriber's loss and delay state.
///
/// For every sample, in the order stored in `feedback.samples`:
///
/// * `sub.loss.record` is told whether the packet has an arrival time.
/// * If the packet arrived and its sequence number has an entry in
///   `sub.send_times`, the delay gradient against the previously processed
///   received packet — `(arrival - prev_arrival) - (send - prev_send)`, in
///   microseconds — is passed to `sub.delay.update_kalman`. Both deltas are
///   signed, so reordered arrivals or an older send time yield a negative
///   delta rather than being clamped to zero.
/// * `sub.last_arrival` / `sub.last_send_for_received` are advanced to this
///   packet, and `sub.rtt` is set to twice `arrival - send` (clamped at zero).
///
/// Samples without an arrival, or whose sequence number is not present in
/// `sub.send_times`, only contribute to the loss record.
///
/// After all samples, `apply_rate_control(now)` is invoked on the delay
/// controller and then on the loss controller.
pub fn ingest_twcc(sub: &mut PerSubscriber, feedback: &TwccFeedback, now: Instant) {
    for sample in &feedback.samples {
        sub.loss.record(sample.arrival.is_some());

        let arrival = match sample.arrival {
            Some(t) => t,
            None => continue,
        };
        let send_t = match sub.send_times.get(&sample.seq).copied() {
            Some(t) => t,
            None => continue,
        };

        if let (Some(prev_arr), Some(prev_send)) = (sub.last_arrival, sub.last_send_for_received)
        {
            // Signed deltas: a packet that arrived before its predecessor must
            // lower the gradient, not be treated as a zero-length interval.
            let recv_delta_us = signed_delta_us(arrival, prev_arr);
            let send_delta_us = signed_delta_us(send_t, prev_send);
            let gradient_us = recv_delta_us - send_delta_us;
            sub.delay.update_kalman(gradient_us);
        }
        sub.last_arrival = Some(arrival);
        sub.last_send_for_received = Some(send_t);

        // NOTE(review): doubling the one-way delay presumes `arrival` and
        // `send_t` share a clock and that the path is symmetric — confirm.
        // Saturating keeps the estimate at zero (never panics) if the arrival
        // is stamped before the send time.
        let one_way = arrival.saturating_duration_since(send_t);
        sub.rtt = Some(one_way * 2);
    }
    sub.delay.apply_rate_control(now);
    sub.loss.apply_rate_control(now);
}
#[cfg(test)]
mod tests {
    use super::*;
    use super::super::subscriber::PerSubscriber;
    use std::time::{Duration, Instant};

    /// Shorthand for a millisecond `Duration`.
    fn ms(n: u64) -> Duration {
        Duration::from_millis(n)
    }

    /// Sample for a packet that arrived at `arrival`.
    fn received(seq: u64, arrival: Instant) -> TwccSample {
        TwccSample { seq, arrival: Some(arrival) }
    }

    /// Sample for a packet with no reported arrival.
    fn lost(seq: u64) -> TwccSample {
        TwccSample { seq, arrival: None }
    }

    /// Builds a fresh subscriber whose send-time table maps the i-th entry of
    /// `seqs` to `base + i * interval_ms` milliseconds.
    fn make_send_times(base: Instant, seqs: &[u64], interval_ms: u64) -> PerSubscriber {
        let mut sub = PerSubscriber::new();
        for (idx, seq) in seqs.iter().copied().enumerate() {
            let sent_at = base + ms(idx as u64 * interval_ms);
            sub.send_times.insert(seq, sent_at);
        }
        sub
    }

    /// Feedback in which every entry of `seqs` arrived, the i-th one at
    /// `base + first_ms + i * step_ms` milliseconds.
    fn evenly_spaced_feedback(
        base: Instant,
        seqs: &[u64],
        first_ms: u64,
        step_ms: u64,
    ) -> TwccFeedback {
        let samples = seqs
            .iter()
            .zip(0u64..)
            .map(|(&seq, idx)| received(seq, base + ms(first_ms + idx * step_ms)))
            .collect();
        TwccFeedback { samples }
    }

    #[test]
    fn positive_gradient_feeds_kalman() {
        // Sent every 10 ms, received every 15 ms: arrival spacing outgrows
        // send spacing by 5 ms per packet.
        let base = Instant::now();
        let seqs = [1u64, 2, 3, 4, 5];
        let mut sub = make_send_times(base, &seqs, 10);
        let feedback = evenly_spaced_feedback(base, &seqs, 15, 15);

        let before = sub.delay.filtered_gradient_us();
        ingest_twcc(&mut sub, &feedback, base + ms(90));
        let after = sub.delay.filtered_gradient_us();

        assert!(
            after > before || after > 0.0,
            "expected Kalman to pick up positive gradient; before={before}, after={after}"
        );
    }

    #[test]
    fn loss_recorded_for_missing_packets() {
        // Two packets sent 10 ms apart; only the first one is reported received.
        let base = Instant::now();
        let mut sub = make_send_times(base, &[1u64, 2], 10);
        let feedback = TwccFeedback {
            samples: vec![received(1, base + ms(15)), lost(2)],
        };

        ingest_twcc(&mut sub, &feedback, base + ms(30));

        assert!(sub.loss.loss_fraction() > 0.0, "lost packet should be recorded");
    }

    #[test]
    fn zero_gradient_on_uniform_spacing() {
        // Send spacing and arrival spacing are both 10 ms.
        let base = Instant::now();
        let seqs = [10u64, 11, 12, 13, 14];
        let mut sub = make_send_times(base, &seqs, 10);
        let feedback = evenly_spaced_feedback(base, &seqs, 20, 10);

        ingest_twcc(&mut sub, &feedback, base + ms(70));

        let gradient = sub.delay.filtered_gradient_us();
        assert!(
            gradient.abs() < 5_000.0,
            "uniform spacing should give ~0 gradient, got {gradient}"
        );
    }

    #[test]
    fn loss_in_middle_does_not_corrupt_gradient() {
        // Packet 3 is missing; packets 2 and 4 are 20 ms apart on both the
        // send side and the receive side.
        let base = Instant::now();
        let seqs = [1u64, 2, 3, 4, 5];
        let mut sub = make_send_times(base, &seqs, 10);
        let feedback = TwccFeedback {
            samples: vec![
                received(1, base + ms(5)),
                received(2, base + ms(15)),
                lost(3),
                received(4, base + ms(35)),
                received(5, base + ms(45)),
            ],
        };

        ingest_twcc(&mut sub, &feedback, base + ms(60));

        let gradient = sub.delay.filtered_gradient_us();
        assert!(
            gradient.abs() < 5_000.0,
            "lost packet should not corrupt gradient; got {gradient} us"
        );
    }
}