use crate::residual::{plan_regression, ResidualStream};
use anyhow::{Context, Result};
use serde::Deserialize;
use std::collections::HashMap;
use std::path::Path;
/// Spans shorter than this many milliseconds (1e-6 ms, i.e. 1 ns) are ignored.
const MIN_DURATION_MS: f64 = 1e-6;
/// Number of leading spans per statement whose mean duration is that statement's
/// latency baseline; a statement needs strictly more spans than this to emit residuals.
const BASELINE_WINDOW: usize = 3;
/// Upper bound on the number of spans accepted from one input file.
const MAX_SPANS: usize = 100_000_000;
/// Upper bound on the number of distinct statement hashes accepted from one input file.
const MAX_STATEMENT_HASHES: usize = 1_000_000;
/// One database span as read from the OTel DB-spans JSON export (a JSON array of these).
///
/// The loader skips spans whose end is not after their start.
#[derive(Debug, Deserialize)]
pub struct DbSpan {
    /// Span start time in nanoseconds.
    /// NOTE(review): presumably Unix-epoch based — confirm against the exporter.
    pub t_start_ns: i128,
    /// Span end time in nanoseconds, same clock as `t_start_ns`.
    pub t_end_ns: i128,
    /// Key used to group spans; each distinct hash gets its own latency baseline.
    pub statement_hash: String,
    /// Optional database system name; may be omitted from the JSON record.
    #[serde(default)]
    pub db_system: Option<String>,
}
/// Loads an OTel DB-spans JSON export and turns it into a [`ResidualStream`] of
/// per-statement latency residuals.
///
/// The file must hold a JSON array of [`DbSpan`] records. Processing works as follows:
///
/// 1. **Filtering.** A span is skipped when its duration is non-positive, its duration
///    overflows `i128`, or its duration is below `MIN_DURATION_MS`.
/// 2. **Timestamps.** Sample times are seconds relative to the earliest start among the
///    kept spans. They are never negative, even when the export is not ordered by start
///    time.
/// 3. **Baselines.** Kept spans are grouped by `statement_hash` and ordered by start
///    time. A statement needs more than `BASELINE_WINDOW` spans to contribute. Its
///    baseline is the mean duration of its first `BASELINE_WINDOW` spans.
/// 4. **Emission.** Every later span is passed to `plan_regression::push_latency`
///    together with that baseline. Statements are processed in lexicographic hash
///    order, and the finished stream is sorted.
///
/// The stream's source label is `otel-db-spans@<file name>`. It is suffixed with
/// `[system=<db_system>]` when a kept span reports a system; the first such span in
/// file order wins.
///
/// # Errors
///
/// Returns an error when any of the following holds:
/// - the file cannot be read or parsed as JSON;
/// - it holds more than `MAX_SPANS` spans;
/// - the kept spans carry more than `MAX_STATEMENT_HASHES` distinct statement hashes.
pub fn load_otel_db_spans(path: &Path) -> Result<ResidualStream> {
    let text = std::fs::read_to_string(path)
        .with_context(|| format!("reading OTel DB-spans JSON at {}", path.display()))?;
    let spans: Vec<DbSpan> = serde_json::from_str(&text)
        .with_context(|| format!("parsing OTel DB-spans JSON at {}", path.display()))?;
    // A real error, not a `debug_assert!`: asserting the bound before checking it would
    // panic in debug builds instead of reporting the oversized input.
    if spans.len() > MAX_SPANS {
        anyhow::bail!(
            "OTel DB-spans JSON at {} exceeds {} span bound",
            path.display(),
            MAX_SPANS
        );
    }

    // Keep spans with a usable duration, paired with that duration in milliseconds.
    // `checked_sub` drops spans whose endpoints are so far apart that the i128
    // difference would overflow (which would panic in debug builds).
    let valid: Vec<(&DbSpan, f64)> = spans
        .iter()
        .filter_map(|s| {
            let dur_ns = s.t_end_ns.checked_sub(s.t_start_ns)?;
            if dur_ns <= 0 {
                return None;
            }
            let dur_ms = dur_ns as f64 / 1e6;
            (dur_ms.is_finite() && dur_ms >= MIN_DURATION_MS).then_some((s, dur_ms))
        })
        .collect();

    // Exports are not necessarily ordered by start time, so anchor relative time at the
    // earliest kept start rather than at the first span in file order.
    let t0_ns = valid.iter().map(|(s, _)| s.t_start_ns).min().unwrap_or(0);
    let system = valid.iter().find_map(|(s, _)| s.db_system.as_deref());

    let mut by_stmt: HashMap<&String, Vec<(f64, f64)>> = HashMap::new();
    for &(s, dur_ms) in &valid {
        // `abs_diff` cannot overflow; because t0_ns is the minimum it equals start - t0.
        let t_rel_s = s.t_start_ns.abs_diff(t0_ns) as f64 / 1e9;
        by_stmt
            .entry(&s.statement_hash)
            .or_default()
            .push((t_rel_s, dur_ms));
        // Enforce the bound as soon as it is crossed so the map stops growing.
        if by_stmt.len() > MAX_STATEMENT_HASHES {
            anyhow::bail!(
                "OTel DB-spans JSON at {} has >{} distinct statement hashes",
                path.display(),
                MAX_STATEMENT_HASHES
            );
        }
    }

    let file_name = path.file_name().and_then(|n| n.to_str()).unwrap_or("?");
    let source = match system {
        Some(sys) => format!("otel-db-spans@{}[system={}]", file_name, sys),
        None => format!("otel-db-spans@{}", file_name),
    };
    let mut stream = ResidualStream::new(source);

    let mut groups: Vec<(&String, Vec<(f64, f64)>)> = by_stmt.into_iter().collect();
    // Keys are unique, so an unstable sort still yields a deterministic order.
    groups.sort_unstable_by(|a, b| a.0.cmp(b.0));
    for (stmt, mut entries) in groups {
        if entries.len() <= BASELINE_WINDOW {
            continue;
        }
        // Stable sort: spans with equal start times keep their file order.
        entries.sort_by(|a, b| a.0.total_cmp(&b.0));
        let (baseline, rest) = entries.split_at(BASELINE_WINDOW);
        let baseline_ms =
            baseline.iter().map(|&(_, d)| d).sum::<f64>() / BASELINE_WINDOW as f64;
        // Holds because every kept duration is finite and >= MIN_DURATION_MS.
        debug_assert!(baseline_ms.is_finite() && baseline_ms > 0.0);
        for &(t_rel, dur_ms) in rest {
            plan_regression::push_latency(&mut stream, t_rel, stmt, dur_ms, baseline_ms);
        }
    }
    stream.sort();
    Ok(stream)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Writes `spans` as a JSON array to a fresh temp file and returns its handle.
    ///
    /// String fields are quoted via `serde_json::to_string`. Hashes or system names
    /// containing `"` or `\` therefore still produce valid JSON.
    fn write_fixture(spans: &[DbSpan]) -> tempfile::NamedTempFile {
        let quote = |s: &str| serde_json::to_string(s).expect("serializing a str cannot fail");
        let records: Vec<String> = spans
            .iter()
            .map(|s| {
                let sys = s
                    .db_system
                    .as_deref()
                    .map(|x| format!(", \"db_system\":{}", quote(x)))
                    .unwrap_or_default();
                format!(
                    " {{\"t_start_ns\":{},\"t_end_ns\":{},\"statement_hash\":{}{}}}",
                    s.t_start_ns,
                    s.t_end_ns,
                    quote(&s.statement_hash),
                    sys
                )
            })
            .collect();
        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        writeln!(tmp, "[\n{}\n]", records.join(",\n")).unwrap();
        tmp.flush().unwrap();
        tmp
    }

    #[test]
    fn empty_span_list_produces_empty_stream() {
        let tmp = write_fixture(&[]);
        let stream = load_otel_db_spans(tmp.path()).unwrap();
        assert!(stream.samples.is_empty());
        assert!(!stream.source.contains("system="));
    }

    #[test]
    fn plan_regression_residuals_emit_after_baseline_window() {
        let mut spans = Vec::new();
        let t0 = 1_700_000_000_000_000_000_i128;
        for i in 0..10 {
            let start = t0 + (i as i128) * 1_000_000_000;
            let dur_ns = if i < 3 { 10_000_000 } else { 30_000_000 };
            spans.push(DbSpan {
                t_start_ns: start,
                t_end_ns: start + dur_ns,
                statement_hash: "hot_stmt".to_string(),
                db_system: Some("postgresql".to_string()),
            });
        }
        let tmp = write_fixture(&spans);
        let stream = load_otel_db_spans(tmp.path()).unwrap();
        assert_eq!(stream.samples.len(), 7);
        assert!(stream.source.contains("system=postgresql"));
    }

    #[test]
    fn non_positive_duration_spans_are_dropped() {
        let t0 = 1_700_000_000_000_000_000_i128;
        let spans = vec![
            DbSpan {
                t_start_ns: t0,
                t_end_ns: t0 + 10_000_000,
                statement_hash: "s".into(),
                db_system: None,
            },
            DbSpan {
                t_start_ns: t0 + 1_000_000_000,
                t_end_ns: t0 + 1_000_000_000 + 10_000_000,
                statement_hash: "s".into(),
                db_system: None,
            },
            DbSpan {
                t_start_ns: t0 + 2_000_000_000,
                t_end_ns: t0 + 2_000_000_000 + 10_000_000,
                statement_hash: "s".into(),
                db_system: None,
            },
            DbSpan {
                t_start_ns: t0 + 3_000_000_000,
                t_end_ns: t0 + 3_000_000_000,
                statement_hash: "s".into(),
                db_system: None,
            },
            DbSpan {
                t_start_ns: t0 + 4_000_000_000,
                t_end_ns: t0 + 3_999_999_999,
                statement_hash: "s".into(),
                db_system: None,
            },
        ];
        let tmp = write_fixture(&spans);
        let stream = load_otel_db_spans(tmp.path()).unwrap();
        assert_eq!(stream.samples.len(), 0);
    }

    #[test]
    fn per_statement_baselines_are_independent() {
        let t0 = 1_700_000_000_000_000_000_i128;
        let mut spans = Vec::new();
        for i in 0..4 {
            spans.push(DbSpan {
                t_start_ns: t0 + (i as i128) * 1_000_000_000,
                t_end_ns: t0 + (i as i128) * 1_000_000_000 + 20_000_000,
                statement_hash: "a".into(),
                db_system: None,
            });
        }
        for i in 0..4 {
            let dur = if i < 3 { 10_000_000 } else { 100_000_000 };
            spans.push(DbSpan {
                t_start_ns: t0 + (i as i128) * 1_000_000_000 + 500_000_000,
                t_end_ns: t0 + (i as i128) * 1_000_000_000 + 500_000_000 + dur,
                statement_hash: "b".into(),
                db_system: None,
            });
        }
        let tmp = write_fixture(&spans);
        let stream = load_otel_db_spans(tmp.path()).unwrap();
        assert_eq!(stream.samples.len(), 2);
        let max_abs = stream
            .samples
            .iter()
            .map(|s| s.value.abs())
            .fold(0.0_f64, f64::max);
        assert!(max_abs > 1.0, "max |residual| was {}", max_abs);
    }

    #[test]
    fn json_special_characters_in_strings_round_trip() {
        let t0 = 1_700_000_000_000_000_000_i128;
        let spans: Vec<DbSpan> = (0..4)
            .map(|i| DbSpan {
                t_start_ns: t0 + (i as i128) * 1_000_000_000,
                t_end_ns: t0 + (i as i128) * 1_000_000_000 + 10_000_000,
                statement_hash: "sel\"ect \\ x".to_string(),
                db_system: Some("my\"sql".to_string()),
            })
            .collect();
        let tmp = write_fixture(&spans);
        let stream = load_otel_db_spans(tmp.path()).unwrap();
        assert_eq!(stream.samples.len(), 1);
        assert!(stream.source.contains("system=my\"sql"));
    }

    #[test]
    fn source_uses_first_span_that_reports_a_system() {
        let t0 = 1_700_000_000_000_000_000_i128;
        let spans = vec![
            DbSpan {
                t_start_ns: t0,
                t_end_ns: t0 + 10_000_000,
                statement_hash: "s".into(),
                db_system: None,
            },
            DbSpan {
                t_start_ns: t0 + 1_000_000_000,
                t_end_ns: t0 + 1_000_000_000 + 10_000_000,
                statement_hash: "s".into(),
                db_system: Some("postgresql".into()),
            },
            DbSpan {
                t_start_ns: t0 + 2_000_000_000,
                t_end_ns: t0 + 2_000_000_000 + 10_000_000,
                statement_hash: "s".into(),
                db_system: Some("mysql".into()),
            },
        ];
        let tmp = write_fixture(&spans);
        let stream = load_otel_db_spans(tmp.path()).unwrap();
        assert!(stream.source.contains("system=postgresql"));
        assert!(!stream.source.contains("system=mysql"));
    }
}