// hyperdb_api/query_stats.rs
1// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! Query statistics collection for Hyper database queries.
5//!
6//! This module provides a mechanism to capture detailed query performance metrics
7//! from Hyper, including parsing time, compilation time, execution time, memory usage,
8//! storage I/O, and plan cache status.
9//!
10//! # Architecture
11//!
12//! The stats collection is abstracted behind the [`QueryStatsProvider`] trait, allowing
13//! the implementation to be swapped without changing the user-facing API. Currently,
14//! [`LogFileStatsProvider`] parses Hyper's JSON log file (`hyperd.log`) to extract
15//! per-query statistics. If Hyper adds native wire-protocol stats in the future, a new
16//! provider can replace the log-based one transparently.
17//!
18//! # Usage
19//!
20//! ```no_run
21//! use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
22//! use hyperdb_api::LogFileStatsProvider;
23//!
24//! fn main() -> Result<()> {
25//! let hyper = HyperProcess::new(None, None)?;
26//! let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
27//!
28//! // Enable stats collection (auto-detect log path from HyperProcess)
29//! conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
30//!
31//! // Execute a query
32//! conn.execute_command("CREATE TABLE t (id INT)")?;
33//!
34//! // Retrieve stats for the last query
35//! if let Some(stats) = conn.last_query_stats() {
36//! println!("Total elapsed: {}s", stats.elapsed_s);
37//! if let Some(ref pre) = stats.pre_execution {
38//! println!(" Parse: {:?}s, Compile: {:?}s",
39//! pre.parsing_time_s, pre.compilation_time_s);
40//! }
41//! }
42//!
43//! Ok(())
44//! }
45//! ```
46//!
47//! # Availability
48//!
49//! - **Local `HyperProcess`**: Full stats available via log file parsing.
50//! - **Remote standalone Hyper**: Not available with the log-based provider (no local log file).
51//! When Hyper adds native stats support, remote connections will work via a new provider.
52
53use std::any::Any;
54use std::fmt;
55use std::fs::File;
56use std::io::{BufRead, BufReader, Seek, SeekFrom};
57use std::path::{Path, PathBuf};
58
59use serde_json::Value;
60use tracing::{debug, trace};
61
62// =============================================================================
63// Data Model
64// =============================================================================
65
/// Detailed statistics for a single query execution.
///
/// All time fields are in seconds. Memory fields are in megabytes.
/// Fields are `Option` because not all queries produce all stats (e.g., a simple
/// `SET` command won't have execution storage stats).
///
/// Instances are produced by [`QueryStatsProvider::after_query`] from a
/// `query-end` log entry; `Default` yields an all-`None` value with
/// `elapsed_s == 0.0`.
#[derive(Debug, Clone, Default)]
pub struct QueryStats {
    /// Total elapsed wall-clock time for the query (seconds).
    /// Falls back to `0.0` when the log entry has no `elapsed` field.
    pub elapsed_s: f64,
    /// Time spent committing the transaction (seconds).
    pub commit_time_s: Option<f64>,
    /// Time waiting to be scheduled on a worker thread (seconds).
    pub time_to_schedule_s: Option<f64>,
    /// Pre-execution phase stats (parsing, compilation).
    pub pre_execution: Option<PreExecutionStats>,
    /// Execution phase stats (runtime, CPU, storage I/O).
    pub execution: Option<ExecutionStats>,
    /// Size of the result set sent to the client (MB).
    pub result_size_mb: Option<f64>,
    /// Peak memory used by the result buffer (MB).
    pub peak_result_buffer_memory_mb: Option<f64>,
    /// Plan cache status: "cache miss", "cache hit", "not run yet", etc.
    pub plan_cache_status: Option<String>,
    /// Number of times the cached plan was reused.
    pub plan_cache_hit_count: Option<u32>,
    /// Statement type: "SELECT", "INSERT", "SET", "ATTACH", "PREPARE", etc.
    pub statement_type: Option<String>,
    /// Number of result rows.
    pub rows: Option<u64>,
    /// Number of result columns.
    pub cols: Option<u32>,
    /// Truncated query text (as logged by Hyper).
    pub query_truncated: Option<String>,
}
100
/// Pre-execution phase statistics (parsing and compilation).
///
/// All times are in seconds; memory is in megabytes. Derives `PartialEq`
/// (all fields are `Option<f64>`) so stats can be compared directly in
/// tests and change detection.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct PreExecutionStats {
    /// Time spent parsing the SQL (seconds).
    pub parsing_time_s: Option<f64>,
    /// Time spent compiling the query plan (seconds).
    pub compilation_time_s: Option<f64>,
    /// Total pre-execution elapsed time (seconds).
    pub elapsed_s: Option<f64>,
    /// Peak transaction memory during pre-execution (MB).
    pub peak_memory_mb: Option<f64>,
}
113
/// Execution phase statistics (runtime performance).
///
/// All times are in seconds; memory is in megabytes. Derives `PartialEq`
/// so stats can be compared directly in tests and change detection.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ExecutionStats {
    /// Total execution elapsed time (seconds).
    pub elapsed_s: Option<f64>,
    /// CPU time consumed (seconds).
    pub cpu_time_s: Option<f64>,
    /// Total thread time (seconds).
    pub thread_time_s: Option<f64>,
    /// Time spent waiting (seconds).
    pub wait_time_s: Option<f64>,
    /// Total rows processed (including federated sources).
    pub processed_rows_total: Option<u64>,
    /// Rows processed from native (local) storage.
    pub processed_rows_native: Option<u64>,
    /// Time spent on storage access (seconds).
    pub storage_access_time_s: Option<f64>,
    /// Number of storage access operations.
    pub storage_access_count: Option<u64>,
    /// Bytes read from storage.
    pub storage_access_bytes: Option<u64>,
    /// Peak transaction memory during execution (MB).
    pub peak_memory_mb: Option<f64>,
}
138
139// =============================================================================
140// Provider Trait
141// =============================================================================
142
/// Trait for collecting query statistics from a Hyper server.
///
/// Implementations capture stats using different mechanisms (log file parsing,
/// future native protocol support, etc.). The trait uses an opaque token pattern:
/// `before_query` is called before execution and returns a token (e.g., a file
/// offset), which is passed to `after_query` after execution to extract the stats.
///
/// # Thread Safety
///
/// Providers must be `Send + Sync` since they may be shared across connections.
pub trait QueryStatsProvider: Send + Sync {
    /// Called before a query is executed.
    ///
    /// Returns an opaque token that will be passed to [`after_query`](Self::after_query).
    /// For the log-based provider, this is the current file offset.
    fn before_query(&self, sql: &str) -> Box<dyn Any + Send>;

