Skip to main content

reddb_server/telemetry/
slow_query_logger.rs

//! Dedicated slow-query sink — writes structured JSON lines to `red-slow.log`.
//!
//! Below-threshold calls pay only one relaxed atomic load and return
//! immediately; the logger itself allocates nothing on that path (the
//! caller-owned SQL string is simply dropped). Above-threshold calls go
//! through a deterministic counter-based sampler before writing to the
//! `NonBlocking` writer, which pushes the bytes onto a channel and returns
//! without waiting for the disk write.
8
9use std::io::Write;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
12use std::sync::{Arc, Mutex};
13
14use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
15
16use crate::runtime::EffectiveScope;
17
18// ---------------------------------------------------------------------------
19// Public types
20// ---------------------------------------------------------------------------
21
/// Construction options for [`SlowQueryLogger`].
#[derive(Debug, Clone)]
pub struct SlowQueryOpts {
    /// Directory that holds `red-slow.log`; creation is attempted if missing.
    pub log_dir: PathBuf,
    /// Queries whose duration is strictly below this many milliseconds are
    /// not logged.
    pub threshold_ms: u64,
    /// Percentage (0..=100) of above-threshold queries to log; values > 100
    /// are clamped to 100.
    pub sample_pct: u8,
}
28
/// The fixed set of query categories that can appear in the slow-query log.
///
/// A variant's lowercase name (see [`QueryKind::as_str`]) is what lands in
/// the `kind` field of each emitted line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryKind {
    Select,
    Insert,
    Update,
    Delete,
    Bulk,
    Aggregate,
    DDL,
    Internal,
}

