1use crate::query_optimizer::{
71 CardinalitySource, CostModel, IndexSelection, QueryOperation, QueryOptimizer,
72 QueryPlan as OptimizerPlan, QueryPredicate, TraversalDirection,
73};
74#[cfg(test)]
75use crate::soch_ql::{ComparisonOp, WhereClause};
76use crate::soch_ql::{SelectQuery, SochResult, SochValue};
77use parking_lot::RwLock;
78use std::collections::HashMap;
79use std::sync::Arc;
80use sochdb_core::{Catalog, Result};
81use sochdb_storage::sketches::HyperLogLog;
82
/// Storage abstraction that [`OptimizedExecutor::execute`] calls to fetch rows.
///
/// Every row is a map from column name to [`SochValue`]. Implementations must be
/// `Send + Sync`.
pub trait StorageBackend: Send + Sync {
    /// Scans `table` and returns its rows.
    ///
    /// `columns` is the list of requested column names. `predicate` is an optional
    /// filter in string form; the executor in this file passes the `Debug`
    /// rendering of the query's WHERE clause here.
    // NOTE(review): whether implementations must honour `columns`/`predicate` or may
    // return a superset is not visible from this file — confirm.
    fn table_scan(
        &self,
        table: &str,
        columns: &[String],
        predicate: Option<&str>,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Looks up a single row of `table` by `key`; `Ok(None)` means no row was found.
    fn primary_key_lookup(
        &self,
        table: &str,
        key: &SochValue,
    ) -> Result<Option<HashMap<String, SochValue>>>;

    /// Returns the rows of `table` found under `key` in the secondary index `index`.
    fn secondary_index_seek(
        &self,
        table: &str,
        index: &str,
        key: &SochValue,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Returns the rows of `table` whose time falls between `start_us` and `end_us`.
    // NOTE(review): the `_us` suffix suggests microseconds; the inclusivity of the
    // bounds is not specified here — confirm against implementations.
    fn time_index_scan(
        &self,
        table: &str,
        start_us: u64,
        end_us: u64,
    ) -> Result<Vec<HashMap<String, SochValue>>>;

    /// Runs a vector search on `table` for the embedding `query`, asking for `k` results.
    ///
    /// Each result pairs an `f32` with the matching row (presumably a score or
    /// distance — the executor in this file discards it).
    fn vector_search(
        &self,
        table: &str,
        query: &[f32],
        k: usize,
    ) -> Result<Vec<(f32, HashMap<String, SochValue>)>>;

    /// Returns the number of rows in `table`.
    fn row_count(&self, table: &str) -> usize;
}
134
/// Tracks per-column distinct-value estimates for tables using one
/// [`HyperLogLog`] sketch per `(table, column)` pair.
pub struct CardinalityTracker {
    /// Precision handed to `HyperLogLog::new` for every sketch this tracker creates.
    precision: u8,
    /// Per-table tracking state, keyed by table name and guarded by a read/write lock.
    tables: RwLock<HashMap<String, TableCardinalityTracker>>,
    /// Relative change (e.g. `0.20` = 20%) above which `has_cardinality_drift`
    /// reports drift.
    drift_threshold: f64,
}
165
/// Tracking state kept by [`CardinalityTracker`] for a single table.
struct TableCardinalityTracker {
    /// One sketch per observed column, keyed by column name.
    columns: HashMap<String, HyperLogLog>,
    /// Row count accumulated through `increment_row_count`.
    row_count: usize,
    /// Time of the last observation or merge, in microseconds since the Unix epoch.
    last_update_us: u64,
}
175
/// Distinct-value estimate for one column, as returned by
/// `CardinalityTracker::estimate`.
#[derive(Debug, Clone)]
pub struct CardinalityEstimate {
    /// Estimated number of distinct values.
    pub distinct: usize,
    /// Error figure reported by the sketch's `standard_error()`.
    // NOTE(review): whether this is a fraction or a percentage depends on
    // `HyperLogLog::standard_error` — confirm.
    pub error_pct: f64,
    /// Where the estimate came from.
    pub source: CardinalitySource,
    /// `true` when the table was last updated less than 60 seconds before the
    /// estimate was produced.
    pub is_fresh: bool,
}
188
189impl CardinalityTracker {
190 pub fn new() -> Self {
192 Self::with_precision(14)
193 }
194
195 pub fn with_precision(precision: u8) -> Self {
203 assert!((4..=18).contains(&precision), "Precision must be 4-18");
204 Self {
205 precision,
206 tables: RwLock::new(HashMap::new()),
207 drift_threshold: 0.20, }
209 }
210
211 pub fn set_drift_threshold(&mut self, threshold: f64) {
213 self.drift_threshold = threshold;
214 }
215
216 pub fn observe<T: std::hash::Hash>(&self, table: &str, column: &str, value: &T) {
220 let mut tables = self.tables.write();
221 let tracker = tables
222 .entry(table.to_string())
223 .or_insert_with(|| TableCardinalityTracker {
224 columns: HashMap::new(),
225 row_count: 0,
226 last_update_us: Self::now(),
227 });
228
229 let hll = tracker
230 .columns
231 .entry(column.to_string())
232 .or_insert_with(|| HyperLogLog::new(self.precision));
233
234 hll.add(value);
235 tracker.last_update_us = Self::now();
236 }
237
238 pub fn observe_batch<T: std::hash::Hash>(
240 &self,
241 table: &str,
242 column: &str,
243 values: impl Iterator<Item = T>,
244 ) {
245 let mut tables = self.tables.write();
246 let tracker = tables
247 .entry(table.to_string())
248 .or_insert_with(|| TableCardinalityTracker {
249 columns: HashMap::new(),
250 row_count: 0,
251 last_update_us: Self::now(),
252 });
253
254 let hll = tracker
255 .columns
256 .entry(column.to_string())
257 .or_insert_with(|| HyperLogLog::new(self.precision));
258
259 for value in values {
260 hll.add(&value);
261 }
262 tracker.last_update_us = Self::now();
263 }
264
265 pub fn increment_row_count(&self, table: &str, delta: usize) {
267 let mut tables = self.tables.write();
268 if let Some(tracker) = tables.get_mut(table) {
269 tracker.row_count = tracker.row_count.saturating_add(delta);
270 }
271 }
272
273 pub fn estimate(&self, table: &str, column: &str) -> Option<CardinalityEstimate> {
277 let tables = self.tables.read();
278 let tracker = tables.get(table)?;
279 let hll = tracker.columns.get(column)?;
280
281 let distinct = hll.cardinality() as usize;
282 let error_pct = hll.standard_error();
283 let freshness_us = Self::now().saturating_sub(tracker.last_update_us);
284
285 Some(CardinalityEstimate {
286 distinct,
287 error_pct,
288 source: CardinalitySource::HyperLogLog,
289 is_fresh: freshness_us < 60_000_000,
291 })
292 }
293
294 pub fn get_table_cardinalities(&self, table: &str) -> HashMap<String, usize> {
296 let tables = self.tables.read();
297 tables
298 .get(table)
299 .map(|tracker| {
300 tracker
301 .columns
302 .iter()
303 .map(|(col, hll)| (col.clone(), hll.cardinality() as usize))
304 .collect()
305 })
306 .unwrap_or_default()
307 }
308
309 pub fn get_row_count(&self, table: &str) -> usize {
311 self.tables
312 .read()
313 .get(table)
314 .map(|t| t.row_count)
315 .unwrap_or(0)
316 }
317
318 pub fn has_cardinality_drift(
323 &self,
324 table: &str,
325 cached_cardinalities: &HashMap<String, usize>,
326 ) -> bool {
327 let tables = self.tables.read();
328 let tracker = match tables.get(table) {
329 Some(t) => t,
330 None => return true, };
332
333 for (column, &cached) in cached_cardinalities {
334 if let Some(hll) = tracker.columns.get(column) {
335 let current = hll.cardinality();
336 if cached == 0 {
337 if current > 0 {
338 return true; }
340 } else {
341 let drift = (current as f64 - cached as f64).abs() / cached as f64;
342 if drift > self.drift_threshold {
343 return true;
344 }
345 }
346 }
347 }
348
349 false
350 }
351
352 pub fn merge(&self, table: &str, column: &str, other_hll: &HyperLogLog) {
354 let mut tables = self.tables.write();
355 if let Some(tracker) = tables.get_mut(table)
356 && let Some(hll) = tracker.columns.get_mut(column)
357 {
358 hll.merge(other_hll);
359 tracker.last_update_us = Self::now();
360 }
361 }
362
363 pub fn clear_table(&self, table: &str) {
365 self.tables.write().remove(table);
366 }
367
368 pub fn memory_usage(&self) -> CardinalityTrackerStats {
370 let tables = self.tables.read();
371 let mut total_columns = 0;
372 let mut total_bytes = 0;
373
374 for tracker in tables.values() {
375 for hll in tracker.columns.values() {
376 total_columns += 1;
377 total_bytes += hll.memory_usage();
378 }
379 }
380
381 CardinalityTrackerStats {
382 table_count: tables.len(),
383 column_count: total_columns,
384 memory_bytes: total_bytes,
385 precision: self.precision,
386 standard_error_pct: 1.04 / (1usize << self.precision) as f64 * 100.0,
387 }
388 }
389
390 fn now() -> u64 {
391 std::time::SystemTime::now()
392 .duration_since(std::time::UNIX_EPOCH)
393 .unwrap()
394 .as_micros() as u64
395 }
396}
397
398impl Default for CardinalityTracker {
399 fn default() -> Self {
400 Self::new()
401 }
402}
403
/// Summary returned by `CardinalityTracker::memory_usage`.
#[derive(Debug, Clone)]
pub struct CardinalityTrackerStats {
    /// Number of tables currently tracked.
    pub table_count: usize,
    /// Total number of column sketches across all tables.
    pub column_count: usize,
    /// Sum of the memory usage reported by every sketch.
    pub memory_bytes: usize,
    /// Precision the tracker uses for its sketches.
    pub precision: u8,
    /// Standard error of a sketch, as a percentage, derived from `precision`.
    pub standard_error_pct: f64,
}
418
/// Plans SELECT queries with a [`QueryOptimizer`] and executes the resulting
/// plans against a [`StorageBackend`].
pub struct OptimizedExecutor {
    /// Optimizer used to choose an index and estimate cost.
    optimizer: QueryOptimizer,
    /// Statistics known per table, keyed by table name.
    table_stats: HashMap<String, TableStats>,
    /// Shared tracker from which column cardinalities can be refreshed.
    cardinality_tracker: Arc<CardinalityTracker>,
    /// Optional provider used to turn vector-search query text into an embedding.
    embedding_provider: Option<Arc<dyn crate::embedding_provider::EmbeddingProvider>>,
}
430
/// Statistics describing one table, as stored by [`OptimizedExecutor`].
#[derive(Debug, Clone, Default)]
pub struct TableStats {
    /// Number of rows in the table.
    pub row_count: usize,
    /// Distinct-value count per column, keyed by column name.
    pub column_cardinalities: HashMap<String, usize>,
    /// Whether the table has a time index.
    pub has_time_index: bool,
    /// Whether the table has a vector index.
    pub has_vector_index: bool,
    /// Name of the primary-key column, if any.
    pub primary_key: Option<String>,
}
445
446impl OptimizedExecutor {
447 pub fn new() -> Self {
449 Self {
450 optimizer: QueryOptimizer::new(),
451 table_stats: HashMap::new(),
452 cardinality_tracker: Arc::new(CardinalityTracker::new()),
453 embedding_provider: None,
454 }
455 }
456
457 pub fn with_cost_model(cost_model: CostModel) -> Self {
459 Self {
460 optimizer: QueryOptimizer::with_cost_model(cost_model),
461 table_stats: HashMap::new(),
462 cardinality_tracker: Arc::new(CardinalityTracker::new()),
463 embedding_provider: None,
464 }
465 }
466
467 pub fn with_cardinality_tracker(tracker: Arc<CardinalityTracker>) -> Self {
469 Self {
470 optimizer: QueryOptimizer::new(),
471 table_stats: HashMap::new(),
472 cardinality_tracker: tracker,
473 embedding_provider: None,
474 }
475 }
476
477 pub fn set_embedding_provider(
479 &mut self,
480 provider: Arc<dyn crate::embedding_provider::EmbeddingProvider>,
481 ) {
482 self.embedding_provider = Some(provider);
483 }
484
485 pub fn with_embedding_provider(
487 mut self,
488 provider: Arc<dyn crate::embedding_provider::EmbeddingProvider>,
489 ) -> Self {
490 self.embedding_provider = Some(provider);
491 self
492 }
493
494 pub fn cardinality_tracker(&self) -> Arc<CardinalityTracker> {
496 Arc::clone(&self.cardinality_tracker)
497 }
498
499 pub fn update_table_stats(&mut self, table: &str, stats: TableStats) {
501 let row_count = stats.row_count;
502 self.table_stats.insert(table.to_string(), stats);
503 self.optimizer
504 .update_total_edges(row_count, CardinalitySource::Exact);
505 }
506
507 pub fn refresh_stats_from_tracker(&mut self, table: &str) {
511 let cardinalities = self.cardinality_tracker.get_table_cardinalities(table);
512 let row_count = self.cardinality_tracker.get_row_count(table);
513
514 if let Some(stats) = self.table_stats.get_mut(table) {
515 stats.column_cardinalities = cardinalities;
516 if row_count > 0 {
517 stats.row_count = row_count;
518 }
519 } else {
520 self.table_stats.insert(
521 table.to_string(),
522 TableStats {
523 row_count,
524 column_cardinalities: cardinalities,
525 ..Default::default()
526 },
527 );
528 }
529 }
530
531 pub fn update_cardinality_hint(
533 &mut self,
534 table: &str,
535 column: &str,
536 cardinality: usize,
537 _source: CardinalitySource,
538 ) {
539 if let Some(stats) = self.table_stats.get_mut(table) {
540 stats
541 .column_cardinalities
542 .insert(column.to_string(), cardinality);
543 }
544 }
545
546 pub fn plan_select(
548 &self,
549 select: &SelectQuery,
550 _catalog: &Catalog,
551 ) -> Result<OptimizedQueryPlan> {
552 let predicates = self.extract_predicates(select)?;
554
555 let optimizer_plan = self.optimizer.plan_query(&predicates, select.limit);
557
558 let exec_plan = self.build_execution_plan(select, &optimizer_plan)?;
560
561 Ok(OptimizedQueryPlan {
562 table: select.table.clone(),
563 columns: select.columns.clone(),
564 execution_plan: exec_plan,
565 optimizer_plan,
566 predicates,
567 })
568 }
569
570 fn extract_predicates(&self, select: &SelectQuery) -> Result<Vec<QueryPredicate>> {
572 let mut predicates = Vec::new();
573
574 if let Some(where_clause) = &select.where_clause {
575 for condition in &where_clause.conditions {
576 if let Some(pred) = self.condition_to_predicate(&condition.column, &condition.value)
577 {
578 predicates.push(pred);
579 }
580 }
581 }
582
583 Ok(predicates)
584 }
585
586 fn condition_to_predicate(&self, column: &str, value: &SochValue) -> Option<QueryPredicate> {
588 match column {
590 "timestamp" | "created_at" | "updated_at" | "time" => {
592 if let SochValue::UInt(ts) = value {
593 let hour_us = 60 * 60 * 1_000_000u64;
595 return Some(QueryPredicate::TimeRange(*ts, ts + hour_us));
596 }
597 }
598 "project_id" | "project" => {
600 if let SochValue::UInt(id) = value {
601 return Some(QueryPredicate::Project(*id as u16));
602 }
603 }
604 "tenant_id" | "tenant" => {
606 if let SochValue::UInt(id) = value {
607 return Some(QueryPredicate::Tenant(*id as u32));
608 }
609 }
610 "span_type" | "type" => {
612 if let SochValue::Text(s) = value {
613 return Some(QueryPredicate::SpanType(s.clone()));
614 }
615 }
616 _ => {}
617 }
618
619 None
620 }
621
622 fn build_execution_plan(
624 &self,
625 select: &SelectQuery,
626 opt_plan: &OptimizerPlan,
627 ) -> Result<ExecutionPlan> {
628 let mut steps = Vec::new();
629
630 match &opt_plan.index_selection {
632 IndexSelection::LsmScan | IndexSelection::FullScan => {
633 steps.push(ExecutionStep::TableScan {
634 table: select.table.clone(),
635 });
636 }
637 IndexSelection::TimeIndex => {
638 if let Some(QueryOperation::LsmRangeScan { start_us, end_us }) =
640 opt_plan.operations.first()
641 {
642 steps.push(ExecutionStep::TimeIndexScan {
643 table: select.table.clone(),
644 start_us: *start_us,
645 end_us: *end_us,
646 });
647 }
648 }
649 IndexSelection::VectorIndex => {
650 if let Some(QueryOperation::VectorSearch { k }) = opt_plan.operations.first() {
651 let query_text = self.extract_vector_query_text(select);
653 steps.push(ExecutionStep::VectorSearch {
654 table: select.table.clone(),
655 k: *k,
656 query_text,
657 });
658 }
659 }
660 IndexSelection::CausalIndex => {
661 if let Some(QueryOperation::GraphTraversal {
662 direction,
663 max_depth,
664 }) = opt_plan.operations.first()
665 {
666 steps.push(ExecutionStep::GraphTraversal {
667 table: select.table.clone(),
668 direction: *direction,
669 max_depth: *max_depth,
670 });
671 }
672 }
673 IndexSelection::ProjectIndex => {
674 steps.push(ExecutionStep::SecondaryIndexSeek {
675 table: select.table.clone(),
676 index: "project_idx".to_string(),
677 });
678 }
679 IndexSelection::PrimaryKey => {
680 steps.push(ExecutionStep::PrimaryKeyLookup {
681 table: select.table.clone(),
682 });
683 }
684 IndexSelection::Secondary(idx) => {
685 steps.push(ExecutionStep::SecondaryIndexSeek {
686 table: select.table.clone(),
687 index: idx.clone(),
688 });
689 }
690 IndexSelection::MultiIndex(indexes) => {
691 steps.push(ExecutionStep::MultiIndexIntersect {
693 table: select.table.clone(),
694 indexes: indexes.iter().map(|idx| format!("{:?}", idx)).collect(),
695 });
696 }
697 }
698
699 if select.where_clause.is_some() {
701 steps.push(ExecutionStep::Filter {
702 predicate: format!("{:?}", select.where_clause),
703 });
704 }
705
706 if !select.columns.is_empty() && select.columns[0] != "*" {
708 steps.push(ExecutionStep::Project {
709 columns: select.columns.clone(),
710 });
711 }
712
713 if let Some(order_by) = &select.order_by {
715 steps.push(ExecutionStep::Sort {
716 column: order_by.column.clone(),
717 ascending: order_by.direction == crate::soch_ql::SortDirection::Asc,
718 });
719 }
720
721 if let Some(limit) = select.limit {
723 steps.push(ExecutionStep::Limit { count: limit });
724 }
725
726 Ok(ExecutionPlan {
727 steps,
728 estimated_cost: opt_plan.cost.total_cost,
729 estimated_rows: opt_plan.cost.records_returned,
730 })
731 }
732
733 pub fn execute<S: StorageBackend>(
738 &self,
739 plan: &OptimizedQueryPlan,
740 storage: &S,
741 ) -> Result<SochResult> {
742 let mut rows: Vec<HashMap<String, SochValue>> = Vec::new();
743 let mut columns_to_return = plan.columns.clone();
744
745 for step in &plan.execution_plan.steps {
747 match step {
748 ExecutionStep::TableScan { table } => {
749 let predicate = plan.execution_plan.steps.iter().find_map(|s| match s {
751 ExecutionStep::Filter { predicate } => Some(predicate.as_str()),
752 _ => None,
753 });
754 rows = storage.table_scan(table, &columns_to_return, predicate)?;
755 }
756 ExecutionStep::PrimaryKeyLookup { table } => {
757 if let Some(key) = self.extract_primary_key_from_predicates(&plan.predicates)
759 && let Some(row) = storage.primary_key_lookup(table, &key)?
760 {
761 rows = vec![row];
762 }
763 }
764 ExecutionStep::SecondaryIndexSeek { table, index } => {
765 if let Some(key) =
767 self.extract_index_key_from_predicates(&plan.predicates, index)
768 {
769 rows = storage.secondary_index_seek(table, index, &key)?;
770 }
771 }
772 ExecutionStep::TimeIndexScan {
773 table,
774 start_us,
775 end_us,
776 } => {
777 rows = storage.time_index_scan(table, *start_us, *end_us)?;
778 }
779 ExecutionStep::VectorSearch {
780 table,
781 k,
782 query_text,
783 } => {
784 let query_embedding = match (query_text, &self.embedding_provider) {
786 (Some(text), Some(provider)) => {
787 provider.embed(text).unwrap_or_else(|e| {
789 tracing::warn!(
790 "Failed to generate embedding for '{}': {}. Using fallback.",
791 text,
792 e
793 );
794 vec![0.0f32; provider.dimension()]
796 })
797 }
798 (Some(_text), None) => {
799 tracing::warn!(
801 "Vector search requested but no embedding provider configured"
802 );
803 vec![0.0f32; 128] }
805 (None, _) => {
806 tracing::warn!("Vector search without query text, using placeholder");
808 vec![0.0f32; 128] }
810 };
811 let results = storage.vector_search(table, &query_embedding, *k)?;
812 rows = results.into_iter().map(|(_, row)| row).collect();
813 }
814 ExecutionStep::GraphTraversal {
815 table,
816 direction: _,
817 max_depth: _,
818 } => {
819 rows = storage.table_scan(table, &columns_to_return, None)?;
821 }
822 ExecutionStep::MultiIndexIntersect { table, indexes } => {
823 let mut result_sets: Vec<Vec<HashMap<String, SochValue>>> = Vec::new();
825 for index in indexes {
826 if let Some(key) =
827 self.extract_index_key_from_predicates(&plan.predicates, index)
828 {
829 result_sets.push(storage.secondary_index_seek(table, index, &key)?);
830 }
831 }
832 if !result_sets.is_empty() {
834 rows = self.intersect_result_sets(result_sets);
835 }
836 }
837 ExecutionStep::Filter { predicate: _ } => {
838 }
841 ExecutionStep::Project { columns } => {
842 columns_to_return = columns.clone();
843 rows = rows
845 .into_iter()
846 .map(|row| {
847 columns
848 .iter()
849 .filter_map(|c| row.get(c).map(|v| (c.clone(), v.clone())))
850 .collect()
851 })
852 .collect();
853 }
854 ExecutionStep::Sort { column, ascending } => {
855 rows.sort_by(|a, b| {
856 let va = a.get(column);
857 let vb = b.get(column);
858 let cmp = Self::compare_values(va, vb);
859 if *ascending { cmp } else { cmp.reverse() }
860 });
861 }
862 ExecutionStep::Limit { count } => {
863 rows.truncate(*count);
864 }
865 }
866 }
867
868 let result_rows: Vec<Vec<SochValue>> = rows
870 .iter()
871 .map(|row| {
872 columns_to_return
873 .iter()
874 .map(|c| row.get(c).cloned().unwrap_or(SochValue::Null))
875 .collect()
876 })
877 .collect();
878
879 Ok(SochResult {
880 table: plan.table.clone(),
881 columns: columns_to_return,
882 rows: result_rows,
883 })
884 }
885
886 fn extract_primary_key_from_predicates(
888 &self,
889 predicates: &[QueryPredicate],
890 ) -> Option<SochValue> {
891 for pred in predicates {
892 if let QueryPredicate::Project(id) = pred {
894 return Some(SochValue::UInt(*id as u64));
895 }
896 }
897 None
898 }
899
900 fn extract_index_key_from_predicates(
902 &self,
903 predicates: &[QueryPredicate],
904 _index: &str,
905 ) -> Option<SochValue> {
906 for pred in predicates {
907 match pred {
908 QueryPredicate::Tenant(id) => return Some(SochValue::UInt(*id as u64)),
909 QueryPredicate::Project(id) => return Some(SochValue::UInt(*id as u64)),
910 QueryPredicate::SpanType(s) => return Some(SochValue::Text(s.clone())),
911 _ => {}
912 }
913 }
914 None
915 }
916
917 fn extract_vector_query_text(&self, select: &SelectQuery) -> Option<String> {
922 use crate::soch_ql::ComparisonOp;
923
924 if let Some(where_clause) = &select.where_clause {
925 for condition in &where_clause.conditions {
926 if matches!(condition.operator, ComparisonOp::SimilarTo) {
927 if let SochValue::Text(query_text) = &condition.value {
929 return Some(query_text.clone());
930 }
931 }
932 }
933 }
934 None
935 }
936
937 fn intersect_result_sets(
939 &self,
940 sets: Vec<Vec<HashMap<String, SochValue>>>,
941 ) -> Vec<HashMap<String, SochValue>> {
942 if sets.is_empty() {
943 return Vec::new();
944 }
945 if sets.len() == 1 {
946 return sets.into_iter().next().unwrap();
947 }
948
949 let mut base = sets.into_iter().next().unwrap();
951 base.truncate(base.len().min(100)); base
954 }
955
956 fn compare_values(a: Option<&SochValue>, b: Option<&SochValue>) -> std::cmp::Ordering {
958 match (a, b) {
959 (None, None) => std::cmp::Ordering::Equal,
960 (None, Some(_)) => std::cmp::Ordering::Less,
961 (Some(_), None) => std::cmp::Ordering::Greater,
962 (Some(va), Some(vb)) => match (va, vb) {
963 (SochValue::Int(a), SochValue::Int(b)) => a.cmp(b),
964 (SochValue::UInt(a), SochValue::UInt(b)) => a.cmp(b),
965 (SochValue::Float(a), SochValue::Float(b)) => {
966 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
967 }
968 (SochValue::Text(a), SochValue::Text(b)) => a.cmp(b),
969 (SochValue::Bool(a), SochValue::Bool(b)) => a.cmp(b),
970 _ => std::cmp::Ordering::Equal,
971 },
972 }
973 }
974
975 pub fn explain(&self, select: &SelectQuery, catalog: &Catalog) -> Result<String> {
977 let plan = self.plan_select(select, catalog)?;
978
979 let mut output = String::new();
980 output.push_str(&format!(
981 "QUERY PLAN (estimated cost: {:.2}, rows: {})\n",
982 plan.optimizer_plan.cost.total_cost, plan.optimizer_plan.cost.records_returned
983 ));
984 output.push_str(&format!(
985 "Index Selection: {:?}\n",
986 plan.optimizer_plan.index_selection
987 ));
988 output.push_str("Execution Steps:\n");
989
990 for (i, step) in plan.execution_plan.steps.iter().enumerate() {
991 output.push_str(&format!(" {}. {:?}\n", i + 1, step));
992 }
993
994 output.push_str("\nCost Breakdown:\n");
995 for (op, cost) in &plan.optimizer_plan.cost.breakdown {
996 output.push_str(&format!(" {:?}: {:.2}\n", op, cost));
997 }
998
999 Ok(output)
1000 }
1001}
1002
1003impl Default for OptimizedExecutor {
1004 fn default() -> Self {
1005 Self::new()
1006 }
1007}
1008
/// Result of planning a SELECT: the executable steps together with the
/// optimizer output they were derived from.
#[derive(Debug)]
pub struct OptimizedQueryPlan {
    /// Table the query reads from.
    pub table: String,
    /// Columns requested by the query, as written in the SELECT.
    pub columns: Vec<String>,
    /// Ordered steps to run.
    pub execution_plan: ExecutionPlan,
    /// Plan produced by the query optimizer.
    pub optimizer_plan: OptimizerPlan,
    /// Predicates extracted from the WHERE clause and given to the optimizer.
    pub predicates: Vec<QueryPredicate>,
}
1023
/// Ordered list of execution steps with the optimizer's estimates attached.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    /// Steps, in the order they are executed.
    pub steps: Vec<ExecutionStep>,
    /// Total cost estimated by the optimizer.
    pub estimated_cost: f64,
    /// Number of records the optimizer expects to be returned.
    pub estimated_rows: usize,
}
1034
/// One step of an [`ExecutionPlan`].
#[derive(Debug, Clone)]
pub enum ExecutionStep {
    /// Scan `table`.
    TableScan { table: String },
    /// Fetch a single row of `table` by primary key.
    PrimaryKeyLookup { table: String },
    /// Scan `table` through its time index between `start_us` and `end_us`.
    TimeIndexScan {
        table: String,
        start_us: u64,
        end_us: u64,
    },
    /// Vector search on `table` for `k` results.
    VectorSearch {
        table: String,
        k: usize,
        /// Text to embed as the search query, when the WHERE clause supplied one.
        query_text: Option<String>,
    },
    /// Graph traversal over `table` in `direction`, up to `max_depth`.
    GraphTraversal {
        table: String,
        direction: TraversalDirection,
        max_depth: usize,
    },
    /// Seek the secondary index `index` of `table`.
    SecondaryIndexSeek { table: String, index: String },
    /// Seek several indexes of `table` and combine their results.
    MultiIndexIntersect { table: String, indexes: Vec<String> },
    /// Filter rows; `predicate` is the `Debug` rendering of the WHERE clause.
    Filter { predicate: String },
    /// Keep only the listed columns.
    Project { columns: Vec<String> },
    /// Sort rows by `column`, ascending or descending.
    Sort { column: String, ascending: bool },
    /// Keep at most `count` rows.
    Limit { count: usize },
}
1075
/// Plan cache that only admits a query once it has been seen often enough, and
/// evicts the least recently used entry when full.
pub struct PlanCache {
    /// Cached plans, keyed by query hash.
    cache: HashMap<u64, CachedPlan>,
    /// Sightings of queries that are not cached yet, keyed by query hash.
    frequency: HashMap<u64, FrequencyEntry>,
    /// Entry count at which inserting triggers an eviction.
    max_entries: usize,
    /// Number of sightings required before a plan is promoted into the cache.
    cache_threshold: usize,
    /// Running counters describing cache behaviour.
    stats: AdaptiveCacheStats,
}
1094
/// A plan stored in [`PlanCache`] together with its usage bookkeeping.
#[derive(Debug, Clone)]
struct CachedPlan {
    /// The cached execution plan.
    plan: ExecutionPlan,
    /// Number of cache hits served by this entry.
    hits: usize,
    /// Time of the last hit (or of insertion), in microseconds since the Unix epoch.
    last_used: u64,
    /// Planning time assumed to have been saved by this entry, in microseconds.
    time_saved_us: u64,
}
1107
/// Sighting record for a query that has not been promoted into the cache yet.
#[derive(Debug, Clone)]
struct FrequencyEntry {
    /// How many times the query has been seen (lookups and puts both count).
    count: usize,
    /// Time of the first sighting, in microseconds since the Unix epoch.
    #[allow(dead_code)]
    first_seen: u64,
    /// Time of the most recent sighting, in microseconds since the Unix epoch.
    last_seen: u64,
    /// Most recent plan offered for this query, kept for promotion.
    pending_plan: Option<ExecutionPlan>,
}
1121
/// Counters describing how [`PlanCache`] has behaved since it was created or
/// last cleared.
#[derive(Debug, Clone, Default)]
pub struct AdaptiveCacheStats {
    /// Number of cached plans as of the last insertion.
    pub entries: usize,
    /// Lookups answered from the cache.
    pub total_hits: usize,
    /// Lookups that found no cached plan.
    pub total_misses: usize,
    /// Puts that were recorded but not cached because the query was too infrequent.
    pub frequency_blocked: usize,
    /// Plans promoted into the cache after reaching the frequency threshold.
    pub promotions: usize,
    /// Planning time assumed to have been saved, in microseconds.
    pub time_saved_us: u64,
}
1138
1139impl PlanCache {
1140 pub fn new(max_entries: usize) -> Self {
1142 Self::with_threshold(max_entries, 3)
1143 }
1144
1145 pub fn with_threshold(max_entries: usize, cache_threshold: usize) -> Self {
1147 Self {
1148 cache: HashMap::new(),
1149 frequency: HashMap::new(),
1150 max_entries,
1151 cache_threshold,
1152 stats: AdaptiveCacheStats::default(),
1153 }
1154 }
1155
1156 pub fn hash_query(query: &str) -> u64 {
1158 use std::hash::{Hash, Hasher};
1159 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1160 query.hash(&mut hasher);
1161 hasher.finish()
1162 }
1163
1164 pub fn get(&mut self, query_hash: u64) -> Option<&ExecutionPlan> {
1169 if self.cache.contains_key(&query_hash) {
1171 if let Some(cached) = self.cache.get_mut(&query_hash) {
1172 cached.hits += 1;
1173 cached.last_used = Self::now();
1174 cached.time_saved_us += 1000; self.stats.total_hits += 1;
1176 }
1177 return self.cache.get(&query_hash).map(|c| &c.plan);
1178 }
1179
1180 self.stats.total_misses += 1;
1181
1182 let should_promote = if let Some(freq) = self.frequency.get_mut(&query_hash) {
1184 freq.count += 1;
1185 freq.last_seen = Self::now();
1186 freq.count >= self.cache_threshold && freq.pending_plan.is_some()
1187 } else {
1188 false
1189 };
1190
1191 if should_promote
1192 && let Some(freq) = self.frequency.remove(&query_hash)
1193 && let Some(plan) = freq.pending_plan
1194 {
1195 self.insert_to_cache(query_hash, plan);
1196 self.stats.promotions += 1;
1197 return self.cache.get(&query_hash).map(|c| &c.plan);
1198 }
1199
1200 None
1201 }
1202
1203 pub fn put(&mut self, query_hash: u64, plan: ExecutionPlan) {
1207 let now = Self::now();
1208
1209 if let Some(freq) = self.frequency.get_mut(&query_hash) {
1211 freq.count += 1;
1212 freq.last_seen = now;
1213 freq.pending_plan = Some(plan.clone());
1214
1215 if freq.count >= self.cache_threshold {
1217 self.promote_to_cache(query_hash, plan);
1218 self.stats.promotions += 1;
1219 } else {
1220 self.stats.frequency_blocked += 1;
1221 }
1222 } else {
1223 self.frequency.insert(
1225 query_hash,
1226 FrequencyEntry {
1227 count: 1,
1228 first_seen: now,
1229 last_seen: now,
1230 pending_plan: Some(plan),
1231 },
1232 );
1233 self.stats.frequency_blocked += 1;
1234 }
1235
1236 self.cleanup_frequency_tracker();
1238 }
1239
1240 pub fn force_put(&mut self, query_hash: u64, plan: ExecutionPlan) {
1242 self.insert_to_cache(query_hash, plan);
1243 self.frequency.remove(&query_hash);
1244 }
1245
1246 fn insert_to_cache(&mut self, query_hash: u64, plan: ExecutionPlan) {
1248 if self.cache.len() >= self.max_entries {
1250 self.evict_lru();
1251 }
1252
1253 self.cache.insert(
1254 query_hash,
1255 CachedPlan {
1256 plan,
1257 hits: 0,
1258 last_used: Self::now(),
1259 time_saved_us: 0,
1260 },
1261 );
1262
1263 self.stats.entries = self.cache.len();
1264 }
1265
1266 fn promote_to_cache(&mut self, query_hash: u64, plan: ExecutionPlan) {
1268 self.insert_to_cache(query_hash, plan);
1269 self.frequency.remove(&query_hash);
1270 }
1271
1272 fn evict_lru(&mut self) {
1274 if let Some((&key, _)) = self.cache.iter().min_by_key(|(_, v)| v.last_used) {
1275 self.cache.remove(&key);
1276 }
1277 }
1278
1279 fn cleanup_frequency_tracker(&mut self) {
1281 let now = Self::now();
1282 let max_age = 60 * 1_000_000; self.frequency.retain(|_, v| now - v.last_seen < max_age);
1285 }
1286
1287 pub fn clear(&mut self) {
1289 self.cache.clear();
1290 self.frequency.clear();
1291 self.stats = AdaptiveCacheStats::default();
1292 }
1293
1294 pub fn stats(&self) -> CacheStats {
1296 CacheStats {
1297 entries: self.cache.len(),
1298 total_hits: self.stats.total_hits,
1299 }
1300 }
1301
1302 pub fn adaptive_stats(&self) -> &AdaptiveCacheStats {
1304 &self.stats
1305 }
1306
1307 fn now() -> u64 {
1308 std::time::SystemTime::now()
1309 .duration_since(std::time::UNIX_EPOCH)
1310 .unwrap()
1311 .as_micros() as u64
1312 }
1313}
1314
/// Compact cache summary returned by `PlanCache::stats`.
#[derive(Debug, Clone, Default)]
pub struct CacheStats {
    /// Number of plans currently cached.
    pub entries: usize,
    /// Total number of lookups answered from the cache.
    pub total_hits: usize,
}
1323
#[cfg(test)]
mod tests {
    use super::*;
    use crate::soch_ql::{Condition, LogicalOp, OrderBy, SortDirection};

