1use chrono::{DateTime, TimeDelta, Utc};
8use log::warn;
9use num_traits::FromPrimitive;
10use std::fmt::Debug;
11use std::ops::Div;
12
13use itertools::Itertools;
14
/// Boxed, user-supplied resampling closure.
///
/// The closure receives the samples gathered for one output window as a
/// slice of references and returns the resampled value, or `None` when no
/// value should be emitted. It must be `Send + Sync`.
pub type CustomResamplingFunction<S, T> = Box<dyn FnMut(&[&S]) -> Option<T> + Send + Sync>;
16
/// A timestamped data point with an optional value.
///
/// Implementors must also be `Clone`, `Debug` and `Default`.
pub trait Sample: Clone + Debug + Default {
    /// The type of the value carried by the sample.
    type Value;
    /// Builds a sample from a timestamp and an optional value.
    fn new(timestamp: DateTime<Utc>, value: Option<Self::Value>) -> Self;
    /// Returns the timestamp of the sample.
    fn timestamp(&self) -> DateTime<Utc>;
    /// Returns the value of the sample, or `None` when it carries no value.
    fn value(&self) -> Option<Self::Value>;
}
24
25#[derive(Default)]
28pub enum ResamplingFunction<
29 T: Div<Output = T> + std::iter::Sum + Default + Debug,
30 S: Sample<Value = T>,
31> {
32 #[default]
35 Average,
36 Sum,
39 Max,
42 Min,
45 First,
48 Last,
51 Coalesce,
54 Count,
56 Custom(CustomResamplingFunction<S, T>),
59}
60
61impl<
62 T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
63 S: Sample<Value = T>,
64 > ResamplingFunction<T, S>
65{
66 pub fn apply(&mut self, samples: &[&S]) -> Option<T> {
67 match self {
68 Self::Average => Self::Sum
69 .apply(samples)
70 .and_then(|sum| Self::Count.apply(samples).map(|count| sum.div(count))),
71 Self::Sum => samples.iter().filter_map(|s| s.value()).sum1(),
72 Self::Max => samples.iter().filter_map(|s| s.value()).max_by(|a, b| {
73 a.partial_cmp(b).unwrap_or_else(|| {
74 if a.partial_cmp(&T::default()).is_some() {
75 std::cmp::Ordering::Greater
76 } else {
77 std::cmp::Ordering::Less
78 }
79 })
80 }),
81 Self::Min => samples.iter().filter_map(|s| s.value()).min_by(|a, b| {
82 a.partial_cmp(b).unwrap_or_else(|| {
83 if a.partial_cmp(&T::default()).is_some() {
84 std::cmp::Ordering::Less
85 } else {
86 std::cmp::Ordering::Greater
87 }
88 })
89 }),
90 Self::First => samples.first().and_then(|s| s.value()),
91 Self::Last => samples.last().and_then(|s| s.value()),
92 Self::Coalesce => samples.iter().find_map(|s| s.value()),
93 Self::Count => Some(
94 T::from_usize(samples.iter().filter_map(|s| s.value()).count())
95 .unwrap_or_else(|| T::default()),
96 ),
97 Self::Custom(f) => f.as_mut()(samples),
98 }
99 }
100}
101
102impl<T: Div<Output = T> + std::iter::Sum + Default + Debug, S: Sample<Value = T>> Debug
103 for ResamplingFunction<T, S>
104{
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 match self {
107 Self::Average => write!(f, "Average"),
108 Self::Sum => write!(f, "Sum"),
109 Self::Max => write!(f, "Max"),
110 Self::Min => write!(f, "Min"),
111 Self::First => write!(f, "First"),
112 Self::Last => write!(f, "Last"),
113 Self::Coalesce => write!(f, "Coalesce"),
114 Self::Count => write!(f, "Count"),
115 Self::Custom(_) => write!(f, "Custom"),
116 }
117 }
118}
119
120#[derive(Debug, Default)]
125pub struct Resampler<
126 T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
127 S: Sample<Value = T>,
128> {
129 interval: TimeDelta,
131 resampling_function: ResamplingFunction<T, S>,
133 buffer: Vec<S>,
135 max_age_in_intervals: i32,
138 start: DateTime<Utc>,
140 input_start: Option<DateTime<Utc>>,
143 input_interval: Option<TimeDelta>,
145 first_timestamp: bool,
155}
156
157impl<
158 T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
159 S: Sample<Value = T>,
160 > Resampler<T, S>
161{
162 pub fn new(
165 interval: TimeDelta,
166 resampling_function: ResamplingFunction<T, S>,
167 max_age_in_intervals: i32,
168 start: DateTime<Utc>,
169 first_timestamp: bool,
170 ) -> Self {
171 let aligned_start = epoch_align(interval, start, None);
172 Self {
173 interval,
174 resampling_function,
175 max_age_in_intervals,
176 start: aligned_start,
177 first_timestamp,
178 ..Default::default()
179 }
180 }
181
182 pub fn push(&mut self, sample: S) {
184 self.buffer.push(sample);
185 }
186
187 pub fn buffer(&self) -> &Vec<S> {
189 &self.buffer
190 }
191
192 pub fn resample(&mut self, end: DateTime<Utc>) -> Vec<S> {
195 if self.start >= end {
196 warn!("start time is greater or equal to end time");
197 return vec![];
198 }
199 let mut res = vec![];
200 let mut interval_buffer = vec![];
201 let mut buffer_iter = self.buffer.iter();
202 let mut next_sample: Option<&S> = buffer_iter.next();
203 self.input_start = next_sample.map(|s| s.timestamp());
204 let offset = if self.first_timestamp {
205 TimeDelta::zero()
206 } else {
207 self.interval
208 };
209
210 while self.start < end {
212 while next_sample
214 .map(|s| {
215 is_left_of_buffer_edge(
216 self.first_timestamp,
217 &s.timestamp(),
218 &(self.start + self.interval),
219 )
220 })
221 .unwrap_or(false)
222 {
223 if let Some(s) = next_sample {
225 interval_buffer.push(s);
227 next_sample = buffer_iter.next();
229 if let Some(input_start) = self.input_start {
232 if self.input_interval.is_none() {
233 self.input_interval =
234 Some((s.timestamp() - input_start).max(self.interval));
235 }
236 }
237 }
238 }
239
240 let input_interval = self.input_interval.unwrap_or(self.interval);
243 let drain_end_date =
244 self.start + self.interval - input_interval * self.max_age_in_intervals;
245 interval_buffer.retain(|s| {
246 is_right_of_buffer_edge(self.first_timestamp, &s.timestamp(), &drain_end_date)
247 });
248
249 res.push(Sample::new(
251 self.start + offset,
252 self.resampling_function.apply(interval_buffer.as_slice()),
253 ));
254
255 self.start += self.interval;
257 }
258
259 let interval = self.input_interval.unwrap_or(self.interval);
261 let drain_end_date = end - interval * self.max_age_in_intervals;
262 self.buffer.retain(|s| {
263 is_right_of_buffer_edge(self.first_timestamp, &s.timestamp(), &drain_end_date)
264 });
265
266 res
267 }
268
269 pub fn resample_now(&mut self) -> Vec<S> {
272 self.resample(Utc::now())
273 }
274}
275
276impl<
277 T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
278 S: Sample<Value = T>,
279 > Extend<S> for Resampler<T, S>
280{
281 fn extend<I: IntoIterator<Item = S>>(&mut self, iter: I) {
282 self.buffer.extend(iter);
283 }
284}
285
286pub(crate) fn epoch_align(
288 interval: TimeDelta,
289 timestamp: DateTime<Utc>,
290 alignment_timestamp: Option<DateTime<Utc>>,
291) -> DateTime<Utc> {
292 let alignment_timestamp = alignment_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
293 DateTime::from_timestamp_millis(
294 (timestamp.timestamp_millis() / interval.num_milliseconds()) * interval.num_milliseconds()
295 + alignment_timestamp.timestamp_millis(),
296 )
297 .unwrap_or(timestamp)
298}
299
300fn is_left_of_buffer_edge(
301 first_timestamp: bool,
302 timestamp: &DateTime<Utc>,
303 edge_timestamp: &DateTime<Utc>,
304) -> bool {
305 if first_timestamp {
306 timestamp < edge_timestamp
307 } else {
308 timestamp <= edge_timestamp
309 }
310}
311
312fn is_right_of_buffer_edge(
313 first_timestamp: bool,
314 timestamp: &DateTime<Utc>,
315 edge_timestamp: &DateTime<Utc>,
316) -> bool {
317 if first_timestamp {
318 timestamp >= edge_timestamp
319 } else {
320 timestamp > edge_timestamp
321 }
322}