Skip to main content

cqlite_core/query/
planner.rs

//! Query planner for CQLite
//!
//! This module provides query planning and optimization capabilities for CQL queries.
//! It includes:
//!
//! - Query plan generation and optimization
//! - Index selection and utilization
//! - Cost-based optimization
//! - Execution plan representation
10
// CQL (Cassandra Query Language) Reference:
// https://cassandra.apache.org/doc/latest/cassandra/developing/cql/cql_singlefile.html
//
// This implements CQL v3.4.3+ for Apache Cassandra 5.0+
// CQL is NOT SQL - it's a query language specifically designed for Cassandra's distributed architecture.
16
17use super::{ComparisonOperator, Condition, ParsedQuery, QueryType, WhereClause};
18use crate::{schema::SchemaManager, Config, Error, Result, TableId};
19use std::sync::Arc;
20
// --- Cost-model tuning constants ---------------------------------------------
// The `*_COST_FACTOR` values below are dimensionless multipliers applied to
// base costs from `CostModel`; the `SELECTIVITY_*` values are assumed
// fractions of rows that satisfy a predicate. Names are kept descriptive so
// changes flow through to every caller via a single source of truth.

/// Default thread count when `Config.query.query_parallelism` is unset.
const DEFAULT_PARALLELISM: usize = 4;

/// Row count above which a scan is worth parallelizing.
const PARALLELIZATION_ROW_THRESHOLD: u64 = 10_000;

/// Filter is roughly an order of magnitude cheaper than a scan over the same rows.
const FILTER_COST_FACTOR: f64 = 0.1;

/// UPDATE write step is approximately half the cost of a full row scan.
const UPDATE_WRITE_COST_FACTOR: f64 = 0.5;

/// Projection is essentially a column-pick — three orders of magnitude cheaper.
const PROJECT_COST_FACTOR: f64 = 0.001;

/// Primary-key lookups visit a small fraction of rows compared to a scan.
const PRIMARY_INDEX_COST_FACTOR: f64 = 0.1;

/// Bloom filters short-circuit most reads.
const BLOOM_INDEX_COST_FACTOR: f64 = 0.01;

/// Composite indexes inherit selectivity but get an additional discount.
const COMPOSITE_INDEX_COST_FACTOR: f64 = 0.5;

/// Default selectivity assumed for `bloom_<col>` index entries.
const BLOOM_INDEX_SELECTIVITY: f64 = 0.1;

// Selectivity defaults, keyed by `ComparisonOperator` semantics.

/// Assumed selectivity of an equality (`=`) predicate.
const SELECTIVITY_EQUAL: f64 = 0.1;
/// Assumed selectivity of an inequality (`!=`) predicate.
const SELECTIVITY_NOT_EQUAL: f64 = 0.9;
/// Assumed selectivity of a range predicate (`<`, `<=`, `>`, `>=`).
const SELECTIVITY_RANGE: f64 = 0.3;
/// Assumed selectivity of an `IN` predicate.
const SELECTIVITY_IN: f64 = 0.2;
/// Assumed selectivity of a `NOT IN` predicate.
const SELECTIVITY_NOT_IN: f64 = 0.8;
/// Assumed selectivity of a `LIKE` / `NOT LIKE` predicate.
const SELECTIVITY_LIKE: f64 = 0.5;

/// Trivial fixed cost for DDL plans (CREATE/DROP TABLE/INDEX).
const DDL_FIXED_COST: f64 = 1.0;

