Skip to main content

reddb_server/telemetry/
slow_query_logger.rs

//! Dedicated slow-query sink — writes structured JSON lines to `red-slow.log`.
//!
//! Below-threshold calls pay only one relaxed atomic load and return
//! immediately; the logger itself allocates nothing on that path (the caller
//! has already built the `sql_redacted` string it passes in). Above-threshold
//! calls go through a deterministic counter-based sampler before writing to
//! the `NonBlocking` writer, which pushes the bytes onto a channel and
//! returns without waiting for the disk write. When a `SlowQueryStore` is
//! attached, sampled events are also pushed to that ring store.
8
9use std::io::Write;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
12use std::sync::{Arc, Mutex};
13
14use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
15
16use crate::runtime::EffectiveScope;
17
18// ---------------------------------------------------------------------------
19// Public types
20// ---------------------------------------------------------------------------
21
/// Construction options for [`SlowQueryLogger::new`].
#[derive(Debug, Clone)]
pub struct SlowQueryOpts {
    /// Directory under which the slow-query log file (`red-slow.log`) is
    /// placed; missing parent directories are created when the file is opened.
    pub log_dir: PathBuf,
    /// Queries whose duration is at least this many milliseconds are
    /// candidates for logging; faster queries are ignored.
    pub threshold_ms: u64,
    /// Percentage (0..=100) of above-threshold queries to emit; values
    /// greater than 100 are clamped to 100.
    pub sample_pct: u8,
}
28
/// Closed set of query categories emitted in the slow-query log.
///
/// Every discriminant is spelled out and equals the variant's position in
/// [`QueryKind::ALL`]; [`QueryKind::index`] and [`QueryKind::as_str`] both
/// rely on that correspondence.
// `DDL` keeps its all-caps spelling because it is part of the public API.
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryKind {
    Select = 0,
    Insert = 1,
    Update = 2,
    Delete = 3,
    Bulk = 4,
    Aggregate = 5,
    DDL = 6,
    Internal = 7,
}

impl QueryKind {
    /// Every variant, in stable order. The histogram substrate (#1241)
    /// keys one fixed cell per variant off this array, so the order
    /// here defines the bounded `kind` cardinality budget (ADR 0060 §4).
    pub const ALL: [QueryKind; 8] = [
        QueryKind::Select,
        QueryKind::Insert,
        QueryKind::Update,
        QueryKind::Delete,
        QueryKind::Bulk,
        QueryKind::Aggregate,
        QueryKind::DDL,
        QueryKind::Internal,
    ];

