Skip to main content

hematite/query/
executor.rs

1//! Query execution.
2//!
3//! The executor turns a planned access path into concrete catalog operations.
4//!
5//! ```text
6//! parsed statement
7//!      +
8//! chosen access path
9//!      |
10//!      v
11//!   executor node
12//!      |
13//!      +--> table cursor scan
14//!      +--> rowid lookup
15//!      +--> PK index -> rowid -> table lookup
16//!      +--> secondary index -> rowid set -> table lookup
17//! ```
18//!
19//! This layer should stay storage-agnostic at the page level. If a change here requires knowledge
20//! of B-tree node layout or pager internals, the boundary below catalog has started to leak.
21
22use crate::catalog::column::{collation_is_nocase, pad_text_to_char_length};
23use crate::catalog::table::{
24    CheckConstraint, ForeignKeyAction as CatalogForeignKeyAction, ForeignKeyConstraint,
25};
26use crate::catalog::StoredRow;
27use crate::catalog::{
28    Column, DataType, DateTimeValue, DateValue, DecimalValue, IntervalDaySecondValue,
29    IntervalYearMonthValue, Table, TimeValue, TimeWithTimeZoneValue, Value,
30};
31use crate::error::{HematiteError, Result};
32use crate::parser::ast::*;
33use crate::query::lowering::{lower_literal_value, lower_type_name, raise_literal_value};
34use crate::query::plan::{ExecutionProgram, QueryPlan, SelectAccessPath};
35use crate::query::predicate::extract_literal_equalities;
36pub use crate::query::runtime::{ExecutionContext, MutationEvent, QueryExecutor, QueryResult};
37use crate::query::validation::{
38    projected_column_names, validate_column_reference, validate_statement,
39};
40use crate::query::QueryPlanner;
41use std::cmp::Ordering;
42use std::collections::HashMap;
43
44impl QueryPlan {
45    pub fn into_executor(self) -> Box<dyn QueryExecutor> {
46        build_executor(self.program)
47    }
48}
49
50pub fn build_executor(program: ExecutionProgram) -> Box<dyn QueryExecutor> {
51    match program {
52        ExecutionProgram::Select {
53            statement,
54            access_path,
55        } => Box::new(SelectExecutor::new(statement, access_path)),
56        ExecutionProgram::Insert { statement } => Box::new(InsertExecutor::new(statement)),
57        ExecutionProgram::Update {
58            statement,
59            access_path,
60        } => Box::new(UpdateExecutor::new(statement, access_path)),
61        ExecutionProgram::Delete {
62            statement,
63            access_path,
64        } => Box::new(DeleteExecutor::new(statement, access_path)),
65        ExecutionProgram::Create { statement } => Box::new(CreateExecutor::new(statement)),
66        ExecutionProgram::CreateIndex { statement } => {
67            Box::new(CreateIndexExecutor::new(statement))
68        }
69        ExecutionProgram::Alter { statement } => Box::new(AlterExecutor::new(statement)),
70        ExecutionProgram::Drop { statement } => Box::new(DropExecutor::new(statement)),
71        ExecutionProgram::DropIndex { statement } => Box::new(DropIndexExecutor::new(statement)),
72    }
73}
74
75#[derive(Debug, Clone)]
76pub struct SelectExecutor {
77    pub statement: SelectStatement,
78    pub access_path: SelectAccessPath,
79    outer_scopes: Vec<CorrelatedScope>,
80    materialized_ctes: HashMap<String, QueryResult>,
81}
82
83#[derive(Debug, Clone)]
84struct ResolvedSource {
85    name: String,
86    columns: Vec<String>,
87    column_types: Vec<DataType>,
88    column_collations: Vec<Option<String>>,
89    alias: Option<String>,
90    offset: usize,
91}
92
93impl ResolvedSource {
94    fn width(&self) -> usize {
95        self.columns.len()
96    }
97}
98
/// Column-derived flags passed along with text comparisons.
#[derive(Clone, Copy, Debug)]
struct TextComparisonContext {
    /// Set when a compared column is of a `CHAR` type.
    trim_trailing_spaces: bool,
    /// Set when a compared column is of a `BINARY` type.
    trim_trailing_zero_bytes: bool,
    /// Set when a compared column uses a no-case collation.
    case_insensitive: bool,
}
105
106#[derive(Debug, Clone)]
107enum NamedSourceKind {
108    BaseTable,
109    MaterializedCte(Vec<Vec<Value>>),
110    Cte(CommonTableExpression),
111}
112
113#[derive(Debug, Clone)]
114struct NamedSource {
115    source: ResolvedSource,
116    kind: NamedSourceKind,
117}
118
119#[derive(Debug, Clone)]
120struct GroupedRow {
121    projected: Vec<Value>,
122    source_rows: Vec<Vec<Value>>,
123}
124
125#[derive(Debug, Clone)]
126struct CorrelatedScope {
127    sources: Vec<ResolvedSource>,
128    row: Vec<Value>,
129}
130
/// Subquery results kept for reuse during evaluation, keyed by a `usize`.
// NOTE(review): how the key is derived is not visible in this section — confirm against
// `execute_subquery_cached`.
type SubqueryCache = HashMap<usize, QueryResult>;
132
133fn evaluate_case_expression<FBool, FExpr>(
134    branches: &[CaseWhenClause],
135    else_expr: Option<&Expression>,
136    mut eval_bool: FBool,
137    mut eval_expr: FExpr,
138) -> Result<Value>
139where
140    FBool: FnMut(&Expression) -> Result<Option<bool>>,
141    FExpr: FnMut(&Expression) -> Result<Value>,
142{
143    for branch in branches {
144        match eval_bool(&branch.condition)? {
145            Some(true) => return eval_expr(&branch.result),
146            Some(false) | None => {}
147        }
148    }
149
150    match else_expr {
151        Some(else_expr) => eval_expr(else_expr),
152        None => Ok(Value::Null),
153    }
154}
155
156fn evaluate_expression_list<FExpr>(
157    expressions: &[Expression],
158    mut eval_expr: FExpr,
159) -> Result<Vec<Value>>
160where
161    FExpr: FnMut(&Expression) -> Result<Value>,
162{
163    let mut values = Vec::with_capacity(expressions.len());
164    for expr in expressions {
165        values.push(eval_expr(expr)?);
166    }
167    Ok(values)
168}
169
170fn evaluate_scalar_function_call<FExpr>(
171    function: ScalarFunction,
172    args: &[Expression],
173    eval_expr: FExpr,
174) -> Result<Value>
175where
176    FExpr: FnMut(&Expression) -> Result<Value>,
177{
178    evaluate_scalar_function(function, evaluate_expression_list(args, eval_expr)?)
179}
180
181fn evaluate_in_list_predicate<FExpr>(
182    probe_expr: &Expression,
183    candidates: &[Expression],
184    is_not: bool,
185    text_context: Option<TextComparisonContext>,
186    mut eval_expr: FExpr,
187) -> Result<Option<bool>>
188where
189    FExpr: FnMut(&Expression) -> Result<Value>,
190{
191    let probe = eval_expr(probe_expr)?;
192    let candidates = evaluate_expression_list(candidates, eval_expr)?;
193    Ok(evaluate_in_candidates(
194        probe,
195        candidates,
196        is_not,
197        text_context,
198    ))
199}
200
201fn evaluate_between_predicate<FExpr>(
202    expr: &Expression,
203    lower: &Expression,
204    upper: &Expression,
205    is_not: bool,
206    text_context: Option<TextComparisonContext>,
207    mut eval_expr: FExpr,
208) -> Result<Option<bool>>
209where
210    FExpr: FnMut(&Expression) -> Result<Value>,
211{
212    Ok(evaluate_between_values(
213        eval_expr(expr)?,
214        eval_expr(lower)?,
215        eval_expr(upper)?,
216        is_not,
217        text_context,
218    ))
219}
220
221fn evaluate_like_predicate<FExpr>(
222    expr: &Expression,
223    pattern: &Expression,
224    is_not: bool,
225    text_context: Option<TextComparisonContext>,
226    mut eval_expr: FExpr,
227) -> Result<Option<bool>>
228where
229    FExpr: FnMut(&Expression) -> Result<Value>,
230{
231    Ok(evaluate_like_values(
232        eval_expr(expr)?,
233        eval_expr(pattern)?,
234        is_not,
235        text_context,
236    ))
237}
238
239fn conditions_match_with<FEval>(conditions: &[Condition], mut eval_condition: FEval) -> Result<bool>
240where
241    FEval: FnMut(&Condition) -> Result<Option<bool>>,
242{
243    for condition in conditions {
244        if eval_condition(condition)? != Some(true) {
245            return Ok(false);
246        }
247    }
248    Ok(true)
249}
250
251impl SelectExecutor {
252    pub fn new(statement: SelectStatement, access_path: SelectAccessPath) -> Self {
253        Self {
254            statement,
255            access_path,
256            outer_scopes: Vec::new(),
257            materialized_ctes: HashMap::new(),
258        }
259    }
260
261    fn with_outer_scope(mut self, sources: &[ResolvedSource], row: &[Value]) -> Self {
262        self.outer_scopes.push(CorrelatedScope {
263            sources: sources.to_vec(),
264            row: row.to_vec(),
265        });
266        self
267    }
268
/// Normalizes a CTE name into a lookup key by lowercasing its ASCII letters.
///
/// Non-ASCII characters are left untouched.
fn cte_key(name: &str) -> String {
    name.chars().map(|ch| ch.to_ascii_lowercase()).collect()
}
272
273    fn resolve_sources(&self, ctx: &ExecutionContext) -> Result<Vec<ResolvedSource>> {
274        let bindings = SelectStatement::collect_table_bindings(&self.statement.from);
275        let mut sources = Vec::with_capacity(bindings.len());
276        let mut offset = 0usize;
277
278        for binding in bindings {
279            sources.push(self.resolve_named_source(
280                ctx,
281                &binding.table_name,
282                binding.alias,
283                offset,
284            )?);
285            offset += sources.last().map(ResolvedSource::width).unwrap_or(0);
286        }
287
288        Ok(sources)
289    }
290
291    fn query_output_columns(
292        &self,
293        query: &SelectStatement,
294        ctx: &ExecutionContext,
295    ) -> Result<Vec<String>> {
296        projected_column_names(query, &ctx.catalog)
297    }
298
299    fn resolve_column_index(
300        &self,
301        sources: &[ResolvedSource],
302        column_reference: &str,
303    ) -> Result<Option<usize>> {
304        let (qualifier, column_name) = SelectStatement::split_column_reference(column_reference);
305        let mut matches = Vec::new();
306
307        for source in sources {
308            if let Some(qualifier) = qualifier {
309                if qualifier != source.name
310                    && source
311                        .alias
312                        .as_deref()
313                        .is_none_or(|alias| alias != qualifier)
314                {
315                    continue;
316                }
317            }
318
319            if let Some(index) = source
320                .columns
321                .iter()
322                .position(|column| column == column_name)
323            {
324                matches.push(source.offset + index);
325            }
326        }
327
328        match matches.len() {
329            0 => Ok(None),
330            1 => Ok(matches.into_iter().next()),
331            _ => Err(HematiteError::ParseError(format!(
332                "Column reference '{}' is ambiguous",
333                column_reference
334            ))),
335        }
336    }
337
338    fn text_comparison_context_for_expression(
339        &self,
340        sources: &[ResolvedSource],
341        expr: &Expression,
342    ) -> Result<Option<TextComparisonContext>> {
343        let Expression::Column(column_reference) = expr else {
344            return Ok(None);
345        };
346        let Some(flat_index) = self.resolve_column_index(sources, column_reference)? else {
347            return Ok(None);
348        };
349        Ok(self
350            .source_column_metadata(sources, flat_index)
351            .map(|(data_type, collation)| TextComparisonContext {
352                trim_trailing_spaces: matches!(data_type, DataType::Char(_)),
353                trim_trailing_zero_bytes: matches!(data_type, DataType::Binary(_)),
354                case_insensitive: collation_is_nocase(collation.as_deref()),
355            }))
356    }
357
358    fn merged_text_comparison_context(
359        &self,
360        sources: &[ResolvedSource],
361        left: &Expression,
362        right: &Expression,
363    ) -> Result<Option<TextComparisonContext>> {
364        let left_context = self.text_comparison_context_for_expression(sources, left)?;
365        let right_context = self.text_comparison_context_for_expression(sources, right)?;
366        Ok(match (left_context, right_context) {
367            (Some(left), Some(right)) => Some(TextComparisonContext {
368                trim_trailing_spaces: left.trim_trailing_spaces || right.trim_trailing_spaces,
369                trim_trailing_zero_bytes: left.trim_trailing_zero_bytes
370                    || right.trim_trailing_zero_bytes,
371                case_insensitive: left.case_insensitive || right.case_insensitive,
372            }),
373            (Some(context), None) | (None, Some(context)) => Some(context),
374            (None, None) => None,
375        })
376    }
377
378    fn source_column_metadata(
379        &self,
380        sources: &[ResolvedSource],
381        flat_index: usize,
382    ) -> Option<(DataType, Option<String>)> {
383        for source in sources {
384            if flat_index < source.offset {
385                continue;
386            }
387            let relative = flat_index - source.offset;
388            if relative < source.columns.len() {
389                return Some((
390                    source.column_types.get(relative)?.clone(),
391                    source.column_collations.get(relative)?.clone(),
392                ));
393            }
394        }
395        None
396    }
397
398    fn resolve_column_value(
399        &self,
400        sources: &[ResolvedSource],
401        column_reference: &str,
402        row: &[Value],
403    ) -> Result<Value> {
404        if let Some(index) = self.resolve_column_index(sources, column_reference)? {
405            return row.get(index).cloned().ok_or_else(|| {
406                HematiteError::ParseError(format!("Column '{}' not found", column_reference))
407            });
408        }
409
410        for scope in self.outer_scopes.iter().rev() {
411            if let Some(index) = self.resolve_column_index(&scope.sources, column_reference)? {
412                return scope.row.get(index).cloned().ok_or_else(|| {
413                    HematiteError::ParseError(format!("Column '{}' not found", column_reference))
414                });
415            }
416        }
417
418        Err(HematiteError::ParseError(format!(
419            "Column '{}' not found",
420            column_reference
421        )))
422    }
423
424    fn evaluate_expression(
425        &self,
426        ctx: &mut ExecutionContext<'_>,
427        cache: &mut SubqueryCache,
428        sources: &[ResolvedSource],
429        expr: &Expression,
430        row: &[Value],
431    ) -> Result<Value> {
432        match expr {
433            Expression::Literal(value) => Ok(lower_literal_value(value)),
434            Expression::IntervalLiteral { value, qualifier } => match qualifier {
435                IntervalQualifier::YearToMonth => Ok(Value::IntervalYearMonth(
436                    IntervalYearMonthValue::parse(value)?,
437                )),
438                IntervalQualifier::DayToSecond => Ok(Value::IntervalDaySecond(
439                    IntervalDaySecondValue::parse(value)?,
440                )),
441            },
442            Expression::Parameter(index) => Err(HematiteError::ParseError(format!(
443                "Unbound parameter {} reached execution",
444                index + 1
445            ))),
446            Expression::Cast { expr, target_type } => cast_value_to_type(
447                self.evaluate_expression(ctx, cache, sources, expr, row)?,
448                lower_type_name(target_type.clone()),
449            ),
450            Expression::Case {
451                branches,
452                else_expr,
453            } => {
454                for branch in branches {
455                    match self.evaluate_boolean_expression(
456                        ctx,
457                        cache,
458                        sources,
459                        &branch.condition,
460                        row,
461                    )? {
462                        Some(true) => {
463                            return self.evaluate_expression(
464                                ctx,
465                                cache,
466                                sources,
467                                &branch.result,
468                                row,
469                            )
470                        }
471                        Some(false) | None => {}
472                    }
473                }
474
475                match else_expr {
476                    Some(else_expr) => {
477                        self.evaluate_expression(ctx, cache, sources, else_expr, row)
478                    }
479                    None => Ok(Value::Null),
480                }
481            }
482            Expression::AggregateCall { .. } => Err(HematiteError::ParseError(
483                "Aggregate expressions can only be evaluated in grouped query contexts".to_string(),
484            )),
485            Expression::ScalarFunctionCall { function, args } => {
486                evaluate_scalar_function_call(*function, args, |expr| {
487                    self.evaluate_expression(ctx, cache, sources, expr, row)
488                })
489            }
490            Expression::ScalarSubquery(subquery) => {
491                self.execute_scalar_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))
492            }
493            Expression::Column(name) => self.resolve_column_value(sources, name, row),
494            Expression::UnaryMinus(expr) => {
495                negate_numeric_value(self.evaluate_expression(ctx, cache, sources, expr, row)?)
496            }
497            Expression::UnaryNot(_)
498            | Expression::Comparison { .. }
499            | Expression::InList { .. }
500            | Expression::InSubquery { .. }
501            | Expression::Between { .. }
502            | Expression::Like { .. }
503            | Expression::Exists { .. }
504            | Expression::NullCheck { .. }
505            | Expression::Logical { .. } => Ok(nullable_bool_to_value(
506                self.evaluate_boolean_expression(ctx, cache, sources, expr, row)?,
507            )),
508            Expression::Binary {
509                left,
510                operator,
511                right,
512            } => {
513                let left = self.evaluate_expression(ctx, cache, sources, left, row)?;
514                let right = self.evaluate_expression(ctx, cache, sources, right, row)?;
515                self.evaluate_arithmetic(operator, left, right)
516            }
517        }
518    }
519
520    fn evaluate_boolean_expression(
521        &self,
522        ctx: &mut ExecutionContext<'_>,
523        cache: &mut SubqueryCache,
524        sources: &[ResolvedSource],
525        expr: &Expression,
526        row: &[Value],
527    ) -> Result<Option<bool>> {
528        match expr {
529            Expression::Comparison {
530                left,
531                operator,
532                right,
533            } => {
534                let left_val = self.evaluate_expression(ctx, cache, sources, left, row)?;
535                let right_val = self.evaluate_expression(ctx, cache, sources, right, row)?;
536                let text_context = self.merged_text_comparison_context(sources, left, right)?;
537                Ok(self.compare_values(&left_val, operator, &right_val, text_context))
538            }
539            Expression::InList {
540                expr,
541                values,
542                is_not,
543            } => evaluate_in_list_predicate(expr, values, *is_not, None, |value_expr| {
544                self.evaluate_expression(ctx, cache, sources, value_expr, row)
545            }),
546            Expression::InSubquery {
547                expr,
548                subquery,
549                is_not,
550            } => {
551                let probe = self.evaluate_expression(ctx, cache, sources, expr, row)?;
552                let subquery_result =
553                    self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
554                let candidates = subquery_result
555                    .rows
556                    .into_iter()
557                    .map(|row| row.into_iter().next().unwrap_or(Value::Null))
558                    .collect::<Vec<_>>();
559                Ok(self.evaluate_in_candidates(probe, candidates, *is_not, None))
560            }
561            Expression::Between {
562                expr,
563                lower,
564                upper,
565                is_not,
566            } => evaluate_between_predicate(
567                expr,
568                lower,
569                upper,
570                *is_not,
571                self.text_comparison_context_for_expression(sources, expr)?,
572                |value_expr| self.evaluate_expression(ctx, cache, sources, value_expr, row),
573            ),
574            Expression::Like {
575                expr,
576                pattern,
577                is_not,
578            } => evaluate_like_predicate(
579                expr,
580                pattern,
581                *is_not,
582                self.text_comparison_context_for_expression(sources, expr)?,
583                |value_expr| self.evaluate_expression(ctx, cache, sources, value_expr, row),
584            ),
585            Expression::Exists { subquery, is_not } => {
586                let subquery_result =
587                    self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
588                let exists = !subquery_result.rows.is_empty();
589                Ok(Some(if *is_not { !exists } else { exists }))
590            }
591            Expression::NullCheck { expr, is_not } => {
592                let value = self.evaluate_expression(ctx, cache, sources, expr, row)?;
593                let is_null = value.is_null();
594                Ok(Some(if *is_not { !is_null } else { is_null }))
595            }
596            Expression::UnaryNot(expr) => Ok(self
597                .evaluate_boolean_expression(ctx, cache, sources, expr, row)?
598                .map(|value| !value)),
599            Expression::Logical {
600                left,
601                operator,
602                right,
603            } => {
604                let left_result =
605                    self.evaluate_boolean_expression(ctx, cache, sources, left, row)?;
606                let right_result =
607                    self.evaluate_boolean_expression(ctx, cache, sources, right, row)?;
608                match operator {
609                    LogicalOperator::And => Ok(self.logical_and(left_result, right_result)),
610                    LogicalOperator::Or => Ok(self.logical_or(left_result, right_result)),
611                }
612            }
613            _ => coerce_value_to_nullable_bool(
614                self.evaluate_expression(ctx, cache, sources, expr, row)?,
615                "Boolean expression",
616            ),
617        }
618    }
619
620    fn evaluate_arithmetic(
621        &self,
622        operator: &ArithmeticOperator,
623        left: Value,
624        right: Value,
625    ) -> Result<Value> {
626        evaluate_arithmetic_values(operator, left, right)
627    }
628
629    fn compare_values(
630        &self,
631        left_val: &Value,
632        operator: &ComparisonOperator,
633        right_val: &Value,
634        text_context: Option<TextComparisonContext>,
635    ) -> Option<bool> {
636        compare_condition_values(left_val, operator, right_val, text_context)
637    }
638
/// Returns whether `text` matches the SQL `LIKE` `pattern`.
///
/// `%` matches any run of characters (including none), `_` matches exactly one character,
/// and every other pattern character must match literally and case-sensitively. Matching
/// works on `char`s, so `_` consumes one Unicode scalar value. There is no escape syntax.
///
/// The matcher is iterative and keeps a single backtrack point at the most recent `%`, which
/// bounds the work at roughly `pattern.len() * text.len()` steps. A recursive matcher that
/// retries every split for every `%` takes exponential time on patterns with many
/// wildcards.
fn like_matches(pattern: &str, text: &str) -> bool {
    let pattern: Vec<char> = pattern.chars().collect();
    let text: Vec<char> = text.chars().collect();

    let mut p = 0usize;
    let mut t = 0usize;
    // (pattern index just after the latest '%', text index that '%' currently ends at).
    let mut backtrack: Option<(usize, usize)> = None;

    while t < text.len() {
        match pattern.get(p) {
            Some('%') => {
                // Start by letting '%' match nothing; widen it later only if needed.
                backtrack = Some((p + 1, t));
                p += 1;
            }
            Some(&ch) if ch == '_' || ch == text[t] => {
                p += 1;
                t += 1;
            }
            _ => match backtrack {
                // Mismatch: let the latest '%' swallow one more character and retry.
                Some((resume_p, resume_t)) => {
                    p = resume_p;
                    t = resume_t + 1;
                    backtrack = Some((resume_p, t));
                }
                None => return false,
            },
        }
    }

    // Text is exhausted; whatever is left of the pattern may only match the empty string.
    pattern[p..].iter().all(|&ch| ch == '%')
}
666
667    fn logical_and(&self, left: Option<bool>, right: Option<bool>) -> Option<bool> {
668        logical_and_values(left, right)
669    }
670
671    fn logical_or(&self, left: Option<bool>, right: Option<bool>) -> Option<bool> {
672        logical_or_values(left, right)
673    }
674
675    fn evaluate_in_candidates(
676        &self,
677        probe: Value,
678        candidates: impl IntoIterator<Item = Value>,
679        is_not: bool,
680        text_context: Option<TextComparisonContext>,
681    ) -> Option<bool> {
682        evaluate_in_candidates(probe, candidates, is_not, text_context)
683    }
684
685    fn execute_subquery(
686        &self,
687        ctx: &mut ExecutionContext<'_>,
688        subquery: &SelectStatement,
689        current_sources: Option<&[ResolvedSource]>,
690        current_row: Option<&[Value]>,
691    ) -> Result<QueryResult> {
692        let effective_subquery = if let (Some(sources), Some(row)) = (current_sources, current_row)
693        {
694            self.bind_correlated_subquery(ctx, subquery, sources, row)?
695        } else {
696            subquery.clone()
697        };
698        let planner = QueryPlanner::new(ctx.catalog.clone())
699            .with_table_row_counts(current_table_row_counts(ctx.engine));
700        let plan = planner.plan(Statement::Select(effective_subquery))?;
701        match plan.program {
702            ExecutionProgram::Select {
703                statement,
704                access_path,
705            } => {
706                let mut executor = SelectExecutor::new(statement, access_path);
707                executor.outer_scopes = self.outer_scopes.clone();
708                executor.materialized_ctes = self.materialized_ctes.clone();
709                if let (Some(sources), Some(row)) = (current_sources, current_row) {
710                    executor = executor.with_outer_scope(sources, row);
711                }
712                executor.execute(ctx)
713            }
714            _ => Err(HematiteError::InternalError(
715                "Expected SELECT execution program for subquery".to_string(),
716            )),
717        }
718    }
719
720    fn bind_correlated_subquery(
721        &self,
722        ctx: &ExecutionContext<'_>,
723        subquery: &SelectStatement,
724        current_sources: &[ResolvedSource],
725        current_row: &[Value],
726    ) -> Result<SelectStatement> {
727        let mut bound = subquery.clone();
728        let mut scopes = self.outer_scopes.clone();
729        scopes.push(CorrelatedScope {
730            sources: current_sources.to_vec(),
731            row: current_row.to_vec(),
732        });
733        self.bind_select_outer_references(ctx, &mut bound, &scopes)?;
734        Ok(bound)
735    }
736
737    fn bind_select_outer_references(
738        &self,
739        ctx: &ExecutionContext<'_>,
740        statement: &mut SelectStatement,
741        scopes: &[CorrelatedScope],
742    ) -> Result<()> {
743        let local_from = statement.from.clone();
744        for item in &mut statement.columns {
745            match item {
746                SelectItem::Expression(expr) => {
747                    self.bind_expression_outer_references(ctx, &local_from, expr, scopes)?
748                }
749                SelectItem::Window { window, .. } => {
750                    for expr in &mut window.partition_by {
751                        self.bind_expression_outer_references(ctx, &local_from, expr, scopes)?;
752                    }
753                }
754                SelectItem::Wildcard
755                | SelectItem::Column(_)
756                | SelectItem::CountAll
757                | SelectItem::Aggregate { .. } => {}
758            }
759        }
760
761        if let Some(where_clause) = &mut statement.where_clause {
762            for condition in &mut where_clause.conditions {
763                self.bind_condition_outer_references(ctx, &local_from, condition, scopes)?;
764            }
765        }
766
767        for expr in &mut statement.group_by {
768            self.bind_expression_outer_references(ctx, &local_from, expr, scopes)?;
769        }
770
771        if let Some(having_clause) = &mut statement.having_clause {
772            for condition in &mut having_clause.conditions {
773                self.bind_condition_outer_references(ctx, &local_from, condition, scopes)?;
774            }
775        }
776
777        for cte in &mut statement.with_clause {
778            self.bind_select_outer_references(ctx, &mut cte.query, scopes)?;
779        }
780
781        if let Some(set_operation) = &mut statement.set_operation {
782            self.bind_select_outer_references(ctx, &mut set_operation.right, scopes)?;
783        }
784
785        Ok(())
786    }
787
788    fn bind_condition_outer_references(
789        &self,
790        ctx: &ExecutionContext<'_>,
791        from: &TableReference,
792        condition: &mut Condition,
793        scopes: &[CorrelatedScope],
794    ) -> Result<()> {
795        match condition {
796            Condition::Comparison { left, right, .. } => {
797                self.bind_expression_outer_references(ctx, from, left, scopes)?;
798                self.bind_expression_outer_references(ctx, from, right, scopes)?;
799            }
800            Condition::InList { expr, values, .. } => {
801                self.bind_expression_outer_references(ctx, from, expr, scopes)?;
802                for value in values {
803                    self.bind_expression_outer_references(ctx, from, value, scopes)?;
804                }
805            }
806            Condition::InSubquery { expr, subquery, .. } => {
807                self.bind_expression_outer_references(ctx, from, expr, scopes)?;
808                self.bind_select_outer_references(ctx, subquery, scopes)?;
809            }
810            Condition::Between {
811                expr, lower, upper, ..
812            } => {
813                self.bind_expression_outer_references(ctx, from, expr, scopes)?;
814                self.bind_expression_outer_references(ctx, from, lower, scopes)?;
815                self.bind_expression_outer_references(ctx, from, upper, scopes)?;
816            }
817            Condition::Like { expr, pattern, .. } => {
818                self.bind_expression_outer_references(ctx, from, expr, scopes)?;
819                self.bind_expression_outer_references(ctx, from, pattern, scopes)?;
820            }
821            Condition::Exists { subquery, .. } => {
822                self.bind_select_outer_references(ctx, subquery, scopes)?;
823            }
824            Condition::NullCheck { expr, .. } => {
825                self.bind_expression_outer_references(ctx, from, expr, scopes)?;
826            }
827            Condition::Not(inner) => {
828                self.bind_condition_outer_references(ctx, from, inner, scopes)?;
829            }
830            Condition::Logical { left, right, .. } => {
831                self.bind_condition_outer_references(ctx, from, left, scopes)?;
832                self.bind_condition_outer_references(ctx, from, right, scopes)?;
833            }
834        }
835
836        Ok(())
837    }
838
839    fn bind_expression_outer_references(
840        &self,
841        ctx: &ExecutionContext<'_>,
842        from: &TableReference,
843        expr: &mut Expression,
844        scopes: &[CorrelatedScope],
845    ) -> Result<()> {
846        match expr {
847            Expression::Column(name) => {
848                let local_scope = SelectStatement {
849                    with_clause: Vec::new(),
850                    distinct: false,
851                    columns: Vec::new(),
852                    column_aliases: Vec::new(),
853                    from: from.clone(),
854                    where_clause: None,
855                    group_by: Vec::new(),
856                    having_clause: None,
857                    order_by: Vec::new(),
858                    limit: None,
859                    offset: None,
860                    set_operation: None,
861                };
862                if validate_column_reference(&local_scope, name, &ctx.catalog, from).is_ok() {
863                    return Ok(());
864                }
865                if let Some(value) = self.lookup_outer_scope_value(scopes, name)? {
866                    *expr = Expression::Literal(raise_literal_value(&value));
867                }
868            }
869            Expression::Case {
870                branches,
871                else_expr,
872            } => {
873                for branch in branches {
874                    self.bind_expression_outer_references(
875                        ctx,
876                        from,
877                        &mut branch.condition,
878                        scopes,
879                    )?;
880                    self.bind_expression_outer_references(ctx, from, &mut branch.result, scopes)?;
881                }
882                if let Some(else_expr) = else_expr {
883                    self.bind_expression_outer_references(ctx, from, else_expr, scopes)?;
884                }
885            }
886            Expression::ScalarSubquery(subquery) => {
887                self.bind_select_outer_references(ctx, subquery, scopes)?;
888            }
889            Expression::ScalarFunctionCall { args, .. } => {
890                for arg in args {
891                    self.bind_expression_outer_references(ctx, from, arg, scopes)?;
892                }
893            }
894            Expression::Cast { expr, .. } => {
895                self.bind_expression_outer_references(ctx, from, expr, scopes)?;
896            }
897            Expression::UnaryMinus(inner) => {
898                self.bind_expression_outer_references(ctx, from, inner, scopes)?;
899            }
900            Expression::UnaryNot(inner) => {
901                self.bind_expression_outer_references(ctx, from, inner, scopes)?;
902            }
903            Expression::Binary { left, right, .. } => {
904                self.bind_expression_outer_references(ctx, from, left, scopes)?;
905                self.bind_expression_outer_references(ctx, from, right, scopes)?;
906            }
907            Expression::Comparison { left, right, .. } => {
908                self.bind_expression_outer_references(ctx, from, left, scopes)?;
909                self.bind_expression_outer_references(ctx, from, right, scopes)?;
910            }
911            Expression::InList { expr, values, .. } => {
912                self.bind_expression_outer_references(ctx, from, expr, scopes)?;
913                for value in values {
914                    self.bind_expression_outer_references(ctx, from, value, scopes)?;
915                }
916            }
917            Expression::InSubquery { expr, subquery, .. } => {
918                self.bind_expression_outer_references(ctx, from, expr, scopes)?;
919                self.bind_select_outer_references(ctx, subquery, scopes)?;
920            }
921            Expression::Between {
922                expr, lower, upper, ..
923            } => {
924                self.bind_expression_outer_references(ctx, from, expr, scopes)?;
925                self.bind_expression_outer_references(ctx, from, lower, scopes)?;
926                self.bind_expression_outer_references(ctx, from, upper, scopes)?;
927            }
928            Expression::Like { expr, pattern, .. } => {
929                self.bind_expression_outer_references(ctx, from, expr, scopes)?;
930                self.bind_expression_outer_references(ctx, from, pattern, scopes)?;
931            }
932            Expression::Exists { subquery, .. } => {
933                self.bind_select_outer_references(ctx, subquery, scopes)?;
934            }
935            Expression::NullCheck { expr, .. } => {
936                self.bind_expression_outer_references(ctx, from, expr, scopes)?;
937            }
938            Expression::Logical { left, right, .. } => {
939                self.bind_expression_outer_references(ctx, from, left, scopes)?;
940                self.bind_expression_outer_references(ctx, from, right, scopes)?;
941            }
942            Expression::AggregateCall { .. }
943            | Expression::Literal(_)
944            | Expression::IntervalLiteral { .. }
945            | Expression::Parameter(_) => {}
946        }
947
948        Ok(())
949    }
950
951    fn lookup_outer_scope_value(
952        &self,
953        scopes: &[CorrelatedScope],
954        column_reference: &str,
955    ) -> Result<Option<Value>> {
956        for scope in scopes.iter().rev() {
957            if let Some(index) = self.resolve_column_index(&scope.sources, column_reference)? {
958                return Ok(scope.row.get(index).cloned());
959            }
960        }
961
962        Ok(None)
963    }
964
965    fn execute_subquery_cached(
966        &self,
967        ctx: &mut ExecutionContext<'_>,
968        cache: &mut SubqueryCache,
969        subquery: &SelectStatement,
970        current_sources: Option<&[ResolvedSource]>,
971        current_row: Option<&[Value]>,
972    ) -> Result<QueryResult> {
973        if current_sources.is_some() && current_row.is_some() {
974            return self.execute_subquery(ctx, subquery, current_sources, current_row);
975        }
976
977        let key = subquery as *const SelectStatement as usize;
978        if let Some(result) = cache.get(&key) {
979            return Ok(result.clone());
980        }
981
982        let result = self.execute_subquery(ctx, subquery, None, None)?;
983        cache.insert(key, result.clone());
984        Ok(result)
985    }
986
987    fn execute_scalar_subquery_cached(
988        &self,
989        ctx: &mut ExecutionContext<'_>,
990        cache: &mut SubqueryCache,
991        subquery: &SelectStatement,
992        current_sources: Option<&[ResolvedSource]>,
993        current_row: Option<&[Value]>,
994    ) -> Result<Value> {
995        let result =
996            self.execute_subquery_cached(ctx, cache, subquery, current_sources, current_row)?;
997        if result.rows.len() > 1 {
998            return Err(HematiteError::ParseError(
999                "Scalar subquery returned more than one row".to_string(),
1000            ));
1001        }
1002        Ok(result
1003            .rows
1004            .into_iter()
1005            .next()
1006            .and_then(|row| row.into_iter().next())
1007            .unwrap_or(Value::Null))
1008    }
1009
1010    fn evaluate_condition(
1011        &self,
1012        ctx: &mut ExecutionContext<'_>,
1013        cache: &mut SubqueryCache,
1014        sources: &[ResolvedSource],
1015        condition: &Condition,
1016        row: &[Value],
1017    ) -> Result<Option<bool>> {
1018        match condition {
1019            Condition::Comparison {
1020                left,
1021                operator,
1022                right,
1023            } => {
1024                let left_val = self.evaluate_expression(ctx, cache, sources, left, row)?;
1025                let right_val = self.evaluate_expression(ctx, cache, sources, right, row)?;
1026                let text_context = self.merged_text_comparison_context(sources, left, right)?;
1027                Ok(self.compare_values(&left_val, operator, &right_val, text_context))
1028            }
1029            Condition::InList {
1030                expr,
1031                values,
1032                is_not,
1033            } => {
1034                let probe = self.evaluate_expression(ctx, cache, sources, expr, row)?;
1035                let candidates = values
1036                    .iter()
1037                    .map(|value_expr| {
1038                        self.evaluate_expression(ctx, cache, sources, value_expr, row)
1039                    })
1040                    .collect::<Result<Vec<_>>>()?;
1041                let text_context = self.text_comparison_context_for_expression(sources, expr)?;
1042                Ok(self.evaluate_in_candidates(probe, candidates, *is_not, text_context))
1043            }
1044            Condition::InSubquery {
1045                expr,
1046                subquery,
1047                is_not,
1048            } => {
1049                let probe = self.evaluate_expression(ctx, cache, sources, expr, row)?;
1050                let subquery_result =
1051                    self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
1052                let candidates = subquery_result
1053                    .rows
1054                    .into_iter()
1055                    .map(|row| row.into_iter().next().unwrap_or(Value::Null))
1056                    .collect::<Vec<_>>();
1057                let text_context = self.text_comparison_context_for_expression(sources, expr)?;
1058                Ok(self.evaluate_in_candidates(probe, candidates, *is_not, text_context))
1059            }
1060            Condition::Between {
1061                expr,
1062                lower,
1063                upper,
1064                is_not,
1065            } => {
1066                let value = self.evaluate_expression(ctx, cache, sources, expr, row)?;
1067                let lower_value = self.evaluate_expression(ctx, cache, sources, lower, row)?;
1068                let upper_value = self.evaluate_expression(ctx, cache, sources, upper, row)?;
1069
1070                if value.is_null() || lower_value.is_null() || upper_value.is_null() {
1071                    return Ok(None);
1072                }
1073
1074                let text_context = self.text_comparison_context_for_expression(sources, expr)?;
1075                let lower_ok = sql_partial_cmp(&value, &lower_value, text_context)
1076                    .map(|ordering| !ordering.is_lt());
1077                let upper_ok = sql_partial_cmp(&value, &upper_value, text_context)
1078                    .map(|ordering| !ordering.is_gt());
1079
1080                match (lower_ok, upper_ok) {
1081                    (Some(true), Some(true)) => Ok(Some(!is_not)),
1082                    (Some(_), Some(_)) => Ok(Some(*is_not)),
1083                    _ => Ok(None),
1084                }
1085            }
1086            Condition::Like {
1087                expr,
1088                pattern,
1089                is_not,
1090            } => {
1091                let value = self.evaluate_expression(ctx, cache, sources, expr, row)?;
1092                let pattern_value = self.evaluate_expression(ctx, cache, sources, pattern, row)?;
1093                let text_context = self.text_comparison_context_for_expression(sources, expr)?;
1094
1095                match (value, pattern_value) {
1096                    (Value::Text(text), Value::Text(pattern)) => {
1097                        let matched = like_matches_with_context(&pattern, &text, text_context);
1098                        Ok(Some(if *is_not { !matched } else { matched }))
1099                    }
1100                    (left, right) if left.is_null() || right.is_null() => Ok(None),
1101                    _ => Ok(None),
1102                }
1103            }
1104            Condition::Exists { subquery, is_not } => {
1105                let subquery_result =
1106                    self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
1107                let exists = !subquery_result.rows.is_empty();
1108                Ok(Some(if *is_not { !exists } else { exists }))
1109            }
1110            Condition::NullCheck { expr, is_not } => {
1111                let value = self.evaluate_expression(ctx, cache, sources, expr, row)?;
1112                let is_null = value.is_null();
1113                Ok(Some(if *is_not { !is_null } else { is_null }))
1114            }
1115            Condition::Not(condition) => Ok(self
1116                .evaluate_condition(ctx, cache, sources, condition, row)?
1117                .map(|value| !value)),
1118            Condition::Logical {
1119                left,
1120                operator,
1121                right,
1122            } => {
1123                let left_result = self.evaluate_condition(ctx, cache, sources, left, row)?;
1124                let right_result = self.evaluate_condition(ctx, cache, sources, right, row)?;
1125
1126                match operator {
1127                    LogicalOperator::And => Ok(self.logical_and(left_result, right_result)),
1128                    LogicalOperator::Or => Ok(self.logical_or(left_result, right_result)),
1129                }
1130            }
1131        }
1132    }
1133
1134    fn project_row(
1135        &self,
1136        ctx: &mut ExecutionContext<'_>,
1137        cache: &mut SubqueryCache,
1138        sources: &[ResolvedSource],
1139        row: &[Value],
1140    ) -> Result<Vec<Value>> {
1141        let mut projected = Vec::new();
1142
1143        for item in &self.statement.columns {
1144            match item {
1145                SelectItem::Wildcard => projected.extend(row.iter().cloned()),
1146                SelectItem::Column(name) => {
1147                    if let Some(index) = self.resolve_column_index(sources, name)? {
1148                        if index < row.len() {
1149                            projected.push(row[index].clone());
1150                        }
1151                    }
1152                }
1153                SelectItem::Expression(expr) => {
1154                    projected.push(self.evaluate_expression(ctx, cache, sources, expr, row)?);
1155                }
1156                SelectItem::CountAll => {}
1157                SelectItem::Aggregate { .. } => {}
1158                SelectItem::Window { .. } => {}
1159            }
1160        }
1161
1162        Ok(projected)
1163    }
1164
1165    fn get_column_names(&self, sources: &[ResolvedSource]) -> Vec<String> {
1166        let mut columns = Vec::new();
1167
1168        for (index, item) in self.statement.columns.iter().enumerate() {
1169            match item {
1170                SelectItem::Wildcard => {
1171                    for source in sources {
1172                        for column in &source.columns {
1173                            columns.push(column.clone());
1174                        }
1175                    }
1176                }
1177                _ => {
1178                    if let Some(name) = self.statement.output_name(index) {
1179                        columns.push(name);
1180                    }
1181                }
1182            }
1183        }
1184
1185        columns
1186    }
1187
1188    fn shifted_sources(
1189        &self,
1190        mut sources: Vec<ResolvedSource>,
1191        offset: usize,
1192    ) -> Vec<ResolvedSource> {
1193        for source in &mut sources {
1194            source.offset += offset;
1195        }
1196        sources
1197    }
1198
1199    fn total_source_width(&self, sources: &[ResolvedSource]) -> usize {
1200        sources.iter().map(ResolvedSource::width).sum()
1201    }
1202
1203    fn combine_join_rows(&self, left_row: &[Value], right_row: &[Value]) -> Vec<Value> {
1204        let mut combined = left_row.to_vec();
1205        combined.extend(right_row.iter().cloned());
1206        combined
1207    }
1208
1209    fn combine_left_row_with_nulls(&self, left_row: &[Value], right_width: usize) -> Vec<Value> {
1210        let mut combined = left_row.to_vec();
1211        combined.extend(std::iter::repeat_n(Value::Null, right_width));
1212        combined
1213    }
1214
1215    fn combine_nulls_with_right_row(&self, left_width: usize, right_row: &[Value]) -> Vec<Value> {
1216        let mut combined = Vec::with_capacity(left_width + right_row.len());
1217        combined.extend(std::iter::repeat_n(Value::Null, left_width));
1218        combined.extend(right_row.iter().cloned());
1219        combined
1220    }
1221
1222    fn join_outer_is_left(&self, left_rows: &[Vec<Value>], right_rows: &[Vec<Value>]) -> bool {
1223        left_rows.len() <= right_rows.len()
1224    }
1225
1226    fn materialize_join_sources(
1227        &mut self,
1228        ctx: &mut ExecutionContext,
1229        left: &TableReference,
1230        right: &TableReference,
1231    ) -> Result<(
1232        Vec<ResolvedSource>,
1233        Vec<Vec<Value>>,
1234        usize,
1235        Vec<Vec<Value>>,
1236        usize,
1237    )> {
1238        let (left_sources, left_rows) = self.materialize_reference(ctx, left)?;
1239        let left_width = self.total_source_width(&left_sources);
1240        let (right_sources, right_rows) = self.materialize_reference(ctx, right)?;
1241        let right_width = self.total_source_width(&right_sources);
1242        let mut sources = left_sources;
1243        sources.extend(self.shifted_sources(right_sources, left_width));
1244        Ok((sources, left_rows, left_width, right_rows, right_width))
1245    }
1246
1247    fn push_matching_join_rows(
1248        &self,
1249        ctx: &mut ExecutionContext,
1250        sources: &[ResolvedSource],
1251        left_rows: &[Vec<Value>],
1252        right_rows: &[Vec<Value>],
1253        on: Option<&Condition>,
1254        rows: &mut Vec<Vec<Value>>,
1255    ) -> Result<()> {
1256        let push_matches = |outer_rows: &[Vec<Value>],
1257                            inner_rows: &[Vec<Value>],
1258                            outer_is_left: bool,
1259                            rows: &mut Vec<Vec<Value>>| {
1260            for outer_row in outer_rows {
1261                for inner_row in inner_rows {
1262                    rows.push(if outer_is_left {
1263                        self.combine_join_rows(outer_row, inner_row)
1264                    } else {
1265                        self.combine_join_rows(inner_row, outer_row)
1266                    });
1267                }
1268            }
1269        };
1270
1271        if on.is_none() {
1272            if self.join_outer_is_left(left_rows, right_rows) {
1273                push_matches(left_rows, right_rows, true, rows);
1274            } else {
1275                push_matches(right_rows, left_rows, false, rows);
1276            }
1277            return Ok(());
1278        }
1279
1280        let predicate = on.expect("checked above");
1281        if self.join_outer_is_left(left_rows, right_rows) {
1282            self.push_join_condition_matches(
1283                ctx, sources, left_rows, right_rows, true, predicate, rows,
1284            )
1285        } else {
1286            self.push_join_condition_matches(
1287                ctx, sources, right_rows, left_rows, false, predicate, rows,
1288            )
1289        }
1290    }
1291
1292    fn push_join_condition_matches(
1293        &self,
1294        ctx: &mut ExecutionContext,
1295        sources: &[ResolvedSource],
1296        outer_rows: &[Vec<Value>],
1297        inner_rows: &[Vec<Value>],
1298        outer_is_left: bool,
1299        predicate: &Condition,
1300        rows: &mut Vec<Vec<Value>>,
1301    ) -> Result<()> {
1302        let mut subquery_cache = SubqueryCache::new();
1303        for outer_row in outer_rows {
1304            for inner_row in inner_rows {
1305                let combined = if outer_is_left {
1306                    self.combine_join_rows(outer_row, inner_row)
1307                } else {
1308                    self.combine_join_rows(inner_row, outer_row)
1309                };
1310                if self.evaluate_condition(
1311                    ctx,
1312                    &mut subquery_cache,
1313                    sources,
1314                    predicate,
1315                    &combined,
1316                )? == Some(true)
1317                {
1318                    rows.push(combined);
1319                }
1320            }
1321        }
1322        Ok(())
1323    }
1324
1325    fn resolve_named_source(
1326        &self,
1327        ctx: &ExecutionContext,
1328        table_name: &str,
1329        alias: Option<String>,
1330        offset: usize,
1331    ) -> Result<ResolvedSource> {
1332        Ok(self.named_source(ctx, table_name, alias, offset)?.source)
1333    }
1334
1335    fn named_source(
1336        &self,
1337        ctx: &ExecutionContext,
1338        table_name: &str,
1339        alias: Option<String>,
1340        offset: usize,
1341    ) -> Result<NamedSource> {
1342        if let Some(result) = self.materialized_ctes.get(&Self::cte_key(table_name)) {
1343            return Ok(NamedSource {
1344                source: ResolvedSource {
1345                    name: table_name.to_string(),
1346                    columns: result.columns.clone(),
1347                    column_types: vec![DataType::Text; result.columns.len()],
1348                    column_collations: vec![None; result.columns.len()],
1349                    alias,
1350                    offset,
1351                },
1352                kind: NamedSourceKind::MaterializedCte(result.rows.clone()),
1353            });
1354        }
1355
1356        if let Some(cte) = self.statement.lookup_cte(table_name) {
1357            let columns = self.query_output_columns(&cte.query, ctx)?;
1358            return Ok(NamedSource {
1359                source: ResolvedSource {
1360                    name: table_name.to_string(),
1361                    column_types: vec![DataType::Text; columns.len()],
1362                    column_collations: vec![None; columns.len()],
1363                    columns,
1364                    alias,
1365                    offset,
1366                },
1367                kind: NamedSourceKind::Cte(cte.clone()),
1368            });
1369        }
1370
1371        let table = ctx
1372            .catalog
1373            .get_table_by_name(table_name)
1374            .ok_or_else(|| table_not_found_parse_error(table_name))?;
1375        Ok(NamedSource {
1376            source: ResolvedSource {
1377                name: table.name.clone(),
1378                columns: table
1379                    .columns
1380                    .iter()
1381                    .map(|column| column.name.clone())
1382                    .collect(),
1383                column_types: table
1384                    .columns
1385                    .iter()
1386                    .map(|column| column.data_type.clone())
1387                    .collect(),
1388                column_collations: table
1389                    .columns
1390                    .iter()
1391                    .map(|column| column.collation.clone())
1392                    .collect(),
1393                alias,
1394                offset,
1395            },
1396            kind: NamedSourceKind::BaseTable,
1397        })
1398    }
1399
1400    fn materialize_named_source(
1401        &mut self,
1402        ctx: &mut ExecutionContext,
1403        table_name: &str,
1404        alias: Option<String>,
1405    ) -> Result<(ResolvedSource, Vec<Vec<Value>>)> {
1406        let named_source = self.named_source(ctx, table_name, alias, 0)?;
1407        let rows = match named_source.kind {
1408            NamedSourceKind::BaseTable => ctx.engine.read_from_table(table_name)?,
1409            NamedSourceKind::MaterializedCte(rows) => rows,
1410            NamedSourceKind::Cte(cte) => {
1411                let key = Self::cte_key(table_name);
1412                if let Some(result) = self.materialized_ctes.get(&key) {
1413                    result.rows.clone()
1414                } else {
1415                    self.materialize_cte(ctx, &cte)?.rows
1416                }
1417            }
1418        };
1419        Ok((named_source.source, rows))
1420    }
1421
1422    fn materialize_cte(
1423        &mut self,
1424        ctx: &mut ExecutionContext<'_>,
1425        cte: &CommonTableExpression,
1426    ) -> Result<QueryResult> {
1427        let key = Self::cte_key(&cte.name);
1428        if let Some(result) = self.materialized_ctes.get(&key) {
1429            return Ok(result.clone());
1430        }
1431
1432        let result = if cte.recursive {
1433            self.execute_recursive_cte(ctx, cte)?
1434        } else {
1435            self.execute_subquery(ctx, &cte.query, None, None)?
1436        };
1437        self.materialized_ctes.insert(key, result.clone());
1438        Ok(result)
1439    }
1440
1441    fn execute_recursive_cte(
1442        &mut self,
1443        ctx: &mut ExecutionContext<'_>,
1444        cte: &CommonTableExpression,
1445    ) -> Result<QueryResult> {
1446        const MAX_RECURSIVE_CTE_ITERATIONS: usize = 1024;
1447
1448        let mut anchor = (*cte.query).clone();
1449        let set_operation = anchor.set_operation.take().ok_or_else(|| {
1450            HematiteError::ParseError(format!(
1451                "Recursive CTE '{}' requires UNION or UNION ALL",
1452                cte.name
1453            ))
1454        })?;
1455
1456        let operator = set_operation.operator;
1457        let mut recursive_term = *set_operation.right;
1458        recursive_term.with_clause.push(CommonTableExpression {
1459            name: cte.name.clone(),
1460            recursive: false,
1461            query: Box::new(anchor.clone()),
1462        });
1463        let anchor_result = self.execute_subquery(ctx, &anchor, None, None)?;
1464        let columns = anchor_result.columns.clone();
1465        let mut rows = match operator {
1466            SetOperator::Union => deduplicate_rows(anchor_result.rows),
1467            SetOperator::UnionAll => anchor_result.rows,
1468            _ => {
1469                return Err(HematiteError::ParseError(format!(
1470                    "Recursive CTE '{}' requires UNION or UNION ALL",
1471                    cte.name
1472                )))
1473            }
1474        };
1475        let mut delta = rows.clone();
1476
1477        let key = Self::cte_key(&cte.name);
1478        let mut converged = false;
1479        for _ in 0..MAX_RECURSIVE_CTE_ITERATIONS {
1480            self.materialized_ctes.insert(
1481                key.clone(),
1482                QueryResult {
1483                    affected_rows: delta.len(),
1484                    columns: columns.clone(),
1485                    rows: delta.clone(),
1486                },
1487            );
1488
1489            let mut recursive_executor =
1490                SelectExecutor::new(recursive_term.clone(), SelectAccessPath::JoinScan);
1491            recursive_executor.outer_scopes = self.outer_scopes.clone();
1492            recursive_executor.materialized_ctes = self.materialized_ctes.clone();
1493            let next_rows = recursive_executor.execute_body(ctx)?.rows;
1494            if next_rows.is_empty() {
1495                converged = true;
1496                break;
1497            }
1498
1499            delta = match operator {
1500                SetOperator::Union => {
1501                    let mut unique_rows = Vec::new();
1502                    for row in next_rows {
1503                        if !rows.contains(&row) && !unique_rows.contains(&row) {
1504                            unique_rows.push(row);
1505                        }
1506                    }
1507                    unique_rows
1508                }
1509                SetOperator::UnionAll => next_rows,
1510                _ => Vec::new(),
1511            };
1512
1513            if delta.is_empty() {
1514                converged = true;
1515                break;
1516            }
1517            rows.extend(delta.clone());
1518        }
1519
1520        self.materialized_ctes.insert(
1521            key,
1522            QueryResult {
1523                affected_rows: rows.len(),
1524                columns: columns.clone(),
1525                rows: rows.clone(),
1526            },
1527        );
1528
1529        if !converged {
1530            return Err(HematiteError::ParseError(format!(
1531                "Recursive CTE '{}' exceeded the maximum recursion depth of {}",
1532                cte.name, MAX_RECURSIVE_CTE_ITERATIONS
1533            )));
1534        }
1535
1536        Ok(QueryResult {
1537            affected_rows: rows.len(),
1538            columns,
1539            rows,
1540        })
1541    }
1542
1543    fn materialize_reference(
1544        &mut self,
1545        ctx: &mut ExecutionContext,
1546        from: &TableReference,
1547    ) -> Result<(Vec<ResolvedSource>, Vec<Vec<Value>>)> {
1548        match from {
1549            TableReference::Table(table_name, alias) => self
1550                .materialize_named_source(ctx, table_name, alias.clone())
1551                .map(|(source, rows)| (vec![source], rows)),
1552            TableReference::Derived { subquery, alias } => {
1553                let result = self.execute_subquery(ctx, subquery, None, None)?;
1554                Ok((
1555                    vec![ResolvedSource {
1556                        name: alias.clone(),
1557                        columns: result.columns.clone(),
1558                        column_types: vec![DataType::Text; result.columns.len()],
1559                        column_collations: vec![None; result.columns.len()],
1560                        alias: None,
1561                        offset: 0,
1562                    }],
1563                    result.rows,
1564                ))
1565            }
1566            TableReference::CrossJoin(left, right) => {
1567                let (sources, left_rows, _, right_rows, _) =
1568                    self.materialize_join_sources(ctx, left, right)?;
1569                let mut rows = Vec::new();
1570                self.push_matching_join_rows(
1571                    ctx,
1572                    &sources,
1573                    &left_rows,
1574                    &right_rows,
1575                    None,
1576                    &mut rows,
1577                )?;
1578                Ok((sources, rows))
1579            }
1580            TableReference::InnerJoin { left, right, on } => {
1581                let (sources, left_rows, _, right_rows, _) =
1582                    self.materialize_join_sources(ctx, left, right)?;
1583                let mut rows = Vec::new();
1584                self.push_matching_join_rows(
1585                    ctx,
1586                    &sources,
1587                    &left_rows,
1588                    &right_rows,
1589                    Some(on),
1590                    &mut rows,
1591                )?;
1592                Ok((sources, rows))
1593            }
1594            TableReference::LeftJoin { left, right, on } => {
1595                let (sources, left_rows, _, right_rows, right_width) =
1596                    self.materialize_join_sources(ctx, left, right)?;
1597
1598                let mut rows = Vec::new();
1599                let mut subquery_cache = SubqueryCache::new();
1600                for left_row in &left_rows {
1601                    let mut matched = false;
1602                    for right_row in &right_rows {
1603                        let combined = self.combine_join_rows(left_row, right_row);
1604                        if self.evaluate_condition(
1605                            ctx,
1606                            &mut subquery_cache,
1607                            &sources,
1608                            on,
1609                            &combined,
1610                        )? == Some(true)
1611                        {
1612                            rows.push(combined);
1613                            matched = true;
1614                        }
1615                    }
1616
1617                    if !matched {
1618                        rows.push(self.combine_left_row_with_nulls(left_row, right_width));
1619                    }
1620                }
1621
1622                Ok((sources, rows))
1623            }
1624            TableReference::RightJoin { left, right, on } => {
1625                let (sources, left_rows, left_width, right_rows, _) =
1626                    self.materialize_join_sources(ctx, left, right)?;
1627
1628                let mut rows = Vec::new();
1629                let mut subquery_cache = SubqueryCache::new();
1630                for right_row in &right_rows {
1631                    let mut matched = false;
1632                    for left_row in &left_rows {
1633                        let combined = self.combine_join_rows(left_row, right_row);
1634                        if self.evaluate_condition(
1635                            ctx,
1636                            &mut subquery_cache,
1637                            &sources,
1638                            on,
1639                            &combined,
1640                        )? == Some(true)
1641                        {
1642                            rows.push(combined);
1643                            matched = true;
1644                        }
1645                    }
1646
1647                    if !matched {
1648                        rows.push(self.combine_nulls_with_right_row(left_width, right_row));
1649                    }
1650                }
1651
1652                Ok((sources, rows))
1653            }
1654            TableReference::FullOuterJoin { left, right, on } => {
1655                let (sources, left_rows, left_width, right_rows, right_width) =
1656                    self.materialize_join_sources(ctx, left, right)?;
1657
1658                let mut rows = Vec::new();
1659                let mut matched_right = vec![false; right_rows.len()];
1660                let mut subquery_cache = SubqueryCache::new();
1661
1662                for left_row in &left_rows {
1663                    let mut matched = false;
1664                    for (index, right_row) in right_rows.iter().enumerate() {
1665                        let combined = self.combine_join_rows(left_row, right_row);
1666                        if self.evaluate_condition(
1667                            ctx,
1668                            &mut subquery_cache,
1669                            &sources,
1670                            on,
1671                            &combined,
1672                        )? == Some(true)
1673                        {
1674                            rows.push(combined);
1675                            matched = true;
1676                            matched_right[index] = true;
1677                        }
1678                    }
1679
1680                    if !matched {
1681                        rows.push(self.combine_left_row_with_nulls(left_row, right_width));
1682                    }
1683                }
1684
1685                for (index, right_row) in right_rows.iter().enumerate() {
1686                    if !matched_right[index] {
1687                        rows.push(self.combine_nulls_with_right_row(left_width, right_row));
1688                    }
1689                }
1690
1691                Ok((sources, rows))
1692            }
1693        }
1694    }
1695
1696    fn execute_body(&mut self, ctx: &mut ExecutionContext) -> Result<QueryResult> {
1697        if let Some(set_operation) = &self.statement.set_operation {
1698            let mut subquery_cache = SubqueryCache::new();
1699            let mut left_statement = self.statement.clone();
1700            left_statement.set_operation = None;
1701            let mut left_executor = SelectExecutor::new(left_statement, self.access_path.clone());
1702            left_executor.outer_scopes = self.outer_scopes.clone();
1703            left_executor.materialized_ctes = self.materialized_ctes.clone();
1704            let mut left_result = left_executor.execute_body(ctx)?;
1705            let right_result = self.execute_subquery_cached(
1706                ctx,
1707                &mut subquery_cache,
1708                &set_operation.right,
1709                None,
1710                None,
1711            )?;
1712
1713            left_result.rows =
1714                apply_set_operation(set_operation.operator, left_result.rows, right_result.rows);
1715            left_result.affected_rows = left_result.rows.len();
1716            return Ok(left_result);
1717        }
1718
1719        let (sources, mut filtered_rows) = self.materialize_filtered_rows(ctx)?;
1720        let mut subquery_cache = SubqueryCache::new();
1721
1722        if !self.statement.order_by.is_empty() {
1723            filtered_rows.sort_by(|left, right| {
1724                for item in &self.statement.order_by {
1725                    let Ok(Some(index)) = self.resolve_column_index(&sources, &item.column) else {
1726                        continue;
1727                    };
1728
1729                    let text_context = self
1730                        .text_comparison_context_for_expression(
1731                            &sources,
1732                            &Expression::Column(item.column.clone()),
1733                        )
1734                        .ok()
1735                        .flatten();
1736                    let ordering =
1737                        self.compare_sort_values(&left[index], &right[index], text_context);
1738                    if ordering != Ordering::Equal {
1739                        return match item.direction {
1740                            SortDirection::Asc => ordering,
1741                            SortDirection::Desc => ordering.reverse(),
1742                        };
1743                    }
1744                }
1745
1746                Ordering::Equal
1747            });
1748        }
1749
1750        if !self.statement.group_by.is_empty() || self.has_aggregate_projection() {
1751            return self.execute_grouped(ctx, &mut subquery_cache, &sources, &filtered_rows);
1752        }
1753
1754        if self.has_window_projection() {
1755            let mut projected_rows =
1756                self.project_rows_with_windows(ctx, &mut subquery_cache, &sources, &filtered_rows)?;
1757            apply_distinct_if_needed(self.statement.distinct, &mut projected_rows);
1758            self.apply_select_window(&mut projected_rows);
1759            return Ok(self.build_query_result(self.get_column_names(&sources), projected_rows));
1760        }
1761
1762        let mut projected_rows = Vec::new();
1763        for row in filtered_rows {
1764            projected_rows.push(self.project_row(ctx, &mut subquery_cache, &sources, &row)?);
1765        }
1766
1767        apply_distinct_if_needed(self.statement.distinct, &mut projected_rows);
1768        self.apply_select_window(&mut projected_rows);
1769        Ok(self.build_query_result(self.get_column_names(&sources), projected_rows))
1770    }
1771
1772    fn materialize_filtered_rows(
1773        &mut self,
1774        ctx: &mut ExecutionContext<'_>,
1775    ) -> Result<(Vec<ResolvedSource>, Vec<Vec<Value>>)> {
1776        let direct_table = match &self.statement.from {
1777            TableReference::Table(table_name, _)
1778                if self.statement.lookup_cte(table_name).is_none() =>
1779            {
1780                ctx.catalog.get_table_by_name(table_name).cloned()
1781            }
1782            _ => None,
1783        };
1784
1785        let from = self.statement.from.clone();
1786        let (sources, all_rows) = if self.uses_materialized_reference() {
1787            self.materialize_reference(ctx, &from)?
1788        } else if let (TableReference::Table(table_name, _), Some(table)) =
1789            (&from, direct_table.as_ref())
1790        {
1791            let sources = self.resolve_sources(ctx)?;
1792            let rows = self.materialize_table_access_rows(ctx, table_name, table)?;
1793            (sources, rows)
1794        } else {
1795            return Err(HematiteError::InternalError(
1796                "Planner selected a direct table access path for a non-table source".to_string(),
1797            ));
1798        };
1799
1800        let mut subquery_cache = SubqueryCache::new();
1801        let filtered_rows =
1802            self.filter_source_rows(ctx, &mut subquery_cache, &sources, all_rows)?;
1803        Ok((sources, filtered_rows))
1804    }
1805
1806    fn compare_sort_values(
1807        &self,
1808        left: &Value,
1809        right: &Value,
1810        text_context: Option<TextComparisonContext>,
1811    ) -> Ordering {
1812        match (left.is_null(), right.is_null()) {
1813            (true, true) => Ordering::Equal,
1814            (true, false) => Ordering::Less,
1815            (false, true) => Ordering::Greater,
1816            (false, false) => sql_partial_cmp(left, right, text_context).unwrap_or(Ordering::Equal),
1817        }
1818    }
1819
1820    fn has_aggregate_projection(&self) -> bool {
1821        self.statement
1822            .columns
1823            .iter()
1824            .any(|item| matches!(item, SelectItem::CountAll | SelectItem::Aggregate { .. }))
1825    }
1826
1827    fn has_window_projection(&self) -> bool {
1828        self.statement
1829            .columns
1830            .iter()
1831            .any(|item| matches!(item, SelectItem::Window { .. }))
1832    }
1833
1834    fn project_rows_with_windows(
1835        &self,
1836        ctx: &mut ExecutionContext<'_>,
1837        cache: &mut SubqueryCache,
1838        sources: &[ResolvedSource],
1839        filtered_rows: &[Vec<Value>],
1840    ) -> Result<Vec<Vec<Value>>> {
1841        let mut projected_rows = Vec::with_capacity(filtered_rows.len());
1842
1843        for (row_index, row) in filtered_rows.iter().enumerate() {
1844            let mut projected = Vec::new();
1845
1846            for item in &self.statement.columns {
1847                match item {
1848                    SelectItem::Wildcard => projected.extend(row.iter().cloned()),
1849                    SelectItem::Column(name) => {
1850                        if let Some(index) = self.resolve_column_index(sources, name)? {
1851                            if index < row.len() {
1852                                projected.push(row[index].clone());
1853                            }
1854                        }
1855                    }
1856                    SelectItem::Expression(expr) => {
1857                        projected.push(self.evaluate_expression(ctx, cache, sources, expr, row)?);
1858                    }
1859                    SelectItem::Window { function, window } => {
1860                        projected.push(self.evaluate_window_item(
1861                            ctx,
1862                            cache,
1863                            sources,
1864                            filtered_rows,
1865                            row_index,
1866                            function,
1867                            window,
1868                        )?)
1869                    }
1870                    SelectItem::CountAll | SelectItem::Aggregate { .. } => {}
1871                }
1872            }
1873
1874            projected_rows.push(projected);
1875        }
1876
1877        Ok(projected_rows)
1878    }
1879
1880    fn evaluate_window_item(
1881        &self,
1882        ctx: &mut ExecutionContext<'_>,
1883        cache: &mut SubqueryCache,
1884        sources: &[ResolvedSource],
1885        filtered_rows: &[Vec<Value>],
1886        row_index: usize,
1887        function: &WindowFunction,
1888        window: &WindowSpec,
1889    ) -> Result<Value> {
1890        let partition_key = window
1891            .partition_by
1892            .iter()
1893            .map(|expr| {
1894                self.evaluate_expression(ctx, cache, sources, expr, &filtered_rows[row_index])
1895            })
1896            .collect::<Result<Vec<_>>>()?;
1897
1898        let mut partition_indexes = Vec::new();
1899        for (index, row) in filtered_rows.iter().enumerate() {
1900            let row_key = window
1901                .partition_by
1902                .iter()
1903                .map(|expr| self.evaluate_expression(ctx, cache, sources, expr, row))
1904                .collect::<Result<Vec<_>>>()?;
1905            if row_key == partition_key {
1906                partition_indexes.push(index);
1907            }
1908        }
1909
1910        if !window.order_by.is_empty() {
1911            partition_indexes.sort_by(|left_index, right_index| {
1912                let left = &filtered_rows[*left_index];
1913                let right = &filtered_rows[*right_index];
1914
1915                for item in &window.order_by {
1916                    let Ok(Some(column_index)) = self.resolve_column_index(sources, &item.column)
1917                    else {
1918                        continue;
1919                    };
1920
1921                    let ordering = self.compare_sort_values(
1922                        &left[column_index],
1923                        &right[column_index],
1924                        self.text_comparison_context_for_expression(
1925                            sources,
1926                            &Expression::Column(item.column.clone()),
1927                        )
1928                        .ok()
1929                        .flatten(),
1930                    );
1931                    if ordering != Ordering::Equal {
1932                        return match item.direction {
1933                            SortDirection::Asc => ordering,
1934                            SortDirection::Desc => ordering.reverse(),
1935                        };
1936                    }
1937                }
1938
1939                left_index.cmp(right_index)
1940            });
1941        }
1942
1943        let position = partition_indexes
1944            .iter()
1945            .position(|index| *index == row_index)
1946            .ok_or_else(|| {
1947                HematiteError::InternalError(
1948                    "Current row not found in window partition".to_string(),
1949                )
1950            })?;
1951
1952        match function {
1953            WindowFunction::RowNumber => Ok(Value::Integer((position + 1) as i32)),
1954            WindowFunction::Rank => {
1955                let mut rank = 1usize;
1956                for current in 1..=position {
1957                    if self.window_sort_key_changed(
1958                        sources,
1959                        window,
1960                        &filtered_rows[partition_indexes[current - 1]],
1961                        &filtered_rows[partition_indexes[current]],
1962                    )? {
1963                        rank = current + 1;
1964                    }
1965                }
1966                Ok(Value::Integer(rank as i32))
1967            }
1968            WindowFunction::DenseRank => {
1969                let mut rank = 1usize;
1970                for current in 1..=position {
1971                    if self.window_sort_key_changed(
1972                        sources,
1973                        window,
1974                        &filtered_rows[partition_indexes[current - 1]],
1975                        &filtered_rows[partition_indexes[current]],
1976                    )? {
1977                        rank += 1;
1978                    }
1979                }
1980                Ok(Value::Integer(rank as i32))
1981            }
1982            WindowFunction::Aggregate { function, target } => {
1983                let partition_rows = partition_indexes
1984                    .iter()
1985                    .map(|index| filtered_rows[*index].clone())
1986                    .collect::<Vec<_>>();
1987                Ok(self
1988                    .evaluate_aggregate_value(sources, *function, target, &partition_rows)?
1989                    .unwrap_or(Value::Null))
1990            }
1991        }
1992    }
1993
1994    fn window_sort_key_changed(
1995        &self,
1996        sources: &[ResolvedSource],
1997        window: &WindowSpec,
1998        left: &[Value],
1999        right: &[Value],
2000    ) -> Result<bool> {
2001        if window.order_by.is_empty() {
2002            return Ok(false);
2003        }
2004
2005        for item in &window.order_by {
2006            let index = self
2007                .resolve_column_index(sources, &item.column)?
2008                .ok_or_else(|| {
2009                    HematiteError::ParseError(format!("Column '{}' not found", item.column))
2010                })?;
2011
2012            if self.compare_sort_values(
2013                &left[index],
2014                &right[index],
2015                self.text_comparison_context_for_expression(
2016                    sources,
2017                    &Expression::Column(item.column.clone()),
2018                )
2019                .ok()
2020                .flatten(),
2021            ) != Ordering::Equal
2022            {
2023                return Ok(true);
2024            }
2025        }
2026
2027        Ok(false)
2028    }
2029
2030    fn apply_select_window(&self, rows: &mut Vec<Vec<Value>>) {
2031        if let Some(offset) = self.statement.offset {
2032            if offset >= rows.len() {
2033                rows.clear();
2034                return;
2035            }
2036            rows.drain(0..offset);
2037        }
2038
2039        if let Some(limit) = self.statement.limit {
2040            rows.truncate(limit);
2041        }
2042    }
2043
2044    fn build_query_result(&self, columns: Vec<String>, rows: Vec<Vec<Value>>) -> QueryResult {
2045        QueryResult {
2046            affected_rows: rows.len(),
2047            columns,
2048            rows,
2049        }
2050    }
2051
2052    fn evaluate_aggregate_value(
2053        &self,
2054        sources: &[ResolvedSource],
2055        function: AggregateFunction,
2056        target: &AggregateTarget,
2057        rows: &[Vec<Value>],
2058    ) -> Result<Option<Value>> {
2059        if matches!(target, AggregateTarget::All) {
2060            return match function {
2061                AggregateFunction::Count => Ok(Some(Value::Integer(rows.len() as i32))),
2062                _ => Err(HematiteError::ParseError(format!(
2063                    "{:?}(*) is not supported",
2064                    function
2065                ))),
2066            };
2067        }
2068
2069        let AggregateTarget::Column(column) = target else {
2070            return Ok(None);
2071        };
2072
2073        let index = self
2074            .resolve_column_index(sources, column)?
2075            .ok_or_else(|| HematiteError::ParseError(format!("Column '{}' not found", column)))?;
2076
2077        let values: Vec<&Value> = rows
2078            .iter()
2079            .map(|row| &row[index])
2080            .filter(|value| !value.is_null())
2081            .collect();
2082
2083        if values.is_empty() {
2084            return Ok(Some(match function {
2085                AggregateFunction::Count => Value::Integer(0),
2086                _ => Value::Null,
2087            }));
2088        }
2089
2090        match function {
2091            AggregateFunction::Count => Ok(Some(Value::Integer(values.len() as i32))),
2092            AggregateFunction::Min => {
2093                let mut current = values[0].clone();
2094                for value in values.into_iter().skip(1) {
2095                    if value.partial_cmp(&current).is_some_and(|ord| ord.is_lt()) {
2096                        current = value.clone();
2097                    }
2098                }
2099                Ok(Some(current))
2100            }
2101            AggregateFunction::Max => {
2102                let mut current = values[0].clone();
2103                for value in values.into_iter().skip(1) {
2104                    if value.partial_cmp(&current).is_some_and(|ord| ord.is_gt()) {
2105                        current = value.clone();
2106                    }
2107                }
2108                Ok(Some(current))
2109            }
2110            AggregateFunction::Sum => {
2111                let mut int_sum: i64 = 0;
2112                let mut float_sum: f64 = 0.0;
2113                let mut has_float = false;
2114
2115                for value in &values {
2116                    match value {
2117                        Value::Integer(i) => {
2118                            int_sum += *i as i64;
2119                            float_sum += *i as f64;
2120                        }
2121                        Value::Float32(f) => {
2122                            has_float = true;
2123                            float_sum += *f as f64;
2124                        }
2125                        Value::Float(f) => {
2126                            has_float = true;
2127                            float_sum += *f;
2128                        }
2129                        _ => {
2130                            return Err(HematiteError::ParseError(format!(
2131                                "SUM() requires numeric values, found {:?}",
2132                                value
2133                            )))
2134                        }
2135                    }
2136                }
2137
2138                if has_float {
2139                    Ok(Some(Value::Float(float_sum)))
2140                } else {
2141                    Ok(Some(Value::Integer(int_sum as i32)))
2142                }
2143            }
2144            AggregateFunction::Avg => {
2145                let mut sum: f64 = 0.0;
2146                let count = values.len() as f64;
2147
2148                for value in &values {
2149                    match value {
2150                        Value::Integer(i) => {
2151                            sum += *i as f64;
2152                        }
2153                        Value::Float32(f) => {
2154                            sum += *f as f64;
2155                        }
2156                        Value::Float(f) => {
2157                            sum += *f;
2158                        }
2159                        _ => {
2160                            return Err(HematiteError::ParseError(format!(
2161                                "AVG() requires numeric values, found {:?}",
2162                                value
2163                            )))
2164                        }
2165                    }
2166                }
2167
2168                Ok(Some(Value::Float(sum / count)))
2169            }
2170        }
2171    }
2172
2173    fn evaluate_aggregate_item(
2174        &self,
2175        sources: &[ResolvedSource],
2176        item: &SelectItem,
2177        rows: &[Vec<Value>],
2178    ) -> Result<Option<Value>> {
2179        match item {
2180            SelectItem::CountAll => self.evaluate_aggregate_value(
2181                sources,
2182                AggregateFunction::Count,
2183                &AggregateTarget::All,
2184                rows,
2185            ),
2186            SelectItem::Aggregate { function, column } => self.evaluate_aggregate_value(
2187                sources,
2188                *function,
2189                &AggregateTarget::Column(column.clone()),
2190                rows,
2191            ),
2192            _ => Ok(None),
2193        }
2194    }
2195
2196    fn result_column_index(
2197        &self,
2198        output_columns: &[String],
2199        order_by_column: &str,
2200    ) -> Option<usize> {
2201        let base_name = SelectStatement::column_reference_name(order_by_column);
2202        output_columns.iter().position(|column| {
2203            column.eq_ignore_ascii_case(order_by_column) || column.eq_ignore_ascii_case(base_name)
2204        })
2205    }
2206
2207    fn sort_projected_rows(&self, output_columns: &[String], rows: &mut [Vec<Value>]) {
2208        if self.statement.order_by.is_empty() {
2209            return;
2210        }
2211
2212        rows.sort_by(|left, right| {
2213            for item in &self.statement.order_by {
2214                let Some(index) = self.result_column_index(output_columns, &item.column) else {
2215                    continue;
2216                };
2217
2218                let ordering = self.compare_sort_values(&left[index], &right[index], None);
2219                if ordering != Ordering::Equal {
2220                    return match item.direction {
2221                        SortDirection::Asc => ordering,
2222                        SortDirection::Desc => ordering.reverse(),
2223                    };
2224                }
2225            }
2226
2227            Ordering::Equal
2228        });
2229    }
2230
2231    fn evaluate_projected_expression(
2232        &self,
2233        ctx: &mut ExecutionContext<'_>,
2234        cache: &mut SubqueryCache,
2235        sources: &[ResolvedSource],
2236        expr: &Expression,
2237        row: &[Value],
2238        output_columns: &[String],
2239        group_rows: &[Vec<Value>],
2240    ) -> Result<Value> {
2241        match expr {
2242            Expression::Literal(value) => Ok(lower_literal_value(value)),
2243            Expression::IntervalLiteral { value, qualifier } => match qualifier {
2244                IntervalQualifier::YearToMonth => Ok(Value::IntervalYearMonth(
2245                    IntervalYearMonthValue::parse(value)?,
2246                )),
2247                IntervalQualifier::DayToSecond => Ok(Value::IntervalDaySecond(
2248                    IntervalDaySecondValue::parse(value)?,
2249                )),
2250            },
2251            Expression::Parameter(index) => Err(HematiteError::ParseError(format!(
2252                "Unbound parameter {} reached execution",
2253                index + 1
2254            ))),
2255            Expression::Cast { expr, target_type } => cast_value_to_type(
2256                self.evaluate_projected_expression(
2257                    ctx,
2258                    cache,
2259                    sources,
2260                    expr,
2261                    row,
2262                    output_columns,
2263                    group_rows,
2264                )?,
2265                lower_type_name(target_type.clone()),
2266            ),
2267            Expression::Case {
2268                branches,
2269                else_expr,
2270            } => {
2271                for branch in branches {
2272                    match self.evaluate_projected_boolean_expression(
2273                        ctx,
2274                        cache,
2275                        sources,
2276                        &branch.condition,
2277                        row,
2278                        output_columns,
2279                        group_rows,
2280                    )? {
2281                        Some(true) => {
2282                            return self.evaluate_projected_expression(
2283                                ctx,
2284                                cache,
2285                                sources,
2286                                &branch.result,
2287                                row,
2288                                output_columns,
2289                                group_rows,
2290                            )
2291                        }
2292                        Some(false) | None => {}
2293                    }
2294                }
2295
2296                match else_expr {
2297                    Some(else_expr) => self.evaluate_projected_expression(
2298                        ctx,
2299                        cache,
2300                        sources,
2301                        else_expr,
2302                        row,
2303                        output_columns,
2304                        group_rows,
2305                    ),
2306                    None => Ok(Value::Null),
2307                }
2308            }
2309            Expression::ScalarSubquery(subquery) => {
2310                self.execute_scalar_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))
2311            }
2312            Expression::AggregateCall { function, target } => self
2313                .evaluate_aggregate_value(sources, *function, target, group_rows)?
2314                .ok_or_else(|| {
2315                    HematiteError::InternalError(
2316                        "Aggregate expression evaluation produced no value".to_string(),
2317                    )
2318                }),
2319            Expression::ScalarFunctionCall { function, args } => {
2320                evaluate_scalar_function_call(*function, args, |expr| {
2321                    self.evaluate_projected_expression(
2322                        ctx,
2323                        cache,
2324                        sources,
2325                        expr,
2326                        row,
2327                        output_columns,
2328                        group_rows,
2329                    )
2330                })
2331            }
2332            Expression::Column(name) => {
2333                let index = self
2334                    .result_column_index(output_columns, name)
2335                    .ok_or_else(|| {
2336                        HematiteError::ParseError(format!(
2337                            "HAVING column '{}' does not match any grouped output column or alias",
2338                            name
2339                        ))
2340                    })?;
2341                row.get(index).cloned().ok_or_else(|| {
2342                    HematiteError::InternalError(format!(
2343                        "Grouped output row is missing column index {} for '{}'",
2344                        index, name
2345                    ))
2346                })
2347            }
2348            Expression::UnaryMinus(expr) => {
2349                negate_numeric_value(self.evaluate_projected_expression(
2350                    ctx,
2351                    cache,
2352                    sources,
2353                    expr,
2354                    row,
2355                    output_columns,
2356                    group_rows,
2357                )?)
2358            }
2359            Expression::UnaryNot(_)
2360            | Expression::Comparison { .. }
2361            | Expression::InList { .. }
2362            | Expression::InSubquery { .. }
2363            | Expression::Between { .. }
2364            | Expression::Like { .. }
2365            | Expression::Exists { .. }
2366            | Expression::NullCheck { .. }
2367            | Expression::Logical { .. } => Ok(nullable_bool_to_value(
2368                self.evaluate_projected_boolean_expression(
2369                    ctx,
2370                    cache,
2371                    sources,
2372                    expr,
2373                    row,
2374                    output_columns,
2375                    group_rows,
2376                )?,
2377            )),
2378            Expression::Binary {
2379                left,
2380                operator,
2381                right,
2382            } => self.evaluate_arithmetic(
2383                operator,
2384                self.evaluate_projected_expression(
2385                    ctx,
2386                    cache,
2387                    sources,
2388                    left,
2389                    row,
2390                    output_columns,
2391                    group_rows,
2392                )?,
2393                self.evaluate_projected_expression(
2394                    ctx,
2395                    cache,
2396                    sources,
2397                    right,
2398                    row,
2399                    output_columns,
2400                    group_rows,
2401                )?,
2402            ),
2403        }
2404    }
2405
    /// Evaluates `expr` as a three-valued boolean against a projected row.
    ///
    /// `ctx`, `cache`, `sources`, `row`, `output_columns` and `group_rows` are forwarded
    /// unchanged to [`Self::evaluate_projected_expression`] whenever an operand has to be
    /// turned into a value.
    ///
    /// Returns `Ok(Some(true | false))` for a known truth value and `Ok(None)` when the
    /// result is unknown (the helpers decide when that happens; `NOT` keeps `None` as `None`).
    ///
    /// # Errors
    ///
    /// Propagates any error from operand evaluation, subquery execution, text-context
    /// resolution, or the final boolean coercion in the fallback arm.
    fn evaluate_projected_boolean_expression(
        &self,
        ctx: &mut ExecutionContext<'_>,
        cache: &mut SubqueryCache,
        sources: &[ResolvedSource],
        expr: &Expression,
        row: &[Value],
        output_columns: &[String],
        group_rows: &[Vec<Value>],
    ) -> Result<Option<bool>> {
        match expr {
            // Binary comparison: evaluate both operands, then compare under the text
            // comparison context merged from the two operand expressions.
            Expression::Comparison {
                left,
                operator,
                right,
            } => {
                let left_val = self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    left,
                    row,
                    output_columns,
                    group_rows,
                )?;
                let right_val = self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    right,
                    row,
                    output_columns,
                    group_rows,
                )?;
                let text_context = self.merged_text_comparison_context(sources, left, right)?;
                Ok(compare_condition_values(
                    &left_val,
                    operator,
                    &right_val,
                    text_context,
                ))
            }
            // `expr [NOT] IN (v1, v2, ...)`: the shared helper drives evaluation through the
            // closure, which evaluates each operand against the projected row.
            Expression::InList {
                expr,
                values,
                is_not,
            } => evaluate_in_list_predicate(
                expr,
                values,
                *is_not,
                self.text_comparison_context_for_expression(sources, expr)?,
                |value_expr| {
                    self.evaluate_projected_expression(
                        ctx,
                        cache,
                        sources,
                        value_expr,
                        row,
                        output_columns,
                        group_rows,
                    )
                },
            ),
            // `expr [NOT] IN (subquery)`: the subquery is executed through the cache with the
            // current sources and row supplied as outer context.
            Expression::InSubquery {
                expr,
                subquery,
                is_not,
            } => {
                let probe = self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    expr,
                    row,
                    output_columns,
                    group_rows,
                )?;
                let subquery_result =
                    self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
                // Only the first column of every subquery row is a candidate; a row with no
                // columns contributes NULL.
                let candidates = subquery_result
                    .rows
                    .into_iter()
                    .map(|row| row.into_iter().next().unwrap_or(Value::Null))
                    .collect::<Vec<_>>();
                let text_context = self.text_comparison_context_for_expression(sources, expr)?;
                Ok(evaluate_in_candidates(
                    probe,
                    candidates,
                    *is_not,
                    text_context,
                ))
            }
            // `expr [NOT] BETWEEN lower AND upper`, delegated to the shared helper.
            Expression::Between {
                expr,
                lower,
                upper,
                is_not,
            } => evaluate_between_predicate(
                expr,
                lower,
                upper,
                *is_not,
                self.text_comparison_context_for_expression(sources, expr)?,
                |value_expr| {
                    self.evaluate_projected_expression(
                        ctx,
                        cache,
                        sources,
                        value_expr,
                        row,
                        output_columns,
                        group_rows,
                    )
                },
            ),
            // `expr [NOT] LIKE pattern`, delegated to the shared helper.
            Expression::Like {
                expr,
                pattern,
                is_not,
            } => evaluate_like_predicate(
                expr,
                pattern,
                *is_not,
                self.text_comparison_context_for_expression(sources, expr)?,
                |value_expr| {
                    self.evaluate_projected_expression(
                        ctx,
                        cache,
                        sources,
                        value_expr,
                        row,
                        output_columns,
                        group_rows,
                    )
                },
            ),
            // `[NOT] EXISTS (subquery)`: always a known truth value, decided solely by
            // whether the subquery produced at least one row.
            Expression::Exists { subquery, is_not } => {
                let subquery_result =
                    self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
                let exists = !subquery_result.rows.is_empty();
                Ok(Some(if *is_not { !exists } else { exists }))
            }
            // `expr IS [NOT] NULL`: always a known truth value.
            Expression::NullCheck { expr, is_not } => {
                let value = self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    expr,
                    row,
                    output_columns,
                    group_rows,
                )?;
                let is_null = value.is_null();
                Ok(Some(if *is_not { !is_null } else { is_null }))
            }
            // `NOT expr`: flips a known truth value and leaves an unknown one unknown.
            Expression::UnaryNot(expr) => Ok(self
                .evaluate_projected_boolean_expression(
                    ctx,
                    cache,
                    sources,
                    expr,
                    row,
                    output_columns,
                    group_rows,
                )?
                .map(|value| !value)),
            // `left AND/OR right`: both operands are always evaluated (no short-circuit)
            // before being combined by the three-valued helpers.
            Expression::Logical {
                left,
                operator,
                right,
            } => {
                let left_result = self.evaluate_projected_boolean_expression(
                    ctx,
                    cache,
                    sources,
                    left,
                    row,
                    output_columns,
                    group_rows,
                )?;
                let right_result = self.evaluate_projected_boolean_expression(
                    ctx,
                    cache,
                    sources,
                    right,
                    row,
                    output_columns,
                    group_rows,
                )?;
                Ok(match operator {
                    LogicalOperator::And => logical_and_values(left_result, right_result),
                    LogicalOperator::Or => logical_or_values(left_result, right_result),
                })
            }
            // Any other expression kind is evaluated as a plain value and then coerced to a
            // nullable boolean; the label is handed to the coercion helper.
            _ => coerce_value_to_nullable_bool(
                self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    expr,
                    row,
                    output_columns,
                    group_rows,
                )?,
                "Boolean expression",
            ),
        }
    }
