Skip to main content

cqlite_core/query/
engine.rs

1//! Query engine implementation for CQLite
2//!
3//! This module provides the main query engine that coordinates between
4//! parsing, planning, and execution of CQL queries.
5
6// CQL (Cassandra Query Language) Reference:
7// https://cassandra.apache.org/doc/latest/cassandra/developing/cql/cql_singlefile.html
8//
9// This implements CQL v3.4.3+ for Apache Cassandra 5.0+
10// CQL is NOT SQL - it's a query language specifically designed for Cassandra's distributed architecture.
11
12use super::{
13    executor::{QueryExecutor, QueryResult},
14    parser::QueryParser,
15    planner::QueryPlanner,
16    prepared::PreparedQuery,
17    result::{QueryResultIterator, StreamingConfig},
18    QueryStats,
19};
20
21#[cfg(feature = "state_machine")]
22use super::{select_executor::SelectExecutor, select_optimizer::SelectOptimizer, select_parser};
23use crate::{
24    memory::MemoryManager, schema::SchemaManager, storage::StorageEngine, Config, Error, Result,
25    Value,
26};
27use dashmap::DashMap;
28use std::sync::Arc;
29use std::time::Instant;
30
31/// Query cache entry
32#[derive(Debug, Clone)]
33pub struct QueryCacheEntry {
34    /// Parsed query
35    pub parsed_query: super::ParsedQuery,
36    /// Query plan
37    pub plan: super::planner::QueryPlan,
38    /// Cache timestamp
39    pub cached_at: Instant,
40    /// Hit count
41    pub hit_count: u64,
42}
43
44/// Schema availability status for diagnostic purposes
45#[derive(Debug, Clone)]
46pub enum SchemaStatus {
47    /// Schema is available and ready for queries
48    Available { keyspace: String, table: String },
49    /// Schema not found in registry
50    Missing { table: String, reason: String },
51    /// Schema extraction failed from SSTable
52    ExtractionFailed {
53        table: String,
54        cause: String,
55        suggestion: String,
56    },
57}
58
59/// Query engine with caching and statistics
60#[derive(Debug)]
61pub struct QueryEngine {
62    /// Query parser
63    parser: QueryParser,
64    /// Query planner
65    planner: QueryPlanner,
66    /// Query executor
67    executor: QueryExecutor,
68    /// Schema manager reference
69    schema_manager: Arc<SchemaManager>,
70    /// Advanced SELECT optimizer
71    #[cfg(feature = "state_machine")]
72    select_optimizer: SelectOptimizer,
73    /// Advanced SELECT executor
74    #[cfg(feature = "state_machine")]
75    select_executor: SelectExecutor,
76    /// Prepared statement cache
77    prepared_cache: DashMap<String, Arc<PreparedQuery>>,
78    /// Query plan cache
79    plan_cache: DashMap<String, QueryCacheEntry>,
80    /// Query statistics
81    stats: Arc<parking_lot::RwLock<QueryStats>>,
82    /// Configuration
83    config: Config,
84}
85
86impl QueryEngine {
87    /// Create a new query engine
88    pub fn new(
89        storage: Arc<StorageEngine>,
90        schema: Arc<SchemaManager>,
91        _memory: Arc<MemoryManager>,
92        config: &Config,
93    ) -> Result<Self> {
94        let parser = QueryParser::new(config);
95        let planner = QueryPlanner::new(schema.clone(), config);
96        let executor = QueryExecutor::new(storage.clone(), schema.clone(), config);
97
98        // Initialize advanced SELECT components
99        #[cfg(feature = "state_machine")]
100        let select_optimizer = SelectOptimizer::new(schema.clone(), storage.clone());
101        #[cfg(feature = "state_machine")]
102        let select_executor = SelectExecutor::new(schema.clone(), storage);
103
104        Ok(Self {
105            parser,
106            planner,
107            executor,
108            schema_manager: schema,
109            #[cfg(feature = "state_machine")]
110            select_optimizer,
111            #[cfg(feature = "state_machine")]
112            select_executor,
113            prepared_cache: DashMap::new(),
114            plan_cache: DashMap::new(),
115            stats: Arc::new(parking_lot::RwLock::new(QueryStats::default())),
116            config: config.clone(),
117        })
118    }
119
120    /// Increment the total queries counter
121    fn inc_total_queries(&self) {
122        self.stats.write().total_queries += 1;
123    }
124
125    /// Increment the error queries counter
126    fn inc_error_queries(&self) {
127        self.stats.write().error_queries += 1;
128    }
129
130    /// Update cache hit ratio after a cache hit
131    fn record_cache_hit(&self) {
132        let mut stats = self.stats.write();
133        let total = stats.total_queries as f64;
134        // Running mean: previous ratio weighted by (total - 1) hits + 1 hit / total
135        stats.cache_hit_ratio = (stats.cache_hit_ratio * (total - 1.0) + 1.0) / total;
136    }
137
138    /// Execute a CQL query
139    pub async fn execute(&self, cql: &str) -> Result<QueryResult> {
140        let start_time = Instant::now();
141        self.inc_total_queries();
142
143        // Route SELECT statements through the advanced parser, except simple
144        // `WHERE id = <value>` point lookups which must share the normal
145        // executor's key-handling path so INSERT and SELECT agree on keys.
146        let trimmed_cql = cql.trim().to_uppercase();
147        let is_simple_id_lookup = cql.contains("WHERE id =") && cql.split_whitespace().count() <= 8;
148        if trimmed_cql.starts_with("SELECT") && !is_simple_id_lookup {
149            return self.execute_select_query(cql, start_time).await;
150        }
151        #[cfg(debug_assertions)]
152        if trimmed_cql.starts_with("SELECT") && is_simple_id_lookup {
153            log::debug!(
154                "Routing simple SELECT through normal executor for consistent key handling"
155            );
156        }
157
158        // Check plan cache first for non-SELECT queries
159        if let Some(mut cached_entry) = self.plan_cache.get_mut(cql) {
160            self.record_cache_hit();
161            cached_entry.hit_count += 1;
162
163            let mut result = self.executor.execute(&cached_entry.plan).await?;
164            self.update_execution_stats(&mut result, start_time);
165            return Ok(result);
166        }
167
168        let parsed_query = self
169            .parser
170            .parse(cql)
171            .inspect_err(|_| self.inc_error_queries())?;
172        let plan = self.planner.plan(&parsed_query).await?;
173
174        if self.config.query.query_cache_size.unwrap_or(0) > 0 {
175            self.cache_query_plan(cql, parsed_query, plan.clone());
176        }
177
178        let mut result = self.executor.execute(&plan).await?;
179        self.update_execution_stats(&mut result, start_time);
180        Ok(result)
181    }
182
183    /// Execute a CQL query with streaming results (Issue #280)
184    ///
185    /// Returns a `QueryResultIterator` that yields rows incrementally via a bounded
186    /// channel, enabling memory-efficient processing of large result sets.
187    ///
188    /// # Arguments
189    ///
190    /// * `cql` - The CQL query string to execute (must be a SELECT statement)
191    /// * `config` - Streaming configuration (buffer size, chunk hints)
192    ///
193    /// # Errors
194    ///
195    /// Returns an error if:
196    /// - Query is not a SELECT statement
197    /// - SQL syntax is invalid
198    /// - Query execution fails
199    ///
200    /// # Memory Budget
201    ///
202    /// The streaming approach stays within the 128MB target by using bounded channels
203    /// and processing rows incrementally rather than materializing all results.
204    #[cfg(feature = "state_machine")]
205    pub async fn execute_streaming(
206        &self,
207        cql: &str,
208        config: StreamingConfig,
209    ) -> Result<QueryResultIterator> {
210        self.inc_total_queries();
211
212        if !cql.trim().to_uppercase().starts_with("SELECT") {
213            return Err(Error::query_execution(
214                "Streaming execution only supports SELECT queries",
215            ));
216        }
217
218        let select_statement =
219            select_parser::parse_select(cql).inspect_err(|_| self.inc_error_queries())?;
220        let optimized_plan = self.select_optimizer.optimize(select_statement).await?;
221
222        self.select_executor
223            .execute_streaming(optimized_plan, config)
224            .await
225    }
226
227    /// Execute a SELECT query using the advanced parser and optimizer
228    async fn execute_select_query(&self, cql: &str, start_time: Instant) -> Result<QueryResult> {
229        // Check plan cache first for SELECT queries too
230        if let Some(mut cached_entry) = self.plan_cache.get_mut(cql) {
231            if cached_entry.plan.table.is_some() {
232                self.record_cache_hit();
233                cached_entry.hit_count += 1;
234
235                let mut result = self.executor.execute(&cached_entry.plan).await?;
236                self.update_execution_stats(&mut result, start_time);
237                return Ok(result);
238            }
239
240            // Placeholder plans without table information are not reusable; drop them.
241            drop(cached_entry);
242            self.plan_cache.remove(cql);
243        }
244
245        #[cfg(not(feature = "state_machine"))]
246        return Err(Error::query_execution(
247            "Advanced SELECT parsing requires state_machine feature",
248        ));
249
250        #[cfg(feature = "state_machine")]
251        {
252            let select_statement =
253                select_parser::parse_select(cql).inspect_err(|_| self.inc_error_queries())?;
254            let optimized_plan = self.select_optimizer.optimize(select_statement).await?;
255            let mut result = self.select_executor.execute(optimized_plan).await?;
256            self.update_execution_stats(&mut result, start_time);
257            Ok(result)
258        }
259    }
260
261    /// Execute a query with parameters
262    pub async fn execute_with_params(&self, cql: &str, _params: &[Value]) -> Result<QueryResult> {
263        // In a real implementation, this would substitute parameters into the query
264        // For now, we'll just execute the query as-is
265        self.execute(cql).await
266    }
267
268    /// Prepare a query for repeated execution
269    pub async fn prepare(&self, cql: &str) -> Result<Arc<PreparedQuery>> {
270        if let Some(cached) = self.prepared_cache.get(cql) {
271            return Ok(cached.clone());
272        }
273
274        let parsed_query = self.parser.parse(cql)?;
275        let plan = self.planner.plan(&parsed_query).await?;
276
277        let prepared = Arc::new(PreparedQuery::new(
278            parsed_query,
279            plan,
280            Arc::new(self.executor.clone()),
281        ));
282
283        self.prepared_cache
284            .insert(cql.to_string(), prepared.clone());
285
286        Ok(prepared)
287    }
288
289    /// Execute a prepared query
290    pub async fn execute_prepared(
291        &self,
292        prepared: &PreparedQuery,
293        params: &[Value],
294    ) -> Result<QueryResult> {
295        let start_time = Instant::now();
296        self.inc_total_queries();
297
298        let mut result = prepared.execute(params).await?;
299        self.update_execution_stats(&mut result, start_time);
300        Ok(result)
301    }
302
303    /// Get query statistics
304    pub fn stats(&self) -> QueryStats {
305        self.stats.read().clone()
306    }
307
308    /// Clear all caches
309    pub fn clear_caches(&self) {
310        self.prepared_cache.clear();
311        self.plan_cache.clear();
312    }
313
314    /// Clear prepared statement cache
315    pub fn clear_prepared_cache(&self) {
316        self.prepared_cache.clear();
317    }
318
319    /// Clear query plan cache
320    pub fn clear_plan_cache(&self) {
321        self.plan_cache.clear();
322    }
323
324    /// Get cache statistics
325    pub fn cache_stats(&self) -> CacheStats {
326        CacheStats {
327            prepared_cache_size: self.prepared_cache.len(),
328            plan_cache_size: self.plan_cache.len(),
329            prepared_cache_hits: self.prepared_cache.len() as u64,
330            plan_cache_hits: self.plan_cache.len() as u64,
331        }
332    }
333
334    /// Optimize a query (return execution plan without executing)
335    pub async fn explain(&self, cql: &str) -> Result<ExplainResult> {
336        // Parse the query
337        let parsed_query = self.parser.parse(cql)?;
338
339        // Plan the query
340        let plan = self.planner.plan(&parsed_query).await?;
341
342        Ok(ExplainResult {
343            query_type: format!("{:?}", parsed_query.query_type),
344            plan_type: format!("{:?}", plan.plan_type),
345            estimated_cost: plan.estimated_cost,
346            estimated_rows: plan.estimated_rows,
347            selected_indexes: plan
348                .selected_indexes
349                .iter()
350                .map(|idx| format!("{} ({:?})", idx.index_name, idx.index_type))
351                .collect(),
352            execution_steps: plan
353                .steps
354                .iter()
355                .map(|step| {
356                    format!(
357                        "{:?}: {} (cost: {:.2})",
358                        step.step_type,
359                        step.columns.join(", "),
360                        step.cost
361                    )
362                })
363                .collect(),
364            parallelization_info: plan
365                .steps
366                .iter()
367                .filter(|step| step.parallelization.can_parallelize)
368                .map(|step| {
369                    format!(
370                        "Threads: {}, Partition: {:?}",
371                        step.parallelization.suggested_threads, step.parallelization.partition_key
372                    )
373                })
374                .collect(),
375        })
376    }
377
378    /// Analyze query performance
379    pub async fn analyze(&self, cql: &str) -> Result<AnalyzeResult> {
380        let start_time = Instant::now();
381
382        // Execute the query multiple times to get average performance
383        let mut execution_times = Vec::new();
384        let mut results = Vec::new();
385
386        for _ in 0..self.config.query.analyze_iterations.unwrap_or(5) {
387            let iter_start = Instant::now();
388            let result = self.execute(cql).await?;
389            execution_times.push(iter_start.elapsed());
390            results.push(result);
391        }
392
393        let total_time = start_time.elapsed();
394        let avg_time =
395            execution_times.iter().sum::<std::time::Duration>() / execution_times.len() as u32;
396        let no_times = || Error::query_execution("No execution times recorded for analysis");
397        let min_time = execution_times.iter().min().ok_or_else(no_times)?;
398        let max_time = execution_times.iter().max().ok_or_else(no_times)?;
399
400        // Calculate standard deviation
401        let variance = execution_times
402            .iter()
403            .map(|time| {
404                let diff = time.as_nanos() as f64 - avg_time.as_nanos() as f64;
405                diff * diff
406            })
407            .sum::<f64>()
408            / execution_times.len() as f64;
409        let std_dev = variance.sqrt();
410
411        Ok(AnalyzeResult {
412            iterations: execution_times.len(),
413            total_time_ms: total_time.as_millis() as u64,
414            avg_time_ms: avg_time.as_millis() as u64,
415            min_time_ms: min_time.as_millis() as u64,
416            max_time_ms: max_time.as_millis() as u64,
417            std_dev_ms: (std_dev / 1_000_000.0) as u64, // Convert from nanoseconds to milliseconds
418            avg_rows_returned: results.iter().map(|r| r.rows.len()).sum::<usize>() / results.len(),
419            cache_hit_ratio: self.stats().cache_hit_ratio,
420        })
421    }
422
423    /// Cache a query plan, evicting the oldest entry first if at capacity (simple LRU).
424    fn cache_query_plan(
425        &self,
426        cql: &str,
427        parsed_query: super::ParsedQuery,
428        plan: super::planner::QueryPlan,
429    ) {
430        let cache_size = self.config.query.query_cache_size.unwrap_or(0);
431        if cache_size == 0 {
432            return;
433        }
434
435        if self.plan_cache.len() >= cache_size {
436            let oldest_key = self
437                .plan_cache
438                .iter()
439                .min_by_key(|entry| entry.cached_at)
440                .map(|entry| entry.key().clone());
441            if let Some(key) = oldest_key {
442                self.plan_cache.remove(&key);
443            }
444        }
445
446        self.plan_cache.insert(
447            cql.to_string(),
448            QueryCacheEntry {
449                parsed_query,
450                plan,
451                cached_at: Instant::now(),
452                hit_count: 0,
453            },
454        );
455    }
456
457    /// Check if schema is available for a table
458    pub async fn has_schema_for_table(&self, table: &str) -> bool {
459        self.schema_manager.get_table_schema(table).await.is_ok()
460    }
461
462    /// Get detailed schema status for debugging
463    pub async fn schema_status(&self, table: &str) -> SchemaStatus {
464        match self.schema_manager.get_table_schema(table).await {
465            Ok(schema) => SchemaStatus::Available {
466                keyspace: schema.keyspace.clone(),
467                table: schema.table.clone(),
468            },
469            Err(Error::Schema(msg)) if msg.contains("not found") => {
470                SchemaStatus::Missing {
471                    table: table.to_string(),
472                    reason: msg,
473                }
474            }
475            Err(e) => SchemaStatus::ExtractionFailed {
476                table: table.to_string(),
477                cause: e.to_string(),
478                suggestion: "Verify SSTable files are valid Cassandra 5.0 format and Statistics.db contains SerializationHeader".to_string(),
479            },
480        }
481    }
482
483    /// Update execution statistics
484    fn update_execution_stats(&self, result: &mut QueryResult, start_time: Instant) {
485        let execution_time = start_time.elapsed();
486        // Ensure any non-zero execution time is at least 1ms for reporting
487        result.execution_time_ms = if execution_time.is_zero() {
488            0
489        } else {
490            std::cmp::max(1, execution_time.as_millis() as u64)
491        };
492
493        let new_time_us = execution_time.as_micros() as u64;
494        let mut stats = self.stats.write();
495        stats.avg_execution_time_us = if stats.total_queries <= 1 {
496            new_time_us
497        } else {
498            ((stats.avg_execution_time_us * (stats.total_queries - 1)) + new_time_us)
499                / stats.total_queries
500        };
501        stats.rows_affected += result.rows_affected;
502    }
503}
504
505/// Cache statistics
506#[derive(Debug, Clone)]
507pub struct CacheStats {
508    /// Number of prepared statements cached
509    pub prepared_cache_size: usize,
510    /// Number of query plans cached
511    pub plan_cache_size: usize,
512    /// Total prepared cache hits
513    pub prepared_cache_hits: u64,
514    /// Total plan cache hits
515    pub plan_cache_hits: u64,
516}
517
518/// Query explanation result
519#[derive(Debug, Clone)]
520pub struct ExplainResult {
521    /// Query type
522    pub query_type: String,
523    /// Plan type
524    pub plan_type: String,
525    /// Estimated cost
526    pub estimated_cost: f64,
527    /// Estimated rows
528    pub estimated_rows: u64,
529    /// Selected indexes
530    pub selected_indexes: Vec<String>,
531    /// Execution steps
532    pub execution_steps: Vec<String>,
533    /// Parallelization information
534    pub parallelization_info: Vec<String>,
535}
536
537/// Query analysis result
538#[derive(Debug, Clone)]
539pub struct AnalyzeResult {
540    /// Number of iterations
541    pub iterations: usize,
542    /// Total analysis time
543    pub total_time_ms: u64,
544    /// Average execution time
545    pub avg_time_ms: u64,
546    /// Minimum execution time
547    pub min_time_ms: u64,
548    /// Maximum execution time
549    pub max_time_ms: u64,
550    /// Standard deviation of execution times
551    pub std_dev_ms: u64,
552    /// Average rows returned
553    pub avg_rows_returned: usize,
554    /// Cache hit ratio
555    pub cache_hit_ratio: f64,
556}
557
558#[cfg(all(test, feature = "state_machine"))]
559mod tests {
560    use super::*;
561    use crate::Config;
562    use std::sync::Arc;
563    use tempfile::TempDir;
564
565    #[tokio::test]
566    async fn test_query_engine_creation() {
567        let temp_dir = TempDir::new().unwrap();
568        let config = Config::default();
569        let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
570
571        let storage = Arc::new(
572            crate::storage::StorageEngine::open(
573                temp_dir.path(),
574                &config,
575                platform,
576                #[cfg(feature = "state_machine")]
577                None,
578            )
579            .await
580            .unwrap(),
581        );
582        let schema = Arc::new(
583            crate::schema::SchemaManager::new(temp_dir.path())
584                .await
585                .unwrap(),
586        );
587        let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
588
589        let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
590
591        assert_eq!(query_engine.stats().total_queries, 0);
592        assert_eq!(query_engine.cache_stats().prepared_cache_size, 0);
593        assert_eq!(query_engine.cache_stats().plan_cache_size, 0);
594    }
595
596    #[tokio::test]
597    #[ignore = "Hangs >60s; needs investigation - gated for M1"]
598    async fn test_query_caching() {
599        let temp_dir = TempDir::new().unwrap();
600        let mut config = Config::test_config();
601        config.query.query_cache_size = Some(10);
602
603        let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
604        let storage = Arc::new(
605            crate::storage::StorageEngine::open(
606                temp_dir.path(),
607                &config,
608                platform,
609                #[cfg(feature = "state_machine")]
610                None,
611            )
612            .await
613            .unwrap(),
614        );
615        let schema = Arc::new(
616            crate::schema::SchemaManager::new(temp_dir.path())
617                .await
618                .unwrap(),
619        );
620        let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
621
622        let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
623
624        // Execute a query twice
625        let cql = "SELECT * FROM users WHERE id = 1";
626        let _ = query_engine.execute(cql).await;
627        let _ = query_engine.execute(cql).await;
628
629        // Check that plan was cached
630        assert_eq!(query_engine.cache_stats().plan_cache_size, 1);
631
632        // Check cache hit ratio
633        let stats = query_engine.stats();
634        assert!(stats.cache_hit_ratio > 0.0);
635    }
636
637    #[tokio::test]
638    #[cfg(feature = "state_machine")]
639    async fn test_prepared_statements() {
640        let temp_dir = TempDir::new().unwrap();
641        let config = Config::default();
642        let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
643
644        let storage = Arc::new(
645            crate::storage::StorageEngine::open(
646                temp_dir.path(),
647                &config,
648                platform,
649                #[cfg(feature = "state_machine")]
650                None,
651            )
652            .await
653            .unwrap(),
654        );
655        let schema = Arc::new(
656            crate::schema::SchemaManager::new(temp_dir.path())
657                .await
658                .unwrap(),
659        );
660        let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
661
662        let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
663
664        // Prepare a statement
665        let cql = "SELECT * FROM users WHERE id = ?";
666        let prepared = query_engine.prepare(cql).await.unwrap();
667
668        // Execute it with parameters
669        let params = vec![Value::Integer(1)];
670        let result = query_engine
671            .execute_prepared(&prepared, &params)
672            .await
673            .unwrap();
674
675        // Check that result was generated
676        assert!(result.execution_time_ms > 0);
677
678        // Check that statement was cached
679        assert_eq!(query_engine.cache_stats().prepared_cache_size, 1);
680    }
681
682    #[tokio::test]
683    async fn test_query_explain() {
684        let temp_dir = TempDir::new().unwrap();
685        let config = Config::default();
686        let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
687
688        let storage = Arc::new(
689            crate::storage::StorageEngine::open(
690                temp_dir.path(),
691                &config,
692                platform,
693                #[cfg(feature = "state_machine")]
694                None,
695            )
696            .await
697            .unwrap(),
698        );
699        let schema = Arc::new(
700            crate::schema::SchemaManager::new(temp_dir.path())
701                .await
702                .unwrap(),
703        );
704        let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
705
706        let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
707
708        // Explain a query
709        let cql = "SELECT * FROM users WHERE id = 1";
710        let explain_result = query_engine.explain(cql).await.unwrap();
711
712        assert_eq!(explain_result.query_type, "Select");
713        assert!(explain_result.estimated_cost > 0.0);
714        assert!(!explain_result.selected_indexes.is_empty());
715        assert!(!explain_result.execution_steps.is_empty());
716    }
717
718    #[tokio::test]
719    #[cfg(feature = "state_machine")]
720    async fn test_cache_eviction() {
721        let temp_dir = TempDir::new().unwrap();
722        let mut config = Config::default();
723        config.query.query_cache_size = Some(2); // Very small cache
724
725        let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
726        let storage = Arc::new(
727            crate::storage::StorageEngine::open(
728                temp_dir.path(),
729                &config,
730                platform,
731                #[cfg(feature = "state_machine")]
732                None,
733            )
734            .await
735            .unwrap(),
736        );
737        let schema = Arc::new(
738            crate::schema::SchemaManager::new(temp_dir.path())
739                .await
740                .unwrap(),
741        );
742        let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
743
744        let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
745
746        // Execute 3 different queries
747        let _ = query_engine
748            .execute("SELECT * FROM users WHERE id = 1")
749            .await;
750        let _ = query_engine
751            .execute("SELECT * FROM users WHERE id = 2")
752            .await;
753        let _ = query_engine
754            .execute("SELECT * FROM users WHERE id = 3")
755            .await;
756
757        // Cache should only have 2 entries due to eviction
758        assert_eq!(query_engine.cache_stats().plan_cache_size, 2);
759    }
760
761    #[tokio::test]
762    #[cfg(feature = "state_machine")]
763    async fn test_schema_validation_api() {
764        let temp_dir = TempDir::new().unwrap();
765        let config = Config::default();
766        let platform = Arc::new(crate::platform::Platform::new(&config).await.unwrap());
767
768        let storage = Arc::new(
769            crate::storage::StorageEngine::open(
770                temp_dir.path(),
771                &config,
772                platform,
773                #[cfg(feature = "state_machine")]
774                None,
775            )
776            .await
777            .unwrap(),
778        );
779        let schema = Arc::new(
780            crate::schema::SchemaManager::new(temp_dir.path())
781                .await
782                .unwrap(),
783        );
784        let memory = Arc::new(crate::memory::MemoryManager::new(&config).unwrap());
785
786        let query_engine = QueryEngine::new(storage, schema, memory, &config).unwrap();
787
788        // Test has_schema_for_table with non-existent table
789        let has_schema = query_engine.has_schema_for_table("nonexistent_table").await;
790        assert!(!has_schema, "Should return false for non-existent table");
791
792        // Test schema_status with non-existent table
793        let status = query_engine.schema_status("nonexistent_table").await;
794        match status {
795            SchemaStatus::Missing { .. } | SchemaStatus::ExtractionFailed { .. } => {
796                // Expected - either missing or extraction failed is correct
797            }
798            SchemaStatus::Available { .. } => {
799                panic!("Should not be Available for non-existent table");
800            }
801        }
802    }
803}
804
805#[cfg(test)]
806#[cfg(feature = "experimental")]
807mod plan_cache_tests {
808    use super::*;
809    use crate::{
810        memory::MemoryManager, platform::Platform, schema::SchemaManager, storage::StorageEngine,
811        Config,
812    };
813    use std::sync::Arc;
814    use tempfile::TempDir;
815
816    async fn setup_query_engine(config: &Config) -> (QueryEngine, TempDir) {
817        let temp_dir = TempDir::new().unwrap();
818        let platform = Arc::new(Platform::new(config).await.unwrap());
819        let storage = Arc::new(
820            StorageEngine::open(
821                temp_dir.path(),
822                config,
823                platform,
824                #[cfg(feature = "state_machine")]
825                None,
826            )
827            .await
828            .unwrap(),
829        );
830        let schema = Arc::new(SchemaManager::new(temp_dir.path()).await.unwrap());
831        let memory = Arc::new(MemoryManager::new(config).unwrap());
832
833        let engine = QueryEngine::new(storage, schema, memory, config).unwrap();
834        (engine, temp_dir)
835    }
836
837    async fn create_sample_table(engine: &QueryEngine) {
838        engine
839            .execute(
840                "CREATE TABLE plan_cache_test (
841                    id INTEGER PRIMARY KEY,
842                    value TEXT
843                )",
844            )
845            .await
846            .unwrap();
847
848        engine
849            .execute("INSERT INTO plan_cache_test (id, value) VALUES (1, 'one')")
850            .await
851            .unwrap();
852        engine
853            .execute("INSERT INTO plan_cache_test (id, value) VALUES (2, 'two')")
854            .await
855            .unwrap();
856        engine
857            .execute("INSERT INTO plan_cache_test (id, value) VALUES (3, 'three')")
858            .await
859            .unwrap();
860    }
861
862    #[tokio::test]
863    async fn test_plan_cache_disabled() {
864        let mut config = Config::default();
865        config.query.query_cache_size = Some(0);
866
867        let (engine, _temp_dir) = setup_query_engine(&config).await;
868        create_sample_table(&engine).await;
869
870        engine
871            .execute("SELECT * FROM plan_cache_test WHERE id = 1")
872            .await
873            .unwrap();
874
875        assert_eq!(engine.cache_stats().plan_cache_size, 0);
876    }
877
878    #[tokio::test]
879    async fn test_plan_cache_reuse_point_lookup() {
880        let mut config = Config::default();
881        config.query.query_cache_size = Some(4);
882
883        let (engine, _temp_dir) = setup_query_engine(&config).await;
884        create_sample_table(&engine).await;
885
886        engine.clear_plan_cache();
887
888        engine
889            .execute("SELECT * FROM plan_cache_test WHERE id = 1")
890            .await
891            .unwrap();
892        engine
893            .execute("SELECT * FROM plan_cache_test WHERE id = 1")
894            .await
895            .unwrap();
896
897        assert_eq!(engine.cache_stats().plan_cache_size, 1);
898        assert!(engine.stats().cache_hit_ratio > 0.0);
899    }
900
901    #[tokio::test]
902    async fn test_plan_cache_eviction_limit() {
903        let mut config = Config::default();
904        config.query.query_cache_size = Some(2);
905
906        let (engine, _temp_dir) = setup_query_engine(&config).await;
907        create_sample_table(&engine).await;
908
909        engine.clear_plan_cache();
910
911        for id in 1..=3 {
912            engine
913                .execute(&format!("SELECT * FROM plan_cache_test WHERE id = {}", id))
914                .await
915                .unwrap();
916        }
917
918        assert_eq!(engine.cache_stats().plan_cache_size, 2);
919    }
920}