    /// Called after a query completes.
    ///
    /// Uses the token from [`before_query`](Self::before_query) to locate and
    /// extract the query statistics. Returns `None` if stats could not be found
    /// (or if the token came from a different provider and fails to downcast).
    fn after_query(&self, token: Box<dyn Any + Send>, sql: &str) -> Option<QueryStats>;
}
166
167impl fmt::Debug for dyn QueryStatsProvider {
168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169 f.write_str("<QueryStatsProvider>")
170 }
171}
172
173// =============================================================================
174// LogFileStatsProvider
175// =============================================================================
176
/// A [`QueryStatsProvider`] that extracts stats by parsing Hyper's JSON log file.
///
/// Hyper writes detailed `query-end` log entries to `hyperd.log` in JSON-per-line
/// format. This provider:
/// 1. Records the file offset before each query
/// 2. After the query, reads new log entries from that offset
/// 3. Finds the matching `query-end` entry by query text prefix
/// 4. Parses the JSON stats into a [`QueryStats`] struct
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::LogFileStatsProvider;
///
/// // From an explicit path
/// let provider = LogFileStatsProvider::new("/path/to/hyperd.log");
///
/// // From a HyperProcess (auto-detects log path)
/// // let provider = LogFileStatsProvider::from_process(&hyper);
/// ```
pub struct LogFileStatsProvider {
    // Path to Hyper's `hyperd.log` (JSON-per-line); read-only from this type.
    log_path: PathBuf,
}
200
201impl fmt::Debug for LogFileStatsProvider {
202 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203 f.debug_struct("LogFileStatsProvider")
204 .field("log_path", &self.log_path)
205 .finish()
206 }
207}
208
/// Opaque token storing the log file offset before a query.
///
/// Created by `LogFileStatsProvider::before_query` and consumed (via downcast)
/// by `after_query` so that only log lines written after the query started are
/// scanned.
struct LogFileToken {
    // File size (in bytes) at the time the query was issued.
    offset: u64,
}
213
impl LogFileStatsProvider {
    /// Creates a new provider that reads from the given log file path.
    ///
    /// The path should point to the `hyperd.log` file written by the Hyper server.
    pub fn new(log_path: impl Into<PathBuf>) -> Self {
        LogFileStatsProvider {
            log_path: log_path.into(),
        }
    }