2614
2615    fn evaluate_projected_condition(
2616        &self,
2617        ctx: &mut ExecutionContext<'_>,
2618        cache: &mut SubqueryCache,
2619        sources: &[ResolvedSource],
2620        condition: &Condition,
2621        row: &[Value],
2622        output_columns: &[String],
2623        group_rows: &[Vec<Value>],
2624    ) -> Result<Option<bool>> {
2625        match condition {
2626            Condition::Comparison {
2627                left,
2628                operator,
2629                right,
2630            } => {
2631                let left_val = self.evaluate_projected_expression(
2632                    ctx,
2633                    cache,
2634                    sources,
2635                    left,
2636                    row,
2637                    output_columns,
2638                    group_rows,
2639                )?;
2640                let right_val = self.evaluate_projected_expression(
2641                    ctx,
2642                    cache,
2643                    sources,
2644                    right,
2645                    row,
2646                    output_columns,
2647                    group_rows,
2648                )?;
2649                let text_context = self.merged_text_comparison_context(sources, left, right)?;
2650                Ok(self.compare_values(&left_val, operator, &right_val, text_context))
2651            }
2652            Condition::InList {
2653                expr,
2654                values,
2655                is_not,
2656            } => {
2657                let probe = self.evaluate_projected_expression(
2658                    ctx,
2659                    cache,
2660                    sources,
2661                    expr,
2662                    row,
2663                    output_columns,
2664                    group_rows,
2665                )?;
2666                let candidates = values
2667                    .iter()
2668                    .map(|value_expr| {
2669                        self.evaluate_projected_expression(
2670                            ctx,
2671                            cache,
2672                            sources,
2673                            value_expr,
2674                            row,
2675                            output_columns,
2676                            group_rows,
2677                        )
2678                    })
2679                    .collect::<Result<Vec<_>>>()?;
2680                let text_context = self.text_comparison_context_for_expression(sources, expr)?;
2681                Ok(self.evaluate_in_candidates(probe, candidates, *is_not, text_context))
2682            }
2683            Condition::InSubquery {
2684                expr,
2685                subquery,
2686                is_not,
2687            } => {
2688                let probe = self.evaluate_projected_expression(
2689                    ctx,
2690                    cache,
2691                    sources,
2692                    expr,
2693                    row,
2694                    output_columns,
2695                    group_rows,
2696                )?;
2697                let subquery_result =
2698                    self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
2699                let candidates = subquery_result
2700                    .rows
2701                    .into_iter()
2702                    .map(|row| row.into_iter().next().unwrap_or(Value::Null))
2703                    .collect::<Vec<_>>();
2704                let text_context = self.text_comparison_context_for_expression(sources, expr)?;
2705                Ok(self.evaluate_in_candidates(probe, candidates, *is_not, text_context))
2706            }
2707            Condition::Between {
2708                expr,
2709                lower,
2710                upper,
2711                is_not,
2712            } => {
2713                let value = self.evaluate_projected_expression(
2714                    ctx,
2715                    cache,
2716                    sources,
2717                    expr,
2718                    row,
2719                    output_columns,
2720                    group_rows,
2721                )?;
2722                let lower_value = self.evaluate_projected_expression(
2723                    ctx,
2724                    cache,
2725                    sources,
2726                    lower,
2727                    row,
2728                    output_columns,
2729                    group_rows,
2730                )?;
2731                let upper_value = self.evaluate_projected_expression(
2732                    ctx,
2733                    cache,
2734                    sources,
2735                    upper,
2736                    row,
2737                    output_columns,
2738                    group_rows,
2739                )?;
2740
2741                if value.is_null() || lower_value.is_null() || upper_value.is_null() {
2742                    return Ok(None);
2743                }
2744
2745                let text_context = self.text_comparison_context_for_expression(sources, expr)?;
2746                let lower_ok = sql_partial_cmp(&value, &lower_value, text_context)
2747                    .map(|ordering| !ordering.is_lt());
2748                let upper_ok = sql_partial_cmp(&value, &upper_value, text_context)
2749                    .map(|ordering| !ordering.is_gt());
2750
2751                match (lower_ok, upper_ok) {
2752                    (Some(true), Some(true)) => Ok(Some(!is_not)),
2753                    (Some(_), Some(_)) => Ok(Some(*is_not)),
2754                    _ => Ok(None),
2755                }
2756            }
2757            Condition::Like {
2758                expr,
2759                pattern,
2760                is_not,
2761            } => {
2762                let value = self.evaluate_projected_expression(
2763                    ctx,
2764                    cache,
2765                    sources,
2766                    expr,
2767                    row,
2768                    output_columns,
2769                    group_rows,
2770                )?;
2771                let pattern_value = self.evaluate_projected_expression(
2772                    ctx,
2773                    cache,
2774                    sources,
2775                    pattern,
2776                    row,
2777                    output_columns,
2778                    group_rows,
2779                )?;
2780
2781                match (value, pattern_value) {
2782                    (Value::Text(text), Value::Text(pattern)) => {
2783                        let matched = Self::like_matches(&pattern, &text);
2784                        Ok(Some(if *is_not { !matched } else { matched }))
2785                    }
2786                    (left, right) if left.is_null() || right.is_null() => Ok(None),
2787                    _ => Ok(None),
2788                }
2789            }
2790            Condition::Exists { subquery, is_not } => {
2791                let subquery_result =
2792                    self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
2793                let exists = !subquery_result.rows.is_empty();
2794                Ok(Some(if *is_not { !exists } else { exists }))
2795            }
2796            Condition::NullCheck { expr, is_not } => {
2797                let value = self.evaluate_projected_expression(
2798                    ctx,
2799                    cache,
2800                    sources,
2801                    expr,
2802                    row,
2803                    output_columns,
2804                    group_rows,
2805                )?;
2806                let is_null = value.is_null();
2807                Ok(Some(if *is_not { !is_null } else { is_null }))
2808            }
2809            Condition::Not(condition) => Ok(self
2810                .evaluate_projected_condition(
2811                    ctx,
2812                    cache,
2813                    sources,
2814                    condition,
2815                    row,
2816                    output_columns,
2817                    group_rows,
2818                )?
2819                .map(|value| !value)),
2820            Condition::Logical {
2821                left,
2822                operator,
2823                right,
2824            } => {
2825                let left_result = self.evaluate_projected_condition(
2826                    ctx,
2827                    cache,
2828                    sources,
2829                    left,
2830                    row,
2831                    output_columns,
2832                    group_rows,
2833                )?;
2834                let right_result = self.evaluate_projected_condition(
2835                    ctx,
2836                    cache,
2837                    sources,
2838                    right,
2839                    row,
2840                    output_columns,
2841                    group_rows,
2842                )?;
2843
2844                match operator {
2845                    LogicalOperator::And => Ok(self.logical_and(left_result, right_result)),
2846                    LogicalOperator::Or => Ok(self.logical_or(left_result, right_result)),
2847                }
2848            }
2849        }
2850    }
2851
2852    fn project_grouped_row(
2853        &self,
2854        ctx: &mut ExecutionContext<'_>,
2855        cache: &mut SubqueryCache,
2856        sources: &[ResolvedSource],
2857        group_rows: &[Vec<Value>],
2858    ) -> Result<Vec<Value>> {
2859        let representative = group_rows.first().map(Vec::as_slice).unwrap_or(&[]);
2860        let mut projected = Vec::new();
2861
2862        for item in &self.statement.columns {
2863            match item {
2864                SelectItem::Wildcard => {}
2865                SelectItem::Column(name) => {
2866                    if let Some(index) = self.resolve_column_index(sources, name)? {
2867                        if index < representative.len() {
2868                            projected.push(representative[index].clone());
2869                        }
2870                    }
2871                }
2872                SelectItem::Expression(expr) => {
2873                    projected.push(self.evaluate_expression(
2874                        ctx,
2875                        cache,
2876                        sources,
2877                        expr,
2878                        representative,
2879                    )?);
2880                }
2881                SelectItem::CountAll | SelectItem::Aggregate { .. } => {
2882                    projected.push(
2883                        self.evaluate_aggregate_item(sources, item, group_rows)?
2884                            .unwrap_or(Value::Null),
2885                    );
2886                }
2887                SelectItem::Window { .. } => {
2888                    return Err(HematiteError::InternalError(
2889                        "Window projections are not supported in grouped execution".to_string(),
2890                    ))
2891                }
2892            }
2893        }
2894
2895        Ok(projected)
2896    }
2897
2898    fn build_groups(
2899        &self,
2900        ctx: &mut ExecutionContext<'_>,
2901        cache: &mut SubqueryCache,
2902        sources: &[ResolvedSource],
2903        filtered_rows: &[Vec<Value>],
2904    ) -> Result<Vec<Vec<Vec<Value>>>> {
2905        if self.statement.group_by.is_empty() {
2906            if filtered_rows.is_empty() && self.has_aggregate_projection() {
2907                return Ok(vec![Vec::new()]);
2908            }
2909            return Ok(vec![filtered_rows.to_vec()]);
2910        }
2911
2912        let mut keyed_groups: Vec<(Vec<Value>, Vec<Vec<Value>>)> = Vec::new();
2913        for row in filtered_rows {
2914            let key = self
2915                .statement
2916                .group_by
2917                .iter()
2918                .map(|expr| self.evaluate_expression(ctx, cache, sources, expr, row))
2919                .collect::<Result<Vec<_>>>()?;
2920
2921            if let Some((_, rows)) = keyed_groups
2922                .iter_mut()
2923                .find(|(existing_key, _)| *existing_key == key)
2924            {
2925                rows.push(row.clone());
2926            } else {
2927                keyed_groups.push((key, vec![row.clone()]));
2928            }
2929        }
2930
2931        Ok(keyed_groups.into_iter().map(|(_, rows)| rows).collect())
2932    }
2933
2934    fn apply_having_clause(
2935        &self,
2936        ctx: &mut ExecutionContext<'_>,
2937        cache: &mut SubqueryCache,
2938        sources: &[ResolvedSource],
2939        output_columns: &[String],
2940        grouped_rows: Vec<GroupedRow>,
2941    ) -> Result<Vec<Vec<Value>>> {
2942        let Some(having_clause) = &self.statement.having_clause else {
2943            return Ok(grouped_rows
2944                .into_iter()
2945                .map(|group| group.projected)
2946                .collect::<Vec<_>>());
2947        };
2948
2949        let mut filtered_rows = Vec::with_capacity(grouped_rows.len());
2950        for grouped in grouped_rows {
2951            if self.projected_conditions_match(
2952                ctx,
2953                cache,
2954                sources,
2955                &having_clause.conditions,
2956                &grouped.projected,
2957                output_columns,
2958                &grouped.source_rows,
2959            )? {
2960                filtered_rows.push(grouped.projected);
2961            }
2962        }
2963
2964        Ok(filtered_rows)
2965    }
2966
2967    fn execute_grouped(
2968        &self,
2969        ctx: &mut ExecutionContext,
2970        cache: &mut SubqueryCache,
2971        sources: &[ResolvedSource],
2972        filtered_rows: &[Vec<Value>],
2973    ) -> Result<QueryResult> {
2974        let groups = self.build_groups(ctx, cache, sources, filtered_rows)?;
2975        let output_columns = self.get_column_names(sources);
2976        let mut grouped_rows = Vec::with_capacity(groups.len());
2977        for rows in groups {
2978            grouped_rows.push(GroupedRow {
2979                projected: self.project_grouped_row(ctx, cache, sources, &rows)?,
2980                source_rows: rows,
2981            });
2982        }
2983
2984        let projected_rows =
2985            self.apply_having_clause(ctx, cache, sources, &output_columns, grouped_rows)?;
2986        self.finalize_grouped_rows(output_columns, projected_rows)
2987    }
2988
2989    fn finalize_grouped_rows(
2990        &self,
2991        output_columns: Vec<String>,
2992        mut projected_rows: Vec<Vec<Value>>,
2993    ) -> Result<QueryResult> {
2994        apply_distinct_if_needed(self.statement.distinct, &mut projected_rows);
2995
2996        self.sort_projected_rows(&output_columns, &mut projected_rows);
2997        self.apply_select_window(&mut projected_rows);
2998
2999        Ok(self.build_query_result(output_columns, projected_rows))
3000    }
3001
3002    fn filter_source_rows(
3003        &self,
3004        ctx: &mut ExecutionContext<'_>,
3005        cache: &mut SubqueryCache,
3006        sources: &[ResolvedSource],
3007        all_rows: Vec<Vec<Value>>,
3008    ) -> Result<Vec<Vec<Value>>> {
3009        let skip_filter = matches!(self.access_path, SelectAccessPath::RowIdLookup);
3010        let mut filtered_rows = Vec::new();
3011
3012        for row in all_rows {
3013            let include = if skip_filter {
3014                true
3015            } else {
3016                match &self.statement.where_clause {
3017                    Some(where_clause) => {
3018                        self.conditions_match(ctx, cache, sources, &where_clause.conditions, &row)?
3019                    }
3020                    None => true,
3021                }
3022            };
3023
3024            if include {
3025                filtered_rows.push(row);
3026            }
3027        }
3028
3029        Ok(filtered_rows)
3030    }
3031
3032    fn conditions_match(
3033        &self,
3034        ctx: &mut ExecutionContext<'_>,
3035        cache: &mut SubqueryCache,
3036        sources: &[ResolvedSource],
3037        conditions: &[Condition],
3038        row: &[Value],
3039    ) -> Result<bool> {
3040        conditions_match_with(conditions, |condition| {
3041            self.evaluate_condition(ctx, cache, sources, condition, row)
3042        })
3043    }
3044
3045    fn projected_conditions_match(
3046        &self,
3047        ctx: &mut ExecutionContext<'_>,
3048        cache: &mut SubqueryCache,
3049        sources: &[ResolvedSource],
3050        conditions: &[Condition],
3051        row: &[Value],
3052        output_columns: &[String],
3053        group_rows: &[Vec<Value>],
3054    ) -> Result<bool> {
3055        conditions_match_with(conditions, |condition| {
3056            self.evaluate_projected_condition(
3057                ctx,
3058                cache,
3059                sources,
3060                condition,
3061                row,
3062                output_columns,
3063                group_rows,
3064            )
3065        })
3066    }
3067
3068    fn extract_primary_key_lookup(&self, table: &Table) -> Option<Vec<Value>> {
3069        let equalities = extract_literal_equalities(self.statement.where_clause.as_ref()?)?;
3070        table
3071            .primary_key_columns
3072            .iter()
3073            .map(|&index| table.columns.get(index))
3074            .collect::<Option<Vec<_>>>()?
3075            .into_iter()
3076            .map(|column| equalities.get(column.name.as_str()).cloned())
3077            .collect()
3078    }
3079
3080    fn extract_secondary_index_lookup(
3081        &self,
3082        table: &Table,
3083        index_name: &str,
3084    ) -> Option<Vec<Value>> {
3085        let index = table.get_secondary_index(index_name)?;
3086        let equalities = extract_literal_equalities(self.statement.where_clause.as_ref()?)?;
3087        index
3088            .column_indices
3089            .iter()
3090            .map(|&column_index| table.columns.get(column_index))
3091            .collect::<Option<Vec<_>>>()?
3092            .into_iter()
3093            .map(|column| equalities.get(column.name.as_str()).cloned())
3094            .collect()
3095    }
3096
3097    fn extract_rowid_lookup(&self) -> Option<u64> {
3098        let equalities = extract_literal_equalities(self.statement.where_clause.as_ref()?)?;
3099        match equalities.get("rowid") {
3100            Some(Value::Integer(v)) if v >= &0 => Some(*v as u64),
3101            _ => None,
3102        }
3103    }
3104
3105    fn uses_materialized_reference(&self) -> bool {
3106        matches!(
3107            (&self.access_path, &self.statement.from),
3108            (SelectAccessPath::JoinScan, _)
3109                | (_, TableReference::Derived { .. })
3110                | (_, TableReference::CrossJoin(_, _))
3111                | (_, TableReference::InnerJoin { .. })
3112                | (_, TableReference::LeftJoin { .. })
3113                | (_, TableReference::RightJoin { .. })
3114                | (_, TableReference::FullOuterJoin { .. })
3115        )
3116    }
3117
    /// Reads the candidate rows of `table_name` using the planner-chosen access path.
    ///
    /// * `RowIdLookup` — fetches at most one row directly by rowid.
    /// * `PrimaryKeyLookup` — seeks the primary-key index, then the table by the rowid found.
    /// * `SecondaryIndexLookup` — walks all index entries sharing the encoded key and
    ///   fetches each referenced row.
    /// * `FullTableScan` — reads every row of the table.
    ///
    /// Rows are returned as value vectors; an unsuccessful lookup yields an empty vector.
    ///
    /// # Errors
    ///
    /// Returns `InternalError` when the chosen access path has no matching predicate in the
    /// statement, or when `JoinScan` is selected for direct table access. Engine errors
    /// (key encoding, cursor opening, reads) are propagated.
    fn materialize_table_access_rows(
        &self,
        ctx: &mut ExecutionContext,
        table_name: &str,
        table: &Table,
    ) -> Result<Vec<Vec<Value>>> {
        match self.access_path {
            SelectAccessPath::RowIdLookup => {
                let rowid = self.extract_rowid_lookup().ok_or_else(|| {
                    HematiteError::InternalError(
                        "Planner selected rowid lookup without a matching predicate".to_string(),
                    )
                })?;
                // A missing row becomes an empty result rather than an error.
                Ok(ctx
                    .engine
                    .lookup_row_by_rowid(table_name, rowid)?
                    .map(|row| vec![row.values])
                    .unwrap_or_default())
            }
            SelectAccessPath::PrimaryKeyLookup => {
                let primary_key_values =
                    self.extract_primary_key_lookup(table).ok_or_else(|| {
                        HematiteError::InternalError(
                            "Planner selected primary-key lookup without a matching predicate"
                                .to_string(),
                        )
                    })?;
                let encoded_key = ctx.engine.encode_primary_key(&primary_key_values)?;
                let mut index_cursor = ctx.engine.open_primary_key_cursor(table)?;
                // Only read the current entry when the seek reports success.
                // NOTE(review): presumably `seek_key` returns true only on an exact key
                // match — confirm, since the entry's key is not re-checked here.
                let rowid = index_cursor
                    .seek_key(&encoded_key)
                    .then(|| index_cursor.current().map(|entry| entry.row_id))
                    .flatten();
                match rowid {
                    Some(rowid) => {
                        let mut table_cursor = ctx.engine.open_table_cursor(table_name)?;
                        // An index entry whose row cannot be found yields no rows.
                        Ok(table_cursor
                            .seek_rowid(rowid)
                            .then(|| table_cursor.current().map(|row| vec![row.values.clone()]))
                            .flatten()
                            .unwrap_or_default())
                    }
                    None => Ok(Vec::new()),
                }
            }
            SelectAccessPath::SecondaryIndexLookup(ref index_name) => {
                let key_values = self
                    .extract_secondary_index_lookup(table, index_name)
                    .ok_or_else(|| {
                        HematiteError::InternalError(format!(
                            "Planner selected secondary index lookup '{}' without a matching predicate",
                            index_name
                        ))
                    })?;
                let encoded_key = ctx.engine.encode_secondary_index_key(&key_values)?;
                let mut index_cursor = ctx.engine.open_secondary_index_cursor(table, index_name)?;
                let mut table_cursor = ctx.engine.open_table_cursor(table_name)?;
                let mut rows = Vec::new();

                if index_cursor.seek_key(&encoded_key) {
                    loop {
                        let Some(entry) = index_cursor.current() else {
                            break;
                        };
                        // Stop as soon as the cursor moves past the run of entries that
                        // carry the requested key.
                        if entry.key.as_slice() != encoded_key.as_slice() {
                            break;
                        }
                        // Index entries without a reachable table row are skipped silently.
                        if table_cursor.seek_rowid(entry.row_id) {
                            if let Some(row) = table_cursor.current() {
                                rows.push(row.values.clone());
                            }
                        }
                        if !index_cursor.next() {
                            break;
                        }
                    }
                }

                Ok(rows)
            }
            SelectAccessPath::FullTableScan => ctx.engine.read_from_table(table_name),
            // Joins are materialized elsewhere; reaching this arm means the plan and the
            // call site disagree.
            SelectAccessPath::JoinScan => Err(HematiteError::InternalError(
                "Planner selected join scan for direct table access".to_string(),
            )),
        }
    }
