Skip to main content

chryso_optimizer/
lib.rs

1use crate::cost::{CostModel, UnitCostModel};
2use crate::memo::Memo;
3use crate::rules::{RuleContext, RuleSet};
4use chryso_metadata::StatsCache;
5use chryso_planner::{LogicalPlan, PhysicalPlan};
6use std::collections::HashSet;
7
8pub mod column_prune;
9pub mod cost;
10pub mod cost_profile;
11pub mod enforcer;
12pub mod estimation;
13pub mod expr_rewrite;
14pub mod join_order;
15pub mod memo;
16pub mod physical_rules;
17pub mod plan_fingerprint;
18pub mod properties;
19pub mod rules;
20pub mod stats_collect;
21pub mod subquery;
22pub mod utils;
23
24pub use cost::CostModelConfig;
25pub use cost_profile::CostProfile;
26pub use plan_fingerprint::{logical_fingerprint, physical_fingerprint};
27
/// Diagnostics gathered while optimizing a single plan.
#[derive(Debug, Default)]
pub struct OptimizerTrace {
    /// Names of rules that produced at least one alternative; only filled in
    /// when `OptimizerConfig::debug_rules` is set.
    pub applied_rules: Vec<String>,
    /// Tables whose statistics were requested from the stats provider.
    pub stats_loaded: Vec<String>,
    /// Literal conflicts drained from the rule context after rewriting.
    pub conflicting_literals: Vec<(String, String)>,
    /// Non-fatal problems, e.g. an invalid cost configuration that was replaced
    /// by the defaults.
    pub warnings: Vec<String>,
}

impl OptimizerTrace {
    /// Creates an empty trace; equivalent to `OptimizerTrace::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
46
/// Snapshot of the memo: one entry per group, each with its costed candidates.
#[derive(Debug, Clone)]
pub struct MemoTrace {
    pub groups: Vec<MemoTraceGroup>,
}

/// A single memo group and the physical candidates recorded for it.
#[derive(Debug, Clone)]
pub struct MemoTraceGroup {
    pub id: usize,
    pub candidates: Vec<MemoTraceCandidate>,
}

/// One physical candidate: its cost and a (possibly multi-line) plan rendering.
#[derive(Debug, Clone)]
pub struct MemoTraceCandidate {
    pub cost: f64,
    pub plan: String,
}

impl MemoTrace {
    /// Renders every candidate of every group.
    pub fn format_full(&self) -> String {
        format_memo_trace(self, false)
    }