    /// Creates a new provider by auto-detecting the log path from a [`HyperProcess`].
    ///
    /// The log file is expected at `<log_dir>/hyperd.log`.
    ///
    /// [`HyperProcess`]: crate::HyperProcess
    #[must_use]
    pub fn from_process(process: &crate::HyperProcess) -> Self {
        // Falls back to the current directory when the process reports no log
        // dir; "./hyperd.log" may then not exist, in which case lookups simply
        // return None (see `current_offset` / `find_query_end`).
        let log_dir = process.log_dir().unwrap_or_else(|| Path::new("."));
        LogFileStatsProvider {
            log_path: log_dir.join("hyperd.log"),
        }
    }

    /// Returns the log file path this provider reads from.
    #[must_use]
    pub fn log_path(&self) -> &Path {
        &self.log_path
    }

    /// Gets the current file size (used as offset marker).
    ///
    /// Returns 0 when the file does not exist (yet), so the first scan starts
    /// from the beginning of the log.
    fn current_offset(&self) -> u64 {
        std::fs::metadata(&self.log_path).map_or(0, |m| m.len())
    }

    /// Reads new log entries since `offset` and finds a matching `query-end` entry.
    ///
    /// Hyper's extended query protocol (used by `query_streaming`) produces two
    /// `query-end` entries per query:
    /// 1. A PREPARE entry with the original SQL text but `rows=0` and minimal stats.
    /// 2. An execution entry with the fully-qualified (schema-prefixed) SQL and
    ///    the real rows, memory, storage I/O, and timing stats.
    ///
    /// We prefer execution entries (non-PREPARE) over PREPARE entries, and use
    /// keyword-based matching to handle Hyper's query rewriting.
    ///
    /// Returns `None` if the log file cannot be opened/seeked, or if no
    /// `query-end` entry at all was found past `offset`.
    fn find_query_end(&self, offset: u64, sql: &str) -> Option<QueryStats> {
        let file = File::open(&self.log_path).ok()?;
        let mut reader = BufReader::new(file);
        reader.seek(SeekFrom::Start(offset)).ok()?;

        // Normalize the SQL for matching against the truncated log entry
        let sql_normalized = normalize_for_matching(sql);
        // Extract significant keywords/identifiers for fuzzy matching
        let sql_tokens = extract_match_tokens(&sql_normalized);

        // Collect all query-end entries, then pick the best one
        let mut prepare_match: Option<QueryStats> = None;
        let mut execution_match: Option<QueryStats> = None;
        let mut last_entry: Option<QueryStats> = None;

        // One reusable line buffer for the whole scan (avoids a String per line).
        let mut line = String::new();
        loop {
            line.clear();
            match reader.read_line(&mut line) {
                Ok(0) => break, // EOF
                Ok(_) => {}
                Err(_) => break, // read error (e.g. invalid UTF-8): stop scanning
            }

            let trimmed = line.trim();
            if trimmed.is_empty() {
                continue;
            }

            // Fast check before JSON parsing
            if !trimmed.contains("\"query-end\"") {
                continue;
            }

            let entry: Value = match serde_json::from_str(trimmed) {
                Ok(v) => v,
                Err(_) => continue, // skip malformed / partially-written lines
            };

            // Verify this is a query-end entry
            if entry.get("k").and_then(|k| k.as_str()) != Some("query-end") {
                continue;
            }

            let Some(v) = entry.get("v") else { continue };

            // A PREPARE entry is detected either by its statement type...
            let is_prepare = v
                .get("statement")
                .and_then(|s| s.as_str())
                .is_some_and(|s| s == "PREPARE");

            // ...or by an explicit boolean flag on the entry.
            let is_prepare_flag = v
                .get("prepare-statement")
                .and_then(serde_json::Value::as_bool)
                .unwrap_or(false);

            let is_prepare = is_prepare || is_prepare_flag;

            // Check if this entry matches our query
            let matches = if let Some(query_trunc) = v.get("query-trunc").and_then(|q| q.as_str()) {
                let log_normalized = normalize_for_matching(query_trunc);
                // Direct prefix match (either direction)
                sql_normalized.starts_with(&log_normalized)
                    || log_normalized.starts_with(&sql_normalized)
                    // Token-based match: all significant tokens from our SQL appear in the log entry
                    || (!sql_tokens.is_empty()
                        && sql_tokens.iter().all(|t| log_normalized.contains(t)))
            } else {
                false
            };

            if matches {
                trace!(
                    target: "hyperdb_api",
                    query_trunc = v.get("query-trunc").and_then(|q| q.as_str()).unwrap_or(""),
                    is_prepare,
                    "query-stats-matched"
                );
                // Later matches overwrite earlier ones, so the most recent
                // matching entry of each kind wins.
                if is_prepare {
                    prepare_match = Some(parse_query_end(v));
                } else {
                    execution_match = Some(parse_query_end(v));
                }
            }

            // Always track the last entry as a fallback
            last_entry = Some(parse_query_end(v));
        }

        // Prefer: execution match > prepare match > last entry (for SET/ATTACH etc.)
        execution_match.or(prepare_match).or(last_entry)
    }
}
351
352impl QueryStatsProvider for LogFileStatsProvider {
353 fn before_query(&self, _sql: &str) -> Box<dyn Any + Send> {
354 let offset = self.current_offset();
355 trace!(
356 target: "hyperdb_api",
357 offset,
358 path = %self.log_path.display(),
359 "query-stats-before"
360 );
361 Box::new(LogFileToken { offset })
362 }
363
364 fn after_query(&self, token: Box<dyn Any + Send>, sql: &str) -> Option<QueryStats> {
365 let token = token.downcast::<LogFileToken>().ok()?;
366
367 // Small delay to allow the OS to flush Hyper's log write to disk.
368 // Hyper logs query-end before sending the result, so by the time the
369 // client has consumed all result data the entry should be written,
370 // but OS-level I/O buffering may introduce a small delay.
371 std::thread::sleep(std::time::Duration::from_millis(5));
372
373 let stats = self.find_query_end(token.offset, sql);
374
375 if stats.is_none() {
376 debug!(
377 target: "hyperdb_api",
378 offset = token.offset,
379 sql_prefix = &sql[..sql.len().min(80)],
380 "query-stats-not-found"
381 );
382 }
383
384 stats
385 }
386}
387
388// =============================================================================
389// JSON Parsing Helpers
390// =============================================================================
391
/// Extracts significant tokens (table names, column names, keywords) from
/// normalized SQL for fuzzy matching against Hyper's rewritten queries.
///
/// Hyper rewrites queries with schema prefixes (e.g., `employees` becomes
/// `"reading_data"."public"."employees"`), so we extract user-written
/// identifiers and keywords to match against the rewritten form.
fn extract_match_tokens(normalized_sql: &str) -> Vec<String> {
    // Common SQL keywords, type names, and aggregates: these appear in nearly
    // every query, so they carry no signal and are excluded from matching.
    const SKIP: &[&str] = &[
        "select", "from", "where", "and", "or", "not", "in", "is", "null", "as",
        "order", "by", "group", "having", "limit", "offset", "join", "on",
        "left", "right", "inner", "outer", "cross", "full", "union", "all",
        "distinct", "insert", "into", "values", "update", "set", "delete",
        "create", "drop", "alter", "table", "temporary", "temp", "if", "exists",
        "index", "with", "case", "when", "then", "else", "end", "between",
        "like", "cast", "asc", "desc", "true", "false", "count", "sum", "avg",
        "min", "max", "text", "int", "integer", "bigint", "smallint", "double",
        "precision", "float", "varchar", "bool", "boolean", "date", "timestamp",
        "interval", "generate_series",
    ];

    let mut tokens = Vec::new();
    // Split into identifier-like chunks: runs of alphanumerics and underscores.
    for raw in normalized_sql.split(|c: char| !(c.is_alphanumeric() || c == '_')) {
        // Single characters (and the empty chunks between separators) are too
        // noisy to be useful match tokens.
        if raw.len() < 2 {
            continue;
        }
        let lowered = raw.to_lowercase();
        if !SKIP.contains(&lowered.as_str()) {
            tokens.push(lowered);
        }
    }
    tokens
}
485
/// Normalizes SQL text for matching against truncated log entries.
///
/// Lowercases and collapses whitespace so that `SELECT * FROM Test` matches
/// `select * from "db"."public"."test"`.
fn normalize_for_matching(sql: &str) -> String {
    let mut out = String::with_capacity(sql.len());
    // Defer emitting a separator until the next non-whitespace character, so
    // runs of whitespace collapse to one space and the result never starts or
    // ends with a space (no trim pass needed).
    let mut pending_space = false;
    for ch in sql.chars() {
        if ch.is_whitespace() {
            pending_space = true;
        } else {
            if pending_space && !out.is_empty() {
                out.push(' ');
            }
            pending_space = false;
            // Per-char Unicode lowercasing for case-insensitive matching.
            out.extend(ch.to_lowercase());
        }
    }
    out
}
509
510/// Parses a `query-end` log entry's `v` (value) object into a `QueryStats`.
511fn parse_query_end(v: &Value) -> QueryStats {
512 // Pre-execution stats
513 let pre_execution = v.get("pre-execution").map(|pre| PreExecutionStats {
514 parsing_time_s: pre.get("parsing-time").and_then(serde_json::Value::as_f64),
515 compilation_time_s: pre
516 .get("compilation-time")
517 .and_then(serde_json::Value::as_f64),
518 elapsed_s: pre.get("elapsed").and_then(serde_json::Value::as_f64),
519 peak_memory_mb: pre
520 .get("peak-transaction-memory-mb")
521 .and_then(serde_json::Value::as_f64),
522 });
523
524 // Execution stats
525 let execution = v.get("execution").map(|exec| {
526 let (cpu_time_s, thread_time_s, wait_time_s) = if let Some(threads) = exec.get("threads") {
527 (
528 threads.get("cpu-time").and_then(serde_json::Value::as_f64),
529 threads
530 .get("thread-time")
531 .and_then(serde_json::Value::as_f64),
532 threads.get("wait-time").and_then(serde_json::Value::as_f64),
533 )
534 } else {
535 (None, None, None)
536 };
537
538 let (processed_rows_total, processed_rows_native) =
539 if let Some(rows) = exec.get("processed-rows") {
540 (
541 rows.get("total").and_then(serde_json::Value::as_u64),
542 rows.get("native").and_then(serde_json::Value::as_u64),
543 )
544 } else {
545 (None, None)
546 };
547
548 let (storage_access_time_s, storage_access_count, storage_access_bytes) =
549 if let Some(storage) = exec.get("storage") {
550 (
551 storage
552 .get("access-time")
553 .and_then(serde_json::Value::as_f64),
554 storage
555 .get("access-count")
556 .and_then(serde_json::Value::as_u64),
557 storage
558 .get("access-bytes")
559 .and_then(serde_json::Value::as_u64),
560 )
561 } else {
562 (None, None, None)
563 };
564
565 ExecutionStats {
566 elapsed_s: exec.get("elapsed").and_then(serde_json::Value::as_f64),
567 peak_memory_mb: exec
568 .get("peak-transaction-memory-mb")
569 .and_then(serde_json::Value::as_f64),
570 cpu_time_s,
571 thread_time_s,
572 wait_time_s,
573 processed_rows_total,
574 processed_rows_native,
575 storage_access_time_s,
576 storage_access_count,
577 storage_access_bytes,
578 }
579 });
580
581 QueryStats {
582 elapsed_s: v
583 .get("elapsed")
584 .and_then(serde_json::Value::as_f64)
585 .unwrap_or(0.0),
586 commit_time_s: v.get("commit-time").and_then(serde_json::Value::as_f64),
587 time_to_schedule_s: v
588 .get("time-to-schedule")
589 .and_then(serde_json::Value::as_f64),
590 result_size_mb: v.get("result-size-mb").and_then(serde_json::Value::as_f64),
591 peak_result_buffer_memory_mb: v
592 .get("peak-result-buffer-memory-mb")
593 .and_then(serde_json::Value::as_f64),
594 plan_cache_status: v
595 .get("plan-cache-status")
596 .and_then(|v| v.as_str())
597 .map(std::string::ToString::to_string),
598 plan_cache_hit_count: v
599 .get("plan-cache-hit-count")
600 .and_then(serde_json::Value::as_u64)
601 .and_then(|n| u32::try_from(n).ok()),
602 statement_type: v
603 .get("statement")
604 .and_then(|v| v.as_str())
605 .map(std::string::ToString::to_string),
606 rows: v.get("rows").and_then(serde_json::Value::as_u64),
607 cols: v
608 .get("cols")
609 .and_then(serde_json::Value::as_u64)
610 .and_then(|n| u32::try_from(n).ok()),
611 query_truncated: v
612 .get("query-trunc")
613 .and_then(|v| v.as_str())
614 .map(std::string::ToString::to_string),
615 pre_execution,
616 execution,
617 }
618}
619
#[cfg(test)]
mod tests {
    use super::*;

