use std::any::Any;
use std::fmt;
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
use std::path::{Path, PathBuf};
use serde_json::Value;
use tracing::{debug, trace};
/// Aggregated statistics for a single query, parsed from the `"v"` payload
/// of a hyperd `query-end` log entry (see `parse_query_end`).
#[derive(Debug, Clone, Default)]
pub struct QueryStats {
    /// Total wall-clock time in seconds (0.0 when absent from the log).
    pub elapsed_s: f64,
    /// Commit time in seconds, when reported.
    pub commit_time_s: Option<f64>,
    /// Time between submission and scheduling, in seconds, when reported.
    pub time_to_schedule_s: Option<f64>,
    /// Parse/compile phase stats, when the log entry carries a
    /// `pre-execution` section.
    pub pre_execution: Option<PreExecutionStats>,
    /// Execution phase stats, when the log entry carries an `execution`
    /// section.
    pub execution: Option<ExecutionStats>,
    /// Result set size in megabytes.
    pub result_size_mb: Option<f64>,
    /// Peak result-buffer memory in megabytes.
    pub peak_result_buffer_memory_mb: Option<f64>,
    /// Plan-cache outcome string as logged (e.g. "cache miss").
    pub plan_cache_status: Option<String>,
    /// Plan-cache hit count as logged.
    pub plan_cache_hit_count: Option<u32>,
    /// Statement type string as logged (e.g. "SELECT", "PREPARE").
    pub statement_type: Option<String>,
    /// Number of result rows.
    pub rows: Option<u64>,
    /// Number of result columns.
    pub cols: Option<u32>,
    /// Possibly-truncated query text as recorded in the log (`query-trunc`).
    pub query_truncated: Option<String>,
}
/// Statistics for the pre-execution (parse/compile) phase of a query.
#[derive(Debug, Clone, Default)]
pub struct PreExecutionStats {
    /// Time spent parsing, in seconds.
    pub parsing_time_s: Option<f64>,
    /// Time spent compiling, in seconds.
    pub compilation_time_s: Option<f64>,
    /// Total elapsed time of this phase, in seconds.
    pub elapsed_s: Option<f64>,
    /// Peak transaction memory during this phase, in megabytes
    /// (log key `peak-transaction-memory-mb`).
    pub peak_memory_mb: Option<f64>,
}
/// Statistics for the execution phase of a query.
#[derive(Debug, Clone, Default)]
pub struct ExecutionStats {
    /// Total elapsed execution time, in seconds.
    pub elapsed_s: Option<f64>,
    /// CPU time across threads, in seconds (log key `threads.cpu-time`).
    pub cpu_time_s: Option<f64>,
    /// Thread time, in seconds (log key `threads.thread-time`).
    pub thread_time_s: Option<f64>,
    /// Wait time, in seconds (log key `threads.wait-time`).
    pub wait_time_s: Option<f64>,
    /// Total rows processed (log key `processed-rows.total`).
    pub processed_rows_total: Option<u64>,
    /// Rows processed natively (log key `processed-rows.native`).
    pub processed_rows_native: Option<u64>,
    /// Storage access time, in seconds (log key `storage.access-time`).
    pub storage_access_time_s: Option<f64>,
    /// Number of storage accesses (log key `storage.access-count`).
    pub storage_access_count: Option<u64>,
    /// Bytes read from storage (log key `storage.access-bytes`).
    pub storage_access_bytes: Option<u64>,
    /// Peak transaction memory during execution, in megabytes.
    pub peak_memory_mb: Option<f64>,
}
/// Hook for collecting per-query statistics around query execution.
///
/// `before_query` returns an opaque token that the caller hands back to
/// `after_query` once the query has finished; implementations use the token
/// to carry state between the two calls (e.g. a log-file offset).
pub trait QueryStatsProvider: Send + Sync {
    /// Called immediately before `sql` is executed; returns an opaque token.
    fn before_query(&self, sql: &str) -> Box<dyn Any + Send>;
    /// Called after execution with the token from `before_query`; returns
    /// the collected stats, or `None` if none could be found.
    fn after_query(&self, token: Box<dyn Any + Send>, sql: &str) -> Option<QueryStats>;
}
/// `Debug` for provider trait objects: there is no inspectable state behind
/// the trait, so a fixed placeholder is printed.
impl fmt::Debug for dyn QueryStatsProvider {
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(formatter, "<QueryStatsProvider>")
    }
}
/// `QueryStatsProvider` that extracts statistics by scanning hyperd's JSON
/// log file for `query-end` entries appended while a query ran.
pub struct LogFileStatsProvider {
    // Path to the hyperd log file (typically `<log dir>/hyperd.log`).
    log_path: PathBuf,
}
impl fmt::Debug for LogFileStatsProvider {
    /// Struct-style debug output showing the configured log path.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("LogFileStatsProvider")
            .field("log_path", &self.log_path)
            .finish()
    }
}
/// Token returned by `before_query`: remembers the log file's length at
/// query start so `after_query` only scans entries appended afterwards.
struct LogFileToken {
    // Byte offset into the log file at the time the query began.
    offset: u64,
}
impl LogFileStatsProvider {
    /// Create a provider that reads query stats from `log_path`.
    pub fn new(log_path: impl Into<PathBuf>) -> Self {
        LogFileStatsProvider {
            log_path: log_path.into(),
        }
    }
    /// Create a provider pointed at `hyperd.log` inside the process's log
    /// directory, falling back to the current directory when no log
    /// directory is known.
    #[must_use]
    pub fn from_process(process: &crate::HyperProcess) -> Self {
        let log_dir = process.log_dir().unwrap_or_else(|| Path::new("."));
        LogFileStatsProvider {
            log_path: log_dir.join("hyperd.log"),
        }
    }
    /// Path of the log file this provider reads.
    #[must_use]
    pub fn log_path(&self) -> &Path {
        &self.log_path
    }
    /// Current size of the log file in bytes (0 if it cannot be stat'ed).
    /// Captured before a query so the post-query scan skips old entries.
    fn current_offset(&self) -> u64 {
        std::fs::metadata(&self.log_path).map_or(0, |m| m.len())
    }
    /// Scan the log from `offset` for a `query-end` entry matching `sql`
    /// and parse it into `QueryStats`.
    ///
    /// Matching is heuristic because the log stores a possibly-truncated
    /// copy of the query text: a candidate matches if either normalized
    /// text is a prefix of the other, or if every significant token of
    /// `sql` appears in the logged text. Execution entries are preferred
    /// over PREPARE entries; if nothing matched, the last `query-end`
    /// entry seen is returned as a best-effort fallback.
    fn find_query_end(&self, offset: u64, sql: &str) -> Option<QueryStats> {
        let file = File::open(&self.log_path).ok()?;
        let mut reader = BufReader::new(file);
        reader.seek(SeekFrom::Start(offset)).ok()?;
        let sql_normalized = normalize_for_matching(sql);
        let sql_tokens = extract_match_tokens(&sql_normalized);
        // Candidates tracked separately; later entries overwrite earlier
        // ones, so the most recent match of each kind wins.
        let mut prepare_match: Option<QueryStats> = None;
        let mut execution_match: Option<QueryStats> = None;
        let mut last_entry: Option<QueryStats> = None;
        // One buffer reused across lines instead of allocating per line.
        let mut line = String::new();
        loop {
            line.clear();
            match reader.read_line(&mut line) {
                Ok(0) => break,
                Ok(_) => {}
                Err(_) => break,
            }
            let trimmed = line.trim();
            if trimmed.is_empty() {
                continue;
            }
            // Cheap substring pre-filter before paying for JSON parsing.
            if !trimmed.contains("\"query-end\"") {
                continue;
            }
            let entry: Value = match serde_json::from_str(trimmed) {
                Ok(v) => v,
                Err(_) => continue,
            };
            // The substring check above can match inside other fields;
            // confirm the entry kind explicitly.
            if entry.get("k").and_then(|k| k.as_str()) != Some("query-end") {
                continue;
            }
            let Some(v) = entry.get("v") else { continue };
            // An entry counts as a PREPARE if either the statement type
            // says so or the explicit prepare flag is set.
            let is_prepare = v
                .get("statement")
                .and_then(|s| s.as_str())
                .is_some_and(|s| s == "PREPARE");
            let is_prepare_flag = v
                .get("prepare-statement")
                .and_then(serde_json::Value::as_bool)
                .unwrap_or(false);
            let is_prepare = is_prepare || is_prepare_flag;
            // Fuzzy match against the (possibly truncated) logged query:
            // prefix match in either direction, or all significant SQL
            // tokens appearing somewhere in the logged text.
            let matches = if let Some(query_trunc) = v.get("query-trunc").and_then(|q| q.as_str()) {
                let log_normalized = normalize_for_matching(query_trunc);
                sql_normalized.starts_with(&log_normalized)
                    || log_normalized.starts_with(&sql_normalized)
                    || (!sql_tokens.is_empty()
                        && sql_tokens.iter().all(|t| log_normalized.contains(t)))
            } else {
                false
            };
            if matches {
                trace!(
                    target: "hyperdb_api",
                    query_trunc = v.get("query-trunc").and_then(|q| q.as_str()).unwrap_or(""),
                    is_prepare,
                    "query-stats-matched"
                );
                if is_prepare {
                    prepare_match = Some(parse_query_end(v));
                } else {
                    execution_match = Some(parse_query_end(v));
                }
            }
            last_entry = Some(parse_query_end(v));
        }
        execution_match.or(prepare_match).or(last_entry)
    }
}
impl QueryStatsProvider for LogFileStatsProvider {
    /// Record the current end-of-file offset so `after_query` only scans
    /// log entries written after this point.
    fn before_query(&self, _sql: &str) -> Box<dyn Any + Send> {
        let offset = self.current_offset();
        trace!(
            target: "hyperdb_api",
            offset,
            path = %self.log_path.display(),
            "query-stats-before"
        );
        Box::new(LogFileToken { offset })
    }
    /// Scan the log from the offset captured by `before_query` for the
    /// `query-end` entry matching `sql`; returns `None` if no entry was
    /// found (a debug event is emitted in that case).
    fn after_query(&self, token: Box<dyn Any + Send>, sql: &str) -> Option<QueryStats> {
        let token = token.downcast::<LogFileToken>().ok()?;
        // Give hyperd a brief moment to flush the query-end entry to disk
        // before scanning for it.
        std::thread::sleep(std::time::Duration::from_millis(5));
        let stats = self.find_query_end(token.offset, sql);
        if stats.is_none() {
            // Truncate the SQL for logging on a char boundary: slicing at a
            // raw byte index (`&sql[..80]`) panics when byte 80 lands inside
            // a multi-byte UTF-8 sequence (e.g. non-ASCII literals).
            let mut end = sql.len().min(80);
            while !sql.is_char_boundary(end) {
                end -= 1;
            }
            debug!(
                target: "hyperdb_api",
                offset = token.offset,
                sql_prefix = &sql[..end],
                "query-stats-not-found"
            );
        }
        stats
    }
}
/// Extract the identifying tokens of an already-normalized SQL string.
///
/// Splits on any character that is neither alphanumeric nor `_`, drops
/// single-character tokens, lowercases the rest, and filters out SQL
/// keywords, type names, and common builtins that carry no identifying
/// information. The result is used for fuzzy query matching.
fn extract_match_tokens(normalized_sql: &str) -> Vec<String> {
    // Tokens that appear in almost every query and therefore cannot
    // distinguish one query from another.
    const SKIP: &[&str] = &[
        "select", "from", "where", "and", "or", "not", "in", "is", "null",
        "as", "order", "by", "group", "having", "limit", "offset", "join",
        "on", "left", "right", "inner", "outer", "cross", "full", "union",
        "all", "distinct", "insert", "into", "values", "update", "set",
        "delete", "create", "drop", "alter", "table", "temporary", "temp",
        "if", "exists", "index", "with", "case", "when", "then", "else",
        "end", "between", "like", "cast", "asc", "desc", "true", "false",
        "count", "sum", "avg", "min", "max", "text", "int", "integer",
        "bigint", "smallint", "double", "precision", "float", "varchar",
        "bool", "boolean", "date", "timestamp", "interval",
        "generate_series",
    ];
    let mut tokens = Vec::new();
    for raw in normalized_sql.split(|c: char| !c.is_alphanumeric() && c != '_') {
        // Single characters are too generic to identify a query.
        if raw.len() < 2 {
            continue;
        }
        let lowered = raw.to_lowercase();
        if !SKIP.contains(&lowered.as_str()) {
            tokens.push(lowered);
        }
    }
    tokens
}
/// Canonicalize SQL text for fuzzy comparison: runs of whitespace collapse
/// to a single space, leading/trailing whitespace is dropped, and every
/// character is lowercased (char-by-char via `char::to_lowercase`).
fn normalize_for_matching(sql: &str) -> String {
    let words: Vec<String> = sql
        .split_whitespace()
        .map(|word| word.chars().flat_map(char::to_lowercase).collect())
        .collect();
    words.join(" ")
}
fn parse_query_end(v: &Value) -> QueryStats {
let pre_execution = v.get("pre-execution").map(|pre| PreExecutionStats {
parsing_time_s: pre.get("parsing-time").and_then(serde_json::Value::as_f64),
compilation_time_s: pre
.get("compilation-time")
.and_then(serde_json::Value::as_f64),
elapsed_s: pre.get("elapsed").and_then(serde_json::Value::as_f64),
peak_memory_mb: pre
.get("peak-transaction-memory-mb")
.and_then(serde_json::Value::as_f64),
});
let execution = v.get("execution").map(|exec| {
let (cpu_time_s, thread_time_s, wait_time_s) = if let Some(threads) = exec.get("threads") {
(
threads.get("cpu-time").and_then(serde_json::Value::as_f64),
threads
.get("thread-time")
.and_then(serde_json::Value::as_f64),
threads.get("wait-time").and_then(serde_json::Value::as_f64),
)
} else {
(None, None, None)
};
let (processed_rows_total, processed_rows_native) =
if let Some(rows) = exec.get("processed-rows") {
(
rows.get("total").and_then(serde_json::Value::as_u64),
rows.get("native").and_then(serde_json::Value::as_u64),
)
} else {
(None, None)
};
let (storage_access_time_s, storage_access_count, storage_access_bytes) =
if let Some(storage) = exec.get("storage") {
(
storage
.get("access-time")
.and_then(serde_json::Value::as_f64),
storage
.get("access-count")
.and_then(serde_json::Value::as_u64),
storage
.get("access-bytes")
.and_then(serde_json::Value::as_u64),
)
} else {
(None, None, None)
};
ExecutionStats {
elapsed_s: exec.get("elapsed").and_then(serde_json::Value::as_f64),
peak_memory_mb: exec
.get("peak-transaction-memory-mb")
.and_then(serde_json::Value::as_f64),
cpu_time_s,
thread_time_s,
wait_time_s,
processed_rows_total,
processed_rows_native,
storage_access_time_s,
storage_access_count,
storage_access_bytes,
}
});
QueryStats {
elapsed_s: v
.get("elapsed")
.and_then(serde_json::Value::as_f64)
.unwrap_or(0.0),
commit_time_s: v.get("commit-time").and_then(serde_json::Value::as_f64),
time_to_schedule_s: v
.get("time-to-schedule")
.and_then(serde_json::Value::as_f64),
result_size_mb: v.get("result-size-mb").and_then(serde_json::Value::as_f64),
peak_result_buffer_memory_mb: v
.get("peak-result-buffer-memory-mb")
.and_then(serde_json::Value::as_f64),
plan_cache_status: v
.get("plan-cache-status")
.and_then(|v| v.as_str())
.map(std::string::ToString::to_string),
plan_cache_hit_count: v
.get("plan-cache-hit-count")
.and_then(serde_json::Value::as_u64)
.and_then(|n| u32::try_from(n).ok()),
statement_type: v
.get("statement")
.and_then(|v| v.as_str())
.map(std::string::ToString::to_string),
rows: v.get("rows").and_then(serde_json::Value::as_u64),
cols: v
.get("cols")
.and_then(serde_json::Value::as_u64)
.and_then(|n| u32::try_from(n).ok()),
query_truncated: v
.get("query-trunc")
.and_then(|v| v.as_str())
.map(std::string::ToString::to_string),
pre_execution,
execution,
}
}
#[cfg(test)]
mod tests {
    use super::*;
    // Normalization must collapse whitespace runs, trim, and lowercase.
    #[test]
    fn test_normalize_for_matching() {
        assert_eq!(
            normalize_for_matching("SELECT * FROM\n test"),
            "select * from test"
        );
        assert_eq!(normalize_for_matching(" Hello World "), "hello world");
    }
    // A fully populated query-end payload: every section (top-level,
    // pre-execution, execution with threads/rows/storage) should be parsed.
    #[test]
    fn test_parse_query_end_full() {
        let json = r#"{
"elapsed": 0.0299386,
"commit-time": 1.666e-06,
"time-to-schedule": 3.6208e-05,
"result-size-mb": 0.00053978,
"plan-cache-status": "cache miss",
"plan-cache-hit-count": 0,
"statement": "SELECT",
"rows": 42,
"cols": 3,
"query-trunc": "SELECT * FROM test",
"pre-execution": {
"parsing-time": 1.75e-05,
"compilation-time": 1.4542e-05,
"elapsed": 2.45e-05,
"peak-transaction-memory-mb": 0.5
},
"execution": {
"elapsed": 0.0293959,
"peak-transaction-memory-mb": 1.25,
"threads": {
"thread-time": 0.0293959,
"cpu-time": 0.029353,
"wait-time": 0.0001
},
"processed-rows": {
"total": 1000,
"native": 1000
},
"storage": {
"access-time": 0.000529375,
"access-count": 11,
"access-bytes": 148979
}
}
}"#;
        let v: Value = serde_json::from_str(json).unwrap();
        let stats = parse_query_end(&v);
        assert!((stats.elapsed_s - 0.0299386).abs() < 1e-10);
        assert_eq!(stats.plan_cache_status, Some("cache miss".to_string()));
        assert_eq!(stats.rows, Some(42));
        assert_eq!(stats.cols, Some(3));
        let pre = stats.pre_execution.unwrap();
        assert!((pre.parsing_time_s.unwrap() - 1.75e-05).abs() < 1e-15);
        assert!((pre.compilation_time_s.unwrap() - 1.4542e-05).abs() < 1e-15);
        assert!((pre.peak_memory_mb.unwrap() - 0.5).abs() < 1e-10);
        let exec = stats.execution.unwrap();
        assert!((exec.elapsed_s.unwrap() - 0.0293959).abs() < 1e-10);
        assert_eq!(exec.processed_rows_total, Some(1000));
        assert_eq!(exec.storage_access_count, Some(11));
        assert_eq!(exec.storage_access_bytes, Some(148979));
    }
    // A minimal payload: only `elapsed` present; all optional sections and
    // fields must come back as None.
    #[test]
    fn test_parse_query_end_minimal() {
        let json = r#"{"elapsed": 0.001}"#;
        let v: Value = serde_json::from_str(json).unwrap();
        let stats = parse_query_end(&v);
        assert!((stats.elapsed_s - 0.001).abs() < 1e-10);
        assert!(stats.pre_execution.is_none());
        assert!(stats.execution.is_none());
        assert!(stats.plan_cache_status.is_none());
    }
}