Skip to main content

heliosdb_proxy/analytics/
mod.rs

//! Query Analytics & Slow Query Log
//!
//! Query analytics performed at the proxy layer:
//! - Query fingerprinting and normalization
//! - Execution statistics and latency histograms
//! - Slow-query logging
//! - Pattern detection (N+1 queries, bursts)
//! - AI/agent workload classification
9
10pub mod config;
11pub mod fingerprinter;
12pub mod statistics;
13pub mod slow_log;
14pub mod patterns;
15pub mod histogram;
16pub mod metrics;
17pub mod intent;
18
19// Re-exports
20pub use config::{AnalyticsConfig, AnalyticsConfigBuilder, SlowQueryConfig, PatternConfig, SamplingConfig};
21pub use fingerprinter::{QueryFingerprinter, QueryFingerprint, OperationType};
22pub use statistics::{QueryStatistics, QueryExecution, StatisticsStore, QueryStats};
23pub use slow_log::{SlowQueryLog, SlowQueryEntry, SlowQueryReader};
24pub use patterns::{PatternDetector, NplusOnePattern, QueryBurst, PatternAlert};
25pub use histogram::{LatencyHistogram, HistogramBucket, HistogramSnapshot};
26pub use metrics::{AnalyticsMetrics, AnalyticsSnapshot, QueryMetricEntry};
27pub use intent::{QueryClassifier, QueryIntent, RagAnalytics, WorkflowTracer, WorkflowTrace, CostAttribution};
28
29
30/// Main analytics engine
31pub struct QueryAnalytics {
32    /// Configuration
33    config: AnalyticsConfig,
34
35    /// Query fingerprinter
36    fingerprinter: QueryFingerprinter,
37
38    /// Statistics store (fingerprint -> stats)
39    statistics: StatisticsStore,
40
41    /// Slow query log
42    slow_log: SlowQueryLog,
43
44    /// Pattern detector
45    patterns: PatternDetector,
46
47    /// Metrics
48    metrics: AnalyticsMetrics,
49
50    /// Query classifier (AI intent)
51    classifier: QueryClassifier,
52
53    /// Workflow tracer
54    workflows: WorkflowTracer,
55
56    /// Cost attribution
57    costs: CostAttribution,
58}
59
60impl QueryAnalytics {
61    /// Create new analytics engine
62    pub fn new(config: AnalyticsConfig) -> Self {
63        let slow_log = SlowQueryLog::new(config.slow_query.clone());
64        let patterns = PatternDetector::new(config.patterns.clone());
65        let statistics = StatisticsStore::new(config.max_fingerprints);
66
67        Self {
68            fingerprinter: QueryFingerprinter::new(),
69            statistics,
70            slow_log,
71            patterns,
72            metrics: AnalyticsMetrics::new(),
73            classifier: QueryClassifier::new(),
74            workflows: WorkflowTracer::new(),
75            costs: CostAttribution::new(),
76            config,
77        }
78    }
79
80    /// Create with default configuration
81    pub fn with_defaults() -> Self {
82        Self::new(AnalyticsConfig::default())
83    }
84
85    /// Record query execution
86    pub fn record(&self, execution: QueryExecution) {
87        if !self.config.enabled {
88            return;
89        }
90
91        // Apply sampling if configured
92        if self.config.sampling.enabled && !self.should_sample() {
93            return;
94        }
95
96        // Fingerprint the query
97        let fingerprint = self.fingerprinter.fingerprint(&execution.query);
98
99        // Record statistics
100        self.statistics.record(&fingerprint, &execution);
101
102        // Check for slow query
103        self.slow_log.log_if_slow(&execution, &fingerprint);
104
105        // Detect patterns
106        if let Some(session) = &execution.session_id {
107            self.patterns.record_query(session, &execution, &fingerprint);
108        }
109
110        // Classify intent
111        let intent = self.classifier.classify(&execution.query);
112
113        // Record metrics
114        self.metrics.record(&fingerprint, &execution, intent);
115
116        // Track workflow if applicable
117        if let Some(workflow_id) = &execution.workflow_id {
118            self.workflows.record_step(workflow_id, &execution);
119        }
120
121        // Attribute costs
122        self.costs.record(&execution);
123    }
124
125    /// Check if we should sample this query
126    fn should_sample(&self) -> bool {
127        rand::random::<f64>() < self.config.sampling.rate
128    }
129
130    /// Get fingerprinter for external use
131    pub fn fingerprinter(&self) -> &QueryFingerprinter {
132        &self.fingerprinter
133    }
134
135    /// Get statistics for a fingerprint
136    pub fn get_stats(&self, fingerprint_hash: u64) -> Option<QueryStats> {
137        self.statistics.get(fingerprint_hash)
138    }
139
140    /// Get top queries by a metric
141    pub fn top_queries(&self, order_by: OrderBy, limit: usize) -> Vec<QueryStats> {
142        self.statistics.top(order_by, limit)
143    }
144
145    /// Get recent slow queries
146    pub fn slow_queries(&self, limit: usize) -> Vec<SlowQueryEntry> {
147        self.slow_log.recent(limit)
148    }
149
150    /// Get detected patterns
151    pub fn get_patterns(&self) -> Vec<PatternAlert> {
152        self.patterns.get_alerts()
153    }
154
155    /// Get metrics snapshot
156    pub fn get_metrics(&self) -> AnalyticsSnapshot {
157        self.metrics.snapshot()
158    }
159
160    /// Get analytics by query intent
161    pub fn by_intent(&self) -> std::collections::HashMap<QueryIntent, IntentStats> {
162        self.metrics.by_intent()
163    }
164
165    /// Get workflow traces
166    pub fn get_workflows(&self, limit: usize) -> Vec<WorkflowTrace> {
167        self.workflows.recent(limit)
168    }
169
170    /// Get cost attribution
171    pub fn get_costs(&self) -> CostReport {
172        self.costs.report()
173    }
174
175    /// Reset all statistics
176    pub fn reset(&self) {
177        self.statistics.reset();
178        self.metrics.reset();
179        self.workflows.reset();
180        self.costs.reset();
181    }
182}
183
/// Ranking key accepted by `QueryAnalytics::top_queries`.
///
/// Parse from text with [`str::parse`]. Matching is case-insensitive, ignores
/// surrounding whitespace, and accepts every variant's own name (so the
/// `Debug` output of each variant parses back to that variant).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OrderBy {
    /// Total execution time (`total_time`, `totaltime`).
    TotalTime,
    /// Average execution time (`avg_time`, `avgtime`).
    AvgTime,
    /// Number of calls (`calls`, `count`).
    Calls,
    /// Number of errors (`errors`).
    Errors,
    /// 99th-percentile execution time (`p99`, `p99_time`, `p99time`).
    P99Time,
    /// Rows (`rows`).
    Rows,
}

