frequenz_resampling/
resampler.rs

1// License: MIT
2// Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3
4//! The resampler module provides the Resampler struct that is used to resample
5//! a time series of samples.
6
7use chrono::{DateTime, TimeDelta, Utc};
8use log::warn;
9use num_traits::FromPrimitive;
10use std::fmt::Debug;
11use std::ops::Div;
12
13use itertools::Itertools;
14
/// A boxed, user-supplied resampling closure.
///
/// The closure receives a slice of references to the samples being
/// aggregated and returns the resampled value, or `None` if it produces no
/// value. The closure must be `Send + Sync`.
pub type CustomResamplingFunction<S, T> = Box<dyn FnMut(&[&S]) -> Option<T> + Send + Sync>;
16
/// The Sample trait represents a single sample in a time series.
///
/// A sample pairs a UTC timestamp with an optional value.
pub trait Sample: Clone + Debug + Default {
    /// The type of the value carried by a sample.
    type Value;
    /// Creates a sample from a timestamp and an optional value.
    fn new(timestamp: DateTime<Utc>, value: Option<Self::Value>) -> Self;
    /// Returns the timestamp of the sample.
    fn timestamp(&self) -> DateTime<Utc>;
    /// Returns the value of the sample, or `None` if the sample has no value.
    fn value(&self) -> Option<Self::Value>;
}
24
25/// The ResamplingFunction enum represents the different resampling functions
26/// that can be used to resample a channel.
27#[derive(Default)]
28pub enum ResamplingFunction<
29    T: Div<Output = T> + std::iter::Sum + Default + Debug,
30    S: Sample<Value = T>,
31> {
32    /// Calculates the average of all samples in the time step (ignoring None
33    /// values)
34    #[default]
35    Average,
36    /// Calculates the sum of all samples in the time step (ignoring None
37    /// values)
38    Sum,
39    /// Calculates the maximum value of all samples in the time step (ignoring
40    /// None values)
41    Max,
42    /// Calculates the minimum value of all samples in the time step (ignoring
43    /// None values)
44    Min,
45    /// Uses the first sample in the time step. If the first sample is None, the
46    /// resampling function will return None.
47    First,
48    /// Uses the last sample in the time step. If the last sample is None, the
49    /// resampling function will return None.
50    Last,
51    /// Returns the first non-None sample in the time step. If all samples are
52    /// None, the resampling function will return None.
53    Coalesce,
54    /// Counts the number of samples in the time step (ignoring None values)
55    Count,
56    /// A custom resampling function that takes a closure that takes a slice of
57    /// samples and returns an optional value.
58    Custom(CustomResamplingFunction<S, T>),
59}
60
61impl<
62        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
63        S: Sample<Value = T>,
64    > ResamplingFunction<T, S>
65{
66    pub fn apply(&mut self, samples: &[&S]) -> Option<T> {
67        match self {
68            Self::Average => Self::Sum
69                .apply(samples)
70                .and_then(|sum| Self::Count.apply(samples).map(|count| sum.div(count))),
71            Self::Sum => samples.iter().filter_map(|s| s.value()).sum1(),
72            Self::Max => samples.iter().filter_map(|s| s.value()).max_by(|a, b| {
73                a.partial_cmp(b).unwrap_or_else(|| {
74                    if a.partial_cmp(&T::default()).is_some() {
75                        std::cmp::Ordering::Greater
76                    } else {
77                        std::cmp::Ordering::Less
78                    }
79                })
80            }),
81            Self::Min => samples.iter().filter_map(|s| s.value()).min_by(|a, b| {
82                a.partial_cmp(b).unwrap_or_else(|| {
83                    if a.partial_cmp(&T::default()).is_some() {
84                        std::cmp::Ordering::Less
85                    } else {
86                        std::cmp::Ordering::Greater
87                    }
88                })
89            }),
90            Self::First => samples.first().and_then(|s| s.value()),
91            Self::Last => samples.last().and_then(|s| s.value()),
92            Self::Coalesce => samples.iter().find_map(|s| s.value()),
93            Self::Count => Some(
94                T::from_usize(samples.iter().filter_map(|s| s.value()).count())
95                    .unwrap_or_else(|| T::default()),
96            ),
97            Self::Custom(f) => f.as_mut()(samples),
98        }
99    }
100}
101
102impl<T: Div<Output = T> + std::iter::Sum + Default + Debug, S: Sample<Value = T>> Debug
103    for ResamplingFunction<T, S>
104{
105    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106        match self {
107            Self::Average => write!(f, "Average"),
108            Self::Sum => write!(f, "Sum"),
109            Self::Max => write!(f, "Max"),
110            Self::Min => write!(f, "Min"),
111            Self::First => write!(f, "First"),
112            Self::Last => write!(f, "Last"),
113            Self::Coalesce => write!(f, "Coalesce"),
114            Self::Count => write!(f, "Count"),
115            Self::Custom(_) => write!(f, "Custom"),
116        }
117    }
118}
119
120/// The Resampler struct is used to resample a time series of samples. It stores
121/// the samples in a buffer and resamples the samples in the buffer when the
122/// resample method is called. A resampler can be configured with a resampling
123/// function and a resampling interval.
124#[derive(Debug, Default)]
125pub struct Resampler<
126    T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
127    S: Sample<Value = T>,
128> {
129    /// The time step between each resampled sample
130    interval: TimeDelta,
131    /// The resampling functions to use for each channel
132    resampling_function: ResamplingFunction<T, S>,
133    /// The buffer that stores the samples
134    buffer: Vec<S>,
135    /// Resample the data in the buffer that is not older than max_age_in_intervals. Number of
136    /// intervals. If set to 0, all samples are skipped.
137    max_age_in_intervals: i32,
138    /// The start time of the resampling.
139    start: DateTime<Utc>,
140    /// The timestamp of the first sample in the buffer. If None, the timestamp
141    /// of the first sample in the buffer is used as input_start
142    input_start: Option<DateTime<Utc>>,
143    /// The interval between the first and the second sample in the buffer
144    input_interval: Option<TimeDelta>,
145    /// Whether the resampled timestamp should be the first timestamp (if
146    /// `first_timestamp` is `true`) or the last timestamp (if
147    /// `first_timestamp` is `false`) in the buffer.
148    /// If `first_timestamp` is `true`, the resampled timestamp will be the
149    /// timestamp of the first sample in the buffer and the aggregation will
150    /// be done with the samples that are `interval` in the future.
151    /// If `first_timestamp` is `false`, the resampled timestamp will be the
152    /// timestamp of the last sample in the buffer and the aggregation will
153    /// be done with the samples that are `interval` in the past.
154    first_timestamp: bool,
155}
156
157impl<
158        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
159        S: Sample<Value = T>,
160    > Resampler<T, S>
161{
162    /// Creates a new Resampler with the given resampling interval and
163    /// resampling function.
164    pub fn new(
165        interval: TimeDelta,
166        resampling_function: ResamplingFunction<T, S>,
167        max_age_in_intervals: i32,
168        start: DateTime<Utc>,
169        first_timestamp: bool,
170    ) -> Self {
171        let aligned_start = epoch_align(interval, start, None);
172        Self {
173            interval,
174            resampling_function,
175            max_age_in_intervals,
176            start: aligned_start,
177            first_timestamp,
178            ..Default::default()
179        }
180    }
181
182    /// Adds a sample to the buffer.
183    pub fn push(&mut self, sample: S) {
184        self.buffer.push(sample);
185    }
186
187    /// Returns a reference to the buffer.
188    pub fn buffer(&self) -> &Vec<S> {
189        &self.buffer
190    }
191
192    /// Resamples the samples in the buffer and returns the resampled samples
193    /// until the given end time.
194    pub fn resample(&mut self, end: DateTime<Utc>) -> Vec<S> {
195        if self.start >= end {
196            warn!("start time is greater or equal to end time");
197            return vec![];
198        }
199        let mut res = vec![];
200        let mut interval_buffer = vec![];
201        let mut buffer_iter = self.buffer.iter();
202        let mut next_sample: Option<&S> = buffer_iter.next();
203        self.input_start = next_sample.map(|s| s.timestamp());
204        let offset = if self.first_timestamp {
205            TimeDelta::zero()
206        } else {
207            self.interval
208        };
209
210        // loop over the intervals
211        while self.start < end {
212            // loop over the samples in the buffer
213            while next_sample
214                .map(|s| {
215                    is_left_of_buffer_edge(
216                        self.first_timestamp,
217                        &s.timestamp(),
218                        &(self.start + self.interval),
219                    )
220                })
221                .unwrap_or(false)
222            {
223                // next sample is not newer than the current interval
224                if let Some(s) = next_sample {
225                    // add the sample to the interval_buffer
226                    interval_buffer.push(s);
227                    // get the next sample
228                    next_sample = buffer_iter.next();
229                    // update the input_start and input_interval to adapt
230                    // the resampling interval to the input data
231                    if let Some(input_start) = self.input_start {
232                        if self.input_interval.is_none() {
233                            self.input_interval =
234                                Some((s.timestamp() - input_start).max(self.interval));
235                        }
236                    }
237                }
238            }
239
240            // Remove samples from interval_buffer that are older than
241            // max_age
242            let input_interval = self.input_interval.unwrap_or(self.interval);
243            let drain_end_date =
244                self.start + self.interval - input_interval * self.max_age_in_intervals;
245            interval_buffer.retain(|s| {
246                is_right_of_buffer_edge(self.first_timestamp, &s.timestamp(), &drain_end_date)
247            });
248
249            // resample the interval_buffer
250            res.push(Sample::new(
251                self.start + offset,
252                self.resampling_function.apply(interval_buffer.as_slice()),
253            ));
254
255            // Go to the next interval
256            self.start += self.interval;
257        }
258
259        // Remove samples from buffer that are older than max_age
260        let interval = self.input_interval.unwrap_or(self.interval);
261        let drain_end_date = end - interval * self.max_age_in_intervals;
262        self.buffer.retain(|s| {
263            is_right_of_buffer_edge(self.first_timestamp, &s.timestamp(), &drain_end_date)
264        });
265
266        res
267    }
268
269    /// Resamples the samples in the buffer and returns the resampled samples
270    /// until now.
271    pub fn resample_now(&mut self) -> Vec<S> {
272        self.resample(Utc::now())
273    }
274}
275
276impl<
277        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
278        S: Sample<Value = T>,
279    > Extend<S> for Resampler<T, S>
280{
281    fn extend<I: IntoIterator<Item = S>>(&mut self, iter: I) {
282        self.buffer.extend(iter);
283    }
284}
285
286/// Aligns a timestamp to the epoch of the resampling interval.
287pub(crate) fn epoch_align(
288    interval: TimeDelta,
289    timestamp: DateTime<Utc>,
290    alignment_timestamp: Option<DateTime<Utc>>,
291) -> DateTime<Utc> {
292    let alignment_timestamp = alignment_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
293    DateTime::from_timestamp_millis(
294        (timestamp.timestamp_millis() / interval.num_milliseconds()) * interval.num_milliseconds()
295            + alignment_timestamp.timestamp_millis(),
296    )
297    .unwrap_or(timestamp)
298}
299
300fn is_left_of_buffer_edge(
301    first_timestamp: bool,
302    timestamp: &DateTime<Utc>,
303    edge_timestamp: &DateTime<Utc>,
304) -> bool {
305    if first_timestamp {
306        timestamp < edge_timestamp
307    } else {
308        timestamp <= edge_timestamp
309    }
310}
311
312fn is_right_of_buffer_edge(
313    first_timestamp: bool,
314    timestamp: &DateTime<Utc>,
315    edge_timestamp: &DateTime<Utc>,
316) -> bool {
317    if first_timestamp {
318        timestamp >= edge_timestamp
319    } else {
320        timestamp > edge_timestamp
321    }
322}