use anyhow::Result;
use colored::Colorize;
use cqlite_core::{
query::{
QueryEngine, QueryResult, QueryType, ParsedQuery, QueryParser,
select_parser::{SelectParser, parse_select},
select_executor::SelectExecutor,
WhereClause, Condition, ComparisonOperator,
},
schema::SchemaManager,
storage::StorageEngine,
Database, TableId, Value,
};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Instant;
/// Executes CQL statements entered at the REPL.
///
/// Adds keyspace qualification, heuristic planning, timing metrics and a small
/// per-statement cache on top of the core engine components it holds.
#[derive(Debug)]
pub struct CQLQueryProcessor {
/// Used for metadata lookups against `system.*` tables (DESCRIBE and USE handling).
database: Arc<Database>,
/// Fallback executor for statement types not handled by this processor.
query_engine: Arc<QueryEngine>,
/// Schema access; also shared with the `SelectExecutor` built in `new`.
schema_manager: Arc<SchemaManager>,
/// Scanned directly for simple SELECT statements.
storage_engine: Arc<StorageEngine>,
/// General-purpose statement parser.
parser: QueryParser,
// NOTE(review): not read by this processor; complex SELECTs go through the free
// function `parse_select` instead. Confirm whether this field is still needed.
select_parser: SelectParser,
/// Executor used for SELECTs with JOIN / GROUP BY / HAVING / UNION.
select_executor: SelectExecutor,
/// Performance-tracking flag; `new` initialises it to `true`.
performance_tracking: bool,
/// Previously executed statements keyed by their exact source text.
query_cache: HashMap<String, CachedQuery>,
}
/// A previously executed statement, stored in `CQLQueryProcessor::query_cache`
/// under its exact source text.
#[derive(Debug, Clone)]
struct CachedQuery {
/// Parse result for the statement (after keyspace qualification).
parsed_query: ParsedQuery,
/// Heuristic plan built for the statement.
execution_plan: ExecutionPlan,
/// When the entry was last stored or touched; drives age-based eviction.
last_used: Instant,
/// Usage counter; new entries start at 1.
usage_count: u32,
}
/// Heuristic execution plan produced by `CQLQueryProcessor::create_execution_plan`.
#[derive(Debug, Clone)]
struct ExecutionPlan {
/// Statement kind the plan was built for.
query_type: QueryType,
/// Tables the statement targets (at most one is recorded by the planner).
target_tables: Vec<TableId>,
/// Unitless relative cost: a fixed constant per statement shape, not derived
/// from data statistics.
estimated_cost: f64,
/// Advisory hints attached to the plan.
optimization_hints: Vec<String>,
}
/// Per-session REPL state consulted while executing a statement.
#[derive(Debug)]
pub struct QueryExecutionContext {
/// Keyspace used to qualify table names that have no `keyspace.` prefix.
pub current_keyspace: Option<String>,
/// Row limit added to SELECTs without a LIMIT when `interactive_mode` is set.
pub page_size: usize,
/// Timing toggle; not read by `CQLQueryProcessor`.
pub timing_enabled: bool,
/// Previously entered statements; not read by `CQLQueryProcessor`.
pub query_history: Vec<String>,
/// When true, SELECTs without a LIMIT are capped at `page_size` rows.
pub interactive_mode: bool,
}
/// Outcome of `CQLQueryProcessor::execute_query`, as shown by the REPL.
#[derive(Debug)]
pub struct REPLQueryResult {
/// Statement result; a parse failure is reported here as an error result.
pub result: QueryResult,
/// Wall-clock duration of the whole `execute_query` call, in milliseconds.
pub execution_time_ms: f64,
/// Phase timings and scan counters gathered during execution.
pub performance_metrics: QueryPerformanceMetrics,
/// Performance warnings raised for this statement.
pub warnings: Vec<String>,
/// Informational and optimisation hints (e.g. cache hits, scan-ratio advice).
pub optimization_hints: Vec<String>,
}
/// Timings and counters collected for a single statement execution.
/// `Default` yields all-zero metrics.
#[derive(Debug, Default)]
pub struct QueryPerformanceMetrics {
/// Time spent parsing, in microseconds.
pub parse_time_us: u64,
/// Time spent building the execution plan, in microseconds.
pub planning_time_us: u64,
/// Time spent executing, in microseconds.
pub execution_time_us: u64,
/// Memory used by the statement, in bytes. Not set by `CQLQueryProcessor`
/// itself; presumably populated by `SelectExecutor` (verify).
pub memory_usage_bytes: u64,
/// Number of storage scans issued (the simple-scan paths add one per scan).
pub sstables_scanned: u32,
/// Rows read from storage before filtering.
pub rows_examined: u64,
/// Rows handed back to the caller.
pub rows_returned: u64,
/// Cache hits. Not set by `CQLQueryProcessor` itself; presumably populated by
/// `SelectExecutor` (verify).
pub cache_hits: u32,
/// Cache misses. Same caveat as `cache_hits`.
pub cache_misses: u32,
}
impl CQLQueryProcessor {
    /// Row cap applied to a simple SELECT that carries no LIMIT of its own.
    const DEFAULT_SELECT_LIMIT: usize = 1000;
    /// Number of statements the cache may hold before eviction runs.
    const MAX_CACHE_ENTRIES: usize = 100;
    /// Cached statements not used within this window are evicted first.
    const CACHE_TTL: std::time::Duration = std::time::Duration::from_secs(300);

