rtlola_interpreter/storage/
window.rs

1use std::cmp::Ordering;
2use std::fmt::Debug;
3use std::ops::Add;
4use std::time::Duration;
5
6use dyn_clone::DynClone;
7use num::{FromPrimitive, ToPrimitive};
8use ordered_float::NotNan;
9use rtlola_frontend::mir::{
10    MemorizationBound, SlidingWindow as MirSlidingWindow, Type, Window, WindowOperation as WinOp,
11};
12use rust_decimal::Decimal;
13
14use super::discrete_window::DiscreteWindowInstance;
15use super::window_aggregations::*;
16use super::Value;
17use crate::Time;
18
19pub(crate) trait WindowInstanceTrait: Debug + DynClone {
20    /// Computes the current value of a sliding window instance with the given timestamp:
21    /// # Arguments:
22    /// * 'ts' - the current timestamp of the monitor
23    ///   Note: You should always call `SlidingWindow::update` before calling `SlidingWindow::get_value()`!
24    fn get_value(&self, ts: Time) -> Value;
25    /// Updates the value of the current bucket of a sliding window instance with the current value of the accessed stream:
26    /// # Arguments:
27    /// * 'v' - the current value of the accessed stream
28    /// * 'ts' - the current timestamp of the monitor
29    fn accept_value(&mut self, v: Value, ts: Time);
30    /// Updates the buckets of a sliding window instance with the given timestamp:
31    /// # Arguments:
32    /// * 'ts' - the current timestamp of the monitor
33    fn update_buckets(&mut self, ts: Time);
34    /// Clears the current sliding window state
35    fn deactivate(&mut self);
36
37    /// Returns true if the window instance is currently active. I.e. the target stream instance currently exists.
38    fn is_active(&self) -> bool;
39
40    /// Restarts the sliding window
41    fn activate(&mut self, ts: Time);
42}
43dyn_clone::clone_trait_object!(WindowInstanceTrait);
44
45/// Representation of sliding window aggregations:
46/// The enum differentiates the aggregation functions and between different value types, dependent on the aggregation function.
47/// # Example:
48/// * The aggregation function 'count' is independent of the value type.
49/// * The aggregation function 'min' depends on the value type, e.g., the minimum value of unsigned values is 0, whereas the minimum value for signed values is negative.
50#[derive(Debug, Clone)]
51pub(crate) struct SlidingWindow {
52    inner: Box<dyn WindowInstanceTrait>,
53}
54
55macro_rules! create_window_instance {
56    ($type: ty, $w: ident, $ts: ident, $active: ident) => {
57        Self {
58            inner: Box::new(RealTimeWindowInstance::<$type>::new($w, $ts, $active)),
59        }
60    };
61}
62macro_rules! create_percentile_instance {
63    ($type: ty, $w: ident, $ts: ident, $active: ident, $percentile: ident) => {
64        Self {
65            inner: Box::new(PercentileWindow {
66                inner: RealTimeWindowInstance::<$type>::new($w, $ts, $active),
67                percentile: $percentile,
68            }),
69        }
70    };
71}
72macro_rules! create_discrete_window_instance {
73    ($type: ty, $dur: ident, $wait: ident, $ts: ident, $active: ident) => {
74        Self {
75            inner: Box::new(DiscreteWindowInstance::<$type>::new(
76                $dur, $wait, $ts, $active,
77            )),
78        }
79    };
80}
81macro_rules! create_discrete_percentile_instance {
82    ($type: ty, $dur: ident, $wait: ident, $ts: ident, $active: ident, $percentile: ident) => {
83        Self {
84            inner: Box::new(PercentileWindow {
85                inner: DiscreteWindowInstance::<$type>::new($dur, $wait, $ts, $active),
86                percentile: $percentile,
87            }),
88        }
89    };
90}
91impl SlidingWindow {
92    /// Returns a sliding window instance, from:
93    /// # Arguments:
94    /// * 'dur'- the duration of the window
95    /// * 'wait' - the boolean flag to decide if the window returns its value after the complete duration has passed
96    /// * 'op' - the type of the aggregation function
97    /// * 'ts' - the starting time of the window
98    /// * 'ty' - the value type of the aggregated stream
99    pub(crate) fn from_sliding(ts: Time, window: &MirSlidingWindow, active: bool) -> SlidingWindow {
100        match (window.op, &window.ty) {
101            (WinOp::Count, _) => create_window_instance!(CountIv, window, ts, active),
102            (WinOp::Min, Type::UInt(_)) => {
103                create_window_instance!(MinIv<WindowUnsigned>, window, ts, active)
104            }
105            (WinOp::Min, Type::Int(_)) => {
106                create_window_instance!(MinIv<WindowSigned>, window, ts, active)
107            }
108            (WinOp::Min, Type::Float(_)) => {
109                create_window_instance!(MinIv<WindowFloat>, window, ts, active)
110            }
111            (WinOp::Min, Type::Fixed(_) | Type::UFixed(_)) => {
112                create_window_instance!(MinIv<WindowDecimal>, window, ts, active)
113            }
114            (WinOp::Max, Type::UInt(_)) => {
115                create_window_instance!(MaxIv<WindowUnsigned>, window, ts, active)
116            }
117            (WinOp::Max, Type::Int(_)) => {
118                create_window_instance!(MaxIv<WindowSigned>, window, ts, active)
119            }
120            (WinOp::Max, Type::Float(_)) => {
121                create_window_instance!(MaxIv<WindowFloat>, window, ts, active)
122            }
123            (WinOp::Max, Type::Fixed(_) | Type::UFixed(_)) => {
124                create_window_instance!(MaxIv<WindowDecimal>, window, ts, active)
125            }
126            (WinOp::Sum, Type::UInt(_)) => {
127                create_window_instance!(SumIv<WindowUnsigned>, window, ts, active)
128            }
129            (WinOp::Sum, Type::Int(_)) => {
130                create_window_instance!(SumIv<WindowSigned>, window, ts, active)
131            }
132            (WinOp::Sum, Type::Float(_)) => {
133                create_window_instance!(SumIv<WindowFloat>, window, ts, active)
134            }
135            (WinOp::Sum, Type::Fixed(_) | Type::UFixed(_)) => {
136                create_window_instance!(SumIv<WindowDecimal>, window, ts, active)
137            }
138            (WinOp::Sum, Type::Bool) => {
139                create_window_instance!(SumIv<WindowBool>, window, ts, active)
140            }
141            (WinOp::Average, Type::UInt(_)) => {
142                create_window_instance!(AvgIv<WindowUnsigned>, window, ts, active)
143            }
144            (WinOp::Average, Type::Int(_)) => {
145                create_window_instance!(AvgIv<WindowSigned>, window, ts, active)
146            }
147            (WinOp::Average, Type::Float(_)) => {
148                create_window_instance!(AvgIv<WindowFloat>, window, ts, active)
149            }
150            (WinOp::Average, Type::Fixed(_) | Type::UFixed(_)) => {
151                create_window_instance!(AvgIv<WindowDecimal>, window, ts, active)
152            }
153            (WinOp::Integral, Type::Float(_))
154            | (WinOp::Integral, Type::Int(_))
155            | (WinOp::Integral, Type::UInt(_)) => {
156                create_window_instance!(IntegralIv<WindowFloat>, window, ts, active)
157            }
158            (WinOp::Integral, Type::Fixed(_) | Type::UFixed(_)) => {
159                create_window_instance!(IntegralIv<WindowDecimal>, window, ts, active)
160            }
161            (WinOp::Conjunction, Type::Bool) => create_window_instance!(ConjIv, window, ts, active),
162            (WinOp::Disjunction, Type::Bool) => create_window_instance!(DisjIv, window, ts, active),
163            (_, Type::Option(t)) => Self::from_sliding(
164                ts,
165                &MirSlidingWindow {
166                    target: window.target,
167                    caller: window.caller,
168                    duration: window.duration,
169                    num_buckets: window.num_buckets,
170                    bucket_size: window.bucket_size,
171                    wait: window.wait,
172                    op: window.op,
173                    reference: window.reference,
174                    origin: window.origin,
175                    pacing: window.pacing.clone(),
176                    ty: t.as_ref().clone(),
177                },
178                active,
179            ),
180            (WinOp::Conjunction, _) | (WinOp::Disjunction, _) => {
181                panic!("conjunction and disjunction only defined on bool")
182            }
183            (WinOp::Min, _)
184            | (WinOp::Max, _)
185            | (WinOp::Sum, _)
186            | (WinOp::Average, _)
187            | (WinOp::Integral, _) => {
188                panic!("arithmetic operation only defined on atomic numerics")
189            }
190            (WinOp::Last, Type::Int(_)) => {
191                create_window_instance!(LastIv<WindowSigned>, window, ts, active)
192            }
193            (WinOp::Last, Type::UInt(_)) => {
194                create_window_instance!(LastIv<WindowUnsigned>, window, ts, active)
195            }
196            (WinOp::Last, Type::Float(_)) => {
197                create_window_instance!(LastIv<WindowFloat>, window, ts, active)
198            }
199            (WinOp::Last, Type::Fixed(_)) => {
200                create_window_instance!(LastIv<WindowDecimal>, window, ts, active)
201            }
202            (WinOp::NthPercentile(x), Type::Int(_)) => {
203                create_percentile_instance!(PercentileIv<WindowSigned>, window, ts, active, x)
204            }
205            (WinOp::NthPercentile(x), Type::UInt(_)) => {
206                create_percentile_instance!(PercentileIv<WindowUnsigned>, window, ts, active, x)
207            }
208            (WinOp::NthPercentile(x), Type::Float(_)) => {
209                create_percentile_instance!(PercentileIv<WindowFloat>, window, ts, active, x)
210            }
211            (WinOp::NthPercentile(x), Type::Fixed(_) | Type::UFixed(_)) => {
212                create_percentile_instance!(PercentileIv<WindowDecimal>, window, ts, active, x)
213            }
214            (WinOp::Variance, Type::Float(_)) => {
215                create_window_instance!(VarianceIv<WindowFloat>, window, ts, active)
216            }
217            (WinOp::Variance, Type::Fixed(_) | Type::UFixed(_)) => {
218                create_window_instance!(VarianceIv<WindowDecimal>, window, ts, active)
219            }
220            (WinOp::StandardDeviation, Type::Float(_)) => {
221                create_window_instance!(SdIv<WindowFloat>, window, ts, active)
222            }
223            (WinOp::StandardDeviation, Type::Fixed(_) | Type::UFixed(_)) => {
224                create_window_instance!(SdIv<WindowDecimal>, window, ts, active)
225            }
226            (WinOp::Covariance, Type::Float(_)) => {
227                create_window_instance!(CovIv<WindowFloat>, window, ts, active)
228            }
229            (WinOp::Covariance, Type::Fixed(_) | Type::UFixed(_)) => {
230                create_window_instance!(CovIv<WindowDecimal>, window, ts, active)
231            }
232            (WinOp::Product, _) => unimplemented!("product not implemented"),
233            (WinOp::Last, _) => unimplemented!(),
234            (WinOp::Variance, _) => unimplemented!(),
235            (WinOp::Covariance, _) => unimplemented!(),
236            (WinOp::StandardDeviation, _) => unimplemented!(),
237            (WinOp::NthPercentile(_), _) => unimplemented!(),
238        }
239    }
240
241    pub(crate) fn from_discrete(
242        size: usize,
243        wait: bool,
244        op: WinOp,
245        ts: Time,
246        ty: &Type,
247        active: bool,
248    ) -> SlidingWindow {
249        match (op, ty) {
250            (WinOp::Count, _) => create_discrete_window_instance!(CountIv, size, wait, ts, active),
251            (WinOp::Min, Type::UInt(_)) => {
252                create_discrete_window_instance!(MinIv<WindowUnsigned>, size, wait, ts, active)
253            }
254            (WinOp::Min, Type::Int(_)) => {
255                create_discrete_window_instance!(MinIv<WindowSigned>, size, wait, ts, active)
256            }
257            (WinOp::Min, Type::Float(_)) => {
258                create_discrete_window_instance!(MinIv<WindowFloat>, size, wait, ts, active)
259            }
260            (WinOp::Min, Type::Fixed(_) | Type::UFixed(_)) => {
261                create_discrete_window_instance!(MinIv<WindowDecimal>, size, wait, ts, active)
262            }
263            (WinOp::Max, Type::UInt(_)) => {
264                create_discrete_window_instance!(MaxIv<WindowUnsigned>, size, wait, ts, active)
265            }
266            (WinOp::Max, Type::Int(_)) => {
267                create_discrete_window_instance!(MaxIv<WindowSigned>, size, wait, ts, active)
268            }
269            (WinOp::Max, Type::Float(_)) => {
270                create_discrete_window_instance!(MaxIv<WindowFloat>, size, wait, ts, active)
271            }
272            (WinOp::Max, Type::Fixed(_) | Type::UFixed(_)) => {
273                create_discrete_window_instance!(MaxIv<WindowDecimal>, size, wait, ts, active)
274            }
275            (WinOp::Sum, Type::UInt(_)) => {
276                create_discrete_window_instance!(SumIv<WindowUnsigned>, size, wait, ts, active)
277            }
278            (WinOp::Sum, Type::Int(_)) => {
279                create_discrete_window_instance!(SumIv<WindowSigned>, size, wait, ts, active)
280            }
281            (WinOp::Sum, Type::Float(_)) => {
282                create_discrete_window_instance!(SumIv<WindowFloat>, size, wait, ts, active)
283            }
284            (WinOp::Sum, Type::Fixed(_) | Type::UFixed(_)) => {
285                create_discrete_window_instance!(SumIv<WindowDecimal>, size, wait, ts, active)
286            }
287            (WinOp::Sum, Type::Bool) => {
288                create_discrete_window_instance!(SumIv<WindowBool>, size, wait, ts, active)
289            }
290            (WinOp::Average, Type::UInt(_)) => {
291                create_discrete_window_instance!(AvgIv<WindowUnsigned>, size, wait, ts, active)
292            }
293            (WinOp::Average, Type::Int(_)) => {
294                create_discrete_window_instance!(AvgIv<WindowSigned>, size, wait, ts, active)
295            }
296            (WinOp::Average, Type::Float(_)) => {
297                create_discrete_window_instance!(AvgIv<WindowFloat>, size, wait, ts, active)
298            }
299            (WinOp::Average, Type::Fixed(_) | Type::UFixed(_)) => {
300                create_discrete_window_instance!(AvgIv<WindowDecimal>, size, wait, ts, active)
301            }
302            (WinOp::Integral, Type::Float(_))
303            | (WinOp::Integral, Type::Int(_))
304            | (WinOp::Integral, Type::UInt(_)) => {
305                create_discrete_window_instance!(IntegralIv<WindowFloat>, size, wait, ts, active)
306            }
307            (WinOp::Integral, Type::Fixed(_) | Type::UFixed(_)) => {
308                create_discrete_window_instance!(IntegralIv<WindowDecimal>, size, wait, ts, active)
309            }
310            (WinOp::Conjunction, Type::Bool) => {
311                create_discrete_window_instance!(ConjIv, size, wait, ts, active)
312            }
313            (WinOp::Disjunction, Type::Bool) => {
314                create_discrete_window_instance!(DisjIv, size, wait, ts, active)
315            }
316            (_, Type::Option(t)) => Self::from_discrete(size, wait, op, ts, t, active),
317            (WinOp::Conjunction, _) | (WinOp::Disjunction, _) => {
318                panic!("conjunction and disjunction only defined on bool")
319            }
320            (WinOp::Min, _)
321            | (WinOp::Max, _)
322            | (WinOp::Sum, _)
323            | (WinOp::Average, _)
324            | (WinOp::Integral, _) => {
325                panic!("arithmetic operation only defined on atomic numerics")
326            }
327            (WinOp::Last, Type::Int(_)) => {
328                create_discrete_window_instance!(LastIv<WindowSigned>, size, wait, ts, active)
329            }
330            (WinOp::Last, Type::UInt(_)) => {
331                create_discrete_window_instance!(LastIv<WindowUnsigned>, size, wait, ts, active)
332            }
333            (WinOp::Last, Type::Float(_)) => {
334                create_discrete_window_instance!(LastIv<WindowFloat>, size, wait, ts, active)
335            }
336            (WinOp::Last, Type::Fixed(_) | Type::UFixed(_)) => {
337                create_discrete_window_instance!(LastIv<WindowDecimal>, size, wait, ts, active)
338            }
339            (WinOp::NthPercentile(x), Type::Int(_)) => {
340                create_discrete_percentile_instance!(
341                    PercentileIv<WindowSigned>,
342                    size,
343                    wait,
344                    ts,
345                    active,
346                    x
347                )
348            }
349            (WinOp::NthPercentile(x), Type::UInt(_)) => {
350                create_discrete_percentile_instance!(
351                    PercentileIv<WindowUnsigned>,
352                    size,
353                    wait,
354                    ts,
355                    active,
356                    x
357                )
358            }
359            (WinOp::NthPercentile(x), Type::Float(_)) => {
360                create_discrete_percentile_instance!(
361                    PercentileIv<WindowFloat>,
362                    size,
363                    wait,
364                    ts,
365                    active,
366                    x
367                )
368            }
369            (WinOp::NthPercentile(x), Type::Fixed(_) | Type::UFixed(_)) => {
370                create_discrete_percentile_instance!(
371                    PercentileIv<WindowDecimal>,
372                    size,
373                    wait,
374                    ts,
375                    active,
376                    x
377                )
378            }
379            (WinOp::Variance, Type::Float(_)) => {
380                create_discrete_window_instance!(VarianceIv<WindowFloat>, size, wait, ts, active)
381            }
382            (WinOp::Variance, Type::Fixed(_) | Type::UFixed(_)) => {
383                create_discrete_window_instance!(VarianceIv<WindowDecimal>, size, wait, ts, active)
384            }
385            (WinOp::StandardDeviation, Type::Float(_)) => {
386                create_discrete_window_instance!(SdIv<WindowFloat>, size, wait, ts, active)
387            }
388            (WinOp::StandardDeviation, Type::Fixed(_) | Type::UFixed(_)) => {
389                create_discrete_window_instance!(SdIv<WindowDecimal>, size, wait, ts, active)
390            }
391            (WinOp::Covariance, Type::Float(_)) => {
392                create_discrete_window_instance!(CovIv<WindowFloat>, size, wait, ts, active)
393            }
394            (WinOp::Covariance, Type::Fixed(_) | Type::UFixed(_)) => {
395                create_discrete_window_instance!(CovIv<WindowDecimal>, size, wait, ts, active)
396            }
397            (WinOp::Product, _) => unimplemented!("product not implemented"),
398            (WinOp::Last, _) => unimplemented!(),
399            (WinOp::Variance, _) => unimplemented!(),
400            (WinOp::Covariance, _) => unimplemented!(),
401            (WinOp::StandardDeviation, _) => unimplemented!(),
402            (WinOp::NthPercentile(_), _) => unimplemented!(),
403        }
404    }
405
406    /// Updates the buckets of a sliding window instance with the given timestamp:
407    /// # Arguments:
408    /// * 'ts' - the current timestamp of the monitor
409    pub(crate) fn update(&mut self, ts: Time) {
410        self.inner.update_buckets(ts);
411    }
412
413    /// Computes the current value of a sliding window instance with the given timestamp:
414    /// # Arguments:
415    /// * 'ts' - the current timestamp of the monitor
416    ///   Note: You should always call `SlidingWindow::update` before calling `SlidingWindow::get_value()`!
417    pub(crate) fn get_value(&self, ts: Time) -> Value {
418        self.inner.get_value(ts)
419    }
420
421    /// Updates the value of the first bucket of a sliding window instance with the current value of the accessed stream:
422    /// # Arguments:
423    /// * 'v' - the current value of the accessed stream
424    /// * 'ts' - the current timestamp of the monitor
425    pub(crate) fn accept_value(&mut self, v: Value, ts: Time) {
426        self.inner.accept_value(v, ts);
427    }
428
429    /// Clears the current sliding window state
430    pub(crate) fn deactivate(&mut self) {
431        self.inner.deactivate();
432    }
433
434    /// Returns true if the window instance is currently active. I.e. the target stream instance currently exists.
435    pub(crate) fn is_active(&self) -> bool {
436        self.inner.is_active()
437    }
438
439    /// Restarts the sliding window
440    pub(crate) fn activate(&mut self, ts: Time) {
441        self.inner.activate(ts);
442    }
443}
444
445// TODO: Consider using None rather than Default.
446/// Trait to summarize common logic for the different window aggregations, e.g., returning a default value for an empty bucket
447pub(crate) trait WindowIv:
448    Clone + Add<Output = Self> + From<(Value, Time)> + Sized + Debug + Into<Value>
449{
450    fn default(ts: Time) -> Self;
451}
452
453/// Struct to summarize common logic for the different window aggregations, e.g. iterating over the buckets to compute the result of an aggregation
454#[derive(Debug, Clone)]
455pub(crate) struct RealTimeWindowInstance<IV: WindowIv> {
456    buckets: Vec<IV>,
457    start_time: u64,
458    total_duration: u64,
459    bucket_duration: u64,
460    current_bucket: usize,
461    current_bucket_end: u64,
462    wait: bool,
463    active: bool,
464}
465
466impl<IV: WindowIv> WindowInstanceTrait for RealTimeWindowInstance<IV> {
467    /// Clears the current sliding window state
468    fn deactivate(&mut self) {
469        self.active = false;
470    }
471
472    /// Returns true if the window instance is currently active. I.e. the target stream instance currently exists.
473    fn is_active(&self) -> bool {
474        self.active
475    }
476
477    /// Restarts the sliding window
478    fn activate(&mut self, ts: Time) {
479        self.clear_all_buckets(ts);
480        let ts = ts.as_nanos() as u64;
481        self.current_bucket = self.buckets.len() - 1;
482        self.start_time = ts;
483        self.current_bucket_end = ts;
484        self.active = true;
485    }
486
487    /// You should always call `WindowInstance::update_buckets` before calling `WindowInstance::get_value()`!
488    fn get_value(&self, ts: Time) -> Value {
489        if !self.active {
490            return IV::default(ts).into();
491        }
492
493        if self.wait && (ts.as_nanos() as u64) - self.start_time < self.total_duration {
494            return Value::None;
495        }
496
497        self.buckets
498            .clone()
499            .into_iter()
500            .cycle()
501            .skip(self.current_bucket + 1)
502            .take(self.buckets.len())
503            .reduce(|acc, e| acc + e)
504            .unwrap_or_else(|| IV::default(ts))
505            .into()
506    }
507
508    fn accept_value(&mut self, v: Value, ts: Time) {
509        if !self.active {
510            // ignore value if window has not started yet
511            return;
512        }
513        self.update_buckets(ts);
514        let b = self.buckets.get_mut(self.current_bucket).expect("Bug!");
515        *b = b.clone() + (v, ts).into(); // TODO: Require add_assign rather than add.
516    }
517
518    fn update_buckets(&mut self, ts: Time) {
519        assert!(self.active);
520        let last = self.current_bucket;
521        let curr = self.get_current_bucket(ts);
522
523        let current_time = ts.as_nanos() as u64;
524        if current_time > self.current_bucket_end {
525            // clear passed buckets
526            if current_time > self.current_bucket_end + self.total_duration - self.bucket_duration {
527                // we completed a whole round in the ring buffer and clear all buckets
528                self.clear_all_buckets(ts);
529            } else {
530                // we only clear buckets between last / curr
531                match curr.cmp(&last) {
532                    Ordering::Less => {
533                        self.clear_buckets(ts, last + 1, self.buckets.len());
534                        self.clear_buckets(ts, 0, curr + 1);
535                    }
536                    Ordering::Greater => {
537                        self.clear_buckets(ts, last + 1, curr + 1);
538                    }
539                    Ordering::Equal => {}
540                }
541            }
542
543            self.current_bucket = curr;
544            self.current_bucket_end = self.get_current_bucket_end(ts);
545        }
546    }
547}
548
549impl<IV: WindowIv> RealTimeWindowInstance<IV> {
550    fn new(window: &MirSlidingWindow, ts: Time, active: bool) -> Self {
551        let num_buckets = if let MemorizationBound::Bounded(num_buckets) = window.memory_bound() {
552            num_buckets as usize
553        } else {
554            unreachable!()
555        };
556
557        let buckets = vec![IV::default(ts); num_buckets];
558        let current_ts = ts.as_nanos() as u64;
559        let bucket_duration = window.bucket_size.as_nanos() as u64;
560        Self {
561            buckets,
562            bucket_duration,
563            total_duration: window.duration.as_nanos() as u64,
564            start_time: current_ts,
565            current_bucket: num_buckets - 1,
566            current_bucket_end: current_ts,
567            wait: window.wait,
568            active,
569        }
570    }
571
572    fn get_current_bucket(&self, ts: Time) -> usize {
573        assert!(self.active);
574        assert!(
575            ts.as_nanos() as u64 >= self.start_time,
576            "Time does not behave monotonically! It is {} now and the window started at: {}",
577            ts.as_secs_f64(),
578            Duration::from_nanos(self.start_time).as_secs_f64()
579        );
580        let ts = ts.as_nanos() as u64;
581        let relative_to_window = (ts - self.start_time) % self.total_duration;
582        let idx = relative_to_window / self.bucket_duration;
583        if relative_to_window % self.bucket_duration == 0 {
584            // A bucket includes time from x < ts <= x + bucket_duration
585            // Consequently, if we hit the "edge" of bucket we have to chose the previous one
586            if idx > 0 {
587                (idx - 1) as usize
588            } else {
589                self.buckets.len() - 1
590            }
591        } else {
592            idx as usize
593        }
594    }
595
596    fn get_current_bucket_end(&mut self, ts: Time) -> u64 {
597        let current_time = ts.as_nanos() as u64;
598
599        let period = if (current_time - self.start_time) % self.total_duration == 0 {
600            // A bucket includes time from x < ts <= x + bucket_duration
601            // Consequently, if we hit the "edge" of bucket we have to chose the previous one
602            let p = (current_time - self.start_time) / self.total_duration;
603            if p > 0 {
604                p - 1
605            } else {
606                0
607            }
608        } else {
609            (current_time - self.start_time) / self.total_duration
610        };
611        self.start_time
612            + period * self.total_duration
613            + (self.current_bucket + 1) as u64 * self.bucket_duration
614    }
615
616    // clear buckets starting from `start` to `last` including start, excluding end
617    fn clear_buckets(&mut self, ts: Time, start: usize, end: usize) {
618        self.buckets[start..end]
619            .iter_mut()
620            .for_each(|x| *x = IV::default(ts));
621    }
622
623    fn clear_all_buckets(&mut self, ts: Time) {
624        self.clear_buckets(ts, 0, self.buckets.len())
625    }
626}
627
628#[derive(Debug, Clone)]
629pub(crate) struct PercentileWindow<IC: WindowInstanceTrait> {
630    pub(crate) inner: IC,
631    pub(crate) percentile: u8,
632}
633
634impl<G: WindowGeneric> WindowInstanceTrait
635    for PercentileWindow<RealTimeWindowInstance<PercentileIv<G>>>
636{
637    fn get_value(&self, ts: Time) -> Value {
638        if !self.is_active() {
639            return PercentileIv::<G>::default(ts).into();
640        }
641        // Reversal is essential for non-commutative operations.
642        if self.inner.wait
643            && (ts.as_nanos() as u64) - self.inner.start_time < self.inner.total_duration
644        {
645            return Value::None;
646        }
647        self.inner
648            .buckets
649            .clone()
650            .into_iter()
651            .cycle()
652            .skip(self.inner.current_bucket + 1)
653            .take(self.inner.buckets.len())
654            .reduce(|acc, e| acc + e)
655            .unwrap_or_else(|| PercentileIv::default(ts))
656            .percentile_get_value(self.percentile)
657    }
658
659    fn accept_value(&mut self, v: Value, ts: Time) {
660        self.inner.accept_value(v, ts)
661    }
662
663    fn update_buckets(&mut self, ts: Time) {
664        self.inner.update_buckets(ts)
665    }
666
667    fn deactivate(&mut self) {
668        self.inner.deactivate()
669    }
670
671    fn is_active(&self) -> bool {
672        self.inner.is_active()
673    }
674
675    fn activate(&mut self, ts: Time) {
676        self.inner.activate(ts)
677    }
678}
679
680pub(crate) trait WindowGeneric: Debug + Clone {
681    fn from_value(v: Value) -> Value;
682}
683
684#[derive(Debug, Clone)]
685pub(crate) struct WindowSigned {}
686impl WindowGeneric for WindowSigned {
687    fn from_value(v: Value) -> Value {
688        match v {
689            Value::Signed(_) => v,
690            Value::Unsigned(u) => Value::Signed(u as i64),
691            _ => unreachable!("Type error."),
692        }
693    }
694}
695
696#[derive(Debug, Clone)]
697pub(crate) struct WindowBool {}
698impl WindowGeneric for WindowBool {
699    fn from_value(v: Value) -> Value {
700        match v {
701            Value::Bool(b) if b => Value::Unsigned(1),
702            Value::Bool(_) => Value::Unsigned(0),
703            _ => unreachable!("Type error."),
704        }
705    }
706}
707
708#[derive(Debug, Clone)]
709pub(crate) struct WindowUnsigned {}
710impl WindowGeneric for WindowUnsigned {
711    fn from_value(v: Value) -> Value {
712        match v {
713            Value::Unsigned(_) => v,
714            _ => unreachable!("Type error."),
715        }
716    }
717}
718
719#[derive(Debug, Clone)]
720pub(crate) struct WindowFloat {}
721impl WindowGeneric for WindowFloat {
722    fn from_value(v: Value) -> Value {
723        let f = match v {
724            Value::Signed(i) => i as f64,
725            Value::Unsigned(u) => u as f64,
726            Value::Float(f) => f.into(),
727            Value::Decimal(f) => f.to_f64().unwrap(),
728            _ => unreachable!("Type error."),
729        };
730        Value::Float(NotNan::new(f).unwrap())
731    }
732}
733
734#[derive(Debug, Clone)]
735pub(crate) struct WindowDecimal {}
736impl WindowGeneric for WindowDecimal {
737    fn from_value(v: Value) -> Value {
738        let f = match v {
739            Value::Signed(i) => i.into(),
740            Value::Unsigned(u) => u.into(),
741            Value::Float(f) => Decimal::from_f64(f.to_f64().unwrap()).unwrap(),
742            Value::Decimal(f) => f,
743            _ => unreachable!("Type error."),
744        };
745        Value::Decimal(f)
746    }
747}