Skip to main content

reddb_server/telemetry/
slow_query_logger.rs

//! Dedicated slow-query sink — writes structured JSON lines to `red-slow.log`
//! (or to the exact file named by a `LogDestination::File`).
//!
//! Below-threshold calls pay only one relaxed atomic load and return
//! immediately; the logger itself allocates nothing on that path (the caller
//! still builds the `String` passed as `sql_redacted`). Above-threshold calls
//! go through a deterministic counter-based sampler before being written to
//! the lossy `NonBlocking` writer, which pushes the bytes onto a channel and
//! returns without waiting for the disk write.
8
9use std::io::Write;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
12use std::sync::{Arc, Mutex};
13
14use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
15
16use crate::runtime::EffectiveScope;
17
18// ---------------------------------------------------------------------------
19// Public types
20// ---------------------------------------------------------------------------
21
/// Construction options for [`SlowQueryLogger::new`].
///
/// Derives `Debug` and `Clone` so callers can log and copy the effective
/// slow-query configuration.
#[derive(Debug, Clone)]
pub struct SlowQueryOpts {
    /// Directory in which the slow-query log file is opened; the file name
    /// itself comes from `reddb_file::layout::legacy_slow_query_log_path`.
    pub log_dir: PathBuf,
    /// Queries whose duration is strictly below this many milliseconds are
    /// ignored; a duration equal to the threshold is logged.
    pub threshold_ms: u64,
    /// Percentage of above-threshold queries to emit, 0..=100; values > 100
    /// are clamped to 100.
    pub sample_pct: u8,
}
28
/// Category of a recorded query, written as the `kind` field of every
/// slow-query JSON line. The set of variants is closed.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryKind {
    /// Serialised as `"select"`.
    Select,
    /// Serialised as `"insert"`.
    Insert,
    /// Serialised as `"update"`.
    Update,
    /// Serialised as `"delete"`.
    Delete,
    /// Serialised as `"bulk"`.
    Bulk,
    /// Serialised as `"aggregate"`.
    Aggregate,
    /// Serialised as `"ddl"`.
    DDL,
    /// Serialised as `"internal"`.
    Internal,
}

