rsigma_eval/correlation/
buffers.rs1use std::collections::VecDeque;
2use std::io::{Read as IoRead, Write as IoWrite};
3
4use flate2::Compression;
5use flate2::read::DeflateDecoder;
6use flate2::write::DeflateEncoder;
7use serde::Serialize;
8
/// Deflate level used by `compress_event` when storing buffered events.
///
/// `Compression::fast()` favours encoding speed over compression ratio;
/// presumably chosen because compression runs on every `EventBuffer::push`.
const COMPRESSION_LEVEL: Compression = Compression::fast();
16
/// Bounded FIFO of events, each stored as a `(timestamp, compressed JSON)` pair.
///
/// `push` deflate-compresses each event (via `compress_event`) before storing
/// it, and `decompress_all` restores the JSON values. When the buffer is full,
/// `push` drops the oldest entry to make room for the new one.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct EventBuffer {
    /// `(timestamp, compressed event bytes)` pairs, oldest at the front.
    /// Serialized through `event_buffer_serde`, which writes payloads as base64.
    #[serde(with = "event_buffer_serde")]
    entries: VecDeque<(i64, Vec<u8>)>,
    /// Capacity that `push` compares against to decide when to drop the oldest entry.
    max_events: usize,
}
40
41mod event_buffer_serde {
44 use serde::{Deserialize, Deserializer, Serialize, Serializer};
45 use std::collections::VecDeque;
46
47 #[derive(Serialize, Deserialize)]
48 struct Entry {
49 ts: i64,
50 #[serde(with = "base64_bytes")]
51 data: Vec<u8>,
52 }
53
54 mod base64_bytes {
55 use base64::Engine as _;
56 use base64::engine::general_purpose::STANDARD;
57 use serde::{Deserializer, Serializer};
58
59 pub fn serialize<S: Serializer>(bytes: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
60 s.serialize_str(&STANDARD.encode(bytes))
61 }
62
63 pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
64 let s: String = serde::Deserialize::deserialize(d)?;
65 STANDARD.decode(s).map_err(serde::de::Error::custom)
66 }
67 }
68
69 pub fn serialize<S: Serializer>(
70 entries: &VecDeque<(i64, Vec<u8>)>,
71 s: S,
72 ) -> Result<S::Ok, S::Error> {
73 let v: Vec<Entry> = entries
74 .iter()
75 .map(|(ts, data)| Entry {
76 ts: *ts,
77 data: data.clone(),
78 })
79 .collect();
80 v.serialize(s)
81 }
82
83 pub fn deserialize<'de, D: Deserializer<'de>>(
84 d: D,
85 ) -> Result<VecDeque<(i64, Vec<u8>)>, D::Error> {
86 let v: Vec<Entry> = Vec::deserialize(d)?;
87 Ok(v.into_iter().map(|e| (e.ts, e.data)).collect())
88 }
89}
90
91impl EventBuffer {
92 pub fn new(max_events: usize) -> Self {
94 EventBuffer {
95 entries: VecDeque::with_capacity(max_events.min(64)),
96 max_events,
97 }
98 }
99
100 pub fn push(&mut self, ts: i64, event: &serde_json::Value) {
102 if let Some(compressed) = compress_event(event) {
104 if self.entries.len() >= self.max_events {
105 self.entries.pop_front();
106 }
107 self.entries.push_back((ts, compressed));
108 }
109 }
110
111 pub fn evict(&mut self, cutoff: i64) {
113 while self.entries.front().is_some_and(|(t, _)| *t < cutoff) {
114 self.entries.pop_front();
115 }
116 }
117
118 pub fn decompress_all(&self) -> Vec<serde_json::Value> {
120 self.entries
121 .iter()
122 .filter_map(|(_, compressed)| decompress_event(compressed))
123 .collect()
124 }
125
126 pub fn is_empty(&self) -> bool {
128 self.entries.is_empty()
129 }
130
131 pub fn clear(&mut self) {
133 self.entries.clear();
134 }
135
136 pub fn compressed_bytes(&self) -> usize {
138 self.entries.iter().map(|(_, data)| data.len()).sum()
139 }
140
141 pub fn len(&self) -> usize {
143 self.entries.len()
144 }
145}
146
147pub(super) fn compress_event(event: &serde_json::Value) -> Option<Vec<u8>> {
149 let json_bytes = serde_json::to_vec(event).ok()?;
150 let mut encoder = DeflateEncoder::new(Vec::new(), COMPRESSION_LEVEL);
151 encoder.write_all(&json_bytes).ok()?;
152 encoder.finish().ok()
153}
154
155pub(super) fn decompress_event(compressed: &[u8]) -> Option<serde_json::Value> {
157 let mut decoder = DeflateDecoder::new(compressed);
158 let mut json_bytes = Vec::new();
159 decoder.read_to_end(&mut json_bytes).ok()?;
160 serde_json::from_slice(&json_bytes).ok()
161}
162
/// Lightweight reference to an event: its timestamp plus an optional identifier,
/// kept by `EventRefBuffer` instead of the event body.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct EventRef {
    /// Timestamp passed to `EventRefBuffer::push` for this event.
    pub timestamp: i64,
    /// Identifier produced by `extract_event_id`, if one was found.
    /// Omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub id: Option<String>,
}
181
/// Bounded FIFO of `EventRef`s: stores only each event's timestamp and
/// extracted ID rather than the event itself.
#[derive(Debug, Clone, Serialize, serde::Deserialize)]
pub struct EventRefBuffer {
    /// References in insertion order, oldest at the front.
    entries: VecDeque<EventRef>,
    /// Capacity that `push` compares against to decide when to drop the oldest reference.
    max_events: usize,
}
193
194impl EventRefBuffer {
195 pub fn new(max_events: usize) -> Self {
197 EventRefBuffer {
198 entries: VecDeque::with_capacity(max_events.min(64)),
199 max_events,
200 }
201 }
202
203 pub fn push(&mut self, ts: i64, event: &serde_json::Value) {
205 if self.entries.len() >= self.max_events {
206 self.entries.pop_front();
207 }
208 let id = extract_event_id(event);
209 self.entries.push_back(EventRef { timestamp: ts, id });
210 }
211
212 pub fn evict(&mut self, cutoff: i64) {
214 while self.entries.front().is_some_and(|r| r.timestamp < cutoff) {
215 self.entries.pop_front();
216 }
217 }
218
219 pub fn refs(&self) -> Vec<EventRef> {
221 self.entries.iter().cloned().collect()
222 }
223
224 pub fn is_empty(&self) -> bool {
226 self.entries.is_empty()
227 }
228
229 pub fn clear(&mut self) {
231 self.entries.clear();
232 }
233
234 pub fn len(&self) -> usize {
236 self.entries.len()
237 }
238}
239
240pub(super) fn extract_event_id(event: &serde_json::Value) -> Option<String> {
245 const ID_FIELDS: &[&str] = &["id", "_id", "event_id", "EventRecordID", "event.id"];
246 for field in ID_FIELDS {
247 if let Some(val) = event.get(field) {
248 return match val {
249 serde_json::Value::String(s) => Some(s.clone()),
250 serde_json::Value::Number(n) => Some(n.to_string()),
251 _ => None,
252 };
253 }
254 }
255 None
256}