rustlite_core/query/
executor.rs

1/// Query executor
2///
3/// Executes physical query plans using iterators.
4use super::ast::*;
5use super::planner::{PhysicalOperator, PhysicalPlan};
6use crate::error::Result;
7use std::collections::HashMap;
8use std::fmt;
9
10/// Query result row
11#[derive(Debug, Clone, PartialEq)]
12pub struct Row {
13    pub columns: Vec<Column>,
14    pub values: Vec<Value>,
15}
16
17/// Column metadata
18#[derive(Debug, Clone, PartialEq)]
19pub struct Column {
20    pub name: String,
21    pub alias: Option<String>,
22}
23
24/// Value types in query results
25#[derive(Debug, Clone, PartialEq)]
26pub enum Value {
27    Integer(i64),
28    Float(f64),
29    String(String),
30    Boolean(bool),
31    Null,
32}
33
34impl Value {
35    /// Convert value to bytes for comparison
36    pub fn to_bytes(&self) -> Vec<u8> {
37        match self {
38            Value::Integer(i) => i.to_le_bytes().to_vec(),
39            Value::Float(f) => f.to_le_bytes().to_vec(),
40            Value::String(s) => s.as_bytes().to_vec(),
41            Value::Boolean(b) => vec![if *b { 1 } else { 0 }],
42            Value::Null => vec![],
43        }
44    }
45
46    /// Compare values
47    pub fn compare(&self, other: &Value, op: &BinaryOperator) -> bool {
48        match (self, other) {
49            (Value::Integer(a), Value::Integer(b)) => match op {
50                BinaryOperator::Eq => a == b,
51                BinaryOperator::Ne => a != b,
52                BinaryOperator::Lt => a < b,
53                BinaryOperator::Le => a <= b,
54                BinaryOperator::Gt => a > b,
55                BinaryOperator::Ge => a >= b,
56            },
57            (Value::Float(a), Value::Float(b)) => match op {
58                BinaryOperator::Eq => (a - b).abs() < f64::EPSILON,
59                BinaryOperator::Ne => (a - b).abs() >= f64::EPSILON,
60                BinaryOperator::Lt => a < b,
61                BinaryOperator::Le => a <= b,
62                BinaryOperator::Gt => a > b,
63                BinaryOperator::Ge => a >= b,
64            },
65            (Value::String(a), Value::String(b)) => match op {
66                BinaryOperator::Eq => a == b,
67                BinaryOperator::Ne => a != b,
68                BinaryOperator::Lt => a < b,
69                BinaryOperator::Le => a <= b,
70                BinaryOperator::Gt => a > b,
71                BinaryOperator::Ge => a >= b,
72            },
73            (Value::Boolean(a), Value::Boolean(b)) => match op {
74                BinaryOperator::Eq => a == b,
75                BinaryOperator::Ne => a != b,
76                _ => false,
77            },
78            (Value::Null, Value::Null) => matches!(op, BinaryOperator::Eq),
79            _ => false,
80        }
81    }
82}
83
84impl fmt::Display for Value {
85    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86        match self {
87            Value::Integer(i) => write!(f, "{}", i),
88            Value::Float(fl) => write!(f, "{}", fl),
89            Value::String(s) => write!(f, "{}", s),
90            Value::Boolean(b) => write!(f, "{}", b),
91            Value::Null => write!(f, "NULL"),
92        }
93    }
94}
95
96/// Query execution context
97#[derive(Clone, Default)]
98pub struct ExecutionContext {
99    /// Storage backend access (simplified - would integrate with actual storage)
100    pub data: HashMap<String, Vec<Row>>,
101    /// Index access (simplified)
102    pub indexes: HashMap<String, HashMap<Vec<u8>, Vec<u64>>>,
103}
104
105impl ExecutionContext {
106    /// Creates a new execution context
107    pub fn new() -> Self {
108        Self::default()
109    }
110}
111
112/// Query executor
113pub struct Executor {
114    context: ExecutionContext,
115}
116
117impl Executor {
118    /// Create new executor
119    pub fn new(context: ExecutionContext) -> Self {
120        Self { context }
121    }
122
123    /// Execute a physical plan
124    pub fn execute(&mut self, plan: &PhysicalPlan) -> Result<Vec<Row>> {
125        self.execute_operator(&plan.root)
126    }
127
128    fn execute_operator(&mut self, op: &PhysicalOperator) -> Result<Vec<Row>> {
129        match op {
130            PhysicalOperator::TableScan { table } => self.execute_table_scan(table),
131            PhysicalOperator::IndexScan { table, index, key } => {
132                self.execute_index_scan(table, index, key)
133            }
134            PhysicalOperator::IndexRangeScan {
135                table,
136                index,
137                start,
138                end,
139            } => self.execute_index_range_scan(table, index, start.as_deref(), end.as_deref()),
140            PhysicalOperator::Filter { input, condition } => self.execute_filter(input, condition),
141            PhysicalOperator::Sort { input, columns } => self.execute_sort(input, columns),
142            PhysicalOperator::Limit {
143                input,
144                count,
145                offset,
146            } => self.execute_limit(input, *count, *offset),
147            PhysicalOperator::Project { input, columns } => self.execute_project(input, columns),
148            PhysicalOperator::HashJoin {
149                left,
150                right,
151                join_type,
152                condition,
153            } => self.execute_hash_join(left, right, join_type, condition),
154            PhysicalOperator::Aggregate { input, aggregates } => {
155                self.execute_aggregate(input, aggregates)
156            }
157        }
158    }
159
160    fn execute_table_scan(&mut self, table: &str) -> Result<Vec<Row>> {
161        // Get all rows from table
162        Ok(self.context.data.get(table).cloned().unwrap_or_default())
163    }
164
165    fn execute_index_scan(&mut self, table: &str, index: &str, key: &[u8]) -> Result<Vec<Row>> {
166        // Look up row IDs from index
167        let row_ids = self
168            .context
169            .indexes
170            .get(index)
171            .and_then(|idx| idx.get(key))
172            .cloned()
173            .unwrap_or_default();
174
175        // Fetch rows by ID
176        let all_rows = self.context.data.get(table).cloned().unwrap_or_default();
177        let result = row_ids
178            .iter()
179            .filter_map(|&id| all_rows.get(id as usize).cloned())
180            .collect();
181
182        Ok(result)
183    }
184
185    fn execute_index_range_scan(
186        &mut self,
187        table: &str,
188        index: &str,
189        start: Option<&[u8]>,
190        end: Option<&[u8]>,
191    ) -> Result<Vec<Row>> {
192        // Get all keys from index in range
193        let index_data = self.context.indexes.get(index).cloned().unwrap_or_default();
194
195        let mut row_ids = Vec::new();
196        for (key, ids) in index_data {
197            let in_range = match (start, end) {
198                (Some(s), Some(e)) => key.as_slice() >= s && key.as_slice() <= e,
199                (Some(s), None) => key.as_slice() >= s,
200                (None, Some(e)) => key.as_slice() <= e,
201                (None, None) => true,
202            };
203
204            if in_range {
205                row_ids.extend(ids);
206            }
207        }
208
209        // Fetch rows by ID
210        let all_rows = self.context.data.get(table).cloned().unwrap_or_default();
211        let result = row_ids
212            .iter()
213            .filter_map(|&id| all_rows.get(id as usize).cloned())
214            .collect();
215
216        Ok(result)
217    }
218
219    fn execute_filter(
220        &mut self,
221        input: &PhysicalOperator,
222        condition: &Expression,
223    ) -> Result<Vec<Row>> {
224        let rows = self.execute_operator(input)?;
225
226        let filtered = rows
227            .into_iter()
228            .filter(|row| self.evaluate_condition(row, condition))
229            .collect();
230
231        Ok(filtered)
232    }
233
234    fn execute_sort(
235        &mut self,
236        input: &PhysicalOperator,
237        columns: &[OrderByColumn],
238    ) -> Result<Vec<Row>> {
239        let mut rows = self.execute_operator(input)?;
240
241        rows.sort_by(|a, b| {
242            for col in columns {
243                let a_idx = a.columns.iter().position(|c| c.name == col.column);
244                let b_idx = b.columns.iter().position(|c| c.name == col.column);
245
246                if let (Some(a_idx), Some(b_idx)) = (a_idx, b_idx) {
247                    let ordering = match (&a.values[a_idx], &b.values[b_idx]) {
248                        (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
249                        (Value::Float(a), Value::Float(b)) => {
250                            a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
251                        }
252                        (Value::String(a), Value::String(b)) => a.cmp(b),
253                        (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
254                        _ => std::cmp::Ordering::Equal,
255                    };
256
257                    let ordering = match col.direction {
258                        OrderDirection::Asc => ordering,
259                        OrderDirection::Desc => ordering.reverse(),
260                    };
261
262                    if ordering != std::cmp::Ordering::Equal {
263                        return ordering;
264                    }
265                }
266            }
267            std::cmp::Ordering::Equal
268        });
269
270        Ok(rows)
271    }
272
273    fn execute_limit(
274        &mut self,
275        input: &PhysicalOperator,
276        count: usize,
277        offset: usize,
278    ) -> Result<Vec<Row>> {
279        let rows = self.execute_operator(input)?;
280        Ok(rows.into_iter().skip(offset).take(count).collect())
281    }
282
283    fn execute_project(
284        &mut self,
285        input: &PhysicalOperator,
286        columns: &[SelectColumn],
287    ) -> Result<Vec<Row>> {
288        let rows = self.execute_operator(input)?;
289
290        let projected = rows
291            .into_iter()
292            .map(|row| {
293                let mut new_columns = Vec::new();
294                let mut new_values = Vec::new();
295
296                for col in columns {
297                    match col {
298                        SelectColumn::Wildcard => {
299                            new_columns.extend(row.columns.clone());
300                            new_values.extend(row.values.clone());
301                        }
302                        SelectColumn::Column { name, alias } => {
303                            if let Some(idx) = row.columns.iter().position(|c| &c.name == name) {
304                                new_columns.push(Column {
305                                    name: name.clone(),
306                                    alias: alias.clone(),
307                                });
308                                new_values.push(row.values[idx].clone());
309                            }
310                        }
311                        SelectColumn::Aggregate { .. } => {
312                            // Aggregates handled by Aggregate operator
313                        }
314                    }
315                }
316
317                Row {
318                    columns: new_columns,
319                    values: new_values,
320                }
321            })
322            .collect();
323
324        Ok(projected)
325    }
326
327    fn execute_hash_join(
328        &mut self,
329        left: &PhysicalOperator,
330        right: &PhysicalOperator,
331        _join_type: &JoinType,
332        _condition: &Expression,
333    ) -> Result<Vec<Row>> {
334        let left_rows = self.execute_operator(left)?;
335        let right_rows = self.execute_operator(right)?;
336
337        // Simplified join - cartesian product
338        // In a real implementation, this would use the condition to match rows
339        let mut result = Vec::new();
340        for l_row in &left_rows {
341            for r_row in &right_rows {
342                let mut columns = l_row.columns.clone();
343                columns.extend(r_row.columns.clone());
344                let mut values = l_row.values.clone();
345                values.extend(r_row.values.clone());
346
347                result.push(Row { columns, values });
348            }
349        }
350
351        Ok(result)
352    }
353
354    fn execute_aggregate(
355        &mut self,
356        input: &PhysicalOperator,
357        aggregates: &[SelectColumn],
358    ) -> Result<Vec<Row>> {
359        let rows = self.execute_operator(input)?;
360
361        let mut result_columns = Vec::new();
362        let mut result_values = Vec::new();
363
364        for agg in aggregates {
365            if let SelectColumn::Aggregate {
366                function,
367                column,
368                alias,
369            } = agg
370            {
371                let col_name = match column.as_ref() {
372                    SelectColumn::Wildcard => "*",
373                    SelectColumn::Column { name, .. } => name.as_str(),
374                    _ => continue,
375                };
376
377                let value = match function {
378                    AggregateFunction::Count => Value::Integer(rows.len() as i64),
379                    AggregateFunction::Sum => {
380                        let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
381                        if let Some(idx) = col_idx {
382                            let sum: i64 = rows
383                                .iter()
384                                .filter_map(|r| match &r.values[idx] {
385                                    Value::Integer(i) => Some(i),
386                                    _ => None,
387                                })
388                                .sum();
389                            Value::Integer(sum)
390                        } else {
391                            Value::Null
392                        }
393                    }
394                    AggregateFunction::Avg => {
395                        let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
396                        if let Some(idx) = col_idx {
397                            let values: Vec<i64> = rows
398                                .iter()
399                                .filter_map(|r| match &r.values[idx] {
400                                    Value::Integer(i) => Some(*i),
401                                    _ => None,
402                                })
403                                .collect();
404                            if !values.is_empty() {
405                                let sum: i64 = values.iter().sum();
406                                Value::Float(sum as f64 / values.len() as f64)
407                            } else {
408                                Value::Null
409                            }
410                        } else {
411                            Value::Null
412                        }
413                    }
414                    AggregateFunction::Min => {
415                        let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
416                        if let Some(idx) = col_idx {
417                            rows.iter()
418                                .map(|r| &r.values[idx])
419                                .min_by(|a, b| match (a, b) {
420                                    (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
421                                    _ => std::cmp::Ordering::Equal,
422                                })
423                                .cloned()
424                                .unwrap_or(Value::Null)
425                        } else {
426                            Value::Null
427                        }
428                    }
429                    AggregateFunction::Max => {
430                        let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
431                        if let Some(idx) = col_idx {
432                            rows.iter()
433                                .map(|r| &r.values[idx])
434                                .max_by(|a, b| match (a, b) {
435                                    (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
436                                    _ => std::cmp::Ordering::Equal,
437                                })
438                                .cloned()
439                                .unwrap_or(Value::Null)
440                        } else {
441                            Value::Null
442                        }
443                    }
444                };
445
446                let display_name = alias
447                    .as_ref()
448                    .cloned()
449                    .unwrap_or_else(|| format!("{}({})", function, col_name));
450
451                result_columns.push(Column {
452                    name: display_name.clone(),
453                    alias: alias.clone(),
454                });
455                result_values.push(value);
456            }
457        }
458
459        Ok(vec![Row {
460            columns: result_columns,
461            values: result_values,
462        }])
463    }
464
465    fn evaluate_condition(&self, row: &Row, condition: &Expression) -> bool {
466        match condition {
467            Expression::Column(name) => {
468                // Column reference - check if exists and is truthy
469                row.columns.iter().any(|c| &c.name == name)
470            }
471            Expression::Literal(lit) => {
472                // Literal value
473                match lit {
474                    Literal::Boolean(b) => *b,
475                    _ => true,
476                }
477            }
478            Expression::BinaryOp { left, op, right } => {
479                let left_val = self.evaluate_expression(row, left);
480                let right_val = self.evaluate_expression(row, right);
481
482                if let (Some(l), Some(r)) = (left_val, right_val) {
483                    l.compare(&r, op)
484                } else {
485                    false
486                }
487            }
488            Expression::LogicalOp { left, op, right } => {
489                let left_result = self.evaluate_condition(row, left);
490                let right_result = self.evaluate_condition(row, right);
491
492                match op {
493                    LogicalOperator::And => left_result && right_result,
494                    LogicalOperator::Or => left_result || right_result,
495                }
496            }
497            Expression::Not(expr) => !self.evaluate_condition(row, expr),
498            Expression::Like { expr, pattern } => {
499                if let Some(Value::String(s)) = self.evaluate_expression(row, expr) {
500                    // Simplified LIKE - just use contains for now
501                    let pattern = pattern.replace('%', "");
502                    s.contains(&pattern)
503                } else {
504                    false
505                }
506            }
507            Expression::In { expr, values } => {
508                self.evaluate_expression(row, expr).is_some_and(|val| {
509                    values.iter().any(|lit| {
510                        let lit_val = literal_to_value(lit);
511                        val == lit_val
512                    })
513                })
514            }
515            Expression::Between { expr, min, max } => {
516                if let (Some(val), Some(min_v), Some(max_v)) = (
517                    self.evaluate_expression(row, expr),
518                    self.evaluate_expression(row, min),
519                    self.evaluate_expression(row, max),
520                ) {
521                    val.compare(&min_v, &BinaryOperator::Ge)
522                        && val.compare(&max_v, &BinaryOperator::Le)
523                } else {
524                    false
525                }
526            }
527        }
528    }
529
530    fn evaluate_expression(&self, row: &Row, expr: &Expression) -> Option<Value> {
531        match expr {
532            Expression::Column(name) => row
533                .columns
534                .iter()
535                .position(|c| &c.name == name)
536                .and_then(|idx| row.values.get(idx).cloned()),
537            Expression::Literal(lit) => Some(literal_to_value(lit)),
538            _ => None,
539        }
540    }
541}
542
543fn literal_to_value(lit: &Literal) -> Value {
544    match lit {
545        Literal::Integer(i) => Value::Integer(*i),
546        Literal::Float(f) => Value::Float(*f),
547        Literal::String(s) => Value::String(s.clone()),
548        Literal::Boolean(b) => Value::Boolean(*b),
549        Literal::Null => Value::Null,
550    }
551}
552
553#[cfg(test)]
554mod tests {
555    use super::*;
556    use crate::query::parser::Parser;
557    use crate::query::planner::Planner;
558
559    #[test]
560    fn test_table_scan() {
561        let mut context = ExecutionContext::new();
562        context.data.insert(
563            "users".to_string(),
564            vec![
565                Row {
566                    columns: vec![
567                        Column {
568                            name: "id".to_string(),
569                            alias: None,
570                        },
571                        Column {
572                            name: "name".to_string(),
573                            alias: None,
574                        },
575                    ],
576                    values: vec![Value::Integer(1), Value::String("Alice".to_string())],
577                },
578                Row {
579                    columns: vec![
580                        Column {
581                            name: "id".to_string(),
582                            alias: None,
583                        },
584                        Column {
585                            name: "name".to_string(),
586                            alias: None,
587                        },
588                    ],
589                    values: vec![Value::Integer(2), Value::String("Bob".to_string())],
590                },
591            ],
592        );
593
594        let mut executor = Executor::new(context);
595
596        let mut parser = Parser::new("SELECT * FROM users").unwrap();
597        let query = parser.parse().unwrap();
598        let planner = Planner::new();
599        let plan = planner.plan(&query).unwrap();
600
601        let result = executor.execute(&plan).unwrap();
602        assert_eq!(result.len(), 2);
603    }
604}