use hashbrown::hash_map::RawEntryMut;
use metrique::{InflectableEntry, RootEntry, RootMetric};
use metrique_core::CloseValue;
use metrique_writer::EntrySink;
use std::hash::BuildHasher;
use std::marker::PhantomData;
use metrique::writer::BoxEntrySink;
use crate::traits::{
AggregateSink, AggregateSinkRef, AggregateStrategy, AggregateTy, FlushableSink, Key, KeyTy,
Merge, MergeRef,
};
use crate::value::NoKey;
/// Entry type that a [`KeyedAggregator`] for strategy `T` appends to its sink.
///
/// It is an `AggregationResult` parameterised by two closed types:
/// the closed form of the strategy's `'static` key, and the closed form of
/// the merged accumulator of the strategy's source type.
pub type AggregatedEntry<T> = crate::traits::AggregationResult<
// Closed form of the `'static` key.
<<<T as AggregateStrategy>::Key as Key<<T as AggregateStrategy>::Source>>::Key<'static> as CloseValue>::Closed,
// Closed form of the merged accumulator.
<<<T as AggregateStrategy>::Source as Merge>::Merged as CloseValue>::Closed,
>;
/// Aggregator that keeps one accumulator per key and hands the results to a sink.
///
/// `T` is the [`AggregateStrategy`] that fixes the key and accumulator types.
/// `Sink` receives the aggregated entries and defaults to [`BoxEntrySink`].
pub struct KeyedAggregator<T: AggregateStrategy, Sink = BoxEntrySink> {
// Accumulators indexed by the `'static` form of the key.
storage: hashbrown::HashMap<KeyTy<'static, T>, AggregateTy<T>>,
// Destination that aggregated entries are appended to.
sink: Sink,
// Zero-sized marker for the strategy type `T`.
_phantom: PhantomData<T>,
}
impl<T, Sink> KeyedAggregator<T, Sink>
where
T: AggregateStrategy,
<T::Source as Merge>::MergeConfig: Default,
Sink: metrique_writer::EntrySink<AggregatedEntry<T>>,
{
pub fn new(sink: Sink) -> Self {
Self {
storage: Default::default(),
sink,
_phantom: PhantomData,
}
}
}
impl<T, Sink> KeyedAggregator<T, Sink>
where
T: AggregateStrategy,
<T::Source as Merge>::MergeConfig: Default,
Sink: metrique_writer::EntrySink<AggregatedEntry<T>>,
{
fn get_or_create_accum<'a>(
storage: &'a mut hashbrown::HashMap<KeyTy<'static, T>, AggregateTy<T>>,
entry: &T::Source,
) -> &'a mut AggregateTy<T> {
let borrowed_key = T::Key::from_source(entry);
let hash = storage.hasher().hash_one(&borrowed_key);
match storage
.raw_entry_mut()
.from_hash(hash, |k| T::Key::static_key_matches(k, &borrowed_key))
{
RawEntryMut::Occupied(occupied) => occupied.into_mut(),
RawEntryMut::Vacant(vacant) => {
let static_key = T::Key::static_key(&borrowed_key);
let new_value = T::Source::new_merged(&Default::default());
vacant.insert_hashed_nocheck(hash, static_key, new_value).1
}
}
}
}
impl<T, Sink> AggregateSink<T::Source> for KeyedAggregator<T, Sink>
where
T: AggregateStrategy,
<T::Source as Merge>::MergeConfig: Default,
Sink: metrique_writer::EntrySink<AggregatedEntry<T>>,
{
fn merge(&mut self, entry: T::Source) {
let accum = Self::get_or_create_accum(&mut self.storage, &entry);
T::Source::merge(accum, entry);
}
}
impl<T, Sink> AggregateSinkRef<T::Source> for KeyedAggregator<T, Sink>
where
T: AggregateStrategy,
T::Source: MergeRef,
<T::Source as Merge>::MergeConfig: Default,
Sink: metrique_writer::EntrySink<AggregatedEntry<T>>,
{
fn merge_ref(&mut self, entry: &T::Source) {
let accum = Self::get_or_create_accum(&mut self.storage, entry);
T::Source::merge_ref(accum, entry);
}
}
impl<T, Sink> FlushableSink for KeyedAggregator<T, Sink>
where
T: AggregateStrategy,
Sink: metrique_writer::EntrySink<AggregatedEntry<T>>,
{
fn flush(&mut self) {
for (key, aggregated) in self.storage.drain() {
let merged = crate::traits::AggregationResult {
key: key.close(),
aggregated: aggregated.close(),
};
self.sink.append(merged);
}
}
}
/// Single accumulator for strategy `T`, wrapping the merged value of `T::Source`.
pub struct Aggregate<T: AggregateStrategy> {
// Running merged value that inserted entries are folded into.
aggregated: <T::Source as Merge>::Merged,
}
impl<T: AggregateStrategy> CloseValue for Aggregate<T>
where
<T::Source as Merge>::Merged: CloseValue,
{
type Closed = <<T::Source as Merge>::Merged as CloseValue>::Closed;
fn close(self) -> <Self as CloseValue>::Closed {
self.aggregated.close()
}
}
/// Alias for [`NoKey`], used as the required `Key` type in the bounds of
/// `Aggregate::insert` and `Aggregate::insert_and_send_to`.
// NOTE(review): the descriptive name presumably exists so that a failed bound
// explains itself in compiler output — confirm.
pub type AggregateMustBeUsedOnStructsWithNoKeys = NoKey;
impl<T> Aggregate<T>
where
    T: AggregateStrategy,
{
    /// Closes `entry` and folds the closed value into the accumulator.
    ///
    /// Only available when the strategy's key type is
    /// [`AggregateMustBeUsedOnStructsWithNoKeys`].
    pub fn insert(&mut self, entry: T)
    where
        T: CloseValue<Closed = T::Source>,
        T: AggregateStrategy<Key = AggregateMustBeUsedOnStructsWithNoKeys>,
        T::Source: Merge,
    {
        let closed = entry.close();
        <T::Source as Merge>::merge(&mut self.aggregated, closed);
    }

    /// Closes `entry`, folds it into the accumulator by reference, and then
    /// appends the closed entry itself to `sink` wrapped in a [`RootEntry`].
    ///
    /// Only available when the strategy's key type is
    /// [`AggregateMustBeUsedOnStructsWithNoKeys`].
    pub fn insert_and_send_to(&mut self, entry: T, sink: &impl EntrySink<RootMetric<T>>)
    where
        T: CloseValue<Closed = T::Source>,
        T: AggregateStrategy<Key = AggregateMustBeUsedOnStructsWithNoKeys>,
        T::Source: MergeRef + InflectableEntry,
    {
        let closed = entry.close();
        // Merge through a borrow so the closed entry can still be moved into the sink.
        <T::Source as MergeRef>::merge_ref(&mut self.aggregated, &closed);
        let root = RootEntry::new(closed);
        sink.append(root);
    }

    /// Folds an already-closed source `entry` into the accumulator.
    pub fn insert_direct(&mut self, entry: T::Source)
    where
        T::Source: Merge,
    {
        let accum = &mut self.aggregated;
        <T::Source as Merge>::merge(accum, entry);
    }

    /// Wraps an existing merged `value` as the starting accumulator.
    pub fn new(value: <T::Source as Merge>::Merged) -> Self {
        let aggregated = value;
        Self { aggregated }
    }
}
impl<T> AggregateSink<T::Source> for Aggregate<T>
where
T: AggregateStrategy,
{
fn merge(&mut self, entry: T::Source) {
T::Source::merge(&mut self.aggregated, entry);
}
}
impl<T: AggregateStrategy> AggregateSinkRef<T::Source> for Aggregate<T>
where
    T::Source: MergeRef,
{
    /// Folds a borrowed `entry` into the single accumulator.
    fn merge_ref(&mut self, entry: &T::Source) {
        let accum = &mut self.aggregated;
        <T::Source as MergeRef>::merge_ref(accum, entry);
    }
}
impl<T> Default for Aggregate<T>
where
    T: AggregateStrategy,
    <T::Source as Merge>::Merged: Default,
{
    /// Starts from the merged type's own default value.
    fn default() -> Self {
        let aggregated = <<T::Source as Merge>::Merged as Default>::default();
        Self { aggregated }
    }
}