clasp_core/
time.rs

1//! Timing utilities for Clasp
2//!
3//! Provides clock synchronization and timestamp handling.
4
5use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
6
/// Timestamp type (microseconds).
///
/// Used both for absolute times (microseconds since the Unix epoch) and for
/// spans of time; the meaning is given by the function that produces or
/// consumes the value.
pub type Timestamp = u64;

/// Get the current Unix timestamp in microseconds.
///
/// This never panics: if the system clock reports a time before the Unix
/// epoch the result is `0`, and a value that does not fit in 64 bits
/// saturates at `Timestamp::MAX`.
pub fn now() -> Timestamp {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(from_duration)
        // `Err` means the wall clock is set before 1970; report the epoch
        // itself rather than aborting the caller.
        .unwrap_or(0)
}

/// Convert microseconds to a [`Duration`].
pub fn to_duration(micros: Timestamp) -> Duration {
    Duration::from_micros(micros)
}

/// Convert a [`Duration`] to whole microseconds.
///
/// Sub-microsecond precision is dropped. Durations longer than
/// `Timestamp::MAX` microseconds saturate instead of silently wrapping.
pub fn from_duration(duration: Duration) -> Timestamp {
    // `as_micros` yields a u128; clamp first so the narrowing cast is lossless.
    duration.as_micros().min(u128::from(Timestamp::MAX)) as Timestamp
}
27
28/// Clock synchronization state
29#[derive(Debug, Clone)]
30pub struct ClockSync {
31    /// Estimated offset from server time (microseconds)
32    offset: i64,
33    /// Round-trip time (microseconds)
34    rtt: u64,
35    /// Jitter estimate (microseconds)
36    jitter: u64,
37    /// Number of sync samples
38    samples: u32,
39    /// Last sync time (local)
40    last_sync: Instant,
41    /// Recent RTT samples for jitter calculation
42    rtt_history: Vec<u64>,
43}
44
45impl Default for ClockSync {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl ClockSync {
52    /// Create a new clock sync instance
53    pub fn new() -> Self {
54        Self {
55            offset: 0,
56            rtt: 0,
57            jitter: 0,
58            samples: 0,
59            last_sync: Instant::now(),
60            rtt_history: Vec::with_capacity(10),
61        }
62    }
63
64    /// Process a sync response
65    ///
66    /// # Arguments
67    /// * `t1` - Client send time
68    /// * `t2` - Server receive time
69    /// * `t3` - Server send time
70    /// * `t4` - Client receive time
71    pub fn process_sync(&mut self, t1: u64, t2: u64, t3: u64, t4: u64) {
72        // Calculate round-trip time
73        let rtt = (t4 - t1) - (t3 - t2);
74
75        // Calculate offset using NTP algorithm
76        let offset = ((t2 as i64 - t1 as i64) + (t3 as i64 - t4 as i64)) / 2;
77
78        // Update RTT history
79        self.rtt_history.push(rtt);
80        if self.rtt_history.len() > 10 {
81            self.rtt_history.remove(0);
82        }
83
84        // Calculate jitter (variance of RTT)
85        if self.rtt_history.len() >= 2 {
86            let mean: u64 = self.rtt_history.iter().sum::<u64>() / self.rtt_history.len() as u64;
87            let variance: u64 = self.rtt_history
88                .iter()
89                .map(|&x| {
90                    let diff = x as i64 - mean as i64;
91                    (diff * diff) as u64
92                })
93                .sum::<u64>() / self.rtt_history.len() as u64;
94            self.jitter = (variance as f64).sqrt() as u64;
95        }
96
97        // Use exponential moving average for offset
98        if self.samples == 0 {
99            self.offset = offset;
100            self.rtt = rtt;
101        } else {
102            // Weight newer samples more
103            let alpha = 0.3;
104            self.offset = ((1.0 - alpha) * self.offset as f64 + alpha * offset as f64) as i64;
105            self.rtt = ((1.0 - alpha) * self.rtt as f64 + alpha * rtt as f64) as u64;
106        }
107
108        self.samples += 1;
109        self.last_sync = Instant::now();
110    }
111
112    /// Get estimated server time
113    pub fn server_time(&self) -> Timestamp {
114        let local = now();
115        (local as i64 + self.offset) as Timestamp
116    }
117
118    /// Convert local time to server time
119    pub fn to_server_time(&self, local: Timestamp) -> Timestamp {
120        (local as i64 + self.offset) as Timestamp
121    }
122
123    /// Convert server time to local time
124    pub fn to_local_time(&self, server: Timestamp) -> Timestamp {
125        (server as i64 - self.offset) as Timestamp
126    }
127
128    /// Get current offset estimate
129    pub fn offset(&self) -> i64 {
130        self.offset
131    }
132
133    /// Get current RTT estimate
134    pub fn rtt(&self) -> u64 {
135        self.rtt
136    }
137
138    /// Get current jitter estimate
139    pub fn jitter(&self) -> u64 {
140        self.jitter
141    }
142
143    /// Check if sync is needed (e.g., every 30 seconds)
144    pub fn needs_sync(&self, interval_secs: u64) -> bool {
145        self.samples == 0 || self.last_sync.elapsed().as_secs() >= interval_secs
146    }
147
148    /// Get sync quality (0.0 = poor, 1.0 = excellent)
149    pub fn quality(&self) -> f64 {
150        if self.samples == 0 {
151            return 0.0;
152        }
153
154        // Based on RTT and jitter
155        let rtt_score = (10000.0 - self.rtt.min(10000) as f64) / 10000.0;
156        let jitter_score = (1000.0 - self.jitter.min(1000) as f64) / 1000.0;
157        let sample_score = (self.samples.min(10) as f64) / 10.0;
158
159        (rtt_score * 0.4 + jitter_score * 0.4 + sample_score * 0.2).clamp(0.0, 1.0)
160    }
161}
162
163/// Session time tracker (time since session start)
164#[derive(Debug, Clone)]
165pub struct SessionTime {
166    start: Instant,
167    start_unix: Timestamp,
168}
169
170impl Default for SessionTime {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176impl SessionTime {
177    /// Create a new session time tracker
178    pub fn new() -> Self {
179        Self {
180            start: Instant::now(),
181            start_unix: now(),
182        }
183    }
184
185    /// Get microseconds since session start
186    pub fn elapsed(&self) -> Timestamp {
187        self.start.elapsed().as_micros() as Timestamp
188    }
189
190    /// Get the session start time (Unix timestamp)
191    pub fn start_time(&self) -> Timestamp {
192        self.start_unix
193    }
194
195    /// Convert session time to Unix timestamp
196    pub fn to_unix(&self, session_time: Timestamp) -> Timestamp {
197        self.start_unix + session_time
198    }
199
200    /// Convert Unix timestamp to session time
201    pub fn from_unix(&self, unix_time: Timestamp) -> Timestamp {
202        unix_time.saturating_sub(self.start_unix)
203    }
204}
205
206/// Jitter buffer for smoothing high-rate streams
207#[derive(Debug)]
208pub struct JitterBuffer<T> {
209    buffer: Vec<(Timestamp, T)>,
210    capacity: usize,
211    window_us: u64,
212}
213
214impl<T: Clone> JitterBuffer<T> {
215    /// Create a new jitter buffer
216    ///
217    /// # Arguments
218    /// * `capacity` - Maximum number of items
219    /// * `window_ms` - Buffer window in milliseconds
220    pub fn new(capacity: usize, window_ms: u64) -> Self {
221        Self {
222            buffer: Vec::with_capacity(capacity),
223            capacity,
224            window_us: window_ms * 1000,
225        }
226    }
227
228    /// Add a sample with timestamp
229    pub fn push(&mut self, timestamp: Timestamp, value: T) {
230        // Remove old samples
231        let cutoff = now().saturating_sub(self.window_us);
232        self.buffer.retain(|(ts, _)| *ts > cutoff);
233
234        // Add new sample (maintain sorted order)
235        let pos = self.buffer.partition_point(|(ts, _)| *ts < timestamp);
236        if self.buffer.len() < self.capacity {
237            self.buffer.insert(pos, (timestamp, value));
238        } else if pos > 0 {
239            // Replace oldest
240            self.buffer.remove(0);
241            let new_pos = pos.saturating_sub(1);
242            self.buffer.insert(new_pos, (timestamp, value));
243        }
244    }
245
246    /// Get the next sample ready for playback
247    pub fn pop(&mut self, playback_time: Timestamp) -> Option<T> {
248        if let Some((ts, _)) = self.buffer.first() {
249            if *ts <= playback_time {
250                return Some(self.buffer.remove(0).1);
251            }
252        }
253        None
254    }
255
256    /// Get all samples ready for playback
257    pub fn drain_ready(&mut self, playback_time: Timestamp) -> Vec<T> {
258        let mut ready = Vec::new();
259        while let Some((ts, _)) = self.buffer.first() {
260            if *ts <= playback_time {
261                ready.push(self.buffer.remove(0).1);
262            } else {
263                break;
264            }
265        }
266        ready
267    }
268
269    /// Current buffer depth (samples)
270    pub fn len(&self) -> usize {
271        self.buffer.len()
272    }
273
274    /// Check if buffer is empty
275    pub fn is_empty(&self) -> bool {
276        self.buffer.is_empty()
277    }
278
279    /// Current buffer depth (time span in microseconds)
280    pub fn depth_us(&self) -> u64 {
281        if self.buffer.len() < 2 {
282            0
283        } else {
284            self.buffer.last().unwrap().0 - self.buffer.first().unwrap().0
285        }
286    }
287}
288
#[cfg(test)]
mod tests {
    use super::*;

    /// A symmetric exchange yields the expected RTT and a zero offset.
    #[test]
    fn test_clock_sync() {
        let mut sync = ClockSync::new();
        assert!(sync.needs_sync(30));

        // Simulate sync exchange
        // Client sends at T1, server receives at T2, server sends at T3, client receives at T4.
        // Roughly 50µs of latency each way and 1µs of server processing, with
        // clocks that agree to within integer rounding.
        let t1 = 1_000_000u64;
        let t2 = 1_000_050u64;
        let t3 = 1_000_051u64;
        let t4 = 1_000_100u64;

        sync.process_sync(t1, t2, t3, t4);

        assert_eq!(sync.samples, 1);
        assert_eq!(sync.rtt(), 99);
        assert_eq!(sync.offset(), 0);
        assert!(!sync.needs_sync(30));
        assert!(sync.quality() > 0.0);
    }

    /// A server clock that is ahead produces a positive offset that the
    /// conversion helpers apply in both directions.
    #[test]
    fn test_clock_sync_offset() {
        let mut sync = ClockSync::new();

        // Server is 500µs ahead; 50µs latency each way, 10µs processing.
        sync.process_sync(1_000_000, 1_000_550, 1_000_560, 1_000_110);

        assert_eq!(sync.rtt(), 100);
        assert_eq!(sync.offset(), 500);
        assert_eq!(sync.to_server_time(1_000), 1_500);
        assert_eq!(sync.to_local_time(1_500), 1_000);
    }

    /// Elapsed session time advances and Unix conversions round-trip.
    #[test]
    fn test_session_time() {
        let session = SessionTime::new();

        std::thread::sleep(std::time::Duration::from_millis(10));

        let elapsed = session.elapsed();
        assert!(elapsed >= 10000); // At least 10ms in microseconds

        let start = session.start_time();
        assert_eq!(session.to_unix(250), start + 250);
        assert_eq!(session.from_unix(start + 250), 250);
        assert_eq!(session.from_unix(0), 0);
    }

    /// Out-of-order pushes come back out in timestamp order.
    #[test]
    fn test_jitter_buffer() {
        let mut buffer: JitterBuffer<f64> = JitterBuffer::new(10, 100);

        let base = now();
        buffer.push(base + 10000, 0.1);
        buffer.push(base + 20000, 0.2);
        buffer.push(base + 5000, 0.05); // Out of order

        assert_eq!(buffer.len(), 3);
        assert_eq!(buffer.depth_us(), 15000);

        // Nothing is ready before the earliest timestamp
        assert_eq!(buffer.pop(base), None);

        // Should return in timestamp order
        let first = buffer.pop(base + 10000);
        assert_eq!(first, Some(0.05));

        // The rest drains oldest-first
        assert_eq!(buffer.drain_ready(base + 20000), vec![0.1, 0.2]);
        assert!(buffer.is_empty());
    }
}
335}