clasp_core/
time.rs

1//! Timing utilities for Clasp
2//!
3//! Provides clock synchronization and timestamp handling.
4
5use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
6
/// Timestamp expressed in microseconds, stored as an unsigned 64-bit integer.
///
/// Within this module the same type carries both Unix-epoch times (see
/// [`now`]) and session-relative times (see [`SessionTime::elapsed`]).
pub type Timestamp = u64;
9
10/// Get current Unix timestamp in microseconds
11pub fn now() -> Timestamp {
12    SystemTime::now()
13        .duration_since(UNIX_EPOCH)
14        .unwrap()
15        .as_micros() as Timestamp
16}
17
18/// Convert microseconds to Duration
19pub fn to_duration(micros: Timestamp) -> Duration {
20    Duration::from_micros(micros)
21}
22
23/// Convert Duration to microseconds
24pub fn from_duration(duration: Duration) -> Timestamp {
25    duration.as_micros() as Timestamp
26}
27
28/// Clock synchronization state
29#[derive(Debug, Clone)]
30pub struct ClockSync {
31    /// Estimated offset from server time (microseconds)
32    offset: i64,
33    /// Round-trip time (microseconds)
34    rtt: u64,
35    /// Jitter estimate (microseconds)
36    jitter: u64,
37    /// Number of sync samples
38    samples: u32,
39    /// Last sync time (local)
40    last_sync: Instant,
41    /// Recent RTT samples for jitter calculation
42    rtt_history: Vec<u64>,
43}
44
45impl Default for ClockSync {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51impl ClockSync {
52    /// Create a new clock sync instance
53    pub fn new() -> Self {
54        Self {
55            offset: 0,
56            rtt: 0,
57            jitter: 0,
58            samples: 0,
59            last_sync: Instant::now(),
60            rtt_history: Vec::with_capacity(10),
61        }
62    }
63
64    /// Process a sync response
65    ///
66    /// # Arguments
67    /// * `t1` - Client send time
68    /// * `t2` - Server receive time
69    /// * `t3` - Server send time
70    /// * `t4` - Client receive time
71    pub fn process_sync(&mut self, t1: u64, t2: u64, t3: u64, t4: u64) {
72        // Calculate round-trip time
73        let rtt = (t4 - t1) - (t3 - t2);
74
75        // Calculate offset using NTP algorithm
76        let offset = ((t2 as i64 - t1 as i64) + (t3 as i64 - t4 as i64)) / 2;
77
78        // Update RTT history
79        self.rtt_history.push(rtt);
80        if self.rtt_history.len() > 10 {
81            self.rtt_history.remove(0);
82        }
83
84        // Calculate jitter (variance of RTT)
85        if self.rtt_history.len() >= 2 {
86            let mean: u64 = self.rtt_history.iter().sum::<u64>() / self.rtt_history.len() as u64;
87            let variance: u64 = self
88                .rtt_history
89                .iter()
90                .map(|&x| {
91                    let diff = x as i64 - mean as i64;
92                    (diff * diff) as u64
93                })
94                .sum::<u64>()
95                / self.rtt_history.len() as u64;
96            self.jitter = (variance as f64).sqrt() as u64;
97        }
98
99        // Use exponential moving average for offset
100        if self.samples == 0 {
101            self.offset = offset;
102            self.rtt = rtt;
103        } else {
104            // Weight newer samples more
105            let alpha = 0.3;
106            self.offset = ((1.0 - alpha) * self.offset as f64 + alpha * offset as f64) as i64;
107            self.rtt = ((1.0 - alpha) * self.rtt as f64 + alpha * rtt as f64) as u64;
108        }
109
110        self.samples += 1;
111        self.last_sync = Instant::now();
112    }
113
114    /// Get estimated server time
115    pub fn server_time(&self) -> Timestamp {
116        let local = now();
117        (local as i64 + self.offset) as Timestamp
118    }
119
120    /// Convert local time to server time
121    pub fn to_server_time(&self, local: Timestamp) -> Timestamp {
122        (local as i64 + self.offset) as Timestamp
123    }
124
125    /// Convert server time to local time
126    pub fn to_local_time(&self, server: Timestamp) -> Timestamp {
127        (server as i64 - self.offset) as Timestamp
128    }
129
130    /// Get current offset estimate
131    pub fn offset(&self) -> i64 {
132        self.offset
133    }
134
135    /// Get current RTT estimate
136    pub fn rtt(&self) -> u64 {
137        self.rtt
138    }
139
140    /// Get current jitter estimate
141    pub fn jitter(&self) -> u64 {
142        self.jitter
143    }
144
145    /// Check if sync is needed (e.g., every 30 seconds)
146    pub fn needs_sync(&self, interval_secs: u64) -> bool {
147        self.samples == 0 || self.last_sync.elapsed().as_secs() >= interval_secs
148    }
149
150    /// Get sync quality (0.0 = poor, 1.0 = excellent)
151    pub fn quality(&self) -> f64 {
152        if self.samples == 0 {
153            return 0.0;
154        }
155
156        // Based on RTT and jitter
157        let rtt_score = (10000.0 - self.rtt.min(10000) as f64) / 10000.0;
158        let jitter_score = (1000.0 - self.jitter.min(1000) as f64) / 1000.0;
159        let sample_score = (self.samples.min(10) as f64) / 10.0;
160
161        (rtt_score * 0.4 + jitter_score * 0.4 + sample_score * 0.2).clamp(0.0, 1.0)
162    }
163}
164
165/// Session time tracker (time since session start)
166#[derive(Debug, Clone)]
167pub struct SessionTime {
168    start: Instant,
169    start_unix: Timestamp,
170}
171
172impl Default for SessionTime {
173    fn default() -> Self {
174        Self::new()
175    }
176}
177
178impl SessionTime {
179    /// Create a new session time tracker
180    pub fn new() -> Self {
181        Self {
182            start: Instant::now(),
183            start_unix: now(),
184        }
185    }
186
187    /// Get microseconds since session start
188    pub fn elapsed(&self) -> Timestamp {
189        self.start.elapsed().as_micros() as Timestamp
190    }
191
192    /// Get the session start time (Unix timestamp)
193    pub fn start_time(&self) -> Timestamp {
194        self.start_unix
195    }
196
197    /// Convert session time to Unix timestamp
198    pub fn to_unix(&self, session_time: Timestamp) -> Timestamp {
199        self.start_unix + session_time
200    }
201
202    /// Convert Unix timestamp to session time
203    pub fn from_unix(&self, unix_time: Timestamp) -> Timestamp {
204        unix_time.saturating_sub(self.start_unix)
205    }
206}
207
208/// Jitter buffer for smoothing high-rate streams
209#[derive(Debug)]
210pub struct JitterBuffer<T> {
211    buffer: Vec<(Timestamp, T)>,
212    capacity: usize,
213    window_us: u64,
214}
215
216impl<T: Clone> JitterBuffer<T> {
217    /// Create a new jitter buffer
218    ///
219    /// # Arguments
220    /// * `capacity` - Maximum number of items
221    /// * `window_ms` - Buffer window in milliseconds
222    pub fn new(capacity: usize, window_ms: u64) -> Self {
223        Self {
224            buffer: Vec::with_capacity(capacity),
225            capacity,
226            window_us: window_ms * 1000,
227        }
228    }
229
230    /// Add a sample with timestamp
231    pub fn push(&mut self, timestamp: Timestamp, value: T) {
232        // Remove old samples
233        let cutoff = now().saturating_sub(self.window_us);
234        self.buffer.retain(|(ts, _)| *ts > cutoff);
235
236        // Add new sample (maintain sorted order)
237        let pos = self.buffer.partition_point(|(ts, _)| *ts < timestamp);
238        if self.buffer.len() < self.capacity {
239            self.buffer.insert(pos, (timestamp, value));
240        } else if pos > 0 {
241            // Replace oldest
242            self.buffer.remove(0);
243            let new_pos = pos.saturating_sub(1);
244            self.buffer.insert(new_pos, (timestamp, value));
245        }
246    }
247
248    /// Get the next sample ready for playback
249    pub fn pop(&mut self, playback_time: Timestamp) -> Option<T> {
250        if let Some((ts, _)) = self.buffer.first() {
251            if *ts <= playback_time {
252                return Some(self.buffer.remove(0).1);
253            }
254        }
255        None
256    }
257
258    /// Get all samples ready for playback
259    pub fn drain_ready(&mut self, playback_time: Timestamp) -> Vec<T> {
260        let mut ready = Vec::new();
261        while let Some((ts, _)) = self.buffer.first() {
262            if *ts <= playback_time {
263                ready.push(self.buffer.remove(0).1);
264            } else {
265                break;
266            }
267        }
268        ready
269    }
270
271    /// Current buffer depth (samples)
272    pub fn len(&self) -> usize {
273        self.buffer.len()
274    }
275
276    /// Check if buffer is empty
277    pub fn is_empty(&self) -> bool {
278        self.buffer.is_empty()
279    }
280
281    /// Current buffer depth (time span in microseconds)
282    pub fn depth_us(&self) -> u64 {
283        if self.buffer.len() < 2 {
284            0
285        } else {
286            self.buffer.last().unwrap().0 - self.buffer.first().unwrap().0
287        }
288    }
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    /// A single sync exchange with a known skew yields exactly that offset and RTT.
    #[test]
    fn test_clock_sync() {
        let mut sync = ClockSync::new();

        // Simulated exchange: 25 µs of network latency each way, 1 µs of server
        // processing, and a server clock that reads 50 µs ahead of the client.
        let t1 = 1_000_000u64; // client send (client clock)
        let t2 = 1_000_075u64; // server receive (server clock)
        let t3 = 1_000_076u64; // server send (server clock)
        let t4 = 1_000_051u64; // client receive (client clock)

        sync.process_sync(t1, t2, t3, t4);

        assert_eq!(sync.samples, 1);
        assert_eq!(sync.rtt(), 50);
        assert_eq!(sync.offset(), 50);
        assert_eq!(sync.to_server_time(t1), t1 + 50);
        assert_eq!(sync.to_local_time(t1 + 50), t1);
        assert!(sync.quality() > 0.0);
    }

    /// Elapsed time advances and Unix/session conversions round-trip.
    #[test]
    fn test_session_time() {
        let session = SessionTime::new();

        std::thread::sleep(std::time::Duration::from_millis(10));

        let elapsed = session.elapsed();
        assert!(elapsed >= 10000); // At least 10ms in microseconds

        assert_eq!(session.to_unix(0), session.start_time());
        assert_eq!(session.from_unix(session.to_unix(elapsed)), elapsed);
        assert_eq!(session.from_unix(0), 0);
    }

    /// Out-of-order pushes come back out in timestamp order.
    #[test]
    fn test_jitter_buffer() {
        let mut buffer: JitterBuffer<f64> = JitterBuffer::new(10, 1_000);

        let base = now();
        buffer.push(base + 10000, 0.1);
        buffer.push(base + 20000, 0.2);
        buffer.push(base + 5000, 0.05); // Out of order

        assert_eq!(buffer.len(), 3);
        assert_eq!(buffer.depth_us(), 15000);

        // Should return in timestamp order
        assert_eq!(buffer.pop(base + 10000), Some(0.05));
        assert_eq!(buffer.pop(base + 10000), Some(0.1));
        // The remaining sample is not due yet.
        assert_eq!(buffer.pop(base + 10000), None);
        assert_eq!(buffer.len(), 1);

        assert_eq!(buffer.drain_ready(base + 20000), vec![0.2]);
        assert!(buffer.is_empty());
    }
}
337}