heliosdb_proxy/analytics/
mod.rs1pub mod config;
11pub mod fingerprinter;
12pub mod histogram;
13pub mod intent;
14pub mod metrics;
15pub mod patterns;
16pub mod slow_log;
17pub mod statistics;
18
19pub use config::{
21 AnalyticsConfig, AnalyticsConfigBuilder, PatternConfig, SamplingConfig, SlowQueryConfig,
22};
23pub use fingerprinter::{OperationType, QueryFingerprint, QueryFingerprinter};
24pub use histogram::{HistogramBucket, HistogramSnapshot, LatencyHistogram};
25pub use intent::{
26 CostAttribution, QueryClassifier, QueryIntent, RagAnalytics, WorkflowTrace, WorkflowTracer,
27};
28pub use metrics::{AnalyticsMetrics, AnalyticsSnapshot, QueryMetricEntry};
29pub use patterns::{NplusOnePattern, PatternAlert, PatternDetector, QueryBurst};
30pub use slow_log::{SlowQueryEntry, SlowQueryLog, SlowQueryReader};
31pub use statistics::{QueryExecution, QueryStatistics, QueryStats, StatisticsStore};
32
33pub struct QueryAnalytics {
35 config: AnalyticsConfig,
37
38 fingerprinter: QueryFingerprinter,
40
41 statistics: StatisticsStore,
43
44 slow_log: SlowQueryLog,
46
47 patterns: PatternDetector,
49
50 metrics: AnalyticsMetrics,
52
53 classifier: QueryClassifier,
55
56 workflows: WorkflowTracer,
58
59 costs: CostAttribution,
61}
62
63impl QueryAnalytics {
64 pub fn new(config: AnalyticsConfig) -> Self {
66 let slow_log = SlowQueryLog::new(config.slow_query.clone());
67 let patterns = PatternDetector::new(config.patterns.clone());
68 let statistics = StatisticsStore::new(config.max_fingerprints);
69
70 Self {
71 fingerprinter: QueryFingerprinter::new(),
72 statistics,
73 slow_log,
74 patterns,
75 metrics: AnalyticsMetrics::new(),
76 classifier: QueryClassifier::new(),
77 workflows: WorkflowTracer::new(),
78 costs: CostAttribution::new(),
79 config,
80 }
81 }
82
83 pub fn with_defaults() -> Self {
85 Self::new(AnalyticsConfig::default())
86 }
87
88 pub fn record(&self, execution: QueryExecution) {
90 if !self.config.enabled {
91 return;
92 }
93
94 if self.config.sampling.enabled && !self.should_sample() {
96 return;
97 }
98
99 let fingerprint = self.fingerprinter.fingerprint(&execution.query);
101
102 self.statistics.record(&fingerprint, &execution);
104
105 self.slow_log.log_if_slow(&execution, &fingerprint);
107
108 if let Some(session) = &execution.session_id {
110 self.patterns
111 .record_query(session, &execution, &fingerprint);
112 }
113
114 let intent = self.classifier.classify(&execution.query);
116
117 self.metrics.record(&fingerprint, &execution, intent);
119
120 if let Some(workflow_id) = &execution.workflow_id {
122 self.workflows.record_step(workflow_id, &execution);
123 }
124
125 self.costs.record(&execution);
127 }
128
129 fn should_sample(&self) -> bool {
131 rand::random::<f64>() < self.config.sampling.rate
132 }
133
134 pub fn fingerprinter(&self) -> &QueryFingerprinter {
136 &self.fingerprinter
137 }
138
139 pub fn get_stats(&self, fingerprint_hash: u64) -> Option<QueryStats> {
141 self.statistics.get(fingerprint_hash)
142 }
143
144 pub fn top_queries(&self, order_by: OrderBy, limit: usize) -> Vec<QueryStats> {
146 self.statistics.top(order_by, limit)
147 }
148
149 pub fn slow_queries(&self, limit: usize) -> Vec<SlowQueryEntry> {
151 self.slow_log.recent(limit)
152 }
153
154 pub fn get_patterns(&self) -> Vec<PatternAlert> {
156 self.patterns.get_alerts()
157 }
158
159 pub fn get_metrics(&self) -> AnalyticsSnapshot {
161 self.metrics.snapshot()
162 }
163
164 pub fn by_intent(&self) -> std::collections::HashMap<QueryIntent, IntentStats> {
166 self.metrics.by_intent()
167 }
168
169 pub fn get_workflows(&self, limit: usize) -> Vec<WorkflowTrace> {
171 self.workflows.recent(limit)
172 }
173
174 pub fn get_costs(&self) -> CostReport {
176 self.costs.report()
177 }
178
179 pub fn reset(&self) {
181 self.statistics.reset();
182 self.metrics.reset();
183 self.workflows.reset();
184 self.costs.reset();
185 }
186}
187
/// Ranking key passed to `QueryAnalytics::top_queries`.
///
/// Each variant can be parsed from text via `FromStr`; the accepted
/// spellings are listed on the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OrderBy {
    /// Parsed from `"total_time"` or `"totaltime"`.
    TotalTime,
    /// Parsed from `"avg_time"` or `"avgtime"`.
    AvgTime,
    /// Parsed from `"calls"` or `"count"`.
    Calls,
    /// Parsed from `"errors"`.
    Errors,
    /// Parsed from `"p99"`, `"p99_time"` or `"p99time"`.
    P99Time,
    /// Parsed from `"rows"`.
    Rows,
}

