1use crate::ResamplingFunction;
8use chrono::{DateTime, TimeDelta, Utc};
9use itertools::Itertools;
10use log::warn;
11use num_traits::FromPrimitive;
12use std::fmt::Debug;
13use std::ops::Div;
14
/// A timestamped data point with an optional value.
///
/// Implementors must also be `Clone`, `Debug` and `Default`.
pub trait Sample: Clone + Debug + Default {
    /// Type of the value carried by a sample.
    type Value;
    /// Builds a sample from a timestamp and an optional value.
    fn new(timestamp: DateTime<Utc>, value: Option<Self::Value>) -> Self;
    /// Returns the timestamp of this sample.
    fn timestamp(&self) -> DateTime<Utc>;
    /// Returns the value of this sample, or `None` when it carries no value.
    fn value(&self) -> Option<Self::Value>;
}
22
23impl<
24 T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
25 S: Sample<Value = T>,
26 > ResamplingFunction<T, S>
27{
28 pub fn apply(&mut self, samples: &[&S]) -> Option<T> {
29 match self {
30 Self::Average => Self::Sum
31 .apply(samples)
32 .and_then(|sum| Self::Count.apply(samples).map(|count| sum.div(count))),
33 Self::Sum => samples.iter().filter_map(|s| s.value()).sum1(),
34 Self::Max => samples.iter().filter_map(|s| s.value()).max_by(|a, b| {
35 a.partial_cmp(b).unwrap_or_else(|| {
36 if a.partial_cmp(&T::default()).is_some() {
37 std::cmp::Ordering::Greater
38 } else {
39 std::cmp::Ordering::Less
40 }
41 })
42 }),
43 Self::Min => samples.iter().filter_map(|s| s.value()).min_by(|a, b| {
44 a.partial_cmp(b).unwrap_or_else(|| {
45 if a.partial_cmp(&T::default()).is_some() {
46 std::cmp::Ordering::Less
47 } else {
48 std::cmp::Ordering::Greater
49 }
50 })
51 }),
52 Self::First => samples.first().and_then(|s| s.value()),
53 Self::Last => samples.last().and_then(|s| s.value()),
54 Self::Coalesce => samples.iter().find_map(|s| s.value()),
55 Self::Count => Some(
56 T::from_usize(samples.iter().filter_map(|s| s.value()).count())
57 .unwrap_or_else(|| T::default()),
58 ),
59 Self::Custom(f) => f.as_mut()(samples),
60 }
61 }
62}
63
64impl<T: Div<Output = T> + std::iter::Sum + Default + Debug, S: Sample<Value = T>> Debug
65 for ResamplingFunction<T, S>
66{
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 match self {
69 Self::Average => write!(f, "Average"),
70 Self::Sum => write!(f, "Sum"),
71 Self::Max => write!(f, "Max"),
72 Self::Min => write!(f, "Min"),
73 Self::First => write!(f, "First"),
74 Self::Last => write!(f, "Last"),
75 Self::Coalesce => write!(f, "Coalesce"),
76 Self::Count => write!(f, "Count"),
77 Self::Custom(_) => write!(f, "Custom"),
78 }
79 }
80}
81
/// Buffers incoming samples and turns them into one output sample per fixed
/// interval by applying a [`ResamplingFunction`].
#[derive(Debug, Default)]
pub struct Resampler<
    T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
    S: Sample<Value = T>,
