use super::{
executor::{QueryExecutor, QueryResult},
parser::QueryParser,
planner::QueryPlanner,
prepared::PreparedQuery,
result::{QueryResultIterator, StreamingConfig},
QueryStats,
};
#[cfg(feature = "state_machine")]
use super::{select_executor::SelectExecutor, select_optimizer::SelectOptimizer, select_parser};
use crate::{
memory::MemoryManager, schema::SchemaManager, storage::StorageEngine, Config, Error, Result,
Value,
};
use dashmap::DashMap;
use std::sync::Arc;
use std::time::Instant;
/// A query plan held in the plan cache, together with the parsed query it was built from.
#[derive(Debug, Clone)]
pub struct QueryCacheEntry {
/// Parsed form of the CQL text that produced `plan`.
pub parsed_query: super::ParsedQuery,
/// Plan handed to the executor when this entry is served from the cache.
pub plan: super::planner::QueryPlan,
/// Insertion time; once the cache is full the entry with the oldest value is evicted.
pub cached_at: Instant,
/// Number of times this entry has been served from the cache.
pub hit_count: u64,
}
/// Outcome of a schema lookup for a table, as reported by `QueryEngine::schema_status`.
#[derive(Debug, Clone)]
pub enum SchemaStatus {
/// The schema lookup succeeded; carries the keyspace and table names from the schema.
Available { keyspace: String, table: String },
/// The lookup failed with a schema error whose message contains "not found".
Missing { table: String, reason: String },
/// The lookup failed for any other reason.
ExtractionFailed {
/// Table name that was looked up.
table: String,
/// Display text of the underlying error.
cause: String,
/// Human-readable hint on how to resolve the failure.
suggestion: String,
},
}
/// Front door for CQL execution: parses, plans, caches and executes queries,
/// and keeps running statistics about them.
#[derive(Debug)]
pub struct QueryEngine {
/// Parses CQL text for the general (non-SELECT-specialised) path.
parser: QueryParser,
/// Turns parsed queries into executable plans.
planner: QueryPlanner,
/// Executes plans produced by `planner`.
executor: QueryExecutor,
/// Used for table schema lookups (`has_schema_for_table`, `schema_status`).
schema_manager: Arc<SchemaManager>,
/// Optimizer for the dedicated SELECT path (only with the `state_machine` feature).
#[cfg(feature = "state_machine")]
select_optimizer: SelectOptimizer,
/// Executor for the dedicated SELECT path (only with the `state_machine` feature).
#[cfg(feature = "state_machine")]
select_executor: SelectExecutor,
/// Prepared statements keyed by their exact CQL text.
prepared_cache: DashMap<String, Arc<PreparedQuery>>,
/// Cached plans keyed by their exact CQL text.
plan_cache: DashMap<String, QueryCacheEntry>,
/// Running query statistics, shared behind a read/write lock.
stats: Arc<parking_lot::RwLock<QueryStats>>,
/// Private copy of the configuration the engine was created with.
config: Config,
}
impl QueryEngine {
/// Creates a query engine backed by the given storage engine and schema manager.
///
/// The parser, planner and executor are built from `config`; both caches start
/// empty and the statistics start at their defaults. The memory manager argument
/// is accepted but not used by this constructor.
pub fn new(
    storage: Arc<StorageEngine>,
    schema: Arc<SchemaManager>,
    _memory: Arc<MemoryManager>,
    config: &Config,
) -> Result<Self> {
    let parser = QueryParser::new(config);
    let planner = QueryPlanner::new(Arc::clone(&schema), config);
    let executor = QueryExecutor::new(Arc::clone(&storage), Arc::clone(&schema), config);

    #[cfg(feature = "state_machine")]
    let select_optimizer = SelectOptimizer::new(Arc::clone(&schema), Arc::clone(&storage));
    #[cfg(feature = "state_machine")]
    let select_executor = SelectExecutor::new(Arc::clone(&schema), storage);

    let stats = Arc::new(parking_lot::RwLock::new(QueryStats::default()));

    let engine = Self {
        parser,
        planner,
        executor,
        schema_manager: schema,
        #[cfg(feature = "state_machine")]
        select_optimizer,
        #[cfg(feature = "state_machine")]
        select_executor,
        prepared_cache: DashMap::new(),
        plan_cache: DashMap::new(),
        stats,
        config: config.clone(),
    };
    Ok(engine)
}
/// Adds one to `total_queries` in the shared statistics (takes the write lock briefly).
fn inc_total_queries(&self) {
self.stats.write().total_queries += 1;
}
/// Adds one to `error_queries` in the shared statistics (takes the write lock briefly).
fn inc_error_queries(&self) {
self.stats.write().error_queries += 1;
}
/// Folds one cache hit into the running `cache_hit_ratio`.
///
/// The previous ratio is weighted by `total_queries - 1`, one hit is added, and
/// the sum is divided by `total_queries`, so callers must have counted the
/// current query already.
///
/// NOTE(review): nothing visible in this file lowers the ratio on a cache miss —
/// confirm whether that is intended.
fn record_cache_hit(&self) {
    let mut guard = self.stats.write();
    let query_count = guard.total_queries as f64;
    let weighted_prior = guard.cache_hit_ratio * (query_count - 1.0);
    guard.cache_hit_ratio = (weighted_prior + 1.0) / query_count;
}
/// Executes a CQL statement and returns its result.
///
/// Routing:
/// * Statements starting with `SELECT` (case-insensitive, after trimming) are sent
///   to `execute_select_query`, unless the raw text contains `WHERE id =` and has
///   at most eight whitespace-separated tokens.
/// * Everything else (including those simple id lookups) goes through the plan
///   cache, then the general parser, planner and executor.
///
/// NOTE(review): the `WHERE id =` test is a case-sensitive substring match on the
/// untrimmed input, unlike the `SELECT` test — confirm that is intended.
///
/// # Errors
/// Propagates parser, planner and executor errors. Only parser failures are
/// counted in `error_queries` here.
pub async fn execute(&self, cql: &str) -> Result<QueryResult> {
    let start_time = Instant::now();
    self.inc_total_queries();

    let trimmed_cql = cql.trim().to_uppercase();
    let is_select = trimmed_cql.starts_with("SELECT");
    let is_simple_id_lookup = cql.contains("WHERE id =") && cql.split_whitespace().count() <= 8;
    if is_select && !is_simple_id_lookup {
        return self.execute_select_query(cql, start_time).await;
    }
    #[cfg(debug_assertions)]
    if is_select && is_simple_id_lookup {
        log::debug!(
            "Routing simple SELECT through normal executor for consistent key handling"
        );
    }

    // Copy the plan out and release the DashMap guard *before* awaiting. The
    // guard is a shard write lock; keeping it alive across an await point blocks
    // every other access to that shard for the whole execution and can deadlock
    // when another task on the same thread touches the map.
    let cached_plan = self.plan_cache.get_mut(cql).map(|mut entry| {
        entry.hit_count += 1;
        entry.plan.clone()
    });
    if let Some(plan) = cached_plan {
        self.record_cache_hit();
        let mut result = self.executor.execute(&plan).await?;
        self.update_execution_stats(&mut result, start_time);
        return Ok(result);
    }

    let parsed_query = self
        .parser
        .parse(cql)
        .inspect_err(|_| self.inc_error_queries())?;
    let plan = self.planner.plan(&parsed_query).await?;
    // Checked here as well so the plan is not cloned when caching is disabled.
    if self.config.query.query_cache_size.unwrap_or(0) > 0 {
        self.cache_query_plan(cql, parsed_query, plan.clone());
    }
    let mut result = self.executor.execute(&plan).await?;
    self.update_execution_stats(&mut result, start_time);
    Ok(result)
}
/// Runs a SELECT statement through the dedicated SELECT pipeline and returns a
/// streaming iterator over its rows.
///
/// # Errors
/// Returns a query-execution error when `cql` does not start with `SELECT`
/// (case-insensitive, after trimming), and propagates parse, optimisation and
/// execution errors. Parse failures are counted in `error_queries`.
#[cfg(feature = "state_machine")]
pub async fn execute_streaming(
    &self,
    cql: &str,
    config: StreamingConfig,
) -> Result<QueryResultIterator> {
    self.inc_total_queries();

    let is_select = cql.trim().to_uppercase().starts_with("SELECT");
    if !is_select {
        return Err(Error::query_execution(
            "Streaming execution only supports SELECT queries",
        ));
    }

    let statement =
        select_parser::parse_select(cql).inspect_err(|_| self.inc_error_queries())?;
    let plan = self.select_optimizer.optimize(statement).await?;
    self.select_executor.execute_streaming(plan, config).await
}
/// Executes a SELECT statement, preferring a cached plan when one is usable.
///
/// A cached entry is used only when its plan names a table; an entry without a
/// table is removed from the cache. On a miss the statement goes through the
/// dedicated SELECT parser, optimizer and executor, which require the
/// `state_machine` feature.
///
/// # Errors
/// Without the `state_machine` feature every cache miss returns a
/// query-execution error. Otherwise parse, optimisation and execution errors
/// are propagated; parse failures are counted in `error_queries`.
async fn execute_select_query(&self, cql: &str, start_time: Instant) -> Result<QueryResult> {
    // Inspect the cache entry inside a closure so the DashMap guard (a shard
    // write lock) is released before any `.await` and before `remove`. Holding
    // it across the executor's await point can stall or deadlock other tasks
    // that touch the same shard.
    let mut stale_entry = false;
    let cached_plan = self.plan_cache.get_mut(cql).and_then(|mut entry| {
        if entry.plan.table.is_some() {
            entry.hit_count += 1;
            Some(entry.plan.clone())
        } else {
            stale_entry = true;
            None
        }
    });
    if stale_entry {
        self.plan_cache.remove(cql);
    }
    if let Some(plan) = cached_plan {
        self.record_cache_hit();
        let mut result = self.executor.execute(&plan).await?;
        self.update_execution_stats(&mut result, start_time);
        return Ok(result);
    }

    #[cfg(not(feature = "state_machine"))]
    return Err(Error::query_execution(
        "Advanced SELECT parsing requires state_machine feature",
    ));
    #[cfg(feature = "state_machine")]
    {
        let select_statement =
            select_parser::parse_select(cql).inspect_err(|_| self.inc_error_queries())?;
        let optimized_plan = self.select_optimizer.optimize(select_statement).await?;
        let mut result = self.select_executor.execute(optimized_plan).await?;
        self.update_execution_stats(&mut result, start_time);
        Ok(result)
    }
}
/// Executes `cql` by delegating to [`QueryEngine::execute`].
///
/// NOTE(review): `_params` is currently ignored — no parameter binding happens
/// here, so placeholders in `cql` are passed through as written.
pub async fn execute_with_params(&self, cql: &str, _params: &[Value]) -> Result<QueryResult> {
self.execute(cql).await
}
/// Returns a prepared statement for `cql`, building and caching it on first use.
///
/// The cache key is the exact CQL text. On a miss the statement is parsed and
/// planned, bundled with a clone of the executor, and stored in the prepared
/// cache before being returned.
///
/// # Errors
/// Propagates parser and planner errors.
pub async fn prepare(&self, cql: &str) -> Result<Arc<PreparedQuery>> {
    if let Some(hit) = self.prepared_cache.get(cql) {
        return Ok(Arc::clone(hit.value()));
    }

    let parsed = self.parser.parse(cql)?;
    let plan = self.planner.plan(&parsed).await?;
    let executor = Arc::new(self.executor.clone());
    let prepared = Arc::new(PreparedQuery::new(parsed, plan, executor));

    self.prepared_cache
        .insert(cql.to_owned(), Arc::clone(&prepared));
    Ok(prepared)
}
/// Executes a previously prepared statement with the given parameters.
///
/// Counts the query, runs it, then records its timing and affected rows in the
/// engine statistics.
///
/// # Errors
/// Propagates any error returned by the prepared statement's execution.
pub async fn execute_prepared(
    &self,
    prepared: &PreparedQuery,
    params: &[Value],
) -> Result<QueryResult> {
    let started = Instant::now();
    self.inc_total_queries();

    let mut outcome = prepared.execute(params).await?;
    self.update_execution_stats(&mut outcome, started);
    Ok(outcome)
}
/// Returns a cloned snapshot of the current query statistics.
pub fn stats(&self) -> QueryStats {
self.stats.read().clone()
}
/// Empties both the prepared-statement cache and the plan cache.
pub fn clear_caches(&self) {
self.prepared_cache.clear();
self.plan_cache.clear();
}
/// Empties the prepared-statement cache only.
pub fn clear_prepared_cache(&self) {
self.prepared_cache.clear();
}
/// Empties the plan cache only.
pub fn clear_plan_cache(&self) {
self.plan_cache.clear();
}
/// Reports the current size of both caches and their hit counters.
///
/// `plan_cache_hits` is the sum of `hit_count` over the entries currently in the
/// plan cache; hits recorded on entries that have since been evicted or cleared
/// are not included.
///
/// NOTE(review): no hit counter for prepared statements is visible in this
/// file, so `prepared_cache_hits` still mirrors the number of cached prepared
/// statements — replace it once a real counter exists.
pub fn cache_stats(&self) -> CacheStats {
    let plan_cache_hits: u64 = self.plan_cache.iter().map(|entry| entry.hit_count).sum();
    CacheStats {
        prepared_cache_size: self.prepared_cache.len(),
        plan_cache_size: self.plan_cache.len(),
        prepared_cache_hits: self.prepared_cache.len() as u64,
        plan_cache_hits,
    }
}
/// Parses and plans `cql` without executing it, and renders the plan as text.
///
/// The result carries the debug rendering of the query and plan types, the
/// planner's cost and row estimates, one line per selected index, one line per
/// plan step, and one line per step that can be parallelised.
///
/// # Errors
/// Propagates parser and planner errors.
pub async fn explain(&self, cql: &str) -> Result<ExplainResult> {
    let parsed = self.parser.parse(cql)?;
    let plan = self.planner.plan(&parsed).await?;

    let mut selected_indexes = Vec::new();
    for idx in plan.selected_indexes.iter() {
        selected_indexes.push(format!("{} ({:?})", idx.index_name, idx.index_type));
    }

    // One pass over the steps fills both the step list and the parallelisation notes.
    let mut execution_steps = Vec::new();
    let mut parallelization_info = Vec::new();
    for step in plan.steps.iter() {
        execution_steps.push(format!(
            "{:?}: {} (cost: {:.2})",
            step.step_type,
            step.columns.join(", "),
            step.cost
        ));
        if step.parallelization.can_parallelize {
            parallelization_info.push(format!(
                "Threads: {}, Partition: {:?}",
                step.parallelization.suggested_threads, step.parallelization.partition_key
            ));
        }
    }

    Ok(ExplainResult {
        query_type: format!("{:?}", parsed.query_type),
        plan_type: format!("{:?}", plan.plan_type),
        estimated_cost: plan.estimated_cost,
        estimated_rows: plan.estimated_rows,
        selected_indexes,
        execution_steps,
        parallelization_info,
    })
}
/// Executes `cql` repeatedly and summarises the timings.
///
/// The iteration count comes from `config.query.analyze_iterations` (5 when
/// unset). Every iteration goes through [`QueryEngine::execute`], so it also
/// updates the engine statistics and may be served from the plan cache.
///
/// # Errors
/// Propagates the first execution error. Returns a query-execution error when
/// no iteration ran (configured count of zero) instead of dividing by zero.
pub async fn analyze(&self, cql: &str) -> Result<AnalyzeResult> {
    let start_time = Instant::now();
    let mut execution_times = Vec::new();
    // Only the row counts are needed, so results are not kept alive.
    let mut total_rows = 0usize;
    for _ in 0..self.config.query.analyze_iterations.unwrap_or(5) {
        let iter_start = Instant::now();
        let result = self.execute(cql).await?;
        execution_times.push(iter_start.elapsed());
        total_rows += result.rows.len();
    }
    let total_time = start_time.elapsed();

    let no_times = || Error::query_execution("No execution times recorded for analysis");
    // Must be checked before averaging: `Duration / 0` panics.
    let iterations = execution_times.len();
    if iterations == 0 {
        return Err(no_times());
    }

    let avg_time = execution_times.iter().sum::<std::time::Duration>() / iterations as u32;
    let min_time = execution_times.iter().min().ok_or_else(no_times)?;
    let max_time = execution_times.iter().max().ok_or_else(no_times)?;

    // Population variance of the per-iteration times, in nanoseconds squared.
    let variance = execution_times
        .iter()
        .map(|time| {
            let diff = time.as_nanos() as f64 - avg_time.as_nanos() as f64;
            diff * diff
        })
        .sum::<f64>()
        / iterations as f64;
    let std_dev = variance.sqrt();

    Ok(AnalyzeResult {
        iterations,
        total_time_ms: total_time.as_millis() as u64,
        avg_time_ms: avg_time.as_millis() as u64,
        min_time_ms: min_time.as_millis() as u64,
        max_time_ms: max_time.as_millis() as u64,
        // Nanoseconds to whole milliseconds.
        std_dev_ms: (std_dev / 1_000_000.0) as u64,
        avg_rows_returned: total_rows / iterations,
        cache_hit_ratio: self.stats().cache_hit_ratio,
    })
}
/// Stores a freshly planned query in the plan cache under its CQL text.
///
/// Does nothing when the configured cache size is zero or unset. When the cache
/// already holds at least that many entries, the entry with the earliest
/// `cached_at` is removed first.
fn cache_query_plan(
    &self,
    cql: &str,
    parsed_query: super::ParsedQuery,
    plan: super::planner::QueryPlan,
) {
    let capacity = self.config.query.query_cache_size.unwrap_or(0);
    if capacity == 0 {
        return;
    }

    if self.plan_cache.len() >= capacity {
        // Scan for the oldest entry; the strict `<` keeps the first one seen on ties.
        let mut oldest: Option<(Instant, String)> = None;
        for entry in self.plan_cache.iter() {
            let is_older = oldest
                .as_ref()
                .map_or(true, |(seen, _)| entry.cached_at < *seen);
            if is_older {
                oldest = Some((entry.cached_at, entry.key().clone()));
            }
        }
        // The iterator is gone by now, so removing cannot contend with it.
        if let Some((_, victim)) = oldest {
            self.plan_cache.remove(&victim);
        }
    }

    let fresh = QueryCacheEntry {
        parsed_query,
        plan,
        cached_at: Instant::now(),
        hit_count: 0,
    };
    self.plan_cache.insert(cql.to_owned(), fresh);
}
/// Returns `true` when the schema manager's lookup for `table` succeeds.
/// Any lookup error, whatever its cause, is reported as `false`.
pub async fn has_schema_for_table(&self, table: &str) -> bool {
self.schema_manager.get_table_schema(table).await.is_ok()
}
pub async fn schema_status(&self, table: &str) -> SchemaStatus {
match self.schema_manager.get_table_schema(table).await {
Ok(schema) => SchemaStatus::Available {
keyspace: schema.keyspace.clone(),
table: schema.table.clone(),
},
Err(Error::Schema(msg)) if msg.contains("not found") => {
SchemaStatus::Missing {
table: table.to_string(),
reason: msg,
}
}
Err(e) => SchemaStatus::ExtractionFailed {
table: table.to_string(),
cause: e.to_string(),
suggestion: "Verify SSTable files are valid Cassandra 5.0 format and Statistics.db contains SerializationHeader".to_string(),
},
}
}
/// Stamps `result` with its execution time and folds the query into the stats.
///
/// The reported time is in milliseconds and is rounded up to 1 for any non-zero
/// duration. The running average (microseconds) is weighted by `total_queries`,
/// which callers are expected to have incremented already; the result's
/// `rows_affected` is added to the running total.
fn update_execution_stats(&self, result: &mut QueryResult, start_time: Instant) {
    let elapsed = start_time.elapsed();
    let elapsed_ms = elapsed.as_millis() as u64;
    result.execution_time_ms = if elapsed.is_zero() {
        0
    } else {
        elapsed_ms.max(1)
    };

    let sample_us = elapsed.as_micros() as u64;
    let mut guard = self.stats.write();
    let seen = guard.total_queries;
    guard.avg_execution_time_us = if seen > 1 {
        (guard.avg_execution_time_us * (seen - 1) + sample_us) / seen
    } else {
        // First (or uncounted) query: the sample is the average.
        sample_us
    };
    guard.rows_affected += result.rows_affected;
}
}
/// Snapshot of cache occupancy and hit counters, as returned by `QueryEngine::cache_stats`.
#[derive(Debug, Clone)]
pub struct CacheStats {
/// Number of entries currently in the prepared-statement cache.
pub prepared_cache_size: usize,
/// Number of entries currently in the plan cache.
pub plan_cache_size: usize,
/// Hit counter reported for the prepared-statement cache.
pub prepared_cache_hits: u64,
/// Hit counter reported for the plan cache.
pub plan_cache_hits: u64,
}
/// Textual description of a query plan, as returned by `QueryEngine::explain`.
#[derive(Debug, Clone)]
pub struct ExplainResult {
/// Debug rendering of the parsed query's type.
pub query_type: String,
/// Debug rendering of the plan's type.
pub plan_type: String,
/// Cost estimate copied from the plan.
pub estimated_cost: f64,
/// Row-count estimate copied from the plan.
pub estimated_rows: u64,
/// One `name (type)` line per index selected by the plan.
pub selected_indexes: Vec<String>,
/// One line per plan step: step type, columns and cost.
pub execution_steps: Vec<String>,
/// One line per parallelisable step: suggested threads and partition key.
pub parallelization_info: Vec<String>,
}
/// Timing summary of repeated executions, as returned by `QueryEngine::analyze`.
#[derive(Debug, Clone)]
pub struct AnalyzeResult {
/// Number of timed executions.
pub iterations: usize,
/// Wall-clock time for the whole run, in milliseconds.
pub total_time_ms: u64,
/// Mean per-iteration time, in milliseconds (truncated).
pub avg_time_ms: u64,
/// Fastest iteration, in milliseconds (truncated).
pub min_time_ms: u64,
/// Slowest iteration, in milliseconds (truncated).
pub max_time_ms: u64,
/// Standard deviation of the per-iteration times, in milliseconds (truncated).
pub std_dev_ms: u64,
/// Total rows returned divided by the number of iterations (integer division).
pub avg_rows_returned: usize,
/// The engine's cache hit ratio at the time the analysis finished.
pub cache_hit_ratio: f64,
}
// The whole module is gated on `state_machine`, so individual tests need no
// further feature attributes.
#[cfg(all(test, feature = "state_machine"))]
mod tests {
    use super::*;
    use crate::Config;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Builds a `QueryEngine` over a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the engine so the directory stays
    /// alive for as long as the test holds on to it.
    async fn setup_query_engine(config: &Config) -> (QueryEngine, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let platform = Arc::new(crate::platform::Platform::new(config).await.unwrap());
        let storage = Arc::new(
            crate::storage::StorageEngine::open(
                temp_dir.path(),
                config,
                platform,
                #[cfg(feature = "state_machine")]
                None,
            )
            .await
            .unwrap(),
        );
        let schema = Arc::new(
            crate::schema::SchemaManager::new(temp_dir.path())
                .await
                .unwrap(),
        );
        let memory = Arc::new(crate::memory::MemoryManager::new(config).unwrap());
        let engine = QueryEngine::new(storage, schema, memory, config).unwrap();
        (engine, temp_dir)
    }