    /// Renders only the first candidate of every group.
    ///
    /// NOTE(review): this presumes candidates are stored best-first; confirm
    /// against the code that builds the trace.
    pub fn format_best_only(&self) -> String {
        format_memo_trace(self, true)
    }
}

/// Renders `trace` as text: a `group=… candidates=…` header per group followed
/// by its candidates (all of them, or just the first when `best_only` is set).
fn format_memo_trace(trace: &MemoTrace, best_only: bool) -> String {
    use std::fmt::Write;

    let mut rendered = String::new();
    for MemoTraceGroup { id, candidates } in &trace.groups {
        // Writing into a `String` cannot fail, so the result is discarded.
        let _ = writeln!(rendered, "group={} candidates={}", id, candidates.len());
        let shown = if best_only { 1 } else { candidates.len() };
        candidates
            .iter()
            .take(shown)
            .for_each(|candidate| write_candidate(&mut rendered, candidate));
    }
    rendered
}

/// Appends one candidate to `output`: a two-space indented cost line (three
/// decimals) and then every plan line indented by four spaces.
fn write_candidate(output: &mut String, candidate: &MemoTraceCandidate) {
    use std::fmt::Write;

    let _ = writeln!(output, "  cost={:.3}", candidate.cost);
    for plan_line in candidate.plan.lines() {
        output.push_str("    ");
        output.push_str(plan_line);
        output.push('\n');
    }
}
106
107pub struct OptimizerConfig {
108    pub enable_cascades: bool,
109    pub enable_properties: bool,
110    pub rules: RuleSet,
111    pub rule_config: RuleConfig,
112    pub search_budget: SearchBudget,
113    pub trace: bool,
114    pub debug_rules: bool,
115    pub stats_provider: Option<std::sync::Arc<dyn chryso_metadata::StatsProvider>>,
116    pub cost_config: Option<CostModelConfig>,
117    pub system_params: Option<std::sync::Arc<chryso_core::system_params::SystemParamRegistry>>,
118    pub tenant_id: Option<String>,
119}
120
121impl std::fmt::Debug for OptimizerConfig {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        f.debug_struct("OptimizerConfig")
124            .field("enable_cascades", &self.enable_cascades)
125            .field("enable_properties", &self.enable_properties)
126            .field("rules", &self.rules)
127            .field("rule_config", &self.rule_config)
128            .field("search_budget", &self.search_budget)
129            .field("trace", &self.trace)
130            .field("debug_rules", &self.debug_rules)
131            .field("stats_provider", &self.stats_provider.is_some())
132            .field("cost_config", &self.cost_config.is_some())
133            .field("system_params", &self.system_params.is_some())
134            .field("tenant_id", &self.tenant_id)
135            .finish()
136    }
137}
138
/// Per-rule switches layered on top of a rule set.
///
/// The default has no allowlist and an empty denylist, so every rule is
/// enabled.
#[derive(Clone, Debug, Default)]
pub struct RuleConfig {
    /// Optional allowlist. When present it is the only thing consulted:
    /// a rule is enabled iff it is listed here.
    pub enabled_rules: Option<HashSet<String>>,
    /// Denylist, consulted only when no allowlist is configured.
    pub disabled_rules: HashSet<String>,
}

impl RuleConfig {
    /// Returns whether the rule called `name` may run.
    ///
    /// An allowlist, if set, takes precedence over `disabled_rules`; a rule
    /// that appears in both is therefore enabled.
    pub fn is_enabled(&self, name: &str) -> bool {
        match &self.enabled_rules {
            Some(allowlist) => allowlist.contains(name),
            None => !self.disabled_rules.contains(name),
        }
    }
}
162
/// Optional limits handed to memo exploration; `None` leaves a limit unset.
#[derive(Clone, Debug, Default)]
pub struct SearchBudget {
    /// NOTE(review): presumably caps the number of memo groups; the limit is
    /// enforced by the memo, not in this file.
    pub max_groups: Option<usize>,
    /// NOTE(review): presumably caps the number of rewrites during
    /// exploration; confirm in the memo implementation.
    pub max_rewrites: Option<usize>,
}
168
#[cfg(test)]
mod tests {
    use super::cost::UnitCostModel;
    use super::{CascadesOptimizer, OptimizerConfig};
    use crate::CostModelConfig;
    use chryso_core::ast::{BinaryOperator, Expr, Literal};
    use chryso_metadata::{StatsCache, StatsSnapshot, type_inference::SimpleTypeInferencer};
    use chryso_parser::{Dialect, ParserConfig, SimpleParser, SqlParser};
    use chryso_planner::{LogicalPlan, PhysicalPlan, PlanBuilder};
    use serde_json;
    use std::collections::HashSet;

    /// Parses `sql` with the Postgres dialect and builds its logical plan.
    fn logical_plan_for(sql: &str) -> LogicalPlan {
        let parser = SimpleParser::new(ParserConfig {
            dialect: Dialect::Postgres,
        });
        let statement = parser.parse(sql).expect("parse");
        PlanBuilder::build(statement).expect("plan")
    }

    /// Optimizer built from the default configuration.
    fn default_optimizer() -> CascadesOptimizer {
        CascadesOptimizer::new(OptimizerConfig::default())
    }

    /// `Filter(true)` on top of a scan of table `t`.
    fn true_filter_over_scan() -> LogicalPlan {
        LogicalPlan::Filter {
            predicate: Expr::Literal(Literal::Bool(true)),
            input: Box::new(LogicalPlan::Scan {
                table: "t".to_string(),
            }),
        }
    }

