frequenz-resampling 0.3.0

A library for resampling a stream of samples to a given interval.
Documentation
// License: MIT
// Copyright © 2024 Frequenz Energy-as-a-Service GmbH

//! The resampler module provides the Resampler struct that is used to resample
//! a time series of samples.

use crate::ResamplingFunction;
use chrono::{DateTime, TimeDelta, Utc};
use itertools::Itertools;
use log::warn;
use num_traits::FromPrimitive;
use std::fmt::Debug;
use std::ops::Div;

/// Selects which boundary of a resampling interval becomes the timestamp of
/// the resampled output sample.
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq)]
pub enum Label {
    /// Use the start (left edge) of the interval as the output timestamp.
    /// This is the default.
    #[default]
    Left,
    /// Use the end (right edge) of the interval as the output timestamp.
    Right,
}

/// Selects which boundary of a resampling interval is inclusive when deciding
/// whether a sample belongs to that interval.
#[derive(Default, Clone, Copy, Debug, PartialEq, Eq)]
pub enum Closed {
    /// The start is inclusive and the end exclusive: `[start, end)`.
    /// This is the default.
    #[default]
    Left,
    /// The start is exclusive and the end inclusive: `(start, end]`.
    Right,
}

/// A single timestamped data point of a time series.
///
/// A sample pairs a UTC timestamp with an optional value. The resampler
/// produces samples with a `None` value for intervals in which the
/// resampling function yields no result.
pub trait Sample: Debug + Clone + Default {
    /// The type of the value carried by the sample.
    type Value;

    /// Builds a sample from its `timestamp` and optional `value`.
    fn new(timestamp: DateTime<Utc>, value: Option<Self::Value>) -> Self;

    /// Returns the point in time this sample refers to.
    fn timestamp(&self) -> DateTime<Utc>;

    /// Returns the sample's value, or `None` if it has none.
    fn value(&self) -> Option<Self::Value>;
}

impl<
        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
        S: Sample<Value = T>,
    > ResamplingFunction<T, S>
{
    /// Applies this resampling function to the samples of one interval.
    ///
    /// Samples without a value are ignored by every built-in function except
    /// `First` and `Last`, which return the value of the first/last sample
    /// as-is (possibly `None`).
    ///
    /// Returns `None` when there is nothing to report, e.g. for `Sum`,
    /// `Average`, `Max` and `Min` over samples that carry no values. `Average`
    /// also returns `None` if the number of values cannot be represented in
    /// `T`. `Count` always returns `Some`, falling back to `T::default()` if
    /// the count cannot be represented in `T`.
    ///
    /// For `Max` and `Min`, values that are not comparable with `T::default()`
    /// (e.g. NaN for floats) are passed over in favour of comparable values.
    pub fn apply(&mut self, samples: &[&S]) -> Option<T> {
        match self {
            Self::Average => {
                // `Sum` is `None` when no sample carries a value, so from here
                // on there is at least one value and `count >= 1`.
                let sum = Self::Sum.apply(samples)?;
                let count = samples.iter().filter_map(|s| s.value()).count();
                // Deliberately not reusing `Count`: it falls back to
                // `T::default()` (zero for numbers) when `count` does not fit
                // into `T`, which would turn this into a division by zero
                // (a panic for integer `T`).
                T::from_usize(count).map(|count| sum.div(count))
            }
            Self::Sum => samples.iter().filter_map(|s| s.value()).sum1(),
            Self::Max => samples.iter().filter_map(|s| s.value()).max_by(|a, b| {
                a.partial_cmp(b).unwrap_or_else(|| {
                    // Incomparable pair: prefer the value that is comparable
                    // with `T::default()` (i.e. treat NaN as the smallest).
                    if a.partial_cmp(&T::default()).is_some() {
                        std::cmp::Ordering::Greater
                    } else {
                        std::cmp::Ordering::Less
                    }
                })
            }),
            Self::Min => samples.iter().filter_map(|s| s.value()).min_by(|a, b| {
                a.partial_cmp(b).unwrap_or_else(|| {
                    // Incomparable pair: prefer the value that is comparable
                    // with `T::default()` (i.e. treat NaN as the largest).
                    if a.partial_cmp(&T::default()).is_some() {
                        std::cmp::Ordering::Less
                    } else {
                        std::cmp::Ordering::Greater
                    }
                })
            }),
            Self::First => samples.first().and_then(|s| s.value()),
            Self::Last => samples.last().and_then(|s| s.value()),
            Self::Coalesce => samples.iter().find_map(|s| s.value()),
            Self::Count => Some(
                T::from_usize(samples.iter().filter_map(|s| s.value()).count())
                    .unwrap_or_default(),
            ),
            Self::Custom(f) => f.as_mut()(samples),
        }
    }
}

