1use crate::ast::*;
4use crate::plan::*;
5use crate::result::{QueryError, QueryResult};
6use powdb_storage::catalog::Catalog;
7use powdb_storage::row::{decode_column, decode_row, patch_var_column_in_place, RowLayout};
8use powdb_storage::types::*;
9use std::cmp::Reverse;
10use std::collections::BinaryHeap;
11
12use super::compiled::*;
13use super::eval::*;
14use super::{check_join_limit, Engine, MAX_SORT_ROWS};
15use powdb_storage::view::{ViewDef, ViewRegistry};
16
17impl Engine {
18 pub fn execute_plan(&mut self, plan: &PlanNode) -> Result<QueryResult, QueryError> {
19 match plan {
20 PlanNode::SeqScan { table } => {
21 if self.view_registry.is_dirty(table) {
23 self.refresh_view(table)?;
24 }
25 let schema = self
26 .catalog
27 .schema(table)
28 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
29 .clone();
30 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
31 let rows: Vec<Vec<Value>> = self
32 .catalog
33 .scan(table)
34 .map_err(|e| QueryError::StorageError(e.to_string()))?
35 .map(|(_, row)| row)
36 .collect();
37 Ok(QueryResult::Rows { columns, rows })
38 }
39
40 PlanNode::Filter { input, predicate } => {
41 let materialized;
45 let predicate = if contains_subquery(predicate) {
46 materialized = self.materialize_subqueries(predicate)?;
47 &materialized
48 } else {
49 predicate
50 };
51
52 if contains_subquery(predicate) {
54 let result = self.execute_plan(input)?;
55 return match result {
56 QueryResult::Rows { columns, rows } => {
57 let mut filtered = Vec::new();
58 for row in rows {
59 let row_pred =
60 self.materialize_correlated_for_row(predicate, &row, &columns)?;
61 if eval_predicate(&row_pred, &row, &columns) {
62 filtered.push(row);
63 }
64 }
65 Ok(QueryResult::Rows {
66 columns,
67 rows: filtered,
68 })
69 }
70 _ => Err("filter requires row input".into()),
71 };
72 }
73
74 if let PlanNode::SeqScan { table } = input.as_ref() {
79 if self.view_registry.is_dirty(table) {
81 self.refresh_view(table)?;
82 }
83 let schema = self
84 .catalog
85 .schema(table)
86 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
87 .clone();
88 let columns: Vec<String> =
89 schema.columns.iter().map(|c| c.name.clone()).collect();
90 let fast = FastLayout::new(&schema);
91 let row_layout = RowLayout::new(&schema);
92 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
96
97 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, &schema) {
100 self.catalog
101 .for_each_row_raw(table, |_rid, data| {
102 if compiled(data) {
103 rows.push(decode_row(&schema, data));
104 }
105 })
106 .map_err(|e| QueryError::StorageError(e.to_string()))?;
107 } else {
108 let pred_cols = predicate_column_indices(predicate, &columns);
109 self.catalog
110 .for_each_row_raw(table, |_rid, data| {
111 let pred_row =
112 decode_selective(&schema, &row_layout, data, &pred_cols);
113 if eval_predicate(predicate, &pred_row, &columns) {
114 rows.push(decode_row(&schema, data));
115 }
116 })
117 .map_err(|e| QueryError::StorageError(e.to_string()))?;
118 }
119
120 return Ok(QueryResult::Rows { columns, rows });
121 }
122
123 let result = self.execute_plan(input)?;
125 match result {
126 QueryResult::Rows { columns, rows } => {
127 let filtered: Vec<Vec<Value>> = rows
128 .into_iter()
129 .filter(|row| eval_predicate(predicate, row, &columns))
130 .collect();
131 Ok(QueryResult::Rows {
132 columns,
133 rows: filtered,
134 })
135 }
136 _ => Err("filter requires row input".into()),
137 }
138 }
139
140 PlanNode::Project { input, fields } => {
141 if let PlanNode::IndexScan { table, column, key } = input.as_ref() {
144 let schema = self
145 .catalog
146 .schema(table)
147 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
148 .clone();
149 let all_columns: Vec<String> =
150 schema.columns.iter().map(|c| c.name.clone()).collect();
151 let key_value = literal_to_value(key)?;
152 let tbl = self
153 .catalog
154 .get_table(table)
155 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
156
157 let proj_columns: Vec<String> = fields
158 .iter()
159 .map(|f| {
160 f.alias.clone().unwrap_or_else(|| match &f.expr {
161 Expr::Field(name) => name.clone(),
162 _ => "?".into(),
163 })
164 })
165 .collect();
166
167 let proj_indices: Vec<usize> = fields
169 .iter()
170 .filter_map(|f| {
171 if let Expr::Field(name) = &f.expr {
172 all_columns.iter().position(|c| c == name)
173 } else {
174 None
175 }
176 })
177 .collect();
178
179 if tbl.has_index(column) {
180 let layout = RowLayout::new(&schema);
181 let rids = tbl.index_lookup_all(column, &key_value);
182 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
183 for rid in rids {
184 if let Some(data) = tbl.heap.get(rid) {
185 let row: Vec<Value> = proj_indices
186 .iter()
187 .map(|&ci| decode_column(&schema, &layout, &data, ci))
188 .collect();
189 rows.push(row);
190 }
191 }
192 return Ok(QueryResult::Rows {
193 columns: proj_columns,
194 rows,
195 });
196 }
197 }
198
199 if let PlanNode::Limit {
204 input: inner,
205 count: limit_expr,
206 } = input.as_ref()
207 {
208 if let PlanNode::Sort {
209 input: sort_input,
210 keys,
211 } = inner.as_ref()
212 {
213 if keys.len() == 1 {
215 let sort_field = &keys[0].field;
216 let descending = keys[0].descending;
217 let limit = match limit_expr {
218 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
219 _ => usize::MAX,
220 };
221 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
222 match sort_input.as_ref() {
223 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
224 PlanNode::Filter {
225 input: fi,
226 predicate,
227 } => {
228 if let PlanNode::SeqScan { table } = fi.as_ref() {
229 (Some(table.as_str()), Some(predicate))
230 } else {
231 (None, None)
232 }
233 }
234 _ => (None, None),
235 };
236 if let Some(table) = table_opt {
237 if let Some(result) = self.project_filter_sort_limit_fast(
238 table, fields, sort_field, descending, limit, pred_opt,
239 )? {
240 return Ok(result);
241 }
242 }
243 }
244 }
245 if let PlanNode::Filter {
248 input: fi,
249 predicate,
250 } = inner.as_ref()
251 {
252 if let PlanNode::SeqScan { table } = fi.as_ref() {
253 let limit = match limit_expr {
254 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
255 _ => usize::MAX,
256 };
257 if let Some(result) = self.project_filter_limit_fast(
258 table,
259 fields,
260 limit,
261 Some(predicate),
262 )? {
263 return Ok(result);
264 }
265 }
266 }
267 if let PlanNode::SeqScan { table } = inner.as_ref() {
269 let limit = match limit_expr {
270 Expr::Literal(Literal::Int(v)) if *v >= 0 => *v as usize,
271 _ => usize::MAX,
272 };
273 if let Some(result) =
274 self.project_filter_limit_fast(table, fields, limit, None)?
275 {
276 return Ok(result);
277 }
278 }
279 }
280
281 if let PlanNode::Filter {
292 input: fi,
293 predicate,
294 } = input.as_ref()
295 {
296 if let PlanNode::SeqScan { table } = fi.as_ref() {
297 if let Some(result) = self.project_filter_limit_fast(
298 table,
299 fields,
300 usize::MAX,
301 Some(predicate),
302 )? {
303 return Ok(result);
304 }
305 }
306 }
307
308 if let PlanNode::SeqScan { table } = input.as_ref() {
312 if let Some(result) =
313 self.project_filter_limit_fast(table, fields, usize::MAX, None)?
314 {
315 return Ok(result);
316 }
317 }
318
319 let result = self.execute_plan(input)?;
320 match result {
321 QueryResult::Rows { columns, rows } => {
322 let proj_columns: Vec<String> = fields
323 .iter()
324 .map(|f| {
325 f.alias.clone().unwrap_or_else(|| match &f.expr {
326 Expr::Field(name) => name.clone(),
327 Expr::QualifiedField { qualifier, field } => {
331 format!("{qualifier}.{field}")
332 }
333 _ => "?".into(),
334 })
335 })
336 .collect();
337 let proj_rows: Vec<Vec<Value>> = rows
338 .iter()
339 .map(|row| {
340 fields
341 .iter()
342 .map(|f| eval_expr(&f.expr, row, &columns))
343 .collect()
344 })
345 .collect();
346 Ok(QueryResult::Rows {
347 columns: proj_columns,
348 rows: proj_rows,
349 })
350 }
351 _ => Err("project requires row input".into()),
352 }
353 }
354
355 PlanNode::Sort { input, keys } => {
356 let result = self.execute_plan(input)?;
357 match result {
358 QueryResult::Rows { columns, mut rows } => {
359 if rows.len() > MAX_SORT_ROWS {
363 return Err(QueryError::SortLimitExceeded);
364 }
365 self.charge_rows(&rows)?;
366 let key_indices: Vec<(usize, bool)> = keys
367 .iter()
368 .map(|k| {
369 columns
370 .iter()
371 .position(|c| c == &k.field)
372 .map(|idx| (idx, k.descending))
373 .ok_or_else(|| QueryError::ColumnNotFound {
374 table: String::new(),
375 column: k.field.clone(),
376 })
377 })
378 .collect::<Result<_, QueryError>>()?;
379 rows.sort_by(|a, b| {
380 for &(col_idx, descending) in &key_indices {
381 let cmp = a[col_idx].cmp(&b[col_idx]);
382 let cmp = if descending { cmp.reverse() } else { cmp };
383 if cmp != std::cmp::Ordering::Equal {
384 return cmp;
385 }
386 }
387 std::cmp::Ordering::Equal
388 });
389 Ok(QueryResult::Rows { columns, rows })
390 }
391 _ => Err("sort requires row input".into()),
392 }
393 }
394
395 PlanNode::Limit { input, count } => {
396 let result = self.execute_plan(input)?;
397 let n = match count {
398 Expr::Literal(Literal::Int(v)) => *v as usize,
399 _ => return Err("limit must be integer literal".into()),
400 };
401 match result {
402 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
403 columns,
404 rows: rows.into_iter().take(n).collect(),
405 }),
406 _ => Err("limit requires row input".into()),
407 }
408 }
409
410 PlanNode::Offset { input, count } => {
411 let result = self.execute_plan(input)?;
412 let n = match count {
413 Expr::Literal(Literal::Int(v)) => *v as usize,
414 _ => return Err("offset must be integer literal".into()),
415 };
416 match result {
417 QueryResult::Rows { columns, rows } => Ok(QueryResult::Rows {
418 columns,
419 rows: rows.into_iter().skip(n).collect(),
420 }),
421 _ => Err("offset requires row input".into()),
422 }
423 }
424
425 PlanNode::Aggregate {
426 input,
427 function,
428 field,
429 } => {
430 if *function == AggFunc::Count {
432 if let PlanNode::SeqScan { table } = input.as_ref() {
433 if self.view_registry.is_dirty(table) {
437 self.refresh_view(table)?;
438 }
439 let mut count: i64 = 0;
440 self.catalog
441 .for_each_row_raw(table, |_rid, _data| {
442 count += 1;
443 })
444 .map_err(|e| QueryError::StorageError(e.to_string()))?;
445 return Ok(QueryResult::Scalar(Value::Int(count)));
446 }
447 if let PlanNode::Filter {
455 input: inner,
456 predicate,
457 } = input.as_ref()
458 {
459 if let PlanNode::SeqScan { table } = inner.as_ref() {
460 if self.view_registry.is_dirty(table) {
461 self.refresh_view(table)?;
462 }
463 }
464 if let (PlanNode::SeqScan { table }, false) =
465 (inner.as_ref(), contains_subquery(predicate))
466 {
467 let schema = self
468 .catalog
469 .schema(table)
470 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
471 .clone();
472 let columns: Vec<String> =
473 schema.columns.iter().map(|c| c.name.clone()).collect();
474 let fast = FastLayout::new(&schema);
475 let row_layout = RowLayout::new(&schema);
476
477 if let Some(compiled) =
480 compile_predicate(predicate, &columns, &fast, &schema)
481 {
482 let mut count: i64 = 0;
483 self.catalog
484 .for_each_row_raw(table, |_rid, data| {
485 if compiled(data) {
486 count += 1;
487 }
488 })
489 .map_err(|e| QueryError::StorageError(e.to_string()))?;
490 return Ok(QueryResult::Scalar(Value::Int(count)));
491 }
492
493 let pred_cols = predicate_column_indices(predicate, &columns);
495 let mut count: i64 = 0;
496 self.catalog
497 .for_each_row_raw(table, |_rid, data| {
498 let pred_row =
499 decode_selective(&schema, &row_layout, data, &pred_cols);
500 if eval_predicate(predicate, &pred_row, &columns) {
501 count += 1;
502 }
503 })
504 .map_err(|e| QueryError::StorageError(e.to_string()))?;
505
506 return Ok(QueryResult::Scalar(Value::Int(count)));
507 }
508 }
509 }
510
511 if matches!(
515 function,
516 AggFunc::Sum
517 | AggFunc::Avg
518 | AggFunc::Min
519 | AggFunc::Max
520 | AggFunc::CountDistinct
521 ) {
522 if let Some(col) = field.as_ref() {
523 let (table_opt, pred_opt): (Option<&str>, Option<&Expr>) =
525 match input.as_ref() {
526 PlanNode::SeqScan { table } => (Some(table.as_str()), None),
527 PlanNode::Filter {
528 input: inner,
529 predicate,
530 } => {
531 if let PlanNode::SeqScan { table } = inner.as_ref() {
532 (Some(table.as_str()), Some(predicate))
533 } else {
534 (None, None)
535 }
536 }
537 _ => (None, None),
538 };
539 if let Some(table) = table_opt {
540 if let Some(result) =
541 self.agg_single_col_fast(table, col, *function, pred_opt)?
542 {
543 return Ok(result);
544 }
545 }
546 }
547 }
548
549 let result = self.execute_plan(input)?;
554 match result {
555 QueryResult::Rows { columns, rows } => {
556 match function {
557 AggFunc::Count => {
558 Ok(QueryResult::Scalar(Value::Int(rows.len() as i64)))
559 }
560 AggFunc::CountDistinct => {
561 let col = field.as_ref().ok_or("count distinct requires field")?;
562 let idx = columns
563 .iter()
564 .position(|c| c == col)
565 .ok_or("col not found")?;
566 let mut seen = std::collections::HashSet::new();
567 for row in &rows {
568 let v = &row[idx];
569 if !v.is_empty() {
570 seen.insert(v.clone());
571 }
572 }
573 Ok(QueryResult::Scalar(Value::Int(seen.len() as i64)))
574 }
575 AggFunc::Avg => {
576 let col = field.as_ref().ok_or("avg requires field")?;
577 let idx = columns
578 .iter()
579 .position(|c| c == col)
580 .ok_or("col not found")?;
581 let sum: f64 = rows
582 .iter()
583 .filter_map(|r| match &r[idx] {
584 Value::Int(v) => Some(*v as f64),
585 Value::Float(v) => Some(*v),
586 _ => None,
587 })
588 .sum();
589 let count = rows.len() as f64;
590 Ok(QueryResult::Scalar(Value::Float(sum / count)))
591 }
592 AggFunc::Sum => {
593 let col = field.as_ref().ok_or("sum requires field")?;
594 let idx = columns
595 .iter()
596 .position(|c| c == col)
597 .ok_or("col not found")?;
598 let mut int_sum: i64 = 0;
604 let mut float_sum: f64 = 0.0;
605 let mut saw_float = false;
606 for r in &rows {
607 match &r[idx] {
608 Value::Int(v) => int_sum += *v,
609 Value::Float(v) => {
610 float_sum += *v;
611 saw_float = true;
612 }
613 _ => {}
614 }
615 }
616 let result = if saw_float {
617 Value::Float(float_sum + int_sum as f64)
618 } else {
619 Value::Int(int_sum)
620 };
621 Ok(QueryResult::Scalar(result))
622 }
623 AggFunc::Min | AggFunc::Max => {
624 let col = field.as_ref().ok_or("min/max requires field")?;
625 let idx = columns
626 .iter()
627 .position(|c| c == col)
628 .ok_or("col not found")?;
629 let vals: Vec<&Value> = rows.iter().map(|r| &r[idx]).collect();
630 let result = if *function == AggFunc::Min {
631 vals.into_iter().min().cloned()
632 } else {
633 vals.into_iter().max().cloned()
634 };
635 Ok(QueryResult::Scalar(result.unwrap_or(Value::Empty)))
636 }
637 }
638 }
639 _ => Err("aggregate requires row input".into()),
640 }
641 }
642
643 PlanNode::Insert { table, assignments } => {
644 let values = {
645 let schema = self
646 .catalog
647 .schema(table)
648 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
649 let mut values = vec![Value::Empty; schema.columns.len()];
650 for a in assignments {
651 let idx = schema.column_index(&a.field).ok_or_else(|| {
652 QueryError::ColumnNotFound {
653 table: String::new(),
654 column: a.field.clone(),
655 }
656 })?;
657 let raw = literal_to_value(&a.value)?;
658 values[idx] = coerce_value(raw, &schema.columns[idx])?;
659 }
660 for col in &schema.columns {
661 if col.required && matches!(values[col.position as usize], Value::Empty) {
662 return Err(QueryError::Execution(format!(
663 "column '{}' is required but no value was provided",
664 col.name
665 )));
666 }
667 }
668 values
669 };
670 self.catalog
671 .insert(table, &values)
672 .map_err(|e| QueryError::StorageError(e.to_string()))?;
673 self.view_registry.mark_dependents_dirty(table);
674 Ok(QueryResult::Modified(1))
675 }
676
677 PlanNode::Upsert {
678 table,
679 key_column,
680 assignments,
681 on_conflict,
682 } => {
683 let (values, key_idx) = {
684 let schema = self
685 .catalog
686 .schema(table)
687 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
688 let mut values = vec![Value::Empty; schema.columns.len()];
689 for a in assignments {
690 let idx = schema.column_index(&a.field).ok_or_else(|| {
691 QueryError::ColumnNotFound {
692 table: String::new(),
693 column: a.field.clone(),
694 }
695 })?;
696 let raw = literal_to_value(&a.value)?;
697 values[idx] = coerce_value(raw, &schema.columns[idx])?;
698 }
699 for col in &schema.columns {
700 if col.required && matches!(values[col.position as usize], Value::Empty) {
701 return Err(QueryError::Execution(format!(
702 "column '{}' is required but no value was provided",
703 col.name
704 )));
705 }
706 }
707 let key_idx = schema
708 .column_index(key_column)
709 .ok_or_else(|| format!("key column '{key_column}' not found"))?;
710 (values, key_idx)
711 };
712
713 let key_value = values[key_idx].clone();
714
715 let existing = {
717 let tbl = self
718 .catalog
719 .get_table(table)
720 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
721 if tbl.has_index(key_column) {
722 let rids = tbl.index_lookup_all(key_column, &key_value);
727 rids.into_iter().next().and_then(|rid| {
728 tbl.heap
729 .get(rid)
730 .map(|data| (rid, decode_row(&tbl.schema, &data)))
731 })
732 } else {
733 let mut found = None;
735 for (rid, row) in tbl.scan() {
736 if row[key_idx] == key_value {
737 found = Some((rid, row));
738 break;
739 }
740 }
741 found
742 }
743 };
744
745 if let Some((rid, mut existing_row)) = existing {
746 let update_assignments = if on_conflict.is_empty() {
748 assignments
749 } else {
750 on_conflict
751 };
752 let changed_cols: Vec<usize> = {
753 let schema = self
754 .catalog
755 .schema(table)
756 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
757 let mut indices = Vec::new();
758 for a in update_assignments {
759 let idx = schema.column_index(&a.field).ok_or_else(|| {
760 QueryError::ColumnNotFound {
761 table: String::new(),
762 column: a.field.clone(),
763 }
764 })?;
765 if idx != key_idx {
766 existing_row[idx] = literal_to_value(&a.value)?;
767 indices.push(idx);
768 }
769 }
770 indices
771 };
772 self.catalog
773 .update_hinted(table, rid, &existing_row, Some(&changed_cols))
774 .map_err(|e| QueryError::StorageError(e.to_string()))?;
775 self.view_registry.mark_dependents_dirty(table);
776 Ok(QueryResult::Modified(1))
777 } else {
778 self.catalog
780 .insert(table, &values)
781 .map_err(|e| QueryError::StorageError(e.to_string()))?;
782 self.view_registry.mark_dependents_dirty(table);
783 Ok(QueryResult::Modified(1))
784 }
785 }
786
787 PlanNode::Update {
788 input,
789 table,
790 assignments,
791 } => {
792 let (col_indices, literal_vals): (Vec<usize>, Option<Vec<Value>>) = {
798 let schema_ref = self
799 .catalog
800 .schema(table)
801 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
802 let indices: Vec<usize> = assignments
803 .iter()
804 .map(|a| {
805 schema_ref.column_index(&a.field).ok_or_else(|| {
806 QueryError::ColumnNotFound {
807 table: String::new(),
808 column: a.field.clone(),
809 }
810 })
811 })
812 .collect::<Result<_, _>>()?;
813 let vals: Result<Vec<Value>, _> = assignments
814 .iter()
815 .map(|a| literal_to_value(&a.value))
816 .collect();
817 (indices, vals.ok())
818 };
819 let resolved_assignments: Option<Vec<(usize, Value)>> =
820 literal_vals.map(|vals| col_indices.iter().copied().zip(vals).collect());
821
822 let changed_cols: Vec<usize> = col_indices.clone();
825
826 if let Some(ref resolved_assignments) = resolved_assignments {
833 if let PlanNode::Filter {
834 input: inner,
835 predicate,
836 } = input.as_ref()
837 {
838 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
839 if t == table {
840 let fused_result = self.try_fused_scan_update(
841 table,
842 predicate,
843 resolved_assignments,
844 &changed_cols,
845 );
846 if let Some(result) = fused_result {
847 return result;
848 }
849 }
850 }
851 }
852 }
853
854 let matching_rids = self.collect_rids_for_mutation(input, table)?;
856
857 if let Some(ref resolved_assignments) = resolved_assignments {
859 let fast_patch: Option<Vec<FastPatch>> = {
865 let tbl = self
866 .catalog
867 .get_table(table)
868 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
869 let schema = &tbl.schema;
870 let all_fixed_nonnull = resolved_assignments.iter().all(|(idx, val)| {
871 is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty()
872 });
873 let no_indexed = !resolved_assignments
874 .iter()
875 .any(|(idx, _)| tbl.has_indexed_col(*idx));
876
877 if all_fixed_nonnull && no_indexed {
878 let layout = RowLayout::new(schema);
879 let bitmap_size = layout.bitmap_size();
880 let patches: Vec<FastPatch> = resolved_assignments
881 .iter()
882 .map(|(idx, val)| {
883 let fixed_off = layout
884 .fixed_offset(*idx)
885 .expect("is_fixed_size already checked");
886 let field_off = 2 + bitmap_size + fixed_off;
887 let bytes: FixedBytes = match val {
888 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
889 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
890 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
891 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
892 Value::Uuid(v) => FixedBytes::Uuid(*v),
893 _ => unreachable!("all_fixed_nonnull guard lied"),
894 };
895 FastPatch {
896 field_off,
897 bitmap_byte_off: 2 + idx / 8,
898 bit_mask: 1u8 << (idx % 8),
899 bytes,
900 }
901 })
902 .collect();
903 Some(patches)
904 } else {
905 None
906 }
907 };
908
909 if let Some(patches) = fast_patch {
910 let mut count = 0u64;
911 for rid in matching_rids {
912 let ok = self
917 .catalog
918 .update_row_bytes_logged(table, rid, |row| {
919 for p in &patches {
920 row[p.bitmap_byte_off] &= !p.bit_mask;
921 let field_bytes = p.bytes.as_slice();
922 row[p.field_off..p.field_off + field_bytes.len()]
923 .copy_from_slice(field_bytes);
924 }
925 })
926 .map_err(|e| QueryError::StorageError(e.to_string()))?;
927 if ok {
928 count += 1;
929 }
930 }
931 self.view_registry.mark_dependents_dirty(table);
932 return Ok(QueryResult::Modified(count));
933 }
934
935 let var_fast: Option<(usize, Option<Vec<u8>>)> = {
937 let tbl = self
938 .catalog
939 .get_table(table)
940 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
941 let schema = &tbl.schema;
942 let is_single = resolved_assignments.len() == 1;
943 let is_var_col = is_single
944 && !is_fixed_size(schema.columns[resolved_assignments[0].0].type_id);
945 let no_indexed = !resolved_assignments
946 .iter()
947 .any(|(idx, _)| tbl.has_indexed_col(*idx));
948
949 if is_single && is_var_col && no_indexed {
950 let (idx, val) = &resolved_assignments[0];
951 let bytes_opt: Option<Vec<u8>> = match val {
952 Value::Str(s) => Some(s.as_bytes().to_vec()),
953 Value::Bytes(b) => Some(b.clone()),
954 Value::Empty => None,
955 _ => {
956 return Err(QueryError::TypeError(format!(
957 "cannot assign non-var value to var column '{}'",
958 schema.columns[*idx].name
959 )))
960 }
961 };
962 Some((*idx, bytes_opt))
963 } else {
964 None
965 }
966 };
967
968 if let Some((col_idx, new_bytes_opt)) = var_fast {
969 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
970 let mut count = 0u64;
971 let mut fallback_rids: Vec<RowId> = Vec::new();
972 for rid in &matching_rids {
973 let ok = self
979 .catalog
980 .patch_var_col_logged(table, *rid, col_idx, new_bytes_ref)
981 .map_err(|e| QueryError::StorageError(e.to_string()))?;
982 if ok {
983 count += 1;
984 } else {
985 fallback_rids.push(*rid);
986 }
987 }
988 for rid in fallback_rids {
989 let mut row = match self.catalog.get(table, rid) {
990 Some(r) => r,
991 None => continue,
992 };
993 for (idx, val) in resolved_assignments.iter() {
994 row[*idx] = val.clone();
995 }
996 self.catalog
997 .update_hinted(table, rid, &row, Some(&changed_cols))
998 .map_err(|e| QueryError::StorageError(e.to_string()))?;
999 count += 1;
1000 }
1001 self.view_registry.mark_dependents_dirty(table);
1002 return Ok(QueryResult::Modified(count));
1003 }
1004
1005 let mut count = 0u64;
1007 for rid in matching_rids {
1008 let mut row = match self.catalog.get(table, rid) {
1009 Some(r) => r,
1010 None => continue,
1011 };
1012 for (idx, val) in resolved_assignments.iter() {
1013 row[*idx] = val.clone();
1014 }
1015 self.catalog
1016 .update_hinted(table, rid, &row, Some(&changed_cols))
1017 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1018 count += 1;
1019 }
1020 self.view_registry.mark_dependents_dirty(table);
1021 return Ok(QueryResult::Modified(count));
1022 } let col_names: Vec<String> = {
1028 let schema_ref = self
1029 .catalog
1030 .schema(table)
1031 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1032 schema_ref.columns.iter().map(|c| c.name.clone()).collect()
1033 };
1034 let mut count = 0u64;
1035 for rid in matching_rids {
1036 let mut row = match self.catalog.get(table, rid) {
1037 Some(r) => r,
1038 None => continue,
1039 };
1040 for (i, asgn) in assignments.iter().enumerate() {
1041 let val = eval_expr(&asgn.value, &row, &col_names);
1042 row[col_indices[i]] = val;
1043 }
1044 self.catalog
1045 .update_hinted(table, rid, &row, Some(&changed_cols))
1046 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1047 count += 1;
1048 }
1049 self.view_registry.mark_dependents_dirty(table);
1050 Ok(QueryResult::Modified(count))
1051 }
1052
1053 PlanNode::Delete { input, table } => {
1054 if let PlanNode::Filter {
1075 input: inner,
1076 predicate,
1077 } = input.as_ref()
1078 {
1079 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
1080 if t == table {
1081 let schema = self
1082 .catalog
1083 .schema(table)
1084 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1085 let columns: Vec<String> =
1086 schema.columns.iter().map(|c| c.name.clone()).collect();
1087 let fast = FastLayout::new(schema);
1088 if let Some(compiled) =
1089 compile_predicate(predicate, &columns, &fast, schema)
1090 {
1091 let count = self
1097 .catalog
1098 .scan_delete_matching_logged(table, |data| compiled(data))
1099 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1100 self.view_registry.mark_dependents_dirty(table);
1101 return Ok(QueryResult::Modified(count));
1102 }
1103 }
1104 }
1105 } else if let PlanNode::SeqScan { table: t } = input.as_ref() {
1106 if t == table {
1107 let count = self
1111 .catalog
1112 .scan_delete_matching_logged(table, |_| true)
1113 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1114 self.view_registry.mark_dependents_dirty(table);
1115 return Ok(QueryResult::Modified(count));
1116 }
1117 }
1118
1119 let matching_rids = self.collect_rids_for_mutation(input, table)?;
1120 let count = self
1121 .catalog
1122 .delete_many(table, &matching_rids)
1123 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1124 self.view_registry.mark_dependents_dirty(table);
1125 Ok(QueryResult::Modified(count))
1126 }
1127
1128 PlanNode::AliasScan { table, alias } => {
1129 let schema = self
1139 .catalog
1140 .schema(table)
1141 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1142 .clone();
1143 let columns: Vec<String> = schema
1144 .columns
1145 .iter()
1146 .map(|c| format!("{alias}.{}", c.name))
1147 .collect();
1148 let rows: Vec<Vec<Value>> = self
1149 .catalog
1150 .scan(table)
1151 .map_err(|e| QueryError::StorageError(e.to_string()))?
1152 .map(|(_, row)| row)
1153 .collect();
1154 Ok(QueryResult::Rows { columns, rows })
1155 }
1156
1157 PlanNode::NestedLoopJoin {
1158 left,
1159 right,
1160 on,
1161 kind,
1162 } => {
1163 let left_result = self.execute_plan(left)?;
1174 let right_result = self.execute_plan(right)?;
1175 let (left_columns, left_rows) = match left_result {
1176 QueryResult::Rows { columns, rows } => (columns, rows),
1177 _ => return Err("join left side must produce rows".into()),
1178 };
1179 let (right_columns, right_rows) = match right_result {
1180 QueryResult::Rows { columns, rows } => (columns, rows),
1181 _ => return Err("join right side must produce rows".into()),
1182 };
1183
1184 self.charge_rows(&left_rows)?;
1188 self.charge_rows(&right_rows)?;
1189
1190 if !matches!(kind, JoinKind::Cross) {
1192 if let Some(pred) = on {
1193 if let Some((l_idx, r_idx)) =
1194 try_extract_equi_join_keys(pred, &left_columns, &right_columns)
1195 {
1196 let result = hash_join(
1197 left_columns,
1198 left_rows,
1199 right_columns,
1200 right_rows,
1201 l_idx,
1202 r_idx,
1203 *kind,
1204 );
1205 if let QueryResult::Rows { ref rows, .. } = result {
1206 check_join_limit(rows.len())?;
1207 }
1208 return Ok(result);
1209 }
1210 }
1211 }
1212
1213 let n_left = left_columns.len();
1215 let n_right = right_columns.len();
1216 let mut columns = Vec::with_capacity(n_left + n_right);
1217 columns.extend(left_columns);
1218 columns.extend(right_columns);
1219
1220 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
1221 let mut combined: Vec<Value> = Vec::with_capacity(n_left + n_right);
1222
1223 for left_row in &left_rows {
1224 let mut matched = false;
1225 for right_row in &right_rows {
1226 combined.clear();
1227 combined.extend_from_slice(left_row);
1228 combined.extend_from_slice(right_row);
1229 let keep = match kind {
1230 JoinKind::Cross => true,
1231 JoinKind::Inner | JoinKind::LeftOuter => match on {
1232 Some(pred) => eval_predicate(pred, &combined, &columns),
1233 None => true,
1237 },
1238 JoinKind::RightOuter => {
1241 unreachable!("planner rewrites RightOuter to LeftOuter")
1242 }
1243 };
1244 if keep {
1245 rows.push(combined.clone());
1246 check_join_limit(rows.len())?;
1247 matched = true;
1248 }
1249 }
1250 if !matched && matches!(kind, JoinKind::LeftOuter) {
1251 let mut row = Vec::with_capacity(n_left + n_right);
1252 row.extend_from_slice(left_row);
1253 row.resize(n_left + n_right, Value::Empty);
1254 rows.push(row);
1255 check_join_limit(rows.len())?;
1256 }
1257 }
1258
1259 Ok(QueryResult::Rows { columns, rows })
1260 }
1261
1262 PlanNode::Distinct { input } => {
1263 let result = self.execute_plan(input)?;
1264 match result {
1265 QueryResult::Rows { columns, rows } => {
1266 let mut seen = std::collections::HashSet::new();
1267 let mut unique_rows = Vec::new();
1268 for row in rows {
1269 if seen.insert(row.clone()) {
1270 unique_rows.push(row);
1271 }
1272 }
1273 Ok(QueryResult::Rows {
1274 columns,
1275 rows: unique_rows,
1276 })
1277 }
1278 other => Ok(other),
1279 }
1280 }
1281
1282 PlanNode::GroupBy {
1283 input,
1284 keys,
1285 aggregates,
1286 having,
1287 } => {
1288 let result = self.execute_plan(input)?;
1289 match result {
1290 QueryResult::Rows { columns, rows } => {
1291 self.charge_rows(&rows)?;
1294 let key_indices: Vec<usize> = keys
1296 .iter()
1297 .map(|k| {
1298 columns
1299 .iter()
1300 .position(|c| c == k)
1301 .ok_or_else(|| format!("group-by column '{k}' not found"))
1302 })
1303 .collect::<Result<Vec<_>, _>>()?;
1304
1305 let agg_field_indices: Vec<usize> = aggregates
1309 .iter()
1310 .map(|a| {
1311 if a.field == "*" {
1312 Ok(usize::MAX)
1313 } else {
1314 columns.iter().position(|c| c == &a.field).ok_or_else(|| {
1315 format!("aggregate column '{}' not found", a.field)
1316 })
1317 }
1318 })
1319 .collect::<Result<Vec<_>, _>>()?;
1320
1321 let mut group_map: rustc_hash::FxHashMap<Vec<Value>, usize> =
1323 rustc_hash::FxHashMap::default();
1324 let mut groups: Vec<(Vec<Value>, Vec<usize>)> = Vec::new();
1325 for (ri, row) in rows.iter().enumerate() {
1326 let key: Vec<Value> =
1327 key_indices.iter().map(|&i| row[i].clone()).collect();
1328 match group_map.get(&key) {
1329 Some(&idx) => groups[idx].1.push(ri),
1330 None => {
1331 let idx = groups.len();
1332 group_map.insert(key.clone(), idx);
1333 groups.push((key, vec![ri]));
1334 }
1335 }
1336 }
1337
1338 let mut out_columns: Vec<String> = keys.clone();
1340 for agg in aggregates.iter() {
1341 out_columns.push(agg.output_name.clone());
1342 }
1343
1344 let mut out_rows: Vec<Vec<Value>> = Vec::with_capacity(groups.len());
1346 for (key_vals, row_indices) in &groups {
1347 let mut row = key_vals.clone();
1348 for (ai, agg) in aggregates.iter().enumerate() {
1349 let col_idx = agg_field_indices[ai];
1350 let val = compute_group_aggregate(
1351 agg.function,
1352 &rows,
1353 row_indices,
1354 col_idx,
1355 );
1356 row.push(val);
1357 }
1358 out_rows.push(row);
1359 }
1360
1361 if let Some(having_expr) = having {
1363 out_rows.retain(|row| eval_predicate(having_expr, row, &out_columns));
1364 }
1365
1366 Ok(QueryResult::Rows {
1367 columns: out_columns,
1368 rows: out_rows,
1369 })
1370 }
1371 _ => Err("group by requires row input".into()),
1372 }
1373 }
1374
1375 PlanNode::CreateTable { name, fields } => {
1376 let columns: Vec<ColumnDef> = fields
1377 .iter()
1378 .enumerate()
1379 .map(
1380 |(i, (fname, tname, req))| -> Result<ColumnDef, QueryError> {
1381 Ok(ColumnDef {
1382 name: fname.clone(),
1383 type_id: type_name_to_id(tname).map_err(QueryError::TypeError)?,
1384 required: *req,
1385 position: i as u16,
1386 })
1387 },
1388 )
1389 .collect::<Result<Vec<_>, _>>()?;
1390 let schema = Schema {
1391 table_name: name.clone(),
1392 columns,
1393 };
1394 self.catalog
1395 .create_table(schema)
1396 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1397 Ok(QueryResult::Created(name.clone()))
1398 }
1399
1400 PlanNode::AlterTable { table, action } => match action {
1401 AlterAction::AddColumn {
1402 name,
1403 type_name,
1404 required,
1405 } => {
1406 let position = self
1407 .catalog
1408 .schema(table)
1409 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
1410 .columns
1411 .len() as u16;
1412 let col = ColumnDef {
1413 name: name.clone(),
1414 type_id: type_name_to_id(type_name).map_err(QueryError::TypeError)?,
1415 required: *required,
1416 position,
1417 };
1418 self.catalog
1419 .alter_table_add_column(table, col)
1420 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1421 Ok(QueryResult::Executed {
1422 message: format!("column '{name}' added to '{table}'"),
1423 })
1424 }
1425 AlterAction::DropColumn { name } => {
1426 self.catalog
1427 .alter_table_drop_column(table, name)
1428 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1429 Ok(QueryResult::Executed {
1430 message: format!("column '{name}' dropped from '{table}'"),
1431 })
1432 }
1433 AlterAction::AddIndex { column } => {
1434 self.catalog
1435 .create_index(table, column)
1436 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1437 Ok(QueryResult::Executed {
1438 message: format!("index on '{table}.{column}' created"),
1439 })
1440 }
1441 },
1442
1443 PlanNode::DropTable { name } => {
1444 self.catalog
1445 .drop_table(name)
1446 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1447 Ok(QueryResult::Executed {
1448 message: format!("table '{name}' dropped"),
1449 })
1450 }
1451
1452 PlanNode::CreateView { name, query_text } => {
1453 self.create_view(name, query_text)?;
1454 Ok(QueryResult::Executed {
1455 message: format!("materialized view '{name}' created"),
1456 })
1457 }
1458
1459 PlanNode::RefreshView { name } => {
1460 self.refresh_view(name)?;
1461 Ok(QueryResult::Executed {
1462 message: format!("materialized view '{name}' refreshed"),
1463 })
1464 }
1465
1466 PlanNode::DropView { name } => {
1467 self.drop_view(name)?;
1468 Ok(QueryResult::Executed {
1469 message: format!("materialized view '{name}' dropped"),
1470 })
1471 }
1472
1473 PlanNode::Window { input, windows } => {
1474 let result = self.execute_plan(input)?;
1475 execute_window(result, windows)
1476 }
1477
1478 PlanNode::Union { left, right, all } => {
1479 let left_result = self.execute_plan(left)?;
1480 let right_result = self.execute_plan(right)?;
1481 let (left_cols, left_rows) = match left_result {
1482 QueryResult::Rows { columns, rows } => (columns, rows),
1483 _ => return Err("UNION requires query results on left side".into()),
1484 };
1485 let (_, right_rows) = match right_result {
1486 QueryResult::Rows { columns, rows } => (columns, rows),
1487 _ => return Err("UNION requires query results on right side".into()),
1488 };
1489 let mut combined = left_rows;
1490 if *all {
1491 combined.extend(right_rows);
1493 } else {
1494 let mut seen = std::collections::HashSet::new();
1497 for row in &combined {
1498 seen.insert(row.clone());
1499 }
1500 for row in right_rows {
1501 if seen.insert(row.clone()) {
1502 combined.push(row);
1503 }
1504 }
1505 }
1506 Ok(QueryResult::Rows {
1507 columns: left_cols,
1508 rows: combined,
1509 })
1510 }
1511
1512 PlanNode::Explain { input } => {
1513 let text = format_plan_tree(input, 0);
1514 Ok(QueryResult::Rows {
1515 columns: vec!["plan".to_string()],
1516 rows: text
1517 .lines()
1518 .map(|line| vec![Value::Str(line.to_string())])
1519 .collect(),
1520 })
1521 }
1522
1523 PlanNode::Begin => {
1524 if self.in_transaction {
1525 return Err(QueryError::Execution(
1526 "already in a transaction (nested transactions not supported)".into(),
1527 ));
1528 }
1529 self.in_transaction = true;
1530 Ok(QueryResult::Executed {
1531 message: "transaction started".to_string(),
1532 })
1533 }
1534
1535 PlanNode::Commit => {
1536 if !self.in_transaction {
1537 return Err(QueryError::Execution(
1538 "no active transaction to commit".into(),
1539 ));
1540 }
1541 self.in_transaction = false;
1542 self.catalog
1543 .sync_wal()
1544 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1545 Ok(QueryResult::Executed {
1546 message: "transaction committed".to_string(),
1547 })
1548 }
1549
1550 PlanNode::Rollback => {
1551 if !self.in_transaction {
1552 return Err(QueryError::Execution(
1553 "no active transaction to roll back".into(),
1554 ));
1555 }
1556 self.in_transaction = false;
1557 self.catalog
1558 .rollback_to_last_sync()
1559 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1560 if let Ok(mut cache) = self.plan_cache.lock() {
1561 cache.clear();
1562 }
1563 self.view_registry = ViewRegistry::open(self.catalog.data_dir())
1564 .unwrap_or_else(|_| ViewRegistry::new(self.catalog.data_dir()));
1565 Ok(QueryResult::Executed {
1566 message: "transaction rolled back".to_string(),
1567 })
1568 }
1569
1570 PlanNode::IndexScan { table, column, key } => {
1571 let key_value = literal_to_value(key)?;
1572 let tbl = self
1573 .catalog
1574 .get_table(table)
1575 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1576 let columns: Vec<String> =
1577 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1578
1579 if tbl.has_index(column) {
1583 let rids = tbl.index_lookup_all(column, &key_value);
1584 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(rids.len());
1585 for rid in rids {
1586 if let Some(data) = tbl.heap.get(rid) {
1587 rows.push(decode_row(&tbl.schema, &data));
1588 }
1589 }
1590 return Ok(QueryResult::Rows { columns, rows });
1591 }
1592
1593 let schema = &tbl.schema;
1601 let fast = FastLayout::new(schema);
1602 let synth_pred = Expr::BinaryOp(
1603 Box::new(Expr::Field(column.clone())),
1604 BinOp::Eq,
1605 Box::new(key.clone()),
1606 );
1607 if let Some(compiled) = compile_predicate(&synth_pred, &columns, &fast, schema) {
1608 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1610 self.catalog
1611 .for_each_row_raw(table, |_rid, data| {
1612 if compiled(data) {
1613 rows.push(decode_row(schema, data));
1614 }
1615 })
1616 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1617 return Ok(QueryResult::Rows { columns, rows });
1618 }
1619
1620 let col_idx =
1622 schema
1623 .column_index(column)
1624 .ok_or_else(|| QueryError::ColumnNotFound {
1625 table: String::new(),
1626 column: column.clone(),
1627 })?;
1628 let rows: Vec<Vec<Value>> = tbl
1629 .scan()
1630 .filter_map(|(_, row)| {
1631 if row[col_idx] == key_value {
1632 Some(row)
1633 } else {
1634 None
1635 }
1636 })
1637 .collect();
1638 Ok(QueryResult::Rows { columns, rows })
1639 }
1640
1641 PlanNode::RangeScan {
1642 table,
1643 column,
1644 start,
1645 end,
1646 } => {
1647 let tbl = self
1648 .catalog
1649 .get_table(table)
1650 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
1651 let columns: Vec<String> =
1652 tbl.schema.columns.iter().map(|c| c.name.clone()).collect();
1653 let schema = &tbl.schema;
1654
1655 let start_val = match start {
1656 Some((expr, _)) => Some(literal_to_value(expr)?),
1657 None => None,
1658 };
1659 let end_val = match end {
1660 Some((expr, _)) => Some(literal_to_value(expr)?),
1661 None => None,
1662 };
1663 let start_inclusive = start.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1664 let end_inclusive = end.as_ref().map(|(_, inc)| *inc).unwrap_or(true);
1665
1666 if tbl.is_index_unique(column) == Some(true) {
1670 if let Some(btree) = tbl.index(column) {
1671 let hits: Vec<(Value, RowId)> = match (&start_val, &end_val) {
1672 (Some(s), Some(e)) => btree.range(s, e).collect(),
1673 (Some(s), None) => btree.range_from(s),
1674 (None, Some(e)) => btree.range_to(e),
1675 (None, None) => {
1676 let rows: Vec<Vec<Value>> =
1677 tbl.scan().map(|(_, row)| row).collect();
1678 return Ok(QueryResult::Rows { columns, rows });
1679 }
1680 };
1681 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(hits.len());
1682 for (key, rid) in hits {
1683 if !start_inclusive {
1684 if let Some(ref s) = start_val {
1685 if &key == s {
1686 continue;
1687 }
1688 }
1689 }
1690 if !end_inclusive {
1691 if let Some(ref e) = end_val {
1692 if &key == e {
1693 continue;
1694 }
1695 }
1696 }
1697 if let Some(data) = tbl.heap.get(rid) {
1698 rows.push(decode_row(schema, &data));
1699 }
1700 }
1701 return Ok(QueryResult::Rows { columns, rows });
1702 }
1703 }
1704
1705 let fast = FastLayout::new(schema);
1707 let synth = synthesize_range_predicate(column, start, end);
1708 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
1709 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(64);
1710 self.catalog
1711 .for_each_row_raw(table, |_rid, data| {
1712 if compiled(data) {
1713 rows.push(decode_row(schema, data));
1714 }
1715 })
1716 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1717 return Ok(QueryResult::Rows { columns, rows });
1718 }
1719
1720 let col_idx =
1721 schema
1722 .column_index(column)
1723 .ok_or_else(|| QueryError::ColumnNotFound {
1724 table: String::new(),
1725 column: column.clone(),
1726 })?;
1727 let rows: Vec<Vec<Value>> = tbl
1728 .scan()
1729 .filter(|(_, row)| {
1730 range_matches(
1731 &row[col_idx],
1732 &start_val,
1733 start_inclusive,
1734 &end_val,
1735 end_inclusive,
1736 )
1737 })
1738 .map(|(_, row)| row)
1739 .collect();
1740 Ok(QueryResult::Rows { columns, rows })
1741 }
1742 }
1743 }
1744
1745 fn create_view(&mut self, name: &str, query_text: &str) -> Result<(), QueryError> {
1750 if self.view_registry.is_view(name) {
1751 return Err(QueryError::ViewError(format!(
1752 "materialized view '{name}' already exists"
1753 )));
1754 }
1755 let result = self.execute_powql(query_text)?;
1757 let (columns, rows) = match result {
1758 QueryResult::Rows { columns, rows } => (columns, rows),
1759 _ => return Err("view source query must be a SELECT".into()),
1760 };
1761 let schema = self.derive_view_schema(name, &columns, &rows);
1763 self.catalog
1765 .create_table(schema)
1766 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1767 for row in &rows {
1768 self.catalog
1769 .insert(name, row)
1770 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1771 }
1772 let depends_on = self.extract_view_deps(query_text);
1774 self.view_registry
1775 .register(ViewDef {
1776 name: name.to_string(),
1777 query: query_text.to_string(),
1778 depends_on,
1779 dirty: false,
1780 })
1781 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1782 Ok(())
1783 }
1784
1785 fn refresh_view(&mut self, name: &str) -> Result<(), QueryError> {
1788 let def = self
1789 .view_registry
1790 .get(name)
1791 .ok_or_else(|| format!("materialized view '{name}' not found"))?;
1792 let query_text = def.query.clone();
1793 let result = self.execute_powql(&query_text)?;
1795 let (_columns, rows) = match result {
1796 QueryResult::Rows { columns, rows } => (columns, rows),
1797 _ => return Err("view source query must be a SELECT".into()),
1798 };
1799 self.catalog
1803 .scan_delete_matching_logged(name, |_| true)
1804 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1805 for row in &rows {
1806 self.catalog
1807 .insert(name, row)
1808 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1809 }
1810 self.view_registry.mark_clean(name);
1811 Ok(())
1812 }
1813
1814 fn drop_view(&mut self, name: &str) -> Result<(), QueryError> {
1816 if !self.view_registry.is_view(name) {
1817 return Err(QueryError::ViewError(format!(
1818 "materialized view '{name}' not found"
1819 )));
1820 }
1821 self.view_registry
1822 .unregister(name)
1823 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1824 self.catalog
1825 .drop_table(name)
1826 .map_err(|e| QueryError::StorageError(e.to_string()))?;
1827 Ok(())
1828 }
1829
1830 fn derive_view_schema(&self, name: &str, columns: &[String], rows: &[Vec<Value>]) -> Schema {
1833 use powdb_storage::types::{ColumnDef, TypeId};
1834 let cols: Vec<ColumnDef> = columns
1835 .iter()
1836 .enumerate()
1837 .map(|(i, col_name)| {
1838 let type_id = rows
1839 .first()
1840 .and_then(|row| row.get(i))
1841 .map(|v| v.type_id())
1842 .unwrap_or(TypeId::Str);
1843 ColumnDef {
1844 name: col_name.clone(),
1845 type_id,
1846 required: false,
1847 position: i as u16,
1848 }
1849 })
1850 .collect();
1851 Schema {
1852 table_name: name.to_string(),
1853 columns: cols,
1854 }
1855 }
1856
1857 fn extract_view_deps(&self, query_text: &str) -> Vec<String> {
1860 use crate::parser::parse;
1861 match parse(query_text) {
1862 Ok(Statement::Query(q)) => {
1863 let mut deps = vec![q.source.clone()];
1864 for j in &q.joins {
1865 deps.push(j.source.clone());
1866 }
1867 deps
1868 }
1869 _ => Vec::new(),
1870 }
1871 }
1872
    /// Computes the single aggregate `function` over column `col` of `table`,
    /// optionally restricted to rows matching `predicate`.
    ///
    /// It reads the column's fixed-width value directly from each raw row
    /// instead of decoding whole rows.
    ///
    /// Returns `Ok(None)` when this fast path does not apply:
    /// * `col` is not in the table's schema;
    /// * the column type is neither `Int` nor `Float`;
    /// * `FastLayout` reports no fixed offset for the column;
    /// * `predicate` cannot be compiled by `compile_predicate`.
    ///
    /// Otherwise returns `Ok(Some(QueryResult::Scalar(..)))`.
    ///
    /// Result conventions visible below:
    /// * `Int` SUM is accumulated in `i128` and then clamped to the `i64`
    ///   range; SUM of no values is `0`.
    /// * AVG, MIN and MAX of no values are `Value::Empty`.
    /// * Float MIN/MAX compare with `f64::total_cmp`.
    /// * Float COUNT DISTINCT compares raw bit patterns (`to_bits`).
    ///
    /// # Errors
    /// `QueryError::TableNotFound` if `table` does not exist.
    pub(super) fn agg_single_col_fast(
        &self,
        table: &str,
        col: &str,
        function: AggFunc,
        predicate: Option<&Expr>,
    ) -> Result<Option<QueryResult>, QueryError> {
        let schema = self
            .catalog
            .schema(table)
            .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
            .clone();
        let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
        let col_idx = match schema.column_index(col) {
            Some(i) => i,
            None => return Ok(None),
        };
        let col_type = schema.columns[col_idx].type_id;
        // Only 8-byte numeric columns are handled by the raw-byte loops below.
        if col_type != TypeId::Int && col_type != TypeId::Float {
            return Ok(None);
        }

        let fast = FastLayout::new(&schema);
        let byte_offset = match fast.fixed_offsets[col_idx] {
            Some(o) => o,
            None => return Ok(None),
        };
        // Location of this column's bit in the row's null bitmap.
        let bitmap_byte = col_idx / 8;
        let bitmap_bit = (col_idx % 8) as u32;
        // Absolute byte offset of the value: 2 leading bytes, then the
        // bitmap, then the column's fixed-area offset (presumably a 2-byte
        // row header — confirm against RowLayout).
        let data_offset = 2 + fast.bitmap_size + byte_offset;

        let compiled_pred: Option<CompiledPredicate> = match predicate {
            Some(pred) => match compile_predicate(pred, &columns, &fast, &schema) {
                Some(c) => Some(c),
                // Predicate shape not supported by the compiler: decline the
                // fast path rather than evaluate it differently.
                None => return Ok(None),
            },
            None => None,
        };

        // NOTE(review): `agg_int_loop!` / `agg_float_loop!` are defined
        // elsewhere. They presumably run the closure once per matching row
        // whose value is non-null (they receive the bitmap byte/bit) — confirm.
        let result = match col_type {
            TypeId::Int => match function {
                AggFunc::Sum | AggFunc::Avg => {
                    // i128 keeps the running sum from overflowing i64.
                    let mut sum_i128: i128 = 0;
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            count += 1;
                            sum_i128 += v as i128;
                        }
                    );
                    if matches!(function, AggFunc::Sum) {
                        // SUM saturates at the i64 bounds instead of wrapping.
                        let clamped = sum_i128.clamp(i64::MIN as i128, i64::MAX as i128) as i64;
                        QueryResult::Scalar(Value::Int(clamped))
                    } else if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        let avg = (sum_i128 as f64) / (count as f64);
                        QueryResult::Scalar(Value::Float(avg))
                    }
                }
                AggFunc::Min => {
                    // `None` means no value seen yet -> Empty result.
                    let mut min_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            min_v = Some(match min_v {
                                Some(m) => m.min(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    let mut max_v: Option<i64> = None;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            max_v = Some(match max_v {
                                Some(m) => m.max(v),
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Int).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: i64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_int_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: i64| {
                            seen.insert(v);
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            TypeId::Float => match function {
                AggFunc::Sum => {
                    // Plain f64 accumulation; SUM of no values is 0.0.
                    let mut sum: f64 = 0.0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                        }
                    );
                    QueryResult::Scalar(Value::Float(sum))
                }
                AggFunc::Avg => {
                    let mut sum: f64 = 0.0;
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            sum += v;
                            count += 1;
                        }
                    );
                    if count == 0 {
                        QueryResult::Scalar(Value::Empty)
                    } else {
                        QueryResult::Scalar(Value::Float(sum / count as f64))
                    }
                }
                AggFunc::Min => {
                    // total_cmp gives a total order over all f64 values
                    // (including NaN), so the result is deterministic.
                    let mut min_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            min_v = Some(match min_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_lt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(min_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Max => {
                    let mut max_v: Option<f64> = None;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            max_v = Some(match max_v {
                                Some(m) => {
                                    if v.total_cmp(&m).is_gt() {
                                        v
                                    } else {
                                        m
                                    }
                                }
                                None => v,
                            });
                        }
                    );
                    QueryResult::Scalar(max_v.map(Value::Float).unwrap_or(Value::Empty))
                }
                AggFunc::Count => {
                    let mut count: i64 = 0;
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |_v: f64| {
                            count += 1;
                        }
                    );
                    QueryResult::Scalar(Value::Int(count))
                }
                AggFunc::CountDistinct => {
                    // f64 is not Hash, so distinctness is by bit pattern:
                    // 0.0 and -0.0 (and differing NaN payloads) count as
                    // separate values.
                    let mut seen = rustc_hash::FxHashSet::default();
                    agg_float_loop!(
                        self,
                        table,
                        compiled_pred,
                        bitmap_byte,
                        bitmap_bit,
                        data_offset,
                        |v: f64| {
                            seen.insert(v.to_bits());
                        }
                    );
                    QueryResult::Scalar(Value::Int(seen.len() as i64))
                }
            },
            _ => unreachable!("type guard above restricts to Int/Float"),
        };
        Ok(Some(result))
    }