    /// The predicate `column = literal`.
    fn column_equals(column: &str, literal: Literal) -> Expr {
        Expr::BinaryOp {
            left: Box::new(Expr::Identifier(column.to_string())),
            op: BinaryOperator::Eq,
            right: Box::new(Expr::Literal(literal)),
        }
    }

    #[test]
    fn explain_with_types_and_costs() {
        let logical = logical_plan_for("select sum(amount) from sales group by region");
        let typed = logical.explain_typed(0, &SimpleTypeInferencer);
        assert!(typed.contains("LogicalAggregate"));

        let mut stats = StatsCache::new();
        let physical = default_optimizer().optimize(&logical, &mut stats);
        let costed = physical.explain_costed(0, &UnitCostModel);
        assert!(costed.contains("cost="));
    }

    #[test]
    fn optimizer_respects_disabled_rule() {
        let logical = true_filter_over_scan();
        let mut config = OptimizerConfig::default();
        config
            .rule_config
            .disabled_rules
            .insert("remove_true_filter".to_string());

        let mut stats = StatsCache::new();
        let plan = CascadesOptimizer::new(config).optimize(&logical, &mut stats);
        // With the rule disabled the trivially-true filter must survive.
        assert!(matches!(plan, PhysicalPlan::Filter { .. }));
    }

    #[test]
    fn optimizer_respects_enabled_rule_allowlist() {
        let logical = true_filter_over_scan();
        let mut config = OptimizerConfig::default();
        config.rule_config.enabled_rules =
            Some(HashSet::from_iter([String::from("remove_true_filter")]));

        let mut stats = StatsCache::new();
        let plan = CascadesOptimizer::new(config).optimize(&logical, &mut stats);
        // The allowlisted rule removes the filter, leaving only the scan.
        assert!(matches!(plan, PhysicalPlan::TableScan { .. }));
    }

    #[test]
    fn optimize_with_memo_trace_collects_candidates() {
        let logical = logical_plan_for("select * from t");
        let mut stats = StatsCache::new();
        let (physical, trace) = default_optimizer().optimize_with_memo_trace(&logical, &mut stats);
        assert!(matches!(physical, PhysicalPlan::TableScan { .. }));
        assert!(!trace.groups.is_empty());
        let first = &trace.groups[0];
        assert!(!first.candidates.is_empty());
        let formatted = trace.format_best_only();
        assert!(formatted.contains("group="));
        assert!(formatted.contains("cost="));
    }

    #[test]
    fn rules_reach_fixpoint_for_projection_limit_topn() {
        let logical = logical_plan_for("select id from t order by id limit 5");
        let mut stats = StatsCache::new();
        let (physical, _) = default_optimizer().optimize_with_trace(&logical, &mut stats);
        assert!(!matches!(physical, PhysicalPlan::Sort { .. }));
    }

    #[test]
    fn trace_records_literal_conflicts() {
        // `a = 1 AND a = 2` can never hold; the optimizer should report the pair.
        let logical = LogicalPlan::Filter {
            predicate: Expr::BinaryOp {
                left: Box::new(column_equals("a", Literal::Number(1.0))),
                op: BinaryOperator::And,
                right: Box::new(column_equals("a", Literal::Number(2.0))),
            },
            input: Box::new(LogicalPlan::Scan {
                table: "t".to_string(),
            }),
        };
        let mut stats = StatsCache::new();
        let (_, trace) = default_optimizer().optimize_with_trace(&logical, &mut stats);
        assert!(
            trace
                .conflicting_literals
                .iter()
                .any(|(left, right)| left.contains("a = 1") && right.contains("a = 2")),
            "expected conflict pair, got {:?}",
            trace.conflicting_literals
        );
    }

