1use crate::ResamplingFunction;
8use chrono::{DateTime, TimeDelta, Utc};
9use itertools::Itertools;
10use log::warn;
11use num_traits::FromPrimitive;
12use std::fmt::Debug;
13use std::ops::Div;
14
/// Selects which edge of a resampling interval stamps the emitted sample.
///
/// With [`Label::Left`] the output sample carries the start of its interval;
/// with [`Label::Right`] it carries the start plus one interval, i.e. the end.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Label {
    /// Stamp the output with the start of the interval (the default).
    #[default]
    Left,
    /// Stamp the output with the end of the interval.
    Right,
}
24
/// Selects which boundary of a resampling interval is inclusive.
///
/// With [`Closed::Left`] a sample is collected while its timestamp is strictly
/// before the end of the interval, and a sample sitting exactly on the
/// retention edge is kept. With [`Closed::Right`] a sample exactly on the end
/// of the interval is collected as well, and a sample exactly on the retention
/// edge is dropped.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum Closed {
    /// The left boundary is inclusive, the right one exclusive (the default).
    #[default]
    Left,
    /// The right boundary is inclusive, the left one exclusive.
    Right,
}
34
/// A timestamped data point that the resampler consumes and produces.
///
/// Implementors must also be `Clone`, `Debug` and `Default`.
pub trait Sample: Clone + Debug + Default {
    /// The type of the measurement carried by a sample.
    type Value;
    /// Builds a sample from a timestamp and an optional value.
    ///
    /// The resampler calls this to create its output samples; `value` is
    /// whatever the resampling function returned for the interval.
    fn new(timestamp: DateTime<Utc>, value: Option<Self::Value>) -> Self;
    /// Returns the instant this sample is associated with.
    fn timestamp(&self) -> DateTime<Utc>;
    /// Returns the sample's value, or `None` when the sample carries none.
    fn value(&self) -> Option<Self::Value>;
}
42
43impl<
44 T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
45 S: Sample<Value = T>,
46 > ResamplingFunction<T, S>
47{
48 pub fn apply(&mut self, samples: &[&S]) -> Option<T> {
49 match self {
50 Self::Average => Self::Sum
51 .apply(samples)
52 .and_then(|sum| Self::Count.apply(samples).map(|count| sum.div(count))),
53 Self::Sum => samples.iter().filter_map(|s| s.value()).sum1(),
54 Self::Max => samples.iter().filter_map(|s| s.value()).max_by(|a, b| {
55 a.partial_cmp(b).unwrap_or_else(|| {
56 if a.partial_cmp(&T::default()).is_some() {
57 std::cmp::Ordering::Greater
58 } else {
59 std::cmp::Ordering::Less
60 }
61 })
62 }),
63 Self::Min => samples.iter().filter_map(|s| s.value()).min_by(|a, b| {
64 a.partial_cmp(b).unwrap_or_else(|| {
65 if a.partial_cmp(&T::default()).is_some() {
66 std::cmp::Ordering::Less
67 } else {
68 std::cmp::Ordering::Greater
69 }
70 })
71 }),
72 Self::First => samples.first().and_then(|s| s.value()),
73 Self::Last => samples.last().and_then(|s| s.value()),
74 Self::Coalesce => samples.iter().find_map(|s| s.value()),
75 Self::Count => Some(
76 T::from_usize(samples.iter().filter_map(|s| s.value()).count())
77 .unwrap_or_else(|| T::default()),
78 ),
79 Self::Custom(f) => f.as_mut()(samples),
80 }
81 }
82}
83
84impl<T: Div<Output = T> + std::iter::Sum + Default + Debug, S: Sample<Value = T>> Debug
85 for ResamplingFunction<T, S>
86{
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 match self {
89 Self::Average => write!(f, "Average"),
90 Self::Sum => write!(f, "Sum"),
91 Self::Max => write!(f, "Max"),
92 Self::Min => write!(f, "Min"),
93 Self::First => write!(f, "First"),
94 Self::Last => write!(f, "Last"),
95 Self::Coalesce => write!(f, "Coalesce"),
96 Self::Count => write!(f, "Count"),
97 Self::Custom(_) => write!(f, "Custom"),
98 }
99 }
100}
101
102#[derive(Debug, Default)]
107pub struct Resampler<
108 T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
109 S: Sample<Value = T>,
110> {
111 interval: TimeDelta,
113 resampling_function: ResamplingFunction<T, S>,
115 buffer: Vec<S>,
117 max_age_in_intervals: i32,
120 start: DateTime<Utc>,
122 input_start: Option<DateTime<Utc>>,
125 input_interval: Option<TimeDelta>,
127 closed: Closed,
129 label: Label,
144}
145
146impl<
147 T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
148 S: Sample<Value = T>,
149 > Resampler<T, S>
150{
151 pub fn new(
154 interval: TimeDelta,
155 resampling_function: ResamplingFunction<T, S>,
156 max_age_in_intervals: i32,
157 start: DateTime<Utc>,
158 closed: Closed,
159 label: Label,
160 ) -> Self {
161 let aligned_start = epoch_align(interval, start, None);
162 Self {
163 interval,
164 resampling_function,
165 max_age_in_intervals,
166 start: aligned_start,
167 closed,
168 label,
169 ..Default::default()
170 }
171 }
172
173 pub fn push(&mut self, sample: S) {
175 self.buffer.push(sample);
176 }
177
178 pub fn buffer(&self) -> &Vec<S> {
180 &self.buffer
181 }
182
183 pub fn resample(&mut self, end: DateTime<Utc>) -> Vec<S> {
186 if self.start >= end {
187 warn!("start time is greater or equal to end time");
188 return vec![];
189 }
190 let mut res = vec![];
191 let mut interval_buffer = vec![];
192 let mut buffer_iter = self.buffer.iter();
193 let mut next_sample: Option<&S> = buffer_iter.next();
194 self.input_start = next_sample.map(|s| s.timestamp());
195 let offset = match self.label {
196 Label::Left => TimeDelta::zero(),
197 Label::Right => self.interval,
198 };
199
200 while self.start < end {
202 while next_sample
204 .map(|s| {
205 is_in_interval(
206 &s.timestamp(),
207 &self.start,
208 &(self.start + self.interval),
209 self.closed,
210 )
211 })
212 .unwrap_or(false)
213 {
214 if let Some(s) = next_sample {
216 interval_buffer.push(s);
218 next_sample = buffer_iter.next();
220 if let Some(input_start) = self.input_start {
223 if self.input_interval.is_none() {
224 self.input_interval =
225 Some((s.timestamp() - input_start).max(self.interval));
226 }
227 }
228 }
229 }
230
231 let input_interval = self.input_interval.unwrap_or(self.interval);
234 let drain_end_date =
235 self.start + self.interval - input_interval * self.max_age_in_intervals;
236 interval_buffer
237 .retain(|s| is_after_retention_edge(&s.timestamp(), &drain_end_date, self.closed));
238
239 res.push(Sample::new(
241 self.start + offset,
242 self.resampling_function.apply(interval_buffer.as_slice()),
243 ));
244
245 self.start += self.interval;
247 }
248
249 let interval = self.input_interval.unwrap_or(self.interval);
251 let drain_end_date = end - interval * self.max_age_in_intervals;
252 self.buffer
253 .retain(|s| is_after_retention_edge(&s.timestamp(), &drain_end_date, self.closed));
254
255 res
256 }
257
258 pub fn resample_now(&mut self) -> Vec<S> {
261 self.resample(Utc::now())
262 }
263}
264
265impl<
266 T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
267 S: Sample<Value = T>,
268 > Extend<S> for Resampler<T, S>
269{
270 fn extend<I: IntoIterator<Item = S>>(&mut self, iter: I) {
271 self.buffer.extend(iter);
272 }
273}
274
275pub fn epoch_align(
277 interval: TimeDelta,
278 timestamp: DateTime<Utc>,
279 alignment_timestamp: Option<DateTime<Utc>>,
280) -> DateTime<Utc> {
281 let alignment_timestamp = alignment_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
282 DateTime::from_timestamp_millis(
283 (timestamp.timestamp_millis() / interval.num_milliseconds()) * interval.num_milliseconds()
284 + alignment_timestamp.timestamp_millis(),
285 )
286 .unwrap_or(timestamp)
287}
288
289fn is_in_interval(
291 timestamp: &DateTime<Utc>,
292 _start: &DateTime<Utc>,
293 end: &DateTime<Utc>,
294 closed: Closed,
295) -> bool {
296 match closed {
297 Closed::Left => timestamp < end,
298 Closed::Right => timestamp <= end,
299 }
300}
301
302fn is_after_retention_edge(
304 timestamp: &DateTime<Utc>,
305 edge_timestamp: &DateTime<Utc>,
306 closed: Closed,
307) -> bool {
308 match closed {
309 Closed::Left => timestamp >= edge_timestamp,
310 Closed::Right => timestamp > edge_timestamp,
311 }
312}