Skip to main content

heliosdb_proxy/analytics/
mod.rs

1//! Query Analytics & Slow Query Log
2//!
3//! Comprehensive query analytics at the proxy layer:
4//! - Query fingerprinting and normalization
5//! - Execution statistics and histograms
6//! - Slow query logging
7//! - Pattern detection (N+1, bursts)
8//! - AI/Agent workload classification
9
10pub mod config;
11pub mod fingerprinter;
12pub mod histogram;
13pub mod intent;
14pub mod metrics;
15pub mod patterns;
16pub mod slow_log;
17pub mod statistics;
18
19// Re-exports
20pub use config::{
21    AnalyticsConfig, AnalyticsConfigBuilder, PatternConfig, SamplingConfig, SlowQueryConfig,
22};
23pub use fingerprinter::{OperationType, QueryFingerprint, QueryFingerprinter};
24pub use histogram::{HistogramBucket, HistogramSnapshot, LatencyHistogram};
25pub use intent::{
26    CostAttribution, QueryClassifier, QueryIntent, RagAnalytics, WorkflowTrace, WorkflowTracer,
27};
28pub use metrics::{AnalyticsMetrics, AnalyticsSnapshot, QueryMetricEntry};
29pub use patterns::{NplusOnePattern, PatternAlert, PatternDetector, QueryBurst};
30pub use slow_log::{SlowQueryEntry, SlowQueryLog, SlowQueryReader};
31pub use statistics::{QueryExecution, QueryStatistics, QueryStats, StatisticsStore};
32
33/// Main analytics engine
34pub struct QueryAnalytics {
35    /// Configuration
36    config: AnalyticsConfig,
37
38    /// Query fingerprinter
39    fingerprinter: QueryFingerprinter,
40
41    /// Statistics store (fingerprint -> stats)
42    statistics: StatisticsStore,
43
44    /// Slow query log
45    slow_log: SlowQueryLog,
46
47    /// Pattern detector
48    patterns: PatternDetector,
49
50    /// Metrics
51    metrics: AnalyticsMetrics,
52
53    /// Query classifier (AI intent)
54    classifier: QueryClassifier,
55
56    /// Workflow tracer
57    workflows: WorkflowTracer,
58
59    /// Cost attribution
60    costs: CostAttribution,
61}
62
63impl QueryAnalytics {
64    /// Create new analytics engine
65    pub fn new(config: AnalyticsConfig) -> Self {
66        let slow_log = SlowQueryLog::new(config.slow_query.clone());
67        let patterns = PatternDetector::new(config.patterns.clone());
68        let statistics = StatisticsStore::new(config.max_fingerprints);
69
70        Self {
71            fingerprinter: QueryFingerprinter::new(),
72            statistics,
73            slow_log,
74            patterns,
75            metrics: AnalyticsMetrics::new(),
76            classifier: QueryClassifier::new(),
77            workflows: WorkflowTracer::new(),
78            costs: CostAttribution::new(),
79            config,
80        }
81    }
82
83    /// Create with default configuration
84    pub fn with_defaults() -> Self {
85        Self::new(AnalyticsConfig::default())
86    }
87
88    /// Record query execution
89    pub fn record(&self, execution: QueryExecution) {
90        if !self.config.enabled {
91            return;
92        }
93
94        // Apply sampling if configured
95        if self.config.sampling.enabled && !self.should_sample() {
96            return;
97        }
98
99        // Fingerprint the query
100        let fingerprint = self.fingerprinter.fingerprint(&execution.query);
101
102        // Record statistics
103        self.statistics.record(&fingerprint, &execution);
104
105        // Check for slow query
106        self.slow_log.log_if_slow(&execution, &fingerprint);
107
108        // Detect patterns
109        if let Some(session) = &execution.session_id {
110            self.patterns
111                .record_query(session, &execution, &fingerprint);
112        }
113
114        // Classify intent
115        let intent = self.classifier.classify(&execution.query);
116
117        // Record metrics
118        self.metrics.record(&fingerprint, &execution, intent);
119
120        // Track workflow if applicable
121        if let Some(workflow_id) = &execution.workflow_id {
122            self.workflows.record_step(workflow_id, &execution);
123        }
124
125        // Attribute costs
126        self.costs.record(&execution);
127    }
128
129    /// Check if we should sample this query
130    fn should_sample(&self) -> bool {
131        rand::random::<f64>() < self.config.sampling.rate
132    }
133
134    /// Get fingerprinter for external use
135    pub fn fingerprinter(&self) -> &QueryFingerprinter {
136        &self.fingerprinter
137    }
138
139    /// Get statistics for a fingerprint
140    pub fn get_stats(&self, fingerprint_hash: u64) -> Option<QueryStats> {
141        self.statistics.get(fingerprint_hash)
142    }
143
144    /// Get top queries by a metric
145    pub fn top_queries(&self, order_by: OrderBy, limit: usize) -> Vec<QueryStats> {
146        self.statistics.top(order_by, limit)
147    }
148
149    /// Get recent slow queries
150    pub fn slow_queries(&self, limit: usize) -> Vec<SlowQueryEntry> {
151        self.slow_log.recent(limit)
152    }
153
154    /// Get detected patterns
155    pub fn get_patterns(&self) -> Vec<PatternAlert> {
156        self.patterns.get_alerts()
157    }
158
159    /// Get metrics snapshot
160    pub fn get_metrics(&self) -> AnalyticsSnapshot {
161        self.metrics.snapshot()
162    }
163
164    /// Get analytics by query intent
165    pub fn by_intent(&self) -> std::collections::HashMap<QueryIntent, IntentStats> {
166        self.metrics.by_intent()
167    }
168
169    /// Get workflow traces
170    pub fn get_workflows(&self, limit: usize) -> Vec<WorkflowTrace> {
171        self.workflows.recent(limit)
172    }
173
174    /// Get cost attribution
175    pub fn get_costs(&self) -> CostReport {
176        self.costs.report()
177    }
178
179    /// Reset all statistics
180    pub fn reset(&self) {
181        self.statistics.reset();
182        self.metrics.reset();
183        self.workflows.reset();
184        self.costs.reset();
185    }
186}
187
/// Sort key accepted by `QueryAnalytics::top_queries`.
///
/// The value is handed to `StatisticsStore::top`, which implements the
/// actual ranking and is not defined in this file. Values can be parsed
/// from text through the `FromStr` implementation below.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OrderBy {
    /// Parsed from `total_time` / `totaltime`.
    TotalTime,
    /// Parsed from `avg_time` / `avgtime`.
    AvgTime,
    /// Parsed from `calls` / `count`.
    Calls,
    /// Parsed from `errors`.
    Errors,
    /// Parsed from `p99` / `p99_time` / `p99time`.
    P99Time,
    /// Parsed from `rows`.
    Rows,
}

