1use std::collections::VecDeque;
2use std::sync::OnceLock;
3use std::time::{Duration, Instant};
4
5static PROCESS_START: OnceLock<Instant> = OnceLock::new();
8static PROCESS_START_TIME: OnceLock<Instant> = OnceLock::new();
9
10#[derive(Debug)]
18pub struct ClockSynchronizer {
19 window_size: usize,
21 history: VecDeque<(u64, Instant)>,
23 #[allow(dead_code)]
25 baseline_established: bool,
26 estimated_slope: f64,
28 estimated_offset: f64,
30}
31
32impl ClockSynchronizer {
33 pub fn new(window_size: usize) -> Self {
34 Self {
35 window_size: window_size.max(2), history: VecDeque::with_capacity(window_size),
37 baseline_established: false,
38 estimated_slope: 1.0,
39 estimated_offset: 0.0,
40 }
41 }
42
43 pub fn correct(&mut self, hw_ns: u64, arrival_time: Instant) -> Duration {
48 if self.history.len() >= self.window_size {
50 self.history.pop_front();
51 }
52 self.history.push_back((hw_ns, arrival_time));
53
54 if self.history.len() < 5 {
56 if let Some(start_time) = self.history.front() {
58 let elapsed_hw = hw_ns.saturating_sub(start_time.0);
60 return start_time.1.elapsed() + Duration::from_nanos(elapsed_hw);
61 }
63 return self.instant_to_duration(arrival_time);
66 }
67
68 self.recalculate_regression();
73
74 let (base_hw, base_sys) = self.history.front().unwrap();
76 let dx = (hw_ns as f64) - (*base_hw as f64);
77 let predicted_dy_ns = self.estimated_slope * dx + self.estimated_offset;
78
79 let base_sys_dur = self.instant_to_duration(*base_sys);
80 base_sys_dur + Duration::from_nanos(predicted_dy_ns as u64)
81 }
82
83 fn recalculate_regression(&mut self) {
85 let n = self.history.len() as f64;
86 let (base_hw, base_sys) = self.history.front().unwrap();
87 let base_sys_scalar = self.instant_to_scalar(*base_sys);
88
89 let mut sum_x = 0.0;
90 let mut sum_y = 0.0;
91 let mut sum_xy = 0.0;
92 let mut sum_xx = 0.0;
93
94 for (hw, sys) in &self.history {
95 let x = (*hw as f64) - (*base_hw as f64);
96 let y = self.instant_to_scalar(*sys) - base_sys_scalar;
97
98 sum_x += x;
99 sum_y += y;
100 sum_xy += x * y;
101 sum_xx += x * x;
102 }
103
104 let denominator = n * sum_xx - sum_x * sum_x;
105 if denominator.abs() < 1e-6 {
106 self.estimated_slope = 1.0;
108 self.estimated_offset = 0.0;
109 } else {
110 self.estimated_slope = (n * sum_xy - sum_x * sum_y) / denominator;
111 self.estimated_offset = (sum_y * sum_xx - sum_x * sum_xy) / denominator;
112 }
113 }
114
115 fn instant_to_scalar(&self, t: Instant) -> f64 {
117 let start_time = PROCESS_START.get_or_init(Instant::now);
129
130 if t >= *start_time {
131 t.duration_since(*start_time).as_secs_f64()
133 } else {
134 -(start_time.duration_since(t).as_secs_f64())
137 }
138 }
139
140 fn instant_to_duration(&self, t: Instant) -> Duration {
141 let anchor = PROCESS_START_TIME.get_or_init(Instant::now);
150
151 t.saturating_duration_since(*anchor)
153 }
154}