1use std::io::Write;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
12use std::sync::{Arc, Mutex};
13
14use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
15
16use crate::runtime::EffectiveScope;
17
/// Options consumed by [`SlowQueryLogger::new`].
#[derive(Debug, Clone)]
pub struct SlowQueryOpts {
    /// Directory from which the slow-query log file path is derived.
    pub log_dir: PathBuf,
    /// Slow-query threshold in milliseconds. A query is recorded when its
    /// duration is greater than or equal to this value.
    pub threshold_ms: u64,
    /// Percentage of above-threshold queries that are written. Values above
    /// 100 are clamped to 100 when the logger is opened.
    pub sample_pct: u8,
}
28
/// Category of statement attached to every slow-query record.
///
/// The discriminants are written out explicitly so that
/// [`QueryKind::index`] can be read directly off the variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryKind {
    Select = 0,
    Insert = 1,
    Update = 2,
    Delete = 3,
    Bulk = 4,
    Aggregate = 5,
    DDL = 6,
    Internal = 7,
}

impl QueryKind {
    /// Every variant, ordered so that `ALL[k.index()] == k`.
    pub const ALL: [QueryKind; 8] = [
        QueryKind::Select,
        QueryKind::Insert,
        QueryKind::Update,
        QueryKind::Delete,
        QueryKind::Bulk,
        QueryKind::Aggregate,
        QueryKind::DDL,
        QueryKind::Internal,
    ];

