rustlite_core/query/
executor.rs

1/// Query executor
2///
3/// Executes physical query plans using iterators.
4use super::ast::*;
5use super::planner::{PhysicalOperator, PhysicalPlan};
6use crate::error::Result;
7use std::collections::HashMap;
8use std::fmt;
9
10/// Query result row
11#[derive(Debug, Clone, PartialEq)]
12pub struct Row {
13    pub columns: Vec<Column>,
14    pub values: Vec<Value>,
15}
16
17/// Column metadata
18#[derive(Debug, Clone, PartialEq)]
19pub struct Column {
20    pub name: String,
21    pub alias: Option<String>,
22}
23
24/// Value types in query results
25#[derive(Debug, Clone, PartialEq)]
26pub enum Value {
27    Integer(i64),
28    Float(f64),
29    String(String),
30    Boolean(bool),
31    Null,
32}
33
34impl Value {
35    /// Convert value to bytes for comparison
36    pub fn to_bytes(&self) -> Vec<u8> {
37        match self {
38            Value::Integer(i) => i.to_le_bytes().to_vec(),
39            Value::Float(f) => f.to_le_bytes().to_vec(),
40            Value::String(s) => s.as_bytes().to_vec(),
41            Value::Boolean(b) => vec![if *b { 1 } else { 0 }],
42            Value::Null => vec![],
43        }
44    }
45
46    /// Compare values
47    pub fn compare(&self, other: &Value, op: &BinaryOperator) -> bool {
48        match (self, other) {
49            (Value::Integer(a), Value::Integer(b)) => match op {
50                BinaryOperator::Eq => a == b,
51                BinaryOperator::Ne => a != b,
52                BinaryOperator::Lt => a < b,
53                BinaryOperator::Le => a <= b,
54                BinaryOperator::Gt => a > b,
55                BinaryOperator::Ge => a >= b,
56            },
57            (Value::Float(a), Value::Float(b)) => match op {
58                BinaryOperator::Eq => (a - b).abs() < f64::EPSILON,
59                BinaryOperator::Ne => (a - b).abs() >= f64::EPSILON,
60                BinaryOperator::Lt => a < b,
61                BinaryOperator::Le => a <= b,
62                BinaryOperator::Gt => a > b,
63                BinaryOperator::Ge => a >= b,
64            },
65            (Value::String(a), Value::String(b)) => match op {
66                BinaryOperator::Eq => a == b,
67                BinaryOperator::Ne => a != b,
68                BinaryOperator::Lt => a < b,
69                BinaryOperator::Le => a <= b,
70                BinaryOperator::Gt => a > b,
71                BinaryOperator::Ge => a >= b,
72            },
73            (Value::Boolean(a), Value::Boolean(b)) => match op {
74                BinaryOperator::Eq => a == b,
75                BinaryOperator::Ne => a != b,
76                _ => false,
77            },
78            (Value::Null, Value::Null) => matches!(op, BinaryOperator::Eq),
79            _ => false,
80        }
81    }
82}
83
84impl fmt::Display for Value {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        match self {
87            Value::Integer(i) => write!(f, "{}", i),
88            Value::Float(fl) => write!(f, "{}", fl),
89            Value::String(s) => write!(f, "{}", s),
90            Value::Boolean(b) => write!(f, "{}", b),
91            Value::Null => write!(f, "NULL"),
92        }
93    }
94}
95
96/// Query execution context
97#[derive(Clone, Default)]
98pub struct ExecutionContext {
99    /// Storage backend access (simplified - would integrate with actual storage)
100    pub data: HashMap<String, Vec<Row>>,
101    /// Index access (simplified)
102    pub indexes: HashMap<String, HashMap<Vec<u8>, Vec<u64>>>,
103}
104
105impl ExecutionContext {
106    /// Creates a new execution context
107    pub fn new() -> Self {
108        Self::default()
109    }
110}
111
112/// Query executor
113pub struct Executor {
114    context: ExecutionContext,
115}
116
117impl Executor {
118    /// Create new executor
119    pub fn new(context: ExecutionContext) -> Self {
120        Self { context }
121    }
122
123    /// Execute a physical plan
124    pub fn execute(&mut self, plan: &PhysicalPlan) -> Result<Vec<Row>> {
125        self.execute_operator(&plan.root)
126    }
127
128    fn execute_operator(&mut self, op: &PhysicalOperator) -> Result<Vec<Row>> {
129        match op {
130            PhysicalOperator::TableScan { table } => self.execute_table_scan(table),
131            PhysicalOperator::IndexScan { table, index, key } => {
132                self.execute_index_scan(table, index, key)
133            }
134            PhysicalOperator::IndexRangeScan {
135                table,
136                index,
137                start,
138                end,
139            } => self.execute_index_range_scan(table, index, start.as_deref(), end.as_deref()),
140            PhysicalOperator::Filter { input, condition } => self.execute_filter(input, condition),
141            PhysicalOperator::Sort { input, columns } => self.execute_sort(input, columns),
142            PhysicalOperator::Limit {
143                input,
144                count,
145                offset,
146            } => self.execute_limit(input, *count, *offset),
147            PhysicalOperator::Project { input, columns } => self.execute_project(input, columns),
148            PhysicalOperator::HashJoin {
149                left,
150                right,
151                join_type,
152                condition,
153            } => self.execute_hash_join(left, right, join_type, condition),
154            PhysicalOperator::Aggregate { input, aggregates } => {
155                self.execute_aggregate(input, aggregates)
156            }
157        }
158    }
159
160    fn execute_table_scan(&mut self, table: &str) -> Result<Vec<Row>> {
161        // Get all rows from table
162        Ok(self.context.data.get(table).cloned().unwrap_or_default())
163    }
164
165    fn execute_index_scan(&mut self, table: &str, index: &str, key: &[u8]) -> Result<Vec<Row>> {
166        // Look up row IDs from index
167        let row_ids = self
168            .context
169            .indexes
170            .get(index)
171            .and_then(|idx| idx.get(key))
172            .cloned()
173            .unwrap_or_default();
174
175        // Fetch rows by ID
176        let all_rows = self.context.data.get(table).cloned().unwrap_or_default();
177        let result = row_ids
178            .iter()
179            .filter_map(|&id| all_rows.get(id as usize).cloned())
180            .collect();
181
182        Ok(result)
183    }
184
185    fn execute_index_range_scan(
186        &mut self,
187        table: &str,
188        index: &str,
189        start: Option<&[u8]>,
190        end: Option<&[u8]>,
191    ) -> Result<Vec<Row>> {
192        // Get all keys from index in range
193        let index_data = self.context.indexes.get(index).cloned().unwrap_or_default();
194
195        let mut row_ids = Vec::new();
196        for (key, ids) in index_data {
197            let in_range = match (start, end) {
198                (Some(s), Some(e)) => key.as_slice() >= s && key.as_slice() <= e,
199                (Some(s), None) => key.as_slice() >= s,
200                (None, Some(e)) => key.as_slice() <= e,
201                (None, None) => true,
202            };
203
204            if in_range {
205                row_ids.extend(ids);
206            }
207        }
208
209        // Fetch rows by ID
210        let all_rows = self.context.data.get(table).cloned().unwrap_or_default();
211        let result = row_ids
212            .iter()
213            .filter_map(|&id| all_rows.get(id as usize).cloned())
214            .collect();
215
216        Ok(result)
217    }
218
219    fn execute_filter(
220        &mut self,
221        input: &PhysicalOperator,
222        condition: &Expression,
223    ) -> Result<Vec<Row>> {
224        let rows = self.execute_operator(input)?;
225
226        let filtered = rows
227            .into_iter()
228            .filter(|row| self.evaluate_condition(row, condition))
229            .collect();
230
231        Ok(filtered)
232    }
233
234    fn execute_sort(
235        &mut self,
236        input: &PhysicalOperator,
237        columns: &[OrderByColumn],
238    ) -> Result<Vec<Row>> {
239        let mut rows = self.execute_operator(input)?;
240
241        rows.sort_by(|a, b| {
242            for col in columns {
243                let a_idx = a.columns.iter().position(|c| c.name == col.column);
244                let b_idx = b.columns.iter().position(|c| c.name == col.column);
245
246                if let (Some(a_idx), Some(b_idx)) = (a_idx, b_idx) {
247                    let ordering = match (&a.values[a_idx], &b.values[b_idx]) {
248                        (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
249                        (Value::Float(a), Value::Float(b)) => {
250                            a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
251                        }
252                        (Value::String(a), Value::String(b)) => a.cmp(b),
253                        (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
254                        _ => std::cmp::Ordering::Equal,
255                    };
256
257                    let ordering = match col.direction {
258                        OrderDirection::Asc => ordering,
259                        OrderDirection::Desc => ordering.reverse(),
260                    };
261
262                    if ordering != std::cmp::Ordering::Equal {
263                        return ordering;
264                    }
265                }
266            }
267            std::cmp::Ordering::Equal
268        });
269
270        Ok(rows)
271    }
272
273    fn execute_limit(
274        &mut self,
275        input: &PhysicalOperator,
276        count: usize,
277        offset: usize,
278    ) -> Result<Vec<Row>> {
279        let rows = self.execute_operator(input)?;
280        Ok(rows.into_iter().skip(offset).take(count).collect())
281    }
282
283    fn execute_project(
284        &mut self,
285        input: &PhysicalOperator,
286        columns: &[SelectColumn],
287    ) -> Result<Vec<Row>> {
288        let rows = self.execute_operator(input)?;
289
290        let projected = rows
291            .into_iter()
292            .map(|row| {
293                let mut new_columns = Vec::new();
294                let mut new_values = Vec::new();
295
296                for col in columns {
297                    match col {
298                        SelectColumn::Wildcard => {
299                            new_columns.extend(row.columns.clone());
300                            new_values.extend(row.values.clone());
301                        }
302                        SelectColumn::Column { name, alias } => {
303                            if let Some(idx) = row.columns.iter().position(|c| &c.name == name) {
304                                new_columns.push(Column {
305                                    name: name.clone(),
306                                    alias: alias.clone(),
307                                });
308                                new_values.push(row.values[idx].clone());
309                            }
310                        }
311                        SelectColumn::Aggregate { .. } => {
312                            // Aggregates handled by Aggregate operator
313                        }
314                    }
315                }
316
317                Row {
318                    columns: new_columns,
319                    values: new_values,
320                }
321            })
322            .collect();
323
324        Ok(projected)
325    }
326
327    fn execute_hash_join(
328        &mut self,
329        left: &PhysicalOperator,
330        right: &PhysicalOperator,
331        join_type: &JoinType,
332        condition: &Expression,
333    ) -> Result<Vec<Row>> {
334        let left_rows = self.execute_operator(left)?;
335        let right_rows = self.execute_operator(right)?;
336
337        // Choose join algorithm based on dataset size
338        if right_rows.len() < 100 {
339            // Use nested loop join for small datasets
340            self.nested_loop_join(&left_rows, &right_rows, join_type, condition)
341        } else {
342            // Use hash join for larger datasets
343            self.hash_join_impl(&left_rows, &right_rows, join_type, condition)
344        }
345    }
346
347    /// Nested loop join - simple but works for small datasets
348    fn nested_loop_join(
349        &mut self,
350        left_rows: &[Row],
351        right_rows: &[Row],
352        join_type: &JoinType,
353        condition: &Expression,
354    ) -> Result<Vec<Row>> {
355        let mut result = Vec::new();
356
357        match join_type {
358            JoinType::Inner => {
359                for l_row in left_rows {
360                    for r_row in right_rows {
361                        if self.evaluate_join_condition(l_row, r_row, condition) {
362                            result.push(self.merge_rows(l_row, r_row));
363                        }
364                    }
365                }
366            }
367            JoinType::Left => {
368                for l_row in left_rows {
369                    let mut matched = false;
370                    for r_row in right_rows {
371                        if self.evaluate_join_condition(l_row, r_row, condition) {
372                            result.push(self.merge_rows(l_row, r_row));
373                            matched = true;
374                        }
375                    }
376                    if !matched {
377                        // Left row with NULL values for right side
378                        result.push(self.merge_rows_with_null(l_row, right_rows[0].columns.len()));
379                    }
380                }
381            }
382            JoinType::Right => {
383                for r_row in right_rows {
384                    let mut matched = false;
385                    for l_row in left_rows {
386                        if self.evaluate_join_condition(l_row, r_row, condition) {
387                            result.push(self.merge_rows(l_row, r_row));
388                            matched = true;
389                        }
390                    }
391                    if !matched {
392                        // NULL values for left side with right row
393                        result.push(self.merge_null_with_row(left_rows[0].columns.len(), r_row));
394                    }
395                }
396            }
397            JoinType::Full => {
398                let mut left_matched = vec![false; left_rows.len()];
399                let mut right_matched = vec![false; right_rows.len()];
400
401                for (l_idx, l_row) in left_rows.iter().enumerate() {
402                    for (r_idx, r_row) in right_rows.iter().enumerate() {
403                        if self.evaluate_join_condition(l_row, r_row, condition) {
404                            result.push(self.merge_rows(l_row, r_row));
405                            left_matched[l_idx] = true;
406                            right_matched[r_idx] = true;
407                        }
408                    }
409                }
410
411                // Add unmatched left rows
412                for (idx, matched) in left_matched.iter().enumerate() {
413                    if !*matched {
414                        result.push(
415                            self.merge_rows_with_null(&left_rows[idx], right_rows[0].columns.len()),
416                        );
417                    }
418                }
419
420                // Add unmatched right rows
421                for (idx, matched) in right_matched.iter().enumerate() {
422                    if !*matched {
423                        result.push(
424                            self.merge_null_with_row(left_rows[0].columns.len(), &right_rows[idx]),
425                        );
426                    }
427                }
428            }
429        }
430
431        Ok(result)
432    }
433
434    /// Hash join - efficient for larger datasets
435    fn hash_join_impl(
436        &mut self,
437        left_rows: &[Row],
438        right_rows: &[Row],
439        join_type: &JoinType,
440        condition: &Expression,
441    ) -> Result<Vec<Row>> {
442        // Build hash table from right side (build phase)
443        let mut hash_table: HashMap<Vec<u8>, Vec<&Row>> = HashMap::new();
444
445        for r_row in right_rows {
446            let key = self.extract_join_key(r_row, condition, true);
447            hash_table.entry(key).or_insert_with(Vec::new).push(r_row);
448        }
449
450        let mut result = Vec::new();
451
452        match join_type {
453            JoinType::Inner => {
454                for l_row in left_rows {
455                    let key = self.extract_join_key(l_row, condition, false);
456                    if let Some(matching_rows) = hash_table.get(&key) {
457                        for r_row in matching_rows {
458                            if self.evaluate_join_condition(l_row, r_row, condition) {
459                                result.push(self.merge_rows(l_row, r_row));
460                            }
461                        }
462                    }
463                }
464            }
465            JoinType::Left => {
466                for l_row in left_rows {
467                    let key = self.extract_join_key(l_row, condition, false);
468                    if let Some(matching_rows) = hash_table.get(&key) {
469                        let mut matched = false;
470                        for r_row in matching_rows {
471                            if self.evaluate_join_condition(l_row, r_row, condition) {
472                                result.push(self.merge_rows(l_row, r_row));
473                                matched = true;
474                            }
475                        }
476                        if !matched {
477                            result.push(
478                                self.merge_rows_with_null(l_row, right_rows[0].columns.len()),
479                            );
480                        }
481                    } else {
482                        result.push(self.merge_rows_with_null(l_row, right_rows[0].columns.len()));
483                    }
484                }
485            }
486            JoinType::Right | JoinType::Full => {
487                // For RIGHT and FULL, fall back to nested loop
488                // (hash join is less efficient for these join types)
489                return self.nested_loop_join(left_rows, right_rows, join_type, condition);
490            }
491        }
492
493        Ok(result)
494    }
495
496    /// Extract join key from row for hashing
497    fn extract_join_key(&self, row: &Row, condition: &Expression, is_right: bool) -> Vec<u8> {
498        // Simple key extraction - would be more sophisticated in production
499        match condition {
500            Expression::BinaryOp { left, right, .. } => {
501                if let (Expression::Column(left_col), Expression::Column(right_col)) =
502                    (left.as_ref(), right.as_ref())
503                {
504                    let col_name = if is_right { right_col } else { left_col };
505
506                    // Strip table prefix if present (e.g., "users.id" -> "id")
507                    let column_name = if let Some(dot_pos) = col_name.rfind('.') {
508                        &col_name[dot_pos + 1..]
509                    } else {
510                        col_name
511                    };
512
513                    if let Some(idx) = row.columns.iter().position(|c| c.name == column_name) {
514                        return row.values[idx].to_bytes();
515                    }
516                }
517            }
518            _ => {}
519        }
520        vec![]
521    }
522
523    /// Evaluate join condition for two rows
524    fn evaluate_join_condition(&self, left: &Row, right: &Row, condition: &Expression) -> bool {
525        match condition {
526            Expression::BinaryOp {
527                left: l_expr,
528                op,
529                right: r_expr,
530            } => {
531                let left_val = self.evaluate_expression_for_row(l_expr, left, right, true);
532                let right_val = self.evaluate_expression_for_row(r_expr, left, right, false);
533
534                if let (Some(lv), Some(rv)) = (left_val, right_val) {
535                    return lv.compare(&rv, op);
536                }
537                false
538            }
539            Expression::LogicalOp {
540                left: l_expr,
541                op,
542                right: r_expr,
543            } => {
544                let left_result = self.evaluate_join_condition(left, right, l_expr);
545                let right_result = self.evaluate_join_condition(left, right, r_expr);
546
547                match op {
548                    LogicalOperator::And => left_result && right_result,
549                    LogicalOperator::Or => left_result || right_result,
550                }
551            }
552            _ => true, // Default to true for unsupported conditions
553        }
554    }
555
556    /// Evaluate expression in the context of two joined rows
557    fn evaluate_expression_for_row(
558        &self,
559        expr: &Expression,
560        left_row: &Row,
561        right_row: &Row,
562        is_left: bool,
563    ) -> Option<Value> {
564        match expr {
565            Expression::Column(name) => {
566                // Strip table prefix if present (e.g., "users.id" -> "id")
567                let column_name = if let Some(dot_pos) = name.rfind('.') {
568                    &name[dot_pos + 1..]
569                } else {
570                    name
571                };
572
573                // Try to find column in appropriate row
574                let row = if is_left { left_row } else { right_row };
575                row.columns
576                    .iter()
577                    .position(|c| c.name == column_name)
578                    .map(|idx| row.values[idx].clone())
579            }
580            Expression::Literal(lit) => Some(self.literal_to_value(lit)),
581            _ => None,
582        }
583    }
584
585    /// Merge two rows into one
586    fn merge_rows(&self, left: &Row, right: &Row) -> Row {
587        let mut columns = left.columns.clone();
588        columns.extend(right.columns.clone());
589        let mut values = left.values.clone();
590        values.extend(right.values.clone());
591        Row { columns, values }
592    }
593
594    /// Merge left row with NULL values for right side
595    fn merge_rows_with_null(&self, left: &Row, right_col_count: usize) -> Row {
596        let mut columns = left.columns.clone();
597        let mut values = left.values.clone();
598        for _ in 0..right_col_count {
599            values.push(Value::Null);
600        }
601        Row { columns, values }
602    }
603
604    /// Merge NULL values for left side with right row
605    fn merge_null_with_row(&self, left_col_count: usize, right: &Row) -> Row {
606        let mut columns = Vec::new();
607        let mut values = Vec::new();
608        for _ in 0..left_col_count {
609            values.push(Value::Null);
610        }
611        columns.extend(right.columns.clone());
612        values.extend(right.values.clone());
613        Row { columns, values }
614    }
615
616    /// Convert literal to value
617    fn literal_to_value(&self, lit: &Literal) -> Value {
618        match lit {
619            Literal::Integer(i) => Value::Integer(*i),
620            Literal::Float(f) => Value::Float(*f),
621            Literal::String(s) => Value::String(s.clone()),
622            Literal::Boolean(b) => Value::Boolean(*b),
623            Literal::Null => Value::Null,
624        }
625    }
626
627    fn execute_aggregate(
628        &mut self,
629        input: &PhysicalOperator,
630        aggregates: &[SelectColumn],
631    ) -> Result<Vec<Row>> {
632        let rows = self.execute_operator(input)?;
633
634        let mut result_columns = Vec::new();
635        let mut result_values = Vec::new();
636
637        for agg in aggregates {
638            if let SelectColumn::Aggregate {
639                function,
640                column,
641                alias,
642            } = agg
643            {
644                let col_name = match column.as_ref() {
645                    SelectColumn::Wildcard => "*",
646                    SelectColumn::Column { name, .. } => name.as_str(),
647                    _ => continue,
648                };
649
650                let value = match function {
651                    AggregateFunction::Count => Value::Integer(rows.len() as i64),
652                    AggregateFunction::Sum => {
653                        let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
654                        if let Some(idx) = col_idx {
655                            let sum: i64 = rows
656                                .iter()
657                                .filter_map(|r| match &r.values[idx] {
658                                    Value::Integer(i) => Some(i),
659                                    _ => None,
660                                })
661                                .sum();
662                            Value::Integer(sum)
663                        } else {
664                            Value::Null
665                        }
666                    }
667                    AggregateFunction::Avg => {
668                        let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
669                        if let Some(idx) = col_idx {
670                            let values: Vec<i64> = rows
671                                .iter()
672                                .filter_map(|r| match &r.values[idx] {
673                                    Value::Integer(i) => Some(*i),
674                                    _ => None,
675                                })
676                                .collect();
677                            if !values.is_empty() {
678                                let sum: i64 = values.iter().sum();
679                                Value::Float(sum as f64 / values.len() as f64)
680                            } else {
681                                Value::Null
682                            }
683                        } else {
684                            Value::Null
685                        }
686                    }
687                    AggregateFunction::Min => {
688                        let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
689                        if let Some(idx) = col_idx {
690                            rows.iter()
691                                .map(|r| &r.values[idx])
692                                .min_by(|a, b| match (a, b) {
693                                    (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
694                                    _ => std::cmp::Ordering::Equal,
695                                })
696                                .cloned()
697                                .unwrap_or(Value::Null)
698                        } else {
699                            Value::Null
700                        }
701                    }
702                    AggregateFunction::Max => {
703                        let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
704                        if let Some(idx) = col_idx {
705                            rows.iter()
706                                .map(|r| &r.values[idx])
707                                .max_by(|a, b| match (a, b) {
708                                    (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
709                                    _ => std::cmp::Ordering::Equal,
710                                })
711                                .cloned()
712                                .unwrap_or(Value::Null)
713                        } else {
714                            Value::Null
715                        }
716                    }
717                };
718
719                let display_name = alias
720                    .as_ref()
721                    .cloned()
722                    .unwrap_or_else(|| format!("{}({})", function, col_name));
723
724                result_columns.push(Column {
725                    name: display_name.clone(),
726                    alias: alias.clone(),
727                });
728                result_values.push(value);
729            }
730        }
731
732        Ok(vec![Row {
733            columns: result_columns,
734            values: result_values,
735        }])
736    }
737
738    fn evaluate_condition(&self, row: &Row, condition: &Expression) -> bool {
739        match condition {
740            Expression::Column(name) => {
741                // Column reference - check if exists and is truthy
742                row.columns.iter().any(|c| &c.name == name)
743            }
744            Expression::Literal(lit) => {
745                // Literal value
746                match lit {
747                    Literal::Boolean(b) => *b,
748                    _ => true,
749                }
750            }
751            Expression::BinaryOp { left, op, right } => {
752                let left_val = self.evaluate_expression(row, left);
753                let right_val = self.evaluate_expression(row, right);
754
755                if let (Some(l), Some(r)) = (left_val, right_val) {
756                    l.compare(&r, op)
757                } else {
758                    false
759                }
760            }
761            Expression::LogicalOp { left, op, right } => {
762                let left_result = self.evaluate_condition(row, left);
763                let right_result = self.evaluate_condition(row, right);
764
765                match op {
766                    LogicalOperator::And => left_result && right_result,
767                    LogicalOperator::Or => left_result || right_result,
768                }
769            }
770            Expression::Not(expr) => !self.evaluate_condition(row, expr),
771            Expression::Like { expr, pattern } => {
772                if let Some(Value::String(s)) = self.evaluate_expression(row, expr) {
773                    // Simplified LIKE - just use contains for now
774                    let pattern = pattern.replace('%', "");
775                    s.contains(&pattern)
776                } else {
777                    false
778                }
779            }
780            Expression::In { expr, values } => {
781                self.evaluate_expression(row, expr).is_some_and(|val| {
782                    values.iter().any(|lit| {
783                        let lit_val = literal_to_value(lit);
784                        val == lit_val
785                    })
786                })
787            }
788            Expression::Between { expr, min, max } => {
789                if let (Some(val), Some(min_v), Some(max_v)) = (
790                    self.evaluate_expression(row, expr),
791                    self.evaluate_expression(row, min),
792                    self.evaluate_expression(row, max),
793                ) {
794                    val.compare(&min_v, &BinaryOperator::Ge)
795                        && val.compare(&max_v, &BinaryOperator::Le)
796                } else {
797                    false
798                }
799            }
800        }
801    }
802
803    fn evaluate_expression(&self, row: &Row, expr: &Expression) -> Option<Value> {
804        match expr {
805            Expression::Column(name) => row
806                .columns
807                .iter()
808                .position(|c| &c.name == name)
809                .and_then(|idx| row.values.get(idx).cloned()),
810            Expression::Literal(lit) => Some(literal_to_value(lit)),
811            _ => None,
812        }
813    }
814}
815
816fn literal_to_value(lit: &Literal) -> Value {
817    match lit {
818        Literal::Integer(i) => Value::Integer(*i),
819        Literal::Float(f) => Value::Float(*f),
820        Literal::String(s) => Value::String(s.clone()),
821        Literal::Boolean(b) => Value::Boolean(*b),
822        Literal::Null => Value::Null,
823    }
824}
825
826#[cfg(test)]
827mod tests {
828    use super::*;
829    use crate::query::parser::Parser;
830    use crate::query::planner::Planner;
831
832    #[test]
833    fn test_table_scan() {
834        let mut context = ExecutionContext::new();
835        context.data.insert(
836            "users".to_string(),
837            vec![
838                Row {
839                    columns: vec![
840                        Column {
841                            name: "id".to_string(),
842                            alias: None,
843                        },
844                        Column {
845                            name: "name".to_string(),
846                            alias: None,
847                        },
848                    ],
849                    values: vec![Value::Integer(1), Value::String("Alice".to_string())],
850                },
851                Row {
852                    columns: vec![
853                        Column {
854                            name: "id".to_string(),
855                            alias: None,
856                        },
857                        Column {
858                            name: "name".to_string(),
859                            alias: None,
860                        },
861                    ],
862                    values: vec![Value::Integer(2), Value::String("Bob".to_string())],
863                },
864            ],
865        );
866
867        let mut executor = Executor::new(context);
868
869        let mut parser = Parser::new("SELECT * FROM users").unwrap();
870        let query = parser.parse().unwrap();
871        let planner = Planner::new();
872        let plan = planner.plan(&query).unwrap();
873
874        let result = executor.execute(&plan).unwrap();
875        assert_eq!(result.len(), 2);
876    }
877}