/// Trivial fixed cost for metadata-only plans (DESCRIBE/USE).
const METADATA_FIXED_COST: f64 = 0.1;
66
67/// Query execution plan
68#[derive(Debug, Clone)]
69pub struct QueryPlan {
70    /// Plan type
71    pub plan_type: PlanType,
72    /// Target table
73    pub table: Option<TableId>,
74    /// Estimated cost
75    pub estimated_cost: f64,
76    /// Estimated rows
77    pub estimated_rows: u64,
78    /// Selected indexes
79    pub selected_indexes: Vec<IndexSelection>,
80    /// Execution steps
81    pub steps: Vec<ExecutionStep>,
82    /// Query hints
83    pub hints: QueryHints,
84}
85
/// High-level access strategy of a [`QueryPlan`].
///
/// `Eq` is derived alongside `PartialEq` because the enum carries no data, so
/// equality is total and the type can be used wherever `Eq` is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlanType {
    /// Table scan
    TableScan,
    /// Index scan
    IndexScan,
    /// Point lookup
    PointLookup,
    /// Range scan
    RangeScan,
    /// Multi-table join
    Join,
    /// Aggregation
    Aggregation,
    /// Subquery
    Subquery,
}
104
105/// Index selection information
106#[derive(Debug, Clone)]
107pub struct IndexSelection {
108    /// Index name
109    pub index_name: String,
110    /// Columns covered by index
111    pub columns: Vec<String>,
112    /// Selectivity estimate
113    pub selectivity: f64,
114    /// Index type
115    pub index_type: IndexType,
116}
117
/// Kind of index an [`IndexSelection`] refers to.
///
/// `Eq` is derived alongside `PartialEq` because the enum carries no data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexType {
    /// Primary key index
    Primary,
    /// Secondary index
    Secondary,
    /// Bloom filter
    BloomFilter,
    /// Composite index
    Composite,
}
130
131/// Execution step
132#[derive(Debug, Clone)]
133pub struct ExecutionStep {
134    /// Step type
135    pub step_type: StepType,
136    /// Columns involved
137    pub columns: Vec<String>,
138    /// Conditions to apply
139    pub conditions: Vec<Condition>,
140    /// Estimated cost
141    pub cost: f64,
142    /// Parallelization info
143    pub parallelization: ParallelizationInfo,
144}
145
/// Kind of work performed by an [`ExecutionStep`].
///
/// `Eq` is derived alongside `PartialEq` because the enum carries no data.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StepType {
    /// Scan table or index
    Scan,
    /// Filter rows
    Filter,
    /// Insert rows
    Insert,
    /// Sort results
    Sort,
    /// Limit results
    Limit,
    /// Project columns
    Project,
    /// Join tables
    Join,
    /// Aggregate rows
    Aggregate,
}
166
/// Parallelization advice attached to an execution step.
#[derive(Debug, Clone)]
pub struct ParallelizationInfo {
    /// Whether the step can be parallelized.
    pub can_parallelize: bool,
    /// Suggested thread count (`1` for serial steps).
    pub suggested_threads: usize,
    /// Partition key for parallel execution, when one is known.
    pub partition_key: Option<String>,
}
177
/// Query hints and optimization settings carried on a plan.
///
/// `Default` yields "no hints": every option is `None` and the bloom filter
/// is left enabled.
#[derive(Debug, Clone, Default)]
pub struct QueryHints {
    /// Force index usage
    pub force_index: Option<String>,
    /// Disable bloom filter
    pub disable_bloom_filter: bool,
    /// Preferred parallelization
    pub preferred_parallelization: Option<usize>,
    /// Query timeout in milliseconds
    pub timeout_ms: Option<u64>,
}
190
191/// Query planner
192#[derive(Debug)]
193pub struct QueryPlanner {
194    /// Schema manager reference
195    _schema: Arc<SchemaManager>,
196    /// Configuration
197    config: Config,
198    /// Cost model
199    cost_model: CostModel,
200}
201
/// Cost model for query optimization.
///
/// Values are abstract cost units; only their relative sizes matter.
#[derive(Debug, Clone)]
pub struct CostModel {
    /// Cost per row scan
    pub row_scan_cost: f64,
    /// Cost per index lookup
    pub index_lookup_cost: f64,
    /// Cost per sort operation
    pub sort_cost_per_row: f64,
    /// Cost per join operation
    pub join_cost_per_row: f64,
    /// Memory cost factor
    pub memory_cost_factor: f64,
}

