1use crate::catalog::column::{collation_is_nocase, pad_text_to_char_length};
23use crate::catalog::table::{
24 CheckConstraint, ForeignKeyAction as CatalogForeignKeyAction, ForeignKeyConstraint,
25};
26use crate::catalog::StoredRow;
27use crate::catalog::{
28 Column, DataType, DateTimeValue, DateValue, DecimalValue, IntervalDaySecondValue,
29 IntervalYearMonthValue, Table, TimeValue, TimeWithTimeZoneValue, Value,
30};
31use crate::error::{HematiteError, Result};
32use crate::parser::ast::*;
33use crate::query::lowering::{lower_literal_value, lower_type_name, raise_literal_value};
34use crate::query::plan::{ExecutionProgram, QueryPlan, SelectAccessPath};
35use crate::query::predicate::extract_literal_equalities;
36pub use crate::query::runtime::{ExecutionContext, MutationEvent, QueryExecutor, QueryResult};
37use crate::query::validation::{
38 projected_column_names, validate_column_reference, validate_statement,
39};
40use crate::query::QueryPlanner;
41use std::cmp::Ordering;
42use std::collections::HashMap;
43
44impl QueryPlan {
45 pub fn into_executor(self) -> Box<dyn QueryExecutor> {
46 build_executor(self.program)
47 }
48}
49
50pub fn build_executor(program: ExecutionProgram) -> Box<dyn QueryExecutor> {
51 match program {
52 ExecutionProgram::Select {
53 statement,
54 access_path,
55 } => Box::new(SelectExecutor::new(statement, access_path)),
56 ExecutionProgram::Insert { statement } => Box::new(InsertExecutor::new(statement)),
57 ExecutionProgram::Update {
58 statement,
59 access_path,
60 } => Box::new(UpdateExecutor::new(statement, access_path)),
61 ExecutionProgram::Delete {
62 statement,
63 access_path,
64 } => Box::new(DeleteExecutor::new(statement, access_path)),
65 ExecutionProgram::Create { statement } => Box::new(CreateExecutor::new(statement)),
66 ExecutionProgram::CreateIndex { statement } => {
67 Box::new(CreateIndexExecutor::new(statement))
68 }
69 ExecutionProgram::Alter { statement } => Box::new(AlterExecutor::new(statement)),
70 ExecutionProgram::Drop { statement } => Box::new(DropExecutor::new(statement)),
71 ExecutionProgram::DropIndex { statement } => Box::new(DropIndexExecutor::new(statement)),
72 }
73}
74
75#[derive(Debug, Clone)]
76pub struct SelectExecutor {
77 pub statement: SelectStatement,
78 pub access_path: SelectAccessPath,
79 outer_scopes: Vec<CorrelatedScope>,
80 materialized_ctes: HashMap<String, QueryResult>,
81}
82
83#[derive(Debug, Clone)]
84struct ResolvedSource {
85 name: String,
86 columns: Vec<String>,
87 column_types: Vec<DataType>,
88 column_collations: Vec<Option<String>>,
89 alias: Option<String>,
90 offset: usize,
91}
92
93impl ResolvedSource {
94 fn width(&self) -> usize {
95 self.columns.len()
96 }
97}
98
/// Per-comparison text semantics derived from a column's declared type and
/// collation.
#[derive(Clone, Copy, Debug)]
struct TextComparisonContext {
    /// Set for `CHAR(n)` columns: trailing spaces are not significant.
    trim_trailing_spaces: bool,
    /// Set for `BINARY(n)` columns: trailing zero bytes are not significant.
    trim_trailing_zero_bytes: bool,
    /// Set when the column's collation is NOCASE.
    case_insensitive: bool,
}
105
106#[derive(Debug, Clone)]
107enum NamedSourceKind {
108 BaseTable,
109 MaterializedCte(Vec<Vec<Value>>),
110 Cte(CommonTableExpression),
111}
112
113#[derive(Debug, Clone)]
114struct NamedSource {
115 source: ResolvedSource,
116 kind: NamedSourceKind,
117}
118
119#[derive(Debug, Clone)]
120struct GroupedRow {
121 projected: Vec<Value>,
122 source_rows: Vec<Vec<Value>>,
123}
124
125#[derive(Debug, Clone)]
126struct CorrelatedScope {
127 sources: Vec<ResolvedSource>,
128 row: Vec<Value>,
129}
130
131type SubqueryCache = HashMap<usize, QueryResult>;
132
133fn evaluate_case_expression<FBool, FExpr>(
134 branches: &[CaseWhenClause],
135 else_expr: Option<&Expression>,
136 mut eval_bool: FBool,
137 mut eval_expr: FExpr,
138) -> Result<Value>
139where
140 FBool: FnMut(&Expression) -> Result<Option<bool>>,
141 FExpr: FnMut(&Expression) -> Result<Value>,
142{
143 for branch in branches {
144 match eval_bool(&branch.condition)? {
145 Some(true) => return eval_expr(&branch.result),
146 Some(false) | None => {}
147 }
148 }
149
150 match else_expr {
151 Some(else_expr) => eval_expr(else_expr),
152 None => Ok(Value::Null),
153 }
154}
155
156fn evaluate_expression_list<FExpr>(
157 expressions: &[Expression],
158 mut eval_expr: FExpr,
159) -> Result<Vec<Value>>
160where
161 FExpr: FnMut(&Expression) -> Result<Value>,
162{
163 let mut values = Vec::with_capacity(expressions.len());
164 for expr in expressions {
165 values.push(eval_expr(expr)?);
166 }
167 Ok(values)
168}
169
170fn evaluate_scalar_function_call<FExpr>(
171 function: ScalarFunction,
172 args: &[Expression],
173 eval_expr: FExpr,
174) -> Result<Value>
175where
176 FExpr: FnMut(&Expression) -> Result<Value>,
177{
178 evaluate_scalar_function(function, evaluate_expression_list(args, eval_expr)?)
179}
180
181fn evaluate_in_list_predicate<FExpr>(
182 probe_expr: &Expression,
183 candidates: &[Expression],
184 is_not: bool,
185 text_context: Option<TextComparisonContext>,
186 mut eval_expr: FExpr,
187) -> Result<Option<bool>>
188where
189 FExpr: FnMut(&Expression) -> Result<Value>,
190{
191 let probe = eval_expr(probe_expr)?;
192 let candidates = evaluate_expression_list(candidates, eval_expr)?;
193 Ok(evaluate_in_candidates(
194 probe,
195 candidates,
196 is_not,
197 text_context,
198 ))
199}
200
201fn evaluate_between_predicate<FExpr>(
202 expr: &Expression,
203 lower: &Expression,
204 upper: &Expression,
205 is_not: bool,
206 text_context: Option<TextComparisonContext>,
207 mut eval_expr: FExpr,
208) -> Result<Option<bool>>
209where
210 FExpr: FnMut(&Expression) -> Result<Value>,
211{
212 Ok(evaluate_between_values(
213 eval_expr(expr)?,
214 eval_expr(lower)?,
215 eval_expr(upper)?,
216 is_not,
217 text_context,
218 ))
219}
220
221fn evaluate_like_predicate<FExpr>(
222 expr: &Expression,
223 pattern: &Expression,
224 is_not: bool,
225 text_context: Option<TextComparisonContext>,
226 mut eval_expr: FExpr,
227) -> Result<Option<bool>>
228where
229 FExpr: FnMut(&Expression) -> Result<Value>,
230{
231 Ok(evaluate_like_values(
232 eval_expr(expr)?,
233 eval_expr(pattern)?,
234 is_not,
235 text_context,
236 ))
237}
238
239fn conditions_match_with<FEval>(conditions: &[Condition], mut eval_condition: FEval) -> Result<bool>
240where
241 FEval: FnMut(&Condition) -> Result<Option<bool>>,
242{
243 for condition in conditions {
244 if eval_condition(condition)? != Some(true) {
245 return Ok(false);
246 }
247 }
248 Ok(true)
249}
250
251impl SelectExecutor {
252 pub fn new(statement: SelectStatement, access_path: SelectAccessPath) -> Self {
253 Self {
254 statement,
255 access_path,
256 outer_scopes: Vec::new(),
257 materialized_ctes: HashMap::new(),
258 }
259 }
260
261 fn with_outer_scope(mut self, sources: &[ResolvedSource], row: &[Value]) -> Self {
262 self.outer_scopes.push(CorrelatedScope {
263 sources: sources.to_vec(),
264 row: row.to_vec(),
265 });
266 self
267 }
268
/// Normalizes a CTE name for lookup by ASCII-lowercasing it.
///
/// Non-ASCII characters are left unchanged.
fn cte_key(name: &str) -> String {
    name.chars().map(|ch| ch.to_ascii_lowercase()).collect()
}
272
273 fn resolve_sources(&self, ctx: &ExecutionContext) -> Result<Vec<ResolvedSource>> {
274 let bindings = SelectStatement::collect_table_bindings(&self.statement.from);
275 let mut sources = Vec::with_capacity(bindings.len());
276 let mut offset = 0usize;
277
278 for binding in bindings {
279 sources.push(self.resolve_named_source(
280 ctx,
281 &binding.table_name,
282 binding.alias,
283 offset,
284 )?);
285 offset += sources.last().map(ResolvedSource::width).unwrap_or(0);
286 }
287
288 Ok(sources)
289 }
290
291 fn query_output_columns(
292 &self,
293 query: &SelectStatement,
294 ctx: &ExecutionContext,
295 ) -> Result<Vec<String>> {
296 projected_column_names(query, &ctx.catalog)
297 }
298
299 fn resolve_column_index(
300 &self,
301 sources: &[ResolvedSource],
302 column_reference: &str,
303 ) -> Result<Option<usize>> {
304 let (qualifier, column_name) = SelectStatement::split_column_reference(column_reference);
305 let mut matches = Vec::new();
306
307 for source in sources {
308 if let Some(qualifier) = qualifier {
309 if qualifier != source.name
310 && source
311 .alias
312 .as_deref()
313 .is_none_or(|alias| alias != qualifier)
314 {
315 continue;
316 }
317 }
318
319 if let Some(index) = source
320 .columns
321 .iter()
322 .position(|column| column == column_name)
323 {
324 matches.push(source.offset + index);
325 }
326 }
327
328 match matches.len() {
329 0 => Ok(None),
330 1 => Ok(matches.into_iter().next()),
331 _ => Err(HematiteError::ParseError(format!(
332 "Column reference '{}' is ambiguous",
333 column_reference
334 ))),
335 }
336 }
337
338 fn text_comparison_context_for_expression(
339 &self,
340 sources: &[ResolvedSource],
341 expr: &Expression,
342 ) -> Result<Option<TextComparisonContext>> {
343 let Expression::Column(column_reference) = expr else {
344 return Ok(None);
345 };
346 let Some(flat_index) = self.resolve_column_index(sources, column_reference)? else {
347 return Ok(None);
348 };
349 Ok(self
350 .source_column_metadata(sources, flat_index)
351 .map(|(data_type, collation)| TextComparisonContext {
352 trim_trailing_spaces: matches!(data_type, DataType::Char(_)),
353 trim_trailing_zero_bytes: matches!(data_type, DataType::Binary(_)),
354 case_insensitive: collation_is_nocase(collation.as_deref()),
355 }))
356 }
357
358 fn merged_text_comparison_context(
359 &self,
360 sources: &[ResolvedSource],
361 left: &Expression,
362 right: &Expression,
363 ) -> Result<Option<TextComparisonContext>> {
364 let left_context = self.text_comparison_context_for_expression(sources, left)?;
365 let right_context = self.text_comparison_context_for_expression(sources, right)?;
366 Ok(match (left_context, right_context) {
367 (Some(left), Some(right)) => Some(TextComparisonContext {
368 trim_trailing_spaces: left.trim_trailing_spaces || right.trim_trailing_spaces,
369 trim_trailing_zero_bytes: left.trim_trailing_zero_bytes
370 || right.trim_trailing_zero_bytes,
371 case_insensitive: left.case_insensitive || right.case_insensitive,
372 }),
373 (Some(context), None) | (None, Some(context)) => Some(context),
374 (None, None) => None,
375 })
376 }
377
378 fn source_column_metadata(
379 &self,
380 sources: &[ResolvedSource],
381 flat_index: usize,
382 ) -> Option<(DataType, Option<String>)> {
383 for source in sources {
384 if flat_index < source.offset {
385 continue;
386 }
387 let relative = flat_index - source.offset;
388 if relative < source.columns.len() {
389 return Some((
390 source.column_types.get(relative)?.clone(),
391 source.column_collations.get(relative)?.clone(),
392 ));
393 }
394 }
395 None
396 }
397
398 fn resolve_column_value(
399 &self,
400 sources: &[ResolvedSource],
401 column_reference: &str,
402 row: &[Value],
403 ) -> Result<Value> {
404 if let Some(index) = self.resolve_column_index(sources, column_reference)? {
405 return row.get(index).cloned().ok_or_else(|| {
406 HematiteError::ParseError(format!("Column '{}' not found", column_reference))
407 });
408 }
409
410 for scope in self.outer_scopes.iter().rev() {
411 if let Some(index) = self.resolve_column_index(&scope.sources, column_reference)? {
412 return scope.row.get(index).cloned().ok_or_else(|| {
413 HematiteError::ParseError(format!("Column '{}' not found", column_reference))
414 });
415 }
416 }
417
418 Err(HematiteError::ParseError(format!(
419 "Column '{}' not found",
420 column_reference
421 )))
422 }
423
424 fn evaluate_expression(
425 &self,
426 ctx: &mut ExecutionContext<'_>,
427 cache: &mut SubqueryCache,
428 sources: &[ResolvedSource],
429 expr: &Expression,
430 row: &[Value],
431 ) -> Result<Value> {
432 match expr {
433 Expression::Literal(value) => Ok(lower_literal_value(value)),
434 Expression::IntervalLiteral { value, qualifier } => match qualifier {
435 IntervalQualifier::YearToMonth => Ok(Value::IntervalYearMonth(
436 IntervalYearMonthValue::parse(value)?,
437 )),
438 IntervalQualifier::DayToSecond => Ok(Value::IntervalDaySecond(
439 IntervalDaySecondValue::parse(value)?,
440 )),
441 },
442 Expression::Parameter(index) => Err(HematiteError::ParseError(format!(
443 "Unbound parameter {} reached execution",
444 index + 1
445 ))),
446 Expression::Cast { expr, target_type } => cast_value_to_type(
447 self.evaluate_expression(ctx, cache, sources, expr, row)?,
448 lower_type_name(target_type.clone()),
449 ),
450 Expression::Case {
451 branches,
452 else_expr,
453 } => {
454 for branch in branches {
455 match self.evaluate_boolean_expression(
456 ctx,
457 cache,
458 sources,
459 &branch.condition,
460 row,
461 )? {
462 Some(true) => {
463 return self.evaluate_expression(
464 ctx,
465 cache,
466 sources,
467 &branch.result,
468 row,
469 )
470 }
471 Some(false) | None => {}
472 }
473 }
474
475 match else_expr {
476 Some(else_expr) => {
477 self.evaluate_expression(ctx, cache, sources, else_expr, row)
478 }
479 None => Ok(Value::Null),
480 }
481 }
482 Expression::AggregateCall { .. } => Err(HematiteError::ParseError(
483 "Aggregate expressions can only be evaluated in grouped query contexts".to_string(),
484 )),
485 Expression::ScalarFunctionCall { function, args } => {
486 evaluate_scalar_function_call(*function, args, |expr| {
487 self.evaluate_expression(ctx, cache, sources, expr, row)
488 })
489 }
490 Expression::ScalarSubquery(subquery) => {
491 self.execute_scalar_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))
492 }
493 Expression::Column(name) => self.resolve_column_value(sources, name, row),
494 Expression::UnaryMinus(expr) => {
495 negate_numeric_value(self.evaluate_expression(ctx, cache, sources, expr, row)?)
496 }
497 Expression::UnaryNot(_)
498 | Expression::Comparison { .. }
499 | Expression::InList { .. }
500 | Expression::InSubquery { .. }
501 | Expression::Between { .. }
502 | Expression::Like { .. }
503 | Expression::Exists { .. }
504 | Expression::NullCheck { .. }
505 | Expression::Logical { .. } => Ok(nullable_bool_to_value(
506 self.evaluate_boolean_expression(ctx, cache, sources, expr, row)?,
507 )),
508 Expression::Binary {
509 left,
510 operator,
511 right,
512 } => {
513 let left = self.evaluate_expression(ctx, cache, sources, left, row)?;
514 let right = self.evaluate_expression(ctx, cache, sources, right, row)?;
515 self.evaluate_arithmetic(operator, left, right)
516 }
517 }
518 }
519
520 fn evaluate_boolean_expression(
521 &self,
522 ctx: &mut ExecutionContext<'_>,
523 cache: &mut SubqueryCache,
524 sources: &[ResolvedSource],
525 expr: &Expression,
526 row: &[Value],
527 ) -> Result<Option<bool>> {
528 match expr {
529 Expression::Comparison {
530 left,
531 operator,
532 right,
533 } => {
534 let left_val = self.evaluate_expression(ctx, cache, sources, left, row)?;
535 let right_val = self.evaluate_expression(ctx, cache, sources, right, row)?;
536 let text_context = self.merged_text_comparison_context(sources, left, right)?;
537 Ok(self.compare_values(&left_val, operator, &right_val, text_context))
538 }
539 Expression::InList {
540 expr,
541 values,
542 is_not,
543 } => evaluate_in_list_predicate(expr, values, *is_not, None, |value_expr| {
544 self.evaluate_expression(ctx, cache, sources, value_expr, row)
545 }),
546 Expression::InSubquery {
547 expr,
548 subquery,
549 is_not,
550 } => {
551 let probe = self.evaluate_expression(ctx, cache, sources, expr, row)?;
552 let subquery_result =
553 self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
554 let candidates = subquery_result
555 .rows
556 .into_iter()
557 .map(|row| row.into_iter().next().unwrap_or(Value::Null))
558 .collect::<Vec<_>>();
559 Ok(self.evaluate_in_candidates(probe, candidates, *is_not, None))
560 }
561 Expression::Between {
562 expr,
563 lower,
564 upper,
565 is_not,
566 } => evaluate_between_predicate(
567 expr,
568 lower,
569 upper,
570 *is_not,
571 self.text_comparison_context_for_expression(sources, expr)?,
572 |value_expr| self.evaluate_expression(ctx, cache, sources, value_expr, row),
573 ),
574 Expression::Like {
575 expr,
576 pattern,
577 is_not,
578 } => evaluate_like_predicate(
579 expr,
580 pattern,
581 *is_not,
582 self.text_comparison_context_for_expression(sources, expr)?,
583 |value_expr| self.evaluate_expression(ctx, cache, sources, value_expr, row),
584 ),
585 Expression::Exists { subquery, is_not } => {
586 let subquery_result =
587 self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
588 let exists = !subquery_result.rows.is_empty();
589 Ok(Some(if *is_not { !exists } else { exists }))
590 }
591 Expression::NullCheck { expr, is_not } => {
592 let value = self.evaluate_expression(ctx, cache, sources, expr, row)?;
593 let is_null = value.is_null();
594 Ok(Some(if *is_not { !is_null } else { is_null }))
595 }
596 Expression::UnaryNot(expr) => Ok(self
597 .evaluate_boolean_expression(ctx, cache, sources, expr, row)?
598 .map(|value| !value)),
599 Expression::Logical {
600 left,
601 operator,
602 right,
603 } => {
604 let left_result =
605 self.evaluate_boolean_expression(ctx, cache, sources, left, row)?;
606 let right_result =
607 self.evaluate_boolean_expression(ctx, cache, sources, right, row)?;
608 match operator {
609 LogicalOperator::And => Ok(self.logical_and(left_result, right_result)),
610 LogicalOperator::Or => Ok(self.logical_or(left_result, right_result)),
611 }
612 }
613 _ => coerce_value_to_nullable_bool(
614 self.evaluate_expression(ctx, cache, sources, expr, row)?,
615 "Boolean expression",
616 ),
617 }
618 }
619
620 fn evaluate_arithmetic(
621 &self,
622 operator: &ArithmeticOperator,
623 left: Value,
624 right: Value,
625 ) -> Result<Value> {
626 evaluate_arithmetic_values(operator, left, right)
627 }
628
629 fn compare_values(
630 &self,
631 left_val: &Value,
632 operator: &ComparisonOperator,
633 right_val: &Value,
634 text_context: Option<TextComparisonContext>,
635 ) -> Option<bool> {
636 compare_condition_values(left_val, operator, right_val, text_context)
637 }
638
/// Matches `text` against a SQL `LIKE` pattern.
///
/// `%` matches any run of characters (including none) and `_` matches
/// exactly one character. Every other pattern character must match exactly
/// (case-sensitive). Matching works on Unicode scalar values, and there is
/// no escape character.
///
/// Uses the greedy wildcard algorithm with a single backtrack point, which
/// is O(|pattern| × |text|) in the worst case. The previous recursive
/// backtracking took exponential time on patterns with many `%`
/// (e.g. `%a%a%a…%b` against a long run of `a`).
fn like_matches(pattern: &str, text: &str) -> bool {
    let pattern: Vec<char> = pattern.chars().collect();
    let text: Vec<char> = text.chars().collect();

    let mut p = 0usize;
    let mut t = 0usize;
    // (pattern index just after the most recent `%`, text index where that
    // `%` stopped absorbing characters). On a mismatch the `%` absorbs one
    // more character and matching resumes from there.
    let mut resume: Option<(usize, usize)> = None;

    while t < text.len() {
        match pattern.get(p).copied() {
            Some('%') => {
                resume = Some((p + 1, t));
                p += 1;
            }
            Some('_') => {
                p += 1;
                t += 1;
            }
            Some(ch) if ch == text[t] => {
                p += 1;
                t += 1;
            }
            _ => match resume {
                Some((after_percent, absorbed)) => {
                    resume = Some((after_percent, absorbed + 1));
                    p = after_percent;
                    t = absorbed + 1;
                }
                None => return false,
            },
        }
    }

    // Text exhausted: only trailing `%` wildcards may remain in the pattern.
    pattern[p..].iter().all(|&ch| ch == '%')
}
666
667 fn logical_and(&self, left: Option<bool>, right: Option<bool>) -> Option<bool> {
668 logical_and_values(left, right)
669 }
670
671 fn logical_or(&self, left: Option<bool>, right: Option<bool>) -> Option<bool> {
672 logical_or_values(left, right)
673 }
674
675 fn evaluate_in_candidates(
676 &self,
677 probe: Value,
678 candidates: impl IntoIterator<Item = Value>,
679 is_not: bool,
680 text_context: Option<TextComparisonContext>,
681 ) -> Option<bool> {
682 evaluate_in_candidates(probe, candidates, is_not, text_context)
683 }
684
685 fn execute_subquery(
686 &self,
687 ctx: &mut ExecutionContext<'_>,
688 subquery: &SelectStatement,
689 current_sources: Option<&[ResolvedSource]>,
690 current_row: Option<&[Value]>,
691 ) -> Result<QueryResult> {
692 let effective_subquery = if let (Some(sources), Some(row)) = (current_sources, current_row)
693 {
694 self.bind_correlated_subquery(ctx, subquery, sources, row)?
695 } else {
696 subquery.clone()
697 };
698 let planner = QueryPlanner::new(ctx.catalog.clone())
699 .with_table_row_counts(current_table_row_counts(ctx.engine));
700 let plan = planner.plan(Statement::Select(effective_subquery))?;
701 match plan.program {
702 ExecutionProgram::Select {
703 statement,
704 access_path,
705 } => {
706 let mut executor = SelectExecutor::new(statement, access_path);
707 executor.outer_scopes = self.outer_scopes.clone();
708 executor.materialized_ctes = self.materialized_ctes.clone();
709 if let (Some(sources), Some(row)) = (current_sources, current_row) {
710 executor = executor.with_outer_scope(sources, row);
711 }
712 executor.execute(ctx)
713 }
714 _ => Err(HematiteError::InternalError(
715 "Expected SELECT execution program for subquery".to_string(),
716 )),
717 }
718 }
719
720 fn bind_correlated_subquery(
721 &self,
722 ctx: &ExecutionContext<'_>,
723 subquery: &SelectStatement,
724 current_sources: &[ResolvedSource],
725 current_row: &[Value],
726 ) -> Result<SelectStatement> {
727 let mut bound = subquery.clone();
728 let mut scopes = self.outer_scopes.clone();
729 scopes.push(CorrelatedScope {
730 sources: current_sources.to_vec(),
731 row: current_row.to_vec(),
732 });
733 self.bind_select_outer_references(ctx, &mut bound, &scopes)?;
734 Ok(bound)
735 }
736
737 fn bind_select_outer_references(
738 &self,
739 ctx: &ExecutionContext<'_>,
740 statement: &mut SelectStatement,
741 scopes: &[CorrelatedScope],
742 ) -> Result<()> {
743 let local_from = statement.from.clone();
744 for item in &mut statement.columns {
745 match item {
746 SelectItem::Expression(expr) => {
747 self.bind_expression_outer_references(ctx, &local_from, expr, scopes)?
748 }
749 SelectItem::Window { window, .. } => {
750 for expr in &mut window.partition_by {
751 self.bind_expression_outer_references(ctx, &local_from, expr, scopes)?;
752 }
753 }
754 SelectItem::Wildcard
755 | SelectItem::Column(_)
756 | SelectItem::CountAll
757 | SelectItem::Aggregate { .. } => {}
758 }
759 }
760
761 if let Some(where_clause) = &mut statement.where_clause {
762 for condition in &mut where_clause.conditions {
763 self.bind_condition_outer_references(ctx, &local_from, condition, scopes)?;
764 }
765 }
766
767 for expr in &mut statement.group_by {
768 self.bind_expression_outer_references(ctx, &local_from, expr, scopes)?;
769 }
770
771 if let Some(having_clause) = &mut statement.having_clause {
772 for condition in &mut having_clause.conditions {
773 self.bind_condition_outer_references(ctx, &local_from, condition, scopes)?;
774 }
775 }
776
777 for cte in &mut statement.with_clause {
778 self.bind_select_outer_references(ctx, &mut cte.query, scopes)?;
779 }
780
781 if let Some(set_operation) = &mut statement.set_operation {
782 self.bind_select_outer_references(ctx, &mut set_operation.right, scopes)?;
783 }
784
785 Ok(())
786 }
787
788 fn bind_condition_outer_references(
789 &self,
790 ctx: &ExecutionContext<'_>,
791 from: &TableReference,
792 condition: &mut Condition,
793 scopes: &[CorrelatedScope],
794 ) -> Result<()> {
795 match condition {
796 Condition::Comparison { left, right, .. } => {
797 self.bind_expression_outer_references(ctx, from, left, scopes)?;
798 self.bind_expression_outer_references(ctx, from, right, scopes)?;
799 }
800 Condition::InList { expr, values, .. } => {
801 self.bind_expression_outer_references(ctx, from, expr, scopes)?;
802 for value in values {
803 self.bind_expression_outer_references(ctx, from, value, scopes)?;
804 }
805 }
806 Condition::InSubquery { expr, subquery, .. } => {
807 self.bind_expression_outer_references(ctx, from, expr, scopes)?;
808 self.bind_select_outer_references(ctx, subquery, scopes)?;
809 }
810 Condition::Between {
811 expr, lower, upper, ..
812 } => {
813 self.bind_expression_outer_references(ctx, from, expr, scopes)?;
814 self.bind_expression_outer_references(ctx, from, lower, scopes)?;
815 self.bind_expression_outer_references(ctx, from, upper, scopes)?;
816 }
817 Condition::Like { expr, pattern, .. } => {
818 self.bind_expression_outer_references(ctx, from, expr, scopes)?;
819 self.bind_expression_outer_references(ctx, from, pattern, scopes)?;
820 }
821 Condition::Exists { subquery, .. } => {
822 self.bind_select_outer_references(ctx, subquery, scopes)?;
823 }
824 Condition::NullCheck { expr, .. } => {
825 self.bind_expression_outer_references(ctx, from, expr, scopes)?;
826 }
827 Condition::Not(inner) => {
828 self.bind_condition_outer_references(ctx, from, inner, scopes)?;
829 }
830 Condition::Logical { left, right, .. } => {
831 self.bind_condition_outer_references(ctx, from, left, scopes)?;
832 self.bind_condition_outer_references(ctx, from, right, scopes)?;
833 }
834 }
835
836 Ok(())
837 }
838
839 fn bind_expression_outer_references(
840 &self,
841 ctx: &ExecutionContext<'_>,
842 from: &TableReference,
843 expr: &mut Expression,
844 scopes: &[CorrelatedScope],
845 ) -> Result<()> {
846 match expr {
847 Expression::Column(name) => {
848 let local_scope = SelectStatement {
849 with_clause: Vec::new(),
850 distinct: false,
851 columns: Vec::new(),
852 column_aliases: Vec::new(),
853 from: from.clone(),
854 where_clause: None,
855 group_by: Vec::new(),
856 having_clause: None,
857 order_by: Vec::new(),
858 limit: None,
859 offset: None,
860 set_operation: None,
861 };
862 if validate_column_reference(&local_scope, name, &ctx.catalog, from).is_ok() {
863 return Ok(());
864 }
865 if let Some(value) = self.lookup_outer_scope_value(scopes, name)? {
866 *expr = Expression::Literal(raise_literal_value(&value));
867 }
868 }
869 Expression::Case {
870 branches,
871 else_expr,
872 } => {
873 for branch in branches {
874 self.bind_expression_outer_references(
875 ctx,
876 from,
877 &mut branch.condition,
878 scopes,
879 )?;
880 self.bind_expression_outer_references(ctx, from, &mut branch.result, scopes)?;
881 }
882 if let Some(else_expr) = else_expr {
883 self.bind_expression_outer_references(ctx, from, else_expr, scopes)?;
884 }
885 }
886 Expression::ScalarSubquery(subquery) => {
887 self.bind_select_outer_references(ctx, subquery, scopes)?;
888 }
889 Expression::ScalarFunctionCall { args, .. } => {
890 for arg in args {
891 self.bind_expression_outer_references(ctx, from, arg, scopes)?;
892 }
893 }
894 Expression::Cast { expr, .. } => {
895 self.bind_expression_outer_references(ctx, from, expr, scopes)?;
896 }
897 Expression::UnaryMinus(inner) => {
898 self.bind_expression_outer_references(ctx, from, inner, scopes)?;
899 }
900 Expression::UnaryNot(inner) => {
901 self.bind_expression_outer_references(ctx, from, inner, scopes)?;
902 }
903 Expression::Binary { left, right, .. } => {
904 self.bind_expression_outer_references(ctx, from, left, scopes)?;
905 self.bind_expression_outer_references(ctx, from, right, scopes)?;
906 }
907 Expression::Comparison { left, right, .. } => {
908 self.bind_expression_outer_references(ctx, from, left, scopes)?;
909 self.bind_expression_outer_references(ctx, from, right, scopes)?;
910 }
911 Expression::InList { expr, values, .. } => {
912 self.bind_expression_outer_references(ctx, from, expr, scopes)?;
913 for value in values {
914 self.bind_expression_outer_references(ctx, from, value, scopes)?;
915 }
916 }
917 Expression::InSubquery { expr, subquery, .. } => {
918 self.bind_expression_outer_references(ctx, from, expr, scopes)?;
919 self.bind_select_outer_references(ctx, subquery, scopes)?;
920 }
921 Expression::Between {
922 expr, lower, upper, ..
923 } => {
924 self.bind_expression_outer_references(ctx, from, expr, scopes)?;
925 self.bind_expression_outer_references(ctx, from, lower, scopes)?;
926 self.bind_expression_outer_references(ctx, from, upper, scopes)?;
927 }
928 Expression::Like { expr, pattern, .. } => {
929 self.bind_expression_outer_references(ctx, from, expr, scopes)?;
930 self.bind_expression_outer_references(ctx, from, pattern, scopes)?;
931 }
932 Expression::Exists { subquery, .. } => {
933 self.bind_select_outer_references(ctx, subquery, scopes)?;
934 }
935 Expression::NullCheck { expr, .. } => {
936 self.bind_expression_outer_references(ctx, from, expr, scopes)?;
937 }
938 Expression::Logical { left, right, .. } => {
939 self.bind_expression_outer_references(ctx, from, left, scopes)?;
940 self.bind_expression_outer_references(ctx, from, right, scopes)?;
941 }
942 Expression::AggregateCall { .. }
943 | Expression::Literal(_)
944 | Expression::IntervalLiteral { .. }
945 | Expression::Parameter(_) => {}
946 }
947
948 Ok(())
949 }
950
951 fn lookup_outer_scope_value(
952 &self,
953 scopes: &[CorrelatedScope],
954 column_reference: &str,
955 ) -> Result<Option<Value>> {
956 for scope in scopes.iter().rev() {
957 if let Some(index) = self.resolve_column_index(&scope.sources, column_reference)? {
958 return Ok(scope.row.get(index).cloned());
959 }
960 }
961
962 Ok(None)
963 }
964
965 fn execute_subquery_cached(
966 &self,
967 ctx: &mut ExecutionContext<'_>,
968 cache: &mut SubqueryCache,
969 subquery: &SelectStatement,
970 current_sources: Option<&[ResolvedSource]>,
971 current_row: Option<&[Value]>,
972 ) -> Result<QueryResult> {
973 if current_sources.is_some() && current_row.is_some() {
974 return self.execute_subquery(ctx, subquery, current_sources, current_row);
975 }
976
977 let key = subquery as *const SelectStatement as usize;
978 if let Some(result) = cache.get(&key) {
979 return Ok(result.clone());
980 }
981
982 let result = self.execute_subquery(ctx, subquery, None, None)?;
983 cache.insert(key, result.clone());
984 Ok(result)
985 }
986
987 fn execute_scalar_subquery_cached(
988 &self,
989 ctx: &mut ExecutionContext<'_>,
990 cache: &mut SubqueryCache,
991 subquery: &SelectStatement,
992 current_sources: Option<&[ResolvedSource]>,
993 current_row: Option<&[Value]>,
994 ) -> Result<Value> {
995 let result =
996 self.execute_subquery_cached(ctx, cache, subquery, current_sources, current_row)?;
997 if result.rows.len() > 1 {
998 return Err(HematiteError::ParseError(
999 "Scalar subquery returned more than one row".to_string(),
1000 ));
1001 }
1002 Ok(result
1003 .rows
1004 .into_iter()
1005 .next()
1006 .and_then(|row| row.into_iter().next())
1007 .unwrap_or(Value::Null))
1008 }
1009
1010 fn evaluate_condition(
1011 &self,
1012 ctx: &mut ExecutionContext<'_>,
1013 cache: &mut SubqueryCache,
1014 sources: &[ResolvedSource],
1015 condition: &Condition,
1016 row: &[Value],
1017 ) -> Result<Option<bool>> {
1018 match condition {
1019 Condition::Comparison {
1020 left,
1021 operator,
1022 right,
1023 } => {
1024 let left_val = self.evaluate_expression(ctx, cache, sources, left, row)?;
1025 let right_val = self.evaluate_expression(ctx, cache, sources, right, row)?;
1026 let text_context = self.merged_text_comparison_context(sources, left, right)?;
1027 Ok(self.compare_values(&left_val, operator, &right_val, text_context))
1028 }
1029 Condition::InList {
1030 expr,
1031 values,
1032 is_not,
1033 } => {
1034 let probe = self.evaluate_expression(ctx, cache, sources, expr, row)?;
1035 let candidates = values
1036 .iter()
1037 .map(|value_expr| {
1038 self.evaluate_expression(ctx, cache, sources, value_expr, row)
1039 })
1040 .collect::<Result<Vec<_>>>()?;
1041 let text_context = self.text_comparison_context_for_expression(sources, expr)?;
1042 Ok(self.evaluate_in_candidates(probe, candidates, *is_not, text_context))
1043 }
1044 Condition::InSubquery {
1045 expr,
1046 subquery,
1047 is_not,
1048 } => {
1049 let probe = self.evaluate_expression(ctx, cache, sources, expr, row)?;
1050 let subquery_result =
1051 self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
1052 let candidates = subquery_result
1053 .rows
1054 .into_iter()
1055 .map(|row| row.into_iter().next().unwrap_or(Value::Null))
1056 .collect::<Vec<_>>();
1057 let text_context = self.text_comparison_context_for_expression(sources, expr)?;
1058 Ok(self.evaluate_in_candidates(probe, candidates, *is_not, text_context))
1059 }
1060 Condition::Between {
1061 expr,
1062 lower,
1063 upper,
1064 is_not,
1065 } => {
1066 let value = self.evaluate_expression(ctx, cache, sources, expr, row)?;
1067 let lower_value = self.evaluate_expression(ctx, cache, sources, lower, row)?;
1068 let upper_value = self.evaluate_expression(ctx, cache, sources, upper, row)?;
1069
1070 if value.is_null() || lower_value.is_null() || upper_value.is_null() {
1071 return Ok(None);
1072 }
1073
1074 let text_context = self.text_comparison_context_for_expression(sources, expr)?;
1075 let lower_ok = sql_partial_cmp(&value, &lower_value, text_context)
1076 .map(|ordering| !ordering.is_lt());
1077 let upper_ok = sql_partial_cmp(&value, &upper_value, text_context)
1078 .map(|ordering| !ordering.is_gt());
1079
1080 match (lower_ok, upper_ok) {
1081 (Some(true), Some(true)) => Ok(Some(!is_not)),
1082 (Some(_), Some(_)) => Ok(Some(*is_not)),
1083 _ => Ok(None),
1084 }
1085 }
1086 Condition::Like {
1087 expr,
1088 pattern,
1089 is_not,
1090 } => {
1091 let value = self.evaluate_expression(ctx, cache, sources, expr, row)?;
1092 let pattern_value = self.evaluate_expression(ctx, cache, sources, pattern, row)?;
1093 let text_context = self.text_comparison_context_for_expression(sources, expr)?;
1094
1095 match (value, pattern_value) {
1096 (Value::Text(text), Value::Text(pattern)) => {
1097 let matched = like_matches_with_context(&pattern, &text, text_context);
1098 Ok(Some(if *is_not { !matched } else { matched }))
1099 }
1100 (left, right) if left.is_null() || right.is_null() => Ok(None),
1101 _ => Ok(None),
1102 }
1103 }
1104 Condition::Exists { subquery, is_not } => {
1105 let subquery_result =
1106 self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
1107 let exists = !subquery_result.rows.is_empty();
1108 Ok(Some(if *is_not { !exists } else { exists }))
1109 }
1110 Condition::NullCheck { expr, is_not } => {
1111 let value = self.evaluate_expression(ctx, cache, sources, expr, row)?;
1112 let is_null = value.is_null();
1113 Ok(Some(if *is_not { !is_null } else { is_null }))
1114 }
1115 Condition::Not(condition) => Ok(self
1116 .evaluate_condition(ctx, cache, sources, condition, row)?
1117 .map(|value| !value)),
1118 Condition::Logical {
1119 left,
1120 operator,
1121 right,
1122 } => {
1123 let left_result = self.evaluate_condition(ctx, cache, sources, left, row)?;
1124 let right_result = self.evaluate_condition(ctx, cache, sources, right, row)?;
1125
1126 match operator {
1127 LogicalOperator::And => Ok(self.logical_and(left_result, right_result)),
1128 LogicalOperator::Or => Ok(self.logical_or(left_result, right_result)),
1129 }
1130 }
1131 }
1132 }
1133
1134 fn project_row(
1135 &self,
1136 ctx: &mut ExecutionContext<'_>,
1137 cache: &mut SubqueryCache,
1138 sources: &[ResolvedSource],
1139 row: &[Value],
1140 ) -> Result<Vec<Value>> {
1141 let mut projected = Vec::new();
1142
1143 for item in &self.statement.columns {
1144 match item {
1145 SelectItem::Wildcard => projected.extend(row.iter().cloned()),
1146 SelectItem::Column(name) => {
1147 if let Some(index) = self.resolve_column_index(sources, name)? {
1148 if index < row.len() {
1149 projected.push(row[index].clone());
1150 }
1151 }
1152 }
1153 SelectItem::Expression(expr) => {
1154 projected.push(self.evaluate_expression(ctx, cache, sources, expr, row)?);
1155 }
1156 SelectItem::CountAll => {}
1157 SelectItem::Aggregate { .. } => {}
1158 SelectItem::Window { .. } => {}
1159 }
1160 }
1161
1162 Ok(projected)
1163 }
1164
1165 fn get_column_names(&self, sources: &[ResolvedSource]) -> Vec<String> {
1166 let mut columns = Vec::new();
1167
1168 for (index, item) in self.statement.columns.iter().enumerate() {
1169 match item {
1170 SelectItem::Wildcard => {
1171 for source in sources {
1172 for column in &source.columns {
1173 columns.push(column.clone());
1174 }
1175 }
1176 }
1177 _ => {
1178 if let Some(name) = self.statement.output_name(index) {
1179 columns.push(name);
1180 }
1181 }
1182 }
1183 }
1184
1185 columns
1186 }
1187
1188 fn shifted_sources(
1189 &self,
1190 mut sources: Vec<ResolvedSource>,
1191 offset: usize,
1192 ) -> Vec<ResolvedSource> {
1193 for source in &mut sources {
1194 source.offset += offset;
1195 }
1196 sources
1197 }
1198
1199 fn total_source_width(&self, sources: &[ResolvedSource]) -> usize {
1200 sources.iter().map(ResolvedSource::width).sum()
1201 }
1202
1203 fn combine_join_rows(&self, left_row: &[Value], right_row: &[Value]) -> Vec<Value> {
1204 let mut combined = left_row.to_vec();
1205 combined.extend(right_row.iter().cloned());
1206 combined
1207 }
1208
1209 fn combine_left_row_with_nulls(&self, left_row: &[Value], right_width: usize) -> Vec<Value> {
1210 let mut combined = left_row.to_vec();
1211 combined.extend(std::iter::repeat_n(Value::Null, right_width));
1212 combined
1213 }
1214
1215 fn combine_nulls_with_right_row(&self, left_width: usize, right_row: &[Value]) -> Vec<Value> {
1216 let mut combined = Vec::with_capacity(left_width + right_row.len());
1217 combined.extend(std::iter::repeat_n(Value::Null, left_width));
1218 combined.extend(right_row.iter().cloned());
1219 combined
1220 }
1221
1222 fn join_outer_is_left(&self, left_rows: &[Vec<Value>], right_rows: &[Vec<Value>]) -> bool {
1223 left_rows.len() <= right_rows.len()
1224 }
1225
1226 fn materialize_join_sources(
1227 &mut self,
1228 ctx: &mut ExecutionContext,
1229 left: &TableReference,
1230 right: &TableReference,
1231 ) -> Result<(
1232 Vec<ResolvedSource>,
1233 Vec<Vec<Value>>,
1234 usize,
1235 Vec<Vec<Value>>,
1236 usize,
1237 )> {
1238 let (left_sources, left_rows) = self.materialize_reference(ctx, left)?;
1239 let left_width = self.total_source_width(&left_sources);
1240 let (right_sources, right_rows) = self.materialize_reference(ctx, right)?;
1241 let right_width = self.total_source_width(&right_sources);
1242 let mut sources = left_sources;
1243 sources.extend(self.shifted_sources(right_sources, left_width));
1244 Ok((sources, left_rows, left_width, right_rows, right_width))
1245 }
1246
1247 fn push_matching_join_rows(
1248 &self,
1249 ctx: &mut ExecutionContext,
1250 sources: &[ResolvedSource],
1251 left_rows: &[Vec<Value>],
1252 right_rows: &[Vec<Value>],
1253 on: Option<&Condition>,
1254 rows: &mut Vec<Vec<Value>>,
1255 ) -> Result<()> {
1256 let push_matches = |outer_rows: &[Vec<Value>],
1257 inner_rows: &[Vec<Value>],
1258 outer_is_left: bool,
1259 rows: &mut Vec<Vec<Value>>| {
1260 for outer_row in outer_rows {
1261 for inner_row in inner_rows {
1262 rows.push(if outer_is_left {
1263 self.combine_join_rows(outer_row, inner_row)
1264 } else {
1265 self.combine_join_rows(inner_row, outer_row)
1266 });
1267 }
1268 }
1269 };
1270
1271 if on.is_none() {
1272 if self.join_outer_is_left(left_rows, right_rows) {
1273 push_matches(left_rows, right_rows, true, rows);
1274 } else {
1275 push_matches(right_rows, left_rows, false, rows);
1276 }
1277 return Ok(());
1278 }
1279
1280 let predicate = on.expect("checked above");
1281 if self.join_outer_is_left(left_rows, right_rows) {
1282 self.push_join_condition_matches(
1283 ctx, sources, left_rows, right_rows, true, predicate, rows,
1284 )
1285 } else {
1286 self.push_join_condition_matches(
1287 ctx, sources, right_rows, left_rows, false, predicate, rows,
1288 )
1289 }
1290 }
1291
1292 fn push_join_condition_matches(
1293 &self,
1294 ctx: &mut ExecutionContext,
1295 sources: &[ResolvedSource],
1296 outer_rows: &[Vec<Value>],
1297 inner_rows: &[Vec<Value>],
1298 outer_is_left: bool,
1299 predicate: &Condition,
1300 rows: &mut Vec<Vec<Value>>,
1301 ) -> Result<()> {
1302 let mut subquery_cache = SubqueryCache::new();
1303 for outer_row in outer_rows {
1304 for inner_row in inner_rows {
1305 let combined = if outer_is_left {
1306 self.combine_join_rows(outer_row, inner_row)
1307 } else {
1308 self.combine_join_rows(inner_row, outer_row)
1309 };
1310 if self.evaluate_condition(
1311 ctx,
1312 &mut subquery_cache,
1313 sources,
1314 predicate,
1315 &combined,
1316 )? == Some(true)
1317 {
1318 rows.push(combined);
1319 }
1320 }
1321 }
1322 Ok(())
1323 }
1324
1325 fn resolve_named_source(
1326 &self,
1327 ctx: &ExecutionContext,
1328 table_name: &str,
1329 alias: Option<String>,
1330 offset: usize,
1331 ) -> Result<ResolvedSource> {
1332 Ok(self.named_source(ctx, table_name, alias, offset)?.source)
1333 }
1334
1335 fn named_source(
1336 &self,
1337 ctx: &ExecutionContext,
1338 table_name: &str,
1339 alias: Option<String>,
1340 offset: usize,
1341 ) -> Result<NamedSource> {
1342 if let Some(result) = self.materialized_ctes.get(&Self::cte_key(table_name)) {
1343 return Ok(NamedSource {
1344 source: ResolvedSource {
1345 name: table_name.to_string(),
1346 columns: result.columns.clone(),
1347 column_types: vec![DataType::Text; result.columns.len()],
1348 column_collations: vec![None; result.columns.len()],
1349 alias,
1350 offset,
1351 },
1352 kind: NamedSourceKind::MaterializedCte(result.rows.clone()),
1353 });
1354 }
1355
1356 if let Some(cte) = self.statement.lookup_cte(table_name) {
1357 let columns = self.query_output_columns(&cte.query, ctx)?;
1358 return Ok(NamedSource {
1359 source: ResolvedSource {
1360 name: table_name.to_string(),
1361 column_types: vec![DataType::Text; columns.len()],
1362 column_collations: vec![None; columns.len()],
1363 columns,
1364 alias,
1365 offset,
1366 },
1367 kind: NamedSourceKind::Cte(cte.clone()),
1368 });
1369 }
1370
1371 let table = ctx
1372 .catalog
1373 .get_table_by_name(table_name)
1374 .ok_or_else(|| table_not_found_parse_error(table_name))?;
1375 Ok(NamedSource {
1376 source: ResolvedSource {
1377 name: table.name.clone(),
1378 columns: table
1379 .columns
1380 .iter()
1381 .map(|column| column.name.clone())
1382 .collect(),
1383 column_types: table
1384 .columns
1385 .iter()
1386 .map(|column| column.data_type.clone())
1387 .collect(),
1388 column_collations: table
1389 .columns
1390 .iter()
1391 .map(|column| column.collation.clone())
1392 .collect(),
1393 alias,
1394 offset,
1395 },
1396 kind: NamedSourceKind::BaseTable,
1397 })
1398 }
1399
1400 fn materialize_named_source(
1401 &mut self,
1402 ctx: &mut ExecutionContext,
1403 table_name: &str,
1404 alias: Option<String>,
1405 ) -> Result<(ResolvedSource, Vec<Vec<Value>>)> {
1406 let named_source = self.named_source(ctx, table_name, alias, 0)?;
1407 let rows = match named_source.kind {
1408 NamedSourceKind::BaseTable => ctx.engine.read_from_table(table_name)?,
1409 NamedSourceKind::MaterializedCte(rows) => rows,
1410 NamedSourceKind::Cte(cte) => {
1411 let key = Self::cte_key(table_name);
1412 if let Some(result) = self.materialized_ctes.get(&key) {
1413 result.rows.clone()
1414 } else {
1415 self.materialize_cte(ctx, &cte)?.rows
1416 }
1417 }
1418 };
1419 Ok((named_source.source, rows))
1420 }
1421
1422 fn materialize_cte(
1423 &mut self,
1424 ctx: &mut ExecutionContext<'_>,
1425 cte: &CommonTableExpression,
1426 ) -> Result<QueryResult> {
1427 let key = Self::cte_key(&cte.name);
1428 if let Some(result) = self.materialized_ctes.get(&key) {
1429 return Ok(result.clone());
1430 }
1431
1432 let result = if cte.recursive {
1433 self.execute_recursive_cte(ctx, cte)?
1434 } else {
1435 self.execute_subquery(ctx, &cte.query, None, None)?
1436 };
1437 self.materialized_ctes.insert(key, result.clone());
1438 Ok(result)
1439 }
1440
    /// Evaluates a `WITH RECURSIVE` CTE by re-running its recursive term until it
    /// stops producing new rows.
    ///
    /// The CTE query must have the shape `<anchor> UNION [ALL] <recursive term>`.
    /// The anchor runs once. Each iteration then runs the recursive term with the
    /// CTE name bound (in `materialized_ctes`) to only the rows produced by the
    /// previous iteration, the "delta". Iteration stops when the term returns no
    /// rows, or, for `UNION`, when it returns only rows already seen.
    ///
    /// On return, `materialized_ctes` holds the accumulated result under the CTE's
    /// key. This also happens when the iteration limit is hit, just before the
    /// error is returned.
    ///
    /// # Errors
    /// - `ParseError` if the query has no set operation, or its operator is not
    ///   `UNION` / `UNION ALL`. The operator check happens after the anchor runs.
    /// - `ParseError` if no fixpoint is reached within
    ///   `MAX_RECURSIVE_CTE_ITERATIONS` iterations.
    /// - Any error raised while executing the anchor or the recursive term.
    fn execute_recursive_cte(
        &mut self,
        ctx: &mut ExecutionContext<'_>,
        cte: &CommonTableExpression,
    ) -> Result<QueryResult> {
        // Upper bound on recursive-term evaluations before reporting divergence.
        const MAX_RECURSIVE_CTE_ITERATIONS: usize = 1024;

        // Detaching the set operation leaves the anchor (non-recursive) term behind.
        let mut anchor = (*cte.query).clone();
        let set_operation = anchor.set_operation.take().ok_or_else(|| {
            HematiteError::ParseError(format!(
                "Recursive CTE '{}' requires UNION or UNION ALL",
                cte.name
            ))
        })?;

        let operator = set_operation.operator;
        let mut recursive_term = *set_operation.right;
        // Declare the CTE name inside the recursive term (bound to the anchor query)
        // so the self-reference resolves as a CTE. During iteration, `named_source`
        // consults `materialized_ctes` before the WITH list, so the delta rows
        // inserted below take precedence over this definition.
        recursive_term.with_clause.push(CommonTableExpression {
            name: cte.name.clone(),
            recursive: false,
            query: Box::new(anchor.clone()),
        });
        let anchor_result = self.execute_subquery(ctx, &anchor, None, None)?;
        let columns = anchor_result.columns.clone();
        // UNION drops duplicate anchor rows; UNION ALL keeps them as produced.
        let mut rows = match operator {
            SetOperator::Union => deduplicate_rows(anchor_result.rows),
            SetOperator::UnionAll => anchor_result.rows,
            _ => {
                return Err(HematiteError::ParseError(format!(
                    "Recursive CTE '{}' requires UNION or UNION ALL",
                    cte.name
                )))
            }
        };
        // The first iteration's working set is the whole anchor result.
        let mut delta = rows.clone();

        let key = Self::cte_key(&cte.name);
        let mut converged = false;
        for _ in 0..MAX_RECURSIVE_CTE_ITERATIONS {
            // Expose only the previous iteration's rows under the CTE name.
            self.materialized_ctes.insert(
                key.clone(),
                QueryResult {
                    affected_rows: delta.len(),
                    columns: columns.clone(),
                    rows: delta.clone(),
                },
            );

            // Run the recursive term in a fresh executor that inherits the outer
            // scopes and the current CTE bindings, including the delta above.
            let mut recursive_executor =
                SelectExecutor::new(recursive_term.clone(), SelectAccessPath::JoinScan);
            recursive_executor.outer_scopes = self.outer_scopes.clone();
            recursive_executor.materialized_ctes = self.materialized_ctes.clone();
            let next_rows = recursive_executor.execute_body(ctx)?.rows;
            if next_rows.is_empty() {
                converged = true;
                break;
            }

            delta = match operator {
                SetOperator::Union => {
                    // Keep only rows not produced before (in any earlier iteration or
                    // earlier in this batch). The linear `contains` scans make this
                    // quadratic in the result size.
                    let mut unique_rows = Vec::new();
                    for row in next_rows {
                        if !rows.contains(&row) && !unique_rows.contains(&row) {
                            unique_rows.push(row);
                        }
                    }
                    unique_rows
                }
                SetOperator::UnionAll => next_rows,
                // Unreachable: other operators were rejected after the anchor ran.
                _ => Vec::new(),
            };

            // UNION fixpoint: the term produced only rows that were already seen.
            if delta.is_empty() {
                converged = true;
                break;
            }
            rows.extend(delta.clone());
        }

        // Replace the per-iteration delta binding with the accumulated result.
        self.materialized_ctes.insert(
            key,
            QueryResult {
                affected_rows: rows.len(),
                columns: columns.clone(),
                rows: rows.clone(),
            },
        );

        if !converged {
            return Err(HematiteError::ParseError(format!(
                "Recursive CTE '{}' exceeded the maximum recursion depth of {}",
                cte.name, MAX_RECURSIVE_CTE_ITERATIONS
            )));
        }

        Ok(QueryResult {
            affected_rows: rows.len(),
            columns,
            rows,
        })
    }
