heliosdb_proxy/analytics/
mod.rs1pub mod config;
11pub mod fingerprinter;
12pub mod statistics;
13pub mod slow_log;
14pub mod patterns;
15pub mod histogram;
16pub mod metrics;
17pub mod intent;
18
19pub use config::{AnalyticsConfig, AnalyticsConfigBuilder, SlowQueryConfig, PatternConfig, SamplingConfig};
21pub use fingerprinter::{QueryFingerprinter, QueryFingerprint, OperationType};
22pub use statistics::{QueryStatistics, QueryExecution, StatisticsStore, QueryStats};
23pub use slow_log::{SlowQueryLog, SlowQueryEntry, SlowQueryReader};
24pub use patterns::{PatternDetector, NplusOnePattern, QueryBurst, PatternAlert};
25pub use histogram::{LatencyHistogram, HistogramBucket, HistogramSnapshot};
26pub use metrics::{AnalyticsMetrics, AnalyticsSnapshot, QueryMetricEntry};
27pub use intent::{QueryClassifier, QueryIntent, RagAnalytics, WorkflowTracer, WorkflowTrace, CostAttribution};
28
29
30pub struct QueryAnalytics {
32 config: AnalyticsConfig,
34
35 fingerprinter: QueryFingerprinter,
37
38 statistics: StatisticsStore,
40
41 slow_log: SlowQueryLog,
43
44 patterns: PatternDetector,
46
47 metrics: AnalyticsMetrics,
49
50 classifier: QueryClassifier,
52
53 workflows: WorkflowTracer,
55
56 costs: CostAttribution,
58}
59
60impl QueryAnalytics {
61 pub fn new(config: AnalyticsConfig) -> Self {
63 let slow_log = SlowQueryLog::new(config.slow_query.clone());
64 let patterns = PatternDetector::new(config.patterns.clone());
65 let statistics = StatisticsStore::new(config.max_fingerprints);
66
67 Self {
68 fingerprinter: QueryFingerprinter::new(),
69 statistics,
70 slow_log,
71 patterns,
72 metrics: AnalyticsMetrics::new(),
73 classifier: QueryClassifier::new(),
74 workflows: WorkflowTracer::new(),
75 costs: CostAttribution::new(),
76 config,
77 }
78 }
79
80 pub fn with_defaults() -> Self {
82 Self::new(AnalyticsConfig::default())
83 }
84
85 pub fn record(&self, execution: QueryExecution) {
87 if !self.config.enabled {
88 return;
89 }
90
91 if self.config.sampling.enabled && !self.should_sample() {
93 return;
94 }
95
96 let fingerprint = self.fingerprinter.fingerprint(&execution.query);
98
99 self.statistics.record(&fingerprint, &execution);
101
102 self.slow_log.log_if_slow(&execution, &fingerprint);
104
105 if let Some(session) = &execution.session_id {
107 self.patterns.record_query(session, &execution, &fingerprint);
108 }
109
110 let intent = self.classifier.classify(&execution.query);
112
113 self.metrics.record(&fingerprint, &execution, intent);
115
116 if let Some(workflow_id) = &execution.workflow_id {
118 self.workflows.record_step(workflow_id, &execution);
119 }
120
121 self.costs.record(&execution);
123 }
124
125 fn should_sample(&self) -> bool {
127 rand::random::<f64>() < self.config.sampling.rate
128 }
129
130 pub fn fingerprinter(&self) -> &QueryFingerprinter {
132 &self.fingerprinter
133 }
134
135 pub fn get_stats(&self, fingerprint_hash: u64) -> Option<QueryStats> {
137 self.statistics.get(fingerprint_hash)
138 }
139
140 pub fn top_queries(&self, order_by: OrderBy, limit: usize) -> Vec<QueryStats> {
142 self.statistics.top(order_by, limit)
143 }
144
145 pub fn slow_queries(&self, limit: usize) -> Vec<SlowQueryEntry> {
147 self.slow_log.recent(limit)
148 }
149
150 pub fn get_patterns(&self) -> Vec<PatternAlert> {
152 self.patterns.get_alerts()
153 }
154
155 pub fn get_metrics(&self) -> AnalyticsSnapshot {
157 self.metrics.snapshot()
158 }
159
160 pub fn by_intent(&self) -> std::collections::HashMap<QueryIntent, IntentStats> {
162 self.metrics.by_intent()
163 }
164
165 pub fn get_workflows(&self, limit: usize) -> Vec<WorkflowTrace> {
167 self.workflows.recent(limit)
168 }
169
170 pub fn get_costs(&self) -> CostReport {
172 self.costs.report()
173 }
174
175 pub fn reset(&self) {
177 self.statistics.reset();
178 self.metrics.reset();
179 self.workflows.reset();
180 self.costs.reset();
181 }
182}
183
/// Sort key accepted by `QueryAnalytics::top_queries`.
///
/// Can be parsed from text through [`std::str::FromStr`]; see that impl for
/// the accepted spellings.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OrderBy {
    /// Parsed from `total_time` / `totaltime`.
    TotalTime,
    /// Parsed from `avg_time` / `avgtime`.
    AvgTime,
    /// Parsed from `calls` / `count`.
    Calls,
    /// Parsed from `errors`.
    Errors,
    /// Parsed from `p99` / `p99_time` / `p99time`.
    P99Time,
    /// Parsed from `rows`.
    Rows,
}

impl std::str::FromStr for OrderBy {
    type Err = String;

