kizzasi_io/
sync.rs

1//! Multi-stream synchronization for time-aligned processing
2//!
3//! This module provides utilities for synchronizing multiple data streams
4//! based on timestamps, enabling precise multi-modal sensor fusion.
5
6use crate::error::{IoError, IoResult};
7use scirs2_core::ndarray::Array1;
8use std::collections::{HashMap, VecDeque};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
/// Timestamp type (microseconds since the Unix epoch).
pub type Timestamp = u64;
13
14/// Timestamped data sample
15#[derive(Debug, Clone)]
16pub struct TimestampedSample {
17    pub timestamp: Timestamp,
18    pub data: Array1<f32>,
19    pub stream_id: String,
20}
21
22impl TimestampedSample {
23    /// Create new timestamped sample
24    pub fn new(timestamp: Timestamp, data: Array1<f32>, stream_id: String) -> Self {
25        Self {
26            timestamp,
27            data,
28            stream_id,
29        }
30    }
31
32    /// Create with current time
33    pub fn now(data: Array1<f32>, stream_id: String) -> Self {
34        let timestamp = SystemTime::now()
35            .duration_since(UNIX_EPOCH)
36            .expect("System time must be after UNIX_EPOCH")
37            .as_micros() as u64;
38        Self::new(timestamp, data, stream_id)
39    }
40
41    /// Get age of sample
42    pub fn age(&self) -> Duration {
43        let now = SystemTime::now()
44            .duration_since(UNIX_EPOCH)
45            .unwrap()
46            .as_micros() as u64;
47        Duration::from_micros(now.saturating_sub(self.timestamp))
48    }
49}
50
/// Settings that control how a stream synchronizer buffers and aligns samples.
#[derive(Debug, Clone)]
pub struct SyncConfig {
    /// Largest timestamp gap, in microseconds, tolerated when aligning samples.
    pub max_time_diff: u64,
    /// Number of samples retained per stream.
    pub buffer_size: usize,
    /// How long to wait for samples before giving up.
    pub timeout: Duration,
    /// Strategy used to produce a value at the alignment timestamp.
    pub interpolation: InterpolationMethod,
}

/// Strategy for deriving a stream's value at a timestamp that may fall
/// between two buffered samples.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InterpolationMethod {
    /// Pick whichever buffered sample is closest in time.
    Nearest,
    /// Blend the surrounding samples linearly.
    Linear,
    /// Keep the most recent earlier value.
    Hold,
}