impl Default for CostModel {
    /// Baseline model: a row scan costs `1.0` and every other operation is
    /// priced as a fraction of that.
    fn default() -> Self {
        Self {
            row_scan_cost: 1.0,
            index_lookup_cost: 0.1,
            sort_cost_per_row: 0.01,
            join_cost_per_row: 0.05,
            memory_cost_factor: 0.001,
        }
    }
}
228
229/// Pull the table out of a parsed query, producing a uniform error message.
230fn require_table<'a>(query: &'a ParsedQuery, op: &str) -> Result<&'a TableId> {
231    query
232        .table
233        .as_ref()
234        .ok_or_else(|| Error::query_execution(format!("Missing table in {op}")))
235}
236
237/// Clone the conditions out of an optional WHERE clause, defaulting to empty.
238fn clone_conditions(where_clause: &Option<WhereClause>) -> Vec<Condition> {
239    where_clause
240        .as_ref()
241        .map(|w| w.conditions.clone())
242        .unwrap_or_default()
243}
244
/// Hardcoded fallback column orderings used when an INSERT omits the column list.
///
/// This is a temporary fixture for tests that exercise INSERT VALUES without
/// schema lookup; a real implementation will resolve this from `SchemaManager`.
///
/// For the tables listed below the fixed ordering is returned regardless of
/// `value_count`. Any other table gets positional names `col_0`, `col_1`, …,
/// one per supplied value.
fn default_insert_columns(table_name: &str, value_count: usize) -> Vec<String> {
    let known: &[&str] = match table_name {
        "sales" => &["id", "region", "amount"],
        "orders" => &["id", "status", "amount"],
        "products" => &["id", "name", "price", "category"],
        "employees" => &["department", "id", "name", "salary"],
        "inventory" => &["id", "product", "quantity", "price", "active"],
        "customers" => &["id", "name", "email"],
        "user_data" => &["id", "tags", "preferences"],
        "performance_test" => &["id", "value", "category"],
        // Unknown table: synthesize one positional name per value.
        _ => return (0..value_count).map(|i| format!("col_{i}")).collect(),
    };

    known.iter().map(|name| (*name).to_string()).collect()
}
266
267impl QueryPlanner {
268    /// Create a new query planner
269    pub fn new(schema: Arc<SchemaManager>, config: &Config) -> Self {
270        Self {
271            _schema: schema,
272            config: config.clone(),
273            cost_model: CostModel::default(),
274        }
275    }
276
277    /// Plan a query
278    pub async fn plan(&self, query: &ParsedQuery) -> Result<QueryPlan> {
279        match query.query_type {
280            QueryType::Select => self.plan_select(query).await,
281            QueryType::Insert => self.plan_insert(query).await,
282            QueryType::Update => self.plan_update(query).await,
283            QueryType::Delete => self.plan_delete(query).await,
284            QueryType::CreateTable => Ok(self.plan_ddl(query, PlanType::TableScan, DDL_FIXED_COST)),
285            QueryType::DropTable => Ok(self.plan_ddl(query, PlanType::TableScan, DDL_FIXED_COST)),
286            QueryType::CreateIndex => Ok(self.plan_ddl(query, PlanType::IndexScan, DDL_FIXED_COST)),
287            QueryType::DropIndex => Ok(self.plan_ddl(query, PlanType::IndexScan, DDL_FIXED_COST)),
288            QueryType::Describe => {
289                Ok(self.plan_metadata(query, PlanType::PointLookup, METADATA_FIXED_COST, 1))
290            }
291            QueryType::Use => {
292                Ok(self.plan_metadata(query, PlanType::PointLookup, METADATA_FIXED_COST, 0))
293            }
294        }
295    }
296
297    /// Configured query parallelism, falling back to the module default.
298    fn query_parallelism(&self) -> usize {
299        self.config
300            .query
301            .query_parallelism
302            .unwrap_or(DEFAULT_PARALLELISM)
303    }
304
305    /// Build a `ParallelizationInfo` for steps that can always be parallelized
306    /// using the configured thread count.
307    fn parallel_info(&self) -> ParallelizationInfo {
308        ParallelizationInfo {
309            can_parallelize: true,
310            suggested_threads: self.query_parallelism(),
311            partition_key: None,
312        }
313    }
314
315    /// `ParallelizationInfo` for inherently single-threaded steps.
316    fn serial_info() -> ParallelizationInfo {
317        ParallelizationInfo {
318            can_parallelize: false,
319            suggested_threads: 1,
320            partition_key: None,
321        }
322    }
323
324    /// Plan SELECT query
325    async fn plan_select(&self, query: &ParsedQuery) -> Result<QueryPlan> {
326        let table = require_table(query, "SELECT")?;
327
328        let table_stats = self.get_table_statistics(table).await?;
329        let index_selection = self.select_indexes(table, &query.where_clause).await?;
330        let plan_type = self.determine_plan_type(&index_selection, &query.where_clause);
331
332        let mut steps = Vec::new();
333
334        // Step 1: Scan/Lookup
335        steps.push(ExecutionStep {
336            step_type: StepType::Scan,
337            columns: query.columns.clone(),
338            conditions: clone_conditions(&query.where_clause),
339            cost: self.calculate_scan_cost(&index_selection, &table_stats),
340            parallelization: self.determine_parallelization(&index_selection, &table_stats),
341        });
342
343        // Step 2: Filter (skipped for point lookups since the scan already pinned the row)
344        if let Some(where_clause) = &query.where_clause {
345            if plan_type != PlanType::PointLookup {
346                steps.push(ExecutionStep {
347                    step_type: StepType::Filter,
348                    columns: vec![],
349                    conditions: where_clause.conditions.clone(),
350                    cost: table_stats.row_count as f64
351                        * self.cost_model.row_scan_cost
352                        * FILTER_COST_FACTOR,
353                    parallelization: self.parallel_info(),
354                });
355            }
356        }
357
358        // Step 3: Sort
359        if !query.order_by.is_empty() {
360            steps.push(ExecutionStep {
361                step_type: StepType::Sort,
362                columns: query.order_by.iter().map(|o| o.column.clone()).collect(),
363                conditions: vec![],
364                cost: table_stats.row_count as f64 * self.cost_model.sort_cost_per_row,
365                parallelization: self.parallel_info(),
366            });
367        }
368
369        // Step 4: Limit (virtually free)
370        if query.limit.is_some() {
371            steps.push(ExecutionStep {
372                step_type: StepType::Limit,
373                columns: vec![],
374                conditions: vec![],
375                cost: 0.0,
376                parallelization: Self::serial_info(),
377            });
378        }
379
380        // Step 5: Project (only when an explicit, non-`*` projection was requested)
381        if !query.columns.is_empty() && query.columns != vec!["*"] {
382            steps.push(ExecutionStep {
383                step_type: StepType::Project,
384                columns: query.columns.clone(),
385                conditions: vec![],
386                cost: table_stats.row_count as f64 * PROJECT_COST_FACTOR,
387                parallelization: self.parallel_info(),
388            });
389        }
390
391        let total_cost = steps.iter().map(|s| s.cost).sum();
392        let estimated_rows = self.estimate_result_rows(&table_stats, &query.where_clause);
393
394        Ok(QueryPlan {
395            plan_type,
396            table: Some(table.clone()),
397            estimated_cost: total_cost,
398            estimated_rows,
399            selected_indexes: index_selection,
400            steps,
401            hints: QueryHints::default(),
402        })
403    }
404
405    /// Plan INSERT query
406    async fn plan_insert(&self, query: &ParsedQuery) -> Result<QueryPlan> {
407        let table = require_table(query, "INSERT")?;
408        let _table_stats = self.get_table_statistics(table).await?;
409
410        // Determine which column names to pair with the VALUES list. When the
411        // query omits columns, fall back to a per-table hardcoded ordering.
412        let owned_default;
413        let columns: &[String] = if query.columns.is_empty() {
414            owned_default = default_insert_columns(table.name(), query.values.len());
415            &owned_default
416        } else {
417            &query.columns
418        };
419
420        // Pair each column with its corresponding value (truncated to the shorter list).
421        let conditions: Vec<Condition> = columns
422            .iter()
423            .zip(query.values.iter())
424            .map(|(column, value)| Condition {
425                column: column.clone(),
426                operator: ComparisonOperator::Equal,
427                value: value.clone(),
428            })
429            .collect();
430
431        let steps = vec![ExecutionStep {
432            step_type: StepType::Insert,
433            columns: query.columns.clone(),
434            conditions,
435            cost: self.cost_model.row_scan_cost,
436            parallelization: Self::serial_info(),
437        }];
438
439        Ok(QueryPlan {
440            plan_type: PlanType::TableScan,
441            table: Some(table.clone()),
442            estimated_cost: self.cost_model.row_scan_cost,
443            estimated_rows: 1,
444            selected_indexes: vec![],
445            steps,
446            hints: QueryHints::default(),
447        })
448    }
449
450    /// Plan UPDATE query
451    async fn plan_update(&self, query: &ParsedQuery) -> Result<QueryPlan> {
452        let table = require_table(query, "UPDATE")?;
453
454        let table_stats = self.get_table_statistics(table).await?;
455        let index_selection = self.select_indexes(table, &query.where_clause).await?;
456
457        let steps = vec![
458            ExecutionStep {
459                step_type: StepType::Scan,
460                columns: vec![],
461                conditions: clone_conditions(&query.where_clause),
462                cost: self.calculate_scan_cost(&index_selection, &table_stats),
463                parallelization: self.determine_parallelization(&index_selection, &table_stats),
464            },
465            // The "update write" pass is modeled as a Filter step today.
466            ExecutionStep {
467                step_type: StepType::Filter,
468                columns: query.set_clause.keys().cloned().collect(),
469                conditions: vec![],
470                cost: table_stats.row_count as f64
471                    * self.cost_model.row_scan_cost
472                    * UPDATE_WRITE_COST_FACTOR,
473                parallelization: self.parallel_info(),
474            },
475        ];
476
477        let total_cost = steps.iter().map(|s| s.cost).sum();
478        let estimated_rows = self.estimate_result_rows(&table_stats, &query.where_clause);
479
480        Ok(QueryPlan {
481            plan_type: PlanType::TableScan,
482            table: Some(table.clone()),
483            estimated_cost: total_cost,
484            estimated_rows,
485            selected_indexes: index_selection,
486            steps,
487            hints: QueryHints::default(),
488        })
489    }
490
491    /// Plan DELETE query
492    async fn plan_delete(&self, query: &ParsedQuery) -> Result<QueryPlan> {
493        let table = require_table(query, "DELETE")?;
494
495        let table_stats = self.get_table_statistics(table).await?;
496        let index_selection = self.select_indexes(table, &query.where_clause).await?;
497
498        let steps = vec![ExecutionStep {
499            step_type: StepType::Scan,
500            columns: vec![],
501            conditions: clone_conditions(&query.where_clause),
502            cost: self.calculate_scan_cost(&index_selection, &table_stats),
503            parallelization: self.determine_parallelization(&index_selection, &table_stats),
504        }];
505
506        let total_cost = steps.iter().map(|s| s.cost).sum();
507        let estimated_rows = self.estimate_result_rows(&table_stats, &query.where_clause);
508
509        Ok(QueryPlan {
510            plan_type: PlanType::TableScan,
511            table: Some(table.clone()),
512            estimated_cost: total_cost,
513            estimated_rows,
514            selected_indexes: index_selection,
515            steps,
516            hints: QueryHints::default(),
517        })
518    }
519
520    /// Build a stub plan for DDL operations that don't yet generate real steps.
521    fn plan_ddl(&self, query: &ParsedQuery, plan_type: PlanType, cost: f64) -> QueryPlan {
522        QueryPlan {
523            plan_type,
524            table: query.table.clone(),
525            estimated_cost: cost,
526            estimated_rows: 0,
527            selected_indexes: vec![],
528            steps: vec![],
529            hints: QueryHints::default(),
530        }
531    }
532
533    /// Build a stub plan for metadata-only queries (DESCRIBE/USE).
534    fn plan_metadata(
535        &self,
536        query: &ParsedQuery,
537        plan_type: PlanType,
538        cost: f64,
539        estimated_rows: u64,
540    ) -> QueryPlan {
541        QueryPlan {
542            plan_type,
543            table: query.table.clone(),
544            estimated_cost: cost,
545            estimated_rows,
546            selected_indexes: vec![],
547            steps: vec![],
548            hints: QueryHints::default(),
549        }
550    }
551
552    /// Select optimal indexes for the query
553    async fn select_indexes(
554        &self,
555        _table: &TableId,
556        where_clause: &Option<WhereClause>,
557    ) -> Result<Vec<IndexSelection>> {
558        let mut selections = Vec::new();
559
560        // Always consider the primary key.
561        selections.push(IndexSelection {
562            index_name: "PRIMARY".to_string(),
563            columns: vec!["id".to_string()], // Simplified
564            selectivity: 1.0,
565            index_type: IndexType::Primary,
566        });
567
568        if let Some(where_clause) = where_clause {
569            // Per-condition: a synthetic secondary index, plus a bloom filter
570            // when the operator is equality. The order below preserves the
571            // original step output (all secondaries first, then all blooms).
572            for condition in &where_clause.conditions {
573                selections.push(IndexSelection {
574                    index_name: format!("idx_{}", condition.column),
575                    columns: vec![condition.column.clone()],
576                    selectivity: self.estimate_selectivity(condition),
577                    index_type: IndexType::Secondary,
578                });
579            }
580            for condition in &where_clause.conditions {
581                if condition.operator == ComparisonOperator::Equal {
582                    selections.push(IndexSelection {
583                        index_name: format!("bloom_{}", condition.column),
584                        columns: vec![condition.column.clone()],
585                        selectivity: BLOOM_INDEX_SELECTIVITY,
586                        index_type: IndexType::BloomFilter,
587                    });
588                }
589            }
590        }
591
592        Ok(selections)
593    }
594
595    /// Determine plan type based on index selection
596    fn determine_plan_type(
597        &self,
598        index_selection: &[IndexSelection],
599        where_clause: &Option<WhereClause>,
600    ) -> PlanType {
601        let Some(where_clause) = where_clause else {
602            return PlanType::TableScan;
603        };
604
605        let primary_columns: Vec<&str> = index_selection
606            .iter()
607            .filter(|idx| idx.index_type == IndexType::Primary)
608            .flat_map(|idx| idx.columns.iter().map(String::as_str))
609            .collect();
610
611        let mut has_range = false;
612        for condition in &where_clause.conditions {
613            match condition.operator {
614                ComparisonOperator::Equal => {
615                    if primary_columns.iter().any(|c| *c == condition.column) {
616                        return PlanType::PointLookup;
617                    }
618                }
619                ComparisonOperator::LessThan
620                | ComparisonOperator::LessThanOrEqual
621                | ComparisonOperator::GreaterThan
622                | ComparisonOperator::GreaterThanOrEqual => {
623                    has_range = true;
624                }
625                _ => {}
626            }
627        }
628
629        if has_range {
630            return PlanType::RangeScan;
631        }
632
633        if index_selection
634            .iter()
635            .any(|idx| idx.index_type == IndexType::Secondary)
636        {
637            return PlanType::IndexScan;
638        }
639
640        PlanType::TableScan
641    }
642
643    /// Calculate scan cost based on index selection
644    fn calculate_scan_cost(
645        &self,
646        index_selection: &[IndexSelection],
647        table_stats: &TableStatistics,
648    ) -> f64 {
649        let rows = table_stats.row_count as f64;
650        let base_lookup = rows * self.cost_model.index_lookup_cost;
651        let mut min_cost = rows * self.cost_model.row_scan_cost;
652
653        for index in index_selection {
654            let index_cost = match index.index_type {
655                IndexType::Primary => base_lookup * PRIMARY_INDEX_COST_FACTOR,
656                IndexType::Secondary => base_lookup * index.selectivity,
657                IndexType::BloomFilter => base_lookup * BLOOM_INDEX_COST_FACTOR,
658                IndexType::Composite => {
659                    base_lookup * index.selectivity * COMPOSITE_INDEX_COST_FACTOR
660                }
661            };
662            min_cost = min_cost.min(index_cost);
663        }
664
665        min_cost
666    }
667
668    /// Determine parallelization strategy
669    fn determine_parallelization(
670        &self,
671        index_selection: &[IndexSelection],
672        table_stats: &TableStatistics,
673    ) -> ParallelizationInfo {
674        let can_parallelize = table_stats.row_count > PARALLELIZATION_ROW_THRESHOLD;
675        let suggested_threads = if can_parallelize {
676            self.query_parallelism()
677        } else {
678            1
679        };
680
681        let partition_key = index_selection
682            .iter()
683            .find(|idx| idx.index_type == IndexType::Primary)
684            .and_then(|idx| idx.columns.first())
685            .cloned();
686
687        ParallelizationInfo {
688            can_parallelize,
689            suggested_threads,
690            partition_key,
691        }
692    }
693
694    /// Estimate selectivity of a condition
695    fn estimate_selectivity(&self, condition: &Condition) -> f64 {
696        match condition.operator {
697            ComparisonOperator::Equal => SELECTIVITY_EQUAL,
698            ComparisonOperator::NotEqual => SELECTIVITY_NOT_EQUAL,
699            ComparisonOperator::LessThan
700            | ComparisonOperator::LessThanOrEqual
701            | ComparisonOperator::GreaterThan
702            | ComparisonOperator::GreaterThanOrEqual => SELECTIVITY_RANGE,
703            ComparisonOperator::In => SELECTIVITY_IN,
704            ComparisonOperator::NotIn => SELECTIVITY_NOT_IN,
705            ComparisonOperator::Like | ComparisonOperator::NotLike => SELECTIVITY_LIKE,
706        }
707    }
708
709    /// Estimate result rows
710    fn estimate_result_rows(
711        &self,
712        table_stats: &TableStatistics,
713        where_clause: &Option<WhereClause>,
714    ) -> u64 {
715        let selectivity = where_clause
716            .as_ref()
717            .map(|w| {
718                w.conditions
719                    .iter()
720                    .map(|c| self.estimate_selectivity(c))
721                    .product::<f64>()
722            })
723            .unwrap_or(1.0);
724
725        (table_stats.row_count as f64 * selectivity) as u64
726    }
727
728    /// Get table statistics
729    async fn get_table_statistics(&self, _table: &TableId) -> Result<TableStatistics> {
730        // In a real implementation, this would query actual table statistics.
731        Ok(TableStatistics {
732            row_count: 100_000,
733            avg_row_size: 256,
734            table_size: 25_600_000,
735            index_count: 3,
736        })
737    }
738}
739
/// Table statistics for cost estimation.
#[derive(Debug, Clone)]
pub struct TableStatistics {
    /// Number of rows in table
    pub row_count: u64,
    /// Average row size in bytes
    pub avg_row_size: u32,
    /// Total table size in bytes
    pub table_size: u64,
    /// Number of indexes
    pub index_count: u32,
}
752
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Config;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Build a planner backed by a fresh temp dir. Tests in this module don't
    /// need the storage engine; constructing it eats most of the test runtime.
    ///
    /// The `TempDir` is returned so the directory outlives the planner.
    async fn make_planner() -> (TempDir, QueryPlanner) {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::default();
        let schema = Arc::new(
            crate::schema::SchemaManager::new(temp_dir.path())
                .await
                .unwrap(),
        );
        let planner = QueryPlanner::new(schema, &config);
        (temp_dir, planner)
    }

    /// Candidate list holding only the primary-key index on `id`.
    fn primary_only() -> Vec<IndexSelection> {
        vec![IndexSelection {
            index_name: "PRIMARY".to_string(),
            columns: vec!["id".to_string()],
            selectivity: 1.0,
            index_type: IndexType::Primary,
        }]
    }

    #[tokio::test]
    async fn test_query_planner_creation() {
        let (_tmp, planner) = make_planner().await;
        assert_eq!(planner.cost_model.row_scan_cost, 1.0);
    }

    #[tokio::test]
    async fn test_plan_type_determination() {
        let (_tmp, planner) = make_planner().await;

        let where_clause = Some(WhereClause {
            conditions: vec![Condition {
                column: "id".to_string(),
                operator: ComparisonOperator::Equal,
                value: crate::Value::Integer(1),
            }],
        });

        let plan_type = planner.determine_plan_type(&primary_only(), &where_clause);
        assert_eq!(plan_type, PlanType::PointLookup);
    }

    #[tokio::test]
    async fn test_plan_type_without_where_clause_is_table_scan() {
        let (_tmp, planner) = make_planner().await;

        let plan_type = planner.determine_plan_type(&primary_only(), &None);
        assert_eq!(plan_type, PlanType::TableScan);
    }

    #[tokio::test]
    async fn test_plan_type_range_condition_is_range_scan() {
        let (_tmp, planner) = make_planner().await;

        let where_clause = Some(WhereClause {
            conditions: vec![Condition {
                column: "age".to_string(),
                operator: ComparisonOperator::GreaterThan,
                value: crate::Value::Integer(18),
            }],
        });

        let plan_type = planner.determine_plan_type(&primary_only(), &where_clause);
        assert_eq!(plan_type, PlanType::RangeScan);
    }

    #[tokio::test]
    async fn test_selectivity_estimation() {
        let (_tmp, planner) = make_planner().await;

        let condition = Condition {
            column: "name".to_string(),
            operator: ComparisonOperator::Equal,
            value: crate::Value::Text("test".to_string()),
        };

        let selectivity = planner.estimate_selectivity(&condition);
        assert_eq!(selectivity, SELECTIVITY_EQUAL);
    }
}
816}