tycho_util/time/
mod.rs

1use std::collections::VecDeque;
2use std::sync::LazyLock;
3use std::time::{Duration, Instant};
4
5pub use p2::*;
6
7mod p2;
8
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// The 64-bit second count is narrowed with `as u32`, so only its low 32 bits
/// are kept.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
pub fn now_sec() -> u32 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap();
    since_epoch.as_secs() as u32
}
15
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// The 128-bit millisecond count is narrowed with `as u64`.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
pub fn now_millis() -> u64 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap();
    since_epoch.as_millis() as u64
}
22
23pub fn shifted_interval(period: Duration, max_shift: Duration) -> tokio::time::Interval {
24    let shift = rand::random_range(Duration::ZERO..max_shift);
25    tokio::time::interval_at(tokio::time::Instant::now() + shift, period + shift)
26}
27
28pub fn shifted_interval_immediate(period: Duration, max_shift: Duration) -> tokio::time::Interval {
29    let shift = rand::random_range(Duration::ZERO..max_shift);
30    tokio::time::interval(period + shift)
31}
32
/// Abstraction over a source of [`Instant`]s.
///
/// Lets time-dependent code be driven by something other than the real
/// system clock, e.g. a manually advanced clock in tests.
pub trait Clock {
    /// Returns the instant this clock currently reports.
    fn now(&self) -> Instant;
}
36
37pub struct RealClock;
38impl Clock for RealClock {
39    fn now(&self) -> Instant {
40        Instant::now()
41    }
42}
43
44pub struct RollingP2Estimator<C: Clock = RealClock> {
45    estimators: VecDeque<P2>,
46    timings: VecDeque<Instant>,
47    quantile: f64,
48    window_length: Duration,
49    max_windows: usize,
50    clock: C,
51}
52
53impl<C: Clock> RollingP2Estimator<C> {
54    pub fn new_with_config(
55        quantile: f64,
56        window_length: Duration,
57        max_windows: usize,
58        clock: C,
59    ) -> Result<Self, RollingP2EstimatorError> {
60        if !(0.0..=1.0).contains(&quantile) {
61            return Err(p2::Error::InvalidQuantile(quantile).into());
62        }
63
64        if window_length.is_zero() {
65            return Err(RollingP2EstimatorError::ZeroWindowLength);
66        }
67
68        if max_windows == 0 {
69            return Err(RollingP2EstimatorError::ZeroMaxWindows);
70        }
71
72        Ok(RollingP2Estimator {
73            estimators: VecDeque::with_capacity(max_windows),
74            timings: VecDeque::with_capacity(max_windows),
75            quantile,
76            window_length,
77            max_windows,
78            clock,
79        })
80    }
81
82    pub fn append(&mut self, value: i64) {
83        self.get_estimator().append(value);
84    }
85
86    fn get_estimator(&mut self) -> &mut P2 {
87        let now = self.clock.now();
88
89        let needs_new_window = self.estimators.is_empty()
90            || now.duration_since(*self.timings.back().unwrap()) > self.window_length;
91
92        if needs_new_window {
93            if self.estimators.len() >= self.max_windows {
94                self.estimators.pop_front();
95                self.timings.pop_front();
96            }
97
98            self.estimators.push_back(P2::new(self.quantile).unwrap());
99            self.timings.push_back(now);
100        }
101
102        self.estimators.back_mut().unwrap()
103    }
104
105    pub fn exponentially_weighted_average(&self) -> Option<i64> {
106        if self.estimators.is_empty() {
107            return None;
108        }
109
110        let now = self.clock.now();
111
112        let mut total_weight = 0.0;
113        let mut weighted_sum = 0.0;
114
115        // Calculate exponentially weighted average
116        for (estimator, &timing) in self.estimators.iter().zip(self.timings.iter()) {
117            let age = now.duration_since(timing).as_secs_f64();
118
119            // decay factor: e^(-age/window)
120            // new values (age=0) have weight 1.0
121            // values at window_length have weight 1/e
122            // older values decay towards 0
123            let weight = (-age / self.window_length.as_secs_f64()).exp();
124
125            let estimate = estimator.value() as f64;
126            weighted_sum += estimate * weight;
127            total_weight += weight;
128        }
129
130        if total_weight > 0.0 {
131            Some((weighted_sum / total_weight) as i64)
132        } else {
133            None
134        }
135    }
136
137    pub fn max_over_window(&self) -> Option<i64> {
138        self.estimators
139            .iter()
140            .map(|estimator| estimator.value())
141            .max()
142    }
143}
144
145impl RollingP2Estimator<RealClock> {
146    pub fn new(quantile: f64) -> Result<Self, RollingP2EstimatorError> {
147        Self::new_with_config(quantile, Duration::from_secs(60), 5, RealClock)
148    }
149}
150
/// Wall-clock source whose readings never go backwards while the process runs.
///
/// It pairs a monotonic [`Instant`] with the system time observed at the same
/// moment, so later readings can be derived from elapsed monotonic time.
///
/// **WARNING** Changing server time between node runs should be avoided or
/// handled with care.
pub struct MonotonicClock {
    /// Monotonic reference point.
    init_instant: Instant,
    /// System time captured together with `init_instant`.
    init_system_time: std::time::SystemTime,
}
157static MONOTONIC_CLOCK: LazyLock<MonotonicClock> = LazyLock::new(|| MonotonicClock {
158    init_instant: Instant::now(),
159    init_system_time: std::time::SystemTime::now(),
160});
161impl MonotonicClock {
162    pub fn now_millis() -> u64 {
163        // initialize lazy lock
164        let Self {
165            init_instant,
166            init_system_time,
167        } = *MONOTONIC_CLOCK;
168
169        let since_init = {
170            let now = Instant::now();
171            now.checked_duration_since(init_instant)
172                .unwrap_or_else(|| panic!("current {now:?} < initial {init_instant:?}"))
173        };
174
175        let system_time = init_system_time.checked_add(since_init).unwrap_or_else(|| {
176            panic!(
177                "overflow at init system time {} + duration {} since {init_instant:?}",
178                humantime::format_rfc3339_nanos(init_system_time),
179                humantime::format_duration(since_init),
180            )
181        });
182
183        let since_epoch = system_time
184            .duration_since(std::time::SystemTime::UNIX_EPOCH)
185            .unwrap_or_else(|err| {
186                panic!(
187                    "calculated current {system_time:?} < UNIX_EPOCH {:?} for {}",
188                    std::time::SystemTime::UNIX_EPOCH,
189                    humantime::format_duration(err.duration())
190                )
191            });
192
193        u64::try_from(since_epoch.as_millis()).unwrap_or_else(|_| {
194            panic!(
195                "current time millis exceed u64: {}",
196                since_epoch.as_millis()
197            )
198        })
199    }
200}
201
202#[derive(thiserror::Error, Debug)]
203pub enum RollingP2EstimatorError {
204    #[error(transparent)]
205    P2Error(#[from] p2::Error),
206    #[error("Window length must be greater than zero")]
207    ZeroWindowLength,
208    #[error("Max windows must be greater than zero")]
209    ZeroMaxWindows,
210}
211
#[cfg(test)]
mod tests {
    use std::cell::Cell;
    use std::rc::Rc;

    use super::*;

    /// Clock that only moves when `advance` is called.
    ///
    /// Clones share the same underlying instant, so a test can keep one
    /// handle while the estimator owns another.
    #[derive(Clone)]
    struct MockClock {
        current_time: Rc<Cell<Instant>>,
    }

    impl MockClock {
        fn new() -> Self {
            Self {
                current_time: Rc::new(Cell::new(Instant::now())),
            }
        }

        fn advance(&self, duration: Duration) {
            let new_time = self.current_time.get() + duration;
            self.current_time.set(new_time);
        }
    }

    impl Clock for MockClock {
        fn now(&self) -> Instant {
            self.current_time.get()
        }
    }

    #[test]
    fn test_invalid_quantile() {
        let clock = MockClock::new();
        assert!(
            RollingP2Estimator::new_with_config(1.5, Duration::from_secs(60), 5, clock).is_err()
        );

        // Values below the range and NaN are rejected too.
        for quantile in [-0.1, f64::NAN] {
            let clock = MockClock::new();
            assert!(
                RollingP2Estimator::new_with_config(quantile, Duration::from_secs(60), 5, clock)
                    .is_err(),
                "quantile {quantile} must be rejected"
            );
        }
    }

    #[test]
    fn test_invalid_window_config() {
        let zero_length =
            RollingP2Estimator::new_with_config(0.95, Duration::ZERO, 5, MockClock::new());
        assert!(matches!(
            zero_length,
            Err(RollingP2EstimatorError::ZeroWindowLength)
        ));

        let zero_windows =
            RollingP2Estimator::new_with_config(0.95, Duration::from_secs(60), 0, MockClock::new());
        assert!(matches!(
            zero_windows,
            Err(RollingP2EstimatorError::ZeroMaxWindows)
        ));
    }

    #[test]
    fn test_empty_estimator_has_no_value() {
        let estimator = RollingP2Estimator::new_with_config(
            0.95,
            Duration::from_secs(60),
            5,
            MockClock::new(),
        )
        .unwrap();

        assert_eq!(estimator.exponentially_weighted_average(), None);
        assert_eq!(estimator.max_over_window(), None);
    }

    #[test]
    fn test_single_window() {
        let clock = MockClock::new();
        let mut estimator =
            RollingP2Estimator::new_with_config(0.95, Duration::from_secs(60), 5, clock.clone())
                .unwrap();

        estimator.append(1);
        estimator.append(2);
        estimator.append(3);

        assert_eq!(estimator.exponentially_weighted_average(), Some(3));
    }

    #[test]
    fn test_multiple_windows_with_cleanup() {
        let clock = MockClock::new();
        let mut estimator =
            RollingP2Estimator::new_with_config(0.95, Duration::from_secs(60), 5, clock.clone())
                .unwrap();

        // First window, opened at t = 0s
        estimator.append(1);
        assert_eq!(estimator.exponentially_weighted_average().unwrap(), 1);

        // Still the first window: 30s is within the 60s window length
        clock.advance(Duration::from_secs(30));
        estimator.append(2);

        // Second window: 61s since the first one was opened
        clock.advance(Duration::from_secs(31));
        estimator.append(10);

        // The first window (holding 1 and 2) is 61s old and is down-weighted by
        // e^(-61/60) ~ 0.36 against the fresh window (holding 10); the weighted
        // mean truncates to 7.
        let value = estimator.exponentially_weighted_average().unwrap();
        assert_eq!(value, 7);

        let max_value = estimator.max_over_window().unwrap();
        assert_eq!(max_value, 10);
    }

    #[test]
    fn test_max_windows_with_cleanup() {
        let clock = MockClock::new();
        let mut estimator = RollingP2Estimator::new_with_config(
            0.95,
            Duration::from_secs(120), // 2 minute window
            2,                        // Only allow 2 windows
            clock.clone(),
        )
        .unwrap();

        // First window
        estimator.append(1);
        assert_eq!(estimator.estimators.len(), 1);

        // Same window (61s < 120s)
        clock.advance(Duration::from_secs(61));
        estimator.append(5);
        assert_eq!(estimator.estimators.len(), 1);

        // Second window (122s > 120s)
        clock.advance(Duration::from_secs(61));
        estimator.append(10);
        assert_eq!(estimator.estimators.len(), 2);

        let value = estimator.exponentially_weighted_average().unwrap();
        assert!(value > 5, "value = {value}");
        let max_value = estimator.max_over_window().unwrap();
        assert_eq!(max_value, 10);
    }

    #[test]
    fn test_window_cleanup() {
        let clock = MockClock::new();
        let window_length = Duration::from_secs(60);
        let mut estimator =
            RollingP2Estimator::new_with_config(0.95, window_length, 2, clock.clone()).unwrap();

        // Step 1: Add value to the first window
        estimator.append(1);
        assert_eq!(estimator.estimators.len(), 1);
        assert_eq!(estimator.timings.len(), 1);

        // Step 2: Create the second window
        clock.advance(window_length + Duration::from_secs(1));
        estimator.append(2);
        assert_eq!(estimator.estimators.len(), 2);
        assert_eq!(estimator.timings.len(), 2);

        // Step 3: Create third window - should evict the oldest one
        clock.advance(window_length + Duration::from_secs(1));
        estimator.append(3);

        // Verify cleanup mechanics
        assert_eq!(
            estimator.estimators.len(),
            2,
            "Should maintain max 2 windows"
        );
        assert_eq!(estimator.timings.len(), 2, "Should maintain max 2 timings");

        // The value will be between 2 and 3 since the window with value 1 was removed
        let value = estimator.exponentially_weighted_average().unwrap();
        assert!(value >= 2, "Expected value >= 2, got {value}");
        assert!(value <= 3, "Expected value <= 3, got {value}");

        // Add more values to the latest window to verify it's active
        estimator.append(4);
        estimator.append(5);

        let final_value = estimator.exponentially_weighted_average().unwrap();
        assert!(
            final_value > value,
            "Value should increase after adding larger numbers"
        );
    }
}