1use crate::ast::{AggregateFunc, BinaryOp, ColumnRef, Expr, SortOrder, UnaryOp};
8use crate::executor::{
9 AggregateExecutor, HashJoin, LimitExecutor, Relation, RelationEntry,
10 SharedTables, SortExecutor, SortMergeJoin,
11};
12use crate::planner::PhysicalPlan;
13use alloc::boxed::Box;
14use alloc::collections::BTreeMap;
15use alloc::rc::Rc;
16use alloc::string::String;
17use alloc::vec;
18use alloc::vec::Vec;
19use cynos_core::{Row, Value};
20use cynos_jsonb::{JsonbObject, JsonbValue, JsonPath};
21
/// Column-resolution context for a (possibly joined) relation.
///
/// `tables[i]` names the i-th table contributing columns to a row, and
/// `table_column_counts[i]` is the number of consecutive columns that table
/// contributes, in the same order.
#[derive(Clone, Debug)]
pub struct EvalContext<'a> {
    /// Table names, in the order their columns appear in a row.
    pub tables: &'a [String],
    /// Number of columns contributed by each entry of `tables`.
    pub table_column_counts: &'a [usize],
}

impl<'a> EvalContext<'a> {
    /// Creates a context over borrowed table names and per-table column counts.
    #[inline]
    pub fn new(tables: &'a [String], table_column_counts: &'a [usize]) -> Self {
        EvalContext {
            tables,
            table_column_counts,
        }
    }

    /// Maps a table-relative column index to its absolute position in a row.
    ///
    /// The result is `table_relative_index` plus the column counts of every
    /// table listed before the first table named `table_name`. A table with no
    /// recorded column count contributes 0. If `table_name` is not listed at
    /// all, `table_relative_index` is returned unchanged.
    #[inline]
    pub fn resolve_column_index(&self, table_name: &str, table_relative_index: usize) -> usize {
        match self.tables.iter().position(|name| name == table_name) {
            Some(slot) => {
                let preceding: usize = (0..slot)
                    .map(|i| self.table_column_counts.get(i).copied().unwrap_or(0))
                    .sum();
                preceding + table_relative_index
            }
            None => table_relative_index,
        }
    }
}
54
/// Errors produced while executing a physical plan.
#[derive(Clone, Debug)]
pub enum ExecutionError {
    /// The named table could not be found.
    TableNotFound(String),
    /// The named index could not be found on `table`.
    IndexNotFound { table: String, index: String },
    /// The named column could not be found on `table`.
    ColumnNotFound { table: String, column: String },
    /// A value had an unexpected type; the payload describes the mismatch.
    TypeMismatch(String),
    /// The requested operation is not valid; the payload explains why.
    InvalidOperation(String),
}

