frequenz_resampling/
resampler.rs

// License: MIT
// Copyright © 2024 Frequenz Energy-as-a-Service GmbH

//! The resampler module provides the `Resampler` struct, which resamples a
//! time series of samples into fixed-length intervals.
6
7use crate::ResamplingFunction;
8use chrono::{DateTime, TimeDelta, Utc};
9use itertools::Itertools;
10use log::warn;
11use num_traits::FromPrimitive;
12use std::fmt::Debug;
13use std::ops::Div;
14
15/// The Sample trait represents a single sample in a time series.
16pub trait Sample: Clone + Debug + Default {
17    type Value;
18    fn new(timestamp: DateTime<Utc>, value: Option<Self::Value>) -> Self;
19    fn timestamp(&self) -> DateTime<Utc>;
20    fn value(&self) -> Option<Self::Value>;
21}
22
23impl<
24        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
25        S: Sample<Value = T>,
26    > ResamplingFunction<T, S>
27{
28    pub fn apply(&mut self, samples: &[&S]) -> Option<T> {
29        match self {
30            Self::Average => Self::Sum
31                .apply(samples)
32                .and_then(|sum| Self::Count.apply(samples).map(|count| sum.div(count))),
33            Self::Sum => samples.iter().filter_map(|s| s.value()).sum1(),
34            Self::Max => samples.iter().filter_map(|s| s.value()).max_by(|a, b| {
35                a.partial_cmp(b).unwrap_or_else(|| {
36                    if a.partial_cmp(&T::default()).is_some() {
37                        std::cmp::Ordering::Greater
38                    } else {
39                        std::cmp::Ordering::Less
40                    }
41                })
42            }),
43            Self::Min => samples.iter().filter_map(|s| s.value()).min_by(|a, b| {
44                a.partial_cmp(b).unwrap_or_else(|| {
45                    if a.partial_cmp(&T::default()).is_some() {
46                        std::cmp::Ordering::Less
47                    } else {
48                        std::cmp::Ordering::Greater
49                    }
50                })
51            }),
52            Self::First => samples.first().and_then(|s| s.value()),
53            Self::Last => samples.last().and_then(|s| s.value()),
54            Self::Coalesce => samples.iter().find_map(|s| s.value()),
55            Self::Count => Some(
56                T::from_usize(samples.iter().filter_map(|s| s.value()).count())
57                    .unwrap_or_else(|| T::default()),
58            ),
59            Self::Custom(f) => f.as_mut()(samples),
60        }
61    }
62}
63
64impl<T: Div<Output = T> + std::iter::Sum + Default + Debug, S: Sample<Value = T>> Debug
65    for ResamplingFunction<T, S>
66{
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        match self {
69            Self::Average => write!(f, "Average"),
70            Self::Sum => write!(f, "Sum"),
71            Self::Max => write!(f, "Max"),
72            Self::Min => write!(f, "Min"),
73            Self::First => write!(f, "First"),
74            Self::Last => write!(f, "Last"),
75            Self::Coalesce => write!(f, "Coalesce"),
76            Self::Count => write!(f, "Count"),
77            Self::Custom(_) => write!(f, "Custom"),
78        }
79    }
80}
81
82/// The Resampler struct is used to resample a time series of samples. It stores
83/// the samples in a buffer and resamples the samples in the buffer when the
84/// resample method is called. A resampler can be configured with a resampling
85/// function and a resampling interval.
86#[derive(Debug, Default)]
87pub struct Resampler<
88    T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
89    S: Sample<Value = T>,
90> {
91    /// The time step between each resampled sample
92    interval: TimeDelta,
93    /// The resampling functions to use for each channel
94    resampling_function: ResamplingFunction<T, S>,
95    /// The buffer that stores the samples
96    buffer: Vec<S>,
97    /// Resample the data in the buffer that is not older than max_age_in_intervals. Number of
98    /// intervals. If set to 0, all samples are skipped.
99    max_age_in_intervals: i32,
100    /// The start time of the resampling.
101    start: DateTime<Utc>,
102    /// The timestamp of the first sample in the buffer. If None, the timestamp
103    /// of the first sample in the buffer is used as input_start
104    input_start: Option<DateTime<Utc>>,
105    /// The interval between the first and the second sample in the buffer
106    input_interval: Option<TimeDelta>,
107    /// Whether the resampled timestamp should be the first timestamp (if
108    /// `first_timestamp` is `true`) or the last timestamp (if
109    /// `first_timestamp` is `false`) in the buffer.
110    /// If `first_timestamp` is `true`, the resampled timestamp will be the
111    /// timestamp of the first sample in the buffer and the aggregation will
112    /// be done with the samples that are `interval` in the future.
113    /// If `first_timestamp` is `false`, the resampled timestamp will be the
114    /// timestamp of the last sample in the buffer and the aggregation will
115    /// be done with the samples that are `interval` in the past.
116    first_timestamp: bool,
117}
118
119impl<
120        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
121        S: Sample<Value = T>,
122    > Resampler<T, S>
123{
124    /// Creates a new Resampler with the given resampling interval and
125    /// resampling function.
126    pub fn new(
127        interval: TimeDelta,
128        resampling_function: ResamplingFunction<T, S>,
129        max_age_in_intervals: i32,
130        start: DateTime<Utc>,
131        first_timestamp: bool,
132    ) -> Self {
133        let aligned_start = epoch_align(interval, start, None);
134        Self {
135            interval,
136            resampling_function,
137            max_age_in_intervals,
138            start: aligned_start,
139            first_timestamp,
140            ..Default::default()
141        }
142    }
143
144    /// Adds a sample to the buffer.
145    pub fn push(&mut self, sample: S) {
146        self.buffer.push(sample);
147    }
148
149    /// Returns a reference to the buffer.
150    pub fn buffer(&self) -> &Vec<S> {
151        &self.buffer
152    }
153
154    /// Resamples the samples in the buffer and returns the resampled samples
155    /// until the given end time.
156    pub fn resample(&mut self, end: DateTime<Utc>) -> Vec<S> {
157        if self.start >= end {
158            warn!("start time is greater or equal to end time");
159            return vec![];
160        }
161        let mut res = vec![];
162        let mut interval_buffer = vec![];
163        let mut buffer_iter = self.buffer.iter();
164        let mut next_sample: Option<&S> = buffer_iter.next();
165        self.input_start = next_sample.map(|s| s.timestamp());
166        let offset = if self.first_timestamp {
167            TimeDelta::zero()
168        } else {
169            self.interval
170        };
171
172        // loop over the intervals
173        while self.start < end {
174            // loop over the samples in the buffer
175            while next_sample
176                .map(|s| {
177                    is_left_of_buffer_edge(
178                        self.first_timestamp,
179                        &s.timestamp(),
180                        &(self.start + self.interval),
181                    )
182                })
183                .unwrap_or(false)
184            {
185                // next sample is not newer than the current interval
186                if let Some(s) = next_sample {
187                    // add the sample to the interval_buffer
188                    interval_buffer.push(s);
189                    // get the next sample
190                    next_sample = buffer_iter.next();
191                    // update the input_start and input_interval to adapt
192                    // the resampling interval to the input data
193                    if let Some(input_start) = self.input_start {
194                        if self.input_interval.is_none() {
195                            self.input_interval =
196                                Some((s.timestamp() - input_start).max(self.interval));
197                        }
198                    }
199                }
200            }
201
202            // Remove samples from interval_buffer that are older than
203            // max_age
204            let input_interval = self.input_interval.unwrap_or(self.interval);
205            let drain_end_date =
206                self.start + self.interval - input_interval * self.max_age_in_intervals;
207            interval_buffer.retain(|s| {
208                is_right_of_buffer_edge(self.first_timestamp, &s.timestamp(), &drain_end_date)
209            });
210
211            // resample the interval_buffer
212            res.push(Sample::new(
213                self.start + offset,
214                self.resampling_function.apply(interval_buffer.as_slice()),
215            ));
216
217            // Go to the next interval
218            self.start += self.interval;
219        }
220
221        // Remove samples from buffer that are older than max_age
222        let interval = self.input_interval.unwrap_or(self.interval);
223        let drain_end_date = end - interval * self.max_age_in_intervals;
224        self.buffer.retain(|s| {
225            is_right_of_buffer_edge(self.first_timestamp, &s.timestamp(), &drain_end_date)
226        });
227
228        res
229    }
230
231    /// Resamples the samples in the buffer and returns the resampled samples
232    /// until now.
233    pub fn resample_now(&mut self) -> Vec<S> {
234        self.resample(Utc::now())
235    }
236}
237
238impl<
239        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
240        S: Sample<Value = T>,
241    > Extend<S> for Resampler<T, S>
242{
243    fn extend<I: IntoIterator<Item = S>>(&mut self, iter: I) {
244        self.buffer.extend(iter);
245    }
246}
247
248/// Aligns a timestamp to the epoch of the resampling interval.
249pub(crate) fn epoch_align(
250    interval: TimeDelta,
251    timestamp: DateTime<Utc>,
252    alignment_timestamp: Option<DateTime<Utc>>,
253) -> DateTime<Utc> {
254    let alignment_timestamp = alignment_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
255    DateTime::from_timestamp_millis(
256        (timestamp.timestamp_millis() / interval.num_milliseconds()) * interval.num_milliseconds()
257            + alignment_timestamp.timestamp_millis(),
258    )
259    .unwrap_or(timestamp)
260}
261
262fn is_left_of_buffer_edge(
263    first_timestamp: bool,
264    timestamp: &DateTime<Utc>,
265    edge_timestamp: &DateTime<Utc>,
266) -> bool {
267    if first_timestamp {
268        timestamp < edge_timestamp
269    } else {
270        timestamp <= edge_timestamp
271    }
272}
273
274fn is_right_of_buffer_edge(
275    first_timestamp: bool,
276    timestamp: &DateTime<Utc>,
277    edge_timestamp: &DateTime<Utc>,
278) -> bool {
279    if first_timestamp {
280        timestamp >= edge_timestamp
281    } else {
282        timestamp > edge_timestamp
283    }
284}