Skip to main content

grafeo_engine/query/optimizer/
cardinality.rs

//! Cardinality estimation for query optimization.
//!
//! Estimates the number of rows produced by each operator in a query plan.
//!
//! # Equi-Depth Histograms
//!
//! This module provides equi-depth histogram support for accurate selectivity
//! estimation of range predicates. Unlike equi-width histograms that divide
//! the value range into equal-sized buckets, equi-depth histograms divide
//! the data into buckets with approximately equal numbers of rows.
//!
//! Benefits:
//! - Better estimates for skewed data distributions
//! - More accurate range selectivity than assuming uniform distribution
//! - Adaptive to actual data characteristics
16
17use crate::query::plan::{
18    AggregateOp, BinaryOp, DistinctOp, ExpandOp, FilterOp, JoinOp, JoinType, LimitOp,
19    LogicalExpression, LogicalOperator, NodeScanOp, ProjectOp, SkipOp, SortOp, UnaryOp,
20    VectorJoinOp, VectorScanOp,
21};
22use std::collections::HashMap;
23
/// One bucket of an equi-depth histogram.
///
/// A bucket covers the value range `[lower_bound, upper_bound)` and records
/// how many rows and how many distinct values fall inside it. In an
/// equi-depth histogram every bucket holds roughly the same number of rows.
///
/// A bucket whose bounds coincide is a *single-value* bucket: it stands for
/// repeated occurrences of exactly that value. [`EquiDepthHistogram::build`]
/// produces one whenever a value is repeated often enough to fill a whole
/// bucket.
#[derive(Debug, Clone)]
pub struct HistogramBucket {
    /// Lower bound of the bucket (inclusive).
    pub lower_bound: f64,
    /// Upper bound of the bucket (exclusive, unless the bucket holds a single value).
    pub upper_bound: f64,
    /// Number of rows in this bucket.
    pub frequency: u64,
    /// Number of distinct values in this bucket.
    pub distinct_count: u64,
}

impl HistogramBucket {
    /// Creates a new histogram bucket from its bounds and counts.
    #[must_use]
    pub fn new(lower_bound: f64, upper_bound: f64, frequency: u64, distinct_count: u64) -> Self {
        Self {
            lower_bound,
            upper_bound,
            frequency,
            distinct_count,
        }
    }

    /// Returns the width of this bucket (`upper_bound - lower_bound`).
    #[must_use]
    pub fn width(&self) -> f64 {
        self.upper_bound - self.lower_bound
    }

    /// Returns `true` when the bucket has no positive width, i.e. it can only
    /// describe a single value.
    fn is_degenerate(&self) -> bool {
        self.width() <= 0.0
    }

    /// Checks if a value falls within this bucket.
    ///
    /// Regular buckets are half-open: `lower_bound <= value < upper_bound`.
    /// A single-value bucket (both bounds equal) contains exactly that value;
    /// without this special case such a bucket could never match anything.
    #[must_use]
    pub fn contains(&self, value: f64) -> bool {
        let below_upper = if self.is_degenerate() {
            value <= self.upper_bound
        } else {
            value < self.upper_bound
        };
        value >= self.lower_bound && below_upper
    }

    /// Returns the fraction of this bucket covered by the given range.
    ///
    /// # Arguments
    /// * `lower` - Lower end of the range (`None` for unbounded)
    /// * `upper` - Upper end of the range (`None` for unbounded)
    ///
    /// # Returns
    /// A value in `0.0..=1.0`. Rows are assumed to be spread uniformly across
    /// the bucket, so the result is the overlapping width divided by the
    /// bucket width. A bucket without positive width is either fully covered
    /// (`1.0`) or not covered at all (`0.0`).
    #[must_use]
    pub fn overlap_fraction(&self, lower: Option<f64>, upper: Option<f64>) -> f64 {
        // Clip the requested range to the bucket's own bounds.
        let effective_lower = lower.unwrap_or(self.lower_bound).max(self.lower_bound);
        let effective_upper = upper.unwrap_or(self.upper_bound).min(self.upper_bound);

        if self.is_degenerate() {
            // No width to interpolate over: all or nothing.
            let covered =
                effective_lower <= self.lower_bound && effective_upper >= self.upper_bound;
            return if covered { 1.0 } else { 0.0 };
        }

        let overlap = (effective_upper - effective_lower).max(0.0);
        (overlap / self.width()).min(1.0)
    }
}

/// An equi-depth histogram for selectivity estimation.
///
/// Equi-depth histograms partition data into buckets where each bucket
/// contains approximately the same number of rows. This provides more
/// accurate selectivity estimates than assuming uniform distribution,
/// especially for skewed data.
///
/// # Example
///
/// ```ignore
/// use grafeo_engine::query::optimizer::cardinality::EquiDepthHistogram;
///
/// // Build a histogram from sorted values
/// let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0];
/// let histogram = EquiDepthHistogram::build(&values, 4);
///
/// // Estimate selectivity for age > 25
/// let selectivity = histogram.range_selectivity(Some(25.0), None);
/// ```
#[derive(Debug, Clone)]
pub struct EquiDepthHistogram {
    /// The histogram buckets, sorted by lower_bound.
    buckets: Vec<HistogramBucket>,
    /// Total number of rows represented by this histogram.
    total_rows: u64,
}

impl EquiDepthHistogram {
    /// Creates a new histogram from pre-built buckets.
    ///
    /// The total row count is the sum of the bucket frequencies.
    #[must_use]
    pub fn new(buckets: Vec<HistogramBucket>) -> Self {
        let total_rows = buckets.iter().map(|b| b.frequency).sum();
        Self {
            buckets,
            total_rows,
        }
    }

    /// Builds an equi-depth histogram from a sorted slice of values.
    ///
    /// # Arguments
    /// * `values` - A sorted slice of numeric values
    /// * `num_buckets` - The desired number of buckets (capped at `values.len()`)
    ///
    /// # Returns
    /// An equi-depth histogram with approximately equal row counts per bucket.
    /// An empty histogram is returned when `values` is empty or `num_buckets`
    /// is zero.
    ///
    /// Each bucket's upper bound is the first value of the next bucket; the
    /// last bucket's upper bound is the largest value plus `1.0`, so that the
    /// maximum itself lies inside the half-open range. When one value fills an
    /// entire bucket, that bucket ends up with equal bounds (a single-value
    /// bucket).
    #[must_use]
    pub fn build(values: &[f64], num_buckets: usize) -> Self {
        if values.is_empty() || num_buckets == 0 {
            return Self {
                buckets: Vec::new(),
                total_rows: 0,
            };
        }

        let num_buckets = num_buckets.min(values.len());
        // Ceiling division: every bucket except possibly the last is full.
        let rows_per_bucket = (values.len() + num_buckets - 1) / num_buckets;
        let mut buckets = Vec::with_capacity(num_buckets);

        let mut chunks = values.chunks(rows_per_bucket).peekable();
        while let Some(chunk) = chunks.next() {
            let lower_bound = chunk[0];
            let upper_bound = match chunks.peek() {
                Some(next_chunk) => next_chunk[0],
                // For the last bucket, extend slightly beyond the max value
                None => chunk[chunk.len() - 1] + 1.0,
            };

            buckets.push(HistogramBucket::new(
                lower_bound,
                upper_bound,
                chunk.len() as u64,
                count_distinct(chunk),
            ));
        }

        Self::new(buckets)
    }

    /// Returns the number of buckets in this histogram.
    #[must_use]
    pub fn num_buckets(&self) -> usize {
        self.buckets.len()
    }

    /// Returns the total number of rows represented.
    #[must_use]
    pub fn total_rows(&self) -> u64 {
        self.total_rows
    }

    /// Returns the histogram buckets.
    #[must_use]
    pub fn buckets(&self) -> &[HistogramBucket] {
        &self.buckets
    }

    /// Estimates selectivity for a range predicate.
    ///
    /// The range is treated like a bucket: inclusive at `lower`, exclusive at
    /// `upper`. Complementary ranges such as `(None, Some(v))` and
    /// `(Some(v), None)` therefore add up to `1.0`.
    ///
    /// # Arguments
    /// * `lower` - Lower bound (None for unbounded)
    /// * `upper` - Upper bound (None for unbounded)
    ///
    /// # Returns
    /// Estimated fraction of rows matching the range (0.0 to 1.0). An empty
    /// histogram yields the fallback `0.33`.
    #[must_use]
    pub fn range_selectivity(&self, lower: Option<f64>, upper: Option<f64>) -> f64 {
        if self.buckets.is_empty() || self.total_rows == 0 {
            return 0.33; // Default fallback
        }

        let mut matching_rows = 0.0;

        for bucket in &self.buckets {
            // Entirely below the range. A single-value bucket is exempt: its
            // upper bound equals the value itself, so when the range starts at
            // that value its rows must still be counted. (`overlap_fraction`
            // returns 0.0 for single-value buckets that really lie below.)
            if lower.is_some_and(|l| bucket.upper_bound <= l) && !bucket.is_degenerate() {
                continue;
            }
            // Entirely at or above the (exclusive) upper end of the range.
            if upper.is_some_and(|u| bucket.lower_bound >= u) {
                continue;
            }

            // Weight the bucket's rows by the covered share of its width.
            let overlap = bucket.overlap_fraction(lower, upper);
            matching_rows += overlap * bucket.frequency as f64;
        }

        (matching_rows / self.total_rows as f64).min(1.0).max(0.0)
    }

    /// Estimates selectivity for an equality predicate.
    ///
    /// Every bucket containing `value` contributes `frequency / distinct_count`
    /// expected rows (uniform distribution among the bucket's distinct values).
    /// Contributions are summed because a frequently repeated value can be
    /// split across a single-value bucket and the bucket that follows it.
    ///
    /// # Returns
    /// Estimated fraction of rows equal to `value`, at most `1.0`. An empty
    /// histogram yields the fallback `0.01`; a value outside every bucket
    /// yields `0.001`.
    #[must_use]
    pub fn equality_selectivity(&self, value: f64) -> f64 {
        if self.buckets.is_empty() || self.total_rows == 0 {
            return 0.01; // Default fallback
        }

        let mut expected_rows = 0.0;
        let mut matched = false;
        for bucket in &self.buckets {
            // Buckets without distinct values carry no usable information.
            if bucket.distinct_count > 0 && bucket.contains(value) {
                expected_rows += bucket.frequency as f64 / bucket.distinct_count as f64;
                matched = true;
            }
        }

        if matched {
            (expected_rows / self.total_rows as f64).min(1.0)
        } else {
            // Value not in any bucket - very low selectivity
            0.001
        }
    }

