1use crate::cost::{CostModel, UnitCostModel};
2use crate::memo::Memo;
3use crate::rules::{RuleContext, RuleSet};
4use chryso_metadata::StatsCache;
5use chryso_planner::{LogicalPlan, PhysicalPlan};
6use std::collections::HashSet;
7
8pub mod column_prune;
9pub mod cost;
10pub mod cost_profile;
11pub mod enforcer;
12pub mod estimation;
13pub mod expr_rewrite;
14pub mod join_order;
15pub mod memo;
16pub mod physical_rules;
17pub mod plan_fingerprint;
18pub mod properties;
19pub mod rules;
20pub mod stats_collect;
21pub mod subquery;
22pub mod utils;
23
24pub use cost::CostModelConfig;
25pub use cost_profile::CostProfile;
26pub use plan_fingerprint::{logical_fingerprint, physical_fingerprint};
27
/// Diagnostics gathered while optimizing a single plan.
///
/// Every field starts out empty; the optimizer entry points fill them in as
/// they go.
#[derive(Debug, Clone, Default)]
pub struct OptimizerTrace {
    /// Names of rewrite rules that produced an alternative. Only populated
    /// when rule debugging is switched on.
    pub applied_rules: Vec<String>,
    /// Names reported by the statistics loading step for this run.
    pub stats_loaded: Vec<String>,
    /// Pairs of predicate descriptions that the rewrite rules flagged as
    /// conflicting literals.
    pub conflicting_literals: Vec<(String, String)>,
    /// Non-fatal problems noticed while optimizing (for example an invalid
    /// cost configuration that was replaced by the defaults).
    pub warnings: Vec<String>,
}

impl OptimizerTrace {
    /// Creates an empty trace. Equivalent to [`OptimizerTrace::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
46
/// Snapshot of the memo contents: one entry per group.
#[derive(Clone, Debug)]
pub struct MemoTrace {
    /// Groups in the order they were reported.
    pub groups: Vec<MemoTraceGroup>,
}

/// A single memo group together with the candidate plans recorded for it.
#[derive(Clone, Debug)]
pub struct MemoTraceGroup {
    /// Identifier of the group.
    pub id: usize,
    /// Candidate plans of the group. The first entry is treated as the best
    /// one by [`MemoTrace::format_best_only`].
    pub candidates: Vec<MemoTraceCandidate>,
}

/// One costed candidate plan of a memo group.
#[derive(Clone, Debug)]
pub struct MemoTraceCandidate {
    /// Cost assigned to the plan.
    pub cost: f64,
    /// Textual rendering of the plan; may span several lines.
    pub plan: String,
}

impl MemoTrace {
    /// Renders every candidate of every group.
    pub fn format_full(&self) -> String {
        format_memo_trace(self, false)
    }

