1use super::{ComparisonOperator, Condition, ParsedQuery, QueryType, WhereClause};
18use crate::{schema::SchemaManager, Config, Error, Result, TableId};
19use std::sync::Arc;
20
/// Fallback thread count used when the configuration does not provide a query parallelism.
const DEFAULT_PARALLELISM: usize = 4;

/// Row count a table must exceed before its scan step is marked as parallelizable.
const PARALLELIZATION_ROW_THRESHOLD: u64 = 10_000;

/// Multiplier applied to `row_count * row_scan_cost` when costing a SELECT filter step.
const FILTER_COST_FACTOR: f64 = 0.1;

/// Multiplier applied to `row_count * row_scan_cost` when costing the second step of an
/// UPDATE plan (the step that carries the SET columns).
const UPDATE_WRITE_COST_FACTOR: f64 = 0.5;

/// Per-row cost of a projection step.
const PROJECT_COST_FACTOR: f64 = 0.001;

/// Multiplier on the base index-lookup cost when scanning through a primary index.
const PRIMARY_INDEX_COST_FACTOR: f64 = 0.1;

/// Multiplier on the base index-lookup cost when scanning through a bloom-filter index.
const BLOOM_INDEX_COST_FACTOR: f64 = 0.01;

/// Multiplier on the base index-lookup cost for a composite index; applied together with
/// the index's own selectivity.
const COMPOSITE_INDEX_COST_FACTOR: f64 = 0.5;

/// Selectivity recorded on every bloom-filter index candidate.
const BLOOM_INDEX_SELECTIVITY: f64 = 0.1;

// Estimated fraction of rows matched by a single condition, keyed by comparison operator.
/// Selectivity of `=`.
const SELECTIVITY_EQUAL: f64 = 0.1;
/// Selectivity of `!=`.
const SELECTIVITY_NOT_EQUAL: f64 = 0.9;
/// Selectivity shared by `<`, `<=`, `>` and `>=`.
const SELECTIVITY_RANGE: f64 = 0.3;
/// Selectivity of `IN`.
const SELECTIVITY_IN: f64 = 0.2;
/// Selectivity of `NOT IN`.
const SELECTIVITY_NOT_IN: f64 = 0.8;
/// Selectivity shared by `LIKE` and `NOT LIKE`.
const SELECTIVITY_LIKE: f64 = 0.5;

/// Flat cost assigned to CREATE/DROP TABLE and CREATE/DROP INDEX plans.
const DDL_FIXED_COST: f64 = 1.0;

