spg_engine/aggregate.rs
1//! Aggregate executor.
2//!
3//! Handles `SELECT … <aggs> … [GROUP BY …]` queries. The planning strategy
4//! is straightforward:
5//!
//! 1. Walk the SELECT, ORDER BY and HAVING expressions to find every
//!    aggregate function call. Dedupe by AST equality and assign each one
//!    a synthetic column name `__agg_<i>`.
8//! 2. Same for every `GROUP BY` expression: assign `__grp_<j>`.
9//! 3. Stream the WHERE-filtered rows, group by the tuple of GROUP BY
10//! values, and update per-group aggregate state.
11//! 4. Materialise a synthetic per-group row containing
12//! `[__grp_0..__grp_K, __agg_0..__agg_N]` and rewrite the user's
13//! SELECT / ORDER BY expressions to reference those synthetic columns
14//! instead of the originals.
15//! 5. Evaluate the rewritten expressions against the synthetic schema and
16//! emit results.
17//!
//! v1.8 implemented `count(*)`, `count(expr)`, `sum`, `min`, `max`, `avg`;
//! later releases extend the set (`is_aggregate_name` holds the full list
//! of recognised aggregate names).
//! NULL semantics follow PG: aggregates skip NULL inputs (except
//! `count(*)`, which counts rows). `sum(int)` widens to `BigInt`;
//! `avg(int|bigint)` returns `Float`.
22
23use alloc::borrow::Cow;
24use alloc::boxed::Box;
25use alloc::collections::BTreeSet;
26use alloc::format;
27use alloc::string::{String, ToString};
28use alloc::vec::Vec;
29
30use spg_sql::ast::{Expr, SelectItem, SelectStatement};
31use spg_storage::{ColumnSchema, DataType, Row, Value};
32
33use crate::eval::{self, EvalContext, EvalError};
34use crate::join::RowRef;
35
36/// True if this statement should go through the aggregate path.
37pub fn uses_aggregate(stmt: &SelectStatement) -> bool {
38 if stmt.group_by.is_some() || stmt.having.is_some() {
39 return true;
40 }
41 for item in &stmt.items {
42 if let SelectItem::Expr { expr, .. } = item
43 && contains_aggregate(expr)
44 {
45 return true;
46 }
47 }
48 for o in &stmt.order_by {
49 if contains_aggregate(&o.expr) {
50 return true;
51 }
52 }
53 if let Some(h) = &stmt.having
54 && contains_aggregate(h)
55 {
56 return true;
57 }
58 false
59}
60
61pub fn contains_aggregate(e: &Expr) -> bool {
62 match e {
63 Expr::FunctionCall { name, args } => {
64 is_aggregate_name(name) || args.iter().any(contains_aggregate)
65 }
66 Expr::AggregateOrdered { .. } => true,
67 Expr::Binary { lhs, rhs, .. } => contains_aggregate(lhs) || contains_aggregate(rhs),
68 Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
69 contains_aggregate(expr)
70 }
71 Expr::Like { expr, pattern, .. } => contains_aggregate(expr) || contains_aggregate(pattern),
72 Expr::Extract { source, .. } => contains_aggregate(source),
73 // v4.10 subqueries + v4.12 window functions / Literal /
74 // Column — all non-aggregate leaves from the regular
75 // aggregate planner's POV. Window-bearing projections are
76 // routed to exec_select_with_window before this runs.
77 Expr::ScalarSubquery(_)
78 | Expr::Exists { .. }
79 | Expr::InSubquery { .. }
80 | Expr::WindowFunction { .. }
81 | Expr::Literal(_)
82 | Expr::Placeholder(_)
83 | Expr::Column(_) => false,
84 // v7.10.10 — recurse into array constructor / subscript /
85 // ANY/ALL children. Aggregates inside `ARRAY[SUM(x)]` are
86 // valid PG and must be detected here.
87 Expr::Array(items) => items.iter().any(contains_aggregate),
88 Expr::ArraySubscript { target, index } => {
89 contains_aggregate(target) || contains_aggregate(index)
90 }
91 Expr::AnyAll { expr, array, .. } => contains_aggregate(expr) || contains_aggregate(array),
92 Expr::InList { expr, list, .. } => {
93 contains_aggregate(expr) || list.iter().any(contains_aggregate)
94 }
95 // v7.13.0 — CASE WHEN … END. Recurse into operand,
96 // every (WHEN, THEN) pair, and the ELSE branch.
97 Expr::Case {
98 operand,
99 branches,
100 else_branch,
101 } => {
102 operand.as_deref().is_some_and(contains_aggregate)
103 || branches
104 .iter()
105 .any(|(w, t)| contains_aggregate(w) || contains_aggregate(t))
106 || else_branch.as_deref().is_some_and(contains_aggregate)
107 }
108 }
109}
110
/// Returns `true` when `name` is a recognised aggregate function name.
///
/// The comparison is ASCII-case-insensitive. It uses `eq_ignore_ascii_case`
/// against a constant table instead of lowercasing `name`, so no `String` is
/// allocated per call — the same approach `is_ordered_set_name` and
/// `is_hypothetical_set_name` use.
pub fn is_aggregate_name(name: &str) -> bool {
    const AGGREGATE_NAMES: &[&str] = &[
        "count",
        "count_star",
        "sum",
        "min",
        "max",
        "avg",
        // v7.17.0 — variadic / collection aggregates. ORM reports
        // (Hibernate / Rails / Django) emit these in GROUP BY rollups;
        // pre-7.17 SPG hit "unknown aggregate".
        "string_agg",
        "array_agg",
        // v7.17.0 — boolean aggregates. `every` is the SQL-standard alias
        // for `bool_and`.
        "bool_and",
        "bool_or",
        "every",
        // v7.32 (round-29) — statistical aggregates (every BI / dashboard
        // emits these in rollups).
        "stddev",
        "stddev_samp",
        "stddev_pop",
        "variance",
        "var_samp",
        "var_pop",
        // v7.32 (round-29) — bitwise aggregates.
        "bit_and",
        "bit_or",
        "bit_xor",
        // v7.32 (round-29) — ordered-set aggregates (used with
        // `WITHIN GROUP (ORDER BY …)`).
        "percentile_cont",
        "percentile_disc",
        "mode",
        // v7.32 (round-29) — hypothetical-set aggregates (also
        // `WITHIN GROUP`): the rank the direct args WOULD have.
        "rank",
        "dense_rank",
        "percent_rank",
        "cume_dist",
        // v7.32 (round-29) — two-argument regression family.
        "covar_pop",
        "covar_samp",
        "corr",
        "regr_count",
        "regr_avgx",
        "regr_avgy",
        "regr_slope",
        "regr_intercept",
        "regr_r2",
        "regr_sxx",
        "regr_syy",
        "regr_sxy",
        // v7.32 (round-29) — JSON aggregates.
        "json_agg",
        "jsonb_agg",
        "json_object_agg",
        "jsonb_object_agg",
    ];
    AGGREGATE_NAMES
        .iter()
        .any(|known| name.eq_ignore_ascii_case(known))
}
151
/// v7.32 (round-29) — recognises the two-argument regression aggregates
/// `f(Y, X)`.
///
/// The comparison is exact (case-sensitive): `name` must match one of the
/// lower-case spellings below byte for byte.
fn is_regression_name(name: &str) -> bool {
    const REGRESSION_NAMES: [&str; 12] = [
        "covar_pop",
        "covar_samp",
        "corr",
        "regr_count",
        "regr_avgx",
        "regr_avgy",
        "regr_slope",
        "regr_intercept",
        "regr_r2",
        "regr_sxx",
        "regr_syy",
        "regr_sxy",
    ];
    REGRESSION_NAMES.iter().any(|known| *known == name)
}
170
171/// v7.32 (round-29) — aggregates that consume a second positional
172/// argument: `string_agg(v, sep)`, the regression family `f(Y, X)`, and
173/// `json_object_agg(key, value)`.
174fn agg_uses_second_arg(name: &str) -> bool {
175 name == "string_agg"
176 || name == "json_object_agg"
177 || name == "jsonb_object_agg"
178 || is_regression_name(name)
179}
180
/// v7.32 (round-29) — recognises the ordered-set aggregates
/// (`percentile_cont`, `percentile_disc`, `mode`), ASCII-case-insensitively.
///
/// For these the value to aggregate comes from the
/// `WITHIN GROUP (ORDER BY …)` sort spec, and any in-parens arguments are
/// *direct* arguments (the percentile fraction). `mode()` takes no direct
/// argument.
pub fn is_ordered_set_name(name: &str) -> bool {
    // v7.32 — compared with `eq_ignore_ascii_case` rather than via
    // `to_ascii_lowercase()`: these classifiers run in the aggregate
    // row/group loop, where the old per-call `String` allocation showed up
    // as ~16% of the inbox's aggregate path in a sampled profile (the names
    // are constant).
    name.eq_ignore_ascii_case("percentile_cont")
        || name.eq_ignore_ascii_case("percentile_disc")
        || name.eq_ignore_ascii_case("mode")
}
194
/// v7.32 (round-29) — recognises the hypothetical-set aggregates (`rank`,
/// `dense_rank`, `percent_rank`, `cume_dist`), ASCII-case-insensitively.
///
/// `rank(args) WITHIN GROUP (ORDER BY …)` and friends compute the rank the
/// hypothetical row would have. Like the ordered-set aggregates, the value
/// stream comes from the sort spec and the in-parens args are direct (the
/// hypothetical row).
pub fn is_hypothetical_set_name(name: &str) -> bool {
    const HYPOTHETICAL_SET: [&str; 4] = ["rank", "dense_rank", "percent_rank", "cume_dist"];
    for candidate in HYPOTHETICAL_SET.iter() {
        if name.eq_ignore_ascii_case(candidate) {
            return true;
        }
    }
    false
}
204
205/// v7.32 (round-29) — every aggregate that takes its value stream from
206/// a `WITHIN GROUP (ORDER BY …)` clause (ordered-set + hypothetical-set).
207pub fn is_within_group_name(name: &str) -> bool {
208 is_ordered_set_name(name) || is_hypothetical_set_name(name)
209}
210
/// Per-aggregate running state.
///
/// One value is created per aggregate spec for every group, starting from
/// `AggState::default()`. Which fields a given aggregate actually uses
/// depends on the accumulation code, which lives outside this definition.
#[derive(Debug, Default, Clone)]
struct AggState {
    /// Number of inputs counted so far: every row for `count(*)`, only the
    /// non-NULL inputs for `count(col)`.
    count: i64,
    /// Running integer sum — presumably used by `sum` / `avg` over integer
    /// inputs; confirm against the accumulation step.
    sum_int: i64,
    /// Running floating-point sum (also the running sum for the variance /
    /// stddev family, per the `sum_sq` note below).
    sum_float: f64,
    /// Presumably the running `min` / `max` candidate (`None` until a
    /// non-NULL input is seen) — TODO confirm against the accumulation step.
    extreme: Option<Value>,
    /// Presumably set once a float input has been folded into the sum, so
    /// finalisation knows which accumulator to report — TODO confirm.
    use_float: bool,
    /// v7.17.0 — running collection for string_agg / array_agg.
    /// Each entry is one row's contribution (NULL preserved as
    /// `Value::Null`; string_agg's finalize step drops them, but
    /// array_agg keeps them). Pushing in insertion order matches
    /// PG behaviour when no `ORDER BY` is given inside the
    /// aggregate call.
    items: Vec<Value>,
    /// v7.25 (round-17) — per-group dedupe set for DISTINCT
    /// aggregates (encoded values; NULLs never reach it because
    /// the caller's skip runs after the per-aggregate NULL rules).
    seen: BTreeSet<String>,
    /// v7.24 (round-16 A) — per-item ORDER BY key tuples, parallel
    /// to `items` (pushed under the same skip/keep conditions).
    /// Empty when the aggregate carries no internal ordering.
    item_keys: Vec<Vec<Value>>,
    /// v7.17.0 — captured separator for string_agg. PG accepts a
    /// non-constant separator expression but in practice every
    /// caller passes a literal; the engine snapshots the last
    /// non-NULL text it sees, which matches PG's "use the latest
    /// row's value" behaviour.
    separator: Option<String>,
    /// v7.17.0 — running boolean accumulator for bool_and /
    /// bool_or / every. `None` until the first non-NULL input;
    /// at finalize None → SQL NULL.
    bool_acc: Option<bool>,
    /// v7.32 (round-29) — sum of squares for the variance / stddev
    /// family (`sum_float` carries the running sum; `count` the n).
    sum_sq: f64,
    /// v7.32 (round-29) — running accumulator for bit_and / bit_or /
    /// bit_xor. `None` until the first non-NULL input → SQL NULL.
    bit_acc: Option<i64>,
    /// v7.32 (round-29) — two-argument regression family
    /// (`covar_*` / `corr` / `regr_*`), PG arg order `f(Y, X)`. Only
    /// rows where BOTH inputs are non-NULL contribute.
    /// NOTE(review): the earlier wording said "`count` is the paired n";
    /// `reg_n` looks like that paired count, kept separate from the
    /// single-argument `count` / `sum_*` — confirm in the accumulation step.
    reg_n: i64,
    /// Regression running sums; by their names Σx, Σy, Σx², Σy², Σxy over
    /// the paired rows — TODO confirm against the accumulation step.
    reg_sx: f64,
    reg_sy: f64,
    reg_sxx: f64,
    reg_syy: f64,
    reg_sxy: f64,
    /// v7.32 (round-29) — second value stream for `json_object_agg`
    /// (`items` holds the keys, `aux_items` the values).
    aux_items: Vec<Value>,
    /// v7.33 (array_agg argmax) — for a `first_ordered` spec
    /// (`(array_agg(x ORDER BY y))[1]`), the running first-by-order
    /// (sort-key tuple, value). Replaced only when a new row's key sorts
    /// strictly before the current best (ties keep the earliest row, =
    /// the stable-sort `[1]`). No items/item_keys array is built.
    first_best: Option<(Vec<Value>, Value)>,
}
270
/// Description of one distinct aggregate call found in the statement: its
/// name, argument expressions and modifiers (DISTINCT, internal ORDER BY,
/// FILTER, direct argument).
#[derive(Debug, Clone)]
struct AggSpec {
    name: String, // lowercased
    /// First argument (value expression) for every aggregate
    /// except `count(*)`. `None` for `count_star`.
    arg: Option<Expr>,
    /// v7.17.0 — second positional argument. Consumed by the aggregates
    /// `agg_uses_second_arg` lists: `string_agg(value, sep)`,
    /// `json_object_agg` / `jsonb_object_agg(key, value)` and the
    /// regression family `f(Y, X)`. `None` for single-argument aggregates
    /// such as `array_agg`. Carried in the spec so per-row evaluation can
    /// re-use the same expression across calls.
    arg2: Option<Expr>,
    /// v7.25 (round-17) — `COUNT(DISTINCT x)` & friends: dedupe
    /// the input stream per group before accumulation.
    distinct: bool,
    /// v7.24 (round-16 A) — aggregate-internal ORDER BY keys
    /// (`array_agg(x ORDER BY y DESC NULLS LAST)`). Empty for the
    /// plain form. Only the collection aggregates honour it;
    /// other aggregates are order-insensitive and ignore it (PG
    /// accepts the syntax everywhere too).
    /// `validate_within_group` also reads it as the sort spec of a
    /// WITHIN GROUP aggregate, which must carry exactly one key.
    order_by: Vec<spg_sql::ast::OrderBy>,
    /// v7.32 (round-29) — `FILTER (WHERE cond)`: a per-row predicate
    /// evaluated against the source row before accumulation. A row
    /// whose `cond` is not TRUE (false or NULL) is excluded from this
    /// aggregate only. `None` for the unfiltered form.
    filter: Option<Expr>,
    /// v7.32 (round-29) — WITHIN GROUP aggregates only: the *direct*
    /// argument (the percentile fraction for `percentile_cont/disc`, the
    /// hypothetical value for `rank` & friends). PG requires it constant,
    /// so it is evaluated once. `validate_within_group` rejects any
    /// WITHIN GROUP aggregate other than `mode()` that lacks it. `None`
    /// for `mode()` and for every aggregate outside that family.
    direct_arg: Option<Expr>,
    /// v7.33 (array_agg argmax) — set when this spec came from
    /// `(array_agg(x ORDER BY y))[1]`: accumulate only the first-by-order
    /// element (a running argmax/argmin) and finalise to that scalar
    /// value, instead of collecting + sorting + materialising the whole
    /// per-group array just to take element 1. Returns the element type,
    /// not the array type.
    first_ordered: bool,
}
310
/// Output of running the aggregate path. Schema describes one row per
/// group. When the statement has an ORDER BY, `run` has already sorted
/// `rows` (and `synth_rows` alongside them); applying LIMIT / OFFSET is
/// left to the caller.
#[derive(Debug)]
pub struct AggResult {
    /// Output schema, one entry per projected column.
    pub columns: Vec<ColumnSchema>,
    /// Projected output rows, one per surviving group.
    pub rows: Vec<Row>,
    /// v7.31 (perf — PG lesson #1, post-LIMIT subquery projection):
    /// select-list items whose rewritten expr carries a subquery and
    /// is referenced by neither ORDER BY nor HAVING. Their output
    /// cells hold NULL placeholders; the caller truncates to
    /// LIMIT+OFFSET first and only then evaluates these for the
    /// surviving rows (PG runs the same shape with SubPlan loops=50
    /// instead of loops=24000). `(output_col, rewritten_expr)`.
    pub deferred: Vec<(usize, Expr)>,
    /// Synthetic group rows aligned 1:1 with `rows`; populated only
    /// when `deferred` is non-empty.
    pub synth_rows: Vec<Row>,
    /// Schema the deferred exprs evaluate against; like `synth_rows`, left
    /// empty when `deferred` is empty.
    pub synth_schema: Vec<ColumnSchema>,
}
331
332/// Execute aggregate logic against an already-WHERE-filtered iterator of
333/// rows. `table_alias` is the alias accepted by column resolution.
334#[allow(clippy::too_many_lines)]
335/// v7.25.2 (round-19 A) — caller-injected evaluator for synth-row
336/// expressions that still carry subquery nodes after the rewrite
337/// (correlated subqueries in the select list / HAVING / aggregate
338/// ORDER BY of a GROUP BY query). The engine passes its
339/// correlated-aware evaluator; pure-library callers pass None and
340/// surviving subqueries keep erroring loudly.
341pub type CorrelatedEval<'a> = &'a dyn Fn(&Expr, &Row, &EvalContext<'_>) -> Result<Value, EvalError>;
342
/// Output of the per-group projection stage (`project_groups`): the
/// output schema, the projected rows, the synth rows kept alongside
/// them for post-LIMIT deferred evaluation, the deferred subquery
/// items, and the rewritten ORDER BY exprs (shared with the sort).
struct Projection {
    /// Output schema of the projected rows.
    columns: Vec<ColumnSchema>,
    /// Projected rows; `run` sorts them together with `kept_synth`.
    out_rows: Vec<Row>,
    /// Synthetic group rows kept alongside `out_rows`.
    kept_synth: Vec<Row>,
    /// Deferred subquery items as `(output_col, rewritten_expr)` pairs;
    /// `run` copies them into `AggResult::deferred` unchanged.
    deferred: Vec<(usize, Expr)>,
    /// ORDER BY expressions rewritten against the synthetic schema; `run`
    /// hands them to the sort.
    order_rewritten: Vec<Expr>,
}
354
355/// v7.35.0 — detect the `SELECT COUNT(*) FROM … [WHERE …]` shape
356/// (single item, no GROUP BY / HAVING / ORDER BY / DISTINCT /
357/// LIMIT WITH TIES / FILTER / window). For this shape the answer
358/// is exactly `rows.len()` as `BigInt`, no group state needed.
359/// Returns `None` for any deviation so the caller's full pipeline
360/// runs verbatim.
361///
362/// v7.35.2 — also short-circuit `COUNT(<literal>)` (e.g.
363/// `COUNT(1)`) and `COUNT(<column>)` when the column is declared
364/// NOT NULL on the input schema. PG handles both cases as
365/// `COUNT(*)` (the non-null filter is a no-op), so doing the same
366/// here keeps every `count this thing` shape on the same fast path
367/// instead of routing the literal / non-null-col variants through
368/// the four-stage aggregate pipeline.
369fn try_pure_count_star_short_circuit(
370 stmt: &SelectStatement,
371 rows: &[RowRef<'_>],
372 schema_cols: &[ColumnSchema],
373 table_alias: Option<&str>,
374) -> Option<AggResult> {
375 if stmt.distinct
376 || stmt.limit_with_ties
377 || stmt.group_by.is_some()
378 || stmt.having.is_some()
379 || !stmt.order_by.is_empty()
380 {
381 return None;
382 }
383 if stmt.items.len() != 1 {
384 return None;
385 }
386 let SelectItem::Expr { expr, alias } = &stmt.items[0] else {
387 return None;
388 };
389 let Expr::FunctionCall { name, args } = expr else {
390 return None;
391 };
392 if !name.eq_ignore_ascii_case("count") && !name.eq_ignore_ascii_case("count_star") {
393 return None;
394 }
395 let count_star_shape = match args.as_slice() {
396 // `COUNT(*)` parses to `count_star` with no args.
397 [] if name.eq_ignore_ascii_case("count_star") => true,
398 // `COUNT(<literal>)` — the per-row test is "is this literal
399 // non-null?" which is constant, so it's COUNT(*) when the
400 // literal is non-null.
401 [Expr::Literal(lit)] => !matches!(lit, spg_sql::ast::Literal::Null),
402 // `COUNT(<column>)` — same answer as COUNT(*) when the
403 // column is statically declared NOT NULL on the input
404 // schema. Resolve through the alias if one is set.
405 [Expr::Column(c)] => {
406 if let Some(q) = c.qualifier.as_deref()
407 && let Some(alias) = table_alias
408 && !q.eq_ignore_ascii_case(alias)
409 {
410 return None;
411 }
412 schema_cols
413 .iter()
414 .find(|s| s.name.eq_ignore_ascii_case(&c.name))
415 .is_some_and(|s| !s.nullable)
416 }
417 _ => return None,
418 };
419 if !count_star_shape {
420 return None;
421 }
422 let col_name = alias.clone().unwrap_or_else(|| "count".to_string());
423 let count = i64::try_from(rows.len()).unwrap_or(i64::MAX);
424 Some(AggResult {
425 columns: alloc::vec![ColumnSchema::new(col_name, DataType::BigInt, false)],
426 rows: alloc::vec![Row::new(alloc::vec![Value::BigInt(count)])],
427 deferred: Vec::new(),
428 synth_rows: Vec::new(),
429 synth_schema: Vec::new(),
430 })
431}
432
433pub(crate) fn run(
434 stmt: &SelectStatement,
435 rows: &[RowRef<'_>],
436 schema_cols: &[ColumnSchema],
437 table_alias: Option<&str>,
438 correlated_eval: Option<CorrelatedEval<'_>>,
439) -> Result<AggResult, EvalError> {
440 // v7.35.0 — pure `SELECT COUNT(*) FROM … WHERE …` short-circuit.
441 // The caller already filtered rows by WHERE (we run on the
442 // post-WHERE survivor set), so for the canonical pure-COUNT(*)
443 // shape (no GROUP BY / HAVING / ORDER BY / DISTINCT / FILTER /
444 // window) the answer is simply `rows.len()`. The four-stage
445 // aggregate pipeline below (accumulate_groups → build_synth_schema
446 // → finalize_synth_rows → project_groups) collapses to a single
447 // BigInt cell when there's a single group, but each stage still
448 // pays its own allocation tax — group state map, synth schema
449 // vec, finalize loop. `exists_in_60` (mailrs prod #4 baseline)
450 // is exactly this shape on a 25 k-row JOIN.
451 if let Some(short) = try_pure_count_star_short_circuit(stmt, rows, schema_cols, table_alias) {
452 return Ok(short);
453 }
454 let group_exprs: Vec<Expr> = stmt.group_by.clone().unwrap_or_default();
455
456 // Collect aggregate sub-expressions across items + order_by.
457 let mut agg_specs: Vec<AggSpec> = Vec::new();
458 for item in &stmt.items {
459 if let SelectItem::Expr { expr, .. } = item {
460 collect_aggregates(expr, &mut agg_specs);
461 }
462 }
463 for o in &stmt.order_by {
464 collect_aggregates(&o.expr, &mut agg_specs);
465 }
466 if let Some(h) = &stmt.having {
467 collect_aggregates(h, &mut agg_specs);
468 }
469 // v7.17.0 — arity validation. The collector tolerates an
470 // arbitrary positional-arg count; here we enforce the
471 // per-aggregate contract so a malformed call (e.g.
472 // `array_agg()` or `string_agg(x)`) surfaces as a SQL error
473 // rather than silently coercing to a degenerate aggregate.
474 validate_agg_arities(stmt, &agg_specs)?;
475 validate_within_group(&agg_specs)?;
476
477 // (1) Stream the WHERE-filtered rows into insertion-ordered group state.
478 let order = accumulate_groups(
479 rows,
480 &group_exprs,
481 &agg_specs,
482 schema_cols,
483 table_alias,
484 correlated_eval,
485 )?;
486
487 // (2) Build the synthetic per-group schema and finalise each group's row.
488 let synth_schema =
489 build_synth_schema(rows, &group_exprs, &agg_specs, schema_cols, table_alias)?;
490 let synth_rows = finalize_synth_rows(
491 &order,
492 &agg_specs,
493 &synth_schema,
494 rows,
495 schema_cols,
496 table_alias,
497 )?;
498
499 // (3) Rewrite the user's expressions, filter groups by HAVING and project.
500 let Projection {
501 columns,
502 mut out_rows,
503 mut kept_synth,
504 deferred,
505 order_rewritten,
506 } = project_groups(
507 synth_rows,
508 stmt,
509 &group_exprs,
510 &agg_specs,
511 &synth_schema,
512 correlated_eval,
513 )?;
514
515 // (4) ORDER BY on the aggregated output (the caller applies LIMIT).
516 if !stmt.order_by.is_empty() {
517 let (sorted_synth, sorted_out) = sort_synth_by_order_by(
518 &synth_schema,
519 &stmt.order_by,
520 &order_rewritten,
521 kept_synth,
522 out_rows,
523 correlated_eval,
524 )?;
525 kept_synth = sorted_synth;
526 out_rows = sorted_out;
527 }
528
529 let (synth_rows_out, synth_schema_out) = if deferred.is_empty() {
530 (Vec::new(), Vec::new())
531 } else {
532 (kept_synth, synth_schema.clone())
533 };
534 Ok(AggResult {
535 columns,
536 rows: out_rows,
537 deferred,
538 synth_rows: synth_rows_out,
539 synth_schema: synth_schema_out,
540 })
541}
542
543/// v7.32 (round-29) — validate the structural requirements of WITHIN
544/// GROUP (ordered-set / hypothetical-set) aggregates up front, so a
545/// malformed call surfaces as a SQL error rather than a silently
546/// degenerate aggregate.
547fn validate_within_group(agg_specs: &[AggSpec]) -> Result<(), EvalError> {
548 // v7.32 (round-29) — WITHIN GROUP aggregates require the clause (PG
549 // raises a hard error otherwise rather than silently degrading), and
550 // SPG supports the single-sort-key form only.
551 for spec in agg_specs {
552 if is_within_group_name(&spec.name) {
553 if spec.order_by.is_empty() {
554 return Err(EvalError::TypeMismatch {
555 detail: format!("{}() requires WITHIN GROUP (ORDER BY …)", spec.name),
556 });
557 }
558 // mode() is the only WITHIN GROUP aggregate with no direct
559 // argument; the rest carry one (percentile fraction /
560 // hypothetical value).
561 if spec.name != "mode" && spec.direct_arg.is_none() {
562 return Err(EvalError::TypeMismatch {
563 detail: format!("{}() requires a direct argument", spec.name),
564 });
565 }
566 // Multi-key WITHIN GROUP (multiple sort keys / hypothetical
567 // args) is not supported yet — error loudly instead of
568 // silently using only the first key.
569 if spec.order_by.len() > 1 {
570 return Err(EvalError::TypeMismatch {
571 detail: format!(
572 "{}() with multiple WITHIN GROUP sort keys is not supported yet",
573 spec.name
574 ),
575 });
576 }
577 }
578 }
579 Ok(())
580}
581
582/// (1) Stream the WHERE-filtered rows, group by the GROUP BY value
583/// tuple, and update per-group aggregate state. Returns the groups in
584/// insertion order. See `run` for the bind-once fast path rationale.
585#[allow(clippy::too_many_lines, clippy::type_complexity)]
586fn accumulate_groups(
587 rows: &[RowRef<'_>],
588 group_exprs: &[Expr],
589 agg_specs: &[AggSpec],
590 schema_cols: &[ColumnSchema],
591 table_alias: Option<&str>,
592 correlated_eval: Option<CorrelatedEval<'_>>,
593) -> Result<Vec<(Vec<Value>, Vec<AggState>)>, EvalError> {
594 let ctx = EvalContext::new(schema_cols, table_alias);
595 // Map group key (vec of values, encoded as canonical string) -> group state.
596 // v7.32 (architecture v2, P2b) — insertion-ordered group state in
597 // a Vec; the hash map only maps key → index. Removes the parallel
598 // `key_order: Vec<String>` (a second per-group key clone) and the
599 // per-group re-probe `groups[k]` at finalize (24k hash lookups for
600 // the inbox shape). The map owns its key once on vacant insert.
601 let mut order: Vec<(Vec<Value>, Vec<AggState>)> = Vec::new();
602 let mut groups: hashbrown::HashMap<String, usize> = hashbrown::HashMap::new();
603 // When there are no GROUP BY exprs *and* there is at least one aggregate,
604 // every row collapses into a single anonymous group keyed by "".
605 if rows.is_empty() && group_exprs.is_empty() {
606 // Single empty-aggregate group: count=0, sum=0, max=NULL, etc.
607 // No rows follow, so the map is never probed — seed `order` only.
608 let init: Vec<AggState> = (0..agg_specs.len()).map(|_| AggState::default()).collect();
609 order.push((Vec::new(), init));
610 }
611
612 // v7.30 (perf campaign) - hoist the per-row work that doesn't
613 // depend on the row: which group exprs need collation folding
614 // (none, for most queries - the old code cloned the whole
615 // group_vals vec per row just in case).
616 // v7.30 (perf campaign) - the no-tax row loop. When a group
617 // expr or an aggregate argument is a bare column reference
618 // (the overwhelmingly common shape), bind its position ONCE
619 // and read row cells by offset in the loop - no per-row tree
620 // walk, no owned-Value clone out of resolve_column. Anything
621 // more complex keeps the eval path.
622 let col_pos = |e: &Expr| -> Option<usize> {
623 // Qualified references only: the bare-name resolver carries
624 // alias/ambiguity logic the bind-once path must not fork.
625 if let Expr::Column(c) = e
626 && c.qualifier.is_some()
627 {
628 eval::find_column_pos(c, &ctx)
629 } else {
630 None
631 }
632 };
633 let group_pos: Vec<Option<usize>> = group_exprs.iter().map(col_pos).collect();
634 let all_groups_bound = group_pos.iter().all(Option::is_some);
635 let arg_pos: Vec<Option<usize>> = agg_specs
636 .iter()
637 .map(|spec| spec.arg.as_ref().and_then(|e| col_pos(e)))
638 .collect();
639 // v7.36 (perf — mailrs Ask 1 SUM(LENGTH(text_body)) 18ms → ?) —
640 // pre-compile every aggregate arg that's a `fully_compilable`
641 // PURE expression over bound columns. Without this, `LENGTH(col)`
642 // / `COALESCE(col, '')` / `CAST(col AS BIGINT)` etc. ALL fell
643 // through to the `(None, Some(e)) => eval_arg(e, mat, ...)` slow
644 // path that materialises a Cow<Row> per input row — for a 25k-row
645 // JOIN that's 25k full-row clones for one column read. The Step
646 // VM (`eval_compiled_ref`) reads columns by RowRef::get and runs
647 // the same `apply_function` dispatcher with zero materialisation.
648 let arg_compiled: Vec<Option<eval::CompiledExpr>> = agg_specs
649 .iter()
650 .enumerate()
651 .map(|(i, spec)| match (&arg_pos[i], &spec.arg) {
652 (Some(_), _) => None,
653 (None, Some(e)) if eval::fully_compilable(e) => Some(eval::compile_expr(e, &ctx)),
654 _ => None,
655 })
656 .collect();
657 // v7.33 (array_agg perf) — bound positions for each spec's internal
658 // ORDER BY keys, so an ordered aggregate (`array_agg(x ORDER BY y)`)
659 // reads the sort key by reference (RowRef::get) instead of
660 // materialising the whole combined join row per input row just to
661 // eval one bound column. Mirrors arg_pos. On the inbox shape this
662 // turned 24k full-row (~1 KB each) clones into 24k single-cell reads.
663 let order_pos: Vec<Vec<Option<usize>>> = agg_specs
664 .iter()
665 .map(|spec| spec.order_by.iter().map(|o| col_pos(&o.expr)).collect())
666 .collect();
667 // Does any spec need the fully-materialised row in the bound fast
668 // path — a FILTER, a non-bound value arg, a second arg, or a non-bound
669 // ORDER key? When false (every aggregate arg/key is a bound column —
670 // the inbox shape) the bound fast path never materialises a row.
671 let needs_mat = agg_specs.iter().enumerate().any(|(i, s)| {
672 s.filter.is_some()
673 || (s.arg.is_some() && arg_pos[i].is_none() && arg_compiled[i].is_none())
674 || s.arg2.is_some()
675 || order_pos[i].iter().any(Option::is_none)
676 });
677 let ci_positions: Vec<usize> = group_exprs
678 .iter()
679 .enumerate()
680 .filter(|(_, g)| {
681 matches!(
682 eval::column_collation(g, &ctx),
683 Some(spg_storage::Collation::CaseInsensitive)
684 )
685 })
686 .map(|(i, _)| i)
687 .collect();
688 // v7.31 (perf 3e) — per-row scratch buffers. The fast path used
689 // to allocate a key String (and a refs Vec) for EVERY row just
690 // to probe the group map; hits — the overwhelming case — now
691 // touch the allocator zero times.
692 let mut keybuf_s = String::new();
693 // v7.36 — reused Step VM eval stack for compiled aggregate args.
694 let mut eval_stack: Vec<Value> = Vec::new();
695 let mut dkeybuf = String::new();
696 let mut refs: Vec<&Value> = Vec::with_capacity(group_pos.len());
697 // v7.32 (round-31) — an aggregate's argument / FILTER / second arg /
698 // ORDER key may itself be a *correlated* subquery, e.g.
699 // `MAX((SELECT i.v FROM inner i WHERE i.fk = o.id))`. A non-correlated
700 // subquery is pre-resolved to a literal before this loop, but a
701 // correlated one survives as a subquery node and must be evaluated per
702 // outer row through the correlated evaluator — the same hook the
703 // select-list / HAVING / ORDER finalisers already use below. Plain
704 // `eval_expr` would hit "subquery reached row eval".
705 //
706 // The `any_agg_subquery` gate is computed once here so the common case
707 // (no subquery anywhere in the aggregate args — including every hot
708 // scan/group aggregate) short-circuits before the per-row
709 // `expr_has_subquery` walk: `eval_arg` is then exactly `eval_expr`.
710 let any_agg_subquery = correlated_eval.is_some()
711 && agg_specs.iter().any(|s| {
712 s.filter
713 .as_ref()
714 .is_some_and(|e| crate::expr_has_subquery(e))
715 || s.arg.as_ref().is_some_and(|e| crate::expr_has_subquery(e))
716 || s.arg2.as_ref().is_some_and(|e| crate::expr_has_subquery(e))
717 || s.order_by.iter().any(|o| crate::expr_has_subquery(&o.expr))
718 });
719 let eval_arg = |e: &Expr, r: &Row, c: &EvalContext<'_>| -> Result<Value, EvalError> {
720 match correlated_eval {
721 Some(f) if any_agg_subquery && crate::expr_has_subquery(e) => f(e, r, c),
722 _ => eval::eval_expr(e, r, c),
723 }
724 };
725 // v7.36 (perf — mailrs Phase 1, post u64-hash) — single
726 // anonymous group fast path. When the query has no GROUP BY
727 // (`SELECT SUM(LENGTH(col)) FROM ...`, COUNT, AVG, etc.) the
728 // whole input collapses into one group. The fast path below
729 // still pays one `groups.get("")` hash probe per row plus
730 // `entry = &mut order[0]` reindex even when the empty-key
731 // path encodes nothing — measured ~50 ns/row across 25 k rows
732 // = ~1.25 ms of pure bookkeeping on the user_storage_usage
733 // baseline.
734 //
735 // Bypass: lift `entry` outside the loop and feed every row
736 // straight into it. Same `update_state` machinery, zero
737 // per-row hash work, zero per-row index lookup.
738 let single_anon_group = group_exprs.is_empty() && !rows.is_empty();
739 if single_anon_group {
740 // Seed the single group at idx 0 once.
741 let init: Vec<AggState> = (0..agg_specs.len()).map(|_| AggState::default()).collect();
742 order.clear();
743 order.push((Vec::new(), init));
744 }
745 // v7.36 (perf — mailrs Phase 1, count_messages 2.58 → ?) —
746 // `COUNT(*)` short-circuit. For a single-anon-group `COUNT(*)`
747 // with no FILTER / DISTINCT, every survivor counts once — the
748 // answer IS `rows.len()`. Skips the 25 k iterations of
749 // `update_state("count_star", …)` on the mailrs count_messages
750 // shape; the JOIN already produced exactly the set of rows
751 // that must be counted.
752 if single_anon_group
753 && agg_specs.len() == 1
754 && agg_specs[0].name == "count_star"
755 && agg_specs[0].filter.is_none()
756 && agg_specs[0].arg.is_none()
757 && agg_specs[0].arg2.is_none()
758 && agg_specs[0].order_by.is_empty()
759 && !agg_specs[0].distinct
760 {
761 let state = &mut order[0].1[0];
762 state.count = rows.len() as i64;
763 return Ok(order);
764 }
765 // v7.36 (perf — mailrs Phase 1) — `COUNT(<bound col>)` (non-`*`)
766 // collapses to: read the cell, increment when not NULL. Skips
767 // the per-row spec dispatch + `update_state("count", …)`.
768 if single_anon_group
769 && agg_specs.len() == 1
770 && agg_specs[0].name == "count"
771 && agg_specs[0].filter.is_none()
772 && agg_specs[0].arg2.is_none()
773 && agg_specs[0].order_by.is_empty()
774 && !agg_specs[0].distinct
775 && arg_pos[0].is_some()
776 {
777 let p = arg_pos[0].unwrap();
778 let mut count: i64 = 0;
779 for row in rows {
780 if !matches!(row.get(p), Some(Value::Null) | None) {
781 count += 1;
782 }
783 }
784 let state = &mut order[0].1[0];
785 state.count = count;
786 return Ok(order);
787 }
788 // v7.36 (perf — mailrs Phase 1, user_storage_usage 7.5 → ?) —
789 // single-aggregate streaming accumulator. For
790 // `SUM(<compiled-expr>)` / `SUM(<bound col>)` with no GROUP BY,
791 // no FILTER, no arg2, no ORDER BY, no DISTINCT, the whole
792 // per-row work collapses to: eval the arg, match the Value
793 // variant, accumulate. Skips the spec-dispatch loop +
794 // `update_state` per-row name match. On a 25 k-row JOIN
795 // (user_storage_usage `SUM(LENGTH(text_body))`) that's
796 // ~50-100 ns/row of pure spec-dispatch overhead removed.
797 if single_anon_group
798 && agg_specs.len() == 1
799 && agg_specs[0].filter.is_none()
800 && agg_specs[0].arg2.is_none()
801 && agg_specs[0].order_by.is_empty()
802 && !agg_specs[0].distinct
803 && (agg_specs[0].name == "sum" || agg_specs[0].name == "avg")
804 && (arg_pos[0].is_some() || arg_compiled[0].is_some())
805 {
806 let arg_pos0 = arg_pos[0];
807 let arg_c0 = &arg_compiled[0];
808 let mut sum_int: i64 = 0;
809 let mut sum_float: f64 = 0.0;
810 let mut use_float = false;
811 let mut count: i64 = 0;
812 // Borrow-aware fast inner: avoid the per-row clone when arg
813 // is a bound column position.
814 if let Some(p) = arg_pos0 {
815 for row in rows {
816 let v_ref = row.get(p).unwrap_or(&Value::Null);
817 match v_ref {
818 Value::Null => continue,
819 Value::SmallInt(n) => {
820 sum_int += i64::from(*n);
821 count += 1;
822 }
823 Value::Int(n) => {
824 sum_int += i64::from(*n);
825 count += 1;
826 }
827 Value::BigInt(n) => {
828 sum_int += *n;
829 count += 1;
830 }
831 Value::Float(x) => {
832 sum_float += *x;
833 use_float = true;
834 count += 1;
835 }
836 other => {
837 return Err(EvalError::TypeMismatch {
838 detail: format!("sum/avg need numeric, got {:?}", other.data_type()),
839 });
840 }
841 }
842 }
843 } else if let Some(p) = arg_c0.as_ref().and_then(|c| c.as_single_column_length()) {
844 // v7.36 (perf — mailrs Phase 1, user_storage_usage hot
845 // inner) — `SUM(LENGTH(<text col>))` collapses to a
846 // straight scan: read the cell by ref, branch on the
847 // variant, do an ASCII probe + `len()` (or
848 // `chars().count()` on non-ASCII), accumulate. No Step
849 // VM, no stack push/pop, no `BigInt` boxing on the way
850 // out — pure i64 sum. The original Step VM path keeps
851 // running for everything outside this shape (`SUM(col)`,
852 // `SUM(expr)`, multi-step compiled args).
853 for row in rows {
854 let Some(v_ref) = row.get(p) else {
855 continue;
856 };
857 let n = match v_ref {
858 Value::Null => continue,
859 Value::Text(s) => {
860 if s.is_ascii() {
861 s.len() as i64
862 } else {
863 s.chars().count() as i64
864 }
865 }
866 other => {
867 return Err(EvalError::TypeMismatch {
868 detail: format!("length() needs text, got {:?}", other.data_type()),
869 });
870 }
871 };
872 sum_int += n;
873 count += 1;
874 }
875 } else {
876 let c = arg_c0.as_ref().unwrap();
877 for row in rows {
878 let v = eval::eval_compiled_ref(c, row, &ctx, &mut eval_stack)?;
879 match v {
880 Value::Null => continue,
881 Value::SmallInt(n) => {
882 sum_int += i64::from(n);
883 count += 1;
884 }
885 Value::Int(n) => {
886 sum_int += i64::from(n);
887 count += 1;
888 }
889 Value::BigInt(n) => {
890 sum_int += n;
891 count += 1;
892 }
893 Value::Float(x) => {
894 sum_float += x;
895 use_float = true;
896 count += 1;
897 }
898 other => {
899 return Err(EvalError::TypeMismatch {
900 detail: format!("sum/avg need numeric, got {:?}", other.data_type()),
901 });
902 }
903 }
904 }
905 }
906 let state = &mut order[0].1[0];
907 state.count = count;
908 state.sum_int = sum_int;
909 state.sum_float = sum_float;
910 state.use_float = use_float;
911 return Ok(order);
912 }
913 for row in rows {
914 if single_anon_group {
915 let entry = &mut order[0];
916 let mat: Option<Cow<'_, Row>> = if needs_mat { Some(row.as_row()) } else { None };
917 for (i, spec) in agg_specs.iter().enumerate() {
918 if let Some(f) = &spec.filter
919 && !matches!(
920 eval_arg(f, mat.as_deref().expect("needs_mat for FILTER"), &ctx)?,
921 Value::Bool(true)
922 )
923 {
924 continue;
925 }
926 let arg_owned: Value;
927 let arg_ref: &Value = match (&arg_pos[i], &arg_compiled[i], &spec.arg) {
928 (Some(p), _, _) => row.get(*p).unwrap_or(&Value::Null),
929 (None, _, None) => {
930 arg_owned = Value::Bool(true);
931 &arg_owned
932 }
933 (None, Some(c), _) => {
934 arg_owned = eval::eval_compiled_ref(c, row, &ctx, &mut eval_stack)?;
935 &arg_owned
936 }
937 (None, None, Some(e)) => {
938 arg_owned = eval_arg(
939 e,
940 mat.as_deref().expect("needs_mat for non-bound arg"),
941 &ctx,
942 )?;
943 &arg_owned
944 }
945 };
946 let arg2_val = match &spec.arg2 {
947 None => None,
948 Some(e) => Some(eval_arg(
949 e,
950 mat.as_deref().expect("needs_mat for arg2"),
951 &ctx,
952 )?),
953 };
954 let order_keys = if spec.order_by.is_empty() {
955 None
956 } else {
957 let mut keys = Vec::with_capacity(spec.order_by.len());
958 for (k, o) in spec.order_by.iter().enumerate() {
959 let v = if let Some(p) = order_pos[i][k] {
960 row.get(p).cloned().unwrap_or(Value::Null)
961 } else {
962 eval_arg(
963 &o.expr,
964 mat.as_deref().expect("needs_mat for ORDER key"),
965 &ctx,
966 )?
967 };
968 keys.push(v);
969 }
970 Some(keys)
971 };
972 // v7.36 (perf — bugfix v7.36.1 candidate) — first_ordered
973 // was missing from the single_anon_group fast path,
974 // sending `(array_agg(x ORDER BY y))[1]` values into
975 // `update_state(array_agg, …)` whose finalize ignored
976 // the absent `first_best` and returned `[]`. The slow
977 // path below has the same branch — keep them aligned.
978 if spec.first_ordered {
979 if let Some(keys) = order_keys {
980 let st = &mut entry.1[i];
981 let better = match &st.first_best {
982 None => true,
983 Some((bk, _)) => {
984 cmp_order_keys(&spec.order_by, &keys, bk)
985 == core::cmp::Ordering::Less
986 }
987 };
988 if better {
989 st.first_best = Some((keys, arg_ref.clone()));
990 }
991 }
992 continue;
993 }
994 if spec.distinct {
995 encode_key_refs_into(core::slice::from_ref(&arg_ref), &mut dkeybuf);
996 if entry.1[i].seen.contains(dkeybuf.as_str()) {
997 continue;
998 }
999 entry.1[i].seen.insert(dkeybuf.clone());
1000 }
1001 update_state(
1002 &mut entry.1[i],
1003 &spec.name,
1004 arg_ref,
1005 arg2_val.as_ref(),
1006 order_keys,
1007 )?;
1008 }
1009 continue;
1010 }
1011 // Fast key: bound positions + no ci folding -> encode
1012 // straight from borrowed cells; group_vals materialise
1013 // only when the group is NEW.
1014 if all_groups_bound && ci_positions.is_empty() {
1015 refs.clear();
1016 refs.extend(
1017 group_pos
1018 .iter()
1019 .map(|p| row.get(p.unwrap()).unwrap_or(&Value::Null)),
1020 );
1021 encode_key_refs_into(&refs, &mut keybuf_s);
1022 let idx = match groups.get(keybuf_s.as_str()) {
1023 Some(&i) => i,
1024 None => {
1025 let i = order.len();
1026 let init: Vec<AggState> =
1027 (0..agg_specs.len()).map(|_| AggState::default()).collect();
1028 let owned: Vec<Value> = refs.iter().map(|v| (*v).clone()).collect();
1029 order.push((owned, init));
1030 groups.insert(keybuf_s.clone(), i);
1031 i
1032 }
1033 };
1034 let entry = &mut order[idx];
1035 // v7.33 (array_agg perf) — materialise the combined row AT
1036 // MOST once per input row, and only when a spec actually
1037 // needs the eval path (FILTER / non-bound arg / arg2 / non-
1038 // bound ORDER key). Bound args and bound ORDER keys read
1039 // cells by reference below, so the inbox shape (all bound)
1040 // never materialises — killing the per-row ~1 KB clone that
1041 // dominated the ordered-aggregate cost.
1042 let mat: Option<Cow<'_, Row>> = if needs_mat { Some(row.as_row()) } else { None };
1043 for (i, spec) in agg_specs.iter().enumerate() {
1044 // v7.32 (round-29) — FILTER (WHERE cond): exclude rows
1045 // where cond is not TRUE before they reach this
1046 // aggregate's accumulator (and before DISTINCT dedup).
1047 if let Some(f) = &spec.filter
1048 && !matches!(
1049 eval_arg(f, mat.as_deref().expect("needs_mat for FILTER"), &ctx)?,
1050 Value::Bool(true)
1051 )
1052 {
1053 continue;
1054 }
1055 let arg_owned: Value;
1056 let arg_ref: &Value = match (&arg_pos[i], &arg_compiled[i], &spec.arg) {
1057 (Some(p), _, _) => row.get(*p).unwrap_or(&Value::Null),
1058 (None, _, None) => {
1059 arg_owned = Value::Bool(true);
1060 &arg_owned
1061 }
1062 (None, Some(c), _) => {
1063 // v7.36 — compiled-arg fast path. `eval_stack`
1064 // is reused across rows; the Step VM never
1065 // materialises a row for column reads.
1066 arg_owned = eval::eval_compiled_ref(c, row, &ctx, &mut eval_stack)?;
1067 &arg_owned
1068 }
1069 (None, None, Some(e)) => {
1070 arg_owned = eval_arg(
1071 e,
1072 mat.as_deref().expect("needs_mat for non-bound arg"),
1073 &ctx,
1074 )?;
1075 &arg_owned
1076 }
1077 };
1078 let arg2_val = match &spec.arg2 {
1079 None => None,
1080 Some(e) => Some(eval_arg(
1081 e,
1082 mat.as_deref().expect("needs_mat for arg2"),
1083 &ctx,
1084 )?),
1085 };
1086 let order_keys = if spec.order_by.is_empty() {
1087 None
1088 } else {
1089 let mut keys = Vec::with_capacity(spec.order_by.len());
1090 for (k, o) in spec.order_by.iter().enumerate() {
1091 // Bound ORDER key → read the cell by reference; only
1092 // a non-bound key falls to the materialised eval path.
1093 keys.push(match order_pos[i][k] {
1094 Some(p) => row.get(p).cloned().unwrap_or(Value::Null),
1095 None => eval_arg(
1096 &o.expr,
1097 mat.as_deref().expect("needs_mat for non-bound ORDER key"),
1098 &ctx,
1099 )?,
1100 });
1101 }
1102 Some(keys)
1103 };
1104 // v7.33 (array_agg argmax) — first_ordered: keep only the
1105 // running first-by-order element (strict-less replacement
1106 // = ties keep the earliest row, matching the stable-sort
1107 // `[1]`), no array build.
1108 if spec.first_ordered {
1109 if let Some(keys) = order_keys {
1110 let st = &mut entry.1[i];
1111 let better = match &st.first_best {
1112 None => true,
1113 Some((bk, _)) => {
1114 cmp_order_keys(&spec.order_by, &keys, bk)
1115 == core::cmp::Ordering::Less
1116 }
1117 };
1118 if better {
1119 st.first_best = Some((keys, arg_ref.clone()));
1120 }
1121 }
1122 continue;
1123 }
1124 if spec.distinct {
1125 encode_key_refs_into(core::slice::from_ref(&arg_ref), &mut dkeybuf);
1126 if entry.1[i].seen.contains(dkeybuf.as_str()) {
1127 continue;
1128 }
1129 entry.1[i].seen.insert(dkeybuf.clone());
1130 }
1131 update_state(
1132 &mut entry.1[i],
1133 &spec.name,
1134 arg_ref,
1135 arg2_val.as_ref(),
1136 order_keys,
1137 )?;
1138 }
1139 continue;
1140 }
1141 // v7.32 (P4 increment 2) — eval (non-bound) path: present the
1142 // row as a borrowed Row once (Owned → zero-cost borrow; a join
1143 // tuple materialises here exactly once, never on the bound fast
1144 // path above), then the original eval loop runs unchanged.
1145 let row_materialised = row.as_row();
1146 let row: &Row = &row_materialised;
1147 let group_vals: Vec<Value> = group_exprs
1148 .iter()
1149 .map(|g| eval::eval_expr(g, row, &ctx))
1150 .collect::<Result<_, _>>()?;
1151 // v7.17.0 Phase 2.5b — case-insensitive group keying: fold
1152 // only the ci columns, and only when any exist. Display
1153 // value (`group_vals`) stays original — only the key folds.
1154 let key = if ci_positions.is_empty() {
1155 encode_key(&group_vals)
1156 } else {
1157 let mut key_vals = group_vals.clone();
1158 for &i in &ci_positions {
1159 if let Value::Text(s) = &key_vals[i] {
1160 key_vals[i] = Value::Text(s.to_ascii_lowercase());
1161 }
1162 }
1163 encode_key(&key_vals)
1164 };
1165 // Probe by index; the map owns the key once on vacant insert.
1166 let idx = match groups.get(key.as_str()) {
1167 Some(&i) => i,
1168 None => {
1169 let i = order.len();
1170 let init: Vec<AggState> =
1171 (0..agg_specs.len()).map(|_| AggState::default()).collect();
1172 order.push((group_vals.clone(), init));
1173 groups.insert(key, i);
1174 i
1175 }
1176 };
1177 let entry = &mut order[idx];
1178 for (i, spec) in agg_specs.iter().enumerate() {
1179 // v7.32 (round-29) — FILTER (WHERE cond): exclude rows where
1180 // cond is not TRUE before accumulation (and before DISTINCT).
1181 if let Some(f) = &spec.filter
1182 && !matches!(eval_arg(f, row, &ctx)?, Value::Bool(true))
1183 {
1184 continue;
1185 }
1186 let arg_val = match &spec.arg {
1187 None => Value::Bool(true), // count_star: sentinel non-null
1188 Some(e) => eval_arg(e, row, &ctx)?,
1189 };
1190 // v7.17.0 — `string_agg(value, separator)` evaluates the
1191 // separator per row but PG treats it as constant; we
1192 // pass the per-row value into update_state so a future
1193 // varying-separator caller still sees correct output,
1194 // even though SPG (like PG) only uses the most recent.
1195 let arg2_val = match &spec.arg2 {
1196 None => None,
1197 Some(e) => Some(eval_arg(e, row, &ctx)?),
1198 };
1199 // v7.24 (round-16 A) — aggregate-internal ORDER BY:
1200 // evaluate the key tuple against the source row.
1201 let order_keys = if spec.order_by.is_empty() {
1202 None
1203 } else {
1204 let mut keys = Vec::with_capacity(spec.order_by.len());
1205 for o in &spec.order_by {
1206 keys.push(eval_arg(&o.expr, row, &ctx)?);
1207 }
1208 Some(keys)
1209 };
1210 // v7.33 (array_agg argmax) — first_ordered: keep the running
1211 // first-by-order element only (mirrors the bound fast path).
1212 if spec.first_ordered {
1213 if let Some(keys) = order_keys {
1214 let st = &mut entry.1[i];
1215 let better = match &st.first_best {
1216 None => true,
1217 Some((bk, _)) => {
1218 cmp_order_keys(&spec.order_by, &keys, bk) == core::cmp::Ordering::Less
1219 }
1220 };
1221 if better {
1222 st.first_best = Some((keys, arg_val.clone()));
1223 }
1224 }
1225 continue;
1226 }
1227 // v7.25 (round-17) — DISTINCT: drop repeated inputs
1228 // before they reach the accumulator. NULLs flow through
1229 // (each aggregate's own NULL rule applies; PG also
1230 // treats NULL as a single distinct value for array_agg).
1231 if spec.distinct {
1232 let key = encode_key(core::slice::from_ref(&arg_val));
1233 if !entry.1[i].seen.insert(key) {
1234 continue;
1235 }
1236 }
1237 update_state(
1238 &mut entry.1[i],
1239 &spec.name,
1240 &arg_val,
1241 arg2_val.as_ref(),
1242 order_keys,
1243 )?;
1244 }
1245 }
1246 Ok(order)
1247}
1248
1249/// (2a) Build the synthetic per-group schema: `__grp_0..K` then
1250/// `__agg_0..N`. Group types are probed from the first row; aggregate
1251/// types from each spec.
1252fn build_synth_schema(
1253 rows: &[RowRef<'_>],
1254 group_exprs: &[Expr],
1255 agg_specs: &[AggSpec],
1256 schema_cols: &[ColumnSchema],
1257 table_alias: Option<&str>,
1258) -> Result<Vec<ColumnSchema>, EvalError> {
1259 let ctx = EvalContext::new(schema_cols, table_alias);
1260 // Build synthetic schema: __grp_0..K then __agg_0..N.
1261 let group_types: Vec<DataType> = if rows.is_empty() {
1262 // Use Text as a safe stand-in — empty result means schema isn't
1263 // observable. Avoids needing to evaluate group exprs on no row.
1264 group_exprs.iter().map(|_| DataType::Text).collect()
1265 } else {
1266 let probe_row = rows[0].as_row();
1267 let probe: &Row = &probe_row;
1268 group_exprs
1269 .iter()
1270 .map(|g| {
1271 eval::eval_expr(g, probe, &ctx).map(|v| v.data_type().unwrap_or(DataType::Text))
1272 })
1273 .collect::<Result<_, _>>()?
1274 };
1275 let agg_types: Vec<DataType> = agg_specs
1276 .iter()
1277 .map(|spec| infer_agg_type(spec, schema_cols))
1278 .collect();
1279 let mut synth_schema: Vec<ColumnSchema> = Vec::new();
1280 for (i, ty) in group_types.iter().enumerate() {
1281 synth_schema.push(ColumnSchema::new(format!("__grp_{i}"), *ty, true));
1282 }
1283 for (i, ty) in agg_types.iter().enumerate() {
1284 synth_schema.push(ColumnSchema::new(format!("__agg_{i}"), *ty, true));
1285 }
1286 Ok(synth_schema)
1287}
1288
// (2b) `finalize_synth_rows` (defined further down, after the
// `cmp_order_keys` comparator it relies on) materialises one synthetic
// row per group in insertion order: it applies each aggregate's internal
// ORDER BY, then finalises the running state into the group + aggregate
// cells.
1292/// v7.33 — compare two aggregate-internal ORDER BY key tuples under the
1293/// per-key DESC / NULLS directives. This is the exact comparator the
1294/// finalize sort uses, factored out so the `first_ordered` argmax
1295/// accumulator's "keep first" decision is provably identical to taking
1296/// element `[1]` of the fully-sorted array.
1297fn cmp_order_keys(
1298 order_by: &[spg_sql::ast::OrderBy],
1299 a: &[Value],
1300 b: &[Value],
1301) -> core::cmp::Ordering {
1302 for (k, o) in order_by.iter().enumerate() {
1303 let cmp = crate::order_by_value_cmp(o.desc, o.nulls_first, &a[k], &b[k]);
1304 if cmp != core::cmp::Ordering::Equal {
1305 return cmp;
1306 }
1307 }
1308 core::cmp::Ordering::Equal
1309}
1310
1311fn finalize_synth_rows(
1312 order: &[(Vec<Value>, Vec<AggState>)],
1313 agg_specs: &[AggSpec],
1314 synth_schema: &[ColumnSchema],
1315 rows: &[RowRef<'_>],
1316 schema_cols: &[ColumnSchema],
1317 table_alias: Option<&str>,
1318) -> Result<Vec<Row>, EvalError> {
1319 let ctx = EvalContext::new(schema_cols, table_alias);
1320 // v7.32 (round-29) — ordered-set direct arguments (the percentile
1321 // fraction) are constant per PG, so evaluate each once up front.
1322 let direct_arg_vals: Vec<Option<Value>> = agg_specs
1323 .iter()
1324 .map(|spec| match (&spec.direct_arg, rows.first()) {
1325 (Some(e), Some(r)) => eval::eval_expr(e, &r.as_row(), &ctx).map(Some),
1326 _ => Ok(None),
1327 })
1328 .collect::<Result<_, _>>()?;
1329
1330 // Materialise synthetic rows (insertion order = `order`).
1331 let mut synth_rows: Vec<Row> = Vec::new();
1332 for (gvals, states) in order {
1333 let mut values: Vec<Value> = Vec::with_capacity(synth_schema.len());
1334 values.extend(gvals.iter().cloned());
1335 for (i, st) in states.iter().enumerate() {
1336 // v7.33 (array_agg argmax) — first_ordered: the running
1337 // first-by-order value IS the result; no array build/sort.
1338 if agg_specs[i].first_ordered {
1339 values.push(
1340 st.first_best
1341 .as_ref()
1342 .map_or(Value::Null, |(_, v)| v.clone()),
1343 );
1344 continue;
1345 }
1346 // v7.24 (round-16 A) — order the collected items per the
1347 // aggregate-internal ORDER BY before finalize consumes
1348 // them.
1349 let st_sorted;
1350 let st_final: &AggState =
1351 if !agg_specs[i].order_by.is_empty() && st.item_keys.len() == st.items.len() {
1352 let mut idx: Vec<usize> = (0..st.items.len()).collect();
1353 let ob = &agg_specs[i].order_by;
1354 idx.sort_by(|&x, &y| cmp_order_keys(ob, &st.item_keys[x], &st.item_keys[y]));
1355 let mut sorted = st.clone();
1356 sorted.items = idx.iter().map(|&j| st.items[j].clone()).collect();
1357 st_sorted = sorted;
1358 &st_sorted
1359 } else {
1360 st
1361 };
1362 // Ordered-set aggregates compute from the sorted items + the
1363 // direct fraction; everything else uses the running state.
1364 let v = if is_within_group_name(&agg_specs[i].name) {
1365 finalize_ordered_set(
1366 &agg_specs[i].name,
1367 st_final,
1368 direct_arg_vals[i].as_ref(),
1369 agg_specs[i].order_by.first(),
1370 )
1371 } else {
1372 finalize(&agg_specs[i].name, st_final)
1373 };
1374 values.push(v);
1375 }
1376 synth_rows.push(Row::new(values));
1377 }
1378 Ok(synth_rows)
1379}
1380
1381/// (3) Rewrite the user's SELECT items + HAVING to reference the
1382/// synthetic columns, filter groups by HAVING, and project each
1383/// surviving group into an output row. The synth rows ride alongside
1384/// (`kept_synth`) so post-LIMIT deferred subqueries can evaluate later.
1385#[allow(clippy::too_many_lines)]
1386fn project_groups(
1387 synth_rows: Vec<Row>,
1388 stmt: &SelectStatement,
1389 group_exprs: &[Expr],
1390 agg_specs: &[AggSpec],
1391 synth_schema: &[ColumnSchema],
1392 correlated_eval: Option<CorrelatedEval<'_>>,
1393) -> Result<Projection, EvalError> {
1394 // Rewrite the user's SELECT items + ORDER BY to reference synthetic
1395 // columns. After rewriting, every remaining `Expr::Column` must
1396 // resolve against the synthetic schema (i.e. must have been a GROUP
1397 // BY expression).
1398 let columns: Vec<ColumnSchema> = stmt
1399 .items
1400 .iter()
1401 .map(|item| match item {
1402 SelectItem::Wildcard => Err(EvalError::TypeMismatch {
1403 detail: "SELECT * with aggregates is not supported".into(),
1404 }),
1405 SelectItem::Expr { expr, alias } => {
1406 let rewritten = rewrite_expr(expr, group_exprs, agg_specs);
1407 let name = alias.clone().unwrap_or_else(|| expr.to_string());
1408 Ok(ColumnSchema::new(
1409 name,
1410 agg_or_group_type(&rewritten, synth_schema),
1411 true,
1412 ))
1413 }
1414 })
1415 .collect::<Result<_, _>>()?;
1416
1417 // Project per synthetic row. HAVING filters out groups *before*
1418 // we keep the projected row — same semantics as PG: HAVING runs
1419 // against the aggregated row (so `HAVING count(*) > 1` works) and
1420 // sees only group-by'd columns plus aggregate values.
1421 let synth_ctx = EvalContext::new(synth_schema, None);
1422 let having_rewritten = stmt
1423 .having
1424 .as_ref()
1425 .map(|h| rewrite_expr(h, group_exprs, agg_specs));
1426 // v7.30 (phase 3e-1) - rewrite SELECT items ONCE. This ran per
1427 // GROUP (23.5k x 9 items of AST cloning = ~48% of the inbox
1428 // query in sampled stacks); the rewrite is group-independent.
1429 // Stable addresses also let the per-expression subquery plans
1430 // (v7.29 3c) hit across groups instead of rebuilding.
1431 let items_rewritten: alloc::vec::Vec<Option<Expr>> = stmt
1432 .items
1433 .iter()
1434 .map(|item| match item {
1435 SelectItem::Expr { expr, .. } => Some(rewrite_expr(expr, group_exprs, agg_specs)),
1436 SelectItem::Wildcard => None,
1437 })
1438 .collect();
1439 // v7.31 (perf — PG lesson #1): subquery-bearing select items
1440 // deferred to post-LIMIT, when no sort/filter key can observe
1441 // them. ORDER BY rewrites are hoisted here so the safety check
1442 // and the sort below share one rewrite pass.
1443 let order_rewritten: Vec<Expr> = stmt
1444 .order_by
1445 .iter()
1446 .map(|o| rewrite_expr(&o.expr, group_exprs, agg_specs))
1447 .collect();
1448 let defer_enabled = correlated_eval.is_some()
1449 && !stmt.distinct
1450 && !having_rewritten
1451 .as_ref()
1452 .is_some_and(crate::expr_has_subquery)
1453 && !order_rewritten.iter().any(crate::expr_has_subquery);
1454 let deferred: Vec<(usize, Expr)> = if defer_enabled {
1455 items_rewritten
1456 .iter()
1457 .enumerate()
1458 .filter_map(|(i, r)| {
1459 r.as_ref()
1460 .filter(|e| crate::expr_has_subquery(e))
1461 .map(|e| (i, e.clone()))
1462 })
1463 .collect()
1464 } else {
1465 Vec::new()
1466 };
1467 // v7.32 (architecture v2, P2) — compile the per-group synth-row
1468 // expressions ONCE. The projection / HAVING here run per GROUP
1469 // (24k for the inbox shape) × per item; the rewritten exprs are
1470 // mostly `Column(__agg_N)` / `Column(__grp_K)` against the synth
1471 // schema — flat step programs, no tree walk per group.
1472 let having_compiled = having_rewritten
1473 .as_ref()
1474 .filter(|h| eval::fully_compilable(h))
1475 .map(|h| eval::compile_expr(h, &synth_ctx));
1476 let items_compiled: Vec<Option<eval::CompiledExpr>> = items_rewritten
1477 .iter()
1478 .enumerate()
1479 .map(|(i, r)| {
1480 r.as_ref()
1481 .filter(|e| !deferred.iter().any(|(c, _)| *c == i) && eval::fully_compilable(e))
1482 .map(|e| eval::compile_expr(e, &synth_ctx))
1483 })
1484 .collect();
1485 let mut kept_synth: Vec<Row> = Vec::new();
1486 let mut out_rows: Vec<Row> = Vec::new();
1487 let mut stack: Vec<Value> = Vec::new();
1488 for srow in synth_rows {
1489 if let Some(hc) = &having_compiled {
1490 let cond = eval::eval_compiled(hc, &srow, &synth_ctx, &mut stack)?;
1491 if !matches!(cond, Value::Bool(true)) {
1492 continue;
1493 }
1494 } else if let Some(h) = &having_rewritten {
1495 let cond = match correlated_eval {
1496 Some(f) if crate::expr_has_subquery(h) => f(h, &srow, &synth_ctx)?,
1497 _ => eval::eval_expr(h, &srow, &synth_ctx)?,
1498 };
1499 if !matches!(cond, Value::Bool(true)) {
1500 continue;
1501 }
1502 }
1503 let mut values: Vec<Value> = Vec::with_capacity(columns.len());
1504 for (i, rewritten) in items_rewritten.iter().enumerate() {
1505 let Some(rewritten) = rewritten else { continue };
1506 if deferred.iter().any(|(c, _)| *c == i) {
1507 values.push(Value::Null);
1508 continue;
1509 }
1510 values.push(if let Some(cc) = &items_compiled[i] {
1511 eval::eval_compiled(cc, &srow, &synth_ctx, &mut stack)?
1512 } else {
1513 match correlated_eval {
1514 Some(f) if crate::expr_has_subquery(rewritten) => {
1515 f(rewritten, &srow, &synth_ctx)?
1516 }
1517 _ => eval::eval_expr(rewritten, &srow, &synth_ctx)?,
1518 }
1519 });
1520 }
1521 kept_synth.push(srow);
1522 out_rows.push(Row::new(values));
1523 }
1524 Ok(Projection {
1525 columns,
1526 out_rows,
1527 kept_synth,
1528 deferred,
1529 order_rewritten,
1530 })
1531}
1532
1533/// (4) Sort the projected output by the rewritten ORDER BY keys. The
1534/// synth rows ride through the sort so deferred subqueries evaluate
1535/// against the surviving groups after the caller's LIMIT truncation.
1536fn sort_synth_by_order_by(
1537 synth_schema: &[ColumnSchema],
1538 order_by: &[spg_sql::ast::OrderBy],
1539 order_rewritten: &[Expr],
1540 mut kept_synth: Vec<Row>,
1541 mut out_rows: Vec<Row>,
1542 correlated_eval: Option<CorrelatedEval<'_>>,
1543) -> Result<(Vec<Row>, Vec<Row>), EvalError> {
1544 let synth_ctx = EvalContext::new(synth_schema, None);
1545 // v6.4.0 — multi-key ORDER BY on aggregate output. Each key
1546 // gets its own rewrite + per-key DESC flag. (Rewrites hoisted
1547 // above as `order_rewritten` — shared with the deferral
1548 // safety check.)
1549 let keys_meta: Vec<(bool, Option<bool>)> =
1550 order_by.iter().map(|o| (o.desc, o.nulls_first)).collect();
1551 // P2: compile order-by keys once (per-group sort keys are
1552 // the same `__agg_N` / `__grp_K` shape as the projection).
1553 let order_compiled: Vec<Option<eval::CompiledExpr>> = order_rewritten
1554 .iter()
1555 .map(|e| {
1556 Some(e)
1557 .filter(|e| eval::fully_compilable(e))
1558 .map(|e| eval::compile_expr(e, &synth_ctx))
1559 })
1560 .collect();
1561 // The synth row rides through the sort so deferred exprs can
1562 // evaluate against the surviving groups after the caller's
1563 // LIMIT truncation.
1564 let mut keystack: Vec<Value> = Vec::new();
1565 let mut tagged: Vec<(Vec<Value>, Row, Row)> = Vec::with_capacity(kept_synth.len());
1566 for (s, o) in kept_synth.into_iter().zip(out_rows) {
1567 let mut keys = Vec::with_capacity(order_rewritten.len());
1568 for (e, oc) in order_rewritten.iter().zip(&order_compiled) {
1569 keys.push(if let Some(oc) = oc {
1570 eval::eval_compiled(oc, &s, &synth_ctx, &mut keystack)?
1571 } else {
1572 match correlated_eval {
1573 Some(f) if crate::expr_has_subquery(e) => f(e, &s, &synth_ctx)?,
1574 _ => eval::eval_expr(e, &s, &synth_ctx)?,
1575 }
1576 });
1577 }
1578 tagged.push((keys, s, o));
1579 }
1580 tagged.sort_by(|a, b| {
1581 use core::cmp::Ordering;
1582 for (i, (ka, kb)) in a.0.iter().zip(b.0.iter()).enumerate() {
1583 let (desc, nf) = keys_meta[i];
1584 let cmp = crate::order_by_value_cmp(desc, nf, ka, kb);
1585 if cmp != Ordering::Equal {
1586 return cmp;
1587 }
1588 }
1589 Ordering::Equal
1590 });
1591 kept_synth = Vec::with_capacity(tagged.len());
1592 out_rows = Vec::with_capacity(tagged.len());
1593 for (_, s, o) in tagged {
1594 kept_synth.push(s);
1595 out_rows.push(o);
1596 }
1597 Ok((kept_synth, out_rows))
1598}
1599
1600/// v7.17.0 — walk the statement again to validate the positional
1601/// arity of every aggregate call site. Done after AST collection
1602/// rather than inside `collect_aggregates` so the collector stays
1603/// infallible; callers in `run()` can do a single early-error
1604/// exit before any per-row work.
1605fn validate_agg_arities(stmt: &SelectStatement, _specs: &[AggSpec]) -> Result<(), EvalError> {
1606 fn walk(e: &Expr) -> Result<(), EvalError> {
1607 if let Expr::FunctionCall { name, args } = e {
1608 let lower = name.to_ascii_lowercase();
1609 let expected: Option<usize> = match lower.as_str() {
1610 "count_star" => Some(0),
1611 "count" | "sum" | "avg" | "min" | "max" | "array_agg"
1612 // v7.17.0 — boolean aggregates also take exactly
1613 // one arg. `every` is an alias normalised inside
1614 // collect_aggregates / rewrite_expr.
1615 | "bool_and" | "bool_or" | "every"
1616 // v7.32 (round-29) — statistical + bitwise aggregates
1617 // + single-arg JSON aggregate.
1618 | "stddev" | "stddev_samp" | "stddev_pop"
1619 | "variance" | "var_samp" | "var_pop"
1620 | "bit_and" | "bit_or" | "bit_xor"
1621 | "json_agg" | "jsonb_agg" => Some(1),
1622 // v7.32 (round-29) — two-argument aggregates: string_agg,
1623 // the regression family f(Y, X), and json_object_agg.
1624 "string_agg"
1625 | "covar_pop" | "covar_samp" | "corr"
1626 | "regr_count" | "regr_avgx" | "regr_avgy" | "regr_slope"
1627 | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy"
1628 | "json_object_agg" | "jsonb_object_agg" => Some(2),
1629 _ => None,
1630 };
1631 if let Some(want) = expected
1632 && args.len() != want
1633 {
1634 return Err(EvalError::TypeMismatch {
1635 detail: alloc::format!("{lower}() takes {want} arg(s), got {}", args.len()),
1636 });
1637 }
1638 for a in args {
1639 walk(a)?;
1640 }
1641 } else if let Expr::Binary { lhs, rhs, .. } = e {
1642 walk(lhs)?;
1643 walk(rhs)?;
1644 } else if let Expr::Unary { expr, .. }
1645 | Expr::Cast { expr, .. }
1646 | Expr::IsNull { expr, .. } = e
1647 {
1648 walk(expr)?;
1649 }
1650 Ok(())
1651 }
1652 for item in &stmt.items {
1653 if let SelectItem::Expr { expr, .. } = item {
1654 walk(expr)?;
1655 }
1656 }
1657 for o in &stmt.order_by {
1658 walk(&o.expr)?;
1659 }
1660 if let Some(h) = &stmt.having {
1661 walk(h)?;
1662 }
1663 Ok(())
1664}
1665
1666/// v7.33 (array_agg argmax) — recognise `(array_agg(x ORDER BY y))[1]`,
1667/// the argmax/argmin idiom: a non-DISTINCT ordered `array_agg`
1668/// subscripted by the constant 1. Returns `(value_arg, order_by,
1669/// filter)` on a match. When matched, the whole per-group array build +
1670/// sort + materialise is replaced by a running first-by-order scalar
1671/// accumulator and the subscript node is consumed (replaced by the
1672/// synthetic column). collect_aggregates and rewrite_expr share this one
1673/// matcher so their `__agg_<i>` assignment stays in lockstep.
1674fn first_ordered_array_agg(e: &Expr) -> Option<(&Expr, &[spg_sql::ast::OrderBy], Option<&Expr>)> {
1675 let Expr::ArraySubscript { target, index } = e else {
1676 return None;
1677 };
1678 if !matches!(
1679 index.as_ref(),
1680 Expr::Literal(spg_sql::ast::Literal::Integer(1))
1681 ) {
1682 return None;
1683 }
1684 let Expr::AggregateOrdered {
1685 call,
1686 order_by,
1687 distinct,
1688 filter,
1689 } = target.as_ref()
1690 else {
1691 return None;
1692 };
1693 if *distinct || order_by.is_empty() {
1694 return None;
1695 }
1696 let Expr::FunctionCall { name, args } = call.as_ref() else {
1697 return None;
1698 };
1699 if !name.eq_ignore_ascii_case("array_agg") || args.len() != 1 {
1700 return None;
1701 }
1702 Some((&args[0], order_by, filter.as_deref()))
1703}
1704
1705fn collect_aggregates(e: &Expr, out: &mut Vec<AggSpec>) {
1706 match e {
1707 // v7.24 (round-16 A) — ordered aggregate: register the inner
1708 // call's spec with the ordering attached.
1709 Expr::AggregateOrdered {
1710 call,
1711 order_by,
1712 distinct,
1713 filter,
1714 } => {
1715 if let Expr::FunctionCall { name, args } = call.as_ref() {
1716 let lower = name.to_ascii_lowercase();
1717 if is_aggregate_name(&lower) {
1718 let canonical = if lower == "every" {
1719 "bool_and".to_string()
1720 } else {
1721 lower
1722 };
1723 // Ordered-set aggregates (`percentile_cont(f)
1724 // WITHIN GROUP (ORDER BY x)`) take the value to
1725 // aggregate from the sort spec and the in-parens
1726 // arg as the direct (fraction) argument.
1727 let ordered_set = is_within_group_name(&canonical);
1728 let (arg, direct_arg) = if ordered_set {
1729 (
1730 order_by.first().map(|o| o.expr.clone()),
1731 args.first().cloned(),
1732 )
1733 } else {
1734 (args.first().cloned(), None)
1735 };
1736 let spec = AggSpec {
1737 name: canonical.clone(),
1738 arg,
1739 arg2: if agg_uses_second_arg(&canonical) {
1740 args.get(1).cloned()
1741 } else {
1742 None
1743 },
1744 distinct: *distinct,
1745 order_by: order_by.clone(),
1746 filter: filter.as_deref().cloned(),
1747 direct_arg,
1748 first_ordered: false,
1749 };
1750 if !out.iter().any(|s| {
1751 s.name == spec.name
1752 && s.arg == spec.arg
1753 && s.arg2 == spec.arg2
1754 && s.distinct == spec.distinct
1755 && s.order_by == spec.order_by
1756 && s.filter == spec.filter
1757 && s.direct_arg == spec.direct_arg
1758 && s.first_ordered == spec.first_ordered
1759 }) {
1760 out.push(spec);
1761 }
1762 return;
1763 }
1764 }
1765 collect_aggregates(call, out);
1766 for o in order_by {
1767 collect_aggregates(&o.expr, out);
1768 }
1769 }
1770 Expr::FunctionCall { name, args } => {
1771 let lower = name.to_ascii_lowercase();
1772 if is_aggregate_name(&lower) {
1773 let arg = if lower == "count_star" {
1774 None
1775 } else {
1776 args.first().cloned()
1777 };
1778 // v7.17.0 — second positional arg for
1779 // `string_agg(value, separator)`; v7.32 — also the
1780 // regression family `f(Y, X)` and `json_object_agg`.
1781 let arg2 = if agg_uses_second_arg(&lower) {
1782 args.get(1).cloned()
1783 } else {
1784 None
1785 };
1786 // v7.17.0 — `every` is the SQL-standard alias for
1787 // `bool_and`; collapse at collection time so
1788 // update_state / finalize need only one arm.
1789 let canonical = if lower == "every" {
1790 "bool_and".to_string()
1791 } else {
1792 lower
1793 };
1794 let spec = AggSpec {
1795 name: canonical,
1796 arg: arg.clone(),
1797 arg2: arg2.clone(),
1798 distinct: false,
1799 order_by: Vec::new(),
1800 filter: None,
1801 direct_arg: None,
1802 first_ordered: false,
1803 };
1804 if !out.iter().any(|s| {
1805 s.name == spec.name
1806 && s.arg == spec.arg
1807 && s.arg2 == spec.arg2
1808 && !s.distinct
1809 && s.order_by == spec.order_by
1810 && s.filter.is_none()
1811 && !s.first_ordered
1812 }) {
1813 out.push(spec);
1814 }
1815 // Don't recurse into the arg — nested aggregates are
1816 // illegal in standard SQL.
1817 } else {
1818 for a in args {
1819 collect_aggregates(a, out);
1820 }
1821 }
1822 }
1823 Expr::Binary { lhs, rhs, .. } => {
1824 collect_aggregates(lhs, out);
1825 collect_aggregates(rhs, out);
1826 }
1827 Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
1828 collect_aggregates(expr, out);
1829 }
1830 Expr::Like { expr, pattern, .. } => {
1831 collect_aggregates(expr, out);
1832 collect_aggregates(pattern, out);
1833 }
1834 Expr::InList { expr, list, .. } => {
1835 collect_aggregates(expr, out);
1836 for item in list {
1837 collect_aggregates(item, out);
1838 }
1839 }
1840 Expr::Extract { source, .. } => collect_aggregates(source, out),
1841 // v4.10 subquery + v4.12 window / Literal / Column —
1842 // non-recursing leaves for the aggregate collector.
1843 Expr::ScalarSubquery(_)
1844 | Expr::Exists { .. }
1845 | Expr::InSubquery { .. }
1846 | Expr::WindowFunction { .. }
1847 | Expr::Literal(_)
1848 | Expr::Placeholder(_)
1849 | Expr::Column(_) => {}
1850 // v7.10.10 — recurse into array constructor children +
1851 // subscript / ANY/ALL operands.
1852 Expr::Array(items) => {
1853 for elem in items {
1854 collect_aggregates(elem, out);
1855 }
1856 }
1857 Expr::ArraySubscript { target, index } => {
1858 // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]`
1859 // collects as a first_ordered spec; the subscript is consumed
1860 // here (do NOT recurse into the array_agg, or it would also
1861 // register a plain full-array spec).
1862 if let Some((arg, order_by, filter)) = first_ordered_array_agg(e) {
1863 let spec = AggSpec {
1864 name: "array_agg".to_string(),
1865 arg: Some(arg.clone()),
1866 arg2: None,
1867 distinct: false,
1868 order_by: order_by.to_vec(),
1869 filter: filter.cloned(),
1870 direct_arg: None,
1871 first_ordered: true,
1872 };
1873 if !out.iter().any(|s| {
1874 s.name == spec.name
1875 && s.arg == spec.arg
1876 && s.order_by == spec.order_by
1877 && s.filter == spec.filter
1878 && s.first_ordered
1879 }) {
1880 out.push(spec);
1881 }
1882 return;
1883 }
1884 collect_aggregates(target, out);
1885 collect_aggregates(index, out);
1886 }
1887 Expr::AnyAll { expr, array, .. } => {
1888 collect_aggregates(expr, out);
1889 collect_aggregates(array, out);
1890 }
1891 Expr::Case {
1892 operand,
1893 branches,
1894 else_branch,
1895 } => {
1896 if let Some(o) = operand {
1897 collect_aggregates(o, out);
1898 }
1899 for (w, t) in branches {
1900 collect_aggregates(w, out);
1901 collect_aggregates(t, out);
1902 }
1903 if let Some(e) = else_branch {
1904 collect_aggregates(e, out);
1905 }
1906 }
1907 }
1908}
1909
1910fn update_state(
1911 st: &mut AggState,
1912 name: &str,
1913 v: &Value,
1914 arg2: Option<&Value>,
1915 order_keys: Option<Vec<Value>>,
1916) -> Result<(), EvalError> {
1917 let is_null = matches!(v, Value::Null);
1918 match name {
1919 "count_star" => st.count += 1,
1920 "count" => {
1921 if !is_null {
1922 st.count += 1;
1923 }
1924 }
1925 "sum" | "avg" => {
1926 if is_null {
1927 return Ok(());
1928 }
1929 st.count += 1;
1930 match v {
1931 Value::Int(n) => st.sum_int += i64::from(*n),
1932 Value::BigInt(n) => st.sum_int += *n,
1933 Value::Float(x) => {
1934 st.use_float = true;
1935 st.sum_float += *x;
1936 }
1937 other => {
1938 return Err(EvalError::TypeMismatch {
1939 detail: format!("sum/avg need numeric, got {:?}", other.data_type()),
1940 });
1941 }
1942 }
1943 }
1944 "min" => {
1945 if is_null {
1946 return Ok(());
1947 }
1948 match &st.extreme {
1949 None => st.extreme = Some(v.clone()),
1950 Some(cur) => {
1951 if value_cmp(v, cur) == core::cmp::Ordering::Less {
1952 st.extreme = Some(v.clone());
1953 }
1954 }
1955 }
1956 }
1957 "max" => {
1958 if is_null {
1959 return Ok(());
1960 }
1961 match &st.extreme {
1962 None => st.extreme = Some(v.clone()),
1963 Some(cur) => {
1964 if value_cmp(v, cur) == core::cmp::Ordering::Greater {
1965 st.extreme = Some(v.clone());
1966 }
1967 }
1968 }
1969 }
1970 // v7.17.0 — string_agg(value, separator). NULL value is
1971 // skipped (PG aggregate-skip-null). Separator captured
1972 // from the latest row that flows through; matches PG's
1973 // semantics of evaluating the separator per row but using
1974 // the last value at finalize time (in practice it's
1975 // constant). count is bumped so we can distinguish "empty
1976 // group → NULL" from "all-NULL group → NULL".
1977 "string_agg" => {
1978 if let Some(sep) = arg2
1979 && let Value::Text(s) = sep
1980 {
1981 st.separator = Some(s.clone());
1982 }
1983 if is_null {
1984 return Ok(());
1985 }
1986 if let Value::Text(s) = v {
1987 st.items.push(Value::Text(s.clone()));
1988 if let Some(k) = order_keys {
1989 st.item_keys.push(k);
1990 }
1991 st.count += 1;
1992 } else {
1993 return Err(EvalError::TypeMismatch {
1994 detail: format!("string_agg requires text value, got {:?}", v.data_type()),
1995 });
1996 }
1997 }
1998 // v7.17.0 — array_agg(value). Unlike string_agg, NULL
1999 // elements are KEPT in the array (PG behaviour); the
2000 // result is NULL only when ZERO rows fed in. Element type
2001 // is locked from the first row's value type; subsequent
2002 // rows must match (PG also rejects mixed-type array_agg).
2003 "array_agg" => {
2004 st.items.push(v.clone());
2005 if let Some(k) = order_keys {
2006 st.item_keys.push(k);
2007 }
2008 st.count += 1;
2009 }
2010 // v7.17.0 — bool_and(p): TRUE iff every non-NULL input is
2011 // TRUE. NULL skipped; running accumulator stays at TRUE
2012 // until the first non-NULL FALSE.
2013 "bool_and" => {
2014 if is_null {
2015 return Ok(());
2016 }
2017 let b = match v {
2018 Value::Bool(b) => *b,
2019 other => {
2020 return Err(EvalError::TypeMismatch {
2021 detail: format!("bool_and requires bool, got {:?}", other.data_type()),
2022 });
2023 }
2024 };
2025 st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc && b));
2026 }
2027 // v7.17.0 — bool_or(p): TRUE iff any non-NULL input is
2028 // TRUE. NULL skipped.
2029 "bool_or" => {
2030 if is_null {
2031 return Ok(());
2032 }
2033 let b = match v {
2034 Value::Bool(b) => *b,
2035 other => {
2036 return Err(EvalError::TypeMismatch {
2037 detail: format!("bool_or requires bool, got {:?}", other.data_type()),
2038 });
2039 }
2040 };
2041 st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc || b));
2042 }
2043 // v7.32 (round-29) — variance / stddev family. Accumulate the
2044 // running sum (sum_float) and sum of squares (sum_sq) over the
2045 // non-NULL numeric inputs; finalize divides by n or n-1.
2046 "stddev" | "stddev_samp" | "stddev_pop" | "variance" | "var_samp" | "var_pop" => {
2047 if is_null {
2048 return Ok(());
2049 }
2050 let x = match v {
2051 Value::Int(n) => f64::from(*n),
2052 Value::SmallInt(n) => f64::from(*n),
2053 Value::BigInt(n) => *n as f64,
2054 Value::Float(x) => *x,
2055 other => {
2056 return Err(EvalError::TypeMismatch {
2057 detail: format!("{name} needs numeric, got {:?}", other.data_type()),
2058 });
2059 }
2060 };
2061 st.count += 1;
2062 st.sum_float += x;
2063 st.sum_sq += x * x;
2064 }
2065 // v7.32 (round-29) — bitwise aggregates over integer inputs.
2066 "bit_and" | "bit_or" | "bit_xor" => {
2067 if is_null {
2068 return Ok(());
2069 }
2070 let n = match v {
2071 Value::Int(n) => i64::from(*n),
2072 Value::SmallInt(n) => i64::from(*n),
2073 Value::BigInt(n) => *n,
2074 other => {
2075 return Err(EvalError::TypeMismatch {
2076 detail: format!("{name} needs integer, got {:?}", other.data_type()),
2077 });
2078 }
2079 };
2080 st.bit_acc = Some(match (st.bit_acc, name) {
2081 (None, _) => n,
2082 (Some(acc), "bit_and") => acc & n,
2083 (Some(acc), "bit_or") => acc | n,
2084 (Some(acc), _) => acc ^ n, // bit_xor
2085 });
2086 }
2087 // v7.32 (round-29) — WITHIN GROUP aggregates (ordered-set +
2088 // hypothetical-set) collect the sort value (NULLs ignored, per
2089 // PG) into `items`, sorted at finalize by the parallel
2090 // `item_keys`.
2091 n if is_within_group_name(n) => {
2092 if is_null {
2093 return Ok(());
2094 }
2095 st.items.push(v.clone());
2096 if let Some(k) = order_keys {
2097 st.item_keys.push(k);
2098 }
2099 st.count += 1;
2100 }
2101 // v7.32 (round-29) — regression family f(Y, X). Only rows with
2102 // BOTH inputs non-NULL contribute (PG semantics). `v` is Y,
2103 // `arg2` is X.
2104 n if is_regression_name(n) => {
2105 let (Some(y), Some(x)) = (agg_value_to_f64(v), arg2.and_then(agg_value_to_f64)) else {
2106 return Ok(()); // NULL (or non-numeric) in either input
2107 };
2108 st.reg_n += 1;
2109 st.reg_sx += x;
2110 st.reg_sy += y;
2111 st.reg_sxx += x * x;
2112 st.reg_syy += y * y;
2113 st.reg_sxy += x * y;
2114 }
2115 // v7.32 (round-29) — json_agg / jsonb_agg collect every input
2116 // (NULL becomes JSON null, per PG) in row order.
2117 "json_agg" | "jsonb_agg" => {
2118 st.items.push(v.clone());
2119 st.count += 1;
2120 }
2121 // v7.32 (round-29) — json_object_agg(key, value): keys in
2122 // `items`, values in `aux_items`. A NULL key is skipped (PG
2123 // raises; we drop it rather than abort the whole query).
2124 "json_object_agg" | "jsonb_object_agg" => {
2125 if is_null {
2126 return Ok(());
2127 }
2128 st.items.push(v.clone());
2129 st.aux_items.push(arg2.cloned().unwrap_or(Value::Null));
2130 st.count += 1;
2131 }
2132 _ => unreachable!("non-aggregate {name} in update_state"),
2133 }
2134 Ok(())
2135}
2136
2137#[allow(clippy::cast_precision_loss)]
2138fn finalize(name: &str, st: &AggState) -> Value {
2139 match name {
2140 "count" | "count_star" => Value::BigInt(st.count),
2141 "sum" => {
2142 if st.count == 0 {
2143 Value::Null
2144 } else if st.use_float {
2145 Value::Float(st.sum_float + (st.sum_int as f64))
2146 } else {
2147 Value::BigInt(st.sum_int)
2148 }
2149 }
2150 "avg" => {
2151 if st.count == 0 {
2152 Value::Null
2153 } else {
2154 let total = if st.use_float {
2155 st.sum_float + (st.sum_int as f64)
2156 } else {
2157 st.sum_int as f64
2158 };
2159 Value::Float(total / (st.count as f64))
2160 }
2161 }
2162 "min" | "max" => st.extreme.clone().unwrap_or(Value::Null),
2163 // v7.17.0 — string_agg: join all collected text items with
2164 // the captured separator. Empty / all-NULL group → NULL
2165 // (PG semantics).
2166 "string_agg" => {
2167 if st.items.is_empty() {
2168 return Value::Null;
2169 }
2170 let sep = st.separator.clone().unwrap_or_default();
2171 let mut out = String::new();
2172 for (i, item) in st.items.iter().enumerate() {
2173 if i > 0 {
2174 out.push_str(&sep);
2175 }
2176 if let Value::Text(s) = item {
2177 out.push_str(s);
2178 }
2179 }
2180 Value::Text(out)
2181 }
2182 // v7.17.0 — array_agg: collect into a typed array. NULL
2183 // elements are preserved per PG. Result type is decided
2184 // by the first non-NULL element seen (or Text fallback
2185 // when the whole group is NULL — PG would surface the
2186 // declared input type, but SPG hasn't yet wired the
2187 // aggregate's static input-type from `describe`).
2188 "array_agg" => {
2189 if st.items.is_empty() {
2190 return Value::Null;
2191 }
2192 let probe = st.items.iter().find(|v| !v.is_null());
2193 match probe.and_then(spg_storage::Value::data_type) {
2194 Some(DataType::Int) | Some(DataType::SmallInt) => {
2195 let items: Vec<Option<i32>> = st
2196 .items
2197 .iter()
2198 .map(|v| match v {
2199 Value::Int(n) => Some(*n),
2200 Value::SmallInt(n) => Some(i32::from(*n)),
2201 _ => None,
2202 })
2203 .collect();
2204 Value::IntArray(items)
2205 }
2206 Some(DataType::BigInt) => {
2207 let items: Vec<Option<i64>> = st
2208 .items
2209 .iter()
2210 .map(|v| match v {
2211 Value::BigInt(n) => Some(*n),
2212 _ => None,
2213 })
2214 .collect();
2215 Value::BigIntArray(items)
2216 }
2217 _ => {
2218 let items: Vec<Option<String>> = st
2219 .items
2220 .iter()
2221 .map(|v| match v {
2222 Value::Text(s) => Some(s.clone()),
2223 Value::Null => None,
2224 other => Some(format!("{other:?}")),
2225 })
2226 .collect();
2227 Value::TextArray(items)
2228 }
2229 }
2230 }
2231 // v7.17.0 — bool_and / bool_or finalize: lazy-init pattern
2232 // means `None` is exactly "empty group or all-NULL", which
2233 // PG surfaces as SQL NULL.
2234 "bool_and" | "bool_or" => st.bool_acc.map_or(Value::Null, Value::Bool),
2235 // v7.32 (round-29) — variance / stddev. PG: `variance` ==
2236 // `var_samp`, `stddev` == `stddev_samp`. samp needs n >= 2
2237 // (n < 2 → NULL); pop needs n >= 1 (n == 1 → 0).
2238 "variance" | "var_samp" | "var_pop" | "stddev" | "stddev_samp" | "stddev_pop" => {
2239 let n = st.count;
2240 if n == 0 {
2241 return Value::Null;
2242 }
2243 let nf = n as f64;
2244 // Sum of squared deviations from the mean.
2245 let ss = st.sum_sq - (st.sum_float * st.sum_float) / nf;
2246 let pop = name.ends_with("_pop");
2247 let denom = if pop { nf } else { nf - 1.0 };
2248 if denom <= 0.0 {
2249 // var_samp / stddev (samp) with n == 1 → NULL.
2250 return Value::Null;
2251 }
2252 let var = (ss / denom).max(0.0); // clamp fp noise below 0
2253 if name.starts_with("stddev") {
2254 Value::Float(crate::eval::f64_sqrt(var))
2255 } else {
2256 Value::Float(var)
2257 }
2258 }
2259 // v7.32 (round-29) — bitwise aggregates: None (empty / all-NULL)
2260 // → SQL NULL.
2261 "bit_and" | "bit_or" | "bit_xor" => st.bit_acc.map_or(Value::Null, Value::BigInt),
2262 // v7.32 (round-29) — regression family. `regr_count` is the
2263 // paired n; everything else is NULL over an empty set. Terms
2264 // are the mean-centred sums of squares / cross-products.
2265 "regr_count" => Value::BigInt(st.reg_n),
2266 "covar_pop" | "covar_samp" | "corr" | "regr_avgx" | "regr_avgy" | "regr_slope"
2267 | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy" => {
2268 let n = st.reg_n;
2269 if n == 0 {
2270 return Value::Null;
2271 }
2272 let nf = n as f64;
2273 let sxx = st.reg_sxx - st.reg_sx * st.reg_sx / nf;
2274 let syy = st.reg_syy - st.reg_sy * st.reg_sy / nf;
2275 let sxy = st.reg_sxy - st.reg_sx * st.reg_sy / nf;
2276 let avgx = st.reg_sx / nf;
2277 let avgy = st.reg_sy / nf;
2278 let out = match name {
2279 "regr_avgx" => Some(avgx),
2280 "regr_avgy" => Some(avgy),
2281 "regr_sxx" => Some(sxx),
2282 "regr_syy" => Some(syy),
2283 "regr_sxy" => Some(sxy),
2284 "covar_pop" => Some(sxy / nf),
2285 "covar_samp" => (n >= 2).then(|| sxy / (nf - 1.0)),
2286 "regr_slope" => (sxx != 0.0).then(|| sxy / sxx),
2287 "regr_intercept" => (sxx != 0.0).then(|| avgy - (sxy / sxx) * avgx),
2288 "corr" => {
2289 let d = sxx * syy;
2290 (d > 0.0).then(|| sxy / crate::eval::f64_sqrt(d))
2291 }
2292 // PG: NULL when sxx==0; 1 when syy==0 (and sxx>0).
2293 "regr_r2" => {
2294 if sxx == 0.0 {
2295 None
2296 } else if syy == 0.0 {
2297 Some(1.0)
2298 } else {
2299 Some((sxy * sxy) / (sxx * syy))
2300 }
2301 }
2302 _ => None,
2303 };
2304 out.map_or(Value::Null, Value::Float)
2305 }
2306 // v7.32 (round-29) — json_agg / jsonb_agg: a JSON array of every
2307 // collected element in row order; empty set → SQL NULL.
2308 "json_agg" | "jsonb_agg" => {
2309 if st.items.is_empty() {
2310 return Value::Null;
2311 }
2312 let mut out = String::from("[");
2313 for (i, item) in st.items.iter().enumerate() {
2314 if i > 0 {
2315 out.push_str(", ");
2316 }
2317 out.push_str(&crate::json::value_to_json_text(item));
2318 }
2319 out.push(']');
2320 Value::Json(out)
2321 }
2322 // v7.32 (round-29) — json_object_agg: a JSON object built from
2323 // the parallel key (`items`) / value (`aux_items`) streams.
2324 "json_object_agg" | "jsonb_object_agg" => {
2325 if st.items.is_empty() {
2326 return Value::Null;
2327 }
2328 let mut out = String::from("{");
2329 for (i, key) in st.items.iter().enumerate() {
2330 if i > 0 {
2331 out.push_str(", ");
2332 }
2333 // Object keys are always JSON strings (PG coerces).
2334 let key_text = match key {
2335 Value::Text(s) | Value::Json(s) => s.clone(),
2336 other => crate::json::value_to_json_text(other),
2337 };
2338 out.push_str(&crate::json::value_to_json_text(&Value::Text(key_text)));
2339 out.push_str(": ");
2340 let val = st.aux_items.get(i).unwrap_or(&Value::Null);
2341 out.push_str(&crate::json::value_to_json_text(val));
2342 }
2343 out.push('}');
2344 Value::Json(out)
2345 }
2346 // Ordered-set aggregates are finalized in `run` (they need the
2347 // sorted items + the direct fraction argument), never here.
2348 _ => unreachable!(),
2349 }
2350}
2351
2352/// v7.32 (round-29) — numeric coercion for the percentile interpolation.
2353fn agg_value_to_f64(v: &Value) -> Option<f64> {
2354 match v {
2355 Value::Int(n) => Some(f64::from(*n)),
2356 Value::SmallInt(n) => Some(f64::from(*n)),
2357 Value::BigInt(n) => Some(*n as f64),
2358 Value::Float(x) => Some(*x),
2359 _ => None,
2360 }
2361}
2362
2363/// v7.32 (round-29) — finalize a WITHIN GROUP aggregate. `st.items` is
2364/// already sorted by the `WITHIN GROUP (ORDER BY …)` spec. `direct` is
2365/// the evaluated direct argument: the fraction for `percentile_*`, the
2366/// hypothetical value for the hypothetical-set family (`rank` etc.),
2367/// and unused by `mode`. `order` is the (single) sort key, needed by
2368/// the hypothetical-set family to compare in the sort direction.
2369#[allow(
2370 clippy::cast_precision_loss,
2371 clippy::cast_possible_truncation,
2372 clippy::cast_sign_loss
2373)]
2374fn finalize_ordered_set(
2375 name: &str,
2376 st: &AggState,
2377 direct: Option<&Value>,
2378 order: Option<&spg_sql::ast::OrderBy>,
2379) -> Value {
2380 let fraction = direct;
2381 let items = &st.items;
2382 if items.is_empty() {
2383 // A hypothetical row ranks first over an empty group; the
2384 // distribution functions are 0 / divide-by-(n+1).
2385 return match name {
2386 "rank" | "dense_rank" => Value::BigInt(1),
2387 "percent_rank" => Value::Float(0.0),
2388 "cume_dist" => Value::Float(1.0),
2389 _ => Value::Null,
2390 };
2391 }
2392 let n = items.len();
2393 match name {
2394 // v7.32 (round-29) — hypothetical-set: the rank the direct value
2395 // would have if inserted into the group, in the sort direction.
2396 "rank" | "dense_rank" | "percent_rank" | "cume_dist" => {
2397 let Some(h) = fraction else {
2398 return Value::Null;
2399 };
2400 let (desc, nulls_first) = order.map_or((false, None), |o| (o.desc, o.nulls_first));
2401 let mut before = 0usize; // sort strictly before h
2402 let mut before_or_eq = 0usize; // sort before-or-peer with h
2403 let mut distinct_before = 0usize;
2404 let mut last_before: Option<&Value> = None;
2405 for it in items {
2406 match crate::order_by_value_cmp(desc, nulls_first, it, h) {
2407 core::cmp::Ordering::Less => {
2408 before += 1;
2409 before_or_eq += 1;
2410 if last_before
2411 .is_none_or(|p| value_cmp(p, it) != core::cmp::Ordering::Equal)
2412 {
2413 distinct_before += 1;
2414 last_before = Some(it);
2415 }
2416 }
2417 core::cmp::Ordering::Equal => before_or_eq += 1,
2418 core::cmp::Ordering::Greater => {}
2419 }
2420 }
2421 let nn = n as f64;
2422 match name {
2423 "rank" => Value::BigInt((before + 1) as i64),
2424 "dense_rank" => Value::BigInt((distinct_before + 1) as i64),
2425 "percent_rank" => Value::Float(before as f64 / nn),
2426 "cume_dist" => Value::Float((before_or_eq as f64 + 1.0) / (nn + 1.0)),
2427 _ => unreachable!(),
2428 }
2429 }
2430 // Most frequent value; equal values are adjacent in the sorted
2431 // run, and a frequency tie resolves to the earliest run (the
2432 // smallest value under an ascending sort), matching PG.
2433 "mode" => {
2434 let (mut best_i, mut best_cnt) = (0usize, 1usize);
2435 let (mut run_i, mut run_cnt) = (0usize, 1usize);
2436 for i in 1..n {
2437 if value_cmp(&items[i], &items[run_i]) == core::cmp::Ordering::Equal {
2438 run_cnt += 1;
2439 } else {
2440 run_i = i;
2441 run_cnt = 1;
2442 }
2443 if run_cnt > best_cnt {
2444 best_cnt = run_cnt;
2445 best_i = run_i;
2446 }
2447 }
2448 items[best_i].clone()
2449 }
2450 // The first value whose cumulative fraction reaches `f`.
2451 "percentile_disc" => {
2452 let f = fraction
2453 .and_then(agg_value_to_f64)
2454 .unwrap_or(0.0)
2455 .clamp(0.0, 1.0);
2456 let idx = if f <= 0.0 {
2457 0
2458 } else {
2459 (crate::eval::f64_ceil(f * n as f64) as usize)
2460 .saturating_sub(1)
2461 .min(n - 1)
2462 };
2463 items[idx].clone()
2464 }
2465 // Linear interpolation between the two bracketing values.
2466 "percentile_cont" => {
2467 let f = fraction
2468 .and_then(agg_value_to_f64)
2469 .unwrap_or(0.0)
2470 .clamp(0.0, 1.0);
2471 let Some(nums) = items
2472 .iter()
2473 .map(agg_value_to_f64)
2474 .collect::<Option<Vec<f64>>>()
2475 else {
2476 return Value::Null; // non-numeric ordered set
2477 };
2478 if n == 1 {
2479 return Value::Float(nums[0]);
2480 }
2481 let rank = f * (n as f64 - 1.0);
2482 let lo = crate::eval::f64_floor(rank) as usize;
2483 let hi = crate::eval::f64_ceil(rank) as usize;
2484 let frac = rank - lo as f64;
2485 Value::Float(nums[lo] + (nums[hi] - nums[lo]) * frac)
2486 }
2487 _ => unreachable!(),
2488 }
2489}
2490
2491fn infer_agg_type(spec: &AggSpec, schema_cols: &[ColumnSchema]) -> DataType {
2492 // v7.26 (round-20 C) — the argument's statically-derived shape
2493 // types MIN/MAX/SUM/array_agg properly; RowDescription used to
2494 // report TEXT for these, breaking every sqlx typed decode.
2495 let arg_ty = spec
2496 .arg
2497 .as_ref()
2498 .and_then(|a| crate::describe::describe_expr(a, schema_cols))
2499 .map(|shape| shape.ty);
2500 // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]` yields the
2501 // ELEMENT type (x), not the array type.
2502 if spec.first_ordered {
2503 return arg_ty.unwrap_or(DataType::Text);
2504 }
2505 match spec.name.as_str() {
2506 "count" | "count_star" => DataType::BigInt,
2507 "sum" => match arg_ty {
2508 Some(DataType::Float) => DataType::Float,
2509 _ => DataType::BigInt,
2510 },
2511 "avg" => DataType::Float,
2512 // v7.17.0 — string_agg always returns TEXT.
2513 "string_agg" => DataType::Text,
2514 "array_agg" => match arg_ty {
2515 Some(DataType::Int | DataType::SmallInt) => DataType::IntArray,
2516 Some(DataType::BigInt) => DataType::BigIntArray,
2517 _ => DataType::TextArray,
2518 },
2519 // v7.17.0 — boolean aggregates always return BOOL (nullable
2520 // — empty / all-NULL group → NULL).
2521 "bool_and" | "bool_or" => DataType::Bool,
2522 // v7.32 (round-29) — variance / stddev are floating point;
2523 // percentile_cont interpolates to float; the regression family
2524 // (except regr_count) is floating point.
2525 "stddev" | "stddev_samp" | "stddev_pop" | "variance" | "var_samp" | "var_pop"
2526 | "percentile_cont" | "covar_pop" | "covar_samp" | "corr" | "regr_avgx" | "regr_avgy"
2527 | "regr_slope" | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy" => {
2528 DataType::Float
2529 }
2530 // v7.32 (round-29) — bitwise aggregates, regr_count, and the
2531 // integer hypothetical-set ranks return an integer.
2532 "bit_and" | "bit_or" | "bit_xor" | "regr_count" | "rank" | "dense_rank" => DataType::BigInt,
2533 // v7.32 (round-29) — hypothetical-set distribution functions.
2534 "percent_rank" | "cume_dist" => DataType::Float,
2535 // v7.32 (round-29) — JSON aggregates return JSON.
2536 "json_agg" | "jsonb_agg" | "json_object_agg" | "jsonb_object_agg" => DataType::Json,
2537 // min/max, percentile_disc, mode, and anything pass-through:
2538 // the argument's shape (for ordered-set aggs `spec.arg` is the
2539 // WITHIN GROUP value expression).
2540 _ => arg_ty.unwrap_or(DataType::Text),
2541 }
2542}
2543
2544fn agg_or_group_type(e: &Expr, synth: &[ColumnSchema]) -> DataType {
2545 if let Expr::Column(c) = e
2546 && let Some(s) = synth.iter().find(|s| s.name == c.name)
2547 {
2548 return s.ty;
2549 }
2550 // v7.26 (round-20 C) — compound expressions over aggregates
2551 // (COALESCE(BOOL_OR(…), false), (array_agg(…))[1], CASE …)
2552 // derive their shape statically against the synth schema; the
2553 // old Text fallback broke sqlx typed decodes of exactly these
2554 // columns.
2555 crate::describe::describe_expr(e, synth)
2556 .map(|shape| shape.ty)
2557 .unwrap_or(DataType::Text)
2558}
2559
2560fn rewrite_expr(e: &Expr, group_exprs: &[Expr], aggs: &[AggSpec]) -> Expr {
2561 // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]` rewrites
2562 // to its first_ordered synth column, consuming the subscript. Checked
2563 // before the AggregateOrdered/recursion arms (which would otherwise
2564 // rewrite the inner array_agg and leave the subscript). Same matcher
2565 // as collect_aggregates, so the spec it finds is the one collected.
2566 if let Some((arg, order_by, filter)) = first_ordered_array_agg(e) {
2567 let arg_owned = Some(arg.clone());
2568 let filter_owned = filter.cloned();
2569 for (i, spec) in aggs.iter().enumerate() {
2570 if spec.first_ordered
2571 && spec.name == "array_agg"
2572 && spec.arg == arg_owned
2573 && spec.order_by == *order_by
2574 && spec.filter == filter_owned
2575 {
2576 return Expr::Column(spg_sql::ast::ColumnName {
2577 qualifier: None,
2578 name: format!("__agg_{i}"),
2579 });
2580 }
2581 }
2582 }
2583 // v7.24 (round-16 A) — ordered aggregate: match on the inner
2584 // call PLUS the ordering keys.
2585 if let Expr::AggregateOrdered {
2586 call,
2587 order_by,
2588 distinct,
2589 filter,
2590 } = e
2591 && let Expr::FunctionCall { name, args } = call.as_ref()
2592 {
2593 let lower = name.to_ascii_lowercase();
2594 if is_aggregate_name(&lower) {
2595 let canonical: &str = if lower == "every" { "bool_and" } else { &lower };
2596 // Mirror collect_aggregates: ordered-set aggregates take the
2597 // value from the sort spec and the in-parens arg as direct.
2598 let (arg, direct_arg) = if is_within_group_name(canonical) {
2599 (
2600 order_by.first().map(|o| o.expr.clone()),
2601 args.first().cloned(),
2602 )
2603 } else {
2604 (args.first().cloned(), None)
2605 };
2606 let arg2 = if agg_uses_second_arg(canonical) {
2607 args.get(1).cloned()
2608 } else {
2609 None
2610 };
2611 let filter_owned = filter.as_deref().cloned();
2612 for (i, spec) in aggs.iter().enumerate() {
2613 if spec.name == canonical
2614 && spec.arg == arg
2615 && spec.arg2 == arg2
2616 && spec.distinct == *distinct
2617 && spec.order_by == *order_by
2618 && spec.filter == filter_owned
2619 && spec.direct_arg == direct_arg
2620 {
2621 return Expr::Column(spg_sql::ast::ColumnName {
2622 qualifier: None,
2623 name: format!("__agg_{i}"),
2624 });
2625 }
2626 }
2627 }
2628 }
2629 // Match aggregate FunctionCalls first — they sit outside group_by.
2630 if let Expr::FunctionCall { name, args } = e {
2631 let lower = name.to_ascii_lowercase();
2632 if is_aggregate_name(&lower) {
2633 let arg = if lower == "count_star" {
2634 None
2635 } else {
2636 args.first().cloned()
2637 };
2638 // v7.17.0 — match the spec we registered for
2639 // string_agg(value, separator) on the full pair; v7.32 also
2640 // the regression family and json_object_agg.
2641 let arg2 = if agg_uses_second_arg(&lower) {
2642 args.get(1).cloned()
2643 } else {
2644 None
2645 };
2646 // v7.17.0 — `every` collapses into `bool_and` at
2647 // collection; mirror that here so the rewrite finds
2648 // the matching synth column.
2649 let canonical: &str = if lower == "every" {
2650 "bool_and"
2651 } else {
2652 lower.as_str()
2653 };
2654 for (i, spec) in aggs.iter().enumerate() {
2655 if spec.name == canonical
2656 && spec.arg == arg
2657 && spec.arg2 == arg2
2658 && !spec.distinct
2659 && spec.order_by.is_empty()
2660 {
2661 return Expr::Column(spg_sql::ast::ColumnName {
2662 qualifier: None,
2663 name: format!("__agg_{i}"),
2664 });
2665 }
2666 }
2667 }
2668 }
2669 // Match a group_by expression by AST equality.
2670 for (i, g) in group_exprs.iter().enumerate() {
2671 if g == e {
2672 return Expr::Column(spg_sql::ast::ColumnName {
2673 qualifier: None,
2674 name: format!("__grp_{i}"),
2675 });
2676 }
2677 }
2678 // Recurse into children.
2679 match e {
2680 Expr::AggregateOrdered {
2681 call,
2682 order_by,
2683 distinct,
2684 filter,
2685 } => Expr::AggregateOrdered {
2686 call: Box::new(rewrite_expr(call, group_exprs, aggs)),
2687 distinct: *distinct,
2688 order_by: order_by
2689 .iter()
2690 .map(|o| spg_sql::ast::OrderBy {
2691 expr: rewrite_expr(&o.expr, group_exprs, aggs),
2692 desc: o.desc,
2693 nulls_first: o.nulls_first,
2694 })
2695 .collect(),
2696 // The filter is evaluated against SOURCE rows during
2697 // accumulation, never against synth rows — keep it as-is.
2698 filter: filter.clone(),
2699 },
2700 Expr::Binary { lhs, op, rhs } => Expr::Binary {
2701 lhs: Box::new(rewrite_expr(lhs, group_exprs, aggs)),
2702 op: *op,
2703 rhs: Box::new(rewrite_expr(rhs, group_exprs, aggs)),
2704 },
2705 Expr::Unary { op, expr } => Expr::Unary {
2706 op: *op,
2707 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2708 },
2709 Expr::Cast { expr, target } => Expr::Cast {
2710 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2711 target: *target,
2712 },
2713 Expr::IsNull { expr, negated } => Expr::IsNull {
2714 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2715 negated: *negated,
2716 },
2717 Expr::FunctionCall { name, args } => Expr::FunctionCall {
2718 name: name.clone(),
2719 args: args
2720 .iter()
2721 .map(|a| rewrite_expr(a, group_exprs, aggs))
2722 .collect(),
2723 },
2724 Expr::Like {
2725 expr,
2726 pattern,
2727 negated,
2728 case_insensitive,
2729 } => Expr::Like {
2730 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2731 pattern: Box::new(rewrite_expr(pattern, group_exprs, aggs)),
2732 negated: *negated,
2733 case_insensitive: *case_insensitive,
2734 },
2735 Expr::Extract { field, source } => Expr::Extract {
2736 field: *field,
2737 source: Box::new(rewrite_expr(source, group_exprs, aggs)),
2738 },
2739 // v7.25.2 (round-19 A) — subquery nodes: rewrite group-key
2740 // references INSIDE the body to `__grp_N` so the correlated
2741 // resolver can substitute them against the synthesised group
2742 // row (aggs are NOT matched inside the body — a COUNT in the
2743 // subquery is the subquery's own aggregate).
2744 Expr::ScalarSubquery(s) => {
2745 Expr::ScalarSubquery(Box::new(rewrite_group_keys_in_select(s, group_exprs)))
2746 }
2747 Expr::Exists { subquery, negated } => Expr::Exists {
2748 subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
2749 negated: *negated,
2750 },
2751 Expr::InSubquery {
2752 expr,
2753 subquery,
2754 negated,
2755 } => Expr::InSubquery {
2756 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2757 subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
2758 negated: *negated,
2759 },
2760 // v4.12 window / Literal / Column — clone-pass (these don't
2761 // participate in aggregate rewrite).
2762 Expr::WindowFunction { .. } | Expr::Literal(_) | Expr::Placeholder(_) | Expr::Column(_) => {
2763 e.clone()
2764 }
2765 // v7.10.10 — recurse children for array nodes.
2766 Expr::Array(items) => Expr::Array(
2767 items
2768 .iter()
2769 .map(|elem| rewrite_expr(elem, group_exprs, aggs))
2770 .collect(),
2771 ),
2772 Expr::ArraySubscript { target, index } => Expr::ArraySubscript {
2773 target: Box::new(rewrite_expr(target, group_exprs, aggs)),
2774 index: Box::new(rewrite_expr(index, group_exprs, aggs)),
2775 },
2776 Expr::AnyAll {
2777 expr,
2778 op,
2779 array,
2780 is_any,
2781 } => Expr::AnyAll {
2782 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2783 op: *op,
2784 array: Box::new(rewrite_expr(array, group_exprs, aggs)),
2785 is_any: *is_any,
2786 },
2787 Expr::InList {
2788 expr,
2789 list,
2790 negated,
2791 } => Expr::InList {
2792 expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2793 list: list
2794 .iter()
2795 .map(|item| rewrite_expr(item, group_exprs, aggs))
2796 .collect(),
2797 negated: *negated,
2798 },
2799 Expr::Case {
2800 operand,
2801 branches,
2802 else_branch,
2803 } => Expr::Case {
2804 operand: operand
2805 .as_deref()
2806 .map(|o| Box::new(rewrite_expr(o, group_exprs, aggs))),
2807 branches: branches
2808 .iter()
2809 .map(|(w, t)| {
2810 (
2811 rewrite_expr(w, group_exprs, aggs),
2812 rewrite_expr(t, group_exprs, aggs),
2813 )
2814 })
2815 .collect(),
2816 else_branch: else_branch
2817 .as_deref()
2818 .map(|e| Box::new(rewrite_expr(e, group_exprs, aggs))),
2819 },
2820 }
2821}
2822
2823/// v7.25.2 (round-19 A) — rewrite group-key references inside a
2824/// subquery body to `__grp_N` synthetic columns (aggregates are
2825/// not touched: empty spec list). Runs through the canonical
2826/// Select walker so every expression slot is covered.
2827fn rewrite_group_keys_in_select(
2828 s: &spg_sql::ast::SelectStatement,
2829 group_exprs: &[Expr],
2830) -> spg_sql::ast::SelectStatement {
2831 let mut out = s.clone();
2832 let _ = crate::walk_select_exprs_mut(&mut out, &mut |e| {
2833 *e = rewrite_expr(e, group_exprs, &[]);
2834 Ok(())
2835 });
2836 out
2837}
2838
2839/// Canonical string key for a tuple of group values. Used as map key.
2840/// Per-value group-key encoding (shared by owned and borrowed paths).
2841fn encode_one(out: &mut String, v: &Value) {
2842 use core::fmt::Write;
2843 match v {
2844 Value::Null => out.push_str("N|"),
2845 // v7.36 (perf — mailrs Phase 1) — switch the integer / float
2846 // encoders to `write!`. `n.to_string()` allocates a fresh
2847 // `String` per cell just to push its bytes into the
2848 // (already-cleared) reuse buffer — for the 25 k-row JOIN
2849 // probe in `count_messages` that's 25 k heap allocs per
2850 // query. `write!(&mut String, ...)` formats straight into
2851 // the buffer; no intermediate alloc.
2852 Value::SmallInt(n) => {
2853 let _ = write!(out, "s{n}|");
2854 }
2855 Value::Int(n) => {
2856 let _ = write!(out, "I{n}|");
2857 }
2858 Value::BigInt(n) => {
2859 let _ = write!(out, "B{n}|");
2860 }
2861 Value::Float(x) => {
2862 let _ = write!(out, "F{x}|");
2863 }
2864 Value::Bool(b) => {
2865 out.push(if *b { 'T' } else { 'f' });
2866 out.push('|');
2867 }
2868 Value::Text(s) => {
2869 out.push('S');
2870 out.push_str(s);
2871 out.push('|');
2872 }
2873 Value::Vector(v) => {
2874 out.push('V');
2875 for x in v {
2876 out.push_str(&x.to_string());
2877 out.push(',');
2878 }
2879 out.push('|');
2880 }
2881 // v6.0.1: GROUP BY on a `VECTOR(N) USING SQ8` column.
2882 // Two cells with byte-identical `(min, max, bytes)`
2883 // share the same group; equivalence is byte-equality
2884 // (same as f32 grouping today — neither path tries to
2885 // normalise nan/-0).
2886 Value::Sq8Vector(q) => {
2887 out.push('Q');
2888 out.push_str(&q.min.to_string());
2889 out.push('@');
2890 out.push_str(&q.max.to_string());
2891 out.push(':');
2892 for b in &q.bytes {
2893 out.push_str(&b.to_string());
2894 out.push(',');
2895 }
2896 out.push('|');
2897 }
2898 // v6.0.3: GROUP BY on a `VECTOR(N) USING HALF` column.
2899 // Byte-equality over the raw u16 bits; matches the SQ8
2900 // path's byte-key model.
2901 Value::HalfVector(h) => {
2902 out.push('H');
2903 for b in &h.bytes {
2904 out.push_str(&b.to_string());
2905 out.push(',');
2906 }
2907 out.push('|');
2908 }
2909 Value::Numeric { scaled, scale } => {
2910 out.push('D');
2911 out.push_str(&scaled.to_string());
2912 out.push('@');
2913 out.push_str(&scale.to_string());
2914 out.push('|');
2915 }
2916 Value::Date(d) => {
2917 out.push('d');
2918 out.push_str(&d.to_string());
2919 out.push('|');
2920 }
2921 Value::Timestamp(t) => {
2922 out.push('t');
2923 out.push_str(&t.to_string());
2924 out.push('|');
2925 }
2926 Value::Interval { months, micros } => {
2927 out.push('i');
2928 out.push_str(&months.to_string());
2929 out.push('m');
2930 out.push_str(µs.to_string());
2931 out.push('|');
2932 }
2933 Value::Json(s) => {
2934 out.push('j');
2935 out.push_str(s);
2936 out.push('|');
2937 }
2938 // v7.5.0 — Value is #[non_exhaustive] for downstream
2939 // forward-compat. Any future variant lacking explicit
2940 // handling here will share a debug-derived group key,
2941 // which is observably wrong but won't crash.
2942 _ => {
2943 out.push('?');
2944 out.push_str(&format!("{v:?}"));
2945 out.push('|');
2946 }
2947 }
2948}
2949
2950/// v7.30 (perf campaign) - encode from borrowed cells without
2951/// materialising an owned Vec<Value> first.
2952pub(crate) fn encode_key_refs(vals: &[&Value]) -> String {
2953 let mut out = String::new();
2954 for v in vals {
2955 encode_one(&mut out, v);
2956 }
2957 out
2958}
2959
2960/// v7.31 (perf 3e) — encode into a caller-owned scratch buffer.
2961/// The per-row key paths (group hash, DISTINCT set, join build/
2962/// probe) ran 24k+ String allocations per query through the
2963/// allocator just to LOOK UP a map; the scratch form allocates
2964/// only when a map actually has to take ownership (vacant insert).
2965pub(crate) fn encode_key_refs_into(vals: &[&Value], out: &mut String) {
2966 out.clear();
2967 for v in vals {
2968 encode_one(out, v);
2969 }
2970}
2971
2972pub(crate) fn encode_key(vals: &[Value]) -> String {
2973 let mut out = String::new();
2974 for v in vals {
2975 encode_one(&mut out, v);
2976 }
2977 out
2978}
2979
2980#[allow(clippy::cast_precision_loss)]
2981fn value_cmp(a: &Value, b: &Value) -> core::cmp::Ordering {
2982 use core::cmp::Ordering::Equal;
2983 match (a, b) {
2984 (Value::Null, Value::Null) => Equal,
2985 (Value::Null, _) => core::cmp::Ordering::Greater, // NULLs last
2986 (_, Value::Null) => core::cmp::Ordering::Less,
2987 (Value::Int(x), Value::Int(y)) => x.cmp(y),
2988 (Value::BigInt(x), Value::BigInt(y)) => x.cmp(y),
2989 (Value::Int(x), Value::BigInt(y)) => i64::from(*x).cmp(y),
2990 (Value::BigInt(x), Value::Int(y)) => x.cmp(&i64::from(*y)),
2991 (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(Equal),
2992 (Value::Int(x), Value::Float(y)) => f64::from(*x).partial_cmp(y).unwrap_or(Equal),
2993 (Value::Float(x), Value::Int(y)) => x.partial_cmp(&f64::from(*y)).unwrap_or(Equal),
2994 (Value::BigInt(x), Value::Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Equal),
2995 (Value::Float(x), Value::BigInt(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Equal),
2996 (Value::Text(x), Value::Text(y)) => x.cmp(y),
2997 (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
2998 _ => Equal,
2999 }
3000}