// hyperdb_api/query_stats.rs
// Copyright (c) 2026, Salesforce, Inc. All rights reserved.
// SPDX-License-Identifier: Apache-2.0 OR MIT

//! Query statistics collection for Hyper database queries.
//!
//! This module provides a mechanism to capture detailed query performance metrics
//! from Hyper, including parsing time, compilation time, execution time, memory usage,
//! storage I/O, and plan cache status.
//!
//! # Architecture
//!
//! The stats collection is abstracted behind the [`QueryStatsProvider`] trait, allowing
//! the implementation to be swapped without changing the user-facing API. Currently,
//! [`LogFileStatsProvider`] parses Hyper's JSON log file (`hyperd.log`) to extract
//! per-query statistics. If Hyper adds native wire-protocol stats in the future, a new
//! provider can replace the log-based one transparently.
//!
//! # Usage
//!
//! ```no_run
//! use hyperdb_api::{Connection, CreateMode, HyperProcess, Result};
//! use hyperdb_api::LogFileStatsProvider;
//!
//! fn main() -> Result<()> {
//!     let hyper = HyperProcess::new(None, None)?;
//!     let mut conn = Connection::new(&hyper, "test.hyper", CreateMode::CreateIfNotExists)?;
//!
//!     // Enable stats collection (auto-detect log path from HyperProcess)
//!     conn.enable_query_stats(LogFileStatsProvider::from_process(&hyper));
//!
//!     // Execute a query
//!     conn.execute_command("CREATE TABLE t (id INT)")?;
//!
//!     // Retrieve stats for the last query
//!     if let Some(stats) = conn.last_query_stats() {
//!         println!("Total elapsed: {}s", stats.elapsed_s);
//!         if let Some(ref pre) = stats.pre_execution {
//!             println!("  Parse: {:?}s, Compile: {:?}s",
//!                 pre.parsing_time_s, pre.compilation_time_s);
//!         }
//!     }
//!
//!     Ok(())
//! }
//! ```
//!
//! # Availability
//!
//! - **Local `HyperProcess`**: Full stats available via log file parsing.
//! - **Remote standalone Hyper**: Not available with the log-based provider (no local log file).
//!   When Hyper adds native stats support, remote connections will work via a new provider.
53use std::any::Any;
54use std::fmt;
55use std::fs::File;
56use std::io::{BufRead, BufReader, Seek, SeekFrom};
57use std::path::{Path, PathBuf};
58
59use serde_json::Value;
60use tracing::{debug, trace};
61
62// =============================================================================
63// Data Model
64// =============================================================================
65
/// Detailed statistics for a single query execution.
///
/// All time fields are in seconds. Memory fields are in megabytes.
/// Fields are `Option` because not all queries produce all stats (e.g., a simple
/// `SET` command won't have execution storage stats).
///
/// Instances are produced by [`QueryStatsProvider`] implementations such as
/// [`LogFileStatsProvider`], which fills the fields from Hyper's `query-end`
/// log entries.
#[derive(Debug, Clone, Default)]
pub struct QueryStats {
    /// Total elapsed wall-clock time for the query (seconds).
    /// Defaults to `0.0` when the log entry has no `elapsed` field.
    pub elapsed_s: f64,
    /// Time spent committing the transaction (seconds).
    pub commit_time_s: Option<f64>,
    /// Time waiting to be scheduled on a worker thread (seconds).
    pub time_to_schedule_s: Option<f64>,
    /// Pre-execution phase stats (parsing, compilation).
    pub pre_execution: Option<PreExecutionStats>,
    /// Execution phase stats (runtime, CPU, storage I/O).
    pub execution: Option<ExecutionStats>,
    /// Size of the result set sent to the client (MB).
    pub result_size_mb: Option<f64>,
    /// Peak memory used by the result buffer (MB).
    pub peak_result_buffer_memory_mb: Option<f64>,
    /// Plan cache status: "cache miss", "cache hit", "not run yet", etc.
    pub plan_cache_status: Option<String>,
    /// Number of times the cached plan was reused.
    pub plan_cache_hit_count: Option<u32>,
    /// Statement type: "SELECT", "INSERT", "SET", "ATTACH", "PREPARE", etc.
    pub statement_type: Option<String>,
    /// Number of result rows.
    pub rows: Option<u64>,
    /// Number of result columns.
    pub cols: Option<u32>,
    /// Truncated query text (as logged by Hyper).
    pub query_truncated: Option<String>,
}
100
/// Pre-execution phase statistics (parsing and compilation).
///
/// Mirrors the `pre-execution` object of Hyper's `query-end` log entry.
#[derive(Debug, Clone, Default)]
pub struct PreExecutionStats {
    /// Time spent parsing the SQL (seconds).
    pub parsing_time_s: Option<f64>,
    /// Time spent compiling the query plan (seconds).
    pub compilation_time_s: Option<f64>,
    /// Total pre-execution elapsed time (seconds).
    pub elapsed_s: Option<f64>,
    /// Peak transaction memory during pre-execution (MB).
    pub peak_memory_mb: Option<f64>,
}
113
/// Execution phase statistics (runtime performance).
///
/// Mirrors the `execution` object of Hyper's `query-end` log entry, flattening
/// its nested `threads`, `processed-rows`, and `storage` sub-objects.
#[derive(Debug, Clone, Default)]
pub struct ExecutionStats {
    /// Total execution elapsed time (seconds).
    pub elapsed_s: Option<f64>,
    /// CPU time consumed (seconds).
    pub cpu_time_s: Option<f64>,
    /// Total thread time (seconds).
    pub thread_time_s: Option<f64>,
    /// Time spent waiting (seconds).
    pub wait_time_s: Option<f64>,
    /// Total rows processed (including federated sources).
    pub processed_rows_total: Option<u64>,
    /// Rows processed from native (local) storage.
    pub processed_rows_native: Option<u64>,
    /// Time spent on storage access (seconds).
    pub storage_access_time_s: Option<f64>,
    /// Number of storage access operations.
    pub storage_access_count: Option<u64>,
    /// Bytes read from storage.
    pub storage_access_bytes: Option<u64>,
    /// Peak transaction memory during execution (MB).
    pub peak_memory_mb: Option<f64>,
}
138
139// =============================================================================
140// Provider Trait
141// =============================================================================
142
/// Trait for collecting query statistics from a Hyper server.
///
/// Implementations capture stats using different mechanisms (log file parsing,
/// future native protocol support, etc.). The trait uses an opaque token pattern:
/// `before_query` is called before execution and returns a token (e.g., a file
/// offset), which is passed to `after_query` after execution to extract the stats.
/// The token is a `Box<dyn Any + Send>` so each provider can carry its own
/// private state between the two calls.
///
/// # Thread Safety
///
/// Providers must be `Send + Sync` since they may be shared across connections.
pub trait QueryStatsProvider: Send + Sync {
    /// Called before a query is executed.
    ///
    /// Returns an opaque token that will be passed to [`after_query`](Self::after_query).
    /// For the log-based provider, this is the current file offset.
    fn before_query(&self, sql: &str) -> Box<dyn Any + Send>;

