1use super::{
14 result::{
15 cql_type_to_data_type, ColumnInfo, QueryMetadata, QueryResult, QueryResultIterator,
16 QueryRow, StreamingConfig,
17 },
18 select_ast::*,
19 select_optimizer::{AggregationPlan, ExecutionStep, OptimizedQueryPlan, SSTablePredicate},
20};
21use crate::{
22 parser::complex_types::ComplexTypeParser,
23 schema::{CqlType, SchemaManager},
24 storage::StorageEngine,
25 types::{RowKey, Value},
26 Error, Result, TableId,
27};
28use std::collections::HashMap;
29use std::sync::Arc;
30use tokio::sync::mpsc;
31
/// Executes optimized SELECT plans against the storage engine.
///
/// Holds shared handles to the schema manager (used to resolve table schemas and column
/// types) and the storage engine (used to scan tables).
#[derive(Debug)]
pub struct SelectExecutor {
    /// Schema manager used for table-schema lookups. Despite the leading underscore this
    /// field is read (schema lookups during scans and result-column inference).
    _schema: Arc<SchemaManager>,
    /// Storage engine that table scans are issued against.
    storage: Arc<StorageEngine>,
}
40
/// Per-query state threaded through the execution steps of `SelectExecutor::execute`.
#[derive(Debug)]
struct ExecutionContext {
    /// Table the query reads from (`_dummy_` for queries without a FROM clause).
    pub table_id: TableId,
    /// Result column metadata resolved before execution. It may be empty, in which case
    /// `execute` infers the columns from the first result row.
    pub columns: Vec<ColumnInfo>,
    /// Number of rows examined so far by the scan and filter steps.
    pub rows_processed: u64,
}
54
/// Accumulator used by `execute_aggregation`: one entry per distinct GROUP BY key.
#[derive(Debug)]
struct AggregationState {
    /// `(group key, per-aggregate running state)` pairs in first-seen order; looked up by
    /// linear search in `find_or_init_group`.
    groups: Vec<(Vec<Value>, Vec<AggregateValue>)>,
    /// Rough memory estimate accumulated by `execute_aggregation` (a fixed per-unit
    /// estimate, not a measured allocation size).
    memory_usage_bytes: usize,
    /// Threshold above which aggregation fails with "Aggregation memory limit exceeded".
    memory_limit_bytes: usize,
}
65
/// Running state of one aggregate function within one group.
#[derive(Debug, Clone)]
enum AggregateValue {
    /// `COUNT`: rows counted so far (every row for `*`, otherwise non-null values).
    Count(u64),
    /// `SUM`: running total of values convertible to `f64`.
    Sum(f64),
    /// `AVG`: running total and number of numeric values seen; finalized as `sum / count`
    /// (or `Null` when `count` is 0).
    Avg { sum: f64, count: u64 },
    /// `MIN`: smallest non-null value seen so far (`Null` until one is seen).
    Min(Value),
    /// `MAX`: largest non-null value seen so far (`Null` until one is seen).
    Max(Value),
}
75
76fn parse_table_id(table_id: &TableId) -> (Option<String>, String) {
88 let table_str = table_id.name();
89 match table_str.rfind('.') {
90 Some(dot) => (
91 Some(table_str[..dot].to_string()),
92 table_str[dot + 1..].to_string(),
93 ),
94 None => (None, table_str.to_string()),
95 }
96}
97
98fn values_equal(a: &Value, b: &Value) -> bool {
105 if a == b {
106 return true;
107 }
108 if same_numeric_family(a, b) {
111 if let (Some(x), Some(y)) = (a.as_f64(), b.as_f64()) {
112 return x == y;
113 }
114 }
115 false
116}
117
118fn same_numeric_family(a: &Value, b: &Value) -> bool {
120 a.as_f64().is_some() && b.as_f64().is_some()
121}
122
123fn compare_values_ordering(a: &Value, b: &Value) -> std::cmp::Ordering {
127 try_compare_values(a, b).unwrap_or(std::cmp::Ordering::Equal)
128}
129
130fn try_compare_values(a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
139 use std::cmp::Ordering;
140 if same_numeric_family(a, b) {
141 if let (Some(x), Some(y)) = (a.as_f64(), b.as_f64()) {
142 return Ok(x.partial_cmp(&y).unwrap_or(Ordering::Equal));
143 }
144 }
145 if std::mem::discriminant(a) == std::mem::discriminant(b) {
146 return a.partial_cmp(b).ok_or_else(|| {
147 Error::query_execution("Cannot compare incompatible types".to_string())
148 });
149 }
150 log::debug!("Cannot compare {:?} with {:?}", a, b);
151 Err(Error::query_execution(
152 "Cannot compare incompatible types".to_string(),
153 ))
154}
155
156fn evaluate_predicates(row: &QueryRow, predicates: &[SSTablePredicate]) -> Result<bool> {
161 use super::select_optimizer::SSTableFilterOp;
162 for predicate in predicates {
163 let Some(column_value) = row.values.get(&predicate.column) else {
164 return Ok(false);
165 };
166 let matches = match &predicate.operation {
167 SSTableFilterOp::Equal => predicate
168 .values
169 .first()
170 .is_some_and(|v| values_equal(column_value, v)),
171 SSTableFilterOp::In => predicate.values.contains(column_value),
172 SSTableFilterOp::Range => {
173 if predicate.values.len() < 2 {
174 false
175 } else {
176 let lo = &predicate.values[0];
177 let hi = &predicate.values[1];
178 compare_values_ordering(column_value, lo).is_ge()
179 && compare_values_ordering(column_value, hi).is_le()
180 }
181 }
182 SSTableFilterOp::Prefix => matches!(
183 (column_value, predicate.values.first()),
184 (Value::Text(s), Some(Value::Text(p))) if s.starts_with(p)
185 ),
186 SSTableFilterOp::BloomFilter => true, };
188 if !matches {
189 return Ok(false);
190 }
191 }
192 Ok(true)
193}
194
195fn build_row_from_scan(
209 key: RowKey,
210 value: Value,
211 projection: &[String],
212 schema: Option<&crate::schema::TableSchema>,
213) -> Option<QueryRow> {
214 if matches!(value, Value::Null | Value::Tombstone(_)) {
218 return None;
219 }
220
221 let mut row_values = HashMap::new();
222 let project = |name: &str| projection.is_empty() || projection.iter().any(|p| p == name);
223
224 if let Value::Map(map) = value {
225 for (col_name, col_value) in map {
226 if let Value::Text(name) = col_name {
227 if project(&name) {
228 row_values.insert(name, col_value);
229 }
230 }
231 }
232 if let Some(schema) = schema {
238 match crate::storage::partition_key_codec::decode_partition_key_columns(&key.0, schema)
239 {
240 Ok(pk_columns) => {
241 for (name, value) in pk_columns {
242 if project(&name) {
243 row_values.insert(name, value);
244 }
245 }
246 }
247 Err(e) => {
250 log::warn!(
251 "Failed to reconstruct partition-key columns from row key \
252 (len={} bytes) for {}.{}: {}",
253 key.0.len(),
254 schema.keyspace,
255 schema.table,
256 e
257 );
258 }
259 }
260 }
261 } else {
262 row_values.insert("data".to_string(), value);
264 if project("id") {
265 row_values.insert("id".to_string(), Value::Text(format!("{:?}", key)));
266 }
267 }
268
269 Some(QueryRow {
270 values: row_values,
271 key,
272 metadata: Default::default(),
273 })
274}
275
276fn eval_arithmetic(op: &ArithmeticOperator, left: Value, right: Value) -> Result<Value> {
283 use ArithmeticOperator::*;
284 macro_rules! int_op {
285 ($a:expr, $b:expr, $ctor:expr) => {
286 match op {
287 Add => Ok($ctor($a + $b)),
288 Subtract => Ok($ctor($a - $b)),
289 Multiply => Ok($ctor($a * $b)),
290 Divide => {
291 if $b == 0 {
292 Err(Error::query_execution("Division by zero".to_string()))
293 } else {
294 Ok($ctor($a / $b))
295 }
296 }
297 Modulo => {
298 if $b == 0 {
299 Err(Error::query_execution("Modulo by zero".to_string()))
300 } else {
301 Ok($ctor($a % $b))
302 }
303 }
304 }
305 };
306 }
307 match (left, right) {
308 (Value::Integer(a), Value::Integer(b)) => int_op!(a, b, Value::Integer),
309 (Value::BigInt(a), Value::BigInt(b)) => int_op!(a, b, Value::BigInt),
310 (Value::Float(a), Value::Float(b)) => match op {
311 Add => Ok(Value::Float(a + b)),
312 Subtract => Ok(Value::Float(a - b)),
313 Multiply => Ok(Value::Float(a * b)),
314 Divide => Ok(Value::Float(a / b)),
315 Modulo => Ok(Value::Float(a % b)),
316 },
317 _ => Err(Error::query_execution(
318 "Incompatible types for arithmetic".to_string(),
319 )),
320 }
321}
322
323fn build_group_key(row: &QueryRow, group_by_columns: &[String]) -> Vec<Value> {
326 if group_by_columns.is_empty() {
327 return vec![Value::Null];
328 }
329 group_by_columns
330 .iter()
331 .map(|col| row.values.get(col).cloned().unwrap_or(Value::Null))
332 .collect()
333}
334
335fn find_or_init_group(
343 groups: &mut Vec<(Vec<Value>, Vec<AggregateValue>)>,
344 key: Vec<Value>,
345 aggregates: &[super::select_optimizer::AggregateComputation],
346) -> usize {
347 if let Some(idx) = groups.iter().position(|(k, _)| k == &key) {
348 return idx;
349 }
350 let initial: Vec<_> = aggregates
351 .iter()
352 .map(|c| match c.function {
353 AggregateType::Count => AggregateValue::Count(0),
354 AggregateType::Sum => AggregateValue::Sum(0.0),
355 AggregateType::Avg => AggregateValue::Avg { sum: 0.0, count: 0 },
356 AggregateType::Min => AggregateValue::Min(Value::Null),
357 AggregateType::Max => AggregateValue::Max(Value::Null),
358 })
359 .collect();
360 groups.push((key, initial));
361 groups.len() - 1
362}
363
364fn update_aggregate(
370 state: &mut AggregateValue,
371 agg_comp: &super::select_optimizer::AggregateComputation,
372 row: &QueryRow,
373) {
374 let is_star = agg_comp.column == "*";
375 let value: Option<&Value> = if is_star {
377 None
378 } else {
379 row.values.get(&agg_comp.column)
380 };
381 let is_null = !is_star && value.is_none_or(Value::is_null);
382
383 match state {
384 AggregateValue::Count(count) => {
385 if is_star || !is_null {
386 *count += 1;
387 }
388 }
389 AggregateValue::Sum(sum) => {
390 if let Some(v) = value.and_then(Value::as_f64) {
391 *sum += v;
392 }
393 }
394 AggregateValue::Avg { sum, count } => {
395 if let Some(v) = value.and_then(Value::as_f64) {
396 *sum += v;
397 *count += 1;
398 }
399 }
400 AggregateValue::Min(min_val) => {
401 if let Some(v) = value {
402 if !v.is_null()
403 && (min_val.is_null() || compare_values_ordering(v, min_val).is_lt())
404 {
405 *min_val = v.clone();
406 }
407 }
408 }
409 AggregateValue::Max(max_val) => {
410 if let Some(v) = value {
411 if !v.is_null()
412 && (max_val.is_null() || compare_values_ordering(v, max_val).is_gt())
413 {
414 *max_val = v.clone();
415 }
416 }
417 }
418 }
419}
420
421fn finalize_group(
423 group_key: Vec<Value>,
424 group_aggregates: Vec<AggregateValue>,
425 agg_plan: &AggregationPlan,
426) -> QueryRow {
427 let mut row_values = HashMap::new();
428
429 for (i, col) in agg_plan.group_by_columns.iter().enumerate() {
430 if let Some(v) = group_key.get(i) {
431 row_values.insert(col.clone(), v.clone());
432 }
433 }
434
435 for (i, agg_comp) in agg_plan.aggregates.iter().enumerate() {
436 let result_value = match &group_aggregates[i] {
437 AggregateValue::Count(count) => Value::BigInt(*count as i64),
438 AggregateValue::Sum(sum) => Value::Float(*sum),
439 AggregateValue::Avg { sum, count } => {
440 if *count > 0 {
441 Value::Float(sum / (*count as f64))
442 } else {
443 Value::Null
444 }
445 }
446 AggregateValue::Min(val) | AggregateValue::Max(val) => val.clone(),
447 };
448 row_values.insert(agg_comp.alias.clone(), result_value);
449 }
450
451 QueryRow {
452 values: row_values,
453 key: RowKey::new(vec![]),
454 metadata: Default::default(),
455 }
456}
457
458fn const_arithmetic(op: &ArithmeticOperator, left: Value, right: Value) -> Result<Value> {
463 use ArithmeticOperator::*;
464
465 if matches!(op, Modulo) {
469 return match (left, right) {
470 (Value::Integer(a), Value::Integer(b)) => {
471 eval_arithmetic(op, Value::Integer(a), Value::Integer(b))
472 }
473 (Value::BigInt(a), Value::BigInt(b)) => {
474 eval_arithmetic(op, Value::BigInt(a), Value::BigInt(b))
475 }
476 _ => Err(Error::query_execution(
477 "Modulo only supported for integers".to_string(),
478 )),
479 };
480 }
481
482 let verb = match op {
483 Add => "add",
484 Subtract => "subtract",
485 Multiply => "multiply",
486 Divide => "divide",
487 Modulo => unreachable!("handled above"),
488 };
489
490 match (left, right) {
491 (Value::Integer(a), Value::Integer(b)) => {
492 eval_arithmetic(op, Value::Integer(a), Value::Integer(b))
493 }
494 (Value::BigInt(a), Value::BigInt(b)) => {
495 eval_arithmetic(op, Value::BigInt(a), Value::BigInt(b))
496 }
497 (Value::Float(a), Value::Float(b)) => {
498 if matches!(op, Divide) && b == 0.0 {
501 return Err(Error::query_execution("Division by zero".to_string()));
502 }
503 eval_arithmetic(op, Value::Float(a), Value::Float(b))
504 }
505 _ => Err(Error::query_execution(format!(
506 "Cannot {} incompatible types",
507 verb
508 ))),
509 }
510}
511
/// Translates a SQL/CQL `LIKE` pattern into an anchored regular-expression source string.
///
/// `%` becomes `.*` (any run of characters, possibly empty) and `_` becomes `.` (exactly one
/// character). Every other character is matched literally. Regex metacharacters are
/// backslash-escaped, so `a.b%` does not match `axb…` and patterns such as `f(x)%` or
/// `[tag]%` still compile. The expression starts with the `(?s)` flag so that `.` also
/// matches `\n`, letting `%` and `_` span multi-line text.
///
/// There is no escape syntax for a literal `%` or `_`; they are always wildcards.
fn like_pattern_to_regex(pattern: &str) -> String {
    /// Characters with special meaning in regex syntax outside a character class.
    const REGEX_META: &[char] = &[
        '\\', '.', '+', '*', '?', '(', ')', '|', '[', ']', '{', '}', '^', '$',
    ];

    let mut out = String::with_capacity(pattern.len() * 2 + 8);
    out.push_str("(?s)^");
    for ch in pattern.chars() {
        match ch {
            '%' => out.push_str(".*"),
            '_' => out.push('.'),
            c if REGEX_META.contains(&c) => {
                out.push('\\');
                out.push(c);
            }
            c => out.push(c),
        }
    }
    out.push('$');
    out
}
526
527fn parse_cql_type_str(type_str: &str) -> Option<CqlType> {
533 let parser = ComplexTypeParser::new();
534 parser
535 .parse_type(type_str)
536 .ok()
537 .map(|parsed| parsed.cql_type)
538}
539
540impl SelectExecutor {
541 pub fn new(schema: Arc<SchemaManager>, storage: Arc<StorageEngine>) -> Self {
543 Self {
544 _schema: schema,
545 storage,
546 }
547 }
548
549 pub async fn execute(&self, plan: OptimizedQueryPlan) -> Result<QueryResult> {
551 let table_id = if let Some(ref from_clause) = plan.statement.from_clause {
552 self.extract_table_id(from_clause)?
553 } else {
554 TableId::new("_dummy_")
556 };
557
558 let mut context = ExecutionContext {
559 table_id,
560 columns: self.get_result_columns(&plan.statement).await?,
561 rows_processed: 0,
562 };
563
564 if plan.statement.from_clause.is_none() {
566 return self.execute_constant_query(&plan.statement, &context).await;
567 }
568
569 let mut intermediate_results = Vec::new();
571
572 let execution_steps = if plan.execution_steps.is_empty() {
574 vec![ExecutionStep::SSTableScan {
575 table: context.table_id.clone(),
576 predicates: vec![],
577 projection: context.columns.iter().map(|c| c.name.clone()).collect(),
578 }]
579 } else {
580 plan.execution_steps.clone()
581 };
582
583 for step in &execution_steps {
584 match step {
585 ExecutionStep::SSTableScan {
586 table,
587 predicates,
588 projection,
589 ..
590 } => {
591 let rows = self
592 .execute_sstable_scan(table, predicates, projection, &mut context)
593 .await?;
594 intermediate_results = rows;
595 }
596 ExecutionStep::Filter { expression, .. } => {
597 intermediate_results = self
598 .execute_filter(intermediate_results, expression, &mut context)
599 .await?;
600 }
601 ExecutionStep::Sort { order_by, .. } => {
602 intermediate_results = self
603 .execute_sort(intermediate_results, order_by, &mut context)
604 .await?;
605 }
606 ExecutionStep::Aggregate { plan: agg_plan, .. } => {
607 intermediate_results = self
608 .execute_aggregation(intermediate_results, agg_plan, &mut context)
609 .await?;
610 }
611 ExecutionStep::Limit { count, offset } => {
612 intermediate_results = self
613 .execute_limit(intermediate_results, *count, *offset, &mut context)
614 .await?;
615 }
616 ExecutionStep::Project { columns } => {
617 intermediate_results = self
618 .execute_projection(intermediate_results, columns, &mut context)
619 .await?;
620 }
621 }
622 }
623
624 let total_rows = intermediate_results.len() as u64;
625
626 let mut columns = context.columns;
631 if columns.is_empty() && !intermediate_results.is_empty() {
632 let schema_opt = if let Some(ref from_clause) = plan.statement.from_clause {
634 if let Ok(table_id) = self.extract_table_id(from_clause) {
635 let (keyspace, table_name) = parse_table_id(&table_id);
636 self._schema
637 .find_schema_by_table(&keyspace, &table_name)
638 .await
639 } else {
640 None
641 }
642 } else {
643 None
644 };
645
646 let first_row = &intermediate_results[0];
647 let mut col_names: Vec<_> = first_row.values.keys().collect();
648 col_names.sort(); let table_name_for_meta = schema_opt
651 .as_ref()
652 .map(|s| format!("{}.{}", s.keyspace, s.table));
653
654 for (idx, col_name) in col_names.iter().enumerate() {
655 let cql_type_opt = schema_opt.as_ref().and_then(|schema| {
657 schema
658 .columns
659 .iter()
660 .find(|c| c.name.as_str() == col_name.as_str())
661 .and_then(|c| parse_cql_type_str(&c.data_type))
662 });
663
664 let data_type = cql_type_opt
665 .as_ref()
666 .map(cql_type_to_data_type)
667 .unwrap_or(crate::types::DataType::Text);
668
669 let mut col_info = ColumnInfo {
670 name: (*col_name).clone(),
671 data_type,
672 nullable: true,
673 position: idx,
674 table_name: table_name_for_meta.clone(),
675 cql_type: None,
676 };
677 if let Some(cql_type) = cql_type_opt {
678 col_info = col_info.with_cql_type(cql_type);
679 }
680 columns.push(col_info);
681 }
682 }
683
684 Ok(QueryResult {
685 rows: intermediate_results,
686 rows_affected: total_rows, execution_time_ms: 0, metadata: crate::query::result::QueryMetadata {
689 columns,
690 total_rows: Some(total_rows),
691 plan_info: None,
692 performance: Default::default(),
693 warnings: vec![],
694 },
695 })
696 }
697
698 pub async fn execute_streaming(
725 &self,
726 plan: OptimizedQueryPlan,
727 config: StreamingConfig,
728 ) -> Result<QueryResultIterator> {
729 if self.requires_materialization(&plan) {
731 log::info!("Query requires materialization (ORDER BY/GROUP BY/aggregates), using execute-then-stream");
732 return self.execute_and_stream(plan, config).await;
733 }
734
735 let table_id = if let Some(ref from_clause) = plan.statement.from_clause {
736 self.extract_table_id(from_clause)?
737 } else {
738 return self.execute_and_stream(plan, config).await;
740 };
741
742 let columns = self.get_result_columns(&plan.statement).await?;
743
744 let (tx, rx) = mpsc::channel(config.buffer_size);
746
747 let execution_steps = if plan.execution_steps.is_empty() {
749 vec![ExecutionStep::SSTableScan {
750 table: table_id.clone(),
751 predicates: vec![],
752 projection: columns.iter().map(|c| c.name.clone()).collect(),
753 }]
754 } else {
755 plan.execution_steps.clone()
756 };
757
758 let storage = Arc::clone(&self.storage);
760 let schema_manager = Arc::clone(&self._schema);
761
762 tokio::spawn(async move {
764 if let Err(e) = Self::execute_streaming_background(
765 storage,
766 schema_manager,
767 table_id,
768 execution_steps,
769 tx,
770 )
771 .await
772 {
773 log::error!("Streaming execution error: {}", e);
774 }
776 });
777
778 let metadata = QueryMetadata {
780 columns,
781 total_rows: None, plan_info: None,
783 performance: Default::default(),
784 warnings: vec![],
785 };
786
787 Ok(QueryResultIterator::new(rx, metadata))
788 }
789
790 fn requires_materialization(&self, plan: &OptimizedQueryPlan) -> bool {
792 for step in &plan.execution_steps {
793 match step {
794 ExecutionStep::Sort { .. } => return true,
795 ExecutionStep::Aggregate { .. } => return true,
796 _ => {}
797 }
798 }
799
800 matches!(plan.statement.select_clause, SelectClause::Distinct(_))
802 }
803
804 async fn execute_and_stream(
806 &self,
807 plan: OptimizedQueryPlan,
808 config: StreamingConfig,
809 ) -> Result<QueryResultIterator> {
810 let result = self.execute(plan).await?;
812
813 let (tx, rx) = mpsc::channel(config.buffer_size);
815
816 tokio::spawn(async move {
818 for row in result.rows {
819 if tx.send(Ok(row)).await.is_err() {
820 break; }
822 }
823 });
825
826 Ok(QueryResultIterator::new(rx, result.metadata))
827 }
828
829 async fn execute_streaming_background(
831 storage: Arc<StorageEngine>,
832 schema_manager: Arc<SchemaManager>,
833 _table_id: TableId,
834 execution_steps: Vec<ExecutionStep>,
835 tx: mpsc::Sender<Result<QueryRow>>,
836 ) -> Result<()> {
837 let limit = execution_steps.iter().find_map(|step| match step {
846 ExecutionStep::Limit { count, offset } => Some((*count, offset.unwrap_or(0))),
847 _ => None,
848 });
849 let (limit_count, mut offset_remaining) = match limit {
850 Some((count, offset)) => (Some(count), offset),
851 None => (None, 0),
852 };
853
854 if limit_count == Some(0) {
856 return Ok(());
857 }
858
859 let mut sent: u64 = 0;
860
861 for step in &execution_steps {
862 match step {
863 ExecutionStep::SSTableScan {
864 table,
865 predicates,
866 projection,
867 ..
868 } => {
869 let (keyspace, table_name) = parse_table_id(table);
870 let schema_opt = schema_manager
871 .find_schema_by_table(&keyspace, &table_name)
872 .await;
873
874 let scan_results = storage
875 .scan(table, None, None, None, schema_opt.as_ref())
876 .await?;
877
878 for (key, value) in scan_results {
879 let Some(row) =
880 build_row_from_scan(key, value, projection, schema_opt.as_ref())
881 else {
882 continue;
883 };
884
885 if !evaluate_predicates(&row, predicates)? {
886 continue;
887 }
888
889 if offset_remaining > 0 {
891 offset_remaining -= 1;
892 continue;
893 }
894
895 if tx.send(Ok(row)).await.is_err() {
897 return Ok(());
898 }
899 sent += 1;
900
901 if let Some(count) = limit_count {
903 if sent >= count {
904 return Ok(());
905 }
906 }
907 }
908 }
909 ExecutionStep::Limit { .. } => {
910 }
913 ExecutionStep::Project { .. } | ExecutionStep::Filter { .. } => {}
915 _ => {
916 log::warn!("Streaming execution: skipping unsupported step {:?}", step);
917 }
918 }
919 }
920
921 Ok(())
922 }
923
924 async fn execute_sstable_scan(
931 &self,
932 table: &TableId,
933 predicates: &[SSTablePredicate],
934 projection: &[String],
935 context: &mut ExecutionContext,
936 ) -> Result<Vec<QueryRow>> {
937 const MAX_RESULTS: usize = 1_000_000;
938
939 log::info!(
940 "Executing SSTableScan: table=\"{}\", predicates={:?}",
941 table,
942 predicates
943 );
944
945 let (keyspace, table_name) = parse_table_id(table);
946 let schema_opt = self
947 ._schema
948 .find_schema_by_table(&keyspace, &table_name)
949 .await;
950
951 match schema_opt.as_ref() {
952 Some(schema) => log::info!(
953 "Found schema for {}.{} with {} columns",
954 schema.keyspace,
955 schema.table,
956 schema.columns.len()
957 ),
958 None => log::info!(
959 "No schema found for {}.{}, proceeding without schema-aware parsing",
960 keyspace.as_deref().unwrap_or("unknown"),
961 table_name
962 ),
963 }
964
965 let scan_results = self
966 .storage
967 .scan(table, None, None, None, schema_opt.as_ref())
968 .await?;
969
970 log::info!("Scan returned {} rows", scan_results.len());
971
972 let mut results = Vec::new();
973 for (key, value) in scan_results {
974 context.rows_processed += 1;
975
976 let Some(row) = build_row_from_scan(key, value, projection, schema_opt.as_ref()) else {
978 continue;
979 };
980
981 if evaluate_predicates(&row, predicates)? {
982 results.push(row);
983 }
984
985 if results.len() > MAX_RESULTS {
986 return Err(Error::query_execution(
987 "Result set too large, consider adding LIMIT".to_string(),
988 ));
989 }
990 }
991
992 Ok(results)
993 }
994
995 async fn execute_filter(
997 &self,
998 rows: Vec<QueryRow>,
999 filter_expr: &WhereExpression,
1000 context: &mut ExecutionContext,
1001 ) -> Result<Vec<QueryRow>> {
1002 let mut filtered_rows = Vec::new();
1003
1004 for row in rows {
1005 if self.evaluate_where_expression(filter_expr, &row)? {
1006 filtered_rows.push(row);
1007 }
1008 context.rows_processed += 1;
1009 }
1010
1011 Ok(filtered_rows)
1012 }
1013
1014 fn evaluate_where_expression(&self, expr: &WhereExpression, row: &QueryRow) -> Result<bool> {
1016 match expr {
1017 WhereExpression::Comparison(comp) => self.evaluate_comparison(comp, row),
1018 WhereExpression::And(exprs) => {
1019 for expr in exprs {
1020 if !self.evaluate_where_expression(expr, row)? {
1021 return Ok(false);
1022 }
1023 }
1024 Ok(true)
1025 }
1026 WhereExpression::Or(exprs) => {
1027 for expr in exprs {
1028 if self.evaluate_where_expression(expr, row)? {
1029 return Ok(true);
1030 }
1031 }
1032 Ok(false)
1033 }
1034 WhereExpression::Not(expr) => Ok(!self.evaluate_where_expression(expr, row)?),
1035 WhereExpression::Parentheses(expr) => self.evaluate_where_expression(expr, row),
1036 }
1037 }
1038
1039 fn evaluate_comparison(&self, comp: &ComparisonExpression, row: &QueryRow) -> Result<bool> {
1043 use ComparisonOperator::*;
1044
1045 let left_value = self.evaluate_select_expression(&comp.left, row)?;
1046
1047 match comp.operator {
1049 IsNull => return Ok(left_value.is_null()),
1050 IsNotNull => return Ok(!left_value.is_null()),
1051 _ => {}
1052 }
1053
1054 match (&comp.operator, &comp.right) {
1055 (
1056 op @ (Equal | NotEqual | LessThan | LessThanOrEqual | GreaterThan
1057 | GreaterThanOrEqual),
1058 ComparisonRightSide::Value(right_expr),
1059 ) => {
1060 let right_value = self.evaluate_select_expression(right_expr, row)?;
1061 let result = match op {
1062 Equal => values_equal(&left_value, &right_value),
1063 NotEqual => !values_equal(&left_value, &right_value),
1064 LessThan => try_compare_values(&left_value, &right_value)?.is_lt(),
1065 LessThanOrEqual => try_compare_values(&left_value, &right_value)?.is_le(),
1066 GreaterThan => try_compare_values(&left_value, &right_value)?.is_gt(),
1067 GreaterThanOrEqual => try_compare_values(&left_value, &right_value)?.is_ge(),
1068 _ => unreachable!("guarded by outer match"),
1069 };
1070 Ok(result)
1071 }
1072 (In, ComparisonRightSide::ValueList(value_exprs)) => {
1073 for value_expr in value_exprs {
1074 let value = self.evaluate_select_expression(value_expr, row)?;
1075 if left_value == value {
1076 return Ok(true);
1077 }
1078 }
1079 Ok(false)
1080 }
1081 (Like, ComparisonRightSide::Value(pattern_expr)) => {
1082 let pattern = self.evaluate_select_expression(pattern_expr, row)?;
1083 if let (Value::Text(text), Value::Text(pattern_str)) = (&left_value, &pattern) {
1084 Ok(self.match_like_pattern(text, pattern_str))
1085 } else {
1086 Ok(false)
1087 }
1088 }
1089 _ => Err(Error::query_execution(
1090 "Unsupported comparison operator".to_string(),
1091 )),
1092 }
1093 }
1094
1095 fn evaluate_select_expression(&self, expr: &SelectExpression, row: &QueryRow) -> Result<Value> {
1097 match expr {
1098 SelectExpression::Column(col_ref) => {
1099 row.values.get(&col_ref.column).cloned().ok_or_else(|| {
1100 Error::query_execution(format!("Column not found: {}", col_ref.column))
1101 })
1102 }
1103 SelectExpression::Literal(value) => Ok(value.clone()),
1104 SelectExpression::CollectionAccess(access) => {
1105 self.evaluate_collection_access(access, row)
1106 }
1107 SelectExpression::Arithmetic(arith) => {
1108 let left = self.evaluate_select_expression(&arith.left, row)?;
1109 let right = self.evaluate_select_expression(&arith.right, row)?;
1110 self.evaluate_arithmetic(&arith.operator, left, right)
1111 }
1112 SelectExpression::Aliased(expr, _) => self.evaluate_select_expression(expr, row),
1113 SelectExpression::Aggregate(_) => {
1114 Err(Error::query_execution(
1117 "Aggregate expressions should be processed during aggregation step, not row evaluation".to_string(),
1118 ))
1119 }
1120 SelectExpression::Function(_) => {
1121 Err(Error::query_execution(
1123 "Function expressions not yet implemented".to_string(),
1124 ))
1125 }
1126 }
1127 }
1128
1129 fn evaluate_collection_access(
1132 &self,
1133 access: &CollectionAccessExpression,
1134 row: &QueryRow,
1135 ) -> Result<Value> {
1136 let lookup_column = |col: &ColumnRef| -> Result<&Value> {
1137 row.values
1138 .get(&col.column)
1139 .ok_or_else(|| Error::query_execution(format!("Column not found: {}", col.column)))
1140 };
1141
1142 match access {
1143 CollectionAccessExpression::ListIndex(col_ref, index_expr) => {
1144 let list_value = lookup_column(col_ref)?;
1145 let index_value = self.evaluate_select_expression(index_expr, row)?;
1146
1147 let (Value::List(list), Value::Integer(index)) = (list_value, &index_value) else {
1148 return Err(Error::query_execution("Invalid list access".to_string()));
1149 };
1150 if *index >= 0 && (*index as usize) < list.len() {
1151 Ok(list[*index as usize].clone())
1152 } else {
1153 Ok(Value::Null)
1154 }
1155 }
1156 CollectionAccessExpression::MapKey(col_ref, key_expr) => {
1157 let map_value = lookup_column(col_ref)?;
1158 let key_value = self.evaluate_select_expression(key_expr, row)?;
1159
1160 let Value::Map(map) = map_value else {
1161 return Err(Error::query_execution("Invalid map access".to_string()));
1162 };
1163 Ok(map
1164 .iter()
1165 .find(|(k, _)| *k == key_value)
1166 .map(|(_, v)| v.clone())
1167 .unwrap_or(Value::Null))
1168 }
1169 CollectionAccessExpression::SetContains(col_ref, value_expr) => {
1170 let set_value = lookup_column(col_ref)?;
1171 let test_value = self.evaluate_select_expression(value_expr, row)?;
1172
1173 let Value::Set(set) = set_value else {
1174 return Err(Error::query_execution(
1175 "Invalid set contains operation".to_string(),
1176 ));
1177 };
1178 Ok(Value::Boolean(set.contains(&test_value)))
1179 }
1180 }
1181 }
1182
1183 fn evaluate_arithmetic(
1190 &self,
1191 op: &ArithmeticOperator,
1192 left: Value,
1193 right: Value,
1194 ) -> Result<Value> {
1195 match (&left, &right) {
1196 (Value::Integer(_), Value::Integer(_)) | (Value::Float(_), Value::Float(_)) => {
1197 eval_arithmetic(op, left, right)
1198 }
1199 _ => Err(Error::query_execution(
1200 "Incompatible types for arithmetic".to_string(),
1201 )),
1202 }
1203 }
1204
1205 fn match_like_pattern(&self, text: &str, pattern: &str) -> bool {
1208 regex::Regex::new(&like_pattern_to_regex(pattern))
1209 .map(|re| re.is_match(text))
1210 .unwrap_or(false)
1211 }
1212
1213 async fn execute_sort(
1215 &self,
1216 mut rows: Vec<QueryRow>,
1217 order_by: &OrderByClause,
1218 _context: &mut ExecutionContext,
1219 ) -> Result<Vec<QueryRow>> {
1220 rows.sort_by(|a, b| {
1221 for item in &order_by.items {
1222 let a_val = self
1223 .evaluate_select_expression(&item.expression, a)
1224 .unwrap_or(Value::Null);
1225 let b_val = self
1226 .evaluate_select_expression(&item.expression, b)
1227 .unwrap_or(Value::Null);
1228
1229 let ordering = match item.direction {
1230 SortDirection::Ascending => compare_values_ordering(&a_val, &b_val),
1231 SortDirection::Descending => compare_values_ordering(&b_val, &a_val),
1232 };
1233 if !ordering.is_eq() {
1234 return ordering;
1235 }
1236 }
1237 std::cmp::Ordering::Equal
1238 });
1239
1240 Ok(rows)
1241 }
1242
1243 async fn execute_aggregation(
1247 &self,
1248 rows: Vec<QueryRow>,
1249 agg_plan: &AggregationPlan,
1250 _context: &mut ExecutionContext,
1251 ) -> Result<Vec<QueryRow>> {
1252 const PER_ROW_MEMORY_ESTIMATE_BYTES: usize = 100;
1253 const DEFAULT_AGGREGATION_MEMORY_LIMIT: usize = 512 * 1024 * 1024;
1254
1255 let mut agg_state = AggregationState {
1256 groups: Vec::new(),
1257 memory_usage_bytes: 0,
1258 memory_limit_bytes: DEFAULT_AGGREGATION_MEMORY_LIMIT,
1259 };
1260
1261 for row in rows {
1262 let group_key = build_group_key(&row, &agg_plan.group_by_columns);
1263 let group_index =
1264 find_or_init_group(&mut agg_state.groups, group_key, &agg_plan.aggregates);
1265 let group_aggregates = &mut agg_state.groups[group_index].1;
1266
1267 for (i, agg_comp) in agg_plan.aggregates.iter().enumerate() {
1268 update_aggregate(&mut group_aggregates[i], agg_comp, &row);
1269 }
1270
1271 agg_state.memory_usage_bytes += PER_ROW_MEMORY_ESTIMATE_BYTES;
1272 if agg_state.memory_usage_bytes > agg_state.memory_limit_bytes {
1273 return Err(Error::query_execution(
1274 "Aggregation memory limit exceeded".to_string(),
1275 ));
1276 }
1277 }
1278
1279 let result_rows = agg_state
1280 .groups
1281 .into_iter()
1282 .map(|(group_key, group_aggregates)| {
1283 finalize_group(group_key, group_aggregates, agg_plan)
1284 })
1285 .collect();
1286
1287 Ok(result_rows)
1288 }
1289
1290 async fn execute_limit(
1292 &self,
1293 mut rows: Vec<QueryRow>,
1294 count: u64,
1295 offset: Option<u64>,
1296 _context: &mut ExecutionContext,
1297 ) -> Result<Vec<QueryRow>> {
1298 let start_index = offset.unwrap_or(0) as usize;
1299 if start_index >= rows.len() {
1300 return Ok(Vec::new());
1301 }
1302 rows.drain(..start_index);
1303 rows.truncate(count as usize);
1304 Ok(rows)
1305 }
1306
1307 async fn execute_projection(
1309 &self,
1310 rows: Vec<QueryRow>,
1311 columns: &[SelectExpression],
1312 _context: &mut ExecutionContext,
1313 ) -> Result<Vec<QueryRow>> {
1314 let mut projected_rows = Vec::new();
1315
1316 for row in rows {
1317 let mut projected_values = HashMap::new();
1318
1319 for (i, expr) in columns.iter().enumerate() {
1320 let value = self.evaluate_select_expression(expr, &row)?;
1321 let column_name = match expr {
1322 SelectExpression::Column(col_ref) => col_ref.column.clone(),
1323 SelectExpression::Aliased(_, alias) => alias.clone(),
1324 _ => format!("col_{i}"),
1325 };
1326 projected_values.insert(column_name, value);
1327 }
1328
1329 projected_rows.push(QueryRow {
1330 values: projected_values,
1331 key: RowKey::new(vec![]),
1332 metadata: Default::default(),
1333 });
1334 }
1335
1336 Ok(projected_rows)
1337 }
1338
1339 async fn execute_constant_query(
1341 &self,
1342 statement: &SelectStatement,
1343 _context: &ExecutionContext,
1344 ) -> Result<QueryResult> {
1345 let mut values = HashMap::new();
1346 let mut columns = Vec::new();
1347
1348 match &statement.select_clause {
1349 SelectClause::All => {
1350 return Err(Error::query_execution(
1351 "SELECT * requires a FROM clause".to_string(),
1352 ));
1353 }
1354 SelectClause::Columns(expressions) | SelectClause::Distinct(expressions) => {
1355 for (i, expr) in expressions.iter().enumerate() {
1356 let (value, column_name) = self.evaluate_constant_expression(expr)?;
1357 let key = column_name.unwrap_or_else(|| format!("column_{}", i));
1358 values.insert(key.clone(), value);
1359 columns.push(ColumnInfo {
1360 name: key,
1361 data_type: crate::types::DataType::Text, nullable: true,
1363 position: i,
1364 table_name: None, cql_type: None,
1366 });
1367 }
1368 }
1369 }
1370
1371 let row = QueryRow::with_values(RowKey::new(vec![1]), values);
1372
1373 Ok(QueryResult {
1374 rows: vec![row],
1375 rows_affected: 1, execution_time_ms: 0,
1377 metadata: crate::query::result::QueryMetadata {
1378 columns,
1379 total_rows: Some(1),
1380 plan_info: None,
1381 performance: crate::query::result::PerformanceMetrics::default(),
1382 warnings: Vec::new(),
1383 },
1384 })
1385 }
1386
1387 #[allow(clippy::only_used_in_recursion)]
1394 fn evaluate_constant_expression(
1395 &self,
1396 expr: &SelectExpression,
1397 ) -> Result<(Value, Option<String>)> {
1398 match expr {
1399 SelectExpression::Literal(value) => Ok((value.clone(), None)),
1400 SelectExpression::Aliased(inner_expr, alias) => {
1401 let (value, _) = self.evaluate_constant_expression(inner_expr)?;
1402 Ok((value, Some(alias.clone())))
1403 }
1404 SelectExpression::Arithmetic(arith) => {
1405 let (left_val, _) = self.evaluate_constant_expression(&arith.left)?;
1406 let (right_val, _) = self.evaluate_constant_expression(&arith.right)?;
1407 let result = const_arithmetic(&arith.operator, left_val, right_val)?;
1408 Ok((result, None))
1409 }
1410 _ => Err(Error::query_execution(
1411 "Expression type not supported in constant queries".to_string(),
1412 )),
1413 }
1414 }
1415
1416 fn extract_table_id(&self, from_clause: &FromClause) -> Result<TableId> {
1419 match from_clause {
1420 FromClause::Table(table_id) | FromClause::TableAlias(table_id, _) => {
1421 Ok(table_id.clone())
1422 }
1423 }
1424 }
1425
1426 async fn get_result_columns(&self, statement: &SelectStatement) -> Result<Vec<ColumnInfo>> {
1427 let mut columns = Vec::new();
1428
1429 match &statement.select_clause {
1430 SelectClause::All => {
1431 if let Some(ref from_clause) = statement.from_clause {
1434 let table_id = self.extract_table_id(from_clause)?;
1435 let (keyspace_opt, table_name) = parse_table_id(&table_id);
1436
1437 if let Some(schema) = self
1439 ._schema
1440 .find_schema_by_table(&keyspace_opt, &table_name)
1441 .await
1442 {
1443 let mut schema_cols: Vec<&crate::schema::Column> =
1445 schema.columns.iter().collect();
1446 schema_cols.sort_by_key(|c| c.name.as_str());
1447
1448 let keyspace_str = keyspace_opt.as_deref().unwrap_or("");
1449 let table_name_str = format!("{}.{}", keyspace_str, table_name);
1450
1451 for (idx, schema_col) in schema_cols.iter().enumerate() {
1452 let cql_type_opt = parse_cql_type_str(&schema_col.data_type);
1454 let data_type = cql_type_opt
1456 .as_ref()
1457 .map(cql_type_to_data_type)
1458 .unwrap_or(crate::types::DataType::Text);
1459
1460 let mut col_info = ColumnInfo {
1461 name: schema_col.name.clone(),
1462 data_type,
1463 nullable: true,
1464 position: idx,
1465 table_name: Some(table_name_str.clone()),
1466 cql_type: None,
1467 };
1468 if let Some(cql_type) = cql_type_opt {
1469 col_info = col_info.with_cql_type(cql_type);
1470 }
1471 columns.push(col_info);
1472 }
1473
1474 log::debug!(
1475 "SELECT * resolved {} columns from schema for {:?}.{}",
1476 columns.len(),
1477 keyspace_opt,
1478 table_name
1479 );
1480 }
1481 }
1483 }
1484 SelectClause::Columns(exprs) | SelectClause::Distinct(exprs) => {
1485 let schema_opt = if let Some(ref from_clause) = statement.from_clause {
1488 if let Ok(table_id) = self.extract_table_id(from_clause) {
1489 let (keyspace_opt, table_name) = parse_table_id(&table_id);
1490 self._schema
1491 .find_schema_by_table(&keyspace_opt, &table_name)
1492 .await
1493 } else {
1494 None
1495 }
1496 } else {
1497 None
1498 };
1499
1500 for (i, expr) in exprs.iter().enumerate() {
1501 let column_name = match expr {
1502 SelectExpression::Column(col_ref) => col_ref.column.clone(),
1503 SelectExpression::Aliased(_, alias) => alias.clone(),
1504 _ => format!("col_{i}"),
1505 };
1506
1507 let cql_type_opt = schema_opt.as_ref().and_then(|schema| {
1509 schema
1510 .columns
1511 .iter()
1512 .find(|c| c.name == column_name)
1513 .and_then(|c| parse_cql_type_str(&c.data_type))
1514 });
1515 let data_type = cql_type_opt
1516 .as_ref()
1517 .map(cql_type_to_data_type)
1518 .unwrap_or(crate::types::DataType::Text);
1519
1520 let mut col_info = ColumnInfo {
1521 name: column_name,
1522 data_type,
1523 nullable: true,
1524 position: i,
1525 table_name: None,
1526 cql_type: None,
1527 };
1528 if let Some(cql_type) = cql_type_opt {
1529 col_info = col_info.with_cql_type(cql_type);
1530 }
1531 columns.push(col_info);
1532 }
1533 }
1534 }
1535
1536 Ok(columns)
1537 }
1538}
1539
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{platform::Platform, Config};
    use tempfile::TempDir;

    /// Builds a `SelectExecutor` over a storage engine and schema manager rooted
    /// in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the executor because dropping it
    /// deletes the directory. Callers must keep it alive for as long as the
    /// executor (and its storage engine) is in use.
    async fn create_test_executor() -> (SelectExecutor, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let config = Config::default();
        let platform = Arc::new(Platform::new(&config).await.unwrap());
        let storage = Arc::new(
            StorageEngine::open(
                temp_dir.path(),
                &config,
                platform.clone(),
                #[cfg(feature = "state_machine")]
                None,
            )
            .await
            .unwrap(),
        );
        let _schema = Arc::new(SchemaManager::new(temp_dir.path()).await.unwrap());

        (SelectExecutor { _schema, storage }, temp_dir)
    }

    /// `try_compare_values` orders integers numerically.
    #[test]
    fn test_value_comparison() {
        use std::cmp::Ordering;
        assert_eq!(
            try_compare_values(&Value::Integer(5), &Value::Integer(3)).unwrap(),
            Ordering::Greater
        );
        assert_eq!(
            try_compare_values(&Value::Integer(3), &Value::Integer(5)).unwrap(),
            Ordering::Less
        );
        assert_eq!(
            try_compare_values(&Value::Integer(5), &Value::Integer(5)).unwrap(),
            Ordering::Equal
        );
    }

    /// `%` matches any run of characters and `_` matches exactly one.
    #[tokio::test]
    async fn test_like_pattern_matching() {
        // Bind the TempDir to a named variable (not `_`) so the backing
        // directory lives until the end of the test.
        let (executor, _temp_dir) = create_test_executor().await;

        assert!(executor.match_like_pattern("hello", "h%"));
        assert!(executor.match_like_pattern("hello", "%lo"));
        assert!(executor.match_like_pattern("hello", "h_llo"));
        assert!(!executor.match_like_pattern("hello", "h_l"));
    }

    /// Builds a `ks.t` schema with a single partition-key column and no
    /// clustering or regular columns.
    fn single_pk_schema(name: &str, data_type: &str) -> crate::schema::TableSchema {
        crate::schema::TableSchema {
            keyspace: "ks".to_string(),
            table: "t".to_string(),
            partition_keys: vec![crate::schema::KeyColumn {
                name: name.to_string(),
                data_type: data_type.to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![],
            comments: std::collections::HashMap::new(),
        }
    }

    /// A single TEXT partition key is rebuilt from the raw row-key bytes, and
    /// regular columns come from the scanned value map.
    #[test]
    fn build_row_from_scan_materialises_single_text_pk() {
        let key = RowKey::new(b"k0000000000000000".to_vec());
        let value = Value::Map(vec![(
            Value::Text("name".to_string()),
            Value::Text("name-0".to_string()),
        )]);
        let schema = single_pk_schema("id", "text");

        let row = build_row_from_scan(key, value, &[], Some(&schema))
            .expect("row must be built (not tombstoned)");

        assert_eq!(
            row.values.get("id"),
            Some(&Value::Text("k0000000000000000".to_string())),
            "Issue #586: single TEXT PK column must be reconstructed from the raw row key"
        );
        assert_eq!(
            row.values.get("name"),
            Some(&Value::Text("name-0".to_string()))
        );
    }

    /// An equality predicate on the reconstructed PK column matches the row.
    #[test]
    fn scan_built_row_matches_text_pk_equality_predicate() {
        use super::super::select_optimizer::{SSTableFilterOp, SSTablePredicate};

        let key = RowKey::new(b"k0000000000000000".to_vec());
        let value = Value::Map(vec![(Value::Text("age".to_string()), Value::Integer(0))]);
        let schema = single_pk_schema("id", "text");
        let row = build_row_from_scan(key, value, &[], Some(&schema)).unwrap();

        let predicate = SSTablePredicate {
            column: "id".to_string(),
            operation: SSTableFilterOp::Equal,
            values: vec![Value::Text("k0000000000000000".to_string())],
        };

        assert!(
            evaluate_predicates(&row, std::slice::from_ref(&predicate)).unwrap(),
            "Issue #586: WHERE id = '<literal>' must match the reconstructed PK column"
        );
    }
}