spg_engine/aggregate.rs
1//! Aggregate executor.
2//!
3//! Handles `SELECT … <aggs> … [GROUP BY …]` queries. The planning strategy
4//! is straightforward:
5//!
//! 1. Walk the SELECT, ORDER BY and HAVING expressions to find every
//!    aggregate function call. Dedupe by AST equality and assign each
//!    `__agg_<i>`.
8//! 2. Same for every `GROUP BY` expression: assign `__grp_<j>`.
9//! 3. Stream the WHERE-filtered rows, group by the tuple of GROUP BY
10//! values, and update per-group aggregate state.
11//! 4. Materialise a synthetic per-group row containing
12//! `[__grp_0..__grp_K, __agg_0..__agg_N]` and rewrite the user's
13//! SELECT / ORDER BY expressions to reference those synthetic columns
14//! instead of the originals.
15//! 5. Evaluate the rewritten expressions against the synthetic schema and
16//! emit results.
17//!
//! v1.8 implemented `count(*)`, `count(expr)`, `sum`, `min`, `max`, `avg`;
//! later releases added the further aggregates listed in
//! `is_aggregate_name`. NULL semantics follow PG: these core aggregates
//! skip NULL inputs (except `count(*)`, which counts rows). `sum(int)`
//! widens to `BigInt`; `avg(int|bigint)` returns `Float`.
22
23use alloc::borrow::Cow;
24use alloc::boxed::Box;
25use alloc::collections::BTreeSet;
26use alloc::format;
27use alloc::string::{String, ToString};
28use alloc::vec::Vec;
29
30use spg_sql::ast::{Expr, SelectItem, SelectStatement};
31use spg_storage::{ColumnSchema, DataType, Row, Value};
32
33use crate::eval::{self, EvalContext, EvalError};
34use crate::join::RowRef;
35
36/// True if this statement should go through the aggregate path.
37pub fn uses_aggregate(stmt: &SelectStatement) -> bool {
38 if stmt.group_by.is_some() || stmt.having.is_some() {
39 return true;
40 }
41 for item in &stmt.items {
42 if let SelectItem::Expr { expr, .. } = item
43 && contains_aggregate(expr)
44 {
45 return true;
46 }
47 }
48 for o in &stmt.order_by {
49 if contains_aggregate(&o.expr) {
50 return true;
51 }
52 }
53 if let Some(h) = &stmt.having
54 && contains_aggregate(h)
55 {
56 return true;
57 }
58 false
59}
60
61pub fn contains_aggregate(e: &Expr) -> bool {
62 match e {
63 Expr::FunctionCall { name, args } => {
64 is_aggregate_name(name) || args.iter().any(contains_aggregate)
65 }
66 Expr::AggregateOrdered { .. } => true,
67 Expr::Binary { lhs, rhs, .. } => contains_aggregate(lhs) || contains_aggregate(rhs),
68 Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
69 contains_aggregate(expr)
70 }
71 Expr::Like { expr, pattern, .. } => contains_aggregate(expr) || contains_aggregate(pattern),
72 Expr::Extract { source, .. } => contains_aggregate(source),
73 // v4.10 subqueries + v4.12 window functions / Literal /
74 // Column — all non-aggregate leaves from the regular
75 // aggregate planner's POV. Window-bearing projections are
76 // routed to exec_select_with_window before this runs.
77 Expr::ScalarSubquery(_)
78 | Expr::Exists { .. }
79 | Expr::InSubquery { .. }
80 | Expr::WindowFunction { .. }
81 | Expr::Literal(_)
82 | Expr::Placeholder(_)
83 | Expr::Column(_) => false,
84 // v7.10.10 — recurse into array constructor / subscript /
85 // ANY/ALL children. Aggregates inside `ARRAY[SUM(x)]` are
86 // valid PG and must be detected here.
87 Expr::Array(items) => items.iter().any(contains_aggregate),
88 Expr::ArraySubscript { target, index } => {
89 contains_aggregate(target) || contains_aggregate(index)
90 }
91 Expr::AnyAll { expr, array, .. } => contains_aggregate(expr) || contains_aggregate(array),
92 Expr::InList { expr, list, .. } => {
93 contains_aggregate(expr) || list.iter().any(contains_aggregate)
94 }
95 // v7.13.0 — CASE WHEN … END. Recurse into operand,
96 // every (WHEN, THEN) pair, and the ELSE branch.
97 Expr::Case {
98 operand,
99 branches,
100 else_branch,
101 } => {
102 operand.as_deref().is_some_and(contains_aggregate)
103 || branches
104 .iter()
105 .any(|(w, t)| contains_aggregate(w) || contains_aggregate(t))
106 || else_branch.as_deref().is_some_and(contains_aggregate)
107 }
108 }
109}
110
/// Returns `true` when `name` is a recognised aggregate function name.
///
/// The comparison is ASCII case-insensitive and allocation-free: each
/// known name is compared with `eq_ignore_ascii_case` rather than
/// lowercasing `name` into a fresh `String` on every call, matching the
/// sibling classifiers (`is_ordered_set_name`, `is_hypothetical_set_name`).
pub fn is_aggregate_name(name: &str) -> bool {
    const AGGREGATE_NAMES: &[&str] = &[
        "count",
        "count_star",
        "sum",
        "min",
        "max",
        "avg",
        // v7.17.0 — variadic / collection aggregates. ORM reports
        // (Hibernate / Rails / Django) emit these in GROUP BY rollups;
        // pre-7.17 SPG hit "unknown aggregate".
        "string_agg",
        "array_agg",
        // v7.17.0 — boolean aggregates. `every` is the SQL-standard
        // alias for `bool_and`.
        "bool_and",
        "bool_or",
        "every",
        // v7.32 (round-29) — statistical aggregates (every BI /
        // dashboard emits these in rollups).
        "stddev",
        "stddev_samp",
        "stddev_pop",
        "variance",
        "var_samp",
        "var_pop",
        // v7.32 (round-29) — bitwise aggregates.
        "bit_and",
        "bit_or",
        "bit_xor",
        // v7.32 (round-29) — ordered-set aggregates (used with
        // `WITHIN GROUP (ORDER BY …)`).
        "percentile_cont",
        "percentile_disc",
        "mode",
        // v7.32 (round-29) — hypothetical-set aggregates (also
        // `WITHIN GROUP`): the rank the direct args WOULD have.
        "rank",
        "dense_rank",
        "percent_rank",
        "cume_dist",
        // v7.32 (round-29) — two-argument regression family.
        "covar_pop",
        "covar_samp",
        "corr",
        "regr_count",
        "regr_avgx",
        "regr_avgy",
        "regr_slope",
        "regr_intercept",
        "regr_r2",
        "regr_sxx",
        "regr_syy",
        "regr_sxy",
        // v7.32 (round-29) — JSON aggregates.
        "json_agg",
        "jsonb_agg",
        "json_object_agg",
        "jsonb_object_agg",
    ];
    AGGREGATE_NAMES
        .iter()
        .any(|known| name.eq_ignore_ascii_case(known))
}
151
/// v7.32 (round-29) — membership test for the two-argument regression
/// aggregates `f(Y, X)`. The comparison is exact (case-sensitive).
fn is_regression_name(name: &str) -> bool {
    const REGRESSION_FAMILY: [&str; 12] = [
        "covar_pop",
        "covar_samp",
        "corr",
        "regr_count",
        "regr_avgx",
        "regr_avgy",
        "regr_slope",
        "regr_intercept",
        "regr_r2",
        "regr_sxx",
        "regr_syy",
        "regr_sxy",
    ];
    REGRESSION_FAMILY.iter().any(|&member| member == name)
}
170
171/// v7.32 (round-29) — aggregates that consume a second positional
172/// argument: `string_agg(v, sep)`, the regression family `f(Y, X)`, and
173/// `json_object_agg(key, value)`.
174fn agg_uses_second_arg(name: &str) -> bool {
175 name == "string_agg"
176 || name == "json_object_agg"
177 || name == "jsonb_object_agg"
178 || is_regression_name(name)
179}
180
/// v7.32 (round-29) — ordered-set aggregates. Their value stream comes
/// from the `WITHIN GROUP (ORDER BY …)` sort spec, while any in-parens
/// arguments are *direct* arguments (the percentile fraction). `mode()`
/// has no direct argument.
pub fn is_ordered_set_name(name: &str) -> bool {
    // v7.32 — compare with `eq_ignore_ascii_case` rather than
    // `to_ascii_lowercase()`: these classifiers run in the aggregate
    // row/group loop, where the per-call `String` allocation showed up
    // as ~16% of the inbox's aggregate path in a sampled profile.
    name.eq_ignore_ascii_case("percentile_cont")
        || name.eq_ignore_ascii_case("percentile_disc")
        || name.eq_ignore_ascii_case("mode")
}
194
/// v7.32 (round-29) — hypothetical-set aggregates (`rank(args) WITHIN
/// GROUP (ORDER BY …)` and friends), which compute the rank the
/// hypothetical row would have. As with ordered-set aggregates, the
/// value stream comes from the sort spec and the in-parens args are
/// direct (the hypothetical row). Matching is ASCII case-insensitive.
pub fn is_hypothetical_set_name(name: &str) -> bool {
    name.eq_ignore_ascii_case("rank")
        || name.eq_ignore_ascii_case("dense_rank")
        || name.eq_ignore_ascii_case("percent_rank")
        || name.eq_ignore_ascii_case("cume_dist")
}
204
205/// v7.32 (round-29) — every aggregate that takes its value stream from
206/// a `WITHIN GROUP (ORDER BY …)` clause (ordered-set + hypothetical-set).
207pub fn is_within_group_name(name: &str) -> bool {
208 is_ordered_set_name(name) || is_hypothetical_set_name(name)
209}
210
/// Per-aggregate running state.
///
/// `accumulate_groups` creates one `AggState::default()` per aggregate
/// spec for every group. All aggregate kinds share this struct; the
/// field docs below name the aggregates each field serves.
#[derive(Debug, Default, Clone)]
struct AggState {
    /// Running input count (also the `n` of the variance / stddev
    /// family, see `sum_sq`).
    count: i64,
    /// Integer running sum — presumably used while inputs are integral;
    /// TODO confirm against `update_state`.
    sum_int: i64,
    /// Floating-point running sum (the variance / stddev family reads
    /// it together with `sum_sq`).
    sum_float: f64,
    /// Presumably the running `min` / `max` candidate; `None` until a
    /// value is seen — TODO confirm against `update_state`.
    extreme: Option<Value>,
    /// Presumably selects `sum_float` over `sum_int` at finalize —
    /// TODO confirm against `update_state`.
    use_float: bool,
    /// v7.17.0 — running collection for string_agg / array_agg.
    /// Each entry is one row's contribution (NULL preserved as
    /// `Value::Null`; string_agg's finalize step drops them, but
    /// array_agg keeps them). Pushing in insertion order matches
    /// PG behaviour when no `ORDER BY` is given inside the
    /// aggregate call.
    items: Vec<Value>,
    /// v7.25 (round-17) — per-group dedupe set for DISTINCT
    /// aggregates (encoded values; NULLs never reach it because
    /// the caller's skip runs after the per-aggregate NULL rules).
    ///
    /// NOTE(review): the bound fast path in `accumulate_groups`
    /// encodes the argument into this set without a visible NULL
    /// check — confirm the "NULLs never reach it" claim still holds.
    seen: BTreeSet<String>,
    /// v7.24 (round-16 A) — per-item ORDER BY key tuples, parallel
    /// to `items` (pushed under the same skip/keep conditions).
    /// Empty when the aggregate carries no internal ordering.
    item_keys: Vec<Vec<Value>>,
    /// v7.17.0 — captured separator for string_agg. PG accepts a
    /// non-constant separator expression but in practice every
    /// caller passes a literal; the engine snapshots the last
    /// non-NULL text it sees, which matches PG's "use the latest
    /// row's value" behaviour.
    separator: Option<String>,
    /// v7.17.0 — running boolean accumulator for bool_and /
    /// bool_or / every. `None` until the first non-NULL input;
    /// at finalize None → SQL NULL.
    bool_acc: Option<bool>,
    /// v7.32 (round-29) — sum of squares for the variance / stddev
    /// family (`sum_float` carries the running sum; `count` the n).
    sum_sq: f64,
    /// v7.32 (round-29) — running accumulator for bit_and / bit_or /
    /// bit_xor. `None` until the first non-NULL input → SQL NULL.
    bit_acc: Option<i64>,
    /// v7.32 (round-29) — two-argument regression family
    /// (`covar_*` / `corr` / `regr_*`), PG arg order `f(Y, X)`. Only
    /// rows where BOTH inputs are non-NULL contribute (`count` is the
    /// paired n, independent of the single-arg `sum_*`).
    ///
    /// NOTE(review): a dedicated `reg_n` field exists, so the paired n
    /// is presumably `reg_n` rather than `count` — confirm and fix the
    /// sentence above if so.
    reg_n: i64,
    /// Regression running sums; by their names Σx, Σy, Σx², Σy² and
    /// Σxy over the paired rows — TODO confirm against `update_state`.
    reg_sx: f64,
    reg_sy: f64,
    reg_sxx: f64,
    reg_syy: f64,
    reg_sxy: f64,
    /// v7.32 (round-29) — second value stream for `json_object_agg`
    /// (`items` holds the keys, `aux_items` the values).
    aux_items: Vec<Value>,
    /// v7.33 (array_agg argmax) — for a `first_ordered` spec
    /// (`(array_agg(x ORDER BY y))[1]`), the running first-by-order
    /// (sort-key tuple, value). Replaced only when a new row's key sorts
    /// strictly before the current best (ties keep the earliest row, =
    /// the stable-sort `[1]`). No items/item_keys array is built.
    first_best: Option<(Vec<Value>, Value)>,
}
270
/// Static description of one distinct aggregate call found in the
/// statement: its name, argument expressions and modifiers. `run`
/// collects these from the select list, ORDER BY and HAVING; the same
/// index addresses the matching `AggState` in each group.
#[derive(Debug, Clone)]
struct AggSpec {
    name: String, // lowercased
    /// First argument (value expression) for every aggregate
    /// except `count(*)`. `None` for `count_star`.
    arg: Option<Expr>,
    /// v7.17.0 — second positional argument. Used by the aggregates
    /// `agg_uses_second_arg` lists: `string_agg(value, sep)`, the
    /// regression family `f(Y, X)` and `json_object_agg` /
    /// `jsonb_object_agg(key, value)`. `None` for every other
    /// aggregate (e.g. `array_agg`, which is single-arg). Carried in
    /// the spec so per-row evaluation can re-use the same expression
    /// across calls.
    arg2: Option<Expr>,
    /// v7.25 (round-17) — `COUNT(DISTINCT x)` & friends: dedupe
    /// the input stream per group before accumulation.
    distinct: bool,
    /// v7.24 (round-16 A) — aggregate-internal ORDER BY keys
    /// (`array_agg(x ORDER BY y DESC NULLS LAST)`). Empty for the
    /// plain form. Only the collection aggregates honour it;
    /// other aggregates are order-insensitive and ignore it (PG
    /// accepts the syntax everywhere too).
    /// `validate_within_group` also requires it to be non-empty for
    /// WITHIN GROUP aggregates.
    order_by: Vec<spg_sql::ast::OrderBy>,
    /// v7.32 (round-29) — `FILTER (WHERE cond)`: a per-row predicate
    /// evaluated against the source row before accumulation. A row
    /// whose `cond` is not TRUE (false or NULL) is excluded from this
    /// aggregate only. `None` for the unfiltered form.
    filter: Option<Expr>,
    /// v7.32 (round-29) — WITHIN GROUP aggregates only: the *direct*
    /// argument (the percentile fraction for `percentile_cont/disc`,
    /// the hypothetical value for the hypothetical-set family).
    /// PG requires it constant, so it is evaluated once. `None` for
    /// `mode()` and for every non-WITHIN-GROUP aggregate;
    /// `validate_within_group` rejects any other WITHIN GROUP
    /// aggregate that lacks it.
    direct_arg: Option<Expr>,
    /// v7.33 (array_agg argmax) — set when this spec came from
    /// `(array_agg(x ORDER BY y))[1]`: accumulate only the first-by-order
    /// element (a running argmax/argmin) and finalise to that scalar
    /// value, instead of collecting + sorting + materialising the whole
    /// per-group array just to take element 1. Returns the element type,
    /// not the array type.
    first_ordered: bool,
}
310
/// Output of running the aggregate path. Schema describes one row per
/// group. `run` applies the statement's ORDER BY before returning;
/// LIMIT is left to the caller.
#[derive(Debug)]
pub struct AggResult {
    /// Output schema, one entry per projected column.
    pub columns: Vec<ColumnSchema>,
    /// Projected output rows, one per surviving group.
    pub rows: Vec<Row>,
    /// v7.31 (perf — PG lesson #1, post-LIMIT subquery projection):
    /// select-list items whose rewritten expr carries a subquery and
    /// is referenced by neither ORDER BY nor HAVING. Their output
    /// cells hold NULL placeholders; the caller truncates to
    /// LIMIT+OFFSET first and only then evaluates these for the
    /// surviving rows (PG runs the same shape with SubPlan loops=50
    /// instead of loops=24000). `(output_col, rewritten_expr)`.
    pub deferred: Vec<(usize, Expr)>,
    /// Synthetic group rows aligned 1:1 with `rows`; populated only
    /// when `deferred` is non-empty.
    pub synth_rows: Vec<Row>,
    /// Schema the deferred exprs evaluate against. Like `synth_rows`,
    /// `run` leaves it empty when `deferred` is empty.
    pub synth_schema: Vec<ColumnSchema>,
}
331
332/// Execute aggregate logic against an already-WHERE-filtered iterator of
333/// rows. `table_alias` is the alias accepted by column resolution.
334#[allow(clippy::too_many_lines)]
335/// v7.25.2 (round-19 A) — caller-injected evaluator for synth-row
336/// expressions that still carry subquery nodes after the rewrite
337/// (correlated subqueries in the select list / HAVING / aggregate
338/// ORDER BY of a GROUP BY query). The engine passes its
339/// correlated-aware evaluator; pure-library callers pass None and
340/// surviving subqueries keep erroring loudly.
341pub type CorrelatedEval<'a> = &'a dyn Fn(&Expr, &Row, &EvalContext<'_>) -> Result<Value, EvalError>;
342
/// Output of the per-group projection stage (`project_groups`): the
/// output schema, the projected rows, the synth rows kept alongside
/// them for post-LIMIT deferred evaluation, the deferred subquery
/// items, and the rewritten ORDER BY exprs (shared with the sort).
struct Projection {
    /// Output schema of the projected rows.
    columns: Vec<ColumnSchema>,
    /// Projected rows; `run` returns them as `AggResult::rows` after
    /// the optional ORDER BY sort.
    out_rows: Vec<Row>,
    /// Synthetic group rows kept alongside `out_rows`; `run` sorts them
    /// together with `out_rows` and returns them only when `deferred`
    /// is non-empty.
    kept_synth: Vec<Row>,
    /// Deferred subquery items, forwarded as `AggResult::deferred`.
    deferred: Vec<(usize, Expr)>,
    /// Rewritten ORDER BY expressions, handed to
    /// `sort_synth_by_order_by`.
    order_rewritten: Vec<Expr>,
}
354
355/// v7.35.0 — detect the `SELECT COUNT(*) FROM … [WHERE …]` shape
356/// (single item, no GROUP BY / HAVING / ORDER BY / DISTINCT /
357/// LIMIT WITH TIES / FILTER / window). For this shape the answer
358/// is exactly `rows.len()` as `BigInt`, no group state needed.
359/// Returns `None` for any deviation so the caller's full pipeline
360/// runs verbatim.
361fn try_pure_count_star_short_circuit(
362 stmt: &SelectStatement,
363 rows: &[RowRef<'_>],
364) -> Option<AggResult> {
365 if stmt.distinct
366 || stmt.limit_with_ties
367 || stmt.group_by.is_some()
368 || stmt.having.is_some()
369 || !stmt.order_by.is_empty()
370 {
371 return None;
372 }
373 if stmt.items.len() != 1 {
374 return None;
375 }
376 let SelectItem::Expr { expr, alias } = &stmt.items[0] else {
377 return None;
378 };
379 let Expr::FunctionCall { name, args } = expr else {
380 return None;
381 };
382 if !name.eq_ignore_ascii_case("count_star") || !args.is_empty() {
383 return None;
384 }
385 let col_name = alias.clone().unwrap_or_else(|| "count".to_string());
386 let count = i64::try_from(rows.len()).unwrap_or(i64::MAX);
387 Some(AggResult {
388 columns: alloc::vec![ColumnSchema::new(col_name, DataType::BigInt, false)],
389 rows: alloc::vec![Row::new(alloc::vec![Value::BigInt(count)])],
390 deferred: Vec::new(),
391 synth_rows: Vec::new(),
392 synth_schema: Vec::new(),
393 })
394}
395
396pub(crate) fn run(
397 stmt: &SelectStatement,
398 rows: &[RowRef<'_>],
399 schema_cols: &[ColumnSchema],
400 table_alias: Option<&str>,
401 correlated_eval: Option<CorrelatedEval<'_>>,
402) -> Result<AggResult, EvalError> {
403 // v7.35.0 — pure `SELECT COUNT(*) FROM … WHERE …` short-circuit.
404 // The caller already filtered rows by WHERE (we run on the
405 // post-WHERE survivor set), so for the canonical pure-COUNT(*)
406 // shape (no GROUP BY / HAVING / ORDER BY / DISTINCT / FILTER /
407 // window) the answer is simply `rows.len()`. The four-stage
408 // aggregate pipeline below (accumulate_groups → build_synth_schema
409 // → finalize_synth_rows → project_groups) collapses to a single
410 // BigInt cell when there's a single group, but each stage still
411 // pays its own allocation tax — group state map, synth schema
412 // vec, finalize loop. `exists_in_60` (mailrs prod #4 baseline)
413 // is exactly this shape on a 25 k-row JOIN.
414 if let Some(short) = try_pure_count_star_short_circuit(stmt, rows) {
415 return Ok(short);
416 }
417 let group_exprs: Vec<Expr> = stmt.group_by.clone().unwrap_or_default();
418
419 // Collect aggregate sub-expressions across items + order_by.
420 let mut agg_specs: Vec<AggSpec> = Vec::new();
421 for item in &stmt.items {
422 if let SelectItem::Expr { expr, .. } = item {
423 collect_aggregates(expr, &mut agg_specs);
424 }
425 }
426 for o in &stmt.order_by {
427 collect_aggregates(&o.expr, &mut agg_specs);
428 }
429 if let Some(h) = &stmt.having {
430 collect_aggregates(h, &mut agg_specs);
431 }
432 // v7.17.0 — arity validation. The collector tolerates an
433 // arbitrary positional-arg count; here we enforce the
434 // per-aggregate contract so a malformed call (e.g.
435 // `array_agg()` or `string_agg(x)`) surfaces as a SQL error
436 // rather than silently coercing to a degenerate aggregate.
437 validate_agg_arities(stmt, &agg_specs)?;
438 validate_within_group(&agg_specs)?;
439
440 // (1) Stream the WHERE-filtered rows into insertion-ordered group state.
441 let order = accumulate_groups(
442 rows,
443 &group_exprs,
444 &agg_specs,
445 schema_cols,
446 table_alias,
447 correlated_eval,
448 )?;
449
450 // (2) Build the synthetic per-group schema and finalise each group's row.
451 let synth_schema =
452 build_synth_schema(rows, &group_exprs, &agg_specs, schema_cols, table_alias)?;
453 let synth_rows = finalize_synth_rows(
454 &order,
455 &agg_specs,
456 &synth_schema,
457 rows,
458 schema_cols,
459 table_alias,
460 )?;
461
462 // (3) Rewrite the user's expressions, filter groups by HAVING and project.
463 let Projection {
464 columns,
465 mut out_rows,
466 mut kept_synth,
467 deferred,
468 order_rewritten,
469 } = project_groups(
470 synth_rows,
471 stmt,
472 &group_exprs,
473 &agg_specs,
474 &synth_schema,
475 correlated_eval,
476 )?;
477
478 // (4) ORDER BY on the aggregated output (the caller applies LIMIT).
479 if !stmt.order_by.is_empty() {
480 let (sorted_synth, sorted_out) = sort_synth_by_order_by(
481 &synth_schema,
482 &stmt.order_by,
483 &order_rewritten,
484 kept_synth,
485 out_rows,
486 correlated_eval,
487 )?;
488 kept_synth = sorted_synth;
489 out_rows = sorted_out;
490 }
491
492 let (synth_rows_out, synth_schema_out) = if deferred.is_empty() {
493 (Vec::new(), Vec::new())
494 } else {
495 (kept_synth, synth_schema.clone())
496 };
497 Ok(AggResult {
498 columns,
499 rows: out_rows,
500 deferred,
501 synth_rows: synth_rows_out,
502 synth_schema: synth_schema_out,
503 })
504}
505
506/// v7.32 (round-29) — validate the structural requirements of WITHIN
507/// GROUP (ordered-set / hypothetical-set) aggregates up front, so a
508/// malformed call surfaces as a SQL error rather than a silently
509/// degenerate aggregate.
510fn validate_within_group(agg_specs: &[AggSpec]) -> Result<(), EvalError> {
511 // v7.32 (round-29) — WITHIN GROUP aggregates require the clause (PG
512 // raises a hard error otherwise rather than silently degrading), and
513 // SPG supports the single-sort-key form only.
514 for spec in agg_specs {
515 if is_within_group_name(&spec.name) {
516 if spec.order_by.is_empty() {
517 return Err(EvalError::TypeMismatch {
518 detail: format!("{}() requires WITHIN GROUP (ORDER BY …)", spec.name),
519 });
520 }
521 // mode() is the only WITHIN GROUP aggregate with no direct
522 // argument; the rest carry one (percentile fraction /
523 // hypothetical value).
524 if spec.name != "mode" && spec.direct_arg.is_none() {
525 return Err(EvalError::TypeMismatch {
526 detail: format!("{}() requires a direct argument", spec.name),
527 });
528 }
529 // Multi-key WITHIN GROUP (multiple sort keys / hypothetical
530 // args) is not supported yet — error loudly instead of
531 // silently using only the first key.
532 if spec.order_by.len() > 1 {
533 return Err(EvalError::TypeMismatch {
534 detail: format!(
535 "{}() with multiple WITHIN GROUP sort keys is not supported yet",
536 spec.name
537 ),
538 });
539 }
540 }
541 }
542 Ok(())
543}
544
545/// (1) Stream the WHERE-filtered rows, group by the GROUP BY value
546/// tuple, and update per-group aggregate state. Returns the groups in
547/// insertion order. See `run` for the bind-once fast path rationale.
548#[allow(clippy::too_many_lines, clippy::type_complexity)]
549fn accumulate_groups(
550 rows: &[RowRef<'_>],
551 group_exprs: &[Expr],
552 agg_specs: &[AggSpec],
553 schema_cols: &[ColumnSchema],
554 table_alias: Option<&str>,
555 correlated_eval: Option<CorrelatedEval<'_>>,
556) -> Result<Vec<(Vec<Value>, Vec<AggState>)>, EvalError> {
557 let ctx = EvalContext::new(schema_cols, table_alias);
558 // Map group key (vec of values, encoded as canonical string) -> group state.
559 // v7.32 (architecture v2, P2b) — insertion-ordered group state in
560 // a Vec; the hash map only maps key → index. Removes the parallel
561 // `key_order: Vec<String>` (a second per-group key clone) and the
562 // per-group re-probe `groups[k]` at finalize (24k hash lookups for
563 // the inbox shape). The map owns its key once on vacant insert.
564 let mut order: Vec<(Vec<Value>, Vec<AggState>)> = Vec::new();
565 let mut groups: hashbrown::HashMap<String, usize> = hashbrown::HashMap::new();
566 // When there are no GROUP BY exprs *and* there is at least one aggregate,
567 // every row collapses into a single anonymous group keyed by "".
568 if rows.is_empty() && group_exprs.is_empty() {
569 // Single empty-aggregate group: count=0, sum=0, max=NULL, etc.
570 // No rows follow, so the map is never probed — seed `order` only.
571 let init: Vec<AggState> = (0..agg_specs.len()).map(|_| AggState::default()).collect();
572 order.push((Vec::new(), init));
573 }
574
575 // v7.30 (perf campaign) - hoist the per-row work that doesn't
576 // depend on the row: which group exprs need collation folding
577 // (none, for most queries - the old code cloned the whole
578 // group_vals vec per row just in case).
579 // v7.30 (perf campaign) - the no-tax row loop. When a group
580 // expr or an aggregate argument is a bare column reference
581 // (the overwhelmingly common shape), bind its position ONCE
582 // and read row cells by offset in the loop - no per-row tree
583 // walk, no owned-Value clone out of resolve_column. Anything
584 // more complex keeps the eval path.
585 let col_pos = |e: &Expr| -> Option<usize> {
586 // Qualified references only: the bare-name resolver carries
587 // alias/ambiguity logic the bind-once path must not fork.
588 if let Expr::Column(c) = e
589 && c.qualifier.is_some()
590 {
591 eval::find_column_pos(c, &ctx)
592 } else {
593 None
594 }
595 };
596 let group_pos: Vec<Option<usize>> = group_exprs.iter().map(col_pos).collect();
597 let all_groups_bound = group_pos.iter().all(Option::is_some);
598 let arg_pos: Vec<Option<usize>> = agg_specs
599 .iter()
600 .map(|spec| spec.arg.as_ref().and_then(|e| col_pos(e)))
601 .collect();
602 // v7.33 (array_agg perf) — bound positions for each spec's internal
603 // ORDER BY keys, so an ordered aggregate (`array_agg(x ORDER BY y)`)
604 // reads the sort key by reference (RowRef::get) instead of
605 // materialising the whole combined join row per input row just to
606 // eval one bound column. Mirrors arg_pos. On the inbox shape this
607 // turned 24k full-row (~1 KB each) clones into 24k single-cell reads.
608 let order_pos: Vec<Vec<Option<usize>>> = agg_specs
609 .iter()
610 .map(|spec| spec.order_by.iter().map(|o| col_pos(&o.expr)).collect())
611 .collect();
612 // Does any spec need the fully-materialised row in the bound fast
613 // path — a FILTER, a non-bound value arg, a second arg, or a non-bound
614 // ORDER key? When false (every aggregate arg/key is a bound column —
615 // the inbox shape) the bound fast path never materialises a row.
616 let needs_mat = agg_specs.iter().enumerate().any(|(i, s)| {
617 s.filter.is_some()
618 || (s.arg.is_some() && arg_pos[i].is_none())
619 || s.arg2.is_some()
620 || order_pos[i].iter().any(Option::is_none)
621 });
622 let ci_positions: Vec<usize> = group_exprs
623 .iter()
624 .enumerate()
625 .filter(|(_, g)| {
626 matches!(
627 eval::column_collation(g, &ctx),
628 Some(spg_storage::Collation::CaseInsensitive)
629 )
630 })
631 .map(|(i, _)| i)
632 .collect();
633 // v7.31 (perf 3e) — per-row scratch buffers. The fast path used
634 // to allocate a key String (and a refs Vec) for EVERY row just
635 // to probe the group map; hits — the overwhelming case — now
636 // touch the allocator zero times.
637 let mut keybuf_s = String::new();
638 let mut dkeybuf = String::new();
639 let mut refs: Vec<&Value> = Vec::with_capacity(group_pos.len());
640 // v7.32 (round-31) — an aggregate's argument / FILTER / second arg /
641 // ORDER key may itself be a *correlated* subquery, e.g.
642 // `MAX((SELECT i.v FROM inner i WHERE i.fk = o.id))`. A non-correlated
643 // subquery is pre-resolved to a literal before this loop, but a
644 // correlated one survives as a subquery node and must be evaluated per
645 // outer row through the correlated evaluator — the same hook the
646 // select-list / HAVING / ORDER finalisers already use below. Plain
647 // `eval_expr` would hit "subquery reached row eval".
648 //
649 // The `any_agg_subquery` gate is computed once here so the common case
650 // (no subquery anywhere in the aggregate args — including every hot
651 // scan/group aggregate) short-circuits before the per-row
652 // `expr_has_subquery` walk: `eval_arg` is then exactly `eval_expr`.
653 let any_agg_subquery = correlated_eval.is_some()
654 && agg_specs.iter().any(|s| {
655 s.filter
656 .as_ref()
657 .is_some_and(|e| crate::expr_has_subquery(e))
658 || s.arg.as_ref().is_some_and(|e| crate::expr_has_subquery(e))
659 || s.arg2.as_ref().is_some_and(|e| crate::expr_has_subquery(e))
660 || s.order_by.iter().any(|o| crate::expr_has_subquery(&o.expr))
661 });
662 let eval_arg = |e: &Expr, r: &Row, c: &EvalContext<'_>| -> Result<Value, EvalError> {
663 match correlated_eval {
664 Some(f) if any_agg_subquery && crate::expr_has_subquery(e) => f(e, r, c),
665 _ => eval::eval_expr(e, r, c),
666 }
667 };
668 for row in rows {
669 // Fast key: bound positions + no ci folding -> encode
670 // straight from borrowed cells; group_vals materialise
671 // only when the group is NEW.
672 if all_groups_bound && ci_positions.is_empty() && !group_exprs.is_empty() {
673 refs.clear();
674 refs.extend(
675 group_pos
676 .iter()
677 .map(|p| row.get(p.unwrap()).unwrap_or(&Value::Null)),
678 );
679 encode_key_refs_into(&refs, &mut keybuf_s);
680 let idx = match groups.get(keybuf_s.as_str()) {
681 Some(&i) => i,
682 None => {
683 let i = order.len();
684 let init: Vec<AggState> =
685 (0..agg_specs.len()).map(|_| AggState::default()).collect();
686 let owned: Vec<Value> = refs.iter().map(|v| (*v).clone()).collect();
687 order.push((owned, init));
688 groups.insert(keybuf_s.clone(), i);
689 i
690 }
691 };
692 let entry = &mut order[idx];
693 // v7.33 (array_agg perf) — materialise the combined row AT
694 // MOST once per input row, and only when a spec actually
695 // needs the eval path (FILTER / non-bound arg / arg2 / non-
696 // bound ORDER key). Bound args and bound ORDER keys read
697 // cells by reference below, so the inbox shape (all bound)
698 // never materialises — killing the per-row ~1 KB clone that
699 // dominated the ordered-aggregate cost.
700 let mat: Option<Cow<'_, Row>> = if needs_mat { Some(row.as_row()) } else { None };
701 for (i, spec) in agg_specs.iter().enumerate() {
702 // v7.32 (round-29) — FILTER (WHERE cond): exclude rows
703 // where cond is not TRUE before they reach this
704 // aggregate's accumulator (and before DISTINCT dedup).
705 if let Some(f) = &spec.filter
706 && !matches!(
707 eval_arg(f, mat.as_deref().expect("needs_mat for FILTER"), &ctx)?,
708 Value::Bool(true)
709 )
710 {
711 continue;
712 }
713 let arg_owned: Value;
714 let arg_ref: &Value = match (&arg_pos[i], &spec.arg) {
715 (Some(p), _) => row.get(*p).unwrap_or(&Value::Null),
716 (None, None) => {
717 arg_owned = Value::Bool(true);
718 &arg_owned
719 }
720 (None, Some(e)) => {
721 arg_owned = eval_arg(
722 e,
723 mat.as_deref().expect("needs_mat for non-bound arg"),
724 &ctx,
725 )?;
726 &arg_owned
727 }
728 };
729 let arg2_val = match &spec.arg2 {
730 None => None,
731 Some(e) => Some(eval_arg(
732 e,
733 mat.as_deref().expect("needs_mat for arg2"),
734 &ctx,
735 )?),
736 };
737 let order_keys = if spec.order_by.is_empty() {
738 None
739 } else {
740 let mut keys = Vec::with_capacity(spec.order_by.len());
741 for (k, o) in spec.order_by.iter().enumerate() {
742 // Bound ORDER key → read the cell by reference; only
743 // a non-bound key falls to the materialised eval path.
744 keys.push(match order_pos[i][k] {
745 Some(p) => row.get(p).cloned().unwrap_or(Value::Null),
746 None => eval_arg(
747 &o.expr,
748 mat.as_deref().expect("needs_mat for non-bound ORDER key"),
749 &ctx,
750 )?,
751 });
752 }
753 Some(keys)
754 };
755 // v7.33 (array_agg argmax) — first_ordered: keep only the
756 // running first-by-order element (strict-less replacement
757 // = ties keep the earliest row, matching the stable-sort
758 // `[1]`), no array build.
759 if spec.first_ordered {
760 if let Some(keys) = order_keys {
761 let st = &mut entry.1[i];
762 let better = match &st.first_best {
763 None => true,
764 Some((bk, _)) => {
765 cmp_order_keys(&spec.order_by, &keys, bk)
766 == core::cmp::Ordering::Less
767 }
768 };
769 if better {
770 st.first_best = Some((keys, arg_ref.clone()));
771 }
772 }
773 continue;
774 }
775 if spec.distinct {
776 encode_key_refs_into(core::slice::from_ref(&arg_ref), &mut dkeybuf);
777 if entry.1[i].seen.contains(dkeybuf.as_str()) {
778 continue;
779 }
780 entry.1[i].seen.insert(dkeybuf.clone());
781 }
782 update_state(
783 &mut entry.1[i],
784 &spec.name,
785 arg_ref,
786 arg2_val.as_ref(),
787 order_keys,
788 )?;
789 }
790 continue;
791 }
792 // v7.32 (P4 increment 2) — eval (non-bound) path: present the
793 // row as a borrowed Row once (Owned → zero-cost borrow; a join
794 // tuple materialises here exactly once, never on the bound fast
795 // path above), then the original eval loop runs unchanged.
796 let row_materialised = row.as_row();
797 let row: &Row = &row_materialised;
798 let group_vals: Vec<Value> = group_exprs
799 .iter()
800 .map(|g| eval::eval_expr(g, row, &ctx))
801 .collect::<Result<_, _>>()?;
802 // v7.17.0 Phase 2.5b — case-insensitive group keying: fold
803 // only the ci columns, and only when any exist. Display
804 // value (`group_vals`) stays original — only the key folds.
805 let key = if ci_positions.is_empty() {
806 encode_key(&group_vals)
807 } else {
808 let mut key_vals = group_vals.clone();
809 for &i in &ci_positions {
810 if let Value::Text(s) = &key_vals[i] {
811 key_vals[i] = Value::Text(s.to_ascii_lowercase());
812 }
813 }
814 encode_key(&key_vals)
815 };
816 // Probe by index; the map owns the key once on vacant insert.
817 let idx = match groups.get(key.as_str()) {
818 Some(&i) => i,
819 None => {
820 let i = order.len();
821 let init: Vec<AggState> =
822 (0..agg_specs.len()).map(|_| AggState::default()).collect();
823 order.push((group_vals.clone(), init));
824 groups.insert(key, i);
825 i
826 }
827 };
828 let entry = &mut order[idx];
829 for (i, spec) in agg_specs.iter().enumerate() {
830 // v7.32 (round-29) — FILTER (WHERE cond): exclude rows where
831 // cond is not TRUE before accumulation (and before DISTINCT).
832 if let Some(f) = &spec.filter
833 && !matches!(eval_arg(f, row, &ctx)?, Value::Bool(true))
834 {
835 continue;
836 }
837 let arg_val = match &spec.arg {
838 None => Value::Bool(true), // count_star: sentinel non-null
839 Some(e) => eval_arg(e, row, &ctx)?,
840 };
841 // v7.17.0 — `string_agg(value, separator)` evaluates the
842 // separator per row but PG treats it as constant; we
843 // pass the per-row value into update_state so a future
844 // varying-separator caller still sees correct output,
845 // even though SPG (like PG) only uses the most recent.
846 let arg2_val = match &spec.arg2 {
847 None => None,
848 Some(e) => Some(eval_arg(e, row, &ctx)?),
849 };
850 // v7.24 (round-16 A) — aggregate-internal ORDER BY:
851 // evaluate the key tuple against the source row.
852 let order_keys = if spec.order_by.is_empty() {
853 None
854 } else {
855 let mut keys = Vec::with_capacity(spec.order_by.len());
856 for o in &spec.order_by {
857 keys.push(eval_arg(&o.expr, row, &ctx)?);
858 }
859 Some(keys)
860 };
861 // v7.33 (array_agg argmax) — first_ordered: keep the running
862 // first-by-order element only (mirrors the bound fast path).
863 if spec.first_ordered {
864 if let Some(keys) = order_keys {
865 let st = &mut entry.1[i];
866 let better = match &st.first_best {
867 None => true,
868 Some((bk, _)) => {
869 cmp_order_keys(&spec.order_by, &keys, bk) == core::cmp::Ordering::Less
870 }
871 };
872 if better {
873 st.first_best = Some((keys, arg_val.clone()));
874 }
875 }
876 continue;
877 }
878 // v7.25 (round-17) — DISTINCT: drop repeated inputs
879 // before they reach the accumulator. NULLs flow through
880 // (each aggregate's own NULL rule applies; PG also
881 // treats NULL as a single distinct value for array_agg).
882 if spec.distinct {
883 let key = encode_key(core::slice::from_ref(&arg_val));
884 if !entry.1[i].seen.insert(key) {
885 continue;
886 }
887 }
888 update_state(
889 &mut entry.1[i],
890 &spec.name,
891 &arg_val,
892 arg2_val.as_ref(),
893 order_keys,
894 )?;
895 }
896 }
897 Ok(order)
898}
899
900/// (2a) Build the synthetic per-group schema: `__grp_0..K` then
901/// `__agg_0..N`. Group types are probed from the first row; aggregate
902/// types from each spec.
903fn build_synth_schema(
904 rows: &[RowRef<'_>],
905 group_exprs: &[Expr],
906 agg_specs: &[AggSpec],
907 schema_cols: &[ColumnSchema],
908 table_alias: Option<&str>,
909) -> Result<Vec<ColumnSchema>, EvalError> {
910 let ctx = EvalContext::new(schema_cols, table_alias);
911 // Build synthetic schema: __grp_0..K then __agg_0..N.
912 let group_types: Vec<DataType> = if rows.is_empty() {
913 // Use Text as a safe stand-in — empty result means schema isn't
914 // observable. Avoids needing to evaluate group exprs on no row.
915 group_exprs.iter().map(|_| DataType::Text).collect()
916 } else {
917 let probe_row = rows[0].as_row();
918 let probe: &Row = &probe_row;
919 group_exprs
920 .iter()
921 .map(|g| {
922 eval::eval_expr(g, probe, &ctx).map(|v| v.data_type().unwrap_or(DataType::Text))
923 })
924 .collect::<Result<_, _>>()?
925 };
926 let agg_types: Vec<DataType> = agg_specs
927 .iter()
928 .map(|spec| infer_agg_type(spec, schema_cols))
929 .collect();
930 let mut synth_schema: Vec<ColumnSchema> = Vec::new();
931 for (i, ty) in group_types.iter().enumerate() {
932 synth_schema.push(ColumnSchema::new(format!("__grp_{i}"), *ty, true));
933 }
934 for (i, ty) in agg_types.iter().enumerate() {
935 synth_schema.push(ColumnSchema::new(format!("__agg_{i}"), *ty, true));
936 }
937 Ok(synth_schema)
938}
939
940/// (2b) Materialise one synthetic row per group (insertion order):
941/// apply each aggregate's internal ORDER BY, then finalise the running
942/// state into the group + aggregate cells.
943/// v7.33 — compare two aggregate-internal ORDER BY key tuples under the
944/// per-key DESC / NULLS directives. This is the exact comparator the
945/// finalize sort uses, factored out so the `first_ordered` argmax
946/// accumulator's "keep first" decision is provably identical to taking
947/// element `[1]` of the fully-sorted array.
948fn cmp_order_keys(
949 order_by: &[spg_sql::ast::OrderBy],
950 a: &[Value],
951 b: &[Value],
952) -> core::cmp::Ordering {
953 for (k, o) in order_by.iter().enumerate() {
954 let cmp = crate::order_by_value_cmp(o.desc, o.nulls_first, &a[k], &b[k]);
955 if cmp != core::cmp::Ordering::Equal {
956 return cmp;
957 }
958 }
959 core::cmp::Ordering::Equal
960}
961
962fn finalize_synth_rows(
963 order: &[(Vec<Value>, Vec<AggState>)],
964 agg_specs: &[AggSpec],
965 synth_schema: &[ColumnSchema],
966 rows: &[RowRef<'_>],
967 schema_cols: &[ColumnSchema],
968 table_alias: Option<&str>,
969) -> Result<Vec<Row>, EvalError> {
970 let ctx = EvalContext::new(schema_cols, table_alias);
971 // v7.32 (round-29) — ordered-set direct arguments (the percentile
972 // fraction) are constant per PG, so evaluate each once up front.
973 let direct_arg_vals: Vec<Option<Value>> = agg_specs
974 .iter()
975 .map(|spec| match (&spec.direct_arg, rows.first()) {
976 (Some(e), Some(r)) => eval::eval_expr(e, &r.as_row(), &ctx).map(Some),
977 _ => Ok(None),
978 })
979 .collect::<Result<_, _>>()?;
980
981 // Materialise synthetic rows (insertion order = `order`).
982 let mut synth_rows: Vec<Row> = Vec::new();
983 for (gvals, states) in order {
984 let mut values: Vec<Value> = Vec::with_capacity(synth_schema.len());
985 values.extend(gvals.iter().cloned());
986 for (i, st) in states.iter().enumerate() {
987 // v7.33 (array_agg argmax) — first_ordered: the running
988 // first-by-order value IS the result; no array build/sort.
989 if agg_specs[i].first_ordered {
990 values.push(
991 st.first_best
992 .as_ref()
993 .map_or(Value::Null, |(_, v)| v.clone()),
994 );
995 continue;
996 }
997 // v7.24 (round-16 A) — order the collected items per the
998 // aggregate-internal ORDER BY before finalize consumes
999 // them.
1000 let st_sorted;
1001 let st_final: &AggState =
1002 if !agg_specs[i].order_by.is_empty() && st.item_keys.len() == st.items.len() {
1003 let mut idx: Vec<usize> = (0..st.items.len()).collect();
1004 let ob = &agg_specs[i].order_by;
1005 idx.sort_by(|&x, &y| cmp_order_keys(ob, &st.item_keys[x], &st.item_keys[y]));
1006 let mut sorted = st.clone();
1007 sorted.items = idx.iter().map(|&j| st.items[j].clone()).collect();
1008 st_sorted = sorted;
1009 &st_sorted
1010 } else {
1011 st
1012 };
1013 // Ordered-set aggregates compute from the sorted items + the
1014 // direct fraction; everything else uses the running state.
1015 let v = if is_within_group_name(&agg_specs[i].name) {
1016 finalize_ordered_set(
1017 &agg_specs[i].name,
1018 st_final,
1019 direct_arg_vals[i].as_ref(),
1020 agg_specs[i].order_by.first(),
1021 )
1022 } else {
1023 finalize(&agg_specs[i].name, st_final)
1024 };
1025 values.push(v);
1026 }
1027 synth_rows.push(Row::new(values));
1028 }
1029 Ok(synth_rows)
1030}
1031
1032/// (3) Rewrite the user's SELECT items + HAVING to reference the
1033/// synthetic columns, filter groups by HAVING, and project each
1034/// surviving group into an output row. The synth rows ride alongside
1035/// (`kept_synth`) so post-LIMIT deferred subqueries can evaluate later.
1036#[allow(clippy::too_many_lines)]
1037fn project_groups(
1038 synth_rows: Vec<Row>,
1039 stmt: &SelectStatement,
1040 group_exprs: &[Expr],
1041 agg_specs: &[AggSpec],
1042 synth_schema: &[ColumnSchema],
1043 correlated_eval: Option<CorrelatedEval<'_>>,
1044) -> Result<Projection, EvalError> {
1045 // Rewrite the user's SELECT items + ORDER BY to reference synthetic
1046 // columns. After rewriting, every remaining `Expr::Column` must
1047 // resolve against the synthetic schema (i.e. must have been a GROUP
1048 // BY expression).
1049 let columns: Vec<ColumnSchema> = stmt
1050 .items
1051 .iter()
1052 .map(|item| match item {
1053 SelectItem::Wildcard => Err(EvalError::TypeMismatch {
1054 detail: "SELECT * with aggregates is not supported".into(),
1055 }),
1056 SelectItem::Expr { expr, alias } => {
1057 let rewritten = rewrite_expr(expr, group_exprs, agg_specs);
1058 let name = alias.clone().unwrap_or_else(|| expr.to_string());
1059 Ok(ColumnSchema::new(
1060 name,
1061 agg_or_group_type(&rewritten, synth_schema),
1062 true,
1063 ))
1064 }
1065 })
1066 .collect::<Result<_, _>>()?;
1067
1068 // Project per synthetic row. HAVING filters out groups *before*
1069 // we keep the projected row — same semantics as PG: HAVING runs
1070 // against the aggregated row (so `HAVING count(*) > 1` works) and
1071 // sees only group-by'd columns plus aggregate values.
1072 let synth_ctx = EvalContext::new(synth_schema, None);
1073 let having_rewritten = stmt
1074 .having
1075 .as_ref()
1076 .map(|h| rewrite_expr(h, group_exprs, agg_specs));
1077 // v7.30 (phase 3e-1) - rewrite SELECT items ONCE. This ran per
1078 // GROUP (23.5k x 9 items of AST cloning = ~48% of the inbox
1079 // query in sampled stacks); the rewrite is group-independent.
1080 // Stable addresses also let the per-expression subquery plans
1081 // (v7.29 3c) hit across groups instead of rebuilding.
1082 let items_rewritten: alloc::vec::Vec<Option<Expr>> = stmt
1083 .items
1084 .iter()
1085 .map(|item| match item {
1086 SelectItem::Expr { expr, .. } => Some(rewrite_expr(expr, group_exprs, agg_specs)),
1087 SelectItem::Wildcard => None,
1088 })
1089 .collect();
1090 // v7.31 (perf — PG lesson #1): subquery-bearing select items
1091 // deferred to post-LIMIT, when no sort/filter key can observe
1092 // them. ORDER BY rewrites are hoisted here so the safety check
1093 // and the sort below share one rewrite pass.
1094 let order_rewritten: Vec<Expr> = stmt
1095 .order_by
1096 .iter()
1097 .map(|o| rewrite_expr(&o.expr, group_exprs, agg_specs))
1098 .collect();
1099 let defer_enabled = correlated_eval.is_some()
1100 && !stmt.distinct
1101 && !having_rewritten
1102 .as_ref()
1103 .is_some_and(crate::expr_has_subquery)
1104 && !order_rewritten.iter().any(crate::expr_has_subquery);
1105 let deferred: Vec<(usize, Expr)> = if defer_enabled {
1106 items_rewritten
1107 .iter()
1108 .enumerate()
1109 .filter_map(|(i, r)| {
1110 r.as_ref()
1111 .filter(|e| crate::expr_has_subquery(e))
1112 .map(|e| (i, e.clone()))
1113 })
1114 .collect()
1115 } else {
1116 Vec::new()
1117 };
1118 // v7.32 (architecture v2, P2) — compile the per-group synth-row
1119 // expressions ONCE. The projection / HAVING here run per GROUP
1120 // (24k for the inbox shape) × per item; the rewritten exprs are
1121 // mostly `Column(__agg_N)` / `Column(__grp_K)` against the synth
1122 // schema — flat step programs, no tree walk per group.
1123 let having_compiled = having_rewritten
1124 .as_ref()
1125 .filter(|h| eval::fully_compilable(h))
1126 .map(|h| eval::compile_expr(h, &synth_ctx));
1127 let items_compiled: Vec<Option<eval::CompiledExpr>> = items_rewritten
1128 .iter()
1129 .enumerate()
1130 .map(|(i, r)| {
1131 r.as_ref()
1132 .filter(|e| !deferred.iter().any(|(c, _)| *c == i) && eval::fully_compilable(e))
1133 .map(|e| eval::compile_expr(e, &synth_ctx))
1134 })
1135 .collect();
1136 let mut kept_synth: Vec<Row> = Vec::new();
1137 let mut out_rows: Vec<Row> = Vec::new();
1138 let mut stack: Vec<Value> = Vec::new();
1139 for srow in synth_rows {
1140 if let Some(hc) = &having_compiled {
1141 let cond = eval::eval_compiled(hc, &srow, &synth_ctx, &mut stack)?;
1142 if !matches!(cond, Value::Bool(true)) {
1143 continue;
1144 }
1145 } else if let Some(h) = &having_rewritten {
1146 let cond = match correlated_eval {
1147 Some(f) if crate::expr_has_subquery(h) => f(h, &srow, &synth_ctx)?,
1148 _ => eval::eval_expr(h, &srow, &synth_ctx)?,
1149 };
1150 if !matches!(cond, Value::Bool(true)) {
1151 continue;
1152 }
1153 }
1154 let mut values: Vec<Value> = Vec::with_capacity(columns.len());
1155 for (i, rewritten) in items_rewritten.iter().enumerate() {
1156 let Some(rewritten) = rewritten else { continue };
1157 if deferred.iter().any(|(c, _)| *c == i) {
1158 values.push(Value::Null);
1159 continue;
1160 }
1161 values.push(if let Some(cc) = &items_compiled[i] {
1162 eval::eval_compiled(cc, &srow, &synth_ctx, &mut stack)?
1163 } else {
1164 match correlated_eval {
1165 Some(f) if crate::expr_has_subquery(rewritten) => {
1166 f(rewritten, &srow, &synth_ctx)?
1167 }
1168 _ => eval::eval_expr(rewritten, &srow, &synth_ctx)?,
1169 }
1170 });
1171 }
1172 kept_synth.push(srow);
1173 out_rows.push(Row::new(values));
1174 }
1175 Ok(Projection {
1176 columns,
1177 out_rows,
1178 kept_synth,
1179 deferred,
1180 order_rewritten,
1181 })
1182}
1183
1184/// (4) Sort the projected output by the rewritten ORDER BY keys. The
1185/// synth rows ride through the sort so deferred subqueries evaluate
1186/// against the surviving groups after the caller's LIMIT truncation.
1187fn sort_synth_by_order_by(
1188 synth_schema: &[ColumnSchema],
1189 order_by: &[spg_sql::ast::OrderBy],
1190 order_rewritten: &[Expr],
1191 mut kept_synth: Vec<Row>,
1192 mut out_rows: Vec<Row>,
1193 correlated_eval: Option<CorrelatedEval<'_>>,
1194) -> Result<(Vec<Row>, Vec<Row>), EvalError> {
1195 let synth_ctx = EvalContext::new(synth_schema, None);
1196 // v6.4.0 — multi-key ORDER BY on aggregate output. Each key
1197 // gets its own rewrite + per-key DESC flag. (Rewrites hoisted
1198 // above as `order_rewritten` — shared with the deferral
1199 // safety check.)
1200 let keys_meta: Vec<(bool, Option<bool>)> =
1201 order_by.iter().map(|o| (o.desc, o.nulls_first)).collect();
1202 // P2: compile order-by keys once (per-group sort keys are
1203 // the same `__agg_N` / `__grp_K` shape as the projection).
1204 let order_compiled: Vec<Option<eval::CompiledExpr>> = order_rewritten
1205 .iter()
1206 .map(|e| {
1207 Some(e)
1208 .filter(|e| eval::fully_compilable(e))
1209 .map(|e| eval::compile_expr(e, &synth_ctx))
1210 })
1211 .collect();
1212 // The synth row rides through the sort so deferred exprs can
1213 // evaluate against the surviving groups after the caller's
1214 // LIMIT truncation.
1215 let mut keystack: Vec<Value> = Vec::new();
1216 let mut tagged: Vec<(Vec<Value>, Row, Row)> = Vec::with_capacity(kept_synth.len());
1217 for (s, o) in kept_synth.into_iter().zip(out_rows) {
1218 let mut keys = Vec::with_capacity(order_rewritten.len());
1219 for (e, oc) in order_rewritten.iter().zip(&order_compiled) {
1220 keys.push(if let Some(oc) = oc {
1221 eval::eval_compiled(oc, &s, &synth_ctx, &mut keystack)?
1222 } else {
1223 match correlated_eval {
1224 Some(f) if crate::expr_has_subquery(e) => f(e, &s, &synth_ctx)?,
1225 _ => eval::eval_expr(e, &s, &synth_ctx)?,
1226 }
1227 });
1228 }
1229 tagged.push((keys, s, o));
1230 }
1231 tagged.sort_by(|a, b| {
1232 use core::cmp::Ordering;
1233 for (i, (ka, kb)) in a.0.iter().zip(b.0.iter()).enumerate() {
1234 let (desc, nf) = keys_meta[i];
1235 let cmp = crate::order_by_value_cmp(desc, nf, ka, kb);
1236 if cmp != Ordering::Equal {
1237 return cmp;
1238 }
1239 }
1240 Ordering::Equal
1241 });
1242 kept_synth = Vec::with_capacity(tagged.len());
1243 out_rows = Vec::with_capacity(tagged.len());
1244 for (_, s, o) in tagged {
1245 kept_synth.push(s);
1246 out_rows.push(o);
1247 }
1248 Ok((kept_synth, out_rows))
1249}
1250
1251/// v7.17.0 — walk the statement again to validate the positional
1252/// arity of every aggregate call site. Done after AST collection
1253/// rather than inside `collect_aggregates` so the collector stays
1254/// infallible; callers in `run()` can do a single early-error
1255/// exit before any per-row work.
1256fn validate_agg_arities(stmt: &SelectStatement, _specs: &[AggSpec]) -> Result<(), EvalError> {
1257 fn walk(e: &Expr) -> Result<(), EvalError> {
1258 if let Expr::FunctionCall { name, args } = e {
1259 let lower = name.to_ascii_lowercase();
1260 let expected: Option<usize> = match lower.as_str() {
1261 "count_star" => Some(0),
1262 "count" | "sum" | "avg" | "min" | "max" | "array_agg"
1263 // v7.17.0 — boolean aggregates also take exactly
1264 // one arg. `every` is an alias normalised inside
1265 // collect_aggregates / rewrite_expr.
1266 | "bool_and" | "bool_or" | "every"
1267 // v7.32 (round-29) — statistical + bitwise aggregates
1268 // + single-arg JSON aggregate.
1269 | "stddev" | "stddev_samp" | "stddev_pop"
1270 | "variance" | "var_samp" | "var_pop"
1271 | "bit_and" | "bit_or" | "bit_xor"
1272 | "json_agg" | "jsonb_agg" => Some(1),
1273 // v7.32 (round-29) — two-argument aggregates: string_agg,
1274 // the regression family f(Y, X), and json_object_agg.
1275 "string_agg"
1276 | "covar_pop" | "covar_samp" | "corr"
1277 | "regr_count" | "regr_avgx" | "regr_avgy" | "regr_slope"
1278 | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy"
1279 | "json_object_agg" | "jsonb_object_agg" => Some(2),
1280 _ => None,
1281 };
1282 if let Some(want) = expected
1283 && args.len() != want
1284 {
1285 return Err(EvalError::TypeMismatch {
1286 detail: alloc::format!("{lower}() takes {want} arg(s), got {}", args.len()),
1287 });
1288 }
1289 for a in args {
1290 walk(a)?;
1291 }
1292 } else if let Expr::Binary { lhs, rhs, .. } = e {
1293 walk(lhs)?;
1294 walk(rhs)?;
1295 } else if let Expr::Unary { expr, .. }
1296 | Expr::Cast { expr, .. }
1297 | Expr::IsNull { expr, .. } = e
1298 {
1299 walk(expr)?;
1300 }
1301 Ok(())
1302 }
1303 for item in &stmt.items {
1304 if let SelectItem::Expr { expr, .. } = item {
1305 walk(expr)?;
1306 }
1307 }
1308 for o in &stmt.order_by {
1309 walk(&o.expr)?;
1310 }
1311 if let Some(h) = &stmt.having {
1312 walk(h)?;
1313 }
1314 Ok(())
1315}
1316
1317/// v7.33 (array_agg argmax) — recognise `(array_agg(x ORDER BY y))[1]`,
1318/// the argmax/argmin idiom: a non-DISTINCT ordered `array_agg`
1319/// subscripted by the constant 1. Returns `(value_arg, order_by,
1320/// filter)` on a match. When matched, the whole per-group array build +
1321/// sort + materialise is replaced by a running first-by-order scalar
1322/// accumulator and the subscript node is consumed (replaced by the
1323/// synthetic column). collect_aggregates and rewrite_expr share this one
1324/// matcher so their `__agg_<i>` assignment stays in lockstep.
1325fn first_ordered_array_agg(e: &Expr) -> Option<(&Expr, &[spg_sql::ast::OrderBy], Option<&Expr>)> {
1326 let Expr::ArraySubscript { target, index } = e else {
1327 return None;
1328 };
1329 if !matches!(
1330 index.as_ref(),
1331 Expr::Literal(spg_sql::ast::Literal::Integer(1))
1332 ) {
1333 return None;
1334 }
1335 let Expr::AggregateOrdered {
1336 call,
1337 order_by,
1338 distinct,
1339 filter,
1340 } = target.as_ref()
1341 else {
1342 return None;
1343 };
1344 if *distinct || order_by.is_empty() {
1345 return None;
1346 }
1347 let Expr::FunctionCall { name, args } = call.as_ref() else {
1348 return None;
1349 };
1350 if !name.eq_ignore_ascii_case("array_agg") || args.len() != 1 {
1351 return None;
1352 }
1353 Some((&args[0], order_by, filter.as_deref()))
1354}
1355
1356fn collect_aggregates(e: &Expr, out: &mut Vec<AggSpec>) {
1357 match e {
1358 // v7.24 (round-16 A) — ordered aggregate: register the inner
1359 // call's spec with the ordering attached.
1360 Expr::AggregateOrdered {
1361 call,
1362 order_by,
1363 distinct,
1364 filter,
1365 } => {
1366 if let Expr::FunctionCall { name, args } = call.as_ref() {
1367 let lower = name.to_ascii_lowercase();
1368 if is_aggregate_name(&lower) {
1369 let canonical = if lower == "every" {
1370 "bool_and".to_string()
1371 } else {
1372 lower
1373 };
1374 // Ordered-set aggregates (`percentile_cont(f)
1375 // WITHIN GROUP (ORDER BY x)`) take the value to
1376 // aggregate from the sort spec and the in-parens
1377 // arg as the direct (fraction) argument.
1378 let ordered_set = is_within_group_name(&canonical);
1379 let (arg, direct_arg) = if ordered_set {
1380 (
1381 order_by.first().map(|o| o.expr.clone()),
1382 args.first().cloned(),
1383 )
1384 } else {
1385 (args.first().cloned(), None)
1386 };
1387 let spec = AggSpec {
1388 name: canonical.clone(),
1389 arg,
1390 arg2: if agg_uses_second_arg(&canonical) {
1391 args.get(1).cloned()
1392 } else {
1393 None
1394 },
1395 distinct: *distinct,
1396 order_by: order_by.clone(),
1397 filter: filter.as_deref().cloned(),
1398 direct_arg,
1399 first_ordered: false,
1400 };
1401 if !out.iter().any(|s| {
1402 s.name == spec.name
1403 && s.arg == spec.arg
1404 && s.arg2 == spec.arg2
1405 && s.distinct == spec.distinct
1406 && s.order_by == spec.order_by
1407 && s.filter == spec.filter
1408 && s.direct_arg == spec.direct_arg
1409 && s.first_ordered == spec.first_ordered
1410 }) {
1411 out.push(spec);
1412 }
1413 return;
1414 }
1415 }
1416 collect_aggregates(call, out);
1417 for o in order_by {
1418 collect_aggregates(&o.expr, out);
1419 }
1420 }
1421 Expr::FunctionCall { name, args } => {
1422 let lower = name.to_ascii_lowercase();
1423 if is_aggregate_name(&lower) {
1424 let arg = if lower == "count_star" {
1425 None
1426 } else {
1427 args.first().cloned()
1428 };
1429 // v7.17.0 — second positional arg for
1430 // `string_agg(value, separator)`; v7.32 — also the
1431 // regression family `f(Y, X)` and `json_object_agg`.
1432 let arg2 = if agg_uses_second_arg(&lower) {
1433 args.get(1).cloned()
1434 } else {
1435 None
1436 };
1437 // v7.17.0 — `every` is the SQL-standard alias for
1438 // `bool_and`; collapse at collection time so
1439 // update_state / finalize need only one arm.
1440 let canonical = if lower == "every" {
1441 "bool_and".to_string()
1442 } else {
1443 lower
1444 };
1445 let spec = AggSpec {
1446 name: canonical,
1447 arg: arg.clone(),
1448 arg2: arg2.clone(),
1449 distinct: false,
1450 order_by: Vec::new(),
1451 filter: None,
1452 direct_arg: None,
1453 first_ordered: false,
1454 };
1455 if !out.iter().any(|s| {
1456 s.name == spec.name
1457 && s.arg == spec.arg
1458 && s.arg2 == spec.arg2
1459 && !s.distinct
1460 && s.order_by == spec.order_by
1461 && s.filter.is_none()
1462 && !s.first_ordered
1463 }) {
1464 out.push(spec);
1465 }
1466 // Don't recurse into the arg — nested aggregates are
1467 // illegal in standard SQL.
1468 } else {
1469 for a in args {
1470 collect_aggregates(a, out);
1471 }
1472 }
1473 }
1474 Expr::Binary { lhs, rhs, .. } => {
1475 collect_aggregates(lhs, out);
1476 collect_aggregates(rhs, out);
1477 }
1478 Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
1479 collect_aggregates(expr, out);
1480 }
1481 Expr::Like { expr, pattern, .. } => {
1482 collect_aggregates(expr, out);
1483 collect_aggregates(pattern, out);
1484 }
1485 Expr::InList { expr, list, .. } => {
1486 collect_aggregates(expr, out);
1487 for item in list {
1488 collect_aggregates(item, out);
1489 }
1490 }
1491 Expr::Extract { source, .. } => collect_aggregates(source, out),
1492 // v4.10 subquery + v4.12 window / Literal / Column —
1493 // non-recursing leaves for the aggregate collector.
1494 Expr::ScalarSubquery(_)
1495 | Expr::Exists { .. }
1496 | Expr::InSubquery { .. }
1497 | Expr::WindowFunction { .. }
1498 | Expr::Literal(_)
1499 | Expr::Placeholder(_)
1500 | Expr::Column(_) => {}
1501 // v7.10.10 — recurse into array constructor children +
1502 // subscript / ANY/ALL operands.
1503 Expr::Array(items) => {
1504 for elem in items {
1505 collect_aggregates(elem, out);
1506 }
1507 }
1508 Expr::ArraySubscript { target, index } => {
1509 // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]`
1510 // collects as a first_ordered spec; the subscript is consumed
1511 // here (do NOT recurse into the array_agg, or it would also
1512 // register a plain full-array spec).
1513 if let Some((arg, order_by, filter)) = first_ordered_array_agg(e) {
1514 let spec = AggSpec {
1515 name: "array_agg".to_string(),
1516 arg: Some(arg.clone()),
1517 arg2: None,
1518 distinct: false,
1519 order_by: order_by.to_vec(),
1520 filter: filter.cloned(),
1521 direct_arg: None,
1522 first_ordered: true,
1523 };
1524 if !out.iter().any(|s| {
1525 s.name == spec.name
1526 && s.arg == spec.arg
1527 && s.order_by == spec.order_by
1528 && s.filter == spec.filter
1529 && s.first_ordered
1530 }) {
1531 out.push(spec);
1532 }
1533 return;
1534 }
1535 collect_aggregates(target, out);
1536 collect_aggregates(index, out);
1537 }
1538 Expr::AnyAll { expr, array, .. } => {
1539 collect_aggregates(expr, out);
1540 collect_aggregates(array, out);
1541 }
1542 Expr::Case {
1543 operand,
1544 branches,
1545 else_branch,
1546 } => {
1547 if let Some(o) = operand {
1548 collect_aggregates(o, out);
1549 }
1550 for (w, t) in branches {
1551 collect_aggregates(w, out);
1552 collect_aggregates(t, out);
1553 }
1554 if let Some(e) = else_branch {
1555 collect_aggregates(e, out);
1556 }
1557 }
1558 }
1559}
1560
1561fn update_state(
1562 st: &mut AggState,
1563 name: &str,
1564 v: &Value,
1565 arg2: Option<&Value>,
1566 order_keys: Option<Vec<Value>>,
1567) -> Result<(), EvalError> {
1568 let is_null = matches!(v, Value::Null);
1569 match name {
1570 "count_star" => st.count += 1,
1571 "count" => {
1572 if !is_null {
1573 st.count += 1;
1574 }
1575 }
1576 "sum" | "avg" => {
1577 if is_null {
1578 return Ok(());
1579 }
1580 st.count += 1;
1581 match v {
1582 Value::Int(n) => st.sum_int += i64::from(*n),
1583 Value::BigInt(n) => st.sum_int += *n,
1584 Value::Float(x) => {
1585 st.use_float = true;
1586 st.sum_float += *x;
1587 }
1588 other => {
1589 return Err(EvalError::TypeMismatch {
1590 detail: format!("sum/avg need numeric, got {:?}", other.data_type()),
1591 });
1592 }
1593 }
1594 }
1595 "min" => {
1596 if is_null {
1597 return Ok(());
1598 }
1599 match &st.extreme {
1600 None => st.extreme = Some(v.clone()),
1601 Some(cur) => {
1602 if value_cmp(v, cur) == core::cmp::Ordering::Less {
1603 st.extreme = Some(v.clone());
1604 }
1605 }
1606 }
1607 }
1608 "max" => {
1609 if is_null {
1610 return Ok(());
1611 }
1612 match &st.extreme {
1613 None => st.extreme = Some(v.clone()),
1614 Some(cur) => {
1615 if value_cmp(v, cur) == core::cmp::Ordering::Greater {
1616 st.extreme = Some(v.clone());
1617 }
1618 }
1619 }
1620 }
1621 // v7.17.0 — string_agg(value, separator). NULL value is
1622 // skipped (PG aggregate-skip-null). Separator captured
1623 // from the latest row that flows through; matches PG's
1624 // semantics of evaluating the separator per row but using
1625 // the last value at finalize time (in practice it's
1626 // constant). count is bumped so we can distinguish "empty
1627 // group → NULL" from "all-NULL group → NULL".
1628 "string_agg" => {
1629 if let Some(sep) = arg2
1630 && let Value::Text(s) = sep
1631 {
1632 st.separator = Some(s.clone());
1633 }
1634 if is_null {
1635 return Ok(());
1636 }
1637 if let Value::Text(s) = v {
1638 st.items.push(Value::Text(s.clone()));
1639 if let Some(k) = order_keys {
1640 st.item_keys.push(k);
1641 }
1642 st.count += 1;
1643 } else {
1644 return Err(EvalError::TypeMismatch {
1645 detail: format!("string_agg requires text value, got {:?}", v.data_type()),
1646 });
1647 }
1648 }
1649 // v7.17.0 — array_agg(value). Unlike string_agg, NULL
1650 // elements are KEPT in the array (PG behaviour); the
1651 // result is NULL only when ZERO rows fed in. Element type
1652 // is locked from the first row's value type; subsequent
1653 // rows must match (PG also rejects mixed-type array_agg).
1654 "array_agg" => {
1655 st.items.push(v.clone());
1656 if let Some(k) = order_keys {
1657 st.item_keys.push(k);
1658 }
1659 st.count += 1;
1660 }
1661 // v7.17.0 — bool_and(p): TRUE iff every non-NULL input is
1662 // TRUE. NULL skipped; running accumulator stays at TRUE
1663 // until the first non-NULL FALSE.
1664 "bool_and" => {
1665 if is_null {
1666 return Ok(());
1667 }
1668 let b = match v {
1669 Value::Bool(b) => *b,
1670 other => {
1671 return Err(EvalError::TypeMismatch {
1672 detail: format!("bool_and requires bool, got {:?}", other.data_type()),
1673 });
1674 }
1675 };
1676 st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc && b));
1677 }
1678 // v7.17.0 — bool_or(p): TRUE iff any non-NULL input is
1679 // TRUE. NULL skipped.
1680 "bool_or" => {
1681 if is_null {
1682 return Ok(());
1683 }
1684 let b = match v {
1685 Value::Bool(b) => *b,
1686 other => {
1687 return Err(EvalError::TypeMismatch {
1688 detail: format!("bool_or requires bool, got {:?}", other.data_type()),
1689 });
1690 }
1691 };
1692 st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc || b));
1693 }
1694 // v7.32 (round-29) — variance / stddev family. Accumulate the
1695 // running sum (sum_float) and sum of squares (sum_sq) over the
1696 // non-NULL numeric inputs; finalize divides by n or n-1.
1697 "stddev" | "stddev_samp" | "stddev_pop" | "variance" | "var_samp" | "var_pop" => {
1698 if is_null {
1699 return Ok(());
1700 }
1701 let x = match v {
1702 Value::Int(n) => f64::from(*n),
1703 Value::SmallInt(n) => f64::from(*n),
1704 Value::BigInt(n) => *n as f64,
1705 Value::Float(x) => *x,
1706 other => {
1707 return Err(EvalError::TypeMismatch {
1708 detail: format!("{name} needs numeric, got {:?}", other.data_type()),
1709 });
1710 }
1711 };
1712 st.count += 1;
1713 st.sum_float += x;
1714 st.sum_sq += x * x;
1715 }
1716 // v7.32 (round-29) — bitwise aggregates over integer inputs.
1717 "bit_and" | "bit_or" | "bit_xor" => {
1718 if is_null {
1719 return Ok(());
1720 }
1721 let n = match v {
1722 Value::Int(n) => i64::from(*n),
1723 Value::SmallInt(n) => i64::from(*n),
1724 Value::BigInt(n) => *n,
1725 other => {
1726 return Err(EvalError::TypeMismatch {
1727 detail: format!("{name} needs integer, got {:?}", other.data_type()),
1728 });
1729 }
1730 };
1731 st.bit_acc = Some(match (st.bit_acc, name) {
1732 (None, _) => n,
1733 (Some(acc), "bit_and") => acc & n,
1734 (Some(acc), "bit_or") => acc | n,
1735 (Some(acc), _) => acc ^ n, // bit_xor
1736 });
1737 }
1738 // v7.32 (round-29) — WITHIN GROUP aggregates (ordered-set +
1739 // hypothetical-set) collect the sort value (NULLs ignored, per
1740 // PG) into `items`, sorted at finalize by the parallel
1741 // `item_keys`.
1742 n if is_within_group_name(n) => {
1743 if is_null {
1744 return Ok(());
1745 }
1746 st.items.push(v.clone());
1747 if let Some(k) = order_keys {
1748 st.item_keys.push(k);
1749 }
1750 st.count += 1;
1751 }
1752 // v7.32 (round-29) — regression family f(Y, X). Only rows with
1753 // BOTH inputs non-NULL contribute (PG semantics). `v` is Y,
1754 // `arg2` is X.
1755 n if is_regression_name(n) => {
1756 let (Some(y), Some(x)) = (agg_value_to_f64(v), arg2.and_then(agg_value_to_f64)) else {
1757 return Ok(()); // NULL (or non-numeric) in either input
1758 };
1759 st.reg_n += 1;
1760 st.reg_sx += x;
1761 st.reg_sy += y;
1762 st.reg_sxx += x * x;
1763 st.reg_syy += y * y;
1764 st.reg_sxy += x * y;
1765 }
1766 // v7.32 (round-29) — json_agg / jsonb_agg collect every input
1767 // (NULL becomes JSON null, per PG) in row order.
1768 "json_agg" | "jsonb_agg" => {
1769 st.items.push(v.clone());
1770 st.count += 1;
1771 }
1772 // v7.32 (round-29) — json_object_agg(key, value): keys in
1773 // `items`, values in `aux_items`. A NULL key is skipped (PG
1774 // raises; we drop it rather than abort the whole query).
1775 "json_object_agg" | "jsonb_object_agg" => {
1776 if is_null {
1777 return Ok(());
1778 }
1779 st.items.push(v.clone());
1780 st.aux_items.push(arg2.cloned().unwrap_or(Value::Null));
1781 st.count += 1;
1782 }
1783 _ => unreachable!("non-aggregate {name} in update_state"),
1784 }
1785 Ok(())
1786}
1787
1788#[allow(clippy::cast_precision_loss)]
1789fn finalize(name: &str, st: &AggState) -> Value {
1790 match name {
1791 "count" | "count_star" => Value::BigInt(st.count),
1792 "sum" => {
1793 if st.count == 0 {
1794 Value::Null
1795 } else if st.use_float {
1796 Value::Float(st.sum_float + (st.sum_int as f64))
1797 } else {
1798 Value::BigInt(st.sum_int)
1799 }
1800 }
1801 "avg" => {
1802 if st.count == 0 {
1803 Value::Null
1804 } else {
1805 let total = if st.use_float {
1806 st.sum_float + (st.sum_int as f64)
1807 } else {
1808 st.sum_int as f64
1809 };
1810 Value::Float(total / (st.count as f64))
1811 }
1812 }
1813 "min" | "max" => st.extreme.clone().unwrap_or(Value::Null),
1814 // v7.17.0 — string_agg: join all collected text items with
1815 // the captured separator. Empty / all-NULL group → NULL
1816 // (PG semantics).
1817 "string_agg" => {
1818 if st.items.is_empty() {
1819 return Value::Null;
1820 }
1821 let sep = st.separator.clone().unwrap_or_default();
1822 let mut out = String::new();
1823 for (i, item) in st.items.iter().enumerate() {
1824 if i > 0 {
1825 out.push_str(&sep);
1826 }
1827 if let Value::Text(s) = item {
1828 out.push_str(s);
1829 }
1830 }
1831 Value::Text(out)
1832 }
1833 // v7.17.0 — array_agg: collect into a typed array. NULL
1834 // elements are preserved per PG. Result type is decided
1835 // by the first non-NULL element seen (or Text fallback
1836 // when the whole group is NULL — PG would surface the
1837 // declared input type, but SPG hasn't yet wired the
1838 // aggregate's static input-type from `describe`).
1839 "array_agg" => {
1840 if st.items.is_empty() {
1841 return Value::Null;
1842 }
1843 let probe = st.items.iter().find(|v| !v.is_null());
1844 match probe.and_then(spg_storage::Value::data_type) {
1845 Some(DataType::Int) | Some(DataType::SmallInt) => {
1846 let items: Vec<Option<i32>> = st
1847 .items
1848 .iter()
1849 .map(|v| match v {
1850 Value::Int(n) => Some(*n),
1851 Value::SmallInt(n) => Some(i32::from(*n)),
1852 _ => None,
1853 })
1854 .collect();
1855 Value::IntArray(items)
1856 }
1857 Some(DataType::BigInt) => {
1858 let items: Vec<Option<i64>> = st
1859 .items
1860 .iter()
1861 .map(|v| match v {
1862 Value::BigInt(n) => Some(*n),
1863 _ => None,
1864 })
1865 .collect();
1866 Value::BigIntArray(items)
1867 }
1868 _ => {
1869 let items: Vec<Option<String>> = st
1870 .items
1871 .iter()
1872 .map(|v| match v {
1873 Value::Text(s) => Some(s.clone()),
1874 Value::Null => None,
1875 other => Some(format!("{other:?}")),
1876 })
1877 .collect();
1878 Value::TextArray(items)
1879 }
1880 }
1881 }
1882 // v7.17.0 — bool_and / bool_or finalize: lazy-init pattern
1883 // means `None` is exactly "empty group or all-NULL", which
1884 // PG surfaces as SQL NULL.
1885 "bool_and" | "bool_or" => st.bool_acc.map_or(Value::Null, Value::Bool),
1886 // v7.32 (round-29) — variance / stddev. PG: `variance` ==
1887 // `var_samp`, `stddev` == `stddev_samp`. samp needs n >= 2
1888 // (n < 2 → NULL); pop needs n >= 1 (n == 1 → 0).
1889 "variance" | "var_samp" | "var_pop" | "stddev" | "stddev_samp" | "stddev_pop" => {
1890 let n = st.count;
1891 if n == 0 {
1892 return Value::Null;
1893 }
1894 let nf = n as f64;
1895 // Sum of squared deviations from the mean.
1896 let ss = st.sum_sq - (st.sum_float * st.sum_float) / nf;
1897 let pop = name.ends_with("_pop");
1898 let denom = if pop { nf } else { nf - 1.0 };
1899 if denom <= 0.0 {
1900 // var_samp / stddev (samp) with n == 1 → NULL.
1901 return Value::Null;
1902 }
1903 let var = (ss / denom).max(0.0); // clamp fp noise below 0
1904 if name.starts_with("stddev") {
1905 Value::Float(crate::eval::f64_sqrt(var))
1906 } else {
1907 Value::Float(var)
1908 }
1909 }
1910 // v7.32 (round-29) — bitwise aggregates: None (empty / all-NULL)
1911 // → SQL NULL.
1912 "bit_and" | "bit_or" | "bit_xor" => st.bit_acc.map_or(Value::Null, Value::BigInt),
1913 // v7.32 (round-29) — regression family. `regr_count` is the
1914 // paired n; everything else is NULL over an empty set. Terms
1915 // are the mean-centred sums of squares / cross-products.
1916 "regr_count" => Value::BigInt(st.reg_n),
1917 "covar_pop" | "covar_samp" | "corr" | "regr_avgx" | "regr_avgy" | "regr_slope"
1918 | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy" => {
1919 let n = st.reg_n;
1920 if n == 0 {
1921 return Value::Null;
1922 }
1923 let nf = n as f64;
1924 let sxx = st.reg_sxx - st.reg_sx * st.reg_sx / nf;
1925 let syy = st.reg_syy - st.reg_sy * st.reg_sy / nf;
1926 let sxy = st.reg_sxy - st.reg_sx * st.reg_sy / nf;
1927 let avgx = st.reg_sx / nf;
1928 let avgy = st.reg_sy / nf;
1929 let out = match name {
1930 "regr_avgx" => Some(avgx),
1931 "regr_avgy" => Some(avgy),
1932 "regr_sxx" => Some(sxx),
1933 "regr_syy" => Some(syy),
1934 "regr_sxy" => Some(sxy),
1935 "covar_pop" => Some(sxy / nf),
1936 "covar_samp" => (n >= 2).then(|| sxy / (nf - 1.0)),
1937 "regr_slope" => (sxx != 0.0).then(|| sxy / sxx),
1938 "regr_intercept" => (sxx != 0.0).then(|| avgy - (sxy / sxx) * avgx),
1939 "corr" => {
1940 let d = sxx * syy;
1941 (d > 0.0).then(|| sxy / crate::eval::f64_sqrt(d))
1942 }
1943 // PG: NULL when sxx==0; 1 when syy==0 (and sxx>0).
1944 "regr_r2" => {
1945 if sxx == 0.0 {
1946 None
1947 } else if syy == 0.0 {
1948 Some(1.0)
1949 } else {
1950 Some((sxy * sxy) / (sxx * syy))
1951 }
1952 }
1953 _ => None,
1954 };
1955 out.map_or(Value::Null, Value::Float)
1956 }
1957 // v7.32 (round-29) — json_agg / jsonb_agg: a JSON array of every
1958 // collected element in row order; empty set → SQL NULL.
1959 "json_agg" | "jsonb_agg" => {
1960 if st.items.is_empty() {
1961 return Value::Null;
1962 }
1963 let mut out = String::from("[");
1964 for (i, item) in st.items.iter().enumerate() {
1965 if i > 0 {
1966 out.push_str(", ");
1967 }
1968 out.push_str(&crate::json::value_to_json_text(item));
1969 }
1970 out.push(']');
1971 Value::Json(out)
1972 }
1973 // v7.32 (round-29) — json_object_agg: a JSON object built from
1974 // the parallel key (`items`) / value (`aux_items`) streams.
1975 "json_object_agg" | "jsonb_object_agg" => {
1976 if st.items.is_empty() {
1977 return Value::Null;
1978 }
1979 let mut out = String::from("{");
1980 for (i, key) in st.items.iter().enumerate() {
1981 if i > 0 {
1982 out.push_str(", ");
1983 }
1984 // Object keys are always JSON strings (PG coerces).
1985 let key_text = match key {
1986 Value::Text(s) | Value::Json(s) => s.clone(),
1987 other => crate::json::value_to_json_text(other),
1988 };
1989 out.push_str(&crate::json::value_to_json_text(&Value::Text(key_text)));
1990 out.push_str(": ");
1991 let val = st.aux_items.get(i).unwrap_or(&Value::Null);
1992 out.push_str(&crate::json::value_to_json_text(val));
1993 }
1994 out.push('}');
1995 Value::Json(out)
1996 }
1997 // Ordered-set aggregates are finalized in `run` (they need the
1998 // sorted items + the direct fraction argument), never here.
1999 _ => unreachable!(),
2000 }
2001}
2002
2003/// v7.32 (round-29) — numeric coercion for the percentile interpolation.
2004fn agg_value_to_f64(v: &Value) -> Option<f64> {
2005 match v {
2006 Value::Int(n) => Some(f64::from(*n)),
2007 Value::SmallInt(n) => Some(f64::from(*n)),
2008 Value::BigInt(n) => Some(*n as f64),
2009 Value::Float(x) => Some(*x),
2010 _ => None,
2011 }
2012}
2013
2014/// v7.32 (round-29) — finalize a WITHIN GROUP aggregate. `st.items` is
2015/// already sorted by the `WITHIN GROUP (ORDER BY …)` spec. `direct` is
2016/// the evaluated direct argument: the fraction for `percentile_*`, the
2017/// hypothetical value for the hypothetical-set family (`rank` etc.),
2018/// and unused by `mode`. `order` is the (single) sort key, needed by
2019/// the hypothetical-set family to compare in the sort direction.
2020#[allow(
2021 clippy::cast_precision_loss,
2022 clippy::cast_possible_truncation,
2023 clippy::cast_sign_loss
2024)]
2025fn finalize_ordered_set(
2026 name: &str,
2027 st: &AggState,
2028 direct: Option<&Value>,
2029 order: Option<&spg_sql::ast::OrderBy>,
2030) -> Value {
2031 let fraction = direct;
2032 let items = &st.items;
2033 if items.is_empty() {
2034 // A hypothetical row ranks first over an empty group; the
2035 // distribution functions are 0 / divide-by-(n+1).
2036 return match name {
2037 "rank" | "dense_rank" => Value::BigInt(1),
2038 "percent_rank" => Value::Float(0.0),
2039 "cume_dist" => Value::Float(1.0),
2040 _ => Value::Null,
2041 };
2042 }
2043 let n = items.len();
2044 match name {
2045 // v7.32 (round-29) — hypothetical-set: the rank the direct value
2046 // would have if inserted into the group, in the sort direction.
2047 "rank" | "dense_rank" | "percent_rank" | "cume_dist" => {
2048 let Some(h) = fraction else {
2049 return Value::Null;
2050 };
2051 let (desc, nulls_first) = order.map_or((false, None), |o| (o.desc, o.nulls_first));
2052 let mut before = 0usize; // sort strictly before h
2053 let mut before_or_eq = 0usize; // sort before-or-peer with h
2054 let mut distinct_before = 0usize;
2055 let mut last_before: Option<&Value> = None;
2056 for it in items {
2057 match crate::order_by_value_cmp(desc, nulls_first, it, h) {
2058 core::cmp::Ordering::Less => {
2059 before += 1;
2060 before_or_eq += 1;
2061 if last_before
2062 .is_none_or(|p| value_cmp(p, it) != core::cmp::Ordering::Equal)
2063 {
2064 distinct_before += 1;
2065 last_before = Some(it);
2066 }
2067 }
2068 core::cmp::Ordering::Equal => before_or_eq += 1,
2069 core::cmp::Ordering::Greater => {}
2070 }
2071 }
2072 let nn = n as f64;
2073 match name {
2074 "rank" => Value::BigInt((before + 1) as i64),
2075 "dense_rank" => Value::BigInt((distinct_before + 1) as i64),
2076 "percent_rank" => Value::Float(before as f64 / nn),
2077 "cume_dist" => Value::Float((before_or_eq as f64 + 1.0) / (nn + 1.0)),
2078 _ => unreachable!(),
2079 }
2080 }
2081 // Most frequent value; equal values are adjacent in the sorted
2082 // run, and a frequency tie resolves to the earliest run (the
2083 // smallest value under an ascending sort), matching PG.
2084 "mode" => {
2085 let (mut best_i, mut best_cnt) = (0usize, 1usize);
2086 let (mut run_i, mut run_cnt) = (0usize, 1usize);
2087 for i in 1..n {
2088 if value_cmp(&items[i], &items[run_i]) == core::cmp::Ordering::Equal {
2089 run_cnt += 1;
2090 } else {
2091 run_i = i;
2092 run_cnt = 1;
2093 }
2094 if run_cnt > best_cnt {
2095 best_cnt = run_cnt;
2096 best_i = run_i;
2097 }
2098 }
2099 items[best_i].clone()
2100 }
2101 // The first value whose cumulative fraction reaches `f`.
2102 "percentile_disc" => {
2103 let f = fraction
2104 .and_then(agg_value_to_f64)
2105 .unwrap_or(0.0)
2106 .clamp(0.0, 1.0);
2107 let idx = if f <= 0.0 {
2108 0
2109 } else {
2110 (crate::eval::f64_ceil(f * n as f64) as usize)
2111 .saturating_sub(1)
2112 .min(n - 1)
2113 };
2114 items[idx].clone()
2115 }
2116 // Linear interpolation between the two bracketing values.
2117 "percentile_cont" => {
2118 let f = fraction
2119 .and_then(agg_value_to_f64)
2120 .unwrap_or(0.0)
2121 .clamp(0.0, 1.0);
2122 let Some(nums) = items
2123 .iter()
2124 .map(agg_value_to_f64)
2125 .collect::<Option<Vec<f64>>>()
2126 else {
2127 return Value::Null; // non-numeric ordered set
2128 };
2129 if n == 1 {
2130 return Value::Float(nums[0]);
2131 }
2132 let rank = f * (n as f64 - 1.0);
2133 let lo = crate::eval::f64_floor(rank) as usize;
2134 let hi = crate::eval::f64_ceil(rank) as usize;
2135 let frac = rank - lo as f64;
2136 Value::Float(nums[lo] + (nums[hi] - nums[lo]) * frac)
2137 }
2138 _ => unreachable!(),
2139 }
2140}
2141
2142fn infer_agg_type(spec: &AggSpec, schema_cols: &[ColumnSchema]) -> DataType {
2143 // v7.26 (round-20 C) — the argument's statically-derived shape
2144 // types MIN/MAX/SUM/array_agg properly; RowDescription used to
2145 // report TEXT for these, breaking every sqlx typed decode.
2146 let arg_ty = spec
2147 .arg
2148 .as_ref()
2149 .and_then(|a| crate::describe::describe_expr(a, schema_cols))
2150 .map(|shape| shape.ty);
2151 // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]` yields the
2152 // ELEMENT type (x), not the array type.
2153 if spec.first_ordered {
2154 return arg_ty.unwrap_or(DataType::Text);
2155 }
2156 match spec.name.as_str() {
2157 "count" | "count_star" => DataType::BigInt,
2158 "sum" => match arg_ty {
2159 Some(DataType::Float) => DataType::Float,
2160 _ => DataType::BigInt,
2161 },
2162 "avg" => DataType::Float,
2163 // v7.17.0 — string_agg always returns TEXT.
2164 "string_agg" => DataType::Text,
2165 "array_agg" => match arg_ty {
2166 Some(DataType::Int | DataType::SmallInt) => DataType::IntArray,
2167 Some(DataType::BigInt) => DataType::BigIntArray,
2168 _ => DataType::TextArray,
2169 },
2170 // v7.17.0 — boolean aggregates always return BOOL (nullable
2171 // — empty / all-NULL group → NULL).
2172 "bool_and" | "bool_or" => DataType::Bool,
2173 // v7.32 (round-29) — variance / stddev are floating point;
2174 // percentile_cont interpolates to float; the regression family
2175 // (except regr_count) is floating point.
2176 "stddev" | "stddev_samp" | "stddev_pop" | "variance" | "var_samp" | "var_pop"
2177 | "percentile_cont" | "covar_pop" | "covar_samp" | "corr" | "regr_avgx" | "regr_avgy"
2178 | "regr_slope" | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy" => {
2179 DataType::Float
2180 }
2181 // v7.32 (round-29) — bitwise aggregates, regr_count, and the
2182 // integer hypothetical-set ranks return an integer.
2183 "bit_and" | "bit_or" | "bit_xor" | "regr_count" | "rank" | "dense_rank" => DataType::BigInt,
2184 // v7.32 (round-29) — hypothetical-set distribution functions.
2185 "percent_rank" | "cume_dist" => DataType::Float,
2186 // v7.32 (round-29) — JSON aggregates return JSON.
2187 "json_agg" | "jsonb_agg" | "json_object_agg" | "jsonb_object_agg" => DataType::Json,
2188 // min/max, percentile_disc, mode, and anything pass-through:
2189 // the argument's shape (for ordered-set aggs `spec.arg` is the
2190 // WITHIN GROUP value expression).
2191 _ => arg_ty.unwrap_or(DataType::Text),
2192 }
2193}
2194
2195fn agg_or_group_type(e: &Expr, synth: &[ColumnSchema]) -> DataType {
2196 if let Expr::Column(c) = e
2197 && let Some(s) = synth.iter().find(|s| s.name == c.name)
2198 {
2199 return s.ty;
2200 }
2201 // v7.26 (round-20 C) — compound expressions over aggregates
2202 // (COALESCE(BOOL_OR(…), false), (array_agg(…))[1], CASE …)
2203 // derive their shape statically against the synth schema; the
2204 // old Text fallback broke sqlx typed decodes of exactly these
2205 // columns.
2206 crate::describe::describe_expr(e, synth)
2207 .map(|shape| shape.ty)
2208 .unwrap_or(DataType::Text)
2209}
2210
2211fn rewrite_expr(e: &Expr, group_exprs: &[Expr], aggs: &[AggSpec]) -> Expr {
2212 // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]` rewrites
2213 // to its first_ordered synth column, consuming the subscript. Checked
2214 // before the AggregateOrdered/recursion arms (which would otherwise
2215 // rewrite the inner array_agg and leave the subscript). Same matcher
2216 // as collect_aggregates, so the spec it finds is the one collected.
2217 if let Some((arg, order_by, filter)) = first_ordered_array_agg(e) {
2218 let arg_owned = Some(arg.clone());
2219 let filter_owned = filter.cloned();
2220 for (i, spec) in aggs.iter().enumerate() {
2221 if spec.first_ordered
2222 && spec.name == "array_agg"
2223 && spec.arg == arg_owned
2224 && spec.order_by == *order_by
2225 && spec.filter == filter_owned
2226 {
2227 return Expr::Column(spg_sql::ast::ColumnName {
2228 qualifier: None,
2229 name: format!("__agg_{i}"),
2230 });
2231 }
2232 }
2233 }
2234 // v7.24 (round-16 A) — ordered aggregate: match on the inner
2235 // call PLUS the ordering keys.
2236 if let Expr::AggregateOrdered {
2237 call,
2238 order_by,
2239 distinct,
2240 filter,
2241 } = e
2242 && let Expr::FunctionCall { name, args } = call.as_ref()
2243 {
2244 let lower = name.to_ascii_lowercase();
2245 if is_aggregate_name(&lower) {
2246 let canonical: &str = if lower == "every" { "bool_and" } else { &lower };
2247 // Mirror collect_aggregates: ordered-set aggregates take the
2248 // value from the sort spec and the in-parens arg as direct.
2249 let (arg, direct_arg) = if is_within_group_name(canonical) {
2250 (
2251 order_by.first().map(|o| o.expr.clone()),
2252 args.first().cloned(),
2253 )
2254 } else {
2255 (args.first().cloned(), None)
2256 };
2257 let arg2 = if agg_uses_second_arg(canonical) {
2258 args.get(1).cloned()
2259 } else {
2260 None
2261 };
2262 let filter_owned = filter.as_deref().cloned();
2263 for (i, spec) in aggs.iter().enumerate() {
2264 if spec.name == canonical
2265 && spec.arg == arg
2266 && spec.arg2 == arg2
2267 && spec.distinct == *distinct
2268 && spec.order_by == *order_by
2269 && spec.filter == filter_owned
2270 && spec.direct_arg == direct_arg
2271 {
2272 return Expr::Column(spg_sql::ast::ColumnName {
2273 qualifier: None,
2274 name: format!("__agg_{i}"),
2275 });
2276 }
2277 }
2278 }
2279 }
2280 // Match aggregate FunctionCalls first — they sit outside group_by.
2281 if let Expr::FunctionCall { name, args } = e {
2282 let lower = name.to_ascii_lowercase();
2283 if is_aggregate_name(&lower) {
2284 let arg = if lower == "count_star" {
2285 None
2286 } else {
2287 args.first().cloned()
2288 };
2289 // v7.17.0 — match the spec we registered for
2290 // string_agg(value, separator) on the full pair; v7.32 also
2291 // the regression family and json_object_agg.
2292 let arg2 = if agg_uses_second_arg(&lower) {
2293 args.get(1).cloned()
2294 } else {
2295 None
2296 };
2297 // v7.17.0 — `every` collapses into `bool_and` at
2298 // collection; mirror that here so the rewrite finds
2299 // the matching synth column.
2300 let canonical: &str = if lower == "every" {
2301 "bool_and"
2302 } else {
2303 lower.as_str()
2304 };
2305 for (i, spec) in aggs.iter().enumerate() {
2306 if spec.name == canonical
2307 && spec.arg == arg
2308 && spec.arg2 == arg2
2309 && !spec.distinct
2310 && spec.order_by.is_empty()
2311 {
2312 return Expr::Column(spg_sql::ast::ColumnName {
2313 qualifier: None,
2314 name: format!("__agg_{i}"),
2315 });
2316 }
2317 }
2318 }
2319 }
2320 // Match a group_by expression by AST equality.
2321 for (i, g) in group_exprs.iter().enumerate() {
2322 if g == e {
2323 return Expr::Column(spg_sql::ast::ColumnName {
2324 qualifier: None,
2325 name: format!("__grp_{i}"),
2326 });
2327 }
2328 }
2329 // Recurse into children.
2330 match e {
2331 Expr::AggregateOrdered {
2332 call,
2333 order_by,
2334 distinct,
2335 filter,
2336 } => Expr::AggregateOrdered {
2337 call: Box::new(rewrite_expr(call, group_exprs, aggs)),
2338 distinct: *distinct,
2339 order_by: order_by
2340 .iter()
2341 .map(|o| spg_sql::ast::OrderBy {
2342 expr: rewrite_expr(&o.expr, group_exprs, aggs),
2343 desc: o.desc,
2344 nulls_first: o.nulls_first,
2345 })
2346 .collect(),
2347 // The filter is evaluated against SOURCE rows during
2348 // accumulation, never against synth rows — keep it as-is.
2349 filter: filter.clone(),
2350 },
2351 Expr::Binary { lhs, op, rhs } => Expr::Binary {
2352 lhs: Box::new(rewrite_expr(lhs, group_exprs, aggs)),
2353 op: *op,
2354 rhs: Box::new(rewrite_expr(rhs, group_exprs, aggs)),
2355 },
2356 Expr::Unary { op, expr } => Expr::Unary {
2357 op: *op,
2358 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2359 },
2360 Expr::Cast { expr, target } => Expr::Cast {
2361 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2362 target: *target,
2363 },
2364 Expr::IsNull { expr, negated } => Expr::IsNull {
2365 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2366 negated: *negated,
2367 },
2368 Expr::FunctionCall { name, args } => Expr::FunctionCall {
2369 name: name.clone(),
2370 args: args
2371 .iter()
2372 .map(|a| rewrite_expr(a, group_exprs, aggs))
2373 .collect(),
2374 },
2375 Expr::Like {
2376 expr,
2377 pattern,
2378 negated,
2379 case_insensitive,
2380 } => Expr::Like {
2381 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2382 pattern: Box::new(rewrite_expr(pattern, group_exprs, aggs)),
2383 negated: *negated,
2384 case_insensitive: *case_insensitive,
2385 },
2386 Expr::Extract { field, source } => Expr::Extract {
2387 field: *field,
2388 source: Box::new(rewrite_expr(source, group_exprs, aggs)),
2389 },
2390 // v7.25.2 (round-19 A) — subquery nodes: rewrite group-key
2391 // references INSIDE the body to `__grp_N` so the correlated
2392 // resolver can substitute them against the synthesised group
2393 // row (aggs are NOT matched inside the body — a COUNT in the
2394 // subquery is the subquery's own aggregate).
2395 Expr::ScalarSubquery(s) => {
2396 Expr::ScalarSubquery(Box::new(rewrite_group_keys_in_select(s, group_exprs)))
2397 }
2398 Expr::Exists { subquery, negated } => Expr::Exists {
2399 subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
2400 negated: *negated,
2401 },
2402 Expr::InSubquery {
2403 expr,
2404 subquery,
2405 negated,
2406 } => Expr::InSubquery {
2407 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2408 subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
2409 negated: *negated,
2410 },
2411 // v4.12 window / Literal / Column — clone-pass (these don't
2412 // participate in aggregate rewrite).
2413 Expr::WindowFunction { .. } | Expr::Literal(_) | Expr::Placeholder(_) | Expr::Column(_) => {
2414 e.clone()
2415 }
2416 // v7.10.10 — recurse children for array nodes.
2417 Expr::Array(items) => Expr::Array(
2418 items
2419 .iter()
2420 .map(|elem| rewrite_expr(elem, group_exprs, aggs))
2421 .collect(),
2422 ),
2423 Expr::ArraySubscript { target, index } => Expr::ArraySubscript {
2424 target: Box::new(rewrite_expr(target, group_exprs, aggs)),
2425 index: Box::new(rewrite_expr(index, group_exprs, aggs)),
2426 },
2427 Expr::AnyAll {
2428 expr,
2429 op,
2430 array,
2431 is_any,
2432 } => Expr::AnyAll {
2433 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2434 op: *op,
2435 array: Box::new(rewrite_expr(array, group_exprs, aggs)),
2436 is_any: *is_any,
2437 },
2438 Expr::InList {
2439 expr,
2440 list,
2441 negated,
2442 } => Expr::InList {
2443 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2444 list: list
2445 .iter()
2446 .map(|item| rewrite_expr(item, group_exprs, aggs))
2447 .collect(),
2448 negated: *negated,
2449 },
2450 Expr::Case {
2451 operand,
2452 branches,
2453 else_branch,
2454 } => Expr::Case {
2455 operand: operand
2456 .as_deref()
2457 .map(|o| Box::new(rewrite_expr(o, group_exprs, aggs))),
2458 branches: branches
2459 .iter()
2460 .map(|(w, t)| {
2461 (
2462 rewrite_expr(w, group_exprs, aggs),
2463 rewrite_expr(t, group_exprs, aggs),
2464 )
2465 })
2466 .collect(),
2467 else_branch: else_branch
2468 .as_deref()
2469 .map(|e| Box::new(rewrite_expr(e, group_exprs, aggs))),
2470 },
2471 }
2472}
2473
2474/// v7.25.2 (round-19 A) — rewrite group-key references inside a
2475/// subquery body to `__grp_N` synthetic columns (aggregates are
2476/// not touched: empty spec list). Runs through the canonical
2477/// Select walker so every expression slot is covered.
2478fn rewrite_group_keys_in_select(
2479 s: &spg_sql::ast::SelectStatement,
2480 group_exprs: &[Expr],
2481) -> spg_sql::ast::SelectStatement {
2482 let mut out = s.clone();
2483 let _ = crate::walk_select_exprs_mut(&mut out, &mut |e| {
2484 *e = rewrite_expr(e, group_exprs, &[]);
2485 Ok(())
2486 });
2487 out
2488}
2489
2490/// Canonical string key for a tuple of group values. Used as map key.
2491/// Per-value group-key encoding (shared by owned and borrowed paths).
2492fn encode_one(out: &mut String, v: &Value) {
2493 match v {
2494 Value::Null => out.push_str("N|"),
2495 Value::SmallInt(n) => {
2496 out.push('s');
2497 out.push_str(&n.to_string());
2498 out.push('|');
2499 }
2500 Value::Int(n) => {
2501 out.push('I');
2502 out.push_str(&n.to_string());
2503 out.push('|');
2504 }
2505 Value::BigInt(n) => {
2506 out.push('B');
2507 out.push_str(&n.to_string());
2508 out.push('|');
2509 }
2510 Value::Float(x) => {
2511 out.push('F');
2512 out.push_str(&x.to_string());
2513 out.push('|');
2514 }
2515 Value::Bool(b) => {
2516 out.push(if *b { 'T' } else { 'f' });
2517 out.push('|');
2518 }
2519 Value::Text(s) => {
2520 out.push('S');
2521 out.push_str(s);
2522 out.push('|');
2523 }
2524 Value::Vector(v) => {
2525 out.push('V');
2526 for x in v {
2527 out.push_str(&x.to_string());
2528 out.push(',');
2529 }
2530 out.push('|');
2531 }
2532 // v6.0.1: GROUP BY on a `VECTOR(N) USING SQ8` column.
2533 // Two cells with byte-identical `(min, max, bytes)`
2534 // share the same group; equivalence is byte-equality
2535 // (same as f32 grouping today — neither path tries to
2536 // normalise nan/-0).
2537 Value::Sq8Vector(q) => {
2538 out.push('Q');
2539 out.push_str(&q.min.to_string());
2540 out.push('@');
2541 out.push_str(&q.max.to_string());
2542 out.push(':');
2543 for b in &q.bytes {
2544 out.push_str(&b.to_string());
2545 out.push(',');
2546 }
2547 out.push('|');
2548 }
2549 // v6.0.3: GROUP BY on a `VECTOR(N) USING HALF` column.
2550 // Byte-equality over the raw u16 bits; matches the SQ8
2551 // path's byte-key model.
2552 Value::HalfVector(h) => {
2553 out.push('H');
2554 for b in &h.bytes {
2555 out.push_str(&b.to_string());
2556 out.push(',');
2557 }
2558 out.push('|');
2559 }
2560 Value::Numeric { scaled, scale } => {
2561 out.push('D');
2562 out.push_str(&scaled.to_string());
2563 out.push('@');
2564 out.push_str(&scale.to_string());
2565 out.push('|');
2566 }
2567 Value::Date(d) => {
2568 out.push('d');
2569 out.push_str(&d.to_string());
2570 out.push('|');
2571 }
2572 Value::Timestamp(t) => {
2573 out.push('t');
2574 out.push_str(&t.to_string());
2575 out.push('|');
2576 }
2577 Value::Interval { months, micros } => {
2578 out.push('i');
2579 out.push_str(&months.to_string());
2580 out.push('m');
2581 out.push_str(µs.to_string());
2582 out.push('|');
2583 }
2584 Value::Json(s) => {
2585 out.push('j');
2586 out.push_str(s);
2587 out.push('|');
2588 }
2589 // v7.5.0 — Value is #[non_exhaustive] for downstream
2590 // forward-compat. Any future variant lacking explicit
2591 // handling here will share a debug-derived group key,
2592 // which is observably wrong but won't crash.
2593 _ => {
2594 out.push('?');
2595 out.push_str(&format!("{v:?}"));
2596 out.push('|');
2597 }
2598 }
2599}
2600
2601/// v7.30 (perf campaign) - encode from borrowed cells without
2602/// materialising an owned Vec<Value> first.
2603pub(crate) fn encode_key_refs(vals: &[&Value]) -> String {
2604 let mut out = String::new();
2605 for v in vals {
2606 encode_one(&mut out, v);
2607 }
2608 out
2609}
2610
2611/// v7.31 (perf 3e) — encode into a caller-owned scratch buffer.
2612/// The per-row key paths (group hash, DISTINCT set, join build/
2613/// probe) ran 24k+ String allocations per query through the
2614/// allocator just to LOOK UP a map; the scratch form allocates
2615/// only when a map actually has to take ownership (vacant insert).
2616pub(crate) fn encode_key_refs_into(vals: &[&Value], out: &mut String) {
2617 out.clear();
2618 for v in vals {
2619 encode_one(out, v);
2620 }
2621}
2622
2623pub(crate) fn encode_key(vals: &[Value]) -> String {
2624 let mut out = String::new();
2625 for v in vals {
2626 encode_one(&mut out, v);
2627 }
2628 out
2629}
2630
2631#[allow(clippy::cast_precision_loss)]
2632fn value_cmp(a: &Value, b: &Value) -> core::cmp::Ordering {
2633 use core::cmp::Ordering::Equal;
2634 match (a, b) {
2635 (Value::Null, Value::Null) => Equal,
2636 (Value::Null, _) => core::cmp::Ordering::Greater, // NULLs last
2637 (_, Value::Null) => core::cmp::Ordering::Less,
2638 (Value::Int(x), Value::Int(y)) => x.cmp(y),
2639 (Value::BigInt(x), Value::BigInt(y)) => x.cmp(y),
2640 (Value::Int(x), Value::BigInt(y)) => i64::from(*x).cmp(y),
2641 (Value::BigInt(x), Value::Int(y)) => x.cmp(&i64::from(*y)),
2642 (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(Equal),
2643 (Value::Int(x), Value::Float(y)) => f64::from(*x).partial_cmp(y).unwrap_or(Equal),
2644 (Value::Float(x), Value::Int(y)) => x.partial_cmp(&f64::from(*y)).unwrap_or(Equal),
2645 (Value::BigInt(x), Value::Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Equal),
2646 (Value::Float(x), Value::BigInt(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Equal),
2647 (Value::Text(x), Value::Text(y)) => x.cmp(y),
2648 (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
2649 _ => Equal,
2650 }
2651}