    /// Called after a query completes.
    ///
    /// Uses the token from [`before_query`](Self::before_query) to locate and
    /// extract the query statistics. Returns `None` if stats could not be found.
    fn after_query(&self, token: Box<dyn Any + Send>, sql: &str) -> Option<QueryStats>;
}
166
167impl fmt::Debug for dyn QueryStatsProvider {
168    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169        f.write_str("<QueryStatsProvider>")
170    }
171}
172
173// =============================================================================
174// LogFileStatsProvider
175// =============================================================================
176
/// A [`QueryStatsProvider`] that extracts stats by parsing Hyper's JSON log file.
///
/// Hyper writes detailed `query-end` log entries to `hyperd.log` in JSON-per-line
/// format. This provider:
/// 1. Records the file offset before each query
/// 2. After the query, reads new log entries from that offset
/// 3. Finds the matching `query-end` entry by query text prefix
/// 4. Parses the JSON stats into a [`QueryStats`] struct
///
/// # Example
///
/// ```no_run
/// use hyperdb_api::LogFileStatsProvider;
///
/// // From an explicit path
/// let provider = LogFileStatsProvider::new("/path/to/hyperd.log");
///
/// // From a HyperProcess (auto-detects log path)
/// // let provider = LogFileStatsProvider::from_process(&hyper);
/// ```
pub struct LogFileStatsProvider {
    /// Absolute or relative path to the `hyperd.log` file this provider scans.
    log_path: PathBuf,
}
200
201impl fmt::Debug for LogFileStatsProvider {
202    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
203        f.debug_struct("LogFileStatsProvider")
204            .field("log_path", &self.log_path)
205            .finish()
206    }
207}
208
/// Opaque token storing the log file offset before a query.
///
/// Created by `before_query` and consumed by `after_query`; the offset is the
/// byte length of the log file at the time the query was issued.
struct LogFileToken {
    offset: u64,
}
213
214impl LogFileStatsProvider {
215    /// Creates a new provider that reads from the given log file path.
216    ///
217    /// The path should point to the `hyperd.log` file written by the Hyper server.
218    pub fn new(log_path: impl Into<PathBuf>) -> Self {
219        LogFileStatsProvider {
220            log_path: log_path.into(),
221        }
222    }
223
224    /// Creates a new provider by auto-detecting the log path from a [`HyperProcess`].
225    ///
226    /// The log file is expected at `<log_dir>/hyperd.log`.
227    ///
228    /// [`HyperProcess`]: crate::HyperProcess
229    #[must_use]
230    pub fn from_process(process: &crate::HyperProcess) -> Self {
231        let log_dir = process.log_dir().unwrap_or_else(|| Path::new("."));
232        LogFileStatsProvider {
233            log_path: log_dir.join("hyperd.log"),
234        }
235    }
236
237    /// Returns the log file path this provider reads from.
238    #[must_use]
239    pub fn log_path(&self) -> &Path {
240        &self.log_path
241    }
242
243    /// Gets the current file size (used as offset marker).
244    fn current_offset(&self) -> u64 {
245        std::fs::metadata(&self.log_path).map_or(0, |m| m.len())
246    }
247
248    /// Reads new log entries since `offset` and finds a matching `query-end` entry.
249    ///
250    /// Hyper's extended query protocol (used by `query_streaming`) produces two
251    /// `query-end` entries per query:
252    /// 1. A PREPARE entry with the original SQL text but `rows=0` and minimal stats.
253    /// 2. An execution entry with the fully-qualified (schema-prefixed) SQL and
254    ///    the real rows, memory, storage I/O, and timing stats.
255    ///
256    /// We prefer execution entries (non-PREPARE) over PREPARE entries, and use
257    /// keyword-based matching to handle Hyper's query rewriting.
258    fn find_query_end(&self, offset: u64, sql: &str) -> Option<QueryStats> {
259        let file = File::open(&self.log_path).ok()?;
260        let mut reader = BufReader::new(file);
261        reader.seek(SeekFrom::Start(offset)).ok()?;
262
263        // Normalize the SQL for matching against the truncated log entry
264        let sql_normalized = normalize_for_matching(sql);
265        // Extract significant keywords/identifiers for fuzzy matching
266        let sql_tokens = extract_match_tokens(&sql_normalized);
267
268        // Collect all query-end entries, then pick the best one
269        let mut prepare_match: Option<QueryStats> = None;
270        let mut execution_match: Option<QueryStats> = None;
271        let mut last_entry: Option<QueryStats> = None;
272
273        let mut line = String::new();
274        loop {
275            line.clear();
276            match reader.read_line(&mut line) {
277                Ok(0) => break, // EOF
278                Ok(_) => {}
279                Err(_) => break,
280            }
281
282            let trimmed = line.trim();
283            if trimmed.is_empty() {
284                continue;
285            }
286
287            // Fast check before JSON parsing
288            if !trimmed.contains("\"query-end\"") {
289                continue;
290            }
291
292            let entry: Value = match serde_json::from_str(trimmed) {
293                Ok(v) => v,
294                Err(_) => continue,
295            };
296
297            // Verify this is a query-end entry
298            if entry.get("k").and_then(|k| k.as_str()) != Some("query-end") {
299                continue;
300            }
301
302            let Some(v) = entry.get("v") else { continue };
303
304            let is_prepare = v
305                .get("statement")
306                .and_then(|s| s.as_str())
307                .is_some_and(|s| s == "PREPARE");
308
309            let is_prepare_flag = v
310                .get("prepare-statement")
311                .and_then(serde_json::Value::as_bool)
312                .unwrap_or(false);
313
314            let is_prepare = is_prepare || is_prepare_flag;
315
316            // Check if this entry matches our query
317            let matches = if let Some(query_trunc) = v.get("query-trunc").and_then(|q| q.as_str()) {
318                let log_normalized = normalize_for_matching(query_trunc);
319                // Direct prefix match (either direction)
320                sql_normalized.starts_with(&log_normalized)
321                    || log_normalized.starts_with(&sql_normalized)
322                    // Token-based match: all significant tokens from our SQL appear in the log entry
323                    || (!sql_tokens.is_empty()
324                        && sql_tokens.iter().all(|t| log_normalized.contains(t)))
325            } else {
326                false
327            };
328
329            if matches {
330                trace!(
331                    target: "hyperdb_api",
332                    query_trunc = v.get("query-trunc").and_then(|q| q.as_str()).unwrap_or(""),
333                    is_prepare,
334                    "query-stats-matched"
335                );
336                if is_prepare {
337                    prepare_match = Some(parse_query_end(v));
338                } else {
339                    execution_match = Some(parse_query_end(v));
340                }
341            }
342
343            // Always track the last entry as a fallback
344            last_entry = Some(parse_query_end(v));
345        }
346
347        // Prefer: execution match > prepare match > last entry (for SET/ATTACH etc.)
348        execution_match.or(prepare_match).or(last_entry)
349    }
350}
351
352impl QueryStatsProvider for LogFileStatsProvider {
353    fn before_query(&self, _sql: &str) -> Box<dyn Any + Send> {
354        let offset = self.current_offset();
355        trace!(
356            target: "hyperdb_api",
357            offset,
358            path = %self.log_path.display(),
359            "query-stats-before"
360        );
361        Box::new(LogFileToken { offset })
362    }
363
364    fn after_query(&self, token: Box<dyn Any + Send>, sql: &str) -> Option<QueryStats> {
365        let token = token.downcast::<LogFileToken>().ok()?;
366
367        // Small delay to allow the OS to flush Hyper's log write to disk.
368        // Hyper logs query-end before sending the result, so by the time the
369        // client has consumed all result data the entry should be written,
370        // but OS-level I/O buffering may introduce a small delay.
371        std::thread::sleep(std::time::Duration::from_millis(5));
372
373        let stats = self.find_query_end(token.offset, sql);
374
375        if stats.is_none() {
376            debug!(
377                target: "hyperdb_api",
378                offset = token.offset,
379                sql_prefix = &sql[..sql.len().min(80)],
380                "query-stats-not-found"
381            );
382        }
383
384        stats
385    }
386}
387
388// =============================================================================
389// JSON Parsing Helpers
390// =============================================================================
391
/// Extracts significant tokens (table names, column names, keywords) from
/// normalized SQL for fuzzy matching against Hyper's rewritten queries.
///
/// Hyper rewrites queries with schema prefixes (e.g., `employees` becomes
/// `"reading_data"."public"."employees"`), so we extract user-written
/// identifiers and keywords to match against the rewritten form.
fn extract_match_tokens(normalized_sql: &str) -> Vec<String> {
    // SQL keywords to skip — these appear in every query and aren't useful for matching
    const SKIP: &[&str] = &[
        "select", "from", "where", "and", "or", "not", "in", "is", "null", "as",
        "order", "by", "group", "having", "limit", "offset", "join", "on",
        "left", "right", "inner", "outer", "cross", "full", "union", "all",
        "distinct", "insert", "into", "values", "update", "set", "delete",
        "create", "drop", "alter", "table", "temporary", "temp", "if", "exists",
        "index", "with", "case", "when", "then", "else", "end", "between",
        "like", "cast", "asc", "desc", "true", "false", "count", "sum", "avg",
        "min", "max", "text", "int", "integer", "bigint", "smallint", "double",
        "precision", "float", "varchar", "bool", "boolean", "date", "timestamp",
        "interval", "generate_series",
    ];

    let mut tokens = Vec::new();
    // Split on everything that can't be part of an identifier (letters, digits,
    // underscore), then keep only lowercased tokens of length >= 2 that aren't
    // common SQL keywords.
    for raw in normalized_sql.split(|c: char| !c.is_alphanumeric() && c != '_') {
        if raw.len() < 2 {
            continue;
        }
        let lowered = raw.to_lowercase();
        if !SKIP.contains(&lowered.as_str()) {
            tokens.push(lowered);
        }
    }
    tokens
}
485
/// Normalizes SQL text for matching against truncated log entries.
///
/// Lowercases and collapses whitespace so that `SELECT * FROM Test` matches
/// `select * from "db"."public"."test"`.
fn normalize_for_matching(sql: &str) -> String {
    let mut out = String::with_capacity(sql.len());
    // `split_whitespace` both collapses runs of whitespace and trims the ends.
    for word in sql.split_whitespace() {
        if !out.is_empty() {
            out.push(' ');
        }
        // Char-wise lowercasing (one char may expand to several, e.g. 'İ').
        out.extend(word.chars().flat_map(char::to_lowercase));
    }
    out
}
509
510/// Parses a `query-end` log entry's `v` (value) object into a `QueryStats`.
511fn parse_query_end(v: &Value) -> QueryStats {
512    // Pre-execution stats
513    let pre_execution = v.get("pre-execution").map(|pre| PreExecutionStats {
514        parsing_time_s: pre.get("parsing-time").and_then(serde_json::Value::as_f64),
515        compilation_time_s: pre
516            .get("compilation-time")
517            .and_then(serde_json::Value::as_f64),
518        elapsed_s: pre.get("elapsed").and_then(serde_json::Value::as_f64),
519        peak_memory_mb: pre
520            .get("peak-transaction-memory-mb")
521            .and_then(serde_json::Value::as_f64),
522    });
523
524    // Execution stats
525    let execution = v.get("execution").map(|exec| {
526        let (cpu_time_s, thread_time_s, wait_time_s) = if let Some(threads) = exec.get("threads") {
527            (
528                threads.get("cpu-time").and_then(serde_json::Value::as_f64),
529                threads
530                    .get("thread-time")
531                    .and_then(serde_json::Value::as_f64),
532                threads.get("wait-time").and_then(serde_json::Value::as_f64),
533            )
534        } else {
535            (None, None, None)
536        };
537
538        let (processed_rows_total, processed_rows_native) =
539            if let Some(rows) = exec.get("processed-rows") {
540                (
541                    rows.get("total").and_then(serde_json::Value::as_u64),
542                    rows.get("native").and_then(serde_json::Value::as_u64),
543                )
544            } else {
545                (None, None)
546            };
547
548        let (storage_access_time_s, storage_access_count, storage_access_bytes) =
549            if let Some(storage) = exec.get("storage") {
550                (
551                    storage
552                        .get("access-time")
553                        .and_then(serde_json::Value::as_f64),
554                    storage
555                        .get("access-count")
556                        .and_then(serde_json::Value::as_u64),
557                    storage
558                        .get("access-bytes")
559                        .and_then(serde_json::Value::as_u64),
560                )
561            } else {
562                (None, None, None)
563            };
564
565        ExecutionStats {
566            elapsed_s: exec.get("elapsed").and_then(serde_json::Value::as_f64),
567            peak_memory_mb: exec
568                .get("peak-transaction-memory-mb")
569                .and_then(serde_json::Value::as_f64),
570            cpu_time_s,
571            thread_time_s,
572            wait_time_s,
573            processed_rows_total,
574            processed_rows_native,
575            storage_access_time_s,
576            storage_access_count,
577            storage_access_bytes,
578        }
579    });
580
581    QueryStats {
582        elapsed_s: v
583            .get("elapsed")
584            .and_then(serde_json::Value::as_f64)
585            .unwrap_or(0.0),
586        commit_time_s: v.get("commit-time").and_then(serde_json::Value::as_f64),
587        time_to_schedule_s: v
588            .get("time-to-schedule")
589            .and_then(serde_json::Value::as_f64),
590        result_size_mb: v.get("result-size-mb").and_then(serde_json::Value::as_f64),
591        peak_result_buffer_memory_mb: v
592            .get("peak-result-buffer-memory-mb")
593            .and_then(serde_json::Value::as_f64),
594        plan_cache_status: v
595            .get("plan-cache-status")
596            .and_then(|v| v.as_str())
597            .map(std::string::ToString::to_string),
598        plan_cache_hit_count: v
599            .get("plan-cache-hit-count")
600            .and_then(serde_json::Value::as_u64)
601            .and_then(|n| u32::try_from(n).ok()),
602        statement_type: v
603            .get("statement")
604            .and_then(|v| v.as_str())
605            .map(std::string::ToString::to_string),
606        rows: v.get("rows").and_then(serde_json::Value::as_u64),
607        cols: v
608            .get("cols")
609            .and_then(serde_json::Value::as_u64)
610            .and_then(|n| u32::try_from(n).ok()),
611        query_truncated: v
612            .get("query-trunc")
613            .and_then(|v| v.as_str())
614            .map(std::string::ToString::to_string),
615        pre_execution,
616        execution,
617    }
618}
619
#[cfg(test)]
mod tests {
    use super::*;

