Skip to main content

reddb_server/storage/query/planner/
cost.rs

1//! Cost Estimation
2//!
3//! Cost-based query plan selection with cardinality estimation.
4//!
5//! # Cost Model
6//!
7//! - **CPU cost**: Computation overhead
8//! - **IO cost**: Disk/memory access
9//! - **Network cost**: For distributed queries
10//! - **Memory cost**: Working memory required
11
12use std::sync::Arc;
13
14use super::stats_provider::{NullProvider, StatsProvider};
15use crate::storage::query::ast::{
16    CompareOp, FieldRef, Filter as AstFilter, GraphQuery, HybridQuery, JoinQuery, JoinType,
17    PathQuery, QueryExpr, TableQuery, VectorQuery,
18};
19use crate::storage::schema::Value;
20
21/// Cardinality estimate for a query result
22#[derive(Debug, Clone, Default)]
23pub struct CardinalityEstimate {
24    /// Estimated row/record count
25    pub rows: f64,
26    /// Selectivity factor (0.0 - 1.0)
27    pub selectivity: f64,
28    /// Confidence in the estimate (0.0 - 1.0)
29    pub confidence: f64,
30}
31
32impl CardinalityEstimate {
33    /// Create a new cardinality estimate
34    pub fn new(rows: f64, selectivity: f64) -> Self {
35        Self {
36            rows,
37            selectivity,
38            confidence: 1.0,
39        }
40    }
41
42    /// Full table scan estimate
43    pub fn full_scan(table_size: f64) -> Self {
44        Self {
45            rows: table_size,
46            selectivity: 1.0,
47            confidence: 1.0,
48        }
49    }
50
51    /// Apply a filter to reduce cardinality
52    pub fn with_filter(mut self, filter_selectivity: f64) -> Self {
53        self.rows *= filter_selectivity;
54        self.selectivity *= filter_selectivity;
55        self.confidence *= 0.9; // Reduce confidence with each estimate
56        self
57    }
58}
59
60/// Cost of executing a query plan.
61///
62/// Mirrors PostgreSQL's `Cost` split: `startup_cost` is the work needed
63/// before the **first** row can be produced, `total` is the work to
64/// produce the **last** row. Both are reported so plan selection can
65/// pick a low-startup plan when a small `LIMIT` is in scope, even if
66/// total work is higher.
67///
68/// See `src/storage/query/planner/README.md` § Invariant 1.
69#[derive(Debug, Clone, Default)]
70pub struct PlanCost {
71    /// CPU computation cost
72    pub cpu: f64,
73    /// IO access cost
74    pub io: f64,
75    /// Network transfer cost (for distributed)
76    pub network: f64,
77    /// Memory requirement
78    pub memory: f64,
79    /// Cost to produce the **first** row.
80    ///
81    /// Zero for streaming operators (full scan, index scan, filter over
82    /// scan). Equal to `total` for blocking operators (sort, hash join
83    /// build side, materialize).
84    pub startup_cost: f64,
85    /// Cost to produce the **last** row.
86    pub total: f64,
87}
88
89impl PlanCost {
90    /// Create a new cost estimate with `startup_cost = 0` (streaming).
91    pub fn new(cpu: f64, io: f64, memory: f64) -> Self {
92        let total = cpu + io * 10.0 + memory * 0.1; // IO is expensive
93        Self {
94            cpu,
95            io,
96            network: 0.0,
97            memory,
98            startup_cost: 0.0,
99            total,
100        }
101    }
102
103    /// Create a cost with an explicit `startup_cost`. Use for blocking
104    /// operators (sort, hash build) and for index point lookups whose
105    /// first-row cost is non-zero.
106    pub fn with_startup(cpu: f64, io: f64, memory: f64, startup_cost: f64) -> Self {
107        let total = cpu + io * 10.0 + memory * 0.1;
108        Self {
109            cpu,
110            io,
111            network: 0.0,
112            memory,
113            startup_cost: startup_cost.max(0.0),
114            total: total.max(startup_cost),
115        }
116    }
117
118    /// Compose two costs in a **pipelined** fashion: the second operator
119    /// consumes the first as a stream.
120    ///
121    /// Both `startup_cost` and `total` add together. Use for filter
122    /// over scan, projection over filter, etc.
123    pub fn combine_pipelined(&self, other: &PlanCost) -> PlanCost {
124        PlanCost {
125            cpu: self.cpu + other.cpu,
126            io: self.io + other.io,
127            network: self.network + other.network,
128            memory: self.memory.max(other.memory),
129            startup_cost: self.startup_cost + other.startup_cost,
130            total: self.total + other.total,
131        }
132    }
133
134    /// Compose two costs where `self` must be **fully consumed** before
135    /// `blocker` can produce its first row.
136    ///
137    /// `self.total` flows into `blocker.startup_cost`. Use for sort,
138    /// hash build, materialise — anything that has to drain its input
139    /// before emitting.
140    pub fn combine_blocking(&self, blocker: &PlanCost) -> PlanCost {
141        PlanCost {
142            cpu: self.cpu + blocker.cpu,
143            io: self.io + blocker.io,
144            network: self.network + blocker.network,
145            memory: self.memory.max(blocker.memory),
146            startup_cost: self.total + blocker.startup_cost,
147            total: self.total + blocker.total,
148        }
149    }
150
151    /// Backwards-compatible alias for [`combine_pipelined`].
152    ///
153    /// New code should prefer `combine_pipelined` / `combine_blocking`
154    /// explicitly. This is kept so existing callers compile unchanged.
155    pub fn combine(&self, other: &PlanCost) -> PlanCost {
156        self.combine_pipelined(other)
157    }
158
159    /// Scale cost by a factor (cardinality multiplier, etc.).
160    pub fn scale(&self, factor: f64) -> PlanCost {
161        PlanCost {
162            cpu: self.cpu * factor,
163            io: self.io * factor,
164            network: self.network * factor,
165            memory: self.memory,             // Memory doesn't scale linearly
166            startup_cost: self.startup_cost, // startup is per-plan, not per-row
167            total: self.total * factor,
168        }
169    }
170
171    /// Plan-comparison helper. Picks `Less` when `self` should be
172    /// preferred over `other`.
173    ///
174    /// When `limit` is `Some(k)` and `k < 0.1 * cardinality`, the
175    /// comparison switches from `total` to `startup_cost` — the client
176    /// will only consume a small slice of the result, so we want the
177    /// plan that produces the first rows fastest even if the full scan
178    /// would be more expensive.
179    ///
180    /// This mirrors PostgreSQL's `compare_path_costs_fuzzily` logic for
181    /// `STARTUP` vs `TOTAL` cost ordering.
182    pub fn prefer_over(
183        &self,
184        other: &PlanCost,
185        limit: Option<u64>,
186        cardinality: f64,
187    ) -> std::cmp::Ordering {
188        let use_startup = matches!(limit, Some(k) if (k as f64) < 0.1 * cardinality.max(1.0));
189        let (lhs, rhs) = if use_startup {
190            (self.startup_cost, other.startup_cost)
191        } else {
192            (self.total, other.total)
193        };
194        lhs.partial_cmp(&rhs).unwrap_or(std::cmp::Ordering::Equal)
195    }
196}
197
198/// Statistics about a table or graph
199#[derive(Debug, Clone, Default)]
200pub struct TableStats {
201    /// Total row count
202    pub row_count: u64,
203    /// Average row size in bytes
204    pub avg_row_size: u32,
205    /// Number of pages
206    pub page_count: u64,
207    /// Column statistics
208    pub columns: Vec<ColumnStats>,
209}
210
211/// Statistics about a column
212#[derive(Debug, Clone, Default)]
213pub struct ColumnStats {
214    /// Column name
215    pub name: String,
216    /// Number of distinct values
217    pub distinct_count: u64,
218    /// Null count
219    pub null_count: u64,
220    /// Minimum value (if orderable)
221    pub min_value: Option<String>,
222    /// Maximum value (if orderable)
223    pub max_value: Option<String>,
224    /// Has index
225    pub has_index: bool,
226}
227
228/// Cost estimator for query plans
229pub struct CostEstimator {
230    /// Default table row count estimate
231    default_row_count: f64,
232    /// Cost per row scan
233    row_scan_cost: f64,
234    /// Cost per index lookup
235    index_lookup_cost: f64,
236    /// Cost per hash join probe
237    hash_probe_cost: f64,
238    /// Cost per nested loop iteration
239    nested_loop_cost: f64,
240    /// Cost per graph edge traversal
241    edge_traversal_cost: f64,
242    /// Optional stats provider. When present, `estimate_table_cardinality`
243    /// and the selectivity computation use real per-table / per-column
244    /// statistics instead of the heuristic constants. `None` preserves the
245    /// legacy behaviour so callers can adopt stats incrementally.
246    stats: Arc<dyn StatsProvider>,
247}
248
249impl CostEstimator {
250    /// Create a new cost estimator with default parameters and a
251    /// [`NullProvider`] — no real stats, pure heuristic mode.
252    pub fn new() -> Self {
253        Self {
254            default_row_count: 1000.0,
255            row_scan_cost: 1.0,
256            index_lookup_cost: 0.1,
257            hash_probe_cost: 0.5,
258            nested_loop_cost: 2.0,
259            edge_traversal_cost: 1.5,
260            stats: Arc::new(NullProvider),
261        }
262    }
263
264    /// Create a cost estimator that consults `provider` for real table /
265    /// column / index statistics. Any lookups the provider cannot satisfy
266    /// fall back to the heuristic path automatically.
267    pub fn with_stats(provider: Arc<dyn StatsProvider>) -> Self {
268        Self {
269            stats: provider,
270            ..Self::new()
271        }
272    }
273
274    /// Swap the stats provider on an existing estimator. Useful for tests
275    /// and for planners that build one `CostEstimator` and repoint it at
276    /// per-query snapshots.
277    pub fn set_stats(&mut self, provider: Arc<dyn StatsProvider>) {
278        self.stats = provider;
279    }
280
281    /// Estimate cost of a query expression
282    pub fn estimate(&self, query: &QueryExpr) -> PlanCost {
283        match query {
284            QueryExpr::Table(tq) => self.estimate_table(tq),
285            QueryExpr::Graph(gq) => self.estimate_graph(gq),
286            QueryExpr::Join(jq) => self.estimate_join(jq),
287            QueryExpr::Path(pq) => self.estimate_path(pq),
288            QueryExpr::Vector(vq) => self.estimate_vector(vq),
289            QueryExpr::Hybrid(hq) => self.estimate_hybrid(hq),
290            // DML/DDL statements have minimal query cost
291            QueryExpr::Insert(_)
292            | QueryExpr::Update(_)
293            | QueryExpr::Delete(_)
294            | QueryExpr::CreateTable(_)
295            | QueryExpr::CreateCollection(_)
296            | QueryExpr::CreateVector(_)
297            | QueryExpr::DropTable(_)
298            | QueryExpr::DropGraph(_)
299            | QueryExpr::DropVector(_)
300            | QueryExpr::DropDocument(_)
301            | QueryExpr::DropKv(_)
302            | QueryExpr::DropCollection(_)
303            | QueryExpr::Truncate(_)
304            | QueryExpr::AlterTable(_)
305            | QueryExpr::GraphCommand(_)
306            | QueryExpr::SearchCommand(_)
307            | QueryExpr::CreateIndex(_)
308            | QueryExpr::DropIndex(_)
309            | QueryExpr::ProbabilisticCommand(_)
310            | QueryExpr::Ask(_)
311            | QueryExpr::SetConfig { .. }
312            | QueryExpr::ShowConfig { .. }
313            | QueryExpr::SetSecret { .. }
314            | QueryExpr::DeleteSecret { .. }
315            | QueryExpr::ShowSecrets { .. }
316            | QueryExpr::SetTenant(_)
317            | QueryExpr::ShowTenant
318            | QueryExpr::CreateTimeSeries(_)
319            | QueryExpr::CreateMetric(_)
320            | QueryExpr::AlterMetric(_)
321            | QueryExpr::CreateSlo(_)
322            | QueryExpr::DropTimeSeries(_)
323            | QueryExpr::CreateQueue(_)
324            | QueryExpr::AlterQueue(_)
325            | QueryExpr::DropQueue(_)
326            | QueryExpr::QueueSelect(_)
327            | QueryExpr::QueueCommand(_)
328            | QueryExpr::KvCommand(_)
329            | QueryExpr::ConfigCommand(_)
330            | QueryExpr::CreateTree(_)
331            | QueryExpr::DropTree(_)
332            | QueryExpr::TreeCommand(_)
333            | QueryExpr::ExplainAlter(_)
334            | QueryExpr::TransactionControl(_)
335            | QueryExpr::MaintenanceCommand(_)
336            | QueryExpr::CreateSchema(_)
337            | QueryExpr::DropSchema(_)
338            | QueryExpr::CreateSequence(_)
339            | QueryExpr::DropSequence(_)
340            | QueryExpr::CopyFrom(_)
341            | QueryExpr::CreateView(_)
342            | QueryExpr::DropView(_)
343            | QueryExpr::RefreshMaterializedView(_)
344            | QueryExpr::CreatePolicy(_)
345            | QueryExpr::DropPolicy(_)
346            | QueryExpr::CreateServer(_)
347            | QueryExpr::DropServer(_)
348            | QueryExpr::CreateForeignTable(_)
349            | QueryExpr::DropForeignTable(_)
350            | QueryExpr::Grant(_)
351            | QueryExpr::Revoke(_)
352            | QueryExpr::AlterUser(_)
353            | QueryExpr::CreateIamPolicy { .. }
354            | QueryExpr::DropIamPolicy { .. }
355            | QueryExpr::AttachPolicy { .. }
356            | QueryExpr::DetachPolicy { .. }
357            | QueryExpr::ShowPolicies { .. }
358            | QueryExpr::ShowEffectivePermissions { .. }
359            | QueryExpr::SimulatePolicy { .. }
360            | QueryExpr::LintPolicy { .. }
361            | QueryExpr::MigratePolicyMode { .. }
362            | QueryExpr::CreateMigration(_)
363            | QueryExpr::ApplyMigration(_)
364            | QueryExpr::RollbackMigration(_)
365            | QueryExpr::ExplainMigration(_)
366            | QueryExpr::EventsBackfill(_)
367            | QueryExpr::EventsBackfillStatus { .. } => PlanCost::new(1.0, 1.0, 0.0),
368        }
369    }
370
371    /// Estimate cardinality of a query result
372    pub fn estimate_cardinality(&self, query: &QueryExpr) -> CardinalityEstimate {
373        match query {
374            QueryExpr::Table(tq) => self.estimate_table_cardinality(tq),
375            QueryExpr::Graph(gq) => self.estimate_graph_cardinality(gq),
376            QueryExpr::Join(jq) => self.estimate_join_cardinality(jq),
377            QueryExpr::Path(pq) => self.estimate_path_cardinality(pq),
378            QueryExpr::Vector(vq) => self.estimate_vector_cardinality(vq),
379            QueryExpr::Hybrid(hq) => self.estimate_hybrid_cardinality(hq),
380            // DML/DDL/Command statements return affected-row count or nothing
381            QueryExpr::Insert(_)
382            | QueryExpr::Update(_)
383            | QueryExpr::Delete(_)
384            | QueryExpr::CreateTable(_)
385            | QueryExpr::CreateCollection(_)
386            | QueryExpr::CreateVector(_)
387            | QueryExpr::DropTable(_)
388            | QueryExpr::DropGraph(_)
389            | QueryExpr::DropVector(_)
390            | QueryExpr::DropDocument(_)
391            | QueryExpr::DropKv(_)
392            | QueryExpr::DropCollection(_)
393            | QueryExpr::Truncate(_)
394            | QueryExpr::AlterTable(_)
395            | QueryExpr::GraphCommand(_)
396            | QueryExpr::SearchCommand(_)
397            | QueryExpr::CreateIndex(_)
398            | QueryExpr::DropIndex(_)
399            | QueryExpr::ProbabilisticCommand(_)
400            | QueryExpr::Ask(_)
401            | QueryExpr::SetConfig { .. }
402            | QueryExpr::ShowConfig { .. }
403            | QueryExpr::SetSecret { .. }
404            | QueryExpr::DeleteSecret { .. }
405            | QueryExpr::ShowSecrets { .. }
406            | QueryExpr::SetTenant(_)
407            | QueryExpr::ShowTenant
408            | QueryExpr::CreateTimeSeries(_)
409            | QueryExpr::CreateMetric(_)
410            | QueryExpr::AlterMetric(_)
411            | QueryExpr::CreateSlo(_)
412            | QueryExpr::DropTimeSeries(_)
413            | QueryExpr::CreateQueue(_)
414            | QueryExpr::AlterQueue(_)
415            | QueryExpr::DropQueue(_)
416            | QueryExpr::QueueSelect(_)
417            | QueryExpr::QueueCommand(_)
418            | QueryExpr::KvCommand(_)
419            | QueryExpr::ConfigCommand(_)
420            | QueryExpr::CreateTree(_)
421            | QueryExpr::DropTree(_)
422            | QueryExpr::TreeCommand(_)
423            | QueryExpr::ExplainAlter(_)
424            | QueryExpr::TransactionControl(_)
425            | QueryExpr::MaintenanceCommand(_)
426            | QueryExpr::CreateSchema(_)
427            | QueryExpr::DropSchema(_)
428            | QueryExpr::CreateSequence(_)
429            | QueryExpr::DropSequence(_)
430            | QueryExpr::CopyFrom(_)
431            | QueryExpr::CreateView(_)
432            | QueryExpr::DropView(_)
433            | QueryExpr::RefreshMaterializedView(_)
434            | QueryExpr::CreatePolicy(_)
435            | QueryExpr::DropPolicy(_)
436            | QueryExpr::CreateServer(_)
437            | QueryExpr::DropServer(_)
438            | QueryExpr::CreateForeignTable(_)
439            | QueryExpr::DropForeignTable(_)
440            | QueryExpr::Grant(_)
441            | QueryExpr::Revoke(_)
442            | QueryExpr::AlterUser(_)
443            | QueryExpr::CreateIamPolicy { .. }
444            | QueryExpr::DropIamPolicy { .. }
445            | QueryExpr::AttachPolicy { .. }
446            | QueryExpr::DetachPolicy { .. }
447            | QueryExpr::ShowPolicies { .. }
448            | QueryExpr::ShowEffectivePermissions { .. }
449            | QueryExpr::SimulatePolicy { .. }
450            | QueryExpr::LintPolicy { .. }
451            | QueryExpr::MigratePolicyMode { .. }
452            | QueryExpr::CreateMigration(_)
453            | QueryExpr::ApplyMigration(_)
454            | QueryExpr::RollbackMigration(_)
455            | QueryExpr::ExplainMigration(_)
456            | QueryExpr::EventsBackfill(_)
457            | QueryExpr::EventsBackfillStatus { .. } => CardinalityEstimate::new(1.0, 1.0),
458        }
459    }
460
461    // =========================================================================
462    // Table Query Estimation
463    // =========================================================================
464
465    fn estimate_table(&self, query: &TableQuery) -> PlanCost {
466        let cardinality = self.estimate_table_cardinality(query);
467
468        let cpu = cardinality.rows * self.row_scan_cost;
469
470        // I/O cost: use Mackert-Lohman when we have index stats and a filter
471        // column with a known index; otherwise fall back to the naive heuristic.
472        let io = self.estimate_table_io(query, cardinality.rows);
473
474        let memory = cardinality.rows * 100.0; // 100 bytes per row estimate
475
476        PlanCost::new(cpu, io, memory)
477    }
478
479    /// Compute the I/O page cost for a table scan.
480    ///
481    /// When the query has a simple equality or range filter on an indexed
482    /// column, use `IndexStats::correlated_io_cost` (Mackert-Lohman) which
483    /// accounts for `index_correlation` (0.0 = random I/O, 1.0 = sequential).
484    /// Falls back to the naive `rows / 100` heuristic otherwise.
485    fn estimate_table_io(&self, query: &TableQuery, result_rows: f64) -> f64 {
486        const ROWS_PER_PAGE: f64 = 100.0;
487
488        // Look up total heap pages from table stats if available
489        let table_stats = self.stats.table_stats(&query.table);
490        let heap_pages = table_stats
491            .map(|s| s.page_count as f64)
492            .unwrap_or_else(|| (result_rows / ROWS_PER_PAGE).max(1.0));
493
494        // If the filter is a simple comparison on an indexed column, use
495        // the Mackert-Lohman formula with correlation from IndexStats.
496        if let Some(filter) = crate::storage::query::sql_lowering::effective_table_filter(query) {
497            if let Some(col) = first_filter_column(&filter, &query.table) {
498                if let Some(idx) = self.stats.index_stats(&query.table, col) {
499                    return idx.correlated_io_cost(result_rows, heap_pages);
500                }
501            }
502        }
503
504        // Heuristic fallback: assume sequential pages = rows / 100
505        (result_rows / ROWS_PER_PAGE).ceil()
506    }
507
508    fn estimate_table_cardinality(&self, query: &TableQuery) -> CardinalityEstimate {
509        // Prefer real row counts from the stats provider; fall back to the
510        // heuristic `default_row_count` when no stats are registered.
511        let base_rows = self
512            .stats
513            .table_stats(&query.table)
514            .map(|s| s.row_count as f64)
515            .unwrap_or(self.default_row_count);
516
517        let mut estimate = CardinalityEstimate::full_scan(base_rows);
518
519        // Apply filter selectivity (stats-aware when provider has index
520        // stats on the compared column).
521        if let Some(filter) = crate::storage::query::sql_lowering::effective_table_filter(query) {
522            let selectivity = self.filter_selectivity(&filter, &query.table);
523            estimate = estimate.with_filter(selectivity);
524        }
525
526        // Apply limit
527        if let Some(limit) = query.limit {
528            estimate.rows = estimate.rows.min(limit as f64);
529        }
530
531        estimate
532    }
533
534    /// Stats-aware selectivity computation.
535    ///
536    /// Resolution order (best → worst):
537    ///   1. `column_mcv` for equality on a known frequent value
538    ///   2. `column_histogram` for ranges and BETWEEN
539    ///   3. `index_stats.point_selectivity()` for indexed columns
540    ///   4. Hardcoded heuristic constants as final fallback
541    ///
542    /// Mirrors postgres `var_eq_const` / `histogram_selectivity` in
543    /// `src/backend/utils/adt/selfuncs.c`. Histogram + MCV data
544    /// structures already live in `super::histogram`; this method is
545    /// where we finally consume them on the hot planner path.
546    fn filter_selectivity(&self, filter: &AstFilter, table: &str) -> f64 {
547        match filter {
548            AstFilter::Compare { field, op, value } => {
549                let column = column_name_for_table(field, table);
550                match op {
551                    CompareOp::Eq => self.eq_selectivity(table, column, value),
552                    CompareOp::Ne => 1.0 - self.eq_selectivity(table, column, value),
553                    CompareOp::Lt | CompareOp::Le => {
554                        self.range_selectivity(table, column, None, Some(value))
555                    }
556                    CompareOp::Gt | CompareOp::Ge => {
557                        self.range_selectivity(table, column, Some(value), None)
558                    }
559                }
560            }
561            AstFilter::Between {
562                field, low, high, ..
563            } => {
564                let column = column_name_for_table(field, table);
565                self.range_selectivity(table, column, Some(low), Some(high))
566            }
567            AstFilter::In { field, values, .. } => {
568                let column = column_name_for_table(field, table);
569                // If we have an MCV list, sum the per-value frequencies
570                // for values that are actually in the list, plus the
571                // residual estimate for the rest.
572                if let Some(c) = column {
573                    if let Some(mcv) = self.stats.column_mcv(table, c) {
574                        let mut hits: f64 = 0.0;
575                        let mut residual_count = 0usize;
576                        for v in values {
577                            if let Some(cv) = column_value_from(v) {
578                                if let Some(freq) = mcv.frequency_of(&cv) {
579                                    hits += freq;
580                                } else {
581                                    residual_count += 1;
582                                }
583                            } else {
584                                residual_count += 1;
585                            }
586                        }
587                        let total = mcv.total_frequency();
588                        let distinct = self.stats.distinct_values(table, c).unwrap_or(100);
589                        let non_mcv_distinct =
590                            distinct.saturating_sub(mcv.values.len() as u64).max(1);
591                        let per_residual = (1.0 - total) / non_mcv_distinct as f64;
592                        let estimate = hits + (residual_count as f64) * per_residual;
593                        return estimate.clamp(0.0, 1.0).min(0.5);
594                    }
595                    if let Some(s) = self.stats.index_stats(table, c) {
596                        return (s.point_selectivity() * values.len() as f64).min(0.5);
597                    }
598                }
599                (values.len() as f64 * 0.01).min(0.5)
600            }
601            AstFilter::Like { .. } => 0.1,
602            AstFilter::StartsWith { .. } => 0.15,
603            AstFilter::EndsWith { .. } => 0.15,
604            AstFilter::Contains { .. } => 0.1,
605            AstFilter::IsNull { .. } => 0.01,
606            AstFilter::IsNotNull { .. } => 0.99,
607            AstFilter::And(left, right) => {
608                self.filter_selectivity(left, table) * self.filter_selectivity(right, table)
609            }
610            AstFilter::Or(left, right) => {
611                let s1 = self.filter_selectivity(left, table);
612                let s2 = self.filter_selectivity(right, table);
613                s1 + s2 - (s1 * s2)
614            }
615            AstFilter::Not(inner) => 1.0 - self.filter_selectivity(inner, table),
616            AstFilter::CompareFields { .. } => {
617                // Column-to-column predicates lack histogram leverage
618                // — assume moderate selectivity. Histogram/MCV hooks
619                // only help literal-valued filters.
620                0.1
621            }
622            AstFilter::CompareExpr { .. } => {
623                // Expression-shaped predicates: conservative 0.1 until
624                // the planner learns to walk Expr trees. Matches the
625                // CompareFields default.
626                0.1
627            }
628        }
629    }
630
631    // =========================================================================
632    // Graph Query Estimation
633    // =========================================================================
634
635    fn estimate_graph(&self, query: &GraphQuery) -> PlanCost {
636        let cardinality = self.estimate_graph_cardinality(query);
637
638        // Graph queries are more expensive due to pointer chasing
639        let nodes = query.pattern.nodes.len() as f64;
640        let edges = query.pattern.edges.len() as f64;
641
642        let cpu = cardinality.rows * self.edge_traversal_cost * (nodes + edges);
643        let io = cardinality.rows * 0.1; // More random IO
644        let memory = cardinality.rows * 200.0; // Larger due to paths
645
646        PlanCost::new(cpu, io, memory)
647    }
648
649    fn estimate_graph_cardinality(&self, query: &GraphQuery) -> CardinalityEstimate {
650        let nodes = query.pattern.nodes.len() as f64;
651        let edges = query.pattern.edges.len() as f64;
652
653        // Each edge reduces cardinality
654        let base_rows = self.default_row_count;
655        let edge_factor = 0.1_f64.powf(edges); // Each edge is highly selective
656
657        let mut estimate = CardinalityEstimate::new(base_rows * nodes * edge_factor, edge_factor);
658        estimate.confidence = 0.5; // Graph estimates are less accurate
659
660        // Apply filter
661        if let Some(ref filter) = query.filter {
662            let selectivity = Self::estimate_filter_selectivity(filter);
663            estimate = estimate.with_filter(selectivity);
664        }
665
666        estimate
667    }
668
669    // =========================================================================
670    // Join Query Estimation
671    // =========================================================================
672
673    fn estimate_join(&self, query: &JoinQuery) -> PlanCost {
674        let left_cost = self.estimate(&query.left);
675        let right_cost = self.estimate(&query.right);
676
677        let left_card = self.estimate_cardinality(&query.left);
678        let right_card = self.estimate_cardinality(&query.right);
679
680        // Hash join cost model.
681        //
682        // Build side (left) is **blocking** — we must drain the entire
683        // left input and populate the hash table before any probe can
684        // produce its first output row. Probe side (right) is then
685        // streamed pipelined.
686        let build_cpu = left_card.rows * self.hash_probe_cost;
687        let probe_cpu = right_card.rows * self.hash_probe_cost;
688        let join_memory = left_card.rows * 100.0; // hash table footprint
689
690        // The build operator: zero work upstream, blocking on left input.
691        let build_op = PlanCost::with_startup(build_cpu, 0.0, join_memory, build_cpu);
692        // The probe operator: pipelined over right input.
693        let probe_op = PlanCost::new(probe_cpu, 0.0, 0.0);
694
695        // Compose: left → block on build → pipelined probe with right.
696        let after_build = left_cost.combine_blocking(&build_op);
697        after_build
698            .combine_pipelined(&right_cost)
699            .combine_pipelined(&probe_op)
700    }
701
702    fn estimate_join_cardinality(&self, query: &JoinQuery) -> CardinalityEstimate {
703        let left = self.estimate_cardinality(&query.left);
704        let right = self.estimate_cardinality(&query.right);
705
706        // Join selectivity based on join type
707        let selectivity = match query.join_type {
708            JoinType::Inner => 0.1,      // Inner join is selective
709            JoinType::LeftOuter => 1.0,  // Left join preserves left side
710            JoinType::RightOuter => 1.0, // Right join preserves right side
711            JoinType::FullOuter => 1.0,  // Full outer preserves both sides entirely
712            JoinType::Cross => 1.0,      // Cartesian product — every pair matches
713        };
714
715        CardinalityEstimate::new(
716            left.rows * right.rows * selectivity,
717            left.selectivity * right.selectivity * selectivity,
718        )
719    }
720
721    // =========================================================================
722    // Path Query Estimation
723    // =========================================================================
724
725    fn estimate_path(&self, query: &PathQuery) -> PlanCost {
726        let cardinality = self.estimate_path_cardinality(query);
727
728        // BFS/DFS cost
729        let max_hops = query.max_length;
730        let branching_factor: f64 = 5.0; // Average edges per node
731
732        let nodes_visited = branching_factor.powf(max_hops as f64).min(10000.0);
733        let cpu = nodes_visited * self.edge_traversal_cost;
734        let io = nodes_visited * 0.1;
735        let memory = nodes_visited * 50.0; // Visited set
736
737        PlanCost::new(cpu, io, memory)
738    }
739
740    fn estimate_path_cardinality(&self, query: &PathQuery) -> CardinalityEstimate {
741        // Path queries typically return few results
742        let max_paths = 10.0;
743        CardinalityEstimate::new(max_paths, 0.001)
744    }
745
746    // =========================================================================
747    // Vector Query Estimation
748    // =========================================================================
749
750    fn estimate_vector(&self, query: &VectorQuery) -> PlanCost {
751        // HNSW search is O(log n) with relatively low constant
752        // Typical search visits ~100-500 nodes for 1M vectors
753        let k = query.k as f64;
754
755        // Base cost from HNSW traversal — must descend the layer graph
756        // before *any* candidate can be returned. This is the operator's
757        // intrinsic startup cost.
758        let hnsw_cost = 100.0 * (1.0 + k.ln()); // ~100-300 node visits
759
760        // Metadata filtering adds cost if present
761        let filter_cost =
762            if crate::storage::query::sql_lowering::effective_vector_filter(query).is_some() {
763                50.0
764            } else {
765                0.0
766            };
767
768        let cpu = hnsw_cost + filter_cost;
769        let io = 20.0; // HNSW layers are cached
770        let memory = k * 32.0 + 1000.0; // k results + working set
771
772        // Vector search is *partly* blocking: HNSW must traverse the
773        // entry layers before the first neighbour is known, so the
774        // first-row cost is roughly the descent cost. Subsequent rows
775        // come essentially free until `k`.
776        PlanCost::with_startup(cpu, io, memory, hnsw_cost * 0.5)
777    }
778
779    fn estimate_vector_cardinality(&self, query: &VectorQuery) -> CardinalityEstimate {
780        // Vector search returns exactly k results (or fewer if not enough vectors)
781        let k = query.k as f64;
782        CardinalityEstimate::new(k, 0.1)
783    }
784
785    // =========================================================================
786    // Hybrid Query Estimation
787    // =========================================================================
788
789    fn estimate_hybrid(&self, query: &HybridQuery) -> PlanCost {
790        // Hybrid cost = structured + vector + fusion overhead
791        let structured_cost = self.estimate(&query.structured);
792        let vector_cost = self.estimate_vector(&query.vector);
793
794        // Fusion overhead depends on strategy
795        let fusion_overhead = match &query.fusion {
796            crate::storage::query::ast::FusionStrategy::Rerank { .. } => 50.0,
797            crate::storage::query::ast::FusionStrategy::FilterThenSearch => 10.0,
798            crate::storage::query::ast::FusionStrategy::SearchThenFilter => 10.0,
799            crate::storage::query::ast::FusionStrategy::RRF { .. } => 30.0,
800            crate::storage::query::ast::FusionStrategy::Intersection => 20.0,
801            crate::storage::query::ast::FusionStrategy::Union { .. } => 40.0,
802        };
803
804        PlanCost::new(
805            structured_cost.cpu + vector_cost.cpu + fusion_overhead,
806            structured_cost.io + vector_cost.io,
807            structured_cost.memory + vector_cost.memory,
808        )
809    }
810
811    fn estimate_hybrid_cardinality(&self, query: &HybridQuery) -> CardinalityEstimate {
812        let structured_card = self.estimate_cardinality(&query.structured);
813        let vector_card = self.estimate_vector_cardinality(&query.vector);
814
815        // Result size depends on fusion strategy
816        let rows = match &query.fusion {
817            crate::storage::query::ast::FusionStrategy::Intersection => {
818                structured_card.rows.min(vector_card.rows)
819            }
820            crate::storage::query::ast::FusionStrategy::Union { .. } => {
821                structured_card.rows + vector_card.rows
822            }
823            _ => vector_card.rows, // Rerank and filter strategies return vector k
824        };
825
826        CardinalityEstimate::new(rows, 0.2)
827    }
828
829    // =========================================================================
830    // Filter Selectivity
831    // =========================================================================
832
833    fn estimate_filter_selectivity(filter: &AstFilter) -> f64 {
834        match filter {
835            AstFilter::Compare { op, .. } => {
836                match op {
837                    CompareOp::Eq => 0.01, // Equality is very selective
838                    CompareOp::Ne => 0.99, // Inequality is not selective
839                    CompareOp::Lt | CompareOp::Le => 0.3,
840                    CompareOp::Gt | CompareOp::Ge => 0.3,
841                }
842            }
843            AstFilter::Between { .. } => 0.25,
844            AstFilter::In { values, .. } => {
845                // Each value adds 1% selectivity
846                (values.len() as f64 * 0.01).min(0.5)
847            }
848            AstFilter::Like { .. } => 0.1,
849            AstFilter::StartsWith { .. } => 0.15,
850            AstFilter::EndsWith { .. } => 0.15,
851            AstFilter::Contains { .. } => 0.1,
852            AstFilter::IsNull { .. } => 0.01,
853            AstFilter::IsNotNull { .. } => 0.99,
854            AstFilter::And(left, right) => {
855                Self::estimate_filter_selectivity(left) * Self::estimate_filter_selectivity(right)
856            }
857            AstFilter::Or(left, right) => {
858                let s1 = Self::estimate_filter_selectivity(left);
859                let s2 = Self::estimate_filter_selectivity(right);
860                s1 + s2 - (s1 * s2) // Inclusion-exclusion
861            }
862            AstFilter::Not(inner) => 1.0 - Self::estimate_filter_selectivity(inner),
863            AstFilter::CompareFields { .. } => 0.1,
864            AstFilter::CompareExpr { .. } => 0.1,
865        }
866    }
867}
868
869impl CostEstimator {
870    /// Equality selectivity for `column = value`.
871    ///
872    /// Resolution order:
873    /// 1. MCV list — exact frequency for tracked values, residual
874    ///    formula for untracked values.
875    /// 2. `index_stats.point_selectivity()` — `1 / distinct_keys`.
876    /// 3. Heuristic constant `0.01`.
877    fn eq_selectivity(&self, table: &str, column: Option<&str>, value: &Value) -> f64 {
878        if let Some(col) = column {
879            // 1. Most-common-values lookup.
880            if let Some(mcv) = self.stats.column_mcv(table, col) {
881                if let Some(cv) = column_value_from(value) {
882                    if let Some(freq) = mcv.frequency_of(&cv) {
883                        return freq;
884                    }
885                    // Untracked value: residual / non_mcv_distinct.
886                    let total = mcv.total_frequency();
887                    let distinct = self.stats.distinct_values(table, col).unwrap_or(100);
888                    let non_mcv_distinct = distinct.saturating_sub(mcv.values.len() as u64).max(1);
889                    return ((1.0 - total) / non_mcv_distinct as f64).clamp(0.0, 1.0);
890                }
891            }
892            // 2. Index stats fallback.
893            if let Some(s) = self.stats.index_stats(table, col) {
894                return s.point_selectivity();
895            }
896        }
897        // 3. Heuristic.
898        0.01
899    }
900
901    /// Range selectivity for `lo <= column <= hi`. Either bound may
902    /// be `None` to express an open side. Used by `<`, `<=`, `>`,
903    /// `>=`, and `BETWEEN`.
904    ///
905    /// Resolution order:
906    /// 1. Histogram — `Histogram::range_selectivity` with bounds
907    ///    converted via `column_value_from`.
908    /// 2. `index_stats.point_selectivity() * (distinct_keys / 2)`
909    ///    capped at the legacy heuristic.
910    /// 3. Heuristic `0.3` for one-sided, `0.25` for two-sided.
911    fn range_selectivity(
912        &self,
913        table: &str,
914        column: Option<&str>,
915        lo: Option<&Value>,
916        hi: Option<&Value>,
917    ) -> f64 {
918        if let Some(col) = column {
919            // 1. Histogram bucket arithmetic.
920            if let Some(h) = self.stats.column_histogram(table, col) {
921                let lo_cv = lo.and_then(column_value_from);
922                let hi_cv = hi.and_then(column_value_from);
923                return h.range_selectivity(lo_cv.as_ref(), hi_cv.as_ref());
924            }
925            // 2. Index stats fallback.
926            if let Some(s) = self.stats.index_stats(table, col) {
927                let cap = if lo.is_some() && hi.is_some() {
928                    0.25
929                } else {
930                    0.3
931                };
932                return (s.point_selectivity() * (s.distinct_keys as f64 / 2.0)).min(cap);
933            }
934        }
935        // 3. Heuristic.
936        if lo.is_some() && hi.is_some() {
937            0.25
938        } else {
939            0.3
940        }
941    }
942}
943
944impl Default for CostEstimator {
945    fn default() -> Self {
946        Self::new()
947    }
948}
949
950/// Convert a query AST `Value` into a histogram-comparable
951/// [`super::histogram::ColumnValue`]. Returns `None` for value types
952/// that histograms don't support (Bool, Null, Bytes, etc.) — callers
953/// fall through to the heuristic path.
954fn column_value_from(v: &crate::storage::schema::Value) -> Option<super::histogram::ColumnValue> {
955    use super::histogram::ColumnValue;
956    use crate::storage::schema::Value;
957    match v {
958        Value::Integer(i) | Value::BigInt(i) => Some(ColumnValue::Int(*i)),
959        Value::UnsignedInteger(u) => Some(ColumnValue::Int(*u as i64)),
960        Value::Float(f) if f.is_finite() => Some(ColumnValue::Float(*f)),
961        Value::Text(s) => Some(ColumnValue::Text(s.to_string())),
962        Value::Email(s)
963        | Value::Url(s)
964        | Value::NodeRef(s)
965        | Value::EdgeRef(s)
966        | Value::TableRef(s)
967        | Value::Password(s) => Some(ColumnValue::Text(s.clone())),
968        Value::Timestamp(t) => Some(ColumnValue::Int(*t)),
969        Value::Duration(d) => Some(ColumnValue::Int(*d)),
970        Value::TimestampMs(t) => Some(ColumnValue::Int(*t)),
971        Value::Decimal(d) => Some(ColumnValue::Int(*d)),
972        Value::Date(d) => Some(ColumnValue::Int(i64::from(*d))),
973        Value::Time(t) => Some(ColumnValue::Int(i64::from(*t))),
974        Value::Phone(p) => Some(ColumnValue::Int(*p as i64)),
975        Value::Semver(v) => Some(ColumnValue::Int(i64::from(*v))),
976        Value::Port(v) => Some(ColumnValue::Int(i64::from(*v))),
977        Value::PageRef(v) => Some(ColumnValue::Int(i64::from(*v))),
978        Value::EnumValue(v) => Some(ColumnValue::Int(i64::from(*v))),
979        Value::Latitude(v) => Some(ColumnValue::Int(i64::from(*v))),
980        Value::Longitude(v) => Some(ColumnValue::Int(i64::from(*v))),
981        // Other variants (Null, Blob, Boolean, IpAddr, MacAddr,
982        // Vector, Json, Uuid, NodeRef, EdgeRef, vector ref...) are
983        // not orderable in a histogram-meaningful way; the planner
984        // falls through to the heuristic for these.
985        _ => None,
986    }
987}
988
989/// Resolve a `FieldRef` to a bare column name when it refers to `table`.
990/// Returns `None` when the field targets another relation — in that case
991/// Extract the first plain column name from a filter for index-stat lookup.
992/// Walks AND nodes; stops at OR/NOT (too complex for simple correlation lookup).
993fn first_filter_column<'a>(filter: &'a AstFilter, table: &str) -> Option<&'a str> {
994    match filter {
995        AstFilter::Compare { field, .. } => column_name_for_table(field, table),
996        AstFilter::Between { field, .. } => column_name_for_table(field, table),
997        AstFilter::And(l, r) => {
998            first_filter_column(l, table).or_else(|| first_filter_column(r, table))
999        }
1000        _ => None,
1001    }
1002}
1003
1004/// the legacy heuristic still applies.
1005fn column_name_for_table<'a>(field: &'a FieldRef, table: &str) -> Option<&'a str> {
1006    match field {
1007        FieldRef::TableColumn { table: t, column } if t == table || t.is_empty() => {
1008            Some(column.as_str())
1009        }
1010        // Node / edge property refs don't map to table-level stats.
1011        _ => None,
1012    }
1013}
1014
1015#[cfg(test)]
1016mod tests {
1017    use super::super::stats_provider::StaticProvider;
1018    use super::*;
1019    use crate::storage::index::{IndexKind, IndexStats};
1020    use crate::storage::query::ast::{FieldRef, Projection};
1021    use crate::storage::schema::Value;
1022
1023    fn eq_filter(table: &str, column: &str, value: i64) -> AstFilter {
1024        AstFilter::Compare {
1025            field: FieldRef::column(table, column),
1026            op: CompareOp::Eq,
1027            value: Value::Integer(value),
1028        }
1029    }
1030
1031    fn table_query(name: &str, filter: Option<AstFilter>) -> TableQuery {
1032        TableQuery {
1033            table: name.to_string(),
1034            source: None,
1035            alias: None,
1036            select_items: Vec::new(),
1037            columns: vec![Projection::All],
1038            where_expr: None,
1039            filter,
1040            group_by_exprs: Vec::new(),
1041            group_by: Vec::new(),
1042            having_expr: None,
1043            having: None,
1044            order_by: vec![],
1045            limit: None,
1046            limit_param: None,
1047            offset: None,
1048            offset_param: None,
1049            expand: None,
1050            as_of: None,
1051            sessionize: None,
1052        }
1053    }
1054
1055    #[test]
1056    fn injected_row_count_overrides_default() {
1057        let provider = Arc::new(StaticProvider::new().with_table(
1058            "users",
1059            TableStats {
1060                row_count: 50_000,
1061                avg_row_size: 256,
1062                page_count: 500,
1063                columns: vec![],
1064            },
1065        ));
1066        let estimator = CostEstimator::with_stats(provider);
1067        let q = table_query("users", None);
1068        let card = estimator.estimate_table_cardinality(&q);
1069        // Default would be 1000; provider says 50_000.
1070        assert_eq!(card.rows, 50_000.0);
1071    }
1072
1073    #[test]
1074    fn stats_aware_eq_selectivity_beats_default() {
1075        let provider = Arc::new(
1076            StaticProvider::new()
1077                .with_table(
1078                    "users",
1079                    TableStats {
1080                        row_count: 1_000_000,
1081                        avg_row_size: 256,
1082                        page_count: 10_000,
1083                        columns: vec![],
1084                    },
1085                )
1086                .with_index(
1087                    "users",
1088                    "email",
1089                    IndexStats {
1090                        entries: 1_000_000,
1091                        distinct_keys: 1_000_000,
1092                        approx_bytes: 0,
1093                        kind: IndexKind::Hash,
1094                        has_bloom: true,
1095                        index_correlation: 0.0,
1096                    },
1097                ),
1098        );
1099        let estimator = CostEstimator::with_stats(provider);
1100        let q = table_query("users", Some(eq_filter("users", "email", 0)));
1101        let card = estimator.estimate_table_cardinality(&q);
1102        // 1M rows × (1 / 1M distinct) ≈ 1 row
1103        assert!(card.rows < 2.0, "expected ~1 row, got {}", card.rows);
1104    }
1105
1106    #[test]
1107    fn fallback_when_no_index_stats() {
1108        let provider = Arc::new(StaticProvider::new().with_table(
1109            "users",
1110            TableStats {
1111                row_count: 1_000_000,
1112                avg_row_size: 256,
1113                page_count: 10_000,
1114                columns: vec![],
1115            },
1116        ));
1117        let estimator = CostEstimator::with_stats(provider);
1118        let q = table_query("users", Some(eq_filter("users", "email", 0)));
1119        let card = estimator.estimate_table_cardinality(&q);
1120        // Heuristic 0.01 on 1M rows = 10_000
1121        assert!((card.rows - 10_000.0).abs() < 1.0);
1122    }
1123
1124    #[test]
1125    fn null_provider_keeps_legacy_behaviour() {
1126        let estimator = CostEstimator::new();
1127        let q = table_query("whatever", Some(eq_filter("whatever", "id", 1)));
1128        let card = estimator.estimate_table_cardinality(&q);
1129        // Default 1000 rows × 0.01 eq selectivity = 10
1130        assert!((card.rows - 10.0).abs() < 1.0);
1131    }
1132
1133    #[test]
1134    fn and_combines_stats_selectivities() {
1135        let provider = Arc::new(
1136            StaticProvider::new()
1137                .with_table(
1138                    "t",
1139                    TableStats {
1140                        row_count: 100_000,
1141                        avg_row_size: 64,
1142                        page_count: 100,
1143                        columns: vec![],
1144                    },
1145                )
1146                .with_index(
1147                    "t",
1148                    "a",
1149                    IndexStats {
1150                        entries: 100_000,
1151                        distinct_keys: 10,
1152                        approx_bytes: 0,
1153                        kind: IndexKind::BTree,
1154                        has_bloom: false,
1155                        index_correlation: 0.0,
1156                    },
1157                )
1158                .with_index(
1159                    "t",
1160                    "b",
1161                    IndexStats {
1162                        entries: 100_000,
1163                        distinct_keys: 1000,
1164                        approx_bytes: 0,
1165                        kind: IndexKind::BTree,
1166                        has_bloom: false,
1167                        index_correlation: 0.0,
1168                    },
1169                ),
1170        );
1171        let estimator = CostEstimator::with_stats(provider);
1172        let filter = AstFilter::And(
1173            Box::new(eq_filter("t", "a", 1)),
1174            Box::new(eq_filter("t", "b", 1)),
1175        );
1176        let q = table_query("t", Some(filter));
1177        let card = estimator.estimate_table_cardinality(&q);
1178        // 100_000 × (1/10) × (1/1000) = 10
1179        assert!(card.rows < 15.0, "got {}", card.rows);
1180    }
1181
1182    #[test]
1183    fn test_table_cost_estimation() {
1184        let estimator = CostEstimator::new();
1185
1186        let query = QueryExpr::Table(TableQuery {
1187            table: "hosts".to_string(),
1188            source: None,
1189            alias: None,
1190            select_items: Vec::new(),
1191            columns: vec![Projection::All],
1192            where_expr: None,
1193            filter: None,
1194            group_by_exprs: Vec::new(),
1195            group_by: Vec::new(),
1196            having_expr: None,
1197            having: None,
1198            order_by: vec![],
1199            limit: None,
1200            limit_param: None,
1201            offset: None,
1202            offset_param: None,
1203            expand: None,
1204            as_of: None,
1205            sessionize: None,
1206        });
1207
1208        let cost = estimator.estimate(&query);
1209        assert!(cost.cpu > 0.0);
1210        assert!(cost.total > 0.0);
1211    }
1212
1213    #[test]
1214    fn test_filter_selectivity() {
1215        let estimator = CostEstimator::new();
1216
1217        let eq_filter = AstFilter::Compare {
1218            field: FieldRef::column("hosts", "id"),
1219            op: CompareOp::Eq,
1220            value: Value::Integer(1),
1221        };
1222        assert!(CostEstimator::estimate_filter_selectivity(&eq_filter) < 0.1);
1223
1224        let ne_filter = AstFilter::Compare {
1225            field: FieldRef::column("hosts", "id"),
1226            op: CompareOp::Ne,
1227            value: Value::Integer(1),
1228        };
1229        assert!(CostEstimator::estimate_filter_selectivity(&ne_filter) > 0.9);
1230    }
1231
1232    #[test]
1233    fn test_and_selectivity() {
1234        let estimator = CostEstimator::new();
1235
1236        let and_filter = AstFilter::And(
1237            Box::new(AstFilter::Compare {
1238                field: FieldRef::column("hosts", "a"),
1239                op: CompareOp::Eq,
1240                value: Value::Integer(1),
1241            }),
1242            Box::new(AstFilter::Compare {
1243                field: FieldRef::column("hosts", "b"),
1244                op: CompareOp::Eq,
1245                value: Value::Integer(2),
1246            }),
1247        );
1248
1249        let selectivity = CostEstimator::estimate_filter_selectivity(&and_filter);
1250        assert!(selectivity < 0.01); // AND should be very selective
1251    }
1252
1253    #[test]
1254    fn test_cardinality_with_limit() {
1255        let estimator = CostEstimator::new();
1256
1257        let query = TableQuery {
1258            table: "hosts".to_string(),
1259            source: None,
1260            alias: None,
1261            select_items: Vec::new(),
1262            columns: vec![Projection::All],
1263            where_expr: None,
1264            filter: None,
1265            group_by_exprs: Vec::new(),
1266            group_by: Vec::new(),
1267            having_expr: None,
1268            having: None,
1269            order_by: vec![],
1270            limit: Some(10),
1271            limit_param: None,
1272            offset: None,
1273            offset_param: None,
1274            expand: None,
1275            as_of: None,
1276            sessionize: None,
1277        };
1278
1279        let card = estimator.estimate_table_cardinality(&query);
1280        assert!(card.rows <= 10.0);
1281    }
1282
1283    // ---------------------------------------------------------------
1284    // Target 2: startup_cost vs total_cost split
1285    // ---------------------------------------------------------------
1286
1287    #[test]
1288    fn startup_zero_for_full_scan() {
1289        // estimate_table is implemented as a streaming sequential scan
1290        // (no startup cost — the first row is producible as soon as the
1291        // first page is read).
1292        let estimator = CostEstimator::new();
1293        let q = table_query("any_table", None);
1294        let cost = estimator.estimate(&QueryExpr::Table(q));
1295        assert_eq!(cost.startup_cost, 0.0, "full scan must have zero startup");
1296        assert!(cost.total > 0.0);
1297    }
1298
1299    #[test]
1300    fn startup_nonzero_for_blocking_combine() {
1301        // combine_blocking models a sort or hash build: the input must
1302        // be fully consumed before the blocker can emit its first row.
1303        let input = PlanCost::new(100.0, 10.0, 50.0); // cost = 100 + 100 + 5 = 205
1304        let blocker = PlanCost::new(20.0, 0.0, 10.0); // cost = 20 + 0 + 1 = 21
1305        let composed = input.combine_blocking(&blocker);
1306        // Blocking startup absorbs all of input.total
1307        assert_eq!(composed.startup_cost, input.total);
1308        // Total is input.total + blocker.total
1309        assert_eq!(composed.total, input.total + blocker.total);
1310        assert!(composed.startup_cost > 0.0);
1311    }
1312
1313    #[test]
1314    fn pipelined_combine_adds_startup_directly() {
1315        let upstream = PlanCost::with_startup(50.0, 5.0, 10.0, 30.0);
1316        let downstream = PlanCost::with_startup(20.0, 0.0, 0.0, 5.0);
1317        let composed = upstream.combine_pipelined(&downstream);
1318        assert_eq!(composed.startup_cost, 30.0 + 5.0);
1319        assert_eq!(composed.total, upstream.total + downstream.total);
1320    }
1321
1322    #[test]
1323    fn cost_prefers_low_startup_when_limit_small() {
1324        // Two plans with the same total but different startup. With a
1325        // small LIMIT, the planner must pick the low-startup plan.
1326        let fast_first = PlanCost {
1327            cpu: 100.0,
1328            io: 10.0,
1329            network: 0.0,
1330            memory: 50.0,
1331            startup_cost: 5.0,
1332            total: 200.0,
1333        };
1334        let slow_first = PlanCost {
1335            cpu: 100.0,
1336            io: 10.0,
1337            network: 0.0,
1338            memory: 50.0,
1339            startup_cost: 150.0,
1340            total: 200.0,
1341        };
1342        // Cardinality 10_000, LIMIT 10 → 10 < 0.1 * 10_000 = 1000 → use startup.
1343        assert_eq!(
1344            fast_first.prefer_over(&slow_first, Some(10), 10_000.0),
1345            std::cmp::Ordering::Less
1346        );
1347    }
1348
1349    #[test]
1350    fn cost_prefers_low_total_when_no_limit() {
1351        // Same two plans, no LIMIT — total wins.
1352        let low_total = PlanCost {
1353            cpu: 50.0,
1354            io: 5.0,
1355            network: 0.0,
1356            memory: 0.0,
1357            startup_cost: 30.0,
1358            total: 100.0,
1359        };
1360        let high_total = PlanCost {
1361            cpu: 100.0,
1362            io: 10.0,
1363            network: 0.0,
1364            memory: 0.0,
1365            startup_cost: 5.0,
1366            total: 200.0,
1367        };
1368        assert_eq!(
1369            low_total.prefer_over(&high_total, None, 10_000.0),
1370            std::cmp::Ordering::Less
1371        );
1372    }
1373
1374    #[test]
1375    fn limit_threshold_falls_back_to_total_when_limit_large() {
1376        // LIMIT 5000 vs cardinality 10_000 → 5000 > 1000 → use total.
1377        let low_total = PlanCost {
1378            cpu: 50.0,
1379            io: 5.0,
1380            network: 0.0,
1381            memory: 0.0,
1382            startup_cost: 30.0,
1383            total: 100.0,
1384        };
1385        let low_startup = PlanCost {
1386            cpu: 100.0,
1387            io: 10.0,
1388            network: 0.0,
1389            memory: 0.0,
1390            startup_cost: 5.0,
1391            total: 200.0,
1392        };
1393        assert_eq!(
1394            low_total.prefer_over(&low_startup, Some(5000), 10_000.0),
1395            std::cmp::Ordering::Less
1396        );
1397    }
1398
1399    #[test]
1400    fn hash_join_startup_includes_build_cost() {
1401        // Direct combine_blocking semantics: a hash join must drain the
1402        // left input and build the hash table before producing the first
1403        // probe result.
1404        let left = PlanCost::new(80.0, 8.0, 100.0); // table scan
1405        let build = PlanCost::with_startup(50.0, 0.0, 200.0, 50.0); // build op
1406        let after_build = left.combine_blocking(&build);
1407        assert!(
1408            after_build.startup_cost >= left.total,
1409            "after-build startup ({}) must absorb left.total ({})",
1410            after_build.startup_cost,
1411            left.total
1412        );
1413        assert!(after_build.total >= after_build.startup_cost);
1414    }
1415
1416    #[test]
1417    fn vector_search_reports_nonzero_startup() {
1418        // estimate_vector now uses with_startup so HNSW descent shows
1419        // up as startup_cost > 0 (and < total — subsequent neighbours
1420        // are essentially free).
1421        let estimator = CostEstimator::new();
1422        // We can't easily build a VectorQuery without the AST helpers,
1423        // so test the direct cost surface with_startup uses.
1424        let v = PlanCost::with_startup(150.0, 20.0, 1320.0, 50.0);
1425        assert!(v.startup_cost > 0.0);
1426        assert!(v.startup_cost < v.total);
1427        let _ = estimator; // suppress unused
1428    }
1429
1430    #[test]
1431    fn with_startup_clamps_total_below_startup() {
1432        // If a caller asks for total < startup, with_startup raises total.
1433        let cost = PlanCost::with_startup(1.0, 0.0, 0.0, 100.0);
1434        assert!(cost.total >= cost.startup_cost);
1435    }
1436
1437    #[test]
1438    fn default_plancost_has_zero_startup() {
1439        let c = PlanCost::default();
1440        assert_eq!(c.startup_cost, 0.0);
1441        assert_eq!(c.total, 0.0);
1442    }
1443
1444    // ---------------------------------------------------------------
1445    // Perf 1.3: histogram + MCV plug-in into filter_selectivity
1446    // ---------------------------------------------------------------
1447
1448    use super::super::histogram::{ColumnValue, Histogram, MostCommonValues};
1449
1450    fn provider_with_skew() -> Arc<StaticProvider> {
1451        // Build a histogram where 80 of 100 values fall in [0, 9]
1452        // and the rest spread sparsely up to 1000. range_selectivity
1453        // for `<= 9` should be ~0.8, vastly beating the heuristic 0.3.
1454        let mut sample: Vec<ColumnValue> = Vec::new();
1455        for i in 0..80 {
1456            sample.push(ColumnValue::Int(i % 10));
1457        }
1458        for i in 0..20 {
1459            sample.push(ColumnValue::Int(10 + i * 50));
1460        }
1461        let h = Histogram::equi_depth_from_sample(sample, 10);
1462
1463        let mcv = MostCommonValues::new(vec![
1464            (ColumnValue::Text("boss".to_string()), 0.5),
1465            (ColumnValue::Text("intern".to_string()), 0.05),
1466        ]);
1467
1468        Arc::new(
1469            StaticProvider::new()
1470                .with_table(
1471                    "people",
1472                    TableStats {
1473                        row_count: 100_000,
1474                        avg_row_size: 64,
1475                        page_count: 100,
1476                        columns: vec![],
1477                    },
1478                )
1479                .with_histogram("people", "score", h)
1480                .with_mcv("people", "role", mcv),
1481        )
1482    }
1483
1484    #[test]
1485    fn eq_uses_mcv_when_value_is_tracked() {
1486        let provider = provider_with_skew();
1487        let estimator = CostEstimator::with_stats(provider);
1488        let filter = AstFilter::Compare {
1489            field: FieldRef::column("people", "role"),
1490            op: CompareOp::Eq,
1491            value: Value::text("boss".to_string()),
1492        };
1493        // MCV says "boss" is 50% of the table → selectivity 0.5,
1494        // not the 0.01 heuristic.
1495        let s = estimator.filter_selectivity(&filter, "people");
1496        assert!(
1497            (s - 0.5).abs() < 1e-9,
1498            "MCV-tracked equality should report exact frequency, got {s}"
1499        );
1500    }
1501
1502    #[test]
1503    fn eq_uses_residual_for_non_mcv_value() {
1504        let provider = provider_with_skew();
1505        let estimator = CostEstimator::with_stats(provider);
1506        let filter = AstFilter::Compare {
1507            field: FieldRef::column("people", "role"),
1508            op: CompareOp::Eq,
1509            value: Value::text("staff".to_string()),
1510        };
1511        // 1 - 0.55 (mcv totals) = 0.45 spread across (distinct - 2)
1512        // distinct values. We don't have an exact distinct count, so
1513        // the planner uses the default 100 → 0.45 / 98 ≈ 0.0046.
1514        let s = estimator.filter_selectivity(&filter, "people");
1515        assert!(s > 0.0 && s < 0.01, "residual eq should be tiny, got {s}");
1516    }
1517
1518    #[test]
1519    fn ne_is_one_minus_eq_under_mcv() {
1520        let provider = provider_with_skew();
1521        let estimator = CostEstimator::with_stats(provider);
1522        let filter = AstFilter::Compare {
1523            field: FieldRef::column("people", "role"),
1524            op: CompareOp::Ne,
1525            value: Value::text("boss".to_string()),
1526        };
1527        let s = estimator.filter_selectivity(&filter, "people");
1528        // 1 - 0.5 == 0.5
1529        assert!((s - 0.5).abs() < 1e-9, "Ne selectivity = 0.5, got {s}");
1530    }
1531
1532    #[test]
1533    fn range_uses_histogram_when_present() {
1534        let provider = provider_with_skew();
1535        let estimator = CostEstimator::with_stats(provider);
1536        let filter = AstFilter::Compare {
1537            field: FieldRef::column("people", "score"),
1538            op: CompareOp::Le,
1539            value: Value::Integer(9),
1540        };
1541        // Histogram says ~80% of values are in [0, 9], heuristic
1542        // would have said 0.3.
1543        let s = estimator.filter_selectivity(&filter, "people");
1544        assert!(
1545            s > 0.5,
1546            "histogram-based range selectivity should beat 0.3, got {s}"
1547        );
1548    }
1549
1550    #[test]
1551    fn between_uses_histogram() {
1552        let provider = provider_with_skew();
1553        let estimator = CostEstimator::with_stats(provider);
1554        let filter = AstFilter::Between {
1555            field: FieldRef::column("people", "score"),
1556            low: Value::Integer(0),
1557            high: Value::Integer(9),
1558        };
1559        let s = estimator.filter_selectivity(&filter, "people");
1560        assert!(s > 0.5, "BETWEEN should use histogram too, got {s}");
1561    }
1562
1563    #[test]
1564    fn graceful_fallback_when_histogram_absent() {
1565        // Provider has no histogram on `unknown_col` — must fall
1566        // through to the 0.3 heuristic without panicking.
1567        let provider = Arc::new(StaticProvider::new().with_table(
1568            "people",
1569            TableStats {
1570                row_count: 1000,
1571                avg_row_size: 64,
1572                page_count: 10,
1573                columns: vec![],
1574            },
1575        ));
1576        let estimator = CostEstimator::with_stats(provider);
1577        let filter = AstFilter::Compare {
1578            field: FieldRef::column("people", "unknown_col"),
1579            op: CompareOp::Lt,
1580            value: Value::Integer(50),
1581        };
1582        let s = estimator.filter_selectivity(&filter, "people");
1583        assert!((s - 0.3).abs() < 1e-9, "fallback heuristic 0.3, got {s}");
1584    }
1585}