    /// Renders only the first candidate of every group.
    pub fn format_best_only(&self) -> String {
        format_memo_trace(self, true)
    }
}

/// Renders `trace` as text: a `group=… candidates=…` header per group followed
/// by its candidates (all of them, or just the first when `best_only` is set).
fn format_memo_trace(trace: &MemoTrace, best_only: bool) -> String {
    use std::fmt::Write as _;

    trace
        .groups
        .iter()
        .fold(String::new(), |mut rendered, group| {
            let total = group.candidates.len();
            // Writing into a `String` cannot fail, so the result is ignored.
            let _ = writeln!(rendered, "group={} candidates={}", group.id, total);
            let shown = if best_only { total.min(1) } else { total };
            group
                .candidates
                .iter()
                .take(shown)
                .for_each(|candidate| write_candidate(&mut rendered, candidate));
            rendered
        })
}

/// Appends one candidate to `output`: a cost line with three decimals, then
/// each line of the plan text prefixed by a single space.
fn write_candidate(output: &mut String, candidate: &MemoTraceCandidate) {
    output.push_str(&format!(" cost={:.3}\n", candidate.cost));
    for plan_line in candidate.plan.lines() {
        output.push(' ');
        output.push_str(plan_line);
        output.push('\n');
    }
}
106
107pub struct OptimizerConfig {
108 pub enable_cascades: bool,
109 pub enable_properties: bool,
110 pub rules: RuleSet,
111 pub rule_config: RuleConfig,
112 pub search_budget: SearchBudget,
113 pub trace: bool,
114 pub debug_rules: bool,
115 pub stats_provider: Option<std::sync::Arc<dyn chryso_metadata::StatsProvider>>,
116 pub cost_config: Option<CostModelConfig>,
117 pub system_params: Option<std::sync::Arc<chryso_core::system_params::SystemParamRegistry>>,
118 pub tenant_id: Option<String>,
119}
120
121impl std::fmt::Debug for OptimizerConfig {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 f.debug_struct("OptimizerConfig")
124 .field("enable_cascades", &self.enable_cascades)
125 .field("enable_properties", &self.enable_properties)
126 .field("rules", &self.rules)
127 .field("rule_config", &self.rule_config)
128 .field("search_budget", &self.search_budget)
129 .field("trace", &self.trace)
130 .field("debug_rules", &self.debug_rules)
131 .field("stats_provider", &self.stats_provider.is_some())
132 .field("cost_config", &self.cost_config.is_some())
133 .field("system_params", &self.system_params.is_some())
134 .field("tenant_id", &self.tenant_id)
135 .finish()
136 }
137}
138
/// Selects which rewrite rules may run.
///
/// The default configuration has no allowlist and an empty denylist, so every
/// rule is enabled.
#[derive(Clone, Debug, Default)]
pub struct RuleConfig {
    /// Optional allowlist. When present it is the only thing consulted:
    /// exactly the listed rules are enabled and `disabled_rules` is ignored.
    pub enabled_rules: Option<HashSet<String>>,
    /// Denylist, consulted only when no allowlist is configured.
    pub disabled_rules: HashSet<String>,
}

impl RuleConfig {
    /// Returns whether the rule called `name` may run.
    ///
    /// With an allowlist, a rule is enabled iff it is listed there. Without
    /// one, a rule is enabled unless it appears in the denylist.
    pub fn is_enabled(&self, name: &str) -> bool {
        match &self.enabled_rules {
            Some(allowed) => allowed.contains(name),
            None => !self.disabled_rules.contains(name),
        }
    }
}
162
/// Optional limits for the memo search. `None` leaves the corresponding
/// limit unset; the default sets neither.
#[derive(Debug, Default, Clone)]
pub struct SearchBudget {
    /// Upper bound on memo groups — presumably the number explored; confirm
    /// against the memo implementation.
    pub max_groups: Option<usize>,
    /// Upper bound on rewrites — presumably rule applications during
    /// exploration; confirm against the memo implementation.
    pub max_rewrites: Option<usize>,
}
168
#[cfg(test)]
mod tests {
    use super::cost::UnitCostModel;
    use super::{CascadesOptimizer, OptimizerConfig};
    use crate::CostModelConfig;
    use chryso_core::ast::{BinaryOperator, Expr, Literal};
    use chryso_metadata::{StatsCache, StatsSnapshot, type_inference::SimpleTypeInferencer};
    use chryso_parser::{Dialect, ParserConfig, SimpleParser, SqlParser};
    use chryso_planner::{LogicalPlan, PhysicalPlan, PlanBuilder};
    use serde_json;
    use std::collections::HashSet;

    /// Parses `sql` with the Postgres dialect and builds its logical plan.
    fn plan_sql(sql: &str) -> LogicalPlan {
        let parser = SimpleParser::new(ParserConfig {
            dialect: Dialect::Postgres,
        });
        let stmt = parser.parse(sql).expect("parse");
        PlanBuilder::build(stmt).expect("plan")
    }

    /// A scan of table `t`.
    fn scan_t() -> LogicalPlan {
        LogicalPlan::Scan {
            table: "t".to_string(),
        }
    }