    // Whitespace runs must collapse to single spaces and case must fold to
    // lowercase, with leading/trailing whitespace removed.
    #[test]
    fn test_normalize_for_matching() {
        assert_eq!(
            normalize_for_matching("SELECT * FROM\n test"),
            "select * from test"
        );
        assert_eq!(normalize_for_matching(" Hello World "), "hello world");
    }

    // A fully-populated query-end entry: every section (top-level, pre-execution,
    // execution with threads/processed-rows/storage) should be extracted.
    #[test]
    fn test_parse_query_end_full() {
        let json = r#"{
            "elapsed": 0.0299386,
            "commit-time": 1.666e-06,
            "time-to-schedule": 3.6208e-05,
            "result-size-mb": 0.00053978,
            "plan-cache-status": "cache miss",
            "plan-cache-hit-count": 0,
            "statement": "SELECT",
            "rows": 42,
            "cols": 3,
            "query-trunc": "SELECT * FROM test",
            "pre-execution": {
                "parsing-time": 1.75e-05,
                "compilation-time": 1.4542e-05,
                "elapsed": 2.45e-05,
                "peak-transaction-memory-mb": 0.5
            },
            "execution": {
                "elapsed": 0.0293959,
                "peak-transaction-memory-mb": 1.25,
                "threads": {
                    "thread-time": 0.0293959,
                    "cpu-time": 0.029353,
                    "wait-time": 0.0001
                },
                "processed-rows": {
                    "total": 1000,
                    "native": 1000
                },
                "storage": {
                    "access-time": 0.000529375,
                    "access-count": 11,
                    "access-bytes": 148979
                }
            }
        }"#;

