Skip to main content

reddb_server/telemetry/
slow_query_store.rs

//! Bounded in-memory ring buffer for slow-query telemetry events.
//!
//! This is the operational-telemetry substrate for slow queries
//! (ADR 0060, §2 "Operational events"). A ring of fixed capacity
//! evicts the oldest record on overflow — cardinality and retention are
//! bounded by construction.
//!
//! Privacy contract (ADR 0060, §5): tenant and identity are stored as
//! keyed FNV-1a hashes, never as raw strings. The stored SQL is the
//! caller-supplied redacted string; fingerprinting will tighten this in a
//! follow-up slice.
//!
//! The `read()` method is the pure read-model layer — no filesystem
//! coupling — and is unit-tested over synthetic records in this file.
15
16use std::collections::VecDeque;
17use std::sync::{Arc, Mutex, OnceLock};
18
19// ---------------------------------------------------------------------------
20// Process-local hash key
21// ---------------------------------------------------------------------------
22
/// Lazily-initialised, process-wide key returned by `hash_key`.
static HASH_KEY: OnceLock<u64> = OnceLock::new();

/// Returns the process-local key that salts every label hash.
///
/// The key is derived once, on first call, and reused for the rest of the
/// process. It is the wall clock (nanoseconds since the Unix epoch, truncated
/// to 64 bits) XOR a multiplied process ID. This is not secret-quality
/// randomness. Per ADR 0060 §5 ("The keyed hash secret is process/deployment-
/// scoped config"), it only has to separate hash spaces for operational
/// grouping.
fn hash_key() -> u64 {
    // Multiplier applied to the PID before it is mixed with the clock bits.
    const PID_SPREAD: u64 = 0x517c_c1b7_2722_0a95;
    // Used only when the system clock reports a time before the Unix epoch.
    const CLOCK_FALLBACK: u64 = 0xcafe_babe_dead_beef;

    let init = || {
        let now = std::time::SystemTime::now();
        let clock_bits = match now.duration_since(std::time::UNIX_EPOCH) {
            // Deliberate truncation: only the low 64 bits of the nanosecond
            // count are mixed into the key.
            Ok(since_epoch) => since_epoch.as_nanos() as u64,
            Err(_) => CLOCK_FALLBACK,
        };
        let pid_bits = u64::from(std::process::id()).wrapping_mul(PID_SPREAD);
        clock_bits ^ pid_bits
    };
    *HASH_KEY.get_or_init(init)
}
37
38/// Hash `label` with the process-local key (FNV-1a keyed on `key`).
39pub(crate) fn hash_label(label: &str) -> u64 {
40    hash_label_with_key(label, hash_key())
41}
42
/// Keyed 64-bit FNV-1a hash of the UTF-8 bytes of `label`.
///
/// `key` is XOR-ed into the standard FNV-1a offset basis. A key of `0` yields
/// plain FNV-1a, and different keys produce different hash spaces for the same
/// input.
///
/// NOTE: FNV-1a is not a cryptographic keyed hash. Every step (XOR a byte,
/// multiply by an odd prime modulo 2^64) is invertible, so this gives grouping
/// and pseudonymisation, not secrecy.
fn hash_label_with_key(label: &str, key: u64) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0000_0100_0000_01B3;

    label.bytes().fold(key ^ FNV_OFFSET_BASIS, |state, byte| {
        (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
    })
}
54
55// ---------------------------------------------------------------------------
56// SlowQueryEvent
57// ---------------------------------------------------------------------------
58
/// A single slow-query operational telemetry event.
///
/// `tenant_hash` and `identity_hash` are keyed FNV-1a digests of the
/// raw tenant / identity strings (ADR 0060, §5). Raw values are never
/// stored by the substrate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SlowQueryEvent {
    /// Event timestamp in milliseconds. `SlowQueryFilter::since_ms` is
    /// compared against this field.
    /// NOTE(review): the clock/epoch is chosen by the producer; presumably
    /// Unix-epoch milliseconds — confirm.
    pub ts_ms: u64,
    /// Query-kind label such as `"select"` or `"insert"`.
    /// `SlowQueryFilter::kind` matches it by exact string equality.
    pub kind: &'static str,
    /// Query duration in milliseconds. `SlowQueryFilter::min_duration_ms` is
    /// compared against this field.
    pub duration_ms: u64,
    /// Caller-supplied redacted SQL (literals collapsed by the producer).
    pub sql_redacted: String,
    /// Keyed hash of the raw tenant label.
    pub tenant_hash: u64,
    /// Keyed hash of the raw identity label.
    pub identity_hash: u64,
}
76
77// ---------------------------------------------------------------------------
78// SlowQueryFilter
79// ---------------------------------------------------------------------------
80
/// Parameters for the pure read-model filter over recent slow-query events.
///
/// All fields are optional; an absent field means "no constraint," and
/// present constraints must all hold (conjunctive). `limit` is applied after
/// the other constraints. When `None`, it defaults to the store's internal
/// `DEFAULT_READ_LIMIT` (100 events).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct SlowQueryFilter {
    /// Maximum number of matching events to return (most-recent first).
    pub limit: Option<usize>,
    /// Return only events with `ts_ms >= since_ms` (inclusive bound).
    pub since_ms: Option<u64>,
    /// Return only events with `duration_ms >= min_duration_ms` (inclusive bound).
    pub min_duration_ms: Option<u64>,
    /// Return only events whose `kind` equals this exact string.
    pub kind: Option<&'static str>,
}
96
97// ---------------------------------------------------------------------------
98// SlowQueryStore
99// ---------------------------------------------------------------------------
100
101/// Default per-class ring capacity (ADR 0060, §3: "last 10k slow queries").
102pub const DEFAULT_CAP: usize = 10_000;
103
104/// Default read limit when `SlowQueryFilter::limit` is absent.
105const DEFAULT_READ_LIMIT: usize = 100;
106
107/// Bounded ring buffer holding the most-recent `cap` slow-query events.
108///
109/// Thread-safe via a `Mutex<VecDeque>`. The lock is held only for
110/// above-threshold, sampled events so contention is minimal.
111pub struct SlowQueryStore {
112    ring: Mutex<VecDeque<SlowQueryEvent>>,
113    cap: usize,
114}
115
116impl SlowQueryStore {
117    pub fn new(cap: usize) -> Arc<Self> {
118        Arc::new(Self {
119            ring: Mutex::new(VecDeque::with_capacity(cap.min(1024))),
120            cap,
121        })
122    }
123
124    /// Append an event; evicts the oldest record when the ring is at capacity.
125    pub fn push(&self, event: SlowQueryEvent) {
126        if let Ok(mut ring) = self.ring.lock() {
127            if ring.len() >= self.cap {
128                ring.pop_front();
129            }
130            ring.push_back(event);
131        }
132    }
133
134    /// Return recent events, most-recent first, applying `filter`.
135    ///
136    /// This is the pure read-model layer; it never touches the filesystem
137    /// and holds the lock only for the duration of the linear scan.
138    pub fn read(&self, filter: &SlowQueryFilter) -> Vec<SlowQueryEvent> {
139        let limit = filter.limit.unwrap_or(DEFAULT_READ_LIMIT);
140        let Ok(ring) = self.ring.lock() else {
141            return vec![];
142        };
143
144        ring.iter()
145            .rev()
146            .filter(|e| {
147                if let Some(since) = filter.since_ms {
148                    if e.ts_ms < since {
149                        return false;
150                    }
151                }
152                if let Some(min_dur) = filter.min_duration_ms {
153                    if e.duration_ms < min_dur {
154                        return false;
155                    }
156                }
157                if let Some(kind) = filter.kind {
158                    if e.kind != kind {
159                        return false;
160                    }
161                }
162                true
163            })
164            .take(limit)
165            .cloned()
166            .collect()
167    }
168
169    /// Number of events currently in the ring.
170    pub fn len(&self) -> usize {
171        self.ring.lock().map(|r| r.len()).unwrap_or(0)
172    }
173
174    pub fn is_empty(&self) -> bool {
175        self.len() == 0
176    }
177}
178
179// ---------------------------------------------------------------------------
180// Tests — pure filter layer, zero filesystem coupling
181// ---------------------------------------------------------------------------
182
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed key so hash-valued fields are deterministic across runs.
    const TEST_KEY: u64 = 0xdead_beef;

    fn event(ts_ms: u64, kind: &'static str, duration_ms: u64) -> SlowQueryEvent {
        SlowQueryEvent {
            ts_ms,
            kind,
            duration_ms,
            sql_redacted: format!("SELECT {ts_ms} FROM t"),
            tenant_hash: hash_label_with_key("tenant_a", TEST_KEY),
            identity_hash: hash_label_with_key("user_1", TEST_KEY),
        }
    }

    fn filled(events: &[(u64, &'static str, u64)]) -> Arc<SlowQueryStore> {
        let store = SlowQueryStore::new(DEFAULT_CAP);
        for &(ts, kind, dur) in events {
            store.push(event(ts, kind, dur));
        }
        store
    }

    /// Timestamps of `events` in returned order, so order assertions are exact.
    fn timestamps(events: &[SlowQueryEvent]) -> Vec<u64> {
        events.iter().map(|e| e.ts_ms).collect()
    }

    #[test]
    fn empty_store_returns_empty() {
        let store = SlowQueryStore::new(DEFAULT_CAP);
        assert!(store.is_empty());
        assert_eq!(store.len(), 0);
        assert!(store.read(&SlowQueryFilter::default()).is_empty());
    }

    #[test]
    fn results_most_recent_first() {
        let store = filled(&[
            (1000, "select", 100),
            (2000, "select", 100),
            (3000, "select", 100),
        ]);
        let result = store.read(&SlowQueryFilter::default());
        assert_eq!(timestamps(&result), vec![3000, 2000, 1000]);
    }

    #[test]
    fn order_follows_insertion_not_timestamp() {
        // "Most recent" means most recently pushed; ts_ms is not used for ordering.
        let store = filled(&[(3000, "select", 100), (1000, "select", 100)]);
        let result = store.read(&SlowQueryFilter::default());
        assert_eq!(timestamps(&result), vec![1000, 3000]);
    }

    #[test]
    fn limit_returns_n_most_recent() {
        let store = filled(&[
            (1000, "select", 100),
            (2000, "select", 100),
            (3000, "select", 100),
            (4000, "select", 100),
            (5000, "select", 100),
        ]);
        let result = store.read(&SlowQueryFilter {
            limit: Some(2),
            ..Default::default()
        });
        assert_eq!(timestamps(&result), vec![5000, 4000]);
    }

    #[test]
    fn limit_zero_returns_nothing() {
        let store = filled(&[(1000, "select", 100), (2000, "select", 100)]);
        let result = store.read(&SlowQueryFilter {
            limit: Some(0),
            ..Default::default()
        });
        assert!(result.is_empty());
    }

    #[test]
    fn limit_applies_after_filters() {
        let store = filled(&[
            (1000, "select", 100),
            (2000, "insert", 100),
            (3000, "insert", 100),
            (4000, "select", 100),
        ]);
        let result = store.read(&SlowQueryFilter {
            kind: Some("select"),
            limit: Some(2),
            ..Default::default()
        });
        assert_eq!(timestamps(&result), vec![4000, 1000]);
    }

    #[test]
    fn since_ms_is_inclusive_and_excludes_older_events() {
        let store = filled(&[
            (1000, "select", 100),
            (2000, "select", 100),
            (3000, "select", 100),
        ]);
        let result = store.read(&SlowQueryFilter {
            since_ms: Some(2000),
            ..Default::default()
        });
        assert_eq!(timestamps(&result), vec![3000, 2000]);
    }

    #[test]
    fn min_duration_ms_is_inclusive_and_excludes_fast_queries() {
        let store = filled(&[
            (1000, "select", 50),
            (2000, "select", 200),
            (3000, "select", 500),
        ]);
        let result = store.read(&SlowQueryFilter {
            min_duration_ms: Some(200),
            ..Default::default()
        });
        assert_eq!(timestamps(&result), vec![3000, 2000]);
    }

    #[test]
    fn kind_filter_returns_only_matching() {
        let store = filled(&[
            (1000, "select", 100),
            (2000, "insert", 100),
            (3000, "select", 100),
            (4000, "delete", 100),
        ]);
        let result = store.read(&SlowQueryFilter {
            kind: Some("select"),
            ..Default::default()
        });
        assert_eq!(timestamps(&result), vec![3000, 1000]);
        assert!(result.iter().all(|e| e.kind == "select"));
    }

    #[test]
    fn combined_filters_are_conjunctive() {
        let store = filled(&[
            (1000, "select", 100),
            (2000, "select", 300),
            (3000, "insert", 300),
            (4000, "select", 300),
        ]);
        let result = store.read(&SlowQueryFilter {
            since_ms: Some(2000),
            min_duration_ms: Some(300),
            kind: Some("select"),
            limit: Some(10),
        });
        assert_eq!(timestamps(&result), vec![4000, 2000]);
    }

    #[test]
    fn ring_evicts_oldest_on_overflow() {
        let store = SlowQueryStore::new(3);
        for i in 0..5u64 {
            store.push(event(i * 1000, "select", 100));
        }
        assert_eq!(store.len(), 3);
        let result = store.read(&SlowQueryFilter::default());
        assert_eq!(timestamps(&result), vec![4000, 3000, 2000]);
    }

    #[test]
    fn default_limit_caps_read() {
        let store = SlowQueryStore::new(DEFAULT_CAP);
        let total = (DEFAULT_READ_LIMIT + 50) as u64;
        for i in 0..total {
            store.push(event(i * 1000, "select", 100));
        }
        let result = store.read(&SlowQueryFilter::default());
        assert_eq!(result.len(), DEFAULT_READ_LIMIT);
        // The newest DEFAULT_READ_LIMIT events, newest first.
        assert_eq!(result[0].ts_ms, (total - 1) * 1000);
        assert_eq!(
            result[DEFAULT_READ_LIMIT - 1].ts_ms,
            (total - DEFAULT_READ_LIMIT as u64) * 1000
        );
    }

    #[test]
    fn hash_stable_same_key() {
        assert_eq!(
            hash_label_with_key("my-tenant", TEST_KEY),
            hash_label_with_key("my-tenant", TEST_KEY),
        );
    }

    #[test]
    fn hash_different_inputs_differ() {
        assert_ne!(
            hash_label_with_key("tenant_a", TEST_KEY),
            hash_label_with_key("tenant_b", TEST_KEY),
        );
    }

    #[test]
    fn hash_different_keys_differ() {
        assert_ne!(
            hash_label_with_key("tenant", 0x1111),
            hash_label_with_key("tenant", 0x2222),
        );
    }

    #[test]
    fn unkeyed_hash_matches_fnv1a_reference_vectors() {
        // A zero key leaves the offset basis untouched, so this is plain FNV-1a 64.
        assert_eq!(hash_label_with_key("", 0), 0xcbf2_9ce4_8422_2325);
        assert_eq!(hash_label_with_key("a", 0), 0xaf63_dc4c_8601_ec8c);
    }

    #[test]
    fn process_key_hash_is_stable() {
        assert_eq!(hash_label("my-tenant"), hash_label("my-tenant"));
    }
}