use async_graphql::{Context, Result, Object};
use std::sync::Arc;
use tokio::sync::RwLock;
use serde::{Serialize, Deserialize};
use std::collections::HashMap;
use std::time::{Duration, Instant};
/// Executes SQL queries on behalf of GraphQL resolvers, layering a result
/// cache, a query-planning step, and execution metrics over a connection
/// pool. All collaborators are trait objects so they can be swapped for the
/// mocks defined at the bottom of this file.
pub struct OptimizedQueryExecutor {
    /// Result cache consulted before execution and populated afterwards.
    cache: Arc<dyn Cache>,
    /// Rewrites a query into an optimized plan before execution.
    query_planner: Arc<dyn QueryPlanner>,
    /// Underlying database access.
    connection_pool: Arc<dyn ConnectionPool>,
    /// Shared mutable counters, guarded by an async RwLock.
    metrics: Arc<RwLock<QueryMetrics>>,
}
/// Internal running counters for `query_statistics`; starts zeroed.
#[derive(Default)]
struct QueryMetrics {
    /// Total queries executed (cache misses only — hits short-circuit).
    queries_executed: u64,
    cache_hits: u64,
    cache_misses: u64,
    /// Running mean execution time, updated incrementally per query.
    avg_execution_time: Duration,
    /// Bounded log of queries that took longer than the slow threshold.
    slow_queries: Vec<SlowQuery>,
}
#[derive(Clone)]
struct SlowQuery {
query: String,
execution_time: Duration,
timestamp: chrono::DateTime<chrono::Utc>,
}
/// One entry of an `execute_batch_queries` request.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct BatchQuery {
    /// SQL text to execute.
    pub query: String,
    /// Optional positional parameters for the query.
    pub parameters: Option<Vec<serde_json::Value>>,
    /// Requested cache TTL in seconds, if the result should be cached.
    pub cache_ttl: Option<i32>,
}
/// Outcome of a single query execution.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct QueryResult {
    /// Result rows as loosely-typed JSON values.
    pub rows: Vec<serde_json::Value>,
    /// Row count affected by a write statement.
    pub affected_rows: u64,
    /// Execution wall-clock time in milliseconds.
    pub execution_time_ms: u64,
    /// Whether this result was served from the cache.
    pub cached: bool,
}
/// Snapshot of executor metrics returned by `query_statistics`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct QueryStatistics {
    pub queries_executed: u64,
    /// cache_hits / queries_executed; 0.0 when nothing has run yet.
    pub cache_hit_rate: f64,
    pub avg_execution_time_ms: f64,
    /// Most recent slow queries (bounded log, see `update_execution_metrics`).
    pub slow_queries: Vec<SlowQuery>,
}
/// Lightweight parse of a SQL statement, produced by
/// `OptimizedQueryExecutor::parse_query` for permission checks and
/// complexity scoring.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ParsedQuery {
    /// The raw SQL text. The complexity/recommendation helpers read
    /// `parsed_query.query` to look for WHERE/JOIN/ORDER BY clauses; the
    /// original struct lacked this field and did not compile.
    pub query: String,
    /// Table names referenced by the statement (best-effort heuristic).
    pub tables: Vec<String>,
    /// Leading-keyword classification of the statement.
    pub operation: QueryOperation,
    /// True for DDL (CREATE/DROP/ALTER), which requires admin permission.
    pub is_admin_operation: bool,
    /// Base complexity score from operation type and table count.
    pub complexity_score: u32,
}
/// SQL statement kind, derived from the statement's leading keyword.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum QueryOperation {
    Select,
    Insert,
    Update,
    Delete,
    // The three variants below are DDL and treated as admin operations.
    Create,
    Drop,
    Alter,
}
#[Object]
impl OptimizedQueryExecutor {
async fn execute_query(
&self,
ctx: &Context<'_>,
#[graphql(desc = "SQL query to execute")] query: String,
#[graphql(desc = "Query parameters")] parameters: Option<Vec<serde_json::Value>>,
#[graphql(desc = "Cache TTL in seconds")] cache_ttl: Option<i32>
) -> Result<QueryResult> {
let user_context = ctx.data::<crate::graphql::context::GraphQLContext>()?;
self::validate_query_permissions(user_context, &query)?;
let cache_key = self::generate_cache_key(&query, ¶meters);
if let Some(cached_result) = self.cache.get(&cache_key).await {
self::update_cache_hit_metrics(&self.metrics).await;
return Ok(cached_result);
}
self::update_cache_miss_metrics(&self.metrics).await;
let optimized_plan = self.query_planner.optimize(&query, ¶meters).await?;
let start_time = Instant::now();
let result = self::execute_optimized_plan(&optimized_plan).await?;
let execution_time = start_time.elapsed();
self::update_execution_metrics(&self.metrics, execution_time, &query).await;
let ttl = Duration::from_secs(cache_ttl.unwrap_or(300) as u64);
self.cache.set(&cache_key, &result, ttl).await;
Ok(result)
}
async fn execute_batch_queries(
&self,
ctx: &Context<'_>,
#[graphql(desc = "List of queries to execute")] queries: Vec<BatchQuery>
) -> Result<Vec<QueryResult>> {
let user_context = ctx.data::<crate::graphql::context::GraphQLContext>()?;
for batch_query in &queries {
self::validate_query_permissions(user_context, &batch_query.query)?;
}
let mut read_queries = Vec::new();
let mut write_queries = Vec::new();
for (i, batch_query) in queries.into_iter().enumerate() {
if self::is_read_query(&batch_query.query) {
read_queries.push((i, batch_query));
} else {
write_queries.push((i, batch_query));
}
}
let read_futures: Vec<_> = read_queries.into_iter()
.map(|(index, batch_query)| {
let executor = self.clone();
async move {
let result = executor.execute_single_query(&batch_query.query, &batch_query.parameters).await?;
Ok((index, result))
}
})
.collect();
let read_results = futures::future::try_join_all(read_futures).await?;
let mut write_results = Vec::new();
for (index, batch_query) in write_queries {
let result = self::execute_single_query(&batch_query.query, &batch_query.parameters).await?;
write_results.push((index, result));
}
let mut combined_results = vec![QueryResult::default(); read_results.len() + write_results.len()];
for (index, result) in read_results {
combined_results[index] = result;
}
for (index, result) in write_results {
combined_results[index] = result;
}
Ok(combined_results)
}
async fn query_statistics(&self, ctx: &Context<'_>) -> Result<QueryStatistics> {
let user_context = ctx.data::<crate::graphql::context::GraphQLContext>()?;
if !user_context.has_permission("query.statistics") {
return Err(async_graphql::Error::new("Insufficient permissions for query statistics"));
}
let metrics = self.metrics.read().await;
Ok(QueryStatistics {
queries_executed: metrics.queries_executed,
cache_hit_rate: if metrics.queries_executed > 0 {
metrics.cache_hits as f64 / metrics.queries_executed as f64
} else {
0.0
},
avg_execution_time_ms: metrics.avg_execution_time.as_millis() as f64,
slow_queries: metrics.slow_queries.clone(),
})
}
async fn analyze_query_complexity(
&self,
ctx: &Context<'_>,
#[graphql(desc = "SQL query to analyze")] query: String
) -> Result<QueryComplexityAnalysis> {
let user_context = ctx.data::<crate::graphql::context::GraphQLContext>()?;
if !user_context.has_permission("query.analyze") {
return Err(async_graphql::Error::new("Insufficient permissions for query analysis"));
}
let parsed_query = self::parse_query(&query)?;
let complexity_score = self::calculate_complexity_score(&parsed_query);
let recommendations = self::generate_optimization_recommendations(&parsed_query, complexity_score);
Ok(QueryComplexityAnalysis {
query: query.clone(),
complexity_score,
estimated_execution_time_ms: self::estimate_execution_time(&parsed_query),
recommendations,
tables_accessed: parsed_query.tables,
operation_type: parsed_query.operation,
})
}
}
/// Result of `analyze_query_complexity`: heuristic scoring plus advice.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct QueryComplexityAnalysis {
    /// The analyzed SQL text, echoed back.
    pub query: String,
    /// Heuristic score from operation type, tables, and clause keywords.
    pub complexity_score: u32,
    /// Rough execution-time estimate derived from the score (not measured).
    pub estimated_execution_time_ms: u64,
    /// Human-readable optimization suggestions.
    pub recommendations: Vec<String>,
    pub tables_accessed: Vec<String>,
    pub operation_type: QueryOperation,
}
impl OptimizedQueryExecutor {
pub fn new(
cache: Arc<dyn Cache>,
query_planner: Arc<dyn QueryPlanner>,
connection_pool: Arc<dyn ConnectionPool>,
) -> Self {
Self {
cache,
query_planner,
connection_pool,
metrics: Arc::new(RwLock::new(QueryMetrics::default())),
}
}
async fn validate_query_permissions(
user_context: &crate::graphql::context::GraphQLContext,
query: &str,
) -> Result<()> {
let parsed_query = Self::parse_query(query)?;
for table in &parsed_query.tables {
if !user_context.has_table_permission(table, &parsed_query.operation) {
return Err(async_graphql::Error::new(format!("Access denied to table: {}", table)));
}
}
if parsed_query.is_admin_operation && !user_context.has_permission("admin.query") {
return Err(async_graphql::Error::new("Admin privileges required for this query"));
}
Ok(())
}
fn parse_query(query: &str) -> Result<ParsedQuery> {
let query_lower = query.to_lowercase();
let operation = if query_lower.starts_with("select") {
QueryOperation::Select
} else if query_lower.starts_with("insert") {
QueryOperation::Insert
} else if query_lower.starts_with("update") {
QueryOperation::Update
} else if query_lower.starts_with("delete") {
QueryOperation::Delete
} else if query_lower.starts_with("create") {
QueryOperation::Create
} else if query_lower.starts_with("drop") {
QueryOperation::Drop
} else if query_lower.starts_with("alter") {
QueryOperation::Alter
} else {
return Err(async_graphql::Error::new("Unsupported query operation"));
};
let tables = Self::extract_tables_from_query(query)?;
let complexity_score = Self::calculate_base_complexity(&operation, &tables);
let is_admin_operation = matches!(operation, QueryOperation::Create | QueryOperation::Drop | QueryOperation::Alter);
Ok(ParsedQuery {
tables,
operation,
is_admin_operation,
complexity_score,
})
}
fn extract_tables_from_query(query: &str) -> Result<Vec<String>> {
let mut tables = Vec::new();
let query_lower = query.to_lowercase();
let patterns = vec![
"from ",
"into ",
"update ",
"join ",
"inner join ",
"left join ",
"right join ",
];
for pattern in patterns {
if let Some(pos) = query_lower.find(pattern) {
let after_pattern = &query_lower[pos + pattern.len()..];
if let Some(table_end) = after_pattern.find(|c| c == ' ' || c == ',' || c == '(' || c == ';') {
let table_name = &after_pattern[..table_end];
if !table_name.is_empty() && !tables.contains(&table_name.to_string()) {
tables.push(table_name.to_string());
}
}
}
}
Ok(tables)
}
fn calculate_base_complexity(operation: &QueryOperation, tables: &[String]) -> u32 {
let mut score = match operation {
QueryOperation::Select => 10,
QueryOperation::Insert => 20,
QueryOperation::Update => 25,
QueryOperation::Delete => 30,
QueryOperation::Create => 50,
QueryOperation::Drop => 100,
QueryOperation::Alter => 75,
};
score += tables.len() as u32 * 5;
score
}
fn calculate_complexity_score(parsed_query: &ParsedQuery) -> u32 {
let mut score = parsed_query.complexity_score;
if parsed_query.query.to_lowercase().contains("where") {
score += 10;
}
if parsed_query.query.to_lowercase().contains("order by") {
score += 5;
}
if parsed_query.query.to_lowercase().contains("group by") {
score += 15;
}
if parsed_query.query.to_lowercase().contains("join") {
score += 20;
}
score
}
fn estimate_execution_time(parsed_query: &ParsedQuery) -> u64 {
let base_time = match parsed_query.operation {
QueryOperation::Select => 50,
QueryOperation::Insert => 100,
QueryOperation::Update => 150,
QueryOperation::Delete => 120,
QueryOperation::Create => 200,
QueryOperation::Drop => 300,
QueryOperation::Alter => 250,
};
let complexity_multiplier = parsed_query.complexity_score as f64 / 50.0;
(base_time as f64 * complexity_multiplier) as u64
}
fn generate_optimization_recommendations(
parsed_query: &ParsedQuery,
complexity_score: u32,
) -> Vec<String> {
let mut recommendations = Vec::new();
if complexity_score > 100 {
recommendations.push("Consider breaking this complex query into multiple simpler queries".to_string());
}
if parsed_query.tables.len() > 3 {
recommendations.push("Query accesses multiple tables - consider using proper indexing".to_string());
}
if parsed_query.query.to_lowercase().contains("select *") {
recommendations.push("Avoid SELECT * - specify only the columns you need".to_string());
}
if parsed_query.query.to_lowercase().contains("where") &&
!parsed_query.query.to_lowercase().contains("index") {
recommendations.push("Ensure WHERE clause columns are properly indexed".to_string());
}
if parsed_query.query.to_lowercase().contains("order by") &&
!parsed_query.query.to_lowercase().contains("limit") {
recommendations.push("Consider adding LIMIT to ORDER BY queries for better performance".to_string());
}
recommendations
}
fn generate_cache_key(query: &str, parameters: &Option<Vec<serde_json::Value>>) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
query.hash(&mut hasher);
if let Some(params) = parameters {
for param in params {
param.to_string().hash(&mut hasher);
}
}
format!("query_{:x}", hasher.finish())
}
async fn update_cache_hit_metrics(metrics: &Arc<RwLock<QueryMetrics>>) {
let mut metrics = metrics.write().await;
metrics.cache_hits += 1;
}
async fn update_cache_miss_metrics(metrics: &Arc<RwLock<QueryMetrics>>) {
let mut metrics = metrics.write().await;
metrics.cache_misses += 1;
}
async fn update_execution_metrics(
metrics: &Arc<RwLock<QueryMetrics>>,
execution_time: Duration,
query: &str,
) {
let mut metrics = metrics.write().await;
metrics.queries_executed += 1;
let total_time = metrics.avg_execution_time * (metrics.queries_executed - 1) as u32 + execution_time;
metrics.avg_execution_time = total_time / metrics.queries_executed as u32;
if execution_time > Duration::from_secs(1) {
metrics.slow_queries.push(SlowQuery {
query: query.to_string(),
execution_time,
timestamp: chrono::Utc::now(),
});
if metrics.slow_queries.len() > 100 {
metrics.slow_queries.remove(0);
}
}
}
fn is_read_query(query: &str) -> bool {
let query_lower = query.to_lowercase().trim();
query_lower.starts_with("select") || query_lower.starts_with("with")
}
async fn execute_single_query(&self, query: &str, parameters: &Option<Vec<serde_json::Value>>) -> Result<QueryResult> {
let start_time = Instant::now();
tokio::time::sleep(Duration::from_millis(50)).await;
let execution_time = start_time.elapsed();
Ok(QueryResult {
rows: vec![],
affected_rows: 0,
execution_time_ms: execution_time.as_millis() as u64,
cached: false,
})
}
async fn execute_optimized_plan(&self, plan: &str) -> Result<QueryResult> {
let start_time = Instant::now();
tokio::time::sleep(Duration::from_millis(25)).await;
let execution_time = start_time.elapsed();
Ok(QueryResult {
rows: vec![],
affected_rows: 0,
execution_time_ms: execution_time.as_millis() as u64,
cached: false,
})
}
}
impl Clone for OptimizedQueryExecutor {
fn clone(&self) -> Self {
Self {
cache: self.cache.clone(),
query_planner: self.query_planner.clone(),
connection_pool: self.connection_pool.clone(),
metrics: self.metrics.clone(),
}
}
}
/// Key/value result cache used by the executor.
#[async_trait::async_trait]
pub trait Cache: Send + Sync {
    /// Returns the cached result for `key`, if any.
    async fn get(&self, key: &str) -> Option<QueryResult>;
    /// Stores `value` under `key` with the given time-to-live.
    async fn set(&self, key: &str, value: &QueryResult, ttl: Duration);
}
/// Rewrites a query (plus optional parameters) into an optimized plan string.
#[async_trait::async_trait]
pub trait QueryPlanner: Send + Sync {
    async fn optimize(&self, query: &str, parameters: &Option<Vec<serde_json::Value>>) -> Result<String>;
}
/// Database access: executes a statement and returns its result.
#[async_trait::async_trait]
pub trait ConnectionPool: Send + Sync {
    async fn execute(&self, query: &str) -> Result<QueryResult>;
}
/// In-memory `Cache` implementation for tests.
pub struct MockCache {
    // Each entry stores the result together with an `Instant` used for
    // expiry bookkeeping.
    cache: Arc<RwLock<HashMap<String, (QueryResult, Instant)>>>,
}
impl MockCache {
    /// Creates an empty mock cache.
    pub fn new() -> Self {
        Self {
            cache: Arc::new(RwLock::new(HashMap::new())),
        }
    }
}

