1use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
6
/// A point in time or a span of time, counted in microseconds.
///
/// [`now`] produces values of this type measured from the Unix epoch, and
/// [`to_duration`] / [`from_duration`] convert to and from [`Duration`]
/// using microsecond resolution.
pub type Timestamp = u64;
9
10pub fn now() -> Timestamp {
14 SystemTime::now()
15 .duration_since(UNIX_EPOCH)
16 .map(|d| d.as_micros() as Timestamp)
17 .unwrap_or(0)
18}
19
20pub fn to_duration(micros: Timestamp) -> Duration {
22 Duration::from_micros(micros)
23}
24
25pub fn from_duration(duration: Duration) -> Timestamp {
27 duration.as_micros() as Timestamp
28}
29
30#[derive(Debug, Clone)]
32pub struct ClockSync {
33 offset: i64,
35 rtt: u64,
37 jitter: u64,
39 samples: u32,
41 last_sync: Instant,
43 rtt_history: Vec<u64>,
45}
46
47impl Default for ClockSync {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl ClockSync {
54 pub fn new() -> Self {
56 Self {
57 offset: 0,
58 rtt: 0,
59 jitter: 0,
60 samples: 0,
61 last_sync: Instant::now(),
62 rtt_history: Vec::with_capacity(10),
63 }
64 }
65
66 pub fn process_sync(&mut self, t1: u64, t2: u64, t3: u64, t4: u64) {
74 let rtt = (t4 - t1) - (t3 - t2);
76
77 let offset = ((t2 as i64 - t1 as i64) + (t3 as i64 - t4 as i64)) / 2;
79
80 self.rtt_history.push(rtt);
82 if self.rtt_history.len() > 10 {
83 self.rtt_history.remove(0);
84 }
85
86 if self.rtt_history.len() >= 2 {
88 let mean: u64 = self.rtt_history.iter().sum::<u64>() / self.rtt_history.len() as u64;
89 let variance: u64 = self
90 .rtt_history
91 .iter()
92 .map(|&x| {
93 let diff = x as i64 - mean as i64;
94 (diff * diff) as u64
95 })
96 .sum::<u64>()
97 / self.rtt_history.len() as u64;
98 self.jitter = (variance as f64).sqrt() as u64;
99 }
100
101 if self.samples == 0 {
103 self.offset = offset;
104 self.rtt = rtt;
105 } else {
106 let alpha = 0.3;
108 self.offset = ((1.0 - alpha) * self.offset as f64 + alpha * offset as f64) as i64;
109 self.rtt = ((1.0 - alpha) * self.rtt as f64 + alpha * rtt as f64) as u64;
110 }
111
112 self.samples += 1;
113 self.last_sync = Instant::now();
114 }
115
116 pub fn server_time(&self) -> Timestamp {
118 let local = now();
119 (local as i64 + self.offset) as Timestamp
120 }
121
122 pub fn to_server_time(&self, local: Timestamp) -> Timestamp {
124 (local as i64 + self.offset) as Timestamp
125 }
126
127 pub fn to_local_time(&self, server: Timestamp) -> Timestamp {
129 (server as i64 - self.offset) as Timestamp
130 }
131
132 pub fn offset(&self) -> i64 {
134 self.offset
135 }
136
137 pub fn rtt(&self) -> u64 {
139 self.rtt
140 }
141
142 pub fn jitter(&self) -> u64 {
144 self.jitter
145 }
146
147 pub fn needs_sync(&self, interval_secs: u64) -> bool {
149 self.samples == 0 || self.last_sync.elapsed().as_secs() >= interval_secs
150 }
151
152 pub fn quality(&self) -> f64 {
154 if self.samples == 0 {
155 return 0.0;
156 }
157
158 let rtt_score = (10000.0 - self.rtt.min(10000) as f64) / 10000.0;
160 let jitter_score = (1000.0 - self.jitter.min(1000) as f64) / 1000.0;
161 let sample_score = (self.samples.min(10) as f64) / 10.0;
162
163 (rtt_score * 0.4 + jitter_score * 0.4 + sample_score * 0.2).clamp(0.0, 1.0)
164 }
165}
166
167#[derive(Debug, Clone)]
169pub struct SessionTime {
170 start: Instant,
171 start_unix: Timestamp,
172}
173
174impl Default for SessionTime {
175 fn default() -> Self {
176 Self::new()
177 }
178}
179
180impl SessionTime {
181 pub fn new() -> Self {
183 Self {
184 start: Instant::now(),
185 start_unix: now(),
186 }
187 }
188
189 pub fn elapsed(&self) -> Timestamp {
191 self.start.elapsed().as_micros() as Timestamp
192 }
193
194 pub fn start_time(&self) -> Timestamp {
196 self.start_unix
197 }
198
199 pub fn to_unix(&self, session_time: Timestamp) -> Timestamp {
201 self.start_unix + session_time
202 }
203
204 pub fn from_unix(&self, unix_time: Timestamp) -> Timestamp {
206 unix_time.saturating_sub(self.start_unix)
207 }
208}
209
210#[derive(Debug)]
212pub struct JitterBuffer<T> {
213 buffer: Vec<(Timestamp, T)>,
214 capacity: usize,
215 window_us: u64,
216}
217
218impl<T: Clone> JitterBuffer<T> {
219 pub fn new(capacity: usize, window_ms: u64) -> Self {
225 Self {
226 buffer: Vec::with_capacity(capacity),
227 capacity,
228 window_us: window_ms * 1000,
229 }
230 }
231
232 pub fn push(&mut self, timestamp: Timestamp, value: T) {
234 let cutoff = now().saturating_sub(self.window_us);
236 self.buffer.retain(|(ts, _)| *ts > cutoff);
237
238 let pos = self.buffer.partition_point(|(ts, _)| *ts < timestamp);
240 if self.buffer.len() < self.capacity {
241 self.buffer.insert(pos, (timestamp, value));
242 } else if pos > 0 {
243 self.buffer.remove(0);
245 let new_pos = pos.saturating_sub(1);
246 self.buffer.insert(new_pos, (timestamp, value));
247 }
248 }
249
250 pub fn pop(&mut self, playback_time: Timestamp) -> Option<T> {
252 if let Some((ts, _)) = self.buffer.first() {
253 if *ts <= playback_time {
254 return Some(self.buffer.remove(0).1);
255 }
256 }
257 None
258 }
259
260 pub fn drain_ready(&mut self, playback_time: Timestamp) -> Vec<T> {
262 let mut ready = Vec::new();
263 while let Some((ts, _)) = self.buffer.first() {
264 if *ts <= playback_time {
265 ready.push(self.buffer.remove(0).1);
266 } else {
267 break;
268 }
269 }
270 ready
271 }
272
273 pub fn len(&self) -> usize {
275 self.buffer.len()
276 }
277
278 pub fn is_empty(&self) -> bool {
280 self.buffer.is_empty()
281 }
282
283 pub fn depth_us(&self) -> u64 {
285 match (self.buffer.first(), self.buffer.last()) {
286 (Some(first), Some(last)) if self.buffer.len() >= 2 => last.0 - first.0,
287 _ => 0,
288 }
289 }
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    /// One exchange must register a sample and produce a positive RTT.
    #[test]
    fn test_clock_sync() {
        let (t1, t2, t3, t4) = (1000000u64, 1000050u64, 1000051u64, 1000100u64);

        let mut sync = ClockSync::new();
        sync.process_sync(t1, t2, t3, t4);

        assert!(sync.samples > 0);
        assert!(sync.rtt > 0);
    }

    /// After sleeping 10 ms the session must report at least 10000 µs.
    #[test]
    fn test_session_time() {
        let session = SessionTime::new();
        let pause = std::time::Duration::from_millis(10);

        std::thread::sleep(pause);

        assert!(session.elapsed() >= 10000);
    }

    /// Out-of-order pushes are stored sorted, so the earliest entry pops first.
    #[test]
    fn test_jitter_buffer() {
        let mut buffer: JitterBuffer<f64> = JitterBuffer::new(10, 100);
        let base = now();

        let arrivals = [(10000u64, 0.1f64), (20000, 0.2), (5000, 0.05)];
        for &(delay, sample) in arrivals.iter() {
            buffer.push(base + delay, sample);
        }
        assert_eq!(buffer.len(), 3);

        let first = buffer.pop(base + 10000);
        assert_eq!(first, Some(0.05));
    }
}