1use super::ast::*;
5use super::planner::{PhysicalOperator, PhysicalPlan};
6use crate::error::Result;
7use std::collections::HashMap;
8use std::fmt;
9use std::hash::{Hash, Hasher};
10
/// Hashable key identifying one group during `GROUP BY` execution.
///
/// Holds one [`GroupValue`] per grouping column, in grouping-column order, and is used
/// as the `HashMap` key in `execute_group_by`. `PartialEq` and `Hash` are implemented
/// by hand elsewhere in this file; both simply delegate to the wrapped vector.
#[derive(Debug, Clone, Eq)]
struct GroupKey(Vec<GroupValue>);
14
/// Hashable mirror of [`Value`] used as a component of [`GroupKey`].
///
/// `Value` stores an `f64` and only derives `PartialEq`, so it cannot derive `Eq`/`Hash`;
/// this type replaces the float with an integer so that it can.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
enum GroupValue {
    Integer(i64),
    /// Bit pattern of the original `f64` (`f64::to_bits` cast to `i64`).
    Float(i64),
    String(String),
    Boolean(bool),
    Null,
}
23
24impl From<&Value> for GroupValue {
25 fn from(value: &Value) -> Self {
26 match value {
27 Value::Integer(i) => GroupValue::Integer(*i),
28 Value::Float(f) => GroupValue::Float(f.to_bits() as i64),
29 Value::String(s) => GroupValue::String(s.clone()),
30 Value::Boolean(b) => GroupValue::Boolean(*b),
31 Value::Null => GroupValue::Null,
32 }
33 }
34}
35
36impl PartialEq for GroupKey {
37 fn eq(&self, other: &Self) -> bool {
38 self.0 == other.0
39 }
40}
41
42impl Hash for GroupKey {
43 fn hash<H: Hasher>(&self, state: &mut H) {
44 self.0.hash(state);
45 }
46}
47
/// A single result row.
///
/// `columns` and `values` are parallel vectors: the executor finds a column by its
/// position in `columns` and reads the value at the same position in `values`.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    /// Column metadata, one entry per value.
    pub columns: Vec<Column>,
    /// Cell values, positionally matched to `columns`.
    pub values: Vec<Value>,
}
54
/// Metadata describing one column of a [`Row`].
#[derive(Debug, Clone, PartialEq)]
pub struct Column {
    /// Column name; this is the field the executor matches against when resolving columns.
    pub name: String,
    /// Optional alias carried over from the select list. The lookups in this file
    /// match on `name` only.
    pub alias: Option<String>,
}
61
/// Runtime value of a single cell.
///
/// Only `PartialEq` is derived (no `Eq`/`Hash`) because of the `f64` payload; grouping
/// uses the hashable `GroupValue` mirror instead.
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Integer(i64),
    Float(f64),
    String(String),
    Boolean(bool),
    /// SQL `NULL`; displayed as `NULL`.
    Null,
}
71
72impl Value {
73 pub fn to_bytes(&self) -> Vec<u8> {
75 match self {
76 Value::Integer(i) => i.to_le_bytes().to_vec(),
77 Value::Float(f) => f.to_le_bytes().to_vec(),
78 Value::String(s) => s.as_bytes().to_vec(),
79 Value::Boolean(b) => vec![if *b { 1 } else { 0 }],
80 Value::Null => vec![],
81 }
82 }
83
84 pub fn compare(&self, other: &Value, op: &BinaryOperator) -> bool {
86 match (self, other) {
87 (Value::Integer(a), Value::Integer(b)) => match op {
88 BinaryOperator::Eq => a == b,
89 BinaryOperator::Ne => a != b,
90 BinaryOperator::Lt => a < b,
91 BinaryOperator::Le => a <= b,
92 BinaryOperator::Gt => a > b,
93 BinaryOperator::Ge => a >= b,
94 },
95 (Value::Float(a), Value::Float(b)) => match op {
96 BinaryOperator::Eq => (a - b).abs() < f64::EPSILON,
97 BinaryOperator::Ne => (a - b).abs() >= f64::EPSILON,
98 BinaryOperator::Lt => a < b,
99 BinaryOperator::Le => a <= b,
100 BinaryOperator::Gt => a > b,
101 BinaryOperator::Ge => a >= b,
102 },
103 (Value::String(a), Value::String(b)) => match op {
104 BinaryOperator::Eq => a == b,
105 BinaryOperator::Ne => a != b,
106 BinaryOperator::Lt => a < b,
107 BinaryOperator::Le => a <= b,
108 BinaryOperator::Gt => a > b,
109 BinaryOperator::Ge => a >= b,
110 },
111 (Value::Boolean(a), Value::Boolean(b)) => match op {
112 BinaryOperator::Eq => a == b,
113 BinaryOperator::Ne => a != b,
114 _ => false,
115 },
116 (Value::Null, Value::Null) => matches!(op, BinaryOperator::Eq),
117 _ => false,
118 }
119 }
120}
121
122impl fmt::Display for Value {
123 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
124 match self {
125 Value::Integer(i) => write!(f, "{}", i),
126 Value::Float(fl) => write!(f, "{}", fl),
127 Value::String(s) => write!(f, "{}", s),
128 Value::Boolean(b) => write!(f, "{}", b),
129 Value::Null => write!(f, "NULL"),
130 }
131 }
132}
133
/// In-memory data the executor runs plans against.
#[derive(Clone, Default)]
pub struct ExecutionContext {
    /// Table name -> rows of that table.
    pub data: HashMap<String, Vec<Row>>,
    /// Index name -> (key bytes -> row ids). The index scans use each row id as a
    /// position into the scanned table's row vector.
    pub indexes: HashMap<String, HashMap<Vec<u8>, Vec<u64>>>,
}
142
143impl ExecutionContext {
144 pub fn new() -> Self {
146 Self::default()
147 }
148}
149
/// Evaluates physical plans against an owned [`ExecutionContext`].
pub struct Executor {
    /// Tables and indexes read by the scan operators.
    context: ExecutionContext,
}
154
155impl Executor {
156 pub fn new(context: ExecutionContext) -> Self {
158 Self { context }
159 }
160
161 pub fn execute(&mut self, plan: &PhysicalPlan) -> Result<Vec<Row>> {
163 self.execute_operator(&plan.root)
164 }
165
166 fn execute_operator(&mut self, op: &PhysicalOperator) -> Result<Vec<Row>> {
167 match op {
168 PhysicalOperator::TableScan { table } => self.execute_table_scan(table),
169 PhysicalOperator::IndexScan { table, index, key } => {
170 self.execute_index_scan(table, index, key)
171 }
172 PhysicalOperator::IndexRangeScan {
173 table,
174 index,
175 start,
176 end,
177 } => self.execute_index_range_scan(table, index, start.as_deref(), end.as_deref()),
178 PhysicalOperator::Filter { input, condition } => self.execute_filter(input, condition),
179 PhysicalOperator::Sort { input, columns } => self.execute_sort(input, columns),
180 PhysicalOperator::Limit {
181 input,
182 count,
183 offset,
184 } => self.execute_limit(input, *count, *offset),
185 PhysicalOperator::Project { input, columns } => self.execute_project(input, columns),
186 PhysicalOperator::HashJoin {
187 left,
188 right,
189 join_type,
190 condition,
191 } => self.execute_hash_join(left, right, join_type, condition),
192 PhysicalOperator::GroupBy {
193 input,
194 group_columns,
195 aggregates,
196 having,
197 } => self.execute_group_by(input, group_columns, aggregates, having.as_ref()),
198 PhysicalOperator::Aggregate { input, aggregates } => {
199 self.execute_aggregate(input, aggregates)
200 }
201 }
202 }
203
204 fn execute_table_scan(&mut self, table: &str) -> Result<Vec<Row>> {
205 Ok(self.context.data.get(table).cloned().unwrap_or_default())
207 }
208
209 fn execute_index_scan(&mut self, table: &str, index: &str, key: &[u8]) -> Result<Vec<Row>> {
210 let row_ids = self
212 .context
213 .indexes
214 .get(index)
215 .and_then(|idx| idx.get(key))
216 .cloned()
217 .unwrap_or_default();
218
219 let all_rows = self.context.data.get(table).cloned().unwrap_or_default();
221 let result = row_ids
222 .iter()
223 .filter_map(|&id| all_rows.get(id as usize).cloned())
224 .collect();
225
226 Ok(result)
227 }
228
229 fn execute_index_range_scan(
230 &mut self,
231 table: &str,
232 index: &str,
233 start: Option<&[u8]>,
234 end: Option<&[u8]>,
235 ) -> Result<Vec<Row>> {
236 let index_data = self.context.indexes.get(index).cloned().unwrap_or_default();
238
239 let mut row_ids = Vec::new();
240 for (key, ids) in index_data {
241 let in_range = match (start, end) {
242 (Some(s), Some(e)) => key.as_slice() >= s && key.as_slice() <= e,
243 (Some(s), None) => key.as_slice() >= s,
244 (None, Some(e)) => key.as_slice() <= e,
245 (None, None) => true,
246 };
247
248 if in_range {
249 row_ids.extend(ids);
250 }
251 }
252
253 let all_rows = self.context.data.get(table).cloned().unwrap_or_default();
255 let result = row_ids
256 .iter()
257 .filter_map(|&id| all_rows.get(id as usize).cloned())
258 .collect();
259
260 Ok(result)
261 }
262
263 fn execute_filter(
264 &mut self,
265 input: &PhysicalOperator,
266 condition: &Expression,
267 ) -> Result<Vec<Row>> {
268 let rows = self.execute_operator(input)?;
269
270 let filtered = rows
271 .into_iter()
272 .filter(|row| self.evaluate_condition(row, condition))
273 .collect();
274
275 Ok(filtered)
276 }
277
278 fn execute_sort(
279 &mut self,
280 input: &PhysicalOperator,
281 columns: &[OrderByColumn],
282 ) -> Result<Vec<Row>> {
283 let mut rows = self.execute_operator(input)?;
284
285 rows.sort_by(|a, b| {
286 for col in columns {
287 let a_idx = a.columns.iter().position(|c| c.name == col.column);
288 let b_idx = b.columns.iter().position(|c| c.name == col.column);
289
290 if let (Some(a_idx), Some(b_idx)) = (a_idx, b_idx) {
291 let ordering = match (&a.values[a_idx], &b.values[b_idx]) {
292 (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
293 (Value::Float(a), Value::Float(b)) => {
294 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
295 }
296 (Value::String(a), Value::String(b)) => a.cmp(b),
297 (Value::Boolean(a), Value::Boolean(b)) => a.cmp(b),
298 _ => std::cmp::Ordering::Equal,
299 };
300
301 let ordering = match col.direction {
302 OrderDirection::Asc => ordering,
303 OrderDirection::Desc => ordering.reverse(),
304 };
305
306 if ordering != std::cmp::Ordering::Equal {
307 return ordering;
308 }
309 }
310 }
311 std::cmp::Ordering::Equal
312 });
313
314 Ok(rows)
315 }
316
317 fn execute_limit(
318 &mut self,
319 input: &PhysicalOperator,
320 count: usize,
321 offset: usize,
322 ) -> Result<Vec<Row>> {
323 let rows = self.execute_operator(input)?;
324 Ok(rows.into_iter().skip(offset).take(count).collect())
325 }
326
327 fn execute_project(
328 &mut self,
329 input: &PhysicalOperator,
330 columns: &[SelectColumn],
331 ) -> Result<Vec<Row>> {
332 let rows = self.execute_operator(input)?;
333
334 let projected = rows
335 .into_iter()
336 .map(|row| {
337 let mut new_columns = Vec::new();
338 let mut new_values = Vec::new();
339
340 for col in columns {
341 match col {
342 SelectColumn::Wildcard => {
343 new_columns.extend(row.columns.clone());
344 new_values.extend(row.values.clone());
345 }
346 SelectColumn::Column { name, alias } => {
347 if let Some(idx) = row.columns.iter().position(|c| &c.name == name) {
348 new_columns.push(Column {
349 name: name.clone(),
350 alias: alias.clone(),
351 });
352 new_values.push(row.values[idx].clone());
353 }
354 }
355 SelectColumn::Aggregate { .. } => {
356 }
358 }
359 }
360
361 Row {
362 columns: new_columns,
363 values: new_values,
364 }
365 })
366 .collect();
367
368 Ok(projected)
369 }
370
371 fn execute_hash_join(
372 &mut self,
373 left: &PhysicalOperator,
374 right: &PhysicalOperator,
375 join_type: &JoinType,
376 condition: &Expression,
377 ) -> Result<Vec<Row>> {
378 let left_rows = self.execute_operator(left)?;
379 let right_rows = self.execute_operator(right)?;
380
381 if right_rows.len() < 100 {
383 self.nested_loop_join(&left_rows, &right_rows, join_type, condition)
385 } else {
386 self.hash_join_impl(&left_rows, &right_rows, join_type, condition)
388 }
389 }
390
391 fn nested_loop_join(
393 &mut self,
394 left_rows: &[Row],
395 right_rows: &[Row],
396 join_type: &JoinType,
397 condition: &Expression,
398 ) -> Result<Vec<Row>> {
399 let mut result = Vec::new();
400
401 match join_type {
402 JoinType::Inner => {
403 for l_row in left_rows {
404 for r_row in right_rows {
405 if self.evaluate_join_condition(l_row, r_row, condition) {
406 result.push(self.merge_rows(l_row, r_row));
407 }
408 }
409 }
410 }
411 JoinType::Left => {
412 for l_row in left_rows {
413 let mut matched = false;
414 for r_row in right_rows {
415 if self.evaluate_join_condition(l_row, r_row, condition) {
416 result.push(self.merge_rows(l_row, r_row));
417 matched = true;
418 }
419 }
420 if !matched {
421 result.push(self.merge_rows_with_null(l_row, right_rows[0].columns.len()));
423 }
424 }
425 }
426 JoinType::Right => {
427 for r_row in right_rows {
428 let mut matched = false;
429 for l_row in left_rows {
430 if self.evaluate_join_condition(l_row, r_row, condition) {
431 result.push(self.merge_rows(l_row, r_row));
432 matched = true;
433 }
434 }
435 if !matched {
436 result.push(self.merge_null_with_row(left_rows[0].columns.len(), r_row));
438 }
439 }
440 }
441 JoinType::Full => {
442 let mut left_matched = vec![false; left_rows.len()];
443 let mut right_matched = vec![false; right_rows.len()];
444
445 for (l_idx, l_row) in left_rows.iter().enumerate() {
446 for (r_idx, r_row) in right_rows.iter().enumerate() {
447 if self.evaluate_join_condition(l_row, r_row, condition) {
448 result.push(self.merge_rows(l_row, r_row));
449 left_matched[l_idx] = true;
450 right_matched[r_idx] = true;
451 }
452 }
453 }
454
455 for (idx, matched) in left_matched.iter().enumerate() {
457 if !*matched {
458 result.push(
459 self.merge_rows_with_null(&left_rows[idx], right_rows[0].columns.len()),
460 );
461 }
462 }
463
464 for (idx, matched) in right_matched.iter().enumerate() {
466 if !*matched {
467 result.push(
468 self.merge_null_with_row(left_rows[0].columns.len(), &right_rows[idx]),
469 );
470 }
471 }
472 }
473 }
474
475 Ok(result)
476 }
477
478 fn hash_join_impl(
480 &mut self,
481 left_rows: &[Row],
482 right_rows: &[Row],
483 join_type: &JoinType,
484 condition: &Expression,
485 ) -> Result<Vec<Row>> {
486 let mut hash_table: HashMap<Vec<u8>, Vec<&Row>> = HashMap::new();
488
489 for r_row in right_rows {
490 let key = self.extract_join_key(r_row, condition, true);
491 hash_table.entry(key).or_default().push(r_row);
492 }
493
494 let mut result = Vec::new();
495
496 match join_type {
497 JoinType::Inner => {
498 for l_row in left_rows {
499 let key = self.extract_join_key(l_row, condition, false);
500 if let Some(matching_rows) = hash_table.get(&key) {
501 for r_row in matching_rows {
502 if self.evaluate_join_condition(l_row, r_row, condition) {
503 result.push(self.merge_rows(l_row, r_row));
504 }
505 }
506 }
507 }
508 }
509 JoinType::Left => {
510 for l_row in left_rows {
511 let key = self.extract_join_key(l_row, condition, false);
512 if let Some(matching_rows) = hash_table.get(&key) {
513 let mut matched = false;
514 for r_row in matching_rows {
515 if self.evaluate_join_condition(l_row, r_row, condition) {
516 result.push(self.merge_rows(l_row, r_row));
517 matched = true;
518 }
519 }
520 if !matched {
521 result.push(
522 self.merge_rows_with_null(l_row, right_rows[0].columns.len()),
523 );
524 }
525 } else {
526 result.push(self.merge_rows_with_null(l_row, right_rows[0].columns.len()));
527 }
528 }
529 }
530 JoinType::Right | JoinType::Full => {
531 return self.nested_loop_join(left_rows, right_rows, join_type, condition);
534 }
535 }
536
537 Ok(result)
538 }
539
540 fn extract_join_key(&self, row: &Row, condition: &Expression, is_right: bool) -> Vec<u8> {
542 if let Expression::BinaryOp { left, right, .. } = condition {
544 if let (Expression::Column(left_col), Expression::Column(right_col)) =
545 (left.as_ref(), right.as_ref())
546 {
547 let col_name = if is_right { right_col } else { left_col };
548
549 let column_name = if let Some(dot_pos) = col_name.rfind('.') {
551 &col_name[dot_pos + 1..]
552 } else {
553 col_name
554 };
555
556 if let Some(idx) = row.columns.iter().position(|c| c.name == column_name) {
557 return row.values[idx].to_bytes();
558 }
559 }
560 }
561 vec![]
562 }
563
564 fn evaluate_join_condition(&self, left: &Row, right: &Row, condition: &Expression) -> bool {
566 match condition {
567 Expression::BinaryOp {
568 left: l_expr,
569 op,
570 right: r_expr,
571 } => {
572 let left_val = self.evaluate_expression_for_row(l_expr, left, right, true);
573 let right_val = self.evaluate_expression_for_row(r_expr, left, right, false);
574
575 if let (Some(lv), Some(rv)) = (left_val, right_val) {
576 return lv.compare(&rv, op);
577 }
578 false
579 }
580 Expression::LogicalOp {
581 left: l_expr,
582 op,
583 right: r_expr,
584 } => {
585 let left_result = self.evaluate_join_condition(left, right, l_expr);
586 let right_result = self.evaluate_join_condition(left, right, r_expr);
587
588 match op {
589 LogicalOperator::And => left_result && right_result,
590 LogicalOperator::Or => left_result || right_result,
591 }
592 }
593 _ => true, }
595 }
596
597 fn evaluate_expression_for_row(
599 &self,
600 expr: &Expression,
601 left_row: &Row,
602 right_row: &Row,
603 is_left: bool,
604 ) -> Option<Value> {
605 match expr {
606 Expression::Column(name) => {
607 let column_name = if let Some(dot_pos) = name.rfind('.') {
609 &name[dot_pos + 1..]
610 } else {
611 name
612 };
613
614 let row = if is_left { left_row } else { right_row };
616 row.columns
617 .iter()
618 .position(|c| c.name == column_name)
619 .map(|idx| row.values[idx].clone())
620 }
621 Expression::Literal(lit) => Some(self.literal_to_value(lit)),
622 _ => None,
623 }
624 }
625
626 fn merge_rows(&self, left: &Row, right: &Row) -> Row {
628 let mut columns = left.columns.clone();
629 columns.extend(right.columns.clone());
630 let mut values = left.values.clone();
631 values.extend(right.values.clone());
632 Row { columns, values }
633 }
634
635 fn merge_rows_with_null(&self, left: &Row, right_col_count: usize) -> Row {
637 let columns = left.columns.clone();
638 let mut values = left.values.clone();
639 for _ in 0..right_col_count {
640 values.push(Value::Null);
641 }
642 Row { columns, values }
643 }
644
645 fn merge_null_with_row(&self, left_col_count: usize, right: &Row) -> Row {
647 let mut columns = Vec::new();
648 let mut values = Vec::new();
649 for _ in 0..left_col_count {
650 values.push(Value::Null);
651 }
652 columns.extend(right.columns.clone());
653 values.extend(right.values.clone());
654 Row { columns, values }
655 }
656
657 fn literal_to_value(&self, lit: &Literal) -> Value {
659 match lit {
660 Literal::Integer(i) => Value::Integer(*i),
661 Literal::Float(f) => Value::Float(*f),
662 Literal::String(s) => Value::String(s.clone()),
663 Literal::Boolean(b) => Value::Boolean(*b),
664 Literal::Null => Value::Null,
665 }
666 }
667
668 fn execute_group_by(
669 &mut self,
670 input: &PhysicalOperator,
671 group_columns: &[String],
672 aggregates: &[SelectColumn],
673 having: Option<&Expression>,
674 ) -> Result<Vec<Row>> {
675 let rows = self.execute_operator(input)?;
676
677 if rows.is_empty() {
678 return Ok(Vec::new());
679 }
680
681 let mut groups: HashMap<GroupKey, Vec<Row>> = HashMap::new();
683
684 for row in rows {
685 let mut key_values = Vec::new();
687 for group_col in group_columns {
688 if let Some(col_idx) = row.columns.iter().position(|c| &c.name == group_col) {
689 key_values.push(GroupValue::from(&row.values[col_idx]));
690 } else {
691 key_values.push(GroupValue::Null);
692 }
693 }
694
695 let group_key = GroupKey(key_values);
696 groups.entry(group_key).or_default().push(row);
697 }
698
699 let mut result_rows = Vec::new();
701
702 for (group_key, group_rows) in groups {
703 let mut result_columns = Vec::new();
704 let mut result_values = Vec::new();
705
706 for (i, col_name) in group_columns.iter().enumerate() {
708 result_columns.push(Column {
709 name: col_name.clone(),
710 alias: None,
711 });
712 let value = match &group_key.0[i] {
714 GroupValue::Integer(i) => Value::Integer(*i),
715 GroupValue::Float(bits) => Value::Float(f64::from_bits(*bits as u64)),
716 GroupValue::String(s) => Value::String(s.clone()),
717 GroupValue::Boolean(b) => Value::Boolean(*b),
718 GroupValue::Null => Value::Null,
719 };
720 result_values.push(value);
721 }
722
723 for agg in aggregates {
725 if let SelectColumn::Aggregate {
726 function,
727 column,
728 alias,
729 } = agg
730 {
731 let col_name = match column.as_ref() {
732 SelectColumn::Wildcard => "*",
733 SelectColumn::Column { name, .. } => name.as_str(),
734 _ => continue,
735 };
736
737 let value = self.compute_aggregate(function, col_name, &group_rows)?;
738
739 let display_name = alias
740 .as_ref()
741 .cloned()
742 .unwrap_or_else(|| format!("{}({})", function, col_name));
743
744 result_columns.push(Column {
745 name: display_name.clone(),
746 alias: alias.clone(),
747 });
748 result_values.push(value);
749 } else if let SelectColumn::Column { name, alias: _ } = agg {
750 if !group_columns.contains(name) {
752 continue;
754 }
755 }
756 }
757
758 let row = Row {
759 columns: result_columns,
760 values: result_values,
761 };
762
763 if let Some(having_condition) = having {
765 if self.evaluate_condition(&row, having_condition) {
766 result_rows.push(row);
767 }
768 } else {
769 result_rows.push(row);
770 }
771 }
772
773 Ok(result_rows)
774 }
775
776 fn compute_aggregate(
777 &self,
778 function: &AggregateFunction,
779 col_name: &str,
780 rows: &[Row],
781 ) -> Result<Value> {
782 if rows.is_empty() {
783 return Ok(Value::Null);
784 }
785
786 match function {
787 AggregateFunction::Count => {
788 if col_name == "*" {
789 Ok(Value::Integer(rows.len() as i64))
791 } else {
792 let col_idx = rows
794 .iter()
795 .find_map(|r| r.columns.iter().position(|c| c.name == col_name));
796
797 if let Some(idx) = col_idx {
798 let count = rows
799 .iter()
800 .filter(|r| {
801 idx < r.values.len() && !matches!(r.values[idx], Value::Null)
802 })
803 .count();
804 Ok(Value::Integer(count as i64))
805 } else {
806 Ok(Value::Integer(0))
807 }
808 }
809 }
810 AggregateFunction::Sum => {
811 let col_idx = rows
813 .iter()
814 .find_map(|r| r.columns.iter().position(|c| c.name == col_name));
815
816 if let Some(idx) = col_idx {
817 let sum: i64 = rows
818 .iter()
819 .filter_map(|r| {
820 if idx < r.values.len() {
821 match &r.values[idx] {
822 Value::Integer(i) => Some(i),
823 _ => None,
824 }
825 } else {
826 None
827 }
828 })
829 .sum();
830 Ok(Value::Integer(sum))
831 } else {
832 Ok(Value::Null)
833 }
834 }
835 AggregateFunction::Avg => {
836 let col_idx = rows
837 .iter()
838 .find_map(|r| r.columns.iter().position(|c| c.name == col_name));
839
840 if let Some(idx) = col_idx {
841 let values: Vec<i64> = rows
842 .iter()
843 .filter_map(|r| {
844 if idx < r.values.len() {
845 match &r.values[idx] {
846 Value::Integer(i) => Some(*i),
847 _ => None,
848 }
849 } else {
850 None
851 }
852 })
853 .collect();
854 if !values.is_empty() {
855 let sum: i64 = values.iter().sum();
856 Ok(Value::Float(sum as f64 / values.len() as f64))
857 } else {
858 Ok(Value::Null)
859 }
860 } else {
861 Ok(Value::Null)
862 }
863 }
864 AggregateFunction::Min => {
865 let col_idx = rows
866 .iter()
867 .find_map(|r| r.columns.iter().position(|c| c.name == col_name));
868
869 if let Some(idx) = col_idx {
870 Ok(rows
871 .iter()
872 .filter_map(|r| {
873 if idx < r.values.len() {
874 Some(&r.values[idx])
875 } else {
876 None
877 }
878 })
879 .min_by(|a, b| match (a, b) {
880 (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
881 (Value::Float(a), Value::Float(b)) => {
882 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
883 }
884 (Value::String(a), Value::String(b)) => a.cmp(b),
885 _ => std::cmp::Ordering::Equal,
886 })
887 .cloned()
888 .unwrap_or(Value::Null))
889 } else {
890 Ok(Value::Null)
891 }
892 }
893 AggregateFunction::Max => {
894 let col_idx = rows
895 .iter()
896 .find_map(|r| r.columns.iter().position(|c| c.name == col_name));
897
898 if let Some(idx) = col_idx {
899 Ok(rows
900 .iter()
901 .filter_map(|r| {
902 if idx < r.values.len() {
903 Some(&r.values[idx])
904 } else {
905 None
906 }
907 })
908 .max_by(|a, b| match (a, b) {
909 (Value::Integer(a), Value::Integer(b)) => a.cmp(b),
910 (Value::Float(a), Value::Float(b)) => {
911 a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal)
912 }
913 (Value::String(a), Value::String(b)) => a.cmp(b),
914 _ => std::cmp::Ordering::Equal,
915 })
916 .cloned()
917 .unwrap_or(Value::Null))
918 } else {
919 Ok(Value::Null)
920 }
921 }
922 }
923 }
924
925 fn execute_aggregate(
926 &mut self,
927 input: &PhysicalOperator,
928 aggregates: &[SelectColumn],
929 ) -> Result<Vec<Row>> {
930 let rows = self.execute_operator(input)?;
931
932 let mut result_columns = Vec::new();
933 let mut result_values = Vec::new();
934
935 for agg in aggregates {
936 if let SelectColumn::Aggregate {
937 function,
938 column,
939 alias,
940 } = agg
941 {
942 let col_name = match column.as_ref() {
943 SelectColumn::Wildcard => "*",
944 SelectColumn::Column { name, .. } => name.as_str(),
945 _ => continue,
946 };
947
948 let value = self.compute_aggregate(function, col_name, &rows)?;
949
950 let display_name = alias
951 .as_ref()
952 .cloned()
953 .unwrap_or_else(|| format!("{}({})", function, col_name));
954
955 result_columns.push(Column {
956 name: display_name.clone(),
957 alias: alias.clone(),
958 });
959 result_values.push(value);
960 }
961 }
962
963 Ok(vec![Row {
964 columns: result_columns,
965 values: result_values,
966 }])
967 }
968
969 fn evaluate_condition(&self, row: &Row, condition: &Expression) -> bool {
970 match condition {
971 Expression::Column(name) => {
972 row.columns.iter().any(|c| &c.name == name)
974 }
975 Expression::Literal(lit) => {
976 match lit {
978 Literal::Boolean(b) => *b,
979 _ => true,
980 }
981 }
982 Expression::BinaryOp { left, op, right } => {
983 let left_val = self.evaluate_expression(row, left);
984 let right_val = self.evaluate_expression(row, right);
985
986 if let (Some(l), Some(r)) = (left_val, right_val) {
987 l.compare(&r, op)
988 } else {
989 false
990 }
991 }
992 Expression::LogicalOp { left, op, right } => {
993 let left_result = self.evaluate_condition(row, left);
994 let right_result = self.evaluate_condition(row, right);
995
996 match op {
997 LogicalOperator::And => left_result && right_result,
998 LogicalOperator::Or => left_result || right_result,
999 }
1000 }
1001 Expression::Not(expr) => !self.evaluate_condition(row, expr),
1002 Expression::Like { expr, pattern } => {
1003 if let Some(Value::String(s)) = self.evaluate_expression(row, expr) {
1004 let pattern = pattern.replace('%', "");
1006 s.contains(&pattern)
1007 } else {
1008 false
1009 }
1010 }
1011 Expression::In { expr, values } => {
1012 self.evaluate_expression(row, expr).is_some_and(|val| {
1013 values.iter().any(|lit| {
1014 let lit_val = literal_to_value(lit);
1015 val == lit_val
1016 })
1017 })
1018 }
1019 Expression::Between { expr, min, max } => {
1020 if let (Some(val), Some(min_v), Some(max_v)) = (
1021 self.evaluate_expression(row, expr),
1022 self.evaluate_expression(row, min),
1023 self.evaluate_expression(row, max),
1024 ) {
1025 val.compare(&min_v, &BinaryOperator::Ge)
1026 && val.compare(&max_v, &BinaryOperator::Le)
1027 } else {
1028 false
1029 }
1030 }
1031 }
1032 }
1033
1034 fn evaluate_expression(&self, row: &Row, expr: &Expression) -> Option<Value> {
1035 match expr {
1036 Expression::Column(name) => row
1037 .columns
1038 .iter()
1039 .position(|c| &c.name == name)
1040 .and_then(|idx| row.values.get(idx).cloned()),
1041 Expression::Literal(lit) => Some(literal_to_value(lit)),
1042 _ => None,
1043 }
1044 }
1045}
1046
1047fn literal_to_value(lit: &Literal) -> Value {
1048 match lit {
1049 Literal::Integer(i) => Value::Integer(*i),
1050 Literal::Float(f) => Value::Float(*f),
1051 Literal::String(s) => Value::String(s.clone()),
1052 Literal::Boolean(b) => Value::Boolean(*b),
1053 Literal::Null => Value::Null,
1054 }
1055}
1056
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::parser::Parser;
    use crate::query::planner::Planner;

    /// Builds a row of the two-column (`id`, `name`) `users` table.
    fn user_row(id: i64, name: &str) -> Row {
        let columns = ["id", "name"]
            .iter()
            .map(|label| Column {
                name: label.to_string(),
                alias: None,
            })
            .collect();

        Row {
            columns,
            values: vec![Value::Integer(id), Value::String(name.to_string())],
        }
    }

    /// A `SELECT *` over a two-row table returns both rows.
    #[test]
    fn test_table_scan() {
        let mut context = ExecutionContext::new();
        context.data.insert(
            "users".to_string(),
            vec![user_row(1, "Alice"), user_row(2, "Bob")],
        );

        let mut executor = Executor::new(context);

        let mut parser = Parser::new("SELECT * FROM users").unwrap();
        let query = parser.parse().unwrap();
        let planner = Planner::new();
        let plan = planner.plan(&query).unwrap();

        let result = executor.execute(&plan).unwrap();
        assert_eq!(result.len(), 2);
    }
}