    /// Parses a sort key, ignoring case.
    ///
    /// Accepted spellings:
    ///
    /// | Variant     | Spellings                      |
    /// |-------------|--------------------------------|
    /// | `TotalTime` | `total_time`, `totaltime`      |
    /// | `AvgTime`   | `avg_time`, `avgtime`          |
    /// | `Calls`     | `calls`, `count`               |
    /// | `Errors`    | `errors`                       |
    /// | `P99Time`   | `p99`, `p99_time`, `p99time`   |
    /// | `Rows`      | `rows`                         |
    ///
    /// # Errors
    ///
    /// Returns `"Unknown order by: <input>"` (with the input exactly as
    /// given) for any other string.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_lowercase().as_str() {
            "total_time" | "totaltime" => Ok(OrderBy::TotalTime),
            "avg_time" | "avgtime" => Ok(OrderBy::AvgTime),
            "calls" | "count" => Ok(OrderBy::Calls),
            "errors" => Ok(OrderBy::Errors),
            // `p99time` mirrors the underscore-free spellings that the other
            // `*_time` keys already accept.
            "p99" | "p99_time" | "p99time" => Ok(OrderBy::P99Time),
            "rows" => Ok(OrderBy::Rows),
            _ => Err(format!("Unknown order by: {}", s)),
        }
    }
}
210
/// Aggregated figures for one query intent, as returned per key by
/// `QueryAnalytics::by_intent`.
#[derive(Debug, Clone)]
pub struct IntentStats {
    /// Call count for this intent.
    pub calls: u64,
    /// Total time; milliseconds, going by the field name.
    pub total_time_ms: u64,
    /// Average time; milliseconds, going by the field name.
    pub avg_time_ms: f64,
    /// Cache hit ratio.
    // NOTE(review): presumably a fraction in 0.0..=1.0 — confirm against the
    // code that fills this struct.
    pub cache_hit_ratio: f64,
}
219
220#[derive(Debug, Clone)]
222pub struct CostReport {
223 pub total_queries: u64,
224 pub total_time_seconds: f64,
225 pub estimated_cost_usd: f64,
226 pub by_user: Vec<UserCost>,
227 pub by_agent: Vec<AgentCost>,
228}
229
/// One per-user row of a `CostReport` (`CostReport::by_user`).
#[derive(Debug, Clone)]
pub struct UserCost {
    /// User the row belongs to.
    pub user: String,
    /// Query count for this user.
    pub queries: u64,
    /// Time for this user; seconds, going by the field name.
    pub time_seconds: f64,
    /// Cost for this user; US dollars, going by the field name.
    pub cost_usd: f64,
}
238
/// One per-agent row of a `CostReport` (`CostReport::by_agent`).
#[derive(Debug, Clone)]
pub struct AgentCost {
    /// Identifier of the agent the row belongs to.
    pub agent_id: String,
    /// Query count for this agent.
    pub queries: u64,
    /// Time for this agent; seconds, going by the field name.
    pub time_seconds: f64,
    /// Cost for this agent; US dollars, going by the field name.
    pub cost_usd: f64,
}
247
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Records a single execution through a default-configured engine and
    /// expects exactly one statistics entry with one call.
    // NOTE(review): relies on `AnalyticsConfig::default()` leaving analytics
    // enabled and not sampling this execution out — confirm in `config`.
    #[test]
    fn test_analytics_basic() {
        let analytics = QueryAnalytics::with_defaults();

        let execution = QueryExecution {
            query: "SELECT * FROM users WHERE id = 1".to_string(),
            duration: Duration::from_millis(5),
            rows: 1,
            error: None,
            user: "test_user".to_string(),
            client_ip: "127.0.0.1".to_string(),
            database: "test_db".to_string(),
            node: "primary".to_string(),
            session_id: Some("session_1".to_string()),
            workflow_id: None,
            parameters: None,
        };

        analytics.record(execution);

        let top = analytics.top_queries(OrderBy::Calls, 10);
        assert_eq!(top.len(), 1);
        assert_eq!(top[0].calls, 1);
    }

    /// Canonical spellings parse to the expected variants.
    #[test]
    fn test_order_by_parse() {
        assert_eq!("total_time".parse::<OrderBy>().unwrap(), OrderBy::TotalTime);
        assert_eq!("calls".parse::<OrderBy>().unwrap(), OrderBy::Calls);
        assert_eq!("p99".parse::<OrderBy>().unwrap(), OrderBy::P99Time);
    }

    /// Every remaining spelling in the `FromStr` match is reachable.
    #[test]
    fn test_order_by_parse_aliases() {
        assert_eq!("totaltime".parse::<OrderBy>().unwrap(), OrderBy::TotalTime);
        assert_eq!("avg_time".parse::<OrderBy>().unwrap(), OrderBy::AvgTime);
        assert_eq!("avgtime".parse::<OrderBy>().unwrap(), OrderBy::AvgTime);
        assert_eq!("count".parse::<OrderBy>().unwrap(), OrderBy::Calls);
        assert_eq!("errors".parse::<OrderBy>().unwrap(), OrderBy::Errors);
        assert_eq!("p99_time".parse::<OrderBy>().unwrap(), OrderBy::P99Time);
        assert_eq!("rows".parse::<OrderBy>().unwrap(), OrderBy::Rows);
    }

    /// Input is lower-cased before matching, so case must not matter.
    #[test]
    fn test_order_by_parse_is_case_insensitive() {
        assert_eq!("TOTAL_TIME".parse::<OrderBy>().unwrap(), OrderBy::TotalTime);
        assert_eq!("Calls".parse::<OrderBy>().unwrap(), OrderBy::Calls);
        assert_eq!("P99".parse::<OrderBy>().unwrap(), OrderBy::P99Time);
    }

    /// Unknown keys are rejected and the message echoes the input verbatim.
    #[test]
    fn test_order_by_parse_rejects_unknown() {
        let err = "bogus".parse::<OrderBy>().unwrap_err();
        assert_eq!(err, "Unknown order by: bogus");
        assert!("".parse::<OrderBy>().is_err());
    }
}