impl std::str::FromStr for OrderBy {
    type Err = String;

    /// Parses a ranking key name.
    ///
    /// Matching ignores letter case and surrounding whitespace. Every
    /// two-word key accepts both the underscored and the run-together
    /// spelling (`total_time`/`totaltime`, `avg_time`/`avgtime`,
    /// `p99_time`/`p99time`).
    ///
    /// # Errors
    ///
    /// Returns `"Unknown order by: <input>"`, echoing the input exactly as
    /// it was given (untrimmed, original case), when no spelling matches.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Normalise once so the match arms only deal with canonical spellings.
        let key = s.trim().to_lowercase();

        match key.as_str() {
            "total_time" | "totaltime" => Ok(OrderBy::TotalTime),
            "avg_time" | "avgtime" => Ok(OrderBy::AvgTime),
            "calls" | "count" => Ok(OrderBy::Calls),
            "errors" => Ok(OrderBy::Errors),
            "p99" | "p99_time" | "p99time" => Ok(OrderBy::P99Time),
            "rows" => Ok(OrderBy::Rows),
            _ => Err(format!("Unknown order by: {}", s)),
        }
    }
}
214
/// Per-intent statistics; the value type of the map returned by
/// `QueryAnalytics::by_intent`.
#[derive(Debug, Clone)]
pub struct IntentStats {
    /// Call count for the intent.
    pub calls: u64,
    /// Total time for the intent (milliseconds, going by the `_ms` suffix).
    pub total_time_ms: u64,
    /// Average time for the intent (milliseconds, going by the `_ms` suffix).
    pub avg_time_ms: f64,
    /// Cache hit ratio for the intent.
    // NOTE(review): presumably a fraction in 0.0..=1.0 - confirm with the producer.
    pub cache_hit_ratio: f64,
}
223
224#[derive(Debug, Clone)]
226pub struct CostReport {
227 pub total_queries: u64,
228 pub total_time_seconds: f64,
229 pub estimated_cost_usd: f64,
230 pub by_user: Vec<UserCost>,
231 pub by_agent: Vec<AgentCost>,
232}
233
/// One entry of `CostReport::by_user`.
#[derive(Debug, Clone)]
pub struct UserCost {
    /// User the entry belongs to.
    pub user: String,
    /// Query count for this user.
    pub queries: u64,
    /// Time for this user (seconds, going by the field name).
    pub time_seconds: f64,
    /// Cost for this user (US dollars, going by the `_usd` suffix).
    pub cost_usd: f64,
}
242
/// One entry of `CostReport::by_agent`.
#[derive(Debug, Clone)]
pub struct AgentCost {
    /// Identifier of the agent the entry belongs to.
    pub agent_id: String,
    /// Query count for this agent.
    pub queries: u64,
    /// Time for this agent (seconds, going by the field name).
    pub time_seconds: f64,
    /// Cost for this agent (US dollars, going by the `_usd` suffix).
    pub cost_usd: f64,
}
251
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Recording one execution with default settings yields exactly one
    /// top-query entry, and that entry has been called once.
    #[test]
    fn test_analytics_basic() {
        let sample = QueryExecution {
            query: "SELECT * FROM users WHERE id = 1".to_string(),
            duration: Duration::from_millis(5),
            rows: 1,
            error: None,
            user: "test_user".to_string(),
            client_ip: "127.0.0.1".to_string(),
            database: "test_db".to_string(),
            node: "primary".to_string(),
            session_id: Some("session_1".to_string()),
            workflow_id: None,
            parameters: None,
        };

        let analytics = QueryAnalytics::with_defaults();
        analytics.record(sample);

        let ranked = analytics.top_queries(OrderBy::Calls, 10);
        assert_eq!(ranked.len(), 1);
        assert_eq!(ranked[0].calls, 1);
    }

    /// A few representative spellings parse to the expected `OrderBy` variant.
    #[test]
    fn test_order_by_parse() {
        let cases = [
            ("total_time", OrderBy::TotalTime),
            ("calls", OrderBy::Calls),
            ("p99", OrderBy::P99Time),
        ];

        for &(text, expected) in cases.iter() {
            assert_eq!(text.parse::<OrderBy>().unwrap(), expected);
        }
    }
}