impl std::str::FromStr for OrderBy {
    type Err = String;

    /// Parses a sort key name.
    ///
    /// Matching ignores ASCII case and surrounding whitespace. Every
    /// time-based key accepts both the underscored and the run-together
    /// spelling (`total_time`/`totaltime`, `avg_time`/`avgtime`,
    /// `p99_time`/`p99time`); `count` is an alias for `calls` and `p99` for
    /// `p99_time`.
    ///
    /// # Errors
    ///
    /// Returns `Unknown order by: <input>` (with the input exactly as given)
    /// when the name is not recognised.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // All accepted names are ASCII, so ASCII-only case folding is sufficient.
        match s.trim().to_ascii_lowercase().as_str() {
            "total_time" | "totaltime" => Ok(OrderBy::TotalTime),
            "avg_time" | "avgtime" => Ok(OrderBy::AvgTime),
            "calls" | "count" => Ok(OrderBy::Calls),
            "errors" => Ok(OrderBy::Errors),
            "p99" | "p99_time" | "p99time" => Ok(OrderBy::P99Time),
            "rows" => Ok(OrderBy::Rows),
            _ => Err(format!("Unknown order by: {}", s)),
        }
    }
}
214
/// Aggregated figures for one query intent.
///
/// This is the value type of the map returned by
/// `QueryAnalytics::by_intent`. The values are computed by the metrics
/// component, which is not defined in this file.
#[derive(Debug, Clone)]
pub struct IntentStats {
    /// Number of calls counted for the intent.
    pub calls: u64,
    /// Total time for the intent; milliseconds, going by the field name.
    pub total_time_ms: u64,
    /// Average time per call; milliseconds, going by the field name.
    pub avg_time_ms: f64,
    /// Cache hit ratio for the intent (presumably a fraction in `0.0..=1.0`;
    /// confirm against the metrics component).
    pub cache_hit_ratio: f64,
}
223
224/// Cost report
225#[derive(Debug, Clone)]
226pub struct CostReport {
227    pub total_queries: u64,
228    pub total_time_seconds: f64,
229    pub estimated_cost_usd: f64,
230    pub by_user: Vec<UserCost>,
231    pub by_agent: Vec<AgentCost>,
232}
233
/// Cost figures for a single user; element type of `CostReport::by_user`.
#[derive(Debug, Clone)]
pub struct UserCost {
    /// Name of the user the figures belong to.
    pub user: String,
    /// Number of queries counted for the user.
    pub queries: u64,
    /// Time counted for the user; seconds, going by the field name.
    pub time_seconds: f64,
    /// Cost counted for the user; US dollars, going by the field name.
    pub cost_usd: f64,
}
242
/// Cost figures for a single agent; element type of `CostReport::by_agent`.
#[derive(Debug, Clone)]
pub struct AgentCost {
    /// Identifier of the agent the figures belong to.
    pub agent_id: String,
    /// Number of queries counted for the agent.
    pub queries: u64,
    /// Time counted for the agent; seconds, going by the field name.
    pub time_seconds: f64,
    /// Cost counted for the agent; US dollars, going by the field name.
    pub cost_usd: f64,
}
251
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Recording a single execution with the default configuration must
    /// yield exactly one top-query entry with a call count of one.
    #[test]
    fn test_analytics_basic() {
        let engine = QueryAnalytics::with_defaults();

        engine.record(QueryExecution {
            query: String::from("SELECT * FROM users WHERE id = 1"),
            duration: Duration::from_millis(5),
            rows: 1,
            error: None,
            user: String::from("test_user"),
            client_ip: String::from("127.0.0.1"),
            database: String::from("test_db"),
            node: String::from("primary"),
            session_id: Some(String::from("session_1")),
            workflow_id: None,
            parameters: None,
        });

        let ranked = engine.top_queries(OrderBy::Calls, 10);
        assert_eq!(ranked.len(), 1);
        assert_eq!(ranked[0].calls, 1);
    }

    /// A few representative names must parse to the matching sort key.
    #[test]
    fn test_order_by_parse() {
        let cases = [
            ("total_time", OrderBy::TotalTime),
            ("calls", OrderBy::Calls),
            ("p99", OrderBy::P99Time),
        ];

        for &(text, expected) in &cases {
            assert_eq!(text.parse::<OrderBy>().unwrap(), expected);
        }
    }
}