Skip to main content

cqlite_core/query/
select_executor.rs

1//! CQL SELECT Query Executor for Direct SSTable Access
2//!
//! This module implements a query executor that runs CQL SELECT
//! statements directly on SSTable files, without a running Cassandra node.
5//!
6//! Features:
7//! - Direct SSTable file scanning with predicate pushdown
8//! - Streaming results for memory efficiency
9//! - Parallel execution across multiple SSTable files
//! - GROUP BY aggregation (COUNT/SUM/AVG/MIN/MAX); groups are kept in a
//!   `Vec` and matched linearly because `Value` does not implement `Hash`
11//! - Collection operations (list[index], map['key'])
12
13use super::{
14    result::{
15        cql_type_to_data_type, ColumnInfo, QueryMetadata, QueryResult, QueryResultIterator,
16        QueryRow, StreamingConfig,
17    },
18    select_ast::*,
19    select_optimizer::{AggregationPlan, ExecutionStep, OptimizedQueryPlan, SSTablePredicate},
20};
21use crate::{
22    parser::complex_types::ComplexTypeParser,
23    schema::{CqlType, SchemaManager},
24    storage::StorageEngine,
25    types::{RowKey, Value},
26    Error, Result, TableId,
27};
28use std::collections::HashMap;
29use std::sync::Arc;
30use tokio::sync::mpsc;
31
32/// SELECT query executor for SSTable-based storage
33#[derive(Debug)]
34pub struct SelectExecutor {
35    /// Schema manager for metadata
36    _schema: Arc<SchemaManager>,
37    /// Storage engine for SSTable access
38    storage: Arc<StorageEngine>,
39}
40
41/// Query execution context
42///
43/// Pure bookkeeping for an in-flight query. Only used internally; the public
44/// API surface is `SelectExecutor` itself.
45#[derive(Debug)]
46struct ExecutionContext {
47    /// Current table being queried
48    pub table_id: TableId,
49    /// Column metadata
50    pub columns: Vec<ColumnInfo>,
51    /// Row count processed so far
52    pub rows_processed: u64,
53}
54
55/// Aggregation state for GROUP BY operations
56#[derive(Debug)]
57struct AggregationState {
58    /// Vector for grouping since Value doesn't implement Hash
59    groups: Vec<(Vec<Value>, Vec<AggregateValue>)>,
60    /// Memory usage tracking
61    memory_usage_bytes: usize,
62    /// Maximum memory limit
63    memory_limit_bytes: usize,
64}
65
66/// Aggregate value accumulator
67#[derive(Debug, Clone)]
68enum AggregateValue {
69    Count(u64),
70    Sum(f64),
71    Avg { sum: f64, count: u64 },
72    Min(Value),
73    Max(Value),
74}
75
76// ---------------------------------------------------------------------------
77// Free helpers: pure functions that don't depend on `&self`. These were
78// previously duplicated as `_static` methods on `SelectExecutor`; centralising
79// them lets both the streaming background task and the synchronous executor
80// share one implementation.
81// ---------------------------------------------------------------------------
82
83/// Split a `TableId` of the form `"keyspace.table"` into its parts.
84///
85/// If no dot is present, the whole name becomes the table component and the
86/// keyspace is `None`.
87fn parse_table_id(table_id: &TableId) -> (Option<String>, String) {
88    let table_str = table_id.name();
89    match table_str.rfind('.') {
90        Some(dot) => (
91            Some(table_str[..dot].to_string()),
92            table_str[dot + 1..].to_string(),
93        ),
94        None => (None, table_str.to_string()),
95    }
96}
97
98/// Compare two `Value`s for equality, including limited cross-type numeric
99/// coercion (int↔bigint, int↔float, bigint↔float).
100///
101/// `Value` implements `PartialEq` natively but only matches identical variants;
102/// we additionally treat the small set of cross-numeric cases that show up in
103/// CQL predicates.
104fn values_equal(a: &Value, b: &Value) -> bool {
105    if a == b {
106        return true;
107    }
108    // Only coerce when both operands are numeric — otherwise non-numeric
109    // pairs (e.g. Text vs Integer) would spuriously compare equal via `as_f64`.
110    if same_numeric_family(a, b) {
111        if let (Some(x), Some(y)) = (a.as_f64(), b.as_f64()) {
112            return x == y;
113        }
114    }
115    false
116}
117
118/// True when both `Value`s are numeric variants eligible for cross-type coercion.
119fn same_numeric_family(a: &Value, b: &Value) -> bool {
120    a.as_f64().is_some() && b.as_f64().is_some()
121}
122
123/// Compare two `Value`s for ordering, returning `Ordering::Equal` for
124/// incomparable variants. Used by sorting/aggregation paths that historically
125/// swallowed comparison errors via `unwrap_or(0)`.
126fn compare_values_ordering(a: &Value, b: &Value) -> std::cmp::Ordering {
127    try_compare_values(a, b).unwrap_or(std::cmp::Ordering::Equal)
128}
129
130/// Compare two `Value`s for ordering, returning an error when the operand
131/// types are not comparable. Preferred in WHERE-clause evaluation so users see
132/// a real diagnostic rather than a silent equality.
133///
134/// Cross-type numerics are coerced via `f64` first; same-variant comparisons
135/// fall back to `Value::partial_cmp`. We deliberately avoid `partial_cmp` for
136/// non-matching variants because it stringifies and would produce surprising
137/// orderings (e.g. `Text("9")` < `Text("10")` lexicographically).
138fn try_compare_values(a: &Value, b: &Value) -> Result<std::cmp::Ordering> {
139    use std::cmp::Ordering;
140    if same_numeric_family(a, b) {
141        if let (Some(x), Some(y)) = (a.as_f64(), b.as_f64()) {
142            return Ok(x.partial_cmp(&y).unwrap_or(Ordering::Equal));
143        }
144    }
145    if std::mem::discriminant(a) == std::mem::discriminant(b) {
146        return a.partial_cmp(b).ok_or_else(|| {
147            Error::query_execution("Cannot compare incompatible types".to_string())
148        });
149    }
150    log::debug!("Cannot compare {:?} with {:?}", a, b);
151    Err(Error::query_execution(
152        "Cannot compare incompatible types".to_string(),
153    ))
154}
155
156/// Evaluate the SSTable predicate set against a single `QueryRow`.
157///
158/// Returns `Ok(true)` only if every predicate is satisfied. A missing column
159/// causes the row to be rejected.
160fn evaluate_predicates(row: &QueryRow, predicates: &[SSTablePredicate]) -> Result<bool> {
161    use super::select_optimizer::SSTableFilterOp;
162    for predicate in predicates {
163        let Some(column_value) = row.values.get(&predicate.column) else {
164            return Ok(false);
165        };
166        let matches = match &predicate.operation {
167            SSTableFilterOp::Equal => predicate
168                .values
169                .first()
170                .is_some_and(|v| values_equal(column_value, v)),
171            SSTableFilterOp::In => predicate.values.contains(column_value),
172            SSTableFilterOp::Range => {
173                if predicate.values.len() < 2 {
174                    false
175                } else {
176                    let lo = &predicate.values[0];
177                    let hi = &predicate.values[1];
178                    compare_values_ordering(column_value, lo).is_ge()
179                        && compare_values_ordering(column_value, hi).is_le()
180                }
181            }
182            SSTableFilterOp::Prefix => matches!(
183                (column_value, predicate.values.first()),
184                (Value::Text(s), Some(Value::Text(p))) if s.starts_with(p)
185            ),
186            SSTableFilterOp::BloomFilter => true, // already checked upstream
187        };
188        if !matches {
189            return Ok(false);
190        }
191    }
192    Ok(true)
193}
194
195/// Build a `QueryRow` from a single `(RowKey, Value)` produced by storage scan,
196/// applying optional projection and synthesising partition-key columns from the
197/// raw key bytes when a schema is available.
198///
199/// Partition-key columns are never stored in the cell payload, so they are
200/// reconstructed from the raw row key via the canonical
201/// [`crate::storage::partition_key_codec::decode_partition_key_columns`] (the
202/// same codec the write engine uses). This is the fix for Issue #586: the
203/// previous decoder assumed a `u16` length prefix for every TEXT key, which is
204/// only correct for composite components — a single-component TEXT partition key
205/// is raw bytes, so its column was silently dropped from scan-built rows.
206///
207/// Returns `None` for tombstoned rows (so the caller can `continue`).
208fn build_row_from_scan(
209    key: RowKey,
210    value: Value,
211    projection: &[String],
212    schema: Option<&crate::schema::TableSchema>,
213) -> Option<QueryRow> {
214    // Suppress tombstoned rows from user-visible output. A row tombstone reaches
215    // here as `Value::Tombstone` (Issue #505); before that change it was `Value::Null`.
216    // Both must be suppressed identically so deleted rows never appear in query results.
217    if matches!(value, Value::Null | Value::Tombstone(_)) {
218        return None;
219    }
220
221    let mut row_values = HashMap::new();
222    let project = |name: &str| projection.is_empty() || projection.iter().any(|p| p == name);
223
224    if let Value::Map(map) = value {
225        for (col_name, col_value) in map {
226            if let Value::Text(name) = col_name {
227                if project(&name) {
228                    row_values.insert(name, col_value);
229                }
230            }
231        }
232        // Cassandra never serialises partition-key columns in the cell payload;
233        // reconstruct them from the raw row key when the schema is known. We
234        // decode through the canonical codec shared with the write engine so
235        // single-component (raw bytes) and composite (`[u16 len][bytes][0x00]`)
236        // keys are handled identically on both paths (Issue #586).
237        if let Some(schema) = schema {
238            match crate::storage::partition_key_codec::decode_partition_key_columns(&key.0, schema)
239            {
240                Ok(pk_columns) => {
241                    for (name, value) in pk_columns {
242                        if project(&name) {
243                            row_values.insert(name, value);
244                        }
245                    }
246                }
247                // Surface — never silently swallow — a decode failure, so a
248                // missing partition-key column can't ship invisibly (Issue #586).
249                Err(e) => {
250                    log::warn!(
251                        "Failed to reconstruct partition-key columns from row key \
252                         (len={} bytes) for {}.{}: {}",
253                        key.0.len(),
254                        schema.keyspace,
255                        schema.table,
256                        e
257                    );
258                }
259            }
260        }
261    } else {
262        // Non-map fallback: expose the raw value plus a debug-formatted id.
263        row_values.insert("data".to_string(), value);
264        if project("id") {
265            row_values.insert("id".to_string(), Value::Text(format!("{:?}", key)));
266        }
267    }
268
269    Some(QueryRow {
270        values: row_values,
271        key,
272        metadata: Default::default(),
273    })
274}
275
276/// Apply an `ArithmeticOperator` to two same-typed numeric `Value`s.
277///
278/// Behaviour matches the previous inline implementations: same-type only
279/// (no implicit coercion), and division/modulo by zero are reported as
280/// query-execution errors. Float division-by-zero (matching the original
281/// runtime path) yields IEEE inf/NaN rather than an error.
282fn eval_arithmetic(op: &ArithmeticOperator, left: Value, right: Value) -> Result<Value> {
283    use ArithmeticOperator::*;
284    macro_rules! int_op {
285        ($a:expr, $b:expr, $ctor:expr) => {
286            match op {
287                Add => Ok($ctor($a + $b)),
288                Subtract => Ok($ctor($a - $b)),
289                Multiply => Ok($ctor($a * $b)),
290                Divide => {
291                    if $b == 0 {
292                        Err(Error::query_execution("Division by zero".to_string()))
293                    } else {
294                        Ok($ctor($a / $b))
295                    }
296                }
297                Modulo => {
298                    if $b == 0 {
299                        Err(Error::query_execution("Modulo by zero".to_string()))
300                    } else {
301                        Ok($ctor($a % $b))
302                    }
303                }
304            }
305        };
306    }
307    match (left, right) {
308        (Value::Integer(a), Value::Integer(b)) => int_op!(a, b, Value::Integer),
309        (Value::BigInt(a), Value::BigInt(b)) => int_op!(a, b, Value::BigInt),
310        (Value::Float(a), Value::Float(b)) => match op {
311            Add => Ok(Value::Float(a + b)),
312            Subtract => Ok(Value::Float(a - b)),
313            Multiply => Ok(Value::Float(a * b)),
314            Divide => Ok(Value::Float(a / b)),
315            Modulo => Ok(Value::Float(a % b)),
316        },
317        _ => Err(Error::query_execution(
318            "Incompatible types for arithmetic".to_string(),
319        )),
320    }
321}
322
323/// Build the GROUP BY key for `row`. With no GROUP BY, all rows hash into a
324/// single `[Null]` bucket (global aggregation).
325fn build_group_key(row: &QueryRow, group_by_columns: &[String]) -> Vec<Value> {
326    if group_by_columns.is_empty() {
327        return vec![Value::Null];
328    }
329    group_by_columns
330        .iter()
331        .map(|col| row.values.get(col).cloned().unwrap_or(Value::Null))
332        .collect()
333}
334
335/// Locate the group matching `key` in `groups`, or push a fresh entry with
336/// initial aggregator state. Returns the index into `groups`.
337///
338/// `Value` doesn't implement `Hash`, so groups live in a `Vec` and lookup is
339/// linear. This is unchanged from the legacy implementation; switching to a
340/// hash map would change result-row ordering for callers that rely on
341/// insertion order.
342fn find_or_init_group(
343    groups: &mut Vec<(Vec<Value>, Vec<AggregateValue>)>,
344    key: Vec<Value>,
345    aggregates: &[super::select_optimizer::AggregateComputation],
346) -> usize {
347    if let Some(idx) = groups.iter().position(|(k, _)| k == &key) {
348        return idx;
349    }
350    let initial: Vec<_> = aggregates
351        .iter()
352        .map(|c| match c.function {
353            AggregateType::Count => AggregateValue::Count(0),
354            AggregateType::Sum => AggregateValue::Sum(0.0),
355            AggregateType::Avg => AggregateValue::Avg { sum: 0.0, count: 0 },
356            AggregateType::Min => AggregateValue::Min(Value::Null),
357            AggregateType::Max => AggregateValue::Max(Value::Null),
358        })
359        .collect();
360    groups.push((key, initial));
361    groups.len() - 1
362}
363
364/// Apply one row's contribution to a single aggregate accumulator.
365///
366/// COUNT(*) always increments; COUNT(col) only increments on non-null. SUM and
367/// AVG ignore non-numeric values. MIN/MAX clone the value only when it
368/// becomes the new extremum, sparing per-row clones in the common case.
369fn update_aggregate(
370    state: &mut AggregateValue,
371    agg_comp: &super::select_optimizer::AggregateComputation,
372    row: &QueryRow,
373) {
374    let is_star = agg_comp.column == "*";
375    // Look up the column once; for COUNT(*) we don't need it.
376    let value: Option<&Value> = if is_star {
377        None
378    } else {
379        row.values.get(&agg_comp.column)
380    };
381    let is_null = !is_star && value.is_none_or(Value::is_null);
382
383    match state {
384        AggregateValue::Count(count) => {
385            if is_star || !is_null {
386                *count += 1;
387            }
388        }
389        AggregateValue::Sum(sum) => {
390            if let Some(v) = value.and_then(Value::as_f64) {
391                *sum += v;
392            }
393        }
394        AggregateValue::Avg { sum, count } => {
395            if let Some(v) = value.and_then(Value::as_f64) {
396                *sum += v;
397                *count += 1;
398            }
399        }
400        AggregateValue::Min(min_val) => {
401            if let Some(v) = value {
402                if !v.is_null()
403                    && (min_val.is_null() || compare_values_ordering(v, min_val).is_lt())
404                {
405                    *min_val = v.clone();
406                }
407            }
408        }
409        AggregateValue::Max(max_val) => {
410            if let Some(v) = value {
411                if !v.is_null()
412                    && (max_val.is_null() || compare_values_ordering(v, max_val).is_gt())
413                {
414                    *max_val = v.clone();
415                }
416            }
417        }
418    }
419}
420
421/// Materialize a single aggregation group into a `QueryRow`.
422fn finalize_group(
423    group_key: Vec<Value>,
424    group_aggregates: Vec<AggregateValue>,
425    agg_plan: &AggregationPlan,
426) -> QueryRow {
427    let mut row_values = HashMap::new();
428
429    for (i, col) in agg_plan.group_by_columns.iter().enumerate() {
430        if let Some(v) = group_key.get(i) {
431            row_values.insert(col.clone(), v.clone());
432        }
433    }
434
435    for (i, agg_comp) in agg_plan.aggregates.iter().enumerate() {
436        let result_value = match &group_aggregates[i] {
437            AggregateValue::Count(count) => Value::BigInt(*count as i64),
438            AggregateValue::Sum(sum) => Value::Float(*sum),
439            AggregateValue::Avg { sum, count } => {
440                if *count > 0 {
441                    Value::Float(sum / (*count as f64))
442                } else {
443                    Value::Null
444                }
445            }
446            AggregateValue::Min(val) | AggregateValue::Max(val) => val.clone(),
447        };
448        row_values.insert(agg_comp.alias.clone(), result_value);
449    }
450
451    QueryRow {
452        values: row_values,
453        key: RowKey::new(vec![]),
454        metadata: Default::default(),
455    }
456}
457
458/// Constant-folding arithmetic. Same operand-type rules as `eval_arithmetic`,
459/// plus BigInt support and per-operator error wording matching the legacy
460/// implementation (e.g. `"Cannot add incompatible types"` and
461/// `"Modulo only supported for integers"`).
462fn const_arithmetic(op: &ArithmeticOperator, left: Value, right: Value) -> Result<Value> {
463    use ArithmeticOperator::*;
464
465    // Modulo's error wording is special: any non-integer combination must
466    // report `"Modulo only supported for integers"` regardless of which side
467    // is offending.
468    if matches!(op, Modulo) {
469        return match (left, right) {
470            (Value::Integer(a), Value::Integer(b)) => {
471                eval_arithmetic(op, Value::Integer(a), Value::Integer(b))
472            }
473            (Value::BigInt(a), Value::BigInt(b)) => {
474                eval_arithmetic(op, Value::BigInt(a), Value::BigInt(b))
475            }
476            _ => Err(Error::query_execution(
477                "Modulo only supported for integers".to_string(),
478            )),
479        };
480    }
481
482    let verb = match op {
483        Add => "add",
484        Subtract => "subtract",
485        Multiply => "multiply",
486        Divide => "divide",
487        Modulo => unreachable!("handled above"),
488    };
489
490    match (left, right) {
491        (Value::Integer(a), Value::Integer(b)) => {
492            eval_arithmetic(op, Value::Integer(a), Value::Integer(b))
493        }
494        (Value::BigInt(a), Value::BigInt(b)) => {
495            eval_arithmetic(op, Value::BigInt(a), Value::BigInt(b))
496        }
497        (Value::Float(a), Value::Float(b)) => {
498            // Constant Float Divide rejects 0.0 (legacy behaviour); runtime
499            // Float divide does not. Modulo on Float is rejected above.
500            if matches!(op, Divide) && b == 0.0 {
501                return Err(Error::query_execution("Division by zero".to_string()));
502            }
503            eval_arithmetic(op, Value::Float(a), Value::Float(b))
504        }
505        _ => Err(Error::query_execution(format!(
506            "Cannot {} incompatible types",
507            verb
508        ))),
509    }
510}
511
/// Translate a CQL LIKE pattern (`%`, `_`) into an anchored regex.
///
/// `%` becomes `.*` and `_` becomes `.`. Every other character must match
/// literally, so regex metacharacters in the pattern are backslash-escaped.
/// Otherwise `LIKE 'a.b'` would also match `axb`, and `LIKE 'a(b'` would
/// produce an invalid regex.
fn like_pattern_to_regex(pattern: &str) -> String {
    let mut out = String::with_capacity(pattern.len() + 4);
    out.push('^');
    for ch in pattern.chars() {
        match ch {
            '%' => out.push_str(".*"),
            '_' => out.push('.'),
            '\\' | '.' | '+' | '*' | '?' | '(' | ')' | '|' | '[' | ']' | '{' | '}' | '^'
            | '$' => {
                out.push('\\');
                out.push(ch);
            }
            _ => out.push(ch),
        }
    }
    out.push('$');
    out
}
526
527/// Parse a CQL type string (e.g. `"list<int>"`, `"text"`) into a [`CqlType`].
528///
529/// Returns `None` when the type string cannot be parsed (unknown or malformed
530/// types). Used to populate `ColumnInfo::cql_type` from the schema's string
531/// representation, satisfying the no-heuristics mandate (Issue #28).
532fn parse_cql_type_str(type_str: &str) -> Option<CqlType> {
533    let parser = ComplexTypeParser::new();
534    parser
535        .parse_type(type_str)
536        .ok()
537        .map(|parsed| parsed.cql_type)
538}
539
540impl SelectExecutor {
541    /// Create a new SELECT executor
542    pub fn new(schema: Arc<SchemaManager>, storage: Arc<StorageEngine>) -> Self {
543        Self {
544            _schema: schema,
545            storage,
546        }
547    }
548
549    /// Execute an optimized query plan
550    pub async fn execute(&self, plan: OptimizedQueryPlan) -> Result<QueryResult> {
551        let table_id = if let Some(ref from_clause) = plan.statement.from_clause {
552            self.extract_table_id(from_clause)?
553        } else {
554            // For queries without FROM clause (like SELECT 1), use a dummy table ID
555            TableId::new("_dummy_")
556        };
557
558        let mut context = ExecutionContext {
559            table_id,
560            columns: self.get_result_columns(&plan.statement).await?,
561            rows_processed: 0,
562        };
563
564        // Handle queries without FROM clause (like SELECT 1)
565        if plan.statement.from_clause.is_none() {
566            return self.execute_constant_query(&plan.statement, &context).await;
567        }
568
569        // Execute the plan step by step
570        let mut intermediate_results = Vec::new();
571
572        // If no execution steps are provided, add a default table scan
573        let execution_steps = if plan.execution_steps.is_empty() {
574            vec![ExecutionStep::SSTableScan {
575                table: context.table_id.clone(),
576                predicates: vec![],
577                projection: context.columns.iter().map(|c| c.name.clone()).collect(),
578            }]
579        } else {
580            plan.execution_steps.clone()
581        };
582
583        for step in &execution_steps {
584            match step {
585                ExecutionStep::SSTableScan {
586                    table,
587                    predicates,
588                    projection,
589                    ..
590                } => {
591                    let rows = self
592                        .execute_sstable_scan(table, predicates, projection, &mut context)
593                        .await?;
594                    intermediate_results = rows;
595                }
596                ExecutionStep::Filter { expression, .. } => {
597                    intermediate_results = self
598                        .execute_filter(intermediate_results, expression, &mut context)
599                        .await?;
600                }
601                ExecutionStep::Sort { order_by, .. } => {
602                    intermediate_results = self
603                        .execute_sort(intermediate_results, order_by, &mut context)
604                        .await?;
605                }
606                ExecutionStep::Aggregate { plan: agg_plan, .. } => {
607                    intermediate_results = self
608                        .execute_aggregation(intermediate_results, agg_plan, &mut context)
609                        .await?;
610                }
611                ExecutionStep::Limit { count, offset } => {
612                    intermediate_results = self
613                        .execute_limit(intermediate_results, *count, *offset, &mut context)
614                        .await?;
615                }
616                ExecutionStep::Project { columns } => {
617                    intermediate_results = self
618                        .execute_projection(intermediate_results, columns, &mut context)
619                        .await?;
620                }
621            }
622        }
623
624        let total_rows = intermediate_results.len() as u64;
625
626        // CRITICAL FIX (Issue #129/#140): Populate metadata.columns for SELECT *
627        // When SELECT * is used and no schema was found, context.columns is empty.
628        // Fall back to inferring column names from the first row's HashMap keys.
629        // IMPORTANT: Must be sorted alphabetically for deterministic JSON output (Issue #129)!
630        let mut columns = context.columns;
631        if columns.is_empty() && !intermediate_results.is_empty() {
632            // Try to resolve schema to get proper CQL types (Issue #674).
633            let schema_opt = if let Some(ref from_clause) = plan.statement.from_clause {
634                if let Ok(table_id) = self.extract_table_id(from_clause) {
635                    let (keyspace, table_name) = parse_table_id(&table_id);
636                    self._schema
637                        .find_schema_by_table(&keyspace, &table_name)
638                        .await
639                } else {
640                    None
641                }
642            } else {
643                None
644            };
645
646            let first_row = &intermediate_results[0];
647            let mut col_names: Vec<_> = first_row.values.keys().collect();
648            col_names.sort(); // Sort alphabetically for deterministic ordering (Issue #129)
649
650            let table_name_for_meta = schema_opt
651                .as_ref()
652                .map(|s| format!("{}.{}", s.keyspace, s.table));
653
654            for (idx, col_name) in col_names.iter().enumerate() {
655                // Look up CQL type from schema; derive flat DataType from it (Issue #674).
656                let cql_type_opt = schema_opt.as_ref().and_then(|schema| {
657                    schema
658                        .columns
659                        .iter()
660                        .find(|c| c.name.as_str() == col_name.as_str())
661                        .and_then(|c| parse_cql_type_str(&c.data_type))
662                });
663
664                let data_type = cql_type_opt
665                    .as_ref()
666                    .map(cql_type_to_data_type)
667                    .unwrap_or(crate::types::DataType::Text);
668
669                let mut col_info = ColumnInfo {
670                    name: (*col_name).clone(),
671                    data_type,
672                    nullable: true,
673                    position: idx,
674                    table_name: table_name_for_meta.clone(),
675                    cql_type: None,
676                };
677                if let Some(cql_type) = cql_type_opt {
678                    col_info = col_info.with_cql_type(cql_type);
679                }
680                columns.push(col_info);
681            }
682        }
683
684        Ok(QueryResult {
685            rows: intermediate_results,
686            rows_affected: total_rows, // Use actual number of rows returned
687            execution_time_ms: 0,      // Will be set by the engine
688            metadata: crate::query::result::QueryMetadata {
689                columns,
690                total_rows: Some(total_rows),
691                plan_info: None,
692                performance: Default::default(),
693                warnings: vec![],
694            },
695        })
696    }
697
698    /// Execute an optimized query plan with streaming results (Issue #280)
699    ///
700    /// Instead of materializing all rows in memory, this method returns a
701    /// `QueryResultIterator` that yields rows incrementally via a bounded channel.
702    /// This enables memory-efficient processing of large result sets.
703    ///
704    /// # Memory Budget
705    ///
706    /// With default `StreamingConfig::buffer_size` of 1024 rows and ~1KB avg row size:
707    /// - Channel buffer: ~1MB in flight
708    /// - Background task: minimal overhead
709    /// - Total streaming overhead: ~1-2MB (well within 128MB target)
710    ///
711    /// # Limitations
712    ///
713    /// Currently supports:
714    /// - SSTableScan with predicates (streaming)
715    /// - Filter/Limit/Project (applied during scan)
716    ///
717    /// `LIMIT` (and `OFFSET`, when present in the plan) is enforced by the
718    /// streaming producer (`execute_streaming_background`): it skips `OFFSET`
719    /// matches and stops scanning once `count` rows have been sent, so a
720    /// `LIMIT N` query yields exactly `N` rows without materializing the rest
721    /// (Issue #581).
722    ///
723    /// For ORDER BY/GROUP BY/DISTINCT, falls back to full execution then streams results.
724    pub async fn execute_streaming(
725        &self,
726        plan: OptimizedQueryPlan,
727        config: StreamingConfig,
728    ) -> Result<QueryResultIterator> {
729        // Check if query requires full materialization (ORDER BY, GROUP BY, aggregates)
730        if self.requires_materialization(&plan) {
731            log::info!("Query requires materialization (ORDER BY/GROUP BY/aggregates), using execute-then-stream");
732            return self.execute_and_stream(plan, config).await;
733        }
734
735        let table_id = if let Some(ref from_clause) = plan.statement.from_clause {
736            self.extract_table_id(from_clause)?
737        } else {
738            // For queries without FROM clause (like SELECT 1), fall back to execute
739            return self.execute_and_stream(plan, config).await;
740        };
741
742        let columns = self.get_result_columns(&plan.statement).await?;
743
744        // Create bounded channel for backpressure
745        let (tx, rx) = mpsc::channel(config.buffer_size);
746
747        // Determine execution steps
748        let execution_steps = if plan.execution_steps.is_empty() {
749            vec![ExecutionStep::SSTableScan {
750                table: table_id.clone(),
751                predicates: vec![],
752                projection: columns.iter().map(|c| c.name.clone()).collect(),
753            }]
754        } else {
755            plan.execution_steps.clone()
756        };
757
758        // Clone what we need for the background task
759        let storage = Arc::clone(&self.storage);
760        let schema_manager = Arc::clone(&self._schema);
761
762        // Spawn background task to stream rows
763        tokio::spawn(async move {
764            if let Err(e) = Self::execute_streaming_background(
765                storage,
766                schema_manager,
767                table_id,
768                execution_steps,
769                tx,
770            )
771            .await
772            {
773                log::error!("Streaming execution error: {}", e);
774                // Error is logged; channel will close and consumer will see None
775            }
776        });
777
778        // Create metadata for the iterator
779        let metadata = QueryMetadata {
780            columns,
781            total_rows: None, // Unknown for streaming
782            plan_info: None,
783            performance: Default::default(),
784            warnings: vec![],
785        };
786
787        Ok(QueryResultIterator::new(rx, metadata))
788    }
789
790    /// Check if query plan requires full materialization before streaming
791    fn requires_materialization(&self, plan: &OptimizedQueryPlan) -> bool {
792        for step in &plan.execution_steps {
793            match step {
794                ExecutionStep::Sort { .. } => return true,
795                ExecutionStep::Aggregate { .. } => return true,
796                _ => {}
797            }
798        }
799
800        // Check for DISTINCT
801        matches!(plan.statement.select_clause, SelectClause::Distinct(_))
802    }
803
804    /// Fallback: Execute query fully, then stream the results
805    async fn execute_and_stream(
806        &self,
807        plan: OptimizedQueryPlan,
808        config: StreamingConfig,
809    ) -> Result<QueryResultIterator> {
810        // Execute full query
811        let result = self.execute(plan).await?;
812
813        // Create channel to stream results
814        let (tx, rx) = mpsc::channel(config.buffer_size);
815
816        // Spawn task to send rows through channel
817        tokio::spawn(async move {
818            for row in result.rows {
819                if tx.send(Ok(row)).await.is_err() {
820                    break; // Consumer dropped
821                }
822            }
823            // Channel closes automatically when tx drops
824        });
825
826        Ok(QueryResultIterator::new(rx, result.metadata))
827    }
828
    /// Background task: execute a streaming scan and send rows through `tx`.
    ///
    /// Walks `execution_steps` in order. Each `SSTableScan` step scans its own
    /// `table` (looking up the schema via `schema_manager`), builds rows with
    /// `build_row_from_scan`, filters them with `evaluate_predicates`, and
    /// sends survivors one at a time so the bounded channel applies
    /// backpressure. OFFSET/LIMIT from the first `Limit` step found are
    /// applied inline while sending.
    ///
    /// Returns `Ok(())` early once the LIMIT is satisfied or when the
    /// receiver has been dropped. Storage-scan and predicate-evaluation errors
    /// are returned to the caller rather than sent through the channel.
    ///
    /// NOTE(review): `_table_id` is unused; the table is taken from each
    /// `SSTableScan` step instead.
    async fn execute_streaming_background(
        storage: Arc<StorageEngine>,
        schema_manager: Arc<SchemaManager>,
        _table_id: TableId,
        execution_steps: Vec<ExecutionStep>,
        tx: mpsc::Sender<Result<QueryRow>>,
    ) -> Result<()> {
        // Issue #581: LIMIT/OFFSET must be enforced by the producer in the
        // streaming path. The `ExecutionStep::Limit` arm previously only logged a
        // message and relied on a consumer that never applied it, so
        // `execute_streaming` yielded the full result set regardless of LIMIT.
        // Extract the bound up front (steps are ordered with Limit after the scan)
        // and stop sending once it is satisfied — mirroring `execute_limit`
        // (drain OFFSET, then truncate to `count`) row-by-row so the producer
        // stops scanning early.
        let limit = execution_steps.iter().find_map(|step| match step {
            ExecutionStep::Limit { count, offset } => Some((*count, offset.unwrap_or(0))),
            _ => None,
        });
        // `limit_count == None` means "no LIMIT"; the OFFSET counter is shared
        // across all scan steps below.
        let (limit_count, mut offset_remaining) = match limit {
            Some((count, offset)) => (Some(count), offset),
            None => (None, 0),
        };

        // A `LIMIT 0` means no rows can ever be sent; return before scanning.
        if limit_count == Some(0) {
            return Ok(());
        }

        // Rows actually delivered to the consumer (after OFFSET skipping).
        let mut sent: u64 = 0;

        for step in &execution_steps {
            match step {
                ExecutionStep::SSTableScan {
                    table,
                    predicates,
                    projection,
                    ..
                } => {
                    let (keyspace, table_name) = parse_table_id(table);
                    let schema_opt = schema_manager
                        .find_schema_by_table(&keyspace, &table_name)
                        .await;

                    // NOTE(review): `scan` is awaited to completion before the
                    // loop starts, so an early LIMIT return below saves row
                    // building and sending, not the scan itself — confirm.
                    let scan_results = storage
                        .scan(table, None, None, None, schema_opt.as_ref())
                        .await?;

                    for (key, value) in scan_results {
                        // `None` rows (e.g. tombstoned/null) are skipped entirely.
                        let Some(row) =
                            build_row_from_scan(key, value, projection, schema_opt.as_ref())
                        else {
                            continue;
                        };

                        if !evaluate_predicates(&row, predicates)? {
                            continue;
                        }

                        // Apply OFFSET: skip the first `offset_remaining` matches.
                        if offset_remaining > 0 {
                            offset_remaining -= 1;
                            continue;
                        }

                        // Send row through channel (with backpressure). Consumer drop ends the scan.
                        if tx.send(Ok(row)).await.is_err() {
                            return Ok(());
                        }
                        sent += 1;

                        // Apply LIMIT: stop scanning once `count` rows have been sent.
                        if let Some(count) = limit_count {
                            if sent >= count {
                                return Ok(());
                            }
                        }
                    }
                }
                ExecutionStep::Limit { .. } => {
                    // Enforced inline during the scan above (see the limit bound
                    // extracted before the loop).
                }
                // Projection and predicate filtering are pushed into SSTableScan above.
                // NOTE(review): this assumes the optimizer folds every WHERE
                // condition into `SSTableScan.predicates`. Nothing here
                // evaluates a residual `Filter` step's expression (unlike
                // `execute_filter`), so such a condition would be silently
                // dropped in streaming mode — confirm against the optimizer.
                ExecutionStep::Project { .. } | ExecutionStep::Filter { .. } => {}
                _ => {
                    log::warn!("Streaming execution: skipping unsupported step {:?}", step);
                }
            }
        }

        Ok(())
    }
923
924    /// Execute SSTable scan with predicate pushdown.
925    ///
926    /// Per-row work (build row, decode partition key, evaluate predicates) is
927    /// handled by the free helpers `build_row_from_scan` and
928    /// `evaluate_predicates`, which are shared with the streaming background
929    /// task to keep the two execution paths in lockstep.
930    async fn execute_sstable_scan(
931        &self,
932        table: &TableId,
933        predicates: &[SSTablePredicate],
934        projection: &[String],
935        context: &mut ExecutionContext,
936    ) -> Result<Vec<QueryRow>> {
937        const MAX_RESULTS: usize = 1_000_000;
938
939        log::info!(
940            "Executing SSTableScan: table=\"{}\", predicates={:?}",
941            table,
942            predicates
943        );
944
945        let (keyspace, table_name) = parse_table_id(table);
946        let schema_opt = self
947            ._schema
948            .find_schema_by_table(&keyspace, &table_name)
949            .await;
950
951        match schema_opt.as_ref() {
952            Some(schema) => log::info!(
953                "Found schema for {}.{} with {} columns",
954                schema.keyspace,
955                schema.table,
956                schema.columns.len()
957            ),
958            None => log::info!(
959                "No schema found for {}.{}, proceeding without schema-aware parsing",
960                keyspace.as_deref().unwrap_or("unknown"),
961                table_name
962            ),
963        }
964
965        let scan_results = self
966            .storage
967            .scan(table, None, None, None, schema_opt.as_ref())
968            .await?;
969
970        log::info!("Scan returned {} rows", scan_results.len());
971
972        let mut results = Vec::new();
973        for (key, value) in scan_results {
974            context.rows_processed += 1;
975
976            // build_row_from_scan returns None for tombstoned/null rows (Issue #191).
977            let Some(row) = build_row_from_scan(key, value, projection, schema_opt.as_ref()) else {
978                continue;
979            };
980
981            if evaluate_predicates(&row, predicates)? {
982                results.push(row);
983            }
984
985            if results.len() > MAX_RESULTS {
986                return Err(Error::query_execution(
987                    "Result set too large, consider adding LIMIT".to_string(),
988                ));
989            }
990        }
991
992        Ok(results)
993    }
994
995    /// Execute filtering step
996    async fn execute_filter(
997        &self,
998        rows: Vec<QueryRow>,
999        filter_expr: &WhereExpression,
1000        context: &mut ExecutionContext,
1001    ) -> Result<Vec<QueryRow>> {
1002        let mut filtered_rows = Vec::new();
1003
1004        for row in rows {
1005            if self.evaluate_where_expression(filter_expr, &row)? {
1006                filtered_rows.push(row);
1007            }
1008            context.rows_processed += 1;
1009        }
1010
1011        Ok(filtered_rows)
1012    }
1013
1014    /// Evaluate WHERE expression against a row
1015    fn evaluate_where_expression(&self, expr: &WhereExpression, row: &QueryRow) -> Result<bool> {
1016        match expr {
1017            WhereExpression::Comparison(comp) => self.evaluate_comparison(comp, row),
1018            WhereExpression::And(exprs) => {
1019                for expr in exprs {
1020                    if !self.evaluate_where_expression(expr, row)? {
1021                        return Ok(false);
1022                    }
1023                }
1024                Ok(true)
1025            }
1026            WhereExpression::Or(exprs) => {
1027                for expr in exprs {
1028                    if self.evaluate_where_expression(expr, row)? {
1029                        return Ok(true);
1030                    }
1031                }
1032                Ok(false)
1033            }
1034            WhereExpression::Not(expr) => Ok(!self.evaluate_where_expression(expr, row)?),
1035            WhereExpression::Parentheses(expr) => self.evaluate_where_expression(expr, row),
1036        }
1037    }
1038
1039    /// Evaluate comparison expression. Operators that need a single right
1040    /// operand share one `evaluate` call; IN/LIKE/IS NULL fall through to
1041    /// their custom branches.
1042    fn evaluate_comparison(&self, comp: &ComparisonExpression, row: &QueryRow) -> Result<bool> {
1043        use ComparisonOperator::*;
1044
1045        let left_value = self.evaluate_select_expression(&comp.left, row)?;
1046
1047        // Fast path for null tests, which ignore the right side.
1048        match comp.operator {
1049            IsNull => return Ok(left_value.is_null()),
1050            IsNotNull => return Ok(!left_value.is_null()),
1051            _ => {}
1052        }
1053
1054        match (&comp.operator, &comp.right) {
1055            (
1056                op @ (Equal | NotEqual | LessThan | LessThanOrEqual | GreaterThan
1057                | GreaterThanOrEqual),
1058                ComparisonRightSide::Value(right_expr),
1059            ) => {
1060                let right_value = self.evaluate_select_expression(right_expr, row)?;
1061                let result = match op {
1062                    Equal => values_equal(&left_value, &right_value),
1063                    NotEqual => !values_equal(&left_value, &right_value),
1064                    LessThan => try_compare_values(&left_value, &right_value)?.is_lt(),
1065                    LessThanOrEqual => try_compare_values(&left_value, &right_value)?.is_le(),
1066                    GreaterThan => try_compare_values(&left_value, &right_value)?.is_gt(),
1067                    GreaterThanOrEqual => try_compare_values(&left_value, &right_value)?.is_ge(),
1068                    _ => unreachable!("guarded by outer match"),
1069                };
1070                Ok(result)
1071            }
1072            (In, ComparisonRightSide::ValueList(value_exprs)) => {
1073                for value_expr in value_exprs {
1074                    let value = self.evaluate_select_expression(value_expr, row)?;
1075                    if left_value == value {
1076                        return Ok(true);
1077                    }
1078                }
1079                Ok(false)
1080            }
1081            (Like, ComparisonRightSide::Value(pattern_expr)) => {
1082                let pattern = self.evaluate_select_expression(pattern_expr, row)?;
1083                if let (Value::Text(text), Value::Text(pattern_str)) = (&left_value, &pattern) {
1084                    Ok(self.match_like_pattern(text, pattern_str))
1085                } else {
1086                    Ok(false)
1087                }
1088            }
1089            _ => Err(Error::query_execution(
1090                "Unsupported comparison operator".to_string(),
1091            )),
1092        }
1093    }
1094
1095    /// Evaluate SELECT expression against a row
1096    fn evaluate_select_expression(&self, expr: &SelectExpression, row: &QueryRow) -> Result<Value> {
1097        match expr {
1098            SelectExpression::Column(col_ref) => {
1099                row.values.get(&col_ref.column).cloned().ok_or_else(|| {
1100                    Error::query_execution(format!("Column not found: {}", col_ref.column))
1101                })
1102            }
1103            SelectExpression::Literal(value) => Ok(value.clone()),
1104            SelectExpression::CollectionAccess(access) => {
1105                self.evaluate_collection_access(access, row)
1106            }
1107            SelectExpression::Arithmetic(arith) => {
1108                let left = self.evaluate_select_expression(&arith.left, row)?;
1109                let right = self.evaluate_select_expression(&arith.right, row)?;
1110                self.evaluate_arithmetic(&arith.operator, left, right)
1111            }
1112            SelectExpression::Aliased(expr, _) => self.evaluate_select_expression(expr, row),
1113            SelectExpression::Aggregate(_) => {
1114                // Aggregate expressions should not be evaluated at row level
1115                // They should only be processed during the aggregation step
1116                Err(Error::query_execution(
1117                    "Aggregate expressions should be processed during aggregation step, not row evaluation".to_string(),
1118                ))
1119            }
1120            SelectExpression::Function(_) => {
1121                // Function expressions not yet implemented
1122                Err(Error::query_execution(
1123                    "Function expressions not yet implemented".to_string(),
1124                ))
1125            }
1126        }
1127    }
1128
1129    /// Evaluate collection access operations (`list[idx]`, `map['key']`,
1130    /// `value IN set_column`).
1131    fn evaluate_collection_access(
1132        &self,
1133        access: &CollectionAccessExpression,
1134        row: &QueryRow,
1135    ) -> Result<Value> {
1136        let lookup_column = |col: &ColumnRef| -> Result<&Value> {
1137            row.values
1138                .get(&col.column)
1139                .ok_or_else(|| Error::query_execution(format!("Column not found: {}", col.column)))
1140        };
1141
1142        match access {
1143            CollectionAccessExpression::ListIndex(col_ref, index_expr) => {
1144                let list_value = lookup_column(col_ref)?;
1145                let index_value = self.evaluate_select_expression(index_expr, row)?;
1146
1147                let (Value::List(list), Value::Integer(index)) = (list_value, &index_value) else {
1148                    return Err(Error::query_execution("Invalid list access".to_string()));
1149                };
1150                if *index >= 0 && (*index as usize) < list.len() {
1151                    Ok(list[*index as usize].clone())
1152                } else {
1153                    Ok(Value::Null)
1154                }
1155            }
1156            CollectionAccessExpression::MapKey(col_ref, key_expr) => {
1157                let map_value = lookup_column(col_ref)?;
1158                let key_value = self.evaluate_select_expression(key_expr, row)?;
1159
1160                let Value::Map(map) = map_value else {
1161                    return Err(Error::query_execution("Invalid map access".to_string()));
1162                };
1163                Ok(map
1164                    .iter()
1165                    .find(|(k, _)| *k == key_value)
1166                    .map(|(_, v)| v.clone())
1167                    .unwrap_or(Value::Null))
1168            }
1169            CollectionAccessExpression::SetContains(col_ref, value_expr) => {
1170                let set_value = lookup_column(col_ref)?;
1171                let test_value = self.evaluate_select_expression(value_expr, row)?;
1172
1173                let Value::Set(set) = set_value else {
1174                    return Err(Error::query_execution(
1175                        "Invalid set contains operation".to_string(),
1176                    ));
1177                };
1178                Ok(Value::Boolean(set.contains(&test_value)))
1179            }
1180        }
1181    }
1182
1183    /// Evaluate arithmetic expressions on a (left, op, right) triple.
1184    ///
1185    /// Runtime arithmetic supports same-type Integer or Float operands. Mixed
1186    /// types or non-numeric operands return an error. (Constant-folding
1187    /// arithmetic additionally accepts BigInt — see
1188    /// `evaluate_constant_expression`.)
1189    fn evaluate_arithmetic(
1190        &self,
1191        op: &ArithmeticOperator,
1192        left: Value,
1193        right: Value,
1194    ) -> Result<Value> {
1195        match (&left, &right) {
1196            (Value::Integer(_), Value::Integer(_)) | (Value::Float(_), Value::Float(_)) => {
1197                eval_arithmetic(op, left, right)
1198            }
1199            _ => Err(Error::query_execution(
1200                "Incompatible types for arithmetic".to_string(),
1201            )),
1202        }
1203    }
1204
1205    /// Simple LIKE pattern matching. The CQL pattern syntax (`%`, `_`) is
1206    /// translated by `like_pattern_to_regex` before compilation.
1207    fn match_like_pattern(&self, text: &str, pattern: &str) -> bool {
1208        regex::Regex::new(&like_pattern_to_regex(pattern))
1209            .map(|re| re.is_match(text))
1210            .unwrap_or(false)
1211    }
1212
1213    /// Execute sorting step
1214    async fn execute_sort(
1215        &self,
1216        mut rows: Vec<QueryRow>,
1217        order_by: &OrderByClause,
1218        _context: &mut ExecutionContext,
1219    ) -> Result<Vec<QueryRow>> {
1220        rows.sort_by(|a, b| {
1221            for item in &order_by.items {
1222                let a_val = self
1223                    .evaluate_select_expression(&item.expression, a)
1224                    .unwrap_or(Value::Null);
1225                let b_val = self
1226                    .evaluate_select_expression(&item.expression, b)
1227                    .unwrap_or(Value::Null);
1228
1229                let ordering = match item.direction {
1230                    SortDirection::Ascending => compare_values_ordering(&a_val, &b_val),
1231                    SortDirection::Descending => compare_values_ordering(&b_val, &a_val),
1232                };
1233                if !ordering.is_eq() {
1234                    return ordering;
1235                }
1236            }
1237            std::cmp::Ordering::Equal
1238        });
1239
1240        Ok(rows)
1241    }
1242
1243    /// Execute the aggregation step. Splits naturally into three phases:
1244    /// build group key, accumulate per-aggregate state, then finalize each
1245    /// group into a result row.
1246    async fn execute_aggregation(
1247        &self,
1248        rows: Vec<QueryRow>,
1249        agg_plan: &AggregationPlan,
1250        _context: &mut ExecutionContext,
1251    ) -> Result<Vec<QueryRow>> {
1252        const PER_ROW_MEMORY_ESTIMATE_BYTES: usize = 100;
1253        const DEFAULT_AGGREGATION_MEMORY_LIMIT: usize = 512 * 1024 * 1024;
1254
1255        let mut agg_state = AggregationState {
1256            groups: Vec::new(),
1257            memory_usage_bytes: 0,
1258            memory_limit_bytes: DEFAULT_AGGREGATION_MEMORY_LIMIT,
1259        };
1260
1261        for row in rows {
1262            let group_key = build_group_key(&row, &agg_plan.group_by_columns);
1263            let group_index =
1264                find_or_init_group(&mut agg_state.groups, group_key, &agg_plan.aggregates);
1265            let group_aggregates = &mut agg_state.groups[group_index].1;
1266
1267            for (i, agg_comp) in agg_plan.aggregates.iter().enumerate() {
1268                update_aggregate(&mut group_aggregates[i], agg_comp, &row);
1269            }
1270
1271            agg_state.memory_usage_bytes += PER_ROW_MEMORY_ESTIMATE_BYTES;
1272            if agg_state.memory_usage_bytes > agg_state.memory_limit_bytes {
1273                return Err(Error::query_execution(
1274                    "Aggregation memory limit exceeded".to_string(),
1275                ));
1276            }
1277        }
1278
1279        let result_rows = agg_state
1280            .groups
1281            .into_iter()
1282            .map(|(group_key, group_aggregates)| {
1283                finalize_group(group_key, group_aggregates, agg_plan)
1284            })
1285            .collect();
1286
1287        Ok(result_rows)
1288    }
1289
1290    /// Execute limit step (apply OFFSET then truncate to LIMIT).
1291    async fn execute_limit(
1292        &self,
1293        mut rows: Vec<QueryRow>,
1294        count: u64,
1295        offset: Option<u64>,
1296        _context: &mut ExecutionContext,
1297    ) -> Result<Vec<QueryRow>> {
1298        let start_index = offset.unwrap_or(0) as usize;
1299        if start_index >= rows.len() {
1300            return Ok(Vec::new());
1301        }
1302        rows.drain(..start_index);
1303        rows.truncate(count as usize);
1304        Ok(rows)
1305    }
1306
1307    /// Execute projection step
1308    async fn execute_projection(
1309        &self,
1310        rows: Vec<QueryRow>,
1311        columns: &[SelectExpression],
1312        _context: &mut ExecutionContext,
1313    ) -> Result<Vec<QueryRow>> {
1314        let mut projected_rows = Vec::new();
1315
1316        for row in rows {
1317            let mut projected_values = HashMap::new();
1318
1319            for (i, expr) in columns.iter().enumerate() {
1320                let value = self.evaluate_select_expression(expr, &row)?;
1321                let column_name = match expr {
1322                    SelectExpression::Column(col_ref) => col_ref.column.clone(),
1323                    SelectExpression::Aliased(_, alias) => alias.clone(),
1324                    _ => format!("col_{i}"),
1325                };
1326                projected_values.insert(column_name, value);
1327            }
1328
1329            projected_rows.push(QueryRow {
1330                values: projected_values,
1331                key: RowKey::new(vec![]),
1332                metadata: Default::default(),
1333            });
1334        }
1335
1336        Ok(projected_rows)
1337    }
1338
1339    /// Execute a query without FROM clause (constant expressions like SELECT 1)
1340    async fn execute_constant_query(
1341        &self,
1342        statement: &SelectStatement,
1343        _context: &ExecutionContext,
1344    ) -> Result<QueryResult> {
1345        let mut values = HashMap::new();
1346        let mut columns = Vec::new();
1347
1348        match &statement.select_clause {
1349            SelectClause::All => {
1350                return Err(Error::query_execution(
1351                    "SELECT * requires a FROM clause".to_string(),
1352                ));
1353            }
1354            SelectClause::Columns(expressions) | SelectClause::Distinct(expressions) => {
1355                for (i, expr) in expressions.iter().enumerate() {
1356                    let (value, column_name) = self.evaluate_constant_expression(expr)?;
1357                    let key = column_name.unwrap_or_else(|| format!("column_{}", i));
1358                    values.insert(key.clone(), value);
1359                    columns.push(ColumnInfo {
1360                        name: key,
1361                        data_type: crate::types::DataType::Text, // Constant expressions have no schema type
1362                        nullable: true,
1363                        position: i,
1364                        table_name: None, // No table for constant expressions
1365                        cql_type: None,
1366                    });
1367                }
1368            }
1369        }
1370
1371        let row = QueryRow::with_values(RowKey::new(vec![1]), values);
1372
1373        Ok(QueryResult {
1374            rows: vec![row],
1375            rows_affected: 1, // Constant queries return 1 row
1376            execution_time_ms: 0,
1377            metadata: crate::query::result::QueryMetadata {
1378                columns,
1379                total_rows: Some(1),
1380                plan_info: None,
1381                performance: crate::query::result::PerformanceMetrics::default(),
1382                warnings: Vec::new(),
1383            },
1384        })
1385    }
1386
1387    /// Evaluate a constant expression (no table access needed).
1388    ///
1389    /// Accepts literals, aliases, and arithmetic over same-typed Integer,
1390    /// BigInt, or Float operands. Modulo is restricted to integers (matching
1391    /// the original behaviour). Error messages are kept verbatim from the
1392    /// legacy implementation so any callers asserting on them still pass.
1393    #[allow(clippy::only_used_in_recursion)]
1394    fn evaluate_constant_expression(
1395        &self,
1396        expr: &SelectExpression,
1397    ) -> Result<(Value, Option<String>)> {
1398        match expr {
1399            SelectExpression::Literal(value) => Ok((value.clone(), None)),
1400            SelectExpression::Aliased(inner_expr, alias) => {
1401                let (value, _) = self.evaluate_constant_expression(inner_expr)?;
1402                Ok((value, Some(alias.clone())))
1403            }
1404            SelectExpression::Arithmetic(arith) => {
1405                let (left_val, _) = self.evaluate_constant_expression(&arith.left)?;
1406                let (right_val, _) = self.evaluate_constant_expression(&arith.right)?;
1407                let result = const_arithmetic(&arith.operator, left_val, right_val)?;
1408                Ok((result, None))
1409            }
1410            _ => Err(Error::query_execution(
1411                "Expression type not supported in constant queries".to_string(),
1412            )),
1413        }
1414    }
1415
1416    /// Extract a `TableId` from a FROM clause. Cassandra CQL has no JOINs, so
1417    /// either form (bare table or aliased table) yields the same result.
1418    fn extract_table_id(&self, from_clause: &FromClause) -> Result<TableId> {
1419        match from_clause {
1420            FromClause::Table(table_id) | FromClause::TableAlias(table_id, _) => {
1421                Ok(table_id.clone())
1422            }
1423        }
1424    }
1425
1426    async fn get_result_columns(&self, statement: &SelectStatement) -> Result<Vec<ColumnInfo>> {
1427        let mut columns = Vec::new();
1428
1429        match &statement.select_clause {
1430            SelectClause::All => {
1431                // For SELECT *, look up the schema to get column names and CQL types.
1432                // This is needed for streaming mode where we can't wait for the first row.
1433                if let Some(ref from_clause) = statement.from_clause {
1434                    let table_id = self.extract_table_id(from_clause)?;
1435                    let (keyspace_opt, table_name) = parse_table_id(&table_id);
1436
1437                    // Look up schema from SchemaManager
1438                    if let Some(schema) = self
1439                        ._schema
1440                        .find_schema_by_table(&keyspace_opt, &table_name)
1441                        .await
1442                    {
1443                        // Collect all schema columns (sorted alphabetically for determinism)
1444                        let mut schema_cols: Vec<&crate::schema::Column> =
1445                            schema.columns.iter().collect();
1446                        schema_cols.sort_by_key(|c| c.name.as_str());
1447
1448                        let keyspace_str = keyspace_opt.as_deref().unwrap_or("");
1449                        let table_name_str = format!("{}.{}", keyspace_str, table_name);
1450
1451                        for (idx, schema_col) in schema_cols.iter().enumerate() {
1452                            // Parse the CQL type string into a structured CqlType (Issue #674).
1453                            let cql_type_opt = parse_cql_type_str(&schema_col.data_type);
1454                            // Derive the flat DataType from the CqlType; avoids hardcoded Text.
1455                            let data_type = cql_type_opt
1456                                .as_ref()
1457                                .map(cql_type_to_data_type)
1458                                .unwrap_or(crate::types::DataType::Text);
1459
1460                            let mut col_info = ColumnInfo {
1461                                name: schema_col.name.clone(),
1462                                data_type,
1463                                nullable: true,
1464                                position: idx,
1465                                table_name: Some(table_name_str.clone()),
1466                                cql_type: None,
1467                            };
1468                            if let Some(cql_type) = cql_type_opt {
1469                                col_info = col_info.with_cql_type(cql_type);
1470                            }
1471                            columns.push(col_info);
1472                        }
1473
1474                        log::debug!(
1475                            "SELECT * resolved {} columns from schema for {:?}.{}",
1476                            columns.len(),
1477                            keyspace_opt,
1478                            table_name
1479                        );
1480                    }
1481                    // If schema not found, columns stay empty - will be populated from first row at runtime
1482                }
1483            }
1484            SelectClause::Columns(exprs) | SelectClause::Distinct(exprs) => {
1485                // Try to resolve a schema for the FROM table (if present) so we can
1486                // attach authoritative CQL types to explicitly projected columns (Issue #674).
1487                let schema_opt = if let Some(ref from_clause) = statement.from_clause {
1488                    if let Ok(table_id) = self.extract_table_id(from_clause) {
1489                        let (keyspace_opt, table_name) = parse_table_id(&table_id);
1490                        self._schema
1491                            .find_schema_by_table(&keyspace_opt, &table_name)
1492                            .await
1493                    } else {
1494                        None
1495                    }
1496                } else {
1497                    None
1498                };
1499
1500                for (i, expr) in exprs.iter().enumerate() {
1501                    let column_name = match expr {
1502                        SelectExpression::Column(col_ref) => col_ref.column.clone(),
1503                        SelectExpression::Aliased(_, alias) => alias.clone(),
1504                        _ => format!("col_{i}"),
1505                    };
1506
1507                    // Look up CQL type for this column in the schema (Issue #674).
1508                    let cql_type_opt = schema_opt.as_ref().and_then(|schema| {
1509                        schema
1510                            .columns
1511                            .iter()
1512                            .find(|c| c.name == column_name)
1513                            .and_then(|c| parse_cql_type_str(&c.data_type))
1514                    });
1515                    let data_type = cql_type_opt
1516                        .as_ref()
1517                        .map(cql_type_to_data_type)
1518                        .unwrap_or(crate::types::DataType::Text);
1519
1520                    let mut col_info = ColumnInfo {
1521                        name: column_name,
1522                        data_type,
1523                        nullable: true,
1524                        position: i,
1525                        table_name: None,
1526                        cql_type: None,
1527                    };
1528                    if let Some(cql_type) = cql_type_opt {
1529                        col_info = col_info.with_cql_type(cql_type);
1530                    }
1531                    columns.push(col_info);
1532                }
1533            }
1534        }
1535
1536        Ok(columns)
1537    }
1538}
1539
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{platform::Platform, Config};
    use tempfile::TempDir;

    /// Builds a `SelectExecutor` whose storage engine and schema manager are
    /// both rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the executor because dropping a
    /// `TempDir` deletes the directory on disk. The caller must keep it alive
    /// for as long as the executor is in use. Bind it to a named variable such
    /// as `_temp_dir`; a bare `_` pattern would drop it immediately.
    async fn create_test_executor() -> (SelectExecutor, TempDir) {
        let temp_dir = TempDir::new().expect("failed to create temp dir for test executor");
        let config = Config::default();
        let platform = Arc::new(
            Platform::new(&config)
                .await
                .expect("failed to create test platform"),
        );
        let storage = Arc::new(
            StorageEngine::open(
                temp_dir.path(),
                &config,
                platform,
                #[cfg(feature = "state_machine")]
                None,
            )
            .await
            .expect("failed to open test storage engine"),
        );
        let _schema = Arc::new(
            SchemaManager::new(temp_dir.path())
                .await
                .expect("failed to create test schema manager"),
        );

        (SelectExecutor { _schema, storage }, temp_dir)
    }

    #[test]
    fn test_value_comparison() {
        use std::cmp::Ordering;
        assert_eq!(
            try_compare_values(&Value::Integer(5), &Value::Integer(3)).unwrap(),
            Ordering::Greater
        );
        assert_eq!(
            try_compare_values(&Value::Integer(3), &Value::Integer(5)).unwrap(),
            Ordering::Less
        );
        assert_eq!(
            try_compare_values(&Value::Integer(5), &Value::Integer(5)).unwrap(),
            Ordering::Equal
        );
    }

    #[tokio::test]
    async fn test_like_pattern_matching() {
        // `_temp_dir` must stay bound until the end of the test so the backing
        // directory is not deleted while the executor still references it.
        let (executor, _temp_dir) = create_test_executor().await;

        assert!(executor.match_like_pattern("hello", "h%"));
        assert!(executor.match_like_pattern("hello", "%lo"));
        assert!(executor.match_like_pattern("hello", "h_llo"));
        assert!(!executor.match_like_pattern("hello", "h_l"));
    }

    // ------------------------------------------------------------------
    // Issue #586: partition-key reconstruction on the scan path.
    // ------------------------------------------------------------------

    /// Builds a minimal `ks.t` table schema with a single partition-key column
    /// `name` of CQL type `data_type`, and no clustering or regular columns.
    fn single_pk_schema(name: &str, data_type: &str) -> crate::schema::TableSchema {
        crate::schema::TableSchema {
            keyspace: "ks".to_string(),
            table: "t".to_string(),
            partition_keys: vec![crate::schema::KeyColumn {
                name: name.to_string(),
                data_type: data_type.to_string(),
                position: 0,
            }],
            clustering_keys: vec![],
            columns: vec![],
            comments: std::collections::HashMap::new(),
        }
    }

    /// Issue #586: a single-component TEXT partition key is stored as raw bytes
    /// with NO length prefix. `build_row_from_scan` must materialise it from the
    /// `RowKey`. Before the fix the column was silently dropped (the decoder
    /// read a phantom `u16` prefix, errored, and the error was swallowed).
    #[test]
    fn build_row_from_scan_materialises_single_text_pk() {
        let key = RowKey::new(b"k0000000000000000".to_vec());
        let value = Value::Map(vec![(
            Value::Text("name".to_string()),
            Value::Text("name-0".to_string()),
        )]);
        let schema = single_pk_schema("id", "text");

        let row = build_row_from_scan(key, value, &[], Some(&schema))
            .expect("row must be built (not tombstoned)");

        assert_eq!(
            row.values.get("id"),
            Some(&Value::Text("k0000000000000000".to_string())),
            "Issue #586: single TEXT PK column must be reconstructed from the raw row key"
        );
        // Regular columns must still be present.
        assert_eq!(
            row.values.get("name"),
            Some(&Value::Text("name-0".to_string()))
        );
    }

    /// Issue #586: with the PK column materialised, a residual `WHERE id = '...'`
    /// (the path TEXT single-PK queries fall through to) now matches.
    #[test]
    fn scan_built_row_matches_text_pk_equality_predicate() {
        use super::super::select_optimizer::{SSTableFilterOp, SSTablePredicate};

        let key = RowKey::new(b"k0000000000000000".to_vec());
        let value = Value::Map(vec![(Value::Text("age".to_string()), Value::Integer(0))]);
        let schema = single_pk_schema("id", "text");
        let row = build_row_from_scan(key, value, &[], Some(&schema)).unwrap();

        let predicate = SSTablePredicate {
            column: "id".to_string(),
            operation: SSTableFilterOp::Equal,
            values: vec![Value::Text("k0000000000000000".to_string())],
        };

        assert!(
            evaluate_predicates(&row, std::slice::from_ref(&predicate)).unwrap(),
            "Issue #586: WHERE id = '<literal>' must match the reconstructed PK column"
        );
    }
}