    /// `Filter(true)` on top of a scan of `t`.
    fn true_filter_over_scan() -> LogicalPlan {
        LogicalPlan::Filter {
            predicate: Expr::Literal(Literal::Bool(true)),
            input: Box::new(scan_t()),
        }
    }

    /// The predicate `column = value`.
    fn column_equals(column: &str, value: f64) -> Expr {
        Expr::BinaryOp {
            left: Box::new(Expr::Identifier(column.to_string())),
            op: BinaryOperator::Eq,
            right: Box::new(Expr::Literal(Literal::Number(value))),
        }
    }

    /// An optimizer using the default configuration.
    fn default_optimizer() -> CascadesOptimizer {
        CascadesOptimizer::new(OptimizerConfig::default())
    }

    #[test]
    fn explain_with_types_and_costs() {
        let logical = plan_sql("select sum(amount) from sales group by region");
        let typed = logical.explain_typed(0, &SimpleTypeInferencer);
        assert!(typed.contains("LogicalAggregate"));

        let physical = default_optimizer().optimize(&logical, &mut StatsCache::new());
        let costed = physical.explain_costed(0, &UnitCostModel);
        assert!(costed.contains("cost="));
    }

    #[test]
    fn optimizer_respects_disabled_rule() {
        let logical = true_filter_over_scan();
        let mut config = OptimizerConfig::default();
        config
            .rule_config
            .disabled_rules
            .insert("remove_true_filter".to_string());
        let plan = CascadesOptimizer::new(config).optimize(&logical, &mut StatsCache::new());
        assert!(matches!(plan, PhysicalPlan::Filter { .. }));
    }

    #[test]
    fn optimizer_respects_enabled_rule_allowlist() {
        let logical = true_filter_over_scan();
        let mut config = OptimizerConfig::default();
        config.rule_config.enabled_rules =
            Some(HashSet::from_iter([String::from("remove_true_filter")]));
        let plan = CascadesOptimizer::new(config).optimize(&logical, &mut StatsCache::new());
        assert!(matches!(plan, PhysicalPlan::TableScan { .. }));
    }

    #[test]
    fn optimize_with_memo_trace_collects_candidates() {
        let logical = plan_sql("select * from t");
        let (physical, trace) =
            default_optimizer().optimize_with_memo_trace(&logical, &mut StatsCache::new());
        assert!(matches!(physical, PhysicalPlan::TableScan { .. }));
        assert!(!trace.groups.is_empty());
        let first = &trace.groups[0];
        assert!(!first.candidates.is_empty());
        let formatted = trace.format_best_only();
        assert!(formatted.contains("group="));
        assert!(formatted.contains("cost="));
    }

    #[test]
    fn rules_reach_fixpoint_for_projection_limit_topn() {
        let logical = plan_sql("select id from t order by id limit 5");
        let (physical, _) =
            default_optimizer().optimize_with_trace(&logical, &mut StatsCache::new());
        assert!(!matches!(physical, PhysicalPlan::Sort { .. }));
    }

    #[test]
    fn trace_records_literal_conflicts() {
        let logical = LogicalPlan::Filter {
            predicate: Expr::BinaryOp {
                left: Box::new(column_equals("a", 1.0)),
                op: BinaryOperator::And,
                right: Box::new(column_equals("a", 2.0)),
            },
            input: Box::new(scan_t()),
        };
        let (_, trace) =
            default_optimizer().optimize_with_trace(&logical, &mut StatsCache::new());
        assert!(
            trace
                .conflicting_literals
                .iter()
                .any(|(left, right)| left.contains("a = 1") && right.contains("a = 2")),
            "expected conflict pair, got {:?}",
            trace.conflicting_literals
        );
    }

