Skip to main content

grafeo_engine/query/optimizer/
cardinality.rs

1//! Cardinality estimation for query optimization.
2//!
3//! Estimates the number of rows produced by each operator in a query plan.
4//!
5//! # Equi-Depth Histograms
6//!
7//! This module provides equi-depth histogram support for accurate selectivity
8//! estimation of range predicates. Unlike equi-width histograms that divide
9//! the value range into equal-sized buckets, equi-depth histograms divide
10//! the data into buckets with approximately equal numbers of rows.
11//!
12//! Benefits:
13//! - Better estimates for skewed data distributions
14//! - More accurate range selectivity than assuming uniform distribution
15//! - Adaptive to actual data characteristics
16
17use crate::query::plan::{
18    AggregateOp, BinaryOp, DistinctOp, ExpandOp, FilterOp, JoinOp, JoinType, LeftJoinOp, LimitOp,
19    LogicalExpression, LogicalOperator, MultiWayJoinOp, NodeScanOp, ProjectOp, SkipOp, SortOp,
20    TripleComponent, TripleScanOp, UnaryOp, VectorJoinOp, VectorScanOp,
21};
22use std::collections::HashMap;
23
24/// A bucket in an equi-depth histogram.
25///
26/// Each bucket represents a range of values and the frequency of rows
27/// falling within that range. In an equi-depth histogram, all buckets
28/// contain approximately the same number of rows.
29#[derive(Debug, Clone)]
30pub struct HistogramBucket {
31    /// Lower bound of the bucket (inclusive).
32    pub lower_bound: f64,
33    /// Upper bound of the bucket (exclusive, except for the last bucket).
34    pub upper_bound: f64,
35    /// Number of rows in this bucket.
36    pub frequency: u64,
37    /// Number of distinct values in this bucket.
38    pub distinct_count: u64,
39}
40
41impl HistogramBucket {
42    /// Creates a new histogram bucket.
43    #[must_use]
44    pub fn new(lower_bound: f64, upper_bound: f64, frequency: u64, distinct_count: u64) -> Self {
45        Self {
46            lower_bound,
47            upper_bound,
48            frequency,
49            distinct_count,
50        }
51    }
52
53    /// Returns the width of this bucket.
54    #[must_use]
55    pub fn width(&self) -> f64 {
56        self.upper_bound - self.lower_bound
57    }
58
59    /// Checks if a value falls within this bucket.
60    #[must_use]
61    pub fn contains(&self, value: f64) -> bool {
62        value >= self.lower_bound && value < self.upper_bound
63    }
64
65    /// Returns the fraction of this bucket covered by the given range.
66    #[must_use]
67    pub fn overlap_fraction(&self, lower: Option<f64>, upper: Option<f64>) -> f64 {
68        let effective_lower = lower.unwrap_or(self.lower_bound).max(self.lower_bound);
69        let effective_upper = upper.unwrap_or(self.upper_bound).min(self.upper_bound);
70
71        let bucket_width = self.width();
72        if bucket_width <= 0.0 {
73            return if effective_lower <= self.lower_bound && effective_upper >= self.upper_bound {
74                1.0
75            } else {
76                0.0
77            };
78        }
79
80        let overlap = (effective_upper - effective_lower).max(0.0);
81        (overlap / bucket_width).min(1.0)
82    }
83}
84
85/// An equi-depth histogram for selectivity estimation.
86///
87/// Equi-depth histograms partition data into buckets where each bucket
88/// contains approximately the same number of rows. This provides more
89/// accurate selectivity estimates than assuming uniform distribution,
90/// especially for skewed data.
91///
92/// # Example
93///
94/// ```no_run
95/// use grafeo_engine::query::optimizer::cardinality::EquiDepthHistogram;
96///
97/// // Build a histogram from sorted values
98/// let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0, 30.0, 40.0, 50.0];
99/// let histogram = EquiDepthHistogram::build(&values, 4);
100///
101/// // Estimate selectivity for age > 25
102/// let selectivity = histogram.range_selectivity(Some(25.0), None);
103/// ```
104#[derive(Debug, Clone)]
105pub struct EquiDepthHistogram {
106    /// The histogram buckets, sorted by lower_bound.
107    buckets: Vec<HistogramBucket>,
108    /// Total number of rows represented by this histogram.
109    total_rows: u64,
110}
111
112impl EquiDepthHistogram {
113    /// Creates a new histogram from pre-built buckets.
114    #[must_use]
115    pub fn new(buckets: Vec<HistogramBucket>) -> Self {
116        let total_rows = buckets.iter().map(|b| b.frequency).sum();
117        Self {
118            buckets,
119            total_rows,
120        }
121    }
122
123    /// Builds an equi-depth histogram from a sorted slice of values.
124    ///
125    /// # Arguments
126    /// * `values` - A sorted slice of numeric values
127    /// * `num_buckets` - The desired number of buckets
128    ///
129    /// # Returns
130    /// An equi-depth histogram with approximately equal row counts per bucket.
131    #[must_use]
132    pub fn build(values: &[f64], num_buckets: usize) -> Self {
133        if values.is_empty() || num_buckets == 0 {
134            return Self {
135                buckets: Vec::new(),
136                total_rows: 0,
137            };
138        }
139
140        let num_buckets = num_buckets.min(values.len());
141        let rows_per_bucket = (values.len() + num_buckets - 1) / num_buckets;
142        let mut buckets = Vec::with_capacity(num_buckets);
143
144        let mut start_idx = 0;
145        while start_idx < values.len() {
146            let end_idx = (start_idx + rows_per_bucket).min(values.len());
147            let lower_bound = values[start_idx];
148            let upper_bound = if end_idx < values.len() {
149                values[end_idx]
150            } else {
151                // For the last bucket, extend slightly beyond the max value
152                values[end_idx - 1] + 1.0
153            };
154
155            // Count distinct values in this bucket
156            let bucket_values = &values[start_idx..end_idx];
157            let distinct_count = count_distinct(bucket_values);
158
159            buckets.push(HistogramBucket::new(
160                lower_bound,
161                upper_bound,
162                (end_idx - start_idx) as u64,
163                distinct_count,
164            ));
165
166            start_idx = end_idx;
167        }
168
169        Self::new(buckets)
170    }
171
172    /// Returns the number of buckets in this histogram.
173    #[must_use]
174    pub fn num_buckets(&self) -> usize {
175        self.buckets.len()
176    }
177
178    /// Returns the total number of rows represented.
179    #[must_use]
180    pub fn total_rows(&self) -> u64 {
181        self.total_rows
182    }
183
184    /// Returns the histogram buckets.
185    #[must_use]
186    pub fn buckets(&self) -> &[HistogramBucket] {
187        &self.buckets
188    }
189
190    /// Estimates selectivity for a range predicate.
191    ///
192    /// # Arguments
193    /// * `lower` - Lower bound (None for unbounded)
194    /// * `upper` - Upper bound (None for unbounded)
195    ///
196    /// # Returns
197    /// Estimated fraction of rows matching the range (0.0 to 1.0).
198    #[must_use]
199    pub fn range_selectivity(&self, lower: Option<f64>, upper: Option<f64>) -> f64 {
200        if self.buckets.is_empty() || self.total_rows == 0 {
201            return 0.33; // Default fallback
202        }
203
204        let mut matching_rows = 0.0;
205
206        for bucket in &self.buckets {
207            // Check if this bucket overlaps with the range
208            let bucket_lower = bucket.lower_bound;
209            let bucket_upper = bucket.upper_bound;
210
211            // Skip buckets entirely outside the range
212            if let Some(l) = lower
213                && bucket_upper <= l
214            {
215                continue;
216            }
217            if let Some(u) = upper
218                && bucket_lower >= u
219            {
220                continue;
221            }
222
223            // Calculate the fraction of this bucket covered by the range
224            let overlap = bucket.overlap_fraction(lower, upper);
225            matching_rows += overlap * bucket.frequency as f64;
226        }
227
228        (matching_rows / self.total_rows as f64).clamp(0.0, 1.0)
229    }
230
231    /// Estimates selectivity for an equality predicate.
232    ///
233    /// Uses the distinct count within matching buckets for better accuracy.
234    #[must_use]
235    pub fn equality_selectivity(&self, value: f64) -> f64 {
236        if self.buckets.is_empty() || self.total_rows == 0 {
237            return 0.01; // Default fallback
238        }
239
240        // Find the bucket containing this value
241        for bucket in &self.buckets {
242            if bucket.contains(value) {
243                // Assume uniform distribution within bucket
244                if bucket.distinct_count > 0 {
245                    return (bucket.frequency as f64
246                        / bucket.distinct_count as f64
247                        / self.total_rows as f64)
248                        .min(1.0);
249                }
250            }
251        }
252
253        // Value not in any bucket - very low selectivity
254        0.001
255    }
256
257    /// Gets the minimum value in the histogram.
258    #[must_use]
259    pub fn min_value(&self) -> Option<f64> {
260        self.buckets.first().map(|b| b.lower_bound)
261    }
262
263    /// Gets the maximum value in the histogram.
264    #[must_use]
265    pub fn max_value(&self) -> Option<f64> {
266        self.buckets.last().map(|b| b.upper_bound)
267    }
268}
269
270/// Counts distinct values in a sorted slice.
271/// Counts the number of top-level AND conjuncts in an expression.
272///
273/// For `(A AND B) AND (C AND D)` returns 4; for a single non-AND expression
274/// returns 1. Used to estimate selectivity of multi-predicate join conditions.
275fn count_and_conjuncts(expr: &LogicalExpression) -> usize {
276    match expr {
277        LogicalExpression::Binary {
278            op: BinaryOp::And,
279            left,
280            right,
281        } => count_and_conjuncts(left) + count_and_conjuncts(right),
282        _ => 1,
283    }
284}
285
286fn count_distinct(sorted_values: &[f64]) -> u64 {
287    if sorted_values.is_empty() {
288        return 0;
289    }
290
291    let mut count = 1u64;
292    let mut prev = sorted_values[0];
293
294    for &val in &sorted_values[1..] {
295        if (val - prev).abs() > f64::EPSILON {
296            count += 1;
297            prev = val;
298        }
299    }
300
301    count
302}
303
304/// Statistics for a table/label.
305#[derive(Debug, Clone)]
306pub struct TableStats {
307    /// Total number of rows.
308    pub row_count: u64,
309    /// Column statistics.
310    pub columns: HashMap<String, ColumnStats>,
311}
312
313impl TableStats {
314    /// Creates new table statistics.
315    #[must_use]
316    pub fn new(row_count: u64) -> Self {
317        Self {
318            row_count,
319            columns: HashMap::new(),
320        }
321    }
322
323    /// Adds column statistics.
324    pub fn with_column(mut self, name: &str, stats: ColumnStats) -> Self {
325        self.columns.insert(name.to_string(), stats);
326        self
327    }
328}
329
330/// Statistics for a column.
331#[derive(Debug, Clone)]
332pub struct ColumnStats {
333    /// Number of distinct values.
334    pub distinct_count: u64,
335    /// Number of null values.
336    pub null_count: u64,
337    /// Minimum value (if orderable).
338    pub min_value: Option<f64>,
339    /// Maximum value (if orderable).
340    pub max_value: Option<f64>,
341    /// Equi-depth histogram for accurate selectivity estimation.
342    pub histogram: Option<EquiDepthHistogram>,
343}
344
345impl ColumnStats {
346    /// Creates new column statistics.
347    #[must_use]
348    pub fn new(distinct_count: u64) -> Self {
349        Self {
350            distinct_count,
351            null_count: 0,
352            min_value: None,
353            max_value: None,
354            histogram: None,
355        }
356    }
357
358    /// Sets the null count.
359    #[must_use]
360    pub fn with_nulls(mut self, null_count: u64) -> Self {
361        self.null_count = null_count;
362        self
363    }
364
365    /// Sets the min/max range.
366    #[must_use]
367    pub fn with_range(mut self, min: f64, max: f64) -> Self {
368        self.min_value = Some(min);
369        self.max_value = Some(max);
370        self
371    }
372
373    /// Sets the equi-depth histogram for this column.
374    #[must_use]
375    pub fn with_histogram(mut self, histogram: EquiDepthHistogram) -> Self {
376        self.histogram = Some(histogram);
377        self
378    }
379
380    /// Builds column statistics with histogram from raw values.
381    ///
382    /// This is a convenience method that computes all statistics from the data.
383    ///
384    /// # Arguments
385    /// * `values` - The column values (will be sorted internally)
386    /// * `num_buckets` - Number of histogram buckets to create
387    #[must_use]
388    pub fn from_values(mut values: Vec<f64>, num_buckets: usize) -> Self {
389        if values.is_empty() {
390            return Self::new(0);
391        }
392
393        // Sort values for histogram building
394        values.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
395
396        let min = values.first().copied();
397        let max = values.last().copied();
398        let distinct_count = count_distinct(&values);
399        let histogram = EquiDepthHistogram::build(&values, num_buckets);
400
401        Self {
402            distinct_count,
403            null_count: 0,
404            min_value: min,
405            max_value: max,
406            histogram: Some(histogram),
407        }
408    }
409}
410
411/// Configurable selectivity defaults for cardinality estimation.
412///
413/// Controls the assumed selectivity for various predicate types when
414/// histogram or column statistics are unavailable. Adjusting these
415/// values can improve plan quality for workloads with known skew.
416#[derive(Debug, Clone)]
417pub struct SelectivityConfig {
418    /// Selectivity for unknown predicates (default: 0.1).
419    pub default: f64,
420    /// Selectivity for equality predicates without stats (default: 0.01).
421    pub equality: f64,
422    /// Selectivity for inequality predicates (default: 0.99).
423    pub inequality: f64,
424    /// Selectivity for range predicates without stats (default: 0.33).
425    pub range: f64,
426    /// Selectivity for string operations: STARTS WITH, ENDS WITH, CONTAINS, LIKE (default: 0.1).
427    pub string_ops: f64,
428    /// Selectivity for IN membership (default: 0.1).
429    pub membership: f64,
430    /// Selectivity for IS NULL (default: 0.05).
431    pub is_null: f64,
432    /// Selectivity for IS NOT NULL (default: 0.95).
433    pub is_not_null: f64,
434    /// Fraction assumed distinct for DISTINCT operations (default: 0.5).
435    pub distinct_fraction: f64,
436}
437
438impl SelectivityConfig {
439    /// Creates a new config with standard database defaults.
440    #[must_use]
441    pub fn new() -> Self {
442        Self {
443            default: 0.1,
444            equality: 0.01,
445            inequality: 0.99,
446            range: 0.33,
447            string_ops: 0.1,
448            membership: 0.1,
449            is_null: 0.05,
450            is_not_null: 0.95,
451            distinct_fraction: 0.5,
452        }
453    }
454}
455
456impl Default for SelectivityConfig {
457    fn default() -> Self {
458        Self::new()
459    }
460}
461
462/// A single estimate-vs-actual observation for analysis.
463#[derive(Debug, Clone)]
464pub struct EstimationEntry {
465    /// Human-readable label for the operator (e.g., "NodeScan(Person)").
466    pub operator: String,
467    /// The cardinality estimate produced by the optimizer.
468    pub estimated: f64,
469    /// The actual row count observed at execution time.
470    pub actual: f64,
471}
472
473impl EstimationEntry {
474    /// Returns the estimation error ratio (actual / estimated).
475    ///
476    /// Values near 1.0 indicate accurate estimates.
477    /// Values > 1.0 indicate underestimation.
478    /// Values < 1.0 indicate overestimation.
479    #[must_use]
480    pub fn error_ratio(&self) -> f64 {
481        if self.estimated.abs() < f64::EPSILON {
482            if self.actual.abs() < f64::EPSILON {
483                1.0
484            } else {
485                f64::INFINITY
486            }
487        } else {
488            self.actual / self.estimated
489        }
490    }
491}
492
493/// Collects estimate vs actual cardinality data for query plan analysis.
494///
495/// After executing a query, call [`record()`](Self::record) for each
496/// operator with its estimated and actual cardinalities. Then use
497/// [`should_replan()`](Self::should_replan) to decide whether the plan
498/// should be re-optimized.
499#[derive(Debug, Clone, Default)]
500pub struct EstimationLog {
501    /// Recorded entries.
502    entries: Vec<EstimationEntry>,
503    /// Error ratio threshold that triggers re-planning (default: 10.0).
504    ///
505    /// If any operator's error ratio exceeds this, `should_replan()` returns true.
506    replan_threshold: f64,
507}
508
509impl EstimationLog {
510    /// Creates a new estimation log with the given re-planning threshold.
511    #[must_use]
512    pub fn new(replan_threshold: f64) -> Self {
513        Self {
514            entries: Vec::new(),
515            replan_threshold,
516        }
517    }
518
519    /// Records an estimate-vs-actual observation.
520    pub fn record(&mut self, operator: impl Into<String>, estimated: f64, actual: f64) {
521        self.entries.push(EstimationEntry {
522            operator: operator.into(),
523            estimated,
524            actual,
525        });
526    }
527
528    /// Returns all recorded entries.
529    #[must_use]
530    pub fn entries(&self) -> &[EstimationEntry] {
531        &self.entries
532    }
533
534    /// Returns whether any operator's estimation error exceeds the threshold,
535    /// indicating the plan should be re-optimized.
536    #[must_use]
537    pub fn should_replan(&self) -> bool {
538        self.entries.iter().any(|e| {
539            let ratio = e.error_ratio();
540            ratio > self.replan_threshold || ratio < 1.0 / self.replan_threshold
541        })
542    }
543
544    /// Returns the maximum error ratio across all entries.
545    #[must_use]
546    pub fn max_error_ratio(&self) -> f64 {
547        self.entries
548            .iter()
549            .map(|e| {
550                let r = e.error_ratio();
551                // Normalize so both over- and under-estimation are > 1.0
552                if r < 1.0 { 1.0 / r } else { r }
553            })
554            .fold(1.0_f64, f64::max)
555    }
556
557    /// Clears all entries.
558    pub fn clear(&mut self) {
559        self.entries.clear();
560    }
561}
562
563/// Cardinality estimator.
564pub struct CardinalityEstimator {
565    /// Statistics for each label/table.
566    table_stats: HashMap<String, TableStats>,
567    /// Default row count for unknown tables.
568    default_row_count: u64,
569    /// Default selectivity for unknown predicates.
570    default_selectivity: f64,
571    /// Average edge fanout (outgoing edges per node).
572    avg_fanout: f64,
573    /// Configurable selectivity defaults.
574    selectivity_config: SelectivityConfig,
575    /// RDF statistics for triple pattern cardinality estimation.
576    rdf_statistics: Option<grafeo_core::statistics::RdfStatistics>,
577}
578
579impl CardinalityEstimator {
580    /// Creates a new cardinality estimator with default settings.
581    #[must_use]
582    pub fn new() -> Self {
583        let config = SelectivityConfig::new();
584        Self {
585            table_stats: HashMap::new(),
586            default_row_count: 1000,
587            default_selectivity: config.default,
588            avg_fanout: 10.0,
589            selectivity_config: config,
590            rdf_statistics: None,
591        }
592    }
593
594    /// Creates a new cardinality estimator with custom selectivity configuration.
595    #[must_use]
596    pub fn with_selectivity_config(config: SelectivityConfig) -> Self {
597        Self {
598            table_stats: HashMap::new(),
599            default_row_count: 1000,
600            default_selectivity: config.default,
601            avg_fanout: 10.0,
602            selectivity_config: config,
603            rdf_statistics: None,
604        }
605    }
606
607    /// Returns the current selectivity configuration.
608    #[must_use]
609    pub fn selectivity_config(&self) -> &SelectivityConfig {
610        &self.selectivity_config
611    }
612
613    /// Creates an estimation log with the default re-planning threshold (10x).
614    #[must_use]
615    pub fn create_estimation_log() -> EstimationLog {
616        EstimationLog::new(10.0)
617    }
618
619    /// Creates a cardinality estimator pre-populated from store statistics.
620    ///
621    /// Maps `LabelStatistics` to `TableStats` and computes the average edge
622    /// fanout from `EdgeTypeStatistics`. Falls back to defaults for any
623    /// missing statistics.
624    #[must_use]
625    pub fn from_statistics(stats: &grafeo_core::statistics::Statistics) -> Self {
626        let mut estimator = Self::new();
627
628        // Use total node count as default for unlabeled scans
629        if stats.total_nodes > 0 {
630            estimator.default_row_count = stats.total_nodes;
631        }
632
633        // Convert label statistics to optimizer table stats
634        for (label, label_stats) in &stats.labels {
635            let mut table_stats = TableStats::new(label_stats.node_count);
636
637            // Map property statistics (distinct count for selectivity estimation)
638            for (prop, col_stats) in &label_stats.properties {
639                let optimizer_col =
640                    ColumnStats::new(col_stats.distinct_count).with_nulls(col_stats.null_count);
641                table_stats = table_stats.with_column(prop, optimizer_col);
642            }
643
644            estimator.add_table_stats(label, table_stats);
645        }
646
647        // Compute average fanout from edge type statistics
648        if !stats.edge_types.is_empty() {
649            let total_out_degree: f64 = stats.edge_types.values().map(|e| e.avg_out_degree).sum();
650            estimator.avg_fanout = total_out_degree / stats.edge_types.len() as f64;
651        } else if stats.total_nodes > 0 {
652            estimator.avg_fanout = stats.total_edges as f64 / stats.total_nodes as f64;
653        }
654
655        // Clamp fanout to a reasonable minimum
656        if estimator.avg_fanout < 1.0 {
657            estimator.avg_fanout = 1.0;
658        }
659
660        estimator
661    }
662
663    /// Creates a cardinality estimator from RDF statistics.
664    ///
665    /// Uses triple pattern cardinality estimates for `TripleScan` operators
666    /// and join selectivity from per-predicate statistics.
667    #[must_use]
668    pub fn from_rdf_statistics(rdf_stats: grafeo_core::statistics::RdfStatistics) -> Self {
669        let mut estimator = Self::new();
670        if rdf_stats.total_triples > 0 {
671            estimator.default_row_count = rdf_stats.total_triples;
672        }
673        estimator.rdf_statistics = Some(rdf_stats);
674        estimator
675    }
676
677    /// Adds statistics for a table/label.
678    pub fn add_table_stats(&mut self, name: &str, stats: TableStats) {
679        self.table_stats.insert(name.to_string(), stats);
680    }
681
682    /// Sets the average edge fanout.
683    pub fn set_avg_fanout(&mut self, fanout: f64) {
684        self.avg_fanout = fanout;
685    }
686
687    /// Estimates the cardinality of a logical operator.
688    #[must_use]
689    pub fn estimate(&self, op: &LogicalOperator) -> f64 {
690        match op {
691            LogicalOperator::NodeScan(scan) => self.estimate_node_scan(scan),
692            LogicalOperator::Filter(filter) => self.estimate_filter(filter),
693            LogicalOperator::Project(project) => self.estimate_project(project),
694            LogicalOperator::Expand(expand) => self.estimate_expand(expand),
695            LogicalOperator::Join(join) => self.estimate_join(join),
696            LogicalOperator::Aggregate(agg) => self.estimate_aggregate(agg),
697            LogicalOperator::Sort(sort) => self.estimate_sort(sort),
698            LogicalOperator::Distinct(distinct) => self.estimate_distinct(distinct),
699            LogicalOperator::Limit(limit) => self.estimate_limit(limit),
700            LogicalOperator::Skip(skip) => self.estimate_skip(skip),
701            LogicalOperator::Return(ret) => self.estimate(&ret.input),
702            LogicalOperator::Empty => 0.0,
703            LogicalOperator::VectorScan(scan) => self.estimate_vector_scan(scan),
704            LogicalOperator::VectorJoin(join) => self.estimate_vector_join(join),
705            LogicalOperator::MultiWayJoin(mwj) => self.estimate_multi_way_join(mwj),
706            LogicalOperator::LeftJoin(lj) => self.estimate_left_join(lj),
707            LogicalOperator::TripleScan(scan) => self.estimate_triple_scan(scan),
708            _ => self.default_row_count as f64,
709        }
710    }
711
712    /// Estimates node scan cardinality.
713    fn estimate_node_scan(&self, scan: &NodeScanOp) -> f64 {
714        if let Some(label) = &scan.label
715            && let Some(stats) = self.table_stats.get(label)
716        {
717            return stats.row_count as f64;
718        }
719        // No label filter - scan all nodes
720        self.default_row_count as f64
721    }
722
723    /// Estimates triple scan cardinality using RDF statistics.
724    ///
725    /// If RDF statistics are available, uses the pattern binding (which positions
726    /// are bound vs variable) to produce accurate estimates. Otherwise falls back
727    /// to the default row count.
728    fn estimate_triple_scan(&self, scan: &TripleScanOp) -> f64 {
729        // If there's an input, the triple scan is chained: multiply input cardinality
730        // by the per-row expansion factor.
731        let base = if let Some(ref input) = scan.input {
732            self.estimate(input)
733        } else {
734            1.0
735        };
736
737        let Some(rdf_stats) = &self.rdf_statistics else {
738            return if scan.input.is_some() {
739                base * self.default_row_count as f64
740            } else {
741                self.default_row_count as f64
742            };
743        };
744
745        let subject_bound = matches!(
746            scan.subject,
747            TripleComponent::Iri(_)
748                | TripleComponent::Literal(_)
749                | TripleComponent::LangLiteral { .. }
750        );
751        let object_bound = matches!(
752            scan.object,
753            TripleComponent::Iri(_)
754                | TripleComponent::Literal(_)
755                | TripleComponent::LangLiteral { .. }
756        );
757        let predicate_iri = match &scan.predicate {
758            TripleComponent::Iri(iri) => Some(iri.as_str()),
759            _ => None,
760        };
761
762        let pattern_card = rdf_stats.estimate_triple_pattern_cardinality(
763            subject_bound,
764            predicate_iri,
765            object_bound,
766        );
767
768        if scan.input.is_some() {
769            // Chained scan: each input row expands by the pattern's selectivity
770            let selectivity = if rdf_stats.total_triples > 0 {
771                pattern_card / rdf_stats.total_triples as f64
772            } else {
773                1.0
774            };
775            (base * pattern_card * selectivity).max(1.0)
776        } else {
777            pattern_card.max(1.0)
778        }
779    }
780
781    /// Estimates filter cardinality.
782    fn estimate_filter(&self, filter: &FilterOp) -> f64 {
783        let input_cardinality = self.estimate(&filter.input);
784        let selectivity = self.estimate_selectivity(&filter.predicate);
785        (input_cardinality * selectivity).max(1.0)
786    }
787
788    /// Estimates projection cardinality (same as input).
789    fn estimate_project(&self, project: &ProjectOp) -> f64 {
790        self.estimate(&project.input)
791    }
792
793    /// Estimates expand cardinality.
794    fn estimate_expand(&self, expand: &ExpandOp) -> f64 {
795        let input_cardinality = self.estimate(&expand.input);
796
797        // Apply fanout based on edge type
798        let fanout = if !expand.edge_types.is_empty() {
799            // Specific edge type(s) typically have lower fanout
800            self.avg_fanout * 0.5
801        } else {
802            self.avg_fanout
803        };
804
805        // Handle variable-length paths
806        let path_multiplier = if expand.max_hops.unwrap_or(1) > 1 {
807            let min = expand.min_hops as f64;
808            let max = expand.max_hops.unwrap_or(expand.min_hops + 3) as f64;
809            // Geometric series approximation
810            (fanout.powf(max + 1.0) - fanout.powf(min)) / (fanout - 1.0)
811        } else {
812            fanout
813        };
814
815        (input_cardinality * path_multiplier).max(1.0)
816    }
817
818    /// Estimates join cardinality.
819    fn estimate_join(&self, join: &JoinOp) -> f64 {
820        let left_card = self.estimate(&join.left);
821        let right_card = self.estimate(&join.right);
822
823        match join.join_type {
824            JoinType::Cross => left_card * right_card,
825            JoinType::Inner => {
826                // Assume join selectivity based on conditions
827                let selectivity = if join.conditions.is_empty() {
828                    1.0 // Cross join
829                } else {
830                    // Estimate based on number of conditions
831                    0.1_f64.powi(join.conditions.len() as i32)
832                };
833                (left_card * right_card * selectivity).max(1.0)
834            }
835            JoinType::Left => {
836                // Left join returns at least all left rows
837                let inner_card = self.estimate_join(&JoinOp {
838                    left: join.left.clone(),
839                    right: join.right.clone(),
840                    join_type: JoinType::Inner,
841                    conditions: join.conditions.clone(),
842                });
843                inner_card.max(left_card)
844            }
845            JoinType::Right => {
846                // Right join returns at least all right rows
847                let inner_card = self.estimate_join(&JoinOp {
848                    left: join.left.clone(),
849                    right: join.right.clone(),
850                    join_type: JoinType::Inner,
851                    conditions: join.conditions.clone(),
852                });
853                inner_card.max(right_card)
854            }
855            JoinType::Full => {
856                // Full join returns at least max(left, right)
857                let inner_card = self.estimate_join(&JoinOp {
858                    left: join.left.clone(),
859                    right: join.right.clone(),
860                    join_type: JoinType::Inner,
861                    conditions: join.conditions.clone(),
862                });
863                inner_card.max(left_card.max(right_card))
864            }
865            JoinType::Semi => {
866                // Semi join returns at most left cardinality
867                (left_card * self.default_selectivity).max(1.0)
868            }
869            JoinType::Anti => {
870                // Anti join returns at most left cardinality
871                (left_card * (1.0 - self.default_selectivity)).max(1.0)
872            }
873        }
874    }
875
876    /// Estimates left join cardinality (OPTIONAL MATCH).
877    ///
878    /// A left outer join preserves all left rows, so the output is at least
879    /// `left_cardinality`. When the right side matches, the output may be
880    /// larger (one left row can match multiple right rows).
881    ///
882    /// When the join carries a cross-side condition (null-safe predicates),
883    /// each AND-conjunct reduces the selectivity estimate.
884    fn estimate_left_join(&self, lj: &LeftJoinOp) -> f64 {
885        let left_card = self.estimate(&lj.left);
886        let right_card = self.estimate(&lj.right);
887
888        // Adjust selectivity based on the number of AND conjuncts in the
889        // cross-side condition: each equality reduces match probability.
890        let condition_selectivity = if let Some(cond) = &lj.condition {
891            let n = count_and_conjuncts(cond);
892            self.default_selectivity.powi(n as i32)
893        } else {
894            self.default_selectivity
895        };
896
897        // Estimate as inner join cardinality, but guaranteed at least left_card
898        let inner_estimate = left_card * right_card * condition_selectivity;
899        inner_estimate.max(left_card).max(1.0)
900    }
901
902    /// Estimates aggregation cardinality.
903    fn estimate_aggregate(&self, agg: &AggregateOp) -> f64 {
904        let input_cardinality = self.estimate(&agg.input);
905
906        if agg.group_by.is_empty() {
907            // Global aggregation - single row
908            1.0
909        } else {
910            // Group by - estimate distinct groups
911            // Assume each group key reduces cardinality by 10
912            let group_reduction = 10.0_f64.powi(agg.group_by.len() as i32);
913            (input_cardinality / group_reduction).max(1.0)
914        }
915    }
916
917    /// Estimates sort cardinality (same as input).
918    fn estimate_sort(&self, sort: &SortOp) -> f64 {
919        self.estimate(&sort.input)
920    }
921
922    /// Estimates distinct cardinality.
923    fn estimate_distinct(&self, distinct: &DistinctOp) -> f64 {
924        let input_cardinality = self.estimate(&distinct.input);
925        (input_cardinality * self.selectivity_config.distinct_fraction).max(1.0)
926    }
927
928    /// Estimates limit cardinality.
929    fn estimate_limit(&self, limit: &LimitOp) -> f64 {
930        let input_cardinality = self.estimate(&limit.input);
931        limit.count.estimate().min(input_cardinality)
932    }
933
934    /// Estimates skip cardinality.
935    fn estimate_skip(&self, skip: &SkipOp) -> f64 {
936        let input_cardinality = self.estimate(&skip.input);
937        (input_cardinality - skip.count.estimate()).max(0.0)
938    }
939
940    /// Estimates vector scan cardinality.
941    ///
942    /// Vector scan returns at most k results (the k nearest neighbors).
943    /// With similarity/distance filters, it may return fewer.
944    fn estimate_vector_scan(&self, scan: &VectorScanOp) -> f64 {
945        let base_k = scan.k as f64;
946
947        // Apply filter selectivity if thresholds are specified
948        let selectivity = if scan.min_similarity.is_some() || scan.max_distance.is_some() {
949            // Assume 70% of results pass threshold filters
950            0.7
951        } else {
952            1.0
953        };
954
955        (base_k * selectivity).max(1.0)
956    }
957
958    /// Estimates vector join cardinality.
959    ///
960    /// Vector join produces up to k results per input row.
961    fn estimate_vector_join(&self, join: &VectorJoinOp) -> f64 {
962        let input_cardinality = self.estimate(&join.input);
963        let k = join.k as f64;
964
965        // Apply filter selectivity if thresholds are specified
966        let selectivity = if join.min_similarity.is_some() || join.max_distance.is_some() {
967            0.7
968        } else {
969            1.0
970        };
971
972        (input_cardinality * k * selectivity).max(1.0)
973    }
974
975    /// Estimates multi-way join cardinality using the AGM bound heuristic.
976    ///
977    /// For a cyclic join of N relations, the AGM (Atserias-Grohe-Marx) bound
978    /// gives min(cardinality)^(N/2) as a worst-case output size estimate.
979    fn estimate_multi_way_join(&self, mwj: &MultiWayJoinOp) -> f64 {
980        if mwj.inputs.is_empty() {
981            return 0.0;
982        }
983        let cardinalities: Vec<f64> = mwj
984            .inputs
985            .iter()
986            .map(|input| self.estimate(input))
987            .collect();
988        let min_card = cardinalities.iter().copied().fold(f64::INFINITY, f64::min);
989        let n = cardinalities.len() as f64;
990        // AGM bound: min(cardinality)^(n/2)
991        (min_card.powf(n / 2.0)).max(1.0)
992    }
993
994    /// Estimates the selectivity of a predicate (0.0 to 1.0).
995    fn estimate_selectivity(&self, expr: &LogicalExpression) -> f64 {
996        match expr {
997            LogicalExpression::Binary { left, op, right } => {
998                self.estimate_binary_selectivity(left, *op, right)
999            }
1000            LogicalExpression::Unary { op, operand } => {
1001                self.estimate_unary_selectivity(*op, operand)
1002            }
1003            LogicalExpression::Literal(value) => {
1004                // Boolean literal
1005                if let grafeo_common::types::Value::Bool(b) = value {
1006                    if *b { 1.0 } else { 0.0 }
1007                } else {
1008                    self.default_selectivity
1009                }
1010            }
1011            _ => self.default_selectivity,
1012        }
1013    }
1014
1015    /// Estimates binary expression selectivity.
1016    fn estimate_binary_selectivity(
1017        &self,
1018        left: &LogicalExpression,
1019        op: BinaryOp,
1020        right: &LogicalExpression,
1021    ) -> f64 {
1022        match op {
1023            // Equality - try histogram-based estimation
1024            BinaryOp::Eq => {
1025                if let Some(selectivity) = self.try_equality_selectivity(left, right) {
1026                    return selectivity;
1027                }
1028                self.selectivity_config.equality
1029            }
1030            // Inequality is very unselective
1031            BinaryOp::Ne => self.selectivity_config.inequality,
1032            // Range predicates - use histogram if available
1033            BinaryOp::Lt | BinaryOp::Le | BinaryOp::Gt | BinaryOp::Ge => {
1034                if let Some(selectivity) = self.try_range_selectivity(left, op, right) {
1035                    return selectivity;
1036                }
1037                self.selectivity_config.range
1038            }
1039            // Logical operators - recursively estimate sub-expressions
1040            BinaryOp::And => {
1041                let left_sel = self.estimate_selectivity(left);
1042                let right_sel = self.estimate_selectivity(right);
1043                // AND reduces selectivity (multiply assuming independence)
1044                left_sel * right_sel
1045            }
1046            BinaryOp::Or => {
1047                let left_sel = self.estimate_selectivity(left);
1048                let right_sel = self.estimate_selectivity(right);
1049                // OR: P(A ∪ B) = P(A) + P(B) - P(A ∩ B)
1050                // Assuming independence: P(A ∩ B) = P(A) * P(B)
1051                (left_sel + right_sel - left_sel * right_sel).min(1.0)
1052            }
1053            // String operations
1054            BinaryOp::StartsWith | BinaryOp::EndsWith | BinaryOp::Contains | BinaryOp::Like => {
1055                self.selectivity_config.string_ops
1056            }
1057            // Collection membership
1058            BinaryOp::In => self.selectivity_config.membership,
1059            // Other operations
1060            _ => self.default_selectivity,
1061        }
1062    }
1063
1064    /// Tries to estimate equality selectivity using histograms.
1065    fn try_equality_selectivity(
1066        &self,
1067        left: &LogicalExpression,
1068        right: &LogicalExpression,
1069    ) -> Option<f64> {
1070        // Extract property access and literal value
1071        let (label, column, value) = self.extract_column_and_value(left, right)?;
1072
1073        // Get column stats with histogram
1074        let stats = self.get_column_stats(&label, &column)?;
1075
1076        // Try histogram-based estimation
1077        if let Some(ref histogram) = stats.histogram {
1078            return Some(histogram.equality_selectivity(value));
1079        }
1080
1081        // Fall back to distinct count estimation
1082        if stats.distinct_count > 0 {
1083            return Some(1.0 / stats.distinct_count as f64);
1084        }
1085
1086        None
1087    }
1088
1089    /// Tries to estimate range selectivity using histograms.
1090    fn try_range_selectivity(
1091        &self,
1092        left: &LogicalExpression,
1093        op: BinaryOp,
1094        right: &LogicalExpression,
1095    ) -> Option<f64> {
1096        // Extract property access and literal value
1097        let (label, column, value) = self.extract_column_and_value(left, right)?;
1098
1099        // Get column stats
1100        let stats = self.get_column_stats(&label, &column)?;
1101
1102        // Determine the range based on operator
1103        let (lower, upper) = match op {
1104            BinaryOp::Lt => (None, Some(value)),
1105            BinaryOp::Le => (None, Some(value + f64::EPSILON)),
1106            BinaryOp::Gt => (Some(value + f64::EPSILON), None),
1107            BinaryOp::Ge => (Some(value), None),
1108            _ => return None,
1109        };
1110
1111        // Try histogram-based estimation first
1112        if let Some(ref histogram) = stats.histogram {
1113            return Some(histogram.range_selectivity(lower, upper));
1114        }
1115
1116        // Fall back to min/max range estimation
1117        if let (Some(min), Some(max)) = (stats.min_value, stats.max_value) {
1118            let range = max - min;
1119            if range <= 0.0 {
1120                return Some(1.0);
1121            }
1122
1123            let effective_lower = lower.unwrap_or(min).max(min);
1124            let effective_upper = upper.unwrap_or(max).min(max);
1125            let overlap = (effective_upper - effective_lower).max(0.0);
1126            return Some((overlap / range).clamp(0.0, 1.0));
1127        }
1128
1129        None
1130    }
1131
1132    /// Extracts column information and literal value from a comparison.
1133    ///
1134    /// Returns (label, column_name, numeric_value) if the expression is
1135    /// a comparison between a property access and a numeric literal.
1136    fn extract_column_and_value(
1137        &self,
1138        left: &LogicalExpression,
1139        right: &LogicalExpression,
1140    ) -> Option<(String, String, f64)> {
1141        // Try left as property, right as literal
1142        if let Some(result) = self.try_extract_property_literal(left, right) {
1143            return Some(result);
1144        }
1145
1146        // Try right as property, left as literal
1147        self.try_extract_property_literal(right, left)
1148    }
1149
1150    /// Tries to extract property and literal from a specific ordering.
1151    fn try_extract_property_literal(
1152        &self,
1153        property_expr: &LogicalExpression,
1154        literal_expr: &LogicalExpression,
1155    ) -> Option<(String, String, f64)> {
1156        // Extract property access
1157        let (variable, property) = match property_expr {
1158            LogicalExpression::Property { variable, property } => {
1159                (variable.clone(), property.clone())
1160            }
1161            _ => return None,
1162        };
1163
1164        // Extract numeric literal
1165        let value = match literal_expr {
1166            LogicalExpression::Literal(grafeo_common::types::Value::Int64(n)) => *n as f64,
1167            LogicalExpression::Literal(grafeo_common::types::Value::Float64(f)) => *f,
1168            _ => return None,
1169        };
1170
1171        // Try to find a label for this variable from table stats
1172        // Use the variable name as a heuristic label lookup
1173        // In practice, the optimizer would track which labels variables are bound to
1174        for label in self.table_stats.keys() {
1175            if let Some(stats) = self.table_stats.get(label)
1176                && stats.columns.contains_key(&property)
1177            {
1178                return Some((label.clone(), property, value));
1179            }
1180        }
1181
1182        // If no stats found but we have the property, return with variable as label
1183        Some((variable, property, value))
1184    }
1185
1186    /// Estimates unary expression selectivity.
1187    fn estimate_unary_selectivity(&self, op: UnaryOp, _operand: &LogicalExpression) -> f64 {
1188        match op {
1189            UnaryOp::Not => 1.0 - self.default_selectivity,
1190            UnaryOp::IsNull => self.selectivity_config.is_null,
1191            UnaryOp::IsNotNull => self.selectivity_config.is_not_null,
1192            UnaryOp::Neg => 1.0, // Negation doesn't change cardinality
1193        }
1194    }
1195
1196    /// Gets statistics for a column.
1197    fn get_column_stats(&self, label: &str, column: &str) -> Option<&ColumnStats> {
1198        self.table_stats.get(label)?.columns.get(column)
1199    }
1200}
1201
1202impl Default for CardinalityEstimator {
1203    fn default() -> Self {
1204        Self::new()
1205    }
1206}
1207
1208#[cfg(test)]
1209mod tests {
1210    use super::*;
1211    use crate::query::plan::{
1212        DistinctOp, ExpandDirection, ExpandOp, FilterOp, JoinCondition, NodeScanOp, PathMode,
1213        ProjectOp, Projection, ReturnItem, ReturnOp, SkipOp, SortKey, SortOp, SortOrder,
1214    };
1215    use grafeo_common::types::Value;
1216
1217    #[test]
1218    fn test_node_scan_with_stats() {
1219        let mut estimator = CardinalityEstimator::new();
1220        estimator.add_table_stats("Person", TableStats::new(5000));
1221
1222        let scan = LogicalOperator::NodeScan(NodeScanOp {
1223            variable: "n".to_string(),
1224            label: Some("Person".to_string()),
1225            input: None,
1226        });
1227
1228        let cardinality = estimator.estimate(&scan);
1229        assert!((cardinality - 5000.0).abs() < 0.001);
1230    }
1231
1232    #[test]
1233    fn test_filter_reduces_cardinality() {
1234        let mut estimator = CardinalityEstimator::new();
1235        estimator.add_table_stats("Person", TableStats::new(1000));
1236
1237        let filter = LogicalOperator::Filter(FilterOp {
1238            predicate: LogicalExpression::Binary {
1239                left: Box::new(LogicalExpression::Property {
1240                    variable: "n".to_string(),
1241                    property: "age".to_string(),
1242                }),
1243                op: BinaryOp::Eq,
1244                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1245            },
1246            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1247                variable: "n".to_string(),
1248                label: Some("Person".to_string()),
1249                input: None,
1250            })),
1251            pushdown_hint: None,
1252        });
1253
1254        let cardinality = estimator.estimate(&filter);
1255        // Equality selectivity is 0.01, so 1000 * 0.01 = 10
1256        assert!(cardinality < 1000.0);
1257        assert!(cardinality >= 1.0);
1258    }
1259
1260    #[test]
1261    fn test_join_cardinality() {
1262        let mut estimator = CardinalityEstimator::new();
1263        estimator.add_table_stats("Person", TableStats::new(1000));
1264        estimator.add_table_stats("Company", TableStats::new(100));
1265
1266        let join = LogicalOperator::Join(JoinOp {
1267            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1268                variable: "p".to_string(),
1269                label: Some("Person".to_string()),
1270                input: None,
1271            })),
1272            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1273                variable: "c".to_string(),
1274                label: Some("Company".to_string()),
1275                input: None,
1276            })),
1277            join_type: JoinType::Inner,
1278            conditions: vec![JoinCondition {
1279                left: LogicalExpression::Property {
1280                    variable: "p".to_string(),
1281                    property: "company_id".to_string(),
1282                },
1283                right: LogicalExpression::Property {
1284                    variable: "c".to_string(),
1285                    property: "id".to_string(),
1286                },
1287            }],
1288        });
1289
1290        let cardinality = estimator.estimate(&join);
1291        // Should be less than cross product
1292        assert!(cardinality < 1000.0 * 100.0);
1293    }
1294
1295    #[test]
1296    fn test_limit_caps_cardinality() {
1297        let mut estimator = CardinalityEstimator::new();
1298        estimator.add_table_stats("Person", TableStats::new(1000));
1299
1300        let limit = LogicalOperator::Limit(LimitOp {
1301            count: 10.into(),
1302            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1303                variable: "n".to_string(),
1304                label: Some("Person".to_string()),
1305                input: None,
1306            })),
1307        });
1308
1309        let cardinality = estimator.estimate(&limit);
1310        assert!((cardinality - 10.0).abs() < 0.001);
1311    }
1312
1313    #[test]
1314    fn test_aggregate_reduces_cardinality() {
1315        let mut estimator = CardinalityEstimator::new();
1316        estimator.add_table_stats("Person", TableStats::new(1000));
1317
1318        // Global aggregation
1319        let global_agg = LogicalOperator::Aggregate(AggregateOp {
1320            group_by: vec![],
1321            aggregates: vec![],
1322            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1323                variable: "n".to_string(),
1324                label: Some("Person".to_string()),
1325                input: None,
1326            })),
1327            having: None,
1328        });
1329
1330        let cardinality = estimator.estimate(&global_agg);
1331        assert!((cardinality - 1.0).abs() < 0.001);
1332
1333        // Group by aggregation
1334        let group_agg = LogicalOperator::Aggregate(AggregateOp {
1335            group_by: vec![LogicalExpression::Property {
1336                variable: "n".to_string(),
1337                property: "city".to_string(),
1338            }],
1339            aggregates: vec![],
1340            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1341                variable: "n".to_string(),
1342                label: Some("Person".to_string()),
1343                input: None,
1344            })),
1345            having: None,
1346        });
1347
1348        let cardinality = estimator.estimate(&group_agg);
1349        // Should be less than input
1350        assert!(cardinality < 1000.0);
1351    }
1352
1353    #[test]
1354    fn test_node_scan_without_stats() {
1355        let estimator = CardinalityEstimator::new();
1356
1357        let scan = LogicalOperator::NodeScan(NodeScanOp {
1358            variable: "n".to_string(),
1359            label: Some("Unknown".to_string()),
1360            input: None,
1361        });
1362
1363        let cardinality = estimator.estimate(&scan);
1364        // Should return default (1000)
1365        assert!((cardinality - 1000.0).abs() < 0.001);
1366    }
1367
1368    #[test]
1369    fn test_node_scan_no_label() {
1370        let estimator = CardinalityEstimator::new();
1371
1372        let scan = LogicalOperator::NodeScan(NodeScanOp {
1373            variable: "n".to_string(),
1374            label: None,
1375            input: None,
1376        });
1377
1378        let cardinality = estimator.estimate(&scan);
1379        // Should scan all nodes (default)
1380        assert!((cardinality - 1000.0).abs() < 0.001);
1381    }
1382
1383    #[test]
1384    fn test_filter_inequality_selectivity() {
1385        let mut estimator = CardinalityEstimator::new();
1386        estimator.add_table_stats("Person", TableStats::new(1000));
1387
1388        let filter = LogicalOperator::Filter(FilterOp {
1389            predicate: LogicalExpression::Binary {
1390                left: Box::new(LogicalExpression::Property {
1391                    variable: "n".to_string(),
1392                    property: "age".to_string(),
1393                }),
1394                op: BinaryOp::Ne,
1395                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1396            },
1397            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1398                variable: "n".to_string(),
1399                label: Some("Person".to_string()),
1400                input: None,
1401            })),
1402            pushdown_hint: None,
1403        });
1404
1405        let cardinality = estimator.estimate(&filter);
1406        // Inequality selectivity is 0.99, so 1000 * 0.99 = 990
1407        assert!(cardinality > 900.0);
1408    }
1409
1410    #[test]
1411    fn test_filter_range_selectivity() {
1412        let mut estimator = CardinalityEstimator::new();
1413        estimator.add_table_stats("Person", TableStats::new(1000));
1414
1415        let filter = LogicalOperator::Filter(FilterOp {
1416            predicate: LogicalExpression::Binary {
1417                left: Box::new(LogicalExpression::Property {
1418                    variable: "n".to_string(),
1419                    property: "age".to_string(),
1420                }),
1421                op: BinaryOp::Gt,
1422                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1423            },
1424            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1425                variable: "n".to_string(),
1426                label: Some("Person".to_string()),
1427                input: None,
1428            })),
1429            pushdown_hint: None,
1430        });
1431
1432        let cardinality = estimator.estimate(&filter);
1433        // Range selectivity is 0.33, so 1000 * 0.33 = 330
1434        assert!(cardinality < 500.0);
1435        assert!(cardinality > 100.0);
1436    }
1437
1438    #[test]
1439    fn test_filter_and_selectivity() {
1440        let mut estimator = CardinalityEstimator::new();
1441        estimator.add_table_stats("Person", TableStats::new(1000));
1442
1443        // Test AND with two equality predicates
1444        // Each equality has selectivity 0.01, so AND gives 0.01 * 0.01 = 0.0001
1445        let filter = LogicalOperator::Filter(FilterOp {
1446            predicate: LogicalExpression::Binary {
1447                left: Box::new(LogicalExpression::Binary {
1448                    left: Box::new(LogicalExpression::Property {
1449                        variable: "n".to_string(),
1450                        property: "city".to_string(),
1451                    }),
1452                    op: BinaryOp::Eq,
1453                    right: Box::new(LogicalExpression::Literal(Value::String("NYC".into()))),
1454                }),
1455                op: BinaryOp::And,
1456                right: Box::new(LogicalExpression::Binary {
1457                    left: Box::new(LogicalExpression::Property {
1458                        variable: "n".to_string(),
1459                        property: "age".to_string(),
1460                    }),
1461                    op: BinaryOp::Eq,
1462                    right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
1463                }),
1464            },
1465            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1466                variable: "n".to_string(),
1467                label: Some("Person".to_string()),
1468                input: None,
1469            })),
1470            pushdown_hint: None,
1471        });
1472
1473        let cardinality = estimator.estimate(&filter);
1474        // AND reduces selectivity (multiply): 0.01 * 0.01 = 0.0001
1475        // 1000 * 0.0001 = 0.1, min is 1.0
1476        assert!(cardinality < 100.0);
1477        assert!(cardinality >= 1.0);
1478    }
1479
1480    #[test]
1481    fn test_filter_or_selectivity() {
1482        let mut estimator = CardinalityEstimator::new();
1483        estimator.add_table_stats("Person", TableStats::new(1000));
1484
1485        // Test OR with two equality predicates
1486        // Each equality has selectivity 0.01
1487        // OR gives: 0.01 + 0.01 - (0.01 * 0.01) = 0.0199
1488        let filter = LogicalOperator::Filter(FilterOp {
1489            predicate: LogicalExpression::Binary {
1490                left: Box::new(LogicalExpression::Binary {
1491                    left: Box::new(LogicalExpression::Property {
1492                        variable: "n".to_string(),
1493                        property: "city".to_string(),
1494                    }),
1495                    op: BinaryOp::Eq,
1496                    right: Box::new(LogicalExpression::Literal(Value::String("NYC".into()))),
1497                }),
1498                op: BinaryOp::Or,
1499                right: Box::new(LogicalExpression::Binary {
1500                    left: Box::new(LogicalExpression::Property {
1501                        variable: "n".to_string(),
1502                        property: "city".to_string(),
1503                    }),
1504                    op: BinaryOp::Eq,
1505                    right: Box::new(LogicalExpression::Literal(Value::String("LA".into()))),
1506                }),
1507            },
1508            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1509                variable: "n".to_string(),
1510                label: Some("Person".to_string()),
1511                input: None,
1512            })),
1513            pushdown_hint: None,
1514        });
1515
1516        let cardinality = estimator.estimate(&filter);
1517        // OR: 0.01 + 0.01 - 0.0001 ≈ 0.0199, so 1000 * 0.0199 ≈ 19.9
1518        assert!(cardinality < 100.0);
1519        assert!(cardinality >= 1.0);
1520    }
1521
1522    #[test]
1523    fn test_filter_literal_true() {
1524        let mut estimator = CardinalityEstimator::new();
1525        estimator.add_table_stats("Person", TableStats::new(1000));
1526
1527        let filter = LogicalOperator::Filter(FilterOp {
1528            predicate: LogicalExpression::Literal(Value::Bool(true)),
1529            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1530                variable: "n".to_string(),
1531                label: Some("Person".to_string()),
1532                input: None,
1533            })),
1534            pushdown_hint: None,
1535        });
1536
1537        let cardinality = estimator.estimate(&filter);
1538        // Literal true has selectivity 1.0
1539        assert!((cardinality - 1000.0).abs() < 0.001);
1540    }
1541
1542    #[test]
1543    fn test_filter_literal_false() {
1544        let mut estimator = CardinalityEstimator::new();
1545        estimator.add_table_stats("Person", TableStats::new(1000));
1546
1547        let filter = LogicalOperator::Filter(FilterOp {
1548            predicate: LogicalExpression::Literal(Value::Bool(false)),
1549            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1550                variable: "n".to_string(),
1551                label: Some("Person".to_string()),
1552                input: None,
1553            })),
1554            pushdown_hint: None,
1555        });
1556
1557        let cardinality = estimator.estimate(&filter);
1558        // Literal false has selectivity 0.0, but min is 1.0
1559        assert!((cardinality - 1.0).abs() < 0.001);
1560    }
1561
1562    #[test]
1563    fn test_unary_not_selectivity() {
1564        let mut estimator = CardinalityEstimator::new();
1565        estimator.add_table_stats("Person", TableStats::new(1000));
1566
1567        let filter = LogicalOperator::Filter(FilterOp {
1568            predicate: LogicalExpression::Unary {
1569                op: UnaryOp::Not,
1570                operand: Box::new(LogicalExpression::Literal(Value::Bool(true))),
1571            },
1572            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1573                variable: "n".to_string(),
1574                label: Some("Person".to_string()),
1575                input: None,
1576            })),
1577            pushdown_hint: None,
1578        });
1579
1580        let cardinality = estimator.estimate(&filter);
1581        // NOT inverts selectivity
1582        assert!(cardinality < 1000.0);
1583    }
1584
1585    #[test]
1586    fn test_unary_is_null_selectivity() {
1587        let mut estimator = CardinalityEstimator::new();
1588        estimator.add_table_stats("Person", TableStats::new(1000));
1589
1590        let filter = LogicalOperator::Filter(FilterOp {
1591            predicate: LogicalExpression::Unary {
1592                op: UnaryOp::IsNull,
1593                operand: Box::new(LogicalExpression::Variable("x".to_string())),
1594            },
1595            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1596                variable: "n".to_string(),
1597                label: Some("Person".to_string()),
1598                input: None,
1599            })),
1600            pushdown_hint: None,
1601        });
1602
1603        let cardinality = estimator.estimate(&filter);
1604        // IS NULL has selectivity 0.05
1605        assert!(cardinality < 100.0);
1606    }
1607
1608    #[test]
1609    fn test_expand_cardinality() {
1610        let mut estimator = CardinalityEstimator::new();
1611        estimator.add_table_stats("Person", TableStats::new(100));
1612
1613        let expand = LogicalOperator::Expand(ExpandOp {
1614            from_variable: "a".to_string(),
1615            to_variable: "b".to_string(),
1616            edge_variable: None,
1617            direction: ExpandDirection::Outgoing,
1618            edge_types: vec![],
1619            min_hops: 1,
1620            max_hops: Some(1),
1621            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1622                variable: "a".to_string(),
1623                label: Some("Person".to_string()),
1624                input: None,
1625            })),
1626            path_alias: None,
1627            path_mode: PathMode::Walk,
1628        });
1629
1630        let cardinality = estimator.estimate(&expand);
1631        // Expand multiplies by fanout (10)
1632        assert!(cardinality > 100.0);
1633    }
1634
1635    #[test]
1636    fn test_expand_with_edge_type_filter() {
1637        let mut estimator = CardinalityEstimator::new();
1638        estimator.add_table_stats("Person", TableStats::new(100));
1639
1640        let expand = LogicalOperator::Expand(ExpandOp {
1641            from_variable: "a".to_string(),
1642            to_variable: "b".to_string(),
1643            edge_variable: None,
1644            direction: ExpandDirection::Outgoing,
1645            edge_types: vec!["KNOWS".to_string()],
1646            min_hops: 1,
1647            max_hops: Some(1),
1648            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1649                variable: "a".to_string(),
1650                label: Some("Person".to_string()),
1651                input: None,
1652            })),
1653            path_alias: None,
1654            path_mode: PathMode::Walk,
1655        });
1656
1657        let cardinality = estimator.estimate(&expand);
1658        // With edge type, fanout is reduced by half
1659        assert!(cardinality > 100.0);
1660    }
1661
1662    #[test]
1663    fn test_expand_variable_length() {
1664        let mut estimator = CardinalityEstimator::new();
1665        estimator.add_table_stats("Person", TableStats::new(100));
1666
1667        let expand = LogicalOperator::Expand(ExpandOp {
1668            from_variable: "a".to_string(),
1669            to_variable: "b".to_string(),
1670            edge_variable: None,
1671            direction: ExpandDirection::Outgoing,
1672            edge_types: vec![],
1673            min_hops: 1,
1674            max_hops: Some(3),
1675            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1676                variable: "a".to_string(),
1677                label: Some("Person".to_string()),
1678                input: None,
1679            })),
1680            path_alias: None,
1681            path_mode: PathMode::Walk,
1682        });
1683
1684        let cardinality = estimator.estimate(&expand);
1685        // Variable length path has much higher cardinality
1686        assert!(cardinality > 500.0);
1687    }
1688
1689    #[test]
1690    fn test_join_cross_product() {
1691        let mut estimator = CardinalityEstimator::new();
1692        estimator.add_table_stats("Person", TableStats::new(100));
1693        estimator.add_table_stats("Company", TableStats::new(50));
1694
1695        let join = LogicalOperator::Join(JoinOp {
1696            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1697                variable: "p".to_string(),
1698                label: Some("Person".to_string()),
1699                input: None,
1700            })),
1701            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1702                variable: "c".to_string(),
1703                label: Some("Company".to_string()),
1704                input: None,
1705            })),
1706            join_type: JoinType::Cross,
1707            conditions: vec![],
1708        });
1709
1710        let cardinality = estimator.estimate(&join);
1711        // Cross join = 100 * 50 = 5000
1712        assert!((cardinality - 5000.0).abs() < 0.001);
1713    }
1714
1715    #[test]
1716    fn test_join_left_outer() {
1717        let mut estimator = CardinalityEstimator::new();
1718        estimator.add_table_stats("Person", TableStats::new(1000));
1719        estimator.add_table_stats("Company", TableStats::new(10));
1720
1721        let join = LogicalOperator::Join(JoinOp {
1722            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1723                variable: "p".to_string(),
1724                label: Some("Person".to_string()),
1725                input: None,
1726            })),
1727            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1728                variable: "c".to_string(),
1729                label: Some("Company".to_string()),
1730                input: None,
1731            })),
1732            join_type: JoinType::Left,
1733            conditions: vec![JoinCondition {
1734                left: LogicalExpression::Variable("p".to_string()),
1735                right: LogicalExpression::Variable("c".to_string()),
1736            }],
1737        });
1738
1739        let cardinality = estimator.estimate(&join);
1740        // Left join returns at least all left rows
1741        assert!(cardinality >= 1000.0);
1742    }
1743
1744    #[test]
1745    fn test_join_semi() {
1746        let mut estimator = CardinalityEstimator::new();
1747        estimator.add_table_stats("Person", TableStats::new(1000));
1748        estimator.add_table_stats("Company", TableStats::new(100));
1749
1750        let join = LogicalOperator::Join(JoinOp {
1751            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1752                variable: "p".to_string(),
1753                label: Some("Person".to_string()),
1754                input: None,
1755            })),
1756            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1757                variable: "c".to_string(),
1758                label: Some("Company".to_string()),
1759                input: None,
1760            })),
1761            join_type: JoinType::Semi,
1762            conditions: vec![],
1763        });
1764
1765        let cardinality = estimator.estimate(&join);
1766        // Semi join returns at most left cardinality
1767        assert!(cardinality <= 1000.0);
1768    }
1769
1770    #[test]
1771    fn test_join_anti() {
1772        let mut estimator = CardinalityEstimator::new();
1773        estimator.add_table_stats("Person", TableStats::new(1000));
1774        estimator.add_table_stats("Company", TableStats::new(100));
1775
1776        let join = LogicalOperator::Join(JoinOp {
1777            left: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1778                variable: "p".to_string(),
1779                label: Some("Person".to_string()),
1780                input: None,
1781            })),
1782            right: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1783                variable: "c".to_string(),
1784                label: Some("Company".to_string()),
1785                input: None,
1786            })),
1787            join_type: JoinType::Anti,
1788            conditions: vec![],
1789        });
1790
1791        let cardinality = estimator.estimate(&join);
1792        // Anti join returns at most left cardinality
1793        assert!(cardinality <= 1000.0);
1794        assert!(cardinality >= 1.0);
1795    }
1796
1797    #[test]
1798    fn test_project_preserves_cardinality() {
1799        let mut estimator = CardinalityEstimator::new();
1800        estimator.add_table_stats("Person", TableStats::new(1000));
1801
1802        let project = LogicalOperator::Project(ProjectOp {
1803            projections: vec![Projection {
1804                expression: LogicalExpression::Variable("n".to_string()),
1805                alias: None,
1806            }],
1807            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1808                variable: "n".to_string(),
1809                label: Some("Person".to_string()),
1810                input: None,
1811            })),
1812            pass_through_input: false,
1813        });
1814
1815        let cardinality = estimator.estimate(&project);
1816        assert!((cardinality - 1000.0).abs() < 0.001);
1817    }
1818
1819    #[test]
1820    fn test_sort_preserves_cardinality() {
1821        let mut estimator = CardinalityEstimator::new();
1822        estimator.add_table_stats("Person", TableStats::new(1000));
1823
1824        let sort = LogicalOperator::Sort(SortOp {
1825            keys: vec![SortKey {
1826                expression: LogicalExpression::Variable("n".to_string()),
1827                order: SortOrder::Ascending,
1828                nulls: None,
1829            }],
1830            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1831                variable: "n".to_string(),
1832                label: Some("Person".to_string()),
1833                input: None,
1834            })),
1835        });
1836
1837        let cardinality = estimator.estimate(&sort);
1838        assert!((cardinality - 1000.0).abs() < 0.001);
1839    }
1840
1841    #[test]
1842    fn test_distinct_reduces_cardinality() {
1843        let mut estimator = CardinalityEstimator::new();
1844        estimator.add_table_stats("Person", TableStats::new(1000));
1845
1846        let distinct = LogicalOperator::Distinct(DistinctOp {
1847            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1848                variable: "n".to_string(),
1849                label: Some("Person".to_string()),
1850                input: None,
1851            })),
1852            columns: None,
1853        });
1854
1855        let cardinality = estimator.estimate(&distinct);
1856        // Distinct assumes 50% unique
1857        assert!((cardinality - 500.0).abs() < 0.001);
1858    }
1859
1860    #[test]
1861    fn test_skip_reduces_cardinality() {
1862        let mut estimator = CardinalityEstimator::new();
1863        estimator.add_table_stats("Person", TableStats::new(1000));
1864
1865        let skip = LogicalOperator::Skip(SkipOp {
1866            count: 100.into(),
1867            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1868                variable: "n".to_string(),
1869                label: Some("Person".to_string()),
1870                input: None,
1871            })),
1872        });
1873
1874        let cardinality = estimator.estimate(&skip);
1875        assert!((cardinality - 900.0).abs() < 0.001);
1876    }
1877
1878    #[test]
1879    fn test_return_preserves_cardinality() {
1880        let mut estimator = CardinalityEstimator::new();
1881        estimator.add_table_stats("Person", TableStats::new(1000));
1882
1883        let ret = LogicalOperator::Return(ReturnOp {
1884            items: vec![ReturnItem {
1885                expression: LogicalExpression::Variable("n".to_string()),
1886                alias: None,
1887            }],
1888            distinct: false,
1889            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1890                variable: "n".to_string(),
1891                label: Some("Person".to_string()),
1892                input: None,
1893            })),
1894        });
1895
1896        let cardinality = estimator.estimate(&ret);
1897        assert!((cardinality - 1000.0).abs() < 0.001);
1898    }
1899
1900    #[test]
1901    fn test_empty_cardinality() {
1902        let estimator = CardinalityEstimator::new();
1903        let cardinality = estimator.estimate(&LogicalOperator::Empty);
1904        assert!((cardinality).abs() < 0.001);
1905    }
1906
1907    #[test]
1908    fn test_table_stats_with_column() {
1909        let stats = TableStats::new(1000).with_column(
1910            "age",
1911            ColumnStats::new(50).with_nulls(10).with_range(0.0, 100.0),
1912        );
1913
1914        assert_eq!(stats.row_count, 1000);
1915        let col = stats.columns.get("age").unwrap();
1916        assert_eq!(col.distinct_count, 50);
1917        assert_eq!(col.null_count, 10);
1918        assert!((col.min_value.unwrap() - 0.0).abs() < 0.001);
1919        assert!((col.max_value.unwrap() - 100.0).abs() < 0.001);
1920    }
1921
1922    #[test]
1923    fn test_estimator_default() {
1924        let estimator = CardinalityEstimator::default();
1925        let scan = LogicalOperator::NodeScan(NodeScanOp {
1926            variable: "n".to_string(),
1927            label: None,
1928            input: None,
1929        });
1930        let cardinality = estimator.estimate(&scan);
1931        assert!((cardinality - 1000.0).abs() < 0.001);
1932    }
1933
1934    #[test]
1935    fn test_set_avg_fanout() {
1936        let mut estimator = CardinalityEstimator::new();
1937        estimator.add_table_stats("Person", TableStats::new(100));
1938        estimator.set_avg_fanout(5.0);
1939
1940        let expand = LogicalOperator::Expand(ExpandOp {
1941            from_variable: "a".to_string(),
1942            to_variable: "b".to_string(),
1943            edge_variable: None,
1944            direction: ExpandDirection::Outgoing,
1945            edge_types: vec![],
1946            min_hops: 1,
1947            max_hops: Some(1),
1948            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1949                variable: "a".to_string(),
1950                label: Some("Person".to_string()),
1951                input: None,
1952            })),
1953            path_alias: None,
1954            path_mode: PathMode::Walk,
1955        });
1956
1957        let cardinality = estimator.estimate(&expand);
1958        // With fanout 5: 100 * 5 = 500
1959        assert!((cardinality - 500.0).abs() < 0.001);
1960    }
1961
1962    #[test]
1963    fn test_multiple_group_by_keys_reduce_cardinality() {
1964        // The current implementation uses a simplified model where more group by keys
1965        // results in greater reduction (dividing by 10^num_keys). This is a simplification
1966        // that works for most cases where group by keys are correlated.
1967        let mut estimator = CardinalityEstimator::new();
1968        estimator.add_table_stats("Person", TableStats::new(10000));
1969
1970        let single_group = LogicalOperator::Aggregate(AggregateOp {
1971            group_by: vec![LogicalExpression::Property {
1972                variable: "n".to_string(),
1973                property: "city".to_string(),
1974            }],
1975            aggregates: vec![],
1976            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1977                variable: "n".to_string(),
1978                label: Some("Person".to_string()),
1979                input: None,
1980            })),
1981            having: None,
1982        });
1983
1984        let multi_group = LogicalOperator::Aggregate(AggregateOp {
1985            group_by: vec![
1986                LogicalExpression::Property {
1987                    variable: "n".to_string(),
1988                    property: "city".to_string(),
1989                },
1990                LogicalExpression::Property {
1991                    variable: "n".to_string(),
1992                    property: "country".to_string(),
1993                },
1994            ],
1995            aggregates: vec![],
1996            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
1997                variable: "n".to_string(),
1998                label: Some("Person".to_string()),
1999                input: None,
2000            })),
2001            having: None,
2002        });
2003
2004        let single_card = estimator.estimate(&single_group);
2005        let multi_card = estimator.estimate(&multi_group);
2006
2007        // Both should reduce cardinality from input
2008        assert!(single_card < 10000.0);
2009        assert!(multi_card < 10000.0);
2010        // Both should be at least 1
2011        assert!(single_card >= 1.0);
2012        assert!(multi_card >= 1.0);
2013    }
2014
2015    // ============= Histogram Tests =============
2016
2017    #[test]
2018    fn test_histogram_build_uniform() {
2019        // Build histogram from uniformly distributed data
2020        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2021        let histogram = EquiDepthHistogram::build(&values, 10);
2022
2023        assert_eq!(histogram.num_buckets(), 10);
2024        assert_eq!(histogram.total_rows(), 100);
2025
2026        // Each bucket should have approximately 10 rows
2027        for bucket in histogram.buckets() {
2028            assert!(bucket.frequency >= 9 && bucket.frequency <= 11);
2029        }
2030    }
2031
2032    #[test]
2033    fn test_histogram_build_skewed() {
2034        // Build histogram from skewed data (many small values, few large)
2035        let mut values: Vec<f64> = (0..80).map(|i| i as f64).collect();
2036        values.extend((0..20).map(|i| 1000.0 + i as f64));
2037        let histogram = EquiDepthHistogram::build(&values, 5);
2038
2039        assert_eq!(histogram.num_buckets(), 5);
2040        assert_eq!(histogram.total_rows(), 100);
2041
2042        // Each bucket should have ~20 rows despite skewed data
2043        for bucket in histogram.buckets() {
2044            assert!(bucket.frequency >= 18 && bucket.frequency <= 22);
2045        }
2046    }
2047
2048    #[test]
2049    fn test_histogram_range_selectivity_full() {
2050        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2051        let histogram = EquiDepthHistogram::build(&values, 10);
2052
2053        // Full range should have selectivity ~1.0
2054        let selectivity = histogram.range_selectivity(None, None);
2055        assert!((selectivity - 1.0).abs() < 0.01);
2056    }
2057
2058    #[test]
2059    fn test_histogram_range_selectivity_half() {
2060        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2061        let histogram = EquiDepthHistogram::build(&values, 10);
2062
2063        // Values >= 50 should be ~50% (half the data)
2064        let selectivity = histogram.range_selectivity(Some(50.0), None);
2065        assert!(selectivity > 0.4 && selectivity < 0.6);
2066    }
2067
2068    #[test]
2069    fn test_histogram_range_selectivity_quarter() {
2070        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2071        let histogram = EquiDepthHistogram::build(&values, 10);
2072
2073        // Values < 25 should be ~25%
2074        let selectivity = histogram.range_selectivity(None, Some(25.0));
2075        assert!(selectivity > 0.2 && selectivity < 0.3);
2076    }
2077
2078    #[test]
2079    fn test_histogram_equality_selectivity() {
2080        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2081        let histogram = EquiDepthHistogram::build(&values, 10);
2082
2083        // Equality on 100 distinct values should be ~1%
2084        let selectivity = histogram.equality_selectivity(50.0);
2085        assert!(selectivity > 0.005 && selectivity < 0.02);
2086    }
2087
2088    #[test]
2089    fn test_histogram_empty() {
2090        let histogram = EquiDepthHistogram::build(&[], 10);
2091
2092        assert_eq!(histogram.num_buckets(), 0);
2093        assert_eq!(histogram.total_rows(), 0);
2094
2095        // Default selectivity for empty histogram
2096        let selectivity = histogram.range_selectivity(Some(0.0), Some(100.0));
2097        assert!((selectivity - 0.33).abs() < 0.01);
2098    }
2099
2100    #[test]
2101    fn test_histogram_bucket_overlap() {
2102        let bucket = HistogramBucket::new(10.0, 20.0, 100, 10);
2103
2104        // Full overlap
2105        assert!((bucket.overlap_fraction(Some(10.0), Some(20.0)) - 1.0).abs() < 0.01);
2106
2107        // Half overlap (lower half)
2108        assert!((bucket.overlap_fraction(Some(10.0), Some(15.0)) - 0.5).abs() < 0.01);
2109
2110        // Half overlap (upper half)
2111        assert!((bucket.overlap_fraction(Some(15.0), Some(20.0)) - 0.5).abs() < 0.01);
2112
2113        // No overlap (below)
2114        assert!((bucket.overlap_fraction(Some(0.0), Some(5.0))).abs() < 0.01);
2115
2116        // No overlap (above)
2117        assert!((bucket.overlap_fraction(Some(25.0), Some(30.0))).abs() < 0.01);
2118    }
2119
2120    #[test]
2121    fn test_column_stats_from_values() {
2122        let values = vec![10.0, 20.0, 30.0, 40.0, 50.0, 20.0, 30.0, 40.0];
2123        let stats = ColumnStats::from_values(values, 4);
2124
2125        assert_eq!(stats.distinct_count, 5); // 10, 20, 30, 40, 50
2126        assert!(stats.min_value.is_some());
2127        assert!((stats.min_value.unwrap() - 10.0).abs() < 0.01);
2128        assert!(stats.max_value.is_some());
2129        assert!((stats.max_value.unwrap() - 50.0).abs() < 0.01);
2130        assert!(stats.histogram.is_some());
2131    }
2132
2133    #[test]
2134    fn test_column_stats_with_histogram_builder() {
2135        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2136        let histogram = EquiDepthHistogram::build(&values, 10);
2137
2138        let stats = ColumnStats::new(100)
2139            .with_range(0.0, 99.0)
2140            .with_histogram(histogram);
2141
2142        assert!(stats.histogram.is_some());
2143        assert_eq!(stats.histogram.as_ref().unwrap().num_buckets(), 10);
2144    }
2145
2146    #[test]
2147    fn test_filter_with_histogram_stats() {
2148        let mut estimator = CardinalityEstimator::new();
2149
2150        // Create stats with histogram for age column
2151        let age_values: Vec<f64> = (18..80).map(|i| i as f64).collect();
2152        let histogram = EquiDepthHistogram::build(&age_values, 10);
2153        let age_stats = ColumnStats::new(62)
2154            .with_range(18.0, 79.0)
2155            .with_histogram(histogram);
2156
2157        estimator.add_table_stats(
2158            "Person",
2159            TableStats::new(1000).with_column("age", age_stats),
2160        );
2161
2162        // Filter: age > 50
2163        // Age range is 18-79, so >50 is about (79-50)/(79-18) = 29/61 ≈ 47.5%
2164        let filter = LogicalOperator::Filter(FilterOp {
2165            predicate: LogicalExpression::Binary {
2166                left: Box::new(LogicalExpression::Property {
2167                    variable: "n".to_string(),
2168                    property: "age".to_string(),
2169                }),
2170                op: BinaryOp::Gt,
2171                right: Box::new(LogicalExpression::Literal(Value::Int64(50))),
2172            },
2173            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2174                variable: "n".to_string(),
2175                label: Some("Person".to_string()),
2176                input: None,
2177            })),
2178            pushdown_hint: None,
2179        });
2180
2181        let cardinality = estimator.estimate(&filter);
2182
2183        // With histogram, should get more accurate estimate than default 0.33
2184        // Expected: ~47.5% of 1000 = ~475
2185        assert!(cardinality > 300.0 && cardinality < 600.0);
2186    }
2187
2188    #[test]
2189    fn test_filter_equality_with_histogram() {
2190        let mut estimator = CardinalityEstimator::new();
2191
2192        // Create stats with histogram
2193        let values: Vec<f64> = (0..100).map(|i| i as f64).collect();
2194        let histogram = EquiDepthHistogram::build(&values, 10);
2195        let stats = ColumnStats::new(100)
2196            .with_range(0.0, 99.0)
2197            .with_histogram(histogram);
2198
2199        estimator.add_table_stats("Data", TableStats::new(1000).with_column("value", stats));
2200
2201        // Filter: value = 50
2202        let filter = LogicalOperator::Filter(FilterOp {
2203            predicate: LogicalExpression::Binary {
2204                left: Box::new(LogicalExpression::Property {
2205                    variable: "d".to_string(),
2206                    property: "value".to_string(),
2207                }),
2208                op: BinaryOp::Eq,
2209                right: Box::new(LogicalExpression::Literal(Value::Int64(50))),
2210            },
2211            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2212                variable: "d".to_string(),
2213                label: Some("Data".to_string()),
2214                input: None,
2215            })),
2216            pushdown_hint: None,
2217        });
2218
2219        let cardinality = estimator.estimate(&filter);
2220
2221        // With 100 distinct values, selectivity should be ~1%
2222        // 1000 * 0.01 = 10
2223        assert!((1.0..50.0).contains(&cardinality));
2224    }
2225
2226    #[test]
2227    fn test_histogram_min_max() {
2228        let values: Vec<f64> = vec![5.0, 10.0, 15.0, 20.0, 25.0];
2229        let histogram = EquiDepthHistogram::build(&values, 2);
2230
2231        assert_eq!(histogram.min_value(), Some(5.0));
2232        // Max is the upper bound of the last bucket
2233        assert!(histogram.max_value().is_some());
2234    }
2235
2236    // ==================== SelectivityConfig Tests ====================
2237
2238    #[test]
2239    fn test_selectivity_config_defaults() {
2240        let config = SelectivityConfig::new();
2241        assert!((config.default - 0.1).abs() < f64::EPSILON);
2242        assert!((config.equality - 0.01).abs() < f64::EPSILON);
2243        assert!((config.inequality - 0.99).abs() < f64::EPSILON);
2244        assert!((config.range - 0.33).abs() < f64::EPSILON);
2245        assert!((config.string_ops - 0.1).abs() < f64::EPSILON);
2246        assert!((config.membership - 0.1).abs() < f64::EPSILON);
2247        assert!((config.is_null - 0.05).abs() < f64::EPSILON);
2248        assert!((config.is_not_null - 0.95).abs() < f64::EPSILON);
2249        assert!((config.distinct_fraction - 0.5).abs() < f64::EPSILON);
2250    }
2251
2252    #[test]
2253    fn test_custom_selectivity_config() {
2254        let config = SelectivityConfig {
2255            equality: 0.05,
2256            range: 0.25,
2257            ..SelectivityConfig::new()
2258        };
2259        let estimator = CardinalityEstimator::with_selectivity_config(config);
2260        assert!((estimator.selectivity_config().equality - 0.05).abs() < f64::EPSILON);
2261        assert!((estimator.selectivity_config().range - 0.25).abs() < f64::EPSILON);
2262    }
2263
2264    #[test]
2265    fn test_custom_selectivity_affects_estimation() {
2266        // Default: equality = 0.01 → 1000 * 0.01 = 10
2267        let mut default_est = CardinalityEstimator::new();
2268        default_est.add_table_stats("Person", TableStats::new(1000));
2269
2270        let filter = LogicalOperator::Filter(FilterOp {
2271            predicate: LogicalExpression::Binary {
2272                left: Box::new(LogicalExpression::Property {
2273                    variable: "n".to_string(),
2274                    property: "name".to_string(),
2275                }),
2276                op: BinaryOp::Eq,
2277                right: Box::new(LogicalExpression::Literal(Value::String("Alix".into()))),
2278            },
2279            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2280                variable: "n".to_string(),
2281                label: Some("Person".to_string()),
2282                input: None,
2283            })),
2284            pushdown_hint: None,
2285        });
2286
2287        let default_card = default_est.estimate(&filter);
2288
2289        // Custom: equality = 0.2 → 1000 * 0.2 = 200
2290        let config = SelectivityConfig {
2291            equality: 0.2,
2292            ..SelectivityConfig::new()
2293        };
2294        let mut custom_est = CardinalityEstimator::with_selectivity_config(config);
2295        custom_est.add_table_stats("Person", TableStats::new(1000));
2296
2297        let custom_card = custom_est.estimate(&filter);
2298
2299        assert!(custom_card > default_card);
2300        assert!((custom_card - 200.0).abs() < 1.0);
2301    }
2302
2303    #[test]
2304    fn test_custom_range_selectivity() {
2305        let config = SelectivityConfig {
2306            range: 0.5,
2307            ..SelectivityConfig::new()
2308        };
2309        let mut estimator = CardinalityEstimator::with_selectivity_config(config);
2310        estimator.add_table_stats("Person", TableStats::new(1000));
2311
2312        let filter = LogicalOperator::Filter(FilterOp {
2313            predicate: LogicalExpression::Binary {
2314                left: Box::new(LogicalExpression::Property {
2315                    variable: "n".to_string(),
2316                    property: "age".to_string(),
2317                }),
2318                op: BinaryOp::Gt,
2319                right: Box::new(LogicalExpression::Literal(Value::Int64(30))),
2320            },
2321            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2322                variable: "n".to_string(),
2323                label: Some("Person".to_string()),
2324                input: None,
2325            })),
2326            pushdown_hint: None,
2327        });
2328
2329        let cardinality = estimator.estimate(&filter);
2330        // 1000 * 0.5 = 500
2331        assert!((cardinality - 500.0).abs() < 1.0);
2332    }
2333
2334    #[test]
2335    fn test_custom_distinct_fraction() {
2336        let config = SelectivityConfig {
2337            distinct_fraction: 0.8,
2338            ..SelectivityConfig::new()
2339        };
2340        let mut estimator = CardinalityEstimator::with_selectivity_config(config);
2341        estimator.add_table_stats("Person", TableStats::new(1000));
2342
2343        let distinct = LogicalOperator::Distinct(DistinctOp {
2344            input: Box::new(LogicalOperator::NodeScan(NodeScanOp {
2345                variable: "n".to_string(),
2346                label: Some("Person".to_string()),
2347                input: None,
2348            })),
2349            columns: None,
2350        });
2351
2352        let cardinality = estimator.estimate(&distinct);
2353        // 1000 * 0.8 = 800
2354        assert!((cardinality - 800.0).abs() < 1.0);
2355    }
2356
2357    // ==================== EstimationLog Tests ====================
2358
2359    #[test]
2360    fn test_estimation_log_basic() {
2361        let mut log = EstimationLog::new(10.0);
2362        log.record("NodeScan(Person)", 1000.0, 1200.0);
2363        log.record("Filter(age > 30)", 100.0, 90.0);
2364
2365        assert_eq!(log.entries().len(), 2);
2366        assert!(!log.should_replan()); // 1.2x and 0.9x are within 10x threshold
2367    }
2368
2369    #[test]
2370    fn test_estimation_log_triggers_replan() {
2371        let mut log = EstimationLog::new(10.0);
2372        log.record("NodeScan(Person)", 100.0, 5000.0); // 50x underestimate
2373
2374        assert!(log.should_replan());
2375    }
2376
2377    #[test]
2378    fn test_estimation_log_overestimate_triggers_replan() {
2379        let mut log = EstimationLog::new(5.0);
2380        log.record("Filter", 1000.0, 100.0); // 10x overestimate → ratio = 0.1
2381
2382        assert!(log.should_replan()); // 0.1 < 1/5 = 0.2
2383    }
2384
2385    #[test]
2386    fn test_estimation_entry_error_ratio() {
2387        let entry = EstimationEntry {
2388            operator: "test".into(),
2389            estimated: 100.0,
2390            actual: 200.0,
2391        };
2392        assert!((entry.error_ratio() - 2.0).abs() < f64::EPSILON);
2393
2394        let perfect = EstimationEntry {
2395            operator: "test".into(),
2396            estimated: 100.0,
2397            actual: 100.0,
2398        };
2399        assert!((perfect.error_ratio() - 1.0).abs() < f64::EPSILON);
2400
2401        let zero_est = EstimationEntry {
2402            operator: "test".into(),
2403            estimated: 0.0,
2404            actual: 0.0,
2405        };
2406        assert!((zero_est.error_ratio() - 1.0).abs() < f64::EPSILON);
2407    }
2408
2409    #[test]
2410    fn test_estimation_log_max_error_ratio() {
2411        let mut log = EstimationLog::new(10.0);
2412        log.record("A", 100.0, 300.0); // 3x
2413        log.record("B", 100.0, 50.0); // 2x (normalized: 1/0.5 = 2)
2414        log.record("C", 100.0, 100.0); // 1x
2415
2416        assert!((log.max_error_ratio() - 3.0).abs() < f64::EPSILON);
2417    }
2418
2419    #[test]
2420    fn test_estimation_log_clear() {
2421        let mut log = EstimationLog::new(10.0);
2422        log.record("A", 100.0, 100.0);
2423        assert_eq!(log.entries().len(), 1);
2424
2425        log.clear();
2426        assert!(log.entries().is_empty());
2427        assert!(!log.should_replan());
2428    }
2429
2430    #[test]
2431    fn test_create_estimation_log() {
2432        let log = CardinalityEstimator::create_estimation_log();
2433        assert!(log.entries().is_empty());
2434        assert!(!log.should_replan());
2435    }
2436
2437    #[test]
2438    fn test_equality_selectivity_empty_histogram() {
2439        let hist = EquiDepthHistogram::new(vec![]);
2440        // Empty histogram returns fixed fallback
2441        assert_eq!(hist.equality_selectivity(5.0), 0.01);
2442    }
2443
2444    #[test]
2445    fn test_equality_selectivity_value_in_bucket() {
2446        let values: Vec<f64> = (1..=10).map(|i| i as f64).collect();
2447        let hist = EquiDepthHistogram::build(&values, 2);
2448        let sel = hist.equality_selectivity(3.0);
2449        assert!(sel > 0.0);
2450        assert!(sel <= 1.0);
2451    }
2452
2453    #[test]
2454    fn test_equality_selectivity_value_outside_all_buckets() {
2455        let values: Vec<f64> = (1..=10).map(|i| i as f64).collect();
2456        let hist = EquiDepthHistogram::build(&values, 2);
2457        // Value far outside range
2458        let sel = hist.equality_selectivity(9999.0);
2459        assert_eq!(sel, 0.001);
2460    }
2461
2462    #[test]
2463    fn test_histogram_min_max_empty() {
2464        let hist = EquiDepthHistogram::new(vec![]);
2465        assert_eq!(hist.min_value(), None);
2466        assert_eq!(hist.max_value(), None);
2467    }
2468
2469    #[test]
2470    fn test_histogram_min_max_single_bucket() {
2471        let hist = EquiDepthHistogram::new(vec![HistogramBucket::new(1.0, 10.0, 5, 5)]);
2472        assert_eq!(hist.min_value(), Some(1.0));
2473        assert_eq!(hist.max_value(), Some(10.0));
2474    }
2475
2476    #[test]
2477    fn test_histogram_min_max_multi_bucket() {
2478        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0, 10.0, 20.0];
2479        let hist = EquiDepthHistogram::build(&values, 3);
2480        let min = hist.min_value().unwrap();
2481        let max = hist.max_value().unwrap();
2482        assert!((min - 1.0).abs() < 1e-9, "min should be 1.0, got {min}");
2483        assert!(max >= 20.0, "max should be >= last value, got {max}");
2484    }
2485
2486    #[test]
2487    fn test_count_and_conjuncts_single_expression() {
2488        use crate::query::plan::LogicalExpression;
2489        let expr = LogicalExpression::Literal(Value::Bool(true));
2490        assert_eq!(count_and_conjuncts(&expr), 1);
2491    }
2492
2493    #[test]
2494    fn test_count_and_conjuncts_flat_and() {
2495        use crate::query::plan::{BinaryOp, LogicalExpression};
2496        let expr = LogicalExpression::Binary {
2497            left: Box::new(LogicalExpression::Literal(Value::Bool(true))),
2498            op: BinaryOp::And,
2499            right: Box::new(LogicalExpression::Literal(Value::Bool(false))),
2500        };
2501        assert_eq!(count_and_conjuncts(&expr), 2);
2502    }
2503
2504    #[test]
2505    fn test_count_and_conjuncts_nested_and() {
2506        use crate::query::plan::{BinaryOp, LogicalExpression};
2507        let ab = LogicalExpression::Binary {
2508            left: Box::new(LogicalExpression::Literal(Value::Bool(true))),
2509            op: BinaryOp::And,
2510            right: Box::new(LogicalExpression::Literal(Value::Bool(false))),
2511        };
2512        let cd = LogicalExpression::Binary {
2513            left: Box::new(LogicalExpression::Literal(Value::Int64(1))),
2514            op: BinaryOp::And,
2515            right: Box::new(LogicalExpression::Literal(Value::Int64(2))),
2516        };
2517        let expr = LogicalExpression::Binary {
2518            left: Box::new(ab),
2519            op: BinaryOp::And,
2520            right: Box::new(cd),
2521        };
2522        assert_eq!(count_and_conjuncts(&expr), 4);
2523    }
2524
2525    #[test]
2526    fn test_count_distinct_empty() {
2527        assert_eq!(count_distinct(&[]), 0);
2528    }
2529
2530    #[test]
2531    fn test_count_distinct_all_unique() {
2532        assert_eq!(count_distinct(&[1.0, 2.0, 3.0, 4.0]), 4);
2533    }
2534
2535    #[test]
2536    fn test_count_distinct_with_duplicates() {
2537        assert_eq!(count_distinct(&[1.0, 1.0, 2.0, 2.0, 3.0]), 3);
2538    }
2539
2540    #[test]
2541    fn test_count_distinct_all_same() {
2542        assert_eq!(count_distinct(&[5.0, 5.0, 5.0]), 1);
2543    }
2544
2545    #[test]
2546    fn test_count_distinct_single_value() {
2547        assert_eq!(count_distinct(&[42.0]), 1);
2548    }
2549}