use crate::aggregator::{Aggregator, PartialAggregateType};
use roaring::{RoaringBitmap, RoaringTreemap};
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct RoaringPartial32 {
inner: Option<RoaringBitmap>,
}
impl RoaringPartial32 {
const EMPTY: Self = Self { inner: None };
pub fn from_bitmap(bitmap: RoaringBitmap) -> Self {
Self {
inner: Some(bitmap),
}
}
pub fn into_bitmap(self) -> RoaringBitmap {
self.inner.unwrap_or_default()
}
pub fn as_bitmap(&self) -> RoaringBitmap {
self.inner.clone().unwrap_or_default()
}
pub fn as_ref(&self) -> Option<&RoaringBitmap> {
self.inner.as_ref()
}
#[inline]
pub fn contains(&self, value: u32) -> bool {
self.inner
.as_ref()
.is_some_and(|bitmap| bitmap.contains(value))
}
fn union(self, other: Self) -> Self {
match (self.inner, other.inner) {
(Some(mut left), Some(right)) => {
left |= &right;
Self { inner: Some(left) }
}
(Some(left), None) => Self { inner: Some(left) },
(None, Some(right)) => Self { inner: Some(right) },
(None, None) => Self::EMPTY,
}
}
}
impl From<RoaringPartial32> for RoaringBitmap {
fn from(value: RoaringPartial32) -> Self {
value.into_bitmap()
}
}
impl From<&RoaringPartial32> for RoaringBitmap {
fn from(value: &RoaringPartial32) -> Self {
value.as_bitmap()
}
}
impl PartialAggregateType for RoaringPartial32 {}
#[derive(Default, Copy, Debug, Clone)]
pub struct RoaringAggregator32;
impl Aggregator for RoaringAggregator32 {
const IDENTITY: Self::PartialAggregate = RoaringPartial32::EMPTY;
type Input = u32;
type MutablePartialAggregate = RoaringBitmap;
type PartialAggregate = RoaringPartial32;
type Aggregate = RoaringBitmap;
fn lift(input: Self::Input) -> Self::MutablePartialAggregate {
let mut bitmap = RoaringBitmap::new();
bitmap.insert(input);
bitmap
}
fn combine_mutable(mutable: &mut Self::MutablePartialAggregate, input: Self::Input) {
mutable.insert(input);
}
fn freeze(mutable: Self::MutablePartialAggregate) -> Self::PartialAggregate {
RoaringPartial32::from_bitmap(mutable)
}
fn combine(a: Self::PartialAggregate, b: Self::PartialAggregate) -> Self::PartialAggregate {
a.union(b)
}
fn lower(partial: Self::PartialAggregate) -> Self::Aggregate {
partial.into_bitmap()
}
}
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct RoaringPartial64 {
inner: Option<RoaringTreemap>,
}
impl RoaringPartial64 {
const EMPTY: Self = Self { inner: None };
pub fn from_treemap(treemap: RoaringTreemap) -> Self {
Self {
inner: Some(treemap),
}
}
pub fn into_treemap(self) -> RoaringTreemap {
self.inner.unwrap_or_default()
}
pub fn as_treemap(&self) -> RoaringTreemap {
self.inner.clone().unwrap_or_default()
}
pub fn as_ref(&self) -> Option<&RoaringTreemap> {
self.inner.as_ref()
}
#[inline]
pub fn contains(&self, value: u64) -> bool {
self.inner
.as_ref()
.is_some_and(|bitmap| bitmap.contains(value))
}
fn union(self, other: Self) -> Self {
match (self.inner, other.inner) {
(Some(mut left), Some(right)) => {
left |= &right;
Self { inner: Some(left) }
}
(Some(left), None) => Self { inner: Some(left) },
(None, Some(right)) => Self { inner: Some(right) },
(None, None) => Self::EMPTY,
}
}
}
impl From<RoaringPartial64> for RoaringTreemap {
fn from(value: RoaringPartial64) -> Self {
value.into_treemap()
}
}
impl From<&RoaringPartial64> for RoaringTreemap {
fn from(value: &RoaringPartial64) -> Self {
value.as_treemap()
}
}
impl PartialAggregateType for RoaringPartial64 {}
#[derive(Default, Debug, Copy, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct RoaringAggregator64;
impl Aggregator for RoaringAggregator64 {
const IDENTITY: Self::PartialAggregate = RoaringPartial64::EMPTY;
type Input = u64;
type MutablePartialAggregate = RoaringTreemap;
type PartialAggregate = RoaringPartial64;
type Aggregate = RoaringTreemap;
fn lift(input: Self::Input) -> Self::MutablePartialAggregate {
let mut treemap = RoaringTreemap::new();
treemap.insert(input);
treemap
}
fn combine_mutable(mutable: &mut Self::MutablePartialAggregate, input: Self::Input) {
mutable.insert(input);
}
fn freeze(mutable: Self::MutablePartialAggregate) -> Self::PartialAggregate {
RoaringPartial64::from_treemap(mutable)
}
fn combine(a: Self::PartialAggregate, b: Self::PartialAggregate) -> Self::PartialAggregate {
a.union(b)
}
fn lower(partial: Self::PartialAggregate) -> Self::Aggregate {
partial.into_treemap()
}
}
#[cfg(test)]
mod tests {
use crate::{Entry, RwWheel, WheelRange, aggregator::Aggregator};
use super::{RoaringAggregator32, RoaringAggregator64};
#[derive(Clone, Debug)]
struct SensorEvent32 {
timestamp_ms: u64,
value: u32,
}
#[derive(Clone, Debug)]
struct SensorEvent64 {
timestamp_ms: u64,
value: u64,
}
fn bucket32(timestamp_ms: u64) -> u32 {
(timestamp_ms / 1_000) as u32
}
fn bucket64(timestamp_ms: u64) -> u64 {
timestamp_ms / 1_000
}
#[test]
fn bitmap_reports_temporal_membership() {
const THRESHOLD: u32 = 70;
let events = vec![
SensorEvent32 {
timestamp_ms: 1_000,
value: 65,
},
SensorEvent32 {
timestamp_ms: 2_000,
value: 72,
},
SensorEvent32 {
timestamp_ms: 3_000,
value: 88,
},
SensorEvent32 {
timestamp_ms: 6_000,
value: 63,
},
SensorEvent32 {
timestamp_ms: 7_000,
value: 95,
},
SensorEvent32 {
timestamp_ms: 9_000,
value: 77,
},
];
let mut wheel: RwWheel<RoaringAggregator32> = RwWheel::new(0);
for event in &events {
if event.value >= THRESHOLD {
wheel.insert(Entry::new(bucket32(event.timestamp_ms), event.timestamp_ms));
}
}
let _ = wheel.advance_to(12_000);
let ranges = [
(0, 4_000, vec![2, 3]),
(4_000, 8_000, vec![7]),
(8_000, 12_000, vec![9]),
];
for (start, end, buckets) in ranges {
let bitmap = wheel
.read()
.combine_range(WheelRange::new_unchecked(start, end))
.expect("range should produce aggregate")
.into_bitmap();
assert_eq!(bitmap.iter().collect::<Vec<_>>(), buckets);
}
}
#[test]
fn treemap_reports_temporal_membership() {
const THRESHOLD: u64 = 70;
let events = vec![
SensorEvent64 {
timestamp_ms: 1_000,
value: 65,
},
SensorEvent64 {
timestamp_ms: 2_000,
value: 72,
},
SensorEvent64 {
timestamp_ms: 3_000,
value: 88,
},
SensorEvent64 {
timestamp_ms: 6_000,
value: 63,
},
SensorEvent64 {
timestamp_ms: 7_000,
value: 95,
},
SensorEvent64 {
timestamp_ms: 9_000,
value: 77,
},
];
let mut wheel: RwWheel<RoaringAggregator64> = RwWheel::new(0);
for event in &events {
if event.value >= THRESHOLD {
wheel.insert(Entry::new(bucket64(event.timestamp_ms), event.timestamp_ms));
}
}
let _ = wheel.advance_to(12_000);
let ranges = [
(0, 4_000, vec![2_u64, 3]),
(4_000, 8_000, vec![7]),
(8_000, 12_000, vec![9]),
];
for (start, end, buckets) in ranges {
let treemap = wheel
.read()
.combine_range(WheelRange::new_unchecked(start, end))
.expect("range should produce aggregate")
.into_treemap();
assert_eq!(treemap.iter().collect::<Vec<_>>(), buckets);
}
}
#[test]
fn bitmap_combines_partials_using_union() {
let left_partial = RoaringAggregator32::freeze({
let mut bitmap = RoaringAggregator32::lift(1);
RoaringAggregator32::combine_mutable(&mut bitmap, 3);
bitmap
});
let right_partial = RoaringAggregator32::freeze({
let mut bitmap = RoaringAggregator32::lift(2);
RoaringAggregator32::combine_mutable(&mut bitmap, 4);
bitmap
});
let combined = RoaringAggregator32::combine(left_partial, right_partial);
let bitmap = RoaringAggregator32::lower(combined);
assert_eq!(bitmap.iter().collect::<Vec<_>>(), vec![1, 2, 3, 4]);
}
#[test]
fn treemap_combines_partials_using_union() {
let left_partial = RoaringAggregator64::freeze({
let mut treemap = RoaringAggregator64::lift(1);
RoaringAggregator64::combine_mutable(&mut treemap, 3);
treemap
});
let right_partial = RoaringAggregator64::freeze({
let mut treemap = RoaringAggregator64::lift(2);
RoaringAggregator64::combine_mutable(&mut treemap, 4);
treemap
});
let combined = RoaringAggregator64::combine(left_partial, right_partial);
let treemap = RoaringAggregator64::lower(combined);
assert_eq!(treemap.iter().collect::<Vec<_>>(), vec![1, 2, 3, 4]);
}
}