use super::{
executor::{QueryExecutor, QueryResult},
planner::{PlanType, QueryPlan},
ParsedQuery,
};
use crate::types::DataType;
use crate::{Error, Result, Value};
use std::collections::HashMap;
use std::sync::Arc;
/// A parsed and planned query, bundled with the executor that runs it.
#[derive(Debug)]
pub struct PreparedQuery {
/// Query text; `PreparedQuery::new` copies it from `parsed_query.cql`.
pub cql: String,
/// Parsed form of the query.
pub parsed_query: ParsedQuery,
/// Plan handed to the executor by the `execute*` methods.
pub plan: QueryPlan,
/// Per-parameter metadata that supplied values are validated against.
pub parameters: Vec<ParameterMetadata>,
// Shared executor that runs `plan`.
executor: Arc<QueryExecutor>,
}
/// Describes one bind parameter of a prepared query.
#[derive(Debug, Clone)]
pub struct ParameterMetadata {
/// Parameter name; `None` for a positional (unnamed) parameter.
pub name: Option<String>,
/// Index of the parameter; the builder assigns the current list length when
/// the parameter is added.
pub position: usize,
/// Type a bound value is checked against; `None` skips the type check.
pub expected_type: Option<DataType>,
/// When `true`, `PreparedQuery::execute_named` substitutes `Value::Null` if
/// the name is absent from the supplied map.
pub optional: bool,
}
/// Per-execution input bundle for `PreparedQuery::execute_with_context`.
///
/// NOTE(review): within this file `execute_with_context` reads only `hints`;
/// `parameters` and `positional_params` are not consulted there — confirm
/// whether binding them is handled elsewhere.
#[derive(Debug)]
pub struct PreparedContext {
/// Values keyed by parameter name.
pub parameters: HashMap<String, Value>,
/// Values in positional order.
pub positional_params: Vec<Value>,
/// Execution hints to apply on top of the stored plan.
pub hints: ExecutionHints,
}
/// Optional execution hints; the `Default` value sets no hint at all.
#[derive(Debug, Clone, Default)]
pub struct ExecutionHints {
/// Index name copied into `plan.hints.force_index` by
/// `PreparedQuery::execute_with_context`.
pub force_index: Option<String>,
/// Timeout copied into `plan.hints.timeout_ms` (milliseconds, per the field
/// name).
pub timeout_ms: Option<u64>,
/// Value copied into `plan.hints.preferred_parallelization`.
pub parallelism: Option<usize>,
/// Result-caching flag.
/// NOTE(review): no code visible in this file reads this flag — confirm
/// where it is consumed.
pub cache_results: bool,
}
impl PreparedQuery {
    /// Builds a prepared query from an already parsed and planned statement.
    ///
    /// The CQL text is copied from `parsed_query.cql` and the parameter list
    /// is derived by `extract_parameters`.
    pub fn new(parsed_query: ParsedQuery, plan: QueryPlan, executor: Arc<QueryExecutor>) -> Self {
        let cql = parsed_query.cql.clone();
        let parameters = Self::extract_parameters(&parsed_query);
        Self {
            cql,
            parsed_query,
            plan,
            parameters,
            executor,
        }
    }

    /// Validates `params` against the parameter metadata, then runs the
    /// stored plan.
    ///
    /// Note that the values are only validated here: the plan is handed to
    /// the executor unchanged.
    ///
    /// # Errors
    /// Returns a query-execution error when the number of values differs from
    /// the number of declared parameters or a value has the wrong type;
    /// otherwise propagates whatever the executor returns.
    pub async fn execute(&self, params: &[Value]) -> Result<QueryResult> {
        self.validate_params(params)?;
        self.executor.execute(&self.plan).await
    }

    /// Resolves parameters by name into positional order and delegates to
    /// [`Self::execute`].
    ///
    /// A name missing from `params` is replaced by `Value::Null` when the
    /// parameter is optional.
    ///
    /// # Errors
    /// Returns a query-execution error when a required name is missing, or
    /// when a declared parameter has no name. An unnamed parameter can never
    /// be supplied through a name map, so it is reported directly instead of
    /// being skipped and surfacing later as a confusing count mismatch.
    pub async fn execute_named(&self, params: &HashMap<String, Value>) -> Result<QueryResult> {
        let mut positional_params = Vec::with_capacity(self.parameters.len());
        for metadata in &self.parameters {
            let Some(name) = metadata.name.as_deref() else {
                return Err(Error::query_execution(format!(
                    "Parameter at position {} has no name and cannot be bound by name",
                    metadata.position
                )));
            };
            match params.get(name) {
                Some(value) => positional_params.push(value.clone()),
                None if metadata.optional => positional_params.push(Value::Null),
                None => {
                    return Err(Error::query_execution(format!(
                        "Missing required parameter: {}",
                        name
                    )));
                }
            }
        }
        self.execute(&positional_params).await
    }