    /// Gets the lower bound of the first bucket, or `None` for an empty histogram.
    #[must_use]
    pub fn min_value(&self) -> Option<f64> {
        self.buckets.first().map(|b| b.lower_bound)
    }

    /// Gets the upper bound of the last bucket, or `None` for an empty histogram.
    ///
    /// For histograms created by [`build`](Self::build) this is the largest
    /// input value plus `1.0`, because the last bucket is extended to keep the
    /// maximum inside its half-open range.
    #[must_use]
    pub fn max_value(&self) -> Option<f64> {
        self.buckets.last().map(|b| b.upper_bound)
    }
}

/// Counts distinct values in a sorted slice.
///
/// A value counts as new when it differs from the most recently counted value
/// by more than `f64::EPSILON`.
fn count_distinct(sorted_values: &[f64]) -> u64 {
    let mut remaining = sorted_values.iter().copied();
    let first = match remaining.next() {
        Some(value) => value,
        None => return 0,
    };

    let (count, _) = remaining.fold((1u64, first), |(count, anchor), value| {
        if (value - anchor).abs() > f64::EPSILON {
            (count + 1, value)
        } else {
            (count, anchor)
        }
    });

    count
}
288
289/// Statistics for a table/label.
290#[derive(Debug, Clone)]
291pub struct TableStats {
292    /// Total number of rows.
293    pub row_count: u64,
294    /// Column statistics.
295    pub columns: HashMap<String, ColumnStats>,
296}
297
298impl TableStats {
299    /// Creates new table statistics.
300    #[must_use]
301    pub fn new(row_count: u64) -> Self {
302        Self {
303            row_count,
304            columns: HashMap::new(),
305        }
306    }
307
308    /// Adds column statistics.
309    pub fn with_column(mut self, name: &str, stats: ColumnStats) -> Self {
310        self.columns.insert(name.to_string(), stats);
311        self
312    }
313}
314
315/// Statistics for a column.
316#[derive(Debug, Clone)]
317pub struct ColumnStats {
318    /// Number of distinct values.
319    pub distinct_count: u64,
320    /// Number of null values.
321    pub null_count: u64,
322    /// Minimum value (if orderable).
323    pub min_value: Option<f64>,
324    /// Maximum value (if orderable).
325    pub max_value: Option<f64>,
326    /// Equi-depth histogram for accurate selectivity estimation.
327    pub histogram: Option<EquiDepthHistogram>,
328}
329
330impl ColumnStats {
331    /// Creates new column statistics.
332    #[must_use]
333    pub fn new(distinct_count: u64) -> Self {
334        Self {
335            distinct_count,
336            null_count: 0,
337            min_value: None,
338            max_value: None,
339            histogram: None,
340        }
341    }
342
343    /// Sets the null count.
344    #[must_use]
345    pub fn with_nulls(mut self, null_count: u64) -> Self {
346        self.null_count = null_count;
347        self
348    }
349
350    /// Sets the min/max range.
351    #[must_use]
352    pub fn with_range(mut self, min: f64, max: f64) -> Self {
353        self.min_value = Some(min);
354        self.max_value = Some(max);
355        self
356    }
357
358    /// Sets the equi-depth histogram for this column.
359    #[must_use]
360    pub fn with_histogram(mut self, histogram: EquiDepthHistogram) -> Self {
361        self.histogram = Some(histogram);
362        self
363    }
364
365    /// Builds column statistics with histogram from raw values.
366    ///
367    /// This is a convenience method that computes all statistics from the data.
368    ///
369    /// # Arguments
370    /// * `values` - The column values (will be sorted internally)
371    /// * `num_buckets` - Number of histogram buckets to create
372    #[must_use]
373    pub fn from_values(mut values: Vec<f64>, num_buckets: usize) -> Self {
374        if values.is_empty() {
375            return Self::new(0);
376        }
377
378        // Sort values for histogram building
379        values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
380
381        let min = values.first().copied();
382        let max = values.last().copied();
383        let distinct_count = count_distinct(&values);
384        let histogram = EquiDepthHistogram::build(&values, num_buckets);
385
386        Self {
387            distinct_count,
388            null_count: 0,
389            min_value: min,
390            max_value: max,
391            histogram: Some(histogram),
392        }
393    }
394}
395
/// Fallback selectivities used by the cardinality estimator.
///
/// Each field is the fraction of rows assumed to pass a given kind of
/// predicate when histogram or column statistics are unavailable. Tuning
/// these values can improve plan quality for workloads with known skew.
#[derive(Debug, Clone)]
pub struct SelectivityConfig {
    /// Selectivity for unknown predicates (default: 0.1).
    pub default: f64,
    /// Selectivity for equality predicates without stats (default: 0.01).
    pub equality: f64,
    /// Selectivity for inequality predicates (default: 0.99).
    pub inequality: f64,
    /// Selectivity for range predicates without stats (default: 0.33).
    pub range: f64,
    /// Selectivity for string operations: STARTS WITH, ENDS WITH, CONTAINS, LIKE (default: 0.1).
    pub string_ops: f64,
    /// Selectivity for IN membership (default: 0.1).
    pub membership: f64,
    /// Selectivity for IS NULL (default: 0.05).
    pub is_null: f64,
    /// Selectivity for IS NOT NULL (default: 0.95).
    pub is_not_null: f64,
    /// Fraction assumed distinct for DISTINCT operations (default: 0.5).
    pub distinct_fraction: f64,
}

impl SelectivityConfig {
    /// Creates a config populated with the documented default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for SelectivityConfig {
    /// Returns the default selectivities listed on each field.
    fn default() -> Self {
        Self {
            default: 0.1,
            equality: 0.01,
            inequality: 0.99,
            range: 0.33,
            string_ops: 0.1,
            membership: 0.1,
            is_null: 0.05,
            is_not_null: 0.95,
            distinct_fraction: 0.5,
        }
    }
}
446
/// A single estimate-vs-actual observation for analysis.
#[derive(Debug, Clone)]
pub struct EstimationEntry {
    /// Human-readable label for the operator (e.g., "NodeScan(Person)").
    pub operator: String,
    /// The cardinality estimate produced by the optimizer.
    pub estimated: f64,
    /// The actual row count observed at execution time.
    pub actual: f64,
}

impl EstimationEntry {
    /// Returns the estimation error ratio (actual / estimated).
    ///
    /// Values near 1.0 indicate accurate estimates.
    /// Values > 1.0 indicate underestimation.
    /// Values < 1.0 indicate overestimation.
    ///
    /// When the estimate is (numerically) zero the division is avoided: the
    /// ratio is `1.0` if the actual count is zero as well, and
    /// `f64::INFINITY` otherwise.
    #[must_use]
    pub fn error_ratio(&self) -> f64 {
        let estimate_is_zero = self.estimated.abs() < f64::EPSILON;
        let actual_is_zero = self.actual.abs() < f64::EPSILON;

        match (estimate_is_zero, actual_is_zero) {
            (true, true) => 1.0,
            (true, false) => f64::INFINITY,
            (false, _) => self.actual / self.estimated,
        }
    }
}

/// Collects estimate vs actual cardinality data for query plan analysis.
///
/// After executing a query, call [`record()`](Self::record) for each
/// operator with its estimated and actual cardinalities. Then use
/// [`should_replan()`](Self::should_replan) to decide whether the plan
/// should be re-optimized.
#[derive(Debug, Clone)]
pub struct EstimationLog {
    /// Recorded entries.
    entries: Vec<EstimationEntry>,
    /// Error ratio threshold that triggers re-planning (default: 10.0).
    ///
    /// If any operator's error ratio exceeds this, `should_replan()` returns true.
    replan_threshold: f64,
}

impl Default for EstimationLog {
    /// Creates an empty log with the documented default threshold of 10.0.
    ///
    /// A derived `Default` would set the threshold to `0.0`, which makes
    /// `should_replan()` report `true` for every recorded entry.
    fn default() -> Self {
        Self::new(Self::DEFAULT_REPLAN_THRESHOLD)
    }
}

impl EstimationLog {
    /// Re-planning threshold used by [`Default`].
    const DEFAULT_REPLAN_THRESHOLD: f64 = 10.0;

    /// Creates a new estimation log with the given re-planning threshold.
    #[must_use]
    pub fn new(replan_threshold: f64) -> Self {
        Self {
            entries: Vec::new(),
            replan_threshold,
        }
    }

    /// Records an estimate-vs-actual observation.
    pub fn record(&mut self, operator: impl Into<String>, estimated: f64, actual: f64) {
        self.entries.push(EstimationEntry {
            operator: operator.into(),
            estimated,
            actual,
        });
    }

    /// Returns all recorded entries.
    #[must_use]
    pub fn entries(&self) -> &[EstimationEntry] {
        &self.entries
    }

    /// Returns whether any operator's estimation error exceeds the threshold,
    /// indicating the plan should be re-optimized.
    ///
    /// Both directions count: an error ratio above the threshold
    /// (underestimation) or below its reciprocal (overestimation).
    #[must_use]
    pub fn should_replan(&self) -> bool {
        let upper = self.replan_threshold;
        let lower = 1.0 / self.replan_threshold;
        self.entries.iter().any(|entry| {
            let ratio = entry.error_ratio();
            ratio > upper || ratio < lower
        })
    }

    /// Returns the maximum error ratio across all entries.
    ///
    /// Ratios below 1.0 are inverted first, so over- and underestimation are
    /// both expressed as a factor of at least 1.0. An empty log yields `1.0`.
    #[must_use]
    pub fn max_error_ratio(&self) -> f64 {
        self.entries
            .iter()
            .map(EstimationEntry::error_ratio)
            .map(|ratio| if ratio < 1.0 { 1.0 / ratio } else { ratio })
            .fold(1.0_f64, f64::max)
    }

