pub mod config;
pub mod fingerprinter;
pub mod statistics;
pub mod slow_log;
pub mod patterns;
pub mod histogram;
pub mod metrics;
pub mod intent;
pub use config::{AnalyticsConfig, AnalyticsConfigBuilder, SlowQueryConfig, PatternConfig, SamplingConfig};
pub use fingerprinter::{QueryFingerprinter, QueryFingerprint, OperationType};
pub use statistics::{QueryStatistics, QueryExecution, StatisticsStore, QueryStats};
pub use slow_log::{SlowQueryLog, SlowQueryEntry, SlowQueryReader};
pub use patterns::{PatternDetector, NplusOnePattern, QueryBurst, PatternAlert};
pub use histogram::{LatencyHistogram, HistogramBucket, HistogramSnapshot};
pub use metrics::{AnalyticsMetrics, AnalyticsSnapshot, QueryMetricEntry};
pub use intent::{QueryClassifier, QueryIntent, RagAnalytics, WorkflowTracer, WorkflowTrace, CostAttribution};
/// Facade that fans each recorded query execution out to the analytics
/// subsystems and exposes their read-side APIs.
///
/// The subsystems are: per-fingerprint statistics, the slow-query log,
/// session pattern detection, intent metrics, workflow tracing and cost
/// attribution.
///
/// Every public method, including `record` and `reset`, takes `&self`.
/// NOTE(review): the subsystems therefore presumably use interior mutability.
/// Whether they are safe to share across threads is not visible here —
/// confirm before wrapping in `Arc`.
pub struct QueryAnalytics {
// Global settings. `record` reads `enabled` and `sampling`. `new` hands
// `slow_query`, `patterns` and `max_fingerprints` to the subsystems below.
config: AnalyticsConfig,
// Turns each query's text into the fingerprint that the statistics, slow
// log, pattern detector and metrics all receive.
fingerprinter: QueryFingerprinter,
// Per-fingerprint aggregates, constructed from `config.max_fingerprints`.
statistics: StatisticsStore,
// Offered every recorded execution via `log_if_slow`. The threshold
// presumably lives in `config.slow_query` — confirm in `slow_log`.
slow_log: SlowQueryLog,
// Fed only executions that carry a `session_id`.
patterns: PatternDetector,
// Receives fingerprint, execution and classified intent for every recorded
// execution; backs `get_metrics` and `by_intent`.
metrics: AnalyticsMetrics,
// Maps query text to a `QueryIntent` that is passed to `metrics`.
classifier: QueryClassifier,
// Fed only executions that carry a `workflow_id`.
workflows: WorkflowTracer,
// Fed every recorded execution; backs `get_costs`.
costs: CostAttribution,
}
impl QueryAnalytics {
    /// Creates an analytics engine configured by `config`.
    ///
    /// Construction works as follows:
    /// - The slow-query log and pattern detector are built from clones of
    ///   `config.slow_query` and `config.patterns`.
    /// - The statistics store is built from `config.max_fingerprints`.
    /// - Every other subsystem starts from its own `new()`.
    ///
    /// `config` is then retained for the enable/sampling checks in
    /// [`record`](Self::record).
    pub fn new(config: AnalyticsConfig) -> Self {
        // Subsystems are constructed in a fixed order, then moved into `Self`.
        let slow_log = SlowQueryLog::new(config.slow_query.clone());
        let patterns = PatternDetector::new(config.patterns.clone());
        let statistics = StatisticsStore::new(config.max_fingerprints);
        let fingerprinter = QueryFingerprinter::new();
        let metrics = AnalyticsMetrics::new();
        let classifier = QueryClassifier::new();
        let workflows = WorkflowTracer::new();
        let costs = CostAttribution::new();

        Self {
            config,
            fingerprinter,
            statistics,
            slow_log,
            patterns,
            metrics,
            classifier,
            workflows,
            costs,
        }
    }

    /// Creates an analytics engine using `AnalyticsConfig::default()`.
    pub fn with_defaults() -> Self {
        Self::new(Default::default())
    }

    /// Records one query execution across every analytics subsystem.
    ///
    /// The execution is dropped without any work when analytics is disabled.
    /// It is also dropped when sampling is enabled and this execution is not
    /// selected.
    ///
    /// Otherwise the query is fingerprinted once. That fingerprint is shared by
    /// the statistics store, the slow-query log, the pattern detector and the
    /// metrics.
    /// - Pattern detection only sees executions that have a `session_id`.
    /// - Workflow tracing only sees executions that have a `workflow_id`.
    /// - Cost attribution sees every execution that gets this far.
    ///
    /// NOTE(review): sampled-out executions never reach the slow-query log or
    /// cost attribution, so both under-count when sampling is on. Confirm this
    /// is intended.
    pub fn record(&self, execution: QueryExecution) {
        // `should_sample` (and its RNG draw) only runs when analytics is on
        // and sampling is enabled.
        let dropped = !self.config.enabled
            || (self.config.sampling.enabled && !self.should_sample());
        if dropped {
            return;
        }

        let fp = self.fingerprinter.fingerprint(&execution.query);
        self.statistics.record(&fp, &execution);
        self.slow_log.log_if_slow(&execution, &fp);

        if let Some(session) = execution.session_id.as_ref() {
            self.patterns.record_query(session, &execution, &fp);
        }

        let query_intent = self.classifier.classify(&execution.query);
        self.metrics.record(&fp, &execution, query_intent);

        if let Some(workflow_id) = execution.workflow_id.as_ref() {
            self.workflows.record_step(workflow_id, &execution);
        }

        self.costs.record(&execution);
    }