    /// Lower-case label written into the `kind` field of each log line.
    pub fn as_str(self) -> &'static str {
        // Indexed by discriminant, so entries follow declaration order.
        const LABELS: [&str; 8] = [
            "select",
            "insert",
            "update",
            "delete",
            "bulk",
            "aggregate",
            "ddl",
            "internal",
        ];
        LABELS[self.index()]
    }

    /// Dense position of this kind inside [`QueryKind::ALL`] — the cell
    /// offset used by the latency histogram substrate.
    pub fn index(self) -> usize {
        self as usize
    }
}
85
86// ---------------------------------------------------------------------------
87// SlowQueryLogger
88// ---------------------------------------------------------------------------
89
90pub struct SlowQueryLogger {
91    /// Interior-mutable because `Write::write_all` takes `&mut self`.
92    writer: Mutex<NonBlocking>,
93    /// Keeps the background writer thread alive for the process lifetime.
94    _guard: WorkerGuard,
95    threshold_ms: AtomicU64,
96    /// 0..100; 100 means "emit every above-threshold query".
97    sample_pct: AtomicU8,
98    /// Monotonic counter across above-threshold calls — drives round-robin
99    /// sampling so exactly `sample_pct`% of above-threshold calls emit.
100    above_count: AtomicU64,
101    /// Durable telemetry substrate (ADR 0060). When attached, above-threshold
102    /// events are dual-written: file sink (existing behavior) + ring store.
103    store: std::sync::OnceLock<std::sync::Arc<super::slow_query_store::SlowQueryStore>>,
104}
105
106impl SlowQueryLogger {
107    pub fn new(opts: SlowQueryOpts) -> Arc<Self> {
108        let path = reddb_file::layout::legacy_slow_query_log_path(&opts.log_dir);
109        Self::open_at(path, opts.threshold_ms, opts.sample_pct)
110    }
111
112    /// Resolve a [`crate::storage::layout::LogDestination`] into a concrete
113    /// slow-query sink. `File(p)` writes to that exact path; `Stderr` and
114    /// `Syslog` fall back to `<fallback_log_dir>/red-slow.log` until the
115    /// dedicated sinks are wired (ADR 0018).
116    pub fn for_destination(
117        dest: &crate::storage::layout::LogDestination,
118        fallback_log_dir: &std::path::Path,
119        threshold_ms: u64,
120        sample_pct: u8,
121    ) -> Arc<Self> {
122        use crate::storage::layout::LogDestination;
123        let path = match dest {
124            LogDestination::File(p) => p.clone(),
125            LogDestination::Stderr => {
126                reddb_file::layout::legacy_slow_query_log_path(fallback_log_dir)
127            }
128            LogDestination::Syslog => {
129                tracing::warn!(
130                    target: "reddb::slow",
131                    "slow-query LogDestination::Syslog requested; sink not implemented, falling back to file"
132                );
133                reddb_file::layout::legacy_slow_query_log_path(fallback_log_dir)
134            }
135        };
136        Self::open_at(path, threshold_ms, sample_pct)
137    }
138
139    fn open_at(path: PathBuf, threshold_ms: u64, sample_pct: u8) -> Arc<Self> {
140        if let Some(parent) = path.parent() {
141            let _ = std::fs::create_dir_all(parent);
142        }
143        let file = std::fs::OpenOptions::new()
144            .create(true)
145            .append(true)
146            .open(&path)
147            .unwrap_or_else(|e| panic!("SlowQueryLogger: cannot open {}: {e}", path.display()));
148
149        let (writer, guard) = NonBlockingBuilder::default()
150            .buffered_lines_limit(65_536)
151            .lossy(true)
152            .finish(file);
153
154        Arc::new(Self {
155            writer: Mutex::new(writer),
156            _guard: guard,
157            threshold_ms: AtomicU64::new(threshold_ms),
158            sample_pct: AtomicU8::new(sample_pct.min(100)),
159            above_count: AtomicU64::new(0),
160            store: std::sync::OnceLock::new(),
161        })
162    }
163
164    /// Attach the operational telemetry substrate store (ADR 0060).
165    ///
166    /// Called once at runtime startup. Above-threshold, sampled events will
167    /// be written to both the file sink and the ring store. A second call
168    /// is a no-op (first registration wins).
169    pub fn attach_store(&self, store: std::sync::Arc<super::slow_query_store::SlowQueryStore>) {
170        let _ = self.store.set(store);
171    }
172
173    /// Record a completed query. Below-threshold: single relaxed load, no
174    /// allocation. Above-threshold + sampled: emit a JSON line to the sink.
175    pub fn record(
176        &self,
177        kind: QueryKind,
178        duration_ms: u64,
179        sql_redacted: String,
180        scope: &EffectiveScope,
181    ) {
182        // Hot path: threshold gate — single relaxed atomic load.
183        if duration_ms < self.threshold_ms.load(Ordering::Relaxed) {
184            return;
185        }
186
187        // Sampling: deterministic round-robin counter.
188        // `above_count % 100 < sample_pct` gives exactly sample_pct% over
189        // long runs without any floating-point or RNG overhead.
190        let pct = u64::from(self.sample_pct.load(Ordering::Relaxed));
191        if pct < 100 {
192            let n = self.above_count.fetch_add(1, Ordering::Relaxed);
193            if (n % 100) >= pct {
194                return;
195            }
196        }
197
198        self.emit(kind, duration_ms, sql_redacted, scope);
199    }
200
201    fn emit(
202        &self,
203        kind: QueryKind,
204        duration_ms: u64,
205        sql_redacted: String,
206        scope: &EffectiveScope,
207    ) {
208        let ts_ms = crate::utils::now_unix_millis();
209        let tenant = scope.tenant.as_deref().unwrap_or("").to_string();
210        let identity = scope
211            .identity
212            .as_ref()
213            .map(|(u, _)| u.as_str())
214            .unwrap_or("")
215            .to_string();
216
217        // Durable substrate (ADR 0060): push to the ring store with hashed
218        // tenant/identity before writing the file line. Store is optional;
219        // missing store → file-only behavior (backward-compatible).
220        if let Some(store) = self.store.get() {
221            store.push(super::slow_query_store::SlowQueryEvent {
222                ts_ms,
223                kind: kind.as_str(),
224                duration_ms,
225                sql_redacted: sql_redacted.clone(),
226                tenant_hash: super::slow_query_store::hash_label(&tenant),
227                identity_hash: super::slow_query_store::hash_label(&identity),
228            });
229        }
230
231        let mut map = std::collections::BTreeMap::new();
232        map.insert(
233            "ts_ms".to_string(),
234            crate::json::Value::Number(ts_ms as f64),
235        );
236        map.insert(
237            "kind".to_string(),
238            crate::json::Value::String(kind.as_str().to_string()),
239        );
240        map.insert(
241            "duration_ms".to_string(),
242            crate::json::Value::Number(duration_ms as f64),
243        );
244        map.insert("sql".to_string(), crate::json::Value::String(sql_redacted));
245        map.insert("tenant".to_string(), crate::json::Value::String(tenant));
246        map.insert("identity".to_string(), crate::json::Value::String(identity));
247
248        let obj = crate::json::Value::Object(map);
249        if let Ok(mut line) = crate::json::to_string(&obj) {
250            line.push('\n');
251            if let Ok(mut w) = self.writer.lock() {
252                let _ = w.write_all(line.as_bytes());
253            }
254        }
255    }
256}
257
258// ---------------------------------------------------------------------------
259// Tests
260// ---------------------------------------------------------------------------
261
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    use super::*;
    use crate::runtime::EffectiveScope;
    use crate::storage::transaction::snapshot::Snapshot;

    /// Per-test scratch directory, removed (best-effort) when dropped so
    /// test runs don't accumulate `reddb-slow-*` directories in the temp
    /// dir — including runs where an assertion panics.
    struct TmpDir(PathBuf);

    impl TmpDir {
        fn new() -> Self {
            // The sequence number keeps paths distinct when parallel test
            // threads start within the same clock tick.
            static SEQ: AtomicU64 = AtomicU64::new(0);
            let mut d = std::env::temp_dir();
            d.push(format!(
                "reddb-slow-{}-{}-{}",
                std::process::id(),
                crate::utils::now_unix_nanos(),
                SEQ.fetch_add(1, Ordering::Relaxed)
            ));
            TmpDir(d)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TmpDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    fn logger(dir: &Path, threshold_ms: u64, sample_pct: u8) -> Arc<SlowQueryLogger> {
        SlowQueryLogger::new(SlowQueryOpts {
            log_dir: dir.to_path_buf(),
            threshold_ms,
            sample_pct,
        })
    }

    fn empty_scope() -> EffectiveScope {
        EffectiveScope {
            tenant: None,
            identity: None,
            snapshot: Snapshot {
                xid: 0,
                in_progress: HashSet::new(),
            },
            visible_collections: None,
        }
    }

    /// Drop the logger so its `WorkerGuard` flushes every queued line to
    /// disk before the test reads the file. Deterministic, unlike sleeping
    /// and hoping the background worker has caught up.
    fn flush(log: Arc<SlowQueryLogger>) {
        assert_eq!(
            Arc::strong_count(&log),
            1,
            "flush() must receive the last handle so the worker guard drops"
        );
        drop(log);
    }

    fn read_log_lines(dir: &Path) -> Vec<crate::json::Value> {
        let path = reddb_file::layout::legacy_slow_query_log_path(dir);
        let body = std::fs::read_to_string(&path).unwrap_or_default();
        body.lines()
            .filter(|l| !l.is_empty())
            .map(|l| crate::json::from_str::<crate::json::Value>(l).expect("valid JSON"))
            .collect()
    }

    // -----------------------------------------------------------------------
    // Below-threshold: zero writes, fast
    // -----------------------------------------------------------------------

    #[test]
    fn below_threshold_no_file_writes() {
        let dir = TmpDir::new();
        let log = logger(dir.path(), 1000, 100);
        let scope = empty_scope();

        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }

        flush(log);
        let lines = read_log_lines(dir.path());
        assert!(
            lines.is_empty(),
            "expected zero writes, got {}",
            lines.len()
        );
    }

    #[test]
    fn below_threshold_wall_time_under_10ms() {
        let dir = TmpDir::new();
        let log = logger(dir.path(), 1000, 100);
        let scope = empty_scope();

        let start = Instant::now();
        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed.as_millis() < 10,
            "10k below-threshold calls took {}ms (>10ms budget)",
            elapsed.as_millis()
        );
    }

    // -----------------------------------------------------------------------
    // Above-threshold: structured JSON line, parseable fields
    // -----------------------------------------------------------------------

    #[test]
    fn above_threshold_emits_json_line() {
        let dir = TmpDir::new();
        let log = logger(dir.path(), 10, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 100, "SELECT * FROM t".into(), &scope);
        flush(log);

        let lines = read_log_lines(dir.path());
        assert_eq!(lines.len(), 1, "expected 1 line");
        let v = &lines[0];
        assert_eq!(v.get("kind").and_then(|x| x.as_str()), Some("select"));
        assert_eq!(v.get("duration_ms").and_then(|x| x.as_i64()), Some(100));
        let sql = v.get("sql").and_then(|x| x.as_str());
        assert_eq!(sql, Some("SELECT * FROM t"));
    }

    #[test]
    fn json_line_has_all_required_fields() {
        let dir = TmpDir::new();
        let log = logger(dir.path(), 0, 100);
        let scope = empty_scope();

        log.record(
            QueryKind::Insert,
            42,
            "INSERT INTO t VALUES (1)".into(),
            &scope,
        );
        flush(log);

        let lines = read_log_lines(dir.path());
        assert_eq!(lines.len(), 1);
        let v = &lines[0];
        for field in ["ts_ms", "kind", "duration_ms", "sql", "tenant", "identity"] {
            assert!(v.get(field).is_some(), "missing {field}");
        }
    }

    #[test]
    fn all_query_kinds_serialise() {
        let mut seen = HashSet::new();
        for (i, k) in QueryKind::ALL.iter().copied().enumerate() {
            let label = k.as_str();
            assert!(!label.is_empty(), "{k:?}: empty label");
            assert!(seen.insert(label), "{k:?}: duplicate label {label}");
            assert_eq!(k.index(), i, "{k:?}: index() disagrees with ALL order");
        }
        assert_eq!(seen.len(), QueryKind::ALL.len());
    }

    // -----------------------------------------------------------------------
    // Sampling property test: sample_pct=10 → ~10% (±2pp) over 10_000 calls
    // -----------------------------------------------------------------------

    #[test]
    fn sampling_property_10pct() {
        let dir = TmpDir::new();
        // threshold=0 so every call is above-threshold
        let log = logger(dir.path(), 0, 10);
        let scope = empty_scope();

        let calls = 10_000u64;
        for i in 0..calls {
            log.record(QueryKind::Select, 1, format!("SELECT {i}"), &scope);
        }
        flush(log);

        let lines = read_log_lines(dir.path());
        let emitted = lines.len() as u64;
        // Expected: 1000 ± 200 (2pp of 10_000)
        assert!(
            (800..=1200).contains(&emitted),
            "sample_pct=10 over {calls} calls emitted {emitted} (expected 800..=1200)"
        );
    }

    // -----------------------------------------------------------------------
    // Adversarial SQL: CRLF / NUL / quote → escape-safe JSON
    // -----------------------------------------------------------------------

    #[test]
    fn adversarial_sql_is_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "SELECT 1\r\nDROP TABLE t--"),
            ("nul", "SELECT '\x00'"),
            ("quote", r#"SELECT "secret" FROM t"#),
            ("json_inject", r#"SELECT 1},"pwned":true,{"x":"#),
            ("low_ctrl", "SELECT \x01\x02\x07\x1f"),
            ("backslash", "SELECT 'C:\\path\\file'"),
        ];

        for (label, sql) in payloads {
            let dir = TmpDir::new();
            let log = logger(dir.path(), 0, 100);
            let scope = empty_scope();

            log.record(QueryKind::Select, 1, sql.to_string(), &scope);
            flush(log);

            let path = reddb_file::layout::legacy_slow_query_log_path(dir.path());
            let body = std::fs::read_to_string(&path).unwrap_or_default();

            // Must be a single JSONL row: an unescaped CR/LF in the SQL would
            // split the record across lines. (Checking the row itself for
            // '\n' proves nothing — `str::lines` has already stripped it.)
            let rows: Vec<&str> = body.lines().filter(|l| !l.is_empty()).collect();
            let n = rows.len();
            assert_eq!(n, 1, "{label}: expected exactly one JSONL row, got {n}: {body:?}");
            let line = rows[0];

            // Must parse as valid JSON.
            let v: crate::json::Value = crate::json::from_str(line)
                .unwrap_or_else(|e| panic!("{label}: not valid JSON: {e}\n{line:?}"));

            // SQL must round-trip correctly.
            let recovered = v.get("sql").and_then(|x| x.as_str()).unwrap_or("");
            assert_eq!(recovered, *sql, "{label}: SQL round-trip mismatch");
        }
    }

    // -----------------------------------------------------------------------
    // Exact-threshold boundary: duration == threshold emits
    // -----------------------------------------------------------------------

    #[test]
    fn at_threshold_boundary_emits() {
        let dir = TmpDir::new();
        let log = logger(dir.path(), 50, 100);
        let scope = empty_scope();

        // Exactly at threshold: duration_ms < threshold_ms is false → emits.
        log.record(QueryKind::Select, 50, "SELECT 1".into(), &scope);
        flush(log);
        let lines = read_log_lines(dir.path());
        assert_eq!(lines.len(), 1, "duration == threshold should emit");
    }

    #[test]
    fn just_below_threshold_does_not_emit() {
        let dir = TmpDir::new();
        let log = logger(dir.path(), 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 49, "SELECT 1".into(), &scope);
        flush(log);
        let lines = read_log_lines(dir.path());
        assert!(lines.is_empty(), "duration < threshold must not emit");
    }
}
517        flush(&log);
518        let lines = read_log_lines(&dir);
519        assert!(lines.is_empty(), "duration < threshold must not emit");
520    }
521}