streaming_harness/
timeline.rs1use super::output::Metrics;
2use std::ops::{Add, Sub};
3use num_traits::{Zero, Bounded};
4
5#[derive(Debug, Clone)]
6pub struct TimelineElement<T: Eq+Ord+Copy+Zero+Bounded, M: Metrics<T>> {
7 pub time: T,
8 pub metrics: M,
9 pub samples: usize,
10}
11
12impl<T: Eq+Ord+Copy+Zero+Bounded, M: Metrics<T>> TimelineElement<T, M> {
13 pub fn combined(mut self, other: Self) -> Self {
14 assert!(self.time == other.time, "self.time != other.time");
15 self.metrics = self.metrics.combined(other.metrics);
16 self.samples = self.samples + other.samples;
17 self
18 }
19}
20
21pub struct Timeline<
22 T: Eq+Ord+Copy+Zero+Bounded+Add<DT, Output=T>+Sub<T, Output=T>,
23 DT: Copy,
24 M: Metrics<T>,
25 TM: Metrics<T>> {
26
27 pub latency_metrics: M,
28 timeline_dt: DT,
29 cur_element: usize,
30 cur_element_t: T,
31 pub timeline: Vec<TimelineElement<T, TM>>,
32}
33
34impl<
35 T: Eq+Ord+Copy+Zero+Bounded+Add<DT, Output=T>+Sub<T, Output=T>,
36 DT: Copy,
37 M: Metrics<T>,
38 TM: Metrics<T>> Timeline<T, DT, M, TM> {
39
40 pub fn new(start_t: T, end_t: T, timeline_dt: DT, latency_metrics: M, timeline_metrics: impl Fn()->TM) -> Self {
41 Self {
42 latency_metrics,
43 timeline_dt,
44 cur_element: 0usize,
45 cur_element_t: start_t,
46 timeline: (0..).scan(start_t, |t, _| {
47 let cur = *t;
48 *t = *t + timeline_dt;
49 Some(cur)
50 }).take_while(|t| *t < end_t).map(|time| TimelineElement {
51 time,
52 metrics: timeline_metrics(),
53 samples: 0,
54 }).collect(),
55 }
56 }
57}
58
59impl<
60 T: Eq+Ord+Copy+Zero+Bounded+Add<DT, Output=T>+Sub<T, Output=T>,
61 DT: Copy+Eq+::std::fmt::Debug,
62 M: Metrics<T>,
63 TM: Metrics<T>> Metrics<T> for Timeline<T, DT, M, TM> {
64
65 #[inline(always)]
66 fn record(&mut self, begin_t: T, end_t: T) {
67 self.latency_metrics.record(begin_t, end_t);
68 while begin_t >= self.cur_element_t + self.timeline_dt {
69 self.cur_element_t = self.cur_element_t + self.timeline_dt;
70 self.cur_element += 1;
71 }
72 let TimelineElement {
73 ref mut metrics,
74 ref mut samples,
75 ..
76 } = &mut self.timeline[self.cur_element];
77 metrics.record(begin_t, end_t);
78 *samples += 1;
79 }
80
81 fn combined(self, other: Self) -> Self {
82 let Timeline {
83 timeline,
84 latency_metrics,
85 timeline_dt,
86 cur_element,
87 cur_element_t,
88 } = self;
89 let Timeline {
90 timeline: other_timeline,
91 latency_metrics: other_latency_metrics,
92 timeline_dt: other_timeline_dt,
93 ..
94 } = other;
95 assert_eq!(timeline_dt, other_timeline_dt);
96 Timeline {
97 timeline:
98 timeline.into_iter().zip(other_timeline.into_iter()).map(|(s, m)| s.combined(m)).collect(),
99 latency_metrics: latency_metrics.combined(other_latency_metrics),
100 timeline_dt,
101 cur_element,
102 cur_element_t,
103 }
104 }
105}