// rsigma_eval/correlation/buffers.rs

1use std::collections::VecDeque;
2use std::io::{Read as IoRead, Write as IoWrite};
3
4use flate2::Compression;
5use flate2::read::DeflateDecoder;
6use flate2::write::DeflateEncoder;
7use serde::Serialize;
8
// =============================================================================
// Compressed Event Buffer
// =============================================================================
12
13/// Default compression level — fast compression (level 1) for minimal latency.
14/// Deflate level 1 still achieves ~2-3x compression on JSON while being very fast.
15const COMPRESSION_LEVEL: Compression = Compression::fast();
16
17/// Compressed event storage for correlation event inclusion.
18///
19/// Stores event JSON payloads as individually deflate-compressed blobs alongside
20/// their timestamps. This enables per-event eviction (matching `WindowState`
21/// eviction) while keeping memory usage low.
22///
23/// # Memory Model
24///
25/// Each stored event costs approximately `compressed_size + 24` bytes
26/// (8 for timestamp, 16 for Vec overhead). Typical JSON events (500B–5KB)
27/// compress to 100B–1KB with deflate, giving 3–5x memory savings.
28///
29/// The buffer enforces a hard cap (`max_events`) so memory is bounded at:
30///   `max_events × (avg_compressed_size + 24)` bytes per group key.
31#[derive(Debug, Clone, Serialize, serde::Deserialize)]
32pub struct EventBuffer {
33    /// (timestamp, deflate-compressed event JSON) pairs, ordered by timestamp.
34    #[serde(with = "event_buffer_serde")]
35    entries: VecDeque<(i64, Vec<u8>)>,
36    /// Maximum number of events to retain. When exceeded, the oldest event is
37    /// evicted regardless of the time window.
38    max_events: usize,
39}
40
41/// Custom serde for EventBuffer entries: encodes compressed bytes as base64
42/// instead of JSON number arrays, cutting snapshot size ~3x.
43mod event_buffer_serde {
44    use serde::{Deserialize, Deserializer, Serialize, Serializer};
45    use std::collections::VecDeque;
46
47    #[derive(Serialize, Deserialize)]
48    struct Entry {
49        ts: i64,
50        #[serde(with = "base64_bytes")]
51        data: Vec<u8>,
52    }
53
54    mod base64_bytes {
55        use base64::Engine as _;
56        use base64::engine::general_purpose::STANDARD;
57        use serde::{Deserializer, Serializer};
58
59        pub fn serialize<S: Serializer>(bytes: &Vec<u8>, s: S) -> Result<S::Ok, S::Error> {
60            s.serialize_str(&STANDARD.encode(bytes))
61        }
62
63        pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Vec<u8>, D::Error> {
64            let s: String = serde::Deserialize::deserialize(d)?;
65            STANDARD.decode(s).map_err(serde::de::Error::custom)
66        }
67    }
68
69    pub fn serialize<S: Serializer>(
70        entries: &VecDeque<(i64, Vec<u8>)>,
71        s: S,
72    ) -> Result<S::Ok, S::Error> {
73        let v: Vec<Entry> = entries
74            .iter()
75            .map(|(ts, data)| Entry {
76                ts: *ts,
77                data: data.clone(),
78            })
79            .collect();
80        v.serialize(s)
81    }
82
83    pub fn deserialize<'de, D: Deserializer<'de>>(
84        d: D,
85    ) -> Result<VecDeque<(i64, Vec<u8>)>, D::Error> {
86        let v: Vec<Entry> = Vec::deserialize(d)?;
87        Ok(v.into_iter().map(|e| (e.ts, e.data)).collect())
88    }
89}
90
91impl EventBuffer {
92    /// Create a new event buffer with the given capacity cap.
93    pub fn new(max_events: usize) -> Self {
94        EventBuffer {
95            entries: VecDeque::with_capacity(max_events.min(64)),
96            max_events,
97        }
98    }
99
100    /// Compress and store an event. Evicts the oldest entry if at capacity.
101    pub fn push(&mut self, ts: i64, event: &serde_json::Value) {
102        // Compress the event JSON with deflate
103        if let Some(compressed) = compress_event(event) {
104            if self.entries.len() >= self.max_events {
105                self.entries.pop_front();
106            }
107            self.entries.push_back((ts, compressed));
108        }
109    }
110
111    /// Remove all entries older than the cutoff timestamp.
112    pub fn evict(&mut self, cutoff: i64) {
113        while self.entries.front().is_some_and(|(t, _)| *t < cutoff) {
114            self.entries.pop_front();
115        }
116    }
117
118    /// Decompress and return all stored events.
119    pub fn decompress_all(&self) -> Vec<serde_json::Value> {
120        self.entries
121            .iter()
122            .filter_map(|(_, compressed)| decompress_event(compressed))
123            .collect()
124    }
125
126    /// Returns true if there are no stored events.
127    pub fn is_empty(&self) -> bool {
128        self.entries.is_empty()
129    }
130
131    /// Clear all stored events.
132    pub fn clear(&mut self) {
133        self.entries.clear();
134    }
135
136    /// Total compressed bytes stored (for monitoring/diagnostics).
137    pub fn compressed_bytes(&self) -> usize {
138        self.entries.iter().map(|(_, data)| data.len()).sum()
139    }
140
141    /// Number of stored events.
142    pub fn len(&self) -> usize {
143        self.entries.len()
144    }
145}
146
147/// Compress an event JSON value using deflate.
148pub(super) fn compress_event(event: &serde_json::Value) -> Option<Vec<u8>> {
149    let json_bytes = serde_json::to_vec(event).ok()?;
150    let mut encoder = DeflateEncoder::new(Vec::new(), COMPRESSION_LEVEL);
151    encoder.write_all(&json_bytes).ok()?;
152    encoder.finish().ok()
153}
154
155/// Decompress a deflate-compressed event back to a JSON value.
156pub(super) fn decompress_event(compressed: &[u8]) -> Option<serde_json::Value> {
157    let mut decoder = DeflateDecoder::new(compressed);
158    let mut json_bytes = Vec::new();
159    decoder.read_to_end(&mut json_bytes).ok()?;
160    serde_json::from_slice(&json_bytes).ok()
161}
162
// =============================================================================
// Event Reference (lightweight mode)
// =============================================================================
166
167/// A lightweight event reference: timestamp plus optional event ID.
168///
169/// Used in `Refs` mode for memory-efficient correlation event tracking.
170/// Each ref costs ~40 bytes (vs. 100–1000+ bytes for compressed events),
171/// making this mode suitable for high-volume correlations where only
172/// traceability is needed.
173#[derive(Debug, Clone, Serialize, serde::Deserialize)]
174pub struct EventRef {
175    /// Event timestamp (epoch seconds).
176    pub timestamp: i64,
177    /// Event ID extracted from common fields (`id`, `_id`, `event_id`, etc.).
178    #[serde(skip_serializing_if = "Option::is_none")]
179    pub id: Option<String>,
180}
181
182/// Lightweight event reference buffer for `Refs` mode.
183///
184/// Stores only timestamps and optional event IDs — no event payload,
185/// no compression. This is the minimal-memory alternative to `EventBuffer`.
186#[derive(Debug, Clone, Serialize, serde::Deserialize)]
187pub struct EventRefBuffer {
188    /// Event references, ordered by timestamp.
189    entries: VecDeque<EventRef>,
190    /// Maximum number of refs to retain.
191    max_events: usize,
192}
193
194impl EventRefBuffer {
195    /// Create a new ref buffer with the given capacity cap.
196    pub fn new(max_events: usize) -> Self {
197        EventRefBuffer {
198            entries: VecDeque::with_capacity(max_events.min(64)),
199            max_events,
200        }
201    }
202
203    /// Store a reference to an event. Evicts the oldest ref if at capacity.
204    pub fn push(&mut self, ts: i64, event: &serde_json::Value) {
205        if self.entries.len() >= self.max_events {
206            self.entries.pop_front();
207        }
208        let id = extract_event_id(event);
209        self.entries.push_back(EventRef { timestamp: ts, id });
210    }
211
212    /// Remove all refs older than the cutoff timestamp.
213    pub fn evict(&mut self, cutoff: i64) {
214        while self.entries.front().is_some_and(|r| r.timestamp < cutoff) {
215            self.entries.pop_front();
216        }
217    }
218
219    /// Return cloned refs.
220    pub fn refs(&self) -> Vec<EventRef> {
221        self.entries.iter().cloned().collect()
222    }
223
224    /// Returns true if there are no stored refs.
225    pub fn is_empty(&self) -> bool {
226        self.entries.is_empty()
227    }
228
229    /// Clear all stored refs.
230    pub fn clear(&mut self) {
231        self.entries.clear();
232    }
233
234    /// Number of stored refs.
235    pub fn len(&self) -> usize {
236        self.entries.len()
237    }
238}
239
240/// Try to extract an event ID from common fields.
241///
242/// Checks (in order): `id`, `_id`, `event_id`, `EventRecordID`, `event.id`.
243/// Returns the first found value as a string.
244pub(super) fn extract_event_id(event: &serde_json::Value) -> Option<String> {
245    const ID_FIELDS: &[&str] = &["id", "_id", "event_id", "EventRecordID", "event.id"];
246    for field in ID_FIELDS {
247        if let Some(val) = event.get(field) {
248            return match val {
249                serde_json::Value::String(s) => Some(s.clone()),
250                serde_json::Value::Number(n) => Some(n.to_string()),
251                _ => None,
252            };
253        }
254    }
255    None
256}