    /// Builds a processor over the given engine components.
    ///
    /// A default `cqlite_core::Config` is used to construct the parsers, and the storage
    /// engine and schema manager are shared with a new [`SelectExecutor`].
    ///
    /// # Errors
    /// Propagates any error returned by [`SelectExecutor::new`].
    pub fn new(
        database: Arc<Database>,
        query_engine: Arc<QueryEngine>,
        schema_manager: Arc<SchemaManager>,
        storage_engine: Arc<StorageEngine>,
    ) -> Result<Self> {
        let config = cqlite_core::Config::default();
        let parser = QueryParser::new(&config);
        let select_parser = SelectParser::new(&config);
        let select_executor = SelectExecutor::new(storage_engine.clone(), schema_manager.clone())?;
        Ok(Self {
            database,
            query_engine,
            schema_manager,
            storage_engine,
            parser,
            select_parser,
            select_executor,
            performance_tracking: true,
            query_cache: HashMap::new(),
        })
    }

    /// Parses, plans and executes one REPL statement, collecting timing metrics and
    /// performance warnings/hints.
    ///
    /// SELECT, DESCRIBE and USE are handled locally; every other statement type is
    /// forwarded verbatim to the core [`QueryEngine`]. Successfully executed statements
    /// are recorded in the statement cache.
    ///
    /// # Errors
    /// A parse failure is *not* an `Err`: it comes back as `Ok` carrying an error
    /// [`QueryResult`]. Planning and execution failures are propagated as `Err`.
    pub async fn execute_query(
        &mut self,
        query: &str,
        context: &QueryExecutionContext,
    ) -> Result<REPLQueryResult> {
        let start_time = Instant::now();
        let mut performance_metrics = QueryPerformanceMetrics::default();
        let mut warnings = Vec::new();
        let mut optimization_hints = Vec::new();

        // Record the hit on the cached entry itself; `cache_query` keeps this count
        // instead of resetting it.
        if let Some(cached) = self.query_cache.get_mut(query) {
            optimization_hints.push("Query found in cache".to_string());
            cached.usage_count = cached.usage_count.saturating_add(1);
            cached.last_used = Instant::now();
        }

        let parse_start = Instant::now();
        let parsed_query = match self.parse_query_with_context(query, context).await {
            Ok(pq) => pq,
            Err(e) => {
                return Ok(REPLQueryResult {
                    result: QueryResult::error(format!("Parse error: {}", e)),
                    execution_time_ms: Self::elapsed_ms(start_time),
                    performance_metrics,
                    warnings,
                    optimization_hints,
                });
            }
        };
        performance_metrics.parse_time_us = Self::elapsed_us(parse_start);

        let planning_start = Instant::now();
        let execution_plan = self.create_execution_plan(&parsed_query, context).await?;
        performance_metrics.planning_time_us = Self::elapsed_us(planning_start);

        let execution_start = Instant::now();
        let result = match parsed_query.query_type {
            QueryType::Select => {
                self.execute_select_query(&parsed_query, context, &mut performance_metrics)
                    .await?
            }
            QueryType::Describe => self.execute_describe_query(&parsed_query, context).await?,
            QueryType::Use => self.execute_use_query(&parsed_query, context).await?,
            // NOTE(review): forwards the raw text, not the keyspace-qualified parse.
            _ => self.query_engine.execute(query).await?,
        };
        performance_metrics.execution_time_us = Self::elapsed_us(execution_start);

        if self.performance_tracking {
            self.analyze_performance(&performance_metrics, &mut warnings, &mut optimization_hints);
        }
        self.cache_query(query.to_string(), parsed_query, execution_plan);

        Ok(REPLQueryResult {
            result,
            execution_time_ms: Self::elapsed_ms(start_time),
            performance_metrics,
            warnings,
            optimization_hints,
        })
    }