    /// WHERE clause with a single `timestamp >= 1700000000000000` condition.
    fn timestamp_filter() -> WhereClause {
        WhereClause {
            conditions: vec![Condition {
                column: "timestamp".to_string(),
                operator: ComparisonOp::Ge,
                value: SochValue::UInt(1700000000000000),
            }],
            operator: LogicalOp::And,
        }
    }

    /// Minimal plan consisting of one table scan over `test`.
    fn scan_plan() -> ExecutionPlan {
        ExecutionPlan {
            steps: vec![ExecutionStep::TableScan {
                table: "test".to_string(),
            }],
            estimated_cost: 100.0,
            estimated_rows: 1000,
        }
    }

    /// Relative deviation of `observed` from `expected`.
    fn relative_error(observed: usize, expected: f64) -> f64 {
        (observed as f64 - expected).abs() / expected
    }

    #[test]
    fn test_predicate_extraction() {
        let executor = OptimizedExecutor::new();

        let select = SelectQuery {
            table: "events".to_string(),
            columns: vec!["*".to_string()],
            where_clause: Some(timestamp_filter()),
            order_by: None,
            limit: None,
            offset: None,
        };

        // The timestamp condition must turn into exactly one time-range predicate.
        let predicates = executor.extract_predicates(&select).unwrap();
        assert_eq!(predicates.len(), 1);
        assert!(matches!(predicates[0], QueryPredicate::TimeRange(_, _)));
    }