impl Default for SyncConfig {
    /// Defaults: 10 ms alignment window, 100 samples per stream,
    /// 100 ms timeout, linear interpolation.
    fn default() -> Self {
        let timeout = Duration::from_millis(100);
        let interpolation = InterpolationMethod::Linear;

        SyncConfig {
            // 10_000 microseconds == 10 ms
            max_time_diff: 10_000,
            buffer_size: 100,
            timeout,
            interpolation,
        }
    }
}
84
85/// Multi-stream synchronizer
86pub struct StreamSynchronizer {
87    config: SyncConfig,
88    buffers: HashMap<String, VecDeque<TimestampedSample>>,
89    last_sync_time: Option<Timestamp>,
90}
91
92impl StreamSynchronizer {
93    /// Create new synchronizer with configuration
94    pub fn new(config: SyncConfig) -> Self {
95        Self {
96            config,
97            buffers: HashMap::new(),
98            last_sync_time: None,
99        }
100    }
101
102    /// Add a stream to synchronize
103    pub fn add_stream(&mut self, stream_id: String) {
104        self.buffers
105            .entry(stream_id)
106            .or_insert_with(|| VecDeque::with_capacity(self.config.buffer_size));
107    }
108
109    /// Push sample from a stream
110    pub fn push(&mut self, sample: TimestampedSample) -> IoResult<()> {
111        let buffer = self
112            .buffers
113            .entry(sample.stream_id.clone())
114            .or_insert_with(|| VecDeque::with_capacity(self.config.buffer_size));
115
116        if buffer.len() >= self.config.buffer_size {
117            buffer.pop_front(); // Remove oldest
118        }
119
120        buffer.push_back(sample);
121        Ok(())
122    }
123
124    /// Try to get synchronized samples from all streams
125    /// Returns HashMap of stream_id -> interpolated sample
126    pub fn try_sync(&mut self) -> IoResult<HashMap<String, Array1<f32>>> {
127        if self.buffers.is_empty() {
128            return Err(IoError::InvalidConfig("No streams registered".to_string()));
129        }
130
131        // Find latest common timestamp where all streams have data
132        let target_time = self.find_sync_time()?;
133
134        // Interpolate samples for each stream at target time
135        let mut synced = HashMap::new();
136        for (stream_id, buffer) in &self.buffers {
137            let sample = self.interpolate_at_time(buffer, target_time)?;
138            synced.insert(stream_id.clone(), sample);
139        }
140
141        self.last_sync_time = Some(target_time);
142        Ok(synced)
143    }
144
145    /// Find a timestamp where all streams have data
146    fn find_sync_time(&self) -> IoResult<Timestamp> {
147        // Get the latest minimum timestamp across all streams
148        let mut min_max_time: Option<Timestamp> = None;
149
150        for buffer in self.buffers.values() {
151            if buffer.is_empty() {
152                return Err(IoError::BufferEmpty);
153            }
154
155            let max_time = buffer
156                .back()
157                .expect("Buffer must be non-empty (checked above)")
158                .timestamp;
159            min_max_time = Some(match min_max_time {
160                None => max_time,
161                Some(current) => current.min(max_time),
162            });
163        }
164
165        min_max_time.ok_or_else(|| IoError::BufferEmpty)
166    }
167
168    /// Interpolate sample at specific timestamp
169    fn interpolate_at_time(
170        &self,
171        buffer: &VecDeque<TimestampedSample>,
172        target_time: Timestamp,
173    ) -> IoResult<Array1<f32>> {
174        if buffer.is_empty() {
175            return Err(IoError::BufferEmpty);
176        }
177
178        // Find samples before and after target time
179        let mut before: Option<&TimestampedSample> = None;
180        let mut after: Option<&TimestampedSample> = None;
181
182        for sample in buffer {
183            if sample.timestamp <= target_time {
184                before = Some(sample);
185            }
186            if sample.timestamp >= target_time && after.is_none() {
187                after = Some(sample);
188                break;
189            }
190        }
191
192        match self.config.interpolation {
193            InterpolationMethod::Nearest => {
194                // Use nearest sample
195                let nearest = match (before, after) {
196                    (Some(b), Some(a)) => {
197                        let diff_before = target_time - b.timestamp;
198                        let diff_after = a.timestamp - target_time;
199                        if diff_before < diff_after {
200                            b
201                        } else {
202                            a
203                        }
204                    }
205                    (Some(b), None) => b,
206                    (None, Some(a)) => a,
207                    (None, None) => return Err(IoError::BufferEmpty),
208                };
209                Ok(nearest.data.clone())
210            }
211            InterpolationMethod::Linear => {
212                match (before, after) {
213                    (Some(b), Some(a)) if b.timestamp != a.timestamp => {
214                        // Linear interpolation
215                        let t =
216                            (target_time - b.timestamp) as f32 / (a.timestamp - b.timestamp) as f32;
217                        Ok(&b.data * (1.0 - t) + &a.data * t)
218                    }
219                    (Some(b), _) => Ok(b.data.clone()),
220                    (None, Some(a)) => Ok(a.data.clone()),
221                    _ => Err(IoError::BufferEmpty),
222                }
223            }
224            InterpolationMethod::Hold => {
225                // Use previous value
226                match before {
227                    Some(b) => Ok(b.data.clone()),
228                    None => match after {
229                        Some(a) => Ok(a.data.clone()),
230                        None => Err(IoError::BufferEmpty),
231                    },
232                }
233            }
234        }
235    }
236
237    /// Get number of buffered samples for a stream
238    pub fn buffer_len(&self, stream_id: &str) -> usize {
239        self.buffers.get(stream_id).map(|b| b.len()).unwrap_or(0)
240    }
241
242    /// Clear all buffers
243    pub fn clear(&mut self) {
244        for buffer in self.buffers.values_mut() {
245            buffer.clear();
246        }
247        self.last_sync_time = None;
248    }
249
250    /// Get registered stream IDs
251    pub fn stream_ids(&self) -> Vec<String> {
252        self.buffers.keys().cloned().collect()
253    }
254}
255
256impl Default for StreamSynchronizer {
257    fn default() -> Self {
258        Self::new(SyncConfig::default())
259    }
260}
261
262/// Time synchronizer using NTP-like algorithm
263pub struct TimeSynchronizer {
264    /// Offset from system time (microseconds)
265    offset: i64,
266    /// Round-trip time (microseconds)
267    rtt: u64,
268    /// Synchronization samples
269    samples: VecDeque<(i64, u64)>, // (offset, rtt)
270    /// Maximum samples to keep
271    max_samples: usize,
272}
273
274impl TimeSynchronizer {
275    /// Create new time synchronizer
276    pub fn new() -> Self {
277        Self {
278            offset: 0,
279            rtt: 0,
280            samples: VecDeque::with_capacity(10),
281            max_samples: 10,
282        }
283    }
284
285    /// Record synchronization sample
286    pub fn add_sample(
287        &mut self,
288        send_time: Timestamp,
289        recv_time: Timestamp,
290        remote_time: Timestamp,
291    ) {
292        let t1 = send_time as i64;
293        let t2 = remote_time as i64;
294        let t3 = recv_time as i64;
295
296        let offset = ((t2 - t1) + (t2 - t3)) / 2;
297        let rtt = (t3 - t1) as u64;
298
299        self.samples.push_back((offset, rtt));
300        if self.samples.len() > self.max_samples {
301            self.samples.pop_front();
302        }
303
304        self.update_estimate();
305    }
306
307    /// Update offset and RTT estimate
308    fn update_estimate(&mut self) {
309        if self.samples.is_empty() {
310            return;
311        }
312
313        // Use median for robustness
314        let mut offsets: Vec<i64> = self.samples.iter().map(|(o, _)| *o).collect();
315        offsets.sort_unstable();
316        self.offset = offsets[offsets.len() / 2];
317
318        let mut rtts: Vec<u64> = self.samples.iter().map(|(_, r)| *r).collect();
319        rtts.sort_unstable();
320        self.rtt = rtts[rtts.len() / 2];
321    }
322
323    /// Get synchronized timestamp
324    pub fn synchronized_time(&self) -> Timestamp {
325        let now = SystemTime::now()
326            .duration_since(UNIX_EPOCH)
327            .expect("System time must be after UNIX_EPOCH")
328            .as_micros() as u64;
329
330        (now as i64 + self.offset) as u64
331    }
332
333    /// Get current offset
334    pub fn offset(&self) -> i64 {
335        self.offset
336    }
337
338    /// Get current RTT
339    pub fn rtt(&self) -> u64 {
340        self.rtt
341    }
342}
343
344impl Default for TimeSynchronizer {
345    fn default() -> Self {
346        Self::new()
347    }
348}
349
/// Phase-locked loop for maintaining stream synchronization.
///
/// Implemented as a discrete PID controller on the phase error
/// `target_phase - measured_phase`.
#[derive(Debug, Clone)]
pub struct PhaseLockLoop {
    /// Target phase (timestamp difference).
    target_phase: i64,
    /// Running sum of phase errors (integral term state).
    integral: f64,
    /// Proportional gain.
    kp: f64,
    /// Integral gain.
    ki: f64,
    /// Derivative gain.
    kd: f64,
    /// Error from the previous update (derivative term state).
    last_error: f64,
}

