Skip to main content

grafeo_engine/query/optimizer/
cardinality.rs

1//! Cardinality estimation for query optimization.
2//!
3//! Estimates the number of rows produced by each operator in a query plan.
4//!
5//! # Equi-Depth Histograms
6//!
7//! This module provides equi-depth histogram support for accurate selectivity
8//! estimation of range predicates. Unlike equi-width histograms that divide
9//! the value range into equal-sized buckets, equi-depth histograms divide
10//! the data into buckets with approximately equal numbers of rows.
11//!
12//! Benefits:
13//! - Better estimates for skewed data distributions
14//! - More accurate range selectivity than assuming uniform distribution
15//! - Adaptive to actual data characteristics
16
17use crate::query::plan::{
18    AggregateOp, BinaryOp, DistinctOp, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp,
19    LogicalExpression, LogicalOperator, MultiWayJoinOp, NodeScanOp, ProjectOp, SkipOp, SortOp,
20    TripleComponent, TripleScanOp, UnaryOp, VectorJoinOp, VectorScanOp,
21};
22use std::collections::HashMap;
23
/// A bucket in an equi-depth histogram.
///
/// Each bucket represents a range of values and the frequency of rows
/// falling within that range. In an equi-depth histogram, all buckets
/// contain approximately the same number of rows.
///
/// A bucket whose bounds coincide (`lower_bound == upper_bound`) is a
/// *point bucket*: it stands for rows that all share that single value.
#[derive(Debug, Clone)]
pub struct HistogramBucket {
    /// Lower bound of the bucket (inclusive).
    pub lower_bound: f64,
    /// Upper bound of the bucket (exclusive, except for the last bucket).
    ///
    /// Note that [`contains`](Self::contains) has no notion of "last bucket"
    /// and always treats this bound as exclusive for non-point buckets.
    pub upper_bound: f64,
    /// Number of rows in this bucket.
    pub frequency: u64,
    /// Number of distinct values in this bucket.
    pub distinct_count: u64,
}

impl HistogramBucket {
    /// Creates a new histogram bucket.
    ///
    /// No validation is performed on the bounds or counts.
    #[must_use]
    pub fn new(lower_bound: f64, upper_bound: f64, frequency: u64, distinct_count: u64) -> Self {
        Self {
            lower_bound,
            upper_bound,
            frequency,
            distinct_count,
        }
    }

    /// Returns the width of this bucket (`upper_bound - lower_bound`).
    #[must_use]
    pub fn width(&self) -> f64 {
        self.upper_bound - self.lower_bound
    }

    /// Checks if a value falls within this bucket.
    ///
    /// Regular buckets are half-open: `lower_bound <= value < upper_bound`.
    /// A point bucket (both bounds equal) contains exactly its bound, which
    /// keeps this method consistent with [`overlap_fraction`](Self::overlap_fraction),
    /// where a point bucket can be fully covered by a range.
    #[must_use]
    pub fn contains(&self, value: f64) -> bool {
        if self.lower_bound == self.upper_bound {
            // A half-open interval of zero width would be empty, yet the
            // bucket still carries rows: they all hold this one value.
            return value == self.lower_bound;
        }
        value >= self.lower_bound && value < self.upper_bound
    }

    /// Returns the fraction of this bucket covered by the given range.
    ///
    /// `None` means "unbounded" on that side. The range is first clipped to
    /// the bucket, then the covered width is divided by the bucket width.
    /// For a bucket with non-positive width the result is `1.0` when the
    /// range reaches both bounds and `0.0` otherwise.
    #[must_use]
    pub fn overlap_fraction(&self, lower: Option<f64>, upper: Option<f64>) -> f64 {
        let effective_lower = lower.unwrap_or(self.lower_bound).max(self.lower_bound);
        let effective_upper = upper.unwrap_or(self.upper_bound).min(self.upper_bound);

        let bucket_width = self.width();
        if bucket_width <= 0.0 {
            // Width-based interpolation is impossible here: all or nothing.
            let covered =
                effective_lower <= self.lower_bound && effective_upper >= self.upper_bound;
            return if covered { 1.0 } else { 0.0 };
        }

        let overlap = (effective_upper - effective_lower).max(0.0);
        (overlap / bucket_width).min(1.0)
    }
}
84
85/// An equi-depth histogram for selectivity estimation.
86///
87/// Equi-depth histograms partition data into buckets where each bucket
88/// contains approximately the same number of rows. This provides more
89/// accurate selectivity estimates than assuming uniform distribution,
90/// especially for skewed data.
91///
92/// # Example
93///
94/// ```no_run
95/// use grafeo_engine::query::optimizer::cardinality::EquiDepthHistogram;
96///
97/// // Build a histogram from sorted values
98/// let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0];
99/// let histogram = EquiDepthHistogram::build(&values, 4);
100///
101/// // Estimate selectivity for age > 25
102/// let selectivity = histogram.range_selectivity(Some(25.0), None);
103/// ```
104#[derive(Debug, Clone)]
105pub struct EquiDepthHistogram {
106    /// The histogram buckets, sorted by lower_bound.
107    buckets: Vec<HistogramBucket>,
108    /// Total number of rows represented by this histogram.
109    total_rows: u64,
110}
111
112impl EquiDepthHistogram {
113    /// Creates a new histogram from pre-built buckets.
114    #[must_use]
115    pub fn new(buckets: Vec<HistogramBucket>) -> Self {
116        let total_rows = buckets.iter().map(|b| b.frequency).sum();
117        Self {
118            buckets,
119            total_rows,
120        }
121    }
122
123    /// Builds an equi-depth histogram from a sorted slice of values.
124    ///
125    /// # Arguments
126    /// * `values` - A sorted slice of numeric values
127    /// * `num_buckets` - The desired number of buckets
128    ///
129    /// # Returns
130    /// An equi-depth histogram with approximately equal row counts per bucket.
131    #[must_use]
132    pub fn build(values: &[f64], num_buckets: usize) -> Self {
133        if values.is_empty() || num_buckets == 0 {
134            return Self {
135                buckets: Vec::new(),
136                total_rows: 0,
137            };
138        }
139
140        let num_buckets = num_buckets.min(values.len());
141        let rows_per_bucket = (values.len() + num_buckets - 1) / num_buckets;
142        let mut buckets = Vec::with_capacity(num_buckets);
143
144        let mut start_idx = 0;
145        while start_idx < values.len() {
146            let end_idx = (start_idx + rows_per_bucket).min(values.len());
147            let lower_bound = values[start_idx];
148            let upper_bound = if end_idx < values.len() {
149                values[end_idx]
150            } else {
151                // For the last bucket, extend slightly beyond the max value
152                values[end_idx - 1] + 1.0
153            };
154
155            // Count distinct values in this bucket
156            let bucket_values = &values[start_idx..end_idx];
157            let distinct_count = count_distinct(bucket_values);
158
159            buckets.push(HistogramBucket::new(
160                lower_bound,
161                upper_bound,
162                (end_idx - start_idx) as u64,
163                distinct_count,
164            ));
165
166            start_idx = end_idx;
167        }
168
169        Self::new(buckets)
170    }
171
172    /// Returns the number of buckets in this histogram.
173    #[must_use]
174    pub fn num_buckets(&self) -> usize {
175        self.buckets.len()
176    }
177
178    /// Returns the total number of rows represented.
179    #[must_use]
180    pub fn total_rows(&self) -> u64 {
181        self.total_rows
182    }
183
184    /// Returns the histogram buckets.
185    #[must_use]
186    pub fn buckets(&self) -> &[HistogramBucket] {
187        &self.buckets
188    }
189
190    /// Estimates selectivity for a range predicate.
191    ///
192    /// # Arguments
193    /// * `lower` - Lower bound (None for unbounded)
194    /// * `upper` - Upper bound (None for unbounded)
195    ///
196    /// # Returns
197    /// Estimated fraction of rows matching the range (0.0 to 1.0).
198    #[must_use]
199    pub fn range_selectivity(&self, lower: Option<f64>, upper: Option<f64>) -> f64 {
200        if self.buckets.is_empty() || self.total_rows == 0 {
201            return 0.33; // Default fallback
202        }
203
204        let mut matching_rows = 0.0;
205
206        for bucket in &self.buckets {
207            // Check if this bucket overlaps with the range
208            let bucket_lower = bucket.lower_bound;
209            let bucket_upper = bucket.upper_bound;
210
211            // Skip buckets entirely outside the range
212            if let Some(l) = lower
213                && bucket_upper <= l
214            {
215                continue;
216            }
217            if let Some(u) = upper
218                && bucket_lower >= u
219            {
220                continue;
221            }
222
223            // Calculate the fraction of this bucket covered by the range
224            let overlap = bucket.overlap_fraction(lower, upper);
225            matching_rows += overlap * bucket.frequency as f64;
226        }
227
228        (matching_rows / self.total_rows as f64).clamp(0.0, 1.0)
229    }
230
231    /// Estimates selectivity for an equality predicate.
232    ///
233    /// Uses the distinct count within matching buckets for better accuracy.
234    #[must_use]
235    pub fn equality_selectivity(&self, value: f64) -> f64 {
236        if self.buckets.is_empty() || self.total_rows == 0 {
237            return 0.01; // Default fallback
238        }
239
240        // Find the bucket containing this value
241        for bucket in &self.buckets {
242            if bucket.contains(value) {
243                // Assume uniform distribution within bucket
244                if bucket.distinct_count > 0 {
245                    return (bucket.frequency as f64
246                        / bucket.distinct_count as f64
247                        / self.total_rows as f64)
248                        .min(1.0);
249                }
250            }
251        }
252
253        // Value not in any bucket - very low selectivity
254        0.001
255    }
256
257    /// Gets the minimum value in the histogram.
258    #[must_use]
259    pub fn min_value(&self) -> Option<f64> {
260        self.buckets.first().map(|b| b.lower_bound)
261    }
262
263    /// Gets the maximum value in the histogram.
264    #[must_use]
265    pub fn max_value(&self) -> Option<f64> {
266        self.buckets.last().map(|b| b.upper_bound)
267    }
268}
269
270/// Counts distinct values in a sorted slice.
271/// Counts the number of top-level AND conjuncts in an expression.
272///
273/// For `(A AND B) AND (C AND D)` returns 4; for a single non-AND expression
274/// returns 1. Used to estimate selectivity of multi-predicate join conditions.
275fn count_and_conjuncts(expr: &LogicalExpression) -> usize {
276    match expr {
277        LogicalExpression::Binary {
278            op: BinaryOp::And,
279            left,
280            right,
281        } => count_and_conjuncts(left) + count_and_conjuncts(right),
282        _ => 1,
283    }
284}
285
/// Counts distinct values in a sorted slice.
///
/// Adjacent values are compared exactly, so the slice must be sorted for
/// equal values to be neighbours. NaN entries are never counted as a new
/// value. An empty slice yields 0.
fn count_distinct(sorted_values: &[f64]) -> u64 {
    let (first, rest) = match sorted_values.split_first() {
        Some((&first, rest)) => (first, rest),
        None => return 0,
    };

    let mut count = 1u64;
    let mut prev = first;

    for &val in rest {
        // Exact comparison: an absolute epsilon would merge distinct values of
        // small magnitude. Going through the difference keeps NaN inert,
        // because a NaN difference fails the `>` test.
        if (val - prev).abs() > 0.0 {
            count += 1;
            prev = val;
        }
    }

    count
}
303
304/// Statistics for a table/label.
305#[derive(Debug, Clone)]
306pub struct TableStats {
307    /// Total number of rows.
308    pub row_count: u64,
309    /// Column statistics.
310    pub columns: HashMap<String, ColumnStats>,
311}
312
313impl TableStats {
314    /// Creates new table statistics.
315    #[must_use]
316    pub fn new(row_count: u64) -> Self {
317        Self {
318            row_count,
319            columns: HashMap::new(),
320        }
321    }
322
323    /// Adds column statistics.
324    pub fn with_column(mut self, name: &str, stats: ColumnStats) -> Self {
325        self.columns.insert(name.to_string(), stats);
326        self
327    }
328}
329
330/// Statistics for a column.
331#[derive(Debug, Clone)]
332pub struct ColumnStats {
333    /// Number of distinct values.
334    pub distinct_count: u64,
335    /// Number of null values.
336    pub null_count: u64,
337    /// Minimum value (if orderable).
338    pub min_value: Option<f64>,
339    /// Maximum value (if orderable).
340    pub max_value: Option<f64>,
341    /// Equi-depth histogram for accurate selectivity estimation.
342    pub histogram: Option<EquiDepthHistogram>,
343}
344
345impl ColumnStats {
346    /// Creates new column statistics.
347    #[must_use]
348    pub fn new(distinct_count: u64) -> Self {
349        Self {
350            distinct_count,
351            null_count: 0,
352            min_value: None,
353            max_value: None,
354            histogram: None,
355        }
356    }
357
358    /// Sets the null count.
359    #[must_use]
360    pub fn with_nulls(mut self, null_count: u64) -> Self {
361        self.null_count = null_count;
362        self
363    }
364
365    /// Sets the min/max range.
366    #[must_use]
367    pub fn with_range(mut self, min: f64, max: f64) -> Self {
368        self.min_value = Some(min);
369        self.max_value = Some(max);
370        self
371    }
372
373    /// Sets the equi-depth histogram for this column.
374    #[must_use]
375    pub fn with_histogram(mut self, histogram: EquiDepthHistogram) -> Self {
376        self.histogram = Some(histogram);
377        self
378    }
379
380    /// Builds column statistics with histogram from raw values.
381    ///
382    /// This is a convenience method that computes all statistics from the data.
383    ///
384    /// # Arguments
385    /// * `values` - The column values (will be sorted internally)
386    /// * `num_buckets` - Number of histogram buckets to create
387    #[must_use]
388    pub fn from_values(mut values: Vec<f64>, num_buckets: usize) -> Self {
389        if values.is_empty() {
390            return Self::new(0);
391        }
392
393        // Sort values for histogram building
394        values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
395
396        let min = values.first().copied();
397        let max = values.last().copied();
398        let distinct_count = count_distinct(&values);
399        let histogram = EquiDepthHistogram::build(&values, num_buckets);
400
401        Self {
402            distinct_count,
403            null_count: 0,
404            min_value: min,
405            max_value: max,
406            histogram: Some(histogram),
407        }
408    }
409}
410
/// Tunable fallback selectivities used by the cardinality estimator.
///
/// Each field is the fraction of rows a predicate of that kind is assumed
/// to keep when no histogram or column statistics are available. Adjusting
/// these values can improve plan quality for workloads with known skew.
#[derive(Debug, Clone)]
pub struct SelectivityConfig {
    /// Selectivity for unknown predicates (default: 0.1).
    pub default: f64,
    /// Selectivity for equality predicates without stats (default: 0.01).
    pub equality: f64,
    /// Selectivity for inequality predicates (default: 0.99).
    pub inequality: f64,
    /// Selectivity for range predicates without stats (default: 0.33).
    pub range: f64,
    /// Selectivity for string operations: STARTS WITH, ENDS WITH, CONTAINS, LIKE (default: 0.1).
    pub string_ops: f64,
    /// Selectivity for IN membership (default: 0.1).
    pub membership: f64,
    /// Selectivity for IS NULL (default: 0.05).
    pub is_null: f64,
    /// Selectivity for IS NOT NULL (default: 0.95).
    pub is_not_null: f64,
    /// Fraction assumed distinct for DISTINCT operations (default: 0.5).
    pub distinct_fraction: f64,
}

