1use super::{
17 executor::{QueryExecutor, QueryResult},
18 planner::{PlanType, QueryPlan},
19 ParsedQuery,
20};
21use crate::types::DataType;
22use crate::{Error, Result, Value};
23use std::collections::HashMap;
24use std::sync::Arc;
25
26#[derive(Debug)]
28pub struct PreparedQuery {
29 pub cql: String,
31 pub parsed_query: ParsedQuery,
33 pub plan: QueryPlan,
35 pub parameters: Vec<ParameterMetadata>,
37 executor: Arc<QueryExecutor>,
39}
40
41#[derive(Debug, Clone)]
43pub struct ParameterMetadata {
44 pub name: Option<String>,
46 pub position: usize,
48 pub expected_type: Option<DataType>,
50 pub optional: bool,
52}
53
54#[derive(Debug)]
56pub struct PreparedContext {
57 pub parameters: HashMap<String, Value>,
59 pub positional_params: Vec<Value>,
61 pub hints: ExecutionHints,
63}
64
65#[derive(Debug, Clone, Default)]
67pub struct ExecutionHints {
68 pub force_index: Option<String>,
70 pub timeout_ms: Option<u64>,
72 pub parallelism: Option<usize>,
74 pub cache_results: bool,
76}
77
78impl PreparedQuery {
79 pub fn new(parsed_query: ParsedQuery, plan: QueryPlan, executor: Arc<QueryExecutor>) -> Self {
81 let cql = parsed_query.cql.clone();
82 let parameters = Self::extract_parameters(&parsed_query);
83
84 Self {
85 cql,
86 parsed_query,
87 plan,
88 parameters,
89 executor,
90 }
91 }
92
93 pub async fn execute(&self, params: &[Value]) -> Result<QueryResult> {
95 self.validate_params(params)?;
96 self.executor.execute(&self.plan).await
99 }
100
101 pub async fn execute_named(&self, params: &HashMap<String, Value>) -> Result<QueryResult> {
103 let mut positional_params = Vec::with_capacity(self.parameters.len());
105 for metadata in &self.parameters {
106 let Some(name) = &metadata.name else {
107 continue;
108 };
109 match params.get(name) {
110 Some(value) => positional_params.push(value.clone()),
111 None if metadata.optional => positional_params.push(Value::Null),
112 None => {
113 return Err(Error::query_execution(format!(
114 "Missing required parameter: {}",
115 name
116 )));
117 }
118 }
119 }
120
121 self.execute(&positional_params).await
122 }
123
124 pub async fn execute_with_context(&self, context: &PreparedContext) -> Result<QueryResult> {
126 let hints = &context.hints;
127 if hints.force_index.is_none() && hints.timeout_ms.is_none() && hints.parallelism.is_none()
129 {
130 return self.executor.execute(&self.plan).await;
131 }
132
133 let mut modified_plan = self.plan.clone();
134 if let Some(force_index) = &hints.force_index {
135 modified_plan.hints.force_index = Some(force_index.clone());
136 }
137 if let Some(timeout) = hints.timeout_ms {
138 modified_plan.hints.timeout_ms = Some(timeout);
139 }
140 if let Some(parallelism) = hints.parallelism {
141 modified_plan.hints.preferred_parallelization = Some(parallelism);
142 }
143 self.executor.execute(&modified_plan).await
144 }
145
146 pub fn parameters(&self) -> &[ParameterMetadata] {
148 &self.parameters
149 }
150
151 pub fn cql(&self) -> &str {
153 &self.cql
154 }
155
156 pub fn plan(&self) -> &QueryPlan {
158 &self.plan
159 }
160
161 pub fn stats(&self) -> PreparedQueryStats {
163 PreparedQueryStats {
164 parameter_count: self.parameters.len(),
165 plan_type: format!("{:?}", self.plan.plan_type),
166 estimated_cost: self.plan.estimated_cost,
167 estimated_rows: self.plan.estimated_rows,
168 cache_friendly: self.is_cache_friendly(),
169 }
170 }
171
172 pub fn is_cache_friendly(&self) -> bool {
174 matches!(
178 self.plan.plan_type,
179 PlanType::PointLookup | PlanType::IndexScan | PlanType::TableScan
180 )
181 }
182
183 fn validate_params(&self, params: &[Value]) -> Result<()> {
185 if params.len() != self.parameters.len() {
186 return Err(Error::query_execution(format!(
187 "Parameter count mismatch: expected {}, got {}",
188 self.parameters.len(),
189 params.len()
190 )));
191 }
192
193 for (i, (param, metadata)) in params.iter().zip(&self.parameters).enumerate() {
194 if let Some(expected_type) = &metadata.expected_type {
195 if !type_matches(param, expected_type) {
196 return Err(Error::query_execution(format!(
197 "Parameter {} type mismatch: expected {:?}, got {:?}",
198 i, expected_type, param
199 )));
200 }
201 }
202 }
203
204 Ok(())
205 }
206
207 fn extract_parameters(parsed_query: &ParsedQuery) -> Vec<ParameterMetadata> {
213 if parsed_query.where_clause.is_none() {
214 return Vec::new();
215 }
216 vec![ParameterMetadata {
217 name: None,
218 position: 0,
219 expected_type: Some(DataType::Integer),
220 optional: false,
221 }]
222 }
223}
224
225fn type_matches(value: &Value, expected_type: &DataType) -> bool {
227 matches!(
228 (value, expected_type),
229 (Value::Integer(_), DataType::Integer)
230 | (Value::Float(_), DataType::Float)
231 | (Value::Text(_), DataType::Text)
232 | (Value::Boolean(_), DataType::Boolean)
233 | (Value::Null, _)
234 )
235}
236
237#[derive(Debug, Clone)]
239pub struct PreparedQueryStats {
240 pub parameter_count: usize,
242 pub plan_type: String,
244 pub estimated_cost: f64,
246 pub estimated_rows: u64,
248 pub cache_friendly: bool,
250}
251
252#[derive(Default)]
254pub struct PreparedQueryBuilder {
255 cql: String,
257 parameters: Vec<ParameterMetadata>,
259 hints: ExecutionHints,
261}
262
263impl PreparedQueryBuilder {
264 pub fn new(cql: &str) -> Self {
266 Self {
267 cql: cql.to_string(),
268 ..Self::default()
269 }
270 }
271
272 pub fn parameter(mut self, name: Option<String>, data_type: DataType, optional: bool) -> Self {
274 self.push_parameter(name, data_type, optional);
275 self
276 }
277
278 pub fn positional_parameter(mut self, data_type: DataType) -> Self {
280 self.push_parameter(None, data_type, false);
281 self
282 }
283
284 pub fn named_parameter(mut self, name: &str, data_type: DataType, optional: bool) -> Self {
286 self.push_parameter(Some(name.to_string()), data_type, optional);
287 self
288 }
289
290 pub fn hints(mut self, hints: ExecutionHints) -> Self {
292 self.hints = hints;
293 self
294 }
295
296 pub fn force_index(mut self, index_name: &str) -> Self {
298 self.hints.force_index = Some(index_name.to_string());
299 self
300 }
301
302 pub fn timeout(mut self, timeout_ms: u64) -> Self {
304 self.hints.timeout_ms = Some(timeout_ms);
305 self
306 }
307
308 pub fn parallelism(mut self, threads: usize) -> Self {
310 self.hints.parallelism = Some(threads);
311 self
312 }
313
314 pub fn cache_results(mut self) -> Self {
316 self.hints.cache_results = true;
317 self
318 }
319
320 pub fn build(
322 self,
323 parsed_query: ParsedQuery,
324 plan: QueryPlan,
325 executor: Arc<QueryExecutor>,
326 ) -> PreparedQuery {
327 PreparedQuery {
328 cql: self.cql,
329 parsed_query,
330 plan,
331 parameters: self.parameters,
332 executor,
333 }
334 }
335
336 fn push_parameter(&mut self, name: Option<String>, data_type: DataType, optional: bool) {
337 self.parameters.push(ParameterMetadata {
338 name,
339 position: self.parameters.len(),
340 expected_type: Some(data_type),
341 optional,
342 });
343 }
344}
345
346#[cfg(all(test, feature = "state_machine"))]
347mod tests {
348 use super::*;
349 use crate::Config;
350 use std::sync::Arc;
351 use tempfile::TempDir;
352
353 #[tokio::test]
354 #[cfg(feature = "state_machine")]
355 async fn test_prepared_query_creation() {
356 let temp_dir = TempDir::new().unwrap();
357 let config = Config::default();
358 let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
359 let storage = Arc::new(
360 crate::storage::StorageEngine::open(
361 temp_dir.path(),
362 &config,
363 platform,
364 #[cfg(feature = "state_machine")]
365 None,
366 )
367 .await
368 .unwrap(),
369 );
370 let schema = Arc::new(
371 crate::schema::SchemaManager::new(temp_dir.path())
372 .await
373 .unwrap(),
374 );
375 let executor = Arc::new(crate::query::executor::QueryExecutor::new(
376 storage, schema, &config,
377 ));
378
379 let parsed_query = ParsedQuery {
380 query_type: crate::query::QueryType::Select,
381 table: Some(crate::TableId::new("users")),
382 columns: vec!["*".to_string()],
383 where_clause: None,
384 values: vec![],
385 set_clause: std::collections::HashMap::new(),
386 order_by: vec![],
387 limit: None,
388 cql: "SELECT * FROM users".to_string(),
389 };
390
391 let plan = crate::query::planner::QueryPlan {
392 plan_type: crate::query::planner::PlanType::TableScan,
393 table: Some(crate::TableId::new("users")),
394 estimated_cost: 100.0,
395 estimated_rows: 1000,
396 selected_indexes: vec![],
397 steps: vec![],
398 hints: crate::query::planner::QueryHints::default(),
399 };
400
401 let prepared = PreparedQuery::new(parsed_query, plan, executor);
402
403 assert_eq!(prepared.cql(), "SELECT * FROM users");
404 assert_eq!(prepared.parameters().len(), 0);
405 assert!(prepared.is_cache_friendly());
407 }
408
409 #[test]
410 fn test_prepared_query_builder() {
411 let builder = PreparedQueryBuilder::new("SELECT * FROM users WHERE id = ? AND name = ?")
412 .positional_parameter(DataType::Integer)
413 .positional_parameter(DataType::Text)
414 .timeout(5000)
415 .parallelism(4);
416
417 assert_eq!(builder.cql, "SELECT * FROM users WHERE id = ? AND name = ?");
418 assert_eq!(builder.parameters.len(), 2);
419 assert_eq!(builder.hints.timeout_ms, Some(5000));
420 assert_eq!(builder.hints.parallelism, Some(4));
421 }
422
423 #[test]
424 fn test_parameter_metadata() {
425 let metadata = ParameterMetadata {
426 name: Some("user_id".to_string()),
427 position: 0,
428 expected_type: Some(DataType::Integer),
429 optional: false,
430 };
431
432 assert_eq!(metadata.name, Some("user_id".to_string()));
433 assert_eq!(metadata.position, 0);
434 assert!(!metadata.optional);
435 }
436
437 #[test]
438 fn test_execution_hints() {
439 let hints = ExecutionHints {
440 force_index: Some("idx_user_id".to_string()),
441 timeout_ms: Some(10000),
442 parallelism: Some(8),
443 cache_results: true,
444 };
445
446 assert_eq!(hints.force_index, Some("idx_user_id".to_string()));
447 assert_eq!(hints.timeout_ms, Some(10000));
448 assert_eq!(hints.parallelism, Some(8));
449 assert!(hints.cache_results);
450 }
451
452 #[test]
453 fn test_type_matching() {
454 assert!(type_matches(&Value::Integer(42), &DataType::Integer));
455 assert!(type_matches(
456 &Value::Text("test".to_string()),
457 &DataType::Text
458 ));
459 assert!(type_matches(&Value::Null, &DataType::Integer));
461 assert!(!type_matches(&Value::Integer(42), &DataType::Text));
462 }
463}