    // Whitespace runs collapse to single spaces; text is lowercased and trimmed.
    #[test]
    fn test_normalize_for_matching() {
        assert_eq!(
            normalize_for_matching("SELECT  *  FROM\n  test"),
            "select * from test"
        );
        assert_eq!(normalize_for_matching("  Hello   World  "), "hello world");
    }

    // A fully-populated query-end entry maps every field, including the nested
    // pre-execution and execution (threads / processed-rows / storage) objects.
    #[test]
    fn test_parse_query_end_full() {
        let json = r#"{
            "elapsed": 0.0299386,
            "commit-time": 1.666e-06,
            "time-to-schedule": 3.6208e-05,
            "result-size-mb": 0.00053978,
            "plan-cache-status": "cache miss",
            "plan-cache-hit-count": 0,
            "statement": "SELECT",
            "rows": 42,
            "cols": 3,
            "query-trunc": "SELECT * FROM test",
            "pre-execution": {
                "parsing-time": 1.75e-05,
                "compilation-time": 1.4542e-05,
                "elapsed": 2.45e-05,
                "peak-transaction-memory-mb": 0.5
            },
            "execution": {
                "elapsed": 0.0293959,
                "peak-transaction-memory-mb": 1.25,
                "threads": {
                    "thread-time": 0.0293959,
                    "cpu-time": 0.029353,
                    "wait-time": 0.0001
                },
                "processed-rows": {
                    "total": 1000,
                    "native": 1000
                },
                "storage": {
                    "access-time": 0.000529375,
                    "access-count": 11,
                    "access-bytes": 148979
                }
            }
        }"#;