    #[tokio::test]
    async fn test_query_engine_creation() {
        let config = Config::default();
        let (query_engine, _temp_dir) = setup_query_engine(&config).await;
        assert_eq!(query_engine.stats().total_queries, 0);
        assert_eq!(query_engine.cache_stats().prepared_cache_size, 0);
        assert_eq!(query_engine.cache_stats().plan_cache_size, 0);
    }

    #[tokio::test]
    #[ignore = "Hangs >60s; needs investigation - gated for M1"]
    async fn test_query_caching() {
        let mut config = Config::test_config();
        config.query.query_cache_size = Some(10);
        let (query_engine, _temp_dir) = setup_query_engine(&config).await;
        let cql = "SELECT * FROM users WHERE id = 1";
        // Results are ignored on purpose: only the cache bookkeeping is checked.
        let _ = query_engine.execute(cql).await;
        let _ = query_engine.execute(cql).await;
        assert_eq!(query_engine.cache_stats().plan_cache_size, 1);
        let stats = query_engine.stats();
        assert!(stats.cache_hit_ratio > 0.0);
    }

    #[tokio::test]
    async fn test_prepared_statements() {
        let config = Config::default();
        let (query_engine, _temp_dir) = setup_query_engine(&config).await;
        let cql = "SELECT * FROM users WHERE id = ?";
        let prepared = query_engine.prepare(cql).await.unwrap();
        let params = vec![Value::Integer(1)];
        let result = query_engine
            .execute_prepared(&prepared, &params)
            .await
            .unwrap();
        assert!(result.execution_time_ms > 0);
        assert_eq!(query_engine.cache_stats().prepared_cache_size, 1);
    }

