1use std::collections::VecDeque;
2use std::sync::LazyLock;
3use std::time::{Duration, Instant};
4
5pub use p2::*;
6
7mod p2;
8
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Timestamps that no longer fit into a `u32` saturate at `u32::MAX` instead of
/// silently wrapping around.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
pub fn now_sec() -> u32 {
    let secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_secs();

    // A plain `as u32` cast would wrap to a small value once `secs` exceeds `u32::MAX`.
    u32::try_from(secs).unwrap_or(u32::MAX)
}
15
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// The millisecond count is narrowed from `u128` to `u64` with a truncating cast.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
pub fn now_millis() -> u64 {
    // `UNIX_EPOCH.elapsed()` is `SystemTime::now().duration_since(UNIX_EPOCH)`.
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap();
    since_epoch.as_millis() as u64
}
22
23pub fn shifted_interval(period: Duration, max_shift: Duration) -> tokio::time::Interval {
24 let shift = rand::random_range(Duration::ZERO..max_shift);
25 tokio::time::interval_at(tokio::time::Instant::now() + shift, period + shift)
26}
27
28pub fn shifted_interval_immediate(period: Duration, max_shift: Duration) -> tokio::time::Interval {
29 let shift = rand::random_range(Duration::ZERO..max_shift);
30 tokio::time::interval(period + shift)
31}
32
/// Source of the current [`Instant`].
///
/// Abstracting the time source lets callers substitute a manually driven clock
/// for the real one.
pub trait Clock {
    /// Returns the instant this clock considers to be "now".
    fn now(&self) -> Instant;
}
36
/// [`Clock`] implementation backed by the standard library's [`Instant::now`].
pub struct RealClock;
impl Clock for RealClock {
    /// Returns `Instant::now()`.
    fn now(&self) -> Instant {
        Instant::now()
    }
}
43
44pub struct RollingP2Estimator<C: Clock = RealClock> {
45 estimators: VecDeque<P2>,
46 timings: VecDeque<Instant>,
47 quantile: f64,
48 window_length: Duration,
49 max_windows: usize,
50 clock: C,
51}
52
53impl<C: Clock> RollingP2Estimator<C> {
54 pub fn new_with_config(
55 quantile: f64,
56 window_length: Duration,
57 max_windows: usize,
58 clock: C,
59 ) -> Result<Self, RollingP2EstimatorError> {
60 if !(0.0..=1.0).contains(&quantile) {
61 return Err(p2::Error::InvalidQuantile(quantile).into());
62 }
63
64 if window_length.is_zero() {
65 return Err(RollingP2EstimatorError::ZeroWindowLength);
66 }
67
68 if max_windows == 0 {
69 return Err(RollingP2EstimatorError::ZeroMaxWindows);
70 }
71
72 Ok(RollingP2Estimator {
73 estimators: VecDeque::with_capacity(max_windows),
74 timings: VecDeque::with_capacity(max_windows),
75 quantile,
76 window_length,
77 max_windows,
78 clock,
79 })
80 }
81
82 pub fn append(&mut self, value: i64) {
83 self.get_estimator().append(value);
84 }
85
86 fn get_estimator(&mut self) -> &mut P2 {
87 let now = self.clock.now();
88
89 let needs_new_window = self.estimators.is_empty()
90 || now.duration_since(*self.timings.back().unwrap()) > self.window_length;
91
92 if needs_new_window {
93 if self.estimators.len() >= self.max_windows {
94 self.estimators.pop_front();
95 self.timings.pop_front();
96 }
97
98 self.estimators.push_back(P2::new(self.quantile).unwrap());
99 self.timings.push_back(now);
100 }
101
102 self.estimators.back_mut().unwrap()
103 }
104
105 pub fn exponentially_weighted_average(&self) -> Option<i64> {
106 if self.estimators.is_empty() {
107 return None;
108 }
109
110 let now = self.clock.now();
111
112 let mut total_weight = 0.0;
113 let mut weighted_sum = 0.0;
114
115 for (estimator, &timing) in self.estimators.iter().zip(self.timings.iter()) {
117 let age = now.duration_since(timing).as_secs_f64();
118
119 let weight = (-age / self.window_length.as_secs_f64()).exp();
124
125 let estimate = estimator.value() as f64;
126 weighted_sum += estimate * weight;
127 total_weight += weight;
128 }
129
130 if total_weight > 0.0 {
131 Some((weighted_sum / total_weight) as i64)
132 } else {
133 None
134 }
135 }
136
137 pub fn max_over_window(&self) -> Option<i64> {
138 self.estimators
139 .iter()
140 .map(|estimator| estimator.value())
141 .max()
142 }
143}
144
145impl RollingP2Estimator<RealClock> {
146 pub fn new(quantile: f64) -> Result<Self, RollingP2EstimatorError> {
147 Self::new_with_config(quantile, Duration::from_secs(60), 5, RealClock)
148 }
149}
150
151pub struct MonotonicClock {
154 init_instant: Instant,
155 init_system_time: std::time::SystemTime,
156}
157static MONOTONIC_CLOCK: LazyLock<MonotonicClock> = LazyLock::new(|| MonotonicClock {
158 init_instant: Instant::now(),
159 init_system_time: std::time::SystemTime::now(),
160});
161impl MonotonicClock {
162 pub fn now_millis() -> u64 {
163 let Self {
165 init_instant,
166 init_system_time,
167 } = *MONOTONIC_CLOCK;
168
169 let since_init = {
170 let now = Instant::now();
171 now.checked_duration_since(init_instant)
172 .unwrap_or_else(|| panic!("current {now:?} < initial {init_instant:?}"))
173 };
174
175 let system_time = init_system_time.checked_add(since_init).unwrap_or_else(|| {
176 panic!(
177 "overflow at init system time {} + duration {} since {init_instant:?}",
178 humantime::format_rfc3339_nanos(init_system_time),
179 humantime::format_duration(since_init),
180 )
181 });
182
183 let since_epoch = system_time
184 .duration_since(std::time::SystemTime::UNIX_EPOCH)
185 .unwrap_or_else(|err| {
186 panic!(
187 "calculated current {system_time:?} < UNIX_EPOCH {:?} for {}",
188 std::time::SystemTime::UNIX_EPOCH,
189 humantime::format_duration(err.duration())
190 )
191 });
192
193 u64::try_from(since_epoch.as_millis()).unwrap_or_else(|_| {
194 panic!(
195 "current time millis exceed u64: {}",
196 since_epoch.as_millis()
197 )
198 })
199 }
200}
201
/// Errors returned when constructing a [`RollingP2Estimator`].
#[derive(thiserror::Error, Debug)]
pub enum RollingP2EstimatorError {
    /// Error originating from the `p2` module (e.g. an invalid quantile); displayed
    /// transparently as the inner error.
    #[error(transparent)]
    P2Error(#[from] p2::Error),
    /// The configured window length was zero.
    #[error("Window length must be greater than zero")]
    ZeroWindowLength,
    /// The configured maximum number of windows was zero.
    #[error("Max windows must be greater than zero")]
    ZeroMaxWindows,
}
211
#[cfg(test)]
mod tests {
    use std::cell::Cell;
    use std::rc::Rc;

