1use super::ast::*;
5use super::planner::{PhysicalOperator, PhysicalPlan};
6use crate::error::Result;
7use std::collections::HashMap;
8use std::fmt;
9
10#[derive(Debug, Clone, PartialEq)]
12pub struct Row {
13 pub columns: Vec<Column>,
14 pub values: Vec<Value>,
15}
16
/// Describes one column of a [`Row`].
#[derive(Debug, Clone, PartialEq)]
pub struct Column {
    /// Name used by the executor when looking the column up.
    pub name: String,
    /// Optional alias carried alongside the name.
    pub alias: Option<String>,
}
23
/// A dynamically typed cell value.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    /// 64-bit signed integer.
    Integer(i64),
    /// 64-bit floating point number.
    Float(f64),
    /// Owned UTF-8 text.
    String(String),
    /// Boolean flag.
    Boolean(bool),
    /// Absence of a value.
    Null,
}
33
34impl Value {
35 pub fn to_bytes(&self) -> Vec<u8> {
37 match self {
38 Value::Integer(i) => i.to_le_bytes().to_vec(),
39 Value::Float(f) => f.to_le_bytes().to_vec(),
40 Value::String(s) => s.as_bytes().to_vec(),
41 Value::Boolean(b) => vec![if *b { 1 } else { 0 }],
42 Value::Null => vec![],
43 }
44 }
45
46 pub fn compare(&self, other: &Value, op: &BinaryOperator) -> bool {
48 match (self, other) {
49 (Value::Integer(a), Value::Integer(b)) => match op {
50 BinaryOperator::Eq => a == b,
51 BinaryOperator::Ne => a != b,
52 BinaryOperator::Lt => a < b,
53 BinaryOperator::Le => a <= b,
54 BinaryOperator::Gt => a > b,
55 BinaryOperator::Ge => a >= b,
56 },
57 (Value::Float(a), Value::Float(b)) => match op {
58 BinaryOperator::Eq => (a - b).abs() < f64::EPSILON,
59 BinaryOperator::Ne => (a - b).abs() >= f64::EPSILON,
60 BinaryOperator::Lt => a < b,
61 BinaryOperator::Le => a <= b,
62 BinaryOperator::Gt => a > b,
63 BinaryOperator::Ge => a >= b,
64 },
65 (Value::String(a), Value::String(b)) => match op {
66 BinaryOperator::Eq => a == b,
67 BinaryOperator::Ne => a != b,
68 BinaryOperator::Lt => a < b,
69 BinaryOperator::Le => a <= b,
70 BinaryOperator::Gt => a > b,
71 BinaryOperator::Ge => a >= b,
72 },
73 (Value::Boolean(a), Value::Boolean(b)) => match op {
74 BinaryOperator::Eq => a == b,
75 BinaryOperator::Ne => a != b,
76 _ => false,
77 },
78 (Value::Null, Value::Null) => matches!(op, BinaryOperator::Eq),
79 _ => false,
80 }
81 }
82}
83
84impl fmt::Display for Value {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 match self {
87 Value::Integer(i) => write!(f, "{}", i),
88 Value::Float(fl) => write!(f, "{}", fl),
89 Value::String(s) => write!(f, "{}", s),
90 Value::Boolean(b) => write!(f, "{}", b),
91 Value::Null => write!(f, "NULL"),
92 }
93 }
94}
95
96#[derive(Clone, Default)]
98pub struct ExecutionContext {
99 pub data: HashMap<String, Vec<Row>>,
101 pub indexes: HashMap<String, HashMap<Vec<u8>, Vec<u64>>>,
103}
104
105impl ExecutionContext {
106 pub fn new() -> Self {
108 Self::default()
109 }
110}
111
112pub struct Executor {
114 context: ExecutionContext,
115}
116
117impl Executor {
118 pub fn new(context: ExecutionContext) -> Self {
120 Self { context }
121 }
122
123 pub fn execute(&mut self, plan: &PhysicalPlan) -> Result<Vec<Row>> {
125 self.execute_operator(&plan.root)
126 }
127
128 fn execute_operator(&mut self, op: &PhysicalOperator) -> Result<Vec<Row>> {
129 match op {
130 PhysicalOperator::TableScan { table } => self.execute_table_scan(table),
131 PhysicalOperator::IndexScan { table, index, key } => {
132 self.execute_index_scan(table, index, key)
133 }
134 PhysicalOperator::IndexRangeScan {
135 table,
136 index,
137 start,
138 end,
139 } => self.execute_index_range_scan(table, index, start.as_deref(), end.as_deref()),
140 PhysicalOperator::Filter { input, condition } => self.execute_filter(input, condition),
141 PhysicalOperator::Sort { input, columns } => self.execute_sort(input, columns),
142 PhysicalOperator::Limit {
143 input,
144 count,
145 offset,
146 } => self.execute_limit(input, *count, *offset),
147 PhysicalOperator::Project { input, columns } => self.execute_project(input, columns),
148 PhysicalOperator::HashJoin {
149 left,
150 right,
151 join_type,
152 condition,
153 } => self.execute_hash_join(left, right, join_type, condition),
154 PhysicalOperator::Aggregate { input, aggregates } => {
155 self.execute_aggregate(input, aggregates)
156 }
157 }
158 }
159
160 fn execute_table_scan(&mut self, table: &str) -> Result<Vec<Row>> {
161 Ok(self.context.data.get(table).cloned().unwrap_or_default())
163 }
164
165 fn execute_index_scan(&mut self, table: &str, index: &str, key: &[u8]) -> Result<Vec<Row>> {
166 let row_ids = self
168 .context
169 .indexes
170 .get(index)
171 .and_then(|idx| idx.get(key))
172 .cloned()
173 .unwrap_or_default();
174
175 let all_rows = self.context.data.get(table).cloned().unwrap_or_default();
177 let result = row_ids
178 .iter()
179 .filter_map(|&id| all_rows.get(id as usize).cloned())
180 .collect();
181
182 Ok(result)
183 }
184
185 fn execute_index_range_scan(
186 &mut self,
187 table: &str,
188 index: &str,
189 start: Option<&[u8]>,
190 end: Option<&[u8]>,
191 ) -> Result<Vec<Row>> {
192 let index_data = self.context.indexes.get(index).cloned().unwrap_or_default();
194
195 let mut row_ids = Vec::new();
196 for (key, ids) in index_data {
197 let in_range = match (start, end) {
198 (Some(s), Some(e)) => key.as_slice() >= s && key.as_slice() <= e,
199 (Some(s), None) => key.as_slice() >= s,
200 (None, Some(e)) => key.as_slice() <= e,
201 (None, None) => true,
202 };
203
204 if in_range {
205 row_ids.extend(ids);
206 }
207 }
208
209 let all_rows = self.context.data.get(table).cloned().unwrap_or_default();
211 let result = row_ids
212 .iter()
213 .filter_map(|&id| all_rows.get(id as usize).cloned())
214 .collect();
215
216 Ok(result)
217 }
218
219 fn execute_filter(
220 &mut self,
221 input: &PhysicalOperator,
222 condition: &Expression,
223 ) -> Result<Vec<Row>> {
224 let rows = self.execute_operator(input)?;
225
226 let filtered = rows
227 .into_iter()
228 .filter(|row| self.evaluate_condition(row, condition))
229 .collect();
230
231 Ok(filtered)
232 }
233
234 fn execute_sort(
235 &mut self,
236 input: &PhysicalOperator,
237 columns: &[OrderByColumn],
238 ) -> Result<Vec<Row>> {
239 let mut rows = self.execute_operator(input)?;
240
241 rows.sort_by(|a, b| {
242 for col in columns {
243 let a_idx = a.columns.iter().position(|c| c.name == col.column);
244 let b_idx = b.columns.iter().position(|c| c.name == col.column);
245
246 if let (Some(a_idx), Some(b_idx)) = (a_idx, b_idx) {
247 let ordering = match (&a.values[a_idx], &b.values[b_idx]) {
248 (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
249 (Value::Float(a), Value::Float(b)) => {
250 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
251 }
252 (Value::String(a), Value::String(b)) => a.cmp(b),
253 (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
254 _ => std::cmp::Ordering::Equal,
255 };
256
257 let ordering = match col.direction {
258 OrderDirection::Asc => ordering,
259 OrderDirection::Desc => ordering.reverse(),
260 };
261
262 if ordering != std::cmp::Ordering::Equal {
263 return ordering;
264 }
265 }
266 }
267 std::cmp::Ordering::Equal
268 });
269
270 Ok(rows)
271 }
272
273 fn execute_limit(
274 &mut self,
275 input: &PhysicalOperator,
276 count: usize,
277 offset: usize,
278 ) -> Result<Vec<Row>> {
279 let rows = self.execute_operator(input)?;
280 Ok(rows.into_iter().skip(offset).take(count).collect())
281 }
282
283 fn execute_project(
284 &mut self,
285 input: &PhysicalOperator,
286 columns: &[SelectColumn],
287 ) -> Result<Vec<Row>> {
288 let rows = self.execute_operator(input)?;
289
290 let projected = rows
291 .into_iter()
292 .map(|row| {
293 let mut new_columns = Vec::new();
294 let mut new_values = Vec::new();
295
296 for col in columns {
297 match col {
298 SelectColumn::Wildcard => {
299 new_columns.extend(row.columns.clone());
300 new_values.extend(row.values.clone());
301 }
302 SelectColumn::Column { name, alias } => {
303 if let Some(idx) = row.columns.iter().position(|c| &c.name == name) {
304 new_columns.push(Column {
305 name: name.clone(),
306 alias: alias.clone(),
307 });
308 new_values.push(row.values[idx].clone());
309 }
310 }
311 SelectColumn::Aggregate { .. } => {
312 }
314 }
315 }
316
317 Row {
318 columns: new_columns,
319 values: new_values,
320 }
321 })
322 .collect();
323
324 Ok(projected)
325 }
326
327 fn execute_hash_join(
328 &mut self,
329 left: &PhysicalOperator,
330 right: &PhysicalOperator,
331 _join_type: &JoinType,
332 _condition: &Expression,
333 ) -> Result<Vec<Row>> {
334 let left_rows = self.execute_operator(left)?;
335 let right_rows = self.execute_operator(right)?;
336
337 let mut result = Vec::new();
340 for l_row in &left_rows {
341 for r_row in &right_rows {
342 let mut columns = l_row.columns.clone();
343 columns.extend(r_row.columns.clone());
344 let mut values = l_row.values.clone();
345 values.extend(r_row.values.clone());
346
347 result.push(Row { columns, values });
348 }
349 }
350
351 Ok(result)
352 }
353
354 fn execute_aggregate(
355 &mut self,
356 input: &PhysicalOperator,
357 aggregates: &[SelectColumn],
358 ) -> Result<Vec<Row>> {
359 let rows = self.execute_operator(input)?;
360
361 let mut result_columns = Vec::new();
362 let mut result_values = Vec::new();
363
364 for agg in aggregates {
365 if let SelectColumn::Aggregate {
366 function,
367 column,
368 alias,
369 } = agg
370 {
371 let col_name = match column.as_ref() {
372 SelectColumn::Wildcard => "*",
373 SelectColumn::Column { name, .. } => name.as_str(),
374 _ => continue,
375 };
376
377 let value = match function {
378 AggregateFunction::Count => Value::Integer(rows.len() as i64),
379 AggregateFunction::Sum => {
380 let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
381 if let Some(idx) = col_idx {
382 let sum: i64 = rows
383 .iter()
384 .filter_map(|r| match &r.values[idx] {
385 Value::Integer(i) => Some(i),
386 _ => None,
387 })
388 .sum();
389 Value::Integer(sum)
390 } else {
391 Value::Null
392 }
393 }
394 AggregateFunction::Avg => {
395 let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
396 if let Some(idx) = col_idx {
397 let values: Vec<i64> = rows
398 .iter()
399 .filter_map(|r| match &r.values[idx] {
400 Value::Integer(i) => Some(*i),
401 _ => None,
402 })
403 .collect();
404 if !values.is_empty() {
405 let sum: i64 = values.iter().sum();
406 Value::Float(sum as f64 / values.len() as f64)
407 } else {
408 Value::Null
409 }
410 } else {
411 Value::Null
412 }
413 }
414 AggregateFunction::Min => {
415 let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
416 if let Some(idx) = col_idx {
417 rows.iter()
418 .map(|r| &r.values[idx])
419 .min_by(|a, b| match (a, b) {
420 (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
421 _ => std::cmp::Ordering::Equal,
422 })
423 .cloned()
424 .unwrap_or(Value::Null)
425 } else {
426 Value::Null
427 }
428 }
429 AggregateFunction::Max => {
430 let col_idx = rows[0].columns.iter().position(|c| c.name == col_name);
431 if let Some(idx) = col_idx {
432 rows.iter()
433 .map(|r| &r.values[idx])
434 .max_by(|a, b| match (a, b) {
435 (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
436 _ => std::cmp::Ordering::Equal,
437 })
438 .cloned()
439 .unwrap_or(Value::Null)
440 } else {
441 Value::Null
442 }
443 }
444 };
445
446 let display_name = alias
447 .as_ref()
448 .cloned()
449 .unwrap_or_else(|| format!("{}({})", function, col_name));
450
451 result_columns.push(Column {
452 name: display_name.clone(),
453 alias: alias.clone(),
454 });
455 result_values.push(value);
456 }
457 }
458
459 Ok(vec![Row {
460 columns: result_columns,
461 values: result_values,
462 }])
463 }
464
465 fn evaluate_condition(&self, row: &Row, condition: &Expression) -> bool {
466 match condition {
467 Expression::Column(name) => {
468 row.columns.iter().any(|c| &c.name == name)
470 }
471 Expression::Literal(lit) => {
472 match lit {
474 Literal::Boolean(b) => *b,
475 _ => true,
476 }
477 }
478 Expression::BinaryOp { left, op, right } => {
479 let left_val = self.evaluate_expression(row, left);
480 let right_val = self.evaluate_expression(row, right);
481
482 if let (Some(l), Some(r)) = (left_val, right_val) {
483 l.compare(&r, op)
484 } else {
485 false
486 }
487 }
488 Expression::LogicalOp { left, op, right } => {
489 let left_result = self.evaluate_condition(row, left);
490 let right_result = self.evaluate_condition(row, right);
491
492 match op {
493 LogicalOperator::And => left_result && right_result,
494 LogicalOperator::Or => left_result || right_result,
495 }
496 }
497 Expression::Not(expr) => !self.evaluate_condition(row, expr),
498 Expression::Like { expr, pattern } => {
499 if let Some(Value::String(s)) = self.evaluate_expression(row, expr) {
500 let pattern = pattern.replace('%', "");
502 s.contains(&pattern)
503 } else {
504 false
505 }
506 }
507 Expression::In { expr, values } => {
508 self.evaluate_expression(row, expr).is_some_and(|val| {
509 values.iter().any(|lit| {
510 let lit_val = literal_to_value(lit);
511 val == lit_val
512 })
513 })
514 }
515 Expression::Between { expr, min, max } => {
516 if let (Some(val), Some(min_v), Some(max_v)) = (
517 self.evaluate_expression(row, expr),
518 self.evaluate_expression(row, min),
519 self.evaluate_expression(row, max),
520 ) {
521 val.compare(&min_v, &BinaryOperator::Ge)
522 && val.compare(&max_v, &BinaryOperator::Le)
523 } else {
524 false
525 }
526 }
527 }
528 }
529
530 fn evaluate_expression(&self, row: &Row, expr: &Expression) -> Option<Value> {
531 match expr {
532 Expression::Column(name) => row
533 .columns
534 .iter()
535 .position(|c| &c.name == name)
536 .and_then(|idx| row.values.get(idx).cloned()),
537 Expression::Literal(lit) => Some(literal_to_value(lit)),
538 _ => None,
539 }
540 }
541}
542
543fn literal_to_value(lit: &Literal) -> Value {
544 match lit {
545 Literal::Integer(i) => Value::Integer(*i),
546 Literal::Float(f) => Value::Float(*f),
547 Literal::String(s) => Value::String(s.clone()),
548 Literal::Boolean(b) => Value::Boolean(*b),
549 Literal::Null => Value::Null,
550 }
551}
552
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::parser::Parser;
    use crate::query::planner::Planner;

    /// Builds a two-column (`id`, `name`) row for the `users` fixture.
    fn user_row(id: i64, name: &str) -> Row {
        let columns = ["id", "name"]
            .iter()
            .map(|label| Column {
                name: label.to_string(),
                alias: None,
            })
            .collect();

        Row {
            columns,
            values: vec![Value::Integer(id), Value::String(name.to_string())],
        }
    }

    /// A `SELECT *` over a two-row table returns both rows.
    #[test]
    fn test_table_scan() {
        let mut context = ExecutionContext::new();
        context.data.insert(
            "users".to_string(),
            vec![user_row(1, "Alice"), user_row(2, "Bob")],
        );

        let mut executor = Executor::new(context);

        let mut parser = Parser::new("SELECT * FROM users").unwrap();
        let query = parser.parse().unwrap();
        let planner = Planner::new();
        let plan = planner.plan(&query).unwrap();

        let result = executor.execute(&plan).unwrap();
        assert_eq!(result.len(), 2);
    }
}