impl<T: Div<Output = T> + std::iter::Sum + Default + Debug, S: Sample<Value = T>> Debug
    for ResamplingFunction<T, S>
{
    /// Writes only the variant name; whatever `Custom` wraps is not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let variant_name = match self {
            Self::Average => "Average",
            Self::Sum => "Sum",
            Self::Max => "Max",
            Self::Min => "Min",
            Self::First => "First",
            Self::Last => "Last",
            Self::Coalesce => "Coalesce",
            Self::Count => "Count",
            Self::Custom(_) => "Custom",
        };
        f.write_str(variant_name)
    }
}

/// Resamples a time series of samples into fixed-length intervals.
///
/// Samples are collected in an internal buffer (via [`Resampler::push`] or
/// [`Extend`]) and turned into one output sample per interval whenever
/// [`Resampler::resample`] is called. The output depends on the configured
/// resampling function, interval length, retention window (in intervals) and
/// interval edge handling ([`Closed`] and [`Label`]).
#[derive(Debug, Default)]
pub struct Resampler<
    T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
    S: Sample<Value = T>,
> {
    /// Length of each output interval.
    interval: TimeDelta,
    /// The resampling function applied to the samples of each interval.
    resampling_function: ResamplingFunction<T, S>,
    /// Samples that have been added and not yet removed by the retention
    /// window.
    buffer: Vec<S>,
    /// Length of the retention window, in intervals. A sample is taken into
    /// account as long as it is not older than this many input intervals
    /// (`input_interval`, or `interval` while that is unknown) before the
    /// end of the interval being resampled. If set to 0, all samples are
    /// skipped.
    max_age_in_intervals: i32,
    /// Start of the next interval to resample. Aligned to `interval` on
    /// construction and advanced past every interval emitted by `resample`.
    start: DateTime<Utc>,
    /// Timestamp of the first buffered sample as of the last `resample`
    /// call, or `None` if the buffer was empty at that time.
    input_start: Option<DateTime<Utc>>,
    /// Estimated interval of the input samples, derived from buffered sample
    /// timestamps and used instead of `interval` to size the retention
    /// window. Never shorter than `interval`; `None` until determined.
    input_interval: Option<TimeDelta>,
    /// Controls which edge of an interval is closed for sample membership.
    closed: Closed,
    /// Controls the output timestamp labeling for resampled samples.
    ///
    /// This parameter only affects how output timestamps are labeled, not how
    /// samples are grouped into intervals.
    ///
    /// - If `label` is [`Label::Left`], the output timestamp is set to the
    ///   start of the interval.
    /// - If `label` is [`Label::Right`], the output timestamp is set to the
    ///   end of the interval.
    ///
    /// For example, with an interval of 5 seconds starting at t=0 and
    /// `closed = Closed::Left`:
    /// - Interval `[0, 5)` contains samples with timestamps 0, 1, 2, 3, 4
    /// - If `label=Label::Left`: output timestamp = 0
    /// - If `label=Label::Right`: output timestamp = 5
    label: Label,
}

