use std::collections::VecDeque;
use std::io::{Read as IoRead, Write as IoWrite};
use flate2::Compression;
use flate2::read::DeflateDecoder;
use flate2::write::DeflateEncoder;
use serde::Serialize;
/// Deflate level passed to the encoder in `compress_event`: flate2's
/// `Compression::fast()` preset.
const COMPRESSION_LEVEL: Compression = Compression::fast();
/// Bounded FIFO buffer of timestamped events, each stored as a compressed
/// byte payload (see `push`, which compresses via `compress_event`).
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct EventBuffer {
// `(timestamp, compressed payload)` pairs, oldest at the front.
// (De)serialized through the `event_buffer_serde` module, which writes each
// pair as a `{ ts, data }` record with `data` base64-encoded.
#[serde(with = "event_buffer_serde")]
entries: VecDeque<(i64, Vec<u8>)>,
// Entry count at which `push` starts discarding the oldest entry.
max_events: usize,
}
mod event_buffer_serde {
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::VecDeque;
#[derive(Serialize, Deserialize)]
struct Entry {
ts: i64,
#[serde(with = "base64_bytes")]
data: Vec<u8>,
}
mod base64_bytes {
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD;
use serde::{Deserializer, Serializer};
pub fn serialize<S: Serializer>(bytes: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
s.serialize_str(&STANDARD.encode(bytes))
}
pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
let s: String = serde::Deserialize::deserialize(d)?;
STANDARD.decode(s).map_err(serde::de::Error::custom)
}
}
pub fn serialize<S: Serializer>(
entries: &VecDeque<(i64, Vec<u8>)>,
s: S,
) -> Result<S::Ok, S::Error> {
let v: Vec<Entry> = entries
.iter()
.map(|(ts, data)| Entry {
ts: *ts,
data: data.clone(),
})
.collect();
v.serialize(s)
}
pub fn deserialize<'de, D: Deserializer<'de>>(
d: D,
) -> Result<VecDeque<(i64, Vec<u8>)>, D::Error> {
let v: Vec<Entry> = Vec::deserialize(d)?;
Ok(v.into_iter().map(|e| (e.ts, e.data)).collect())
}
}
impl EventBuffer {
    /// Creates an empty buffer that retains at most `max_events` entries.
    ///
    /// The initial allocation is capped at 64 slots; the deque grows on demand
    /// beyond that.
    pub fn new(max_events: usize) -> Self {
        Self {
            entries: VecDeque::with_capacity(max_events.min(64)),
            max_events,
        }
    }

    /// Compresses `event` and appends it with timestamp `ts`, discarding the
    /// oldest entries as needed so the buffer never exceeds `max_events`.
    ///
    /// A buffer created with `max_events == 0` stores nothing. If the event
    /// cannot be compressed it is dropped and the buffer is left untouched.
    pub fn push(&mut self, ts: i64, event: &serde_json::Value) {
        if self.max_events == 0 {
            return;
        }
        let Some(compressed) = compress_event(event) else {
            return;
        };
        // Loop rather than pop once: a buffer restored from serialized state
        // may already hold more than `max_events` entries.
        while self.entries.len() >= self.max_events {
            self.entries.pop_front();
        }
        self.entries.push_back((ts, compressed));
    }

    /// Removes entries from the front while their timestamp is below `cutoff`.
    ///
    /// Stops at the first entry with a timestamp `>= cutoff`, so older entries
    /// located behind a newer one are not removed.
    pub fn evict(&mut self, cutoff: i64) {
        while self.entries.front().is_some_and(|(t, _)| *t < cutoff) {
            self.entries.pop_front();
        }
    }

    /// Decompresses every stored event, oldest first.
    ///
    /// Entries that fail to decompress or parse are silently skipped, so the
    /// result may be shorter than [`len`](Self::len).
    pub fn decompress_all(&self) -> Vec<serde_json::Value> {
        self.entries
            .iter()
            .filter_map(|(_, compressed)| decompress_event(compressed))
            .collect()
    }

    /// Returns `true` if the buffer holds no entries.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Removes all entries; `max_events` is unchanged.
    pub fn clear(&mut self) {
        self.entries.clear();
    }

    /// Total size in bytes of all compressed payloads currently stored.
    pub fn compressed_bytes(&self) -> usize {
        self.entries.iter().map(|(_, data)| data.len()).sum()
    }

