1#![allow(clippy::ref_option)]
4#![allow(clippy::trivially_copy_pass_by_ref)]
5#![allow(clippy::items_after_statements)]
6
7use std::cmp::Ordering;
8use std::ops::Bound;
9
/// Over-fetch factor applied to a query's `LIMIT` by the range and index scans
/// when an `ORDER BY` sort runs after the scan.
const SCAN_LIMIT_MULTIPLIER_WITH_SORT: usize = 10;

/// Over-fetch factor applied to a query's `LIMIT` by the range and index scans
/// when no `ORDER BY` sort follows the scan.
const SCAN_LIMIT_MULTIPLIER_NO_SORT: usize = 2;

/// Number of entries the range and index scans request from the store when the
/// query carries no `LIMIT`.
const DEFAULT_SCAN_LIMIT: usize = 10_000;

/// Maximum number of aggregate functions per query; asserted by
/// `AggregateState::update`.
const MAX_AGGREGATES_PER_QUERY: usize = 100;

/// Maximum number of rows a join may emit; `execute_join` returns an error once
/// its output grows past this.
const MAX_JOIN_OUTPUT_ROWS: usize = 1_000_000;

/// Maximum number of distinct `GROUP BY` groups; `execute_aggregate` returns an
/// error instead of creating a group beyond this count.
const MAX_GROUP_COUNT: usize = 100_000;
55
56use bytes::Bytes;
57use kimberlite_store::{Key, ProjectionStore, TableId};
58use kimberlite_types::Offset;
59
60use crate::error::{QueryError, Result};
61use crate::key_encoder::successor_key;
62use crate::plan::{QueryPlan, ScanOrder, SortSpec};
63use crate::schema::{ColumnName, TableDef};
64use crate::value::Value;
65
/// Rows produced by executing a query plan, together with their column names.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// Names of the result columns, in output order.
    pub columns: Vec<ColumnName>,
    /// Result rows; each row is a vector of values.
    pub rows: Vec<Row>,
}
74
75impl QueryResult {
76 pub fn empty(columns: Vec<ColumnName>) -> Self {
78 Self {
79 columns,
80 rows: vec![],
81 }
82 }
83
84 pub fn len(&self) -> usize {
86 self.rows.len()
87 }
88
89 pub fn is_empty(&self) -> bool {
91 self.rows.is_empty()
92 }
93}
94
/// A single result row: one [`Value`] per column.
pub type Row = Vec<Value>;
97
98#[allow(clippy::too_many_arguments)]
100fn execute_index_scan<S: ProjectionStore>(
101 store: &mut S,
102 metadata: &crate::plan::TableMetadata,
103 index_id: u64,
104 start: &Bound<Key>,
105 end: &Bound<Key>,
106 filter: &Option<crate::plan::Filter>,
107 limit: &Option<usize>,
108 order: &ScanOrder,
109 order_by: &Option<crate::plan::SortSpec>,
110 columns: &[usize],
111 column_names: &[ColumnName],
112 position: Option<Offset>,
113) -> Result<QueryResult> {
114 let (start_key, end_key) = bounds_to_range(start, end);
115
116 let scan_limit = if order_by.is_some() {
118 limit
119 .map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_WITH_SORT))
120 .unwrap_or(DEFAULT_SCAN_LIMIT)
121 } else {
122 limit
123 .map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_NO_SORT))
124 .unwrap_or(DEFAULT_SCAN_LIMIT)
125 };
126
127 debug_assert!(scan_limit > 0, "scan_limit must be positive");
129
130 use std::collections::hash_map::DefaultHasher;
132 use std::hash::{Hash, Hasher};
133
134 let mut hasher = DefaultHasher::new();
135 metadata.table_id.as_u64().hash(&mut hasher);
136 index_id.hash(&mut hasher);
137 let index_table_id = TableId::new(hasher.finish());
138
139 let index_pairs = match position {
141 Some(pos) => store.scan_at(index_table_id, start_key..end_key, scan_limit, pos)?,
142 None => store.scan(index_table_id, start_key..end_key, scan_limit)?,
143 };
144
145 let mut full_rows = Vec::new();
146 let index_iter: Box<dyn Iterator<Item = &(Key, Bytes)>> = match order {
147 ScanOrder::Ascending => Box::new(index_pairs.iter()),
148 ScanOrder::Descending => Box::new(index_pairs.iter().rev()),
149 };
150
151 for (index_key, _) in index_iter {
152 let pk_key = extract_pk_from_index_key(index_key, metadata);
154
155 let bytes_opt = match position {
157 Some(pos) => store.get_at(metadata.table_id, &pk_key, pos)?,
158 None => store.get(metadata.table_id, &pk_key)?,
159 };
160 if let Some(bytes) = bytes_opt {
161 let full_row = decode_row(&bytes, metadata)?;
162
163 if let Some(f) = filter {
165 if !f.matches(&full_row) {
166 continue;
167 }
168 }
169
170 full_rows.push(full_row);
171
172 if order_by.is_none() {
174 if let Some(lim) = limit {
175 if full_rows.len() >= *lim {
176 break;
177 }
178 }
179 }
180 }
181 }
182
183 if let Some(sort_spec) = order_by {
185 sort_rows(&mut full_rows, sort_spec);
186 }
187
188 if let Some(lim) = limit {
190 full_rows.truncate(*lim);
191 }
192
193 let rows: Vec<Row> = full_rows
195 .iter()
196 .map(|full_row| project_row(full_row, columns))
197 .collect();
198
199 Ok(QueryResult {
200 columns: column_names.to_vec(),
201 rows,
202 })
203}
204
205#[allow(clippy::too_many_arguments)]
207fn execute_table_scan<S: ProjectionStore>(
208 store: &mut S,
209 metadata: &crate::plan::TableMetadata,
210 filter: &Option<crate::plan::Filter>,
211 limit: &Option<usize>,
212 order: &Option<SortSpec>,
213 columns: &[usize],
214 column_names: &[ColumnName],
215 position: Option<Offset>,
216) -> Result<QueryResult> {
217 let scan_limit = limit.map(|l| l * 10).unwrap_or(100_000);
219 let pairs = match position {
220 Some(pos) => store.scan_at(metadata.table_id, Key::min()..Key::max(), scan_limit, pos)?,
221 None => store.scan(metadata.table_id, Key::min()..Key::max(), scan_limit)?,
222 };
223
224 let mut full_rows = Vec::new();
225
226 for (_, bytes) in &pairs {
227 let full_row = decode_row(bytes, metadata)?;
228
229 if let Some(f) = filter {
231 if !f.matches(&full_row) {
232 continue;
233 }
234 }
235
236 full_rows.push(full_row);
237 }
238
239 if let Some(sort_spec) = order {
241 sort_rows(&mut full_rows, sort_spec);
242 }
243
244 if let Some(lim) = limit {
246 full_rows.truncate(*lim);
247 }
248
249 let rows: Vec<Row> = full_rows
251 .iter()
252 .map(|full_row| project_row(full_row, columns))
253 .collect();
254
255 Ok(QueryResult {
256 columns: column_names.to_vec(),
257 rows,
258 })
259}
260
261#[allow(clippy::too_many_arguments)]
263fn execute_range_scan<S: ProjectionStore>(
264 store: &mut S,
265 metadata: &crate::plan::TableMetadata,
266 start: &Bound<Key>,
267 end: &Bound<Key>,
268 filter: &Option<crate::plan::Filter>,
269 limit: &Option<usize>,
270 order: &ScanOrder,
271 order_by: &Option<crate::plan::SortSpec>,
272 columns: &[usize],
273 column_names: &[ColumnName],
274 position: Option<Offset>,
275) -> Result<QueryResult> {
276 let (start_key, end_key) = bounds_to_range(start, end);
277
278 let scan_limit = if order_by.is_some() {
280 limit
281 .map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_WITH_SORT))
282 .unwrap_or(DEFAULT_SCAN_LIMIT)
283 } else {
284 limit
285 .map(|l| l.saturating_mul(SCAN_LIMIT_MULTIPLIER_NO_SORT))
286 .unwrap_or(DEFAULT_SCAN_LIMIT)
287 };
288
289 debug_assert!(scan_limit > 0, "scan_limit must be positive");
291
292 let pairs = match position {
293 Some(pos) => store.scan_at(metadata.table_id, start_key..end_key, scan_limit, pos)?,
294 None => store.scan(metadata.table_id, start_key..end_key, scan_limit)?,
295 };
296
297 let mut full_rows = Vec::new();
298 let row_iter: Box<dyn Iterator<Item = &(Key, Bytes)>> = match order {
299 ScanOrder::Ascending => Box::new(pairs.iter()),
300 ScanOrder::Descending => Box::new(pairs.iter().rev()),
301 };
302
303 for (_, bytes) in row_iter {
304 let full_row = decode_row(bytes, metadata)?;
305
306 if let Some(f) = filter {
308 if !f.matches(&full_row) {
309 continue;
310 }
311 }
312
313 full_rows.push(full_row);
314
315 if order_by.is_none() {
317 if let Some(lim) = limit {
318 if full_rows.len() >= *lim {
319 break;
320 }
321 }
322 }
323 }
324
325 if let Some(sort_spec) = order_by {
327 sort_rows(&mut full_rows, sort_spec);
328 }
329
330 if let Some(lim) = limit {
332 full_rows.truncate(*lim);
333 }
334
335 let rows: Vec<Row> = full_rows
337 .iter()
338 .map(|full_row| project_row(full_row, columns))
339 .collect();
340
341 Ok(QueryResult {
342 columns: column_names.to_vec(),
343 rows,
344 })
345}
346
347fn execute_point_lookup<S: ProjectionStore>(
349 store: &mut S,
350 metadata: &crate::plan::TableMetadata,
351 key: &Key,
352 columns: &[usize],
353 column_names: &[ColumnName],
354 position: Option<Offset>,
355) -> Result<QueryResult> {
356 let result = match position {
357 Some(pos) => store.get_at(metadata.table_id, key, pos)?,
358 None => store.get(metadata.table_id, key)?,
359 };
360 match result {
361 Some(bytes) => {
362 let row = decode_and_project(&bytes, columns, metadata)?;
363 Ok(QueryResult {
364 columns: column_names.to_vec(),
365 rows: vec![row],
366 })
367 }
368 None => Ok(QueryResult::empty(column_names.to_vec())),
369 }
370}
371
/// Shared entry point behind [`execute`] and [`execute_at`]; also used for
/// recursive execution of child plans.
///
/// Records a property-coverage marker when `position` is set, delegates to
/// `execute_internal_inner`, and — in test or `sim` builds only — asserts that
/// the result's column count and every row's width equal the number of column
/// names the plan declares.
///
/// # Errors
///
/// Propagates any error from `execute_internal_inner`.
#[allow(clippy::too_many_lines, clippy::used_underscore_binding)]
fn execute_internal<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    _table_def: &TableDef, position: Option<Offset>,
) -> Result<QueryResult> {
    // Coverage marker: at least some executions should run with a position.
    kimberlite_properties::sometimes!(
        position.is_some(),
        "query.time_travel_at_position",
        "query executes at a pinned historical log offset"
    );
    let result = execute_internal_inner(store, plan, _table_def, position)?;

    // Result-shape invariants, compiled only for tests and the `sim` feature.
    #[cfg(any(test, feature = "sim"))]
    {
        let _expected_cols = plan.column_names().len();
        kimberlite_properties::always!(
            result.columns.len() == _expected_cols,
            "query.result_columns_match_plan",
            "query result column count must equal plan-declared schema column count"
        );
        kimberlite_properties::always!(
            result.rows.iter().all(|r| r.len() == _expected_cols),
            "query.row_width_matches_columns",
            "every result row must have width equal to declared column count"
        );
    }
    Ok(result)
}
406
/// Dispatches a [`QueryPlan`] variant to the executor function for that
/// variant, forwarding the plan's fields and the optional read `position`.
///
/// `_table_def` is accepted but not used by any branch.
///
/// # Errors
///
/// Propagates whatever error the selected executor returns.
#[allow(clippy::too_many_lines)]
fn execute_internal_inner<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    _table_def: &TableDef,
    position: Option<Offset>,
) -> Result<QueryResult> {
    match plan {
        // Single-key fetch.
        QueryPlan::PointLookup {
            metadata,
            key,
            columns,
            column_names,
        } => execute_point_lookup(store, metadata, key, columns, column_names, position),

        // Key-range scan over the base table.
        QueryPlan::RangeScan {
            metadata,
            start,
            end,
            filter,
            limit,
            order,
            order_by,
            columns,
            column_names,
        } => execute_range_scan(
            store,
            metadata,
            start,
            end,
            filter,
            limit,
            order,
            order_by,
            columns,
            column_names,
            position,
        ),

        // Scan over an index, resolving entries back to base rows. Any
        // remaining fields of the variant are ignored (`..`).
        QueryPlan::IndexScan {
            metadata,
            index_id,
            start,
            end,
            filter,
            limit,
            order,
            order_by,
            columns,
            column_names,
            ..
        } => execute_index_scan(
            store,
            metadata,
            *index_id,
            start,
            end,
            filter,
            limit,
            order,
            order_by,
            columns,
            column_names,
            position,
        ),

        // Full scan of the table.
        QueryPlan::TableScan {
            metadata,
            filter,
            limit,
            order,
            columns,
            column_names,
        } => execute_table_scan(
            store,
            metadata,
            filter,
            limit,
            order,
            columns,
            column_names,
            position,
        ),

        // Aggregation over a child plan; `group_by_names` is not needed here.
        QueryPlan::Aggregate {
            metadata,
            source,
            group_by_cols,
            group_by_names: _,
            aggregates,
            column_names,
            having,
        } => execute_aggregate(
            store,
            source,
            group_by_cols,
            aggregates,
            column_names,
            metadata,
            having,
            position,
        ),

        // Join of two child plans.
        QueryPlan::Join {
            join_type,
            left,
            right,
            on_conditions,
            columns,
            column_names,
        } => execute_join(
            store,
            join_type,
            left,
            right,
            on_conditions,
            columns,
            column_names,
            position,
        ),

        // Post-processing wrapper (filter / CASE columns / order / limit)
        // around a child plan.
        QueryPlan::Materialize {
            source,
            filter,
            case_columns,
            order,
            limit,
            column_names,
        } => execute_materialize(
            store,
            source,
            filter,
            case_columns,
            order,
            limit,
            column_names,
            position,
        ),
    }
}
547
548#[allow(clippy::too_many_arguments)]
550fn execute_materialize<S: ProjectionStore>(
551 store: &mut S,
552 source: &QueryPlan,
553 filter: &Option<crate::plan::Filter>,
554 case_columns: &[crate::plan::CaseColumnDef],
555 order: &Option<SortSpec>,
556 limit: &Option<usize>,
557 column_names: &[ColumnName],
558 position: Option<Offset>,
559) -> Result<QueryResult> {
560 let dummy_def = TableDef {
562 table_id: kimberlite_store::TableId::from(0u64),
563 columns: vec![],
564 primary_key: vec![],
565 indexes: vec![],
566 };
567 let mut source_result = execute_internal(store, source, &dummy_def, position)?;
568
569 kimberlite_properties::sometimes!(
570 filter.is_some() || order.is_some() || limit.is_some(),
571 "query.materialize_applies_filter_order_limit",
572 "Materialize wrapper applies at least one of filter, order, or limit"
573 );
574
575 if let Some(f) = filter {
577 source_result.rows.retain(|row| f.matches(row));
578 }
579
580 if !case_columns.is_empty() {
582 kimberlite_properties::sometimes!(
583 !source_result.rows.is_empty(),
584 "query.case_when_evaluated",
585 "CASE WHEN computed columns evaluated against at least one row"
586 );
587 for row in &mut source_result.rows {
588 for case_col in case_columns {
589 let val = evaluate_case_column(case_col, row);
590 row.push(val);
591 }
592 }
593 }
594
595 if let Some(spec) = order {
597 sort_rows(&mut source_result.rows, spec);
598 }
599
600 if let Some(n) = limit {
602 source_result.rows.truncate(*n);
603 }
604
605 Ok(QueryResult {
607 columns: column_names.to_vec(),
608 rows: source_result.rows,
609 })
610}
611
612fn evaluate_case_column(case_col: &crate::plan::CaseColumnDef, row: &[Value]) -> Value {
614 for clause in &case_col.when_clauses {
615 if clause.condition.matches(row) {
616 return clause.result.clone();
617 }
618 }
619 case_col.else_value.clone()
620}
621
/// Executes `plan` against `store` without a pinned read position
/// (`position` is passed as `None`).
///
/// # Errors
///
/// Propagates any error raised while executing the plan.
pub fn execute<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    table_def: &TableDef,
) -> Result<QueryResult> {
    execute_internal(store, plan, table_def, None)
}
630
/// Executes `plan` against `store` with all reads pinned to `position`
/// (store reads use the `scan_at` / `get_at` variants).
///
/// # Errors
///
/// Propagates any error raised while executing the plan.
pub fn execute_at<S: ProjectionStore>(
    store: &mut S,
    plan: &QueryPlan,
    table_def: &TableDef,
    position: Offset,
) -> Result<QueryResult> {
    execute_internal(store, plan, table_def, Some(position))
}
640
641fn bounds_to_range(start: &Bound<Key>, end: &Bound<Key>) -> (Key, Key) {
649 let start_key = match start {
650 Bound::Included(k) => k.clone(),
651 Bound::Excluded(k) => successor_key(k),
652 Bound::Unbounded => Key::min(),
653 };
654
655 let end_key = match end {
656 Bound::Included(k) => successor_key(k),
657 Bound::Excluded(k) => k.clone(),
658 Bound::Unbounded => Key::max(),
659 };
660
661 (start_key, end_key)
662}
663
664fn extract_pk_from_index_key(index_key: &Key, metadata: &crate::plan::TableMetadata) -> Key {
673 use crate::key_encoder::{decode_key, encode_key};
674
675 let all_values = decode_key(index_key);
677
678 let pk_count = metadata.primary_key.len();
680
681 debug_assert!(pk_count > 0, "primary key columns must be non-empty");
683 debug_assert!(
684 all_values.len() >= pk_count,
685 "index key must contain at least the primary key values"
686 );
687
688 let pk_values: Vec<Value> = all_values
691 .iter()
692 .skip(all_values.len() - pk_count)
693 .cloned()
694 .collect();
695
696 debug_assert_eq!(
697 pk_values.len(),
698 pk_count,
699 "extracted primary key must have correct number of columns"
700 );
701
702 encode_key(&pk_values)
704}
705
706fn decode_row(bytes: &Bytes, metadata: &crate::plan::TableMetadata) -> Result<Row> {
708 let json: serde_json::Value = serde_json::from_slice(bytes)?;
709
710 let obj = json.as_object().ok_or_else(|| QueryError::TypeMismatch {
711 expected: "object".to_string(),
712 actual: format!("{json:?}"),
713 })?;
714
715 let mut row = Vec::with_capacity(metadata.columns.len());
716
717 for col_def in &metadata.columns {
718 let col_name = col_def.name.as_str();
719 let json_val = obj.get(col_name).unwrap_or(&serde_json::Value::Null);
720 let value = Value::from_json(json_val, col_def.data_type)?;
721 row.push(value);
722 }
723
724 Ok(row)
725}
726
727fn decode_and_project(
729 bytes: &Bytes,
730 columns: &[usize],
731 metadata: &crate::plan::TableMetadata,
732) -> Result<Row> {
733 let full_row = decode_row(bytes, metadata)?;
734 Ok(project_row(&full_row, columns))
735}
736
737fn project_row(full_row: &[Value], columns: &[usize]) -> Row {
739 debug_assert!(
741 columns.iter().all(|&idx| idx < full_row.len()),
742 "column index out of bounds: columns={:?}, row_len={}",
743 columns,
744 full_row.len()
745 );
746
747 if columns.is_empty() {
748 return full_row.to_vec();
750 }
751
752 let projected: Vec<Value> = columns
753 .iter()
754 .map(|&idx| {
755 full_row.get(idx).cloned().unwrap_or_else(|| {
756 panic!(
758 "column index {} out of bounds (row len {})",
759 idx,
760 full_row.len()
761 );
762 })
763 })
764 .collect();
765
766 debug_assert_eq!(
768 projected.len(),
769 columns.len(),
770 "projected row length mismatch"
771 );
772
773 projected
774}
775
776fn sort_rows(rows: &mut [Row], spec: &SortSpec) {
778 rows.sort_by(|a, b| {
779 for (col_idx, order) in &spec.columns {
780 let a_val = a.get(*col_idx);
781 let b_val = b.get(*col_idx);
782
783 let cmp = match (a_val, b_val) {
784 (Some(av), Some(bv)) => av.compare(bv).unwrap_or(Ordering::Equal),
785 (None, None) => Ordering::Equal,
786 (None, Some(_)) => Ordering::Less,
787 (Some(_), None) => Ordering::Greater,
788 };
789
790 if cmp != Ordering::Equal {
791 return match order {
792 ScanOrder::Ascending => cmp,
793 ScanOrder::Descending => cmp.reverse(),
794 };
795 }
796 }
797 Ordering::Equal
798 });
799}
800
801#[allow(clippy::too_many_arguments)]
803fn evaluate_join_conditions(row: &[Value], conditions: &[crate::plan::JoinCondition]) -> bool {
807 use crate::plan::JoinOp;
808
809 conditions.iter().all(|cond| {
810 let left_val = row.get(cond.left_col_idx);
812 let right_val = row.get(cond.right_col_idx);
813
814 if left_val.is_none() || right_val.is_none() {
816 return false;
817 }
818
819 let left_val = left_val.unwrap();
820 let right_val = right_val.unwrap();
821
822 match cond.op {
824 JoinOp::Eq => left_val == right_val,
825 JoinOp::Lt => left_val.compare(right_val) == Some(std::cmp::Ordering::Less),
826 JoinOp::Le => matches!(
827 left_val.compare(right_val),
828 Some(std::cmp::Ordering::Less | std::cmp::Ordering::Equal)
829 ),
830 JoinOp::Gt => left_val.compare(right_val) == Some(std::cmp::Ordering::Greater),
831 JoinOp::Ge => matches!(
832 left_val.compare(right_val),
833 Some(std::cmp::Ordering::Greater | std::cmp::Ordering::Equal)
834 ),
835 }
836 })
837}
838
839fn execute_join<S: ProjectionStore>(
840 store: &mut S,
841 join_type: &crate::parser::JoinType,
842 left: &QueryPlan,
843 right: &QueryPlan,
844 on_conditions: &[crate::plan::JoinCondition],
845 _columns: &[usize],
846 column_names: &[ColumnName],
847 position: Option<Offset>,
848) -> Result<QueryResult> {
849 let left_metadata = left.metadata().ok_or_else(|| {
851 QueryError::UnsupportedFeature("JOIN child plan missing metadata".to_string())
852 })?;
853 let right_metadata = right.metadata().ok_or_else(|| {
854 QueryError::UnsupportedFeature("JOIN child plan missing metadata".to_string())
855 })?;
856
857 let left_table_def = TableDef {
859 table_id: left_metadata.table_id,
860 columns: left_metadata.columns.clone(),
861 primary_key: left_metadata.primary_key.clone(),
862 indexes: vec![], };
864 let right_table_def = TableDef {
865 table_id: right_metadata.table_id,
866 columns: right_metadata.columns.clone(),
867 primary_key: right_metadata.primary_key.clone(),
868 indexes: vec![], };
870
871 let left_result = execute_internal(store, left, &left_table_def, position)?;
873 let right_result = execute_internal(store, right, &right_table_def, position)?;
874
875 let mut output_rows = Vec::new();
876
877 match join_type {
878 crate::parser::JoinType::Inner => {
879 for left_row in &left_result.rows {
881 for right_row in &right_result.rows {
882 let combined_row: Vec<Value> =
884 left_row.iter().chain(right_row.iter()).cloned().collect();
885
886 if evaluate_join_conditions(&combined_row, on_conditions) {
888 output_rows.push(combined_row);
889 kimberlite_properties::sometimes!(
890 output_rows.len() > MAX_JOIN_OUTPUT_ROWS,
891 "query.join_output_row_cap_hit",
892 "INNER JOIN output hits MAX_JOIN_OUTPUT_ROWS (1M) cap"
893 );
894 if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
895 return Err(QueryError::UnsupportedFeature(format!(
896 "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
897 )));
898 }
899 }
900 }
901 }
902 }
903 crate::parser::JoinType::Left => {
904 for left_row in &left_result.rows {
906 let mut matched = false;
907 for right_row in &right_result.rows {
908 let combined_row: Vec<Value> =
910 left_row.iter().chain(right_row.iter()).cloned().collect();
911
912 if evaluate_join_conditions(&combined_row, on_conditions) {
914 output_rows.push(combined_row);
915 matched = true;
916 kimberlite_properties::sometimes!(
917 output_rows.len() > MAX_JOIN_OUTPUT_ROWS,
918 "query.left_join_output_row_cap_hit",
919 "LEFT JOIN output hits MAX_JOIN_OUTPUT_ROWS (1M) cap"
920 );
921 if output_rows.len() > MAX_JOIN_OUTPUT_ROWS {
922 return Err(QueryError::UnsupportedFeature(format!(
923 "JOIN output exceeds maximum of {MAX_JOIN_OUTPUT_ROWS} rows — add a more selective filter"
924 )));
925 }
926 }
927 }
928
929 if !matched {
931 let right_nulls = vec![Value::Null; right_result.columns.len()];
932 let combined_row: Vec<Value> = left_row
933 .iter()
934 .cloned()
935 .chain(right_nulls.into_iter())
936 .collect();
937 output_rows.push(combined_row);
938 }
939 }
940 }
941 }
942
943 kimberlite_properties::sometimes!(
944 output_rows.len() > 1,
945 "query.join_produces_multi_row_output",
946 "join execution produces more than one output row"
947 );
948
949 Ok(QueryResult {
950 columns: column_names.to_vec(),
951 rows: output_rows,
952 })
953}
954
955fn execute_aggregate<S: ProjectionStore>(
957 store: &mut S,
958 source: &QueryPlan,
959 group_by_cols: &[usize],
960 aggregates: &[crate::parser::AggregateFunction],
961 column_names: &[ColumnName],
962 metadata: &crate::plan::TableMetadata,
963 having: &[crate::parser::HavingCondition],
964 position: Option<Offset>,
965) -> Result<QueryResult> {
966 use std::collections::HashMap;
967
968 let dummy_table_def = TableDef {
971 table_id: metadata.table_id,
972 columns: metadata.columns.clone(),
973 primary_key: metadata.primary_key.clone(),
974 indexes: vec![],
975 };
976 let source_result = execute_internal(store, source, &dummy_table_def, position)?;
977
978 let mut groups: HashMap<Vec<Value>, AggregateState> = HashMap::new();
980
981 for row in source_result.rows {
982 let group_key: Vec<Value> = if group_by_cols.is_empty() {
984 vec![]
986 } else {
987 group_by_cols
988 .iter()
989 .map(|&idx| row.get(idx).cloned().unwrap_or(Value::Null))
990 .collect()
991 };
992
993 kimberlite_properties::sometimes!(
995 !groups.contains_key(&group_key) && groups.len() >= MAX_GROUP_COUNT,
996 "query.group_by_cardinality_cap_hit",
997 "GROUP BY hits MAX_GROUP_COUNT (100k) distinct group cap"
998 );
999 if !groups.contains_key(&group_key) && groups.len() >= MAX_GROUP_COUNT {
1000 return Err(QueryError::UnsupportedFeature(format!(
1001 "GROUP BY cardinality exceeds maximum of {MAX_GROUP_COUNT} distinct groups"
1002 )));
1003 }
1004
1005 let state = groups.entry(group_key).or_insert_with(AggregateState::new);
1007 state.update(&row, aggregates, metadata)?;
1008 }
1009
1010 let group_by_count = group_by_cols.len();
1012 let mut result_rows = Vec::new();
1013 for (group_key, state) in groups {
1014 let agg_values = state.finalize(aggregates);
1015
1016 if !having.is_empty() && !evaluate_having(having, aggregates, &agg_values, group_by_count) {
1018 continue;
1019 }
1020
1021 let mut result_row = group_key; result_row.extend(agg_values); result_rows.push(result_row);
1024 }
1025
1026 if result_rows.is_empty() && group_by_cols.is_empty() && having.is_empty() {
1028 let state = AggregateState::new();
1029 let agg_values = state.finalize(aggregates);
1030 result_rows.push(agg_values);
1031 }
1032
1033 Ok(QueryResult {
1034 columns: column_names.to_vec(),
1035 rows: result_rows,
1036 })
1037}
1038
1039fn evaluate_having(
1043 having: &[crate::parser::HavingCondition],
1044 aggregates: &[crate::parser::AggregateFunction],
1045 agg_values: &[Value],
1046 _group_by_count: usize,
1047) -> bool {
1048 having.iter().all(|condition| match condition {
1049 crate::parser::HavingCondition::AggregateComparison {
1050 aggregate,
1051 op,
1052 value,
1053 } => {
1054 let agg_idx = aggregates.iter().position(|a| a == aggregate);
1056 let Some(idx) = agg_idx else {
1057 return false;
1058 };
1059 let Some(agg_value) = agg_values.get(idx) else {
1060 return false;
1061 };
1062
1063 match op {
1065 crate::parser::HavingOp::Eq => agg_value == value,
1066 crate::parser::HavingOp::Lt => agg_value.compare(value) == Some(Ordering::Less),
1067 crate::parser::HavingOp::Le => matches!(
1068 agg_value.compare(value),
1069 Some(Ordering::Less | Ordering::Equal)
1070 ),
1071 crate::parser::HavingOp::Gt => agg_value.compare(value) == Some(Ordering::Greater),
1072 crate::parser::HavingOp::Ge => matches!(
1073 agg_value.compare(value),
1074 Some(Ordering::Greater | Ordering::Equal)
1075 ),
1076 }
1077 }
1078 })
1079}
1080
/// Running accumulator for one group's aggregates.
///
/// The per-aggregate vectors are indexed by the aggregate's position in the
/// query's aggregate list and are grown on the first `update` call.
#[derive(Debug, Clone)]
struct AggregateState {
    /// Number of rows folded into this state (the value of `COUNT(*)`).
    count: i64,
    // `non_null_counts`: per-aggregate count of non-NULL inputs;
    // `sums`: per-aggregate running sum, `None` until a non-NULL input is seen.
    non_null_counts: Vec<i64>, sums: Vec<Option<Value>>,
    /// Per-aggregate running minimum, `None` until a non-NULL input is seen.
    mins: Vec<Option<Value>>,
    /// Per-aggregate running maximum, `None` until a non-NULL input is seen.
    maxs: Vec<Option<Value>>,
}
1090
1091impl AggregateState {
1092 fn new() -> Self {
1093 Self {
1094 count: 0,
1095 non_null_counts: Vec::new(),
1096 sums: Vec::new(),
1097 mins: Vec::new(),
1098 maxs: Vec::new(),
1099 }
1100 }
1101
1102 fn update(
1103 &mut self,
1104 row: &[Value],
1105 aggregates: &[crate::parser::AggregateFunction],
1106 metadata: &crate::plan::TableMetadata,
1107 ) -> Result<()> {
1108 debug_assert!(!row.is_empty(), "row must have at least one column");
1110
1111 assert!(
1114 aggregates.len() <= MAX_AGGREGATES_PER_QUERY,
1115 "too many aggregates ({} > {})",
1116 aggregates.len(),
1117 MAX_AGGREGATES_PER_QUERY
1118 );
1119
1120 self.count += 1;
1121
1122 while self.sums.len() < aggregates.len() {
1124 self.non_null_counts.push(0);
1125 self.sums.push(None);
1126 self.mins.push(None);
1127 self.maxs.push(None);
1128 }
1129
1130 debug_assert_eq!(
1132 self.sums.len(),
1133 self.non_null_counts.len(),
1134 "aggregate state vectors out of sync"
1135 );
1136 debug_assert_eq!(self.sums.len(), self.mins.len());
1137 debug_assert_eq!(self.sums.len(), self.maxs.len());
1138
1139 let find_col_idx = |col: &ColumnName| -> usize {
1141 metadata
1142 .columns
1143 .iter()
1144 .position(|c| &c.name == col)
1145 .unwrap_or(0)
1146 };
1147
1148 for (i, agg) in aggregates.iter().enumerate() {
1149 match agg {
1150 crate::parser::AggregateFunction::CountStar => {
1151 }
1153 crate::parser::AggregateFunction::Count(col) => {
1154 let col_idx = find_col_idx(col);
1156 if let Some(val) = row.get(col_idx) {
1157 if !val.is_null() {
1158 self.non_null_counts[i] += 1;
1159 }
1160 }
1161 }
1162 crate::parser::AggregateFunction::Sum(col) => {
1163 let col_idx = find_col_idx(col);
1164 if let Some(val) = row.get(col_idx) {
1165 if !val.is_null() {
1166 self.sums[i] = Some(add_values(&self.sums[i], val)?);
1167 }
1168 }
1169 }
1170 crate::parser::AggregateFunction::Avg(col) => {
1171 let col_idx = find_col_idx(col);
1173 if let Some(val) = row.get(col_idx) {
1174 if !val.is_null() {
1175 self.sums[i] = Some(add_values(&self.sums[i], val)?);
1176 }
1177 }
1178 }
1179 crate::parser::AggregateFunction::Min(col) => {
1180 let col_idx = find_col_idx(col);
1181 if let Some(val) = row.get(col_idx) {
1182 if !val.is_null() {
1183 self.mins[i] = Some(min_value(&self.mins[i], val));
1184 }
1185 }
1186 }
1187 crate::parser::AggregateFunction::Max(col) => {
1188 let col_idx = find_col_idx(col);
1189 if let Some(val) = row.get(col_idx) {
1190 if !val.is_null() {
1191 self.maxs[i] = Some(max_value(&self.maxs[i], val));
1192 }
1193 }
1194 }
1195 }
1196 }
1197
1198 debug_assert_eq!(
1200 self.sums.len(),
1201 aggregates.len(),
1202 "aggregate state must match aggregate count after update"
1203 );
1204
1205 Ok(())
1206 }
1207
1208 fn finalize(&self, aggregates: &[crate::parser::AggregateFunction]) -> Vec<Value> {
1209 let mut result = Vec::new();
1210
1211 for (i, agg) in aggregates.iter().enumerate() {
1212 let value = match agg {
1213 crate::parser::AggregateFunction::CountStar => Value::BigInt(self.count),
1214 crate::parser::AggregateFunction::Count(_) => {
1215 Value::BigInt(self.non_null_counts.get(i).copied().unwrap_or(0))
1217 }
1218 crate::parser::AggregateFunction::Sum(_) => self
1219 .sums
1220 .get(i)
1221 .and_then(std::clone::Clone::clone)
1222 .unwrap_or(Value::Null),
1223 crate::parser::AggregateFunction::Avg(_) => {
1224 if self.count == 0 {
1226 Value::Null
1227 } else {
1228 kimberlite_properties::never!(
1231 self.count == 0,
1232 "query.avg_divide_by_zero",
1233 "AVG divide_value must never be reached with count == 0"
1234 );
1235 match self.sums.get(i).and_then(|v| v.as_ref()) {
1236 Some(sum) => divide_value(sum, self.count).unwrap_or(Value::Null),
1237 None => Value::Null,
1238 }
1239 }
1240 }
1241 crate::parser::AggregateFunction::Min(_) => self
1242 .mins
1243 .get(i)
1244 .and_then(std::clone::Clone::clone)
1245 .unwrap_or(Value::Null),
1246 crate::parser::AggregateFunction::Max(_) => self
1247 .maxs
1248 .get(i)
1249 .and_then(std::clone::Clone::clone)
1250 .unwrap_or(Value::Null),
1251 };
1252 result.push(value);
1253 }
1254
1255 result
1256 }
1257}
1258
1259fn add_values(a: &Option<Value>, b: &Value) -> Result<Value> {
1264 match a {
1265 None => Ok(b.clone()),
1266 Some(a_val) => match (a_val, b) {
1267 (Value::BigInt(x), Value::BigInt(y)) => {
1268 let checked = x.checked_add(*y);
1269 kimberlite_properties::sometimes!(
1273 checked.is_none(),
1274 "query.sum_bigint_overflow_detected",
1275 "SUM(BIGINT) overflow detected by checked_add"
1276 );
1277 if let Some(sum) = checked {
1278 kimberlite_properties::never!(
1281 sum != x.wrapping_add(*y) || (*x > 0 && *y > 0 && sum < 0)
1282 || (*x < 0 && *y < 0 && sum > 0),
1283 "query.sum_bigint_silent_wrap",
1284 "SUM(BIGINT) checked_add returned Some() for an overflowing result"
1285 );
1286 Ok(Value::BigInt(sum))
1287 } else {
1288 Err(QueryError::TypeMismatch {
1289 expected: "BigInt (non-overflowing)".to_string(),
1290 actual: format!("overflow: {x} + {y}"),
1291 })
1292 }
1293 }
1294 (Value::Integer(x), Value::Integer(y)) => x
1295 .checked_add(*y)
1296 .map(Value::Integer)
1297 .ok_or_else(|| QueryError::TypeMismatch {
1298 expected: "Integer (non-overflowing)".to_string(),
1299 actual: format!("overflow: {x} + {y}"),
1300 }),
1301 (Value::SmallInt(x), Value::SmallInt(y)) => x
1302 .checked_add(*y)
1303 .map(Value::SmallInt)
1304 .ok_or_else(|| QueryError::TypeMismatch {
1305 expected: "SmallInt (non-overflowing)".to_string(),
1306 actual: format!("overflow: {x} + {y}"),
1307 }),
1308 (Value::TinyInt(x), Value::TinyInt(y)) => x
1309 .checked_add(*y)
1310 .map(Value::TinyInt)
1311 .ok_or_else(|| QueryError::TypeMismatch {
1312 expected: "TinyInt (non-overflowing)".to_string(),
1313 actual: format!("overflow: {x} + {y}"),
1314 }),
1315 (Value::Real(x), Value::Real(y)) => Ok(Value::Real(x + y)),
1316 (Value::Decimal(x, sx), Value::Decimal(y, sy)) if sx == sy => x
1317 .checked_add(*y)
1318 .map(|sum| Value::Decimal(sum, *sx))
1319 .ok_or_else(|| QueryError::TypeMismatch {
1320 expected: "Decimal (non-overflowing)".to_string(),
1321 actual: format!("overflow: {x} + {y}"),
1322 }),
1323 _ => Err(QueryError::TypeMismatch {
1324 expected: format!("{a_val:?}"),
1325 actual: format!("{b:?}"),
1326 }),
1327 },
1328 }
1329}
1330
1331fn min_value(a: &Option<Value>, b: &Value) -> Value {
1333 match a {
1334 None => b.clone(),
1335 Some(a_val) => {
1336 if let Some(ord) = a_val.compare(b) {
1337 if ord == Ordering::Less {
1338 a_val.clone()
1339 } else {
1340 b.clone()
1341 }
1342 } else {
1343 a_val.clone() }
1345 }
1346 }
1347}
1348
1349fn max_value(a: &Option<Value>, b: &Value) -> Value {
1351 match a {
1352 None => b.clone(),
1353 Some(a_val) => {
1354 if let Some(ord) = a_val.compare(b) {
1355 if ord == Ordering::Greater {
1356 a_val.clone()
1357 } else {
1358 b.clone()
1359 }
1360 } else {
1361 a_val.clone() }
1363 }
1364 }
1365}
1366
1367#[allow(clippy::cast_precision_loss)]
1372fn divide_value(val: &Value, count: i64) -> Option<Value> {
1373 if count == 0 {
1375 return Some(Value::Null);
1376 }
1377
1378 match val {
1379 Value::BigInt(x) => Some(Value::Real(*x as f64 / count as f64)),
1380 Value::Integer(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1381 Value::SmallInt(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1382 Value::TinyInt(x) => Some(Value::Real(f64::from(*x) / count as f64)),
1383 Value::Real(x) => Some(Value::Real(x / count as f64)),
1384 Value::Decimal(x, scale) => {
1385 let divisor = 10_i128.pow(u32::from(*scale));
1387 let float_val = *x as f64 / divisor as f64;
1388 Some(Value::Real(float_val / count as f64))
1389 }
1390 _ => None,
1391 }
1392}
1393
/// Unit tests for the pure helpers in this module (projection, filtering,
/// sorting); none of them touch a store.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::plan::Filter;
    use crate::plan::FilterCondition;
    use crate::plan::FilterOp;

    /// Projection keeps only the requested columns, in the requested order.
    #[test]
    fn test_project_row() {
        let row = vec![
            Value::BigInt(1),
            Value::Text("alice".to_string()),
            Value::BigInt(30),
        ];

        let projected = project_row(&row, &[0, 2]);
        assert_eq!(projected, vec![Value::BigInt(1), Value::BigInt(30)]);
    }

    /// An empty column list returns the whole row.
    #[test]
    fn test_project_row_all() {
        let row = vec![Value::BigInt(1), Value::Text("bob".to_string())];
        let projected = project_row(&row, &[]);
        assert_eq!(projected, row);
    }

    /// An equality filter matches only rows with the expected value.
    #[test]
    fn test_filter_matches() {
        let row = vec![Value::BigInt(42), Value::Text("alice".to_string())];

        let filter = Filter::single(FilterCondition {
            column_idx: 0,
            op: FilterOp::Eq,
            value: Value::BigInt(42),
        });

        assert!(filter.matches(&row));

        let filter_miss = Filter::single(FilterCondition {
            column_idx: 0,
            op: FilterOp::Eq,
            value: Value::BigInt(99),
        });

        assert!(!filter_miss.matches(&row));
    }

    /// Ascending sort on the first column.
    #[test]
    fn test_sort_rows() {
        let mut rows = vec![
            vec![Value::BigInt(3), Value::Text("c".to_string())],
            vec![Value::BigInt(1), Value::Text("a".to_string())],
            vec![Value::BigInt(2), Value::Text("b".to_string())],
        ];

        let spec = SortSpec {
            columns: vec![(0, ScanOrder::Ascending)],
        };

        sort_rows(&mut rows, &spec);

        assert_eq!(rows[0][0], Value::BigInt(1));
        assert_eq!(rows[1][0], Value::BigInt(2));
        assert_eq!(rows[2][0], Value::BigInt(3));
    }

    /// Descending sort on the first column.
    #[test]
    fn test_sort_rows_descending() {
        let mut rows = vec![
            vec![Value::BigInt(1)],
            vec![Value::BigInt(3)],
            vec![Value::BigInt(2)],
        ];

        let spec = SortSpec {
            columns: vec![(0, ScanOrder::Descending)],
        };

        sort_rows(&mut rows, &spec);

        assert_eq!(rows[0][0], Value::BigInt(3));
        assert_eq!(rows[1][0], Value::BigInt(2));
        assert_eq!(rows[2][0], Value::BigInt(1));
    }
}