    #[test]
    fn test_plan_with_time_index() {
        let mut executor = OptimizedExecutor::new();
        let stats = TableStats {
            row_count: 1_000_000,
            has_time_index: true,
            ..Default::default()
        };
        executor.update_table_stats("events", stats);

        let select = SelectQuery {
            table: "events".to_string(),
            columns: vec!["id".to_string(), "data".to_string()],
            where_clause: Some(timestamp_filter()),
            order_by: None,
            limit: Some(100),
            offset: None,
        };

        let catalog = Catalog::new("test");
        let plan = executor.plan_select(&select, &catalog).unwrap();

        assert!(plan.execution_plan.estimated_cost > 0.0);
    }

    #[test]
    fn test_plan_cache() {
        let mut cache = PlanCache::new(100);
        let plan = scan_plan();
        let hash = PlanCache::hash_query("SELECT * FROM test");

        // Nothing has been offered yet.
        assert!(cache.get(hash).is_none());

        // One put is below the admission threshold.
        cache.put(hash, plan.clone());
        assert!(cache.get(hash).is_none());

        // The second put brings the sighting count to the threshold.
        cache.put(hash, plan);
        assert!(cache.get(hash).is_some());

        let stats = cache.stats();
        assert_eq!(stats.entries, 1);
        assert_eq!(stats.total_hits, 1);
    }