> {
    /// Width of one output window; `start` advances by this amount for every
    /// window emitted by `resample`.
    interval: TimeDelta,
    /// Function applied to the samples retained for a window to produce the
    /// value of the output sample.
    resampling_function: ResamplingFunction<T, S>,
    /// Samples received through `push` / `extend` that have not been
    /// discarded by `resample` yet.
    buffer: Vec<S>,
    /// Multiplier applied to the input interval to obtain how far back from a
    /// window end (or from the `end` passed to `resample`) a sample is kept.
    max_age_in_intervals: i32,
    /// Start of the next window to emit. Initialised in `new` from the
    /// aligned start time and advanced by `resample`.
    start: DateTime<Utc>,
    /// Timestamp of the first buffered sample, refreshed at the beginning of
    /// every `resample` call (`None` when the buffer was empty).
    input_start: Option<DateTime<Utc>>,
    /// Interval attributed to the input samples. Set once by `resample` and
    /// never smaller than `interval`; `interval` is used while it is `None`.
    input_interval: Option<TimeDelta>,
    /// `true`: output samples carry the window start and a sample lying
    /// exactly on a window boundary belongs to the window starting there.
    /// `false`: output samples carry the window end and such a sample belongs
    /// to the window ending there.
    first_timestamp: bool,
}
118
119impl<
120 T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
121 S: Sample<Value = T>,
122 > Resampler<T, S>
123{
124 pub fn new(
127 interval: TimeDelta,
128 resampling_function: ResamplingFunction<T, S>,
129 max_age_in_intervals: i32,
130 start: DateTime<Utc>,
131 first_timestamp: bool,
132 ) -> Self {
133 let aligned_start = epoch_align(interval, start, None);
134 Self {
135 interval,
136 resampling_function,
137 max_age_in_intervals,
138 start: aligned_start,
139 first_timestamp,
140 ..Default::default()
141 }
142 }
143
144 pub fn push(&mut self, sample: S) {
146 self.buffer.push(sample);
147 }
148
149 pub fn buffer(&self) -> &Vec<S> {
151 &self.buffer
152 }
153
154 pub fn resample(&mut self, end: DateTime<Utc>) -> Vec<S> {
157 if self.start >= end {
158 warn!("start time is greater or equal to end time");
159 return vec![];
160 }
161 let mut res = vec![];
162 let mut interval_buffer = vec![];
163 let mut buffer_iter = self.buffer.iter();
164 let mut next_sample: Option<&S> = buffer_iter.next();
165 self.input_start = next_sample.map(|s| s.timestamp());
166 let offset = if self.first_timestamp {
167 TimeDelta::zero()
168 } else {
169 self.interval
170 };
171
172 while self.start < end {
174 while next_sample
176 .map(|s| {
177 is_left_of_buffer_edge(
178 self.first_timestamp,
179 &s.timestamp(),
180 &(self.start + self.interval),
181 )
182 })
183 .unwrap_or(false)
184 {
185 if let Some(s) = next_sample {
187 interval_buffer.push(s);
189 next_sample = buffer_iter.next();
191 if let Some(input_start) = self.input_start {
194 if self.input_interval.is_none() {
195 self.input_interval =
196 Some((s.timestamp() - input_start).max(self.interval));
197 }
198 }
199 }
200 }
201
202 let input_interval = self.input_interval.unwrap_or(self.interval);
205 let drain_end_date =
206 self.start + self.interval - input_interval * self.max_age_in_intervals;
207 interval_buffer.retain(|s| {
208 is_right_of_buffer_edge(self.first_timestamp, &s.timestamp(), &drain_end_date)
209 });
210
211 res.push(Sample::new(
213 self.start + offset,
214 self.resampling_function.apply(interval_buffer.as_slice()),
215 ));
216
217 self.start += self.interval;
219 }
220
221 let interval = self.input_interval.unwrap_or(self.interval);
223 let drain_end_date = end - interval * self.max_age_in_intervals;
224 self.buffer.retain(|s| {
225 is_right_of_buffer_edge(self.first_timestamp, &s.timestamp(), &drain_end_date)
226 });
227
228 res
229 }
230
231 pub fn resample_now(&mut self) -> Vec<S> {
234 self.resample(Utc::now())
235 }
236}
237
238impl<
239 T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
240 S: Sample<Value = T>,
241 > Extend<S> for Resampler<T, S>
242{
243 fn extend<I: IntoIterator<Item = S>>(&mut self, iter: I) {
244 self.buffer.extend(iter);
245 }
246}
247
248pub(crate) fn epoch_align(
250 interval: TimeDelta,
251 timestamp: DateTime<Utc>,
252 alignment_timestamp: Option<DateTime<Utc>>,
253) -> DateTime<Utc> {
254 let alignment_timestamp = alignment_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
255 DateTime::from_timestamp_millis(
256 (timestamp.timestamp_millis() / interval.num_milliseconds()) * interval.num_milliseconds()
257 + alignment_timestamp.timestamp_millis(),
258 )
259 .unwrap_or(timestamp)
260}
261
262fn is_left_of_buffer_edge(
263 first_timestamp: bool,
264 timestamp: &DateTime<Utc>,
265 edge_timestamp: &DateTime<Utc>,
266) -> bool {
267 if first_timestamp {
268 timestamp < edge_timestamp
269 } else {
270 timestamp <= edge_timestamp
271 }
272}
273
274fn is_right_of_buffer_edge(
275 first_timestamp: bool,
276 timestamp: &DateTime<Utc>,
277 edge_timestamp: &DateTime<Utc>,
278) -> bool {
279 if first_timestamp {
280 timestamp >= edge_timestamp
281 } else {
282 timestamp > edge_timestamp
283 }
284}