Skip to main content

cqlite_core/query/
prepared.rs

1//! Prepared statements for CQLite
2//!
3//! This module provides prepared statement support for CQL queries.
4//! Prepared statements offer several benefits:
5//!
6//! - Performance: Query parsing and planning is done once
7//! - Security: Parameters are safely bound preventing query injection
8//! - Reusability: Same query can be executed with different parameters
9
10// CQL (Cassandra Query Language) Reference:
11// https://cassandra.apache.org/doc/latest/cassandra/developing/cql/cql_singlefile.html
12//
13// This implements CQL v3.4.3+ for Apache Cassandra 5.0+
14// CQL is NOT SQL - it's a query language specifically designed for Cassandra's distributed architecture.
15
16use super::{
17    executor::{QueryExecutor, QueryResult},
18    planner::{PlanType, QueryPlan},
19    ParsedQuery,
20};
21use crate::types::DataType;
22use crate::{Error, Result, Value};
23use std::collections::HashMap;
24use std::sync::Arc;
25
/// A CQL statement bundled with its parsed form, its execution plan, and the
/// executor that runs it.
///
/// Instances are produced by [`PreparedQuery::new`] or by
/// [`PreparedQueryBuilder::build`].
#[derive(Debug)]
pub struct PreparedQuery {
    /// Original CQL text of the statement.
    pub cql: String,
    /// Parsed representation of the statement.
    pub parsed_query: ParsedQuery,
    /// Execution plan passed to the executor, either as-is or as a clone with
    /// hint overrides applied.
    pub plan: QueryPlan,
    /// Metadata for each parameter the statement expects; positional values
    /// are validated against these entries in order.
    pub parameters: Vec<ParameterMetadata>,
    /// Shared handle to the executor that runs the plan.
    executor: Arc<QueryExecutor>,
}
40
/// Describes a single parameter of a prepared statement.
#[derive(Debug, Clone)]
pub struct ParameterMetadata {
    /// Parameter name; `None` for a purely positional parameter.
    pub name: Option<String>,
    /// Parameter position (0-based).
    pub position: usize,
    /// Expected value type; when `None`, validation performs no type check
    /// for this parameter.
    pub expected_type: Option<DataType>,
    /// Whether the parameter may be omitted when binding by name, in which
    /// case `Value::Null` is bound in its place.
    pub optional: bool,
}
53
/// Execution context accepted by `PreparedQuery::execute_with_context`.
///
/// NOTE(review): within this file only `hints` is read; `parameters` and
/// `positional_params` are carried but not consumed here — confirm whether
/// another module is expected to use them.
#[derive(Debug)]
pub struct PreparedContext {
    /// Parameter values keyed by parameter name.
    pub parameters: HashMap<String, Value>,
    /// Parameter values in positional order.
    pub positional_params: Vec<Value>,
    /// Execution hints that may override the stored plan's hints.
    pub hints: ExecutionHints,
}
64
/// Optional tuning knobs for executing a prepared statement.
///
/// The `Default` value sets no overrides and leaves `cache_results` false.
#[derive(Debug, Clone, Default)]
pub struct ExecutionHints {
    /// Name of an index to force; copied onto the plan's hints when set.
    pub force_index: Option<String>,
    /// Query timeout in milliseconds; copied onto the plan's hints when set.
    pub timeout_ms: Option<u64>,
    /// Parallelization preference; copied onto the plan's
    /// `preferred_parallelization` hint when set.
    pub parallelism: Option<usize>,
    /// Whether results should be cached.
    ///
    /// NOTE(review): nothing in this file reads this flag — confirm where it
    /// is meant to be honored.
    pub cache_results: bool,
}
77
78impl PreparedQuery {
79    /// Create a new prepared query
80    pub fn new(parsed_query: ParsedQuery, plan: QueryPlan, executor: Arc<QueryExecutor>) -> Self {
81        let cql = parsed_query.cql.clone();
82        let parameters = Self::extract_parameters(&parsed_query);
83
84        Self {
85            cql,
86            parsed_query,
87            plan,
88            parameters,
89            executor,
90        }
91    }
92
93    /// Execute the prepared query with parameters
94    pub async fn execute(&self, params: &[Value]) -> Result<QueryResult> {
95        self.validate_params(params)?;
96        // Default execution path: no hints, so skip the plan clone in
97        // execute_with_context and call the executor directly.
98        self.executor.execute(&self.plan).await
99    }
100
101    /// Execute with named parameters
102    pub async fn execute_named(&self, params: &HashMap<String, Value>) -> Result<QueryResult> {
103        // Convert named parameters to positional, in declaration order.
104        let mut positional_params = Vec::with_capacity(self.parameters.len());
105        for metadata in &self.parameters {
106            let Some(name) = &metadata.name else {
107                continue;
108            };
109            match params.get(name) {
110                Some(value) => positional_params.push(value.clone()),
111                None if metadata.optional => positional_params.push(Value::Null),
112                None => {
113                    return Err(Error::query_execution(format!(
114                        "Missing required parameter: {}",
115                        name
116                    )));
117                }
118            }
119        }
120
121        self.execute(&positional_params).await
122    }
123
124    /// Execute with execution context
125    pub async fn execute_with_context(&self, context: &PreparedContext) -> Result<QueryResult> {
126        let hints = &context.hints;
127        // Avoid cloning the plan if no hints would override it.
128        if hints.force_index.is_none() && hints.timeout_ms.is_none() && hints.parallelism.is_none()
129        {
130            return self.executor.execute(&self.plan).await;
131        }
132
133        let mut modified_plan = self.plan.clone();
134        if let Some(force_index) = &hints.force_index {
135            modified_plan.hints.force_index = Some(force_index.clone());
136        }
137        if let Some(timeout) = hints.timeout_ms {
138            modified_plan.hints.timeout_ms = Some(timeout);
139        }
140        if let Some(parallelism) = hints.parallelism {
141            modified_plan.hints.preferred_parallelization = Some(parallelism);
142        }
143        self.executor.execute(&modified_plan).await
144    }
145
146    /// Get parameter metadata
147    pub fn parameters(&self) -> &[ParameterMetadata] {
148        &self.parameters
149    }
150
151    /// Get CQL text
152    pub fn cql(&self) -> &str {
153        &self.cql
154    }
155
156    /// Get query plan
157    pub fn plan(&self) -> &QueryPlan {
158        &self.plan
159    }
160
161    /// Get query statistics
162    pub fn stats(&self) -> PreparedQueryStats {
163        PreparedQueryStats {
164            parameter_count: self.parameters.len(),
165            plan_type: format!("{:?}", self.plan.plan_type),
166            estimated_cost: self.plan.estimated_cost,
167            estimated_rows: self.plan.estimated_rows,
168            cache_friendly: self.is_cache_friendly(),
169        }
170    }
171
172    /// Check if query is cache-friendly
173    pub fn is_cache_friendly(&self) -> bool {
174        // Cache-friendly plans have predictable execution patterns and no complex
175        // aggregations. The simplified implementation also treats TableScan as
176        // cache-friendly.
177        matches!(
178            self.plan.plan_type,
179            PlanType::PointLookup | PlanType::IndexScan | PlanType::TableScan
180        )
181    }
182
183    /// Validate parameter count and types against this query's metadata.
184    fn validate_params(&self, params: &[Value]) -> Result<()> {
185        if params.len() != self.parameters.len() {
186            return Err(Error::query_execution(format!(
187                "Parameter count mismatch: expected {}, got {}",
188                self.parameters.len(),
189                params.len()
190            )));
191        }
192
193        for (i, (param, metadata)) in params.iter().zip(&self.parameters).enumerate() {
194            if let Some(expected_type) = &metadata.expected_type {
195                if !type_matches(param, expected_type) {
196                    return Err(Error::query_execution(format!(
197                        "Parameter {} type mismatch: expected {:?}, got {:?}",
198                        i, expected_type, param
199                    )));
200                }
201            }
202        }
203
204        Ok(())
205    }
206
207    /// Extract parameter placeholders from parsed query.
208    ///
209    /// Simplified implementation: a single positional Integer parameter is
210    /// emitted whenever the query has a WHERE clause. A real implementation
211    /// would scan the CQL text for `?` and `:name` placeholders.
212    fn extract_parameters(parsed_query: &ParsedQuery) -> Vec<ParameterMetadata> {
213        if parsed_query.where_clause.is_none() {
214            return Vec::new();
215        }
216        vec![ParameterMetadata {
217            name: None,
218            position: 0,
219            expected_type: Some(DataType::Integer),
220            optional: false,
221        }]
222    }
223}
224
225/// Check if `value` matches `expected_type`. `Null` is compatible with any type.
226fn type_matches(value: &Value, expected_type: &DataType) -> bool {
227    matches!(
228        (value, expected_type),
229        (Value::Integer(_), DataType::Integer)
230            | (Value::Float(_), DataType::Float)
231            | (Value::Text(_), DataType::Text)
232            | (Value::Boolean(_), DataType::Boolean)
233            | (Value::Null, _)
234    )
235}
236
/// Summary of a prepared query, as returned by `PreparedQuery::stats`.
#[derive(Debug, Clone)]
pub struct PreparedQueryStats {
    /// Number of parameters the statement declares.
    pub parameter_count: usize,
    /// Plan type, rendered with the plan type's `Debug` formatting.
    pub plan_type: String,
    /// Estimated execution cost, copied from the plan.
    pub estimated_cost: f64,
    /// Estimated number of rows returned, copied from the plan.
    pub estimated_rows: u64,
    /// Whether the query is cache-friendly (see `PreparedQuery::is_cache_friendly`).
    pub cache_friendly: bool,
}
251
252/// Prepared statement builder
253#[derive(Default)]
254pub struct PreparedQueryBuilder {
255    /// CQL text
256    cql: String,
257    /// Parameter metadata
258    parameters: Vec<ParameterMetadata>,
259    /// Execution hints
260    hints: ExecutionHints,
261}
262
263impl PreparedQueryBuilder {
264    /// Create a new builder
265    pub fn new(cql: &str) -> Self {
266        Self {
267            cql: cql.to_string(),
268            ..Self::default()
269        }
270    }
271
272    /// Add a parameter
273    pub fn parameter(mut self, name: Option<String>, data_type: DataType, optional: bool) -> Self {
274        self.push_parameter(name, data_type, optional);
275        self
276    }
277
278    /// Add a positional parameter
279    pub fn positional_parameter(mut self, data_type: DataType) -> Self {
280        self.push_parameter(None, data_type, false);
281        self
282    }
283
284    /// Add a named parameter
285    pub fn named_parameter(mut self, name: &str, data_type: DataType, optional: bool) -> Self {
286        self.push_parameter(Some(name.to_string()), data_type, optional);
287        self
288    }
289
290    /// Set execution hints
291    pub fn hints(mut self, hints: ExecutionHints) -> Self {
292        self.hints = hints;
293        self
294    }
295
296    /// Force index usage
297    pub fn force_index(mut self, index_name: &str) -> Self {
298        self.hints.force_index = Some(index_name.to_string());
299        self
300    }
301
302    /// Set query timeout
303    pub fn timeout(mut self, timeout_ms: u64) -> Self {
304        self.hints.timeout_ms = Some(timeout_ms);
305        self
306    }
307
308    /// Set parallelism preference
309    pub fn parallelism(mut self, threads: usize) -> Self {
310        self.hints.parallelism = Some(threads);
311        self
312    }
313
314    /// Enable result caching
315    pub fn cache_results(mut self) -> Self {
316        self.hints.cache_results = true;
317        self
318    }
319
320    /// Build the prepared query (this would typically be called by the query engine)
321    pub fn build(
322        self,
323        parsed_query: ParsedQuery,
324        plan: QueryPlan,
325        executor: Arc<QueryExecutor>,
326    ) -> PreparedQuery {
327        PreparedQuery {
328            cql: self.cql,
329            parsed_query,
330            plan,
331            parameters: self.parameters,
332            executor,
333        }
334    }
335
336    fn push_parameter(&mut self, name: Option<String>, data_type: DataType, optional: bool) {
337        self.parameters.push(ParameterMetadata {
338            name,
339            position: self.parameters.len(),
340            expected_type: Some(data_type),
341            optional,
342        });
343    }
344}
345
#[cfg(all(test, feature = "state_machine"))]
mod tests {
    use super::*;
    use crate::Config;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// A parameterless `SELECT *` keeps its CQL text, declares no parameters,
    /// and is reported as cache-friendly for a table-scan plan.
    #[tokio::test]
    async fn test_prepared_query_creation() {
        let data_dir = TempDir::new().unwrap();
        let config = Config::default();

        let platform = crate::platform::Platform::new(&config).await.unwrap();
        let storage = crate::storage::StorageEngine::open(
            data_dir.path(),
            &config,
            Arc::new(platform),
            #[cfg(feature = "state_machine")]
            None,
        )
        .await
        .unwrap();
        let schema = crate::schema::SchemaManager::new(data_dir.path())
            .await
            .unwrap();
        let executor = Arc::new(QueryExecutor::new(
            Arc::new(storage),
            Arc::new(schema),
            &config,
        ));

        let select_all = ParsedQuery {
            query_type: crate::query::QueryType::Select,
            table: Some(crate::TableId::new("users")),
            columns: vec!["*".to_string()],
            where_clause: None,
            values: vec![],
            set_clause: HashMap::new(),
            order_by: vec![],
            limit: None,
            cql: "SELECT * FROM users".to_string(),
        };

        let scan_plan = QueryPlan {
            plan_type: PlanType::TableScan,
            table: Some(crate::TableId::new("users")),
            estimated_cost: 100.0,
            estimated_rows: 1000,
            selected_indexes: vec![],
            steps: vec![],
            hints: crate::query::planner::QueryHints::default(),
        };

        let prepared = PreparedQuery::new(select_all, scan_plan, executor);

        assert_eq!(prepared.cql(), "SELECT * FROM users");
        assert_eq!(prepared.parameters().len(), 0);
        // TableScan is treated as cache-friendly by the simplified implementation.
        assert!(prepared.is_cache_friendly());
    }