    #[test]
    fn test_force_cache() {
        let mut cache = PlanCache::new(100);
        let hash = PlanCache::hash_query("SELECT * FROM test2");

        // A forced put is visible immediately.
        cache.force_put(hash, scan_plan());
        assert!(cache.get(hash).is_some());
    }

    #[test]
    fn test_explain() {
        let executor = OptimizedExecutor::new();

        let select = SelectQuery {
            table: "users".to_string(),
            columns: vec!["id".to_string(), "name".to_string()],
            where_clause: None,
            order_by: Some(OrderBy {
                column: "id".to_string(),
                direction: SortDirection::Asc,
            }),
            limit: Some(10),
            offset: None,
        };

        let catalog = Catalog::new("test");
        let explain = executor.explain(&select, &catalog).unwrap();

        assert!(explain.contains("QUERY PLAN"));
        assert!(explain.contains("Execution Steps"));
    }

    #[test]
    fn test_cardinality_tracker_basic() {
        let tracker = CardinalityTracker::new();

        // 1000 distinct values.
        (0u64..1000).for_each(|i| tracker.observe("events", "user_id", &i));

        let estimate = tracker.estimate("events", "user_id").unwrap();

        let error = relative_error(estimate.distinct, 1000.0);
        assert!(
            error < 0.05,
            "Cardinality error {}% exceeds 5%",
            error * 100.0
        );
        assert!(estimate.error_pct < 1.0, "Standard error should be < 1%");
    }

