1use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
6
7pub type Timestamp = u64;
9
10pub fn now() -> Timestamp {
12 SystemTime::now()
13 .duration_since(UNIX_EPOCH)
14 .unwrap()
15 .as_micros() as Timestamp
16}
17
18pub fn to_duration(micros: Timestamp) -> Duration {
20 Duration::from_micros(micros)
21}
22
23pub fn from_duration(duration: Duration) -> Timestamp {
25 duration.as_micros() as Timestamp
26}
27
28#[derive(Debug, Clone)]
30pub struct ClockSync {
31 offset: i64,
33 rtt: u64,
35 jitter: u64,
37 samples: u32,
39 last_sync: Instant,
41 rtt_history: Vec<u64>,
43}
44
45impl Default for ClockSync {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl ClockSync {
52 pub fn new() -> Self {
54 Self {
55 offset: 0,
56 rtt: 0,
57 jitter: 0,
58 samples: 0,
59 last_sync: Instant::now(),
60 rtt_history: Vec::with_capacity(10),
61 }
62 }
63
64 pub fn process_sync(&mut self, t1: u64, t2: u64, t3: u64, t4: u64) {
72 let rtt = (t4 - t1) - (t3 - t2);
74
75 let offset = ((t2 as i64 - t1 as i64) + (t3 as i64 - t4 as i64)) / 2;
77
78 self.rtt_history.push(rtt);
80 if self.rtt_history.len() > 10 {
81 self.rtt_history.remove(0);
82 }
83
84 if self.rtt_history.len() >= 2 {
86 let mean: u64 = self.rtt_history.iter().sum::<u64>() / self.rtt_history.len() as u64;
87 let variance: u64 = self
88 .rtt_history
89 .iter()
90 .map(|&x| {
91 let diff = x as i64 - mean as i64;
92 (diff * diff) as u64
93 })
94 .sum::<u64>()
95 / self.rtt_history.len() as u64;
96 self.jitter = (variance as f64).sqrt() as u64;
97 }
98
99 if self.samples == 0 {
101 self.offset = offset;
102 self.rtt = rtt;
103 } else {
104 let alpha = 0.3;
106 self.offset = ((1.0 - alpha) * self.offset as f64 + alpha * offset as f64) as i64;
107 self.rtt = ((1.0 - alpha) * self.rtt as f64 + alpha * rtt as f64) as u64;
108 }
109
110 self.samples += 1;
111 self.last_sync = Instant::now();
112 }
113
114 pub fn server_time(&self) -> Timestamp {
116 let local = now();
117 (local as i64 + self.offset) as Timestamp
118 }
119
120 pub fn to_server_time(&self, local: Timestamp) -> Timestamp {
122 (local as i64 + self.offset) as Timestamp
123 }
124
125 pub fn to_local_time(&self, server: Timestamp) -> Timestamp {
127 (server as i64 - self.offset) as Timestamp
128 }
129
130 pub fn offset(&self) -> i64 {
132 self.offset
133 }
134
135 pub fn rtt(&self) -> u64 {
137 self.rtt
138 }
139
140 pub fn jitter(&self) -> u64 {
142 self.jitter
143 }
144
145 pub fn needs_sync(&self, interval_secs: u64) -> bool {
147 self.samples == 0 || self.last_sync.elapsed().as_secs() >= interval_secs
148 }
149
150 pub fn quality(&self) -> f64 {
152 if self.samples == 0 {
153 return 0.0;
154 }
155
156 let rtt_score = (10000.0 - self.rtt.min(10000) as f64) / 10000.0;
158 let jitter_score = (1000.0 - self.jitter.min(1000) as f64) / 1000.0;
159 let sample_score = (self.samples.min(10) as f64) / 10.0;
160
161 (rtt_score * 0.4 + jitter_score * 0.4 + sample_score * 0.2).clamp(0.0, 1.0)
162 }
163}
164
165#[derive(Debug, Clone)]
167pub struct SessionTime {
168 start: Instant,
169 start_unix: Timestamp,
170}
171
172impl Default for SessionTime {
173 fn default() -> Self {
174 Self::new()
175 }
176}
177
178impl SessionTime {
179 pub fn new() -> Self {
181 Self {
182 start: Instant::now(),
183 start_unix: now(),
184 }
185 }
186
187 pub fn elapsed(&self) -> Timestamp {
189 self.start.elapsed().as_micros() as Timestamp
190 }
191
192 pub fn start_time(&self) -> Timestamp {
194 self.start_unix
195 }
196
197 pub fn to_unix(&self, session_time: Timestamp) -> Timestamp {
199 self.start_unix + session_time
200 }
201
202 pub fn from_unix(&self, unix_time: Timestamp) -> Timestamp {
204 unix_time.saturating_sub(self.start_unix)
205 }
206}
207
208#[derive(Debug)]
210pub struct JitterBuffer<T> {
211 buffer: Vec<(Timestamp, T)>,
212 capacity: usize,
213 window_us: u64,
214}
215
216impl<T: Clone> JitterBuffer<T> {
217 pub fn new(capacity: usize, window_ms: u64) -> Self {
223 Self {
224 buffer: Vec::with_capacity(capacity),
225 capacity,
226 window_us: window_ms * 1000,
227 }
228 }
229
230 pub fn push(&mut self, timestamp: Timestamp, value: T) {
232 let cutoff = now().saturating_sub(self.window_us);
234 self.buffer.retain(|(ts, _)| *ts > cutoff);
235
236 let pos = self.buffer.partition_point(|(ts, _)| *ts < timestamp);
238 if self.buffer.len() < self.capacity {
239 self.buffer.insert(pos, (timestamp, value));
240 } else if pos > 0 {
241 self.buffer.remove(0);
243 let new_pos = pos.saturating_sub(1);
244 self.buffer.insert(new_pos, (timestamp, value));
245 }
246 }
247
248 pub fn pop(&mut self, playback_time: Timestamp) -> Option<T> {
250 if let Some((ts, _)) = self.buffer.first() {
251 if *ts <= playback_time {
252 return Some(self.buffer.remove(0).1);
253 }
254 }
255 None
256 }
257
258 pub fn drain_ready(&mut self, playback_time: Timestamp) -> Vec<T> {
260 let mut ready = Vec::new();
261 while let Some((ts, _)) = self.buffer.first() {
262 if *ts <= playback_time {
263 ready.push(self.buffer.remove(0).1);
264 } else {
265 break;
266 }
267 }
268 ready
269 }
270
271 pub fn len(&self) -> usize {
273 self.buffer.len()
274 }
275
276 pub fn is_empty(&self) -> bool {
278 self.buffer.is_empty()
279 }
280
281 pub fn depth_us(&self) -> u64 {
283 if self.buffer.len() < 2 {
284 0
285 } else {
286 self.buffer.last().unwrap().0 - self.buffer.first().unwrap().0
287 }
288 }
289}
290
291#[cfg(test)]
292mod tests {
293 use super::*;
294
295 #[test]
296 fn test_clock_sync() {
297 let mut sync = ClockSync::new();
298
299 let t1 = 1000000u64;
302 let t2 = 1000050u64; let t3 = 1000051u64;
304 let t4 = 1000100u64;
305
306 sync.process_sync(t1, t2, t3, t4);
307
308 assert!(sync.samples > 0);
309 assert!(sync.rtt > 0);
310 }
311
312 #[test]
313 fn test_session_time() {
314 let session = SessionTime::new();
315
316 std::thread::sleep(std::time::Duration::from_millis(10));
317
318 let elapsed = session.elapsed();
319 assert!(elapsed >= 10000); }
321
322 #[test]
323 fn test_jitter_buffer() {
324 let mut buffer: JitterBuffer<f64> = JitterBuffer::new(10, 100);
325
326 let base = now();
327 buffer.push(base + 10000, 0.1);
328 buffer.push(base + 20000, 0.2);
329 buffer.push(base + 5000, 0.05); assert_eq!(buffer.len(), 3);
332
333 let first = buffer.pop(base + 10000);
335 assert_eq!(first, Some(0.05));
336 }
337}