    /// Returns the lowercase label used for this kind (for example
    /// `"select"` or `"ddl"`).
    pub fn as_str(self) -> &'static str {
        match self {
            QueryKind::Select => "select",
            QueryKind::Insert => "insert",
            QueryKind::Update => "update",
            QueryKind::Delete => "delete",
            QueryKind::Bulk => "bulk",
            QueryKind::Aggregate => "aggregate",
            QueryKind::DDL => "ddl",
            QueryKind::Internal => "internal",
        }
    }

    /// Returns the zero-based position of this kind, in the range `0..8`.
    pub fn index(self) -> usize {
        // The enum is field-less with explicit discriminants 0..=7, so the
        // cast yields exactly the declared position.
        self as usize
    }
}
85
86pub struct SlowQueryLogger {
91 writer: Mutex<NonBlocking>,
93 _guard: WorkerGuard,
95 threshold_ms: AtomicU64,
96 sample_pct: AtomicU8,
98 above_count: AtomicU64,
101 store: std::sync::OnceLock<std::sync::Arc<super::slow_query_store::SlowQueryStore>>,
104}
105
106impl SlowQueryLogger {
107 pub fn new(opts: SlowQueryOpts) -> Arc<Self> {
108 let path = reddb_file::layout::legacy_slow_query_log_path(&opts.log_dir);
109 Self::open_at(path, opts.threshold_ms, opts.sample_pct)
110 }
111
112 pub fn for_destination(
117 dest: &crate::storage::layout::LogDestination,
118 fallback_log_dir: &std::path::Path,
119 threshold_ms: u64,
120 sample_pct: u8,
121 ) -> Arc<Self> {
122 use crate::storage::layout::LogDestination;
123 let path = match dest {
124 LogDestination::File(p) => p.clone(),
125 LogDestination::Stderr => {
126 reddb_file::layout::legacy_slow_query_log_path(fallback_log_dir)
127 }
128 LogDestination::Syslog => {
129 tracing::warn!(
130 target: "reddb::slow",
131 "slow-query LogDestination::Syslog requested; sink not implemented, falling back to file"
132 );
133 reddb_file::layout::legacy_slow_query_log_path(fallback_log_dir)
134 }
135 };
136 Self::open_at(path, threshold_ms, sample_pct)
137 }
138
139 fn open_at(path: PathBuf, threshold_ms: u64, sample_pct: u8) -> Arc<Self> {
140 if let Some(parent) = path.parent() {
141 let _ = std::fs::create_dir_all(parent);
142 }
143 let file = std::fs::OpenOptions::new()
144 .create(true)
145 .append(true)
146 .open(&path)
147 .unwrap_or_else(|e| panic!("SlowQueryLogger: cannot open {}: {e}", path.display()));
148
149 let (writer, guard) = NonBlockingBuilder::default()
150 .buffered_lines_limit(65_536)
151 .lossy(true)
152 .finish(file);
153
154 Arc::new(Self {
155 writer: Mutex::new(writer),
156 _guard: guard,
157 threshold_ms: AtomicU64::new(threshold_ms),
158 sample_pct: AtomicU8::new(sample_pct.min(100)),
159 above_count: AtomicU64::new(0),
160 store: std::sync::OnceLock::new(),
161 })
162 }
163
164 pub fn attach_store(&self, store: std::sync::Arc<super::slow_query_store::SlowQueryStore>) {
170 let _ = self.store.set(store);
171 }
172
173 pub fn record(
176 &self,
177 kind: QueryKind,
178 duration_ms: u64,
179 sql_redacted: String,
180 scope: &EffectiveScope,
181 ) {
182 if duration_ms < self.threshold_ms.load(Ordering::Relaxed) {
184 return;
185 }
186
187 let pct = u64::from(self.sample_pct.load(Ordering::Relaxed));
191 if pct < 100 {
192 let n = self.above_count.fetch_add(1, Ordering::Relaxed);
193 if (n % 100) >= pct {
194 return;
195 }
196 }
197
198 self.emit(kind, duration_ms, sql_redacted, scope);
199 }
200
201 fn emit(
202 &self,
203 kind: QueryKind,
204 duration_ms: u64,
205 sql_redacted: String,
206 scope: &EffectiveScope,
207 ) {
208 let ts_ms = crate::utils::now_unix_millis();
209 let tenant = scope.tenant.as_deref().unwrap_or("").to_string();
210 let identity = scope
211 .identity
212 .as_ref()
213 .map(|(u, _)| u.as_str())
214 .unwrap_or("")
215 .to_string();
216
217 if let Some(store) = self.store.get() {
221 store.push(super::slow_query_store::SlowQueryEvent {
222 ts_ms,
223 kind: kind.as_str(),
224 duration_ms,
225 sql_redacted: sql_redacted.clone(),
226 tenant_hash: super::slow_query_store::hash_label(&tenant),
227 identity_hash: super::slow_query_store::hash_label(&identity),
228 });
229 }
230
231 let mut map = std::collections::BTreeMap::new();
232 map.insert(
233 "ts_ms".to_string(),
234 crate::json::Value::Number(ts_ms as f64),
235 );
236 map.insert(
237 "kind".to_string(),
238 crate::json::Value::String(kind.as_str().to_string()),
239 );
240 map.insert(
241 "duration_ms".to_string(),
242 crate::json::Value::Number(duration_ms as f64),
243 );
244 map.insert("sql".to_string(), crate::json::Value::String(sql_redacted));
245 map.insert("tenant".to_string(), crate::json::Value::String(tenant));
246 map.insert("identity".to_string(), crate::json::Value::String(identity));
247
248 let obj = crate::json::Value::Object(map);
249 if let Ok(mut line) = crate::json::to_string(&obj) {
250 line.push('\n');
251 if let Ok(mut w) = self.writer.lock() {
252 let _ = w.write_all(line.as_bytes());
253 }
254 }
255 }
256}
257
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::path::Path;
    use std::time::Instant;

    use super::*;
    use crate::runtime::EffectiveScope;
    use crate::storage::transaction::snapshot::Snapshot;

    /// Returns a per-call scratch directory path under the system temp dir.
    /// The directory itself is created later, when the logger is opened.
    fn tmp_dir() -> PathBuf {
        let mut d = std::env::temp_dir();
        d.push(format!(
            "reddb-slow-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        d
    }

    /// Builds a logger writing under `dir` with the given threshold and
    /// sample percentage.
    fn logger(dir: &Path, threshold_ms: u64, sample_pct: u8) -> Arc<SlowQueryLogger> {
        SlowQueryLogger::new(SlowQueryOpts {
            log_dir: dir.to_path_buf(),
            threshold_ms,
            sample_pct,
        })
    }

    /// Scope with no tenant, no identity and an empty snapshot.
    fn empty_scope() -> EffectiveScope {
        EffectiveScope {
            tenant: None,
            identity: None,
            snapshot: Snapshot {
                xid: 0,
                in_progress: HashSet::new(),
            },
            visible_collections: None,
        }
    }

    /// Gives the background writer time to drain. This only sleeps; it does
    /// not force a flush.
    fn flush(_log: &Arc<SlowQueryLogger>) {
        std::thread::sleep(std::time::Duration::from_millis(50));
    }

    /// Reads the log file under `dir` and parses every non-empty line as
    /// JSON. A missing or unreadable file yields an empty vector.
    fn read_log_lines(dir: &Path) -> Vec<crate::json::Value> {
        let path = reddb_file::layout::legacy_slow_query_log_path(dir);
        let body = std::fs::read_to_string(&path).unwrap_or_default();
        body.lines()
            .filter(|l| !l.is_empty())
            .map(|l| crate::json::from_str::<crate::json::Value>(l).expect("valid JSON"))
            .collect()
    }

    /// Queries faster than the threshold must never reach the log file.
    #[test]
    fn below_threshold_no_file_writes() {
        let dir = tmp_dir();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();

        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }

        flush(&log);
        let lines = read_log_lines(&dir);
        assert!(
            lines.is_empty(),
            "expected zero writes, got {}",
            lines.len()
        );
    }

    /// The below-threshold fast path must stay cheap: 10k calls in < 10 ms.
    #[test]
    fn below_threshold_wall_time_under_10ms() {
        let dir = tmp_dir();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();

        let start = Instant::now();
        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed.as_millis() < 10,
            "10k below-threshold calls took {}ms (>10ms budget)",
            elapsed.as_millis()
        );
    }

    /// A single slow query produces exactly one JSON line carrying its kind,
    /// duration and SQL text.
    #[test]
    fn above_threshold_emits_json_line() {
        let dir = tmp_dir();
        let log = logger(&dir, 10, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 100, "SELECT * FROM t".into(), &scope);
        flush(&log);

        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "expected 1 line");
        let v = &lines[0];
        assert_eq!(v.get("kind").and_then(|x| x.as_str()), Some("select"));
        assert_eq!(v.get("duration_ms").and_then(|x| x.as_i64()), Some(100));
        let sql = v.get("sql").and_then(|x| x.as_str());
        assert_eq!(sql, Some("SELECT * FROM t"));
    }

    /// Every emitted record carries the full set of documented keys.
    #[test]
    fn json_line_has_all_required_fields() {
        let dir = tmp_dir();
        let log = logger(&dir, 0, 100);
        let scope = empty_scope();

        log.record(
            QueryKind::Insert,
            42,
            "INSERT INTO t VALUES (1)".into(),
            &scope,
        );
        flush(&log);

        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1);
        let v = &lines[0];
        assert!(v.get("ts_ms").is_some(), "missing ts_ms");
        assert!(v.get("kind").is_some(), "missing kind");
        assert!(v.get("duration_ms").is_some(), "missing duration_ms");
        assert!(v.get("sql").is_some(), "missing sql");
        assert!(v.get("tenant").is_some(), "missing tenant");
        assert!(v.get("identity").is_some(), "missing identity");
    }

    /// Every query kind has a non-empty label. Iterating `QueryKind::ALL`
    /// keeps this test in step with the canonical variant list.
    #[test]
    fn all_query_kinds_serialise() {
        for k in QueryKind::ALL {
            assert!(!k.as_str().is_empty());
        }
    }

    /// With `sample_pct = 10`, roughly one in ten above-threshold queries is
    /// written.
    #[test]
    fn sampling_property_10pct() {
        let dir = tmp_dir();
        let log = logger(&dir, 0, 10);
        let scope = empty_scope();

        let calls = 10_000u64;
        for i in 0..calls {
            log.record(QueryKind::Select, 1, format!("SELECT {i}"), &scope);
        }
        flush(&log);

        let lines = read_log_lines(&dir);
        let emitted = lines.len() as u64;
        assert!(
            (800..=1200).contains(&emitted),
            "sample_pct=10 over {calls} calls emitted {emitted} (expected 800..=1200)"
        );
    }

    /// Hostile SQL text must stay on one JSONL row, parse as valid JSON and
    /// round-trip unchanged.
    #[test]
    fn adversarial_sql_is_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "SELECT 1\r\nDROP TABLE t--"),
            ("nul", "SELECT '\x00'"),
            ("quote", r#"SELECT "secret" FROM t"#),
            ("json_inject", r#"SELECT 1},"pwned":true,{"x":"#),
            ("low_ctrl", "SELECT \x01\x02\x07\x1f"),
            ("backslash", "SELECT 'C:\\path\\file'"),
        ];

        for (label, sql) in payloads {
            let dir = tmp_dir();
            let log = logger(&dir, 0, 100);
            let scope = empty_scope();

            log.record(QueryKind::Select, 1, sql.to_string(), &scope);
            flush(&log);

            let path = reddb_file::layout::legacy_slow_query_log_path(&dir);
            let body = std::fs::read_to_string(&path).unwrap_or_default();
            let line = body
                .lines()
                .find(|l| !l.is_empty())
                .unwrap_or_else(|| panic!("{label}: no line emitted"));

            assert!(
                !line.contains('\n'),
                "{label}: embedded newline in JSONL row"
            );

            let v: crate::json::Value = crate::json::from_str(line)
                .unwrap_or_else(|e| panic!("{label}: not valid JSON: {e}\n{line:?}"));

            let recovered = v.get("sql").and_then(|x| x.as_str()).unwrap_or("");
            assert_eq!(recovered, *sql, "{label}: SQL round-trip mismatch");
        }
    }

    /// The threshold is inclusive: a duration equal to it is recorded.
    #[test]
    fn at_threshold_boundary_emits() {
        let dir = tmp_dir();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 50, "SELECT 1".into(), &scope);
        flush(&log);
        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "duration == threshold should emit");
    }

    /// One millisecond below the threshold is not recorded.
    #[test]
    fn just_below_threshold_does_not_emit() {
        let dir = tmp_dir();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 49, "SELECT 1".into(), &scope);
        flush(&log);
        let lines = read_log_lines(&dir);
        assert!(lines.is_empty(), "duration < threshold must not emit");
    }
}