1use crate::error::{IoError, IoResult};
7use scirs2_core::ndarray::Array1;
8use std::collections::{HashMap, VecDeque};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11pub type Timestamp = u64;
13
14#[derive(Debug, Clone)]
16pub struct TimestampedSample {
17 pub timestamp: Timestamp,
18 pub data: Array1<f32>,
19 pub stream_id: String,
20}
21
22impl TimestampedSample {
23 pub fn new(timestamp: Timestamp, data: Array1<f32>, stream_id: String) -> Self {
25 Self {
26 timestamp,
27 data,
28 stream_id,
29 }
30 }
31
32 pub fn now(data: Array1<f32>, stream_id: String) -> Self {
34 let timestamp = SystemTime::now()
35 .duration_since(UNIX_EPOCH)
36 .expect("System time must be after UNIX_EPOCH")
37 .as_micros() as u64;
38 Self::new(timestamp, data, stream_id)
39 }
40
41 pub fn age(&self) -> Duration {
43 let now = SystemTime::now()
44 .duration_since(UNIX_EPOCH)
45 .unwrap()
46 .as_micros() as u64;
47 Duration::from_micros(now.saturating_sub(self.timestamp))
48 }
49}
50
51#[derive(Debug, Clone)]
53pub struct SyncConfig {
54 pub max_time_diff: u64,
56 pub buffer_size: usize,
58 pub timeout: Duration,
60 pub interpolation: InterpolationMethod,
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
65pub enum InterpolationMethod {
66 Nearest,
68 Linear,
70 Hold,
72}
73
74impl Default for SyncConfig {
75 fn default() -> Self {
76 Self {
77 max_time_diff: 10_000, buffer_size: 100,
79 timeout: Duration::from_millis(100),
80 interpolation: InterpolationMethod::Linear,
81 }
82 }
83}
84
85pub struct StreamSynchronizer {
87 config: SyncConfig,
88 buffers: HashMap<String, VecDeque<TimestampedSample>>,
89 last_sync_time: Option<Timestamp>,
90}
91
92impl StreamSynchronizer {
93 pub fn new(config: SyncConfig) -> Self {
95 Self {
96 config,
97 buffers: HashMap::new(),
98 last_sync_time: None,
99 }
100 }
101
102 pub fn add_stream(&mut self, stream_id: String) {
104 self.buffers
105 .entry(stream_id)
106 .or_insert_with(|| VecDeque::with_capacity(self.config.buffer_size));
107 }
108
109 pub fn push(&mut self, sample: TimestampedSample) -> IoResult<()> {
111 let buffer = self
112 .buffers
113 .entry(sample.stream_id.clone())
114 .or_insert_with(|| VecDeque::with_capacity(self.config.buffer_size));
115
116 if buffer.len() >= self.config.buffer_size {
117 buffer.pop_front(); }
119
120 buffer.push_back(sample);
121 Ok(())
122 }
123
124 pub fn try_sync(&mut self) -> IoResult<HashMap<String, Array1<f32>>> {
127 if self.buffers.is_empty() {
128 return Err(IoError::InvalidConfig("No streams registered".to_string()));
129 }
130
131 let target_time = self.find_sync_time()?;
133
134 let mut synced = HashMap::new();
136 for (stream_id, buffer) in &self.buffers {
137 let sample = self.interpolate_at_time(buffer, target_time)?;
138 synced.insert(stream_id.clone(), sample);
139 }
140
141 self.last_sync_time = Some(target_time);
142 Ok(synced)
143 }
144
145 fn find_sync_time(&self) -> IoResult<Timestamp> {
147 let mut min_max_time: Option<Timestamp> = None;
149
150 for buffer in self.buffers.values() {
151 if buffer.is_empty() {
152 return Err(IoError::BufferEmpty);
153 }
154
155 let max_time = buffer
156 .back()
157 .expect("Buffer must be non-empty (checked above)")
158 .timestamp;
159 min_max_time = Some(match min_max_time {
160 None => max_time,
161 Some(current) => current.min(max_time),
162 });
163 }
164
165 min_max_time.ok_or_else(|| IoError::BufferEmpty)
166 }
167
168 fn interpolate_at_time(
170 &self,
171 buffer: &VecDeque<TimestampedSample>,
172 target_time: Timestamp,
173 ) -> IoResult<Array1<f32>> {
174 if buffer.is_empty() {
175 return Err(IoError::BufferEmpty);
176 }
177
178 let mut before: Option<&TimestampedSample> = None;
180 let mut after: Option<&TimestampedSample> = None;
181
182 for sample in buffer {
183 if sample.timestamp <= target_time {
184 before = Some(sample);
185 }
186 if sample.timestamp >= target_time && after.is_none() {
187 after = Some(sample);
188 break;
189 }
190 }
191
192 match self.config.interpolation {
193 InterpolationMethod::Nearest => {
194 let nearest = match (before, after) {
196 (Some(b), Some(a)) => {
197 let diff_before = target_time - b.timestamp;
198 let diff_after = a.timestamp - target_time;
199 if diff_before < diff_after {
200 b
201 } else {
202 a
203 }
204 }
205 (Some(b), None) => b,
206 (None, Some(a)) => a,
207 (None, None) => return Err(IoError::BufferEmpty),
208 };
209 Ok(nearest.data.clone())
210 }
211 InterpolationMethod::Linear => {
212 match (before, after) {
213 (Some(b), Some(a)) if b.timestamp != a.timestamp => {
214 let t =
216 (target_time - b.timestamp) as f32 / (a.timestamp - b.timestamp) as f32;
217 Ok(&b.data * (1.0 - t) + &a.data * t)
218 }
219 (Some(b), _) => Ok(b.data.clone()),
220 (None, Some(a)) => Ok(a.data.clone()),
221 _ => Err(IoError::BufferEmpty),
222 }
223 }
224 InterpolationMethod::Hold => {
225 match before {
227 Some(b) => Ok(b.data.clone()),
228 None => match after {
229 Some(a) => Ok(a.data.clone()),
230 None => Err(IoError::BufferEmpty),
231 },
232 }
233 }
234 }
235 }
236
237 pub fn buffer_len(&self, stream_id: &str) -> usize {
239 self.buffers.get(stream_id).map(|b| b.len()).unwrap_or(0)
240 }
241
242 pub fn clear(&mut self) {
244 for buffer in self.buffers.values_mut() {
245 buffer.clear();
246 }
247 self.last_sync_time = None;
248 }
249
250 pub fn stream_ids(&self) -> Vec<String> {
252 self.buffers.keys().cloned().collect()
253 }
254}
255
256impl Default for StreamSynchronizer {
257 fn default() -> Self {
258 Self::new(SyncConfig::default())
259 }
260}
261
262pub struct TimeSynchronizer {
264 offset: i64,
266 rtt: u64,
268 samples: VecDeque<(i64, u64)>, max_samples: usize,
272}
273
274impl TimeSynchronizer {
275 pub fn new() -> Self {
277 Self {
278 offset: 0,
279 rtt: 0,
280 samples: VecDeque::with_capacity(10),
281 max_samples: 10,
282 }
283 }
284
285 pub fn add_sample(
287 &mut self,
288 send_time: Timestamp,
289 recv_time: Timestamp,
290 remote_time: Timestamp,
291 ) {
292 let t1 = send_time as i64;
293 let t2 = remote_time as i64;
294 let t3 = recv_time as i64;
295
296 let offset = ((t2 - t1) + (t2 - t3)) / 2;
297 let rtt = (t3 - t1) as u64;
298
299 self.samples.push_back((offset, rtt));
300 if self.samples.len() > self.max_samples {
301 self.samples.pop_front();
302 }
303
304 self.update_estimate();
305 }
306
307 fn update_estimate(&mut self) {
309 if self.samples.is_empty() {
310 return;
311 }
312
313 let mut offsets: Vec<i64> = self.samples.iter().map(|(o, _)| *o).collect();
315 offsets.sort_unstable();
316 self.offset = offsets[offsets.len() / 2];
317
318 let mut rtts: Vec<u64> = self.samples.iter().map(|(_, r)| *r).collect();
319 rtts.sort_unstable();
320 self.rtt = rtts[rtts.len() / 2];
321 }
322
323 pub fn synchronized_time(&self) -> Timestamp {
325 let now = SystemTime::now()
326 .duration_since(UNIX_EPOCH)
327 .expect("System time must be after UNIX_EPOCH")
328 .as_micros() as u64;
329
330 (now as i64 + self.offset) as u64
331 }
332
333 pub fn offset(&self) -> i64 {
335 self.offset
336 }
337
338 pub fn rtt(&self) -> u64 {
340 self.rtt
341 }
342}
343
344impl Default for TimeSynchronizer {
345 fn default() -> Self {
346 Self::new()
347 }
348}
349
350pub struct PhaseLockLoop {
352 target_phase: i64,
354 integral: f64,
356 kp: f64,
358 ki: f64,
360 kd: f64,
362 last_error: f64,
364}
365
366impl PhaseLockLoop {
367 pub fn new(kp: f64, ki: f64, kd: f64) -> Self {
369 Self {
370 target_phase: 0,
371 integral: 0.0,
372 kp,
373 ki,
374 kd,
375 last_error: 0.0,
376 }
377 }
378
379 pub fn update(&mut self, measured_phase: i64) -> f64 {
381 let error = (self.target_phase - measured_phase) as f64;
382
383 self.integral += error;
385 let derivative = error - self.last_error;
386
387 let correction = self.kp * error + self.ki * self.integral + self.kd * derivative;
388
389 self.last_error = error;
390 correction
391 }
392
393 pub fn set_target_phase(&mut self, phase: i64) {
395 self.target_phase = phase;
396 }
397
398 pub fn reset(&mut self) {
400 self.integral = 0.0;
401 self.last_error = 0.0;
402 }
403}
404
405impl Default for PhaseLockLoop {
406 fn default() -> Self {
407 Self::new(1.0, 0.1, 0.01)
408 }
409}
410
411#[cfg(test)]
412mod tests {
413 use super::*;
414
415 #[test]
416 fn test_timestamped_sample() {
417 let data = Array1::from_vec(vec![1.0, 2.0, 3.0]);
418 let sample = TimestampedSample::now(data.clone(), "test".to_string());
419
420 assert_eq!(sample.stream_id, "test");
421 assert_eq!(sample.data, data);
422 assert!(sample.timestamp > 0);
423 }
424
425 #[test]
426 fn test_stream_synchronizer() {
427 let mut sync = StreamSynchronizer::default();
428 sync.add_stream("stream1".to_string());
429 sync.add_stream("stream2".to_string());
430
431 let base_time = 1000000u64;
432
433 for i in 0..5 {
435 let time = base_time + i * 1000;
436 let data1 = Array1::from_vec(vec![i as f32]);
437 let data2 = Array1::from_vec(vec![(i * 2) as f32]);
438
439 sync.push(TimestampedSample::new(time, data1, "stream1".to_string()))
440 .unwrap();
441 sync.push(TimestampedSample::new(time, data2, "stream2".to_string()))
442 .unwrap();
443 }
444
445 let result = sync.try_sync();
447 assert!(result.is_ok());
448
449 let synced = result.unwrap();
450 assert_eq!(synced.len(), 2);
451 assert!(synced.contains_key("stream1"));
452 assert!(synced.contains_key("stream2"));
453 }
454
455 #[test]
456 fn test_time_synchronizer() {
457 let mut sync = TimeSynchronizer::new();
458
459 let now = SystemTime::now()
460 .duration_since(UNIX_EPOCH)
461 .unwrap()
462 .as_micros() as u64;
463
464 sync.add_sample(now, now + 1000, now + 600);
468 sync.add_sample(now + 10000, now + 11000, now + 10600);
469
470 assert!(sync.offset().abs() > 0);
472 assert!(sync.rtt() > 0);
473 }
474
475 #[test]
476 fn test_pll() {
477 let mut pll = PhaseLockLoop::default();
478 pll.set_target_phase(0);
479
480 let correction1 = pll.update(100);
482 let correction2 = pll.update(50);
483 let correction3 = pll.update(10);
484
485 assert!(correction1.abs() > correction2.abs());
487 assert!(correction2.abs() > correction3.abs());
488 }
489}