use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use serde::{Deserialize, Serialize};
pub struct QueryProfiler {
queries: Arc<Mutex<Vec<QueryExecution>>>,
is_running: Arc<Mutex<bool>>,
_handle: Option<tokio::task::JoinHandle<()>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryExecution {
pub id: u64,
pub query_type: QueryType,
pub query_text: String,
pub start_time: chrono::DateTime<chrono::Utc>,
pub end_time: Option<chrono::DateTime<chrono::Utc>>,
pub execution_time_us: Option<u64>,
pub result_count: usize,
pub bytes_processed: u64,
pub success: bool,
pub error_message: Option<String>,
pub execution_plan: Option<QueryPlan>,
pub thread_id: u64,
pub client_info: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryType {
Select,
Insert,
Update,
Delete,
Create,
Drop,
Alter,
Transaction,
Custom(String),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryPlan {
pub operations: Vec<PlanOperation>,
pub estimated_cost: f64,
pub actual_cost: Option<f64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlanOperation {
pub operation_type: String,
pub object_name: String,
pub estimated_rows: usize,
pub actual_rows: Option<usize>,
pub execution_time_us: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct QueryAnalysis {
pub total_queries: usize,
pub query_type_breakdown: HashMap<String, QueryTypeStats>,
pub slow_queries: Vec<SlowQuery>,
pub inefficient_queries: Vec<InefficientQuery>,
pub query_patterns: Vec<QueryPattern>,
pub index_recommendations: Vec<IndexRecommendation>,
pub performance_trends: QueryPerformanceTrends,
pub recommendations: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryTypeStats {
pub count: usize,
pub total_execution_time_us: u64,
pub average_execution_time_us: f64,
pub p95_execution_time_us: u64,
pub success_rate: f64,
pub average_result_count: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlowQuery {
pub query_id: u64,
pub query_text: String,
pub execution_time_us: u64,
pub result_count: usize,
pub execution_plan: Option<QueryPlan>,
pub suggestion: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InefficientQuery {
pub query_id: u64,
pub query_text: String,
pub inefficiency_type: InefficiencyType,
pub severity: f64, pub explanation: String,
pub suggestion: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum InefficiencyType {
FullTableScan,
MissingIndex,
CartesianProduct,
RedundantSort,
ExpensivePredicate,
LargeResultSet,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryPattern {
pub pattern_type: String,
pub frequency: usize,
pub average_performance: f64,
pub trend: PerformanceTrend,
pub example_query: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PerformanceTrend {
Improving,
Stable,
Degrading,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexRecommendation {
pub table_name: String,
pub column_name: String,
pub index_type: String,
pub estimated_improvement: f64,
pub queries_affected: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryPerformanceTrends {
pub overall_trend: PerformanceTrend,
pub trend_by_query_type: HashMap<String, PerformanceTrend>,
pub peak_load_periods: Vec<TimeRange>,
pub performance_degradation_points: Vec<chrono::DateTime<chrono::Utc>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
pub start: chrono::DateTime<chrono::Utc>,
pub end: chrono::DateTime<chrono::Utc>,
}
impl QueryProfiler {
pub fn new() -> Self {
Self {
queries: Arc::new(Mutex::new(Vec::new())),
is_running: Arc::new(Mutex::new(false)),
_handle: None,
}
}
pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let mut is_running = self.is_running.lock().unwrap();
if *is_running {
return Err("Query profiler is already running".into());
}
*is_running = true;
let queries = Arc::clone(&self.queries);
let is_running_clone = Arc::clone(&self.is_running);
self._handle = Some(tokio::spawn(async move {
while *is_running_clone.lock().unwrap() {
tokio::time::sleep(Duration::from_millis(50)).await;
if rand::random::<f32>() < 0.2 { let query = Self::simulate_query_execution();
queries.lock().unwrap().push(query);
}
}
}));
Ok(())
}
pub async fn stop(&mut self) -> Result<(), Box<dyn std::error::Error>> {
let mut is_running = self.is_running.lock().unwrap();
if !*is_running {
return Err("Query profiler is not running".into());
}
*is_running = false;
if let Some(handle) = self._handle.take() {
let _ = handle.await;
}
Ok(())
}
pub async fn record_query(&self, execution: QueryExecution) {
self.queries.lock().unwrap().push(execution);
}
pub async fn analyze(&self) -> Result<QueryAnalysis, Box<dyn std::error::Error>> {
let queries = self.queries.lock().unwrap();
if queries.is_empty() {
return Ok(QueryAnalysis {
total_queries: 0,
query_type_breakdown: HashMap::new(),
slow_queries: Vec::new(),
inefficient_queries: Vec::new(),
query_patterns: Vec::new(),
index_recommendations: Vec::new(),
performance_trends: QueryPerformanceTrends {
overall_trend: PerformanceTrend::Stable,
trend_by_query_type: HashMap::new(),
peak_load_periods: Vec::new(),
performance_degradation_points: Vec::new(),
},
recommendations: vec!["No queries recorded for analysis".to_string()],
});
}
let total_queries = queries.len();
let query_type_breakdown = self.analyze_query_types(&queries);
let slow_queries = self.identify_slow_queries(&queries);
let inefficient_queries = self.identify_inefficient_queries(&queries);
let query_patterns = self.analyze_query_patterns(&queries);
let index_recommendations = self.generate_index_recommendations(&queries);
let performance_trends = self.analyze_performance_trends(&queries);
let recommendations = self.generate_recommendations(
&slow_queries,
&inefficient_queries,
&index_recommendations,
);
Ok(QueryAnalysis {
total_queries,
query_type_breakdown,
slow_queries,
inefficient_queries,
query_patterns,
index_recommendations,
performance_trends,
recommendations,
})
}
fn simulate_query_execution() -> QueryExecution {
use rand::Rng;
let mut rng = rand::thread_rng();
let query_types = [QueryType::Select, QueryType::Insert, QueryType::Update, QueryType::Delete];
let query_type = query_types[rng.gen_range(0..query_types.len())].clone();
let execution_time_us = rng.gen_range(100..100000); let result_count = rng.gen_range(0..1000);
let bytes_processed = rng.gen_range(1000..100000);
let query_text = match query_type {
QueryType::Select => format!("SELECT * FROM users WHERE age > {} LIMIT {}", rng.gen_range(18..65), rng.gen_range(10..100)),
QueryType::Insert => format!("INSERT INTO users (name, email, age) VALUES ('User{}', 'user{}@example.com', {})",
rng.gen::<u32>(), rng.gen::<u32>(), rng.gen_range(18..65)),
QueryType::Update => format!("UPDATE users SET age = {} WHERE id = {}", rng.gen_range(18..65), rng.gen::<u32>()),
QueryType::Delete => format!("DELETE FROM users WHERE id = {}", rng.gen::<u32>()),
_ => "UNKNOWN QUERY".to_string(),
};
QueryExecution {
id: rng.gen(),
query_type,
query_text,
start_time: chrono::Utc::now(),
end_time: Some(chrono::Utc::now()),
execution_time_us: Some(execution_time_us),
result_count,
bytes_processed,
success: rng.gen_bool(0.95), error_message: None,
execution_plan: None, thread_id: rng.gen(),
client_info: Some(format!("client_{}", rng.gen::<u32>())),
}
}
fn analyze_query_types(&self, queries: &[QueryExecution]) -> HashMap<String, QueryTypeStats> {
let mut type_groups: HashMap<String, Vec<&QueryExecution>> = HashMap::new();
for query in queries {
let type_key = format!("{:?}", query.query_type);
type_groups.entry(type_key).or_insert(Vec::new()).push(query);
}
let mut breakdown = HashMap::new();
for (type_name, type_queries) in type_groups {
let count = type_queries.len();
let total_execution_time: u64 = type_queries.iter()
.filter_map(|q| q.execution_time_us)
.sum();
let average_execution_time = total_execution_time as f64 / count as f64;
let latencies: Vec<u64> = type_queries.iter()
.filter_map(|q| q.execution_time_us)
.collect();
let p95_execution_time = if !latencies.is_empty() {
let mut sorted = latencies.clone();
sorted.sort_unstable();
sorted[(sorted.len() as f64 * 0.95) as usize]
} else {
0
};
let successful_queries = type_queries.iter().filter(|q| q.success).count();
let success_rate = successful_queries as f64 / count as f64;
let total_result_count: usize = type_queries.iter().map(|q| q.result_count).sum();
let average_result_count = total_result_count as f64 / count as f64;
breakdown.insert(type_name, QueryTypeStats {
count,
total_execution_time_us: total_execution_time,
average_execution_time_us: average_execution_time,
p95_execution_time_us: p95_execution_time,
success_rate,
average_result_count,
});
}
breakdown
}
fn identify_slow_queries(&self, queries: &[QueryExecution]) -> Vec<SlowQuery> {
let mut completed_queries: Vec<&QueryExecution> = queries.iter()
.filter(|q| q.execution_time_us.is_some())
.collect();
if completed_queries.len() < 10 {
return Vec::new();
}
completed_queries.sort_by(|a, b| {
b.execution_time_us.unwrap().cmp(&a.execution_time_us.unwrap())
});
let slow_count = (completed_queries.len() / 10).max(1);
let mut slow_queries = Vec::new();
for query in completed_queries.iter().take(slow_count) {
let suggestion = self.generate_slow_query_suggestion(query);
slow_queries.push(SlowQuery {
query_id: query.id,
query_text: query.query_text.clone(),
execution_time_us: query.execution_time_us.unwrap(),
result_count: query.result_count,
execution_plan: query.execution_plan.clone(),
suggestion,
});
}
slow_queries
}
fn identify_inefficient_queries(&self, queries: &[QueryExecution]) -> Vec<InefficientQuery> {
let mut inefficient = Vec::new();
for query in queries {
if let Some(inefficiency) = self.detect_inefficiency(query) {
inefficient.push(inefficiency);
}
}
inefficient.sort_by(|a, b| b.severity.partial_cmp(&a.severity).unwrap());
inefficient.truncate(20);
inefficient
}
fn detect_inefficiency(&self, query: &QueryExecution) -> Option<InefficientQuery> {
if query.query_text.contains("SELECT *") && query.result_count > 1000 {
return Some(InefficientQuery {
query_id: query.id,
query_text: query.query_text.clone(),
inefficiency_type: InefficiencyType::LargeResultSet,
severity: 0.7,
explanation: format!("Query returns {} rows without filtering", query.result_count),
suggestion: "Add WHERE clauses to limit result set or use pagination".to_string(),
});
}
if query.execution_time_us.unwrap_or(0) > 50000 && query.query_text.contains("WHERE") == false {
return Some(InefficientQuery {
query_id: query.id,
query_text: query.query_text.clone(),
inefficiency_type: InefficiencyType::FullTableScan,
severity: 0.8,
explanation: "Query likely performs full table scan".to_string(),
suggestion: "Add appropriate indexes on frequently queried columns".to_string(),
});
}
None
}
fn analyze_query_patterns(&self, queries: &[QueryExecution]) -> Vec<QueryPattern> {
let mut patterns = Vec::new();
let point_queries: Vec<_> = queries.iter()
.filter(|q| q.query_text.contains("WHERE id ="))
.collect();
if !point_queries.is_empty() {
let avg_performance = point_queries.iter()
.filter_map(|q| q.execution_time_us)
.sum::<u64>() as f64 / point_queries.len() as f64;
patterns.push(QueryPattern {
pattern_type: "Point Query".to_string(),
frequency: point_queries.len(),
average_performance: avg_performance,
trend: PerformanceTrend::Stable,
example_query: point_queries[0].query_text.clone(),
});
}
let range_queries: Vec<_> = queries.iter()
.filter(|q| q.query_text.contains("BETWEEN") || q.query_text.contains(">"))
.collect();
if !range_queries.is_empty() {
let avg_performance = range_queries.iter()
.filter_map(|q| q.execution_time_us)
.sum::<u64>() as f64 / range_queries.len() as f64;
patterns.push(QueryPattern {
pattern_type: "Range Query".to_string(),
frequency: range_queries.len(),
average_performance: avg_performance,
trend: PerformanceTrend::Stable,
example_query: range_queries[0].query_text.clone(),
});
}
patterns
}
fn generate_index_recommendations(&self, queries: &[QueryExecution]) -> Vec<IndexRecommendation> {
let mut recommendations = Vec::new();
let slow_selects: Vec<_> = queries.iter()
.filter(|q| matches!(q.query_type, QueryType::Select))
.filter(|q| q.execution_time_us.unwrap_or(0) > 10000)
.collect();
if slow_selects.len() > queries.len() / 10 { recommendations.push(IndexRecommendation {
table_name: "users".to_string(),
column_name: "email".to_string(),
index_type: "btree".to_string(),
estimated_improvement: 0.8, queries_affected: slow_selects.len(),
});
}
recommendations
}
fn analyze_performance_trends(&self, queries: &[QueryExecution]) -> QueryPerformanceTrends {
QueryPerformanceTrends {
overall_trend: PerformanceTrend::Stable,
trend_by_query_type: HashMap::new(),
peak_load_periods: Vec::new(),
performance_degradation_points: Vec::new(),
}
}
fn generate_slow_query_suggestion(&self, query: &QueryExecution) -> String {
if query.query_text.contains("SELECT *") {
"Consider selecting only required columns instead of SELECT *".to_string()
} else if query.query_text.contains("ORDER BY") && !query.query_text.contains("LIMIT") {
"Consider adding LIMIT clause or creating appropriate indexes for ORDER BY".to_string()
} else if query.result_count > 10000 {
"Large result set detected. Consider pagination or filtering".to_string()
} else {
"Consider adding indexes on frequently queried columns".to_string()
}
}
fn generate_recommendations(
&self,
slow_queries: &[SlowQuery],
inefficient_queries: &[InefficientQuery],
index_recommendations: &[IndexRecommendation],
) -> Vec<String> {
let mut recommendations = Vec::new();
if !slow_queries.is_empty() {
recommendations.push(format!("{} slow queries identified. Review query optimization and indexing strategies.",
slow_queries.len()));
}
if !inefficient_queries.is_empty() {
recommendations.push(format!("{} inefficient queries detected. Consider query rewriting and index optimization.",
inefficient_queries.len()));
}
if !index_recommendations.is_empty() {
recommendations.push(format!("{} index recommendations generated. Implementing these could improve performance significantly.",
index_recommendations.len()));
}
if slow_queries.is_empty() && inefficient_queries.is_empty() && index_recommendations.is_empty() {
recommendations.push("Query performance appears normal. No specific recommendations.".to_string());
}
recommendations
}
}
impl QueryAnalysis {
pub fn query_performance_score(&self) -> f64 {
let slow_query_penalty = (self.slow_queries.len() as f64 * 0.1).min(0.4);
let inefficient_penalty = (self.inefficient_queries.len() as f64 * 0.05).min(0.3);
let index_opportunity_bonus = (self.index_recommendations.len() as f64 * 0.1).min(0.3);
(1.0 - slow_query_penalty - inefficient_penalty + index_opportunity_bonus).max(0.0).min(1.0)
}
pub fn most_problematic_queries(&self) -> Vec<&SlowQuery> {
self.slow_queries.iter().take(5).collect()
}
pub fn meets_performance_requirements(&self, max_slow_query_percentage: f64) -> bool {
let slow_query_percentage = self.slow_queries.len() as f64 / self.total_queries.max(1) as f64 * 100.0;
slow_query_percentage <= max_slow_query_percentage
}
}