    use super::*;

    /// Manually advanced clock; clones share the same underlying instant.
    #[derive(Clone)]
    struct MockClock {
        current_time: Rc<Cell<Instant>>,
    }

    impl MockClock {
        fn new() -> Self {
            let start = Instant::now();
            Self {
                current_time: Rc::new(Cell::new(start)),
            }
        }

        fn advance(&self, duration: Duration) {
            self.current_time.set(self.current_time.get() + duration);
        }
    }

    impl Clock for MockClock {
        fn now(&self) -> Instant {
            self.current_time.get()
        }
    }

    /// Builds a 0.95-quantile estimator together with a handle to the clock driving it.
    fn p95_estimator(
        window_length: Duration,
        max_windows: usize,
    ) -> (MockClock, RollingP2Estimator<MockClock>) {
        let clock = MockClock::new();
        let estimator =
            RollingP2Estimator::new_with_config(0.95, window_length, max_windows, clock.clone())
                .unwrap();
        (clock, estimator)
    }

    #[test]
    fn test_invalid_quantile() {
        let result = RollingP2Estimator::new_with_config(
            1.5,
            Duration::from_secs(60),
            5,
            MockClock::new(),
        );
        assert!(result.is_err());
    }

    #[test]
    fn test_single_window() {
        let (_clock, mut estimator) = p95_estimator(Duration::from_secs(60), 5);

        for sample in 1..=3 {
            estimator.append(sample);
        }

        assert_eq!(estimator.exponentially_weighted_average(), Some(3));
    }

