use crate::metrics::event_slot::EventSlot;
use crate::metrics::metric_types::MetricEvent;
use chrono::{DateTime, Duration, Utc};
use spdlog::debug;
use std::collections::HashMap;
/// A single metric occurrence handed to `MetricAggregator::add_event`.
pub struct Event {
/// The metric payload; its `origin` field is recorded in the receiving slot's `origins` set.
pub metric_event: MetricEvent,
/// Timestamp of the event (UTC); compared against a slot's `stats_date_end` to decide
/// whether the event still belongs to that slot.
pub date_time: DateTime<Utc>,
/// Amount this event adds to the slot's `total` (and to `unique_total` when its origin
/// is new to the slot).
pub total: u64,
}
/// Groups incoming `Event`s into `EventSlot`s and keeps closed slots until they are collected.
pub struct MetricAggregator {
// Passed to `EventSlot::from_event` whenever a new slot is opened.
slot_size: Duration,
// Currently open slots, keyed by `EventSlot::key_from(&event)`.
slots: HashMap<String, EventSlot>,
// Slots that were drained out of `slots`; handed out (and cleared) by `take_events`.
history: Vec<EventSlot>,
}
impl MetricAggregator {
pub fn new(slot_size: Duration) -> Self {
Self {
slot_size,
slots: Default::default(),
history: vec![],
}
}
pub fn flush(&mut self) {
let date_time = Utc::now();
let mut should_drain = false;
for (_, slot) in self.slots.iter_mut() {
if date_time >= slot.stats_date_end {
should_drain = true;
break;
}
}
debug!(
"Flush called for {}. Should_drain={}",
date_time, should_drain
);
if should_drain {
let values: Vec<EventSlot> = self.slots.drain().map(|(_, v)| v).collect();
self.history.extend(values);
}
}
pub fn add_event(&mut self, event: Event) {
let hash_key = EventSlot::key_from(&event);
if let Some(slot) = self.slots.get_mut(&hash_key) {
if event.date_time < slot.stats_date_end {
let inserted = slot.origins.insert(event.metric_event.origin);
if inserted {
slot.unique_total += event.total;
}
slot.total += event.total;
return;
} else {
let values: Vec<EventSlot> = self.slots.drain().map(|(_, v)| v).collect();
self.history.extend(values);
}
}
let slot = EventSlot::from_event(event, &self.slot_size);
self.slots.insert(hash_key, slot);
}
pub fn take_events(&mut self) -> Option<Vec<EventSlot>> {
if self.history.is_empty() {
return None;
}
Some(std::mem::take(&mut self.history))
}
}