impl<
        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
        S: Sample<Value = T>,
    > Resampler<T, S>
{
    /// Creates a new `Resampler`.
    ///
    /// * `interval` - length of each output interval.
    /// * `resampling_function` - function applied to the samples of each
    ///   interval.
    /// * `max_age_in_intervals` - length of the retention window in
    ///   intervals; `0` discards every sample.
    /// * `start` - start of the first interval; it is aligned to `interval`
    ///   relative to the UNIX epoch with [`epoch_align`].
    /// * `closed` - which edge of an interval is closed for sample membership.
    /// * `label` - which edge of an interval labels the output samples.
    pub fn new(
        interval: TimeDelta,
        resampling_function: ResamplingFunction<T, S>,
        max_age_in_intervals: i32,
        start: DateTime<Utc>,
        closed: Closed,
        label: Label,
    ) -> Self {
        let aligned_start = epoch_align(interval, start, None);
        Self {
            interval,
            resampling_function,
            max_age_in_intervals,
            start: aligned_start,
            closed,
            label,
            ..Default::default()
        }
    }

    /// Adds a sample to the buffer.
    ///
    /// Samples should be pushed in chronological order: `resample` walks the
    /// buffer front to back and stops at the first sample that lies beyond
    /// the current interval.
    pub fn push(&mut self, sample: S) {
        self.buffer.push(sample);
    }

    /// Returns a reference to the buffered samples.
    pub fn buffer(&self) -> &Vec<S> {
        &self.buffer
    }

    /// Resamples the buffered samples up to `end`, returning one output
    /// sample per interval.
    ///
    /// Intervals of length `interval` are emitted starting at the current
    /// start time for as long as an interval's start is before `end`. The
    /// last emitted interval may therefore extend past `end`. Each output
    /// sample is timestamped according to the configured [`Label`]. Its
    /// value is the resampling function applied to the samples inside the
    /// retention window ending at that interval's end.
    ///
    /// Afterwards, the start time points just past the emitted intervals.
    /// Samples older than `end` minus the retention window are removed
    /// from the buffer.
    ///
    /// If the start time is not before `end`, a warning is logged and an
    /// empty vector is returned.
    pub fn resample(&mut self, end: DateTime<Utc>) -> Vec<S> {
        if self.start >= end {
            warn!("start time is greater or equal to end time");
            return vec![];
        }
        let closed = self.closed;
        let offset = match self.label {
            Label::Left => TimeDelta::zero(),
            Label::Right => self.interval,
        };
        let mut res = vec![];
        // Samples considered for the current interval. This is deliberately
        // not cleared between intervals: samples leaving the retention window
        // are removed below, which lets a window spanning several intervals
        // (`max_age_in_intervals > 1`) look back in time.
        let mut interval_buffer: Vec<&S> = vec![];
        let mut buffer_iter = self.buffer.iter();
        let mut next_sample: Option<&S> = buffer_iter.next();
        self.input_start = next_sample.map(|s| s.timestamp());

        while self.start < end {
            let interval_start = self.start;
            let interval_end = interval_start + self.interval;

            // Move every buffered sample that is not newer than the current
            // interval into `interval_buffer`.
            while let Some(s) = next_sample {
                if !is_in_interval(&s.timestamp(), &interval_start, &interval_end, closed) {
                    break;
                }
                interval_buffer.push(s);
                next_sample = buffer_iter.next();

                // Estimate the input sampling interval from the gap between
                // the first buffered sample and the first later one, so the
                // retention window adapts to inputs slower than `interval`.
                // Samples at `input_start` itself are skipped: their gap is
                // zero and would always collapse the estimate to `interval`.
                if self.input_interval.is_none() {
                    if let Some(input_start) = self.input_start {
                        let gap = s.timestamp() - input_start;
                        if gap > TimeDelta::zero() {
                            self.input_interval = Some(gap.max(self.interval));
                        }
                    }
                }
            }

            // Drop samples older than the retention window that ends at the
            // end of the current interval.
            let input_interval = self.input_interval.unwrap_or(self.interval);
            let drain_end_date = interval_end - input_interval * self.max_age_in_intervals;
            interval_buffer
                .retain(|s| is_after_retention_edge(&s.timestamp(), &drain_end_date, closed));

            res.push(S::new(
                interval_start + offset,
                self.resampling_function.apply(interval_buffer.as_slice()),
            ));

            // Go to the next interval.
            self.start = interval_end;
        }

        // Drop buffered samples older than `end` minus the retention window.
        let input_interval = self.input_interval.unwrap_or(self.interval);
        let drain_end_date = end - input_interval * self.max_age_in_intervals;
        self.buffer
            .retain(|s| is_after_retention_edge(&s.timestamp(), &drain_end_date, closed));

        res
    }

    /// Resamples the buffered samples up to the current time
    /// (`Utc::now()`); see [`Resampler::resample`].
    pub fn resample_now(&mut self) -> Vec<S> {
        self.resample(Utc::now())
    }
}

