Skip to main content

hirn_engine/observability/
diagnostics.rs

//! Query diagnostics: per-stage timing, query IDs, and slow-query detection.
//!
//! Every query gets a unique ULID-based query ID, its stage timings are
//! captured during execution, and queries exceeding the configured slow-query
//! threshold are logged with full context.
6
7use std::fmt;
8use std::time::Duration;
9
10/// Unique identifier for a query execution, based on ULID for time-ordering.
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
12pub struct QueryId(ulid::Ulid);
13
14impl QueryId {
15    /// Generate a new query ID.
16    pub fn new() -> Self {
17        Self(ulid::Ulid::new())
18    }
19}
20
21impl fmt::Display for QueryId {
22    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
23        write!(f, "{}", self.0)
24    }
25}
26
27impl Default for QueryId {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33/// Per-stage timing breakdown for a query execution.
34#[derive(Debug, Clone, Default)]
35pub struct QueryDiagnostics {
36    /// Unique query identifier.
37    pub query_id: Option<QueryId>,
38    /// Authorization evaluation time.
39    pub authorize_us: Option<u64>,
40    /// Embedding generation time.
41    pub embed_ms: Option<f64>,
42    /// DataFusion logical-plan optimization duration.
43    pub optimize_ms: Option<f64>,
44    /// DataFusion physical-plan creation duration.
45    pub physical_plan_ms: Option<f64>,
46    /// DataFusion physical-plan execution and collection duration.
47    pub execute_plan_ms: Option<f64>,
48    /// Vector search stage duration.
49    pub vector_search_ms: Option<f64>,
50    /// Graph expansion stage duration.
51    pub graph_expand_ms: Option<f64>,
52    /// Reranking stage duration.
53    pub rerank_ms: Option<f64>,
54    /// Neural reranker (cross-encoder / API) stage duration.
55    pub neural_rerank_ms: Option<f64>,
56    /// Secondary record-hydration duration (loading full records from storage
57    /// after plan execution, separate from the context-assembly step itself).
58    /// Only set by THINK queries; nil for RECALL and other query types.
59    pub decode_ms: Option<f64>,
60    /// Assembly stage duration.  For THINK queries this covers only
61    /// `assemble_think_context`; secondary hydration is in `decode_ms`.
62    pub assemble_ms: Option<f64>,
63    /// Total query execution time.
64    pub total_ms: Option<f64>,
65    /// Number of records scanned during vector search.
66    pub records_scanned: Option<usize>,
67    /// Number of records returned after filtering and reranking.
68    pub records_returned: Option<usize>,
69    /// Number of candidates discarded before record fetch due to thresholding.
70    pub threshold_filtered_count: Option<usize>,
71    /// Number of candidates penalized by competitive inhibition.
72    pub competitive_inhibition_count: Option<usize>,
73    /// Number of scored candidates dropped due to the requested limit.
74    pub truncated_by_limit_count: Option<usize>,
75    /// Number of returned records whose raw text was redacted by policy.
76    pub raw_text_redacted_results: Option<usize>,
77    /// Number of times multivector MaxSim failed and recall fell back to composite-only ranking.
78    pub multivector_fallback_count: Option<usize>,
79    /// Number of times the neural reranker failed and recall kept composite ordering.
80    pub neural_rerank_fallback_count: Option<usize>,
81}
82
83impl QueryDiagnostics {
84    #[must_use]
85    pub fn advanced_retrieval_fallback_summary(&self) -> Option<String> {
86        let mut parts = Vec::new();
87
88        if let Some(count) = self.multivector_fallback_count.filter(|count| *count > 0) {
89            parts.push(format!("multivector_fallback_count={count}"));
90        }
91        if let Some(count) = self.neural_rerank_fallback_count.filter(|count| *count > 0) {
92            parts.push(format!("neural_rerank_fallback_count={count}"));
93        }
94
95        (!parts.is_empty()).then(|| parts.join(", "))
96    }
97}
98
99impl fmt::Display for QueryDiagnostics {
100    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
101        if let Some(id) = &self.query_id {
102            writeln!(f, "Query ID: {id}")?;
103        }
104        writeln!(f, "Stage Timings:")?;
105        if let Some(v) = self.authorize_us {
106            writeln!(f, "  authorize:     {:.3}ms", v as f64 / 1000.0)?;
107        }
108        if let Some(v) = self.embed_ms {
109            writeln!(f, "  embed_query:   {v:.3}ms")?;
110        }
111        if let Some(v) = self.optimize_ms {
112            writeln!(f, "  optimize:      {v:.3}ms")?;
113        }
114        if let Some(v) = self.physical_plan_ms {
115            writeln!(f, "  physical_plan: {v:.3}ms")?;
116        }
117        if let Some(v) = self.execute_plan_ms {
118            writeln!(f, "  execute_plan:  {v:.3}ms")?;
119        }
120        if let Some(v) = self.vector_search_ms {
121            writeln!(f, "  vector_search: {v:.3}ms")?;
122        }
123        if let Some(v) = self.graph_expand_ms {
124            writeln!(f, "  graph_expand:  {v:.3}ms")?;
125        }
126        if let Some(v) = self.rerank_ms {
127            writeln!(f, "  rerank:        {v:.3}ms")?;
128        }
129        if let Some(v) = self.neural_rerank_ms {
130            writeln!(f, "  neural_rerank: {v:.3}ms")?;
131        }
132        if let Some(v) = self.decode_ms {
133            writeln!(f, "  decode:        {v:.3}ms")?;
134        }
135        if let Some(v) = self.assemble_ms {
136            writeln!(f, "  assemble:      {v:.3}ms")?;
137        }
138        if let Some(v) = self.total_ms {
139            writeln!(f, "  total:         {v:.3}ms")?;
140        }
141        if self.records_scanned.is_some() || self.records_returned.is_some() {
142            writeln!(f, "Row Counts:")?;
143            if let Some(v) = self.records_scanned {
144                writeln!(f, "  scanned:  {v}")?;
145            }
146            if let Some(v) = self.records_returned {
147                writeln!(f, "  returned: {v}")?;
148            }
149        }
150        if self.threshold_filtered_count.is_some()
151            || self.competitive_inhibition_count.is_some()
152            || self.truncated_by_limit_count.is_some()
153            || self.raw_text_redacted_results.is_some()
154        {
155            writeln!(f, "Suppression:")?;
156            if let Some(v) = self.threshold_filtered_count {
157                writeln!(f, "  threshold_filtered: {v}")?;
158            }
159            if let Some(v) = self.competitive_inhibition_count {
160                writeln!(f, "  competitively_inhibited: {v}")?;
161            }
162            if let Some(v) = self.truncated_by_limit_count {
163                writeln!(f, "  truncated_by_limit: {v}")?;
164            }
165            if let Some(v) = self.raw_text_redacted_results {
166                writeln!(f, "  raw_text_redacted: {v}")?;
167            }
168        }
169        if self.multivector_fallback_count.is_some() || self.neural_rerank_fallback_count.is_some()
170        {
171            writeln!(f, "Fallbacks:")?;
172            if let Some(v) = self.multivector_fallback_count {
173                writeln!(f, "  multivector: {v}")?;
174            }
175            if let Some(v) = self.neural_rerank_fallback_count {
176                writeln!(f, "  neural_rerank: {v}")?;
177            }
178        }
179        Ok(())
180    }
181}
182
/// Converts a `Duration` into fractional milliseconds.
///
/// Sub-millisecond precision is kept in the fractional part
/// (e.g. 2.5 s becomes `2500.0`).
pub(crate) fn duration_ms(d: Duration) -> f64 {
    const MILLIS_PER_SECOND: f64 = 1_000.0;
    MILLIS_PER_SECOND * d.as_secs_f64()
}