scepter 0.1.5

Composable primitives for planet-scale time-series routing, indexing, and aggregation.
Documentation
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::marker::PhantomData;

use crate::aggregate::Mergeable;
use crate::collect::error::CollectionError;
use crate::collect::reducer::{MergeReducer, Reducer};
use crate::collect::window::{AdmissionWindow, DeltaPoint, FinalizedBucket};

/// Aggregates deltas into aligned time buckets and finalizes immutable buckets
/// after they leave the admission window.
///
/// Each delta is assigned to the bucket containing its `end_time`. Deltas for
/// the same `(bucket_start, key)` pair are combined with the reducer `R`
/// (`MergeReducer` by default). Once a bucket's end is at or before the
/// admission cutoff for the current watermark, it is removed and returned as a
/// finalized bucket.
#[derive(Debug, Clone)]
pub struct BucketedAggregator<K, V, R = MergeReducer> {
    /// Bucket width, in the same units as delta timestamps; never zero.
    period: u64,
    /// Shift applied to bucket boundaries (`offset + k * period`); always `< period`.
    offset: u64,
    /// Admission window; `cutoff(watermark)` is the earliest `end_time` still accepted.
    admission: AdmissionWindow,
    /// Highest watermark seen so far (starts at `0` and only moves forward).
    watermark: u64,
    /// Folds a new delta value into an existing open aggregate.
    reducer: R,
    /// Open aggregates ordered by bucket start first, then by key.
    buckets: BTreeMap<(u64, K), V>,
    // NOTE(review): `V` is already owned through `buckets`, so this marker
    // looks redundant; kept because the constructors initialise it.
    _value: PhantomData<V>,
}

impl<K, V> BucketedAggregator<K, V, MergeReducer>
where
    K: Ord,
    V: Mergeable,
{
    /// Creates a bucketed aggregator that combines deltas with `MergeReducer`.
    ///
    /// Bucket boundaries are aligned to plain multiples of `period` (no offset).
    ///
    /// # Errors
    ///
    /// Returns `CollectionError::EmptyBucketPeriod` when `period` is zero.
    pub fn new(period: u64, admission: AdmissionWindow) -> Result<Self, CollectionError<V::Error>> {
        let unshifted = 0;
        Self::with_offset_and_reducer(period, unshifted, admission, MergeReducer)
    }
}