// A no-argument `new` should be reachable through `Default` as well
// (clippy::new_without_default).
impl Default for MockCache {
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait::async_trait]
impl Cache for MockCache {
    /// Returns the entry for `key` only while it is still live.
    ///
    /// The original ignored the stored timestamp entirely, so entries never
    /// expired and the `ttl` argument to `set` was dead.
    async fn get(&self, key: &str) -> Option<QueryResult> {
        let cache = self.cache.read().await;
        cache.get(key).and_then(|(result, expires_at)| {
            if Instant::now() < *expires_at {
                Some(result.clone())
            } else {
                // Expired: report a miss. The stale entry is overwritten on
                // the next `set`; we avoid upgrading to a write lock here.
                None
            }
        })
    }
    /// Stores `value` under `key`, expiring `ttl` from now.
    async fn set(&self, key: &str, value: &QueryResult, ttl: Duration) {
        let mut cache = self.cache.write().await;
        cache.insert(key.to_string(), (value.clone(), Instant::now() + ttl));
    }
}
/// Stateless `QueryPlanner` for tests; see the impl below.
pub struct MockQueryPlanner;
#[async_trait::async_trait]
impl QueryPlanner for MockQueryPlanner {
    /// "Optimizes" by appending `LIMIT 1000` to unbounded SELECTs; every
    /// other statement passes through untouched.
    async fn optimize(&self, query: &str, _parameters: &Option<Vec<serde_json::Value>>) -> Result<String> {
        let lowered = query.to_lowercase();
        let needs_limit = lowered.starts_with("select") && !lowered.contains("limit");
        let plan = if needs_limit {
            format!("{} LIMIT 1000", query)
        } else {
            query.to_string()
        };
        Ok(plan)
    }
}
/// Stateless `ConnectionPool` for tests; see the impl below.
pub struct MockConnectionPool;
#[async_trait::async_trait]
impl ConnectionPool for MockConnectionPool {
async fn execute(&self, query: &str) -> Result<QueryResult> {
let start_time = Instant::now();
tokio::time::sleep(Duration::from_millis(30)).await;
let execution_time = start_time.elapsed();
Ok(QueryResult {
rows: vec![],
affected_rows: 0,
execution_time_ms: execution_time.as_millis() as u64,
cached: false,
})
}
}