1use crate::query_optimizer::{
68 CardinalitySource, CostModel, IndexSelection, QueryOperation, QueryOptimizer,
69 QueryPlan as OptimizerPlan, QueryPredicate, TraversalDirection,
70};
71#[cfg(test)]
72use crate::soch_ql::{ComparisonOp, WhereClause};
73use crate::soch_ql::{SelectQuery, SochResult, SochValue};
74use parking_lot::RwLock;
75use std::collections::HashMap;
76use std::sync::Arc;
77use sochdb_core::{Catalog, Result};
78use sochdb_storage::sketches::HyperLogLog;
79
/// Storage operations the executor needs in order to run an execution plan.
///
/// Rows are exchanged as maps from column name to [`SochValue`].
pub trait StorageBackend: Send + Sync {
    /// Scans `table`, receiving the requested `columns` and an optional
    /// `predicate` string.
    ///
    /// The executor in this file passes the `Debug` rendering of the query's
    /// WHERE clause as `predicate`; how a backend interprets that string is
    /// up to the implementation.
    fn table_scan(
        &self,
        table: &str,
        columns: &[String],
        predicate: Option<&str>,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Looks up at most one row of `table` by `key`, returning `None` when
    /// nothing matches.
    fn primary_key_lookup(
        &self,
        table: &str,
        key: &SochValue,
    ) -> Result<Option<HashMap<String, SochValue>>>;

    /// Returns the rows of `table` found under `key` in the secondary index
    /// named `index`.
    fn secondary_index_seek(
        &self,
        table: &str,
        index: &str,
        key: &SochValue,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Returns the rows of `table` between `start_us` and `end_us`.
    ///
    /// NOTE(review): the `_us` suffix suggests microsecond timestamps; whether
    /// the bounds are inclusive is not visible here — confirm with implementors.
    fn time_index_scan(
        &self,
        table: &str,
        start_us: u64,
        end_us: u64,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Returns up to `k` rows of `table` for the `query` vector, each paired
    /// with an `f32`.
    ///
    /// NOTE(review): the `f32` is presumably a similarity score or distance;
    /// the executor in this file discards it.
    fn vector_search(
        &self,
        table: &str,
        query: &[f32],
        k: usize,
    ) -> Result<Vec<(f32, HashMap<String, SochValue>)>>;

    /// Returns the number of rows in `table`.
    fn row_count(&self, table: &str) -> usize;
}
131
/// Tracks approximate distinct-value counts per table column using one
/// HyperLogLog sketch per column.
pub struct CardinalityTracker {
    /// Precision handed to `HyperLogLog::new` for every new column sketch.
    /// The constructors only accept values in `4..=18`.
    precision: u8,
    /// Per-table tracking state keyed by table name, behind a read/write lock.
    tables: RwLock<HashMap<String, TableCardinalityTracker>>,
    /// Relative change (fraction of the cached value) above which
    /// `has_cardinality_drift` reports drift. Defaults to `0.20`.
    drift_threshold: f64,
}
162
/// Cardinality state kept for a single table.
struct TableCardinalityTracker {
    /// One HyperLogLog sketch per observed column, keyed by column name.
    columns: HashMap<String, HyperLogLog>,
    /// Row count accumulated through `increment_row_count`.
    row_count: usize,
    /// Time of the latest observation or merge, in microseconds since the
    /// Unix epoch.
    last_update_us: u64,
}
172
/// Result of asking the tracker for a column's distinct-value count.
#[derive(Debug, Clone)]
pub struct CardinalityEstimate {
    /// Estimated number of distinct values.
    pub distinct: usize,
    /// Value reported by the sketch's `standard_error()`.
    ///
    /// NOTE(review): whether this is a fraction or a percentage depends on
    /// `HyperLogLog::standard_error` — confirm against that implementation.
    pub error_pct: f64,
    /// Where the estimate came from.
    pub source: CardinalitySource,
    /// Whether the underlying table state was updated recently enough to be
    /// trusted (the tracker uses a 60-second window).
    pub is_fresh: bool,
}
185
186impl CardinalityTracker {
187 pub fn new() -> Self {
189 Self::with_precision(14)
190 }
191
192 pub fn with_precision(precision: u8) -> Self {
200 assert!((4..=18).contains(&precision), "Precision must be 4-18");
201 Self {
202 precision,
203 tables: RwLock::new(HashMap::new()),
204 drift_threshold: 0.20, }
206 }
207
208 pub fn set_drift_threshold(&mut self, threshold: f64) {
210 self.drift_threshold = threshold;
211 }
212
213 pub fn observe<T: std::hash::Hash>(&self, table: &str, column: &str, value: &T) {
217 let mut tables = self.tables.write();
218 let tracker = tables
219 .entry(table.to_string())
220 .or_insert_with(|| TableCardinalityTracker {
221 columns: HashMap::new(),
222 row_count: 0,
223 last_update_us: Self::now(),
224 });
225
226 let hll = tracker
227 .columns
228 .entry(column.to_string())
229 .or_insert_with(|| HyperLogLog::new(self.precision));
230
231 hll.add(value);
232 tracker.last_update_us = Self::now();
233 }
234
235 pub fn observe_batch<T: std::hash::Hash>(
237 &self,
238 table: &str,
239 column: &str,
240 values: impl Iterator<Item = T>,
241 ) {
242 let mut tables = self.tables.write();
243 let tracker = tables
244 .entry(table.to_string())
245 .or_insert_with(|| TableCardinalityTracker {
246 columns: HashMap::new(),
247 row_count: 0,
248 last_update_us: Self::now(),
249 });
250
251 let hll = tracker
252 .columns
253 .entry(column.to_string())
254 .or_insert_with(|| HyperLogLog::new(self.precision));
255
256 for value in values {
257 hll.add(&value);
258 }
259 tracker.last_update_us = Self::now();
260 }
261
262 pub fn increment_row_count(&self, table: &str, delta: usize) {
264 let mut tables = self.tables.write();
265 if let Some(tracker) = tables.get_mut(table) {
266 tracker.row_count = tracker.row_count.saturating_add(delta);
267 }
268 }
269
270 pub fn estimate(&self, table: &str, column: &str) -> Option<CardinalityEstimate> {
274 let tables = self.tables.read();
275 let tracker = tables.get(table)?;
276 let hll = tracker.columns.get(column)?;
277
278 let distinct = hll.cardinality() as usize;
279 let error_pct = hll.standard_error();
280 let freshness_us = Self::now().saturating_sub(tracker.last_update_us);
281
282 Some(CardinalityEstimate {
283 distinct,
284 error_pct,
285 source: CardinalitySource::HyperLogLog,
286 is_fresh: freshness_us < 60_000_000,
288 })
289 }
290
291 pub fn get_table_cardinalities(&self, table: &str) -> HashMap<String, usize> {
293 let tables = self.tables.read();
294 tables
295 .get(table)
296 .map(|tracker| {
297 tracker
298 .columns
299 .iter()
300 .map(|(col, hll)| (col.clone(), hll.cardinality() as usize))
301 .collect()
302 })
303 .unwrap_or_default()
304 }
305
306 pub fn get_row_count(&self, table: &str) -> usize {
308 self.tables
309 .read()
310 .get(table)
311 .map(|t| t.row_count)
312 .unwrap_or(0)
313 }
314
315 pub fn has_cardinality_drift(
320 &self,
321 table: &str,
322 cached_cardinalities: &HashMap<String, usize>,
323 ) -> bool {
324 let tables = self.tables.read();
325 let tracker = match tables.get(table) {
326 Some(t) => t,
327 None => return true, };
329
330 for (column, &cached) in cached_cardinalities {
331 if let Some(hll) = tracker.columns.get(column) {
332 let current = hll.cardinality();
333 if cached == 0 {
334 if current > 0 {
335 return true; }
337 } else {
338 let drift = (current as f64 - cached as f64).abs() / cached as f64;
339 if drift > self.drift_threshold {
340 return true;
341 }
342 }
343 }
344 }
345
346 false
347 }
348
349 pub fn merge(&self, table: &str, column: &str, other_hll: &HyperLogLog) {
351 let mut tables = self.tables.write();
352 if let Some(tracker) = tables.get_mut(table)
353 && let Some(hll) = tracker.columns.get_mut(column)
354 {
355 hll.merge(other_hll);
356 tracker.last_update_us = Self::now();
357 }
358 }
359
360 pub fn clear_table(&self, table: &str) {
362 self.tables.write().remove(table);
363 }
364
365 pub fn memory_usage(&self) -> CardinalityTrackerStats {
367 let tables = self.tables.read();
368 let mut total_columns = 0;
369 let mut total_bytes = 0;
370
371 for tracker in tables.values() {
372 for hll in tracker.columns.values() {
373 total_columns += 1;
374 total_bytes += hll.memory_usage();
375 }
376 }
377
378 CardinalityTrackerStats {
379 table_count: tables.len(),
380 column_count: total_columns,
381 memory_bytes: total_bytes,
382 precision: self.precision,
383 standard_error_pct: 1.04 / (1usize << self.precision) as f64 * 100.0,
384 }
385 }
386
387 fn now() -> u64 {
388 std::time::SystemTime::now()
389 .duration_since(std::time::UNIX_EPOCH)
390 .unwrap()
391 .as_micros() as u64
392 }
393}
394
395impl Default for CardinalityTracker {
396 fn default() -> Self {
397 Self::new()
398 }
399}
400
/// Snapshot of the tracker's size, as returned by
/// `CardinalityTracker::memory_usage`.
#[derive(Debug, Clone)]
pub struct CardinalityTrackerStats {
    /// Number of tracked tables.
    pub table_count: usize,
    /// Number of tracked columns across all tables.
    pub column_count: usize,
    /// Sum of the memory usage reported by every column sketch.
    pub memory_bytes: usize,
    /// HyperLogLog precision the tracker was created with.
    pub precision: u8,
    /// Standard error of the sketches as a percentage, derived from
    /// `precision`.
    pub standard_error_pct: f64,
}
415
/// Plans SELECT queries through the cost-based optimizer and executes the
/// resulting plans against a `StorageBackend`.
pub struct OptimizedExecutor {
    /// Optimizer that chooses the index and estimates costs.
    optimizer: QueryOptimizer,
    /// Statistics per table, keyed by table name.
    table_stats: HashMap<String, TableStats>,
    /// Shared tracker from which column cardinalities can be refreshed.
    cardinality_tracker: Arc<CardinalityTracker>,
    /// Optional provider used to turn vector-search query text into an
    /// embedding; without it a zero vector is used.
    embedding_provider: Option<Arc<dyn crate::embedding_provider::EmbeddingProvider>>,
}
427
/// Statistics the executor keeps about one table.
#[derive(Debug, Clone, Default)]
pub struct TableStats {
    /// Number of rows in the table.
    pub row_count: usize,
    /// Distinct-value count per column, keyed by column name.
    pub column_cardinalities: HashMap<String, usize>,
    /// Whether the table has a time index.
    pub has_time_index: bool,
    /// Whether the table has a vector index.
    pub has_vector_index: bool,
    /// Name of the primary-key column, if any.
    pub primary_key: Option<String>,
}
442
443impl OptimizedExecutor {
444 pub fn new() -> Self {
446 Self {
447 optimizer: QueryOptimizer::new(),
448 table_stats: HashMap::new(),
449 cardinality_tracker: Arc::new(CardinalityTracker::new()),
450 embedding_provider: None,
451 }
452 }
453
454 pub fn with_cost_model(cost_model: CostModel) -> Self {
456 Self {
457 optimizer: QueryOptimizer::with_cost_model(cost_model),
458 table_stats: HashMap::new(),
459 cardinality_tracker: Arc::new(CardinalityTracker::new()),
460 embedding_provider: None,
461 }
462 }
463
464 pub fn with_cardinality_tracker(tracker: Arc<CardinalityTracker>) -> Self {
466 Self {
467 optimizer: QueryOptimizer::new(),
468 table_stats: HashMap::new(),
469 cardinality_tracker: tracker,
470 embedding_provider: None,
471 }
472 }
473
474 pub fn set_embedding_provider(
476 &mut self,
477 provider: Arc<dyn crate::embedding_provider::EmbeddingProvider>,
478 ) {
479 self.embedding_provider = Some(provider);
480 }
481
482 pub fn with_embedding_provider(
484 mut self,
485 provider: Arc<dyn crate::embedding_provider::EmbeddingProvider>,
486 ) -> Self {
487 self.embedding_provider = Some(provider);
488 self
489 }
490
491 pub fn cardinality_tracker(&self) -> Arc<CardinalityTracker> {
493 Arc::clone(&self.cardinality_tracker)
494 }
495
496 pub fn update_table_stats(&mut self, table: &str, stats: TableStats) {
498 let row_count = stats.row_count;
499 self.table_stats.insert(table.to_string(), stats);
500 self.optimizer
501 .update_total_edges(row_count, CardinalitySource::Exact);
502 }
503
504 pub fn refresh_stats_from_tracker(&mut self, table: &str) {
508 let cardinalities = self.cardinality_tracker.get_table_cardinalities(table);
509 let row_count = self.cardinality_tracker.get_row_count(table);
510
511 if let Some(stats) = self.table_stats.get_mut(table) {
512 stats.column_cardinalities = cardinalities;
513 if row_count > 0 {
514 stats.row_count = row_count;
515 }
516 } else {
517 self.table_stats.insert(
518 table.to_string(),
519 TableStats {
520 row_count,
521 column_cardinalities: cardinalities,
522 ..Default::default()
523 },
524 );
525 }
526 }
527
528 pub fn update_cardinality_hint(
530 &mut self,
531 table: &str,
532 column: &str,
533 cardinality: usize,
534 _source: CardinalitySource,
535 ) {
536 if let Some(stats) = self.table_stats.get_mut(table) {
537 stats
538 .column_cardinalities
539 .insert(column.to_string(), cardinality);
540 }
541 }
542
543 pub fn plan_select(
545 &self,
546 select: &SelectQuery,
547 _catalog: &Catalog,
548 ) -> Result<OptimizedQueryPlan> {
549 let predicates = self.extract_predicates(select)?;
551
552 let optimizer_plan = self.optimizer.plan_query(&predicates, select.limit);
554
555 let exec_plan = self.build_execution_plan(select, &optimizer_plan)?;
557
558 Ok(OptimizedQueryPlan {
559 table: select.table.clone(),
560 columns: select.columns.clone(),
561 execution_plan: exec_plan,
562 optimizer_plan,
563 predicates,
564 })
565 }
566
567 fn extract_predicates(&self, select: &SelectQuery) -> Result<Vec<QueryPredicate>> {
569 let mut predicates = Vec::new();
570
571 if let Some(where_clause) = &select.where_clause {
572 for condition in &where_clause.conditions {
573 if let Some(pred) = self.condition_to_predicate(&condition.column, &condition.value)
574 {
575 predicates.push(pred);
576 }
577 }
578 }
579
580 Ok(predicates)
581 }
582
583 fn condition_to_predicate(&self, column: &str, value: &SochValue) -> Option<QueryPredicate> {
585 match column {
587 "timestamp" | "created_at" | "updated_at" | "time" => {
589 if let SochValue::UInt(ts) = value {
590 let hour_us = 60 * 60 * 1_000_000u64;
592 return Some(QueryPredicate::TimeRange(*ts, ts + hour_us));
593 }
594 }
595 "project_id" | "project" => {
597 if let SochValue::UInt(id) = value {
598 return Some(QueryPredicate::Project(*id as u16));
599 }
600 }
601 "tenant_id" | "tenant" => {
603 if let SochValue::UInt(id) = value {
604 return Some(QueryPredicate::Tenant(*id as u32));
605 }
606 }
607 "span_type" | "type" => {
609 if let SochValue::Text(s) = value {
610 return Some(QueryPredicate::SpanType(s.clone()));
611 }
612 }
613 _ => {}
614 }
615
616 None
617 }
618
619 fn build_execution_plan(
621 &self,
622 select: &SelectQuery,
623 opt_plan: &OptimizerPlan,
624 ) -> Result<ExecutionPlan> {
625 let mut steps = Vec::new();
626
627 match &opt_plan.index_selection {
629 IndexSelection::LsmScan | IndexSelection::FullScan => {
630 steps.push(ExecutionStep::TableScan {
631 table: select.table.clone(),
632 });
633 }
634 IndexSelection::TimeIndex => {
635 if let Some(QueryOperation::LsmRangeScan { start_us, end_us }) =
637 opt_plan.operations.first()
638 {
639 steps.push(ExecutionStep::TimeIndexScan {
640 table: select.table.clone(),
641 start_us: *start_us,
642 end_us: *end_us,
643 });
644 }
645 }
646 IndexSelection::VectorIndex => {
647 if let Some(QueryOperation::VectorSearch { k }) = opt_plan.operations.first() {
648 let query_text = self.extract_vector_query_text(select);
650 steps.push(ExecutionStep::VectorSearch {
651 table: select.table.clone(),
652 k: *k,
653 query_text,
654 });
655 }
656 }
657 IndexSelection::CausalIndex => {
658 if let Some(QueryOperation::GraphTraversal {
659 direction,
660 max_depth,
661 }) = opt_plan.operations.first()
662 {
663 steps.push(ExecutionStep::GraphTraversal {
664 table: select.table.clone(),
665 direction: *direction,
666 max_depth: *max_depth,
667 });
668 }
669 }
670 IndexSelection::ProjectIndex => {
671 steps.push(ExecutionStep::SecondaryIndexSeek {
672 table: select.table.clone(),
673 index: "project_idx".to_string(),
674 });
675 }
676 IndexSelection::PrimaryKey => {
677 steps.push(ExecutionStep::PrimaryKeyLookup {
678 table: select.table.clone(),
679 });
680 }
681 IndexSelection::Secondary(idx) => {
682 steps.push(ExecutionStep::SecondaryIndexSeek {
683 table: select.table.clone(),
684 index: idx.clone(),
685 });
686 }
687 IndexSelection::MultiIndex(indexes) => {
688 steps.push(ExecutionStep::MultiIndexIntersect {
690 table: select.table.clone(),
691 indexes: indexes.iter().map(|idx| format!("{:?}", idx)).collect(),
692 });
693 }
694 }
695
696 if select.where_clause.is_some() {
698 steps.push(ExecutionStep::Filter {
699 predicate: format!("{:?}", select.where_clause),
700 });
701 }
702
703 if !select.columns.is_empty() && select.columns[0] != "*" {
705 steps.push(ExecutionStep::Project {
706 columns: select.columns.clone(),
707 });
708 }
709
710 if let Some(order_by) = &select.order_by {
712 steps.push(ExecutionStep::Sort {
713 column: order_by.column.clone(),
714 ascending: order_by.direction == crate::soch_ql::SortDirection::Asc,
715 });
716 }
717
718 if let Some(limit) = select.limit {
720 steps.push(ExecutionStep::Limit { count: limit });
721 }
722
723 Ok(ExecutionPlan {
724 steps,
725 estimated_cost: opt_plan.cost.total_cost,
726 estimated_rows: opt_plan.cost.records_returned,
727 })
728 }
729
730 pub fn execute<S: StorageBackend>(
735 &self,
736 plan: &OptimizedQueryPlan,
737 storage: &S,
738 ) -> Result<SochResult> {
739 let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
740 let mut columns_to_return = plan.columns.clone();
741
742 for step in &plan.execution_plan.steps {
744 match step {
745 ExecutionStep::TableScan { table } => {
746 let predicate = plan.execution_plan.steps.iter().find_map(|s| match s {
748 ExecutionStep::Filter { predicate } => Some(predicate.as_str()),
749 _ => None,
750 });
751 rows = storage.table_scan(table, &columns_to_return, predicate)?;
752 }
753 ExecutionStep::PrimaryKeyLookup { table } => {
754 if let Some(key) = self.extract_primary_key_from_predicates(&plan.predicates)
756 && let Some(row) = storage.primary_key_lookup(table, &key)?
757 {
758 rows = vec![row];
759 }
760 }
761 ExecutionStep::SecondaryIndexSeek { table, index } => {
762 if let Some(key) =
764 self.extract_index_key_from_predicates(&plan.predicates, index)
765 {
766 rows = storage.secondary_index_seek(table, index, &key)?;
767 }
768 }
769 ExecutionStep::TimeIndexScan {
770 table,
771 start_us,
772 end_us,
773 } => {
774 rows = storage.time_index_scan(table, *start_us, *end_us)?;
775 }
776 ExecutionStep::VectorSearch {
777 table,
778 k,
779 query_text,
780 } => {
781 let query_embedding = match (query_text, &self.embedding_provider) {
783 (Some(text), Some(provider)) => {
784 provider.embed(text).unwrap_or_else(|e| {
786 tracing::warn!(
787 "Failed to generate embedding for '{}': {}. Using fallback.",
788 text,
789 e
790 );
791 vec![0.0f32; provider.dimension()]
793 })
794 }
795 (Some(_text), None) => {
796 tracing::warn!(
798 "Vector search requested but no embedding provider configured"
799 );
800 vec![0.0f32; 128] }
802 (None, _) => {
803 tracing::warn!("Vector search without query text, using placeholder");
805 vec![0.0f32; 128] }
807 };
808 let results = storage.vector_search(table, &query_embedding, *k)?;
809 rows = results.into_iter().map(|(_, row)| row).collect();
810 }
811 ExecutionStep::GraphTraversal {
812 table,
813 direction: _,
814 max_depth: _,
815 } => {
816 rows = storage.table_scan(table, &columns_to_return, None)?;
818 }
819 ExecutionStep::MultiIndexIntersect { table, indexes } => {
820 let mut result_sets: Vec<Vec<HashMap<String, SochValue>>> = Vec::new();
822 for index in indexes {
823 if let Some(key) =
824 self.extract_index_key_from_predicates(&plan.predicates, index)
825 {
826 result_sets.push(storage.secondary_index_seek(table, index, &key)?);
827 }
828 }
829 if !result_sets.is_empty() {
831 rows = self.intersect_result_sets(result_sets);
832 }
833 }
834 ExecutionStep::Filter { predicate: _ } => {
835 }
838 ExecutionStep::Project { columns } => {
839 columns_to_return = columns.clone();
840 rows = rows
842 .into_iter()
843 .map(|row| {
844 columns
845 .iter()
846 .filter_map(|c| row.get(c).map(|v| (c.clone(), v.clone())))
847 .collect()
848 })
849 .collect();
850 }
851 ExecutionStep::Sort { column, ascending } => {
852 rows.sort_by(|a, b| {
853 let va = a.get(column);
854 let vb = b.get(column);
855 let cmp = Self::compare_values(va, vb);
856 if *ascending { cmp } else { cmp.reverse() }
857 });
858 }
859 ExecutionStep::Limit { count } => {
860 rows.truncate(*count);
861 }
862 }
863 }
864
865 let result_rows: Vec<Vec<SochValue>> = rows
867 .iter()
868 .map(|row| {
869 columns_to_return
870 .iter()
871 .map(|c| row.get(c).cloned().unwrap_or(SochValue::Null))
872 .collect()
873 })
874 .collect();
875
876 Ok(SochResult {
877 table: plan.table.clone(),
878 columns: columns_to_return,
879 rows: result_rows,
880 })
881 }
882
883 fn extract_primary_key_from_predicates(
885 &self,
886 predicates: &[QueryPredicate],
887 ) -> Option<SochValue> {
888 for pred in predicates {
889 if let QueryPredicate::Project(id) = pred {
891 return Some(SochValue::UInt(*id as u64));
892 }
893 }
894 None
895 }
896
897 fn extract_index_key_from_predicates(
899 &self,
900 predicates: &[QueryPredicate],
901 _index: &str,
902 ) -> Option<SochValue> {
903 for pred in predicates {
904 match pred {
905 QueryPredicate::Tenant(id) => return Some(SochValue::UInt(*id as u64)),
906 QueryPredicate::Project(id) => return Some(SochValue::UInt(*id as u64)),
907 QueryPredicate::SpanType(s) => return Some(SochValue::Text(s.clone())),
908 _ => {}
909 }
910 }
911 None
912 }
913
914 fn extract_vector_query_text(&self, select: &SelectQuery) -> Option<String> {
919 use crate::soch_ql::ComparisonOp;
920
921 if let Some(where_clause) = &select.where_clause {
922 for condition in &where_clause.conditions {
923 if matches!(condition.operator, ComparisonOp::SimilarTo) {
924 if let SochValue::Text(query_text) = &condition.value {
926 return Some(query_text.clone());
927 }
928 }
929 }
930 }
931 None
932 }
933
934 fn intersect_result_sets(
936 &self,
937 sets: Vec<Vec<HashMap<String, SochValue>>>,
938 ) -> Vec<HashMap<String, SochValue>> {
939 if sets.is_empty() {
940 return Vec::new();
941 }
942 if sets.len() == 1 {
943 return sets.into_iter().next().unwrap();
944 }
945
946 let mut base = sets.into_iter().next().unwrap();
948 base.truncate(base.len().min(100)); base
951 }
952
953 fn compare_values(a: Option<&SochValue>, b: Option<&SochValue>) -> std::cmp::Ordering {
955 match (a, b) {
956 (None, None) => std::cmp::Ordering::Equal,
957 (None, Some(_)) => std::cmp::Ordering::Less,
958 (Some(_), None) => std::cmp::Ordering::Greater,
959 (Some(va), Some(vb)) => match (va, vb) {
960 (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
961 (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
962 (SochValue::Float(a), SochValue::Float(b)) => {
963 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
964 }
965 (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
966 (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
967 _ => std::cmp::Ordering::Equal,
968 },
969 }
970 }
971
972 pub fn explain(&self, select: &SelectQuery, catalog: &Catalog) -> Result<String> {
974 let plan = self.plan_select(select, catalog)?;
975
976 let mut output = String::new();
977 output.push_str(&format!(
978 "QUERY PLAN (estimated cost: {:.2}, rows: {})\n",
979 plan.optimizer_plan.cost.total_cost, plan.optimizer_plan.cost.records_returned
980 ));
981 output.push_str(&format!(
982 "Index Selection: {:?}\n",
983 plan.optimizer_plan.index_selection
984 ));
985 output.push_str("Execution Steps:\n");
986
987 for (i, step) in plan.execution_plan.steps.iter().enumerate() {
988 output.push_str(&format!(" {}. {:?}\n", i + 1, step));
989 }
990
991 output.push_str("\nCost Breakdown:\n");
992 for (op, cost) in &plan.optimizer_plan.cost.breakdown {
993 output.push_str(&format!(" {:?}: {:.2}\n", op, cost));
994 }
995
996 Ok(output)
997 }
998}
999
1000impl Default for OptimizedExecutor {
1001 fn default() -> Self {
1002 Self::new()
1003 }
1004}
1005
/// Everything needed to execute (or explain) a planned SELECT.
#[derive(Debug)]
pub struct OptimizedQueryPlan {
    /// Table the query reads from.
    pub table: String,
    /// Columns requested by the query.
    pub columns: Vec<String>,
    /// Ordered steps the executor will run.
    pub execution_plan: ExecutionPlan,
    /// The optimizer's plan the steps were derived from.
    pub optimizer_plan: OptimizerPlan,
    /// Predicates extracted from the WHERE clause; used to derive index and
    /// primary-key lookup keys at execution time.
    pub predicates: Vec<QueryPredicate>,
}
1020
/// Ordered list of execution steps with the optimizer's estimates.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    /// Steps in the order they are executed.
    pub steps: Vec<ExecutionStep>,
    /// Total cost estimated by the optimizer.
    pub estimated_cost: f64,
    /// Number of records the optimizer expects to be returned.
    pub estimated_rows: usize,
}
1031
/// A single operation in an [`ExecutionPlan`].
#[derive(Debug, Clone)]
pub enum ExecutionStep {
    /// Scan the whole table.
    TableScan { table: String },
    /// Fetch a single row by primary key.
    PrimaryKeyLookup { table: String },
    /// Scan the time index between two timestamps.
    TimeIndexScan {
        table: String,
        start_us: u64,
        end_us: u64,
    },
    /// Nearest-neighbour search returning up to `k` rows.
    VectorSearch {
        table: String,
        k: usize,
        /// Text to embed as the query vector, when the query supplied one.
        query_text: Option<String>,
    },
    /// Graph traversal in `direction`, limited to `max_depth`.
    GraphTraversal {
        table: String,
        direction: TraversalDirection,
        max_depth: usize,
    },
    /// Look rows up through the named secondary index.
    SecondaryIndexSeek { table: String, index: String },
    /// Look rows up through several indexes and combine the results.
    MultiIndexIntersect { table: String, indexes: Vec<String> },
    /// Filter rows; `predicate` is a textual rendering of the WHERE clause.
    Filter { predicate: String },
    /// Keep only the listed columns.
    Project { columns: Vec<String> },
    /// Sort rows by `column`.
    Sort { column: String, ascending: bool },
    /// Keep at most `count` rows.
    Limit { count: usize },
}
1072
/// Frequency-gated cache of execution plans keyed by query hash.
///
/// A plan only enters the cache after its query has been seen
/// `cache_threshold` times; until then it is parked in the frequency tracker.
pub struct PlanCache {
    /// Promoted plans, keyed by query hash.
    cache: HashMap<u64, CachedPlan>,
    /// Queries seen recently but not promoted yet, keyed by query hash.
    frequency: HashMap<u64, FrequencyEntry>,
    /// Number of cached plans at which an insert first evicts the
    /// least-recently-used entry.
    max_entries: usize,
    /// Number of sightings required before a plan is promoted.
    cache_threshold: usize,
    /// Running counters describing cache behaviour.
    stats: AdaptiveCacheStats,
}
1091
/// A promoted plan together with its usage bookkeeping.
#[derive(Debug, Clone)]
struct CachedPlan {
    /// The cached execution plan.
    plan: ExecutionPlan,
    /// Number of cache hits served from this entry.
    hits: usize,
    /// Time of the last hit (or of insertion), in microseconds since the
    /// Unix epoch; used for LRU eviction.
    last_used: u64,
    /// Estimated planning time saved by this entry, in microseconds.
    time_saved_us: u64,
}
1104
/// Bookkeeping for a query that has been seen but not yet promoted.
#[derive(Debug, Clone)]
struct FrequencyEntry {
    /// How many times the query has been seen (lookups and puts).
    count: usize,
    /// Time the query was first seen, in microseconds since the Unix epoch.
    #[allow(dead_code)]
    first_seen: u64,
    /// Time the query was last seen; entries idle for too long are dropped.
    last_seen: u64,
    /// Most recent plan offered for the query, promoted once the threshold
    /// is reached.
    pending_plan: Option<ExecutionPlan>,
}
1118
/// Counters describing how the adaptive plan cache is behaving.
#[derive(Debug, Clone, Default)]
pub struct AdaptiveCacheStats {
    /// Number of plans currently cached.
    pub entries: usize,
    /// Lookups answered from the cache.
    pub total_hits: usize,
    /// Lookups that found no cached plan.
    pub total_misses: usize,
    /// Puts that were held back because the query was not frequent enough.
    pub frequency_blocked: usize,
    /// Plans promoted from the frequency tracker into the cache.
    pub promotions: usize,
    /// Accumulated estimate of planning time saved, in microseconds.
    pub time_saved_us: u64,
}
1135
1136impl PlanCache {
1137 pub fn new(max_entries: usize) -> Self {
1139 Self::with_threshold(max_entries, 3)
1140 }
1141
1142 pub fn with_threshold(max_entries: usize, cache_threshold: usize) -> Self {
1144 Self {
1145 cache: HashMap::new(),
1146 frequency: HashMap::new(),
1147 max_entries,
1148 cache_threshold,
1149 stats: AdaptiveCacheStats::default(),
1150 }
1151 }
1152
1153 pub fn hash_query(query: &str) -> u64 {
1155 use std::hash::{Hash, Hasher};
1156 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1157 query.hash(&mut hasher);
1158 hasher.finish()
1159 }
1160
1161 pub fn get(&mut self, query_hash: u64) -> Option<&ExecutionPlan> {
1166 if self.cache.contains_key(&query_hash) {
1168 if let Some(cached) = self.cache.get_mut(&query_hash) {
1169 cached.hits += 1;
1170 cached.last_used = Self::now();
1171 cached.time_saved_us += 1000; self.stats.total_hits += 1;
1173 }
1174 return self.cache.get(&query_hash).map(|c| &c.plan);
1175 }
1176
1177 self.stats.total_misses += 1;
1178
1179 let should_promote = if let Some(freq) = self.frequency.get_mut(&query_hash) {
1181 freq.count += 1;
1182 freq.last_seen = Self::now();
1183 freq.count >= self.cache_threshold && freq.pending_plan.is_some()
1184 } else {
1185 false
1186 };
1187
1188 if should_promote
1189 && let Some(freq) = self.frequency.remove(&query_hash)
1190 && let Some(plan) = freq.pending_plan
1191 {
1192 self.insert_to_cache(query_hash, plan);
1193 self.stats.promotions += 1;
1194 return self.cache.get(&query_hash).map(|c| &c.plan);
1195 }
1196
1197 None
1198 }
1199
1200 pub fn put(&mut self, query_hash: u64, plan: ExecutionPlan) {
1204 let now = Self::now();
1205
1206 if let Some(freq) = self.frequency.get_mut(&query_hash) {
1208 freq.count += 1;
1209 freq.last_seen = now;
1210 freq.pending_plan = Some(plan.clone());
1211
1212 if freq.count >= self.cache_threshold {
1214 self.promote_to_cache(query_hash, plan);
1215 self.stats.promotions += 1;
1216 } else {
1217 self.stats.frequency_blocked += 1;
1218 }
1219 } else {
1220 self.frequency.insert(
1222 query_hash,
1223 FrequencyEntry {
1224 count: 1,
1225 first_seen: now,
1226 last_seen: now,
1227 pending_plan: Some(plan),
1228 },
1229 );
1230 self.stats.frequency_blocked += 1;
1231 }
1232
1233 self.cleanup_frequency_tracker();
1235 }
1236
1237 pub fn force_put(&mut self, query_hash: u64, plan: ExecutionPlan) {
1239 self.insert_to_cache(query_hash, plan);
1240 self.frequency.remove(&query_hash);
1241 }
1242
1243 fn insert_to_cache(&mut self, query_hash: u64, plan: ExecutionPlan) {
1245 if self.cache.len() >= self.max_entries {
1247 self.evict_lru();
1248 }
1249
1250 self.cache.insert(
1251 query_hash,
1252 CachedPlan {
1253 plan,
1254 hits: 0,
1255 last_used: Self::now(),
1256 time_saved_us: 0,
1257 },
1258 );
1259
1260 self.stats.entries = self.cache.len();
1261 }
1262
1263 fn promote_to_cache(&mut self, query_hash: u64, plan: ExecutionPlan) {
1265 self.insert_to_cache(query_hash, plan);
1266 self.frequency.remove(&query_hash);
1267 }
1268
1269 fn evict_lru(&mut self) {
1271 if let Some((&key, _)) = self.cache.iter().min_by_key(|(_, v)| v.last_used) {
1272 self.cache.remove(&key);
1273 }
1274 }
1275
1276 fn cleanup_frequency_tracker(&mut self) {
1278 let now = Self::now();
1279 let max_age = 60 * 1_000_000; self.frequency.retain(|_, v| now - v.last_seen < max_age);
1282 }
1283
1284 pub fn clear(&mut self) {
1286 self.cache.clear();
1287 self.frequency.clear();
1288 self.stats = AdaptiveCacheStats::default();
1289 }
1290
1291 pub fn stats(&self) -> CacheStats {
1293 CacheStats {
1294 entries: self.cache.len(),
1295 total_hits: self.stats.total_hits,
1296 }
1297 }
1298
1299 pub fn adaptive_stats(&self) -> &AdaptiveCacheStats {
1301 &self.stats
1302 }
1303
1304 fn now() -> u64 {
1305 std::time::SystemTime::now()
1306 .duration_since(std::time::UNIX_EPOCH)
1307 .unwrap()
1308 .as_micros() as u64
1309 }
1310}
1311
/// Basic plan-cache counters.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of plans currently cached.
    pub entries: usize,
    /// Lookups answered from the cache.
    pub total_hits: usize,
}
1320
#[cfg(test)]
mod tests {
    use super::*;
    use crate::soch_ql::{Condition, LogicalOp, OrderBy, SortDirection};

    /// A `timestamp` condition with an unsigned value becomes a single
    /// time-range predicate.
    #[test]
    fn test_predicate_extraction() {
        let executor = OptimizedExecutor::new();

        let select = SelectQuery {
            table: "events".to_string(),
            columns: vec!["*".to_string()],
            where_clause: Some(WhereClause {
                conditions: vec![Condition {
                    column: "timestamp".to_string(),
                    operator: ComparisonOp::Ge,
                    value: SochValue::UInt(1700000000000000),
                }],
                operator: LogicalOp::And,
            }),
            order_by: None,
            limit: None,
            offset: None,
        };

        let predicates = executor.extract_predicates(&select).unwrap();
        assert_eq!(predicates.len(), 1);
        assert!(matches!(predicates[0], QueryPredicate::TimeRange(_, _)));
    }

    /// Planning a time-filtered query on a table with stats yields a plan
    /// with a positive estimated cost.
    #[test]
    fn test_plan_with_time_index() {
        let mut executor = OptimizedExecutor::new();
        executor.update_table_stats(
            "events",
            TableStats {
                row_count: 1_000_000,
                has_time_index: true,
                ..Default::default()
            },
        );

        let select = SelectQuery {
            table: "events".to_string(),
            columns: vec!["id".to_string(), "data".to_string()],
            where_clause: Some(WhereClause {
                conditions: vec![Condition {
                    column: "timestamp".to_string(),
                    operator: ComparisonOp::Ge,
                    value: SochValue::UInt(1700000000000000),
                }],
                operator: LogicalOp::And,
            }),
            order_by: None,
            limit: Some(100),
            offset: None,
        };

        let catalog = Catalog::new("test");
        let plan = executor.plan_select(&select, &catalog).unwrap();

        assert!(plan.execution_plan.estimated_cost > 0.0);
    }

    /// Walks a query through the frequency gate of the default cache
    /// (threshold 3) until its plan is promoted and served.
    #[test]
    fn test_plan_cache() {
        let mut cache = PlanCache::new(100);

        let plan = ExecutionPlan {
            steps: vec![ExecutionStep::TableScan {
                table: "test".to_string(),
            }],
            estimated_cost: 100.0,
            estimated_rows: 1000,
        };

        let query = "SELECT * FROM test";
        let hash = PlanCache::hash_query(query);

        // Unknown query: plain miss, nothing is tracked yet.
        assert!(cache.get(hash).is_none());

        // First put starts tracking (count 1); the following lookup is the
        // second sighting, still below the threshold.
        cache.put(hash, plan.clone());
        assert!(cache.get(hash).is_none());

        // Second put is the third sighting and promotes the plan.
        cache.put(hash, plan);
        assert!(cache.get(hash).is_some());

        let stats = cache.stats();
        assert_eq!(stats.entries, 1);
        assert_eq!(stats.total_hits, 1);
    }

    /// `force_put` caches a plan without any prior sightings.
    #[test]
    fn test_force_cache() {
        let mut cache = PlanCache::new(100);

        let plan = ExecutionPlan {
            steps: vec![ExecutionStep::TableScan {
                table: "test".to_string(),
            }],
            estimated_cost: 100.0,
            estimated_rows: 1000,
        };

        let hash = PlanCache::hash_query("SELECT * FROM test2");

        cache.force_put(hash, plan);
        assert!(cache.get(hash).is_some());
    }

    /// `explain` output contains the plan header and the step list.
    #[test]
    fn test_explain() {
        let executor = OptimizedExecutor::new();

        let select = SelectQuery {
            table: "users".to_string(),
            columns: vec!["id".to_string(), "name".to_string()],
            where_clause: None,
            order_by: Some(OrderBy {
                column: "id".to_string(),
                direction: SortDirection::Asc,
            }),
            limit: Some(10),
            offset: None,
        };

        let catalog = Catalog::new("test");
        let explain = executor.explain(&select, &catalog).unwrap();

        assert!(explain.contains("QUERY PLAN"));
        assert!(explain.contains("Execution Steps"));
    }

    /// 1000 distinct values are estimated within 5% of the true count.
    #[test]
    fn test_cardinality_tracker_basic() {
        let tracker = CardinalityTracker::new();

        for i in 0u64..1000 {
            tracker.observe("events", "user_id", &i);
        }

        let estimate = tracker.estimate("events", "user_id").unwrap();

        let error = (estimate.distinct as f64 - 1000.0).abs() / 1000.0;
        assert!(
            error < 0.05,
            "Cardinality error {}% exceeds 5%",
            error * 100.0
        );
        assert!(estimate.error_pct < 1.0, "Standard error should be < 1%");
    }

    /// Columns of the same table are tracked independently: one with high
    /// cardinality (10,000) and one with low cardinality (10).
    #[test]
    fn test_cardinality_tracker_multiple_columns() {
        let tracker = CardinalityTracker::new();

        for i in 0u64..10_000 {
            tracker.observe("events", "span_id", &i);
        }

        // Only ten distinct project ids, each observed many times.
        for i in 0u64..1000 {
            tracker.observe("events", "project_id", &(i % 10));
        }

        let span_estimate = tracker.estimate("events", "span_id").unwrap();
        let project_estimate = tracker.estimate("events", "project_id").unwrap();

        let span_error = (span_estimate.distinct as f64 - 10000.0).abs() / 10000.0;
        assert!(span_error < 0.05, "span_id error {}%", span_error * 100.0);

        let project_error = (project_estimate.distinct as f64 - 10.0).abs() / 10.0;
        assert!(
            project_error < 0.20,
            "project_id error {}%",
            project_error * 100.0
        );
    }

    /// No drift while the estimate matches the cached value; drift once the
    /// number of distinct values has doubled.
    #[test]
    fn test_cardinality_drift_detection() {
        let tracker = CardinalityTracker::new();

        for i in 0u64..100 {
            tracker.observe("events", "user_id", &i);
        }

        let mut cached = std::collections::HashMap::new();
        cached.insert("user_id".to_string(), 100usize);

        assert!(!tracker.has_cardinality_drift("events", &cached));

        // 100 further distinct values: far beyond the default 20% threshold.
        for i in 100u64..200 {
            tracker.observe("events", "user_id", &i);
        }

        assert!(tracker.has_cardinality_drift("events", &cached));
    }

    /// Memory statistics count tables and columns and report a sub-1%
    /// standard error for the default precision.
    #[test]
    fn test_cardinality_tracker_memory() {
        let tracker = CardinalityTracker::new();

        for i in 0u64..1000 {
            tracker.observe("table1", "col1", &i);
            tracker.observe("table1", "col2", &i);
            tracker.observe("table2", "col1", &i);
        }

        let stats = tracker.memory_usage();
        assert_eq!(stats.table_count, 2);
        assert_eq!(stats.column_count, 3);
        assert!(stats.memory_bytes > 0);
        assert!(stats.standard_error_pct < 1.0);
    }

    /// An executor sharing a tracker picks up its row count and column
    /// cardinalities on refresh.
    #[test]
    fn test_executor_with_cardinality_tracker() {
        let tracker = Arc::new(CardinalityTracker::new());

        for i in 0u64..500 {
            tracker.observe("events", "user_id", &i);
            tracker.observe("events", "span_id", &(i * 2));
        }
        // The table already exists in the tracker, so the count is recorded.
        tracker.increment_row_count("events", 500);

        let mut executor = OptimizedExecutor::with_cardinality_tracker(Arc::clone(&tracker));

        executor.refresh_stats_from_tracker("events");

        let stats = &executor.table_stats.get("events").unwrap();
        assert_eq!(stats.row_count, 500);
        assert!(stats.column_cardinalities.contains_key("user_id"));
        assert!(stats.column_cardinalities.contains_key("span_id"));

        let user_card = stats.column_cardinalities.get("user_id").unwrap();
        let error = (*user_card as f64 - 500.0).abs() / 500.0;
        assert!(error < 0.05, "user_id cardinality error {}%", error * 100.0);
    }
}