    /// Draws a uniform `f64` in `[0, 1)` and keeps the execution when the draw
    /// is below `config.sampling.rate`.
    ///
    /// The rate is therefore a keep-probability: `>= 1.0` keeps everything and
    /// `<= 0.0` keeps nothing.
    fn should_sample(&self) -> bool {
        let draw: f64 = rand::random();
        draw < self.config.sampling.rate
    }

    /// Borrows the fingerprinter, e.g. to fingerprint a query without recording it.
    pub fn fingerprinter(&self) -> &QueryFingerprinter {
        &self.fingerprinter
    }

    /// Looks up the aggregated statistics for one fingerprint hash, if tracked.
    pub fn get_stats(&self, fingerprint_hash: u64) -> Option<QueryStats> {
        self.statistics.get(fingerprint_hash)
    }

    /// Returns at most `limit` fingerprints' statistics, ranked by `order_by`.
    pub fn top_queries(&self, order_by: OrderBy, limit: usize) -> Vec<QueryStats> {
        self.statistics.top(order_by, limit)
    }

    /// Returns up to `limit` recent slow-query log entries.
    ///
    /// NOTE(review): newest-first ordering is presumed — confirm in `SlowQueryLog::recent`.
    pub fn slow_queries(&self, limit: usize) -> Vec<SlowQueryEntry> {
        self.slow_log.recent(limit)
    }

    /// Returns the alerts currently reported by the pattern detector.
    pub fn get_patterns(&self) -> Vec<PatternAlert> {
        self.patterns.get_alerts()
    }

    /// Returns a point-in-time snapshot of the aggregate metrics.
    pub fn get_metrics(&self) -> AnalyticsSnapshot {
        self.metrics.snapshot()
    }

    /// Returns per-intent statistics as computed by the metrics subsystem.
    pub fn by_intent(&self) -> std::collections::HashMap<QueryIntent, IntentStats> {
        self.metrics.by_intent()
    }

    /// Returns up to `limit` recent workflow traces.
    pub fn get_workflows(&self, limit: usize) -> Vec<WorkflowTrace> {
        self.workflows.recent(limit)
    }

    /// Builds a cost report from the cost-attribution subsystem.
    pub fn get_costs(&self) -> CostReport {
        self.costs.report()
    }

    /// Clears the statistics store, metrics, workflow traces and cost data.
    ///
    /// The slow-query log and pattern detector are not touched by this method.
    pub fn reset(&self) {
        self.statistics.reset();
        self.metrics.reset();
        self.workflows.reset();
        self.costs.reset();
    }
}
/// Sort key accepted by `QueryAnalytics::top_queries`.
///
/// The ranking itself is implemented by the statistics store
/// (`StatisticsStore::top`), not here. Values can be parsed from text with
/// `str::parse`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum OrderBy {
    /// Total time (presumably cumulative across all calls of a fingerprint).
    TotalTime,
    /// Average time per call.
    AvgTime,
    /// Number of calls.
    Calls,
    /// Number of errors.
    Errors,
    /// 99th-percentile latency.
    P99Time,
    /// Row count (presumably rows returned or affected).
    Rows,
}
impl std::str::FromStr for OrderBy {
    type Err = String;

