Skip to main content

cynos_query/executor/
runner.rs

1//! Physical plan runner - unified execution framework for physical query plans.
2//!
3//! This module provides the `PhysicalPlanRunner` which executes physical query plans
4//! by recursively evaluating plan nodes and combining results using the appropriate
5//! execution operators.
6
7use crate::ast::{AggregateFunc, BinaryOp, ColumnRef, Expr, SortOrder, UnaryOp};
8use crate::executor::{
9    AggregateExecutor, HashJoin, LimitExecutor, Relation, RelationEntry,
10    SharedTables, SortExecutor, SortMergeJoin,
11};
12use crate::planner::PhysicalPlan;
13use alloc::boxed::Box;
14use alloc::collections::BTreeMap;
15use alloc::rc::Rc;
16use alloc::string::String;
17use alloc::vec;
18use alloc::vec::Vec;
19use cynos_core::{Row, Value};
20use cynos_jsonb::{JsonbObject, JsonbValue, JsonPath};
21
22/// Context for expression evaluation in JOIN queries.
23/// Contains table metadata needed to compute correct column indices at runtime.
24#[derive(Clone, Debug)]
25pub struct EvalContext<'a> {
26    /// Table names in the relation (in order).
27    pub tables: &'a [String],
28    /// Column counts for each table (used to compute offsets).
29    pub table_column_counts: &'a [usize],
30}
31
32impl<'a> EvalContext<'a> {
33    /// Creates a new evaluation context.
34    #[inline]
35    pub fn new(tables: &'a [String], table_column_counts: &'a [usize]) -> Self {
36        Self { tables, table_column_counts }
37    }
38
39    /// Computes the actual column index in the combined row based on table name and table-relative index.
40    #[inline]
41    pub fn resolve_column_index(&self, table_name: &str, table_relative_index: usize) -> usize {
42        let mut offset = 0;
43        for (i, t) in self.tables.iter().enumerate() {
44            if t == table_name {
45                return offset + table_relative_index;
46            }
47            offset += self.table_column_counts.get(i).copied().unwrap_or(0);
48        }
49        // Fallback: if table not found, use the original index
50        // This can happen for single-table queries or after projection
51        table_relative_index
52    }
53}
54
55/// Error type for plan execution.
56#[derive(Clone, Debug)]
57pub enum ExecutionError {
58    /// Table not found in data source.
59    TableNotFound(String),
60    /// Index not found.
61    IndexNotFound { table: String, index: String },
62    /// Column not found.
63    ColumnNotFound { table: String, column: String },
64    /// Type mismatch during evaluation.
65    TypeMismatch(String),
66    /// Invalid operation.
67    InvalidOperation(String),
68}
69
70impl core::fmt::Display for ExecutionError {
71    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
72        match self {
73            ExecutionError::TableNotFound(t) => write!(f, "Table not found: {}", t),
74            ExecutionError::IndexNotFound { table, index } => {
75                write!(f, "Index {} not found on table {}", index, table)
76            }
77            ExecutionError::ColumnNotFound { table, column } => {
78                write!(f, "Column {}.{} not found", table, column)
79            }
80            ExecutionError::TypeMismatch(msg) => write!(f, "Type mismatch: {}", msg),
81            ExecutionError::InvalidOperation(msg) => write!(f, "Invalid operation: {}", msg),
82        }
83    }
84}
85
86/// Regex operation for lightweight regex matching.
87#[derive(Clone, Debug, PartialEq)]
88enum RegexOp {
89    Start,
90    End,
91    Char(char),
92    Any,
93    Digit,
94    NonDigit,
95    Word,
96    NonWord,
97    Whitespace,
98    NonWhitespace,
99    CharClass(Vec<CharClassItem>),
100    NegCharClass(Vec<CharClassItem>),
101    Star(Box<RegexOp>),
102    Plus(Box<RegexOp>),
103    Question(Box<RegexOp>),
104    GroupStart,
105    GroupEnd,
106    GroupStar,
107    GroupPlus,
108    GroupQuestion,
109    Alternation,
110}
111
112/// Character class item for regex.
113#[derive(Clone, Debug, PartialEq)]
114enum CharClassItem {
115    Char(char),
116    Range(char, char),
117}
118
119/// Result type for plan execution.
120pub type ExecutionResult<T> = Result<T, ExecutionError>;
121
122/// Data source trait for providing table and index data.
123///
124/// Implementations of this trait provide access to table rows and index lookups
125/// for the physical plan runner.
126pub trait DataSource {
127    /// Returns all rows from a table.
128    fn get_table_rows(&self, table: &str) -> ExecutionResult<Vec<Rc<Row>>>;
129
130    /// Returns rows from an index scan with a key range.
131    fn get_index_range(
132        &self,
133        table: &str,
134        index: &str,
135        range_start: Option<&Value>,
136        range_end: Option<&Value>,
137        include_start: bool,
138        include_end: bool,
139    ) -> ExecutionResult<Vec<Rc<Row>>> {
140        // Default implementation: call with no limit
141        self.get_index_range_with_limit(
142            table,
143            index,
144            range_start,
145            range_end,
146            include_start,
147            include_end,
148            None,
149            0,
150            false,
151        )
152    }
153
154    /// Returns rows from an index scan with a key range, limit, offset, and reverse option.
155    /// This enables true pushdown of LIMIT to the storage layer.
156    fn get_index_range_with_limit(
157        &self,
158        table: &str,
159        index: &str,
160        range_start: Option<&Value>,
161        range_end: Option<&Value>,
162        include_start: bool,
163        include_end: bool,
164        limit: Option<usize>,
165        offset: usize,
166        reverse: bool,
167    ) -> ExecutionResult<Vec<Rc<Row>>>;
168
169    /// Returns rows from an index point lookup.
170    fn get_index_point(&self, table: &str, index: &str, key: &Value) -> ExecutionResult<Vec<Rc<Row>>>;
171
172    /// Returns rows from an index point lookup with limit.
173    fn get_index_point_with_limit(
174        &self,
175        table: &str,
176        index: &str,
177        key: &Value,
178        limit: Option<usize>,
179    ) -> ExecutionResult<Vec<Rc<Row>>> {
180        // Default implementation: get all rows and apply limit in memory
181        let rows = self.get_index_point(table, index, key)?;
182        Ok(if let Some(limit) = limit {
183            rows.into_iter().take(limit).collect()
184        } else {
185            rows
186        })
187    }
188
189    /// Returns the column count for a table.
190    fn get_column_count(&self, table: &str) -> ExecutionResult<usize>;
191
192    /// Returns rows from a GIN index lookup by key-value pair.
193    /// Used for JSONB path equality queries like `$.category = 'Electronics'`.
194    fn get_gin_index_rows(
195        &self,
196        table: &str,
197        index: &str,
198        key: &str,
199        value: &str,
200    ) -> ExecutionResult<Vec<Rc<Row>>> {
201        // Default implementation: fall back to table scan (no GIN index support)
202        let _ = (index, key, value);
203        self.get_table_rows(table)
204    }
205
206    /// Returns rows from a GIN index lookup by key existence.
207    /// Used for JSONB key existence queries.
208    fn get_gin_index_rows_by_key(
209        &self,
210        table: &str,
211        index: &str,
212        key: &str,
213    ) -> ExecutionResult<Vec<Rc<Row>>> {
214        // Default implementation: fall back to table scan (no GIN index support)
215        let _ = (index, key);
216        self.get_table_rows(table)
217    }
218
219    /// Returns rows from a GIN index lookup by multiple key-value pairs (AND query).
220    /// Used for combined JSONB path equality queries like `$.category = 'A' AND $.status = 'active'`.
221    fn get_gin_index_rows_multi(
222        &self,
223        table: &str,
224        index: &str,
225        pairs: &[(&str, &str)],
226    ) -> ExecutionResult<Vec<Rc<Row>>> {
227        // Default implementation: fall back to table scan (no GIN index support)
228        let _ = (index, pairs);
229        self.get_table_rows(table)
230    }
231}
232
233/// Physical plan runner - executes physical query plans.
234///
235/// The runner recursively evaluates plan nodes, executing each operator
236/// and combining results according to the plan structure.
237pub struct PhysicalPlanRunner<'a, D: DataSource> {
238    data_source: &'a D,
239}
240
241impl<'a, D: DataSource> PhysicalPlanRunner<'a, D> {
242    /// Creates a new physical plan runner with the given data source.
243    pub fn new(data_source: &'a D) -> Self {
244        Self { data_source }
245    }
246
247    /// Executes a physical plan and returns the result relation.
248    pub fn execute(&self, plan: &PhysicalPlan) -> ExecutionResult<Relation> {
249        match plan {
250            PhysicalPlan::TableScan { table } => self.execute_table_scan(table),
251
252            PhysicalPlan::IndexScan {
253                table,
254                index,
255                range_start,
256                range_end,
257                include_start,
258                include_end,
259                limit,
260                offset,
261                reverse,
262            } => self.execute_index_scan(
263                table,
264                index,
265                range_start.as_ref(),
266                range_end.as_ref(),
267                *include_start,
268                *include_end,
269                *limit,
270                *offset,
271                *reverse,
272            ),
273
274            PhysicalPlan::IndexGet { table, index, key, limit } => {
275                self.execute_index_get(table, index, key, *limit)
276            }
277
278            PhysicalPlan::IndexInGet { table, index, keys } => {
279                self.execute_index_in_get(table, index, keys)
280            }
281
282            PhysicalPlan::GinIndexScan {
283                table,
284                index,
285                key,
286                value,
287                query_type,
288            } => self.execute_gin_index_scan(table, index, key, value.as_deref(), query_type),
289
290            PhysicalPlan::GinIndexScanMulti {
291                table,
292                index,
293                pairs,
294            } => self.execute_gin_index_scan_multi(table, index, pairs),
295
296            PhysicalPlan::Filter { input, predicate } => {
297                let input_rel = self.execute(input)?;
298                self.execute_filter(input_rel, predicate)
299            }
300
301            PhysicalPlan::Project { input, columns } => {
302                let input_rel = self.execute(input)?;
303                self.execute_project(input_rel, columns)
304            }
305
306            PhysicalPlan::HashJoin {
307                left,
308                right,
309                condition,
310                join_type,
311            } => {
312                let left_rel = self.execute(left)?;
313                let right_rel = self.execute(right)?;
314                self.execute_hash_join(left_rel, right_rel, condition, *join_type)
315            }
316
317            PhysicalPlan::SortMergeJoin {
318                left,
319                right,
320                condition,
321                join_type,
322            } => {
323                let left_rel = self.execute(left)?;
324                let right_rel = self.execute(right)?;
325                self.execute_sort_merge_join(left_rel, right_rel, condition, *join_type)
326            }
327
328            PhysicalPlan::NestedLoopJoin {
329                left,
330                right,
331                condition,
332                join_type,
333            } => {
334                let left_rel = self.execute(left)?;
335                let right_rel = self.execute(right)?;
336                self.execute_nested_loop_join(left_rel, right_rel, condition, *join_type)
337            }
338
339            PhysicalPlan::IndexNestedLoopJoin {
340                outer,
341                inner_table,
342                inner_index,
343                condition,
344                join_type,
345            } => {
346                let outer_rel = self.execute(outer)?;
347                self.execute_index_nested_loop_join(
348                    outer_rel,
349                    inner_table,
350                    inner_index,
351                    condition,
352                    *join_type,
353                )
354            }
355
356            PhysicalPlan::HashAggregate {
357                input,
358                group_by,
359                aggregates,
360            } => {
361                let input_rel = self.execute(input)?;
362                self.execute_hash_aggregate(input_rel, group_by, aggregates)
363            }
364
365            PhysicalPlan::Sort { input, order_by } => {
366                let input_rel = self.execute(input)?;
367                self.execute_sort(input_rel, order_by)
368            }
369
370            PhysicalPlan::Limit {
371                input,
372                limit,
373                offset,
374            } => {
375                let input_rel = self.execute(input)?;
376                self.execute_limit(input_rel, *limit, *offset)
377            }
378
379            PhysicalPlan::CrossProduct { left, right } => {
380                let left_rel = self.execute(left)?;
381                let right_rel = self.execute(right)?;
382                self.execute_cross_product(left_rel, right_rel)
383            }
384
385            PhysicalPlan::NoOp { input } => self.execute(input),
386
387            PhysicalPlan::TopN {
388                input,
389                order_by,
390                limit,
391                offset,
392            } => {
393                // TopN is executed as Sort + Limit for now
394                // A more efficient implementation would use a heap
395                let input_rel = self.execute(input)?;
396                let sorted = self.execute_sort(input_rel, order_by)?;
397                self.execute_limit(sorted, *limit, *offset)
398            }
399
400            PhysicalPlan::Empty => Ok(Relation::empty()),
401        }
402    }
403
404    // ========== Scan Operations ==========
405
406    fn execute_table_scan(&self, table: &str) -> ExecutionResult<Relation> {
407        let rows = self.data_source.get_table_rows(table)?;
408        let column_count = self.data_source.get_column_count(table)?;
409        Ok(Relation::from_rows_with_column_count(rows, alloc::vec![table.into()], column_count))
410    }
411
412    fn execute_index_scan(
413        &self,
414        table: &str,
415        index: &str,
416        range_start: Option<&Value>,
417        range_end: Option<&Value>,
418        include_start: bool,
419        include_end: bool,
420        limit: Option<usize>,
421        offset: Option<usize>,
422        reverse: bool,
423    ) -> ExecutionResult<Relation> {
424        // Push limit, offset, and reverse down to storage layer for early termination
425        let rows = self.data_source.get_index_range_with_limit(
426            table,
427            index,
428            range_start,
429            range_end,
430            include_start,
431            include_end,
432            limit,
433            offset.unwrap_or(0),
434            reverse,
435        )?;
436        let column_count = self.data_source.get_column_count(table)?;
437        Ok(Relation::from_rows_with_column_count(rows, alloc::vec![table.into()], column_count))
438    }
439
440    fn execute_index_get(
441        &self,
442        table: &str,
443        index: &str,
444        key: &Value,
445        limit: Option<usize>,
446    ) -> ExecutionResult<Relation> {
447        let rows = self.data_source.get_index_point_with_limit(table, index, key, limit)?;
448        let column_count = self.data_source.get_column_count(table)?;
449        Ok(Relation::from_rows_with_column_count(rows, alloc::vec![table.into()], column_count))
450    }
451
452    /// Executes an index multi-point lookup (for IN queries).
453    /// Performs multiple index lookups and unions the results.
454    fn execute_index_in_get(
455        &self,
456        table: &str,
457        index: &str,
458        keys: &[Value],
459    ) -> ExecutionResult<Relation> {
460        let mut all_rows = Vec::new();
461        let mut seen_ids = alloc::collections::BTreeSet::new();
462
463        // Perform index lookup for each key and collect unique rows
464        for key in keys {
465            let rows = self.data_source.get_index_point(table, index, key)?;
466            for row in rows {
467                // Deduplicate by row id (in case of non-unique index)
468                if seen_ids.insert(row.id()) {
469                    all_rows.push(row);
470                }
471            }
472        }
473
474        let column_count = self.data_source.get_column_count(table)?;
475        Ok(Relation::from_rows_with_column_count(all_rows, alloc::vec![table.into()], column_count))
476    }
477
478    fn execute_gin_index_scan(
479        &self,
480        table: &str,
481        index: &str,
482        key: &str,
483        value: Option<&str>,
484        query_type: &str,
485    ) -> ExecutionResult<Relation> {
486        let rows = match (query_type, value) {
487            ("eq", Some(v)) => self.data_source.get_gin_index_rows(table, index, key, v)?,
488            ("contains", _) | ("exists", _) => {
489                self.data_source.get_gin_index_rows_by_key(table, index, key)?
490            }
491            _ => self.data_source.get_table_rows(table)?,
492        };
493        let column_count = self.data_source.get_column_count(table)?;
494        Ok(Relation::from_rows_with_column_count(rows, alloc::vec![table.into()], column_count))
495    }
496
497    fn execute_gin_index_scan_multi(
498        &self,
499        table: &str,
500        index: &str,
501        pairs: &[(String, String)],
502    ) -> ExecutionResult<Relation> {
503        // Convert to slice of references for the DataSource trait
504        let pair_refs: Vec<(&str, &str)> = pairs
505            .iter()
506            .map(|(k, v)| (k.as_str(), v.as_str()))
507            .collect();
508        let rows = self.data_source.get_gin_index_rows_multi(table, index, &pair_refs)?;
509        let column_count = self.data_source.get_column_count(table)?;
510        Ok(Relation::from_rows_with_column_count(rows, alloc::vec![table.into()], column_count))
511    }
512
513    // ========== Filter Operation ==========
514
515    fn execute_filter(&self, input: Relation, predicate: &Expr) -> ExecutionResult<Relation> {
516        let tables = input.tables().to_vec();
517        let table_column_counts = input.table_column_counts().to_vec();
518        let ctx = EvalContext::new(&tables, &table_column_counts);
519
520        let entries: Vec<RelationEntry> = input
521            .into_iter()
522            .filter(|entry| self.eval_predicate_ctx(predicate, entry, &ctx))
523            .collect();
524
525        Ok(Relation { entries, tables, table_column_counts })
526    }
527
528    // ========== Project Operation ==========
529
530    fn execute_project(&self, input: Relation, columns: &[Expr]) -> ExecutionResult<Relation> {
531        let tables = input.tables().to_vec();
532        let table_column_counts = input.table_column_counts().to_vec();
533        let shared_tables: SharedTables = tables.clone().into();
534        let ctx = EvalContext::new(&tables, &table_column_counts);
535
536        let entries: Vec<RelationEntry> = input
537            .into_iter()
538            .map(|entry| {
539                let values: Vec<Value> = columns
540                    .iter()
541                    .map(|col| self.eval_expr_ctx(col, &entry, Some(&ctx)))
542                    .collect();
543                RelationEntry::new_combined(Rc::new(Row::new(entry.id(), values)), shared_tables.clone())
544            })
545            .collect();
546
547        // After projection, the result has a single "virtual" table with the projected columns
548        Ok(Relation {
549            entries,
550            tables,
551            table_column_counts: alloc::vec![columns.len()],
552        })
553    }
554
555    // ========== Join Operations ==========
556
557    fn execute_hash_join(
558        &self,
559        left: Relation,
560        right: Relation,
561        condition: &Expr,
562        join_type: crate::ast::JoinType,
563    ) -> ExecutionResult<Relation> {
564        // Extract join key indices from condition
565        let (left_key_idx, right_key_idx) = self.extract_join_keys(condition, &left, &right)?;
566
567        let is_outer = matches!(
568            join_type,
569            crate::ast::JoinType::LeftOuter | crate::ast::JoinType::FullOuter
570        );
571
572        let join = HashJoin::new(left_key_idx, right_key_idx, is_outer);
573        Ok(join.execute(left, right))
574    }
575
576    fn execute_sort_merge_join(
577        &self,
578        left: Relation,
579        right: Relation,
580        condition: &Expr,
581        join_type: crate::ast::JoinType,
582    ) -> ExecutionResult<Relation> {
583        let (left_key_idx, right_key_idx) = self.extract_join_keys(condition, &left, &right)?;
584        let is_outer = matches!(
585            join_type,
586            crate::ast::JoinType::LeftOuter | crate::ast::JoinType::FullOuter
587        );
588
589        let join = SortMergeJoin::new(left_key_idx, right_key_idx, is_outer);
590        Ok(join.execute_with_sort(left, right))
591    }
592
593    fn execute_nested_loop_join(
594        &self,
595        left: Relation,
596        right: Relation,
597        condition: &Expr,
598        join_type: crate::ast::JoinType,
599    ) -> ExecutionResult<Relation> {
600        let is_outer = matches!(
601            join_type,
602            crate::ast::JoinType::LeftOuter | crate::ast::JoinType::FullOuter
603        );
604
605        // For nested loop join, we use predicate-based evaluation
606        let mut result_entries = Vec::new();
607        let left_tables = left.tables().to_vec();
608        let right_tables = right.tables().to_vec();
609        let left_column_counts = left.table_column_counts().to_vec();
610        let right_column_counts = right.table_column_counts().to_vec();
611
612        // Get left column count for adjusting right table column indices
613        let left_col_count = left
614            .entries
615            .first()
616            .map(|e| e.row.len())
617            .unwrap_or(0);
618        let right_col_count = right
619            .entries
620            .first()
621            .map(|e| e.row.len())
622            .unwrap_or(0);
623
624        // Adjust condition to use combined row indices
625        // Right table columns need to be offset by left table column count
626        let adjusted_condition = self.adjust_join_condition_indices(condition, &left_tables, &right_tables, left_col_count);
627
628        for left_entry in left.iter() {
629            let mut matched = false;
630
631            for right_entry in right.iter() {
632                // Create a combined entry for predicate evaluation
633                let combined = RelationEntry::combine(
634                    left_entry,
635                    &left_tables,
636                    right_entry,
637                    &right_tables,
638                );
639
640                if self.eval_predicate(&adjusted_condition, &combined) {
641                    matched = true;
642                    result_entries.push(combined);
643                }
644            }
645
646            if is_outer && !matched {
647                let combined = RelationEntry::combine_with_null(
648                    left_entry,
649                    &left_tables,
650                    right_col_count,
651                    &right_tables,
652                );
653                result_entries.push(combined);
654            }
655        }
656
657        let mut tables = left_tables;
658        tables.extend(right_tables);
659
660        // Compute combined table column counts
661        let mut table_column_counts = left_column_counts;
662        table_column_counts.extend(right_column_counts);
663
664        Ok(Relation {
665            entries: result_entries,
666            tables,
667            table_column_counts,
668        })
669    }
670
671    fn execute_index_nested_loop_join(
672        &self,
673        outer: Relation,
674        inner_table: &str,
675        inner_index: &str,
676        condition: &Expr,
677        join_type: crate::ast::JoinType,
678    ) -> ExecutionResult<Relation> {
679        let is_outer = matches!(
680            join_type,
681            crate::ast::JoinType::LeftOuter | crate::ast::JoinType::FullOuter
682        );
683
684        // Extract the outer key column index from condition
685        let outer_key_idx = self.extract_outer_key_index(condition, &outer)?;
686        let inner_col_count = self.data_source.get_column_count(inner_table)?;
687
688        let mut result_entries = Vec::new();
689        let outer_tables = outer.tables().to_vec();
690        let outer_column_counts = outer.table_column_counts().to_vec();
691        let inner_tables = alloc::vec![inner_table.into()];
692
693        for outer_entry in outer.iter() {
694            let key_value = outer_entry.get_field(outer_key_idx);
695
696            let inner_rows = if let Some(key) = key_value {
697                if !key.is_null() {
698                    self.data_source
699                        .get_index_point(inner_table, inner_index, key)?
700                } else {
701                    Vec::new()
702                }
703            } else {
704                Vec::new()
705            };
706
707            if inner_rows.is_empty() {
708                if is_outer {
709                    let combined = RelationEntry::combine_with_null(
710                        outer_entry,
711                        &outer_tables,
712                        inner_col_count,
713                        &inner_tables,
714                    );
715                    result_entries.push(combined);
716                }
717            } else {
718                for inner_row in inner_rows {
719                    let inner_entry = RelationEntry::from_row(inner_row, inner_table);
720                    let combined = RelationEntry::combine(
721                        outer_entry,
722                        &outer_tables,
723                        &inner_entry,
724                        &inner_tables,
725                    );
726                    result_entries.push(combined);
727                }
728            }
729        }
730
731        let mut tables = outer_tables;
732        tables.extend(inner_tables);
733
734        // Compute combined table column counts
735        let mut table_column_counts = outer_column_counts;
736        table_column_counts.push(inner_col_count);
737
738        Ok(Relation {
739            entries: result_entries,
740            tables,
741            table_column_counts,
742        })
743    }
744
745    fn execute_cross_product(&self, left: Relation, right: Relation) -> ExecutionResult<Relation> {
746        let mut result_entries = Vec::new();
747        let left_tables = left.tables().to_vec();
748        let right_tables = right.tables().to_vec();
749        let left_column_counts = left.table_column_counts().to_vec();
750        let right_column_counts = right.table_column_counts().to_vec();
751
752        for left_entry in left.iter() {
753            for right_entry in right.iter() {
754                let combined = RelationEntry::combine(
755                    left_entry,
756                    &left_tables,
757                    right_entry,
758                    &right_tables,
759                );
760                result_entries.push(combined);
761            }
762        }
763
764        let mut tables = left_tables;
765        tables.extend(right_tables);
766
767        // Compute combined table column counts
768        let mut table_column_counts = left_column_counts;
769        table_column_counts.extend(right_column_counts);
770
771        Ok(Relation {
772            entries: result_entries,
773            tables,
774            table_column_counts,
775        })
776    }
777
778    // ========== Aggregate Operation ==========
779
780    fn execute_hash_aggregate(
781        &self,
782        input: Relation,
783        group_by: &[Expr],
784        aggregates: &[(AggregateFunc, Expr)],
785    ) -> ExecutionResult<Relation> {
786        // Convert Expr group_by to column indices
787        let group_by_indices: Vec<usize> = group_by
788            .iter()
789            .filter_map(|expr| {
790                if let Expr::Column(col) = expr {
791                    Some(col.index)
792                } else {
793                    None
794                }
795            })
796            .collect();
797
798        // Convert aggregates to (func, Option<column_index>)
799        let agg_specs: Vec<(AggregateFunc, Option<usize>)> = aggregates
800            .iter()
801            .map(|(func, expr)| {
802                let col_idx = if let Expr::Column(col) = expr {
803                    Some(col.index)
804                } else {
805                    None
806                };
807                (*func, col_idx)
808            })
809            .collect();
810
811        let executor = AggregateExecutor::new(group_by_indices, agg_specs);
812        Ok(executor.execute(input))
813    }
814
815    // ========== Sort Operation ==========
816
817    fn execute_sort(
818        &self,
819        input: Relation,
820        order_by: &[(Expr, SortOrder)],
821    ) -> ExecutionResult<Relation> {
822        let tables = input.tables().to_vec();
823        let table_column_counts = input.table_column_counts().to_vec();
824        let ctx = EvalContext::new(&tables, &table_column_counts);
825
826        // Convert Expr order_by to column indices using dynamic computation
827        let order_by_indices: Vec<(usize, SortOrder)> = order_by
828            .iter()
829            .filter_map(|(expr, order)| {
830                if let Expr::Column(col) = expr {
831                    let actual_index = ctx.resolve_column_index(&col.table, col.index);
832                    Some((actual_index, *order))
833                } else {
834                    None
835                }
836            })
837            .collect();
838
839        let executor = SortExecutor::new(order_by_indices);
840        Ok(executor.execute(input))
841    }
842
843    // ========== Limit Operation ==========
844
845    fn execute_limit(
846        &self,
847        input: Relation,
848        limit: usize,
849        offset: usize,
850    ) -> ExecutionResult<Relation> {
851        let executor = LimitExecutor::new(limit, offset);
852        Ok(executor.execute(input))
853    }
854
855    // ========== Expression Evaluation ==========
856
857    /// Evaluates an expression against a relation entry.
858    /// If `ctx` is provided, column indices are dynamically computed based on table metadata.
859    /// This is needed for JOIN queries where the optimizer may have reordered tables.
860    fn eval_expr_ctx(&self, expr: &Expr, entry: &RelationEntry, ctx: Option<&EvalContext<'_>>) -> Value {
861        match expr {
862            Expr::Column(col) => {
863                let index = if let Some(c) = ctx {
864                    c.resolve_column_index(&col.table, col.index)
865                } else {
866                    col.index
867                };
868                entry.get_field(index).cloned().unwrap_or(Value::Null)
869            }
870
871            Expr::Literal(value) => value.clone(),
872
873            Expr::BinaryOp { left, op, right } => {
874                let left_val = self.eval_expr_ctx(left, entry, ctx);
875                let right_val = self.eval_expr_ctx(right, entry, ctx);
876                self.eval_binary_op(*op, &left_val, &right_val)
877            }
878
879            Expr::UnaryOp { op, expr } => {
880                let val = self.eval_expr_ctx(expr, entry, ctx);
881                self.eval_unary_op(*op, &val)
882            }
883
884            Expr::Aggregate { expr, .. } => {
885                if let Some(e) = expr {
886                    self.eval_expr_ctx(e, entry, ctx)
887                } else {
888                    Value::Int64(1) // COUNT(*)
889                }
890            }
891
892            Expr::Between { expr, low, high } => {
893                let val = self.eval_expr_ctx(expr, entry, ctx);
894                let low_val = self.eval_expr_ctx(low, entry, ctx);
895                let high_val = self.eval_expr_ctx(high, entry, ctx);
896                Value::Boolean(val >= low_val && val <= high_val)
897            }
898
899            Expr::NotBetween { expr, low, high } => {
900                let val = self.eval_expr_ctx(expr, entry, ctx);
901                let low_val = self.eval_expr_ctx(low, entry, ctx);
902                let high_val = self.eval_expr_ctx(high, entry, ctx);
903                Value::Boolean(val < low_val || val > high_val)
904            }
905
906            Expr::In { expr, list } => {
907                let val = self.eval_expr_ctx(expr, entry, ctx);
908                let in_list = list.iter().any(|item| self.eval_expr_ctx(item, entry, ctx) == val);
909                Value::Boolean(in_list)
910            }
911
912            Expr::NotIn { expr, list } => {
913                let val = self.eval_expr_ctx(expr, entry, ctx);
914                let in_list = list.iter().any(|item| self.eval_expr_ctx(item, entry, ctx) == val);
915                Value::Boolean(!in_list)
916            }
917
918            Expr::Like { expr, pattern } => {
919                let val = self.eval_expr_ctx(expr, entry, ctx);
920                if let Value::String(s) = val {
921                    Value::Boolean(self.match_like_pattern(&s, pattern))
922                } else {
923                    Value::Boolean(false)
924                }
925            }
926
927            Expr::NotLike { expr, pattern } => {
928                let val = self.eval_expr_ctx(expr, entry, ctx);
929                if let Value::String(s) = val {
930                    Value::Boolean(!self.match_like_pattern(&s, pattern))
931                } else {
932                    Value::Boolean(true)
933                }
934            }
935
936            Expr::Match { expr, pattern } => {
937                let val = self.eval_expr_ctx(expr, entry, ctx);
938                if let Value::String(s) = val {
939                    Value::Boolean(self.match_regex_pattern(&s, pattern))
940                } else {
941                    Value::Boolean(false)
942                }
943            }
944
945            Expr::NotMatch { expr, pattern } => {
946                let val = self.eval_expr_ctx(expr, entry, ctx);
947                if let Value::String(s) = val {
948                    Value::Boolean(!self.match_regex_pattern(&s, pattern))
949                } else {
950                    Value::Boolean(true)
951                }
952            }
953
954            Expr::Function { name, args } => {
955                let arg_values: Vec<Value> = args.iter().map(|a| self.eval_expr_ctx(a, entry, ctx)).collect();
956                self.eval_function(name, &arg_values)
957            }
958        }
959    }
960
961    /// Evaluates an expression against a relation entry (simple version without context).
962    #[inline]
963    fn eval_expr(&self, expr: &Expr, entry: &RelationEntry) -> Value {
964        self.eval_expr_ctx(expr, entry, None)
965    }
966
967    /// Evaluates a predicate expression against a relation entry.
968    #[inline]
969    fn eval_predicate(&self, expr: &Expr, entry: &RelationEntry) -> bool {
970        match self.eval_expr(expr, entry) {
971            Value::Boolean(b) => b,
972            Value::Null => false,
973            _ => false,
974        }
975    }
976
977    /// Evaluates a predicate expression with context for JOIN queries.
978    #[inline]
979    fn eval_predicate_ctx(&self, expr: &Expr, entry: &RelationEntry, ctx: &EvalContext<'_>) -> bool {
980        match self.eval_expr_ctx(expr, entry, Some(ctx)) {
981            Value::Boolean(b) => b,
982            Value::Null => false,
983            _ => false,
984        }
985    }
986
987    fn eval_binary_op(&self, op: BinaryOp, left: &Value, right: &Value) -> Value {
988        // Handle NULL propagation
989        if left.is_null() || right.is_null() {
990            return match op {
991                BinaryOp::And => {
992                    // NULL AND FALSE = FALSE, NULL AND TRUE = NULL
993                    if let Value::Boolean(false) = left {
994                        return Value::Boolean(false);
995                    }
996                    if let Value::Boolean(false) = right {
997                        return Value::Boolean(false);
998                    }
999                    Value::Null
1000                }
1001                BinaryOp::Or => {
1002                    // NULL OR TRUE = TRUE, NULL OR FALSE = NULL
1003                    if let Value::Boolean(true) = left {
1004                        return Value::Boolean(true);
1005                    }
1006                    if let Value::Boolean(true) = right {
1007                        return Value::Boolean(true);
1008                    }
1009                    Value::Null
1010                }
1011                _ => Value::Null,
1012            };
1013        }
1014
1015        match op {
1016            BinaryOp::Eq => Value::Boolean(left == right),
1017            BinaryOp::Ne => Value::Boolean(left != right),
1018            BinaryOp::Lt => Value::Boolean(left < right),
1019            BinaryOp::Le => Value::Boolean(left <= right),
1020            BinaryOp::Gt => Value::Boolean(left > right),
1021            BinaryOp::Ge => Value::Boolean(left >= right),
1022            BinaryOp::And => {
1023                let l = matches!(left, Value::Boolean(true));
1024                let r = matches!(right, Value::Boolean(true));
1025                Value::Boolean(l && r)
1026            }
1027            BinaryOp::Or => {
1028                let l = matches!(left, Value::Boolean(true));
1029                let r = matches!(right, Value::Boolean(true));
1030                Value::Boolean(l || r)
1031            }
1032            BinaryOp::Add => self.eval_arithmetic(left, right, |a, b| a + b),
1033            BinaryOp::Sub => self.eval_arithmetic(left, right, |a, b| a - b),
1034            BinaryOp::Mul => self.eval_arithmetic(left, right, |a, b| a * b),
1035            BinaryOp::Div => {
1036                // Check for division by zero
1037                match right {
1038                    Value::Int32(0) | Value::Int64(0) => Value::Null,
1039                    Value::Float64(f) if *f == 0.0 => Value::Null,
1040                    _ => self.eval_arithmetic(left, right, |a, b| if b != 0.0 { a / b } else { 0.0 }),
1041                }
1042            }
1043            BinaryOp::Mod => {
1044                match (left, right) {
1045                    (Value::Int64(a), Value::Int64(b)) if *b != 0 => Value::Int64(a % b),
1046                    (Value::Int32(a), Value::Int32(b)) if *b != 0 => Value::Int32(a % b),
1047                    _ => Value::Null,
1048                }
1049            }
1050            BinaryOp::Like | BinaryOp::In | BinaryOp::Between => {
1051                // These are handled specially in eval_expr
1052                Value::Null
1053            }
1054        }
1055    }
1056
1057    fn eval_arithmetic<F>(&self, left: &Value, right: &Value, op: F) -> Value
1058    where
1059        F: Fn(f64, f64) -> f64,
1060    {
1061        let l = match left {
1062            Value::Int32(i) => *i as f64,
1063            Value::Int64(i) => *i as f64,
1064            Value::Float64(f) => *f,
1065            _ => return Value::Null,
1066        };
1067        let r = match right {
1068            Value::Int32(i) => *i as f64,
1069            Value::Int64(i) => *i as f64,
1070            Value::Float64(f) => *f,
1071            _ => return Value::Null,
1072        };
1073
1074        let result = op(l, r);
1075
1076        // Preserve integer type if both inputs are integers
1077        match (left, right) {
1078            (Value::Int64(_), Value::Int64(_)) => Value::Int64(result as i64),
1079            (Value::Int32(_), Value::Int32(_)) => Value::Int32(result as i32),
1080            _ => Value::Float64(result),
1081        }
1082    }
1083
1084    fn eval_unary_op(&self, op: UnaryOp, value: &Value) -> Value {
1085        match op {
1086            UnaryOp::Not => match value {
1087                Value::Boolean(b) => Value::Boolean(!b),
1088                Value::Null => Value::Null,
1089                _ => Value::Null,
1090            },
1091            UnaryOp::Neg => match value {
1092                Value::Int32(i) => Value::Int32(-i),
1093                Value::Int64(i) => Value::Int64(-i),
1094                Value::Float64(f) => Value::Float64(-f),
1095                _ => Value::Null,
1096            },
1097            UnaryOp::IsNull => Value::Boolean(value.is_null()),
1098            UnaryOp::IsNotNull => Value::Boolean(!value.is_null()),
1099        }
1100    }
1101
1102    fn eval_function(&self, name: &str, args: &[Value]) -> Value {
1103        match name.to_uppercase().as_str() {
1104            "ABS" => {
1105                if let Some(v) = args.first() {
1106                    match v {
1107                        Value::Int32(i) => Value::Int32(i.abs()),
1108                        Value::Int64(i) => Value::Int64(i.abs()),
1109                        Value::Float64(f) => Value::Float64(f.abs()),
1110                        _ => Value::Null,
1111                    }
1112                } else {
1113                    Value::Null
1114                }
1115            }
1116            "UPPER" => {
1117                if let Some(Value::String(s)) = args.first() {
1118                    Value::String(s.to_uppercase().into())
1119                } else {
1120                    Value::Null
1121                }
1122            }
1123            "LOWER" => {
1124                if let Some(Value::String(s)) = args.first() {
1125                    Value::String(s.to_lowercase().into())
1126                } else {
1127                    Value::Null
1128                }
1129            }
1130            "LENGTH" => {
1131                if let Some(Value::String(s)) = args.first() {
1132                    Value::Int64(s.len() as i64)
1133                } else {
1134                    Value::Null
1135                }
1136            }
1137            "COALESCE" => {
1138                for arg in args {
1139                    if !arg.is_null() {
1140                        return arg.clone();
1141                    }
1142                }
1143                Value::Null
1144            }
1145            // JSONB path equality: jsonb_path_eq(jsonb_value, path, expected_value)
1146            "JSONB_PATH_EQ" => {
1147                if args.len() >= 3 {
1148                    if let (Value::Jsonb(jsonb), Value::String(path)) = (&args[0], &args[1]) {
1149                        let expected = &args[2];
1150                        return self.jsonb_path_eq(jsonb, path, expected);
1151                    }
1152                }
1153                Value::Boolean(false)
1154            }
1155            // JSONB contains: jsonb_contains(jsonb_value, path)
1156            "JSONB_CONTAINS" => {
1157                if args.len() >= 2 {
1158                    if let (Value::Jsonb(jsonb), Value::String(path)) = (&args[0], &args[1]) {
1159                        return self.jsonb_path_exists(jsonb, path);
1160                    }
1161                }
1162                Value::Boolean(false)
1163            }
1164            // JSONB exists: jsonb_exists(jsonb_value, path)
1165            "JSONB_EXISTS" => {
1166                if args.len() >= 2 {
1167                    if let (Value::Jsonb(jsonb), Value::String(path)) = (&args[0], &args[1]) {
1168                        return self.jsonb_path_exists(jsonb, path);
1169                    }
1170                }
1171                Value::Boolean(false)
1172            }
1173            _ => Value::Null,
1174        }
1175    }
1176
1177    fn match_like_pattern(&self, value: &str, pattern: &str) -> bool {
1178        // Simple LIKE pattern matching with % and _ wildcards
1179        let pattern_chars: Vec<char> = pattern.chars().collect();
1180        let value_chars: Vec<char> = value.chars().collect();
1181        self.match_like_recursive(&value_chars, &pattern_chars, 0, 0)
1182    }
1183
1184    /// Matches a regex pattern against a string value.
1185    /// Supports a subset of regex: . * + ? ^ $ [] [^] | () \d \w \s
1186    fn match_regex_pattern(&self, value: &str, pattern: &str) -> bool {
1187        let compiled = match self.compile_regex(pattern) {
1188            Some(c) => c,
1189            None => return false,
1190        };
1191        self.regex_match_compiled(value, &compiled)
1192    }
1193
1194    /// Compiled regex instruction
1195    fn compile_regex(&self, pattern: &str) -> Option<Vec<RegexOp>> {
1196        let mut ops = Vec::new();
1197        let mut chars = pattern.chars().peekable();
1198        let mut in_group = false;
1199
1200        while let Some(c) = chars.next() {
1201            match c {
1202                '^' if ops.is_empty() => ops.push(RegexOp::Start),
1203                '$' if chars.peek().is_none() => ops.push(RegexOp::End),
1204                '.' => self.apply_quantifier(&mut chars, &mut ops, RegexOp::Any),
1205                '*' | '+' | '?' => return None, // Invalid: quantifier without preceding element
1206                '\\' => {
1207                    let escaped = chars.next()?;
1208                    let op = match escaped {
1209                        'd' => RegexOp::Digit,
1210                        'D' => RegexOp::NonDigit,
1211                        'w' => RegexOp::Word,
1212                        'W' => RegexOp::NonWord,
1213                        's' => RegexOp::Whitespace,
1214                        'S' => RegexOp::NonWhitespace,
1215                        'n' => RegexOp::Char('\n'),
1216                        't' => RegexOp::Char('\t'),
1217                        'r' => RegexOp::Char('\r'),
1218                        _ => RegexOp::Char(escaped),
1219                    };
1220                    self.apply_quantifier(&mut chars, &mut ops, op);
1221                }
1222                '[' => {
1223                    let (class_op, negated) = self.parse_char_class(&mut chars)?;
1224                    let op = if negated {
1225                        RegexOp::NegCharClass(class_op)
1226                    } else {
1227                        RegexOp::CharClass(class_op)
1228                    };
1229                    self.apply_quantifier(&mut chars, &mut ops, op);
1230                }
1231                '(' => {
1232                    in_group = true;
1233                    ops.push(RegexOp::GroupStart);
1234                }
1235                ')' => {
1236                    if !in_group {
1237                        return None;
1238                    }
1239                    in_group = false;
1240                    ops.push(RegexOp::GroupEnd);
1241                    // Check for quantifier after group
1242                    if let Some(&q) = chars.peek() {
1243                        match q {
1244                            '*' => {
1245                                chars.next();
1246                                ops.push(RegexOp::GroupStar);
1247                            }
1248                            '+' => {
1249                                chars.next();
1250                                ops.push(RegexOp::GroupPlus);
1251                            }
1252                            '?' => {
1253                                chars.next();
1254                                ops.push(RegexOp::GroupQuestion);
1255                            }
1256                            _ => {}
1257                        }
1258                    }
1259                }
1260                '|' => ops.push(RegexOp::Alternation),
1261                _ => self.apply_quantifier(&mut chars, &mut ops, RegexOp::Char(c)),
1262            }
1263        }
1264
1265        if in_group {
1266            return None; // Unclosed group
1267        }
1268
1269        Some(ops)
1270    }
1271
1272    fn apply_quantifier(
1273        &self,
1274        chars: &mut core::iter::Peekable<core::str::Chars>,
1275        ops: &mut Vec<RegexOp>,
1276        base_op: RegexOp,
1277    ) {
1278        if let Some(&q) = chars.peek() {
1279            match q {
1280                '*' => {
1281                    chars.next();
1282                    ops.push(RegexOp::Star(Box::new(base_op)));
1283                }
1284                '+' => {
1285                    chars.next();
1286                    ops.push(RegexOp::Plus(Box::new(base_op)));
1287                }
1288                '?' => {
1289                    chars.next();
1290                    ops.push(RegexOp::Question(Box::new(base_op)));
1291                }
1292                _ => ops.push(base_op),
1293            }
1294        } else {
1295            ops.push(base_op);
1296        }
1297    }
1298
1299    fn parse_char_class(
1300        &self,
1301        chars: &mut core::iter::Peekable<core::str::Chars>,
1302    ) -> Option<(Vec<CharClassItem>, bool)> {
1303        let mut items = Vec::new();
1304        let negated = chars.peek() == Some(&'^');
1305        if negated {
1306            chars.next();
1307        }
1308
1309        while let Some(c) = chars.next() {
1310            if c == ']' {
1311                return Some((items, negated));
1312            }
1313            if c == '\\' {
1314                let escaped = chars.next()?;
1315                items.push(CharClassItem::Char(match escaped {
1316                    'n' => '\n',
1317                    't' => '\t',
1318                    'r' => '\r',
1319                    'd' => {
1320                        items.push(CharClassItem::Range('0', '9'));
1321                        continue;
1322                    }
1323                    'w' => {
1324                        items.push(CharClassItem::Range('a', 'z'));
1325                        items.push(CharClassItem::Range('A', 'Z'));
1326                        items.push(CharClassItem::Range('0', '9'));
1327                        items.push(CharClassItem::Char('_'));
1328                        continue;
1329                    }
1330                    's' => {
1331                        items.push(CharClassItem::Char(' '));
1332                        items.push(CharClassItem::Char('\t'));
1333                        items.push(CharClassItem::Char('\n'));
1334                        items.push(CharClassItem::Char('\r'));
1335                        continue;
1336                    }
1337                    _ => escaped,
1338                }));
1339            } else if chars.peek() == Some(&'-') {
1340                chars.next(); // consume '-'
1341                if let Some(end) = chars.next() {
1342                    if end == ']' {
1343                        items.push(CharClassItem::Char(c));
1344                        items.push(CharClassItem::Char('-'));
1345                        return Some((items, negated));
1346                    }
1347                    items.push(CharClassItem::Range(c, end));
1348                } else {
1349                    items.push(CharClassItem::Char(c));
1350                    items.push(CharClassItem::Char('-'));
1351                }
1352            } else {
1353                items.push(CharClassItem::Char(c));
1354            }
1355        }
1356        None // Unclosed character class
1357    }
1358
1359    fn regex_match_compiled(&self, value: &str, ops: &[RegexOp]) -> bool {
1360        let chars: Vec<char> = value.chars().collect();
1361
1362        // Handle alternation by splitting into alternatives
1363        let alternatives = self.split_alternatives(ops);
1364        if alternatives.len() > 1 {
1365            return alternatives.iter().any(|alt| self.regex_match_ops(&chars, alt, 0, 0));
1366        }
1367
1368        // Check if pattern requires start anchor
1369        let has_start = ops.first() == Some(&RegexOp::Start);
1370        let ops_to_match = if has_start { &ops[1..] } else { ops };
1371
1372        if has_start {
1373            self.regex_match_ops(&chars, ops_to_match, 0, 0)
1374        } else {
1375            // Try matching at each position
1376            for start in 0..=chars.len() {
1377                if self.regex_match_ops(&chars, ops_to_match, start, 0) {
1378                    return true;
1379                }
1380            }
1381            false
1382        }
1383    }
1384
1385    fn split_alternatives(&self, ops: &[RegexOp]) -> Vec<Vec<RegexOp>> {
1386        let mut alternatives = Vec::new();
1387        let mut current = Vec::new();
1388        let mut depth = 0;
1389
1390        for op in ops {
1391            match op {
1392                RegexOp::GroupStart => {
1393                    depth += 1;
1394                    current.push(op.clone());
1395                }
1396                RegexOp::GroupEnd => {
1397                    depth -= 1;
1398                    current.push(op.clone());
1399                }
1400                RegexOp::Alternation if depth == 0 => {
1401                    alternatives.push(core::mem::take(&mut current));
1402                }
1403                _ => current.push(op.clone()),
1404            }
1405        }
1406        alternatives.push(current);
1407        alternatives
1408    }
1409
1410    fn regex_match_ops(&self, chars: &[char], ops: &[RegexOp], pos: usize, op_idx: usize) -> bool {
1411        if op_idx >= ops.len() {
1412            return true; // All ops matched
1413        }
1414
1415        let op = &ops[op_idx];
1416        match op {
1417            RegexOp::End => pos == chars.len(),
1418            RegexOp::Start => self.regex_match_ops(chars, ops, pos, op_idx + 1),
1419            RegexOp::Char(c) => {
1420                pos < chars.len()
1421                    && chars[pos] == *c
1422                    && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1423            }
1424            RegexOp::Any => {
1425                pos < chars.len() && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1426            }
1427            RegexOp::Digit => {
1428                pos < chars.len()
1429                    && chars[pos].is_ascii_digit()
1430                    && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1431            }
1432            RegexOp::NonDigit => {
1433                pos < chars.len()
1434                    && !chars[pos].is_ascii_digit()
1435                    && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1436            }
1437            RegexOp::Word => {
1438                pos < chars.len()
1439                    && (chars[pos].is_ascii_alphanumeric() || chars[pos] == '_')
1440                    && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1441            }
1442            RegexOp::NonWord => {
1443                pos < chars.len()
1444                    && !(chars[pos].is_ascii_alphanumeric() || chars[pos] == '_')
1445                    && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1446            }
1447            RegexOp::Whitespace => {
1448                pos < chars.len()
1449                    && chars[pos].is_ascii_whitespace()
1450                    && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1451            }
1452            RegexOp::NonWhitespace => {
1453                pos < chars.len()
1454                    && !chars[pos].is_ascii_whitespace()
1455                    && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1456            }
1457            RegexOp::CharClass(items) => {
1458                pos < chars.len()
1459                    && self.char_matches_class(chars[pos], items)
1460                    && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1461            }
1462            RegexOp::NegCharClass(items) => {
1463                pos < chars.len()
1464                    && !self.char_matches_class(chars[pos], items)
1465                    && self.regex_match_ops(chars, ops, pos + 1, op_idx + 1)
1466            }
1467            RegexOp::Star(inner) => {
1468                // Try matching zero or more times (greedy)
1469                let mut p = pos;
1470                let mut positions = vec![p];
1471                while p < chars.len() && self.single_op_matches(chars[p], inner) {
1472                    p += 1;
1473                    positions.push(p);
1474                }
1475                // Try from longest match to shortest
1476                for &try_pos in positions.iter().rev() {
1477                    if self.regex_match_ops(chars, ops, try_pos, op_idx + 1) {
1478                        return true;
1479                    }
1480                }
1481                false
1482            }
1483            RegexOp::Plus(inner) => {
1484                // Must match at least once
1485                if pos >= chars.len() || !self.single_op_matches(chars[pos], inner) {
1486                    return false;
1487                }
1488                let mut p = pos + 1;
1489                let mut positions = vec![p];
1490                while p < chars.len() && self.single_op_matches(chars[p], inner) {
1491                    p += 1;
1492                    positions.push(p);
1493                }
1494                for &try_pos in positions.iter().rev() {
1495                    if self.regex_match_ops(chars, ops, try_pos, op_idx + 1) {
1496                        return true;
1497                    }
1498                }
1499                false
1500            }
1501            RegexOp::Question(inner) => {
1502                // Try matching once, then zero times
1503                if pos < chars.len() && self.single_op_matches(chars[pos], inner) {
1504                    if self.regex_match_ops(chars, ops, pos + 1, op_idx + 1) {
1505                        return true;
1506                    }
1507                }
1508                self.regex_match_ops(chars, ops, pos, op_idx + 1)
1509            }
1510            RegexOp::GroupStart | RegexOp::GroupEnd => {
1511                self.regex_match_ops(chars, ops, pos, op_idx + 1)
1512            }
1513            RegexOp::GroupStar | RegexOp::GroupPlus | RegexOp::GroupQuestion => {
1514                // Group quantifiers are handled during compilation
1515                self.regex_match_ops(chars, ops, pos, op_idx + 1)
1516            }
1517            RegexOp::Alternation => {
1518                // Should be handled by split_alternatives
1519                self.regex_match_ops(chars, ops, pos, op_idx + 1)
1520            }
1521        }
1522    }
1523
1524    fn single_op_matches(&self, c: char, op: &RegexOp) -> bool {
1525        match op {
1526            RegexOp::Char(expected) => c == *expected,
1527            RegexOp::Any => true,
1528            RegexOp::Digit => c.is_ascii_digit(),
1529            RegexOp::NonDigit => !c.is_ascii_digit(),
1530            RegexOp::Word => c.is_ascii_alphanumeric() || c == '_',
1531            RegexOp::NonWord => !(c.is_ascii_alphanumeric() || c == '_'),
1532            RegexOp::Whitespace => c.is_ascii_whitespace(),
1533            RegexOp::NonWhitespace => !c.is_ascii_whitespace(),
1534            RegexOp::CharClass(items) => self.char_matches_class(c, items),
1535            RegexOp::NegCharClass(items) => !self.char_matches_class(c, items),
1536            _ => false,
1537        }
1538    }
1539
1540    fn char_matches_class(&self, c: char, items: &[CharClassItem]) -> bool {
1541        items.iter().any(|item| match item {
1542            CharClassItem::Char(ch) => c == *ch,
1543            CharClassItem::Range(start, end) => c >= *start && c <= *end,
1544        })
1545    }
1546
1547    fn match_like_recursive(
1548        &self,
1549        value: &[char],
1550        pattern: &[char],
1551        vi: usize,
1552        pi: usize,
1553    ) -> bool {
1554        if pi >= pattern.len() {
1555            return vi >= value.len();
1556        }
1557
1558        match pattern[pi] {
1559            '%' => {
1560                // % matches zero or more characters
1561                for i in vi..=value.len() {
1562                    if self.match_like_recursive(value, pattern, i, pi + 1) {
1563                        return true;
1564                    }
1565                }
1566                false
1567            }
1568            '_' => {
1569                // _ matches exactly one character
1570                if vi < value.len() {
1571                    self.match_like_recursive(value, pattern, vi + 1, pi + 1)
1572                } else {
1573                    false
1574                }
1575            }
1576            c => {
1577                // Literal character match
1578                if vi < value.len() && value[vi] == c {
1579                    self.match_like_recursive(value, pattern, vi + 1, pi + 1)
1580                } else {
1581                    false
1582                }
1583            }
1584        }
1585    }
1586
1587    // ========== JSONB Helper Methods ==========
1588
1589    /// Parses JSON string bytes to JsonbValue.
1590    fn parse_json_bytes(&self, bytes: &[u8]) -> Option<JsonbValue> {
1591        let json_str = core::str::from_utf8(bytes).ok()?;
1592        self.parse_json_str(json_str)
1593    }
1594
1595    /// Parses a JSON string to JsonbValue.
1596    fn parse_json_str(&self, s: &str) -> Option<JsonbValue> {
1597        // Simple JSON parser for common types
1598        let s = s.trim();
1599        if s == "null" {
1600            return Some(JsonbValue::Null);
1601        }
1602        if s == "true" {
1603            return Some(JsonbValue::Bool(true));
1604        }
1605        if s == "false" {
1606            return Some(JsonbValue::Bool(false));
1607        }
1608        // Try parsing as number
1609        if let Ok(n) = s.parse::<f64>() {
1610            return Some(JsonbValue::Number(n));
1611        }
1612        // Try parsing as string (quoted)
1613        if s.starts_with('"') && s.ends_with('"') && s.len() >= 2 {
1614            let inner = &s[1..s.len() - 1];
1615            // Handle escape sequences
1616            let unescaped = self.unescape_json_string(inner);
1617            return Some(JsonbValue::String(unescaped));
1618        }
1619        // Try parsing as object
1620        if s.starts_with('{') && s.ends_with('}') {
1621            return self.parse_json_object(s);
1622        }
1623        // Try parsing as array
1624        if s.starts_with('[') && s.ends_with(']') {
1625            return self.parse_json_array(s);
1626        }
1627        None
1628    }
1629
1630    /// Unescapes a JSON string.
1631    fn unescape_json_string(&self, s: &str) -> String {
1632        let mut result = String::new();
1633        let mut chars = s.chars().peekable();
1634        while let Some(c) = chars.next() {
1635            if c == '\\' {
1636                if let Some(&next) = chars.peek() {
1637                    match next {
1638                        '"' | '\\' | '/' => {
1639                            result.push(next);
1640                            chars.next();
1641                        }
1642                        'n' => {
1643                            result.push('\n');
1644                            chars.next();
1645                        }
1646                        'r' => {
1647                            result.push('\r');
1648                            chars.next();
1649                        }
1650                        't' => {
1651                            result.push('\t');
1652                            chars.next();
1653                        }
1654                        _ => result.push(c),
1655                    }
1656                } else {
1657                    result.push(c);
1658                }
1659            } else {
1660                result.push(c);
1661            }
1662        }
1663        result
1664    }
1665
1666    /// Parses a JSON object string.
1667    fn parse_json_object(&self, s: &str) -> Option<JsonbValue> {
1668        let inner = s[1..s.len() - 1].trim();
1669        if inner.is_empty() {
1670            return Some(JsonbValue::Object(JsonbObject::new()));
1671        }
1672
1673        let mut obj = JsonbObject::new();
1674        let mut depth = 0;
1675        let mut in_string = false;
1676        let mut escape = false;
1677        let mut start = 0;
1678
1679        for (i, c) in inner.char_indices() {
1680            if escape {
1681                escape = false;
1682                continue;
1683            }
1684            match c {
1685                '\\' if in_string => escape = true,
1686                '"' => in_string = !in_string,
1687                '{' | '[' if !in_string => depth += 1,
1688                '}' | ']' if !in_string => depth -= 1,
1689                ',' if !in_string && depth == 0 => {
1690                    if let Some((k, v)) = self.parse_json_kv(&inner[start..i]) {
1691                        obj.insert(k, v);
1692                    }
1693                    start = i + 1;
1694                }
1695                _ => {}
1696            }
1697        }
1698        // Parse last entry
1699        if start < inner.len() {
1700            if let Some((k, v)) = self.parse_json_kv(&inner[start..]) {
1701                obj.insert(k, v);
1702            }
1703        }
1704
1705        Some(JsonbValue::Object(obj))
1706    }
1707
1708    /// Parses a JSON key-value pair.
1709    fn parse_json_kv(&self, s: &str) -> Option<(String, JsonbValue)> {
1710        let s = s.trim();
1711        let colon_pos = s.find(':')?;
1712        let key_part = s[..colon_pos].trim();
1713        let value_part = s[colon_pos + 1..].trim();
1714
1715        // Key should be a quoted string
1716        if key_part.starts_with('"') && key_part.ends_with('"') && key_part.len() >= 2 {
1717            let key = self.unescape_json_string(&key_part[1..key_part.len() - 1]);
1718            let value = self.parse_json_str(value_part)?;
1719            Some((key, value))
1720        } else {
1721            None
1722        }
1723    }
1724
1725    /// Parses a JSON array string.
1726    fn parse_json_array(&self, s: &str) -> Option<JsonbValue> {
1727        let inner = s[1..s.len() - 1].trim();
1728        if inner.is_empty() {
1729            return Some(JsonbValue::Array(Vec::new()));
1730        }
1731
1732        let mut items = Vec::new();
1733        let mut depth = 0;
1734        let mut in_string = false;
1735        let mut escape = false;
1736        let mut start = 0;
1737
1738        for (i, c) in inner.char_indices() {
1739            if escape {
1740                escape = false;
1741                continue;
1742            }
1743            match c {
1744                '\\' if in_string => escape = true,
1745                '"' => in_string = !in_string,
1746                '{' | '[' if !in_string => depth += 1,
1747                '}' | ']' if !in_string => depth -= 1,
1748                ',' if !in_string && depth == 0 => {
1749                    if let Some(v) = self.parse_json_str(inner[start..i].trim()) {
1750                        items.push(v);
1751                    }
1752                    start = i + 1;
1753                }
1754                _ => {}
1755            }
1756        }
1757        // Parse last item
1758        if start < inner.len() {
1759            if let Some(v) = self.parse_json_str(inner[start..].trim()) {
1760                items.push(v);
1761            }
1762        }
1763
1764        Some(JsonbValue::Array(items))
1765    }
1766
1767    /// Evaluates a JSONB path equality expression.
1768    fn jsonb_path_eq(&self, jsonb: &cynos_core::JsonbValue, path: &str, expected: &Value) -> Value {
1769        // Parse the JSON string bytes to JsonbValue
1770        let json_value = match self.parse_json_bytes(&jsonb.0) {
1771            Some(v) => v,
1772            None => return Value::Boolean(false),
1773        };
1774
1775        // Parse the JSONPath
1776        let json_path = match JsonPath::parse(path) {
1777            Ok(p) => p,
1778            Err(_) => return Value::Boolean(false),
1779        };
1780
1781        // Query the path
1782        let results = json_value.query(&json_path);
1783        if results.is_empty() {
1784            return Value::Boolean(false);
1785        }
1786
1787        // Compare the first result with the expected value
1788        let actual = results[0];
1789        Value::Boolean(self.compare_jsonb_value(actual, expected))
1790    }
1791
1792    /// Checks if a JSONB path exists.
1793    fn jsonb_path_exists(&self, jsonb: &cynos_core::JsonbValue, path: &str) -> Value {
1794        // Parse the JSON string bytes to JsonbValue
1795        let json_value = match self.parse_json_bytes(&jsonb.0) {
1796            Some(v) => v,
1797            None => return Value::Boolean(false),
1798        };
1799
1800        // Parse the JSONPath
1801        let json_path = match JsonPath::parse(path) {
1802            Ok(p) => p,
1803            Err(_) => return Value::Boolean(false),
1804        };
1805
1806        // Query the path
1807        let results = json_value.query(&json_path);
1808        Value::Boolean(!results.is_empty())
1809    }
1810
1811    /// Compares a JsonbValue with a Value.
1812    fn compare_jsonb_value(&self, jsonb: &JsonbValue, value: &Value) -> bool {
1813        match (jsonb, value) {
1814            (JsonbValue::Null, Value::Null) => true,
1815            (JsonbValue::Bool(a), Value::Boolean(b)) => a == b,
1816            (JsonbValue::Number(a), Value::Int32(b)) => (*a - *b as f64).abs() < f64::EPSILON,
1817            (JsonbValue::Number(a), Value::Int64(b)) => (*a - *b as f64).abs() < f64::EPSILON,
1818            (JsonbValue::Number(a), Value::Float64(b)) => (*a - *b).abs() < f64::EPSILON,
1819            (JsonbValue::String(a), Value::String(b)) => a == b,
1820            _ => false,
1821        }
1822    }
1823
1824    // ========== Helper Methods ==========
1825
1826    /// Extracts join key column indices from a join condition.
1827    fn extract_join_keys(
1828        &self,
1829        condition: &Expr,
1830        left: &Relation,
1831        right: &Relation,
1832    ) -> ExecutionResult<(usize, usize)> {
1833        if let Expr::BinaryOp {
1834            left: left_expr,
1835            op: BinaryOp::Eq,
1836            right: right_expr,
1837        } = condition
1838        {
1839            let left_col = self.extract_column_ref(left_expr)?;
1840            let right_col = self.extract_column_ref(right_expr)?;
1841
1842            // Determine which column belongs to which relation
1843            let left_tables = left.tables();
1844            let right_tables = right.tables();
1845            let left_ctx = EvalContext::new(left_tables, left.table_column_counts());
1846            let right_ctx = EvalContext::new(right_tables, right.table_column_counts());
1847
1848            if left_tables.contains(&left_col.table) && right_tables.contains(&right_col.table) {
1849                // left_col belongs to left relation, right_col belongs to right relation
1850                let left_idx = left_ctx.resolve_column_index(&left_col.table, left_col.index);
1851                let right_idx = right_ctx.resolve_column_index(&right_col.table, right_col.index);
1852                Ok((left_idx, right_idx))
1853            } else if left_tables.contains(&right_col.table)
1854                && right_tables.contains(&left_col.table)
1855            {
1856                // right_col belongs to left relation, left_col belongs to right relation
1857                let left_idx = left_ctx.resolve_column_index(&right_col.table, right_col.index);
1858                let right_idx = right_ctx.resolve_column_index(&left_col.table, left_col.index);
1859                Ok((left_idx, right_idx))
1860            } else {
1861                Err(ExecutionError::InvalidOperation(
1862                    "Join columns do not match relation tables".into(),
1863                ))
1864            }
1865        } else {
1866            Err(ExecutionError::InvalidOperation(
1867                "Expected equi-join condition".into(),
1868            ))
1869        }
1870    }
1871
1872    fn extract_outer_key_index(
1873        &self,
1874        condition: &Expr,
1875        outer: &Relation,
1876    ) -> ExecutionResult<usize> {
1877        if let Expr::BinaryOp {
1878            left: left_expr,
1879            op: BinaryOp::Eq,
1880            right: right_expr,
1881        } = condition
1882        {
1883            let left_col = self.extract_column_ref(left_expr)?;
1884            let right_col = self.extract_column_ref(right_expr)?;
1885
1886            let outer_tables = outer.tables();
1887
1888            if outer_tables.contains(&left_col.table) {
1889                Ok(left_col.index)
1890            } else if outer_tables.contains(&right_col.table) {
1891                Ok(right_col.index)
1892            } else {
1893                Err(ExecutionError::InvalidOperation(
1894                    "Outer key column not found in outer relation".into(),
1895                ))
1896            }
1897        } else {
1898            Err(ExecutionError::InvalidOperation(
1899                "Expected equi-join condition".into(),
1900            ))
1901        }
1902    }
1903
1904    fn extract_column_ref<'b>(&self, expr: &'b Expr) -> ExecutionResult<&'b ColumnRef> {
1905        if let Expr::Column(col) = expr {
1906            Ok(col)
1907        } else {
1908            Err(ExecutionError::InvalidOperation(
1909                "Expected column reference".into(),
1910            ))
1911        }
1912    }
1913
1914    /// Adjusts column indices in a join condition for nested loop join evaluation.
1915    /// Right table columns need to be offset by the left table's column count
1916    /// since the combined row has left columns first, then right columns.
1917    fn adjust_join_condition_indices(
1918        &self,
1919        condition: &Expr,
1920        left_tables: &[String],
1921        right_tables: &[String],
1922        left_col_count: usize,
1923    ) -> Expr {
1924        match condition {
1925            Expr::Column(col) => {
1926                // Check if this column belongs to the right table
1927                if right_tables.contains(&col.table) {
1928                    // Adjust index by adding left table column count
1929                    Expr::Column(ColumnRef {
1930                        table: col.table.clone(),
1931                        column: col.column.clone(),
1932                        index: col.index + left_col_count,
1933                    })
1934                } else {
1935                    // Left table column, keep as is
1936                    condition.clone()
1937                }
1938            }
1939            Expr::BinaryOp { left, op, right } => {
1940                let adjusted_left = self.adjust_join_condition_indices(left, left_tables, right_tables, left_col_count);
1941                let adjusted_right = self.adjust_join_condition_indices(right, left_tables, right_tables, left_col_count);
1942                Expr::BinaryOp {
1943                    left: Box::new(adjusted_left),
1944                    op: *op,
1945                    right: Box::new(adjusted_right),
1946                }
1947            }
1948            Expr::UnaryOp { op, expr } => {
1949                let adjusted_expr = self.adjust_join_condition_indices(expr, left_tables, right_tables, left_col_count);
1950                Expr::UnaryOp {
1951                    op: *op,
1952                    expr: Box::new(adjusted_expr),
1953                }
1954            }
1955            // For other expression types, return as is
1956            _ => condition.clone(),
1957        }
1958    }
1959}
1960
1961/// A simple in-memory data source for testing and simple use cases.
1962#[derive(Default)]
1963pub struct InMemoryDataSource {
1964    tables: BTreeMap<String, TableData>,
1965}
1966
1967/// Data for a single table.
1968#[derive(Default)]
1969struct TableData {
1970    rows: Vec<Rc<Row>>,
1971    indexes: BTreeMap<String, IndexData>,
1972    column_count: usize,
1973}
1974
1975/// Data for a single index.
1976struct IndexData {
1977    /// Maps key values to row indices.
1978    key_to_rows: BTreeMap<Value, Vec<usize>>,
1979}
1980
1981impl InMemoryDataSource {
1982    /// Creates a new empty in-memory data source.
1983    pub fn new() -> Self {
1984        Self::default()
1985    }
1986
1987    /// Adds a table with the given rows.
1988    pub fn add_table(&mut self, name: impl Into<String>, rows: Vec<Row>, column_count: usize) {
1989        self.tables.insert(
1990            name.into(),
1991            TableData {
1992                rows: rows.into_iter().map(Rc::new).collect(),
1993                indexes: BTreeMap::new(),
1994                column_count,
1995            },
1996        );
1997    }
1998
1999    /// Creates an index on a table column.
2000    pub fn create_index(
2001        &mut self,
2002        table: &str,
2003        index_name: impl Into<String>,
2004        column_index: usize,
2005    ) -> ExecutionResult<()> {
2006        let table_data = self
2007            .tables
2008            .get_mut(table)
2009            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
2010
2011        let mut key_to_rows: BTreeMap<Value, Vec<usize>> = BTreeMap::new();
2012
2013        for (row_idx, row) in table_data.rows.iter().enumerate() {
2014            if let Some(key) = row.get(column_index) {
2015                key_to_rows.entry(key.clone()).or_default().push(row_idx);
2016            }
2017        }
2018
2019        table_data
2020            .indexes
2021            .insert(index_name.into(), IndexData { key_to_rows });
2022
2023        Ok(())
2024    }
2025}
2026
2027impl DataSource for InMemoryDataSource {
2028    fn get_table_rows(&self, table: &str) -> ExecutionResult<Vec<Rc<Row>>> {
2029        self.tables
2030            .get(table)
2031            .map(|t| t.rows.clone())
2032            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))
2033    }
2034
2035    fn get_index_range(
2036        &self,
2037        table: &str,
2038        index: &str,
2039        range_start: Option<&Value>,
2040        range_end: Option<&Value>,
2041        include_start: bool,
2042        include_end: bool,
2043    ) -> ExecutionResult<Vec<Rc<Row>>> {
2044        let table_data = self
2045            .tables
2046            .get(table)
2047            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
2048
2049        let index_data = table_data.indexes.get(index).ok_or_else(|| {
2050            ExecutionError::IndexNotFound {
2051                table: table.into(),
2052                index: index.into(),
2053            }
2054        })?;
2055
2056        let mut result = Vec::new();
2057
2058        for (key, row_indices) in &index_data.key_to_rows {
2059            let in_range = match (range_start, range_end) {
2060                (Some(start), Some(end)) => {
2061                    let start_ok = if include_start {
2062                        key >= start
2063                    } else {
2064                        key > start
2065                    };
2066                    let end_ok = if include_end { key <= end } else { key < end };
2067                    start_ok && end_ok
2068                }
2069                (Some(start), None) => {
2070                    if include_start {
2071                        key >= start
2072                    } else {
2073                        key > start
2074                    }
2075                }
2076                (None, Some(end)) => {
2077                    if include_end {
2078                        key <= end
2079                    } else {
2080                        key < end
2081                    }
2082                }
2083                (None, None) => true,
2084            };
2085
2086            if in_range {
2087                for &idx in row_indices {
2088                    result.push(Rc::clone(&table_data.rows[idx]));
2089                }
2090            }
2091        }
2092
2093        Ok(result)
2094    }
2095
2096    fn get_index_range_with_limit(
2097        &self,
2098        table: &str,
2099        index: &str,
2100        range_start: Option<&Value>,
2101        range_end: Option<&Value>,
2102        include_start: bool,
2103        include_end: bool,
2104        limit: Option<usize>,
2105        offset: usize,
2106        reverse: bool,
2107    ) -> ExecutionResult<Vec<Rc<Row>>> {
2108        let table_data = self
2109            .tables
2110            .get(table)
2111            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
2112
2113        let index_data = table_data.indexes.get(index).ok_or_else(|| {
2114            ExecutionError::IndexNotFound {
2115                table: table.into(),
2116                index: index.into(),
2117            }
2118        })?;
2119
2120        // Collect keys in range first
2121        let keys_in_range: Vec<&Value> = index_data
2122            .key_to_rows
2123            .keys()
2124            .filter(|key| {
2125                match (range_start, range_end) {
2126                    (Some(start), Some(end)) => {
2127                        let start_ok = if include_start {
2128                            *key >= start
2129                        } else {
2130                            *key > start
2131                        };
2132                        let end_ok = if include_end { *key <= end } else { *key < end };
2133                        start_ok && end_ok
2134                    }
2135                    (Some(start), None) => {
2136                        if include_start {
2137                            *key >= start
2138                        } else {
2139                            *key > start
2140                        }
2141                    }
2142                    (None, Some(end)) => {
2143                        if include_end {
2144                            *key <= end
2145                        } else {
2146                            *key < end
2147                        }
2148                    }
2149                    (None, None) => true,
2150                }
2151            })
2152            .collect();
2153
2154        let mut result = Vec::new();
2155        let mut skipped = 0;
2156        let mut collected = 0;
2157
2158        // Iterate in forward or reverse order based on the reverse flag
2159        let iter: Box<dyn Iterator<Item = &&Value>> = if reverse {
2160            Box::new(keys_in_range.iter().rev())
2161        } else {
2162            Box::new(keys_in_range.iter())
2163        };
2164
2165        for key in iter {
2166            if let Some(row_indices) = index_data.key_to_rows.get(*key) {
2167                for &idx in row_indices {
2168                    // Apply offset
2169                    if skipped < offset {
2170                        skipped += 1;
2171                        continue;
2172                    }
2173                    // Apply limit
2174                    if let Some(lim) = limit {
2175                        if collected >= lim {
2176                            return Ok(result);
2177                        }
2178                    }
2179                    result.push(Rc::clone(&table_data.rows[idx]));
2180                    collected += 1;
2181                }
2182            }
2183        }
2184
2185        Ok(result)
2186    }
2187
2188    fn get_index_point(&self, table: &str, index: &str, key: &Value) -> ExecutionResult<Vec<Rc<Row>>> {
2189        let table_data = self
2190            .tables
2191            .get(table)
2192            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))?;
2193
2194        let index_data = table_data.indexes.get(index).ok_or_else(|| {
2195            ExecutionError::IndexNotFound {
2196                table: table.into(),
2197                index: index.into(),
2198            }
2199        })?;
2200
2201        let result = index_data
2202            .key_to_rows
2203            .get(key)
2204            .map(|indices| indices.iter().map(|&i| Rc::clone(&table_data.rows[i])).collect())
2205            .unwrap_or_default();
2206
2207        Ok(result)
2208    }
2209
2210    fn get_column_count(&self, table: &str) -> ExecutionResult<usize> {
2211        self.tables
2212            .get(table)
2213            .map(|t| t.column_count)
2214            .ok_or_else(|| ExecutionError::TableNotFound(table.into()))
2215    }
2216}
2217
2218#[cfg(test)]
2219mod tests {
2220    use super::*;
2221    use crate::ast::JoinType;
2222    use alloc::boxed::Box;
2223    use alloc::vec;
2224
2225    fn create_test_data_source() -> InMemoryDataSource {
2226        let mut ds = InMemoryDataSource::new();
2227
2228        // Users table: id, name, dept_id
2229        let users = vec![
2230            Row::new(1, vec![Value::Int64(1), Value::String("Alice".into()), Value::Int64(10)]),
2231            Row::new(2, vec![Value::Int64(2), Value::String("Bob".into()), Value::Int64(20)]),
2232            Row::new(3, vec![Value::Int64(3), Value::String("Charlie".into()), Value::Int64(10)]),
2233        ];
2234        ds.add_table("users", users, 3);
2235        ds.create_index("users", "idx_id", 0).unwrap();
2236        ds.create_index("users", "idx_dept", 2).unwrap();
2237
2238        // Departments table: id, name
2239        let depts = vec![
2240            Row::new(10, vec![Value::Int64(10), Value::String("Engineering".into())]),
2241            Row::new(20, vec![Value::Int64(20), Value::String("Sales".into())]),
2242            Row::new(30, vec![Value::Int64(30), Value::String("Marketing".into())]),
2243        ];
2244        ds.add_table("departments", depts, 2);
2245        ds.create_index("departments", "idx_id", 0).unwrap();
2246
2247        ds
2248    }
2249
2250    #[test]
2251    fn test_table_scan() {
2252        let ds = create_test_data_source();
2253        let runner = PhysicalPlanRunner::new(&ds);
2254
2255        let plan = PhysicalPlan::table_scan("users");
2256        let result = runner.execute(&plan).unwrap();
2257
2258        assert_eq!(result.len(), 3);
2259        assert_eq!(result.tables(), &["users"]);
2260    }
2261
2262    #[test]
2263    fn test_index_scan() {
2264        let ds = create_test_data_source();
2265        let runner = PhysicalPlanRunner::new(&ds);
2266
2267        let plan = PhysicalPlan::IndexScan {
2268            table: "users".into(),
2269            index: "idx_id".into(),
2270            range_start: Some(Value::Int64(1)),
2271            range_end: Some(Value::Int64(2)),
2272            include_start: true,
2273            include_end: true,
2274            limit: None,
2275            offset: None,
2276            reverse: false,
2277        };
2278        let result = runner.execute(&plan).unwrap();
2279
2280        assert_eq!(result.len(), 2);
2281    }
2282
2283    #[test]
2284    fn test_index_get() {
2285        let ds = create_test_data_source();
2286        let runner = PhysicalPlanRunner::new(&ds);
2287
2288        let plan = PhysicalPlan::index_get("users", "idx_id", Value::Int64(2));
2289        let result = runner.execute(&plan).unwrap();
2290
2291        assert_eq!(result.len(), 1);
2292        assert_eq!(
2293            result.entries[0].get_field(1),
2294            Some(&Value::String("Bob".into()))
2295        );
2296    }
2297
2298    #[test]
2299    fn test_filter() {
2300        let ds = create_test_data_source();
2301        let runner = PhysicalPlanRunner::new(&ds);
2302
2303        // Filter: dept_id = 10
2304        let plan = PhysicalPlan::filter(
2305            PhysicalPlan::table_scan("users"),
2306            Expr::eq(
2307                Expr::column("users", "dept_id", 2),
2308                Expr::literal(Value::Int64(10)),
2309            ),
2310        );
2311        let result = runner.execute(&plan).unwrap();
2312
2313        assert_eq!(result.len(), 2); // Alice and Charlie
2314    }
2315
2316    #[test]
2317    fn test_project() {
2318        let ds = create_test_data_source();
2319        let runner = PhysicalPlanRunner::new(&ds);
2320
2321        // Project: id, name
2322        let plan = PhysicalPlan::project(
2323            PhysicalPlan::table_scan("users"),
2324            vec![
2325                Expr::column("users", "id", 0),
2326                Expr::column("users", "name", 1),
2327            ],
2328        );
2329        let result = runner.execute(&plan).unwrap();
2330
2331        assert_eq!(result.len(), 3);
2332        assert_eq!(result.entries[0].row.len(), 2);
2333    }
2334
2335    #[test]
2336    fn test_hash_join() {
2337        let ds = create_test_data_source();
2338        let runner = PhysicalPlanRunner::new(&ds);
2339
2340        // Join users and departments on dept_id = id
2341        let plan = PhysicalPlan::hash_join(
2342            PhysicalPlan::table_scan("users"),
2343            PhysicalPlan::table_scan("departments"),
2344            Expr::eq(
2345                Expr::column("users", "dept_id", 2),
2346                Expr::column("departments", "id", 0),
2347            ),
2348            JoinType::Inner,
2349        );
2350        let result = runner.execute(&plan).unwrap();
2351
2352        // Alice and Charlie match Engineering (10), Bob matches Sales (20)
2353        assert_eq!(result.len(), 3);
2354        assert_eq!(result.tables(), &["users", "departments"]);
2355    }
2356
2357    #[test]
2358    fn test_left_outer_join() {
2359        let mut ds = InMemoryDataSource::new();
2360
2361        let left = vec![
2362            Row::new(1, vec![Value::Int64(1)]),
2363            Row::new(2, vec![Value::Int64(2)]),
2364            Row::new(3, vec![Value::Int64(3)]),
2365        ];
2366        ds.add_table("left", left, 1);
2367
2368        let right = vec![Row::new(10, vec![Value::Int64(1)])];
2369        ds.add_table("right", right, 1);
2370
2371        let runner = PhysicalPlanRunner::new(&ds);
2372
2373        let plan = PhysicalPlan::hash_join(
2374            PhysicalPlan::table_scan("left"),
2375            PhysicalPlan::table_scan("right"),
2376            Expr::eq(
2377                Expr::column("left", "id", 0),
2378                Expr::column("right", "id", 0),
2379            ),
2380            JoinType::LeftOuter,
2381        );
2382        let result = runner.execute(&plan).unwrap();
2383
2384        // 1 match + 2 unmatched with nulls
2385        assert_eq!(result.len(), 3);
2386    }
2387
2388    #[test]
2389    fn test_nested_loop_join_with_predicate() {
2390        let mut ds = InMemoryDataSource::new();
2391
2392        let left = vec![
2393            Row::new(1, vec![Value::Int64(10)]),
2394            Row::new(2, vec![Value::Int64(20)]),
2395        ];
2396        ds.add_table("left", left, 1);
2397
2398        let right = vec![
2399            Row::new(10, vec![Value::Int64(5)]),
2400            Row::new(11, vec![Value::Int64(15)]),
2401            Row::new(12, vec![Value::Int64(25)]),
2402        ];
2403        ds.add_table("right", right, 1);
2404
2405        let runner = PhysicalPlanRunner::new(&ds);
2406
2407        // left.value > right.value
2408        let plan = PhysicalPlan::nested_loop_join(
2409            PhysicalPlan::table_scan("left"),
2410            PhysicalPlan::table_scan("right"),
2411            Expr::gt(
2412                Expr::column("left", "value", 0),
2413                Expr::column("right", "value", 0),
2414            ),
2415            JoinType::Inner,
2416        );
2417        let result = runner.execute(&plan).unwrap();
2418
2419        // 10 > 5, 20 > 5, 20 > 15 = 3 matches
2420        assert_eq!(result.len(), 3);
2421    }
2422
2423    #[test]
2424    fn test_cross_product() {
2425        let mut ds = InMemoryDataSource::new();
2426
2427        let left = vec![
2428            Row::new(1, vec![Value::Int64(1)]),
2429            Row::new(2, vec![Value::Int64(2)]),
2430        ];
2431        ds.add_table("left", left, 1);
2432
2433        let right = vec![
2434            Row::new(10, vec![Value::String("A".into())]),
2435            Row::new(11, vec![Value::String("B".into())]),
2436            Row::new(12, vec![Value::String("C".into())]),
2437        ];
2438        ds.add_table("right", right, 1);
2439
2440        let runner = PhysicalPlanRunner::new(&ds);
2441
2442        let plan = PhysicalPlan::CrossProduct {
2443            left: Box::new(PhysicalPlan::table_scan("left")),
2444            right: Box::new(PhysicalPlan::table_scan("right")),
2445        };
2446        let result = runner.execute(&plan).unwrap();
2447
2448        // 2 * 3 = 6
2449        assert_eq!(result.len(), 6);
2450    }
2451
2452    #[test]
2453    fn test_sort() {
2454        let ds = create_test_data_source();
2455        let runner = PhysicalPlanRunner::new(&ds);
2456
2457        let plan = PhysicalPlan::sort(
2458            PhysicalPlan::table_scan("users"),
2459            vec![(Expr::column("users", "name", 1), SortOrder::Asc)],
2460        );
2461        let result = runner.execute(&plan).unwrap();
2462
2463        assert_eq!(result.len(), 3);
2464        // Should be sorted: Alice, Bob, Charlie
2465        assert_eq!(
2466            result.entries[0].get_field(1),
2467            Some(&Value::String("Alice".into()))
2468        );
2469        assert_eq!(
2470            result.entries[1].get_field(1),
2471            Some(&Value::String("Bob".into()))
2472        );
2473        assert_eq!(
2474            result.entries[2].get_field(1),
2475            Some(&Value::String("Charlie".into()))
2476        );
2477    }
2478
2479    #[test]
2480    fn test_limit() {
2481        let ds = create_test_data_source();
2482        let runner = PhysicalPlanRunner::new(&ds);
2483
2484        let plan = PhysicalPlan::limit(PhysicalPlan::table_scan("users"), 2, 1);
2485        let result = runner.execute(&plan).unwrap();
2486
2487        assert_eq!(result.len(), 2);
2488    }
2489
2490    #[test]
2491    fn test_aggregate_count() {
2492        let ds = create_test_data_source();
2493        let runner = PhysicalPlanRunner::new(&ds);
2494
2495        let plan = PhysicalPlan::hash_aggregate(
2496            PhysicalPlan::table_scan("users"),
2497            vec![],
2498            vec![(AggregateFunc::Count, Expr::column("users", "id", 0))],
2499        );
2500        let result = runner.execute(&plan).unwrap();
2501
2502        assert_eq!(result.len(), 1);
2503        assert_eq!(result.entries[0].get_field(0), Some(&Value::Int64(3)));
2504    }
2505
2506    #[test]
2507    fn test_aggregate_group_by() {
2508        let ds = create_test_data_source();
2509        let runner = PhysicalPlanRunner::new(&ds);
2510
2511        // COUNT(*) GROUP BY dept_id
2512        let plan = PhysicalPlan::hash_aggregate(
2513            PhysicalPlan::table_scan("users"),
2514            vec![Expr::column("users", "dept_id", 2)],
2515            vec![(AggregateFunc::Count, Expr::column("users", "id", 0))],
2516        );
2517        let result = runner.execute(&plan).unwrap();
2518
2519        // Two groups: dept_id=10 (2 users), dept_id=20 (1 user)
2520        assert_eq!(result.len(), 2);
2521    }
2522
2523    #[test]
2524    fn test_complex_query() {
2525        let ds = create_test_data_source();
2526        let runner = PhysicalPlanRunner::new(&ds);
2527
2528        // SELECT name FROM users WHERE dept_id = 10 ORDER BY name LIMIT 1
2529        let plan = PhysicalPlan::limit(
2530            PhysicalPlan::sort(
2531                PhysicalPlan::project(
2532                    PhysicalPlan::filter(
2533                        PhysicalPlan::table_scan("users"),
2534                        Expr::eq(
2535                            Expr::column("users", "dept_id", 2),
2536                            Expr::literal(Value::Int64(10)),
2537                        ),
2538                    ),
2539                    vec![Expr::column("users", "name", 1)],
2540                ),
2541                vec![(Expr::column("users", "name", 0), SortOrder::Asc)],
2542            ),
2543            1,
2544            0,
2545        );
2546        let result = runner.execute(&plan).unwrap();
2547
2548        assert_eq!(result.len(), 1);
2549        assert_eq!(
2550            result.entries[0].get_field(0),
2551            Some(&Value::String("Alice".into()))
2552        );
2553    }
2554
2555    #[test]
2556    fn test_empty_result() {
2557        let ds = create_test_data_source();
2558        let runner = PhysicalPlanRunner::new(&ds);
2559
2560        let plan = PhysicalPlan::Empty;
2561        let result = runner.execute(&plan).unwrap();
2562
2563        assert!(result.is_empty());
2564    }
2565
2566    #[test]
2567    fn test_noop() {
2568        let ds = create_test_data_source();
2569        let runner = PhysicalPlanRunner::new(&ds);
2570
2571        let plan = PhysicalPlan::NoOp {
2572            input: Box::new(PhysicalPlan::table_scan("users")),
2573        };
2574        let result = runner.execute(&plan).unwrap();
2575
2576        assert_eq!(result.len(), 3);
2577    }
2578
2579    #[test]
2580    fn test_expression_evaluation() {
2581        let ds = create_test_data_source();
2582        let runner = PhysicalPlanRunner::new(&ds);
2583
2584        // Test arithmetic in projection
2585        let plan = PhysicalPlan::project(
2586            PhysicalPlan::table_scan("users"),
2587            vec![Expr::BinaryOp {
2588                left: Box::new(Expr::column("users", "id", 0)),
2589                op: BinaryOp::Mul,
2590                right: Box::new(Expr::literal(Value::Int64(10))),
2591            }],
2592        );
2593        let result = runner.execute(&plan).unwrap();
2594
2595        assert_eq!(result.entries[0].get_field(0), Some(&Value::Int64(10)));
2596        assert_eq!(result.entries[1].get_field(0), Some(&Value::Int64(20)));
2597        assert_eq!(result.entries[2].get_field(0), Some(&Value::Int64(30)));
2598    }
2599
2600    #[test]
2601    fn test_like_pattern() {
2602        let mut ds = InMemoryDataSource::new();
2603
2604        let data = vec![
2605            Row::new(1, vec![Value::String("Alice".into())]),
2606            Row::new(2, vec![Value::String("Bob".into())]),
2607            Row::new(3, vec![Value::String("Charlie".into())]),
2608            Row::new(4, vec![Value::String("Alex".into())]),
2609        ];
2610        ds.add_table("names", data, 1);
2611
2612        let runner = PhysicalPlanRunner::new(&ds);
2613
2614        // Filter: name LIKE 'Al%'
2615        let plan = PhysicalPlan::filter(
2616            PhysicalPlan::table_scan("names"),
2617            Expr::Like {
2618                expr: Box::new(Expr::column("names", "name", 0)),
2619                pattern: "Al%".into(),
2620            },
2621        );
2622        let result = runner.execute(&plan).unwrap();
2623
2624        assert_eq!(result.len(), 2); // Alice and Alex
2625    }
2626
2627    #[test]
2628    fn test_between() {
2629        let mut ds = InMemoryDataSource::new();
2630
2631        let data = vec![
2632            Row::new(1, vec![Value::Int64(5)]),
2633            Row::new(2, vec![Value::Int64(10)]),
2634            Row::new(3, vec![Value::Int64(15)]),
2635            Row::new(4, vec![Value::Int64(20)]),
2636        ];
2637        ds.add_table("numbers", data, 1);
2638
2639        let runner = PhysicalPlanRunner::new(&ds);
2640
2641        // Filter: value BETWEEN 10 AND 15
2642        let plan = PhysicalPlan::filter(
2643            PhysicalPlan::table_scan("numbers"),
2644            Expr::Between {
2645                expr: Box::new(Expr::column("numbers", "value", 0)),
2646                low: Box::new(Expr::literal(Value::Int64(10))),
2647                high: Box::new(Expr::literal(Value::Int64(15))),
2648            },
2649        );
2650        let result = runner.execute(&plan).unwrap();
2651
2652        assert_eq!(result.len(), 2); // 10 and 15
2653    }
2654
2655    #[test]
2656    fn test_in_list() {
2657        let ds = create_test_data_source();
2658        let runner = PhysicalPlanRunner::new(&ds);
2659
2660        // Filter: id IN (1, 3)
2661        let plan = PhysicalPlan::filter(
2662            PhysicalPlan::table_scan("users"),
2663            Expr::In {
2664                expr: Box::new(Expr::column("users", "id", 0)),
2665                list: vec![
2666                    Expr::literal(Value::Int64(1)),
2667                    Expr::literal(Value::Int64(3)),
2668                ],
2669            },
2670        );
2671        let result = runner.execute(&plan).unwrap();
2672
2673        assert_eq!(result.len(), 2); // Alice and Charlie
2674    }
2675
2676    #[test]
2677    fn test_not_in_list() {
2678        let ds = create_test_data_source();
2679        let runner = PhysicalPlanRunner::new(&ds);
2680
2681        // Filter: id NOT IN (1, 3)
2682        let plan = PhysicalPlan::filter(
2683            PhysicalPlan::table_scan("users"),
2684            Expr::NotIn {
2685                expr: Box::new(Expr::column("users", "id", 0)),
2686                list: vec![
2687                    Expr::literal(Value::Int64(1)),
2688                    Expr::literal(Value::Int64(3)),
2689                ],
2690            },
2691        );
2692        let result = runner.execute(&plan).unwrap();
2693
2694        assert_eq!(result.len(), 1); // Only Bob (id=2)
2695    }
2696
2697    #[test]
2698    fn test_not_between() {
2699        let ds = create_test_data_source();
2700        let runner = PhysicalPlanRunner::new(&ds);
2701
2702        // Filter: dept_id NOT BETWEEN 15 AND 25 (only dept_id=10 passes)
2703        let plan = PhysicalPlan::filter(
2704            PhysicalPlan::table_scan("users"),
2705            Expr::NotBetween {
2706                expr: Box::new(Expr::column("users", "dept_id", 2)),
2707                low: Box::new(Expr::literal(Value::Int64(15))),
2708                high: Box::new(Expr::literal(Value::Int64(25))),
2709            },
2710        );
2711        let result = runner.execute(&plan).unwrap();
2712
2713        assert_eq!(result.len(), 2); // Alice and Charlie (dept_id=10)
2714    }
2715
2716    #[test]
2717    fn test_not_like() {
2718        let ds = create_test_data_source();
2719        let runner = PhysicalPlanRunner::new(&ds);
2720
2721        // Filter: name NOT LIKE 'A%'
2722        let plan = PhysicalPlan::filter(
2723            PhysicalPlan::table_scan("users"),
2724            Expr::NotLike {
2725                expr: Box::new(Expr::column("users", "name", 1)),
2726                pattern: "A%".into(),
2727            },
2728        );
2729        let result = runner.execute(&plan).unwrap();
2730
2731        assert_eq!(result.len(), 2); // Bob and Charlie
2732    }
2733
2734    #[test]
2735    fn test_regex_match() {
2736        let ds = create_test_data_source();
2737        let runner = PhysicalPlanRunner::new(&ds);
2738
2739        // Filter: name MATCH '^[AB].*'
2740        let plan = PhysicalPlan::filter(
2741            PhysicalPlan::table_scan("users"),
2742            Expr::Match {
2743                expr: Box::new(Expr::column("users", "name", 1)),
2744                pattern: "^[AB].*".into(),
2745            },
2746        );
2747        let result = runner.execute(&plan).unwrap();
2748
2749        assert_eq!(result.len(), 2); // Alice and Bob
2750    }
2751
2752    #[test]
2753    fn test_regex_match_digit() {
2754        let mut ds = InMemoryDataSource::new();
2755        let data = vec![
2756            Row::new(1, vec![Value::String("abc123".into())]),
2757            Row::new(2, vec![Value::String("xyz".into())]),
2758            Row::new(3, vec![Value::String("test456def".into())]),
2759        ];
2760        ds.add_table("data", data, 1);
2761
2762        let runner = PhysicalPlanRunner::new(&ds);
2763
2764        // Filter: col MATCH '\d+'
2765        let plan = PhysicalPlan::filter(
2766            PhysicalPlan::table_scan("data"),
2767            Expr::Match {
2768                expr: Box::new(Expr::column("data", "col", 0)),
2769                pattern: "\\d+".into(),
2770            },
2771        );
2772        let result = runner.execute(&plan).unwrap();
2773
2774        assert_eq!(result.len(), 2); // abc123 and test456def
2775    }
2776
2777    #[test]
2778    fn test_not_regex_match() {
2779        let ds = create_test_data_source();
2780        let runner = PhysicalPlanRunner::new(&ds);
2781
2782        // Filter: name NOT MATCH '^[AB].*'
2783        let plan = PhysicalPlan::filter(
2784            PhysicalPlan::table_scan("users"),
2785            Expr::NotMatch {
2786                expr: Box::new(Expr::column("users", "name", 1)),
2787                pattern: "^[AB].*".into(),
2788            },
2789        );
2790        let result = runner.execute(&plan).unwrap();
2791
2792        assert_eq!(result.len(), 1); // Only Charlie
2793    }
2794
2795    #[test]
2796    fn test_null_handling() {
2797        let mut ds = InMemoryDataSource::new();
2798
2799        let data = vec![
2800            Row::new(1, vec![Value::Int64(1), Value::String("A".into())]),
2801            Row::new(2, vec![Value::Int64(2), Value::Null]),
2802            Row::new(3, vec![Value::Null, Value::String("C".into())]),
2803        ];
2804        ds.add_table("data", data, 2);
2805
2806        let runner = PhysicalPlanRunner::new(&ds);
2807
2808        // Filter: col1 IS NOT NULL
2809        let plan = PhysicalPlan::filter(
2810            PhysicalPlan::table_scan("data"),
2811            Expr::is_not_null(Expr::column("data", "col1", 0)),
2812        );
2813        let result = runner.execute(&plan).unwrap();
2814
2815        assert_eq!(result.len(), 2);
2816    }
2817
2818    #[test]
2819    fn test_index_scan_reverse() {
2820        let mut ds = InMemoryDataSource::new();
2821
2822        // Create table with scores: 10, 20, 30, 40, 50
2823        let data = vec![
2824            Row::new(1, vec![Value::Int64(10)]),
2825            Row::new(2, vec![Value::Int64(20)]),
2826            Row::new(3, vec![Value::Int64(30)]),
2827            Row::new(4, vec![Value::Int64(40)]),
2828            Row::new(5, vec![Value::Int64(50)]),
2829        ];
2830        ds.add_table("scores", data, 1);
2831        ds.create_index("scores", "idx_score", 0).unwrap();
2832
2833        let runner = PhysicalPlanRunner::new(&ds);
2834
2835        // Forward scan with limit 3: should get 10, 20, 30
2836        let plan_forward = PhysicalPlan::IndexScan {
2837            table: "scores".into(),
2838            index: "idx_score".into(),
2839            range_start: None,
2840            range_end: None,
2841            include_start: true,
2842            include_end: true,
2843            limit: Some(3),
2844            offset: None,
2845            reverse: false,
2846        };
2847        let result_forward = runner.execute(&plan_forward).unwrap();
2848        assert_eq!(result_forward.len(), 3);
2849        assert_eq!(result_forward.entries[0].get_field(0), Some(&Value::Int64(10)));
2850        assert_eq!(result_forward.entries[1].get_field(0), Some(&Value::Int64(20)));
2851        assert_eq!(result_forward.entries[2].get_field(0), Some(&Value::Int64(30)));
2852
2853        // Reverse scan with limit 3: should get 50, 40, 30
2854        let plan_reverse = PhysicalPlan::IndexScan {
2855            table: "scores".into(),
2856            index: "idx_score".into(),
2857            range_start: None,
2858            range_end: None,
2859            include_start: true,
2860            include_end: true,
2861            limit: Some(3),
2862            offset: None,
2863            reverse: true,
2864        };
2865        let result_reverse = runner.execute(&plan_reverse).unwrap();
2866        assert_eq!(result_reverse.len(), 3);
2867        assert_eq!(result_reverse.entries[0].get_field(0), Some(&Value::Int64(50)));
2868        assert_eq!(result_reverse.entries[1].get_field(0), Some(&Value::Int64(40)));
2869        assert_eq!(result_reverse.entries[2].get_field(0), Some(&Value::Int64(30)));
2870    }
2871
2872    #[test]
2873    fn test_index_scan_reverse_with_offset() {
2874        let mut ds = InMemoryDataSource::new();
2875
2876        // Create table with scores: 10, 20, 30, 40, 50
2877        let data = vec![
2878            Row::new(1, vec![Value::Int64(10)]),
2879            Row::new(2, vec![Value::Int64(20)]),
2880            Row::new(3, vec![Value::Int64(30)]),
2881            Row::new(4, vec![Value::Int64(40)]),
2882            Row::new(5, vec![Value::Int64(50)]),
2883        ];
2884        ds.add_table("scores", data, 1);
2885        ds.create_index("scores", "idx_score", 0).unwrap();
2886
2887        let runner = PhysicalPlanRunner::new(&ds);
2888
2889        // Reverse scan with offset 1, limit 2: skip 50, get 40, 30
2890        let plan = PhysicalPlan::IndexScan {
2891            table: "scores".into(),
2892            index: "idx_score".into(),
2893            range_start: None,
2894            range_end: None,
2895            include_start: true,
2896            include_end: true,
2897            limit: Some(2),
2898            offset: Some(1),
2899            reverse: true,
2900        };
2901        let result = runner.execute(&plan).unwrap();
2902        assert_eq!(result.len(), 2);
2903        assert_eq!(result.entries[0].get_field(0), Some(&Value::Int64(40)));
2904        assert_eq!(result.entries[1].get_field(0), Some(&Value::Int64(30)));
2905    }
2906}