    /// Number of entries currently stored.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
/// Serializes `event` to JSON bytes and deflate-compresses them at
/// `COMPRESSION_LEVEL`.
///
/// Returns `None` if JSON serialization or any step of the compression fails.
pub(super) fn compress_event(event: &serde_json::Value) -> Option<Vec<u8>> {
    serde_json::to_vec(event).ok().and_then(|payload| {
        let mut deflater = DeflateEncoder::new(Vec::new(), COMPRESSION_LEVEL);
        match deflater.write_all(&payload) {
            Ok(()) => deflater.finish().ok(),
            Err(_) => None,
        }
    })
}
/// Inflates a deflate-compressed payload and parses the result as JSON.
///
/// Returns `None` if the bytes cannot be inflated or the inflated data is not
/// valid JSON.
pub(super) fn decompress_event(compressed: &[u8]) -> Option<serde_json::Value> {
    let mut inflated = Vec::new();
    match DeflateDecoder::new(compressed).read_to_end(&mut inflated) {
        Ok(_) => serde_json::from_slice(&inflated).ok(),
        Err(_) => None,
    }
}
/// Lightweight reference to an event: its timestamp plus an optional id.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct EventRef {
/// Timestamp supplied by the caller when the reference was recorded.
pub timestamp: i64,
/// Event identifier, if one was found; omitted from serialized output when
/// `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub id: Option<String>,
}
/// Bounded FIFO buffer of [`EventRef`]s; holds references (timestamp and id)
/// rather than event payloads.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct EventRefBuffer {
// Stored references, oldest at the front.
entries: VecDeque<EventRef>,
// Entry count at which `push` starts discarding the oldest entry.
max_events: usize,
}
impl EventRefBuffer {
    /// Creates an empty buffer that retains at most `max_events` references.
    ///
    /// The initial allocation is capped at 64 slots; the deque grows on demand
    /// beyond that.
    pub fn new(max_events: usize) -> Self {
        Self {
            entries: VecDeque::with_capacity(max_events.min(64)),
            max_events,
        }
    }

    /// Records a reference to `event` with timestamp `ts`, discarding the
    /// oldest references as needed so the buffer never exceeds `max_events`.
    ///
    /// The id is taken from the event via `extract_event_id` and may be `None`.
    /// A buffer created with `max_events == 0` stores nothing.
    pub fn push(&mut self, ts: i64, event: &serde_json::Value) {
        if self.max_events == 0 {
            return;
        }
        // Loop rather than pop once: a buffer restored from serialized state
        // may already hold more than `max_events` entries.
        while self.entries.len() >= self.max_events {
            self.entries.pop_front();
        }
        self.entries.push_back(EventRef {
            timestamp: ts,
            id: extract_event_id(event),
        });
    }

    /// Removes references from the front while their timestamp is below
    /// `cutoff`.
    ///
    /// Stops at the first reference with a timestamp `>= cutoff`, so older
    /// references located behind a newer one are not removed.
    pub fn evict(&mut self, cutoff: i64) {
        while self.entries.front().is_some_and(|r| r.timestamp < cutoff) {
            self.entries.pop_front();
        }
    }

    /// Returns a copy of all stored references, oldest first.
    pub fn refs(&self) -> Vec<EventRef> {
        self.entries.iter().cloned().collect()
    }

    /// Returns `true` if the buffer holds no references.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Removes all references; `max_events` is unchanged.
    pub fn clear(&mut self) {
        self.entries.clear();
    }

    /// Number of references currently stored.
    pub fn len(&self) -> usize {
        self.entries.len()
    }
}
/// Extracts an identifier from a JSON event.
///
/// Looks up the top-level keys `id`, `_id`, `event_id`, `EventRecordID` and
/// the literal key `event.id`, in that order, and returns the first value that
/// is a string (returned as-is) or a number (rendered as text).
///
/// A candidate key whose value has any other type (for example `null` or an
/// object) is skipped so that a later key can still supply the id. Returns
/// `None` when no candidate yields a usable value, including when `event` is
/// not a JSON object.
pub(super) fn extract_event_id(event: &serde_json::Value) -> Option<String> {
    const ID_FIELDS: &[&str] = &["id", "_id", "event_id", "EventRecordID", "event.id"];
    ID_FIELDS.iter().find_map(|field| match event.get(field)? {
        serde_json::Value::String(s) => Some(s.clone()),
        serde_json::Value::Number(n) => Some(n.to_string()),
        // Present but unusable: keep scanning the remaining candidates.
        _ => None,
    })
}