use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// A point in time or a time span counted in microseconds.
///
/// [`now`] produces values of this type as microseconds since the Unix epoch,
/// and [`to_duration`] / [`from_duration`] convert to and from [`Duration`].
pub type Timestamp = u64;
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned
/// instead of an error.
pub fn now() -> Timestamp {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as Timestamp,
        // Clock is set before 1970: fall back to the epoch itself.
        Err(_) => 0,
    }
}
/// Converts a microsecond count into a [`Duration`].
pub fn to_duration(micros: Timestamp) -> Duration {
    const MICROS_PER_SEC: u64 = 1_000_000;
    const NANOS_PER_MICRO: u32 = 1_000;

    let whole_secs = micros / MICROS_PER_SEC;
    // The remainder is below 1_000_000, so scaling to nanoseconds fits in u32.
    let leftover_micros = (micros % MICROS_PER_SEC) as u32;
    Duration::new(whole_secs, leftover_micros * NANOS_PER_MICRO)
}
pub fn from_duration(duration: Duration) -> Timestamp {
duration.as_micros() as Timestamp
}
/// Running estimate of the offset between the local clock and a remote
/// (server) clock, built from four-timestamp sync exchanges fed to
/// `process_sync`.
///
/// All time values are in the same unit as the timestamps passed to
/// `process_sync` (microseconds when they come from [`now`]).
#[derive(Debug, Clone)]
pub struct ClockSync {
/// Smoothed offset that is added to a local time to obtain server time.
offset: i64,
/// Smoothed round-trip time.
rtt: u64,
/// Standard deviation of the raw round-trip times held in `rtt_history`.
jitter: u64,
/// Number of exchanges processed so far; `0` means "never synced".
samples: u32,
/// When the last exchange was processed (initially the construction time).
last_sync: Instant,
/// Most recent raw round-trip times, oldest first (at most 10 entries).
rtt_history: Vec<u64>,
}
impl Default for ClockSync {
fn default() -> Self {
Self::new()
}
}
impl ClockSync {
    /// Number of raw round-trip samples kept for the jitter estimate.
    const RTT_HISTORY_LEN: usize = 10;
    /// Weight of the newest sample in the exponential moving averages.
    const SMOOTHING: f64 = 0.3;

    /// Creates an unsynchronised clock: zero offset, no samples.
    pub fn new() -> Self {
        Self {
            offset: 0,
            rtt: 0,
            jitter: 0,
            samples: 0,
            last_sync: Instant::now(),
            rtt_history: Vec::with_capacity(Self::RTT_HISTORY_LEN),
        }
    }

    /// Feeds one four-timestamp exchange into the estimator.
    ///
    /// The formulas are those of an NTP-style exchange, so the timestamps are
    /// expected to be: `t1` local send, `t2` remote receive, `t3` remote send,
    /// `t4` local receive.
    ///
    /// * round trip = `(t4 - t1) - (t3 - t2)`
    /// * offset     = `((t2 - t1) + (t3 - t4)) / 2`
    ///
    /// The first sample is taken as-is; later samples are blended in with an
    /// exponential moving average. Jitter is the standard deviation of the
    /// last few raw round-trip times.
    ///
    /// Inconsistent timestamps (for example `t4 < t1` after a local clock
    /// step) never panic: the round trip saturates at zero and the offset is
    /// computed in a wider integer type.
    pub fn process_sync(&mut self, t1: u64, t2: u64, t3: u64, t4: u64) {
        // Saturate each leg so skewed or out-of-order stamps cannot underflow.
        let total_elapsed = t4.saturating_sub(t1);
        let remote_hold = t3.saturating_sub(t2);
        let rtt = total_elapsed.saturating_sub(remote_hold);

        // Widen to i128 so neither the differences nor their sum can overflow.
        let doubled_offset =
            (i128::from(t2) - i128::from(t1)) + (i128::from(t3) - i128::from(t4));
        let offset =
            (doubled_offset / 2).clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64;

        // Keep only the most recent RTT_HISTORY_LEN raw samples.
        if self.rtt_history.len() >= Self::RTT_HISTORY_LEN {
            self.rtt_history.remove(0);
        }
        self.rtt_history.push(rtt);

        if self.rtt_history.len() >= 2 {
            self.jitter = Self::std_deviation(&self.rtt_history);
        }

        if self.samples == 0 {
            self.offset = offset;
            self.rtt = rtt;
        } else {
            let alpha = Self::SMOOTHING;
            self.offset = ((1.0 - alpha) * self.offset as f64 + alpha * offset as f64) as i64;
            self.rtt = ((1.0 - alpha) * self.rtt as f64 + alpha * rtt as f64) as u64;
        }

        // Saturate so the counter can never wrap back to the "never synced" state.
        self.samples = self.samples.saturating_add(1);
        self.last_sync = Instant::now();
    }