    /// Runs the plan with the hints from `context` layered on top.
    ///
    /// Only `context.hints` is read. When none of `force_index`,
    /// `timeout_ms` or `parallelism` is set, the stored plan is executed
    /// as-is; otherwise a clone of the plan is adjusted and executed, leaving
    /// the stored plan untouched.
    ///
    /// `context.parameters`, `context.positional_params` and
    /// `hints.cache_results` are not consulted, and no parameter validation
    /// happens on this path.
    pub async fn execute_with_context(&self, context: &PreparedContext) -> Result<QueryResult> {
        let hints = &context.hints;
        if hints.force_index.is_none() && hints.timeout_ms.is_none() && hints.parallelism.is_none()
        {
            // Nothing to override: skip the plan clone.
            return self.executor.execute(&self.plan).await;
        }
        let mut modified_plan = self.plan.clone();
        if let Some(force_index) = &hints.force_index {
            modified_plan.hints.force_index = Some(force_index.clone());
        }
        if let Some(timeout) = hints.timeout_ms {
            modified_plan.hints.timeout_ms = Some(timeout);
        }
        if let Some(parallelism) = hints.parallelism {
            modified_plan.hints.preferred_parallelization = Some(parallelism);
        }
        self.executor.execute(&modified_plan).await
    }

    /// Metadata for every declared parameter.
    pub fn parameters(&self) -> &[ParameterMetadata] {
        &self.parameters
    }

    /// The query text.
    pub fn cql(&self) -> &str {
        &self.cql
    }

    /// The stored execution plan.
    pub fn plan(&self) -> &QueryPlan {
        &self.plan
    }

    /// Snapshot of the parameter count and the plan's type and estimates.
    ///
    /// `plan_type` is the `Debug` rendering of the plan's type.
    pub fn stats(&self) -> PreparedQueryStats {
        PreparedQueryStats {
            parameter_count: self.parameters.len(),
            plan_type: format!("{:?}", self.plan.plan_type),
            estimated_cost: self.plan.estimated_cost,
            estimated_rows: self.plan.estimated_rows,
            cache_friendly: self.is_cache_friendly(),
        }
    }

    /// `true` when the plan is a point lookup, an index scan or a table scan.
    pub fn is_cache_friendly(&self) -> bool {
        matches!(
            self.plan.plan_type,
            PlanType::PointLookup | PlanType::IndexScan | PlanType::TableScan
        )
    }

    /// Checks the number of supplied values and, for parameters that declare
    /// an expected type, the type of each value.
    fn validate_params(&self, params: &[Value]) -> Result<()> {
        if params.len() != self.parameters.len() {
            return Err(Error::query_execution(format!(
                "Parameter count mismatch: expected {}, got {}",
                self.parameters.len(),
                params.len()
            )));
        }
        for (i, (param, metadata)) in params.iter().zip(&self.parameters).enumerate() {
            if let Some(expected_type) = &metadata.expected_type {
                if !type_matches(param, expected_type) {
                    return Err(Error::query_execution(format!(
                        "Parameter {} type mismatch: expected {:?}, got {:?}",
                        i, expected_type, param
                    )));
                }
            }
        }
        Ok(())
    }

