Skip to main content

clasp_core/
time.rs

1//! Timing utilities for Clasp
2//!
3//! Provides clock synchronization and timestamp handling.
4
5use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
6
/// Timestamp type (microseconds)
///
/// A plain `u64` alias; functions in this module use it both for absolute
/// Unix times and for elapsed/relative microsecond counts.
pub type Timestamp = u64;
9
10/// Get current Unix timestamp in microseconds
11///
12/// Falls back to 0 if system time is before UNIX_EPOCH (edge case on misconfigured systems)
13pub fn now() -> Timestamp {
14    SystemTime::now()
15        .duration_since(UNIX_EPOCH)
16        .map(|d| d.as_micros() as Timestamp)
17        .unwrap_or(0)
18}
19
20/// Convert microseconds to Duration
21pub fn to_duration(micros: Timestamp) -> Duration {
22    Duration::from_micros(micros)
23}
24
25/// Convert Duration to microseconds
26pub fn from_duration(duration: Duration) -> Timestamp {
27    duration.as_micros() as Timestamp
28}
29
30/// Clock synchronization state
31#[derive(Debug, Clone)]
32pub struct ClockSync {
33    /// Estimated offset from server time (microseconds)
34    offset: i64,
35    /// Round-trip time (microseconds)
36    rtt: u64,
37    /// Jitter estimate (microseconds)
38    jitter: u64,
39    /// Number of sync samples
40    samples: u32,
41    /// Last sync time (local)
42    last_sync: Instant,
43    /// Recent RTT samples for jitter calculation
44    rtt_history: Vec<u64>,
45}
46
47impl Default for ClockSync {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl ClockSync {
54    /// Create a new clock sync instance
55    pub fn new() -> Self {
56        Self {
57            offset: 0,
58            rtt: 0,
59            jitter: 0,
60            samples: 0,
61            last_sync: Instant::now(),
62            rtt_history: Vec::with_capacity(10),
63        }
64    }
65
66    /// Process a sync response
67    ///
68    /// # Arguments
69    /// * `t1` - Client send time
70    /// * `t2` - Server receive time
71    /// * `t3` - Server send time
72    /// * `t4` - Client receive time
73    pub fn process_sync(&mut self, t1: u64, t2: u64, t3: u64, t4: u64) {
74        // Calculate round-trip time
75        let rtt = (t4 - t1) - (t3 - t2);
76
77        // Calculate offset using NTP algorithm
78        let offset = ((t2 as i64 - t1 as i64) + (t3 as i64 - t4 as i64)) / 2;
79
80        // Update RTT history
81        self.rtt_history.push(rtt);
82        if self.rtt_history.len() > 10 {
83            self.rtt_history.remove(0);
84        }
85
86        // Calculate jitter (variance of RTT)
87        if self.rtt_history.len() >= 2 {
88            let mean: u64 = self.rtt_history.iter().sum::<u64>() / self.rtt_history.len() as u64;
89            let variance: u64 = self
90                .rtt_history
91                .iter()
92                .map(|&x| {
93                    let diff = x as i64 - mean as i64;
94                    (diff * diff) as u64
95                })
96                .sum::<u64>()
97                / self.rtt_history.len() as u64;
98            self.jitter = (variance as f64).sqrt() as u64;
99        }
100
101        // Use exponential moving average for offset
102        if self.samples == 0 {
103            self.offset = offset;
104            self.rtt = rtt;
105        } else {
106            // Weight newer samples more
107            let alpha = 0.3;
108            self.offset = ((1.0 - alpha) * self.offset as f64 + alpha * offset as f64) as i64;
109            self.rtt = ((1.0 - alpha) * self.rtt as f64 + alpha * rtt as f64) as u64;
110        }
111
112        self.samples += 1;
113        self.last_sync = Instant::now();
114    }
115
116    /// Get estimated server time
117    pub fn server_time(&self) -> Timestamp {
118        let local = now();
119        (local as i64 + self.offset) as Timestamp
120    }
121
122    /// Convert local time to server time
123    pub fn to_server_time(&self, local: Timestamp) -> Timestamp {
124        (local as i64 + self.offset) as Timestamp
125    }
126
127    /// Convert server time to local time
128    pub fn to_local_time(&self, server: Timestamp) -> Timestamp {
129        (server as i64 - self.offset) as Timestamp
130    }
131
132    /// Get current offset estimate
133    pub fn offset(&self) -> i64 {
134        self.offset
135    }
136
137    /// Get current RTT estimate
138    pub fn rtt(&self) -> u64 {
139        self.rtt
140    }
141
142    /// Get current jitter estimate
143    pub fn jitter(&self) -> u64 {
144        self.jitter
145    }
146
147    /// Check if sync is needed (e.g., every 30 seconds)
148    pub fn needs_sync(&self, interval_secs: u64) -> bool {
149        self.samples == 0 || self.last_sync.elapsed().as_secs() >= interval_secs
150    }
151
152    /// Get sync quality (0.0 = poor, 1.0 = excellent)
153    pub fn quality(&self) -> f64 {
154        if self.samples == 0 {
155            return 0.0;
156        }
157
158        // Based on RTT and jitter
159        let rtt_score = (10000.0 - self.rtt.min(10000) as f64) / 10000.0;
160        let jitter_score = (1000.0 - self.jitter.min(1000) as f64) / 1000.0;
161        let sample_score = (self.samples.min(10) as f64) / 10.0;
162
163        (rtt_score * 0.4 + jitter_score * 0.4 + sample_score * 0.2).clamp(0.0, 1.0)
164    }
165}
166
167/// Session time tracker (time since session start)
168#[derive(Debug, Clone)]
169pub struct SessionTime {
170    start: Instant,
171    start_unix: Timestamp,
172}
173
174impl Default for SessionTime {
175    fn default() -> Self {
176        Self::new()
177    }
178}
179
180impl SessionTime {
181    /// Create a new session time tracker
182    pub fn new() -> Self {
183        Self {
184            start: Instant::now(),
185            start_unix: now(),
186        }
187    }
188
189    /// Get microseconds since session start
190    pub fn elapsed(&self) -> Timestamp {
191        self.start.elapsed().as_micros() as Timestamp
192    }
193
194    /// Get the session start time (Unix timestamp)
195    pub fn start_time(&self) -> Timestamp {
196        self.start_unix
197    }
198
199    /// Convert session time to Unix timestamp
200    pub fn to_unix(&self, session_time: Timestamp) -> Timestamp {
201        self.start_unix + session_time
202    }
203
204    /// Convert Unix timestamp to session time
205    pub fn from_unix(&self, unix_time: Timestamp) -> Timestamp {
206        unix_time.saturating_sub(self.start_unix)
207    }
208}
209
210/// Jitter buffer for smoothing high-rate streams
211#[derive(Debug)]
212pub struct JitterBuffer<T> {
213    buffer: Vec<(Timestamp, T)>,
214    capacity: usize,
215    window_us: u64,
216}
217
218impl<T: Clone> JitterBuffer<T> {
219    /// Create a new jitter buffer
220    ///
221    /// # Arguments
222    /// * `capacity` - Maximum number of items
223    /// * `window_ms` - Buffer window in milliseconds
224    pub fn new(capacity: usize, window_ms: u64) -> Self {
225        Self {
226            buffer: Vec::with_capacity(capacity),
227            capacity,
228            window_us: window_ms * 1000,
229        }
230    }
231
232    /// Add a sample with timestamp
233    pub fn push(&mut self, timestamp: Timestamp, value: T) {
234        // Remove old samples
235        let cutoff = now().saturating_sub(self.window_us);
236        self.buffer.retain(|(ts, _)| *ts > cutoff);
237
238        // Add new sample (maintain sorted order)
239        let pos = self.buffer.partition_point(|(ts, _)| *ts < timestamp);
240        if self.buffer.len() < self.capacity {
241            self.buffer.insert(pos, (timestamp, value));
242        } else if pos > 0 {
243            // Replace oldest
244            self.buffer.remove(0);
245            let new_pos = pos.saturating_sub(1);
246            self.buffer.insert(new_pos, (timestamp, value));
247        }
248    }
249
250    /// Get the next sample ready for playback
251    pub fn pop(&mut self, playback_time: Timestamp) -> Option<T> {
252        if let Some((ts, _)) = self.buffer.first() {
253            if *ts <= playback_time {
254                return Some(self.buffer.remove(0).1);
255            }
256        }
257        None
258    }
259
260    /// Get all samples ready for playback
261    pub fn drain_ready(&mut self, playback_time: Timestamp) -> Vec<T> {
262        let mut ready = Vec::new();
263        while let Some((ts, _)) = self.buffer.first() {
264            if *ts <= playback_time {
265                ready.push(self.buffer.remove(0).1);
266            } else {
267                break;
268            }
269        }
270        ready
271    }
272
273    /// Current buffer depth (samples)
274    pub fn len(&self) -> usize {
275        self.buffer.len()
276    }
277
278    /// Check if buffer is empty
279    pub fn is_empty(&self) -> bool {
280        self.buffer.is_empty()
281    }
282
283    /// Current buffer depth (time span in microseconds)
284    pub fn depth_us(&self) -> u64 {
285        match (self.buffer.first(), self.buffer.last()) {
286            (Some(first), Some(last)) if self.buffer.len() >= 2 => last.0 - first.0,
287            _ => 0,
288        }
289    }
290}
291
#[cfg(test)]
mod tests {
    use super::*;

    /// A single sync exchange seeds the RTT and offset estimates directly.
    #[test]
    fn test_clock_sync() {
        let mut sync = ClockSync::new();
        assert!(sync.needs_sync(30));

        // Simulate sync exchange
        // Client sends at T1, server receives at T2, server sends at T3, client receives at T4.
        // The two legs differ by 50µs and -49µs, so the NTP formula yields an
        // offset of (50 - 49) / 2 = 0 and an RTT of 100 - 1 = 99µs.
        let t1 = 1_000_000u64;
        let t2 = 1_000_050u64;
        let t3 = 1_000_051u64;
        let t4 = 1_000_100u64;

        sync.process_sync(t1, t2, t3, t4);

        assert_eq!(sync.samples, 1);
        assert_eq!(sync.rtt(), 99);
        assert_eq!(sync.offset(), 0);
        assert_eq!(sync.to_server_time(t4), t4);
        assert!(!sync.needs_sync(30));
    }

    /// Elapsed session time advances with real time and Unix conversions round-trip.
    #[test]
    fn test_session_time() {
        let session = SessionTime::new();

        std::thread::sleep(std::time::Duration::from_millis(10));

        let elapsed = session.elapsed();
        assert!(elapsed >= 10000); // At least 10ms in microseconds

        assert_eq!(session.to_unix(0), session.start_time());
        assert_eq!(session.from_unix(session.to_unix(1234)), 1234);
    }

    /// Samples pushed out of order come back in timestamp order.
    #[test]
    fn test_jitter_buffer() {
        let mut buffer: JitterBuffer<f64> = JitterBuffer::new(10, 100);

        let base = now();
        buffer.push(base + 10000, 0.1);
        buffer.push(base + 20000, 0.2);
        buffer.push(base + 5000, 0.05); // Out of order

        assert_eq!(buffer.len(), 3);
        assert_eq!(buffer.depth_us(), 15000);

        // Should return in timestamp order
        let first = buffer.pop(base + 10000);
        assert_eq!(first, Some(0.05));

        // Nothing is ready before the earliest remaining timestamp.
        assert_eq!(buffer.pop(base), None);

        // The remaining samples drain in timestamp order.
        assert_eq!(buffer.drain_ready(base + 20000), vec![0.1, 0.2]);
        assert!(buffer.is_empty());
    }
}