impl QueryKind {
    /// Lowercase, allocation-free name used as the JSON `kind` value.
    fn as_str(self) -> &'static str {
        match self {
            QueryKind::Select => "select",
            QueryKind::Insert => "insert",
            QueryKind::Update => "update",
            QueryKind::Delete => "delete",
            QueryKind::Bulk => "bulk",
            QueryKind::Aggregate => "aggregate",
            QueryKind::DDL => "ddl",
            QueryKind::Internal => "internal",
        }
    }
}
56
57// ---------------------------------------------------------------------------
58// SlowQueryLogger
59// ---------------------------------------------------------------------------
60
61pub struct SlowQueryLogger {
62    /// Interior-mutable because `Write::write_all` takes `&mut self`.
63    writer: Mutex<NonBlocking>,
64    /// Keeps the background writer thread alive for the process lifetime.
65    _guard: WorkerGuard,
66    threshold_ms: AtomicU64,
67    /// 0..100; 100 means "emit every above-threshold query".
68    sample_pct: AtomicU8,
69    /// Monotonic counter across above-threshold calls — drives round-robin
70    /// sampling so exactly `sample_pct`% of above-threshold calls emit.
71    above_count: AtomicU64,
72}
73
74impl SlowQueryLogger {
75    pub fn new(opts: SlowQueryOpts) -> Arc<Self> {
76        let _ = std::fs::create_dir_all(&opts.log_dir);
77        let path = opts.log_dir.join("red-slow.log");
78        let file = std::fs::OpenOptions::new()
79            .create(true)
80            .append(true)
81            .open(&path)
82            .unwrap_or_else(|e| panic!("SlowQueryLogger: cannot open {}: {e}", path.display()));
83
84        let (writer, guard) = NonBlockingBuilder::default()
85            .buffered_lines_limit(65_536)
86            .lossy(true)
87            .finish(file);
88
89        Arc::new(Self {
90            writer: Mutex::new(writer),
91            _guard: guard,
92            threshold_ms: AtomicU64::new(opts.threshold_ms),
93            sample_pct: AtomicU8::new(opts.sample_pct.min(100)),
94            above_count: AtomicU64::new(0),
95        })
96    }
97
98    /// Record a completed query. Below-threshold: single relaxed load, no
99    /// allocation. Above-threshold + sampled: emit a JSON line to the sink.
100    pub fn record(
101        &self,
102        kind: QueryKind,
103        duration_ms: u64,
104        sql_redacted: String,
105        scope: &EffectiveScope,
106    ) {
107        // Hot path: threshold gate — single relaxed atomic load.
108        if duration_ms < self.threshold_ms.load(Ordering::Relaxed) {
109            return;
110        }
111
112        // Sampling: deterministic round-robin counter.
113        // `above_count % 100 < sample_pct` gives exactly sample_pct% over
114        // long runs without any floating-point or RNG overhead.
115        let pct = u64::from(self.sample_pct.load(Ordering::Relaxed));
116        if pct < 100 {
117            let n = self.above_count.fetch_add(1, Ordering::Relaxed);
118            if (n % 100) >= pct {
119                return;
120            }
121        }
122
123        self.emit(kind, duration_ms, sql_redacted, scope);
124    }
125
126    fn emit(
127        &self,
128        kind: QueryKind,
129        duration_ms: u64,
130        sql_redacted: String,
131        scope: &EffectiveScope,
132    ) {
133        let ts_ms = crate::utils::now_unix_millis();
134        let tenant = scope.tenant.as_deref().unwrap_or("").to_string();
135        let identity = scope
136            .identity
137            .as_ref()
138            .map(|(u, _)| u.as_str())
139            .unwrap_or("")
140            .to_string();
141
142        let mut map = std::collections::BTreeMap::new();
143        map.insert(
144            "ts_ms".to_string(),
145            crate::json::Value::Number(ts_ms as f64),
146        );
147        map.insert(
148            "kind".to_string(),
149            crate::json::Value::String(kind.as_str().to_string()),
150        );
151        map.insert(
152            "duration_ms".to_string(),
153            crate::json::Value::Number(duration_ms as f64),
154        );
155        map.insert("sql".to_string(), crate::json::Value::String(sql_redacted));
156        map.insert("tenant".to_string(), crate::json::Value::String(tenant));
157        map.insert("identity".to_string(), crate::json::Value::String(identity));
158
159        let obj = crate::json::Value::Object(map);
160        if let Ok(mut line) = crate::json::to_string(&obj) {
161            line.push('\n');
162            if let Ok(mut w) = self.writer.lock() {
163                let _ = w.write_all(line.as_bytes());
164            }
165        }
166    }
167}
168
169// ---------------------------------------------------------------------------
170// Tests
171// ---------------------------------------------------------------------------
172
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::path::Path;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    use super::*;
    use crate::runtime::EffectiveScope;
    use crate::storage::transaction::snapshot::Snapshot;

    /// Scratch directory unique to one test, removed (best-effort) on drop.
    struct TmpDir(PathBuf);

    impl TmpDir {
        fn new() -> Self {
            // A per-process counter keeps parallel tests in this binary from
            // sharing a log file even if the clock reads the same nanosecond.
            static NEXT_ID: AtomicU64 = AtomicU64::new(0);
            let mut d = std::env::temp_dir();
            d.push(format!(
                "reddb-slow-{}-{}-{}",
                std::process::id(),
                NEXT_ID.fetch_add(1, Ordering::Relaxed),
                crate::utils::now_unix_nanos()
            ));
            Self(d)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TmpDir {
        fn drop(&mut self) {
            // A leftover directory in the temp dir is harmless; ignore errors.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    fn logger(dir: &TmpDir, threshold_ms: u64, sample_pct: u8) -> Arc<SlowQueryLogger> {
        SlowQueryLogger::new(SlowQueryOpts {
            log_dir: dir.path().to_path_buf(),
            threshold_ms,
            sample_pct,
        })
    }

    fn empty_scope() -> EffectiveScope {
        EffectiveScope {
            tenant: None,
            identity: None,
            snapshot: Snapshot {
                xid: 0,
                in_progress: HashSet::new(),
            },
            visible_collections: None,
        }
    }

    /// Drop the sole handle to `log`, which drops its `WorkerGuard`.
    ///
    /// The guard's drop queues a shutdown message behind all pending lines
    /// and waits (bounded by a timeout) for the worker to process them. The
    /// file is therefore complete when this returns. A fixed sleep would
    /// race the worker thread on a loaded machine.
    fn flush(log: Arc<SlowQueryLogger>) {
        assert_eq!(
            Arc::strong_count(&log),
            1,
            "flush() must own the last handle so the WorkerGuard is dropped"
        );
        drop(log);
    }

    fn read_log_body(dir: &TmpDir) -> String {
        std::fs::read_to_string(dir.path().join("red-slow.log")).unwrap_or_default()
    }

    fn read_log_lines(dir: &TmpDir) -> Vec<crate::json::Value> {
        read_log_body(dir)
            .lines()
            .filter(|l| !l.is_empty())
            .map(|l| crate::json::from_str::<crate::json::Value>(l).expect("valid JSON"))
            .collect()
    }

    // -----------------------------------------------------------------------
    // Below-threshold: zero writes, fast
    // -----------------------------------------------------------------------

    #[test]
    fn below_threshold_no_file_writes() {
        let dir = TmpDir::new();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();

        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }

        flush(log);
        let lines = read_log_lines(&dir);
        assert!(
            lines.is_empty(),
            "expected zero writes, got {}",
            lines.len()
        );
    }

    #[test]
    fn below_threshold_wall_time_under_10ms() {
        let dir = TmpDir::new();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();

        let start = Instant::now();
        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed.as_millis() < 10,
            "10k below-threshold calls took {}ms (>10ms budget)",
            elapsed.as_millis()
        );
    }

    // -----------------------------------------------------------------------
    // Above-threshold: structured JSON line, parseable fields
    // -----------------------------------------------------------------------

    #[test]
    fn above_threshold_emits_json_line() {
        let dir = TmpDir::new();
        let log = logger(&dir, 10, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 100, "SELECT * FROM t".into(), &scope);
        flush(log);

        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "expected 1 line");
        let v = &lines[0];
        assert_eq!(v.get("kind").and_then(|x| x.as_str()), Some("select"));
        assert_eq!(v.get("duration_ms").and_then(|x| x.as_i64()), Some(100));
        let sql = v.get("sql").and_then(|x| x.as_str());
        assert_eq!(sql, Some("SELECT * FROM t"));
    }

    #[test]
    fn json_line_has_all_required_fields() {
        let dir = TmpDir::new();
        let log = logger(&dir, 0, 100);
        let scope = empty_scope();

        log.record(
            QueryKind::Insert,
            42,
            "INSERT INTO t VALUES (1)".into(),
            &scope,
        );
        flush(log);

        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1);
        let v = &lines[0];
        for field in ["ts_ms", "kind", "duration_ms", "sql", "tenant", "identity"] {
            assert!(v.get(field).is_some(), "missing {field}");
        }
    }

    #[test]
    fn all_query_kinds_serialise() {
        let kinds = [
            QueryKind::Select,
            QueryKind::Insert,
            QueryKind::Update,
            QueryKind::Delete,
            QueryKind::Bulk,
            QueryKind::Aggregate,
            QueryKind::DDL,
            QueryKind::Internal,
        ];
        let mut seen = HashSet::new();
        for k in kinds {
            assert!(!k.as_str().is_empty());
            assert!(seen.insert(k.as_str()), "duplicate kind name {}", k.as_str());
        }
    }

    // -----------------------------------------------------------------------
    // Sampling: the round-robin sampler is deterministic, so counts are exact
    // -----------------------------------------------------------------------

    #[test]
    fn sampling_property_10pct() {
        let dir = TmpDir::new();
        // threshold=0 so every call is above-threshold
        let log = logger(&dir, 0, 10);
        let scope = empty_scope();

        let calls = 10_000u64;
        for i in 0..calls {
            log.record(QueryKind::Select, 1, format!("SELECT {i}"), &scope);
        }
        flush(log);

        let emitted = read_log_lines(&dir).len() as u64;
        // Counter values 0..10_000 with `n % 100 < 10` → exactly 1000.
        assert_eq!(
            emitted, 1000,
            "sample_pct=10 over {calls} calls emitted {emitted} (expected exactly 1000)"
        );
    }

    #[test]
    fn sampling_zero_pct_emits_nothing() {
        let dir = TmpDir::new();
        let log = logger(&dir, 0, 0);
        let scope = empty_scope();

        for i in 0..500 {
            log.record(QueryKind::Select, 1, format!("SELECT {i}"), &scope);
        }
        flush(log);

        assert!(read_log_lines(&dir).is_empty(), "sample_pct=0 must not emit");
    }

    #[test]
    fn sample_pct_over_100_is_clamped_to_all() {
        let dir = TmpDir::new();
        let log = logger(&dir, 0, 250);
        let scope = empty_scope();

        for i in 0..50 {
            log.record(QueryKind::Select, 1, format!("SELECT {i}"), &scope);
        }
        flush(log);

        assert_eq!(read_log_lines(&dir).len(), 50, "sample_pct>100 must emit every call");
    }

    // -----------------------------------------------------------------------
    // Adversarial SQL: CRLF / NUL / quote → escape-safe JSON
    // -----------------------------------------------------------------------

    #[test]
    fn adversarial_sql_is_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "SELECT 1\r\nDROP TABLE t--"),
            ("nul", "SELECT '\x00'"),
            ("quote", r#"SELECT "secret" FROM t"#),
            ("json_inject", r#"SELECT 1},"pwned":true,{"x":"#),
            ("low_ctrl", "SELECT \x01\x02\x07\x1f"),
            ("backslash", "SELECT 'C:\\path\\file'"),
        ];

        for (label, sql) in payloads {
            let dir = TmpDir::new();
            let log = logger(&dir, 0, 100);
            let scope = empty_scope();

            log.record(QueryKind::Select, 1, sql.to_string(), &scope);
            flush(log);

            let body = read_log_body(&dir);
            // Exactly one row, terminated by the single '\n' the logger
            // appends. Any raw CR/LF leaked from the payload would remain
            // inside `row` and fail the check below.
            let row = body
                .strip_suffix('\n')
                .unwrap_or_else(|| panic!("{label}: no newline-terminated row: {body:?}"));
            assert!(
                !row.contains(|c: char| c == '\n' || c == '\r'),
                "{label}: raw line break inside JSONL row: {row:?}"
            );

            // Must parse as valid JSON.
            let v: crate::json::Value = crate::json::from_str(row)
                .unwrap_or_else(|e| panic!("{label}: not valid JSON: {e}\n{row:?}"));

            // SQL must round-trip correctly.
            let recovered = v.get("sql").and_then(|x| x.as_str()).unwrap_or("");
            assert_eq!(recovered, *sql, "{label}: SQL round-trip mismatch");
        }
    }

    // -----------------------------------------------------------------------
    // Exact-threshold boundary: duration == threshold emits
    // -----------------------------------------------------------------------

    #[test]
    fn at_threshold_boundary_emits() {
        let dir = TmpDir::new();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();

        // Exactly at threshold: duration_ms < threshold_ms is false → emits.
        log.record(QueryKind::Select, 50, "SELECT 1".into(), &scope);
        flush(log);
        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "duration == threshold should emit");
    }

    #[test]
    fn just_below_threshold_does_not_emit() {
        let dir = TmpDir::new();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 49, "SELECT 1".into(), &scope);
        flush(log);
        let lines = read_log_lines(&dir);
        assert!(lines.is_empty(), "duration < threshold must not emit");
    }
}