3204}
3205
3206impl QueryExecutor for SelectExecutor {
3207    fn execute(&mut self, ctx: &mut ExecutionContext) -> Result<QueryResult> {
3208        validate_statement(&Statement::Select(self.statement.clone()), &ctx.catalog)?;
3209        self.execute_body(ctx)
3210    }
3211}
3212
/// Executor that carries a parsed `INSERT` statement.
#[derive(Debug, Clone)]
pub struct InsertExecutor {
    /// The `INSERT` statement this executor was created for.
    pub statement: InsertStatement,
}
3217
3218impl InsertExecutor {
3219    pub fn new(statement: InsertStatement) -> Self {
3220        Self { statement }
3221    }
3222
3223    fn evaluate_value_expression(&self, expr: &Expression) -> Result<Value> {
3224        match expr {
3225            Expression::Literal(value) => Ok(lower_literal_value(value)),
3226            Expression::IntervalLiteral { value, qualifier } => match qualifier {
3227                IntervalQualifier::YearToMonth => Ok(Value::IntervalYearMonth(
3228                    IntervalYearMonthValue::parse(value)?,
3229                )),
3230                IntervalQualifier::DayToSecond => Ok(Value::IntervalDaySecond(
3231                    IntervalDaySecondValue::parse(value)?,
3232                )),
3233            },
3234            Expression::Parameter(index) => Err(HematiteError::ParseError(format!(
3235                "Unbound parameter {} reached execution",
3236                index + 1
3237            ))),
3238            Expression::Cast { expr, target_type } => cast_value_to_type(
3239                self.evaluate_value_expression(expr)?,
3240                lower_type_name(target_type.clone()),
3241            ),
3242            Expression::Case {
3243                branches,
3244                else_expr,
3245            } => evaluate_case_expression(
3246                branches,
3247                else_expr.as_deref(),
3248                |condition| self.evaluate_boolean_value_expression(condition),
3249                |expr| self.evaluate_value_expression(expr),
3250            ),
3251            Expression::ScalarSubquery(_) => Err(HematiteError::ParseError(
3252                "INSERT expressions cannot use scalar subqueries".to_string(),
3253            )),
3254            Expression::AggregateCall { .. } => Err(HematiteError::ParseError(
3255                "INSERT expressions cannot use aggregate functions".to_string(),
3256            )),
3257            Expression::ScalarFunctionCall { function, args } => {
3258                evaluate_scalar_function_call(*function, args, |expr| {
3259                    self.evaluate_value_expression(expr)
3260                })
3261            }
3262            Expression::UnaryMinus(expr) => {
3263                negate_numeric_value(self.evaluate_value_expression(expr)?)
3264            }
3265            Expression::UnaryNot(_)
3266            | Expression::Comparison { .. }
3267            | Expression::InList { .. }
3268            | Expression::InSubquery { .. }
3269            | Expression::Between { .. }
3270            | Expression::Like { .. }
3271            | Expression::Exists { .. }
3272            | Expression::NullCheck { .. }
3273            | Expression::Logical { .. } => Ok(nullable_bool_to_value(
3274                self.evaluate_boolean_value_expression(expr)?,
3275            )),
3276            Expression::Binary {
3277                left,
3278                operator,
3279                right,
3280            } => evaluate_arithmetic_values(
3281                operator,
3282                self.evaluate_value_expression(left)?,
3283                self.evaluate_value_expression(right)?,
3284            ),
3285            Expression::Column(name) => Err(HematiteError::ParseError(format!(
3286                "INSERT expressions cannot reference column '{}'",
3287                name
3288            ))),
3289        }
3290    }
3291
3292    fn evaluate_boolean_value_expression(&self, expr: &Expression) -> Result<Option<bool>> {
3293        match expr {
3294            Expression::Comparison {
3295                left,
3296                operator,
3297                right,
3298            } => {
3299                let left_val = self.evaluate_value_expression(left)?;
3300                let right_val = self.evaluate_value_expression(right)?;
3301                Ok(compare_condition_values(
3302                    &left_val, operator, &right_val, None,
3303                ))
3304            }
3305            Expression::InList {
3306                expr,
3307                values,
3308                is_not,
3309            } => evaluate_in_list_predicate(expr, values, *is_not, None, |value_expr| {
3310                self.evaluate_value_expression(value_expr)
3311            }),
3312            Expression::InSubquery { .. } => Err(HematiteError::ParseError(
3313                "INSERT expressions cannot use subqueries in boolean expressions".to_string(),
3314            )),
3315            Expression::Between {
3316                expr,
3317                lower,
3318                upper,
3319                is_not,
3320            } => evaluate_between_predicate(expr, lower, upper, *is_not, None, |value_expr| {
3321                self.evaluate_value_expression(value_expr)
3322            }),
3323            Expression::Like {
3324                expr,
3325                pattern,
3326                is_not,
3327            } => evaluate_like_predicate(expr, pattern, *is_not, None, |value_expr| {
3328                self.evaluate_value_expression(value_expr)
3329            }),
3330            Expression::Exists { .. } => Err(HematiteError::ParseError(
3331                "INSERT expressions cannot use EXISTS in boolean expressions".to_string(),
3332            )),
3333            Expression::NullCheck { expr, is_not } => {
3334                let value = self.evaluate_value_expression(expr)?;
3335                let is_null = value.is_null();
3336                Ok(Some(if *is_not { !is_null } else { is_null }))
3337            }
3338            Expression::UnaryNot(expr) => Ok(self
3339                .evaluate_boolean_value_expression(expr)?
3340                .map(|value| !value)),
3341            Expression::Logical {
3342                left,
3343                operator,
3344                right,
3345            } => {
3346                let left_result = self.evaluate_boolean_value_expression(left)?;
3347                let right_result = self.evaluate_boolean_value_expression(right)?;
3348                Ok(match operator {
3349                    LogicalOperator::And => logical_and_values(left_result, right_result),
3350                    LogicalOperator::Or => logical_or_values(left_result, right_result),
3351                })
3352            }
3353            _ => coerce_value_to_nullable_bool(
3354                self.evaluate_value_expression(expr)?,
3355                "Boolean expression",
3356            ),
3357        }
3358    }
3359
3360    fn ensure_primary_key_is_unique(
3361        &self,
3362        ctx: &mut ExecutionContext,
3363        table: &Table,
3364        existing_rows: &[Vec<Value>],
3365        candidate_row: &[Value],
3366    ) -> Result<()> {
3367        let candidate_pk = primary_key_values(table, candidate_row)?;
3368
3369        if ctx
3370            .engine
3371            .lookup_row_by_primary_key(table, &candidate_pk)?
3372            .is_some()
3373        {
3374            return Err(duplicate_primary_key_parse_error(
3375                &table.name,
3376                &candidate_pk,
3377            ));
3378        }
3379
3380        for existing_row in existing_rows {
3381            let existing_pk = primary_key_values(table, existing_row)?;
3382
3383            if existing_pk == candidate_pk {
3384                return Err(duplicate_primary_key_parse_error(
3385                    &table.name,
3386                    &candidate_pk,
3387                ));
3388            }
3389        }
3390
3391        Ok(())
3392    }
3393
3394    fn ensure_unique_secondary_indexes_are_unique(
3395        &self,
3396        ctx: &mut ExecutionContext,
3397        table: &Table,
3398        candidate_row: &[Value],
3399    ) -> Result<()> {
3400        for index in table.secondary_indexes.iter().filter(|index| index.unique) {
3401            let key_values = secondary_index_key_values(index, candidate_row);
3402            if !ctx
3403                .engine
3404                .lookup_secondary_index_rowids(table, &index.name, &key_values)?
3405                .is_empty()
3406            {
3407                return Err(unique_index_parse_error(&index.name, &table.name));
3408            }
3409        }
3410
3411        Ok(())
3412    }
3413
3414    fn find_conflicting_row(
3415        &self,
3416        ctx: &mut ExecutionContext<'_>,
3417        table: &Table,
3418        candidate_row: &[Value],
3419    ) -> Result<Option<StoredRow>> {
3420        let mut conflict_row: Option<StoredRow> = ctx
3421            .engine
3422            .lookup_row_by_primary_key(table, &primary_key_values(table, candidate_row)?)?;
3423
3424        for index in table.secondary_indexes.iter().filter(|index| index.unique) {
3425            let key_values = secondary_index_key_values(index, candidate_row);
3426            for row_id in
3427                ctx.engine
3428                    .lookup_secondary_index_rowids(table, &index.name, &key_values)?
3429            {
3430                let row = ctx
3431                    .engine
3432                    .lookup_row_by_rowid(&table.name, row_id)?
3433                    .ok_or_else(|| {
3434                        HematiteError::CorruptedData(format!(
3435                            "Unique index '{}' points at missing rowid {} in table '{}'",
3436                            index.name, row_id, table.name
3437                        ))
3438                    })?;
3439                if let Some(existing) = &conflict_row {
3440                    if existing.row_id != row.row_id {
3441                        return Err(HematiteError::ParseError(format!(
3442                            "INSERT ON DUPLICATE KEY UPDATE matched multiple rows in table '{}'",
3443                            table.name
3444                        )));
3445                    }
3446                } else {
3447                    conflict_row = Some(row);
3448                }
3449            }
3450        }
3451
3452        Ok(conflict_row)
3453    }
3454
3455    fn apply_on_duplicate_assignments(
3456        &self,
3457        ctx: &mut ExecutionContext<'_>,
3458        table: &Table,
3459        mut row: StoredRow,
3460        assignments: &[UpdateAssignment],
3461    ) -> Result<()> {
3462        if assignments.is_empty() {
3463            return Ok(());
3464        }
3465
3466        let evaluator = SelectExecutor::new(
3467            SelectStatement::single_table_scope(&table.name),
3468            SelectAccessPath::FullTableScan,
3469        );
3470        let sources = evaluator.resolve_sources(ctx)?;
3471        let mut subquery_cache = SubqueryCache::new();
3472        let original_values = row.values.clone();
3473
3474        for assignment in assignments {
3475            let column_index = table.get_column_index(&assignment.column).ok_or_else(|| {
3476                HematiteError::ParseError(format!(
3477                    "Column '{}' does not exist in table '{}'",
3478                    assignment.column, table.name
3479                ))
3480            })?;
3481            let column = &table.columns[column_index];
3482            let value = evaluator.evaluate_expression(
3483                ctx,
3484                &mut subquery_cache,
3485                &sources,
3486                &assignment.value,
3487                &row.values,
3488            )?;
3489            row.values[column_index] = coerce_column_value(column, value)?;
3490        }
3491
3492        table
3493            .validate_row(&row.values)
3494            .map_err(|err| HematiteError::ParseError(err.to_string()))?;
3495        validate_row_constraints(ctx, table, &row.values)?;
3496        if parent_reference_key_changed(ctx, table, &original_values, &row.values)? {
3497            apply_parent_update_foreign_key_actions(ctx, table, &original_values, &row.values)?;
3498        }
3499        ensure_stored_row_uniqueness(ctx, table, &row)?;
3500        remove_stored_row(ctx, &table.name, table, row.row_id)?;
3501        write_stored_row(ctx, &table.name, table, row, true)?;
3502        Ok(())
3503    }
3504
3505    fn build_row_with_metadata(
3506        &self,
3507        ctx: &ExecutionContext<'_>,
3508        table: &Table,
3509        value_row: &[Value],
3510    ) -> Result<Vec<Value>> {
3511        let mut row = Vec::with_capacity(table.columns.len());
3512        let next_row_id = ctx
3513            .engine
3514            .get_table_metadata()
3515            .get(&self.statement.table)
3516            .map(|metadata| metadata.next_row_id)
3517            .ok_or_else(|| {
3518                HematiteError::InternalError(format!(
3519                    "Table metadata for '{}' disappeared during INSERT",
3520                    self.statement.table
3521                ))
3522            })?;
3523
3524        for column in &table.columns {
3525            let value = if let Some(position) = self
3526                .statement
3527                .columns
3528                .iter()
3529                .position(|name| name == &column.name)
3530            {
3531                let expr = value_row.get(position).ok_or_else(|| {
3532                    HematiteError::ParseError(format!("Missing value for column '{}'", column.name))
3533                })?;
3534                if column.auto_increment && expr.is_null() {
3535                    auto_increment_value(column, next_row_id)?
3536                } else {
3537                    coerce_column_value(column, expr.clone())?
3538                }
3539            } else if column.auto_increment {
3540                auto_increment_value(column, next_row_id)?
3541            } else if let Some(default_value) = &column.default_value {
3542                default_value.clone()
3543            } else if column.nullable {
3544                Value::Null
3545            } else {
3546                return Err(HematiteError::ParseError(format!(
3547                    "Missing value for required column '{}'",
3548                    column.name
3549                )));
3550            };
3551
3552            row.push(value);
3553        }
3554
3555        table
3556            .validate_row(&row)
3557            .map_err(|err| HematiteError::ParseError(err.to_string()))?;
3558
3559        Ok(row)
3560    }
3561
3562    fn evaluate_value_row(&self, row: &[Expression]) -> Result<Vec<Value>> {
3563        row.iter()
3564            .map(|expr| self.evaluate_value_expression(expr))
3565            .collect()
3566    }
3567}
3568
3569impl QueryExecutor for InsertExecutor {
3570    fn execute(&mut self, ctx: &mut ExecutionContext) -> Result<QueryResult> {
3571        validate_statement(&Statement::Insert(self.statement.clone()), &ctx.catalog)?;
3572
3573        let table = catalog_table(ctx, &self.statement.table)?;
3574
3575        let input_rows = match &self.statement.source {
3576            InsertSource::Values(rows) => rows
3577                .iter()
3578                .map(|row| self.evaluate_value_row(row))
3579                .collect::<Result<Vec<_>>>()?,
3580            InsertSource::Select(select) => {
3581                let planner = QueryPlanner::new(ctx.catalog.clone())
3582                    .with_table_row_counts(current_table_row_counts(ctx.engine));
3583                let plan = planner.plan(Statement::Select((**select).clone()))?;
3584                match plan.program {
3585                    ExecutionProgram::Select {
3586                        statement,
3587                        access_path,
3588                    } => {
3589                        SelectExecutor::new(statement, access_path)
3590                            .execute(ctx)?
3591                            .rows
3592                    }
3593                    _ => {
3594                        return Err(HematiteError::InternalError(
3595                            "Expected SELECT execution program for INSERT source".to_string(),
3596                        ))
3597                    }
3598                }
3599            }
3600        };
3601
3602        for value_row in &input_rows {
3603            let row_values = self.build_row_with_metadata(ctx, &table, value_row)?;
3604            if let Some(assignments) = &self.statement.on_duplicate {
3605                if let Some(conflicting_row) =
3606                    self.find_conflicting_row(ctx, &table, &row_values)?
3607                {
3608                    self.apply_on_duplicate_assignments(ctx, &table, conflicting_row, assignments)?;
3609                    continue;
3610                }
3611            }
3612
3613            validate_row_constraints(ctx, &table, &row_values)?;
3614            self.ensure_primary_key_is_unique(ctx, &table, &[], &row_values)?;
3615            self.ensure_unique_secondary_indexes_are_unique(ctx, &table, &row_values)?;
3616            let inserted_row = StoredRow {
3617                row_id: 0,
3618                values: row_values,
3619            };
3620            write_stored_row(
3621                ctx,
3622                &self.statement.table,
3623                &table,
3624                inserted_row.clone(),
3625                false,
3626            )?;
3627            if let Some(new_row) = ctx.engine.lookup_row_by_primary_key(
3628                &table,
3629                &primary_key_values(&table, &inserted_row.values)?,
3630            )? {
3631                ctx.mutation_events.push(MutationEvent::Insert {
3632                    table_name: self.statement.table.clone(),
3633                    new_row,
3634                });
3635            }
3636        }
3637
3638        Ok(mutation_result(input_rows.len()))
3639    }
3640}
3641
3642#[derive(Debug, Clone)]
3643pub struct UpdateExecutor {
3644    pub statement: UpdateStatement,
3645    pub access_path: SelectAccessPath,
3646}
3647
3648impl UpdateExecutor {
3649    pub fn new(statement: UpdateStatement, access_path: SelectAccessPath) -> Self {
3650        Self {
3651            statement,
3652            access_path,
3653        }
3654    }
3655
3656    fn ensure_primary_keys_unique(&self, table: &Table, rows: &[Vec<Value>]) -> Result<()> {
3657        for i in 0..rows.len() {
3658            let left = primary_key_values(table, &rows[i])?;
3659            for right_row in rows.iter().skip(i + 1) {
3660                let right = primary_key_values(table, right_row)?;
3661                if left == right {
3662                    return Err(duplicate_primary_key_parse_error(&table.name, &left));
3663                }
3664            }
3665        }
3666
3667        Ok(())
3668    }
3669
3670    fn ensure_updated_primary_keys_remain_unique(
3671        &self,
3672        ctx: &mut ExecutionContext<'_>,
3673        table: &Table,
3674        updated_rows: &[StoredRow],
3675    ) -> Result<()> {
3676        self.ensure_primary_keys_unique(
3677            table,
3678            &updated_rows
3679                .iter()
3680                .map(|row| row.values.clone())
3681                .collect::<Vec<_>>(),
3682        )?;
3683
3684        for row in updated_rows {
3685            let candidate_pk = primary_key_values(table, &row.values)?;
3686            if let Some(existing_rowid) =
3687                ctx.engine.lookup_primary_key_rowid(table, &candidate_pk)?
3688            {
3689                if existing_rowid != row.row_id
3690                    && !updated_rows
3691                        .iter()
3692                        .any(|updated_row| updated_row.row_id == existing_rowid)
3693                {
3694                    return Err(duplicate_primary_key_parse_error(
3695                        &table.name,
3696                        &candidate_pk,
3697                    ));
3698                }
3699            }
3700        }
3701
3702        Ok(())
3703    }
3704
3705    fn ensure_updated_unique_indexes_remain_unique(
3706        &self,
3707        ctx: &mut ExecutionContext<'_>,
3708        table: &Table,
3709        updated_rows: &[StoredRow],
3710    ) -> Result<()> {
3711        let mut encoded_keys = std::collections::HashSet::new();
3712
3713        for index in table.secondary_indexes.iter().filter(|index| index.unique) {
3714            encoded_keys.clear();
3715            for row in updated_rows {
3716                let key_values = secondary_index_key_values(index, &row.values);
3717                let encoded_key = ctx.engine.encode_secondary_index_key(&key_values)?;
3718                if !encoded_keys.insert(encoded_key) {
3719                    return Err(unique_index_parse_error(&index.name, &table.name));
3720                }
3721            }
3722
3723            for row in updated_rows {
3724                let key_values = secondary_index_key_values(index, &row.values);
3725                let existing_rowids =
3726                    ctx.engine
3727                        .lookup_secondary_index_rowids(table, &index.name, &key_values)?;
3728                if existing_rowids.into_iter().any(|existing_rowid| {
3729                    existing_rowid != row.row_id
3730                        && !updated_rows
3731                            .iter()
3732                            .any(|updated_row| updated_row.row_id == existing_rowid)
3733                }) {
3734                    return Err(unique_index_parse_error(&index.name, &table.name));
3735                }
3736            }
3737        }
3738
3739        Ok(())
3740    }
3741}
3742
3743impl QueryExecutor for UpdateExecutor {
3744    fn execute(&mut self, ctx: &mut ExecutionContext<'_>) -> Result<QueryResult> {
3745        validate_statement(&Statement::Update(self.statement.clone()), &ctx.catalog)?;
3746
3747        let table = catalog_table(ctx, &self.statement.table)?;
3748        let LocatedMutationRows {
3749            sources,
3750            stored_rows: original_rows,
3751            joined_rows,
3752        } = locate_mutation_rows(
3753            ctx,
3754            &table,
3755            &self.statement.table,
3756            self.statement.target_binding_name(),
3757            self.statement.source.as_ref(),
3758            self.statement.where_clause.clone(),
3759            &self.access_path,
3760        )?;
3761        let locator_statement =
3762            locator_select_statement(self.statement.source(), self.statement.where_clause.clone());
3763        let select_executor = SelectExecutor::new(locator_statement, self.access_path.clone());
3764        let original_rows_snapshot = original_rows.clone();
3765        let mut updated_rows_data = Vec::with_capacity(original_rows.len());
3766        let mut updated_rows = 0usize;
3767        let mut subquery_cache = SubqueryCache::new();
3768        let row_contexts = joined_rows.as_deref();
3769
3770        for (index, stored_row) in original_rows.into_iter().enumerate() {
3771            let mut updated_row = stored_row.values.clone();
3772            for assignment in &self.statement.assignments {
3773                let column_index = table.get_column_index(&assignment.column).ok_or_else(|| {
3774                    HematiteError::ParseError(format!(
3775                        "Column '{}' does not exist in table '{}'",
3776                        assignment.column, self.statement.table
3777                    ))
3778                })?;
3779                let column = &table.columns[column_index];
3780                let value = {
3781                    let evaluation_row = row_contexts
3782                        .and_then(|rows| rows.get(index).map(Vec::as_slice))
3783                        .unwrap_or(updated_row.as_slice());
3784                    select_executor.evaluate_expression(
3785                        ctx,
3786                        &mut subquery_cache,
3787                        &sources,
3788                        &assignment.value,
3789                        evaluation_row,
3790                    )?
3791                };
3792                updated_row[column_index] = coerce_column_value(column, value)?;
3793            }
3794
3795            table
3796                .validate_row(&updated_row)
3797                .map_err(|err| HematiteError::ParseError(err.to_string()))?;
3798            validate_row_constraints(ctx, &table, &updated_row)?;
3799            if parent_reference_key_changed(ctx, &table, &stored_row.values, &updated_row)? {
3800                apply_parent_update_foreign_key_actions(
3801                    ctx,
3802                    &table,
3803                    &stored_row.values,
3804                    &updated_row,
3805                )?;
3806            }
3807            updated_rows_data.push(StoredRow {
3808                row_id: stored_row.row_id,
3809                values: updated_row,
3810            });
3811            updated_rows += 1;
3812        }
3813
3814        self.ensure_updated_primary_keys_remain_unique(ctx, &table, &updated_rows_data)?;
3815        self.ensure_updated_unique_indexes_remain_unique(ctx, &table, &updated_rows_data)?;
3816
3817        for original_row in &updated_rows_data {
3818            remove_stored_row(ctx, &self.statement.table, &table, original_row.row_id)?;
3819        }
3820
3821        for row in &updated_rows_data {
3822            write_stored_row(ctx, &self.statement.table, &table, row.clone(), true)?;
3823        }
3824
3825        for (old_row, new_row) in original_rows_snapshot
3826            .into_iter()
3827            .zip(updated_rows_data.into_iter())
3828        {
3829            ctx.mutation_events.push(MutationEvent::Update {
3830                table_name: self.statement.table.clone(),
3831                old_row,
3832                new_row,
3833            });
3834        }
3835
3836        Ok(mutation_result(updated_rows))
3837    }
3838}
3839
3840#[derive(Debug, Clone)]
3841pub struct DeleteExecutor {
3842    pub statement: DeleteStatement,
3843    pub access_path: SelectAccessPath,
3844}
3845
3846impl DeleteExecutor {
3847    pub fn new(statement: DeleteStatement, access_path: SelectAccessPath) -> Self {
3848        Self {
3849            statement,
3850            access_path,
3851        }
3852    }
3853}
3854
3855struct LocatedMutationRows {
3856    sources: Vec<ResolvedSource>,
3857    stored_rows: Vec<StoredRow>,
3858    joined_rows: Option<Vec<Vec<Value>>>,
3859}
3860
3861fn uses_join_mutation_source(source: Option<&TableReference>) -> bool {
3862    matches!(source, Some(source) if !matches!(source, TableReference::Table(_, _)))
3863}
3864
3865fn locate_rowids_for_access_path(
3866    ctx: &mut ExecutionContext<'_>,
3867    table: &Table,
3868    table_name: &str,
3869    access_path: &SelectAccessPath,
3870    select_executor: &SelectExecutor,
3871) -> Result<Vec<u64>> {
3872    match access_path {
3873        SelectAccessPath::JoinScan => Err(HematiteError::ParseError(
3874            "Join scans are not valid for UPDATE or DELETE locators".to_string(),
3875        )),
3876        SelectAccessPath::RowIdLookup => {
3877            Ok(select_executor.extract_rowid_lookup().into_iter().collect())
3878        }
3879        SelectAccessPath::PrimaryKeyLookup => {
3880            let Some(primary_key_values) = select_executor.extract_primary_key_lookup(table) else {
3881                return Ok(Vec::new());
3882            };
3883            let encoded_key = ctx.engine.encode_primary_key(&primary_key_values)?;
3884            let mut index_cursor = ctx.engine.open_primary_key_cursor(table)?;
3885            Ok(index_cursor
3886                .seek_key(&encoded_key)
3887                .then(|| index_cursor.current().map(|entry| entry.row_id))
3888                .flatten()
3889                .into_iter()
3890                .collect())
3891        }
3892        SelectAccessPath::SecondaryIndexLookup(index_name) => {
3893            let Some(key_values) =
3894                select_executor.extract_secondary_index_lookup(table, index_name)
3895            else {
3896                return Ok(Vec::new());
3897            };
3898            let encoded_key = ctx.engine.encode_secondary_index_key(&key_values)?;
3899            let mut index_cursor = ctx.engine.open_secondary_index_cursor(table, index_name)?;
3900            let mut rowids = Vec::new();
3901
3902            if index_cursor.seek_key(&encoded_key) {
3903                loop {
3904                    let Some(entry) = index_cursor.current() else {
3905                        break;
3906                    };
3907                    if entry.key.as_slice() != encoded_key.as_slice() {
3908                        break;
3909                    }
3910                    rowids.push(entry.row_id);
3911                    if !index_cursor.next() {
3912                        break;
3913                    }
3914                }
3915            }
3916
3917            Ok(rowids)
3918        }
3919        SelectAccessPath::FullTableScan => {
3920            let mut table_cursor = ctx.engine.open_table_cursor(table_name)?;
3921            let mut rowids = Vec::new();
3922            if table_cursor.first() {
3923                loop {
3924                    if let Some(row) = table_cursor.current() {
3925                        rowids.push(row.row_id);
3926                    }
3927                    if !table_cursor.next() {
3928                        break;
3929                    }
3930                }
3931            }
3932            Ok(rowids)
3933        }
3934    }
3935}
3936
3937fn locate_rows_for_access_path(
3938    ctx: &mut ExecutionContext<'_>,
3939    table: &Table,
3940    table_name: &str,
3941    access_path: &SelectAccessPath,
3942    select_executor: &SelectExecutor,
3943) -> Result<Vec<StoredRow>> {
3944    let rowids =
3945        locate_rowids_for_access_path(ctx, table, table_name, access_path, select_executor)?;
3946    let mut table_cursor = ctx.engine.open_table_cursor(table_name)?;
3947    let mut rows = Vec::new();
3948    let mut subquery_cache = SubqueryCache::new();
3949    let sources = select_executor.resolve_sources(ctx)?;
3950
3951    for rowid in rowids {
3952        if table_cursor.seek_rowid(rowid) {
3953            if let Some(row) = table_cursor.current() {
3954                let row = row.clone();
3955                let include = match &select_executor.statement.where_clause {
3956                    Some(where_clause) => select_executor.conditions_match(
3957                        ctx,
3958                        &mut subquery_cache,
3959                        &sources,
3960                        &where_clause.conditions,
3961                        &row.values,
3962                    )?,
3963                    None => true,
3964                };
3965
3966                if include {
3967                    rows.push(row);
3968                }
3969            }
3970        }
3971    }
3972
3973    Ok(rows)
3974}
3975
3976fn locate_rows_for_join_source(
3977    ctx: &mut ExecutionContext<'_>,
3978    table: &Table,
3979    target_binding: &str,
3980    select_executor: &mut SelectExecutor,
3981) -> Result<(Vec<ResolvedSource>, Vec<(StoredRow, Vec<Value>)>)> {
3982    let (sources, joined_rows) = select_executor.materialize_filtered_rows(ctx)?;
3983    let target_source = sources
3984        .iter()
3985        .find(|source| {
3986            source.name.eq_ignore_ascii_case(&table.name)
3987                && source
3988                    .alias
3989                    .as_deref()
3990                    .unwrap_or(&source.name)
3991                    .eq_ignore_ascii_case(target_binding)
3992        })
3993        .ok_or_else(|| {
3994            HematiteError::ParseError(format!(
3995                "Mutation target '{}' does not resolve to table '{}'",
3996                target_binding, table.name
3997            ))
3998        })?;
3999    let mut seen_rowids = std::collections::HashSet::new();
4000    let mut rows = Vec::new();
4001
4002    for joined_row in joined_rows {
4003        let Some(candidate_rowid) =
4004            target_rowid_from_join_row(ctx, table, target_source, &joined_row)?
4005        else {
4006            continue;
4007        };
4008
4009        if !seen_rowids.insert(candidate_rowid) {
4010            continue;
4011        }
4012
4013        if let Some(stored_row) = ctx
4014            .engine
4015            .lookup_row_by_rowid(&table.name, candidate_rowid)?
4016        {
4017            rows.push((stored_row, joined_row));
4018        }
4019    }
4020
4021    Ok((sources, rows))
4022}
4023
4024fn locate_mutation_rows(
4025    ctx: &mut ExecutionContext<'_>,
4026    table: &Table,
4027    table_name: &str,
4028    target_binding: &str,
4029    source: Option<&TableReference>,
4030    where_clause: Option<WhereClause>,
4031    access_path: &SelectAccessPath,
4032) -> Result<LocatedMutationRows> {
4033    let locator_statement = locator_select_statement(
4034        source
4035            .cloned()
4036            .unwrap_or_else(|| TableReference::Table(table_name.to_string(), None)),
4037        where_clause,
4038    );
4039    let mut select_executor = SelectExecutor::new(locator_statement, access_path.clone());
4040
4041    if uses_join_mutation_source(source) {
4042        let (sources, rows) =
4043            locate_rows_for_join_source(ctx, table, target_binding, &mut select_executor)?;
4044        let (stored_rows, joined_rows): (Vec<_>, Vec<_>) = rows.into_iter().unzip();
4045        Ok(LocatedMutationRows {
4046            sources,
4047            stored_rows,
4048            joined_rows: Some(joined_rows),
4049        })
4050    } else {
4051        let stored_rows =
4052            locate_rows_for_access_path(ctx, table, table_name, access_path, &select_executor)?;
4053        Ok(LocatedMutationRows {
4054            sources: select_executor.resolve_sources(ctx)?,
4055            stored_rows,
4056            joined_rows: None,
4057        })
4058    }
4059}
4060
4061fn target_rowid_from_join_row(
4062    ctx: &mut ExecutionContext<'_>,
4063    table: &Table,
4064    target_source: &ResolvedSource,
4065    joined_row: &[Value],
4066) -> Result<Option<u64>> {
4067    let mut primary_key = Vec::with_capacity(table.primary_key_columns.len());
4068    for &column_index in &table.primary_key_columns {
4069        let value = joined_row
4070            .get(target_source.offset + column_index)
4071            .cloned()
4072            .unwrap_or(Value::Null);
4073        if value.is_null() {
4074            return Ok(None);
4075        }
4076        primary_key.push(value);
4077    }
4078
4079    ctx.engine.lookup_primary_key_rowid(table, &primary_key)
4080}
4081
4082impl QueryExecutor for DeleteExecutor {
4083    fn execute(&mut self, ctx: &mut ExecutionContext) -> Result<QueryResult> {
4084        validate_statement(&Statement::Delete(self.statement.clone()), &ctx.catalog)?;
4085
4086        let table = catalog_table(ctx, &self.statement.table)?;
4087        let rows_to_delete = locate_mutation_rows(
4088            ctx,
4089            &table,
4090            &self.statement.table,
4091            self.statement.target_binding_name(),
4092            self.statement.source.as_ref(),
4093            self.statement.where_clause.clone(),
4094            &self.access_path,
4095        )?
4096        .stored_rows;
4097
4098        for row in &rows_to_delete {
4099            apply_parent_delete_foreign_key_actions(ctx, &table, &row.values)?;
4100            ctx.mutation_events.push(MutationEvent::Delete {
4101                table_name: self.statement.table.clone(),
4102                old_row: row.clone(),
4103            });
4104            remove_stored_row(ctx, &self.statement.table, &table, row.row_id)?;
4105        }
4106
4107        Ok(mutation_result(rows_to_delete.len()))
4108    }
4109}
4110
4111#[derive(Debug, Clone)]
4112pub struct CreateExecutor {
4113    pub statement: CreateStatement,
4114}
4115
4116impl CreateExecutor {
4117    pub fn new(statement: CreateStatement) -> Self {
4118        Self { statement }
4119    }
4120
4121    fn convert_column_definitions(&self) -> Result<Vec<Column>> {
4122        let mut columns = Vec::new();
4123        let mut next_id = 1;
4124
4125        for col_def in &self.statement.columns {
4126            let mut column = Column::new(
4127                crate::catalog::ColumnId::new(next_id),
4128                col_def.name.clone(),
4129                lower_type_name(col_def.data_type.clone()),
4130            )
4131            .character_set(col_def.character_set.clone())
4132            .collation(col_def.collation.clone())
4133            .nullable(col_def.nullable)
4134            .primary_key(col_def.primary_key)
4135            .auto_increment(col_def.auto_increment);
4136
4137            if let Some(default_val) = &col_def.default_value {
4138                let coerced_default =
4139                    coerce_column_value(&column, lower_literal_value(default_val))?;
4140                column = column.default_value(coerced_default);
4141            }
4142
4143            columns.push(column);
4144            next_id += 1;
4145        }
4146
4147        Ok(columns)
4148    }
4149
4150    fn unique_index_specs(&self) -> Result<Vec<(String, Vec<usize>)>> {
4151        let mut unique_indexes = self
4152            .statement
4153            .columns
4154            .iter()
4155            .enumerate()
4156            .filter_map(|(index, column)| {
4157                if column.unique && !column.primary_key {
4158                    Some((
4159                        auto_unique_index_name(&self.statement.table, &column.name, index),
4160                        vec![index],
4161                    ))
4162                } else {
4163                    None
4164                }
4165            })
4166            .collect::<Vec<_>>();
4167
4168        for (position, unique) in
4169            self.statement
4170                .constraints
4171                .iter()
4172                .enumerate()
4173                .filter_map(|(position, constraint)| match constraint {
4174                    TableConstraint::Unique(unique) => Some((position, unique)),
4175                    TableConstraint::Check(_) | TableConstraint::ForeignKey(_) => None,
4176                })
4177        {
4178            let column_indices = unique
4179                .columns
4180                .iter()
4181                .map(|column_name| {
4182                    self.statement
4183                        .columns
4184                        .iter()
4185                        .position(|column| column.name == *column_name)
4186                        .ok_or_else(|| {
4187                            HematiteError::ParseError(format!(
4188                                "UNIQUE constraint column '{}' does not exist in table '{}'",
4189                                column_name, self.statement.table
4190                            ))
4191                        })
4192                })
4193                .collect::<Result<Vec<_>>>()?;
4194            unique_indexes.push((
4195                unique_constraint_index_name(&self.statement.table, unique, position),
4196                column_indices,
4197            ));
4198        }
4199
4200        Ok(unique_indexes)
4201    }
4202
4203    fn constraints(&self, table: &Table) -> Result<CreateConstraints> {
4204        let check_constraints =
4205            self.statement
4206                .columns
4207                .iter()
4208                .filter_map(|column| column.check_constraint.as_ref())
4209                .map(Self::clone_check_constraint)
4210                .chain(self.statement.constraints.iter().filter_map(
4211                    |constraint| match constraint {
4212                        TableConstraint::Check(constraint) => {
4213                            Some(Self::clone_check_constraint(constraint))
4214                        }
4215                        TableConstraint::Unique(_) | TableConstraint::ForeignKey(_) => None,
4216                    },
4217                ))
4218                .collect();
4219
4220        let foreign_keys =
4221            self.statement
4222                .columns
4223                .iter()
4224                .filter_map(|column| column.references.as_ref())
4225                .chain(self.statement.constraints.iter().filter_map(
4226                    |constraint| match constraint {
4227                        TableConstraint::Check(_) | TableConstraint::Unique(_) => None,
4228                        TableConstraint::ForeignKey(foreign_key) => Some(foreign_key),
4229                    },
4230                ))
4231                .map(|foreign_key| self.convert_foreign_key(table, foreign_key))
4232                .collect::<Result<Vec<_>>>()?;
4233
4234        Ok(CreateConstraints {
4235            check_constraints,
4236            foreign_keys,
4237        })
4238    }
4239
4240    fn clone_check_constraint(constraint: &CheckConstraintDefinition) -> CheckConstraint {
4241        CheckConstraint {
4242            name: constraint.name.clone(),
4243            expression_sql: constraint.expression_sql.clone(),
4244        }
4245    }
4246
4247    fn convert_foreign_key(
4248        &self,
4249        table: &Table,
4250        foreign_key: &ForeignKeyDefinition,
4251    ) -> Result<ForeignKeyConstraint> {
4252        let column_indices = foreign_key
4253            .columns
4254            .iter()
4255            .map(|column_name| {
4256                table.get_column_index(column_name).ok_or_else(|| {
4257                    HematiteError::ParseError(format!(
4258                        "Foreign key column '{}' does not exist in table '{}'",
4259                        column_name, table.name
4260                    ))
4261                })
4262            })
4263            .collect::<Result<Vec<_>>>()?;
4264        Ok(ForeignKeyConstraint {
4265            name: foreign_key.name.clone(),
4266            column_indices,
4267            referenced_table: foreign_key.referenced_table.clone(),
4268            referenced_columns: foreign_key.referenced_columns.clone(),
4269            on_delete: convert_foreign_key_action(foreign_key.on_delete),
4270            on_update: convert_foreign_key_action(foreign_key.on_update),
4271        })
4272    }
4273}
4274
/// CHECK and FOREIGN KEY constraints gathered from a `CREATE TABLE` statement, in catalog form.
struct CreateConstraints {
    /// CHECK constraints from both column-level and table-level declarations.
    check_constraints: Vec<CheckConstraint>,
    /// Foreign keys whose referencing columns are already resolved to column indices.
    foreign_keys: Vec<ForeignKeyConstraint>,
}
4279
4280impl QueryExecutor for CreateExecutor {
4281    fn execute(&mut self, ctx: &mut ExecutionContext) -> Result<QueryResult> {
4282        validate_statement(&Statement::Create(self.statement.clone()), &ctx.catalog)?;
4283        if self.statement.if_not_exists
4284            && ctx
4285                .catalog
4286                .get_table_by_name(&self.statement.table)
4287                .is_some()
4288        {
4289            return Ok(mutation_result(0));
4290        }
4291
4292        let columns = self.convert_column_definitions()?;
4293
4294        // Create storage structures first so the catalog only persists the final roots once.
4295        let root_page_id = ctx.engine.create_table(&self.statement.table)?;
4296        let primary_key_root_page_id = ctx.engine.create_empty_btree()?;
4297        ctx.catalog.create_table_with_roots(
4298            self.statement.table.clone(),
4299            columns,
4300            root_page_id,
4301            primary_key_root_page_id,
4302        )?;
4303
4304        let table = ctx
4305            .catalog
4306            .get_table_by_name(&self.statement.table)
4307            .ok_or_else(|| table_disappeared_internal_error(&self.statement.table, "CREATE TABLE"))?
4308            .clone();
4309        let constraints = self.constraints(&table)?;
4310        for (index_name, column_indices) in self.unique_index_specs()? {
4311            let unique_index_root_page_id = ctx.engine.create_empty_btree()?;
4312            ctx.catalog.add_secondary_index(
4313                table.id,
4314                crate::catalog::SecondaryIndex {
4315                    name: index_name,
4316                    column_indices,
4317                    root_page_id: unique_index_root_page_id,
4318                    unique: true,
4319                },
4320            )?;
4321        }
4322        for constraint in constraints.check_constraints {
4323            ctx.catalog.add_check_constraint(table.id, constraint)?;
4324        }
4325        for foreign_key in constraints.foreign_keys {
4326            ctx.catalog.add_foreign_key(table.id, foreign_key)?;
4327        }
4328
4329        Ok(QueryResult {
4330            affected_rows: 0,
4331            columns: Vec::new(),
4332            rows: Vec::new(),
4333        })
4334    }
4335}
4336
4337#[derive(Debug, Clone)]
4338pub struct DropExecutor {
4339    pub statement: DropStatement,
4340}
4341
4342impl DropExecutor {
4343    pub fn new(statement: DropStatement) -> Self {
4344        Self { statement }
4345    }
4346}
4347
4348impl QueryExecutor for DropExecutor {
4349    fn execute(&mut self, ctx: &mut ExecutionContext<'_>) -> Result<QueryResult> {
4350        validate_statement(&Statement::Drop(self.statement.clone()), &ctx.catalog)?;
4351        if self.statement.if_exists
4352            && ctx
4353                .catalog
4354                .get_table_by_name(&self.statement.table)
4355                .is_none()
4356        {
4357            return Ok(mutation_result(0));
4358        }
4359
4360        let table = catalog_table(ctx, &self.statement.table)?;
4361
4362        ctx.engine.drop_table_with_indexes(&table)?;
4363        ctx.catalog
4364            .drop_table(table.id)
4365            .map_err(|err| HematiteError::ParseError(err.to_string()))?;
4366
4367        Ok(QueryResult {
4368            affected_rows: 0,
4369            columns: Vec::new(),
4370            rows: Vec::new(),
4371        })
4372    }
4373}
4374
4375#[derive(Debug, Clone)]
4376pub struct AlterExecutor {
4377    pub statement: AlterStatement,
4378}
4379
4380impl AlterExecutor {
4381    pub fn new(statement: AlterStatement) -> Self {
4382        Self { statement }
4383    }
4384}
4385
4386impl QueryExecutor for AlterExecutor {
4387    fn execute(&mut self, ctx: &mut ExecutionContext<'_>) -> Result<QueryResult> {
4388        validate_statement(&Statement::Alter(self.statement.clone()), &ctx.catalog)?;
4389
4390        match &self.statement.operation {
4391            AlterOperation::RenameTo(new_name) => {
4392                let table = catalog_table(ctx, &self.statement.table)?;
4393                ctx.catalog.rename_table(table.id, new_name.clone())?;
4394                ctx.engine
4395                    .rename_table_runtime_metadata(&self.statement.table, new_name)?;
4396            }
4397            AlterOperation::RenameColumn { old_name, new_name } => {
4398                let table = catalog_table(ctx, &self.statement.table)?;
4399                ctx.catalog
4400                    .rename_column(table.id, old_name, new_name.clone())?;
4401            }
4402            AlterOperation::AddColumn(column_def) => {
4403                let table = catalog_table(ctx, &self.statement.table)?;
4404
4405                let column = Column::new(
4406                    crate::catalog::ColumnId::new(ctx.catalog.next_column_id()),
4407                    column_def.name.clone(),
4408                    lower_type_name(column_def.data_type.clone()),
4409                )
4410                .character_set(column_def.character_set.clone())
4411                .collation(column_def.collation.clone())
4412                .nullable(column_def.nullable)
4413                .primary_key(column_def.primary_key);
4414                let column = if let Some(default_value) = &column_def.default_value {
4415                    let coerced_default =
4416                        coerce_column_value(&column, lower_literal_value(default_value))?;
4417                    column.default_value(coerced_default)
4418                } else {
4419                    column
4420                };
4421
4422                let fill_value = column.get_default_or_null();
4423                let mut rows = ctx.engine.read_rows_with_ids(&self.statement.table)?;
4424                for row in &mut rows {
4425                    row.values.push(fill_value.clone());
4426                }
4427
4428                ctx.catalog.add_column(table.id, column)?;
4429                ctx.engine.replace_table_rows(&self.statement.table, rows)?;
4430            }
4431            AlterOperation::AddConstraint(constraint) => {
4432                let table = catalog_table(ctx, &self.statement.table)?;
4433                match constraint {
4434                    TableConstraint::Check(check) => {
4435                        ctx.catalog.add_check_constraint(
4436                            table.id,
4437                            CheckConstraint {
4438                                name: check.name.clone(),
4439                                expression_sql: check.expression_sql.clone(),
4440                            },
4441                        )?;
4442                    }
4443                    TableConstraint::Unique(unique) => {
4444                        let root_page_id = ctx.engine.create_empty_btree()?;
4445                        let column_indices = unique
4446                            .columns
4447                            .iter()
4448                            .map(|column_name| {
4449                                table.get_column_index(column_name).ok_or_else(|| {
4450                                    HematiteError::ParseError(format!(
4451                                        "UNIQUE constraint column '{}' does not exist in table '{}'",
4452                                        column_name, self.statement.table
4453                                    ))
4454                                })
4455                            })
4456                            .collect::<Result<Vec<_>>>()?;
4457                        ctx.catalog.add_secondary_index(
4458                            table.id,
4459                            crate::catalog::SecondaryIndex {
4460                                name: unique.name.clone().ok_or_else(|| {
4461                                    HematiteError::InternalError(
4462                                        "validated UNIQUE constraint lost its name".to_string(),
4463                                    )
4464                                })?,
4465                                column_indices,
4466                                root_page_id,
4467                                unique: true,
4468                            },
4469                        )?;
4470                        let updated_table = ctx
4471                            .catalog
4472                            .get_table(table.id)
4473                            .ok_or_else(|| {
4474                                table_disappeared_internal_error(
4475                                    &self.statement.table,
4476                                    "adding unique constraint",
4477                                )
4478                            })?
4479                            .clone();
4480                        let rows = ctx.engine.read_rows_with_ids(&self.statement.table)?;
4481                        ctx.engine
4482                            .rebuild_secondary_indexes(&updated_table, &rows)?;
4483                    }
4484                    TableConstraint::ForeignKey(foreign_key) => {
4485                        let column_indices = foreign_key
4486                            .columns
4487                            .iter()
4488                            .map(|column_name| {
4489                                table.get_column_index(column_name).ok_or_else(|| {
4490                                    HematiteError::ParseError(format!(
4491                                        "Foreign key column '{}' does not exist in table '{}'",
4492                                        column_name, self.statement.table
4493                                    ))
4494                                })
4495                            })
4496                            .collect::<Result<Vec<_>>>()?;
4497                        ctx.catalog.add_foreign_key(
4498                            table.id,
4499                            ForeignKeyConstraint {
4500                                name: foreign_key.name.clone(),
4501                                column_indices,
4502                                referenced_table: foreign_key.referenced_table.clone(),
4503                                referenced_columns: foreign_key.referenced_columns.clone(),
4504                                on_delete: convert_foreign_key_action(foreign_key.on_delete),
4505                                on_update: convert_foreign_key_action(foreign_key.on_update),
4506                            },
4507                        )?;
4508                    }
4509                }
4510            }
4511            AlterOperation::DropColumn(column_name) => {
4512                let table = catalog_table(ctx, &self.statement.table)?;
4513                let column_index = table.get_column_index(column_name).ok_or_else(|| {
4514                    HematiteError::InternalError(format!(
4515                        "Column '{}' disappeared during ALTER TABLE DROP COLUMN",
4516                        column_name
4517                    ))
4518                })?;
4519                let mut rows = ctx.engine.read_rows_with_ids(&self.statement.table)?;
4520                for row in &mut rows {
4521                    row.values.remove(column_index);
4522                }
4523
4524                ctx.catalog.drop_column(table.id, column_name)?;
4525                ctx.engine.replace_table_rows(&self.statement.table, rows)?;
4526            }
4527            AlterOperation::DropConstraint(constraint_name) => {
4528                let table = catalog_table(ctx, &self.statement.table)?;
4529                if let Some(index) = table.get_secondary_index(constraint_name) {
4530                    if index.unique {
4531                        ctx.engine.delete_tree(index.root_page_id)?;
4532                        ctx.catalog
4533                            .drop_secondary_index(table.id, constraint_name)?;
4534                    } else {
4535                        return Err(HematiteError::ParseError(format!(
4536                            "Constraint '{}' is not a droppable UNIQUE constraint",
4537                            constraint_name
4538                        )));
4539                    }
4540                } else {
4541                    ctx.catalog
4542                        .drop_named_constraint(table.id, constraint_name)?;
4543                }
4544            }
4545            AlterOperation::AlterColumnSetDefault {
4546                column_name,
4547                default_value,
4548            } => {
4549                let table = catalog_table(ctx, &self.statement.table)?;
4550                let column = table.get_column_by_name(column_name).ok_or_else(|| {
4551                    HematiteError::ParseError(format!(
4552                        "Column '{}' does not exist in table '{}'",
4553                        column_name, self.statement.table
4554                    ))
4555                })?;
4556                ctx.catalog.set_column_default(
4557                    table.id,
4558                    column_name,
4559                    Some(coerce_column_value(
4560                        column,
4561                        lower_literal_value(default_value),
4562                    )?),
4563                )?;
4564            }
4565            AlterOperation::AlterColumnDropDefault { column_name } => {
4566                let table = catalog_table(ctx, &self.statement.table)?;
4567                ctx.catalog
4568                    .set_column_default(table.id, column_name, None)?;
4569            }
4570            AlterOperation::AlterColumnSetNotNull { column_name } => {
4571                let table = catalog_table(ctx, &self.statement.table)?;
4572                let column_index = table.get_column_index(column_name).ok_or_else(|| {
4573                    HematiteError::InternalError(format!(
4574                        "Column '{}' disappeared during ALTER COLUMN SET NOT NULL",
4575                        column_name
4576                    ))
4577                })?;
4578                let rows = ctx.engine.read_from_table(&self.statement.table)?;
4579                if rows
4580                    .iter()
4581                    .any(|row| row.get(column_index).is_some_and(Value::is_null))
4582                {
4583                    return Err(HematiteError::ParseError(format!(
4584                        "Cannot set column '{}' to NOT NULL because existing rows contain NULL",
4585                        column_name
4586                    )));
4587                }
4588                ctx.catalog
4589                    .set_column_nullable(table.id, column_name, false)?;
4590            }
4591            AlterOperation::AlterColumnDropNotNull { column_name } => {
4592                let table = catalog_table(ctx, &self.statement.table)?;
4593                ctx.catalog
4594                    .set_column_nullable(table.id, column_name, true)?;
4595            }
4596        }
4597
4598        Ok(QueryResult {
4599            affected_rows: 0,
4600            columns: Vec::new(),
4601            rows: Vec::new(),
4602        })
4603    }
4604}
4605
4606#[derive(Debug, Clone)]
4607pub struct CreateIndexExecutor {
4608    pub statement: CreateIndexStatement,
4609}
4610
4611impl CreateIndexExecutor {
4612    pub fn new(statement: CreateIndexStatement) -> Self {
4613        Self { statement }
4614    }
4615}
4616
4617impl QueryExecutor for CreateIndexExecutor {
4618    fn execute(&mut self, ctx: &mut ExecutionContext<'_>) -> Result<QueryResult> {
4619        validate_statement(
4620            &Statement::CreateIndex(self.statement.clone()),
4621            &ctx.catalog,
4622        )?;
4623        if self.statement.if_not_exists {
4624            if let Some(table) = ctx.catalog.get_table_by_name(&self.statement.table) {
4625                if table
4626                    .get_secondary_index(&self.statement.index_name)
4627                    .is_some()
4628                {
4629                    return Ok(mutation_result(0));
4630                }
4631            }
4632        }
4633
4634        let table = catalog_table(ctx, &self.statement.table)?;
4635
4636        let column_indices = self
4637            .statement
4638            .columns
4639            .iter()
4640            .map(|column_name| {
4641                table.get_column_index(column_name).ok_or_else(|| {
4642                    HematiteError::ParseError(format!(
4643                        "Column '{}' does not exist in table '{}'",
4644                        column_name, self.statement.table
4645                    ))
4646                })
4647            })
4648            .collect::<Result<Vec<_>>>()?;
4649
4650        let root_page_id = ctx.engine.create_empty_btree()?;
4651        ctx.catalog.add_secondary_index(
4652            table.id,
4653            crate::catalog::SecondaryIndex {
4654                name: self.statement.index_name.clone(),
4655                column_indices,
4656                root_page_id,
4657                unique: self.statement.unique,
4658            },
4659        )?;
4660
4661        let updated_table = ctx
4662            .catalog
4663            .get_table(table.id)
4664            .ok_or_else(|| {
4665                table_disappeared_internal_error(
4666                    &self.statement.table,
4667                    &format!("creating index '{}'", self.statement.index_name),
4668                )
4669            })?
4670            .clone();
4671        let rows = ctx.engine.read_rows_with_ids(&self.statement.table)?;
4672        ctx.engine
4673            .rebuild_secondary_indexes(&updated_table, &rows)?;
4674
4675        Ok(QueryResult {
4676            affected_rows: 0,
4677            columns: Vec::new(),
4678            rows: Vec::new(),
4679        })
4680    }
4681}
4682
4683fn secondary_index_key_values(
4684    index: &crate::catalog::SecondaryIndex,
4685    row_values: &[Value],
4686) -> Vec<Value> {
4687    index
4688        .column_indices
4689        .iter()
4690        .map(|&column_index| row_values[column_index].clone())
4691        .collect()
4692}
4693
4694fn mutation_result(affected_rows: usize) -> QueryResult {
4695    QueryResult {
4696        affected_rows,
4697        columns: Vec::new(),
4698        rows: Vec::new(),
4699    }
4700}
4701
4702fn duplicate_primary_key_parse_error(table_name: &str, key_values: &[Value]) -> HematiteError {
4703    HematiteError::ParseError(format!(
4704        "Duplicate primary key for table '{}': {:?}",
4705        table_name, key_values
4706    ))
4707}
4708
4709fn table_not_found_parse_error(table_name: &str) -> HematiteError {
4710    HematiteError::ParseError(format!("Table '{}' not found", table_name))
4711}
4712
4713fn table_disappeared_internal_error(table_name: &str, operation: &str) -> HematiteError {
4714    HematiteError::InternalError(format!(
4715        "Table '{}' disappeared during {}",
4716        table_name, operation
4717    ))
4718}
4719
4720fn catalog_table(ctx: &ExecutionContext<'_>, table_name: &str) -> Result<Table> {
4721    ctx.catalog
4722        .get_table_by_name(table_name)
4723        .cloned()
4724        .ok_or_else(|| table_not_found_parse_error(table_name))
4725}
4726
4727fn current_table_row_counts(engine: &crate::catalog::CatalogEngine) -> HashMap<String, usize> {
4728    engine
4729        .get_table_metadata()
4730        .iter()
4731        .map(|(name, metadata)| (name.clone(), metadata.row_count as usize))
4732        .collect()
4733}
4734
4735fn apply_set_operation(
4736    operator: SetOperator,
4737    mut left_rows: Vec<Vec<Value>>,
4738    right_rows: Vec<Vec<Value>>,
4739) -> Vec<Vec<Value>> {
4740    match operator {
4741        SetOperator::UnionAll => {
4742            left_rows.extend(right_rows);
4743            left_rows
4744        }
4745        SetOperator::Union => {
4746            left_rows.extend(right_rows);
4747            apply_distinct_if_needed(true, &mut left_rows);
4748            left_rows
4749        }
4750        SetOperator::Intersect => {
4751            apply_distinct_if_needed(true, &mut left_rows);
4752            let mut distinct_right = right_rows;
4753            apply_distinct_if_needed(true, &mut distinct_right);
4754            left_rows
4755                .into_iter()
4756                .filter(|row| distinct_right.contains(row))
4757                .collect()
4758        }
4759        SetOperator::Except => {
4760            apply_distinct_if_needed(true, &mut left_rows);
4761            let mut distinct_right = right_rows;
4762            apply_distinct_if_needed(true, &mut distinct_right);
4763            left_rows
4764                .into_iter()
4765                .filter(|row| !distinct_right.contains(row))
4766                .collect()
4767        }
4768    }
4769}
4770
4771fn primary_key_values(table: &Table, row: &[Value]) -> Result<Vec<Value>> {
4772    table.get_primary_key_values(row).map_err(|err| {
4773        HematiteError::ParseError(format!("Failed to extract primary key values: {}", err))
4774    })
4775}
4776
4777fn out_of_range_error(column: &Column, type_name: &str) -> HematiteError {
4778    HematiteError::ParseError(format!(
4779        "Type mismatch: column '{}' expects {}, got out-of-range value",
4780        column.name, type_name
4781    ))
4782}
4783
4784fn coerce_varchar_value(value: String, max_chars: u32, label: &str) -> Result<Value> {
4785    if value.chars().count() > max_chars as usize {
4786        return Err(HematiteError::ParseError(format!(
4787            "{} exceeds declared character length {}",
4788            label, max_chars
4789        )));
4790    }
4791    Ok(Value::Text(value))
4792}
4793
4794fn coerce_char_value(value: String, length: u32, label: &str) -> Result<Value> {
4795    if value.chars().count() > length as usize {
4796        return Err(HematiteError::ParseError(format!(
4797            "{} exceeds declared character length {}",
4798            label, length
4799        )));
4800    }
4801    Ok(Value::Text(pad_text_to_char_length(&value, length)))
4802}
4803
4804fn cast_value_to_text_string(value: Value) -> Result<String> {
4805    match value {
4806        Value::Integer(value) => Ok(value.to_string()),
4807        Value::BigInt(value) => Ok(value.to_string()),
4808        Value::Int128(value) => Ok(value.to_string()),
4809        Value::UInteger(value) => Ok(value.to_string()),
4810        Value::UBigInt(value) => Ok(value.to_string()),
4811        Value::UInt128(value) => Ok(value.to_string()),
4812        Value::Float32(value) => Ok(value.to_string()),
4813        Value::Float(value) => Ok(value.to_string()),
4814        Value::Enum(value) | Value::Text(value) => Ok(value),
4815        Value::Boolean(true) => Ok("TRUE".to_string()),
4816        Value::Boolean(false) => Ok("FALSE".to_string()),
4817        Value::Decimal(value) => Ok(value.to_string()),
4818        Value::Date(value) => Ok(value.to_string()),
4819        Value::Time(value) => Ok(value.to_string()),
4820        Value::DateTime(value) => Ok(value.to_string()),
4821        Value::TimeWithTimeZone(value) => Ok(value.to_string()),
4822        Value::Blob(value) => Ok(String::from_utf8_lossy(&value).into_owned()),
4823        Value::Null => Err(HematiteError::ParseError(
4824            "Cannot CAST NULL to text without preserving NULL".to_string(),
4825        )),
4826        Value::IntervalYearMonth(value) => Ok(value.to_string()),
4827        Value::IntervalDaySecond(value) => Ok(value.to_string()),
4828    }
4829}
4830
4831fn coerce_binary_value(value: Value, max_len: u32, label: &str, fixed: bool) -> Result<Value> {
4832    let mut bytes = match value {
4833        Value::Blob(bytes) => bytes,
4834        Value::Text(value) => value.into_bytes(),
4835        Value::Enum(value) => value.into_bytes(),
4836        value => {
4837            return Err(HematiteError::ParseError(format!(
4838                "Expected binary-compatible value for {}, found {:?}",
4839                label, value
4840            )))
4841        }
4842    };
4843
4844    if bytes.len() > max_len as usize {
4845        return Err(HematiteError::ParseError(format!(
4846            "{} exceeds declared byte length {}",
4847            label, max_len
4848        )));
4849    }
4850    if fixed {
4851        bytes.resize(max_len as usize, 0);
4852    }
4853    Ok(Value::Blob(bytes))
4854}
4855
4856fn coerce_decimal_value(value: Value) -> Result<DecimalValue> {
4857    match value {
4858        Value::Decimal(value) => Ok(value),
4859        Value::Integer(value) => Ok(DecimalValue::from_i32(value)),
4860        Value::BigInt(value) => Ok(DecimalValue::from_i64(value)),
4861        Value::Int128(value) => Ok(DecimalValue::from_i128(value)),
4862        Value::UInteger(value) => Ok(DecimalValue::from_u32(value)),
4863        Value::UBigInt(value) => Ok(DecimalValue::from_u64(value)),
4864        Value::UInt128(value) => Ok(DecimalValue::from_u128(value)),
4865        Value::Float32(value) => DecimalValue::from_f64(value as f64),
4866        Value::Float(value) => DecimalValue::from_f64(value),
4867        Value::Text(value) => DecimalValue::parse(&value),
4868        value => Err(HematiteError::ParseError(format!(
4869            "Expected DECIMAL-compatible value, found {:?}",
4870            value
4871        ))),
4872    }
4873}
4874
4875fn numeric_value_as_f64(value: &Value) -> Option<f64> {
4876    match value {
4877        Value::Integer(value) => Some(*value as f64),
4878        Value::BigInt(value) => Some(*value as f64),
4879        Value::Int128(value) => Some(*value as f64),
4880        Value::UInteger(value) => Some(*value as f64),
4881        Value::UBigInt(value) => Some(*value as f64),
4882        Value::UInt128(value) => Some(*value as f64),
4883        Value::Float32(value) => Some(*value as f64),
4884        Value::Float(value) => Some(*value),
4885        Value::Decimal(value) => value.to_f64(),
4886        _ => None,
4887    }
4888}
4889
4890fn make_float_value(data_type: &DataType, value: f64) -> Value {
4891    match data_type {
4892        DataType::Float32 => Value::Float32(value as f32),
4893        DataType::Float => Value::Float(value),
4894        _ => unreachable!("non-float type used for float value construction"),
4895    }
4896}
4897
4898fn float_type_name(data_type: &DataType) -> &'static str {
4899    match data_type {
4900        DataType::Float32 => "FLOAT32",
4901        DataType::Float => "FLOAT",
4902        _ => unreachable!("non-float type used for float naming"),
4903    }
4904}
4905
4906fn coerce_enum_value(value: Value, variants: &[String], label: &str) -> Result<Value> {
4907    let value = match value {
4908        Value::Enum(value) | Value::Text(value) => value,
4909        value => {
4910            return Err(HematiteError::ParseError(format!(
4911                "Expected ENUM-compatible value for {}, found {:?}",
4912                label, value
4913            )))
4914        }
4915    };
4916
4917    if !variants.contains(&value) {
4918        return Err(HematiteError::ParseError(format!(
4919            "{} is not a valid ENUM variant",
4920            value
4921        )));
4922    }
4923
4924    Ok(Value::Enum(value))
4925}
4926
4927fn coerce_column_value(column: &Column, value: Value) -> Result<Value> {
4928    match (&column.data_type, value) {
4929        (DataType::Int8, Value::Integer(i)) => i8::try_from(i)
4930            .map(|_| Value::Integer(i))
4931            .map_err(|_| out_of_range_error(column, "INT8")),
4932        (DataType::Int8, Value::BigInt(i)) => i8::try_from(i)
4933            .map(|value| Value::Integer(value as i32))
4934            .map_err(|_| out_of_range_error(column, "INT8")),
4935        (DataType::Int8, Value::Int128(i)) => i8::try_from(i)
4936            .map(|value| Value::Integer(value as i32))
4937            .map_err(|_| out_of_range_error(column, "INT8")),
4938        (DataType::Int8, Value::UInteger(i)) => i8::try_from(i)
4939            .map(|value| Value::Integer(value as i32))
4940            .map_err(|_| out_of_range_error(column, "INT8")),
4941        (DataType::Int8, Value::UBigInt(i)) => i8::try_from(i)
4942            .map(|value| Value::Integer(value as i32))
4943            .map_err(|_| out_of_range_error(column, "INT8")),
4944        (DataType::Int8, Value::UInt128(i)) => i8::try_from(i)
4945            .map(|value| Value::Integer(value as i32))
4946            .map_err(|_| out_of_range_error(column, "INT8")),
4947        (DataType::Int16, Value::Integer(i)) => i16::try_from(i)
4948            .map(|_| Value::Integer(i))
4949            .map_err(|_| out_of_range_error(column, "INT16")),
4950        (DataType::Int16, Value::BigInt(i)) => i16::try_from(i)
4951            .map(|value| Value::Integer(value as i32))
4952            .map_err(|_| out_of_range_error(column, "INT16")),
4953        (DataType::Int16, Value::Int128(i)) => i16::try_from(i)
4954            .map(|value| Value::Integer(value as i32))
4955            .map_err(|_| out_of_range_error(column, "INT16")),
4956        (DataType::Int16, Value::UInteger(i)) => i16::try_from(i)
4957            .map(|value| Value::Integer(value as i32))
4958            .map_err(|_| out_of_range_error(column, "INT16")),
4959        (DataType::Int16, Value::UBigInt(i)) => i16::try_from(i)
4960            .map(|value| Value::Integer(value as i32))
4961            .map_err(|_| out_of_range_error(column, "INT16")),
4962        (DataType::Int16, Value::UInt128(i)) => i16::try_from(i)
4963            .map(|value| Value::Integer(value as i32))
4964            .map_err(|_| out_of_range_error(column, "INT16")),
4965        (DataType::Int, Value::Integer(i)) => Ok(Value::Integer(i)),
4966        (DataType::Int, Value::BigInt(i)) => i32::try_from(i)
4967            .map(Value::Integer)
4968            .map_err(|_| out_of_range_error(column, "INT")),
4969        (DataType::Int, Value::Int128(i)) => i32::try_from(i)
4970            .map(Value::Integer)
4971            .map_err(|_| out_of_range_error(column, "INT")),
4972        (DataType::Int, Value::UInteger(i)) => i32::try_from(i)
4973            .map(Value::Integer)
4974            .map_err(|_| out_of_range_error(column, "INT")),
4975        (DataType::Int, Value::UBigInt(i)) => i32::try_from(i)
4976            .map(Value::Integer)
4977            .map_err(|_| out_of_range_error(column, "INT")),
4978        (DataType::Int, Value::UInt128(i)) => i32::try_from(i)
4979            .map(Value::Integer)
4980            .map_err(|_| out_of_range_error(column, "INT")),
4981        (DataType::Int64, Value::Integer(i)) => Ok(Value::BigInt(i as i64)),
4982        (DataType::Int64, Value::BigInt(i)) => Ok(Value::BigInt(i)),
4983        (DataType::Int64, Value::Int128(i)) => i64::try_from(i)
4984            .map(Value::BigInt)
4985            .map_err(|_| out_of_range_error(column, "INT64")),
4986        (DataType::Int64, Value::UInteger(i)) => Ok(Value::BigInt(i as i64)),
4987        (DataType::Int64, Value::UBigInt(i)) => i64::try_from(i)
4988            .map(Value::BigInt)
4989            .map_err(|_| out_of_range_error(column, "INT64")),
4990        (DataType::Int64, Value::UInt128(i)) => i64::try_from(i)
4991            .map(Value::BigInt)
4992            .map_err(|_| out_of_range_error(column, "INT64")),
4993        (DataType::Int128, Value::Integer(i)) => Ok(Value::Int128(i as i128)),
4994        (DataType::Int128, Value::BigInt(i)) => Ok(Value::Int128(i as i128)),
4995        (DataType::Int128, Value::Int128(i)) => Ok(Value::Int128(i)),
4996        (DataType::Int128, Value::UInteger(i)) => Ok(Value::Int128(i as i128)),
4997        (DataType::Int128, Value::UBigInt(i)) => Ok(Value::Int128(i as i128)),
4998        (DataType::Int128, Value::UInt128(i)) => i128::try_from(i)
4999            .map(Value::Int128)
5000            .map_err(|_| out_of_range_error(column, "INT128")),
5001        (DataType::UInt8, Value::Integer(i)) if i >= 0 => u8::try_from(i)
5002            .map(|value| Value::UInteger(value as u32))
5003            .map_err(|_| out_of_range_error(column, "UINT8")),
5004        (DataType::UInt8, Value::BigInt(i)) if i >= 0 => u8::try_from(i)
5005            .map(|value| Value::UInteger(value as u32))
5006            .map_err(|_| out_of_range_error(column, "UINT8")),
5007        (DataType::UInt8, Value::Int128(i)) if i >= 0 => u8::try_from(i)
5008            .map(|value| Value::UInteger(value as u32))
5009            .map_err(|_| out_of_range_error(column, "UINT8")),
5010        (DataType::UInt8, Value::UInteger(i)) => u8::try_from(i)
5011            .map(|value| Value::UInteger(value as u32))
5012            .map_err(|_| out_of_range_error(column, "UINT8")),
5013        (DataType::UInt8, Value::UBigInt(i)) => u8::try_from(i)
5014            .map(|value| Value::UInteger(value as u32))
5015            .map_err(|_| out_of_range_error(column, "UINT8")),
5016        (DataType::UInt8, Value::UInt128(i)) => u8::try_from(i)
5017            .map(|value| Value::UInteger(value as u32))
5018            .map_err(|_| out_of_range_error(column, "UINT8")),
5019        (DataType::UInt16, Value::Integer(i)) if i >= 0 => u16::try_from(i)
5020            .map(|value| Value::UInteger(value as u32))
5021            .map_err(|_| out_of_range_error(column, "UINT16")),
5022        (DataType::UInt16, Value::BigInt(i)) if i >= 0 => u16::try_from(i)
5023            .map(|value| Value::UInteger(value as u32))
5024            .map_err(|_| out_of_range_error(column, "UINT16")),
5025        (DataType::UInt16, Value::Int128(i)) if i >= 0 => u16::try_from(i)
5026            .map(|value| Value::UInteger(value as u32))
5027            .map_err(|_| out_of_range_error(column, "UINT16")),
5028        (DataType::UInt16, Value::UInteger(i)) => u16::try_from(i)
5029            .map(|value| Value::UInteger(value as u32))
5030            .map_err(|_| out_of_range_error(column, "UINT16")),
5031        (DataType::UInt16, Value::UBigInt(i)) => u16::try_from(i)
5032            .map(|value| Value::UInteger(value as u32))
5033            .map_err(|_| out_of_range_error(column, "UINT16")),
5034        (DataType::UInt16, Value::UInt128(i)) => u16::try_from(i)
5035            .map(|value| Value::UInteger(value as u32))
5036            .map_err(|_| out_of_range_error(column, "UINT16")),
5037        (DataType::UInt, Value::Integer(i)) if i >= 0 => Ok(Value::UInteger(i as u32)),
5038        (DataType::UInt, Value::BigInt(i)) if i >= 0 => u32::try_from(i)
5039            .map(Value::UInteger)
5040            .map_err(|_| out_of_range_error(column, "UINT")),
5041        (DataType::UInt, Value::Int128(i)) if i >= 0 => u32::try_from(i)
5042            .map(Value::UInteger)
5043            .map_err(|_| out_of_range_error(column, "UINT")),
5044        (DataType::UInt, Value::UInteger(i)) => Ok(Value::UInteger(i)),
5045        (DataType::UInt, Value::UBigInt(i)) => u32::try_from(i)
5046            .map(Value::UInteger)
5047            .map_err(|_| out_of_range_error(column, "UINT")),
5048        (DataType::UInt, Value::UInt128(i)) => u32::try_from(i)
5049            .map(Value::UInteger)
5050            .map_err(|_| out_of_range_error(column, "UINT")),
5051        (DataType::UInt64, Value::Integer(i)) if i >= 0 => Ok(Value::UBigInt(i as u64)),
5052        (DataType::UInt64, Value::BigInt(i)) if i >= 0 => Ok(Value::UBigInt(i as u64)),
5053        (DataType::UInt64, Value::Int128(i)) if i >= 0 => u64::try_from(i)
5054            .map(Value::UBigInt)
5055            .map_err(|_| out_of_range_error(column, "UINT64")),
5056        (DataType::UInt64, Value::UInteger(i)) => Ok(Value::UBigInt(i as u64)),
5057        (DataType::UInt64, Value::UBigInt(i)) => Ok(Value::UBigInt(i)),
5058        (DataType::UInt64, Value::UInt128(i)) => u64::try_from(i)
5059            .map(Value::UBigInt)
5060            .map_err(|_| out_of_range_error(column, "UINT64")),
5061        (DataType::UInt128, Value::Integer(i)) if i >= 0 => Ok(Value::UInt128(i as u128)),
5062        (DataType::UInt128, Value::BigInt(i)) if i >= 0 => Ok(Value::UInt128(i as u128)),
5063        (DataType::UInt128, Value::Int128(i)) if i >= 0 => Ok(Value::UInt128(i as u128)),
5064        (DataType::UInt128, Value::UInteger(i)) => Ok(Value::UInt128(i as u128)),
5065        (DataType::UInt128, Value::UBigInt(i)) => Ok(Value::UInt128(i as u128)),
5066        (DataType::UInt128, Value::UInt128(i)) => Ok(Value::UInt128(i)),
5067        (_, Value::Null) if column.nullable => Ok(Value::Null),
5068        (_, Value::Null) => Err(HematiteError::ParseError(format!(
5069            "Column '{}' cannot be NULL",
5070            column.name
5071        ))),
5072        (DataType::Text, Value::Text(s)) => Ok(Value::Text(s)),
5073        (DataType::Char(length), Value::Text(s)) => coerce_char_value(s, *length, &column.name),
5074        (DataType::VarChar(length), Value::Text(s)) => {
5075            coerce_varchar_value(s, *length, &column.name)
5076        }
5077        (DataType::Binary(length), value) => {
5078            coerce_binary_value(value, *length, &column.name, true)
5079        }
5080        (DataType::VarBinary(length), value) => {
5081            coerce_binary_value(value, *length, &column.name, false)
5082        }
5083        (DataType::Enum(values), value) => coerce_enum_value(value, values, &column.name),
5084        (DataType::Boolean, Value::Boolean(b)) => Ok(Value::Boolean(b)),
5085        (data_type @ (DataType::Float32 | DataType::Float), value) => {
5086            let Some(number) = numeric_value_as_f64(&value) else {
5087                return Err(HematiteError::ParseError(format!(
5088                    "Type mismatch: column '{}' expects {:?}, got {:?}",
5089                    column.name, column.data_type, value
5090                )));
5091            };
5092            Ok(make_float_value(data_type, number))
5093        }
5094        (DataType::Decimal { precision, scale }, value) => {
5095            let decimal = coerce_decimal_value(value)?;
5096            if !decimal.fits_precision_scale(*precision, *scale) {
5097                return Err(HematiteError::ParseError(format!(
5098                    "Type mismatch: column '{}' exceeds {} precision/scale",
5099                    column.name,
5100                    column.data_type.base_name()
5101                )));
5102            }
5103            Ok(Value::Decimal(decimal))
5104        }
5105        (DataType::Blob, Value::Blob(bytes)) => Ok(Value::Blob(bytes)),
5106        (DataType::Blob, Value::Text(s)) => Ok(Value::Blob(s.into_bytes())),
5107        (DataType::Blob, Value::Integer(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5108        (DataType::Blob, Value::BigInt(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5109        (DataType::Blob, Value::UInteger(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5110        (DataType::Blob, Value::UBigInt(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5111        (DataType::Blob, Value::Int128(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5112        (DataType::Blob, Value::UInt128(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5113        (DataType::Date, Value::Date(s)) => Ok(Value::Date(s)),
5114        (DataType::Date, Value::Text(s)) => Ok(Value::Date(validate_date_string(&s)?)),
5115        (DataType::Time, Value::Time(s)) => Ok(Value::Time(s)),
5116        (DataType::Time, Value::Text(s)) => Ok(Value::Time(validate_time_string(&s)?)),
5117        (DataType::DateTime, Value::DateTime(s)) => Ok(Value::DateTime(s)),
5118        (DataType::DateTime, Value::Text(s)) => Ok(Value::DateTime(validate_datetime_string(&s)?)),
5119        (DataType::TimeWithTimeZone, Value::TimeWithTimeZone(s)) => Ok(Value::TimeWithTimeZone(s)),
5120        (DataType::TimeWithTimeZone, Value::Text(s)) => Ok(Value::TimeWithTimeZone(
5121            validate_time_with_time_zone_string(&s)?,
5122        )),
5123        (_, value) => Err(HematiteError::ParseError(format!(
5124            "Type mismatch: column '{}' expects {:?}, got {:?}",
5125            column.name, column.data_type, value
5126        ))),
5127    }
5128}
5129
5130fn validate_check_constraints(
5131    ctx: &mut ExecutionContext<'_>,
5132    table: &Table,
5133    row: &[Value],
5134) -> Result<()> {
5135    if table.check_constraints.is_empty() {
5136        return Ok(());
5137    }
5138
5139    let constraint_executor = SelectExecutor::new(
5140        locator_select_statement(TableReference::Table(table.name.clone(), None), None),
5141        SelectAccessPath::FullTableScan,
5142    );
5143    let sources = constraint_executor.resolve_sources(ctx)?;
5144    let mut subquery_cache = SubqueryCache::new();
5145
5146    for constraint in &table.check_constraints {
5147        let condition =
5148            crate::parser::parser::parse_condition_fragment(&constraint.expression_sql)?;
5149        let result = constraint_executor.evaluate_condition(
5150            ctx,
5151            &mut subquery_cache,
5152            &sources,
5153            &condition,
5154            row,
5155        )?;
5156        if result == Some(false) {
5157            let constraint_name = constraint
5158                .name
5159                .as_deref()
5160                .unwrap_or(constraint.expression_sql.as_str());
5161            return Err(HematiteError::ParseError(format!(
5162                "CHECK constraint '{}' failed for table '{}'",
5163                constraint_name, table.name
5164            )));
5165        }
5166    }
5167
5168    Ok(())
5169}
5170
5171fn validate_row_constraints(
5172    ctx: &mut ExecutionContext<'_>,
5173    table: &Table,
5174    row: &[Value],
5175) -> Result<()> {
5176    validate_check_constraints(ctx, table, row)?;
5177    validate_foreign_keys(ctx, table, row, None)
5178}
5179
5180fn validate_foreign_keys(
5181    ctx: &mut ExecutionContext<'_>,
5182    table: &Table,
5183    row: &[Value],
5184    skip_constraint: Option<&ForeignKeyConstraint>,
5185) -> Result<()> {
5186    for foreign_key in &table.foreign_keys {
5187        if skip_constraint == Some(foreign_key) {
5188            continue;
5189        }
5190        let key_values = foreign_key_values_for_row(table, foreign_key, row)?;
5191        if key_values.iter().any(Value::is_null) {
5192            continue;
5193        }
5194        let referenced_target = resolve_foreign_key_target(ctx, foreign_key)?;
5195        if !referenced_key_exists(ctx, &referenced_target, &key_values)? {
5196            return Err(HematiteError::ParseError(format!(
5197                "Foreign key constraint '{}' failed on table '{}': '{}.{:?}' does not contain {:?}",
5198                foreign_key_constraint_name(foreign_key),
5199                table.name,
5200                foreign_key.referenced_table,
5201                foreign_key.referenced_columns,
5202                key_values
5203            )));
5204        }
5205    }
5206
5207    Ok(())
5208}
5209
5210struct ResolvedForeignKeyTarget {
5211    table: Table,
5212    unique_index_name: Option<String>,
5213}
5214
5215fn resolve_foreign_key_target(
5216    ctx: &ExecutionContext<'_>,
5217    foreign_key: &ForeignKeyConstraint,
5218) -> Result<ResolvedForeignKeyTarget> {
5219    let referenced_table = ctx
5220        .catalog
5221        .get_table_by_name(&foreign_key.referenced_table)
5222        .ok_or_else(|| {
5223            HematiteError::ParseError(format!(
5224                "Referenced table '{}' not found",
5225                foreign_key.referenced_table
5226            ))
5227        })?
5228        .clone();
5229    let referenced_column_indices = foreign_key
5230        .referenced_columns
5231        .iter()
5232        .map(|column| {
5233            referenced_table.get_column_index(column).ok_or_else(|| {
5234                HematiteError::ParseError(format!(
5235                    "Referenced column '{}.{}' not found",
5236                    foreign_key.referenced_table, column
5237                ))
5238            })
5239        })
5240        .collect::<Result<Vec<_>>>()?;
5241
5242    let unique_index_name = if referenced_table.primary_key_columns == referenced_column_indices {
5243        None
5244    } else {
5245        Some(
5246            referenced_table
5247                .secondary_indexes
5248                .iter()
5249                .find(|index| index.unique && index.column_indices == referenced_column_indices)
5250                .ok_or_else(|| {
5251                    HematiteError::CorruptedData(format!(
5252                        "Referenced columns '{}.{:?}' are no longer backed by a PRIMARY KEY or UNIQUE index",
5253                        foreign_key.referenced_table, foreign_key.referenced_columns
5254                    ))
5255                })?
5256                .name
5257                .clone(),
5258        )
5259    };
5260
5261    Ok(ResolvedForeignKeyTarget {
5262        table: referenced_table,
5263        unique_index_name,
5264    })
5265}
5266
5267fn referenced_key_exists(
5268    ctx: &mut ExecutionContext<'_>,
5269    target: &ResolvedForeignKeyTarget,
5270    key_values: &[Value],
5271) -> Result<bool> {
5272    if target.unique_index_name.is_none() {
5273        return Ok(ctx
5274            .engine
5275            .lookup_row_by_primary_key(&target.table, key_values)?
5276            .is_some());
5277    }
5278
5279    Ok(!ctx
5280        .engine
5281        .lookup_secondary_index_rowids(
5282            &target.table,
5283            target
5284                .unique_index_name
5285                .as_deref()
5286                .expect("non-primary target must carry a unique index name"),
5287            key_values,
5288        )?
5289        .is_empty())
5290}
5291
5292fn referencing_foreign_keys(
5293    ctx: &mut ExecutionContext<'_>,
5294    parent_table: &Table,
5295) -> Result<Vec<ReferencingForeignKey>> {
5296    let mut references = Vec::new();
5297
5298    for (_, table_name) in ctx.catalog.list_tables() {
5299        let child_table = ctx.catalog.get_table_by_name(&table_name).ok_or_else(|| {
5300            HematiteError::ParseError(format!("Table '{}' not found", table_name))
5301        })?;
5302        for foreign_key in &child_table.foreign_keys {
5303            if foreign_key.referenced_table != parent_table.name {
5304                continue;
5305            }
5306            let referenced_column_indices = foreign_key
5307                .referenced_columns
5308                .iter()
5309                .map(|column| {
5310                    parent_table.get_column_index(column).ok_or_else(|| {
5311                        HematiteError::CorruptedData(format!(
5312                            "Referenced column '{}.{}' is missing",
5313                            foreign_key.referenced_table, column
5314                        ))
5315                    })
5316                })
5317                .collect::<Result<Vec<_>>>()?;
5318            references.push(ReferencingForeignKey {
5319                child_table: child_table.clone(),
5320                foreign_key: foreign_key.clone(),
5321                referenced_column_indices,
5322            });
5323        }
5324    }
5325
5326    Ok(references)
5327}
5328
5329fn parent_reference_key_changed(
5330    ctx: &mut ExecutionContext<'_>,
5331    parent_table: &Table,
5332    original_row: &[Value],
5333    updated_row: &[Value],
5334) -> Result<bool> {
5335    for reference in referencing_foreign_keys(ctx, parent_table)? {
5336        if parent_key_for_reference(parent_table, &reference, original_row)?
5337            != parent_key_for_reference(parent_table, &reference, updated_row)?
5338        {
5339            return Ok(true);
5340        }
5341    }
5342    Ok(false)
5343}
5344
5345fn apply_parent_delete_foreign_key_actions(
5346    ctx: &mut ExecutionContext<'_>,
5347    parent_table: &Table,
5348    row: &[Value],
5349) -> Result<()> {
5350    for reference in referencing_foreign_keys(ctx, parent_table)? {
5351        let parent_key = parent_key_for_reference(parent_table, &reference, row)?;
5352        if parent_key.iter().any(Value::is_null) {
5353            continue;
5354        }
5355        let child_rows = child_rows_referencing_parent_key(
5356            ctx,
5357            &reference.child_table,
5358            &reference,
5359            &parent_key,
5360        )?;
5361        execute_parent_foreign_key_action(
5362            ctx,
5363            parent_table,
5364            &reference,
5365            child_rows,
5366            reference.foreign_key.on_delete,
5367            "delete",
5368            None,
5369        )?;
5370    }
5371
5372    Ok(())
5373}
5374
5375fn apply_parent_update_foreign_key_actions(
5376    ctx: &mut ExecutionContext<'_>,
5377    parent_table: &Table,
5378    original_row: &[Value],
5379    updated_row: &[Value],
5380) -> Result<()> {
5381    for reference in referencing_foreign_keys(ctx, parent_table)? {
5382        let old_parent_key = parent_key_for_reference(parent_table, &reference, original_row)?;
5383        let new_parent_key = parent_key_for_reference(parent_table, &reference, updated_row)?;
5384        if old_parent_key == new_parent_key || old_parent_key.iter().any(Value::is_null) {
5385            continue;
5386        }
5387
5388        let child_rows = child_rows_referencing_parent_key(
5389            ctx,
5390            &reference.child_table,
5391            &reference,
5392            &old_parent_key,
5393        )?;
5394        execute_parent_foreign_key_action(
5395            ctx,
5396            parent_table,
5397            &reference,
5398            child_rows,
5399            reference.foreign_key.on_update,
5400            "update",
5401            Some(&new_parent_key),
5402        )?;
5403    }
5404    Ok(())
5405}
5406
5407fn foreign_key_constraint_name(foreign_key: &ForeignKeyConstraint) -> &str {
5408    foreign_key
5409        .name
5410        .as_deref()
5411        .unwrap_or(foreign_key.referenced_table.as_str())
5412}
5413
5414struct ReferencingForeignKey {
5415    child_table: Table,
5416    foreign_key: ForeignKeyConstraint,
5417    referenced_column_indices: Vec<usize>,
5418}
5419
5420enum ChildKeyRewrite<'a> {
5421    Replace(&'a [Value]),
5422    SetNull,
5423}
5424
5425fn foreign_key_values_for_row(
5426    table: &Table,
5427    foreign_key: &ForeignKeyConstraint,
5428    row: &[Value],
5429) -> Result<Vec<Value>> {
5430    row_values_for_indices(row, &foreign_key.column_indices, &table.name)
5431}
5432
5433fn parent_key_for_reference(
5434    parent_table: &Table,
5435    reference: &ReferencingForeignKey,
5436    row: &[Value],
5437) -> Result<Vec<Value>> {
5438    row_values_for_indices(
5439        row,
5440        &reference.referenced_column_indices,
5441        &parent_table.name,
5442    )
5443}
5444
5445fn execute_parent_foreign_key_action(
5446    ctx: &mut ExecutionContext<'_>,
5447    parent_table: &Table,
5448    reference: &ReferencingForeignKey,
5449    child_rows: Vec<StoredRow>,
5450    action: CatalogForeignKeyAction,
5451    operation: &str,
5452    replacement_key: Option<&[Value]>,
5453) -> Result<()> {
5454    match action {
5455        CatalogForeignKeyAction::Restrict => {
5456            if !child_rows.is_empty() {
5457                return Err(HematiteError::ParseError(format!(
5458                    "Cannot {} row in table '{}' because foreign key '{}' on table '{}' still references it",
5459                    operation,
5460                    parent_table.name,
5461                    foreign_key_constraint_name(&reference.foreign_key),
5462                    reference.child_table.name
5463                )));
5464            }
5465        }
5466        CatalogForeignKeyAction::Cascade => {
5467            if let Some(replacement_key) = replacement_key {
5468                rewrite_child_foreign_key_rows(
5469                    ctx,
5470                    &reference.child_table,
5471                    &reference.foreign_key,
5472                    child_rows,
5473                    ChildKeyRewrite::Replace(replacement_key),
5474                )?;
5475            } else {
5476                for child_row in child_rows {
5477                    remove_stored_row(
5478                        ctx,
5479                        &reference.child_table.name,
5480                        &reference.child_table,
5481                        child_row.row_id,
5482                    )?;
5483                }
5484            }
5485        }
5486        CatalogForeignKeyAction::SetNull => {
5487            rewrite_child_foreign_key_rows(
5488                ctx,
5489                &reference.child_table,
5490                &reference.foreign_key,
5491                child_rows,
5492                ChildKeyRewrite::SetNull,
5493            )?;
5494        }
5495    }
5496
5497    Ok(())
5498}
5499
5500fn row_values_for_indices(
5501    row: &[Value],
5502    indices: &[usize],
5503    table_name: &str,
5504) -> Result<Vec<Value>> {
5505    indices
5506        .iter()
5507        .map(|&index| {
5508            row.get(index).cloned().ok_or_else(|| {
5509                HematiteError::CorruptedData(format!(
5510                    "Column index {} is invalid for table '{}'",
5511                    index, table_name
5512                ))
5513            })
5514        })
5515        .collect()
5516}
5517
5518fn child_rows_referencing_parent_key(
5519    ctx: &mut ExecutionContext<'_>,
5520    child_table: &Table,
5521    reference: &ReferencingForeignKey,
5522    parent_key: &[Value],
5523) -> Result<Vec<StoredRow>> {
5524    let mut matches = Vec::new();
5525    for child_row in ctx.engine.read_rows_with_ids(&child_table.name)? {
5526        let child_key =
5527            foreign_key_values_for_row(child_table, &reference.foreign_key, &child_row.values)?;
5528        if child_key.iter().any(Value::is_null) {
5529            continue;
5530        }
5531        if child_key == parent_key {
5532            matches.push(child_row);
5533        }
5534    }
5535    Ok(matches)
5536}
5537
5538fn rewrite_child_foreign_key_rows(
5539    ctx: &mut ExecutionContext<'_>,
5540    table: &Table,
5541    foreign_key: &ForeignKeyConstraint,
5542    child_rows: Vec<StoredRow>,
5543    rewrite: ChildKeyRewrite<'_>,
5544) -> Result<()> {
5545    for mut child_row in child_rows {
5546        match rewrite {
5547            ChildKeyRewrite::Replace(replacement_key) => {
5548                for (position, &column_index) in foreign_key.column_indices.iter().enumerate() {
5549                    child_row.values[column_index] = replacement_key[position].clone();
5550                }
5551            }
5552            ChildKeyRewrite::SetNull => {
5553                for &column_index in &foreign_key.column_indices {
5554                    child_row.values[column_index] = Value::Null;
5555                }
5556            }
5557        }
5558        persist_foreign_key_child_update(ctx, table, foreign_key, child_row)?;
5559    }
5560    Ok(())
5561}
5562
5563fn persist_foreign_key_child_update(
5564    ctx: &mut ExecutionContext<'_>,
5565    table: &Table,
5566    skipped_foreign_key: &ForeignKeyConstraint,
5567    row: StoredRow,
5568) -> Result<()> {
5569    table
5570        .validate_row(&row.values)
5571        .map_err(|err| HematiteError::ParseError(err.to_string()))?;
5572    validate_check_constraints(ctx, table, &row.values)?;
5573    validate_foreign_keys(ctx, table, &row.values, Some(skipped_foreign_key))?;
5574    ensure_stored_row_uniqueness(ctx, table, &row)?;
5575    remove_stored_row(ctx, &table.name, table, row.row_id)?;
5576    write_stored_row(ctx, &table.name, table, row, true).map(|_| ())
5577}
5578
5579fn ensure_stored_row_uniqueness(
5580    ctx: &mut ExecutionContext<'_>,
5581    table: &Table,
5582    row: &StoredRow,
5583) -> Result<()> {
5584    let candidate_pk = primary_key_values(table, &row.values)?;
5585    if let Some(existing_rowid) = ctx.engine.lookup_primary_key_rowid(table, &candidate_pk)? {
5586        if existing_rowid != row.row_id {
5587            return Err(duplicate_primary_key_parse_error(
5588                &table.name,
5589                &candidate_pk,
5590            ));
5591        }
5592    }
5593
5594    for index in table.secondary_indexes.iter().filter(|index| index.unique) {
5595        let key_values = secondary_index_key_values(index, &row.values);
5596        if ctx
5597            .engine
5598            .lookup_secondary_index_rowids(table, &index.name, &key_values)?
5599            .into_iter()
5600            .any(|existing_rowid| existing_rowid != row.row_id)
5601        {
5602            return Err(unique_index_parse_error(&index.name, &table.name));
5603        }
5604    }
5605
5606    Ok(())
5607}
5608
5609fn remove_stored_row(
5610    ctx: &mut ExecutionContext<'_>,
5611    table_name: &str,
5612    table: &Table,
5613    row_id: u64,
5614) -> Result<()> {
5615    let Some(existing_row) = ctx.engine.lookup_row_by_rowid(table_name, row_id)? else {
5616        return Ok(());
5617    };
5618
5619    ctx.engine
5620        .delete_secondary_index_row(table, &existing_row)?;
5621    let deleted_pk = ctx.engine.delete_primary_key_row(table, &existing_row)?;
5622    if !deleted_pk {
5623        return Err(HematiteError::CorruptedData(format!(
5624            "Primary-key index entry vanished during row removal for table '{}'",
5625            table_name
5626        )));
5627    }
5628
5629    let deleted = ctx
5630        .engine
5631        .delete_from_table_by_rowid(table_name, existing_row.row_id)?;
5632    if !deleted {
5633        return Err(HematiteError::CorruptedData(format!(
5634            "Rowid {} vanished during row removal for table '{}'",
5635            existing_row.row_id, table_name
5636        )));
5637    }
5638
5639    Ok(())
5640}
5641
5642fn write_stored_row(
5643    ctx: &mut ExecutionContext<'_>,
5644    table_name: &str,
5645    table: &Table,
5646    mut row: StoredRow,
5647    preserve_row_id: bool,
5648) -> Result<u64> {
5649    let row_id = if preserve_row_id {
5650        ctx.engine.insert_row_with_rowid(table_name, row.clone())?;
5651        row.row_id
5652    } else {
5653        let allocated_row_id = ctx
5654            .engine
5655            .insert_into_table(table_name, row.values.clone())?;
5656        row.row_id = allocated_row_id;
5657        allocated_row_id
5658    };
5659
5660    ctx.engine.register_primary_key_row(table, row.clone())?;
5661    ctx.engine.register_secondary_index_row(table, row)?;
5662    Ok(row_id)
5663}
5664
5665fn apply_distinct_if_needed(distinct: bool, rows: &mut Vec<Vec<Value>>) {
5666    if !distinct {
5667        return;
5668    }
5669
5670    let mut distinct_rows = Vec::new();
5671    for row in rows.drain(..) {
5672        if !distinct_rows.contains(&row) {
5673            distinct_rows.push(row);
5674        }
5675    }
5676    *rows = distinct_rows;
5677}
5678
5679fn deduplicate_rows(mut rows: Vec<Vec<Value>>) -> Vec<Vec<Value>> {
5680    apply_distinct_if_needed(true, &mut rows);
5681    rows
5682}
5683
5684fn locator_select_statement(
5685    from: TableReference,
5686    where_clause: Option<WhereClause>,
5687) -> SelectStatement {
5688    SelectStatement {
5689        with_clause: Vec::new(),
5690        distinct: false,
5691        columns: vec![SelectItem::Wildcard],
5692        column_aliases: vec![None],
5693        from,
5694        where_clause,
5695        group_by: Vec::new(),
5696        having_clause: None,
5697        order_by: Vec::new(),
5698        limit: None,
5699        offset: None,
5700        set_operation: None,
5701    }
5702}
5703
5704fn evaluate_arithmetic_values(
5705    operator: &ArithmeticOperator,
5706    left: Value,
5707    right: Value,
5708) -> Result<Value> {
5709    if left.is_null() || right.is_null() {
5710        return Ok(Value::Null);
5711    }
5712
5713    if let Some(value) = evaluate_temporal_arithmetic(operator, &left, &right)? {
5714        return Ok(value);
5715    }
5716
5717    if left.is_float_like() || right.is_float_like() {
5718        if let (Some(left), Some(right)) =
5719            (numeric_value_as_f64(&left), numeric_value_as_f64(&right))
5720        {
5721            return evaluate_float_arithmetic(operator, left, right);
5722        }
5723    }
5724
5725    if is_exact_numeric_value(&left) && is_exact_numeric_value(&right) {
5726        return evaluate_exact_numeric_arithmetic(operator, left, right);
5727    }
5728
5729    Err(HematiteError::ParseError(format!(
5730        "Arithmetic requires numeric values, found {:?} and {:?}",
5731        left, right
5732    )))
5733}
5734
5735fn evaluate_exact_numeric_arithmetic(
5736    operator: &ArithmeticOperator,
5737    left: Value,
5738    right: Value,
5739) -> Result<Value> {
5740    let prefer_decimal = matches!(left, Value::Decimal(_))
5741        || matches!(right, Value::Decimal(_))
5742        || matches!(operator, ArithmeticOperator::Divide);
5743    let prefer_unsigned = !prefer_decimal
5744        && is_unsigned_integral_value(&left)
5745        && is_unsigned_integral_value(&right)
5746        && !matches!(operator, ArithmeticOperator::Subtract);
5747
5748    let left_decimal = coerce_decimal_value(left.clone())?;
5749    let right_decimal = coerce_decimal_value(right.clone())?;
5750    let result = match operator {
5751        ArithmeticOperator::Add => left_decimal.add(&right_decimal),
5752        ArithmeticOperator::Subtract => left_decimal.subtract(&right_decimal),
5753        ArithmeticOperator::Multiply => left_decimal.multiply(&right_decimal),
5754        ArithmeticOperator::Divide => left_decimal.divide(&right_decimal)?,
5755        ArithmeticOperator::Modulo => left_decimal.remainder(&right_decimal)?,
5756    };
5757
5758    if prefer_decimal || !result.is_integral() {
5759        Ok(Value::Decimal(result))
5760    } else {
5761        decimal_integral_result_to_value(result, prefer_unsigned)
5762    }
5763}
5764
5765fn evaluate_temporal_arithmetic(
5766    operator: &ArithmeticOperator,
5767    left: &Value,
5768    right: &Value,
5769) -> Result<Option<Value>> {
5770    match (left, right) {
5771        (Value::IntervalYearMonth(left), Value::IntervalYearMonth(right)) => {
5772            let total_months = match operator {
5773                ArithmeticOperator::Add => left.total_months().checked_add(right.total_months()),
5774                ArithmeticOperator::Subtract => {
5775                    left.total_months().checked_sub(right.total_months())
5776                }
5777                _ => None,
5778            };
5779            Ok(total_months
5780                .map(|value| Value::IntervalYearMonth(IntervalYearMonthValue::new(value))))
5781        }
5782        (Value::IntervalDaySecond(left), Value::IntervalDaySecond(right)) => {
5783            let total_seconds = match operator {
5784                ArithmeticOperator::Add => left.total_seconds().checked_add(right.total_seconds()),
5785                ArithmeticOperator::Subtract => {
5786                    left.total_seconds().checked_sub(right.total_seconds())
5787                }
5788                _ => None,
5789            };
5790            Ok(total_seconds
5791                .map(|value| Value::IntervalDaySecond(IntervalDaySecondValue::new(value))))
5792        }
5793        (Value::Date(left), Value::Date(right))
5794            if matches!(operator, ArithmeticOperator::Subtract) =>
5795        {
5796            Ok(Some(Value::BigInt(
5797                left.days_since_epoch() as i64 - right.days_since_epoch() as i64,
5798            )))
5799        }
5800        (Value::Date(left), Value::IntervalYearMonth(interval)) => {
5801            let months = signed_interval_months(operator, *interval)?;
5802            Ok(Some(Value::Date(add_months_to_date(*left, months)?)))
5803        }
5804        (Value::Date(left), Value::IntervalDaySecond(interval)) => {
5805            let days = whole_days_from_interval(operator, *interval)?;
5806            let result = left.days_since_epoch() as i64 + days;
5807            let result = i32::try_from(result).map_err(|_| {
5808                HematiteError::ParseError("DATE arithmetic overflowed supported range".to_string())
5809            })?;
5810            Ok(Some(Value::Date(DateValue::from_days_since_epoch(result))))
5811        }
5812        (Value::Date(left), right) => {
5813            let Some(days) = integral_rhs(right) else {
5814                return Ok(None);
5815            };
5816            let delta = match operator {
5817                ArithmeticOperator::Add => days,
5818                ArithmeticOperator::Subtract => -days,
5819                _ => return Ok(None),
5820            };
5821            let result = left.days_since_epoch() as i64 + delta;
5822            let result = i32::try_from(result).map_err(|_| {
5823                HematiteError::ParseError("DATE arithmetic overflowed supported range".to_string())
5824            })?;
5825            Ok(Some(Value::Date(DateValue::from_days_since_epoch(result))))
5826        }
5827
5828        (Value::DateTime(left), Value::DateTime(right))
5829            if matches!(operator, ArithmeticOperator::Subtract) =>
5830        {
5831            Ok(Some(Value::BigInt(
5832                left.seconds_since_epoch() - right.seconds_since_epoch(),
5833            )))
5834        }
5835        (Value::DateTime(left), Value::IntervalYearMonth(interval)) => Ok(Some(Value::DateTime(
5836            add_months_to_datetime(*left, signed_interval_months(operator, *interval)?)?,
5837        ))),
5838        (Value::DateTime(left), Value::IntervalDaySecond(interval)) => {
5839            let seconds = signed_interval_seconds(operator, *interval)?;
5840            Ok(Some(Value::DateTime(
5841                DateTimeValue::from_seconds_since_epoch(left.seconds_since_epoch() + seconds),
5842            )))
5843        }
5844        (Value::DateTime(left), right) => {
5845            let Some(seconds) = integral_rhs(right) else {
5846                return Ok(None);
5847            };
5848            let delta = match operator {
5849                ArithmeticOperator::Add => seconds,
5850                ArithmeticOperator::Subtract => -seconds,
5851                _ => return Ok(None),
5852            };
5853            Ok(Some(Value::DateTime(
5854                DateTimeValue::from_seconds_since_epoch(left.seconds_since_epoch() + delta),
5855            )))
5856        }
5857
5858        (Value::Time(left), Value::Time(right))
5859            if matches!(operator, ArithmeticOperator::Subtract) =>
5860        {
5861            Ok(Some(Value::Integer(
5862                left.seconds_since_midnight() as i32 - right.seconds_since_midnight() as i32,
5863            )))
5864        }
5865        (Value::Time(left), Value::IntervalDaySecond(interval)) => {
5866            let seconds = signed_interval_seconds(operator, *interval)?;
5867            Ok(Some(Value::Time(TimeValue::from_seconds_since_midnight(
5868                add_wrapped_seconds(left.seconds_since_midnight(), seconds),
5869            ))))
5870        }
5871        (Value::Time(left), right) => {
5872            let Some(seconds) = integral_rhs(right) else {
5873                return Ok(None);
5874            };
5875            let delta = match operator {
5876                ArithmeticOperator::Add => seconds,
5877                ArithmeticOperator::Subtract => -seconds,
5878                _ => return Ok(None),
5879            };
5880            Ok(Some(Value::Time(TimeValue::from_seconds_since_midnight(
5881                add_wrapped_seconds(left.seconds_since_midnight(), delta),
5882            ))))
5883        }
5884        (Value::TimeWithTimeZone(left), Value::IntervalDaySecond(interval)) => {
5885            let seconds = signed_interval_seconds(operator, *interval)?;
5886            Ok(Some(Value::TimeWithTimeZone(
5887                TimeWithTimeZoneValue::from_parts(
5888                    add_wrapped_seconds(left.seconds_since_midnight(), seconds),
5889                    left.offset_minutes(),
5890                ),
5891            )))
5892        }
5893        (Value::TimeWithTimeZone(left), right) => {
5894            let Some(seconds) = integral_rhs(right) else {
5895                return Ok(None);
5896            };
5897            let delta = match operator {
5898                ArithmeticOperator::Add => seconds,
5899                ArithmeticOperator::Subtract => -seconds,
5900                _ => return Ok(None),
5901            };
5902            Ok(Some(Value::TimeWithTimeZone(
5903                TimeWithTimeZoneValue::from_parts(
5904                    add_wrapped_seconds(left.seconds_since_midnight(), delta),
5905                    left.offset_minutes(),
5906                ),
5907            )))
5908        }
5909        _ => Ok(None),
5910    }
5911}
5912
/// Adds `delta` seconds to a time of day and wraps the result into `0..86_400`.
///
/// `delta` may be any `i64`, including `i64::MIN` and `i64::MAX`. It is reduced modulo one
/// day before the addition, so the intermediate sum cannot overflow.
fn add_wrapped_seconds(seconds_since_midnight: u32, delta: i64) -> u32 {
    const SECONDS_PER_DAY: i64 = 86_400;
    let wrapped_delta = delta.rem_euclid(SECONDS_PER_DAY);
    // After the second `rem_euclid` the value is in 0..86_400, so the cast is lossless.
    (i64::from(seconds_since_midnight) + wrapped_delta).rem_euclid(SECONDS_PER_DAY) as u32
}
5916
5917fn signed_interval_months(
5918    operator: &ArithmeticOperator,
5919    interval: IntervalYearMonthValue,
5920) -> Result<i32> {
5921    match operator {
5922        ArithmeticOperator::Add => Ok(interval.total_months()),
5923        ArithmeticOperator::Subtract => interval.total_months().checked_neg().ok_or_else(|| {
5924            HematiteError::ParseError(
5925                "INTERVAL YEAR TO MONTH overflowed supported range".to_string(),
5926            )
5927        }),
5928        _ => Err(HematiteError::ParseError(
5929            "INTERVAL YEAR TO MONTH only supports addition and subtraction".to_string(),
5930        )),
5931    }
5932}
5933
5934fn signed_interval_seconds(
5935    operator: &ArithmeticOperator,
5936    interval: IntervalDaySecondValue,
5937) -> Result<i64> {
5938    match operator {
5939        ArithmeticOperator::Add => Ok(interval.total_seconds()),
5940        ArithmeticOperator::Subtract => interval.total_seconds().checked_neg().ok_or_else(|| {
5941            HematiteError::ParseError(
5942                "INTERVAL DAY TO SECOND overflowed supported range".to_string(),
5943            )
5944        }),
5945        _ => Err(HematiteError::ParseError(
5946            "INTERVAL DAY TO SECOND only supports addition and subtraction".to_string(),
5947        )),
5948    }
5949}
5950
5951fn whole_days_from_interval(
5952    operator: &ArithmeticOperator,
5953    interval: IntervalDaySecondValue,
5954) -> Result<i64> {
5955    let seconds = signed_interval_seconds(operator, interval)?;
5956    if seconds % 86_400 != 0 {
5957        return Err(HematiteError::ParseError(
5958            "DATE arithmetic requires INTERVAL DAY TO SECOND values aligned to whole days"
5959                .to_string(),
5960        ));
5961    }
5962    Ok(seconds / 86_400)
5963}
5964
5965fn add_months_to_date(value: DateValue, delta_months: i32) -> Result<DateValue> {
5966    let (year, month, day) = value.components();
5967    let total_months = year
5968        .checked_mul(12)
5969        .and_then(|total| total.checked_add(month as i32 - 1))
5970        .and_then(|total| total.checked_add(delta_months))
5971        .ok_or_else(|| {
5972            HematiteError::ParseError(
5973                "Temporal month arithmetic overflowed supported range".to_string(),
5974            )
5975        })?;
5976    let new_year = total_months.div_euclid(12);
5977    let new_month = total_months.rem_euclid(12) as u32 + 1;
5978    let clamped_day = day.min(executor_days_in_month(new_year, new_month));
5979    Ok(DateValue::from_days_since_epoch(executor_days_from_civil(
5980        new_year,
5981        new_month,
5982        clamped_day,
5983    )))
5984}
5985
5986fn add_months_to_datetime(value: DateTimeValue, delta_months: i32) -> Result<DateTimeValue> {
5987    let (date, time) = value.components();
5988    let shifted_date = add_months_to_date(date, delta_months)?;
5989    Ok(DateTimeValue::from_seconds_since_epoch(
5990        shifted_date.days_since_epoch() as i64 * 86_400 + time.seconds_since_midnight() as i64,
5991    ))
5992}
5993
5994fn executor_days_in_month(year: i32, month: u32) -> u32 {
5995    match month {
5996        1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
5997        4 | 6 | 9 | 11 => 30,
5998        2 if executor_is_leap_year(year) => 29,
5999        2 => 28,
6000        _ => unreachable!(),
6001    }
6002}
6003
/// Gregorian leap-year rule: divisible by 400, or divisible by 4 but not by 100.
fn executor_is_leap_year(year: i32) -> bool {
    let divisible_by = |divisor: i32| year % divisor == 0;
    divisible_by(400) || (divisible_by(4) && !divisible_by(100))
}
6007
/// Converts a proleptic Gregorian calendar date into days relative to 1970-01-01.
///
/// Internally the year is treated as starting on 1 March (January and February count towards
/// the previous year) and is split into a 400-year era plus a year within that era.
/// All arithmetic is plain `i32`, so inputs are expected to stay well inside the `i32` day
/// range.
fn executor_days_from_civil(year: i32, month: u32, day: u32) -> i32 {
    let shifted_year = if month <= 2 { year - 1 } else { year };

    // Floor division by 400 built from truncating division.
    let floor_bias = if shifted_year >= 0 { 0 } else { 399 };
    let era = (shifted_year - floor_bias) / 400;
    let year_of_era = shifted_year - era * 400;

    // March maps to 0, April to 1, …, February to 11.
    let month_from_march = month as i32 + if month > 2 { -3 } else { 9 };
    let day_of_year = (153 * month_from_march + 2) / 5 + day as i32 - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;

    era * 146097 + day_of_era - 719468
}
6018
6019fn integral_rhs(value: &Value) -> Option<i64> {
6020    match value {
6021        Value::Integer(value) => Some(*value as i64),
6022        Value::BigInt(value) => Some(*value),
6023        Value::Int128(value) => i64::try_from(*value).ok(),
6024        Value::UInteger(value) => Some(*value as i64),
6025        Value::UBigInt(value) => i64::try_from(*value).ok(),
6026        Value::UInt128(value) => i64::try_from(*value).ok(),
6027        _ => None,
6028    }
6029}
6030
6031fn auto_increment_value(column: &Column, next_row_id: u64) -> Result<Value> {
6032    match column.data_type {
6033        DataType::Int => i32::try_from(next_row_id)
6034            .map(Value::Integer)
6035            .map_err(|_| out_of_range_error(column, "INT")),
6036        DataType::UInt => u32::try_from(next_row_id)
6037            .map(Value::UInteger)
6038            .map_err(|_| out_of_range_error(column, "UINT")),
6039        _ => Err(HematiteError::ParseError(format!(
6040            "AUTO_INCREMENT column '{}' must use INT or UINT",
6041            column.name
6042        ))),
6043    }
6044}
6045
6046fn negate_numeric_value(value: Value) -> Result<Value> {
6047    match value {
6048        Value::Integer(value) => value.checked_neg().map(Value::Integer).ok_or_else(|| {
6049            HematiteError::ParseError("Integer overflow while evaluating unary '-'".to_string())
6050        }),
6051        Value::BigInt(value) => value.checked_neg().map(Value::BigInt).ok_or_else(|| {
6052            HematiteError::ParseError("INT64 overflow while evaluating unary '-'".to_string())
6053        }),
6054        Value::Int128(value) => value.checked_neg().map(Value::Int128).ok_or_else(|| {
6055            HematiteError::ParseError("INT128 overflow while evaluating unary '-'".to_string())
6056        }),
6057        Value::UInteger(value) => Ok(Value::BigInt(-(value as i64))),
6058        Value::UBigInt(value) => Ok(Value::Int128(-(value as i128))),
6059        Value::UInt128(value) => {
6060            if value > i128::MAX as u128 {
6061                return Err(HematiteError::ParseError(
6062                    "UINT128 overflow while evaluating unary '-'".to_string(),
6063                ));
6064            }
6065            Ok(Value::Int128(-(value as i128)))
6066        }
6067        Value::Float32(value) => Ok(Value::Float32(-value)),
6068        Value::Float(value) => Ok(Value::Float(-value)),
6069        Value::Decimal(value) => Ok(Value::Decimal(value.negate())),
6070        Value::Null => Ok(Value::Null),
6071        value => Err(HematiteError::ParseError(format!(
6072            "Unary '-' requires a numeric value, found {:?}",
6073            value
6074        ))),
6075    }
6076}
6077
6078fn evaluate_float_arithmetic(
6079    operator: &ArithmeticOperator,
6080    left: f64,
6081    right: f64,
6082) -> Result<Value> {
6083    let value = match operator {
6084        ArithmeticOperator::Add => left + right,
6085        ArithmeticOperator::Subtract => left - right,
6086        ArithmeticOperator::Multiply => left * right,
6087        ArithmeticOperator::Divide => {
6088            if right == 0.0 {
6089                return Err(HematiteError::ParseError("Division by zero".to_string()));
6090            }
6091            left / right
6092        }
6093        ArithmeticOperator::Modulo => {
6094            if right == 0.0 {
6095                return Err(HematiteError::ParseError("Division by zero".to_string()));
6096            }
6097            left % right
6098        }
6099    };
6100    Ok(Value::Float(value))
6101}
6102
6103fn is_exact_numeric_value(value: &Value) -> bool {
6104    matches!(
6105        value,
6106        Value::Integer(_)
6107            | Value::BigInt(_)
6108            | Value::Int128(_)
6109            | Value::UInteger(_)
6110            | Value::UBigInt(_)
6111            | Value::UInt128(_)
6112            | Value::Decimal(_)
6113    )
6114}
6115
6116fn is_unsigned_integral_value(value: &Value) -> bool {
6117    matches!(
6118        value,
6119        Value::UInteger(_) | Value::UBigInt(_) | Value::UInt128(_)
6120    )
6121}
6122
6123fn decimal_integral_result_to_value(value: DecimalValue, prefer_unsigned: bool) -> Result<Value> {
6124    debug_assert!(value.is_integral());
6125    if value.negative() {
6126        return value
6127            .to_integral_i128()
6128            .map(minimal_signed_value)
6129            .ok_or_else(|| {
6130                HematiteError::ParseError(
6131                    "Arithmetic overflowed the supported signed integer range".to_string(),
6132                )
6133            });
6134    }
6135
6136    if prefer_unsigned {
6137        return value
6138            .to_integral_u128()
6139            .map(minimal_unsigned_value)
6140            .ok_or_else(|| {
6141                HematiteError::ParseError(
6142                    "Arithmetic overflowed the supported unsigned integer range".to_string(),
6143                )
6144            });
6145    }
6146
6147    if let Some(signed) = value.to_integral_i128() {
6148        Ok(minimal_signed_value(signed))
6149    } else {
6150        value
6151            .to_integral_u128()
6152            .map(minimal_unsigned_value)
6153            .ok_or_else(|| {
6154                HematiteError::ParseError(
6155                    "Arithmetic overflowed the supported integer range".to_string(),
6156                )
6157            })
6158    }
6159}
6160
6161fn minimal_signed_value(value: i128) -> Value {
6162    if let Ok(value) = i32::try_from(value) {
6163        Value::Integer(value)
6164    } else if let Ok(value) = i64::try_from(value) {
6165        Value::BigInt(value)
6166    } else {
6167        Value::Int128(value)
6168    }
6169}
6170
6171fn minimal_unsigned_value(value: u128) -> Value {
6172    if let Ok(value) = u32::try_from(value) {
6173        Value::UInteger(value)
6174    } else if let Ok(value) = u64::try_from(value) {
6175        Value::UBigInt(value)
6176    } else {
6177        Value::UInt128(value)
6178    }
6179}
6180
6181fn cast_value_to_type(value: Value, data_type: DataType) -> Result<Value> {
6182    match (data_type.clone(), value) {
6183        (_, Value::Null) => Ok(Value::Null),
6184        (DataType::Int8, Value::Integer(value)) => i8::try_from(value)
6185            .map(|_| Value::Integer(value))
6186            .map_err(|_| {
6187                HematiteError::ParseError("Cannot CAST out-of-range INT AS INT8".to_string())
6188            }),
6189        (DataType::Int8, Value::Blob(bytes)) => decode_signed_blob(&bytes, 1, "INT8")
6190            .and_then(|value| {
6191                i8::try_from(value)
6192                    .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS INT8".to_string()))
6193            })
6194            .map(|value| Value::Integer(value as i32)),
6195        (DataType::Int16, Value::Integer(value)) => i16::try_from(value)
6196            .map(|_| Value::Integer(value))
6197            .map_err(|_| {
6198                HematiteError::ParseError("Cannot CAST out-of-range INT AS INT16".to_string())
6199            }),
6200        (DataType::Int16, Value::Blob(bytes)) => decode_signed_blob(&bytes, 2, "INT16")
6201            .and_then(|value| {
6202                i16::try_from(value)
6203                    .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS INT16".to_string()))
6204            })
6205            .map(|value| Value::Integer(value as i32)),
6206        (DataType::Int, Value::Integer(value)) => Ok(Value::Integer(value)),
6207        (DataType::Int, Value::BigInt(value)) => {
6208            i32::try_from(value).map(Value::Integer).map_err(|_| {
6209                HematiteError::ParseError("Cannot CAST out-of-range INT64 AS INT".to_string())
6210            })
6211        }
6212        (DataType::Int, Value::Int128(value)) => {
6213            i32::try_from(value).map(Value::Integer).map_err(|_| {
6214                HematiteError::ParseError("Cannot CAST out-of-range INT128 AS INT".to_string())
6215            })
6216        }
6217        (DataType::Int, Value::Float32(value)) => Ok(Value::Integer(value as i32)),
6218        (DataType::Int, Value::Float(value)) => Ok(Value::Integer(value as i32)),
6219        (DataType::Int, Value::Boolean(true)) => Ok(Value::Integer(1)),
6220        (DataType::Int, Value::Boolean(false)) => Ok(Value::Integer(0)),
6221        (DataType::Int, Value::Text(value)) => value
6222            .parse::<i32>()
6223            .map(Value::Integer)
6224            .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS INT", value))),
6225        (DataType::Int, Value::Blob(bytes)) => decode_signed_blob(&bytes, 4, "INT")
6226            .and_then(|value| {
6227                i32::try_from(value)
6228                    .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS INT".to_string()))
6229            })
6230            .map(Value::Integer),
6231        (DataType::Int64, Value::Integer(value)) => Ok(Value::BigInt(value as i64)),
6232        (DataType::Int64, Value::BigInt(value)) => Ok(Value::BigInt(value)),
6233        (DataType::Int64, Value::Int128(value)) => {
6234            i64::try_from(value).map(Value::BigInt).map_err(|_| {
6235                HematiteError::ParseError("Cannot CAST out-of-range INT128 AS INT64".to_string())
6236            })
6237        }
6238        (DataType::Int64, Value::Float32(value)) => Ok(Value::BigInt(value as i64)),
6239        (DataType::Int64, Value::Float(value)) => Ok(Value::BigInt(value as i64)),
6240        (DataType::Int64, Value::Boolean(true)) => Ok(Value::BigInt(1)),
6241        (DataType::Int64, Value::Boolean(false)) => Ok(Value::BigInt(0)),
6242        (DataType::Int64, Value::Text(value)) => value
6243            .parse::<i64>()
6244            .map(Value::BigInt)
6245            .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS INT64", value))),
6246        (DataType::Int64, Value::Blob(bytes)) => decode_signed_blob(&bytes, 8, "INT64")
6247            .and_then(|value| {
6248                i64::try_from(value)
6249                    .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS INT64".to_string()))
6250            })
6251            .map(Value::BigInt),
6252        (DataType::Int128, Value::Integer(value)) => Ok(Value::Int128(value as i128)),
6253        (DataType::Int128, Value::BigInt(value)) => Ok(Value::Int128(value as i128)),
6254        (DataType::Int128, Value::Int128(value)) => Ok(Value::Int128(value)),
6255        (DataType::Int128, Value::Float32(value)) => Ok(Value::Int128(value as i128)),
6256        (DataType::Int128, Value::Float(value)) => Ok(Value::Int128(value as i128)),
6257        (DataType::Int128, Value::Boolean(true)) => Ok(Value::Int128(1)),
6258        (DataType::Int128, Value::Boolean(false)) => Ok(Value::Int128(0)),
6259        (DataType::Int128, Value::Text(value)) => value
6260            .parse::<i128>()
6261            .map(Value::Int128)
6262            .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS INT128", value))),
6263        (DataType::Int128, Value::Blob(bytes)) => {
6264            decode_signed_blob(&bytes, 16, "INT128").map(Value::Int128)
6265        }
6266        (DataType::UInt8, Value::Integer(value)) if value >= 0 => u8::try_from(value)
6267            .map(|value| Value::UInteger(value as u32))
6268            .map_err(|_| {
6269                HematiteError::ParseError("Cannot CAST out-of-range INT AS UINT8".to_string())
6270            }),
6271        (DataType::UInt8, Value::Blob(bytes)) => decode_unsigned_blob(&bytes, 1, "UINT8")
6272            .and_then(|value| {
6273                u8::try_from(value)
6274                    .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS UINT8".to_string()))
6275            })
6276            .map(|value| Value::UInteger(value as u32)),
6277        (DataType::UInt16, Value::Integer(value)) if value >= 0 => u16::try_from(value)
6278            .map(|value| Value::UInteger(value as u32))
6279            .map_err(|_| {
6280                HematiteError::ParseError("Cannot CAST out-of-range INT AS UINT16".to_string())
6281            }),
6282        (DataType::UInt16, Value::Blob(bytes)) => decode_unsigned_blob(&bytes, 2, "UINT16")
6283            .and_then(|value| {
6284                u16::try_from(value).map_err(|_| {
6285                    HematiteError::ParseError("Cannot CAST blob AS UINT16".to_string())
6286                })
6287            })
6288            .map(|value| Value::UInteger(value as u32)),
6289        (DataType::UInt, Value::Integer(value)) if value >= 0 => Ok(Value::UInteger(value as u32)),
6290        (DataType::UInt, Value::BigInt(value)) if value >= 0 => {
6291            u32::try_from(value).map(Value::UInteger).map_err(|_| {
6292                HematiteError::ParseError("Cannot CAST out-of-range INT64 AS UINT".to_string())
6293            })
6294        }
6295        (DataType::UInt, Value::Int128(value)) if value >= 0 => {
6296            u32::try_from(value).map(Value::UInteger).map_err(|_| {
6297                HematiteError::ParseError("Cannot CAST out-of-range INT128 AS UINT".to_string())
6298            })
6299        }
6300        (DataType::UInt, Value::UInteger(value)) => Ok(Value::UInteger(value)),
6301        (DataType::UInt, Value::UBigInt(value)) => {
6302            u32::try_from(value).map(Value::UInteger).map_err(|_| {
6303                HematiteError::ParseError("Cannot CAST out-of-range UINT64 AS UINT".to_string())
6304            })
6305        }
6306        (DataType::UInt, Value::UInt128(value)) => {
6307            u32::try_from(value).map(Value::UInteger).map_err(|_| {
6308                HematiteError::ParseError("Cannot CAST out-of-range UINT128 AS UINT".to_string())
6309            })
6310        }
6311        (DataType::UInt, Value::Float32(value)) if value >= 0.0 => {
6312            Ok(Value::UInteger(value as u32))
6313        }
6314        (DataType::UInt, Value::Float(value)) if value >= 0.0 => Ok(Value::UInteger(value as u32)),
6315        (DataType::UInt, Value::Boolean(true)) => Ok(Value::UInteger(1)),
6316        (DataType::UInt, Value::Boolean(false)) => Ok(Value::UInteger(0)),
6317        (DataType::UInt, Value::Text(value)) => value
6318            .parse::<u32>()
6319            .map(Value::UInteger)
6320            .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS UINT", value))),
6321        (DataType::UInt, Value::Blob(bytes)) => decode_unsigned_blob(&bytes, 4, "UINT")
6322            .and_then(|value| {
6323                u32::try_from(value)
6324                    .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS UINT".to_string()))
6325            })
6326            .map(Value::UInteger),
6327        (DataType::UInt64, Value::Integer(value)) if value >= 0 => Ok(Value::UBigInt(value as u64)),
6328        (DataType::UInt64, Value::BigInt(value)) if value >= 0 => Ok(Value::UBigInt(value as u64)),
6329        (DataType::UInt64, Value::Int128(value)) if value >= 0 => {
6330            u64::try_from(value).map(Value::UBigInt).map_err(|_| {
6331                HematiteError::ParseError("Cannot CAST out-of-range INT128 AS UINT64".to_string())
6332            })
6333        }
6334        (DataType::UInt64, Value::UInteger(value)) => Ok(Value::UBigInt(value as u64)),
6335        (DataType::UInt64, Value::UBigInt(value)) => Ok(Value::UBigInt(value)),
6336        (DataType::UInt64, Value::UInt128(value)) => {
6337            u64::try_from(value).map(Value::UBigInt).map_err(|_| {
6338                HematiteError::ParseError("Cannot CAST out-of-range UINT128 AS UINT64".to_string())
6339            })
6340        }
6341        (DataType::UInt64, Value::Float32(value)) if value >= 0.0 => {
6342            Ok(Value::UBigInt(value as u64))
6343        }
6344        (DataType::UInt64, Value::Float(value)) if value >= 0.0 => Ok(Value::UBigInt(value as u64)),
6345        (DataType::UInt64, Value::Boolean(true)) => Ok(Value::UBigInt(1)),
6346        (DataType::UInt64, Value::Boolean(false)) => Ok(Value::UBigInt(0)),
6347        (DataType::UInt64, Value::Text(value)) => value
6348            .parse::<u64>()
6349            .map(Value::UBigInt)
6350            .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS UINT64", value))),
6351        (DataType::UInt64, Value::Blob(bytes)) => decode_unsigned_blob(&bytes, 8, "UINT64")
6352            .and_then(|value| {
6353                u64::try_from(value).map_err(|_| {
6354                    HematiteError::ParseError("Cannot CAST blob AS UINT64".to_string())
6355                })
6356            })
6357            .map(Value::UBigInt),
6358        (DataType::UInt128, Value::Integer(value)) if value >= 0 => {
6359            Ok(Value::UInt128(value as u128))
6360        }
6361        (DataType::UInt128, Value::BigInt(value)) if value >= 0 => {
6362            Ok(Value::UInt128(value as u128))
6363        }
6364        (DataType::UInt128, Value::Int128(value)) if value >= 0 => {
6365            Ok(Value::UInt128(value as u128))
6366        }
6367        (DataType::UInt128, Value::UInteger(value)) => Ok(Value::UInt128(value as u128)),
6368        (DataType::UInt128, Value::UBigInt(value)) => Ok(Value::UInt128(value as u128)),
6369        (DataType::UInt128, Value::UInt128(value)) => Ok(Value::UInt128(value)),
6370        (DataType::UInt128, Value::Float32(value)) if value >= 0.0 => {
6371            Ok(Value::UInt128(value as u128))
6372        }
6373        (DataType::UInt128, Value::Float(value)) if value >= 0.0 => {
6374            Ok(Value::UInt128(value as u128))
6375        }
6376        (DataType::UInt128, Value::Boolean(true)) => Ok(Value::UInt128(1)),
6377        (DataType::UInt128, Value::Boolean(false)) => Ok(Value::UInt128(0)),
6378        (DataType::UInt128, Value::Text(value)) => value
6379            .parse::<u128>()
6380            .map(Value::UInt128)
6381            .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS UINT128", value))),
6382        (DataType::UInt128, Value::Blob(bytes)) => {
6383            decode_unsigned_blob(&bytes, 16, "UINT128").map(Value::UInt128)
6384        }
6385        (DataType::Text, value) => cast_value_to_text_string(value).map(Value::Text),
6386        (DataType::Char(length), value) => {
6387            coerce_char_value(cast_value_to_text_string(value)?, length, "CAST")
6388        }
6389        (DataType::VarChar(length), value) => {
6390            coerce_varchar_value(cast_value_to_text_string(value)?, length, "CAST")
6391        }
6392        (DataType::Binary(length), value) => coerce_binary_value(value, length, "CAST", true),
6393        (DataType::VarBinary(length), value) => coerce_binary_value(value, length, "CAST", false),
6394        (DataType::Enum(values), value) => coerce_enum_value(value, &values, "CAST"),
6395        (DataType::Boolean, Value::Boolean(value)) => Ok(Value::Boolean(value)),
6396        (DataType::Boolean, Value::Integer(value)) => Ok(Value::Boolean(value != 0)),
6397        (DataType::Boolean, Value::BigInt(value)) => Ok(Value::Boolean(value != 0)),
6398        (DataType::Boolean, Value::Int128(value)) => Ok(Value::Boolean(value != 0)),
6399        (DataType::Boolean, Value::UInteger(value)) => Ok(Value::Boolean(value != 0)),
6400        (DataType::Boolean, Value::UBigInt(value)) => Ok(Value::Boolean(value != 0)),
6401        (DataType::Boolean, Value::UInt128(value)) => Ok(Value::Boolean(value != 0)),
6402        (DataType::Boolean, Value::Float32(value)) => Ok(Value::Boolean(value != 0.0)),
6403        (DataType::Boolean, Value::Float(value)) => Ok(Value::Boolean(value != 0.0)),
6404        (DataType::Boolean, Value::Text(value)) => match value.to_ascii_uppercase().as_str() {
6405            "TRUE" | "1" => Ok(Value::Boolean(true)),
6406            "FALSE" | "0" => Ok(Value::Boolean(false)),
6407            _ => Err(HematiteError::ParseError(format!(
6408                "Cannot CAST '{}' AS BOOLEAN",
6409                value
6410            ))),
6411        },
6412        (data_type @ (DataType::Float32 | DataType::Float), Value::Text(value)) => value
6413            .parse::<f64>()
6414            .map(|value| make_float_value(&data_type, value))
6415            .map_err(|_| {
6416                HematiteError::ParseError(format!(
6417                    "Cannot CAST '{}' AS {}",
6418                    value,
6419                    float_type_name(&data_type)
6420                ))
6421            }),
6422        (data_type @ (DataType::Float32 | DataType::Float), Value::Boolean(true)) => {
6423            Ok(make_float_value(&data_type, 1.0))
6424        }
6425        (data_type @ (DataType::Float32 | DataType::Float), Value::Boolean(false)) => {
6426            Ok(make_float_value(&data_type, 0.0))
6427        }
6428        (data_type @ (DataType::Float32 | DataType::Float), value) => {
6429            if let Some(number) = numeric_value_as_f64(&value) {
6430                Ok(make_float_value(&data_type, number))
6431            } else {
6432                Err(HematiteError::ParseError(format!(
6433                    "Cannot CAST '{:?}' AS {}",
6434                    value,
6435                    float_type_name(&data_type)
6436                )))
6437            }
6438        }
6439        (DataType::Decimal { precision, scale }, value) => {
6440            let decimal = coerce_decimal_value(value)?;
6441            if !decimal.fits_precision_scale(precision, scale) {
6442                return Err(HematiteError::ParseError(format!(
6443                    "Cannot CAST decimal outside declared precision/scale AS {}",
6444                    data_type.base_name()
6445                )));
6446            }
6447            Ok(Value::Decimal(decimal))
6448        }
6449        (DataType::Blob, Value::Blob(value)) => Ok(Value::Blob(value)),
6450        (DataType::Blob, Value::Text(value)) => Ok(Value::Blob(value.into_bytes())),
6451        (DataType::Blob, Value::Integer(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6452        (DataType::Blob, Value::BigInt(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6453        (DataType::Blob, Value::Int128(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6454        (DataType::Blob, Value::UInteger(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6455        (DataType::Blob, Value::UBigInt(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6456        (DataType::Blob, Value::UInt128(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6457        (DataType::Date, Value::Date(value)) => Ok(Value::Date(value)),
6458        (DataType::Date, Value::Text(value)) => Ok(Value::Date(validate_date_string(&value)?)),
6459        (DataType::Time, Value::Time(value)) => Ok(Value::Time(value)),
6460        (DataType::Time, Value::Text(value)) => Ok(Value::Time(validate_time_string(&value)?)),
6461        (DataType::DateTime, Value::DateTime(value)) => Ok(Value::DateTime(value)),
6462        (DataType::DateTime, Value::Text(value)) => {
6463            Ok(Value::DateTime(validate_datetime_string(&value)?))
6464        }
6465        (DataType::TimeWithTimeZone, Value::TimeWithTimeZone(value)) => {
6466            Ok(Value::TimeWithTimeZone(value))
6467        }
6468        (DataType::TimeWithTimeZone, Value::Text(value)) => Ok(Value::TimeWithTimeZone(
6469            validate_time_with_time_zone_string(&value)?,
6470        )),
6471        (DataType::IntervalYearMonth, value) => Ok(Value::IntervalYearMonth(
6472            IntervalYearMonthValue::parse(&cast_value_to_text_string(value)?)?,
6473        )),
6474        (DataType::IntervalDaySecond, value) => Ok(Value::IntervalDaySecond(
6475            IntervalDaySecondValue::parse(&cast_value_to_text_string(value)?)?,
6476        )),
6477        (data_type, value) => Err(HematiteError::ParseError(format!(
6478            "Cannot CAST {:?} AS {}",
6479            value,
6480            data_type.name()
6481        ))),
6482    }
6483}
6484
6485fn evaluate_scalar_function(function: ScalarFunction, args: Vec<Value>) -> Result<Value> {
6486    match function {
6487        ScalarFunction::Coalesce => evaluate_coalesce(args),
6488        ScalarFunction::IfNull => evaluate_ifnull(args),
6489        ScalarFunction::NullIf => evaluate_nullif(args),
6490        ScalarFunction::DateFn => evaluate_date_fn(args),
6491        ScalarFunction::TimeFn => evaluate_time_fn(args),
6492        ScalarFunction::Year => evaluate_year(args),
6493        ScalarFunction::Month => evaluate_month(args),
6494        ScalarFunction::Day => evaluate_day(args),
6495        ScalarFunction::Hour => evaluate_hour(args),
6496        ScalarFunction::Minute => evaluate_minute(args),
6497        ScalarFunction::Second => evaluate_second(args),
6498        ScalarFunction::TimeToSec => evaluate_time_to_sec(args),
6499        ScalarFunction::SecToTime => evaluate_sec_to_time(args),
6500        ScalarFunction::UnixTimestamp => evaluate_unix_timestamp(args),
6501        ScalarFunction::Lower => evaluate_lower(args),
6502        ScalarFunction::Upper => evaluate_upper(args),
6503        ScalarFunction::Length => evaluate_length(args),
6504        ScalarFunction::OctetLength => evaluate_octet_length(args),
6505        ScalarFunction::BitLength => evaluate_bit_length(args),
6506        ScalarFunction::Trim => evaluate_trim(args),
6507        ScalarFunction::Abs => evaluate_abs(args),
6508        ScalarFunction::Round => evaluate_round(args),
6509        ScalarFunction::Concat => evaluate_concat(args),
6510        ScalarFunction::ConcatWs => evaluate_concat_ws(args),
6511        ScalarFunction::Substring => evaluate_substring(args),
6512        ScalarFunction::LeftFn => evaluate_left(args),
6513        ScalarFunction::RightFn => evaluate_right(args),
6514        ScalarFunction::Greatest => evaluate_extremum("GREATEST", args, true),
6515        ScalarFunction::Least => evaluate_extremum("LEAST", args, false),
6516        ScalarFunction::Replace => evaluate_replace(args),
6517        ScalarFunction::Repeat => evaluate_repeat(args),
6518        ScalarFunction::Reverse => evaluate_reverse(args),
6519        ScalarFunction::Locate => evaluate_locate(args),
6520        ScalarFunction::Hex => evaluate_hex(args),
6521        ScalarFunction::Unhex => evaluate_unhex(args),
6522        ScalarFunction::Ceil => evaluate_ceil(args),
6523        ScalarFunction::Floor => evaluate_floor(args),
6524        ScalarFunction::Power => evaluate_power(args),
6525    }
6526}
6527
6528fn evaluate_coalesce(args: Vec<Value>) -> Result<Value> {
6529    if args.is_empty() {
6530        return Err(HematiteError::ParseError(
6531            "COALESCE requires at least one argument".to_string(),
6532        ));
6533    }
6534
6535    for arg in args {
6536        if !arg.is_null() {
6537            return Ok(arg);
6538        }
6539    }
6540
6541    Ok(Value::Null)
6542}
6543
6544fn evaluate_date_fn(args: Vec<Value>) -> Result<Value> {
6545    expect_unary_temporal_function("DATE", args, |value| {
6546        Ok(Value::Date(extract_date_component("DATE", value)?))
6547    })
6548}
6549
6550fn evaluate_time_fn(args: Vec<Value>) -> Result<Value> {
6551    expect_unary_temporal_function("TIME", args, |value| {
6552        Ok(Value::Time(extract_time_component("TIME", value)?))
6553    })
6554}
6555
6556fn evaluate_year(args: Vec<Value>) -> Result<Value> {
6557    expect_unary_temporal_function("YEAR", args, |value| {
6558        let (year, _, _) = extract_date_component("YEAR", value)?.components();
6559        Ok(Value::Integer(year))
6560    })
6561}
6562
6563fn evaluate_month(args: Vec<Value>) -> Result<Value> {
6564    expect_unary_temporal_function("MONTH", args, |value| {
6565        let (_, month, _) = extract_date_component("MONTH", value)?.components();
6566        Ok(Value::Integer(month as i32))
6567    })
6568}
6569
6570fn evaluate_day(args: Vec<Value>) -> Result<Value> {
6571    expect_unary_temporal_function("DAY", args, |value| {
6572        let (_, _, day) = extract_date_component("DAY", value)?.components();
6573        Ok(Value::Integer(day as i32))
6574    })
6575}
6576
6577fn evaluate_hour(args: Vec<Value>) -> Result<Value> {
6578    expect_unary_temporal_function("HOUR", args, |value| {
6579        let (hour, _, _) = extract_time_component("HOUR", value)?.components();
6580        Ok(Value::Integer(hour as i32))
6581    })
6582}
6583
6584fn evaluate_minute(args: Vec<Value>) -> Result<Value> {
6585    expect_unary_temporal_function("MINUTE", args, |value| {
6586        let (_, minute, _) = extract_time_component("MINUTE", value)?.components();
6587        Ok(Value::Integer(minute as i32))
6588    })
6589}
6590
6591fn evaluate_second(args: Vec<Value>) -> Result<Value> {
6592    expect_unary_temporal_function("SECOND", args, |value| {
6593        let (_, _, second) = extract_time_component("SECOND", value)?.components();
6594        Ok(Value::Integer(second as i32))
6595    })
6596}
6597
6598fn evaluate_time_to_sec(args: Vec<Value>) -> Result<Value> {
6599    expect_unary_temporal_function("TIME_TO_SEC", args, |value| {
6600        Ok(Value::BigInt(
6601            extract_time_component("TIME_TO_SEC", value)?.seconds_since_midnight() as i64,
6602        ))
6603    })
6604}
6605
6606fn evaluate_sec_to_time(args: Vec<Value>) -> Result<Value> {
6607    if args.len() != 1 {
6608        return Err(HematiteError::ParseError(
6609            "SEC_TO_TIME requires exactly one argument".to_string(),
6610        ));
6611    }
6612
6613    let value = args.into_iter().next().expect("validated arity");
6614    match value {
6615        Value::Null => Ok(Value::Null),
6616        Value::Integer(value) => Ok(Value::Time(TimeValue::from_seconds_since_midnight(
6617            add_wrapped_seconds(0, value as i64),
6618        ))),
6619        Value::BigInt(value) => Ok(Value::Time(TimeValue::from_seconds_since_midnight(
6620            add_wrapped_seconds(0, value),
6621        ))),
6622        value => Err(HematiteError::ParseError(format!(
6623            "SEC_TO_TIME requires an integer value, found {:?}",
6624            value
6625        ))),
6626    }
6627}
6628
6629fn evaluate_unix_timestamp(args: Vec<Value>) -> Result<Value> {
6630    expect_unary_temporal_function("UNIX_TIMESTAMP", args, |value| {
6631        let timestamp = extract_timestamp_component("UNIX_TIMESTAMP", value)?;
6632        Ok(Value::BigInt(timestamp.seconds_since_epoch()))
6633    })
6634}
6635
6636fn evaluate_ifnull(args: Vec<Value>) -> Result<Value> {
6637    if args.len() != 2 {
6638        return Err(HematiteError::ParseError(
6639            "IFNULL requires exactly two arguments".to_string(),
6640        ));
6641    }
6642
6643    let mut args = args.into_iter();
6644    let first = args.next().expect("ifnull validated arity");
6645    let second = args.next().expect("ifnull validated arity");
6646    if first.is_null() {
6647        Ok(second)
6648    } else {
6649        Ok(first)
6650    }
6651}
6652
6653fn evaluate_nullif(args: Vec<Value>) -> Result<Value> {
6654    if args.len() != 2 {
6655        return Err(HematiteError::ParseError(
6656            "NULLIF requires exactly two arguments".to_string(),
6657        ));
6658    }
6659
6660    let mut args = args.into_iter();
6661    let left = args.next().expect("nullif validated arity");
6662    let right = args.next().expect("nullif validated arity");
6663    if left.is_null() {
6664        return Ok(Value::Null);
6665    }
6666    if right.is_null() {
6667        return Ok(left);
6668    }
6669    if sql_values_equal(&left, &right, None) {
6670        Ok(Value::Null)
6671    } else {
6672        Ok(left)
6673    }
6674}
6675
6676fn evaluate_lower(args: Vec<Value>) -> Result<Value> {
6677    expect_unary_text_function("LOWER", args, |text| Ok(Value::Text(text.to_lowercase())))
6678}
6679
6680fn evaluate_upper(args: Vec<Value>) -> Result<Value> {
6681    expect_unary_text_function("UPPER", args, |text| Ok(Value::Text(text.to_uppercase())))
6682}
6683
6684fn evaluate_length(args: Vec<Value>) -> Result<Value> {
6685    expect_unary_length_function("LENGTH", args, |value| match value {
6686        Value::Text(text) => {
6687            let len = i32::try_from(text.chars().count()).map_err(|_| {
6688                HematiteError::ParseError("LENGTH result overflowed INT".to_string())
6689            })?;
6690            Ok(Value::Integer(len))
6691        }
6692        Value::Blob(bytes) => {
6693            let len = i32::try_from(bytes.len()).map_err(|_| {
6694                HematiteError::ParseError("LENGTH result overflowed INT".to_string())
6695            })?;
6696            Ok(Value::Integer(len))
6697        }
6698        value => Err(HematiteError::ParseError(format!(
6699            "LENGTH requires a text or blob value, found {:?}",
6700            value
6701        ))),
6702    })
6703}
6704
6705fn evaluate_octet_length(args: Vec<Value>) -> Result<Value> {
6706    expect_unary_length_function("OCTET_LENGTH", args, |value| {
6707        let len = match value {
6708            Value::Text(text) => text.len(),
6709            Value::Enum(text) => text.len(),
6710            Value::Blob(bytes) => bytes.len(),
6711            value => {
6712                return Err(HematiteError::ParseError(format!(
6713                    "OCTET_LENGTH requires a text or blob value, found {:?}",
6714                    value
6715                )))
6716            }
6717        };
6718        let len = i32::try_from(len).map_err(|_| {
6719            HematiteError::ParseError("OCTET_LENGTH result overflowed INT".to_string())
6720        })?;
6721        Ok(Value::Integer(len))
6722    })
6723}
6724
6725fn evaluate_bit_length(args: Vec<Value>) -> Result<Value> {
6726    expect_unary_length_function("BIT_LENGTH", args, |value| {
6727        let len = match evaluate_octet_length(vec![value])? {
6728            Value::Integer(length) => length,
6729            Value::Null => return Ok(Value::Null),
6730            _ => unreachable!("validated OCTET_LENGTH shape"),
6731        };
6732        len.checked_mul(8).map(Value::Integer).ok_or_else(|| {
6733            HematiteError::ParseError("BIT_LENGTH result overflowed INT".to_string())
6734        })
6735    })
6736}
6737
6738fn evaluate_trim(args: Vec<Value>) -> Result<Value> {
6739    expect_unary_text_function("TRIM", args, |text| {
6740        Ok(Value::Text(text.trim().to_string()))
6741    })
6742}
6743
6744fn expect_unary_text_function<F>(name: &str, args: Vec<Value>, f: F) -> Result<Value>
6745where
6746    F: FnOnce(&str) -> Result<Value>,
6747{
6748    if args.len() != 1 {
6749        return Err(HematiteError::ParseError(format!(
6750            "{} requires exactly one argument",
6751            name
6752        )));
6753    }
6754
6755    let value = args.into_iter().next().expect("validated unary arity");
6756    match value {
6757        Value::Null => Ok(Value::Null),
6758        Value::Text(text) => f(&text),
6759        value => Err(HematiteError::ParseError(format!(
6760            "{} requires a text value, found {:?}",
6761            name, value
6762        ))),
6763    }
6764}
6765
6766fn expect_unary_length_function<F>(name: &str, args: Vec<Value>, f: F) -> Result<Value>
6767where
6768    F: FnOnce(Value) -> Result<Value>,
6769{
6770    if args.len() != 1 {
6771        return Err(HematiteError::ParseError(format!(
6772            "{} requires exactly one argument",
6773            name
6774        )));
6775    }
6776
6777    let value = args.into_iter().next().expect("validated unary arity");
6778    match value {
6779        Value::Null => Ok(Value::Null),
6780        value => f(value),
6781    }
6782}
6783
6784fn expect_unary_temporal_function<F>(name: &str, args: Vec<Value>, f: F) -> Result<Value>
6785where
6786    F: FnOnce(Value) -> Result<Value>,
6787{
6788    if args.len() != 1 {
6789        return Err(HematiteError::ParseError(format!(
6790            "{} requires exactly one argument",
6791            name
6792        )));
6793    }
6794
6795    let value = args.into_iter().next().expect("validated unary arity");
6796    match value {
6797        Value::Null => Ok(Value::Null),
6798        value => f(value),
6799    }
6800}
6801
6802fn evaluate_abs(args: Vec<Value>) -> Result<Value> {
6803    expect_unary_numeric_function("ABS", args, |value| match value {
6804        Value::Integer(value) => {
6805            if value == i32::MIN {
6806                return Err(HematiteError::ParseError("ABS overflowed INT".to_string()));
6807            }
6808            Ok(Value::Integer(value.abs()))
6809        }
6810        Value::BigInt(value) => {
6811            if value == i64::MIN {
6812                return Err(HematiteError::ParseError(
6813                    "ABS overflowed INT64".to_string(),
6814                ));
6815            }
6816            Ok(Value::BigInt(value.abs()))
6817        }
6818        Value::Int128(value) => value
6819            .checked_abs()
6820            .map(Value::Int128)
6821            .ok_or_else(|| HematiteError::ParseError("ABS overflowed INT128".to_string())),
6822        Value::UInteger(value) => Ok(Value::UInteger(value)),
6823        Value::UBigInt(value) => Ok(Value::UBigInt(value)),
6824        Value::UInt128(value) => Ok(Value::UInt128(value)),
6825        Value::Float32(value) => Ok(Value::Float32(value.abs())),
6826        Value::Float(value) => Ok(Value::Float(value.abs())),
6827        _ => unreachable!("validated numeric input"),
6828    })
6829}
6830
6831fn evaluate_round(args: Vec<Value>) -> Result<Value> {
6832    if args.is_empty() || args.len() > 2 {
6833        return Err(HematiteError::ParseError(
6834            "ROUND requires one or two arguments".to_string(),
6835        ));
6836    }
6837
6838    let mut args = args.into_iter();
6839    let value = args.next().expect("validated round arity");
6840    let precision = match args.next() {
6841        Some(Value::Null) => return Ok(Value::Null),
6842        Some(Value::Integer(value)) => value,
6843        Some(value) => {
6844            return Err(HematiteError::ParseError(format!(
6845                "ROUND precision requires an integer value, found {:?}",
6846                value
6847            )))
6848        }
6849        None => 0,
6850    };
6851
6852    match value {
6853        Value::Null => Ok(Value::Null),
6854        Value::Integer(value) => round_integer(value, precision),
6855        Value::BigInt(value) => round_bigint(value, precision),
6856        Value::Int128(value) => round_int128(value, precision),
6857        Value::UInteger(value) => round_uinteger(value, precision),
6858        Value::UBigInt(value) => round_ubigint(value, precision),
6859        Value::UInt128(value) => round_uint128(value, precision),
6860        Value::Float32(value) => Ok(Value::Float32(round_float(value as f64, precision) as f32)),
6861        Value::Float(value) => Ok(Value::Float(round_float(value, precision))),
6862        value => Err(HematiteError::ParseError(format!(
6863            "ROUND requires a numeric value, found {:?}",
6864            value
6865        ))),
6866    }
6867}
6868
6869fn evaluate_concat(args: Vec<Value>) -> Result<Value> {
6870    if args.is_empty() {
6871        return Err(HematiteError::ParseError(
6872            "CONCAT requires at least one argument".to_string(),
6873        ));
6874    }
6875
6876    let mut out = String::new();
6877    for arg in args {
6878        if arg.is_null() {
6879            return Ok(Value::Null);
6880        }
6881        out.push_str(&coerce_value_to_string("CONCAT", arg)?);
6882    }
6883    Ok(Value::Text(out))
6884}
6885
6886fn evaluate_concat_ws(args: Vec<Value>) -> Result<Value> {
6887    if args.len() < 2 {
6888        return Err(HematiteError::ParseError(
6889            "CONCAT_WS requires at least two arguments".to_string(),
6890        ));
6891    }
6892
6893    let mut args = args.into_iter();
6894    let separator = args.next().expect("concat_ws validated arity");
6895    if separator.is_null() {
6896        return Ok(Value::Null);
6897    }
6898    let separator = coerce_value_to_string("CONCAT_WS", separator)?;
6899
6900    let mut parts = Vec::new();
6901    for arg in args {
6902        if arg.is_null() {
6903            continue;
6904        }
6905        parts.push(coerce_value_to_string("CONCAT_WS", arg)?);
6906    }
6907
6908    Ok(Value::Text(parts.join(&separator)))
6909}
6910
6911fn evaluate_substring(args: Vec<Value>) -> Result<Value> {
6912    if args.len() != 2 && args.len() != 3 {
6913        return Err(HematiteError::ParseError(
6914            "SUBSTRING requires two or three arguments".to_string(),
6915        ));
6916    }
6917
6918    let mut args = args.into_iter();
6919    let text = args.next().expect("validated substring arity");
6920    let start = args.next().expect("validated substring arity");
6921    let len = args.next();
6922
6923    if text.is_null() || start.is_null() || len.as_ref().is_some_and(Value::is_null) {
6924        return Ok(Value::Null);
6925    }
6926
6927    let text = expect_text_argument("SUBSTRING", text)?;
6928    let start = expect_integer_argument("SUBSTRING", start, "start position")?;
6929    let len = len
6930        .map(|value| expect_integer_argument("SUBSTRING", value, "length"))
6931        .transpose()?;
6932
6933    substring_chars(&text, start, len)
6934}
6935
6936fn evaluate_left(args: Vec<Value>) -> Result<Value> {
6937    if args.len() != 2 {
6938        return Err(HematiteError::ParseError(
6939            "LEFT requires exactly two arguments".to_string(),
6940        ));
6941    }
6942
6943    let mut args = args.into_iter();
6944    let text = args.next().expect("validated left arity");
6945    let len = args.next().expect("validated left arity");
6946    if text.is_null() || len.is_null() {
6947        return Ok(Value::Null);
6948    }
6949
6950    let text = expect_text_argument("LEFT", text)?;
6951    let len = expect_integer_argument("LEFT", len, "length")?;
6952    if len < 0 {
6953        return Ok(Value::Text(String::new()));
6954    }
6955
6956    let out = text.chars().take(len as usize).collect::<String>();
6957    Ok(Value::Text(out))
6958}
6959
6960fn evaluate_right(args: Vec<Value>) -> Result<Value> {
6961    if args.len() != 2 {
6962        return Err(HematiteError::ParseError(
6963            "RIGHT requires exactly two arguments".to_string(),
6964        ));
6965    }
6966
6967    let mut args = args.into_iter();
6968    let text = args.next().expect("validated right arity");
6969    let len = args.next().expect("validated right arity");
6970    if text.is_null() || len.is_null() {
6971        return Ok(Value::Null);
6972    }
6973
6974    let text = expect_text_argument("RIGHT", text)?;
6975    let len = expect_integer_argument("RIGHT", len, "length")?;
6976    if len < 0 {
6977        return Ok(Value::Text(String::new()));
6978    }
6979
6980    let chars = text.chars().collect::<Vec<_>>();
6981    let take = len as usize;
6982    let start = chars.len().saturating_sub(take);
6983    let out = chars[start..].iter().collect::<String>();
6984    Ok(Value::Text(out))
6985}
6986
6987fn expect_unary_numeric_function<F>(name: &str, args: Vec<Value>, f: F) -> Result<Value>
6988where
6989    F: FnOnce(Value) -> Result<Value>,
6990{
6991    if args.len() != 1 {
6992        return Err(HematiteError::ParseError(format!(
6993            "{} requires exactly one argument",
6994            name
6995        )));
6996    }
6997
6998    let value = args.into_iter().next().expect("validated unary arity");
6999    match value {
7000        Value::Null => Ok(Value::Null),
7001        Value::Integer(_)
7002        | Value::BigInt(_)
7003        | Value::Int128(_)
7004        | Value::UInteger(_)
7005        | Value::UBigInt(_)
7006        | Value::UInt128(_)
7007        | Value::Float32(_)
7008        | Value::Float(_) => f(value),
7009        value => Err(HematiteError::ParseError(format!(
7010            "{} requires a numeric value, found {:?}",
7011            name, value
7012        ))),
7013    }
7014}
7015
7016fn expect_numeric_argument(function_name: &str, value: Value) -> Result<f64> {
7017    match value {
7018        Value::Decimal(value) => value.to_string().parse::<f64>().map_err(|_| {
7019            HematiteError::ParseError(format!(
7020                "{} requires a numeric value, found {:?}",
7021                function_name,
7022                Value::Decimal(value.clone())
7023            ))
7024        }),
7025        value if numeric_value_as_f64(&value).is_some() => {
7026            Ok(numeric_value_as_f64(&value).unwrap())
7027        }
7028        value => Err(HematiteError::ParseError(format!(
7029            "{} requires a numeric value, found {:?}",
7030            function_name, value
7031        ))),
7032    }
7033}
7034
7035fn coerce_value_to_string(function_name: &str, value: Value) -> Result<String> {
7036    match value {
7037        Value::Text(text) => Ok(text),
7038        Value::Enum(text) => Ok(text),
7039        Value::Decimal(text) => Ok(text.to_string()),
7040        Value::Date(text) => Ok(text.to_string()),
7041        Value::Time(text) => Ok(text.to_string()),
7042        Value::DateTime(text) => Ok(text.to_string()),
7043        Value::TimeWithTimeZone(text) => Ok(text.to_string()),
7044        Value::IntervalYearMonth(text) => Ok(text.to_string()),
7045        Value::IntervalDaySecond(text) => Ok(text.to_string()),
7046        Value::Integer(value) => Ok(value.to_string()),
7047        Value::BigInt(value) => Ok(value.to_string()),
7048        Value::Int128(value) => Ok(value.to_string()),
7049        Value::UInteger(value) => Ok(value.to_string()),
7050        Value::UBigInt(value) => Ok(value.to_string()),
7051        Value::UInt128(value) => Ok(value.to_string()),
7052        Value::Float32(value) => Ok(value.to_string()),
7053        Value::Float(value) => Ok(value.to_string()),
7054        Value::Boolean(true) => Ok("TRUE".to_string()),
7055        Value::Boolean(false) => Ok("FALSE".to_string()),
7056        Value::Blob(value) => Ok(String::from_utf8_lossy(&value).into_owned()),
7057        Value::Null => Err(HematiteError::ParseError(format!(
7058            "{} cannot stringify NULL directly",
7059            function_name
7060        ))),
7061    }
7062}
7063
7064fn expect_text_argument(function_name: &str, value: Value) -> Result<String> {
7065    match value {
7066        Value::Text(text) => Ok(text),
7067        Value::Enum(text) => Ok(text),
7068        Value::Decimal(text) => Ok(text.to_string()),
7069        Value::Date(text) => Ok(text.to_string()),
7070        Value::Time(text) => Ok(text.to_string()),
7071        Value::DateTime(text) => Ok(text.to_string()),
7072        Value::TimeWithTimeZone(text) => Ok(text.to_string()),
7073        Value::Integer(value) => Ok(value.to_string()),
7074        Value::BigInt(value) => Ok(value.to_string()),
7075        Value::Int128(value) => Ok(value.to_string()),
7076        Value::UInteger(value) => Ok(value.to_string()),
7077        Value::UBigInt(value) => Ok(value.to_string()),
7078        Value::UInt128(value) => Ok(value.to_string()),
7079        Value::Float32(value) => Ok(value.to_string()),
7080        Value::Float(value) => Ok(value.to_string()),
7081        value => Err(HematiteError::ParseError(format!(
7082            "{} requires a text value, found {:?}",
7083            function_name, value
7084        ))),
7085    }
7086}
7087
7088fn expect_integer_argument(function_name: &str, value: Value, label: &str) -> Result<i32> {
7089    match value {
7090        Value::Null => Err(HematiteError::ParseError(format!(
7091            "{} {} cannot be NULL",
7092            function_name, label
7093        ))),
7094        Value::Integer(value) => Ok(value),
7095        Value::BigInt(value) => i32::try_from(value).map_err(|_| {
7096            HematiteError::ParseError(format!(
7097                "{} {} requires a 32-bit integer value, found {:?}",
7098                function_name,
7099                label,
7100                Value::BigInt(value)
7101            ))
7102        }),
7103        Value::Int128(value) => i32::try_from(value).map_err(|_| {
7104            HematiteError::ParseError(format!(
7105                "{} {} requires a 32-bit integer value, found {:?}",
7106                function_name,
7107                label,
7108                Value::Int128(value)
7109            ))
7110        }),
7111        Value::UInteger(value) => i32::try_from(value).map_err(|_| {
7112            HematiteError::ParseError(format!(
7113                "{} {} requires a 32-bit integer value, found {:?}",
7114                function_name,
7115                label,
7116                Value::UInteger(value)
7117            ))
7118        }),
7119        Value::UBigInt(value) => i32::try_from(value).map_err(|_| {
7120            HematiteError::ParseError(format!(
7121                "{} {} requires a 32-bit integer value, found {:?}",
7122                function_name,
7123                label,
7124                Value::UBigInt(value)
7125            ))
7126        }),
7127        Value::UInt128(value) => i32::try_from(value).map_err(|_| {
7128            HematiteError::ParseError(format!(
7129                "{} {} requires a 32-bit integer value, found {:?}",
7130                function_name,
7131                label,
7132                Value::UInt128(value)
7133            ))
7134        }),
7135        Value::Float32(value) => Ok(value as i32),
7136        Value::Float(value) => Ok(value as i32),
7137        value => Err(HematiteError::ParseError(format!(
7138            "{} {} requires an integer value, found {:?}",
7139            function_name, label, value
7140        ))),
7141    }
7142}
7143
7144fn extract_date_component(function_name: &str, value: Value) -> Result<DateValue> {
7145    match value {
7146        Value::Date(value) => Ok(value),
7147        Value::DateTime(value) => Ok(value.components().0),
7148        Value::Text(value) | Value::Enum(value) => DateValue::parse(&value)
7149            .or_else(|_| DateTimeValue::parse(&value).map(|value| value.components().0))
7150            .map_err(|_| {
7151                HematiteError::ParseError(format!(
7152                    "{} requires a DATE-like value, found '{}'",
7153                    function_name, value
7154                ))
7155            }),
7156        value => Err(HematiteError::ParseError(format!(
7157            "{} requires a DATE-like value, found {:?}",
7158            function_name, value
7159        ))),
7160    }
7161}
7162
7163fn extract_time_component(function_name: &str, value: Value) -> Result<TimeValue> {
7164    match value {
7165        Value::Time(value) => Ok(value),
7166        Value::TimeWithTimeZone(value) => Ok(value.time()),
7167        Value::DateTime(value) => Ok(value.components().1),
7168        Value::Text(value) | Value::Enum(value) => TimeValue::parse(&value)
7169            .or_else(|_| TimeWithTimeZoneValue::parse(&value).map(|value| value.time()))
7170            .or_else(|_| DateTimeValue::parse(&value).map(|value| value.components().1))
7171            .map_err(|_| {
7172                HematiteError::ParseError(format!(
7173                    "{} requires a TIME-like value, found '{}'",
7174                    function_name, value
7175                ))
7176            }),
7177        value => Err(HematiteError::ParseError(format!(
7178            "{} requires a TIME-like value, found {:?}",
7179            function_name, value
7180        ))),
7181    }
7182}
7183
7184fn extract_timestamp_component(function_name: &str, value: Value) -> Result<DateTimeValue> {
7185    match value {
7186        Value::DateTime(value) => Ok(value),
7187        Value::Date(value) => Ok(DateTimeValue::from_seconds_since_epoch(
7188            value.days_since_epoch() as i64 * 86_400,
7189        )),
7190        Value::Text(value) | Value::Enum(value) => DateTimeValue::parse(&value)
7191            .or_else(|_| {
7192                DateValue::parse(&value).map(|value| {
7193                    DateTimeValue::from_seconds_since_epoch(
7194                        value.days_since_epoch() as i64 * 86_400,
7195                    )
7196                })
7197            })
7198            .map_err(|_| {
7199                HematiteError::ParseError(format!(
7200                    "{} requires a DATETIME-like value, found '{}'",
7201                    function_name, value
7202                ))
7203            }),
7204        value => Err(HematiteError::ParseError(format!(
7205            "{} requires a DATETIME-like value, found {:?}",
7206            function_name, value
7207        ))),
7208    }
7209}
7210
7211fn substring_chars(text: &str, start: i32, len: Option<i32>) -> Result<Value> {
7212    let chars = text.chars().collect::<Vec<_>>();
7213    let start_index = if start > 0 {
7214        start.saturating_sub(1) as usize
7215    } else if start < 0 {
7216        chars.len().saturating_sub((-start) as usize)
7217    } else {
7218        0
7219    };
7220
7221    if let Some(len) = len {
7222        if len <= 0 {
7223            return Ok(Value::Text(String::new()));
7224        }
7225        let end = start_index.saturating_add(len as usize).min(chars.len());
7226        return Ok(Value::Text(
7227            chars[start_index.min(chars.len())..end].iter().collect(),
7228        ));
7229    }
7230
7231    Ok(Value::Text(
7232        chars[start_index.min(chars.len())..].iter().collect(),
7233    ))
7234}
7235
7236fn evaluate_extremum(function_name: &str, args: Vec<Value>, pick_greater: bool) -> Result<Value> {
7237    if args.len() < 2 {
7238        return Err(HematiteError::ParseError(format!(
7239            "{} requires at least two arguments",
7240            function_name
7241        )));
7242    }
7243
7244    if args.iter().any(Value::is_null) {
7245        return Ok(Value::Null);
7246    }
7247
7248    let mut values = args.into_iter();
7249    let mut best = values.next().expect("validated extremum arity");
7250    for value in values {
7251        let ordering = sql_partial_cmp(&value, &best, None).ok_or_else(|| {
7252            HematiteError::ParseError(format!(
7253                "{} requires mutually comparable arguments",
7254                function_name
7255            ))
7256        })?;
7257        let should_replace = if pick_greater {
7258            ordering.is_gt()
7259        } else {
7260            ordering.is_lt()
7261        };
7262        if should_replace {
7263            best = value;
7264        }
7265    }
7266
7267    Ok(best)
7268}
7269
7270fn evaluate_replace(args: Vec<Value>) -> Result<Value> {
7271    if args.len() != 3 {
7272        return Err(HematiteError::ParseError(
7273            "REPLACE requires exactly three arguments".to_string(),
7274        ));
7275    }
7276
7277    let mut args = args.into_iter();
7278    let text = args.next().expect("validated replace arity");
7279    let from = args.next().expect("validated replace arity");
7280    let to = args.next().expect("validated replace arity");
7281    if text.is_null() || from.is_null() || to.is_null() {
7282        return Ok(Value::Null);
7283    }
7284
7285    let text = expect_text_argument("REPLACE", text)?;
7286    let from = expect_text_argument("REPLACE", from)?;
7287    let to = expect_text_argument("REPLACE", to)?;
7288    Ok(Value::Text(text.replace(&from, &to)))
7289}
7290
7291fn evaluate_repeat(args: Vec<Value>) -> Result<Value> {
7292    if args.len() != 2 {
7293        return Err(HematiteError::ParseError(
7294            "REPEAT requires exactly two arguments".to_string(),
7295        ));
7296    }
7297
7298    let mut args = args.into_iter();
7299    let text = args.next().expect("validated repeat arity");
7300    let count = args.next().expect("validated repeat arity");
7301    if text.is_null() || count.is_null() {
7302        return Ok(Value::Null);
7303    }
7304
7305    let text = expect_text_argument("REPEAT", text)?;
7306    let count = expect_integer_argument("REPEAT", count, "count")?;
7307    if count <= 0 {
7308        return Ok(Value::Text(String::new()));
7309    }
7310
7311    let count = usize::try_from(count)
7312        .map_err(|_| HematiteError::ParseError("REPEAT count overflowed usize".to_string()))?;
7313    Ok(Value::Text(text.repeat(count)))
7314}
7315
7316fn evaluate_reverse(args: Vec<Value>) -> Result<Value> {
7317    if args.len() != 1 {
7318        return Err(HematiteError::ParseError(
7319            "REVERSE requires exactly one argument".to_string(),
7320        ));
7321    }
7322
7323    let value = args.into_iter().next().expect("validated reverse arity");
7324    if value.is_null() {
7325        return Ok(Value::Null);
7326    }
7327
7328    let text = expect_text_argument("REVERSE", value)?;
7329    Ok(Value::Text(text.chars().rev().collect()))
7330}
7331
7332fn evaluate_locate(args: Vec<Value>) -> Result<Value> {
7333    if args.len() != 2 && args.len() != 3 {
7334        return Err(HematiteError::ParseError(
7335            "LOCATE requires two or three arguments".to_string(),
7336        ));
7337    }
7338
7339    let mut args = args.into_iter();
7340    let needle = args.next().expect("validated locate arity");
7341    let haystack = args.next().expect("validated locate arity");
7342    let start = args.next();
7343    if needle.is_null() || haystack.is_null() || start.as_ref().is_some_and(Value::is_null) {
7344        return Ok(Value::Null);
7345    }
7346
7347    let needle = expect_text_argument("LOCATE", needle)?;
7348    let haystack = expect_text_argument("LOCATE", haystack)?;
7349    let start = start
7350        .map(|value| expect_integer_argument("LOCATE", value, "start position"))
7351        .transpose()?
7352        .unwrap_or(1);
7353
7354    let haystack_chars = haystack.chars().collect::<Vec<_>>();
7355    let needle_chars = needle.chars().collect::<Vec<_>>();
7356    let start_index = start.saturating_sub(1).max(0) as usize;
7357    if needle_chars.is_empty() {
7358        let position = start_index.min(haystack_chars.len()) + 1;
7359        return Ok(Value::Integer(position as i32));
7360    }
7361    if start_index >= haystack_chars.len() || needle_chars.len() > haystack_chars.len() {
7362        return Ok(Value::Integer(0));
7363    }
7364
7365    for index in start_index..=haystack_chars.len() - needle_chars.len() {
7366        if haystack_chars[index..index + needle_chars.len()] == needle_chars[..] {
7367            return Ok(Value::Integer((index + 1) as i32));
7368        }
7369    }
7370
7371    Ok(Value::Integer(0))
7372}
7373
7374fn evaluate_hex(args: Vec<Value>) -> Result<Value> {
7375    expect_unary_length_function("HEX", args, |value| {
7376        let bytes = match value {
7377            Value::Blob(bytes) => bytes,
7378            Value::Text(text) => text.into_bytes(),
7379            Value::Enum(text) => text.into_bytes(),
7380            value => {
7381                return Err(HematiteError::ParseError(format!(
7382                    "HEX requires a text or blob value, found {:?}",
7383                    value
7384                )))
7385            }
7386        };
7387        Ok(Value::Text(
7388            bytes.iter().map(|byte| format!("{byte:02X}")).collect(),
7389        ))
7390    })
7391}
7392
7393fn evaluate_unhex(args: Vec<Value>) -> Result<Value> {
7394    expect_unary_text_function("UNHEX", args, |text| {
7395        if text.len() % 2 != 0 {
7396            return Err(HematiteError::ParseError(
7397                "UNHEX requires an even number of hexadecimal digits".to_string(),
7398            ));
7399        }
7400
7401        let mut bytes = Vec::with_capacity(text.len() / 2);
7402        for index in (0..text.len()).step_by(2) {
7403            let byte = u8::from_str_radix(&text[index..index + 2], 16).map_err(|_| {
7404                HematiteError::ParseError("UNHEX requires only hexadecimal digits".to_string())
7405            })?;
7406            bytes.push(byte);
7407        }
7408        Ok(Value::Blob(bytes))
7409    })
7410}
7411
7412fn evaluate_ceil(args: Vec<Value>) -> Result<Value> {
7413    expect_unary_numeric_function("CEIL", args, |value| match value {
7414        Value::Integer(value) => Ok(Value::Integer(value)),
7415        Value::BigInt(value) => Ok(Value::BigInt(value)),
7416        Value::Int128(value) => Ok(Value::Int128(value)),
7417        Value::UInteger(value) => Ok(Value::UInteger(value)),
7418        Value::UBigInt(value) => Ok(Value::UBigInt(value)),
7419        Value::UInt128(value) => Ok(Value::UInt128(value)),
7420        Value::Float32(value) => Ok(Value::Float32(value.ceil())),
7421        Value::Float(value) => Ok(Value::Float(value.ceil())),
7422        _ => unreachable!("validated numeric input"),
7423    })
7424}
7425
7426fn evaluate_floor(args: Vec<Value>) -> Result<Value> {
7427    expect_unary_numeric_function("FLOOR", args, |value| match value {
7428        Value::Integer(value) => Ok(Value::Integer(value)),
7429        Value::BigInt(value) => Ok(Value::BigInt(value)),
7430        Value::Int128(value) => Ok(Value::Int128(value)),
7431        Value::UInteger(value) => Ok(Value::UInteger(value)),
7432        Value::UBigInt(value) => Ok(Value::UBigInt(value)),
7433        Value::UInt128(value) => Ok(Value::UInt128(value)),
7434        Value::Float32(value) => Ok(Value::Float32(value.floor())),
7435        Value::Float(value) => Ok(Value::Float(value.floor())),
7436        _ => unreachable!("validated numeric input"),
7437    })
7438}
7439
7440fn evaluate_power(args: Vec<Value>) -> Result<Value> {
7441    if args.len() != 2 {
7442        return Err(HematiteError::ParseError(
7443            "POWER requires exactly two arguments".to_string(),
7444        ));
7445    }
7446
7447    let mut args = args.into_iter();
7448    let base = args.next().expect("validated power arity");
7449    let exponent = args.next().expect("validated power arity");
7450    if base.is_null() || exponent.is_null() {
7451        return Ok(Value::Null);
7452    }
7453
7454    let base = expect_numeric_argument("POWER", base)?;
7455    let exponent = expect_numeric_argument("POWER", exponent)?;
7456    let value = base.powf(exponent);
7457    if !value.is_finite() {
7458        return Err(HematiteError::ParseError(
7459            "POWER produced a non-finite result".to_string(),
7460        ));
7461    }
7462
7463    Ok(Value::Float(value))
7464}
7465
7466fn round_integer(value: i32, precision: i32) -> Result<Value> {
7467    if precision >= 0 {
7468        return Ok(Value::Integer(value));
7469    }
7470
7471    let rounded = round_float(value as f64, precision);
7472    let rounded = i32::try_from(rounded as i64)
7473        .map_err(|_| HematiteError::ParseError("ROUND overflowed INT".to_string()))?;
7474    Ok(Value::Integer(rounded))
7475}
7476
7477fn round_bigint(value: i64, precision: i32) -> Result<Value> {
7478    if precision >= 0 {
7479        return Ok(Value::BigInt(value));
7480    }
7481
7482    let rounded = round_float(value as f64, precision);
7483    let rounded = i64::try_from(rounded as i128)
7484        .map_err(|_| HematiteError::ParseError("ROUND overflowed INT64".to_string()))?;
7485    Ok(Value::BigInt(rounded))
7486}
7487
7488fn round_int128(value: i128, precision: i32) -> Result<Value> {
7489    if precision >= 0 {
7490        return Ok(Value::Int128(value));
7491    }
7492
7493    let rounded = round_float(value as f64, precision);
7494    if !rounded.is_finite() || rounded < i128::MIN as f64 || rounded > i128::MAX as f64 {
7495        return Err(HematiteError::ParseError(
7496            "ROUND overflowed INT128".to_string(),
7497        ));
7498    }
7499    Ok(Value::Int128(rounded as i128))
7500}
7501
7502fn round_uinteger(value: u32, precision: i32) -> Result<Value> {
7503    if precision >= 0 {
7504        return Ok(Value::UInteger(value));
7505    }
7506
7507    let rounded = round_float(value as f64, precision);
7508    if rounded < 0.0 || rounded > u32::MAX as f64 {
7509        return Err(HematiteError::ParseError(
7510            "ROUND overflowed UINT".to_string(),
7511        ));
7512    }
7513    Ok(Value::UInteger(rounded as u32))
7514}
7515
7516fn round_ubigint(value: u64, precision: i32) -> Result<Value> {
7517    if precision >= 0 {
7518        return Ok(Value::UBigInt(value));
7519    }
7520
7521    let rounded = round_float(value as f64, precision);
7522    if rounded < 0.0 || rounded > u64::MAX as f64 {
7523        return Err(HematiteError::ParseError(
7524            "ROUND overflowed UINT64".to_string(),
7525        ));
7526    }
7527    Ok(Value::UBigInt(rounded as u64))
7528}
7529
7530fn round_uint128(value: u128, precision: i32) -> Result<Value> {
7531    if precision >= 0 {
7532        return Ok(Value::UInt128(value));
7533    }
7534
7535    let rounded = round_float(value as f64, precision);
7536    if !rounded.is_finite() || rounded < 0.0 || rounded > u128::MAX as f64 {
7537        return Err(HematiteError::ParseError(
7538            "ROUND overflowed UINT128".to_string(),
7539        ));
7540    }
7541    Ok(Value::UInt128(rounded as u128))
7542}
7543
/// Rounds `value` to `precision` decimal places, half away from zero.
///
/// A negative `precision` rounds to the left of the decimal point
/// (`round_float(1250.0, -2) == 1300.0`).
///
/// Edge cases handled explicitly:
/// * NaN and infinities are returned unchanged.
/// * If scaling by `10^precision` leaves the finite range, `value` already
///   has no digits at that position and is returned unchanged, instead of
///   producing infinity or NaN.
/// * If `10^(-precision)` itself is not finite, every finite `value` rounds
///   to `0.0`.
/// * `precision == i32::MIN` no longer overflows on negation.
fn round_float(value: f64, precision: i32) -> f64 {
    if !value.is_finite() {
        return value;
    }

    if precision >= 0 {
        let factor = 10f64.powi(precision);
        let scaled = value * factor;
        if !scaled.is_finite() {
            return value;
        }
        scaled.round() / factor
    } else {
        // `saturating_neg` maps i32::MIN to i32::MAX instead of overflowing.
        let factor = 10f64.powi(precision.saturating_neg());
        if !factor.is_finite() {
            return 0.0;
        }
        (value / factor).round() * factor
    }
}
7553
7554fn apply_text_comparison_context(
7555    value: &str,
7556    text_context: Option<TextComparisonContext>,
7557) -> String {
7558    let mut normalized = if text_context.is_some_and(|context| context.trim_trailing_spaces) {
7559        value.trim_end_matches(' ').to_string()
7560    } else {
7561        value.to_string()
7562    };
7563
7564    if text_context.is_some_and(|context| context.case_insensitive) {
7565        normalized = normalized.to_lowercase();
7566    }
7567
7568    normalized
7569}
7570
7571fn apply_blob_comparison_context(
7572    value: &[u8],
7573    text_context: Option<TextComparisonContext>,
7574) -> Vec<u8> {
7575    if text_context.is_some_and(|context| context.trim_trailing_zero_bytes) {
7576        value
7577            .iter()
7578            .copied()
7579            .rev()
7580            .skip_while(|byte| *byte == 0)
7581            .collect::<Vec<_>>()
7582            .into_iter()
7583            .rev()
7584            .collect()
7585    } else {
7586        value.to_vec()
7587    }
7588}
7589
7590fn like_matches_with_context(
7591    pattern: &str,
7592    text: &str,
7593    text_context: Option<TextComparisonContext>,
7594) -> bool {
7595    let pattern = apply_text_comparison_context(pattern, text_context);
7596    let text = apply_text_comparison_context(text, text_context);
7597    SelectExecutor::like_matches(&pattern, &text)
7598}
7599
7600fn sql_values_equal(
7601    left: &Value,
7602    right: &Value,
7603    text_context: Option<TextComparisonContext>,
7604) -> bool {
7605    if let Some(ordering) = sql_decimal_cmp(left, right) {
7606        return ordering == Ordering::Equal;
7607    }
7608    if let Some((left, right)) = sql_numeric_pair(left, right) {
7609        return left == right;
7610    }
7611    if let (Value::Text(left), Value::Text(right)) = (left, right) {
7612        return apply_text_comparison_context(left, text_context)
7613            == apply_text_comparison_context(right, text_context);
7614    }
7615    if let (Value::Blob(left), Value::Blob(right)) = (left, right) {
7616        return apply_blob_comparison_context(left, text_context)
7617            == apply_blob_comparison_context(right, text_context);
7618    }
7619
7620    left == right
7621}
7622
7623fn sql_partial_cmp(
7624    left: &Value,
7625    right: &Value,
7626    text_context: Option<TextComparisonContext>,
7627) -> Option<Ordering> {
7628    if let Some(ordering) = sql_decimal_cmp(left, right) {
7629        return Some(ordering);
7630    }
7631    if let Some((left, right)) = sql_numeric_pair(left, right) {
7632        return left.partial_cmp(&right);
7633    }
7634    if let (Value::Text(left), Value::Text(right)) = (left, right) {
7635        return Some(
7636            apply_text_comparison_context(left, text_context)
7637                .cmp(&apply_text_comparison_context(right, text_context)),
7638        );
7639    }
7640    if let (Value::Blob(left), Value::Blob(right)) = (left, right) {
7641        return Some(
7642            apply_blob_comparison_context(left, text_context)
7643                .cmp(&apply_blob_comparison_context(right, text_context)),
7644        );
7645    }
7646
7647    left.partial_cmp(right)
7648}
7649
7650fn decode_signed_blob(bytes: &[u8], width: usize, target: &str) -> Result<i128> {
7651    if bytes.len() > width {
7652        return Err(HematiteError::ParseError(format!(
7653            "Cannot CAST blob of {} bytes AS {}",
7654            bytes.len(),
7655            target
7656        )));
7657    }
7658
7659    let fill = bytes
7660        .last()
7661        .is_some_and(|byte| (byte & 0x80) != 0)
7662        .then_some(0xFF)
7663        .unwrap_or(0);
7664    let mut extended = [fill; 16];
7665    extended[..bytes.len()].copy_from_slice(bytes);
7666    Ok(i128::from_le_bytes(extended))
7667}
7668
7669fn decode_unsigned_blob(bytes: &[u8], width: usize, target: &str) -> Result<u128> {
7670    if bytes.len() > width {
7671        return Err(HematiteError::ParseError(format!(
7672            "Cannot CAST blob of {} bytes AS {}",
7673            bytes.len(),
7674            target
7675        )));
7676    }
7677
7678    let mut extended = [0u8; 16];
7679    extended[..bytes.len()].copy_from_slice(bytes);
7680    Ok(u128::from_le_bytes(extended))
7681}
7682
7683fn sql_numeric_pair(left: &Value, right: &Value) -> Option<(f64, f64)> {
7684    Some((numeric_value_as_f64(left)?, numeric_value_as_f64(right)?))
7685}
7686
7687fn sql_decimal_cmp(left: &Value, right: &Value) -> Option<Ordering> {
7688    let left = match left {
7689        Value::Decimal(value) => value.clone(),
7690        Value::Integer(value) => DecimalValue::from_i32(*value),
7691        Value::BigInt(value) => DecimalValue::from_i64(*value),
7692        Value::Int128(value) => DecimalValue::from_i128(*value),
7693        Value::UInteger(value) => DecimalValue::from_u32(*value),
7694        Value::UBigInt(value) => DecimalValue::from_u64(*value),
7695        Value::UInt128(value) => DecimalValue::from_u128(*value),
7696        Value::Float32(value) => DecimalValue::from_f64(*value as f64).ok()?,
7697        Value::Float(value) => DecimalValue::from_f64(*value).ok()?,
7698        _ => return None,
7699    };
7700    let right = match right {
7701        Value::Decimal(value) => value.clone(),
7702        Value::Integer(value) => DecimalValue::from_i32(*value),
7703        Value::BigInt(value) => DecimalValue::from_i64(*value),
7704        Value::Int128(value) => DecimalValue::from_i128(*value),
7705        Value::UInteger(value) => DecimalValue::from_u32(*value),
7706        Value::UBigInt(value) => DecimalValue::from_u64(*value),
7707        Value::UInt128(value) => DecimalValue::from_u128(*value),
7708        Value::Float32(value) => DecimalValue::from_f64(*value as f64).ok()?,
7709        Value::Float(value) => DecimalValue::from_f64(*value).ok()?,
7710        _ => return None,
7711    };
7712    Some(left.cmp(&right))
7713}
7714
7715fn validate_date_string(input: &str) -> Result<DateValue> {
7716    DateValue::parse(input)
7717}
7718
7719fn validate_time_string(input: &str) -> Result<TimeValue> {
7720    TimeValue::parse(input)
7721}
7722
7723fn validate_datetime_string(input: &str) -> Result<DateTimeValue> {
7724    DateTimeValue::parse(input)
7725}
7726
7727fn validate_time_with_time_zone_string(input: &str) -> Result<TimeWithTimeZoneValue> {
7728    TimeWithTimeZoneValue::parse(input)
7729}
7730
7731fn compare_condition_values(
7732    left: &Value,
7733    operator: &ComparisonOperator,
7734    right: &Value,
7735    text_context: Option<TextComparisonContext>,
7736) -> Option<bool> {
7737    if left.is_null() || right.is_null() {
7738        return None;
7739    }
7740
7741    match operator {
7742        ComparisonOperator::Equal => Some(sql_values_equal(left, right, text_context)),
7743        ComparisonOperator::NotEqual => Some(!sql_values_equal(left, right, text_context)),
7744        ComparisonOperator::LessThan => {
7745            sql_partial_cmp(left, right, text_context).map(|ord| ord.is_lt())
7746        }
7747        ComparisonOperator::LessThanOrEqual => {
7748            sql_partial_cmp(left, right, text_context).map(|ord| ord.is_le())
7749        }
7750        ComparisonOperator::GreaterThan => {
7751            sql_partial_cmp(left, right, text_context).map(|ord| ord.is_gt())
7752        }
7753        ComparisonOperator::GreaterThanOrEqual => {
7754            sql_partial_cmp(left, right, text_context).map(|ord| ord.is_ge())
7755        }
7756    }
7757}
7758
/// Three-valued SQL `AND`, with `None` standing for unknown.
///
/// A definite `false` on either side wins; `true AND true` is `true`; every
/// remaining combination is unknown.
fn logical_and_values(left: Option<bool>, right: Option<bool>) -> Option<bool> {
    if left == Some(false) || right == Some(false) {
        return Some(false);
    }
    // Both sides are now `Some(true)` or `None`; `and` yields `Some(true)`
    // only when both are known.
    left.and(right)
}
7766
/// Three-valued SQL `OR`, with `None` standing for unknown.
///
/// A definite `true` on either side wins; `false OR false` is `false`; every
/// remaining combination is unknown.
fn logical_or_values(left: Option<bool>, right: Option<bool>) -> Option<bool> {
    if left == Some(true) || right == Some(true) {
        return Some(true);
    }
    // Both sides are now `Some(false)` or `None`; `and` yields `Some(false)`
    // only when both are known.
    left.and(right)
}
7774
7775fn evaluate_in_candidates(
7776    probe: Value,
7777    candidates: impl IntoIterator<Item = Value>,
7778    is_not: bool,
7779    text_context: Option<TextComparisonContext>,
7780) -> Option<bool> {
7781    if probe.is_null() {
7782        return None;
7783    }
7784
7785    let mut matched = false;
7786    let mut saw_null = false;
7787    for candidate in candidates {
7788        if candidate.is_null() {
7789            saw_null = true;
7790            continue;
7791        }
7792        if sql_values_equal(&candidate, &probe, text_context) {
7793            matched = true;
7794            break;
7795        }
7796    }
7797
7798    if matched {
7799        Some(!is_not)
7800    } else if saw_null {
7801        None
7802    } else {
7803        Some(is_not)
7804    }
7805}
7806
7807fn evaluate_between_values(
7808    value: Value,
7809    lower: Value,
7810    upper: Value,
7811    is_not: bool,
7812    text_context: Option<TextComparisonContext>,
7813) -> Option<bool> {
7814    if value.is_null() || lower.is_null() || upper.is_null() {
7815        return None;
7816    }
7817
7818    let lower_ok = sql_partial_cmp(&value, &lower, text_context).map(|ordering| !ordering.is_lt());
7819    let upper_ok = sql_partial_cmp(&value, &upper, text_context).map(|ordering| !ordering.is_gt());
7820
7821    match (lower_ok, upper_ok) {
7822        (Some(true), Some(true)) => Some(!is_not),
7823        (Some(_), Some(_)) => Some(is_not),
7824        _ => None,
7825    }
7826}
7827
7828fn evaluate_like_values(
7829    value: Value,
7830    pattern: Value,
7831    is_not: bool,
7832    text_context: Option<TextComparisonContext>,
7833) -> Option<bool> {
7834    match (value, pattern) {
7835        (Value::Text(text), Value::Text(pattern)) => {
7836            let matched = like_matches_with_context(&pattern, &text, text_context);
7837            Some(if is_not { !matched } else { matched })
7838        }
7839        (left, right) if left.is_null() || right.is_null() => None,
7840        _ => None,
7841    }
7842}
7843
7844fn nullable_bool_to_value(value: Option<bool>) -> Value {
7845    match value {
7846        Some(value) => Value::Boolean(value),
7847        None => Value::Null,
7848    }
7849}
7850
7851fn coerce_value_to_nullable_bool(value: Value, context: &str) -> Result<Option<bool>> {
7852    match value {
7853        Value::Boolean(value) => Ok(Some(value)),
7854        Value::Null => Ok(None),
7855        value => Err(HematiteError::ParseError(format!(
7856            "{} requires a boolean value, found {:?}",
7857            context, value
7858        ))),
7859    }
7860}
7861
7862fn unique_index_parse_error(index_name: &str, table_name: &str) -> HematiteError {
7863    HematiteError::ParseError(format!(
7864        "Duplicate value for UNIQUE index '{}' on table '{}'",
7865        index_name, table_name
7866    ))
7867}
7868
7869fn convert_foreign_key_action(action: ForeignKeyAction) -> CatalogForeignKeyAction {
7870    match action {
7871        ForeignKeyAction::Restrict => CatalogForeignKeyAction::Restrict,
7872        ForeignKeyAction::Cascade => CatalogForeignKeyAction::Cascade,
7873        ForeignKeyAction::SetNull => CatalogForeignKeyAction::SetNull,
7874    }
7875}
7876
7877fn auto_unique_index_name(table_name: &str, column_name: &str, position: usize) -> String {
7878    format!(
7879        "uq_{}_{}_{}",
7880        sanitize_identifier(table_name),
7881        sanitize_identifier(column_name),
7882        position
7883    )
7884}
7885
7886fn unique_constraint_index_name(
7887    table_name: &str,
7888    unique: &UniqueConstraintDefinition,
7889    position: usize,
7890) -> String {
7891    if let Some(name) = &unique.name {
7892        return sanitize_identifier(name);
7893    }
7894
7895    let column_suffix = unique
7896        .columns
7897        .iter()
7898        .map(|column| sanitize_identifier(column))
7899        .collect::<Vec<_>>()
7900        .join("_");
7901    format!(
7902        "uq_{}_{}_{}",
7903        sanitize_identifier(table_name),
7904        column_suffix,
7905        position
7906    )
7907}
7908
/// Returns a copy of `identifier` in which every character that is not an
/// ASCII letter, ASCII digit, or underscore is replaced by `_`.
///
/// Replacement is per character, so a multi-byte character becomes a single
/// underscore.
fn sanitize_identifier(identifier: &str) -> String {
    identifier
        .chars()
        .map(|ch| match ch {
            '_' => ch,
            _ if ch.is_ascii_alphanumeric() => ch,
            _ => '_',
        })
        .collect()
}
7920
/// Executor for a `DROP INDEX` statement.
#[derive(Debug, Clone)]
pub struct DropIndexExecutor {
    /// The parsed statement this executor runs; it supplies the table name,
    /// the index name, and the `if_exists` flag read during execution.
    pub statement: DropIndexStatement,
}
7925
7926impl DropIndexExecutor {
7927    pub fn new(statement: DropIndexStatement) -> Self {
7928        Self { statement }
7929    }
7930}
7931
7932impl QueryExecutor for DropIndexExecutor {
7933    fn execute(&mut self, ctx: &mut ExecutionContext<'_>) -> Result<QueryResult> {
7934        validate_statement(&Statement::DropIndex(self.statement.clone()), &ctx.catalog)?;
7935        if self.statement.if_exists {
7936            let Some(table) = ctx.catalog.get_table_by_name(&self.statement.table) else {
7937                return Ok(mutation_result(0));
7938            };
7939            if table
7940                .get_secondary_index(&self.statement.index_name)
7941                .is_none()
7942            {
7943                return Ok(mutation_result(0));
7944            }
7945        }
7946
7947        let table = ctx
7948            .catalog
7949            .get_table_by_name(&self.statement.table)
7950            .ok_or_else(|| {
7951                HematiteError::ParseError(format!("Table '{}' not found", self.statement.table))
7952            })?
7953            .clone();
7954        let index = table
7955            .get_secondary_index(&self.statement.index_name)
7956            .ok_or_else(|| {
7957                HematiteError::ParseError(format!(
7958                    "Index '{}' does not exist on table '{}'",
7959                    self.statement.index_name, self.statement.table
7960                ))
7961            })?
7962            .clone();
7963
7964        ctx.engine.delete_tree(index.root_page_id)?;
7965        ctx.catalog
7966            .drop_secondary_index(table.id, &self.statement.index_name)?;
7967
7968        Ok(QueryResult {
7969            affected_rows: 0,
7970            columns: Vec::new(),
7971            rows: Vec::new(),
7972        })
7973    }
7974}