impl PhaseLockLoop {
    /// Create a PLL with the given PID gains, a target phase of zero and
    /// cleared controller state.
    pub fn new(kp: f64, ki: f64, kd: f64) -> Self {
        Self {
            target_phase: 0,
            integral: 0.0,
            kp,
            ki,
            kd,
            last_error: 0.0,
        }
    }

    /// Feed a phase measurement and return the correction
    /// `kp * error + ki * integral + kd * derivative`.
    ///
    /// `error` is `target_phase - measured_phase`, `integral` is the running
    /// sum of errors, and `derivative` is the change in error since the
    /// previous call (the previous error is zero after construction or reset).
    pub fn update(&mut self, measured_phase: i64) -> f64 {
        // Subtract in i128 so extreme phases cannot overflow i64; for in-range
        // differences the resulting f64 is the same as an i64 subtraction gives.
        let error = (i128::from(self.target_phase) - i128::from(measured_phase)) as f64;

        self.integral += error;
        let derivative = error - self.last_error;

        let correction = self.kp * error + self.ki * self.integral + self.kd * derivative;

        self.last_error = error;
        correction
    }

    /// Set the target phase.
    pub fn set_target_phase(&mut self, phase: i64) {
        self.target_phase = phase;
    }

    /// Clear the controller state: both the error integral and the stored
    /// previous error. Gains and target phase are left unchanged.
    pub fn reset(&mut self) {
        self.integral = 0.0;
        self.last_error = 0.0;
    }
}