impl SelectivityConfig {
    /// Creates a new config with standard database defaults.
    ///
    /// Identical to [`SelectivityConfig::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for SelectivityConfig {
    /// Returns the standard defaults listed on each field.
    fn default() -> Self {
        Self {
            // Generic and per-operator predicate fallbacks.
            default: 0.1,
            equality: 0.01,
            inequality: 0.99,
            range: 0.33,
            string_ops: 0.1,
            membership: 0.1,
            // Null checks.
            is_null: 0.05,
            is_not_null: 0.95,
            // DISTINCT output fraction.
            distinct_fraction: 0.5,
        }
    }
}
461
/// One estimate-vs-actual observation for a single plan operator.
#[derive(Debug, Clone)]
pub struct EstimationEntry {
    /// Human-readable label for the operator (e.g., "NodeScan(Person)").
    pub operator: String,
    /// The cardinality estimate produced by the optimizer.
    pub estimated: f64,
    /// The actual row count observed at execution time.
    pub actual: f64,
}

impl EstimationEntry {
    /// Returns the estimation error ratio (`actual / estimated`).
    ///
    /// * near `1.0`: the estimate was accurate
    /// * above `1.0`: the optimizer underestimated
    /// * below `1.0`: the optimizer overestimated
    ///
    /// A (near-)zero estimate cannot be divided by: it yields `1.0` when the
    /// actual count is (near-)zero as well, and `f64::INFINITY` otherwise.
    #[must_use]
    pub fn error_ratio(&self) -> f64 {
        let estimate_is_zero = self.estimated.abs() < f64::EPSILON;
        let actual_is_zero = self.actual.abs() < f64::EPSILON;

        match (estimate_is_zero, actual_is_zero) {
            (false, _) => self.actual / self.estimated,
            (true, true) => 1.0,
            (true, false) => f64::INFINITY,
        }
    }
}
492
493/// Collects estimate vs actual cardinality data for query plan analysis.
494///
495/// After executing a query, call [`record()`](Self::record) for each
496/// operator with its estimated and actual cardinalities. Then use
497/// [`should_replan()`](Self::should_replan) to decide whether the plan
498/// should be re-optimized.
499#[derive(Debug, Clone, Default)]
500pub struct EstimationLog {
501    /// Recorded entries.
502    entries: Vec<EstimationEntry>,
503    /// Error ratio threshold that triggers re-planning (default: 10.0).
504    ///
505    /// If any operator's error ratio exceeds this, `should_replan()` returns true.
506    replan_threshold: f64,
507}
508
509impl EstimationLog {
510    /// Creates a new estimation log with the given re-planning threshold.
511    #[must_use]
512    pub fn new(replan_threshold: f64) -> Self {
513        Self {
514            entries: Vec::new(),
515            replan_threshold,
516        }
517    }
518
519    /// Records an estimate-vs-actual observation.
520    pub fn record(&mut self, operator: impl Into<String>, estimated: f64, actual: f64) {
521        self.entries.push(EstimationEntry {
522            operator: operator.into(),
523            estimated,
524            actual,
525        });
526    }
527
528    /// Returns all recorded entries.
529    #[must_use]
530    pub fn entries(&self) -> &[EstimationEntry] {
531        &self.entries
532    }
533
534    /// Returns whether any operator's estimation error exceeds the threshold,
535    /// indicating the plan should be re-optimized.
536    #[must_use]
537    pub fn should_replan(&self) -> bool {
538        self.entries.iter().any(|e| {
539            let ratio = e.error_ratio();
540            ratio > self.replan_threshold || ratio < 1.0 / self.replan_threshold
541        })
542    }
543
544    /// Returns the maximum error ratio across all entries.
545    #[must_use]
546    pub fn max_error_ratio(&self) -> f64 {
547        self.entries
548            .iter()
549            .map(|e| {
550                let r = e.error_ratio();
551                // Normalize so both over- and under-estimation are > 1.0
552                if r < 1.0 { 1.0 / r } else { r }
553            })
554            .fold(1.0_f64, f64::max)
555    }
556
557    /// Clears all entries.
558    pub fn clear(&mut self) {
559        self.entries.clear();
560    }
561}
562
563/// Cardinality estimator.
564pub struct CardinalityEstimator {
565    /// Statistics for each label/table.
566    table_stats: HashMap<String, TableStats>,
567    /// Default row count for unknown tables.
568    default_row_count: u64,
569    /// Default selectivity for unknown predicates.
570    default_selectivity: f64,
571    /// Average edge fanout (outgoing edges per node).
572    avg_fanout: f64,
573    /// Configurable selectivity defaults.
574    selectivity_config: SelectivityConfig,
575    /// RDF statistics for triple pattern cardinality estimation.
576    rdf_statistics: Option<grafeo_core::statistics::RdfStatistics>,
577}
578
579impl CardinalityEstimator {
580    /// Creates a new cardinality estimator with default settings.
581    #[must_use]
582    pub fn new() -> Self {
583        let config = SelectivityConfig::new();
584        Self {
585            table_stats: HashMap::new(),
586            default_row_count: 1000,
587            default_selectivity: config.default,
588            avg_fanout: 10.0,
589            selectivity_config: config,
590            rdf_statistics: None,
591        }
592    }
593
594    /// Creates a new cardinality estimator with custom selectivity configuration.
595    #[must_use]
596    pub fn with_selectivity_config(config: SelectivityConfig) -> Self {
597        Self {
598            table_stats: HashMap::new(),
599            default_row_count: 1000,
600            default_selectivity: config.default,
601            avg_fanout: 10.0,
602            selectivity_config: config,
603            rdf_statistics: None,
604        }
605    }
606
607    /// Returns the current selectivity configuration.
608    #[must_use]
609    pub fn selectivity_config(&self) -> &SelectivityConfig {
610        &self.selectivity_config
611    }
612
613    /// Creates an estimation log with the default re-planning threshold (10x).
614    #[must_use]
615    pub fn create_estimation_log() -> EstimationLog {
616        EstimationLog::new(10.0)
617    }
618
619    /// Creates a cardinality estimator pre-populated from store statistics.
620    ///
621    /// Maps `LabelStatistics` to `TableStats` and computes the average edge
622    /// fanout from `EdgeTypeStatistics`. Falls back to defaults for any
623    /// missing statistics.
624    #[must_use]
625    pub fn from_statistics(stats: &grafeo_core::statistics::Statistics) -> Self {
626        let mut estimator = Self::new();
627
628        // Use total node count as default for unlabeled scans
629        if stats.total_nodes > 0 {
630            estimator.default_row_count = stats.total_nodes;
631        }
632
633        // Convert label statistics to optimizer table stats
634        for (label, label_stats) in &stats.labels {
635            let mut table_stats = TableStats::new(label_stats.node_count);
636
637            // Map property statistics (distinct count for selectivity estimation)
638            for (prop, col_stats) in &label_stats.properties {
639                let optimizer_col =
640                    ColumnStats::new(col_stats.distinct_count).with_nulls(col_stats.null_count);
641                table_stats = table_stats.with_column(prop, optimizer_col);
642            }
643
644            estimator.add_table_stats(label, table_stats);
645        }
646
647        // Compute average fanout from edge type statistics
648        if !stats.edge_types.is_empty() {
649            let total_out_degree: f64 = stats.edge_types.values().map(|e| e.avg_out_degree).sum();
650            estimator.avg_fanout = total_out_degree / stats.edge_types.len() as f64;
651        } else if stats.total_nodes > 0 {
652            estimator.avg_fanout = stats.total_edges as f64 / stats.total_nodes as f64;
653        }
654
655        // Clamp fanout to a reasonable minimum
656        if estimator.avg_fanout < 1.0 {
657            estimator.avg_fanout = 1.0;
658        }
659
660        estimator
661    }
662
663    /// Creates a cardinality estimator from RDF statistics.
664    ///
665    /// Uses triple pattern cardinality estimates for `TripleScan` operators
666    /// and join selectivity from per-predicate statistics.
667    #[must_use]
668    pub fn from_rdf_statistics(rdf_stats: grafeo_core::statistics::RdfStatistics) -> Self {
669        let mut estimator = Self::new();
670        if rdf_stats.total_triples > 0 {
671            estimator.default_row_count = rdf_stats.total_triples;
672        }
673        estimator.rdf_statistics = Some(rdf_stats);
674        estimator
675    }
676
677    /// Adds statistics for a table/label.
678    pub fn add_table_stats(&mut self, name: &str, stats: TableStats) {
679        self.table_stats.insert(name.to_string(), stats);
680    }
681
682    /// Sets the average edge fanout.
683    pub fn set_avg_fanout(&mut self, fanout: f64) {
684        self.avg_fanout = fanout;
685    }
686
687    /// Estimates the cardinality of a logical operator.
688    #[must_use]
689    pub fn estimate(&self, op: &LogicalOperator) -> f64 {
690        match op {
691            LogicalOperator::NodeScan(scan) => self.estimate_node_scan(scan),
692            LogicalOperator::Filter(filter) => self.estimate_filter(filter),
693            LogicalOperator::Project(project) => self.estimate_project(project),
694            LogicalOperator::Expand(expand) => self.estimate_expand(expand),
695            LogicalOperator::Join(join) => self.estimate_join(join),
696            LogicalOperator::Aggregate(agg) => self.estimate_aggregate(agg),
697            LogicalOperator::Sort(sort) => self.estimate_sort(sort),
698            LogicalOperator::Distinct(distinct) => self.estimate_distinct(distinct),
699            LogicalOperator::Limit(limit) => self.estimate_limit(limit),
700            LogicalOperator::Skip(skip) => self.estimate_skip(skip),
701            LogicalOperator::Return(ret) => self.estimate(&ret.input),
702            LogicalOperator::Empty => 0.0,
703            LogicalOperator::VectorScan(scan) => self.estimate_vector_scan(scan),
704            LogicalOperator::VectorJoin(join) => self.estimate_vector_join(join),
705            LogicalOperator::MultiWayJoin(mwj) => self.estimate_multi_way_join(mwj),
706            LogicalOperator::LeftJoin(lj) => self.estimate_left_join(lj),
707            LogicalOperator::TripleScan(scan) => self.estimate_triple_scan(scan),
708            _ => self.default_row_count as f64,
709        }
710    }
711
712    /// Estimates node scan cardinality.
713    fn estimate_node_scan(&self, scan: &NodeScanOp) -> f64 {
714        if let Some(label) = &scan.label
715            && let Some(stats) = self.table_stats.get(label)
716        {
717            return stats.row_count as f64;
718        }
719        // No label filter - scan all nodes
720        self.default_row_count as f64
721    }
722
723    /// Estimates triple scan cardinality using RDF statistics.
724    ///
725    /// If RDF statistics are available, uses the pattern binding (which positions
726    /// are bound vs variable) to produce accurate estimates. Otherwise falls back
727    /// to the default row count.
728    fn estimate_triple_scan(&self, scan: &TripleScanOp) -> f64 {
729        // If there's an input, the triple scan is chained: multiply input cardinality
730        // by the per-row expansion factor.
731        let base = if let Some(ref input) = scan.input {
732            self.estimate(input)
733        } else {
734            1.0
735        };
736
737        let Some(rdf_stats) = &self.rdf_statistics else {
738            return if scan.input.is_some() {
739                base * self.default_row_count as f64
740            } else {
741                self.default_row_count as f64
742            };
743        };
744
745        let subject_bound = matches!(
746            scan.subject,
747            TripleComponent::Iri(_) | TripleComponent::Literal(_)
748        );
749        let object_bound = matches!(
750            scan.object,
751            TripleComponent::Iri(_) | TripleComponent::Literal(_)
752        );
753        let predicate_iri = match &scan.predicate {
754            TripleComponent::Iri(iri) => Some(iri.as_str()),
755            _ => None,
756        };
757
758        let pattern_card = rdf_stats.estimate_triple_pattern_cardinality(
759            subject_bound,
760            predicate_iri,
761            object_bound,
762        );
763
764        if scan.input.is_some() {
765            // Chained scan: each input row expands by the pattern's selectivity
766            let selectivity = if rdf_stats.total_triples > 0 {
767                pattern_card / rdf_stats.total_triples as f64
768            } else {
769                1.0
770            };
771            (base * pattern_card * selectivity).max(1.0)
772        } else {
773            pattern_card.max(1.0)
774        }
775    }
776
777    /// Estimates filter cardinality.
778    fn estimate_filter(&self, filter: &FilterOp) -> f64 {
779        let input_cardinality = self.estimate(&filter.input);
780        let selectivity = self.estimate_selectivity(&filter.predicate);
781        (input_cardinality * selectivity).max(1.0)
782    }
783
784    /// Estimates projection cardinality (same as input).
785    fn estimate_project(&self, project: &ProjectOp) -> f64 {
786        self.estimate(&project.input)
787    }
788
789    /// Estimates expand cardinality.
790    fn estimate_expand(&self, expand: &ExpandOp) -> f64 {
791        let input_cardinality = self.estimate(&expand.input);
792
793        // Apply fanout based on edge type
794        let fanout = if !expand.edge_types.is_empty() {
795            // Specific edge type(s) typically have lower fanout
796            self.avg_fanout * 0.5
797        } else {
798            self.avg_fanout
799        };
800
801        // Handle variable-length paths
802        let path_multiplier = if expand.max_hops.unwrap_or(1) > 1 {
803            let min = expand.min_hops as f64;
804            let max = expand.max_hops.unwrap_or(expand.min_hops + 3) as f64;
805            // Geometric series approximation
806            (fanout.powf(max + 1.0) - fanout.powf(min)) / (fanout - 1.0)
807        } else {
808            fanout
809        };
810
811        (input_cardinality * path_multiplier).max(1.0)
812    }
813
814    /// Estimates join cardinality.
815    fn estimate_join(&self, join: &JoinOp) -> f64 {
816        let left_card = self.estimate(&join.left);
817        let right_card = self.estimate(&join.right);
818
819        match join.join_type {
820            JoinType::Cross => left_card * right_card,
821            JoinType::Inner => {
822                // Assume join selectivity based on conditions
823                let selectivity = if join.conditions.is_empty() {
824                    1.0 // Cross join
825                } else {
826                    // Estimate based on number of conditions
827                    0.1_f64.powi(join.conditions.len() as i32)
828                };
829                (left_card * right_card * selectivity).max(1.0)
830            }
831            JoinType::Left => {
832                // Left join returns at least all left rows
833                let inner_card = self.estimate_join(&JoinOp {
834                    left: join.left.clone(),
835                    right: join.right.clone(),
836                    join_type: JoinType::Inner,
837                    conditions: join.conditions.clone(),
838                });
839                inner_card.max(left_card)
840            }
841            JoinType::Right => {
842                // Right join returns at least all right rows
843                let inner_card = self.estimate_join(&JoinOp {
844                    left: join.left.clone(),
845                    right: join.right.clone(),
846                    join_type: JoinType::Inner,
847                    conditions: join.conditions.clone(),
848                });
849                inner_card.max(right_card)
850            }
851            JoinType::Full => {
852                // Full join returns at least max(left, right)
853                let inner_card = self.estimate_join(&JoinOp {
854                    left: join.left.clone(),
855                    right: join.right.clone(),
856                    join_type: JoinType::Inner,
857                    conditions: join.conditions.clone(),
858                });
859                inner_card.max(left_card.max(right_card))
860            }
861            JoinType::Semi => {
862                // Semi join returns at most left cardinality
863                (left_card * self.default_selectivity).max(1.0)
864            }
865            JoinType::Anti => {
866                // Anti join returns at most left cardinality
867                (left_card * (1.0 - self.default_selectivity)).max(1.0)
868            }
869        }
870    }
871
872    /// Estimates left join cardinality (OPTIONAL MATCH).
873    ///
874    /// A left outer join preserves all left rows, so the output is at least
875    /// `left_cardinality`. When the right side matches, the output may be
876    /// larger (one left row can match multiple right rows).
877    ///
878    /// When the join carries a cross-side condition (null-safe predicates),
879    /// each AND-conjunct reduces the selectivity estimate.
880    fn estimate_left_join(&self, lj: &LeftJoinOp) -> f64 {
881        let left_card = self.estimate(&lj.left);
882        let right_card = self.estimate(&lj.right);
883
884        // Adjust selectivity based on the number of AND conjuncts in the
885        // cross-side condition: each equality reduces match probability.
886        let condition_selectivity = if let Some(cond) = &lj.condition {
887            let n = count_and_conjuncts(cond);
888            self.default_selectivity.powi(n as i32)
889        } else {
890            self.default_selectivity
891        };
892
893        // Estimate as inner join cardinality, but guaranteed at least left_card
894        let inner_estimate = left_card * right_card * condition_selectivity;
895        inner_estimate.max(left_card).max(1.0)
896    }
897
898    /// Estimates aggregation cardinality.
899    fn estimate_aggregate(&self, agg: &AggregateOp) -> f64 {
900        let input_cardinality = self.estimate(&agg.input);
901
902        if agg.group_by.is_empty() {
903            // Global aggregation - single row
904            1.0
905        } else {
906            // Group by - estimate distinct groups
907            // Assume each group key reduces cardinality by 10
908            let group_reduction = 10.0_f64.powi(agg.group_by.len() as i32);
909            (input_cardinality / group_reduction).max(1.0)
910        }
911    }
912
913    /// Estimates sort cardinality (same as input).
914    fn estimate_sort(&self, sort: &SortOp) -> f64 {
915        self.estimate(&sort.input)
916    }
917
918    /// Estimates distinct cardinality.
919    fn estimate_distinct(&self, distinct: &DistinctOp) -> f64 {
920        let input_cardinality = self.estimate(&distinct.input);
921        (input_cardinality * self.selectivity_config.distinct_fraction).max(1.0)
922    }
923
924    /// Estimates limit cardinality.
925    fn estimate_limit(&self, limit: &LimitOp) -> f64 {
926        let input_cardinality = self.estimate(&limit.input);
927        limit.count.estimate().min(input_cardinality)
928    }
929
930    /// Estimates skip cardinality.
931    fn estimate_skip(&self, skip: &SkipOp) -> f64 {
932        let input_cardinality = self.estimate(&skip.input);
933        (input_cardinality - skip.count.estimate()).max(0.0)
934    }
935
936    /// Estimates vector scan cardinality.
937    ///
938    /// Vector scan returns at most k results (the k nearest neighbors).
939    /// With similarity/distance filters, it may return fewer.
940    fn estimate_vector_scan(&self, scan: &VectorScanOp) -> f64 {
941        let base_k = scan.k as f64;
942
943        // Apply filter selectivity if thresholds are specified
944        let selectivity = if scan.min_similarity.is_some() || scan.max_distance.is_some() {
945            // Assume 70% of results pass threshold filters
946            0.7
947        } else {
948            1.0
949        };
950
951        (base_k * selectivity).max(1.0)
952    }
953
954    /// Estimates vector join cardinality.
955    ///
956    /// Vector join produces up to k results per input row.
957    fn estimate_vector_join(&self, join: &VectorJoinOp) -> f64 {
958        let input_cardinality = self.estimate(&join.input);
959        let k = join.k as f64;
960
961        // Apply filter selectivity if thresholds are specified
962        let selectivity = if join.min_similarity.is_some() || join.max_distance.is_some() {
963            0.7
964        } else {
965            1.0
966        };
967
968        (input_cardinality * k * selectivity).max(1.0)
969    }
970
971    /// Estimates multi-way join cardinality using the AGM bound heuristic.
972    ///
973    /// For a cyclic join of N relations, the AGM (Atserias-Grohe-Marx) bound
974    /// gives min(cardinality)^(N/2) as a worst-case output size estimate.
975    fn estimate_multi_way_join(&self, mwj: &MultiWayJoinOp) -> f64 {
976        if mwj.inputs.is_empty() {
977            return 0.0;
978        }
979        let cardinalities: Vec<f64> = mwj
980            .inputs
981            .iter()
982            .map(|input| self.estimate(input))
983            .collect();
984        let min_card = cardinalities.iter().copied().fold(f64::INFINITY, f64::min);
985        let n = cardinalities.len() as f64;
986        // AGM bound: min(cardinality)^(n/2)
987        (min_card.powf(n / 2.0)).max(1.0)
988    }
989
990    /// Estimates the selectivity of a predicate (0.0 to 1.0).
991    fn estimate_selectivity(&self, expr: &LogicalExpression) -> f64 {
992        match expr {
993            LogicalExpression::Binary { left, op, right } => {
994                self.estimate_binary_selectivity(left, *op, right)
995            }
996            LogicalExpression::Unary { op, operand } => {
997                self.estimate_unary_selectivity(*op, operand)
998            }
999            LogicalExpression::Literal(value) => {
1000                // Boolean literal
1001                if let grafeo_common::types::Value::Bool(b) = value {
1002                    if *b { 1.0 } else { 0.0 }
1003                } else {
1004                    self.default_selectivity
1005                }
1006            }
1007            _ => self.default_selectivity,
1008        }
1009    }
1010
1011    /// Estimates binary expression selectivity.
1012    fn estimate_binary_selectivity(
1013        &self,
1014        left: &LogicalExpression,
1015        op: BinaryOp,
1016        right: &LogicalExpression,
1017    ) -> f64 {
1018        match op {
1019            // Equality - try histogram-based estimation
1020            BinaryOp::Eq => {
1021                if let Some(selectivity) = self.try_equality_selectivity(left, right) {
1022                    return selectivity;
1023                }
1024                self.selectivity_config.equality
1025            }
1026            // Inequality is very unselective
1027            BinaryOp::Ne => self.selectivity_config.inequality,
1028            // Range predicates - use histogram if available
1029            BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1030                if let Some(selectivity) = self.try_range_selectivity(left, op, right) {
1031                    return selectivity;
1032                }
1033                self.selectivity_config.range
1034            }
1035            // Logical operators - recursively estimate sub-expressions
1036            BinaryOp::And => {
1037                let left_sel = self.estimate_selectivity(left);
1038                let right_sel = self.estimate_selectivity(right);
1039                // AND reduces selectivity (multiply assuming independence)
1040                left_sel * right_sel
1041            }
1042            BinaryOp::Or => {
1043                let left_sel = self.estimate_selectivity(left);
1044                let right_sel = self.estimate_selectivity(right);
1045                // OR: P(A ∪ B) = P(A) + P(B) - P(A ∩ B)
1046                // Assuming independence: P(A ∩ B) = P(A) * P(B)
1047                (left_sel + right_sel - left_sel * right_sel).min(1.0)
1048            }
1049            // String operations
1050            BinaryOp::StartsWith | BinaryOp::EndsWith | BinaryOp::Contains | BinaryOp::Like => {
1051                self.selectivity_config.string_ops
1052            }
1053            // Collection membership
1054            BinaryOp::In => self.selectivity_config.membership,
1055            // Other operations
1056            _ => self.default_selectivity,
1057        }
1058    }
1059
1060    /// Tries to estimate equality selectivity using histograms.
1061    fn try_equality_selectivity(
1062        &self,
1063        left: &LogicalExpression,
1064        right: &LogicalExpression,
1065    ) -> Option<f64> {
1066        // Extract property access and literal value
1067        let (label, column, value) = self.extract_column_and_value(left, right)?;
1068
1069        // Get column stats with histogram
1070        let stats = self.get_column_stats(&label, &column)?;
1071
1072        // Try histogram-based estimation
1073        if let Some(ref histogram) = stats.histogram {
1074            return Some(histogram.equality_selectivity(value));
1075        }
1076
1077        // Fall back to distinct count estimation
1078        if stats.distinct_count > 0 {
1079            return Some(1.0 / stats.distinct_count as f64);
1080        }
1081
1082        None
1083    }
1084
1085    /// Tries to estimate range selectivity using histograms.
1086    fn try_range_selectivity(
1087        &self,
1088        left: &LogicalExpression,
1089        op: BinaryOp,
1090        right: &LogicalExpression,
1091    ) -> Option<f64> {
1092        // Extract property access and literal value
1093        let (label, column, value) = self.extract_column_and_value(left, right)?;
1094
1095        // Get column stats
1096        let stats = self.get_column_stats(&label, &column)?;
1097
1098        // Determine the range based on operator
1099        let (lower, upper) = match op {
1100            BinaryOp::Lt => (None, Some(value)),
1101            BinaryOp::Le => (None, Some(value + f64::EPSILON)),
1102            BinaryOp::Gt => (Some(value + f64::EPSILON), None),
1103            BinaryOp::Ge => (Some(value), None),
1104            _ => return None,
1105        };
1106
1107        // Try histogram-based estimation first
1108        if let Some(ref histogram) = stats.histogram {
1109            return Some(histogram.range_selectivity(lower, upper));
1110        }
1111
1112        // Fall back to min/max range estimation
1113        if let (Some(min), Some(max)) = (stats.min_value, stats.max_value) {
1114            let range = max - min;
1115            if range <= 0.0 {
1116                return Some(1.0);
1117            }
1118
1119            let effective_lower = lower.unwrap_or(min).max(min);
1120            let effective_upper = upper.unwrap_or(max).min(max);
1121            let overlap = (effective_upper - effective_lower).max(0.0);
1122            return Some((overlap / range).clamp(0.0, 1.0));
1123        }
1124
1125        None
1126    }
1127
1128    /// Extracts column information and literal value from a comparison.
1129    ///
1130    /// Returns (label, column_name, numeric_value) if the expression is
1131    /// a comparison between a property access and a numeric literal.
1132    fn extract_column_and_value(
1133        &self,
1134        left: &LogicalExpression,
1135        right: &LogicalExpression,
1136    ) -> Option<(String, String, f64)> {
1137        // Try left as property, right as literal
1138        if let Some(result) = self.try_extract_property_literal(left, right) {
1139            return Some(result);
1140        }
1141
1142        // Try right as property, left as literal
1143        self.try_extract_property_literal(right, left)
1144    }
1145
1146    /// Tries to extract property and literal from a specific ordering.
1147    fn try_extract_property_literal(
1148        &self,
1149        property_expr: &LogicalExpression,
1150        literal_expr: &LogicalExpression,
1151    ) -> Option<(String, String, f64)> {
1152        // Extract property access
1153        let (variable, property) = match property_expr {
1154            LogicalExpression::Property { variable, property } => {
1155                (variable.clone(), property.clone())
1156            }
1157            _ => return None,
1158        };
1159
1160        // Extract numeric literal
1161        let value = match literal_expr {
1162            LogicalExpression::Literal(grafeo_common::types::Value::Int64(n)) => *n as f64,
1163            LogicalExpression::Literal(grafeo_common::types::Value::Float64(f)) => *f,
1164            _ => return None,
1165        };
1166
1167        // Try to find a label for this variable from table stats
1168        // Use the variable name as a heuristic label lookup
1169        // In practice, the optimizer would track which labels variables are bound to
1170        for label in self.table_stats.keys() {
1171            if let Some(stats) = self.table_stats.get(label)
1172                && stats.columns.contains_key(&property)
1173            {
1174                return Some((label.clone(), property, value));
1175            }
1176        }
1177
1178        // If no stats found but we have the property, return with variable as label
1179        Some((variable, property, value))
1180    }
1181
1182    /// Estimates unary expression selectivity.
1183    fn estimate_unary_selectivity(&self, op: UnaryOp, _operand: &LogicalExpression) -> f64 {
1184        match op {
1185            UnaryOp::Not => 1.0 - self.default_selectivity,
1186            UnaryOp::IsNull => self.selectivity_config.is_null,
1187            UnaryOp::IsNotNull => self.selectivity_config.is_not_null,
1188            UnaryOp::Neg => 1.0, // Negation doesn't change cardinality
1189        }
1190    }
1191
1192    /// Gets statistics for a column.
1193    fn get_column_stats(&self, label: &str, column: &str) -> Option<&ColumnStats> {
1194        self.table_stats.get(label)?.columns.get(column)
1195    }
1196}
1197
1198impl Default for CardinalityEstimator {
1199    fn default() -> Self {
1200        Self::new()
1201    }
1202}
1203
1204#[cfg(test)]
1205mod tests {
1206    use super::*;
1207    use crate::query::plan::{
1208        DistinctOp, ExpandDirection, ExpandOp, FilterOp, JoinCondition, NodeScanOp, PathMode,
1209        ProjectOp, Projection, ReturnItem, ReturnOp, SkipOp, SortKey, SortOp, SortOrder,
1210    };
1211    use grafeo_common::types::Value;
1212
1213    #[test]
1214    fn test_node_scan_with_stats() {
1215        let mut estimator = CardinalityEstimator::new();
1216        estimator.add_table_stats("Person", TableStats::new(5000));
1217
1218        let scan = LogicalOperator::NodeScan(NodeScanOp {
1219            variable: "n".to_string(),
1220            label: Some("Person".to_string()),
1221            input: None,
1222        });
1223
1224        let cardinality = estimator.estimate(&scan);
1225        assert!((cardinality - 5000.0).abs() < 0.001);
1226    }
1227
1228    #[test]
1229    fn test_filter_reduces_cardinality() {
1230        let mut estimator = CardinalityEstimator::new();
1231        estimator.add_table_stats("Person", TableStats::new(1000));
1232
1233        let filter = LogicalOperator::Filter(FilterOp {
1234            predicate: LogicalExpression::Binary {
1235                left: Box::new(LogicalExpression::Property {
1236                    variable: "n".to_string(),
1237                    property: "age".to_string(),
1238                }),
1239                op: BinaryOp::Eq,
1240                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1241            },
1242            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1243                variable: "n".to_string(),
1244                label: Some("Person".to_string()),
1245                input: None,
1246            })),
1247            pushdown_hint: None,
1248        });
1249
1250        let cardinality = estimator.estimate(&filter);
1251        // Equality selectivity is 0.01, so 1000 * 0.01 = 10
1252        assert!(cardinality < 1000.0);
1253        assert!(cardinality >= 1.0);
1254    }
1255
1256    #[test]
1257    fn test_join_cardinality() {
1258        let mut estimator = CardinalityEstimator::new();
1259        estimator.add_table_stats("Person", TableStats::new(1000));
1260        estimator.add_table_stats("Company", TableStats::new(100));
1261
1262        let join = LogicalOperator::Join(JoinOp {
1263            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1264                variable: "p".to_string(),
1265                label: Some("Person".to_string()),
1266                input: None,
1267            })),
1268            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1269                variable: "c".to_string(),
1270                label: Some("Company".to_string()),
1271                input: None,
1272            })),
1273            join_type: JoinType::Inner,
1274            conditions: vec![JoinCondition {
1275                left: LogicalExpression::Property {
1276                    variable: "p".to_string(),
1277                    property: "company_id".to_string(),
1278                },
1279                right: LogicalExpression::Property {
1280                    variable: "c".to_string(),
1281                    property: "id".to_string(),
1282                },
1283            }],
1284        });
1285
1286        let cardinality = estimator.estimate(&join);
1287        // Should be less than cross product
1288        assert!(cardinality < 1000.0 * 100.0);
1289    }
1290
1291    #[test]
1292    fn test_limit_caps_cardinality() {
1293        let mut estimator = CardinalityEstimator::new();
1294        estimator.add_table_stats("Person", TableStats::new(1000));
1295
1296        let limit = LogicalOperator::Limit(LimitOp {
1297            count: 10.into(),
1298            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1299                variable: "n".to_string(),
1300                label: Some("Person".to_string()),
1301                input: None,
1302            })),
1303        });
1304
1305        let cardinality = estimator.estimate(&limit);
1306        assert!((cardinality - 10.0).abs() < 0.001);
1307    }
1308
1309    #[test]
1310    fn test_aggregate_reduces_cardinality() {
1311        let mut estimator = CardinalityEstimator::new();
1312        estimator.add_table_stats("Person", TableStats::new(1000));
1313
1314        // Global aggregation
1315        let global_agg = LogicalOperator::Aggregate(AggregateOp {
1316            group_by: vec![],
1317            aggregates: vec![],
1318            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1319                variable: "n".to_string(),
1320                label: Some("Person".to_string()),
1321                input: None,
1322            })),
1323            having: None,
1324        });
1325
1326        let cardinality = estimator.estimate(&global_agg);
1327        assert!((cardinality - 1.0).abs() < 0.001);
1328
1329        // Group by aggregation
1330        let group_agg = LogicalOperator::Aggregate(AggregateOp {
1331            group_by: vec![LogicalExpression::Property {
1332                variable: "n".to_string(),
1333                property: "city".to_string(),
1334            }],
1335            aggregates: vec![],
1336            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1337                variable: "n".to_string(),
1338                label: Some("Person".to_string()),
1339                input: None,
1340            })),
1341            having: None,
1342        });
1343
1344        let cardinality = estimator.estimate(&group_agg);
1345        // Should be less than input
1346        assert!(cardinality < 1000.0);
1347    }
1348
1349    #[test]
1350    fn test_node_scan_without_stats() {
1351        let estimator = CardinalityEstimator::new();
1352
1353        let scan = LogicalOperator::NodeScan(NodeScanOp {
1354            variable: "n".to_string(),
1355            label: Some("Unknown".to_string()),
1356            input: None,
1357        });
1358
1359        let cardinality = estimator.estimate(&scan);
1360        // Should return default (1000)
1361        assert!((cardinality - 1000.0).abs() < 0.001);
1362    }
1363
1364    #[test]
1365    fn test_node_scan_no_label() {
1366        let estimator = CardinalityEstimator::new();
1367
1368        let scan = LogicalOperator::NodeScan(NodeScanOp {
1369            variable: "n".to_string(),
1370            label: None,
1371            input: None,
1372        });
1373
1374        let cardinality = estimator.estimate(&scan);
1375        // Should scan all nodes (default)
1376        assert!((cardinality - 1000.0).abs() < 0.001);
1377    }
1378
1379    #[test]
1380    fn test_filter_inequality_selectivity() {
1381        let mut estimator = CardinalityEstimator::new();
1382        estimator.add_table_stats("Person", TableStats::new(1000));
1383
1384        let filter = LogicalOperator::Filter(FilterOp {
1385            predicate: LogicalExpression::Binary {
1386                left: Box::new(LogicalExpression::Property {
1387                    variable: "n".to_string(),
1388                    property: "age".to_string(),
1389                }),
1390                op: BinaryOp::Ne,
1391                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1392            },
1393            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1394                variable: "n".to_string(),
1395                label: Some("Person".to_string()),
1396                input: None,
1397            })),
1398            pushdown_hint: None,
1399        });
1400
1401        let cardinality = estimator.estimate(&filter);
1402        // Inequality selectivity is 0.99, so 1000 * 0.99 = 990
1403        assert!(cardinality > 900.0);
1404    }
1405
1406    #[test]
1407    fn test_filter_range_selectivity() {
1408        let mut estimator = CardinalityEstimator::new();
1409        estimator.add_table_stats("Person", TableStats::new(1000));
1410
1411        let filter = LogicalOperator::Filter(FilterOp {
1412            predicate: LogicalExpression::Binary {
1413                left: Box::new(LogicalExpression::Property {
1414                    variable: "n".to_string(),
1415                    property: "age".to_string(),
1416                }),
1417                op: BinaryOp::Gt,
1418                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1419            },
1420            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1421                variable: "n".to_string(),
1422                label: Some("Person".to_string()),
1423                input: None,
1424            })),
1425            pushdown_hint: None,
1426        });
1427
1428        let cardinality = estimator.estimate(&filter);
1429        // Range selectivity is 0.33, so 1000 * 0.33 = 330
1430        assert!(cardinality < 500.0);
1431        assert!(cardinality > 100.0);
1432    }
1433
1434    #[test]
1435    fn test_filter_and_selectivity() {
1436        let mut estimator = CardinalityEstimator::new();
1437        estimator.add_table_stats("Person", TableStats::new(1000));
1438
1439        // Test AND with two equality predicates
1440        // Each equality has selectivity 0.01, so AND gives 0.01 * 0.01 = 0.0001
1441        let filter = LogicalOperator::Filter(FilterOp {
1442            predicate: LogicalExpression::Binary {
1443                left: Box::new(LogicalExpression::Binary {
1444                    left: Box::new(LogicalExpression::Property {
1445                        variable: "n".to_string(),
1446                        property: "city".to_string(),
1447                    }),
1448                    op: BinaryOp::Eq,
1449                    right: Box::new(LogicalExpression::Literal(Value::String("NYC".into()))),
1450                }),
1451                op: BinaryOp::And,
1452                right: Box::new(LogicalExpression::Binary {
1453                    left: Box::new(LogicalExpression::Property {
1454                        variable: "n".to_string(),
1455                        property: "age".to_string(),
1456                    }),
1457                    op: BinaryOp::Eq,
1458                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1459                }),
1460            },
1461            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1462                variable: "n".to_string(),
1463                label: Some("Person".to_string()),
1464                input: None,
1465            })),
1466            pushdown_hint: None,
1467        });
1468
1469        let cardinality = estimator.estimate(&filter);
1470        // AND reduces selectivity (multiply): 0.01 * 0.01 = 0.0001
1471        // 1000 * 0.0001 = 0.1, min is 1.0
1472        assert!(cardinality < 100.0);
1473        assert!(cardinality >= 1.0);
1474    }
1475
1476    #[test]
1477    fn test_filter_or_selectivity() {
1478        let mut estimator = CardinalityEstimator::new();
1479        estimator.add_table_stats("Person", TableStats::new(1000));
1480
1481        // Test OR with two equality predicates
1482        // Each equality has selectivity 0.01
1483        // OR gives: 0.01 + 0.01 - (0.01 * 0.01) = 0.0199
1484        let filter = LogicalOperator::Filter(FilterOp {
1485            predicate: LogicalExpression::Binary {
1486                left: Box::new(LogicalExpression::Binary {
1487                    left: Box::new(LogicalExpression::Property {
1488                        variable: "n".to_string(),
1489                        property: "city".to_string(),
1490                    }),
1491                    op: BinaryOp::Eq,
1492                    right: Box::new(LogicalExpression::Literal(Value::String("NYC".into()))),
1493                }),
1494                op: BinaryOp::Or,
1495                right: Box::new(LogicalExpression::Binary {
1496                    left: Box::new(LogicalExpression::Property {
1497                        variable: "n".to_string(),
1498                        property: "city".to_string(),
1499                    }),
1500                    op: BinaryOp::Eq,
1501                    right: Box::new(LogicalExpression::Literal(Value::String("LA".into()))),
1502                }),
1503            },
1504            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1505                variable: "n".to_string(),
1506                label: Some("Person".to_string()),
1507                input: None,
1508            })),
1509            pushdown_hint: None,
1510        });
1511
1512        let cardinality = estimator.estimate(&filter);
1513        // OR: 0.01 + 0.01 - 0.0001 ≈ 0.0199, so 1000 * 0.0199 ≈ 19.9
1514        assert!(cardinality < 100.0);
1515        assert!(cardinality >= 1.0);
1516    }
1517
1518    #[test]
1519    fn test_filter_literal_true() {
1520        let mut estimator = CardinalityEstimator::new();
1521        estimator.add_table_stats("Person", TableStats::new(1000));
1522
1523        let filter = LogicalOperator::Filter(FilterOp {
1524            predicate: LogicalExpression::Literal(Value::Bool(true)),
1525            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1526                variable: "n".to_string(),
1527                label: Some("Person".to_string()),
1528                input: None,
1529            })),
1530            pushdown_hint: None,
1531        });
1532
1533        let cardinality = estimator.estimate(&filter);
1534        // Literal true has selectivity 1.0
1535        assert!((cardinality - 1000.0).abs() < 0.001);
1536    }
1537
1538    #[test]
1539    fn test_filter_literal_false() {
1540        let mut estimator = CardinalityEstimator::new();
1541        estimator.add_table_stats("Person", TableStats::new(1000));
1542
1543        let filter = LogicalOperator::Filter(FilterOp {
1544            predicate: LogicalExpression::Literal(Value::Bool(false)),
1545            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1546                variable: "n".to_string(),
1547                label: Some("Person".to_string()),
1548                input: None,
1549            })),
1550            pushdown_hint: None,
1551        });
1552
1553        let cardinality = estimator.estimate(&filter);
1554        // Literal false has selectivity 0.0, but min is 1.0
1555        assert!((cardinality - 1.0).abs() < 0.001);
1556    }
1557
1558    #[test]
1559    fn test_unary_not_selectivity() {
1560        let mut estimator = CardinalityEstimator::new();
1561        estimator.add_table_stats("Person", TableStats::new(1000));
1562
1563        let filter = LogicalOperator::Filter(FilterOp {
1564            predicate: LogicalExpression::Unary {
1565                op: UnaryOp::Not,
1566                operand: Box::new(LogicalExpression::Literal(Value::Bool(true))),
1567            },
1568            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1569                variable: "n".to_string(),
1570                label: Some("Person".to_string()),
1571                input: None,
1572            })),
1573            pushdown_hint: None,
1574        });
1575
1576        let cardinality = estimator.estimate(&filter);
1577        // NOT inverts selectivity
1578        assert!(cardinality < 1000.0);
1579    }
1580
1581    #[test]
1582    fn test_unary_is_null_selectivity() {
1583        let mut estimator = CardinalityEstimator::new();
1584        estimator.add_table_stats("Person", TableStats::new(1000));
1585
1586        let filter = LogicalOperator::Filter(FilterOp {
1587            predicate: LogicalExpression::Unary {
1588                op: UnaryOp::IsNull,
1589                operand: Box::new(LogicalExpression::Variable("x".to_string())),
1590            },
1591            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1592                variable: "n".to_string(),
1593                label: Some("Person".to_string()),
1594                input: None,
1595            })),
1596            pushdown_hint: None,
1597        });
1598
1599        let cardinality = estimator.estimate(&filter);
1600        // IS NULL has selectivity 0.05
1601        assert!(cardinality < 100.0);
1602    }
1603
1604    #[test]
1605    fn test_expand_cardinality() {
1606        let mut estimator = CardinalityEstimator::new();
1607        estimator.add_table_stats("Person", TableStats::new(100));
1608
1609        let expand = LogicalOperator::Expand(ExpandOp {
1610            from_variable: "a".to_string(),
1611            to_variable: "b".to_string(),
1612            edge_variable: None,
1613            direction: ExpandDirection::Outgoing,
1614            edge_types: vec![],
1615            min_hops: 1,
1616            max_hops: Some(1),
1617            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1618                variable: "a".to_string(),
1619                label: Some("Person".to_string()),
1620                input: None,
1621            })),
1622            path_alias: None,
1623            path_mode: PathMode::Walk,
1624        });
1625
1626        let cardinality = estimator.estimate(&expand);
1627        // Expand multiplies by fanout (10)
1628        assert!(cardinality > 100.0);
1629    }
1630
1631    #[test]
1632    fn test_expand_with_edge_type_filter() {
1633        let mut estimator = CardinalityEstimator::new();
1634        estimator.add_table_stats("Person", TableStats::new(100));
1635
1636        let expand = LogicalOperator::Expand(ExpandOp {
1637            from_variable: "a".to_string(),
1638            to_variable: "b".to_string(),
1639            edge_variable: None,
1640            direction: ExpandDirection::Outgoing,
1641            edge_types: vec!["KNOWS".to_string()],
1642            min_hops: 1,
1643            max_hops: Some(1),
1644            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1645                variable: "a".to_string(),
1646                label: Some("Person".to_string()),
1647                input: None,
1648            })),
1649            path_alias: None,
1650            path_mode: PathMode::Walk,
1651        });
1652
1653        let cardinality = estimator.estimate(&expand);
1654        // With edge type, fanout is reduced by half
1655        assert!(cardinality > 100.0);
1656    }
1657
1658    #[test]
1659    fn test_expand_variable_length() {
1660        let mut estimator = CardinalityEstimator::new();
1661        estimator.add_table_stats("Person", TableStats::new(100));
1662
1663        let expand = LogicalOperator::Expand(ExpandOp {
1664            from_variable: "a".to_string(),
1665            to_variable: "b".to_string(),
1666            edge_variable: None,
1667            direction: ExpandDirection::Outgoing,
1668            edge_types: vec![],
1669            min_hops: 1,
1670            max_hops: Some(3),
1671            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1672                variable: "a".to_string(),
1673                label: Some("Person".to_string()),
1674                input: None,
1675            })),
1676            path_alias: None,
1677            path_mode: PathMode::Walk,
1678        });
1679
1680        let cardinality = estimator.estimate(&expand);
1681        // Variable length path has much higher cardinality
1682        assert!(cardinality > 500.0);
1683    }
1684
1685    #[test]
1686    fn test_join_cross_product() {
1687        let mut estimator = CardinalityEstimator::new();
1688        estimator.add_table_stats("Person", TableStats::new(100));
1689        estimator.add_table_stats("Company", TableStats::new(50));
1690
1691        let join = LogicalOperator::Join(JoinOp {
1692            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1693                variable: "p".to_string(),
1694                label: Some("Person".to_string()),
1695                input: None,
1696            })),
1697            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1698                variable: "c".to_string(),
1699                label: Some("Company".to_string()),
1700                input: None,
1701            })),
1702            join_type: JoinType::Cross,
1703            conditions: vec![],
1704        });
1705
1706        let cardinality = estimator.estimate(&join);
1707        // Cross join = 100 * 50 = 5000
1708        assert!((cardinality - 5000.0).abs() < 0.001);
1709    }
1710
1711    #[test]
1712    fn test_join_left_outer() {
1713        let mut estimator = CardinalityEstimator::new();
1714        estimator.add_table_stats("Person", TableStats::new(1000));
1715        estimator.add_table_stats("Company", TableStats::new(10));
1716
1717        let join = LogicalOperator::Join(JoinOp {
1718            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1719                variable: "p".to_string(),
1720                label: Some("Person".to_string()),
1721                input: None,
1722            })),
1723            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1724                variable: "c".to_string(),
1725                label: Some("Company".to_string()),
1726                input: None,
1727            })),
1728            join_type: JoinType::Left,
1729            conditions: vec![JoinCondition {
1730                left: LogicalExpression::Variable("p".to_string()),
1731                right: LogicalExpression::Variable("c".to_string()),
1732            }],
1733        });
1734
1735        let cardinality = estimator.estimate(&join);
1736        // Left join returns at least all left rows
1737        assert!(cardinality >= 1000.0);
1738    }
1739
1740    #[test]
1741    fn test_join_semi() {
1742        let mut estimator = CardinalityEstimator::new();
1743        estimator.add_table_stats("Person", TableStats::new(1000));
1744        estimator.add_table_stats("Company", TableStats::new(100));
1745
1746        let join = LogicalOperator::Join(JoinOp {
1747            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1748                variable: "p".to_string(),
1749                label: Some("Person".to_string()),
1750                input: None,
1751            })),
1752            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1753                variable: "c".to_string(),
1754                label: Some("Company".to_string()),
1755                input: None,
1756            })),
1757            join_type: JoinType::Semi,
1758            conditions: vec![],
1759        });
1760
1761        let cardinality = estimator.estimate(&join);
1762        // Semi join returns at most left cardinality
1763        assert!(cardinality <= 1000.0);
1764    }
1765
1766    #[test]
1767    fn test_join_anti() {
1768        let mut estimator = CardinalityEstimator::new();
1769        estimator.add_table_stats("Person", TableStats::new(1000));
1770        estimator.add_table_stats("Company", TableStats::new(100));
1771
1772        let join = LogicalOperator::Join(JoinOp {
1773            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1774                variable: "p".to_string(),
1775                label: Some("Person".to_string()),
1776                input: None,
1777            })),
1778            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1779                variable: "c".to_string(),
1780                label: Some("Company".to_string()),
1781                input: None,
1782            })),
1783            join_type: JoinType::Anti,
1784            conditions: vec![],
1785        });
1786
1787        let cardinality = estimator.estimate(&join);
1788        // Anti join returns at most left cardinality
1789        assert!(cardinality <= 1000.0);
1790        assert!(cardinality >= 1.0);
1791    }
1792
1793    #[test]
1794    fn test_project_preserves_cardinality() {
1795        let mut estimator = CardinalityEstimator::new();
1796        estimator.add_table_stats("Person", TableStats::new(1000));
1797
1798        let project = LogicalOperator::Project(ProjectOp {
1799            projections: vec![Projection {
1800                expression: LogicalExpression::Variable("n".to_string()),
1801                alias: None,
1802            }],
1803            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1804                variable: "n".to_string(),
1805                label: Some("Person".to_string()),
1806                input: None,
1807            })),
1808            pass_through_input: false,
1809        });
1810
1811        let cardinality = estimator.estimate(&project);
1812        assert!((cardinality - 1000.0).abs() < 0.001);
1813    }
1814
1815    #[test]
1816    fn test_sort_preserves_cardinality() {
1817        let mut estimator = CardinalityEstimator::new();
1818        estimator.add_table_stats("Person", TableStats::new(1000));
1819
1820        let sort = LogicalOperator::Sort(SortOp {
1821            keys: vec![SortKey {
1822                expression: LogicalExpression::Variable("n".to_string()),
1823                order: SortOrder::Ascending,
1824                nulls: None,
1825            }],
1826            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1827                variable: "n".to_string(),
1828                label: Some("Person".to_string()),
1829                input: None,
1830            })),
1831        });
1832
1833        let cardinality = estimator.estimate(&sort);
1834        assert!((cardinality - 1000.0).abs() < 0.001);
1835    }
1836
1837    #[test]
1838    fn test_distinct_reduces_cardinality() {
1839        let mut estimator = CardinalityEstimator::new();
1840        estimator.add_table_stats("Person", TableStats::new(1000));
1841
1842        let distinct = LogicalOperator::Distinct(DistinctOp {
1843            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1844                variable: "n".to_string(),
1845                label: Some("Person".to_string()),
1846                input: None,
1847            })),
1848            columns: None,
1849        });
1850
1851        let cardinality = estimator.estimate(&distinct);
1852        // Distinct assumes 50% unique
1853        assert!((cardinality - 500.0).abs() < 0.001);
1854    }
1855
1856    #[test]
1857    fn test_skip_reduces_cardinality() {
1858        let mut estimator = CardinalityEstimator::new();
1859        estimator.add_table_stats("Person", TableStats::new(1000));
1860
1861        let skip = LogicalOperator::Skip(SkipOp {
1862            count: 100.into(),
1863            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1864                variable: "n".to_string(),
1865                label: Some("Person".to_string()),
1866                input: None,
1867            })),
1868        });
1869
1870        let cardinality = estimator.estimate(&skip);
1871        assert!((cardinality - 900.0).abs() < 0.001);
1872    }
1873
1874    #[test]
1875    fn test_return_preserves_cardinality() {
1876        let mut estimator = CardinalityEstimator::new();
1877        estimator.add_table_stats("Person", TableStats::new(1000));
1878
1879        let ret = LogicalOperator::Return(ReturnOp {
1880            items: vec![ReturnItem {
1881                expression: LogicalExpression::Variable("n".to_string()),
1882                alias: None,
1883            }],
1884            distinct: false,
1885            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1886                variable: "n".to_string(),
1887                label: Some("Person".to_string()),
1888                input: None,
1889            })),
1890        });
1891
1892        let cardinality = estimator.estimate(&ret);
1893        assert!((cardinality - 1000.0).abs() < 0.001);
1894    }
1895
1896    #[test]
1897    fn test_empty_cardinality() {
1898        let estimator = CardinalityEstimator::new();
1899        let cardinality = estimator.estimate(&LogicalOperator::Empty);
1900        assert!((cardinality).abs() < 0.001);
1901    }
1902
1903    #[test]
1904    fn test_table_stats_with_column() {
1905        let stats = TableStats::new(1000).with_column(
1906            "age",
1907            ColumnStats::new(50).with_nulls(10).with_range(0.0, 100.0),
1908        );
1909
1910        assert_eq!(stats.row_count, 1000);
1911        let col = stats.columns.get("age").unwrap();
1912        assert_eq!(col.distinct_count, 50);
1913        assert_eq!(col.null_count, 10);
1914        assert!((col.min_value.unwrap() - 0.0).abs() < 0.001);
1915        assert!((col.max_value.unwrap() - 100.0).abs() < 0.001);
1916    }
1917
1918    #[test]
1919    fn test_estimator_default() {
1920        let estimator = CardinalityEstimator::default();
1921        let scan = LogicalOperator::NodeScan(NodeScanOp {
1922            variable: "n".to_string(),
1923            label: None,
1924            input: None,
1925        });
1926        let cardinality = estimator.estimate(&scan);
1927        assert!((cardinality - 1000.0).abs() < 0.001);
1928    }
1929
1930    #[test]
1931    fn test_set_avg_fanout() {
1932        let mut estimator = CardinalityEstimator::new();
1933        estimator.add_table_stats("Person", TableStats::new(100));
1934        estimator.set_avg_fanout(5.0);
1935
1936        let expand = LogicalOperator::Expand(ExpandOp {
1937            from_variable: "a".to_string(),
1938            to_variable: "b".to_string(),
1939            edge_variable: None,
1940            direction: ExpandDirection::Outgoing,
1941            edge_types: vec![],
1942            min_hops: 1,
1943            max_hops: Some(1),
1944            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1945                variable: "a".to_string(),
1946                label: Some("Person".to_string()),
1947                input: None,
1948            })),
1949            path_alias: None,
1950            path_mode: PathMode::Walk,
1951        });
1952
1953        let cardinality = estimator.estimate(&expand);
1954        // With fanout 5: 100 * 5 = 500
1955        assert!((cardinality - 500.0).abs() < 0.001);
1956    }
1957
1958    #[test]
1959    fn test_multiple_group_by_keys_reduce_cardinality() {
1960        // The current implementation uses a simplified model where more group by keys
1961        // results in greater reduction (dividing by 10^num_keys). This is a simplification
1962        // that works for most cases where group by keys are correlated.
1963        let mut estimator = CardinalityEstimator::new();
1964        estimator.add_table_stats("Person", TableStats::new(10000));
1965
1966        let single_group = LogicalOperator::Aggregate(AggregateOp {
1967            group_by: vec![LogicalExpression::Property {
1968                variable: "n".to_string(),
1969                property: "city".to_string(),
1970            }],
1971            aggregates: vec![],
1972            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1973                variable: "n".to_string(),
1974                label: Some("Person".to_string()),
1975                input: None,
1976            })),
1977            having: None,
1978        });
1979
1980        let multi_group = LogicalOperator::Aggregate(AggregateOp {
1981            group_by: vec![
1982                LogicalExpression::Property {
1983                    variable: "n".to_string(),
1984                    property: "city".to_string(),
1985                },
1986                LogicalExpression::Property {
1987                    variable: "n".to_string(),
1988                    property: "country".to_string(),
1989                },
1990            ],
1991            aggregates: vec![],
1992            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1993                variable: "n".to_string(),
1994                label: Some("Person".to_string()),
1995                input: None,
1996            })),
1997            having: None,
1998        });
1999
2000        let single_card = estimator.estimate(&single_group);
2001        let multi_card = estimator.estimate(&multi_group);
2002
2003        // Both should reduce cardinality from input
2004        assert!(single_card < 10000.0);
2005        assert!(multi_card < 10000.0);
2006        // Both should be at least 1
2007        assert!(single_card >= 1.0);
2008        assert!(multi_card >= 1.0);
2009    }
2010
2011    // ============= Histogram Tests =============
2012
2013    #[test]
2014    fn test_histogram_build_uniform() {
2015        // Build histogram from uniformly distributed data
2016        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2017        let histogram = EquiDepthHistogram::build(&values, 10);
2018
2019        assert_eq!(histogram.num_buckets(), 10);
2020        assert_eq!(histogram.total_rows(), 100);
2021
2022        // Each bucket should have approximately 10 rows
2023        for bucket in histogram.buckets() {
2024            assert!(bucket.frequency >= 9 && bucket.frequency <= 11);
2025        }
2026    }
2027
2028    #[test]
2029    fn test_histogram_build_skewed() {
2030        // Build histogram from skewed data (many small values, few large)
2031        let mut values: Vec<f64> = (0..80).map(|i| i as f64).collect();
2032        values.extend((0..20).map(|i| 1000.0 + i as f64));
2033        let histogram = EquiDepthHistogram::build(&values, 5);
2034
2035        assert_eq!(histogram.num_buckets(), 5);
2036        assert_eq!(histogram.total_rows(), 100);
2037
2038        // Each bucket should have ~20 rows despite skewed data
2039        for bucket in histogram.buckets() {
2040            assert!(bucket.frequency >= 18 && bucket.frequency <= 22);
2041        }
2042    }
2043
2044    #[test]
2045    fn test_histogram_range_selectivity_full() {
2046        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2047        let histogram = EquiDepthHistogram::build(&values, 10);
2048
2049        // Full range should have selectivity ~1.0
2050        let selectivity = histogram.range_selectivity(None, None);
2051        assert!((selectivity - 1.0).abs() < 0.01);
2052    }
2053
2054    #[test]
2055    fn test_histogram_range_selectivity_half() {
2056        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2057        let histogram = EquiDepthHistogram::build(&values, 10);
2058
2059        // Values >= 50 should be ~50% (half the data)
2060        let selectivity = histogram.range_selectivity(Some(50.0), None);
2061        assert!(selectivity > 0.4 && selectivity < 0.6);
2062    }
2063
2064    #[test]
2065    fn test_histogram_range_selectivity_quarter() {
2066        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2067        let histogram = EquiDepthHistogram::build(&values, 10);
2068
2069        // Values < 25 should be ~25%
2070        let selectivity = histogram.range_selectivity(None, Some(25.0));
2071        assert!(selectivity > 0.2 && selectivity < 0.3);
2072    }
2073
2074    #[test]
2075    fn test_histogram_equality_selectivity() {
2076        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2077        let histogram = EquiDepthHistogram::build(&values, 10);
2078
2079        // Equality on 100 distinct values should be ~1%
2080        let selectivity = histogram.equality_selectivity(50.0);
2081        assert!(selectivity > 0.005 && selectivity < 0.02);
2082    }
2083
2084    #[test]
2085    fn test_histogram_empty() {
2086        let histogram = EquiDepthHistogram::build(&[], 10);
2087
2088        assert_eq!(histogram.num_buckets(), 0);
2089        assert_eq!(histogram.total_rows(), 0);
2090
2091        // Default selectivity for empty histogram
2092        let selectivity = histogram.range_selectivity(Some(0.0), Some(100.0));
2093        assert!((selectivity - 0.33).abs() < 0.01);
2094    }
2095
2096    #[test]
2097    fn test_histogram_bucket_overlap() {
2098        let bucket = HistogramBucket::new(10.0, 20.0, 100, 10);
2099
2100        // Full overlap
2101        assert!((bucket.overlap_fraction(Some(10.0), Some(20.0)) - 1.0).abs() < 0.01);
2102
2103        // Half overlap (lower half)
2104        assert!((bucket.overlap_fraction(Some(10.0), Some(15.0)) - 0.5).abs() < 0.01);
2105
2106        // Half overlap (upper half)
2107        assert!((bucket.overlap_fraction(Some(15.0), Some(20.0)) - 0.5).abs() < 0.01);
2108
2109        // No overlap (below)
2110        assert!((bucket.overlap_fraction(Some(0.0), Some(5.0))).abs() < 0.01);
2111
2112        // No overlap (above)
2113        assert!((bucket.overlap_fraction(Some(25.0), Some(30.0))).abs() < 0.01);
2114    }
2115
2116    #[test]
2117    fn test_column_stats_from_values() {
2118        let values = vec![10.0, 20.0, 30.0, 40.0, 50.0, 20.0, 30.0, 40.0];
2119        let stats = ColumnStats::from_values(values, 4);
2120
2121        assert_eq!(stats.distinct_count, 5); // 10, 20, 30, 40, 50
2122        assert!(stats.min_value.is_some());
2123        assert!((stats.min_value.unwrap() - 10.0).abs() < 0.01);
2124        assert!(stats.max_value.is_some());
2125        assert!((stats.max_value.unwrap() - 50.0).abs() < 0.01);
2126        assert!(stats.histogram.is_some());
2127    }
2128
2129    #[test]
2130    fn test_column_stats_with_histogram_builder() {
2131        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2132        let histogram = EquiDepthHistogram::build(&values, 10);
2133
2134        let stats = ColumnStats::new(100)
2135            .with_range(0.0, 99.0)
2136            .with_histogram(histogram);
2137
2138        assert!(stats.histogram.is_some());
2139        assert_eq!(stats.histogram.as_ref().unwrap().num_buckets(), 10);
2140    }
2141
2142    #[test]
2143    fn test_filter_with_histogram_stats() {
2144        let mut estimator = CardinalityEstimator::new();
2145
2146        // Create stats with histogram for age column
2147        let age_values: Vec<f64> = (18..80).map(|i| i as f64).collect();
2148        let histogram = EquiDepthHistogram::build(&age_values, 10);
2149        let age_stats = ColumnStats::new(62)
2150            .with_range(18.0, 79.0)
2151            .with_histogram(histogram);
2152
2153        estimator.add_table_stats(
2154            "Person",
2155            TableStats::new(1000).with_column("age", age_stats),
2156        );
2157
2158        // Filter: age > 50
2159        // Age range is 18-79, so >50 is about (79-50)/(79-18) = 29/61 ≈ 47.5%
2160        let filter = LogicalOperator::Filter(FilterOp {
2161            predicate: LogicalExpression::Binary {
2162                left: Box::new(LogicalExpression::Property {
2163                    variable: "n".to_string(),
2164                    property: "age".to_string(),
2165                }),
2166                op: BinaryOp::Gt,
2167                right: Box::new(LogicalExpression::Literal(Value::Int64(50))),
2168            },
2169            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2170                variable: "n".to_string(),
2171                label: Some("Person".to_string()),
2172                input: None,
2173            })),
2174            pushdown_hint: None,
2175        });
2176
2177        let cardinality = estimator.estimate(&filter);
2178
2179        // With histogram, should get more accurate estimate than default 0.33
2180        // Expected: ~47.5% of 1000 = ~475
2181        assert!(cardinality > 300.0 && cardinality < 600.0);
2182    }
2183
2184    #[test]
2185    fn test_filter_equality_with_histogram() {
2186        let mut estimator = CardinalityEstimator::new();
2187
2188        // Create stats with histogram
2189        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2190        let histogram = EquiDepthHistogram::build(&values, 10);
2191        let stats = ColumnStats::new(100)
2192            .with_range(0.0, 99.0)
2193            .with_histogram(histogram);
2194
2195        estimator.add_table_stats("Data", TableStats::new(1000).with_column("value", stats));
2196
2197        // Filter: value = 50
2198        let filter = LogicalOperator::Filter(FilterOp {
2199            predicate: LogicalExpression::Binary {
2200                left: Box::new(LogicalExpression::Property {
2201                    variable: "d".to_string(),
2202                    property: "value".to_string(),
2203                }),
2204                op: BinaryOp::Eq,
2205                right: Box::new(LogicalExpression::Literal(Value::Int64(50))),
2206            },
2207            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2208                variable: "d".to_string(),
2209                label: Some("Data".to_string()),
2210                input: None,
2211            })),
2212            pushdown_hint: None,
2213        });
2214
2215        let cardinality = estimator.estimate(&filter);
2216
2217        // With 100 distinct values, selectivity should be ~1%
2218        // 1000 * 0.01 = 10
2219        assert!((1.0..50.0).contains(&cardinality));
2220    }
2221
2222    #[test]
2223    fn test_histogram_min_max() {
2224        let values: Vec<f64> = vec![5.0, 10.0, 15.0, 20.0, 25.0];
2225        let histogram = EquiDepthHistogram::build(&values, 2);
2226
2227        assert_eq!(histogram.min_value(), Some(5.0));
2228        // Max is the upper bound of the last bucket
2229        assert!(histogram.max_value().is_some());
2230    }
2231
2232    // ==================== SelectivityConfig Tests ====================
2233
2234    #[test]
2235    fn test_selectivity_config_defaults() {
2236        let config = SelectivityConfig::new();
2237        assert!((config.default - 0.1).abs() < f64::EPSILON);
2238        assert!((config.equality - 0.01).abs() < f64::EPSILON);
2239        assert!((config.inequality - 0.99).abs() < f64::EPSILON);
2240        assert!((config.range - 0.33).abs() < f64::EPSILON);
2241        assert!((config.string_ops - 0.1).abs() < f64::EPSILON);
2242        assert!((config.membership - 0.1).abs() < f64::EPSILON);
2243        assert!((config.is_null - 0.05).abs() < f64::EPSILON);
2244        assert!((config.is_not_null - 0.95).abs() < f64::EPSILON);
2245        assert!((config.distinct_fraction - 0.5).abs() < f64::EPSILON);
2246    }
2247
2248    #[test]
2249    fn test_custom_selectivity_config() {
2250        let config = SelectivityConfig {
2251            equality: 0.05,
2252            range: 0.25,
2253            ..SelectivityConfig::new()
2254        };
2255        let estimator = CardinalityEstimator::with_selectivity_config(config);
2256        assert!((estimator.selectivity_config().equality - 0.05).abs() < f64::EPSILON);
2257        assert!((estimator.selectivity_config().range - 0.25).abs() < f64::EPSILON);
2258    }
2259
2260    #[test]
2261    fn test_custom_selectivity_affects_estimation() {
2262        // Default: equality = 0.01 → 1000 * 0.01 = 10
2263        let mut default_est = CardinalityEstimator::new();
2264        default_est.add_table_stats("Person", TableStats::new(1000));
2265
2266        let filter = LogicalOperator::Filter(FilterOp {
2267            predicate: LogicalExpression::Binary {
2268                left: Box::new(LogicalExpression::Property {
2269                    variable: "n".to_string(),
2270                    property: "name".to_string(),
2271                }),
2272                op: BinaryOp::Eq,
2273                right: Box::new(LogicalExpression::Literal(Value::String("Alix".into()))),
2274            },
2275            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2276                variable: "n".to_string(),
2277                label: Some("Person".to_string()),
2278                input: None,
2279            })),
2280            pushdown_hint: None,
2281        });
2282
2283        let default_card = default_est.estimate(&filter);
2284
2285        // Custom: equality = 0.2 → 1000 * 0.2 = 200
2286        let config = SelectivityConfig {
2287            equality: 0.2,
2288            ..SelectivityConfig::new()
2289        };
2290        let mut custom_est = CardinalityEstimator::with_selectivity_config(config);
2291        custom_est.add_table_stats("Person", TableStats::new(1000));
2292
2293        let custom_card = custom_est.estimate(&filter);
2294
2295        assert!(custom_card > default_card);
2296        assert!((custom_card - 200.0).abs() < 1.0);
2297    }
2298
2299    #[test]
2300    fn test_custom_range_selectivity() {
2301        let config = SelectivityConfig {
2302            range: 0.5,
2303            ..SelectivityConfig::new()
2304        };
2305        let mut estimator = CardinalityEstimator::with_selectivity_config(config);
2306        estimator.add_table_stats("Person", TableStats::new(1000));
2307
2308        let filter = LogicalOperator::Filter(FilterOp {
2309            predicate: LogicalExpression::Binary {
2310                left: Box::new(LogicalExpression::Property {
2311                    variable: "n".to_string(),
2312                    property: "age".to_string(),
2313                }),
2314                op: BinaryOp::Gt,
2315                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2316            },
2317            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2318                variable: "n".to_string(),
2319                label: Some("Person".to_string()),
2320                input: None,
2321            })),
2322            pushdown_hint: None,
2323        });
2324
2325        let cardinality = estimator.estimate(&filter);
2326        // 1000 * 0.5 = 500
2327        assert!((cardinality - 500.0).abs() < 1.0);
2328    }
2329
2330    #[test]
2331    fn test_custom_distinct_fraction() {
2332        let config = SelectivityConfig {
2333            distinct_fraction: 0.8,
2334            ..SelectivityConfig::new()
2335        };
2336        let mut estimator = CardinalityEstimator::with_selectivity_config(config);
2337        estimator.add_table_stats("Person", TableStats::new(1000));
2338
2339        let distinct = LogicalOperator::Distinct(DistinctOp {
2340            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2341                variable: "n".to_string(),
2342                label: Some("Person".to_string()),
2343                input: None,
2344            })),
2345            columns: None,
2346        });
2347
2348        let cardinality = estimator.estimate(&distinct);
2349        // 1000 * 0.8 = 800
2350        assert!((cardinality - 800.0).abs() < 1.0);
2351    }
2352
2353    // ==================== EstimationLog Tests ====================
2354
2355    #[test]
2356    fn test_estimation_log_basic() {
2357        let mut log = EstimationLog::new(10.0);
2358        log.record("NodeScan(Person)", 1000.0, 1200.0);
2359        log.record("Filter(age > 30)", 100.0, 90.0);
2360
2361        assert_eq!(log.entries().len(), 2);
2362        assert!(!log.should_replan()); // 1.2x and 0.9x are within 10x threshold
2363    }
2364
2365    #[test]
2366    fn test_estimation_log_triggers_replan() {
2367        let mut log = EstimationLog::new(10.0);
2368        log.record("NodeScan(Person)", 100.0, 5000.0); // 50x underestimate
2369
2370        assert!(log.should_replan());
2371    }
2372
2373    #[test]
2374    fn test_estimation_log_overestimate_triggers_replan() {
2375        let mut log = EstimationLog::new(5.0);
2376        log.record("Filter", 1000.0, 100.0); // 10x overestimate → ratio = 0.1
2377
2378        assert!(log.should_replan()); // 0.1 < 1/5 = 0.2
2379    }
2380
2381    #[test]
2382    fn test_estimation_entry_error_ratio() {
2383        let entry = EstimationEntry {
2384            operator: "test".into(),
2385            estimated: 100.0,
2386            actual: 200.0,
2387        };
2388        assert!((entry.error_ratio() - 2.0).abs() < f64::EPSILON);
2389
2390        let perfect = EstimationEntry {
2391            operator: "test".into(),
2392            estimated: 100.0,
2393            actual: 100.0,
2394        };
2395        assert!((perfect.error_ratio() - 1.0).abs() < f64::EPSILON);
2396
2397        let zero_est = EstimationEntry {
2398            operator: "test".into(),
2399            estimated: 0.0,
2400            actual: 0.0,
2401        };
2402        assert!((zero_est.error_ratio() - 1.0).abs() < f64::EPSILON);
2403    }
2404
2405    #[test]
2406    fn test_estimation_log_max_error_ratio() {
2407        let mut log = EstimationLog::new(10.0);
2408        log.record("A", 100.0, 300.0); // 3x
2409        log.record("B", 100.0, 50.0); // 2x (normalized: 1/0.5 = 2)
2410        log.record("C", 100.0, 100.0); // 1x
2411
2412        assert!((log.max_error_ratio() - 3.0).abs() < f64::EPSILON);
2413    }
2414
2415    #[test]
2416    fn test_estimation_log_clear() {
2417        let mut log = EstimationLog::new(10.0);
2418        log.record("A", 100.0, 100.0);
2419        assert_eq!(log.entries().len(), 1);
2420
2421        log.clear();
2422        assert!(log.entries().is_empty());
2423        assert!(!log.should_replan());
2424    }
2425
2426    #[test]
2427    fn test_create_estimation_log() {
2428        let log = CardinalityEstimator::create_estimation_log();
2429        assert!(log.entries().is_empty());
2430        assert!(!log.should_replan());
2431    }
2432
2433    #[test]
2434    fn test_equality_selectivity_empty_histogram() {
2435        let hist = EquiDepthHistogram::new(vec![]);
2436        // Empty histogram returns fixed fallback
2437        assert_eq!(hist.equality_selectivity(5.0), 0.01);
2438    }
2439
2440    #[test]
2441    fn test_equality_selectivity_value_in_bucket() {
2442        let values: Vec<f64> = (1..=10).map(|i| i as f64).collect();
2443        let hist = EquiDepthHistogram::build(&values, 2);
2444        let sel = hist.equality_selectivity(3.0);
2445        assert!(sel > 0.0);
2446        assert!(sel <= 1.0);
2447    }
2448
2449    #[test]
2450    fn test_equality_selectivity_value_outside_all_buckets() {
2451        let values: Vec<f64> = (1..=10).map(|i| i as f64).collect();
2452        let hist = EquiDepthHistogram::build(&values, 2);
2453        // Value far outside range
2454        let sel = hist.equality_selectivity(9999.0);
2455        assert_eq!(sel, 0.001);
2456    }
2457
2458    #[test]
2459    fn test_histogram_min_max_empty() {
2460        let hist = EquiDepthHistogram::new(vec![]);
2461        assert_eq!(hist.min_value(), None);
2462        assert_eq!(hist.max_value(), None);
2463    }
2464
2465    #[test]
2466    fn test_histogram_min_max_single_bucket() {
2467        let hist = EquiDepthHistogram::new(vec![HistogramBucket::new(1.0, 10.0, 5, 5)]);
2468        assert_eq!(hist.min_value(), Some(1.0));
2469        assert_eq!(hist.max_value(), Some(10.0));
2470    }
2471
2472    #[test]
2473    fn test_histogram_min_max_multi_bucket() {
2474        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0];
2475        let hist = EquiDepthHistogram::build(&values, 3);
2476        let min = hist.min_value().unwrap();
2477        let max = hist.max_value().unwrap();
2478        assert!((min - 1.0).abs() < 1e-9, "min should be 1.0, got {min}");
2479        assert!(max >= 20.0, "max should be >= last value, got {max}");
2480    }
2481
2482    #[test]
2483    fn test_count_and_conjuncts_single_expression() {
2484        use crate::query::plan::LogicalExpression;
2485        let expr = LogicalExpression::Literal(Value::Bool(true));
2486        assert_eq!(count_and_conjuncts(&expr), 1);
2487    }
2488
2489    #[test]
2490    fn test_count_and_conjuncts_flat_and() {
2491        use crate::query::plan::{BinaryOp, LogicalExpression};
2492        let expr = LogicalExpression::Binary {
2493            left: Box::new(LogicalExpression::Literal(Value::Bool(true))),
2494            op: BinaryOp::And,
2495            right: Box::new(LogicalExpression::Literal(Value::Bool(false))),
2496        };
2497        assert_eq!(count_and_conjuncts(&expr), 2);
2498    }
2499
2500    #[test]
2501    fn test_count_and_conjuncts_nested_and() {
2502        use crate::query::plan::{BinaryOp, LogicalExpression};
2503        let ab = LogicalExpression::Binary {
2504            left: Box::new(LogicalExpression::Literal(Value::Bool(true))),
2505            op: BinaryOp::And,
2506            right: Box::new(LogicalExpression::Literal(Value::Bool(false))),
2507        };
2508        let cd = LogicalExpression::Binary {
2509            left: Box::new(LogicalExpression::Literal(Value::Int64(1))),
2510            op: BinaryOp::And,
2511            right: Box::new(LogicalExpression::Literal(Value::Int64(2))),
2512        };
2513        let expr = LogicalExpression::Binary {
2514            left: Box::new(ab),
2515            op: BinaryOp::And,
2516            right: Box::new(cd),
2517        };
2518        assert_eq!(count_and_conjuncts(&expr), 4);
2519    }
2520
2521    #[test]
2522    fn test_count_distinct_empty() {
2523        assert_eq!(count_distinct(&[]), 0);
2524    }
2525
2526    #[test]
2527    fn test_count_distinct_all_unique() {
2528        assert_eq!(count_distinct(&[1.0, 2.0, 3.0, 4.0]), 4);
2529    }
2530
2531    #[test]
2532    fn test_count_distinct_with_duplicates() {
2533        assert_eq!(count_distinct(&[1.0, 1.0, 2.0, 2.0, 3.0]), 3);
2534    }
2535
2536    #[test]
2537    fn test_count_distinct_all_same() {
2538        assert_eq!(count_distinct(&[5.0, 5.0, 5.0]), 1);
2539    }
2540
2541    #[test]
2542    fn test_count_distinct_single_value() {
2543        assert_eq!(count_distinct(&[42.0]), 1);
2544    }
2545}