    /// Microseconds elapsed since `start`, saturating at `u64::MAX`.
    fn elapsed_us(start: Instant) -> u64 {
        u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX)
    }

    /// Milliseconds elapsed since `start`, keeping sub-millisecond precision.
    fn elapsed_ms(start: Instant) -> f64 {
        start.elapsed().as_secs_f64() * 1000.0
    }

    /// Parses `query` and applies session defaults from `context`.
    ///
    /// - Unqualified table names are prefixed with `context.current_keyspace`. USE is
    ///   skipped, because there the "table" slot carries the target keyspace itself.
    /// - In interactive mode a SELECT without LIMIT is capped at `context.page_size`.
    async fn parse_query_with_context(
        &self,
        query: &str,
        context: &QueryExecutionContext,
    ) -> Result<ParsedQuery> {
        let mut parsed = self.parser.parse(query)?;

        if parsed.query_type != QueryType::Use {
            if let (Some(table), Some(keyspace)) =
                (parsed.table.as_mut(), context.current_keyspace.as_deref())
            {
                if !table.name().contains('.') {
                    let qualified_name = format!("{}.{}", keyspace, table.name());
                    *table = TableId::new(&qualified_name);
                }
            }
        }

        if context.interactive_mode
            && parsed.query_type == QueryType::Select
            && parsed.limit.is_none()
        {
            parsed.limit = Some(context.page_size);
        }
        Ok(parsed)
    }

    /// Executes a SELECT.
    ///
    /// JOIN / GROUP BY / HAVING / UNION statements are delegated to the
    /// [`SelectExecutor`]. Everything else is a direct storage scan (filtered when a
    /// WHERE clause is present) capped at the statement's LIMIT, or
    /// `DEFAULT_SELECT_LIMIT` rows when there is none. Ordering is applied after the cap.
    async fn execute_select_query(
        &mut self,
        parsed_query: &ParsedQuery,
        _context: &QueryExecutionContext,
        metrics: &mut QueryPerformanceMetrics,
    ) -> Result<QueryResult> {
        if self.is_complex_select(parsed_query) {
            let select_ast = parse_select(&parsed_query.sql)?;
            return self.select_executor.execute_select(&select_ast, metrics).await;
        }

        let table_id = parsed_query
            .table
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No table specified in SELECT query"))?;
        let limit = parsed_query.limit.unwrap_or(Self::DEFAULT_SELECT_LIMIT);

        let mut results = match &parsed_query.where_clause {
            Some(where_clause) => {
                self.execute_filtered_scan(table_id, where_clause, limit, metrics)
                    .await?
            }
            None => self.execute_full_scan(table_id, limit, metrics).await?,
        };

        if !parsed_query.order_by.is_empty() {
            self.apply_ordering(&mut results, &parsed_query.order_by)?;
        }
        Ok(self.build_query_result(results, &parsed_query.columns))
    }

    /// Executes DESCRIBE: one row per column with `column_name`, `type` and `kind`.
    async fn execute_describe_query(
        &self,
        parsed_query: &ParsedQuery,
        _context: &QueryExecutionContext,
    ) -> Result<QueryResult> {
        let table_id = parsed_query
            .table
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No table specified in DESCRIBE query"))?;
        let schema_info = self.get_table_schema_info(table_id).await?;

        let rows: Vec<_> = schema_info
            .columns
            .into_iter()
            .map(|column| {
                let mut row_data = HashMap::new();
                row_data.insert("column_name".to_string(), Value::Text(column.name));
                row_data.insert("type".to_string(), Value::Text(column.data_type));
                row_data.insert("kind".to_string(), Value::Text(column.kind));
                cqlite_core::query::result::QueryRow::from_map(row_data)
            })
            .collect();

        Ok(QueryResult {
            rows,
            rows_affected: 0,
            metadata: Default::default(),
        })
    }

    /// Validates the keyspace named by a USE statement and returns an empty result.
    ///
    /// The session's current keyspace is not changed here (`context` is read-only);
    /// the caller is expected to update it on success.
    ///
    /// # Errors
    /// Fails when no keyspace is given or it does not exist.
    async fn execute_use_query(
        &self,
        parsed_query: &ParsedQuery,
        _context: &QueryExecutionContext,
    ) -> Result<QueryResult> {
        let keyspace_name = parsed_query
            .table
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No keyspace specified in USE query"))?
            .name();
        if self.validate_keyspace_exists(keyspace_name).await? {
            Ok(QueryResult {
                rows: vec![],
                rows_affected: 0,
                metadata: Default::default(),
            })
        } else {
            Err(anyhow::anyhow!("Keyspace '{}' does not exist", keyspace_name))
        }
    }

    /// Scans a key range derived from `where_clause` and keeps rows that satisfy it,
    /// stopping once `limit` rows matched.
    ///
    /// NOTE(review): the storage scan is capped at `2 * limit` rows, so matches beyond
    /// that window are not found.
    async fn execute_filtered_scan(
        &self,
        table_id: &TableId,
        where_clause: &WhereClause,
        limit: usize,
        metrics: &mut QueryPerformanceMetrics,
    ) -> Result<Vec<(cqlite_core::RowKey, Value)>> {
        let mut results = Vec::new();
        let mut rows_examined: u64 = 0;
        let (start_key, end_key) = self.extract_key_range(where_clause)?;
        let scan_results = self
            .storage_engine
            .scan(
                table_id,
                start_key.as_ref(),
                end_key.as_ref(),
                Some(limit.saturating_mul(2)),
                None,
            )
            .await?;
        metrics.sstables_scanned += 1;

        for (key, value) in scan_results {
            // Check before examining the next row so that LIMIT 0 returns nothing.
            if results.len() >= limit {
                break;
            }
            rows_examined += 1;
            if self.evaluate_where_clause(&key, &value, where_clause)? {
                results.push((key, value));
            }
        }

        metrics.rows_examined = rows_examined;
        metrics.rows_returned = results.len() as u64;
        Ok(results)
    }

    /// Scans the whole table up to `limit` rows.
    async fn execute_full_scan(
        &self,
        table_id: &TableId,
        limit: usize,
        metrics: &mut QueryPerformanceMetrics,
    ) -> Result<Vec<(cqlite_core::RowKey, Value)>> {
        let results = self
            .storage_engine
            .scan(table_id, None, None, Some(limit), None)
            .await?;
        metrics.sstables_scanned += 1;
        metrics.rows_examined = results.len() as u64;
        metrics.rows_returned = results.len() as u64;
        Ok(results)
    }

    /// Derives `(start, end)` scan keys from the WHERE conditions.
    ///
    /// Only conditions with a text value are considered, and the condition's column is
    /// not checked. `Equal` pins both bounds and stops the search; `>`/`>=` set the
    /// start key and `<`/`<=` set the end key (later conditions win).
    fn extract_key_range(
        &self,
        where_clause: &WhereClause,
    ) -> Result<(Option<cqlite_core::RowKey>, Option<cqlite_core::RowKey>)> {
        let mut start_key = None;
        let mut end_key = None;
        for condition in &where_clause.conditions {
            if let Value::Text(key_str) = &condition.value {
                let key = cqlite_core::RowKey::from(key_str.as_str());
                match condition.operator {
                    ComparisonOperator::Equal => {
                        start_key = Some(key.clone());
                        end_key = Some(key);
                        break;
                    }
                    ComparisonOperator::GreaterThan | ComparisonOperator::GreaterThanOrEqual => {
                        start_key = Some(key);
                    }
                    ComparisonOperator::LessThan | ComparisonOperator::LessThanOrEqual => {
                        end_key = Some(key);
                    }
                    _ => {}
                }
            }
        }
        Ok((start_key, end_key))
    }

    /// Returns whether a scanned row satisfies `where_clause`.
    ///
    /// Only `Equal` conditions are checked, against the row's whole value; every other
    /// operator is treated as satisfied (range bounds are applied by the scan itself).
    fn evaluate_where_clause(
        &self,
        _key: &cqlite_core::RowKey,
        value: &Value,
        where_clause: &WhereClause,
    ) -> Result<bool> {
        Ok(where_clause.conditions.iter().all(|condition| {
            !matches!(condition.operator, ComparisonOperator::Equal)
                || self.values_equal(value, &condition.value)
        }))
    }

    /// Sorts results by row key, ascending (stable sort).
    ///
    /// TODO: the ORDER BY columns and directions in `_order_by` are not honoured yet.
    fn apply_ordering(
        &self,
        results: &mut Vec<(cqlite_core::RowKey, Value)>,
        _order_by: &[cqlite_core::query::OrderByClause],
    ) -> Result<()> {
        results.sort_by(|(left, _), (right, _)| left.cmp(right));
        Ok(())
    }

    /// Converts scanned `(key, value)` pairs into a [`QueryResult`], projecting columns
    /// unless `*` was selected.
    fn build_query_result(
        &self,
        results: Vec<(cqlite_core::RowKey, Value)>,
        selected_columns: &[String],
    ) -> QueryResult {
        let select_all = selected_columns.iter().any(|column| column.as_str() == "*");
        let rows: Vec<_> = results
            .iter()
            .map(|(_key, value)| {
                if select_all {
                    self.value_to_query_row(value)
                } else {
                    self.value_to_query_row_filtered(value, selected_columns)
                }
            })
            .collect();
        QueryResult {
            rows,
            rows_affected: 0,
            metadata: Default::default(),
        }
    }

    /// Wraps a stored value in a single-column row named `value`.
    fn value_to_query_row(&self, value: &Value) -> cqlite_core::query::result::QueryRow {
        let mut row_data = HashMap::new();
        row_data.insert("value".to_string(), value.clone());
        cqlite_core::query::result::QueryRow::from_map(row_data)
    }

    /// Column-projected variant of [`Self::value_to_query_row`].
    ///
    /// TODO: `_selected_columns` is currently ignored; the full value is returned.
    fn value_to_query_row_filtered(
        &self,
        value: &Value,
        _selected_columns: &[String],
    ) -> cqlite_core::query::result::QueryRow {
        self.value_to_query_row(value)
    }

    /// Escapes `value` for use inside a single-quoted CQL string literal by doubling
    /// embedded single quotes, so identifiers from user input cannot break out of the
    /// literal.
    fn escape_cql_literal(value: &str) -> String {
        value.replace('\'', "''")
    }

    /// Reads the column listing for `table_id` from `system.columns`.
    ///
    /// Rows missing any of `column_name`, `type` or `kind` are skipped. A table with no
    /// keyspace is looked up under the keyspace name `unknown`.
    async fn get_table_schema_info(&self, table_id: &TableId) -> Result<TableSchemaInfo> {
        let keyspace = table_id.keyspace().unwrap_or("unknown");
        let query = format!(
            "SELECT column_name, type, kind FROM system.columns WHERE keyspace_name = '{}' AND table_name = '{}' ORDER BY position",
            Self::escape_cql_literal(keyspace),
            Self::escape_cql_literal(table_id.name())
        );
        let result = self.database.execute(&query).await?;

        let mut columns = Vec::new();
        for row in result.rows {
            if let (Some(name), Some(data_type), Some(kind)) =
                (row.get("column_name"), row.get("type"), row.get("kind"))
            {
                columns.push(ColumnInfo {
                    name: name.to_string(),
                    data_type: data_type.to_string(),
                    kind: kind.to_string(),
                });
            }
        }
        Ok(TableSchemaInfo { columns })
    }

    /// Returns whether `keyspace_name` appears in `system.keyspaces`.
    async fn validate_keyspace_exists(&self, keyspace_name: &str) -> Result<bool> {
        let query = format!(
            "SELECT keyspace_name FROM system.keyspaces WHERE keyspace_name = '{}'",
            Self::escape_cql_literal(keyspace_name)
        );
        let result = self.database.execute(&query).await?;
        Ok(!result.rows.is_empty())
    }

    /// Builds a heuristic plan: fixed relative costs per statement shape plus hints.
    async fn create_execution_plan(
        &self,
        parsed_query: &ParsedQuery,
        _context: &QueryExecutionContext,
    ) -> Result<ExecutionPlan> {
        let target_tables: Vec<TableId> = parsed_query.table.iter().cloned().collect();

        let (estimated_cost, optimization_hints) = match parsed_query.query_type {
            QueryType::Select if parsed_query.where_clause.is_some() => (
                5.0,
                vec!["Consider adding indexes for WHERE clauses".to_string()],
            ),
            QueryType::Select => (
                10.0,
                vec!["Full table scan - consider adding LIMIT".to_string()],
            ),
            QueryType::Describe => (1.0, Vec::new()),
            _ => (3.0, Vec::new()),
        };

        Ok(ExecutionPlan {
            query_type: parsed_query.query_type.clone(),
            target_tables,
            estimated_cost,
            optimization_hints,
        })
    }

    /// Appends warnings/hints derived from `metrics`:
    /// - execution slower than 1 s;
    /// - more than 10 rows examined per row returned;
    /// - memory use above 50 MB;
    /// - a cache hit ratio below 50%.
    fn analyze_performance(
        &self,
        metrics: &QueryPerformanceMetrics,
        warnings: &mut Vec<String>,
        optimization_hints: &mut Vec<String>,
    ) {
        if metrics.execution_time_us > 1_000_000 {
            warnings.push("Query took longer than 1 second".to_string());
            optimization_hints.push("Consider adding WHERE clause or LIMIT".to_string());
        }
        // saturating_mul: avoid an overflow panic on absurdly large counters.
        if metrics.rows_examined > metrics.rows_returned.saturating_mul(10) {
            warnings.push("High row examination ratio".to_string());
            optimization_hints.push("Query is examining many more rows than returned".to_string());
        }
        if metrics.memory_usage_bytes > 50_000_000 {
            warnings.push("High memory usage".to_string());
            optimization_hints.push("Consider using pagination or smaller result sets".to_string());
        }
        // Widen before adding so two large u32 counters cannot overflow.
        let total_cache_ops = u64::from(metrics.cache_hits) + u64::from(metrics.cache_misses);
        if total_cache_ops > 0 {
            let hit_ratio = f64::from(metrics.cache_hits) / total_cache_ops as f64;
            if hit_ratio < 0.5 {
                optimization_hints
                    .push("Low cache hit ratio - consider query optimization".to_string());
            }
        }
    }

    /// Returns whether the SELECT needs the full [`SelectExecutor`] (JOIN, GROUP BY,
    /// HAVING or UNION).
    fn is_complex_select(&self, parsed_query: &ParsedQuery) -> bool {
        Self::contains_complex_clause(&parsed_query.sql)
    }

    /// Keyword detection on whole words outside single-quoted literals.
    ///
    /// A column such as `joined_at` or a literal such as `'union'` must not count.
    /// `GROUP` and `BY` may be separated by any whitespace.
    fn contains_complex_clause(sql: &str) -> bool {
        // Blank out string literals. CQL escapes a quote inside a literal as `''`,
        // which simply toggles the state twice.
        let mut in_literal = false;
        let code: String = sql
            .chars()
            .map(|ch| {
                if ch == '\'' {
                    in_literal = !in_literal;
                    ' '
                } else if in_literal {
                    ' '
                } else {
                    ch.to_ascii_uppercase()
                }
            })
            .collect();

        let words: Vec<&str> = code
            .split(|ch: char| !(ch.is_alphanumeric() || ch == '_'))
            .filter(|word| !word.is_empty())
            .collect();

        words
            .iter()
            .any(|word| matches!(*word, "JOIN" | "HAVING" | "UNION"))
            || words
                .windows(2)
                .any(|pair| pair[0] == "GROUP" && pair[1] == "BY")
    }

    /// Equality used by WHERE evaluation.
    ///
    /// Values of different variants never match. Floats match within an absolute
    /// tolerance of `f64::EPSILON`.
    fn values_equal(&self, a: &Value, b: &Value) -> bool {
        match (a, b) {
            (Value::Text(a_str), Value::Text(b_str)) => a_str == b_str,
            (Value::Integer(a_int), Value::Integer(b_int)) => a_int == b_int,
            (Value::Float(a_float), Value::Float(b_float)) => {
                (a_float - b_float).abs() < f64::EPSILON
            }
            (Value::Boolean(a_bool), Value::Boolean(b_bool)) => a_bool == b_bool,
            (Value::Null, Value::Null) => true,
            _ => false,
        }
    }

    /// Stores (or refreshes) the cache entry for `query`.
    ///
    /// An existing entry keeps its usage count. The cache is trimmed when it grows past
    /// `MAX_CACHE_ENTRIES`.
    fn cache_query(
        &mut self,
        query: String,
        parsed_query: ParsedQuery,
        execution_plan: ExecutionPlan,
    ) {
        use std::collections::hash_map::Entry;

        match self.query_cache.entry(query) {
            Entry::Occupied(mut entry) => {
                let cached = entry.get_mut();
                cached.parsed_query = parsed_query;
                cached.execution_plan = execution_plan;
                cached.last_used = Instant::now();
            }
            Entry::Vacant(entry) => {
                entry.insert(CachedQuery {
                    parsed_query,
                    execution_plan,
                    last_used: Instant::now(),
                    usage_count: 1,
                });
            }
        }

        if self.query_cache.len() > Self::MAX_CACHE_ENTRIES {
            self.evict_old_cache_entries();
        }
    }

    /// Drops entries unused for `CACHE_TTL`. If the cache is still over
    /// `MAX_CACHE_ENTRIES`, also drops the least recently used ones, so the size
    /// bound holds even when every entry is recent.
    fn evict_old_cache_entries(&mut self) {
        // `elapsed()` avoids the `Instant - Duration` underflow panic shortly after boot.
        self.query_cache
            .retain(|_, cached| cached.last_used.elapsed() < Self::CACHE_TTL);

        let excess = self.query_cache.len().saturating_sub(Self::MAX_CACHE_ENTRIES);
        if excess == 0 {
            return;
        }
        let mut by_age: Vec<(Instant, String)> = self
            .query_cache
            .iter()
            .map(|(query, cached)| (cached.last_used, query.clone()))
            .collect();
        by_age.sort_unstable_by_key(|(last_used, _)| *last_used);
        for (_, query) in by_age.into_iter().take(excess) {
            self.query_cache.remove(&query);
        }
    }
}
/// Column listing for a table, as read from `system.columns`.
#[derive(Debug)]
struct TableSchemaInfo {
/// Columns in the order returned by the `system.columns` lookup.
columns: Vec<ColumnInfo>,
}
/// One column of a table, with each field rendered as text.
#[derive(Debug)]
struct ColumnInfo {
/// Column name (`column_name` in `system.columns`).
name: String,
/// CQL type of the column (`type` in `system.columns`).
data_type: String,
/// Column kind (`kind` in `system.columns`); presumably partition key /
/// clustering / regular (verify against the system schema).
kind: String,
}
/// CLI-local constructors for [`QueryResult`].
///
/// `QueryResult` is defined in the external `cqlite_core` crate, so this crate may not
/// add an inherent `impl` for it (E0116). Because this trait is in scope within this
/// module, the call syntax `QueryResult::error(..)` keeps working.
trait QueryResultExt {
    /// Builds an empty result with a single `error` column whose metadata carries
    /// `message` as its only warning.
    fn error(message: String) -> Self;
}

