1use std::io::Write;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
12use std::sync::{Arc, Mutex};
13
14use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
15
16use crate::runtime::EffectiveScope;
17
/// Construction-time options for [`SlowQueryLogger::new`].
///
/// Derives `Debug` and `Clone` so the options can be logged at start-up and
/// reused by callers that build more than one logger.
#[derive(Debug, Clone)]
pub struct SlowQueryOpts {
    /// Directory in which the `red-slow.log` file is created.
    pub log_dir: PathBuf,
    /// Minimum duration, in milliseconds, at which a query is treated as slow
    /// (the comparison is inclusive: `duration_ms >= threshold_ms` is logged).
    pub threshold_ms: u64,
    /// Percentage of above-threshold queries that are written to the log.
    /// Values greater than 100 are clamped to 100 by the logger.
    pub sample_pct: u8,
}
28
/// Category tag attached to every slow-query row.
///
/// The tag is serialised into the `kind` field of the JSON line using the
/// lowercase label noted on each variant.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum QueryKind {
    /// Logged as `"select"`.
    Select,
    /// Logged as `"insert"`.
    Insert,
    /// Logged as `"update"`.
    Update,
    /// Logged as `"delete"`.
    Delete,
    /// Logged as `"bulk"`.
    Bulk,
    /// Logged as `"aggregate"`.
    Aggregate,
    /// Logged as `"ddl"`.
    DDL,
    /// Logged as `"internal"`.
    Internal,
}
41
42impl QueryKind {
43 fn as_str(self) -> &'static str {
44 match self {
45 Self::Select => "select",
46 Self::Insert => "insert",
47 Self::Update => "update",
48 Self::Delete => "delete",
49 Self::Bulk => "bulk",
50 Self::Aggregate => "aggregate",
51 Self::DDL => "ddl",
52 Self::Internal => "internal",
53 }
54 }
55}
56
/// Sampled JSON-lines logger for queries whose duration meets a threshold.
///
/// Instances are handed out as `Arc<SlowQueryLogger>` by the constructors and
/// are used through `&self`, so all mutable state is behind a `Mutex` or an
/// atomic.
pub struct SlowQueryLogger {
    /// Non-blocking handle to the log file. Wrapped in a `Mutex` because
    /// writing requires `&mut` access while `record` only has `&self`.
    writer: Mutex<NonBlocking>,
    /// Guard returned alongside `writer` by `NonBlockingBuilder::finish`.
    /// It is never read; it is stored only so it lives as long as the logger.
    /// NOTE(review): per tracing-appender's docs, dropping the guard flushes
    /// and stops the background worker — confirm for the pinned version.
    /// Declared after `writer`, so it is dropped after it.
    _guard: WorkerGuard,
    /// Inclusive slow-query threshold in milliseconds.
    /// NOTE(review): atomic presumably to allow runtime retuning; no setter is
    /// visible in this file.
    threshold_ms: AtomicU64,
    /// Sampling percentage, clamped to at most 100 at construction.
    sample_pct: AtomicU8,
    /// Number of above-threshold calls observed while sampling is below 100%;
    /// its value modulo 100 decides whether a given call is emitted.
    above_count: AtomicU64,
}
73
74impl SlowQueryLogger {
75 pub fn new(opts: SlowQueryOpts) -> Arc<Self> {
76 let path = opts.log_dir.join("red-slow.log");
77 Self::open_at(path, opts.threshold_ms, opts.sample_pct)
78 }
79
80 pub fn for_destination(
85 dest: &crate::storage::layout::LogDestination,
86 fallback_log_dir: &std::path::Path,
87 threshold_ms: u64,
88 sample_pct: u8,
89 ) -> Arc<Self> {
90 use crate::storage::layout::LogDestination;
91 let path = match dest {
92 LogDestination::File(p) => p.clone(),
93 LogDestination::Stderr => fallback_log_dir.join("red-slow.log"),
94 LogDestination::Syslog => {
95 tracing::warn!(
96 target: "reddb::slow",
97 "slow-query LogDestination::Syslog requested; sink not implemented, falling back to file"
98 );
99 fallback_log_dir.join("red-slow.log")
100 }
101 };
102 Self::open_at(path, threshold_ms, sample_pct)
103 }
104
105 fn open_at(path: PathBuf, threshold_ms: u64, sample_pct: u8) -> Arc<Self> {
106 if let Some(parent) = path.parent() {
107 let _ = std::fs::create_dir_all(parent);
108 }
109 let file = std::fs::OpenOptions::new()
110 .create(true)
111 .append(true)
112 .open(&path)
113 .unwrap_or_else(|e| panic!("SlowQueryLogger: cannot open {}: {e}", path.display()));
114
115 let (writer, guard) = NonBlockingBuilder::default()
116 .buffered_lines_limit(65_536)
117 .lossy(true)
118 .finish(file);
119
120 Arc::new(Self {
121 writer: Mutex::new(writer),
122 _guard: guard,
123 threshold_ms: AtomicU64::new(threshold_ms),
124 sample_pct: AtomicU8::new(sample_pct.min(100)),
125 above_count: AtomicU64::new(0),
126 })
127 }
128
129 pub fn record(
132 &self,
133 kind: QueryKind,
134 duration_ms: u64,
135 sql_redacted: String,
136 scope: &EffectiveScope,
137 ) {
138 if duration_ms < self.threshold_ms.load(Ordering::Relaxed) {
140 return;
141 }
142
143 let pct = u64::from(self.sample_pct.load(Ordering::Relaxed));
147 if pct < 100 {
148 let n = self.above_count.fetch_add(1, Ordering::Relaxed);
149 if (n % 100) >= pct {
150 return;
151 }
152 }
153
154 self.emit(kind, duration_ms, sql_redacted, scope);
155 }
156
157 fn emit(
158 &self,
159 kind: QueryKind,
160 duration_ms: u64,
161 sql_redacted: String,
162 scope: &EffectiveScope,
163 ) {
164 let ts_ms = crate::utils::now_unix_millis();
165 let tenant = scope.tenant.as_deref().unwrap_or("").to_string();
166 let identity = scope
167 .identity
168 .as_ref()
169 .map(|(u, _)| u.as_str())
170 .unwrap_or("")
171 .to_string();
172
173 let mut map = std::collections::BTreeMap::new();
174 map.insert(
175 "ts_ms".to_string(),
176 crate::json::Value::Number(ts_ms as f64),
177 );
178 map.insert(
179 "kind".to_string(),
180 crate::json::Value::String(kind.as_str().to_string()),
181 );
182 map.insert(
183 "duration_ms".to_string(),
184 crate::json::Value::Number(duration_ms as f64),
185 );
186 map.insert("sql".to_string(), crate::json::Value::String(sql_redacted));
187 map.insert("tenant".to_string(), crate::json::Value::String(tenant));
188 map.insert("identity".to_string(), crate::json::Value::String(identity));
189
190 let obj = crate::json::Value::Object(map);
191 if let Ok(mut line) = crate::json::to_string(&obj) {
192 line.push('\n');
193 if let Ok(mut w) = self.writer.lock() {
194 let _ = w.write_all(line.as_bytes());
195 }
196 }
197 }
198}
199
/// Unit tests for the slow-query logger: threshold gating, JSON row shape,
/// sampling rate, and escaping of hostile SQL text.
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::path::Path;
    use std::time::Instant;

    use super::*;
    use crate::runtime::EffectiveScope;
    use crate::storage::transaction::snapshot::Snapshot;

    /// Builds a per-call unique directory path under the system temp dir.
    /// The directory itself is created later by the logger.
    fn tmp_dir() -> PathBuf {
        let mut d = std::env::temp_dir();
        d.push(format!(
            "reddb-slow-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        d
    }

    /// Creates a logger writing to `red-slow.log` inside `dir`.
    fn logger(dir: &Path, threshold_ms: u64, sample_pct: u8) -> Arc<SlowQueryLogger> {
        SlowQueryLogger::new(SlowQueryOpts {
            log_dir: dir.to_path_buf(),
            threshold_ms,
            sample_pct,
        })
    }

    /// Scope with no tenant and no identity, so both fields log as `""`.
    fn empty_scope() -> EffectiveScope {
        EffectiveScope {
            tenant: None,
            identity: None,
            snapshot: Snapshot {
                xid: 0,
                in_progress: HashSet::new(),
            },
            visible_collections: None,
        }
    }

    /// Gives the background writer time to drain its queue to disk.
    /// NOTE(review): this is a fixed sleep, not a real flush; a heavily loaded
    /// machine could still race it.
    fn flush(_log: &Arc<SlowQueryLogger>) {
        std::thread::sleep(std::time::Duration::from_millis(50));
    }

    /// Reads `red-slow.log` from `dir` and parses every non-empty line as
    /// JSON. A missing file is treated as an empty log.
    fn read_log_lines(dir: &Path) -> Vec<crate::json::Value> {
        let path = dir.join("red-slow.log");
        let body = std::fs::read_to_string(&path).unwrap_or_default();
        body.lines()
            .filter(|l| !l.is_empty())
            .map(|l| crate::json::from_str::<crate::json::Value>(l).expect("valid JSON"))
            .collect()
    }

    /// Queries faster than the threshold must never reach the file.
    #[test]
    fn below_threshold_no_file_writes() {
        let dir = tmp_dir();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();

        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }

        flush(&log);
        let lines = read_log_lines(&dir);
        assert!(
            lines.is_empty(),
            "expected zero writes, got {}",
            lines.len()
        );
    }

    /// The below-threshold path must stay cheap: 10k calls within 10 ms.
    /// NOTE(review): wall-clock budgets can be flaky on loaded CI runners.
    #[test]
    fn below_threshold_wall_time_under_10ms() {
        let dir = tmp_dir();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();

        let start = Instant::now();
        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed.as_millis() < 10,
            "10k below-threshold calls took {}ms (>10ms budget)",
            elapsed.as_millis()
        );
    }

    /// A slow query produces exactly one row carrying its kind, duration and
    /// SQL text.
    #[test]
    fn above_threshold_emits_json_line() {
        let dir = tmp_dir();
        let log = logger(&dir, 10, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 100, "SELECT * FROM t".into(), &scope);
        flush(&log);

        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "expected 1 line");
        let v = &lines[0];
        assert_eq!(v.get("kind").and_then(|x| x.as_str()), Some("select"));
        assert_eq!(v.get("duration_ms").and_then(|x| x.as_i64()), Some(100));
        let sql = v.get("sql").and_then(|x| x.as_str());
        assert_eq!(sql, Some("SELECT * FROM t"));
    }

    /// Every row carries all six documented fields, even with an empty scope.
    #[test]
    fn json_line_has_all_required_fields() {
        let dir = tmp_dir();
        let log = logger(&dir, 0, 100);
        let scope = empty_scope();

        log.record(
            QueryKind::Insert,
            42,
            "INSERT INTO t VALUES (1)".into(),
            &scope,
        );
        flush(&log);

        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1);
        let v = &lines[0];
        assert!(v.get("ts_ms").is_some(), "missing ts_ms");
        assert!(v.get("kind").is_some(), "missing kind");
        assert!(v.get("duration_ms").is_some(), "missing duration_ms");
        assert!(v.get("sql").is_some(), "missing sql");
        assert!(v.get("tenant").is_some(), "missing tenant");
        assert!(v.get("identity").is_some(), "missing identity");
    }

    /// Every variant maps to a non-empty label.
    #[test]
    fn all_query_kinds_serialise() {
        let kinds = [
            QueryKind::Select,
            QueryKind::Insert,
            QueryKind::Update,
            QueryKind::Delete,
            QueryKind::Bulk,
            QueryKind::Aggregate,
            QueryKind::DDL,
            QueryKind::Internal,
        ];
        for k in kinds {
            assert!(!k.as_str().is_empty());
        }
    }

    /// With `sample_pct = 10`, roughly one tenth of qualifying calls are
    /// written. The tolerance allows for rows dropped by the lossy writer.
    #[test]
    fn sampling_property_10pct() {
        let dir = tmp_dir();
        let log = logger(&dir, 0, 10);
        let scope = empty_scope();

        let calls = 10_000u64;
        for i in 0..calls {
            log.record(QueryKind::Select, 1, format!("SELECT {i}"), &scope);
        }
        flush(&log);

        let lines = read_log_lines(&dir);
        let emitted = lines.len() as u64;
        assert!(
            (800..=1200).contains(&emitted),
            "sample_pct=10 over {calls} calls emitted {emitted} (expected 800..=1200)"
        );
    }

    /// Hostile SQL text must stay inside one JSON string on one physical line
    /// and must round-trip unchanged.
    #[test]
    fn adversarial_sql_is_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "SELECT 1\r\nDROP TABLE t--"),
            ("nul", "SELECT '\x00'"),
            ("quote", r#"SELECT "secret" FROM t"#),
            ("json_inject", r#"SELECT 1},"pwned":true,{"x":"#),
            ("low_ctrl", "SELECT \x01\x02\x07\x1f"),
            ("backslash", "SELECT 'C:\\path\\file'"),
        ];

        for (label, sql) in payloads {
            let dir = tmp_dir();
            let log = logger(&dir, 0, 100);
            let scope = empty_scope();

            log.record(QueryKind::Select, 1, sql.to_string(), &scope);
            flush(&log);

            let path = dir.join("red-slow.log");
            let body = std::fs::read_to_string(&path).unwrap_or_default();
            let line = body
                .lines()
                .find(|l| !l.is_empty())
                .unwrap_or_else(|| panic!("{label}: no line emitted"));

            // Check the raw file body, not `line`: a value yielded by
            // `str::lines()` can never contain '\n', so asserting on it would
            // be vacuous. One record must yield exactly one newline — the
            // row terminator.
            assert_eq!(
                body.matches('\n').count(),
                1,
                "{label}: embedded newline in JSONL row"
            );

            let v: crate::json::Value = crate::json::from_str(line)
                .unwrap_or_else(|e| panic!("{label}: not valid JSON: {e}\n{line:?}"));

            let recovered = v.get("sql").and_then(|x| x.as_str()).unwrap_or("");
            assert_eq!(recovered, *sql, "{label}: SQL round-trip mismatch");
        }
    }

    /// The threshold is inclusive: `duration == threshold` is logged.
    #[test]
    fn at_threshold_boundary_emits() {
        let dir = tmp_dir();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 50, "SELECT 1".into(), &scope);
        flush(&log);
        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "duration == threshold should emit");
    }

    /// One millisecond under the threshold is not logged.
    #[test]
    fn just_below_threshold_does_not_emit() {
        let dir = tmp_dir();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 49, "SELECT 1".into(), &scope);
        flush(&log);
        let lines = read_log_lines(&dir);
        assert!(lines.is_empty(), "duration < threshold must not emit");
    }
}