        let v: Value = serde_json::from_str(json).unwrap();
        let stats = parse_query_end(&v);

        assert!((stats.elapsed_s - 0.0299386).abs() < 1e-10);
        assert_eq!(stats.plan_cache_status, Some("cache miss".to_string()));
        assert_eq!(stats.rows, Some(42));
        assert_eq!(stats.cols, Some(3));

        let pre = stats.pre_execution.unwrap();
        assert!((pre.parsing_time_s.unwrap() - 1.75e-05).abs() < 1e-15);
        assert!((pre.compilation_time_s.unwrap() - 1.4542e-05).abs() < 1e-15);
        assert!((pre.peak_memory_mb.unwrap() - 0.5).abs() < 1e-10);

        let exec = stats.execution.unwrap();
        assert!((exec.elapsed_s.unwrap() - 0.0293959).abs() < 1e-10);
        assert_eq!(exec.processed_rows_total, Some(1000));
        assert_eq!(exec.storage_access_count, Some(11));
        assert_eq!(exec.storage_access_bytes, Some(148979));
    }

    // An entry with only "elapsed" yields a QueryStats with all optional
    // sections absent.
    #[test]
    fn test_parse_query_end_minimal() {
        let json = r#"{"elapsed": 0.001}"#;
        let v: Value = serde_json::from_str(json).unwrap();
        let stats = parse_query_end(&v);

        assert!((stats.elapsed_s - 0.001).abs() < 1e-10);
        assert!(stats.pre_execution.is_none());
        assert!(stats.execution.is_none());
        assert!(stats.plan_cache_status.is_none());
    }
}
703}