impl core::fmt::Display for ExecutionError {
    /// Renders the error as a short, human-readable message.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::TableNotFound(name) => write!(f, "Table not found: {}", name),
            Self::IndexNotFound { table, index } => {
                write!(f, "Index {} not found on table {}", index, table)
            }
            Self::ColumnNotFound { table, column } => {
                write!(f, "Column {}.{} not found", table, column)
            }
            Self::TypeMismatch(detail) => write!(f, "Type mismatch: {}", detail),
            Self::InvalidOperation(detail) => write!(f, "Invalid operation: {}", detail),
        }
    }
}
85
/// One compiled element of the simplified regex dialect produced by
/// `compile_regex`. Matching semantics live in the matcher, which is outside
/// this view.
#[derive(Clone, Debug, PartialEq)]
enum RegexOp {
    /// `^` as the very first pattern character.
    Start,
    /// `$` as the very last pattern character.
    End,
    /// A literal character. This covers plain characters, `\n` `\t` `\r`, any
    /// other escaped character, and `^` / `$` that are not at the pattern edges.
    Char(char),
    /// `.`
    Any,
    /// `\d`
    Digit,
    /// `\D`
    NonDigit,
    /// `\w`
    Word,
    /// `\W`
    NonWord,
    /// `\s`
    Whitespace,
    /// `\S`
    NonWhitespace,
    /// A bracket class `[...]`.
    CharClass(Vec<CharClassItem>),
    /// A negated bracket class `[^...]`.
    NegCharClass(Vec<CharClassItem>),
    /// The wrapped single element followed by `*`.
    Star(Box<RegexOp>),
    /// The wrapped single element followed by `+`.
    Plus(Box<RegexOp>),
    /// The wrapped single element followed by `?`.
    Question(Box<RegexOp>),
    /// `(`
    GroupStart,
    /// `)`
    GroupEnd,
    /// `*` directly after `)`; emitted immediately after `GroupEnd`.
    GroupStar,
    /// `+` directly after `)`; emitted immediately after `GroupEnd`.
    GroupPlus,
    /// `?` directly after `)`; emitted immediately after `GroupEnd`.
    GroupQuestion,
    /// `|`
    Alternation,
}
111
/// One member of a bracket character class (`[...]`).
#[derive(Clone, Debug, PartialEq)]
enum CharClassItem {
    /// A single character.
    Char(char),
    /// A range written `start-end`, stored as given without normalising order.
    /// NOTE(review): inclusivity is decided by the matcher (not shown here) —
    /// presumably inclusive on both ends.
    Range(char, char),
}
118
/// Result type used throughout plan execution.
pub type ExecutionResult<T> = Result<T, ExecutionError>;
121
122pub trait DataSource {
127 fn get_table_rows(&self, table: &str) -> ExecutionResult<Vec<Rc<Row>>>;
129
130 fn get_index_range(
132 &self,
133 table: &str,
134 index: &str,
135 range_start: Option<&Value>,
136 range_end: Option<&Value>,
137 include_start: bool,
138 include_end: bool,
139 ) -> ExecutionResult<Vec<Rc<Row>>> {
140 self.get_index_range_with_limit(
142 table,
143 index,
144 range_start,
145 range_end,
146 include_start,
147 include_end,
148 None,
149 0,
150 false,
151 )
152 }
153
154 fn get_index_range_with_limit(
157 &self,
158 table: &str,
159 index: &str,
160 range_start: Option<&Value>,
161 range_end: Option<&Value>,
162 include_start: bool,
163 include_end: bool,
164 limit: Option<usize>,
165 offset: usize,
166 reverse: bool,
167 ) -> ExecutionResult<Vec<Rc<Row>>>;
168
169 fn get_index_point(&self, table: &str, index: &str, key: &Value) -> ExecutionResult<Vec<Rc<Row>>>;
171
172 fn get_index_point_with_limit(
174 &self,
175 table: &str,
176 index: &str,
177 key: &Value,
178 limit: Option<usize>,
179 ) -> ExecutionResult<Vec<Rc<Row>>> {
180 let rows = self.get_index_point(table, index, key)?;
182 Ok(if let Some(limit) = limit {
183 rows.into_iter().take(limit).collect()
184 } else {
185 rows
186 })
187 }
188
189 fn get_column_count(&self, table: &str) -> ExecutionResult<usize>;
191
192 fn get_gin_index_rows(
195 &self,
196 table: &str,
197 index: &str,
198 key: &str,
199 value: &str,
200 ) -> ExecutionResult<Vec<Rc<Row>>> {
201 let _ = (index, key, value);
203 self.get_table_rows(table)
204 }
205
206 fn get_gin_index_rows_by_key(
209 &self,
210 table: &str,
211 index: &str,
212 key: &str,
213 ) -> ExecutionResult<Vec<Rc<Row>>> {
214 let _ = (index, key);
216 self.get_table_rows(table)
217 }
218
219 fn get_gin_index_rows_multi(
222 &self,
223 table: &str,
224 index: &str,
225 pairs: &[(&str, &str)],
226 ) -> ExecutionResult<Vec<Rc<Row>>> {
227 let _ = (index, pairs);
229 self.get_table_rows(table)
230 }
231}
232
/// Executes [`PhysicalPlan`] trees, reading all data through a borrowed
/// [`DataSource`].
pub struct PhysicalPlanRunner<'a, D: DataSource> {
    /// Source of table rows, index lookups and column counts.
    data_source: &'a D,
}
240
241impl<'a, D: DataSource> PhysicalPlanRunner<'a, D> {
242 pub fn new(data_source: &'a D) -> Self {
244 Self { data_source }
245 }
246
247 pub fn execute(&self, plan: &PhysicalPlan) -> ExecutionResult<Relation> {
249 match plan {
250 PhysicalPlan::TableScan { table } => self.execute_table_scan(table),
251
252 PhysicalPlan::IndexScan {
253 table,
254 index,
255 range_start,
256 range_end,
257 include_start,
258 include_end,
259 limit,
260 offset,
261 reverse,
262 } => self.execute_index_scan(
263 table,
264 index,
265 range_start.as_ref(),
266 range_end.as_ref(),
267 *include_start,
268 *include_end,
269 *limit,
270 *offset,
271 *reverse,
272 ),
273
274 PhysicalPlan::IndexGet { table, index, key, limit } => {
275 self.execute_index_get(table, index, key, *limit)
276 }
277
278 PhysicalPlan::IndexInGet { table, index, keys } => {
279 self.execute_index_in_get(table, index, keys)
280 }
281
282 PhysicalPlan::GinIndexScan {
283 table,
284 index,
285 key,
286 value,
287 query_type,
288 } => self.execute_gin_index_scan(table, index, key, value.as_deref(), query_type),
289
290 PhysicalPlan::GinIndexScanMulti {
291 table,
292 index,
293 pairs,
294 } => self.execute_gin_index_scan_multi(table, index, pairs),
295
296 PhysicalPlan::Filter { input, predicate } => {
297 let input_rel = self.execute(input)?;
298 self.execute_filter(input_rel, predicate)
299 }
300
301 PhysicalPlan::Project { input, columns } => {
302 let input_rel = self.execute(input)?;
303 self.execute_project(input_rel, columns)
304 }
305
306 PhysicalPlan::HashJoin {
307 left,
308 right,
309 condition,
310 join_type,
311 } => {
312 let left_rel = self.execute(left)?;
313 let right_rel = self.execute(right)?;
314 self.execute_hash_join(left_rel, right_rel, condition, *join_type)
315 }
316
317 PhysicalPlan::SortMergeJoin {
318 left,
319 right,
320 condition,
321 join_type,
322 } => {
323 let left_rel = self.execute(left)?;
324 let right_rel = self.execute(right)?;
325 self.execute_sort_merge_join(left_rel, right_rel, condition, *join_type)
326 }
327
328 PhysicalPlan::NestedLoopJoin {
329 left,
330 right,
331 condition,
332 join_type,
333 } => {
334 let left_rel = self.execute(left)?;
335 let right_rel = self.execute(right)?;
336 self.execute_nested_loop_join(left_rel, right_rel, condition, *join_type)
337 }
338
339 PhysicalPlan::IndexNestedLoopJoin {
340 outer,
341 inner_table,
342 inner_index,
343 condition,
344 join_type,
345 } => {
346 let outer_rel = self.execute(outer)?;
347 self.execute_index_nested_loop_join(
348 outer_rel,
349 inner_table,
350 inner_index,
351 condition,
352 *join_type,
353 )
354 }
355
356 PhysicalPlan::HashAggregate {
357 input,
358 group_by,
359 aggregates,
360 } => {
361 let input_rel = self.execute(input)?;
362 self.execute_hash_aggregate(input_rel, group_by, aggregates)
363 }
364
365 PhysicalPlan::Sort { input, order_by } => {
366 let input_rel = self.execute(input)?;
367 self.execute_sort(input_rel, order_by)
368 }
369
370 PhysicalPlan::Limit {
371 input,
372 limit,
373 offset,
374 } => {
375 let input_rel = self.execute(input)?;
376 self.execute_limit(input_rel, *limit, *offset)
377 }
378
379 PhysicalPlan::CrossProduct { left, right } => {
380 let left_rel = self.execute(left)?;
381 let right_rel = self.execute(right)?;
382 self.execute_cross_product(left_rel, right_rel)
383 }
384
385 PhysicalPlan::NoOp { input } => self.execute(input),
386
387 PhysicalPlan::TopN {
388 input,
389 order_by,
390 limit,
391 offset,
392 } => {
393 let input_rel = self.execute(input)?;
396 let sorted = self.execute_sort(input_rel, order_by)?;
397 self.execute_limit(sorted, *limit, *offset)
398 }
399
400 PhysicalPlan::Empty => Ok(Relation::empty()),
401 }
402 }
403
404 fn execute_table_scan(&self, table: &str) -> ExecutionResult<Relation> {
407 let rows = self.data_source.get_table_rows(table)?;
408 let column_count = self.data_source.get_column_count(table)?;
409 Ok(Relation::from_rows_with_column_count(rows, alloc::vec![table.into()], column_count))
410 }
411
412 fn execute_index_scan(
413 &self,
414 table: &str,
415 index: &str,
416 range_start: Option<&Value>,
417 range_end: Option<&Value>,
418 include_start: bool,
419 include_end: bool,
420 limit: Option<usize>,
421 offset: Option<usize>,
422 reverse: bool,
423 ) -> ExecutionResult<Relation> {
424 let rows = self.data_source.get_index_range_with_limit(
426 table,
427 index,
428 range_start,
429 range_end,
430 include_start,
431 include_end,
432 limit,
433 offset.unwrap_or(0),
434 reverse,
435 )?;
436 let column_count = self.data_source.get_column_count(table)?;
437 Ok(Relation::from_rows_with_column_count(rows, alloc::vec![table.into()], column_count))
438 }
439
440 fn execute_index_get(
441 &self,
442 table: &str,
443 index: &str,
444 key: &Value,
445 limit: Option<usize>,
446 ) -> ExecutionResult<Relation> {
447 let rows = self.data_source.get_index_point_with_limit(table, index, key, limit)?;
448 let column_count = self.data_source.get_column_count(table)?;
449 Ok(Relation::from_rows_with_column_count(rows, alloc::vec![table.into()], column_count))
450 }
451
452 fn execute_index_in_get(
455 &self,
456 table: &str,
457 index: &str,
458 keys: &[Value],
459 ) -> ExecutionResult<Relation> {
460 let mut all_rows = Vec::new();
461 let mut seen_ids = alloc::collections::BTreeSet::new();
462
463 for key in keys {
465 let rows = self.data_source.get_index_point(table, index, key)?;
466 for row in rows {
467 if seen_ids.insert(row.id()) {
469 all_rows.push(row);
470 }
471 }
472 }
473
474 let column_count = self.data_source.get_column_count(table)?;
475 Ok(Relation::from_rows_with_column_count(all_rows, alloc::vec![table.into()], column_count))
476 }
477
478 fn execute_gin_index_scan(
479 &self,
480 table: &str,
481 index: &str,
482 key: &str,
483 value: Option<&str>,
484 query_type: &str,
485 ) -> ExecutionResult<Relation> {
486 let rows = match (query_type, value) {
487 ("eq", Some(v)) => self.data_source.get_gin_index_rows(table, index, key, v)?,
488 ("contains", _) | ("exists", _) => {
489 self.data_source.get_gin_index_rows_by_key(table, index, key)?
490 }
491 _ => self.data_source.get_table_rows(table)?,
492 };
493 let column_count = self.data_source.get_column_count(table)?;
494 Ok(Relation::from_rows_with_column_count(rows, alloc::vec![table.into()], column_count))
495 }
496
497 fn execute_gin_index_scan_multi(
498 &self,
499 table: &str,
500 index: &str,
501 pairs: &[(String, String)],
502 ) -> ExecutionResult<Relation> {
503 let pair_refs: Vec<(&str, &str)> = pairs
505 .iter()
506 .map(|(k, v)| (k.as_str(), v.as_str()))
507 .collect();
508 let rows = self.data_source.get_gin_index_rows_multi(table, index, &pair_refs)?;
509 let column_count = self.data_source.get_column_count(table)?;
510 Ok(Relation::from_rows_with_column_count(rows, alloc::vec![table.into()], column_count))
511 }
512
513 fn execute_filter(&self, input: Relation, predicate: &Expr) -> ExecutionResult<Relation> {
516 let tables = input.tables().to_vec();
517 let table_column_counts = input.table_column_counts().to_vec();
518 let ctx = EvalContext::new(&tables, &table_column_counts);
519
520 let entries: Vec<RelationEntry> = input
521 .into_iter()
522 .filter(|entry| self.eval_predicate_ctx(predicate, entry, &ctx))
523 .collect();
524
525 Ok(Relation { entries, tables, table_column_counts })
526 }
527
528 fn execute_project(&self, input: Relation, columns: &[Expr]) -> ExecutionResult<Relation> {
531 let tables = input.tables().to_vec();
532 let table_column_counts = input.table_column_counts().to_vec();
533 let shared_tables: SharedTables = tables.clone().into();
534 let ctx = EvalContext::new(&tables, &table_column_counts);
535
536 let entries: Vec<RelationEntry> = input
537 .into_iter()
538 .map(|entry| {
539 let values: Vec<Value> = columns
540 .iter()
541 .map(|col| self.eval_expr_ctx(col, &entry, Some(&ctx)))
542 .collect();
543 RelationEntry::new_combined(Rc::new(Row::new(entry.id(), values)), shared_tables.clone())
544 })
545 .collect();
546
547 Ok(Relation {
549 entries,
550 tables,
551 table_column_counts: alloc::vec![columns.len()],
552 })
553 }
554
555 fn execute_hash_join(
558 &self,
559 left: Relation,
560 right: Relation,
561 condition: &Expr,
562 join_type: crate::ast::JoinType,
563 ) -> ExecutionResult<Relation> {
564 let (left_key_idx, right_key_idx) = self.extract_join_keys(condition, &left, &right)?;
566
567 let is_outer = matches!(
568 join_type,
569 crate::ast::JoinType::LeftOuter | crate::ast::JoinType::FullOuter
570 );
571
572 let join = HashJoin::new(left_key_idx, right_key_idx, is_outer);
573 Ok(join.execute(left, right))
574 }
575
576 fn execute_sort_merge_join(
577 &self,
578 left: Relation,
579 right: Relation,
580 condition: &Expr,
581 join_type: crate::ast::JoinType,
582 ) -> ExecutionResult<Relation> {
583 let (left_key_idx, right_key_idx) = self.extract_join_keys(condition, &left, &right)?;
584 let is_outer = matches!(
585 join_type,
586 crate::ast::JoinType::LeftOuter | crate::ast::JoinType::FullOuter
587 );
588
589 let join = SortMergeJoin::new(left_key_idx, right_key_idx, is_outer);
590 Ok(join.execute_with_sort(left, right))
591 }
592
593 fn execute_nested_loop_join(
594 &self,
595 left: Relation,
596 right: Relation,
597 condition: &Expr,
598 join_type: crate::ast::JoinType,
599 ) -> ExecutionResult<Relation> {
600 let is_outer = matches!(
601 join_type,
602 crate::ast::JoinType::LeftOuter | crate::ast::JoinType::FullOuter
603 );
604
605 let mut result_entries = Vec::new();
607 let left_tables = left.tables().to_vec();
608 let right_tables = right.tables().to_vec();
609 let left_column_counts = left.table_column_counts().to_vec();
610 let right_column_counts = right.table_column_counts().to_vec();
611
612 let left_col_count = left
614 .entries
615 .first()
616 .map(|e| e.row.len())
617 .unwrap_or(0);
618 let right_col_count = right
619 .entries
620 .first()
621 .map(|e| e.row.len())
622 .unwrap_or(0);
623
624 let adjusted_condition = self.adjust_join_condition_indices(condition, &left_tables, &right_tables, left_col_count);
627
628 for left_entry in left.iter() {
629 let mut matched = false;
630
631 for right_entry in right.iter() {
632 let combined = RelationEntry::combine(
634 left_entry,
635 &left_tables,
636 right_entry,
637 &right_tables,
638 );
639
640 if self.eval_predicate(&adjusted_condition, &combined) {
641 matched = true;
642 result_entries.push(combined);
643 }
644 }
645
646 if is_outer && !matched {
647 let combined = RelationEntry::combine_with_null(
648 left_entry,
649 &left_tables,
650 right_col_count,
651 &right_tables,
652 );
653 result_entries.push(combined);
654 }
655 }
656
657 let mut tables = left_tables;
658 tables.extend(right_tables);
659
660 let mut table_column_counts = left_column_counts;
662 table_column_counts.extend(right_column_counts);
663
664 Ok(Relation {
665 entries: result_entries,
666 tables,
667 table_column_counts,
668 })
669 }
670
671 fn execute_index_nested_loop_join(
672 &self,
673 outer: Relation,
674 inner_table: &str,
675 inner_index: &str,
676 condition: &Expr,
677 join_type: crate::ast::JoinType,
678 ) -> ExecutionResult<Relation> {
679 let is_outer = matches!(
680 join_type,
681 crate::ast::JoinType::LeftOuter | crate::ast::JoinType::FullOuter
682 );
683
684 let outer_key_idx = self.extract_outer_key_index(condition, &outer)?;
686 let inner_col_count = self.data_source.get_column_count(inner_table)?;
687
688 let mut result_entries = Vec::new();
689 let outer_tables = outer.tables().to_vec();
690 let outer_column_counts = outer.table_column_counts().to_vec();
691 let inner_tables = alloc::vec![inner_table.into()];
692
693 for outer_entry in outer.iter() {
694 let key_value = outer_entry.get_field(outer_key_idx);
695
696 let inner_rows = if let Some(key) = key_value {
697 if !key.is_null() {
698 self.data_source
699 .get_index_point(inner_table, inner_index, key)?
700 } else {
701 Vec::new()
702 }
703 } else {
704 Vec::new()
705 };
706
707 if inner_rows.is_empty() {
708 if is_outer {
709 let combined = RelationEntry::combine_with_null(
710 outer_entry,
711 &outer_tables,
712 inner_col_count,
713 &inner_tables,
714 );
715 result_entries.push(combined);
716 }
717 } else {
718 for inner_row in inner_rows {
719 let inner_entry = RelationEntry::from_row(inner_row, inner_table);
720 let combined = RelationEntry::combine(
721 outer_entry,
722 &outer_tables,
723 &inner_entry,
724 &inner_tables,
725 );
726 result_entries.push(combined);
727 }
728 }
729 }
730
731 let mut tables = outer_tables;
732 tables.extend(inner_tables);
733
734 let mut table_column_counts = outer_column_counts;
736 table_column_counts.push(inner_col_count);
737
738 Ok(Relation {
739 entries: result_entries,
740 tables,
741 table_column_counts,
742 })
743 }
744
745 fn execute_cross_product(&self, left: Relation, right: Relation) -> ExecutionResult<Relation> {
746 let mut result_entries = Vec::new();
747 let left_tables = left.tables().to_vec();
748 let right_tables = right.tables().to_vec();
749 let left_column_counts = left.table_column_counts().to_vec();
750 let right_column_counts = right.table_column_counts().to_vec();
751
752 for left_entry in left.iter() {
753 for right_entry in right.iter() {
754 let combined = RelationEntry::combine(
755 left_entry,
756 &left_tables,
757 right_entry,
758 &right_tables,
759 );
760 result_entries.push(combined);
761 }
762 }
763
764 let mut tables = left_tables;
765 tables.extend(right_tables);
766
767 let mut table_column_counts = left_column_counts;
769 table_column_counts.extend(right_column_counts);
770
771 Ok(Relation {
772 entries: result_entries,
773 tables,
774 table_column_counts,
775 })
776 }
777
778 fn execute_hash_aggregate(
781 &self,
782 input: Relation,
783 group_by: &[Expr],
784 aggregates: &[(AggregateFunc, Expr)],
785 ) -> ExecutionResult<Relation> {
786 let group_by_indices: Vec<usize> = group_by
788 .iter()
789 .filter_map(|expr| {
790 if let Expr::Column(col) = expr {
791 Some(col.index)
792 } else {
793 None
794 }
795 })
796 .collect();
797
798 let agg_specs: Vec<(AggregateFunc, Option<usize>)> = aggregates
800 .iter()
801 .map(|(func, expr)| {
802 let col_idx = if let Expr::Column(col) = expr {
803 Some(col.index)
804 } else {
805 None
806 };
807 (*func, col_idx)
808 })
809 .collect();
810
811 let executor = AggregateExecutor::new(group_by_indices, agg_specs);
812 Ok(executor.execute(input))
813 }
814
815 fn execute_sort(
818 &self,
819 input: Relation,
820 order_by: &[(Expr, SortOrder)],
821 ) -> ExecutionResult<Relation> {
822 let tables = input.tables().to_vec();
823 let table_column_counts = input.table_column_counts().to_vec();
824 let ctx = EvalContext::new(&tables, &table_column_counts);
825
826 let order_by_indices: Vec<(usize, SortOrder)> = order_by
828 .iter()
829 .filter_map(|(expr, order)| {
830 if let Expr::Column(col) = expr {
831 let actual_index = ctx.resolve_column_index(&col.table, col.index);
832 Some((actual_index, *order))
833 } else {
834 None
835 }
836 })
837 .collect();
838
839 let executor = SortExecutor::new(order_by_indices);
840 Ok(executor.execute(input))
841 }
842
843 fn execute_limit(
846 &self,
847 input: Relation,
848 limit: usize,
849 offset: usize,
850 ) -> ExecutionResult<Relation> {
851 let executor = LimitExecutor::new(limit, offset);
852 Ok(executor.execute(input))
853 }
854
855 fn eval_expr_ctx(&self, expr: &Expr, entry: &RelationEntry, ctx: Option<&EvalContext<'_>>) -> Value {
861 match expr {
862 Expr::Column(col) => {
863 let index = if let Some(c) = ctx {
864 c.resolve_column_index(&col.table, col.index)
865 } else {
866 col.index
867 };
868 entry.get_field(index).cloned().unwrap_or(Value::Null)
869 }
870
871 Expr::Literal(value) => value.clone(),
872
873 Expr::BinaryOp { left, op, right } => {
874 let left_val = self.eval_expr_ctx(left, entry, ctx);
875 let right_val = self.eval_expr_ctx(right, entry, ctx);
876 self.eval_binary_op(*op, &left_val, &right_val)
877 }
878
879 Expr::UnaryOp { op, expr } => {
880 let val = self.eval_expr_ctx(expr, entry, ctx);
881 self.eval_unary_op(*op, &val)
882 }
883
884 Expr::Aggregate { expr, .. } => {
885 if let Some(e) = expr {
886 self.eval_expr_ctx(e, entry, ctx)
887 } else {
888 Value::Int64(1) }
890 }
891
892 Expr::Between { expr, low, high } => {
893 let val = self.eval_expr_ctx(expr, entry, ctx);
894 let low_val = self.eval_expr_ctx(low, entry, ctx);
895 let high_val = self.eval_expr_ctx(high, entry, ctx);
896 Value::Boolean(val >= low_val && val <= high_val)
897 }
898
899 Expr::NotBetween { expr, low, high } => {
900 let val = self.eval_expr_ctx(expr, entry, ctx);
901 let low_val = self.eval_expr_ctx(low, entry, ctx);
902 let high_val = self.eval_expr_ctx(high, entry, ctx);
903 Value::Boolean(val < low_val || val > high_val)
904 }
905
906 Expr::In { expr, list } => {
907 let val = self.eval_expr_ctx(expr, entry, ctx);
908 let in_list = list.iter().any(|item| self.eval_expr_ctx(item, entry, ctx) == val);
909 Value::Boolean(in_list)
910 }
911
912 Expr::NotIn { expr, list } => {
913 let val = self.eval_expr_ctx(expr, entry, ctx);
914 let in_list = list.iter().any(|item| self.eval_expr_ctx(item, entry, ctx) == val);
915 Value::Boolean(!in_list)
916 }
917
918 Expr::Like { expr, pattern } => {
919 let val = self.eval_expr_ctx(expr, entry, ctx);
920 if let Value::String(s) = val {
921 Value::Boolean(self.match_like_pattern(&s, pattern))
922 } else {
923 Value::Boolean(false)
924 }
925 }
926
927 Expr::NotLike { expr, pattern } => {
928 let val = self.eval_expr_ctx(expr, entry, ctx);
929 if let Value::String(s) = val {
930 Value::Boolean(!self.match_like_pattern(&s, pattern))
931 } else {
932 Value::Boolean(true)
933 }
934 }
935
936 Expr::Match { expr, pattern } => {
937 let val = self.eval_expr_ctx(expr, entry, ctx);
938 if let Value::String(s) = val {
939 Value::Boolean(self.match_regex_pattern(&s, pattern))
940 } else {
941 Value::Boolean(false)
942 }
943 }
944
945 Expr::NotMatch { expr, pattern } => {
946 let val = self.eval_expr_ctx(expr, entry, ctx);
947 if let Value::String(s) = val {
948 Value::Boolean(!self.match_regex_pattern(&s, pattern))
949 } else {
950 Value::Boolean(true)
951 }
952 }
953
954 Expr::Function { name, args } => {
955 let arg_values: Vec<Value> = args.iter().map(|a| self.eval_expr_ctx(a, entry, ctx)).collect();
956 self.eval_function(name, &arg_values)
957 }
958 }
959 }
960
961 #[inline]
963 fn eval_expr(&self, expr: &Expr, entry: &RelationEntry) -> Value {
964 self.eval_expr_ctx(expr, entry, None)
965 }
966
967 #[inline]
969 fn eval_predicate(&self, expr: &Expr, entry: &RelationEntry) -> bool {
970 match self.eval_expr(expr, entry) {
971 Value::Boolean(b) => b,
972 Value::Null => false,
973 _ => false,
974 }
975 }
976
977 #[inline]
979 fn eval_predicate_ctx(&self, expr: &Expr, entry: &RelationEntry, ctx: &EvalContext<'_>) -> bool {
980 match self.eval_expr_ctx(expr, entry, Some(ctx)) {
981 Value::Boolean(b) => b,
982 Value::Null => false,
983 _ => false,
984 }
985 }
986
987 fn eval_binary_op(&self, op: BinaryOp, left: &Value, right: &Value) -> Value {
988 if left.is_null() || right.is_null() {
990 return match op {
991 BinaryOp::And => {
992 if let Value::Boolean(false) = left {
994 return Value::Boolean(false);
995 }
996 if let Value::Boolean(false) = right {
997 return Value::Boolean(false);
998 }
999 Value::Null
1000 }
1001 BinaryOp::Or => {
1002 if let Value::Boolean(true) = left {
1004 return Value::Boolean(true);
1005 }
1006 if let Value::Boolean(true) = right {
1007 return Value::Boolean(true);
1008 }
1009 Value::Null
1010 }
1011 _ => Value::Null,
1012 };
1013 }
1014
1015 match op {
1016 BinaryOp::Eq => Value::Boolean(left == right),
1017 BinaryOp::Ne => Value::Boolean(left != right),
1018 BinaryOp::Lt => Value::Boolean(left < right),
1019 BinaryOp::Le => Value::Boolean(left <= right),
1020 BinaryOp::Gt => Value::Boolean(left > right),
1021 BinaryOp::Ge => Value::Boolean(left >= right),
1022 BinaryOp::And => {
1023 let l = matches!(left, Value::Boolean(true));
1024 let r = matches!(right, Value::Boolean(true));
1025 Value::Boolean(l && r)
1026 }
1027 BinaryOp::Or => {
1028 let l = matches!(left, Value::Boolean(true));
1029 let r = matches!(right, Value::Boolean(true));
1030 Value::Boolean(l || r)
1031 }
1032 BinaryOp::Add => self.eval_arithmetic(left, right, |a, b| a + b),
1033 BinaryOp::Sub => self.eval_arithmetic(left, right, |a, b| a - b),
1034 BinaryOp::Mul => self.eval_arithmetic(left, right, |a, b| a * b),
1035 BinaryOp::Div => {
1036 match right {
1038 Value::Int32(0) | Value::Int64(0) => Value::Null,
1039 Value::Float64(f) if *f == 0.0 => Value::Null,
1040 _ => self.eval_arithmetic(left, right, |a, b| if b != 0.0 { a / b } else { 0.0 }),
1041 }
1042 }
1043 BinaryOp::Mod => {
1044 match (left, right) {
1045 (Value::Int64(a), Value::Int64(b)) if *b != 0 => Value::Int64(a % b),
1046 (Value::Int32(a), Value::Int32(b)) if *b != 0 => Value::Int32(a % b),
1047 _ => Value::Null,
1048 }
1049 }
1050 BinaryOp::Like | BinaryOp::In | BinaryOp::Between => {
1051 Value::Null
1053 }
1054 }
1055 }
1056
1057 fn eval_arithmetic<F>(&self, left: &Value, right: &Value, op: F) -> Value
1058 where
1059 F: Fn(f64, f64) -> f64,
1060 {
1061 let l = match left {
1062 Value::Int32(i) => *i as f64,
1063 Value::Int64(i) => *i as f64,
1064 Value::Float64(f) => *f,
1065 _ => return Value::Null,
1066 };
1067 let r = match right {
1068 Value::Int32(i) => *i as f64,
1069 Value::Int64(i) => *i as f64,
1070 Value::Float64(f) => *f,
1071 _ => return Value::Null,
1072 };
1073
1074 let result = op(l, r);
1075
1076 match (left, right) {
1078 (Value::Int64(_), Value::Int64(_)) => Value::Int64(result as i64),
1079 (Value::Int32(_), Value::Int32(_)) => Value::Int32(result as i32),
1080 _ => Value::Float64(result),
1081 }
1082 }
1083
1084 fn eval_unary_op(&self, op: UnaryOp, value: &Value) -> Value {
1085 match op {
1086 UnaryOp::Not => match value {
1087 Value::Boolean(b) => Value::Boolean(!b),
1088 Value::Null => Value::Null,
1089 _ => Value::Null,
1090 },
1091 UnaryOp::Neg => match value {
1092 Value::Int32(i) => Value::Int32(-i),
1093 Value::Int64(i) => Value::Int64(-i),
1094 Value::Float64(f) => Value::Float64(-f),
1095 _ => Value::Null,
1096 },
1097 UnaryOp::IsNull => Value::Boolean(value.is_null()),
1098 UnaryOp::IsNotNull => Value::Boolean(!value.is_null()),
1099 }
1100 }
1101
1102 fn eval_function(&self, name: &str, args: &[Value]) -> Value {
1103 match name.to_uppercase().as_str() {
1104 "ABS" => {
1105 if let Some(v) = args.first() {
1106 match v {
1107 Value::Int32(i) => Value::Int32(i.abs()),
1108 Value::Int64(i) => Value::Int64(i.abs()),
1109 Value::Float64(f) => Value::Float64(f.abs()),
1110 _ => Value::Null,
1111 }
1112 } else {
1113 Value::Null
1114 }
1115 }
1116 "UPPER" => {
1117 if let Some(Value::String(s)) = args.first() {
1118 Value::String(s.to_uppercase().into())
1119 } else {
1120 Value::Null
1121 }
1122 }
1123 "LOWER" => {
1124 if let Some(Value::String(s)) = args.first() {
1125 Value::String(s.to_lowercase().into())
1126 } else {
1127 Value::Null
1128 }
1129 }
1130 "LENGTH" => {
1131 if let Some(Value::String(s)) = args.first() {
1132 Value::Int64(s.len() as i64)
1133 } else {
1134 Value::Null
1135 }
1136 }
1137 "COALESCE" => {
1138 for arg in args {
1139 if !arg.is_null() {
1140 return arg.clone();
1141 }
1142 }
1143 Value::Null
1144 }
1145 "JSONB_PATH_EQ" => {
1147 if args.len() >= 3 {
1148 if let (Value::Jsonb(jsonb), Value::String(path)) = (&args[0], &args[1]) {
1149 let expected = &args[2];
1150 return self.jsonb_path_eq(jsonb, path, expected);
1151 }
1152 }
1153 Value::Boolean(false)
1154 }
1155 "JSONB_CONTAINS" => {
1157 if args.len() >= 2 {
1158 if let (Value::Jsonb(jsonb), Value::String(path)) = (&args[0], &args[1]) {
1159 return self.jsonb_path_exists(jsonb, path);
1160 }
1161 }
1162 Value::Boolean(false)
1163 }
1164 "JSONB_EXISTS" => {
1166 if args.len() >= 2 {
1167 if let (Value::Jsonb(jsonb), Value::String(path)) = (&args[0], &args[1]) {
1168 return self.jsonb_path_exists(jsonb, path);
1169 }
1170 }
1171 Value::Boolean(false)
1172 }
1173 _ => Value::Null,
1174 }
1175 }
1176
1177 fn match_like_pattern(&self, value: &str, pattern: &str) -> bool {
1178 let pattern_chars: Vec<char> = pattern.chars().collect();
1180 let value_chars: Vec<char> = value.chars().collect();
1181 self.match_like_recursive(&value_chars, &pattern_chars, 0, 0)
1182 }
1183
1184 fn match_regex_pattern(&self, value: &str, pattern: &str) -> bool {
1187 let compiled = match self.compile_regex(pattern) {
1188 Some(c) => c,
1189 None => return false,
1190 };
1191 self.regex_match_compiled(value, &compiled)
1192 }
1193
    /// Compiles a small regex dialect into a flat list of [`RegexOp`]s.
    ///
    /// Handled syntax:
    /// - `^` as the first character and `$` as the last character; elsewhere
    ///   they are literals.
    /// - `.`
    /// - The escapes `\d \D \w \W \s \S \n \t \r`; any other escaped character
    ///   is taken literally.
    /// - Bracket classes `[...]` / `[^...]`.
    /// - Single-level groups `(...)` and `|`.
    /// - Postfix `*`, `+`, `?`, applied to the preceding element or group.
    ///
    /// Returns `None` for patterns this parser rejects:
    /// - a quantifier with nothing to attach to, or a doubled quantifier;
    /// - a trailing `\`;
    /// - an unterminated `[...]`;
    /// - an unmatched `)` or an unclosed `(`;
    /// - nested groups: `in_group` is a flag, not a depth, so the first inner
    ///   `)` closes the only open group and the outer `)` is reported as
    ///   unmatched.
    fn compile_regex(&self, pattern: &str) -> Option<Vec<RegexOp>> {
        let mut ops = Vec::new();
        let mut chars = pattern.chars().peekable();
        let mut in_group = false;

        while let Some(c) = chars.next() {
            match c {
                '^' if ops.is_empty() => ops.push(RegexOp::Start),
                '$' if chars.peek().is_none() => ops.push(RegexOp::End),
                '.' => self.apply_quantifier(&mut chars, &mut ops, RegexOp::Any),
                // Quantifiers following an element are consumed by
                // `apply_quantifier`; reaching one here means it is dangling.
                '*' | '+' | '?' => return None,
                '\\' => {
                    // A trailing backslash aborts compilation via `?`.
                    let escaped = chars.next()?;
                    let op = match escaped {
                        'd' => RegexOp::Digit,
                        'D' => RegexOp::NonDigit,
                        'w' => RegexOp::Word,
                        'W' => RegexOp::NonWord,
                        's' => RegexOp::Whitespace,
                        'S' => RegexOp::NonWhitespace,
                        'n' => RegexOp::Char('\n'),
                        't' => RegexOp::Char('\t'),
                        'r' => RegexOp::Char('\r'),
                        _ => RegexOp::Char(escaped),
                    };
                    self.apply_quantifier(&mut chars, &mut ops, op);
                }
                '[' => {
                    let (class_op, negated) = self.parse_char_class(&mut chars)?;
                    let op = if negated {
                        RegexOp::NegCharClass(class_op)
                    } else {
                        RegexOp::CharClass(class_op)
                    };
                    self.apply_quantifier(&mut chars, &mut ops, op);
                }
                '(' => {
                    in_group = true;
                    ops.push(RegexOp::GroupStart);
                }
                ')' => {
                    if !in_group {
                        return None;
                    }
                    in_group = false;
                    ops.push(RegexOp::GroupEnd);
                    // A quantifier directly after `)` applies to the whole group
                    // and is emitted as a separate op following `GroupEnd`.
                    if let Some(&q) = chars.peek() {
                        match q {
                            '*' => {
                                chars.next();
                                ops.push(RegexOp::GroupStar);
                            }
                            '+' => {
                                chars.next();
                                ops.push(RegexOp::GroupPlus);
                            }
                            '?' => {
                                chars.next();
                                ops.push(RegexOp::GroupQuestion);
                            }
                            _ => {}
                        }
                    }
                }
                '|' => ops.push(RegexOp::Alternation),
                _ => self.apply_quantifier(&mut chars, &mut ops, RegexOp::Char(c)),
            }
        }

        // An unclosed `(` is an error.
        if in_group {
            return None;
        }

        Some(ops)
    }
1271
1272 fn apply_quantifier(
1273 &self,
1274 chars: &mut core::iter::Peekable<core::str::Chars>,
1275 ops: &mut Vec<RegexOp>,
1276 base_op: RegexOp,
1277 ) {
1278 if let Some(&q) = chars.peek() {
1279 match q {
1280 '*' => {
1281 chars.next();
1282 ops.push(RegexOp::Star(Box::new(base_op)));
1283 }
1284 '+' => {
1285 chars.next();
1286 ops.push(RegexOp::Plus(Box::new(base_op)));
1287 }
1288 '?' => {
1289 chars.next();
1290 ops.push(RegexOp::Question(Box::new(base_op)));
1291 }
1292 _ => ops.push(base_op),
1293 }
1294 } else {
1295 ops.push(base_op);
1296 }
1297 }
1298
1299 fn parse_char_class(
1300 &self,
1301 chars: &mut core::iter::Peekable<core::str::Chars>,
1302 ) -> Option<(Vec<CharClassItem>, bool)> {
1303 let mut items = Vec::new();
1304 let negated = chars.peek() == Some(&'^');
1305 if negated {
1306 chars.next();
1307 }
1308
1309 while let Some(c) = chars.next() {
1310 if c == ']' {
1311 return Some((items, negated));
1312 }
1313 if c == '\\' {
1314 let escaped = chars.next()?;
1315 items.push(CharClassItem::Char(match escaped {
1316 'n' => '\n',
1317 't' => '\t',
1318 'r' => '\r',
1319 'd' => {
1320 items.push(CharClassItem::Range('0', '9'));
1321 continue;
1322 }
1323 'w' => {
1324 items.push(CharClassItem::Range('a', 'z'));
1325 items.push(CharClassItem::Range('A', 'Z'));
1326 items.push(CharClassItem::Range('0', '9'));
1327 items.push(CharClassItem::Char('_'));
1328 continue;
1329 }
1330 's' => {
1331 items.push(CharClassItem::Char(' '));
1332 items.push(CharClassItem::Char('\t'));
1333 items.push(CharClassItem::Char('\n'));
1334 items.push(CharClassItem::Char('\r'));
1335 continue;
1336 }
1337 _ => escaped,
1338 }));
1339 } else if chars.peek() == Some(&'-') {
1340 chars.next(); if let Some(end) = chars.next() {
1342 if end == ']' {
1343 items.push(CharClassItem::Char(c));
1344 items.push(CharClassItem::Char('-'));
1345 return Some((items, negated));
1346 }
1347 items.push(CharClassItem::Range(c, end));
1348 } else {
1349 items.push(CharClassItem::Char(c));
1350 items.push(CharClassItem::Char('-'));
1351 }
1352 } else {
1353 items.push(CharClassItem::Char(c));
1354 }
1355 }
1356 None }
1358
1359 fn regex_match_compiled(&self, value: &str, ops: &[RegexOp]) -> bool {
1360 let chars: Vec<char> = value.chars().collect();
1361
1362 let alternatives = self.split_alternatives(ops);
1364 if alternatives.len() > 1 {
1365 return alternatives.iter().any(|alt| self.regex_match_ops(&chars, alt, 0, 0));
1366 }
1367
1368 let has_start = ops.first() == Some(&RegexOp::Start);
1370 let ops_to_match = if has_start { &ops[1..] } else { ops };
1371
1372 if has_start {
1373 self.regex_match_ops(&chars, ops_to_match, 0, 0)
1374 } else {
1375 for start in 0..=chars.len() {
1377 if self.regex_match_ops(&chars, ops_to_match, start, 0) {
1378 return true;
1379 }
1380 }
1381 false
1382 }
1383 }
1384
1385 fn split_alternatives(&self, ops: &[RegexOp]) -> Vec<Vec<RegexOp>> {
1386 let mut alternatives = Vec::new();
1387 let mut current = Vec::new();
1388 let mut depth = 0;
1389
1390 for op in ops {
1391 match op {
1392 RegexOp::GroupStart => {
1393 depth += 1;
1394 current.push(op.clone());
1395 }
1396 RegexOp::GroupEnd => {
1397 depth -= 1;
1398 current.push(op.clone());
1399 }
1400 RegexOp::Alternation if depth == 0 => {
1401 alternatives.push(core::mem::take(&mut current));
1402 }
1403 _ => current.push(op.clone()),
1404 }
1405 }
1406 alternatives.push(current);
1407 alternatives
1408 }
1409
1410 fn regex_match_ops(&self, chars: &[char], ops: &[RegexOp], pos: usize, op_idx: usize) -> bool {
1411 if op_idx >= ops.len() {
1412 return true; }
1414
1415 let op = &ops[op_idx];
1416 match op {
1417 RegexOp::End => pos == chars.len(),
1418 RegexOp::Start => self.regex_match_ops(chars, ops, pos, op_idx + 1),
1419 RegexOp::Char(c) => {
1420 pos < chars.len()
1421 && chars[pos] == *c
1422 && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1423 }
1424 RegexOp::Any => {
1425 pos < chars.len() && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1426 }
1427 RegexOp::Digit => {
1428 pos < chars.len()
1429 && chars[pos].is_ascii_digit()
1430 && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1431 }
1432 RegexOp::NonDigit => {
1433 pos < chars.len()
1434 && !chars[pos].is_ascii_digit()
1435 && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1436 }
1437 RegexOp::Word => {
1438 pos < chars.len()
1439 && (chars[pos].is_ascii_alphanumeric() || chars[pos] == '_')
1440 && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1441 }
1442 RegexOp::NonWord => {
1443 pos < chars.len()
1444 && !(chars[pos].is_ascii_alphanumeric() || chars[pos] == '_')
1445 && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1446 }
1447 RegexOp::Whitespace => {
1448 pos < chars.len()
1449 && chars[pos].is_ascii_whitespace()
1450 && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1451 }
1452 RegexOp::NonWhitespace => {
1453 pos < chars.len()
1454 && !chars[pos].is_ascii_whitespace()
1455 && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1456 }
1457 RegexOp::CharClass(items) => {
1458 pos < chars.len()
1459 && self.char_matches_class(chars[pos], items)
1460 && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1461 }
1462 RegexOp::NegCharClass(items) => {
1463 pos < chars.len()
1464 && !self.char_matches_class(chars[pos], items)
1465 && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1466 }
1467 RegexOp::Star(inner) => {
1468 let mut p = pos;
1470 let mut positions = vec![p];
1471 while p < chars.len() && self.single_op_matches(chars[p], inner) {
1472 p += 1;
1473 positions.push(p);
1474 }
1475 for &try_pos in positions.iter().rev() {
1477 if self.regex_match_ops(chars, ops, try_pos, op_idx + 1) {
1478 return true;
1479 }
1480 }
1481 false
1482 }
1483 RegexOp::Plus(inner) => {
1484 if pos >= chars.len() || !self.single_op_matches(chars[pos], inner) {
1486 return false;
1487 }
1488 let mut p = pos + 1;
1489 let mut positions = vec![p];
1490 while p < chars.len() && self.single_op_matches(chars[p], inner) {
1491 p += 1;
1492 positions.push(p);
1493 }
1494 for &try_pos in positions.iter().rev() {
1495 if self.regex_match_ops(chars, ops, try_pos, op_idx + 1) {
1496 return true;
1497 }
1498 }
1499 false
1500 }
1501 RegexOp::Question(inner) => {
1502 if pos < chars.len() && self.single_op_matches(chars[pos], inner) {
1504 if self.regex_match_ops(chars, ops, pos + 1, op_idx + 1) {
1505 return true;
1506 }
1507 }
1508 self.regex_match_ops(chars, ops, pos, op_idx + 1)
1509 }
1510 RegexOp::GroupStart | RegexOp::GroupEnd => {
1511 self.regex_match_ops(chars, ops, pos, op_idx + 1)
1512 }
1513 RegexOp::GroupStar | RegexOp::GroupPlus | RegexOp::GroupQuestion => {
1514 self.regex_match_ops(chars, ops, pos, op_idx + 1)
1516 }
1517 RegexOp::Alternation => {
1518 self.regex_match_ops(chars, ops, pos, op_idx + 1)
1520 }
1521 }
1522 }
1523
1524 fn single_op_matches(&self, c: char, op: &RegexOp) -> bool {
1525 match op {
1526 RegexOp::Char(expected) => c == *expected,
1527 RegexOp::Any => true,
1528 RegexOp::Digit => c.is_ascii_digit(),
1529 RegexOp::NonDigit => !c.is_ascii_digit(),
1530 RegexOp::Word => c.is_ascii_alphanumeric() || c == '_',
1531 RegexOp::NonWord => !(c.is_ascii_alphanumeric() || c == '_'),
1532 RegexOp::Whitespace => c.is_ascii_whitespace(),
1533 RegexOp::NonWhitespace => !c.is_ascii_whitespace(),
1534 RegexOp::CharClass(items) => self.char_matches_class(c, items),
1535 RegexOp::NegCharClass(items) => !self.char_matches_class(c, items),
1536 _ => false,
1537 }
1538 }
1539
1540 fn char_matches_class(&self, c: char, items: &[CharClassItem]) -> bool {
1541 items.iter().any(|item| match item {
1542 CharClassItem::Char(ch) => c == *ch,
1543 CharClassItem::Range(start, end) => c >= *start && c <= *end,
1544 })
1545 }
1546
1547 fn match_like_recursive(
1548 &self,
1549 value: &[char],
1550 pattern: &[char],
1551 vi: usize,
1552 pi: usize,
1553 ) -> bool {
1554 if pi >= pattern.len() {
1555 return vi >= value.len();
1556 }
1557
1558 match pattern[pi] {
1559 '%' => {
1560 for i in vi..=value.len() {
1562 if self.match_like_recursive(value, pattern, i, pi + 1) {
1563 return true;
1564 }
1565 }
1566 false
1567 }
1568 '_' => {
1569 if vi < value.len() {
1571 self.match_like_recursive(value, pattern, vi + 1, pi + 1)
1572 } else {
1573 false
1574 }
1575 }
1576 c => {
1577 if vi < value.len() && value[vi] == c {
1579 self.match_like_recursive(value, pattern, vi + 1, pi + 1)
1580 } else {
1581 false
1582 }
1583 }
1584 }
1585 }
1586
1587 fn parse_json_bytes(&self, bytes: &[u8]) -> Option<JsonbValue> {
1591 let json_str = core::str::from_utf8(bytes).ok()?;
1592 self.parse_json_str(json_str)
1593 }
1594
1595 fn parse_json_str(&self, s: &str) -> Option<JsonbValue> {
1597 let s = s.trim();
1599 if s == "null" {
1600 return Some(JsonbValue::Null);
1601 }
1602 if s == "true" {
1603 return Some(JsonbValue::Bool(true));
1604 }
1605 if s == "false" {
1606 return Some(JsonbValue::Bool(false));
1607 }
1608 if let Ok(n) = s.parse::<f64>() {
1610 return Some(JsonbValue::Number(n));
1611 }
1612 if s.starts_with('"') && s.ends_with('"') && s.len() >= 2 {
1614 let inner = &s[1..s.len() - 1];
1615 let unescaped = self.unescape_json_string(inner);
1617 return Some(JsonbValue::String(unescaped));
1618 }
1619 if s.starts_with('{') && s.ends_with('}') {
1621 return self.parse_json_object(s);
1622 }
1623 if s.starts_with('[') && s.ends_with(']') {
1625 return self.parse_json_array(s);
1626 }
1627 None
1628 }
1629
1630 fn unescape_json_string(&self, s: &str) -> String {
1632 let mut result = String::new();
1633 let mut chars = s.chars().peekable();
1634 while let Some(c) = chars.next() {
1635 if c == '\\' {
1636 if let Some(&next) = chars.peek() {
1637 match next {
1638 '"' | '\\' | '/' => {
1639 result.push(next);
1640 chars.next();
1641 }
1642 'n' => {
1643 result.push('\n');
1644 chars.next();
1645 }
1646 'r' => {
1647 result.push('\r');
1648 chars.next();
1649 }
1650 't' => {
1651 result.push('\t');
1652 chars.next();
1653 }
1654 _ => result.push(c),
1655 }
1656 } else {
1657 result.push(c);
1658 }
1659 } else {
1660 result.push(c);
1661 }
1662 }
1663 result
1664 }
1665
1666 fn parse_json_object(&self, s: &str) -> Option<JsonbValue> {
1668 let inner = s[1..s.len() - 1].trim();
1669 if inner.is_empty() {
1670 return Some(JsonbValue::Object(JsonbObject::new()));
1671 }
1672
1673 let mut obj = JsonbObject::new();
1674 let mut depth = 0;
1675 let mut in_string = false;
1676 let mut escape = false;
1677 let mut start = 0;
1678
1679 for (i, c) in inner.char_indices() {
1680 if escape {
1681 escape = false;
1682 continue;
1683 }
1684 match c {
1685 '\\' if in_string => escape = true,
1686 '"' => in_string = !in_string,
1687 '{' | '[' if !in_string => depth += 1,
1688 '}' | ']' if !in_string => depth -= 1,
1689 ',' if !in_string && depth == 0 => {
1690 if let Some((k, v)) = self.parse_json_kv(&inner[start..i]) {
1691 obj.insert(k, v);
1692 }
1693 start = i + 1;
1694 }
1695 _ => {}
1696 }
1697 }
1698 if start < inner.len() {
1700 if let Some((k, v)) = self.parse_json_kv(&inner[start..]) {
1701 obj.insert(k, v);
1702 }
1703 }
1704
1705 Some(JsonbValue::Object(obj))
1706 }
1707
1708 fn parse_json_kv(&self, s: &str) -> Option<(String, JsonbValue)> {
1710 let s = s.trim();
1711 let colon_pos = s.find(':')?;
1712 let key_part = s[..colon_pos].trim();
1713 let value_part = s[colon_pos + 1..].trim();
1714
1715 if key_part.starts_with('"') && key_part.ends_with('"') && key_part.len() >= 2 {
1717 let key = self.unescape_json_string(&key_part[1..key_part.len() - 1]);
1718 let value = self.parse_json_str(value_part)?;
1719 Some((key, value))
1720 } else {
1721 None
1722 }
1723 }
1724
1725 fn parse_json_array(&self, s: &str) -> Option<JsonbValue> {
1727 let inner = s[1..s.len() - 1].trim();
1728 if inner.is_empty() {
1729 return Some(JsonbValue::Array(Vec::new()));
1730 }
1731
1732 let mut items = Vec::new();
1733 let mut depth = 0;
1734 let mut in_string = false;
1735 let mut escape = false;
1736 let mut start = 0;
1737
1738 for (i, c) in inner.char_indices() {
1739 if escape {
1740 escape = false;
1741 continue;
1742 }
1743 match c {
1744 '\\' if in_string => escape = true,
1745 '"' => in_string = !in_string,
1746 '{' | '[' if !in_string => depth += 1,
1747 '}' | ']' if !in_string => depth -= 1,
1748 ',' if !in_string && depth == 0 => {
1749 if let Some(v) = self.parse_json_str(inner[start..i].trim()) {
1750 items.push(v);
1751 }
1752 start = i + 1;
1753 }
1754 _ => {}
1755 }
1756 }
1757 if start < inner.len() {
1759 if let Some(v) = self.parse_json_str(inner[start..].trim()) {
1760 items.push(v);
1761 }
1762 }
1763
1764 Some(JsonbValue::Array(items))
1765 }
1766
1767 fn jsonb_path_eq(&self, jsonb: &cynos_core::JsonbValue, path: &str, expected: &Value) -> Value {
1769 let json_value = match self.parse_json_bytes(&jsonb.0) {
1771 Some(v) => v,
1772 None => return Value::Boolean(false),
1773 };
1774
1775 let json_path = match JsonPath::parse(path) {
1777 Ok(p) => p,
1778 Err(_) => return Value::Boolean(false),
1779 };
1780
1781 let results = json_value.query(&json_path);
1783 if results.is_empty() {
1784 return Value::Boolean(false);
1785 }
1786
1787 let actual = results[0];
1789 Value::Boolean(self.compare_jsonb_value(actual, expected))
1790 }
1791
1792 fn jsonb_path_exists(&self, jsonb: &cynos_core::JsonbValue, path: &str) -> Value {
1794 let json_value = match self.parse_json_bytes(&jsonb.0) {
1796 Some(v) => v,
1797 None => return Value::Boolean(false),
1798 };
1799
1800 let json_path = match JsonPath::parse(path) {
1802 Ok(p) => p,
1803 Err(_) => return Value::Boolean(false),
1804 };
1805
1806 let results = json_value.query(&json_path);
1808 Value::Boolean(!results.is_empty())
1809 }
1810
1811 fn compare_jsonb_value(&self, jsonb: &JsonbValue, value: &Value) -> bool {
1813 match (jsonb, value) {
1814 (JsonbValue::Null, Value::Null) => true,
1815 (JsonbValue::Bool(a), Value::Boolean(b)) => a == b,
1816 (JsonbValue::Number(a), Value::Int32(b)) => (*a - *b as f64).abs() < f64::EPSILON,
1817 (JsonbValue::Number(a), Value::Int64(b)) => (*a - *b as f64).abs() < f64::EPSILON,
1818 (JsonbValue::Number(a), Value::Float64(b)) => (*a - *b).abs() < f64::EPSILON,
1819 (JsonbValue::String(a), Value::String(b)) => a == b,
1820 _ => false,
1821 }
1822 }
1823
1824 fn extract_join_keys(
1828 &self,
1829 condition: &Expr,
1830 left: &Relation,
1831 right: &Relation,
1832 ) -> ExecutionResult<(usize, usize)> {
1833 if let Expr::BinaryOp {
1834 left: left_expr,
1835 op: BinaryOp::Eq,
1836 right: right_expr,
1837 } = condition
1838 {
1839 let left_col = self.extract_column_ref(left_expr)?;
1840 let right_col = self.extract_column_ref(right_expr)?;
1841
1842 let left_tables = left.tables();
1844 let right_tables = right.tables();
1845 let left_ctx = EvalContext::new(left_tables, left.table_column_counts());
1846 let right_ctx = EvalContext::new(right_tables, right.table_column_counts());
1847
1848 if left_tables.contains(&left_col.table) && right_tables.contains(&right_col.table) {
1849 let left_idx = left_ctx.resolve_column_index(&left_col.table, left_col.index);
1851 let right_idx = right_ctx.resolve_column_index(&right_col.table, right_col.index);
1852 Ok((left_idx, right_idx))
1853 } else if left_tables.contains(&right_col.table)
1854 && right_tables.contains(&left_col.table)
1855 {
1856 let left_idx = left_ctx.resolve_column_index(&right_col.table, right_col.index);
1858 let right_idx = right_ctx.resolve_column_index(&left_col.table, left_col.index);
1859 Ok((left_idx, right_idx))
1860 } else {
1861 Err(ExecutionError::InvalidOperation(
1862 "Join columns do not match relation tables".into(),
1863 ))
1864 }
1865 } else {
1866 Err(ExecutionError::InvalidOperation(
1867 "Expected equi-join condition".into(),
1868 ))
1869 }
1870 }
1871
1872 fn extract_outer_key_index(
1873 &self,
1874 condition: &Expr,
1875 outer: &Relation,
1876 ) -> ExecutionResult<usize> {
1877 if let Expr::BinaryOp {
1878 left: left_expr,
1879 op: BinaryOp::Eq,
1880 right: right_expr,
1881 } = condition
1882 {
1883 let left_col = self.extract_column_ref(left_expr)?;
1884 let right_col = self.extract_column_ref(right_expr)?;
1885
1886 let outer_tables = outer.tables();
1887
1888 if outer_tables.contains(&left_col.table) {
1889 Ok(left_col.index)
1890 } else if outer_tables.contains(&right_col.table) {
1891 Ok(right_col.index)
1892 } else {
1893 Err(ExecutionError::InvalidOperation(
1894 "Outer key column not found in outer relation".into(),
1895 ))
1896 }
1897 } else {
1898 Err(ExecutionError::InvalidOperation(
1899 "Expected equi-join condition".into(),
1900 ))
1901 }
1902 }
1903
1904 fn extract_column_ref<'b>(&self, expr: &'b Expr) -> ExecutionResult<&'b ColumnRef> {
1905 if let Expr::Column(col) = expr {
1906 Ok(col)
1907 } else {
1908 Err(ExecutionError::InvalidOperation(
1909 "Expected column reference".into(),
1910 ))
1911 }
1912 }
1913
1914 fn adjust_join_condition_indices(
1918 &self,
1919 condition: &Expr,
1920 left_tables: &[String],
1921 right_tables: &[String],
1922 left_col_count: usize,
1923 ) -> Expr {
1924 match condition {
1925 Expr::Column(col) => {
1926 if right_tables.contains(&col.table) {
1928 Expr::Column(ColumnRef {
1930 table: col.table.clone(),
1931 column: col.column.clone(),
1932 index: col.index + left_col_count,
1933 })
1934 } else {
1935 condition.clone()
1937 }
1938 }
1939 Expr::BinaryOp { left, op, right } => {
1940 let adjusted_left = self.adjust_join_condition_indices(left, left_tables, right_tables, left_col_count);
1941 let adjusted_right = self.adjust_join_condition_indices(right, left_tables, right_tables, left_col_count);
1942 Expr::BinaryOp {
1943 left: Box::new(adjusted_left),
1944 op: *op,
1945 right: Box::new(adjusted_right),
1946 }
1947 }
1948 Expr::UnaryOp { op, expr } => {
1949 let adjusted_expr = self.adjust_join_condition_indices(expr, left_tables, right_tables, left_col_count);
1950 Expr::UnaryOp {
1951 op: *op,
1952 expr: Box::new(adjusted_expr),
1953 }
1954 }
1955 _ => condition.clone(),
1957 }
1958 }
1959}
1960
/// In-memory [`DataSource`] holding whole tables plus optional secondary
/// indexes, keyed by table name. Used, for example, by this module's tests.
#[derive(Default)]
pub struct InMemoryDataSource {
    // Table name -> rows, indexes and declared column count.
    tables: BTreeMap<String, TableData>,
}
1966
/// Storage for one table of an [`InMemoryDataSource`].
#[derive(Default)]
struct TableData {
    // Rows are wrapped in `Rc` so scans hand out pointer clones, not row copies.
    rows: Vec<Rc<Row>>,
    // Secondary indexes by index name.
    indexes: BTreeMap<String, IndexData>,
    // Column count as declared by `add_table` (not derived from the rows).
    column_count: usize,
}
1974
/// A secondary index built by `InMemoryDataSource::create_index`.
struct IndexData {
    // Indexed column value -> positions into the owning `TableData::rows`.
    // A `BTreeMap` keeps keys ordered, which range scans rely on.
    key_to_rows: BTreeMap<Value, Vec<usize>>,
}
1980
1981impl InMemoryDataSource {
1982 pub fn new() -> Self {
1984 Self::default()
1985 }
1986
1987 pub fn add_table(&mut self, name: impl Into<String>, rows: Vec<Row>, column_count: usize) {
1989 self.tables.insert(
1990 name.into(),
1991 TableData {
1992 rows: rows.into_iter().map(Rc::new).collect(),
1993 indexes: BTreeMap::new(),
1994 column_count,
1995 },
1996 );
1997 }
1998
1999 pub fn create_index(
2001 &mut self,
2002 table: &str,
2003 index_name: impl Into<String>,
2004 column_index: usize,
2005 ) -> ExecutionResult<()> {
2006 let table_data = self
2007 .tables
2008 .get_mut(table)
2009 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
2010
2011 let mut key_to_rows: BTreeMap<Value, Vec<usize>> = BTreeMap::new();
2012
2013 for (row_idx, row) in table_data.rows.iter().enumerate() {
2014 if let Some(key) = row.get(column_index) {
2015 key_to_rows.entry(key.clone()).or_default().push(row_idx);
2016 }
2017 }
2018
2019 table_data
2020 .indexes
2021 .insert(index_name.into(), IndexData { key_to_rows });
2022
2023 Ok(())
2024 }
2025}
2026
2027impl DataSource for InMemoryDataSource {
2028 fn get_table_rows(&self, table: &str) -> ExecutionResult<Vec<Rc<Row>>> {
2029 self.tables
2030 .get(table)
2031 .map(|t| t.rows.clone())
2032 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))
2033 }
2034
2035 fn get_index_range(
2036 &self,
2037 table: &str,
2038 index: &str,
2039 range_start: Option<&Value>,
2040 range_end: Option<&Value>,
2041 include_start: bool,
2042 include_end: bool,
2043 ) -> ExecutionResult<Vec<Rc<Row>>> {
2044 let table_data = self
2045 .tables
2046 .get(table)
2047 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
2048
2049 let index_data = table_data.indexes.get(index).ok_or_else(|| {
2050 ExecutionError::IndexNotFound {
2051 table: table.into(),
2052 index: index.into(),
2053 }
2054 })?;
2055
2056 let mut result = Vec::new();
2057
2058 for (key, row_indices) in &index_data.key_to_rows {
2059 let in_range = match (range_start, range_end) {
2060 (Some(start), Some(end)) => {
2061 let start_ok = if include_start {
2062 key >= start
2063 } else {
2064 key > start
2065 };
2066 let end_ok = if include_end { key <= end } else { key < end };
2067 start_ok && end_ok
2068 }
2069 (Some(start), None) => {
2070 if include_start {
2071 key >= start
2072 } else {
2073 key > start
2074 }
2075 }
2076 (None, Some(end)) => {
2077 if include_end {
2078 key <= end
2079 } else {
2080 key < end
2081 }
2082 }
2083 (None, None) => true,
2084 };
2085
2086 if in_range {
2087 for &idx in row_indices {
2088 result.push(Rc::clone(&table_data.rows[idx]));
2089 }
2090 }
2091 }
2092
2093 Ok(result)
2094 }
2095
2096 fn get_index_range_with_limit(
2097 &self,
2098 table: &str,
2099 index: &str,
2100 range_start: Option<&Value>,
2101 range_end: Option<&Value>,
2102 include_start: bool,
2103 include_end: bool,
2104 limit: Option<usize>,
2105 offset: usize,
2106 reverse: bool,
2107 ) -> ExecutionResult<Vec<Rc<Row>>> {
2108 let table_data = self
2109 .tables
2110 .get(table)
2111 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
2112
2113 let index_data = table_data.indexes.get(index).ok_or_else(|| {
2114 ExecutionError::IndexNotFound {
2115 table: table.into(),
2116 index: index.into(),
2117 }
2118 })?;
2119
2120 let keys_in_range: Vec<&Value> = index_data
2122 .key_to_rows
2123 .keys()
2124 .filter(|key| {
2125 match (range_start, range_end) {
2126 (Some(start), Some(end)) => {
2127 let start_ok = if include_start {
2128 *key >= start
2129 } else {
2130 *key > start
2131 };
2132 let end_ok = if include_end { *key <= end } else { *key < end };
2133 start_ok && end_ok
2134 }
2135 (Some(start), None) => {
2136 if include_start {
2137 *key >= start
2138 } else {
2139 *key > start
2140 }
2141 }
2142 (None, Some(end)) => {
2143 if include_end {
2144 *key <= end
2145 } else {
2146 *key < end
2147 }
2148 }
2149 (None, None) => true,
2150 }
2151 })
2152 .collect();
2153
2154 let mut result = Vec::new();
2155 let mut skipped = 0;
2156 let mut collected = 0;
2157
2158 let iter: Box<dyn Iterator<Item = &&Value>> = if reverse {
2160 Box::new(keys_in_range.iter().rev())
2161 } else {
2162 Box::new(keys_in_range.iter())
2163 };
2164
2165 for key in iter {
2166 if let Some(row_indices) = index_data.key_to_rows.get(*key) {
2167 for &idx in row_indices {
2168 if skipped < offset {
2170 skipped += 1;
2171 continue;
2172 }
2173 if let Some(lim) = limit {
2175 if collected >= lim {
2176 return Ok(result);
2177 }
2178 }
2179 result.push(Rc::clone(&table_data.rows[idx]));
2180 collected += 1;
2181 }
2182 }
2183 }
2184
2185 Ok(result)
2186 }
2187
2188 fn get_index_point(&self, table: &str, index: &str, key: &Value) -> ExecutionResult<Vec<Rc<Row>>> {
2189 let table_data = self
2190 .tables
2191 .get(table)
2192 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
2193
2194 let index_data = table_data.indexes.get(index).ok_or_else(|| {
2195 ExecutionError::IndexNotFound {
2196 table: table.into(),
2197 index: index.into(),
2198 }
2199 })?;
2200
2201 let result = index_data
2202 .key_to_rows
2203 .get(key)
2204 .map(|indices| indices.iter().map(|&i| Rc::clone(&table_data.rows[i])).collect())
2205 .unwrap_or_default();
2206
2207 Ok(result)
2208 }
2209
2210 fn get_column_count(&self, table: &str) -> ExecutionResult<usize> {
2211 self.tables
2212 .get(table)
2213 .map(|t| t.column_count)
2214 .ok_or_else(|| ExecutionError::TableNotFound(table.into()))
2215 }
2216}
2217
2218#[cfg(test)]
2219mod tests {
2220 use super::*;
2221 use crate::ast::JoinType;
2222 use alloc::boxed::Box;
2223 use alloc::vec;
2224
2225 fn create_test_data_source() -> InMemoryDataSource {
2226 let mut ds = InMemoryDataSource::new();
2227
2228 let users = vec![
2230 Row::new(1, vec![Value::Int64(1), Value::String("Alice".into()), Value::Int64(10)]),
2231 Row::new(2, vec![Value::Int64(2), Value::String("Bob".into()), Value::Int64(20)]),
2232 Row::new(3, vec![Value::Int64(3), Value::String("Charlie".into()), Value::Int64(10)]),
2233 ];
2234 ds.add_table("users", users, 3);
2235 ds.create_index("users", "idx_id", 0).unwrap();
2236 ds.create_index("users", "idx_dept", 2).unwrap();
2237
2238 let depts = vec![
2240 Row::new(10, vec![Value::Int64(10), Value::String("Engineering".into())]),
2241 Row::new(20, vec![Value::Int64(20), Value::String("Sales".into())]),
2242 Row::new(30, vec![Value::Int64(30), Value::String("Marketing".into())]),
2243 ];
2244 ds.add_table("departments", depts, 2);
2245 ds.create_index("departments", "idx_id", 0).unwrap();
2246
2247 ds
2248 }
2249
2250 #[test]
2251 fn test_table_scan() {
2252 let ds = create_test_data_source();
2253 let runner = PhysicalPlanRunner::new(&ds);
2254
2255 let plan = PhysicalPlan::table_scan("users");
2256 let result = runner.execute(&plan).unwrap();
2257
2258 assert_eq!(result.len(), 3);
2259 assert_eq!(result.tables(), &["users"]);
2260 }
2261
2262 #[test]
2263 fn test_index_scan() {
2264 let ds = create_test_data_source();
2265 let runner = PhysicalPlanRunner::new(&ds);
2266
2267 let plan = PhysicalPlan::IndexScan {
2268 table: "users".into(),
2269 index: "idx_id".into(),
2270 range_start: Some(Value::Int64(1)),
2271 range_end: Some(Value::Int64(2)),
2272 include_start: true,
2273 include_end: true,
2274 limit: None,
2275 offset: None,
2276 reverse: false,
2277 };
2278 let result = runner.execute(&plan).unwrap();
2279
2280 assert_eq!(result.len(), 2);
2281 }
2282
2283 #[test]
2284 fn test_index_get() {
2285 let ds = create_test_data_source();
2286 let runner = PhysicalPlanRunner::new(&ds);
2287
2288 let plan = PhysicalPlan::index_get("users", "idx_id", Value::Int64(2));
2289 let result = runner.execute(&plan).unwrap();
2290
2291 assert_eq!(result.len(), 1);
2292 assert_eq!(
2293 result.entries[0].get_field(1),
2294 Some(&Value::String("Bob".into()))
2295 );
2296 }
2297
2298 #[test]
2299 fn test_filter() {
2300 let ds = create_test_data_source();
2301 let runner = PhysicalPlanRunner::new(&ds);
2302
2303 let plan = PhysicalPlan::filter(
2305 PhysicalPlan::table_scan("users"),
2306 Expr::eq(
2307 Expr::column("users", "dept_id", 2),
2308 Expr::literal(Value::Int64(10)),
2309 ),
2310 );
2311 let result = runner.execute(&plan).unwrap();
2312
2313 assert_eq!(result.len(), 2); }
2315
2316 #[test]
2317 fn test_project() {
2318 let ds = create_test_data_source();
2319 let runner = PhysicalPlanRunner::new(&ds);
2320
2321 let plan = PhysicalPlan::project(
2323 PhysicalPlan::table_scan("users"),
2324 vec![
2325 Expr::column("users", "id", 0),
2326 Expr::column("users", "name", 1),
2327 ],
2328 );
2329 let result = runner.execute(&plan).unwrap();
2330
2331 assert_eq!(result.len(), 3);
2332 assert_eq!(result.entries[0].row.len(), 2);
2333 }
2334
2335 #[test]
2336 fn test_hash_join() {
2337 let ds = create_test_data_source();
2338 let runner = PhysicalPlanRunner::new(&ds);
2339
2340 let plan = PhysicalPlan::hash_join(
2342 PhysicalPlan::table_scan("users"),
2343 PhysicalPlan::table_scan("departments"),
2344 Expr::eq(
2345 Expr::column("users", "dept_id", 2),
2346 Expr::column("departments", "id", 0),
2347 ),
2348 JoinType::Inner,
2349 );
2350 let result = runner.execute(&plan).unwrap();
2351
2352 assert_eq!(result.len(), 3);
2354 assert_eq!(result.tables(), &["users", "departments"]);
2355 }
2356
2357 #[test]
2358 fn test_left_outer_join() {
2359 let mut ds = InMemoryDataSource::new();
2360
2361 let left = vec![
2362 Row::new(1, vec![Value::Int64(1)]),
2363 Row::new(2, vec![Value::Int64(2)]),
2364 Row::new(3, vec![Value::Int64(3)]),
2365 ];
2366 ds.add_table("left", left, 1);
2367
2368 let right = vec![Row::new(10, vec![Value::Int64(1)])];
2369 ds.add_table("right", right, 1);
2370
2371 let runner = PhysicalPlanRunner::new(&ds);
2372
2373 let plan = PhysicalPlan::hash_join(
2374 PhysicalPlan::table_scan("left"),
2375 PhysicalPlan::table_scan("right"),
2376 Expr::eq(
2377 Expr::column("left", "id", 0),
2378 Expr::column("right", "id", 0),
2379 ),
2380 JoinType::LeftOuter,
2381 );
2382 let result = runner.execute(&plan).unwrap();
2383
2384 assert_eq!(result.len(), 3);
2386 }
2387
2388 #[test]
2389 fn test_nested_loop_join_with_predicate() {
2390 let mut ds = InMemoryDataSource::new();
2391
2392 let left = vec![
2393 Row::new(1, vec![Value::Int64(10)]),
2394 Row::new(2, vec![Value::Int64(20)]),
2395 ];
2396 ds.add_table("left", left, 1);
2397
2398 let right = vec![
2399 Row::new(10, vec![Value::Int64(5)]),
2400 Row::new(11, vec![Value::Int64(15)]),
2401 Row::new(12, vec![Value::Int64(25)]),
2402 ];
2403 ds.add_table("right", right, 1);
2404
2405 let runner = PhysicalPlanRunner::new(&ds);
2406
2407 let plan = PhysicalPlan::nested_loop_join(
2409 PhysicalPlan::table_scan("left"),
2410 PhysicalPlan::table_scan("right"),
2411 Expr::gt(
2412 Expr::column("left", "value", 0),
2413 Expr::column("right", "value", 0),
2414 ),
2415 JoinType::Inner,
2416 );
2417 let result = runner.execute(&plan).unwrap();
2418
2419 assert_eq!(result.len(), 3);
2421 }
2422
2423 #[test]
2424 fn test_cross_product() {
2425 let mut ds = InMemoryDataSource::new();
2426
2427 let left = vec![
2428 Row::new(1, vec![Value::Int64(1)]),
2429 Row::new(2, vec![Value::Int64(2)]),
2430 ];
2431 ds.add_table("left", left, 1);
2432
2433 let right = vec![
2434 Row::new(10, vec![Value::String("A".into())]),
2435 Row::new(11, vec![Value::String("B".into())]),
2436 Row::new(12, vec![Value::String("C".into())]),
2437 ];
2438 ds.add_table("right", right, 1);
2439
2440 let runner = PhysicalPlanRunner::new(&ds);
2441
2442 let plan = PhysicalPlan::CrossProduct {
2443 left: Box::new(PhysicalPlan::table_scan("left")),
2444 right: Box::new(PhysicalPlan::table_scan("right")),
2445 };
2446 let result = runner.execute(&plan).unwrap();
2447
2448 assert_eq!(result.len(), 6);
2450 }
2451
2452 #[test]
2453 fn test_sort() {
2454 let ds = create_test_data_source();
2455 let runner = PhysicalPlanRunner::new(&ds);
2456
2457 let plan = PhysicalPlan::sort(
2458 PhysicalPlan::table_scan("users"),
2459 vec![(Expr::column("users", "name", 1), SortOrder::Asc)],
2460 );
2461 let result = runner.execute(&plan).unwrap();
2462
2463 assert_eq!(result.len(), 3);
2464 assert_eq!(
2466 result.entries[0].get_field(1),
2467 Some(&Value::String("Alice".into()))
2468 );
2469 assert_eq!(
2470 result.entries[1].get_field(1),
2471 Some(&Value::String("Bob".into()))
2472 );
2473 assert_eq!(
2474 result.entries[2].get_field(1),
2475 Some(&Value::String("Charlie".into()))
2476 );
2477 }
2478
2479 #[test]
2480 fn test_limit() {
2481 let ds = create_test_data_source();
2482 let runner = PhysicalPlanRunner::new(&ds);
2483
2484 let plan = PhysicalPlan::limit(PhysicalPlan::table_scan("users"), 2, 1);
2485 let result = runner.execute(&plan).unwrap();
2486
2487 assert_eq!(result.len(), 2);
2488 }
2489
2490 #[test]
2491 fn test_aggregate_count() {
2492 let ds = create_test_data_source();
2493 let runner = PhysicalPlanRunner::new(&ds);
2494
2495 let plan = PhysicalPlan::hash_aggregate(
2496 PhysicalPlan::table_scan("users"),
2497 vec![],
2498 vec![(AggregateFunc::Count, Expr::column("users", "id", 0))],
2499 );
2500 let result = runner.execute(&plan).unwrap();
2501
2502 assert_eq!(result.len(), 1);
2503 assert_eq!(result.entries[0].get_field(0), Some(&Value::Int64(3)));
2504 }
2505
2506 #[test]
2507 fn test_aggregate_group_by() {
2508 let ds = create_test_data_source();
2509 let runner = PhysicalPlanRunner::new(&ds);
2510
2511 let plan = PhysicalPlan::hash_aggregate(
2513 PhysicalPlan::table_scan("users"),
2514 vec![Expr::column("users", "dept_id", 2)],
2515 vec![(AggregateFunc::Count, Expr::column("users", "id", 0))],
2516 );
2517 let result = runner.execute(&plan).unwrap();
2518
2519 assert_eq!(result.len(), 2);
2521 }
2522
2523 #[test]
2524 fn test_complex_query() {
2525 let ds = create_test_data_source();
2526 let runner = PhysicalPlanRunner::new(&ds);
2527
2528 let plan = PhysicalPlan::limit(
2530 PhysicalPlan::sort(
2531 PhysicalPlan::project(
2532 PhysicalPlan::filter(
2533 PhysicalPlan::table_scan("users"),
2534 Expr::eq(
2535 Expr::column("users", "dept_id", 2),
2536 Expr::literal(Value::Int64(10)),
2537 ),
2538 ),
2539 vec![Expr::column("users", "name", 1)],
2540 ),
2541 vec![(Expr::column("users", "name", 0), SortOrder::Asc)],
2542 ),
2543 1,
2544 0,
2545 );
2546 let result = runner.execute(&plan).unwrap();
2547
2548 assert_eq!(result.len(), 1);
2549 assert_eq!(
2550 result.entries[0].get_field(0),
2551 Some(&Value::String("Alice".into()))
2552 );
2553 }
2554
2555 #[test]
2556 fn test_empty_result() {
2557 let ds = create_test_data_source();
2558 let runner = PhysicalPlanRunner::new(&ds);
2559
2560 let plan = PhysicalPlan::Empty;
2561 let result = runner.execute(&plan).unwrap();
2562
2563 assert!(result.is_empty());
2564 }
2565
2566 #[test]
2567 fn test_noop() {
2568 let ds = create_test_data_source();
2569 let runner = PhysicalPlanRunner::new(&ds);
2570
2571 let plan = PhysicalPlan::NoOp {
2572 input: Box::new(PhysicalPlan::table_scan("users")),
2573 };
2574 let result = runner.execute(&plan).unwrap();
2575
2576 assert_eq!(result.len(), 3);
2577 }
2578
2579 #[test]
2580 fn test_expression_evaluation() {
2581 let ds = create_test_data_source();
2582 let runner = PhysicalPlanRunner::new(&ds);
2583
2584 let plan = PhysicalPlan::project(
2586 PhysicalPlan::table_scan("users"),
2587 vec![Expr::BinaryOp {
2588 left: Box::new(Expr::column("users", "id", 0)),
2589 op: BinaryOp::Mul,
2590 right: Box::new(Expr::literal(Value::Int64(10))),
2591 }],
2592 );
2593 let result = runner.execute(&plan).unwrap();
2594
2595 assert_eq!(result.entries[0].get_field(0), Some(&Value::Int64(10)));
2596 assert_eq!(result.entries[1].get_field(0), Some(&Value::Int64(20)));
2597 assert_eq!(result.entries[2].get_field(0), Some(&Value::Int64(30)));
2598 }
2599
2600 #[test]
2601 fn test_like_pattern() {
2602 let mut ds = InMemoryDataSource::new();
2603
2604 let data = vec![
2605 Row::new(1, vec![Value::String("Alice".into())]),
2606 Row::new(2, vec![Value::String("Bob".into())]),
2607 Row::new(3, vec![Value::String("Charlie".into())]),
2608 Row::new(4, vec![Value::String("Alex".into())]),
2609 ];
2610 ds.add_table("names", data, 1);
2611
2612 let runner = PhysicalPlanRunner::new(&ds);
2613
2614 let plan = PhysicalPlan::filter(
2616 PhysicalPlan::table_scan("names"),
2617 Expr::Like {
2618 expr: Box::new(Expr::column("names", "name", 0)),
2619 pattern: "Al%".into(),
2620 },
2621 );
2622 let result = runner.execute(&plan).unwrap();
2623
2624 assert_eq!(result.len(), 2); }
2626
2627 #[test]
2628 fn test_between() {
2629 let mut ds = InMemoryDataSource::new();
2630
2631 let data = vec![
2632 Row::new(1, vec![Value::Int64(5)]),
2633 Row::new(2, vec![Value::Int64(10)]),
2634 Row::new(3, vec![Value::Int64(15)]),
2635 Row::new(4, vec![Value::Int64(20)]),
2636 ];
2637 ds.add_table("numbers", data, 1);
2638
2639 let runner = PhysicalPlanRunner::new(&ds);
2640
2641 let plan = PhysicalPlan::filter(
2643 PhysicalPlan::table_scan("numbers"),
2644 Expr::Between {
2645 expr: Box::new(Expr::column("numbers", "value", 0)),
2646 low: Box::new(Expr::literal(Value::Int64(10))),
2647 high: Box::new(Expr::literal(Value::Int64(15))),
2648 },
2649 );
2650 let result = runner.execute(&plan).unwrap();
2651
2652 assert_eq!(result.len(), 2); }
2654
2655 #[test]
2656 fn test_in_list() {
2657 let ds = create_test_data_source();
2658 let runner = PhysicalPlanRunner::new(&ds);
2659
2660 let plan = PhysicalPlan::filter(
2662 PhysicalPlan::table_scan("users"),
2663 Expr::In {
2664 expr: Box::new(Expr::column("users", "id", 0)),
2665 list: vec![
2666 Expr::literal(Value::Int64(1)),
2667 Expr::literal(Value::Int64(3)),
2668 ],
2669 },
2670 );
2671 let result = runner.execute(&plan).unwrap();
2672
2673 assert_eq!(result.len(), 2); }
2675
2676 #[test]
2677 fn test_not_in_list() {
2678 let ds = create_test_data_source();
2679 let runner = PhysicalPlanRunner::new(&ds);
2680
2681 let plan = PhysicalPlan::filter(
2683 PhysicalPlan::table_scan("users"),
2684 Expr::NotIn {
2685 expr: Box::new(Expr::column("users", "id", 0)),
2686 list: vec![
2687 Expr::literal(Value::Int64(1)),
2688 Expr::literal(Value::Int64(3)),
2689 ],
2690 },
2691 );
2692 let result = runner.execute(&plan).unwrap();
2693
2694 assert_eq!(result.len(), 1); }
2696
2697 #[test]
2698 fn test_not_between() {
2699 let ds = create_test_data_source();
2700 let runner = PhysicalPlanRunner::new(&ds);
2701
2702 let plan = PhysicalPlan::filter(
2704 PhysicalPlan::table_scan("users"),
2705 Expr::NotBetween {
2706 expr: Box::new(Expr::column("users", "dept_id", 2)),
2707 low: Box::new(Expr::literal(Value::Int64(15))),
2708 high: Box::new(Expr::literal(Value::Int64(25))),
2709 },
2710 );
2711 let result = runner.execute(&plan).unwrap();
2712
2713 assert_eq!(result.len(), 2); }
2715
2716 #[test]
2717 fn test_not_like() {
2718 let ds = create_test_data_source();
2719 let runner = PhysicalPlanRunner::new(&ds);
2720
2721 let plan = PhysicalPlan::filter(
2723 PhysicalPlan::table_scan("users"),
2724 Expr::NotLike {
2725 expr: Box::new(Expr::column("users", "name", 1)),
2726 pattern: "A%".into(),
2727 },
2728 );
2729 let result = runner.execute(&plan).unwrap();
2730
2731 assert_eq!(result.len(), 2); }
2733
2734 #[test]
2735 fn test_regex_match() {
2736 let ds = create_test_data_source();
2737 let runner = PhysicalPlanRunner::new(&ds);
2738
2739 let plan = PhysicalPlan::filter(
2741 PhysicalPlan::table_scan("users"),
2742 Expr::Match {
2743 expr: Box::new(Expr::column("users", "name", 1)),
2744 pattern: "^[AB].*".into(),
2745 },
2746 );
2747 let result = runner.execute(&plan).unwrap();
2748
2749 assert_eq!(result.len(), 2); }
2751
2752 #[test]
2753 fn test_regex_match_digit() {
2754 let mut ds = InMemoryDataSource::new();
2755 let data = vec![
2756 Row::new(1, vec![Value::String("abc123".into())]),
2757 Row::new(2, vec![Value::String("xyz".into())]),
2758 Row::new(3, vec![Value::String("test456def".into())]),
2759 ];
2760 ds.add_table("data", data, 1);
2761
2762 let runner = PhysicalPlanRunner::new(&ds);
2763
2764 let plan = PhysicalPlan::filter(
2766 PhysicalPlan::table_scan("data"),
2767 Expr::Match {
2768 expr: Box::new(Expr::column("data", "col", 0)),
2769 pattern: "\\d+".into(),
2770 },
2771 );
2772 let result = runner.execute(&plan).unwrap();
2773
2774 assert_eq!(result.len(), 2); }
2776
2777 #[test]
2778 fn test_not_regex_match() {
2779 let ds = create_test_data_source();
2780 let runner = PhysicalPlanRunner::new(&ds);
2781
2782 let plan = PhysicalPlan::filter(
2784 PhysicalPlan::table_scan("users"),
2785 Expr::NotMatch {
2786 expr: Box::new(Expr::column("users", "name", 1)),
2787 pattern: "^[AB].*".into(),
2788 },
2789 );
2790 let result = runner.execute(&plan).unwrap();
2791
2792 assert_eq!(result.len(), 1); }
2794
2795 #[test]
2796 fn test_null_handling() {
2797 let mut ds = InMemoryDataSource::new();
2798
2799 let data = vec![
2800 Row::new(1, vec![Value::Int64(1), Value::String("A".into())]),
2801 Row::new(2, vec![Value::Int64(2), Value::Null]),
2802 Row::new(3, vec![Value::Null, Value::String("C".into())]),
2803 ];
2804 ds.add_table("data", data, 2);
2805
2806 let runner = PhysicalPlanRunner::new(&ds);
2807
2808 let plan = PhysicalPlan::filter(
2810 PhysicalPlan::table_scan("data"),
2811 Expr::is_not_null(Expr::column("data", "col1", 0)),
2812 );
2813 let result = runner.execute(&plan).unwrap();
2814
2815 assert_eq!(result.len(), 2);
2816 }
2817
2818 #[test]
2819 fn test_index_scan_reverse() {
2820 let mut ds = InMemoryDataSource::new();
2821
2822 let data = vec![
2824 Row::new(1, vec![Value::Int64(10)]),
2825 Row::new(2, vec![Value::Int64(20)]),
2826 Row::new(3, vec![Value::Int64(30)]),
2827 Row::new(4, vec![Value::Int64(40)]),
2828 Row::new(5, vec![Value::Int64(50)]),
2829 ];
2830 ds.add_table("scores", data, 1);
2831 ds.create_index("scores", "idx_score", 0).unwrap();
2832
2833 let runner = PhysicalPlanRunner::new(&ds);
2834
2835 let plan_forward = PhysicalPlan::IndexScan {
2837 table: "scores".into(),
2838 index: "idx_score".into(),
2839 range_start: None,
2840 range_end: None,
2841 include_start: true,
2842 include_end: true,
2843 limit: Some(3),
2844 offset: None,
2845 reverse: false,
2846 };
2847 let result_forward = runner.execute(&plan_forward).unwrap();
2848 assert_eq!(result_forward.len(), 3);
2849 assert_eq!(result_forward.entries[0].get_field(0), Some(&Value::Int64(10)));
2850 assert_eq!(result_forward.entries[1].get_field(0), Some(&Value::Int64(20)));
2851 assert_eq!(result_forward.entries[2].get_field(0), Some(&Value::Int64(30)));
2852
2853 let plan_reverse = PhysicalPlan::IndexScan {
2855 table: "scores".into(),
2856 index: "idx_score".into(),
2857 range_start: None,
2858 range_end: None,
2859 include_start: true,
2860 include_end: true,
2861 limit: Some(3),
2862 offset: None,
2863 reverse: true,
2864 };
2865 let result_reverse = runner.execute(&plan_reverse).unwrap();
2866 assert_eq!(result_reverse.len(), 3);
2867 assert_eq!(result_reverse.entries[0].get_field(0), Some(&Value::Int64(50)));
2868 assert_eq!(result_reverse.entries[1].get_field(0), Some(&Value::Int64(40)));
2869 assert_eq!(result_reverse.entries[2].get_field(0), Some(&Value::Int64(30)));
2870 }
2871
2872 #[test]
2873 fn test_index_scan_reverse_with_offset() {
2874 let mut ds = InMemoryDataSource::new();
2875
2876 let data = vec![
2878 Row::new(1, vec![Value::Int64(10)]),
2879 Row::new(2, vec![Value::Int64(20)]),
2880 Row::new(3, vec![Value::Int64(30)]),
2881 Row::new(4, vec![Value::Int64(40)]),
2882 Row::new(5, vec![Value::Int64(50)]),
2883 ];
2884 ds.add_table("scores", data, 1);
2885 ds.create_index("scores", "idx_score", 0).unwrap();
2886
2887 let runner = PhysicalPlanRunner::new(&ds);
2888
2889 let plan = PhysicalPlan::IndexScan {
2891 table: "scores".into(),
2892 index: "idx_score".into(),
2893 range_start: None,
2894 range_end: None,
2895 include_start: true,
2896 include_end: true,
2897 limit: Some(2),
2898 offset: Some(1),
2899 reverse: true,
2900 };
2901 let result = runner.execute(&plan).unwrap();
2902 assert_eq!(result.len(), 2);
2903 assert_eq!(result.entries[0].get_field(0), Some(&Value::Int64(40)));
2904 assert_eq!(result.entries[1].get_field(0), Some(&Value::Int64(30)));
2905 }
2906}