impl<
        T: Div<Output = T> + std::iter::Sum + PartialOrd + FromPrimitive + Default + Debug,
        S: Sample<Value = T>,
    > Extend<S> for Resampler<T, S>
{
    /// Appends every sample yielded by `samples` to the internal buffer,
    /// preserving their order (equivalent to pushing them one by one).
    fn extend<Samples: IntoIterator<Item = S>>(&mut self, samples: Samples) {
        let buffer = &mut self.buffer;
        buffer.extend(samples);
    }
}

/// Aligns `timestamp` down onto a grid of `interval`-long steps anchored at
/// `alignment_timestamp` (the UNIX epoch when `None`).
///
/// The result is `anchor + k * interval` for the largest integer `k` for
/// which the result is not after `timestamp`. Timestamps before the anchor
/// are aligned towards the past as well.
///
/// Alignment works at millisecond resolution. If `interval` is shorter than
/// one millisecond (including zero), or the aligned value cannot be
/// represented, `timestamp` is returned unchanged.
pub fn epoch_align(
    interval: TimeDelta,
    timestamp: DateTime<Utc>,
    alignment_timestamp: Option<DateTime<Utc>>,
) -> DateTime<Utc> {
    let step_ms = interval.num_milliseconds();
    if step_ms == 0 {
        // No grid to align to, and dividing by zero would panic.
        return timestamp;
    }
    let anchor_ms = alignment_timestamp
        .unwrap_or(DateTime::UNIX_EPOCH)
        .timestamp_millis();
    // Work relative to the anchor, and use Euclidean division so that
    // negative offsets are rounded towards the past rather than towards zero.
    let offset_ms = timestamp.timestamp_millis() - anchor_ms;
    DateTime::from_timestamp_millis(offset_ms.div_euclid(step_ms) * step_ms + anchor_ms)
        .unwrap_or(timestamp)
}

/// Checks whether a sample at `timestamp` is not newer than an interval that
/// ends at `end`.
///
/// Only the upper edge is checked. `end` itself is included when `closed` is
/// [`Closed::Right`] and excluded when it is [`Closed::Left`]. The interval
/// start is ignored here; in `Resampler::resample`, samples that are too old
/// are removed afterwards by `is_after_retention_edge`.
fn is_in_interval(
    timestamp: &DateTime<Utc>,
    _start: &DateTime<Utc>,
    end: &DateTime<Utc>,
    closed: Closed,
) -> bool {
    let on_closed_end = closed == Closed::Right && timestamp == end;
    timestamp < end || on_closed_end
}

/// Decides whether a sample at `timestamp` is still inside the retention
/// window whose oldest edge is `edge_timestamp`.
///
/// Samples after the edge are always kept and samples before it never are.
/// A sample exactly on the edge is kept only for [`Closed::Left`], where the
/// edge is an inclusive interval start.
fn is_after_retention_edge(
    timestamp: &DateTime<Utc>,
    edge_timestamp: &DateTime<Utc>,
    closed: Closed,
) -> bool {
    match Ord::cmp(timestamp, edge_timestamp) {
        std::cmp::Ordering::Greater => true,
        std::cmp::Ordering::Equal => closed == Closed::Left,
        std::cmp::Ordering::Less => false,
    }
}