/// Flat cost assigned to DESCRIBE and USE plans.
const METADATA_FIXED_COST: f64 = 0.1;
66
/// Result of planning one parsed query: the chosen access strategy, cost and row
/// estimates, candidate indexes, and the ordered execution steps.
#[derive(Debug, Clone)]
pub struct QueryPlan {
    /// Access strategy selected for the query.
    pub plan_type: PlanType,
    /// Table the query targets. DDL and metadata plans copy whatever the parsed query
    /// holds, so this may be `None` for them.
    pub table: Option<TableId>,
    /// Estimated total cost: the sum of the step costs for SELECT/INSERT/UPDATE/DELETE,
    /// or a fixed constant for DDL and metadata statements.
    pub estimated_cost: f64,
    /// Estimated number of rows produced or affected.
    pub estimated_rows: u64,
    /// Index candidates the planner considered for this query.
    pub selected_indexes: Vec<IndexSelection>,
    /// Execution steps in the order they were planned; empty for DDL and metadata plans.
    pub steps: Vec<ExecutionStep>,
    /// Execution hints. Every plan built in this file carries `QueryHints::default()`.
    pub hints: QueryHints,
}
85
/// Access strategy recorded on a [`QueryPlan`].
#[derive(Debug, Clone, PartialEq)]
pub enum PlanType {
    /// Full table scan. Also the plan type this file assigns to INSERT, UPDATE, DELETE
    /// and CREATE/DROP TABLE plans.
    TableScan,
    /// Scan through a secondary index. Also assigned to CREATE/DROP INDEX plans.
    IndexScan,
    /// Equality lookup on a primary-index column. Also assigned to DESCRIBE and USE plans.
    PointLookup,
    /// Chosen when the WHERE clause has a `<`, `<=`, `>` or `>=` condition and no
    /// primary-key equality.
    RangeScan,
    /// Not produced by the planning code in this file.
    Join,
    /// Not produced by the planning code in this file.
    Aggregation,
    /// Not produced by the planning code in this file.
    Subquery,
}
104
/// One index candidate considered while planning a query.
#[derive(Debug, Clone)]
pub struct IndexSelection {
    /// Name of the index (for example `PRIMARY`, `idx_<column>` or `bloom_<column>`).
    pub index_name: String,
    /// Columns covered by the index.
    pub columns: Vec<String>,
    /// Estimated fraction of rows the index narrows the scan to; feeds the scan cost of
    /// secondary and composite indexes.
    pub selectivity: f64,
    /// Kind of index, which selects the cost formula used for it.
    pub index_type: IndexType,
}
117
/// Kind of index behind an [`IndexSelection`].
#[derive(Debug, Clone, PartialEq)]
pub enum IndexType {
    /// Primary-key index.
    Primary,
    /// Single-column secondary index.
    Secondary,
    /// Bloom-filter index; only proposed for equality conditions.
    BloomFilter,
    /// Composite index. The scan-cost calculation handles it, but index selection in
    /// this file never proposes one.
    Composite,
}
130
/// A single step of a [`QueryPlan`].
#[derive(Debug, Clone)]
pub struct ExecutionStep {
    /// Operation this step performs.
    pub step_type: StepType,
    /// Columns the step works with; which columns these are depends on the step type
    /// (selected columns, ORDER BY columns, SET columns, ...). May be empty.
    pub columns: Vec<String>,
    /// Conditions attached to the step. INSERT steps use this to carry column/value
    /// pairs as equality conditions.
    pub conditions: Vec<Condition>,
    /// Estimated cost of this step alone.
    pub cost: f64,
    /// Parallel-execution advice for this step.
    pub parallelization: ParallelizationInfo,
}
145
/// Operation performed by an [`ExecutionStep`].
#[derive(Debug, Clone, PartialEq)]
pub enum StepType {
    /// Read rows from the table, directly or through an index.
    Scan,
    /// Apply WHERE conditions. UPDATE plans in this file also use this type for the
    /// step that carries the SET columns.
    Filter,
    /// Insert a row.
    Insert,
    /// Order rows by the ORDER BY columns.
    Sort,
    /// Apply the LIMIT clause.
    Limit,
    /// Keep only the selected columns.
    Project,
    /// Not emitted by the planning code in this file.
    Join,
    /// Not emitted by the planning code in this file.
    Aggregate,
}
166
/// Parallel-execution advice attached to an [`ExecutionStep`].
#[derive(Debug, Clone)]
pub struct ParallelizationInfo {
    /// Whether the step may be executed in parallel.
    pub can_parallelize: bool,
    /// Suggested number of threads; `1` for serial steps.
    pub suggested_threads: usize,
    /// Column suggested for partitioning the work. Scan steps set it to the first column
    /// of the primary index; every other step leaves it `None`.
    pub partition_key: Option<String>,
}
177
/// Optional hints carried on a [`QueryPlan`].
///
/// Nothing in this file reads these fields; plans are always built with the defaults.
/// The per-field descriptions below are inferred from the names only.
#[derive(Debug, Clone, Default)]
pub struct QueryHints {
    /// Presumably the name of an index to use regardless of cost — TODO confirm with
    /// the consumer of the plan.
    pub force_index: Option<String>,
    /// Presumably disables bloom-filter indexes when `true` — TODO confirm.
    pub disable_bloom_filter: bool,
    /// Presumably a caller-preferred thread count — TODO confirm.
    pub preferred_parallelization: Option<usize>,
    /// Presumably a query timeout in milliseconds — TODO confirm.
    pub timeout_ms: Option<u64>,
}
190
/// Turns parsed queries into [`QueryPlan`]s using a simple cost model.
#[derive(Debug)]
pub struct QueryPlanner {
    /// Shared schema manager. Stored at construction but not read by this file yet,
    /// hence the leading underscore.
    _schema: Arc<SchemaManager>,
    /// Copy of the configuration taken at construction; supplies the query parallelism.
    config: Config,
    /// Cost weights used for all estimates.
    cost_model: CostModel,
}
201
/// Weights used by the planner's cost estimates.
#[derive(Debug, Clone)]
pub struct CostModel {
    /// Cost of scanning one row without an index.
    pub row_scan_cost: f64,
    /// Per-row base cost of going through an index; multiplied by the table's row count
    /// and an index-type-specific factor.
    pub index_lookup_cost: f64,
    /// Cost of sorting, per table row.
    pub sort_cost_per_row: f64,
    /// Per-row join cost. Not used by the planning code in this file.
    pub join_cost_per_row: f64,
    /// Memory cost weight. Not used by the planning code in this file.
    pub memory_cost_factor: f64,
}
216
217impl Default for CostModel {
218 fn default() -> Self {
219 Self {
220 row_scan_cost: 1.0,
221 index_lookup_cost: 0.1,
222 sort_cost_per_row: 0.01,
223 join_cost_per_row: 0.05,
224 memory_cost_factor: 0.001,
225 }
226 }
227}
228
229fn require_table<'a>(query: &'a ParsedQuery, op: &str) -> Result<&'a TableId> {
231 query
232 .table
233 .as_ref()
234 .ok_or_else(|| Error::query_execution(format!("Missing table in {op}")))
235}
236
237fn clone_conditions(where_clause: &Option<WhereClause>) -> Vec<Condition> {
239 where_clause
240 .as_ref()
241 .map(|w| w.conditions.clone())
242 .unwrap_or_default()
243}
244
/// Supplies column names for an INSERT that did not list any.
///
/// A handful of table names have a hard-coded column list, returned in full regardless
/// of `value_count`. Any other table gets positional names `col_0`, `col_1`, ... — one
/// per supplied value.
fn default_insert_columns(table_name: &str, value_count: usize) -> Vec<String> {
    let known_columns: &[&str] = match table_name {
        "sales" => &["id", "region", "amount"],
        "orders" => &["id", "status", "amount"],
        "products" => &["id", "name", "price", "category"],
        "employees" => &["department", "id", "name", "salary"],
        "inventory" => &["id", "product", "quantity", "price", "active"],
        "customers" => &["id", "name", "email"],
        "user_data" => &["id", "tags", "preferences"],
        "performance_test" => &["id", "value", "category"],
        // Unknown table: fall back to positional names.
        _ => {
            return (0..value_count)
                .map(|position| format!("col_{position}"))
                .collect();
        }
    };

    known_columns
        .iter()
        .map(|&column| column.to_string())
        .collect()
}
266
267impl QueryPlanner {
268 pub fn new(schema: Arc<SchemaManager>, config: &Config) -> Self {
270 Self {
271 _schema: schema,
272 config: config.clone(),
273 cost_model: CostModel::default(),
274 }
275 }
276
277 pub async fn plan(&self, query: &ParsedQuery) -> Result<QueryPlan> {
279 match query.query_type {
280 QueryType::Select => self.plan_select(query).await,
281 QueryType::Insert => self.plan_insert(query).await,
282 QueryType::Update => self.plan_update(query).await,
283 QueryType::Delete => self.plan_delete(query).await,
284 QueryType::CreateTable => Ok(self.plan_ddl(query, PlanType::TableScan, DDL_FIXED_COST)),
285 QueryType::DropTable => Ok(self.plan_ddl(query, PlanType::TableScan, DDL_FIXED_COST)),
286 QueryType::CreateIndex => Ok(self.plan_ddl(query, PlanType::IndexScan, DDL_FIXED_COST)),
287 QueryType::DropIndex => Ok(self.plan_ddl(query, PlanType::IndexScan, DDL_FIXED_COST)),
288 QueryType::Describe => {
289 Ok(self.plan_metadata(query, PlanType::PointLookup, METADATA_FIXED_COST, 1))
290 }
291 QueryType::Use => {
292 Ok(self.plan_metadata(query, PlanType::PointLookup, METADATA_FIXED_COST, 0))
293 }
294 }
295 }
296
297 fn query_parallelism(&self) -> usize {
299 self.config
300 .query
301 .query_parallelism
302 .unwrap_or(DEFAULT_PARALLELISM)
303 }
304
305 fn parallel_info(&self) -> ParallelizationInfo {
308 ParallelizationInfo {
309 can_parallelize: true,
310 suggested_threads: self.query_parallelism(),
311 partition_key: None,
312 }
313 }
314
315 fn serial_info() -> ParallelizationInfo {
317 ParallelizationInfo {
318 can_parallelize: false,
319 suggested_threads: 1,
320 partition_key: None,
321 }
322 }
323
324 async fn plan_select(&self, query: &ParsedQuery) -> Result<QueryPlan> {
326 let table = require_table(query, "SELECT")?;
327
328 let table_stats = self.get_table_statistics(table).await?;
329 let index_selection = self.select_indexes(table, &query.where_clause).await?;
330 let plan_type = self.determine_plan_type(&index_selection, &query.where_clause);
331
332 let mut steps = Vec::new();
333
334 steps.push(ExecutionStep {
336 step_type: StepType::Scan,
337 columns: query.columns.clone(),
338 conditions: clone_conditions(&query.where_clause),
339 cost: self.calculate_scan_cost(&index_selection, &table_stats),
340 parallelization: self.determine_parallelization(&index_selection, &table_stats),
341 });
342
343 if let Some(where_clause) = &query.where_clause {
345 if plan_type != PlanType::PointLookup {
346 steps.push(ExecutionStep {
347 step_type: StepType::Filter,
348 columns: vec![],
349 conditions: where_clause.conditions.clone(),
350 cost: table_stats.row_count as f64
351 * self.cost_model.row_scan_cost
352 * FILTER_COST_FACTOR,
353 parallelization: self.parallel_info(),
354 });
355 }
356 }
357
358 if !query.order_by.is_empty() {
360 steps.push(ExecutionStep {
361 step_type: StepType::Sort,
362 columns: query.order_by.iter().map(|o| o.column.clone()).collect(),
363 conditions: vec![],
364 cost: table_stats.row_count as f64 * self.cost_model.sort_cost_per_row,
365 parallelization: self.parallel_info(),
366 });
367 }
368
369 if query.limit.is_some() {
371 steps.push(ExecutionStep {
372 step_type: StepType::Limit,
373 columns: vec![],
374 conditions: vec![],
375 cost: 0.0,
376 parallelization: Self::serial_info(),
377 });
378 }
379
380 if !query.columns.is_empty() && query.columns != vec!["*"] {
382 steps.push(ExecutionStep {
383 step_type: StepType::Project,
384 columns: query.columns.clone(),
385 conditions: vec![],
386 cost: table_stats.row_count as f64 * PROJECT_COST_FACTOR,
387 parallelization: self.parallel_info(),
388 });
389 }
390
391 let total_cost = steps.iter().map(|s| s.cost).sum();
392 let estimated_rows = self.estimate_result_rows(&table_stats, &query.where_clause);
393
394 Ok(QueryPlan {
395 plan_type,
396 table: Some(table.clone()),
397 estimated_cost: total_cost,
398 estimated_rows,
399 selected_indexes: index_selection,
400 steps,
401 hints: QueryHints::default(),
402 })
403 }
404
405 async fn plan_insert(&self, query: &ParsedQuery) -> Result<QueryPlan> {
407 let table = require_table(query, "INSERT")?;
408 let _table_stats = self.get_table_statistics(table).await?;
409
410 let owned_default;
413 let columns: &[String] = if query.columns.is_empty() {
414 owned_default = default_insert_columns(table.name(), query.values.len());
415 &owned_default
416 } else {
417 &query.columns
418 };
419
420 let conditions: Vec<Condition> = columns
422 .iter()
423 .zip(query.values.iter())
424 .map(|(column, value)| Condition {
425 column: column.clone(),
426 operator: ComparisonOperator::Equal,
427 value: value.clone(),
428 })
429 .collect();
430
431 let steps = vec![ExecutionStep {
432 step_type: StepType::Insert,
433 columns: query.columns.clone(),
434 conditions,
435 cost: self.cost_model.row_scan_cost,
436 parallelization: Self::serial_info(),
437 }];
438
439 Ok(QueryPlan {
440 plan_type: PlanType::TableScan,
441 table: Some(table.clone()),
442 estimated_cost: self.cost_model.row_scan_cost,
443 estimated_rows: 1,
444 selected_indexes: vec![],
445 steps,
446 hints: QueryHints::default(),
447 })
448 }
449
450 async fn plan_update(&self, query: &ParsedQuery) -> Result<QueryPlan> {
452 let table = require_table(query, "UPDATE")?;
453
454 let table_stats = self.get_table_statistics(table).await?;
455 let index_selection = self.select_indexes(table, &query.where_clause).await?;
456
457 let steps = vec![
458 ExecutionStep {
459 step_type: StepType::Scan,
460 columns: vec![],
461 conditions: clone_conditions(&query.where_clause),
462 cost: self.calculate_scan_cost(&index_selection, &table_stats),
463 parallelization: self.determine_parallelization(&index_selection, &table_stats),
464 },
465 ExecutionStep {
467 step_type: StepType::Filter,
468 columns: query.set_clause.keys().cloned().collect(),
469 conditions: vec![],
470 cost: table_stats.row_count as f64
471 * self.cost_model.row_scan_cost
472 * UPDATE_WRITE_COST_FACTOR,
473 parallelization: self.parallel_info(),
474 },
475 ];
476
477 let total_cost = steps.iter().map(|s| s.cost).sum();
478 let estimated_rows = self.estimate_result_rows(&table_stats, &query.where_clause);
479
480 Ok(QueryPlan {
481 plan_type: PlanType::TableScan,
482 table: Some(table.clone()),
483 estimated_cost: total_cost,
484 estimated_rows,
485 selected_indexes: index_selection,
486 steps,
487 hints: QueryHints::default(),
488 })
489 }
490
491 async fn plan_delete(&self, query: &ParsedQuery) -> Result<QueryPlan> {
493 let table = require_table(query, "DELETE")?;
494
495 let table_stats = self.get_table_statistics(table).await?;
496 let index_selection = self.select_indexes(table, &query.where_clause).await?;
497
498 let steps = vec![ExecutionStep {
499 step_type: StepType::Scan,
500 columns: vec![],
501 conditions: clone_conditions(&query.where_clause),
502 cost: self.calculate_scan_cost(&index_selection, &table_stats),
503 parallelization: self.determine_parallelization(&index_selection, &table_stats),
504 }];
505
506 let total_cost = steps.iter().map(|s| s.cost).sum();
507 let estimated_rows = self.estimate_result_rows(&table_stats, &query.where_clause);
508
509 Ok(QueryPlan {
510 plan_type: PlanType::TableScan,
511 table: Some(table.clone()),
512 estimated_cost: total_cost,
513 estimated_rows,
514 selected_indexes: index_selection,
515 steps,
516 hints: QueryHints::default(),
517 })
518 }
519
520 fn plan_ddl(&self, query: &ParsedQuery, plan_type: PlanType, cost: f64) -> QueryPlan {
522 QueryPlan {
523 plan_type,
524 table: query.table.clone(),
525 estimated_cost: cost,
526 estimated_rows: 0,
527 selected_indexes: vec![],
528 steps: vec![],
529 hints: QueryHints::default(),
530 }
531 }
532
533 fn plan_metadata(
535 &self,
536 query: &ParsedQuery,
537 plan_type: PlanType,
538 cost: f64,
539 estimated_rows: u64,
540 ) -> QueryPlan {
541 QueryPlan {
542 plan_type,
543 table: query.table.clone(),
544 estimated_cost: cost,
545 estimated_rows,
546 selected_indexes: vec![],
547 steps: vec![],
548 hints: QueryHints::default(),
549 }
550 }
551
552 async fn select_indexes(
554 &self,
555 _table: &TableId,
556 where_clause: &Option<WhereClause>,
557 ) -> Result<Vec<IndexSelection>> {
558 let mut selections = Vec::new();
559
560 selections.push(IndexSelection {
562 index_name: "PRIMARY".to_string(),
563 columns: vec!["id".to_string()], selectivity: 1.0,
565 index_type: IndexType::Primary,
566 });
567
568 if let Some(where_clause) = where_clause {
569 for condition in &where_clause.conditions {
573 selections.push(IndexSelection {
574 index_name: format!("idx_{}", condition.column),
575 columns: vec![condition.column.clone()],
576 selectivity: self.estimate_selectivity(condition),
577 index_type: IndexType::Secondary,
578 });
579 }
580 for condition in &where_clause.conditions {
581 if condition.operator == ComparisonOperator::Equal {
582 selections.push(IndexSelection {
583 index_name: format!("bloom_{}", condition.column),
584 columns: vec![condition.column.clone()],
585 selectivity: BLOOM_INDEX_SELECTIVITY,
586 index_type: IndexType::BloomFilter,
587 });
588 }
589 }
590 }
591
592 Ok(selections)
593 }
594
595 fn determine_plan_type(
597 &self,
598 index_selection: &[IndexSelection],
599 where_clause: &Option<WhereClause>,
600 ) -> PlanType {
601 let Some(where_clause) = where_clause else {
602 return PlanType::TableScan;
603 };
604
605 let primary_columns: Vec<&str> = index_selection
606 .iter()
607 .filter(|idx| idx.index_type == IndexType::Primary)
608 .flat_map(|idx| idx.columns.iter().map(String::as_str))
609 .collect();
610
611 let mut has_range = false;
612 for condition in &where_clause.conditions {
613 match condition.operator {
614 ComparisonOperator::Equal => {
615 if primary_columns.iter().any(|c| *c == condition.column) {
616 return PlanType::PointLookup;
617 }
618 }
619 ComparisonOperator::LessThan
620 | ComparisonOperator::LessThanOrEqual
621 | ComparisonOperator::GreaterThan
622 | ComparisonOperator::GreaterThanOrEqual => {
623 has_range = true;
624 }
625 _ => {}
626 }
627 }
628
629 if has_range {
630 return PlanType::RangeScan;
631 }
632
633 if index_selection
634 .iter()
635 .any(|idx| idx.index_type == IndexType::Secondary)
636 {
637 return PlanType::IndexScan;
638 }
639
640 PlanType::TableScan
641 }
642
643 fn calculate_scan_cost(
645 &self,
646 index_selection: &[IndexSelection],
647 table_stats: &TableStatistics,
648 ) -> f64 {
649 let rows = table_stats.row_count as f64;
650 let base_lookup = rows * self.cost_model.index_lookup_cost;
651 let mut min_cost = rows * self.cost_model.row_scan_cost;
652
653 for index in index_selection {
654 let index_cost = match index.index_type {
655 IndexType::Primary => base_lookup * PRIMARY_INDEX_COST_FACTOR,
656 IndexType::Secondary => base_lookup * index.selectivity,
657 IndexType::BloomFilter => base_lookup * BLOOM_INDEX_COST_FACTOR,
658 IndexType::Composite => {
659 base_lookup * index.selectivity * COMPOSITE_INDEX_COST_FACTOR
660 }
661 };
662 min_cost = min_cost.min(index_cost);
663 }
664
665 min_cost
666 }
667
668 fn determine_parallelization(
670 &self,
671 index_selection: &[IndexSelection],
672 table_stats: &TableStatistics,
673 ) -> ParallelizationInfo {
674 let can_parallelize = table_stats.row_count > PARALLELIZATION_ROW_THRESHOLD;
675 let suggested_threads = if can_parallelize {
676 self.query_parallelism()
677 } else {
678 1
679 };
680
681 let partition_key = index_selection
682 .iter()
683 .find(|idx| idx.index_type == IndexType::Primary)
684 .and_then(|idx| idx.columns.first())
685 .cloned();
686
687 ParallelizationInfo {
688 can_parallelize,
689 suggested_threads,
690 partition_key,
691 }
692 }
693
694 fn estimate_selectivity(&self, condition: &Condition) -> f64 {
696 match condition.operator {
697 ComparisonOperator::Equal => SELECTIVITY_EQUAL,
698 ComparisonOperator::NotEqual => SELECTIVITY_NOT_EQUAL,
699 ComparisonOperator::LessThan
700 | ComparisonOperator::LessThanOrEqual
701 | ComparisonOperator::GreaterThan
702 | ComparisonOperator::GreaterThanOrEqual => SELECTIVITY_RANGE,
703 ComparisonOperator::In => SELECTIVITY_IN,
704 ComparisonOperator::NotIn => SELECTIVITY_NOT_IN,
705 ComparisonOperator::Like | ComparisonOperator::NotLike => SELECTIVITY_LIKE,
706 }
707 }
708
709 fn estimate_result_rows(
711 &self,
712 table_stats: &TableStatistics,
713 where_clause: &Option<WhereClause>,
714 ) -> u64 {
715 let selectivity = where_clause
716 .as_ref()
717 .map(|w| {
718 w.conditions
719 .iter()
720 .map(|c| self.estimate_selectivity(c))
721 .product::<f64>()
722 })
723 .unwrap_or(1.0);
724
725 (table_stats.row_count as f64 * selectivity) as u64
726 }
727
728 async fn get_table_statistics(&self, _table: &TableId) -> Result<TableStatistics> {
730 Ok(TableStatistics {
732 row_count: 100_000,
733 avg_row_size: 256,
734 table_size: 25_600_000,
735 index_count: 3,
736 })
737 }
738}
739
/// Size figures for one table, used as input to the cost estimates.
#[derive(Debug, Clone)]
pub struct TableStatistics {
    /// Number of rows in the table.
    pub row_count: u64,
    /// Average size of one row — presumably in bytes; TODO confirm.
    pub avg_row_size: u32,
    /// Total size of the table — presumably in bytes; TODO confirm.
    pub table_size: u64,
    /// Number of indexes on the table.
    pub index_count: u32,
}
752
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Config;
    use std::sync::Arc;
    use tempfile::TempDir;

    /// Builds a planner backed by a schema manager rooted in a fresh temporary
    /// directory. The directory guard is returned so callers keep it alive for the
    /// duration of the test.
    async fn make_planner() -> (TempDir, QueryPlanner) {
        let dir = TempDir::new().unwrap();
        let manager = crate::schema::SchemaManager::new(dir.path())
            .await
            .unwrap();
        let planner = QueryPlanner::new(Arc::new(manager), &Config::default());
        (dir, planner)
    }

    /// A new planner starts with the default cost model.
    #[tokio::test]
    async fn test_query_planner_creation() {
        let (_guard, planner) = make_planner().await;
        assert_eq!(planner.cost_model.row_scan_cost, 1.0);
    }

    /// An equality condition on the primary-key column is planned as a point lookup.
    #[tokio::test]
    async fn test_plan_type_determination() {
        let (_guard, planner) = make_planner().await;

        let id_equals_one = Condition {
            column: "id".to_string(),
            operator: ComparisonOperator::Equal,
            value: crate::Value::Integer(1),
        };
        let filter = Some(WhereClause {
            conditions: vec![id_equals_one],
        });

        let primary_only = vec![IndexSelection {
            index_name: "PRIMARY".to_string(),
            columns: vec!["id".to_string()],
            selectivity: 1.0,
            index_type: IndexType::Primary,
        }];

        assert_eq!(
            planner.determine_plan_type(&primary_only, &filter),
            PlanType::PointLookup
        );
    }

    /// Equality conditions use the equality selectivity constant.
    #[tokio::test]
    async fn test_selectivity_estimation() {
        let (_guard, planner) = make_planner().await;

        let name_equals_test = Condition {
            column: "name".to_string(),
            operator: ComparisonOperator::Equal,
            value: crate::Value::Text("test".to_string()),
        };

        assert_eq!(
            planner.estimate_selectivity(&name_equals_test),
            SELECTIVITY_EQUAL
        );
    }
}