impl std::str::FromStr for OrderBy {
    type Err = String;

    /// Parses an ordering key.
    ///
    /// # Errors
    ///
    /// Returns `"Unknown order by: <input>"` (echoing the untrimmed input) when
    /// the key is not recognised.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.trim().to_lowercase().as_str() {
            "total_time" | "totaltime" => Ok(OrderBy::TotalTime),
            "avg_time" | "avgtime" => Ok(OrderBy::AvgTime),
            "calls" | "count" => Ok(OrderBy::Calls),
            "errors" => Ok(OrderBy::Errors),
            // `p99time` is the lowercased variant name, matching how
            // `totaltime`/`avgtime` are accepted for the other variants.
            "p99" | "p99_time" | "p99time" => Ok(OrderBy::P99Time),
            "rows" => Ok(OrderBy::Rows),
            _ => Err(format!("Unknown order by: {}", s)),
        }
    }
}
210
/// Aggregate figures for a single query intent, as returned by
/// `QueryAnalytics::by_intent`.
#[derive(Clone, Debug)]
pub struct IntentStats {
    /// Number of calls attributed to the intent.
    pub calls: u64,
    /// Summed execution time, in milliseconds.
    pub total_time_ms: u64,
    /// Mean execution time, in milliseconds.
    pub avg_time_ms: f64,
    /// Cache hit ratio. NOTE(review): presumably in `[0.0, 1.0]`; confirm
    /// against the metrics module that fills it.
    pub cache_hit_ratio: f64,
}
219
220/// Cost report
221#[derive(Debug, Clone)]
222pub struct CostReport {
223    pub total_queries: u64,
224    pub total_time_seconds: f64,
225    pub estimated_cost_usd: f64,
226    pub by_user: Vec<UserCost>,
227    pub by_agent: Vec<AgentCost>,
228}
229
/// Cost attributed to one database user.
#[derive(Clone, Debug)]
pub struct UserCost {
    /// User the cost is attributed to.
    pub user: String,
    /// Number of queries issued by this user.
    pub queries: u64,
    /// Execution time attributed to this user, in seconds.
    pub time_seconds: f64,
    /// Cost attributed to this user, in US dollars.
    pub cost_usd: f64,
}
238
/// Cost attributed to one agent.
#[derive(Clone, Debug)]
pub struct AgentCost {
    /// Identifier of the agent the cost is attributed to.
    pub agent_id: String,
    /// Number of queries issued by this agent.
    pub queries: u64,
    /// Execution time attributed to this agent, in seconds.
    pub time_seconds: f64,
    /// Cost attributed to this agent, in US dollars.
    pub cost_usd: f64,
}
247
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a successful 5 ms, single-row execution of `query` in `session_1`.
    fn execution(query: &str) -> QueryExecution {
        QueryExecution {
            query: query.to_string(),
            duration: Duration::from_millis(5),
            rows: 1,
            error: None,
            user: "test_user".to_string(),
            client_ip: "127.0.0.1".to_string(),
            database: "test_db".to_string(),
            node: "primary".to_string(),
            session_id: Some("session_1".to_string()),
            workflow_id: None,
            parameters: None,
        }
    }

    #[test]
    fn test_analytics_basic() {
        let analytics = QueryAnalytics::with_defaults();

        analytics.record(execution("SELECT * FROM users WHERE id = 1"));

        let top = analytics.top_queries(OrderBy::Calls, 10);
        assert_eq!(top.len(), 1);
        assert_eq!(top[0].calls, 1);
    }

    #[test]
    fn test_repeated_query_aggregates_under_one_fingerprint() {
        let analytics = QueryAnalytics::with_defaults();

        // Identical text must produce the same fingerprint, so both
        // executions land in a single statistics entry.
        for _ in 0..2 {
            analytics.record(execution("SELECT * FROM users WHERE id = 1"));
        }

        let top = analytics.top_queries(OrderBy::Calls, 10);
        assert_eq!(top.len(), 1);
        assert_eq!(top[0].calls, 2);
    }

    #[test]
    fn test_order_by_parse() {
        assert_eq!("total_time".parse::<OrderBy>().unwrap(), OrderBy::TotalTime);
        assert_eq!("calls".parse::<OrderBy>().unwrap(), OrderBy::Calls);
        assert_eq!("p99".parse::<OrderBy>().unwrap(), OrderBy::P99Time);
    }

    #[test]
    fn test_order_by_parse_aliases_and_case() {
        assert_eq!("totaltime".parse::<OrderBy>().unwrap(), OrderBy::TotalTime);
        assert_eq!("AVG_TIME".parse::<OrderBy>().unwrap(), OrderBy::AvgTime);
        assert_eq!("avgtime".parse::<OrderBy>().unwrap(), OrderBy::AvgTime);
        assert_eq!("count".parse::<OrderBy>().unwrap(), OrderBy::Calls);
        assert_eq!("Errors".parse::<OrderBy>().unwrap(), OrderBy::Errors);
        assert_eq!("p99_time".parse::<OrderBy>().unwrap(), OrderBy::P99Time);
        assert_eq!("ROWS".parse::<OrderBy>().unwrap(), OrderBy::Rows);
    }

    #[test]
    fn test_order_by_parse_unknown() {
        let err = "latency".parse::<OrderBy>().unwrap_err();
        assert_eq!(err, "Unknown order by: latency");
    }
}