1#![allow(clippy::ref_option)]
4#![allow(clippy::trivially_copy_pass_by_ref)]
5#![allow(clippy::items_after_statements)]
6
7use std::cmp::Ordering;
8use std::ops::Bound;
9
// ---------------------------------------------------------------------------
// Scan sizing for range / index scans.
// ---------------------------------------------------------------------------

/// Factor applied to `LIMIT + OFFSET` to size a range or index scan that has
/// an ORDER BY. Those rows are sorted only after they are fetched, so the scan
/// over-fetches more aggressively.
const SCAN_LIMIT_MULTIPLIER_WITH_SORT: usize = 10;

/// Factor applied to `LIMIT + OFFSET` to size a range or index scan without
/// an ORDER BY. Those scans also stop early once enough rows have matched.
const SCAN_LIMIT_MULTIPLIER_NO_SORT: usize = 2;

/// Number of entries fetched by a range or index scan when the query has no
/// LIMIT.
const DEFAULT_SCAN_LIMIT: usize = 10_000;

// ---------------------------------------------------------------------------
// Resource caps.
// ---------------------------------------------------------------------------

/// Upper bound on the number of aggregate functions in one query.
const MAX_AGGREGATES_PER_QUERY: usize = 100;

/// Upper bound on the number of rows a JOIN may produce before the query is
/// rejected.
const MAX_JOIN_OUTPUT_ROWS: usize = 1_000_000;