impl Default for PhaseLockLoop {
    /// PLL with gains `kp = 1.0`, `ki = 0.1`, `kd = 0.01`.
    fn default() -> Self {
        Self::new(1.0, 0.1, 0.01)
    }
}
410
#[cfg(test)]
mod tests {
    use super::*;

    /// `now` stamps the sample with a non-zero time and keeps payload and id intact.
    #[test]
    fn test_timestamped_sample() {
        let data = Array1::from_vec(vec![1.0, 2.0, 3.0]);
        let sample = TimestampedSample::now(data.clone(), "test".to_string());

        assert_eq!(sample.stream_id, "test");
        assert_eq!(sample.data, data);
        assert!(sample.timestamp > 0);
    }

    /// Two streams with identical timestamps sync on the newest sample of each.
    #[test]
    fn test_stream_synchronizer() {
        let mut sync = StreamSynchronizer::default();
        sync.add_stream("stream1".to_string());
        sync.add_stream("stream2".to_string());

        let base_time = 1000000u64;

        // Add samples to both streams
        for i in 0..5 {
            let time = base_time + i * 1000;
            let data1 = Array1::from_vec(vec![i as f32]);
            let data2 = Array1::from_vec(vec![(i * 2) as f32]);

            sync.push(TimestampedSample::new(time, data1, "stream1".to_string()))
                .unwrap();
            sync.push(TimestampedSample::new(time, data2, "stream2".to_string()))
                .unwrap();
        }

        assert_eq!(sync.buffer_len("stream1"), 5);
        assert_eq!(sync.buffer_len("stream2"), 5);
        assert_eq!(sync.buffer_len("unknown"), 0);

        let synced = sync.try_sync().expect("both streams have samples");
        assert_eq!(synced.len(), 2);

        // The common timestamp is the last one pushed, so each stream yields
        // its final sample unchanged.
        assert_eq!(synced["stream1"], Array1::from_vec(vec![4.0_f32]));
        assert_eq!(synced["stream2"], Array1::from_vec(vec![8.0_f32]));
    }

    /// With the default (linear) method, a stream is blended halfway between
    /// its two samples when the common timestamp falls midway.
    #[test]
    fn test_stream_synchronizer_linear_interpolation() {
        let mut sync = StreamSynchronizer::default();

        sync.push(TimestampedSample::new(
            0,
            Array1::from_vec(vec![0.0_f32]),
            "a".to_string(),
        ))
        .unwrap();
        sync.push(TimestampedSample::new(
            10,
            Array1::from_vec(vec![10.0_f32]),
            "a".to_string(),
        ))
        .unwrap();
        sync.push(TimestampedSample::new(
            5,
            Array1::from_vec(vec![1.0_f32]),
            "b".to_string(),
        ))
        .unwrap();

        let synced = sync.try_sync().expect("both streams have samples");
        assert_eq!(synced["a"], Array1::from_vec(vec![5.0_f32]));
        assert_eq!(synced["b"], Array1::from_vec(vec![1.0_f32]));
    }

    /// Syncing with no registered streams is an error.
    #[test]
    fn test_stream_synchronizer_without_streams() {
        let mut sync = StreamSynchronizer::default();
        assert!(sync.try_sync().is_err());
    }

    /// Offset and RTT follow directly from the exchange timestamps.
    #[test]
    fn test_time_synchronizer() {
        let mut sync = TimeSynchronizer::new();

        // Fixed base keeps the test deterministic.
        let now = 1_700_000_000_000_000u64;

        // Client sends at t1=now, server responds with t2=now+600 (server is ahead),
        // client receives at t3=now+1000:
        //   offset = ((600) + (600 - 1000)) / 2 = 100, rtt = 1000
        sync.add_sample(now, now + 1000, now + 600);
        sync.add_sample(now + 10000, now + 11000, now + 10600);

        assert_eq!(sync.offset(), 100);
        assert_eq!(sync.rtt(), 1000);
    }

    /// Corrections shrink in magnitude as the measured phase nears the target.
    #[test]
    fn test_pll() {
        let mut pll = PhaseLockLoop::default();
        pll.set_target_phase(0);

        let correction1 = pll.update(100);
        let correction2 = pll.update(50);
        let correction3 = pll.update(10);

        // Corrections should decrease as we approach target
        assert!(correction1.abs() > correction2.abs());
        assert!(correction2.abs() > correction3.abs());

        // Hand-computed with gains (1.0, 0.1, 0.01):
        // error=-100, integral=-100, derivative=-100 -> -111.0
        assert!((correction1 - (-111.0)).abs() < 1e-9);
    }
}