        let v: Value = serde_json::from_str(json).unwrap();
        let stats = parse_query_end(&v);

        // Floats are compared with a tolerance rather than exact equality.
        assert!((stats.elapsed_s - 0.0299386).abs() < 1e-10);
        assert_eq!(stats.plan_cache_status, Some("cache miss".to_string()));
        assert_eq!(stats.rows, Some(42));
        assert_eq!(stats.cols, Some(3));

        let pre = stats.pre_execution.unwrap();
        assert!((pre.parsing_time_s.unwrap() - 1.75e-05).abs() < 1e-15);
        assert!((pre.compilation_time_s.unwrap() - 1.4542e-05).abs() < 1e-15);
        assert!((pre.peak_memory_mb.unwrap() - 0.5).abs() < 1e-10);

        let exec = stats.execution.unwrap();
        assert!((exec.elapsed_s.unwrap() - 0.0293959).abs() < 1e-10);
        assert_eq!(exec.processed_rows_total, Some(1000));
        assert_eq!(exec.storage_access_count, Some(11));
        assert_eq!(exec.storage_access_bytes, Some(148979));
    }

    // An entry with only "elapsed": all optional sections must come back None.
    #[test]
    fn test_parse_query_end_minimal() {
        let json = r#"{"elapsed": 0.001}"#;
        let v: Value = serde_json::from_str(json).unwrap();
        let stats = parse_query_end(&v);

        assert!((stats.elapsed_s - 0.001).abs() < 1e-10);
        assert!(stats.pre_execution.is_none());
        assert!(stats.execution.is_none());
        assert!(stats.plan_cache_status.is_none());
    }
}
703}