    /// Derives parameter metadata from a parsed query.
    ///
    /// This is currently a placeholder: a query without a WHERE clause has no
    /// parameters, and any query with one is reported as having exactly one
    /// required, unnamed `Integer` parameter at position 0.
    fn extract_parameters(parsed_query: &ParsedQuery) -> Vec<ParameterMetadata> {
        if parsed_query.where_clause.is_none() {
            return Vec::new();
        }
        vec![ParameterMetadata {
            name: None,
            position: 0,
            expected_type: Some(DataType::Integer),
            optional: false,
        }]
    }
}
/// Reports whether `value` may be bound to a parameter declared as
/// `expected_type`.
///
/// `Value::Null` is accepted for every type. Any other value is accepted only
/// when its variant corresponds to the declared type (`Integer`, `Float`,
/// `Text` or `Boolean`); every other combination is rejected.
fn type_matches(value: &Value, expected_type: &DataType) -> bool {
    // NULL is compatible with any declared type.
    let is_null = matches!(value, Value::Null);
    is_null
        || matches!(
            (expected_type, value),
            (DataType::Integer, Value::Integer(_))
                | (DataType::Float, Value::Float(_))
                | (DataType::Text, Value::Text(_))
                | (DataType::Boolean, Value::Boolean(_))
        )
}
/// Summary of a prepared query, as produced by `PreparedQuery::stats`.
#[derive(Debug, Clone)]
pub struct PreparedQueryStats {
/// Number of declared parameters.
pub parameter_count: usize,
/// `Debug` rendering of the plan's type.
pub plan_type: String,
/// Cost estimate copied from the plan.
pub estimated_cost: f64,
/// Row-count estimate copied from the plan.
pub estimated_rows: u64,
/// Result of `PreparedQuery::is_cache_friendly` at the time of the snapshot.
pub cache_friendly: bool,
}
/// Fluent builder that collects query text, parameter declarations and
/// execution hints before a `PreparedQuery` is assembled with `build`.
#[derive(Default)]
pub struct PreparedQueryBuilder {
// Query text that becomes `PreparedQuery::cql`.
cql: String,
// Declared parameters, in the order they were added.
parameters: Vec<ParameterMetadata>,
// Hints accumulated by the hint setters.
hints: ExecutionHints,
}
impl PreparedQueryBuilder {
    /// Starts a builder for the given query text, with no parameters and
    /// default (empty) hints.
    pub fn new(cql: &str) -> Self {
        Self {
            cql: cql.to_string(),
            ..Self::default()
        }
    }

    /// Declares a parameter with an optional name.
    ///
    /// The parameter's position is the number of parameters declared so far.
    pub fn parameter(mut self, name: Option<String>, data_type: DataType, optional: bool) -> Self {
        self.push_parameter(name, data_type, optional);
        self
    }

    /// Declares a required, unnamed parameter of the given type.
    pub fn positional_parameter(mut self, data_type: DataType) -> Self {
        self.push_parameter(None, data_type, false);
        self
    }

    /// Declares a named parameter of the given type.
    pub fn named_parameter(mut self, name: &str, data_type: DataType, optional: bool) -> Self {
        self.push_parameter(Some(name.to_string()), data_type, optional);
        self
    }

    /// Replaces every hint set so far with `hints`.
    pub fn hints(mut self, hints: ExecutionHints) -> Self {
        self.hints = hints;
        self
    }

    /// Sets the forced-index hint.
    pub fn force_index(mut self, index_name: &str) -> Self {
        self.hints.force_index = Some(index_name.to_string());
        self
    }

    /// Sets the timeout hint, in milliseconds.
    pub fn timeout(mut self, timeout_ms: u64) -> Self {
        self.hints.timeout_ms = Some(timeout_ms);
        self
    }

    /// Sets the parallelism hint.
    pub fn parallelism(mut self, threads: usize) -> Self {
        self.hints.parallelism = Some(threads);
        self
    }

    /// Turns on the result-caching hint.
    pub fn cache_results(mut self) -> Self {
        self.hints.cache_results = true;
        self
    }

    /// Assembles the [`PreparedQuery`] from the collected state.
    ///
    /// The query text and the parameter list come from the builder, not from
    /// `parsed_query`. Hints set on the builder are merged into `plan.hints`
    /// so they take effect on every execution: each of `force_index`,
    /// `timeout_ms` and `parallelism` that is set overrides the plan's value,
    /// while unset hints leave the plan's own value in place. This mirrors
    /// the mapping used by `PreparedQuery::execute_with_context`.
    ///
    /// `cache_results` has no counterpart among the plan hint fields used in
    /// this file, so it is not transferred.
    pub fn build(
        self,
        parsed_query: ParsedQuery,
        mut plan: QueryPlan,
        executor: Arc<QueryExecutor>,
    ) -> PreparedQuery {
        if let Some(index_name) = self.hints.force_index {
            plan.hints.force_index = Some(index_name);
        }
        if let Some(timeout_ms) = self.hints.timeout_ms {
            plan.hints.timeout_ms = Some(timeout_ms);
        }
        if let Some(threads) = self.hints.parallelism {
            plan.hints.preferred_parallelization = Some(threads);
        }
        PreparedQuery {
            cql: self.cql,
            parsed_query,
            plan,
            parameters: self.parameters,
            executor,
        }
    }

