Skip to main content

grafeo_engine/query/optimizer/
cardinality.rs

1//! Cardinality estimation for query optimization.
2//!
3//! Estimates the number of rows produced by each operator in a query plan.
4//!
5//! # Equi-Depth Histograms
6//!
7//! This module provides equi-depth histogram support for accurate selectivity
8//! estimation of range predicates. Unlike equi-width histograms that divide
9//! the value range into equal-sized buckets, equi-depth histograms divide
10//! the data into buckets with approximately equal numbers of rows.
11//!
12//! Benefits:
13//! - Better estimates for skewed data distributions
14//! - More accurate range selectivity than assuming uniform distribution
15//! - Adaptive to actual data characteristics
16
17use crate::query::plan::{
18    AggregateOp, BinaryOp, DistinctOp, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp,
19    LogicalExpression, LogicalOperator, MultiWayJoinOp, NodeScanOp, ProjectOp, SkipOp, SortOp,
20    UnaryOp, VectorJoinOp, VectorScanOp,
21};
22use std::collections::HashMap;
23
/// One bucket of an equi-depth histogram.
///
/// A bucket covers a range of values and records how many rows, and how many
/// distinct values, fall inside that range. In an equi-depth histogram every
/// bucket holds roughly the same number of rows.
#[derive(Debug, Clone)]
pub struct HistogramBucket {
    /// Lower bound of the bucket (inclusive).
    pub lower_bound: f64,
    /// Upper bound of the bucket (treated as exclusive by
    /// [`HistogramBucket::contains`]).
    pub upper_bound: f64,
    /// Number of rows in this bucket.
    pub frequency: u64,
    /// Number of distinct values in this bucket.
    pub distinct_count: u64,
}

impl HistogramBucket {
    /// Builds a bucket from its bounds, row count and distinct-value count.
    #[must_use]
    pub fn new(lower_bound: f64, upper_bound: f64, frequency: u64, distinct_count: u64) -> Self {
        Self {
            lower_bound,
            upper_bound,
            frequency,
            distinct_count,
        }
    }

    /// Returns `upper_bound - lower_bound`.
    #[must_use]
    pub fn width(&self) -> f64 {
        self.upper_bound - self.lower_bound
    }

    /// Returns `true` when `value` lies in `[lower_bound, upper_bound)`.
    #[must_use]
    pub fn contains(&self, value: f64) -> bool {
        (self.lower_bound..self.upper_bound).contains(&value)
    }

    /// Returns the fraction (0.0 to 1.0) of this bucket covered by a range.
    ///
    /// `None` for either bound means the range is unbounded on that side.
    /// A bucket with non-positive width counts as fully covered only when the
    /// range reaches both of its bounds, and as not covered otherwise.
    #[must_use]
    pub fn overlap_fraction(&self, lower: Option<f64>, upper: Option<f64>) -> f64 {
        // Clip the requested range to the bucket's own bounds.
        let clipped_lo = lower.map_or(self.lower_bound, |l| l.max(self.lower_bound));
        let clipped_hi = upper.map_or(self.upper_bound, |u| u.min(self.upper_bound));

        let span = self.width();
        if span <= 0.0 {
            // Degenerate bucket: there is no width to take a ratio of.
            let fully_covered =
                clipped_lo <= self.lower_bound && clipped_hi >= self.upper_bound;
            return if fully_covered { 1.0 } else { 0.0 };
        }

        // A range that misses the bucket yields a negative difference; floor it at zero.
        let covered = (clipped_hi - clipped_lo).max(0.0);
        (covered / span).min(1.0)
    }
}
84
85/// An equi-depth histogram for selectivity estimation.
86///
87/// Equi-depth histograms partition data into buckets where each bucket
88/// contains approximately the same number of rows. This provides more
89/// accurate selectivity estimates than assuming uniform distribution,
90/// especially for skewed data.
91///
92/// # Example
93///
94/// ```no_run
95/// use grafeo_engine::query::optimizer::cardinality::EquiDepthHistogram;
96///
97/// // Build a histogram from sorted values
98/// let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0];
99/// let histogram = EquiDepthHistogram::build(&values, 4);
100///
101/// // Estimate selectivity for age > 25
102/// let selectivity = histogram.range_selectivity(Some(25.0), None);
103/// ```
104#[derive(Debug, Clone)]
105pub struct EquiDepthHistogram {
106    /// The histogram buckets, sorted by lower_bound.
107    buckets: Vec<HistogramBucket>,
108    /// Total number of rows represented by this histogram.
109    total_rows: u64,
110}
111
112impl EquiDepthHistogram {
113    /// Creates a new histogram from pre-built buckets.
114    #[must_use]
115    pub fn new(buckets: Vec<HistogramBucket>) -> Self {
116        let total_rows = buckets.iter().map(|b| b.frequency).sum();
117        Self {
118            buckets,
119            total_rows,
120        }
121    }
122
123    /// Builds an equi-depth histogram from a sorted slice of values.
124    ///
125    /// # Arguments
126    /// * `values` - A sorted slice of numeric values
127    /// * `num_buckets` - The desired number of buckets
128    ///
129    /// # Returns
130    /// An equi-depth histogram with approximately equal row counts per bucket.
131    #[must_use]
132    pub fn build(values: &[f64], num_buckets: usize) -> Self {
133        if values.is_empty() || num_buckets == 0 {
134            return Self {
135                buckets: Vec::new(),
136                total_rows: 0,
137            };
138        }
139
140        let num_buckets = num_buckets.min(values.len());
141        let rows_per_bucket = (values.len() + num_buckets - 1) / num_buckets;
142        let mut buckets = Vec::with_capacity(num_buckets);
143
144        let mut start_idx = 0;
145        while start_idx < values.len() {
146            let end_idx = (start_idx + rows_per_bucket).min(values.len());
147            let lower_bound = values[start_idx];
148            let upper_bound = if end_idx < values.len() {
149                values[end_idx]
150            } else {
151                // For the last bucket, extend slightly beyond the max value
152                values[end_idx - 1] + 1.0
153            };
154
155            // Count distinct values in this bucket
156            let bucket_values = &values[start_idx..end_idx];
157            let distinct_count = count_distinct(bucket_values);
158
159            buckets.push(HistogramBucket::new(
160                lower_bound,
161                upper_bound,
162                (end_idx - start_idx) as u64,
163                distinct_count,
164            ));
165
166            start_idx = end_idx;
167        }
168
169        Self::new(buckets)
170    }
171
172    /// Returns the number of buckets in this histogram.
173    #[must_use]
174    pub fn num_buckets(&self) -> usize {
175        self.buckets.len()
176    }
177
178    /// Returns the total number of rows represented.
179    #[must_use]
180    pub fn total_rows(&self) -> u64 {
181        self.total_rows
182    }
183
184    /// Returns the histogram buckets.
185    #[must_use]
186    pub fn buckets(&self) -> &[HistogramBucket] {
187        &self.buckets
188    }
189
190    /// Estimates selectivity for a range predicate.
191    ///
192    /// # Arguments
193    /// * `lower` - Lower bound (None for unbounded)
194    /// * `upper` - Upper bound (None for unbounded)
195    ///
196    /// # Returns
197    /// Estimated fraction of rows matching the range (0.0 to 1.0).
198    #[must_use]
199    pub fn range_selectivity(&self, lower: Option<f64>, upper: Option<f64>) -> f64 {
200        if self.buckets.is_empty() || self.total_rows == 0 {
201            return 0.33; // Default fallback
202        }
203
204        let mut matching_rows = 0.0;
205
206        for bucket in &self.buckets {
207            // Check if this bucket overlaps with the range
208            let bucket_lower = bucket.lower_bound;
209            let bucket_upper = bucket.upper_bound;
210
211            // Skip buckets entirely outside the range
212            if let Some(l) = lower
213                && bucket_upper <= l
214            {
215                continue;
216            }
217            if let Some(u) = upper
218                && bucket_lower >= u
219            {
220                continue;
221            }
222
223            // Calculate the fraction of this bucket covered by the range
224            let overlap = bucket.overlap_fraction(lower, upper);
225            matching_rows += overlap * bucket.frequency as f64;
226        }
227
228        (matching_rows / self.total_rows as f64).clamp(0.0, 1.0)
229    }
230
231    /// Estimates selectivity for an equality predicate.
232    ///
233    /// Uses the distinct count within matching buckets for better accuracy.
234    #[must_use]
235    pub fn equality_selectivity(&self, value: f64) -> f64 {
236        if self.buckets.is_empty() || self.total_rows == 0 {
237            return 0.01; // Default fallback
238        }
239
240        // Find the bucket containing this value
241        for bucket in &self.buckets {
242            if bucket.contains(value) {
243                // Assume uniform distribution within bucket
244                if bucket.distinct_count > 0 {
245                    return (bucket.frequency as f64
246                        / bucket.distinct_count as f64
247                        / self.total_rows as f64)
248                        .min(1.0);
249                }
250            }
251        }
252
253        // Value not in any bucket - very low selectivity
254        0.001
255    }
256
257    /// Gets the minimum value in the histogram.
258    #[must_use]
259    pub fn min_value(&self) -> Option<f64> {
260        self.buckets.first().map(|b| b.lower_bound)
261    }
262
263    /// Gets the maximum value in the histogram.
264    #[must_use]
265    pub fn max_value(&self) -> Option<f64> {
266        self.buckets.last().map(|b| b.upper_bound)
267    }
268}
269
/// Counts distinct values in a sorted slice.
///
/// Because the input is sorted, equal values are adjacent, so it is enough to
/// count the positions where a value differs from the last counted one.
/// Returns 0 for an empty slice.
///
/// Two values are distinct whenever their difference is non-zero. An absolute
/// `f64::EPSILON` threshold is deliberately not used: it would merge genuinely
/// different values of small magnitude (for example `1e-20` and `2e-20`).
/// A NaN difference (a NaN input, or two equal infinities) never counts as a
/// new value.
fn count_distinct(sorted_values: &[f64]) -> u64 {
    let Some((&first, rest)) = sorted_values.split_first() else {
        return 0;
    };

    let mut count = 1u64;
    let mut prev = first;

    for &val in rest {
        // `> 0.0` is false for NaN, so NaN differences are ignored.
        if (val - prev).abs() > 0.0 {
            count += 1;
            prev = val;
        }
    }

    count
}
288
289/// Statistics for a table/label.
290#[derive(Debug, Clone)]
291pub struct TableStats {
292    /// Total number of rows.
293    pub row_count: u64,
294    /// Column statistics.
295    pub columns: HashMap<String, ColumnStats>,
296}
297
298impl TableStats {
299    /// Creates new table statistics.
300    #[must_use]
301    pub fn new(row_count: u64) -> Self {
302        Self {
303            row_count,
304            columns: HashMap::new(),
305        }
306    }
307
308    /// Adds column statistics.
309    pub fn with_column(mut self, name: &str, stats: ColumnStats) -> Self {
310        self.columns.insert(name.to_string(), stats);
311        self
312    }
313}
314
315/// Statistics for a column.
316#[derive(Debug, Clone)]
317pub struct ColumnStats {
318    /// Number of distinct values.
319    pub distinct_count: u64,
320    /// Number of null values.
321    pub null_count: u64,
322    /// Minimum value (if orderable).
323    pub min_value: Option<f64>,
324    /// Maximum value (if orderable).
325    pub max_value: Option<f64>,
326    /// Equi-depth histogram for accurate selectivity estimation.
327    pub histogram: Option<EquiDepthHistogram>,
328}
329
330impl ColumnStats {
331    /// Creates new column statistics.
332    #[must_use]
333    pub fn new(distinct_count: u64) -> Self {
334        Self {
335            distinct_count,
336            null_count: 0,
337            min_value: None,
338            max_value: None,
339            histogram: None,
340        }
341    }
342
343    /// Sets the null count.
344    #[must_use]
345    pub fn with_nulls(mut self, null_count: u64) -> Self {
346        self.null_count = null_count;
347        self
348    }
349
350    /// Sets the min/max range.
351    #[must_use]
352    pub fn with_range(mut self, min: f64, max: f64) -> Self {
353        self.min_value = Some(min);
354        self.max_value = Some(max);
355        self
356    }
357
358    /// Sets the equi-depth histogram for this column.
359    #[must_use]
360    pub fn with_histogram(mut self, histogram: EquiDepthHistogram) -> Self {
361        self.histogram = Some(histogram);
362        self
363    }
364
365    /// Builds column statistics with histogram from raw values.
366    ///
367    /// This is a convenience method that computes all statistics from the data.
368    ///
369    /// # Arguments
370    /// * `values` - The column values (will be sorted internally)
371    /// * `num_buckets` - Number of histogram buckets to create
372    #[must_use]
373    pub fn from_values(mut values: Vec<f64>, num_buckets: usize) -> Self {
374        if values.is_empty() {
375            return Self::new(0);
376        }
377
378        // Sort values for histogram building
379        values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
380
381        let min = values.first().copied();
382        let max = values.last().copied();
383        let distinct_count = count_distinct(&values);
384        let histogram = EquiDepthHistogram::build(&values, num_buckets);
385
386        Self {
387            distinct_count,
388            null_count: 0,
389            min_value: min,
390            max_value: max,
391            histogram: Some(histogram),
392        }
393    }
394}
395
/// Tunable fallback selectivities used by cardinality estimation.
///
/// These values are the assumed selectivity of each predicate kind when no
/// histogram or column statistics are available. Tuning them can improve
/// plan quality for workloads with known skew.
#[derive(Debug, Clone)]
pub struct SelectivityConfig {
    /// Selectivity for unknown predicates (default: 0.1).
    pub default: f64,
    /// Selectivity for equality predicates without stats (default: 0.01).
    pub equality: f64,
    /// Selectivity for inequality predicates (default: 0.99).
    pub inequality: f64,
    /// Selectivity for range predicates without stats (default: 0.33).
    pub range: f64,
    /// Selectivity for string operations: STARTS WITH, ENDS WITH, CONTAINS, LIKE (default: 0.1).
    pub string_ops: f64,
    /// Selectivity for IN membership (default: 0.1).
    pub membership: f64,
    /// Selectivity for IS NULL (default: 0.05).
    pub is_null: f64,
    /// Selectivity for IS NOT NULL (default: 0.95).
    pub is_not_null: f64,
    /// Fraction assumed distinct for DISTINCT operations (default: 0.5).
    pub distinct_fraction: f64,
}

