1use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18 pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19 match plan {
20 PlanNode::SeqScan { table } => {
21 if self.view_registry.is_dirty(table) {
23 self.refresh_view(table)?;
24 }
25 let schema = self
26 .catalog
27 .schema(table)
28 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29 .clone();
30 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31 let rows: Vec<Vec<Value>> = self
32 .catalog
33 .scan(table)
34 .map_err(|e| QueryError::StorageError(e.to_string()))?
35 .map(|(_, row)| row)
36 .collect();
37 Ok(QueryResult::Rows { columns, rows })
38 }
39
40 PlanNode::Filter { input, predicate } => {
41 let materialized;
45 let predicate = if contains_subquery(predicate) {
46 materialized = self.materialize_subqueries(predicate)?;
47 &materialized
48 } else {
49 predicate
50 };
51
52 if contains_subquery(predicate) {
54 let result = self.execute_plan(input)?;
55 return match result {
56 QueryResult::Rows { columns, rows } => {
57 let mut filtered = Vec::new();
58 for row in rows {
59 let row_pred =
60 self.materialize_correlated_for_row(predicate, &row, &columns)?;
61 if eval_predicate(&row_pred, &row, &columns) {
62 filtered.push(row);
63 }
64 }
65 Ok(QueryResult::Rows {
66 columns,
67 rows: filtered,
68 })
69 }
70 _ => Err("filter requires row input".into()),
71 };
72 }
73
74 if let PlanNode::SeqScan { table } = input.as_ref() {
79 if self.view_registry.is_dirty(table) {
81 self.refresh_view(table)?;
82 }
83 let schema = self
84 .catalog
85 .schema(table)
86 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87 .clone();
88 let columns: Vec<String> =
89 schema.columns.iter().map(|c| c.name.clone()).collect();
90 let fast = FastLayout::new(&schema);
91 let row_layout = RowLayout::new(&schema);
92 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100 self.catalog
101 .for_each_row_raw(table, |_rid, data| {
102 if compiled(data) {
103 rows.push(decode_row(&schema, data));
104 }
105 })
106 .map_err(|e| QueryError::StorageError(e.to_string()))?;
107 } else {
108 let pred_cols = predicate_column_indices(predicate, &columns);
109 self.catalog
110 .for_each_row_raw(table, |_rid, data| {
111 let pred_row =
112 decode_selective(&schema, &row_layout, data, &pred_cols);
113 if eval_predicate(predicate, &pred_row, &columns) {
114 rows.push(decode_row(&schema, data));
115 }
116 })
117 .map_err(|e| QueryError::StorageError(e.to_string()))?;
118 }
119
120 return Ok(QueryResult::Rows { columns, rows });
121 }
122
123 let result = self.execute_plan(input)?;
125 match result {
126 QueryResult::Rows { columns, rows } => {
127 let filtered: Vec<Vec<Value>> = rows
128 .into_iter()
129 .filter(|row| eval_predicate(predicate, row, &columns))
130 .collect();
131 Ok(QueryResult::Rows {
132 columns,
133 rows: filtered,
134 })
135 }
136 _ => Err("filter requires row input".into()),
137 }
138 }
139
140 PlanNode::Project { input, fields } => {
141 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144 let schema = self
145 .catalog
146 .schema(table)
147 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148 .clone();
149 let all_columns: Vec<String> =
150 schema.columns.iter().map(|c| c.name.clone()).collect();
151 let key_value = literal_to_value(key)?;
152 let tbl = self
153 .catalog
154 .get_table(table)
155 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157 let proj_columns: Vec<String> = fields
158 .iter()
159 .map(|f| {
160 f.alias.clone().unwrap_or_else(|| match &f.expr {
161 Expr::Field(name) => name.clone(),
162 _ => "?".into(),
163 })
164 })
165 .collect();
166
167 let proj_indices: Vec<usize> = fields
169 .iter()
170 .filter_map(|f| {
171 if let Expr::Field(name) = &f.expr {
172 all_columns.iter().position(|c| c == name)
173 } else {
174 None
175 }
176 })
177 .collect();
178
179 if tbl.has_index(column) {
180 let layout = RowLayout::new(&schema);
181 let rids = tbl.index_lookup_all(column, &key_value);
182 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183 for rid in rids {
184 if let Some(data) = tbl.heap.get(rid) {
185 let row: Vec<Value> = proj_indices
186 .iter()
187 .map(|&ci| decode_column(&schema, &layout, &data, ci))
188 .collect();
189 rows.push(row);
190 }
191 }
192 return Ok(QueryResult::Rows {
193 columns: proj_columns,
194 rows,
195 });
196 }
197 }
198
199 if let PlanNode::Limit {
204 input: inner,
205 count: limit_expr,
206 } = input.as_ref()
207 {
208 if let PlanNode::Sort {
209 input: sort_input,
210 keys,
211 } = inner.as_ref()
212 {
213 if keys.len() == 1 {
215 let sort_field = &keys[0].field;
216 let descending = keys[0].descending;
217 let limit = match limit_expr {
218 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219 _ => usize::MAX,
220 };
221 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222 match sort_input.as_ref() {
223 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224 PlanNode::Filter {
225 input: fi,
226 predicate,
227 } => {
228 if let PlanNode::SeqScan { table } = fi.as_ref() {
229 (Some(table.as_str()), Some(predicate))
230 } else {
231 (None, None)
232 }
233 }
234 _ => (None, None),
235 };
236 if let Some(table) = table_opt {
237 if let Some(result) = self.project_filter_sort_limit_fast(
238 table, fields, sort_field, descending, limit, pred_opt,
239 )? {
240 return Ok(result);
241 }
242 }
243 }
244 }
245 if let PlanNode::Filter {
248 input: fi,
249 predicate,
250 } = inner.as_ref()
251 {
252 if let PlanNode::SeqScan { table } = fi.as_ref() {
253 let limit = match limit_expr {
254 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255 _ => usize::MAX,
256 };
257 if let Some(result) = self.project_filter_limit_fast(
258 table,
259 fields,
260 limit,
261 Some(predicate),
262 )? {
263 return Ok(result);
264 }
265 }
266 }
267 if let PlanNode::SeqScan { table } = inner.as_ref() {
269 let limit = match limit_expr {
270 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271 _ => usize::MAX,
272 };
273 if let Some(result) =
274 self.project_filter_limit_fast(table, fields, limit, None)?
275 {
276 return Ok(result);
277 }
278 }
279 }
280
281 if let PlanNode::Filter {
292 input: fi,
293 predicate,
294 } = input.as_ref()
295 {
296 if let PlanNode::SeqScan { table } = fi.as_ref() {
297 if let Some(result) = self.project_filter_limit_fast(
298 table,
299 fields,
300 usize::MAX,
301 Some(predicate),
302 )? {
303 return Ok(result);
304 }
305 }
306 }
307
308 if let PlanNode::SeqScan { table } = input.as_ref() {
312 if let Some(result) =
313 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314 {
315 return Ok(result);
316 }
317 }
318
319 let result = self.execute_plan(input)?;
320 match result {
321 QueryResult::Rows { columns, rows } => {
322 let proj_columns: Vec<String> = fields
323 .iter()
324 .map(|f| {
325 f.alias.clone().unwrap_or_else(|| match &f.expr {
326 Expr::Field(name) => name.clone(),
327 Expr::QualifiedField { qualifier, field } => {
331 format!("{qualifier}.{field}")
332 }
333 _ => "?".into(),
334 })
335 })
336 .collect();
337 let proj_rows: Vec<Vec<Value>> = rows
338 .iter()
339 .map(|row| {
340 fields
341 .iter()
342 .map(|f| eval_expr(&f.expr, row, &columns))
343 .collect()
344 })
345 .collect();
346 Ok(QueryResult::Rows {
347 columns: proj_columns,
348 rows: proj_rows,
349 })
350 }
351 _ => Err("project requires row input".into()),
352 }
353 }
354
355 PlanNode::Sort { input, keys } => {
356 let result = self.execute_plan(input)?;
357 match result {
358 QueryResult::Rows { columns, mut rows } => {
359 if rows.len() > MAX_SORT_ROWS {
363 return Err(QueryError::SortLimitExceeded);
364 }
365 self.charge_rows(&rows)?;
366 let key_indices: Vec<(usize, bool)> = keys
367 .iter()
368 .map(|k| {
369 columns
370 .iter()
371 .position(|c| c == &k.field)
372 .map(|idx| (idx, k.descending))
373 .ok_or_else(|| QueryError::ColumnNotFound {
374 table: String::new(),
375 column: k.field.clone(),
376 })
377 })
378 .collect::<Result<_, QueryError>>()?;
379 rows.sort_by(|a, b| {
380 for &(col_idx, descending) in &key_indices {
381 let cmp = a[col_idx].cmp(&b[col_idx]);
382 let cmp = if descending { cmp.reverse() } else { cmp };
383 if cmp != std::cmp::Ordering::Equal {
384 return cmp;
385 }
386 }
387 std::cmp::Ordering::Equal
388 });
389 Ok(QueryResult::Rows { columns, rows })
390 }
391 _ => Err("sort requires row input".into()),
392 }
393 }
394
395 PlanNode::Limit { input, count } => {
396 let result = self.execute_plan(input)?;
397 let n = match count {
398 Expr::Literal(Literal::Int(v)) => *v as usize,
399 _ => return Err("limit must be integer literal".into()),
400 };
401 match result {
402 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
403 columns,
404 rows: rows.into_iter().take(n).collect(),
405 }),
406 _ => Err("limit requires row input".into()),
407 }
408 }
409
410 PlanNode::Offset { input, count } => {
411 let result = self.execute_plan(input)?;
412 let n = match count {
413 Expr::Literal(Literal::Int(v)) => *v as usize,
414 _ => return Err("offset must be integer literal".into()),
415 };
416 match result {
417 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
418 columns,
419 rows: rows.into_iter().skip(n).collect(),
420 }),
421 _ => Err("offset requires row input".into()),
422 }
423 }
424
425 PlanNode::Aggregate {
426 input,
427 function,
428 field,
429 } => {
430 if *function == AggFunc::Count {
432 if let PlanNode::SeqScan { table } = input.as_ref() {
433 let mut count: i64 = 0;
434 self.catalog
435 .for_each_row_raw(table, |_rid, _data| {
436 count += 1;
437 })
438 .map_err(|e| QueryError::StorageError(e.to_string()))?;
439 return Ok(QueryResult::Scalar(Value::Int(count)));
440 }
441 if let PlanNode::Filter {
444 input: inner,
445 predicate,
446 } = input.as_ref()
447 {
448 if let PlanNode::SeqScan { table } = inner.as_ref() {
449 let schema = self
450 .catalog
451 .schema(table)
452 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
453 .clone();
454 let columns: Vec<String> =
455 schema.columns.iter().map(|c| c.name.clone()).collect();
456 let fast = FastLayout::new(&schema);
457 let row_layout = RowLayout::new(&schema);
458
459 if let Some(compiled) =
462 compile_predicate(predicate, &columns, &fast, &schema)
463 {
464 let mut count: i64 = 0;
465 self.catalog
466 .for_each_row_raw(table, |_rid, data| {
467 if compiled(data) {
468 count += 1;
469 }
470 })
471 .map_err(|e| QueryError::StorageError(e.to_string()))?;
472 return Ok(QueryResult::Scalar(Value::Int(count)));
473 }
474
475 let pred_cols = predicate_column_indices(predicate, &columns);
477 let mut count: i64 = 0;
478 self.catalog
479 .for_each_row_raw(table, |_rid, data| {
480 let pred_row =
481 decode_selective(&schema, &row_layout, data, &pred_cols);
482 if eval_predicate(predicate, &pred_row, &columns) {
483 count += 1;
484 }
485 })
486 .map_err(|e| QueryError::StorageError(e.to_string()))?;
487
488 return Ok(QueryResult::Scalar(Value::Int(count)));
489 }
490 }
491 }
492
493 if matches!(
497 function,
498 AggFunc::Sum
499 | AggFunc::Avg
500 | AggFunc::Min
501 | AggFunc::Max
502 | AggFunc::CountDistinct
503 ) {
504 if let Some(col) = field.as_ref() {
505 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
507 match input.as_ref() {
508 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
509 PlanNode::Filter {
510 input: inner,
511 predicate,
512 } => {
513 if let PlanNode::SeqScan { table } = inner.as_ref() {
514 (Some(table.as_str()), Some(predicate))
515 } else {
516 (None, None)
517 }
518 }
519 _ => (None, None),
520 };
521 if let Some(table) = table_opt {
522 if let Some(result) =
523 self.agg_single_col_fast(table, col, *function, pred_opt)?
524 {
525 return Ok(result);
526 }
527 }
528 }
529 }
530
531 let result = self.execute_plan(input)?;
536 match result {
537 QueryResult::Rows { columns, rows } => {
538 match function {
539 AggFunc::Count => {
540 Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
541 }
542 AggFunc::CountDistinct => {
543 let col = field.as_ref().ok_or("count distinct requires field")?;
544 let idx = columns
545 .iter()
546 .position(|c| c == col)
547 .ok_or("col not found")?;
548 let mut seen = std::collections::HashSet::new();
549 for row in &rows {
550 let v = &row[idx];
551 if !v.is_empty() {
552 seen.insert(v.clone());
553 }
554 }
555 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
556 }
557 AggFunc::Avg => {
558 let col = field.as_ref().ok_or("avg requires field")?;
559 let idx = columns
560 .iter()
561 .position(|c| c == col)
562 .ok_or("col not found")?;
563 let sum: f64 = rows
564 .iter()
565 .filter_map(|r| match &r[idx] {
566 Value::Int(v) => Some(*v as f64),
567 Value::Float(v) => Some(*v),
568 _ => None,
569 })
570 .sum();
571 let count = rows.len() as f64;
572 Ok(QueryResult::Scalar(Value::Float(sum / count)))
573 }
574 AggFunc::Sum => {
575 let col = field.as_ref().ok_or("sum requires field")?;
576 let idx = columns
577 .iter()
578 .position(|c| c == col)
579 .ok_or("col not found")?;
580 let mut int_sum: i64 = 0;
586 let mut float_sum: f64 = 0.0;
587 let mut saw_float = false;
588 for r in &rows {
589 match &r[idx] {
590 Value::Int(v) => int_sum += *v,
591 Value::Float(v) => {
592 float_sum += *v;
593 saw_float = true;
594 }
595 _ => {}
596 }
597 }
598 let result = if saw_float {
599 Value::Float(float_sum + int_sum as f64)
600 } else {
601 Value::Int(int_sum)
602 };
603 Ok(QueryResult::Scalar(result))
604 }
605 AggFunc::Min | AggFunc::Max => {
606 let col = field.as_ref().ok_or("min/max requires field")?;
607 let idx = columns
608 .iter()
609 .position(|c| c == col)
610 .ok_or("col not found")?;
611 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
612 let result = if *function == AggFunc::Min {
613 vals.into_iter().min().cloned()
614 } else {
615 vals.into_iter().max().cloned()
616 };
617 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
618 }
619 }
620 }
621 _ => Err("aggregate requires row input".into()),
622 }
623 }
624
625 PlanNode::Insert { table, assignments } => {
626 let values = {
627 let schema = self
628 .catalog
629 .schema(table)
630 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
631 let mut values = vec![Value::Empty; schema.columns.len()];
632 for a in assignments {
633 let idx = schema.column_index(&a.field).ok_or_else(|| {
634 QueryError::ColumnNotFound {
635 table: String::new(),
636 column: a.field.clone(),
637 }
638 })?;
639 let raw = literal_to_value(&a.value)?;
640 values[idx] = coerce_value(raw, &schema.columns[idx])?;
641 }
642 for col in &schema.columns {
643 if col.required && matches!(values[col.position as usize], Value::Empty) {
644 return Err(QueryError::Execution(format!(
645 "column '{}' is required but no value was provided",
646 col.name
647 )));
648 }
649 }
650 values
651 };
652 self.catalog
653 .insert(table, &values)
654 .map_err(|e| QueryError::StorageError(e.to_string()))?;
655 self.view_registry.mark_dependents_dirty(table);
656 Ok(QueryResult::Modified(1))
657 }
658
659 PlanNode::Upsert {
660 table,
661 key_column,
662 assignments,
663 on_conflict,
664 } => {
665 let (values, key_idx) = {
666 let schema = self
667 .catalog
668 .schema(table)
669 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
670 let mut values = vec![Value::Empty; schema.columns.len()];
671 for a in assignments {
672 let idx = schema.column_index(&a.field).ok_or_else(|| {
673 QueryError::ColumnNotFound {
674 table: String::new(),
675 column: a.field.clone(),
676 }
677 })?;
678 let raw = literal_to_value(&a.value)?;
679 values[idx] = coerce_value(raw, &schema.columns[idx])?;
680 }
681 for col in &schema.columns {
682 if col.required && matches!(values[col.position as usize], Value::Empty) {
683 return Err(QueryError::Execution(format!(
684 "column '{}' is required but no value was provided",
685 col.name
686 )));
687 }
688 }
689 let key_idx = schema
690 .column_index(key_column)
691 .ok_or_else(|| format!("key column '{key_column}' not found"))?;
692 (values, key_idx)
693 };
694
695 let key_value = values[key_idx].clone();
696
697 let existing = {
699 let tbl = self
700 .catalog
701 .get_table(table)
702 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
703 if tbl.has_index(key_column) {
704 let rids = tbl.index_lookup_all(key_column, &key_value);
709 rids.into_iter().next().and_then(|rid| {
710 tbl.heap
711 .get(rid)
712 .map(|data| (rid, decode_row(&tbl.schema, &data)))
713 })
714 } else {
715 let mut found = None;
717 for (rid, row) in tbl.scan() {
718 if row[key_idx] == key_value {
719 found = Some((rid, row));
720 break;
721 }
722 }
723 found
724 }
725 };
726
727 if let Some((rid, mut existing_row)) = existing {
728 let update_assignments = if on_conflict.is_empty() {
730 assignments
731 } else {
732 on_conflict
733 };
734 let changed_cols: Vec<usize> = {
735 let schema = self
736 .catalog
737 .schema(table)
738 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
739 let mut indices = Vec::new();
740 for a in update_assignments {
741 let idx = schema.column_index(&a.field).ok_or_else(|| {
742 QueryError::ColumnNotFound {
743 table: String::new(),
744 column: a.field.clone(),
745 }
746 })?;
747 if idx != key_idx {
748 existing_row[idx] = literal_to_value(&a.value)?;
749 indices.push(idx);
750 }
751 }
752 indices
753 };
754 self.catalog
755 .update_hinted(table, rid, &existing_row, Some(&changed_cols))
756 .map_err(|e| QueryError::StorageError(e.to_string()))?;
757 self.view_registry.mark_dependents_dirty(table);
758 Ok(QueryResult::Modified(1))
759 } else {
760 self.catalog
762 .insert(table, &values)
763 .map_err(|e| QueryError::StorageError(e.to_string()))?;
764 self.view_registry.mark_dependents_dirty(table);
765 Ok(QueryResult::Modified(1))
766 }
767 }
768
769 PlanNode::Update {
770 input,
771 table,
772 assignments,
773 } => {
774 let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
780 let schema_ref = self
781 .catalog
782 .schema(table)
783 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
784 let indices: Vec<usize> = assignments
785 .iter()
786 .map(|a| {
787 schema_ref.column_index(&a.field).ok_or_else(|| {
788 QueryError::ColumnNotFound {
789 table: String::new(),
790 column: a.field.clone(),
791 }
792 })
793 })
794 .collect::<Result<_, _>>()?;
795 let vals: Result<Vec<Value>, _> = assignments
796 .iter()
797 .map(|a| literal_to_value(&a.value))
798 .collect();
799 (indices, vals.ok())
800 };
801 let resolved_assignments: Option<Vec<(usize, Value)>> =
802 literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
803
804 let changed_cols: Vec<usize> = col_indices.clone();
807
808 if let Some(ref resolved_assignments) = resolved_assignments {
815 if let PlanNode::Filter {
816 input: inner,
817 predicate,
818 } = input.as_ref()
819 {
820 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
821 if t == table {
822 let fused_result = self.try_fused_scan_update(
823 table,
824 predicate,
825 resolved_assignments,
826 &changed_cols,
827 );
828 if let Some(result) = fused_result {
829 return result;
830 }
831 }
832 }
833 }
834 }
835
836 let matching_rids = self.collect_rids_for_mutation(input, table)?;
838
839 if let Some(ref resolved_assignments) = resolved_assignments {
841 let fast_patch: Option<Vec<FastPatch>> = {
847 let tbl = self
848 .catalog
849 .get_table(table)
850 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
851 let schema = &tbl.schema;
852 let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
853 is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
854 });
855 let no_indexed = !resolved_assignments
856 .iter()
857 .any(|(idx, _)| tbl.has_indexed_col(*idx));
858
859 if all_fixed_nonnull && no_indexed {
860 let layout = RowLayout::new(schema);
861 let bitmap_size = layout.bitmap_size();
862 let patches: Vec<FastPatch> = resolved_assignments
863 .iter()
864 .map(|(idx, val)| {
865 let fixed_off = layout
866 .fixed_offset(*idx)
867 .expect("is_fixed_size already checked");
868 let field_off = 2 + bitmap_size + fixed_off;
869 let bytes: FixedBytes = match val {
870 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
871 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
872 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
873 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
874 Value::Uuid(v) => FixedBytes::Uuid(*v),
875 _ => unreachable!("all_fixed_nonnull guard lied"),
876 };
877 FastPatch {
878 field_off,
879 bitmap_byte_off: 2 + idx / 8,
880 bit_mask: 1u8 << (idx % 8),
881 bytes,
882 }
883 })
884 .collect();
885 Some(patches)
886 } else {
887 None
888 }
889 };
890
891 if let Some(patches) = fast_patch {
892 let mut count = 0u64;
893 for rid in matching_rids {
894 let ok = self
899 .catalog
900 .update_row_bytes_logged(table, rid, |row| {
901 for p in &patches {
902 row[p.bitmap_byte_off] &= !p.bit_mask;
903 let field_bytes = p.bytes.as_slice();
904 row[p.field_off..p.field_off + field_bytes.len()]
905 .copy_from_slice(field_bytes);
906 }
907 })
908 .map_err(|e| QueryError::StorageError(e.to_string()))?;
909 if ok {
910 count += 1;
911 }
912 }
913 self.view_registry.mark_dependents_dirty(table);
914 return Ok(QueryResult::Modified(count));
915 }
916
917 let var_fast: Option<(usize, Option<Vec<u8>>)> = {
919 let tbl = self
920 .catalog
921 .get_table(table)
922 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
923 let schema = &tbl.schema;
924 let is_single = resolved_assignments.len() == 1;
925 let is_var_col = is_single
926 && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
927 let no_indexed = !resolved_assignments
928 .iter()
929 .any(|(idx, _)| tbl.has_indexed_col(*idx));
930
931 if is_single && is_var_col && no_indexed {
932 let (idx, val) = &resolved_assignments[0];
933 let bytes_opt: Option<Vec<u8>> = match val {
934 Value::Str(s) => Some(s.as_bytes().to_vec()),
935 Value::Bytes(b) => Some(b.clone()),
936 Value::Empty => None,
937 _ => {
938 return Err(QueryError::TypeError(format!(
939 "cannot assign non-var value to var column '{}'",
940 schema.columns[*idx].name
941 )))
942 }
943 };
944 Some((*idx, bytes_opt))
945 } else {
946 None
947 }
948 };
949
950 if let Some((col_idx, new_bytes_opt)) = var_fast {
951 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
952 let mut count = 0u64;
953 let mut fallback_rids: Vec<RowId> = Vec::new();
954 for rid in &matching_rids {
955 let ok = self
961 .catalog
962 .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
963 .map_err(|e| QueryError::StorageError(e.to_string()))?;
964 if ok {
965 count += 1;
966 } else {
967 fallback_rids.push(*rid);
968 }
969 }
970 for rid in fallback_rids {
971 let mut row = match self.catalog.get(table, rid) {
972 Some(r) => r,
973 None => continue,
974 };
975 for (idx, val) in resolved_assignments.iter() {
976 row[*idx] = val.clone();
977 }
978 self.catalog
979 .update_hinted(table, rid, &row, Some(&changed_cols))
980 .map_err(|e| QueryError::StorageError(e.to_string()))?;
981 count += 1;
982 }
983 self.view_registry.mark_dependents_dirty(table);
984 return Ok(QueryResult::Modified(count));
985 }
986
987 let mut count = 0u64;
989 for rid in matching_rids {
990 let mut row = match self.catalog.get(table, rid) {
991 Some(r) => r,
992 None => continue,
993 };
994 for (idx, val) in resolved_assignments.iter() {
995 row[*idx] = val.clone();
996 }
997 self.catalog
998 .update_hinted(table, rid, &row, Some(&changed_cols))
999 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1000 count += 1;
1001 }
1002 self.view_registry.mark_dependents_dirty(table);
1003 return Ok(QueryResult::Modified(count));
1004 } let col_names: Vec<String> = {
1010 let schema_ref = self
1011 .catalog
1012 .schema(table)
1013 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1014 schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1015 };
1016 let mut count = 0u64;
1017 for rid in matching_rids {
1018 let mut row = match self.catalog.get(table, rid) {
1019 Some(r) => r,
1020 None => continue,
1021 };
1022 for (i, asgn) in assignments.iter().enumerate() {
1023 let val = eval_expr(&asgn.value, &row, &col_names);
1024 row[col_indices[i]] = val;
1025 }
1026 self.catalog
1027 .update_hinted(table, rid, &row, Some(&changed_cols))
1028 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1029 count += 1;
1030 }
1031 self.view_registry.mark_dependents_dirty(table);
1032 Ok(QueryResult::Modified(count))
1033 }
1034
1035 PlanNode::Delete { input, table } => {
1036 if let PlanNode::Filter {
1057 input: inner,
1058 predicate,
1059 } = input.as_ref()
1060 {
1061 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1062 if t == table {
1063 let schema = self
1064 .catalog
1065 .schema(table)
1066 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1067 let columns: Vec<String> =
1068 schema.columns.iter().map(|c| c.name.clone()).collect();
1069 let fast = FastLayout::new(schema);
1070 if let Some(compiled) =
1071 compile_predicate(predicate, &columns, &fast, schema)
1072 {
1073 let count = self
1079 .catalog
1080 .scan_delete_matching_logged(table, |data| compiled(data))
1081 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1082 self.view_registry.mark_dependents_dirty(table);
1083 return Ok(QueryResult::Modified(count));
1084 }
1085 }
1086 }
1087 } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1088 if t == table {
1089 let count = self
1093 .catalog
1094 .scan_delete_matching_logged(table, |_| true)
1095 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1096 self.view_registry.mark_dependents_dirty(table);
1097 return Ok(QueryResult::Modified(count));
1098 }
1099 }
1100
1101 let matching_rids = self.collect_rids_for_mutation(input, table)?;
1102 let count = self
1103 .catalog
1104 .delete_many(table, &matching_rids)
1105 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1106 self.view_registry.mark_dependents_dirty(table);
1107 Ok(QueryResult::Modified(count))
1108 }
1109
1110 PlanNode::AliasScan { table, alias } => {
1111 let schema = self
1121 .catalog
1122 .schema(table)
1123 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1124 .clone();
1125 let columns: Vec<String> = schema
1126 .columns
1127 .iter()
1128 .map(|c| format!("{alias}.{}", c.name))
1129 .collect();
1130 let rows: Vec<Vec<Value>> = self
1131 .catalog
1132 .scan(table)
1133 .map_err(|e| QueryError::StorageError(e.to_string()))?
1134 .map(|(_, row)| row)
1135 .collect();
1136 Ok(QueryResult::Rows { columns, rows })
1137 }
1138
1139 PlanNode::NestedLoopJoin {
1140 left,
1141 right,
1142 on,
1143 kind,
1144 } => {
1145 let left_result = self.execute_plan(left)?;
1156 let right_result = self.execute_plan(right)?;
1157 let (left_columns, left_rows) = match left_result {
1158 QueryResult::Rows { columns, rows } => (columns, rows),
1159 _ => return Err("join left side must produce rows".into()),
1160 };
1161 let (right_columns, right_rows) = match right_result {
1162 QueryResult::Rows { columns, rows } => (columns, rows),
1163 _ => return Err("join right side must produce rows".into()),
1164 };
1165
1166 self.charge_rows(&left_rows)?;
1170 self.charge_rows(&right_rows)?;
1171
1172 if !matches!(kind, JoinKind::Cross) {
1174 if let Some(pred) = on {
1175 if let Some((l_idx, r_idx)) =
1176 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1177 {
1178 let result = hash_join(
1179 left_columns,
1180 left_rows,
1181 right_columns,
1182 right_rows,
1183 l_idx,
1184 r_idx,
1185 *kind,
1186 );
1187 if let QueryResult::Rows { ref rows, .. } = result {
1188 check_join_limit(rows.len())?;
1189 }
1190 return Ok(result);
1191 }
1192 }
1193 }
1194
1195 let n_left = left_columns.len();
1197 let n_right = right_columns.len();
1198 let mut columns = Vec::with_capacity(n_left + n_right);
1199 columns.extend(left_columns);
1200 columns.extend(right_columns);
1201
1202 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1203 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1204
1205 for left_row in &left_rows {
1206 let mut matched = false;
1207 for right_row in &right_rows {
1208 combined.clear();
1209 combined.extend_from_slice(left_row);
1210 combined.extend_from_slice(right_row);
1211 let keep = match kind {
1212 JoinKind::Cross => true,
1213 JoinKind::Inner | JoinKind::LeftOuter => match on {
1214 Some(pred) => eval_predicate(pred, &combined, &columns),
1215 None => true,
1219 },
1220 JoinKind::RightOuter => {
1223 unreachable!("planner rewrites RightOuter to LeftOuter")
1224 }
1225 };
1226 if keep {
1227 rows.push(combined.clone());
1228 check_join_limit(rows.len())?;
1229 matched = true;
1230 }
1231 }
1232 if !matched && matches!(kind, JoinKind::LeftOuter) {
1233 let mut row = Vec::with_capacity(n_left + n_right);
1234 row.extend_from_slice(left_row);
1235 row.resize(n_left + n_right, Value::Empty);
1236 rows.push(row);
1237 check_join_limit(rows.len())?;
1238 }
1239 }
1240
1241 Ok(QueryResult::Rows { columns, rows })
1242 }
1243
1244 PlanNode::Distinct { input } => {
1245 let result = self.execute_plan(input)?;
1246 match result {
1247 QueryResult::Rows { columns, rows } => {
1248 let mut seen = std::collections::HashSet::new();
1249 let mut unique_rows = Vec::new();
1250 for row in rows {
1251 if seen.insert(row.clone()) {
1252 unique_rows.push(row);
1253 }
1254 }
1255 Ok(QueryResult::Rows {
1256 columns,
1257 rows: unique_rows,
1258 })
1259 }
1260 other => Ok(other),
1261 }
1262 }
1263
1264 PlanNode::GroupBy {
1265 input,
1266 keys,
1267 aggregates,
1268 having,
1269 } => {
1270 let result = self.execute_plan(input)?;
1271 match result {
1272 QueryResult::Rows { columns, rows } => {
1273 self.charge_rows(&rows)?;
1276 let key_indices: Vec<usize> = keys
1278 .iter()
1279 .map(|k| {
1280 columns
1281 .iter()
1282 .position(|c| c == k)
1283 .ok_or_else(|| format!("group-by column '{k}' not found"))
1284 })
1285 .collect::<Result<Vec<_>, _>>()?;
1286
1287 let agg_field_indices: Vec<usize> = aggregates
1291 .iter()
1292 .map(|a| {
1293 if a.field == "*" {
1294 Ok(usize::MAX)
1295 } else {
1296 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1297 format!("aggregate column '{}' not found", a.field)
1298 })
1299 }
1300 })
1301 .collect::<Result<Vec<_>, _>>()?;
1302
1303 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1305 rustc_hash::FxHashMap::default();
1306 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1307 for (ri, row) in rows.iter().enumerate() {
1308 let key: Vec<Value> =
1309 key_indices.iter().map(|&i| row[i].clone()).collect();
1310 match group_map.get(&key) {
1311 Some(&idx) => groups[idx].1.push(ri),
1312 None => {
1313 let idx = groups.len();
1314 group_map.insert(key.clone(), idx);
1315 groups.push((key, vec![ri]));
1316 }
1317 }
1318 }
1319
1320 let mut out_columns: Vec<String> = keys.clone();
1322 for agg in aggregates.iter() {
1323 out_columns.push(agg.output_name.clone());
1324 }
1325
1326 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1328 for (key_vals, row_indices) in &groups {
1329 let mut row = key_vals.clone();
1330 for (ai, agg) in aggregates.iter().enumerate() {
1331 let col_idx = agg_field_indices[ai];
1332 let val = compute_group_aggregate(
1333 agg.function,
1334 &rows,
1335 row_indices,
1336 col_idx,
1337 );
1338 row.push(val);
1339 }
1340 out_rows.push(row);
1341 }
1342
1343 if let Some(having_expr) = having {
1345 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1346 }
1347
1348 Ok(QueryResult::Rows {
1349 columns: out_columns,
1350 rows: out_rows,
1351 })
1352 }
1353 _ => Err("group by requires row input".into()),
1354 }
1355 }
1356
1357 PlanNode::CreateTable { name, fields } => {
1358 let columns: Vec<ColumnDef> = fields
1359 .iter()
1360 .enumerate()
1361 .map(
1362 |(i, (fname, tname, req))| -> Result<ColumnDef, QueryError> {
1363 Ok(ColumnDef {
1364 name: fname.clone(),
1365 type_id: type_name_to_id(tname).map_err(QueryError::TypeError)?,
1366 required: *req,
1367 position: i as u16,
1368 })
1369 },
1370 )
1371 .collect::<Result<Vec<_>, _>>()?;
1372 let schema = Schema {
1373 table_name: name.clone(),
1374 columns,
1375 };
1376 self.catalog
1377 .create_table(schema)
1378 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1379 Ok(QueryResult::Created(name.clone()))
1380 }
1381
1382 PlanNode::AlterTable { table, action } => match action {
1383 AlterAction::AddColumn {
1384 name,
1385 type_name,
1386 required,
1387 } => {
1388 let position = self
1389 .catalog
1390 .schema(table)
1391 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1392 .columns
1393 .len() as u16;
1394 let col = ColumnDef {
1395 name: name.clone(),
1396 type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1397 required: *required,
1398 position,
1399 };
1400 self.catalog
1401 .alter_table_add_column(table, col)
1402 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1403 Ok(QueryResult::Executed {
1404 message: format!("column '{name}' added to '{table}'"),
1405 })
1406 }
1407 AlterAction::DropColumn { name } => {
1408 self.catalog
1409 .alter_table_drop_column(table, name)
1410 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1411 Ok(QueryResult::Executed {
1412 message: format!("column '{name}' dropped from '{table}'"),
1413 })
1414 }
1415 AlterAction::AddIndex { column } => {
1416 self.catalog
1417 .create_index(table, column)
1418 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1419 Ok(QueryResult::Executed {
1420 message: format!("index on '{table}.{column}' created"),
1421 })
1422 }
1423 },
1424
1425 PlanNode::DropTable { name } => {
1426 self.catalog
1427 .drop_table(name)
1428 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1429 Ok(QueryResult::Executed {
1430 message: format!("table '{name}' dropped"),
1431 })
1432 }
1433
1434 PlanNode::CreateView { name, query_text } => {
1435 self.create_view(name, query_text)?;
1436 Ok(QueryResult::Executed {
1437 message: format!("materialized view '{name}' created"),
1438 })
1439 }
1440
1441 PlanNode::RefreshView { name } => {
1442 self.refresh_view(name)?;
1443 Ok(QueryResult::Executed {
1444 message: format!("materialized view '{name}' refreshed"),
1445 })
1446 }
1447
1448 PlanNode::DropView { name } => {
1449 self.drop_view(name)?;
1450 Ok(QueryResult::Executed {
1451 message: format!("materialized view '{name}' dropped"),
1452 })
1453 }
1454
1455 PlanNode::Window { input, windows } => {
1456 let result = self.execute_plan(input)?;
1457 execute_window(result, windows)
1458 }
1459
1460 PlanNode::Union { left, right, all } => {
1461 let left_result = self.execute_plan(left)?;
1462 let right_result = self.execute_plan(right)?;
1463 let (left_cols, left_rows) = match left_result {
1464 QueryResult::Rows { columns, rows } => (columns, rows),
1465 _ => return Err("UNION requires query results on left side".into()),
1466 };
1467 let (_, right_rows) = match right_result {
1468 QueryResult::Rows { columns, rows } => (columns, rows),
1469 _ => return Err("UNION requires query results on right side".into()),
1470 };
1471 let mut combined = left_rows;
1472 if *all {
1473 combined.extend(right_rows);
1475 } else {
1476 let mut seen = std::collections::HashSet::new();
1479 for row in &combined {
1480 seen.insert(row.clone());
1481 }
1482 for row in right_rows {
1483 if seen.insert(row.clone()) {
1484 combined.push(row);
1485 }
1486 }
1487 }
1488 Ok(QueryResult::Rows {
1489 columns: left_cols,
1490 rows: combined,
1491 })
1492 }
1493
1494 PlanNode::Explain { input } => {
1495 let text = format_plan_tree(input, 0);
1496 Ok(QueryResult::Rows {
1497 columns: vec!["plan".to_string()],
1498 rows: text
1499 .lines()
1500 .map(|line| vec![Value::Str(line.to_string())])
1501 .collect(),
1502 })
1503 }
1504
1505 PlanNode::Begin => {
1506 if self.in_transaction {
1507 return Err(QueryError::Execution(
1508 "already in a transaction (nested transactions not supported)".into(),
1509 ));
1510 }
1511 self.in_transaction = true;
1512 Ok(QueryResult::Executed {
1513 message: "transaction started".to_string(),
1514 })
1515 }
1516
1517 PlanNode::Commit => {
1518 if !self.in_transaction {
1519 return Err(QueryError::Execution(
1520 "no active transaction to commit".into(),
1521 ));
1522 }
1523 self.in_transaction = false;
1524 self.catalog
1525 .sync_wal()
1526 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1527 Ok(QueryResult::Executed {
1528 message: "transaction committed".to_string(),
1529 })
1530 }
1531
1532 PlanNode::Rollback => {
1533 if !self.in_transaction {
1534 return Err(QueryError::Execution(
1535 "no active transaction to roll back".into(),
1536 ));
1537 }
1538 self.in_transaction = false;
1539 self.catalog
1540 .rollback_to_last_sync()
1541 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1542 if let Ok(mut cache) = self.plan_cache.lock() {
1543 cache.clear();
1544 }
1545 self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1546 .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1547 Ok(QueryResult::Executed {
1548 message: "transaction rolled back".to_string(),
1549 })
1550 }
1551
1552 PlanNode::IndexScan { table, column, key } => {
1553 let key_value = literal_to_value(key)?;
1554 let tbl = self
1555 .catalog
1556 .get_table(table)
1557 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1558 let columns: Vec<String> =
1559 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1560
1561 if tbl.has_index(column) {
1565 let rids = tbl.index_lookup_all(column, &key_value);
1566 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1567 for rid in rids {
1568 if let Some(data) = tbl.heap.get(rid) {
1569 rows.push(decode_row(&tbl.schema, &data));
1570 }
1571 }
1572 return Ok(QueryResult::Rows { columns, rows });
1573 }
1574
1575 let schema = &tbl.schema;
1583 let fast = FastLayout::new(schema);
1584 let synth_pred = Expr::BinaryOp(
1585 Box::new(Expr::Field(column.clone())),
1586 BinOp::Eq,
1587 Box::new(key.clone()),
1588 );
1589 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1590 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1592 self.catalog
1593 .for_each_row_raw(table, |_rid, data| {
1594 if compiled(data) {
1595 rows.push(decode_row(schema, data));
1596 }
1597 })
1598 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1599 return Ok(QueryResult::Rows { columns, rows });
1600 }
1601
1602 let col_idx =
1604 schema
1605 .column_index(column)
1606 .ok_or_else(|| QueryError::ColumnNotFound {
1607 table: String::new(),
1608 column: column.clone(),
1609 })?;
1610 let rows: Vec<Vec<Value>> = tbl
1611 .scan()
1612 .filter_map(|(_, row)| {
1613 if row[col_idx] == key_value {
1614 Some(row)
1615 } else {
1616 None
1617 }
1618 })
1619 .collect();
1620 Ok(QueryResult::Rows { columns, rows })
1621 }
1622
1623 PlanNode::RangeScan {
1624 table,
1625 column,
1626 start,
1627 end,
1628 } => {
1629 let tbl = self
1630 .catalog
1631 .get_table(table)
1632 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1633 let columns: Vec<String> =
1634 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1635 let schema = &tbl.schema;
1636
1637 let start_val = match start {
1638 Some((expr, _)) => Some(literal_to_value(expr)?),
1639 None => None,
1640 };
1641 let end_val = match end {
1642 Some((expr, _)) => Some(literal_to_value(expr)?),
1643 None => None,
1644 };
1645 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1646 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1647
1648 if tbl.is_index_unique(column) == Some(true) {
1652 if let Some(btree) = tbl.index(column) {
1653 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1654 (Some(s), Some(e)) => btree.range(s, e).collect(),
1655 (Some(s), None) => btree.range_from(s),
1656 (None, Some(e)) => btree.range_to(e),
1657 (None, None) => {
1658 let rows: Vec<Vec<Value>> =
1659 tbl.scan().map(|(_, row)| row).collect();
1660 return Ok(QueryResult::Rows { columns, rows });
1661 }
1662 };
1663 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1664 for (key, rid) in hits {
1665 if !start_inclusive {
1666 if let Some(ref s) = start_val {
1667 if &key == s {
1668 continue;
1669 }
1670 }
1671 }
1672 if !end_inclusive {
1673 if let Some(ref e) = end_val {
1674 if &key == e {
1675 continue;
1676 }
1677 }
1678 }
1679 if let Some(data) = tbl.heap.get(rid) {
1680 rows.push(decode_row(schema, &data));
1681 }
1682 }
1683 return Ok(QueryResult::Rows { columns, rows });
1684 }
1685 }
1686
1687 let fast = FastLayout::new(schema);
1689 let synth = synthesize_range_predicate(column, start, end);
1690 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1691 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1692 self.catalog
1693 .for_each_row_raw(table, |_rid, data| {
1694 if compiled(data) {
1695 rows.push(decode_row(schema, data));
1696 }
1697 })
1698 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1699 return Ok(QueryResult::Rows { columns, rows });
1700 }
1701
1702 let col_idx =
1703 schema
1704 .column_index(column)
1705 .ok_or_else(|| QueryError::ColumnNotFound {
1706 table: String::new(),
1707 column: column.clone(),
1708 })?;
1709 let rows: Vec<Vec<Value>> = tbl
1710 .scan()
1711 .filter(|(_, row)| {
1712 range_matches(
1713 &row[col_idx],
1714 &start_val,
1715 start_inclusive,
1716 &end_val,
1717 end_inclusive,
1718 )
1719 })
1720 .map(|(_, row)| row)
1721 .collect();
1722 Ok(QueryResult::Rows { columns, rows })
1723 }
1724 }
1725 }
1726
1727 fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1732 if self.view_registry.is_view(name) {
1733 return Err(QueryError::ViewError(format!(
1734 "materialized view '{name}' already exists"
1735 )));
1736 }
1737 let result = self.execute_powql(query_text)?;
1739 let (columns, rows) = match result {
1740 QueryResult::Rows { columns, rows } => (columns, rows),
1741 _ => return Err("view source query must be a SELECT".into()),
1742 };
1743 let schema = self.derive_view_schema(name, &columns, &rows);
1745 self.catalog
1747 .create_table(schema)
1748 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1749 for row in &rows {
1750 self.catalog
1751 .insert(name, row)
1752 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1753 }
1754 let depends_on = self.extract_view_deps(query_text);
1756 self.view_registry
1757 .register(ViewDef {
1758 name: name.to_string(),
1759 query: query_text.to_string(),
1760 depends_on,
1761 dirty: false,
1762 })
1763 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1764 Ok(())
1765 }
1766
1767 fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1770 let def = self
1771 .view_registry
1772 .get(name)
1773 .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1774 let query_text = def.query.clone();
1775 let result = self.execute_powql(&query_text)?;
1777 let (_columns, rows) = match result {
1778 QueryResult::Rows { columns, rows } => (columns, rows),
1779 _ => return Err("view source query must be a SELECT".into()),
1780 };
1781 self.catalog
1785 .scan_delete_matching_logged(name, |_| true)
1786 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1787 for row in &rows {
1788 self.catalog
1789 .insert(name, row)
1790 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1791 }
1792 self.view_registry.mark_clean(name);
1793 Ok(())
1794 }
1795
1796 fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1798 if !self.view_registry.is_view(name) {
1799 return Err(QueryError::ViewError(format!(
1800 "materialized view '{name}' not found"
1801 )));
1802 }
1803 self.view_registry
1804 .unregister(name)
1805 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1806 self.catalog
1807 .drop_table(name)
1808 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1809 Ok(())
1810 }
1811
1812 fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1815 use powdb_storage::types::{ColumnDef, TypeId};
1816 let cols: Vec<ColumnDef> = columns
1817 .iter()
1818 .enumerate()
1819 .map(|(i, col_name)| {
1820 let type_id = rows
1821 .first()
1822 .and_then(|row| row.get(i))
1823 .map(|v| v.type_id())
1824 .unwrap_or(TypeId::Str);
1825 ColumnDef {
1826 name: col_name.clone(),
1827 type_id,
1828 required: false,
1829 position: i as u16,
1830 }
1831 })
1832 .collect();
1833 Schema {
1834 table_name: name.to_string(),
1835 columns: cols,
1836 }
1837 }
1838
1839 fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1842 use crate::parser::parse;
1843 match parse(query_text) {
1844 Ok(Statement::Query(q)) => {
1845 let mut deps = vec![q.source.clone()];
1846 for j in &q.joins {
1847 deps.push(j.source.clone());
1848 }
1849 deps
1850 }
1851 _ => Vec::new(),
1852 }
1853 }
1854
    /// Fast path for a single aggregate `function` over one numeric column
    /// `col` of `table`, optionally restricted by `predicate`.
    ///
    /// The column's null-bitmap position (`bitmap_byte`/`bitmap_bit`) and value
    /// offset (`data_offset`) are computed once. They are handed, together with
    /// the compiled predicate, to the `agg_int_loop!` / `agg_float_loop!`
    /// macros, which are defined elsewhere. NOTE(review): presumably those
    /// macros read values straight from raw row bytes and skip NULLs; confirm
    /// against their definitions.
    ///
    /// Returns `Ok(None)` ("not applicable, use the general executor") when:
    /// - the column does not exist;
    /// - it is not `Int`/`Float`;
    /// - it has no fixed offset; or
    /// - the predicate cannot be compiled.
    ///
    /// Otherwise it returns `Ok(Some(QueryResult::Scalar(..)))`.
    ///
    /// Result conventions visible below:
    /// - `Sum` over no values yields `0` (Int) or `0.0` (Float).
    /// - `Avg`/`Min`/`Max` over no values yield `Value::Empty`.
    /// - Int `Sum` is accumulated in `i128` and saturated to the `i64` range.
    /// - Float `Min`/`Max` order values with `f64::total_cmp`.
    /// - Float `CountDistinct` compares bit patterns, so `0.0` and `-0.0`
    ///   count as distinct.
    ///
    /// # Errors
    /// `QueryError::TableNotFound` if `table` has no schema.
    pub(super) fn agg_single_col_fast(
        &self,
        table: &str,
        col: &str,
        function: AggFunc,
        predicate: Option<&Expr>,
    ) -> Result<Option<QueryResult>, QueryError> {
        let schema = self
            .catalog
            .schema(table)
            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
            .clone();
        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
        let col_idx = match schema.column_index(col) {
            Some(i) => i,
            None => return Ok(None),
        };
        let col_type = schema.columns[col_idx].type_id;
        // Only Int and Float columns have specialised loops below.
        if col_type != TypeId::Int && col_type != TypeId::Float {
            return Ok(None);
        }

        let fast = FastLayout::new(&schema);
        // A column without a fixed offset cannot be read at a constant position.
        let byte_offset = match fast.fixed_offsets[col_idx] {
            Some(o) => o,
            None => return Ok(None),
        };
        // Which byte / bit of the null bitmap belongs to this column.
        let bitmap_byte = col_idx / 8;
        let bitmap_bit = (col_idx % 8) as u32;
        // Absolute value offset: 2 leading bytes, then the null bitmap, then
        // the column's offset inside the fixed-size region.
        let data_offset = 2 + fast.bitmap_size + byte_offset;

        // A predicate that cannot be compiled disqualifies the whole fast path.
        let compiled_pred: Option<CompiledPredicate> = match predicate {
            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
                Some(c) => Some(c),
                None => return Ok(None),
            },
            None => None,
        };

        let result = match col_type {
            TypeId::Int => match function {
                // Sum and Avg share one pass: a running i128 sum plus a count.
                AggFunc::Sum | AggFunc::Avg => {
                    let mut sum_i128: i128 = 0;
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            count += 1;
                            sum_i128 += v as i128;
                        }
                    );
                    if matches!(function, AggFunc::Sum) {
                        // Saturate instead of wrapping when the sum leaves the i64 range.
                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
                        QueryResult::Scalar(Value::Int(clamped))
                    } else if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        let avg = (sum_i128 as f64) / (count as f64);
                        QueryResult::Scalar(Value::Float(avg))
                    }
                }
                AggFunc::Min => {
                    let mut min_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            min_v = Some(match min_v {
                                Some(m) => m.min(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    let mut max_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            max_v = Some(match max_v {
                                Some(m) => m.max(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: i64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            seen.insert(v);
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            TypeId::Float => match function {
                AggFunc::Sum => {
                    let mut sum: f64 = 0.0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                        }
                    );
                    QueryResult::Scalar(Value::Float(sum))
                }
                AggFunc::Avg => {
                    let mut sum: f64 = 0.0;
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                            count += 1;
                        }
                    );
                    if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        QueryResult::Scalar(Value::Float(sum / count as f64))
                    }
                }
                AggFunc::Min => {
                    let mut min_v: Option<f64> = None;
                    // total_cmp gives a total order over f64 (including NaN),
                    // unlike the partial order of `<`.
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            min_v = Some(match min_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_lt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    let mut max_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            max_v = Some(match max_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_gt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: f64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    // f64 is not Hash, so distinctness is decided on the raw bit
                    // pattern: 0.0 and -0.0 (and differently encoded NaNs) are
                    // counted as separate values.
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            seen.insert(v.to_bits());
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            _ => unreachable!("type guard above restricts to Int/Float"),
        };
        Ok(Some(result))
    }
2166
2167 pub(super) fn project_filter_limit_fast(
2170 &self,
2171 table: &str,
2172 fields: &[ProjectField],
2173 limit: usize,
2174 predicate: Option<&Expr>,
2175 ) -> Result<Option<QueryResult>, QueryError> {
2176 let schema = self
2177 .catalog
2178 .schema(table)
2179 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2180 .clone();
2181 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2182
2183 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2186 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2187 for f in fields {
2188 let name = match &f.expr {
2189 Expr::Field(n) => n.clone(),
2190 _ => return Ok(None),
2191 };
2192 let idx = match all_columns.iter().position(|c| c == &name) {
2193 Some(i) => i,
2194 None => return Ok(None),
2195 };
2196 proj_indices.push(idx);
2197 proj_columns.push(f.alias.clone().unwrap_or(name));
2198 }
2199
2200 let fast = FastLayout::new(&schema);
2201 let row_layout = RowLayout::new(&schema);
2202
2203 let compiled_pred: Option<CompiledPredicate> = match predicate {
2204 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2205 Some(c) => Some(c),
2206 None => return Ok(None),
2207 },
2208 None => None,
2209 };
2210
2211 let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2212 self.catalog
2217 .try_for_each_row_raw(table, |_rid, data| {
2218 use std::ops::ControlFlow;
2219 if let Some(ref pred) = compiled_pred {
2220 if !pred(data) {
2221 return ControlFlow::Continue(());
2222 }
2223 }
2224 let row: Vec<Value> = proj_indices
2225 .iter()
2226 .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2227 .collect();
2228 out.push(row);
2229 if out.len() >= limit {
2230 ControlFlow::Break(())
2231 } else {
2232 ControlFlow::Continue(())
2233 }
2234 })
2235 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2236
2237 Ok(Some(QueryResult::Rows {
2238 columns: proj_columns,
2239 rows: out,
2240 }))
2241 }
2242
2243 pub(super) fn project_filter_sort_limit_fast(
2248 &self,
2249 table: &str,
2250 fields: &[ProjectField],
2251 sort_field: &str,
2252 descending: bool,
2253 limit: usize,
2254 predicate: Option<&Expr>,
2255 ) -> Result<Option<QueryResult>, QueryError> {
2256 if limit == 0 {
2257 return Ok(None);
2260 }
2261 let schema = self
2262 .catalog
2263 .schema(table)
2264 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2265 .clone();
2266 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2267
2268 let sort_idx = match schema.column_index(sort_field) {
2275 Some(i) => i,
2276 None => return Ok(None),
2277 };
2278 let sort_col_type = schema.columns[sort_idx].type_id;
2279 if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2280 return Ok(None);
2281 }
2282
2283 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2285 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2286 for f in fields {
2287 let name = match &f.expr {
2288 Expr::Field(n) => n.clone(),
2289 _ => return Ok(None),
2290 };
2291 let idx = match all_columns.iter().position(|c| c == &name) {
2292 Some(i) => i,
2293 None => return Ok(None),
2294 };
2295 proj_indices.push(idx);
2296 proj_columns.push(f.alias.clone().unwrap_or(name));
2297 }
2298
2299 let fast = FastLayout::new(&schema);
2300 let row_layout = RowLayout::new(&schema);
2301 let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2303 Some(o) => o,
2304 None => return Ok(None),
2305 };
2306 let sort_bitmap_byte = sort_idx / 8;
2307 let sort_bitmap_bit = (sort_idx % 8) as u32;
2308 let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2309
2310 let compiled_pred: Option<CompiledPredicate> = match predicate {
2311 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2312 Some(c) => Some(c),
2313 None => return Ok(None),
2314 },
2315 None => None,
2316 };
2317
2318 let drained: Vec<Vec<u8>> = match sort_col_type {
2327 TypeId::Int => {
2328 let mut seq: u64 = 0;
2329 let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2330 BinaryHeap::with_capacity(limit);
2331 let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2332 BinaryHeap::with_capacity(limit);
2333
2334 self.catalog
2335 .for_each_row_raw(table, |_rid, data| {
2336 if let Some(ref pred) = compiled_pred {
2337 if !pred(data) {
2338 return;
2339 }
2340 }
2341 if data.len() < sort_data_offset + 8 {
2343 return;
2344 }
2345 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2346 if is_null {
2347 return;
2348 }
2349 let key = i64::from_le_bytes(
2350 data[sort_data_offset..sort_data_offset + 8]
2351 .try_into()
2352 .unwrap_or_else(|_| unreachable!()),
2353 );
2354 let id = seq;
2355 seq += 1;
2356
2357 if descending {
2358 if heap_desc.len() < limit {
2359 heap_desc.push(Reverse((key, id, data.to_vec())));
2360 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2361 if key > *top_key {
2362 heap_desc.pop();
2363 heap_desc.push(Reverse((key, id, data.to_vec())));
2364 }
2365 }
2366 } else if heap_asc.len() < limit {
2367 heap_asc.push((key, id, data.to_vec()));
2368 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2369 if key < *top_key {
2370 heap_asc.pop();
2371 heap_asc.push((key, id, data.to_vec()));
2372 }
2373 }
2374 })
2375 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2376
2377 let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2378 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2379 } else {
2380 heap_asc.into_iter().collect()
2381 };
2382 if descending {
2383 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2384 } else {
2385 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2386 }
2387 drained.into_iter().map(|(_, _, d)| d).collect()
2388 }
2389 TypeId::Float => {
2390 let mut seq: u64 = 0;
2399 let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2400 BinaryHeap::with_capacity(limit);
2401 let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2402 BinaryHeap::with_capacity(limit);
2403
2404 self.catalog
2405 .for_each_row_raw(table, |_rid, data| {
2406 if let Some(ref pred) = compiled_pred {
2407 if !pred(data) {
2408 return;
2409 }
2410 }
2411 if data.len() < sort_data_offset + 8 {
2412 return;
2413 }
2414 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2415 if is_null {
2416 return;
2417 }
2418 let bits = u64::from_le_bytes(
2419 data[sort_data_offset..sort_data_offset + 8]
2420 .try_into()
2421 .unwrap_or_else(|_| unreachable!()),
2422 );
2423 let key = f64_bits_to_sortable_u64(bits);
2424 let id = seq;
2425 seq += 1;
2426
2427 if descending {
2428 if heap_desc.len() < limit {
2429 heap_desc.push(Reverse((key, id, data.to_vec())));
2430 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2431 if key > *top_key {
2432 heap_desc.pop();
2433 heap_desc.push(Reverse((key, id, data.to_vec())));
2434 }
2435 }
2436 } else if heap_asc.len() < limit {
2437 heap_asc.push((key, id, data.to_vec()));
2438 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2439 if key < *top_key {
2440 heap_asc.pop();
2441 heap_asc.push((key, id, data.to_vec()));
2442 }
2443 }
2444 })
2445 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2446
2447 let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2448 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2449 } else {
2450 heap_asc.into_iter().collect()
2451 };
2452 if descending {
2453 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2454 } else {
2455 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2456 }
2457 drained.into_iter().map(|(_, _, d)| d).collect()
2458 }
2459 _ => unreachable!("type guard above restricts to Int/Float"),
2460 };
2461
2462 let rows: Vec<Vec<Value>> = drained
2463 .into_iter()
2464 .map(|data| {
2465 proj_indices
2466 .iter()
2467 .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2468 .collect()
2469 })
2470 .collect();
2471
2472 Ok(Some(QueryResult::Rows {
2473 columns: proj_columns,
2474 rows,
2475 }))
2476 }
2477
2478 fn try_fused_scan_update(
2495 &mut self,
2496 table: &str,
2497 predicate: &Expr,
2498 resolved: &[(usize, Value)],
2499 changed_cols: &[usize],
2500 ) -> Option<Result<QueryResult, QueryError>> {
2501 let compiled = {
2504 let schema = self.catalog.schema(table)?;
2505 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2506 let fast = FastLayout::new(schema);
2507 compile_predicate(predicate, &columns, &fast, schema)?
2508 };
2509
2510 let fixed_patches: Option<Vec<FastPatch>> = {
2512 let tbl = self.catalog.get_table(table)?;
2513 let schema = &tbl.schema;
2514 let all_fixed_nonnull = resolved
2515 .iter()
2516 .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2517 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2518 if all_fixed_nonnull && no_indexed {
2519 let layout = RowLayout::new(schema);
2520 let bitmap_size = layout.bitmap_size();
2521 Some(
2522 resolved
2523 .iter()
2524 .map(|(idx, val)| {
2525 let fixed_off = layout
2526 .fixed_offset(*idx)
2527 .expect("is_fixed_size already checked");
2528 let field_off = 2 + bitmap_size + fixed_off;
2529 let bytes: FixedBytes = match val {
2530 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2531 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2532 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2533 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2534 Value::Uuid(v) => FixedBytes::Uuid(*v),
2535 _ => unreachable!("all_fixed_nonnull guard"),
2536 };
2537 FastPatch {
2538 field_off,
2539 bitmap_byte_off: 2 + idx / 8,
2540 bit_mask: 1u8 << (idx % 8),
2541 bytes,
2542 }
2543 })
2544 .collect(),
2545 )
2546 } else {
2547 None
2548 }
2549 };
2550 if let Some(patches) = fixed_patches {
2551 let result = self
2552 .catalog
2553 .scan_patch_matching_logged(table, compiled, |row| {
2554 for p in &patches {
2555 row[p.bitmap_byte_off] &= !p.bit_mask;
2556 let field_bytes = p.bytes.as_slice();
2557 row[p.field_off..p.field_off + field_bytes.len()]
2558 .copy_from_slice(field_bytes);
2559 }
2560 Some(row.len() as u16)
2561 })
2562 .map_err(|e| e.to_string());
2563 match result {
2564 Ok((count, _)) => {
2565 self.view_registry.mark_dependents_dirty(table);
2566 return Some(Ok(QueryResult::Modified(count)));
2567 }
2568 Err(e) => return Some(Err(QueryError::Execution(e))),
2569 }
2570 }
2571
2572 let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2574 let tbl = self.catalog.get_table(table)?;
2575 let schema = &tbl.schema;
2576 let is_single = resolved.len() == 1;
2577 let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2578 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2579 if is_single && is_var && no_indexed {
2580 let (idx, val) = &resolved[0];
2581 let bytes_opt = match val {
2582 Value::Str(s) => Some(s.as_bytes().to_vec()),
2583 Value::Bytes(b) => Some(b.clone()),
2584 Value::Empty => None,
2585 _ => return None, };
2587 Some((*idx, bytes_opt))
2588 } else {
2589 None
2590 }
2591 };
2592 if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2593 let layout = {
2595 let schema = self.catalog.schema(table)?;
2596 RowLayout::new(schema)
2597 };
2598 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2599 let result = self
2600 .catalog
2601 .scan_patch_matching_logged(table, compiled, |row| {
2602 patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2603 })
2604 .map_err(|e| e.to_string());
2605 match result {
2606 Ok((mut count, fallback_rids)) => {
2607 for rid in fallback_rids {
2609 let mut row = match self.catalog.get(table, rid) {
2610 Some(r) => r,
2611 None => continue,
2612 };
2613 for (idx, val) in resolved.iter() {
2614 row[*idx] = val.clone();
2615 }
2616 self.catalog
2617 .update_hinted(table, rid, &row, Some(changed_cols))
2618 .map_err(|e| e.to_string())
2619 .ok();
2620 count += 1;
2621 }
2622 self.view_registry.mark_dependents_dirty(table);
2623 return Some(Ok(QueryResult::Modified(count)));
2624 }
2625 Err(e) => return Some(Err(QueryError::Execution(e))),
2626 }
2627 }
2628
2629 None }
2631
2632 fn collect_rids_for_mutation(
2638 &mut self,
2639 input: &PlanNode,
2640 table: &str,
2641 ) -> Result<Vec<RowId>, QueryError> {
2642 match input {
2643 PlanNode::SeqScan { table: t } if t == table => {
2644 let rids: Vec<RowId> = self
2646 .catalog
2647 .scan(table)
2648 .map_err(|e| QueryError::StorageError(e.to_string()))?
2649 .map(|(rid, _)| rid)
2650 .collect();
2651 Ok(rids)
2652 }
2653 PlanNode::IndexScan {
2654 table: t,
2655 column,
2656 key,
2657 } if t == table => {
2658 let key_value = literal_to_value(key)?;
2659
2660 {
2669 let tbl = self
2670 .catalog
2671 .get_table(table)
2672 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2673 if tbl.has_index(column) {
2674 let rids = tbl.index_lookup_all(column, &key_value);
2675 return Ok(rids);
2676 }
2677 }
2678
2679 let schema = self
2684 .catalog
2685 .schema(table)
2686 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2687 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2688 let fast = FastLayout::new(schema);
2689 let synth = Expr::BinaryOp(
2690 Box::new(Expr::Field(column.clone())),
2691 BinOp::Eq,
2692 Box::new(key.clone()),
2693 );
2694 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2695 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2697 self.catalog
2698 .for_each_row_raw(table, |rid, data| {
2699 if compiled(data) {
2700 rids.push(rid);
2701 }
2702 })
2703 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2704 return Ok(rids);
2705 }
2706
2707 let col_idx =
2709 schema
2710 .column_index(column)
2711 .ok_or_else(|| QueryError::ColumnNotFound {
2712 table: String::new(),
2713 column: column.clone(),
2714 })?;
2715 let rids: Vec<RowId> = self
2716 .catalog
2717 .scan(table)
2718 .map_err(|e| QueryError::StorageError(e.to_string()))?
2719 .filter_map(|(rid, row)| {
2720 if row[col_idx] == key_value {
2721 Some(rid)
2722 } else {
2723 None
2724 }
2725 })
2726 .collect();
2727 Ok(rids)
2728 }
2729 PlanNode::Filter {
2730 input: inner,
2731 predicate,
2732 } => {
2733 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2734 if t != table {
2735 return self.generic_rid_match(input, table);
2736 }
2737 let schema = self
2738 .catalog
2739 .schema(table)
2740 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2741 let columns: Vec<String> =
2742 schema.columns.iter().map(|c| c.name.clone()).collect();
2743 let fast = FastLayout::new(schema);
2744 let row_layout = RowLayout::new(schema);
2745
2746 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2748 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2750 self.catalog
2751 .for_each_row_raw(table, |rid, data| {
2752 if compiled(data) {
2753 rids.push(rid);
2754 }
2755 })
2756 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2757 return Ok(rids);
2758 }
2759
2760 let pred_cols = predicate_column_indices(predicate, &columns);
2762 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2763 self.catalog
2764 .for_each_row_raw(table, |rid, data| {
2765 let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2766 if eval_predicate(predicate, &pred_row, &columns) {
2767 rids.push(rid);
2768 }
2769 })
2770 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2771 return Ok(rids);
2772 }
2773 self.generic_rid_match(input, table)
2774 }
2775 _ => self.generic_rid_match(input, table),
2776 }
2777 }
2778
2779 fn generic_rid_match(
2783 &mut self,
2784 input: &PlanNode,
2785 table: &str,
2786 ) -> Result<Vec<RowId>, QueryError> {
2787 let result = self.execute_plan(input)?;
2788 let rows = match result {
2789 QueryResult::Rows { rows, .. } => rows,
2790 _ => return Err("mutation source must be rows".into()),
2791 };
2792 let matching: Vec<RowId> = self
2793 .catalog
2794 .scan(table)
2795 .map_err(|e| QueryError::StorageError(e.to_string()))?
2796 .filter(|(_, row)| rows.iter().any(|r| r == row))
2797 .map(|(rid, _)| rid)
2798 .collect();
2799 Ok(matching)
2800 }
2801}
2802
2803pub(super) fn execute_window(
2804 result: QueryResult,
2805 windows: &[WindowDef],
2806) -> Result<QueryResult, QueryError> {
2807 let (mut columns, mut rows) = match result {
2808 QueryResult::Rows { columns, rows } => (columns, rows),
2809 _ => return Err("window function requires row input".into()),
2810 };
2811
2812 for wdef in windows {
2813 let part_indices: Vec<usize> = wdef
2815 .partition_by
2816 .iter()
2817 .map(|name| {
2818 columns
2819 .iter()
2820 .position(|c| c == name)
2821 .ok_or_else(|| format!("window partition column '{name}' not found"))
2822 })
2823 .collect::<Result<Vec<_>, _>>()?;
2824
2825 let ord_indices: Vec<(usize, bool)> = wdef
2826 .order_by
2827 .iter()
2828 .map(|sk| {
2829 columns
2830 .iter()
2831 .position(|c| c == &sk.field)
2832 .map(|i| (i, sk.descending))
2833 .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2834 })
2835 .collect::<Result<Vec<_>, _>>()?;
2836
2837 let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2839 match arg {
2840 Expr::Field(name) => {
2841 if name == "*" {
2842 None } else {
2844 Some(
2845 columns
2846 .iter()
2847 .position(|c| c == name)
2848 .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2849 )
2850 }
2851 }
2852 _ => None,
2853 }
2854 } else {
2855 None
2856 };
2857
2858 let n = rows.len();
2862 let mut indices: Vec<usize> = (0..n).collect();
2863 indices.sort_by(|&a, &b| {
2864 for &pi in &part_indices {
2866 let cmp = rows[a][pi].cmp(&rows[b][pi]);
2867 if cmp != std::cmp::Ordering::Equal {
2868 return cmp;
2869 }
2870 }
2871 for &(oi, desc) in &ord_indices {
2873 let cmp = rows[a][oi].cmp(&rows[b][oi]);
2874 if cmp != std::cmp::Ordering::Equal {
2875 return if desc { cmp.reverse() } else { cmp };
2876 }
2877 }
2878 std::cmp::Ordering::Equal
2879 });
2880
2881 let mut win_values: Vec<Value> = vec![Value::Empty; n];
2883 let mut partition_start = 0usize;
2884 let mut running_count: i64 = 0;
2886 let mut running_int_sum: i64 = 0;
2887 let mut running_float_sum: f64 = 0.0;
2888 let mut running_saw_float = false;
2889 let mut running_min: Option<Value> = None;
2890 let mut running_max: Option<Value> = None;
2891 let mut rank_counter: i64 = 0;
2892 let mut dense_rank_counter: i64 = 0;
2893 let mut prev_order_key: Option<Vec<Value>> = None;
2894 let mut same_rank_count: i64 = 0;
2895
2896 for sorted_pos in 0..n {
2897 let row_idx = indices[sorted_pos];
2898
2899 let new_partition = if sorted_pos == 0 {
2901 true
2902 } else {
2903 let prev_row_idx = indices[sorted_pos - 1];
2904 part_indices
2905 .iter()
2906 .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
2907 };
2908
2909 if new_partition {
2910 partition_start = sorted_pos;
2911 running_count = 0;
2912 running_int_sum = 0;
2913 running_float_sum = 0.0;
2914 running_saw_float = false;
2915 running_min = None;
2916 running_max = None;
2917 rank_counter = 0;
2918 dense_rank_counter = 0;
2919 prev_order_key = None;
2920 same_rank_count = 0;
2921 }
2922
2923 let current_order_key: Vec<Value> = ord_indices
2925 .iter()
2926 .map(|&(oi, _)| rows[row_idx][oi].clone())
2927 .collect();
2928 let same_as_prev = prev_order_key.as_ref() == Some(¤t_order_key);
2929
2930 let value = match wdef.function {
2931 WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
2932 WindowFunc::Rank => {
2933 if same_as_prev {
2934 same_rank_count += 1;
2935 } else {
2936 rank_counter += same_rank_count + 1;
2937 same_rank_count = 0;
2938 if rank_counter == 0 {
2939 rank_counter = 1;
2940 }
2941 }
2942 Value::Int(rank_counter)
2943 }
2944 WindowFunc::DenseRank => {
2945 if !same_as_prev {
2946 dense_rank_counter += 1;
2947 }
2948 Value::Int(dense_rank_counter)
2949 }
2950 WindowFunc::Sum => {
2951 if let Some(ci) = arg_col_idx {
2952 match &rows[row_idx][ci] {
2953 Value::Int(v) => running_int_sum += v,
2954 Value::Float(v) => {
2955 running_float_sum += v;
2956 running_saw_float = true;
2957 }
2958 _ => {}
2959 }
2960 }
2961 if running_saw_float {
2962 Value::Float(running_float_sum + running_int_sum as f64)
2963 } else {
2964 Value::Int(running_int_sum)
2965 }
2966 }
2967 WindowFunc::Avg => {
2968 if let Some(ci) = arg_col_idx {
2969 match &rows[row_idx][ci] {
2970 Value::Int(v) => {
2971 running_float_sum += *v as f64;
2972 running_count += 1;
2973 }
2974 Value::Float(v) => {
2975 running_float_sum += v;
2976 running_count += 1;
2977 }
2978 _ => {}
2979 }
2980 }
2981 if running_count == 0 {
2982 Value::Empty
2983 } else {
2984 Value::Float(running_float_sum / running_count as f64)
2985 }
2986 }
2987 WindowFunc::Count => {
2988 if let Some(ci) = arg_col_idx {
2989 if !rows[row_idx][ci].is_empty() {
2990 running_count += 1;
2991 }
2992 } else {
2993 running_count += 1;
2995 }
2996 Value::Int(running_count)
2997 }
2998 WindowFunc::Min => {
2999 if let Some(ci) = arg_col_idx {
3000 let v = &rows[row_idx][ci];
3001 if !v.is_empty() {
3002 running_min = Some(match &running_min {
3003 None => v.clone(),
3004 Some(cur) => {
3005 if v < cur {
3006 v.clone()
3007 } else {
3008 cur.clone()
3009 }
3010 }
3011 });
3012 }
3013 }
3014 running_min.clone().unwrap_or(Value::Empty)
3015 }
3016 WindowFunc::Max => {
3017 if let Some(ci) = arg_col_idx {
3018 let v = &rows[row_idx][ci];
3019 if !v.is_empty() {
3020 running_max = Some(match &running_max {
3021 None => v.clone(),
3022 Some(cur) => {
3023 if v > cur {
3024 v.clone()
3025 } else {
3026 cur.clone()
3027 }
3028 }
3029 });
3030 }
3031 }
3032 running_max.clone().unwrap_or(Value::Empty)
3033 }
3034 };
3035
3036 prev_order_key = Some(current_order_key);
3037 win_values[row_idx] = value;
3038 }
3039
3040 for (ri, row) in rows.iter_mut().enumerate() {
3042 row.push(win_values[ri].clone());
3043 }
3044 columns.push(wdef.output_name.clone());
3045 }
3046
3047 Ok(QueryResult::Rows { columns, rows })
3048}
3049
3050pub(super) fn compute_group_aggregate(
3052 func: AggFunc,
3053 all_rows: &[Vec<Value>],
3054 row_indices: &[usize],
3055 col_idx: usize,
3056) -> Value {
3057 match func {
3058 AggFunc::Count => {
3059 if col_idx == usize::MAX {
3060 return Value::Int(row_indices.len() as i64);
3062 }
3063 let count = row_indices
3064 .iter()
3065 .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3066 .count();
3067 Value::Int(count as i64)
3068 }
3069 AggFunc::CountDistinct => {
3070 let mut seen = std::collections::HashSet::new();
3071 for &ri in row_indices {
3072 let v = &all_rows[ri][col_idx];
3073 if !v.is_empty() {
3074 seen.insert(v.clone());
3075 }
3076 }
3077 Value::Int(seen.len() as i64)
3078 }
3079 AggFunc::Sum => {
3080 let mut int_sum: i64 = 0;
3085 let mut float_sum: f64 = 0.0;
3086 let mut saw_float = false;
3087 for &ri in row_indices {
3088 match &all_rows[ri][col_idx] {
3089 Value::Int(v) => int_sum += v,
3090 Value::Float(v) => {
3091 float_sum += *v;
3092 saw_float = true;
3093 }
3094 _ => {}
3095 }
3096 }
3097 if saw_float {
3098 Value::Float(float_sum + int_sum as f64)
3099 } else {
3100 Value::Int(int_sum)
3101 }
3102 }
3103 AggFunc::Avg => {
3104 let mut sum = 0.0f64;
3105 let mut count = 0usize;
3106 for &ri in row_indices {
3107 match &all_rows[ri][col_idx] {
3108 Value::Int(v) => {
3109 sum += *v as f64;
3110 count += 1;
3111 }
3112 Value::Float(v) => {
3113 sum += *v;
3114 count += 1;
3115 }
3116 _ => {}
3117 }
3118 }
3119 if count == 0 {
3120 Value::Empty
3121 } else {
3122 Value::Float(sum / count as f64)
3123 }
3124 }
3125 AggFunc::Min => row_indices
3126 .iter()
3127 .map(|&ri| &all_rows[ri][col_idx])
3128 .filter(|v| !v.is_empty())
3129 .min()
3130 .cloned()
3131 .unwrap_or(Value::Empty),
3132 AggFunc::Max => row_indices
3133 .iter()
3134 .map(|&ri| &all_rows[ri][col_idx])
3135 .filter(|v| !v.is_empty())
3136 .max()
3137 .cloned()
3138 .unwrap_or(Value::Empty),
3139 }
3140}
3141
3142pub(super) fn try_extract_equi_join_keys(
3156 pred: &Expr,
3157 left_columns: &[String],
3158 right_columns: &[String],
3159) -> Option<(usize, usize)> {
3160 let (lhs, op, rhs) = match pred {
3161 Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3162 _ => return None,
3163 };
3164 if op != BinOp::Eq {
3165 return None;
3166 }
3167 if let (Some(li), Some(ri)) = (
3169 resolve_side_column(lhs, left_columns),
3170 resolve_side_column(rhs, right_columns),
3171 ) {
3172 return Some((li, ri));
3173 }
3174 if let (Some(li), Some(ri)) = (
3177 resolve_side_column(rhs, left_columns),
3178 resolve_side_column(lhs, right_columns),
3179 ) {
3180 return Some((li, ri));
3181 }
3182 None
3183}
3184
3185fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3186 match expr {
3187 Expr::QualifiedField { qualifier, field } => {
3188 let q = qualifier.as_bytes();
3193 let f = field.as_bytes();
3194 columns.iter().position(|c| {
3195 let b = c.as_bytes();
3196 b.len() == q.len() + 1 + f.len()
3197 && b[..q.len()] == *q
3198 && b[q.len()] == b'.'
3199 && b[q.len() + 1..] == *f
3200 })
3201 }
3202 Expr::Field(name) => columns.iter().position(|c| c == name),
3203 _ => None,
3204 }
3205}
3206
3207pub(super) fn hash_join(
3219 left_columns: Vec<String>,
3220 left_rows: Vec<Vec<Value>>,
3221 right_columns: Vec<String>,
3222 right_rows: Vec<Vec<Value>>,
3223 left_key_idx: usize,
3224 right_key_idx: usize,
3225 kind: JoinKind,
3226) -> QueryResult {
3227 use rustc_hash::FxHashMap;
3228
3229 let n_left = left_columns.len();
3230 let n_right = right_columns.len();
3231 let mut columns = Vec::with_capacity(n_left + n_right);
3232 columns.extend(left_columns);
3233 columns.extend(right_columns);
3234
3235 let mut build: FxHashMap<Value, Vec<usize>> =
3238 FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3239 for (i, row) in right_rows.iter().enumerate() {
3240 if matches!(row[right_key_idx], Value::Empty) {
3244 continue;
3245 }
3246 build.entry(row[right_key_idx].clone()).or_default().push(i);
3247 }
3248
3249 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3252
3253 for left_row in &left_rows {
3254 let key = &left_row[left_key_idx];
3255 let matched = if matches!(key, Value::Empty) {
3256 None
3257 } else {
3258 build.get(key)
3259 };
3260 match matched {
3261 Some(matches) if !matches.is_empty() => {
3262 for &ri in matches {
3263 let right_row = &right_rows[ri];
3264 let mut combined = Vec::with_capacity(n_left + n_right);
3265 combined.extend_from_slice(left_row);
3266 combined.extend_from_slice(right_row);
3267 rows.push(combined);
3268 }
3269 }
3270 _ => {
3271 if matches!(kind, JoinKind::LeftOuter) {
3272 let mut row = Vec::with_capacity(n_left + n_right);
3273 row.extend_from_slice(left_row);
3274 row.resize(n_left + n_right, Value::Empty);
3275 rows.push(row);
3276 }
3277 }
3278 }
3279 }
3280
3281 QueryResult::Rows { columns, rows }
3282}
3283
3284pub(super) fn lower_unindexed_range_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3297 match plan {
3298 PlanNode::RangeScan {
3299 table,
3300 column,
3301 start,
3302 end,
3303 } => {
3304 if let Some(tbl) = catalog.get_table(table) {
3305 if tbl.is_index_unique(column) == Some(true) {
3310 return plan.clone();
3311 }
3312 }
3313 let pred = synthesize_range_predicate(column, start, end);
3314 PlanNode::Filter {
3315 input: Box::new(PlanNode::SeqScan {
3316 table: table.clone(),
3317 }),
3318 predicate: pred,
3319 }
3320 }
3321 PlanNode::Filter { input, predicate } => PlanNode::Filter {
3322 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3323 predicate: predicate.clone(),
3324 },
3325 PlanNode::Project { input, fields } => PlanNode::Project {
3326 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3327 fields: fields.clone(),
3328 },
3329 PlanNode::Sort { input, keys } => PlanNode::Sort {
3330 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3331 keys: keys.clone(),
3332 },
3333 PlanNode::Limit { input, count } => PlanNode::Limit {
3334 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3335 count: count.clone(),
3336 },
3337 PlanNode::Offset { input, count } => PlanNode::Offset {
3338 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3339 count: count.clone(),
3340 },
3341 PlanNode::Aggregate {
3342 input,
3343 function,
3344 field,
3345 } => PlanNode::Aggregate {
3346 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3347 function: *function,
3348 field: field.clone(),
3349 },
3350 PlanNode::Distinct { input } => PlanNode::Distinct {
3351 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3352 },
3353 PlanNode::GroupBy {
3354 input,
3355 keys,
3356 aggregates,
3357 having,
3358 } => PlanNode::GroupBy {
3359 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3360 keys: keys.clone(),
3361 aggregates: aggregates.clone(),
3362 having: having.clone(),
3363 },
3364 PlanNode::Update {
3365 input,
3366 table,
3367 assignments,
3368 } => PlanNode::Update {
3369 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3370 table: table.clone(),
3371 assignments: assignments.clone(),
3372 },
3373 PlanNode::Delete { input, table } => PlanNode::Delete {
3374 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3375 table: table.clone(),
3376 },
3377 PlanNode::Window { input, windows } => PlanNode::Window {
3378 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3379 windows: windows.clone(),
3380 },
3381 PlanNode::Union { left, right, all } => PlanNode::Union {
3382 left: Box::new(lower_unindexed_range_scans(catalog, left)),
3383 right: Box::new(lower_unindexed_range_scans(catalog, right)),
3384 all: *all,
3385 },
3386 PlanNode::Explain { input } => PlanNode::Explain {
3387 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3388 },
3389 PlanNode::NestedLoopJoin {
3390 left,
3391 right,
3392 on,
3393 kind,
3394 } => PlanNode::NestedLoopJoin {
3395 left: Box::new(lower_unindexed_range_scans(catalog, left)),
3396 right: Box::new(lower_unindexed_range_scans(catalog, right)),
3397 on: on.clone(),
3398 kind: *kind,
3399 },
3400 _ => plan.clone(),
3402 }
3403}
3404
3405pub(super) fn synthesize_range_predicate(
3407 column: &str,
3408 start: &Option<(Expr, bool)>,
3409 end: &Option<(Expr, bool)>,
3410) -> Expr {
3411 let lower = start.as_ref().map(|(expr, inclusive)| {
3412 let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3413 Expr::BinaryOp(
3414 Box::new(Expr::Field(column.to_string())),
3415 op,
3416 Box::new(expr.clone()),
3417 )
3418 });
3419 let upper = end.as_ref().map(|(expr, inclusive)| {
3420 let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3421 Expr::BinaryOp(
3422 Box::new(Expr::Field(column.to_string())),
3423 op,
3424 Box::new(expr.clone()),
3425 )
3426 });
3427 match (lower, upper) {
3428 (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3429 (Some(l), None) => l,
3430 (None, Some(u)) => u,
3431 (None, None) => Expr::Literal(Literal::Bool(true)),
3432 }
3433}
3434
3435pub(super) fn range_matches(
3437 val: &Value,
3438 start: &Option<Value>,
3439 start_inc: bool,
3440 end: &Option<Value>,
3441 end_inc: bool,
3442) -> bool {
3443 if let Some(ref s) = start {
3444 if start_inc {
3445 if val < s {
3446 return false;
3447 }
3448 } else if val <= s {
3449 return false;
3450 }
3451 }
3452 if let Some(ref e) = end {
3453 if end_inc {
3454 if val > e {
3455 return false;
3456 }
3457 } else if val >= e {
3458 return false;
3459 }
3460 }
3461 true
3462}
3463
3464pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3467 let indent = " ".repeat(depth);
3468 match plan {
3469 PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3470 PlanNode::AliasScan { table, alias } => {
3471 format!("{indent}AliasScan table={table} alias={alias}")
3472 }
3473 PlanNode::IndexScan { table, column, key } => {
3474 format!("{indent}IndexScan table={table} column={column} key={key:?}")
3475 }
3476 PlanNode::RangeScan {
3477 table,
3478 column,
3479 start,
3480 end,
3481 } => {
3482 let s = match start {
3483 Some((expr, inc)) => {
3484 let op = if *inc { ">=" } else { ">" };
3485 format!("{op}{expr:?}")
3486 }
3487 None => "unbounded".to_string(),
3488 };
3489 let e = match end {
3490 Some((expr, inc)) => {
3491 let op = if *inc { "<=" } else { "<" };
3492 format!("{op}{expr:?}")
3493 }
3494 None => "unbounded".to_string(),
3495 };
3496 format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3497 }
3498 PlanNode::Filter { input, predicate } => {
3499 let child = format_plan_tree(input, depth + 1);
3500 format!("{indent}Filter predicate={predicate:?}\n{child}")
3501 }
3502 PlanNode::Project { input, fields } => {
3503 let names: Vec<String> = fields
3504 .iter()
3505 .map(|f| match &f.alias {
3506 Some(a) => format!("{a}: {:?}", f.expr),
3507 None => format!("{:?}", f.expr),
3508 })
3509 .collect();
3510 let child = format_plan_tree(input, depth + 1);
3511 format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3512 }
3513 PlanNode::Sort { input, keys } => {
3514 let ks: Vec<String> = keys
3515 .iter()
3516 .map(|k| {
3517 if k.descending {
3518 format!("{} desc", k.field)
3519 } else {
3520 k.field.clone()
3521 }
3522 })
3523 .collect();
3524 let child = format_plan_tree(input, depth + 1);
3525 format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3526 }
3527 PlanNode::Limit { input, count } => {
3528 let child = format_plan_tree(input, depth + 1);
3529 format!("{indent}Limit count={count:?}\n{child}")
3530 }
3531 PlanNode::Offset { input, count } => {
3532 let child = format_plan_tree(input, depth + 1);
3533 format!("{indent}Offset count={count:?}\n{child}")
3534 }
3535 PlanNode::Aggregate {
3536 input,
3537 function,
3538 field,
3539 } => {
3540 let f = field.as_deref().unwrap_or("*");
3541 let child = format_plan_tree(input, depth + 1);
3542 format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3543 }
3544 PlanNode::NestedLoopJoin {
3545 left,
3546 right,
3547 on,
3548 kind,
3549 } => {
3550 let left_child = format_plan_tree(left, depth + 1);
3551 let right_child = format_plan_tree(right, depth + 1);
3552 let on_str = match on {
3553 Some(pred) => format!("{pred:?}"),
3554 None => "none".to_string(),
3555 };
3556 format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3557 }
3558 PlanNode::Distinct { input } => {
3559 let child = format_plan_tree(input, depth + 1);
3560 format!("{indent}Distinct\n{child}")
3561 }
3562 PlanNode::GroupBy {
3563 input,
3564 keys,
3565 aggregates,
3566 having,
3567 } => {
3568 let agg_strs: Vec<String> = aggregates
3569 .iter()
3570 .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3571 .collect();
3572 let having_str = match having {
3573 Some(h) => format!(" having={h:?}"),
3574 None => String::new(),
3575 };
3576 let child = format_plan_tree(input, depth + 1);
3577 format!(
3578 "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3579 keys.join(", "),
3580 agg_strs.join(", "),
3581 )
3582 }
3583 PlanNode::Insert { table, assignments } => {
3584 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3585 format!("{indent}Insert table={table} cols=[{}]", cols.join(", "))
3586 }
3587 PlanNode::Upsert {
3588 table,
3589 key_column,
3590 assignments,
3591 on_conflict,
3592 } => {
3593 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3594 let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3595 if conflict_cols.is_empty() {
3596 format!(
3597 "{indent}Upsert table={table} key={key_column} cols=[{}]",
3598 cols.join(", ")
3599 )
3600 } else {
3601 format!(
3602 "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3603 cols.join(", "),
3604 conflict_cols.join(", ")
3605 )
3606 }
3607 }
3608 PlanNode::Update {
3609 input,
3610 table,
3611 assignments,
3612 } => {
3613 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3614 let child = format_plan_tree(input, depth + 1);
3615 format!(
3616 "{indent}Update table={table} set=[{}]\n{child}",
3617 cols.join(", ")
3618 )
3619 }
3620 PlanNode::Delete { input, table } => {
3621 let child = format_plan_tree(input, depth + 1);
3622 format!("{indent}Delete table={table}\n{child}")
3623 }
3624 PlanNode::CreateTable { name, fields } => {
3625 let fs: Vec<String> = fields
3626 .iter()
3627 .map(|(n, t, r)| {
3628 if *r {
3629 format!("{n}: {t} required")
3630 } else {
3631 format!("{n}: {t}")
3632 }
3633 })
3634 .collect();
3635 format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3636 }
3637 PlanNode::AlterTable { table, action } => {
3638 format!("{indent}AlterTable table={table} action={action:?}")
3639 }
3640 PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3641 PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3642 PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3643 PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3644 PlanNode::Window { input, windows } => {
3645 let ws: Vec<String> = windows
3646 .iter()
3647 .map(|w| format!("{:?} as {}", w.function, w.output_name))
3648 .collect();
3649 let child = format_plan_tree(input, depth + 1);
3650 format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3651 }
3652 PlanNode::Union { left, right, all } => {
3653 let kind = if *all { "UNION ALL" } else { "UNION" };
3654 let left_child = format_plan_tree(left, depth + 1);
3655 let right_child = format_plan_tree(right, depth + 1);
3656 format!("{indent}{kind}\n{left_child}\n{right_child}")
3657 }
3658 PlanNode::Explain { input } => {
3659 let child = format_plan_tree(input, depth + 1);
3660 format!("{indent}Explain\n{child}")
3661 }
3662 PlanNode::Begin => format!("{indent}Begin"),
3663 PlanNode::Commit => format!("{indent}Commit"),
3664 PlanNode::Rollback => format!("{indent}Rollback"),
3665 }
3666}