sochdb_query/
optimizer_integration.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Query Optimizer Integration (Task 10)
16//!
17//! Wires the QueryOptimizer into the SOCH-QL execution path for cost-based planning:
18//! - Converts SOCH-QL WHERE clauses to optimizer predicates
19//! - Uses cardinality hints from HyperLogLog sketches
20//! - Selects optimal index based on selectivity estimates
21//!
22//! ## Integration Flow
23//!
24//! ```text
25//! SOCH-QL Query
26//!     │
27//!     ▼
28//! ┌─────────────────┐
29//! │ Parse & Validate│
30//! └────────┬────────┘
31//!          │
32//!          ▼
33//! ┌─────────────────┐
34//! │ Extract         │
35//! │ Predicates      │ ← WHERE clause → QueryPredicate[]
36//! └────────┬────────┘
37//!          │
38//!          ▼
39//! ┌─────────────────┐
40//! │ QueryOptimizer  │
41//! │ .plan_query()   │ ← Cost-based index selection
42//! └────────┬────────┘
43//!          │
44//!          ▼
45//! ┌─────────────────┐
46//! │ Execute Plan    │
47//! └─────────────────┘
48//! ```
49//!
50//! ## Task 11: HyperLogLog Integration for Real-Time Cardinality
51//!
52//! The `CardinalityTracker` maintains HyperLogLog sketches per column for
53//! real-time cardinality estimation with <1% standard error.
54//!
55//! ```text
56//! On INSERT:
57//!   tracker.observe("column_name", value)  // O(1) HLL update
58//!
59//! On SELECT planning:
60//!   cardinality = tracker.estimate("column_name")  // O(1) estimate
61//!
62//! Math:
63//!   Standard error = 1.04 / sqrt(m) where m = 2^precision
64//!   For precision=14: SE = 0.81%, memory = 16KB per column
65//! ```
66
67use crate::query_optimizer::{
68    CardinalitySource, CostModel, IndexSelection, QueryOperation, QueryOptimizer,
69    QueryPlan as OptimizerPlan, QueryPredicate, TraversalDirection,
70};
71#[cfg(test)]
72use crate::soch_ql::{ComparisonOp, WhereClause};
73use crate::soch_ql::{SelectQuery, SochResult, SochValue};
74use parking_lot::RwLock;
75use std::collections::HashMap;
76use std::sync::Arc;
77use sochdb_core::{Catalog, Result};
78use sochdb_storage::sketches::HyperLogLog;
79
80// ============================================================================
81// Storage Backend Trait - Allows wiring optimizer to actual storage
82// ============================================================================
83
/// Storage backend trait for executing optimized query plans
///
/// This trait abstracts the storage layer so the optimizer can execute
/// plans without knowing the concrete storage implementation.
///
/// Rows are exchanged as column-name → value maps. Implementations must be
/// `Send + Sync` because the executor may be shared across threads.
pub trait StorageBackend: Send + Sync {
    /// Execute a full table scan
    ///
    /// `predicate` is an opaque filter string pushed down from the plan
    /// (currently a `Debug` rendering of the WHERE clause — see
    /// `build_execution_plan`); implementations may ignore it and rely on
    /// post-filtering.
    fn table_scan(
        &self,
        table: &str,
        columns: &[String],
        predicate: Option<&str>,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Execute a primary key lookup
    ///
    /// Returns `Ok(None)` when no row has the given key.
    fn primary_key_lookup(
        &self,
        table: &str,
        key: &SochValue,
    ) -> Result<Option<HashMap<String, SochValue>>>;

    /// Execute a secondary index seek
    ///
    /// Returns the rows whose indexed column matches `key`.
    fn secondary_index_seek(
        &self,
        table: &str,
        index: &str,
        key: &SochValue,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Execute a time range scan
    ///
    /// `start_us`/`end_us` are microsecond timestamps.
    /// NOTE(review): bound inclusivity is implementation-defined here —
    /// confirm against concrete backends.
    fn time_index_scan(
        &self,
        table: &str,
        start_us: u64,
        end_us: u64,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Execute a vector similarity search
    ///
    /// Returns up to `k` `(score, row)` pairs. Score semantics (distance vs
    /// similarity) are implementation-defined; the executor currently
    /// discards the score and keeps only the rows.
    fn vector_search(
        &self,
        table: &str,
        query: &[f32],
        k: usize,
    ) -> Result<Vec<(f32, HashMap<String, SochValue>)>>;

    /// Get table row count (for optimization)
    fn row_count(&self, table: &str) -> usize;
}
131
132// ============================================================================
133// Task 11: CardinalityTracker - Real-Time HyperLogLog Integration
134// ============================================================================
135
/// Real-time cardinality tracker using HyperLogLog sketches
///
/// Maintains per-column HLL sketches for sub-microsecond cardinality queries
/// with <1% standard error.
///
/// ## Math
///
/// ```text
/// Standard error = 1.04 / sqrt(2^precision)
///
/// Precision=14: SE = 0.81%, memory = 16KB per column (dense)
/// Sparse mode: memory = O(cardinality) for low-cardinality columns
/// ```
///
/// ## Thread Safety
///
/// Uses fine-grained locking per table for concurrent updates across
/// multiple ingestion threads.
pub struct CardinalityTracker {
    /// HLL precision (4-18, default 14 for 0.81% error); fixed at construction
    precision: u8,
    /// Per-table column cardinality trackers, guarded by a single RwLock
    tables: RwLock<HashMap<String, TableCardinalityTracker>>,
    /// Drift threshold for cache invalidation (0.20 = 20% change, set in
    /// `with_precision`; tunable via `set_drift_threshold`)
    drift_threshold: f64,
}
162
/// Per-table cardinality tracking
///
/// Internal record; always accessed through the `tables` lock in
/// `CardinalityTracker`.
struct TableCardinalityTracker {
    /// HLL sketch per column
    columns: HashMap<String, HyperLogLog>,
    /// Row count estimate (updated only via `increment_row_count`,
    /// not by `observe`)
    row_count: usize,
    /// Last update timestamp, microseconds since the Unix epoch
    last_update_us: u64,
}
172
/// Cardinality estimate with confidence
#[derive(Debug, Clone)]
pub struct CardinalityEstimate {
    /// Estimated distinct count
    pub distinct: usize,
    /// Standard error percentage (reported by the underlying HLL sketch)
    pub error_pct: f64,
    /// Source of estimate
    pub source: CardinalitySource,
    /// Is this a fresh (recently updated) estimate?
    /// True when the table was updated within the last 60 seconds.
    pub is_fresh: bool,
}
185
186impl CardinalityTracker {
187    /// Create a new tracker with default precision (14)
188    pub fn new() -> Self {
189        Self::with_precision(14)
190    }
191
192    /// Create with custom HLL precision
193    ///
194    /// Precision affects accuracy vs memory:
195    /// - 10: SE=3.25%, 1KB/column
196    /// - 12: SE=1.63%, 4KB/column  
197    /// - 14: SE=0.81%, 16KB/column (default)
198    /// - 16: SE=0.41%, 64KB/column
199    pub fn with_precision(precision: u8) -> Self {
200        assert!((4..=18).contains(&precision), "Precision must be 4-18");
201        Self {
202            precision,
203            tables: RwLock::new(HashMap::new()),
204            drift_threshold: 0.20, // 20% change triggers replan
205        }
206    }
207
208    /// Set drift threshold for cache invalidation
209    pub fn set_drift_threshold(&mut self, threshold: f64) {
210        self.drift_threshold = threshold;
211    }
212
213    /// Observe a value for a column (call on INSERT/UPDATE)
214    ///
215    /// O(1) operation - safe to call on every write.
216    pub fn observe<T: std::hash::Hash>(&self, table: &str, column: &str, value: &T) {
217        let mut tables = self.tables.write();
218        let tracker = tables
219            .entry(table.to_string())
220            .or_insert_with(|| TableCardinalityTracker {
221                columns: HashMap::new(),
222                row_count: 0,
223                last_update_us: Self::now(),
224            });
225
226        let hll = tracker
227            .columns
228            .entry(column.to_string())
229            .or_insert_with(|| HyperLogLog::new(self.precision));
230
231        hll.add(value);
232        tracker.last_update_us = Self::now();
233    }
234
235    /// Observe multiple values in batch (more efficient for bulk inserts)
236    pub fn observe_batch<T: std::hash::Hash>(
237        &self,
238        table: &str,
239        column: &str,
240        values: impl Iterator<Item = T>,
241    ) {
242        let mut tables = self.tables.write();
243        let tracker = tables
244            .entry(table.to_string())
245            .or_insert_with(|| TableCardinalityTracker {
246                columns: HashMap::new(),
247                row_count: 0,
248                last_update_us: Self::now(),
249            });
250
251        let hll = tracker
252            .columns
253            .entry(column.to_string())
254            .or_insert_with(|| HyperLogLog::new(self.precision));
255
256        for value in values {
257            hll.add(&value);
258        }
259        tracker.last_update_us = Self::now();
260    }
261
262    /// Increment row count for a table
263    pub fn increment_row_count(&self, table: &str, delta: usize) {
264        let mut tables = self.tables.write();
265        if let Some(tracker) = tables.get_mut(table) {
266            tracker.row_count = tracker.row_count.saturating_add(delta);
267        }
268    }
269
270    /// Estimate cardinality for a column
271    ///
272    /// O(1) operation - returns sub-microsecond.
273    pub fn estimate(&self, table: &str, column: &str) -> Option<CardinalityEstimate> {
274        let tables = self.tables.read();
275        let tracker = tables.get(table)?;
276        let hll = tracker.columns.get(column)?;
277
278        let distinct = hll.cardinality() as usize;
279        let error_pct = hll.standard_error();
280        let freshness_us = Self::now().saturating_sub(tracker.last_update_us);
281
282        Some(CardinalityEstimate {
283            distinct,
284            error_pct,
285            source: CardinalitySource::HyperLogLog,
286            // Consider fresh if updated within last minute
287            is_fresh: freshness_us < 60_000_000,
288        })
289    }
290
291    /// Get all column cardinalities for a table
292    pub fn get_table_cardinalities(&self, table: &str) -> HashMap<String, usize> {
293        let tables = self.tables.read();
294        tables
295            .get(table)
296            .map(|tracker| {
297                tracker
298                    .columns
299                    .iter()
300                    .map(|(col, hll)| (col.clone(), hll.cardinality() as usize))
301                    .collect()
302            })
303            .unwrap_or_default()
304    }
305
306    /// Get row count estimate for a table
307    pub fn get_row_count(&self, table: &str) -> usize {
308        self.tables
309            .read()
310            .get(table)
311            .map(|t| t.row_count)
312            .unwrap_or(0)
313    }
314
315    /// Check if cardinality has drifted beyond threshold
316    ///
317    /// Returns true if any column's cardinality has changed by more than
318    /// `drift_threshold` (default 20%).
319    pub fn has_cardinality_drift(
320        &self,
321        table: &str,
322        cached_cardinalities: &HashMap<String, usize>,
323    ) -> bool {
324        let tables = self.tables.read();
325        let tracker = match tables.get(table) {
326            Some(t) => t,
327            None => return true, // Table not tracked, consider stale
328        };
329
330        for (column, &cached) in cached_cardinalities {
331            if let Some(hll) = tracker.columns.get(column) {
332                let current = hll.cardinality();
333                if cached == 0 {
334                    if current > 0 {
335                        return true; // New data in empty column
336                    }
337                } else {
338                    let drift = (current as f64 - cached as f64).abs() / cached as f64;
339                    if drift > self.drift_threshold {
340                        return true;
341                    }
342                }
343            }
344        }
345
346        false
347    }
348
349    /// Merge HLL from another tracker (for distributed scenarios)
350    pub fn merge(&self, table: &str, column: &str, other_hll: &HyperLogLog) {
351        let mut tables = self.tables.write();
352        if let Some(tracker) = tables.get_mut(table)
353            && let Some(hll) = tracker.columns.get_mut(column)
354        {
355            hll.merge(other_hll);
356            tracker.last_update_us = Self::now();
357        }
358    }
359
360    /// Clear all tracking data for a table
361    pub fn clear_table(&self, table: &str) {
362        self.tables.write().remove(table);
363    }
364
365    /// Get memory usage statistics
366    pub fn memory_usage(&self) -> CardinalityTrackerStats {
367        let tables = self.tables.read();
368        let mut total_columns = 0;
369        let mut total_bytes = 0;
370
371        for tracker in tables.values() {
372            for hll in tracker.columns.values() {
373                total_columns += 1;
374                total_bytes += hll.memory_usage();
375            }
376        }
377
378        CardinalityTrackerStats {
379            table_count: tables.len(),
380            column_count: total_columns,
381            memory_bytes: total_bytes,
382            precision: self.precision,
383            standard_error_pct: 1.04 / (1usize << self.precision) as f64 * 100.0,
384        }
385    }
386
387    fn now() -> u64 {
388        std::time::SystemTime::now()
389            .duration_since(std::time::UNIX_EPOCH)
390            .unwrap()
391            .as_micros() as u64
392    }
393}
394
395impl Default for CardinalityTracker {
396    fn default() -> Self {
397        Self::new()
398    }
399}
400
/// Statistics for cardinality tracker
#[derive(Debug, Clone)]
pub struct CardinalityTrackerStats {
    /// Number of tables tracked
    pub table_count: usize,
    /// Total columns tracked across all tables
    pub column_count: usize,
    /// Total memory usage in bytes (sum of per-sketch `memory_usage()`)
    pub memory_bytes: usize,
    /// HLL precision
    pub precision: u8,
    /// Standard error, expressed in percent (e.g. 0.81 for precision 14)
    pub standard_error_pct: f64,
}
415
/// Optimized query executor with cost-based planning
pub struct OptimizedExecutor {
    /// Query optimizer instance
    optimizer: QueryOptimizer,
    /// Table statistics cache; refreshed explicitly via `update_table_stats`
    /// or `refresh_stats_from_tracker`, not automatically
    table_stats: HashMap<String, TableStats>,
    /// Real-time cardinality tracker (Task 11); shared with ingestion via
    /// `cardinality_tracker()`
    cardinality_tracker: Arc<CardinalityTracker>,
    /// Embedding provider for vector search (optional; without one, vector
    /// searches fall back to a zero-vector placeholder)
    embedding_provider: Option<Arc<dyn crate::embedding_provider::EmbeddingProvider>>,
}
427
/// Statistics for a table
///
/// `Default` yields zero rows, no cardinalities, no indexes, no primary key.
#[derive(Debug, Clone, Default)]
pub struct TableStats {
    /// Estimated row count
    pub row_count: usize,
    /// Column cardinalities (distinct values)
    pub column_cardinalities: HashMap<String, usize>,
    /// Has time index
    pub has_time_index: bool,
    /// Has vector index
    pub has_vector_index: bool,
    /// Primary key column
    pub primary_key: Option<String>,
}
442
443impl OptimizedExecutor {
444    /// Create a new optimized executor
445    pub fn new() -> Self {
446        Self {
447            optimizer: QueryOptimizer::new(),
448            table_stats: HashMap::new(),
449            cardinality_tracker: Arc::new(CardinalityTracker::new()),
450            embedding_provider: None,
451        }
452    }
453
454    /// Create with custom cost model
455    pub fn with_cost_model(cost_model: CostModel) -> Self {
456        Self {
457            optimizer: QueryOptimizer::with_cost_model(cost_model),
458            table_stats: HashMap::new(),
459            cardinality_tracker: Arc::new(CardinalityTracker::new()),
460            embedding_provider: None,
461        }
462    }
463
464    /// Create with shared cardinality tracker (for integration with ingestion)
465    pub fn with_cardinality_tracker(tracker: Arc<CardinalityTracker>) -> Self {
466        Self {
467            optimizer: QueryOptimizer::new(),
468            table_stats: HashMap::new(),
469            cardinality_tracker: tracker,
470            embedding_provider: None,
471        }
472    }
473
    /// Set embedding provider for vector search
    ///
    /// Replaces any previously configured provider. Without a provider,
    /// vector searches in `execute` fall back to a zero-vector placeholder.
    pub fn set_embedding_provider(
        &mut self,
        provider: Arc<dyn crate::embedding_provider::EmbeddingProvider>,
    ) {
        self.embedding_provider = Some(provider);
    }
481
482    /// Create with embedding provider
483    pub fn with_embedding_provider(
484        mut self,
485        provider: Arc<dyn crate::embedding_provider::EmbeddingProvider>,
486    ) -> Self {
487        self.embedding_provider = Some(provider);
488        self
489    }
490
    /// Get the cardinality tracker for external updates (e.g., on INSERT)
    ///
    /// Returns a cheap `Arc` clone; ingestion threads can hold it and call
    /// `observe()` concurrently while this executor plans queries.
    pub fn cardinality_tracker(&self) -> Arc<CardinalityTracker> {
        Arc::clone(&self.cardinality_tracker)
    }
495
496    /// Update table statistics (call periodically or on schema change)
497    pub fn update_table_stats(&mut self, table: &str, stats: TableStats) {
498        let row_count = stats.row_count;
499        self.table_stats.insert(table.to_string(), stats);
500        self.optimizer
501            .update_total_edges(row_count, CardinalitySource::Exact);
502    }
503
504    /// Refresh stats from cardinality tracker
505    ///
506    /// Syncs real-time HLL estimates to the static stats cache.
507    pub fn refresh_stats_from_tracker(&mut self, table: &str) {
508        let cardinalities = self.cardinality_tracker.get_table_cardinalities(table);
509        let row_count = self.cardinality_tracker.get_row_count(table);
510
511        if let Some(stats) = self.table_stats.get_mut(table) {
512            stats.column_cardinalities = cardinalities;
513            if row_count > 0 {
514                stats.row_count = row_count;
515            }
516        } else {
517            self.table_stats.insert(
518                table.to_string(),
519                TableStats {
520                    row_count,
521                    column_cardinalities: cardinalities,
522                    ..Default::default()
523                },
524            );
525        }
526    }
527
    /// Update column cardinality from HyperLogLog
    ///
    /// No-op when `table` has no stats entry yet (seed one first via
    /// `update_table_stats` or `refresh_stats_from_tracker`). `_source` is
    /// currently unused but kept for interface stability.
    pub fn update_cardinality_hint(
        &mut self,
        table: &str,
        column: &str,
        cardinality: usize,
        _source: CardinalitySource,
    ) {
        if let Some(stats) = self.table_stats.get_mut(table) {
            stats
                .column_cardinalities
                .insert(column.to_string(), cardinality);
        }
    }
542
543    /// Plan a SELECT query with cost-based optimization
544    pub fn plan_select(
545        &self,
546        select: &SelectQuery,
547        _catalog: &Catalog,
548    ) -> Result<OptimizedQueryPlan> {
549        // Extract predicates from WHERE clause
550        let predicates = self.extract_predicates(select)?;
551
552        // Get optimizer plan
553        let optimizer_plan = self.optimizer.plan_query(&predicates, select.limit);
554
555        // Convert to execution plan
556        let exec_plan = self.build_execution_plan(select, &optimizer_plan)?;
557
558        Ok(OptimizedQueryPlan {
559            table: select.table.clone(),
560            columns: select.columns.clone(),
561            execution_plan: exec_plan,
562            optimizer_plan,
563            predicates,
564        })
565    }
566
567    /// Extract predicates from SELECT query
568    fn extract_predicates(&self, select: &SelectQuery) -> Result<Vec<QueryPredicate>> {
569        let mut predicates = Vec::new();
570
571        if let Some(where_clause) = &select.where_clause {
572            for condition in &where_clause.conditions {
573                if let Some(pred) = self.condition_to_predicate(&condition.column, &condition.value)
574                {
575                    predicates.push(pred);
576                }
577            }
578        }
579
580        Ok(predicates)
581    }
582
583    /// Convert a condition to optimizer predicate
584    fn condition_to_predicate(&self, column: &str, value: &SochValue) -> Option<QueryPredicate> {
585        // Detect special column patterns
586        match column {
587            // Time-based columns
588            "timestamp" | "created_at" | "updated_at" | "time" => {
589                if let SochValue::UInt(ts) = value {
590                    // Assume range of 1 hour by default
591                    let hour_us = 60 * 60 * 1_000_000u64;
592                    return Some(QueryPredicate::TimeRange(*ts, ts + hour_us));
593                }
594            }
595            // Project ID
596            "project_id" | "project" => {
597                if let SochValue::UInt(id) = value {
598                    return Some(QueryPredicate::Project(*id as u16));
599                }
600            }
601            // Tenant ID
602            "tenant_id" | "tenant" => {
603                if let SochValue::UInt(id) = value {
604                    return Some(QueryPredicate::Tenant(*id as u32));
605                }
606            }
607            // Span type
608            "span_type" | "type" => {
609                if let SochValue::Text(s) = value {
610                    return Some(QueryPredicate::SpanType(s.clone()));
611                }
612            }
613            _ => {}
614        }
615
616        None
617    }
618
    /// Build execution plan from optimizer plan
    ///
    /// Translates the optimizer's index selection into a linear list of
    /// `ExecutionStep`s: one access step (scan/seek/search), then optional
    /// Filter, Project, Sort, and Limit steps, in that order.
    ///
    /// NOTE(review): for TimeIndex / VectorIndex / CausalIndex, no access
    /// step is pushed when `operations.first()` does not carry the expected
    /// variant — the resulting plan then starts at Filter with no rows
    /// produced. Confirm the optimizer guarantees the pairing.
    fn build_execution_plan(
        &self,
        select: &SelectQuery,
        opt_plan: &OptimizerPlan,
    ) -> Result<ExecutionPlan> {
        let mut steps = Vec::new();

        // Add scan/index step based on index selection
        match &opt_plan.index_selection {
            IndexSelection::LsmScan | IndexSelection::FullScan => {
                steps.push(ExecutionStep::TableScan {
                    table: select.table.clone(),
                });
            }
            IndexSelection::TimeIndex => {
                // Extract time range from operations
                if let Some(QueryOperation::LsmRangeScan { start_us, end_us }) =
                    opt_plan.operations.first()
                {
                    steps.push(ExecutionStep::TimeIndexScan {
                        table: select.table.clone(),
                        start_us: *start_us,
                        end_us: *end_us,
                    });
                }
            }
            IndexSelection::VectorIndex => {
                if let Some(QueryOperation::VectorSearch { k }) = opt_plan.operations.first() {
                    // Extract query text from SIMILAR TO predicate in WHERE clause
                    let query_text = self.extract_vector_query_text(select);
                    steps.push(ExecutionStep::VectorSearch {
                        table: select.table.clone(),
                        k: *k,
                        query_text,
                    });
                }
            }
            IndexSelection::CausalIndex => {
                if let Some(QueryOperation::GraphTraversal {
                    direction,
                    max_depth,
                }) = opt_plan.operations.first()
                {
                    steps.push(ExecutionStep::GraphTraversal {
                        table: select.table.clone(),
                        direction: *direction,
                        max_depth: *max_depth,
                    });
                }
            }
            IndexSelection::ProjectIndex => {
                // Project predicates are served by a fixed secondary index
                steps.push(ExecutionStep::SecondaryIndexSeek {
                    table: select.table.clone(),
                    index: "project_idx".to_string(),
                });
            }
            IndexSelection::PrimaryKey => {
                steps.push(ExecutionStep::PrimaryKeyLookup {
                    table: select.table.clone(),
                });
            }
            IndexSelection::Secondary(idx) => {
                steps.push(ExecutionStep::SecondaryIndexSeek {
                    table: select.table.clone(),
                    index: idx.clone(),
                });
            }
            IndexSelection::MultiIndex(indexes) => {
                // For multi-index, use intersection
                // NOTE(review): index names here are Debug renderings of
                // IndexSelection variants — verify backends accept them.
                steps.push(ExecutionStep::MultiIndexIntersect {
                    table: select.table.clone(),
                    indexes: indexes.iter().map(|idx| format!("{:?}", idx)).collect(),
                });
            }
        }

        // Add filter step if WHERE clause exists
        // (predicate is a Debug rendering of Option<WhereClause>, consumed
        // as an opaque string by storage backends)
        if select.where_clause.is_some() {
            steps.push(ExecutionStep::Filter {
                predicate: format!("{:?}", select.where_clause),
            });
        }

        // Add projection (skipped for SELECT *)
        if !select.columns.is_empty() && select.columns[0] != "*" {
            steps.push(ExecutionStep::Project {
                columns: select.columns.clone(),
            });
        }

        // Add sort if ORDER BY exists
        if let Some(order_by) = &select.order_by {
            steps.push(ExecutionStep::Sort {
                column: order_by.column.clone(),
                ascending: order_by.direction == crate::soch_ql::SortDirection::Asc,
            });
        }

        // Add limit if specified
        if let Some(limit) = select.limit {
            steps.push(ExecutionStep::Limit { count: limit });
        }

        Ok(ExecutionPlan {
            steps,
            estimated_cost: opt_plan.cost.total_cost,
            estimated_rows: opt_plan.cost.records_returned,
        })
    }
729
    /// Execute an optimized query plan against a storage backend
    ///
    /// This is the key method that wires the optimizer output to actual storage.
    /// It interprets each ExecutionStep and calls the appropriate storage method.
    ///
    /// Steps run in plan order: an access step populates `rows`, then
    /// Filter/Project/Sort/Limit transform them in place. Missing lookup
    /// keys leave `rows` untouched (empty unless a prior step filled them).
    pub fn execute<S: StorageBackend>(
        &self,
        plan: &OptimizedQueryPlan,
        storage: &S,
    ) -> Result<SochResult> {
        // Working row set, threaded through the steps
        let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
        // Output columns; narrowed by a Project step if one runs
        let mut columns_to_return = plan.columns.clone();

        // Execute each step in order
        for step in &plan.execution_plan.steps {
            match step {
                ExecutionStep::TableScan { table } => {
                    // Full table scan - use storage backend
                    // Push the plan's Filter predicate (if any) down into the
                    // scan so the backend can filter early.
                    let predicate = plan.execution_plan.steps.iter().find_map(|s| match s {
                        ExecutionStep::Filter { predicate } => Some(predicate.as_str()),
                        _ => None,
                    });
                    rows = storage.table_scan(table, &columns_to_return, predicate)?;
                }
                ExecutionStep::PrimaryKeyLookup { table } => {
                    // Extract key from predicates; no key or no match leaves
                    // `rows` empty
                    if let Some(key) = self.extract_primary_key_from_predicates(&plan.predicates)
                        && let Some(row) = storage.primary_key_lookup(table, &key)?
                    {
                        rows = vec![row];
                    }
                }
                ExecutionStep::SecondaryIndexSeek { table, index } => {
                    // Extract key from predicates for the indexed column
                    if let Some(key) =
                        self.extract_index_key_from_predicates(&plan.predicates, index)
                    {
                        rows = storage.secondary_index_seek(table, index, &key)?;
                    }
                }
                ExecutionStep::TimeIndexScan {
                    table,
                    start_us,
                    end_us,
                } => {
                    rows = storage.time_index_scan(table, *start_us, *end_us)?;
                }
                ExecutionStep::VectorSearch {
                    table,
                    k,
                    query_text,
                } => {
                    // Generate real embedding from query text using embedding provider
                    let query_embedding = match (query_text, &self.embedding_provider) {
                        (Some(text), Some(provider)) => {
                            // Use embedding provider to generate real embedding
                            provider.embed(text).unwrap_or_else(|e| {
                                tracing::warn!(
                                    "Failed to generate embedding for '{}': {}. Using fallback.",
                                    text,
                                    e
                                );
                                // Fallback to zeros matching provider dimension
                                vec![0.0f32; provider.dimension()]
                            })
                        }
                        (Some(_text), None) => {
                            // No embedding provider configured - use placeholder
                            tracing::warn!(
                                "Vector search requested but no embedding provider configured"
                            );
                            vec![0.0f32; 128] // Fallback dimension
                        }
                        (None, _) => {
                            // No query text provided - use placeholder
                            tracing::warn!("Vector search without query text, using placeholder");
                            vec![0.0f32; 128] // Fallback dimension
                        }
                    };
                    // Similarity scores are discarded; only rows are kept
                    let results = storage.vector_search(table, &query_embedding, *k)?;
                    rows = results.into_iter().map(|(_, row)| row).collect();
                }
                ExecutionStep::GraphTraversal {
                    table,
                    direction: _,
                    max_depth: _,
                } => {
                    // Graph traversal - fallback to table scan for now
                    rows = storage.table_scan(table, &columns_to_return, None)?;
                }
                ExecutionStep::MultiIndexIntersect { table, indexes } => {
                    // Execute each index and intersect results
                    let mut result_sets: Vec<Vec<HashMap<String, SochValue>>> = Vec::new();
                    for index in indexes {
                        if let Some(key) =
                            self.extract_index_key_from_predicates(&plan.predicates, index)
                        {
                            result_sets.push(storage.secondary_index_seek(table, index, &key)?);
                        }
                    }
                    // Intersect by checking row IDs (simplified - assumes "id" column)
                    if !result_sets.is_empty() {
                        rows = self.intersect_result_sets(result_sets);
                    }
                }
                ExecutionStep::Filter { predicate: _ } => {
                    // Filter already applied in scan, but can post-filter here if needed
                    // For now, filtering is pushed to storage
                }
                ExecutionStep::Project { columns } => {
                    columns_to_return = columns.clone();
                    // Project columns from rows; absent columns are dropped
                    rows = rows
                        .into_iter()
                        .map(|row| {
                            columns
                                .iter()
                                .filter_map(|c| row.get(c).map(|v| (c.clone(), v.clone())))
                                .collect()
                        })
                        .collect();
                }
                ExecutionStep::Sort { column, ascending } => {
                    // In-memory sort on a single column; compare_values
                    // defines ordering across value types / missing values
                    rows.sort_by(|a, b| {
                        let va = a.get(column);
                        let vb = b.get(column);
                        let cmp = Self::compare_values(va, vb);
                        if *ascending { cmp } else { cmp.reverse() }
                    });
                }
                ExecutionStep::Limit { count } => {
                    rows.truncate(*count);
                }
            }
        }

        // Convert to SochResult; missing columns become Null so every row
        // has the same arity as `columns_to_return`
        let result_rows: Vec<Vec<SochValue>> = rows
            .iter()
            .map(|row| {
                columns_to_return
                    .iter()
                    .map(|c| row.get(c).cloned().unwrap_or(SochValue::Null))
                    .collect()
            })
            .collect();

        Ok(SochResult {
            table: plan.table.clone(),
            columns: columns_to_return,
            rows: result_rows,
        })
    }
882
883    /// Extract primary key value from predicates
884    fn extract_primary_key_from_predicates(
885        &self,
886        predicates: &[QueryPredicate],
887    ) -> Option<SochValue> {
888        for pred in predicates {
889            // Look for ID predicates
890            if let QueryPredicate::Project(id) = pred {
891                return Some(SochValue::UInt(*id as u64));
892            }
893        }
894        None
895    }
896
897    /// Extract index key from predicates for a specific index
898    fn extract_index_key_from_predicates(
899        &self,
900        predicates: &[QueryPredicate],
901        _index: &str,
902    ) -> Option<SochValue> {
903        for pred in predicates {
904            match pred {
905                QueryPredicate::Tenant(id) => return Some(SochValue::UInt(*id as u64)),
906                QueryPredicate::Project(id) => return Some(SochValue::UInt(*id as u64)),
907                QueryPredicate::SpanType(s) => return Some(SochValue::Text(s.clone())),
908                _ => {}
909            }
910        }
911        None
912    }
913
914    /// Extract query text from SIMILAR TO predicate for vector search
915    ///
916    /// Looks for conditions like: `content SIMILAR TO 'search query text'`
917    /// Returns the query text to be embedded for similarity search.
918    fn extract_vector_query_text(&self, select: &SelectQuery) -> Option<String> {
919        use crate::soch_ql::ComparisonOp;
920        
921        if let Some(where_clause) = &select.where_clause {
922            for condition in &where_clause.conditions {
923                if matches!(condition.operator, ComparisonOp::SimilarTo) {
924                    // Extract the text value from the condition
925                    if let SochValue::Text(query_text) = &condition.value {
926                        return Some(query_text.clone());
927                    }
928                }
929            }
930        }
931        None
932    }
933
934    /// Intersect multiple result sets by common IDs
935    fn intersect_result_sets(
936        &self,
937        sets: Vec<Vec<HashMap<String, SochValue>>>,
938    ) -> Vec<HashMap<String, SochValue>> {
939        if sets.is_empty() {
940            return Vec::new();
941        }
942        if sets.len() == 1 {
943            return sets.into_iter().next().unwrap();
944        }
945
946        // Use first set as base, filter by presence in other sets
947        let mut base = sets.into_iter().next().unwrap();
948        // Simplified intersection - in production, use row IDs
949        base.truncate(base.len().min(100)); // Cap for safety
950        base
951    }
952
953    /// Compare SochValue for sorting
954    fn compare_values(a: Option<&SochValue>, b: Option<&SochValue>) -> std::cmp::Ordering {
955        match (a, b) {
956            (None, None) => std::cmp::Ordering::Equal,
957            (None, Some(_)) => std::cmp::Ordering::Less,
958            (Some(_), None) => std::cmp::Ordering::Greater,
959            (Some(va), Some(vb)) => match (va, vb) {
960                (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
961                (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
962                (SochValue::Float(a), SochValue::Float(b)) => {
963                    a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
964                }
965                (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
966                (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
967                _ => std::cmp::Ordering::Equal,
968            },
969        }
970    }
971
972    /// Explain a query plan (for debugging)
973    pub fn explain(&self, select: &SelectQuery, catalog: &Catalog) -> Result<String> {
974        let plan = self.plan_select(select, catalog)?;
975
976        let mut output = String::new();
977        output.push_str(&format!(
978            "QUERY PLAN (estimated cost: {:.2}, rows: {})\n",
979            plan.optimizer_plan.cost.total_cost, plan.optimizer_plan.cost.records_returned
980        ));
981        output.push_str(&format!(
982            "Index Selection: {:?}\n",
983            plan.optimizer_plan.index_selection
984        ));
985        output.push_str("Execution Steps:\n");
986
987        for (i, step) in plan.execution_plan.steps.iter().enumerate() {
988            output.push_str(&format!("  {}. {:?}\n", i + 1, step));
989        }
990
991        output.push_str("\nCost Breakdown:\n");
992        for (op, cost) in &plan.optimizer_plan.cost.breakdown {
993            output.push_str(&format!("  {:?}: {:.2}\n", op, cost));
994        }
995
996        Ok(output)
997    }
998}
999
1000impl Default for OptimizedExecutor {
1001    fn default() -> Self {
1002        Self::new()
1003    }
1004}
1005
/// Optimized query plan with cost estimates
///
/// Pairs the runnable step list with the optimizer's own plan and the
/// predicates it was derived from, so callers can both execute the query
/// and inspect how it was planned.
#[derive(Debug)]
pub struct OptimizedQueryPlan {
    /// Target table
    pub table: String,
    /// Columns to return, in output order
    pub columns: Vec<String>,
    /// Ordered execution steps to run against storage
    pub execution_plan: ExecutionPlan,
    /// Optimizer's plan (kept for debugging / EXPLAIN output)
    pub optimizer_plan: OptimizerPlan,
    /// Predicates extracted from the query's WHERE clause
    pub predicates: Vec<QueryPredicate>,
}
1020
/// Execution plan with ordered steps
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    /// Ordered execution steps, interpreted first to last
    pub steps: Vec<ExecutionStep>,
    /// Estimated total cost (optimizer cost units, not wall time)
    pub estimated_cost: f64,
    /// Estimated number of output rows
    pub estimated_rows: usize,
}
1031
/// Single execution step
///
/// Steps are interpreted in order by the executor: access-path steps
/// (scans/seeks/searches) produce rows, and the later steps
/// (filter/project/sort/limit) transform or reduce them.
#[derive(Debug, Clone)]
pub enum ExecutionStep {
    /// Full table scan (no usable index)
    TableScan { table: String },
    /// Primary key lookup (driven by an ID predicate)
    PrimaryKeyLookup { table: String },
    /// Time-based index scan over `[start_us, end_us]` (microseconds)
    TimeIndexScan {
        table: String,
        start_us: u64,
        end_us: u64,
    },
    /// Vector similarity search (top-k nearest neighbors)
    VectorSearch {
        table: String,
        k: usize,
        /// Query text to embed for similarity search.
        /// If None, falls back to placeholder (for backwards compat).
        query_text: Option<String>,
    },
    /// Graph traversal
    GraphTraversal {
        table: String,
        direction: TraversalDirection,
        max_depth: usize,
    },
    /// Secondary index seek
    SecondaryIndexSeek { table: String, index: String },
    /// Multi-index intersection (AND of several index seeks)
    MultiIndexIntersect { table: String, indexes: Vec<String> },
    /// Filter rows (currently pushed down to storage during scans)
    Filter { predicate: String },
    /// Project columns
    Project { columns: Vec<String> },
    /// Sort results by a single column
    Sort { column: String, ascending: bool },
    /// Limit output to the first `count` rows
    Limit { count: usize },
}
1072
/// Query plan cache for repeated queries
///
/// ## Task 5 Enhancement: Frequency-Gated Caching
///
/// Plans are only cached after being used 3+ times (configurable via
/// `with_threshold`) to avoid polluting the cache with one-off queries.
pub struct PlanCache {
    /// Cached plans by query hash
    cache: HashMap<u64, CachedPlan>,
    /// Frequency tracker for queries not yet promoted into the cache
    frequency: HashMap<u64, FrequencyEntry>,
    /// Maximum cache entries (LRU eviction beyond this)
    max_entries: usize,
    /// Number of uses required before a plan is promoted into the cache
    cache_threshold: usize,
    /// Hit/miss/promotion statistics
    stats: AdaptiveCacheStats,
}
1091
/// Cached query plan
#[derive(Debug, Clone)]
struct CachedPlan {
    /// The execution plan
    plan: ExecutionPlan,
    /// Number of cache hits served by this entry
    hits: usize,
    /// Last used timestamp (µs since Unix epoch; drives LRU eviction)
    last_used: u64,
    /// Time saved by caching (cumulative planning time avoided, µs)
    time_saved_us: u64,
}
1104
/// Frequency tracking entry
///
/// Holds a not-yet-cached plan until its query crosses the promotion
/// threshold; stale entries are aged out by the cleanup pass.
#[derive(Debug, Clone)]
struct FrequencyEntry {
    /// Number of times query was seen
    count: usize,
    /// First seen timestamp (µs since Unix epoch)
    #[allow(dead_code)]
    first_seen: u64,
    /// Most recent timestamp (µs; used for age-based cleanup)
    last_seen: u64,
    /// The plan (saved but not cached until threshold)
    pending_plan: Option<ExecutionPlan>,
}
1118
/// Enhanced cache statistics
#[derive(Debug, Clone, Default)]
pub struct AdaptiveCacheStats {
    /// Number of cached entries
    pub entries: usize,
    /// Total cache hits
    pub total_hits: usize,
    /// Total cache misses
    pub total_misses: usize,
    /// Queries kept out of the cache (seen fewer times than the threshold)
    pub frequency_blocked: usize,
    /// Queries promoted from the frequency tracker into the cache
    pub promotions: usize,
    /// Estimated time saved by cache hits (microseconds)
    pub time_saved_us: u64,
}
1135
1136impl PlanCache {
1137    /// Create a new plan cache with default threshold (3)
1138    pub fn new(max_entries: usize) -> Self {
1139        Self::with_threshold(max_entries, 3)
1140    }
1141
1142    /// Create with custom frequency threshold
1143    pub fn with_threshold(max_entries: usize, cache_threshold: usize) -> Self {
1144        Self {
1145            cache: HashMap::new(),
1146            frequency: HashMap::new(),
1147            max_entries,
1148            cache_threshold,
1149            stats: AdaptiveCacheStats::default(),
1150        }
1151    }
1152
1153    /// Hash a query for caching
1154    pub fn hash_query(query: &str) -> u64 {
1155        use std::hash::{Hash, Hasher};
1156        let mut hasher = std::collections::hash_map::DefaultHasher::new();
1157        query.hash(&mut hasher);
1158        hasher.finish()
1159    }
1160
1161    /// Get cached plan with frequency tracking
1162    ///
1163    /// Returns cached plan if available, or None.
1164    /// If query is seen frequently but not cached, attempts promotion.
1165    pub fn get(&mut self, query_hash: u64) -> Option<&ExecutionPlan> {
1166        // Check cache first
1167        if self.cache.contains_key(&query_hash) {
1168            if let Some(cached) = self.cache.get_mut(&query_hash) {
1169                cached.hits += 1;
1170                cached.last_used = Self::now();
1171                cached.time_saved_us += 1000; // Assume 1ms planning time saved
1172                self.stats.total_hits += 1;
1173            }
1174            return self.cache.get(&query_hash).map(|c| &c.plan);
1175        }
1176
1177        self.stats.total_misses += 1;
1178
1179        // Check frequency tracker and promote if needed
1180        let should_promote = if let Some(freq) = self.frequency.get_mut(&query_hash) {
1181            freq.count += 1;
1182            freq.last_seen = Self::now();
1183            freq.count >= self.cache_threshold && freq.pending_plan.is_some()
1184        } else {
1185            false
1186        };
1187
1188        if should_promote
1189            && let Some(freq) = self.frequency.remove(&query_hash)
1190            && let Some(plan) = freq.pending_plan
1191        {
1192            self.insert_to_cache(query_hash, plan);
1193            self.stats.promotions += 1;
1194            return self.cache.get(&query_hash).map(|c| &c.plan);
1195        }
1196
1197        None
1198    }
1199
1200    /// Register a plan for potential caching
1201    ///
1202    /// Does not immediately cache - waits for frequency threshold.
1203    pub fn put(&mut self, query_hash: u64, plan: ExecutionPlan) {
1204        let now = Self::now();
1205
1206        // Check if already tracking frequency
1207        if let Some(freq) = self.frequency.get_mut(&query_hash) {
1208            freq.count += 1;
1209            freq.last_seen = now;
1210            freq.pending_plan = Some(plan.clone());
1211
1212            // Promote if threshold reached
1213            if freq.count >= self.cache_threshold {
1214                self.promote_to_cache(query_hash, plan);
1215                self.stats.promotions += 1;
1216            } else {
1217                self.stats.frequency_blocked += 1;
1218            }
1219        } else {
1220            // First time seeing this query
1221            self.frequency.insert(
1222                query_hash,
1223                FrequencyEntry {
1224                    count: 1,
1225                    first_seen: now,
1226                    last_seen: now,
1227                    pending_plan: Some(plan),
1228                },
1229            );
1230            self.stats.frequency_blocked += 1;
1231        }
1232
1233        // Clean up old frequency entries
1234        self.cleanup_frequency_tracker();
1235    }
1236
1237    /// Force-cache a plan (bypasses frequency check)
1238    pub fn force_put(&mut self, query_hash: u64, plan: ExecutionPlan) {
1239        self.insert_to_cache(query_hash, plan);
1240        self.frequency.remove(&query_hash);
1241    }
1242
1243    /// Insert plan directly into cache (internal helper)
1244    fn insert_to_cache(&mut self, query_hash: u64, plan: ExecutionPlan) {
1245        // Evict if at capacity
1246        if self.cache.len() >= self.max_entries {
1247            self.evict_lru();
1248        }
1249
1250        self.cache.insert(
1251            query_hash,
1252            CachedPlan {
1253                plan,
1254                hits: 0,
1255                last_used: Self::now(),
1256                time_saved_us: 0,
1257            },
1258        );
1259
1260        self.stats.entries = self.cache.len();
1261    }
1262
1263    /// Promote plan from frequency tracker to cache
1264    fn promote_to_cache(&mut self, query_hash: u64, plan: ExecutionPlan) {
1265        self.insert_to_cache(query_hash, plan);
1266        self.frequency.remove(&query_hash);
1267    }
1268
1269    /// Evict least recently used entry
1270    fn evict_lru(&mut self) {
1271        if let Some((&key, _)) = self.cache.iter().min_by_key(|(_, v)| v.last_used) {
1272            self.cache.remove(&key);
1273        }
1274    }
1275
1276    /// Cleanup old frequency tracker entries
1277    fn cleanup_frequency_tracker(&mut self) {
1278        let now = Self::now();
1279        let max_age = 60 * 1_000_000; // 1 minute
1280
1281        self.frequency.retain(|_, v| now - v.last_seen < max_age);
1282    }
1283
1284    /// Clear the cache
1285    pub fn clear(&mut self) {
1286        self.cache.clear();
1287        self.frequency.clear();
1288        self.stats = AdaptiveCacheStats::default();
1289    }
1290
1291    /// Get cache statistics (legacy compatibility)
1292    pub fn stats(&self) -> CacheStats {
1293        CacheStats {
1294            entries: self.cache.len(),
1295            total_hits: self.stats.total_hits,
1296        }
1297    }
1298
1299    /// Get enhanced statistics
1300    pub fn adaptive_stats(&self) -> &AdaptiveCacheStats {
1301        &self.stats
1302    }
1303
1304    fn now() -> u64 {
1305        std::time::SystemTime::now()
1306            .duration_since(std::time::UNIX_EPOCH)
1307            .unwrap()
1308            .as_micros() as u64
1309    }
1310}
1311
/// Cache statistics
///
/// Legacy, trimmed-down view of `AdaptiveCacheStats` returned by
/// `PlanCache::stats` for backwards compatibility.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of cached entries
    pub entries: usize,
    /// Total cache hits
    pub total_hits: usize,
}
1320
#[cfg(test)]
mod tests {
    use super::*;
    use crate::soch_ql::{Condition, LogicalOp, OrderBy, SortDirection};

    // A `timestamp >= N` condition should be recognized as a time-range
    // predicate by the extraction pass.
    #[test]
    fn test_predicate_extraction() {
        let executor = OptimizedExecutor::new();

        let select = SelectQuery {
            table: "events".to_string(),
            columns: vec!["*".to_string()],
            where_clause: Some(WhereClause {
                conditions: vec![Condition {
                    column: "timestamp".to_string(),
                    operator: ComparisonOp::Ge,
                    value: SochValue::UInt(1700000000000000),
                }],
                operator: LogicalOp::And,
            }),
            order_by: None,
            limit: None,
            offset: None,
        };

        let predicates = executor.extract_predicates(&select).unwrap();
        assert_eq!(predicates.len(), 1);
        assert!(matches!(predicates[0], QueryPredicate::TimeRange(_, _)));
    }

    // Planning against a table with a time index should produce a costed plan.
    #[test]
    fn test_plan_with_time_index() {
        let mut executor = OptimizedExecutor::new();
        executor.update_table_stats(
            "events",
            TableStats {
                row_count: 1_000_000,
                has_time_index: true,
                ..Default::default()
            },
        );

        let select = SelectQuery {
            table: "events".to_string(),
            columns: vec!["id".to_string(), "data".to_string()],
            where_clause: Some(WhereClause {
                conditions: vec![Condition {
                    column: "timestamp".to_string(),
                    operator: ComparisonOp::Ge,
                    value: SochValue::UInt(1700000000000000),
                }],
                operator: LogicalOp::And,
            }),
            order_by: None,
            limit: Some(100),
            offset: None,
        };

        let catalog = Catalog::new("test");
        let plan = executor.plan_select(&select, &catalog).unwrap();

        assert!(plan.execution_plan.estimated_cost > 0.0);
    }

    // Exercises the frequency-gated promotion path: a plan is only cached
    // after the query has been seen `cache_threshold` (3) times.
    #[test]
    fn test_plan_cache() {
        let mut cache = PlanCache::new(100);

        let plan = ExecutionPlan {
            steps: vec![ExecutionStep::TableScan {
                table: "test".to_string(),
            }],
            estimated_cost: 100.0,
            estimated_rows: 1000,
        };

        let query = "SELECT * FROM test";
        let hash = PlanCache::hash_query(query);

        // Miss (not tracked yet)
        assert!(cache.get(hash).is_none());

        // Put (with frequency-gated caching, needs 3 uses before cache)
        // Use 1: put sets count=1
        cache.put(hash, plan.clone());
        // Use 2: get increments to count=2 (still < threshold)
        assert!(cache.get(hash).is_none());

        // Use 3: put increments to count=3, triggers promotion
        cache.put(hash, plan);
        // Now at threshold (3 uses), should be cached
        assert!(cache.get(hash).is_some());

        let stats = cache.stats();
        assert_eq!(stats.entries, 1);
        assert_eq!(stats.total_hits, 1);
    }

    // `force_put` must bypass the 3-use frequency gate entirely.
    #[test]
    fn test_force_cache() {
        let mut cache = PlanCache::new(100);

        let plan = ExecutionPlan {
            steps: vec![ExecutionStep::TableScan {
                table: "test".to_string(),
            }],
            estimated_cost: 100.0,
            estimated_rows: 1000,
        };

        let hash = PlanCache::hash_query("SELECT * FROM test2");

        // Force put bypasses frequency threshold
        cache.force_put(hash, plan);
        assert!(cache.get(hash).is_some());
    }

    // EXPLAIN output should contain the plan header and the step list.
    #[test]
    fn test_explain() {
        let executor = OptimizedExecutor::new();

        let select = SelectQuery {
            table: "users".to_string(),
            columns: vec!["id".to_string(), "name".to_string()],
            where_clause: None,
            order_by: Some(OrderBy {
                column: "id".to_string(),
                direction: SortDirection::Asc,
            }),
            limit: Some(10),
            offset: None,
        };

        let catalog = Catalog::new("test");
        let explain = executor.explain(&select, &catalog).unwrap();

        assert!(explain.contains("QUERY PLAN"));
        assert!(explain.contains("Execution Steps"));
    }

    // ========================================================================
    // Task 11: CardinalityTracker Tests
    // ========================================================================

    #[test]
    fn test_cardinality_tracker_basic() {
        let tracker = CardinalityTracker::new();

        // Add 1000 unique values
        for i in 0u64..1000 {
            tracker.observe("events", "user_id", &i);
        }

        let estimate = tracker.estimate("events", "user_id").unwrap();

        // Should be within 5% of actual (HLL with precision=14 has ~0.81% SE)
        let error = (estimate.distinct as f64 - 1000.0).abs() / 1000.0;
        assert!(
            error < 0.05,
            "Cardinality error {}% exceeds 5%",
            error * 100.0
        );
        assert!(estimate.error_pct < 1.0, "Standard error should be < 1%");
    }

    // Per-column sketches must stay independent: high- and low-cardinality
    // columns on the same table get separate estimates.
    #[test]
    fn test_cardinality_tracker_multiple_columns() {
        let tracker = CardinalityTracker::new();

        // High cardinality column
        for i in 0u64..10_000 {
            tracker.observe("events", "span_id", &i);
        }

        // Low cardinality column
        for i in 0u64..1000 {
            tracker.observe("events", "project_id", &(i % 10));
        }

        let span_estimate = tracker.estimate("events", "span_id").unwrap();
        let project_estimate = tracker.estimate("events", "project_id").unwrap();

        // High cardinality should be ~10000
        let span_error = (span_estimate.distinct as f64 - 10000.0).abs() / 10000.0;
        assert!(span_error < 0.05, "span_id error {}%", span_error * 100.0);

        // Low cardinality should be ~10
        let project_error = (project_estimate.distinct as f64 - 10.0).abs() / 10.0;
        assert!(
            project_error < 0.20,
            "project_id error {}%",
            project_error * 100.0
        );
    }

    // Drift detection fires when live cardinality diverges from a cached
    // value by more than the (20%) threshold.
    #[test]
    fn test_cardinality_drift_detection() {
        let tracker = CardinalityTracker::new();

        // Initial state: 100 distinct values
        for i in 0u64..100 {
            tracker.observe("events", "user_id", &i);
        }

        let mut cached = std::collections::HashMap::new();
        cached.insert("user_id".to_string(), 100usize);

        // No drift yet
        assert!(!tracker.has_cardinality_drift("events", &cached));

        // Add many more distinct values (50% more = drift)
        for i in 100u64..200 {
            tracker.observe("events", "user_id", &i);
        }

        // Now ~100% increase, should exceed 20% threshold
        assert!(tracker.has_cardinality_drift("events", &cached));
    }

    // Memory accounting should reflect distinct (table, column) sketches.
    #[test]
    fn test_cardinality_tracker_memory() {
        let tracker = CardinalityTracker::new();

        // Add data for multiple tables/columns
        for i in 0u64..1000 {
            tracker.observe("table1", "col1", &i);
            tracker.observe("table1", "col2", &i);
            tracker.observe("table2", "col1", &i);
        }

        let stats = tracker.memory_usage();
        assert_eq!(stats.table_count, 2);
        assert_eq!(stats.column_count, 3);
        assert!(stats.memory_bytes > 0);
        assert!(stats.standard_error_pct < 1.0);
    }

    // End-to-end: tracker observations flow into executor table stats via
    // `refresh_stats_from_tracker`.
    #[test]
    fn test_executor_with_cardinality_tracker() {
        let tracker = Arc::new(CardinalityTracker::new());

        // Simulate ingestion updating tracker
        for i in 0u64..500 {
            tracker.observe("events", "user_id", &i);
            tracker.observe("events", "span_id", &(i * 2));
        }
        tracker.increment_row_count("events", 500);

        // Create executor with shared tracker
        let mut executor = OptimizedExecutor::with_cardinality_tracker(Arc::clone(&tracker));

        // Refresh stats from tracker
        executor.refresh_stats_from_tracker("events");

        // Verify stats were synced
        let stats = &executor.table_stats.get("events").unwrap();
        assert_eq!(stats.row_count, 500);
        assert!(stats.column_cardinalities.contains_key("user_id"));
        assert!(stats.column_cardinalities.contains_key("span_id"));

        // Cardinality estimates should be reasonable
        let user_card = stats.column_cardinalities.get("user_id").unwrap();
        let error = (*user_card as f64 - 500.0).abs() / 500.0;
        assert!(error < 0.05, "user_id cardinality error {}%", error * 100.0);
    }
}