impl QueryKind {
    /// Stable lower-case label emitted in the `kind` JSON field.
    fn as_str(self) -> &'static str {
        match self {
            QueryKind::Select => "select",
            QueryKind::Insert => "insert",
            QueryKind::Update => "update",
            QueryKind::Delete => "delete",
            QueryKind::Bulk => "bulk",
            QueryKind::Aggregate => "aggregate",
            QueryKind::DDL => "ddl",
            QueryKind::Internal => "internal",
        }
    }
}
56
57// ---------------------------------------------------------------------------
58// SlowQueryLogger
59// ---------------------------------------------------------------------------
60
/// Slow-query sink: drops queries faster than the threshold, samples the
/// rest, and hands each emitted JSON line to a non-blocking writer.
pub struct SlowQueryLogger {
    /// Interior-mutable because `Write::write_all` takes `&mut self`.
    writer: Mutex<NonBlocking>,
    /// Keeps the background writer thread alive for as long as this logger
    /// lives. Declared after `writer`, so it is dropped after it.
    _guard: WorkerGuard,
    /// Minimum duration in ms; compared with `<`, so a query whose duration
    /// equals the threshold passes the gate.
    threshold_ms: AtomicU64,
    /// 0..=100 (clamped at construction); 100 means "emit every
    /// above-threshold query".
    sample_pct: AtomicU8,
    /// Counter of above-threshold calls seen while sampling (only advanced
    /// when `sample_pct < 100`); drives round-robin sampling so exactly
    /// `sample_pct`% of each window of 100 such calls emit.
    above_count: AtomicU64,
}
73
74impl SlowQueryLogger {
75    pub fn new(opts: SlowQueryOpts) -> Arc<Self> {
76        let path = reddb_file::layout::legacy_slow_query_log_path(&opts.log_dir);
77        Self::open_at(path, opts.threshold_ms, opts.sample_pct)
78    }
79
80    /// Resolve a [`crate::storage::layout::LogDestination`] into a concrete
81    /// slow-query sink. `File(p)` writes to that exact path; `Stderr` and
82    /// `Syslog` fall back to `<fallback_log_dir>/red-slow.log` until the
83    /// dedicated sinks are wired (ADR 0018).
84    pub fn for_destination(
85        dest: &crate::storage::layout::LogDestination,
86        fallback_log_dir: &std::path::Path,
87        threshold_ms: u64,
88        sample_pct: u8,
89    ) -> Arc<Self> {
90        use crate::storage::layout::LogDestination;
91        let path = match dest {
92            LogDestination::File(p) => p.clone(),
93            LogDestination::Stderr => {
94                reddb_file::layout::legacy_slow_query_log_path(fallback_log_dir)
95            }
96            LogDestination::Syslog => {
97                tracing::warn!(
98                    target: "reddb::slow",
99                    "slow-query LogDestination::Syslog requested; sink not implemented, falling back to file"
100                );
101                reddb_file::layout::legacy_slow_query_log_path(fallback_log_dir)
102            }
103        };
104        Self::open_at(path, threshold_ms, sample_pct)
105    }
106
107    fn open_at(path: PathBuf, threshold_ms: u64, sample_pct: u8) -> Arc<Self> {
108        if let Some(parent) = path.parent() {
109            let _ = std::fs::create_dir_all(parent);
110        }
111        let file = std::fs::OpenOptions::new()
112            .create(true)
113            .append(true)
114            .open(&path)
115            .unwrap_or_else(|e| panic!("SlowQueryLogger: cannot open {}: {e}", path.display()));
116
117        let (writer, guard) = NonBlockingBuilder::default()
118            .buffered_lines_limit(65_536)
119            .lossy(true)
120            .finish(file);
121
122        Arc::new(Self {
123            writer: Mutex::new(writer),
124            _guard: guard,
125            threshold_ms: AtomicU64::new(threshold_ms),
126            sample_pct: AtomicU8::new(sample_pct.min(100)),
127            above_count: AtomicU64::new(0),
128        })
129    }
130
131    /// Record a completed query. Below-threshold: single relaxed load, no
132    /// allocation. Above-threshold + sampled: emit a JSON line to the sink.
133    pub fn record(
134        &self,
135        kind: QueryKind,
136        duration_ms: u64,
137        sql_redacted: String,
138        scope: &EffectiveScope,
139    ) {
140        // Hot path: threshold gate — single relaxed atomic load.
141        if duration_ms < self.threshold_ms.load(Ordering::Relaxed) {
142            return;
143        }
144
145        // Sampling: deterministic round-robin counter.
146        // `above_count % 100 < sample_pct` gives exactly sample_pct% over
147        // long runs without any floating-point or RNG overhead.
148        let pct = u64::from(self.sample_pct.load(Ordering::Relaxed));
149        if pct < 100 {
150            let n = self.above_count.fetch_add(1, Ordering::Relaxed);
151            if (n % 100) >= pct {
152                return;
153            }
154        }
155
156        self.emit(kind, duration_ms, sql_redacted, scope);
157    }
158
159    fn emit(
160        &self,
161        kind: QueryKind,
162        duration_ms: u64,
163        sql_redacted: String,
164        scope: &EffectiveScope,
165    ) {
166        let ts_ms = crate::utils::now_unix_millis();
167        let tenant = scope.tenant.as_deref().unwrap_or("").to_string();
168        let identity = scope
169            .identity
170            .as_ref()
171            .map(|(u, _)| u.as_str())
172            .unwrap_or("")
173            .to_string();
174
175        let mut map = std::collections::BTreeMap::new();
176        map.insert(
177            "ts_ms".to_string(),
178            crate::json::Value::Number(ts_ms as f64),
179        );
180        map.insert(
181            "kind".to_string(),
182            crate::json::Value::String(kind.as_str().to_string()),
183        );
184        map.insert(
185            "duration_ms".to_string(),
186            crate::json::Value::Number(duration_ms as f64),
187        );
188        map.insert("sql".to_string(), crate::json::Value::String(sql_redacted));
189        map.insert("tenant".to_string(), crate::json::Value::String(tenant));
190        map.insert("identity".to_string(), crate::json::Value::String(identity));
191
192        let obj = crate::json::Value::Object(map);
193        if let Ok(mut line) = crate::json::to_string(&obj) {
194            line.push('\n');
195            if let Ok(mut w) = self.writer.lock() {
196                let _ = w.write_all(line.as_bytes());
197            }
198        }
199    }
200}
201
202// ---------------------------------------------------------------------------
203// Tests
204// ---------------------------------------------------------------------------
205
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::time::Instant;

    use super::*;
    use crate::runtime::EffectiveScope;
    use crate::storage::transaction::snapshot::Snapshot;

    /// Scratch directory named from the pid and a nanosecond timestamp.
    fn tmp_dir() -> PathBuf {
        let mut d = std::env::temp_dir();
        d.push(format!(
            "reddb-slow-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        d
    }

    fn logger(
        dir: &std::path::Path,
        threshold_ms: u64,
        sample_pct: u8,
    ) -> Arc<SlowQueryLogger> {
        SlowQueryLogger::new(SlowQueryOpts {
            log_dir: dir.to_path_buf(),
            threshold_ms,
            sample_pct,
        })
    }

    fn empty_scope() -> EffectiveScope {
        EffectiveScope {
            tenant: None,
            identity: None,
            snapshot: Snapshot {
                xid: 0,
                in_progress: HashSet::new(),
            },
            visible_collections: None,
        }
    }

    /// Drop the logger and wait for its background worker to drain.
    ///
    /// Dropping the last `Arc` drops the `WorkerGuard`, which shuts the
    /// worker down only after every queued line has been written. Unlike a
    /// fixed sleep, this cannot lose the race on a loaded CI machine.
    fn flush(log: Arc<SlowQueryLogger>) {
        assert_eq!(
            Arc::strong_count(&log),
            1,
            "flush needs the only handle so that dropping it drains the worker"
        );
        drop(log);
    }

    fn read_log_lines(dir: &std::path::Path) -> Vec<crate::json::Value> {
        let path = reddb_file::layout::legacy_slow_query_log_path(dir);
        let body = std::fs::read_to_string(&path).unwrap_or_default();
        body.lines()
            .filter(|l| !l.is_empty())
            .map(|l| crate::json::from_str::<crate::json::Value>(l).expect("valid JSON"))
            .collect()
    }

    // -----------------------------------------------------------------------
    // Below-threshold: zero writes, fast
    // -----------------------------------------------------------------------

    #[test]
    fn below_threshold_no_file_writes() {
        let dir = tmp_dir();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();

        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }

        flush(log);
        let lines = read_log_lines(&dir);
        assert!(
            lines.is_empty(),
            "expected zero writes, got {}",
            lines.len()
        );
    }

    #[test]
    fn below_threshold_wall_time_under_10ms() {
        let dir = tmp_dir();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();

        let start = Instant::now();
        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed.as_millis() < 10,
            "10k below-threshold calls took {}ms (>10ms budget)",
            elapsed.as_millis()
        );
    }

    // -----------------------------------------------------------------------
    // Above-threshold: structured JSON line, parseable fields
    // -----------------------------------------------------------------------

    #[test]
    fn above_threshold_emits_json_line() {
        let dir = tmp_dir();
        let log = logger(&dir, 10, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 100, "SELECT * FROM t".into(), &scope);
        flush(log);

        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "expected 1 line");
        let v = &lines[0];
        assert_eq!(v.get("kind").and_then(|x| x.as_str()), Some("select"));
        assert_eq!(v.get("duration_ms").and_then(|x| x.as_i64()), Some(100));
        let sql = v.get("sql").and_then(|x| x.as_str());
        assert_eq!(sql, Some("SELECT * FROM t"));
    }

    #[test]
    fn json_line_has_all_required_fields() {
        let dir = tmp_dir();
        let log = logger(&dir, 0, 100);
        let scope = empty_scope();

        log.record(
            QueryKind::Insert,
            42,
            "INSERT INTO t VALUES (1)".into(),
            &scope,
        );
        flush(log);

        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1);
        let v = &lines[0];
        assert!(v.get("ts_ms").is_some(), "missing ts_ms");
        assert!(v.get("kind").is_some(), "missing kind");
        assert!(v.get("duration_ms").is_some(), "missing duration_ms");
        assert!(v.get("sql").is_some(), "missing sql");
        assert!(v.get("tenant").is_some(), "missing tenant");
        assert!(v.get("identity").is_some(), "missing identity");
    }

    #[test]
    fn all_query_kinds_serialise() {
        let kinds = [
            QueryKind::Select,
            QueryKind::Insert,
            QueryKind::Update,
            QueryKind::Delete,
            QueryKind::Bulk,
            QueryKind::Aggregate,
            QueryKind::DDL,
            QueryKind::Internal,
        ];
        for k in kinds {
            assert!(!k.as_str().is_empty());
        }
    }

    // -----------------------------------------------------------------------
    // Sampling: sample_pct=10 → exactly 10% over 10_000 calls
    // -----------------------------------------------------------------------

    #[test]
    fn sampling_property_10pct() {
        let dir = tmp_dir();
        // threshold=0 so every call is above-threshold
        let log = logger(&dir, 0, 10);
        let scope = empty_scope();

        let calls = 10_000u64;
        for i in 0..calls {
            log.record(QueryKind::Select, 1, format!("SELECT {i}"), &scope);
        }
        flush(log);

        let lines = read_log_lines(&dir);
        let emitted = lines.len() as u64;
        // The sampler is a deterministic round-robin over windows of 100
        // calls, and 10_000 is a whole number of windows, so exactly 10% must
        // emit. An exact check also catches off-by-one regressions in the
        // `n % 100 >= pct` gate (11/100 would still fit a ±2pp band).
        assert_eq!(
            emitted,
            calls / 10,
            "sample_pct=10 over {calls} calls emitted {emitted} (expected exactly {})",
            calls / 10
        );
    }

    // -----------------------------------------------------------------------
    // Adversarial SQL: CRLF / NUL / quote → escape-safe JSON
    // -----------------------------------------------------------------------

    #[test]
    fn adversarial_sql_is_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "SELECT 1\r\nDROP TABLE t--"),
            ("nul", "SELECT '\x00'"),
            ("quote", r#"SELECT "secret" FROM t"#),
            ("json_inject", r#"SELECT 1},"pwned":true,{"x":"#),
            ("low_ctrl", "SELECT \x01\x02\x07\x1f"),
            ("backslash", "SELECT 'C:\\path\\file'"),
        ];

        for (label, sql) in payloads {
            let dir = tmp_dir();
            let log = logger(&dir, 0, 100);
            let scope = empty_scope();

            log.record(QueryKind::Select, 1, sql.to_string(), &scope);
            flush(log);

            let path = reddb_file::layout::legacy_slow_query_log_path(&dir);
            let body = std::fs::read_to_string(&path).unwrap_or_default();
            let rows: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();

            // Exactly one JSONL row. An unescaped LF in the payload would
            // split the record across rows. (Checking a single row for '\n'
            // would be vacuous, because `lines()` already split on it.)
            assert_eq!(
                rows.len(),
                1,
                "{label}: expected exactly one JSONL row, got {rows:?}"
            );
            let line = rows[0];

            // A raw CR is also a row breaker for many log consumers.
            assert!(!line.contains('\r'), "{label}: raw CR in JSONL row");

            // Must parse as valid JSON.
            let v: crate::json::Value = crate::json::from_str(line)
                .unwrap_or_else(|e| panic!("{label}: not valid JSON: {e}\n{line:?}"));

            // SQL must round-trip correctly.
            let recovered = v.get("sql").and_then(|x| x.as_str()).unwrap_or("");
            assert_eq!(recovered, *sql, "{label}: SQL round-trip mismatch");
        }
    }

    // -----------------------------------------------------------------------
    // Exact-threshold boundary: duration == threshold emits
    // -----------------------------------------------------------------------

    #[test]
    fn at_threshold_boundary_emits() {
        let dir = tmp_dir();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();

        // Exactly at threshold: duration_ms < threshold_ms is false → emits.
        log.record(QueryKind::Select, 50, "SELECT 1".into(), &scope);
        flush(log);
        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "duration == threshold should emit");
    }

    #[test]
    fn just_below_threshold_does_not_emit() {
        let dir = tmp_dir();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 49, "SELECT 1".into(), &scope);
        flush(log);
        let lines = read_log_lines(&dir);
        assert!(lines.is_empty(), "duration < threshold must not emit");
    }
}