1542
    /// Materializes a FROM-clause table reference into source descriptors and rows.
    ///
    /// The returned sources are laid out left to right with cumulative `offset`s,
    /// so each row is the concatenation of one row from every source.
    ///
    /// - `Table`: a base table or CTE, via `materialize_named_source`.
    /// - `Derived`: the subquery runs uncorrelated (no outer sources or row). Its
    ///   columns are exposed under the derived-table alias as `Text` with no
    ///   collation.
    /// - `CrossJoin` / `InnerJoin`: nested-loop join via `push_matching_join_rows`.
    ///   The smaller side drives the outer loop, so output row order may follow
    ///   either input.
    /// - `LeftJoin` / `RightJoin` / `FullOuterJoin`: nested-loop outer joins that
    ///   NULL-pad rows which found no partner.
    ///
    /// ON conditions are three-valued: only `Some(true)` counts as a match, so an
    /// unknown (`None`) result behaves like a non-match.
    ///
    /// # Errors
    /// Propagates materialization and predicate-evaluation errors.
    fn materialize_reference(
        &mut self,
        ctx: &mut ExecutionContext,
        from: &TableReference,
    ) -> Result<(Vec<ResolvedSource>, Vec<Vec<Value>>)> {
        match from {
            TableReference::Table(table_name, alias) => self
                .materialize_named_source(ctx, table_name, alias.clone())
                .map(|(source, rows)| (vec![source], rows)),
            TableReference::Derived { subquery, alias } => {
                let result = self.execute_subquery(ctx, subquery, None, None)?;
                Ok((
                    vec![ResolvedSource {
                        name: alias.clone(),
                        columns: result.columns.clone(),
                        column_types: vec![DataType::Text; result.columns.len()],
                        column_collations: vec![None; result.columns.len()],
                        alias: None,
                        offset: 0,
                    }],
                    result.rows,
                ))
            }
            TableReference::CrossJoin(left, right) => {
                let (sources, left_rows, _, right_rows, _) =
                    self.materialize_join_sources(ctx, left, right)?;
                let mut rows = Vec::new();
                self.push_matching_join_rows(
                    ctx,
                    &sources,
                    &left_rows,
                    &right_rows,
                    None,
                    &mut rows,
                )?;
                Ok((sources, rows))
            }
            TableReference::InnerJoin { left, right, on } => {
                let (sources, left_rows, _, right_rows, _) =
                    self.materialize_join_sources(ctx, left, right)?;
                let mut rows = Vec::new();
                self.push_matching_join_rows(
                    ctx,
                    &sources,
                    &left_rows,
                    &right_rows,
                    Some(on),
                    &mut rows,
                )?;
                Ok((sources, rows))
            }
            TableReference::LeftJoin { left, right, on } => {
                let (sources, left_rows, _, right_rows, right_width) =
                    self.materialize_join_sources(ctx, left, right)?;

                // Left-major scan: each left row emits all of its matches, or a
                // single right-NULL-padded row when it matched nothing.
                let mut rows = Vec::new();
                let mut subquery_cache = SubqueryCache::new();
                for left_row in &left_rows {
                    let mut matched = false;
                    for right_row in &right_rows {
                        let combined = self.combine_join_rows(left_row, right_row);
                        if self.evaluate_condition(
                            ctx,
                            &mut subquery_cache,
                            &sources,
                            on,
                            &combined,
                        )? == Some(true)
                        {
                            rows.push(combined);
                            matched = true;
                        }
                    }

                    if !matched {
                        rows.push(self.combine_left_row_with_nulls(left_row, right_width));
                    }
                }

                Ok((sources, rows))
            }
            TableReference::RightJoin { left, right, on } => {
                let (sources, left_rows, left_width, right_rows, _) =
                    self.materialize_join_sources(ctx, left, right)?;

                // Right-major scan, so an unmatched right row can be left-NULL-padded.
                // Output rows are grouped by right row; columns stay left-then-right.
                let mut rows = Vec::new();
                let mut subquery_cache = SubqueryCache::new();
                for right_row in &right_rows {
                    let mut matched = false;
                    for left_row in &left_rows {
                        let combined = self.combine_join_rows(left_row, right_row);
                        if self.evaluate_condition(
                            ctx,
                            &mut subquery_cache,
                            &sources,
                            on,
                            &combined,
                        )? == Some(true)
                        {
                            rows.push(combined);
                            matched = true;
                        }
                    }

                    if !matched {
                        rows.push(self.combine_nulls_with_right_row(left_width, right_row));
                    }
                }

                Ok((sources, rows))
            }
            TableReference::FullOuterJoin { left, right, on } => {
                let (sources, left_rows, left_width, right_rows, right_width) =
                    self.materialize_join_sources(ctx, left, right)?;

                let mut rows = Vec::new();
                // Tracks which right rows matched at least one left row.
                let mut matched_right = vec![false; right_rows.len()];
                let mut subquery_cache = SubqueryCache::new();

                // Pass 1: LEFT JOIN semantics, recording right-side matches.
                for left_row in &left_rows {
                    let mut matched = false;
                    for (index, right_row) in right_rows.iter().enumerate() {
                        let combined = self.combine_join_rows(left_row, right_row);
                        if self.evaluate_condition(
                            ctx,
                            &mut subquery_cache,
                            &sources,
                            on,
                            &combined,
                        )? == Some(true)
                        {
                            rows.push(combined);
                            matched = true;
                            matched_right[index] = true;
                        }
                    }

                    if !matched {
                        rows.push(self.combine_left_row_with_nulls(left_row, right_width));
                    }
                }

                // Pass 2: append right rows that never matched, left-NULL-padded.
                for (index, right_row) in right_rows.iter().enumerate() {
                    if !matched_right[index] {
                        rows.push(self.combine_nulls_with_right_row(left_width, right_row));
                    }
                }

                Ok((sources, rows))
            }
        }
    }
1695
1696 fn execute_body(&mut self, ctx: &mut ExecutionContext) -> Result<QueryResult> {
1697 if let Some(set_operation) = &self.statement.set_operation {
1698 let mut subquery_cache = SubqueryCache::new();
1699 let mut left_statement = self.statement.clone();
1700 left_statement.set_operation = None;
1701 let mut left_executor = SelectExecutor::new(left_statement, self.access_path.clone());
1702 left_executor.outer_scopes = self.outer_scopes.clone();
1703 left_executor.materialized_ctes = self.materialized_ctes.clone();
1704 let mut left_result = left_executor.execute_body(ctx)?;
1705 let right_result = self.execute_subquery_cached(
1706 ctx,
1707 &mut subquery_cache,
1708 &set_operation.right,
1709 None,
1710 None,
1711 )?;
1712
1713 left_result.rows =
1714 apply_set_operation(set_operation.operator, left_result.rows, right_result.rows);
1715 left_result.affected_rows = left_result.rows.len();
1716 return Ok(left_result);
1717 }
1718
1719 let (sources, mut filtered_rows) = self.materialize_filtered_rows(ctx)?;
1720 let mut subquery_cache = SubqueryCache::new();
1721
1722 if !self.statement.order_by.is_empty() {
1723 filtered_rows.sort_by(|left, right| {
1724 for item in &self.statement.order_by {
1725 let Ok(Some(index)) = self.resolve_column_index(&sources, &item.column) else {
1726 continue;
1727 };
1728
1729 let text_context = self
1730 .text_comparison_context_for_expression(
1731 &sources,
1732 &Expression::Column(item.column.clone()),
1733 )
1734 .ok()
1735 .flatten();
1736 let ordering =
1737 self.compare_sort_values(&left[index], &right[index], text_context);
1738 if ordering != Ordering::Equal {
1739 return match item.direction {
1740 SortDirection::Asc => ordering,
1741 SortDirection::Desc => ordering.reverse(),
1742 };
1743 }
1744 }
1745
1746 Ordering::Equal
1747 });
1748 }
1749
1750 if !self.statement.group_by.is_empty() || self.has_aggregate_projection() {
1751 return self.execute_grouped(ctx, &mut subquery_cache, &sources, &filtered_rows);
1752 }
1753
1754 if self.has_window_projection() {
1755 let mut projected_rows =
1756 self.project_rows_with_windows(ctx, &mut subquery_cache, &sources, &filtered_rows)?;
1757 apply_distinct_if_needed(self.statement.distinct, &mut projected_rows);
1758 self.apply_select_window(&mut projected_rows);
1759 return Ok(self.build_query_result(self.get_column_names(&sources), projected_rows));
1760 }
1761
1762 let mut projected_rows = Vec::new();
1763 for row in filtered_rows {
1764 projected_rows.push(self.project_row(ctx, &mut subquery_cache, &sources, &row)?);
1765 }
1766
1767 apply_distinct_if_needed(self.statement.distinct, &mut projected_rows);
1768 self.apply_select_window(&mut projected_rows);
1769 Ok(self.build_query_result(self.get_column_names(&sources), projected_rows))
1770 }
1771
1772 fn materialize_filtered_rows(
1773 &mut self,
1774 ctx: &mut ExecutionContext<'_>,
1775 ) -> Result<(Vec<ResolvedSource>, Vec<Vec<Value>>)> {
1776 let direct_table = match &self.statement.from {
1777 TableReference::Table(table_name, _)
1778 if self.statement.lookup_cte(table_name).is_none() =>
1779 {
1780 ctx.catalog.get_table_by_name(table_name).cloned()
1781 }
1782 _ => None,
1783 };
1784
1785 let from = self.statement.from.clone();
1786 let (sources, all_rows) = if self.uses_materialized_reference() {
1787 self.materialize_reference(ctx, &from)?
1788 } else if let (TableReference::Table(table_name, _), Some(table)) =
1789 (&from, direct_table.as_ref())
1790 {
1791 let sources = self.resolve_sources(ctx)?;
1792 let rows = self.materialize_table_access_rows(ctx, table_name, table)?;
1793 (sources, rows)
1794 } else {
1795 return Err(HematiteError::InternalError(
1796 "Planner selected a direct table access path for a non-table source".to_string(),
1797 ));
1798 };
1799
1800 let mut subquery_cache = SubqueryCache::new();
1801 let filtered_rows =
1802 self.filter_source_rows(ctx, &mut subquery_cache, &sources, all_rows)?;
1803 Ok((sources, filtered_rows))
1804 }
1805
1806 fn compare_sort_values(
1807 &self,
1808 left: &Value,
1809 right: &Value,
1810 text_context: Option<TextComparisonContext>,
1811 ) -> Ordering {
1812 match (left.is_null(), right.is_null()) {
1813 (true, true) => Ordering::Equal,
1814 (true, false) => Ordering::Less,
1815 (false, true) => Ordering::Greater,
1816 (false, false) => sql_partial_cmp(left, right, text_context).unwrap_or(Ordering::Equal),
1817 }
1818 }
1819
1820 fn has_aggregate_projection(&self) -> bool {
1821 self.statement
1822 .columns
1823 .iter()
1824 .any(|item| matches!(item, SelectItem::CountAll | SelectItem::Aggregate { .. }))
1825 }
1826
1827 fn has_window_projection(&self) -> bool {
1828 self.statement
1829 .columns
1830 .iter()
1831 .any(|item| matches!(item, SelectItem::Window { .. }))
1832 }
1833
1834 fn project_rows_with_windows(
1835 &self,
1836 ctx: &mut ExecutionContext<'_>,
1837 cache: &mut SubqueryCache,
1838 sources: &[ResolvedSource],
1839 filtered_rows: &[Vec<Value>],
1840 ) -> Result<Vec<Vec<Value>>> {
1841 let mut projected_rows = Vec::with_capacity(filtered_rows.len());
1842
1843 for (row_index, row) in filtered_rows.iter().enumerate() {
1844 let mut projected = Vec::new();
1845
1846 for item in &self.statement.columns {
1847 match item {
1848 SelectItem::Wildcard => projected.extend(row.iter().cloned()),
1849 SelectItem::Column(name) => {
1850 if let Some(index) = self.resolve_column_index(sources, name)? {
1851 if index < row.len() {
1852 projected.push(row[index].clone());
1853 }
1854 }
1855 }
1856 SelectItem::Expression(expr) => {
1857 projected.push(self.evaluate_expression(ctx, cache, sources, expr, row)?);
1858 }
1859 SelectItem::Window { function, window } => {
1860 projected.push(self.evaluate_window_item(
1861 ctx,
1862 cache,
1863 sources,
1864 filtered_rows,
1865 row_index,
1866 function,
1867 window,
1868 )?)
1869 }
1870 SelectItem::CountAll | SelectItem::Aggregate { .. } => {}
1871 }
1872 }
1873
1874 projected_rows.push(projected);
1875 }
1876
1877 Ok(projected_rows)
1878 }
1879
1880 fn evaluate_window_item(
1881 &self,
1882 ctx: &mut ExecutionContext<'_>,
1883 cache: &mut SubqueryCache,
1884 sources: &[ResolvedSource],
1885 filtered_rows: &[Vec<Value>],
1886 row_index: usize,
1887 function: &WindowFunction,
1888 window: &WindowSpec,
1889 ) -> Result<Value> {
1890 let partition_key = window
1891 .partition_by
1892 .iter()
1893 .map(|expr| {
1894 self.evaluate_expression(ctx, cache, sources, expr, &filtered_rows[row_index])
1895 })
1896 .collect::<Result<Vec<_>>>()?;
1897
1898 let mut partition_indexes = Vec::new();
1899 for (index, row) in filtered_rows.iter().enumerate() {
1900 let row_key = window
1901 .partition_by
1902 .iter()
1903 .map(|expr| self.evaluate_expression(ctx, cache, sources, expr, row))
1904 .collect::<Result<Vec<_>>>()?;
1905 if row_key == partition_key {
1906 partition_indexes.push(index);
1907 }
1908 }
1909
1910 if !window.order_by.is_empty() {
1911 partition_indexes.sort_by(|left_index, right_index| {
1912 let left = &filtered_rows[*left_index];
1913 let right = &filtered_rows[*right_index];
1914
1915 for item in &window.order_by {
1916 let Ok(Some(column_index)) = self.resolve_column_index(sources, &item.column)
1917 else {
1918 continue;
1919 };
1920
1921 let ordering = self.compare_sort_values(
1922 &left[column_index],
1923 &right[column_index],
1924 self.text_comparison_context_for_expression(
1925 sources,
1926 &Expression::Column(item.column.clone()),
1927 )
1928 .ok()
1929 .flatten(),
1930 );
1931 if ordering != Ordering::Equal {
1932 return match item.direction {
1933 SortDirection::Asc => ordering,
1934 SortDirection::Desc => ordering.reverse(),
1935 };
1936 }
1937 }
1938
1939 left_index.cmp(right_index)
1940 });
1941 }
1942
1943 let position = partition_indexes
1944 .iter()
1945 .position(|index| *index == row_index)
1946 .ok_or_else(|| {
1947 HematiteError::InternalError(
1948 "Current row not found in window partition".to_string(),
1949 )
1950 })?;
1951
1952 match function {
1953 WindowFunction::RowNumber => Ok(Value::Integer((position + 1) as i32)),
1954 WindowFunction::Rank => {
1955 let mut rank = 1usize;
1956 for current in 1..=position {
1957 if self.window_sort_key_changed(
1958 sources,
1959 window,
1960 &filtered_rows[partition_indexes[current - 1]],
1961 &filtered_rows[partition_indexes[current]],
1962 )? {
1963 rank = current + 1;
1964 }
1965 }
1966 Ok(Value::Integer(rank as i32))
1967 }
1968 WindowFunction::DenseRank => {
1969 let mut rank = 1usize;
1970 for current in 1..=position {
1971 if self.window_sort_key_changed(
1972 sources,
1973 window,
1974 &filtered_rows[partition_indexes[current - 1]],
1975 &filtered_rows[partition_indexes[current]],
1976 )? {
1977 rank += 1;
1978 }
1979 }
1980 Ok(Value::Integer(rank as i32))
1981 }
1982 WindowFunction::Aggregate { function, target } => {
1983 let partition_rows = partition_indexes
1984 .iter()
1985 .map(|index| filtered_rows[*index].clone())
1986 .collect::<Vec<_>>();
1987 Ok(self
1988 .evaluate_aggregate_value(sources, *function, target, &partition_rows)?
1989 .unwrap_or(Value::Null))
1990 }
1991 }
1992 }
1993
1994 fn window_sort_key_changed(
1995 &self,
1996 sources: &[ResolvedSource],
1997 window: &WindowSpec,
1998 left: &[Value],
1999 right: &[Value],
2000 ) -> Result<bool> {
2001 if window.order_by.is_empty() {
2002 return Ok(false);
2003 }
2004
2005 for item in &window.order_by {
2006 let index = self
2007 .resolve_column_index(sources, &item.column)?
2008 .ok_or_else(|| {
2009 HematiteError::ParseError(format!("Column '{}' not found", item.column))
2010 })?;
2011
2012 if self.compare_sort_values(
2013 &left[index],
2014 &right[index],
2015 self.text_comparison_context_for_expression(
2016 sources,
2017 &Expression::Column(item.column.clone()),
2018 )
2019 .ok()
2020 .flatten(),
2021 ) != Ordering::Equal
2022 {
2023 return Ok(true);
2024 }
2025 }
2026
2027 Ok(false)
2028 }
2029
2030 fn apply_select_window(&self, rows: &mut Vec<Vec<Value>>) {
2031 if let Some(offset) = self.statement.offset {
2032 if offset >= rows.len() {
2033 rows.clear();
2034 return;
2035 }
2036 rows.drain(0..offset);
2037 }
2038
2039 if let Some(limit) = self.statement.limit {
2040 rows.truncate(limit);
2041 }
2042 }
2043
2044 fn build_query_result(&self, columns: Vec<String>, rows: Vec<Vec<Value>>) -> QueryResult {
2045 QueryResult {
2046 affected_rows: rows.len(),
2047 columns,
2048 rows,
2049 }
2050 }
2051
2052 fn evaluate_aggregate_value(
2053 &self,
2054 sources: &[ResolvedSource],
2055 function: AggregateFunction,
2056 target: &AggregateTarget,
2057 rows: &[Vec<Value>],
2058 ) -> Result<Option<Value>> {
2059 if matches!(target, AggregateTarget::All) {
2060 return match function {
2061 AggregateFunction::Count => Ok(Some(Value::Integer(rows.len() as i32))),
2062 _ => Err(HematiteError::ParseError(format!(
2063 "{:?}(*) is not supported",
2064 function
2065 ))),
2066 };
2067 }
2068
2069 let AggregateTarget::Column(column) = target else {
2070 return Ok(None);
2071 };
2072
2073 let index = self
2074 .resolve_column_index(sources, column)?
2075 .ok_or_else(|| HematiteError::ParseError(format!("Column '{}' not found", column)))?;
2076
2077 let values: Vec<&Value> = rows
2078 .iter()
2079 .map(|row| &row[index])
2080 .filter(|value| !value.is_null())
2081 .collect();
2082
2083 if values.is_empty() {
2084 return Ok(Some(match function {
2085 AggregateFunction::Count => Value::Integer(0),
2086 _ => Value::Null,
2087 }));
2088 }
2089
2090 match function {
2091 AggregateFunction::Count => Ok(Some(Value::Integer(values.len() as i32))),
2092 AggregateFunction::Min => {
2093 let mut current = values[0].clone();
2094 for value in values.into_iter().skip(1) {
2095 if value.partial_cmp(¤t).is_some_and(|ord| ord.is_lt()) {
2096 current = value.clone();
2097 }
2098 }
2099 Ok(Some(current))
2100 }
2101 AggregateFunction::Max => {
2102 let mut current = values[0].clone();
2103 for value in values.into_iter().skip(1) {
2104 if value.partial_cmp(¤t).is_some_and(|ord| ord.is_gt()) {
2105 current = value.clone();
2106 }
2107 }
2108 Ok(Some(current))
2109 }
2110 AggregateFunction::Sum => {
2111 let mut int_sum: i64 = 0;
2112 let mut float_sum: f64 = 0.0;
2113 let mut has_float = false;
2114
2115 for value in &values {
2116 match value {
2117 Value::Integer(i) => {
2118 int_sum += *i as i64;
2119 float_sum += *i as f64;
2120 }
2121 Value::Float32(f) => {
2122 has_float = true;
2123 float_sum += *f as f64;
2124 }
2125 Value::Float(f) => {
2126 has_float = true;
2127 float_sum += *f;
2128 }
2129 _ => {
2130 return Err(HematiteError::ParseError(format!(
2131 "SUM() requires numeric values, found {:?}",
2132 value
2133 )))
2134 }
2135 }
2136 }
2137
2138 if has_float {
2139 Ok(Some(Value::Float(float_sum)))
2140 } else {
2141 Ok(Some(Value::Integer(int_sum as i32)))
2142 }
2143 }
2144 AggregateFunction::Avg => {
2145 let mut sum: f64 = 0.0;
2146 let count = values.len() as f64;
2147
2148 for value in &values {
2149 match value {
2150 Value::Integer(i) => {
2151 sum += *i as f64;
2152 }
2153 Value::Float32(f) => {
2154 sum += *f as f64;
2155 }
2156 Value::Float(f) => {
2157 sum += *f;
2158 }
2159 _ => {
2160 return Err(HematiteError::ParseError(format!(
2161 "AVG() requires numeric values, found {:?}",
2162 value
2163 )))
2164 }
2165 }
2166 }
2167
2168 Ok(Some(Value::Float(sum / count)))
2169 }
2170 }
2171 }
2172
2173 fn evaluate_aggregate_item(
2174 &self,
2175 sources: &[ResolvedSource],
2176 item: &SelectItem,
2177 rows: &[Vec<Value>],
2178 ) -> Result<Option<Value>> {
2179 match item {
2180 SelectItem::CountAll => self.evaluate_aggregate_value(
2181 sources,
2182 AggregateFunction::Count,
2183 &AggregateTarget::All,
2184 rows,
2185 ),
2186 SelectItem::Aggregate { function, column } => self.evaluate_aggregate_value(
2187 sources,
2188 *function,
2189 &AggregateTarget::Column(column.clone()),
2190 rows,
2191 ),
2192 _ => Ok(None),
2193 }
2194 }
2195
2196 fn result_column_index(
2197 &self,
2198 output_columns: &[String],
2199 order_by_column: &str,
2200 ) -> Option<usize> {
2201 let base_name = SelectStatement::column_reference_name(order_by_column);
2202 output_columns.iter().position(|column| {
2203 column.eq_ignore_ascii_case(order_by_column) || column.eq_ignore_ascii_case(base_name)
2204 })
2205 }
2206
2207 fn sort_projected_rows(&self, output_columns: &[String], rows: &mut [Vec<Value>]) {
2208 if self.statement.order_by.is_empty() {
2209 return;
2210 }
2211
2212 rows.sort_by(|left, right| {
2213 for item in &self.statement.order_by {
2214 let Some(index) = self.result_column_index(output_columns, &item.column) else {
2215 continue;
2216 };
2217
2218 let ordering = self.compare_sort_values(&left[index], &right[index], None);
2219 if ordering != Ordering::Equal {
2220 return match item.direction {
2221 SortDirection::Asc => ordering,
2222 SortDirection::Desc => ordering.reverse(),
2223 };
2224 }
2225 }
2226
2227 Ordering::Equal
2228 });
2229 }
2230
    /// Evaluates `expr` against a grouped (projected) output row.
    ///
    /// Used for post-grouping evaluation such as HAVING. Bare column references resolve by
    /// name against `output_columns`, the layout of `row`, including aliases. Aggregate calls
    /// are recomputed over `group_rows`, the source rows that form the group. Predicate-shaped
    /// expressions are delegated to `evaluate_projected_boolean_expression` and converted back
    /// to a `Value`.
    ///
    /// # Errors
    /// - `ParseError` for an unbound parameter or a column name that matches no output column.
    /// - `InternalError` if an aggregate produces no value or `row` is shorter than the
    ///   resolved column index.
    /// - Any error raised while evaluating sub-expressions, casts or subqueries.
    fn evaluate_projected_expression(
        &self,
        ctx: &mut ExecutionContext<'_>,
        cache: &mut SubqueryCache,
        sources: &[ResolvedSource],
        expr: &Expression,
        row: &[Value],
        output_columns: &[String],
        group_rows: &[Vec<Value>],
    ) -> Result<Value> {
        match expr {
            Expression::Literal(value) => Ok(lower_literal_value(value)),
            Expression::IntervalLiteral { value, qualifier } => match qualifier {
                IntervalQualifier::YearToMonth => Ok(Value::IntervalYearMonth(
                    IntervalYearMonthValue::parse(value)?,
                )),
                IntervalQualifier::DayToSecond => Ok(Value::IntervalDaySecond(
                    IntervalDaySecondValue::parse(value)?,
                )),
            },
            // Parameters should have been bound before execution; reaching one is an error.
            Expression::Parameter(index) => Err(HematiteError::ParseError(format!(
                "Unbound parameter {} reached execution",
                index + 1
            ))),
            // Evaluate the operand first, then convert it to the lowered target type.
            Expression::Cast { expr, target_type } => cast_value_to_type(
                self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    expr,
                    row,
                    output_columns,
                    group_rows,
                )?,
                lower_type_name(target_type.clone()),
            ),
            Expression::Case {
                branches,
                else_expr,
            } => {
                // Branches are tried in order. A condition that is false or NULL (UNKNOWN)
                // falls through to the next branch.
                for branch in branches {
                    match self.evaluate_projected_boolean_expression(
                        ctx,
                        cache,
                        sources,
                        &branch.condition,
                        row,
                        output_columns,
                        group_rows,
                    )? {
                        Some(true) => {
                            return self.evaluate_projected_expression(
                                ctx,
                                cache,
                                sources,
                                &branch.result,
                                row,
                                output_columns,
                                group_rows,
                            )
                        }
                        Some(false) | None => {}
                    }
                }

                // No branch matched: use ELSE if present, otherwise NULL.
                match else_expr {
                    Some(else_expr) => self.evaluate_projected_expression(
                        ctx,
                        cache,
                        sources,
                        else_expr,
                        row,
                        output_columns,
                        group_rows,
                    ),
                    None => Ok(Value::Null),
                }
            }
            // NOTE(review): `row` here is the projected output row, while `sources` describes
            // the source-row layout. A correlated subquery that reads outer columns by source
            // index may not line up with this row; confirm against
            // `execute_scalar_subquery_cached`.
            Expression::ScalarSubquery(subquery) => {
                self.execute_scalar_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))
            }
            // Aggregates are recomputed over the group's source rows, not the projected row.
            Expression::AggregateCall { function, target } => self
                .evaluate_aggregate_value(sources, *function, target, group_rows)?
                .ok_or_else(|| {
                    HematiteError::InternalError(
                        "Aggregate expression evaluation produced no value".to_string(),
                    )
                }),
            Expression::ScalarFunctionCall { function, args } => {
                evaluate_scalar_function_call(*function, args, |expr| {
                    self.evaluate_projected_expression(
                        ctx,
                        cache,
                        sources,
                        expr,
                        row,
                        output_columns,
                        group_rows,
                    )
                })
            }
            // Column names resolve against the projected output (including aliases), not
            // against the source tables.
            Expression::Column(name) => {
                let index = self
                    .result_column_index(output_columns, name)
                    .ok_or_else(|| {
                        HematiteError::ParseError(format!(
                            "HAVING column '{}' does not match any grouped output column or alias",
                            name
                        ))
                    })?;
                row.get(index).cloned().ok_or_else(|| {
                    HematiteError::InternalError(format!(
                        "Grouped output row is missing column index {} for '{}'",
                        index, name
                    ))
                })
            }
            Expression::UnaryMinus(expr) => {
                negate_numeric_value(self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    expr,
                    row,
                    output_columns,
                    group_rows,
                )?)
            }
            // Predicates evaluate to a three-valued boolean, surfaced as a value.
            Expression::UnaryNot(_)
            | Expression::Comparison { .. }
            | Expression::InList { .. }
            | Expression::InSubquery { .. }
            | Expression::Between { .. }
            | Expression::Like { .. }
            | Expression::Exists { .. }
            | Expression::NullCheck { .. }
            | Expression::Logical { .. } => Ok(nullable_bool_to_value(
                self.evaluate_projected_boolean_expression(
                    ctx,
                    cache,
                    sources,
                    expr,
                    row,
                    output_columns,
                    group_rows,
                )?,
            )),
            // Both operands are evaluated left to right before the arithmetic is applied.
            Expression::Binary {
                left,
                operator,
                right,
            } => self.evaluate_arithmetic(
                operator,
                self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    left,
                    row,
                    output_columns,
                    group_rows,
                )?,
                self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    right,
                    row,
                    output_columns,
                    group_rows,
                )?,
            ),
        }
    }
