streaming_harness/
timeline.rs

1use super::output::Metrics;
2use std::ops::{Add, Sub};
3use num_traits::{Zero, Bounded};
4
5#[derive(Debug, Clone)]
6pub struct TimelineElement<T: Eq+Ord+Copy+Zero+Bounded, M: Metrics<T>> {
7    pub time: T,
8    pub metrics: M,
9    pub samples: usize,
10}
11
12impl<T: Eq+Ord+Copy+Zero+Bounded, M: Metrics<T>> TimelineElement<T, M> {
13    pub fn combined(mut self, other: Self) -> Self {
14        assert!(self.time == other.time, "self.time != other.time");
15        self.metrics = self.metrics.combined(other.metrics);
16        self.samples = self.samples + other.samples;
17        self
18    }
19}
20
21pub struct Timeline<
22    T: Eq+Ord+Copy+Zero+Bounded+Add<DT, Output=T>+Sub<T, Output=T>,
23    DT: Copy,
24    M: Metrics<T>,
25    TM: Metrics<T>> {
26
27    pub latency_metrics: M,
28    timeline_dt: DT,
29    cur_element: usize,
30    cur_element_t: T,
31    pub timeline: Vec<TimelineElement<T, TM>>,
32}
33
34impl<
35    T: Eq+Ord+Copy+Zero+Bounded+Add<DT, Output=T>+Sub<T, Output=T>,
36    DT: Copy,
37    M: Metrics<T>,
38    TM: Metrics<T>> Timeline<T, DT, M, TM> {
39
40    pub fn new(start_t: T, end_t: T, timeline_dt: DT, latency_metrics: M, timeline_metrics: impl Fn()->TM) -> Self {
41        Self {
42            latency_metrics,
43            timeline_dt,
44            cur_element: 0usize,
45            cur_element_t: start_t,
46            timeline: (0..).scan(start_t, |t, _| {
47                let cur = *t;
48                *t = *t + timeline_dt;
49                Some(cur)
50            }).take_while(|t| *t < end_t).map(|time| TimelineElement {
51                time,
52                metrics: timeline_metrics(),
53                samples: 0,
54            }).collect(),
55        }
56    }
57}
58
59impl<
60    T: Eq+Ord+Copy+Zero+Bounded+Add<DT, Output=T>+Sub<T, Output=T>,
61    DT: Copy+Eq+::std::fmt::Debug,
62    M: Metrics<T>,
63    TM: Metrics<T>> Metrics<T> for Timeline<T, DT, M, TM> {
64
65    #[inline(always)]
66    fn record(&mut self, begin_t: T, end_t: T) {
67        self.latency_metrics.record(begin_t, end_t);
68        while begin_t >= self.cur_element_t + self.timeline_dt {
69            self.cur_element_t = self.cur_element_t + self.timeline_dt;
70            self.cur_element += 1;
71        }
72        let TimelineElement {
73            ref mut metrics,
74            ref mut samples,
75            ..
76        } = &mut self.timeline[self.cur_element];
77        metrics.record(begin_t, end_t);
78        *samples += 1;
79    }
80
81    fn combined(self, other: Self) -> Self {
82        let Timeline {
83            timeline,
84            latency_metrics,
85            timeline_dt,
86            cur_element,
87            cur_element_t,
88        } = self;
89        let Timeline {
90            timeline: other_timeline,
91            latency_metrics: other_latency_metrics,
92            timeline_dt: other_timeline_dt,
93            ..
94        } = other;
95        assert_eq!(timeline_dt, other_timeline_dt);
96        Timeline {
97            timeline: 
98                timeline.into_iter().zip(other_timeline.into_iter()).map(|(s, m)| s.combined(m)).collect(),
99            latency_metrics: latency_metrics.combined(other_latency_metrics),
100            timeline_dt,
101            cur_element,
102            cur_element_t,
103        }
104    }
105}