1use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18 pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19 match plan {
20 PlanNode::SeqScan { table } => {
21 if self.view_registry.is_dirty(table) {
23 self.refresh_view(table)?;
24 }
25 let schema = self
26 .catalog
27 .schema(table)
28 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29 .clone();
30 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31 let rows: Vec<Vec<Value>> = self
32 .catalog
33 .scan(table)
34 .map_err(|e| QueryError::StorageError(e.to_string()))?
35 .map(|(_, row)| row)
36 .collect();
37 Ok(QueryResult::Rows { columns, rows })
38 }
39
40 PlanNode::Filter { input, predicate } => {
41 let materialized;
45 let predicate = if contains_subquery(predicate) {
46 materialized = self.materialize_subqueries(predicate)?;
47 &materialized
48 } else {
49 predicate
50 };
51
52 if contains_subquery(predicate) {
54 let result = self.execute_plan(input)?;
55 return match result {
56 QueryResult::Rows { columns, rows } => {
57 let mut filtered = Vec::new();
58 for row in rows {
59 let row_pred =
60 self.materialize_correlated_for_row(predicate, &row, &columns)?;
61 if eval_predicate(&row_pred, &row, &columns) {
62 filtered.push(row);
63 }
64 }
65 Ok(QueryResult::Rows {
66 columns,
67 rows: filtered,
68 })
69 }
70 _ => Err("filter requires row input".into()),
71 };
72 }
73
74 if let PlanNode::SeqScan { table } = input.as_ref() {
79 if self.view_registry.is_dirty(table) {
81 self.refresh_view(table)?;
82 }
83 let schema = self
84 .catalog
85 .schema(table)
86 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87 .clone();
88 let columns: Vec<String> =
89 schema.columns.iter().map(|c| c.name.clone()).collect();
90 let fast = FastLayout::new(&schema);
91 let row_layout = RowLayout::new(&schema);
92 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100 self.catalog
101 .for_each_row_raw(table, |_rid, data| {
102 if compiled(data) {
103 rows.push(decode_row(&schema, data));
104 }
105 })
106 .map_err(|e| QueryError::StorageError(e.to_string()))?;
107 } else {
108 let pred_cols = predicate_column_indices(predicate, &columns);
109 self.catalog
110 .for_each_row_raw(table, |_rid, data| {
111 let pred_row =
112 decode_selective(&schema, &row_layout, data, &pred_cols);
113 if eval_predicate(predicate, &pred_row, &columns) {
114 rows.push(decode_row(&schema, data));
115 }
116 })
117 .map_err(|e| QueryError::StorageError(e.to_string()))?;
118 }
119
120 return Ok(QueryResult::Rows { columns, rows });
121 }
122
123 let result = self.execute_plan(input)?;
125 match result {
126 QueryResult::Rows { columns, rows } => {
127 let filtered: Vec<Vec<Value>> = rows
128 .into_iter()
129 .filter(|row| eval_predicate(predicate, row, &columns))
130 .collect();
131 Ok(QueryResult::Rows {
132 columns,
133 rows: filtered,
134 })
135 }
136 _ => Err("filter requires row input".into()),
137 }
138 }
139
140 PlanNode::Project { input, fields } => {
141 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144 let schema = self
145 .catalog
146 .schema(table)
147 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148 .clone();
149 let all_columns: Vec<String> =
150 schema.columns.iter().map(|c| c.name.clone()).collect();
151 let key_value = literal_to_value(key)?;
152 let tbl = self
153 .catalog
154 .get_table(table)
155 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157 let proj_columns: Vec<String> = fields
158 .iter()
159 .map(|f| {
160 f.alias.clone().unwrap_or_else(|| match &f.expr {
161 Expr::Field(name) => name.clone(),
162 _ => "?".into(),
163 })
164 })
165 .collect();
166
167 let proj_indices: Vec<usize> = fields
169 .iter()
170 .filter_map(|f| {
171 if let Expr::Field(name) = &f.expr {
172 all_columns.iter().position(|c| c == name)
173 } else {
174 None
175 }
176 })
177 .collect();
178
179 if tbl.has_index(column) {
180 let layout = RowLayout::new(&schema);
181 let rids = tbl.index_lookup_all(column, &key_value);
182 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183 for rid in rids {
184 if let Some(data) = tbl.heap.get(rid) {
185 let row: Vec<Value> = proj_indices
186 .iter()
187 .map(|&ci| decode_column(&schema, &layout, &data, ci))
188 .collect();
189 rows.push(row);
190 }
191 }
192 return Ok(QueryResult::Rows {
193 columns: proj_columns,
194 rows,
195 });
196 }
197 }
198
199 if let PlanNode::Limit {
204 input: inner,
205 count: limit_expr,
206 } = input.as_ref()
207 {
208 if let PlanNode::Sort {
209 input: sort_input,
210 keys,
211 } = inner.as_ref()
212 {
213 if keys.len() == 1 {
215 let sort_field = &keys[0].field;
216 let descending = keys[0].descending;
217 let limit = match limit_expr {
218 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219 _ => usize::MAX,
220 };
221 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222 match sort_input.as_ref() {
223 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224 PlanNode::Filter {
225 input: fi,
226 predicate,
227 } => {
228 if let PlanNode::SeqScan { table } = fi.as_ref() {
229 (Some(table.as_str()), Some(predicate))
230 } else {
231 (None, None)
232 }
233 }
234 _ => (None, None),
235 };
236 if let Some(table) = table_opt {
237 if let Some(result) = self.project_filter_sort_limit_fast(
238 table, fields, sort_field, descending, limit, pred_opt,
239 )? {
240 return Ok(result);
241 }
242 }
243 }
244 }
245 if let PlanNode::Filter {
248 input: fi,
249 predicate,
250 } = inner.as_ref()
251 {
252 if let PlanNode::SeqScan { table } = fi.as_ref() {
253 let limit = match limit_expr {
254 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255 _ => usize::MAX,
256 };
257 if let Some(result) = self.project_filter_limit_fast(
258 table,
259 fields,
260 limit,
261 Some(predicate),
262 )? {
263 return Ok(result);
264 }
265 }
266 }
267 if let PlanNode::SeqScan { table } = inner.as_ref() {
269 let limit = match limit_expr {
270 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271 _ => usize::MAX,
272 };
273 if let Some(result) =
274 self.project_filter_limit_fast(table, fields, limit, None)?
275 {
276 return Ok(result);
277 }
278 }
279 }
280
281 if let PlanNode::Filter {
292 input: fi,
293 predicate,
294 } = input.as_ref()
295 {
296 if let PlanNode::SeqScan { table } = fi.as_ref() {
297 if let Some(result) = self.project_filter_limit_fast(
298 table,
299 fields,
300 usize::MAX,
301 Some(predicate),
302 )? {
303 return Ok(result);
304 }
305 }
306 }
307
308 if let PlanNode::SeqScan { table } = input.as_ref() {
312 if let Some(result) =
313 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314 {
315 return Ok(result);
316 }
317 }
318
319 let result = self.execute_plan(input)?;
320 match result {
321 QueryResult::Rows { columns, rows } => {
322 let proj_columns: Vec<String> = fields
323 .iter()
324 .map(|f| {
325 f.alias.clone().unwrap_or_else(|| match &f.expr {
326 Expr::Field(name) => name.clone(),
327 Expr::QualifiedField { qualifier, field } => {
331 format!("{qualifier}.{field}")
332 }
333 _ => "?".into(),
334 })
335 })
336 .collect();
337 let proj_rows: Vec<Vec<Value>> = rows
338 .iter()
339 .map(|row| {
340 fields
341 .iter()
342 .map(|f| eval_expr(&f.expr, row, &columns))
343 .collect()
344 })
345 .collect();
346 Ok(QueryResult::Rows {
347 columns: proj_columns,
348 rows: proj_rows,
349 })
350 }
351 _ => Err("project requires row input".into()),
352 }
353 }
354
355 PlanNode::Sort { input, keys } => {
356 let result = self.execute_plan(input)?;
357 match result {
358 QueryResult::Rows { columns, mut rows } => {
359 if rows.len() > MAX_SORT_ROWS {
360 return Err(QueryError::SortLimitExceeded);
361 }
362 let key_indices: Vec<(usize, bool)> = keys
363 .iter()
364 .map(|k| {
365 columns
366 .iter()
367 .position(|c| c == &k.field)
368 .map(|idx| (idx, k.descending))
369 .ok_or_else(|| QueryError::ColumnNotFound {
370 table: String::new(),
371 column: k.field.clone(),
372 })
373 })
374 .collect::<Result<_, QueryError>>()?;
375 rows.sort_by(|a, b| {
376 for &(col_idx, descending) in &key_indices {
377 let cmp = a[col_idx].cmp(&b[col_idx]);
378 let cmp = if descending { cmp.reverse() } else { cmp };
379 if cmp != std::cmp::Ordering::Equal {
380 return cmp;
381 }
382 }
383 std::cmp::Ordering::Equal
384 });
385 Ok(QueryResult::Rows { columns, rows })
386 }
387 _ => Err("sort requires row input".into()),
388 }
389 }
390
391 PlanNode::Limit { input, count } => {
392 let result = self.execute_plan(input)?;
393 let n = match count {
394 Expr::Literal(Literal::Int(v)) => *v as usize,
395 _ => return Err("limit must be integer literal".into()),
396 };
397 match result {
398 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
399 columns,
400 rows: rows.into_iter().take(n).collect(),
401 }),
402 _ => Err("limit requires row input".into()),
403 }
404 }
405
406 PlanNode::Offset { input, count } => {
407 let result = self.execute_plan(input)?;
408 let n = match count {
409 Expr::Literal(Literal::Int(v)) => *v as usize,
410 _ => return Err("offset must be integer literal".into()),
411 };
412 match result {
413 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
414 columns,
415 rows: rows.into_iter().skip(n).collect(),
416 }),
417 _ => Err("offset requires row input".into()),
418 }
419 }
420
421 PlanNode::Aggregate {
422 input,
423 function,
424 field,
425 } => {
426 if *function == AggFunc::Count {
428 if let PlanNode::SeqScan { table } = input.as_ref() {
429 let mut count: i64 = 0;
430 self.catalog
431 .for_each_row_raw(table, |_rid, _data| {
432 count += 1;
433 })
434 .map_err(|e| QueryError::StorageError(e.to_string()))?;
435 return Ok(QueryResult::Scalar(Value::Int(count)));
436 }
437 if let PlanNode::Filter {
440 input: inner,
441 predicate,
442 } = input.as_ref()
443 {
444 if let PlanNode::SeqScan { table } = inner.as_ref() {
445 let schema = self
446 .catalog
447 .schema(table)
448 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
449 .clone();
450 let columns: Vec<String> =
451 schema.columns.iter().map(|c| c.name.clone()).collect();
452 let fast = FastLayout::new(&schema);
453 let row_layout = RowLayout::new(&schema);
454
455 if let Some(compiled) =
458 compile_predicate(predicate, &columns, &fast, &schema)
459 {
460 let mut count: i64 = 0;
461 self.catalog
462 .for_each_row_raw(table, |_rid, data| {
463 if compiled(data) {
464 count += 1;
465 }
466 })
467 .map_err(|e| QueryError::StorageError(e.to_string()))?;
468 return Ok(QueryResult::Scalar(Value::Int(count)));
469 }
470
471 let pred_cols = predicate_column_indices(predicate, &columns);
473 let mut count: i64 = 0;
474 self.catalog
475 .for_each_row_raw(table, |_rid, data| {
476 let pred_row =
477 decode_selective(&schema, &row_layout, data, &pred_cols);
478 if eval_predicate(predicate, &pred_row, &columns) {
479 count += 1;
480 }
481 })
482 .map_err(|e| QueryError::StorageError(e.to_string()))?;
483
484 return Ok(QueryResult::Scalar(Value::Int(count)));
485 }
486 }
487 }
488
489 if matches!(
493 function,
494 AggFunc::Sum
495 | AggFunc::Avg
496 | AggFunc::Min
497 | AggFunc::Max
498 | AggFunc::CountDistinct
499 ) {
500 if let Some(col) = field.as_ref() {
501 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
503 match input.as_ref() {
504 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
505 PlanNode::Filter {
506 input: inner,
507 predicate,
508 } => {
509 if let PlanNode::SeqScan { table } = inner.as_ref() {
510 (Some(table.as_str()), Some(predicate))
511 } else {
512 (None, None)
513 }
514 }
515 _ => (None, None),
516 };
517 if let Some(table) = table_opt {
518 if let Some(result) =
519 self.agg_single_col_fast(table, col, *function, pred_opt)?
520 {
521 return Ok(result);
522 }
523 }
524 }
525 }
526
527 let result = self.execute_plan(input)?;
532 match result {
533 QueryResult::Rows { columns, rows } => {
534 match function {
535 AggFunc::Count => {
536 Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
537 }
538 AggFunc::CountDistinct => {
539 let col = field.as_ref().ok_or("count distinct requires field")?;
540 let idx = columns
541 .iter()
542 .position(|c| c == col)
543 .ok_or("col not found")?;
544 let mut seen = std::collections::HashSet::new();
545 for row in &rows {
546 let v = &row[idx];
547 if !v.is_empty() {
548 seen.insert(v.clone());
549 }
550 }
551 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
552 }
553 AggFunc::Avg => {
554 let col = field.as_ref().ok_or("avg requires field")?;
555 let idx = columns
556 .iter()
557 .position(|c| c == col)
558 .ok_or("col not found")?;
559 let sum: f64 = rows
560 .iter()
561 .filter_map(|r| match &r[idx] {
562 Value::Int(v) => Some(*v as f64),
563 Value::Float(v) => Some(*v),
564 _ => None,
565 })
566 .sum();
567 let count = rows.len() as f64;
568 Ok(QueryResult::Scalar(Value::Float(sum / count)))
569 }
570 AggFunc::Sum => {
571 let col = field.as_ref().ok_or("sum requires field")?;
572 let idx = columns
573 .iter()
574 .position(|c| c == col)
575 .ok_or("col not found")?;
576 let mut int_sum: i64 = 0;
582 let mut float_sum: f64 = 0.0;
583 let mut saw_float = false;
584 for r in &rows {
585 match &r[idx] {
586 Value::Int(v) => int_sum += *v,
587 Value::Float(v) => {
588 float_sum += *v;
589 saw_float = true;
590 }
591 _ => {}
592 }
593 }
594 let result = if saw_float {
595 Value::Float(float_sum + int_sum as f64)
596 } else {
597 Value::Int(int_sum)
598 };
599 Ok(QueryResult::Scalar(result))
600 }
601 AggFunc::Min | AggFunc::Max => {
602 let col = field.as_ref().ok_or("min/max requires field")?;
603 let idx = columns
604 .iter()
605 .position(|c| c == col)
606 .ok_or("col not found")?;
607 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
608 let result = if *function == AggFunc::Min {
609 vals.into_iter().min().cloned()
610 } else {
611 vals.into_iter().max().cloned()
612 };
613 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
614 }
615 }
616 }
617 _ => Err("aggregate requires row input".into()),
618 }
619 }
620
621 PlanNode::Insert { table, assignments } => {
622 let values = {
623 let schema = self
624 .catalog
625 .schema(table)
626 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
627 let mut values = vec![Value::Empty; schema.columns.len()];
628 for a in assignments {
629 let idx = schema.column_index(&a.field).ok_or_else(|| {
630 QueryError::ColumnNotFound {
631 table: String::new(),
632 column: a.field.clone(),
633 }
634 })?;
635 let raw = literal_to_value(&a.value)?;
636 values[idx] = coerce_value(raw, &schema.columns[idx])?;
637 }
638 for col in &schema.columns {
639 if col.required && matches!(values[col.position as usize], Value::Empty) {
640 return Err(QueryError::Execution(format!(
641 "column '{}' is required but no value was provided",
642 col.name
643 )));
644 }
645 }
646 values
647 };
648 self.catalog
649 .insert(table, &values)
650 .map_err(|e| QueryError::StorageError(e.to_string()))?;
651 self.view_registry.mark_dependents_dirty(table);
652 Ok(QueryResult::Modified(1))
653 }
654
655 PlanNode::Upsert {
656 table,
657 key_column,
658 assignments,
659 on_conflict,
660 } => {
661 let (values, key_idx) = {
662 let schema = self
663 .catalog
664 .schema(table)
665 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
666 let mut values = vec![Value::Empty; schema.columns.len()];
667 for a in assignments {
668 let idx = schema.column_index(&a.field).ok_or_else(|| {
669 QueryError::ColumnNotFound {
670 table: String::new(),
671 column: a.field.clone(),
672 }
673 })?;
674 let raw = literal_to_value(&a.value)?;
675 values[idx] = coerce_value(raw, &schema.columns[idx])?;
676 }
677 for col in &schema.columns {
678 if col.required && matches!(values[col.position as usize], Value::Empty) {
679 return Err(QueryError::Execution(format!(
680 "column '{}' is required but no value was provided",
681 col.name
682 )));
683 }
684 }
685 let key_idx = schema
686 .column_index(key_column)
687 .ok_or_else(|| format!("key column '{key_column}' not found"))?;
688 (values, key_idx)
689 };
690
691 let key_value = values[key_idx].clone();
692
693 let existing = {
695 let tbl = self
696 .catalog
697 .get_table(table)
698 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
699 if tbl.has_index(key_column) {
700 let rids = tbl.index_lookup_all(key_column, &key_value);
705 rids.into_iter().next().and_then(|rid| {
706 tbl.heap
707 .get(rid)
708 .map(|data| (rid, decode_row(&tbl.schema, &data)))
709 })
710 } else {
711 let mut found = None;
713 for (rid, row) in tbl.scan() {
714 if row[key_idx] == key_value {
715 found = Some((rid, row));
716 break;
717 }
718 }
719 found
720 }
721 };
722
723 if let Some((rid, mut existing_row)) = existing {
724 let update_assignments = if on_conflict.is_empty() {
726 assignments
727 } else {
728 on_conflict
729 };
730 let changed_cols: Vec<usize> = {
731 let schema = self
732 .catalog
733 .schema(table)
734 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
735 let mut indices = Vec::new();
736 for a in update_assignments {
737 let idx = schema.column_index(&a.field).ok_or_else(|| {
738 QueryError::ColumnNotFound {
739 table: String::new(),
740 column: a.field.clone(),
741 }
742 })?;
743 if idx != key_idx {
744 existing_row[idx] = literal_to_value(&a.value)?;
745 indices.push(idx);
746 }
747 }
748 indices
749 };
750 self.catalog
751 .update_hinted(table, rid, &existing_row, Some(&changed_cols))
752 .map_err(|e| QueryError::StorageError(e.to_string()))?;
753 self.view_registry.mark_dependents_dirty(table);
754 Ok(QueryResult::Modified(1))
755 } else {
756 self.catalog
758 .insert(table, &values)
759 .map_err(|e| QueryError::StorageError(e.to_string()))?;
760 self.view_registry.mark_dependents_dirty(table);
761 Ok(QueryResult::Modified(1))
762 }
763 }
764
765 PlanNode::Update {
766 input,
767 table,
768 assignments,
769 } => {
770 let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
776 let schema_ref = self
777 .catalog
778 .schema(table)
779 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
780 let indices: Vec<usize> = assignments
781 .iter()
782 .map(|a| {
783 schema_ref.column_index(&a.field).ok_or_else(|| {
784 QueryError::ColumnNotFound {
785 table: String::new(),
786 column: a.field.clone(),
787 }
788 })
789 })
790 .collect::<Result<_, _>>()?;
791 let vals: Result<Vec<Value>, _> = assignments
792 .iter()
793 .map(|a| literal_to_value(&a.value))
794 .collect();
795 (indices, vals.ok())
796 };
797 let resolved_assignments: Option<Vec<(usize, Value)>> =
798 literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
799
800 let changed_cols: Vec<usize> = col_indices.clone();
803
804 if let Some(ref resolved_assignments) = resolved_assignments {
811 if let PlanNode::Filter {
812 input: inner,
813 predicate,
814 } = input.as_ref()
815 {
816 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
817 if t == table {
818 let fused_result = self.try_fused_scan_update(
819 table,
820 predicate,
821 resolved_assignments,
822 &changed_cols,
823 );
824 if let Some(result) = fused_result {
825 return result;
826 }
827 }
828 }
829 }
830 }
831
832 let matching_rids = self.collect_rids_for_mutation(input, table)?;
834
835 if let Some(ref resolved_assignments) = resolved_assignments {
837 let fast_patch: Option<Vec<FastPatch>> = {
843 let tbl = self
844 .catalog
845 .get_table(table)
846 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
847 let schema = &tbl.schema;
848 let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
849 is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
850 });
851 let no_indexed = !resolved_assignments
852 .iter()
853 .any(|(idx, _)| tbl.has_indexed_col(*idx));
854
855 if all_fixed_nonnull && no_indexed {
856 let layout = RowLayout::new(schema);
857 let bitmap_size = layout.bitmap_size();
858 let patches: Vec<FastPatch> = resolved_assignments
859 .iter()
860 .map(|(idx, val)| {
861 let fixed_off = layout
862 .fixed_offset(*idx)
863 .expect("is_fixed_size already checked");
864 let field_off = 2 + bitmap_size + fixed_off;
865 let bytes: FixedBytes = match val {
866 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
867 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
868 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
869 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
870 Value::Uuid(v) => FixedBytes::Uuid(*v),
871 _ => unreachable!("all_fixed_nonnull guard lied"),
872 };
873 FastPatch {
874 field_off,
875 bitmap_byte_off: 2 + idx / 8,
876 bit_mask: 1u8 << (idx % 8),
877 bytes,
878 }
879 })
880 .collect();
881 Some(patches)
882 } else {
883 None
884 }
885 };
886
887 if let Some(patches) = fast_patch {
888 let mut count = 0u64;
889 for rid in matching_rids {
890 let ok = self
895 .catalog
896 .update_row_bytes_logged(table, rid, |row| {
897 for p in &patches {
898 row[p.bitmap_byte_off] &= !p.bit_mask;
899 let field_bytes = p.bytes.as_slice();
900 row[p.field_off..p.field_off + field_bytes.len()]
901 .copy_from_slice(field_bytes);
902 }
903 })
904 .map_err(|e| QueryError::StorageError(e.to_string()))?;
905 if ok {
906 count += 1;
907 }
908 }
909 self.view_registry.mark_dependents_dirty(table);
910 return Ok(QueryResult::Modified(count));
911 }
912
913 let var_fast: Option<(usize, Option<Vec<u8>>)> = {
915 let tbl = self
916 .catalog
917 .get_table(table)
918 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
919 let schema = &tbl.schema;
920 let is_single = resolved_assignments.len() == 1;
921 let is_var_col = is_single
922 && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
923 let no_indexed = !resolved_assignments
924 .iter()
925 .any(|(idx, _)| tbl.has_indexed_col(*idx));
926
927 if is_single && is_var_col && no_indexed {
928 let (idx, val) = &resolved_assignments[0];
929 let bytes_opt: Option<Vec<u8>> = match val {
930 Value::Str(s) => Some(s.as_bytes().to_vec()),
931 Value::Bytes(b) => Some(b.clone()),
932 Value::Empty => None,
933 _ => {
934 return Err(QueryError::TypeError(format!(
935 "cannot assign non-var value to var column '{}'",
936 schema.columns[*idx].name
937 )))
938 }
939 };
940 Some((*idx, bytes_opt))
941 } else {
942 None
943 }
944 };
945
946 if let Some((col_idx, new_bytes_opt)) = var_fast {
947 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
948 let mut count = 0u64;
949 let mut fallback_rids: Vec<RowId> = Vec::new();
950 for rid in &matching_rids {
951 let ok = self
957 .catalog
958 .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
959 .map_err(|e| QueryError::StorageError(e.to_string()))?;
960 if ok {
961 count += 1;
962 } else {
963 fallback_rids.push(*rid);
964 }
965 }
966 for rid in fallback_rids {
967 let mut row = match self.catalog.get(table, rid) {
968 Some(r) => r,
969 None => continue,
970 };
971 for (idx, val) in resolved_assignments.iter() {
972 row[*idx] = val.clone();
973 }
974 self.catalog
975 .update_hinted(table, rid, &row, Some(&changed_cols))
976 .map_err(|e| QueryError::StorageError(e.to_string()))?;
977 count += 1;
978 }
979 self.view_registry.mark_dependents_dirty(table);
980 return Ok(QueryResult::Modified(count));
981 }
982
983 let mut count = 0u64;
985 for rid in matching_rids {
986 let mut row = match self.catalog.get(table, rid) {
987 Some(r) => r,
988 None => continue,
989 };
990 for (idx, val) in resolved_assignments.iter() {
991 row[*idx] = val.clone();
992 }
993 self.catalog
994 .update_hinted(table, rid, &row, Some(&changed_cols))
995 .map_err(|e| QueryError::StorageError(e.to_string()))?;
996 count += 1;
997 }
998 self.view_registry.mark_dependents_dirty(table);
999 return Ok(QueryResult::Modified(count));
1000 } let col_names: Vec<String> = {
1006 let schema_ref = self
1007 .catalog
1008 .schema(table)
1009 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1010 schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1011 };
1012 let mut count = 0u64;
1013 for rid in matching_rids {
1014 let mut row = match self.catalog.get(table, rid) {
1015 Some(r) => r,
1016 None => continue,
1017 };
1018 for (i, asgn) in assignments.iter().enumerate() {
1019 let val = eval_expr(&asgn.value, &row, &col_names);
1020 row[col_indices[i]] = val;
1021 }
1022 self.catalog
1023 .update_hinted(table, rid, &row, Some(&changed_cols))
1024 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1025 count += 1;
1026 }
1027 self.view_registry.mark_dependents_dirty(table);
1028 Ok(QueryResult::Modified(count))
1029 }
1030
1031 PlanNode::Delete { input, table } => {
1032 if let PlanNode::Filter {
1053 input: inner,
1054 predicate,
1055 } = input.as_ref()
1056 {
1057 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1058 if t == table {
1059 let schema = self
1060 .catalog
1061 .schema(table)
1062 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1063 let columns: Vec<String> =
1064 schema.columns.iter().map(|c| c.name.clone()).collect();
1065 let fast = FastLayout::new(schema);
1066 if let Some(compiled) =
1067 compile_predicate(predicate, &columns, &fast, schema)
1068 {
1069 let count = self
1075 .catalog
1076 .scan_delete_matching_logged(table, |data| compiled(data))
1077 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1078 self.view_registry.mark_dependents_dirty(table);
1079 return Ok(QueryResult::Modified(count));
1080 }
1081 }
1082 }
1083 } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1084 if t == table {
1085 let count = self
1089 .catalog
1090 .scan_delete_matching_logged(table, |_| true)
1091 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1092 self.view_registry.mark_dependents_dirty(table);
1093 return Ok(QueryResult::Modified(count));
1094 }
1095 }
1096
1097 let matching_rids = self.collect_rids_for_mutation(input, table)?;
1098 let count = self
1099 .catalog
1100 .delete_many(table, &matching_rids)
1101 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1102 self.view_registry.mark_dependents_dirty(table);
1103 Ok(QueryResult::Modified(count))
1104 }
1105
1106 PlanNode::AliasScan { table, alias } => {
1107 let schema = self
1117 .catalog
1118 .schema(table)
1119 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1120 .clone();
1121 let columns: Vec<String> = schema
1122 .columns
1123 .iter()
1124 .map(|c| format!("{alias}.{}", c.name))
1125 .collect();
1126 let rows: Vec<Vec<Value>> = self
1127 .catalog
1128 .scan(table)
1129 .map_err(|e| QueryError::StorageError(e.to_string()))?
1130 .map(|(_, row)| row)
1131 .collect();
1132 Ok(QueryResult::Rows { columns, rows })
1133 }
1134
1135 PlanNode::NestedLoopJoin {
1136 left,
1137 right,
1138 on,
1139 kind,
1140 } => {
1141 let left_result = self.execute_plan(left)?;
1152 let right_result = self.execute_plan(right)?;
1153 let (left_columns, left_rows) = match left_result {
1154 QueryResult::Rows { columns, rows } => (columns, rows),
1155 _ => return Err("join left side must produce rows".into()),
1156 };
1157 let (right_columns, right_rows) = match right_result {
1158 QueryResult::Rows { columns, rows } => (columns, rows),
1159 _ => return Err("join right side must produce rows".into()),
1160 };
1161
1162 if !matches!(kind, JoinKind::Cross) {
1164 if let Some(pred) = on {
1165 if let Some((l_idx, r_idx)) =
1166 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1167 {
1168 let result = hash_join(
1169 left_columns,
1170 left_rows,
1171 right_columns,
1172 right_rows,
1173 l_idx,
1174 r_idx,
1175 *kind,
1176 );
1177 if let QueryResult::Rows { ref rows, .. } = result {
1178 check_join_limit(rows.len())?;
1179 }
1180 return Ok(result);
1181 }
1182 }
1183 }
1184
1185 let n_left = left_columns.len();
1187 let n_right = right_columns.len();
1188 let mut columns = Vec::with_capacity(n_left + n_right);
1189 columns.extend(left_columns);
1190 columns.extend(right_columns);
1191
1192 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1193 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1194
1195 for left_row in &left_rows {
1196 let mut matched = false;
1197 for right_row in &right_rows {
1198 combined.clear();
1199 combined.extend_from_slice(left_row);
1200 combined.extend_from_slice(right_row);
1201 let keep = match kind {
1202 JoinKind::Cross => true,
1203 JoinKind::Inner | JoinKind::LeftOuter => match on {
1204 Some(pred) => eval_predicate(pred, &combined, &columns),
1205 None => true,
1209 },
1210 JoinKind::RightOuter => {
1213 unreachable!("planner rewrites RightOuter to LeftOuter")
1214 }
1215 };
1216 if keep {
1217 rows.push(combined.clone());
1218 check_join_limit(rows.len())?;
1219 matched = true;
1220 }
1221 }
1222 if !matched && matches!(kind, JoinKind::LeftOuter) {
1223 let mut row = Vec::with_capacity(n_left + n_right);
1224 row.extend_from_slice(left_row);
1225 row.resize(n_left + n_right, Value::Empty);
1226 rows.push(row);
1227 check_join_limit(rows.len())?;
1228 }
1229 }
1230
1231 Ok(QueryResult::Rows { columns, rows })
1232 }
1233
1234 PlanNode::Distinct { input } => {
1235 let result = self.execute_plan(input)?;
1236 match result {
1237 QueryResult::Rows { columns, rows } => {
1238 let mut seen = std::collections::HashSet::new();
1239 let mut unique_rows = Vec::new();
1240 for row in rows {
1241 if seen.insert(row.clone()) {
1242 unique_rows.push(row);
1243 }
1244 }
1245 Ok(QueryResult::Rows {
1246 columns,
1247 rows: unique_rows,
1248 })
1249 }
1250 other => Ok(other),
1251 }
1252 }
1253
1254 PlanNode::GroupBy {
1255 input,
1256 keys,
1257 aggregates,
1258 having,
1259 } => {
1260 let result = self.execute_plan(input)?;
1261 match result {
1262 QueryResult::Rows { columns, rows } => {
1263 let key_indices: Vec<usize> = keys
1265 .iter()
1266 .map(|k| {
1267 columns
1268 .iter()
1269 .position(|c| c == k)
1270 .ok_or_else(|| format!("group-by column '{k}' not found"))
1271 })
1272 .collect::<Result<Vec<_>, _>>()?;
1273
1274 let agg_field_indices: Vec<usize> = aggregates
1278 .iter()
1279 .map(|a| {
1280 if a.field == "*" {
1281 Ok(usize::MAX)
1282 } else {
1283 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1284 format!("aggregate column '{}' not found", a.field)
1285 })
1286 }
1287 })
1288 .collect::<Result<Vec<_>, _>>()?;
1289
1290 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1292 rustc_hash::FxHashMap::default();
1293 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1294 for (ri, row) in rows.iter().enumerate() {
1295 let key: Vec<Value> =
1296 key_indices.iter().map(|&i| row[i].clone()).collect();
1297 match group_map.get(&key) {
1298 Some(&idx) => groups[idx].1.push(ri),
1299 None => {
1300 let idx = groups.len();
1301 group_map.insert(key.clone(), idx);
1302 groups.push((key, vec![ri]));
1303 }
1304 }
1305 }
1306
1307 let mut out_columns: Vec<String> = keys.clone();
1309 for agg in aggregates.iter() {
1310 out_columns.push(agg.output_name.clone());
1311 }
1312
1313 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1315 for (key_vals, row_indices) in &groups {
1316 let mut row = key_vals.clone();
1317 for (ai, agg) in aggregates.iter().enumerate() {
1318 let col_idx = agg_field_indices[ai];
1319 let val = compute_group_aggregate(
1320 agg.function,
1321 &rows,
1322 row_indices,
1323 col_idx,
1324 );
1325 row.push(val);
1326 }
1327 out_rows.push(row);
1328 }
1329
1330 if let Some(having_expr) = having {
1332 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1333 }
1334
1335 Ok(QueryResult::Rows {
1336 columns: out_columns,
1337 rows: out_rows,
1338 })
1339 }
1340 _ => Err("group by requires row input".into()),
1341 }
1342 }
1343
1344 PlanNode::CreateTable { name, fields } => {
1345 let columns: Vec<ColumnDef> = fields
1346 .iter()
1347 .enumerate()
1348 .map(
1349 |(i, (fname, tname, req))| -> Result<ColumnDef, QueryError> {
1350 Ok(ColumnDef {
1351 name: fname.clone(),
1352 type_id: type_name_to_id(tname).map_err(QueryError::TypeError)?,
1353 required: *req,
1354 position: i as u16,
1355 })
1356 },
1357 )
1358 .collect::<Result<Vec<_>, _>>()?;
1359 let schema = Schema {
1360 table_name: name.clone(),
1361 columns,
1362 };
1363 self.catalog
1364 .create_table(schema)
1365 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1366 Ok(QueryResult::Created(name.clone()))
1367 }
1368
1369 PlanNode::AlterTable { table, action } => match action {
1370 AlterAction::AddColumn {
1371 name,
1372 type_name,
1373 required,
1374 } => {
1375 let position = self
1376 .catalog
1377 .schema(table)
1378 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1379 .columns
1380 .len() as u16;
1381 let col = ColumnDef {
1382 name: name.clone(),
1383 type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1384 required: *required,
1385 position,
1386 };
1387 self.catalog
1388 .alter_table_add_column(table, col)
1389 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1390 Ok(QueryResult::Executed {
1391 message: format!("column '{name}' added to '{table}'"),
1392 })
1393 }
1394 AlterAction::DropColumn { name } => {
1395 self.catalog
1396 .alter_table_drop_column(table, name)
1397 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1398 Ok(QueryResult::Executed {
1399 message: format!("column '{name}' dropped from '{table}'"),
1400 })
1401 }
1402 AlterAction::AddIndex { column } => {
1403 self.catalog
1404 .create_index(table, column)
1405 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1406 Ok(QueryResult::Executed {
1407 message: format!("index on '{table}.{column}' created"),
1408 })
1409 }
1410 },
1411
1412 PlanNode::DropTable { name } => {
1413 self.catalog
1414 .drop_table(name)
1415 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1416 Ok(QueryResult::Executed {
1417 message: format!("table '{name}' dropped"),
1418 })
1419 }
1420
1421 PlanNode::CreateView { name, query_text } => {
1422 self.create_view(name, query_text)?;
1423 Ok(QueryResult::Executed {
1424 message: format!("materialized view '{name}' created"),
1425 })
1426 }
1427
1428 PlanNode::RefreshView { name } => {
1429 self.refresh_view(name)?;
1430 Ok(QueryResult::Executed {
1431 message: format!("materialized view '{name}' refreshed"),
1432 })
1433 }
1434
1435 PlanNode::DropView { name } => {
1436 self.drop_view(name)?;
1437 Ok(QueryResult::Executed {
1438 message: format!("materialized view '{name}' dropped"),
1439 })
1440 }
1441
1442 PlanNode::Window { input, windows } => {
1443 let result = self.execute_plan(input)?;
1444 execute_window(result, windows)
1445 }
1446
1447 PlanNode::Union { left, right, all } => {
1448 let left_result = self.execute_plan(left)?;
1449 let right_result = self.execute_plan(right)?;
1450 let (left_cols, left_rows) = match left_result {
1451 QueryResult::Rows { columns, rows } => (columns, rows),
1452 _ => return Err("UNION requires query results on left side".into()),
1453 };
1454 let (_, right_rows) = match right_result {
1455 QueryResult::Rows { columns, rows } => (columns, rows),
1456 _ => return Err("UNION requires query results on right side".into()),
1457 };
1458 let mut combined = left_rows;
1459 if *all {
1460 combined.extend(right_rows);
1462 } else {
1463 let mut seen = std::collections::HashSet::new();
1466 for row in &combined {
1467 seen.insert(row.clone());
1468 }
1469 for row in right_rows {
1470 if seen.insert(row.clone()) {
1471 combined.push(row);
1472 }
1473 }
1474 }
1475 Ok(QueryResult::Rows {
1476 columns: left_cols,
1477 rows: combined,
1478 })
1479 }
1480
1481 PlanNode::Explain { input } => {
1482 let text = format_plan_tree(input, 0);
1483 Ok(QueryResult::Rows {
1484 columns: vec!["plan".to_string()],
1485 rows: text
1486 .lines()
1487 .map(|line| vec![Value::Str(line.to_string())])
1488 .collect(),
1489 })
1490 }
1491
1492 PlanNode::Begin => {
1493 if self.in_transaction {
1494 return Err(QueryError::Execution(
1495 "already in a transaction (nested transactions not supported)".into(),
1496 ));
1497 }
1498 self.in_transaction = true;
1499 Ok(QueryResult::Executed {
1500 message: "transaction started".to_string(),
1501 })
1502 }
1503
1504 PlanNode::Commit => {
1505 if !self.in_transaction {
1506 return Err(QueryError::Execution(
1507 "no active transaction to commit".into(),
1508 ));
1509 }
1510 self.in_transaction = false;
1511 self.catalog
1512 .sync_wal()
1513 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1514 Ok(QueryResult::Executed {
1515 message: "transaction committed".to_string(),
1516 })
1517 }
1518
1519 PlanNode::Rollback => {
1520 if !self.in_transaction {
1521 return Err(QueryError::Execution(
1522 "no active transaction to roll back".into(),
1523 ));
1524 }
1525 self.in_transaction = false;
1526 self.catalog
1527 .rollback_to_last_sync()
1528 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1529 if let Ok(mut cache) = self.plan_cache.lock() {
1530 cache.clear();
1531 }
1532 self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1533 .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1534 Ok(QueryResult::Executed {
1535 message: "transaction rolled back".to_string(),
1536 })
1537 }
1538
1539 PlanNode::IndexScan { table, column, key } => {
1540 let key_value = literal_to_value(key)?;
1541 let tbl = self
1542 .catalog
1543 .get_table(table)
1544 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1545 let columns: Vec<String> =
1546 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1547
1548 if tbl.has_index(column) {
1552 let rids = tbl.index_lookup_all(column, &key_value);
1553 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1554 for rid in rids {
1555 if let Some(data) = tbl.heap.get(rid) {
1556 rows.push(decode_row(&tbl.schema, &data));
1557 }
1558 }
1559 return Ok(QueryResult::Rows { columns, rows });
1560 }
1561
1562 let schema = &tbl.schema;
1570 let fast = FastLayout::new(schema);
1571 let synth_pred = Expr::BinaryOp(
1572 Box::new(Expr::Field(column.clone())),
1573 BinOp::Eq,
1574 Box::new(key.clone()),
1575 );
1576 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1577 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1579 self.catalog
1580 .for_each_row_raw(table, |_rid, data| {
1581 if compiled(data) {
1582 rows.push(decode_row(schema, data));
1583 }
1584 })
1585 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1586 return Ok(QueryResult::Rows { columns, rows });
1587 }
1588
1589 let col_idx =
1591 schema
1592 .column_index(column)
1593 .ok_or_else(|| QueryError::ColumnNotFound {
1594 table: String::new(),
1595 column: column.clone(),
1596 })?;
1597 let rows: Vec<Vec<Value>> = tbl
1598 .scan()
1599 .filter_map(|(_, row)| {
1600 if row[col_idx] == key_value {
1601 Some(row)
1602 } else {
1603 None
1604 }
1605 })
1606 .collect();
1607 Ok(QueryResult::Rows { columns, rows })
1608 }
1609
1610 PlanNode::RangeScan {
1611 table,
1612 column,
1613 start,
1614 end,
1615 } => {
1616 let tbl = self
1617 .catalog
1618 .get_table(table)
1619 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1620 let columns: Vec<String> =
1621 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1622 let schema = &tbl.schema;
1623
1624 let start_val = match start {
1625 Some((expr, _)) => Some(literal_to_value(expr)?),
1626 None => None,
1627 };
1628 let end_val = match end {
1629 Some((expr, _)) => Some(literal_to_value(expr)?),
1630 None => None,
1631 };
1632 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1633 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1634
1635 if tbl.is_index_unique(column) == Some(true) {
1639 if let Some(btree) = tbl.index(column) {
1640 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1641 (Some(s), Some(e)) => btree.range(s, e).collect(),
1642 (Some(s), None) => btree.range_from(s),
1643 (None, Some(e)) => btree.range_to(e),
1644 (None, None) => {
1645 let rows: Vec<Vec<Value>> =
1646 tbl.scan().map(|(_, row)| row).collect();
1647 return Ok(QueryResult::Rows { columns, rows });
1648 }
1649 };
1650 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1651 for (key, rid) in hits {
1652 if !start_inclusive {
1653 if let Some(ref s) = start_val {
1654 if &key == s {
1655 continue;
1656 }
1657 }
1658 }
1659 if !end_inclusive {
1660 if let Some(ref e) = end_val {
1661 if &key == e {
1662 continue;
1663 }
1664 }
1665 }
1666 if let Some(data) = tbl.heap.get(rid) {
1667 rows.push(decode_row(schema, &data));
1668 }
1669 }
1670 return Ok(QueryResult::Rows { columns, rows });
1671 }
1672 }
1673
1674 let fast = FastLayout::new(schema);
1676 let synth = synthesize_range_predicate(column, start, end);
1677 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1678 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1679 self.catalog
1680 .for_each_row_raw(table, |_rid, data| {
1681 if compiled(data) {
1682 rows.push(decode_row(schema, data));
1683 }
1684 })
1685 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1686 return Ok(QueryResult::Rows { columns, rows });
1687 }
1688
1689 let col_idx =
1690 schema
1691 .column_index(column)
1692 .ok_or_else(|| QueryError::ColumnNotFound {
1693 table: String::new(),
1694 column: column.clone(),
1695 })?;
1696 let rows: Vec<Vec<Value>> = tbl
1697 .scan()
1698 .filter(|(_, row)| {
1699 range_matches(
1700 &row[col_idx],
1701 &start_val,
1702 start_inclusive,
1703 &end_val,
1704 end_inclusive,
1705 )
1706 })
1707 .map(|(_, row)| row)
1708 .collect();
1709 Ok(QueryResult::Rows { columns, rows })
1710 }
1711 }
1712 }
1713
1714 fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1719 if self.view_registry.is_view(name) {
1720 return Err(QueryError::ViewError(format!(
1721 "materialized view '{name}' already exists"
1722 )));
1723 }
1724 let result = self.execute_powql(query_text)?;
1726 let (columns, rows) = match result {
1727 QueryResult::Rows { columns, rows } => (columns, rows),
1728 _ => return Err("view source query must be a SELECT".into()),
1729 };
1730 let schema = self.derive_view_schema(name, &columns, &rows);
1732 self.catalog
1734 .create_table(schema)
1735 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1736 for row in &rows {
1737 self.catalog
1738 .insert(name, row)
1739 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1740 }
1741 let depends_on = self.extract_view_deps(query_text);
1743 self.view_registry
1744 .register(ViewDef {
1745 name: name.to_string(),
1746 query: query_text.to_string(),
1747 depends_on,
1748 dirty: false,
1749 })
1750 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1751 Ok(())
1752 }
1753
1754 fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1757 let def = self
1758 .view_registry
1759 .get(name)
1760 .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1761 let query_text = def.query.clone();
1762 let result = self.execute_powql(&query_text)?;
1764 let (_columns, rows) = match result {
1765 QueryResult::Rows { columns, rows } => (columns, rows),
1766 _ => return Err("view source query must be a SELECT".into()),
1767 };
1768 self.catalog
1772 .scan_delete_matching_logged(name, |_| true)
1773 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1774 for row in &rows {
1775 self.catalog
1776 .insert(name, row)
1777 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1778 }
1779 self.view_registry.mark_clean(name);
1780 Ok(())
1781 }
1782
1783 fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1785 if !self.view_registry.is_view(name) {
1786 return Err(QueryError::ViewError(format!(
1787 "materialized view '{name}' not found"
1788 )));
1789 }
1790 self.view_registry
1791 .unregister(name)
1792 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1793 self.catalog
1794 .drop_table(name)
1795 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1796 Ok(())
1797 }
1798
1799 fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1802 use powdb_storage::types::{ColumnDef, TypeId};
1803 let cols: Vec<ColumnDef> = columns
1804 .iter()
1805 .enumerate()
1806 .map(|(i, col_name)| {
1807 let type_id = rows
1808 .first()
1809 .and_then(|row| row.get(i))
1810 .map(|v| v.type_id())
1811 .unwrap_or(TypeId::Str);
1812 ColumnDef {
1813 name: col_name.clone(),
1814 type_id,
1815 required: false,
1816 position: i as u16,
1817 }
1818 })
1819 .collect();
1820 Schema {
1821 table_name: name.to_string(),
1822 columns: cols,
1823 }
1824 }
1825
1826 fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1829 use crate::parser::parse;
1830 match parse(query_text) {
1831 Ok(Statement::Query(q)) => {
1832 let mut deps = vec![q.source.clone()];
1833 for j in &q.joins {
1834 deps.push(j.source.clone());
1835 }
1836 deps
1837 }
1838 _ => Vec::new(),
1839 }
1840 }
1841
/// Fast path for a single-column aggregate over one table, optionally
/// filtered by `predicate`.
///
/// Returns `Ok(Some(QueryResult::Scalar(..)))` when the fast path applies and
/// `Ok(None)` when it does not, which happens when:
/// * `col` is not a column of `table`,
/// * the column is neither `TypeId::Int` nor `TypeId::Float`,
/// * the column has no fixed offset in the `FastLayout`, or
/// * `predicate` is present but `compile_predicate` cannot compile it.
///
/// # Errors
///
/// `QueryError::TableNotFound` if `table` has no schema in the catalog. The
/// `agg_int_loop!` / `agg_float_loop!` macros are defined elsewhere and may
/// surface further errors — TODO confirm against their definitions.
pub(super) fn agg_single_col_fast(
    &self,
    table: &str,
    col: &str,
    function: AggFunc,
    predicate: Option<&Expr>,
) -> Result<Option<QueryResult>, QueryError> {
    let schema = self
        .catalog
        .schema(table)
        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
        .clone();
    let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
    let col_idx = match schema.column_index(col) {
        Some(i) => i,
        None => return Ok(None),
    };
    let col_type = schema.columns[col_idx].type_id;
    // Only Int and Float columns are handled here; everything else declines.
    if col_type != TypeId::Int && col_type != TypeId::Float {
        return Ok(None);
    }

    let fast = FastLayout::new(&schema);
    let byte_offset = match fast.fixed_offsets[col_idx] {
        Some(o) => o,
        None => return Ok(None),
    };
    // Location of the column's bit in the bitmap and of its value bytes,
    // handed to the scan macros below.
    // NOTE(review): the leading `2` is presumably a 2-byte row header that
    // precedes the bitmap — confirm against the row format.
    let bitmap_byte = col_idx / 8;
    let bitmap_bit = (col_idx % 8) as u32;
    let data_offset = 2 + fast.bitmap_size + byte_offset;

    let compiled_pred: Option<CompiledPredicate> = match predicate {
        Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
            Some(c) => Some(c),
            // A predicate that cannot be compiled disables the fast path.
            None => return Ok(None),
        },
        None => None,
    };

    // NOTE(review): `agg_int_loop!` / `agg_float_loop!` presumably scan the
    // table, apply `compiled_pred`, skip NULLs via the bitmap position, and
    // invoke the closure with each decoded value — verify in the macros.
    let result = match col_type {
        TypeId::Int => match function {
            AggFunc::Sum | AggFunc::Avg => {
                // Accumulate in i128 so the running sum cannot overflow i64.
                let mut sum_i128: i128 = 0;
                let mut count: i64 = 0;
                agg_int_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: i64| {
                        count += 1;
                        sum_i128 += v as i128;
                    }
                );
                if matches!(function, AggFunc::Sum) {
                    // Sum saturates at the i64 bounds instead of wrapping.
                    let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
                    QueryResult::Scalar(Value::Int(clamped))
                } else if count == 0 {
                    // Avg over no values is Empty rather than a division by zero.
                    QueryResult::Scalar(Value::Empty)
                } else {
                    let avg = (sum_i128 as f64) / (count as f64);
                    QueryResult::Scalar(Value::Float(avg))
                }
            }
            AggFunc::Min => {
                // `None` until the first value is seen; stays `None` (-> Empty)
                // when the closure is never called.
                let mut min_v: Option<i64> = None;
                agg_int_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: i64| {
                        min_v = Some(match min_v {
                            Some(m) => m.min(v),
                            None => v,
                        });
                    }
                );
                QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
            }
            AggFunc::Max => {
                let mut max_v: Option<i64> = None;
                agg_int_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: i64| {
                        max_v = Some(match max_v {
                            Some(m) => m.max(v),
                            None => v,
                        });
                    }
                );
                QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
            }
            AggFunc::Count => {
                // Counts the values delivered to the closure.
                let mut count: i64 = 0;
                agg_int_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |_v: i64| {
                        count += 1;
                    }
                );
                QueryResult::Scalar(Value::Int(count))
            }
            AggFunc::CountDistinct => {
                let mut seen = rustc_hash::FxHashSet::default();
                agg_int_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: i64| {
                        seen.insert(v);
                    }
                );
                QueryResult::Scalar(Value::Int(seen.len() as i64))
            }
        },
        TypeId::Float => match function {
            AggFunc::Sum => {
                // Plain f64 accumulation; the sum over no values is 0.0.
                let mut sum: f64 = 0.0;
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: f64| {
                        sum += v;
                    }
                );
                QueryResult::Scalar(Value::Float(sum))
            }
            AggFunc::Avg => {
                let mut sum: f64 = 0.0;
                let mut count: i64 = 0;
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: f64| {
                        sum += v;
                        count += 1;
                    }
                );
                if count == 0 {
                    QueryResult::Scalar(Value::Empty)
                } else {
                    QueryResult::Scalar(Value::Float(sum / count as f64))
                }
            }
            AggFunc::Min => {
                // `total_cmp` gives a total order over all f64 values, NaN
                // included, so the comparison is always well defined.
                let mut min_v: Option<f64> = None;
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: f64| {
                        min_v = Some(match min_v {
                            Some(m) => {
                                if v.total_cmp(&m).is_lt() {
                                    v
                                } else {
                                    m
                                }
                            }
                            None => v,
                        });
                    }
                );
                QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
            }
            AggFunc::Max => {
                let mut max_v: Option<f64> = None;
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: f64| {
                        max_v = Some(match max_v {
                            Some(m) => {
                                if v.total_cmp(&m).is_gt() {
                                    v
                                } else {
                                    m
                                }
                            }
                            None => v,
                        });
                    }
                );
                QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
            }
            AggFunc::Count => {
                let mut count: i64 = 0;
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |_v: f64| {
                        count += 1;
                    }
                );
                QueryResult::Scalar(Value::Int(count))
            }
            AggFunc::CountDistinct => {
                // f64 is not `Hash`, so distinctness is decided on the raw bit
                // pattern: 0.0 and -0.0 count as two values, and NaNs with
                // different payloads count separately.
                let mut seen = rustc_hash::FxHashSet::default();
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: f64| {
                        seen.insert(v.to_bits());
                    }
                );
                QueryResult::Scalar(Value::Int(seen.len() as i64))
            }
        },
        _ => unreachable!("type guard above restricts to Int/Float"),
    };
    Ok(Some(result))
}
2153
2154 pub(super) fn project_filter_limit_fast(
2157 &self,
2158 table: &str,
2159 fields: &[ProjectField],
2160 limit: usize,
2161 predicate: Option<&Expr>,
2162 ) -> Result<Option<QueryResult>, QueryError> {
2163 let schema = self
2164 .catalog
2165 .schema(table)
2166 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2167 .clone();
2168 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2169
2170 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2173 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2174 for f in fields {
2175 let name = match &f.expr {
2176 Expr::Field(n) => n.clone(),
2177 _ => return Ok(None),
2178 };
2179 let idx = match all_columns.iter().position(|c| c == &name) {
2180 Some(i) => i,
2181 None => return Ok(None),
2182 };
2183 proj_indices.push(idx);
2184 proj_columns.push(f.alias.clone().unwrap_or(name));
2185 }
2186
2187 let fast = FastLayout::new(&schema);
2188 let row_layout = RowLayout::new(&schema);
2189
2190 let compiled_pred: Option<CompiledPredicate> = match predicate {
2191 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2192 Some(c) => Some(c),
2193 None => return Ok(None),
2194 },
2195 None => None,
2196 };
2197
2198 let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2199 self.catalog
2204 .try_for_each_row_raw(table, |_rid, data| {
2205 use std::ops::ControlFlow;
2206 if let Some(ref pred) = compiled_pred {
2207 if !pred(data) {
2208 return ControlFlow::Continue(());
2209 }
2210 }
2211 let row: Vec<Value> = proj_indices
2212 .iter()
2213 .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2214 .collect();
2215 out.push(row);
2216 if out.len() >= limit {
2217 ControlFlow::Break(())
2218 } else {
2219 ControlFlow::Continue(())
2220 }
2221 })
2222 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2223
2224 Ok(Some(QueryResult::Rows {
2225 columns: proj_columns,
2226 rows: out,
2227 }))
2228 }
2229
2230 pub(super) fn project_filter_sort_limit_fast(
2235 &self,
2236 table: &str,
2237 fields: &[ProjectField],
2238 sort_field: &str,
2239 descending: bool,
2240 limit: usize,
2241 predicate: Option<&Expr>,
2242 ) -> Result<Option<QueryResult>, QueryError> {
2243 if limit == 0 {
2244 return Ok(None);
2247 }
2248 let schema = self
2249 .catalog
2250 .schema(table)
2251 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2252 .clone();
2253 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2254
2255 let sort_idx = match schema.column_index(sort_field) {
2262 Some(i) => i,
2263 None => return Ok(None),
2264 };
2265 let sort_col_type = schema.columns[sort_idx].type_id;
2266 if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2267 return Ok(None);
2268 }
2269
2270 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2272 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2273 for f in fields {
2274 let name = match &f.expr {
2275 Expr::Field(n) => n.clone(),
2276 _ => return Ok(None),
2277 };
2278 let idx = match all_columns.iter().position(|c| c == &name) {
2279 Some(i) => i,
2280 None => return Ok(None),
2281 };
2282 proj_indices.push(idx);
2283 proj_columns.push(f.alias.clone().unwrap_or(name));
2284 }
2285
2286 let fast = FastLayout::new(&schema);
2287 let row_layout = RowLayout::new(&schema);
2288 let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2290 Some(o) => o,
2291 None => return Ok(None),
2292 };
2293 let sort_bitmap_byte = sort_idx / 8;
2294 let sort_bitmap_bit = (sort_idx % 8) as u32;
2295 let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2296
2297 let compiled_pred: Option<CompiledPredicate> = match predicate {
2298 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2299 Some(c) => Some(c),
2300 None => return Ok(None),
2301 },
2302 None => None,
2303 };
2304
2305 let drained: Vec<Vec<u8>> = match sort_col_type {
2314 TypeId::Int => {
2315 let mut seq: u64 = 0;
2316 let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2317 BinaryHeap::with_capacity(limit);
2318 let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2319 BinaryHeap::with_capacity(limit);
2320
2321 self.catalog
2322 .for_each_row_raw(table, |_rid, data| {
2323 if let Some(ref pred) = compiled_pred {
2324 if !pred(data) {
2325 return;
2326 }
2327 }
2328 if data.len() < sort_data_offset + 8 {
2330 return;
2331 }
2332 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2333 if is_null {
2334 return;
2335 }
2336 let key = i64::from_le_bytes(
2337 data[sort_data_offset..sort_data_offset + 8]
2338 .try_into()
2339 .unwrap_or_else(|_| unreachable!()),
2340 );
2341 let id = seq;
2342 seq += 1;
2343
2344 if descending {
2345 if heap_desc.len() < limit {
2346 heap_desc.push(Reverse((key, id, data.to_vec())));
2347 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2348 if key > *top_key {
2349 heap_desc.pop();
2350 heap_desc.push(Reverse((key, id, data.to_vec())));
2351 }
2352 }
2353 } else if heap_asc.len() < limit {
2354 heap_asc.push((key, id, data.to_vec()));
2355 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2356 if key < *top_key {
2357 heap_asc.pop();
2358 heap_asc.push((key, id, data.to_vec()));
2359 }
2360 }
2361 })
2362 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2363
2364 let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2365 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2366 } else {
2367 heap_asc.into_iter().collect()
2368 };
2369 if descending {
2370 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2371 } else {
2372 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2373 }
2374 drained.into_iter().map(|(_, _, d)| d).collect()
2375 }
2376 TypeId::Float => {
2377 let mut seq: u64 = 0;
2386 let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2387 BinaryHeap::with_capacity(limit);
2388 let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2389 BinaryHeap::with_capacity(limit);
2390
2391 self.catalog
2392 .for_each_row_raw(table, |_rid, data| {
2393 if let Some(ref pred) = compiled_pred {
2394 if !pred(data) {
2395 return;
2396 }
2397 }
2398 if data.len() < sort_data_offset + 8 {
2399 return;
2400 }
2401 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2402 if is_null {
2403 return;
2404 }
2405 let bits = u64::from_le_bytes(
2406 data[sort_data_offset..sort_data_offset + 8]
2407 .try_into()
2408 .unwrap_or_else(|_| unreachable!()),
2409 );
2410 let key = f64_bits_to_sortable_u64(bits);
2411 let id = seq;
2412 seq += 1;
2413
2414 if descending {
2415 if heap_desc.len() < limit {
2416 heap_desc.push(Reverse((key, id, data.to_vec())));
2417 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2418 if key > *top_key {
2419 heap_desc.pop();
2420 heap_desc.push(Reverse((key, id, data.to_vec())));
2421 }
2422 }
2423 } else if heap_asc.len() < limit {
2424 heap_asc.push((key, id, data.to_vec()));
2425 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2426 if key < *top_key {
2427 heap_asc.pop();
2428 heap_asc.push((key, id, data.to_vec()));
2429 }
2430 }
2431 })
2432 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2433
2434 let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2435 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2436 } else {
2437 heap_asc.into_iter().collect()
2438 };
2439 if descending {
2440 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2441 } else {
2442 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2443 }
2444 drained.into_iter().map(|(_, _, d)| d).collect()
2445 }
2446 _ => unreachable!("type guard above restricts to Int/Float"),
2447 };
2448
2449 let rows: Vec<Vec<Value>> = drained
2450 .into_iter()
2451 .map(|data| {
2452 proj_indices
2453 .iter()
2454 .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2455 .collect()
2456 })
2457 .collect();
2458
2459 Ok(Some(QueryResult::Rows {
2460 columns: proj_columns,
2461 rows,
2462 }))
2463 }
2464
2465 fn try_fused_scan_update(
2482 &mut self,
2483 table: &str,
2484 predicate: &Expr,
2485 resolved: &[(usize, Value)],
2486 changed_cols: &[usize],
2487 ) -> Option<Result<QueryResult, QueryError>> {
2488 let compiled = {
2491 let schema = self.catalog.schema(table)?;
2492 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2493 let fast = FastLayout::new(schema);
2494 compile_predicate(predicate, &columns, &fast, schema)?
2495 };
2496
2497 let fixed_patches: Option<Vec<FastPatch>> = {
2499 let tbl = self.catalog.get_table(table)?;
2500 let schema = &tbl.schema;
2501 let all_fixed_nonnull = resolved
2502 .iter()
2503 .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2504 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2505 if all_fixed_nonnull && no_indexed {
2506 let layout = RowLayout::new(schema);
2507 let bitmap_size = layout.bitmap_size();
2508 Some(
2509 resolved
2510 .iter()
2511 .map(|(idx, val)| {
2512 let fixed_off = layout
2513 .fixed_offset(*idx)
2514 .expect("is_fixed_size already checked");
2515 let field_off = 2 + bitmap_size + fixed_off;
2516 let bytes: FixedBytes = match val {
2517 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2518 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2519 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2520 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2521 Value::Uuid(v) => FixedBytes::Uuid(*v),
2522 _ => unreachable!("all_fixed_nonnull guard"),
2523 };
2524 FastPatch {
2525 field_off,
2526 bitmap_byte_off: 2 + idx / 8,
2527 bit_mask: 1u8 << (idx % 8),
2528 bytes,
2529 }
2530 })
2531 .collect(),
2532 )
2533 } else {
2534 None
2535 }
2536 };
2537 if let Some(patches) = fixed_patches {
2538 let result = self
2539 .catalog
2540 .scan_patch_matching_logged(table, compiled, |row| {
2541 for p in &patches {
2542 row[p.bitmap_byte_off] &= !p.bit_mask;
2543 let field_bytes = p.bytes.as_slice();
2544 row[p.field_off..p.field_off + field_bytes.len()]
2545 .copy_from_slice(field_bytes);
2546 }
2547 Some(row.len() as u16)
2548 })
2549 .map_err(|e| e.to_string());
2550 match result {
2551 Ok((count, _)) => {
2552 self.view_registry.mark_dependents_dirty(table);
2553 return Some(Ok(QueryResult::Modified(count)));
2554 }
2555 Err(e) => return Some(Err(QueryError::Execution(e))),
2556 }
2557 }
2558
2559 let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2561 let tbl = self.catalog.get_table(table)?;
2562 let schema = &tbl.schema;
2563 let is_single = resolved.len() == 1;
2564 let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2565 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2566 if is_single && is_var && no_indexed {
2567 let (idx, val) = &resolved[0];
2568 let bytes_opt = match val {
2569 Value::Str(s) => Some(s.as_bytes().to_vec()),
2570 Value::Bytes(b) => Some(b.clone()),
2571 Value::Empty => None,
2572 _ => return None, };
2574 Some((*idx, bytes_opt))
2575 } else {
2576 None
2577 }
2578 };
2579 if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2580 let layout = {
2582 let schema = self.catalog.schema(table)?;
2583 RowLayout::new(schema)
2584 };
2585 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2586 let result = self
2587 .catalog
2588 .scan_patch_matching_logged(table, compiled, |row| {
2589 patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2590 })
2591 .map_err(|e| e.to_string());
2592 match result {
2593 Ok((mut count, fallback_rids)) => {
2594 for rid in fallback_rids {
2596 let mut row = match self.catalog.get(table, rid) {
2597 Some(r) => r,
2598 None => continue,
2599 };
2600 for (idx, val) in resolved.iter() {
2601 row[*idx] = val.clone();
2602 }
2603 self.catalog
2604 .update_hinted(table, rid, &row, Some(changed_cols))
2605 .map_err(|e| e.to_string())
2606 .ok();
2607 count += 1;
2608 }
2609 self.view_registry.mark_dependents_dirty(table);
2610 return Some(Ok(QueryResult::Modified(count)));
2611 }
2612 Err(e) => return Some(Err(QueryError::Execution(e))),
2613 }
2614 }
2615
2616 None }
2618
2619 fn collect_rids_for_mutation(
2625 &mut self,
2626 input: &PlanNode,
2627 table: &str,
2628 ) -> Result<Vec<RowId>, QueryError> {
2629 match input {
2630 PlanNode::SeqScan { table: t } if t == table => {
2631 let rids: Vec<RowId> = self
2633 .catalog
2634 .scan(table)
2635 .map_err(|e| QueryError::StorageError(e.to_string()))?
2636 .map(|(rid, _)| rid)
2637 .collect();
2638 Ok(rids)
2639 }
2640 PlanNode::IndexScan {
2641 table: t,
2642 column,
2643 key,
2644 } if t == table => {
2645 let key_value = literal_to_value(key)?;
2646
2647 {
2656 let tbl = self
2657 .catalog
2658 .get_table(table)
2659 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2660 if tbl.has_index(column) {
2661 let rids = tbl.index_lookup_all(column, &key_value);
2662 return Ok(rids);
2663 }
2664 }
2665
2666 let schema = self
2671 .catalog
2672 .schema(table)
2673 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2674 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2675 let fast = FastLayout::new(schema);
2676 let synth = Expr::BinaryOp(
2677 Box::new(Expr::Field(column.clone())),
2678 BinOp::Eq,
2679 Box::new(key.clone()),
2680 );
2681 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2682 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2684 self.catalog
2685 .for_each_row_raw(table, |rid, data| {
2686 if compiled(data) {
2687 rids.push(rid);
2688 }
2689 })
2690 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2691 return Ok(rids);
2692 }
2693
2694 let col_idx =
2696 schema
2697 .column_index(column)
2698 .ok_or_else(|| QueryError::ColumnNotFound {
2699 table: String::new(),
2700 column: column.clone(),
2701 })?;
2702 let rids: Vec<RowId> = self
2703 .catalog
2704 .scan(table)
2705 .map_err(|e| QueryError::StorageError(e.to_string()))?
2706 .filter_map(|(rid, row)| {
2707 if row[col_idx] == key_value {
2708 Some(rid)
2709 } else {
2710 None
2711 }
2712 })
2713 .collect();
2714 Ok(rids)
2715 }
2716 PlanNode::Filter {
2717 input: inner,
2718 predicate,
2719 } => {
2720 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2721 if t != table {
2722 return self.generic_rid_match(input, table);
2723 }
2724 let schema = self
2725 .catalog
2726 .schema(table)
2727 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2728 let columns: Vec<String> =
2729 schema.columns.iter().map(|c| c.name.clone()).collect();
2730 let fast = FastLayout::new(schema);
2731 let row_layout = RowLayout::new(schema);
2732
2733 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2735 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2737 self.catalog
2738 .for_each_row_raw(table, |rid, data| {
2739 if compiled(data) {
2740 rids.push(rid);
2741 }
2742 })
2743 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2744 return Ok(rids);
2745 }
2746
2747 let pred_cols = predicate_column_indices(predicate, &columns);
2749 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2750 self.catalog
2751 .for_each_row_raw(table, |rid, data| {
2752 let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2753 if eval_predicate(predicate, &pred_row, &columns) {
2754 rids.push(rid);
2755 }
2756 })
2757 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2758 return Ok(rids);
2759 }
2760 self.generic_rid_match(input, table)
2761 }
2762 _ => self.generic_rid_match(input, table),
2763 }
2764 }
2765
2766 fn generic_rid_match(
2770 &mut self,
2771 input: &PlanNode,
2772 table: &str,
2773 ) -> Result<Vec<RowId>, QueryError> {
2774 let result = self.execute_plan(input)?;
2775 let rows = match result {
2776 QueryResult::Rows { rows, .. } => rows,
2777 _ => return Err("mutation source must be rows".into()),
2778 };
2779 let matching: Vec<RowId> = self
2780 .catalog
2781 .scan(table)
2782 .map_err(|e| QueryError::StorageError(e.to_string()))?
2783 .filter(|(_, row)| rows.iter().any(|r| r == row))
2784 .map(|(rid, _)| rid)
2785 .collect();
2786 Ok(matching)
2787 }
2788}
2789
2790pub(super) fn execute_window(
2791 result: QueryResult,
2792 windows: &[WindowDef],
2793) -> Result<QueryResult, QueryError> {
2794 let (mut columns, mut rows) = match result {
2795 QueryResult::Rows { columns, rows } => (columns, rows),
2796 _ => return Err("window function requires row input".into()),
2797 };
2798
2799 for wdef in windows {
2800 let part_indices: Vec<usize> = wdef
2802 .partition_by
2803 .iter()
2804 .map(|name| {
2805 columns
2806 .iter()
2807 .position(|c| c == name)
2808 .ok_or_else(|| format!("window partition column '{name}' not found"))
2809 })
2810 .collect::<Result<Vec<_>, _>>()?;
2811
2812 let ord_indices: Vec<(usize, bool)> = wdef
2813 .order_by
2814 .iter()
2815 .map(|sk| {
2816 columns
2817 .iter()
2818 .position(|c| c == &sk.field)
2819 .map(|i| (i, sk.descending))
2820 .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2821 })
2822 .collect::<Result<Vec<_>, _>>()?;
2823
2824 let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2826 match arg {
2827 Expr::Field(name) => {
2828 if name == "*" {
2829 None } else {
2831 Some(
2832 columns
2833 .iter()
2834 .position(|c| c == name)
2835 .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2836 )
2837 }
2838 }
2839 _ => None,
2840 }
2841 } else {
2842 None
2843 };
2844
2845 let n = rows.len();
2849 let mut indices: Vec<usize> = (0..n).collect();
2850 indices.sort_by(|&a, &b| {
2851 for &pi in &part_indices {
2853 let cmp = rows[a][pi].cmp(&rows[b][pi]);
2854 if cmp != std::cmp::Ordering::Equal {
2855 return cmp;
2856 }
2857 }
2858 for &(oi, desc) in &ord_indices {
2860 let cmp = rows[a][oi].cmp(&rows[b][oi]);
2861 if cmp != std::cmp::Ordering::Equal {
2862 return if desc { cmp.reverse() } else { cmp };
2863 }
2864 }
2865 std::cmp::Ordering::Equal
2866 });
2867
2868 let mut win_values: Vec<Value> = vec![Value::Empty; n];
2870 let mut partition_start = 0usize;
2871 let mut running_count: i64 = 0;
2873 let mut running_int_sum: i64 = 0;
2874 let mut running_float_sum: f64 = 0.0;
2875 let mut running_saw_float = false;
2876 let mut running_min: Option<Value> = None;
2877 let mut running_max: Option<Value> = None;
2878 let mut rank_counter: i64 = 0;
2879 let mut dense_rank_counter: i64 = 0;
2880 let mut prev_order_key: Option<Vec<Value>> = None;
2881 let mut same_rank_count: i64 = 0;
2882
2883 for sorted_pos in 0..n {
2884 let row_idx = indices[sorted_pos];
2885
2886 let new_partition = if sorted_pos == 0 {
2888 true
2889 } else {
2890 let prev_row_idx = indices[sorted_pos - 1];
2891 part_indices
2892 .iter()
2893 .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
2894 };
2895
2896 if new_partition {
2897 partition_start = sorted_pos;
2898 running_count = 0;
2899 running_int_sum = 0;
2900 running_float_sum = 0.0;
2901 running_saw_float = false;
2902 running_min = None;
2903 running_max = None;
2904 rank_counter = 0;
2905 dense_rank_counter = 0;
2906 prev_order_key = None;
2907 same_rank_count = 0;
2908 }
2909
2910 let current_order_key: Vec<Value> = ord_indices
2912 .iter()
2913 .map(|&(oi, _)| rows[row_idx][oi].clone())
2914 .collect();
2915 let same_as_prev = prev_order_key.as_ref() == Some(¤t_order_key);
2916
2917 let value = match wdef.function {
2918 WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
2919 WindowFunc::Rank => {
2920 if same_as_prev {
2921 same_rank_count += 1;
2922 } else {
2923 rank_counter += same_rank_count + 1;
2924 same_rank_count = 0;
2925 if rank_counter == 0 {
2926 rank_counter = 1;
2927 }
2928 }
2929 Value::Int(rank_counter)
2930 }
2931 WindowFunc::DenseRank => {
2932 if !same_as_prev {
2933 dense_rank_counter += 1;
2934 }
2935 Value::Int(dense_rank_counter)
2936 }
2937 WindowFunc::Sum => {
2938 if let Some(ci) = arg_col_idx {
2939 match &rows[row_idx][ci] {
2940 Value::Int(v) => running_int_sum += v,
2941 Value::Float(v) => {
2942 running_float_sum += v;
2943 running_saw_float = true;
2944 }
2945 _ => {}
2946 }
2947 }
2948 if running_saw_float {
2949 Value::Float(running_float_sum + running_int_sum as f64)
2950 } else {
2951 Value::Int(running_int_sum)
2952 }
2953 }
2954 WindowFunc::Avg => {
2955 if let Some(ci) = arg_col_idx {
2956 match &rows[row_idx][ci] {
2957 Value::Int(v) => {
2958 running_float_sum += *v as f64;
2959 running_count += 1;
2960 }
2961 Value::Float(v) => {
2962 running_float_sum += v;
2963 running_count += 1;
2964 }
2965 _ => {}
2966 }
2967 }
2968 if running_count == 0 {
2969 Value::Empty
2970 } else {
2971 Value::Float(running_float_sum / running_count as f64)
2972 }
2973 }
2974 WindowFunc::Count => {
2975 if let Some(ci) = arg_col_idx {
2976 if !rows[row_idx][ci].is_empty() {
2977 running_count += 1;
2978 }
2979 } else {
2980 running_count += 1;
2982 }
2983 Value::Int(running_count)
2984 }
2985 WindowFunc::Min => {
2986 if let Some(ci) = arg_col_idx {
2987 let v = &rows[row_idx][ci];
2988 if !v.is_empty() {
2989 running_min = Some(match &running_min {
2990 None => v.clone(),
2991 Some(cur) => {
2992 if v < cur {
2993 v.clone()
2994 } else {
2995 cur.clone()
2996 }
2997 }
2998 });
2999 }
3000 }
3001 running_min.clone().unwrap_or(Value::Empty)
3002 }
3003 WindowFunc::Max => {
3004 if let Some(ci) = arg_col_idx {
3005 let v = &rows[row_idx][ci];
3006 if !v.is_empty() {
3007 running_max = Some(match &running_max {
3008 None => v.clone(),
3009 Some(cur) => {
3010 if v > cur {
3011 v.clone()
3012 } else {
3013 cur.clone()
3014 }
3015 }
3016 });
3017 }
3018 }
3019 running_max.clone().unwrap_or(Value::Empty)
3020 }
3021 };
3022
3023 prev_order_key = Some(current_order_key);
3024 win_values[row_idx] = value;
3025 }
3026
3027 for (ri, row) in rows.iter_mut().enumerate() {
3029 row.push(win_values[ri].clone());
3030 }
3031 columns.push(wdef.output_name.clone());
3032 }
3033
3034 Ok(QueryResult::Rows { columns, rows })
3035}
3036
3037pub(super) fn compute_group_aggregate(
3039 func: AggFunc,
3040 all_rows: &[Vec<Value>],
3041 row_indices: &[usize],
3042 col_idx: usize,
3043) -> Value {
3044 match func {
3045 AggFunc::Count => {
3046 if col_idx == usize::MAX {
3047 return Value::Int(row_indices.len() as i64);
3049 }
3050 let count = row_indices
3051 .iter()
3052 .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3053 .count();
3054 Value::Int(count as i64)
3055 }
3056 AggFunc::CountDistinct => {
3057 let mut seen = std::collections::HashSet::new();
3058 for &ri in row_indices {
3059 let v = &all_rows[ri][col_idx];
3060 if !v.is_empty() {
3061 seen.insert(v.clone());
3062 }
3063 }
3064 Value::Int(seen.len() as i64)
3065 }
3066 AggFunc::Sum => {
3067 let mut int_sum: i64 = 0;
3072 let mut float_sum: f64 = 0.0;
3073 let mut saw_float = false;
3074 for &ri in row_indices {
3075 match &all_rows[ri][col_idx] {
3076 Value::Int(v) => int_sum += v,
3077 Value::Float(v) => {
3078 float_sum += *v;
3079 saw_float = true;
3080 }
3081 _ => {}
3082 }
3083 }
3084 if saw_float {
3085 Value::Float(float_sum + int_sum as f64)
3086 } else {
3087 Value::Int(int_sum)
3088 }
3089 }
3090 AggFunc::Avg => {
3091 let mut sum = 0.0f64;
3092 let mut count = 0usize;
3093 for &ri in row_indices {
3094 match &all_rows[ri][col_idx] {
3095 Value::Int(v) => {
3096 sum += *v as f64;
3097 count += 1;
3098 }
3099 Value::Float(v) => {
3100 sum += *v;
3101 count += 1;
3102 }
3103 _ => {}
3104 }
3105 }
3106 if count == 0 {
3107 Value::Empty
3108 } else {
3109 Value::Float(sum / count as f64)
3110 }
3111 }
3112 AggFunc::Min => row_indices
3113 .iter()
3114 .map(|&ri| &all_rows[ri][col_idx])
3115 .filter(|v| !v.is_empty())
3116 .min()
3117 .cloned()
3118 .unwrap_or(Value::Empty),
3119 AggFunc::Max => row_indices
3120 .iter()
3121 .map(|&ri| &all_rows[ri][col_idx])
3122 .filter(|v| !v.is_empty())
3123 .max()
3124 .cloned()
3125 .unwrap_or(Value::Empty),
3126 }
3127}
3128
3129pub(super) fn try_extract_equi_join_keys(
3143 pred: &Expr,
3144 left_columns: &[String],
3145 right_columns: &[String],
3146) -> Option<(usize, usize)> {
3147 let (lhs, op, rhs) = match pred {
3148 Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3149 _ => return None,
3150 };
3151 if op != BinOp::Eq {
3152 return None;
3153 }
3154 if let (Some(li), Some(ri)) = (
3156 resolve_side_column(lhs, left_columns),
3157 resolve_side_column(rhs, right_columns),
3158 ) {
3159 return Some((li, ri));
3160 }
3161 if let (Some(li), Some(ri)) = (
3164 resolve_side_column(rhs, left_columns),
3165 resolve_side_column(lhs, right_columns),
3166 ) {
3167 return Some((li, ri));
3168 }
3169 None
3170}
3171
3172fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3173 match expr {
3174 Expr::QualifiedField { qualifier, field } => {
3175 let q = qualifier.as_bytes();
3180 let f = field.as_bytes();
3181 columns.iter().position(|c| {
3182 let b = c.as_bytes();
3183 b.len() == q.len() + 1 + f.len()
3184 && b[..q.len()] == *q
3185 && b[q.len()] == b'.'
3186 && b[q.len() + 1..] == *f
3187 })
3188 }
3189 Expr::Field(name) => columns.iter().position(|c| c == name),
3190 _ => None,
3191 }
3192}
3193
3194pub(super) fn hash_join(
3206 left_columns: Vec<String>,
3207 left_rows: Vec<Vec<Value>>,
3208 right_columns: Vec<String>,
3209 right_rows: Vec<Vec<Value>>,
3210 left_key_idx: usize,
3211 right_key_idx: usize,
3212 kind: JoinKind,
3213) -> QueryResult {
3214 use rustc_hash::FxHashMap;
3215
3216 let n_left = left_columns.len();
3217 let n_right = right_columns.len();
3218 let mut columns = Vec::with_capacity(n_left + n_right);
3219 columns.extend(left_columns);
3220 columns.extend(right_columns);
3221
3222 let mut build: FxHashMap<Value, Vec<usize>> =
3225 FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3226 for (i, row) in right_rows.iter().enumerate() {
3227 if matches!(row[right_key_idx], Value::Empty) {
3231 continue;
3232 }
3233 build.entry(row[right_key_idx].clone()).or_default().push(i);
3234 }
3235
3236 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3239
3240 for left_row in &left_rows {
3241 let key = &left_row[left_key_idx];
3242 let matched = if matches!(key, Value::Empty) {
3243 None
3244 } else {
3245 build.get(key)
3246 };
3247 match matched {
3248 Some(matches) if !matches.is_empty() => {
3249 for &ri in matches {
3250 let right_row = &right_rows[ri];
3251 let mut combined = Vec::with_capacity(n_left + n_right);
3252 combined.extend_from_slice(left_row);
3253 combined.extend_from_slice(right_row);
3254 rows.push(combined);
3255 }
3256 }
3257 _ => {
3258 if matches!(kind, JoinKind::LeftOuter) {
3259 let mut row = Vec::with_capacity(n_left + n_right);
3260 row.extend_from_slice(left_row);
3261 row.resize(n_left + n_right, Value::Empty);
3262 rows.push(row);
3263 }
3264 }
3265 }
3266 }
3267
3268 QueryResult::Rows { columns, rows }
3269}
3270
3271pub(super) fn lower_unindexed_range_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3284 match plan {
3285 PlanNode::RangeScan {
3286 table,
3287 column,
3288 start,
3289 end,
3290 } => {
3291 if let Some(tbl) = catalog.get_table(table) {
3292 if tbl.is_index_unique(column) == Some(true) {
3297 return plan.clone();
3298 }
3299 }
3300 let pred = synthesize_range_predicate(column, start, end);
3301 PlanNode::Filter {
3302 input: Box::new(PlanNode::SeqScan {
3303 table: table.clone(),
3304 }),
3305 predicate: pred,
3306 }
3307 }
3308 PlanNode::Filter { input, predicate } => PlanNode::Filter {
3309 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3310 predicate: predicate.clone(),
3311 },
3312 PlanNode::Project { input, fields } => PlanNode::Project {
3313 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3314 fields: fields.clone(),
3315 },
3316 PlanNode::Sort { input, keys } => PlanNode::Sort {
3317 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3318 keys: keys.clone(),
3319 },
3320 PlanNode::Limit { input, count } => PlanNode::Limit {
3321 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3322 count: count.clone(),
3323 },
3324 PlanNode::Offset { input, count } => PlanNode::Offset {
3325 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3326 count: count.clone(),
3327 },
3328 PlanNode::Aggregate {
3329 input,
3330 function,
3331 field,
3332 } => PlanNode::Aggregate {
3333 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3334 function: *function,
3335 field: field.clone(),
3336 },
3337 PlanNode::Distinct { input } => PlanNode::Distinct {
3338 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3339 },
3340 PlanNode::GroupBy {
3341 input,
3342 keys,
3343 aggregates,
3344 having,
3345 } => PlanNode::GroupBy {
3346 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3347 keys: keys.clone(),
3348 aggregates: aggregates.clone(),
3349 having: having.clone(),
3350 },
3351 PlanNode::Update {
3352 input,
3353 table,
3354 assignments,
3355 } => PlanNode::Update {
3356 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3357 table: table.clone(),
3358 assignments: assignments.clone(),
3359 },
3360 PlanNode::Delete { input, table } => PlanNode::Delete {
3361 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3362 table: table.clone(),
3363 },
3364 PlanNode::Window { input, windows } => PlanNode::Window {
3365 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3366 windows: windows.clone(),
3367 },
3368 PlanNode::Union { left, right, all } => PlanNode::Union {
3369 left: Box::new(lower_unindexed_range_scans(catalog, left)),
3370 right: Box::new(lower_unindexed_range_scans(catalog, right)),
3371 all: *all,
3372 },
3373 PlanNode::Explain { input } => PlanNode::Explain {
3374 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3375 },
3376 PlanNode::NestedLoopJoin {
3377 left,
3378 right,
3379 on,
3380 kind,
3381 } => PlanNode::NestedLoopJoin {
3382 left: Box::new(lower_unindexed_range_scans(catalog, left)),
3383 right: Box::new(lower_unindexed_range_scans(catalog, right)),
3384 on: on.clone(),
3385 kind: *kind,
3386 },
3387 _ => plan.clone(),
3389 }
3390}
3391
3392pub(super) fn synthesize_range_predicate(
3394 column: &str,
3395 start: &Option<(Expr, bool)>,
3396 end: &Option<(Expr, bool)>,
3397) -> Expr {
3398 let lower = start.as_ref().map(|(expr, inclusive)| {
3399 let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3400 Expr::BinaryOp(
3401 Box::new(Expr::Field(column.to_string())),
3402 op,
3403 Box::new(expr.clone()),
3404 )
3405 });
3406 let upper = end.as_ref().map(|(expr, inclusive)| {
3407 let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3408 Expr::BinaryOp(
3409 Box::new(Expr::Field(column.to_string())),
3410 op,
3411 Box::new(expr.clone()),
3412 )
3413 });
3414 match (lower, upper) {
3415 (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3416 (Some(l), None) => l,
3417 (None, Some(u)) => u,
3418 (None, None) => Expr::Literal(Literal::Bool(true)),
3419 }
3420}
3421
3422pub(super) fn range_matches(
3424 val: &Value,
3425 start: &Option<Value>,
3426 start_inc: bool,
3427 end: &Option<Value>,
3428 end_inc: bool,
3429) -> bool {
3430 if let Some(ref s) = start {
3431 if start_inc {
3432 if val < s {
3433 return false;
3434 }
3435 } else if val <= s {
3436 return false;
3437 }
3438 }
3439 if let Some(ref e) = end {
3440 if end_inc {
3441 if val > e {
3442 return false;
3443 }
3444 } else if val >= e {
3445 return false;
3446 }
3447 }
3448 true
3449}
3450
3451pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3454 let indent = " ".repeat(depth);
3455 match plan {
3456 PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3457 PlanNode::AliasScan { table, alias } => {
3458 format!("{indent}AliasScan table={table} alias={alias}")
3459 }
3460 PlanNode::IndexScan { table, column, key } => {
3461 format!("{indent}IndexScan table={table} column={column} key={key:?}")
3462 }
3463 PlanNode::RangeScan {
3464 table,
3465 column,
3466 start,
3467 end,
3468 } => {
3469 let s = match start {
3470 Some((expr, inc)) => {
3471 let op = if *inc { ">=" } else { ">" };
3472 format!("{op}{expr:?}")
3473 }
3474 None => "unbounded".to_string(),
3475 };
3476 let e = match end {
3477 Some((expr, inc)) => {
3478 let op = if *inc { "<=" } else { "<" };
3479 format!("{op}{expr:?}")
3480 }
3481 None => "unbounded".to_string(),
3482 };
3483 format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3484 }
3485 PlanNode::Filter { input, predicate } => {
3486 let child = format_plan_tree(input, depth + 1);
3487 format!("{indent}Filter predicate={predicate:?}\n{child}")
3488 }
3489 PlanNode::Project { input, fields } => {
3490 let names: Vec<String> = fields
3491 .iter()
3492 .map(|f| match &f.alias {
3493 Some(a) => format!("{a}: {:?}", f.expr),
3494 None => format!("{:?}", f.expr),
3495 })
3496 .collect();
3497 let child = format_plan_tree(input, depth + 1);
3498 format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3499 }
3500 PlanNode::Sort { input, keys } => {
3501 let ks: Vec<String> = keys
3502 .iter()
3503 .map(|k| {
3504 if k.descending {
3505 format!("{} desc", k.field)
3506 } else {
3507 k.field.clone()
3508 }
3509 })
3510 .collect();
3511 let child = format_plan_tree(input, depth + 1);
3512 format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3513 }
3514 PlanNode::Limit { input, count } => {
3515 let child = format_plan_tree(input, depth + 1);
3516 format!("{indent}Limit count={count:?}\n{child}")
3517 }
3518 PlanNode::Offset { input, count } => {
3519 let child = format_plan_tree(input, depth + 1);
3520 format!("{indent}Offset count={count:?}\n{child}")
3521 }
3522 PlanNode::Aggregate {
3523 input,
3524 function,
3525 field,
3526 } => {
3527 let f = field.as_deref().unwrap_or("*");
3528 let child = format_plan_tree(input, depth + 1);
3529 format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3530 }
3531 PlanNode::NestedLoopJoin {
3532 left,
3533 right,
3534 on,
3535 kind,
3536 } => {
3537 let left_child = format_plan_tree(left, depth + 1);
3538 let right_child = format_plan_tree(right, depth + 1);
3539 let on_str = match on {
3540 Some(pred) => format!("{pred:?}"),
3541 None => "none".to_string(),
3542 };
3543 format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3544 }
3545 PlanNode::Distinct { input } => {
3546 let child = format_plan_tree(input, depth + 1);
3547 format!("{indent}Distinct\n{child}")
3548 }
3549 PlanNode::GroupBy {
3550 input,
3551 keys,
3552 aggregates,
3553 having,
3554 } => {
3555 let agg_strs: Vec<String> = aggregates
3556 .iter()
3557 .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3558 .collect();
3559 let having_str = match having {
3560 Some(h) => format!(" having={h:?}"),
3561 None => String::new(),
3562 };
3563 let child = format_plan_tree(input, depth + 1);
3564 format!(
3565 "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3566 keys.join(", "),
3567 agg_strs.join(", "),
3568 )
3569 }
3570 PlanNode::Insert { table, assignments } => {
3571 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3572 format!("{indent}Insert table={table} cols=[{}]", cols.join(", "))
3573 }
3574 PlanNode::Upsert {
3575 table,
3576 key_column,
3577 assignments,
3578 on_conflict,
3579 } => {
3580 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3581 let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3582 if conflict_cols.is_empty() {
3583 format!(
3584 "{indent}Upsert table={table} key={key_column} cols=[{}]",
3585 cols.join(", ")
3586 )
3587 } else {
3588 format!(
3589 "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3590 cols.join(", "),
3591 conflict_cols.join(", ")
3592 )
3593 }
3594 }
3595 PlanNode::Update {
3596 input,
3597 table,
3598 assignments,
3599 } => {
3600 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3601 let child = format_plan_tree(input, depth + 1);
3602 format!(
3603 "{indent}Update table={table} set=[{}]\n{child}",
3604 cols.join(", ")
3605 )
3606 }
3607 PlanNode::Delete { input, table } => {
3608 let child = format_plan_tree(input, depth + 1);
3609 format!("{indent}Delete table={table}\n{child}")
3610 }
3611 PlanNode::CreateTable { name, fields } => {
3612 let fs: Vec<String> = fields
3613 .iter()
3614 .map(|(n, t, r)| {
3615 if *r {
3616 format!("{n}: {t} required")
3617 } else {
3618 format!("{n}: {t}")
3619 }
3620 })
3621 .collect();
3622 format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3623 }
3624 PlanNode::AlterTable { table, action } => {
3625 format!("{indent}AlterTable table={table} action={action:?}")
3626 }
3627 PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3628 PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3629 PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3630 PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3631 PlanNode::Window { input, windows } => {
3632 let ws: Vec<String> = windows
3633 .iter()
3634 .map(|w| format!("{:?} as {}", w.function, w.output_name))
3635 .collect();
3636 let child = format_plan_tree(input, depth + 1);
3637 format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3638 }
3639 PlanNode::Union { left, right, all } => {
3640 let kind = if *all { "UNION ALL" } else { "UNION" };
3641 let left_child = format_plan_tree(left, depth + 1);
3642 let right_child = format_plan_tree(right, depth + 1);
3643 format!("{indent}{kind}\n{left_child}\n{right_child}")
3644 }
3645 PlanNode::Explain { input } => {
3646 let child = format_plan_tree(input, depth + 1);
3647 format!("{indent}Explain\n{child}")
3648 }
3649 PlanNode::Begin => format!("{indent}Begin"),
3650 PlanNode::Commit => format!("{indent}Commit"),
3651 PlanNode::Rollback => format!("{indent}Rollback"),
3652 }
3653}