1use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
6
7pub type Timestamp = u64;
9
10pub fn now() -> Timestamp {
12 SystemTime::now()
13 .duration_since(UNIX_EPOCH)
14 .unwrap()
15 .as_micros() as Timestamp
16}
17
18pub fn to_duration(micros: Timestamp) -> Duration {
20 Duration::from_micros(micros)
21}
22
23pub fn from_duration(duration: Duration) -> Timestamp {
25 duration.as_micros() as Timestamp
26}
27
28#[derive(Debug, Clone)]
30pub struct ClockSync {
31 offset: i64,
33 rtt: u64,
35 jitter: u64,
37 samples: u32,
39 last_sync: Instant,
41 rtt_history: Vec<u64>,
43}
44
45impl Default for ClockSync {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl ClockSync {
52 pub fn new() -> Self {
54 Self {
55 offset: 0,
56 rtt: 0,
57 jitter: 0,
58 samples: 0,
59 last_sync: Instant::now(),
60 rtt_history: Vec::with_capacity(10),
61 }
62 }
63
64 pub fn process_sync(&mut self, t1: u64, t2: u64, t3: u64, t4: u64) {
72 let rtt = (t4 - t1) - (t3 - t2);
74
75 let offset = ((t2 as i64 - t1 as i64) + (t3 as i64 - t4 as i64)) / 2;
77
78 self.rtt_history.push(rtt);
80 if self.rtt_history.len() > 10 {
81 self.rtt_history.remove(0);
82 }
83
84 if self.rtt_history.len() >= 2 {
86 let mean: u64 = self.rtt_history.iter().sum::<u64>() / self.rtt_history.len() as u64;
87 let variance: u64 = self.rtt_history
88 .iter()
89 .map(|&x| {
90 let diff = x as i64 - mean as i64;
91 (diff * diff) as u64
92 })
93 .sum::<u64>() / self.rtt_history.len() as u64;
94 self.jitter = (variance as f64).sqrt() as u64;
95 }
96
97 if self.samples == 0 {
99 self.offset = offset;
100 self.rtt = rtt;
101 } else {
102 let alpha = 0.3;
104 self.offset = ((1.0 - alpha) * self.offset as f64 + alpha * offset as f64) as i64;
105 self.rtt = ((1.0 - alpha) * self.rtt as f64 + alpha * rtt as f64) as u64;
106 }
107
108 self.samples += 1;
109 self.last_sync = Instant::now();
110 }
111
112 pub fn server_time(&self) -> Timestamp {
114 let local = now();
115 (local as i64 + self.offset) as Timestamp
116 }
117
118 pub fn to_server_time(&self, local: Timestamp) -> Timestamp {
120 (local as i64 + self.offset) as Timestamp
121 }
122
123 pub fn to_local_time(&self, server: Timestamp) -> Timestamp {
125 (server as i64 - self.offset) as Timestamp
126 }
127
128 pub fn offset(&self) -> i64 {
130 self.offset
131 }
132
133 pub fn rtt(&self) -> u64 {
135 self.rtt
136 }
137
138 pub fn jitter(&self) -> u64 {
140 self.jitter
141 }
142
143 pub fn needs_sync(&self, interval_secs: u64) -> bool {
145 self.samples == 0 || self.last_sync.elapsed().as_secs() >= interval_secs
146 }
147
148 pub fn quality(&self) -> f64 {
150 if self.samples == 0 {
151 return 0.0;
152 }
153
154 let rtt_score = (10000.0 - self.rtt.min(10000) as f64) / 10000.0;
156 let jitter_score = (1000.0 - self.jitter.min(1000) as f64) / 1000.0;
157 let sample_score = (self.samples.min(10) as f64) / 10.0;
158
159 (rtt_score * 0.4 + jitter_score * 0.4 + sample_score * 0.2).clamp(0.0, 1.0)
160 }
161}
162
163#[derive(Debug, Clone)]
165pub struct SessionTime {
166 start: Instant,
167 start_unix: Timestamp,
168}
169
170impl Default for SessionTime {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176impl SessionTime {
177 pub fn new() -> Self {
179 Self {
180 start: Instant::now(),
181 start_unix: now(),
182 }
183 }
184
185 pub fn elapsed(&self) -> Timestamp {
187 self.start.elapsed().as_micros() as Timestamp
188 }
189
190 pub fn start_time(&self) -> Timestamp {
192 self.start_unix
193 }
194
195 pub fn to_unix(&self, session_time: Timestamp) -> Timestamp {
197 self.start_unix + session_time
198 }
199
200 pub fn from_unix(&self, unix_time: Timestamp) -> Timestamp {
202 unix_time.saturating_sub(self.start_unix)
203 }
204}
205
206#[derive(Debug)]
208pub struct JitterBuffer<T> {
209 buffer: Vec<(Timestamp, T)>,
210 capacity: usize,
211 window_us: u64,
212}
213
214impl<T: Clone> JitterBuffer<T> {
215 pub fn new(capacity: usize, window_ms: u64) -> Self {
221 Self {
222 buffer: Vec::with_capacity(capacity),
223 capacity,
224 window_us: window_ms * 1000,
225 }
226 }
227
228 pub fn push(&mut self, timestamp: Timestamp, value: T) {
230 let cutoff = now().saturating_sub(self.window_us);
232 self.buffer.retain(|(ts, _)| *ts > cutoff);
233
234 let pos = self.buffer.partition_point(|(ts, _)| *ts < timestamp);
236 if self.buffer.len() < self.capacity {
237 self.buffer.insert(pos, (timestamp, value));
238 } else if pos > 0 {
239 self.buffer.remove(0);
241 let new_pos = pos.saturating_sub(1);
242 self.buffer.insert(new_pos, (timestamp, value));
243 }
244 }
245
246 pub fn pop(&mut self, playback_time: Timestamp) -> Option<T> {
248 if let Some((ts, _)) = self.buffer.first() {
249 if *ts <= playback_time {
250 return Some(self.buffer.remove(0).1);
251 }
252 }
253 None
254 }
255
256 pub fn drain_ready(&mut self, playback_time: Timestamp) -> Vec<T> {
258 let mut ready = Vec::new();
259 while let Some((ts, _)) = self.buffer.first() {
260 if *ts <= playback_time {
261 ready.push(self.buffer.remove(0).1);
262 } else {
263 break;
264 }
265 }
266 ready
267 }
268
269 pub fn len(&self) -> usize {
271 self.buffer.len()
272 }
273
274 pub fn is_empty(&self) -> bool {
276 self.buffer.is_empty()
277 }
278
279 pub fn depth_us(&self) -> u64 {
281 if self.buffer.len() < 2 {
282 0
283 } else {
284 self.buffer.last().unwrap().0 - self.buffer.first().unwrap().0
285 }
286 }
287}
288
289#[cfg(test)]
290mod tests {
291 use super::*;
292
293 #[test]
294 fn test_clock_sync() {
295 let mut sync = ClockSync::new();
296
297 let t1 = 1000000u64;
300 let t2 = 1000050u64; let t3 = 1000051u64;
302 let t4 = 1000100u64;
303
304 sync.process_sync(t1, t2, t3, t4);
305
306 assert!(sync.samples > 0);
307 assert!(sync.rtt > 0);
308 }
309
310 #[test]
311 fn test_session_time() {
312 let session = SessionTime::new();
313
314 std::thread::sleep(std::time::Duration::from_millis(10));
315
316 let elapsed = session.elapsed();
317 assert!(elapsed >= 10000); }
319
320 #[test]
321 fn test_jitter_buffer() {
322 let mut buffer: JitterBuffer<f64> = JitterBuffer::new(10, 100);
323
324 let base = now();
325 buffer.push(base + 10000, 0.1);
326 buffer.push(base + 20000, 0.2);
327 buffer.push(base + 5000, 0.05); assert_eq!(buffer.len(), 3);
330
331 let first = buffer.pop(base + 10000);
333 assert_eq!(first, Some(0.05));
334 }
335}