1use super::{
13 executor::{QueryExecutor, QueryResult},
14 parser::QueryParser,
15 planner::QueryPlanner,
16 prepared::PreparedQuery,
17 result::{QueryResultIterator, StreamingConfig},
18 QueryStats,
19};
20
21#[cfg(feature = "state_machine")]
22use super::{select_executor::SelectExecutor, select_optimizer::SelectOptimizer, select_parser};
23use crate::{
24 memory::MemoryManager, schema::SchemaManager, storage::StorageEngine, Config, Error, Result,
25 Value,
26};
27use dashmap::DashMap;
28use std::sync::Arc;
29use std::time::Instant;
30
31#[derive(Debug, Clone)]
33pub struct QueryCacheEntry {
34 pub parsed_query: super::ParsedQuery,
36 pub plan: super::planner::QueryPlan,
38 pub cached_at: Instant,
40 pub hit_count: u64,
42}
43
44#[derive(Debug, Clone)]
46pub enum SchemaStatus {
47 Available { keyspace: String, table: String },
49 Missing { table: String, reason: String },
51 ExtractionFailed {
53 table: String,
54 cause: String,
55 suggestion: String,
56 },
57}
58
59#[derive(Debug)]
61pub struct QueryEngine {
62 parser: QueryParser,
64 planner: QueryPlanner,
66 executor: QueryExecutor,
68 schema_manager: Arc<SchemaManager>,
70 #[cfg(feature = "state_machine")]
72 select_optimizer: SelectOptimizer,
73 #[cfg(feature = "state_machine")]
75 select_executor: SelectExecutor,
76 prepared_cache: DashMap<String, Arc<PreparedQuery>>,
78 plan_cache: DashMap<String, QueryCacheEntry>,
80 stats: Arc<parking_lot::RwLock<QueryStats>>,
82 config: Config,
84}
85
86impl QueryEngine {
87 pub fn new(
89 storage: Arc<StorageEngine>,
90 schema: Arc<SchemaManager>,
91 _memory: Arc<MemoryManager>,
92 config: &Config,
93 ) -> Result<Self> {
94 let parser = QueryParser::new(config);
95 let planner = QueryPlanner::new(schema.clone(), config);
96 let executor = QueryExecutor::new(storage.clone(), schema.clone(), config);
97
98 #[cfg(feature = "state_machine")]
100 let select_optimizer = SelectOptimizer::new(schema.clone(), storage.clone());
101 #[cfg(feature = "state_machine")]
102 let select_executor = SelectExecutor::new(schema.clone(), storage);
103
104 Ok(Self {
105 parser,
106 planner,
107 executor,
108 schema_manager: schema,
109 #[cfg(feature = "state_machine")]
110 select_optimizer,
111 #[cfg(feature = "state_machine")]
112 select_executor,
113 prepared_cache: DashMap::new(),
114 plan_cache: DashMap::new(),
115 stats: Arc::new(parking_lot::RwLock::new(QueryStats::default())),
116 config: config.clone(),
117 })
118 }
119
120 fn inc_total_queries(&self) {
122 self.stats.write().total_queries += 1;
123 }
124
125 fn inc_error_queries(&self) {
127 self.stats.write().error_queries += 1;
128 }
129
130 fn record_cache_hit(&self) {
132 let mut stats = self.stats.write();
133 let total = stats.total_queries as f64;
134 stats.cache_hit_ratio = (stats.cache_hit_ratio * (total - 1.0) + 1.0) / total;
136 }
137
138 pub async fn execute(&self, cql: &str) -> Result<QueryResult> {
140 let start_time = Instant::now();
141 self.inc_total_queries();
142
143 let trimmed_cql = cql.trim().to_uppercase();
147 let is_simple_id_lookup = cql.contains("WHERE id =") && cql.split_whitespace().count() <= 8;
148 if trimmed_cql.starts_with("SELECT") && !is_simple_id_lookup {
149 return self.execute_select_query(cql, start_time).await;
150 }
151 #[cfg(debug_assertions)]
152 if trimmed_cql.starts_with("SELECT") && is_simple_id_lookup {
153 log::debug!(
154 "Routing simple SELECT through normal executor for consistent key handling"
155 );
156 }
157
158 if let Some(mut cached_entry) = self.plan_cache.get_mut(cql) {
160 self.record_cache_hit();
161 cached_entry.hit_count += 1;
162
163 let mut result = self.executor.execute(&cached_entry.plan).await?;
164 self.update_execution_stats(&mut result, start_time);
165 return Ok(result);
166 }
167
168 let parsed_query = self
169 .parser
170 .parse(cql)
171 .inspect_err(|_| self.inc_error_queries())?;
172 let plan = self.planner.plan(&parsed_query).await?;
173
174 if self.config.query.query_cache_size.unwrap_or(0) > 0 {
175 self.cache_query_plan(cql, parsed_query, plan.clone());
176 }
177
178 let mut result = self.executor.execute(&plan).await?;
179 self.update_execution_stats(&mut result, start_time);
180 Ok(result)
181 }
182
183 #[cfg(feature = "state_machine")]
205 pub async fn execute_streaming(
206 &self,
207 cql: &str,
208 config: StreamingConfig,
209 ) -> Result<QueryResultIterator> {
210 self.inc_total_queries();
211
212 if !cql.trim().to_uppercase().starts_with("SELECT") {
213 return Err(Error::query_execution(
214 "Streaming execution only supports SELECT queries",
215 ));
216 }
217
218 let select_statement =
219 select_parser::parse_select(cql).inspect_err(|_| self.inc_error_queries())?;
220 let optimized_plan = self.select_optimizer.optimize(select_statement).await?;
221
222 self.select_executor
223 .execute_streaming(optimized_plan, config)
224 .await
225 }
226
227 async fn execute_select_query(&self, cql: &str, start_time: Instant) -> Result<QueryResult> {
229 if let Some(mut cached_entry) = self.plan_cache.get_mut(cql) {
231 if cached_entry.plan.table.is_some() {
232 self.record_cache_hit();
233 cached_entry.hit_count += 1;
234
235 let mut result = self.executor.execute(&cached_entry.plan).await?;
236 self.update_execution_stats(&mut result, start_time);
237 return Ok(result);
238 }
239
240 drop(cached_entry);
242 self.plan_cache.remove(cql);
243 }
244
245 #[cfg(not(feature = "state_machine"))]
246 return Err(Error::query_execution(
247 "Advanced SELECT parsing requires state_machine feature",
248 ));
249
250 #[cfg(feature = "state_machine")]
251 {
252 let select_statement =
253 select_parser::parse_select(cql).inspect_err(|_| self.inc_error_queries())?;
254 let optimized_plan = self.select_optimizer.optimize(select_statement).await?;
255 let mut result = self.select_executor.execute(optimized_plan).await?;
256 self.update_execution_stats(&mut result, start_time);
257 Ok(result)
258 }
259 }
260
261 pub async fn execute_with_params(&self, cql: &str, _params: &[Value]) -> Result<QueryResult> {
263 self.execute(cql).await
266 }
267
268 pub async fn prepare(&self, cql: &str) -> Result<Arc<PreparedQuery>> {
270 if let Some(cached) = self.prepared_cache.get(cql) {
271 return Ok(cached.clone());
272 }
273
274 let parsed_query = self.parser.parse(cql)?;
275 let plan = self.planner.plan(&parsed_query).await?;
276
277 let prepared = Arc::new(PreparedQuery::new(
278 parsed_query,
279 plan,
280 Arc::new(self.executor.clone()),
281 ));
282
283 self.prepared_cache
284 .insert(cql.to_string(), prepared.clone());
285
286 Ok(prepared)
287 }
288
289 pub async fn execute_prepared(
291 &self,
292 prepared: &PreparedQuery,
293 params: &[Value],
294 ) -> Result<QueryResult> {
295 let start_time = Instant::now();
296 self.inc_total_queries();
297
298 let mut result = prepared.execute(params).await?;
299 self.update_execution_stats(&mut result, start_time);
300 Ok(result)
301 }
302
303 pub fn stats(&self) -> QueryStats {
305 self.stats.read().clone()
306 }
307
308 pub fn clear_caches(&self) {
310 self.prepared_cache.clear();
311 self.plan_cache.clear();
312 }
313
314 pub fn clear_prepared_cache(&self) {
316 self.prepared_cache.clear();
317 }
318
319 pub fn clear_plan_cache(&self) {
321 self.plan_cache.clear();
322 }
323
324 pub fn cache_stats(&self) -> CacheStats {
326 CacheStats {
327 prepared_cache_size: self.prepared_cache.len(),
328 plan_cache_size: self.plan_cache.len(),
329 prepared_cache_hits: self.prepared_cache.len() as u64,
330 plan_cache_hits: self.plan_cache.len() as u64,
331 }
332 }
333
334 pub async fn explain(&self, cql: &str) -> Result<ExplainResult> {
336 let parsed_query = self.parser.parse(cql)?;
338
339 let plan = self.planner.plan(&parsed_query).await?;
341
342 Ok(ExplainResult {
343 query_type: format!("{:?}", parsed_query.query_type),
344 plan_type: format!("{:?}", plan.plan_type),
345 estimated_cost: plan.estimated_cost,
346 estimated_rows: plan.estimated_rows,
347 selected_indexes: plan
348 .selected_indexes
349 .iter()
350 .map(|idx| format!("{} ({:?})", idx.index_name, idx.index_type))
351 .collect(),
352 execution_steps: plan
353 .steps
354 .iter()
355 .map(|step| {
356 format!(
357 "{:?}: {} (cost: {:.2})",
358 step.step_type,
359 step.columns.join(", "),
360 step.cost
361 )
362 })
363 .collect(),
364 parallelization_info: plan
365 .steps
366 .iter()
367 .filter(|step| step.parallelization.can_parallelize)
368 .map(|step| {
369 format!(
370 "Threads: {}, Partition: {:?}",
371 step.parallelization.suggested_threads, step.parallelization.partition_key
372 )
373 })
374 .collect(),
375 })
376 }
377
378 pub async fn analyze(&self, cql: &str) -> Result<AnalyzeResult> {
380 let start_time = Instant::now();
381
382 let mut execution_times = Vec::new();
384 let mut results = Vec::new();
385
386 for _ in 0..self.config.query.analyze_iterations.unwrap_or(5) {
387 let iter_start = Instant::now();
388 let result = self.execute(cql).await?;
389 execution_times.push(iter_start.elapsed());
390 results.push(result);
391 }
392
393 let total_time = start_time.elapsed();
394 let avg_time =
395 execution_times.iter().sum::<std::time::Duration>() / execution_times.len() as u32;
396 let no_times = || Error::query_execution("No execution times recorded for analysis");
397 let min_time = execution_times.iter().min().ok_or_else(no_times)?;
398 let max_time = execution_times.iter().max().ok_or_else(no_times)?;
399
400 let variance = execution_times
402 .iter()
403 .map(|time| {
404 let diff = time.as_nanos() as f64 - avg_time.as_nanos() as f64;
405 diff * diff
406 })
407 .sum::<f64>()
408 / execution_times.len() as f64;
409 let std_dev = variance.sqrt();
410
411 Ok(AnalyzeResult {
412 iterations: execution_times.len(),
413 total_time_ms: total_time.as_millis() as u64,
414 avg_time_ms: avg_time.as_millis() as u64,
415 min_time_ms: min_time.as_millis() as u64,
416 max_time_ms: max_time.as_millis() as u64,
417 std_dev_ms: (std_dev / 1_000_000.0) as u64, avg_rows_returned: results.iter().map(|r| r.rows.len()).sum::<usize>() / results.len(),
419 cache_hit_ratio: self.stats().cache_hit_ratio,
420 })
421 }
422
423 fn cache_query_plan(
425 &self,
426 cql: &str,
427 parsed_query: super::ParsedQuery,
428 plan: super::planner::QueryPlan,
429 ) {
430 let cache_size = self.config.query.query_cache_size.unwrap_or(0);
431 if cache_size == 0 {
432 return;
433 }
434
435 if self.plan_cache.len() >= cache_size {
436 let oldest_key = self
437 .plan_cache
438 .iter()
439 .min_by_key(|entry| entry.cached_at)
440 .map(|entry| entry.key().clone());
441 if let Some(key) = oldest_key {
442 self.plan_cache.remove(&key);
443 }
444 }
445
446 self.plan_cache.insert(
447 cql.to_string(),
448 QueryCacheEntry {
449 parsed_query,
450 plan,
451 cached_at: Instant::now(),
452 hit_count: 0,
453 },
454 );
455 }
456
457 pub async fn has_schema_for_table(&self, table: &str) -> bool {
459 self.schema_manager.get_table_schema(table).await.is_ok()
460 }
461
462 pub async fn schema_status(&self, table: &str) -> SchemaStatus {
464 match self.schema_manager.get_table_schema(table).await {
465 Ok(schema) => SchemaStatus::Available {
466 keyspace: schema.keyspace.clone(),
467 table: schema.table.clone(),
468 },
469 Err(Error::Schema(msg)) if msg.contains("not found") => {
470 SchemaStatus::Missing {
471 table: table.to_string(),
472 reason: msg,
473 }
474 }
475 Err(e) => SchemaStatus::ExtractionFailed {
476 table: table.to_string(),
477 cause: e.to_string(),
478 suggestion: "Verify SSTable files are valid Cassandra 5.0 format and Statistics.db contains SerializationHeader".to_string(),
479 },
480 }
481 }
482
483 fn update_execution_stats(&self, result: &mut QueryResult, start_time: Instant) {
485 let execution_time = start_time.elapsed();
486 result.execution_time_ms = if execution_time.is_zero() {
488 0
489 } else {
490 std::cmp::max(1, execution_time.as_millis() as u64)
491 };
492
493 let new_time_us = execution_time.as_micros() as u64;
494 let mut stats = self.stats.write();
495 stats.avg_execution_time_us = if stats.total_queries <= 1 {
496 new_time_us
497 } else {
498 ((stats.avg_execution_time_us * (stats.total_queries - 1)) + new_time_us)
499 / stats.total_queries
500 };
501 stats.rows_affected += result.rows_affected;
502 }
503}
504
505#[derive(Debug, Clone)]
507pub struct CacheStats {
508 pub prepared_cache_size: usize,
510 pub plan_cache_size: usize,
512 pub prepared_cache_hits: u64,
514 pub plan_cache_hits: u64,
516}
517
518#[derive(Debug, Clone)]
520pub struct ExplainResult {
521 pub query_type: String,
523 pub plan_type: String,
525 pub estimated_cost: f64,
527 pub estimated_rows: u64,
529 pub selected_indexes: Vec<String>,
531 pub execution_steps: Vec<String>,
533 pub parallelization_info: Vec<String>,
535}
536
537#[derive(Debug, Clone)]
539pub struct AnalyzeResult {
540 pub iterations: usize,
542 pub total_time_ms: u64,
544 pub avg_time_ms: u64,
546 pub min_time_ms: u64,
548 pub max_time_ms: u64,
550 pub std_dev_ms: u64,
552 pub avg_rows_returned: usize,
554 pub cache_hit_ratio: f64,
556}
557
558#[cfg(all(test, feature = "state_machine"))]
559mod tests {
560 use super::*;
561 use crate::Config;
562 use std::sync::Arc;
563 use tempfile::TempDir;
564
565 #[tokio::test]
566 async fn test_query_engine_creation() {
567 let temp_dir = TempDir::new().unwrap();
568 let config = Config::default();
569 let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
570
571 let storage = Arc::new(
572 crate::storage::StorageEngine::open(
573 temp_dir.path(),
574 &config,
575 platform,
576 #[cfg(feature = "state_machine")]
577 None,
578 )
579 .await
580 .unwrap(),
581 );
582 let schema = Arc::new(
583 crate::schema::SchemaManager::new(temp_dir.path())
584 .await
585 .unwrap(),
586 );
587 let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
588
589 let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
590
591 assert_eq!(query_engine.stats().total_queries, 0);
592 assert_eq!(query_engine.cache_stats().prepared_cache_size, 0);
593 assert_eq!(query_engine.cache_stats().plan_cache_size, 0);
594 }
595
596 #[tokio::test]
597 #[ignore = "Hangs >60s; needs investigation - gated for M1"]
598 async fn test_query_caching() {
599 let temp_dir = TempDir::new().unwrap();
600 let mut config = Config::test_config();
601 config.query.query_cache_size = Some(10);
602
603 let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
604 let storage = Arc::new(
605 crate::storage::StorageEngine::open(
606 temp_dir.path(),
607 &config,
608 platform,
609 #[cfg(feature = "state_machine")]
610 None,
611 )
612 .await
613 .unwrap(),
614 );
615 let schema = Arc::new(
616 crate::schema::SchemaManager::new(temp_dir.path())
617 .await
618 .unwrap(),
619 );
620 let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
621
622 let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
623
624 let cql = "SELECT * FROM users WHERE id = 1";
626 let _ = query_engine.execute(cql).await;
627 let _ = query_engine.execute(cql).await;
628
629 assert_eq!(query_engine.cache_stats().plan_cache_size, 1);
631
632 let stats = query_engine.stats();
634 assert!(stats.cache_hit_ratio > 0.0);
635 }
636
637 #[tokio::test]
638 #[cfg(feature = "state_machine")]
639 async fn test_prepared_statements() {
640 let temp_dir = TempDir::new().unwrap();
641 let config = Config::default();
642 let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
643
644 let storage = Arc::new(
645 crate::storage::StorageEngine::open(
646 temp_dir.path(),
647 &config,
648 platform,
649 #[cfg(feature = "state_machine")]
650 None,
651 )
652 .await
653 .unwrap(),
654 );
655 let schema = Arc::new(
656 crate::schema::SchemaManager::new(temp_dir.path())
657 .await
658 .unwrap(),
659 );
660 let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
661
662 let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
663
664 let cql = "SELECT * FROM users WHERE id = ?";
666 let prepared = query_engine.prepare(cql).await.unwrap();
667
668 let params = vec![Value::Integer(1)];
670 let result = query_engine
671 .execute_prepared(&prepared, ¶ms)
672 .await
673 .unwrap();
674
675 assert!(result.execution_time_ms > 0);
677
678 assert_eq!(query_engine.cache_stats().prepared_cache_size, 1);
680 }
681
682 #[tokio::test]
683 async fn test_query_explain() {
684 let temp_dir = TempDir::new().unwrap();
685 let config = Config::default();
686 let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
687
688 let storage = Arc::new(
689 crate::storage::StorageEngine::open(
690 temp_dir.path(),
691 &config,
692 platform,
693 #[cfg(feature = "state_machine")]
694 None,
695 )
696 .await
697 .unwrap(),
698 );
699 let schema = Arc::new(
700 crate::schema::SchemaManager::new(temp_dir.path())
701 .await
702 .unwrap(),
703 );
704 let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
705
706 let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
707
708 let cql = "SELECT * FROM users WHERE id = 1";
710 let explain_result = query_engine.explain(cql).await.unwrap();
711
712 assert_eq!(explain_result.query_type, "Select");
713 assert!(explain_result.estimated_cost > 0.0);
714 assert!(!explain_result.selected_indexes.is_empty());
715 assert!(!explain_result.execution_steps.is_empty());
716 }
717
718 #[tokio::test]
719 #[cfg(feature = "state_machine")]
720 async fn test_cache_eviction() {
721 let temp_dir = TempDir::new().unwrap();
722 let mut config = Config::default();
723 config.query.query_cache_size = Some(2); let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
726 let storage = Arc::new(
727 crate::storage::StorageEngine::open(
728 temp_dir.path(),
729 &config,
730 platform,
731 #[cfg(feature = "state_machine")]
732 None,
733 )
734 .await
735 .unwrap(),
736 );
737 let schema = Arc::new(
738 crate::schema::SchemaManager::new(temp_dir.path())
739 .await
740 .unwrap(),
741 );
742 let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
743
744 let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
745
746 let _ = query_engine
748 .execute("SELECT * FROM users WHERE id = 1")
749 .await;
750 let _ = query_engine
751 .execute("SELECT * FROM users WHERE id = 2")
752 .await;
753 let _ = query_engine
754 .execute("SELECT * FROM users WHERE id = 3")
755 .await;
756
757 assert_eq!(query_engine.cache_stats().plan_cache_size, 2);
759 }
760
761 #[tokio::test]
762 #[cfg(feature = "state_machine")]
763 async fn test_schema_validation_api() {
764 let temp_dir = TempDir::new().unwrap();
765 let config = Config::default();
766 let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
767
768 let storage = Arc::new(
769 crate::storage::StorageEngine::open(
770 temp_dir.path(),
771 &config,
772 platform,
773 #[cfg(feature = "state_machine")]
774 None,
775 )
776 .await
777 .unwrap(),
778 );
779 let schema = Arc::new(
780 crate::schema::SchemaManager::new(temp_dir.path())
781 .await
782 .unwrap(),
783 );
784 let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
785
786 let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
787
788 let has_schema = query_engine.has_schema_for_table("nonexistent_table").await;
790 assert!(!has_schema, "Should return false for non-existent table");
791
792 let status = query_engine.schema_status("nonexistent_table").await;
794 match status {
795 SchemaStatus::Missing { .. } | SchemaStatus::ExtractionFailed { .. } => {
796 }
798 SchemaStatus::Available { .. } => {
799 panic!("Should not be Available for non-existent table");
800 }
801 }
802 }
803}
804
805#[cfg(test)]
806#[cfg(feature = "experimental")]
807mod plan_cache_tests {
808 use super::*;
809 use crate::{
810 memory::MemoryManager, platform::Platform, schema::SchemaManager, storage::StorageEngine,
811 Config,
812 };
813 use std::sync::Arc;
814 use tempfile::TempDir;
815
816 async fn setup_query_engine(config: &Config) -> (QueryEngine, TempDir) {
817 let temp_dir = TempDir::new().unwrap();
818 let platform = Arc::new(Platform::new(config).await.unwrap());
819 let storage = Arc::new(
820 StorageEngine::open(
821 temp_dir.path(),
822 config,
823 platform,
824 #[cfg(feature = "state_machine")]
825 None,
826 )
827 .await
828 .unwrap(),
829 );
830 let schema = Arc::new(SchemaManager::new(temp_dir.path()).await.unwrap());
831 let memory = Arc::new(MemoryManager::new(config).unwrap());
832
833 let engine = QueryEngine::new(storage, schema, memory, config).unwrap();
834 (engine, temp_dir)
835 }
836
837 async fn create_sample_table(engine: &QueryEngine) {
838 engine
839 .execute(
840 "CREATE TABLE plan_cache_test (
841 id INTEGER PRIMARY KEY,
842 value TEXT
843 )",
844 )
845 .await
846 .unwrap();
847
848 engine
849 .execute("INSERT INTO plan_cache_test (id, value) VALUES (1, 'one')")
850 .await
851 .unwrap();
852 engine
853 .execute("INSERT INTO plan_cache_test (id, value) VALUES (2, 'two')")
854 .await
855 .unwrap();
856 engine
857 .execute("INSERT INTO plan_cache_test (id, value) VALUES (3, 'three')")
858 .await
859 .unwrap();
860 }
861
862 #[tokio::test]
863 async fn test_plan_cache_disabled() {
864 let mut config = Config::default();
865 config.query.query_cache_size = Some(0);
866
867 let (engine, _temp_dir) = setup_query_engine(&config).await;
868 create_sample_table(&engine).await;
869
870 engine
871 .execute("SELECT * FROM plan_cache_test WHERE id = 1")
872 .await
873 .unwrap();
874
875 assert_eq!(engine.cache_stats().plan_cache_size, 0);
876 }
877
878 #[tokio::test]
879 async fn test_plan_cache_reuse_point_lookup() {
880 let mut config = Config::default();
881 config.query.query_cache_size = Some(4);
882
883 let (engine, _temp_dir) = setup_query_engine(&config).await;
884 create_sample_table(&engine).await;
885
886 engine.clear_plan_cache();
887
888 engine
889 .execute("SELECT * FROM plan_cache_test WHERE id = 1")
890 .await
891 .unwrap();
892 engine
893 .execute("SELECT * FROM plan_cache_test WHERE id = 1")
894 .await
895 .unwrap();
896
897 assert_eq!(engine.cache_stats().plan_cache_size, 1);
898 assert!(engine.stats().cache_hit_ratio > 0.0);
899 }
900
901 #[tokio::test]
902 async fn test_plan_cache_eviction_limit() {
903 let mut config = Config::default();
904 config.query.query_cache_size = Some(2);
905
906 let (engine, _temp_dir) = setup_query_engine(&config).await;
907 create_sample_table(&engine).await;
908
909 engine.clear_plan_cache();
910
911 for id in 1..=3 {
912 engine
913 .execute(&format!("SELECT * FROM plan_cache_test WHERE id = {}", id))
914 .await
915 .unwrap();
916 }
917
918 assert_eq!(engine.cache_stats().plan_cache_size, 2);
919 }
920}