    #[test]
    fn test_cardinality_tracker_multiple_columns() {
        let tracker = CardinalityTracker::new();

        // High-cardinality column: 10 000 distinct values.
        (0u64..10_000).for_each(|i| tracker.observe("events", "span_id", &i));

        // Low-cardinality column: 1000 observations over 10 distinct values.
        (0u64..1000).for_each(|i| tracker.observe("events", "project_id", &(i % 10)));

        let span_estimate = tracker.estimate("events", "span_id").unwrap();
        let project_estimate = tracker.estimate("events", "project_id").unwrap();

        let span_error = relative_error(span_estimate.distinct, 10000.0);
        assert!(span_error < 0.05, "span_id error {}%", span_error * 100.0);

        let project_error = relative_error(project_estimate.distinct, 10.0);
        assert!(
            project_error < 0.20,
            "project_id error {}%",
            project_error * 100.0
        );
    }

    #[test]
    fn test_cardinality_drift_detection() {
        let tracker = CardinalityTracker::new();

        (0u64..100).for_each(|i| tracker.observe("events", "user_id", &i));

        let mut cached = std::collections::HashMap::new();
        cached.insert("user_id".to_string(), 100usize);

        // The cached value still matches the sketch.
        assert!(!tracker.has_cardinality_drift("events", &cached));

        // Doubling the distinct values must exceed the drift threshold.
        (100u64..200).for_each(|i| tracker.observe("events", "user_id", &i));

        assert!(tracker.has_cardinality_drift("events", &cached));
    }

