// sochdb_query/optimizer_integration.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Query Optimizer Integration (Task 10)
19//!
20//! Wires the QueryOptimizer into the SOCH-QL execution path for cost-based planning:
21//! - Converts SOCH-QL WHERE clauses to optimizer predicates
22//! - Uses cardinality hints from HyperLogLog sketches
23//! - Selects optimal index based on selectivity estimates
24//!
25//! ## Integration Flow
26//!
27//! ```text
28//! SOCH-QL Query
29//!     │
30//!     ▼
31//! ┌─────────────────┐
32//! │ Parse & Validate│
33//! └────────┬────────┘
34//!          │
35//!          ▼
36//! ┌─────────────────┐
37//! │ Extract         │
38//! │ Predicates      │ ← WHERE clause → QueryPredicate[]
39//! └────────┬────────┘
40//!          │
41//!          ▼
42//! ┌─────────────────┐
43//! │ QueryOptimizer  │
44//! │ .plan_query()   │ ← Cost-based index selection
45//! └────────┬────────┘
46//!          │
47//!          ▼
48//! ┌─────────────────┐
49//! │ Execute Plan    │
50//! └─────────────────┘
51//! ```
52//!
53//! ## Task 11: HyperLogLog Integration for Real-Time Cardinality
54//!
55//! The `CardinalityTracker` maintains HyperLogLog sketches per column for
56//! real-time cardinality estimation with <1% standard error.
57//!
58//! ```text
59//! On INSERT:
60//!   tracker.observe("column_name", value)  // O(1) HLL update
61//!
62//! On SELECT planning:
63//!   cardinality = tracker.estimate("column_name")  // O(1) estimate
64//!
65//! Math:
66//!   Standard error = 1.04 / sqrt(m) where m = 2^precision
67//!   For precision=14: SE = 0.81%, memory = 16KB per column
68//! ```
69
70use crate::query_optimizer::{
71    CardinalitySource, CostModel, IndexSelection, QueryOperation, QueryOptimizer,
72    QueryPlan as OptimizerPlan, QueryPredicate, TraversalDirection,
73};
74#[cfg(test)]
75use crate::soch_ql::{ComparisonOp, WhereClause};
76use crate::soch_ql::{SelectQuery, SochResult, SochValue};
77use parking_lot::RwLock;
78use std::collections::HashMap;
79use std::sync::Arc;
80use sochdb_core::{Catalog, Result};
81use sochdb_storage::sketches::HyperLogLog;
82
83// ============================================================================
84// Storage Backend Trait - Allows wiring optimizer to actual storage
85// ============================================================================
86
/// Storage backend trait for executing optimized query plans
///
/// This trait abstracts the storage layer so the optimizer can execute
/// plans without knowing the concrete storage implementation.
/// Implementations must be `Send + Sync` so one backend can serve
/// concurrent executors.
pub trait StorageBackend: Send + Sync {
    /// Execute a full table scan
    ///
    /// `predicate` is an opaque filter string the backend may push down;
    /// `None` returns all rows. Each row is a column-name → value map.
    fn table_scan(
        &self,
        table: &str,
        columns: &[String],
        predicate: Option<&str>,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Execute a primary key lookup
    ///
    /// Returns `Ok(None)` when no row matches `key`.
    fn primary_key_lookup(
        &self,
        table: &str,
        key: &SochValue,
    ) -> Result<Option<HashMap<String, SochValue>>>;

    /// Execute a secondary index seek
    ///
    /// `index` is the index name (e.g. "project_idx"); returns all rows
    /// whose indexed column equals `key`.
    fn secondary_index_seek(
        &self,
        table: &str,
        index: &str,
        key: &SochValue,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Execute a time range scan
    ///
    /// Bounds are microseconds since the UNIX epoch; inclusive/exclusive
    /// semantics are implementation-defined — TODO confirm with backends.
    fn time_index_scan(
        &self,
        table: &str,
        start_us: u64,
        end_us: u64,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Execute a vector similarity search
    ///
    /// Returns up to `k` `(score, row)` pairs for the `query` embedding.
    fn vector_search(
        &self,
        table: &str,
        query: &[f32],
        k: usize,
    ) -> Result<Vec<(f32, HashMap<String, SochValue>)>>;

    /// Get table row count (for optimization)
    fn row_count(&self, table: &str) -> usize;
}
134
135// ============================================================================
136// Task 11: CardinalityTracker - Real-Time HyperLogLog Integration
137// ============================================================================
138
/// Real-time cardinality tracker using HyperLogLog sketches
///
/// Maintains per-column HLL sketches for sub-microsecond cardinality queries
/// with <1% standard error.
///
/// ## Math
///
/// ```text
/// Standard error = 1.04 / sqrt(2^precision)
///
/// Precision=14: SE = 0.81%, memory = 16KB per column (dense)
/// Sparse mode: memory = O(cardinality) for low-cardinality columns
/// ```
///
/// ## Thread Safety
///
/// Uses fine-grained locking per table for concurrent updates across
/// multiple ingestion threads.
pub struct CardinalityTracker {
    /// HLL precision (4-18, default 14 for 0.81% error); fixed at creation.
    precision: u8,
    /// Per-table column cardinality trackers, keyed by table name.
    /// NOTE(review): this is one lock around the whole map, so writers to
    /// different tables still contend on it — the "per table" granularity
    /// in the doc above is aspirational; confirm before relying on it.
    tables: RwLock<HashMap<String, TableCardinalityTracker>>,
    /// Drift threshold for cache invalidation (0.20 = 20% relative change).
    drift_threshold: f64,
}
165
/// Per-table cardinality tracking
struct TableCardinalityTracker {
    /// HLL sketch per column, keyed by column name.
    columns: HashMap<String, HyperLogLog>,
    /// Row count estimate; maintained externally via `increment_row_count`,
    /// not by `observe`.
    row_count: usize,
    /// Last update timestamp, microseconds since the UNIX epoch.
    last_update_us: u64,
}
175
/// Cardinality estimate with confidence
#[derive(Debug, Clone)]
pub struct CardinalityEstimate {
    /// Estimated distinct count
    pub distinct: usize,
    /// Standard error percentage, as reported by the underlying HLL sketch
    pub error_pct: f64,
    /// Source of estimate (always `HyperLogLog` when produced by `estimate`)
    pub source: CardinalitySource,
    /// Is this a fresh estimate? (table updated within the last 60 seconds)
    pub is_fresh: bool,
}
188
189impl CardinalityTracker {
190    /// Create a new tracker with default precision (14)
191    pub fn new() -> Self {
192        Self::with_precision(14)
193    }
194
195    /// Create with custom HLL precision
196    ///
197    /// Precision affects accuracy vs memory:
198    /// - 10: SE=3.25%, 1KB/column
199    /// - 12: SE=1.63%, 4KB/column  
200    /// - 14: SE=0.81%, 16KB/column (default)
201    /// - 16: SE=0.41%, 64KB/column
202    pub fn with_precision(precision: u8) -> Self {
203        assert!((4..=18).contains(&precision), "Precision must be 4-18");
204        Self {
205            precision,
206            tables: RwLock::new(HashMap::new()),
207            drift_threshold: 0.20, // 20% change triggers replan
208        }
209    }
210
211    /// Set drift threshold for cache invalidation
212    pub fn set_drift_threshold(&mut self, threshold: f64) {
213        self.drift_threshold = threshold;
214    }
215
216    /// Observe a value for a column (call on INSERT/UPDATE)
217    ///
218    /// O(1) operation - safe to call on every write.
219    pub fn observe<T: std::hash::Hash>(&self, table: &str, column: &str, value: &T) {
220        let mut tables = self.tables.write();
221        let tracker = tables
222            .entry(table.to_string())
223            .or_insert_with(|| TableCardinalityTracker {
224                columns: HashMap::new(),
225                row_count: 0,
226                last_update_us: Self::now(),
227            });
228
229        let hll = tracker
230            .columns
231            .entry(column.to_string())
232            .or_insert_with(|| HyperLogLog::new(self.precision));
233
234        hll.add(value);
235        tracker.last_update_us = Self::now();
236    }
237
238    /// Observe multiple values in batch (more efficient for bulk inserts)
239    pub fn observe_batch<T: std::hash::Hash>(
240        &self,
241        table: &str,
242        column: &str,
243        values: impl Iterator<Item = T>,
244    ) {
245        let mut tables = self.tables.write();
246        let tracker = tables
247            .entry(table.to_string())
248            .or_insert_with(|| TableCardinalityTracker {
249                columns: HashMap::new(),
250                row_count: 0,
251                last_update_us: Self::now(),
252            });
253
254        let hll = tracker
255            .columns
256            .entry(column.to_string())
257            .or_insert_with(|| HyperLogLog::new(self.precision));
258
259        for value in values {
260            hll.add(&value);
261        }
262        tracker.last_update_us = Self::now();
263    }
264
265    /// Increment row count for a table
266    pub fn increment_row_count(&self, table: &str, delta: usize) {
267        let mut tables = self.tables.write();
268        if let Some(tracker) = tables.get_mut(table) {
269            tracker.row_count = tracker.row_count.saturating_add(delta);
270        }
271    }
272
273    /// Estimate cardinality for a column
274    ///
275    /// O(1) operation - returns sub-microsecond.
276    pub fn estimate(&self, table: &str, column: &str) -> Option<CardinalityEstimate> {
277        let tables = self.tables.read();
278        let tracker = tables.get(table)?;
279        let hll = tracker.columns.get(column)?;
280
281        let distinct = hll.cardinality() as usize;
282        let error_pct = hll.standard_error();
283        let freshness_us = Self::now().saturating_sub(tracker.last_update_us);
284
285        Some(CardinalityEstimate {
286            distinct,
287            error_pct,
288            source: CardinalitySource::HyperLogLog,
289            // Consider fresh if updated within last minute
290            is_fresh: freshness_us < 60_000_000,
291        })
292    }
293
294    /// Get all column cardinalities for a table
295    pub fn get_table_cardinalities(&self, table: &str) -> HashMap<String, usize> {
296        let tables = self.tables.read();
297        tables
298            .get(table)
299            .map(|tracker| {
300                tracker
301                    .columns
302                    .iter()
303                    .map(|(col, hll)| (col.clone(), hll.cardinality() as usize))
304                    .collect()
305            })
306            .unwrap_or_default()
307    }
308
309    /// Get row count estimate for a table
310    pub fn get_row_count(&self, table: &str) -> usize {
311        self.tables
312            .read()
313            .get(table)
314            .map(|t| t.row_count)
315            .unwrap_or(0)
316    }
317
318    /// Check if cardinality has drifted beyond threshold
319    ///
320    /// Returns true if any column's cardinality has changed by more than
321    /// `drift_threshold` (default 20%).
322    pub fn has_cardinality_drift(
323        &self,
324        table: &str,
325        cached_cardinalities: &HashMap<String, usize>,
326    ) -> bool {
327        let tables = self.tables.read();
328        let tracker = match tables.get(table) {
329            Some(t) => t,
330            None => return true, // Table not tracked, consider stale
331        };
332
333        for (column, &cached) in cached_cardinalities {
334            if let Some(hll) = tracker.columns.get(column) {
335                let current = hll.cardinality();
336                if cached == 0 {
337                    if current > 0 {
338                        return true; // New data in empty column
339                    }
340                } else {
341                    let drift = (current as f64 - cached as f64).abs() / cached as f64;
342                    if drift > self.drift_threshold {
343                        return true;
344                    }
345                }
346            }
347        }
348
349        false
350    }
351
352    /// Merge HLL from another tracker (for distributed scenarios)
353    pub fn merge(&self, table: &str, column: &str, other_hll: &HyperLogLog) {
354        let mut tables = self.tables.write();
355        if let Some(tracker) = tables.get_mut(table)
356            && let Some(hll) = tracker.columns.get_mut(column)
357        {
358            hll.merge(other_hll);
359            tracker.last_update_us = Self::now();
360        }
361    }
362
363    /// Clear all tracking data for a table
364    pub fn clear_table(&self, table: &str) {
365        self.tables.write().remove(table);
366    }
367
368    /// Get memory usage statistics
369    pub fn memory_usage(&self) -> CardinalityTrackerStats {
370        let tables = self.tables.read();
371        let mut total_columns = 0;
372        let mut total_bytes = 0;
373
374        for tracker in tables.values() {
375            for hll in tracker.columns.values() {
376                total_columns += 1;
377                total_bytes += hll.memory_usage();
378            }
379        }
380
381        CardinalityTrackerStats {
382            table_count: tables.len(),
383            column_count: total_columns,
384            memory_bytes: total_bytes,
385            precision: self.precision,
386            standard_error_pct: 1.04 / (1usize << self.precision) as f64 * 100.0,
387        }
388    }
389
390    fn now() -> u64 {
391        std::time::SystemTime::now()
392            .duration_since(std::time::UNIX_EPOCH)
393            .unwrap()
394            .as_micros() as u64
395    }
396}
397
398impl Default for CardinalityTracker {
399    fn default() -> Self {
400        Self::new()
401    }
402}
403
/// Statistics for cardinality tracker
#[derive(Debug, Clone)]
pub struct CardinalityTrackerStats {
    /// Number of tables tracked
    pub table_count: usize,
    /// Total columns tracked across all tables
    pub column_count: usize,
    /// Total memory usage in bytes, summed over all HLL sketches
    pub memory_bytes: usize,
    /// HLL precision shared by every sketch in the tracker
    pub precision: u8,
    /// Theoretical standard error percentage derived from the precision
    pub standard_error_pct: f64,
}
418
/// Optimized query executor with cost-based planning
///
/// Owns the optimizer, a static table-statistics cache, and a shareable
/// real-time cardinality tracker that ingestion paths can update concurrently.
pub struct OptimizedExecutor {
    /// Query optimizer instance
    optimizer: QueryOptimizer,
    /// Table statistics cache (static snapshot; synced from the tracker
    /// via `refresh_stats_from_tracker`)
    table_stats: HashMap<String, TableStats>,
    /// Real-time cardinality tracker (Task 11), shared via `Arc` with writers
    cardinality_tracker: Arc<CardinalityTracker>,
    /// Embedding provider for vector search (optional; `execute` falls back
    /// to placeholder vectors when absent)
    embedding_provider: Option<Arc<dyn crate::embedding_provider::EmbeddingProvider>>,
}
430
/// Statistics for a table
#[derive(Debug, Clone, Default)]
pub struct TableStats {
    /// Estimated row count
    pub row_count: usize,
    /// Column cardinalities (distinct values), keyed by column name
    pub column_cardinalities: HashMap<String, usize>,
    /// Has time index
    pub has_time_index: bool,
    /// Has vector index
    pub has_vector_index: bool,
    /// Primary key column, if one exists
    pub primary_key: Option<String>,
}
445
446impl OptimizedExecutor {
447    /// Create a new optimized executor
448    pub fn new() -> Self {
449        Self {
450            optimizer: QueryOptimizer::new(),
451            table_stats: HashMap::new(),
452            cardinality_tracker: Arc::new(CardinalityTracker::new()),
453            embedding_provider: None,
454        }
455    }
456
457    /// Create with custom cost model
458    pub fn with_cost_model(cost_model: CostModel) -> Self {
459        Self {
460            optimizer: QueryOptimizer::with_cost_model(cost_model),
461            table_stats: HashMap::new(),
462            cardinality_tracker: Arc::new(CardinalityTracker::new()),
463            embedding_provider: None,
464        }
465    }
466
467    /// Create with shared cardinality tracker (for integration with ingestion)
468    pub fn with_cardinality_tracker(tracker: Arc<CardinalityTracker>) -> Self {
469        Self {
470            optimizer: QueryOptimizer::new(),
471            table_stats: HashMap::new(),
472            cardinality_tracker: tracker,
473            embedding_provider: None,
474        }
475    }
476
477    /// Set embedding provider for vector search
478    pub fn set_embedding_provider(
479        &mut self,
480        provider: Arc<dyn crate::embedding_provider::EmbeddingProvider>,
481    ) {
482        self.embedding_provider = Some(provider);
483    }
484
485    /// Create with embedding provider
486    pub fn with_embedding_provider(
487        mut self,
488        provider: Arc<dyn crate::embedding_provider::EmbeddingProvider>,
489    ) -> Self {
490        self.embedding_provider = Some(provider);
491        self
492    }
493
494    /// Get the cardinality tracker for external updates (e.g., on INSERT)
495    pub fn cardinality_tracker(&self) -> Arc<CardinalityTracker> {
496        Arc::clone(&self.cardinality_tracker)
497    }
498
499    /// Update table statistics (call periodically or on schema change)
500    pub fn update_table_stats(&mut self, table: &str, stats: TableStats) {
501        let row_count = stats.row_count;
502        self.table_stats.insert(table.to_string(), stats);
503        self.optimizer
504            .update_total_edges(row_count, CardinalitySource::Exact);
505    }
506
507    /// Refresh stats from cardinality tracker
508    ///
509    /// Syncs real-time HLL estimates to the static stats cache.
510    pub fn refresh_stats_from_tracker(&mut self, table: &str) {
511        let cardinalities = self.cardinality_tracker.get_table_cardinalities(table);
512        let row_count = self.cardinality_tracker.get_row_count(table);
513
514        if let Some(stats) = self.table_stats.get_mut(table) {
515            stats.column_cardinalities = cardinalities;
516            if row_count > 0 {
517                stats.row_count = row_count;
518            }
519        } else {
520            self.table_stats.insert(
521                table.to_string(),
522                TableStats {
523                    row_count,
524                    column_cardinalities: cardinalities,
525                    ..Default::default()
526                },
527            );
528        }
529    }
530
531    /// Update column cardinality from HyperLogLog
532    pub fn update_cardinality_hint(
533        &mut self,
534        table: &str,
535        column: &str,
536        cardinality: usize,
537        _source: CardinalitySource,
538    ) {
539        if let Some(stats) = self.table_stats.get_mut(table) {
540            stats
541                .column_cardinalities
542                .insert(column.to_string(), cardinality);
543        }
544    }
545
546    /// Plan a SELECT query with cost-based optimization
547    pub fn plan_select(
548        &self,
549        select: &SelectQuery,
550        _catalog: &Catalog,
551    ) -> Result<OptimizedQueryPlan> {
552        // Extract predicates from WHERE clause
553        let predicates = self.extract_predicates(select)?;
554
555        // Get optimizer plan
556        let optimizer_plan = self.optimizer.plan_query(&predicates, select.limit);
557
558        // Convert to execution plan
559        let exec_plan = self.build_execution_plan(select, &optimizer_plan)?;
560
561        Ok(OptimizedQueryPlan {
562            table: select.table.clone(),
563            columns: select.columns.clone(),
564            execution_plan: exec_plan,
565            optimizer_plan,
566            predicates,
567        })
568    }
569
570    /// Extract predicates from SELECT query
571    fn extract_predicates(&self, select: &SelectQuery) -> Result<Vec<QueryPredicate>> {
572        let mut predicates = Vec::new();
573
574        if let Some(where_clause) = &select.where_clause {
575            for condition in &where_clause.conditions {
576                if let Some(pred) = self.condition_to_predicate(&condition.column, &condition.value)
577                {
578                    predicates.push(pred);
579                }
580            }
581        }
582
583        Ok(predicates)
584    }
585
586    /// Convert a condition to optimizer predicate
587    fn condition_to_predicate(&self, column: &str, value: &SochValue) -> Option<QueryPredicate> {
588        // Detect special column patterns
589        match column {
590            // Time-based columns
591            "timestamp" | "created_at" | "updated_at" | "time" => {
592                if let SochValue::UInt(ts) = value {
593                    // Assume range of 1 hour by default
594                    let hour_us = 60 * 60 * 1_000_000u64;
595                    return Some(QueryPredicate::TimeRange(*ts, ts + hour_us));
596                }
597            }
598            // Project ID
599            "project_id" | "project" => {
600                if let SochValue::UInt(id) = value {
601                    return Some(QueryPredicate::Project(*id as u16));
602                }
603            }
604            // Tenant ID
605            "tenant_id" | "tenant" => {
606                if let SochValue::UInt(id) = value {
607                    return Some(QueryPredicate::Tenant(*id as u32));
608                }
609            }
610            // Span type
611            "span_type" | "type" => {
612                if let SochValue::Text(s) = value {
613                    return Some(QueryPredicate::SpanType(s.clone()));
614                }
615            }
616            _ => {}
617        }
618
619        None
620    }
621
    /// Build execution plan from optimizer plan
    ///
    /// Lowers the optimizer's `IndexSelection` into a linear step list:
    /// one access-path step first, then optional Filter, Project, Sort,
    /// and Limit steps, in that fixed order.
    fn build_execution_plan(
        &self,
        select: &SelectQuery,
        opt_plan: &OptimizerPlan,
    ) -> Result<ExecutionPlan> {
        let mut steps = Vec::new();

        // Add scan/index step based on index selection
        match &opt_plan.index_selection {
            IndexSelection::LsmScan | IndexSelection::FullScan => {
                steps.push(ExecutionStep::TableScan {
                    table: select.table.clone(),
                });
            }
            IndexSelection::TimeIndex => {
                // Extract time range from operations
                // NOTE(review): if the first operation is not an LsmRangeScan,
                // no access step is emitted at all — confirm this is intended.
                if let Some(QueryOperation::LsmRangeScan { start_us, end_us }) =
                    opt_plan.operations.first()
                {
                    steps.push(ExecutionStep::TimeIndexScan {
                        table: select.table.clone(),
                        start_us: *start_us,
                        end_us: *end_us,
                    });
                }
            }
            IndexSelection::VectorIndex => {
                if let Some(QueryOperation::VectorSearch { k }) = opt_plan.operations.first() {
                    // Extract query text from SIMILAR TO predicate in WHERE clause
                    let query_text = self.extract_vector_query_text(select);
                    steps.push(ExecutionStep::VectorSearch {
                        table: select.table.clone(),
                        k: *k,
                        query_text,
                    });
                }
            }
            IndexSelection::CausalIndex => {
                if let Some(QueryOperation::GraphTraversal {
                    direction,
                    max_depth,
                }) = opt_plan.operations.first()
                {
                    steps.push(ExecutionStep::GraphTraversal {
                        table: select.table.clone(),
                        direction: *direction,
                        max_depth: *max_depth,
                    });
                }
            }
            IndexSelection::ProjectIndex => {
                // Project lookups route through a fixed secondary index name.
                steps.push(ExecutionStep::SecondaryIndexSeek {
                    table: select.table.clone(),
                    index: "project_idx".to_string(),
                });
            }
            IndexSelection::PrimaryKey => {
                steps.push(ExecutionStep::PrimaryKeyLookup {
                    table: select.table.clone(),
                });
            }
            IndexSelection::Secondary(idx) => {
                steps.push(ExecutionStep::SecondaryIndexSeek {
                    table: select.table.clone(),
                    index: idx.clone(),
                });
            }
            IndexSelection::MultiIndex(indexes) => {
                // For multi-index, use intersection
                // NOTE(review): index names are Debug-formatted here — this
                // relies on Debug output matching real index names; confirm.
                steps.push(ExecutionStep::MultiIndexIntersect {
                    table: select.table.clone(),
                    indexes: indexes.iter().map(|idx| format!("{:?}", idx)).collect(),
                });
            }
        }

        // Add filter step if WHERE clause exists
        // (predicate is the Debug rendering of the clause; the backend is
        // expected to interpret or ignore it)
        if select.where_clause.is_some() {
            steps.push(ExecutionStep::Filter {
                predicate: format!("{:?}", select.where_clause),
            });
        }

        // Add projection (skipped for `SELECT *`)
        if !select.columns.is_empty() && select.columns[0] != "*" {
            steps.push(ExecutionStep::Project {
                columns: select.columns.clone(),
            });
        }

        // Add sort if ORDER BY exists
        if let Some(order_by) = &select.order_by {
            steps.push(ExecutionStep::Sort {
                column: order_by.column.clone(),
                ascending: order_by.direction == crate::soch_ql::SortDirection::Asc,
            });
        }

        // Add limit if specified
        if let Some(limit) = select.limit {
            steps.push(ExecutionStep::Limit { count: limit });
        }

        Ok(ExecutionPlan {
            steps,
            estimated_cost: opt_plan.cost.total_cost,
            estimated_rows: opt_plan.cost.records_returned,
        })
    }
732
    /// Execute an optimized query plan against a storage backend
    ///
    /// This is the key method that wires the optimizer output to actual storage.
    /// It interprets each ExecutionStep in order and calls the appropriate
    /// storage method; `rows` carries the intermediate result between steps.
    pub fn execute<S: StorageBackend>(
        &self,
        plan: &OptimizedQueryPlan,
        storage: &S,
    ) -> Result<SochResult> {
        let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
        // May be narrowed by a later Project step.
        let mut columns_to_return = plan.columns.clone();

        // Execute each step in order
        for step in &plan.execution_plan.steps {
            match step {
                ExecutionStep::TableScan { table } => {
                    // Full table scan - use storage backend.
                    // Look ahead for a Filter step so the predicate can be
                    // pushed down into the scan.
                    let predicate = plan.execution_plan.steps.iter().find_map(|s| match s {
                        ExecutionStep::Filter { predicate } => Some(predicate.as_str()),
                        _ => None,
                    });
                    rows = storage.table_scan(table, &columns_to_return, predicate)?;
                }
                ExecutionStep::PrimaryKeyLookup { table } => {
                    // Extract key from predicates; if no key or no match,
                    // `rows` is left unchanged (empty on the first step).
                    if let Some(key) = self.extract_primary_key_from_predicates(&plan.predicates)
                        && let Some(row) = storage.primary_key_lookup(table, &key)?
                    {
                        rows = vec![row];
                    }
                }
                ExecutionStep::SecondaryIndexSeek { table, index } => {
                    // Extract key from predicates for the indexed column
                    if let Some(key) =
                        self.extract_index_key_from_predicates(&plan.predicates, index)
                    {
                        rows = storage.secondary_index_seek(table, index, &key)?;
                    }
                }
                ExecutionStep::TimeIndexScan {
                    table,
                    start_us,
                    end_us,
                } => {
                    rows = storage.time_index_scan(table, *start_us, *end_us)?;
                }
                ExecutionStep::VectorSearch {
                    table,
                    k,
                    query_text,
                } => {
                    // Generate real embedding from query text using embedding provider;
                    // fall back to a zero vector when text or provider is missing.
                    let query_embedding = match (query_text, &self.embedding_provider) {
                        (Some(text), Some(provider)) => {
                            // Use embedding provider to generate real embedding
                            provider.embed(text).unwrap_or_else(|e| {
                                tracing::warn!(
                                    "Failed to generate embedding for '{}': {}. Using fallback.",
                                    text,
                                    e
                                );
                                // Fallback to zeros matching provider dimension
                                vec![0.0f32; provider.dimension()]
                            })
                        }
                        (Some(_text), None) => {
                            // No embedding provider configured - use placeholder
                            tracing::warn!(
                                "Vector search requested but no embedding provider configured"
                            );
                            vec![0.0f32; 128] // Fallback dimension
                        }
                        (None, _) => {
                            // No query text provided - use placeholder
                            tracing::warn!("Vector search without query text, using placeholder");
                            vec![0.0f32; 128] // Fallback dimension
                        }
                    };
                    // Similarity scores are discarded; only rows are kept.
                    let results = storage.vector_search(table, &query_embedding, *k)?;
                    rows = results.into_iter().map(|(_, row)| row).collect();
                }
                ExecutionStep::GraphTraversal {
                    table,
                    direction: _,
                    max_depth: _,
                } => {
                    // Graph traversal - fallback to table scan for now
                    rows = storage.table_scan(table, &columns_to_return, None)?;
                }
                ExecutionStep::MultiIndexIntersect { table, indexes } => {
                    // Execute each index and intersect results
                    let mut result_sets: Vec<Vec<HashMap<String, SochValue>>> = Vec::new();
                    for index in indexes {
                        if let Some(key) =
                            self.extract_index_key_from_predicates(&plan.predicates, index)
                        {
                            result_sets.push(storage.secondary_index_seek(table, index, &key)?);
                        }
                    }
                    // Intersect by checking row IDs (simplified - assumes "id" column)
                    if !result_sets.is_empty() {
                        rows = self.intersect_result_sets(result_sets);
                    }
                }
                ExecutionStep::Filter { predicate: _ } => {
                    // Filter already applied in scan, but can post-filter here if needed
                    // For now, filtering is pushed to storage
                }
                ExecutionStep::Project { columns } => {
                    columns_to_return = columns.clone();
                    // Project columns from rows; missing columns are dropped here
                    // and surface as Null in the final result assembly below.
                    rows = rows
                        .into_iter()
                        .map(|row| {
                            columns
                                .iter()
                                .filter_map(|c| row.get(c).map(|v| (c.clone(), v.clone())))
                                .collect()
                        })
                        .collect();
                }
                ExecutionStep::Sort { column, ascending } => {
                    // In-memory sort on a single column using the value
                    // comparison helper.
                    rows.sort_by(|a, b| {
                        let va = a.get(column);
                        let vb = b.get(column);
                        let cmp = Self::compare_values(va, vb);
                        if *ascending { cmp } else { cmp.reverse() }
                    });
                }
                ExecutionStep::Limit { count } => {
                    rows.truncate(*count);
                }
            }
        }

        // Convert to SochResult: one Vec<SochValue> per row, ordered by
        // `columns_to_return`, with Null for columns absent from a row.
        let result_rows: Vec<Vec<SochValue>> = rows
            .iter()
            .map(|row| {
                columns_to_return
                    .iter()
                    .map(|c| row.get(c).cloned().unwrap_or(SochValue::Null))
                    .collect()
            })
            .collect();

        Ok(SochResult {
            table: plan.table.clone(),
            columns: columns_to_return,
            rows: result_rows,
        })
    }
885
886    /// Extract primary key value from predicates
887    fn extract_primary_key_from_predicates(
888        &self,
889        predicates: &[QueryPredicate],
890    ) -> Option<SochValue> {
891        for pred in predicates {
892            // Look for ID predicates
893            if let QueryPredicate::Project(id) = pred {
894                return Some(SochValue::UInt(*id as u64));
895            }
896        }
897        None
898    }
899
900    /// Extract index key from predicates for a specific index
901    fn extract_index_key_from_predicates(
902        &self,
903        predicates: &[QueryPredicate],
904        _index: &str,
905    ) -> Option<SochValue> {
906        for pred in predicates {
907            match pred {
908                QueryPredicate::Tenant(id) => return Some(SochValue::UInt(*id as u64)),
909                QueryPredicate::Project(id) => return Some(SochValue::UInt(*id as u64)),
910                QueryPredicate::SpanType(s) => return Some(SochValue::Text(s.clone())),
911                _ => {}
912            }
913        }
914        None
915    }
916
917    /// Extract query text from SIMILAR TO predicate for vector search
918    ///
919    /// Looks for conditions like: `content SIMILAR TO 'search query text'`
920    /// Returns the query text to be embedded for similarity search.
921    fn extract_vector_query_text(&self, select: &SelectQuery) -> Option<String> {
922        use crate::soch_ql::ComparisonOp;
923        
924        if let Some(where_clause) = &select.where_clause {
925            for condition in &where_clause.conditions {
926                if matches!(condition.operator, ComparisonOp::SimilarTo) {
927                    // Extract the text value from the condition
928                    if let SochValue::Text(query_text) = &condition.value {
929                        return Some(query_text.clone());
930                    }
931                }
932            }
933        }
934        None
935    }
936
937    /// Intersect multiple result sets by common IDs
938    fn intersect_result_sets(
939        &self,
940        sets: Vec<Vec<HashMap<String, SochValue>>>,
941    ) -> Vec<HashMap<String, SochValue>> {
942        if sets.is_empty() {
943            return Vec::new();
944        }
945        if sets.len() == 1 {
946            return sets.into_iter().next().unwrap();
947        }
948
949        // Use first set as base, filter by presence in other sets
950        let mut base = sets.into_iter().next().unwrap();
951        // Simplified intersection - in production, use row IDs
952        base.truncate(base.len().min(100)); // Cap for safety
953        base
954    }
955
956    /// Compare SochValue for sorting
957    fn compare_values(a: Option<&SochValue>, b: Option<&SochValue>) -> std::cmp::Ordering {
958        match (a, b) {
959            (None, None) => std::cmp::Ordering::Equal,
960            (None, Some(_)) => std::cmp::Ordering::Less,
961            (Some(_), None) => std::cmp::Ordering::Greater,
962            (Some(va), Some(vb)) => match (va, vb) {
963                (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
964                (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
965                (SochValue::Float(a), SochValue::Float(b)) => {
966                    a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
967                }
968                (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
969                (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
970                _ => std::cmp::Ordering::Equal,
971            },
972        }
973    }
974
975    /// Explain a query plan (for debugging)
976    pub fn explain(&self, select: &SelectQuery, catalog: &Catalog) -> Result<String> {
977        let plan = self.plan_select(select, catalog)?;
978
979        let mut output = String::new();
980        output.push_str(&format!(
981            "QUERY PLAN (estimated cost: {:.2}, rows: {})\n",
982            plan.optimizer_plan.cost.total_cost, plan.optimizer_plan.cost.records_returned
983        ));
984        output.push_str(&format!(
985            "Index Selection: {:?}\n",
986            plan.optimizer_plan.index_selection
987        ));
988        output.push_str("Execution Steps:\n");
989
990        for (i, step) in plan.execution_plan.steps.iter().enumerate() {
991            output.push_str(&format!("  {}. {:?}\n", i + 1, step));
992        }
993
994        output.push_str("\nCost Breakdown:\n");
995        for (op, cost) in &plan.optimizer_plan.cost.breakdown {
996            output.push_str(&format!("  {:?}: {:.2}\n", op, cost));
997        }
998
999        Ok(output)
1000    }
1001}
1002
/// Delegates to [`OptimizedExecutor::new`], so `OptimizedExecutor::default()`
/// and `OptimizedExecutor::new()` are interchangeable.
impl Default for OptimizedExecutor {
    fn default() -> Self {
        Self::new()
    }
}
1008
/// Optimized query plan with cost estimates
///
/// Produced by `OptimizedExecutor::plan_select`; bundles the executable
/// step list with the optimizer's cost-model output so EXPLAIN/debugging
/// can show both.
#[derive(Debug)]
pub struct OptimizedQueryPlan {
    /// Target table
    pub table: String,
    /// Columns to return
    pub columns: Vec<String>,
    /// Execution plan (ordered steps actually run by the executor)
    pub execution_plan: ExecutionPlan,
    /// Optimizer's plan (for debugging)
    pub optimizer_plan: OptimizerPlan,
    /// Extracted predicates (used later for index-key extraction)
    pub predicates: Vec<QueryPredicate>,
}
1023
/// Execution plan with ordered steps
///
/// Steps are executed sequentially; later steps (`Filter`, `Project`,
/// `Sort`, `Limit`) transform the row set produced by earlier scan/seek
/// steps. Cost/row fields are optimizer estimates, not actuals.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    /// Ordered execution steps
    pub steps: Vec<ExecutionStep>,
    /// Estimated cost
    pub estimated_cost: f64,
    /// Estimated output rows
    pub estimated_rows: usize,
}
1034
/// Single execution step
///
/// Scan/seek variants produce rows; the trailing variants (`Filter`,
/// `Project`, `Sort`, `Limit`) transform the current row set in order.
#[derive(Debug, Clone)]
pub enum ExecutionStep {
    /// Full table scan
    TableScan { table: String },
    /// Primary key lookup
    PrimaryKeyLookup { table: String },
    /// Time-based index scan
    TimeIndexScan {
        table: String,
        /// Range start; microsecond timestamp (`_us` suffix).
        start_us: u64,
        /// Range end; microsecond timestamp.
        end_us: u64,
    },
    /// Vector similarity search
    VectorSearch {
        table: String,
        /// Number of nearest neighbors to return.
        k: usize,
        /// Query text to embed for similarity search.
        /// If None, falls back to placeholder (for backwards compat).
        query_text: Option<String>,
    },
    /// Graph traversal
    GraphTraversal {
        table: String,
        /// Edge direction to follow (currently ignored by the executor,
        /// which falls back to a table scan).
        direction: TraversalDirection,
        /// Maximum traversal depth (currently ignored — see above).
        max_depth: usize,
    },
    /// Secondary index seek
    SecondaryIndexSeek { table: String, index: String },
    /// Multi-index intersection
    MultiIndexIntersect { table: String, indexes: Vec<String> },
    /// Filter rows (currently pushed down to storage; kept for plan shape)
    Filter { predicate: String },
    /// Project columns
    Project { columns: Vec<String> },
    /// Sort results
    Sort { column: String, ascending: bool },
    /// Limit output
    Limit { count: usize },
}
1075
/// Query plan cache for repeated queries
///
/// ## Task 5 Enhancement: Frequency-Gated Caching
///
/// Plans are only cached after being used 3+ times to avoid
/// polluting the cache with one-off queries.
pub struct PlanCache {
    /// Cached plans by query hash
    cache: HashMap<u64, CachedPlan>,
    /// Frequency tracker for uncached queries (not yet past the threshold)
    frequency: HashMap<u64, FrequencyEntry>,
    /// Maximum cache entries (LRU eviction beyond this)
    max_entries: usize,
    /// Cache threshold (number of uses before caching)
    cache_threshold: usize,
    /// Statistics
    stats: AdaptiveCacheStats,
}
1094
/// Cached query plan
#[derive(Debug, Clone)]
struct CachedPlan {
    /// The execution plan
    plan: ExecutionPlan,
    /// Cache hit count
    hits: usize,
    /// Last used timestamp (microseconds; used for LRU eviction)
    last_used: u64,
    /// Time saved by caching (cumulative planning time avoided,
    /// incremented by a fixed 1ms estimate per hit)
    time_saved_us: u64,
}
1107
/// Frequency tracking entry
///
/// Holds sighting counts for a query that has not yet crossed the caching
/// threshold; entries idle for over a minute are purged.
#[derive(Debug, Clone)]
struct FrequencyEntry {
    /// Number of times query was seen
    count: usize,
    /// First seen timestamp
    #[allow(dead_code)]
    first_seen: u64,
    /// Most recent timestamp (microseconds; drives expiry)
    last_seen: u64,
    /// The plan (saved but not cached until threshold)
    pending_plan: Option<ExecutionPlan>,
}
1121
/// Enhanced cache statistics
#[derive(Debug, Clone, Default)]
pub struct AdaptiveCacheStats {
    /// Number of cached entries
    pub entries: usize,
    /// Total cache hits
    pub total_hits: usize,
    /// Total cache misses
    pub total_misses: usize,
    /// Queries blocked from cache (below threshold)
    pub frequency_blocked: usize,
    /// Queries promoted to cache
    pub promotions: usize,
    /// Estimated time saved (microseconds)
    /// NOTE(review): not written by any `PlanCache` method visible in this
    /// file (only the per-entry `CachedPlan::time_saved_us` is) — confirm
    /// whether aggregation happens elsewhere or is still pending.
    pub time_saved_us: u64,
}
1138
1139impl PlanCache {
1140    /// Create a new plan cache with default threshold (3)
1141    pub fn new(max_entries: usize) -> Self {
1142        Self::with_threshold(max_entries, 3)
1143    }
1144
1145    /// Create with custom frequency threshold
1146    pub fn with_threshold(max_entries: usize, cache_threshold: usize) -> Self {
1147        Self {
1148            cache: HashMap::new(),
1149            frequency: HashMap::new(),
1150            max_entries,
1151            cache_threshold,
1152            stats: AdaptiveCacheStats::default(),
1153        }
1154    }
1155
1156    /// Hash a query for caching
1157    pub fn hash_query(query: &str) -> u64 {
1158        use std::hash::{Hash, Hasher};
1159        let mut hasher = std::collections::hash_map::DefaultHasher::new();
1160        query.hash(&mut hasher);
1161        hasher.finish()
1162    }
1163
1164    /// Get cached plan with frequency tracking
1165    ///
1166    /// Returns cached plan if available, or None.
1167    /// If query is seen frequently but not cached, attempts promotion.
1168    pub fn get(&mut self, query_hash: u64) -> Option<&ExecutionPlan> {
1169        // Check cache first
1170        if self.cache.contains_key(&query_hash) {
1171            if let Some(cached) = self.cache.get_mut(&query_hash) {
1172                cached.hits += 1;
1173                cached.last_used = Self::now();
1174                cached.time_saved_us += 1000; // Assume 1ms planning time saved
1175                self.stats.total_hits += 1;
1176            }
1177            return self.cache.get(&query_hash).map(|c| &c.plan);
1178        }
1179
1180        self.stats.total_misses += 1;
1181
1182        // Check frequency tracker and promote if needed
1183        let should_promote = if let Some(freq) = self.frequency.get_mut(&query_hash) {
1184            freq.count += 1;
1185            freq.last_seen = Self::now();
1186            freq.count >= self.cache_threshold && freq.pending_plan.is_some()
1187        } else {
1188            false
1189        };
1190
1191        if should_promote
1192            && let Some(freq) = self.frequency.remove(&query_hash)
1193            && let Some(plan) = freq.pending_plan
1194        {
1195            self.insert_to_cache(query_hash, plan);
1196            self.stats.promotions += 1;
1197            return self.cache.get(&query_hash).map(|c| &c.plan);
1198        }
1199
1200        None
1201    }
1202
1203    /// Register a plan for potential caching
1204    ///
1205    /// Does not immediately cache - waits for frequency threshold.
1206    pub fn put(&mut self, query_hash: u64, plan: ExecutionPlan) {
1207        let now = Self::now();
1208
1209        // Check if already tracking frequency
1210        if let Some(freq) = self.frequency.get_mut(&query_hash) {
1211            freq.count += 1;
1212            freq.last_seen = now;
1213            freq.pending_plan = Some(plan.clone());
1214
1215            // Promote if threshold reached
1216            if freq.count >= self.cache_threshold {
1217                self.promote_to_cache(query_hash, plan);
1218                self.stats.promotions += 1;
1219            } else {
1220                self.stats.frequency_blocked += 1;
1221            }
1222        } else {
1223            // First time seeing this query
1224            self.frequency.insert(
1225                query_hash,
1226                FrequencyEntry {
1227                    count: 1,
1228                    first_seen: now,
1229                    last_seen: now,
1230                    pending_plan: Some(plan),
1231                },
1232            );
1233            self.stats.frequency_blocked += 1;
1234        }
1235
1236        // Clean up old frequency entries
1237        self.cleanup_frequency_tracker();
1238    }
1239
1240    /// Force-cache a plan (bypasses frequency check)
1241    pub fn force_put(&mut self, query_hash: u64, plan: ExecutionPlan) {
1242        self.insert_to_cache(query_hash, plan);
1243        self.frequency.remove(&query_hash);
1244    }
1245
1246    /// Insert plan directly into cache (internal helper)
1247    fn insert_to_cache(&mut self, query_hash: u64, plan: ExecutionPlan) {
1248        // Evict if at capacity
1249        if self.cache.len() >= self.max_entries {
1250            self.evict_lru();
1251        }
1252
1253        self.cache.insert(
1254            query_hash,
1255            CachedPlan {
1256                plan,
1257                hits: 0,
1258                last_used: Self::now(),
1259                time_saved_us: 0,
1260            },
1261        );
1262
1263        self.stats.entries = self.cache.len();
1264    }
1265
1266    /// Promote plan from frequency tracker to cache
1267    fn promote_to_cache(&mut self, query_hash: u64, plan: ExecutionPlan) {
1268        self.insert_to_cache(query_hash, plan);
1269        self.frequency.remove(&query_hash);
1270    }
1271
1272    /// Evict least recently used entry
1273    fn evict_lru(&mut self) {
1274        if let Some((&key, _)) = self.cache.iter().min_by_key(|(_, v)| v.last_used) {
1275            self.cache.remove(&key);
1276        }
1277    }
1278
1279    /// Cleanup old frequency tracker entries
1280    fn cleanup_frequency_tracker(&mut self) {
1281        let now = Self::now();
1282        let max_age = 60 * 1_000_000; // 1 minute
1283
1284        self.frequency.retain(|_, v| now - v.last_seen < max_age);
1285    }
1286
1287    /// Clear the cache
1288    pub fn clear(&mut self) {
1289        self.cache.clear();
1290        self.frequency.clear();
1291        self.stats = AdaptiveCacheStats::default();
1292    }
1293
1294    /// Get cache statistics (legacy compatibility)
1295    pub fn stats(&self) -> CacheStats {
1296        CacheStats {
1297            entries: self.cache.len(),
1298            total_hits: self.stats.total_hits,
1299        }
1300    }
1301
1302    /// Get enhanced statistics
1303    pub fn adaptive_stats(&self) -> &AdaptiveCacheStats {
1304        &self.stats
1305    }
1306
1307    fn now() -> u64 {
1308        std::time::SystemTime::now()
1309            .duration_since(std::time::UNIX_EPOCH)
1310            .unwrap()
1311            .as_micros() as u64
1312    }
1313}
1314
/// Cache statistics
///
/// Legacy, reduced view of [`AdaptiveCacheStats`] returned by
/// `PlanCache::stats` for backwards compatibility.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of cached entries
    pub entries: usize,
    /// Total cache hits
    pub total_hits: usize,
}
1323
#[cfg(test)]
mod tests {
    //! Unit tests for predicate extraction, cost-based planning, the
    //! frequency-gated plan cache, and the Task 11 cardinality tracker.

    use super::*;
    use crate::soch_ql::{Condition, LogicalOp, OrderBy, SortDirection};

    #[test]
    fn test_predicate_extraction() {
        let executor = OptimizedExecutor::new();

        // A single `timestamp >= <epoch-us>` condition should map to
        // exactly one TimeRange predicate.
        let select = SelectQuery {
            table: "events".to_string(),
            columns: vec!["*".to_string()],
            where_clause: Some(WhereClause {
                conditions: vec![Condition {
                    column: "timestamp".to_string(),
                    operator: ComparisonOp::Ge,
                    value: SochValue::UInt(1700000000000000),
                }],
                operator: LogicalOp::And,
            }),
            order_by: None,
            limit: None,
            offset: None,
        };

        let predicates = executor.extract_predicates(&select).unwrap();
        assert_eq!(predicates.len(), 1);
        assert!(matches!(predicates[0], QueryPredicate::TimeRange(_, _)));
    }

    #[test]
    fn test_plan_with_time_index() {
        let mut executor = OptimizedExecutor::new();
        // Seed stats so the optimizer knows a time index is available.
        executor.update_table_stats(
            "events",
            TableStats {
                row_count: 1_000_000,
                has_time_index: true,
                ..Default::default()
            },
        );

        let select = SelectQuery {
            table: "events".to_string(),
            columns: vec!["id".to_string(), "data".to_string()],
            where_clause: Some(WhereClause {
                conditions: vec![Condition {
                    column: "timestamp".to_string(),
                    operator: ComparisonOp::Ge,
                    value: SochValue::UInt(1700000000000000),
                }],
                operator: LogicalOp::And,
            }),
            order_by: None,
            limit: Some(100),
            offset: None,
        };

        let catalog = Catalog::new("test");
        let plan = executor.plan_select(&select, &catalog).unwrap();

        // Only asserts the cost model produced a positive estimate, not the
        // specific index choice.
        assert!(plan.execution_plan.estimated_cost > 0.0);
    }

    #[test]
    fn test_plan_cache() {
        let mut cache = PlanCache::new(100);

        let plan = ExecutionPlan {
            steps: vec![ExecutionStep::TableScan {
                table: "test".to_string(),
            }],
            estimated_cost: 100.0,
            estimated_rows: 1000,
        };

        let query = "SELECT * FROM test";
        let hash = PlanCache::hash_query(query);

        // Miss (not tracked yet)
        assert!(cache.get(hash).is_none());

        // Put (with frequency-gated caching, needs 3 uses before cache)
        // Use 1: put sets count=1
        cache.put(hash, plan.clone());
        // Use 2: get increments to count=2 (still < threshold)
        assert!(cache.get(hash).is_none());

        // Use 3: put increments to count=3, triggers promotion
        cache.put(hash, plan);
        // Now at threshold (3 uses), should be cached
        assert!(cache.get(hash).is_some());

        // One cached entry; exactly one of the gets above was a hit.
        let stats = cache.stats();
        assert_eq!(stats.entries, 1);
        assert_eq!(stats.total_hits, 1);
    }

    #[test]
    fn test_force_cache() {
        let mut cache = PlanCache::new(100);

        let plan = ExecutionPlan {
            steps: vec![ExecutionStep::TableScan {
                table: "test".to_string(),
            }],
            estimated_cost: 100.0,
            estimated_rows: 1000,
        };

        let hash = PlanCache::hash_query("SELECT * FROM test2");

        // Force put bypasses frequency threshold
        cache.force_put(hash, plan);
        assert!(cache.get(hash).is_some());
    }

    #[test]
    fn test_explain() {
        let executor = OptimizedExecutor::new();

        let select = SelectQuery {
            table: "users".to_string(),
            columns: vec!["id".to_string(), "name".to_string()],
            where_clause: None,
            order_by: Some(OrderBy {
                column: "id".to_string(),
                direction: SortDirection::Asc,
            }),
            limit: Some(10),
            offset: None,
        };

        let catalog = Catalog::new("test");
        let explain = executor.explain(&select, &catalog).unwrap();

        // Smoke-check the rendered headings only.
        assert!(explain.contains("QUERY PLAN"));
        assert!(explain.contains("Execution Steps"));
    }

    // ========================================================================
    // Task 11: CardinalityTracker Tests
    // ========================================================================

    #[test]
    fn test_cardinality_tracker_basic() {
        let tracker = CardinalityTracker::new();

        // Add 1000 unique values
        for i in 0u64..1000 {
            tracker.observe("events", "user_id", &i);
        }

        let estimate = tracker.estimate("events", "user_id").unwrap();

        // Should be within 5% of actual (HLL with precision=14 has ~0.81% SE)
        let error = (estimate.distinct as f64 - 1000.0).abs() / 1000.0;
        assert!(
            error < 0.05,
            "Cardinality error {}% exceeds 5%",
            error * 100.0
        );
        assert!(estimate.error_pct < 1.0, "Standard error should be < 1%");
    }

    #[test]
    fn test_cardinality_tracker_multiple_columns() {
        let tracker = CardinalityTracker::new();

        // High cardinality column
        for i in 0u64..10_000 {
            tracker.observe("events", "span_id", &i);
        }

        // Low cardinality column
        for i in 0u64..1000 {
            tracker.observe("events", "project_id", &(i % 10));
        }

        let span_estimate = tracker.estimate("events", "span_id").unwrap();
        let project_estimate = tracker.estimate("events", "project_id").unwrap();

        // High cardinality should be ~10000
        let span_error = (span_estimate.distinct as f64 - 10000.0).abs() / 10000.0;
        assert!(span_error < 0.05, "span_id error {}%", span_error * 100.0);

        // Low cardinality should be ~10
        // (looser 20% bound: HLL relative error is larger at tiny counts)
        let project_error = (project_estimate.distinct as f64 - 10.0).abs() / 10.0;
        assert!(
            project_error < 0.20,
            "project_id error {}%",
            project_error * 100.0
        );
    }

    #[test]
    fn test_cardinality_drift_detection() {
        let tracker = CardinalityTracker::new();

        // Initial state: 100 distinct values
        for i in 0u64..100 {
            tracker.observe("events", "user_id", &i);
        }

        let mut cached = std::collections::HashMap::new();
        cached.insert("user_id".to_string(), 100usize);

        // No drift yet
        assert!(!tracker.has_cardinality_drift("events", &cached));

        // Add many more distinct values (50% more = drift)
        for i in 100u64..200 {
            tracker.observe("events", "user_id", &i);
        }

        // Now ~100% increase, should exceed 20% threshold
        assert!(tracker.has_cardinality_drift("events", &cached));
    }

    #[test]
    fn test_cardinality_tracker_memory() {
        let tracker = CardinalityTracker::new();

        // Add data for multiple tables/columns
        for i in 0u64..1000 {
            tracker.observe("table1", "col1", &i);
            tracker.observe("table1", "col2", &i);
            tracker.observe("table2", "col1", &i);
        }

        // 2 tables, 3 (table, column) sketches total.
        let stats = tracker.memory_usage();
        assert_eq!(stats.table_count, 2);
        assert_eq!(stats.column_count, 3);
        assert!(stats.memory_bytes > 0);
        assert!(stats.standard_error_pct < 1.0);
    }

    #[test]
    fn test_executor_with_cardinality_tracker() {
        let tracker = Arc::new(CardinalityTracker::new());

        // Simulate ingestion updating tracker
        for i in 0u64..500 {
            tracker.observe("events", "user_id", &i);
            tracker.observe("events", "span_id", &(i * 2));
        }
        tracker.increment_row_count("events", 500);

        // Create executor with shared tracker
        let mut executor = OptimizedExecutor::with_cardinality_tracker(Arc::clone(&tracker));

        // Refresh stats from tracker
        executor.refresh_stats_from_tracker("events");

        // Verify stats were synced
        let stats = &executor.table_stats.get("events").unwrap();
        assert_eq!(stats.row_count, 500);
        assert!(stats.column_cardinalities.contains_key("user_id"));
        assert!(stats.column_cardinalities.contains_key("span_id"));

        // Cardinality estimates should be reasonable
        let user_card = stats.column_cardinalities.get("user_id").unwrap();
        let error = (*user_card as f64 - 500.0).abs() / 500.0;
        assert!(error < 0.05, "user_id cardinality error {}%", error * 100.0);
    }
}