    /// Parses a sort key such as `"total_time"`, `"calls"` or `"p99"`.
    ///
    /// Matching is case-insensitive and ignores surrounding whitespace. A `-`
    /// is treated like `_`, so `"Total-Time"` parses the same as
    /// `"total_time"`. Each two-word key also accepts its underscore-free
    /// spelling (`totaltime`, `avgtime`, `p99time`). `count` is an alias for
    /// `calls`, and `p99` is an alias for `p99_time`.
    ///
    /// # Errors
    ///
    /// Returns `"Unknown order by: <input>"`, echoing the caller's original
    /// (un-normalised) input, for any unrecognised key.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let key = s.trim().to_lowercase().replace('-', "_");
        match key.as_str() {
            "total_time" | "totaltime" => Ok(OrderBy::TotalTime),
            "avg_time" | "avgtime" => Ok(OrderBy::AvgTime),
            "calls" | "count" => Ok(OrderBy::Calls),
            "errors" => Ok(OrderBy::Errors),
            // `p99time` mirrors the underscore-free spellings accepted above.
            "p99" | "p99_time" | "p99time" => Ok(OrderBy::P99Time),
            "rows" => Ok(OrderBy::Rows),
            _ => Err(format!("Unknown order by: {}", s)),
        }
    }
}
/// Aggregated execution figures for one `QueryIntent`, as returned by
/// `QueryAnalytics::by_intent`.
///
/// NOTE(review): the values are computed by `AnalyticsMetrics::by_intent`,
/// which is not visible here.
#[derive(Clone, Debug)]
pub struct IntentStats {
    /// Number of calls attributed to this intent.
    pub calls: u64,
    /// Total execution time in milliseconds.
    pub total_time_ms: u64,
    /// Mean execution time per call in milliseconds.
    pub avg_time_ms: f64,
    /// Cache hit ratio — presumably a fraction in `0.0..=1.0`; confirm in the metrics module.
    pub cache_hit_ratio: f64,
}
/// Cost summary produced by `QueryAnalytics::get_costs`.
///
/// NOTE(review): the pricing model behind `estimated_cost_usd` and the order
/// of the breakdown vectors are decided by `CostAttribution::report`, which
/// is not visible here.
#[derive(Clone, Debug)]
pub struct CostReport {
    /// Number of queries covered by the report.
    pub total_queries: u64,
    /// Total execution time in seconds.
    pub total_time_seconds: f64,
    /// Estimated cost in US dollars.
    pub estimated_cost_usd: f64,
    /// Per-user breakdown.
    pub by_user: Vec<UserCost>,
    /// Per-agent breakdown.
    pub by_agent: Vec<AgentCost>,
}
/// One user's share of query volume, time and estimated cost within a
/// `CostReport`.
#[derive(Clone, Debug)]
pub struct UserCost {
    /// User name the queries were attributed to.
    pub user: String,
    /// Number of queries attributed to this user.
    pub queries: u64,
    /// Execution time attributed to this user, in seconds.
    pub time_seconds: f64,
    /// Estimated cost attributed to this user, in US dollars.
    pub cost_usd: f64,
}
/// One agent's share of query volume, time and estimated cost within a
/// `CostReport`.
#[derive(Clone, Debug)]
pub struct AgentCost {
    /// Identifier of the agent the queries were attributed to.
    pub agent_id: String,
    /// Number of queries attributed to this agent.
    pub queries: u64,
    /// Execution time attributed to this agent, in seconds.
    pub time_seconds: f64,
    /// Estimated cost attributed to this agent, in US dollars.
    pub cost_usd: f64,
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    /// Builds a successful, single-row, 5 ms execution of `query`.
    ///
    /// The execution belongs to session `session_1` and has no workflow and no
    /// bound parameters.
    fn sample_execution(query: &str) -> QueryExecution {
        QueryExecution {
            query: query.to_string(),
            duration: Duration::from_millis(5),
            rows: 1,
            error: None,
            user: "test_user".to_string(),
            client_ip: "127.0.0.1".to_string(),
            database: "test_db".to_string(),
            node: "primary".to_string(),
            session_id: Some("session_1".to_string()),
            workflow_id: None,
            parameters: None,
        }
    }

    #[test]
    fn test_analytics_basic() {
        let analytics = QueryAnalytics::with_defaults();
        analytics.record(sample_execution("SELECT * FROM users WHERE id = 1"));
        let top = analytics.top_queries(OrderBy::Calls, 10);
        assert_eq!(top.len(), 1);
        assert_eq!(top[0].calls, 1);
    }

    #[test]
    fn test_disabled_analytics_records_nothing() {
        let mut config = AnalyticsConfig::default();
        config.enabled = false;
        let analytics = QueryAnalytics::new(config);
        analytics.record(sample_execution("SELECT * FROM users WHERE id = 1"));
        assert!(analytics.top_queries(OrderBy::Calls, 10).is_empty());
    }

    #[test]
    fn test_reset_clears_statistics() {
        let analytics = QueryAnalytics::with_defaults();
        analytics.record(sample_execution("SELECT * FROM users WHERE id = 1"));
        analytics.reset();
        assert!(analytics.top_queries(OrderBy::Calls, 10).is_empty());
    }

    #[test]
    fn test_order_by_parse() {
        assert_eq!("total_time".parse::<OrderBy>().unwrap(), OrderBy::TotalTime);
        assert_eq!("calls".parse::<OrderBy>().unwrap(), OrderBy::Calls);
        assert_eq!("p99".parse::<OrderBy>().unwrap(), OrderBy::P99Time);
    }

    #[test]
    fn test_order_by_parse_aliases_and_case() {
        let cases = [
            ("totaltime", OrderBy::TotalTime),
            ("AVG_TIME", OrderBy::AvgTime),
            ("avgtime", OrderBy::AvgTime),
            ("Count", OrderBy::Calls),
            ("errors", OrderBy::Errors),
            ("P99_Time", OrderBy::P99Time),
            ("ROWS", OrderBy::Rows),
        ];
        for &(input, expected) in cases.iter() {
            assert_eq!(input.parse::<OrderBy>(), Ok(expected), "input {:?}", input);
        }
    }

    #[test]
    fn test_order_by_parse_rejects_unknown() {
        assert_eq!(
            "bogus".parse::<OrderBy>(),
            Err("Unknown order by: bogus".to_string())
        );
        assert!("".parse::<OrderBy>().is_err());
    }
}