2405
    /// Evaluates `expr` as a three-valued SQL boolean against a grouped (projected) row.
    ///
    /// `Some(true)` / `Some(false)` are TRUE / FALSE and `None` is UNKNOWN (NULL). Operands
    /// are evaluated with `evaluate_projected_expression`, so columns resolve against
    /// `output_columns` and aggregates against `group_rows`. Text comparisons use the
    /// comparison context derived from `sources` (e.g. collation).
    /// Non-predicate expressions are evaluated as values and coerced to a nullable boolean.
    fn evaluate_projected_boolean_expression(
        &self,
        ctx: &mut ExecutionContext<'_>,
        cache: &mut SubqueryCache,
        sources: &[ResolvedSource],
        expr: &Expression,
        row: &[Value],
        output_columns: &[String],
        group_rows: &[Vec<Value>],
    ) -> Result<Option<bool>> {
        match expr {
            Expression::Comparison {
                left,
                operator,
                right,
            } => {
                let left_val = self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    left,
                    row,
                    output_columns,
                    group_rows,
                )?;
                let right_val = self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    right,
                    row,
                    output_columns,
                    group_rows,
                )?;
                // The text context is merged from both operands (e.g. a NOCASE column on either side).
                let text_context = self.merged_text_comparison_context(sources, left, right)?;
                Ok(compare_condition_values(
                    &left_val,
                    operator,
                    &right_val,
                    text_context,
                ))
            }
            Expression::InList {
                expr,
                values,
                is_not,
            } => evaluate_in_list_predicate(
                expr,
                values,
                *is_not,
                self.text_comparison_context_for_expression(sources, expr)?,
                |value_expr| {
                    self.evaluate_projected_expression(
                        ctx,
                        cache,
                        sources,
                        value_expr,
                        row,
                        output_columns,
                        group_rows,
                    )
                },
            ),
            Expression::InSubquery {
                expr,
                subquery,
                is_not,
            } => {
                let probe = self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    expr,
                    row,
                    output_columns,
                    group_rows,
                )?;
                // NOTE(review): the outer row passed here is the projected output row, while
                // `sources` describes source-row layout; confirm correlated references resolve
                // correctly in `execute_subquery_cached`.
                let subquery_result =
                    self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
                // Only the first column of each subquery row is a candidate; empty rows count as NULL.
                let candidates = subquery_result
                    .rows
                    .into_iter()
                    .map(|row| row.into_iter().next().unwrap_or(Value::Null))
                    .collect::<Vec<_>>();
                let text_context = self.text_comparison_context_for_expression(sources, expr)?;
                Ok(evaluate_in_candidates(
                    probe,
                    candidates,
                    *is_not,
                    text_context,
                ))
            }
            Expression::Between {
                expr,
                lower,
                upper,
                is_not,
            } => evaluate_between_predicate(
                expr,
                lower,
                upper,
                *is_not,
                self.text_comparison_context_for_expression(sources, expr)?,
                |value_expr| {
                    self.evaluate_projected_expression(
                        ctx,
                        cache,
                        sources,
                        value_expr,
                        row,
                        output_columns,
                        group_rows,
                    )
                },
            ),
            Expression::Like {
                expr,
                pattern,
                is_not,
            } => evaluate_like_predicate(
                expr,
                pattern,
                *is_not,
                self.text_comparison_context_for_expression(sources, expr)?,
                |value_expr| {
                    self.evaluate_projected_expression(
                        ctx,
                        cache,
                        sources,
                        value_expr,
                        row,
                        output_columns,
                        group_rows,
                    )
                },
            ),
            // EXISTS is never UNKNOWN: it only checks whether the subquery returned any row.
            Expression::Exists { subquery, is_not } => {
                let subquery_result =
                    self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
                let exists = !subquery_result.rows.is_empty();
                Ok(Some(if *is_not { !exists } else { exists }))
            }
            // IS [NOT] NULL is never UNKNOWN.
            Expression::NullCheck { expr, is_not } => {
                let value = self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    expr,
                    row,
                    output_columns,
                    group_rows,
                )?;
                let is_null = value.is_null();
                Ok(Some(if *is_not { !is_null } else { is_null }))
            }
            // NOT UNKNOWN stays UNKNOWN.
            Expression::UnaryNot(expr) => Ok(self
                .evaluate_projected_boolean_expression(
                    ctx,
                    cache,
                    sources,
                    expr,
                    row,
                    output_columns,
                    group_rows,
                )?
                .map(|value| !value)),
            // Both sides are always evaluated (no short-circuit), then combined with
            // three-valued AND/OR.
            Expression::Logical {
                left,
                operator,
                right,
            } => {
                let left_result = self.evaluate_projected_boolean_expression(
                    ctx,
                    cache,
                    sources,
                    left,
                    row,
                    output_columns,
                    group_rows,
                )?;
                let right_result = self.evaluate_projected_boolean_expression(
                    ctx,
                    cache,
                    sources,
                    right,
                    row,
                    output_columns,
                    group_rows,
                )?;
                Ok(match operator {
                    LogicalOperator::And => logical_and_values(left_result, right_result),
                    LogicalOperator::Or => logical_or_values(left_result, right_result),
                })
            }
            // Any other expression is evaluated as a value and coerced to a nullable boolean.
            _ => coerce_value_to_nullable_bool(
                self.evaluate_projected_expression(
                    ctx,
                    cache,
                    sources,
                    expr,
                    row,
                    output_columns,
                    group_rows,
                )?,
                "Boolean expression",
            ),
        }
    }