/// Upper bound on the number of distinct GROUP BY groups before the query is
/// rejected.
const MAX_GROUP_COUNT: usize = 100_000;
55
56use bytes::Bytes;
57use kimberlite_store::{Key, ProjectionStore, TableId};
58use kimberlite_types::Offset;
59
60use crate::error::{QueryError, Result};
61use crate::key_encoder::successor_key;
62use crate::plan::{QueryPlan, ScanOrder, SortSpec};
63use crate::schema::{ColumnName, TableDef};
64use crate::value::Value;
65
66#[derive(Debug, Clone)]
68pub struct QueryResult {
69 pub columns: Vec<ColumnName>,
71 pub rows: Vec<Row>,
73}
74
75impl QueryResult {
76 pub fn empty(columns: Vec<ColumnName>) -> Self {
78 Self {
79 columns,
80 rows: vec![],
81 }
82 }
83
84 pub fn len(&self) -> usize {
86 self.rows.len()
87 }
88
89 pub fn is_empty(&self) -> bool {
91 self.rows.is_empty()
92 }
93}
94
95pub type Row = Vec<Value>;
97
98#[allow(clippy::too_many_arguments)]
100fn execute_index_scan<S: ProjectionStore>(
101 store: &mut S,
102 metadata: &crate::plan::TableMetadata,
103 index_id: u64,
104 start: &Bound<Key>,
105 end: &Bound<Key>,
106 filter: &Option<crate::plan::Filter>,
107 limit: &Option<usize>,
108 offset: &Option<usize>,
109 order: &ScanOrder,
110 order_by: &Option<crate::plan::SortSpec>,
111 columns: &[usize],
112 column_names: &[ColumnName],
113 position: Option<Offset>,
114) -> Result<QueryResult> {
115 let (start_key, end_key) = bounds_to_range(start, end);
116
117 let limit_plus_offset = limit.map(|l| l.saturating_add(offset.unwrap_or(0)));
120
121 let scan_limit = if order_by.is_some() {
123 limit_plus_offset.map_or(DEFAULT_SCAN_LIMIT, |l| {
124 l.saturating_mul(SCAN_LIMIT_MULTIPLIER_WITH_SORT)
125 })
126 } else {
127 limit_plus_offset.map_or(DEFAULT_SCAN_LIMIT, |l| {
128 l.saturating_mul(SCAN_LIMIT_MULTIPLIER_NO_SORT)
129 })
130 };
131
132 debug_assert!(scan_limit > 0, "scan_limit must be positive");
134
135 use std::collections::hash_map::DefaultHasher;
137 use std::hash::{Hash, Hasher};
138
139 let mut hasher = DefaultHasher::new();
140 metadata.table_id.as_u64().hash(&mut hasher);
141 index_id.hash(&mut hasher);
142 let index_table_id = TableId::new(hasher.finish());
143
144 let index_pairs = match position {
146 Some(pos) => store.scan_at(index_table_id, start_key..end_key, scan_limit, pos)?,
147 None => store.scan(index_table_id, start_key..end_key, scan_limit)?,
148 };
149
150 let mut full_rows = Vec::new();
151 let index_iter: Box<dyn Iterator<Item = &(Key, Bytes)>> = match order {
152 ScanOrder::Ascending => Box::new(index_pairs.iter()),
153 ScanOrder::Descending => Box::new(index_pairs.iter().rev()),
154 };
155
156 for (index_key, _) in index_iter {
157 let pk_key = extract_pk_from_index_key(index_key, metadata);
159
160 let bytes_opt = match position {
162 Some(pos) => store.get_at(metadata.table_id, &pk_key, pos)?,
163 None => store.get(metadata.table_id, &pk_key)?,
164 };
165 if let Some(bytes) = bytes_opt {
166 let full_row = decode_row(&bytes, metadata)?;
167
168 if let Some(f) = filter {
170 if !f.matches(&full_row) {
171 continue;
172 }
173 }
174
175 full_rows.push(full_row);
176
177 if order_by.is_none() {
179 if let Some(target) = limit_plus_offset {
180 if full_rows.len() >= target {
181 break;
182 }
183 }
184 }
185 }
186 }
187
188 if let Some(sort_spec) = order_by {
190 sort_rows(&mut full_rows, sort_spec);
191 }
192
193 apply_offset_and_limit(&mut full_rows, *offset, *limit);
194
195 let rows: Vec<Row> = full_rows
197 .iter()
198 .map(|full_row| project_row(full_row, columns))
199 .collect();
200
201 Ok(QueryResult {
202 columns: column_names.to_vec(),
203 rows,
204 })
205}
206
207#[allow(clippy::too_many_arguments)]
209fn execute_table_scan<S: ProjectionStore>(
210 store: &mut S,
211 metadata: &crate::plan::TableMetadata,
212 filter: &Option<crate::plan::Filter>,
213 limit: &Option<usize>,
214 offset: &Option<usize>,
215 order: &Option<SortSpec>,
216 columns: &[usize],
217 column_names: &[ColumnName],
218 position: Option<Offset>,
219) -> Result<QueryResult> {
220 let limit_plus_offset = limit.map(|l| l.saturating_add(offset.unwrap_or(0)));
222 let scan_limit = limit_plus_offset.map_or(100_000, |l| l.saturating_mul(10));
223 let pairs = match position {
224 Some(pos) => store.scan_at(metadata.table_id, Key::min()..Key::max(), scan_limit, pos)?,
225 None => store.scan(metadata.table_id, Key::min()..Key::max(), scan_limit)?,
226 };
227
228 let mut full_rows = Vec::new();
229
230 for (_, bytes) in &pairs {
231 let full_row = decode_row(bytes, metadata)?;
232
233 if let Some(f) = filter {
235 if !f.matches(&full_row) {
236 continue;
237 }
238 }
239
240 full_rows.push(full_row);
241 }
242
243 if let Some(sort_spec) = order {
245 sort_rows(&mut full_rows, sort_spec);
246 }
247
248 apply_offset_and_limit(&mut full_rows, *offset, *limit);
249
250 let rows: Vec<Row> = full_rows
252 .iter()
253 .map(|full_row| project_row(full_row, columns))
254 .collect();
255
256 Ok(QueryResult {
257 columns: column_names.to_vec(),
258 rows,
259 })
260}
261
262#[allow(clippy::too_many_arguments)]
264fn execute_range_scan<S: ProjectionStore>(
265 store: &mut S,
266 metadata: &crate::plan::TableMetadata,
267 start: &Bound<Key>,
268 end: &Bound<Key>,
269 filter: &Option<crate::plan::Filter>,
270 limit: &Option<usize>,
271 offset: &Option<usize>,
272 order: &ScanOrder,
273 order_by: &Option<crate::plan::SortSpec>,
274 columns: &[usize],
275 column_names: &[ColumnName],
276 position: Option<Offset>,
277) -> Result<QueryResult> {
278 let (start_key, end_key) = bounds_to_range(start, end);
279
280 let limit_plus_offset = limit.map(|l| l.saturating_add(offset.unwrap_or(0)));
282
283 let scan_limit = if order_by.is_some() {
285 limit_plus_offset.map_or(DEFAULT_SCAN_LIMIT, |l| {
286 l.saturating_mul(SCAN_LIMIT_MULTIPLIER_WITH_SORT)
287 })
288 } else {
289 limit_plus_offset.map_or(DEFAULT_SCAN_LIMIT, |l| {
290 l.saturating_mul(SCAN_LIMIT_MULTIPLIER_NO_SORT)
291 })
292 };
293
294 debug_assert!(scan_limit > 0, "scan_limit must be positive");
296
297 let pairs = match position {
298 Some(pos) => store.scan_at(metadata.table_id, start_key..end_key, scan_limit, pos)?,
299 None => store.scan(metadata.table_id, start_key..end_key, scan_limit)?,
300 };
301
302 let mut full_rows = Vec::new();
303 let row_iter: Box<dyn Iterator<Item = &(Key, Bytes)>> = match order {
304 ScanOrder::Ascending => Box::new(pairs.iter()),
305 ScanOrder::Descending => Box::new(pairs.iter().rev()),
306 };
307
308 for (_, bytes) in row_iter {
309 let full_row = decode_row(bytes, metadata)?;
310
311 if let Some(f) = filter {
313 if !f.matches(&full_row) {
314 continue;
315 }
316 }
317
318 full_rows.push(full_row);
319
320 if order_by.is_none() {
322 if let Some(target) = limit_plus_offset {
323 if full_rows.len() >= target {
324 break;
325 }
326 }
327 }
328 }
329
330 if let Some(sort_spec) = order_by {
332 sort_rows(&mut full_rows, sort_spec);
333 }
334
335 apply_offset_and_limit(&mut full_rows, *offset, *limit);
336
337 let rows: Vec<Row> = full_rows
339 .iter()
340 .map(|full_row| project_row(full_row, columns))
341 .collect();
342
343 Ok(QueryResult {
344 columns: column_names.to_vec(),
345 rows,
346 })
347}
348
/// Applies SQL `OFFSET` then `LIMIT` to `rows` in place.
///
/// The first `offset` rows are dropped (all of them if `offset` is at least
/// the row count). What remains is then truncated to at most `limit` rows.
/// `None` means "not specified" for either argument.
#[inline]
fn apply_offset_and_limit<T>(rows: &mut Vec<T>, offset: Option<usize>, limit: Option<usize>) {
    // Clamp so an offset past the end simply empties the vector.
    let skipped = offset.map_or(0, |off| off.min(rows.len()));
    rows.drain(..skipped);

    if let Some(max_rows) = limit {
        rows.truncate(max_rows);
    }
}
367
368fn execute_point_lookup<S: ProjectionStore>(
370 store: &mut S,
371 metadata: &crate::plan::TableMetadata,
372 key: &Key,
373 columns: &[usize],
374 column_names: &[ColumnName],
375 position: Option<Offset>,
376) -> Result<QueryResult> {
377 let result = match position {
378 Some(pos) => store.get_at(metadata.table_id, key, pos)?,
379 None => store.get(metadata.table_id, key)?,
380 };
381 match result {
382 Some(bytes) => {
383 let row = decode_and_project(&bytes, columns, metadata)?;
384 Ok(QueryResult {
385 columns: column_names.to_vec(),
386 rows: vec![row],
387 })
388 }
389 None => Ok(QueryResult::empty(column_names.to_vec())),
390 }
391}
392
393#[allow(clippy::too_many_lines, clippy::used_underscore_binding)]
395fn execute_internal<S: ProjectionStore>(
396 store: &mut S,
397 plan: &QueryPlan,
398 _table_def: &TableDef, position: Option<Offset>,
400) -> Result<QueryResult> {
401 kimberlite_properties::sometimes!(
403 position.is_some(),
404 "query.time_travel_at_position",
405 "query executes at a pinned historical log offset"
406 );
407 let result = execute_internal_inner(store, plan, _table_def, position)?;
408
409 #[cfg(any(test, feature = "sim"))]
411 {
412 let _expected_cols = plan.column_names().len();
413 kimberlite_properties::always!(
414 result.columns.len() == _expected_cols,
415 "query.result_columns_match_plan",
416 "query result column count must equal plan-declared schema column count"
417 );
418 kimberlite_properties::always!(
420 result.rows.iter().all(|r| r.len() == _expected_cols),
421 "query.row_width_matches_columns",
422 "every result row must have width equal to declared column count"
423 );
424 }
425 Ok(result)
426}
427
/// Dispatches a `QueryPlan` node to the executor for its variant.
///
/// Each arm destructures the plan's fields and forwards them, together with
/// `store` and `position`, to the matching `execute_*` function. `_table_def`
/// is accepted but not used by any arm.
#[allow(clippy::too_many_lines)]
fn execute_internal_inner<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    _table_def: &TableDef,
    position: Option<Offset>,
) -> Result<QueryResult> {
    match plan {
        // Single-key fetch.
        QueryPlan::PointLookup {
            metadata,
            key,
            columns,
            column_names,
        } => execute_point_lookup(store, metadata, key, columns, column_names, position),

        // Key-range scan over the base table.
        QueryPlan::RangeScan {
            metadata,
            start,
            end,
            filter,
            limit,
            offset,
            order,
            order_by,
            columns,
            column_names,
        } => execute_range_scan(
            store,
            metadata,
            start,
            end,
            filter,
            limit,
            offset,
            order,
            order_by,
            columns,
            column_names,
            position,
        ),

        // Secondary-index scan; any remaining IndexScan fields (`..`) are not
        // needed by the executor.
        QueryPlan::IndexScan {
            metadata,
            index_id,
            start,
            end,
            filter,
            limit,
            offset,
            order,
            order_by,
            columns,
            column_names,
            ..
        } => execute_index_scan(
            store,
            metadata,
            *index_id,
            start,
            end,
            filter,
            limit,
            offset,
            order,
            order_by,
            columns,
            column_names,
            position,
        ),

        // Full scan of the table.
        QueryPlan::TableScan {
            metadata,
            filter,
            limit,
            offset,
            order,
            columns,
            column_names,
        } => execute_table_scan(
            store,
            metadata,
            filter,
            limit,
            offset,
            order,
            columns,
            column_names,
            position,
        ),

        // GROUP BY / aggregate over a child plan; group-by names are not needed
        // at execution time (only the column indices are).
        QueryPlan::Aggregate {
            metadata,
            source,
            group_by_cols,
            group_by_names: _,
            aggregates,
            aggregate_filters,
            column_names,
            having,
        } => execute_aggregate(
            store,
            source,
            group_by_cols,
            aggregates,
            aggregate_filters,
            column_names,
            metadata,
            having,
            position,
        ),

        // Join of two child plans.
        QueryPlan::Join {
            join_type,
            left,
            right,
            on_conditions,
            columns,
            column_names,
        } => execute_join(
            store,
            join_type,
            left,
            right,
            on_conditions,
            columns,
            column_names,
            position,
        ),

        // Post-processing wrapper: filter / computed columns / order / limit.
        QueryPlan::Materialize {
            source,
            filter,
            case_columns,
            scalar_columns,
            order,
            limit,
            offset,
            column_names,
        } => execute_materialize(
            store,
            source,
            filter,
            case_columns,
            scalar_columns,
            order,
            limit,
            offset,
            column_names,
            position,
        ),
    }
}
580
581#[allow(clippy::too_many_arguments)]
583fn execute_materialize<S: ProjectionStore>(
584 store: &mut S,
585 source: &QueryPlan,
586 filter: &Option<crate::plan::Filter>,
587 case_columns: &[crate::plan::CaseColumnDef],
588 scalar_columns: &[crate::plan::ScalarColumnDef],
589 order: &Option<SortSpec>,
590 limit: &Option<usize>,
591 offset: &Option<usize>,
592 column_names: &[ColumnName],
593 position: Option<Offset>,
594) -> Result<QueryResult> {
595 let dummy_def = TableDef {
597 table_id: kimberlite_store::TableId::from(0u64),
598 columns: vec![],
599 primary_key: vec![],
600 indexes: vec![],
601 };
602 let mut source_result = execute_internal(store, source, &dummy_def, position)?;
603
604 kimberlite_properties::sometimes!(
605 filter.is_some() || order.is_some() || limit.is_some() || offset.is_some(),
606 "query.materialize_applies_filter_order_limit",
607 "Materialize wrapper applies at least one of filter, order, limit, or offset"
608 );
609
610 if let Some(f) = filter {
612 source_result.rows.retain(|row| f.matches(row));
613 }
614
615 if !case_columns.is_empty() {
617 kimberlite_properties::sometimes!(
618 !source_result.rows.is_empty(),
619 "query.case_when_evaluated",
620 "CASE WHEN computed columns evaluated against at least one row"
621 );
622 for row in &mut source_result.rows {
623 for case_col in case_columns {
624 let val = evaluate_case_column(case_col, row);
625 row.push(val);
626 }
627 }
628 }
629
630 if !scalar_columns.is_empty() {
635 for row in &mut source_result.rows {
636 for sc in scalar_columns {
637 let ctx = crate::expression::EvalContext::new(&sc.columns, row);
638 let val = crate::expression::evaluate(&sc.expr, &ctx)?;
639 row.push(val);
640 }
641 }
642 }
643
644 let source_layout = source_result.columns.clone();
651 let mut full_layout: Vec<ColumnName> = source_layout;
652 full_layout.extend(case_columns.iter().map(|c| c.alias.clone()));
653 full_layout.extend(scalar_columns.iter().map(|c| c.output_name.clone()));
654 let needs_post_project = full_layout.len() != column_names.len()
655 || full_layout
656 .iter()
657 .zip(column_names.iter())
658 .any(|(a, b)| a != b);
659 if needs_post_project {
660 let mut index_map = Vec::with_capacity(column_names.len());
661 for target in column_names {
662 let pos = full_layout
663 .iter()
664 .position(|c| c == target)
665 .ok_or_else(|| QueryError::ColumnNotFound {
666 table: String::new(),
667 column: target.to_string(),
668 })?;
669 index_map.push(pos);
670 }
671 for row in &mut source_result.rows {
672 let projected: Vec<Value> = index_map.iter().map(|&i| row[i].clone()).collect();
673 *row = projected;
674 }
675 }
676
677 if let Some(spec) = order {
679 sort_rows(&mut source_result.rows, spec);
680 }
681
682 apply_offset_and_limit(&mut source_result.rows, *offset, *limit);
684
685 Ok(QueryResult {
687 columns: column_names.to_vec(),
688 rows: source_result.rows,
689 })
690}
691
692fn evaluate_case_column(case_col: &crate::plan::CaseColumnDef, row: &[Value]) -> Value {
694 for clause in &case_col.when_clauses {
695 if clause.condition.matches(row) {
696 return clause.result.clone();
697 }
698 }
699 case_col.else_value.clone()
700}
701
702pub fn execute<S: ProjectionStore>(
704 store: &mut S,
705 plan: &QueryPlan,
706 table_def: &TableDef,
707) -> Result<QueryResult> {
708 execute_internal(store, plan, table_def, None)
709}
710
711pub fn execute_at<S: ProjectionStore>(
713 store: &mut S,
714 plan: &QueryPlan,
715 table_def: &TableDef,
716 position: Offset,
717) -> Result<QueryResult> {
718 execute_internal(store, plan, table_def, Some(position))
719}
720
721fn bounds_to_range(start: &Bound<Key>, end: &Bound<Key>) -> (Key, Key) {
729 let start_key = match start {
730 Bound::Included(k) => k.clone(),
731 Bound::Excluded(k) => successor_key(k),
732 Bound::Unbounded => Key::min(),
733 };
734
735 let end_key = match end {
736 Bound::Included(k) => successor_key(k),
737 Bound::Excluded(k) => k.clone(),
738 Bound::Unbounded => Key::max(),
739 };
740
741 (start_key, end_key)
742}
743
744fn extract_pk_from_index_key(index_key: &Key, metadata: &crate::plan::TableMetadata) -> Key {
753 use crate::key_encoder::{decode_key, encode_key};
754
755 let all_values = decode_key(index_key);
757
758 let pk_count = metadata.primary_key.len();
760
761 debug_assert!(pk_count > 0, "primary key columns must be non-empty");
763 debug_assert!(
764 all_values.len() >= pk_count,
765 "index key must contain at least the primary key values"
766 );
767
768 let pk_values: Vec<Value> = all_values
771 .iter()
772 .skip(all_values.len() - pk_count)
773 .cloned()
774 .collect();
775
776 debug_assert_eq!(
777 pk_values.len(),
778 pk_count,
779 "extracted primary key must have correct number of columns"
780 );
781
782 encode_key(&pk_values)
784}
785
786fn decode_row(bytes: &Bytes, metadata: &crate::plan::TableMetadata) -> Result<Row> {
788 let json: serde_json::Value = serde_json::from_slice(bytes)?;
789
790 let obj = json.as_object().ok_or_else(|| QueryError::TypeMismatch {
791 expected: "object".to_string(),
792 actual: format!("{json:?}"),
793 })?;
794
795 let mut row = Vec::with_capacity(metadata.columns.len());
796
797 for col_def in &metadata.columns {
798 let col_name = col_def.name.as_str();
799 let json_val = obj.get(col_name).unwrap_or(&serde_json::Value::Null);
800 let value = Value::from_json(json_val, col_def.data_type)?;
801 row.push(value);
802 }
803
804 Ok(row)
805}
806
807fn decode_and_project(
809 bytes: &Bytes,
810 columns: &[usize],
811 metadata: &crate::plan::TableMetadata,
812) -> Result<Row> {
813 let full_row = decode_row(bytes, metadata)?;
814 Ok(project_row(&full_row, columns))
815}
816
817fn project_row(full_row: &[Value], columns: &[usize]) -> Row {
819 debug_assert!(
821 columns.iter().all(|&idx| idx < full_row.len()),
822 "column index out of bounds: columns={:?}, row_len={}",
823 columns,
824 full_row.len()
825 );
826
827 if columns.is_empty() {
828 return full_row.to_vec();
830 }
831
832 let projected: Vec<Value> = columns
833 .iter()
834 .map(|&idx| {
835 full_row.get(idx).cloned().unwrap_or_else(|| {
836 panic!(
838 "column index {} out of bounds (row len {})",
839 idx,
840 full_row.len()
841 );
842 })
843 })
844 .collect();
845
846 debug_assert_eq!(
848 projected.len(),
849 columns.len(),
850 "projected row length mismatch"
851 );
852
853 projected
854}
855
856fn sort_rows(rows: &mut [Row], spec: &SortSpec) {
858 rows.sort_by(|a, b| {
859 for (col_idx, order) in &spec.columns {
860 let a_val = a.get(*col_idx);
861 let b_val = b.get(*col_idx);
862
863 let cmp = match (a_val, b_val) {
864 (Some(av), Some(bv)) => av.compare(bv).unwrap_or(Ordering::Equal),
865 (None, None) => Ordering::Equal,
866 (None, Some(_)) => Ordering::Less,
867 (Some(_), None) => Ordering::Greater,
868 };
869
870 if cmp != Ordering::Equal {
871 return match order {
872 ScanOrder::Ascending => cmp,
873 ScanOrder::Descending => cmp.reverse(),
874 };
875 }
876 }
877 Ordering::Equal
878 });
879}
880
881#[allow(clippy::too_many_arguments)]
883fn evaluate_join_conditions(row: &[Value], conditions: &[crate::plan::JoinCondition]) -> bool {
887 use crate::plan::JoinOp;
888
889 conditions.iter().all(|cond| {
890 let left_val = row.get(cond.left_col_idx);
892 let right_val = row.get(cond.right_col_idx);
893
894 if left_val.is_none() || right_val.is_none() {
896 return false;
897 }
898
899 let left_val = left_val.unwrap();
900 let right_val = right_val.unwrap();
901
902 match cond.op {
904 JoinOp::Eq => left_val == right_val,
905 JoinOp::Lt => left_val.compare(right_val) == Some(std::cmp::Ordering::Less),
906 JoinOp::Le => matches!(
907 left_val.compare(right_val),
908 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
909 ),
910 JoinOp::Gt => left_val.compare(right_val) == Some(std::cmp::Ordering::Greater),
911 JoinOp::Ge => matches!(
912 left_val.compare(right_val),
913 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
914 ),
915 }
916 })
917}
918
919fn execute_join<S: ProjectionStore>(
920 store: &mut S,
921 join_type: &crate::parser::JoinType,
922 left: &QueryPlan,
923 right: &QueryPlan,
924 on_conditions: &[crate::plan::JoinCondition],
925 _columns: &[usize],
926 column_names: &[ColumnName],
927 position: Option<Offset>,
928) -> Result<QueryResult> {
929 let left_metadata = left.metadata().ok_or_else(|| {
931 QueryError::UnsupportedFeature("JOIN child plan missing metadata".to_string())
932 })?;
933 let right_metadata = right.metadata().ok_or_else(|| {
934 QueryError::UnsupportedFeature("JOIN child plan missing metadata".to_string())
935 })?;
936
937 let left_table_def = TableDef {
939 table_id: left_metadata.table_id,
940 columns: left_metadata.columns.clone(),
941 primary_key: left_metadata.primary_key.clone(),
942 indexes: vec![], };
944 let right_table_def = TableDef {
945 table_id: right_metadata.table_id,
946 columns: right_metadata.columns.clone(),
947 primary_key: right_metadata.primary_key.clone(),
948 indexes: vec![], };
950
951 let left_result = execute_internal(store, left, &left_table_def, position)?;
953 let right_result = execute_internal(store, right, &right_table_def, position)?;
954
955 let mut output_rows = Vec::new();
956
957 match join_type {
958 crate::parser::JoinType::Inner => {
959 for left_row in &left_result.rows {
961 for right_row in &right_result.rows {
962 let combined_row: Vec<Value> =
964 left_row.iter().chain(right_row.iter()).cloned().collect();
965
966 if evaluate_join_conditions(&combined_row, on_conditions) {
968 output_rows.push(combined_row);
969 kimberlite_properties::sometimes!(
970 output_rows.len() > MAX_JOIN_OUTPUT_ROWS,
971 "query.join_output_row_cap_hit",
972 "INNER JOIN output hits MAX_JOIN_OUTPUT_ROWS (1M) cap"
973 );
974 if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
975 return Err(QueryError::UnsupportedFeature(format!(
976 "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
977 )));
978 }
979 }
980 }
981 }
982 }
983 crate::parser::JoinType::Left => {
984 for left_row in &left_result.rows {
986 let mut matched = false;
987 for right_row in &right_result.rows {
988 let combined_row: Vec<Value> =
990 left_row.iter().chain(right_row.iter()).cloned().collect();
991
992 if evaluate_join_conditions(&combined_row, on_conditions) {
994 output_rows.push(combined_row);
995 matched = true;
996 kimberlite_properties::sometimes!(
997 output_rows.len() > MAX_JOIN_OUTPUT_ROWS,
998 "query.left_join_output_row_cap_hit",
999 "LEFT JOIN output hits MAX_JOIN_OUTPUT_ROWS (1M) cap"
1000 );
1001 if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
1002 return Err(QueryError::UnsupportedFeature(format!(
1003 "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
1004 )));
1005 }
1006 }
1007 }
1008
1009 if !matched {
1011 let right_nulls = vec![Value::Null; right_result.columns.len()];
1012 let combined_row: Vec<Value> = left_row
1013 .iter()
1014 .cloned()
1015 .chain(right_nulls.into_iter())
1016 .collect();
1017 output_rows.push(combined_row);
1018 }
1019 }
1020 }
1021 crate::parser::JoinType::Right => {
1022 for right_row in &right_result.rows {
1026 let mut matched = false;
1027 for left_row in &left_result.rows {
1028 let combined_row: Vec<Value> =
1029 left_row.iter().chain(right_row.iter()).cloned().collect();
1030 if evaluate_join_conditions(&combined_row, on_conditions) {
1031 output_rows.push(combined_row);
1032 matched = true;
1033 if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
1034 return Err(QueryError::UnsupportedFeature(format!(
1035 "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
1036 )));
1037 }
1038 }
1039 }
1040 if !matched {
1041 let left_nulls = vec![Value::Null; left_result.columns.len()];
1042 let combined_row: Vec<Value> = left_nulls
1043 .into_iter()
1044 .chain(right_row.iter().cloned())
1045 .collect();
1046 output_rows.push(combined_row);
1047 }
1048 }
1049 }
1050 crate::parser::JoinType::Full => {
1051 let mut right_matched = vec![false; right_result.rows.len()];
1055 for left_row in &left_result.rows {
1056 let mut matched = false;
1057 for (rj, right_row) in right_result.rows.iter().enumerate() {
1058 let combined_row: Vec<Value> =
1059 left_row.iter().chain(right_row.iter()).cloned().collect();
1060 if evaluate_join_conditions(&combined_row, on_conditions) {
1061 output_rows.push(combined_row);
1062 matched = true;
1063 right_matched[rj] = true;
1064 if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
1065 return Err(QueryError::UnsupportedFeature(format!(
1066 "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
1067 )));
1068 }
1069 }
1070 }
1071 if !matched {
1072 let right_nulls = vec![Value::Null; right_result.columns.len()];
1073 let combined_row: Vec<Value> = left_row
1074 .iter()
1075 .cloned()
1076 .chain(right_nulls.into_iter())
1077 .collect();
1078 output_rows.push(combined_row);
1079 }
1080 }
1081 for (rj, right_row) in right_result.rows.iter().enumerate() {
1083 if !right_matched[rj] {
1084 let left_nulls = vec![Value::Null; left_result.columns.len()];
1085 let combined_row: Vec<Value> = left_nulls
1086 .into_iter()
1087 .chain(right_row.iter().cloned())
1088 .collect();
1089 output_rows.push(combined_row);
1090 }
1091 }
1092 }
1093 crate::parser::JoinType::Cross => {
1094 let estimated = left_result
1098 .rows
1099 .len()
1100 .saturating_mul(right_result.rows.len());
1101 if estimated > MAX_JOIN_OUTPUT_ROWS {
1102 return Err(QueryError::UnsupportedFeature(format!(
1103 "CROSS JOIN cardinality {estimated} exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective query"
1104 )));
1105 }
1106 for left_row in &left_result.rows {
1107 for right_row in &right_result.rows {
1108 let combined_row: Vec<Value> =
1109 left_row.iter().chain(right_row.iter()).cloned().collect();
1110 output_rows.push(combined_row);
1111 }
1112 }
1113 }
1114 }
1115
1116 kimberlite_properties::sometimes!(
1117 output_rows.len() > 1,
1118 "query.join_produces_multi_row_output",
1119 "join execution produces more than one output row"
1120 );
1121
1122 Ok(QueryResult {
1123 columns: column_names.to_vec(),
1124 rows: output_rows,
1125 })
1126}
1127
1128#[allow(clippy::too_many_arguments)]
1130fn execute_aggregate<S: ProjectionStore>(
1131 store: &mut S,
1132 source: &QueryPlan,
1133 group_by_cols: &[usize],
1134 aggregates: &[crate::parser::AggregateFunction],
1135 aggregate_filters: &[Option<crate::plan::Filter>],
1136 column_names: &[ColumnName],
1137 metadata: &crate::plan::TableMetadata,
1138 having: &[crate::parser::HavingCondition],
1139 position: Option<Offset>,
1140) -> Result<QueryResult> {
1141 use std::collections::HashMap;
1142
1143 let dummy_table_def = TableDef {
1146 table_id: metadata.table_id,
1147 columns: metadata.columns.clone(),
1148 primary_key: metadata.primary_key.clone(),
1149 indexes: vec![],
1150 };
1151 let source_result = execute_internal(store, source, &dummy_table_def, position)?;
1152
1153 let mut groups: HashMap<Vec<Value>, AggregateState> = HashMap::new();
1155
1156 for row in source_result.rows {
1157 let group_key: Vec<Value> = if group_by_cols.is_empty() {
1159 vec![]
1161 } else {
1162 group_by_cols
1163 .iter()
1164 .map(|&idx| row.get(idx).cloned().unwrap_or(Value::Null))
1165 .collect()
1166 };
1167
1168 kimberlite_properties::sometimes!(
1170 !groups.contains_key(&group_key) && groups.len() >= MAX_GROUP_COUNT,
1171 "query.group_by_cardinality_cap_hit",
1172 "GROUP BY hits MAX_GROUP_COUNT (100k) distinct group cap"
1173 );
1174 if !groups.contains_key(&group_key) && groups.len() >= MAX_GROUP_COUNT {
1175 return Err(QueryError::UnsupportedFeature(format!(
1176 "GROUP BY cardinality exceeds maximum of {MAX_GROUP_COUNT} distinct groups"
1177 )));
1178 }
1179
1180 let state = groups.entry(group_key).or_insert_with(AggregateState::new);
1182 state.update(&row, aggregates, aggregate_filters, metadata)?;
1183 }
1184
1185 let group_by_count = group_by_cols.len();
1187 let mut result_rows = Vec::new();
1188 for (group_key, state) in groups {
1189 let agg_values = state.finalize(aggregates);
1190
1191 if !having.is_empty() && !evaluate_having(having, aggregates, &agg_values, group_by_count) {
1193 continue;
1194 }
1195
1196 let mut result_row = group_key; result_row.extend(agg_values); result_rows.push(result_row);
1199 }
1200
1201 if result_rows.is_empty() && group_by_cols.is_empty() && having.is_empty() {
1203 let state = AggregateState::new();
1204 let agg_values = state.finalize(aggregates);
1205 result_rows.push(agg_values);
1206 }
1207
1208 Ok(QueryResult {
1209 columns: column_names.to_vec(),
1210 rows: result_rows,
1211 })
1212}
1213
1214fn evaluate_having(
1218 having: &[crate::parser::HavingCondition],
1219 aggregates: &[crate::parser::AggregateFunction],
1220 agg_values: &[Value],
1221 _group_by_count: usize,
1222) -> bool {
1223 having.iter().all(|condition| match condition {
1224 crate::parser::HavingCondition::AggregateComparison {
1225 aggregate,
1226 op,
1227 value,
1228 } => {
1229 let agg_idx = aggregates.iter().position(|a| a == aggregate);
1231 let Some(idx) = agg_idx else {
1232 return false;
1233 };
1234 let Some(agg_value) = agg_values.get(idx) else {
1235 return false;
1236 };
1237
1238 match op {
1240 crate::parser::HavingOp::Eq => agg_value == value,
1241 crate::parser::HavingOp::Lt => agg_value.compare(value) == Some(Ordering::Less),
1242 crate::parser::HavingOp::Le => matches!(
1243 agg_value.compare(value),
1244 Some(Ordering::Less | Ordering::Equal)
1245 ),
1246 crate::parser::HavingOp::Gt => agg_value.compare(value) == Some(Ordering::Greater),
1247 crate::parser::HavingOp::Ge => matches!(
1248 agg_value.compare(value),
1249 Some(Ordering::Greater | Ordering::Equal)
1250 ),
1251 }
1252 }
1253 })
1254}
1255
/// Running accumulator for the aggregates of a single GROUP BY group.
///
/// The per-aggregate vectors are indexed by each aggregate's position in the
/// query's aggregate list. `update` grows them lazily to that list's length.
#[derive(Debug, Clone)]
struct AggregateState {
    /// Rows folded into this group. `update` increments it only when no
    /// aggregate has a filter; `finalize` uses it as a fallback when
    /// `per_agg_counts` has no entry.
    count: i64,
    /// Per aggregate: rows that passed that aggregate's filter (all rows when
    /// it has none).
    per_agg_counts: Vec<i64>,
    /// Per aggregate: non-NULL inputs seen by a `COUNT(col)` aggregate.
    non_null_counts: Vec<i64>,
    /// Per aggregate: running sum for `SUM` / `AVG` (`None` until a non-NULL
    /// input is seen).
    sums: Vec<Option<Value>>,
    /// Per aggregate: running minimum for `MIN` (`None` until a non-NULL input
    /// is seen).
    mins: Vec<Option<Value>>,
    /// Per aggregate: running maximum for `MAX` (`None` until a non-NULL input
    /// is seen).
    maxs: Vec<Option<Value>>,
}
1269
1270impl AggregateState {
1271 fn new() -> Self {
1272 Self {
1273 count: 0,
1274 per_agg_counts: Vec::new(),
1275 non_null_counts: Vec::new(),
1276 sums: Vec::new(),
1277 mins: Vec::new(),
1278 maxs: Vec::new(),
1279 }
1280 }
1281
1282 fn update(
1283 &mut self,
1284 row: &[Value],
1285 aggregates: &[crate::parser::AggregateFunction],
1286 aggregate_filters: &[Option<crate::plan::Filter>],
1287 metadata: &crate::plan::TableMetadata,
1288 ) -> Result<()> {
1289 debug_assert!(!row.is_empty(), "row must have at least one column");
1291
1292 assert!(
1295 aggregates.len() <= MAX_AGGREGATES_PER_QUERY,
1296 "too many aggregates ({} > {})",
1297 aggregates.len(),
1298 MAX_AGGREGATES_PER_QUERY
1299 );
1300
1301 let any_filter = aggregate_filters.iter().any(std::option::Option::is_some);
1304 if !any_filter {
1305 self.count += 1;
1306 }
1307
1308 while self.sums.len() < aggregates.len() {
1310 self.non_null_counts.push(0);
1311 self.sums.push(None);
1312 self.mins.push(None);
1313 self.maxs.push(None);
1314 self.per_agg_counts.push(0);
1315 }
1316
1317 debug_assert_eq!(
1319 self.sums.len(),
1320 self.non_null_counts.len(),
1321 "aggregate state vectors out of sync"
1322 );
1323 debug_assert_eq!(self.sums.len(), self.mins.len());
1324 debug_assert_eq!(self.sums.len(), self.maxs.len());
1325
1326 let find_col_idx = |col: &ColumnName| -> usize {
1328 metadata
1329 .columns
1330 .iter()
1331 .position(|c| &c.name == col)
1332 .unwrap_or(0)
1333 };
1334
1335 for (i, agg) in aggregates.iter().enumerate() {
1336 if let Some(Some(filter)) = aggregate_filters.get(i) {
1340 if !filter.matches(row) {
1341 continue;
1342 }
1343 }
1344 self.per_agg_counts[i] += 1;
1347 match agg {
1348 crate::parser::AggregateFunction::CountStar => {
1349 }
1351 crate::parser::AggregateFunction::Count(col) => {
1352 let col_idx = find_col_idx(col);
1354 if let Some(val) = row.get(col_idx) {
1355 if !val.is_null() {
1356 self.non_null_counts[i] += 1;
1357 }
1358 }
1359 }
1360 crate::parser::AggregateFunction::Sum(col) => {
1361 let col_idx = find_col_idx(col);
1362 if let Some(val) = row.get(col_idx) {
1363 if !val.is_null() {
1364 self.sums[i] = Some(add_values(&self.sums[i], val)?);
1365 }
1366 }
1367 }
1368 crate::parser::AggregateFunction::Avg(col) => {
1369 let col_idx = find_col_idx(col);
1371 if let Some(val) = row.get(col_idx) {
1372 if !val.is_null() {
1373 self.sums[i] = Some(add_values(&self.sums[i], val)?);
1374 }
1375 }
1376 }
1377 crate::parser::AggregateFunction::Min(col) => {
1378 let col_idx = find_col_idx(col);
1379 if let Some(val) = row.get(col_idx) {
1380 if !val.is_null() {
1381 self.mins[i] = Some(min_value(&self.mins[i], val));
1382 }
1383 }
1384 }
1385 crate::parser::AggregateFunction::Max(col) => {
1386 let col_idx = find_col_idx(col);
1387 if let Some(val) = row.get(col_idx) {
1388 if !val.is_null() {
1389 self.maxs[i] = Some(max_value(&self.maxs[i], val));
1390 }
1391 }
1392 }
1393 }
1394 }
1395
1396 debug_assert_eq!(
1398 self.sums.len(),
1399 aggregates.len(),
1400 "aggregate state must match aggregate count after update"
1401 );
1402
1403 Ok(())
1404 }
1405
1406 fn finalize(&self, aggregates: &[crate::parser::AggregateFunction]) -> Vec<Value> {
1407 let mut result = Vec::new();
1408
1409 for (i, agg) in aggregates.iter().enumerate() {
1414 let per_agg_count = self.per_agg_counts.get(i).copied().unwrap_or(self.count);
1415 let value = match agg {
1416 crate::parser::AggregateFunction::CountStar => Value::BigInt(per_agg_count),
1417 crate::parser::AggregateFunction::Count(_) => {
1418 Value::BigInt(self.non_null_counts.get(i).copied().unwrap_or(0))
1420 }
1421 crate::parser::AggregateFunction::Sum(_) => self
1422 .sums
1423 .get(i)
1424 .and_then(std::clone::Clone::clone)
1425 .unwrap_or(Value::Null),
1426 crate::parser::AggregateFunction::Avg(_) => {
1427 if per_agg_count == 0 {
1429 Value::Null
1430 } else {
1431 kimberlite_properties::never!(
1434 per_agg_count == 0,
1435 "query.avg_divide_by_zero",
1436 "AVG divide_value must never be reached with per_agg_count == 0"
1437 );
1438 match self.sums.get(i).and_then(|v| v.as_ref()) {
1439 Some(sum) => divide_value(sum, per_agg_count).unwrap_or(Value::Null),
1440 None => Value::Null,
1441 }
1442 }
1443 }
1444 crate::parser::AggregateFunction::Min(_) => self
1445 .mins
1446 .get(i)
1447 .and_then(std::clone::Clone::clone)
1448 .unwrap_or(Value::Null),
1449 crate::parser::AggregateFunction::Max(_) => self
1450 .maxs
1451 .get(i)
1452 .and_then(std::clone::Clone::clone)
1453 .unwrap_or(Value::Null),
1454 };
1455 result.push(value);
1456 }
1457
1458 result
1459 }
1460}
1461
1462fn add_values(a: &Option<Value>, b: &Value) -> Result<Value> {
1467 match a {
1468 None => Ok(b.clone()),
1469 Some(a_val) => match (a_val, b) {
1470 (Value::BigInt(x), Value::BigInt(y)) => {
1471 let checked = x.checked_add(*y);
1472 kimberlite_properties::sometimes!(
1476 checked.is_none(),
1477 "query.sum_bigint_overflow_detected",
1478 "SUM(BIGINT) overflow detected by checked_add"
1479 );
1480 if let Some(sum) = checked {
1481 kimberlite_properties::never!(
1484 sum != x.wrapping_add(*y)
1485 || (*x > 0 && *y > 0 && sum < 0)
1486 || (*x < 0 && *y < 0 && sum > 0),
1487 "query.sum_bigint_silent_wrap",
1488 "SUM(BIGINT) checked_add returned Some() for an overflowing result"
1489 );
1490 Ok(Value::BigInt(sum))
1491 } else {
1492 Err(QueryError::TypeMismatch {
1493 expected: "BigInt (non-overflowing)".to_string(),
1494 actual: format!("overflow: {x} + {y}"),
1495 })
1496 }
1497 }
1498 (Value::Integer(x), Value::Integer(y)) => x
1499 .checked_add(*y)
1500 .map(Value::Integer)
1501 .ok_or_else(|| QueryError::TypeMismatch {
1502 expected: "Integer (non-overflowing)".to_string(),
1503 actual: format!("overflow: {x} + {y}"),
1504 }),
1505 (Value::SmallInt(x), Value::SmallInt(y)) => x
1506 .checked_add(*y)
1507 .map(Value::SmallInt)
1508 .ok_or_else(|| QueryError::TypeMismatch {
1509 expected: "SmallInt (non-overflowing)".to_string(),
1510 actual: format!("overflow: {x} + {y}"),
1511 }),
1512 (Value::TinyInt(x), Value::TinyInt(y)) => x
1513 .checked_add(*y)
1514 .map(Value::TinyInt)
1515 .ok_or_else(|| QueryError::TypeMismatch {
1516 expected: "TinyInt (non-overflowing)".to_string(),
1517 actual: format!("overflow: {x} + {y}"),
1518 }),
1519 (Value::Real(x), Value::Real(y)) => Ok(Value::Real(x + y)),
1520 (Value::Decimal(x, sx), Value::Decimal(y, sy)) if sx == sy => x
1521 .checked_add(*y)
1522 .map(|sum| Value::Decimal(sum, *sx))
1523 .ok_or_else(|| QueryError::TypeMismatch {
1524 expected: "Decimal (non-overflowing)".to_string(),
1525 actual: format!("overflow: {x} + {y}"),
1526 }),
1527 _ => Err(QueryError::TypeMismatch {
1528 expected: format!("{a_val:?}"),
1529 actual: format!("{b:?}"),
1530 }),
1531 },
1532 }
1533}
1534
1535fn min_value(a: &Option<Value>, b: &Value) -> Value {
1537 match a {
1538 None => b.clone(),
1539 Some(a_val) => {
1540 if let Some(ord) = a_val.compare(b) {
1541 if ord == Ordering::Less {
1542 a_val.clone()
1543 } else {
1544 b.clone()
1545 }
1546 } else {
1547 a_val.clone() }
1549 }
1550 }
1551}
1552
1553fn max_value(a: &Option<Value>, b: &Value) -> Value {
1555 match a {
1556 None => b.clone(),
1557 Some(a_val) => {
1558 if let Some(ord) = a_val.compare(b) {
1559 if ord == Ordering::Greater {
1560 a_val.clone()
1561 } else {
1562 b.clone()
1563 }
1564 } else {
1565 a_val.clone() }
1567 }
1568 }
1569}
1570
1571#[allow(clippy::cast_precision_loss)]
1576fn divide_value(val: &Value, count: i64) -> Option<Value> {
1577 if count == 0 {
1579 return Some(Value::Null);
1580 }
1581
1582 match val {
1583 Value::BigInt(x) => Some(Value::Real(*x as f64 / count as f64)),
1584 Value::Integer(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1585 Value::SmallInt(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1586 Value::TinyInt(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1587 Value::Real(x) => Some(Value::Real(x / count as f64)),
1588 Value::Decimal(x, scale) => {
1589 let divisor = 10_i128.pow(u32::from(*scale));
1591 let float_val = *x as f64 / divisor as f64;
1592 Some(Value::Real(float_val / count as f64))
1593 }
1594 _ => None,
1595 }
1596}
1597
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::{Filter, FilterCondition, FilterOp};

    /// Collects the first column of every row, preserving row order.
    fn leading_values(rows: &[Row]) -> Vec<Value> {
        rows.iter().map(|row| row[0].clone()).collect()
    }

    #[test]
    fn test_project_row() {
        let source = vec![
            Value::BigInt(1),
            Value::Text("alice".to_string()),
            Value::BigInt(30),
        ];
        let expected = vec![Value::BigInt(1), Value::BigInt(30)];

        assert_eq!(project_row(&source, &[0, 2]), expected);
    }

    #[test]
    fn test_project_row_all() {
        // An empty projection list keeps every column.
        let source = vec![Value::BigInt(1), Value::Text("bob".to_string())];

        assert_eq!(project_row(&source, &[]), source);
    }

    #[test]
    fn test_filter_matches() {
        let row = vec![Value::BigInt(42), Value::Text("alice".to_string())];

        let hit = Filter::single(FilterCondition {
            column_idx: 0,
            op: FilterOp::Eq,
            value: Value::BigInt(42),
        });
        let miss = Filter::single(FilterCondition {
            column_idx: 0,
            op: FilterOp::Eq,
            value: Value::BigInt(99),
        });

        assert!(hit.matches(&row), "equal value should match");
        assert!(!miss.matches(&row), "different value should not match");
    }

    #[test]
    fn test_sort_rows() {
        let mut rows: Vec<Row> = [(3, "c"), (1, "a"), (2, "b")]
            .iter()
            .map(|&(id, tag)| vec![Value::BigInt(id), Value::Text(tag.to_string())])
            .collect();
        let spec = SortSpec {
            columns: vec![(0, ScanOrder::Ascending)],
        };

        sort_rows(&mut rows, &spec);

        assert_eq!(
            leading_values(&rows),
            vec![Value::BigInt(1), Value::BigInt(2), Value::BigInt(3)]
        );
    }

    #[test]
    fn test_sort_rows_descending() {
        let mut rows: Vec<Row> = [1, 3, 2].iter().map(|&n| vec![Value::BigInt(n)]).collect();
        let spec = SortSpec {
            columns: vec![(0, ScanOrder::Descending)],
        };

        sort_rows(&mut rows, &spec);

        assert_eq!(
            leading_values(&rows),
            vec![Value::BigInt(3), Value::BigInt(2), Value::BigInt(1)]
        );
    }
}