impl SelectivityConfig {
    /// Returns the standard defaults; identical to [`Default::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for SelectivityConfig {
    /// Returns the standard database defaults listed on each field.
    fn default() -> Self {
        Self {
            // Generic fallbacks.
            default: 0.1,
            string_ops: 0.1,
            membership: 0.1,
            // Comparison predicates.
            equality: 0.01,
            inequality: 0.99,
            range: 0.33,
            // Null checks.
            is_null: 0.05,
            is_not_null: 0.95,
            // DISTINCT.
            distinct_fraction: 0.5,
        }
    }
}
446
/// A single estimate-vs-actual observation for analysis.
#[derive(Debug, Clone)]
pub struct EstimationEntry {
    /// Human-readable label for the operator (e.g., "NodeScan(Person)").
    pub operator: String,
    /// The cardinality estimate produced by the optimizer.
    pub estimated: f64,
    /// The actual row count observed at execution time.
    pub actual: f64,
}

impl EstimationEntry {
    /// Returns the estimation error ratio (actual / estimated).
    ///
    /// Values near 1.0 indicate accurate estimates.
    /// Values > 1.0 indicate underestimation.
    /// Values < 1.0 indicate overestimation.
    ///
    /// When the estimate is (numerically) zero the division is avoided: the
    /// ratio is 1.0 if the actual count is also zero and infinity otherwise.
    #[must_use]
    pub fn error_ratio(&self) -> f64 {
        let is_zero = |x: f64| x.abs() < f64::EPSILON;
        match (is_zero(self.estimated), is_zero(self.actual)) {
            (true, true) => 1.0,
            (true, false) => f64::INFINITY,
            (false, _) => self.actual / self.estimated,
        }
    }
}

/// Collects estimate vs actual cardinality data for query plan analysis.
///
/// After executing a query, call [`record()`](Self::record) for each
/// operator with its estimated and actual cardinalities. Then use
/// [`should_replan()`](Self::should_replan) to decide whether the plan
/// should be re-optimized.
///
/// [`Default::default`] produces an empty log with the documented default
/// threshold of 10.0.
#[derive(Debug, Clone)]
pub struct EstimationLog {
    /// Recorded entries.
    entries: Vec<EstimationEntry>,
    /// Error ratio threshold that triggers re-planning (default: 10.0).
    ///
    /// If any operator's error ratio exceeds this, `should_replan()` returns true.
    replan_threshold: f64,
}

impl Default for EstimationLog {
    /// Returns an empty log using the default re-planning threshold (10.0).
    ///
    /// A derived `Default` would set the threshold to 0.0, which makes
    /// `should_replan()` report `true` for every recorded entry.
    fn default() -> Self {
        Self::new(Self::DEFAULT_REPLAN_THRESHOLD)
    }
}

impl EstimationLog {
    /// Threshold used by [`Default::default`]: re-plan when an estimate is
    /// off by more than 10x in either direction.
    const DEFAULT_REPLAN_THRESHOLD: f64 = 10.0;

    /// Creates a new estimation log with the given re-planning threshold.
    #[must_use]
    pub fn new(replan_threshold: f64) -> Self {
        Self {
            entries: Vec::new(),
            replan_threshold,
        }
    }

    /// Records an estimate-vs-actual observation.
    pub fn record(&mut self, operator: impl Into<String>, estimated: f64, actual: f64) {
        self.entries.push(EstimationEntry {
            operator: operator.into(),
            estimated,
            actual,
        });
    }

    /// Returns all recorded entries.
    #[must_use]
    pub fn entries(&self) -> &[EstimationEntry] {
        &self.entries
    }

    /// Returns whether any operator's estimation error exceeds the threshold,
    /// indicating the plan should be re-optimized.
    ///
    /// Both directions count: a ratio above the threshold (underestimate) or
    /// below its reciprocal (overestimate).
    #[must_use]
    pub fn should_replan(&self) -> bool {
        let upper = self.replan_threshold;
        let lower = 1.0 / self.replan_threshold;
        self.entries.iter().any(|entry| {
            let ratio = entry.error_ratio();
            ratio > upper || ratio < lower
        })
    }

    /// Returns the maximum error ratio across all entries.
    ///
    /// Ratios below 1.0 are inverted first, so over- and under-estimation are
    /// measured on the same scale. Returns 1.0 when nothing was recorded.
    #[must_use]
    pub fn max_error_ratio(&self) -> f64 {
        self.entries
            .iter()
            .map(|entry| {
                let r = entry.error_ratio();
                // Normalize so both over- and under-estimation are > 1.0
                if r < 1.0 { 1.0 / r } else { r }
            })
            .fold(1.0_f64, f64::max)
    }