impl QueryResultExt for QueryResult {
    fn error(message: String) -> Self {
        Self {
            rows: vec![],
            rows_affected: 0,
            metadata: cqlite_core::query::result::QueryMetadata {
                columns: vec![cqlite_core::query::result::ColumnInfo {
                    name: "error".to_string(),
                    data_type: "text".to_string(),
                    nullable: false,
                }],
                performance: cqlite_core::query::result::PerformanceMetrics::default(),
                row_metadata: cqlite_core::query::result::RowMetadata {
                    total_rows: 0,
                    has_more: false,
                },
                warnings: vec![message],
            },
        }
    }
}
/// Produces completion suggestions for partially typed CQL statements.
pub struct QueryCompletion {
/// Processor handle; not currently consulted by `complete_query`.
processor: Arc<CQLQueryProcessor>,
}
impl QueryCompletion {
    /// Creates a completer backed by `processor`.
    pub fn new(processor: Arc<CQLQueryProcessor>) -> Self {
        Self { processor }
    }

    /// Returns completion suggestions for `partial_query` with the cursor at byte
    /// offset `cursor_position`.
    ///
    /// Only text before the cursor is considered, and this never panics:
    /// - a position past the end is clamped to the end;
    /// - a position inside a multi-byte character is moved back to that character's
    ///   start.
    ///
    /// Suggestions are:
    /// - placeholders after a trailing `SELECT `, `FROM ` or `WHERE `;
    /// - keywords that start with the word currently being typed (every keyword when
    ///   the text before the cursor is blank).
    pub async fn complete_query(&self, partial_query: &str, cursor_position: usize) -> Vec<String> {
        const KEYWORDS: [&str; 16] = [
            "SELECT", "FROM", "WHERE", "ORDER BY", "LIMIT", "INSERT", "UPDATE", "DELETE",
            "CREATE", "DROP", "DESCRIBE", "USE", "AND", "OR", "ASC", "DESC",
        ];

        let partial = Self::text_before_cursor(partial_query, cursor_position).to_uppercase();
        let mut suggestions = Vec::new();

        if partial.ends_with("SELECT ") {
            suggestions.extend(["*".to_string(), "COUNT(*)".to_string()]);
        } else if partial.ends_with("FROM ") {
            suggestions.push("table_name".to_string());
        } else if partial.ends_with("WHERE ") {
            suggestions.push("column_name".to_string());
        }

        // The word under the cursor; empty right after whitespace.
        let current_word = partial.rsplit(char::is_whitespace).next().unwrap_or("");
        // Right after whitespace the previous, already complete word must not be
        // re-suggested; a blank prompt still gets every keyword.
        if !current_word.is_empty() || partial.trim().is_empty() {
            suggestions.extend(
                KEYWORDS
                    .iter()
                    .copied()
                    .filter(|keyword| keyword.starts_with(current_word))
                    .map(String::from),
            );
        }
        suggestions
    }