    #[test]
    fn optimizer_uses_stats_snapshot() {
        let snapshot_json = include_str!("../tests/testdata/tpch_scale1.json");
        let snapshot: StatsSnapshot = serde_json::from_str(snapshot_json).expect("snapshot");
        let mut stats = snapshot.to_cache();
        let logical = plan_sql("select * from orders");
        let (physical, _) = default_optimizer().optimize_with_trace(&logical, &mut stats);
        assert!(matches!(physical, PhysicalPlan::TableScan { .. }));
    }

    #[test]
    fn optimizer_uses_system_param_override() {
        let registry = chryso_core::system_params::SystemParamRegistry::new();
        registry.set_default_param(
            CostModelConfig::PARAM_FILTER,
            chryso_core::system_params::SystemParamValue::Float(9.0),
        );
        let mut config = OptimizerConfig::default();
        config.system_params = Some(std::sync::Arc::new(registry));
        let base = CostModelConfig::default();
        let updated = base.apply_system_params(
            config.system_params.as_ref().unwrap(),
            config.tenant_id.as_deref(),
        );
        assert_eq!(updated.filter, 9.0);
    }
}
327
328impl Default for OptimizerConfig {
329 fn default() -> Self {
330 Self {
331 enable_cascades: true,
332 enable_properties: true,
333 rules: RuleSet::default(),
334 rule_config: RuleConfig::default(),
335 search_budget: SearchBudget::default(),
336 trace: false,
337 debug_rules: false,
338 stats_provider: None,
339 cost_config: None,
340 system_params: None,
341 tenant_id: None,
342 }
343 }
344}
345
346pub struct CascadesOptimizer {
347 config: OptimizerConfig,
348}
349
350impl CascadesOptimizer {
351 pub fn new(config: OptimizerConfig) -> Self {
352 Self { config }
353 }
354
355 pub fn optimize(&self, logical: &LogicalPlan, stats: &mut StatsCache) -> PhysicalPlan {
356 let _ = ensure_stats(logical, stats, &self.config);
357 let logical = crate::expr_rewrite::rewrite_plan(logical);
358 let logical = crate::column_prune::prune_plan(&logical);
359 if self.config.enable_cascades {
360 optimize_with_cascades(&logical, &self.config, stats).0
361 } else {
362 logical_to_physical(&logical)
363 }
364 }
365
366 pub fn optimize_with_trace(
367 &self,
368 logical: &LogicalPlan,
369 stats: &mut StatsCache,
370 ) -> (PhysicalPlan, OptimizerTrace) {
371 let loaded = ensure_stats(logical, stats, &self.config).unwrap_or_default();
372 let logical = crate::expr_rewrite::rewrite_plan(logical);
373 let logical = crate::column_prune::prune_plan(&logical);
374 if self.config.enable_cascades {
375 let (plan, mut trace) = optimize_with_cascades(&logical, &self.config, stats);
376 trace.stats_loaded = loaded;
377 (plan, trace)
378 } else {
379 let mut trace = OptimizerTrace::new();
380 trace.stats_loaded = loaded;
381 (logical_to_physical(&logical), trace)
382 }
383 }
384
385 pub fn optimize_with_memo_trace(
386 &self,
387 logical: &LogicalPlan,
388 stats: &mut StatsCache,
389 ) -> (PhysicalPlan, MemoTrace) {
390 let _ = ensure_stats(logical, stats, &self.config);
391 let logical = crate::expr_rewrite::rewrite_plan(logical);
392 let logical = crate::column_prune::prune_plan(&logical);
393 if self.config.enable_cascades {
394 optimize_with_cascades_memo(&logical, &self.config, stats)
395 } else {
396 let physical = logical_to_physical(&logical);
397 (physical, MemoTrace { groups: Vec::new() })
398 }
399 }
400}
401
402fn optimize_with_cascades(
403 logical: &LogicalPlan,
404 config: &OptimizerConfig,
405 _stats: &StatsCache,
406) -> (PhysicalPlan, OptimizerTrace) {
407 let mut trace = OptimizerTrace::new();
408 let mut rule_ctx = RuleContext::default();
410 let logical = apply_rules_fixpoint(
411 logical,
412 &config.rules,
413 &config.rule_config,
414 &mut trace,
415 &mut rule_ctx,
416 config.debug_rules,
417 );
418 trace
419 .conflicting_literals
420 .extend(rule_ctx.take_literal_conflicts());
421 let logical = crate::subquery::rewrite_correlated_subqueries(&logical);
422 let logical = crate::expr_rewrite::rewrite_plan(&logical);
423 let candidates = crate::join_order::enumerate_join_orders(&logical, _stats);
424 let mut memo = Memo::new();
425 let root = memo.insert(candidates.first().unwrap_or(&logical));
426 memo.explore(&config.rules, &config.rule_config, &config.search_budget);
427 let cost_model = build_cost_model(_stats, config, Some(&mut trace));
428 let physical_rules = crate::physical_rules::PhysicalRuleSet::default();
429 let mut best = memo
430 .best_physical(root, &physical_rules, cost_model.as_ref())
431 .unwrap_or_else(|| logical_to_physical(&logical));
432 if config.enable_properties {
433 let required = crate::properties::PhysicalProperties::default();
434 best = crate::enforcer::enforce(best, &required);
435 }
436 (best, trace)
437}
438
439fn optimize_with_cascades_memo(
440 logical: &LogicalPlan,
441 config: &OptimizerConfig,
442 _stats: &StatsCache,
443) -> (PhysicalPlan, MemoTrace) {
444 let mut rule_ctx = RuleContext::default();
445 let logical = apply_rules_fixpoint(
446 logical,
447 &config.rules,
448 &config.rule_config,
449 &mut OptimizerTrace::new(),
450 &mut rule_ctx,
451 config.debug_rules,
452 );
453 let logical = crate::subquery::rewrite_correlated_subqueries(&logical);
454 let logical = crate::expr_rewrite::rewrite_plan(&logical);
455 let candidates = crate::join_order::enumerate_join_orders(&logical, _stats);
456 let mut memo = Memo::new();
457 let root = memo.insert(candidates.first().unwrap_or(&logical));
458 memo.explore(&config.rules, &config.rule_config, &config.search_budget);
459 let cost_model = build_cost_model(_stats, config, None);
460 let physical_rules = crate::physical_rules::PhysicalRuleSet::default();
461 let trace = memo.trace(&physical_rules, cost_model.as_ref());
462 let mut best = memo
463 .best_physical(root, &physical_rules, cost_model.as_ref())
464 .unwrap_or_else(|| logical_to_physical(&logical));
465 if config.enable_properties {
466 let required = crate::properties::PhysicalProperties::default();
467 best = crate::enforcer::enforce(best, &required);
468 }
469 (best, trace)
470}
471
472fn build_cost_model<'a>(
473 stats: &'a StatsCache,
474 config: &'a OptimizerConfig,
475 trace: Option<&mut OptimizerTrace>,
476) -> Box<dyn CostModel + 'a> {
477 let base = config.cost_config.clone().unwrap_or_default();
478 let model_config = match &config.system_params {
479 Some(registry) => {
480 let tenant = config.tenant_id.as_deref();
481 base.apply_system_params(registry, tenant)
482 }
483 None => base,
484 };
485 let model_config = if model_config.validate().is_ok() {
486 model_config
487 } else {
488 if let Some(trace) = trace {
489 trace
490 .warnings
491 .push("invalid cost config detected; falling back to defaults".to_string());
492 }
493 CostModelConfig::default()
494 };
495 if stats.is_empty() {
496 Box::new(cost::UnitCostModelWithConfig::new(model_config))
497 } else {
498 Box::new(cost::StatsCostModel::with_config(stats, model_config))
499 }
500}
501
502fn ensure_stats(
503 logical: &LogicalPlan,
504 stats: &mut StatsCache,
505 config: &OptimizerConfig,
506) -> chryso_core::ChrysoResult<Vec<String>> {
507 let Some(provider) = &config.stats_provider else {
508 return Ok(Vec::new());
509 };
510 let requirements = crate::stats_collect::collect_requirements(logical);
511 let mut missing_tables = Vec::new();
512 for table in &requirements.tables {
513 if stats.table_stats(table).is_none() {
514 missing_tables.push(table.clone());
515 }
516 }
517 let mut missing_columns = Vec::new();
518 for (table, column) in &requirements.columns {
519 if stats.column_stats(table, column).is_none() {
520 missing_columns.push((table.clone(), column.clone()));
521 }
522 }
523 if missing_tables.is_empty() && missing_columns.is_empty() {
524 return Ok(Vec::new());
525 }
526 provider.load_stats(&missing_tables, &missing_columns, stats)?;
527 Ok(missing_tables)
528}
529
530fn apply_rules_recursive(
531 plan: &LogicalPlan,
532 rules: &RuleSet,
533 rule_config: &RuleConfig,
534 trace: &mut OptimizerTrace,
535 rule_ctx: &mut RuleContext,
536 debug_rules: bool,
537) -> LogicalPlan {
538 let mut rewritten = plan.clone();
539 let mut matched = Vec::new();
540 for rule in rules.iter() {
541 if !rule_config.is_enabled(rule.name()) {
542 continue;
543 }
544 let alternatives = rule.apply(&rewritten, rule_ctx);
545 if !alternatives.is_empty() {
546 matched.push(rule.name().to_string());
547 rewritten = alternatives.last().cloned().unwrap_or(rewritten);
548 }
549 }
550 if debug_rules {
551 trace.applied_rules.extend(matched);
552 }
553 let rewritten = match rewritten {
554 LogicalPlan::Filter { predicate, input } => LogicalPlan::Filter {
555 predicate,
556 input: Box::new(apply_rules_recursive(
557 input.as_ref(),
558 rules,
559 rule_config,
560 trace,
561 rule_ctx,
562 debug_rules,
563 )),
564 },
565 LogicalPlan::Projection { exprs, input } => LogicalPlan::Projection {
566 exprs,
567 input: Box::new(apply_rules_recursive(
568 input.as_ref(),
569 rules,
570 rule_config,
571 trace,
572 rule_ctx,
573 debug_rules,
574 )),
575 },
576 LogicalPlan::Join {
577 join_type,
578 left,
579 right,
580 on,
581 } => LogicalPlan::Join {
582 join_type,
583 left: Box::new(apply_rules_recursive(
584 left.as_ref(),
585 rules,
586 rule_config,
587 trace,
588 rule_ctx,
589 debug_rules,
590 )),
591 right: Box::new(apply_rules_recursive(
592 right.as_ref(),
593 rules,
594 rule_config,
595 trace,
596 rule_ctx,
597 debug_rules,
598 )),
599 on,
600 },
601 LogicalPlan::Aggregate {
602 group_exprs,
603 aggr_exprs,
604 input,
605 } => LogicalPlan::Aggregate {
606 group_exprs,
607 aggr_exprs,
608 input: Box::new(apply_rules_recursive(
609 input.as_ref(),
610 rules,
611 rule_config,
612 trace,
613 rule_ctx,
614 debug_rules,
615 )),
616 },
617 LogicalPlan::Distinct { input } => LogicalPlan::Distinct {
618 input: Box::new(apply_rules_recursive(
619 input.as_ref(),
620 rules,
621 rule_config,
622 trace,
623 rule_ctx,
624 debug_rules,
625 )),
626 },
627 LogicalPlan::TopN {
628 order_by,
629 limit,
630 input,
631 } => LogicalPlan::TopN {
632 order_by,
633 limit,
634 input: Box::new(apply_rules_recursive(
635 input.as_ref(),
636 rules,
637 rule_config,
638 trace,
639 rule_ctx,
640 debug_rules,
641 )),
642 },
643 LogicalPlan::Sort { order_by, input } => LogicalPlan::Sort {
644 order_by,
645 input: Box::new(apply_rules_recursive(
646 input.as_ref(),
647 rules,
648 rule_config,
649 trace,
650 rule_ctx,
651 debug_rules,
652 )),
653 },
654 LogicalPlan::Limit {
655 limit,
656 offset,
657 input,
658 } => LogicalPlan::Limit {
659 limit,
660 offset,
661 input: Box::new(apply_rules_recursive(
662 input.as_ref(),
663 rules,
664 rule_config,
665 trace,
666 rule_ctx,
667 debug_rules,
668 )),
669 },
670 LogicalPlan::Derived {
671 input,
672 alias,
673 column_aliases,
674 } => LogicalPlan::Derived {
675 input: Box::new(apply_rules_recursive(
676 input.as_ref(),
677 rules,
678 rule_config,
679 trace,
680 rule_ctx,
681 debug_rules,
682 )),
683 alias,
684 column_aliases,
685 },
686 other => other,
687 };
688 let mut final_plan = rewritten.clone();
689 for rule in rules.iter() {
690 if !rule_config.is_enabled(rule.name()) {
691 continue;
692 }
693 let alternatives = rule.apply(&final_plan, rule_ctx);
694 if !alternatives.is_empty() {
695 final_plan = alternatives.last().cloned().unwrap_or(final_plan);
696 }
697 }
698 final_plan
699}
700
701fn apply_rules_fixpoint(
702 plan: &LogicalPlan,
703 rules: &RuleSet,
704 rule_config: &RuleConfig,
705 trace: &mut OptimizerTrace,
706 rule_ctx: &mut RuleContext,
707 debug_rules: bool,
708) -> LogicalPlan {
709 const MAX_RULE_PASSES: usize = 8;
710 let mut current = plan.clone();
711 for _ in 0..MAX_RULE_PASSES {
712 let before = logical_plan_fingerprint(¤t);
713 let next =
714 apply_rules_recursive(¤t, rules, rule_config, trace, rule_ctx, debug_rules);
715 let after = logical_plan_fingerprint(&next);
716 if before == after {
717 return current;
718 }
719 current = next;
720 }
721 current
722}
723
724fn logical_plan_fingerprint(plan: &LogicalPlan) -> String {
725 plan.explain(0)
726}
727
728fn logical_to_physical(logical: &LogicalPlan) -> PhysicalPlan {
729 let children = match logical {
730 LogicalPlan::Scan { .. } => Vec::new(),
731 LogicalPlan::IndexScan { .. } => Vec::new(),
732 LogicalPlan::Dml { .. } => Vec::new(),
733 LogicalPlan::Derived { input, .. } => vec![logical_to_physical(input)],
734 LogicalPlan::Filter { input, .. } => vec![logical_to_physical(input)],
735 LogicalPlan::Projection { input, .. } => vec![logical_to_physical(input)],
736 LogicalPlan::Join { left, right, .. } => {
737 vec![logical_to_physical(left), logical_to_physical(right)]
738 }
739 LogicalPlan::Aggregate { input, .. } => vec![logical_to_physical(input)],
740 LogicalPlan::Distinct { input } => vec![logical_to_physical(input)],
741 LogicalPlan::TopN { input, .. } => vec![logical_to_physical(input)],
742 LogicalPlan::Sort { input, .. } => vec![logical_to_physical(input)],
743 LogicalPlan::Limit { input, .. } => vec![logical_to_physical(input)],
744 };
745 let rules = crate::physical_rules::PhysicalRuleSet::default();
746 let candidates = rules.apply_all(logical, &children);
747 let cost_model = UnitCostModel;
748 candidates
749 .into_iter()
750 .min_by(|left, right| {
751 cost_model
752 .cost(left)
753 .0
754 .partial_cmp(&cost_model.cost(right).0)
755 .unwrap()
756 })
757 .unwrap_or(PhysicalPlan::TableScan {
758 table: "unknown".to_string(),
759 })
760}