    /// The builder records its CQL text, parameters, and hint settings.
    #[test]
    fn test_prepared_query_builder() {
        let cql = "SELECT * FROM users WHERE id = ? AND name = ?";
        let builder = PreparedQueryBuilder::new(cql)
            .positional_parameter(DataType::Integer)
            .positional_parameter(DataType::Text)
            .timeout(5000)
            .parallelism(4);

        assert_eq!(builder.cql, "SELECT * FROM users WHERE id = ? AND name = ?");
        assert_eq!(builder.parameters.len(), 2);
        assert_eq!(builder.hints.timeout_ms, Some(5000));
        assert_eq!(builder.hints.parallelism, Some(4));
    }

    /// Parameter metadata stores exactly the values it was given.
    #[test]
    fn test_parameter_metadata() {
        let user_id = ParameterMetadata {
            name: Some("user_id".to_string()),
            position: 0,
            expected_type: Some(DataType::Integer),
            optional: false,
        };

        assert_eq!(user_id.name.as_deref(), Some("user_id"));
        assert_eq!(user_id.position, 0);
        assert!(!user_id.optional);
    }

    /// Execution hints store exactly the values they were given.
    #[test]
    fn test_execution_hints() {
        let hints = ExecutionHints {
            force_index: Some("idx_user_id".to_string()),
            timeout_ms: Some(10000),
            parallelism: Some(8),
            cache_results: true,
        };

        assert_eq!(hints.force_index.as_deref(), Some("idx_user_id"));
        assert_eq!(hints.timeout_ms, Some(10000));
        assert_eq!(hints.parallelism, Some(8));
        assert!(hints.cache_results);
    }

    /// Values match their own kind, and Null matches everything.
    #[test]
    fn test_type_matching() {
        let text = Value::Text("test".to_string());

        assert!(type_matches(&Value::Integer(42), &DataType::Integer));
        assert!(type_matches(&text, &DataType::Text));
        // Null matches any expected type.
        assert!(type_matches(&Value::Null, &DataType::Integer));
        assert!(!type_matches(&Value::Integer(42), &DataType::Text));
    }
}
463}