    #[test]
    fn test_cardinality_tracker_memory() {
        let tracker = CardinalityTracker::new();

        for i in 0u64..1000 {
            for (table, column) in [("table1", "col1"), ("table1", "col2"), ("table2", "col1")] {
                tracker.observe(table, column, &i);
            }
        }

        let stats = tracker.memory_usage();
        assert_eq!(stats.table_count, 2);
        assert_eq!(stats.column_count, 3);
        assert!(stats.memory_bytes > 0);
        assert!(stats.standard_error_pct < 1.0);
    }

    #[test]
    fn test_executor_with_cardinality_tracker() {
        let tracker = Arc::new(CardinalityTracker::new());

        for i in 0u64..500 {
            tracker.observe("events", "user_id", &i);
            tracker.observe("events", "span_id", &(i * 2));
        }
        tracker.increment_row_count("events", 500);

        let mut executor = OptimizedExecutor::with_cardinality_tracker(Arc::clone(&tracker));

        // Pull the tracker's numbers into the executor's table statistics.
        executor.refresh_stats_from_tracker("events");

        let stats = executor.table_stats.get("events").unwrap();
        assert_eq!(stats.row_count, 500);
        assert!(stats.column_cardinalities.contains_key("user_id"));
        assert!(stats.column_cardinalities.contains_key("span_id"));

        let user_card = *stats.column_cardinalities.get("user_id").unwrap();
        let error = relative_error(user_card, 500.0);
        assert!(error < 0.05, "user_id cardinality error {}%", error * 100.0);
    }
}