    /// Appends a parameter declaration; its position is the current list
    /// length, so positions follow insertion order.
    fn push_parameter(&mut self, name: Option<String>, data_type: DataType, optional: bool) {
        self.parameters.push(ParameterMetadata {
            name,
            position: self.parameters.len(),
            expected_type: Some(data_type),
            optional,
        });
    }
}
#[cfg(all(test, feature = "state_machine"))]
mod tests {
    use super::*;
    use crate::Config;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// A query built through `PreparedQuery::new` exposes the parsed CQL text,
    /// declares no parameters when there is no WHERE clause, and treats a
    /// table scan as cache friendly.
    #[tokio::test]
    #[cfg(feature = "state_machine")]
    async fn test_prepared_query_creation() {
        let workdir = TempDir::new().unwrap();
        let config = Config::default();

        let platform = crate::platform::Platform::new(&config).await.unwrap();
        let storage = crate::storage::StorageEngine::open(
            workdir.path(),
            &config,
            Arc::new(platform),
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();
        let schema = crate::schema::SchemaManager::new(workdir.path())
            .await
            .unwrap();
        let executor = Arc::new(crate::query::executor::QueryExecutor::new(
            Arc::new(storage),
            Arc::new(schema),
            &config,
        ));

        let select_all = ParsedQuery {
            query_type: crate::query::QueryType::Select,
            table: Some(crate::TableId::new("users")),
            columns: vec!["*".to_string()],
            where_clause: None,
            values: vec![],
            set_clause: std::collections::HashMap::new(),
            order_by: vec![],
            limit: None,
            cql: "SELECT * FROM users".to_string(),
        };
        let scan_plan = crate::query::planner::QueryPlan {
            plan_type: crate::query::planner::PlanType::TableScan,
            table: Some(crate::TableId::new("users")),
            estimated_cost: 100.0,
            estimated_rows: 1000,
            selected_indexes: vec![],
            steps: vec![],
            hints: crate::query::planner::QueryHints::default(),
        };

        let query = PreparedQuery::new(select_all, scan_plan, executor);

        assert_eq!(query.cql(), "SELECT * FROM users");
        assert!(query.parameters().is_empty());
        assert!(query.is_cache_friendly());
    }

    /// The builder records the query text, the declared parameters and the
    /// hints it was given.
    #[test]
    fn test_prepared_query_builder() {
        const CQL: &str = "SELECT * FROM users WHERE id = ? AND name = ?";

        let configured = PreparedQueryBuilder::new(CQL)
            .positional_parameter(DataType::Integer)
            .positional_parameter(DataType::Text)
            .timeout(5000)
            .parallelism(4);

        assert_eq!(configured.cql, CQL);
        assert_eq!(configured.parameters.len(), 2);
        assert_eq!(configured.hints.timeout_ms, Some(5000));
        assert_eq!(configured.hints.parallelism, Some(4));
    }

    /// Parameter metadata keeps the values it was constructed with.
    #[test]
    fn test_parameter_metadata() {
        let user_id = ParameterMetadata {
            name: Some("user_id".to_string()),
            position: 0,
            expected_type: Some(DataType::Integer),
            optional: false,
        };

        assert_eq!(user_id.name.as_deref(), Some("user_id"));
        assert_eq!(user_id.position, 0);
        assert!(!user_id.optional);
    }

    /// Execution hints keep the values they were constructed with.
    #[test]
    fn test_execution_hints() {
        let all_set = ExecutionHints {
            force_index: Some("idx_user_id".to_string()),
            timeout_ms: Some(10000),
            parallelism: Some(8),
            cache_results: true,
        };

        assert_eq!(all_set.force_index.as_deref(), Some("idx_user_id"));
        assert_eq!(all_set.timeout_ms, Some(10000));
        assert_eq!(all_set.parallelism, Some(8));
        assert!(all_set.cache_results);
    }

    /// Matching variants and NULL are accepted; mismatching variants are not.
    #[test]
    fn test_type_matching() {
        let accepted = [
            (Value::Integer(42), DataType::Integer),
            (Value::Text("test".to_string()), DataType::Text),
            (Value::Null, DataType::Integer),
        ];
        for (index, (value, data_type)) in accepted.iter().enumerate() {
            assert!(
                type_matches(value, data_type),
                "accepted case {} was rejected",
                index
            );
        }

        assert!(!type_matches(&Value::Integer(42), &DataType::Text));
    }
}