use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::marker::PhantomData;
use crate::aggregate::Mergeable;
use crate::collect::error::CollectionError;
use crate::collect::reducer::{MergeReducer, Reducer};
use crate::collect::window::{AdmissionWindow, DeltaPoint, FinalizedBucket};
/// Groups incoming delta points into fixed-width buckets per key and combines the values that
/// land in the same `(bucket start, key)` slot with a reducer of type `R`.
///
/// Buckets stay stored until the watermark has moved far enough that the admission cutoff
/// reaches their end; they are then handed back to the caller as `FinalizedBucket` values.
#[derive(Debug, Clone)]
pub struct BucketedAggregator<K, V, R = MergeReducer> {
/// Width of every bucket; the constructor rejects `0`.
period: u64,
/// Shift applied to bucket boundaries (bucket starts are `offset + n * period`); the
/// constructor requires `offset < period`.
offset: u64,
/// Turns the current watermark into the cutoff used both to reject late deltas and to decide
/// which buckets are finalized.
admission: AdmissionWindow,
/// Highest watermark seen so far; starts at `0` and is never lowered.
watermark: u64,
/// Combines an incoming value into the value already stored for the same bucket and key.
reducer: R,
/// Buckets that have not been finalized yet, ordered by bucket start and then by key.
buckets: BTreeMap<(u64, K), V>,
// NOTE(review): `V` already appears in `buckets`, so this marker looks redundant — confirm
// before removing, since the constructor initializes it.
_value: PhantomData<V>,
}
impl<K, V> BucketedAggregator<K, V, MergeReducer>
where
    K: Ord,
    V: Mergeable,
{
    /// Builds an aggregator whose buckets are combined with [`MergeReducer`].
    ///
    /// This is a convenience wrapper around `with_reducer`, so the same validation applies.
    ///
    /// # Errors
    ///
    /// Returns whatever `with_reducer` reports for the given arguments, e.g.
    /// [`CollectionError::EmptyBucketPeriod`] when `period` is `0`.
    pub fn new(period: u64, admission: AdmissionWindow) -> Result<Self, CollectionError<V::Error>> {
        let reducer = MergeReducer;
        Self::with_reducer(period, admission, reducer)
    }
}
impl<K, V, R> BucketedAggregator<K, V, R>
where
    K: Ord,
    R: Reducer<V>,
{
    /// Creates an aggregator with an offset of `0` that combines values using `reducer`.
    ///
    /// # Errors
    ///
    /// Returns [`CollectionError::EmptyBucketPeriod`] when `period` is `0`.
    pub fn with_reducer(
        period: u64,
        admission: AdmissionWindow,
        reducer: R,
    ) -> Result<Self, CollectionError<R::Error>> {
        Self::with_offset_and_reducer(period, 0, admission, reducer)
    }

    /// Creates an aggregator whose bucket boundaries sit at `offset + n * period`.
    ///
    /// The aggregator starts with a watermark of `0` and no open buckets.
    ///
    /// # Errors
    ///
    /// * [`CollectionError::EmptyBucketPeriod`] when `period` is `0`.
    /// * [`CollectionError::OffsetOutsidePeriod`] when `offset >= period`.
    pub fn with_offset_and_reducer(
        period: u64,
        offset: u64,
        admission: AdmissionWindow,
        reducer: R,
    ) -> Result<Self, CollectionError<R::Error>> {
        if period == 0 {
            return Err(CollectionError::EmptyBucketPeriod);
        }
        if offset >= period {
            return Err(CollectionError::OffsetOutsidePeriod);
        }
        Ok(Self {
            period,
            offset,
            admission,
            watermark: 0,
            reducer,
            buckets: BTreeMap::new(),
            _value: PhantomData,
        })
    }

    /// Width of every bucket.
    pub fn period(&self) -> u64 {
        self.period
    }

    /// Shift applied to bucket boundaries.
    pub fn offset(&self) -> u64 {
        self.offset
    }

    /// Admission window used to derive the cutoff from the watermark.
    pub fn admission(&self) -> AdmissionWindow {
        self.admission
    }

    /// Highest watermark the aggregator has been advanced to.
    pub fn watermark(&self) -> u64 {
        self.watermark
    }

    /// Number of buckets that are still stored (not yet handed out as finalized).
    pub fn open_bucket_count(&self) -> usize {
        self.buckets.len()
    }

    /// Raises the watermark to `watermark` (it is never lowered) and returns every stored bucket
    /// that is finalized under the resulting admission cutoff, ordered by start and then key.
    pub fn advance_to(&mut self, watermark: u64) -> Vec<FinalizedBucket<K, V>>
    where
        K: Clone,
    {
        self.watermark = self.watermark.max(watermark);
        self.drain_finalized()
    }

    /// Adds `delta` to the bucket containing `delta.end_time`.
    ///
    /// The first value for a `(bucket, key)` slot is stored as-is; later values are combined
    /// into it through the reducer.
    ///
    /// # Errors
    ///
    /// * [`CollectionError::LateDelta`] when `delta.end_time` is below the admission cutoff for
    ///   the current watermark.
    /// * [`CollectionError::Reducer`] when the reducer fails to combine the values.
    pub fn ingest(&mut self, delta: DeltaPoint<K, V>) -> Result<(), CollectionError<R::Error>> {
        let cutoff = self.admission.cutoff(self.watermark);
        if delta.end_time < cutoff {
            return Err(CollectionError::LateDelta);
        }
        let bucket_start = self.bucket_start(delta.end_time);
        match self.buckets.entry((bucket_start, delta.key)) {
            Entry::Occupied(mut slot) => self
                .reducer
                .reduce(slot.get_mut(), delta.value)
                .map_err(CollectionError::Reducer),
            Entry::Vacant(slot) => {
                slot.insert(delta.value);
                Ok(())
            }
        }
    }

    /// Advances the watermark, ingests `delta`, and returns the buckets that are now finalized.
    ///
    /// # Errors
    ///
    /// Same as [`Self::ingest`]. When an error is returned the watermark has still been
    /// advanced, but no bucket is removed: buckets finalized by the new watermark stay stored
    /// and are returned by the next `advance_to` or successful `ingest_at` call.
    pub fn ingest_at(
        &mut self,
        watermark: u64,
        delta: DeltaPoint<K, V>,
    ) -> Result<Vec<FinalizedBucket<K, V>>, CollectionError<R::Error>>
    where
        K: Clone,
    {
        // Move the watermark first so the lateness check in `ingest` sees it, but drain only
        // after the delta was accepted. Draining before a failing ingest would take finalized
        // buckets out of the map and then drop them on the error path, losing their data.
        // An accepted delta has `end_time >= cutoff`, so its bucket ends after the cutoff and
        // cannot be one of the buckets awaiting finalization.
        self.watermark = self.watermark.max(watermark);
        self.ingest(delta)?;
        Ok(self.drain_finalized())
    }

    /// Start of the bucket that contains `timestamp`.
    ///
    /// Timestamps below `offset` are all assigned to the bucket starting at `0`.
    // NOTE(review): that leading bucket is later reported with `end = 0 + period`, which
    // overlaps the first regular bucket when `offset > 0` — confirm whether that is intended.
    fn bucket_start(&self, timestamp: u64) -> u64 {
        match timestamp.checked_sub(self.offset) {
            Some(elapsed) => (elapsed / self.period) * self.period + self.offset,
            None => 0,
        }
    }

    /// Removes and returns every stored bucket whose end is at or before the admission cutoff.
    ///
    /// Buckets are ordered by start, so the finalized ones form a prefix of the map and can be
    /// popped from the front without cloning keys. The `K: Clone` bound on the public callers is
    /// kept only for interface compatibility.
    fn drain_finalized(&mut self) -> Vec<FinalizedBucket<K, V>> {
        let cutoff = self.admission.cutoff(self.watermark);
        let period = self.period;
        let mut finalized = Vec::new();
        while let Some(front) = self.buckets.first_entry() {
            // Saturate so a bucket near `u64::MAX` neither panics nor wraps; the same value is
            // used for the finalization check and for the reported end.
            let end = front.key().0.saturating_add(period);
            if end > cutoff {
                break;
            }
            let ((start, key), value) = front.remove_entry();
            finalized.push(FinalizedBucket {
                key,
                start,
                end,
                value,
            });
        }
        finalized
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collect::reducer::{MaxReducer, MinReducer, SumReducer};
    use crate::distribution::{BucketLayout, Distribution};

    /// Shorthand for the delta points fed to the aggregator under test.
    fn delta<K, V>(key: K, end_time: u64, value: V) -> DeltaPoint<K, V> {
        DeltaPoint {
            key,
            end_time,
            value,
        }
    }

    /// Two deltas in the same bucket are summed and emitted once the cutoff passes the bucket.
    #[test]
    fn bucketed_aggregator_sums_deltas_and_finalizes_old_buckets() {
        let mut aggregator =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(5), SumReducer).unwrap();
        for (end_time, value) in [(12, 3_u64), (18, 4_u64)] {
            aggregator.ingest(delta("user-a", end_time, value)).unwrap();
        }

        let expected = vec![FinalizedBucket {
            key: "user-a",
            start: 10,
            end: 20,
            value: 7,
        }];
        assert_eq!(aggregator.advance_to(25), expected);
        assert_eq!(aggregator.open_bucket_count(), 0);
    }

    /// A delta older than the admission cutoff is refused.
    #[test]
    fn bucketed_aggregator_rejects_late_deltas() {
        let mut aggregator =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(5), SumReducer).unwrap();
        aggregator.advance_to(30);

        let outcome = aggregator.ingest(delta("user-a", 24, 1_u64));
        assert_eq!(outcome.unwrap_err(), CollectionError::LateDelta);
    }

    /// A non-zero offset shifts the bucket boundaries.
    #[test]
    fn bucketed_aggregator_supports_offset_bucket_boundaries() {
        let mut aggregator =
            BucketedAggregator::with_offset_and_reducer(10, 3, AdmissionWindow::new(0), SumReducer)
                .unwrap();
        assert_eq!(aggregator.offset(), 3);

        aggregator.ingest(delta("user-a", 12, 5_u64)).unwrap();

        let finalized = aggregator.advance_to(23);
        let first = &finalized[0];
        assert_eq!(first.start, 3);
        assert_eq!(first.end, 13);
    }

    /// Timestamps before the offset map to the bucket at `0`; a timestamp at the offset starts
    /// the first shifted bucket.
    #[test]
    fn bucketed_aggregator_handles_timestamps_before_and_at_offset() {
        // (delta end time, delta value, watermark to advance to, expected bucket start)
        let cases = [(1, 5_u64, 10, 0), (3, 7_u64, 13, 3)];
        for (end_time, value, watermark, expected_start) in cases {
            let mut aggregator = BucketedAggregator::with_offset_and_reducer(
                10,
                3,
                AdmissionWindow::new(0),
                SumReducer,
            )
            .unwrap();
            aggregator.ingest(delta("user-a", end_time, value)).unwrap();
            assert_eq!(aggregator.advance_to(watermark)[0].start, expected_start);
        }
    }

    /// A delta exactly at the cutoff is still admitted.
    #[test]
    fn bucketed_aggregator_accepts_delta_at_admission_cutoff() {
        let mut aggregator =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(5), SumReducer).unwrap();
        aggregator.advance_to(30);

        let outcome = aggregator.ingest(delta("user-a", 25, 1_u64));
        outcome.unwrap();
        assert_eq!(aggregator.open_bucket_count(), 1);
    }

    /// The default reducer merges distribution values that share a bucket.
    #[test]
    fn bucketed_aggregator_merges_distribution_deltas() {
        let layout = BucketLayout::fixed_width(0.0, 10.0, 2).unwrap();

        let mut left = Distribution::<()>::from_layout(&layout);
        left.record(3.0, 2).unwrap();
        let mut right = Distribution::<()>::from_layout(&layout);
        right.record(14.0, 5).unwrap();

        let mut aggregator = BucketedAggregator::new(10, AdmissionWindow::new(0)).unwrap();
        aggregator.ingest(delta("latency", 1, left)).unwrap();
        aggregator.ingest(delta("latency", 2, right)).unwrap();

        let finalized = aggregator.advance_to(10);
        assert_eq!(finalized[0].value.total_count(), 7);
    }

    /// `MinReducer` and `MaxReducer` keep the smallest and largest value respectively.
    #[test]
    fn min_and_max_reducers_keep_extreme_values() {
        let mut min =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(0), MinReducer).unwrap();
        for (end_time, value) in [(1, 9_u64), (2, 4_u64)] {
            min.ingest(delta("cpu", end_time, value)).unwrap();
        }

        let mut max =
            BucketedAggregator::with_reducer(10, AdmissionWindow::new(0), MaxReducer).unwrap();
        for (end_time, value) in [(1, 9_u64), (2, 14_u64)] {
            max.ingest(delta("cpu", end_time, value)).unwrap();
        }

        assert_eq!(min.advance_to(10)[0].value, 4);
        assert_eq!(max.advance_to(10)[0].value, 14);
    }

    /// Construction fails for a zero period and for an offset that is not below the period.
    #[test]
    fn bucketed_aggregator_validates_period_and_offset() {
        let zero_period = BucketedAggregator::<&str, u64, SumReducer>::with_reducer(
            0,
            AdmissionWindow::new(0),
            SumReducer,
        );
        assert_eq!(
            zero_period.unwrap_err(),
            CollectionError::EmptyBucketPeriod
        );

        let offset_too_large = BucketedAggregator::<&str, u64, SumReducer>::with_offset_and_reducer(
            10,
            10,
            AdmissionWindow::new(0),
            SumReducer,
        );
        assert_eq!(
            offset_too_large.unwrap_err(),
            CollectionError::OffsetOutsidePeriod
        );
    }
}