1use alloc::borrow::Cow;
24use alloc::boxed::Box;
25use alloc::collections::BTreeSet;
26use alloc::format;
27use alloc::string::{String, ToString};
28use alloc::vec::Vec;
29
30use spg_sql::ast::{Expr, SelectItem, SelectStatement};
31use spg_storage::{ColumnSchema, DataType, Row, Value};
32
33use crate::eval::{self, EvalContext, EvalError};
34use crate::join::RowRef;
35
36pub fn uses_aggregate(stmt: &SelectStatement) -> bool {
38 if stmt.group_by.is_some() || stmt.having.is_some() {
39 return true;
40 }
41 for item in &stmt.items {
42 if let SelectItem::Expr { expr, .. } = item
43 && contains_aggregate(expr)
44 {
45 return true;
46 }
47 }
48 for o in &stmt.order_by {
49 if contains_aggregate(&o.expr) {
50 return true;
51 }
52 }
53 if let Some(h) = &stmt.having
54 && contains_aggregate(h)
55 {
56 return true;
57 }
58 false
59}
60
61pub fn contains_aggregate(e: &Expr) -> bool {
62 match e {
63 Expr::FunctionCall { name, args } => {
64 is_aggregate_name(name) || args.iter().any(contains_aggregate)
65 }
66 Expr::AggregateOrdered { .. } => true,
67 Expr::Binary { lhs, rhs, .. } => contains_aggregate(lhs) || contains_aggregate(rhs),
68 Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
69 contains_aggregate(expr)
70 }
71 Expr::Like { expr, pattern, .. } => contains_aggregate(expr) || contains_aggregate(pattern),
72 Expr::Extract { source, .. } => contains_aggregate(source),
73 Expr::ScalarSubquery(_)
78 | Expr::Exists { .. }
79 | Expr::InSubquery { .. }
80 | Expr::WindowFunction { .. }
81 | Expr::Literal(_)
82 | Expr::Placeholder(_)
83 | Expr::Column(_) => false,
84 Expr::Array(items) => items.iter().any(contains_aggregate),
88 Expr::ArraySubscript { target, index } => {
89 contains_aggregate(target) || contains_aggregate(index)
90 }
91 Expr::AnyAll { expr, array, .. } => contains_aggregate(expr) || contains_aggregate(array),
92 Expr::InList { expr, list, .. } => {
93 contains_aggregate(expr) || list.iter().any(contains_aggregate)
94 }
95 Expr::Case {
98 operand,
99 branches,
100 else_branch,
101 } => {
102 operand.as_deref().is_some_and(contains_aggregate)
103 || branches
104 .iter()
105 .any(|(w, t)| contains_aggregate(w) || contains_aggregate(t))
106 || else_branch.as_deref().is_some_and(contains_aggregate)
107 }
108 }
109}
110
/// Returns `true` if `name` (compared ASCII-case-insensitively) names an
/// aggregate function this module knows how to evaluate.
///
/// The list includes the internal `count_star` spelling of `count(*)`, and
/// the ordered-set and hypothetical-set aggregates used with `WITHIN GROUP`.
pub fn is_aggregate_name(name: &str) -> bool {
    // Compared case-insensitively in place. This avoids allocating a
    // lower-cased copy on every call, since the predicate runs for every
    // function-call node during tree walks.
    const AGGREGATE_NAMES: &[&str] = &[
        "count",
        "count_star",
        "sum",
        "min",
        "max",
        "avg",
        "string_agg",
        "array_agg",
        "bool_and",
        "bool_or",
        "every",
        "stddev",
        "stddev_samp",
        "stddev_pop",
        "variance",
        "var_samp",
        "var_pop",
        "bit_and",
        "bit_or",
        "bit_xor",
        "percentile_cont",
        "percentile_disc",
        "mode",
        "rank",
        "dense_rank",
        "percent_rank",
        "cume_dist",
        "covar_pop",
        "covar_samp",
        "corr",
        "regr_count",
        "regr_avgx",
        "regr_avgy",
        "regr_slope",
        "regr_intercept",
        "regr_r2",
        "regr_sxx",
        "regr_syy",
        "regr_sxy",
        "json_agg",
        "jsonb_agg",
        "json_object_agg",
        "jsonb_object_agg",
    ];
    AGGREGATE_NAMES.iter().any(|k| name.eq_ignore_ascii_case(k))
}
151
/// Returns `true` for the two-argument statistical aggregates (covariance,
/// correlation and the `regr_*` family).
///
/// The comparison is case-sensitive: callers pass the already lower-cased
/// canonical name.
fn is_regression_name(name: &str) -> bool {
    const REGRESSION_AGGREGATES: &[&str] = &[
        "covar_pop",
        "covar_samp",
        "corr",
        "regr_count",
        "regr_avgx",
        "regr_avgy",
        "regr_slope",
        "regr_intercept",
        "regr_r2",
        "regr_sxx",
        "regr_syy",
        "regr_sxy",
    ];
    REGRESSION_AGGREGATES.contains(&name)
}
170
171fn agg_uses_second_arg(name: &str) -> bool {
175 name == "string_agg"
176 || name == "json_object_agg"
177 || name == "jsonb_object_agg"
178 || is_regression_name(name)
179}
180
/// Returns `true` for the ordered-set aggregates (`percentile_cont`,
/// `percentile_disc`, `mode`), compared ASCII-case-insensitively.
pub fn is_ordered_set_name(name: &str) -> bool {
    for candidate in ["percentile_cont", "percentile_disc", "mode"] {
        if candidate.eq_ignore_ascii_case(name) {
            return true;
        }
    }
    false
}
194
/// Returns `true` for the hypothetical-set aggregates (`rank`, `dense_rank`,
/// `percent_rank`, `cume_dist`), compared ASCII-case-insensitively.
pub fn is_hypothetical_set_name(name: &str) -> bool {
    for candidate in ["rank", "dense_rank", "percent_rank", "cume_dist"] {
        if candidate.eq_ignore_ascii_case(name) {
            return true;
        }
    }
    false
}
204
205pub fn is_within_group_name(name: &str) -> bool {
208 is_ordered_set_name(name) || is_hypothetical_set_name(name)
209}
210
/// Running accumulator for one aggregate within one group.
///
/// A single struct serves every supported aggregate. Each aggregate only
/// touches the fields it needs and leaves the rest at their defaults.
#[derive(Debug, Default, Clone)]
struct AggState {
    /// Row counter: `count_star` counts every row, `count` counts non-NULL
    /// inputs, and `sum`/`avg`/`string_agg`/`array_agg` count accepted values.
    count: i64,
    /// Integer running sum for `sum`/`avg` over `Int`/`BigInt` inputs.
    sum_int: i64,
    /// Floating-point running sum for `sum`/`avg` over `Float` inputs.
    sum_float: f64,
    /// Current best value for `min` / `max`.
    extreme: Option<Value>,
    /// Set once `sum`/`avg` has seen a `Float` input.
    use_float: bool,
    /// Collected values (`string_agg`, `array_agg`; possibly other collecting
    /// aggregates whose update code is not visible here — TODO confirm).
    items: Vec<Value>,
    /// Encoded keys of argument values already folded in; implements `DISTINCT`.
    seen: BTreeSet<String>,
    /// Per-item aggregate `ORDER BY` keys, parallel to `items`; used to sort
    /// `items` before finalisation.
    item_keys: Vec<Vec<Value>>,
    /// `string_agg` separator: the last text value passed as its second argument.
    separator: Option<String>,
    /// Running result for `bool_and` (and presumably `bool_or`); `None` until a
    /// non-NULL input is seen.
    bool_acc: Option<bool>,
    /// NOTE(review): presumably the running sum of squares for the
    /// stddev/variance family — confirm against `update_state`.
    sum_sq: f64,
    /// NOTE(review): presumably the accumulator for `bit_and`/`bit_or`/`bit_xor`.
    bit_acc: Option<i64>,
    /// NOTE(review): presumably the regression/covariance accumulators (pair
    /// count and sums of x, y, x², y², xy) — confirm against `update_state`.
    reg_n: i64,
    reg_sx: f64,
    reg_sy: f64,
    reg_sxx: f64,
    reg_syy: f64,
    reg_sxy: f64,
    /// NOTE(review): secondary value list; its writer is not visible here.
    aux_items: Vec<Value>,
    /// Best `(order keys, value)` seen so far for the
    /// `array_agg(x ORDER BY …)[1]` form (`AggSpec::first_ordered`).
    first_best: Option<(Vec<Value>, Value)>,
}
270
/// One distinct aggregate call found in the statement.
#[derive(Debug, Clone)]
struct AggSpec {
    /// Canonical lower-case aggregate name (`every` is stored as `bool_and`).
    name: String,
    /// Aggregated value; `None` for `count(*)`. For WITHIN GROUP aggregates
    /// this is the first WITHIN GROUP sort expression.
    arg: Option<Expr>,
    /// Second call argument for aggregates that use one (`string_agg`,
    /// `json[b]_object_agg`, regression functions); `None` otherwise.
    arg2: Option<Expr>,
    /// `DISTINCT` modifier.
    distinct: bool,
    /// Aggregate-level `ORDER BY` (or the WITHIN GROUP ordering).
    order_by: Vec<spg_sql::ast::OrderBy>,
    /// `FILTER (WHERE …)` predicate; rows where it is not `true` are skipped.
    filter: Option<Expr>,
    /// Direct argument of a WITHIN GROUP aggregate (the call's first argument,
    /// e.g. the fraction in `percentile_cont(0.5)`).
    direct_arg: Option<Expr>,
    /// Set for `array_agg(x ORDER BY …)[1]`. Only the first element by the
    /// ordering is kept instead of collecting the whole array.
    first_ordered: bool,
}
310
/// Output of the aggregate pipeline for one `SELECT`.
#[derive(Debug)]
pub struct AggResult {
    /// Output column schema, one entry per select item.
    pub columns: Vec<ColumnSchema>,
    /// Projected output rows, after `HAVING` filtering and `ORDER BY` sorting.
    pub rows: Vec<Row>,
    /// Select items whose evaluation was deferred because they contain a
    /// subquery: `(column index, expression rewritten against synth_schema)`.
    /// The matching cell in `rows` holds a `NULL` placeholder.
    pub deferred: Vec<(usize, Expr)>,
    /// Per-group synth rows (`__grp_*` then `__agg_*` values), parallel to
    /// `rows`. Only populated when `deferred` is non-empty.
    pub synth_rows: Vec<Row>,
    /// Schema of `synth_rows`; empty unless `deferred` is non-empty.
    pub synth_schema: Vec<ColumnSchema>,
}
331
332#[allow(clippy::too_many_lines)]
335pub type CorrelatedEval<'a> = &'a dyn Fn(&Expr, &Row, &EvalContext<'_>) -> Result<Value, EvalError>;
342
/// Intermediate result of projecting synth rows (returned by `project_groups`).
struct Projection {
    /// Output column schema, one entry per select item.
    columns: Vec<ColumnSchema>,
    /// Projected rows for the groups that passed `HAVING`.
    out_rows: Vec<Row>,
    /// Synth rows that passed `HAVING`, parallel to `out_rows`.
    kept_synth: Vec<Row>,
    /// Deferred select items: `(column index, rewritten expression)`.
    deferred: Vec<(usize, Expr)>,
    /// `ORDER BY` expressions rewritten against the synth schema.
    order_rewritten: Vec<Expr>,
}
354
355pub(crate) fn run(
356 stmt: &SelectStatement,
357 rows: &[RowRef<'_>],
358 schema_cols: &[ColumnSchema],
359 table_alias: Option<&str>,
360 correlated_eval: Option<CorrelatedEval<'_>>,
361) -> Result<AggResult, EvalError> {
362 let group_exprs: Vec<Expr> = stmt.group_by.clone().unwrap_or_default();
363
364 let mut agg_specs: Vec<AggSpec> = Vec::new();
366 for item in &stmt.items {
367 if let SelectItem::Expr { expr, .. } = item {
368 collect_aggregates(expr, &mut agg_specs);
369 }
370 }
371 for o in &stmt.order_by {
372 collect_aggregates(&o.expr, &mut agg_specs);
373 }
374 if let Some(h) = &stmt.having {
375 collect_aggregates(h, &mut agg_specs);
376 }
377 validate_agg_arities(stmt, &agg_specs)?;
383 validate_within_group(&agg_specs)?;
384
385 let order = accumulate_groups(
387 rows,
388 &group_exprs,
389 &agg_specs,
390 schema_cols,
391 table_alias,
392 correlated_eval,
393 )?;
394
395 let synth_schema =
397 build_synth_schema(rows, &group_exprs, &agg_specs, schema_cols, table_alias)?;
398 let synth_rows = finalize_synth_rows(
399 &order,
400 &agg_specs,
401 &synth_schema,
402 rows,
403 schema_cols,
404 table_alias,
405 )?;
406
407 let Projection {
409 columns,
410 mut out_rows,
411 mut kept_synth,
412 deferred,
413 order_rewritten,
414 } = project_groups(
415 synth_rows,
416 stmt,
417 &group_exprs,
418 &agg_specs,
419 &synth_schema,
420 correlated_eval,
421 )?;
422
423 if !stmt.order_by.is_empty() {
425 let (sorted_synth, sorted_out) = sort_synth_by_order_by(
426 &synth_schema,
427 &stmt.order_by,
428 &order_rewritten,
429 kept_synth,
430 out_rows,
431 correlated_eval,
432 )?;
433 kept_synth = sorted_synth;
434 out_rows = sorted_out;
435 }
436
437 let (synth_rows_out, synth_schema_out) = if deferred.is_empty() {
438 (Vec::new(), Vec::new())
439 } else {
440 (kept_synth, synth_schema.clone())
441 };
442 Ok(AggResult {
443 columns,
444 rows: out_rows,
445 deferred,
446 synth_rows: synth_rows_out,
447 synth_schema: synth_schema_out,
448 })
449}
450
451fn validate_within_group(agg_specs: &[AggSpec]) -> Result<(), EvalError> {
456 for spec in agg_specs {
460 if is_within_group_name(&spec.name) {
461 if spec.order_by.is_empty() {
462 return Err(EvalError::TypeMismatch {
463 detail: format!("{}() requires WITHIN GROUP (ORDER BY …)", spec.name),
464 });
465 }
466 if spec.name != "mode" && spec.direct_arg.is_none() {
470 return Err(EvalError::TypeMismatch {
471 detail: format!("{}() requires a direct argument", spec.name),
472 });
473 }
474 if spec.order_by.len() > 1 {
478 return Err(EvalError::TypeMismatch {
479 detail: format!(
480 "{}() with multiple WITHIN GROUP sort keys is not supported yet",
481 spec.name
482 ),
483 });
484 }
485 }
486 }
487 Ok(())
488}
489
/// Folds every input row into per-group aggregate states.
///
/// Returns the groups in first-seen order as `(group key values, states)`,
/// with one `AggState` per entry of `agg_specs`. When there is no input and
/// no `GROUP BY`, a single empty group is still produced, so an ungrouped
/// aggregate over no rows yields one output row.
///
/// Two paths are used:
/// * Fast path: taken when every `GROUP BY` expression is a qualified
///   column with a resolvable position and none has a case-insensitive
///   collation. Group keys and column-bound arguments are read straight from
///   the `RowRef`. The row is only materialised when some aggregate needs
///   full expression evaluation (`needs_mat`).
/// * General path: materialises each row and evaluates every expression.
///
/// # Errors
///
/// Propagates evaluation errors from group keys, aggregate arguments,
/// `FILTER` and aggregate `ORDER BY` expressions, and from `update_state`.
#[allow(clippy::too_many_lines, clippy::type_complexity)]
fn accumulate_groups(
    rows: &[RowRef<'_>],
    group_exprs: &[Expr],
    agg_specs: &[AggSpec],
    schema_cols: &[ColumnSchema],
    table_alias: Option<&str>,
    correlated_eval: Option<CorrelatedEval<'_>>,
) -> Result<Vec<(Vec<Value>, Vec<AggState>)>, EvalError> {
    let ctx = EvalContext::new(schema_cols, table_alias);
    let mut order: Vec<(Vec<Value>, Vec<AggState>)> = Vec::new();
    // Encoded group key -> index into `order`.
    let mut groups: hashbrown::HashMap<String, usize> = hashbrown::HashMap::new();
    if rows.is_empty() && group_exprs.is_empty() {
        // Ungrouped aggregate over no rows: still emit one (empty-key) group.
        let init: Vec<AggState> = (0..agg_specs.len()).map(|_| AggState::default()).collect();
        order.push((Vec::new(), init));
    }

    // Resolves an expression to a fixed column position, but only for
    // qualified column references. Anything else yields `None` and is
    // evaluated per row instead.
    let col_pos = |e: &Expr| -> Option<usize> {
        if let Expr::Column(c) = e
            && c.qualifier.is_some()
        {
            eval::find_column_pos(c, &ctx)
        } else {
            None
        }
    };
    let group_pos: Vec<Option<usize>> = group_exprs.iter().map(col_pos).collect();
    let all_groups_bound = group_pos.iter().all(Option::is_some);
    let arg_pos: Vec<Option<usize>> = agg_specs
        .iter()
        .map(|spec| spec.arg.as_ref().and_then(|e| col_pos(e)))
        .collect();
    let order_pos: Vec<Vec<Option<usize>>> = agg_specs
        .iter()
        .map(|spec| spec.order_by.iter().map(|o| col_pos(&o.expr)).collect())
        .collect();
    // On the fast path, the row must be materialised whenever some aggregate
    // input cannot be read from a bound column position.
    let needs_mat = agg_specs.iter().enumerate().any(|(i, s)| {
        s.filter.is_some()
            || (s.arg.is_some() && arg_pos[i].is_none())
            || s.arg2.is_some()
            || order_pos[i].iter().any(Option::is_none)
    });
    // GROUP BY positions with a case-insensitive collation. On the general
    // path their text values are lower-cased before key encoding.
    let ci_positions: Vec<usize> = group_exprs
        .iter()
        .enumerate()
        .filter(|(_, g)| {
            matches!(
                eval::column_collation(g, &ctx),
                Some(spg_storage::Collation::CaseInsensitive)
            )
        })
        .map(|(i, _)| i)
        .collect();
    // Scratch buffers reused across rows on the fast path.
    let mut keybuf_s = String::new();
    let mut dkeybuf = String::new();
    let mut refs: Vec<&Value> = Vec::with_capacity(group_pos.len());
    // Only route aggregate inputs through `correlated_eval` when at least one
    // aggregate input actually contains a subquery.
    let any_agg_subquery = correlated_eval.is_some()
        && agg_specs.iter().any(|s| {
            s.filter
                .as_ref()
                .is_some_and(|e| crate::expr_has_subquery(e))
                || s.arg.as_ref().is_some_and(|e| crate::expr_has_subquery(e))
                || s.arg2.as_ref().is_some_and(|e| crate::expr_has_subquery(e))
                || s.order_by.iter().any(|o| crate::expr_has_subquery(&o.expr))
        });
    let eval_arg = |e: &Expr, r: &Row, c: &EvalContext<'_>| -> Result<Value, EvalError> {
        match correlated_eval {
            Some(f) if any_agg_subquery && crate::expr_has_subquery(e) => f(e, r, c),
            _ => eval::eval_expr(e, r, c),
        }
    };
    for row in rows {
        if all_groups_bound && ci_positions.is_empty() && !group_exprs.is_empty() {
            // Fast path: encode the group key from borrowed column values.
            refs.clear();
            refs.extend(
                group_pos
                    .iter()
                    .map(|p| row.get(p.unwrap()).unwrap_or(&Value::Null)),
            );
            encode_key_refs_into(&refs, &mut keybuf_s);
            let idx = match groups.get(keybuf_s.as_str()) {
                Some(&i) => i,
                None => {
                    let i = order.len();
                    let init: Vec<AggState> =
                        (0..agg_specs.len()).map(|_| AggState::default()).collect();
                    let owned: Vec<Value> = refs.iter().map(|v| (*v).clone()).collect();
                    order.push((owned, init));
                    groups.insert(keybuf_s.clone(), i);
                    i
                }
            };
            let entry = &mut order[idx];
            let mat: Option<Cow<'_, Row>> = if needs_mat { Some(row.as_row()) } else { None };
            for (i, spec) in agg_specs.iter().enumerate() {
                // FILTER: only rows where the predicate is exactly `true` count.
                if let Some(f) = &spec.filter
                    && !matches!(
                        eval_arg(f, mat.as_deref().expect("needs_mat for FILTER"), &ctx)?,
                        Value::Bool(true)
                    )
                {
                    continue;
                }
                // Bound argument: borrow from the row. No argument (count(*)):
                // a dummy `true`. Otherwise evaluate against the materialised row.
                let arg_owned: Value;
                let arg_ref: &Value = match (&arg_pos[i], &spec.arg) {
                    (Some(p), _) => row.get(*p).unwrap_or(&Value::Null),
                    (None, None) => {
                        arg_owned = Value::Bool(true);
                        &arg_owned
                    }
                    (None, Some(e)) => {
                        arg_owned = eval_arg(
                            e,
                            mat.as_deref().expect("needs_mat for non-bound arg"),
                            &ctx,
                        )?;
                        &arg_owned
                    }
                };
                let arg2_val = match &spec.arg2 {
                    None => None,
                    Some(e) => Some(eval_arg(
                        e,
                        mat.as_deref().expect("needs_mat for arg2"),
                        &ctx,
                    )?),
                };
                let order_keys = if spec.order_by.is_empty() {
                    None
                } else {
                    let mut keys = Vec::with_capacity(spec.order_by.len());
                    for (k, o) in spec.order_by.iter().enumerate() {
                        keys.push(match order_pos[i][k] {
                            Some(p) => row.get(p).cloned().unwrap_or(Value::Null),
                            None => eval_arg(
                                &o.expr,
                                mat.as_deref().expect("needs_mat for non-bound ORDER key"),
                                &ctx,
                            )?,
                        });
                    }
                    Some(keys)
                };
                if spec.first_ordered {
                    // `array_agg(x ORDER BY …)[1]`: keep only the value with the
                    // smallest order key. On ties the earliest row wins, because
                    // replacement requires strictly `Less`.
                    if let Some(keys) = order_keys {
                        let st = &mut entry.1[i];
                        let better = match &st.first_best {
                            None => true,
                            Some((bk, _)) => {
                                cmp_order_keys(&spec.order_by, &keys, bk)
                                    == core::cmp::Ordering::Less
                            }
                        };
                        if better {
                            st.first_best = Some((keys, arg_ref.clone()));
                        }
                    }
                    continue;
                }
                if spec.distinct {
                    // DISTINCT: skip argument values already seen in this group.
                    encode_key_refs_into(core::slice::from_ref(&arg_ref), &mut dkeybuf);
                    if entry.1[i].seen.contains(dkeybuf.as_str()) {
                        continue;
                    }
                    entry.1[i].seen.insert(dkeybuf.clone());
                }
                update_state(
                    &mut entry.1[i],
                    &spec.name,
                    arg_ref,
                    arg2_val.as_ref(),
                    order_keys,
                )?;
            }
            continue;
        }
        // General path: materialise the row and evaluate everything.
        let row_materialised = row.as_row();
        let row: &Row = &row_materialised;
        let group_vals: Vec<Value> = group_exprs
            .iter()
            .map(|g| eval::eval_expr(g, row, &ctx))
            .collect::<Result<_, _>>()?;
        let key = if ci_positions.is_empty() {
            encode_key(&group_vals)
        } else {
            // Case-insensitive grouping: lower-case text only in the lookup
            // key; the stored group value keeps the first-seen spelling.
            let mut key_vals = group_vals.clone();
            for &i in &ci_positions {
                if let Value::Text(s) = &key_vals[i] {
                    key_vals[i] = Value::Text(s.to_ascii_lowercase());
                }
            }
            encode_key(&key_vals)
        };
        let idx = match groups.get(key.as_str()) {
            Some(&i) => i,
            None => {
                let i = order.len();
                let init: Vec<AggState> =
                    (0..agg_specs.len()).map(|_| AggState::default()).collect();
                order.push((group_vals.clone(), init));
                groups.insert(key, i);
                i
            }
        };
        let entry = &mut order[idx];
        for (i, spec) in agg_specs.iter().enumerate() {
            if let Some(f) = &spec.filter
                && !matches!(eval_arg(f, row, &ctx)?, Value::Bool(true))
            {
                continue;
            }
            let arg_val = match &spec.arg {
                None => Value::Bool(true),
                Some(e) => eval_arg(e, row, &ctx)?,
            };
            let arg2_val = match &spec.arg2 {
                None => None,
                Some(e) => Some(eval_arg(e, row, &ctx)?),
            };
            let order_keys = if spec.order_by.is_empty() {
                None
            } else {
                let mut keys = Vec::with_capacity(spec.order_by.len());
                for o in &spec.order_by {
                    keys.push(eval_arg(&o.expr, row, &ctx)?);
                }
                Some(keys)
            };
            if spec.first_ordered {
                // Same first-by-ordering rule as on the fast path.
                if let Some(keys) = order_keys {
                    let st = &mut entry.1[i];
                    let better = match &st.first_best {
                        None => true,
                        Some((bk, _)) => {
                            cmp_order_keys(&spec.order_by, &keys, bk) == core::cmp::Ordering::Less
                        }
                    };
                    if better {
                        st.first_best = Some((keys, arg_val.clone()));
                    }
                }
                continue;
            }
            if spec.distinct {
                // `insert` returns false when the value was already seen.
                let key = encode_key(core::slice::from_ref(&arg_val));
                if !entry.1[i].seen.insert(key) {
                    continue;
                }
            }
            update_state(
                &mut entry.1[i],
                &spec.name,
                &arg_val,
                arg2_val.as_ref(),
                order_keys,
            )?;
        }
    }
    Ok(order)
}
844
845fn build_synth_schema(
849 rows: &[RowRef<'_>],
850 group_exprs: &[Expr],
851 agg_specs: &[AggSpec],
852 schema_cols: &[ColumnSchema],
853 table_alias: Option<&str>,
854) -> Result<Vec<ColumnSchema>, EvalError> {
855 let ctx = EvalContext::new(schema_cols, table_alias);
856 let group_types: Vec<DataType> = if rows.is_empty() {
858 group_exprs.iter().map(|_| DataType::Text).collect()
861 } else {
862 let probe_row = rows[0].as_row();
863 let probe: &Row = &probe_row;
864 group_exprs
865 .iter()
866 .map(|g| {
867 eval::eval_expr(g, probe, &ctx).map(|v| v.data_type().unwrap_or(DataType::Text))
868 })
869 .collect::<Result<_, _>>()?
870 };
871 let agg_types: Vec<DataType> = agg_specs
872 .iter()
873 .map(|spec| infer_agg_type(spec, schema_cols))
874 .collect();
875 let mut synth_schema: Vec<ColumnSchema> = Vec::new();
876 for (i, ty) in group_types.iter().enumerate() {
877 synth_schema.push(ColumnSchema::new(format!("__grp_{i}"), *ty, true));
878 }
879 for (i, ty) in agg_types.iter().enumerate() {
880 synth_schema.push(ColumnSchema::new(format!("__agg_{i}"), *ty, true));
881 }
882 Ok(synth_schema)
883}
884
885fn cmp_order_keys(
894 order_by: &[spg_sql::ast::OrderBy],
895 a: &[Value],
896 b: &[Value],
897) -> core::cmp::Ordering {
898 for (k, o) in order_by.iter().enumerate() {
899 let cmp = crate::order_by_value_cmp(o.desc, o.nulls_first, &a[k], &b[k]);
900 if cmp != core::cmp::Ordering::Equal {
901 return cmp;
902 }
903 }
904 core::cmp::Ordering::Equal
905}
906
907fn finalize_synth_rows(
908 order: &[(Vec<Value>, Vec<AggState>)],
909 agg_specs: &[AggSpec],
910 synth_schema: &[ColumnSchema],
911 rows: &[RowRef<'_>],
912 schema_cols: &[ColumnSchema],
913 table_alias: Option<&str>,
914) -> Result<Vec<Row>, EvalError> {
915 let ctx = EvalContext::new(schema_cols, table_alias);
916 let direct_arg_vals: Vec<Option<Value>> = agg_specs
919 .iter()
920 .map(|spec| match (&spec.direct_arg, rows.first()) {
921 (Some(e), Some(r)) => eval::eval_expr(e, &r.as_row(), &ctx).map(Some),
922 _ => Ok(None),
923 })
924 .collect::<Result<_, _>>()?;
925
926 let mut synth_rows: Vec<Row> = Vec::new();
928 for (gvals, states) in order {
929 let mut values: Vec<Value> = Vec::with_capacity(synth_schema.len());
930 values.extend(gvals.iter().cloned());
931 for (i, st) in states.iter().enumerate() {
932 if agg_specs[i].first_ordered {
935 values.push(
936 st.first_best
937 .as_ref()
938 .map_or(Value::Null, |(_, v)| v.clone()),
939 );
940 continue;
941 }
942 let st_sorted;
946 let st_final: &AggState =
947 if !agg_specs[i].order_by.is_empty() && st.item_keys.len() == st.items.len() {
948 let mut idx: Vec<usize> = (0..st.items.len()).collect();
949 let ob = &agg_specs[i].order_by;
950 idx.sort_by(|&x, &y| cmp_order_keys(ob, &st.item_keys[x], &st.item_keys[y]));
951 let mut sorted = st.clone();
952 sorted.items = idx.iter().map(|&j| st.items[j].clone()).collect();
953 st_sorted = sorted;
954 &st_sorted
955 } else {
956 st
957 };
958 let v = if is_within_group_name(&agg_specs[i].name) {
961 finalize_ordered_set(
962 &agg_specs[i].name,
963 st_final,
964 direct_arg_vals[i].as_ref(),
965 agg_specs[i].order_by.first(),
966 )
967 } else {
968 finalize(&agg_specs[i].name, st_final)
969 };
970 values.push(v);
971 }
972 synth_rows.push(Row::new(values));
973 }
974 Ok(synth_rows)
975}
976
/// Turns finalised synth rows into projected output rows.
///
/// Each select item, the `HAVING` predicate and each `ORDER BY` key is
/// rewritten with `rewrite_expr` against the synth schema. For every synth
/// row, the rewritten `HAVING` is applied first (only an exact `true` keeps
/// the row), then every select item is evaluated. Fully compilable
/// expressions are compiled once up front and run with
/// `eval::eval_compiled`. The rest use `eval::eval_expr`, or
/// `correlated_eval` when they contain a subquery.
///
/// Select items that contain a subquery are *deferred* when three conditions
/// hold: `correlated_eval` is supplied, the statement is not `DISTINCT`, and
/// neither `HAVING` nor `ORDER BY` contains a subquery. A deferred item gets
/// a `NULL` placeholder, and its `(item index, rewritten expr)` pair is
/// returned in `Projection::deferred` (presumably for the caller to evaluate
/// against `kept_synth`).
///
/// # Errors
///
/// Fails on `SELECT *`, which is not supported with aggregates, and on any
/// evaluation error.
#[allow(clippy::too_many_lines)]
fn project_groups(
    synth_rows: Vec<Row>,
    stmt: &SelectStatement,
    group_exprs: &[Expr],
    agg_specs: &[AggSpec],
    synth_schema: &[ColumnSchema],
    correlated_eval: Option<CorrelatedEval<'_>>,
) -> Result<Projection, EvalError> {
    let columns: Vec<ColumnSchema> = stmt
        .items
        .iter()
        .map(|item| match item {
            SelectItem::Wildcard => Err(EvalError::TypeMismatch {
                detail: "SELECT * with aggregates is not supported".into(),
            }),
            SelectItem::Expr { expr, alias } => {
                let rewritten = rewrite_expr(expr, group_exprs, agg_specs);
                let name = alias.clone().unwrap_or_else(|| expr.to_string());
                Ok(ColumnSchema::new(
                    name,
                    agg_or_group_type(&rewritten, synth_schema),
                    true,
                ))
            }
        })
        .collect::<Result<_, _>>()?;

    // All further evaluation runs against the synth rows, not the input rows.
    let synth_ctx = EvalContext::new(synth_schema, None);
    let having_rewritten = stmt
        .having
        .as_ref()
        .map(|h| rewrite_expr(h, group_exprs, agg_specs));
    // `None` marks a wildcard item. Wildcards were already rejected above, so
    // in practice every entry is `Some`.
    let items_rewritten: alloc::vec::Vec<Option<Expr>> = stmt
        .items
        .iter()
        .map(|item| match item {
            SelectItem::Expr { expr, .. } => Some(rewrite_expr(expr, group_exprs, agg_specs)),
            SelectItem::Wildcard => None,
        })
        .collect();
    let order_rewritten: Vec<Expr> = stmt
        .order_by
        .iter()
        .map(|o| rewrite_expr(&o.expr, group_exprs, agg_specs))
        .collect();
    // Deferral is disabled when a row filter (HAVING) or sort key (ORDER BY)
    // needs subquery evaluation, or when DISTINCT needs the real values.
    let defer_enabled = correlated_eval.is_some()
        && !stmt.distinct
        && !having_rewritten
            .as_ref()
            .is_some_and(crate::expr_has_subquery)
        && !order_rewritten.iter().any(crate::expr_has_subquery);
    let deferred: Vec<(usize, Expr)> = if defer_enabled {
        items_rewritten
            .iter()
            .enumerate()
            .filter_map(|(i, r)| {
                r.as_ref()
                    .filter(|e| crate::expr_has_subquery(e))
                    .map(|e| (i, e.clone()))
            })
            .collect()
    } else {
        Vec::new()
    };
    let having_compiled = having_rewritten
        .as_ref()
        .filter(|h| eval::fully_compilable(h))
        .map(|h| eval::compile_expr(h, &synth_ctx));
    // Deferred items are never compiled; they only receive a placeholder.
    let items_compiled: Vec<Option<eval::CompiledExpr>> = items_rewritten
        .iter()
        .enumerate()
        .map(|(i, r)| {
            r.as_ref()
                .filter(|e| !deferred.iter().any(|(c, _)| *c == i) && eval::fully_compilable(e))
                .map(|e| eval::compile_expr(e, &synth_ctx))
        })
        .collect();
    let mut kept_synth: Vec<Row> = Vec::new();
    let mut out_rows: Vec<Row> = Vec::new();
    // Evaluation stack shared by all compiled-expression evaluations.
    let mut stack: Vec<Value> = Vec::new();
    for srow in synth_rows {
        if let Some(hc) = &having_compiled {
            let cond = eval::eval_compiled(hc, &srow, &synth_ctx, &mut stack)?;
            if !matches!(cond, Value::Bool(true)) {
                continue;
            }
        } else if let Some(h) = &having_rewritten {
            let cond = match correlated_eval {
                Some(f) if crate::expr_has_subquery(h) => f(h, &srow, &synth_ctx)?,
                _ => eval::eval_expr(h, &srow, &synth_ctx)?,
            };
            if !matches!(cond, Value::Bool(true)) {
                continue;
            }
        }
        let mut values: Vec<Value> = Vec::with_capacity(columns.len());
        for (i, rewritten) in items_rewritten.iter().enumerate() {
            let Some(rewritten) = rewritten else { continue };
            if deferred.iter().any(|(c, _)| *c == i) {
                values.push(Value::Null);
                continue;
            }
            values.push(if let Some(cc) = &items_compiled[i] {
                eval::eval_compiled(cc, &srow, &synth_ctx, &mut stack)?
            } else {
                match correlated_eval {
                    Some(f) if crate::expr_has_subquery(rewritten) => {
                        f(rewritten, &srow, &synth_ctx)?
                    }
                    _ => eval::eval_expr(rewritten, &srow, &synth_ctx)?,
                }
            });
        }
        kept_synth.push(srow);
        out_rows.push(Row::new(values));
    }
    Ok(Projection {
        columns,
        out_rows,
        kept_synth,
        deferred,
        order_rewritten,
    })
}
1128
1129fn sort_synth_by_order_by(
1133 synth_schema: &[ColumnSchema],
1134 order_by: &[spg_sql::ast::OrderBy],
1135 order_rewritten: &[Expr],
1136 mut kept_synth: Vec<Row>,
1137 mut out_rows: Vec<Row>,
1138 correlated_eval: Option<CorrelatedEval<'_>>,
1139) -> Result<(Vec<Row>, Vec<Row>), EvalError> {
1140 let synth_ctx = EvalContext::new(synth_schema, None);
1141 let keys_meta: Vec<(bool, Option<bool>)> =
1146 order_by.iter().map(|o| (o.desc, o.nulls_first)).collect();
1147 let order_compiled: Vec<Option<eval::CompiledExpr>> = order_rewritten
1150 .iter()
1151 .map(|e| {
1152 Some(e)
1153 .filter(|e| eval::fully_compilable(e))
1154 .map(|e| eval::compile_expr(e, &synth_ctx))
1155 })
1156 .collect();
1157 let mut keystack: Vec<Value> = Vec::new();
1161 let mut tagged: Vec<(Vec<Value>, Row, Row)> = Vec::with_capacity(kept_synth.len());
1162 for (s, o) in kept_synth.into_iter().zip(out_rows) {
1163 let mut keys = Vec::with_capacity(order_rewritten.len());
1164 for (e, oc) in order_rewritten.iter().zip(&order_compiled) {
1165 keys.push(if let Some(oc) = oc {
1166 eval::eval_compiled(oc, &s, &synth_ctx, &mut keystack)?
1167 } else {
1168 match correlated_eval {
1169 Some(f) if crate::expr_has_subquery(e) => f(e, &s, &synth_ctx)?,
1170 _ => eval::eval_expr(e, &s, &synth_ctx)?,
1171 }
1172 });
1173 }
1174 tagged.push((keys, s, o));
1175 }
1176 tagged.sort_by(|a, b| {
1177 use core::cmp::Ordering;
1178 for (i, (ka, kb)) in a.0.iter().zip(b.0.iter()).enumerate() {
1179 let (desc, nf) = keys_meta[i];
1180 let cmp = crate::order_by_value_cmp(desc, nf, ka, kb);
1181 if cmp != Ordering::Equal {
1182 return cmp;
1183 }
1184 }
1185 Ordering::Equal
1186 });
1187 kept_synth = Vec::with_capacity(tagged.len());
1188 out_rows = Vec::with_capacity(tagged.len());
1189 for (_, s, o) in tagged {
1190 kept_synth.push(s);
1191 out_rows.push(o);
1192 }
1193 Ok((kept_synth, out_rows))
1194}
1195
1196fn validate_agg_arities(stmt: &SelectStatement, _specs: &[AggSpec]) -> Result<(), EvalError> {
1202 fn walk(e: &Expr) -> Result<(), EvalError> {
1203 if let Expr::FunctionCall { name, args } = e {
1204 let lower = name.to_ascii_lowercase();
1205 let expected: Option<usize> = match lower.as_str() {
1206 "count_star" => Some(0),
1207 "count" | "sum" | "avg" | "min" | "max" | "array_agg"
1208 | "bool_and" | "bool_or" | "every"
1212 | "stddev" | "stddev_samp" | "stddev_pop"
1215 | "variance" | "var_samp" | "var_pop"
1216 | "bit_and" | "bit_or" | "bit_xor"
1217 | "json_agg" | "jsonb_agg" => Some(1),
1218 "string_agg"
1221 | "covar_pop" | "covar_samp" | "corr"
1222 | "regr_count" | "regr_avgx" | "regr_avgy" | "regr_slope"
1223 | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy"
1224 | "json_object_agg" | "jsonb_object_agg" => Some(2),
1225 _ => None,
1226 };
1227 if let Some(want) = expected
1228 && args.len() != want
1229 {
1230 return Err(EvalError::TypeMismatch {
1231 detail: alloc::format!("{lower}() takes {want} arg(s), got {}", args.len()),
1232 });
1233 }
1234 for a in args {
1235 walk(a)?;
1236 }
1237 } else if let Expr::Binary { lhs, rhs, .. } = e {
1238 walk(lhs)?;
1239 walk(rhs)?;
1240 } else if let Expr::Unary { expr, .. }
1241 | Expr::Cast { expr, .. }
1242 | Expr::IsNull { expr, .. } = e
1243 {
1244 walk(expr)?;
1245 }
1246 Ok(())
1247 }
1248 for item in &stmt.items {
1249 if let SelectItem::Expr { expr, .. } = item {
1250 walk(expr)?;
1251 }
1252 }
1253 for o in &stmt.order_by {
1254 walk(&o.expr)?;
1255 }
1256 if let Some(h) = &stmt.having {
1257 walk(h)?;
1258 }
1259 Ok(())
1260}
1261
1262fn first_ordered_array_agg(e: &Expr) -> Option<(&Expr, &[spg_sql::ast::OrderBy], Option<&Expr>)> {
1271 let Expr::ArraySubscript { target, index } = e else {
1272 return None;
1273 };
1274 if !matches!(
1275 index.as_ref(),
1276 Expr::Literal(spg_sql::ast::Literal::Integer(1))
1277 ) {
1278 return None;
1279 }
1280 let Expr::AggregateOrdered {
1281 call,
1282 order_by,
1283 distinct,
1284 filter,
1285 } = target.as_ref()
1286 else {
1287 return None;
1288 };
1289 if *distinct || order_by.is_empty() {
1290 return None;
1291 }
1292 let Expr::FunctionCall { name, args } = call.as_ref() else {
1293 return None;
1294 };
1295 if !name.eq_ignore_ascii_case("array_agg") || args.len() != 1 {
1296 return None;
1297 }
1298 Some((&args[0], order_by, filter.as_deref()))
1299}
1300
/// Walks `e` and appends one `AggSpec` per distinct aggregate call to `out`.
///
/// Recognised forms:
/// * `Expr::AggregateOrdered` wrapping an aggregate `FunctionCall`
///   (DISTINCT, aggregate ORDER BY, FILTER, or WITHIN GROUP);
/// * a plain aggregate `FunctionCall`;
/// * `array_agg(x ORDER BY …)[1]`, recorded as a `first_ordered` spec.
///
/// A spec is only pushed when no matching spec is already in `out`, so
/// repeated occurrences of one aggregate share a single accumulator. The
/// arguments of a recognised aggregate are not searched further. Subqueries,
/// window functions and leaves are not descended into.
fn collect_aggregates(e: &Expr, out: &mut Vec<AggSpec>) {
    match e {
        Expr::AggregateOrdered {
            call,
            order_by,
            distinct,
            filter,
        } => {
            if let Expr::FunctionCall { name, args } = call.as_ref() {
                let lower = name.to_ascii_lowercase();
                if is_aggregate_name(&lower) {
                    // `every` is an alias of `bool_and`.
                    let canonical = if lower == "every" {
                        "bool_and".to_string()
                    } else {
                        lower
                    };
                    let ordered_set = is_within_group_name(&canonical);
                    // For WITHIN GROUP aggregates the aggregated value is the
                    // first sort expression, and the call's own first argument
                    // is the direct argument.
                    let (arg, direct_arg) = if ordered_set {
                        (
                            order_by.first().map(|o| o.expr.clone()),
                            args.first().cloned(),
                        )
                    } else {
                        (args.first().cloned(), None)
                    };
                    let spec = AggSpec {
                        name: canonical.clone(),
                        arg,
                        arg2: if agg_uses_second_arg(&canonical) {
                            args.get(1).cloned()
                        } else {
                            None
                        },
                        distinct: *distinct,
                        order_by: order_by.clone(),
                        filter: filter.as_deref().cloned(),
                        direct_arg,
                        first_ordered: false,
                    };
                    // Deduplicate on every field.
                    if !out.iter().any(|s| {
                        s.name == spec.name
                            && s.arg == spec.arg
                            && s.arg2 == spec.arg2
                            && s.distinct == spec.distinct
                            && s.order_by == spec.order_by
                            && s.filter == spec.filter
                            && s.direct_arg == spec.direct_arg
                            && s.first_ordered == spec.first_ordered
                    }) {
                        out.push(spec);
                    }
                    return;
                }
            }
            // Not an aggregate call: look for aggregates inside the call and
            // its ORDER BY keys.
            collect_aggregates(call, out);
            for o in order_by {
                collect_aggregates(&o.expr, out);
            }
        }
        Expr::FunctionCall { name, args } => {
            let lower = name.to_ascii_lowercase();
            if is_aggregate_name(&lower) {
                // `count(*)` has no argument to evaluate.
                let arg = if lower == "count_star" {
                    None
                } else {
                    args.first().cloned()
                };
                let arg2 = if agg_uses_second_arg(&lower) {
                    args.get(1).cloned()
                } else {
                    None
                };
                let canonical = if lower == "every" {
                    "bool_and".to_string()
                } else {
                    lower
                };
                let spec = AggSpec {
                    name: canonical,
                    arg: arg.clone(),
                    arg2: arg2.clone(),
                    distinct: false,
                    order_by: Vec::new(),
                    filter: None,
                    direct_arg: None,
                    first_ordered: false,
                };
                // Only match an existing plain spec: no DISTINCT, empty ORDER BY,
                // no FILTER, not the first-element form. `direct_arg` is not
                // compared here.
                if !out.iter().any(|s| {
                    s.name == spec.name
                        && s.arg == spec.arg
                        && s.arg2 == spec.arg2
                        && !s.distinct
                        && s.order_by == spec.order_by
                        && s.filter.is_none()
                        && !s.first_ordered
                }) {
                    out.push(spec);
                }
            } else {
                // Scalar function: aggregates may appear in its arguments.
                for a in args {
                    collect_aggregates(a, out);
                }
            }
        }
        Expr::Binary { lhs, rhs, .. } => {
            collect_aggregates(lhs, out);
            collect_aggregates(rhs, out);
        }
        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
            collect_aggregates(expr, out);
        }
        Expr::Like { expr, pattern, .. } => {
            collect_aggregates(expr, out);
            collect_aggregates(pattern, out);
        }
        Expr::InList { expr, list, .. } => {
            collect_aggregates(expr, out);
            for item in list {
                collect_aggregates(item, out);
            }
        }
        Expr::Extract { source, .. } => collect_aggregates(source, out),
        // Opaque or leaf nodes: nothing to collect for this query level.
        Expr::ScalarSubquery(_)
        | Expr::Exists { .. }
        | Expr::InSubquery { .. }
        | Expr::WindowFunction { .. }
        | Expr::Literal(_)
        | Expr::Placeholder(_)
        | Expr::Column(_) => {}
        Expr::Array(items) => {
            for elem in items {
                collect_aggregates(elem, out);
            }
        }
        Expr::ArraySubscript { target, index } => {
            // `array_agg(x ORDER BY …)[1]` gets a dedicated first-element spec
            // instead of collecting the whole array.
            if let Some((arg, order_by, filter)) = first_ordered_array_agg(e) {
                let spec = AggSpec {
                    name: "array_agg".to_string(),
                    arg: Some(arg.clone()),
                    arg2: None,
                    distinct: false,
                    order_by: order_by.to_vec(),
                    filter: filter.cloned(),
                    direct_arg: None,
                    first_ordered: true,
                };
                if !out.iter().any(|s| {
                    s.name == spec.name
                        && s.arg == spec.arg
                        && s.order_by == spec.order_by
                        && s.filter == spec.filter
                        && s.first_ordered
                }) {
                    out.push(spec);
                }
                return;
            }
            collect_aggregates(target, out);
            collect_aggregates(index, out);
        }
        Expr::AnyAll { expr, array, .. } => {
            collect_aggregates(expr, out);
            collect_aggregates(array, out);
        }
        Expr::Case {
            operand,
            branches,
            else_branch,
        } => {
            if let Some(o) = operand {
                collect_aggregates(o, out);
            }
            for (w, t) in branches {
                collect_aggregates(w, out);
                collect_aggregates(t, out);
            }
            if let Some(e) = else_branch {
                collect_aggregates(e, out);
            }
        }
    }
}
1505
1506fn update_state(
1507 st: &mut AggState,
1508 name: &str,
1509 v: &Value,
1510 arg2: Option<&Value>,
1511 order_keys: Option<Vec<Value>>,
1512) -> Result<(), EvalError> {
1513 let is_null = matches!(v, Value::Null);
1514 match name {
1515 "count_star" => st.count += 1,
1516 "count" => {
1517 if !is_null {
1518 st.count += 1;
1519 }
1520 }
1521 "sum" | "avg" => {
1522 if is_null {
1523 return Ok(());
1524 }
1525 st.count += 1;
1526 match v {
1527 Value::Int(n) => st.sum_int += i64::from(*n),
1528 Value::BigInt(n) => st.sum_int += *n,
1529 Value::Float(x) => {
1530 st.use_float = true;
1531 st.sum_float += *x;
1532 }
1533 other => {
1534 return Err(EvalError::TypeMismatch {
1535 detail: format!("sum/avg need numeric, got {:?}", other.data_type()),
1536 });
1537 }
1538 }
1539 }
1540 "min" => {
1541 if is_null {
1542 return Ok(());
1543 }
1544 match &st.extreme {
1545 None => st.extreme = Some(v.clone()),
1546 Some(cur) => {
1547 if value_cmp(v, cur) == core::cmp::Ordering::Less {
1548 st.extreme = Some(v.clone());
1549 }
1550 }
1551 }
1552 }
1553 "max" => {
1554 if is_null {
1555 return Ok(());
1556 }
1557 match &st.extreme {
1558 None => st.extreme = Some(v.clone()),
1559 Some(cur) => {
1560 if value_cmp(v, cur) == core::cmp::Ordering::Greater {
1561 st.extreme = Some(v.clone());
1562 }
1563 }
1564 }
1565 }
1566 "string_agg" => {
1574 if let Some(sep) = arg2
1575 && let Value::Text(s) = sep
1576 {
1577 st.separator = Some(s.clone());
1578 }
1579 if is_null {
1580 return Ok(());
1581 }
1582 if let Value::Text(s) = v {
1583 st.items.push(Value::Text(s.clone()));
1584 if let Some(k) = order_keys {
1585 st.item_keys.push(k);
1586 }
1587 st.count += 1;
1588 } else {
1589 return Err(EvalError::TypeMismatch {
1590 detail: format!("string_agg requires text value, got {:?}", v.data_type()),
1591 });
1592 }
1593 }
1594 "array_agg" => {
1600 st.items.push(v.clone());
1601 if let Some(k) = order_keys {
1602 st.item_keys.push(k);
1603 }
1604 st.count += 1;
1605 }
1606 "bool_and" => {
1610 if is_null {
1611 return Ok(());
1612 }
1613 let b = match v {
1614 Value::Bool(b) => *b,
1615 other => {
1616 return Err(EvalError::TypeMismatch {
1617 detail: format!("bool_and requires bool, got {:?}", other.data_type()),
1618 });
1619 }
1620 };
1621 st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc && b));
1622 }
1623 "bool_or" => {
1626 if is_null {
1627 return Ok(());
1628 }
1629 let b = match v {
1630 Value::Bool(b) => *b,
1631 other => {
1632 return Err(EvalError::TypeMismatch {
1633 detail: format!("bool_or requires bool, got {:?}", other.data_type()),
1634 });
1635 }
1636 };
1637 st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc || b));
1638 }
1639 "stddev" | "stddev_samp" | "stddev_pop" | "variance" | "var_samp" | "var_pop" => {
1643 if is_null {
1644 return Ok(());
1645 }
1646 let x = match v {
1647 Value::Int(n) => f64::from(*n),
1648 Value::SmallInt(n) => f64::from(*n),
1649 Value::BigInt(n) => *n as f64,
1650 Value::Float(x) => *x,
1651 other => {
1652 return Err(EvalError::TypeMismatch {
1653 detail: format!("{name} needs numeric, got {:?}", other.data_type()),
1654 });
1655 }
1656 };
1657 st.count += 1;
1658 st.sum_float += x;
1659 st.sum_sq += x * x;
1660 }
1661 "bit_and" | "bit_or" | "bit_xor" => {
1663 if is_null {
1664 return Ok(());
1665 }
1666 let n = match v {
1667 Value::Int(n) => i64::from(*n),
1668 Value::SmallInt(n) => i64::from(*n),
1669 Value::BigInt(n) => *n,
1670 other => {
1671 return Err(EvalError::TypeMismatch {
1672 detail: format!("{name} needs integer, got {:?}", other.data_type()),
1673 });
1674 }
1675 };
1676 st.bit_acc = Some(match (st.bit_acc, name) {
1677 (None, _) => n,
1678 (Some(acc), "bit_and") => acc & n,
1679 (Some(acc), "bit_or") => acc | n,
1680 (Some(acc), _) => acc ^ n, });
1682 }
1683 n if is_within_group_name(n) => {
1688 if is_null {
1689 return Ok(());
1690 }
1691 st.items.push(v.clone());
1692 if let Some(k) = order_keys {
1693 st.item_keys.push(k);
1694 }
1695 st.count += 1;
1696 }
1697 n if is_regression_name(n) => {
1701 let (Some(y), Some(x)) = (agg_value_to_f64(v), arg2.and_then(agg_value_to_f64)) else {
1702 return Ok(()); };
1704 st.reg_n += 1;
1705 st.reg_sx += x;
1706 st.reg_sy += y;
1707 st.reg_sxx += x * x;
1708 st.reg_syy += y * y;
1709 st.reg_sxy += x * y;
1710 }
1711 "json_agg" | "jsonb_agg" => {
1714 st.items.push(v.clone());
1715 st.count += 1;
1716 }
1717 "json_object_agg" | "jsonb_object_agg" => {
1721 if is_null {
1722 return Ok(());
1723 }
1724 st.items.push(v.clone());
1725 st.aux_items.push(arg2.cloned().unwrap_or(Value::Null));
1726 st.count += 1;
1727 }
1728 _ => unreachable!("non-aggregate {name} in update_state"),
1729 }
1730 Ok(())
1731}
1732
1733#[allow(clippy::cast_precision_loss)]
1734fn finalize(name: &str, st: &AggState) -> Value {
1735 match name {
1736 "count" | "count_star" => Value::BigInt(st.count),
1737 "sum" => {
1738 if st.count == 0 {
1739 Value::Null
1740 } else if st.use_float {
1741 Value::Float(st.sum_float + (st.sum_int as f64))
1742 } else {
1743 Value::BigInt(st.sum_int)
1744 }
1745 }
1746 "avg" => {
1747 if st.count == 0 {
1748 Value::Null
1749 } else {
1750 let total = if st.use_float {
1751 st.sum_float + (st.sum_int as f64)
1752 } else {
1753 st.sum_int as f64
1754 };
1755 Value::Float(total / (st.count as f64))
1756 }
1757 }
1758 "min" | "max" => st.extreme.clone().unwrap_or(Value::Null),
1759 "string_agg" => {
1763 if st.items.is_empty() {
1764 return Value::Null;
1765 }
1766 let sep = st.separator.clone().unwrap_or_default();
1767 let mut out = String::new();
1768 for (i, item) in st.items.iter().enumerate() {
1769 if i > 0 {
1770 out.push_str(&sep);
1771 }
1772 if let Value::Text(s) = item {
1773 out.push_str(s);
1774 }
1775 }
1776 Value::Text(out)
1777 }
1778 "array_agg" => {
1785 if st.items.is_empty() {
1786 return Value::Null;
1787 }
1788 let probe = st.items.iter().find(|v| !v.is_null());
1789 match probe.and_then(spg_storage::Value::data_type) {
1790 Some(DataType::Int) | Some(DataType::SmallInt) => {
1791 let items: Vec<Option<i32>> = st
1792 .items
1793 .iter()
1794 .map(|v| match v {
1795 Value::Int(n) => Some(*n),
1796 Value::SmallInt(n) => Some(i32::from(*n)),
1797 _ => None,
1798 })
1799 .collect();
1800 Value::IntArray(items)
1801 }
1802 Some(DataType::BigInt) => {
1803 let items: Vec<Option<i64>> = st
1804 .items
1805 .iter()
1806 .map(|v| match v {
1807 Value::BigInt(n) => Some(*n),
1808 _ => None,
1809 })
1810 .collect();
1811 Value::BigIntArray(items)
1812 }
1813 _ => {
1814 let items: Vec<Option<String>> = st
1815 .items
1816 .iter()
1817 .map(|v| match v {
1818 Value::Text(s) => Some(s.clone()),
1819 Value::Null => None,
1820 other => Some(format!("{other:?}")),
1821 })
1822 .collect();
1823 Value::TextArray(items)
1824 }
1825 }
1826 }
1827 "bool_and" | "bool_or" => st.bool_acc.map_or(Value::Null, Value::Bool),
1831 "variance" | "var_samp" | "var_pop" | "stddev" | "stddev_samp" | "stddev_pop" => {
1835 let n = st.count;
1836 if n == 0 {
1837 return Value::Null;
1838 }
1839 let nf = n as f64;
1840 let ss = st.sum_sq - (st.sum_float * st.sum_float) / nf;
1842 let pop = name.ends_with("_pop");
1843 let denom = if pop { nf } else { nf - 1.0 };
1844 if denom <= 0.0 {
1845 return Value::Null;
1847 }
1848 let var = (ss / denom).max(0.0); if name.starts_with("stddev") {
1850 Value::Float(crate::eval::f64_sqrt(var))
1851 } else {
1852 Value::Float(var)
1853 }
1854 }
1855 "bit_and" | "bit_or" | "bit_xor" => st.bit_acc.map_or(Value::Null, Value::BigInt),
1858 "regr_count" => Value::BigInt(st.reg_n),
1862 "covar_pop" | "covar_samp" | "corr" | "regr_avgx" | "regr_avgy" | "regr_slope"
1863 | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy" => {
1864 let n = st.reg_n;
1865 if n == 0 {
1866 return Value::Null;
1867 }
1868 let nf = n as f64;
1869 let sxx = st.reg_sxx - st.reg_sx * st.reg_sx / nf;
1870 let syy = st.reg_syy - st.reg_sy * st.reg_sy / nf;
1871 let sxy = st.reg_sxy - st.reg_sx * st.reg_sy / nf;
1872 let avgx = st.reg_sx / nf;
1873 let avgy = st.reg_sy / nf;
1874 let out = match name {
1875 "regr_avgx" => Some(avgx),
1876 "regr_avgy" => Some(avgy),
1877 "regr_sxx" => Some(sxx),
1878 "regr_syy" => Some(syy),
1879 "regr_sxy" => Some(sxy),
1880 "covar_pop" => Some(sxy / nf),
1881 "covar_samp" => (n >= 2).then(|| sxy / (nf - 1.0)),
1882 "regr_slope" => (sxx != 0.0).then(|| sxy / sxx),
1883 "regr_intercept" => (sxx != 0.0).then(|| avgy - (sxy / sxx) * avgx),
1884 "corr" => {
1885 let d = sxx * syy;
1886 (d > 0.0).then(|| sxy / crate::eval::f64_sqrt(d))
1887 }
1888 "regr_r2" => {
1890 if sxx == 0.0 {
1891 None
1892 } else if syy == 0.0 {
1893 Some(1.0)
1894 } else {
1895 Some((sxy * sxy) / (sxx * syy))
1896 }
1897 }
1898 _ => None,
1899 };
1900 out.map_or(Value::Null, Value::Float)
1901 }
1902 "json_agg" | "jsonb_agg" => {
1905 if st.items.is_empty() {
1906 return Value::Null;
1907 }
1908 let mut out = String::from("[");
1909 for (i, item) in st.items.iter().enumerate() {
1910 if i > 0 {
1911 out.push_str(", ");
1912 }
1913 out.push_str(&crate::json::value_to_json_text(item));
1914 }
1915 out.push(']');
1916 Value::Json(out)
1917 }
1918 "json_object_agg" | "jsonb_object_agg" => {
1921 if st.items.is_empty() {
1922 return Value::Null;
1923 }
1924 let mut out = String::from("{");
1925 for (i, key) in st.items.iter().enumerate() {
1926 if i > 0 {
1927 out.push_str(", ");
1928 }
1929 let key_text = match key {
1931 Value::Text(s) | Value::Json(s) => s.clone(),
1932 other => crate::json::value_to_json_text(other),
1933 };
1934 out.push_str(&crate::json::value_to_json_text(&Value::Text(key_text)));
1935 out.push_str(": ");
1936 let val = st.aux_items.get(i).unwrap_or(&Value::Null);
1937 out.push_str(&crate::json::value_to_json_text(val));
1938 }
1939 out.push('}');
1940 Value::Json(out)
1941 }
1942 _ => unreachable!(),
1945 }
1946}
1947
1948fn agg_value_to_f64(v: &Value) -> Option<f64> {
1950 match v {
1951 Value::Int(n) => Some(f64::from(*n)),
1952 Value::SmallInt(n) => Some(f64::from(*n)),
1953 Value::BigInt(n) => Some(*n as f64),
1954 Value::Float(x) => Some(*x),
1955 _ => None,
1956 }
1957}
1958
1959#[allow(
1966 clippy::cast_precision_loss,
1967 clippy::cast_possible_truncation,
1968 clippy::cast_sign_loss
1969)]
1970fn finalize_ordered_set(
1971 name: &str,
1972 st: &AggState,
1973 direct: Option<&Value>,
1974 order: Option<&spg_sql::ast::OrderBy>,
1975) -> Value {
1976 let fraction = direct;
1977 let items = &st.items;
1978 if items.is_empty() {
1979 return match name {
1982 "rank" | "dense_rank" => Value::BigInt(1),
1983 "percent_rank" => Value::Float(0.0),
1984 "cume_dist" => Value::Float(1.0),
1985 _ => Value::Null,
1986 };
1987 }
1988 let n = items.len();
1989 match name {
1990 "rank" | "dense_rank" | "percent_rank" | "cume_dist" => {
1993 let Some(h) = fraction else {
1994 return Value::Null;
1995 };
1996 let (desc, nulls_first) = order.map_or((false, None), |o| (o.desc, o.nulls_first));
1997 let mut before = 0usize; let mut before_or_eq = 0usize; let mut distinct_before = 0usize;
2000 let mut last_before: Option<&Value> = None;
2001 for it in items {
2002 match crate::order_by_value_cmp(desc, nulls_first, it, h) {
2003 core::cmp::Ordering::Less => {
2004 before += 1;
2005 before_or_eq += 1;
2006 if last_before
2007 .is_none_or(|p| value_cmp(p, it) != core::cmp::Ordering::Equal)
2008 {
2009 distinct_before += 1;
2010 last_before = Some(it);
2011 }
2012 }
2013 core::cmp::Ordering::Equal => before_or_eq += 1,
2014 core::cmp::Ordering::Greater => {}
2015 }
2016 }
2017 let nn = n as f64;
2018 match name {
2019 "rank" => Value::BigInt((before + 1) as i64),
2020 "dense_rank" => Value::BigInt((distinct_before + 1) as i64),
2021 "percent_rank" => Value::Float(before as f64 / nn),
2022 "cume_dist" => Value::Float((before_or_eq as f64 + 1.0) / (nn + 1.0)),
2023 _ => unreachable!(),
2024 }
2025 }
2026 "mode" => {
2030 let (mut best_i, mut best_cnt) = (0usize, 1usize);
2031 let (mut run_i, mut run_cnt) = (0usize, 1usize);
2032 for i in 1..n {
2033 if value_cmp(&items[i], &items[run_i]) == core::cmp::Ordering::Equal {
2034 run_cnt += 1;
2035 } else {
2036 run_i = i;
2037 run_cnt = 1;
2038 }
2039 if run_cnt > best_cnt {
2040 best_cnt = run_cnt;
2041 best_i = run_i;
2042 }
2043 }
2044 items[best_i].clone()
2045 }
2046 "percentile_disc" => {
2048 let f = fraction
2049 .and_then(agg_value_to_f64)
2050 .unwrap_or(0.0)
2051 .clamp(0.0, 1.0);
2052 let idx = if f <= 0.0 {
2053 0
2054 } else {
2055 (crate::eval::f64_ceil(f * n as f64) as usize)
2056 .saturating_sub(1)
2057 .min(n - 1)
2058 };
2059 items[idx].clone()
2060 }
2061 "percentile_cont" => {
2063 let f = fraction
2064 .and_then(agg_value_to_f64)
2065 .unwrap_or(0.0)
2066 .clamp(0.0, 1.0);
2067 let Some(nums) = items
2068 .iter()
2069 .map(agg_value_to_f64)
2070 .collect::<Option<Vec<f64>>>()
2071 else {
2072 return Value::Null; };
2074 if n == 1 {
2075 return Value::Float(nums[0]);
2076 }
2077 let rank = f * (n as f64 - 1.0);
2078 let lo = crate::eval::f64_floor(rank) as usize;
2079 let hi = crate::eval::f64_ceil(rank) as usize;
2080 let frac = rank - lo as f64;
2081 Value::Float(nums[lo] + (nums[hi] - nums[lo]) * frac)
2082 }
2083 _ => unreachable!(),
2084 }
2085}
2086
2087fn infer_agg_type(spec: &AggSpec, schema_cols: &[ColumnSchema]) -> DataType {
2088 let arg_ty = spec
2092 .arg
2093 .as_ref()
2094 .and_then(|a| crate::describe::describe_expr(a, schema_cols))
2095 .map(|shape| shape.ty);
2096 if spec.first_ordered {
2099 return arg_ty.unwrap_or(DataType::Text);
2100 }
2101 match spec.name.as_str() {
2102 "count" | "count_star" => DataType::BigInt,
2103 "sum" => match arg_ty {
2104 Some(DataType::Float) => DataType::Float,
2105 _ => DataType::BigInt,
2106 },
2107 "avg" => DataType::Float,
2108 "string_agg" => DataType::Text,
2110 "array_agg" => match arg_ty {
2111 Some(DataType::Int | DataType::SmallInt) => DataType::IntArray,
2112 Some(DataType::BigInt) => DataType::BigIntArray,
2113 _ => DataType::TextArray,
2114 },
2115 "bool_and" | "bool_or" => DataType::Bool,
2118 "stddev" | "stddev_samp" | "stddev_pop" | "variance" | "var_samp" | "var_pop"
2122 | "percentile_cont" | "covar_pop" | "covar_samp" | "corr" | "regr_avgx" | "regr_avgy"
2123 | "regr_slope" | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy" => {
2124 DataType::Float
2125 }
2126 "bit_and" | "bit_or" | "bit_xor" | "regr_count" | "rank" | "dense_rank" => DataType::BigInt,
2129 "percent_rank" | "cume_dist" => DataType::Float,
2131 "json_agg" | "jsonb_agg" | "json_object_agg" | "jsonb_object_agg" => DataType::Json,
2133 _ => arg_ty.unwrap_or(DataType::Text),
2137 }
2138}
2139
2140fn agg_or_group_type(e: &Expr, synth: &[ColumnSchema]) -> DataType {
2141 if let Expr::Column(c) = e
2142 && let Some(s) = synth.iter().find(|s| s.name == c.name)
2143 {
2144 return s.ty;
2145 }
2146 crate::describe::describe_expr(e, synth)
2152 .map(|shape| shape.ty)
2153 .unwrap_or(DataType::Text)
2154}
2155
2156fn rewrite_expr(e: &Expr, group_exprs: &[Expr], aggs: &[AggSpec]) -> Expr {
2157 if let Some((arg, order_by, filter)) = first_ordered_array_agg(e) {
2163 let arg_owned = Some(arg.clone());
2164 let filter_owned = filter.cloned();
2165 for (i, spec) in aggs.iter().enumerate() {
2166 if spec.first_ordered
2167 && spec.name == "array_agg"
2168 && spec.arg == arg_owned
2169 && spec.order_by == *order_by
2170 && spec.filter == filter_owned
2171 {
2172 return Expr::Column(spg_sql::ast::ColumnName {
2173 qualifier: None,
2174 name: format!("__agg_{i}"),
2175 });
2176 }
2177 }
2178 }
2179 if let Expr::AggregateOrdered {
2182 call,
2183 order_by,
2184 distinct,
2185 filter,
2186 } = e
2187 && let Expr::FunctionCall { name, args } = call.as_ref()
2188 {
2189 let lower = name.to_ascii_lowercase();
2190 if is_aggregate_name(&lower) {
2191 let canonical: &str = if lower == "every" { "bool_and" } else { &lower };
2192 let (arg, direct_arg) = if is_within_group_name(canonical) {
2195 (
2196 order_by.first().map(|o| o.expr.clone()),
2197 args.first().cloned(),
2198 )
2199 } else {
2200 (args.first().cloned(), None)
2201 };
2202 let arg2 = if agg_uses_second_arg(canonical) {
2203 args.get(1).cloned()
2204 } else {
2205 None
2206 };
2207 let filter_owned = filter.as_deref().cloned();
2208 for (i, spec) in aggs.iter().enumerate() {
2209 if spec.name == canonical
2210 && spec.arg == arg
2211 && spec.arg2 == arg2
2212 && spec.distinct == *distinct
2213 && spec.order_by == *order_by
2214 && spec.filter == filter_owned
2215 && spec.direct_arg == direct_arg
2216 {
2217 return Expr::Column(spg_sql::ast::ColumnName {
2218 qualifier: None,
2219 name: format!("__agg_{i}"),
2220 });
2221 }
2222 }
2223 }
2224 }
2225 if let Expr::FunctionCall { name, args } = e {
2227 let lower = name.to_ascii_lowercase();
2228 if is_aggregate_name(&lower) {
2229 let arg = if lower == "count_star" {
2230 None
2231 } else {
2232 args.first().cloned()
2233 };
2234 let arg2 = if agg_uses_second_arg(&lower) {
2238 args.get(1).cloned()
2239 } else {
2240 None
2241 };
2242 let canonical: &str = if lower == "every" {
2246 "bool_and"
2247 } else {
2248 lower.as_str()
2249 };
2250 for (i, spec) in aggs.iter().enumerate() {
2251 if spec.name == canonical
2252 && spec.arg == arg
2253 && spec.arg2 == arg2
2254 && !spec.distinct
2255 && spec.order_by.is_empty()
2256 {
2257 return Expr::Column(spg_sql::ast::ColumnName {
2258 qualifier: None,
2259 name: format!("__agg_{i}"),
2260 });
2261 }
2262 }
2263 }
2264 }
2265 for (i, g) in group_exprs.iter().enumerate() {
2267 if g == e {
2268 return Expr::Column(spg_sql::ast::ColumnName {
2269 qualifier: None,
2270 name: format!("__grp_{i}"),
2271 });
2272 }
2273 }
2274 match e {
2276 Expr::AggregateOrdered {
2277 call,
2278 order_by,
2279 distinct,
2280 filter,
2281 } => Expr::AggregateOrdered {
2282 call: Box::new(rewrite_expr(call, group_exprs, aggs)),
2283 distinct: *distinct,
2284 order_by: order_by
2285 .iter()
2286 .map(|o| spg_sql::ast::OrderBy {
2287 expr: rewrite_expr(&o.expr, group_exprs, aggs),
2288 desc: o.desc,
2289 nulls_first: o.nulls_first,
2290 })
2291 .collect(),
2292 filter: filter.clone(),
2295 },
2296 Expr::Binary { lhs, op, rhs } => Expr::Binary {
2297 lhs: Box::new(rewrite_expr(lhs, group_exprs, aggs)),
2298 op: *op,
2299 rhs: Box::new(rewrite_expr(rhs, group_exprs, aggs)),
2300 },
2301 Expr::Unary { op, expr } => Expr::Unary {
2302 op: *op,
2303 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2304 },
2305 Expr::Cast { expr, target } => Expr::Cast {
2306 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2307 target: *target,
2308 },
2309 Expr::IsNull { expr, negated } => Expr::IsNull {
2310 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2311 negated: *negated,
2312 },
2313 Expr::FunctionCall { name, args } => Expr::FunctionCall {
2314 name: name.clone(),
2315 args: args
2316 .iter()
2317 .map(|a| rewrite_expr(a, group_exprs, aggs))
2318 .collect(),
2319 },
2320 Expr::Like {
2321 expr,
2322 pattern,
2323 negated,
2324 case_insensitive,
2325 } => Expr::Like {
2326 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2327 pattern: Box::new(rewrite_expr(pattern, group_exprs, aggs)),
2328 negated: *negated,
2329 case_insensitive: *case_insensitive,
2330 },
2331 Expr::Extract { field, source } => Expr::Extract {
2332 field: *field,
2333 source: Box::new(rewrite_expr(source, group_exprs, aggs)),
2334 },
2335 Expr::ScalarSubquery(s) => {
2341 Expr::ScalarSubquery(Box::new(rewrite_group_keys_in_select(s, group_exprs)))
2342 }
2343 Expr::Exists { subquery, negated } => Expr::Exists {
2344 subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
2345 negated: *negated,
2346 },
2347 Expr::InSubquery {
2348 expr,
2349 subquery,
2350 negated,
2351 } => Expr::InSubquery {
2352 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2353 subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
2354 negated: *negated,
2355 },
2356 Expr::WindowFunction { .. } | Expr::Literal(_) | Expr::Placeholder(_) | Expr::Column(_) => {
2359 e.clone()
2360 }
2361 Expr::Array(items) => Expr::Array(
2363 items
2364 .iter()
2365 .map(|elem| rewrite_expr(elem, group_exprs, aggs))
2366 .collect(),
2367 ),
2368 Expr::ArraySubscript { target, index } => Expr::ArraySubscript {
2369 target: Box::new(rewrite_expr(target, group_exprs, aggs)),
2370 index: Box::new(rewrite_expr(index, group_exprs, aggs)),
2371 },
2372 Expr::AnyAll {
2373 expr,
2374 op,
2375 array,
2376 is_any,
2377 } => Expr::AnyAll {
2378 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2379 op: *op,
2380 array: Box::new(rewrite_expr(array, group_exprs, aggs)),
2381 is_any: *is_any,
2382 },
2383 Expr::InList {
2384 expr,
2385 list,
2386 negated,
2387 } => Expr::InList {
2388 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2389 list: list
2390 .iter()
2391 .map(|item| rewrite_expr(item, group_exprs, aggs))
2392 .collect(),
2393 negated: *negated,
2394 },
2395 Expr::Case {
2396 operand,
2397 branches,
2398 else_branch,
2399 } => Expr::Case {
2400 operand: operand
2401 .as_deref()
2402 .map(|o| Box::new(rewrite_expr(o, group_exprs, aggs))),
2403 branches: branches
2404 .iter()
2405 .map(|(w, t)| {
2406 (
2407 rewrite_expr(w, group_exprs, aggs),
2408 rewrite_expr(t, group_exprs, aggs),
2409 )
2410 })
2411 .collect(),
2412 else_branch: else_branch
2413 .as_deref()
2414 .map(|e| Box::new(rewrite_expr(e, group_exprs, aggs))),
2415 },
2416 }
2417}
2418
2419fn rewrite_group_keys_in_select(
2424 s: &spg_sql::ast::SelectStatement,
2425 group_exprs: &[Expr],
2426) -> spg_sql::ast::SelectStatement {
2427 let mut out = s.clone();
2428 let _ = crate::walk_select_exprs_mut(&mut out, &mut |e| {
2429 *e = rewrite_expr(e, group_exprs, &[]);
2430 Ok(())
2431 });
2432 out
2433}
2434
2435fn encode_one(out: &mut String, v: &Value) {
2438 match v {
2439 Value::Null => out.push_str("N|"),
2440 Value::SmallInt(n) => {
2441 out.push('s');
2442 out.push_str(&n.to_string());
2443 out.push('|');
2444 }
2445 Value::Int(n) => {
2446 out.push('I');
2447 out.push_str(&n.to_string());
2448 out.push('|');
2449 }
2450 Value::BigInt(n) => {
2451 out.push('B');
2452 out.push_str(&n.to_string());
2453 out.push('|');
2454 }
2455 Value::Float(x) => {
2456 out.push('F');
2457 out.push_str(&x.to_string());
2458 out.push('|');
2459 }
2460 Value::Bool(b) => {
2461 out.push(if *b { 'T' } else { 'f' });
2462 out.push('|');
2463 }
2464 Value::Text(s) => {
2465 out.push('S');
2466 out.push_str(s);
2467 out.push('|');
2468 }
2469 Value::Vector(v) => {
2470 out.push('V');
2471 for x in v {
2472 out.push_str(&x.to_string());
2473 out.push(',');
2474 }
2475 out.push('|');
2476 }
2477 Value::Sq8Vector(q) => {
2483 out.push('Q');
2484 out.push_str(&q.min.to_string());
2485 out.push('@');
2486 out.push_str(&q.max.to_string());
2487 out.push(':');
2488 for b in &q.bytes {
2489 out.push_str(&b.to_string());
2490 out.push(',');
2491 }
2492 out.push('|');
2493 }
2494 Value::HalfVector(h) => {
2498 out.push('H');
2499 for b in &h.bytes {
2500 out.push_str(&b.to_string());
2501 out.push(',');
2502 }
2503 out.push('|');
2504 }
2505 Value::Numeric { scaled, scale } => {
2506 out.push('D');
2507 out.push_str(&scaled.to_string());
2508 out.push('@');
2509 out.push_str(&scale.to_string());
2510 out.push('|');
2511 }
2512 Value::Date(d) => {
2513 out.push('d');
2514 out.push_str(&d.to_string());
2515 out.push('|');
2516 }
2517 Value::Timestamp(t) => {
2518 out.push('t');
2519 out.push_str(&t.to_string());
2520 out.push('|');
2521 }
2522 Value::Interval { months, micros } => {
2523 out.push('i');
2524 out.push_str(&months.to_string());
2525 out.push('m');
2526 out.push_str(µs.to_string());
2527 out.push('|');
2528 }
2529 Value::Json(s) => {
2530 out.push('j');
2531 out.push_str(s);
2532 out.push('|');
2533 }
2534 _ => {
2539 out.push('?');
2540 out.push_str(&format!("{v:?}"));
2541 out.push('|');
2542 }
2543 }
2544}
2545
2546pub(crate) fn encode_key_refs(vals: &[&Value]) -> String {
2549 let mut out = String::new();
2550 for v in vals {
2551 encode_one(&mut out, v);
2552 }
2553 out
2554}
2555
2556pub(crate) fn encode_key_refs_into(vals: &[&Value], out: &mut String) {
2562 out.clear();
2563 for v in vals {
2564 encode_one(out, v);
2565 }
2566}
2567
2568pub(crate) fn encode_key(vals: &[Value]) -> String {
2569 let mut out = String::new();
2570 for v in vals {
2571 encode_one(&mut out, v);
2572 }
2573 out
2574}
2575
2576#[allow(clippy::cast_precision_loss)]
2577fn value_cmp(a: &Value, b: &Value) -> core::cmp::Ordering {
2578 use core::cmp::Ordering::Equal;
2579 match (a, b) {
2580 (Value::Null, Value::Null) => Equal,
2581 (Value::Null, _) => core::cmp::Ordering::Greater, (_, Value::Null) => core::cmp::Ordering::Less,
2583 (Value::Int(x), Value::Int(y)) => x.cmp(y),
2584 (Value::BigInt(x), Value::BigInt(y)) => x.cmp(y),
2585 (Value::Int(x), Value::BigInt(y)) => i64::from(*x).cmp(y),
2586 (Value::BigInt(x), Value::Int(y)) => x.cmp(&i64::from(*y)),
2587 (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(Equal),
2588 (Value::Int(x), Value::Float(y)) => f64::from(*x).partial_cmp(y).unwrap_or(Equal),
2589 (Value::Float(x), Value::Int(y)) => x.partial_cmp(&f64::from(*y)).unwrap_or(Equal),
2590 (Value::BigInt(x), Value::Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Equal),
2591 (Value::Float(x), Value::BigInt(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Equal),
2592 (Value::Text(x), Value::Text(y)) => x.cmp(y),
2593 (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
2594 _ => Equal,
2595 }
2596}