    /// Clears all entries.
    pub fn clear(&mut self) {
        self.entries.clear();
    }
}
547
548/// Cardinality estimator.
549pub struct CardinalityEstimator {
550    /// Statistics for each label/table.
551    table_stats: HashMap<String, TableStats>,
552    /// Default row count for unknown tables.
553    default_row_count: u64,
554    /// Default selectivity for unknown predicates.
555    default_selectivity: f64,
556    /// Average edge fanout (outgoing edges per node).
557    avg_fanout: f64,
558    /// Configurable selectivity defaults.
559    selectivity_config: SelectivityConfig,
560}
561
562impl CardinalityEstimator {
563    /// Creates a new cardinality estimator with default settings.
564    #[must_use]
565    pub fn new() -> Self {
566        let config = SelectivityConfig::new();
567        Self {
568            table_stats: HashMap::new(),
569            default_row_count: 1000,
570            default_selectivity: config.default,
571            avg_fanout: 10.0,
572            selectivity_config: config,
573        }
574    }
575
576    /// Creates a new cardinality estimator with custom selectivity configuration.
577    #[must_use]
578    pub fn with_selectivity_config(config: SelectivityConfig) -> Self {
579        Self {
580            table_stats: HashMap::new(),
581            default_row_count: 1000,
582            default_selectivity: config.default,
583            avg_fanout: 10.0,
584            selectivity_config: config,
585        }
586    }
587
588    /// Returns the current selectivity configuration.
589    #[must_use]
590    pub fn selectivity_config(&self) -> &SelectivityConfig {
591        &self.selectivity_config
592    }
593
594    /// Creates an estimation log with the default re-planning threshold (10x).
595    #[must_use]
596    pub fn create_estimation_log() -> EstimationLog {
597        EstimationLog::new(10.0)
598    }
599
600    /// Creates a cardinality estimator pre-populated from store statistics.
601    ///
602    /// Maps `LabelStatistics` to `TableStats` and computes the average edge
603    /// fanout from `EdgeTypeStatistics`. Falls back to defaults for any
604    /// missing statistics.
605    #[must_use]
606    pub fn from_statistics(stats: &grafeo_core::statistics::Statistics) -> Self {
607        let mut estimator = Self::new();
608
609        // Use total node count as default for unlabeled scans
610        if stats.total_nodes > 0 {
611            estimator.default_row_count = stats.total_nodes;
612        }
613
614        // Convert label statistics to optimizer table stats
615        for (label, label_stats) in &stats.labels {
616            let mut table_stats = TableStats::new(label_stats.node_count);
617
618            // Map property statistics (distinct count for selectivity estimation)
619            for (prop, col_stats) in &label_stats.properties {
620                let optimizer_col =
621                    ColumnStats::new(col_stats.distinct_count).with_nulls(col_stats.null_count);
622                table_stats = table_stats.with_column(prop, optimizer_col);
623            }
624
625            estimator.add_table_stats(label, table_stats);
626        }
627
628        // Compute average fanout from edge type statistics
629        if !stats.edge_types.is_empty() {
630            let total_out_degree: f64 = stats.edge_types.values().map(|e| e.avg_out_degree).sum();
631            estimator.avg_fanout = total_out_degree / stats.edge_types.len() as f64;
632        } else if stats.total_nodes > 0 {
633            estimator.avg_fanout = stats.total_edges as f64 / stats.total_nodes as f64;
634        }
635
636        // Clamp fanout to a reasonable minimum
637        if estimator.avg_fanout < 1.0 {
638            estimator.avg_fanout = 1.0;
639        }
640
641        estimator
642    }
643
644    /// Adds statistics for a table/label.
645    pub fn add_table_stats(&mut self, name: &str, stats: TableStats) {
646        self.table_stats.insert(name.to_string(), stats);
647    }
648
649    /// Sets the average edge fanout.
650    pub fn set_avg_fanout(&mut self, fanout: f64) {
651        self.avg_fanout = fanout;
652    }
653
654    /// Estimates the cardinality of a logical operator.
655    #[must_use]
656    pub fn estimate(&self, op: &LogicalOperator) -> f64 {
657        match op {
658            LogicalOperator::NodeScan(scan) => self.estimate_node_scan(scan),
659            LogicalOperator::Filter(filter) => self.estimate_filter(filter),
660            LogicalOperator::Project(project) => self.estimate_project(project),
661            LogicalOperator::Expand(expand) => self.estimate_expand(expand),
662            LogicalOperator::Join(join) => self.estimate_join(join),
663            LogicalOperator::Aggregate(agg) => self.estimate_aggregate(agg),
664            LogicalOperator::Sort(sort) => self.estimate_sort(sort),
665            LogicalOperator::Distinct(distinct) => self.estimate_distinct(distinct),
666            LogicalOperator::Limit(limit) => self.estimate_limit(limit),
667            LogicalOperator::Skip(skip) => self.estimate_skip(skip),
668            LogicalOperator::Return(ret) => self.estimate(&ret.input),
669            LogicalOperator::Empty => 0.0,
670            LogicalOperator::VectorScan(scan) => self.estimate_vector_scan(scan),
671            LogicalOperator::VectorJoin(join) => self.estimate_vector_join(join),
672            LogicalOperator::MultiWayJoin(mwj) => self.estimate_multi_way_join(mwj),
673            LogicalOperator::LeftJoin(lj) => self.estimate_left_join(lj),
674            _ => self.default_row_count as f64,
675        }
676    }
677
678    /// Estimates node scan cardinality.
679    fn estimate_node_scan(&self, scan: &NodeScanOp) -> f64 {
680        if let Some(label) = &scan.label
681            && let Some(stats) = self.table_stats.get(label)
682        {
683            return stats.row_count as f64;
684        }
685        // No label filter - scan all nodes
686        self.default_row_count as f64
687    }
688
689    /// Estimates filter cardinality.
690    fn estimate_filter(&self, filter: &FilterOp) -> f64 {
691        let input_cardinality = self.estimate(&filter.input);
692        let selectivity = self.estimate_selectivity(&filter.predicate);
693        (input_cardinality * selectivity).max(1.0)
694    }
695
696    /// Estimates projection cardinality (same as input).
697    fn estimate_project(&self, project: &ProjectOp) -> f64 {
698        self.estimate(&project.input)
699    }
700
701    /// Estimates expand cardinality.
702    fn estimate_expand(&self, expand: &ExpandOp) -> f64 {
703        let input_cardinality = self.estimate(&expand.input);
704
705        // Apply fanout based on edge type
706        let fanout = if !expand.edge_types.is_empty() {
707            // Specific edge type(s) typically have lower fanout
708            self.avg_fanout * 0.5
709        } else {
710            self.avg_fanout
711        };
712
713        // Handle variable-length paths
714        let path_multiplier = if expand.max_hops.unwrap_or(1) > 1 {
715            let min = expand.min_hops as f64;
716            let max = expand.max_hops.unwrap_or(expand.min_hops + 3) as f64;
717            // Geometric series approximation
718            (fanout.powf(max + 1.0) - fanout.powf(min)) / (fanout - 1.0)
719        } else {
720            fanout
721        };
722
723        (input_cardinality * path_multiplier).max(1.0)
724    }
725
726    /// Estimates join cardinality.
727    fn estimate_join(&self, join: &JoinOp) -> f64 {
728        let left_card = self.estimate(&join.left);
729        let right_card = self.estimate(&join.right);
730
731        match join.join_type {
732            JoinType::Cross => left_card * right_card,
733            JoinType::Inner => {
734                // Assume join selectivity based on conditions
735                let selectivity = if join.conditions.is_empty() {
736                    1.0 // Cross join
737                } else {
738                    // Estimate based on number of conditions
739                    0.1_f64.powi(join.conditions.len() as i32)
740                };
741                (left_card * right_card * selectivity).max(1.0)
742            }
743            JoinType::Left => {
744                // Left join returns at least all left rows
745                let inner_card = self.estimate_join(&JoinOp {
746                    left: join.left.clone(),
747                    right: join.right.clone(),
748                    join_type: JoinType::Inner,
749                    conditions: join.conditions.clone(),
750                });
751                inner_card.max(left_card)
752            }
753            JoinType::Right => {
754                // Right join returns at least all right rows
755                let inner_card = self.estimate_join(&JoinOp {
756                    left: join.left.clone(),
757                    right: join.right.clone(),
758                    join_type: JoinType::Inner,
759                    conditions: join.conditions.clone(),
760                });
761                inner_card.max(right_card)
762            }
763            JoinType::Full => {
764                // Full join returns at least max(left, right)
765                let inner_card = self.estimate_join(&JoinOp {
766                    left: join.left.clone(),
767                    right: join.right.clone(),
768                    join_type: JoinType::Inner,
769                    conditions: join.conditions.clone(),
770                });
771                inner_card.max(left_card.max(right_card))
772            }
773            JoinType::Semi => {
774                // Semi join returns at most left cardinality
775                (left_card * self.default_selectivity).max(1.0)
776            }
777            JoinType::Anti => {
778                // Anti join returns at most left cardinality
779                (left_card * (1.0 - self.default_selectivity)).max(1.0)
780            }
781        }
782    }
783
784    /// Estimates left join cardinality (OPTIONAL MATCH).
785    ///
786    /// A left outer join preserves all left rows, so the output is at least
787    /// `left_cardinality`. When the right side matches, the output may be
788    /// larger (one left row can match multiple right rows).
789    fn estimate_left_join(&self, lj: &LeftJoinOp) -> f64 {
790        let left_card = self.estimate(&lj.left);
791        let right_card = self.estimate(&lj.right);
792
793        // Estimate as inner join cardinality, but guaranteed at least left_card
794        let inner_estimate = left_card * right_card * self.default_selectivity;
795        inner_estimate.max(left_card).max(1.0)
796    }
797
798    /// Estimates aggregation cardinality.
799    fn estimate_aggregate(&self, agg: &AggregateOp) -> f64 {
800        let input_cardinality = self.estimate(&agg.input);
801
802        if agg.group_by.is_empty() {
803            // Global aggregation - single row
804            1.0
805        } else {
806            // Group by - estimate distinct groups
807            // Assume each group key reduces cardinality by 10
808            let group_reduction = 10.0_f64.powi(agg.group_by.len() as i32);
809            (input_cardinality / group_reduction).max(1.0)
810        }
811    }
812
813    /// Estimates sort cardinality (same as input).
814    fn estimate_sort(&self, sort: &SortOp) -> f64 {
815        self.estimate(&sort.input)
816    }
817
818    /// Estimates distinct cardinality.
819    fn estimate_distinct(&self, distinct: &DistinctOp) -> f64 {
820        let input_cardinality = self.estimate(&distinct.input);
821        (input_cardinality * self.selectivity_config.distinct_fraction).max(1.0)
822    }
823
824    /// Estimates limit cardinality.
825    fn estimate_limit(&self, limit: &LimitOp) -> f64 {
826        let input_cardinality = self.estimate(&limit.input);
827        limit.count.estimate().min(input_cardinality)
828    }
829
830    /// Estimates skip cardinality.
831    fn estimate_skip(&self, skip: &SkipOp) -> f64 {
832        let input_cardinality = self.estimate(&skip.input);
833        (input_cardinality - skip.count.estimate()).max(0.0)
834    }
835
836    /// Estimates vector scan cardinality.
837    ///
838    /// Vector scan returns at most k results (the k nearest neighbors).
839    /// With similarity/distance filters, it may return fewer.
840    fn estimate_vector_scan(&self, scan: &VectorScanOp) -> f64 {
841        let base_k = scan.k as f64;
842
843        // Apply filter selectivity if thresholds are specified
844        let selectivity = if scan.min_similarity.is_some() || scan.max_distance.is_some() {
845            // Assume 70% of results pass threshold filters
846            0.7
847        } else {
848            1.0
849        };
850
851        (base_k * selectivity).max(1.0)
852    }
853
854    /// Estimates vector join cardinality.
855    ///
856    /// Vector join produces up to k results per input row.
857    fn estimate_vector_join(&self, join: &VectorJoinOp) -> f64 {
858        let input_cardinality = self.estimate(&join.input);
859        let k = join.k as f64;
860
861        // Apply filter selectivity if thresholds are specified
862        let selectivity = if join.min_similarity.is_some() || join.max_distance.is_some() {
863            0.7
864        } else {
865            1.0
866        };
867
868        (input_cardinality * k * selectivity).max(1.0)
869    }
870
871    /// Estimates multi-way join cardinality using the AGM bound heuristic.
872    ///
873    /// For a cyclic join of N relations, the AGM (Atserias-Grohe-Marx) bound
874    /// gives min(cardinality)^(N/2) as a worst-case output size estimate.
875    fn estimate_multi_way_join(&self, mwj: &MultiWayJoinOp) -> f64 {
876        if mwj.inputs.is_empty() {
877            return 0.0;
878        }
879        let cardinalities: Vec<f64> = mwj
880            .inputs
881            .iter()
882            .map(|input| self.estimate(input))
883            .collect();
884        let min_card = cardinalities.iter().copied().fold(f64::INFINITY, f64::min);
885        let n = cardinalities.len() as f64;
886        // AGM bound: min(cardinality)^(n/2)
887        (min_card.powf(n / 2.0)).max(1.0)
888    }
889
890    /// Estimates the selectivity of a predicate (0.0 to 1.0).
891    fn estimate_selectivity(&self, expr: &LogicalExpression) -> f64 {
892        match expr {
893            LogicalExpression::Binary { left, op, right } => {
894                self.estimate_binary_selectivity(left, *op, right)
895            }
896            LogicalExpression::Unary { op, operand } => {
897                self.estimate_unary_selectivity(*op, operand)
898            }
899            LogicalExpression::Literal(value) => {
900                // Boolean literal
901                if let grafeo_common::types::Value::Bool(b) = value {
902                    if *b { 1.0 } else { 0.0 }
903                } else {
904                    self.default_selectivity
905                }
906            }
907            _ => self.default_selectivity,
908        }
909    }
910
911    /// Estimates binary expression selectivity.
912    fn estimate_binary_selectivity(
913        &self,
914        left: &LogicalExpression,
915        op: BinaryOp,
916        right: &LogicalExpression,
917    ) -> f64 {
918        match op {
919            // Equality - try histogram-based estimation
920            BinaryOp::Eq => {
921                if let Some(selectivity) = self.try_equality_selectivity(left, right) {
922                    return selectivity;
923                }
924                self.selectivity_config.equality
925            }
926            // Inequality is very unselective
927            BinaryOp::Ne => self.selectivity_config.inequality,
928            // Range predicates - use histogram if available
929            BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
930                if let Some(selectivity) = self.try_range_selectivity(left, op, right) {
931                    return selectivity;
932                }
933                self.selectivity_config.range
934            }
935            // Logical operators - recursively estimate sub-expressions
936            BinaryOp::And => {
937                let left_sel = self.estimate_selectivity(left);
938                let right_sel = self.estimate_selectivity(right);
939                // AND reduces selectivity (multiply assuming independence)
940                left_sel * right_sel
941            }
942            BinaryOp::Or => {
943                let left_sel = self.estimate_selectivity(left);
944                let right_sel = self.estimate_selectivity(right);
945                // OR: P(A ∪ B) = P(A) + P(B) - P(A ∩ B)
946                // Assuming independence: P(A ∩ B) = P(A) * P(B)
947                (left_sel + right_sel - left_sel * right_sel).min(1.0)
948            }
949            // String operations
950            BinaryOp::StartsWith | BinaryOp::EndsWith | BinaryOp::Contains | BinaryOp::Like => {
951                self.selectivity_config.string_ops
952            }
953            // Collection membership
954            BinaryOp::In => self.selectivity_config.membership,
955            // Other operations
956            _ => self.default_selectivity,
957        }
958    }
959
960    /// Tries to estimate equality selectivity using histograms.
961    fn try_equality_selectivity(
962        &self,
963        left: &LogicalExpression,
964        right: &LogicalExpression,
965    ) -> Option<f64> {
966        // Extract property access and literal value
967        let (label, column, value) = self.extract_column_and_value(left, right)?;
968
969        // Get column stats with histogram
970        let stats = self.get_column_stats(&label, &column)?;
971
972        // Try histogram-based estimation
973        if let Some(ref histogram) = stats.histogram {
974            return Some(histogram.equality_selectivity(value));
975        }
976
977        // Fall back to distinct count estimation
978        if stats.distinct_count > 0 {
979            return Some(1.0 / stats.distinct_count as f64);
980        }
981
982        None
983    }
984
985    /// Tries to estimate range selectivity using histograms.
986    fn try_range_selectivity(
987        &self,
988        left: &LogicalExpression,
989        op: BinaryOp,
990        right: &LogicalExpression,
991    ) -> Option<f64> {
992        // Extract property access and literal value
993        let (label, column, value) = self.extract_column_and_value(left, right)?;
994
995        // Get column stats
996        let stats = self.get_column_stats(&label, &column)?;
997
998        // Determine the range based on operator
999        let (lower, upper) = match op {
1000            BinaryOp::Lt => (None, Some(value)),
1001            BinaryOp::Le => (None, Some(value + f64::EPSILON)),
1002            BinaryOp::Gt => (Some(value + f64::EPSILON), None),
1003            BinaryOp::Ge => (Some(value), None),
1004            _ => return None,
1005        };
1006
1007        // Try histogram-based estimation first
1008        if let Some(ref histogram) = stats.histogram {
1009            return Some(histogram.range_selectivity(lower, upper));
1010        }
1011
1012        // Fall back to min/max range estimation
1013        if let (Some(min), Some(max)) = (stats.min_value, stats.max_value) {
1014            let range = max - min;
1015            if range <= 0.0 {
1016                return Some(1.0);
1017            }
1018
1019            let effective_lower = lower.unwrap_or(min).max(min);
1020            let effective_upper = upper.unwrap_or(max).min(max);
1021            let overlap = (effective_upper - effective_lower).max(0.0);
1022            return Some((overlap / range).clamp(0.0, 1.0));
1023        }
1024
1025        None
1026    }
1027
1028    /// Extracts column information and literal value from a comparison.
1029    ///
1030    /// Returns (label, column_name, numeric_value) if the expression is
1031    /// a comparison between a property access and a numeric literal.
1032    fn extract_column_and_value(
1033        &self,
1034        left: &LogicalExpression,
1035        right: &LogicalExpression,
1036    ) -> Option<(String, String, f64)> {
1037        // Try left as property, right as literal
1038        if let Some(result) = self.try_extract_property_literal(left, right) {
1039            return Some(result);
1040        }
1041
1042        // Try right as property, left as literal
1043        self.try_extract_property_literal(right, left)
1044    }
1045
1046    /// Tries to extract property and literal from a specific ordering.
1047    fn try_extract_property_literal(
1048        &self,
1049        property_expr: &LogicalExpression,
1050        literal_expr: &LogicalExpression,
1051    ) -> Option<(String, String, f64)> {
1052        // Extract property access
1053        let (variable, property) = match property_expr {
1054            LogicalExpression::Property { variable, property } => {
1055                (variable.clone(), property.clone())
1056            }
1057            _ => return None,
1058        };
1059
1060        // Extract numeric literal
1061        let value = match literal_expr {
1062            LogicalExpression::Literal(grafeo_common::types::Value::Int64(n)) => *n as f64,
1063            LogicalExpression::Literal(grafeo_common::types::Value::Float64(f)) => *f,
1064            _ => return None,
1065        };
1066
1067        // Try to find a label for this variable from table stats
1068        // Use the variable name as a heuristic label lookup
1069        // In practice, the optimizer would track which labels variables are bound to
1070        for label in self.table_stats.keys() {
1071            if let Some(stats) = self.table_stats.get(label)
1072                && stats.columns.contains_key(&property)
1073            {
1074                return Some((label.clone(), property, value));
1075            }
1076        }
1077
1078        // If no stats found but we have the property, return with variable as label
1079        Some((variable, property, value))
1080    }
1081
1082    /// Estimates unary expression selectivity.
1083    fn estimate_unary_selectivity(&self, op: UnaryOp, _operand: &LogicalExpression) -> f64 {
1084        match op {
1085            UnaryOp::Not => 1.0 - self.default_selectivity,
1086            UnaryOp::IsNull => self.selectivity_config.is_null,
1087            UnaryOp::IsNotNull => self.selectivity_config.is_not_null,
1088            UnaryOp::Neg => 1.0, // Negation doesn't change cardinality
1089        }
1090    }
1091
1092    /// Gets statistics for a column.
1093    fn get_column_stats(&self, label: &str, column: &str) -> Option<&ColumnStats> {
1094        self.table_stats.get(label)?.columns.get(column)
1095    }
1096}
1097
1098impl Default for CardinalityEstimator {
1099    fn default() -> Self {
1100        Self::new()
1101    }
1102}
1103
1104#[cfg(test)]
1105mod tests {
1106    use super::*;
1107    use crate::query::plan::{
1108        DistinctOp, ExpandDirection, ExpandOp, FilterOp, JoinCondition, NodeScanOp, PathMode,
1109        ProjectOp, Projection, ReturnItem, ReturnOp, SkipOp, SortKey, SortOp, SortOrder,
1110    };
1111    use grafeo_common::types::Value;
1112
1113    #[test]
1114    fn test_node_scan_with_stats() {
1115        let mut estimator = CardinalityEstimator::new();
1116        estimator.add_table_stats("Person", TableStats::new(5000));
1117
1118        let scan = LogicalOperator::NodeScan(NodeScanOp {
1119            variable: "n".to_string(),
1120            label: Some("Person".to_string()),
1121            input: None,
1122        });
1123
1124        let cardinality = estimator.estimate(&scan);
1125        assert!((cardinality - 5000.0).abs() < 0.001);
1126    }
1127
1128    #[test]
1129    fn test_filter_reduces_cardinality() {
1130        let mut estimator = CardinalityEstimator::new();
1131        estimator.add_table_stats("Person", TableStats::new(1000));
1132
1133        let filter = LogicalOperator::Filter(FilterOp {
1134            predicate: LogicalExpression::Binary {
1135                left: Box::new(LogicalExpression::Property {
1136                    variable: "n".to_string(),
1137                    property: "age".to_string(),
1138                }),
1139                op: BinaryOp::Eq,
1140                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1141            },
1142            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1143                variable: "n".to_string(),
1144                label: Some("Person".to_string()),
1145                input: None,
1146            })),
1147            pushdown_hint: None,
1148        });
1149
1150        let cardinality = estimator.estimate(&filter);
1151        // Equality selectivity is 0.01, so 1000 * 0.01 = 10
1152        assert!(cardinality < 1000.0);
1153        assert!(cardinality >= 1.0);
1154    }
1155
1156    #[test]
1157    fn test_join_cardinality() {
1158        let mut estimator = CardinalityEstimator::new();
1159        estimator.add_table_stats("Person", TableStats::new(1000));
1160        estimator.add_table_stats("Company", TableStats::new(100));
1161
1162        let join = LogicalOperator::Join(JoinOp {
1163            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1164                variable: "p".to_string(),
1165                label: Some("Person".to_string()),
1166                input: None,
1167            })),
1168            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1169                variable: "c".to_string(),
1170                label: Some("Company".to_string()),
1171                input: None,
1172            })),
1173            join_type: JoinType::Inner,
1174            conditions: vec![JoinCondition {
1175                left: LogicalExpression::Property {
1176                    variable: "p".to_string(),
1177                    property: "company_id".to_string(),
1178                },
1179                right: LogicalExpression::Property {
1180                    variable: "c".to_string(),
1181                    property: "id".to_string(),
1182                },
1183            }],
1184        });
1185
1186        let cardinality = estimator.estimate(&join);
1187        // Should be less than cross product
1188        assert!(cardinality < 1000.0 * 100.0);
1189    }
1190
1191    #[test]
1192    fn test_limit_caps_cardinality() {
1193        let mut estimator = CardinalityEstimator::new();
1194        estimator.add_table_stats("Person", TableStats::new(1000));
1195
1196        let limit = LogicalOperator::Limit(LimitOp {
1197            count: 10.into(),
1198            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1199                variable: "n".to_string(),
1200                label: Some("Person".to_string()),
1201                input: None,
1202            })),
1203        });
1204
1205        let cardinality = estimator.estimate(&limit);
1206        assert!((cardinality - 10.0).abs() < 0.001);
1207    }
1208
1209    #[test]
1210    fn test_aggregate_reduces_cardinality() {
1211        let mut estimator = CardinalityEstimator::new();
1212        estimator.add_table_stats("Person", TableStats::new(1000));
1213
1214        // Global aggregation
1215        let global_agg = LogicalOperator::Aggregate(AggregateOp {
1216            group_by: vec![],
1217            aggregates: vec![],
1218            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1219                variable: "n".to_string(),
1220                label: Some("Person".to_string()),
1221                input: None,
1222            })),
1223            having: None,
1224        });
1225
1226        let cardinality = estimator.estimate(&global_agg);
1227        assert!((cardinality - 1.0).abs() < 0.001);
1228
1229        // Group by aggregation
1230        let group_agg = LogicalOperator::Aggregate(AggregateOp {
1231            group_by: vec![LogicalExpression::Property {
1232                variable: "n".to_string(),
1233                property: "city".to_string(),
1234            }],
1235            aggregates: vec![],
1236            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1237                variable: "n".to_string(),
1238                label: Some("Person".to_string()),
1239                input: None,
1240            })),
1241            having: None,
1242        });
1243
1244        let cardinality = estimator.estimate(&group_agg);
1245        // Should be less than input
1246        assert!(cardinality < 1000.0);
1247    }
1248
1249    #[test]
1250    fn test_node_scan_without_stats() {
1251        let estimator = CardinalityEstimator::new();
1252
1253        let scan = LogicalOperator::NodeScan(NodeScanOp {
1254            variable: "n".to_string(),
1255            label: Some("Unknown".to_string()),
1256            input: None,
1257        });
1258
1259        let cardinality = estimator.estimate(&scan);
1260        // Should return default (1000)
1261        assert!((cardinality - 1000.0).abs() < 0.001);
1262    }
1263
1264    #[test]
1265    fn test_node_scan_no_label() {
1266        let estimator = CardinalityEstimator::new();
1267
1268        let scan = LogicalOperator::NodeScan(NodeScanOp {
1269            variable: "n".to_string(),
1270            label: None,
1271            input: None,
1272        });
1273
1274        let cardinality = estimator.estimate(&scan);
1275        // Should scan all nodes (default)
1276        assert!((cardinality - 1000.0).abs() < 0.001);
1277    }
1278
1279    #[test]
1280    fn test_filter_inequality_selectivity() {
1281        let mut estimator = CardinalityEstimator::new();
1282        estimator.add_table_stats("Person", TableStats::new(1000));
1283
1284        let filter = LogicalOperator::Filter(FilterOp {
1285            predicate: LogicalExpression::Binary {
1286                left: Box::new(LogicalExpression::Property {
1287                    variable: "n".to_string(),
1288                    property: "age".to_string(),
1289                }),
1290                op: BinaryOp::Ne,
1291                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1292            },
1293            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1294                variable: "n".to_string(),
1295                label: Some("Person".to_string()),
1296                input: None,
1297            })),
1298            pushdown_hint: None,
1299        });
1300
1301        let cardinality = estimator.estimate(&filter);
1302        // Inequality selectivity is 0.99, so 1000 * 0.99 = 990
1303        assert!(cardinality > 900.0);
1304    }
1305
1306    #[test]
1307    fn test_filter_range_selectivity() {
1308        let mut estimator = CardinalityEstimator::new();
1309        estimator.add_table_stats("Person", TableStats::new(1000));
1310
1311        let filter = LogicalOperator::Filter(FilterOp {
1312            predicate: LogicalExpression::Binary {
1313                left: Box::new(LogicalExpression::Property {
1314                    variable: "n".to_string(),
1315                    property: "age".to_string(),
1316                }),
1317                op: BinaryOp::Gt,
1318                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1319            },
1320            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1321                variable: "n".to_string(),
1322                label: Some("Person".to_string()),
1323                input: None,
1324            })),
1325            pushdown_hint: None,
1326        });
1327
1328        let cardinality = estimator.estimate(&filter);
1329        // Range selectivity is 0.33, so 1000 * 0.33 = 330
1330        assert!(cardinality < 500.0);
1331        assert!(cardinality > 100.0);
1332    }
1333
1334    #[test]
1335    fn test_filter_and_selectivity() {
1336        let mut estimator = CardinalityEstimator::new();
1337        estimator.add_table_stats("Person", TableStats::new(1000));
1338
1339        // Test AND with two equality predicates
1340        // Each equality has selectivity 0.01, so AND gives 0.01 * 0.01 = 0.0001
1341        let filter = LogicalOperator::Filter(FilterOp {
1342            predicate: LogicalExpression::Binary {
1343                left: Box::new(LogicalExpression::Binary {
1344                    left: Box::new(LogicalExpression::Property {
1345                        variable: "n".to_string(),
1346                        property: "city".to_string(),
1347                    }),
1348                    op: BinaryOp::Eq,
1349                    right: Box::new(LogicalExpression::Literal(Value::String("NYC".into()))),
1350                }),
1351                op: BinaryOp::And,
1352                right: Box::new(LogicalExpression::Binary {
1353                    left: Box::new(LogicalExpression::Property {
1354                        variable: "n".to_string(),
1355                        property: "age".to_string(),
1356                    }),
1357                    op: BinaryOp::Eq,
1358                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1359                }),
1360            },
1361            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1362                variable: "n".to_string(),
1363                label: Some("Person".to_string()),
1364                input: None,
1365            })),
1366            pushdown_hint: None,
1367        });
1368
1369        let cardinality = estimator.estimate(&filter);
1370        // AND reduces selectivity (multiply): 0.01 * 0.01 = 0.0001
1371        // 1000 * 0.0001 = 0.1, min is 1.0
1372        assert!(cardinality < 100.0);
1373        assert!(cardinality >= 1.0);
1374    }
1375
1376    #[test]
1377    fn test_filter_or_selectivity() {
1378        let mut estimator = CardinalityEstimator::new();
1379        estimator.add_table_stats("Person", TableStats::new(1000));
1380
1381        // Test OR with two equality predicates
1382        // Each equality has selectivity 0.01
1383        // OR gives: 0.01 + 0.01 - (0.01 * 0.01) = 0.0199
1384        let filter = LogicalOperator::Filter(FilterOp {
1385            predicate: LogicalExpression::Binary {
1386                left: Box::new(LogicalExpression::Binary {
1387                    left: Box::new(LogicalExpression::Property {
1388                        variable: "n".to_string(),
1389                        property: "city".to_string(),
1390                    }),
1391                    op: BinaryOp::Eq,
1392                    right: Box::new(LogicalExpression::Literal(Value::String("NYC".into()))),
1393                }),
1394                op: BinaryOp::Or,
1395                right: Box::new(LogicalExpression::Binary {
1396                    left: Box::new(LogicalExpression::Property {
1397                        variable: "n".to_string(),
1398                        property: "city".to_string(),
1399                    }),
1400                    op: BinaryOp::Eq,
1401                    right: Box::new(LogicalExpression::Literal(Value::String("LA".into()))),
1402                }),
1403            },
1404            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1405                variable: "n".to_string(),
1406                label: Some("Person".to_string()),
1407                input: None,
1408            })),
1409            pushdown_hint: None,
1410        });
1411
1412        let cardinality = estimator.estimate(&filter);
1413        // OR: 0.01 + 0.01 - 0.0001 ≈ 0.0199, so 1000 * 0.0199 ≈ 19.9
1414        assert!(cardinality < 100.0);
1415        assert!(cardinality >= 1.0);
1416    }
1417
1418    #[test]
1419    fn test_filter_literal_true() {
1420        let mut estimator = CardinalityEstimator::new();
1421        estimator.add_table_stats("Person", TableStats::new(1000));
1422
1423        let filter = LogicalOperator::Filter(FilterOp {
1424            predicate: LogicalExpression::Literal(Value::Bool(true)),
1425            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1426                variable: "n".to_string(),
1427                label: Some("Person".to_string()),
1428                input: None,
1429            })),
1430            pushdown_hint: None,
1431        });
1432
1433        let cardinality = estimator.estimate(&filter);
1434        // Literal true has selectivity 1.0
1435        assert!((cardinality - 1000.0).abs() < 0.001);
1436    }
1437
1438    #[test]
1439    fn test_filter_literal_false() {
1440        let mut estimator = CardinalityEstimator::new();
1441        estimator.add_table_stats("Person", TableStats::new(1000));
1442
1443        let filter = LogicalOperator::Filter(FilterOp {
1444            predicate: LogicalExpression::Literal(Value::Bool(false)),
1445            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1446                variable: "n".to_string(),
1447                label: Some("Person".to_string()),
1448                input: None,
1449            })),
1450            pushdown_hint: None,
1451        });
1452
1453        let cardinality = estimator.estimate(&filter);
1454        // Literal false has selectivity 0.0, but min is 1.0
1455        assert!((cardinality - 1.0).abs() < 0.001);
1456    }
1457
1458    #[test]
1459    fn test_unary_not_selectivity() {
1460        let mut estimator = CardinalityEstimator::new();
1461        estimator.add_table_stats("Person", TableStats::new(1000));
1462
1463        let filter = LogicalOperator::Filter(FilterOp {
1464            predicate: LogicalExpression::Unary {
1465                op: UnaryOp::Not,
1466                operand: Box::new(LogicalExpression::Literal(Value::Bool(true))),
1467            },
1468            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1469                variable: "n".to_string(),
1470                label: Some("Person".to_string()),
1471                input: None,
1472            })),
1473            pushdown_hint: None,
1474        });
1475
1476        let cardinality = estimator.estimate(&filter);
1477        // NOT inverts selectivity
1478        assert!(cardinality < 1000.0);
1479    }
1480
1481    #[test]
1482    fn test_unary_is_null_selectivity() {
1483        let mut estimator = CardinalityEstimator::new();
1484        estimator.add_table_stats("Person", TableStats::new(1000));
1485
1486        let filter = LogicalOperator::Filter(FilterOp {
1487            predicate: LogicalExpression::Unary {
1488                op: UnaryOp::IsNull,
1489                operand: Box::new(LogicalExpression::Variable("x".to_string())),
1490            },
1491            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1492                variable: "n".to_string(),
1493                label: Some("Person".to_string()),
1494                input: None,
1495            })),
1496            pushdown_hint: None,
1497        });
1498
1499        let cardinality = estimator.estimate(&filter);
1500        // IS NULL has selectivity 0.05
1501        assert!(cardinality < 100.0);
1502    }
1503
1504    #[test]
1505    fn test_expand_cardinality() {
1506        let mut estimator = CardinalityEstimator::new();
1507        estimator.add_table_stats("Person", TableStats::new(100));
1508
1509        let expand = LogicalOperator::Expand(ExpandOp {
1510            from_variable: "a".to_string(),
1511            to_variable: "b".to_string(),
1512            edge_variable: None,
1513            direction: ExpandDirection::Outgoing,
1514            edge_types: vec![],
1515            min_hops: 1,
1516            max_hops: Some(1),
1517            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1518                variable: "a".to_string(),
1519                label: Some("Person".to_string()),
1520                input: None,
1521            })),
1522            path_alias: None,
1523            path_mode: PathMode::Walk,
1524        });
1525
1526        let cardinality = estimator.estimate(&expand);
1527        // Expand multiplies by fanout (10)
1528        assert!(cardinality > 100.0);
1529    }
1530
1531    #[test]
1532    fn test_expand_with_edge_type_filter() {
1533        let mut estimator = CardinalityEstimator::new();
1534        estimator.add_table_stats("Person", TableStats::new(100));
1535
1536        let expand = LogicalOperator::Expand(ExpandOp {
1537            from_variable: "a".to_string(),
1538            to_variable: "b".to_string(),
1539            edge_variable: None,
1540            direction: ExpandDirection::Outgoing,
1541            edge_types: vec!["KNOWS".to_string()],
1542            min_hops: 1,
1543            max_hops: Some(1),
1544            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1545                variable: "a".to_string(),
1546                label: Some("Person".to_string()),
1547                input: None,
1548            })),
1549            path_alias: None,
1550            path_mode: PathMode::Walk,
1551        });
1552
1553        let cardinality = estimator.estimate(&expand);
1554        // With edge type, fanout is reduced by half
1555        assert!(cardinality > 100.0);
1556    }
1557
1558    #[test]
1559    fn test_expand_variable_length() {
1560        let mut estimator = CardinalityEstimator::new();
1561        estimator.add_table_stats("Person", TableStats::new(100));
1562
1563        let expand = LogicalOperator::Expand(ExpandOp {
1564            from_variable: "a".to_string(),
1565            to_variable: "b".to_string(),
1566            edge_variable: None,
1567            direction: ExpandDirection::Outgoing,
1568            edge_types: vec![],
1569            min_hops: 1,
1570            max_hops: Some(3),
1571            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1572                variable: "a".to_string(),
1573                label: Some("Person".to_string()),
1574                input: None,
1575            })),
1576            path_alias: None,
1577            path_mode: PathMode::Walk,
1578        });
1579
1580        let cardinality = estimator.estimate(&expand);
1581        // Variable length path has much higher cardinality
1582        assert!(cardinality > 500.0);
1583    }
1584
1585    #[test]
1586    fn test_join_cross_product() {
1587        let mut estimator = CardinalityEstimator::new();
1588        estimator.add_table_stats("Person", TableStats::new(100));
1589        estimator.add_table_stats("Company", TableStats::new(50));
1590
1591        let join = LogicalOperator::Join(JoinOp {
1592            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1593                variable: "p".to_string(),
1594                label: Some("Person".to_string()),
1595                input: None,
1596            })),
1597            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1598                variable: "c".to_string(),
1599                label: Some("Company".to_string()),
1600                input: None,
1601            })),
1602            join_type: JoinType::Cross,
1603            conditions: vec![],
1604        });
1605
1606        let cardinality = estimator.estimate(&join);
1607        // Cross join = 100 * 50 = 5000
1608        assert!((cardinality - 5000.0).abs() < 0.001);
1609    }
1610
1611    #[test]
1612    fn test_join_left_outer() {
1613        let mut estimator = CardinalityEstimator::new();
1614        estimator.add_table_stats("Person", TableStats::new(1000));
1615        estimator.add_table_stats("Company", TableStats::new(10));
1616
1617        let join = LogicalOperator::Join(JoinOp {
1618            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1619                variable: "p".to_string(),
1620                label: Some("Person".to_string()),
1621                input: None,
1622            })),
1623            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1624                variable: "c".to_string(),
1625                label: Some("Company".to_string()),
1626                input: None,
1627            })),
1628            join_type: JoinType::Left,
1629            conditions: vec![JoinCondition {
1630                left: LogicalExpression::Variable("p".to_string()),
1631                right: LogicalExpression::Variable("c".to_string()),
1632            }],
1633        });
1634
1635        let cardinality = estimator.estimate(&join);
1636        // Left join returns at least all left rows
1637        assert!(cardinality >= 1000.0);
1638    }
1639
1640    #[test]
1641    fn test_join_semi() {
1642        let mut estimator = CardinalityEstimator::new();
1643        estimator.add_table_stats("Person", TableStats::new(1000));
1644        estimator.add_table_stats("Company", TableStats::new(100));
1645
1646        let join = LogicalOperator::Join(JoinOp {
1647            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1648                variable: "p".to_string(),
1649                label: Some("Person".to_string()),
1650                input: None,
1651            })),
1652            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1653                variable: "c".to_string(),
1654                label: Some("Company".to_string()),
1655                input: None,
1656            })),
1657            join_type: JoinType::Semi,
1658            conditions: vec![],
1659        });
1660
1661        let cardinality = estimator.estimate(&join);
1662        // Semi join returns at most left cardinality
1663        assert!(cardinality <= 1000.0);
1664    }
1665
1666    #[test]
1667    fn test_join_anti() {
1668        let mut estimator = CardinalityEstimator::new();
1669        estimator.add_table_stats("Person", TableStats::new(1000));
1670        estimator.add_table_stats("Company", TableStats::new(100));
1671
1672        let join = LogicalOperator::Join(JoinOp {
1673            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1674                variable: "p".to_string(),
1675                label: Some("Person".to_string()),
1676                input: None,
1677            })),
1678            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1679                variable: "c".to_string(),
1680                label: Some("Company".to_string()),
1681                input: None,
1682            })),
1683            join_type: JoinType::Anti,
1684            conditions: vec![],
1685        });
1686
1687        let cardinality = estimator.estimate(&join);
1688        // Anti join returns at most left cardinality
1689        assert!(cardinality <= 1000.0);
1690        assert!(cardinality >= 1.0);
1691    }
1692
1693    #[test]
1694    fn test_project_preserves_cardinality() {
1695        let mut estimator = CardinalityEstimator::new();
1696        estimator.add_table_stats("Person", TableStats::new(1000));
1697
1698        let project = LogicalOperator::Project(ProjectOp {
1699            projections: vec![Projection {
1700                expression: LogicalExpression::Variable("n".to_string()),
1701                alias: None,
1702            }],
1703            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1704                variable: "n".to_string(),
1705                label: Some("Person".to_string()),
1706                input: None,
1707            })),
1708            pass_through_input: false,
1709        });
1710
1711        let cardinality = estimator.estimate(&project);
1712        assert!((cardinality - 1000.0).abs() < 0.001);
1713    }
1714
1715    #[test]
1716    fn test_sort_preserves_cardinality() {
1717        let mut estimator = CardinalityEstimator::new();
1718        estimator.add_table_stats("Person", TableStats::new(1000));
1719
1720        let sort = LogicalOperator::Sort(SortOp {
1721            keys: vec![SortKey {
1722                expression: LogicalExpression::Variable("n".to_string()),
1723                order: SortOrder::Ascending,
1724                nulls: None,
1725            }],
1726            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1727                variable: "n".to_string(),
1728                label: Some("Person".to_string()),
1729                input: None,
1730            })),
1731        });
1732
1733        let cardinality = estimator.estimate(&sort);
1734        assert!((cardinality - 1000.0).abs() < 0.001);
1735    }
1736
1737    #[test]
1738    fn test_distinct_reduces_cardinality() {
1739        let mut estimator = CardinalityEstimator::new();
1740        estimator.add_table_stats("Person", TableStats::new(1000));
1741
1742        let distinct = LogicalOperator::Distinct(DistinctOp {
1743            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1744                variable: "n".to_string(),
1745                label: Some("Person".to_string()),
1746                input: None,
1747            })),
1748            columns: None,
1749        });
1750
1751        let cardinality = estimator.estimate(&distinct);
1752        // Distinct assumes 50% unique
1753        assert!((cardinality - 500.0).abs() < 0.001);
1754    }
1755
1756    #[test]
1757    fn test_skip_reduces_cardinality() {
1758        let mut estimator = CardinalityEstimator::new();
1759        estimator.add_table_stats("Person", TableStats::new(1000));
1760
1761        let skip = LogicalOperator::Skip(SkipOp {
1762            count: 100.into(),
1763            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1764                variable: "n".to_string(),
1765                label: Some("Person".to_string()),
1766                input: None,
1767            })),
1768        });
1769
1770        let cardinality = estimator.estimate(&skip);
1771        assert!((cardinality - 900.0).abs() < 0.001);
1772    }
1773
1774    #[test]
1775    fn test_return_preserves_cardinality() {
1776        let mut estimator = CardinalityEstimator::new();
1777        estimator.add_table_stats("Person", TableStats::new(1000));
1778
1779        let ret = LogicalOperator::Return(ReturnOp {
1780            items: vec![ReturnItem {
1781                expression: LogicalExpression::Variable("n".to_string()),
1782                alias: None,
1783            }],
1784            distinct: false,
1785            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1786                variable: "n".to_string(),
1787                label: Some("Person".to_string()),
1788                input: None,
1789            })),
1790        });
1791
1792        let cardinality = estimator.estimate(&ret);
1793        assert!((cardinality - 1000.0).abs() < 0.001);
1794    }
1795
1796    #[test]
1797    fn test_empty_cardinality() {
1798        let estimator = CardinalityEstimator::new();
1799        let cardinality = estimator.estimate(&LogicalOperator::Empty);
1800        assert!((cardinality).abs() < 0.001);
1801    }
1802
1803    #[test]
1804    fn test_table_stats_with_column() {
1805        let stats = TableStats::new(1000).with_column(
1806            "age",
1807            ColumnStats::new(50).with_nulls(10).with_range(0.0, 100.0),
1808        );
1809
1810        assert_eq!(stats.row_count, 1000);
1811        let col = stats.columns.get("age").unwrap();
1812        assert_eq!(col.distinct_count, 50);
1813        assert_eq!(col.null_count, 10);
1814        assert!((col.min_value.unwrap() - 0.0).abs() < 0.001);
1815        assert!((col.max_value.unwrap() - 100.0).abs() < 0.001);
1816    }
1817
1818    #[test]
1819    fn test_estimator_default() {
1820        let estimator = CardinalityEstimator::default();
1821        let scan = LogicalOperator::NodeScan(NodeScanOp {
1822            variable: "n".to_string(),
1823            label: None,
1824            input: None,
1825        });
1826        let cardinality = estimator.estimate(&scan);
1827        assert!((cardinality - 1000.0).abs() < 0.001);
1828    }
1829
1830    #[test]
1831    fn test_set_avg_fanout() {
1832        let mut estimator = CardinalityEstimator::new();
1833        estimator.add_table_stats("Person", TableStats::new(100));
1834        estimator.set_avg_fanout(5.0);
1835
1836        let expand = LogicalOperator::Expand(ExpandOp {
1837            from_variable: "a".to_string(),
1838            to_variable: "b".to_string(),
1839            edge_variable: None,
1840            direction: ExpandDirection::Outgoing,
1841            edge_types: vec![],
1842            min_hops: 1,
1843            max_hops: Some(1),
1844            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1845                variable: "a".to_string(),
1846                label: Some("Person".to_string()),
1847                input: None,
1848            })),
1849            path_alias: None,
1850            path_mode: PathMode::Walk,
1851        });
1852
1853        let cardinality = estimator.estimate(&expand);
1854        // With fanout 5: 100 * 5 = 500
1855        assert!((cardinality - 500.0).abs() < 0.001);
1856    }
1857
1858    #[test]
1859    fn test_multiple_group_by_keys_reduce_cardinality() {
1860        // The current implementation uses a simplified model where more group by keys
1861        // results in greater reduction (dividing by 10^num_keys). This is a simplification
1862        // that works for most cases where group by keys are correlated.
1863        let mut estimator = CardinalityEstimator::new();
1864        estimator.add_table_stats("Person", TableStats::new(10000));
1865
1866        let single_group = LogicalOperator::Aggregate(AggregateOp {
1867            group_by: vec![LogicalExpression::Property {
1868                variable: "n".to_string(),
1869                property: "city".to_string(),
1870            }],
1871            aggregates: vec![],
1872            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1873                variable: "n".to_string(),
1874                label: Some("Person".to_string()),
1875                input: None,
1876            })),
1877            having: None,
1878        });
1879
1880        let multi_group = LogicalOperator::Aggregate(AggregateOp {
1881            group_by: vec![
1882                LogicalExpression::Property {
1883                    variable: "n".to_string(),
1884                    property: "city".to_string(),
1885                },
1886                LogicalExpression::Property {
1887                    variable: "n".to_string(),
1888                    property: "country".to_string(),
1889                },
1890            ],
1891            aggregates: vec![],
1892            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1893                variable: "n".to_string(),
1894                label: Some("Person".to_string()),
1895                input: None,
1896            })),
1897            having: None,
1898        });
1899
1900        let single_card = estimator.estimate(&single_group);
1901        let multi_card = estimator.estimate(&multi_group);
1902
1903        // Both should reduce cardinality from input
1904        assert!(single_card < 10000.0);
1905        assert!(multi_card < 10000.0);
1906        // Both should be at least 1
1907        assert!(single_card >= 1.0);
1908        assert!(multi_card >= 1.0);
1909    }
1910
1911    // ============= Histogram Tests =============
1912
1913    #[test]
1914    fn test_histogram_build_uniform() {
1915        // Build histogram from uniformly distributed data
1916        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1917        let histogram = EquiDepthHistogram::build(&values, 10);
1918
1919        assert_eq!(histogram.num_buckets(), 10);
1920        assert_eq!(histogram.total_rows(), 100);
1921
1922        // Each bucket should have approximately 10 rows
1923        for bucket in histogram.buckets() {
1924            assert!(bucket.frequency >= 9 && bucket.frequency <= 11);
1925        }
1926    }
1927
1928    #[test]
1929    fn test_histogram_build_skewed() {
1930        // Build histogram from skewed data (many small values, few large)
1931        let mut values: Vec<f64> = (0..80).map(|i| i as f64).collect();
1932        values.extend((0..20).map(|i| 1000.0 + i as f64));
1933        let histogram = EquiDepthHistogram::build(&values, 5);
1934
1935        assert_eq!(histogram.num_buckets(), 5);
1936        assert_eq!(histogram.total_rows(), 100);
1937
1938        // Each bucket should have ~20 rows despite skewed data
1939        for bucket in histogram.buckets() {
1940            assert!(bucket.frequency >= 18 && bucket.frequency <= 22);
1941        }
1942    }
1943
1944    #[test]
1945    fn test_histogram_range_selectivity_full() {
1946        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1947        let histogram = EquiDepthHistogram::build(&values, 10);
1948
1949        // Full range should have selectivity ~1.0
1950        let selectivity = histogram.range_selectivity(None, None);
1951        assert!((selectivity - 1.0).abs() < 0.01);
1952    }
1953
1954    #[test]
1955    fn test_histogram_range_selectivity_half() {
1956        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1957        let histogram = EquiDepthHistogram::build(&values, 10);
1958
1959        // Values >= 50 should be ~50% (half the data)
1960        let selectivity = histogram.range_selectivity(Some(50.0), None);
1961        assert!(selectivity > 0.4 && selectivity < 0.6);
1962    }
1963
1964    #[test]
1965    fn test_histogram_range_selectivity_quarter() {
1966        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1967        let histogram = EquiDepthHistogram::build(&values, 10);
1968
1969        // Values < 25 should be ~25%
1970        let selectivity = histogram.range_selectivity(None, Some(25.0));
1971        assert!(selectivity > 0.2 && selectivity < 0.3);
1972    }
1973
1974    #[test]
1975    fn test_histogram_equality_selectivity() {
1976        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1977        let histogram = EquiDepthHistogram::build(&values, 10);
1978
1979        // Equality on 100 distinct values should be ~1%
1980        let selectivity = histogram.equality_selectivity(50.0);
1981        assert!(selectivity > 0.005 && selectivity < 0.02);
1982    }
1983
1984    #[test]
1985    fn test_histogram_empty() {
1986        let histogram = EquiDepthHistogram::build(&[], 10);
1987
1988        assert_eq!(histogram.num_buckets(), 0);
1989        assert_eq!(histogram.total_rows(), 0);
1990
1991        // Default selectivity for empty histogram
1992        let selectivity = histogram.range_selectivity(Some(0.0), Some(100.0));
1993        assert!((selectivity - 0.33).abs() < 0.01);
1994    }
1995
1996    #[test]
1997    fn test_histogram_bucket_overlap() {
1998        let bucket = HistogramBucket::new(10.0, 20.0, 100, 10);
1999
2000        // Full overlap
2001        assert!((bucket.overlap_fraction(Some(10.0), Some(20.0)) - 1.0).abs() < 0.01);
2002
2003        // Half overlap (lower half)
2004        assert!((bucket.overlap_fraction(Some(10.0), Some(15.0)) - 0.5).abs() < 0.01);
2005
2006        // Half overlap (upper half)
2007        assert!((bucket.overlap_fraction(Some(15.0), Some(20.0)) - 0.5).abs() < 0.01);
2008
2009        // No overlap (below)
2010        assert!((bucket.overlap_fraction(Some(0.0), Some(5.0))).abs() < 0.01);
2011
2012        // No overlap (above)
2013        assert!((bucket.overlap_fraction(Some(25.0), Some(30.0))).abs() < 0.01);
2014    }
2015
2016    #[test]
2017    fn test_column_stats_from_values() {
2018        let values = vec![10.0, 20.0, 30.0, 40.0, 50.0, 20.0, 30.0, 40.0];
2019        let stats = ColumnStats::from_values(values, 4);
2020
2021        assert_eq!(stats.distinct_count, 5); // 10, 20, 30, 40, 50
2022        assert!(stats.min_value.is_some());
2023        assert!((stats.min_value.unwrap() - 10.0).abs() < 0.01);
2024        assert!(stats.max_value.is_some());
2025        assert!((stats.max_value.unwrap() - 50.0).abs() < 0.01);
2026        assert!(stats.histogram.is_some());
2027    }
2028
2029    #[test]
2030    fn test_column_stats_with_histogram_builder() {
2031        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2032        let histogram = EquiDepthHistogram::build(&values, 10);
2033
2034        let stats = ColumnStats::new(100)
2035            .with_range(0.0, 99.0)
2036            .with_histogram(histogram);
2037
2038        assert!(stats.histogram.is_some());
2039        assert_eq!(stats.histogram.as_ref().unwrap().num_buckets(), 10);
2040    }
2041
2042    #[test]
2043    fn test_filter_with_histogram_stats() {
2044        let mut estimator = CardinalityEstimator::new();
2045
2046        // Create stats with histogram for age column
2047        let age_values: Vec<f64> = (18..80).map(|i| i as f64).collect();
2048        let histogram = EquiDepthHistogram::build(&age_values, 10);
2049        let age_stats = ColumnStats::new(62)
2050            .with_range(18.0, 79.0)
2051            .with_histogram(histogram);
2052
2053        estimator.add_table_stats(
2054            "Person",
2055            TableStats::new(1000).with_column("age", age_stats),
2056        );
2057
2058        // Filter: age > 50
2059        // Age range is 18-79, so >50 is about (79-50)/(79-18) = 29/61 ≈ 47.5%
2060        let filter = LogicalOperator::Filter(FilterOp {
2061            predicate: LogicalExpression::Binary {
2062                left: Box::new(LogicalExpression::Property {
2063                    variable: "n".to_string(),
2064                    property: "age".to_string(),
2065                }),
2066                op: BinaryOp::Gt,
2067                right: Box::new(LogicalExpression::Literal(Value::Int64(50))),
2068            },
2069            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2070                variable: "n".to_string(),
2071                label: Some("Person".to_string()),
2072                input: None,
2073            })),
2074            pushdown_hint: None,
2075        });
2076
2077        let cardinality = estimator.estimate(&filter);
2078
2079        // With histogram, should get more accurate estimate than default 0.33
2080        // Expected: ~47.5% of 1000 = ~475
2081        assert!(cardinality > 300.0 && cardinality < 600.0);
2082    }
2083
2084    #[test]
2085    fn test_filter_equality_with_histogram() {
2086        let mut estimator = CardinalityEstimator::new();
2087
2088        // Create stats with histogram
2089        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2090        let histogram = EquiDepthHistogram::build(&values, 10);
2091        let stats = ColumnStats::new(100)
2092            .with_range(0.0, 99.0)
2093            .with_histogram(histogram);
2094
2095        estimator.add_table_stats("Data", TableStats::new(1000).with_column("value", stats));
2096
2097        // Filter: value = 50
2098        let filter = LogicalOperator::Filter(FilterOp {
2099            predicate: LogicalExpression::Binary {
2100                left: Box::new(LogicalExpression::Property {
2101                    variable: "d".to_string(),
2102                    property: "value".to_string(),
2103                }),
2104                op: BinaryOp::Eq,
2105                right: Box::new(LogicalExpression::Literal(Value::Int64(50))),
2106            },
2107            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2108                variable: "d".to_string(),
2109                label: Some("Data".to_string()),
2110                input: None,
2111            })),
2112            pushdown_hint: None,
2113        });
2114
2115        let cardinality = estimator.estimate(&filter);
2116
2117        // With 100 distinct values, selectivity should be ~1%
2118        // 1000 * 0.01 = 10
2119        assert!((1.0..50.0).contains(&cardinality));
2120    }
2121
2122    #[test]
2123    fn test_histogram_min_max() {
2124        let values: Vec<f64> = vec![5.0, 10.0, 15.0, 20.0, 25.0];
2125        let histogram = EquiDepthHistogram::build(&values, 2);
2126
2127        assert_eq!(histogram.min_value(), Some(5.0));
2128        // Max is the upper bound of the last bucket
2129        assert!(histogram.max_value().is_some());
2130    }
2131
2132    // ==================== SelectivityConfig Tests ====================
2133
2134    #[test]
2135    fn test_selectivity_config_defaults() {
2136        let config = SelectivityConfig::new();
2137        assert!((config.default - 0.1).abs() < f64::EPSILON);
2138        assert!((config.equality - 0.01).abs() < f64::EPSILON);
2139        assert!((config.inequality - 0.99).abs() < f64::EPSILON);
2140        assert!((config.range - 0.33).abs() < f64::EPSILON);
2141        assert!((config.string_ops - 0.1).abs() < f64::EPSILON);
2142        assert!((config.membership - 0.1).abs() < f64::EPSILON);
2143        assert!((config.is_null - 0.05).abs() < f64::EPSILON);
2144        assert!((config.is_not_null - 0.95).abs() < f64::EPSILON);
2145        assert!((config.distinct_fraction - 0.5).abs() < f64::EPSILON);
2146    }
2147
2148    #[test]
2149    fn test_custom_selectivity_config() {
2150        let config = SelectivityConfig {
2151            equality: 0.05,
2152            range: 0.25,
2153            ..SelectivityConfig::new()
2154        };
2155        let estimator = CardinalityEstimator::with_selectivity_config(config);
2156        assert!((estimator.selectivity_config().equality - 0.05).abs() < f64::EPSILON);
2157        assert!((estimator.selectivity_config().range - 0.25).abs() < f64::EPSILON);
2158    }
2159
2160    #[test]
2161    fn test_custom_selectivity_affects_estimation() {
2162        // Default: equality = 0.01 → 1000 * 0.01 = 10
2163        let mut default_est = CardinalityEstimator::new();
2164        default_est.add_table_stats("Person", TableStats::new(1000));
2165
2166        let filter = LogicalOperator::Filter(FilterOp {
2167            predicate: LogicalExpression::Binary {
2168                left: Box::new(LogicalExpression::Property {
2169                    variable: "n".to_string(),
2170                    property: "name".to_string(),
2171                }),
2172                op: BinaryOp::Eq,
2173                right: Box::new(LogicalExpression::Literal(Value::String("Alix".into()))),
2174            },
2175            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2176                variable: "n".to_string(),
2177                label: Some("Person".to_string()),
2178                input: None,
2179            })),
2180            pushdown_hint: None,
2181        });
2182
2183        let default_card = default_est.estimate(&filter);
2184
2185        // Custom: equality = 0.2 → 1000 * 0.2 = 200
2186        let config = SelectivityConfig {
2187            equality: 0.2,
2188            ..SelectivityConfig::new()
2189        };
2190        let mut custom_est = CardinalityEstimator::with_selectivity_config(config);
2191        custom_est.add_table_stats("Person", TableStats::new(1000));
2192
2193        let custom_card = custom_est.estimate(&filter);
2194
2195        assert!(custom_card > default_card);
2196        assert!((custom_card - 200.0).abs() < 1.0);
2197    }
2198
2199    #[test]
2200    fn test_custom_range_selectivity() {
2201        let config = SelectivityConfig {
2202            range: 0.5,
2203            ..SelectivityConfig::new()
2204        };
2205        let mut estimator = CardinalityEstimator::with_selectivity_config(config);
2206        estimator.add_table_stats("Person", TableStats::new(1000));
2207
2208        let filter = LogicalOperator::Filter(FilterOp {
2209            predicate: LogicalExpression::Binary {
2210                left: Box::new(LogicalExpression::Property {
2211                    variable: "n".to_string(),
2212                    property: "age".to_string(),
2213                }),
2214                op: BinaryOp::Gt,
2215                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2216            },
2217            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2218                variable: "n".to_string(),
2219                label: Some("Person".to_string()),
2220                input: None,
2221            })),
2222            pushdown_hint: None,
2223        });
2224
2225        let cardinality = estimator.estimate(&filter);
2226        // 1000 * 0.5 = 500
2227        assert!((cardinality - 500.0).abs() < 1.0);
2228    }
2229
2230    #[test]
2231    fn test_custom_distinct_fraction() {
2232        let config = SelectivityConfig {
2233            distinct_fraction: 0.8,
2234            ..SelectivityConfig::new()
2235        };
2236        let mut estimator = CardinalityEstimator::with_selectivity_config(config);
2237        estimator.add_table_stats("Person", TableStats::new(1000));
2238
2239        let distinct = LogicalOperator::Distinct(DistinctOp {
2240            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2241                variable: "n".to_string(),
2242                label: Some("Person".to_string()),
2243                input: None,
2244            })),
2245            columns: None,
2246        });
2247
2248        let cardinality = estimator.estimate(&distinct);
2249        // 1000 * 0.8 = 800
2250        assert!((cardinality - 800.0).abs() < 1.0);
2251    }
2252
2253    // ==================== EstimationLog Tests ====================
2254
2255    #[test]
2256    fn test_estimation_log_basic() {
2257        let mut log = EstimationLog::new(10.0);
2258        log.record("NodeScan(Person)", 1000.0, 1200.0);
2259        log.record("Filter(age > 30)", 100.0, 90.0);
2260
2261        assert_eq!(log.entries().len(), 2);
2262        assert!(!log.should_replan()); // 1.2x and 0.9x are within 10x threshold
2263    }
2264
2265    #[test]
2266    fn test_estimation_log_triggers_replan() {
2267        let mut log = EstimationLog::new(10.0);
2268        log.record("NodeScan(Person)", 100.0, 5000.0); // 50x underestimate
2269
2270        assert!(log.should_replan());
2271    }
2272
2273    #[test]
2274    fn test_estimation_log_overestimate_triggers_replan() {
2275        let mut log = EstimationLog::new(5.0);
2276        log.record("Filter", 1000.0, 100.0); // 10x overestimate → ratio = 0.1
2277
2278        assert!(log.should_replan()); // 0.1 < 1/5 = 0.2
2279    }
2280
2281    #[test]
2282    fn test_estimation_entry_error_ratio() {
2283        let entry = EstimationEntry {
2284            operator: "test".into(),
2285            estimated: 100.0,
2286            actual: 200.0,
2287        };
2288        assert!((entry.error_ratio() - 2.0).abs() < f64::EPSILON);
2289
2290        let perfect = EstimationEntry {
2291            operator: "test".into(),
2292            estimated: 100.0,
2293            actual: 100.0,
2294        };
2295        assert!((perfect.error_ratio() - 1.0).abs() < f64::EPSILON);
2296
2297        let zero_est = EstimationEntry {
2298            operator: "test".into(),
2299            estimated: 0.0,
2300            actual: 0.0,
2301        };
2302        assert!((zero_est.error_ratio() - 1.0).abs() < f64::EPSILON);
2303    }
2304
2305    #[test]
2306    fn test_estimation_log_max_error_ratio() {
2307        let mut log = EstimationLog::new(10.0);
2308        log.record("A", 100.0, 300.0); // 3x
2309        log.record("B", 100.0, 50.0); // 2x (normalized: 1/0.5 = 2)
2310        log.record("C", 100.0, 100.0); // 1x
2311
2312        assert!((log.max_error_ratio() - 3.0).abs() < f64::EPSILON);
2313    }
2314
2315    #[test]
2316    fn test_estimation_log_clear() {
2317        let mut log = EstimationLog::new(10.0);
2318        log.record("A", 100.0, 100.0);
2319        assert_eq!(log.entries().len(), 1);
2320
2321        log.clear();
2322        assert!(log.entries().is_empty());
2323        assert!(!log.should_replan());
2324    }
2325
2326    #[test]
2327    fn test_create_estimation_log() {
2328        let log = CardinalityEstimator::create_estimation_log();
2329        assert!(log.entries().is_empty());
2330        assert!(!log.should_replan());
2331    }
2332}