2184
2185 pub(super) fn project_filter_limit_fast(
2188 &self,
2189 table: &str,
2190 fields: &[ProjectField],
2191 limit: usize,
2192 predicate: Option<&Expr>,
2193 ) -> Result<Option<QueryResult>, QueryError> {
2194 let schema = self
2195 .catalog
2196 .schema(table)
2197 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2198 .clone();
2199 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2200
2201 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2204 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2205 for f in fields {
2206 let name = match &f.expr {
2207 Expr::Field(n) => n.clone(),
2208 _ => return Ok(None),
2209 };
2210 let idx = match all_columns.iter().position(|c| c == &name) {
2211 Some(i) => i,
2212 None => return Ok(None),
2213 };
2214 proj_indices.push(idx);
2215 proj_columns.push(f.alias.clone().unwrap_or(name));
2216 }
2217
2218 let fast = FastLayout::new(&schema);
2219 let row_layout = RowLayout::new(&schema);
2220
2221 let compiled_pred: Option<CompiledPredicate> = match predicate {
2222 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2223 Some(c) => Some(c),
2224 None => return Ok(None),
2225 },
2226 None => None,
2227 };
2228
2229 let mut out: Vec<Vec<Value>> = Vec::with_capacity(limit.min(1024));
2230 self.catalog
2235 .try_for_each_row_raw(table, |_rid, data| {
2236 use std::ops::ControlFlow;
2237 if let Some(ref pred) = compiled_pred {
2238 if !pred(data) {
2239 return ControlFlow::Continue(());
2240 }
2241 }
2242 let row: Vec<Value> = proj_indices
2243 .iter()
2244 .map(|&ci| decode_column(&schema, &row_layout, data, ci))
2245 .collect();
2246 out.push(row);
2247 if out.len() >= limit {
2248 ControlFlow::Break(())
2249 } else {
2250 ControlFlow::Continue(())
2251 }
2252 })
2253 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2254
2255 Ok(Some(QueryResult::Rows {
2256 columns: proj_columns,
2257 rows: out,
2258 }))
2259 }
2260
2261 pub(super) fn project_filter_sort_limit_fast(
2266 &self,
2267 table: &str,
2268 fields: &[ProjectField],
2269 sort_field: &str,
2270 descending: bool,
2271 limit: usize,
2272 predicate: Option<&Expr>,
2273 ) -> Result<Option<QueryResult>, QueryError> {
2274 if limit == 0 {
2275 return Ok(None);
2278 }
2279 let schema = self
2280 .catalog
2281 .schema(table)
2282 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?
2283 .clone();
2284 let all_columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2285
2286 let sort_idx = match schema.column_index(sort_field) {
2293 Some(i) => i,
2294 None => return Ok(None),
2295 };
2296 let sort_col_type = schema.columns[sort_idx].type_id;
2297 if sort_col_type != TypeId::Int && sort_col_type != TypeId::Float {
2298 return Ok(None);
2299 }
2300
2301 let mut proj_indices: Vec<usize> = Vec::with_capacity(fields.len());
2303 let mut proj_columns: Vec<String> = Vec::with_capacity(fields.len());
2304 for f in fields {
2305 let name = match &f.expr {
2306 Expr::Field(n) => n.clone(),
2307 _ => return Ok(None),
2308 };
2309 let idx = match all_columns.iter().position(|c| c == &name) {
2310 Some(i) => i,
2311 None => return Ok(None),
2312 };
2313 proj_indices.push(idx);
2314 proj_columns.push(f.alias.clone().unwrap_or(name));
2315 }
2316
2317 let fast = FastLayout::new(&schema);
2318 let row_layout = RowLayout::new(&schema);
2319 let sort_byte_offset = match fast.fixed_offsets[sort_idx] {
2321 Some(o) => o,
2322 None => return Ok(None),
2323 };
2324 let sort_bitmap_byte = sort_idx / 8;
2325 let sort_bitmap_bit = (sort_idx % 8) as u32;
2326 let sort_data_offset = 2 + fast.bitmap_size + sort_byte_offset;
2327
2328 let compiled_pred: Option<CompiledPredicate> = match predicate {
2329 Some(pred) => match compile_predicate(pred, &all_columns, &fast, &schema) {
2330 Some(c) => Some(c),
2331 None => return Ok(None),
2332 },
2333 None => None,
2334 };
2335
2336 let drained: Vec<Vec<u8>> = match sort_col_type {
2345 TypeId::Int => {
2346 let mut seq: u64 = 0;
2347 let mut heap_desc: BinaryHeap<Reverse<(i64, u64, Vec<u8>)>> =
2348 BinaryHeap::with_capacity(limit);
2349 let mut heap_asc: BinaryHeap<(i64, u64, Vec<u8>)> =
2350 BinaryHeap::with_capacity(limit);
2351
2352 self.catalog
2353 .for_each_row_raw(table, |_rid, data| {
2354 if let Some(ref pred) = compiled_pred {
2355 if !pred(data) {
2356 return;
2357 }
2358 }
2359 if data.len() < sort_data_offset + 8 {
2361 return;
2362 }
2363 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2364 if is_null {
2365 return;
2366 }
2367 let key = i64::from_le_bytes(
2368 data[sort_data_offset..sort_data_offset + 8]
2369 .try_into()
2370 .unwrap_or_else(|_| unreachable!()),
2371 );
2372 let id = seq;
2373 seq += 1;
2374
2375 if descending {
2376 if heap_desc.len() < limit {
2377 heap_desc.push(Reverse((key, id, data.to_vec())));
2378 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2379 if key > *top_key {
2380 heap_desc.pop();
2381 heap_desc.push(Reverse((key, id, data.to_vec())));
2382 }
2383 }
2384 } else if heap_asc.len() < limit {
2385 heap_asc.push((key, id, data.to_vec()));
2386 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2387 if key < *top_key {
2388 heap_asc.pop();
2389 heap_asc.push((key, id, data.to_vec()));
2390 }
2391 }
2392 })
2393 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2394
2395 let mut drained: Vec<(i64, u64, Vec<u8>)> = if descending {
2396 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2397 } else {
2398 heap_asc.into_iter().collect()
2399 };
2400 if descending {
2401 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2402 } else {
2403 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2404 }
2405 drained.into_iter().map(|(_, _, d)| d).collect()
2406 }
2407 TypeId::Float => {
2408 let mut seq: u64 = 0;
2417 let mut heap_desc: BinaryHeap<Reverse<(u64, u64, Vec<u8>)>> =
2418 BinaryHeap::with_capacity(limit);
2419 let mut heap_asc: BinaryHeap<(u64, u64, Vec<u8>)> =
2420 BinaryHeap::with_capacity(limit);
2421
2422 self.catalog
2423 .for_each_row_raw(table, |_rid, data| {
2424 if let Some(ref pred) = compiled_pred {
2425 if !pred(data) {
2426 return;
2427 }
2428 }
2429 if data.len() < sort_data_offset + 8 {
2430 return;
2431 }
2432 let is_null = (data[2 + sort_bitmap_byte] >> sort_bitmap_bit) & 1 == 1;
2433 if is_null {
2434 return;
2435 }
2436 let bits = u64::from_le_bytes(
2437 data[sort_data_offset..sort_data_offset + 8]
2438 .try_into()
2439 .unwrap_or_else(|_| unreachable!()),
2440 );
2441 let key = f64_bits_to_sortable_u64(bits);
2442 let id = seq;
2443 seq += 1;
2444
2445 if descending {
2446 if heap_desc.len() < limit {
2447 heap_desc.push(Reverse((key, id, data.to_vec())));
2448 } else if let Some(Reverse((top_key, _, _))) = heap_desc.peek() {
2449 if key > *top_key {
2450 heap_desc.pop();
2451 heap_desc.push(Reverse((key, id, data.to_vec())));
2452 }
2453 }
2454 } else if heap_asc.len() < limit {
2455 heap_asc.push((key, id, data.to_vec()));
2456 } else if let Some((top_key, _, _)) = heap_asc.peek() {
2457 if key < *top_key {
2458 heap_asc.pop();
2459 heap_asc.push((key, id, data.to_vec()));
2460 }
2461 }
2462 })
2463 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2464
2465 let mut drained: Vec<(u64, u64, Vec<u8>)> = if descending {
2466 heap_desc.into_iter().map(|Reverse(t)| t).collect()
2467 } else {
2468 heap_asc.into_iter().collect()
2469 };
2470 if descending {
2471 drained.sort_unstable_by(|a, b| b.0.cmp(&a.0).then(a.1.cmp(&b.1)));
2472 } else {
2473 drained.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(a.1.cmp(&b.1)));
2474 }
2475 drained.into_iter().map(|(_, _, d)| d).collect()
2476 }
2477 _ => unreachable!("type guard above restricts to Int/Float"),
2478 };
2479
2480 let rows: Vec<Vec<Value>> = drained
2481 .into_iter()
2482 .map(|data| {
2483 proj_indices
2484 .iter()
2485 .map(|&ci| decode_column(&schema, &row_layout, &data, ci))
2486 .collect()
2487 })
2488 .collect();
2489
2490 Ok(Some(QueryResult::Rows {
2491 columns: proj_columns,
2492 rows,
2493 }))
2494 }
2495
2496 fn try_fused_scan_update(
2513 &mut self,
2514 table: &str,
2515 predicate: &Expr,
2516 resolved: &[(usize, Value)],
2517 changed_cols: &[usize],
2518 ) -> Option<Result<QueryResult, QueryError>> {
2519 let compiled = {
2522 let schema = self.catalog.schema(table)?;
2523 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2524 let fast = FastLayout::new(schema);
2525 compile_predicate(predicate, &columns, &fast, schema)?
2526 };
2527
2528 let fixed_patches: Option<Vec<FastPatch>> = {
2530 let tbl = self.catalog.get_table(table)?;
2531 let schema = &tbl.schema;
2532 let all_fixed_nonnull = resolved
2533 .iter()
2534 .all(|(idx, val)| is_fixed_size(schema.columns[*idx].type_id) && !val.is_empty());
2535 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2536 if all_fixed_nonnull && no_indexed {
2537 let layout = RowLayout::new(schema);
2538 let bitmap_size = layout.bitmap_size();
2539 Some(
2540 resolved
2541 .iter()
2542 .map(|(idx, val)| {
2543 let fixed_off = layout
2544 .fixed_offset(*idx)
2545 .expect("is_fixed_size already checked");
2546 let field_off = 2 + bitmap_size + fixed_off;
2547 let bytes: FixedBytes = match val {
2548 Value::Int(v) => FixedBytes::I64(v.to_le_bytes()),
2549 Value::Float(v) => FixedBytes::F64(v.to_le_bytes()),
2550 Value::Bool(v) => FixedBytes::Bool(if *v { 1 } else { 0 }),
2551 Value::DateTime(v) => FixedBytes::I64(v.to_le_bytes()),
2552 Value::Uuid(v) => FixedBytes::Uuid(*v),
2553 _ => unreachable!("all_fixed_nonnull guard"),
2554 };
2555 FastPatch {
2556 field_off,
2557 bitmap_byte_off: 2 + idx / 8,
2558 bit_mask: 1u8 << (idx % 8),
2559 bytes,
2560 }
2561 })
2562 .collect(),
2563 )
2564 } else {
2565 None
2566 }
2567 };
2568 if let Some(patches) = fixed_patches {
2569 let result = self
2570 .catalog
2571 .scan_patch_matching_logged(table, compiled, |row| {
2572 for p in &patches {
2573 row[p.bitmap_byte_off] &= !p.bit_mask;
2574 let field_bytes = p.bytes.as_slice();
2575 row[p.field_off..p.field_off + field_bytes.len()]
2576 .copy_from_slice(field_bytes);
2577 }
2578 Some(row.len() as u16)
2579 })
2580 .map_err(|e| e.to_string());
2581 match result {
2582 Ok((count, _)) => {
2583 self.view_registry.mark_dependents_dirty(table);
2584 return Some(Ok(QueryResult::Modified(count)));
2585 }
2586 Err(e) => return Some(Err(QueryError::Execution(e))),
2587 }
2588 }
2589
2590 let var_patch: Option<(usize, Option<Vec<u8>>)> = {
2592 let tbl = self.catalog.get_table(table)?;
2593 let schema = &tbl.schema;
2594 let is_single = resolved.len() == 1;
2595 let is_var = is_single && !is_fixed_size(schema.columns[resolved[0].0].type_id);
2596 let no_indexed = !resolved.iter().any(|(idx, _)| tbl.has_indexed_col(*idx));
2597 if is_single && is_var && no_indexed {
2598 let (idx, val) = &resolved[0];
2599 let bytes_opt = match val {
2600 Value::Str(s) => Some(s.as_bytes().to_vec()),
2601 Value::Bytes(b) => Some(b.clone()),
2602 Value::Empty => None,
2603 _ => return None, };
2605 Some((*idx, bytes_opt))
2606 } else {
2607 None
2608 }
2609 };
2610 if let Some((col_idx, ref new_bytes_opt)) = var_patch {
2611 let layout = {
2613 let schema = self.catalog.schema(table)?;
2614 RowLayout::new(schema)
2615 };
2616 let new_bytes_ref: Option<&[u8]> = new_bytes_opt.as_deref();
2617 let result = self
2618 .catalog
2619 .scan_patch_matching_logged(table, compiled, |row| {
2620 patch_var_column_in_place(row, &layout, col_idx, new_bytes_ref)
2621 })
2622 .map_err(|e| e.to_string());
2623 match result {
2624 Ok((mut count, fallback_rids)) => {
2625 for rid in fallback_rids {
2627 let mut row = match self.catalog.get(table, rid) {
2628 Some(r) => r,
2629 None => continue,
2630 };
2631 for (idx, val) in resolved.iter() {
2632 row[*idx] = val.clone();
2633 }
2634 self.catalog
2635 .update_hinted(table, rid, &row, Some(changed_cols))
2636 .map_err(|e| e.to_string())
2637 .ok();
2638 count += 1;
2639 }
2640 self.view_registry.mark_dependents_dirty(table);
2641 return Some(Ok(QueryResult::Modified(count)));
2642 }
2643 Err(e) => return Some(Err(QueryError::Execution(e))),
2644 }
2645 }
2646
2647 None }
2649
2650 fn collect_rids_for_mutation(
2656 &mut self,
2657 input: &PlanNode,
2658 table: &str,
2659 ) -> Result<Vec<RowId>, QueryError> {
2660 match input {
2661 PlanNode::SeqScan { table: t } if t == table => {
2662 let rids: Vec<RowId> = self
2664 .catalog
2665 .scan(table)
2666 .map_err(|e| QueryError::StorageError(e.to_string()))?
2667 .map(|(rid, _)| rid)
2668 .collect();
2669 Ok(rids)
2670 }
2671 PlanNode::IndexScan {
2672 table: t,
2673 column,
2674 key,
2675 } if t == table => {
2676 let key_value = literal_to_value(key)?;
2677
2678 {
2687 let tbl = self
2688 .catalog
2689 .get_table(table)
2690 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2691 if tbl.has_index(column) {
2692 let rids = tbl.index_lookup_all(column, &key_value);
2693 return Ok(rids);
2694 }
2695 }
2696
2697 let schema = self
2702 .catalog
2703 .schema(table)
2704 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2705 let columns: Vec<String> = schema.columns.iter().map(|c| c.name.clone()).collect();
2706 let fast = FastLayout::new(schema);
2707 let synth = Expr::BinaryOp(
2708 Box::new(Expr::Field(column.clone())),
2709 BinOp::Eq,
2710 Box::new(key.clone()),
2711 );
2712 if let Some(compiled) = compile_predicate(&synth, &columns, &fast, schema) {
2713 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2715 self.catalog
2716 .for_each_row_raw(table, |rid, data| {
2717 if compiled(data) {
2718 rids.push(rid);
2719 }
2720 })
2721 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2722 return Ok(rids);
2723 }
2724
2725 let col_idx =
2727 schema
2728 .column_index(column)
2729 .ok_or_else(|| QueryError::ColumnNotFound {
2730 table: String::new(),
2731 column: column.clone(),
2732 })?;
2733 let rids: Vec<RowId> = self
2734 .catalog
2735 .scan(table)
2736 .map_err(|e| QueryError::StorageError(e.to_string()))?
2737 .filter_map(|(rid, row)| {
2738 if row[col_idx] == key_value {
2739 Some(rid)
2740 } else {
2741 None
2742 }
2743 })
2744 .collect();
2745 Ok(rids)
2746 }
2747 PlanNode::Filter {
2748 input: inner,
2749 predicate,
2750 } => {
2751 if let PlanNode::SeqScan { table: t } = inner.as_ref() {
2752 if t != table {
2753 return self.generic_rid_match(input, table);
2754 }
2755 let schema = self
2756 .catalog
2757 .schema(table)
2758 .ok_or_else(|| QueryError::TableNotFound(table.to_string()))?;
2759 let columns: Vec<String> =
2760 schema.columns.iter().map(|c| c.name.clone()).collect();
2761 let fast = FastLayout::new(schema);
2762 let row_layout = RowLayout::new(schema);
2763
2764 if let Some(compiled) = compile_predicate(predicate, &columns, &fast, schema) {
2766 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2768 self.catalog
2769 .for_each_row_raw(table, |rid, data| {
2770 if compiled(data) {
2771 rids.push(rid);
2772 }
2773 })
2774 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2775 return Ok(rids);
2776 }
2777
2778 let pred_cols = predicate_column_indices(predicate, &columns);
2780 let mut rids: Vec<RowId> = Vec::with_capacity(64);
2781 self.catalog
2782 .for_each_row_raw(table, |rid, data| {
2783 let pred_row = decode_selective(schema, &row_layout, data, &pred_cols);
2784 if eval_predicate(predicate, &pred_row, &columns) {
2785 rids.push(rid);
2786 }
2787 })
2788 .map_err(|e| QueryError::StorageError(e.to_string()))?;
2789 return Ok(rids);
2790 }
2791 self.generic_rid_match(input, table)
2792 }
2793 _ => self.generic_rid_match(input, table),
2794 }
2795 }
2796
2797 fn generic_rid_match(
2801 &mut self,
2802 input: &PlanNode,
2803 table: &str,
2804 ) -> Result<Vec<RowId>, QueryError> {
2805 let result = self.execute_plan(input)?;
2806 let rows = match result {
2807 QueryResult::Rows { rows, .. } => rows,
2808 _ => return Err("mutation source must be rows".into()),
2809 };
2810 let matching: Vec<RowId> = self
2811 .catalog
2812 .scan(table)
2813 .map_err(|e| QueryError::StorageError(e.to_string()))?
2814 .filter(|(_, row)| rows.iter().any(|r| r == row))
2815 .map(|(rid, _)| rid)
2816 .collect();
2817 Ok(matching)
2818 }
2819}
2820
2821pub(super) fn execute_window(
2822 result: QueryResult,
2823 windows: &[WindowDef],
2824) -> Result<QueryResult, QueryError> {
2825 let (mut columns, mut rows) = match result {
2826 QueryResult::Rows { columns, rows } => (columns, rows),
2827 _ => return Err("window function requires row input".into()),
2828 };
2829
2830 for wdef in windows {
2831 let part_indices: Vec<usize> = wdef
2833 .partition_by
2834 .iter()
2835 .map(|name| {
2836 columns
2837 .iter()
2838 .position(|c| c == name)
2839 .ok_or_else(|| format!("window partition column '{name}' not found"))
2840 })
2841 .collect::<Result<Vec<_>, _>>()?;
2842
2843 let ord_indices: Vec<(usize, bool)> = wdef
2844 .order_by
2845 .iter()
2846 .map(|sk| {
2847 columns
2848 .iter()
2849 .position(|c| c == &sk.field)
2850 .map(|i| (i, sk.descending))
2851 .ok_or_else(|| format!("window order column '{}' not found", sk.field))
2852 })
2853 .collect::<Result<Vec<_>, _>>()?;
2854
2855 let arg_col_idx: Option<usize> = if let Some(arg) = wdef.args.first() {
2857 match arg {
2858 Expr::Field(name) => {
2859 if name == "*" {
2860 None } else {
2862 Some(
2863 columns
2864 .iter()
2865 .position(|c| c == name)
2866 .ok_or_else(|| format!("window arg column '{name}' not found"))?,
2867 )
2868 }
2869 }
2870 _ => None,
2871 }
2872 } else {
2873 None
2874 };
2875
2876 let n = rows.len();
2880 let mut indices: Vec<usize> = (0..n).collect();
2881 indices.sort_by(|&a, &b| {
2882 for &pi in &part_indices {
2884 let cmp = rows[a][pi].cmp(&rows[b][pi]);
2885 if cmp != std::cmp::Ordering::Equal {
2886 return cmp;
2887 }
2888 }
2889 for &(oi, desc) in &ord_indices {
2891 let cmp = rows[a][oi].cmp(&rows[b][oi]);
2892 if cmp != std::cmp::Ordering::Equal {
2893 return if desc { cmp.reverse() } else { cmp };
2894 }
2895 }
2896 std::cmp::Ordering::Equal
2897 });
2898
2899 let mut win_values: Vec<Value> = vec![Value::Empty; n];
2901 let mut partition_start = 0usize;
2902 let mut running_count: i64 = 0;
2904 let mut running_int_sum: i64 = 0;
2905 let mut running_float_sum: f64 = 0.0;
2906 let mut running_saw_float = false;
2907 let mut running_min: Option<Value> = None;
2908 let mut running_max: Option<Value> = None;
2909 let mut rank_counter: i64 = 0;
2910 let mut dense_rank_counter: i64 = 0;
2911 let mut prev_order_key: Option<Vec<Value>> = None;
2912 let mut same_rank_count: i64 = 0;
2913
2914 for sorted_pos in 0..n {
2915 let row_idx = indices[sorted_pos];
2916
2917 let new_partition = if sorted_pos == 0 {
2919 true
2920 } else {
2921 let prev_row_idx = indices[sorted_pos - 1];
2922 part_indices
2923 .iter()
2924 .any(|&pi| rows[row_idx][pi] != rows[prev_row_idx][pi])
2925 };
2926
2927 if new_partition {
2928 partition_start = sorted_pos;
2929 running_count = 0;
2930 running_int_sum = 0;
2931 running_float_sum = 0.0;
2932 running_saw_float = false;
2933 running_min = None;
2934 running_max = None;
2935 rank_counter = 0;
2936 dense_rank_counter = 0;
2937 prev_order_key = None;
2938 same_rank_count = 0;
2939 }
2940
2941 let current_order_key: Vec<Value> = ord_indices
2943 .iter()
2944 .map(|&(oi, _)| rows[row_idx][oi].clone())
2945 .collect();
2946 let same_as_prev = prev_order_key.as_ref() == Some(¤t_order_key);
2947
2948 let value = match wdef.function {
2949 WindowFunc::RowNumber => Value::Int((sorted_pos - partition_start + 1) as i64),
2950 WindowFunc::Rank => {
2951 if same_as_prev {
2952 same_rank_count += 1;
2953 } else {
2954 rank_counter += same_rank_count + 1;
2955 same_rank_count = 0;
2956 if rank_counter == 0 {
2957 rank_counter = 1;
2958 }
2959 }
2960 Value::Int(rank_counter)
2961 }
2962 WindowFunc::DenseRank => {
2963 if !same_as_prev {
2964 dense_rank_counter += 1;
2965 }
2966 Value::Int(dense_rank_counter)
2967 }
2968 WindowFunc::Sum => {
2969 if let Some(ci) = arg_col_idx {
2970 match &rows[row_idx][ci] {
2971 Value::Int(v) => running_int_sum += v,
2972 Value::Float(v) => {
2973 running_float_sum += v;
2974 running_saw_float = true;
2975 }
2976 _ => {}
2977 }
2978 }
2979 if running_saw_float {
2980 Value::Float(running_float_sum + running_int_sum as f64)
2981 } else {
2982 Value::Int(running_int_sum)
2983 }
2984 }
2985 WindowFunc::Avg => {
2986 if let Some(ci) = arg_col_idx {
2987 match &rows[row_idx][ci] {
2988 Value::Int(v) => {
2989 running_float_sum += *v as f64;
2990 running_count += 1;
2991 }
2992 Value::Float(v) => {
2993 running_float_sum += v;
2994 running_count += 1;
2995 }
2996 _ => {}
2997 }
2998 }
2999 if running_count == 0 {
3000 Value::Empty
3001 } else {
3002 Value::Float(running_float_sum / running_count as f64)
3003 }
3004 }
3005 WindowFunc::Count => {
3006 if let Some(ci) = arg_col_idx {
3007 if !rows[row_idx][ci].is_empty() {
3008 running_count += 1;
3009 }
3010 } else {
3011 running_count += 1;
3013 }
3014 Value::Int(running_count)
3015 }
3016 WindowFunc::Min => {
3017 if let Some(ci) = arg_col_idx {
3018 let v = &rows[row_idx][ci];
3019 if !v.is_empty() {
3020 running_min = Some(match &running_min {
3021 None => v.clone(),
3022 Some(cur) => {
3023 if v < cur {
3024 v.clone()
3025 } else {
3026 cur.clone()
3027 }
3028 }
3029 });
3030 }
3031 }
3032 running_min.clone().unwrap_or(Value::Empty)
3033 }
3034 WindowFunc::Max => {
3035 if let Some(ci) = arg_col_idx {
3036 let v = &rows[row_idx][ci];
3037 if !v.is_empty() {
3038 running_max = Some(match &running_max {
3039 None => v.clone(),
3040 Some(cur) => {
3041 if v > cur {
3042 v.clone()
3043 } else {
3044 cur.clone()
3045 }
3046 }
3047 });
3048 }
3049 }
3050 running_max.clone().unwrap_or(Value::Empty)
3051 }
3052 };
3053
3054 prev_order_key = Some(current_order_key);
3055 win_values[row_idx] = value;
3056 }
3057
3058 for (ri, row) in rows.iter_mut().enumerate() {
3060 row.push(win_values[ri].clone());
3061 }
3062 columns.push(wdef.output_name.clone());
3063 }
3064
3065 Ok(QueryResult::Rows { columns, rows })
3066}
3067
3068pub(super) fn compute_group_aggregate(
3070 func: AggFunc,
3071 all_rows: &[Vec<Value>],
3072 row_indices: &[usize],
3073 col_idx: usize,
3074) -> Value {
3075 match func {
3076 AggFunc::Count => {
3077 if col_idx == usize::MAX {
3078 return Value::Int(row_indices.len() as i64);
3080 }
3081 let count = row_indices
3082 .iter()
3083 .filter(|&&ri| !all_rows[ri][col_idx].is_empty())
3084 .count();
3085 Value::Int(count as i64)
3086 }
3087 AggFunc::CountDistinct => {
3088 let mut seen = std::collections::HashSet::new();
3089 for &ri in row_indices {
3090 let v = &all_rows[ri][col_idx];
3091 if !v.is_empty() {
3092 seen.insert(v.clone());
3093 }
3094 }
3095 Value::Int(seen.len() as i64)
3096 }
3097 AggFunc::Sum => {
3098 let mut int_sum: i64 = 0;
3103 let mut float_sum: f64 = 0.0;
3104 let mut saw_float = false;
3105 for &ri in row_indices {
3106 match &all_rows[ri][col_idx] {
3107 Value::Int(v) => int_sum += v,
3108 Value::Float(v) => {
3109 float_sum += *v;
3110 saw_float = true;
3111 }
3112 _ => {}
3113 }
3114 }
3115 if saw_float {
3116 Value::Float(float_sum + int_sum as f64)
3117 } else {
3118 Value::Int(int_sum)
3119 }
3120 }
3121 AggFunc::Avg => {
3122 let mut sum = 0.0f64;
3123 let mut count = 0usize;
3124 for &ri in row_indices {
3125 match &all_rows[ri][col_idx] {
3126 Value::Int(v) => {
3127 sum += *v as f64;
3128 count += 1;
3129 }
3130 Value::Float(v) => {
3131 sum += *v;
3132 count += 1;
3133 }
3134 _ => {}
3135 }
3136 }
3137 if count == 0 {
3138 Value::Empty
3139 } else {
3140 Value::Float(sum / count as f64)
3141 }
3142 }
3143 AggFunc::Min => row_indices
3144 .iter()
3145 .map(|&ri| &all_rows[ri][col_idx])
3146 .filter(|v| !v.is_empty())
3147 .min()
3148 .cloned()
3149 .unwrap_or(Value::Empty),
3150 AggFunc::Max => row_indices
3151 .iter()
3152 .map(|&ri| &all_rows[ri][col_idx])
3153 .filter(|v| !v.is_empty())
3154 .max()
3155 .cloned()
3156 .unwrap_or(Value::Empty),
3157 }
3158}
3159
3160pub(super) fn try_extract_equi_join_keys(
3174 pred: &Expr,
3175 left_columns: &[String],
3176 right_columns: &[String],
3177) -> Option<(usize, usize)> {
3178 let (lhs, op, rhs) = match pred {
3179 Expr::BinaryOp(l, op, r) => (l.as_ref(), *op, r.as_ref()),
3180 _ => return None,
3181 };
3182 if op != BinOp::Eq {
3183 return None;
3184 }
3185 if let (Some(li), Some(ri)) = (
3187 resolve_side_column(lhs, left_columns),
3188 resolve_side_column(rhs, right_columns),
3189 ) {
3190 return Some((li, ri));
3191 }
3192 if let (Some(li), Some(ri)) = (
3195 resolve_side_column(rhs, left_columns),
3196 resolve_side_column(lhs, right_columns),
3197 ) {
3198 return Some((li, ri));
3199 }
3200 None
3201}
3202
3203fn resolve_side_column(expr: &Expr, columns: &[String]) -> Option<usize> {
3204 match expr {
3205 Expr::QualifiedField { qualifier, field } => {
3206 let q = qualifier.as_bytes();
3211 let f = field.as_bytes();
3212 columns.iter().position(|c| {
3213 let b = c.as_bytes();
3214 b.len() == q.len() + 1 + f.len()
3215 && b[..q.len()] == *q
3216 && b[q.len()] == b'.'
3217 && b[q.len() + 1..] == *f
3218 })
3219 }
3220 Expr::Field(name) => columns.iter().position(|c| c == name),
3221 _ => None,
3222 }
3223}
3224
3225pub(super) fn hash_join(
3237 left_columns: Vec<String>,
3238 left_rows: Vec<Vec<Value>>,
3239 right_columns: Vec<String>,
3240 right_rows: Vec<Vec<Value>>,
3241 left_key_idx: usize,
3242 right_key_idx: usize,
3243 kind: JoinKind,
3244) -> QueryResult {
3245 use rustc_hash::FxHashMap;
3246
3247 let n_left = left_columns.len();
3248 let n_right = right_columns.len();
3249 let mut columns = Vec::with_capacity(n_left + n_right);
3250 columns.extend(left_columns);
3251 columns.extend(right_columns);
3252
3253 let mut build: FxHashMap<Value, Vec<usize>> =
3256 FxHashMap::with_capacity_and_hasher(right_rows.len(), Default::default());
3257 for (i, row) in right_rows.iter().enumerate() {
3258 if matches!(row[right_key_idx], Value::Empty) {
3262 continue;
3263 }
3264 build.entry(row[right_key_idx].clone()).or_default().push(i);
3265 }
3266
3267 let mut rows: Vec<Vec<Value>> = Vec::with_capacity(left_rows.len());
3270
3271 for left_row in &left_rows {
3272 let key = &left_row[left_key_idx];
3273 let matched = if matches!(key, Value::Empty) {
3274 None
3275 } else {
3276 build.get(key)
3277 };
3278 match matched {
3279 Some(matches) if !matches.is_empty() => {
3280 for &ri in matches {
3281 let right_row = &right_rows[ri];
3282 let mut combined = Vec::with_capacity(n_left + n_right);
3283 combined.extend_from_slice(left_row);
3284 combined.extend_from_slice(right_row);
3285 rows.push(combined);
3286 }
3287 }
3288 _ => {
3289 if matches!(kind, JoinKind::LeftOuter) {
3290 let mut row = Vec::with_capacity(n_left + n_right);
3291 row.extend_from_slice(left_row);
3292 row.resize(n_left + n_right, Value::Empty);
3293 rows.push(row);
3294 }
3295 }
3296 }
3297 }
3298
3299 QueryResult::Rows { columns, rows }
3300}
3301
3302pub(super) fn lower_unindexed_range_scans(catalog: &Catalog, plan: &PlanNode) -> PlanNode {
3315 match plan {
3316 PlanNode::RangeScan {
3317 table,
3318 column,
3319 start,
3320 end,
3321 } => {
3322 if let Some(tbl) = catalog.get_table(table) {
3323 if tbl.is_index_unique(column) == Some(true) {
3328 return plan.clone();
3329 }
3330 }
3331 let pred = synthesize_range_predicate(column, start, end);
3332 PlanNode::Filter {
3333 input: Box::new(PlanNode::SeqScan {
3334 table: table.clone(),
3335 }),
3336 predicate: pred,
3337 }
3338 }
3339 PlanNode::Filter { input, predicate } => PlanNode::Filter {
3340 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3341 predicate: predicate.clone(),
3342 },
3343 PlanNode::Project { input, fields } => PlanNode::Project {
3344 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3345 fields: fields.clone(),
3346 },
3347 PlanNode::Sort { input, keys } => PlanNode::Sort {
3348 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3349 keys: keys.clone(),
3350 },
3351 PlanNode::Limit { input, count } => PlanNode::Limit {
3352 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3353 count: count.clone(),
3354 },
3355 PlanNode::Offset { input, count } => PlanNode::Offset {
3356 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3357 count: count.clone(),
3358 },
3359 PlanNode::Aggregate {
3360 input,
3361 function,
3362 field,
3363 } => PlanNode::Aggregate {
3364 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3365 function: *function,
3366 field: field.clone(),
3367 },
3368 PlanNode::Distinct { input } => PlanNode::Distinct {
3369 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3370 },
3371 PlanNode::GroupBy {
3372 input,
3373 keys,
3374 aggregates,
3375 having,
3376 } => PlanNode::GroupBy {
3377 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3378 keys: keys.clone(),
3379 aggregates: aggregates.clone(),
3380 having: having.clone(),
3381 },
3382 PlanNode::Update {
3383 input,
3384 table,
3385 assignments,
3386 } => PlanNode::Update {
3387 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3388 table: table.clone(),
3389 assignments: assignments.clone(),
3390 },
3391 PlanNode::Delete { input, table } => PlanNode::Delete {
3392 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3393 table: table.clone(),
3394 },
3395 PlanNode::Window { input, windows } => PlanNode::Window {
3396 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3397 windows: windows.clone(),
3398 },
3399 PlanNode::Union { left, right, all } => PlanNode::Union {
3400 left: Box::new(lower_unindexed_range_scans(catalog, left)),
3401 right: Box::new(lower_unindexed_range_scans(catalog, right)),
3402 all: *all,
3403 },
3404 PlanNode::Explain { input } => PlanNode::Explain {
3405 input: Box::new(lower_unindexed_range_scans(catalog, input)),
3406 },
3407 PlanNode::NestedLoopJoin {
3408 left,
3409 right,
3410 on,
3411 kind,
3412 } => PlanNode::NestedLoopJoin {
3413 left: Box::new(lower_unindexed_range_scans(catalog, left)),
3414 right: Box::new(lower_unindexed_range_scans(catalog, right)),
3415 on: on.clone(),
3416 kind: *kind,
3417 },
3418 _ => plan.clone(),
3420 }
3421}
3422
3423pub(super) fn synthesize_range_predicate(
3425 column: &str,
3426 start: &Option<(Expr, bool)>,
3427 end: &Option<(Expr, bool)>,
3428) -> Expr {
3429 let lower = start.as_ref().map(|(expr, inclusive)| {
3430 let op = if *inclusive { BinOp::Gte } else { BinOp::Gt };
3431 Expr::BinaryOp(
3432 Box::new(Expr::Field(column.to_string())),
3433 op,
3434 Box::new(expr.clone()),
3435 )
3436 });
3437 let upper = end.as_ref().map(|(expr, inclusive)| {
3438 let op = if *inclusive { BinOp::Lte } else { BinOp::Lt };
3439 Expr::BinaryOp(
3440 Box::new(Expr::Field(column.to_string())),
3441 op,
3442 Box::new(expr.clone()),
3443 )
3444 });
3445 match (lower, upper) {
3446 (Some(l), Some(u)) => Expr::BinaryOp(Box::new(l), BinOp::And, Box::new(u)),
3447 (Some(l), None) => l,
3448 (None, Some(u)) => u,
3449 (None, None) => Expr::Literal(Literal::Bool(true)),
3450 }
3451}
3452
3453pub(super) fn range_matches(
3455 val: &Value,
3456 start: &Option<Value>,
3457 start_inc: bool,
3458 end: &Option<Value>,
3459 end_inc: bool,
3460) -> bool {
3461 if let Some(ref s) = start {
3462 if start_inc {
3463 if val < s {
3464 return false;
3465 }
3466 } else if val <= s {
3467 return false;
3468 }
3469 }
3470 if let Some(ref e) = end {
3471 if end_inc {
3472 if val > e {
3473 return false;
3474 }
3475 } else if val >= e {
3476 return false;
3477 }
3478 }
3479 true
3480}
3481
3482pub(super) fn format_plan_tree(plan: &PlanNode, depth: usize) -> String {
3485 let indent = " ".repeat(depth);
3486 match plan {
3487 PlanNode::SeqScan { table } => format!("{indent}SeqScan table={table}"),
3488 PlanNode::AliasScan { table, alias } => {
3489 format!("{indent}AliasScan table={table} alias={alias}")
3490 }
3491 PlanNode::IndexScan { table, column, key } => {
3492 format!("{indent}IndexScan table={table} column={column} key={key:?}")
3493 }
3494 PlanNode::RangeScan {
3495 table,
3496 column,
3497 start,
3498 end,
3499 } => {
3500 let s = match start {
3501 Some((expr, inc)) => {
3502 let op = if *inc { ">=" } else { ">" };
3503 format!("{op}{expr:?}")
3504 }
3505 None => "unbounded".to_string(),
3506 };
3507 let e = match end {
3508 Some((expr, inc)) => {
3509 let op = if *inc { "<=" } else { "<" };
3510 format!("{op}{expr:?}")
3511 }
3512 None => "unbounded".to_string(),
3513 };
3514 format!("{indent}RangeScan table={table} column={column} [{s}, {e}]")
3515 }
3516 PlanNode::Filter { input, predicate } => {
3517 let child = format_plan_tree(input, depth + 1);
3518 format!("{indent}Filter predicate={predicate:?}\n{child}")
3519 }
3520 PlanNode::Project { input, fields } => {
3521 let names: Vec<String> = fields
3522 .iter()
3523 .map(|f| match &f.alias {
3524 Some(a) => format!("{a}: {:?}", f.expr),
3525 None => format!("{:?}", f.expr),
3526 })
3527 .collect();
3528 let child = format_plan_tree(input, depth + 1);
3529 format!("{indent}Project fields=[{}]\n{child}", names.join(", "))
3530 }
3531 PlanNode::Sort { input, keys } => {
3532 let ks: Vec<String> = keys
3533 .iter()
3534 .map(|k| {
3535 if k.descending {
3536 format!("{} desc", k.field)
3537 } else {
3538 k.field.clone()
3539 }
3540 })
3541 .collect();
3542 let child = format_plan_tree(input, depth + 1);
3543 format!("{indent}Sort keys=[{}]\n{child}", ks.join(", "))
3544 }
3545 PlanNode::Limit { input, count } => {
3546 let child = format_plan_tree(input, depth + 1);
3547 format!("{indent}Limit count={count:?}\n{child}")
3548 }
3549 PlanNode::Offset { input, count } => {
3550 let child = format_plan_tree(input, depth + 1);
3551 format!("{indent}Offset count={count:?}\n{child}")
3552 }
3553 PlanNode::Aggregate {
3554 input,
3555 function,
3556 field,
3557 } => {
3558 let f = field.as_deref().unwrap_or("*");
3559 let child = format_plan_tree(input, depth + 1);
3560 format!("{indent}Aggregate fn={function:?} field={f}\n{child}")
3561 }
3562 PlanNode::NestedLoopJoin {
3563 left,
3564 right,
3565 on,
3566 kind,
3567 } => {
3568 let left_child = format_plan_tree(left, depth + 1);
3569 let right_child = format_plan_tree(right, depth + 1);
3570 let on_str = match on {
3571 Some(pred) => format!("{pred:?}"),
3572 None => "none".to_string(),
3573 };
3574 format!("{indent}NestedLoopJoin kind={kind:?} on={on_str}\n{left_child}\n{right_child}")
3575 }
3576 PlanNode::Distinct { input } => {
3577 let child = format_plan_tree(input, depth + 1);
3578 format!("{indent}Distinct\n{child}")
3579 }
3580 PlanNode::GroupBy {
3581 input,
3582 keys,
3583 aggregates,
3584 having,
3585 } => {
3586 let agg_strs: Vec<String> = aggregates
3587 .iter()
3588 .map(|a| format!("{:?}({}) as {}", a.function, a.field, a.output_name))
3589 .collect();
3590 let having_str = match having {
3591 Some(h) => format!(" having={h:?}"),
3592 None => String::new(),
3593 };
3594 let child = format_plan_tree(input, depth + 1);
3595 format!(
3596 "{indent}GroupBy keys=[{}] aggs=[{}]{having_str}\n{child}",
3597 keys.join(", "),
3598 agg_strs.join(", "),
3599 )
3600 }
3601 PlanNode::Insert { table, assignments } => {
3602 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3603 format!("{indent}Insert table={table} cols=[{}]", cols.join(", "))
3604 }
3605 PlanNode::Upsert {
3606 table,
3607 key_column,
3608 assignments,
3609 on_conflict,
3610 } => {
3611 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3612 let conflict_cols: Vec<&str> = on_conflict.iter().map(|a| a.field.as_str()).collect();
3613 if conflict_cols.is_empty() {
3614 format!(
3615 "{indent}Upsert table={table} key={key_column} cols=[{}]",
3616 cols.join(", ")
3617 )
3618 } else {
3619 format!(
3620 "{indent}Upsert table={table} key={key_column} cols=[{}] on_conflict=[{}]",
3621 cols.join(", "),
3622 conflict_cols.join(", ")
3623 )
3624 }
3625 }
3626 PlanNode::Update {
3627 input,
3628 table,
3629 assignments,
3630 } => {
3631 let cols: Vec<&str> = assignments.iter().map(|a| a.field.as_str()).collect();
3632 let child = format_plan_tree(input, depth + 1);
3633 format!(
3634 "{indent}Update table={table} set=[{}]\n{child}",
3635 cols.join(", ")
3636 )
3637 }
3638 PlanNode::Delete { input, table } => {
3639 let child = format_plan_tree(input, depth + 1);
3640 format!("{indent}Delete table={table}\n{child}")
3641 }
3642 PlanNode::CreateTable { name, fields } => {
3643 let fs: Vec<String> = fields
3644 .iter()
3645 .map(|(n, t, r)| {
3646 if *r {
3647 format!("{n}: {t} required")
3648 } else {
3649 format!("{n}: {t}")
3650 }
3651 })
3652 .collect();
3653 format!("{indent}CreateTable name={name} fields=[{}]", fs.join(", "))
3654 }
3655 PlanNode::AlterTable { table, action } => {
3656 format!("{indent}AlterTable table={table} action={action:?}")
3657 }
3658 PlanNode::DropTable { name } => format!("{indent}DropTable name={name}"),
3659 PlanNode::CreateView { name, .. } => format!("{indent}CreateView name={name}"),
3660 PlanNode::RefreshView { name } => format!("{indent}RefreshView name={name}"),
3661 PlanNode::DropView { name } => format!("{indent}DropView name={name}"),
3662 PlanNode::Window { input, windows } => {
3663 let ws: Vec<String> = windows
3664 .iter()
3665 .map(|w| format!("{:?} as {}", w.function, w.output_name))
3666 .collect();
3667 let child = format_plan_tree(input, depth + 1);
3668 format!("{indent}Window fns=[{}]\n{child}", ws.join(", "))
3669 }
3670 PlanNode::Union { left, right, all } => {
3671 let kind = if *all { "UNION ALL" } else { "UNION" };
3672 let left_child = format_plan_tree(left, depth + 1);
3673 let right_child = format_plan_tree(right, depth + 1);
3674 format!("{indent}{kind}\n{left_child}\n{right_child}")
3675 }
3676 PlanNode::Explain { input } => {
3677 let child = format_plan_tree(input, depth + 1);
3678 format!("{indent}Explain\n{child}")
3679 }
3680 PlanNode::Begin => format!("{indent}Begin"),
3681 PlanNode::Commit => format!("{indent}Commit"),
3682 PlanNode::Rollback => format!("{indent}Rollback"),
3683 }
3684}