    /// Returns `text[..cursor_position]`, clamped to `text.len()` and moved back to the
    /// nearest char boundary so slicing cannot panic.
    fn text_before_cursor(text: &str, cursor_position: usize) -> &str {
        let mut end = cursor_position.min(text.len());
        while !text.is_char_boundary(end) {
            end -= 1;
        }
        &text[..end]
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn performance_metrics_default_to_zero() {
        let metrics = QueryPerformanceMetrics::default();
        assert_eq!(metrics.parse_time_us, 0);
        assert_eq!(metrics.planning_time_us, 0);
        assert_eq!(metrics.execution_time_us, 0);
        assert_eq!(metrics.memory_usage_bytes, 0);
        assert_eq!(metrics.sstables_scanned, 0);
        assert_eq!(metrics.rows_examined, 0);
        assert_eq!(metrics.rows_returned, 0);
        assert_eq!(metrics.cache_hits, 0);
        assert_eq!(metrics.cache_misses, 0);
    }

    #[test]
    fn error_result_carries_message_as_warning() {
        let result = QueryResult::error("boom".to_string());
        assert!(result.rows.is_empty());
        assert_eq!(result.rows_affected, 0);
        assert_eq!(result.metadata.warnings, vec!["boom".to_string()]);
    }

    // The tests below are placeholders. They need a Database / StorageEngine fixture,
    // e.g. backed by a temporary directory. They are ignored rather than left as empty
    // bodies, which would report a misleading pass.

    #[tokio::test]
    #[ignore = "not implemented: needs a Database/StorageEngine fixture"]
    async fn test_query_processor_creation() {}

    #[tokio::test]
    #[ignore = "not implemented: needs a Database/StorageEngine fixture"]
    async fn test_select_query_execution() {}

    #[tokio::test]
    #[ignore = "not implemented: needs a Database/StorageEngine fixture"]
    async fn test_describe_query_execution() {}

    #[tokio::test]
    #[ignore = "not implemented: needs a Database/StorageEngine fixture"]
    async fn test_query_performance_tracking() {}
}