    /// Clears all entries.
    pub fn clear(&mut self) {
        self.entries.clear();
    }
}
547
548/// Cardinality estimator.
549pub struct CardinalityEstimator {
550    /// Statistics for each label/table.
551    table_stats: HashMap<String, TableStats>,
552    /// Default row count for unknown tables.
553    default_row_count: u64,
554    /// Default selectivity for unknown predicates.
555    default_selectivity: f64,
556    /// Average edge fanout (outgoing edges per node).
557    avg_fanout: f64,
558    /// Configurable selectivity defaults.
559    selectivity_config: SelectivityConfig,
560}
561
562impl CardinalityEstimator {
563    /// Creates a new cardinality estimator with default settings.
564    #[must_use]
565    pub fn new() -> Self {
566        let config = SelectivityConfig::new();
567        Self {
568            table_stats: HashMap::new(),
569            default_row_count: 1000,
570            default_selectivity: config.default,
571            avg_fanout: 10.0,
572            selectivity_config: config,
573        }
574    }
575
576    /// Creates a new cardinality estimator with custom selectivity configuration.
577    #[must_use]
578    pub fn with_selectivity_config(config: SelectivityConfig) -> Self {
579        Self {
580            table_stats: HashMap::new(),
581            default_row_count: 1000,
582            default_selectivity: config.default,
583            avg_fanout: 10.0,
584            selectivity_config: config,
585        }
586    }
587
588    /// Returns the current selectivity configuration.
589    #[must_use]
590    pub fn selectivity_config(&self) -> &SelectivityConfig {
591        &self.selectivity_config
592    }
593
594    /// Creates an estimation log with the default re-planning threshold (10x).
595    #[must_use]
596    pub fn create_estimation_log() -> EstimationLog {
597        EstimationLog::new(10.0)
598    }
599
600    /// Creates a cardinality estimator pre-populated from store statistics.
601    ///
602    /// Maps `LabelStatistics` to `TableStats` and computes the average edge
603    /// fanout from `EdgeTypeStatistics`. Falls back to defaults for any
604    /// missing statistics.
605    #[must_use]
606    pub fn from_statistics(stats: &grafeo_core::statistics::Statistics) -> Self {
607        let mut estimator = Self::new();
608
609        // Use total node count as default for unlabeled scans
610        if stats.total_nodes > 0 {
611            estimator.default_row_count = stats.total_nodes;
612        }
613
614        // Convert label statistics to optimizer table stats
615        for (label, label_stats) in &stats.labels {
616            let mut table_stats = TableStats::new(label_stats.node_count);
617
618            // Map property statistics (distinct count for selectivity estimation)
619            for (prop, col_stats) in &label_stats.properties {
620                let optimizer_col =
621                    ColumnStats::new(col_stats.distinct_count).with_nulls(col_stats.null_count);
622                table_stats = table_stats.with_column(prop, optimizer_col);
623            }
624
625            estimator.add_table_stats(label, table_stats);
626        }
627
628        // Compute average fanout from edge type statistics
629        if !stats.edge_types.is_empty() {
630            let total_out_degree: f64 = stats.edge_types.values().map(|e| e.avg_out_degree).sum();
631            estimator.avg_fanout = total_out_degree / stats.edge_types.len() as f64;
632        } else if stats.total_nodes > 0 {
633            estimator.avg_fanout = stats.total_edges as f64 / stats.total_nodes as f64;
634        }
635
636        // Clamp fanout to a reasonable minimum
637        if estimator.avg_fanout < 1.0 {
638            estimator.avg_fanout = 1.0;
639        }
640
641        estimator
642    }
643
644    /// Adds statistics for a table/label.
645    pub fn add_table_stats(&mut self, name: &str, stats: TableStats) {
646        self.table_stats.insert(name.to_string(), stats);
647    }
648
649    /// Sets the average edge fanout.
650    pub fn set_avg_fanout(&mut self, fanout: f64) {
651        self.avg_fanout = fanout;
652    }
653
654    /// Estimates the cardinality of a logical operator.
655    #[must_use]
656    pub fn estimate(&self, op: &LogicalOperator) -> f64 {
657        match op {
658            LogicalOperator::NodeScan(scan) => self.estimate_node_scan(scan),
659            LogicalOperator::Filter(filter) => self.estimate_filter(filter),
660            LogicalOperator::Project(project) => self.estimate_project(project),
661            LogicalOperator::Expand(expand) => self.estimate_expand(expand),
662            LogicalOperator::Join(join) => self.estimate_join(join),
663            LogicalOperator::Aggregate(agg) => self.estimate_aggregate(agg),
664            LogicalOperator::Sort(sort) => self.estimate_sort(sort),
665            LogicalOperator::Distinct(distinct) => self.estimate_distinct(distinct),
666            LogicalOperator::Limit(limit) => self.estimate_limit(limit),
667            LogicalOperator::Skip(skip) => self.estimate_skip(skip),
668            LogicalOperator::Return(ret) => self.estimate(&ret.input),
669            LogicalOperator::Empty => 0.0,
670            LogicalOperator::VectorScan(scan) => self.estimate_vector_scan(scan),
671            LogicalOperator::VectorJoin(join) => self.estimate_vector_join(join),
672            _ => self.default_row_count as f64,
673        }
674    }
675
676    /// Estimates node scan cardinality.
677    fn estimate_node_scan(&self, scan: &NodeScanOp) -> f64 {
678        if let Some(label) = &scan.label
679            && let Some(stats) = self.table_stats.get(label)
680        {
681            return stats.row_count as f64;
682        }
683        // No label filter - scan all nodes
684        self.default_row_count as f64
685    }
686
687    /// Estimates filter cardinality.
688    fn estimate_filter(&self, filter: &FilterOp) -> f64 {
689        let input_cardinality = self.estimate(&filter.input);
690        let selectivity = self.estimate_selectivity(&filter.predicate);
691        (input_cardinality * selectivity).max(1.0)
692    }
693
694    /// Estimates projection cardinality (same as input).
695    fn estimate_project(&self, project: &ProjectOp) -> f64 {
696        self.estimate(&project.input)
697    }
698
699    /// Estimates expand cardinality.
700    fn estimate_expand(&self, expand: &ExpandOp) -> f64 {
701        let input_cardinality = self.estimate(&expand.input);
702
703        // Apply fanout based on edge type
704        let fanout = if expand.edge_type.is_some() {
705            // Specific edge type typically has lower fanout
706            self.avg_fanout * 0.5
707        } else {
708            self.avg_fanout
709        };
710
711        // Handle variable-length paths
712        let path_multiplier = if expand.max_hops.unwrap_or(1) > 1 {
713            let min = expand.min_hops as f64;
714            let max = expand.max_hops.unwrap_or(expand.min_hops + 3) as f64;
715            // Geometric series approximation
716            (fanout.powf(max + 1.0) - fanout.powf(min)) / (fanout - 1.0)
717        } else {
718            fanout
719        };
720
721        (input_cardinality * path_multiplier).max(1.0)
722    }
723
724    /// Estimates join cardinality.
725    fn estimate_join(&self, join: &JoinOp) -> f64 {
726        let left_card = self.estimate(&join.left);
727        let right_card = self.estimate(&join.right);
728
729        match join.join_type {
730            JoinType::Cross => left_card * right_card,
731            JoinType::Inner => {
732                // Assume join selectivity based on conditions
733                let selectivity = if join.conditions.is_empty() {
734                    1.0 // Cross join
735                } else {
736                    // Estimate based on number of conditions
737                    0.1_f64.powi(join.conditions.len() as i32)
738                };
739                (left_card * right_card * selectivity).max(1.0)
740            }
741            JoinType::Left => {
742                // Left join returns at least all left rows
743                let inner_card = self.estimate_join(&JoinOp {
744                    left: join.left.clone(),
745                    right: join.right.clone(),
746                    join_type: JoinType::Inner,
747                    conditions: join.conditions.clone(),
748                });
749                inner_card.max(left_card)
750            }
751            JoinType::Right => {
752                // Right join returns at least all right rows
753                let inner_card = self.estimate_join(&JoinOp {
754                    left: join.left.clone(),
755                    right: join.right.clone(),
756                    join_type: JoinType::Inner,
757                    conditions: join.conditions.clone(),
758                });
759                inner_card.max(right_card)
760            }
761            JoinType::Full => {
762                // Full join returns at least max(left, right)
763                let inner_card = self.estimate_join(&JoinOp {
764                    left: join.left.clone(),
765                    right: join.right.clone(),
766                    join_type: JoinType::Inner,
767                    conditions: join.conditions.clone(),
768                });
769                inner_card.max(left_card.max(right_card))
770            }
771            JoinType::Semi => {
772                // Semi join returns at most left cardinality
773                (left_card * self.default_selectivity).max(1.0)
774            }
775            JoinType::Anti => {
776                // Anti join returns at most left cardinality
777                (left_card * (1.0 - self.default_selectivity)).max(1.0)
778            }
779        }
780    }
781
782    /// Estimates aggregation cardinality.
783    fn estimate_aggregate(&self, agg: &AggregateOp) -> f64 {
784        let input_cardinality = self.estimate(&agg.input);
785
786        if agg.group_by.is_empty() {
787            // Global aggregation - single row
788            1.0
789        } else {
790            // Group by - estimate distinct groups
791            // Assume each group key reduces cardinality by 10
792            let group_reduction = 10.0_f64.powi(agg.group_by.len() as i32);
793            (input_cardinality / group_reduction).max(1.0)
794        }
795    }
796
797    /// Estimates sort cardinality (same as input).
798    fn estimate_sort(&self, sort: &SortOp) -> f64 {
799        self.estimate(&sort.input)
800    }
801
802    /// Estimates distinct cardinality.
803    fn estimate_distinct(&self, distinct: &DistinctOp) -> f64 {
804        let input_cardinality = self.estimate(&distinct.input);
805        (input_cardinality * self.selectivity_config.distinct_fraction).max(1.0)
806    }
807
808    /// Estimates limit cardinality.
809    fn estimate_limit(&self, limit: &LimitOp) -> f64 {
810        let input_cardinality = self.estimate(&limit.input);
811        (limit.count as f64).min(input_cardinality)
812    }
813
814    /// Estimates skip cardinality.
815    fn estimate_skip(&self, skip: &SkipOp) -> f64 {
816        let input_cardinality = self.estimate(&skip.input);
817        (input_cardinality - skip.count as f64).max(0.0)
818    }
819
820    /// Estimates vector scan cardinality.
821    ///
822    /// Vector scan returns at most k results (the k nearest neighbors).
823    /// With similarity/distance filters, it may return fewer.
824    fn estimate_vector_scan(&self, scan: &VectorScanOp) -> f64 {
825        let base_k = scan.k as f64;
826
827        // Apply filter selectivity if thresholds are specified
828        let selectivity = if scan.min_similarity.is_some() || scan.max_distance.is_some() {
829            // Assume 70% of results pass threshold filters
830            0.7
831        } else {
832            1.0
833        };
834
835        (base_k * selectivity).max(1.0)
836    }
837
838    /// Estimates vector join cardinality.
839    ///
840    /// Vector join produces up to k results per input row.
841    fn estimate_vector_join(&self, join: &VectorJoinOp) -> f64 {
842        let input_cardinality = self.estimate(&join.input);
843        let k = join.k as f64;
844
845        // Apply filter selectivity if thresholds are specified
846        let selectivity = if join.min_similarity.is_some() || join.max_distance.is_some() {
847            0.7
848        } else {
849            1.0
850        };
851
852        (input_cardinality * k * selectivity).max(1.0)
853    }
854
855    /// Estimates the selectivity of a predicate (0.0 to 1.0).
856    fn estimate_selectivity(&self, expr: &LogicalExpression) -> f64 {
857        match expr {
858            LogicalExpression::Binary { left, op, right } => {
859                self.estimate_binary_selectivity(left, *op, right)
860            }
861            LogicalExpression::Unary { op, operand } => {
862                self.estimate_unary_selectivity(*op, operand)
863            }
864            LogicalExpression::Literal(value) => {
865                // Boolean literal
866                if let grafeo_common::types::Value::Bool(b) = value {
867                    if *b { 1.0 } else { 0.0 }
868                } else {
869                    self.default_selectivity
870                }
871            }
872            _ => self.default_selectivity,
873        }
874    }
875
876    /// Estimates binary expression selectivity.
877    fn estimate_binary_selectivity(
878        &self,
879        left: &LogicalExpression,
880        op: BinaryOp,
881        right: &LogicalExpression,
882    ) -> f64 {
883        match op {
884            // Equality - try histogram-based estimation
885            BinaryOp::Eq => {
886                if let Some(selectivity) = self.try_equality_selectivity(left, right) {
887                    return selectivity;
888                }
889                self.selectivity_config.equality
890            }
891            // Inequality is very unselective
892            BinaryOp::Ne => self.selectivity_config.inequality,
893            // Range predicates - use histogram if available
894            BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
895                if let Some(selectivity) = self.try_range_selectivity(left, op, right) {
896                    return selectivity;
897                }
898                self.selectivity_config.range
899            }
900            // Logical operators - recursively estimate sub-expressions
901            BinaryOp::And => {
902                let left_sel = self.estimate_selectivity(left);
903                let right_sel = self.estimate_selectivity(right);
904                // AND reduces selectivity (multiply assuming independence)
905                left_sel * right_sel
906            }
907            BinaryOp::Or => {
908                let left_sel = self.estimate_selectivity(left);
909                let right_sel = self.estimate_selectivity(right);
910                // OR: P(A ∪ B) = P(A) + P(B) - P(A ∩ B)
911                // Assuming independence: P(A ∩ B) = P(A) * P(B)
912                (left_sel + right_sel - left_sel * right_sel).min(1.0)
913            }
914            // String operations
915            BinaryOp::StartsWith | BinaryOp::EndsWith | BinaryOp::Contains | BinaryOp::Like => {
916                self.selectivity_config.string_ops
917            }
918            // Collection membership
919            BinaryOp::In => self.selectivity_config.membership,
920            // Other operations
921            _ => self.default_selectivity,
922        }
923    }
924
925    /// Tries to estimate equality selectivity using histograms.
926    fn try_equality_selectivity(
927        &self,
928        left: &LogicalExpression,
929        right: &LogicalExpression,
930    ) -> Option<f64> {
931        // Extract property access and literal value
932        let (label, column, value) = self.extract_column_and_value(left, right)?;
933
934        // Get column stats with histogram
935        let stats = self.get_column_stats(&label, &column)?;
936
937        // Try histogram-based estimation
938        if let Some(ref histogram) = stats.histogram {
939            return Some(histogram.equality_selectivity(value));
940        }
941
942        // Fall back to distinct count estimation
943        if stats.distinct_count > 0 {
944            return Some(1.0 / stats.distinct_count as f64);
945        }
946
947        None
948    }
949
950    /// Tries to estimate range selectivity using histograms.
951    fn try_range_selectivity(
952        &self,
953        left: &LogicalExpression,
954        op: BinaryOp,
955        right: &LogicalExpression,
956    ) -> Option<f64> {
957        // Extract property access and literal value
958        let (label, column, value) = self.extract_column_and_value(left, right)?;
959
960        // Get column stats
961        let stats = self.get_column_stats(&label, &column)?;
962
963        // Determine the range based on operator
964        let (lower, upper) = match op {
965            BinaryOp::Lt => (None, Some(value)),
966            BinaryOp::Le => (None, Some(value + f64::EPSILON)),
967            BinaryOp::Gt => (Some(value + f64::EPSILON), None),
968            BinaryOp::Ge => (Some(value), None),
969            _ => return None,
970        };
971
972        // Try histogram-based estimation first
973        if let Some(ref histogram) = stats.histogram {
974            return Some(histogram.range_selectivity(lower, upper));
975        }
976
977        // Fall back to min/max range estimation
978        if let (Some(min), Some(max)) = (stats.min_value, stats.max_value) {
979            let range = max - min;
980            if range <= 0.0 {
981                return Some(1.0);
982            }
983
984            let effective_lower = lower.unwrap_or(min).max(min);
985            let effective_upper = upper.unwrap_or(max).min(max);
986            let overlap = (effective_upper - effective_lower).max(0.0);
987            return Some((overlap / range).min(1.0).max(0.0));
988        }
989
990        None
991    }
992
993    /// Extracts column information and literal value from a comparison.
994    ///
995    /// Returns (label, column_name, numeric_value) if the expression is
996    /// a comparison between a property access and a numeric literal.
997    fn extract_column_and_value(
998        &self,
999        left: &LogicalExpression,
1000        right: &LogicalExpression,
1001    ) -> Option<(String, String, f64)> {
1002        // Try left as property, right as literal
1003        if let Some(result) = self.try_extract_property_literal(left, right) {
1004            return Some(result);
1005        }
1006
1007        // Try right as property, left as literal
1008        self.try_extract_property_literal(right, left)
1009    }
1010
1011    /// Tries to extract property and literal from a specific ordering.
1012    fn try_extract_property_literal(
1013        &self,
1014        property_expr: &LogicalExpression,
1015        literal_expr: &LogicalExpression,
1016    ) -> Option<(String, String, f64)> {
1017        // Extract property access
1018        let (variable, property) = match property_expr {
1019            LogicalExpression::Property { variable, property } => {
1020                (variable.clone(), property.clone())
1021            }
1022            _ => return None,
1023        };
1024
1025        // Extract numeric literal
1026        let value = match literal_expr {
1027            LogicalExpression::Literal(grafeo_common::types::Value::Int64(n)) => *n as f64,
1028            LogicalExpression::Literal(grafeo_common::types::Value::Float64(f)) => *f,
1029            _ => return None,
1030        };
1031
1032        // Try to find a label for this variable from table stats
1033        // Use the variable name as a heuristic label lookup
1034        // In practice, the optimizer would track which labels variables are bound to
1035        for label in self.table_stats.keys() {
1036            if let Some(stats) = self.table_stats.get(label)
1037                && stats.columns.contains_key(&property)
1038            {
1039                return Some((label.clone(), property, value));
1040            }
1041        }
1042
1043        // If no stats found but we have the property, return with variable as label
1044        Some((variable, property, value))
1045    }
1046
1047    /// Estimates unary expression selectivity.
1048    fn estimate_unary_selectivity(&self, op: UnaryOp, _operand: &LogicalExpression) -> f64 {
1049        match op {
1050            UnaryOp::Not => 1.0 - self.default_selectivity,
1051            UnaryOp::IsNull => self.selectivity_config.is_null,
1052            UnaryOp::IsNotNull => self.selectivity_config.is_not_null,
1053            UnaryOp::Neg => 1.0, // Negation doesn't change cardinality
1054        }
1055    }
1056
1057    /// Gets statistics for a column.
1058    fn get_column_stats(&self, label: &str, column: &str) -> Option<&ColumnStats> {
1059        self.table_stats.get(label)?.columns.get(column)
1060    }
1061
1062    /// Estimates equality selectivity using column statistics.
1063    #[allow(dead_code)]
1064    fn estimate_equality_with_stats(&self, label: &str, column: &str) -> f64 {
1065        if let Some(stats) = self.get_column_stats(label, column)
1066            && stats.distinct_count > 0
1067        {
1068            return 1.0 / stats.distinct_count as f64;
1069        }
1070        self.selectivity_config.equality
1071    }
1072
1073    /// Estimates range selectivity using column statistics.
1074    #[allow(dead_code)]
1075    fn estimate_range_with_stats(
1076        &self,
1077        label: &str,
1078        column: &str,
1079        lower: Option<f64>,
1080        upper: Option<f64>,
1081    ) -> f64 {
1082        if let Some(stats) = self.get_column_stats(label, column)
1083            && let (Some(min), Some(max)) = (stats.min_value, stats.max_value)
1084        {
1085            let range = max - min;
1086            if range <= 0.0 {
1087                return 1.0;
1088            }
1089
1090            let effective_lower = lower.unwrap_or(min).max(min);
1091            let effective_upper = upper.unwrap_or(max).min(max);
1092
1093            let overlap = (effective_upper - effective_lower).max(0.0);
1094            return (overlap / range).min(1.0).max(0.0);
1095        }
1096        self.selectivity_config.range
1097    }
1098}
1099
1100impl Default for CardinalityEstimator {
1101    fn default() -> Self {
1102        Self::new()
1103    }
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108    use super::*;
1109    use crate::query::plan::{
1110        DistinctOp, ExpandDirection, ExpandOp, FilterOp, JoinCondition, NodeScanOp, ProjectOp,
1111        Projection, ReturnItem, ReturnOp, SkipOp, SortKey, SortOp, SortOrder,
1112    };
1113    use grafeo_common::types::Value;
1114
1115    #[test]
1116    fn test_node_scan_with_stats() {
1117        let mut estimator = CardinalityEstimator::new();
1118        estimator.add_table_stats("Person", TableStats::new(5000));
1119
1120        let scan = LogicalOperator::NodeScan(NodeScanOp {
1121            variable: "n".to_string(),
1122            label: Some("Person".to_string()),
1123            input: None,
1124        });
1125
1126        let cardinality = estimator.estimate(&scan);
1127        assert!((cardinality - 5000.0).abs() < 0.001);
1128    }
1129
1130    #[test]
1131    fn test_filter_reduces_cardinality() {
1132        let mut estimator = CardinalityEstimator::new();
1133        estimator.add_table_stats("Person", TableStats::new(1000));
1134
1135        let filter = LogicalOperator::Filter(FilterOp {
1136            predicate: LogicalExpression::Binary {
1137                left: Box::new(LogicalExpression::Property {
1138                    variable: "n".to_string(),
1139                    property: "age".to_string(),
1140                }),
1141                op: BinaryOp::Eq,
1142                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1143            },
1144            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1145                variable: "n".to_string(),
1146                label: Some("Person".to_string()),
1147                input: None,
1148            })),
1149        });
1150
1151        let cardinality = estimator.estimate(&filter);
1152        // Equality selectivity is 0.01, so 1000 * 0.01 = 10
1153        assert!(cardinality < 1000.0);
1154        assert!(cardinality >= 1.0);
1155    }
1156
1157    #[test]
1158    fn test_join_cardinality() {
1159        let mut estimator = CardinalityEstimator::new();
1160        estimator.add_table_stats("Person", TableStats::new(1000));
1161        estimator.add_table_stats("Company", TableStats::new(100));
1162
1163        let join = LogicalOperator::Join(JoinOp {
1164            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1165                variable: "p".to_string(),
1166                label: Some("Person".to_string()),
1167                input: None,
1168            })),
1169            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1170                variable: "c".to_string(),
1171                label: Some("Company".to_string()),
1172                input: None,
1173            })),
1174            join_type: JoinType::Inner,
1175            conditions: vec![JoinCondition {
1176                left: LogicalExpression::Property {
1177                    variable: "p".to_string(),
1178                    property: "company_id".to_string(),
1179                },
1180                right: LogicalExpression::Property {
1181                    variable: "c".to_string(),
1182                    property: "id".to_string(),
1183                },
1184            }],
1185        });
1186
1187        let cardinality = estimator.estimate(&join);
1188        // Should be less than cross product
1189        assert!(cardinality < 1000.0 * 100.0);
1190    }
1191
1192    #[test]
1193    fn test_limit_caps_cardinality() {
1194        let mut estimator = CardinalityEstimator::new();
1195        estimator.add_table_stats("Person", TableStats::new(1000));
1196
1197        let limit = LogicalOperator::Limit(LimitOp {
1198            count: 10,
1199            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1200                variable: "n".to_string(),
1201                label: Some("Person".to_string()),
1202                input: None,
1203            })),
1204        });
1205
1206        let cardinality = estimator.estimate(&limit);
1207        assert!((cardinality - 10.0).abs() < 0.001);
1208    }
1209
1210    #[test]
1211    fn test_aggregate_reduces_cardinality() {
1212        let mut estimator = CardinalityEstimator::new();
1213        estimator.add_table_stats("Person", TableStats::new(1000));
1214
1215        // Global aggregation
1216        let global_agg = LogicalOperator::Aggregate(AggregateOp {
1217            group_by: vec![],
1218            aggregates: vec![],
1219            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1220                variable: "n".to_string(),
1221                label: Some("Person".to_string()),
1222                input: None,
1223            })),
1224            having: None,
1225        });
1226
1227        let cardinality = estimator.estimate(&global_agg);
1228        assert!((cardinality - 1.0).abs() < 0.001);
1229
1230        // Group by aggregation
1231        let group_agg = LogicalOperator::Aggregate(AggregateOp {
1232            group_by: vec![LogicalExpression::Property {
1233                variable: "n".to_string(),
1234                property: "city".to_string(),
1235            }],
1236            aggregates: vec![],
1237            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1238                variable: "n".to_string(),
1239                label: Some("Person".to_string()),
1240                input: None,
1241            })),
1242            having: None,
1243        });
1244
1245        let cardinality = estimator.estimate(&group_agg);
1246        // Should be less than input
1247        assert!(cardinality < 1000.0);
1248    }
1249
1250    #[test]
1251    fn test_node_scan_without_stats() {
1252        let estimator = CardinalityEstimator::new();
1253
1254        let scan = LogicalOperator::NodeScan(NodeScanOp {
1255            variable: "n".to_string(),
1256            label: Some("Unknown".to_string()),
1257            input: None,
1258        });
1259
1260        let cardinality = estimator.estimate(&scan);
1261        // Should return default (1000)
1262        assert!((cardinality - 1000.0).abs() < 0.001);
1263    }
1264
1265    #[test]
1266    fn test_node_scan_no_label() {
1267        let estimator = CardinalityEstimator::new();
1268
1269        let scan = LogicalOperator::NodeScan(NodeScanOp {
1270            variable: "n".to_string(),
1271            label: None,
1272            input: None,
1273        });
1274
1275        let cardinality = estimator.estimate(&scan);
1276        // Should scan all nodes (default)
1277        assert!((cardinality - 1000.0).abs() < 0.001);
1278    }
1279
1280    #[test]
1281    fn test_filter_inequality_selectivity() {
1282        let mut estimator = CardinalityEstimator::new();
1283        estimator.add_table_stats("Person", TableStats::new(1000));
1284
1285        let filter = LogicalOperator::Filter(FilterOp {
1286            predicate: LogicalExpression::Binary {
1287                left: Box::new(LogicalExpression::Property {
1288                    variable: "n".to_string(),
1289                    property: "age".to_string(),
1290                }),
1291                op: BinaryOp::Ne,
1292                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1293            },
1294            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1295                variable: "n".to_string(),
1296                label: Some("Person".to_string()),
1297                input: None,
1298            })),
1299        });
1300
1301        let cardinality = estimator.estimate(&filter);
1302        // Inequality selectivity is 0.99, so 1000 * 0.99 = 990
1303        assert!(cardinality > 900.0);
1304    }
1305
1306    #[test]
1307    fn test_filter_range_selectivity() {
1308        let mut estimator = CardinalityEstimator::new();
1309        estimator.add_table_stats("Person", TableStats::new(1000));
1310
1311        let filter = LogicalOperator::Filter(FilterOp {
1312            predicate: LogicalExpression::Binary {
1313                left: Box::new(LogicalExpression::Property {
1314                    variable: "n".to_string(),
1315                    property: "age".to_string(),
1316                }),
1317                op: BinaryOp::Gt,
1318                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1319            },
1320            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1321                variable: "n".to_string(),
1322                label: Some("Person".to_string()),
1323                input: None,
1324            })),
1325        });
1326
1327        let cardinality = estimator.estimate(&filter);
1328        // Range selectivity is 0.33, so 1000 * 0.33 = 330
1329        assert!(cardinality < 500.0);
1330        assert!(cardinality > 100.0);
1331    }
1332
1333    #[test]
1334    fn test_filter_and_selectivity() {
1335        let mut estimator = CardinalityEstimator::new();
1336        estimator.add_table_stats("Person", TableStats::new(1000));
1337
1338        // Test AND with two equality predicates
1339        // Each equality has selectivity 0.01, so AND gives 0.01 * 0.01 = 0.0001
1340        let filter = LogicalOperator::Filter(FilterOp {
1341            predicate: LogicalExpression::Binary {
1342                left: Box::new(LogicalExpression::Binary {
1343                    left: Box::new(LogicalExpression::Property {
1344                        variable: "n".to_string(),
1345                        property: "city".to_string(),
1346                    }),
1347                    op: BinaryOp::Eq,
1348                    right: Box::new(LogicalExpression::Literal(Value::String("NYC".into()))),
1349                }),
1350                op: BinaryOp::And,
1351                right: Box::new(LogicalExpression::Binary {
1352                    left: Box::new(LogicalExpression::Property {
1353                        variable: "n".to_string(),
1354                        property: "age".to_string(),
1355                    }),
1356                    op: BinaryOp::Eq,
1357                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1358                }),
1359            },
1360            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1361                variable: "n".to_string(),
1362                label: Some("Person".to_string()),
1363                input: None,
1364            })),
1365        });
1366
1367        let cardinality = estimator.estimate(&filter);
1368        // AND reduces selectivity (multiply): 0.01 * 0.01 = 0.0001
1369        // 1000 * 0.0001 = 0.1, min is 1.0
1370        assert!(cardinality < 100.0);
1371        assert!(cardinality >= 1.0);
1372    }
1373
1374    #[test]
1375    fn test_filter_or_selectivity() {
1376        let mut estimator = CardinalityEstimator::new();
1377        estimator.add_table_stats("Person", TableStats::new(1000));
1378
1379        // Test OR with two equality predicates
1380        // Each equality has selectivity 0.01
1381        // OR gives: 0.01 + 0.01 - (0.01 * 0.01) = 0.0199
1382        let filter = LogicalOperator::Filter(FilterOp {
1383            predicate: LogicalExpression::Binary {
1384                left: Box::new(LogicalExpression::Binary {
1385                    left: Box::new(LogicalExpression::Property {
1386                        variable: "n".to_string(),
1387                        property: "city".to_string(),
1388                    }),
1389                    op: BinaryOp::Eq,
1390                    right: Box::new(LogicalExpression::Literal(Value::String("NYC".into()))),
1391                }),
1392                op: BinaryOp::Or,
1393                right: Box::new(LogicalExpression::Binary {
1394                    left: Box::new(LogicalExpression::Property {
1395                        variable: "n".to_string(),
1396                        property: "city".to_string(),
1397                    }),
1398                    op: BinaryOp::Eq,
1399                    right: Box::new(LogicalExpression::Literal(Value::String("LA".into()))),
1400                }),
1401            },
1402            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1403                variable: "n".to_string(),
1404                label: Some("Person".to_string()),
1405                input: None,
1406            })),
1407        });
1408
1409        let cardinality = estimator.estimate(&filter);
1410        // OR: 0.01 + 0.01 - 0.0001 ≈ 0.0199, so 1000 * 0.0199 ≈ 19.9
1411        assert!(cardinality < 100.0);
1412        assert!(cardinality >= 1.0);
1413    }
1414
1415    #[test]
1416    fn test_filter_literal_true() {
1417        let mut estimator = CardinalityEstimator::new();
1418        estimator.add_table_stats("Person", TableStats::new(1000));
1419
1420        let filter = LogicalOperator::Filter(FilterOp {
1421            predicate: LogicalExpression::Literal(Value::Bool(true)),
1422            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1423                variable: "n".to_string(),
1424                label: Some("Person".to_string()),
1425                input: None,
1426            })),
1427        });
1428
1429        let cardinality = estimator.estimate(&filter);
1430        // Literal true has selectivity 1.0
1431        assert!((cardinality - 1000.0).abs() < 0.001);
1432    }
1433
1434    #[test]
1435    fn test_filter_literal_false() {
1436        let mut estimator = CardinalityEstimator::new();
1437        estimator.add_table_stats("Person", TableStats::new(1000));
1438
1439        let filter = LogicalOperator::Filter(FilterOp {
1440            predicate: LogicalExpression::Literal(Value::Bool(false)),
1441            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1442                variable: "n".to_string(),
1443                label: Some("Person".to_string()),
1444                input: None,
1445            })),
1446        });
1447
1448        let cardinality = estimator.estimate(&filter);
1449        // Literal false has selectivity 0.0, but min is 1.0
1450        assert!((cardinality - 1.0).abs() < 0.001);
1451    }
1452
1453    #[test]
1454    fn test_unary_not_selectivity() {
1455        let mut estimator = CardinalityEstimator::new();
1456        estimator.add_table_stats("Person", TableStats::new(1000));
1457
1458        let filter = LogicalOperator::Filter(FilterOp {
1459            predicate: LogicalExpression::Unary {
1460                op: UnaryOp::Not,
1461                operand: Box::new(LogicalExpression::Literal(Value::Bool(true))),
1462            },
1463            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1464                variable: "n".to_string(),
1465                label: Some("Person".to_string()),
1466                input: None,
1467            })),
1468        });
1469
1470        let cardinality = estimator.estimate(&filter);
1471        // NOT inverts selectivity
1472        assert!(cardinality < 1000.0);
1473    }
1474
1475    #[test]
1476    fn test_unary_is_null_selectivity() {
1477        let mut estimator = CardinalityEstimator::new();
1478        estimator.add_table_stats("Person", TableStats::new(1000));
1479
1480        let filter = LogicalOperator::Filter(FilterOp {
1481            predicate: LogicalExpression::Unary {
1482                op: UnaryOp::IsNull,
1483                operand: Box::new(LogicalExpression::Variable("x".to_string())),
1484            },
1485            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1486                variable: "n".to_string(),
1487                label: Some("Person".to_string()),
1488                input: None,
1489            })),
1490        });
1491
1492        let cardinality = estimator.estimate(&filter);
1493        // IS NULL has selectivity 0.05
1494        assert!(cardinality < 100.0);
1495    }
1496
1497    #[test]
1498    fn test_expand_cardinality() {
1499        let mut estimator = CardinalityEstimator::new();
1500        estimator.add_table_stats("Person", TableStats::new(100));
1501
1502        let expand = LogicalOperator::Expand(ExpandOp {
1503            from_variable: "a".to_string(),
1504            to_variable: "b".to_string(),
1505            edge_variable: None,
1506            direction: ExpandDirection::Outgoing,
1507            edge_type: None,
1508            min_hops: 1,
1509            max_hops: Some(1),
1510            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1511                variable: "a".to_string(),
1512                label: Some("Person".to_string()),
1513                input: None,
1514            })),
1515            path_alias: None,
1516        });
1517
1518        let cardinality = estimator.estimate(&expand);
1519        // Expand multiplies by fanout (10)
1520        assert!(cardinality > 100.0);
1521    }
1522
1523    #[test]
1524    fn test_expand_with_edge_type_filter() {
1525        let mut estimator = CardinalityEstimator::new();
1526        estimator.add_table_stats("Person", TableStats::new(100));
1527
1528        let expand = LogicalOperator::Expand(ExpandOp {
1529            from_variable: "a".to_string(),
1530            to_variable: "b".to_string(),
1531            edge_variable: None,
1532            direction: ExpandDirection::Outgoing,
1533            edge_type: Some("KNOWS".to_string()),
1534            min_hops: 1,
1535            max_hops: Some(1),
1536            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1537                variable: "a".to_string(),
1538                label: Some("Person".to_string()),
1539                input: None,
1540            })),
1541            path_alias: None,
1542        });
1543
1544        let cardinality = estimator.estimate(&expand);
1545        // With edge type, fanout is reduced by half
1546        assert!(cardinality > 100.0);
1547    }
1548
1549    #[test]
1550    fn test_expand_variable_length() {
1551        let mut estimator = CardinalityEstimator::new();
1552        estimator.add_table_stats("Person", TableStats::new(100));
1553
1554        let expand = LogicalOperator::Expand(ExpandOp {
1555            from_variable: "a".to_string(),
1556            to_variable: "b".to_string(),
1557            edge_variable: None,
1558            direction: ExpandDirection::Outgoing,
1559            edge_type: None,
1560            min_hops: 1,
1561            max_hops: Some(3),
1562            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1563                variable: "a".to_string(),
1564                label: Some("Person".to_string()),
1565                input: None,
1566            })),
1567            path_alias: None,
1568        });
1569
1570        let cardinality = estimator.estimate(&expand);
1571        // Variable length path has much higher cardinality
1572        assert!(cardinality > 500.0);
1573    }
1574
1575    #[test]
1576    fn test_join_cross_product() {
1577        let mut estimator = CardinalityEstimator::new();
1578        estimator.add_table_stats("Person", TableStats::new(100));
1579        estimator.add_table_stats("Company", TableStats::new(50));
1580
1581        let join = LogicalOperator::Join(JoinOp {
1582            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1583                variable: "p".to_string(),
1584                label: Some("Person".to_string()),
1585                input: None,
1586            })),
1587            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1588                variable: "c".to_string(),
1589                label: Some("Company".to_string()),
1590                input: None,
1591            })),
1592            join_type: JoinType::Cross,
1593            conditions: vec![],
1594        });
1595
1596        let cardinality = estimator.estimate(&join);
1597        // Cross join = 100 * 50 = 5000
1598        assert!((cardinality - 5000.0).abs() < 0.001);
1599    }
1600
1601    #[test]
1602    fn test_join_left_outer() {
1603        let mut estimator = CardinalityEstimator::new();
1604        estimator.add_table_stats("Person", TableStats::new(1000));
1605        estimator.add_table_stats("Company", TableStats::new(10));
1606
1607        let join = LogicalOperator::Join(JoinOp {
1608            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1609                variable: "p".to_string(),
1610                label: Some("Person".to_string()),
1611                input: None,
1612            })),
1613            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1614                variable: "c".to_string(),
1615                label: Some("Company".to_string()),
1616                input: None,
1617            })),
1618            join_type: JoinType::Left,
1619            conditions: vec![JoinCondition {
1620                left: LogicalExpression::Variable("p".to_string()),
1621                right: LogicalExpression::Variable("c".to_string()),
1622            }],
1623        });
1624
1625        let cardinality = estimator.estimate(&join);
1626        // Left join returns at least all left rows
1627        assert!(cardinality >= 1000.0);
1628    }
1629
1630    #[test]
1631    fn test_join_semi() {
1632        let mut estimator = CardinalityEstimator::new();
1633        estimator.add_table_stats("Person", TableStats::new(1000));
1634        estimator.add_table_stats("Company", TableStats::new(100));
1635
1636        let join = LogicalOperator::Join(JoinOp {
1637            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1638                variable: "p".to_string(),
1639                label: Some("Person".to_string()),
1640                input: None,
1641            })),
1642            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1643                variable: "c".to_string(),
1644                label: Some("Company".to_string()),
1645                input: None,
1646            })),
1647            join_type: JoinType::Semi,
1648            conditions: vec![],
1649        });
1650
1651        let cardinality = estimator.estimate(&join);
1652        // Semi join returns at most left cardinality
1653        assert!(cardinality <= 1000.0);
1654    }
1655
1656    #[test]
1657    fn test_join_anti() {
1658        let mut estimator = CardinalityEstimator::new();
1659        estimator.add_table_stats("Person", TableStats::new(1000));
1660        estimator.add_table_stats("Company", TableStats::new(100));
1661
1662        let join = LogicalOperator::Join(JoinOp {
1663            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1664                variable: "p".to_string(),
1665                label: Some("Person".to_string()),
1666                input: None,
1667            })),
1668            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1669                variable: "c".to_string(),
1670                label: Some("Company".to_string()),
1671                input: None,
1672            })),
1673            join_type: JoinType::Anti,
1674            conditions: vec![],
1675        });
1676
1677        let cardinality = estimator.estimate(&join);
1678        // Anti join returns at most left cardinality
1679        assert!(cardinality <= 1000.0);
1680        assert!(cardinality >= 1.0);
1681    }
1682
1683    #[test]
1684    fn test_project_preserves_cardinality() {
1685        let mut estimator = CardinalityEstimator::new();
1686        estimator.add_table_stats("Person", TableStats::new(1000));
1687
1688        let project = LogicalOperator::Project(ProjectOp {
1689            projections: vec![Projection {
1690                expression: LogicalExpression::Variable("n".to_string()),
1691                alias: None,
1692            }],
1693            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1694                variable: "n".to_string(),
1695                label: Some("Person".to_string()),
1696                input: None,
1697            })),
1698        });
1699
1700        let cardinality = estimator.estimate(&project);
1701        assert!((cardinality - 1000.0).abs() < 0.001);
1702    }
1703
1704    #[test]
1705    fn test_sort_preserves_cardinality() {
1706        let mut estimator = CardinalityEstimator::new();
1707        estimator.add_table_stats("Person", TableStats::new(1000));
1708
1709        let sort = LogicalOperator::Sort(SortOp {
1710            keys: vec![SortKey {
1711                expression: LogicalExpression::Variable("n".to_string()),
1712                order: SortOrder::Ascending,
1713            }],
1714            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1715                variable: "n".to_string(),
1716                label: Some("Person".to_string()),
1717                input: None,
1718            })),
1719        });
1720
1721        let cardinality = estimator.estimate(&sort);
1722        assert!((cardinality - 1000.0).abs() < 0.001);
1723    }
1724
1725    #[test]
1726    fn test_distinct_reduces_cardinality() {
1727        let mut estimator = CardinalityEstimator::new();
1728        estimator.add_table_stats("Person", TableStats::new(1000));
1729
1730        let distinct = LogicalOperator::Distinct(DistinctOp {
1731            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1732                variable: "n".to_string(),
1733                label: Some("Person".to_string()),
1734                input: None,
1735            })),
1736            columns: None,
1737        });
1738
1739        let cardinality = estimator.estimate(&distinct);
1740        // Distinct assumes 50% unique
1741        assert!((cardinality - 500.0).abs() < 0.001);
1742    }
1743
1744    #[test]
1745    fn test_skip_reduces_cardinality() {
1746        let mut estimator = CardinalityEstimator::new();
1747        estimator.add_table_stats("Person", TableStats::new(1000));
1748
1749        let skip = LogicalOperator::Skip(SkipOp {
1750            count: 100,
1751            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1752                variable: "n".to_string(),
1753                label: Some("Person".to_string()),
1754                input: None,
1755            })),
1756        });
1757
1758        let cardinality = estimator.estimate(&skip);
1759        assert!((cardinality - 900.0).abs() < 0.001);
1760    }
1761
1762    #[test]
1763    fn test_return_preserves_cardinality() {
1764        let mut estimator = CardinalityEstimator::new();
1765        estimator.add_table_stats("Person", TableStats::new(1000));
1766
1767        let ret = LogicalOperator::Return(ReturnOp {
1768            items: vec![ReturnItem {
1769                expression: LogicalExpression::Variable("n".to_string()),
1770                alias: None,
1771            }],
1772            distinct: false,
1773            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1774                variable: "n".to_string(),
1775                label: Some("Person".to_string()),
1776                input: None,
1777            })),
1778        });
1779
1780        let cardinality = estimator.estimate(&ret);
1781        assert!((cardinality - 1000.0).abs() < 0.001);
1782    }
1783
1784    #[test]
1785    fn test_empty_cardinality() {
1786        let estimator = CardinalityEstimator::new();
1787        let cardinality = estimator.estimate(&LogicalOperator::Empty);
1788        assert!((cardinality).abs() < 0.001);
1789    }
1790
1791    #[test]
1792    fn test_table_stats_with_column() {
1793        let stats = TableStats::new(1000).with_column(
1794            "age",
1795            ColumnStats::new(50).with_nulls(10).with_range(0.0, 100.0),
1796        );
1797
1798        assert_eq!(stats.row_count, 1000);
1799        let col = stats.columns.get("age").unwrap();
1800        assert_eq!(col.distinct_count, 50);
1801        assert_eq!(col.null_count, 10);
1802        assert!((col.min_value.unwrap() - 0.0).abs() < 0.001);
1803        assert!((col.max_value.unwrap() - 100.0).abs() < 0.001);
1804    }
1805
1806    #[test]
1807    fn test_estimator_default() {
1808        let estimator = CardinalityEstimator::default();
1809        let scan = LogicalOperator::NodeScan(NodeScanOp {
1810            variable: "n".to_string(),
1811            label: None,
1812            input: None,
1813        });
1814        let cardinality = estimator.estimate(&scan);
1815        assert!((cardinality - 1000.0).abs() < 0.001);
1816    }
1817
1818    #[test]
1819    fn test_set_avg_fanout() {
1820        let mut estimator = CardinalityEstimator::new();
1821        estimator.add_table_stats("Person", TableStats::new(100));
1822        estimator.set_avg_fanout(5.0);
1823
1824        let expand = LogicalOperator::Expand(ExpandOp {
1825            from_variable: "a".to_string(),
1826            to_variable: "b".to_string(),
1827            edge_variable: None,
1828            direction: ExpandDirection::Outgoing,
1829            edge_type: None,
1830            min_hops: 1,
1831            max_hops: Some(1),
1832            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1833                variable: "a".to_string(),
1834                label: Some("Person".to_string()),
1835                input: None,
1836            })),
1837            path_alias: None,
1838        });
1839
1840        let cardinality = estimator.estimate(&expand);
1841        // With fanout 5: 100 * 5 = 500
1842        assert!((cardinality - 500.0).abs() < 0.001);
1843    }
1844
1845    #[test]
1846    fn test_multiple_group_by_keys_reduce_cardinality() {
1847        // The current implementation uses a simplified model where more group by keys
1848        // results in greater reduction (dividing by 10^num_keys). This is a simplification
1849        // that works for most cases where group by keys are correlated.
1850        let mut estimator = CardinalityEstimator::new();
1851        estimator.add_table_stats("Person", TableStats::new(10000));
1852
1853        let single_group = LogicalOperator::Aggregate(AggregateOp {
1854            group_by: vec![LogicalExpression::Property {
1855                variable: "n".to_string(),
1856                property: "city".to_string(),
1857            }],
1858            aggregates: vec![],
1859            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1860                variable: "n".to_string(),
1861                label: Some("Person".to_string()),
1862                input: None,
1863            })),
1864            having: None,
1865        });
1866
1867        let multi_group = LogicalOperator::Aggregate(AggregateOp {
1868            group_by: vec![
1869                LogicalExpression::Property {
1870                    variable: "n".to_string(),
1871                    property: "city".to_string(),
1872                },
1873                LogicalExpression::Property {
1874                    variable: "n".to_string(),
1875                    property: "country".to_string(),
1876                },
1877            ],
1878            aggregates: vec![],
1879            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1880                variable: "n".to_string(),
1881                label: Some("Person".to_string()),
1882                input: None,
1883            })),
1884            having: None,
1885        });
1886
1887        let single_card = estimator.estimate(&single_group);
1888        let multi_card = estimator.estimate(&multi_group);
1889
1890        // Both should reduce cardinality from input
1891        assert!(single_card < 10000.0);
1892        assert!(multi_card < 10000.0);
1893        // Both should be at least 1
1894        assert!(single_card >= 1.0);
1895        assert!(multi_card >= 1.0);
1896    }
1897
1898    // ============= Histogram Tests =============
1899
1900    #[test]
1901    fn test_histogram_build_uniform() {
1902        // Build histogram from uniformly distributed data
1903        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1904        let histogram = EquiDepthHistogram::build(&values, 10);
1905
1906        assert_eq!(histogram.num_buckets(), 10);
1907        assert_eq!(histogram.total_rows(), 100);
1908
1909        // Each bucket should have approximately 10 rows
1910        for bucket in histogram.buckets() {
1911            assert!(bucket.frequency >= 9 && bucket.frequency <= 11);
1912        }
1913    }
1914
1915    #[test]
1916    fn test_histogram_build_skewed() {
1917        // Build histogram from skewed data (many small values, few large)
1918        let mut values: Vec<f64> = (0..80).map(|i| i as f64).collect();
1919        values.extend((0..20).map(|i| 1000.0 + i as f64));
1920        let histogram = EquiDepthHistogram::build(&values, 5);
1921
1922        assert_eq!(histogram.num_buckets(), 5);
1923        assert_eq!(histogram.total_rows(), 100);
1924
1925        // Each bucket should have ~20 rows despite skewed data
1926        for bucket in histogram.buckets() {
1927            assert!(bucket.frequency >= 18 && bucket.frequency <= 22);
1928        }
1929    }
1930
1931    #[test]
1932    fn test_histogram_range_selectivity_full() {
1933        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1934        let histogram = EquiDepthHistogram::build(&values, 10);
1935
1936        // Full range should have selectivity ~1.0
1937        let selectivity = histogram.range_selectivity(None, None);
1938        assert!((selectivity - 1.0).abs() < 0.01);
1939    }
1940
1941    #[test]
1942    fn test_histogram_range_selectivity_half() {
1943        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1944        let histogram = EquiDepthHistogram::build(&values, 10);
1945
1946        // Values >= 50 should be ~50% (half the data)
1947        let selectivity = histogram.range_selectivity(Some(50.0), None);
1948        assert!(selectivity > 0.4 && selectivity < 0.6);
1949    }
1950
1951    #[test]
1952    fn test_histogram_range_selectivity_quarter() {
1953        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1954        let histogram = EquiDepthHistogram::build(&values, 10);
1955
1956        // Values < 25 should be ~25%
1957        let selectivity = histogram.range_selectivity(None, Some(25.0));
1958        assert!(selectivity > 0.2 && selectivity < 0.3);
1959    }
1960
1961    #[test]
1962    fn test_histogram_equality_selectivity() {
1963        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
1964        let histogram = EquiDepthHistogram::build(&values, 10);
1965
1966        // Equality on 100 distinct values should be ~1%
1967        let selectivity = histogram.equality_selectivity(50.0);
1968        assert!(selectivity > 0.005 && selectivity < 0.02);
1969    }
1970
1971    #[test]
1972    fn test_histogram_empty() {
1973        let histogram = EquiDepthHistogram::build(&[], 10);
1974
1975        assert_eq!(histogram.num_buckets(), 0);
1976        assert_eq!(histogram.total_rows(), 0);
1977
1978        // Default selectivity for empty histogram
1979        let selectivity = histogram.range_selectivity(Some(0.0), Some(100.0));
1980        assert!((selectivity - 0.33).abs() < 0.01);
1981    }
1982
1983    #[test]
1984    fn test_histogram_bucket_overlap() {
1985        let bucket = HistogramBucket::new(10.0, 20.0, 100, 10);
1986
1987        // Full overlap
1988        assert!((bucket.overlap_fraction(Some(10.0), Some(20.0)) - 1.0).abs() < 0.01);
1989
1990        // Half overlap (lower half)
1991        assert!((bucket.overlap_fraction(Some(10.0), Some(15.0)) - 0.5).abs() < 0.01);
1992
1993        // Half overlap (upper half)
1994        assert!((bucket.overlap_fraction(Some(15.0), Some(20.0)) - 0.5).abs() < 0.01);
1995
1996        // No overlap (below)
1997        assert!((bucket.overlap_fraction(Some(0.0), Some(5.0))).abs() < 0.01);
1998
1999        // No overlap (above)
2000        assert!((bucket.overlap_fraction(Some(25.0), Some(30.0))).abs() < 0.01);
2001    }
2002
2003    #[test]
2004    fn test_column_stats_from_values() {
2005        let values = vec![10.0, 20.0, 30.0, 40.0, 50.0, 20.0, 30.0, 40.0];
2006        let stats = ColumnStats::from_values(values, 4);
2007
2008        assert_eq!(stats.distinct_count, 5); // 10, 20, 30, 40, 50
2009        assert!(stats.min_value.is_some());
2010        assert!((stats.min_value.unwrap() - 10.0).abs() < 0.01);
2011        assert!(stats.max_value.is_some());
2012        assert!((stats.max_value.unwrap() - 50.0).abs() < 0.01);
2013        assert!(stats.histogram.is_some());
2014    }
2015
2016    #[test]
2017    fn test_column_stats_with_histogram_builder() {
2018        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2019        let histogram = EquiDepthHistogram::build(&values, 10);
2020
2021        let stats = ColumnStats::new(100)
2022            .with_range(0.0, 99.0)
2023            .with_histogram(histogram);
2024
2025        assert!(stats.histogram.is_some());
2026        assert_eq!(stats.histogram.as_ref().unwrap().num_buckets(), 10);
2027    }
2028
2029    #[test]
2030    fn test_filter_with_histogram_stats() {
2031        let mut estimator = CardinalityEstimator::new();
2032
2033        // Create stats with histogram for age column
2034        let age_values: Vec<f64> = (18..80).map(|i| i as f64).collect();
2035        let histogram = EquiDepthHistogram::build(&age_values, 10);
2036        let age_stats = ColumnStats::new(62)
2037            .with_range(18.0, 79.0)
2038            .with_histogram(histogram);
2039
2040        estimator.add_table_stats(
2041            "Person",
2042            TableStats::new(1000).with_column("age", age_stats),
2043        );
2044
2045        // Filter: age > 50
2046        // Age range is 18-79, so >50 is about (79-50)/(79-18) = 29/61 ≈ 47.5%
2047        let filter = LogicalOperator::Filter(FilterOp {
2048            predicate: LogicalExpression::Binary {
2049                left: Box::new(LogicalExpression::Property {
2050                    variable: "n".to_string(),
2051                    property: "age".to_string(),
2052                }),
2053                op: BinaryOp::Gt,
2054                right: Box::new(LogicalExpression::Literal(Value::Int64(50))),
2055            },
2056            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2057                variable: "n".to_string(),
2058                label: Some("Person".to_string()),
2059                input: None,
2060            })),
2061        });
2062
2063        let cardinality = estimator.estimate(&filter);
2064
2065        // With histogram, should get more accurate estimate than default 0.33
2066        // Expected: ~47.5% of 1000 = ~475
2067        assert!(cardinality > 300.0 && cardinality < 600.0);
2068    }
2069
2070    #[test]
2071    fn test_filter_equality_with_histogram() {
2072        let mut estimator = CardinalityEstimator::new();
2073
2074        // Create stats with histogram
2075        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2076        let histogram = EquiDepthHistogram::build(&values, 10);
2077        let stats = ColumnStats::new(100)
2078            .with_range(0.0, 99.0)
2079            .with_histogram(histogram);
2080
2081        estimator.add_table_stats("Data", TableStats::new(1000).with_column("value", stats));
2082
2083        // Filter: value = 50
2084        let filter = LogicalOperator::Filter(FilterOp {
2085            predicate: LogicalExpression::Binary {
2086                left: Box::new(LogicalExpression::Property {
2087                    variable: "d".to_string(),
2088                    property: "value".to_string(),
2089                }),
2090                op: BinaryOp::Eq,
2091                right: Box::new(LogicalExpression::Literal(Value::Int64(50))),
2092            },
2093            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2094                variable: "d".to_string(),
2095                label: Some("Data".to_string()),
2096                input: None,
2097            })),
2098        });
2099
2100        let cardinality = estimator.estimate(&filter);
2101
2102        // With 100 distinct values, selectivity should be ~1%
2103        // 1000 * 0.01 = 10
2104        assert!((1.0..50.0).contains(&cardinality));
2105    }
2106
2107    #[test]
2108    fn test_histogram_min_max() {
2109        let values: Vec<f64> = vec![5.0, 10.0, 15.0, 20.0, 25.0];
2110        let histogram = EquiDepthHistogram::build(&values, 2);
2111
2112        assert_eq!(histogram.min_value(), Some(5.0));
2113        // Max is the upper bound of the last bucket
2114        assert!(histogram.max_value().is_some());
2115    }
2116
2117    // ==================== SelectivityConfig Tests ====================
2118
2119    #[test]
2120    fn test_selectivity_config_defaults() {
2121        let config = SelectivityConfig::new();
2122        assert!((config.default - 0.1).abs() < f64::EPSILON);
2123        assert!((config.equality - 0.01).abs() < f64::EPSILON);
2124        assert!((config.inequality - 0.99).abs() < f64::EPSILON);
2125        assert!((config.range - 0.33).abs() < f64::EPSILON);
2126        assert!((config.string_ops - 0.1).abs() < f64::EPSILON);
2127        assert!((config.membership - 0.1).abs() < f64::EPSILON);
2128        assert!((config.is_null - 0.05).abs() < f64::EPSILON);
2129        assert!((config.is_not_null - 0.95).abs() < f64::EPSILON);
2130        assert!((config.distinct_fraction - 0.5).abs() < f64::EPSILON);
2131    }
2132
2133    #[test]
2134    fn test_custom_selectivity_config() {
2135        let config = SelectivityConfig {
2136            equality: 0.05,
2137            range: 0.25,
2138            ..SelectivityConfig::new()
2139        };
2140        let estimator = CardinalityEstimator::with_selectivity_config(config);
2141        assert!((estimator.selectivity_config().equality - 0.05).abs() < f64::EPSILON);
2142        assert!((estimator.selectivity_config().range - 0.25).abs() < f64::EPSILON);
2143    }
2144
2145    #[test]
2146    fn test_custom_selectivity_affects_estimation() {
2147        // Default: equality = 0.01 → 1000 * 0.01 = 10
2148        let mut default_est = CardinalityEstimator::new();
2149        default_est.add_table_stats("Person", TableStats::new(1000));
2150
2151        let filter = LogicalOperator::Filter(FilterOp {
2152            predicate: LogicalExpression::Binary {
2153                left: Box::new(LogicalExpression::Property {
2154                    variable: "n".to_string(),
2155                    property: "name".to_string(),
2156                }),
2157                op: BinaryOp::Eq,
2158                right: Box::new(LogicalExpression::Literal(Value::String("Alice".into()))),
2159            },
2160            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2161                variable: "n".to_string(),
2162                label: Some("Person".to_string()),
2163                input: None,
2164            })),
2165        });
2166
2167        let default_card = default_est.estimate(&filter);
2168
2169        // Custom: equality = 0.2 → 1000 * 0.2 = 200
2170        let config = SelectivityConfig {
2171            equality: 0.2,
2172            ..SelectivityConfig::new()
2173        };
2174        let mut custom_est = CardinalityEstimator::with_selectivity_config(config);
2175        custom_est.add_table_stats("Person", TableStats::new(1000));
2176
2177        let custom_card = custom_est.estimate(&filter);
2178
2179        assert!(custom_card > default_card);
2180        assert!((custom_card - 200.0).abs() < 1.0);
2181    }
2182
2183    #[test]
2184    fn test_custom_range_selectivity() {
2185        let config = SelectivityConfig {
2186            range: 0.5,
2187            ..SelectivityConfig::new()
2188        };
2189        let mut estimator = CardinalityEstimator::with_selectivity_config(config);
2190        estimator.add_table_stats("Person", TableStats::new(1000));
2191
2192        let filter = LogicalOperator::Filter(FilterOp {
2193            predicate: LogicalExpression::Binary {
2194                left: Box::new(LogicalExpression::Property {
2195                    variable: "n".to_string(),
2196                    property: "age".to_string(),
2197                }),
2198                op: BinaryOp::Gt,
2199                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2200            },
2201            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2202                variable: "n".to_string(),
2203                label: Some("Person".to_string()),
2204                input: None,
2205            })),
2206        });
2207
2208        let cardinality = estimator.estimate(&filter);
2209        // 1000 * 0.5 = 500
2210        assert!((cardinality - 500.0).abs() < 1.0);
2211    }
2212
2213    #[test]
2214    fn test_custom_distinct_fraction() {
2215        let config = SelectivityConfig {
2216            distinct_fraction: 0.8,
2217            ..SelectivityConfig::new()
2218        };
2219        let mut estimator = CardinalityEstimator::with_selectivity_config(config);
2220        estimator.add_table_stats("Person", TableStats::new(1000));
2221
2222        let distinct = LogicalOperator::Distinct(DistinctOp {
2223            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2224                variable: "n".to_string(),
2225                label: Some("Person".to_string()),
2226                input: None,
2227            })),
2228            columns: None,
2229        });
2230
2231        let cardinality = estimator.estimate(&distinct);
2232        // 1000 * 0.8 = 800
2233        assert!((cardinality - 800.0).abs() < 1.0);
2234    }
2235
2236    // ==================== EstimationLog Tests ====================
2237
2238    #[test]
2239    fn test_estimation_log_basic() {
2240        let mut log = EstimationLog::new(10.0);
2241        log.record("NodeScan(Person)", 1000.0, 1200.0);
2242        log.record("Filter(age > 30)", 100.0, 90.0);
2243
2244        assert_eq!(log.entries().len(), 2);
2245        assert!(!log.should_replan()); // 1.2x and 0.9x are within 10x threshold
2246    }
2247
2248    #[test]
2249    fn test_estimation_log_triggers_replan() {
2250        let mut log = EstimationLog::new(10.0);
2251        log.record("NodeScan(Person)", 100.0, 5000.0); // 50x underestimate
2252
2253        assert!(log.should_replan());
2254    }
2255
2256    #[test]
2257    fn test_estimation_log_overestimate_triggers_replan() {
2258        let mut log = EstimationLog::new(5.0);
2259        log.record("Filter", 1000.0, 100.0); // 10x overestimate → ratio = 0.1
2260
2261        assert!(log.should_replan()); // 0.1 < 1/5 = 0.2
2262    }
2263
2264    #[test]
2265    fn test_estimation_entry_error_ratio() {
2266        let entry = EstimationEntry {
2267            operator: "test".into(),
2268            estimated: 100.0,
2269            actual: 200.0,
2270        };
2271        assert!((entry.error_ratio() - 2.0).abs() < f64::EPSILON);
2272
2273        let perfect = EstimationEntry {
2274            operator: "test".into(),
2275            estimated: 100.0,
2276            actual: 100.0,
2277        };
2278        assert!((perfect.error_ratio() - 1.0).abs() < f64::EPSILON);
2279
2280        let zero_est = EstimationEntry {
2281            operator: "test".into(),
2282            estimated: 0.0,
2283            actual: 0.0,
2284        };
2285        assert!((zero_est.error_ratio() - 1.0).abs() < f64::EPSILON);
2286    }
2287
2288    #[test]
2289    fn test_estimation_log_max_error_ratio() {
2290        let mut log = EstimationLog::new(10.0);
2291        log.record("A", 100.0, 300.0); // 3x
2292        log.record("B", 100.0, 50.0); // 2x (normalized: 1/0.5 = 2)
2293        log.record("C", 100.0, 100.0); // 1x
2294
2295        assert!((log.max_error_ratio() - 3.0).abs() < f64::EPSILON);
2296    }
2297
2298    #[test]
2299    fn test_estimation_log_clear() {
2300        let mut log = EstimationLog::new(10.0);
2301        log.record("A", 100.0, 100.0);
2302        assert_eq!(log.entries().len(), 1);
2303
2304        log.clear();
2305        assert!(log.entries().is_empty());
2306        assert!(!log.should_replan());
2307    }
2308
2309    #[test]
2310    fn test_create_estimation_log() {
2311        let log = CardinalityEstimator::create_estimation_log();
2312        assert!(log.entries().is_empty());
2313        assert!(!log.should_replan());
2314    }
2315}