1use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::ViewDef;
16
17impl Engine {
18 pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19 match plan {
20 PlanNode::SeqScan { table } => {
21 if self.view_registry.is_dirty(table) {
23 self.refresh_view(table)?;
24 }
25 let schema = self
26 .catalog
27 .schema(table)
28 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29 .clone();
30 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31 let rows: Vec<Vec<Value>> = self
32 .catalog
33 .scan(table)
34 .map_err(|e| QueryError::StorageError(e.to_string()))?
35 .map(|(_, row)| row)
36 .collect();
37 Ok(QueryResult::Rows { columns, rows })
38 }
39
40 PlanNode::Filter { input, predicate } => {
41 let materialized;
45 let predicate = if contains_subquery(predicate) {
46 materialized = self.materialize_subqueries(predicate)?;
47 &materialized
48 } else {
49 predicate
50 };
51
52 if contains_subquery(predicate) {
54 let result = self.execute_plan(input)?;
55 return match result {
56 QueryResult::Rows { columns, rows } => {
57 let mut filtered = Vec::new();
58 for row in rows {
59 let row_pred =
60 self.materialize_correlated_for_row(predicate, &row, &columns)?;
61 if eval_predicate(&row_pred, &row, &columns) {
62 filtered.push(row);
63 }
64 }
65 Ok(QueryResult::Rows {
66 columns,
67 rows: filtered,
68 })
69 }
70 _ => Err("filter requires row input".into()),
71 };
72 }
73
74 if let PlanNode::SeqScan { table } = input.as_ref() {
79 if self.view_registry.is_dirty(table) {
81 self.refresh_view(table)?;
82 }
83 let schema = self
84 .catalog
85 .schema(table)
86 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87 .clone();
88 let columns: Vec<String> =
89 schema.columns.iter().map(|c| c.name.clone()).collect();
90 let fast = FastLayout::new(&schema);
91 let row_layout = RowLayout::new(&schema);
92 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100 self.catalog
101 .for_each_row_raw(table, |_rid, data| {
102 if compiled(data) {
103 rows.push(decode_row(&schema, data));
104 }
105 })
106 .map_err(|e| QueryError::StorageError(e.to_string()))?;
107 } else {
108 let pred_cols = predicate_column_indices(predicate, &columns);
109 self.catalog
110 .for_each_row_raw(table, |_rid, data| {
111 let pred_row =
112 decode_selective(&schema, &row_layout, data, &pred_cols);
113 if eval_predicate(predicate, &pred_row, &columns) {
114 rows.push(decode_row(&schema, data));
115 }
116 })
117 .map_err(|e| QueryError::StorageError(e.to_string()))?;
118 }
119
120 return Ok(QueryResult::Rows { columns, rows });
121 }
122
123 let result = self.execute_plan(input)?;
125 match result {
126 QueryResult::Rows { columns, rows } => {
127 let filtered: Vec<Vec<Value>> = rows
128 .into_iter()
129 .filter(|row| eval_predicate(predicate, row, &columns))
130 .collect();
131 Ok(QueryResult::Rows {
132 columns,
133 rows: filtered,
134 })
135 }
136 _ => Err("filter requires row input".into()),
137 }
138 }
139
140 PlanNode::Project { input, fields } => {
141 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144 let schema = self
145 .catalog
146 .schema(table)
147 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148 .clone();
149 let all_columns: Vec<String> =
150 schema.columns.iter().map(|c| c.name.clone()).collect();
151 let key_value = literal_to_value(key)?;
152 let tbl = self
153 .catalog
154 .get_table(table)
155 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157 let proj_columns: Vec<String> = fields
158 .iter()
159 .map(|f| {
160 f.alias.clone().unwrap_or_else(|| match &f.expr {
161 Expr::Field(name) => name.clone(),
162 _ => "?".into(),
163 })
164 })
165 .collect();
166
167 let proj_indices: Vec<usize> = fields
169 .iter()
170 .filter_map(|f| {
171 if let Expr::Field(name) = &f.expr {
172 all_columns.iter().position(|c| c == name)
173 } else {
174 None
175 }
176 })
177 .collect();
178
179 if let Some(btree) = tbl.index(column) {
180 let layout = RowLayout::new(&schema);
181 let lookup_result = match &key_value {
185 Value::Int(k) => btree.lookup_int(*k),
186 other => btree.lookup(other),
187 };
188 let rows = match lookup_result {
189 Some(rid) => match tbl.heap.get(rid) {
190 Some(data) => {
191 let row: Vec<Value> = proj_indices
192 .iter()
193 .map(|&ci| decode_column(&schema, &layout, &data, ci))
194 .collect();
195 vec![row]
196 }
197 None => Vec::new(),
198 },
199 None => Vec::new(),
200 };
201 return Ok(QueryResult::Rows {
202 columns: proj_columns,
203 rows,
204 });
205 }
206 }
207
208 if let PlanNode::Limit {
213 input: inner,
214 count: limit_expr,
215 } = input.as_ref()
216 {
217 if let PlanNode::Sort {
218 input: sort_input,
219 keys,
220 } = inner.as_ref()
221 {
222 if keys.len() == 1 {
224 let sort_field = &keys[0].field;
225 let descending = keys[0].descending;
226 let limit = match limit_expr {
227 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
228 _ => usize::MAX,
229 };
230 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
231 match sort_input.as_ref() {
232 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
233 PlanNode::Filter {
234 input: fi,
235 predicate,
236 } => {
237 if let PlanNode::SeqScan { table } = fi.as_ref() {
238 (Some(table.as_str()), Some(predicate))
239 } else {
240 (None, None)
241 }
242 }
243 _ => (None, None),
244 };
245 if let Some(table) = table_opt {
246 if let Some(result) = self.project_filter_sort_limit_fast(
247 table, fields, sort_field, descending, limit, pred_opt,
248 )? {
249 return Ok(result);
250 }
251 }
252 }
253 }
254 if let PlanNode::Filter {
257 input: fi,
258 predicate,
259 } = inner.as_ref()
260 {
261 if let PlanNode::SeqScan { table } = fi.as_ref() {
262 let limit = match limit_expr {
263 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
264 _ => usize::MAX,
265 };
266 if let Some(result) = self.project_filter_limit_fast(
267 table,
268 fields,
269 limit,
270 Some(predicate),
271 )? {
272 return Ok(result);
273 }
274 }
275 }
276 if let PlanNode::SeqScan { table } = inner.as_ref() {
278 let limit = match limit_expr {
279 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
280 _ => usize::MAX,
281 };
282 if let Some(result) =
283 self.project_filter_limit_fast(table, fields, limit, None)?
284 {
285 return Ok(result);
286 }
287 }
288 }
289
290 if let PlanNode::Filter {
301 input: fi,
302 predicate,
303 } = input.as_ref()
304 {
305 if let PlanNode::SeqScan { table } = fi.as_ref() {
306 if let Some(result) = self.project_filter_limit_fast(
307 table,
308 fields,
309 usize::MAX,
310 Some(predicate),
311 )? {
312 return Ok(result);
313 }
314 }
315 }
316
317 if let PlanNode::SeqScan { table } = input.as_ref() {
321 if let Some(result) =
322 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
323 {
324 return Ok(result);
325 }
326 }
327
328 let result = self.execute_plan(input)?;
329 match result {
330 QueryResult::Rows { columns, rows } => {
331 let proj_columns: Vec<String> = fields
332 .iter()
333 .map(|f| {
334 f.alias.clone().unwrap_or_else(|| match &f.expr {
335 Expr::Field(name) => name.clone(),
336 Expr::QualifiedField { qualifier, field } => {
340 format!("{qualifier}.{field}")
341 }
342 _ => "?".into(),
343 })
344 })
345 .collect();
346 let proj_rows: Vec<Vec<Value>> = rows
347 .iter()
348 .map(|row| {
349 fields
350 .iter()
351 .map(|f| eval_expr(&f.expr, row, &columns))
352 .collect()
353 })
354 .collect();
355 Ok(QueryResult::Rows {
356 columns: proj_columns,
357 rows: proj_rows,
358 })
359 }
360 _ => Err("project requires row input".into()),
361 }
362 }
363
364 PlanNode::Sort { input, keys } => {
365 let result = self.execute_plan(input)?;
366 match result {
367 QueryResult::Rows { columns, mut rows } => {
368 if rows.len() > MAX_SORT_ROWS {
369 return Err(QueryError::SortLimitExceeded);
370 }
371 let key_indices: Vec<(usize, bool)> = keys
372 .iter()
373 .map(|k| {
374 columns
375 .iter()
376 .position(|c| c == &k.field)
377 .map(|idx| (idx, k.descending))
378 .ok_or_else(|| QueryError::ColumnNotFound {
379 table: String::new(),
380 column: k.field.clone(),
381 })
382 })
383 .collect::<Result<_, QueryError>>()?;
384 rows.sort_by(|a, b| {
385 for &(col_idx, descending) in &key_indices {
386 let cmp = a[col_idx].cmp(&b[col_idx]);
387 let cmp = if descending { cmp.reverse() } else { cmp };
388 if cmp != std::cmp::Ordering::Equal {
389 return cmp;
390 }
391 }
392 std::cmp::Ordering::Equal
393 });
394 Ok(QueryResult::Rows { columns, rows })
395 }
396 _ => Err("sort requires row input".into()),
397 }
398 }
399
400 PlanNode::Limit { input, count } => {
401 let result = self.execute_plan(input)?;
402 let n = match count {
403 Expr::Literal(Literal::Int(v)) => *v as usize,
404 _ => return Err("limit must be integer literal".into()),
405 };
406 match result {
407 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
408 columns,
409 rows: rows.into_iter().take(n).collect(),
410 }),
411 _ => Err("limit requires row input".into()),
412 }
413 }
414
415 PlanNode::Offset { input, count } => {
416 let result = self.execute_plan(input)?;
417 let n = match count {
418 Expr::Literal(Literal::Int(v)) => *v as usize,
419 _ => return Err("offset must be integer literal".into()),
420 };
421 match result {
422 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
423 columns,
424 rows: rows.into_iter().skip(n).collect(),
425 }),
426 _ => Err("offset requires row input".into()),
427 }
428 }
429
430 PlanNode::Aggregate {
431 input,
432 function,
433 field,
434 } => {
435 if *function == AggFunc::Count {
437 if let PlanNode::SeqScan { table } = input.as_ref() {
438 let mut count: i64 = 0;
439 self.catalog
440 .for_each_row_raw(table, |_rid, _data| {
441 count += 1;
442 })
443 .map_err(|e| QueryError::StorageError(e.to_string()))?;
444 return Ok(QueryResult::Scalar(Value::Int(count)));
445 }
446 if let PlanNode::Filter {
449 input: inner,
450 predicate,
451 } = input.as_ref()
452 {
453 if let PlanNode::SeqScan { table } = inner.as_ref() {
454 let schema = self
455 .catalog
456 .schema(table)
457 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
458 .clone();
459 let columns: Vec<String> =
460 schema.columns.iter().map(|c| c.name.clone()).collect();
461 let fast = FastLayout::new(&schema);
462 let row_layout = RowLayout::new(&schema);
463
464 if let Some(compiled) =
467 compile_predicate(predicate, &columns, &fast, &schema)
468 {
469 let mut count: i64 = 0;
470 self.catalog
471 .for_each_row_raw(table, |_rid, data| {
472 if compiled(data) {
473 count += 1;
474 }
475 })
476 .map_err(|e| QueryError::StorageError(e.to_string()))?;
477 return Ok(QueryResult::Scalar(Value::Int(count)));
478 }
479
480 let pred_cols = predicate_column_indices(predicate, &columns);
482 let mut count: i64 = 0;
483 self.catalog
484 .for_each_row_raw(table, |_rid, data| {
485 let pred_row =
486 decode_selective(&schema, &row_layout, data, &pred_cols);
487 if eval_predicate(predicate, &pred_row, &columns) {
488 count += 1;
489 }
490 })
491 .map_err(|e| QueryError::StorageError(e.to_string()))?;
492
493 return Ok(QueryResult::Scalar(Value::Int(count)));
494 }
495 }
496 }
497
498 if matches!(
502 function,
503 AggFunc::Sum
504 | AggFunc::Avg
505 | AggFunc::Min
506 | AggFunc::Max
507 | AggFunc::CountDistinct
508 ) {
509 if let Some(col) = field.as_ref() {
510 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
512 match input.as_ref() {
513 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
514 PlanNode::Filter {
515 input: inner,
516 predicate,
517 } => {
518 if let PlanNode::SeqScan { table } = inner.as_ref() {
519 (Some(table.as_str()), Some(predicate))
520 } else {
521 (None, None)
522 }
523 }
524 _ => (None, None),
525 };
526 if let Some(table) = table_opt {
527 if let Some(result) =
528 self.agg_single_col_fast(table, col, *function, pred_opt)?
529 {
530 return Ok(result);
531 }
532 }
533 }
534 }
535
536 let result = self.execute_plan(input)?;
541 match result {
542 QueryResult::Rows { columns, rows } => {
543 match function {
544 AggFunc::Count => {
545 Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
546 }
547 AggFunc::CountDistinct => {
548 let col = field.as_ref().ok_or("count distinct requires field")?;
549 let idx = columns
550 .iter()
551 .position(|c| c == col)
552 .ok_or("col not found")?;
553 let mut seen = std::collections::HashSet::new();
554 for row in &rows {
555 let v = &row[idx];
556 if !v.is_empty() {
557 seen.insert(v.clone());
558 }
559 }
560 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
561 }
562 AggFunc::Avg => {
563 let col = field.as_ref().ok_or("avg requires field")?;
564 let idx = columns
565 .iter()
566 .position(|c| c == col)
567 .ok_or("col not found")?;
568 let sum: f64 = rows
569 .iter()
570 .filter_map(|r| match &r[idx] {
571 Value::Int(v) => Some(*v as f64),
572 Value::Float(v) => Some(*v),
573 _ => None,
574 })
575 .sum();
576 let count = rows.len() as f64;
577 Ok(QueryResult::Scalar(Value::Float(sum / count)))
578 }
579 AggFunc::Sum => {
580 let col = field.as_ref().ok_or("sum requires field")?;
581 let idx = columns
582 .iter()
583 .position(|c| c == col)
584 .ok_or("col not found")?;
585 let mut int_sum: i64 = 0;
591 let mut float_sum: f64 = 0.0;
592 let mut saw_float = false;
593 for r in &rows {
594 match &r[idx] {
595 Value::Int(v) => int_sum += *v,
596 Value::Float(v) => {
597 float_sum += *v;
598 saw_float = true;
599 }
600 _ => {}
601 }
602 }
603 let result = if saw_float {
604 Value::Float(float_sum + int_sum as f64)
605 } else {
606 Value::Int(int_sum)
607 };
608 Ok(QueryResult::Scalar(result))
609 }
610 AggFunc::Min | AggFunc::Max => {
611 let col = field.as_ref().ok_or("min/max requires field")?;
612 let idx = columns
613 .iter()
614 .position(|c| c == col)
615 .ok_or("col not found")?;
616 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
617 let result = if *function == AggFunc::Min {
618 vals.into_iter().min().cloned()
619 } else {
620 vals.into_iter().max().cloned()
621 };
622 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
623 }
624 }
625 }
626 _ => Err("aggregate requires row input".into()),
627 }
628 }
629
630 PlanNode::Insert { table, assignments } => {
631 let values = {
638 let schema = self
639 .catalog
640 .schema(table)
641 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
642 let mut values = vec![Value::Empty; schema.columns.len()];
643 for a in assignments {
644 let idx = schema.column_index(&a.field).ok_or_else(|| {
645 QueryError::ColumnNotFound {
646 table: String::new(),
647 column: a.field.clone(),
648 }
649 })?;
650 values[idx] = literal_to_value(&a.value)?;
651 }
652 values
653 };
654 self.catalog
655 .insert(table, &values)
656 .map_err(|e| QueryError::StorageError(e.to_string()))?;
657 self.view_registry.mark_dependents_dirty(table);
658 Ok(QueryResult::Modified(1))
659 }
660
661 PlanNode::Upsert {
662 table,
663 key_column,
664 assignments,
665 on_conflict,
666 } => {
667 let (values, key_idx) = {
669 let schema = self
670 .catalog
671 .schema(table)
672 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
673 let mut values = vec![Value::Empty; schema.columns.len()];
674 for a in assignments {
675 let idx = schema.column_index(&a.field).ok_or_else(|| {
676 QueryError::ColumnNotFound {
677 table: String::new(),
678 column: a.field.clone(),
679 }
680 })?;
681 values[idx] = literal_to_value(&a.value)?;
682 }
683 let key_idx = schema
684 .column_index(key_column)
685 .ok_or_else(|| format!("key column '{key_column}' not found"))?;
686 (values, key_idx)
687 };
688
689 let key_value = values[key_idx].clone();
690
691 let existing = {
693 let tbl = self
694 .catalog
695 .get_table(table)
696 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
697 if let Some(btree) = tbl.index(key_column) {
698 let hit = match &key_value {
699 Value::Int(k) => btree.lookup_int(*k),
700 other => btree.lookup(other),
701 };
702 hit.and_then(|rid| {
703 tbl.heap
704 .get(rid)
705 .map(|data| (rid, decode_row(&tbl.schema, &data)))
706 })
707 } else {
708 let mut found = None;
710 for (rid, row) in tbl.scan() {
711 if row[key_idx] == key_value {
712 found = Some((rid, row));
713 break;
714 }
715 }
716 found
717 }
718 };
719
720 if let Some((rid, mut existing_row)) = existing {
721 let update_assignments = if on_conflict.is_empty() {
723 assignments
724 } else {
725 on_conflict
726 };
727 let changed_cols: Vec<usize> = {
728 let schema = self
729 .catalog
730 .schema(table)
731 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
732 let mut indices = Vec::new();
733 for a in update_assignments {
734 let idx = schema.column_index(&a.field).ok_or_else(|| {
735 QueryError::ColumnNotFound {
736 table: String::new(),
737 column: a.field.clone(),
738 }
739 })?;
740 if idx != key_idx {
741 existing_row[idx] = literal_to_value(&a.value)?;
742 indices.push(idx);
743 }
744 }
745 indices
746 };
747 self.catalog
748 .update_hinted(table, rid, &existing_row, Some(&changed_cols))
749 .map_err(|e| QueryError::StorageError(e.to_string()))?;
750 self.view_registry.mark_dependents_dirty(table);
751 Ok(QueryResult::Modified(1))
752 } else {
753 self.catalog
755 .insert(table, &values)
756 .map_err(|e| QueryError::StorageError(e.to_string()))?;
757 self.view_registry.mark_dependents_dirty(table);
758 Ok(QueryResult::Modified(1))
759 }
760 }
761
762 PlanNode::Update {
763 input,
764 table,
765 assignments,
766 } => {
767 let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
773 let schema_ref = self
774 .catalog
775 .schema(table)
776 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
777 let indices: Vec<usize> = assignments
778 .iter()
779 .map(|a| {
780 schema_ref.column_index(&a.field).ok_or_else(|| {
781 QueryError::ColumnNotFound {
782 table: String::new(),
783 column: a.field.clone(),
784 }
785 })
786 })
787 .collect::<Result<_, _>>()?;
788 let vals: Result<Vec<Value>, _> = assignments
789 .iter()
790 .map(|a| literal_to_value(&a.value))
791 .collect();
792 (indices, vals.ok())
793 };
794 let resolved_assignments: Option<Vec<(usize, Value)>> =
795 literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
796
797 let changed_cols: Vec<usize> = col_indices.clone();
800
801 if let Some(ref resolved_assignments) = resolved_assignments {
808 if let PlanNode::Filter {
809 input: inner,
810 predicate,
811 } = input.as_ref()
812 {
813 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
814 if t == table {
815 let fused_result = self.try_fused_scan_update(
816 table,
817 predicate,
818 resolved_assignments,
819 &changed_cols,
820 );
821 if let Some(result) = fused_result {
822 return result;
823 }
824 }
825 }
826 }
827 }
828
829 let matching_rids = self.collect_rids_for_mutation(input, table)?;
831
832 if let Some(ref resolved_assignments) = resolved_assignments {
834 let fast_patch: Option<Vec<FastPatch>> = {
840 let tbl = self
841 .catalog
842 .get_table(table)
843 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
844 let schema = &tbl.schema;
845 let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
846 is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
847 });
848 let no_indexed = !resolved_assignments
849 .iter()
850 .any(|(idx, _)| tbl.has_indexed_col(*idx));
851
852 if all_fixed_nonnull && no_indexed {
853 let layout = RowLayout::new(schema);
854 let bitmap_size = layout.bitmap_size();
855 let patches: Vec<FastPatch> = resolved_assignments
856 .iter()
857 .map(|(idx, val)| {
858 let fixed_off = layout
859 .fixed_offset(*idx)
860 .expect("is_fixed_size already checked");
861 let field_off = 2 + bitmap_size + fixed_off;
862 let bytes: FixedBytes = match val {
863 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
864 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
865 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
866 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
867 Value::Uuid(v) => FixedBytes::Uuid(*v),
868 _ => unreachable!("all_fixed_nonnull guard lied"),
869 };
870 FastPatch {
871 field_off,
872 bitmap_byte_off: 2 + idx / 8,
873 bit_mask: 1u8 << (idx % 8),
874 bytes,
875 }
876 })
877 .collect();
878 Some(patches)
879 } else {
880 None
881 }
882 };
883
884 if let Some(patches) = fast_patch {
885 let mut count = 0u64;
886 for rid in matching_rids {
887 let ok = self
892 .catalog
893 .update_row_bytes_logged(table, rid, |row| {
894 for p in &patches {
895 row[p.bitmap_byte_off] &= !p.bit_mask;
896 let field_bytes = p.bytes.as_slice();
897 row[p.field_off..p.field_off + field_bytes.len()]
898 .copy_from_slice(field_bytes);
899 }
900 })
901 .map_err(|e| QueryError::StorageError(e.to_string()))?;
902 if ok {
903 count += 1;
904 }
905 }
906 self.view_registry.mark_dependents_dirty(table);
907 return Ok(QueryResult::Modified(count));
908 }
909
910 let var_fast: Option<(usize, Option<Vec<u8>>)> = {
912 let tbl = self
913 .catalog
914 .get_table(table)
915 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
916 let schema = &tbl.schema;
917 let is_single = resolved_assignments.len() == 1;
918 let is_var_col = is_single
919 && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
920 let no_indexed = !resolved_assignments
921 .iter()
922 .any(|(idx, _)| tbl.has_indexed_col(*idx));
923
924 if is_single && is_var_col && no_indexed {
925 let (idx, val) = &resolved_assignments[0];
926 let bytes_opt: Option<Vec<u8>> = match val {
927 Value::Str(s) => Some(s.as_bytes().to_vec()),
928 Value::Bytes(b) => Some(b.clone()),
929 Value::Empty => None,
930 _ => {
931 return Err(QueryError::TypeError(format!(
932 "cannot assign non-var value to var column '{}'",
933 schema.columns[*idx].name
934 )))
935 }
936 };
937 Some((*idx, bytes_opt))
938 } else {
939 None
940 }
941 };
942
943 if let Some((col_idx, new_bytes_opt)) = var_fast {
944 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
945 let mut count = 0u64;
946 let mut fallback_rids: Vec<RowId> = Vec::new();
947 for rid in &matching_rids {
948 let ok = self
954 .catalog
955 .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
956 .map_err(|e| QueryError::StorageError(e.to_string()))?;
957 if ok {
958 count += 1;
959 } else {
960 fallback_rids.push(*rid);
961 }
962 }
963 for rid in fallback_rids {
964 let mut row = match self.catalog.get(table, rid) {
965 Some(r) => r,
966 None => continue,
967 };
968 for (idx, val) in resolved_assignments.iter() {
969 row[*idx] = val.clone();
970 }
971 self.catalog
972 .update_hinted(table, rid, &row, Some(&changed_cols))
973 .map_err(|e| QueryError::StorageError(e.to_string()))?;
974 count += 1;
975 }
976 self.view_registry.mark_dependents_dirty(table);
977 return Ok(QueryResult::Modified(count));
978 }
979
980 let mut count = 0u64;
982 for rid in matching_rids {
983 let mut row = match self.catalog.get(table, rid) {
984 Some(r) => r,
985 None => continue,
986 };
987 for (idx, val) in resolved_assignments.iter() {
988 row[*idx] = val.clone();
989 }
990 self.catalog
991 .update_hinted(table, rid, &row, Some(&changed_cols))
992 .map_err(|e| QueryError::StorageError(e.to_string()))?;
993 count += 1;
994 }
995 self.view_registry.mark_dependents_dirty(table);
996 return Ok(QueryResult::Modified(count));
997 } let col_names: Vec<String> = {
1003 let schema_ref = self
1004 .catalog
1005 .schema(table)
1006 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1007 schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1008 };
1009 let mut count = 0u64;
1010 for rid in matching_rids {
1011 let mut row = match self.catalog.get(table, rid) {
1012 Some(r) => r,
1013 None => continue,
1014 };
1015 for (i, asgn) in assignments.iter().enumerate() {
1016 let val = eval_expr(&asgn.value, &row, &col_names);
1017 row[col_indices[i]] = val;
1018 }
1019 self.catalog
1020 .update_hinted(table, rid, &row, Some(&changed_cols))
1021 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1022 count += 1;
1023 }
1024 self.view_registry.mark_dependents_dirty(table);
1025 Ok(QueryResult::Modified(count))
1026 }
1027
1028 PlanNode::Delete { input, table } => {
1029 if let PlanNode::Filter {
1050 input: inner,
1051 predicate,
1052 } = input.as_ref()
1053 {
1054 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1055 if t == table {
1056 let schema = self
1057 .catalog
1058 .schema(table)
1059 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1060 let columns: Vec<String> =
1061 schema.columns.iter().map(|c| c.name.clone()).collect();
1062 let fast = FastLayout::new(schema);
1063 if let Some(compiled) =
1064 compile_predicate(predicate, &columns, &fast, schema)
1065 {
1066 let count = self
1072 .catalog
1073 .scan_delete_matching_logged(table, |data| compiled(data))
1074 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1075 self.view_registry.mark_dependents_dirty(table);
1076 return Ok(QueryResult::Modified(count));
1077 }
1078 }
1079 }
1080 } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1081 if t == table {
1082 let count = self
1086 .catalog
1087 .scan_delete_matching_logged(table, |_| true)
1088 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1089 self.view_registry.mark_dependents_dirty(table);
1090 return Ok(QueryResult::Modified(count));
1091 }
1092 }
1093
1094 let matching_rids = self.collect_rids_for_mutation(input, table)?;
1095 let count = self
1096 .catalog
1097 .delete_many(table, &matching_rids)
1098 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1099 self.view_registry.mark_dependents_dirty(table);
1100 Ok(QueryResult::Modified(count))
1101 }
1102
1103 PlanNode::AliasScan { table, alias } => {
1104 let schema = self
1114 .catalog
1115 .schema(table)
1116 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1117 .clone();
1118 let columns: Vec<String> = schema
1119 .columns
1120 .iter()
1121 .map(|c| format!("{alias}.{}", c.name))
1122 .collect();
1123 let rows: Vec<Vec<Value>> = self
1124 .catalog
1125 .scan(table)
1126 .map_err(|e| QueryError::StorageError(e.to_string()))?
1127 .map(|(_, row)| row)
1128 .collect();
1129 Ok(QueryResult::Rows { columns, rows })
1130 }
1131
1132 PlanNode::NestedLoopJoin {
1133 left,
1134 right,
1135 on,
1136 kind,
1137 } => {
1138 let left_result = self.execute_plan(left)?;
1149 let right_result = self.execute_plan(right)?;
1150 let (left_columns, left_rows) = match left_result {
1151 QueryResult::Rows { columns, rows } => (columns, rows),
1152 _ => return Err("join left side must produce rows".into()),
1153 };
1154 let (right_columns, right_rows) = match right_result {
1155 QueryResult::Rows { columns, rows } => (columns, rows),
1156 _ => return Err("join right side must produce rows".into()),
1157 };
1158
1159 if !matches!(kind, JoinKind::Cross) {
1161 if let Some(pred) = on {
1162 if let Some((l_idx, r_idx)) =
1163 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1164 {
1165 let result = hash_join(
1166 left_columns,
1167 left_rows,
1168 right_columns,
1169 right_rows,
1170 l_idx,
1171 r_idx,
1172 *kind,
1173 );
1174 if let QueryResult::Rows { ref rows, .. } = result {
1175 check_join_limit(rows.len())?;
1176 }
1177 return Ok(result);
1178 }
1179 }
1180 }
1181
1182 let n_left = left_columns.len();
1184 let n_right = right_columns.len();
1185 let mut columns = Vec::with_capacity(n_left + n_right);
1186 columns.extend(left_columns);
1187 columns.extend(right_columns);
1188
1189 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1190 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1191
1192 for left_row in &left_rows {
1193 let mut matched = false;
1194 for right_row in &right_rows {
1195 combined.clear();
1196 combined.extend_from_slice(left_row);
1197 combined.extend_from_slice(right_row);
1198 let keep = match kind {
1199 JoinKind::Cross => true,
1200 JoinKind::Inner | JoinKind::LeftOuter => match on {
1201 Some(pred) => eval_predicate(pred, &combined, &columns),
1202 None => true,
1206 },
1207 JoinKind::RightOuter => {
1210 unreachable!("planner rewrites RightOuter to LeftOuter")
1211 }
1212 };
1213 if keep {
1214 rows.push(combined.clone());
1215 check_join_limit(rows.len())?;
1216 matched = true;
1217 }
1218 }
1219 if !matched && matches!(kind, JoinKind::LeftOuter) {
1220 let mut row = Vec::with_capacity(n_left + n_right);
1221 row.extend_from_slice(left_row);
1222 row.resize(n_left + n_right, Value::Empty);
1223 rows.push(row);
1224 check_join_limit(rows.len())?;
1225 }
1226 }
1227
1228 Ok(QueryResult::Rows { columns, rows })
1229 }
1230
1231 PlanNode::Distinct { input } => {
1232 let result = self.execute_plan(input)?;
1233 match result {
1234 QueryResult::Rows { columns, rows } => {
1235 let mut seen = std::collections::HashSet::new();
1236 let mut unique_rows = Vec::new();
1237 for row in rows {
1238 if seen.insert(row.clone()) {
1239 unique_rows.push(row);
1240 }
1241 }
1242 Ok(QueryResult::Rows {
1243 columns,
1244 rows: unique_rows,
1245 })
1246 }
1247 other => Ok(other),
1248 }
1249 }
1250
1251 PlanNode::GroupBy {
1252 input,
1253 keys,
1254 aggregates,
1255 having,
1256 } => {
1257 let result = self.execute_plan(input)?;
1258 match result {
1259 QueryResult::Rows { columns, rows } => {
1260 let key_indices: Vec<usize> = keys
1262 .iter()
1263 .map(|k| {
1264 columns
1265 .iter()
1266 .position(|c| c == k)
1267 .ok_or_else(|| format!("group-by column '{k}' not found"))
1268 })
1269 .collect::<Result<Vec<_>, _>>()?;
1270
1271 let agg_field_indices: Vec<usize> = aggregates
1275 .iter()
1276 .map(|a| {
1277 if a.field == "*" {
1278 Ok(usize::MAX)
1279 } else {
1280 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1281 format!("aggregate column '{}' not found", a.field)
1282 })
1283 }
1284 })
1285 .collect::<Result<Vec<_>, _>>()?;
1286
1287 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1289 rustc_hash::FxHashMap::default();
1290 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1291 for (ri, row) in rows.iter().enumerate() {
1292 let key: Vec<Value> =
1293 key_indices.iter().map(|&i| row[i].clone()).collect();
1294 match group_map.get(&key) {
1295 Some(&idx) => groups[idx].1.push(ri),
1296 None => {
1297 let idx = groups.len();
1298 group_map.insert(key.clone(), idx);
1299 groups.push((key, vec![ri]));
1300 }
1301 }
1302 }
1303
1304 let mut out_columns: Vec<String> = keys.clone();
1306 for agg in aggregates.iter() {
1307 out_columns.push(agg.output_name.clone());
1308 }
1309
1310 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1312 for (key_vals, row_indices) in &groups {
1313 let mut row = key_vals.clone();
1314 for (ai, agg) in aggregates.iter().enumerate() {
1315 let col_idx = agg_field_indices[ai];
1316 let val = compute_group_aggregate(
1317 agg.function,
1318 &rows,
1319 row_indices,
1320 col_idx,
1321 );
1322 row.push(val);
1323 }
1324 out_rows.push(row);
1325 }
1326
1327 if let Some(having_expr) = having {
1329 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1330 }
1331
1332 Ok(QueryResult::Rows {
1333 columns: out_columns,
1334 rows: out_rows,
1335 })
1336 }
1337 _ => Err("group by requires row input".into()),
1338 }
1339 }
1340
1341 PlanNode::CreateTable { name, fields } => {
1342 let columns: Vec<ColumnDef> = fields
1343 .iter()
1344 .enumerate()
1345 .map(|(i, (fname, tname, req))| ColumnDef {
1346 name: fname.clone(),
1347 type_id: type_name_to_id(tname),
1348 required: *req,
1349 position: i as u16,
1350 })
1351 .collect();
1352 let schema = Schema {
1353 table_name: name.clone(),
1354 columns,
1355 };
1356 self.catalog
1357 .create_table(schema)
1358 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1359 Ok(QueryResult::Created(name.clone()))
1360 }
1361
1362 PlanNode::AlterTable { table, action } => match action {
1363 AlterAction::AddColumn {
1364 name,
1365 type_name,
1366 required,
1367 } => {
1368 let position = self
1369 .catalog
1370 .schema(table)
1371 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1372 .columns
1373 .len() as u16;
1374 let col = ColumnDef {
1375 name: name.clone(),
1376 type_id: type_name_to_id(type_name),
1377 required: *required,
1378 position,
1379 };
1380 self.catalog
1381 .alter_table_add_column(table, col)
1382 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1383 Ok(QueryResult::Executed {
1384 message: format!("column '{name}' added to '{table}'"),
1385 })
1386 }
1387 AlterAction::DropColumn { name } => {
1388 self.catalog
1389 .alter_table_drop_column(table, name)
1390 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1391 Ok(QueryResult::Executed {
1392 message: format!("column '{name}' dropped from '{table}'"),
1393 })
1394 }
1395 AlterAction::AddIndex { column } => {
1396 self.catalog
1397 .create_index(table, column)
1398 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1399 Ok(QueryResult::Executed {
1400 message: format!("index on '{table}.{column}' created"),
1401 })
1402 }
1403 },
1404
1405 PlanNode::DropTable { name } => {
1406 self.catalog
1407 .drop_table(name)
1408 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1409 Ok(QueryResult::Executed {
1410 message: format!("table '{name}' dropped"),
1411 })
1412 }
1413
1414 PlanNode::CreateView { name, query_text } => {
1415 self.create_view(name, query_text)?;
1416 Ok(QueryResult::Executed {
1417 message: format!("materialized view '{name}' created"),
1418 })
1419 }
1420
1421 PlanNode::RefreshView { name } => {
1422 self.refresh_view(name)?;
1423 Ok(QueryResult::Executed {
1424 message: format!("materialized view '{name}' refreshed"),
1425 })
1426 }
1427
1428 PlanNode::DropView { name } => {
1429 self.drop_view(name)?;
1430 Ok(QueryResult::Executed {
1431 message: format!("materialized view '{name}' dropped"),
1432 })
1433 }
1434
1435 PlanNode::Window { input, windows } => {
1436 let result = self.execute_plan(input)?;
1437 execute_window(result, windows)
1438 }
1439
1440 PlanNode::Union { left, right, all } => {
1441 let left_result = self.execute_plan(left)?;
1442 let right_result = self.execute_plan(right)?;
1443 let (left_cols, left_rows) = match left_result {
1444 QueryResult::Rows { columns, rows } => (columns, rows),
1445 _ => return Err("UNION requires query results on left side".into()),
1446 };
1447 let (_, right_rows) = match right_result {
1448 QueryResult::Rows { columns, rows } => (columns, rows),
1449 _ => return Err("UNION requires query results on right side".into()),
1450 };
1451 let mut combined = left_rows;
1452 if *all {
1453 combined.extend(right_rows);
1455 } else {
1456 let mut seen = std::collections::HashSet::new();
1459 for row in &combined {
1460 seen.insert(row.clone());
1461 }
1462 for row in right_rows {
1463 if seen.insert(row.clone()) {
1464 combined.push(row);
1465 }
1466 }
1467 }
1468 Ok(QueryResult::Rows {
1469 columns: left_cols,
1470 rows: combined,
1471 })
1472 }
1473
1474 PlanNode::Explain { input } => {
1475 let text = format_plan_tree(input, 0);
1476 Ok(QueryResult::Rows {
1477 columns: vec!["plan".to_string()],
1478 rows: text
1479 .lines()
1480 .map(|line| vec![Value::Str(line.to_string())])
1481 .collect(),
1482 })
1483 }
1484
1485 PlanNode::IndexScan { table, column, key } => {
1486 let key_value = literal_to_value(key)?;
1487 let tbl = self
1488 .catalog
1489 .get_table(table)
1490 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1491 let columns: Vec<String> =
1492 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1493
1494 if let Some(btree) = tbl.index(column) {
1504 let hit = match &key_value {
1505 Value::Int(k) => btree.lookup_int(*k),
1506 other => btree.lookup(other),
1507 };
1508 let rows = match hit {
1509 Some(rid) => match tbl.heap.get(rid) {
1510 Some(data) => vec![decode_row(&tbl.schema, &data)],
1511 None => Vec::new(),
1512 },
1513 None => Vec::new(),
1514 };
1515 return Ok(QueryResult::Rows { columns, rows });
1516 }
1517
1518 let schema = &tbl.schema;
1526 let fast = FastLayout::new(schema);
1527 let synth_pred = Expr::BinaryOp(
1528 Box::new(Expr::Field(column.clone())),
1529 BinOp::Eq,
1530 Box::new(key.clone()),
1531 );
1532 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1533 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1535 self.catalog
1536 .for_each_row_raw(table, |_rid, data| {
1537 if compiled(data) {
1538 rows.push(decode_row(schema, data));
1539 }
1540 })
1541 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1542 return Ok(QueryResult::Rows { columns, rows });
1543 }
1544
1545 let col_idx =
1547 schema
1548 .column_index(column)
1549 .ok_or_else(|| QueryError::ColumnNotFound {
1550 table: String::new(),
1551 column: column.clone(),
1552 })?;
1553 let rows: Vec<Vec<Value>> = tbl
1554 .scan()
1555 .filter_map(|(_, row)| {
1556 if row[col_idx] == key_value {
1557 Some(row)
1558 } else {
1559 None
1560 }
1561 })
1562 .collect();
1563 Ok(QueryResult::Rows { columns, rows })
1564 }
1565
1566 PlanNode::RangeScan {
1567 table,
1568 column,
1569 start,
1570 end,
1571 } => {
1572 let tbl = self
1573 .catalog
1574 .get_table(table)
1575 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1576 let columns: Vec<String> =
1577 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1578 let schema = &tbl.schema;
1579
1580 let start_val = match start {
1581 Some((expr, _)) => Some(literal_to_value(expr)?),
1582 None => None,
1583 };
1584 let end_val = match end {
1585 Some((expr, _)) => Some(literal_to_value(expr)?),
1586 None => None,
1587 };
1588 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1589 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1590
1591 if let Some(btree) = tbl.index(column) {
1592 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1593 (Some(s), Some(e)) => btree.range(s, e).collect(),
1594 (Some(s), None) => btree.range_from(s),
1595 (None, Some(e)) => btree.range_to(e),
1596 (None, None) => {
1597 let rows: Vec<Vec<Value>> = tbl.scan().map(|(_, row)| row).collect();
1598 return Ok(QueryResult::Rows { columns, rows });
1599 }
1600 };
1601 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1602 for (key, rid) in hits {
1603 if !start_inclusive {
1604 if let Some(ref s) = start_val {
1605 if &key == s {
1606 continue;
1607 }
1608 }
1609 }
1610 if !end_inclusive {
1611 if let Some(ref e) = end_val {
1612 if &key == e {
1613 continue;
1614 }
1615 }
1616 }
1617 if let Some(data) = tbl.heap.get(rid) {
1618 rows.push(decode_row(schema, &data));
1619 }
1620 }
1621 return Ok(QueryResult::Rows { columns, rows });
1622 }
1623
1624 let fast = FastLayout::new(schema);
1626 let synth = synthesize_range_predicate(column, start, end);
1627 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1628 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1629 self.catalog
1630 .for_each_row_raw(table, |_rid, data| {
1631 if compiled(data) {
1632 rows.push(decode_row(schema, data));
1633 }
1634 })
1635 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1636 return Ok(QueryResult::Rows { columns, rows });
1637 }
1638
1639 let col_idx =
1640 schema
1641 .column_index(column)
1642 .ok_or_else(|| QueryError::ColumnNotFound {
1643 table: String::new(),
1644 column: column.clone(),
1645 })?;
1646 let rows: Vec<Vec<Value>> = tbl
1647 .scan()
1648 .filter(|(_, row)| {
1649 range_matches(
1650 &row[col_idx],
1651 &start_val,
1652 start_inclusive,
1653 &end_val,
1654 end_inclusive,
1655 )
1656 })
1657 .map(|(_, row)| row)
1658 .collect();
1659 Ok(QueryResult::Rows { columns, rows })
1660 }
1661 }
1662 }
1663
1664 fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1669 if self.view_registry.is_view(name) {
1670 return Err(QueryError::ViewError(format!(
1671 "materialized view '{name}' already exists"
1672 )));
1673 }
1674 let result = self.execute_powql(query_text)?;
1676 let (columns, rows) = match result {
1677 QueryResult::Rows { columns, rows } => (columns, rows),
1678 _ => return Err("view source query must be a SELECT".into()),
1679 };
1680 let schema = self.derive_view_schema(name, &columns, &rows);
1682 self.catalog
1684 .create_table(schema)
1685 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1686 for row in &rows {
1687 self.catalog
1688 .insert(name, row)
1689 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1690 }
1691 let depends_on = self.extract_view_deps(query_text);
1693 self.view_registry
1694 .register(ViewDef {
1695 name: name.to_string(),
1696 query: query_text.to_string(),
1697 depends_on,
1698 dirty: false,
1699 })
1700 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1701 Ok(())
1702 }
1703
1704 fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1707 let def = self
1708 .view_registry
1709 .get(name)
1710 .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1711 let query_text = def.query.clone();
1712 let result = self.execute_powql(&query_text)?;
1714 let (_columns, rows) = match result {
1715 QueryResult::Rows { columns, rows } => (columns, rows),
1716 _ => return Err("view source query must be a SELECT".into()),
1717 };
1718 self.catalog
1722 .scan_delete_matching_logged(name, |_| true)
1723 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1724 for row in &rows {
1725 self.catalog
1726 .insert(name, row)
1727 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1728 }
1729 self.view_registry.mark_clean(name);
1730 Ok(())
1731 }
1732
1733 fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1735 if !self.view_registry.is_view(name) {
1736 return Err(QueryError::ViewError(format!(
1737 "materialized view '{name}' not found"
1738 )));
1739 }
1740 self.view_registry
1741 .unregister(name)
1742 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1743 self.catalog
1744 .drop_table(name)
1745 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1746 Ok(())
1747 }
1748
1749 fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1752 use powdb_storage::types::{ColumnDef, TypeId};
1753 let cols: Vec<ColumnDef> = columns
1754 .iter()
1755 .enumerate()
1756 .map(|(i, col_name)| {
1757 let type_id = rows
1758 .first()
1759 .and_then(|row| row.get(i))
1760 .map(|v| v.type_id())
1761 .unwrap_or(TypeId::Str);
1762 ColumnDef {
1763 name: col_name.clone(),
1764 type_id,
1765 required: false,
1766 position: i as u16,
1767 }
1768 })
1769 .collect();
1770 Schema {
1771 table_name: name.to_string(),
1772 columns: cols,
1773 }
1774 }
1775
1776 fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1779 use crate::parser::parse;
1780 match parse(query_text) {
1781 Ok(Statement::Query(q)) => {
1782 let mut deps = vec![q.source.clone()];
1783 for j in &q.joins {
1784 deps.push(j.source.clone());
1785 }
1786 deps
1787 }
1788 _ => Vec::new(),
1789 }
1790 }
1791
/// Fast path for a single-column aggregate over `table`, optionally filtered
/// by `predicate`. The column value is read directly out of the raw row
/// bytes; rows are never decoded into `Value`s.
///
/// Returns `Ok(None)` when the fast path does not apply, so the caller can
/// use the generic executor. That happens when:
/// - `col` is not in the schema;
/// - the column type is neither `Int` nor `Float`;
/// - the column has no fixed byte offset in `FastLayout`; or
/// - `predicate` cannot be compiled.
///
/// Result conventions (as coded below):
/// - Int `Sum` accumulates in `i128` and saturates to the `i64` range instead
///   of wrapping. With no contributing rows it yields `Int(0)`.
/// - Float `Sum` yields `Float(0.0)` with no contributing rows.
/// - `Avg`, `Min` and `Max` yield `Value::Empty` with no contributing rows.
/// - `Count` and `CountDistinct` yield `Int`.
///
/// NOTE(review): which rows "contribute" (e.g. NULL skipping, predicate
/// filtering) is decided by the `agg_int_loop!` / `agg_float_loop!` macros,
/// which are defined elsewhere — confirm there.
///
/// # Errors
/// `QueryError::TableNotFound` if `table` has no schema.
pub(super) fn agg_single_col_fast(
    &self,
    table: &str,
    col: &str,
    function: AggFunc,
    predicate: Option<&Expr>,
) -> Result<Option<QueryResult>, QueryError> {
    let schema = self
        .catalog
        .schema(table)
        .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
        .clone();
    let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
    let col_idx = match schema.column_index(col) {
        Some(i) => i,
        None => return Ok(None),
    };
    let col_type = schema.columns[col_idx].type_id;
    // Only 8-byte numeric columns are handled by the raw-byte loops below.
    if col_type != TypeId::Int && col_type != TypeId::Float {
        return Ok(None);
    }

    let fast = FastLayout::new(&schema);
    // A column without a fixed offset cannot be read at a constant position.
    let byte_offset = match fast.fixed_offsets[col_idx] {
        Some(o) => o,
        None => return Ok(None),
    };
    // Position of this column's null flag within the null bitmap.
    let bitmap_byte = col_idx / 8;
    let bitmap_bit = (col_idx % 8) as u32;
    // Value bytes sit after a 2-byte prefix and the null bitmap.
    let data_offset = 2 + fast.bitmap_size + byte_offset;

    // A predicate that cannot be compiled disables the fast path entirely,
    // rather than being evaluated row by row here.
    let compiled_pred: Option<CompiledPredicate> = match predicate {
        Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
            Some(c) => Some(c),
            None => return Ok(None),
        },
        None => None,
    };

    let result = match col_type {
        TypeId::Int => match function {
            AggFunc::Sum | AggFunc::Avg => {
                // i128 accumulator so intermediate sums cannot overflow.
                let mut sum_i128: i128 = 0;
                let mut count: i64 = 0;
                agg_int_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: i64| {
                        count += 1;
                        sum_i128 += v as i128;
                    }
                );
                if matches!(function, AggFunc::Sum) {
                    // Saturate to the i64 range rather than wrapping.
                    let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
                    QueryResult::Scalar(Value::Int(clamped))
                } else if count == 0 {
                    QueryResult::Scalar(Value::Empty)
                } else {
                    let avg = (sum_i128 as f64) / (count as f64);
                    QueryResult::Scalar(Value::Float(avg))
                }
            }
            AggFunc::Min => {
                let mut min_v: Option<i64> = None;
                agg_int_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: i64| {
                        min_v = Some(match min_v {
                            Some(m) => m.min(v),
                            None => v,
                        });
                    }
                );
                QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
            }
            AggFunc::Max => {
                let mut max_v: Option<i64> = None;
                agg_int_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: i64| {
                        max_v = Some(match max_v {
                            Some(m) => m.max(v),
                            None => v,
                        });
                    }
                );
                QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
            }
            AggFunc::Count => {
                let mut count: i64 = 0;
                agg_int_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |_v: i64| {
                        count += 1;
                    }
                );
                QueryResult::Scalar(Value::Int(count))
            }
            AggFunc::CountDistinct => {
                let mut seen = rustc_hash::FxHashSet::default();
                agg_int_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: i64| {
                        seen.insert(v);
                    }
                );
                QueryResult::Scalar(Value::Int(seen.len() as i64))
            }
        },
        TypeId::Float => match function {
            AggFunc::Sum => {
                let mut sum: f64 = 0.0;
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: f64| {
                        sum += v;
                    }
                );
                QueryResult::Scalar(Value::Float(sum))
            }
            AggFunc::Avg => {
                let mut sum: f64 = 0.0;
                let mut count: i64 = 0;
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: f64| {
                        sum += v;
                        count += 1;
                    }
                );
                if count == 0 {
                    QueryResult::Scalar(Value::Empty)
                } else {
                    QueryResult::Scalar(Value::Float(sum / count as f64))
                }
            }
            AggFunc::Min => {
                let mut min_v: Option<f64> = None;
                // `total_cmp` gives a total order, so NaNs take part in the
                // comparison (per `f64::total_cmp`) instead of being ignored.
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: f64| {
                        min_v = Some(match min_v {
                            Some(m) => {
                                if v.total_cmp(&m).is_lt() {
                                    v
                                } else {
                                    m
                                }
                            }
                            None => v,
                        });
                    }
                );
                QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
            }
            AggFunc::Max => {
                let mut max_v: Option<f64> = None;
                // Same total ordering as `Min` above.
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: f64| {
                        max_v = Some(match max_v {
                            Some(m) => {
                                if v.total_cmp(&m).is_gt() {
                                    v
                                } else {
                                    m
                                }
                            }
                            None => v,
                        });
                    }
                );
                QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
            }
            AggFunc::Count => {
                let mut count: i64 = 0;
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |_v: f64| {
                        count += 1;
                    }
                );
                QueryResult::Scalar(Value::Int(count))
            }
            AggFunc::CountDistinct => {
                let mut seen = rustc_hash::FxHashSet::default();
                // f64 is not `Hash`, so distinctness is by bit pattern.
                // Consequently 0.0 and -0.0 are counted separately, as are
                // NaNs with different payloads.
                agg_float_loop!(
                    self,
                    table,
                    compiled_pred,
                    bitmap_byte,
                    bitmap_bit,
                    data_offset,
                    |v: f64| {
                        seen.insert(v.to_bits());
                    }
                );
                QueryResult::Scalar(Value::Int(seen.len() as i64))
            }
        },
        _ => unreachable!("type guard above restricts to Int/Float"),
    };
    Ok(Some(result))
}
2103
2104 pub(super) fn project_filter_limit_fast(
2107 &self,
2108 table: &str,
2109 fields: &[ProjectField],
2110 limit: usize,
2111 predicate: Option<&Expr>,
2112 ) -> Result<Option<QueryResult>, QueryError> {
2113 let schema = self
2114 .catalog
2115 .schema(table)
2116 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2117 .clone();
2118 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2119
2120 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2123 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2124 for f in fields {
2125 let name = match &f.expr {
2126 Expr::Field(n) => n.clone(),
2127 _ => return Ok(None),
2128 };
2129 let idx = match all_columns.iter().position(|c| c == &name) {
2130 Some(i) => i,
2131 None => return Ok(None),
2132 };
2133 proj_indices.push(idx);
2134 proj_columns.push(f.alias.clone().unwrap_or(name));
2135 }
2136
2137 let fast = FastLayout::new(&schema);
2138 let row_layout = RowLayout::new(&schema);
2139
2140 let compiled_pred: Option<CompiledPredicate> = match predicate {
2141 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2142 Some(c) => Some(c),
2143 None => return Ok(None),
2144 },
2145 None => None,
2146 };
2147
2148 let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2149 self.catalog
2154 .try_for_each_row_raw(table, |_rid, data| {
2155 use std::ops::ControlFlow;
2156 if let Some(ref pred) = compiled_pred {
2157 if !pred(data) {
2158 return ControlFlow::Continue(());
2159 }
2160 }
2161 let row: Vec<Value> = proj_indices
2162 .iter()
2163 .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2164 .collect();
2165 out.push(row);
2166 if out.len() >= limit {
2167 ControlFlow::Break(())
2168 } else {
2169 ControlFlow::Continue(())
2170 }
2171 })
2172 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2173
2174 Ok(Some(QueryResult::Rows {
2175 columns: proj_columns,
2176 rows: out,
2177 }))
2178 }
2179
2180 pub(super) fn project_filter_sort_limit_fast(
2185 &self,
2186 table: &str,
2187 fields: &[ProjectField],
2188 sort_field: &str,
2189 descending: bool,
2190 limit: usize,
2191 predicate: Option<&Expr>,
2192 ) -> Result<Option<QueryResult>, QueryError> {
2193 if limit == 0 {
2194 return Ok(None);
2197 }
2198 let schema = self
2199 .catalog
2200 .schema(table)
2201 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2202 .clone();
2203 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2204
2205 let sort_idx = match schema.column_index(sort_field) {
2212 Some(i) => i,
2213 None => return Ok(None),
2214 };
2215 let sort_col_type = schema.columns[sort_idx].type_id;
2216 if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2217 return Ok(None);
2218 }
2219
2220 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2222 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2223 for f in fields {
2224 let name = match &f.expr {
2225 Expr::Field(n) => n.clone(),
2226 _ => return Ok(None),
2227 };
2228 let idx = match all_columns.iter().position(|c| c == &name) {
2229 Some(i) => i,
2230 None => return Ok(None),
2231 };
2232 proj_indices.push(idx);
2233 proj_columns.push(f.alias.clone().unwrap_or(name));
2234 }
2235
2236 let fast = FastLayout::new(&schema);
2237 let row_layout = RowLayout::new(&schema);
2238 let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2240 Some(o) => o,
2241 None => return Ok(None),
2242 };
2243 let sort_bitmap_byte = sort_idx / 8;
2244 let sort_bitmap_bit = (sort_idx % 8) as u32;
2245 let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2246
2247 let compiled_pred: Option<CompiledPredicate> = match predicate {
2248 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2249 Some(c) => Some(c),
2250 None => return Ok(None),
2251 },
2252 None => None,
2253 };
2254
2255 let drained: Vec<Vec<u8>> = match sort_col_type {
2264 TypeId::Int => {
2265 let mut seq: u64 = 0;
2266 let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2267 BinaryHeap::with_capacity(limit);
2268 let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2269 BinaryHeap::with_capacity(limit);
2270
2271 self.catalog
2272 .for_each_row_raw(table, |_rid, data| {
2273 if let Some(ref pred) = compiled_pred {
2274 if !pred(data) {
2275 return;
2276 }
2277 }
2278 if data.len() < sort_data_offset + 8 {
2280 return;
2281 }
2282 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2283 if is_null {
2284 return;
2285 }
2286 let key = i64::from_le_bytes(
2287 data[sort_data_offset..sort_data_offset + 8]
2288 .try_into()
2289 .unwrap_or_else(|_| unreachable!()),
2290 );
2291 let id = seq;
2292 seq += 1;
2293
2294 if descending {
2295 if heap_desc.len() < limit {
2296 heap_desc.push(Reverse((key, id, data.to_vec())));
2297 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2298 if key > *top_key {
2299 heap_desc.pop();
2300 heap_desc.push(Reverse((key, id, data.to_vec())));
2301 }
2302 }
2303 } else if heap_asc.len() < limit {
2304 heap_asc.push((key, id, data.to_vec()));
2305 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2306 if key < *top_key {
2307 heap_asc.pop();
2308 heap_asc.push((key, id, data.to_vec()));
2309 }
2310 }
2311 })
2312 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2313
2314 let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2315 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2316 } else {
2317 heap_asc.into_iter().collect()
2318 };
2319 if descending {
2320 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2321 } else {
2322 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2323 }
2324 drained.into_iter().map(|(_, _, d)| d).collect()
2325 }
2326 TypeId::Float => {
2327 let mut seq: u64 = 0;
2336 let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2337 BinaryHeap::with_capacity(limit);
2338 let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2339 BinaryHeap::with_capacity(limit);
2340
2341 self.catalog
2342 .for_each_row_raw(table, |_rid, data| {
2343 if let Some(ref pred) = compiled_pred {
2344 if !pred(data) {
2345 return;
2346 }
2347 }
2348 if data.len() < sort_data_offset + 8 {
2349 return;
2350 }
2351 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2352 if is_null {
2353 return;
2354 }
2355 let bits = u64::from_le_bytes(
2356 data[sort_data_offset..sort_data_offset + 8]
2357 .try_into()
2358 .unwrap_or_else(|_| unreachable!()),
2359 );
2360 let key = f64_bits_to_sortable_u64(bits);
2361 let id = seq;
2362 seq += 1;
2363
2364 if descending {
2365 if heap_desc.len() < limit {
2366 heap_desc.push(Reverse((key, id, data.to_vec())));
2367 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2368 if key > *top_key {
2369 heap_desc.pop();
2370 heap_desc.push(Reverse((key, id, data.to_vec())));
2371 }
2372 }
2373 } else if heap_asc.len() < limit {
2374 heap_asc.push((key, id, data.to_vec()));
2375 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2376 if key < *top_key {
2377 heap_asc.pop();
2378 heap_asc.push((key, id, data.to_vec()));
2379 }
2380 }
2381 })
2382 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2383
2384 let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2385 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2386 } else {
2387 heap_asc.into_iter().collect()
2388 };
2389 if descending {
2390 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2391 } else {
2392 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2393 }
2394 drained.into_iter().map(|(_, _, d)| d).collect()
2395 }
2396 _ => unreachable!("type guard above restricts to Int/Float"),
2397 };
2398
2399 let rows: Vec<Vec<Value>> = drained
2400 .into_iter()
2401 .map(|data| {
2402 proj_indices
2403 .iter()
2404 .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2405 .collect()
2406 })
2407 .collect();
2408
2409 Ok(Some(QueryResult::Rows {
2410 columns: proj_columns,
2411 rows,
2412 }))
2413 }
2414
2415 fn try_fused_scan_update(
2432 &mut self,
2433 table: &str,
2434 predicate: &Expr,
2435 resolved: &[(usize, Value)],
2436 changed_cols: &[usize],
2437 ) -> Option<Result<QueryResult, QueryError>> {
2438 let compiled = {
2441 let schema = self.catalog.schema(table)?;
2442 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2443 let fast = FastLayout::new(schema);
2444 compile_predicate(predicate, &columns, &fast, schema)?
2445 };
2446
2447 let fixed_patches: Option<Vec<FastPatch>> = {
2449 let tbl = self.catalog.get_table(table)?;
2450 let schema = &tbl.schema;
2451 let all_fixed_nonnull = resolved
2452 .iter()
2453 .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2454 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2455 if all_fixed_nonnull && no_indexed {
2456 let layout = RowLayout::new(schema);
2457 let bitmap_size = layout.bitmap_size();
2458 Some(
2459 resolved
2460 .iter()
2461 .map(|(idx, val)| {
2462 let fixed_off = layout
2463 .fixed_offset(*idx)
2464 .expect("is_fixed_size already checked");
2465 let field_off = 2 + bitmap_size + fixed_off;
2466 let bytes: FixedBytes = match val {
2467 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2468 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2469 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2470 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2471 Value::Uuid(v) => FixedBytes::Uuid(*v),
2472 _ => unreachable!("all_fixed_nonnull guard"),
2473 };
2474 FastPatch {
2475 field_off,
2476 bitmap_byte_off: 2 + idx / 8,
2477 bit_mask: 1u8 << (idx % 8),
2478 bytes,
2479 }
2480 })
2481 .collect(),
2482 )
2483 } else {
2484 None
2485 }
2486 };
2487 if let Some(patches) = fixed_patches {
2488 let result = self
2489 .catalog
2490 .scan_patch_matching_logged(table, compiled, |row| {
2491 for p in &patches {
2492 row[p.bitmap_byte_off] &= !p.bit_mask;
2493 let field_bytes = p.bytes.as_slice();
2494 row[p.field_off..p.field_off + field_bytes.len()]
2495 .copy_from_slice(field_bytes);
2496 }
2497 Some(row.len() as u16)
2498 })
2499 .map_err(|e| e.to_string());
2500 match result {
2501 Ok((count, _)) => {
2502 self.view_registry.mark_dependents_dirty(table);
2503 return Some(Ok(QueryResult::Modified(count)));
2504 }
2505 Err(e) => return Some(Err(QueryError::Execution(e))),
2506 }
2507 }
2508
2509 let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2511 let tbl = self.catalog.get_table(table)?;
2512 let schema = &tbl.schema;
2513 let is_single = resolved.len() == 1;
2514 let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2515 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2516 if is_single && is_var && no_indexed {
2517 let (idx, val) = &resolved[0];
2518 let bytes_opt = match val {
2519 Value::Str(s) => Some(s.as_bytes().to_vec()),
2520 Value::Bytes(b) => Some(b.clone()),
2521 Value::Empty => None,
2522 _ => return None, };
2524 Some((*idx, bytes_opt))
2525 } else {
2526 None
2527 }
2528 };
2529 if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2530 let layout = {
2532 let schema = self.catalog.schema(table)?;
2533 RowLayout::new(schema)
2534 };
2535 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2536 let result = self
2537 .catalog
2538 .scan_patch_matching_logged(table, compiled, |row| {
2539 patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2540 })
2541 .map_err(|e| e.to_string());
2542 match result {
2543 Ok((mut count, fallback_rids)) => {
2544 for rid in fallback_rids {
2546 let mut row = match self.catalog.get(table, rid) {
2547 Some(r) => r,
2548 None => continue,
2549 };
2550 for (idx, val) in resolved.iter() {
2551 row[*idx] = val.clone();
2552 }
2553 self.catalog
2554 .update_hinted(table, rid, &row, Some(changed_cols))
2555 .map_err(|e| e.to_string())
2556 .ok();
2557 count += 1;
2558 }
2559 self.view_registry.mark_dependents_dirty(table);
2560 return Some(Ok(QueryResult::Modified(count)));
2561 }
2562 Err(e) => return Some(Err(QueryError::Execution(e))),
2563 }
2564 }
2565
2566 None }
2568
/// Collect the `RowId`s of rows in `table` that a mutation planned over
/// `input` should touch.
///
/// Specialised shapes of `input`:
/// - `SeqScan` of `table`: every row id in the table.
/// - `IndexScan` of `table`, tried in this order:
///   - a B-tree point lookup when the column is indexed (at most one rid);
///   - otherwise a raw-byte scan with a compiled `column == key` predicate;
///   - otherwise a decoded scan comparing the column value to the key.
/// - `Filter` directly over a `SeqScan` of `table`: a raw-byte scan with the
///   compiled predicate. If it cannot be compiled, only the predicate's
///   columns are decoded and `eval_predicate` is used.
///
/// Any other shape, including scans of a different table, falls back to
/// `generic_rid_match`, which executes the plan and matches rows by value.
///
/// # Errors
/// - `QueryError::TableNotFound` / `QueryError::ColumnNotFound` for unknown
///   names.
/// - Storage failures become `QueryError::StorageError`.
/// - Errors from `literal_to_value` or the fallback are propagated.
fn collect_rids_for_mutation(
    &mut self,
    input: &PlanNode,
    table: &str,
) -> Result<Vec<RowId>, QueryError> {
    match input {
        PlanNode::SeqScan { table: t } if t == table => {
            let rids: Vec<RowId> = self
                .catalog
                .scan(table)
                .map_err(|e| QueryError::StorageError(e.to_string()))?
                .map(|(rid, _)| rid)
                .collect();
            Ok(rids)
        }
        PlanNode::IndexScan {
            table: t,
            column,
            key,
        } if t == table => {
            let key_value = literal_to_value(key)?;

            // Scoped so the `tbl` borrow of `self.catalog` ends before the
            // fallback paths below.
            {
                let tbl = self
                    .catalog
                    .get_table(table)
                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
                if let Some(btree) = tbl.index(column) {
                    let hit = match &key_value {
                        Value::Int(k) => btree.lookup_int(*k),
                        other => btree.lookup(other),
                    };
                    // NOTE(review): `lookup`/`lookup_int` yield at most one
                    // rid, so this assumes the index is unique — confirm.
                    return Ok(match hit {
                        Some(rid) => vec![rid],
                        None => Vec::new(),
                    });
                }
            }

            // No index on the column: synthesize `column == key` and try to
            // evaluate it on raw bytes.
            let schema = self
                .catalog
                .schema(table)
                .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
            let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
            let fast = FastLayout::new(schema);
            let synth = Expr::BinaryOp(
                Box::new(Expr::Field(column.clone())),
                BinOp::Eq,
                Box::new(key.clone()),
            );
            if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
                let mut rids: Vec<RowId> = Vec::with_capacity(64);
                self.catalog
                    .for_each_row_raw(table, |rid, data| {
                        if compiled(data) {
                            rids.push(rid);
                        }
                    })
                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
                return Ok(rids);
            }

            // Last resort: decode each row and compare the column value.
            let col_idx =
                schema
                    .column_index(column)
                    .ok_or_else(|| QueryError::ColumnNotFound {
                        table: String::new(),
                        column: column.clone(),
                    })?;
            let rids: Vec<RowId> = self
                .catalog
                .scan(table)
                .map_err(|e| QueryError::StorageError(e.to_string()))?
                .filter_map(|(rid, row)| {
                    if row[col_idx] == key_value {
                        Some(rid)
                    } else {
                        None
                    }
                })
                .collect();
            Ok(rids)
        }
        PlanNode::Filter {
            input: inner,
            predicate,
        } => {
            if let PlanNode::SeqScan { table: t } = inner.as_ref() {
                if t != table {
                    return self.generic_rid_match(input, table);
                }
                let schema = self
                    .catalog
                    .schema(table)
                    .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
                let columns: Vec<String> =
                    schema.columns.iter().map(|c| c.name.clone()).collect();
                let fast = FastLayout::new(schema);
                let row_layout = RowLayout::new(schema);

                // Preferred: evaluate the predicate directly on raw bytes.
                if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
                    let mut rids: Vec<RowId> = Vec::with_capacity(64);
                    self.catalog
                        .for_each_row_raw(table, |rid, data| {
                            if compiled(data) {
                                rids.push(rid);
                            }
                        })
                        .map_err(|e| QueryError::StorageError(e.to_string()))?;
                    return Ok(rids);
                }

                // Otherwise decode only the columns the predicate references.
                let pred_cols = predicate_column_indices(predicate, &columns);
                let mut rids: Vec<RowId> = Vec::with_capacity(64);
                self.catalog
                    .for_each_row_raw(table, |rid, data| {
                        let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
                        if eval_predicate(predicate, &pred_row, &columns) {
                            rids.push(rid);
                        }
                    })
                    .map_err(|e| QueryError::StorageError(e.to_string()))?;
                return Ok(rids);
            }
            self.generic_rid_match(input, table)
        }
        _ => self.generic_rid_match(input, table),
    }
}
2721
2722 fn generic_rid_match(
2726 &mut self,
2727 input: &PlanNode,
2728 table: &str,
2729 ) -> Result<Vec<RowId>, QueryError> {
2730 let result = self.execute_plan(input)?;
2731 let rows = match result {
2732 QueryResult::Rows { rows, .. } => rows,
2733 _ => return Err("mutation source must be rows".into()),
2734 };
2735 let matching: Vec<RowId> = self
2736 .catalog
2737 .scan(table)
2738 .map_err(|e| QueryError::StorageError(e.to_string()))?
2739 .filter(|(_, row)| rows.iter().any(|r| r == row))
2740 .map(|(rid, _)| rid)
2741 .collect();
2742 Ok(matching)
2743 }
2744}
2745
2746pub(super) fn execute_window(
2747 result: QueryResult,
2748 windows: &[WindowDef],
2749) -> Result<QueryResult, QueryError> {
2750 let (mut columns, mut rows) = match result {
2751 QueryResult::Rows { columns, rows } => (columns, rows),
2752 _ => return Err("window function requires row input".into()),
2753 };
2754
2755 for wdef in windows {
2756 let part_indices: Vec<usize> = wdef
2758 .partition_by
2759 .iter()
2760 .map(|name| {
2761 columns
2762 .iter()
2763 .position(|c| c == name)
2764 .ok_or_else(|| format!("window partition column '{name}' not found"))
2765 })
2766 .collect::<Result<Vec<_>, _>>()?;
2767
2768 let ord_indices: Vec<(usize, bool)> = wdef
2769 .order_by
2770 .iter()
2771 .map(|sk| {
2772 columns
2773 .iter()
2774 .position(|c| c == &sk.field)
2775 .map(|i| (i, sk.descending))
2776 .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2777 })
2778 .collect::<Result<Vec<_>, _>>()?;
2779
2780 let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2782 match arg {
2783 Expr::Field(name) => {
2784 if name == "*" {
2785 None } else {
2787 Some(
2788 columns
2789 .iter()
2790 .position(|c| c == name)
2791 .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2792 )
2793 }
2794 }
2795 _ => None,
2796 }
2797 } else {
2798 None
2799 };
2800
2801 let n = rows.len();
2805 let mut indices: Vec<usize> = (0..n).collect();
2806 indices.sort_by(|&a, &b| {
2807 for &pi in &part_indices {
2809 let cmp = rows[a][pi].cmp(&rows[b][pi]);
2810 if cmp != std::cmp::Ordering::Equal {
2811 return cmp;
2812 }
2813 }
2814 for &(oi, desc) in &ord_indices {
2816 let cmp = rows[a][oi].cmp(&rows[b][oi]);
2817 if cmp != std::cmp::Ordering::Equal {
2818 return if desc { cmp.reverse() } else { cmp };
2819 }
2820 }
2821 std::cmp::Ordering::Equal
2822 });
2823
2824 let mut win_values: Vec<Value> = vec![Value::Empty; n];
2826 let mut partition_start = 0usize;
2827 let mut running_count: i64 = 0;
2829 let mut running_int_sum: i64 = 0;
2830 let mut running_float_sum: f64 = 0.0;
2831 let mut running_saw_float = false;
2832 let mut running_min: Option<Value> = None;
2833 let mut running_max: Option<Value> = None;
2834 let mut rank_counter: i64 = 0;
2835 let mut dense_rank_counter: i64 = 0;
2836 let mut prev_order_key: Option<Vec<Value>> = None;
2837 let mut same_rank_count: i64 = 0;
2838
2839 for sorted_pos in 0..n {
2840 let row_idx = indices[sorted_pos];
2841
2842 let new_partition = if sorted_pos == 0 {
2844 true
2845 } else {
2846 let prev_row_idx = indices[sorted_pos - 1];
2847 part_indices
2848 .iter()
2849 .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
2850 };
2851
2852 if new_partition {
2853 partition_start = sorted_pos;
2854 running_count = 0;
2855 running_int_sum = 0;
2856 running_float_sum = 0.0;
2857 running_saw_float = false;
2858 running_min = None;
2859 running_max = None;
2860 rank_counter = 0;
2861 dense_rank_counter = 0;
2862 prev_order_key = None;
2863 same_rank_count = 0;
2864 }
2865
2866 let current_order_key: Vec<Value> = ord_indices
2868 .iter()
2869 .map(|&(oi, _)| rows[row_idx][oi].clone())
2870 .collect();
2871 let same_as_prev = prev_order_key.as_ref() == Some(¤t_order_key);
2872
2873 let value = match wdef.function {
2874 WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
2875 WindowFunc::Rank => {
2876 if same_as_prev {
2877 same_rank_count += 1;
2878 } else {
2879 rank_counter += same_rank_count + 1;
2880 same_rank_count = 0;
2881 if rank_counter == 0 {
2882 rank_counter = 1;
2883 }
2884 }
2885 Value::Int(rank_counter)
2886 }
2887 WindowFunc::DenseRank => {
2888 if !same_as_prev {
2889 dense_rank_counter += 1;
2890 }
2891 Value::Int(dense_rank_counter)
2892 }
2893 WindowFunc::Sum => {
2894 if let Some(ci) = arg_col_idx {
2895 match &rows[row_idx][ci] {
2896 Value::Int(v) => running_int_sum += v,
2897 Value::Float(v) => {
2898 running_float_sum += v;
2899 running_saw_float = true;
2900 }
2901 _ => {}
2902 }
2903 }
2904 if running_saw_float {
2905 Value::Float(running_float_sum + running_int_sum as f64)
2906 } else {
2907 Value::Int(running_int_sum)
2908 }
2909 }
2910 WindowFunc::Avg => {
2911 if let Some(ci) = arg_col_idx {
2912 match &rows[row_idx][ci] {
2913 Value::Int(v) => {
2914 running_float_sum += *v as f64;
2915 running_count += 1;
2916 }
2917 Value::Float(v) => {
2918 running_float_sum += v;
2919 running_count += 1;
2920 }
2921 _ => {}
2922 }
2923 }
2924 if running_count == 0 {
2925 Value::Empty
2926 } else {
2927 Value::Float(running_float_sum / running_count as f64)
2928 }
2929 }
2930 WindowFunc::Count => {
2931 if let Some(ci) = arg_col_idx {
2932 if !rows[row_idx][ci].is_empty() {
2933 running_count += 1;
2934 }
2935 } else {
2936 running_count += 1;
2938 }
2939 Value::Int(running_count)
2940 }
2941 WindowFunc::Min => {
2942 if let Some(ci) = arg_col_idx {
2943 let v = &rows[row_idx][ci];
2944 if !v.is_empty() {
2945 running_min = Some(match &running_min {
2946 None => v.clone(),
2947 Some(cur) => {
2948 if v < cur {
2949 v.clone()
2950 } else {
2951 cur.clone()
2952 }
2953 }
2954 });
2955 }
2956 }
2957 running_min.clone().unwrap_or(Value::Empty)
2958 }
2959 WindowFunc::Max => {
2960 if let Some(ci) = arg_col_idx {
2961 let v = &rows[row_idx][ci];
2962 if !v.is_empty() {
2963 running_max = Some(match &running_max {
2964 None => v.clone(),
2965 Some(cur) => {
2966 if v > cur {
2967 v.clone()
2968 } else {
2969 cur.clone()
2970 }
2971 }
2972 });
2973 }
2974 }
2975 running_max.clone().unwrap_or(Value::Empty)
2976 }
2977 };
2978
2979 prev_order_key = Some(current_order_key);
2980 win_values[row_idx] = value;
2981 }
2982
2983 for (ri, row) in rows.iter_mut().enumerate() {
2985 row.push(win_values[ri].clone());
2986 }
2987 columns.push(wdef.output_name.clone());
2988 }
2989
2990 Ok(QueryResult::Rows { columns, rows })
2991}
2992
2993pub(super) fn compute_group_aggregate(
2995 func: AggFunc,
2996 all_rows: &[Vec<Value>],
2997 row_indices: &[usize],
2998 col_idx: usize,
2999) -> Value {
3000 match func {
3001 AggFunc::Count => {
3002 if col_idx == usize::MAX {
3003 return Value::Int(row_indices.len() as i64);
3005 }
3006 let count = row_indices
3007 .iter()
3008 .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3009 .count();
3010 Value::Int(count as i64)
3011 }
3012 AggFunc::CountDistinct => {
3013 let mut seen = std::collections::HashSet::new();
3014 for &ri in row_indices {
3015 let v = &all_rows[ri][col_idx];
3016 if !v.is_empty() {
3017 seen.insert(v.clone());
3018 }
3019 }
3020 Value::Int(seen.len() as i64)
3021 }
3022 AggFunc::Sum => {
3023 let mut int_sum: i64 = 0;
3028 let mut float_sum: f64 = 0.0;
3029 let mut saw_float = false;
3030 for &ri in row_indices {
3031 match &all_rows[ri][col_idx] {
3032 Value::Int(v) => int_sum += v,
3033 Value::Float(v) => {
3034 float_sum += *v;
3035 saw_float = true;
3036 }
3037 _ => {}
3038 }
3039 }
3040 if saw_float {
3041 Value::Float(float_sum + int_sum as f64)
3042 } else {
3043 Value::Int(int_sum)
3044 }
3045 }
3046 AggFunc::Avg => {
3047 let mut sum = 0.0f64;
3048 let mut count = 0usize;
3049 for &ri in row_indices {
3050 match &all_rows[ri][col_idx] {
3051 Value::Int(v) => {
3052 sum += *v as f64;
3053 count += 1;
3054 }
3055 Value::Float(v) => {
3056 sum += *v;
3057 count += 1;
3058 }
3059 _ => {}
3060 }
3061 }
3062 if count == 0 {
3063 Value::Empty
3064 } else {
3065 Value::Float(sum / count as f64)
3066 }
3067 }
3068 AggFunc::Min => row_indices
3069 .iter()
3070 .map(|&ri| &all_rows[ri][col_idx])
3071 .filter(|v| !v.is_empty())
3072 .min()
3073 .cloned()
3074 .unwrap_or(Value::Empty),
3075 AggFunc::Max => row_indices
3076 .iter()
3077 .map(|&ri| &all_rows[ri][col_idx])
3078 .filter(|v| !v.is_empty())
3079 .max()
3080 .cloned()
3081 .unwrap_or(Value::Empty),
3082 }
3083}
3084
3085pub(super) fn try_extract_equi_join_keys(
3099 pred: &Expr,
3100 left_columns: &[String],
3101 right_columns: &[String],
3102) -> Option<(usize, usize)> {
3103 let (lhs, op, rhs) = match pred {
3104 Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3105 _ => return None,
3106 };
3107 if op != BinOp::Eq {
3108 return None;
3109 }
3110 if let (Some(li), Some(ri)) = (
3112 resolve_side_column(lhs, left_columns),
3113 resolve_side_column(rhs, right_columns),
3114 ) {
3115 return Some((li, ri));
3116 }
3117 if let (Some(li), Some(ri)) = (
3120 resolve_side_column(rhs, left_columns),
3121 resolve_side_column(lhs, right_columns),
3122 ) {
3123 return Some((li, ri));
3124 }
3125 None
3126}
3127
3128fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3129 match expr {
3130 Expr::QualifiedField { qualifier, field } => {
3131 let q = qualifier.as_bytes();
3136 let f = field.as_bytes();
3137 columns.iter().position(|c| {
3138 let b = c.as_bytes();
3139 b.len() == q.len() + 1 + f.len()
3140 && b[..q.len()] == *q
3141 && b[q.len()] == b'.'
3142 && b[q.len() + 1..] == *f
3143 })
3144 }
3145 Expr::Field(name) => columns.iter().position(|c| c == name),
3146 _ => None,
3147 }
3148}
3149
3150pub(super) fn hash_join(
3162 left_columns: Vec<String>,
3163 left_rows: Vec<Vec<Value>>,
3164 right_columns: Vec<String>,
3165 right_rows: Vec<Vec<Value>>,
3166 left_key_idx: usize,
3167 right_key_idx: usize,
3168 kind: JoinKind,
3169) -> QueryResult {
3170 use rustc_hash::FxHashMap;
3171
3172 let n_left = left_columns.len();
3173 let n_right = right_columns.len();
3174 let mut columns = Vec::with_capacity(n_left + n_right);
3175 columns.extend(left_columns);
3176 columns.extend(right_columns);
3177
3178 let mut build: FxHashMap<Value, Vec<usize>> =
3181 FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3182 for (i, row) in right_rows.iter().enumerate() {
3183 if matches!(row[right_key_idx], Value::Empty) {
3187 continue;
3188 }
3189 build.entry(row[right_key_idx].clone()).or_default().push(i);
3190 }
3191
3192 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3195
3196 for left_row in &left_rows {
3197 let key = &left_row[left_key_idx];
3198 let matched = if matches!(key, Value::Empty) {
3199 None
3200 } else {
3201 build.get(key)
3202 };
3203 match matched {
3204 Some(matches) if !matches.is_empty() => {
3205 for &ri in matches {
3206 let right_row = &right_rows[ri];
3207 let mut combined = Vec::with_capacity(n_left + n_right);
3208 combined.extend_from_slice(left_row);
3209 combined.extend_from_slice(right_row);
3210 rows.push(combined);
3211 }
3212 }
3213 _ => {
3214 if matches!(kind, JoinKind::LeftOuter) {
3215 let mut row = Vec::with_capacity(n_left + n_right);
3216 row.extend_from_slice(left_row);
3217 row.resize(n_left + n_right, Value::Empty);
3218 rows.push(row);
3219 }
3220 }
3221 }
3222 }
3223
3224 QueryResult::Rows { columns, rows }
3225}
3226
3227pub(super) fn lower_unindexed_range_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3240 match plan {
3241 PlanNode::RangeScan {
3242 table,
3243 column,
3244 start,
3245 end,
3246 } => {
3247 if let Some(tbl) = catalog.get_table(table) {
3248 if tbl.index(column).is_some() {
3249 return plan.clone();
3250 }
3251 }
3252 let pred = synthesize_range_predicate(column, start, end);
3253 PlanNode::Filter {
3254 input: Box::new(PlanNode::SeqScan {
3255 table: table.clone(),
3256 }),
3257 predicate: pred,
3258 }
3259 }
3260 PlanNode::Filter { input, predicate } => PlanNode::Filter {
3261 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3262 predicate: predicate.clone(),
3263 },
3264 PlanNode::Project { input, fields } => PlanNode::Project {
3265 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3266 fields: fields.clone(),
3267 },
3268 PlanNode::Sort { input, keys } => PlanNode::Sort {
3269 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3270 keys: keys.clone(),
3271 },
3272 PlanNode::Limit { input, count } => PlanNode::Limit {
3273 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3274 count: count.clone(),
3275 },
3276 PlanNode::Offset { input, count } => PlanNode::Offset {
3277 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3278 count: count.clone(),
3279 },
3280 PlanNode::Aggregate {
3281 input,
3282 function,
3283 field,
3284 } => PlanNode::Aggregate {
3285 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3286 function: *function,
3287 field: field.clone(),
3288 },
3289 PlanNode::Distinct { input } => PlanNode::Distinct {
3290 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3291 },
3292 PlanNode::GroupBy {
3293 input,
3294 keys,
3295 aggregates,
3296 having,
3297 } => PlanNode::GroupBy {
3298 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3299 keys: keys.clone(),
3300 aggregates: aggregates.clone(),
3301 having: having.clone(),
3302 },
3303 PlanNode::Update {
3304 input,
3305 table,
3306 assignments,
3307 } => PlanNode::Update {
3308 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3309 table: table.clone(),
3310 assignments: assignments.clone(),
3311 },
3312 PlanNode::Delete { input, table } => PlanNode::Delete {
3313 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3314 table: table.clone(),
3315 },
3316 PlanNode::Window { input, windows } => PlanNode::Window {
3317 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3318 windows: windows.clone(),
3319 },
3320 PlanNode::Union { left, right, all } => PlanNode::Union {
3321 left: Box::new(lower_unindexed_range_scans(catalog, left)),
3322 right: Box::new(lower_unindexed_range_scans(catalog, right)),
3323 all: *all,
3324 },
3325 PlanNode::Explain { input } => PlanNode::Explain {
3326 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3327 },
3328 PlanNode::NestedLoopJoin {
3329 left,
3330 right,
3331 on,
3332 kind,
3333 } => PlanNode::NestedLoopJoin {
3334 left: Box::new(lower_unindexed_range_scans(catalog, left)),
3335 right: Box::new(lower_unindexed_range_scans(catalog, right)),
3336 on: on.clone(),
3337 kind: *kind,
3338 },
3339 _ => plan.clone(),
3341 }
3342}
3343
3344pub(super) fn synthesize_range_predicate(
3346 column: &str,
3347 start: &Option<(Expr, bool)>,
3348 end: &Option<(Expr, bool)>,
3349) -> Expr {
3350 let lower = start.as_ref().map(|(expr, inclusive)| {
3351 let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3352 Expr::BinaryOp(
3353 Box::new(Expr::Field(column.to_string())),
3354 op,
3355 Box::new(expr.clone()),
3356 )
3357 });
3358 let upper = end.as_ref().map(|(expr, inclusive)| {
3359 let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3360 Expr::BinaryOp(
3361 Box::new(Expr::Field(column.to_string())),
3362 op,
3363 Box::new(expr.clone()),
3364 )
3365 });
3366 match (lower, upper) {
3367 (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3368 (Some(l), None) => l,
3369 (None, Some(u)) => u,
3370 (None, None) => Expr::Literal(Literal::Bool(true)),
3371 }
3372}
3373
3374pub(super) fn range_matches(
3376 val: &Value,
3377 start: &Option<Value>,
3378 start_inc: bool,
3379 end: &Option<Value>,
3380 end_inc: bool,
3381) -> bool {
3382 if let Some(ref s) = start {
3383 if start_inc {
3384 if val < s {
3385 return false;
3386 }
3387 } else if val <= s {
3388 return false;
3389 }
3390 }
3391 if let Some(ref e) = end {
3392 if end_inc {
3393 if val > e {
3394 return false;
3395 }
3396 } else if val >= e {
3397 return false;
3398 }
3399 }
3400 true
3401}
3402
3403pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3406 let indent = " ".repeat(depth);
3407 match plan {
3408 PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3409 PlanNode::AliasScan { table, alias } => {
3410 format!("{indent}AliasScan table={table} alias={alias}")
3411 }
3412 PlanNode::IndexScan { table, column, key } => {
3413 format!("{indent}IndexScan table={table} column={column} key={key:?}")
3414 }
3415 PlanNode::RangeScan {
3416 table,
3417 column,
3418 start,
3419 end,
3420 } => {
3421 let s = match start {
3422 Some((expr, inc)) => {
3423 let op = if *inc { ">=" } else { ">" };
3424 format!("{op}{expr:?}")
3425 }
3426 None => "unbounded".to_string(),
3427 };
3428 let e = match end {
3429 Some((expr, inc)) => {
3430 let op = if *inc { "<=" } else { "<" };
3431 format!("{op}{expr:?}")
3432 }
3433 None => "unbounded".to_string(),
3434 };
3435 format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3436 }
3437 PlanNode::Filter { input, predicate } => {
3438 let child = format_plan_tree(input, depth + 1);
3439 format!("{indent}Filter predicate={predicate:?}\n{child}")
3440 }
3441 PlanNode::Project { input, fields } => {
3442 let names: Vec<String> = fields
3443 .iter()
3444 .map(|f| match &f.alias {
3445 Some(a) => format!("{a}: {:?}", f.expr),
3446 None => format!("{:?}", f.expr),
3447 })
3448 .collect();
3449 let child = format_plan_tree(input, depth + 1);
3450 format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3451 }
3452 PlanNode::Sort { input, keys } => {
3453 let ks: Vec<String> = keys
3454 .iter()
3455 .map(|k| {
3456 if k.descending {
3457 format!("{} desc", k.field)
3458 } else {
3459 k.field.clone()
3460 }
3461 })
3462 .collect();
3463 let child = format_plan_tree(input, depth + 1);
3464 format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3465 }
3466 PlanNode::Limit { input, count } => {
3467 let child = format_plan_tree(input, depth + 1);
3468 format!("{indent}Limit count={count:?}\n{child}")
3469 }
3470 PlanNode::Offset { input, count } => {
3471 let child = format_plan_tree(input, depth + 1);
3472 format!("{indent}Offset count={count:?}\n{child}")
3473 }
3474 PlanNode::Aggregate {
3475 input,
3476 function,
3477 field,
3478 } => {
3479 let f = field.as_deref().unwrap_or("*");
3480 let child = format_plan_tree(input, depth + 1);
3481 format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3482 }
3483 PlanNode::NestedLoopJoin {
3484 left,
3485 right,
3486 on,
3487 kind,
3488 } => {
3489 let left_child = format_plan_tree(left, depth + 1);
3490 let right_child = format_plan_tree(right, depth + 1);
3491 let on_str = match on {
3492 Some(pred) => format!("{pred:?}"),
3493 None => "none".to_string(),
3494 };
3495 format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3496 }
3497 PlanNode::Distinct { input } => {
3498 let child = format_plan_tree(input, depth + 1);
3499 format!("{indent}Distinct\n{child}")
3500 }
3501 PlanNode::GroupBy {
3502 input,
3503 keys,
3504 aggregates,
3505 having,
3506 } => {
3507 let agg_strs: Vec<String> = aggregates
3508 .iter()
3509 .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3510 .collect();
3511 let having_str = match having {
3512 Some(h) => format!(" having={h:?}"),
3513 None => String::new(),
3514 };
3515 let child = format_plan_tree(input, depth + 1);
3516 format!(
3517 "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3518 keys.join(", "),
3519 agg_strs.join(", "),
3520 )
3521 }
3522 PlanNode::Insert { table, assignments } => {
3523 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3524 format!("{indent}Insert table={table} cols=[{}]", cols.join(", "))
3525 }
3526 PlanNode::Upsert {
3527 table,
3528 key_column,
3529 assignments,
3530 on_conflict,
3531 } => {
3532 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3533 let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3534 if conflict_cols.is_empty() {
3535 format!(
3536 "{indent}Upsert table={table} key={key_column} cols=[{}]",
3537 cols.join(", ")
3538 )
3539 } else {
3540 format!(
3541 "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3542 cols.join(", "),
3543 conflict_cols.join(", ")
3544 )
3545 }
3546 }
3547 PlanNode::Update {
3548 input,
3549 table,
3550 assignments,
3551 } => {
3552 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3553 let child = format_plan_tree(input, depth + 1);
3554 format!(
3555 "{indent}Update table={table} set=[{}]\n{child}",
3556 cols.join(", ")
3557 )
3558 }
3559 PlanNode::Delete { input, table } => {
3560 let child = format_plan_tree(input, depth + 1);
3561 format!("{indent}Delete table={table}\n{child}")
3562 }
3563 PlanNode::CreateTable { name, fields } => {
3564 let fs: Vec<String> = fields
3565 .iter()
3566 .map(|(n, t, r)| {
3567 if *r {
3568 format!("{n}: {t} required")
3569 } else {
3570 format!("{n}: {t}")
3571 }
3572 })
3573 .collect();
3574 format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3575 }
3576 PlanNode::AlterTable { table, action } => {
3577 format!("{indent}AlterTable table={table} action={action:?}")
3578 }
3579 PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3580 PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3581 PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3582 PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3583 PlanNode::Window { input, windows } => {
3584 let ws: Vec<String> = windows
3585 .iter()
3586 .map(|w| format!("{:?} as {}", w.function, w.output_name))
3587 .collect();
3588 let child = format_plan_tree(input, depth + 1);
3589 format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3590 }
3591 PlanNode::Union { left, right, all } => {
3592 let kind = if *all { "UNION ALL" } else { "UNION" };
3593 let left_child = format_plan_tree(left, depth + 1);
3594 let right_child = format_plan_tree(right, depth + 1);
3595 format!("{indent}{kind}\n{left_child}\n{right_child}")
3596 }
3597 PlanNode::Explain { input } => {
3598 let child = format_plan_tree(input, depth + 1);
3599 format!("{indent}Explain\n{child}")
3600 }
3601 }
3602}