    #[test]
    fn test_multiple_windows_with_cleanup() {
        let (clock, mut estimator) = p95_estimator(Duration::from_secs(60), 5);

        estimator.append(1);
        assert_eq!(estimator.exponentially_weighted_average().unwrap(), 1);

        // 30s later: still inside the first window.
        clock.advance(Duration::from_secs(30));
        estimator.append(2);

        // 61s after the first window opened: a second window starts.
        clock.advance(Duration::from_secs(31));
        estimator.append(10);

        let value = estimator.exponentially_weighted_average().unwrap();
        println!("Value: {value}");
        assert_eq!(value, 7);

        let max_value = estimator.max_over_window().unwrap();
        assert_eq!(max_value, 10);
    }

    #[test]
    fn test_max_windows_with_cleanup() {
        let (clock, mut estimator) = p95_estimator(Duration::from_secs(120), 2);
        let step = Duration::from_secs(61);

        println!("\n=== First window ===");
        estimator.append(1);
        println!(
            "After first append: {} estimators",
            estimator.estimators.len()
        );
        assert_eq!(estimator.estimators.len(), 1);

        println!("\n=== Same window (61s < 120s) ===");
        clock.advance(step);
        estimator.append(5);
        println!(
            "After second append: {} estimators",
            estimator.estimators.len()
        );
        assert_eq!(estimator.estimators.len(), 1);

        println!("\n=== Second window (122s > 120s) ===");
        clock.advance(step);
        estimator.append(10);
        println!(
            "After third append: {} estimators",
            estimator.estimators.len()
        );
        assert_eq!(estimator.estimators.len(), 2);

        let value = estimator.exponentially_weighted_average().unwrap();
        assert!(value > 5, "value = {value}");
        let max_value = estimator.max_over_window().unwrap();
        assert_eq!(max_value, 10);
    }

    #[test]
    fn test_window_cleanup() {
        let window_length = Duration::from_secs(60);
        let just_past_window = window_length + Duration::from_secs(1);
        let (clock, mut estimator) = p95_estimator(window_length, 2);

        // First window.
        estimator.append(1);
        assert_eq!(estimator.estimators.len(), 1);
        assert_eq!(estimator.timings.len(), 1);

        // Second window.
        clock.advance(just_past_window);
        estimator.append(2);
        assert_eq!(estimator.estimators.len(), 2);
        assert_eq!(estimator.timings.len(), 2);

        // Third window: the first one must be evicted.
        clock.advance(just_past_window);
        estimator.append(3);

        assert_eq!(
            estimator.estimators.len(),
            2,
            "Should maintain max 2 windows"
        );
        assert_eq!(estimator.timings.len(), 2, "Should maintain max 2 timings");

        let value = estimator.exponentially_weighted_average().unwrap();
        assert!(value >= 2, "Expected value >= 2, got {value}");
        assert!(value <= 3, "Expected value <= 3, got {value}");

        for sample in [4, 5] {
            estimator.append(sample);
        }

        let final_value = estimator.exponentially_weighted_average().unwrap();
        assert!(
            final_value > value,
            "Value should increase after adding larger numbers"
        );
    }
}