// reddb_server/telemetry/slow_query_logger.rs

//! Dedicated slow-query sink — writes structured JSON lines to `red-slow.log`.
//!
//! Below-threshold calls pay only one relaxed atomic load and return
//! immediately; the logger itself performs zero allocations on that path
//! (the caller has already built the `sql_redacted` string). Above-threshold
//! calls go through a deterministic counter-based sampler before writing to
//! the `NonBlocking` writer, which pushes the bytes onto a channel and
//! returns without waiting for the disk write.

use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
use std::sync::{Arc, Mutex, PoisonError};

use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};

use crate::runtime::EffectiveScope;

// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------

/// Construction options for [`SlowQueryLogger::new`].
#[derive(Debug, Clone)]
pub struct SlowQueryOpts {
    /// Directory that receives `red-slow.log`; the logger creates it
    /// (best-effort) if it does not exist.
    pub log_dir: PathBuf,
    /// Queries taking at least this many milliseconds are candidates for
    /// logging; faster ones are dropped on the hot path.
    pub threshold_ms: u64,
    /// 0..=100; values > 100 are clamped to 100.
    pub sample_pct: u8,
}

/// Closed enum of query kinds emitted in the slow-query log.
///
/// Every variant maps to a fixed lowercase tag, which is what appears in the
/// `kind` field of each JSON line.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryKind {
    /// Tag `"select"`.
    Select,
    /// Tag `"insert"`.
    Insert,
    /// Tag `"update"`.
    Update,
    /// Tag `"delete"`.
    Delete,
    /// Tag `"bulk"`.
    Bulk,
    /// Tag `"aggregate"`.
    Aggregate,
    /// Tag `"ddl"`.
    DDL,
    /// Tag `"internal"`.
    Internal,
}

impl QueryKind {
    /// Lowercase tag written to the `kind` field of a slow-query record.
    fn as_str(self) -> &'static str {
        match self {
            QueryKind::Select => "select",
            QueryKind::Insert => "insert",
            QueryKind::Update => "update",
            QueryKind::Delete => "delete",
            QueryKind::Bulk => "bulk",
            QueryKind::Aggregate => "aggregate",
            QueryKind::DDL => "ddl",
            QueryKind::Internal => "internal",
        }
    }
}

// ---------------------------------------------------------------------------
// SlowQueryLogger
// ---------------------------------------------------------------------------