    /// Population standard deviation of `samples`, truncated to an integer.
    fn std_deviation(samples: &[u64]) -> u64 {
        if samples.is_empty() {
            return 0;
        }
        let count = samples.len() as u128;
        let total: u128 = samples.iter().map(|&s| u128::from(s)).sum();
        // The mean of u64 values always fits back into a u64.
        let mean = (total / count) as u64;
        let squared_sum = samples.iter().fold(0u128, |acc, &s| {
            let deviation = u128::from(s.max(mean) - s.min(mean));
            acc.saturating_add(deviation * deviation)
        });
        ((squared_sum / count) as f64).sqrt() as u64
    }

    /// Shifts `ts` by `delta` (or by `-delta` when `forward` is false),
    /// saturating at the bounds of `Timestamp` instead of wrapping.
    fn apply_offset(ts: Timestamp, delta: i64, forward: bool) -> Timestamp {
        let magnitude = delta.unsigned_abs();
        if (delta >= 0) == forward {
            ts.saturating_add(magnitude)
        } else {
            ts.saturating_sub(magnitude)
        }
    }

    /// Current time on the remote clock: `now()` shifted by the offset.
    pub fn server_time(&self) -> Timestamp {
        self.to_server_time(now())
    }

    /// Converts a local timestamp to the remote clock (saturating).
    pub fn to_server_time(&self, local: Timestamp) -> Timestamp {
        Self::apply_offset(local, self.offset, true)
    }

    /// Converts a remote timestamp to the local clock (saturating).
    pub fn to_local_time(&self, server: Timestamp) -> Timestamp {
        Self::apply_offset(server, self.offset, false)
    }

    /// Smoothed offset that is added to local time to obtain server time.
    pub fn offset(&self) -> i64 {
        self.offset
    }

    /// Smoothed round-trip time.
    pub fn rtt(&self) -> u64 {
        self.rtt
    }

    /// Standard deviation of the recent raw round-trip times.
    pub fn jitter(&self) -> u64 {
        self.jitter
    }

    /// Returns `true` when no exchange has been processed yet, or when at
    /// least `interval_secs` whole seconds have passed since the last one.
    pub fn needs_sync(&self, interval_secs: u64) -> bool {
        self.samples == 0 || self.last_sync.elapsed().as_secs() >= interval_secs
    }

    /// Heuristic sync quality in `0.0..=1.0`.
    ///
    /// Blends three scores: round-trip time (weight 0.4, worst at 10000 or
    /// more), jitter (weight 0.4, worst at 1000 or more) and sample count
    /// (weight 0.2, best from 10 samples on). Returns `0.0` before the first
    /// sample.
    pub fn quality(&self) -> f64 {
        if self.samples == 0 {
            return 0.0;
        }
        let rtt_score = (10000.0 - self.rtt.min(10000) as f64) / 10000.0;
        let jitter_score = (1000.0 - self.jitter.min(1000) as f64) / 1000.0;
        let sample_score = (self.samples.min(10) as f64) / 10.0;
        (rtt_score * 0.4 + jitter_score * 0.4 + sample_score * 0.2).clamp(0.0, 1.0)
    }
}
/// Anchors a session to both a monotonic instant and a wall-clock timestamp,
/// so session-relative times can be converted to and from Unix time.
#[derive(Debug, Clone)]
pub struct SessionTime {
/// Monotonic instant captured when the session was created.
start: Instant,
/// Wall-clock time (from [`now`]) captured when the session was created.
start_unix: Timestamp,
}
impl Default for SessionTime {
fn default() -> Self {
Self::new()
}
}
impl SessionTime {
pub fn new() -> Self {
Self {
start: Instant::now(),
start_unix: now(),
}
}
pub fn elapsed(&self) -> Timestamp {
self.start.elapsed().as_micros() as Timestamp
}
pub fn start_time(&self) -> Timestamp {
self.start_unix
}
pub fn to_unix(&self, session_time: Timestamp) -> Timestamp {
self.start_unix + session_time
}
pub fn from_unix(&self, unix_time: Timestamp) -> Timestamp {
unix_time.saturating_sub(self.start_unix)
}
}
/// Bounded buffer of timestamped values, kept sorted by timestamp so they can
/// be released in order once their playback time is reached.
#[derive(Debug)]
pub struct JitterBuffer<T> {
/// Buffered `(timestamp, value)` pairs in ascending timestamp order.
buffer: Vec<(Timestamp, T)>,
/// Maximum number of entries held at once.
capacity: usize,
/// Retention window in microseconds; `push` discards entries whose
/// timestamp is not newer than `now() - window_us`.
window_us: u64,
}
// No `Clone` bound: values are only ever moved in and out of the buffer.
impl<T> JitterBuffer<T> {
    /// Creates an empty buffer holding at most `capacity` entries and
    /// retaining entries for `window_ms` milliseconds.
    ///
    /// The window is stored in microseconds; the conversion saturates rather
    /// than overflowing for very large `window_ms`.
    pub fn new(capacity: usize, window_ms: u64) -> Self {
        Self {
            buffer: Vec::with_capacity(capacity),
            capacity,
            window_us: window_ms.saturating_mul(1000),
        }
    }