    #[tokio::test]
    async fn test_query_explain() {
        let config = Config::default();
        let (query_engine, _temp_dir) = setup_query_engine(&config).await;
        let cql = "SELECT * FROM users WHERE id = 1";
        let explain_result = query_engine.explain(cql).await.unwrap();
        assert_eq!(explain_result.query_type, "Select");
        assert!(explain_result.estimated_cost > 0.0);
        assert!(!explain_result.selected_indexes.is_empty());
        assert!(!explain_result.execution_steps.is_empty());
    }

    #[tokio::test]
    async fn test_cache_eviction() {
        let mut config = Config::default();
        config.query.query_cache_size = Some(2);
        let (query_engine, _temp_dir) = setup_query_engine(&config).await;
        // Three distinct statements against a cache of two: one must be evicted.
        let _ = query_engine
            .execute("SELECT * FROM users WHERE id = 1")
            .await;
        let _ = query_engine
            .execute("SELECT * FROM users WHERE id = 2")
            .await;
        let _ = query_engine
            .execute("SELECT * FROM users WHERE id = 3")
            .await;
        assert_eq!(query_engine.cache_stats().plan_cache_size, 2);
    }

    #[tokio::test]
    async fn test_schema_validation_api() {
        let config = Config::default();
        let (query_engine, _temp_dir) = setup_query_engine(&config).await;
        let has_schema = query_engine.has_schema_for_table("nonexistent_table").await;
        assert!(!has_schema, "Should return false for non-existent table");
        let status = query_engine.schema_status("nonexistent_table").await;
        match status {
            // Either failure classification is acceptable for an unknown table.
            SchemaStatus::Missing { .. } | SchemaStatus::ExtractionFailed { .. } => {}
            SchemaStatus::Available { .. } => {
                panic!("Should not be Available for non-existent table");
            }
        }
    }
}
// Plan-cache behaviour tests; compiled only for test builds with the
// `experimental` feature enabled.
#[cfg(test)]
#[cfg(feature = "experimental")]
mod plan_cache_tests {
    use super::*;
    use crate::{
        memory::MemoryManager, platform::Platform, schema::SchemaManager, storage::StorageEngine,
        Config,
    };
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Builds an engine over a fresh temporary directory; the directory handle is
    /// returned so it outlives the engine for the duration of the test.
    async fn setup_query_engine(config: &Config) -> (QueryEngine, TempDir) {
        let workdir = TempDir::new().unwrap();
        let platform = Platform::new(config).await.unwrap();
        let storage = StorageEngine::open(
            workdir.path(),
            config,
            Arc::new(platform),
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();
        let schema = SchemaManager::new(workdir.path()).await.unwrap();
        let memory = MemoryManager::new(config).unwrap();
        let engine = QueryEngine::new(
            Arc::new(storage),
            Arc::new(schema),
            Arc::new(memory),
            config,
        )
        .unwrap();
        (engine, workdir)
    }

    /// Creates `plan_cache_test` and seeds it with three rows.
    async fn create_sample_table(engine: &QueryEngine) {
        let create_table = "CREATE TABLE plan_cache_test (
id INTEGER PRIMARY KEY,
value TEXT
)";
        engine.execute(create_table).await.unwrap();

        let seed_rows = [
            "INSERT INTO plan_cache_test (id, value) VALUES (1, 'one')",
            "INSERT INTO plan_cache_test (id, value) VALUES (2, 'two')",
            "INSERT INTO plan_cache_test (id, value) VALUES (3, 'three')",
        ];
        for statement in seed_rows {
            engine.execute(statement).await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_plan_cache_disabled() {
        let mut config = Config::default();
        config.query.query_cache_size = Some(0);
        let (engine, _workdir) = setup_query_engine(&config).await;
        create_sample_table(&engine).await;

        let lookup = "SELECT * FROM plan_cache_test WHERE id = 1";
        engine.execute(lookup).await.unwrap();

        assert_eq!(engine.cache_stats().plan_cache_size, 0);
    }

    #[tokio::test]
    async fn test_plan_cache_reuse_point_lookup() {
        let mut config = Config::default();
        config.query.query_cache_size = Some(4);
        let (engine, _workdir) = setup_query_engine(&config).await;
        create_sample_table(&engine).await;
        // Start from an empty cache so the setup statements do not count.
        engine.clear_plan_cache();

        let lookup = "SELECT * FROM plan_cache_test WHERE id = 1";
        for _ in 0..2 {
            engine.execute(lookup).await.unwrap();
        }

        assert_eq!(engine.cache_stats().plan_cache_size, 1);
        assert!(engine.stats().cache_hit_ratio > 0.0);
    }

    #[tokio::test]
    async fn test_plan_cache_eviction_limit() {
        let mut config = Config::default();
        config.query.query_cache_size = Some(2);
        let (engine, _workdir) = setup_query_engine(&config).await;
        create_sample_table(&engine).await;
        engine.clear_plan_cache();

        // Three distinct lookups against a cache of two.
        for id in 1..=3 {
            let lookup = format!("SELECT * FROM plan_cache_test WHERE id = {}", id);
            engine.execute(&lookup).await.unwrap();
        }

        assert_eq!(engine.cache_stats().plan_cache_size, 2);
    }
}