2614
2615 fn evaluate_projected_condition(
2616 &self,
2617 ctx: &mut ExecutionContext<'_>,
2618 cache: &mut SubqueryCache,
2619 sources: &[ResolvedSource],
2620 condition: &Condition,
2621 row: &[Value],
2622 output_columns: &[String],
2623 group_rows: &[Vec<Value>],
2624 ) -> Result<Option<bool>> {
2625 match condition {
2626 Condition::Comparison {
2627 left,
2628 operator,
2629 right,
2630 } => {
2631 let left_val = self.evaluate_projected_expression(
2632 ctx,
2633 cache,
2634 sources,
2635 left,
2636 row,
2637 output_columns,
2638 group_rows,
2639 )?;
2640 let right_val = self.evaluate_projected_expression(
2641 ctx,
2642 cache,
2643 sources,
2644 right,
2645 row,
2646 output_columns,
2647 group_rows,
2648 )?;
2649 let text_context = self.merged_text_comparison_context(sources, left, right)?;
2650 Ok(self.compare_values(&left_val, operator, &right_val, text_context))
2651 }
2652 Condition::InList {
2653 expr,
2654 values,
2655 is_not,
2656 } => {
2657 let probe = self.evaluate_projected_expression(
2658 ctx,
2659 cache,
2660 sources,
2661 expr,
2662 row,
2663 output_columns,
2664 group_rows,
2665 )?;
2666 let candidates = values
2667 .iter()
2668 .map(|value_expr| {
2669 self.evaluate_projected_expression(
2670 ctx,
2671 cache,
2672 sources,
2673 value_expr,
2674 row,
2675 output_columns,
2676 group_rows,
2677 )
2678 })
2679 .collect::<Result<Vec<_>>>()?;
2680 let text_context = self.text_comparison_context_for_expression(sources, expr)?;
2681 Ok(self.evaluate_in_candidates(probe, candidates, *is_not, text_context))
2682 }
2683 Condition::InSubquery {
2684 expr,
2685 subquery,
2686 is_not,
2687 } => {
2688 let probe = self.evaluate_projected_expression(
2689 ctx,
2690 cache,
2691 sources,
2692 expr,
2693 row,
2694 output_columns,
2695 group_rows,
2696 )?;
2697 let subquery_result =
2698 self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
2699 let candidates = subquery_result
2700 .rows
2701 .into_iter()
2702 .map(|row| row.into_iter().next().unwrap_or(Value::Null))
2703 .collect::<Vec<_>>();
2704 let text_context = self.text_comparison_context_for_expression(sources, expr)?;
2705 Ok(self.evaluate_in_candidates(probe, candidates, *is_not, text_context))
2706 }
2707 Condition::Between {
2708 expr,
2709 lower,
2710 upper,
2711 is_not,
2712 } => {
2713 let value = self.evaluate_projected_expression(
2714 ctx,
2715 cache,
2716 sources,
2717 expr,
2718 row,
2719 output_columns,
2720 group_rows,
2721 )?;
2722 let lower_value = self.evaluate_projected_expression(
2723 ctx,
2724 cache,
2725 sources,
2726 lower,
2727 row,
2728 output_columns,
2729 group_rows,
2730 )?;
2731 let upper_value = self.evaluate_projected_expression(
2732 ctx,
2733 cache,
2734 sources,
2735 upper,
2736 row,
2737 output_columns,
2738 group_rows,
2739 )?;
2740
2741 if value.is_null() || lower_value.is_null() || upper_value.is_null() {
2742 return Ok(None);
2743 }
2744
2745 let text_context = self.text_comparison_context_for_expression(sources, expr)?;
2746 let lower_ok = sql_partial_cmp(&value, &lower_value, text_context)
2747 .map(|ordering| !ordering.is_lt());
2748 let upper_ok = sql_partial_cmp(&value, &upper_value, text_context)
2749 .map(|ordering| !ordering.is_gt());
2750
2751 match (lower_ok, upper_ok) {
2752 (Some(true), Some(true)) => Ok(Some(!is_not)),
2753 (Some(_), Some(_)) => Ok(Some(*is_not)),
2754 _ => Ok(None),
2755 }
2756 }
2757 Condition::Like {
2758 expr,
2759 pattern,
2760 is_not,
2761 } => {
2762 let value = self.evaluate_projected_expression(
2763 ctx,
2764 cache,
2765 sources,
2766 expr,
2767 row,
2768 output_columns,
2769 group_rows,
2770 )?;
2771 let pattern_value = self.evaluate_projected_expression(
2772 ctx,
2773 cache,
2774 sources,
2775 pattern,
2776 row,
2777 output_columns,
2778 group_rows,
2779 )?;
2780
2781 match (value, pattern_value) {
2782 (Value::Text(text), Value::Text(pattern)) => {
2783 let matched = Self::like_matches(&pattern, &text);
2784 Ok(Some(if *is_not { !matched } else { matched }))
2785 }
2786 (left, right) if left.is_null() || right.is_null() => Ok(None),
2787 _ => Ok(None),
2788 }
2789 }
2790 Condition::Exists { subquery, is_not } => {
2791 let subquery_result =
2792 self.execute_subquery_cached(ctx, cache, subquery, Some(sources), Some(row))?;
2793 let exists = !subquery_result.rows.is_empty();
2794 Ok(Some(if *is_not { !exists } else { exists }))
2795 }
2796 Condition::NullCheck { expr, is_not } => {
2797 let value = self.evaluate_projected_expression(
2798 ctx,
2799 cache,
2800 sources,
2801 expr,
2802 row,
2803 output_columns,
2804 group_rows,
2805 )?;
2806 let is_null = value.is_null();
2807 Ok(Some(if *is_not { !is_null } else { is_null }))
2808 }
2809 Condition::Not(condition) => Ok(self
2810 .evaluate_projected_condition(
2811 ctx,
2812 cache,
2813 sources,
2814 condition,
2815 row,
2816 output_columns,
2817 group_rows,
2818 )?
2819 .map(|value| !value)),
2820 Condition::Logical {
2821 left,
2822 operator,
2823 right,
2824 } => {
2825 let left_result = self.evaluate_projected_condition(
2826 ctx,
2827 cache,
2828 sources,
2829 left,
2830 row,
2831 output_columns,
2832 group_rows,
2833 )?;
2834 let right_result = self.evaluate_projected_condition(
2835 ctx,
2836 cache,
2837 sources,
2838 right,
2839 row,
2840 output_columns,
2841 group_rows,
2842 )?;
2843
2844 match operator {
2845 LogicalOperator::And => Ok(self.logical_and(left_result, right_result)),
2846 LogicalOperator::Or => Ok(self.logical_or(left_result, right_result)),
2847 }
2848 }
2849 }
2850 }
2851
2852 fn project_grouped_row(
2853 &self,
2854 ctx: &mut ExecutionContext<'_>,
2855 cache: &mut SubqueryCache,
2856 sources: &[ResolvedSource],
2857 group_rows: &[Vec<Value>],
2858 ) -> Result<Vec<Value>> {
2859 let representative = group_rows.first().map(Vec::as_slice).unwrap_or(&[]);
2860 let mut projected = Vec::new();
2861
2862 for item in &self.statement.columns {
2863 match item {
2864 SelectItem::Wildcard => {}
2865 SelectItem::Column(name) => {
2866 if let Some(index) = self.resolve_column_index(sources, name)? {
2867 if index < representative.len() {
2868 projected.push(representative[index].clone());
2869 }
2870 }
2871 }
2872 SelectItem::Expression(expr) => {
2873 projected.push(self.evaluate_expression(
2874 ctx,
2875 cache,
2876 sources,
2877 expr,
2878 representative,
2879 )?);
2880 }
2881 SelectItem::CountAll | SelectItem::Aggregate { .. } => {
2882 projected.push(
2883 self.evaluate_aggregate_item(sources, item, group_rows)?
2884 .unwrap_or(Value::Null),
2885 );
2886 }
2887 SelectItem::Window { .. } => {
2888 return Err(HematiteError::InternalError(
2889 "Window projections are not supported in grouped execution".to_string(),
2890 ))
2891 }
2892 }
2893 }
2894
2895 Ok(projected)
2896 }
2897
2898 fn build_groups(
2899 &self,
2900 ctx: &mut ExecutionContext<'_>,
2901 cache: &mut SubqueryCache,
2902 sources: &[ResolvedSource],
2903 filtered_rows: &[Vec<Value>],
2904 ) -> Result<Vec<Vec<Vec<Value>>>> {
2905 if self.statement.group_by.is_empty() {
2906 if filtered_rows.is_empty() && self.has_aggregate_projection() {
2907 return Ok(vec![Vec::new()]);
2908 }
2909 return Ok(vec![filtered_rows.to_vec()]);
2910 }
2911
2912 let mut keyed_groups: Vec<(Vec<Value>, Vec<Vec<Value>>)> = Vec::new();
2913 for row in filtered_rows {
2914 let key = self
2915 .statement
2916 .group_by
2917 .iter()
2918 .map(|expr| self.evaluate_expression(ctx, cache, sources, expr, row))
2919 .collect::<Result<Vec<_>>>()?;
2920
2921 if let Some((_, rows)) = keyed_groups
2922 .iter_mut()
2923 .find(|(existing_key, _)| *existing_key == key)
2924 {
2925 rows.push(row.clone());
2926 } else {
2927 keyed_groups.push((key, vec![row.clone()]));
2928 }
2929 }
2930
2931 Ok(keyed_groups.into_iter().map(|(_, rows)| rows).collect())
2932 }
2933
2934 fn apply_having_clause(
2935 &self,
2936 ctx: &mut ExecutionContext<'_>,
2937 cache: &mut SubqueryCache,
2938 sources: &[ResolvedSource],
2939 output_columns: &[String],
2940 grouped_rows: Vec<GroupedRow>,
2941 ) -> Result<Vec<Vec<Value>>> {
2942 let Some(having_clause) = &self.statement.having_clause else {
2943 return Ok(grouped_rows
2944 .into_iter()
2945 .map(|group| group.projected)
2946 .collect::<Vec<_>>());
2947 };
2948
2949 let mut filtered_rows = Vec::with_capacity(grouped_rows.len());
2950 for grouped in grouped_rows {
2951 if self.projected_conditions_match(
2952 ctx,
2953 cache,
2954 sources,
2955 &having_clause.conditions,
2956 &grouped.projected,
2957 output_columns,
2958 &grouped.source_rows,
2959 )? {
2960 filtered_rows.push(grouped.projected);
2961 }
2962 }
2963
2964 Ok(filtered_rows)
2965 }
2966
2967 fn execute_grouped(
2968 &self,
2969 ctx: &mut ExecutionContext,
2970 cache: &mut SubqueryCache,
2971 sources: &[ResolvedSource],
2972 filtered_rows: &[Vec<Value>],
2973 ) -> Result<QueryResult> {
2974 let groups = self.build_groups(ctx, cache, sources, filtered_rows)?;
2975 let output_columns = self.get_column_names(sources);
2976 let mut grouped_rows = Vec::with_capacity(groups.len());
2977 for rows in groups {
2978 grouped_rows.push(GroupedRow {
2979 projected: self.project_grouped_row(ctx, cache, sources, &rows)?,
2980 source_rows: rows,
2981 });
2982 }
2983
2984 let projected_rows =
2985 self.apply_having_clause(ctx, cache, sources, &output_columns, grouped_rows)?;
2986 self.finalize_grouped_rows(output_columns, projected_rows)
2987 }
2988
2989 fn finalize_grouped_rows(
2990 &self,
2991 output_columns: Vec<String>,
2992 mut projected_rows: Vec<Vec<Value>>,
2993 ) -> Result<QueryResult> {
2994 apply_distinct_if_needed(self.statement.distinct, &mut projected_rows);
2995
2996 self.sort_projected_rows(&output_columns, &mut projected_rows);
2997 self.apply_select_window(&mut projected_rows);
2998
2999 Ok(self.build_query_result(output_columns, projected_rows))
3000 }
3001
3002 fn filter_source_rows(
3003 &self,
3004 ctx: &mut ExecutionContext<'_>,
3005 cache: &mut SubqueryCache,
3006 sources: &[ResolvedSource],
3007 all_rows: Vec<Vec<Value>>,
3008 ) -> Result<Vec<Vec<Value>>> {
3009 let skip_filter = matches!(self.access_path, SelectAccessPath::RowIdLookup);
3010 let mut filtered_rows = Vec::new();
3011
3012 for row in all_rows {
3013 let include = if skip_filter {
3014 true
3015 } else {
3016 match &self.statement.where_clause {
3017 Some(where_clause) => {
3018 self.conditions_match(ctx, cache, sources, &where_clause.conditions, &row)?
3019 }
3020 None => true,
3021 }
3022 };
3023
3024 if include {
3025 filtered_rows.push(row);
3026 }
3027 }
3028
3029 Ok(filtered_rows)
3030 }
3031
3032 fn conditions_match(
3033 &self,
3034 ctx: &mut ExecutionContext<'_>,
3035 cache: &mut SubqueryCache,
3036 sources: &[ResolvedSource],
3037 conditions: &[Condition],
3038 row: &[Value],
3039 ) -> Result<bool> {
3040 conditions_match_with(conditions, |condition| {
3041 self.evaluate_condition(ctx, cache, sources, condition, row)
3042 })
3043 }
3044
3045 fn projected_conditions_match(
3046 &self,
3047 ctx: &mut ExecutionContext<'_>,
3048 cache: &mut SubqueryCache,
3049 sources: &[ResolvedSource],
3050 conditions: &[Condition],
3051 row: &[Value],
3052 output_columns: &[String],
3053 group_rows: &[Vec<Value>],
3054 ) -> Result<bool> {
3055 conditions_match_with(conditions, |condition| {
3056 self.evaluate_projected_condition(
3057 ctx,
3058 cache,
3059 sources,
3060 condition,
3061 row,
3062 output_columns,
3063 group_rows,
3064 )
3065 })
3066 }
3067
3068 fn extract_primary_key_lookup(&self, table: &Table) -> Option<Vec<Value>> {
3069 let equalities = extract_literal_equalities(self.statement.where_clause.as_ref()?)?;
3070 table
3071 .primary_key_columns
3072 .iter()
3073 .map(|&index| table.columns.get(index))
3074 .collect::<Option<Vec<_>>>()?
3075 .into_iter()
3076 .map(|column| equalities.get(column.name.as_str()).cloned())
3077 .collect()
3078 }
3079
3080 fn extract_secondary_index_lookup(
3081 &self,
3082 table: &Table,
3083 index_name: &str,
3084 ) -> Option<Vec<Value>> {
3085 let index = table.get_secondary_index(index_name)?;
3086 let equalities = extract_literal_equalities(self.statement.where_clause.as_ref()?)?;
3087 index
3088 .column_indices
3089 .iter()
3090 .map(|&column_index| table.columns.get(column_index))
3091 .collect::<Option<Vec<_>>>()?
3092 .into_iter()
3093 .map(|column| equalities.get(column.name.as_str()).cloned())
3094 .collect()
3095 }
3096
3097 fn extract_rowid_lookup(&self) -> Option<u64> {
3098 let equalities = extract_literal_equalities(self.statement.where_clause.as_ref()?)?;
3099 match equalities.get("rowid") {
3100 Some(Value::Integer(v)) if v >= &0 => Some(*v as u64),
3101 _ => None,
3102 }
3103 }
3104
3105 fn uses_materialized_reference(&self) -> bool {
3106 matches!(
3107 (&self.access_path, &self.statement.from),
3108 (SelectAccessPath::JoinScan, _)
3109 | (_, TableReference::Derived { .. })
3110 | (_, TableReference::CrossJoin(_, _))
3111 | (_, TableReference::InnerJoin { .. })
3112 | (_, TableReference::LeftJoin { .. })
3113 | (_, TableReference::RightJoin { .. })
3114 | (_, TableReference::FullOuterJoin { .. })
3115 )
3116 }
3117
3118 fn materialize_table_access_rows(
3119 &self,
3120 ctx: &mut ExecutionContext,
3121 table_name: &str,
3122 table: &Table,
3123 ) -> Result<Vec<Vec<Value>>> {
3124 match self.access_path {
3125 SelectAccessPath::RowIdLookup => {
3126 let rowid = self.extract_rowid_lookup().ok_or_else(|| {
3127 HematiteError::InternalError(
3128 "Planner selected rowid lookup without a matching predicate".to_string(),
3129 )
3130 })?;
3131 Ok(ctx
3132 .engine
3133 .lookup_row_by_rowid(table_name, rowid)?
3134 .map(|row| vec![row.values])
3135 .unwrap_or_default())
3136 }
3137 SelectAccessPath::PrimaryKeyLookup => {
3138 let primary_key_values =
3139 self.extract_primary_key_lookup(table).ok_or_else(|| {
3140 HematiteError::InternalError(
3141 "Planner selected primary-key lookup without a matching predicate"
3142 .to_string(),
3143 )
3144 })?;
3145 let encoded_key = ctx.engine.encode_primary_key(&primary_key_values)?;
3146 let mut index_cursor = ctx.engine.open_primary_key_cursor(table)?;
3147 let rowid = index_cursor
3148 .seek_key(&encoded_key)
3149 .then(|| index_cursor.current().map(|entry| entry.row_id))
3150 .flatten();
3151 match rowid {
3152 Some(rowid) => {
3153 let mut table_cursor = ctx.engine.open_table_cursor(table_name)?;
3154 Ok(table_cursor
3155 .seek_rowid(rowid)
3156 .then(|| table_cursor.current().map(|row| vec![row.values.clone()]))
3157 .flatten()
3158 .unwrap_or_default())
3159 }
3160 None => Ok(Vec::new()),
3161 }
3162 }
3163 SelectAccessPath::SecondaryIndexLookup(ref index_name) => {
3164 let key_values = self
3165 .extract_secondary_index_lookup(table, index_name)
3166 .ok_or_else(|| {
3167 HematiteError::InternalError(format!(
3168 "Planner selected secondary index lookup '{}' without a matching predicate",
3169 index_name
3170 ))
3171 })?;
3172 let encoded_key = ctx.engine.encode_secondary_index_key(&key_values)?;
3173 let mut index_cursor = ctx.engine.open_secondary_index_cursor(table, index_name)?;
3174 let mut table_cursor = ctx.engine.open_table_cursor(table_name)?;
3175 let mut rows = Vec::new();
3176
3177 if index_cursor.seek_key(&encoded_key) {
3178 loop {
3179 let Some(entry) = index_cursor.current() else {
3180 break;
3181 };
3182 if entry.key.as_slice() != encoded_key.as_slice() {
3183 break;
3184 }
3185 if table_cursor.seek_rowid(entry.row_id) {
3186 if let Some(row) = table_cursor.current() {
3187 rows.push(row.values.clone());
3188 }
3189 }
3190 if !index_cursor.next() {
3191 break;
3192 }
3193 }
3194 }
3195
3196 Ok(rows)
3197 }
3198 SelectAccessPath::FullTableScan => ctx.engine.read_from_table(table_name),
3199 SelectAccessPath::JoinScan => Err(HematiteError::InternalError(
3200 "Planner selected join scan for direct table access".to_string(),
3201 )),
3202 }
3203 }
3204}
3205
3206impl QueryExecutor for SelectExecutor {
3207 fn execute(&mut self, ctx: &mut ExecutionContext) -> Result<QueryResult> {
3208 validate_statement(&Statement::Select(self.statement.clone()), &ctx.catalog)?;
3209 self.execute_body(ctx)
3210 }
3211}
3212
3213#[derive(Debug, Clone)]
3214pub struct InsertExecutor {
3215 pub statement: InsertStatement,
3216}
3217
3218impl InsertExecutor {
3219 pub fn new(statement: InsertStatement) -> Self {
3220 Self { statement }
3221 }
3222
3223 fn evaluate_value_expression(&self, expr: &Expression) -> Result<Value> {
3224 match expr {
3225 Expression::Literal(value) => Ok(lower_literal_value(value)),
3226 Expression::IntervalLiteral { value, qualifier } => match qualifier {
3227 IntervalQualifier::YearToMonth => Ok(Value::IntervalYearMonth(
3228 IntervalYearMonthValue::parse(value)?,
3229 )),
3230 IntervalQualifier::DayToSecond => Ok(Value::IntervalDaySecond(
3231 IntervalDaySecondValue::parse(value)?,
3232 )),
3233 },
3234 Expression::Parameter(index) => Err(HematiteError::ParseError(format!(
3235 "Unbound parameter {} reached execution",
3236 index + 1
3237 ))),
3238 Expression::Cast { expr, target_type } => cast_value_to_type(
3239 self.evaluate_value_expression(expr)?,
3240 lower_type_name(target_type.clone()),
3241 ),
3242 Expression::Case {
3243 branches,
3244 else_expr,
3245 } => evaluate_case_expression(
3246 branches,
3247 else_expr.as_deref(),
3248 |condition| self.evaluate_boolean_value_expression(condition),
3249 |expr| self.evaluate_value_expression(expr),
3250 ),
3251 Expression::ScalarSubquery(_) => Err(HematiteError::ParseError(
3252 "INSERT expressions cannot use scalar subqueries".to_string(),
3253 )),
3254 Expression::AggregateCall { .. } => Err(HematiteError::ParseError(
3255 "INSERT expressions cannot use aggregate functions".to_string(),
3256 )),
3257 Expression::ScalarFunctionCall { function, args } => {
3258 evaluate_scalar_function_call(*function, args, |expr| {
3259 self.evaluate_value_expression(expr)
3260 })
3261 }
3262 Expression::UnaryMinus(expr) => {
3263 negate_numeric_value(self.evaluate_value_expression(expr)?)
3264 }
3265 Expression::UnaryNot(_)
3266 | Expression::Comparison { .. }
3267 | Expression::InList { .. }
3268 | Expression::InSubquery { .. }
3269 | Expression::Between { .. }
3270 | Expression::Like { .. }
3271 | Expression::Exists { .. }
3272 | Expression::NullCheck { .. }
3273 | Expression::Logical { .. } => Ok(nullable_bool_to_value(
3274 self.evaluate_boolean_value_expression(expr)?,
3275 )),
3276 Expression::Binary {
3277 left,
3278 operator,
3279 right,
3280 } => evaluate_arithmetic_values(
3281 operator,
3282 self.evaluate_value_expression(left)?,
3283 self.evaluate_value_expression(right)?,
3284 ),
3285 Expression::Column(name) => Err(HematiteError::ParseError(format!(
3286 "INSERT expressions cannot reference column '{}'",
3287 name
3288 ))),
3289 }
3290 }
3291
3292 fn evaluate_boolean_value_expression(&self, expr: &Expression) -> Result<Option<bool>> {
3293 match expr {
3294 Expression::Comparison {
3295 left,
3296 operator,
3297 right,
3298 } => {
3299 let left_val = self.evaluate_value_expression(left)?;
3300 let right_val = self.evaluate_value_expression(right)?;
3301 Ok(compare_condition_values(
3302 &left_val, operator, &right_val, None,
3303 ))
3304 }
3305 Expression::InList {
3306 expr,
3307 values,
3308 is_not,
3309 } => evaluate_in_list_predicate(expr, values, *is_not, None, |value_expr| {
3310 self.evaluate_value_expression(value_expr)
3311 }),
3312 Expression::InSubquery { .. } => Err(HematiteError::ParseError(
3313 "INSERT expressions cannot use subqueries in boolean expressions".to_string(),
3314 )),
3315 Expression::Between {
3316 expr,
3317 lower,
3318 upper,
3319 is_not,
3320 } => evaluate_between_predicate(expr, lower, upper, *is_not, None, |value_expr| {
3321 self.evaluate_value_expression(value_expr)
3322 }),
3323 Expression::Like {
3324 expr,
3325 pattern,
3326 is_not,
3327 } => evaluate_like_predicate(expr, pattern, *is_not, None, |value_expr| {
3328 self.evaluate_value_expression(value_expr)
3329 }),
3330 Expression::Exists { .. } => Err(HematiteError::ParseError(
3331 "INSERT expressions cannot use EXISTS in boolean expressions".to_string(),
3332 )),
3333 Expression::NullCheck { expr, is_not } => {
3334 let value = self.evaluate_value_expression(expr)?;
3335 let is_null = value.is_null();
3336 Ok(Some(if *is_not { !is_null } else { is_null }))
3337 }
3338 Expression::UnaryNot(expr) => Ok(self
3339 .evaluate_boolean_value_expression(expr)?
3340 .map(|value| !value)),
3341 Expression::Logical {
3342 left,
3343 operator,
3344 right,
3345 } => {
3346 let left_result = self.evaluate_boolean_value_expression(left)?;
3347 let right_result = self.evaluate_boolean_value_expression(right)?;
3348 Ok(match operator {
3349 LogicalOperator::And => logical_and_values(left_result, right_result),
3350 LogicalOperator::Or => logical_or_values(left_result, right_result),
3351 })
3352 }
3353 _ => coerce_value_to_nullable_bool(
3354 self.evaluate_value_expression(expr)?,
3355 "Boolean expression",
3356 ),
3357 }
3358 }
3359
3360 fn ensure_primary_key_is_unique(
3361 &self,
3362 ctx: &mut ExecutionContext,
3363 table: &Table,
3364 existing_rows: &[Vec<Value>],
3365 candidate_row: &[Value],
3366 ) -> Result<()> {
3367 let candidate_pk = primary_key_values(table, candidate_row)?;
3368
3369 if ctx
3370 .engine
3371 .lookup_row_by_primary_key(table, &candidate_pk)?
3372 .is_some()
3373 {
3374 return Err(duplicate_primary_key_parse_error(
3375 &table.name,
3376 &candidate_pk,
3377 ));
3378 }
3379
3380 for existing_row in existing_rows {
3381 let existing_pk = primary_key_values(table, existing_row)?;
3382
3383 if existing_pk == candidate_pk {
3384 return Err(duplicate_primary_key_parse_error(
3385 &table.name,
3386 &candidate_pk,
3387 ));
3388 }
3389 }
3390
3391 Ok(())
3392 }
3393
3394 fn ensure_unique_secondary_indexes_are_unique(
3395 &self,
3396 ctx: &mut ExecutionContext,
3397 table: &Table,
3398 candidate_row: &[Value],
3399 ) -> Result<()> {
3400 for index in table.secondary_indexes.iter().filter(|index| index.unique) {
3401 let key_values = secondary_index_key_values(index, candidate_row);
3402 if !ctx
3403 .engine
3404 .lookup_secondary_index_rowids(table, &index.name, &key_values)?
3405 .is_empty()
3406 {
3407 return Err(unique_index_parse_error(&index.name, &table.name));
3408 }
3409 }
3410
3411 Ok(())
3412 }
3413
3414 fn find_conflicting_row(
3415 &self,
3416 ctx: &mut ExecutionContext<'_>,
3417 table: &Table,
3418 candidate_row: &[Value],
3419 ) -> Result<Option<StoredRow>> {
3420 let mut conflict_row: Option<StoredRow> = ctx
3421 .engine
3422 .lookup_row_by_primary_key(table, &primary_key_values(table, candidate_row)?)?;
3423
3424 for index in table.secondary_indexes.iter().filter(|index| index.unique) {
3425 let key_values = secondary_index_key_values(index, candidate_row);
3426 for row_id in
3427 ctx.engine
3428 .lookup_secondary_index_rowids(table, &index.name, &key_values)?
3429 {
3430 let row = ctx
3431 .engine
3432 .lookup_row_by_rowid(&table.name, row_id)?
3433 .ok_or_else(|| {
3434 HematiteError::CorruptedData(format!(
3435 "Unique index '{}' points at missing rowid {} in table '{}'",
3436 index.name, row_id, table.name
3437 ))
3438 })?;
3439 if let Some(existing) = &conflict_row {
3440 if existing.row_id != row.row_id {
3441 return Err(HematiteError::ParseError(format!(
3442 "INSERT ON DUPLICATE KEY UPDATE matched multiple rows in table '{}'",
3443 table.name
3444 )));
3445 }
3446 } else {
3447 conflict_row = Some(row);
3448 }
3449 }
3450 }
3451
3452 Ok(conflict_row)
3453 }
3454
3455 fn apply_on_duplicate_assignments(
3456 &self,
3457 ctx: &mut ExecutionContext<'_>,
3458 table: &Table,
3459 mut row: StoredRow,
3460 assignments: &[UpdateAssignment],
3461 ) -> Result<()> {
3462 if assignments.is_empty() {
3463 return Ok(());
3464 }
3465
3466 let evaluator = SelectExecutor::new(
3467 SelectStatement::single_table_scope(&table.name),
3468 SelectAccessPath::FullTableScan,
3469 );
3470 let sources = evaluator.resolve_sources(ctx)?;
3471 let mut subquery_cache = SubqueryCache::new();
3472 let original_values = row.values.clone();
3473
3474 for assignment in assignments {
3475 let column_index = table.get_column_index(&assignment.column).ok_or_else(|| {
3476 HematiteError::ParseError(format!(
3477 "Column '{}' does not exist in table '{}'",
3478 assignment.column, table.name
3479 ))
3480 })?;
3481 let column = &table.columns[column_index];
3482 let value = evaluator.evaluate_expression(
3483 ctx,
3484 &mut subquery_cache,
3485 &sources,
3486 &assignment.value,
3487 &row.values,
3488 )?;
3489 row.values[column_index] = coerce_column_value(column, value)?;
3490 }
3491
3492 table
3493 .validate_row(&row.values)
3494 .map_err(|err| HematiteError::ParseError(err.to_string()))?;
3495 validate_row_constraints(ctx, table, &row.values)?;
3496 if parent_reference_key_changed(ctx, table, &original_values, &row.values)? {
3497 apply_parent_update_foreign_key_actions(ctx, table, &original_values, &row.values)?;
3498 }
3499 ensure_stored_row_uniqueness(ctx, table, &row)?;
3500 remove_stored_row(ctx, &table.name, table, row.row_id)?;
3501 write_stored_row(ctx, &table.name, table, row, true)?;
3502 Ok(())
3503 }
3504
3505 fn build_row_with_metadata(
3506 &self,
3507 ctx: &ExecutionContext<'_>,
3508 table: &Table,
3509 value_row: &[Value],
3510 ) -> Result<Vec<Value>> {
3511 let mut row = Vec::with_capacity(table.columns.len());
3512 let next_row_id = ctx
3513 .engine
3514 .get_table_metadata()
3515 .get(&self.statement.table)
3516 .map(|metadata| metadata.next_row_id)
3517 .ok_or_else(|| {
3518 HematiteError::InternalError(format!(
3519 "Table metadata for '{}' disappeared during INSERT",
3520 self.statement.table
3521 ))
3522 })?;
3523
3524 for column in &table.columns {
3525 let value = if let Some(position) = self
3526 .statement
3527 .columns
3528 .iter()
3529 .position(|name| name == &column.name)
3530 {
3531 let expr = value_row.get(position).ok_or_else(|| {
3532 HematiteError::ParseError(format!("Missing value for column '{}'", column.name))
3533 })?;
3534 if column.auto_increment && expr.is_null() {
3535 auto_increment_value(column, next_row_id)?
3536 } else {
3537 coerce_column_value(column, expr.clone())?
3538 }
3539 } else if column.auto_increment {
3540 auto_increment_value(column, next_row_id)?
3541 } else if let Some(default_value) = &column.default_value {
3542 default_value.clone()
3543 } else if column.nullable {
3544 Value::Null
3545 } else {
3546 return Err(HematiteError::ParseError(format!(
3547 "Missing value for required column '{}'",
3548 column.name
3549 )));
3550 };
3551
3552 row.push(value);
3553 }
3554
3555 table
3556 .validate_row(&row)
3557 .map_err(|err| HematiteError::ParseError(err.to_string()))?;
3558
3559 Ok(row)
3560 }
3561
3562 fn evaluate_value_row(&self, row: &[Expression]) -> Result<Vec<Value>> {
3563 row.iter()
3564 .map(|expr| self.evaluate_value_expression(expr))
3565 .collect()
3566 }
3567}
3568
3569impl QueryExecutor for InsertExecutor {
3570 fn execute(&mut self, ctx: &mut ExecutionContext) -> Result<QueryResult> {
3571 validate_statement(&Statement::Insert(self.statement.clone()), &ctx.catalog)?;
3572
3573 let table = catalog_table(ctx, &self.statement.table)?;
3574
3575 let input_rows = match &self.statement.source {
3576 InsertSource::Values(rows) => rows
3577 .iter()
3578 .map(|row| self.evaluate_value_row(row))
3579 .collect::<Result<Vec<_>>>()?,
3580 InsertSource::Select(select) => {
3581 let planner = QueryPlanner::new(ctx.catalog.clone())
3582 .with_table_row_counts(current_table_row_counts(ctx.engine));
3583 let plan = planner.plan(Statement::Select((**select).clone()))?;
3584 match plan.program {
3585 ExecutionProgram::Select {
3586 statement,
3587 access_path,
3588 } => {
3589 SelectExecutor::new(statement, access_path)
3590 .execute(ctx)?
3591 .rows
3592 }
3593 _ => {
3594 return Err(HematiteError::InternalError(
3595 "Expected SELECT execution program for INSERT source".to_string(),
3596 ))
3597 }
3598 }
3599 }
3600 };
3601
3602 for value_row in &input_rows {
3603 let row_values = self.build_row_with_metadata(ctx, &table, value_row)?;
3604 if let Some(assignments) = &self.statement.on_duplicate {
3605 if let Some(conflicting_row) =
3606 self.find_conflicting_row(ctx, &table, &row_values)?
3607 {
3608 self.apply_on_duplicate_assignments(ctx, &table, conflicting_row, assignments)?;
3609 continue;
3610 }
3611 }
3612
3613 validate_row_constraints(ctx, &table, &row_values)?;
3614 self.ensure_primary_key_is_unique(ctx, &table, &[], &row_values)?;
3615 self.ensure_unique_secondary_indexes_are_unique(ctx, &table, &row_values)?;
3616 let inserted_row = StoredRow {
3617 row_id: 0,
3618 values: row_values,
3619 };
3620 write_stored_row(
3621 ctx,
3622 &self.statement.table,
3623 &table,
3624 inserted_row.clone(),
3625 false,
3626 )?;
3627 if let Some(new_row) = ctx.engine.lookup_row_by_primary_key(
3628 &table,
3629 &primary_key_values(&table, &inserted_row.values)?,
3630 )? {
3631 ctx.mutation_events.push(MutationEvent::Insert {
3632 table_name: self.statement.table.clone(),
3633 new_row,
3634 });
3635 }
3636 }
3637
3638 Ok(mutation_result(input_rows.len()))
3639 }
3640}
3641
/// Executor for `UPDATE` statements.
#[derive(Debug, Clone)]
pub struct UpdateExecutor {
    /// The UPDATE statement to execute.
    pub statement: UpdateStatement,
    /// Access path taken from the plan, used to locate the rows to update.
    pub access_path: SelectAccessPath,
}
3647
3648impl UpdateExecutor {
3649 pub fn new(statement: UpdateStatement, access_path: SelectAccessPath) -> Self {
3650 Self {
3651 statement,
3652 access_path,
3653 }
3654 }
3655
3656 fn ensure_primary_keys_unique(&self, table: &Table, rows: &[Vec<Value>]) -> Result<()> {
3657 for i in 0..rows.len() {
3658 let left = primary_key_values(table, &rows[i])?;
3659 for right_row in rows.iter().skip(i + 1) {
3660 let right = primary_key_values(table, right_row)?;
3661 if left == right {
3662 return Err(duplicate_primary_key_parse_error(&table.name, &left));
3663 }
3664 }
3665 }
3666
3667 Ok(())
3668 }
3669
3670 fn ensure_updated_primary_keys_remain_unique(
3671 &self,
3672 ctx: &mut ExecutionContext<'_>,
3673 table: &Table,
3674 updated_rows: &[StoredRow],
3675 ) -> Result<()> {
3676 self.ensure_primary_keys_unique(
3677 table,
3678 &updated_rows
3679 .iter()
3680 .map(|row| row.values.clone())
3681 .collect::<Vec<_>>(),
3682 )?;
3683
3684 for row in updated_rows {
3685 let candidate_pk = primary_key_values(table, &row.values)?;
3686 if let Some(existing_rowid) =
3687 ctx.engine.lookup_primary_key_rowid(table, &candidate_pk)?
3688 {
3689 if existing_rowid != row.row_id
3690 && !updated_rows
3691 .iter()
3692 .any(|updated_row| updated_row.row_id == existing_rowid)
3693 {
3694 return Err(duplicate_primary_key_parse_error(
3695 &table.name,
3696 &candidate_pk,
3697 ));
3698 }
3699 }
3700 }
3701
3702 Ok(())
3703 }
3704
3705 fn ensure_updated_unique_indexes_remain_unique(
3706 &self,
3707 ctx: &mut ExecutionContext<'_>,
3708 table: &Table,
3709 updated_rows: &[StoredRow],
3710 ) -> Result<()> {
3711 let mut encoded_keys = std::collections::HashSet::new();
3712
3713 for index in table.secondary_indexes.iter().filter(|index| index.unique) {
3714 encoded_keys.clear();
3715 for row in updated_rows {
3716 let key_values = secondary_index_key_values(index, &row.values);
3717 let encoded_key = ctx.engine.encode_secondary_index_key(&key_values)?;
3718 if !encoded_keys.insert(encoded_key) {
3719 return Err(unique_index_parse_error(&index.name, &table.name));
3720 }
3721 }
3722
3723 for row in updated_rows {
3724 let key_values = secondary_index_key_values(index, &row.values);
3725 let existing_rowids =
3726 ctx.engine
3727 .lookup_secondary_index_rowids(table, &index.name, &key_values)?;
3728 if existing_rowids.into_iter().any(|existing_rowid| {
3729 existing_rowid != row.row_id
3730 && !updated_rows
3731 .iter()
3732 .any(|updated_row| updated_row.row_id == existing_rowid)
3733 }) {
3734 return Err(unique_index_parse_error(&index.name, &table.name));
3735 }
3736 }
3737 }
3738
3739 Ok(())
3740 }
3741}
3742
impl QueryExecutor for UpdateExecutor {
    /// Executes the UPDATE statement.
    ///
    /// Steps:
    /// 1. Validate the statement and locate the target rows, through a join source
    ///    when one is given.
    /// 2. Compute each row's new values and validate them.
    /// 3. Check primary-key and UNIQUE-index uniqueness for the whole batch.
    /// 4. Remove every old row, then write every new version.
    /// 5. Record one `MutationEvent::Update` per row.
    ///
    /// Returns the number of updated rows.
    fn execute(&mut self, ctx: &mut ExecutionContext<'_>) -> Result<QueryResult> {
        validate_statement(&Statement::Update(self.statement.clone()), &ctx.catalog)?;

        let table = catalog_table(ctx, &self.statement.table)?;
        let LocatedMutationRows {
            sources,
            stored_rows: original_rows,
            joined_rows,
        } = locate_mutation_rows(
            ctx,
            &table,
            &self.statement.table,
            self.statement.target_binding_name(),
            self.statement.source.as_ref(),
            self.statement.where_clause.clone(),
            &self.access_path,
        )?;
        // A second executor over the same source and WHERE clause; here it is only
        // used to evaluate the SET expressions.
        let locator_statement =
            locator_select_statement(self.statement.source(), self.statement.where_clause.clone());
        let select_executor = SelectExecutor::new(locator_statement, self.access_path.clone());
        // Kept to pair each old row with its new version in the mutation events.
        let original_rows_snapshot = original_rows.clone();
        let mut updated_rows_data = Vec::with_capacity(original_rows.len());
        let mut updated_rows = 0usize;
        let mut subquery_cache = SubqueryCache::new();
        // `Some` only for join sources: one joined row per located target row.
        let row_contexts = joined_rows.as_deref();

        for (index, stored_row) in original_rows.into_iter().enumerate() {
            let mut updated_row = stored_row.values.clone();
            for assignment in &self.statement.assignments {
                let column_index = table.get_column_index(&assignment.column).ok_or_else(|| {
                    HematiteError::ParseError(format!(
                        "Column '{}' does not exist in table '{}'",
                        assignment.column, self.statement.table
                    ))
                })?;
                let column = &table.columns[column_index];
                let value = {
                    // Join updates evaluate against the full joined row. Single-table
                    // updates evaluate against the row as modified by the earlier
                    // assignments.
                    let evaluation_row = row_contexts
                        .and_then(|rows| rows.get(index).map(Vec::as_slice))
                        .unwrap_or(updated_row.as_slice());
                    select_executor.evaluate_expression(
                        ctx,
                        &mut subquery_cache,
                        &sources,
                        &assignment.value,
                        evaluation_row,
                    )?
                };
                updated_row[column_index] = coerce_column_value(column, value)?;
            }

            table
                .validate_row(&updated_row)
                .map_err(|err| HematiteError::ParseError(err.to_string()))?;
            validate_row_constraints(ctx, &table, &updated_row)?;
            // NOTE: foreign-key update actions run per row here, before the
            // batch-wide uniqueness checks below.
            if parent_reference_key_changed(ctx, &table, &stored_row.values, &updated_row)? {
                apply_parent_update_foreign_key_actions(
                    ctx,
                    &table,
                    &stored_row.values,
                    &updated_row,
                )?;
            }
            updated_rows_data.push(StoredRow {
                row_id: stored_row.row_id,
                values: updated_row,
            });
            updated_rows += 1;
        }

        // Uniqueness is checked for the whole batch before any storage write.
        self.ensure_updated_primary_keys_remain_unique(ctx, &table, &updated_rows_data)?;
        self.ensure_updated_unique_indexes_remain_unique(ctx, &table, &updated_rows_data)?;

        // Every old version is removed before any new one is written. A key that
        // moves from one updated row to another is therefore never stored twice.
        for original_row in &updated_rows_data {
            remove_stored_row(ctx, &self.statement.table, &table, original_row.row_id)?;
        }

        for row in &updated_rows_data {
            write_stored_row(ctx, &self.statement.table, &table, row.clone(), true)?;
        }

        // The snapshot and `updated_rows_data` are in the same order.
        for (old_row, new_row) in original_rows_snapshot
            .into_iter()
            .zip(updated_rows_data.into_iter())
        {
            ctx.mutation_events.push(MutationEvent::Update {
                table_name: self.statement.table.clone(),
                old_row,
                new_row,
            });
        }

        Ok(mutation_result(updated_rows))
    }
}
3839
/// Executor for `DELETE` statements.
#[derive(Debug, Clone)]
pub struct DeleteExecutor {
    /// The DELETE statement to execute.
    pub statement: DeleteStatement,
    /// Access path taken from the plan, used to locate the rows to delete.
    pub access_path: SelectAccessPath,
}
3845
3846impl DeleteExecutor {
3847 pub fn new(statement: DeleteStatement, access_path: SelectAccessPath) -> Self {
3848 Self {
3849 statement,
3850 access_path,
3851 }
3852 }
3853}
3854
/// Target rows of an UPDATE or DELETE, as produced by `locate_mutation_rows`.
struct LocatedMutationRows {
    /// Resolved sources of the locator query; UPDATE passes them to expression evaluation.
    sources: Vec<ResolvedSource>,
    /// Target-table rows to mutate. For join sources each rowid appears at most once.
    stored_rows: Vec<StoredRow>,
    /// For join sources, the joined row matching each entry of `stored_rows` (same
    /// order). `None` for single-table mutations.
    joined_rows: Option<Vec<Vec<Value>>>,
}
3860
3861fn uses_join_mutation_source(source: Option<&TableReference>) -> bool {
3862 matches!(source, Some(source) if !matches!(source, TableReference::Table(_, _)))
3863}
3864
3865fn locate_rowids_for_access_path(
3866 ctx: &mut ExecutionContext<'_>,
3867 table: &Table,
3868 table_name: &str,
3869 access_path: &SelectAccessPath,
3870 select_executor: &SelectExecutor,
3871) -> Result<Vec<u64>> {
3872 match access_path {
3873 SelectAccessPath::JoinScan => Err(HematiteError::ParseError(
3874 "Join scans are not valid for UPDATE or DELETE locators".to_string(),
3875 )),
3876 SelectAccessPath::RowIdLookup => {
3877 Ok(select_executor.extract_rowid_lookup().into_iter().collect())
3878 }
3879 SelectAccessPath::PrimaryKeyLookup => {
3880 let Some(primary_key_values) = select_executor.extract_primary_key_lookup(table) else {
3881 return Ok(Vec::new());
3882 };
3883 let encoded_key = ctx.engine.encode_primary_key(&primary_key_values)?;
3884 let mut index_cursor = ctx.engine.open_primary_key_cursor(table)?;
3885 Ok(index_cursor
3886 .seek_key(&encoded_key)
3887 .then(|| index_cursor.current().map(|entry| entry.row_id))
3888 .flatten()
3889 .into_iter()
3890 .collect())
3891 }
3892 SelectAccessPath::SecondaryIndexLookup(index_name) => {
3893 let Some(key_values) =
3894 select_executor.extract_secondary_index_lookup(table, index_name)
3895 else {
3896 return Ok(Vec::new());
3897 };
3898 let encoded_key = ctx.engine.encode_secondary_index_key(&key_values)?;
3899 let mut index_cursor = ctx.engine.open_secondary_index_cursor(table, index_name)?;
3900 let mut rowids = Vec::new();
3901
3902 if index_cursor.seek_key(&encoded_key) {
3903 loop {
3904 let Some(entry) = index_cursor.current() else {
3905 break;
3906 };
3907 if entry.key.as_slice() != encoded_key.as_slice() {
3908 break;
3909 }
3910 rowids.push(entry.row_id);
3911 if !index_cursor.next() {
3912 break;
3913 }
3914 }
3915 }
3916
3917 Ok(rowids)
3918 }
3919 SelectAccessPath::FullTableScan => {
3920 let mut table_cursor = ctx.engine.open_table_cursor(table_name)?;
3921 let mut rowids = Vec::new();
3922 if table_cursor.first() {
3923 loop {
3924 if let Some(row) = table_cursor.current() {
3925 rowids.push(row.row_id);
3926 }
3927 if !table_cursor.next() {
3928 break;
3929 }
3930 }
3931 }
3932 Ok(rowids)
3933 }
3934 }
3935}
3936
3937fn locate_rows_for_access_path(
3938 ctx: &mut ExecutionContext<'_>,
3939 table: &Table,
3940 table_name: &str,
3941 access_path: &SelectAccessPath,
3942 select_executor: &SelectExecutor,
3943) -> Result<Vec<StoredRow>> {
3944 let rowids =
3945 locate_rowids_for_access_path(ctx, table, table_name, access_path, select_executor)?;
3946 let mut table_cursor = ctx.engine.open_table_cursor(table_name)?;
3947 let mut rows = Vec::new();
3948 let mut subquery_cache = SubqueryCache::new();
3949 let sources = select_executor.resolve_sources(ctx)?;
3950
3951 for rowid in rowids {
3952 if table_cursor.seek_rowid(rowid) {
3953 if let Some(row) = table_cursor.current() {
3954 let row = row.clone();
3955 let include = match &select_executor.statement.where_clause {
3956 Some(where_clause) => select_executor.conditions_match(
3957 ctx,
3958 &mut subquery_cache,
3959 &sources,
3960 &where_clause.conditions,
3961 &row.values,
3962 )?,
3963 None => true,
3964 };
3965
3966 if include {
3967 rows.push(row);
3968 }
3969 }
3970 }
3971 }
3972
3973 Ok(rows)
3974}
3975
3976fn locate_rows_for_join_source(
3977 ctx: &mut ExecutionContext<'_>,
3978 table: &Table,
3979 target_binding: &str,
3980 select_executor: &mut SelectExecutor,
3981) -> Result<(Vec<ResolvedSource>, Vec<(StoredRow, Vec<Value>)>)> {
3982 let (sources, joined_rows) = select_executor.materialize_filtered_rows(ctx)?;
3983 let target_source = sources
3984 .iter()
3985 .find(|source| {
3986 source.name.eq_ignore_ascii_case(&table.name)
3987 && source
3988 .alias
3989 .as_deref()
3990 .unwrap_or(&source.name)
3991 .eq_ignore_ascii_case(target_binding)
3992 })
3993 .ok_or_else(|| {
3994 HematiteError::ParseError(format!(
3995 "Mutation target '{}' does not resolve to table '{}'",
3996 target_binding, table.name
3997 ))
3998 })?;
3999 let mut seen_rowids = std::collections::HashSet::new();
4000 let mut rows = Vec::new();
4001
4002 for joined_row in joined_rows {
4003 let Some(candidate_rowid) =
4004 target_rowid_from_join_row(ctx, table, target_source, &joined_row)?
4005 else {
4006 continue;
4007 };
4008
4009 if !seen_rowids.insert(candidate_rowid) {
4010 continue;
4011 }
4012
4013 if let Some(stored_row) = ctx
4014 .engine
4015 .lookup_row_by_rowid(&table.name, candidate_rowid)?
4016 {
4017 rows.push((stored_row, joined_row));
4018 }
4019 }
4020
4021 Ok((sources, rows))
4022}
4023
4024fn locate_mutation_rows(
4025 ctx: &mut ExecutionContext<'_>,
4026 table: &Table,
4027 table_name: &str,
4028 target_binding: &str,
4029 source: Option<&TableReference>,
4030 where_clause: Option<WhereClause>,
4031 access_path: &SelectAccessPath,
4032) -> Result<LocatedMutationRows> {
4033 let locator_statement = locator_select_statement(
4034 source
4035 .cloned()
4036 .unwrap_or_else(|| TableReference::Table(table_name.to_string(), None)),
4037 where_clause,
4038 );
4039 let mut select_executor = SelectExecutor::new(locator_statement, access_path.clone());
4040
4041 if uses_join_mutation_source(source) {
4042 let (sources, rows) =
4043 locate_rows_for_join_source(ctx, table, target_binding, &mut select_executor)?;
4044 let (stored_rows, joined_rows): (Vec<_>, Vec<_>) = rows.into_iter().unzip();
4045 Ok(LocatedMutationRows {
4046 sources,
4047 stored_rows,
4048 joined_rows: Some(joined_rows),
4049 })
4050 } else {
4051 let stored_rows =
4052 locate_rows_for_access_path(ctx, table, table_name, access_path, &select_executor)?;
4053 Ok(LocatedMutationRows {
4054 sources: select_executor.resolve_sources(ctx)?,
4055 stored_rows,
4056 joined_rows: None,
4057 })
4058 }
4059}
4060
4061fn target_rowid_from_join_row(
4062 ctx: &mut ExecutionContext<'_>,
4063 table: &Table,
4064 target_source: &ResolvedSource,
4065 joined_row: &[Value],
4066) -> Result<Option<u64>> {
4067 let mut primary_key = Vec::with_capacity(table.primary_key_columns.len());
4068 for &column_index in &table.primary_key_columns {
4069 let value = joined_row
4070 .get(target_source.offset + column_index)
4071 .cloned()
4072 .unwrap_or(Value::Null);
4073 if value.is_null() {
4074 return Ok(None);
4075 }
4076 primary_key.push(value);
4077 }
4078
4079 ctx.engine.lookup_primary_key_rowid(table, &primary_key)
4080}
4081
4082impl QueryExecutor for DeleteExecutor {
4083 fn execute(&mut self, ctx: &mut ExecutionContext) -> Result<QueryResult> {
4084 validate_statement(&Statement::Delete(self.statement.clone()), &ctx.catalog)?;
4085
4086 let table = catalog_table(ctx, &self.statement.table)?;
4087 let rows_to_delete = locate_mutation_rows(
4088 ctx,
4089 &table,
4090 &self.statement.table,
4091 self.statement.target_binding_name(),
4092 self.statement.source.as_ref(),
4093 self.statement.where_clause.clone(),
4094 &self.access_path,
4095 )?
4096 .stored_rows;
4097
4098 for row in &rows_to_delete {
4099 apply_parent_delete_foreign_key_actions(ctx, &table, &row.values)?;
4100 ctx.mutation_events.push(MutationEvent::Delete {
4101 table_name: self.statement.table.clone(),
4102 old_row: row.clone(),
4103 });
4104 remove_stored_row(ctx, &self.statement.table, &table, row.row_id)?;
4105 }
4106
4107 Ok(mutation_result(rows_to_delete.len()))
4108 }
4109}
4110
/// Executor for `CREATE TABLE` statements, including their column-level and
/// table-level constraints and UNIQUE indexes.
#[derive(Debug, Clone)]
pub struct CreateExecutor {
    /// The CREATE TABLE statement to execute.
    pub statement: CreateStatement,
}
4115
4116impl CreateExecutor {
4117 pub fn new(statement: CreateStatement) -> Self {
4118 Self { statement }
4119 }
4120
4121 fn convert_column_definitions(&self) -> Result<Vec<Column>> {
4122 let mut columns = Vec::new();
4123 let mut next_id = 1;
4124
4125 for col_def in &self.statement.columns {
4126 let mut column = Column::new(
4127 crate::catalog::ColumnId::new(next_id),
4128 col_def.name.clone(),
4129 lower_type_name(col_def.data_type.clone()),
4130 )
4131 .character_set(col_def.character_set.clone())
4132 .collation(col_def.collation.clone())
4133 .nullable(col_def.nullable)
4134 .primary_key(col_def.primary_key)
4135 .auto_increment(col_def.auto_increment);
4136
4137 if let Some(default_val) = &col_def.default_value {
4138 let coerced_default =
4139 coerce_column_value(&column, lower_literal_value(default_val))?;
4140 column = column.default_value(coerced_default);
4141 }
4142
4143 columns.push(column);
4144 next_id += 1;
4145 }
4146
4147 Ok(columns)
4148 }
4149
4150 fn unique_index_specs(&self) -> Result<Vec<(String, Vec<usize>)>> {
4151 let mut unique_indexes = self
4152 .statement
4153 .columns
4154 .iter()
4155 .enumerate()
4156 .filter_map(|(index, column)| {
4157 if column.unique && !column.primary_key {
4158 Some((
4159 auto_unique_index_name(&self.statement.table, &column.name, index),
4160 vec![index],
4161 ))
4162 } else {
4163 None
4164 }
4165 })
4166 .collect::<Vec<_>>();
4167
4168 for (position, unique) in
4169 self.statement
4170 .constraints
4171 .iter()
4172 .enumerate()
4173 .filter_map(|(position, constraint)| match constraint {
4174 TableConstraint::Unique(unique) => Some((position, unique)),
4175 TableConstraint::Check(_) | TableConstraint::ForeignKey(_) => None,
4176 })
4177 {
4178 let column_indices = unique
4179 .columns
4180 .iter()
4181 .map(|column_name| {
4182 self.statement
4183 .columns
4184 .iter()
4185 .position(|column| column.name == *column_name)
4186 .ok_or_else(|| {
4187 HematiteError::ParseError(format!(
4188 "UNIQUE constraint column '{}' does not exist in table '{}'",
4189 column_name, self.statement.table
4190 ))
4191 })
4192 })
4193 .collect::<Result<Vec<_>>>()?;
4194 unique_indexes.push((
4195 unique_constraint_index_name(&self.statement.table, unique, position),
4196 column_indices,
4197 ));
4198 }
4199
4200 Ok(unique_indexes)
4201 }
4202
4203 fn constraints(&self, table: &Table) -> Result<CreateConstraints> {
4204 let check_constraints =
4205 self.statement
4206 .columns
4207 .iter()
4208 .filter_map(|column| column.check_constraint.as_ref())
4209 .map(Self::clone_check_constraint)
4210 .chain(self.statement.constraints.iter().filter_map(
4211 |constraint| match constraint {
4212 TableConstraint::Check(constraint) => {
4213 Some(Self::clone_check_constraint(constraint))
4214 }
4215 TableConstraint::Unique(_) | TableConstraint::ForeignKey(_) => None,
4216 },
4217 ))
4218 .collect();
4219
4220 let foreign_keys =
4221 self.statement
4222 .columns
4223 .iter()
4224 .filter_map(|column| column.references.as_ref())
4225 .chain(self.statement.constraints.iter().filter_map(
4226 |constraint| match constraint {
4227 TableConstraint::Check(_) | TableConstraint::Unique(_) => None,
4228 TableConstraint::ForeignKey(foreign_key) => Some(foreign_key),
4229 },
4230 ))
4231 .map(|foreign_key| self.convert_foreign_key(table, foreign_key))
4232 .collect::<Result<Vec<_>>>()?;
4233
4234 Ok(CreateConstraints {
4235 check_constraints,
4236 foreign_keys,
4237 })
4238 }
4239
4240 fn clone_check_constraint(constraint: &CheckConstraintDefinition) -> CheckConstraint {
4241 CheckConstraint {
4242 name: constraint.name.clone(),
4243 expression_sql: constraint.expression_sql.clone(),
4244 }
4245 }
4246
4247 fn convert_foreign_key(
4248 &self,
4249 table: &Table,
4250 foreign_key: &ForeignKeyDefinition,
4251 ) -> Result<ForeignKeyConstraint> {
4252 let column_indices = foreign_key
4253 .columns
4254 .iter()
4255 .map(|column_name| {
4256 table.get_column_index(column_name).ok_or_else(|| {
4257 HematiteError::ParseError(format!(
4258 "Foreign key column '{}' does not exist in table '{}'",
4259 column_name, table.name
4260 ))
4261 })
4262 })
4263 .collect::<Result<Vec<_>>>()?;
4264 Ok(ForeignKeyConstraint {
4265 name: foreign_key.name.clone(),
4266 column_indices,
4267 referenced_table: foreign_key.referenced_table.clone(),
4268 referenced_columns: foreign_key.referenced_columns.clone(),
4269 on_delete: convert_foreign_key_action(foreign_key.on_delete),
4270 on_update: convert_foreign_key_action(foreign_key.on_update),
4271 })
4272 }
4273}
4274
/// Catalog constraints derived from a CREATE TABLE statement. They are attached to
/// the table after it has been created.
struct CreateConstraints {
    /// CHECK constraints, column-level first, then table-level.
    check_constraints: Vec<CheckConstraint>,
    /// Foreign keys, column-level `REFERENCES` first, then table-level ones.
    foreign_keys: Vec<ForeignKeyConstraint>,
}
4279
4280impl QueryExecutor for CreateExecutor {
4281 fn execute(&mut self, ctx: &mut ExecutionContext) -> Result<QueryResult> {
4282 validate_statement(&Statement::Create(self.statement.clone()), &ctx.catalog)?;
4283 if self.statement.if_not_exists
4284 && ctx
4285 .catalog
4286 .get_table_by_name(&self.statement.table)
4287 .is_some()
4288 {
4289 return Ok(mutation_result(0));
4290 }
4291
4292 let columns = self.convert_column_definitions()?;
4293
4294 let root_page_id = ctx.engine.create_table(&self.statement.table)?;
4296 let primary_key_root_page_id = ctx.engine.create_empty_btree()?;
4297 ctx.catalog.create_table_with_roots(
4298 self.statement.table.clone(),
4299 columns,
4300 root_page_id,
4301 primary_key_root_page_id,
4302 )?;
4303
4304 let table = ctx
4305 .catalog
4306 .get_table_by_name(&self.statement.table)
4307 .ok_or_else(|| table_disappeared_internal_error(&self.statement.table, "CREATE TABLE"))?
4308 .clone();
4309 let constraints = self.constraints(&table)?;
4310 for (index_name, column_indices) in self.unique_index_specs()? {
4311 let unique_index_root_page_id = ctx.engine.create_empty_btree()?;
4312 ctx.catalog.add_secondary_index(
4313 table.id,
4314 crate::catalog::SecondaryIndex {
4315 name: index_name,
4316 column_indices,
4317 root_page_id: unique_index_root_page_id,
4318 unique: true,
4319 },
4320 )?;
4321 }
4322 for constraint in constraints.check_constraints {
4323 ctx.catalog.add_check_constraint(table.id, constraint)?;
4324 }
4325 for foreign_key in constraints.foreign_keys {
4326 ctx.catalog.add_foreign_key(table.id, foreign_key)?;
4327 }
4328
4329 Ok(QueryResult {
4330 affected_rows: 0,
4331 columns: Vec::new(),
4332 rows: Vec::new(),
4333 })
4334 }
4335}
4336
/// Executor for `DROP TABLE` statements.
#[derive(Debug, Clone)]
pub struct DropExecutor {
    /// The DROP TABLE statement to execute.
    pub statement: DropStatement,
}
4341
4342impl DropExecutor {
4343 pub fn new(statement: DropStatement) -> Self {
4344 Self { statement }
4345 }
4346}
4347
4348impl QueryExecutor for DropExecutor {
4349 fn execute(&mut self, ctx: &mut ExecutionContext<'_>) -> Result<QueryResult> {
4350 validate_statement(&Statement::Drop(self.statement.clone()), &ctx.catalog)?;
4351 if self.statement.if_exists
4352 && ctx
4353 .catalog
4354 .get_table_by_name(&self.statement.table)
4355 .is_none()
4356 {
4357 return Ok(mutation_result(0));
4358 }
4359
4360 let table = catalog_table(ctx, &self.statement.table)?;
4361
4362 ctx.engine.drop_table_with_indexes(&table)?;
4363 ctx.catalog
4364 .drop_table(table.id)
4365 .map_err(|err| HematiteError::ParseError(err.to_string()))?;
4366
4367 Ok(QueryResult {
4368 affected_rows: 0,
4369 columns: Vec::new(),
4370 rows: Vec::new(),
4371 })
4372 }
4373}
4374
/// Executor for `ALTER TABLE` statements.
#[derive(Debug, Clone)]
pub struct AlterExecutor {
    /// The ALTER TABLE statement to execute.
    pub statement: AlterStatement,
}
4379
4380impl AlterExecutor {
4381 pub fn new(statement: AlterStatement) -> Self {
4382 Self { statement }
4383 }
4384}
4385
4386impl QueryExecutor for AlterExecutor {
4387 fn execute(&mut self, ctx: &mut ExecutionContext<'_>) -> Result<QueryResult> {
4388 validate_statement(&Statement::Alter(self.statement.clone()), &ctx.catalog)?;
4389
4390 match &self.statement.operation {
4391 AlterOperation::RenameTo(new_name) => {
4392 let table = catalog_table(ctx, &self.statement.table)?;
4393 ctx.catalog.rename_table(table.id, new_name.clone())?;
4394 ctx.engine
4395 .rename_table_runtime_metadata(&self.statement.table, new_name)?;
4396 }
4397 AlterOperation::RenameColumn { old_name, new_name } => {
4398 let table = catalog_table(ctx, &self.statement.table)?;
4399 ctx.catalog
4400 .rename_column(table.id, old_name, new_name.clone())?;
4401 }
4402 AlterOperation::AddColumn(column_def) => {
4403 let table = catalog_table(ctx, &self.statement.table)?;
4404
4405 let column = Column::new(
4406 crate::catalog::ColumnId::new(ctx.catalog.next_column_id()),
4407 column_def.name.clone(),
4408 lower_type_name(column_def.data_type.clone()),
4409 )
4410 .character_set(column_def.character_set.clone())
4411 .collation(column_def.collation.clone())
4412 .nullable(column_def.nullable)
4413 .primary_key(column_def.primary_key);
4414 let column = if let Some(default_value) = &column_def.default_value {
4415 let coerced_default =
4416 coerce_column_value(&column, lower_literal_value(default_value))?;
4417 column.default_value(coerced_default)
4418 } else {
4419 column
4420 };
4421
4422 let fill_value = column.get_default_or_null();
4423 let mut rows = ctx.engine.read_rows_with_ids(&self.statement.table)?;
4424 for row in &mut rows {
4425 row.values.push(fill_value.clone());
4426 }
4427
4428 ctx.catalog.add_column(table.id, column)?;
4429 ctx.engine.replace_table_rows(&self.statement.table, rows)?;
4430 }
4431 AlterOperation::AddConstraint(constraint) => {
4432 let table = catalog_table(ctx, &self.statement.table)?;
4433 match constraint {
4434 TableConstraint::Check(check) => {
4435 ctx.catalog.add_check_constraint(
4436 table.id,
4437 CheckConstraint {
4438 name: check.name.clone(),
4439 expression_sql: check.expression_sql.clone(),
4440 },
4441 )?;
4442 }
4443 TableConstraint::Unique(unique) => {
4444 let root_page_id = ctx.engine.create_empty_btree()?;
4445 let column_indices = unique
4446 .columns
4447 .iter()
4448 .map(|column_name| {
4449 table.get_column_index(column_name).ok_or_else(|| {
4450 HematiteError::ParseError(format!(
4451 "UNIQUE constraint column '{}' does not exist in table '{}'",
4452 column_name, self.statement.table
4453 ))
4454 })
4455 })
4456 .collect::<Result<Vec<_>>>()?;
4457 ctx.catalog.add_secondary_index(
4458 table.id,
4459 crate::catalog::SecondaryIndex {
4460 name: unique.name.clone().ok_or_else(|| {
4461 HematiteError::InternalError(
4462 "validated UNIQUE constraint lost its name".to_string(),
4463 )
4464 })?,
4465 column_indices,
4466 root_page_id,
4467 unique: true,
4468 },
4469 )?;
4470 let updated_table = ctx
4471 .catalog
4472 .get_table(table.id)
4473 .ok_or_else(|| {
4474 table_disappeared_internal_error(
4475 &self.statement.table,
4476 "adding unique constraint",
4477 )
4478 })?
4479 .clone();
4480 let rows = ctx.engine.read_rows_with_ids(&self.statement.table)?;
4481 ctx.engine
4482 .rebuild_secondary_indexes(&updated_table, &rows)?;
4483 }
4484 TableConstraint::ForeignKey(foreign_key) => {
4485 let column_indices = foreign_key
4486 .columns
4487 .iter()
4488 .map(|column_name| {
4489 table.get_column_index(column_name).ok_or_else(|| {
4490 HematiteError::ParseError(format!(
4491 "Foreign key column '{}' does not exist in table '{}'",
4492 column_name, self.statement.table
4493 ))
4494 })
4495 })
4496 .collect::<Result<Vec<_>>>()?;
4497 ctx.catalog.add_foreign_key(
4498 table.id,
4499 ForeignKeyConstraint {
4500 name: foreign_key.name.clone(),
4501 column_indices,
4502 referenced_table: foreign_key.referenced_table.clone(),
4503 referenced_columns: foreign_key.referenced_columns.clone(),
4504 on_delete: convert_foreign_key_action(foreign_key.on_delete),
4505 on_update: convert_foreign_key_action(foreign_key.on_update),
4506 },
4507 )?;
4508 }
4509 }
4510 }
4511 AlterOperation::DropColumn(column_name) => {
4512 let table = catalog_table(ctx, &self.statement.table)?;
4513 let column_index = table.get_column_index(column_name).ok_or_else(|| {
4514 HematiteError::InternalError(format!(
4515 "Column '{}' disappeared during ALTER TABLE DROP COLUMN",
4516 column_name
4517 ))
4518 })?;
4519 let mut rows = ctx.engine.read_rows_with_ids(&self.statement.table)?;
4520 for row in &mut rows {
4521 row.values.remove(column_index);
4522 }
4523
4524 ctx.catalog.drop_column(table.id, column_name)?;
4525 ctx.engine.replace_table_rows(&self.statement.table, rows)?;
4526 }
4527 AlterOperation::DropConstraint(constraint_name) => {
4528 let table = catalog_table(ctx, &self.statement.table)?;
4529 if let Some(index) = table.get_secondary_index(constraint_name) {
4530 if index.unique {
4531 ctx.engine.delete_tree(index.root_page_id)?;
4532 ctx.catalog
4533 .drop_secondary_index(table.id, constraint_name)?;
4534 } else {
4535 return Err(HematiteError::ParseError(format!(
4536 "Constraint '{}' is not a droppable UNIQUE constraint",
4537 constraint_name
4538 )));
4539 }
4540 } else {
4541 ctx.catalog
4542 .drop_named_constraint(table.id, constraint_name)?;
4543 }
4544 }
4545 AlterOperation::AlterColumnSetDefault {
4546 column_name,
4547 default_value,
4548 } => {
4549 let table = catalog_table(ctx, &self.statement.table)?;
4550 let column = table.get_column_by_name(column_name).ok_or_else(|| {
4551 HematiteError::ParseError(format!(
4552 "Column '{}' does not exist in table '{}'",
4553 column_name, self.statement.table
4554 ))
4555 })?;
4556 ctx.catalog.set_column_default(
4557 table.id,
4558 column_name,
4559 Some(coerce_column_value(
4560 column,
4561 lower_literal_value(default_value),
4562 )?),
4563 )?;
4564 }
4565 AlterOperation::AlterColumnDropDefault { column_name } => {
4566 let table = catalog_table(ctx, &self.statement.table)?;
4567 ctx.catalog
4568 .set_column_default(table.id, column_name, None)?;
4569 }
4570 AlterOperation::AlterColumnSetNotNull { column_name } => {
4571 let table = catalog_table(ctx, &self.statement.table)?;
4572 let column_index = table.get_column_index(column_name).ok_or_else(|| {
4573 HematiteError::InternalError(format!(
4574 "Column '{}' disappeared during ALTER COLUMN SET NOT NULL",
4575 column_name
4576 ))
4577 })?;
4578 let rows = ctx.engine.read_from_table(&self.statement.table)?;
4579 if rows
4580 .iter()
4581 .any(|row| row.get(column_index).is_some_and(Value::is_null))
4582 {
4583 return Err(HematiteError::ParseError(format!(
4584 "Cannot set column '{}' to NOT NULL because existing rows contain NULL",
4585 column_name
4586 )));
4587 }
4588 ctx.catalog
4589 .set_column_nullable(table.id, column_name, false)?;
4590 }
4591 AlterOperation::AlterColumnDropNotNull { column_name } => {
4592 let table = catalog_table(ctx, &self.statement.table)?;
4593 ctx.catalog
4594 .set_column_nullable(table.id, column_name, true)?;
4595 }
4596 }
4597
4598 Ok(QueryResult {
4599 affected_rows: 0,
4600 columns: Vec::new(),
4601 rows: Vec::new(),
4602 })
4603 }
4604}
4605
4606#[derive(Debug, Clone)]
4607pub struct CreateIndexExecutor {
4608 pub statement: CreateIndexStatement,
4609}
4610
4611impl CreateIndexExecutor {
4612 pub fn new(statement: CreateIndexStatement) -> Self {
4613 Self { statement }
4614 }
4615}
4616
4617impl QueryExecutor for CreateIndexExecutor {
4618 fn execute(&mut self, ctx: &mut ExecutionContext<'_>) -> Result<QueryResult> {
4619 validate_statement(
4620 &Statement::CreateIndex(self.statement.clone()),
4621 &ctx.catalog,
4622 )?;
4623 if self.statement.if_not_exists {
4624 if let Some(table) = ctx.catalog.get_table_by_name(&self.statement.table) {
4625 if table
4626 .get_secondary_index(&self.statement.index_name)
4627 .is_some()
4628 {
4629 return Ok(mutation_result(0));
4630 }
4631 }
4632 }
4633
4634 let table = catalog_table(ctx, &self.statement.table)?;
4635
4636 let column_indices = self
4637 .statement
4638 .columns
4639 .iter()
4640 .map(|column_name| {
4641 table.get_column_index(column_name).ok_or_else(|| {
4642 HematiteError::ParseError(format!(
4643 "Column '{}' does not exist in table '{}'",
4644 column_name, self.statement.table
4645 ))
4646 })
4647 })
4648 .collect::<Result<Vec<_>>>()?;
4649
4650 let root_page_id = ctx.engine.create_empty_btree()?;
4651 ctx.catalog.add_secondary_index(
4652 table.id,
4653 crate::catalog::SecondaryIndex {
4654 name: self.statement.index_name.clone(),
4655 column_indices,
4656 root_page_id,
4657 unique: self.statement.unique,
4658 },
4659 )?;
4660
4661 let updated_table = ctx
4662 .catalog
4663 .get_table(table.id)
4664 .ok_or_else(|| {
4665 table_disappeared_internal_error(
4666 &self.statement.table,
4667 &format!("creating index '{}'", self.statement.index_name),
4668 )
4669 })?
4670 .clone();
4671 let rows = ctx.engine.read_rows_with_ids(&self.statement.table)?;
4672 ctx.engine
4673 .rebuild_secondary_indexes(&updated_table, &rows)?;
4674
4675 Ok(QueryResult {
4676 affected_rows: 0,
4677 columns: Vec::new(),
4678 rows: Vec::new(),
4679 })
4680 }
4681}
4682
4683fn secondary_index_key_values(
4684 index: &crate::catalog::SecondaryIndex,
4685 row_values: &[Value],
4686) -> Vec<Value> {
4687 index
4688 .column_indices
4689 .iter()
4690 .map(|&column_index| row_values[column_index].clone())
4691 .collect()
4692}
4693
4694fn mutation_result(affected_rows: usize) -> QueryResult {
4695 QueryResult {
4696 affected_rows,
4697 columns: Vec::new(),
4698 rows: Vec::new(),
4699 }
4700}
4701
4702fn duplicate_primary_key_parse_error(table_name: &str, key_values: &[Value]) -> HematiteError {
4703 HematiteError::ParseError(format!(
4704 "Duplicate primary key for table '{}': {:?}",
4705 table_name, key_values
4706 ))
4707}
4708
4709fn table_not_found_parse_error(table_name: &str) -> HematiteError {
4710 HematiteError::ParseError(format!("Table '{}' not found", table_name))
4711}
4712
4713fn table_disappeared_internal_error(table_name: &str, operation: &str) -> HematiteError {
4714 HematiteError::InternalError(format!(
4715 "Table '{}' disappeared during {}",
4716 table_name, operation
4717 ))
4718}
4719
4720fn catalog_table(ctx: &ExecutionContext<'_>, table_name: &str) -> Result<Table> {
4721 ctx.catalog
4722 .get_table_by_name(table_name)
4723 .cloned()
4724 .ok_or_else(|| table_not_found_parse_error(table_name))
4725}
4726
4727fn current_table_row_counts(engine: &crate::catalog::CatalogEngine) -> HashMap<String, usize> {
4728 engine
4729 .get_table_metadata()
4730 .iter()
4731 .map(|(name, metadata)| (name.clone(), metadata.row_count as usize))
4732 .collect()
4733}
4734
4735fn apply_set_operation(
4736 operator: SetOperator,
4737 mut left_rows: Vec<Vec<Value>>,
4738 right_rows: Vec<Vec<Value>>,
4739) -> Vec<Vec<Value>> {
4740 match operator {
4741 SetOperator::UnionAll => {
4742 left_rows.extend(right_rows);
4743 left_rows
4744 }
4745 SetOperator::Union => {
4746 left_rows.extend(right_rows);
4747 apply_distinct_if_needed(true, &mut left_rows);
4748 left_rows
4749 }
4750 SetOperator::Intersect => {
4751 apply_distinct_if_needed(true, &mut left_rows);
4752 let mut distinct_right = right_rows;
4753 apply_distinct_if_needed(true, &mut distinct_right);
4754 left_rows
4755 .into_iter()
4756 .filter(|row| distinct_right.contains(row))
4757 .collect()
4758 }
4759 SetOperator::Except => {
4760 apply_distinct_if_needed(true, &mut left_rows);
4761 let mut distinct_right = right_rows;
4762 apply_distinct_if_needed(true, &mut distinct_right);
4763 left_rows
4764 .into_iter()
4765 .filter(|row| !distinct_right.contains(row))
4766 .collect()
4767 }
4768 }
4769}
4770
4771fn primary_key_values(table: &Table, row: &[Value]) -> Result<Vec<Value>> {
4772 table.get_primary_key_values(row).map_err(|err| {
4773 HematiteError::ParseError(format!("Failed to extract primary key values: {}", err))
4774 })
4775}
4776
4777fn out_of_range_error(column: &Column, type_name: &str) -> HematiteError {
4778 HematiteError::ParseError(format!(
4779 "Type mismatch: column '{}' expects {}, got out-of-range value",
4780 column.name, type_name
4781 ))
4782}
4783
4784fn coerce_varchar_value(value: String, max_chars: u32, label: &str) -> Result<Value> {
4785 if value.chars().count() > max_chars as usize {
4786 return Err(HematiteError::ParseError(format!(
4787 "{} exceeds declared character length {}",
4788 label, max_chars
4789 )));
4790 }
4791 Ok(Value::Text(value))
4792}
4793
4794fn coerce_char_value(value: String, length: u32, label: &str) -> Result<Value> {
4795 if value.chars().count() > length as usize {
4796 return Err(HematiteError::ParseError(format!(
4797 "{} exceeds declared character length {}",
4798 label, length
4799 )));
4800 }
4801 Ok(Value::Text(pad_text_to_char_length(&value, length)))
4802}
4803
4804fn cast_value_to_text_string(value: Value) -> Result<String> {
4805 match value {
4806 Value::Integer(value) => Ok(value.to_string()),
4807 Value::BigInt(value) => Ok(value.to_string()),
4808 Value::Int128(value) => Ok(value.to_string()),
4809 Value::UInteger(value) => Ok(value.to_string()),
4810 Value::UBigInt(value) => Ok(value.to_string()),
4811 Value::UInt128(value) => Ok(value.to_string()),
4812 Value::Float32(value) => Ok(value.to_string()),
4813 Value::Float(value) => Ok(value.to_string()),
4814 Value::Enum(value) | Value::Text(value) => Ok(value),
4815 Value::Boolean(true) => Ok("TRUE".to_string()),
4816 Value::Boolean(false) => Ok("FALSE".to_string()),
4817 Value::Decimal(value) => Ok(value.to_string()),
4818 Value::Date(value) => Ok(value.to_string()),
4819 Value::Time(value) => Ok(value.to_string()),
4820 Value::DateTime(value) => Ok(value.to_string()),
4821 Value::TimeWithTimeZone(value) => Ok(value.to_string()),
4822 Value::Blob(value) => Ok(String::from_utf8_lossy(&value).into_owned()),
4823 Value::Null => Err(HematiteError::ParseError(
4824 "Cannot CAST NULL to text without preserving NULL".to_string(),
4825 )),
4826 Value::IntervalYearMonth(value) => Ok(value.to_string()),
4827 Value::IntervalDaySecond(value) => Ok(value.to_string()),
4828 }
4829}
4830
4831fn coerce_binary_value(value: Value, max_len: u32, label: &str, fixed: bool) -> Result<Value> {
4832 let mut bytes = match value {
4833 Value::Blob(bytes) => bytes,
4834 Value::Text(value) => value.into_bytes(),
4835 Value::Enum(value) => value.into_bytes(),
4836 value => {
4837 return Err(HematiteError::ParseError(format!(
4838 "Expected binary-compatible value for {}, found {:?}",
4839 label, value
4840 )))
4841 }
4842 };
4843
4844 if bytes.len() > max_len as usize {
4845 return Err(HematiteError::ParseError(format!(
4846 "{} exceeds declared byte length {}",
4847 label, max_len
4848 )));
4849 }
4850 if fixed {
4851 bytes.resize(max_len as usize, 0);
4852 }
4853 Ok(Value::Blob(bytes))
4854}
4855
4856fn coerce_decimal_value(value: Value) -> Result<DecimalValue> {
4857 match value {
4858 Value::Decimal(value) => Ok(value),
4859 Value::Integer(value) => Ok(DecimalValue::from_i32(value)),
4860 Value::BigInt(value) => Ok(DecimalValue::from_i64(value)),
4861 Value::Int128(value) => Ok(DecimalValue::from_i128(value)),
4862 Value::UInteger(value) => Ok(DecimalValue::from_u32(value)),
4863 Value::UBigInt(value) => Ok(DecimalValue::from_u64(value)),
4864 Value::UInt128(value) => Ok(DecimalValue::from_u128(value)),
4865 Value::Float32(value) => DecimalValue::from_f64(value as f64),
4866 Value::Float(value) => DecimalValue::from_f64(value),
4867 Value::Text(value) => DecimalValue::parse(&value),
4868 value => Err(HematiteError::ParseError(format!(
4869 "Expected DECIMAL-compatible value, found {:?}",
4870 value
4871 ))),
4872 }
4873}
4874
4875fn numeric_value_as_f64(value: &Value) -> Option<f64> {
4876 match value {
4877 Value::Integer(value) => Some(*value as f64),
4878 Value::BigInt(value) => Some(*value as f64),
4879 Value::Int128(value) => Some(*value as f64),
4880 Value::UInteger(value) => Some(*value as f64),
4881 Value::UBigInt(value) => Some(*value as f64),
4882 Value::UInt128(value) => Some(*value as f64),
4883 Value::Float32(value) => Some(*value as f64),
4884 Value::Float(value) => Some(*value),
4885 Value::Decimal(value) => value.to_f64(),
4886 _ => None,
4887 }
4888}
4889
4890fn make_float_value(data_type: &DataType, value: f64) -> Value {
4891 match data_type {
4892 DataType::Float32 => Value::Float32(value as f32),
4893 DataType::Float => Value::Float(value),
4894 _ => unreachable!("non-float type used for float value construction"),
4895 }
4896}
4897
4898fn float_type_name(data_type: &DataType) -> &'static str {
4899 match data_type {
4900 DataType::Float32 => "FLOAT32",
4901 DataType::Float => "FLOAT",
4902 _ => unreachable!("non-float type used for float naming"),
4903 }
4904}
4905
4906fn coerce_enum_value(value: Value, variants: &[String], label: &str) -> Result<Value> {
4907 let value = match value {
4908 Value::Enum(value) | Value::Text(value) => value,
4909 value => {
4910 return Err(HematiteError::ParseError(format!(
4911 "Expected ENUM-compatible value for {}, found {:?}",
4912 label, value
4913 )))
4914 }
4915 };
4916
4917 if !variants.contains(&value) {
4918 return Err(HematiteError::ParseError(format!(
4919 "{} is not a valid ENUM variant",
4920 value
4921 )));
4922 }
4923
4924 Ok(Value::Enum(value))
4925}
4926
4927fn coerce_column_value(column: &Column, value: Value) -> Result<Value> {
4928 match (&column.data_type, value) {
4929 (DataType::Int8, Value::Integer(i)) => i8::try_from(i)
4930 .map(|_| Value::Integer(i))
4931 .map_err(|_| out_of_range_error(column, "INT8")),
4932 (DataType::Int8, Value::BigInt(i)) => i8::try_from(i)
4933 .map(|value| Value::Integer(value as i32))
4934 .map_err(|_| out_of_range_error(column, "INT8")),
4935 (DataType::Int8, Value::Int128(i)) => i8::try_from(i)
4936 .map(|value| Value::Integer(value as i32))
4937 .map_err(|_| out_of_range_error(column, "INT8")),
4938 (DataType::Int8, Value::UInteger(i)) => i8::try_from(i)
4939 .map(|value| Value::Integer(value as i32))
4940 .map_err(|_| out_of_range_error(column, "INT8")),
4941 (DataType::Int8, Value::UBigInt(i)) => i8::try_from(i)
4942 .map(|value| Value::Integer(value as i32))
4943 .map_err(|_| out_of_range_error(column, "INT8")),
4944 (DataType::Int8, Value::UInt128(i)) => i8::try_from(i)
4945 .map(|value| Value::Integer(value as i32))
4946 .map_err(|_| out_of_range_error(column, "INT8")),
4947 (DataType::Int16, Value::Integer(i)) => i16::try_from(i)
4948 .map(|_| Value::Integer(i))
4949 .map_err(|_| out_of_range_error(column, "INT16")),
4950 (DataType::Int16, Value::BigInt(i)) => i16::try_from(i)
4951 .map(|value| Value::Integer(value as i32))
4952 .map_err(|_| out_of_range_error(column, "INT16")),
4953 (DataType::Int16, Value::Int128(i)) => i16::try_from(i)
4954 .map(|value| Value::Integer(value as i32))
4955 .map_err(|_| out_of_range_error(column, "INT16")),
4956 (DataType::Int16, Value::UInteger(i)) => i16::try_from(i)
4957 .map(|value| Value::Integer(value as i32))
4958 .map_err(|_| out_of_range_error(column, "INT16")),
4959 (DataType::Int16, Value::UBigInt(i)) => i16::try_from(i)
4960 .map(|value| Value::Integer(value as i32))
4961 .map_err(|_| out_of_range_error(column, "INT16")),
4962 (DataType::Int16, Value::UInt128(i)) => i16::try_from(i)
4963 .map(|value| Value::Integer(value as i32))
4964 .map_err(|_| out_of_range_error(column, "INT16")),
4965 (DataType::Int, Value::Integer(i)) => Ok(Value::Integer(i)),
4966 (DataType::Int, Value::BigInt(i)) => i32::try_from(i)
4967 .map(Value::Integer)
4968 .map_err(|_| out_of_range_error(column, "INT")),
4969 (DataType::Int, Value::Int128(i)) => i32::try_from(i)
4970 .map(Value::Integer)
4971 .map_err(|_| out_of_range_error(column, "INT")),
4972 (DataType::Int, Value::UInteger(i)) => i32::try_from(i)
4973 .map(Value::Integer)
4974 .map_err(|_| out_of_range_error(column, "INT")),
4975 (DataType::Int, Value::UBigInt(i)) => i32::try_from(i)
4976 .map(Value::Integer)
4977 .map_err(|_| out_of_range_error(column, "INT")),
4978 (DataType::Int, Value::UInt128(i)) => i32::try_from(i)
4979 .map(Value::Integer)
4980 .map_err(|_| out_of_range_error(column, "INT")),
4981 (DataType::Int64, Value::Integer(i)) => Ok(Value::BigInt(i as i64)),
4982 (DataType::Int64, Value::BigInt(i)) => Ok(Value::BigInt(i)),
4983 (DataType::Int64, Value::Int128(i)) => i64::try_from(i)
4984 .map(Value::BigInt)
4985 .map_err(|_| out_of_range_error(column, "INT64")),
4986 (DataType::Int64, Value::UInteger(i)) => Ok(Value::BigInt(i as i64)),
4987 (DataType::Int64, Value::UBigInt(i)) => i64::try_from(i)
4988 .map(Value::BigInt)
4989 .map_err(|_| out_of_range_error(column, "INT64")),
4990 (DataType::Int64, Value::UInt128(i)) => i64::try_from(i)
4991 .map(Value::BigInt)
4992 .map_err(|_| out_of_range_error(column, "INT64")),
4993 (DataType::Int128, Value::Integer(i)) => Ok(Value::Int128(i as i128)),
4994 (DataType::Int128, Value::BigInt(i)) => Ok(Value::Int128(i as i128)),
4995 (DataType::Int128, Value::Int128(i)) => Ok(Value::Int128(i)),
4996 (DataType::Int128, Value::UInteger(i)) => Ok(Value::Int128(i as i128)),
4997 (DataType::Int128, Value::UBigInt(i)) => Ok(Value::Int128(i as i128)),
4998 (DataType::Int128, Value::UInt128(i)) => i128::try_from(i)
4999 .map(Value::Int128)
5000 .map_err(|_| out_of_range_error(column, "INT128")),
5001 (DataType::UInt8, Value::Integer(i)) if i >= 0 => u8::try_from(i)
5002 .map(|value| Value::UInteger(value as u32))
5003 .map_err(|_| out_of_range_error(column, "UINT8")),
5004 (DataType::UInt8, Value::BigInt(i)) if i >= 0 => u8::try_from(i)
5005 .map(|value| Value::UInteger(value as u32))
5006 .map_err(|_| out_of_range_error(column, "UINT8")),
5007 (DataType::UInt8, Value::Int128(i)) if i >= 0 => u8::try_from(i)
5008 .map(|value| Value::UInteger(value as u32))
5009 .map_err(|_| out_of_range_error(column, "UINT8")),
5010 (DataType::UInt8, Value::UInteger(i)) => u8::try_from(i)
5011 .map(|value| Value::UInteger(value as u32))
5012 .map_err(|_| out_of_range_error(column, "UINT8")),
5013 (DataType::UInt8, Value::UBigInt(i)) => u8::try_from(i)
5014 .map(|value| Value::UInteger(value as u32))
5015 .map_err(|_| out_of_range_error(column, "UINT8")),
5016 (DataType::UInt8, Value::UInt128(i)) => u8::try_from(i)
5017 .map(|value| Value::UInteger(value as u32))
5018 .map_err(|_| out_of_range_error(column, "UINT8")),
5019 (DataType::UInt16, Value::Integer(i)) if i >= 0 => u16::try_from(i)
5020 .map(|value| Value::UInteger(value as u32))
5021 .map_err(|_| out_of_range_error(column, "UINT16")),
5022 (DataType::UInt16, Value::BigInt(i)) if i >= 0 => u16::try_from(i)
5023 .map(|value| Value::UInteger(value as u32))
5024 .map_err(|_| out_of_range_error(column, "UINT16")),
5025 (DataType::UInt16, Value::Int128(i)) if i >= 0 => u16::try_from(i)
5026 .map(|value| Value::UInteger(value as u32))
5027 .map_err(|_| out_of_range_error(column, "UINT16")),
5028 (DataType::UInt16, Value::UInteger(i)) => u16::try_from(i)
5029 .map(|value| Value::UInteger(value as u32))
5030 .map_err(|_| out_of_range_error(column, "UINT16")),
5031 (DataType::UInt16, Value::UBigInt(i)) => u16::try_from(i)
5032 .map(|value| Value::UInteger(value as u32))
5033 .map_err(|_| out_of_range_error(column, "UINT16")),
5034 (DataType::UInt16, Value::UInt128(i)) => u16::try_from(i)
5035 .map(|value| Value::UInteger(value as u32))
5036 .map_err(|_| out_of_range_error(column, "UINT16")),
5037 (DataType::UInt, Value::Integer(i)) if i >= 0 => Ok(Value::UInteger(i as u32)),
5038 (DataType::UInt, Value::BigInt(i)) if i >= 0 => u32::try_from(i)
5039 .map(Value::UInteger)
5040 .map_err(|_| out_of_range_error(column, "UINT")),
5041 (DataType::UInt, Value::Int128(i)) if i >= 0 => u32::try_from(i)
5042 .map(Value::UInteger)
5043 .map_err(|_| out_of_range_error(column, "UINT")),
5044 (DataType::UInt, Value::UInteger(i)) => Ok(Value::UInteger(i)),
5045 (DataType::UInt, Value::UBigInt(i)) => u32::try_from(i)
5046 .map(Value::UInteger)
5047 .map_err(|_| out_of_range_error(column, "UINT")),
5048 (DataType::UInt, Value::UInt128(i)) => u32::try_from(i)
5049 .map(Value::UInteger)
5050 .map_err(|_| out_of_range_error(column, "UINT")),
5051 (DataType::UInt64, Value::Integer(i)) if i >= 0 => Ok(Value::UBigInt(i as u64)),
5052 (DataType::UInt64, Value::BigInt(i)) if i >= 0 => Ok(Value::UBigInt(i as u64)),
5053 (DataType::UInt64, Value::Int128(i)) if i >= 0 => u64::try_from(i)
5054 .map(Value::UBigInt)
5055 .map_err(|_| out_of_range_error(column, "UINT64")),
5056 (DataType::UInt64, Value::UInteger(i)) => Ok(Value::UBigInt(i as u64)),
5057 (DataType::UInt64, Value::UBigInt(i)) => Ok(Value::UBigInt(i)),
5058 (DataType::UInt64, Value::UInt128(i)) => u64::try_from(i)
5059 .map(Value::UBigInt)
5060 .map_err(|_| out_of_range_error(column, "UINT64")),
5061 (DataType::UInt128, Value::Integer(i)) if i >= 0 => Ok(Value::UInt128(i as u128)),
5062 (DataType::UInt128, Value::BigInt(i)) if i >= 0 => Ok(Value::UInt128(i as u128)),
5063 (DataType::UInt128, Value::Int128(i)) if i >= 0 => Ok(Value::UInt128(i as u128)),
5064 (DataType::UInt128, Value::UInteger(i)) => Ok(Value::UInt128(i as u128)),
5065 (DataType::UInt128, Value::UBigInt(i)) => Ok(Value::UInt128(i as u128)),
5066 (DataType::UInt128, Value::UInt128(i)) => Ok(Value::UInt128(i)),
5067 (_, Value::Null) if column.nullable => Ok(Value::Null),
5068 (_, Value::Null) => Err(HematiteError::ParseError(format!(
5069 "Column '{}' cannot be NULL",
5070 column.name
5071 ))),
5072 (DataType::Text, Value::Text(s)) => Ok(Value::Text(s)),
5073 (DataType::Char(length), Value::Text(s)) => coerce_char_value(s, *length, &column.name),
5074 (DataType::VarChar(length), Value::Text(s)) => {
5075 coerce_varchar_value(s, *length, &column.name)
5076 }
5077 (DataType::Binary(length), value) => {
5078 coerce_binary_value(value, *length, &column.name, true)
5079 }
5080 (DataType::VarBinary(length), value) => {
5081 coerce_binary_value(value, *length, &column.name, false)
5082 }
5083 (DataType::Enum(values), value) => coerce_enum_value(value, values, &column.name),
5084 (DataType::Boolean, Value::Boolean(b)) => Ok(Value::Boolean(b)),
5085 (data_type @ (DataType::Float32 | DataType::Float), value) => {
5086 let Some(number) = numeric_value_as_f64(&value) else {
5087 return Err(HematiteError::ParseError(format!(
5088 "Type mismatch: column '{}' expects {:?}, got {:?}",
5089 column.name, column.data_type, value
5090 )));
5091 };
5092 Ok(make_float_value(data_type, number))
5093 }
5094 (DataType::Decimal { precision, scale }, value) => {
5095 let decimal = coerce_decimal_value(value)?;
5096 if !decimal.fits_precision_scale(*precision, *scale) {
5097 return Err(HematiteError::ParseError(format!(
5098 "Type mismatch: column '{}' exceeds {} precision/scale",
5099 column.name,
5100 column.data_type.base_name()
5101 )));
5102 }
5103 Ok(Value::Decimal(decimal))
5104 }
5105 (DataType::Blob, Value::Blob(bytes)) => Ok(Value::Blob(bytes)),
5106 (DataType::Blob, Value::Text(s)) => Ok(Value::Blob(s.into_bytes())),
5107 (DataType::Blob, Value::Integer(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5108 (DataType::Blob, Value::BigInt(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5109 (DataType::Blob, Value::UInteger(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5110 (DataType::Blob, Value::UBigInt(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5111 (DataType::Blob, Value::Int128(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5112 (DataType::Blob, Value::UInt128(i)) => Ok(Value::Blob(i.to_le_bytes().to_vec())),
5113 (DataType::Date, Value::Date(s)) => Ok(Value::Date(s)),
5114 (DataType::Date, Value::Text(s)) => Ok(Value::Date(validate_date_string(&s)?)),
5115 (DataType::Time, Value::Time(s)) => Ok(Value::Time(s)),
5116 (DataType::Time, Value::Text(s)) => Ok(Value::Time(validate_time_string(&s)?)),
5117 (DataType::DateTime, Value::DateTime(s)) => Ok(Value::DateTime(s)),
5118 (DataType::DateTime, Value::Text(s)) => Ok(Value::DateTime(validate_datetime_string(&s)?)),
5119 (DataType::TimeWithTimeZone, Value::TimeWithTimeZone(s)) => Ok(Value::TimeWithTimeZone(s)),
5120 (DataType::TimeWithTimeZone, Value::Text(s)) => Ok(Value::TimeWithTimeZone(
5121 validate_time_with_time_zone_string(&s)?,
5122 )),
5123 (_, value) => Err(HematiteError::ParseError(format!(
5124 "Type mismatch: column '{}' expects {:?}, got {:?}",
5125 column.name, column.data_type, value
5126 ))),
5127 }
5128}
5129
5130fn validate_check_constraints(
5131 ctx: &mut ExecutionContext<'_>,
5132 table: &Table,
5133 row: &[Value],
5134) -> Result<()> {
5135 if table.check_constraints.is_empty() {
5136 return Ok(());
5137 }
5138
5139 let constraint_executor = SelectExecutor::new(
5140 locator_select_statement(TableReference::Table(table.name.clone(), None), None),
5141 SelectAccessPath::FullTableScan,
5142 );
5143 let sources = constraint_executor.resolve_sources(ctx)?;
5144 let mut subquery_cache = SubqueryCache::new();
5145
5146 for constraint in &table.check_constraints {
5147 let condition =
5148 crate::parser::parser::parse_condition_fragment(&constraint.expression_sql)?;
5149 let result = constraint_executor.evaluate_condition(
5150 ctx,
5151 &mut subquery_cache,
5152 &sources,
5153 &condition,
5154 row,
5155 )?;
5156 if result == Some(false) {
5157 let constraint_name = constraint
5158 .name
5159 .as_deref()
5160 .unwrap_or(constraint.expression_sql.as_str());
5161 return Err(HematiteError::ParseError(format!(
5162 "CHECK constraint '{}' failed for table '{}'",
5163 constraint_name, table.name
5164 )));
5165 }
5166 }
5167
5168 Ok(())
5169}
5170
5171fn validate_row_constraints(
5172 ctx: &mut ExecutionContext<'_>,
5173 table: &Table,
5174 row: &[Value],
5175) -> Result<()> {
5176 validate_check_constraints(ctx, table, row)?;
5177 validate_foreign_keys(ctx, table, row, None)
5178}
5179
5180fn validate_foreign_keys(
5181 ctx: &mut ExecutionContext<'_>,
5182 table: &Table,
5183 row: &[Value],
5184 skip_constraint: Option<&ForeignKeyConstraint>,
5185) -> Result<()> {
5186 for foreign_key in &table.foreign_keys {
5187 if skip_constraint == Some(foreign_key) {
5188 continue;
5189 }
5190 let key_values = foreign_key_values_for_row(table, foreign_key, row)?;
5191 if key_values.iter().any(Value::is_null) {
5192 continue;
5193 }
5194 let referenced_target = resolve_foreign_key_target(ctx, foreign_key)?;
5195 if !referenced_key_exists(ctx, &referenced_target, &key_values)? {
5196 return Err(HematiteError::ParseError(format!(
5197 "Foreign key constraint '{}' failed on table '{}': '{}.{:?}' does not contain {:?}",
5198 foreign_key_constraint_name(foreign_key),
5199 table.name,
5200 foreign_key.referenced_table,
5201 foreign_key.referenced_columns,
5202 key_values
5203 )));
5204 }
5205 }
5206
5207 Ok(())
5208}
5209
5210struct ResolvedForeignKeyTarget {
5211 table: Table,
5212 unique_index_name: Option<String>,
5213}
5214
5215fn resolve_foreign_key_target(
5216 ctx: &ExecutionContext<'_>,
5217 foreign_key: &ForeignKeyConstraint,
5218) -> Result<ResolvedForeignKeyTarget> {
5219 let referenced_table = ctx
5220 .catalog
5221 .get_table_by_name(&foreign_key.referenced_table)
5222 .ok_or_else(|| {
5223 HematiteError::ParseError(format!(
5224 "Referenced table '{}' not found",
5225 foreign_key.referenced_table
5226 ))
5227 })?
5228 .clone();
5229 let referenced_column_indices = foreign_key
5230 .referenced_columns
5231 .iter()
5232 .map(|column| {
5233 referenced_table.get_column_index(column).ok_or_else(|| {
5234 HematiteError::ParseError(format!(
5235 "Referenced column '{}.{}' not found",
5236 foreign_key.referenced_table, column
5237 ))
5238 })
5239 })
5240 .collect::<Result<Vec<_>>>()?;
5241
5242 let unique_index_name = if referenced_table.primary_key_columns == referenced_column_indices {
5243 None
5244 } else {
5245 Some(
5246 referenced_table
5247 .secondary_indexes
5248 .iter()
5249 .find(|index| index.unique && index.column_indices == referenced_column_indices)
5250 .ok_or_else(|| {
5251 HematiteError::CorruptedData(format!(
5252 "Referenced columns '{}.{:?}' are no longer backed by a PRIMARY KEY or UNIQUE index",
5253 foreign_key.referenced_table, foreign_key.referenced_columns
5254 ))
5255 })?
5256 .name
5257 .clone(),
5258 )
5259 };
5260
5261 Ok(ResolvedForeignKeyTarget {
5262 table: referenced_table,
5263 unique_index_name,
5264 })
5265}
5266
5267fn referenced_key_exists(
5268 ctx: &mut ExecutionContext<'_>,
5269 target: &ResolvedForeignKeyTarget,
5270 key_values: &[Value],
5271) -> Result<bool> {
5272 if target.unique_index_name.is_none() {
5273 return Ok(ctx
5274 .engine
5275 .lookup_row_by_primary_key(&target.table, key_values)?
5276 .is_some());
5277 }
5278
5279 Ok(!ctx
5280 .engine
5281 .lookup_secondary_index_rowids(
5282 &target.table,
5283 target
5284 .unique_index_name
5285 .as_deref()
5286 .expect("non-primary target must carry a unique index name"),
5287 key_values,
5288 )?
5289 .is_empty())
5290}
5291
5292fn referencing_foreign_keys(
5293 ctx: &mut ExecutionContext<'_>,
5294 parent_table: &Table,
5295) -> Result<Vec<ReferencingForeignKey>> {
5296 let mut references = Vec::new();
5297
5298 for (_, table_name) in ctx.catalog.list_tables() {
5299 let child_table = ctx.catalog.get_table_by_name(&table_name).ok_or_else(|| {
5300 HematiteError::ParseError(format!("Table '{}' not found", table_name))
5301 })?;
5302 for foreign_key in &child_table.foreign_keys {
5303 if foreign_key.referenced_table != parent_table.name {
5304 continue;
5305 }
5306 let referenced_column_indices = foreign_key
5307 .referenced_columns
5308 .iter()
5309 .map(|column| {
5310 parent_table.get_column_index(column).ok_or_else(|| {
5311 HematiteError::CorruptedData(format!(
5312 "Referenced column '{}.{}' is missing",
5313 foreign_key.referenced_table, column
5314 ))
5315 })
5316 })
5317 .collect::<Result<Vec<_>>>()?;
5318 references.push(ReferencingForeignKey {
5319 child_table: child_table.clone(),
5320 foreign_key: foreign_key.clone(),
5321 referenced_column_indices,
5322 });
5323 }
5324 }
5325
5326 Ok(references)
5327}
5328
5329fn parent_reference_key_changed(
5330 ctx: &mut ExecutionContext<'_>,
5331 parent_table: &Table,
5332 original_row: &[Value],
5333 updated_row: &[Value],
5334) -> Result<bool> {
5335 for reference in referencing_foreign_keys(ctx, parent_table)? {
5336 if parent_key_for_reference(parent_table, &reference, original_row)?
5337 != parent_key_for_reference(parent_table, &reference, updated_row)?
5338 {
5339 return Ok(true);
5340 }
5341 }
5342 Ok(false)
5343}
5344
5345fn apply_parent_delete_foreign_key_actions(
5346 ctx: &mut ExecutionContext<'_>,
5347 parent_table: &Table,
5348 row: &[Value],
5349) -> Result<()> {
5350 for reference in referencing_foreign_keys(ctx, parent_table)? {
5351 let parent_key = parent_key_for_reference(parent_table, &reference, row)?;
5352 if parent_key.iter().any(Value::is_null) {
5353 continue;
5354 }
5355 let child_rows = child_rows_referencing_parent_key(
5356 ctx,
5357 &reference.child_table,
5358 &reference,
5359 &parent_key,
5360 )?;
5361 execute_parent_foreign_key_action(
5362 ctx,
5363 parent_table,
5364 &reference,
5365 child_rows,
5366 reference.foreign_key.on_delete,
5367 "delete",
5368 None,
5369 )?;
5370 }
5371
5372 Ok(())
5373}
5374
5375fn apply_parent_update_foreign_key_actions(
5376 ctx: &mut ExecutionContext<'_>,
5377 parent_table: &Table,
5378 original_row: &[Value],
5379 updated_row: &[Value],
5380) -> Result<()> {
5381 for reference in referencing_foreign_keys(ctx, parent_table)? {
5382 let old_parent_key = parent_key_for_reference(parent_table, &reference, original_row)?;
5383 let new_parent_key = parent_key_for_reference(parent_table, &reference, updated_row)?;
5384 if old_parent_key == new_parent_key || old_parent_key.iter().any(Value::is_null) {
5385 continue;
5386 }
5387
5388 let child_rows = child_rows_referencing_parent_key(
5389 ctx,
5390 &reference.child_table,
5391 &reference,
5392 &old_parent_key,
5393 )?;
5394 execute_parent_foreign_key_action(
5395 ctx,
5396 parent_table,
5397 &reference,
5398 child_rows,
5399 reference.foreign_key.on_update,
5400 "update",
5401 Some(&new_parent_key),
5402 )?;
5403 }
5404 Ok(())
5405}
5406
5407fn foreign_key_constraint_name(foreign_key: &ForeignKeyConstraint) -> &str {
5408 foreign_key
5409 .name
5410 .as_deref()
5411 .unwrap_or(foreign_key.referenced_table.as_str())
5412}
5413
5414struct ReferencingForeignKey {
5415 child_table: Table,
5416 foreign_key: ForeignKeyConstraint,
5417 referenced_column_indices: Vec<usize>,
5418}
5419
5420enum ChildKeyRewrite<'a> {
5421 Replace(&'a [Value]),
5422 SetNull,
5423}
5424
5425fn foreign_key_values_for_row(
5426 table: &Table,
5427 foreign_key: &ForeignKeyConstraint,
5428 row: &[Value],
5429) -> Result<Vec<Value>> {
5430 row_values_for_indices(row, &foreign_key.column_indices, &table.name)
5431}
5432
5433fn parent_key_for_reference(
5434 parent_table: &Table,
5435 reference: &ReferencingForeignKey,
5436 row: &[Value],
5437) -> Result<Vec<Value>> {
5438 row_values_for_indices(
5439 row,
5440 &reference.referenced_column_indices,
5441 &parent_table.name,
5442 )
5443}
5444
5445fn execute_parent_foreign_key_action(
5446 ctx: &mut ExecutionContext<'_>,
5447 parent_table: &Table,
5448 reference: &ReferencingForeignKey,
5449 child_rows: Vec<StoredRow>,
5450 action: CatalogForeignKeyAction,
5451 operation: &str,
5452 replacement_key: Option<&[Value]>,
5453) -> Result<()> {
5454 match action {
5455 CatalogForeignKeyAction::Restrict => {
5456 if !child_rows.is_empty() {
5457 return Err(HematiteError::ParseError(format!(
5458 "Cannot {} row in table '{}' because foreign key '{}' on table '{}' still references it",
5459 operation,
5460 parent_table.name,
5461 foreign_key_constraint_name(&reference.foreign_key),
5462 reference.child_table.name
5463 )));
5464 }
5465 }
5466 CatalogForeignKeyAction::Cascade => {
5467 if let Some(replacement_key) = replacement_key {
5468 rewrite_child_foreign_key_rows(
5469 ctx,
5470 &reference.child_table,
5471 &reference.foreign_key,
5472 child_rows,
5473 ChildKeyRewrite::Replace(replacement_key),
5474 )?;
5475 } else {
5476 for child_row in child_rows {
5477 remove_stored_row(
5478 ctx,
5479 &reference.child_table.name,
5480 &reference.child_table,
5481 child_row.row_id,
5482 )?;
5483 }
5484 }
5485 }
5486 CatalogForeignKeyAction::SetNull => {
5487 rewrite_child_foreign_key_rows(
5488 ctx,
5489 &reference.child_table,
5490 &reference.foreign_key,
5491 child_rows,
5492 ChildKeyRewrite::SetNull,
5493 )?;
5494 }
5495 }
5496
5497 Ok(())
5498}
5499
5500fn row_values_for_indices(
5501 row: &[Value],
5502 indices: &[usize],
5503 table_name: &str,
5504) -> Result<Vec<Value>> {
5505 indices
5506 .iter()
5507 .map(|&index| {
5508 row.get(index).cloned().ok_or_else(|| {
5509 HematiteError::CorruptedData(format!(
5510 "Column index {} is invalid for table '{}'",
5511 index, table_name
5512 ))
5513 })
5514 })
5515 .collect()
5516}
5517
5518fn child_rows_referencing_parent_key(
5519 ctx: &mut ExecutionContext<'_>,
5520 child_table: &Table,
5521 reference: &ReferencingForeignKey,
5522 parent_key: &[Value],
5523) -> Result<Vec<StoredRow>> {
5524 let mut matches = Vec::new();
5525 for child_row in ctx.engine.read_rows_with_ids(&child_table.name)? {
5526 let child_key =
5527 foreign_key_values_for_row(child_table, &reference.foreign_key, &child_row.values)?;
5528 if child_key.iter().any(Value::is_null) {
5529 continue;
5530 }
5531 if child_key == parent_key {
5532 matches.push(child_row);
5533 }
5534 }
5535 Ok(matches)
5536}
5537
5538fn rewrite_child_foreign_key_rows(
5539 ctx: &mut ExecutionContext<'_>,
5540 table: &Table,
5541 foreign_key: &ForeignKeyConstraint,
5542 child_rows: Vec<StoredRow>,
5543 rewrite: ChildKeyRewrite<'_>,
5544) -> Result<()> {
5545 for mut child_row in child_rows {
5546 match rewrite {
5547 ChildKeyRewrite::Replace(replacement_key) => {
5548 for (position, &column_index) in foreign_key.column_indices.iter().enumerate() {
5549 child_row.values[column_index] = replacement_key[position].clone();
5550 }
5551 }
5552 ChildKeyRewrite::SetNull => {
5553 for &column_index in &foreign_key.column_indices {
5554 child_row.values[column_index] = Value::Null;
5555 }
5556 }
5557 }
5558 persist_foreign_key_child_update(ctx, table, foreign_key, child_row)?;
5559 }
5560 Ok(())
5561}
5562
5563fn persist_foreign_key_child_update(
5564 ctx: &mut ExecutionContext<'_>,
5565 table: &Table,
5566 skipped_foreign_key: &ForeignKeyConstraint,
5567 row: StoredRow,
5568) -> Result<()> {
5569 table
5570 .validate_row(&row.values)
5571 .map_err(|err| HematiteError::ParseError(err.to_string()))?;
5572 validate_check_constraints(ctx, table, &row.values)?;
5573 validate_foreign_keys(ctx, table, &row.values, Some(skipped_foreign_key))?;
5574 ensure_stored_row_uniqueness(ctx, table, &row)?;
5575 remove_stored_row(ctx, &table.name, table, row.row_id)?;
5576 write_stored_row(ctx, &table.name, table, row, true).map(|_| ())
5577}
5578
5579fn ensure_stored_row_uniqueness(
5580 ctx: &mut ExecutionContext<'_>,
5581 table: &Table,
5582 row: &StoredRow,
5583) -> Result<()> {
5584 let candidate_pk = primary_key_values(table, &row.values)?;
5585 if let Some(existing_rowid) = ctx.engine.lookup_primary_key_rowid(table, &candidate_pk)? {
5586 if existing_rowid != row.row_id {
5587 return Err(duplicate_primary_key_parse_error(
5588 &table.name,
5589 &candidate_pk,
5590 ));
5591 }
5592 }
5593
5594 for index in table.secondary_indexes.iter().filter(|index| index.unique) {
5595 let key_values = secondary_index_key_values(index, &row.values);
5596 if ctx
5597 .engine
5598 .lookup_secondary_index_rowids(table, &index.name, &key_values)?
5599 .into_iter()
5600 .any(|existing_rowid| existing_rowid != row.row_id)
5601 {
5602 return Err(unique_index_parse_error(&index.name, &table.name));
5603 }
5604 }
5605
5606 Ok(())
5607}
5608
5609fn remove_stored_row(
5610 ctx: &mut ExecutionContext<'_>,
5611 table_name: &str,
5612 table: &Table,
5613 row_id: u64,
5614) -> Result<()> {
5615 let Some(existing_row) = ctx.engine.lookup_row_by_rowid(table_name, row_id)? else {
5616 return Ok(());
5617 };
5618
5619 ctx.engine
5620 .delete_secondary_index_row(table, &existing_row)?;
5621 let deleted_pk = ctx.engine.delete_primary_key_row(table, &existing_row)?;
5622 if !deleted_pk {
5623 return Err(HematiteError::CorruptedData(format!(
5624 "Primary-key index entry vanished during row removal for table '{}'",
5625 table_name
5626 )));
5627 }
5628
5629 let deleted = ctx
5630 .engine
5631 .delete_from_table_by_rowid(table_name, existing_row.row_id)?;
5632 if !deleted {
5633 return Err(HematiteError::CorruptedData(format!(
5634 "Rowid {} vanished during row removal for table '{}'",
5635 existing_row.row_id, table_name
5636 )));
5637 }
5638
5639 Ok(())
5640}
5641
5642fn write_stored_row(
5643 ctx: &mut ExecutionContext<'_>,
5644 table_name: &str,
5645 table: &Table,
5646 mut row: StoredRow,
5647 preserve_row_id: bool,
5648) -> Result<u64> {
5649 let row_id = if preserve_row_id {
5650 ctx.engine.insert_row_with_rowid(table_name, row.clone())?;
5651 row.row_id
5652 } else {
5653 let allocated_row_id = ctx
5654 .engine
5655 .insert_into_table(table_name, row.values.clone())?;
5656 row.row_id = allocated_row_id;
5657 allocated_row_id
5658 };
5659
5660 ctx.engine.register_primary_key_row(table, row.clone())?;
5661 ctx.engine.register_secondary_index_row(table, row)?;
5662 Ok(row_id)
5663}
5664
5665fn apply_distinct_if_needed(distinct: bool, rows: &mut Vec<Vec<Value>>) {
5666 if !distinct {
5667 return;
5668 }
5669
5670 let mut distinct_rows = Vec::new();
5671 for row in rows.drain(..) {
5672 if !distinct_rows.contains(&row) {
5673 distinct_rows.push(row);
5674 }
5675 }
5676 *rows = distinct_rows;
5677}
5678
5679fn deduplicate_rows(mut rows: Vec<Vec<Value>>) -> Vec<Vec<Value>> {
5680 apply_distinct_if_needed(true, &mut rows);
5681 rows
5682}
5683
5684fn locator_select_statement(
5685 from: TableReference,
5686 where_clause: Option<WhereClause>,
5687) -> SelectStatement {
5688 SelectStatement {
5689 with_clause: Vec::new(),
5690 distinct: false,
5691 columns: vec![SelectItem::Wildcard],
5692 column_aliases: vec![None],
5693 from,
5694 where_clause,
5695 group_by: Vec::new(),
5696 having_clause: None,
5697 order_by: Vec::new(),
5698 limit: None,
5699 offset: None,
5700 set_operation: None,
5701 }
5702}
5703
5704fn evaluate_arithmetic_values(
5705 operator: &ArithmeticOperator,
5706 left: Value,
5707 right: Value,
5708) -> Result<Value> {
5709 if left.is_null() || right.is_null() {
5710 return Ok(Value::Null);
5711 }
5712
5713 if let Some(value) = evaluate_temporal_arithmetic(operator, &left, &right)? {
5714 return Ok(value);
5715 }
5716
5717 if left.is_float_like() || right.is_float_like() {
5718 if let (Some(left), Some(right)) =
5719 (numeric_value_as_f64(&left), numeric_value_as_f64(&right))
5720 {
5721 return evaluate_float_arithmetic(operator, left, right);
5722 }
5723 }
5724
5725 if is_exact_numeric_value(&left) && is_exact_numeric_value(&right) {
5726 return evaluate_exact_numeric_arithmetic(operator, left, right);
5727 }
5728
5729 Err(HematiteError::ParseError(format!(
5730 "Arithmetic requires numeric values, found {:?} and {:?}",
5731 left, right
5732 )))
5733}
5734
5735fn evaluate_exact_numeric_arithmetic(
5736 operator: &ArithmeticOperator,
5737 left: Value,
5738 right: Value,
5739) -> Result<Value> {
5740 let prefer_decimal = matches!(left, Value::Decimal(_))
5741 || matches!(right, Value::Decimal(_))
5742 || matches!(operator, ArithmeticOperator::Divide);
5743 let prefer_unsigned = !prefer_decimal
5744 && is_unsigned_integral_value(&left)
5745 && is_unsigned_integral_value(&right)
5746 && !matches!(operator, ArithmeticOperator::Subtract);
5747
5748 let left_decimal = coerce_decimal_value(left.clone())?;
5749 let right_decimal = coerce_decimal_value(right.clone())?;
5750 let result = match operator {
5751 ArithmeticOperator::Add => left_decimal.add(&right_decimal),
5752 ArithmeticOperator::Subtract => left_decimal.subtract(&right_decimal),
5753 ArithmeticOperator::Multiply => left_decimal.multiply(&right_decimal),
5754 ArithmeticOperator::Divide => left_decimal.divide(&right_decimal)?,
5755 ArithmeticOperator::Modulo => left_decimal.remainder(&right_decimal)?,
5756 };
5757
5758 if prefer_decimal || !result.is_integral() {
5759 Ok(Value::Decimal(result))
5760 } else {
5761 decimal_integral_result_to_value(result, prefer_unsigned)
5762 }
5763}
5764
5765fn evaluate_temporal_arithmetic(
5766 operator: &ArithmeticOperator,
5767 left: &Value,
5768 right: &Value,
5769) -> Result<Option<Value>> {
5770 match (left, right) {
5771 (Value::IntervalYearMonth(left), Value::IntervalYearMonth(right)) => {
5772 let total_months = match operator {
5773 ArithmeticOperator::Add => left.total_months().checked_add(right.total_months()),
5774 ArithmeticOperator::Subtract => {
5775 left.total_months().checked_sub(right.total_months())
5776 }
5777 _ => None,
5778 };
5779 Ok(total_months
5780 .map(|value| Value::IntervalYearMonth(IntervalYearMonthValue::new(value))))
5781 }
5782 (Value::IntervalDaySecond(left), Value::IntervalDaySecond(right)) => {
5783 let total_seconds = match operator {
5784 ArithmeticOperator::Add => left.total_seconds().checked_add(right.total_seconds()),
5785 ArithmeticOperator::Subtract => {
5786 left.total_seconds().checked_sub(right.total_seconds())
5787 }
5788 _ => None,
5789 };
5790 Ok(total_seconds
5791 .map(|value| Value::IntervalDaySecond(IntervalDaySecondValue::new(value))))
5792 }
5793 (Value::Date(left), Value::Date(right))
5794 if matches!(operator, ArithmeticOperator::Subtract) =>
5795 {
5796 Ok(Some(Value::BigInt(
5797 left.days_since_epoch() as i64 - right.days_since_epoch() as i64,
5798 )))
5799 }
5800 (Value::Date(left), Value::IntervalYearMonth(interval)) => {
5801 let months = signed_interval_months(operator, *interval)?;
5802 Ok(Some(Value::Date(add_months_to_date(*left, months)?)))
5803 }
5804 (Value::Date(left), Value::IntervalDaySecond(interval)) => {
5805 let days = whole_days_from_interval(operator, *interval)?;
5806 let result = left.days_since_epoch() as i64 + days;
5807 let result = i32::try_from(result).map_err(|_| {
5808 HematiteError::ParseError("DATE arithmetic overflowed supported range".to_string())
5809 })?;
5810 Ok(Some(Value::Date(DateValue::from_days_since_epoch(result))))
5811 }
5812 (Value::Date(left), right) => {
5813 let Some(days) = integral_rhs(right) else {
5814 return Ok(None);
5815 };
5816 let delta = match operator {
5817 ArithmeticOperator::Add => days,
5818 ArithmeticOperator::Subtract => -days,
5819 _ => return Ok(None),
5820 };
5821 let result = left.days_since_epoch() as i64 + delta;
5822 let result = i32::try_from(result).map_err(|_| {
5823 HematiteError::ParseError("DATE arithmetic overflowed supported range".to_string())
5824 })?;
5825 Ok(Some(Value::Date(DateValue::from_days_since_epoch(result))))
5826 }
5827
5828 (Value::DateTime(left), Value::DateTime(right))
5829 if matches!(operator, ArithmeticOperator::Subtract) =>
5830 {
5831 Ok(Some(Value::BigInt(
5832 left.seconds_since_epoch() - right.seconds_since_epoch(),
5833 )))
5834 }
5835 (Value::DateTime(left), Value::IntervalYearMonth(interval)) => Ok(Some(Value::DateTime(
5836 add_months_to_datetime(*left, signed_interval_months(operator, *interval)?)?,
5837 ))),
5838 (Value::DateTime(left), Value::IntervalDaySecond(interval)) => {
5839 let seconds = signed_interval_seconds(operator, *interval)?;
5840 Ok(Some(Value::DateTime(
5841 DateTimeValue::from_seconds_since_epoch(left.seconds_since_epoch() + seconds),
5842 )))
5843 }
5844 (Value::DateTime(left), right) => {
5845 let Some(seconds) = integral_rhs(right) else {
5846 return Ok(None);
5847 };
5848 let delta = match operator {
5849 ArithmeticOperator::Add => seconds,
5850 ArithmeticOperator::Subtract => -seconds,
5851 _ => return Ok(None),
5852 };
5853 Ok(Some(Value::DateTime(
5854 DateTimeValue::from_seconds_since_epoch(left.seconds_since_epoch() + delta),
5855 )))
5856 }
5857
5858 (Value::Time(left), Value::Time(right))
5859 if matches!(operator, ArithmeticOperator::Subtract) =>
5860 {
5861 Ok(Some(Value::Integer(
5862 left.seconds_since_midnight() as i32 - right.seconds_since_midnight() as i32,
5863 )))
5864 }
5865 (Value::Time(left), Value::IntervalDaySecond(interval)) => {
5866 let seconds = signed_interval_seconds(operator, *interval)?;
5867 Ok(Some(Value::Time(TimeValue::from_seconds_since_midnight(
5868 add_wrapped_seconds(left.seconds_since_midnight(), seconds),
5869 ))))
5870 }
5871 (Value::Time(left), right) => {
5872 let Some(seconds) = integral_rhs(right) else {
5873 return Ok(None);
5874 };
5875 let delta = match operator {
5876 ArithmeticOperator::Add => seconds,
5877 ArithmeticOperator::Subtract => -seconds,
5878 _ => return Ok(None),
5879 };
5880 Ok(Some(Value::Time(TimeValue::from_seconds_since_midnight(
5881 add_wrapped_seconds(left.seconds_since_midnight(), delta),
5882 ))))
5883 }
5884 (Value::TimeWithTimeZone(left), Value::IntervalDaySecond(interval)) => {
5885 let seconds = signed_interval_seconds(operator, *interval)?;
5886 Ok(Some(Value::TimeWithTimeZone(
5887 TimeWithTimeZoneValue::from_parts(
5888 add_wrapped_seconds(left.seconds_since_midnight(), seconds),
5889 left.offset_minutes(),
5890 ),
5891 )))
5892 }
5893 (Value::TimeWithTimeZone(left), right) => {
5894 let Some(seconds) = integral_rhs(right) else {
5895 return Ok(None);
5896 };
5897 let delta = match operator {
5898 ArithmeticOperator::Add => seconds,
5899 ArithmeticOperator::Subtract => -seconds,
5900 _ => return Ok(None),
5901 };
5902 Ok(Some(Value::TimeWithTimeZone(
5903 TimeWithTimeZoneValue::from_parts(
5904 add_wrapped_seconds(left.seconds_since_midnight(), delta),
5905 left.offset_minutes(),
5906 ),
5907 )))
5908 }
5909 _ => Ok(None),
5910 }
5911}
5912
/// Adds `delta` seconds (possibly negative) to a time of day and wraps the result into
/// `0..86_400`.
///
/// `delta` is reduced modulo one day before the addition. This gives the same result as
/// adding it directly, but cannot overflow `i64` for deltas near `i64::MAX`/`i64::MIN`.
fn add_wrapped_seconds(seconds_since_midnight: u32, delta: i64) -> u32 {
    const SECONDS_PER_DAY: i64 = 86_400;
    let reduced_delta = delta.rem_euclid(SECONDS_PER_DAY);
    (i64::from(seconds_since_midnight) + reduced_delta).rem_euclid(SECONDS_PER_DAY) as u32
}
5916
5917fn signed_interval_months(
5918 operator: &ArithmeticOperator,
5919 interval: IntervalYearMonthValue,
5920) -> Result<i32> {
5921 match operator {
5922 ArithmeticOperator::Add => Ok(interval.total_months()),
5923 ArithmeticOperator::Subtract => interval.total_months().checked_neg().ok_or_else(|| {
5924 HematiteError::ParseError(
5925 "INTERVAL YEAR TO MONTH overflowed supported range".to_string(),
5926 )
5927 }),
5928 _ => Err(HematiteError::ParseError(
5929 "INTERVAL YEAR TO MONTH only supports addition and subtraction".to_string(),
5930 )),
5931 }
5932}
5933
5934fn signed_interval_seconds(
5935 operator: &ArithmeticOperator,
5936 interval: IntervalDaySecondValue,
5937) -> Result<i64> {
5938 match operator {
5939 ArithmeticOperator::Add => Ok(interval.total_seconds()),
5940 ArithmeticOperator::Subtract => interval.total_seconds().checked_neg().ok_or_else(|| {
5941 HematiteError::ParseError(
5942 "INTERVAL DAY TO SECOND overflowed supported range".to_string(),
5943 )
5944 }),
5945 _ => Err(HematiteError::ParseError(
5946 "INTERVAL DAY TO SECOND only supports addition and subtraction".to_string(),
5947 )),
5948 }
5949}
5950
5951fn whole_days_from_interval(
5952 operator: &ArithmeticOperator,
5953 interval: IntervalDaySecondValue,
5954) -> Result<i64> {
5955 let seconds = signed_interval_seconds(operator, interval)?;
5956 if seconds % 86_400 != 0 {
5957 return Err(HematiteError::ParseError(
5958 "DATE arithmetic requires INTERVAL DAY TO SECOND values aligned to whole days"
5959 .to_string(),
5960 ));
5961 }
5962 Ok(seconds / 86_400)
5963}
5964
5965fn add_months_to_date(value: DateValue, delta_months: i32) -> Result<DateValue> {
5966 let (year, month, day) = value.components();
5967 let total_months = year
5968 .checked_mul(12)
5969 .and_then(|total| total.checked_add(month as i32 - 1))
5970 .and_then(|total| total.checked_add(delta_months))
5971 .ok_or_else(|| {
5972 HematiteError::ParseError(
5973 "Temporal month arithmetic overflowed supported range".to_string(),
5974 )
5975 })?;
5976 let new_year = total_months.div_euclid(12);
5977 let new_month = total_months.rem_euclid(12) as u32 + 1;
5978 let clamped_day = day.min(executor_days_in_month(new_year, new_month));
5979 Ok(DateValue::from_days_since_epoch(executor_days_from_civil(
5980 new_year,
5981 new_month,
5982 clamped_day,
5983 )))
5984}
5985
5986fn add_months_to_datetime(value: DateTimeValue, delta_months: i32) -> Result<DateTimeValue> {
5987 let (date, time) = value.components();
5988 let shifted_date = add_months_to_date(date, delta_months)?;
5989 Ok(DateTimeValue::from_seconds_since_epoch(
5990 shifted_date.days_since_epoch() as i64 * 86_400 + time.seconds_since_midnight() as i64,
5991 ))
5992}
5993
5994fn executor_days_in_month(year: i32, month: u32) -> u32 {
5995 match month {
5996 1 | 3 | 5 | 7 | 8 | 10 | 12 => 31,
5997 4 | 6 | 9 | 11 => 30,
5998 2 if executor_is_leap_year(year) => 29,
5999 2 => 28,
6000 _ => unreachable!(),
6001 }
6002}
6003
/// Gregorian leap-year rule: divisible by 400, or by 4 but not by 100.
fn executor_is_leap_year(year: i32) -> bool {
    let divisible_by = |divisor: i32| year % divisor == 0;
    divisible_by(400) || (divisible_by(4) && !divisible_by(100))
}
6007
/// Converts a proleptic Gregorian calendar date to a day count where 1970-01-01 is day 0.
///
/// This is Howard Hinnant's `days_from_civil` algorithm. Years are treated as starting in
/// March, so the leap day falls at the end of the year. The calendar is split into 400-year
/// eras of 146_097 days. Arithmetic is in `i32`, so extremely distant years overflow.
fn executor_days_from_civil(year: i32, month: u32, day: u32) -> i32 {
    let march_based_year = if month <= 2 { year - 1 } else { year };
    let era = march_based_year.div_euclid(400);
    let year_of_era = march_based_year.rem_euclid(400);

    let month = month as i32;
    let march_based_month = if month > 2 { month - 3 } else { month + 9 };
    let day_of_year = (153 * march_based_month + 2) / 5 + day as i32 - 1;
    let day_of_era = year_of_era * 365 + year_of_era / 4 - year_of_era / 100 + day_of_year;

    // 719_468 shifts the origin from 0000-03-01 to 1970-01-01.
    era * 146_097 + day_of_era - 719_468
}
6018
6019fn integral_rhs(value: &Value) -> Option<i64> {
6020 match value {
6021 Value::Integer(value) => Some(*value as i64),
6022 Value::BigInt(value) => Some(*value),
6023 Value::Int128(value) => i64::try_from(*value).ok(),
6024 Value::UInteger(value) => Some(*value as i64),
6025 Value::UBigInt(value) => i64::try_from(*value).ok(),
6026 Value::UInt128(value) => i64::try_from(*value).ok(),
6027 _ => None,
6028 }
6029}
6030
6031fn auto_increment_value(column: &Column, next_row_id: u64) -> Result<Value> {
6032 match column.data_type {
6033 DataType::Int => i32::try_from(next_row_id)
6034 .map(Value::Integer)
6035 .map_err(|_| out_of_range_error(column, "INT")),
6036 DataType::UInt => u32::try_from(next_row_id)
6037 .map(Value::UInteger)
6038 .map_err(|_| out_of_range_error(column, "UINT")),
6039 _ => Err(HematiteError::ParseError(format!(
6040 "AUTO_INCREMENT column '{}' must use INT or UINT",
6041 column.name
6042 ))),
6043 }
6044}
6045
6046fn negate_numeric_value(value: Value) -> Result<Value> {
6047 match value {
6048 Value::Integer(value) => value.checked_neg().map(Value::Integer).ok_or_else(|| {
6049 HematiteError::ParseError("Integer overflow while evaluating unary '-'".to_string())
6050 }),
6051 Value::BigInt(value) => value.checked_neg().map(Value::BigInt).ok_or_else(|| {
6052 HematiteError::ParseError("INT64 overflow while evaluating unary '-'".to_string())
6053 }),
6054 Value::Int128(value) => value.checked_neg().map(Value::Int128).ok_or_else(|| {
6055 HematiteError::ParseError("INT128 overflow while evaluating unary '-'".to_string())
6056 }),
6057 Value::UInteger(value) => Ok(Value::BigInt(-(value as i64))),
6058 Value::UBigInt(value) => Ok(Value::Int128(-(value as i128))),
6059 Value::UInt128(value) => {
6060 if value > i128::MAX as u128 {
6061 return Err(HematiteError::ParseError(
6062 "UINT128 overflow while evaluating unary '-'".to_string(),
6063 ));
6064 }
6065 Ok(Value::Int128(-(value as i128)))
6066 }
6067 Value::Float32(value) => Ok(Value::Float32(-value)),
6068 Value::Float(value) => Ok(Value::Float(-value)),
6069 Value::Decimal(value) => Ok(Value::Decimal(value.negate())),
6070 Value::Null => Ok(Value::Null),
6071 value => Err(HematiteError::ParseError(format!(
6072 "Unary '-' requires a numeric value, found {:?}",
6073 value
6074 ))),
6075 }
6076}
6077
6078fn evaluate_float_arithmetic(
6079 operator: &ArithmeticOperator,
6080 left: f64,
6081 right: f64,
6082) -> Result<Value> {
6083 let value = match operator {
6084 ArithmeticOperator::Add => left + right,
6085 ArithmeticOperator::Subtract => left - right,
6086 ArithmeticOperator::Multiply => left * right,
6087 ArithmeticOperator::Divide => {
6088 if right == 0.0 {
6089 return Err(HematiteError::ParseError("Division by zero".to_string()));
6090 }
6091 left / right
6092 }
6093 ArithmeticOperator::Modulo => {
6094 if right == 0.0 {
6095 return Err(HematiteError::ParseError("Division by zero".to_string()));
6096 }
6097 left % right
6098 }
6099 };
6100 Ok(Value::Float(value))
6101}
6102
6103fn is_exact_numeric_value(value: &Value) -> bool {
6104 matches!(
6105 value,
6106 Value::Integer(_)
6107 | Value::BigInt(_)
6108 | Value::Int128(_)
6109 | Value::UInteger(_)
6110 | Value::UBigInt(_)
6111 | Value::UInt128(_)
6112 | Value::Decimal(_)
6113 )
6114}
6115
6116fn is_unsigned_integral_value(value: &Value) -> bool {
6117 matches!(
6118 value,
6119 Value::UInteger(_) | Value::UBigInt(_) | Value::UInt128(_)
6120 )
6121}
6122
6123fn decimal_integral_result_to_value(value: DecimalValue, prefer_unsigned: bool) -> Result<Value> {
6124 debug_assert!(value.is_integral());
6125 if value.negative() {
6126 return value
6127 .to_integral_i128()
6128 .map(minimal_signed_value)
6129 .ok_or_else(|| {
6130 HematiteError::ParseError(
6131 "Arithmetic overflowed the supported signed integer range".to_string(),
6132 )
6133 });
6134 }
6135
6136 if prefer_unsigned {
6137 return value
6138 .to_integral_u128()
6139 .map(minimal_unsigned_value)
6140 .ok_or_else(|| {
6141 HematiteError::ParseError(
6142 "Arithmetic overflowed the supported unsigned integer range".to_string(),
6143 )
6144 });
6145 }
6146
6147 if let Some(signed) = value.to_integral_i128() {
6148 Ok(minimal_signed_value(signed))
6149 } else {
6150 value
6151 .to_integral_u128()
6152 .map(minimal_unsigned_value)
6153 .ok_or_else(|| {
6154 HematiteError::ParseError(
6155 "Arithmetic overflowed the supported integer range".to_string(),
6156 )
6157 })
6158 }
6159}
6160
6161fn minimal_signed_value(value: i128) -> Value {
6162 if let Ok(value) = i32::try_from(value) {
6163 Value::Integer(value)
6164 } else if let Ok(value) = i64::try_from(value) {
6165 Value::BigInt(value)
6166 } else {
6167 Value::Int128(value)
6168 }
6169}
6170
6171fn minimal_unsigned_value(value: u128) -> Value {
6172 if let Ok(value) = u32::try_from(value) {
6173 Value::UInteger(value)
6174 } else if let Ok(value) = u64::try_from(value) {
6175 Value::UBigInt(value)
6176 } else {
6177 Value::UInt128(value)
6178 }
6179}
6180
6181fn cast_value_to_type(value: Value, data_type: DataType) -> Result<Value> {
6182 match (data_type.clone(), value) {
6183 (_, Value::Null) => Ok(Value::Null),
6184 (DataType::Int8, Value::Integer(value)) => i8::try_from(value)
6185 .map(|_| Value::Integer(value))
6186 .map_err(|_| {
6187 HematiteError::ParseError("Cannot CAST out-of-range INT AS INT8".to_string())
6188 }),
6189 (DataType::Int8, Value::Blob(bytes)) => decode_signed_blob(&bytes, 1, "INT8")
6190 .and_then(|value| {
6191 i8::try_from(value)
6192 .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS INT8".to_string()))
6193 })
6194 .map(|value| Value::Integer(value as i32)),
6195 (DataType::Int16, Value::Integer(value)) => i16::try_from(value)
6196 .map(|_| Value::Integer(value))
6197 .map_err(|_| {
6198 HematiteError::ParseError("Cannot CAST out-of-range INT AS INT16".to_string())
6199 }),
6200 (DataType::Int16, Value::Blob(bytes)) => decode_signed_blob(&bytes, 2, "INT16")
6201 .and_then(|value| {
6202 i16::try_from(value)
6203 .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS INT16".to_string()))
6204 })
6205 .map(|value| Value::Integer(value as i32)),
6206 (DataType::Int, Value::Integer(value)) => Ok(Value::Integer(value)),
6207 (DataType::Int, Value::BigInt(value)) => {
6208 i32::try_from(value).map(Value::Integer).map_err(|_| {
6209 HematiteError::ParseError("Cannot CAST out-of-range INT64 AS INT".to_string())
6210 })
6211 }
6212 (DataType::Int, Value::Int128(value)) => {
6213 i32::try_from(value).map(Value::Integer).map_err(|_| {
6214 HematiteError::ParseError("Cannot CAST out-of-range INT128 AS INT".to_string())
6215 })
6216 }
6217 (DataType::Int, Value::Float32(value)) => Ok(Value::Integer(value as i32)),
6218 (DataType::Int, Value::Float(value)) => Ok(Value::Integer(value as i32)),
6219 (DataType::Int, Value::Boolean(true)) => Ok(Value::Integer(1)),
6220 (DataType::Int, Value::Boolean(false)) => Ok(Value::Integer(0)),
6221 (DataType::Int, Value::Text(value)) => value
6222 .parse::<i32>()
6223 .map(Value::Integer)
6224 .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS INT", value))),
6225 (DataType::Int, Value::Blob(bytes)) => decode_signed_blob(&bytes, 4, "INT")
6226 .and_then(|value| {
6227 i32::try_from(value)
6228 .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS INT".to_string()))
6229 })
6230 .map(Value::Integer),
6231 (DataType::Int64, Value::Integer(value)) => Ok(Value::BigInt(value as i64)),
6232 (DataType::Int64, Value::BigInt(value)) => Ok(Value::BigInt(value)),
6233 (DataType::Int64, Value::Int128(value)) => {
6234 i64::try_from(value).map(Value::BigInt).map_err(|_| {
6235 HematiteError::ParseError("Cannot CAST out-of-range INT128 AS INT64".to_string())
6236 })
6237 }
6238 (DataType::Int64, Value::Float32(value)) => Ok(Value::BigInt(value as i64)),
6239 (DataType::Int64, Value::Float(value)) => Ok(Value::BigInt(value as i64)),
6240 (DataType::Int64, Value::Boolean(true)) => Ok(Value::BigInt(1)),
6241 (DataType::Int64, Value::Boolean(false)) => Ok(Value::BigInt(0)),
6242 (DataType::Int64, Value::Text(value)) => value
6243 .parse::<i64>()
6244 .map(Value::BigInt)
6245 .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS INT64", value))),
6246 (DataType::Int64, Value::Blob(bytes)) => decode_signed_blob(&bytes, 8, "INT64")
6247 .and_then(|value| {
6248 i64::try_from(value)
6249 .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS INT64".to_string()))
6250 })
6251 .map(Value::BigInt),
6252 (DataType::Int128, Value::Integer(value)) => Ok(Value::Int128(value as i128)),
6253 (DataType::Int128, Value::BigInt(value)) => Ok(Value::Int128(value as i128)),
6254 (DataType::Int128, Value::Int128(value)) => Ok(Value::Int128(value)),
6255 (DataType::Int128, Value::Float32(value)) => Ok(Value::Int128(value as i128)),
6256 (DataType::Int128, Value::Float(value)) => Ok(Value::Int128(value as i128)),
6257 (DataType::Int128, Value::Boolean(true)) => Ok(Value::Int128(1)),
6258 (DataType::Int128, Value::Boolean(false)) => Ok(Value::Int128(0)),
6259 (DataType::Int128, Value::Text(value)) => value
6260 .parse::<i128>()
6261 .map(Value::Int128)
6262 .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS INT128", value))),
6263 (DataType::Int128, Value::Blob(bytes)) => {
6264 decode_signed_blob(&bytes, 16, "INT128").map(Value::Int128)
6265 }
6266 (DataType::UInt8, Value::Integer(value)) if value >= 0 => u8::try_from(value)
6267 .map(|value| Value::UInteger(value as u32))
6268 .map_err(|_| {
6269 HematiteError::ParseError("Cannot CAST out-of-range INT AS UINT8".to_string())
6270 }),
6271 (DataType::UInt8, Value::Blob(bytes)) => decode_unsigned_blob(&bytes, 1, "UINT8")
6272 .and_then(|value| {
6273 u8::try_from(value)
6274 .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS UINT8".to_string()))
6275 })
6276 .map(|value| Value::UInteger(value as u32)),
6277 (DataType::UInt16, Value::Integer(value)) if value >= 0 => u16::try_from(value)
6278 .map(|value| Value::UInteger(value as u32))
6279 .map_err(|_| {
6280 HematiteError::ParseError("Cannot CAST out-of-range INT AS UINT16".to_string())
6281 }),
6282 (DataType::UInt16, Value::Blob(bytes)) => decode_unsigned_blob(&bytes, 2, "UINT16")
6283 .and_then(|value| {
6284 u16::try_from(value).map_err(|_| {
6285 HematiteError::ParseError("Cannot CAST blob AS UINT16".to_string())
6286 })
6287 })
6288 .map(|value| Value::UInteger(value as u32)),
6289 (DataType::UInt, Value::Integer(value)) if value >= 0 => Ok(Value::UInteger(value as u32)),
6290 (DataType::UInt, Value::BigInt(value)) if value >= 0 => {
6291 u32::try_from(value).map(Value::UInteger).map_err(|_| {
6292 HematiteError::ParseError("Cannot CAST out-of-range INT64 AS UINT".to_string())
6293 })
6294 }
6295 (DataType::UInt, Value::Int128(value)) if value >= 0 => {
6296 u32::try_from(value).map(Value::UInteger).map_err(|_| {
6297 HematiteError::ParseError("Cannot CAST out-of-range INT128 AS UINT".to_string())
6298 })
6299 }
6300 (DataType::UInt, Value::UInteger(value)) => Ok(Value::UInteger(value)),
6301 (DataType::UInt, Value::UBigInt(value)) => {
6302 u32::try_from(value).map(Value::UInteger).map_err(|_| {
6303 HematiteError::ParseError("Cannot CAST out-of-range UINT64 AS UINT".to_string())
6304 })
6305 }
6306 (DataType::UInt, Value::UInt128(value)) => {
6307 u32::try_from(value).map(Value::UInteger).map_err(|_| {
6308 HematiteError::ParseError("Cannot CAST out-of-range UINT128 AS UINT".to_string())
6309 })
6310 }
6311 (DataType::UInt, Value::Float32(value)) if value >= 0.0 => {
6312 Ok(Value::UInteger(value as u32))
6313 }
6314 (DataType::UInt, Value::Float(value)) if value >= 0.0 => Ok(Value::UInteger(value as u32)),
6315 (DataType::UInt, Value::Boolean(true)) => Ok(Value::UInteger(1)),
6316 (DataType::UInt, Value::Boolean(false)) => Ok(Value::UInteger(0)),
6317 (DataType::UInt, Value::Text(value)) => value
6318 .parse::<u32>()
6319 .map(Value::UInteger)
6320 .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS UINT", value))),
6321 (DataType::UInt, Value::Blob(bytes)) => decode_unsigned_blob(&bytes, 4, "UINT")
6322 .and_then(|value| {
6323 u32::try_from(value)
6324 .map_err(|_| HematiteError::ParseError("Cannot CAST blob AS UINT".to_string()))
6325 })
6326 .map(Value::UInteger),
6327 (DataType::UInt64, Value::Integer(value)) if value >= 0 => Ok(Value::UBigInt(value as u64)),
6328 (DataType::UInt64, Value::BigInt(value)) if value >= 0 => Ok(Value::UBigInt(value as u64)),
6329 (DataType::UInt64, Value::Int128(value)) if value >= 0 => {
6330 u64::try_from(value).map(Value::UBigInt).map_err(|_| {
6331 HematiteError::ParseError("Cannot CAST out-of-range INT128 AS UINT64".to_string())
6332 })
6333 }
6334 (DataType::UInt64, Value::UInteger(value)) => Ok(Value::UBigInt(value as u64)),
6335 (DataType::UInt64, Value::UBigInt(value)) => Ok(Value::UBigInt(value)),
6336 (DataType::UInt64, Value::UInt128(value)) => {
6337 u64::try_from(value).map(Value::UBigInt).map_err(|_| {
6338 HematiteError::ParseError("Cannot CAST out-of-range UINT128 AS UINT64".to_string())
6339 })
6340 }
6341 (DataType::UInt64, Value::Float32(value)) if value >= 0.0 => {
6342 Ok(Value::UBigInt(value as u64))
6343 }
6344 (DataType::UInt64, Value::Float(value)) if value >= 0.0 => Ok(Value::UBigInt(value as u64)),
6345 (DataType::UInt64, Value::Boolean(true)) => Ok(Value::UBigInt(1)),
6346 (DataType::UInt64, Value::Boolean(false)) => Ok(Value::UBigInt(0)),
6347 (DataType::UInt64, Value::Text(value)) => value
6348 .parse::<u64>()
6349 .map(Value::UBigInt)
6350 .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS UINT64", value))),
6351 (DataType::UInt64, Value::Blob(bytes)) => decode_unsigned_blob(&bytes, 8, "UINT64")
6352 .and_then(|value| {
6353 u64::try_from(value).map_err(|_| {
6354 HematiteError::ParseError("Cannot CAST blob AS UINT64".to_string())
6355 })
6356 })
6357 .map(Value::UBigInt),
6358 (DataType::UInt128, Value::Integer(value)) if value >= 0 => {
6359 Ok(Value::UInt128(value as u128))
6360 }
6361 (DataType::UInt128, Value::BigInt(value)) if value >= 0 => {
6362 Ok(Value::UInt128(value as u128))
6363 }
6364 (DataType::UInt128, Value::Int128(value)) if value >= 0 => {
6365 Ok(Value::UInt128(value as u128))
6366 }
6367 (DataType::UInt128, Value::UInteger(value)) => Ok(Value::UInt128(value as u128)),
6368 (DataType::UInt128, Value::UBigInt(value)) => Ok(Value::UInt128(value as u128)),
6369 (DataType::UInt128, Value::UInt128(value)) => Ok(Value::UInt128(value)),
6370 (DataType::UInt128, Value::Float32(value)) if value >= 0.0 => {
6371 Ok(Value::UInt128(value as u128))
6372 }
6373 (DataType::UInt128, Value::Float(value)) if value >= 0.0 => {
6374 Ok(Value::UInt128(value as u128))
6375 }
6376 (DataType::UInt128, Value::Boolean(true)) => Ok(Value::UInt128(1)),
6377 (DataType::UInt128, Value::Boolean(false)) => Ok(Value::UInt128(0)),
6378 (DataType::UInt128, Value::Text(value)) => value
6379 .parse::<u128>()
6380 .map(Value::UInt128)
6381 .map_err(|_| HematiteError::ParseError(format!("Cannot CAST '{}' AS UINT128", value))),
6382 (DataType::UInt128, Value::Blob(bytes)) => {
6383 decode_unsigned_blob(&bytes, 16, "UINT128").map(Value::UInt128)
6384 }
6385 (DataType::Text, value) => cast_value_to_text_string(value).map(Value::Text),
6386 (DataType::Char(length), value) => {
6387 coerce_char_value(cast_value_to_text_string(value)?, length, "CAST")
6388 }
6389 (DataType::VarChar(length), value) => {
6390 coerce_varchar_value(cast_value_to_text_string(value)?, length, "CAST")
6391 }
6392 (DataType::Binary(length), value) => coerce_binary_value(value, length, "CAST", true),
6393 (DataType::VarBinary(length), value) => coerce_binary_value(value, length, "CAST", false),
6394 (DataType::Enum(values), value) => coerce_enum_value(value, &values, "CAST"),
6395 (DataType::Boolean, Value::Boolean(value)) => Ok(Value::Boolean(value)),
6396 (DataType::Boolean, Value::Integer(value)) => Ok(Value::Boolean(value != 0)),
6397 (DataType::Boolean, Value::BigInt(value)) => Ok(Value::Boolean(value != 0)),
6398 (DataType::Boolean, Value::Int128(value)) => Ok(Value::Boolean(value != 0)),
6399 (DataType::Boolean, Value::UInteger(value)) => Ok(Value::Boolean(value != 0)),
6400 (DataType::Boolean, Value::UBigInt(value)) => Ok(Value::Boolean(value != 0)),
6401 (DataType::Boolean, Value::UInt128(value)) => Ok(Value::Boolean(value != 0)),
6402 (DataType::Boolean, Value::Float32(value)) => Ok(Value::Boolean(value != 0.0)),
6403 (DataType::Boolean, Value::Float(value)) => Ok(Value::Boolean(value != 0.0)),
6404 (DataType::Boolean, Value::Text(value)) => match value.to_ascii_uppercase().as_str() {
6405 "TRUE" | "1" => Ok(Value::Boolean(true)),
6406 "FALSE" | "0" => Ok(Value::Boolean(false)),
6407 _ => Err(HematiteError::ParseError(format!(
6408 "Cannot CAST '{}' AS BOOLEAN",
6409 value
6410 ))),
6411 },
6412 (data_type @ (DataType::Float32 | DataType::Float), Value::Text(value)) => value
6413 .parse::<f64>()
6414 .map(|value| make_float_value(&data_type, value))
6415 .map_err(|_| {
6416 HematiteError::ParseError(format!(
6417 "Cannot CAST '{}' AS {}",
6418 value,
6419 float_type_name(&data_type)
6420 ))
6421 }),
6422 (data_type @ (DataType::Float32 | DataType::Float), Value::Boolean(true)) => {
6423 Ok(make_float_value(&data_type, 1.0))
6424 }
6425 (data_type @ (DataType::Float32 | DataType::Float), Value::Boolean(false)) => {
6426 Ok(make_float_value(&data_type, 0.0))
6427 }
6428 (data_type @ (DataType::Float32 | DataType::Float), value) => {
6429 if let Some(number) = numeric_value_as_f64(&value) {
6430 Ok(make_float_value(&data_type, number))
6431 } else {
6432 Err(HematiteError::ParseError(format!(
6433 "Cannot CAST '{:?}' AS {}",
6434 value,
6435 float_type_name(&data_type)
6436 )))
6437 }
6438 }
6439 (DataType::Decimal { precision, scale }, value) => {
6440 let decimal = coerce_decimal_value(value)?;
6441 if !decimal.fits_precision_scale(precision, scale) {
6442 return Err(HematiteError::ParseError(format!(
6443 "Cannot CAST decimal outside declared precision/scale AS {}",
6444 data_type.base_name()
6445 )));
6446 }
6447 Ok(Value::Decimal(decimal))
6448 }
6449 (DataType::Blob, Value::Blob(value)) => Ok(Value::Blob(value)),
6450 (DataType::Blob, Value::Text(value)) => Ok(Value::Blob(value.into_bytes())),
6451 (DataType::Blob, Value::Integer(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6452 (DataType::Blob, Value::BigInt(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6453 (DataType::Blob, Value::Int128(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6454 (DataType::Blob, Value::UInteger(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6455 (DataType::Blob, Value::UBigInt(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6456 (DataType::Blob, Value::UInt128(value)) => Ok(Value::Blob(value.to_le_bytes().to_vec())),
6457 (DataType::Date, Value::Date(value)) => Ok(Value::Date(value)),
6458 (DataType::Date, Value::Text(value)) => Ok(Value::Date(validate_date_string(&value)?)),
6459 (DataType::Time, Value::Time(value)) => Ok(Value::Time(value)),
6460 (DataType::Time, Value::Text(value)) => Ok(Value::Time(validate_time_string(&value)?)),
6461 (DataType::DateTime, Value::DateTime(value)) => Ok(Value::DateTime(value)),
6462 (DataType::DateTime, Value::Text(value)) => {
6463 Ok(Value::DateTime(validate_datetime_string(&value)?))
6464 }
6465 (DataType::TimeWithTimeZone, Value::TimeWithTimeZone(value)) => {
6466 Ok(Value::TimeWithTimeZone(value))
6467 }
6468 (DataType::TimeWithTimeZone, Value::Text(value)) => Ok(Value::TimeWithTimeZone(
6469 validate_time_with_time_zone_string(&value)?,
6470 )),
6471 (DataType::IntervalYearMonth, value) => Ok(Value::IntervalYearMonth(
6472 IntervalYearMonthValue::parse(&cast_value_to_text_string(value)?)?,
6473 )),
6474 (DataType::IntervalDaySecond, value) => Ok(Value::IntervalDaySecond(
6475 IntervalDaySecondValue::parse(&cast_value_to_text_string(value)?)?,
6476 )),
6477 (data_type, value) => Err(HematiteError::ParseError(format!(
6478 "Cannot CAST {:?} AS {}",
6479 value,
6480 data_type.name()
6481 ))),
6482 }
6483}
6484
6485fn evaluate_scalar_function(function: ScalarFunction, args: Vec<Value>) -> Result<Value> {
6486 match function {
6487 ScalarFunction::Coalesce => evaluate_coalesce(args),
6488 ScalarFunction::IfNull => evaluate_ifnull(args),
6489 ScalarFunction::NullIf => evaluate_nullif(args),
6490 ScalarFunction::DateFn => evaluate_date_fn(args),
6491 ScalarFunction::TimeFn => evaluate_time_fn(args),
6492 ScalarFunction::Year => evaluate_year(args),
6493 ScalarFunction::Month => evaluate_month(args),
6494 ScalarFunction::Day => evaluate_day(args),
6495 ScalarFunction::Hour => evaluate_hour(args),
6496 ScalarFunction::Minute => evaluate_minute(args),
6497 ScalarFunction::Second => evaluate_second(args),
6498 ScalarFunction::TimeToSec => evaluate_time_to_sec(args),
6499 ScalarFunction::SecToTime => evaluate_sec_to_time(args),
6500 ScalarFunction::UnixTimestamp => evaluate_unix_timestamp(args),
6501 ScalarFunction::Lower => evaluate_lower(args),
6502 ScalarFunction::Upper => evaluate_upper(args),
6503 ScalarFunction::Length => evaluate_length(args),
6504 ScalarFunction::OctetLength => evaluate_octet_length(args),
6505 ScalarFunction::BitLength => evaluate_bit_length(args),
6506 ScalarFunction::Trim => evaluate_trim(args),
6507 ScalarFunction::Abs => evaluate_abs(args),
6508 ScalarFunction::Round => evaluate_round(args),
6509 ScalarFunction::Concat => evaluate_concat(args),
6510 ScalarFunction::ConcatWs => evaluate_concat_ws(args),
6511 ScalarFunction::Substring => evaluate_substring(args),
6512 ScalarFunction::LeftFn => evaluate_left(args),
6513 ScalarFunction::RightFn => evaluate_right(args),
6514 ScalarFunction::Greatest => evaluate_extremum("GREATEST", args, true),
6515 ScalarFunction::Least => evaluate_extremum("LEAST", args, false),
6516 ScalarFunction::Replace => evaluate_replace(args),
6517 ScalarFunction::Repeat => evaluate_repeat(args),
6518 ScalarFunction::Reverse => evaluate_reverse(args),
6519 ScalarFunction::Locate => evaluate_locate(args),
6520 ScalarFunction::Hex => evaluate_hex(args),
6521 ScalarFunction::Unhex => evaluate_unhex(args),
6522 ScalarFunction::Ceil => evaluate_ceil(args),
6523 ScalarFunction::Floor => evaluate_floor(args),
6524 ScalarFunction::Power => evaluate_power(args),
6525 }
6526}
6527
6528fn evaluate_coalesce(args: Vec<Value>) -> Result<Value> {
6529 if args.is_empty() {
6530 return Err(HematiteError::ParseError(
6531 "COALESCE requires at least one argument".to_string(),
6532 ));
6533 }
6534
6535 for arg in args {
6536 if !arg.is_null() {
6537 return Ok(arg);
6538 }
6539 }
6540
6541 Ok(Value::Null)
6542}
6543
6544fn evaluate_date_fn(args: Vec<Value>) -> Result<Value> {
6545 expect_unary_temporal_function("DATE", args, |value| {
6546 Ok(Value::Date(extract_date_component("DATE", value)?))
6547 })
6548}
6549
6550fn evaluate_time_fn(args: Vec<Value>) -> Result<Value> {
6551 expect_unary_temporal_function("TIME", args, |value| {
6552 Ok(Value::Time(extract_time_component("TIME", value)?))
6553 })
6554}
6555
6556fn evaluate_year(args: Vec<Value>) -> Result<Value> {
6557 expect_unary_temporal_function("YEAR", args, |value| {
6558 let (year, _, _) = extract_date_component("YEAR", value)?.components();
6559 Ok(Value::Integer(year))
6560 })
6561}
6562
6563fn evaluate_month(args: Vec<Value>) -> Result<Value> {
6564 expect_unary_temporal_function("MONTH", args, |value| {
6565 let (_, month, _) = extract_date_component("MONTH", value)?.components();
6566 Ok(Value::Integer(month as i32))
6567 })
6568}
6569
6570fn evaluate_day(args: Vec<Value>) -> Result<Value> {
6571 expect_unary_temporal_function("DAY", args, |value| {
6572 let (_, _, day) = extract_date_component("DAY", value)?.components();
6573 Ok(Value::Integer(day as i32))
6574 })
6575}
6576
6577fn evaluate_hour(args: Vec<Value>) -> Result<Value> {
6578 expect_unary_temporal_function("HOUR", args, |value| {
6579 let (hour, _, _) = extract_time_component("HOUR", value)?.components();
6580 Ok(Value::Integer(hour as i32))
6581 })
6582}
6583
6584fn evaluate_minute(args: Vec<Value>) -> Result<Value> {
6585 expect_unary_temporal_function("MINUTE", args, |value| {
6586 let (_, minute, _) = extract_time_component("MINUTE", value)?.components();
6587 Ok(Value::Integer(minute as i32))
6588 })
6589}
6590
6591fn evaluate_second(args: Vec<Value>) -> Result<Value> {
6592 expect_unary_temporal_function("SECOND", args, |value| {
6593 let (_, _, second) = extract_time_component("SECOND", value)?.components();
6594 Ok(Value::Integer(second as i32))
6595 })
6596}
6597
6598fn evaluate_time_to_sec(args: Vec<Value>) -> Result<Value> {
6599 expect_unary_temporal_function("TIME_TO_SEC", args, |value| {
6600 Ok(Value::BigInt(
6601 extract_time_component("TIME_TO_SEC", value)?.seconds_since_midnight() as i64,
6602 ))
6603 })
6604}
6605
6606fn evaluate_sec_to_time(args: Vec<Value>) -> Result<Value> {
6607 if args.len() != 1 {
6608 return Err(HematiteError::ParseError(
6609 "SEC_TO_TIME requires exactly one argument".to_string(),
6610 ));
6611 }
6612
6613 let value = args.into_iter().next().expect("validated arity");
6614 match value {
6615 Value::Null => Ok(Value::Null),
6616 Value::Integer(value) => Ok(Value::Time(TimeValue::from_seconds_since_midnight(
6617 add_wrapped_seconds(0, value as i64),
6618 ))),
6619 Value::BigInt(value) => Ok(Value::Time(TimeValue::from_seconds_since_midnight(
6620 add_wrapped_seconds(0, value),
6621 ))),
6622 value => Err(HematiteError::ParseError(format!(
6623 "SEC_TO_TIME requires an integer value, found {:?}",
6624 value
6625 ))),
6626 }
6627}
6628
6629fn evaluate_unix_timestamp(args: Vec<Value>) -> Result<Value> {
6630 expect_unary_temporal_function("UNIX_TIMESTAMP", args, |value| {
6631 let timestamp = extract_timestamp_component("UNIX_TIMESTAMP", value)?;
6632 Ok(Value::BigInt(timestamp.seconds_since_epoch()))
6633 })
6634}
6635
6636fn evaluate_ifnull(args: Vec<Value>) -> Result<Value> {
6637 if args.len() != 2 {
6638 return Err(HematiteError::ParseError(
6639 "IFNULL requires exactly two arguments".to_string(),
6640 ));
6641 }
6642
6643 let mut args = args.into_iter();
6644 let first = args.next().expect("ifnull validated arity");
6645 let second = args.next().expect("ifnull validated arity");
6646 if first.is_null() {
6647 Ok(second)
6648 } else {
6649 Ok(first)
6650 }
6651}
6652
6653fn evaluate_nullif(args: Vec<Value>) -> Result<Value> {
6654 if args.len() != 2 {
6655 return Err(HematiteError::ParseError(
6656 "NULLIF requires exactly two arguments".to_string(),
6657 ));
6658 }
6659
6660 let mut args = args.into_iter();
6661 let left = args.next().expect("nullif validated arity");
6662 let right = args.next().expect("nullif validated arity");
6663 if left.is_null() {
6664 return Ok(Value::Null);
6665 }
6666 if right.is_null() {
6667 return Ok(left);
6668 }
6669 if sql_values_equal(&left, &right, None) {
6670 Ok(Value::Null)
6671 } else {
6672 Ok(left)
6673 }
6674}
6675
6676fn evaluate_lower(args: Vec<Value>) -> Result<Value> {
6677 expect_unary_text_function("LOWER", args, |text| Ok(Value::Text(text.to_lowercase())))
6678}
6679
6680fn evaluate_upper(args: Vec<Value>) -> Result<Value> {
6681 expect_unary_text_function("UPPER", args, |text| Ok(Value::Text(text.to_uppercase())))
6682}
6683
6684fn evaluate_length(args: Vec<Value>) -> Result<Value> {
6685 expect_unary_length_function("LENGTH", args, |value| match value {
6686 Value::Text(text) => {
6687 let len = i32::try_from(text.chars().count()).map_err(|_| {
6688 HematiteError::ParseError("LENGTH result overflowed INT".to_string())
6689 })?;
6690 Ok(Value::Integer(len))
6691 }
6692 Value::Blob(bytes) => {
6693 let len = i32::try_from(bytes.len()).map_err(|_| {
6694 HematiteError::ParseError("LENGTH result overflowed INT".to_string())
6695 })?;
6696 Ok(Value::Integer(len))
6697 }
6698 value => Err(HematiteError::ParseError(format!(
6699 "LENGTH requires a text or blob value, found {:?}",
6700 value
6701 ))),
6702 })
6703}
6704
6705fn evaluate_octet_length(args: Vec<Value>) -> Result<Value> {
6706 expect_unary_length_function("OCTET_LENGTH", args, |value| {
6707 let len = match value {
6708 Value::Text(text) => text.len(),
6709 Value::Enum(text) => text.len(),
6710 Value::Blob(bytes) => bytes.len(),
6711 value => {
6712 return Err(HematiteError::ParseError(format!(
6713 "OCTET_LENGTH requires a text or blob value, found {:?}",
6714 value
6715 )))
6716 }
6717 };
6718 let len = i32::try_from(len).map_err(|_| {
6719 HematiteError::ParseError("OCTET_LENGTH result overflowed INT".to_string())
6720 })?;
6721 Ok(Value::Integer(len))
6722 })
6723}
6724
6725fn evaluate_bit_length(args: Vec<Value>) -> Result<Value> {
6726 expect_unary_length_function("BIT_LENGTH", args, |value| {
6727 let len = match evaluate_octet_length(vec![value])? {
6728 Value::Integer(length) => length,
6729 Value::Null => return Ok(Value::Null),
6730 _ => unreachable!("validated OCTET_LENGTH shape"),
6731 };
6732 len.checked_mul(8).map(Value::Integer).ok_or_else(|| {
6733 HematiteError::ParseError("BIT_LENGTH result overflowed INT".to_string())
6734 })
6735 })
6736}
6737
6738fn evaluate_trim(args: Vec<Value>) -> Result<Value> {
6739 expect_unary_text_function("TRIM", args, |text| {
6740 Ok(Value::Text(text.trim().to_string()))
6741 })
6742}
6743
6744fn expect_unary_text_function<F>(name: &str, args: Vec<Value>, f: F) -> Result<Value>
6745where
6746 F: FnOnce(&str) -> Result<Value>,
6747{
6748 if args.len() != 1 {
6749 return Err(HematiteError::ParseError(format!(
6750 "{} requires exactly one argument",
6751 name
6752 )));
6753 }
6754
6755 let value = args.into_iter().next().expect("validated unary arity");
6756 match value {
6757 Value::Null => Ok(Value::Null),
6758 Value::Text(text) => f(&text),
6759 value => Err(HematiteError::ParseError(format!(
6760 "{} requires a text value, found {:?}",
6761 name, value
6762 ))),
6763 }
6764}
6765
6766fn expect_unary_length_function<F>(name: &str, args: Vec<Value>, f: F) -> Result<Value>
6767where
6768 F: FnOnce(Value) -> Result<Value>,
6769{
6770 if args.len() != 1 {
6771 return Err(HematiteError::ParseError(format!(
6772 "{} requires exactly one argument",
6773 name
6774 )));
6775 }
6776
6777 let value = args.into_iter().next().expect("validated unary arity");
6778 match value {
6779 Value::Null => Ok(Value::Null),
6780 value => f(value),
6781 }
6782}
6783
6784fn expect_unary_temporal_function<F>(name: &str, args: Vec<Value>, f: F) -> Result<Value>
6785where
6786 F: FnOnce(Value) -> Result<Value>,
6787{
6788 if args.len() != 1 {
6789 return Err(HematiteError::ParseError(format!(
6790 "{} requires exactly one argument",
6791 name
6792 )));
6793 }
6794
6795 let value = args.into_iter().next().expect("validated unary arity");
6796 match value {
6797 Value::Null => Ok(Value::Null),
6798 value => f(value),
6799 }
6800}
6801
6802fn evaluate_abs(args: Vec<Value>) -> Result<Value> {
6803 expect_unary_numeric_function("ABS", args, |value| match value {
6804 Value::Integer(value) => {
6805 if value == i32::MIN {
6806 return Err(HematiteError::ParseError("ABS overflowed INT".to_string()));
6807 }
6808 Ok(Value::Integer(value.abs()))
6809 }
6810 Value::BigInt(value) => {
6811 if value == i64::MIN {
6812 return Err(HematiteError::ParseError(
6813 "ABS overflowed INT64".to_string(),
6814 ));
6815 }
6816 Ok(Value::BigInt(value.abs()))
6817 }
6818 Value::Int128(value) => value
6819 .checked_abs()
6820 .map(Value::Int128)
6821 .ok_or_else(|| HematiteError::ParseError("ABS overflowed INT128".to_string())),
6822 Value::UInteger(value) => Ok(Value::UInteger(value)),
6823 Value::UBigInt(value) => Ok(Value::UBigInt(value)),
6824 Value::UInt128(value) => Ok(Value::UInt128(value)),
6825 Value::Float32(value) => Ok(Value::Float32(value.abs())),
6826 Value::Float(value) => Ok(Value::Float(value.abs())),
6827 _ => unreachable!("validated numeric input"),
6828 })
6829}
6830
6831fn evaluate_round(args: Vec<Value>) -> Result<Value> {
6832 if args.is_empty() || args.len() > 2 {
6833 return Err(HematiteError::ParseError(
6834 "ROUND requires one or two arguments".to_string(),
6835 ));
6836 }
6837
6838 let mut args = args.into_iter();
6839 let value = args.next().expect("validated round arity");
6840 let precision = match args.next() {
6841 Some(Value::Null) => return Ok(Value::Null),
6842 Some(Value::Integer(value)) => value,
6843 Some(value) => {
6844 return Err(HematiteError::ParseError(format!(
6845 "ROUND precision requires an integer value, found {:?}",
6846 value
6847 )))
6848 }
6849 None => 0,
6850 };
6851
6852 match value {
6853 Value::Null => Ok(Value::Null),
6854 Value::Integer(value) => round_integer(value, precision),
6855 Value::BigInt(value) => round_bigint(value, precision),
6856 Value::Int128(value) => round_int128(value, precision),
6857 Value::UInteger(value) => round_uinteger(value, precision),
6858 Value::UBigInt(value) => round_ubigint(value, precision),
6859 Value::UInt128(value) => round_uint128(value, precision),
6860 Value::Float32(value) => Ok(Value::Float32(round_float(value as f64, precision) as f32)),
6861 Value::Float(value) => Ok(Value::Float(round_float(value, precision))),
6862 value => Err(HematiteError::ParseError(format!(
6863 "ROUND requires a numeric value, found {:?}",
6864 value
6865 ))),
6866 }
6867}
6868
6869fn evaluate_concat(args: Vec<Value>) -> Result<Value> {
6870 if args.is_empty() {
6871 return Err(HematiteError::ParseError(
6872 "CONCAT requires at least one argument".to_string(),
6873 ));
6874 }
6875
6876 let mut out = String::new();
6877 for arg in args {
6878 if arg.is_null() {
6879 return Ok(Value::Null);
6880 }
6881 out.push_str(&coerce_value_to_string("CONCAT", arg)?);
6882 }
6883 Ok(Value::Text(out))
6884}
6885
6886fn evaluate_concat_ws(args: Vec<Value>) -> Result<Value> {
6887 if args.len() < 2 {
6888 return Err(HematiteError::ParseError(
6889 "CONCAT_WS requires at least two arguments".to_string(),
6890 ));
6891 }
6892
6893 let mut args = args.into_iter();
6894 let separator = args.next().expect("concat_ws validated arity");
6895 if separator.is_null() {
6896 return Ok(Value::Null);
6897 }
6898 let separator = coerce_value_to_string("CONCAT_WS", separator)?;
6899
6900 let mut parts = Vec::new();
6901 for arg in args {
6902 if arg.is_null() {
6903 continue;
6904 }
6905 parts.push(coerce_value_to_string("CONCAT_WS", arg)?);
6906 }
6907
6908 Ok(Value::Text(parts.join(&separator)))
6909}
6910
6911fn evaluate_substring(args: Vec<Value>) -> Result<Value> {
6912 if args.len() != 2 && args.len() != 3 {
6913 return Err(HematiteError::ParseError(
6914 "SUBSTRING requires two or three arguments".to_string(),
6915 ));
6916 }
6917
6918 let mut args = args.into_iter();
6919 let text = args.next().expect("validated substring arity");
6920 let start = args.next().expect("validated substring arity");
6921 let len = args.next();
6922
6923 if text.is_null() || start.is_null() || len.as_ref().is_some_and(Value::is_null) {
6924 return Ok(Value::Null);
6925 }
6926
6927 let text = expect_text_argument("SUBSTRING", text)?;
6928 let start = expect_integer_argument("SUBSTRING", start, "start position")?;
6929 let len = len
6930 .map(|value| expect_integer_argument("SUBSTRING", value, "length"))
6931 .transpose()?;
6932
6933 substring_chars(&text, start, len)
6934}
6935
6936fn evaluate_left(args: Vec<Value>) -> Result<Value> {
6937 if args.len() != 2 {
6938 return Err(HematiteError::ParseError(
6939 "LEFT requires exactly two arguments".to_string(),
6940 ));
6941 }
6942
6943 let mut args = args.into_iter();
6944 let text = args.next().expect("validated left arity");
6945 let len = args.next().expect("validated left arity");
6946 if text.is_null() || len.is_null() {
6947 return Ok(Value::Null);
6948 }
6949
6950 let text = expect_text_argument("LEFT", text)?;
6951 let len = expect_integer_argument("LEFT", len, "length")?;
6952 if len < 0 {
6953 return Ok(Value::Text(String::new()));
6954 }
6955
6956 let out = text.chars().take(len as usize).collect::<String>();
6957 Ok(Value::Text(out))
6958}
6959
6960fn evaluate_right(args: Vec<Value>) -> Result<Value> {
6961 if args.len() != 2 {
6962 return Err(HematiteError::ParseError(
6963 "RIGHT requires exactly two arguments".to_string(),
6964 ));
6965 }
6966
6967 let mut args = args.into_iter();
6968 let text = args.next().expect("validated right arity");
6969 let len = args.next().expect("validated right arity");
6970 if text.is_null() || len.is_null() {
6971 return Ok(Value::Null);
6972 }
6973
6974 let text = expect_text_argument("RIGHT", text)?;
6975 let len = expect_integer_argument("RIGHT", len, "length")?;
6976 if len < 0 {
6977 return Ok(Value::Text(String::new()));
6978 }
6979
6980 let chars = text.chars().collect::<Vec<_>>();
6981 let take = len as usize;
6982 let start = chars.len().saturating_sub(take);
6983 let out = chars[start..].iter().collect::<String>();
6984 Ok(Value::Text(out))
6985}
6986
6987fn expect_unary_numeric_function<F>(name: &str, args: Vec<Value>, f: F) -> Result<Value>
6988where
6989 F: FnOnce(Value) -> Result<Value>,
6990{
6991 if args.len() != 1 {
6992 return Err(HematiteError::ParseError(format!(
6993 "{} requires exactly one argument",
6994 name
6995 )));
6996 }
6997
6998 let value = args.into_iter().next().expect("validated unary arity");
6999 match value {
7000 Value::Null => Ok(Value::Null),
7001 Value::Integer(_)
7002 | Value::BigInt(_)
7003 | Value::Int128(_)
7004 | Value::UInteger(_)
7005 | Value::UBigInt(_)
7006 | Value::UInt128(_)
7007 | Value::Float32(_)
7008 | Value::Float(_) => f(value),
7009 value => Err(HematiteError::ParseError(format!(
7010 "{} requires a numeric value, found {:?}",
7011 name, value
7012 ))),
7013 }
7014}
7015
7016fn expect_numeric_argument(function_name: &str, value: Value) -> Result<f64> {
7017 match value {
7018 Value::Decimal(value) => value.to_string().parse::<f64>().map_err(|_| {
7019 HematiteError::ParseError(format!(
7020 "{} requires a numeric value, found {:?}",
7021 function_name,
7022 Value::Decimal(value.clone())
7023 ))
7024 }),
7025 value if numeric_value_as_f64(&value).is_some() => {
7026 Ok(numeric_value_as_f64(&value).unwrap())
7027 }
7028 value => Err(HematiteError::ParseError(format!(
7029 "{} requires a numeric value, found {:?}",
7030 function_name, value
7031 ))),
7032 }
7033}
7034
7035fn coerce_value_to_string(function_name: &str, value: Value) -> Result<String> {
7036 match value {
7037 Value::Text(text) => Ok(text),
7038 Value::Enum(text) => Ok(text),
7039 Value::Decimal(text) => Ok(text.to_string()),
7040 Value::Date(text) => Ok(text.to_string()),
7041 Value::Time(text) => Ok(text.to_string()),
7042 Value::DateTime(text) => Ok(text.to_string()),
7043 Value::TimeWithTimeZone(text) => Ok(text.to_string()),
7044 Value::IntervalYearMonth(text) => Ok(text.to_string()),
7045 Value::IntervalDaySecond(text) => Ok(text.to_string()),
7046 Value::Integer(value) => Ok(value.to_string()),
7047 Value::BigInt(value) => Ok(value.to_string()),
7048 Value::Int128(value) => Ok(value.to_string()),
7049 Value::UInteger(value) => Ok(value.to_string()),
7050 Value::UBigInt(value) => Ok(value.to_string()),
7051 Value::UInt128(value) => Ok(value.to_string()),
7052 Value::Float32(value) => Ok(value.to_string()),
7053 Value::Float(value) => Ok(value.to_string()),
7054 Value::Boolean(true) => Ok("TRUE".to_string()),
7055 Value::Boolean(false) => Ok("FALSE".to_string()),
7056 Value::Blob(value) => Ok(String::from_utf8_lossy(&value).into_owned()),
7057 Value::Null => Err(HematiteError::ParseError(format!(
7058 "{} cannot stringify NULL directly",
7059 function_name
7060 ))),
7061 }
7062}
7063
7064fn expect_text_argument(function_name: &str, value: Value) -> Result<String> {
7065 match value {
7066 Value::Text(text) => Ok(text),
7067 Value::Enum(text) => Ok(text),
7068 Value::Decimal(text) => Ok(text.to_string()),
7069 Value::Date(text) => Ok(text.to_string()),
7070 Value::Time(text) => Ok(text.to_string()),
7071 Value::DateTime(text) => Ok(text.to_string()),
7072 Value::TimeWithTimeZone(text) => Ok(text.to_string()),
7073 Value::Integer(value) => Ok(value.to_string()),
7074 Value::BigInt(value) => Ok(value.to_string()),
7075 Value::Int128(value) => Ok(value.to_string()),
7076 Value::UInteger(value) => Ok(value.to_string()),
7077 Value::UBigInt(value) => Ok(value.to_string()),
7078 Value::UInt128(value) => Ok(value.to_string()),
7079 Value::Float32(value) => Ok(value.to_string()),
7080 Value::Float(value) => Ok(value.to_string()),
7081 value => Err(HematiteError::ParseError(format!(
7082 "{} requires a text value, found {:?}",
7083 function_name, value
7084 ))),
7085 }
7086}
7087
7088fn expect_integer_argument(function_name: &str, value: Value, label: &str) -> Result<i32> {
7089 match value {
7090 Value::Null => Err(HematiteError::ParseError(format!(
7091 "{} {} cannot be NULL",
7092 function_name, label
7093 ))),
7094 Value::Integer(value) => Ok(value),
7095 Value::BigInt(value) => i32::try_from(value).map_err(|_| {
7096 HematiteError::ParseError(format!(
7097 "{} {} requires a 32-bit integer value, found {:?}",
7098 function_name,
7099 label,
7100 Value::BigInt(value)
7101 ))
7102 }),
7103 Value::Int128(value) => i32::try_from(value).map_err(|_| {
7104 HematiteError::ParseError(format!(
7105 "{} {} requires a 32-bit integer value, found {:?}",
7106 function_name,
7107 label,
7108 Value::Int128(value)
7109 ))
7110 }),
7111 Value::UInteger(value) => i32::try_from(value).map_err(|_| {
7112 HematiteError::ParseError(format!(
7113 "{} {} requires a 32-bit integer value, found {:?}",
7114 function_name,
7115 label,
7116 Value::UInteger(value)
7117 ))
7118 }),
7119 Value::UBigInt(value) => i32::try_from(value).map_err(|_| {
7120 HematiteError::ParseError(format!(
7121 "{} {} requires a 32-bit integer value, found {:?}",
7122 function_name,
7123 label,
7124 Value::UBigInt(value)
7125 ))
7126 }),
7127 Value::UInt128(value) => i32::try_from(value).map_err(|_| {
7128 HematiteError::ParseError(format!(
7129 "{} {} requires a 32-bit integer value, found {:?}",
7130 function_name,
7131 label,
7132 Value::UInt128(value)
7133 ))
7134 }),
7135 Value::Float32(value) => Ok(value as i32),
7136 Value::Float(value) => Ok(value as i32),
7137 value => Err(HematiteError::ParseError(format!(
7138 "{} {} requires an integer value, found {:?}",
7139 function_name, label, value
7140 ))),
7141 }
7142}
7143
7144fn extract_date_component(function_name: &str, value: Value) -> Result<DateValue> {
7145 match value {
7146 Value::Date(value) => Ok(value),
7147 Value::DateTime(value) => Ok(value.components().0),
7148 Value::Text(value) | Value::Enum(value) => DateValue::parse(&value)
7149 .or_else(|_| DateTimeValue::parse(&value).map(|value| value.components().0))
7150 .map_err(|_| {
7151 HematiteError::ParseError(format!(
7152 "{} requires a DATE-like value, found '{}'",
7153 function_name, value
7154 ))
7155 }),
7156 value => Err(HematiteError::ParseError(format!(
7157 "{} requires a DATE-like value, found {:?}",
7158 function_name, value
7159 ))),
7160 }
7161}
7162
7163fn extract_time_component(function_name: &str, value: Value) -> Result<TimeValue> {
7164 match value {
7165 Value::Time(value) => Ok(value),
7166 Value::TimeWithTimeZone(value) => Ok(value.time()),
7167 Value::DateTime(value) => Ok(value.components().1),
7168 Value::Text(value) | Value::Enum(value) => TimeValue::parse(&value)
7169 .or_else(|_| TimeWithTimeZoneValue::parse(&value).map(|value| value.time()))
7170 .or_else(|_| DateTimeValue::parse(&value).map(|value| value.components().1))
7171 .map_err(|_| {
7172 HematiteError::ParseError(format!(
7173 "{} requires a TIME-like value, found '{}'",
7174 function_name, value
7175 ))
7176 }),
7177 value => Err(HematiteError::ParseError(format!(
7178 "{} requires a TIME-like value, found {:?}",
7179 function_name, value
7180 ))),
7181 }
7182}
7183
7184fn extract_timestamp_component(function_name: &str, value: Value) -> Result<DateTimeValue> {
7185 match value {
7186 Value::DateTime(value) => Ok(value),
7187 Value::Date(value) => Ok(DateTimeValue::from_seconds_since_epoch(
7188 value.days_since_epoch() as i64 * 86_400,
7189 )),
7190 Value::Text(value) | Value::Enum(value) => DateTimeValue::parse(&value)
7191 .or_else(|_| {
7192 DateValue::parse(&value).map(|value| {
7193 DateTimeValue::from_seconds_since_epoch(
7194 value.days_since_epoch() as i64 * 86_400,
7195 )
7196 })
7197 })
7198 .map_err(|_| {
7199 HematiteError::ParseError(format!(
7200 "{} requires a DATETIME-like value, found '{}'",
7201 function_name, value
7202 ))
7203 }),
7204 value => Err(HematiteError::ParseError(format!(
7205 "{} requires a DATETIME-like value, found {:?}",
7206 function_name, value
7207 ))),
7208 }
7209}
7210
7211fn substring_chars(text: &str, start: i32, len: Option<i32>) -> Result<Value> {
7212 let chars = text.chars().collect::<Vec<_>>();
7213 let start_index = if start > 0 {
7214 start.saturating_sub(1) as usize
7215 } else if start < 0 {
7216 chars.len().saturating_sub((-start) as usize)
7217 } else {
7218 0
7219 };
7220
7221 if let Some(len) = len {
7222 if len <= 0 {
7223 return Ok(Value::Text(String::new()));
7224 }
7225 let end = start_index.saturating_add(len as usize).min(chars.len());
7226 return Ok(Value::Text(
7227 chars[start_index.min(chars.len())..end].iter().collect(),
7228 ));
7229 }
7230
7231 Ok(Value::Text(
7232 chars[start_index.min(chars.len())..].iter().collect(),
7233 ))
7234}
7235
7236fn evaluate_extremum(function_name: &str, args: Vec<Value>, pick_greater: bool) -> Result<Value> {
7237 if args.len() < 2 {
7238 return Err(HematiteError::ParseError(format!(
7239 "{} requires at least two arguments",
7240 function_name
7241 )));
7242 }
7243
7244 if args.iter().any(Value::is_null) {
7245 return Ok(Value::Null);
7246 }
7247
7248 let mut values = args.into_iter();
7249 let mut best = values.next().expect("validated extremum arity");
7250 for value in values {
7251 let ordering = sql_partial_cmp(&value, &best, None).ok_or_else(|| {
7252 HematiteError::ParseError(format!(
7253 "{} requires mutually comparable arguments",
7254 function_name
7255 ))
7256 })?;
7257 let should_replace = if pick_greater {
7258 ordering.is_gt()
7259 } else {
7260 ordering.is_lt()
7261 };
7262 if should_replace {
7263 best = value;
7264 }
7265 }
7266
7267 Ok(best)
7268}
7269
7270fn evaluate_replace(args: Vec<Value>) -> Result<Value> {
7271 if args.len() != 3 {
7272 return Err(HematiteError::ParseError(
7273 "REPLACE requires exactly three arguments".to_string(),
7274 ));
7275 }
7276
7277 let mut args = args.into_iter();
7278 let text = args.next().expect("validated replace arity");
7279 let from = args.next().expect("validated replace arity");
7280 let to = args.next().expect("validated replace arity");
7281 if text.is_null() || from.is_null() || to.is_null() {
7282 return Ok(Value::Null);
7283 }
7284
7285 let text = expect_text_argument("REPLACE", text)?;
7286 let from = expect_text_argument("REPLACE", from)?;
7287 let to = expect_text_argument("REPLACE", to)?;
7288 Ok(Value::Text(text.replace(&from, &to)))
7289}
7290
7291fn evaluate_repeat(args: Vec<Value>) -> Result<Value> {
7292 if args.len() != 2 {
7293 return Err(HematiteError::ParseError(
7294 "REPEAT requires exactly two arguments".to_string(),
7295 ));
7296 }
7297
7298 let mut args = args.into_iter();
7299 let text = args.next().expect("validated repeat arity");
7300 let count = args.next().expect("validated repeat arity");
7301 if text.is_null() || count.is_null() {
7302 return Ok(Value::Null);
7303 }
7304
7305 let text = expect_text_argument("REPEAT", text)?;
7306 let count = expect_integer_argument("REPEAT", count, "count")?;
7307 if count <= 0 {
7308 return Ok(Value::Text(String::new()));
7309 }
7310
7311 let count = usize::try_from(count)
7312 .map_err(|_| HematiteError::ParseError("REPEAT count overflowed usize".to_string()))?;
7313 Ok(Value::Text(text.repeat(count)))
7314}
7315
7316fn evaluate_reverse(args: Vec<Value>) -> Result<Value> {
7317 if args.len() != 1 {
7318 return Err(HematiteError::ParseError(
7319 "REVERSE requires exactly one argument".to_string(),
7320 ));
7321 }
7322
7323 let value = args.into_iter().next().expect("validated reverse arity");
7324 if value.is_null() {
7325 return Ok(Value::Null);
7326 }
7327
7328 let text = expect_text_argument("REVERSE", value)?;
7329 Ok(Value::Text(text.chars().rev().collect()))
7330}
7331
7332fn evaluate_locate(args: Vec<Value>) -> Result<Value> {
7333 if args.len() != 2 && args.len() != 3 {
7334 return Err(HematiteError::ParseError(
7335 "LOCATE requires two or three arguments".to_string(),
7336 ));
7337 }
7338
7339 let mut args = args.into_iter();
7340 let needle = args.next().expect("validated locate arity");
7341 let haystack = args.next().expect("validated locate arity");
7342 let start = args.next();
7343 if needle.is_null() || haystack.is_null() || start.as_ref().is_some_and(Value::is_null) {
7344 return Ok(Value::Null);
7345 }
7346
7347 let needle = expect_text_argument("LOCATE", needle)?;
7348 let haystack = expect_text_argument("LOCATE", haystack)?;
7349 let start = start
7350 .map(|value| expect_integer_argument("LOCATE", value, "start position"))
7351 .transpose()?
7352 .unwrap_or(1);
7353
7354 let haystack_chars = haystack.chars().collect::<Vec<_>>();
7355 let needle_chars = needle.chars().collect::<Vec<_>>();
7356 let start_index = start.saturating_sub(1).max(0) as usize;
7357 if needle_chars.is_empty() {
7358 let position = start_index.min(haystack_chars.len()) + 1;
7359 return Ok(Value::Integer(position as i32));
7360 }
7361 if start_index >= haystack_chars.len() || needle_chars.len() > haystack_chars.len() {
7362 return Ok(Value::Integer(0));
7363 }
7364
7365 for index in start_index..=haystack_chars.len() - needle_chars.len() {
7366 if haystack_chars[index..index + needle_chars.len()] == needle_chars[..] {
7367 return Ok(Value::Integer((index + 1) as i32));
7368 }
7369 }
7370
7371 Ok(Value::Integer(0))
7372}
7373
7374fn evaluate_hex(args: Vec<Value>) -> Result<Value> {
7375 expect_unary_length_function("HEX", args, |value| {
7376 let bytes = match value {
7377 Value::Blob(bytes) => bytes,
7378 Value::Text(text) => text.into_bytes(),
7379 Value::Enum(text) => text.into_bytes(),
7380 value => {
7381 return Err(HematiteError::ParseError(format!(
7382 "HEX requires a text or blob value, found {:?}",
7383 value
7384 )))
7385 }
7386 };
7387 Ok(Value::Text(
7388 bytes.iter().map(|byte| format!("{byte:02X}")).collect(),
7389 ))
7390 })
7391}
7392
7393fn evaluate_unhex(args: Vec<Value>) -> Result<Value> {
7394 expect_unary_text_function("UNHEX", args, |text| {
7395 if text.len() % 2 != 0 {
7396 return Err(HematiteError::ParseError(
7397 "UNHEX requires an even number of hexadecimal digits".to_string(),
7398 ));
7399 }
7400
7401 let mut bytes = Vec::with_capacity(text.len() / 2);
7402 for index in (0..text.len()).step_by(2) {
7403 let byte = u8::from_str_radix(&text[index..index + 2], 16).map_err(|_| {
7404 HematiteError::ParseError("UNHEX requires only hexadecimal digits".to_string())
7405 })?;
7406 bytes.push(byte);
7407 }
7408 Ok(Value::Blob(bytes))
7409 })
7410}
7411
7412fn evaluate_ceil(args: Vec<Value>) -> Result<Value> {
7413 expect_unary_numeric_function("CEIL", args, |value| match value {
7414 Value::Integer(value) => Ok(Value::Integer(value)),
7415 Value::BigInt(value) => Ok(Value::BigInt(value)),
7416 Value::Int128(value) => Ok(Value::Int128(value)),
7417 Value::UInteger(value) => Ok(Value::UInteger(value)),
7418 Value::UBigInt(value) => Ok(Value::UBigInt(value)),
7419 Value::UInt128(value) => Ok(Value::UInt128(value)),
7420 Value::Float32(value) => Ok(Value::Float32(value.ceil())),
7421 Value::Float(value) => Ok(Value::Float(value.ceil())),
7422 _ => unreachable!("validated numeric input"),
7423 })
7424}
7425
7426fn evaluate_floor(args: Vec<Value>) -> Result<Value> {
7427 expect_unary_numeric_function("FLOOR", args, |value| match value {
7428 Value::Integer(value) => Ok(Value::Integer(value)),
7429 Value::BigInt(value) => Ok(Value::BigInt(value)),
7430 Value::Int128(value) => Ok(Value::Int128(value)),
7431 Value::UInteger(value) => Ok(Value::UInteger(value)),
7432 Value::UBigInt(value) => Ok(Value::UBigInt(value)),
7433 Value::UInt128(value) => Ok(Value::UInt128(value)),
7434 Value::Float32(value) => Ok(Value::Float32(value.floor())),
7435 Value::Float(value) => Ok(Value::Float(value.floor())),
7436 _ => unreachable!("validated numeric input"),
7437 })
7438}
7439
7440fn evaluate_power(args: Vec<Value>) -> Result<Value> {
7441 if args.len() != 2 {
7442 return Err(HematiteError::ParseError(
7443 "POWER requires exactly two arguments".to_string(),
7444 ));
7445 }
7446
7447 let mut args = args.into_iter();
7448 let base = args.next().expect("validated power arity");
7449 let exponent = args.next().expect("validated power arity");
7450 if base.is_null() || exponent.is_null() {
7451 return Ok(Value::Null);
7452 }
7453
7454 let base = expect_numeric_argument("POWER", base)?;
7455 let exponent = expect_numeric_argument("POWER", exponent)?;
7456 let value = base.powf(exponent);
7457 if !value.is_finite() {
7458 return Err(HematiteError::ParseError(
7459 "POWER produced a non-finite result".to_string(),
7460 ));
7461 }
7462
7463 Ok(Value::Float(value))
7464}
7465
7466fn round_integer(value: i32, precision: i32) -> Result<Value> {
7467 if precision >= 0 {
7468 return Ok(Value::Integer(value));
7469 }
7470
7471 let rounded = round_float(value as f64, precision);
7472 let rounded = i32::try_from(rounded as i64)
7473 .map_err(|_| HematiteError::ParseError("ROUND overflowed INT".to_string()))?;
7474 Ok(Value::Integer(rounded))
7475}
7476
7477fn round_bigint(value: i64, precision: i32) -> Result<Value> {
7478 if precision >= 0 {
7479 return Ok(Value::BigInt(value));
7480 }
7481
7482 let rounded = round_float(value as f64, precision);
7483 let rounded = i64::try_from(rounded as i128)
7484 .map_err(|_| HematiteError::ParseError("ROUND overflowed INT64".to_string()))?;
7485 Ok(Value::BigInt(rounded))
7486}
7487
7488fn round_int128(value: i128, precision: i32) -> Result<Value> {
7489 if precision >= 0 {
7490 return Ok(Value::Int128(value));
7491 }
7492
7493 let rounded = round_float(value as f64, precision);
7494 if !rounded.is_finite() || rounded < i128::MIN as f64 || rounded > i128::MAX as f64 {
7495 return Err(HematiteError::ParseError(
7496 "ROUND overflowed INT128".to_string(),
7497 ));
7498 }
7499 Ok(Value::Int128(rounded as i128))
7500}
7501
7502fn round_uinteger(value: u32, precision: i32) -> Result<Value> {
7503 if precision >= 0 {
7504 return Ok(Value::UInteger(value));
7505 }
7506
7507 let rounded = round_float(value as f64, precision);
7508 if rounded < 0.0 || rounded > u32::MAX as f64 {
7509 return Err(HematiteError::ParseError(
7510 "ROUND overflowed UINT".to_string(),
7511 ));
7512 }
7513 Ok(Value::UInteger(rounded as u32))
7514}
7515
7516fn round_ubigint(value: u64, precision: i32) -> Result<Value> {
7517 if precision >= 0 {
7518 return Ok(Value::UBigInt(value));
7519 }
7520
7521 let rounded = round_float(value as f64, precision);
7522 if rounded < 0.0 || rounded > u64::MAX as f64 {
7523 return Err(HematiteError::ParseError(
7524 "ROUND overflowed UINT64".to_string(),
7525 ));
7526 }
7527 Ok(Value::UBigInt(rounded as u64))
7528}
7529
7530fn round_uint128(value: u128, precision: i32) -> Result<Value> {
7531 if precision >= 0 {
7532 return Ok(Value::UInt128(value));
7533 }
7534
7535 let rounded = round_float(value as f64, precision);
7536 if !rounded.is_finite() || rounded < 0.0 || rounded > u128::MAX as f64 {
7537 return Err(HematiteError::ParseError(
7538 "ROUND overflowed UINT128".to_string(),
7539 ));
7540 }
7541 Ok(Value::UInt128(rounded as u128))
7542}
7543
/// Rounds `value` to `precision` decimal places, with ties away from zero (`f64::round`).
///
/// A negative `precision` rounds to the left of the decimal point, e.g.
/// `round_float(1250.0, -2) == 1300.0`. Non-finite inputs are returned unchanged.
///
/// Edge cases handled explicitly:
/// * `precision == i32::MIN` no longer overflows when negated.
/// * When scaling by `10^precision` overflows, `value` is returned as-is; it has no more
///   representable digits to round. Previously `ROUND(1e300, 10)` became infinity.
/// * A scale of `10^-precision` that is infinite rounds every finite value to zero.
///   Previously this produced NaN.
fn round_float(value: f64, precision: i32) -> f64 {
    if !value.is_finite() {
        return value;
    }

    // `unsigned_abs` is defined for i32::MIN. Any exponent beyond i32::MAX is infinite
    // in f64 anyway, so clamping it does not change the result.
    let digits = i32::try_from(precision.unsigned_abs()).unwrap_or(i32::MAX);
    let factor = 10f64.powi(digits);

    if precision >= 0 {
        let scaled = value * factor;
        if !scaled.is_finite() {
            return value;
        }
        scaled.round() / factor
    } else {
        if !factor.is_finite() {
            // Multiplying by 0.0 keeps the sign of `value` (yielding +0.0 or -0.0).
            return value * 0.0;
        }
        (value / factor).round() * factor
    }
}
7553
7554fn apply_text_comparison_context(
7555 value: &str,
7556 text_context: Option<TextComparisonContext>,
7557) -> String {
7558 let mut normalized = if text_context.is_some_and(|context| context.trim_trailing_spaces) {
7559 value.trim_end_matches(' ').to_string()
7560 } else {
7561 value.to_string()
7562 };
7563
7564 if text_context.is_some_and(|context| context.case_insensitive) {
7565 normalized = normalized.to_lowercase();
7566 }
7567
7568 normalized
7569}
7570
7571fn apply_blob_comparison_context(
7572 value: &[u8],
7573 text_context: Option<TextComparisonContext>,
7574) -> Vec<u8> {
7575 if text_context.is_some_and(|context| context.trim_trailing_zero_bytes) {
7576 value
7577 .iter()
7578 .copied()
7579 .rev()
7580 .skip_while(|byte| *byte == 0)
7581 .collect::<Vec<_>>()
7582 .into_iter()
7583 .rev()
7584 .collect()
7585 } else {
7586 value.to_vec()
7587 }
7588}
7589
7590fn like_matches_with_context(
7591 pattern: &str,
7592 text: &str,
7593 text_context: Option<TextComparisonContext>,
7594) -> bool {
7595 let pattern = apply_text_comparison_context(pattern, text_context);
7596 let text = apply_text_comparison_context(text, text_context);
7597 SelectExecutor::like_matches(&pattern, &text)
7598}
7599
7600fn sql_values_equal(
7601 left: &Value,
7602 right: &Value,
7603 text_context: Option<TextComparisonContext>,
7604) -> bool {
7605 if let Some(ordering) = sql_decimal_cmp(left, right) {
7606 return ordering == Ordering::Equal;
7607 }
7608 if let Some((left, right)) = sql_numeric_pair(left, right) {
7609 return left == right;
7610 }
7611 if let (Value::Text(left), Value::Text(right)) = (left, right) {
7612 return apply_text_comparison_context(left, text_context)
7613 == apply_text_comparison_context(right, text_context);
7614 }
7615 if let (Value::Blob(left), Value::Blob(right)) = (left, right) {
7616 return apply_blob_comparison_context(left, text_context)
7617 == apply_blob_comparison_context(right, text_context);
7618 }
7619
7620 left == right
7621}
7622
7623fn sql_partial_cmp(
7624 left: &Value,
7625 right: &Value,
7626 text_context: Option<TextComparisonContext>,
7627) -> Option<Ordering> {
7628 if let Some(ordering) = sql_decimal_cmp(left, right) {
7629 return Some(ordering);
7630 }
7631 if let Some((left, right)) = sql_numeric_pair(left, right) {
7632 return left.partial_cmp(&right);
7633 }
7634 if let (Value::Text(left), Value::Text(right)) = (left, right) {
7635 return Some(
7636 apply_text_comparison_context(left, text_context)
7637 .cmp(&apply_text_comparison_context(right, text_context)),
7638 );
7639 }
7640 if let (Value::Blob(left), Value::Blob(right)) = (left, right) {
7641 return Some(
7642 apply_blob_comparison_context(left, text_context)
7643 .cmp(&apply_blob_comparison_context(right, text_context)),
7644 );
7645 }
7646
7647 left.partial_cmp(right)
7648}
7649
7650fn decode_signed_blob(bytes: &[u8], width: usize, target: &str) -> Result<i128> {
7651 if bytes.len() > width {
7652 return Err(HematiteError::ParseError(format!(
7653 "Cannot CAST blob of {} bytes AS {}",
7654 bytes.len(),
7655 target
7656 )));
7657 }
7658
7659 let fill = bytes
7660 .last()
7661 .is_some_and(|byte| (byte & 0x80) != 0)
7662 .then_some(0xFF)
7663 .unwrap_or(0);
7664 let mut extended = [fill; 16];
7665 extended[..bytes.len()].copy_from_slice(bytes);
7666 Ok(i128::from_le_bytes(extended))
7667}
7668
7669fn decode_unsigned_blob(bytes: &[u8], width: usize, target: &str) -> Result<u128> {
7670 if bytes.len() > width {
7671 return Err(HematiteError::ParseError(format!(
7672 "Cannot CAST blob of {} bytes AS {}",
7673 bytes.len(),
7674 target
7675 )));
7676 }
7677
7678 let mut extended = [0u8; 16];
7679 extended[..bytes.len()].copy_from_slice(bytes);
7680 Ok(u128::from_le_bytes(extended))
7681}
7682
7683fn sql_numeric_pair(left: &Value, right: &Value) -> Option<(f64, f64)> {
7684 Some((numeric_value_as_f64(left)?, numeric_value_as_f64(right)?))
7685}
7686
7687fn sql_decimal_cmp(left: &Value, right: &Value) -> Option<Ordering> {
7688 let left = match left {
7689 Value::Decimal(value) => value.clone(),
7690 Value::Integer(value) => DecimalValue::from_i32(*value),
7691 Value::BigInt(value) => DecimalValue::from_i64(*value),
7692 Value::Int128(value) => DecimalValue::from_i128(*value),
7693 Value::UInteger(value) => DecimalValue::from_u32(*value),
7694 Value::UBigInt(value) => DecimalValue::from_u64(*value),
7695 Value::UInt128(value) => DecimalValue::from_u128(*value),
7696 Value::Float32(value) => DecimalValue::from_f64(*value as f64).ok()?,
7697 Value::Float(value) => DecimalValue::from_f64(*value).ok()?,
7698 _ => return None,
7699 };
7700 let right = match right {
7701 Value::Decimal(value) => value.clone(),
7702 Value::Integer(value) => DecimalValue::from_i32(*value),
7703 Value::BigInt(value) => DecimalValue::from_i64(*value),
7704 Value::Int128(value) => DecimalValue::from_i128(*value),
7705 Value::UInteger(value) => DecimalValue::from_u32(*value),
7706 Value::UBigInt(value) => DecimalValue::from_u64(*value),
7707 Value::UInt128(value) => DecimalValue::from_u128(*value),
7708 Value::Float32(value) => DecimalValue::from_f64(*value as f64).ok()?,
7709 Value::Float(value) => DecimalValue::from_f64(*value).ok()?,
7710 _ => return None,
7711 };
7712 Some(left.cmp(&right))
7713}
7714
7715fn validate_date_string(input: &str) -> Result<DateValue> {
7716 DateValue::parse(input)
7717}
7718
7719fn validate_time_string(input: &str) -> Result<TimeValue> {
7720 TimeValue::parse(input)
7721}
7722
7723fn validate_datetime_string(input: &str) -> Result<DateTimeValue> {
7724 DateTimeValue::parse(input)
7725}
7726
7727fn validate_time_with_time_zone_string(input: &str) -> Result<TimeWithTimeZoneValue> {
7728 TimeWithTimeZoneValue::parse(input)
7729}
7730
7731fn compare_condition_values(
7732 left: &Value,
7733 operator: &ComparisonOperator,
7734 right: &Value,
7735 text_context: Option<TextComparisonContext>,
7736) -> Option<bool> {
7737 if left.is_null() || right.is_null() {
7738 return None;
7739 }
7740
7741 match operator {
7742 ComparisonOperator::Equal => Some(sql_values_equal(left, right, text_context)),
7743 ComparisonOperator::NotEqual => Some(!sql_values_equal(left, right, text_context)),
7744 ComparisonOperator::LessThan => {
7745 sql_partial_cmp(left, right, text_context).map(|ord| ord.is_lt())
7746 }
7747 ComparisonOperator::LessThanOrEqual => {
7748 sql_partial_cmp(left, right, text_context).map(|ord| ord.is_le())
7749 }
7750 ComparisonOperator::GreaterThan => {
7751 sql_partial_cmp(left, right, text_context).map(|ord| ord.is_gt())
7752 }
7753 ComparisonOperator::GreaterThanOrEqual => {
7754 sql_partial_cmp(left, right, text_context).map(|ord| ord.is_ge())
7755 }
7756 }
7757}
7758
/// SQL three-valued AND, with `None` meaning UNKNOWN.
///
/// FALSE dominates; the result is TRUE only when both sides are TRUE, and UNKNOWN otherwise.
fn logical_and_values(left: Option<bool>, right: Option<bool>) -> Option<bool> {
    if left == Some(false) || right == Some(false) {
        Some(false)
    } else if left.is_some() && right.is_some() {
        Some(true)
    } else {
        None
    }
}
7766
/// SQL three-valued OR, with `None` meaning UNKNOWN.
///
/// TRUE dominates; the result is FALSE only when both sides are FALSE, and UNKNOWN otherwise.
fn logical_or_values(left: Option<bool>, right: Option<bool>) -> Option<bool> {
    if left == Some(true) || right == Some(true) {
        Some(true)
    } else if left.is_some() && right.is_some() {
        Some(false)
    } else {
        None
    }
}
7774
7775fn evaluate_in_candidates(
7776 probe: Value,
7777 candidates: impl IntoIterator<Item = Value>,
7778 is_not: bool,
7779 text_context: Option<TextComparisonContext>,
7780) -> Option<bool> {
7781 if probe.is_null() {
7782 return None;
7783 }
7784
7785 let mut matched = false;
7786 let mut saw_null = false;
7787 for candidate in candidates {
7788 if candidate.is_null() {
7789 saw_null = true;
7790 continue;
7791 }
7792 if sql_values_equal(&candidate, &probe, text_context) {
7793 matched = true;
7794 break;
7795 }
7796 }
7797
7798 if matched {
7799 Some(!is_not)
7800 } else if saw_null {
7801 None
7802 } else {
7803 Some(is_not)
7804 }
7805}
7806
7807fn evaluate_between_values(
7808 value: Value,
7809 lower: Value,
7810 upper: Value,
7811 is_not: bool,
7812 text_context: Option<TextComparisonContext>,
7813) -> Option<bool> {
7814 if value.is_null() || lower.is_null() || upper.is_null() {
7815 return None;
7816 }
7817
7818 let lower_ok = sql_partial_cmp(&value, &lower, text_context).map(|ordering| !ordering.is_lt());
7819 let upper_ok = sql_partial_cmp(&value, &upper, text_context).map(|ordering| !ordering.is_gt());
7820
7821 match (lower_ok, upper_ok) {
7822 (Some(true), Some(true)) => Some(!is_not),
7823 (Some(_), Some(_)) => Some(is_not),
7824 _ => None,
7825 }
7826}
7827
7828fn evaluate_like_values(
7829 value: Value,
7830 pattern: Value,
7831 is_not: bool,
7832 text_context: Option<TextComparisonContext>,
7833) -> Option<bool> {
7834 match (value, pattern) {
7835 (Value::Text(text), Value::Text(pattern)) => {
7836 let matched = like_matches_with_context(&pattern, &text, text_context);
7837 Some(if is_not { !matched } else { matched })
7838 }
7839 (left, right) if left.is_null() || right.is_null() => None,
7840 _ => None,
7841 }
7842}
7843
7844fn nullable_bool_to_value(value: Option<bool>) -> Value {
7845 match value {
7846 Some(value) => Value::Boolean(value),
7847 None => Value::Null,
7848 }
7849}
7850
7851fn coerce_value_to_nullable_bool(value: Value, context: &str) -> Result<Option<bool>> {
7852 match value {
7853 Value::Boolean(value) => Ok(Some(value)),
7854 Value::Null => Ok(None),
7855 value => Err(HematiteError::ParseError(format!(
7856 "{} requires a boolean value, found {:?}",
7857 context, value
7858 ))),
7859 }
7860}
7861
7862fn unique_index_parse_error(index_name: &str, table_name: &str) -> HematiteError {
7863 HematiteError::ParseError(format!(
7864 "Duplicate value for UNIQUE index '{}' on table '{}'",
7865 index_name, table_name
7866 ))
7867}
7868
7869fn convert_foreign_key_action(action: ForeignKeyAction) -> CatalogForeignKeyAction {
7870 match action {
7871 ForeignKeyAction::Restrict => CatalogForeignKeyAction::Restrict,
7872 ForeignKeyAction::Cascade => CatalogForeignKeyAction::Cascade,
7873 ForeignKeyAction::SetNull => CatalogForeignKeyAction::SetNull,
7874 }
7875}
7876
7877fn auto_unique_index_name(table_name: &str, column_name: &str, position: usize) -> String {
7878 format!(
7879 "uq_{}_{}_{}",
7880 sanitize_identifier(table_name),
7881 sanitize_identifier(column_name),
7882 position
7883 )
7884}
7885
7886fn unique_constraint_index_name(
7887 table_name: &str,
7888 unique: &UniqueConstraintDefinition,
7889 position: usize,
7890) -> String {
7891 if let Some(name) = &unique.name {
7892 return sanitize_identifier(name);
7893 }
7894
7895 let column_suffix = unique
7896 .columns
7897 .iter()
7898 .map(|column| sanitize_identifier(column))
7899 .collect::<Vec<_>>()
7900 .join("_");
7901 format!(
7902 "uq_{}_{}_{}",
7903 sanitize_identifier(table_name),
7904 column_suffix,
7905 position
7906 )
7907}
7908
/// Makes an identifier safe for generated names: ASCII letters, digits and `_` are kept,
/// and every other character (including non-ASCII) becomes `_`.
fn sanitize_identifier(identifier: &str) -> String {
    identifier
        .chars()
        .map(|ch| {
            if ch == '_' || ch.is_ascii_alphanumeric() {
                ch
            } else {
                '_'
            }
        })
        .collect()
}
7920
7921#[derive(Debug, Clone)]
7922pub struct DropIndexExecutor {
7923 pub statement: DropIndexStatement,
7924}
7925
7926impl DropIndexExecutor {
7927 pub fn new(statement: DropIndexStatement) -> Self {
7928 Self { statement }
7929 }
7930}
7931
7932impl QueryExecutor for DropIndexExecutor {
7933 fn execute(&mut self, ctx: &mut ExecutionContext<'_>) -> Result<QueryResult> {
7934 validate_statement(&Statement::DropIndex(self.statement.clone()), &ctx.catalog)?;
7935 if self.statement.if_exists {
7936 let Some(table) = ctx.catalog.get_table_by_name(&self.statement.table) else {
7937 return Ok(mutation_result(0));
7938 };
7939 if table
7940 .get_secondary_index(&self.statement.index_name)
7941 .is_none()
7942 {
7943 return Ok(mutation_result(0));
7944 }
7945 }
7946
7947 let table = ctx
7948 .catalog
7949 .get_table_by_name(&self.statement.table)
7950 .ok_or_else(|| {
7951 HematiteError::ParseError(format!("Table '{}' not found", self.statement.table))
7952 })?
7953 .clone();
7954 let index = table
7955 .get_secondary_index(&self.statement.index_name)
7956 .ok_or_else(|| {
7957 HematiteError::ParseError(format!(
7958 "Index '{}' does not exist on table '{}'",
7959 self.statement.index_name, self.statement.table
7960 ))
7961 })?
7962 .clone();
7963
7964 ctx.engine.delete_tree(index.root_page_id)?;
7965 ctx.catalog
7966 .drop_secondary_index(table.id, &self.statement.index_name)?;
7967
7968 Ok(QueryResult {
7969 affected_rows: 0,
7970 columns: Vec::new(),
7971 rows: Vec::new(),
7972 })
7973 }
7974}