use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
use std::sync::{Arc, Mutex};
use tracing_appender::non_blocking::{NonBlocking, NonBlockingBuilder, WorkerGuard};
use crate::runtime::EffectiveScope;
/// Construction options for `SlowQueryLogger`.
pub struct SlowQueryOpts {
    /// Directory that will hold `red-slow.log`; created (best-effort) if missing.
    pub log_dir: PathBuf,
    /// Minimum query duration, in milliseconds, for a query to be logged
    /// (durations equal to the threshold are logged).
    pub threshold_ms: u64,
    /// Percentage (0–100) of above-threshold queries to emit; values above
    /// 100 are clamped to 100 at construction.
    pub sample_pct: u8,
}
/// Classification of a query for slow-query logging; serialised as the
/// lowercase `kind` field of each JSON log line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueryKind {
    Select,
    Insert,
    Update,
    Delete,
    Bulk,
    Aggregate,
    DDL,
    Internal,
}

impl QueryKind {
    /// Stable lowercase wire name for this kind, exactly as it appears in
    /// the emitted JSON (`"select"`, `"ddl"`, ...).
    fn as_str(self) -> &'static str {
        use QueryKind::*;
        match self {
            Select => "select",
            Insert => "insert",
            Update => "update",
            Delete => "delete",
            Bulk => "bulk",
            Aggregate => "aggregate",
            DDL => "ddl",
            Internal => "internal",
        }
    }
}
/// Appends slow-query records as JSON lines (one object per line) to
/// `red-slow.log`, handing the actual file I/O to a background worker via a
/// lossy non-blocking writer.
pub struct SlowQueryLogger {
    /// Serialises concurrent emitters; the `NonBlocking` handle forwards
    /// lines to the background writer thread.
    writer: Mutex<NonBlocking>,
    /// Keeps the background writer thread alive for the logger's lifetime;
    /// per tracing_appender docs, dropping it flushes remaining lines.
    _guard: WorkerGuard,
    /// Minimum duration (ms) to log; read with Relaxed ordering on the hot path.
    threshold_ms: AtomicU64,
    /// 0–100: portion of above-threshold queries actually emitted.
    sample_pct: AtomicU8,
    /// Running count of above-threshold queries, driving deterministic
    /// modulo-100 sampling in `record`.
    above_count: AtomicU64,
}
impl SlowQueryLogger {
pub fn new(opts: SlowQueryOpts) -> Arc<Self> {
let _ = std::fs::create_dir_all(&opts.log_dir);
let path = opts.log_dir.join("red-slow.log");
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&path)
.unwrap_or_else(|e| panic!("SlowQueryLogger: cannot open {}: {e}", path.display()));
let (writer, guard) = NonBlockingBuilder::default()
.buffered_lines_limit(65_536)
.lossy(true)
.finish(file);
Arc::new(Self {
writer: Mutex::new(writer),
_guard: guard,
threshold_ms: AtomicU64::new(opts.threshold_ms),
sample_pct: AtomicU8::new(opts.sample_pct.min(100)),
above_count: AtomicU64::new(0),
})
}
pub fn record(
&self,
kind: QueryKind,
duration_ms: u64,
sql_redacted: String,
scope: &EffectiveScope,
) {
if duration_ms < self.threshold_ms.load(Ordering::Relaxed) {
return;
}
let pct = u64::from(self.sample_pct.load(Ordering::Relaxed));
if pct < 100 {
let n = self.above_count.fetch_add(1, Ordering::Relaxed);
if (n % 100) >= pct {
return;
}
}
self.emit(kind, duration_ms, sql_redacted, scope);
}
fn emit(
&self,
kind: QueryKind,
duration_ms: u64,
sql_redacted: String,
scope: &EffectiveScope,
) {
let ts_ms = crate::utils::now_unix_millis();
let tenant = scope.tenant.as_deref().unwrap_or("").to_string();
let identity = scope
.identity
.as_ref()
.map(|(u, _)| u.as_str())
.unwrap_or("")
.to_string();
let mut map = std::collections::BTreeMap::new();
map.insert(
"ts_ms".to_string(),
crate::json::Value::Number(ts_ms as f64),
);
map.insert(
"kind".to_string(),
crate::json::Value::String(kind.as_str().to_string()),
);
map.insert(
"duration_ms".to_string(),
crate::json::Value::Number(duration_ms as f64),
);
map.insert("sql".to_string(), crate::json::Value::String(sql_redacted));
map.insert("tenant".to_string(), crate::json::Value::String(tenant));
map.insert("identity".to_string(), crate::json::Value::String(identity));
let obj = crate::json::Value::Object(map);
if let Ok(mut line) = crate::json::to_string(&obj) {
line.push('\n');
if let Ok(mut w) = self.writer.lock() {
let _ = w.write_all(line.as_bytes());
}
}
}
}
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::path::Path;
    use std::time::Instant;

    use super::*;
    use crate::runtime::EffectiveScope;
    use crate::storage::transaction::snapshot::Snapshot;

    /// Process-unique scratch directory for one test's log output.
    fn tmp_dir() -> PathBuf {
        let mut d = std::env::temp_dir();
        d.push(format!(
            "reddb-slow-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos()
        ));
        d
    }

    /// Best-effort removal of a test's scratch directory.
    ///
    /// Fix: previously every test leaked its directory (and log file) in the
    /// OS temp dir on every run. Errors are ignored since cleanup must never
    /// fail a test.
    fn cleanup(dir: &Path) {
        let _ = std::fs::remove_dir_all(dir);
    }

    fn logger(dir: &Path, threshold_ms: u64, sample_pct: u8) -> Arc<SlowQueryLogger> {
        SlowQueryLogger::new(SlowQueryOpts {
            log_dir: dir.to_path_buf(),
            threshold_ms,
            sample_pct,
        })
    }

    /// Scope with no tenant, no identity, and an empty snapshot.
    fn empty_scope() -> EffectiveScope {
        EffectiveScope {
            tenant: None,
            identity: None,
            snapshot: Snapshot {
                xid: 0,
                in_progress: HashSet::new(),
            },
            visible_collections: None,
        }
    }

    /// Give the non-blocking writer's background thread time to drain.
    fn flush(_log: &Arc<SlowQueryLogger>) {
        std::thread::sleep(std::time::Duration::from_millis(50));
    }

    /// Parse every non-empty line of `red-slow.log` as JSON.
    fn read_log_lines(dir: &Path) -> Vec<crate::json::Value> {
        let path = dir.join("red-slow.log");
        let body = std::fs::read_to_string(&path).unwrap_or_default();
        body.lines()
            .filter(|l| !l.is_empty())
            .map(|l| crate::json::from_str::<crate::json::Value>(l).expect("valid JSON"))
            .collect()
    }

    #[test]
    fn below_threshold_no_file_writes() {
        let dir = tmp_dir();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();
        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }
        flush(&log);
        let lines = read_log_lines(&dir);
        assert!(
            lines.is_empty(),
            "expected zero writes, got {}",
            lines.len()
        );
        drop(log); // drop the WorkerGuard before removing the directory
        cleanup(&dir);
    }

    #[test]
    fn below_threshold_wall_time_under_10ms() {
        let dir = tmp_dir();
        let log = logger(&dir, 1000, 100);
        let scope = empty_scope();
        let start = Instant::now();
        for _ in 0..10_000 {
            log.record(QueryKind::Select, 5, "SELECT 1".into(), &scope);
        }
        let elapsed = start.elapsed();
        assert!(
            elapsed.as_millis() < 10,
            "10k below-threshold calls took {}ms (>10ms budget)",
            elapsed.as_millis()
        );
        drop(log);
        cleanup(&dir);
    }

    #[test]
    fn above_threshold_emits_json_line() {
        let dir = tmp_dir();
        let log = logger(&dir, 10, 100);
        let scope = empty_scope();
        log.record(QueryKind::Select, 100, "SELECT * FROM t".into(), &scope);
        flush(&log);
        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "expected 1 line");
        let v = &lines[0];
        assert_eq!(v.get("kind").and_then(|x| x.as_str()), Some("select"));
        assert_eq!(v.get("duration_ms").and_then(|x| x.as_i64()), Some(100));
        let sql = v.get("sql").and_then(|x| x.as_str());
        assert_eq!(sql, Some("SELECT * FROM t"));
        drop(log);
        cleanup(&dir);
    }

    #[test]
    fn json_line_has_all_required_fields() {
        let dir = tmp_dir();
        let log = logger(&dir, 0, 100);
        let scope = empty_scope();
        log.record(
            QueryKind::Insert,
            42,
            "INSERT INTO t VALUES (1)".into(),
            &scope,
        );
        flush(&log);
        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1);
        let v = &lines[0];
        assert!(v.get("ts_ms").is_some(), "missing ts_ms");
        assert!(v.get("kind").is_some(), "missing kind");
        assert!(v.get("duration_ms").is_some(), "missing duration_ms");
        assert!(v.get("sql").is_some(), "missing sql");
        assert!(v.get("tenant").is_some(), "missing tenant");
        assert!(v.get("identity").is_some(), "missing identity");
        drop(log);
        cleanup(&dir);
    }

    #[test]
    fn all_query_kinds_serialise() {
        let kinds = [
            QueryKind::Select,
            QueryKind::Insert,
            QueryKind::Update,
            QueryKind::Delete,
            QueryKind::Bulk,
            QueryKind::Aggregate,
            QueryKind::DDL,
            QueryKind::Internal,
        ];
        for k in kinds {
            assert!(!k.as_str().is_empty());
        }
    }

    #[test]
    fn sampling_property_10pct() {
        let dir = tmp_dir();
        let log = logger(&dir, 0, 10);
        let scope = empty_scope();
        let calls = 10_000u64;
        for i in 0..calls {
            log.record(QueryKind::Select, 1, format!("SELECT {i}"), &scope);
        }
        flush(&log);
        let lines = read_log_lines(&dir);
        let emitted = lines.len() as u64;
        assert!(
            emitted >= 800 && emitted <= 1200,
            "sample_pct=10 over {calls} calls emitted {emitted} (expected 800..=1200)"
        );
        drop(log);
        cleanup(&dir);
    }

    #[test]
    fn adversarial_sql_is_escape_safe() {
        let payloads: &[(&str, &str)] = &[
            ("crlf", "SELECT 1\r\nDROP TABLE t--"),
            ("nul", "SELECT '\x00'"),
            ("quote", r#"SELECT "secret" FROM t"#),
            ("json_inject", r#"SELECT 1},"pwned":true,{"x":"#),
            ("low_ctrl", "SELECT \x01\x02\x07\x1f"),
            ("backslash", "SELECT 'C:\\path\\file'"),
        ];
        for (label, sql) in payloads {
            let dir = tmp_dir();
            let log = logger(&dir, 0, 100);
            let scope = empty_scope();
            log.record(QueryKind::Select, 1, sql.to_string(), &scope);
            flush(&log);
            let path = dir.join("red-slow.log");
            let body = std::fs::read_to_string(&path).unwrap_or_default();
            let line = body
                .lines()
                .find(|l| !l.is_empty())
                .unwrap_or_else(|| panic!("{label}: no line emitted"));
            assert!(
                !line.contains('\n'),
                "{label}: embedded newline in JSONL row"
            );
            let v: crate::json::Value = crate::json::from_str(line)
                .unwrap_or_else(|e| panic!("{label}: not valid JSON: {e}\n{line:?}"));
            let recovered = v.get("sql").and_then(|x| x.as_str()).unwrap_or("");
            assert_eq!(recovered, *sql, "{label}: SQL round-trip mismatch");
            drop(log);
            cleanup(&dir);
        }
    }

    #[test]
    fn at_threshold_boundary_emits() {
        let dir = tmp_dir();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();
        log.record(QueryKind::Select, 50, "SELECT 1".into(), &scope);
        flush(&log);
        let lines = read_log_lines(&dir);
        assert_eq!(lines.len(), 1, "duration == threshold should emit");
        drop(log);
        cleanup(&dir);
    }

    #[test]
    fn just_below_threshold_does_not_emit() {
        let dir = tmp_dir();
        let log = logger(&dir, 50, 100);
        let scope = empty_scope();
        log.record(QueryKind::Select, 49, "SELECT 1".into(), &scope);
        flush(&log);
        let lines = read_log_lines(&dir);
        assert!(lines.is_empty(), "duration < threshold must not emit");
        drop(log);
        cleanup(&dir);
    }
}