61pub struct SlowQueryLogger {
62    /// Interior-mutable because `Write::write_all` takes `&mut self`.
63    writer: Mutex<NonBlocking>,
64    /// Keeps the background writer thread alive for the process lifetime.
65    _guard: WorkerGuard,
66    threshold_ms: AtomicU64,
67    /// 0..100; 100 means "emit every above-threshold query".
68    sample_pct: AtomicU8,
69    /// Monotonic counter across above-threshold calls — drives round-robin
70    /// sampling so exactly `sample_pct`% of above-threshold calls emit.
71    above_count: AtomicU64,
72}
73
74impl SlowQueryLogger {
75    pub fn new(opts: SlowQueryOpts) -> Arc<Self> {
76        let path = opts.log_dir.join("red-slow.log");
77        Self::open_at(path, opts.threshold_ms, opts.sample_pct)
78    }
79
80    /// Resolve a [`crate::storage::layout::LogDestination`] into a concrete
81    /// slow-query sink. `File(p)` writes to that exact path; `Stderr` and
82    /// `Syslog` fall back to `<fallback_log_dir>/red-slow.log` until the
83    /// dedicated sinks are wired (ADR 0018).
84    pub fn for_destination(
85        dest: &crate::storage::layout::LogDestination,
86        fallback_log_dir: &std::path::Path,
87        threshold_ms: u64,
88        sample_pct: u8,
89    ) -> Arc<Self> {
90        use crate::storage::layout::LogDestination;
91        let path = match dest {
92            LogDestination::File(p) => p.clone(),
93            LogDestination::Stderr => fallback_log_dir.join("red-slow.log"),
94            LogDestination::Syslog => {
95                tracing::warn!(
96                    target: "reddb::slow",
97                    "slow-query LogDestination::Syslog requested; sink not implemented, falling back to file"
98                );
99                fallback_log_dir.join("red-slow.log")
100            }
101        };
102        Self::open_at(path, threshold_ms, sample_pct)
103    }
104
105    fn open_at(path: PathBuf, threshold_ms: u64, sample_pct: u8) -> Arc<Self> {
106        if let Some(parent) = path.parent() {
107            let _ = std::fs::create_dir_all(parent);
108        }
109        let file = std::fs::OpenOptions::new()
110            .create(true)
111            .append(true)
112            .open(&path)
113            .unwrap_or_else(|e| panic!("SlowQueryLogger: cannot open {}: {e}", path.display()));
114
115        let (writer, guard) = NonBlockingBuilder::default()
116            .buffered_lines_limit(65_536)
117            .lossy(true)
118            .finish(file);
119
120        Arc::new(Self {
121            writer: Mutex::new(writer),
122            _guard: guard,
123            threshold_ms: AtomicU64::new(threshold_ms),
124            sample_pct: AtomicU8::new(sample_pct.min(100)),
125            above_count: AtomicU64::new(0),
126        })
127    }
128
129    /// Record a completed query. Below-threshold: single relaxed load, no
130    /// allocation. Above-threshold + sampled: emit a JSON line to the sink.
131    pub fn record(
132        &self,
133        kind: QueryKind,
134        duration_ms: u64,
135        sql_redacted: String,
136        scope: &EffectiveScope,
137    ) {
138        // Hot path: threshold gate — single relaxed atomic load.
139        if duration_ms < self.threshold_ms.load(Ordering::Relaxed) {
140            return;
141        }
142
143        // Sampling: deterministic round-robin counter.
144        // `above_count % 100 < sample_pct` gives exactly sample_pct% over
145        // long runs without any floating-point or RNG overhead.
146        let pct = u64::from(self.sample_pct.load(Ordering::Relaxed));
147        if pct < 100 {
148            let n = self.above_count.fetch_add(1, Ordering::Relaxed);
149            if (n % 100) >= pct {
150                return;
151            }
152        }
153
154        self.emit(kind, duration_ms, sql_redacted, scope);
155    }
156
157    fn emit(
158        &self,
159        kind: QueryKind,
160        duration_ms: u64,
161        sql_redacted: String,
162        scope: &EffectiveScope,
163    ) {
164        let ts_ms = crate::utils::now_unix_millis();
165        let tenant = scope.tenant.as_deref().unwrap_or("").to_string();
166        let identity = scope
167            .identity
168            .as_ref()
169            .map(|(u, _)| u.as_str())
170            .unwrap_or("")
171            .to_string();
172
173        let mut map = std::collections::BTreeMap::new();
174        map.insert(
175            "ts_ms".to_string(),
176            crate::json::Value::Number(ts_ms as f64),
177        );
178        map.insert(
179            "kind".to_string(),
180            crate::json::Value::String(kind.as_str().to_string()),
181        );
182        map.insert(
183            "duration_ms".to_string(),
184            crate::json::Value::Number(duration_ms as f64),
185        );
186        map.insert("sql".to_string(), crate::json::Value::String(sql_redacted));
187        map.insert("tenant".to_string(), crate::json::Value::String(tenant));
188        map.insert("identity".to_string(), crate::json::Value::String(identity));
189
190        let obj = crate::json::Value::Object(map);
191        if let Ok(mut line) = crate::json::to_string(&obj) {
192            line.push('\n');
193            if let Ok(mut w) = self.writer.lock() {
194                let _ = w.write_all(line.as_bytes());
195            }
196        }
197    }
198}
199
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::path::Path;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Instant;

    use super::*;
    use crate::runtime::EffectiveScope;
    use crate::storage::transaction::snapshot::Snapshot;

    /// Scratch directory unique to one test; removed (best-effort) on drop
    /// so repeated test runs do not litter the system temp dir.
    struct TmpDir(PathBuf);

    impl TmpDir {
        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TmpDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    fn tmp_dir() -> TmpDir {
        // Tests run on parallel threads of one process; the sequence number
        // keeps names unique even if two calls observe the same clock value.
        static SEQ: AtomicUsize = AtomicUsize::new(0);
        let seq = SEQ.fetch_add(1, Ordering::Relaxed);
        let mut d = std::env::temp_dir();
        d.push(format!(
            "reddb-slow-{}-{}-{seq}",
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        TmpDir(d)
    }

    fn logger(dir: &Path, threshold_ms: u64, sample_pct: u8) -> Arc<SlowQueryLogger> {
        SlowQueryLogger::new(SlowQueryOpts {
            log_dir: dir.to_path_buf(),
            threshold_ms,
            sample_pct,
        })
    }

    fn empty_scope() -> EffectiveScope {
        EffectiveScope {
            tenant: None,
            identity: None,
            snapshot: Snapshot {
                xid: 0,
                in_progress: HashSet::new(),
            },
            visible_collections: None,
        }
    }

    /// Drop the last handle to `log`. Dropping the logger drops its
    /// `WorkerGuard`, which (per tracing_appender) waits, with a bounded
    /// timeout, for the background worker to drain and flush. A fixed sleep
    /// raced with the worker and could let "nothing written" assertions
    /// pass vacuously.
    fn flush(log: Arc<SlowQueryLogger>) {
        assert_eq!(
            Arc::strong_count(&log),
            1,
            "flush must own the last handle so the WorkerGuard is dropped"
        );
        drop(log);
    }

    fn read_log_lines(dir: &Path) -> Vec<crate::json::Value> {
        let path = dir.join("red-slow.log");
        let body = std::fs::read_to_string(&path).unwrap_or_default();
        body.lines()
            .filter(|l| !l.is_empty())
            .map(|l| crate::json::from_str::<crate::json::Value>(l).expect("valid JSON"))
            .collect()
    }

    // -----------------------------------------------------------------------
    // Below-threshold: zero writes, fast
    // -----------------------------------------------------------------------

    #[test]
    fn below_threshold_no_file_writes() {
        let dir = tmp_dir();
        let log = logger(dir.path(), 1000, 100);
        let scope = empty_scope();

        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }

        flush(log);
        let lines = read_log_lines(dir.path());
        assert!(
            lines.is_empty(),
            "expected zero writes, got {}",
            lines.len()
        );
    }

    #[test]
    fn below_threshold_wall_time_under_10ms() {
        let dir = tmp_dir();
        let log = logger(dir.path(), 1000, 100);
        let scope = empty_scope();

        let start = Instant::now();
        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed.as_millis() < 10,
            "10k below-threshold calls took {}ms (>10ms budget)",
            elapsed.as_millis()
        );
    }

    // -----------------------------------------------------------------------
    // Above-threshold: structured JSON line, parseable fields
    // -----------------------------------------------------------------------

    #[test]
    fn above_threshold_emits_json_line() {
        let dir = tmp_dir();
        let log = logger(dir.path(), 10, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 100, "SELECT * FROM t".into(), &scope);
        flush(log);

        let lines = read_log_lines(dir.path());
        assert_eq!(lines.len(), 1, "expected 1 line");
        let v = &lines[0];
        assert_eq!(v.get("kind").and_then(|x| x.as_str()), Some("select"));
        assert_eq!(v.get("duration_ms").and_then(|x| x.as_i64()), Some(100));
        let sql = v.get("sql").and_then(|x| x.as_str());
        assert_eq!(sql, Some("SELECT * FROM t"));
    }

    #[test]
    fn json_line_has_all_required_fields() {
        let dir = tmp_dir();
        let log = logger(dir.path(), 0, 100);
        let scope = empty_scope();

        log.record(
            QueryKind::Insert,
            42,
            "INSERT INTO t VALUES (1)".into(),
            &scope,
        );
        flush(log);

        let lines = read_log_lines(dir.path());
        assert_eq!(lines.len(), 1);
        let v = &lines[0];
        assert!(v.get("ts_ms").is_some(), "missing ts_ms");
        assert!(v.get("kind").is_some(), "missing kind");
        assert!(v.get("duration_ms").is_some(), "missing duration_ms");
        assert!(v.get("sql").is_some(), "missing sql");
        assert!(v.get("tenant").is_some(), "missing tenant");
        assert!(v.get("identity").is_some(), "missing identity");
    }

    #[test]
    fn all_query_kinds_serialise() {
        let kinds = [
            QueryKind::Select,
            QueryKind::Insert,
            QueryKind::Update,
            QueryKind::Delete,
            QueryKind::Bulk,
            QueryKind::Aggregate,
            QueryKind::DDL,
            QueryKind::Internal,
        ];
        for k in kinds {
            assert!(!k.as_str().is_empty());
        }
    }

    // -----------------------------------------------------------------------
    // Sampling: sample_pct=10 → exactly 10% over 10_000 calls (the sampler
    // is a deterministic round-robin, not random)
    // -----------------------------------------------------------------------

    #[test]
    fn sampling_property_10pct() {
        let dir = tmp_dir();
        // threshold=0 so every call is above-threshold
        let log = logger(dir.path(), 0, 10);
        let scope = empty_scope();

        let calls = 10_000u64;
        for i in 0..calls {
            log.record(QueryKind::Select, 1, format!("SELECT {i}"), &scope);
        }
        flush(log);

        let lines = read_log_lines(dir.path());
        let emitted = lines.len() as u64;
        // A fresh logger's counter starts at 0 and call `n` emits iff
        // `n % 100 < 10`, so exactly 10 of every 100 calls are written.
        assert_eq!(
            emitted,
            calls / 10,
            "sample_pct=10 over {calls} calls emitted {emitted} (expected exactly {})",
            calls / 10
        );
    }

    // -----------------------------------------------------------------------
    // Adversarial SQL: CRLF / NUL / quote → escape-safe JSON
    // -----------------------------------------------------------------------

    #[test]
    fn adversarial_sql_is_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "SELECT 1\r\nDROP TABLE t--"),
            ("nul", "SELECT '\x00'"),
            ("quote", r#"SELECT "secret" FROM t"#),
            ("json_inject", r#"SELECT 1},"pwned":true,{"x":"#),
            ("low_ctrl", "SELECT \x01\x02\x07\x1f"),
            ("backslash", "SELECT 'C:\\path\\file'"),
        ];

        for (label, sql) in payloads {
            let dir = tmp_dir();
            let log = logger(dir.path(), 0, 100);
            let scope = empty_scope();

            log.record(QueryKind::Select, 1, sql.to_string(), &scope);
            flush(log);

            let body =
                std::fs::read_to_string(dir.path().join("red-slow.log")).unwrap_or_default();

            // Must be exactly one JSONL row. (Checking a single `lines()`
            // item for '\n' can never fail: `lines()` already split on it.)
            let rows: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();
            assert_eq!(
                rows.len(),
                1,
                "{label}: expected exactly one JSONL row, got {rows:?}"
            );
            let line = rows[0];

            // Must parse as valid JSON.
            let v: crate::json::Value = crate::json::from_str(line)
                .unwrap_or_else(|e| panic!("{label}: not valid JSON: {e}\n{line:?}"));

            // SQL must round-trip correctly.
            let recovered = v.get("sql").and_then(|x| x.as_str()).unwrap_or("");
            assert_eq!(recovered, *sql, "{label}: SQL round-trip mismatch");
        }
    }

    // -----------------------------------------------------------------------
    // Exact-threshold boundary: duration == threshold emits
    // -----------------------------------------------------------------------

    #[test]
    fn at_threshold_boundary_emits() {
        let dir = tmp_dir();
        let log = logger(dir.path(), 50, 100);
        let scope = empty_scope();

        // Exactly at threshold: duration_ms < threshold_ms is false → emits.
        log.record(QueryKind::Select, 50, "SELECT 1".into(), &scope);
        flush(log);
        let lines = read_log_lines(dir.path());
        assert_eq!(lines.len(), 1, "duration == threshold should emit");
    }

    #[test]
    fn just_below_threshold_does_not_emit() {
        let dir = tmp_dir();
        let log = logger(dir.path(), 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 49, "SELECT 1".into(), &scope);
        flush(log);
        let lines = read_log_lines(dir.path());
        assert!(lines.is_empty(), "duration < threshold must not emit");
    }
}