    #[test]
    fn optimizer_uses_stats_snapshot() {
        let snapshot_json = include_str!("../tests/testdata/tpch_scale1.json");
        let snapshot: StatsSnapshot = serde_json::from_str(snapshot_json).expect("snapshot");
        let mut stats = snapshot.to_cache();
        let logical = logical_plan_for("select * from orders");
        let (physical, _) = default_optimizer().optimize_with_trace(&logical, &mut stats);
        assert!(matches!(physical, PhysicalPlan::TableScan { .. }));
    }

    #[test]
    fn optimizer_uses_system_param_override() {
        let registry = chryso_core::system_params::SystemParamRegistry::new();
        registry.set_default_param(
            CostModelConfig::PARAM_FILTER,
            chryso_core::system_params::SystemParamValue::Float(9.0),
        );
        let config = OptimizerConfig {
            system_params: Some(std::sync::Arc::new(registry)),
            ..OptimizerConfig::default()
        };
        let updated = CostModelConfig::default().apply_system_params(
            config.system_params.as_ref().unwrap(),
            config.tenant_id.as_deref(),
        );
        assert_eq!(updated.filter, 9.0);
    }
}
327
328impl Default for OptimizerConfig {
329    fn default() -> Self {
330        Self {
331            enable_cascades: true,
332            enable_properties: true,
333            rules: RuleSet::default(),
334            rule_config: RuleConfig::default(),
335            search_budget: SearchBudget::default(),
336            trace: false,
337            debug_rules: false,
338            stats_provider: None,
339            cost_config: None,
340            system_params: None,
341            tenant_id: None,
342        }
343    }
344}
345
346pub struct CascadesOptimizer {
347    config: OptimizerConfig,
348}
349
350impl CascadesOptimizer {
351    pub fn new(config: OptimizerConfig) -> Self {
352        Self { config }
353    }
354
355    pub fn optimize(&self, logical: &LogicalPlan, stats: &mut StatsCache) -> PhysicalPlan {
356        let _ = ensure_stats(logical, stats, &self.config);
357        let logical = crate::expr_rewrite::rewrite_plan(logical);
358        let logical = crate::column_prune::prune_plan(&logical);
359        if self.config.enable_cascades {
360            optimize_with_cascades(&logical, &self.config, stats).0
361        } else {
362            logical_to_physical(&logical)
363        }
364    }
365
366    pub fn optimize_with_trace(
367        &self,
368        logical: &LogicalPlan,
369        stats: &mut StatsCache,
370    ) -> (PhysicalPlan, OptimizerTrace) {
371        let loaded = ensure_stats(logical, stats, &self.config).unwrap_or_default();
372        let logical = crate::expr_rewrite::rewrite_plan(logical);
373        let logical = crate::column_prune::prune_plan(&logical);
374        if self.config.enable_cascades {
375            let (plan, mut trace) = optimize_with_cascades(&logical, &self.config, stats);
376            trace.stats_loaded = loaded;
377            (plan, trace)
378        } else {
379            let mut trace = OptimizerTrace::new();
380            trace.stats_loaded = loaded;
381            (logical_to_physical(&logical), trace)
382        }
383    }
384
385    pub fn optimize_with_memo_trace(
386        &self,
387        logical: &LogicalPlan,
388        stats: &mut StatsCache,
389    ) -> (PhysicalPlan, MemoTrace) {
390        let _ = ensure_stats(logical, stats, &self.config);
391        let logical = crate::expr_rewrite::rewrite_plan(logical);
392        let logical = crate::column_prune::prune_plan(&logical);
393        if self.config.enable_cascades {
394            optimize_with_cascades_memo(&logical, &self.config, stats)
395        } else {
396            let physical = logical_to_physical(&logical);
397            (physical, MemoTrace { groups: Vec::new() })
398        }
399    }
400}
401
402fn optimize_with_cascades(
403    logical: &LogicalPlan,
404    config: &OptimizerConfig,
405    _stats: &StatsCache,
406) -> (PhysicalPlan, OptimizerTrace) {
407    let mut trace = OptimizerTrace::new();
408    // Keep rule side effects (e.g., literal conflicts) explicit and thread-safe.
409    let mut rule_ctx = RuleContext::default();
410    let logical = apply_rules_fixpoint(
411        logical,
412        &config.rules,
413        &config.rule_config,
414        &mut trace,
415        &mut rule_ctx,
416        config.debug_rules,
417    );
418    trace
419        .conflicting_literals
420        .extend(rule_ctx.take_literal_conflicts());
421    let logical = crate::subquery::rewrite_correlated_subqueries(&logical);
422    let logical = crate::expr_rewrite::rewrite_plan(&logical);
423    let candidates = crate::join_order::enumerate_join_orders(&logical, _stats);
424    let mut memo = Memo::new();
425    let root = memo.insert(candidates.first().unwrap_or(&logical));
426    memo.explore(&config.rules, &config.rule_config, &config.search_budget);
427    let cost_model = build_cost_model(_stats, config, Some(&mut trace));
428    let physical_rules = crate::physical_rules::PhysicalRuleSet::default();
429    let mut best = memo
430        .best_physical(root, &physical_rules, cost_model.as_ref())
431        .unwrap_or_else(|| logical_to_physical(&logical));
432    if config.enable_properties {
433        let required = crate::properties::PhysicalProperties::default();
434        best = crate::enforcer::enforce(best, &required);
435    }
436    (best, trace)
437}
438
439fn optimize_with_cascades_memo(
440    logical: &LogicalPlan,
441    config: &OptimizerConfig,
442    _stats: &StatsCache,
443) -> (PhysicalPlan, MemoTrace) {
444    let mut rule_ctx = RuleContext::default();
445    let logical = apply_rules_fixpoint(
446        logical,
447        &config.rules,
448        &config.rule_config,
449        &mut OptimizerTrace::new(),
450        &mut rule_ctx,
451        config.debug_rules,
452    );
453    let logical = crate::subquery::rewrite_correlated_subqueries(&logical);
454    let logical = crate::expr_rewrite::rewrite_plan(&logical);
455    let candidates = crate::join_order::enumerate_join_orders(&logical, _stats);
456    let mut memo = Memo::new();
457    let root = memo.insert(candidates.first().unwrap_or(&logical));
458    memo.explore(&config.rules, &config.rule_config, &config.search_budget);
459    let cost_model = build_cost_model(_stats, config, None);
460    let physical_rules = crate::physical_rules::PhysicalRuleSet::default();
461    let trace = memo.trace(&physical_rules, cost_model.as_ref());
462    let mut best = memo
463        .best_physical(root, &physical_rules, cost_model.as_ref())
464        .unwrap_or_else(|| logical_to_physical(&logical));
465    if config.enable_properties {
466        let required = crate::properties::PhysicalProperties::default();
467        best = crate::enforcer::enforce(best, &required);
468    }
469    (best, trace)
470}
471
472fn build_cost_model<'a>(
473    stats: &'a StatsCache,
474    config: &'a OptimizerConfig,
475    trace: Option<&mut OptimizerTrace>,
476) -> Box<dyn CostModel + 'a> {
477    let base = config.cost_config.clone().unwrap_or_default();
478    let model_config = match &config.system_params {
479        Some(registry) => {
480            let tenant = config.tenant_id.as_deref();
481            base.apply_system_params(registry, tenant)
482        }
483        None => base,
484    };
485    let model_config = if model_config.validate().is_ok() {
486        model_config
487    } else {
488        if let Some(trace) = trace {
489            trace
490                .warnings
491                .push("invalid cost config detected; falling back to defaults".to_string());
492        }
493        CostModelConfig::default()
494    };
495    if stats.is_empty() {
496        Box::new(cost::UnitCostModelWithConfig::new(model_config))
497    } else {
498        Box::new(cost::StatsCostModel::with_config(stats, model_config))
499    }
500}
501
502fn ensure_stats(
503    logical: &LogicalPlan,
504    stats: &mut StatsCache,
505    config: &OptimizerConfig,
506) -> chryso_core::ChrysoResult<Vec<String>> {
507    let Some(provider) = &config.stats_provider else {
508        return Ok(Vec::new());
509    };
510    let requirements = crate::stats_collect::collect_requirements(logical);
511    let mut missing_tables = Vec::new();
512    for table in &requirements.tables {
513        if stats.table_stats(table).is_none() {
514            missing_tables.push(table.clone());
515        }
516    }
517    let mut missing_columns = Vec::new();
518    for (table, column) in &requirements.columns {
519        if stats.column_stats(table, column).is_none() {
520            missing_columns.push((table.clone(), column.clone()));
521        }
522    }
523    if missing_tables.is_empty() && missing_columns.is_empty() {
524        return Ok(Vec::new());
525    }
526    provider.load_stats(&missing_tables, &missing_columns, stats)?;
527    Ok(missing_tables)
528}
529
530fn apply_rules_recursive(
531    plan: &LogicalPlan,
532    rules: &RuleSet,
533    rule_config: &RuleConfig,
534    trace: &mut OptimizerTrace,
535    rule_ctx: &mut RuleContext,
536    debug_rules: bool,
537) -> LogicalPlan {
538    let mut rewritten = plan.clone();
539    let mut matched = Vec::new();
540    for rule in rules.iter() {
541        if !rule_config.is_enabled(rule.name()) {
542            continue;
543        }
544        let alternatives = rule.apply(&rewritten, rule_ctx);
545        if !alternatives.is_empty() {
546            matched.push(rule.name().to_string());
547            rewritten = alternatives.last().cloned().unwrap_or(rewritten);
548        }
549    }
550    if debug_rules {
551        trace.applied_rules.extend(matched);
552    }
553    let rewritten = match rewritten {
554        LogicalPlan::Filter { predicate, input } => LogicalPlan::Filter {
555            predicate,
556            input: Box::new(apply_rules_recursive(
557                input.as_ref(),
558                rules,
559                rule_config,
560                trace,
561                rule_ctx,
562                debug_rules,
563            )),
564        },
565        LogicalPlan::Projection { exprs, input } => LogicalPlan::Projection {
566            exprs,
567            input: Box::new(apply_rules_recursive(
568                input.as_ref(),
569                rules,
570                rule_config,
571                trace,
572                rule_ctx,
573                debug_rules,
574            )),
575        },
576        LogicalPlan::Join {
577            join_type,
578            left,
579            right,
580            on,
581        } => LogicalPlan::Join {
582            join_type,
583            left: Box::new(apply_rules_recursive(
584                left.as_ref(),
585                rules,
586                rule_config,
587                trace,
588                rule_ctx,
589                debug_rules,
590            )),
591            right: Box::new(apply_rules_recursive(
592                right.as_ref(),
593                rules,
594                rule_config,
595                trace,
596                rule_ctx,
597                debug_rules,
598            )),
599            on,
600        },
601        LogicalPlan::Aggregate {
602            group_exprs,
603            aggr_exprs,
604            input,
605        } => LogicalPlan::Aggregate {
606            group_exprs,
607            aggr_exprs,
608            input: Box::new(apply_rules_recursive(
609                input.as_ref(),
610                rules,
611                rule_config,
612                trace,
613                rule_ctx,
614                debug_rules,
615            )),
616        },
617        LogicalPlan::Distinct { input } => LogicalPlan::Distinct {
618            input: Box::new(apply_rules_recursive(
619                input.as_ref(),
620                rules,
621                rule_config,
622                trace,
623                rule_ctx,
624                debug_rules,
625            )),
626        },
627        LogicalPlan::TopN {
628            order_by,
629            limit,
630            input,
631        } => LogicalPlan::TopN {
632            order_by,
633            limit,
634            input: Box::new(apply_rules_recursive(
635                input.as_ref(),
636                rules,
637                rule_config,
638                trace,
639                rule_ctx,
640                debug_rules,
641            )),
642        },
643        LogicalPlan::Sort { order_by, input } => LogicalPlan::Sort {
644            order_by,
645            input: Box::new(apply_rules_recursive(
646                input.as_ref(),
647                rules,
648                rule_config,
649                trace,
650                rule_ctx,
651                debug_rules,
652            )),
653        },
654        LogicalPlan::Limit {
655            limit,
656            offset,
657            input,
658        } => LogicalPlan::Limit {
659            limit,
660            offset,
661            input: Box::new(apply_rules_recursive(
662                input.as_ref(),
663                rules,
664                rule_config,
665                trace,
666                rule_ctx,
667                debug_rules,
668            )),
669        },
670        LogicalPlan::Derived {
671            input,
672            alias,
673            column_aliases,
674        } => LogicalPlan::Derived {
675            input: Box::new(apply_rules_recursive(
676                input.as_ref(),
677                rules,
678                rule_config,
679                trace,
680                rule_ctx,
681                debug_rules,
682            )),
683            alias,
684            column_aliases,
685        },
686        other => other,
687    };
688    let mut final_plan = rewritten.clone();
689    for rule in rules.iter() {
690        if !rule_config.is_enabled(rule.name()) {
691            continue;
692        }
693        let alternatives = rule.apply(&final_plan, rule_ctx);
694        if !alternatives.is_empty() {
695            final_plan = alternatives.last().cloned().unwrap_or(final_plan);
696        }
697    }
698    final_plan
699}
700
701fn apply_rules_fixpoint(
702    plan: &LogicalPlan,
703    rules: &RuleSet,
704    rule_config: &RuleConfig,
705    trace: &mut OptimizerTrace,
706    rule_ctx: &mut RuleContext,
707    debug_rules: bool,
708) -> LogicalPlan {
709    const MAX_RULE_PASSES: usize = 8;
710    let mut current = plan.clone();
711    for _ in 0..MAX_RULE_PASSES {
712        let before = logical_plan_fingerprint(&current);
713        let next =
714            apply_rules_recursive(&current, rules, rule_config, trace, rule_ctx, debug_rules);
715        let after = logical_plan_fingerprint(&next);
716        if before == after {
717            return current;
718        }
719        current = next;
720    }
721    current
722}
723
724fn logical_plan_fingerprint(plan: &LogicalPlan) -> String {
725    plan.explain(0)
726}
727
728fn logical_to_physical(logical: &LogicalPlan) -> PhysicalPlan {
729    let children = match logical {
730        LogicalPlan::Scan { .. } => Vec::new(),
731        LogicalPlan::IndexScan { .. } => Vec::new(),
732        LogicalPlan::Dml { .. } => Vec::new(),
733        LogicalPlan::Derived { input, .. } => vec![logical_to_physical(input)],
734        LogicalPlan::Filter { input, .. } => vec![logical_to_physical(input)],
735        LogicalPlan::Projection { input, .. } => vec![logical_to_physical(input)],
736        LogicalPlan::Join { left, right, .. } => {
737            vec![logical_to_physical(left), logical_to_physical(right)]
738        }
739        LogicalPlan::Aggregate { input, .. } => vec![logical_to_physical(input)],
740        LogicalPlan::Distinct { input } => vec![logical_to_physical(input)],
741        LogicalPlan::TopN { input, .. } => vec![logical_to_physical(input)],
742        LogicalPlan::Sort { input, .. } => vec![logical_to_physical(input)],
743        LogicalPlan::Limit { input, .. } => vec![logical_to_physical(input)],
744    };
745    let rules = crate::physical_rules::PhysicalRuleSet::default();
746    let candidates = rules.apply_all(logical, &children);
747    let cost_model = UnitCostModel;
748    candidates
749        .into_iter()
750        .min_by(|left, right| {
751            cost_model
752                .cost(left)
753                .0
754                .partial_cmp(&cost_model.cost(right).0)
755                .unwrap()
756        })
757        .unwrap_or(PhysicalPlan::TableScan {
758            table: "unknown".to_string(),
759        })
760}