rustlite_core/query/
executor.rs

1/// Query executor
2///
3/// Executes physical query plans using iterators.
4use super::ast::*;
5use super::planner::{PhysicalOperator, PhysicalPlan};
6use crate::error::Result;
7use std::collections::HashMap;
8use std::fmt;
9use std::hash::{Hash, Hasher};
10
11/// Hashable wrapper for group key values
12#[derive(Debug, Clone, Eq)]
13struct GroupKey(Vec<GroupValue>);
14
/// Hashable counterpart of [`Value`], used as one component of a [`GroupKey`].
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum GroupValue {
    /// Integer value, stored unchanged.
    Integer(i64),
    /// Raw bit pattern of a float (`f64` itself is neither `Eq` nor `Hash`).
    Float(i64),
    /// Text value.
    String(String),
    /// Boolean value.
    Boolean(bool),
    /// SQL NULL; also used when a row lacks the grouping column.
    Null,
}
23
24impl From<&Value> for GroupValue {
25    fn from(value: &Value) -> Self {
26        match value {
27            Value::Integer(i) => GroupValue::Integer(*i),
28            Value::Float(f) => GroupValue::Float(f.to_bits() as i64),
29            Value::String(s) => GroupValue::String(s.clone()),
30            Value::Boolean(b) => GroupValue::Boolean(*b),
31            Value::Null => GroupValue::Null,
32        }
33    }
34}
35
36impl PartialEq for GroupKey {
37    fn eq(&self, other: &Self) -> bool {
38        self.0 == other.0
39    }
40}
41
42impl Hash for GroupKey {
43    fn hash<H: Hasher>(&self, state: &mut H) {
44        self.0.hash(state);
45    }
46}
47
48/// Query result row
49#[derive(Debug, Clone, PartialEq)]
50pub struct Row {
51    pub columns: Vec<Column>,
52    pub values: Vec<Value>,
53}
54
/// Metadata describing one column of a [`Row`].
#[derive(Clone, Debug, PartialEq)]
pub struct Column {
    /// Column name; this is what name-based lookups compare against.
    pub name: String,
    /// Optional alias requested for the column in the query.
    pub alias: Option<String>,
}
61
62/// Value types in query results
63#[derive(Debug, Clone, PartialEq)]
64pub enum Value {
65    Integer(i64),
66    Float(f64),
67    String(String),
68    Boolean(bool),
69    Null,
70}
71
72impl Value {
73    /// Convert value to bytes for comparison
74    pub fn to_bytes(&self) -> Vec<u8> {
75        match self {
76            Value::Integer(i) => i.to_le_bytes().to_vec(),
77            Value::Float(f) => f.to_le_bytes().to_vec(),
78            Value::String(s) => s.as_bytes().to_vec(),
79            Value::Boolean(b) => vec![if *b { 1 } else { 0 }],
80            Value::Null => vec![],
81        }
82    }
83
84    /// Compare values
85    pub fn compare(&self, other: &Value, op: &BinaryOperator) -> bool {
86        match (self, other) {
87            (Value::Integer(a), Value::Integer(b)) => match op {
88                BinaryOperator::Eq => a == b,
89                BinaryOperator::Ne => a != b,
90                BinaryOperator::Lt => a < b,
91                BinaryOperator::Le => a <= b,
92                BinaryOperator::Gt => a > b,
93                BinaryOperator::Ge => a >= b,
94            },
95            (Value::Float(a), Value::Float(b)) => match op {
96                BinaryOperator::Eq => (a - b).abs() < f64::EPSILON,
97                BinaryOperator::Ne => (a - b).abs() >= f64::EPSILON,
98                BinaryOperator::Lt => a < b,
99                BinaryOperator::Le => a <= b,
100                BinaryOperator::Gt => a > b,
101                BinaryOperator::Ge => a >= b,
102            },
103            (Value::String(a), Value::String(b)) => match op {
104                BinaryOperator::Eq => a == b,
105                BinaryOperator::Ne => a != b,
106                BinaryOperator::Lt => a < b,
107                BinaryOperator::Le => a <= b,
108                BinaryOperator::Gt => a > b,
109                BinaryOperator::Ge => a >= b,
110            },
111            (Value::Boolean(a), Value::Boolean(b)) => match op {
112                BinaryOperator::Eq => a == b,
113                BinaryOperator::Ne => a != b,
114                _ => false,
115            },
116            (Value::Null, Value::Null) => matches!(op, BinaryOperator::Eq),
117            _ => false,
118        }
119    }
120}
121
122impl fmt::Display for Value {
123    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124        match self {
125            Value::Integer(i) => write!(f, "{}", i),
126            Value::Float(fl) => write!(f, "{}", fl),
127            Value::String(s) => write!(f, "{}", s),
128            Value::Boolean(b) => write!(f, "{}", b),
129            Value::Null => write!(f, "NULL"),
130        }
131    }
132}
133
134/// Query execution context
135#[derive(Clone, Default)]
136pub struct ExecutionContext {
137    /// Storage backend access (simplified - would integrate with actual storage)
138    pub data: HashMap<String, Vec<Row>>,
139    /// Index access (simplified)
140    pub indexes: HashMap<String, HashMap<Vec<u8>, Vec<u64>>>,
141}
142
143impl ExecutionContext {
144    /// Creates a new execution context
145    pub fn new() -> Self {
146        Self::default()
147    }
148}
149
150/// Query executor
151pub struct Executor {
152    context: ExecutionContext,
153}
154
155impl Executor {
156    /// Create new executor
157    pub fn new(context: ExecutionContext) -> Self {
158        Self { context }
159    }
160
161    /// Execute a physical plan
162    pub fn execute(&mut self, plan: &PhysicalPlan) -> Result<Vec<Row>> {
163        self.execute_operator(&plan.root)
164    }
165
166    fn execute_operator(&mut self, op: &PhysicalOperator) -> Result<Vec<Row>> {
167        match op {
168            PhysicalOperator::TableScan { table } => self.execute_table_scan(table),
169            PhysicalOperator::IndexScan { table, index, key } => {
170                self.execute_index_scan(table, index, key)
171            }
172            PhysicalOperator::IndexRangeScan {
173                table,
174                index,
175                start,
176                end,
177            } => self.execute_index_range_scan(table, index, start.as_deref(), end.as_deref()),
178            PhysicalOperator::Filter { input, condition } => self.execute_filter(input, condition),
179            PhysicalOperator::Sort { input, columns } => self.execute_sort(input, columns),
180            PhysicalOperator::Limit {
181                input,
182                count,
183                offset,
184            } => self.execute_limit(input, *count, *offset),
185            PhysicalOperator::Project { input, columns } => self.execute_project(input, columns),
186            PhysicalOperator::HashJoin {
187                left,
188                right,
189                join_type,
190                condition,
191            } => self.execute_hash_join(left, right, join_type, condition),
192            PhysicalOperator::GroupBy {
193                input,
194                group_columns,
195                aggregates,
196                having,
197            } => self.execute_group_by(input, group_columns, aggregates, having.as_ref()),
198            PhysicalOperator::Aggregate { input, aggregates } => {
199                self.execute_aggregate(input, aggregates)
200            }
201        }
202    }
203
204    fn execute_table_scan(&mut self, table: &str) -> Result<Vec<Row>> {
205        // Get all rows from table
206        Ok(self.context.data.get(table).cloned().unwrap_or_default())
207    }
208
209    fn execute_index_scan(&mut self, table: &str, index: &str, key: &[u8]) -> Result<Vec<Row>> {
210        // Look up row IDs from index
211        let row_ids = self
212            .context
213            .indexes
214            .get(index)
215            .and_then(|idx| idx.get(key))
216            .cloned()
217            .unwrap_or_default();
218
219        // Fetch rows by ID
220        let all_rows = self.context.data.get(table).cloned().unwrap_or_default();
221        let result = row_ids
222            .iter()
223            .filter_map(|&id| all_rows.get(id as usize).cloned())
224            .collect();
225
226        Ok(result)
227    }
228
229    fn execute_index_range_scan(
230        &mut self,
231        table: &str,
232        index: &str,
233        start: Option<&[u8]>,
234        end: Option<&[u8]>,
235    ) -> Result<Vec<Row>> {
236        // Get all keys from index in range
237        let index_data = self.context.indexes.get(index).cloned().unwrap_or_default();
238
239        let mut row_ids = Vec::new();
240        for (key, ids) in index_data {
241            let in_range = match (start, end) {
242                (Some(s), Some(e)) => key.as_slice() >= s && key.as_slice() <= e,
243                (Some(s), None) => key.as_slice() >= s,
244                (None, Some(e)) => key.as_slice() <= e,
245                (None, None) => true,
246            };
247
248            if in_range {
249                row_ids.extend(ids);
250            }
251        }
252
253        // Fetch rows by ID
254        let all_rows = self.context.data.get(table).cloned().unwrap_or_default();
255        let result = row_ids
256            .iter()
257            .filter_map(|&id| all_rows.get(id as usize).cloned())
258            .collect();
259
260        Ok(result)
261    }
262
263    fn execute_filter(
264        &mut self,
265        input: &PhysicalOperator,
266        condition: &Expression,
267    ) -> Result<Vec<Row>> {
268        let rows = self.execute_operator(input)?;
269
270        let filtered = rows
271            .into_iter()
272            .filter(|row| self.evaluate_condition(row, condition))
273            .collect();
274
275        Ok(filtered)
276    }
277
278    fn execute_sort(
279        &mut self,
280        input: &PhysicalOperator,
281        columns: &[OrderByColumn],
282    ) -> Result<Vec<Row>> {
283        let mut rows = self.execute_operator(input)?;
284
285        rows.sort_by(|a, b| {
286            for col in columns {
287                let a_idx = a.columns.iter().position(|c| c.name == col.column);
288                let b_idx = b.columns.iter().position(|c| c.name == col.column);
289
290                if let (Some(a_idx), Some(b_idx)) = (a_idx, b_idx) {
291                    let ordering = match (&a.values[a_idx], &b.values[b_idx]) {
292                        (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
293                        (Value::Float(a), Value::Float(b)) => {
294                            a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
295                        }
296                        (Value::String(a), Value::String(b)) => a.cmp(b),
297                        (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
298                        _ => std::cmp::Ordering::Equal,
299                    };
300
301                    let ordering = match col.direction {
302                        OrderDirection::Asc => ordering,
303                        OrderDirection::Desc => ordering.reverse(),
304                    };
305
306                    if ordering != std::cmp::Ordering::Equal {
307                        return ordering;
308                    }
309                }
310            }
311            std::cmp::Ordering::Equal
312        });
313
314        Ok(rows)
315    }
316
317    fn execute_limit(
318        &mut self,
319        input: &PhysicalOperator,
320        count: usize,
321        offset: usize,
322    ) -> Result<Vec<Row>> {
323        let rows = self.execute_operator(input)?;
324        Ok(rows.into_iter().skip(offset).take(count).collect())
325    }
326
327    fn execute_project(
328        &mut self,
329        input: &PhysicalOperator,
330        columns: &[SelectColumn],
331    ) -> Result<Vec<Row>> {
332        let rows = self.execute_operator(input)?;
333
334        let projected = rows
335            .into_iter()
336            .map(|row| {
337                let mut new_columns = Vec::new();
338                let mut new_values = Vec::new();
339
340                for col in columns {
341                    match col {
342                        SelectColumn::Wildcard => {
343                            new_columns.extend(row.columns.clone());
344                            new_values.extend(row.values.clone());
345                        }
346                        SelectColumn::Column { name, alias } => {
347                            if let Some(idx) = row.columns.iter().position(|c| &c.name == name) {
348                                new_columns.push(Column {
349                                    name: name.clone(),
350                                    alias: alias.clone(),
351                                });
352                                new_values.push(row.values[idx].clone());
353                            }
354                        }
355                        SelectColumn::Aggregate { .. } => {
356                            // Aggregates handled by Aggregate operator
357                        }
358                    }
359                }
360
361                Row {
362                    columns: new_columns,
363                    values: new_values,
364                }
365            })
366            .collect();
367
368        Ok(projected)
369    }
370
371    fn execute_hash_join(
372        &mut self,
373        left: &PhysicalOperator,
374        right: &PhysicalOperator,
375        join_type: &JoinType,
376        condition: &Expression,
377    ) -> Result<Vec<Row>> {
378        let left_rows = self.execute_operator(left)?;
379        let right_rows = self.execute_operator(right)?;
380
381        // Choose join algorithm based on dataset size
382        if right_rows.len() < 100 {
383            // Use nested loop join for small datasets
384            self.nested_loop_join(&left_rows, &right_rows, join_type, condition)
385        } else {
386            // Use hash join for larger datasets
387            self.hash_join_impl(&left_rows, &right_rows, join_type, condition)
388        }
389    }
390
391    /// Nested loop join - simple but works for small datasets
392    fn nested_loop_join(
393        &mut self,
394        left_rows: &[Row],
395        right_rows: &[Row],
396        join_type: &JoinType,
397        condition: &Expression,
398    ) -> Result<Vec<Row>> {
399        let mut result = Vec::new();
400
401        match join_type {
402            JoinType::Inner => {
403                for l_row in left_rows {
404                    for r_row in right_rows {
405                        if self.evaluate_join_condition(l_row, r_row, condition) {
406                            result.push(self.merge_rows(l_row, r_row));
407                        }
408                    }
409                }
410            }
411            JoinType::Left => {
412                for l_row in left_rows {
413                    let mut matched = false;
414                    for r_row in right_rows {
415                        if self.evaluate_join_condition(l_row, r_row, condition) {
416                            result.push(self.merge_rows(l_row, r_row));
417                            matched = true;
418                        }
419                    }
420                    if !matched {
421                        // Left row with NULL values for right side
422                        result.push(self.merge_rows_with_null(l_row, right_rows[0].columns.len()));
423                    }
424                }
425            }
426            JoinType::Right => {
427                for r_row in right_rows {
428                    let mut matched = false;
429                    for l_row in left_rows {
430                        if self.evaluate_join_condition(l_row, r_row, condition) {
431                            result.push(self.merge_rows(l_row, r_row));
432                            matched = true;
433                        }
434                    }
435                    if !matched {
436                        // NULL values for left side with right row
437                        result.push(self.merge_null_with_row(left_rows[0].columns.len(), r_row));
438                    }
439                }
440            }
441            JoinType::Full => {
442                let mut left_matched = vec![false; left_rows.len()];
443                let mut right_matched = vec![false; right_rows.len()];
444
445                for (l_idx, l_row) in left_rows.iter().enumerate() {
446                    for (r_idx, r_row) in right_rows.iter().enumerate() {
447                        if self.evaluate_join_condition(l_row, r_row, condition) {
448                            result.push(self.merge_rows(l_row, r_row));
449                            left_matched[l_idx] = true;
450                            right_matched[r_idx] = true;
451                        }
452                    }
453                }
454
455                // Add unmatched left rows
456                for (idx, matched) in left_matched.iter().enumerate() {
457                    if !*matched {
458                        result.push(
459                            self.merge_rows_with_null(&left_rows[idx], right_rows[0].columns.len()),
460                        );
461                    }
462                }
463
464                // Add unmatched right rows
465                for (idx, matched) in right_matched.iter().enumerate() {
466                    if !*matched {
467                        result.push(
468                            self.merge_null_with_row(left_rows[0].columns.len(), &right_rows[idx]),
469                        );
470                    }
471                }
472            }
473        }
474
475        Ok(result)
476    }
477
478    /// Hash join - efficient for larger datasets
479    fn hash_join_impl(
480        &mut self,
481        left_rows: &[Row],
482        right_rows: &[Row],
483        join_type: &JoinType,
484        condition: &Expression,
485    ) -> Result<Vec<Row>> {
486        // Build hash table from right side (build phase)
487        let mut hash_table: HashMap<Vec<u8>, Vec<&Row>> = HashMap::new();
488
489        for r_row in right_rows {
490            let key = self.extract_join_key(r_row, condition, true);
491            hash_table.entry(key).or_default().push(r_row);
492        }
493
494        let mut result = Vec::new();
495
496        match join_type {
497            JoinType::Inner => {
498                for l_row in left_rows {
499                    let key = self.extract_join_key(l_row, condition, false);
500                    if let Some(matching_rows) = hash_table.get(&key) {
501                        for r_row in matching_rows {
502                            if self.evaluate_join_condition(l_row, r_row, condition) {
503                                result.push(self.merge_rows(l_row, r_row));
504                            }
505                        }
506                    }
507                }
508            }
509            JoinType::Left => {
510                for l_row in left_rows {
511                    let key = self.extract_join_key(l_row, condition, false);
512                    if let Some(matching_rows) = hash_table.get(&key) {
513                        let mut matched = false;
514                        for r_row in matching_rows {
515                            if self.evaluate_join_condition(l_row, r_row, condition) {
516                                result.push(self.merge_rows(l_row, r_row));
517                                matched = true;
518                            }
519                        }
520                        if !matched {
521                            result.push(
522                                self.merge_rows_with_null(l_row, right_rows[0].columns.len()),
523                            );
524                        }
525                    } else {
526                        result.push(self.merge_rows_with_null(l_row, right_rows[0].columns.len()));
527                    }
528                }
529            }
530            JoinType::Right | JoinType::Full => {
531                // For RIGHT and FULL, fall back to nested loop
532                // (hash join is less efficient for these join types)
533                return self.nested_loop_join(left_rows, right_rows, join_type, condition);
534            }
535        }
536
537        Ok(result)
538    }
539
540    /// Extract join key from row for hashing
541    fn extract_join_key(&self, row: &Row, condition: &Expression, is_right: bool) -> Vec<u8> {
542        // Simple key extraction - would be more sophisticated in production
543        if let Expression::BinaryOp { left, right, .. } = condition {
544            if let (Expression::Column(left_col), Expression::Column(right_col)) =
545                (left.as_ref(), right.as_ref())
546            {
547                let col_name = if is_right { right_col } else { left_col };
548
549                // Strip table prefix if present (e.g., "users.id" -> "id")
550                let column_name = if let Some(dot_pos) = col_name.rfind('.') {
551                    &col_name[dot_pos + 1..]
552                } else {
553                    col_name
554                };
555
556                if let Some(idx) = row.columns.iter().position(|c| c.name == column_name) {
557                    return row.values[idx].to_bytes();
558                }
559            }
560        }
561        vec![]
562    }
563
564    /// Evaluate join condition for two rows
565    fn evaluate_join_condition(&self, left: &Row, right: &Row, condition: &Expression) -> bool {
566        match condition {
567            Expression::BinaryOp {
568                left: l_expr,
569                op,
570                right: r_expr,
571            } => {
572                let left_val = self.evaluate_expression_for_row(l_expr, left, right, true);
573                let right_val = self.evaluate_expression_for_row(r_expr, left, right, false);
574
575                if let (Some(lv), Some(rv)) = (left_val, right_val) {
576                    return lv.compare(&rv, op);
577                }
578                false
579            }
580            Expression::LogicalOp {
581                left: l_expr,
582                op,
583                right: r_expr,
584            } => {
585                let left_result = self.evaluate_join_condition(left, right, l_expr);
586                let right_result = self.evaluate_join_condition(left, right, r_expr);
587
588                match op {
589                    LogicalOperator::And => left_result && right_result,
590                    LogicalOperator::Or => left_result || right_result,
591                }
592            }
593            _ => true, // Default to true for unsupported conditions
594        }
595    }
596
597    /// Evaluate expression in the context of two joined rows
598    fn evaluate_expression_for_row(
599        &self,
600        expr: &Expression,
601        left_row: &Row,
602        right_row: &Row,
603        is_left: bool,
604    ) -> Option<Value> {
605        match expr {
606            Expression::Column(name) => {
607                // Strip table prefix if present (e.g., "users.id" -> "id")
608                let column_name = if let Some(dot_pos) = name.rfind('.') {
609                    &name[dot_pos + 1..]
610                } else {
611                    name
612                };
613
614                // Try to find column in appropriate row
615                let row = if is_left { left_row } else { right_row };
616                row.columns
617                    .iter()
618                    .position(|c| c.name == column_name)
619                    .map(|idx| row.values[idx].clone())
620            }
621            Expression::Literal(lit) => Some(self.literal_to_value(lit)),
622            _ => None,
623        }
624    }
625
626    /// Merge two rows into one
627    fn merge_rows(&self, left: &Row, right: &Row) -> Row {
628        let mut columns = left.columns.clone();
629        columns.extend(right.columns.clone());
630        let mut values = left.values.clone();
631        values.extend(right.values.clone());
632        Row { columns, values }
633    }
634
635    /// Merge left row with NULL values for right side
636    fn merge_rows_with_null(&self, left: &Row, right_col_count: usize) -> Row {
637        let columns = left.columns.clone();
638        let mut values = left.values.clone();
639        for _ in 0..right_col_count {
640            values.push(Value::Null);
641        }
642        Row { columns, values }
643    }
644
645    /// Merge NULL values for left side with right row
646    fn merge_null_with_row(&self, left_col_count: usize, right: &Row) -> Row {
647        let mut columns = Vec::new();
648        let mut values = Vec::new();
649        for _ in 0..left_col_count {
650            values.push(Value::Null);
651        }
652        columns.extend(right.columns.clone());
653        values.extend(right.values.clone());
654        Row { columns, values }
655    }
656
657    /// Convert literal to value
658    fn literal_to_value(&self, lit: &Literal) -> Value {
659        match lit {
660            Literal::Integer(i) => Value::Integer(*i),
661            Literal::Float(f) => Value::Float(*f),
662            Literal::String(s) => Value::String(s.clone()),
663            Literal::Boolean(b) => Value::Boolean(*b),
664            Literal::Null => Value::Null,
665        }
666    }
667
668    fn execute_group_by(
669        &mut self,
670        input: &PhysicalOperator,
671        group_columns: &[String],
672        aggregates: &[SelectColumn],
673        having: Option<&Expression>,
674    ) -> Result<Vec<Row>> {
675        let rows = self.execute_operator(input)?;
676
677        if rows.is_empty() {
678            return Ok(Vec::new());
679        }
680
681        // Group rows by the specified columns
682        let mut groups: HashMap<GroupKey, Vec<Row>> = HashMap::new();
683
684        for row in rows {
685            // Extract group key values
686            let mut key_values = Vec::new();
687            for group_col in group_columns {
688                if let Some(col_idx) = row.columns.iter().position(|c| &c.name == group_col) {
689                    key_values.push(GroupValue::from(&row.values[col_idx]));
690                } else {
691                    key_values.push(GroupValue::Null);
692                }
693            }
694
695            let group_key = GroupKey(key_values);
696            groups.entry(group_key).or_default().push(row);
697        }
698
699        // Apply aggregates for each group
700        let mut result_rows = Vec::new();
701
702        for (group_key, group_rows) in groups {
703            let mut result_columns = Vec::new();
704            let mut result_values = Vec::new();
705
706            // Add group columns
707            for (i, col_name) in group_columns.iter().enumerate() {
708                result_columns.push(Column {
709                    name: col_name.clone(),
710                    alias: None,
711                });
712                // Convert GroupValue back to Value
713                let value = match &group_key.0[i] {
714                    GroupValue::Integer(i) => Value::Integer(*i),
715                    GroupValue::Float(bits) => Value::Float(f64::from_bits(*bits as u64)),
716                    GroupValue::String(s) => Value::String(s.clone()),
717                    GroupValue::Boolean(b) => Value::Boolean(*b),
718                    GroupValue::Null => Value::Null,
719                };
720                result_values.push(value);
721            }
722
723            // Add aggregate columns
724            for agg in aggregates {
725                if let SelectColumn::Aggregate {
726                    function,
727                    column,
728                    alias,
729                } = agg
730                {
731                    let col_name = match column.as_ref() {
732                        SelectColumn::Wildcard => "*",
733                        SelectColumn::Column { name, .. } => name.as_str(),
734                        _ => continue,
735                    };
736
737                    let value = self.compute_aggregate(function, col_name, &group_rows)?;
738
739                    let display_name = alias
740                        .as_ref()
741                        .cloned()
742                        .unwrap_or_else(|| format!("{}({})", function, col_name));
743
744                    result_columns.push(Column {
745                        name: display_name.clone(),
746                        alias: alias.clone(),
747                    });
748                    result_values.push(value);
749                } else if let SelectColumn::Column { name, alias: _ } = agg {
750                    // Non-aggregate column (must be in GROUP BY)
751                    if !group_columns.contains(name) {
752                        // This would be a SQL error - column must be in GROUP BY or be aggregated
753                        continue;
754                    }
755                }
756            }
757
758            let row = Row {
759                columns: result_columns,
760                values: result_values,
761            };
762
763            // Apply HAVING clause if present
764            if let Some(having_condition) = having {
765                if self.evaluate_condition(&row, having_condition) {
766                    result_rows.push(row);
767                }
768            } else {
769                result_rows.push(row);
770            }
771        }
772
773        Ok(result_rows)
774    }
775
776    fn compute_aggregate(
777        &self,
778        function: &AggregateFunction,
779        col_name: &str,
780        rows: &[Row],
781    ) -> Result<Value> {
782        if rows.is_empty() {
783            return Ok(Value::Null);
784        }
785
786        match function {
787            AggregateFunction::Count => {
788                if col_name == "*" {
789                    // COUNT(*): Count all rows
790                    Ok(Value::Integer(rows.len() as i64))
791                } else {
792                    // COUNT(column): Count non-null values
793                    let col_idx = rows
794                        .iter()
795                        .find_map(|r| r.columns.iter().position(|c| c.name == col_name));
796
797                    if let Some(idx) = col_idx {
798                        let count = rows
799                            .iter()
800                            .filter(|r| {
801                                idx < r.values.len() && !matches!(r.values[idx], Value::Null)
802                            })
803                            .count();
804                        Ok(Value::Integer(count as i64))
805                    } else {
806                        Ok(Value::Integer(0))
807                    }
808                }
809            }
810            AggregateFunction::Sum => {
811                // Find column index - check all rows if first doesn't have it
812                let col_idx = rows
813                    .iter()
814                    .find_map(|r| r.columns.iter().position(|c| c.name == col_name));
815
816                if let Some(idx) = col_idx {
817                    let sum: i64 = rows
818                        .iter()
819                        .filter_map(|r| {
820                            if idx < r.values.len() {
821                                match &r.values[idx] {
822                                    Value::Integer(i) => Some(i),
823                                    _ => None,
824                                }
825                            } else {
826                                None
827                            }
828                        })
829                        .sum();
830                    Ok(Value::Integer(sum))
831                } else {
832                    Ok(Value::Null)
833                }
834            }
835            AggregateFunction::Avg => {
836                let col_idx = rows
837                    .iter()
838                    .find_map(|r| r.columns.iter().position(|c| c.name == col_name));
839
840                if let Some(idx) = col_idx {
841                    let values: Vec<i64> = rows
842                        .iter()
843                        .filter_map(|r| {
844                            if idx < r.values.len() {
845                                match &r.values[idx] {
846                                    Value::Integer(i) => Some(*i),
847                                    _ => None,
848                                }
849                            } else {
850                                None
851                            }
852                        })
853                        .collect();
854                    if !values.is_empty() {
855                        let sum: i64 = values.iter().sum();
856                        Ok(Value::Float(sum as f64 / values.len() as f64))
857                    } else {
858                        Ok(Value::Null)
859                    }
860                } else {
861                    Ok(Value::Null)
862                }
863            }
864            AggregateFunction::Min => {
865                let col_idx = rows
866                    .iter()
867                    .find_map(|r| r.columns.iter().position(|c| c.name == col_name));
868
869                if let Some(idx) = col_idx {
870                    Ok(rows
871                        .iter()
872                        .filter_map(|r| {
873                            if idx < r.values.len() {
874                                Some(&r.values[idx])
875                            } else {
876                                None
877                            }
878                        })
879                        .min_by(|a, b| match (a, b) {
880                            (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
881                            (Value::Float(a), Value::Float(b)) => {
882                                a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
883                            }
884                            (Value::String(a), Value::String(b)) => a.cmp(b),
885                            _ => std::cmp::Ordering::Equal,
886                        })
887                        .cloned()
888                        .unwrap_or(Value::Null))
889                } else {
890                    Ok(Value::Null)
891                }
892            }
893            AggregateFunction::Max => {
894                let col_idx = rows
895                    .iter()
896                    .find_map(|r| r.columns.iter().position(|c| c.name == col_name));
897
898                if let Some(idx) = col_idx {
899                    Ok(rows
900                        .iter()
901                        .filter_map(|r| {
902                            if idx < r.values.len() {
903                                Some(&r.values[idx])
904                            } else {
905                                None
906                            }
907                        })
908                        .max_by(|a, b| match (a, b) {
909                            (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
910                            (Value::Float(a), Value::Float(b)) => {
911                                a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
912                            }
913                            (Value::String(a), Value::String(b)) => a.cmp(b),
914                            _ => std::cmp::Ordering::Equal,
915                        })
916                        .cloned()
917                        .unwrap_or(Value::Null))
918                } else {
919                    Ok(Value::Null)
920                }
921            }
922        }
923    }
924
925    fn execute_aggregate(
926        &mut self,
927        input: &PhysicalOperator,
928        aggregates: &[SelectColumn],
929    ) -> Result<Vec<Row>> {
930        let rows = self.execute_operator(input)?;
931
932        let mut result_columns = Vec::new();
933        let mut result_values = Vec::new();
934
935        for agg in aggregates {
936            if let SelectColumn::Aggregate {
937                function,
938                column,
939                alias,
940            } = agg
941            {
942                let col_name = match column.as_ref() {
943                    SelectColumn::Wildcard => "*",
944                    SelectColumn::Column { name, .. } => name.as_str(),
945                    _ => continue,
946                };
947
948                let value = self.compute_aggregate(function, col_name, &rows)?;
949
950                let display_name = alias
951                    .as_ref()
952                    .cloned()
953                    .unwrap_or_else(|| format!("{}({})", function, col_name));
954
955                result_columns.push(Column {
956                    name: display_name.clone(),
957                    alias: alias.clone(),
958                });
959                result_values.push(value);
960            }
961        }
962
963        Ok(vec![Row {
964            columns: result_columns,
965            values: result_values,
966        }])
967    }
968
969    fn evaluate_condition(&self, row: &Row, condition: &Expression) -> bool {
970        match condition {
971            Expression::Column(name) => {
972                // Column reference - check if exists and is truthy
973                row.columns.iter().any(|c| &c.name == name)
974            }
975            Expression::Literal(lit) => {
976                // Literal value
977                match lit {
978                    Literal::Boolean(b) => *b,
979                    _ => true,
980                }
981            }
982            Expression::BinaryOp { left, op, right } => {
983                let left_val = self.evaluate_expression(row, left);
984                let right_val = self.evaluate_expression(row, right);
985
986                if let (Some(l), Some(r)) = (left_val, right_val) {
987                    l.compare(&r, op)
988                } else {
989                    false
990                }
991            }
992            Expression::LogicalOp { left, op, right } => {
993                let left_result = self.evaluate_condition(row, left);
994                let right_result = self.evaluate_condition(row, right);
995
996                match op {
997                    LogicalOperator::And => left_result && right_result,
998                    LogicalOperator::Or => left_result || right_result,
999                }
1000            }
1001            Expression::Not(expr) => !self.evaluate_condition(row, expr),
1002            Expression::Like { expr, pattern } => {
1003                if let Some(Value::String(s)) = self.evaluate_expression(row, expr) {
1004                    // Simplified LIKE - just use contains for now
1005                    let pattern = pattern.replace('%', "");
1006                    s.contains(&pattern)
1007                } else {
1008                    false
1009                }
1010            }
1011            Expression::In { expr, values } => {
1012                self.evaluate_expression(row, expr).is_some_and(|val| {
1013                    values.iter().any(|lit| {
1014                        let lit_val = literal_to_value(lit);
1015                        val == lit_val
1016                    })
1017                })
1018            }
1019            Expression::Between { expr, min, max } => {
1020                if let (Some(val), Some(min_v), Some(max_v)) = (
1021                    self.evaluate_expression(row, expr),
1022                    self.evaluate_expression(row, min),
1023                    self.evaluate_expression(row, max),
1024                ) {
1025                    val.compare(&min_v, &BinaryOperator::Ge)
1026                        && val.compare(&max_v, &BinaryOperator::Le)
1027                } else {
1028                    false
1029                }
1030            }
1031        }
1032    }
1033
1034    fn evaluate_expression(&self, row: &Row, expr: &Expression) -> Option<Value> {
1035        match expr {
1036            Expression::Column(name) => row
1037                .columns
1038                .iter()
1039                .position(|c| &c.name == name)
1040                .and_then(|idx| row.values.get(idx).cloned()),
1041            Expression::Literal(lit) => Some(literal_to_value(lit)),
1042            _ => None,
1043        }
1044    }
1045}
1046
1047fn literal_to_value(lit: &Literal) -> Value {
1048    match lit {
1049        Literal::Integer(i) => Value::Integer(*i),
1050        Literal::Float(f) => Value::Float(*f),
1051        Literal::String(s) => Value::String(s.clone()),
1052        Literal::Boolean(b) => Value::Boolean(*b),
1053        Literal::Null => Value::Null,
1054    }
1055}
1056
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::parser::Parser;
    use crate::query::planner::Planner;

    /// Builds one row with the columns `id` and `name` (no aliases).
    fn user_row(id: i64, name: &str) -> Row {
        let columns = ["id", "name"]
            .iter()
            .map(|label| Column {
                name: String::from(*label),
                alias: None,
            })
            .collect();

        Row {
            columns,
            values: vec![Value::Integer(id), Value::String(String::from(name))],
        }
    }

    /// `SELECT *` over a two-row table must return both rows.
    #[test]
    fn test_table_scan() {
        let mut context = ExecutionContext::new();
        let users = vec![user_row(1, "Alice"), user_row(2, "Bob")];
        context.data.insert("users".to_string(), users);

        let mut executor = Executor::new(context);

        // Parse, plan, then execute the query end to end.
        let mut parser = Parser::new("SELECT * FROM users").unwrap();
        let query = parser.parse().unwrap();
        let plan = Planner::new().plan(&query).unwrap();

        let result = executor.execute(&plan).unwrap();
        assert_eq!(result.len(), 2);
    }
}
1108}