    /// Inserts `value` at its sorted position by `timestamp`.
    ///
    /// Before inserting, entries whose timestamp is not newer than
    /// `now() - window` are discarded. If the buffer is still full, the
    /// oldest entry is evicted to make room, unless the new value would itself
    /// be the oldest, in which case it is dropped. A value whose timestamp
    /// equals an existing entry's is placed before that entry.
    pub fn push(&mut self, timestamp: Timestamp, value: T) {
        let cutoff = now().saturating_sub(self.window_us);
        self.buffer.retain(|(ts, _)| *ts > cutoff);

        // The buffer is sorted, so a binary search gives the insertion point.
        let pos = self.buffer.partition_point(|(ts, _)| *ts < timestamp);
        if self.buffer.len() < self.capacity {
            self.buffer.insert(pos, (timestamp, value));
        } else if pos > 0 {
            // Evicting the head shifts every later index down by one.
            self.buffer.remove(0);
            self.buffer.insert(pos - 1, (timestamp, value));
        }
    }

    /// Removes and returns the oldest value if its timestamp is at or before
    /// `playback_time`; otherwise returns `None`.
    pub fn pop(&mut self, playback_time: Timestamp) -> Option<T> {
        let head_ready = self
            .buffer
            .first()
            .map_or(false, |(ts, _)| *ts <= playback_time);
        if head_ready {
            Some(self.buffer.remove(0).1)
        } else {
            None
        }
    }

    /// Removes and returns, oldest first, every value whose timestamp is at
    /// or before `playback_time`.
    pub fn drain_ready(&mut self, playback_time: Timestamp) -> Vec<T> {
        // Sorted order means all ready entries form a prefix; drain it in one
        // pass instead of repeatedly shifting the vector.
        let ready = self.buffer.partition_point(|(ts, _)| *ts <= playback_time);
        self.buffer.drain(..ready).map(|(_, value)| value).collect()
    }

    /// Number of buffered entries.
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// Returns `true` when nothing is buffered.
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Time span in microseconds between the oldest and newest buffered
    /// entries; `0` when fewer than two entries are buffered.
    pub fn depth_us(&self) -> u64 {
        match (self.buffer.first(), self.buffer.last()) {
            // With a single entry first and last coincide, giving 0.
            (Some(oldest), Some(newest)) => newest.0.saturating_sub(oldest.0),
            _ => 0,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// One sync exchange must register a sample and yield a positive RTT.
    #[test]
    fn test_clock_sync() {
        let (t1, t2, t3, t4) = (1000000u64, 1000050u64, 1000051u64, 1000100u64);

        let mut sync = ClockSync::new();
        sync.process_sync(t1, t2, t3, t4);

        assert!(sync.samples > 0);
        assert!(sync.rtt > 0);
    }

    /// Elapsed session time must cover at least the time slept, in microseconds.
    #[test]
    fn test_session_time() {
        let session = SessionTime::new();
        std::thread::sleep(std::time::Duration::from_millis(10));

        let elapsed = session.elapsed();
        assert!(elapsed >= 10000);
    }

    /// Values pushed out of order come back oldest-first.
    #[test]
    fn test_jitter_buffer() {
        let mut buffer: JitterBuffer<f64> = JitterBuffer::new(10, 100);
        let base = now();

        buffer.push(base + 10000, 0.1);
        buffer.push(base + 20000, 0.2);
        buffer.push(base + 5000, 0.05);
        assert_eq!(buffer.len(), 3);

        let first = buffer.pop(base + 10000);
        assert_eq!(first, Some(0.05));
    }
}