impl<K, V, R> BucketedAggregator<K, V, R>
where
    K: Ord,
    R: Reducer<V>,
{
    /// Creates a bucketed aggregator with a custom reducer.
    ///
    /// Buckets are aligned to plain multiples of `period` (an offset of `0`).
    ///
    /// # Errors
    ///
    /// Returns [`CollectionError::EmptyBucketPeriod`] when `period` is zero.
    pub fn with_reducer(
        period: u64,
        admission: AdmissionWindow,
        reducer: R,
    ) -> Result<Self, CollectionError<R::Error>> {
        Self::with_offset_and_reducer(period, 0, admission, reducer)
    }

    /// Creates a bucketed aggregator with an offset for load-smearing bucket
    /// boundaries across output series.
    ///
    /// Buckets start at `offset + k * period`. Timestamps earlier than `offset`
    /// all land in a single bucket that starts at `0`.
    ///
    /// # Errors
    ///
    /// Returns [`CollectionError::EmptyBucketPeriod`] when `period` is zero and
    /// [`CollectionError::OffsetOutsidePeriod`] when `offset >= period`.
    pub fn with_offset_and_reducer(
        period: u64,
        offset: u64,
        admission: AdmissionWindow,
        reducer: R,
    ) -> Result<Self, CollectionError<R::Error>> {
        if period == 0 {
            return Err(CollectionError::EmptyBucketPeriod);
        }
        if offset >= period {
            return Err(CollectionError::OffsetOutsidePeriod);
        }

        Ok(Self {
            period,
            offset,
            admission,
            watermark: 0,
            reducer,
            buckets: BTreeMap::new(),
            _value: PhantomData,
        })
    }

    /// Returns the bucket period.
    pub fn period(&self) -> u64 {
        self.period
    }

    /// Returns the bucket offset.
    pub fn offset(&self) -> u64 {
        self.offset
    }

    /// Returns the admission window.
    pub fn admission(&self) -> AdmissionWindow {
        self.admission
    }

    /// Returns the current watermark.
    pub fn watermark(&self) -> u64 {
        self.watermark
    }

    /// Returns the number of open `(bucket, key)` aggregates.
    pub fn open_bucket_count(&self) -> usize {
        self.buckets.len()
    }

    /// Advances the watermark and returns buckets finalized by this movement.
    ///
    /// The watermark never moves backwards. A `watermark` lower than the current
    /// one leaves it unchanged, but anything already finalizable is still drained.
    /// Buckets are returned ordered by start, then key.
    pub fn advance_to(&mut self, watermark: u64) -> Vec<FinalizedBucket<K, V>>
    where
        K: Clone,
    {
        self.watermark = self.watermark.max(watermark);
        self.drain_finalized()
    }

    /// Adds a delta into its end-time bucket.
    ///
    /// If the `(bucket, key)` aggregate already exists, the delta is folded into
    /// it with the reducer. Otherwise the delta's value becomes the initial
    /// aggregate.
    ///
    /// # Errors
    ///
    /// Returns [`CollectionError::LateDelta`] when `delta.end_time` is before the
    /// admission cutoff for the current watermark. Returns
    /// [`CollectionError::Reducer`] when the reducer fails to combine the values.
    pub fn ingest(&mut self, delta: DeltaPoint<K, V>) -> Result<(), CollectionError<R::Error>> {
        if delta.end_time < self.admission.cutoff(self.watermark) {
            return Err(CollectionError::LateDelta);
        }

        let bucket_start = self.bucket_start(delta.end_time);
        match self.buckets.entry((bucket_start, delta.key)) {
            Entry::Occupied(mut entry) => self
                .reducer
                .reduce(entry.get_mut(), delta.value)
                .map_err(CollectionError::Reducer),
            Entry::Vacant(entry) => {
                entry.insert(delta.value);
                Ok(())
            }
        }
    }

    /// Advances the watermark, ingests one delta, then finalizes old buckets.
    ///
    /// The delta's lateness is checked against the *new* watermark. Buckets are
    /// drained only after the delta has been accepted. If ingestion fails, the
    /// watermark has still moved, and any buckets it made final stay open so
    /// the next `advance_to`/`ingest_at` call returns them instead of losing them.
    ///
    /// # Errors
    ///
    /// Same as [`Self::ingest`].
    pub fn ingest_at(
        &mut self,
        watermark: u64,
        delta: DeltaPoint<K, V>,
    ) -> Result<Vec<FinalizedBucket<K, V>>, CollectionError<R::Error>>
    where
        K: Clone,
    {
        self.watermark = self.watermark.max(watermark);
        // Draining before `ingest` would discard the drained buckets whenever
        // `?` propagates an error, so drain exactly once, afterwards.
        self.ingest(delta)?;
        Ok(self.drain_finalized())
    }

    /// Returns the start of the bucket containing `timestamp`.
    fn bucket_start(&self, timestamp: u64) -> u64 {
        match timestamp.checked_sub(self.offset) {
            // Timestamps before the first offset boundary share the bucket at 0.
            // NOTE(review): that bucket is reported as `[0, period)`, which
            // overlaps the first offset bucket `[offset, offset + period)`.
            None => 0,
            // Round down to a multiple of `period`; cannot overflow because the
            // result never exceeds `timestamp`.
            Some(since_offset) => since_offset - since_offset % self.period + self.offset,
        }
    }

    /// Removes and returns every bucket whose end is at or before the admission
    /// cutoff, ordered by start, then key.
    fn drain_finalized(&mut self) -> Vec<FinalizedBucket<K, V>>
    where
        K: Clone,
    {
        let cutoff = self.admission.cutoff(self.watermark);
        let period = self.period;

        // Keys sort by bucket start first, and finality depends only on the
        // start, so the finalized buckets form a prefix of the map.
        let finalized_keys: Vec<(u64, K)> = self
            .buckets
            .keys()
            .take_while(|(start, _)| start.saturating_add(period) <= cutoff)
            .cloned()
            .collect();

        let mut finalized = Vec::with_capacity(finalized_keys.len());
        for lookup in finalized_keys {
            // `remove_entry` hands back the stored key, avoiding a second clone.
            if let Some(((start, key), value)) = self.buckets.remove_entry(&lookup) {
                finalized.push(FinalizedBucket {
                    key,
                    start,
                    // Saturate like the finality check above, so the last
                    // representable bucket cannot overflow.
                    end: start.saturating_add(period),
                    value,
                });
            }
        }
        finalized
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::collect::reducer::{MaxReducer, MinReducer, SumReducer};
    use crate::distribution::{BucketLayout, Distribution};

    #[test]
    fn bucketed_aggregator_sums_deltas_and_finalizes_old_buckets() {
        let mut aggregator =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(5), SumReducer).unwrap();

        aggregator
            .ingest(DeltaPoint {
                key: "user-a",
                end_time: 12,
                value: 3_u64,
            })
            .unwrap();
        aggregator
            .ingest(DeltaPoint {
                key: "user-a",
                end_time: 18,
                value: 4_u64,
            })
            .unwrap();

        let finalized = aggregator.advance_to(25);

        assert_eq!(
            finalized,
            vec![FinalizedBucket {
                key: "user-a",
                start: 10,
                end: 20,
                value: 7
            }]
        );
        assert_eq!(aggregator.open_bucket_count(), 0);
    }

    #[test]
    fn bucketed_aggregator_rejects_late_deltas() {
        let mut aggregator =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(5), SumReducer).unwrap();
        aggregator.advance_to(30);

        let error = aggregator
            .ingest(DeltaPoint {
                key: "user-a",
                end_time: 24,
                value: 1_u64,
            })
            .unwrap_err();

        assert_eq!(error, CollectionError::LateDelta);
    }

    #[test]
    fn bucketed_aggregator_supports_offset_bucket_boundaries() {
        let mut aggregator =
            BucketedAggregator::with_offset_and_reducer(10, 3, AdmissionWindow::new(0), SumReducer)
                .unwrap();
        assert_eq!(aggregator.offset(), 3);

        aggregator
            .ingest(DeltaPoint {
                key: "user-a",
                end_time: 12,
                value: 5_u64,
            })
            .unwrap();

        let finalized = aggregator.advance_to(23);

        assert_eq!(finalized[0].start, 3);
        assert_eq!(finalized[0].end, 13);
    }

    #[test]
    fn bucketed_aggregator_handles_timestamps_before_and_at_offset() {
        let mut before_offset =
            BucketedAggregator::with_offset_and_reducer(10, 3, AdmissionWindow::new(0), SumReducer)
                .unwrap();
        before_offset
            .ingest(DeltaPoint {
                key: "user-a",
                end_time: 1,
                value: 5_u64,
            })
            .unwrap();
        assert_eq!(before_offset.advance_to(10)[0].start, 0);

        let mut at_offset =
            BucketedAggregator::with_offset_and_reducer(10, 3, AdmissionWindow::new(0), SumReducer)
                .unwrap();
        at_offset
            .ingest(DeltaPoint {
                key: "user-a",
                end_time: 3,
                value: 7_u64,
            })
            .unwrap();
        assert_eq!(at_offset.advance_to(13)[0].start, 3);
    }

    #[test]
    fn bucketed_aggregator_accepts_delta_at_admission_cutoff() {
        let mut aggregator =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(5), SumReducer).unwrap();
        aggregator.advance_to(30);

        aggregator
            .ingest(DeltaPoint {
                key: "user-a",
                end_time: 25,
                value: 1_u64,
            })
            .unwrap();

        assert_eq!(aggregator.open_bucket_count(), 1);
    }

    /// `ingest_at` must return buckets closed by the watermark move while the
    /// newly ingested delta stays open.
    #[test]
    fn bucketed_aggregator_ingest_at_returns_buckets_closed_by_advance() {
        let mut aggregator =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(0), SumReducer).unwrap();
        aggregator
            .ingest(DeltaPoint {
                key: "user-a",
                end_time: 5,
                value: 2_u64,
            })
            .unwrap();

        let finalized = aggregator
            .ingest_at(
                12,
                DeltaPoint {
                    key: "user-a",
                    end_time: 15,
                    value: 3_u64,
                },
            )
            .unwrap();

        assert_eq!(
            finalized,
            vec![FinalizedBucket {
                key: "user-a",
                start: 0,
                end: 10,
                value: 2
            }]
        );
        assert_eq!(aggregator.watermark(), 12);
        assert_eq!(aggregator.open_bucket_count(), 1);
    }

    /// A lower watermark must neither rewind state nor finalize anything new.
    #[test]
    fn bucketed_aggregator_watermark_never_moves_backwards() {
        let mut aggregator = BucketedAggregator::<&str, u64, SumReducer>::with_reducer(
            10,
            AdmissionWindow::new(0),
            SumReducer,
        )
        .unwrap();
        aggregator.advance_to(30);

        assert!(aggregator.advance_to(10).is_empty());
        assert_eq!(aggregator.watermark(), 30);
    }

    /// Only buckets past the cutoff are finalized, ordered by start then key.
    #[test]
    fn bucketed_aggregator_finalizes_only_closed_buckets_in_key_order() {
        let mut aggregator =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(0), SumReducer).unwrap();
        aggregator
            .ingest(DeltaPoint {
                key: "user-b",
                end_time: 3,
                value: 1_u64,
            })
            .unwrap();
        aggregator
            .ingest(DeltaPoint {
                key: "user-a",
                end_time: 4,
                value: 2_u64,
            })
            .unwrap();
        aggregator
            .ingest(DeltaPoint {
                key: "user-a",
                end_time: 25,
                value: 9_u64,
            })
            .unwrap();

        let finalized = aggregator.advance_to(20);

        assert_eq!(
            finalized,
            vec![
                FinalizedBucket {
                    key: "user-a",
                    start: 0,
                    end: 10,
                    value: 2
                },
                FinalizedBucket {
                    key: "user-b",
                    start: 0,
                    end: 10,
                    value: 1
                },
            ]
        );
        assert_eq!(aggregator.open_bucket_count(), 1);
    }

    #[test]
    fn bucketed_aggregator_merges_distribution_deltas() {
        let layout = BucketLayout::fixed_width(0.0, 10.0, 2).unwrap();
        let mut left = Distribution::<()>::from_layout(&layout);
        left.record(3.0, 2).unwrap();
        let mut right = Distribution::<()>::from_layout(&layout);
        right.record(14.0, 5).unwrap();

        let mut aggregator = BucketedAggregator::new(10, AdmissionWindow::new(0)).unwrap();
        aggregator
            .ingest(DeltaPoint {
                key: "latency",
                end_time: 1,
                value: left,
            })
            .unwrap();
        aggregator
            .ingest(DeltaPoint {
                key: "latency",
                end_time: 2,
                value: right,
            })
            .unwrap();

        let finalized = aggregator.advance_to(10);

        assert_eq!(finalized[0].value.total_count(), 7);
    }

    #[test]
    fn min_and_max_reducers_keep_extreme_values() {
        let mut min =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(0), MinReducer).unwrap();
        min.ingest(DeltaPoint {
            key: "cpu",
            end_time: 1,
            value: 9_u64,
        })
        .unwrap();
        min.ingest(DeltaPoint {
            key: "cpu",
            end_time: 2,
            value: 4_u64,
        })
        .unwrap();

        let mut max =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(0), MaxReducer).unwrap();
        max.ingest(DeltaPoint {
            key: "cpu",
            end_time: 1,
            value: 9_u64,
        })
        .unwrap();
        max.ingest(DeltaPoint {
            key: "cpu",
            end_time: 2,
            value: 14_u64,
        })
        .unwrap();

        assert_eq!(min.advance_to(10)[0].value, 4);
        assert_eq!(max.advance_to(10)[0].value, 14);
    }

    #[test]
    fn bucketed_aggregator_validates_period_and_offset() {
        assert_eq!(
            BucketedAggregator::<&str, u64, SumReducer>::with_reducer(
                0,
                AdmissionWindow::new(0),
                SumReducer
            )
            .unwrap_err(),
            CollectionError::EmptyBucketPeriod
        );
        assert_eq!(
            BucketedAggregator::<&str, u64, SumReducer>::with_offset_and_reducer(
                10,
                10,
                AdmissionWindow::new(0),
                SumReducer
            )
            .unwrap_err(),
            CollectionError::OffsetOutsidePeriod
        );
    }
}