Skip to main content

frequenz_resampling/
resampler.rs

1// License: MIT
2// Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3
4//! The resampler module provides the Resampler struct that is used to resample
5//! a time series of samples.
6
7use crate::ResamplingFunction;
8use chrono::{DateTime, TimeDelta, Utc};
9use itertools::Itertools;
10use log::warn;
11use num_traits::FromPrimitive;
12use std::fmt::Debug;
13use std::ops::Div;
14
/// Selects which boundary of a resampling interval becomes the timestamp of
/// the emitted sample.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Label {
    /// Stamp the output with the interval start (the left edge). This is the
    /// default.
    #[default]
    Left,
    /// Stamp the output with the interval end (the right edge).
    Right,
}
24
/// Selects which boundary of a resampling interval is inclusive when deciding
/// whether a sample belongs to it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Closed {
    /// The start is inclusive and the end is exclusive: `[start, end)`. This
    /// is the default.
    #[default]
    Left,
    /// The start is exclusive and the end is inclusive: `(start, end]`.
    Right,
}
34
35/// The Sample trait represents a single sample in a time series.
36pub trait Sample: Clone + Debug + Default {
37    type Value;
38    fn new(timestamp: DateTime<Utc>, value: Option<Self::Value>) -> Self;
39    fn timestamp(&self) -> DateTime<Utc>;
40    fn value(&self) -> Option<Self::Value>;
41}
42
43impl<
44        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
45        S: Sample<Value = T>,
46    > ResamplingFunction<T, S>
47{
48    pub fn apply(&mut self, samples: &[&S]) -> Option<T> {
49        match self {
50            Self::Average => Self::Sum
51                .apply(samples)
52                .and_then(|sum| Self::Count.apply(samples).map(|count| sum.div(count))),
53            Self::Sum => samples.iter().filter_map(|s| s.value()).sum1(),
54            Self::Max => samples.iter().filter_map(|s| s.value()).max_by(|a, b| {
55                a.partial_cmp(b).unwrap_or_else(|| {
56                    if a.partial_cmp(&T::default()).is_some() {
57                        std::cmp::Ordering::Greater
58                    } else {
59                        std::cmp::Ordering::Less
60                    }
61                })
62            }),
63            Self::Min => samples.iter().filter_map(|s| s.value()).min_by(|a, b| {
64                a.partial_cmp(b).unwrap_or_else(|| {
65                    if a.partial_cmp(&T::default()).is_some() {
66                        std::cmp::Ordering::Less
67                    } else {
68                        std::cmp::Ordering::Greater
69                    }
70                })
71            }),
72            Self::First => samples.first().and_then(|s| s.value()),
73            Self::Last => samples.last().and_then(|s| s.value()),
74            Self::Coalesce => samples.iter().find_map(|s| s.value()),
75            Self::Count => Some(
76                T::from_usize(samples.iter().filter_map(|s| s.value()).count())
77                    .unwrap_or_else(|| T::default()),
78            ),
79            Self::Custom(f) => f.as_mut()(samples),
80        }
81    }
82}
83
84impl<T: Div<Output = T> + std::iter::Sum + Default + Debug, S: Sample<Value = T>> Debug
85    for ResamplingFunction<T, S>
86{
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        match self {
89            Self::Average => write!(f, "Average"),
90            Self::Sum => write!(f, "Sum"),
91            Self::Max => write!(f, "Max"),
92            Self::Min => write!(f, "Min"),
93            Self::First => write!(f, "First"),
94            Self::Last => write!(f, "Last"),
95            Self::Coalesce => write!(f, "Coalesce"),
96            Self::Count => write!(f, "Count"),
97            Self::Custom(_) => write!(f, "Custom"),
98        }
99    }
100}
101
102/// The Resampler struct is used to resample a time series of samples. It stores
103/// the samples in a buffer and resamples the samples in the buffer when the
104/// resample method is called. A resampler can be configured with a resampling
105/// function and a resampling interval.
106#[derive(Debug, Default)]
107pub struct Resampler<
108    T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
109    S: Sample<Value = T>,
110> {
111    /// The time step between each resampled sample
112    interval: TimeDelta,
113    /// The resampling functions to use for each channel
114    resampling_function: ResamplingFunction<T, S>,
115    /// The buffer that stores the samples
116    buffer: Vec<S>,
117    /// Resample the data in the buffer that is not older than max_age_in_intervals. Number of
118    /// intervals. If set to 0, all samples are skipped.
119    max_age_in_intervals: i32,
120    /// The start time of the resampling.
121    start: DateTime<Utc>,
122    /// The timestamp of the first sample in the buffer. If None, the timestamp
123    /// of the first sample in the buffer is used as input_start
124    input_start: Option<DateTime<Utc>>,
125    /// The interval between the first and the second sample in the buffer
126    input_interval: Option<TimeDelta>,
127    /// Controls which edge of an interval is closed for sample membership.
128    closed: Closed,
129    /// Controls the output timestamp labeling for resampled samples.
130    ///
131    /// This parameter only affects how output timestamps are labeled, not how
132    /// samples are grouped into intervals.
133    ///
134    /// - If `label` is [`Label::Left`], the output timestamp is set to the
135    ///   start of the interval.
136    /// - If `label` is [`Label::Right`], the output timestamp is set to the
137    ///   end of the interval.
138    ///
139    /// For example, with an interval of 5 seconds starting at t=0:
140    /// - Interval `[0, 5)` contains samples with timestamps 0, 1, 2, 3, 4
141    /// - If `label=Label::Left`: output timestamp = 0
142    /// - If `label=Label::Right`: output timestamp = 5
143    label: Label,
144}
145
146impl<
147        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
148        S: Sample<Value = T>,
149    > Resampler<T, S>
150{
151    /// Creates a new Resampler with the given resampling interval and
152    /// resampling function.
153    pub fn new(
154        interval: TimeDelta,
155        resampling_function: ResamplingFunction<T, S>,
156        max_age_in_intervals: i32,
157        start: DateTime<Utc>,
158        closed: Closed,
159        label: Label,
160    ) -> Self {
161        let aligned_start = epoch_align(interval, start, None);
162        Self {
163            interval,
164            resampling_function,
165            max_age_in_intervals,
166            start: aligned_start,
167            closed,
168            label,
169            ..Default::default()
170        }
171    }
172
173    /// Adds a sample to the buffer.
174    pub fn push(&mut self, sample: S) {
175        self.buffer.push(sample);
176    }
177
178    /// Returns a reference to the buffer.
179    pub fn buffer(&self) -> &Vec<S> {
180        &self.buffer
181    }
182
183    /// Resamples the samples in the buffer and returns the resampled samples
184    /// until the given end time.
185    pub fn resample(&mut self, end: DateTime<Utc>) -> Vec<S> {
186        if self.start >= end {
187            warn!("start time is greater or equal to end time");
188            return vec![];
189        }
190        let mut res = vec![];
191        let mut interval_buffer = vec![];
192        let mut buffer_iter = self.buffer.iter();
193        let mut next_sample: Option<&S> = buffer_iter.next();
194        self.input_start = next_sample.map(|s| s.timestamp());
195        let offset = match self.label {
196            Label::Left => TimeDelta::zero(),
197            Label::Right => self.interval,
198        };
199
200        // loop over the intervals
201        while self.start < end {
202            // loop over the samples in the buffer
203            while next_sample
204                .map(|s| {
205                    is_in_interval(
206                        &s.timestamp(),
207                        &self.start,
208                        &(self.start + self.interval),
209                        self.closed,
210                    )
211                })
212                .unwrap_or(false)
213            {
214                // next sample is not newer than the current interval
215                if let Some(s) = next_sample {
216                    // add the sample to the interval_buffer
217                    interval_buffer.push(s);
218                    // get the next sample
219                    next_sample = buffer_iter.next();
220                    // update the input_start and input_interval to adapt
221                    // the resampling interval to the input data
222                    if let Some(input_start) = self.input_start {
223                        if self.input_interval.is_none() {
224                            self.input_interval =
225                                Some((s.timestamp() - input_start).max(self.interval));
226                        }
227                    }
228                }
229            }
230
231            // Remove samples from interval_buffer that are older than
232            // max_age
233            let input_interval = self.input_interval.unwrap_or(self.interval);
234            let drain_end_date =
235                self.start + self.interval - input_interval * self.max_age_in_intervals;
236            interval_buffer
237                .retain(|s| is_after_retention_edge(&s.timestamp(), &drain_end_date, self.closed));
238
239            // resample the interval_buffer
240            res.push(Sample::new(
241                self.start + offset,
242                self.resampling_function.apply(interval_buffer.as_slice()),
243            ));
244
245            // Go to the next interval
246            self.start += self.interval;
247        }
248
249        // Remove samples from buffer that are older than max_age
250        let interval = self.input_interval.unwrap_or(self.interval);
251        let drain_end_date = end - interval * self.max_age_in_intervals;
252        self.buffer
253            .retain(|s| is_after_retention_edge(&s.timestamp(), &drain_end_date, self.closed));
254
255        res
256    }
257
258    /// Resamples the samples in the buffer and returns the resampled samples
259    /// until now.
260    pub fn resample_now(&mut self) -> Vec<S> {
261        self.resample(Utc::now())
262    }
263}
264
265impl<
266        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
267        S: Sample<Value = T>,
268    > Extend<S> for Resampler<T, S>
269{
270    fn extend<I: IntoIterator<Item = S>>(&mut self, iter: I) {
271        self.buffer.extend(iter);
272    }
273}
274
275/// Aligns a timestamp to the epoch of the resampling interval.
276pub fn epoch_align(
277    interval: TimeDelta,
278    timestamp: DateTime<Utc>,
279    alignment_timestamp: Option<DateTime<Utc>>,
280) -> DateTime<Utc> {
281    let alignment_timestamp = alignment_timestamp.unwrap_or(DateTime::UNIX_EPOCH);
282    DateTime::from_timestamp_millis(
283        (timestamp.timestamp_millis() / interval.num_milliseconds()) * interval.num_milliseconds()
284            + alignment_timestamp.timestamp_millis(),
285    )
286    .unwrap_or(timestamp)
287}
288
289/// Checks whether a timestamp belongs to the current interval.
290fn is_in_interval(
291    timestamp: &DateTime<Utc>,
292    _start: &DateTime<Utc>,
293    end: &DateTime<Utc>,
294    closed: Closed,
295) -> bool {
296    match closed {
297        Closed::Left => timestamp < end,
298        Closed::Right => timestamp <= end,
299    }
300}
301
302/// Checks if a timestamp should be retained in the buffer.
303fn is_after_retention_edge(
304    timestamp: &DateTime<Utc>,
305    edge_timestamp: &DateTime<Utc>,
306    closed: Closed,
307) -> bool {
308    match closed {
309        Closed::Left => timestamp >= edge_timestamp,
310        Closed::Right => timestamp > edge_timestamp,
311    }
312}