1use std::io::Write;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
12use std::sync::{Arc, Mutex};
13
14use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
15
16use crate::runtime::EffectiveScope;
17
/// Construction options for a [`SlowQueryLogger`].
///
/// Derives `Debug` and `Clone` so the options can be logged at start-up and
/// reused to build more than one logger.
#[derive(Debug, Clone)]
pub struct SlowQueryOpts {
    /// Directory from which the legacy slow-query log file path is derived.
    pub log_dir: PathBuf,
    /// Threshold in milliseconds; queries that finish faster are not recorded.
    pub threshold_ms: u64,
    /// Percentage of above-threshold queries to emit. Values above 100 are
    /// clamped to 100 when the logger is constructed.
    pub sample_pct: u8,
}
28
/// Coarse category of a statement, written to the slow-query log as the
/// `kind` field. The exact strings are produced by `QueryKind::as_str`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryKind {
    /// Logged as `"select"`.
    Select,
    /// Logged as `"insert"`.
    Insert,
    /// Logged as `"update"`.
    Update,
    /// Logged as `"delete"`.
    Delete,
    /// Logged as `"bulk"`.
    Bulk,
    /// Logged as `"aggregate"`.
    Aggregate,
    /// Logged as `"ddl"`.
    DDL,
    /// Logged as `"internal"`.
    Internal,
}
41
42impl QueryKind {
43 fn as_str(self) -> &'static str {
44 match self {
45 Self::Select => "select",
46 Self::Insert => "insert",
47 Self::Update => "update",
48 Self::Delete => "delete",
49 Self::Bulk => "bulk",
50 Self::Aggregate => "aggregate",
51 Self::DDL => "ddl",
52 Self::Internal => "internal",
53 }
54 }
55}
56
/// Slow-query log sink: owns the non-blocking file writer together with the
/// threshold and sampling settings consulted by `record`.
pub struct SlowQueryLogger {
    /// Non-blocking writer for finished log lines; `emit` takes this lock for
    /// each line it writes.
    writer: Mutex<NonBlocking>,
    /// Guard returned alongside the writer by `NonBlockingBuilder::finish`,
    /// stored so that it lives exactly as long as the logger.
    /// NOTE(review): per the `tracing-appender` docs, dropping the guard
    /// flushes and stops the background worker — confirm for the pinned version.
    _guard: WorkerGuard,
    /// Minimum duration in milliseconds; `record` ignores anything shorter.
    threshold_ms: AtomicU64,
    /// Sampling percentage, clamped to at most 100 at construction.
    sample_pct: AtomicU8,
    /// Counter of above-threshold calls seen while the sampling percentage is
    /// below 100; `record` uses it modulo 100 to decide which calls to emit.
    above_count: AtomicU64,
}
73
74impl SlowQueryLogger {
75 pub fn new(opts: SlowQueryOpts) -> Arc<Self> {
76 let path = reddb_file::layout::legacy_slow_query_log_path(&opts.log_dir);
77 Self::open_at(path, opts.threshold_ms, opts.sample_pct)
78 }
79
80 pub fn for_destination(
85 dest: &crate::storage::layout::LogDestination,
86 fallback_log_dir: &std::path::Path,
87 threshold_ms: u64,
88 sample_pct: u8,
89 ) -> Arc<Self> {
90 use crate::storage::layout::LogDestination;
91 let path = match dest {
92 LogDestination::File(p) => p.clone(),
93 LogDestination::Stderr => {
94 reddb_file::layout::legacy_slow_query_log_path(fallback_log_dir)
95 }
96 LogDestination::Syslog => {
97 tracing::warn!(
98 target: "reddb::slow",
99 "slow-query LogDestination::Syslog requested; sink not implemented, falling back to file"
100 );
101 reddb_file::layout::legacy_slow_query_log_path(fallback_log_dir)
102 }
103 };
104 Self::open_at(path, threshold_ms, sample_pct)
105 }
106
107 fn open_at(path: PathBuf, threshold_ms: u64, sample_pct: u8) -> Arc<Self> {
108 if let Some(parent) = path.parent() {
109 let _ = std::fs::create_dir_all(parent);
110 }
111 let file = std::fs::OpenOptions::new()
112 .create(true)
113 .append(true)
114 .open(&path)
115 .unwrap_or_else(|e| panic!("SlowQueryLogger: cannot open {}: {e}", path.display()));
116
117 let (writer, guard) = NonBlockingBuilder::default()
118 .buffered_lines_limit(65_536)
119 .lossy(true)
120 .finish(file);
121
122 Arc::new(Self {
123 writer: Mutex::new(writer),
124 _guard: guard,
125 threshold_ms: AtomicU64::new(threshold_ms),
126 sample_pct: AtomicU8::new(sample_pct.min(100)),
127 above_count: AtomicU64::new(0),
128 })
129 }
130
131 pub fn record(
134 &self,
135 kind: QueryKind,
136 duration_ms: u64,
137 sql_redacted: String,
138 scope: &EffectiveScope,
139 ) {
140 if duration_ms < self.threshold_ms.load(Ordering::Relaxed) {
142 return;
143 }
144
145 let pct = u64::from(self.sample_pct.load(Ordering::Relaxed));
149 if pct < 100 {
150 let n = self.above_count.fetch_add(1, Ordering::Relaxed);
151 if (n % 100) >= pct {
152 return;
153 }
154 }
155
156 self.emit(kind, duration_ms, sql_redacted, scope);
157 }
158
159 fn emit(
160 &self,
161 kind: QueryKind,
162 duration_ms: u64,
163 sql_redacted: String,
164 scope: &EffectiveScope,
165 ) {
166 let ts_ms = crate::utils::now_unix_millis();
167 let tenant = scope.tenant.as_deref().unwrap_or("").to_string();
168 let identity = scope
169 .identity
170 .as_ref()
171 .map(|(u, _)| u.as_str())
172 .unwrap_or("")
173 .to_string();
174
175 let mut map = std::collections::BTreeMap::new();
176 map.insert(
177 "ts_ms".to_string(),
178 crate::json::Value::Number(ts_ms as f64),
179 );
180 map.insert(
181 "kind".to_string(),
182 crate::json::Value::String(kind.as_str().to_string()),
183 );
184 map.insert(
185 "duration_ms".to_string(),
186 crate::json::Value::Number(duration_ms as f64),
187 );
188 map.insert("sql".to_string(), crate::json::Value::String(sql_redacted));
189 map.insert("tenant".to_string(), crate::json::Value::String(tenant));
190 map.insert("identity".to_string(), crate::json::Value::String(identity));
191
192 let obj = crate::json::Value::Object(map);
193 if let Ok(mut line) = crate::json::to_string(&obj) {
194 line.push('\n');
195 if let Ok(mut w) = self.writer.lock() {
196 let _ = w.write_all(line.as_bytes());
197 }
198 }
199 }
200}
201
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::path::Path;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::Instant;

    use super::*;
    use crate::runtime::EffectiveScope;
    use crate::storage::transaction::snapshot::Snapshot;

    /// Returns a fresh, not-yet-created directory path under the system temp
    /// dir.
    ///
    /// The name combines the process id, the current time in nanoseconds and a
    /// process-wide sequence number. The sequence number guarantees that tests
    /// running on parallel threads never share a log file, even on platforms
    /// whose clock is too coarse to tell two calls apart.
    fn tmp_dir() -> PathBuf {
        static NEXT_ID: AtomicU64 = AtomicU64::new(0);
        let seq = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "reddb-slow-{}-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos(),
            seq
        ))
    }

    /// Builds a logger writing to the legacy log file under `dir`.
    fn logger(dir: &Path, threshold_ms: u64, sample_pct: u8) -> Arc<SlowQueryLogger> {
        SlowQueryLogger::new(SlowQueryOpts {
            log_dir: dir.to_path_buf(),
            threshold_ms,
            sample_pct,
        })
    }

    /// Scope with no tenant, no identity and an empty snapshot.
    fn empty_scope() -> EffectiveScope {
        EffectiveScope {
            tenant: None,
            identity: None,
            snapshot: Snapshot {
                xid: 0,
                in_progress: HashSet::new(),
            },
            visible_collections: None,
        }
    }

    /// Gives the background writer time to drain its queue.
    ///
    /// The logger argument is unused; the wait is a fixed 50 ms sleep.
    fn flush(_log: &Arc<SlowQueryLogger>) {
        std::thread::sleep(std::time::Duration::from_millis(50));
    }

    /// Reads the legacy log file under `dir` and parses every non-empty line
    /// as JSON. A missing or unreadable file yields an empty vector.
    fn read_log_lines(dir: &Path) -> Vec<crate::json::Value> {
        let path = reddb_file::layout::legacy_slow_query_log_path(dir);
        let body = std::fs::read_to_string(&path).unwrap_or_default();
        body.lines()
            .filter(|l| !l.is_empty())
            .map(|l| crate::json::from_str::<crate::json::Value>(l).expect("valid JSON"))
            .collect()
    }

    /// Below-threshold calls must never reach the log file.
    #[test]
    fn below_threshold_no_file_writes() {
        let dir = tmp_dir();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();

        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }

        flush(&log);
        let lines = read_log_lines(&dir);
        assert!(
            lines.is_empty(),
            "expected zero writes, got {}",
            lines.len()
        );
    }

    /// The below-threshold fast path must stay cheap: 10k calls in under 10 ms.
    #[test]
    fn below_threshold_wall_time_under_10ms() {
        let dir = tmp_dir();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();

        let start = Instant::now();
        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed.as_millis() < 10,
            "10k below-threshold calls took {}ms (>10ms budget)",
            elapsed.as_millis()
        );
    }

    /// One above-threshold call produces exactly one JSON line with the
    /// expected `kind`, `duration_ms` and `sql`.
    #[test]
    fn above_threshold_emits_json_line() {
        let dir = tmp_dir();
        let log = logger(&dir, 10, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 100, "SELECT * FROM t".into(), &scope);
        flush(&log);

        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "expected 1 line");
        let v = &lines[0];
        assert_eq!(v.get("kind").and_then(|x| x.as_str()), Some("select"));
        assert_eq!(v.get("duration_ms").and_then(|x| x.as_i64()), Some(100));
        let sql = v.get("sql").and_then(|x| x.as_str());
        assert_eq!(sql, Some("SELECT * FROM t"));
    }

    /// Every emitted line carries all six documented fields.
    #[test]
    fn json_line_has_all_required_fields() {
        let dir = tmp_dir();
        let log = logger(&dir, 0, 100);
        let scope = empty_scope();

        log.record(
            QueryKind::Insert,
            42,
            "INSERT INTO t VALUES (1)".into(),
            &scope,
        );
        flush(&log);

        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1);
        let v = &lines[0];
        assert!(v.get("ts_ms").is_some(), "missing ts_ms");
        assert!(v.get("kind").is_some(), "missing kind");
        assert!(v.get("duration_ms").is_some(), "missing duration_ms");
        assert!(v.get("sql").is_some(), "missing sql");
        assert!(v.get("tenant").is_some(), "missing tenant");
        assert!(v.get("identity").is_some(), "missing identity");
    }

    /// Every variant maps to a non-empty label.
    #[test]
    fn all_query_kinds_serialise() {
        let kinds = [
            QueryKind::Select,
            QueryKind::Insert,
            QueryKind::Update,
            QueryKind::Delete,
            QueryKind::Bulk,
            QueryKind::Aggregate,
            QueryKind::DDL,
            QueryKind::Internal,
        ];
        for k in kinds {
            assert!(!k.as_str().is_empty());
        }
    }

    /// With `sample_pct = 10`, roughly one in ten above-threshold calls is
    /// emitted; the tolerance band is 800..=1200 out of 10,000.
    #[test]
    fn sampling_property_10pct() {
        let dir = tmp_dir();
        let log = logger(&dir, 0, 10);
        let scope = empty_scope();

        let calls = 10_000u64;
        for i in 0..calls {
            log.record(QueryKind::Select, 1, format!("SELECT {i}"), &scope);
        }
        flush(&log);

        let lines = read_log_lines(&dir);
        let emitted = lines.len() as u64;
        assert!(
            (800..=1200).contains(&emitted),
            "sample_pct=10 over {calls} calls emitted {emitted} (expected 800..=1200)"
        );
    }

    /// Hostile SQL text must stay on a single JSONL row, parse as valid JSON
    /// and round-trip unchanged through the `sql` field.
    #[test]
    fn adversarial_sql_is_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "SELECT 1\r\nDROP TABLE t--"),
            ("nul", "SELECT '\x00'"),
            ("quote", r#"SELECT "secret" FROM t"#),
            ("json_inject", r#"SELECT 1},"pwned":true,{"x":"#),
            ("low_ctrl", "SELECT \x01\x02\x07\x1f"),
            ("backslash", "SELECT 'C:\\path\\file'"),
        ];

        for (label, sql) in payloads {
            // A separate directory per payload keeps each log file to one row.
            let dir = tmp_dir();
            let log = logger(&dir, 0, 100);
            let scope = empty_scope();

            log.record(QueryKind::Select, 1, sql.to_string(), &scope);
            flush(&log);

            let path = reddb_file::layout::legacy_slow_query_log_path(&dir);
            let body = std::fs::read_to_string(&path).unwrap_or_default();
            let line = body
                .lines()
                .find(|l| !l.is_empty())
                .unwrap_or_else(|| panic!("{label}: no line emitted"));

            assert!(
                !line.contains('\n'),
                "{label}: embedded newline in JSONL row"
            );

            let v: crate::json::Value = crate::json::from_str(line)
                .unwrap_or_else(|e| panic!("{label}: not valid JSON: {e}\n{line:?}"));

            let recovered = v.get("sql").and_then(|x| x.as_str()).unwrap_or("");
            assert_eq!(recovered, *sql, "{label}: SQL round-trip mismatch");
        }
    }

    /// The threshold is inclusive: `duration == threshold` is logged.
    #[test]
    fn at_threshold_boundary_emits() {
        let dir = tmp_dir();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 50, "SELECT 1".into(), &scope);
        flush(&log);
        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "duration == threshold should emit");
    }

    /// One millisecond under the threshold is not logged.
    #[test]
    fn just_below_threshold_does_not_emit() {
        let dir = tmp_dir();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 49, "SELECT 1".into(), &scope);
        flush(&log);
        let lines = read_log_lines(&dir);
        assert!(lines.is_empty(), "duration < threshold must not emit");
    }
}