1use std::io::Write;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
12use std::sync::{Arc, Mutex};
13
14use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
15
16use crate::runtime::EffectiveScope;
17
/// Construction parameters for [`SlowQueryLogger::new`].
#[derive(Debug, Clone)]
pub struct SlowQueryOpts {
    /// Directory that holds `red-slow.log`; creation is attempted (best
    /// effort) if it does not exist yet.
    pub log_dir: PathBuf,
    /// Queries that take fewer milliseconds than this are not logged.
    pub threshold_ms: u64,
    /// Percentage of at-or-above-threshold queries to log. Values above 100
    /// are clamped to 100.
    pub sample_pct: u8,
}
28
/// Category of a logged query.
///
/// Each variant is written to the slow log as the lowercase string returned
/// by [`QueryKind::as_str`] (the `kind` field of a JSON line).
// `DDL` is a fully-capitalised acronym; renaming it to `Ddl` would break
// existing callers, so the clippy style lint is silenced deliberately.
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryKind {
    Select,
    Insert,
    Update,
    Delete,
    Bulk,
    Aggregate,
    DDL,
    Internal,
}

impl QueryKind {
    /// Returns the stable, lowercase wire name used for the `kind` field.
    fn as_str(self) -> &'static str {
        match self {
            Self::Select => "select",
            Self::Insert => "insert",
            Self::Update => "update",
            Self::Delete => "delete",
            Self::Bulk => "bulk",
            Self::Aggregate => "aggregate",
            Self::DDL => "ddl",
            Self::Internal => "internal",
        }
    }
}
56
/// Append-only JSON-lines logger for queries whose duration meets a threshold.
///
/// Lines are handed to a `tracing_appender` non-blocking writer and written
/// to `red-slow.log` by its background worker. Construct it with
/// `SlowQueryLogger::new`; all recording goes through `&self`, so one
/// instance can be shared behind the returned `Arc`.
pub struct SlowQueryLogger {
    // Producer side of the non-blocking queue. `Write::write_all` takes
    // `&mut self`, so the shared handle sits behind a `Mutex`.
    writer: Mutex<NonBlocking>,
    // Keeps the background worker alive; per tracing-appender, dropping the
    // guard flushes lines that are still queued.
    _guard: WorkerGuard,
    // Minimum duration (ms) a query must take before it is considered.
    threshold_ms: AtomicU64,
    // Percentage (clamped to 0..=100 at construction) of qualifying queries
    // that are emitted.
    sample_pct: AtomicU8,
    // Count of qualifying queries seen while sampling (pct < 100); its value
    // modulo 100 decides which calls are emitted.
    above_count: AtomicU64,
}
73
74impl SlowQueryLogger {
75 pub fn new(opts: SlowQueryOpts) -> Arc<Self> {
76 let _ = std::fs::create_dir_all(&opts.log_dir);
77 let path = opts.log_dir.join("red-slow.log");
78 let file = std::fs::OpenOptions::new()
79 .create(true)
80 .append(true)
81 .open(&path)
82 .unwrap_or_else(|e| panic!("SlowQueryLogger: cannot open {}: {e}", path.display()));
83
84 let (writer, guard) = NonBlockingBuilder::default()
85 .buffered_lines_limit(65_536)
86 .lossy(true)
87 .finish(file);
88
89 Arc::new(Self {
90 writer: Mutex::new(writer),
91 _guard: guard,
92 threshold_ms: AtomicU64::new(opts.threshold_ms),
93 sample_pct: AtomicU8::new(opts.sample_pct.min(100)),
94 above_count: AtomicU64::new(0),
95 })
96 }
97
98 pub fn record(
101 &self,
102 kind: QueryKind,
103 duration_ms: u64,
104 sql_redacted: String,
105 scope: &EffectiveScope,
106 ) {
107 if duration_ms < self.threshold_ms.load(Ordering::Relaxed) {
109 return;
110 }
111
112 let pct = u64::from(self.sample_pct.load(Ordering::Relaxed));
116 if pct < 100 {
117 let n = self.above_count.fetch_add(1, Ordering::Relaxed);
118 if (n % 100) >= pct {
119 return;
120 }
121 }
122
123 self.emit(kind, duration_ms, sql_redacted, scope);
124 }
125
126 fn emit(
127 &self,
128 kind: QueryKind,
129 duration_ms: u64,
130 sql_redacted: String,
131 scope: &EffectiveScope,
132 ) {
133 let ts_ms = crate::utils::now_unix_millis();
134 let tenant = scope.tenant.as_deref().unwrap_or("").to_string();
135 let identity = scope
136 .identity
137 .as_ref()
138 .map(|(u, _)| u.as_str())
139 .unwrap_or("")
140 .to_string();
141
142 let mut map = std::collections::BTreeMap::new();
143 map.insert(
144 "ts_ms".to_string(),
145 crate::json::Value::Number(ts_ms as f64),
146 );
147 map.insert(
148 "kind".to_string(),
149 crate::json::Value::String(kind.as_str().to_string()),
150 );
151 map.insert(
152 "duration_ms".to_string(),
153 crate::json::Value::Number(duration_ms as f64),
154 );
155 map.insert("sql".to_string(), crate::json::Value::String(sql_redacted));
156 map.insert("tenant".to_string(), crate::json::Value::String(tenant));
157 map.insert("identity".to_string(), crate::json::Value::String(identity));
158
159 let obj = crate::json::Value::Object(map);
160 if let Ok(mut line) = crate::json::to_string(&obj) {
161 line.push('\n');
162 if let Ok(mut w) = self.writer.lock() {
163 let _ = w.write_all(line.as_bytes());
164 }
165 }
166 }
167}
168
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::path::Path;
    use std::time::Instant;

    use super::*;
    use crate::runtime::EffectiveScope;
    use crate::storage::transaction::snapshot::Snapshot;

    /// Scratch directory unique to one test. It is removed (best effort) when
    /// dropped, including when the test panics, so runs do not litter the
    /// system temp dir.
    struct TempDir(PathBuf);

    impl TempDir {
        fn new() -> Self {
            // pid + clock alone can collide between tests that run in parallel
            // threads of one process on a coarse clock; the counter makes
            // every name unique.
            static NEXT_ID: AtomicU64 = AtomicU64::new(0);
            let name = format!(
                "reddb-slow-{}-{}-{}",
                std::process::id(),
                crate::utils::now_unix_nanos(),
                NEXT_ID.fetch_add(1, Ordering::Relaxed),
            );
            Self(std::env::temp_dir().join(name))
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    fn logger(dir: &Path, threshold_ms: u64, sample_pct: u8) -> Arc<SlowQueryLogger> {
        SlowQueryLogger::new(SlowQueryOpts {
            log_dir: dir.to_path_buf(),
            threshold_ms,
            sample_pct,
        })
    }

    fn empty_scope() -> EffectiveScope {
        EffectiveScope {
            tenant: None,
            identity: None,
            snapshot: Snapshot {
                xid: 0,
                in_progress: HashSet::new(),
            },
            visible_collections: None,
        }
    }

    /// Drops the last handle to `log`, which drops its `WorkerGuard`. The
    /// guard waits (bounded by tracing-appender's shutdown timeout) for the
    /// worker to write every queued line. This is deterministic, unlike
    /// sleeping for a fixed interval.
    fn flush(log: Arc<SlowQueryLogger>) {
        assert_eq!(
            Arc::strong_count(&log),
            1,
            "flush() must receive the last handle to the logger"
        );
        drop(log);
    }

    fn read_log_lines(dir: &Path) -> Vec<crate::json::Value> {
        let path = dir.join("red-slow.log");
        let body = std::fs::read_to_string(&path).unwrap_or_default();
        body.lines()
            .filter(|l| !l.is_empty())
            .map(|l| crate::json::from_str::<crate::json::Value>(l).expect("valid JSON"))
            .collect()
    }

    #[test]
    fn below_threshold_no_file_writes() {
        let dir = TempDir::new();
        let log = logger(dir.path(), 1000, 100);
        let scope = empty_scope();

        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }

        flush(log);
        let lines = read_log_lines(dir.path());
        assert!(
            lines.is_empty(),
            "expected zero writes, got {}",
            lines.len()
        );
    }

    #[test]
    fn below_threshold_wall_time_under_10ms() {
        let dir = TempDir::new();
        let log = logger(dir.path(), 1000, 100);
        let scope = empty_scope();

        let start = Instant::now();
        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed.as_millis() < 10,
            "10k below-threshold calls took {}ms (>10ms budget)",
            elapsed.as_millis()
        );
    }

    #[test]
    fn above_threshold_emits_json_line() {
        let dir = TempDir::new();
        let log = logger(dir.path(), 10, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 100, "SELECT * FROM t".into(), &scope);
        flush(log);

        let lines = read_log_lines(dir.path());
        assert_eq!(lines.len(), 1, "expected 1 line");
        let v = &lines[0];
        assert_eq!(v.get("kind").and_then(|x| x.as_str()), Some("select"));
        assert_eq!(v.get("duration_ms").and_then(|x| x.as_i64()), Some(100));
        let sql = v.get("sql").and_then(|x| x.as_str());
        assert_eq!(sql, Some("SELECT * FROM t"));
    }

    #[test]
    fn json_line_has_all_required_fields() {
        let dir = TempDir::new();
        let log = logger(dir.path(), 0, 100);
        let scope = empty_scope();

        log.record(
            QueryKind::Insert,
            42,
            "INSERT INTO t VALUES (1)".into(),
            &scope,
        );
        flush(log);

        let lines = read_log_lines(dir.path());
        assert_eq!(lines.len(), 1);
        let v = &lines[0];
        assert!(v.get("ts_ms").is_some(), "missing ts_ms");
        assert!(v.get("kind").is_some(), "missing kind");
        assert!(v.get("duration_ms").is_some(), "missing duration_ms");
        assert!(v.get("sql").is_some(), "missing sql");
        assert!(v.get("tenant").is_some(), "missing tenant");
        assert!(v.get("identity").is_some(), "missing identity");
    }

    #[test]
    fn all_query_kinds_serialise() {
        let kinds = [
            QueryKind::Select,
            QueryKind::Insert,
            QueryKind::Update,
            QueryKind::Delete,
            QueryKind::Bulk,
            QueryKind::Aggregate,
            QueryKind::DDL,
            QueryKind::Internal,
        ];
        for k in kinds {
            assert!(!k.as_str().is_empty());
        }
    }

    #[test]
    fn sampling_property_10pct() {
        let dir = TempDir::new();
        let log = logger(dir.path(), 0, 10);
        let scope = empty_scope();

        let calls = 10_000u64;
        for i in 0..calls {
            log.record(QueryKind::Select, 1, format!("SELECT {i}"), &scope);
        }
        flush(log);

        let lines = read_log_lines(dir.path());
        let emitted = lines.len() as u64;
        assert!(
            (800..=1200).contains(&emitted),
            "sample_pct=10 over {calls} calls emitted {emitted} (expected 800..=1200)"
        );
    }

    #[test]
    fn adversarial_sql_is_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "SELECT 1\r\nDROP TABLE t--"),
            ("nul", "SELECT '\x00'"),
            ("quote", r#"SELECT "secret" FROM t"#),
            ("json_inject", r#"SELECT 1},"pwned":true,{"x":"#),
            ("low_ctrl", "SELECT \x01\x02\x07\x1f"),
            ("backslash", "SELECT 'C:\\path\\file'"),
        ];

        for (label, sql) in payloads {
            let dir = TempDir::new();
            let log = logger(dir.path(), 0, 100);
            let scope = empty_scope();

            log.record(QueryKind::Select, 1, sql.to_string(), &scope);
            flush(log);

            let path = dir.path().join("red-slow.log");
            let body = std::fs::read_to_string(&path).unwrap_or_default();
            let line = body
                .lines()
                .find(|l| !l.is_empty())
                .unwrap_or_else(|| panic!("{label}: no line emitted"));

            // `str::lines` already splits on '\n', so inspect the raw body: a
            // single record must contain exactly one newline (its terminator)
            // and no raw carriage return.
            assert_eq!(
                body.matches('\n').count(),
                1,
                "{label}: embedded newline in JSONL row"
            );
            assert!(
                !body.contains('\r'),
                "{label}: raw carriage return in JSONL row"
            );

            let v: crate::json::Value = crate::json::from_str(line)
                .unwrap_or_else(|e| panic!("{label}: not valid JSON: {e}\n{line:?}"));

            let recovered = v.get("sql").and_then(|x| x.as_str()).unwrap_or("");
            assert_eq!(recovered, *sql, "{label}: SQL round-trip mismatch");
        }
    }

    #[test]
    fn at_threshold_boundary_emits() {
        let dir = TempDir::new();
        let log = logger(dir.path(), 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 50, "SELECT 1".into(), &scope);
        flush(log);
        let lines = read_log_lines(dir.path());
        assert_eq!(lines.len(), 1, "duration == threshold should emit");
    }

    #[test]
    fn just_below_threshold_does_not_emit() {
        let dir = TempDir::new();
        let log = logger(dir.path(), 50, 100);
        let scope = empty_scope();

        log.record(QueryKind::Select, 49, "SELECT 1".into(), &scope);
        flush(log);
        let lines = read_log_lines(dir.path());
        assert!(lines.is_empty(), "duration < threshold must not emit");
    }
}