Skip to main content

spg_engine/
aggregate.rs

1//! Aggregate executor.
2//!
3//! Handles `SELECT … <aggs> … [GROUP BY …]` queries. The planning strategy
4//! is straightforward:
5//!
//! 1. Walk the SELECT, ORDER BY and HAVING expressions to find every
//!    aggregate function call. Dedupe by AST equality and assign each
//!    `__agg_<i>`.
8//! 2. Same for every `GROUP BY` expression: assign `__grp_<j>`.
9//! 3. Stream the WHERE-filtered rows, group by the tuple of GROUP BY
10//!    values, and update per-group aggregate state.
11//! 4. Materialise a synthetic per-group row containing
12//!    `[__grp_0..__grp_K, __agg_0..__agg_N]` and rewrite the user's
13//!    SELECT / ORDER BY expressions to reference those synthetic columns
14//!    instead of the originals.
15//! 5. Evaluate the rewritten expressions against the synthetic schema and
16//!    emit results.
17//!
18//! v1.8 implements `count(*)`, `count(expr)`, `sum`, `min`, `max`, `avg`.
19//! NULL semantics follow PG: aggregates skip NULL inputs (except
20//! `count(*)`, which counts rows). `sum(int)` widens to `BigInt`;
21//! `avg(int|bigint)` returns `Float`.
22
23use alloc::borrow::Cow;
24use alloc::boxed::Box;
25use alloc::collections::BTreeSet;
26use alloc::format;
27use alloc::string::{String, ToString};
28use alloc::vec::Vec;
29
30use spg_sql::ast::{Expr, SelectItem, SelectStatement};
31use spg_storage::{ColumnSchema, DataType, Row, Value};
32
33use crate::eval::{self, EvalContext, EvalError};
34use crate::join::RowRef;
35
36/// True if this statement should go through the aggregate path.
37pub fn uses_aggregate(stmt: &SelectStatement) -> bool {
38    if stmt.group_by.is_some() || stmt.having.is_some() {
39        return true;
40    }
41    for item in &stmt.items {
42        if let SelectItem::Expr { expr, .. } = item
43            && contains_aggregate(expr)
44        {
45            return true;
46        }
47    }
48    for o in &stmt.order_by {
49        if contains_aggregate(&o.expr) {
50            return true;
51        }
52    }
53    if let Some(h) = &stmt.having
54        && contains_aggregate(h)
55    {
56        return true;
57    }
58    false
59}
60
61pub fn contains_aggregate(e: &Expr) -> bool {
62    match e {
63        Expr::FunctionCall { name, args } => {
64            is_aggregate_name(name) || args.iter().any(contains_aggregate)
65        }
66        Expr::AggregateOrdered { .. } => true,
67        Expr::Binary { lhs, rhs, .. } => contains_aggregate(lhs) || contains_aggregate(rhs),
68        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
69            contains_aggregate(expr)
70        }
71        Expr::Like { expr, pattern, .. } => contains_aggregate(expr) || contains_aggregate(pattern),
72        Expr::Extract { source, .. } => contains_aggregate(source),
73        // v4.10 subqueries + v4.12 window functions / Literal /
74        // Column — all non-aggregate leaves from the regular
75        // aggregate planner's POV. Window-bearing projections are
76        // routed to exec_select_with_window before this runs.
77        Expr::ScalarSubquery(_)
78        | Expr::Exists { .. }
79        | Expr::InSubquery { .. }
80        | Expr::WindowFunction { .. }
81        | Expr::Literal(_)
82        | Expr::Placeholder(_)
83        | Expr::Column(_) => false,
84        // v7.10.10 — recurse into array constructor / subscript /
85        // ANY/ALL children. Aggregates inside `ARRAY[SUM(x)]` are
86        // valid PG and must be detected here.
87        Expr::Array(items) => items.iter().any(contains_aggregate),
88        Expr::ArraySubscript { target, index } => {
89            contains_aggregate(target) || contains_aggregate(index)
90        }
91        Expr::AnyAll { expr, array, .. } => contains_aggregate(expr) || contains_aggregate(array),
92        Expr::InList { expr, list, .. } => {
93            contains_aggregate(expr) || list.iter().any(contains_aggregate)
94        }
95        // v7.13.0 — CASE WHEN … END. Recurse into operand,
96        // every (WHEN, THEN) pair, and the ELSE branch.
97        Expr::Case {
98            operand,
99            branches,
100            else_branch,
101        } => {
102            operand.as_deref().is_some_and(contains_aggregate)
103                || branches
104                    .iter()
105                    .any(|(w, t)| contains_aggregate(w) || contains_aggregate(t))
106                || else_branch.as_deref().is_some_and(contains_aggregate)
107        }
108    }
109}
110
/// Reports whether `name` is one of the aggregate function names this
/// module recognises. The comparison is ASCII-case-insensitive.
///
/// The name is compared against a constant table with
/// `eq_ignore_ascii_case` instead of being lowercased first, so the check
/// never allocates — the same approach `is_ordered_set_name` and
/// `is_hypothetical_set_name` take. Every table entry is lowercase ASCII,
/// which makes this equivalent to matching the lowercased name.
pub fn is_aggregate_name(name: &str) -> bool {
    const AGGREGATE_NAMES: &[&str] = &[
        "count",
        "count_star",
        "sum",
        "min",
        "max",
        "avg",
        // v7.17.0 — variadic / collection aggregates. ORM reports
        // (Hibernate / Rails / Django) emit these in GROUP BY rollups;
        // pre-7.17 SPG hit "unknown aggregate".
        "string_agg",
        "array_agg",
        // v7.17.0 — boolean aggregates. `every` is the SQL-standard
        // alias for `bool_and`.
        "bool_and",
        "bool_or",
        "every",
        // v7.32 (round-29) — statistical aggregates (every BI /
        // dashboard emits these in rollups).
        "stddev",
        "stddev_samp",
        "stddev_pop",
        "variance",
        "var_samp",
        "var_pop",
        // v7.32 (round-29) — bitwise aggregates.
        "bit_and",
        "bit_or",
        "bit_xor",
        // v7.32 (round-29) — ordered-set aggregates (used with
        // `WITHIN GROUP (ORDER BY …)`).
        "percentile_cont",
        "percentile_disc",
        "mode",
        // v7.32 (round-29) — hypothetical-set aggregates (also
        // `WITHIN GROUP`): the rank the direct args WOULD have.
        "rank",
        "dense_rank",
        "percent_rank",
        "cume_dist",
        // v7.32 (round-29) — two-argument regression family.
        "covar_pop",
        "covar_samp",
        "corr",
        "regr_count",
        "regr_avgx",
        "regr_avgy",
        "regr_slope",
        "regr_intercept",
        "regr_r2",
        "regr_sxx",
        "regr_syy",
        "regr_sxy",
        // v7.32 (round-29) — JSON aggregates.
        "json_agg",
        "jsonb_agg",
        "json_object_agg",
        "jsonb_object_agg",
    ];
    AGGREGATE_NAMES
        .iter()
        .any(|known| name.eq_ignore_ascii_case(known))
}
151
/// v7.32 (round-29) — true for the two-argument regression aggregates
/// `f(Y, X)`. The comparison is exact (case-sensitive).
fn is_regression_name(name: &str) -> bool {
    const REGRESSION_NAMES: [&str; 12] = [
        "covar_pop",
        "covar_samp",
        "corr",
        "regr_count",
        "regr_avgx",
        "regr_avgy",
        "regr_slope",
        "regr_intercept",
        "regr_r2",
        "regr_sxx",
        "regr_syy",
        "regr_sxy",
    ];
    REGRESSION_NAMES.contains(&name)
}
170
171/// v7.32 (round-29) — aggregates that consume a second positional
172/// argument: `string_agg(v, sep)`, the regression family `f(Y, X)`, and
173/// `json_object_agg(key, value)`.
174fn agg_uses_second_arg(name: &str) -> bool {
175    name == "string_agg"
176        || name == "json_object_agg"
177        || name == "jsonb_object_agg"
178        || is_regression_name(name)
179}
180
/// v7.32 (round-29) — true for the ordered-set aggregates
/// (`percentile_cont`, `percentile_disc`, `mode`), compared
/// ASCII-case-insensitively.
///
/// For these the value to aggregate comes from the
/// `WITHIN GROUP (ORDER BY …)` sort spec, and any in-parens arguments are
/// *direct* arguments (the percentile fraction). `mode()` takes no direct
/// argument.
pub fn is_ordered_set_name(name: &str) -> bool {
    // v7.32 — compare with `eq_ignore_ascii_case` rather than
    // `to_ascii_lowercase()`: these classifiers run in the aggregate
    // row/group loop, where the old per-call `String` allocation showed
    // up as ~16% of the inbox's aggregate path in a sampled profile (the
    // names are constant).
    name.eq_ignore_ascii_case("percentile_cont")
        || name.eq_ignore_ascii_case("percentile_disc")
        || name.eq_ignore_ascii_case("mode")
}
194
/// v7.32 (round-29) — true for the hypothetical-set aggregates (`rank`,
/// `dense_rank`, `percent_rank`, `cume_dist`), compared
/// ASCII-case-insensitively.
///
/// `rank(args) WITHIN GROUP (ORDER BY …)` and friends compute the rank the
/// hypothetical row would have. As with ordered-set aggregates, the value
/// stream comes from the sort spec and the in-parens args are direct (the
/// hypothetical row).
pub fn is_hypothetical_set_name(name: &str) -> bool {
    const HYPOTHETICAL_SET_NAMES: [&str; 4] = ["rank", "dense_rank", "percent_rank", "cume_dist"];
    for known in HYPOTHETICAL_SET_NAMES {
        if name.eq_ignore_ascii_case(known) {
            return true;
        }
    }
    false
}
204
205/// v7.32 (round-29) — every aggregate that takes its value stream from
206/// a `WITHIN GROUP (ORDER BY …)` clause (ordered-set + hypothetical-set).
207pub fn is_within_group_name(name: &str) -> bool {
208    is_ordered_set_name(name) || is_hypothetical_set_name(name)
209}
210
/// Per-aggregate running state.
///
/// One instance is created per aggregate spec for each group (the
/// accumulate step seeds groups with `AggState::default()` per spec).
/// The struct bundles the accumulators of every aggregate family; which
/// fields a given aggregate actually touches is decided by the
/// accumulate / finalize steps, not here.
#[derive(Debug, Default, Clone)]
struct AggState {
    /// Running input count. The variance / stddev family uses it as
    /// the n (see `sum_sq`).
    count: i64,
    /// Integer running sum.
    /// NOTE(review): presumably used while the inputs are integral —
    /// confirm against the accumulate step.
    sum_int: i64,
    /// Floating-point running sum; also the running sum of the
    /// variance / stddev family (see `sum_sq`).
    sum_float: f64,
    /// NOTE(review): presumably the running `min` / `max` value —
    /// confirm against the accumulate step.
    extreme: Option<Value>,
    /// NOTE(review): presumably selects `sum_float` over `sum_int` once
    /// a float input has been seen — confirm.
    use_float: bool,
    /// v7.17.0 — running collection for string_agg / array_agg.
    /// Each entry is one row's contribution (NULL preserved as
    /// `Value::Null`; string_agg's finalize step drops them, but
    /// array_agg keeps them). Pushing in insertion order matches
    /// PG behaviour when no `ORDER BY` is given inside the
    /// aggregate call.
    items: Vec<Value>,
    /// v7.25 (round-17) — per-group dedupe set for DISTINCT
    /// aggregates (encoded values; NULLs never reach it because
    /// the caller's skip runs after the per-aggregate NULL rules).
    seen: BTreeSet<String>,
    /// v7.24 (round-16 A) — per-item ORDER BY key tuples, parallel
    /// to `items` (pushed under the same skip/keep conditions).
    /// Empty when the aggregate carries no internal ordering.
    item_keys: Vec<Vec<Value>>,
    /// v7.17.0 — captured separator for string_agg. PG accepts a
    /// non-constant separator expression but in practice every
    /// caller passes a literal; the engine snapshots the last
    /// non-NULL text it sees, which matches PG's "use the latest
    /// row's value" behaviour.
    separator: Option<String>,
    /// v7.17.0 — running boolean accumulator for bool_and /
    /// bool_or / every. `None` until the first non-NULL input;
    /// at finalize None → SQL NULL.
    bool_acc: Option<bool>,
    /// v7.32 (round-29) — sum of squares for the variance / stddev
    /// family (`sum_float` carries the running sum; `count` the n).
    sum_sq: f64,
    /// v7.32 (round-29) — running accumulator for bit_and / bit_or /
    /// bit_xor. `None` until the first non-NULL input → SQL NULL.
    bit_acc: Option<i64>,
    /// v7.32 (round-29) — two-argument regression family
    /// (`covar_*` / `corr` / `regr_*`), PG arg order `f(Y, X)`. Only
    /// rows where BOTH inputs are non-NULL contribute (`count` is the
    /// paired n, independent of the single-arg `sum_*`).
    /// NOTE(review): this struct also declares `reg_n`, which looks
    /// like the paired n; the "`count`" wording above may be stale —
    /// confirm against the accumulate step.
    reg_n: i64,
    /// Regression running sum.
    /// NOTE(review): presumably Σx, going by the field name — confirm.
    reg_sx: f64,
    /// Regression running sum.
    /// NOTE(review): presumably Σy — confirm.
    reg_sy: f64,
    /// Regression running sum.
    /// NOTE(review): presumably Σx² — confirm.
    reg_sxx: f64,
    /// Regression running sum.
    /// NOTE(review): presumably Σy² — confirm.
    reg_syy: f64,
    /// Regression running sum.
    /// NOTE(review): presumably Σxy — confirm.
    reg_sxy: f64,
    /// v7.32 (round-29) — second value stream for `json_object_agg`
    /// (`items` holds the keys, `aux_items` the values).
    aux_items: Vec<Value>,
    /// v7.33 (array_agg argmax) — for a `first_ordered` spec
    /// (`(array_agg(x ORDER BY y))[1]`), the running first-by-order
    /// (sort-key tuple, value). Replaced only when a new row's key sorts
    /// strictly before the current best (ties keep the earliest row, =
    /// the stable-sort `[1]`). No items/item_keys array is built.
    first_best: Option<(Vec<Value>, Value)>,
}
270
/// Description of one distinct aggregate call found in the statement:
/// which function it is, its argument expressions, and its modifiers
/// (DISTINCT, internal ORDER BY, FILTER, direct argument).
#[derive(Debug, Clone)]
struct AggSpec {
    name: String, // lowercased
    /// First argument (value expression) for every aggregate
    /// except `count(*)`. `None` for `count_star`.
    arg: Option<Expr>,
    /// v7.17.0 — second argument, introduced for
    /// `string_agg(value, sep)`. `None` for single-argument aggregates
    /// such as `array_agg`. Carried in the spec so per-row evaluation
    /// can re-use the same expression across calls.
    /// NOTE(review): `agg_uses_second_arg` also lists the regression
    /// family and `json_object_agg` / `jsonb_object_agg` as consuming a
    /// second positional argument; presumably they store it here too —
    /// confirm against the collector.
    arg2: Option<Expr>,
    /// v7.25 (round-17) — `COUNT(DISTINCT x)` & friends: dedupe
    /// the input stream per group before accumulation.
    distinct: bool,
    /// v7.24 (round-16 A) — aggregate-internal ORDER BY keys
    /// (`array_agg(x ORDER BY y DESC NULLS LAST)`). Empty for the
    /// plain form. Only the collection aggregates honour it;
    /// other aggregates are order-insensitive and ignore it (PG
    /// accepts the syntax everywhere too).
    /// `validate_within_group` also requires exactly one key here for
    /// the WITHIN GROUP aggregates.
    order_by: Vec<spg_sql::ast::OrderBy>,
    /// v7.32 (round-29) — `FILTER (WHERE cond)`: a per-row predicate
    /// evaluated against the source row before accumulation. A row
    /// whose `cond` is not TRUE (false or NULL) is excluded from this
    /// aggregate only. `None` for the unfiltered form.
    filter: Option<Expr>,
    /// v7.32 (round-29) — WITHIN GROUP aggregates only: the *direct*
    /// argument (the percentile fraction for `percentile_cont/disc`,
    /// the hypothetical value for the hypothetical-set aggregates).
    /// PG requires it constant, so it is evaluated once.
    /// `validate_within_group` rejects a WITHIN GROUP aggregate other
    /// than `mode()` that lacks it. `None` for `mode()` and for every
    /// aggregate that is not a WITHIN GROUP aggregate.
    direct_arg: Option<Expr>,
    /// v7.33 (array_agg argmax) — set when this spec came from
    /// `(array_agg(x ORDER BY y))[1]`: accumulate only the first-by-order
    /// element (a running argmax/argmin) and finalise to that scalar
    /// value, instead of collecting + sorting + materialising the whole
    /// per-group array just to take element 1. Returns the element type,
    /// not the array type.
    first_ordered: bool,
}
310
/// Output of running the aggregate path. Schema describes one row per
/// group. When the statement has an ORDER BY, `run` has already sorted
/// the rows by it; applying LIMIT / OFFSET is left to the caller.
/// NOTE(review): with a literal LIMIT, `run` passes a keep-count to the
/// sort, so `rows` may presumably already be cut to LIMIT+OFFSET —
/// confirm against `sort_synth_by_order_by`.
#[derive(Debug)]
pub struct AggResult {
    /// Output column schema, one entry per projected column.
    pub columns: Vec<ColumnSchema>,
    /// Projected output rows, one per surviving group.
    pub rows: Vec<Row>,
    /// v7.31 (perf — PG lesson #1, post-LIMIT subquery projection):
    /// select-list items whose rewritten expr carries a subquery and
    /// is referenced by neither ORDER BY nor HAVING. Their output
    /// cells hold NULL placeholders; the caller truncates to
    /// LIMIT+OFFSET first and only then evaluates these for the
    /// surviving rows (PG runs the same shape with SubPlan loops=50
    /// instead of loops=24000). `(output_col, rewritten_expr)`.
    pub deferred: Vec<(usize, Expr)>,
    /// Synthetic group rows aligned 1:1 with `rows`; populated only
    /// when `deferred` is non-empty.
    pub synth_rows: Vec<Row>,
    /// Schema the deferred exprs evaluate against. `run` leaves it empty
    /// when `deferred` is empty.
    pub synth_schema: Vec<ColumnSchema>,
}
331
332/// Execute aggregate logic against an already-WHERE-filtered iterator of
333/// rows. `table_alias` is the alias accepted by column resolution.
334#[allow(clippy::too_many_lines)]
335/// v7.25.2 (round-19 A) — caller-injected evaluator for synth-row
336/// expressions that still carry subquery nodes after the rewrite
337/// (correlated subqueries in the select list / HAVING / aggregate
338/// ORDER BY of a GROUP BY query). The engine passes its
339/// correlated-aware evaluator; pure-library callers pass None and
340/// surviving subqueries keep erroring loudly.
341pub type CorrelatedEval<'a> = &'a dyn Fn(&Expr, &Row, &EvalContext<'_>) -> Result<Value, EvalError>;
342
/// Output of the per-group projection stage (`project_groups`): the
/// output schema, the projected rows, the synth rows kept alongside
/// them for post-LIMIT deferred evaluation, the deferred subquery
/// items, and the rewritten ORDER BY exprs (shared with the sort).
struct Projection {
    /// Output column schema.
    columns: Vec<ColumnSchema>,
    /// Projected output rows.
    out_rows: Vec<Row>,
    /// Synthetic group rows kept alongside `out_rows` for post-LIMIT
    /// deferred evaluation; `run` sorts them together with `out_rows`.
    kept_synth: Vec<Row>,
    /// Deferred subquery items. `run` forwards them unchanged into
    /// `AggResult::deferred`, which describes them as
    /// `(output_col, rewritten_expr)`.
    deferred: Vec<(usize, Expr)>,
    /// Rewritten ORDER BY expressions, handed to the sort stage.
    order_rewritten: Vec<Expr>,
}
354
355/// v7.35.0 — detect the `SELECT COUNT(*) FROM … [WHERE …]` shape
356/// (single item, no GROUP BY / HAVING / ORDER BY / DISTINCT /
357/// LIMIT WITH TIES / FILTER / window). For this shape the answer
358/// is exactly `rows.len()` as `BigInt`, no group state needed.
359/// Returns `None` for any deviation so the caller's full pipeline
360/// runs verbatim.
361///
362/// v7.35.2 — also short-circuit `COUNT(<literal>)` (e.g.
363/// `COUNT(1)`) and `COUNT(<column>)` when the column is declared
364/// NOT NULL on the input schema. PG handles both cases as
365/// `COUNT(*)` (the non-null filter is a no-op), so doing the same
366/// here keeps every `count this thing` shape on the same fast path
367/// instead of routing the literal / non-null-col variants through
368/// the four-stage aggregate pipeline.
369fn try_pure_count_star_short_circuit(
370    stmt: &SelectStatement,
371    rows: &[RowRef<'_>],
372    schema_cols: &[ColumnSchema],
373    table_alias: Option<&str>,
374) -> Option<AggResult> {
375    if stmt.distinct
376        || stmt.limit_with_ties
377        || stmt.group_by.is_some()
378        || stmt.having.is_some()
379        || !stmt.order_by.is_empty()
380    {
381        return None;
382    }
383    if stmt.items.len() != 1 {
384        return None;
385    }
386    let SelectItem::Expr { expr, alias } = &stmt.items[0] else {
387        return None;
388    };
389    let Expr::FunctionCall { name, args } = expr else {
390        return None;
391    };
392    if !name.eq_ignore_ascii_case("count") && !name.eq_ignore_ascii_case("count_star") {
393        return None;
394    }
395    let count_star_shape = match args.as_slice() {
396        // `COUNT(*)` parses to `count_star` with no args.
397        [] if name.eq_ignore_ascii_case("count_star") => true,
398        // `COUNT(<literal>)` — the per-row test is "is this literal
399        // non-null?" which is constant, so it's COUNT(*) when the
400        // literal is non-null.
401        [Expr::Literal(lit)] => !matches!(lit, spg_sql::ast::Literal::Null),
402        // `COUNT(<column>)` — same answer as COUNT(*) when the
403        // column is statically declared NOT NULL on the input
404        // schema. Resolve through the alias if one is set.
405        [Expr::Column(c)] => {
406            if let Some(q) = c.qualifier.as_deref()
407                && let Some(alias) = table_alias
408                && !q.eq_ignore_ascii_case(alias)
409            {
410                return None;
411            }
412            schema_cols
413                .iter()
414                .find(|s| s.name.eq_ignore_ascii_case(&c.name))
415                .is_some_and(|s| !s.nullable)
416        }
417        _ => return None,
418    };
419    if !count_star_shape {
420        return None;
421    }
422    let col_name = alias.clone().unwrap_or_else(|| "count".to_string());
423    let count = i64::try_from(rows.len()).unwrap_or(i64::MAX);
424    Some(AggResult {
425        columns: alloc::vec![ColumnSchema::new(col_name, DataType::BigInt, false)],
426        rows: alloc::vec![Row::new(alloc::vec![Value::BigInt(count)])],
427        deferred: Vec::new(),
428        synth_rows: Vec::new(),
429        synth_schema: Vec::new(),
430    })
431}
432
433pub(crate) fn run(
434    stmt: &SelectStatement,
435    rows: &[RowRef<'_>],
436    schema_cols: &[ColumnSchema],
437    table_alias: Option<&str>,
438    correlated_eval: Option<CorrelatedEval<'_>>,
439) -> Result<AggResult, EvalError> {
440    // v7.35.0 — pure `SELECT COUNT(*) FROM … WHERE …` short-circuit.
441    // The caller already filtered rows by WHERE (we run on the
442    // post-WHERE survivor set), so for the canonical pure-COUNT(*)
443    // shape (no GROUP BY / HAVING / ORDER BY / DISTINCT / FILTER /
444    // window) the answer is simply `rows.len()`. The four-stage
445    // aggregate pipeline below (accumulate_groups → build_synth_schema
446    // → finalize_synth_rows → project_groups) collapses to a single
447    // BigInt cell when there's a single group, but each stage still
448    // pays its own allocation tax — group state map, synth schema
449    // vec, finalize loop. `exists_in_60` (mailrs prod #4 baseline)
450    // is exactly this shape on a 25 k-row JOIN.
451    if let Some(short) = try_pure_count_star_short_circuit(stmt, rows, schema_cols, table_alias) {
452        return Ok(short);
453    }
454    let group_exprs: Vec<Expr> = stmt.group_by.clone().unwrap_or_default();
455
456    // Collect aggregate sub-expressions across items + order_by.
457    let mut agg_specs: Vec<AggSpec> = Vec::new();
458    for item in &stmt.items {
459        if let SelectItem::Expr { expr, .. } = item {
460            collect_aggregates(expr, &mut agg_specs);
461        }
462    }
463    for o in &stmt.order_by {
464        collect_aggregates(&o.expr, &mut agg_specs);
465    }
466    if let Some(h) = &stmt.having {
467        collect_aggregates(h, &mut agg_specs);
468    }
469    // v7.17.0 — arity validation. The collector tolerates an
470    // arbitrary positional-arg count; here we enforce the
471    // per-aggregate contract so a malformed call (e.g.
472    // `array_agg()` or `string_agg(x)`) surfaces as a SQL error
473    // rather than silently coercing to a degenerate aggregate.
474    validate_agg_arities(stmt, &agg_specs)?;
475    validate_within_group(&agg_specs)?;
476
477    // (1) Stream the WHERE-filtered rows into insertion-ordered group state.
478    let order = accumulate_groups(
479        rows,
480        &group_exprs,
481        &agg_specs,
482        schema_cols,
483        table_alias,
484        correlated_eval,
485    )?;
486
487    // (2) Build the synthetic per-group schema and finalise each group's row.
488    let synth_schema =
489        build_synth_schema(rows, &group_exprs, &agg_specs, schema_cols, table_alias)?;
490    let synth_rows = finalize_synth_rows(
491        &order,
492        &agg_specs,
493        &synth_schema,
494        rows,
495        schema_cols,
496        table_alias,
497    )?;
498
499    // (3) Rewrite the user's expressions, filter groups by HAVING and project.
500    let Projection {
501        columns,
502        mut out_rows,
503        mut kept_synth,
504        deferred,
505        order_rewritten,
506    } = project_groups(
507        synth_rows,
508        stmt,
509        &group_exprs,
510        &agg_specs,
511        &synth_schema,
512        correlated_eval,
513    )?;
514
515    // (4) ORDER BY on the aggregated output (the caller applies LIMIT).
516    //
517    // v7.37.3 (mailrs prod /api/contacts 3.21× regression — and the
518    // general inbox-listing-shape SPG-vs-PG gap) — top-K sink for
519    // `ORDER BY <agg> [DESC] LIMIT k`. Pre-7.37.3 this stage ran a
520    // full O(N log N) sort over every surviving group, then the
521    // caller truncated to `k`. With high-cardinality GROUP BY (a
522    // sender column with hundreds-thousands of distinct values) the
523    // truncated set is a tiny fraction of `N` — keep an O(k) top-K
524    // sink and never sort the discarded majority. Matches PG /
525    // MySQL / MariaDB's standard "LIMIT k under ORDER BY agg"
526    // optimisation; SPG previously implemented it only on the
527    // streamed inner-join path (`try_streamed_inner_join_topn`)
528    // and not on the aggregate output.
529    //
530    // Gate: needs a literal LIMIT (placeholder LIMIT we can't bound
531    // statically here), no DISTINCT (would need post-dedup, can't
532    // truncate during sort), no LIMIT WITH TIES (which extends past
533    // the literal k by run-time tie-key comparison).
534    let keep_n: Option<usize> = if !stmt.order_by.is_empty()
535        && !stmt.distinct
536        && !stmt.limit_with_ties
537    {
538        stmt.limit_literal().map(|l| {
539            let off = stmt.offset_literal().unwrap_or(0) as usize;
540            (l as usize).saturating_add(off)
541        })
542    } else {
543        None
544    };
545    if !stmt.order_by.is_empty() {
546        let (sorted_synth, sorted_out) = sort_synth_by_order_by(
547            &synth_schema,
548            &stmt.order_by,
549            &order_rewritten,
550            kept_synth,
551            out_rows,
552            correlated_eval,
553            keep_n,
554        )?;
555        kept_synth = sorted_synth;
556        out_rows = sorted_out;
557    }
558
559    let (synth_rows_out, synth_schema_out) = if deferred.is_empty() {
560        (Vec::new(), Vec::new())
561    } else {
562        (kept_synth, synth_schema.clone())
563    };
564    Ok(AggResult {
565        columns,
566        rows: out_rows,
567        deferred,
568        synth_rows: synth_rows_out,
569        synth_schema: synth_schema_out,
570    })
571}
572
573/// v7.32 (round-29) — validate the structural requirements of WITHIN
574/// GROUP (ordered-set / hypothetical-set) aggregates up front, so a
575/// malformed call surfaces as a SQL error rather than a silently
576/// degenerate aggregate.
577fn validate_within_group(agg_specs: &[AggSpec]) -> Result<(), EvalError> {
578    // v7.32 (round-29) — WITHIN GROUP aggregates require the clause (PG
579    // raises a hard error otherwise rather than silently degrading), and
580    // SPG supports the single-sort-key form only.
581    for spec in agg_specs {
582        if is_within_group_name(&spec.name) {
583            if spec.order_by.is_empty() {
584                return Err(EvalError::TypeMismatch {
585                    detail: format!("{}() requires WITHIN GROUP (ORDER BY …)", spec.name),
586                });
587            }
588            // mode() is the only WITHIN GROUP aggregate with no direct
589            // argument; the rest carry one (percentile fraction /
590            // hypothetical value).
591            if spec.name != "mode" && spec.direct_arg.is_none() {
592                return Err(EvalError::TypeMismatch {
593                    detail: format!("{}() requires a direct argument", spec.name),
594                });
595            }
596            // Multi-key WITHIN GROUP (multiple sort keys / hypothetical
597            // args) is not supported yet — error loudly instead of
598            // silently using only the first key.
599            if spec.order_by.len() > 1 {
600                return Err(EvalError::TypeMismatch {
601                    detail: format!(
602                        "{}() with multiple WITHIN GROUP sort keys is not supported yet",
603                        spec.name
604                    ),
605                });
606            }
607        }
608    }
609    Ok(())
610}
611
612/// (1) Stream the WHERE-filtered rows, group by the GROUP BY value
613/// tuple, and update per-group aggregate state. Returns the groups in
614/// insertion order. See `run` for the bind-once fast path rationale.
615#[allow(clippy::too_many_lines, clippy::type_complexity)]
616fn accumulate_groups(
617    rows: &[RowRef<'_>],
618    group_exprs: &[Expr],
619    agg_specs: &[AggSpec],
620    schema_cols: &[ColumnSchema],
621    table_alias: Option<&str>,
622    correlated_eval: Option<CorrelatedEval<'_>>,
623) -> Result<Vec<(Vec<Value>, Vec<AggState>)>, EvalError> {
624    let ctx = EvalContext::new(schema_cols, table_alias);
625    // Map group key (vec of values, encoded as canonical string) -> group state.
626    // v7.32 (architecture v2, P2b) — insertion-ordered group state in
627    // a Vec; the hash map only maps key → index. Removes the parallel
628    // `key_order: Vec<String>` (a second per-group key clone) and the
629    // per-group re-probe `groups[k]` at finalize (24k hash lookups for
630    // the inbox shape). The map owns its key once on vacant insert.
631    let mut order: Vec<(Vec<Value>, Vec<AggState>)> = Vec::new();
632    let mut groups: hashbrown::HashMap<String, usize> = hashbrown::HashMap::new();
633    // When there are no GROUP BY exprs *and* there is at least one aggregate,
634    // every row collapses into a single anonymous group keyed by "".
635    if rows.is_empty() && group_exprs.is_empty() {
636        // Single empty-aggregate group: count=0, sum=0, max=NULL, etc.
637        // No rows follow, so the map is never probed — seed `order` only.
638        let init: Vec<AggState> = (0..agg_specs.len()).map(|_| AggState::default()).collect();
639        order.push((Vec::new(), init));
640    }
641
642    // v7.30 (perf campaign) - hoist the per-row work that doesn't
643    // depend on the row: which group exprs need collation folding
644    // (none, for most queries - the old code cloned the whole
645    // group_vals vec per row just in case).
646    // v7.30 (perf campaign) - the no-tax row loop. When a group
647    // expr or an aggregate argument is a bare column reference
648    // (the overwhelmingly common shape), bind its position ONCE
649    // and read row cells by offset in the loop - no per-row tree
650    // walk, no owned-Value clone out of resolve_column. Anything
651    // more complex keeps the eval path.
652    let col_pos = |e: &Expr| -> Option<usize> {
653        // Qualified references only: the bare-name resolver carries
654        // alias/ambiguity logic the bind-once path must not fork.
655        if let Expr::Column(c) = e
656            && c.qualifier.is_some()
657        {
658            eval::find_column_pos(c, &ctx)
659        } else {
660            None
661        }
662    };
663    let group_pos: Vec<Option<usize>> = group_exprs.iter().map(col_pos).collect();
664    let all_groups_bound = group_pos.iter().all(Option::is_some);
665    let arg_pos: Vec<Option<usize>> = agg_specs
666        .iter()
667        .map(|spec| spec.arg.as_ref().and_then(|e| col_pos(e)))
668        .collect();
669    // v7.36 (perf — mailrs Ask 1 SUM(LENGTH(text_body)) 18ms → ?) —
670    // pre-compile every aggregate arg that's a `fully_compilable`
671    // PURE expression over bound columns. Without this, `LENGTH(col)`
672    // / `COALESCE(col, '')` / `CAST(col AS BIGINT)` etc. ALL fell
673    // through to the `(None, Some(e)) => eval_arg(e, mat, ...)` slow
674    // path that materialises a Cow<Row> per input row — for a 25k-row
675    // JOIN that's 25k full-row clones for one column read. The Step
676    // VM (`eval_compiled_ref`) reads columns by RowRef::get and runs
677    // the same `apply_function` dispatcher with zero materialisation.
678    let arg_compiled: Vec<Option<eval::CompiledExpr>> = agg_specs
679        .iter()
680        .enumerate()
681        .map(|(i, spec)| match (&arg_pos[i], &spec.arg) {
682            (Some(_), _) => None,
683            (None, Some(e)) if eval::fully_compilable(e) => Some(eval::compile_expr(e, &ctx)),
684            _ => None,
685        })
686        .collect();
687    // v7.33 (array_agg perf) — bound positions for each spec's internal
688    // ORDER BY keys, so an ordered aggregate (`array_agg(x ORDER BY y)`)
689    // reads the sort key by reference (RowRef::get) instead of
690    // materialising the whole combined join row per input row just to
691    // eval one bound column. Mirrors arg_pos. On the inbox shape this
692    // turned 24k full-row (~1 KB each) clones into 24k single-cell reads.
693    let order_pos: Vec<Vec<Option<usize>>> = agg_specs
694        .iter()
695        .map(|spec| spec.order_by.iter().map(|o| col_pos(&o.expr)).collect())
696        .collect();
697    // Does any spec need the fully-materialised row in the bound fast
698    // path — a FILTER, a non-bound value arg, a second arg, or a non-bound
699    // ORDER key? When false (every aggregate arg/key is a bound column —
700    // the inbox shape) the bound fast path never materialises a row.
701    let needs_mat = agg_specs.iter().enumerate().any(|(i, s)| {
702        s.filter.is_some()
703            || (s.arg.is_some() && arg_pos[i].is_none() && arg_compiled[i].is_none())
704            || s.arg2.is_some()
705            || order_pos[i].iter().any(Option::is_none)
706    });
707    let ci_positions: Vec<usize> = group_exprs
708        .iter()
709        .enumerate()
710        .filter(|(_, g)| {
711            matches!(
712                eval::column_collation(g, &ctx),
713                Some(spg_storage::Collation::CaseInsensitive)
714            )
715        })
716        .map(|(i, _)| i)
717        .collect();
718    // v7.31 (perf 3e) — per-row scratch buffers. The fast path used
719    // to allocate a key String (and a refs Vec) for EVERY row just
720    // to probe the group map; hits — the overwhelming case — now
721    // touch the allocator zero times.
722    let mut keybuf_s = String::new();
723    // v7.36 — reused Step VM eval stack for compiled aggregate args.
724    let mut eval_stack: Vec<Value> = Vec::new();
725    let mut dkeybuf = String::new();
726    let mut refs: Vec<&Value> = Vec::with_capacity(group_pos.len());
727    // v7.32 (round-31) — an aggregate's argument / FILTER / second arg /
728    // ORDER key may itself be a *correlated* subquery, e.g.
729    // `MAX((SELECT i.v FROM inner i WHERE i.fk = o.id))`. A non-correlated
730    // subquery is pre-resolved to a literal before this loop, but a
731    // correlated one survives as a subquery node and must be evaluated per
732    // outer row through the correlated evaluator — the same hook the
733    // select-list / HAVING / ORDER finalisers already use below. Plain
734    // `eval_expr` would hit "subquery reached row eval".
735    //
736    // The `any_agg_subquery` gate is computed once here so the common case
737    // (no subquery anywhere in the aggregate args — including every hot
738    // scan/group aggregate) short-circuits before the per-row
739    // `expr_has_subquery` walk: `eval_arg` is then exactly `eval_expr`.
740    let any_agg_subquery = correlated_eval.is_some()
741        && agg_specs.iter().any(|s| {
742            s.filter
743                .as_ref()
744                .is_some_and(|e| crate::expr_has_subquery(e))
745                || s.arg.as_ref().is_some_and(|e| crate::expr_has_subquery(e))
746                || s.arg2.as_ref().is_some_and(|e| crate::expr_has_subquery(e))
747                || s.order_by.iter().any(|o| crate::expr_has_subquery(&o.expr))
748        });
749    let eval_arg = |e: &Expr, r: &Row, c: &EvalContext<'_>| -> Result<Value, EvalError> {
750        match correlated_eval {
751            Some(f) if any_agg_subquery && crate::expr_has_subquery(e) => f(e, r, c),
752            _ => eval::eval_expr(e, r, c),
753        }
754    };
755    // v7.36 (perf — mailrs Phase 1, post u64-hash) — single
756    // anonymous group fast path. When the query has no GROUP BY
757    // (`SELECT SUM(LENGTH(col)) FROM ...`, COUNT, AVG, etc.) the
758    // whole input collapses into one group. The fast path below
759    // still pays one `groups.get("")` hash probe per row plus
760    // `entry = &mut order[0]` reindex even when the empty-key
761    // path encodes nothing — measured ~50 ns/row across 25 k rows
762    // = ~1.25 ms of pure bookkeeping on the user_storage_usage
763    // baseline.
764    //
765    // Bypass: lift `entry` outside the loop and feed every row
766    // straight into it. Same `update_state` machinery, zero
767    // per-row hash work, zero per-row index lookup.
768    let single_anon_group = group_exprs.is_empty() && !rows.is_empty();
769    if single_anon_group {
770        // Seed the single group at idx 0 once.
771        let init: Vec<AggState> = (0..agg_specs.len()).map(|_| AggState::default()).collect();
772        order.clear();
773        order.push((Vec::new(), init));
774    }
775    // v7.36 (perf — mailrs Phase 1, count_messages 2.58 → ?) —
776    // `COUNT(*)` short-circuit. For a single-anon-group `COUNT(*)`
777    // with no FILTER / DISTINCT, every survivor counts once — the
778    // answer IS `rows.len()`. Skips the 25 k iterations of
779    // `update_state("count_star", …)` on the mailrs count_messages
780    // shape; the JOIN already produced exactly the set of rows
781    // that must be counted.
782    if single_anon_group
783        && agg_specs.len() == 1
784        && agg_specs[0].name == "count_star"
785        && agg_specs[0].filter.is_none()
786        && agg_specs[0].arg.is_none()
787        && agg_specs[0].arg2.is_none()
788        && agg_specs[0].order_by.is_empty()
789        && !agg_specs[0].distinct
790    {
791        let state = &mut order[0].1[0];
792        state.count = rows.len() as i64;
793        return Ok(order);
794    }
795    // v7.36 (perf — mailrs Phase 1) — `COUNT(<bound col>)` (non-`*`)
796    // collapses to: read the cell, increment when not NULL. Skips
797    // the per-row spec dispatch + `update_state("count", …)`.
798    if single_anon_group
799        && agg_specs.len() == 1
800        && agg_specs[0].name == "count"
801        && agg_specs[0].filter.is_none()
802        && agg_specs[0].arg2.is_none()
803        && agg_specs[0].order_by.is_empty()
804        && !agg_specs[0].distinct
805        && arg_pos[0].is_some()
806    {
807        let p = arg_pos[0].unwrap();
808        let mut count: i64 = 0;
809        for row in rows {
810            if !matches!(row.get(p), Some(Value::Null) | None) {
811                count += 1;
812            }
813        }
814        let state = &mut order[0].1[0];
815        state.count = count;
816        return Ok(order);
817    }
818    // v7.36 (perf — mailrs Phase 1, user_storage_usage 7.5 → ?) —
819    // single-aggregate streaming accumulator. For
820    // `SUM(<compiled-expr>)` / `SUM(<bound col>)` with no GROUP BY,
821    // no FILTER, no arg2, no ORDER BY, no DISTINCT, the whole
822    // per-row work collapses to: eval the arg, match the Value
823    // variant, accumulate. Skips the spec-dispatch loop +
824    // `update_state` per-row name match. On a 25 k-row JOIN
825    // (user_storage_usage `SUM(LENGTH(text_body))`) that's
826    // ~50-100 ns/row of pure spec-dispatch overhead removed.
827    if single_anon_group
828        && agg_specs.len() == 1
829        && agg_specs[0].filter.is_none()
830        && agg_specs[0].arg2.is_none()
831        && agg_specs[0].order_by.is_empty()
832        && !agg_specs[0].distinct
833        && (agg_specs[0].name == "sum" || agg_specs[0].name == "avg")
834        && (arg_pos[0].is_some() || arg_compiled[0].is_some())
835    {
836        let arg_pos0 = arg_pos[0];
837        let arg_c0 = &arg_compiled[0];
838        let mut sum_int: i64 = 0;
839        let mut sum_float: f64 = 0.0;
840        let mut use_float = false;
841        let mut count: i64 = 0;
842        // Borrow-aware fast inner: avoid the per-row clone when arg
843        // is a bound column position.
844        if let Some(p) = arg_pos0 {
845            for row in rows {
846                let v_ref = row.get(p).unwrap_or(&Value::Null);
847                match v_ref {
848                    Value::Null => continue,
849                    Value::SmallInt(n) => {
850                        sum_int += i64::from(*n);
851                        count += 1;
852                    }
853                    Value::Int(n) => {
854                        sum_int += i64::from(*n);
855                        count += 1;
856                    }
857                    Value::BigInt(n) => {
858                        sum_int += *n;
859                        count += 1;
860                    }
861                    Value::Float(x) => {
862                        sum_float += *x;
863                        use_float = true;
864                        count += 1;
865                    }
866                    other => {
867                        return Err(EvalError::TypeMismatch {
868                            detail: format!("sum/avg need numeric, got {:?}", other.data_type()),
869                        });
870                    }
871                }
872            }
873        } else if let Some(p) = arg_c0.as_ref().and_then(|c| c.as_single_column_length()) {
874            // v7.36 (perf — mailrs Phase 1, user_storage_usage hot
875            // inner) — `SUM(LENGTH(<text col>))` collapses to a
876            // straight scan: read the cell by ref, branch on the
877            // variant, do an ASCII probe + `len()` (or
878            // `chars().count()` on non-ASCII), accumulate. No Step
879            // VM, no stack push/pop, no `BigInt` boxing on the way
880            // out — pure i64 sum. The original Step VM path keeps
881            // running for everything outside this shape (`SUM(col)`,
882            // `SUM(expr)`, multi-step compiled args).
883            for row in rows {
884                let Some(v_ref) = row.get(p) else {
885                    continue;
886                };
887                let n = match v_ref {
888                    Value::Null => continue,
889                    Value::Text(s) => {
890                        if s.is_ascii() {
891                            s.len() as i64
892                        } else {
893                            s.chars().count() as i64
894                        }
895                    }
896                    other => {
897                        return Err(EvalError::TypeMismatch {
898                            detail: format!("length() needs text, got {:?}", other.data_type()),
899                        });
900                    }
901                };
902                sum_int += n;
903                count += 1;
904            }
905        } else {
906            let c = arg_c0.as_ref().unwrap();
907            for row in rows {
908                let v = eval::eval_compiled_ref(c, row, &ctx, &mut eval_stack)?;
909                match v {
910                    Value::Null => continue,
911                    Value::SmallInt(n) => {
912                        sum_int += i64::from(n);
913                        count += 1;
914                    }
915                    Value::Int(n) => {
916                        sum_int += i64::from(n);
917                        count += 1;
918                    }
919                    Value::BigInt(n) => {
920                        sum_int += n;
921                        count += 1;
922                    }
923                    Value::Float(x) => {
924                        sum_float += x;
925                        use_float = true;
926                        count += 1;
927                    }
928                    other => {
929                        return Err(EvalError::TypeMismatch {
930                            detail: format!("sum/avg need numeric, got {:?}", other.data_type()),
931                        });
932                    }
933                }
934            }
935        }
936        let state = &mut order[0].1[0];
937        state.count = count;
938        state.sum_int = sum_int;
939        state.sum_float = sum_float;
940        state.use_float = use_float;
941        return Ok(order);
942    }
943    for row in rows {
944        if single_anon_group {
945            let entry = &mut order[0];
946            let mat: Option<Cow<'_, Row>> = if needs_mat { Some(row.as_row()) } else { None };
947            for (i, spec) in agg_specs.iter().enumerate() {
948                if let Some(f) = &spec.filter
949                    && !matches!(
950                        eval_arg(f, mat.as_deref().expect("needs_mat for FILTER"), &ctx)?,
951                        Value::Bool(true)
952                    )
953                {
954                    continue;
955                }
956                let arg_owned: Value;
957                let arg_ref: &Value = match (&arg_pos[i], &arg_compiled[i], &spec.arg) {
958                    (Some(p), _, _) => row.get(*p).unwrap_or(&Value::Null),
959                    (None, _, None) => {
960                        arg_owned = Value::Bool(true);
961                        &arg_owned
962                    }
963                    (None, Some(c), _) => {
964                        arg_owned = eval::eval_compiled_ref(c, row, &ctx, &mut eval_stack)?;
965                        &arg_owned
966                    }
967                    (None, None, Some(e)) => {
968                        arg_owned = eval_arg(
969                            e,
970                            mat.as_deref().expect("needs_mat for non-bound arg"),
971                            &ctx,
972                        )?;
973                        &arg_owned
974                    }
975                };
976                let arg2_val = match &spec.arg2 {
977                    None => None,
978                    Some(e) => Some(eval_arg(
979                        e,
980                        mat.as_deref().expect("needs_mat for arg2"),
981                        &ctx,
982                    )?),
983                };
984                let order_keys = if spec.order_by.is_empty() {
985                    None
986                } else {
987                    let mut keys = Vec::with_capacity(spec.order_by.len());
988                    for (k, o) in spec.order_by.iter().enumerate() {
989                        let v = if let Some(p) = order_pos[i][k] {
990                            row.get(p).cloned().unwrap_or(Value::Null)
991                        } else {
992                            eval_arg(
993                                &o.expr,
994                                mat.as_deref().expect("needs_mat for ORDER key"),
995                                &ctx,
996                            )?
997                        };
998                        keys.push(v);
999                    }
1000                    Some(keys)
1001                };
1002                // v7.36 (perf — bugfix v7.36.1 candidate) — first_ordered
1003                // was missing from the single_anon_group fast path,
1004                // sending `(array_agg(x ORDER BY y))[1]` values into
1005                // `update_state(array_agg, …)` whose finalize ignored
1006                // the absent `first_best` and returned `[]`. The slow
1007                // path below has the same branch — keep them aligned.
1008                if spec.first_ordered {
1009                    if let Some(keys) = order_keys {
1010                        let st = &mut entry.1[i];
1011                        let better = match &st.first_best {
1012                            None => true,
1013                            Some((bk, _)) => {
1014                                cmp_order_keys(&spec.order_by, &keys, bk)
1015                                    == core::cmp::Ordering::Less
1016                            }
1017                        };
1018                        if better {
1019                            st.first_best = Some((keys, arg_ref.clone()));
1020                        }
1021                    }
1022                    continue;
1023                }
1024                if spec.distinct {
1025                    encode_key_refs_into(core::slice::from_ref(&arg_ref), &mut dkeybuf);
1026                    if entry.1[i].seen.contains(dkeybuf.as_str()) {
1027                        continue;
1028                    }
1029                    entry.1[i].seen.insert(dkeybuf.clone());
1030                }
1031                update_state(
1032                    &mut entry.1[i],
1033                    &spec.name,
1034                    arg_ref,
1035                    arg2_val.as_ref(),
1036                    order_keys,
1037                )?;
1038            }
1039            continue;
1040        }
1041        // Fast key: bound positions + no ci folding -> encode
1042        // straight from borrowed cells; group_vals materialise
1043        // only when the group is NEW.
1044        if all_groups_bound && ci_positions.is_empty() {
1045            refs.clear();
1046            refs.extend(
1047                group_pos
1048                    .iter()
1049                    .map(|p| row.get(p.unwrap()).unwrap_or(&Value::Null)),
1050            );
1051            encode_key_refs_into(&refs, &mut keybuf_s);
1052            let idx = match groups.get(keybuf_s.as_str()) {
1053                Some(&i) => i,
1054                None => {
1055                    let i = order.len();
1056                    let init: Vec<AggState> =
1057                        (0..agg_specs.len()).map(|_| AggState::default()).collect();
1058                    let owned: Vec<Value> = refs.iter().map(|v| (*v).clone()).collect();
1059                    order.push((owned, init));
1060                    groups.insert(keybuf_s.clone(), i);
1061                    i
1062                }
1063            };
1064            let entry = &mut order[idx];
1065            // v7.33 (array_agg perf) — materialise the combined row AT
1066            // MOST once per input row, and only when a spec actually
1067            // needs the eval path (FILTER / non-bound arg / arg2 / non-
1068            // bound ORDER key). Bound args and bound ORDER keys read
1069            // cells by reference below, so the inbox shape (all bound)
1070            // never materialises — killing the per-row ~1 KB clone that
1071            // dominated the ordered-aggregate cost.
1072            let mat: Option<Cow<'_, Row>> = if needs_mat { Some(row.as_row()) } else { None };
1073            for (i, spec) in agg_specs.iter().enumerate() {
1074                // v7.32 (round-29) — FILTER (WHERE cond): exclude rows
1075                // where cond is not TRUE before they reach this
1076                // aggregate's accumulator (and before DISTINCT dedup).
1077                if let Some(f) = &spec.filter
1078                    && !matches!(
1079                        eval_arg(f, mat.as_deref().expect("needs_mat for FILTER"), &ctx)?,
1080                        Value::Bool(true)
1081                    )
1082                {
1083                    continue;
1084                }
1085                let arg_owned: Value;
1086                let arg_ref: &Value = match (&arg_pos[i], &arg_compiled[i], &spec.arg) {
1087                    (Some(p), _, _) => row.get(*p).unwrap_or(&Value::Null),
1088                    (None, _, None) => {
1089                        arg_owned = Value::Bool(true);
1090                        &arg_owned
1091                    }
1092                    (None, Some(c), _) => {
1093                        // v7.36 — compiled-arg fast path. `eval_stack`
1094                        // is reused across rows; the Step VM never
1095                        // materialises a row for column reads.
1096                        arg_owned = eval::eval_compiled_ref(c, row, &ctx, &mut eval_stack)?;
1097                        &arg_owned
1098                    }
1099                    (None, None, Some(e)) => {
1100                        arg_owned = eval_arg(
1101                            e,
1102                            mat.as_deref().expect("needs_mat for non-bound arg"),
1103                            &ctx,
1104                        )?;
1105                        &arg_owned
1106                    }
1107                };
1108                let arg2_val = match &spec.arg2 {
1109                    None => None,
1110                    Some(e) => Some(eval_arg(
1111                        e,
1112                        mat.as_deref().expect("needs_mat for arg2"),
1113                        &ctx,
1114                    )?),
1115                };
1116                let order_keys = if spec.order_by.is_empty() {
1117                    None
1118                } else {
1119                    let mut keys = Vec::with_capacity(spec.order_by.len());
1120                    for (k, o) in spec.order_by.iter().enumerate() {
1121                        // Bound ORDER key → read the cell by reference; only
1122                        // a non-bound key falls to the materialised eval path.
1123                        keys.push(match order_pos[i][k] {
1124                            Some(p) => row.get(p).cloned().unwrap_or(Value::Null),
1125                            None => eval_arg(
1126                                &o.expr,
1127                                mat.as_deref().expect("needs_mat for non-bound ORDER key"),
1128                                &ctx,
1129                            )?,
1130                        });
1131                    }
1132                    Some(keys)
1133                };
1134                // v7.33 (array_agg argmax) — first_ordered: keep only the
1135                // running first-by-order element (strict-less replacement
1136                // = ties keep the earliest row, matching the stable-sort
1137                // `[1]`), no array build.
1138                if spec.first_ordered {
1139                    if let Some(keys) = order_keys {
1140                        let st = &mut entry.1[i];
1141                        let better = match &st.first_best {
1142                            None => true,
1143                            Some((bk, _)) => {
1144                                cmp_order_keys(&spec.order_by, &keys, bk)
1145                                    == core::cmp::Ordering::Less
1146                            }
1147                        };
1148                        if better {
1149                            st.first_best = Some((keys, arg_ref.clone()));
1150                        }
1151                    }
1152                    continue;
1153                }
1154                if spec.distinct {
1155                    encode_key_refs_into(core::slice::from_ref(&arg_ref), &mut dkeybuf);
1156                    if entry.1[i].seen.contains(dkeybuf.as_str()) {
1157                        continue;
1158                    }
1159                    entry.1[i].seen.insert(dkeybuf.clone());
1160                }
1161                update_state(
1162                    &mut entry.1[i],
1163                    &spec.name,
1164                    arg_ref,
1165                    arg2_val.as_ref(),
1166                    order_keys,
1167                )?;
1168            }
1169            continue;
1170        }
1171        // v7.32 (P4 increment 2) — eval (non-bound) path: present the
1172        // row as a borrowed Row once (Owned → zero-cost borrow; a join
1173        // tuple materialises here exactly once, never on the bound fast
1174        // path above), then the original eval loop runs unchanged.
1175        let row_materialised = row.as_row();
1176        let row: &Row = &row_materialised;
1177        let group_vals: Vec<Value> = group_exprs
1178            .iter()
1179            .map(|g| eval::eval_expr(g, row, &ctx))
1180            .collect::<Result<_, _>>()?;
1181        // v7.17.0 Phase 2.5b — case-insensitive group keying: fold
1182        // only the ci columns, and only when any exist. Display
1183        // value (`group_vals`) stays original — only the key folds.
1184        let key = if ci_positions.is_empty() {
1185            encode_key(&group_vals)
1186        } else {
1187            let mut key_vals = group_vals.clone();
1188            for &i in &ci_positions {
1189                if let Value::Text(s) = &key_vals[i] {
1190                    key_vals[i] = Value::Text(s.to_ascii_lowercase());
1191                }
1192            }
1193            encode_key(&key_vals)
1194        };
1195        // Probe by index; the map owns the key once on vacant insert.
1196        let idx = match groups.get(key.as_str()) {
1197            Some(&i) => i,
1198            None => {
1199                let i = order.len();
1200                let init: Vec<AggState> =
1201                    (0..agg_specs.len()).map(|_| AggState::default()).collect();
1202                order.push((group_vals.clone(), init));
1203                groups.insert(key, i);
1204                i
1205            }
1206        };
1207        let entry = &mut order[idx];
1208        for (i, spec) in agg_specs.iter().enumerate() {
1209            // v7.32 (round-29) — FILTER (WHERE cond): exclude rows where
1210            // cond is not TRUE before accumulation (and before DISTINCT).
1211            if let Some(f) = &spec.filter
1212                && !matches!(eval_arg(f, row, &ctx)?, Value::Bool(true))
1213            {
1214                continue;
1215            }
1216            let arg_val = match &spec.arg {
1217                None => Value::Bool(true), // count_star: sentinel non-null
1218                Some(e) => eval_arg(e, row, &ctx)?,
1219            };
1220            // v7.17.0 — `string_agg(value, separator)` evaluates the
1221            // separator per row but PG treats it as constant; we
1222            // pass the per-row value into update_state so a future
1223            // varying-separator caller still sees correct output,
1224            // even though SPG (like PG) only uses the most recent.
1225            let arg2_val = match &spec.arg2 {
1226                None => None,
1227                Some(e) => Some(eval_arg(e, row, &ctx)?),
1228            };
1229            // v7.24 (round-16 A) — aggregate-internal ORDER BY:
1230            // evaluate the key tuple against the source row.
1231            let order_keys = if spec.order_by.is_empty() {
1232                None
1233            } else {
1234                let mut keys = Vec::with_capacity(spec.order_by.len());
1235                for o in &spec.order_by {
1236                    keys.push(eval_arg(&o.expr, row, &ctx)?);
1237                }
1238                Some(keys)
1239            };
1240            // v7.33 (array_agg argmax) — first_ordered: keep the running
1241            // first-by-order element only (mirrors the bound fast path).
1242            if spec.first_ordered {
1243                if let Some(keys) = order_keys {
1244                    let st = &mut entry.1[i];
1245                    let better = match &st.first_best {
1246                        None => true,
1247                        Some((bk, _)) => {
1248                            cmp_order_keys(&spec.order_by, &keys, bk) == core::cmp::Ordering::Less
1249                        }
1250                    };
1251                    if better {
1252                        st.first_best = Some((keys, arg_val.clone()));
1253                    }
1254                }
1255                continue;
1256            }
1257            // v7.25 (round-17) — DISTINCT: drop repeated inputs
1258            // before they reach the accumulator. NULLs flow through
1259            // (each aggregate's own NULL rule applies; PG also
1260            // treats NULL as a single distinct value for array_agg).
1261            if spec.distinct {
1262                let key = encode_key(core::slice::from_ref(&arg_val));
1263                if !entry.1[i].seen.insert(key) {
1264                    continue;
1265                }
1266            }
1267            update_state(
1268                &mut entry.1[i],
1269                &spec.name,
1270                &arg_val,
1271                arg2_val.as_ref(),
1272                order_keys,
1273            )?;
1274        }
1275    }
1276    Ok(order)
1277}
1278
1279/// (2a) Build the synthetic per-group schema: `__grp_0..K` then
1280/// `__agg_0..N`. Group types are probed from the first row; aggregate
1281/// types from each spec.
1282fn build_synth_schema(
1283    rows: &[RowRef<'_>],
1284    group_exprs: &[Expr],
1285    agg_specs: &[AggSpec],
1286    schema_cols: &[ColumnSchema],
1287    table_alias: Option<&str>,
1288) -> Result<Vec<ColumnSchema>, EvalError> {
1289    let ctx = EvalContext::new(schema_cols, table_alias);
1290    // Build synthetic schema: __grp_0..K then __agg_0..N.
1291    let group_types: Vec<DataType> = if rows.is_empty() {
1292        // Use Text as a safe stand-in — empty result means schema isn't
1293        // observable. Avoids needing to evaluate group exprs on no row.
1294        group_exprs.iter().map(|_| DataType::Text).collect()
1295    } else {
1296        let probe_row = rows[0].as_row();
1297        let probe: &Row = &probe_row;
1298        group_exprs
1299            .iter()
1300            .map(|g| {
1301                eval::eval_expr(g, probe, &ctx).map(|v| v.data_type().unwrap_or(DataType::Text))
1302            })
1303            .collect::<Result<_, _>>()?
1304    };
1305    let agg_types: Vec<DataType> = agg_specs
1306        .iter()
1307        .map(|spec| infer_agg_type(spec, schema_cols))
1308        .collect();
1309    let mut synth_schema: Vec<ColumnSchema> = Vec::new();
1310    for (i, ty) in group_types.iter().enumerate() {
1311        synth_schema.push(ColumnSchema::new(format!("__grp_{i}"), *ty, true));
1312    }
1313    for (i, ty) in agg_types.iter().enumerate() {
1314        synth_schema.push(ColumnSchema::new(format!("__agg_{i}"), *ty, true));
1315    }
1316    Ok(synth_schema)
1317}
1318
1319/// (2b) Materialise one synthetic row per group (insertion order):
1320/// apply each aggregate's internal ORDER BY, then finalise the running
1321/// state into the group + aggregate cells.
1322/// v7.33 — compare two aggregate-internal ORDER BY key tuples under the
1323/// per-key DESC / NULLS directives. This is the exact comparator the
1324/// finalize sort uses, factored out so the `first_ordered` argmax
1325/// accumulator's "keep first" decision is provably identical to taking
1326/// element `[1]` of the fully-sorted array.
1327fn cmp_order_keys(
1328    order_by: &[spg_sql::ast::OrderBy],
1329    a: &[Value],
1330    b: &[Value],
1331) -> core::cmp::Ordering {
1332    for (k, o) in order_by.iter().enumerate() {
1333        let cmp = crate::order_by_value_cmp(o.desc, o.nulls_first, &a[k], &b[k]);
1334        if cmp != core::cmp::Ordering::Equal {
1335            return cmp;
1336        }
1337    }
1338    core::cmp::Ordering::Equal
1339}
1340
1341fn finalize_synth_rows(
1342    order: &[(Vec<Value>, Vec<AggState>)],
1343    agg_specs: &[AggSpec],
1344    synth_schema: &[ColumnSchema],
1345    rows: &[RowRef<'_>],
1346    schema_cols: &[ColumnSchema],
1347    table_alias: Option<&str>,
1348) -> Result<Vec<Row>, EvalError> {
1349    let ctx = EvalContext::new(schema_cols, table_alias);
1350    // v7.32 (round-29) — ordered-set direct arguments (the percentile
1351    // fraction) are constant per PG, so evaluate each once up front.
1352    let direct_arg_vals: Vec<Option<Value>> = agg_specs
1353        .iter()
1354        .map(|spec| match (&spec.direct_arg, rows.first()) {
1355            (Some(e), Some(r)) => eval::eval_expr(e, &r.as_row(), &ctx).map(Some),
1356            _ => Ok(None),
1357        })
1358        .collect::<Result<_, _>>()?;
1359
1360    // Materialise synthetic rows (insertion order = `order`).
1361    let mut synth_rows: Vec<Row> = Vec::new();
1362    for (gvals, states) in order {
1363        let mut values: Vec<Value> = Vec::with_capacity(synth_schema.len());
1364        values.extend(gvals.iter().cloned());
1365        for (i, st) in states.iter().enumerate() {
1366            // v7.33 (array_agg argmax) — first_ordered: the running
1367            // first-by-order value IS the result; no array build/sort.
1368            if agg_specs[i].first_ordered {
1369                values.push(
1370                    st.first_best
1371                        .as_ref()
1372                        .map_or(Value::Null, |(_, v)| v.clone()),
1373                );
1374                continue;
1375            }
1376            // v7.24 (round-16 A) — order the collected items per the
1377            // aggregate-internal ORDER BY before finalize consumes
1378            // them.
1379            let st_sorted;
1380            let st_final: &AggState =
1381                if !agg_specs[i].order_by.is_empty() && st.item_keys.len() == st.items.len() {
1382                    let mut idx: Vec<usize> = (0..st.items.len()).collect();
1383                    let ob = &agg_specs[i].order_by;
1384                    idx.sort_by(|&x, &y| cmp_order_keys(ob, &st.item_keys[x], &st.item_keys[y]));
1385                    let mut sorted = st.clone();
1386                    sorted.items = idx.iter().map(|&j| st.items[j].clone()).collect();
1387                    st_sorted = sorted;
1388                    &st_sorted
1389                } else {
1390                    st
1391                };
1392            // Ordered-set aggregates compute from the sorted items + the
1393            // direct fraction; everything else uses the running state.
1394            let v = if is_within_group_name(&agg_specs[i].name) {
1395                finalize_ordered_set(
1396                    &agg_specs[i].name,
1397                    st_final,
1398                    direct_arg_vals[i].as_ref(),
1399                    agg_specs[i].order_by.first(),
1400                )
1401            } else {
1402                finalize(&agg_specs[i].name, st_final)
1403            };
1404            values.push(v);
1405        }
1406        synth_rows.push(Row::new(values));
1407    }
1408    Ok(synth_rows)
1409}
1410
1411/// (3) Rewrite the user's SELECT items + HAVING to reference the
1412/// synthetic columns, filter groups by HAVING, and project each
1413/// surviving group into an output row. The synth rows ride alongside
1414/// (`kept_synth`) so post-LIMIT deferred subqueries can evaluate later.
1415#[allow(clippy::too_many_lines)]
1416fn project_groups(
1417    synth_rows: Vec<Row>,
1418    stmt: &SelectStatement,
1419    group_exprs: &[Expr],
1420    agg_specs: &[AggSpec],
1421    synth_schema: &[ColumnSchema],
1422    correlated_eval: Option<CorrelatedEval<'_>>,
1423) -> Result<Projection, EvalError> {
1424    // Rewrite the user's SELECT items + ORDER BY to reference synthetic
1425    // columns. After rewriting, every remaining `Expr::Column` must
1426    // resolve against the synthetic schema (i.e. must have been a GROUP
1427    // BY expression).
1428    let columns: Vec<ColumnSchema> = stmt
1429        .items
1430        .iter()
1431        .map(|item| match item {
1432            SelectItem::Wildcard => Err(EvalError::TypeMismatch {
1433                detail: "SELECT * with aggregates is not supported".into(),
1434            }),
1435            SelectItem::Expr { expr, alias } => {
1436                let rewritten = rewrite_expr(expr, group_exprs, agg_specs);
1437                let name = alias.clone().unwrap_or_else(|| expr.to_string());
1438                Ok(ColumnSchema::new(
1439                    name,
1440                    agg_or_group_type(&rewritten, synth_schema),
1441                    true,
1442                ))
1443            }
1444        })
1445        .collect::<Result<_, _>>()?;
1446
1447    // Project per synthetic row. HAVING filters out groups *before*
1448    // we keep the projected row — same semantics as PG: HAVING runs
1449    // against the aggregated row (so `HAVING count(*) > 1` works) and
1450    // sees only group-by'd columns plus aggregate values.
1451    let synth_ctx = EvalContext::new(synth_schema, None);
1452    let having_rewritten = stmt
1453        .having
1454        .as_ref()
1455        .map(|h| rewrite_expr(h, group_exprs, agg_specs));
1456    // v7.30 (phase 3e-1) - rewrite SELECT items ONCE. This ran per
1457    // GROUP (23.5k x 9 items of AST cloning = ~48% of the inbox
1458    // query in sampled stacks); the rewrite is group-independent.
1459    // Stable addresses also let the per-expression subquery plans
1460    // (v7.29 3c) hit across groups instead of rebuilding.
1461    let items_rewritten: alloc::vec::Vec<Option<Expr>> = stmt
1462        .items
1463        .iter()
1464        .map(|item| match item {
1465            SelectItem::Expr { expr, .. } => Some(rewrite_expr(expr, group_exprs, agg_specs)),
1466            SelectItem::Wildcard => None,
1467        })
1468        .collect();
1469    // v7.31 (perf — PG lesson #1): subquery-bearing select items
1470    // deferred to post-LIMIT, when no sort/filter key can observe
1471    // them. ORDER BY rewrites are hoisted here so the safety check
1472    // and the sort below share one rewrite pass.
1473    let order_rewritten: Vec<Expr> = stmt
1474        .order_by
1475        .iter()
1476        .map(|o| rewrite_expr(&o.expr, group_exprs, agg_specs))
1477        .collect();
1478    let defer_enabled = correlated_eval.is_some()
1479        && !stmt.distinct
1480        && !having_rewritten
1481            .as_ref()
1482            .is_some_and(crate::expr_has_subquery)
1483        && !order_rewritten.iter().any(crate::expr_has_subquery);
1484    let deferred: Vec<(usize, Expr)> = if defer_enabled {
1485        items_rewritten
1486            .iter()
1487            .enumerate()
1488            .filter_map(|(i, r)| {
1489                r.as_ref()
1490                    .filter(|e| crate::expr_has_subquery(e))
1491                    .map(|e| (i, e.clone()))
1492            })
1493            .collect()
1494    } else {
1495        Vec::new()
1496    };
1497    // v7.32 (architecture v2, P2) — compile the per-group synth-row
1498    // expressions ONCE. The projection / HAVING here run per GROUP
1499    // (24k for the inbox shape) × per item; the rewritten exprs are
1500    // mostly `Column(__agg_N)` / `Column(__grp_K)` against the synth
1501    // schema — flat step programs, no tree walk per group.
1502    let having_compiled = having_rewritten
1503        .as_ref()
1504        .filter(|h| eval::fully_compilable(h))
1505        .map(|h| eval::compile_expr(h, &synth_ctx));
1506    let items_compiled: Vec<Option<eval::CompiledExpr>> = items_rewritten
1507        .iter()
1508        .enumerate()
1509        .map(|(i, r)| {
1510            r.as_ref()
1511                .filter(|e| !deferred.iter().any(|(c, _)| *c == i) && eval::fully_compilable(e))
1512                .map(|e| eval::compile_expr(e, &synth_ctx))
1513        })
1514        .collect();
1515    let mut kept_synth: Vec<Row> = Vec::new();
1516    let mut out_rows: Vec<Row> = Vec::new();
1517    let mut stack: Vec<Value> = Vec::new();
1518    for srow in synth_rows {
1519        if let Some(hc) = &having_compiled {
1520            let cond = eval::eval_compiled(hc, &srow, &synth_ctx, &mut stack)?;
1521            if !matches!(cond, Value::Bool(true)) {
1522                continue;
1523            }
1524        } else if let Some(h) = &having_rewritten {
1525            let cond = match correlated_eval {
1526                Some(f) if crate::expr_has_subquery(h) => f(h, &srow, &synth_ctx)?,
1527                _ => eval::eval_expr(h, &srow, &synth_ctx)?,
1528            };
1529            if !matches!(cond, Value::Bool(true)) {
1530                continue;
1531            }
1532        }
1533        let mut values: Vec<Value> = Vec::with_capacity(columns.len());
1534        for (i, rewritten) in items_rewritten.iter().enumerate() {
1535            let Some(rewritten) = rewritten else { continue };
1536            if deferred.iter().any(|(c, _)| *c == i) {
1537                values.push(Value::Null);
1538                continue;
1539            }
1540            values.push(if let Some(cc) = &items_compiled[i] {
1541                eval::eval_compiled(cc, &srow, &synth_ctx, &mut stack)?
1542            } else {
1543                match correlated_eval {
1544                    Some(f) if crate::expr_has_subquery(rewritten) => {
1545                        f(rewritten, &srow, &synth_ctx)?
1546                    }
1547                    _ => eval::eval_expr(rewritten, &srow, &synth_ctx)?,
1548                }
1549            });
1550        }
1551        kept_synth.push(srow);
1552        out_rows.push(Row::new(values));
1553    }
1554    Ok(Projection {
1555        columns,
1556        out_rows,
1557        kept_synth,
1558        deferred,
1559        order_rewritten,
1560    })
1561}
1562
1563/// (4) Sort the projected output by the rewritten ORDER BY keys. The
1564/// synth rows ride through the sort so deferred subqueries evaluate
1565/// against the surviving groups after the caller's LIMIT truncation.
1566fn sort_synth_by_order_by(
1567    synth_schema: &[ColumnSchema],
1568    order_by: &[spg_sql::ast::OrderBy],
1569    order_rewritten: &[Expr],
1570    mut kept_synth: Vec<Row>,
1571    mut out_rows: Vec<Row>,
1572    correlated_eval: Option<CorrelatedEval<'_>>,
1573    keep_n: Option<usize>,
1574) -> Result<(Vec<Row>, Vec<Row>), EvalError> {
1575    let synth_ctx = EvalContext::new(synth_schema, None);
1576    // v6.4.0 — multi-key ORDER BY on aggregate output. Each key
1577    // gets its own rewrite + per-key DESC flag. (Rewrites hoisted
1578    // above as `order_rewritten` — shared with the deferral
1579    // safety check.)
1580    let keys_meta: Vec<(bool, Option<bool>)> =
1581        order_by.iter().map(|o| (o.desc, o.nulls_first)).collect();
1582    // P2: compile order-by keys once (per-group sort keys are
1583    // the same `__agg_N` / `__grp_K` shape as the projection).
1584    let order_compiled: Vec<Option<eval::CompiledExpr>> = order_rewritten
1585        .iter()
1586        .map(|e| {
1587            Some(e)
1588                .filter(|e| eval::fully_compilable(e))
1589                .map(|e| eval::compile_expr(e, &synth_ctx))
1590        })
1591        .collect();
1592    // The synth row rides through the sort so deferred exprs can
1593    // evaluate against the surviving groups after the caller's
1594    // LIMIT truncation.
1595    let mut keystack: Vec<Value> = Vec::new();
1596    let mut tagged: Vec<(Vec<Value>, Row, Row)> = Vec::with_capacity(kept_synth.len());
1597    for (s, o) in kept_synth.into_iter().zip(out_rows) {
1598        let mut keys = Vec::with_capacity(order_rewritten.len());
1599        for (e, oc) in order_rewritten.iter().zip(&order_compiled) {
1600            keys.push(if let Some(oc) = oc {
1601                eval::eval_compiled(oc, &s, &synth_ctx, &mut keystack)?
1602            } else {
1603                match correlated_eval {
1604                    Some(f) if crate::expr_has_subquery(e) => f(e, &s, &synth_ctx)?,
1605                    _ => eval::eval_expr(e, &s, &synth_ctx)?,
1606                }
1607            });
1608        }
1609        tagged.push((keys, s, o));
1610    }
1611    let cmp = |a: &(Vec<Value>, Row, Row), b: &(Vec<Value>, Row, Row)| {
1612        use core::cmp::Ordering;
1613        for (i, (ka, kb)) in a.0.iter().zip(b.0.iter()).enumerate() {
1614            let (desc, nf) = keys_meta[i];
1615            let c = crate::order_by_value_cmp(desc, nf, ka, kb);
1616            if c != Ordering::Equal {
1617                return c;
1618            }
1619        }
1620        Ordering::Equal
1621    };
1622    // v7.37.3 — top-K partial sort when `keep_n` is small enough to
1623    // matter (`Some(k)` with `k < tagged.len()` and `k > 0`).
1624    // `select_nth_unstable_by` partitions in O(N), then we sort the
1625    // surviving prefix in O(K log K). Total = O(N + K log K) vs
1626    // O(N log N) the full sort would pay — matches the inbox-listing
1627    // shape PG uses.
1628    //
1629    match keep_n {
1630        Some(k) if k < tagged.len() && k > 0 => {
1631            let pivot = k - 1;
1632            tagged.select_nth_unstable_by(pivot, cmp);
1633            tagged[..k].sort_by(cmp);
1634            tagged.truncate(k);
1635        }
1636        _ => {
1637            tagged.sort_by(cmp);
1638        }
1639    }
1640    kept_synth = Vec::with_capacity(tagged.len());
1641    out_rows = Vec::with_capacity(tagged.len());
1642    for (_, s, o) in tagged {
1643        kept_synth.push(s);
1644        out_rows.push(o);
1645    }
1646    Ok((kept_synth, out_rows))
1647}
1648
1649/// v7.17.0 — walk the statement again to validate the positional
1650/// arity of every aggregate call site. Done after AST collection
1651/// rather than inside `collect_aggregates` so the collector stays
1652/// infallible; callers in `run()` can do a single early-error
1653/// exit before any per-row work.
1654fn validate_agg_arities(stmt: &SelectStatement, _specs: &[AggSpec]) -> Result<(), EvalError> {
1655    fn walk(e: &Expr) -> Result<(), EvalError> {
1656        if let Expr::FunctionCall { name, args } = e {
1657            let lower = name.to_ascii_lowercase();
1658            let expected: Option<usize> = match lower.as_str() {
1659                "count_star" => Some(0),
1660                "count" | "sum" | "avg" | "min" | "max" | "array_agg"
1661                // v7.17.0 — boolean aggregates also take exactly
1662                // one arg. `every` is an alias normalised inside
1663                // collect_aggregates / rewrite_expr.
1664                | "bool_and" | "bool_or" | "every"
1665                // v7.32 (round-29) — statistical + bitwise aggregates
1666                // + single-arg JSON aggregate.
1667                | "stddev" | "stddev_samp" | "stddev_pop"
1668                | "variance" | "var_samp" | "var_pop"
1669                | "bit_and" | "bit_or" | "bit_xor"
1670                | "json_agg" | "jsonb_agg" => Some(1),
1671                // v7.32 (round-29) — two-argument aggregates: string_agg,
1672                // the regression family f(Y, X), and json_object_agg.
1673                "string_agg"
1674                | "covar_pop" | "covar_samp" | "corr"
1675                | "regr_count" | "regr_avgx" | "regr_avgy" | "regr_slope"
1676                | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy"
1677                | "json_object_agg" | "jsonb_object_agg" => Some(2),
1678                _ => None,
1679            };
1680            if let Some(want) = expected
1681                && args.len() != want
1682            {
1683                return Err(EvalError::TypeMismatch {
1684                    detail: alloc::format!("{lower}() takes {want} arg(s), got {}", args.len()),
1685                });
1686            }
1687            for a in args {
1688                walk(a)?;
1689            }
1690        } else if let Expr::Binary { lhs, rhs, .. } = e {
1691            walk(lhs)?;
1692            walk(rhs)?;
1693        } else if let Expr::Unary { expr, .. }
1694        | Expr::Cast { expr, .. }
1695        | Expr::IsNull { expr, .. } = e
1696        {
1697            walk(expr)?;
1698        }
1699        Ok(())
1700    }
1701    for item in &stmt.items {
1702        if let SelectItem::Expr { expr, .. } = item {
1703            walk(expr)?;
1704        }
1705    }
1706    for o in &stmt.order_by {
1707        walk(&o.expr)?;
1708    }
1709    if let Some(h) = &stmt.having {
1710        walk(h)?;
1711    }
1712    Ok(())
1713}
1714
1715/// v7.33 (array_agg argmax) — recognise `(array_agg(x ORDER BY y))[1]`,
1716/// the argmax/argmin idiom: a non-DISTINCT ordered `array_agg`
1717/// subscripted by the constant 1. Returns `(value_arg, order_by,
1718/// filter)` on a match. When matched, the whole per-group array build +
1719/// sort + materialise is replaced by a running first-by-order scalar
1720/// accumulator and the subscript node is consumed (replaced by the
1721/// synthetic column). collect_aggregates and rewrite_expr share this one
1722/// matcher so their `__agg_<i>` assignment stays in lockstep.
1723fn first_ordered_array_agg(e: &Expr) -> Option<(&Expr, &[spg_sql::ast::OrderBy], Option<&Expr>)> {
1724    let Expr::ArraySubscript { target, index } = e else {
1725        return None;
1726    };
1727    if !matches!(
1728        index.as_ref(),
1729        Expr::Literal(spg_sql::ast::Literal::Integer(1))
1730    ) {
1731        return None;
1732    }
1733    let Expr::AggregateOrdered {
1734        call,
1735        order_by,
1736        distinct,
1737        filter,
1738    } = target.as_ref()
1739    else {
1740        return None;
1741    };
1742    if *distinct || order_by.is_empty() {
1743        return None;
1744    }
1745    let Expr::FunctionCall { name, args } = call.as_ref() else {
1746        return None;
1747    };
1748    if !name.eq_ignore_ascii_case("array_agg") || args.len() != 1 {
1749        return None;
1750    }
1751    Some((&args[0], order_by, filter.as_deref()))
1752}
1753
1754fn collect_aggregates(e: &Expr, out: &mut Vec<AggSpec>) {
1755    match e {
1756        // v7.24 (round-16 A) — ordered aggregate: register the inner
1757        // call's spec with the ordering attached.
1758        Expr::AggregateOrdered {
1759            call,
1760            order_by,
1761            distinct,
1762            filter,
1763        } => {
1764            if let Expr::FunctionCall { name, args } = call.as_ref() {
1765                let lower = name.to_ascii_lowercase();
1766                if is_aggregate_name(&lower) {
1767                    let canonical = if lower == "every" {
1768                        "bool_and".to_string()
1769                    } else {
1770                        lower
1771                    };
1772                    // Ordered-set aggregates (`percentile_cont(f)
1773                    // WITHIN GROUP (ORDER BY x)`) take the value to
1774                    // aggregate from the sort spec and the in-parens
1775                    // arg as the direct (fraction) argument.
1776                    let ordered_set = is_within_group_name(&canonical);
1777                    let (arg, direct_arg) = if ordered_set {
1778                        (
1779                            order_by.first().map(|o| o.expr.clone()),
1780                            args.first().cloned(),
1781                        )
1782                    } else {
1783                        (args.first().cloned(), None)
1784                    };
1785                    let spec = AggSpec {
1786                        name: canonical.clone(),
1787                        arg,
1788                        arg2: if agg_uses_second_arg(&canonical) {
1789                            args.get(1).cloned()
1790                        } else {
1791                            None
1792                        },
1793                        distinct: *distinct,
1794                        order_by: order_by.clone(),
1795                        filter: filter.as_deref().cloned(),
1796                        direct_arg,
1797                        first_ordered: false,
1798                    };
1799                    if !out.iter().any(|s| {
1800                        s.name == spec.name
1801                            && s.arg == spec.arg
1802                            && s.arg2 == spec.arg2
1803                            && s.distinct == spec.distinct
1804                            && s.order_by == spec.order_by
1805                            && s.filter == spec.filter
1806                            && s.direct_arg == spec.direct_arg
1807                            && s.first_ordered == spec.first_ordered
1808                    }) {
1809                        out.push(spec);
1810                    }
1811                    return;
1812                }
1813            }
1814            collect_aggregates(call, out);
1815            for o in order_by {
1816                collect_aggregates(&o.expr, out);
1817            }
1818        }
1819        Expr::FunctionCall { name, args } => {
1820            let lower = name.to_ascii_lowercase();
1821            if is_aggregate_name(&lower) {
1822                let arg = if lower == "count_star" {
1823                    None
1824                } else {
1825                    args.first().cloned()
1826                };
1827                // v7.17.0 — second positional arg for
1828                // `string_agg(value, separator)`; v7.32 — also the
1829                // regression family `f(Y, X)` and `json_object_agg`.
1830                let arg2 = if agg_uses_second_arg(&lower) {
1831                    args.get(1).cloned()
1832                } else {
1833                    None
1834                };
1835                // v7.17.0 — `every` is the SQL-standard alias for
1836                // `bool_and`; collapse at collection time so
1837                // update_state / finalize need only one arm.
1838                let canonical = if lower == "every" {
1839                    "bool_and".to_string()
1840                } else {
1841                    lower
1842                };
1843                let spec = AggSpec {
1844                    name: canonical,
1845                    arg: arg.clone(),
1846                    arg2: arg2.clone(),
1847                    distinct: false,
1848                    order_by: Vec::new(),
1849                    filter: None,
1850                    direct_arg: None,
1851                    first_ordered: false,
1852                };
1853                if !out.iter().any(|s| {
1854                    s.name == spec.name
1855                        && s.arg == spec.arg
1856                        && s.arg2 == spec.arg2
1857                        && !s.distinct
1858                        && s.order_by == spec.order_by
1859                        && s.filter.is_none()
1860                        && !s.first_ordered
1861                }) {
1862                    out.push(spec);
1863                }
1864                // Don't recurse into the arg — nested aggregates are
1865                // illegal in standard SQL.
1866            } else {
1867                for a in args {
1868                    collect_aggregates(a, out);
1869                }
1870            }
1871        }
1872        Expr::Binary { lhs, rhs, .. } => {
1873            collect_aggregates(lhs, out);
1874            collect_aggregates(rhs, out);
1875        }
1876        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
1877            collect_aggregates(expr, out);
1878        }
1879        Expr::Like { expr, pattern, .. } => {
1880            collect_aggregates(expr, out);
1881            collect_aggregates(pattern, out);
1882        }
1883        Expr::InList { expr, list, .. } => {
1884            collect_aggregates(expr, out);
1885            for item in list {
1886                collect_aggregates(item, out);
1887            }
1888        }
1889        Expr::Extract { source, .. } => collect_aggregates(source, out),
1890        // v4.10 subquery + v4.12 window / Literal / Column —
1891        // non-recursing leaves for the aggregate collector.
1892        Expr::ScalarSubquery(_)
1893        | Expr::Exists { .. }
1894        | Expr::InSubquery { .. }
1895        | Expr::WindowFunction { .. }
1896        | Expr::Literal(_)
1897        | Expr::Placeholder(_)
1898        | Expr::Column(_) => {}
1899        // v7.10.10 — recurse into array constructor children +
1900        // subscript / ANY/ALL operands.
1901        Expr::Array(items) => {
1902            for elem in items {
1903                collect_aggregates(elem, out);
1904            }
1905        }
1906        Expr::ArraySubscript { target, index } => {
1907            // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]`
1908            // collects as a first_ordered spec; the subscript is consumed
1909            // here (do NOT recurse into the array_agg, or it would also
1910            // register a plain full-array spec).
1911            if let Some((arg, order_by, filter)) = first_ordered_array_agg(e) {
1912                let spec = AggSpec {
1913                    name: "array_agg".to_string(),
1914                    arg: Some(arg.clone()),
1915                    arg2: None,
1916                    distinct: false,
1917                    order_by: order_by.to_vec(),
1918                    filter: filter.cloned(),
1919                    direct_arg: None,
1920                    first_ordered: true,
1921                };
1922                if !out.iter().any(|s| {
1923                    s.name == spec.name
1924                        && s.arg == spec.arg
1925                        && s.order_by == spec.order_by
1926                        && s.filter == spec.filter
1927                        && s.first_ordered
1928                }) {
1929                    out.push(spec);
1930                }
1931                return;
1932            }
1933            collect_aggregates(target, out);
1934            collect_aggregates(index, out);
1935        }
1936        Expr::AnyAll { expr, array, .. } => {
1937            collect_aggregates(expr, out);
1938            collect_aggregates(array, out);
1939        }
1940        Expr::Case {
1941            operand,
1942            branches,
1943            else_branch,
1944        } => {
1945            if let Some(o) = operand {
1946                collect_aggregates(o, out);
1947            }
1948            for (w, t) in branches {
1949                collect_aggregates(w, out);
1950                collect_aggregates(t, out);
1951            }
1952            if let Some(e) = else_branch {
1953                collect_aggregates(e, out);
1954            }
1955        }
1956    }
1957}
1958
1959fn update_state(
1960    st: &mut AggState,
1961    name: &str,
1962    v: &Value,
1963    arg2: Option<&Value>,
1964    order_keys: Option<Vec<Value>>,
1965) -> Result<(), EvalError> {
1966    let is_null = matches!(v, Value::Null);
1967    match name {
1968        "count_star" => st.count += 1,
1969        "count" => {
1970            if !is_null {
1971                st.count += 1;
1972            }
1973        }
1974        "sum" | "avg" => {
1975            if is_null {
1976                return Ok(());
1977            }
1978            st.count += 1;
1979            match v {
1980                Value::Int(n) => st.sum_int += i64::from(*n),
1981                Value::BigInt(n) => st.sum_int += *n,
1982                Value::Float(x) => {
1983                    st.use_float = true;
1984                    st.sum_float += *x;
1985                }
1986                other => {
1987                    return Err(EvalError::TypeMismatch {
1988                        detail: format!("sum/avg need numeric, got {:?}", other.data_type()),
1989                    });
1990                }
1991            }
1992        }
1993        "min" => {
1994            if is_null {
1995                return Ok(());
1996            }
1997            match &st.extreme {
1998                None => st.extreme = Some(v.clone()),
1999                Some(cur) => {
2000                    if value_cmp(v, cur) == core::cmp::Ordering::Less {
2001                        st.extreme = Some(v.clone());
2002                    }
2003                }
2004            }
2005        }
2006        "max" => {
2007            if is_null {
2008                return Ok(());
2009            }
2010            match &st.extreme {
2011                None => st.extreme = Some(v.clone()),
2012                Some(cur) => {
2013                    if value_cmp(v, cur) == core::cmp::Ordering::Greater {
2014                        st.extreme = Some(v.clone());
2015                    }
2016                }
2017            }
2018        }
2019        // v7.17.0 — string_agg(value, separator). NULL value is
2020        // skipped (PG aggregate-skip-null). Separator captured
2021        // from the latest row that flows through; matches PG's
2022        // semantics of evaluating the separator per row but using
2023        // the last value at finalize time (in practice it's
2024        // constant). count is bumped so we can distinguish "empty
2025        // group → NULL" from "all-NULL group → NULL".
2026        "string_agg" => {
2027            if let Some(sep) = arg2
2028                && let Value::Text(s) = sep
2029            {
2030                st.separator = Some(s.clone());
2031            }
2032            if is_null {
2033                return Ok(());
2034            }
2035            if let Value::Text(s) = v {
2036                st.items.push(Value::Text(s.clone()));
2037                if let Some(k) = order_keys {
2038                    st.item_keys.push(k);
2039                }
2040                st.count += 1;
2041            } else {
2042                return Err(EvalError::TypeMismatch {
2043                    detail: format!("string_agg requires text value, got {:?}", v.data_type()),
2044                });
2045            }
2046        }
2047        // v7.17.0 — array_agg(value). Unlike string_agg, NULL
2048        // elements are KEPT in the array (PG behaviour); the
2049        // result is NULL only when ZERO rows fed in. Element type
2050        // is locked from the first row's value type; subsequent
2051        // rows must match (PG also rejects mixed-type array_agg).
2052        "array_agg" => {
2053            st.items.push(v.clone());
2054            if let Some(k) = order_keys {
2055                st.item_keys.push(k);
2056            }
2057            st.count += 1;
2058        }
2059        // v7.17.0 — bool_and(p): TRUE iff every non-NULL input is
2060        // TRUE. NULL skipped; running accumulator stays at TRUE
2061        // until the first non-NULL FALSE.
2062        "bool_and" => {
2063            if is_null {
2064                return Ok(());
2065            }
2066            let b = match v {
2067                Value::Bool(b) => *b,
2068                other => {
2069                    return Err(EvalError::TypeMismatch {
2070                        detail: format!("bool_and requires bool, got {:?}", other.data_type()),
2071                    });
2072                }
2073            };
2074            st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc && b));
2075        }
2076        // v7.17.0 — bool_or(p): TRUE iff any non-NULL input is
2077        // TRUE. NULL skipped.
2078        "bool_or" => {
2079            if is_null {
2080                return Ok(());
2081            }
2082            let b = match v {
2083                Value::Bool(b) => *b,
2084                other => {
2085                    return Err(EvalError::TypeMismatch {
2086                        detail: format!("bool_or requires bool, got {:?}", other.data_type()),
2087                    });
2088                }
2089            };
2090            st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc || b));
2091        }
2092        // v7.32 (round-29) — variance / stddev family. Accumulate the
2093        // running sum (sum_float) and sum of squares (sum_sq) over the
2094        // non-NULL numeric inputs; finalize divides by n or n-1.
2095        "stddev" | "stddev_samp" | "stddev_pop" | "variance" | "var_samp" | "var_pop" => {
2096            if is_null {
2097                return Ok(());
2098            }
2099            let x = match v {
2100                Value::Int(n) => f64::from(*n),
2101                Value::SmallInt(n) => f64::from(*n),
2102                Value::BigInt(n) => *n as f64,
2103                Value::Float(x) => *x,
2104                other => {
2105                    return Err(EvalError::TypeMismatch {
2106                        detail: format!("{name} needs numeric, got {:?}", other.data_type()),
2107                    });
2108                }
2109            };
2110            st.count += 1;
2111            st.sum_float += x;
2112            st.sum_sq += x * x;
2113        }
2114        // v7.32 (round-29) — bitwise aggregates over integer inputs.
2115        "bit_and" | "bit_or" | "bit_xor" => {
2116            if is_null {
2117                return Ok(());
2118            }
2119            let n = match v {
2120                Value::Int(n) => i64::from(*n),
2121                Value::SmallInt(n) => i64::from(*n),
2122                Value::BigInt(n) => *n,
2123                other => {
2124                    return Err(EvalError::TypeMismatch {
2125                        detail: format!("{name} needs integer, got {:?}", other.data_type()),
2126                    });
2127                }
2128            };
2129            st.bit_acc = Some(match (st.bit_acc, name) {
2130                (None, _) => n,
2131                (Some(acc), "bit_and") => acc & n,
2132                (Some(acc), "bit_or") => acc | n,
2133                (Some(acc), _) => acc ^ n, // bit_xor
2134            });
2135        }
2136        // v7.32 (round-29) — WITHIN GROUP aggregates (ordered-set +
2137        // hypothetical-set) collect the sort value (NULLs ignored, per
2138        // PG) into `items`, sorted at finalize by the parallel
2139        // `item_keys`.
2140        n if is_within_group_name(n) => {
2141            if is_null {
2142                return Ok(());
2143            }
2144            st.items.push(v.clone());
2145            if let Some(k) = order_keys {
2146                st.item_keys.push(k);
2147            }
2148            st.count += 1;
2149        }
2150        // v7.32 (round-29) — regression family f(Y, X). Only rows with
2151        // BOTH inputs non-NULL contribute (PG semantics). `v` is Y,
2152        // `arg2` is X.
2153        n if is_regression_name(n) => {
2154            let (Some(y), Some(x)) = (agg_value_to_f64(v), arg2.and_then(agg_value_to_f64)) else {
2155                return Ok(()); // NULL (or non-numeric) in either input
2156            };
2157            st.reg_n += 1;
2158            st.reg_sx += x;
2159            st.reg_sy += y;
2160            st.reg_sxx += x * x;
2161            st.reg_syy += y * y;
2162            st.reg_sxy += x * y;
2163        }
2164        // v7.32 (round-29) — json_agg / jsonb_agg collect every input
2165        // (NULL becomes JSON null, per PG) in row order.
2166        "json_agg" | "jsonb_agg" => {
2167            st.items.push(v.clone());
2168            st.count += 1;
2169        }
2170        // v7.32 (round-29) — json_object_agg(key, value): keys in
2171        // `items`, values in `aux_items`. A NULL key is skipped (PG
2172        // raises; we drop it rather than abort the whole query).
2173        "json_object_agg" | "jsonb_object_agg" => {
2174            if is_null {
2175                return Ok(());
2176            }
2177            st.items.push(v.clone());
2178            st.aux_items.push(arg2.cloned().unwrap_or(Value::Null));
2179            st.count += 1;
2180        }
2181        _ => unreachable!("non-aggregate {name} in update_state"),
2182    }
2183    Ok(())
2184}
2185
2186#[allow(clippy::cast_precision_loss)]
2187fn finalize(name: &str, st: &AggState) -> Value {
2188    match name {
2189        "count" | "count_star" => Value::BigInt(st.count),
2190        "sum" => {
2191            if st.count == 0 {
2192                Value::Null
2193            } else if st.use_float {
2194                Value::Float(st.sum_float + (st.sum_int as f64))
2195            } else {
2196                Value::BigInt(st.sum_int)
2197            }
2198        }
2199        "avg" => {
2200            if st.count == 0 {
2201                Value::Null
2202            } else {
2203                let total = if st.use_float {
2204                    st.sum_float + (st.sum_int as f64)
2205                } else {
2206                    st.sum_int as f64
2207                };
2208                Value::Float(total / (st.count as f64))
2209            }
2210        }
2211        "min" | "max" => st.extreme.clone().unwrap_or(Value::Null),
2212        // v7.17.0 — string_agg: join all collected text items with
2213        // the captured separator. Empty / all-NULL group → NULL
2214        // (PG semantics).
2215        "string_agg" => {
2216            if st.items.is_empty() {
2217                return Value::Null;
2218            }
2219            let sep = st.separator.clone().unwrap_or_default();
2220            let mut out = String::new();
2221            for (i, item) in st.items.iter().enumerate() {
2222                if i > 0 {
2223                    out.push_str(&sep);
2224                }
2225                if let Value::Text(s) = item {
2226                    out.push_str(s);
2227                }
2228            }
2229            Value::Text(out)
2230        }
2231        // v7.17.0 — array_agg: collect into a typed array. NULL
2232        // elements are preserved per PG. Result type is decided
2233        // by the first non-NULL element seen (or Text fallback
2234        // when the whole group is NULL — PG would surface the
2235        // declared input type, but SPG hasn't yet wired the
2236        // aggregate's static input-type from `describe`).
2237        "array_agg" => {
2238            if st.items.is_empty() {
2239                return Value::Null;
2240            }
2241            let probe = st.items.iter().find(|v| !v.is_null());
2242            match probe.and_then(spg_storage::Value::data_type) {
2243                Some(DataType::Int) | Some(DataType::SmallInt) => {
2244                    let items: Vec<Option<i32>> = st
2245                        .items
2246                        .iter()
2247                        .map(|v| match v {
2248                            Value::Int(n) => Some(*n),
2249                            Value::SmallInt(n) => Some(i32::from(*n)),
2250                            _ => None,
2251                        })
2252                        .collect();
2253                    Value::IntArray(items)
2254                }
2255                Some(DataType::BigInt) => {
2256                    let items: Vec<Option<i64>> = st
2257                        .items
2258                        .iter()
2259                        .map(|v| match v {
2260                            Value::BigInt(n) => Some(*n),
2261                            _ => None,
2262                        })
2263                        .collect();
2264                    Value::BigIntArray(items)
2265                }
2266                _ => {
2267                    let items: Vec<Option<String>> = st
2268                        .items
2269                        .iter()
2270                        .map(|v| match v {
2271                            Value::Text(s) => Some(s.clone()),
2272                            Value::Null => None,
2273                            other => Some(format!("{other:?}")),
2274                        })
2275                        .collect();
2276                    Value::TextArray(items)
2277                }
2278            }
2279        }
2280        // v7.17.0 — bool_and / bool_or finalize: lazy-init pattern
2281        // means `None` is exactly "empty group or all-NULL", which
2282        // PG surfaces as SQL NULL.
2283        "bool_and" | "bool_or" => st.bool_acc.map_or(Value::Null, Value::Bool),
2284        // v7.32 (round-29) — variance / stddev. PG: `variance` ==
2285        // `var_samp`, `stddev` == `stddev_samp`. samp needs n >= 2
2286        // (n < 2 → NULL); pop needs n >= 1 (n == 1 → 0).
2287        "variance" | "var_samp" | "var_pop" | "stddev" | "stddev_samp" | "stddev_pop" => {
2288            let n = st.count;
2289            if n == 0 {
2290                return Value::Null;
2291            }
2292            let nf = n as f64;
2293            // Sum of squared deviations from the mean.
2294            let ss = st.sum_sq - (st.sum_float * st.sum_float) / nf;
2295            let pop = name.ends_with("_pop");
2296            let denom = if pop { nf } else { nf - 1.0 };
2297            if denom <= 0.0 {
2298                // var_samp / stddev (samp) with n == 1 → NULL.
2299                return Value::Null;
2300            }
2301            let var = (ss / denom).max(0.0); // clamp fp noise below 0
2302            if name.starts_with("stddev") {
2303                Value::Float(crate::eval::f64_sqrt(var))
2304            } else {
2305                Value::Float(var)
2306            }
2307        }
2308        // v7.32 (round-29) — bitwise aggregates: None (empty / all-NULL)
2309        // → SQL NULL.
2310        "bit_and" | "bit_or" | "bit_xor" => st.bit_acc.map_or(Value::Null, Value::BigInt),
2311        // v7.32 (round-29) — regression family. `regr_count` is the
2312        // paired n; everything else is NULL over an empty set. Terms
2313        // are the mean-centred sums of squares / cross-products.
2314        "regr_count" => Value::BigInt(st.reg_n),
2315        "covar_pop" | "covar_samp" | "corr" | "regr_avgx" | "regr_avgy" | "regr_slope"
2316        | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy" => {
2317            let n = st.reg_n;
2318            if n == 0 {
2319                return Value::Null;
2320            }
2321            let nf = n as f64;
2322            let sxx = st.reg_sxx - st.reg_sx * st.reg_sx / nf;
2323            let syy = st.reg_syy - st.reg_sy * st.reg_sy / nf;
2324            let sxy = st.reg_sxy - st.reg_sx * st.reg_sy / nf;
2325            let avgx = st.reg_sx / nf;
2326            let avgy = st.reg_sy / nf;
2327            let out = match name {
2328                "regr_avgx" => Some(avgx),
2329                "regr_avgy" => Some(avgy),
2330                "regr_sxx" => Some(sxx),
2331                "regr_syy" => Some(syy),
2332                "regr_sxy" => Some(sxy),
2333                "covar_pop" => Some(sxy / nf),
2334                "covar_samp" => (n >= 2).then(|| sxy / (nf - 1.0)),
2335                "regr_slope" => (sxx != 0.0).then(|| sxy / sxx),
2336                "regr_intercept" => (sxx != 0.0).then(|| avgy - (sxy / sxx) * avgx),
2337                "corr" => {
2338                    let d = sxx * syy;
2339                    (d > 0.0).then(|| sxy / crate::eval::f64_sqrt(d))
2340                }
2341                // PG: NULL when sxx==0; 1 when syy==0 (and sxx>0).
2342                "regr_r2" => {
2343                    if sxx == 0.0 {
2344                        None
2345                    } else if syy == 0.0 {
2346                        Some(1.0)
2347                    } else {
2348                        Some((sxy * sxy) / (sxx * syy))
2349                    }
2350                }
2351                _ => None,
2352            };
2353            out.map_or(Value::Null, Value::Float)
2354        }
2355        // v7.32 (round-29) — json_agg / jsonb_agg: a JSON array of every
2356        // collected element in row order; empty set → SQL NULL.
2357        "json_agg" | "jsonb_agg" => {
2358            if st.items.is_empty() {
2359                return Value::Null;
2360            }
2361            let mut out = String::from("[");
2362            for (i, item) in st.items.iter().enumerate() {
2363                if i > 0 {
2364                    out.push_str(", ");
2365                }
2366                out.push_str(&crate::json::value_to_json_text(item));
2367            }
2368            out.push(']');
2369            Value::Json(out)
2370        }
2371        // v7.32 (round-29) — json_object_agg: a JSON object built from
2372        // the parallel key (`items`) / value (`aux_items`) streams.
2373        "json_object_agg" | "jsonb_object_agg" => {
2374            if st.items.is_empty() {
2375                return Value::Null;
2376            }
2377            let mut out = String::from("{");
2378            for (i, key) in st.items.iter().enumerate() {
2379                if i > 0 {
2380                    out.push_str(", ");
2381                }
2382                // Object keys are always JSON strings (PG coerces).
2383                let key_text = match key {
2384                    Value::Text(s) | Value::Json(s) => s.clone(),
2385                    other => crate::json::value_to_json_text(other),
2386                };
2387                out.push_str(&crate::json::value_to_json_text(&Value::Text(key_text)));
2388                out.push_str(": ");
2389                let val = st.aux_items.get(i).unwrap_or(&Value::Null);
2390                out.push_str(&crate::json::value_to_json_text(val));
2391            }
2392            out.push('}');
2393            Value::Json(out)
2394        }
2395        // Ordered-set aggregates are finalized in `run` (they need the
2396        // sorted items + the direct fraction argument), never here.
2397        _ => unreachable!(),
2398    }
2399}
2400
2401/// v7.32 (round-29) — numeric coercion for the percentile interpolation.
2402fn agg_value_to_f64(v: &Value) -> Option<f64> {
2403    match v {
2404        Value::Int(n) => Some(f64::from(*n)),
2405        Value::SmallInt(n) => Some(f64::from(*n)),
2406        Value::BigInt(n) => Some(*n as f64),
2407        Value::Float(x) => Some(*x),
2408        _ => None,
2409    }
2410}
2411
2412/// v7.32 (round-29) — finalize a WITHIN GROUP aggregate. `st.items` is
2413/// already sorted by the `WITHIN GROUP (ORDER BY …)` spec. `direct` is
2414/// the evaluated direct argument: the fraction for `percentile_*`, the
2415/// hypothetical value for the hypothetical-set family (`rank` etc.),
2416/// and unused by `mode`. `order` is the (single) sort key, needed by
2417/// the hypothetical-set family to compare in the sort direction.
2418#[allow(
2419    clippy::cast_precision_loss,
2420    clippy::cast_possible_truncation,
2421    clippy::cast_sign_loss
2422)]
2423fn finalize_ordered_set(
2424    name: &str,
2425    st: &AggState,
2426    direct: Option<&Value>,
2427    order: Option<&spg_sql::ast::OrderBy>,
2428) -> Value {
2429    let fraction = direct;
2430    let items = &st.items;
2431    if items.is_empty() {
2432        // A hypothetical row ranks first over an empty group; the
2433        // distribution functions are 0 / divide-by-(n+1).
2434        return match name {
2435            "rank" | "dense_rank" => Value::BigInt(1),
2436            "percent_rank" => Value::Float(0.0),
2437            "cume_dist" => Value::Float(1.0),
2438            _ => Value::Null,
2439        };
2440    }
2441    let n = items.len();
2442    match name {
2443        // v7.32 (round-29) — hypothetical-set: the rank the direct value
2444        // would have if inserted into the group, in the sort direction.
2445        "rank" | "dense_rank" | "percent_rank" | "cume_dist" => {
2446            let Some(h) = fraction else {
2447                return Value::Null;
2448            };
2449            let (desc, nulls_first) = order.map_or((false, None), |o| (o.desc, o.nulls_first));
2450            let mut before = 0usize; // sort strictly before h
2451            let mut before_or_eq = 0usize; // sort before-or-peer with h
2452            let mut distinct_before = 0usize;
2453            let mut last_before: Option<&Value> = None;
2454            for it in items {
2455                match crate::order_by_value_cmp(desc, nulls_first, it, h) {
2456                    core::cmp::Ordering::Less => {
2457                        before += 1;
2458                        before_or_eq += 1;
2459                        if last_before
2460                            .is_none_or(|p| value_cmp(p, it) != core::cmp::Ordering::Equal)
2461                        {
2462                            distinct_before += 1;
2463                            last_before = Some(it);
2464                        }
2465                    }
2466                    core::cmp::Ordering::Equal => before_or_eq += 1,
2467                    core::cmp::Ordering::Greater => {}
2468                }
2469            }
2470            let nn = n as f64;
2471            match name {
2472                "rank" => Value::BigInt((before + 1) as i64),
2473                "dense_rank" => Value::BigInt((distinct_before + 1) as i64),
2474                "percent_rank" => Value::Float(before as f64 / nn),
2475                "cume_dist" => Value::Float((before_or_eq as f64 + 1.0) / (nn + 1.0)),
2476                _ => unreachable!(),
2477            }
2478        }
2479        // Most frequent value; equal values are adjacent in the sorted
2480        // run, and a frequency tie resolves to the earliest run (the
2481        // smallest value under an ascending sort), matching PG.
2482        "mode" => {
2483            let (mut best_i, mut best_cnt) = (0usize, 1usize);
2484            let (mut run_i, mut run_cnt) = (0usize, 1usize);
2485            for i in 1..n {
2486                if value_cmp(&items[i], &items[run_i]) == core::cmp::Ordering::Equal {
2487                    run_cnt += 1;
2488                } else {
2489                    run_i = i;
2490                    run_cnt = 1;
2491                }
2492                if run_cnt > best_cnt {
2493                    best_cnt = run_cnt;
2494                    best_i = run_i;
2495                }
2496            }
2497            items[best_i].clone()
2498        }
2499        // The first value whose cumulative fraction reaches `f`.
2500        "percentile_disc" => {
2501            let f = fraction
2502                .and_then(agg_value_to_f64)
2503                .unwrap_or(0.0)
2504                .clamp(0.0, 1.0);
2505            let idx = if f <= 0.0 {
2506                0
2507            } else {
2508                (crate::eval::f64_ceil(f * n as f64) as usize)
2509                    .saturating_sub(1)
2510                    .min(n - 1)
2511            };
2512            items[idx].clone()
2513        }
2514        // Linear interpolation between the two bracketing values.
2515        "percentile_cont" => {
2516            let f = fraction
2517                .and_then(agg_value_to_f64)
2518                .unwrap_or(0.0)
2519                .clamp(0.0, 1.0);
2520            let Some(nums) = items
2521                .iter()
2522                .map(agg_value_to_f64)
2523                .collect::<Option<Vec<f64>>>()
2524            else {
2525                return Value::Null; // non-numeric ordered set
2526            };
2527            if n == 1 {
2528                return Value::Float(nums[0]);
2529            }
2530            let rank = f * (n as f64 - 1.0);
2531            let lo = crate::eval::f64_floor(rank) as usize;
2532            let hi = crate::eval::f64_ceil(rank) as usize;
2533            let frac = rank - lo as f64;
2534            Value::Float(nums[lo] + (nums[hi] - nums[lo]) * frac)
2535        }
2536        _ => unreachable!(),
2537    }
2538}
2539
2540fn infer_agg_type(spec: &AggSpec, schema_cols: &[ColumnSchema]) -> DataType {
2541    // v7.26 (round-20 C) — the argument's statically-derived shape
2542    // types MIN/MAX/SUM/array_agg properly; RowDescription used to
2543    // report TEXT for these, breaking every sqlx typed decode.
2544    let arg_ty = spec
2545        .arg
2546        .as_ref()
2547        .and_then(|a| crate::describe::describe_expr(a, schema_cols))
2548        .map(|shape| shape.ty);
2549    // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]` yields the
2550    // ELEMENT type (x), not the array type.
2551    if spec.first_ordered {
2552        return arg_ty.unwrap_or(DataType::Text);
2553    }
2554    match spec.name.as_str() {
2555        "count" | "count_star" => DataType::BigInt,
2556        "sum" => match arg_ty {
2557            Some(DataType::Float) => DataType::Float,
2558            _ => DataType::BigInt,
2559        },
2560        "avg" => DataType::Float,
2561        // v7.17.0 — string_agg always returns TEXT.
2562        "string_agg" => DataType::Text,
2563        "array_agg" => match arg_ty {
2564            Some(DataType::Int | DataType::SmallInt) => DataType::IntArray,
2565            Some(DataType::BigInt) => DataType::BigIntArray,
2566            _ => DataType::TextArray,
2567        },
2568        // v7.17.0 — boolean aggregates always return BOOL (nullable
2569        // — empty / all-NULL group → NULL).
2570        "bool_and" | "bool_or" => DataType::Bool,
2571        // v7.32 (round-29) — variance / stddev are floating point;
2572        // percentile_cont interpolates to float; the regression family
2573        // (except regr_count) is floating point.
2574        "stddev" | "stddev_samp" | "stddev_pop" | "variance" | "var_samp" | "var_pop"
2575        | "percentile_cont" | "covar_pop" | "covar_samp" | "corr" | "regr_avgx" | "regr_avgy"
2576        | "regr_slope" | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy" => {
2577            DataType::Float
2578        }
2579        // v7.32 (round-29) — bitwise aggregates, regr_count, and the
2580        // integer hypothetical-set ranks return an integer.
2581        "bit_and" | "bit_or" | "bit_xor" | "regr_count" | "rank" | "dense_rank" => DataType::BigInt,
2582        // v7.32 (round-29) — hypothetical-set distribution functions.
2583        "percent_rank" | "cume_dist" => DataType::Float,
2584        // v7.32 (round-29) — JSON aggregates return JSON.
2585        "json_agg" | "jsonb_agg" | "json_object_agg" | "jsonb_object_agg" => DataType::Json,
2586        // min/max, percentile_disc, mode, and anything pass-through:
2587        // the argument's shape (for ordered-set aggs `spec.arg` is the
2588        // WITHIN GROUP value expression).
2589        _ => arg_ty.unwrap_or(DataType::Text),
2590    }
2591}
2592
2593fn agg_or_group_type(e: &Expr, synth: &[ColumnSchema]) -> DataType {
2594    if let Expr::Column(c) = e
2595        && let Some(s) = synth.iter().find(|s| s.name == c.name)
2596    {
2597        return s.ty;
2598    }
2599    // v7.26 (round-20 C) — compound expressions over aggregates
2600    // (COALESCE(BOOL_OR(…), false), (array_agg(…))[1], CASE …)
2601    // derive their shape statically against the synth schema; the
2602    // old Text fallback broke sqlx typed decodes of exactly these
2603    // columns.
2604    crate::describe::describe_expr(e, synth)
2605        .map(|shape| shape.ty)
2606        .unwrap_or(DataType::Text)
2607}
2608
2609fn rewrite_expr(e: &Expr, group_exprs: &[Expr], aggs: &[AggSpec]) -> Expr {
2610    // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]` rewrites
2611    // to its first_ordered synth column, consuming the subscript. Checked
2612    // before the AggregateOrdered/recursion arms (which would otherwise
2613    // rewrite the inner array_agg and leave the subscript). Same matcher
2614    // as collect_aggregates, so the spec it finds is the one collected.
2615    if let Some((arg, order_by, filter)) = first_ordered_array_agg(e) {
2616        let arg_owned = Some(arg.clone());
2617        let filter_owned = filter.cloned();
2618        for (i, spec) in aggs.iter().enumerate() {
2619            if spec.first_ordered
2620                && spec.name == "array_agg"
2621                && spec.arg == arg_owned
2622                && spec.order_by == *order_by
2623                && spec.filter == filter_owned
2624            {
2625                return Expr::Column(spg_sql::ast::ColumnName {
2626                    qualifier: None,
2627                    name: format!("__agg_{i}"),
2628                });
2629            }
2630        }
2631    }
2632    // v7.24 (round-16 A) — ordered aggregate: match on the inner
2633    // call PLUS the ordering keys.
2634    if let Expr::AggregateOrdered {
2635        call,
2636        order_by,
2637        distinct,
2638        filter,
2639    } = e
2640        && let Expr::FunctionCall { name, args } = call.as_ref()
2641    {
2642        let lower = name.to_ascii_lowercase();
2643        if is_aggregate_name(&lower) {
2644            let canonical: &str = if lower == "every" { "bool_and" } else { &lower };
2645            // Mirror collect_aggregates: ordered-set aggregates take the
2646            // value from the sort spec and the in-parens arg as direct.
2647            let (arg, direct_arg) = if is_within_group_name(canonical) {
2648                (
2649                    order_by.first().map(|o| o.expr.clone()),
2650                    args.first().cloned(),
2651                )
2652            } else {
2653                (args.first().cloned(), None)
2654            };
2655            let arg2 = if agg_uses_second_arg(canonical) {
2656                args.get(1).cloned()
2657            } else {
2658                None
2659            };
2660            let filter_owned = filter.as_deref().cloned();
2661            for (i, spec) in aggs.iter().enumerate() {
2662                if spec.name == canonical
2663                    && spec.arg == arg
2664                    && spec.arg2 == arg2
2665                    && spec.distinct == *distinct
2666                    && spec.order_by == *order_by
2667                    && spec.filter == filter_owned
2668                    && spec.direct_arg == direct_arg
2669                {
2670                    return Expr::Column(spg_sql::ast::ColumnName {
2671                        qualifier: None,
2672                        name: format!("__agg_{i}"),
2673                    });
2674                }
2675            }
2676        }
2677    }
2678    // Match aggregate FunctionCalls first — they sit outside group_by.
2679    if let Expr::FunctionCall { name, args } = e {
2680        let lower = name.to_ascii_lowercase();
2681        if is_aggregate_name(&lower) {
2682            let arg = if lower == "count_star" {
2683                None
2684            } else {
2685                args.first().cloned()
2686            };
2687            // v7.17.0 — match the spec we registered for
2688            // string_agg(value, separator) on the full pair; v7.32 also
2689            // the regression family and json_object_agg.
2690            let arg2 = if agg_uses_second_arg(&lower) {
2691                args.get(1).cloned()
2692            } else {
2693                None
2694            };
2695            // v7.17.0 — `every` collapses into `bool_and` at
2696            // collection; mirror that here so the rewrite finds
2697            // the matching synth column.
2698            let canonical: &str = if lower == "every" {
2699                "bool_and"
2700            } else {
2701                lower.as_str()
2702            };
2703            for (i, spec) in aggs.iter().enumerate() {
2704                if spec.name == canonical
2705                    && spec.arg == arg
2706                    && spec.arg2 == arg2
2707                    && !spec.distinct
2708                    && spec.order_by.is_empty()
2709                {
2710                    return Expr::Column(spg_sql::ast::ColumnName {
2711                        qualifier: None,
2712                        name: format!("__agg_{i}"),
2713                    });
2714                }
2715            }
2716        }
2717    }
2718    // Match a group_by expression by AST equality.
2719    for (i, g) in group_exprs.iter().enumerate() {
2720        if g == e {
2721            return Expr::Column(spg_sql::ast::ColumnName {
2722                qualifier: None,
2723                name: format!("__grp_{i}"),
2724            });
2725        }
2726    }
2727    // Recurse into children.
2728    match e {
2729        Expr::AggregateOrdered {
2730            call,
2731            order_by,
2732            distinct,
2733            filter,
2734        } => Expr::AggregateOrdered {
2735            call: Box::new(rewrite_expr(call, group_exprs, aggs)),
2736            distinct: *distinct,
2737            order_by: order_by
2738                .iter()
2739                .map(|o| spg_sql::ast::OrderBy {
2740                    expr: rewrite_expr(&o.expr, group_exprs, aggs),
2741                    desc: o.desc,
2742                    nulls_first: o.nulls_first,
2743                })
2744                .collect(),
2745            // The filter is evaluated against SOURCE rows during
2746            // accumulation, never against synth rows — keep it as-is.
2747            filter: filter.clone(),
2748        },
2749        Expr::Binary { lhs, op, rhs } => Expr::Binary {
2750            lhs: Box::new(rewrite_expr(lhs, group_exprs, aggs)),
2751            op: *op,
2752            rhs: Box::new(rewrite_expr(rhs, group_exprs, aggs)),
2753        },
2754        Expr::Unary { op, expr } => Expr::Unary {
2755            op: *op,
2756            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2757        },
2758        Expr::Cast { expr, target } => Expr::Cast {
2759            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2760            target: *target,
2761        },
2762        Expr::IsNull { expr, negated } => Expr::IsNull {
2763            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2764            negated: *negated,
2765        },
2766        Expr::FunctionCall { name, args } => Expr::FunctionCall {
2767            name: name.clone(),
2768            args: args
2769                .iter()
2770                .map(|a| rewrite_expr(a, group_exprs, aggs))
2771                .collect(),
2772        },
2773        Expr::Like {
2774            expr,
2775            pattern,
2776            negated,
2777            case_insensitive,
2778        } => Expr::Like {
2779            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2780            pattern: Box::new(rewrite_expr(pattern, group_exprs, aggs)),
2781            negated: *negated,
2782            case_insensitive: *case_insensitive,
2783        },
2784        Expr::Extract { field, source } => Expr::Extract {
2785            field: *field,
2786            source: Box::new(rewrite_expr(source, group_exprs, aggs)),
2787        },
2788        // v7.25.2 (round-19 A) — subquery nodes: rewrite group-key
2789        // references INSIDE the body to `__grp_N` so the correlated
2790        // resolver can substitute them against the synthesised group
2791        // row (aggs are NOT matched inside the body — a COUNT in the
2792        // subquery is the subquery's own aggregate).
2793        Expr::ScalarSubquery(s) => {
2794            Expr::ScalarSubquery(Box::new(rewrite_group_keys_in_select(s, group_exprs)))
2795        }
2796        Expr::Exists { subquery, negated } => Expr::Exists {
2797            subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
2798            negated: *negated,
2799        },
2800        Expr::InSubquery {
2801            expr,
2802            subquery,
2803            negated,
2804        } => Expr::InSubquery {
2805            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2806            subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
2807            negated: *negated,
2808        },
2809        // v4.12 window / Literal / Column — clone-pass (these don't
2810        // participate in aggregate rewrite).
2811        Expr::WindowFunction { .. } | Expr::Literal(_) | Expr::Placeholder(_) | Expr::Column(_) => {
2812            e.clone()
2813        }
2814        // v7.10.10 — recurse children for array nodes.
2815        Expr::Array(items) => Expr::Array(
2816            items
2817                .iter()
2818                .map(|elem| rewrite_expr(elem, group_exprs, aggs))
2819                .collect(),
2820        ),
2821        Expr::ArraySubscript { target, index } => Expr::ArraySubscript {
2822            target: Box::new(rewrite_expr(target, group_exprs, aggs)),
2823            index: Box::new(rewrite_expr(index, group_exprs, aggs)),
2824        },
2825        Expr::AnyAll {
2826            expr,
2827            op,
2828            array,
2829            is_any,
2830        } => Expr::AnyAll {
2831            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2832            op: *op,
2833            array: Box::new(rewrite_expr(array, group_exprs, aggs)),
2834            is_any: *is_any,
2835        },
2836        Expr::InList {
2837            expr,
2838            list,
2839            negated,
2840        } => Expr::InList {
2841            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2842            list: list
2843                .iter()
2844                .map(|item| rewrite_expr(item, group_exprs, aggs))
2845                .collect(),
2846            negated: *negated,
2847        },
2848        Expr::Case {
2849            operand,
2850            branches,
2851            else_branch,
2852        } => Expr::Case {
2853            operand: operand
2854                .as_deref()
2855                .map(|o| Box::new(rewrite_expr(o, group_exprs, aggs))),
2856            branches: branches
2857                .iter()
2858                .map(|(w, t)| {
2859                    (
2860                        rewrite_expr(w, group_exprs, aggs),
2861                        rewrite_expr(t, group_exprs, aggs),
2862                    )
2863                })
2864                .collect(),
2865            else_branch: else_branch
2866                .as_deref()
2867                .map(|e| Box::new(rewrite_expr(e, group_exprs, aggs))),
2868        },
2869    }
2870}
2871
2872/// v7.25.2 (round-19 A) — rewrite group-key references inside a
2873/// subquery body to `__grp_N` synthetic columns (aggregates are
2874/// not touched: empty spec list). Runs through the canonical
2875/// Select walker so every expression slot is covered.
2876fn rewrite_group_keys_in_select(
2877    s: &spg_sql::ast::SelectStatement,
2878    group_exprs: &[Expr],
2879) -> spg_sql::ast::SelectStatement {
2880    let mut out = s.clone();
2881    let _ = crate::walk_select_exprs_mut(&mut out, &mut |e| {
2882        *e = rewrite_expr(e, group_exprs, &[]);
2883        Ok(())
2884    });
2885    out
2886}
2887
2888/// Canonical string key for a tuple of group values. Used as map key.
2889/// Per-value group-key encoding (shared by owned and borrowed paths).
2890fn encode_one(out: &mut String, v: &Value) {
2891    use core::fmt::Write;
2892    match v {
2893        Value::Null => out.push_str("N|"),
2894        // v7.36 (perf — mailrs Phase 1) — switch the integer / float
2895        // encoders to `write!`. `n.to_string()` allocates a fresh
2896        // `String` per cell just to push its bytes into the
2897        // (already-cleared) reuse buffer — for the 25 k-row JOIN
2898        // probe in `count_messages` that's 25 k heap allocs per
2899        // query. `write!(&mut String, ...)` formats straight into
2900        // the buffer; no intermediate alloc.
2901        Value::SmallInt(n) => {
2902            let _ = write!(out, "s{n}|");
2903        }
2904        Value::Int(n) => {
2905            let _ = write!(out, "I{n}|");
2906        }
2907        Value::BigInt(n) => {
2908            let _ = write!(out, "B{n}|");
2909        }
2910        Value::Float(x) => {
2911            let _ = write!(out, "F{x}|");
2912        }
2913        Value::Bool(b) => {
2914            out.push(if *b { 'T' } else { 'f' });
2915            out.push('|');
2916        }
2917        Value::Text(s) => {
2918            out.push('S');
2919            out.push_str(s);
2920            out.push('|');
2921        }
2922        Value::Vector(v) => {
2923            out.push('V');
2924            for x in v {
2925                out.push_str(&x.to_string());
2926                out.push(',');
2927            }
2928            out.push('|');
2929        }
2930        // v6.0.1: GROUP BY on a `VECTOR(N) USING SQ8` column.
2931        // Two cells with byte-identical `(min, max, bytes)`
2932        // share the same group; equivalence is byte-equality
2933        // (same as f32 grouping today — neither path tries to
2934        // normalise nan/-0).
2935        Value::Sq8Vector(q) => {
2936            out.push('Q');
2937            out.push_str(&q.min.to_string());
2938            out.push('@');
2939            out.push_str(&q.max.to_string());
2940            out.push(':');
2941            for b in &q.bytes {
2942                out.push_str(&b.to_string());
2943                out.push(',');
2944            }
2945            out.push('|');
2946        }
2947        // v6.0.3: GROUP BY on a `VECTOR(N) USING HALF` column.
2948        // Byte-equality over the raw u16 bits; matches the SQ8
2949        // path's byte-key model.
2950        Value::HalfVector(h) => {
2951            out.push('H');
2952            for b in &h.bytes {
2953                out.push_str(&b.to_string());
2954                out.push(',');
2955            }
2956            out.push('|');
2957        }
2958        Value::Numeric { scaled, scale } => {
2959            out.push('D');
2960            out.push_str(&scaled.to_string());
2961            out.push('@');
2962            out.push_str(&scale.to_string());
2963            out.push('|');
2964        }
2965        Value::Date(d) => {
2966            out.push('d');
2967            out.push_str(&d.to_string());
2968            out.push('|');
2969        }
2970        Value::Timestamp(t) => {
2971            out.push('t');
2972            out.push_str(&t.to_string());
2973            out.push('|');
2974        }
2975        Value::Interval { months, micros } => {
2976            out.push('i');
2977            out.push_str(&months.to_string());
2978            out.push('m');
2979            out.push_str(&micros.to_string());
2980            out.push('|');
2981        }
2982        Value::Json(s) => {
2983            out.push('j');
2984            out.push_str(s);
2985            out.push('|');
2986        }
2987        // v7.5.0 — Value is #[non_exhaustive] for downstream
2988        // forward-compat. Any future variant lacking explicit
2989        // handling here will share a debug-derived group key,
2990        // which is observably wrong but won't crash.
2991        _ => {
2992            out.push('?');
2993            out.push_str(&format!("{v:?}"));
2994            out.push('|');
2995        }
2996    }
2997}
2998
2999/// v7.30 (perf campaign) - encode from borrowed cells without
3000/// materialising an owned Vec<Value> first.
3001pub(crate) fn encode_key_refs(vals: &[&Value]) -> String {
3002    let mut out = String::new();
3003    for v in vals {
3004        encode_one(&mut out, v);
3005    }
3006    out
3007}
3008
3009/// v7.31 (perf 3e) — encode into a caller-owned scratch buffer.
3010/// The per-row key paths (group hash, DISTINCT set, join build/
3011/// probe) ran 24k+ String allocations per query through the
3012/// allocator just to LOOK UP a map; the scratch form allocates
3013/// only when a map actually has to take ownership (vacant insert).
3014pub(crate) fn encode_key_refs_into(vals: &[&Value], out: &mut String) {
3015    out.clear();
3016    for v in vals {
3017        encode_one(out, v);
3018    }
3019}
3020
3021pub(crate) fn encode_key(vals: &[Value]) -> String {
3022    let mut out = String::new();
3023    for v in vals {
3024        encode_one(&mut out, v);
3025    }
3026    out
3027}
3028
3029#[allow(clippy::cast_precision_loss)]
3030fn value_cmp(a: &Value, b: &Value) -> core::cmp::Ordering {
3031    use core::cmp::Ordering::Equal;
3032    match (a, b) {
3033        (Value::Null, Value::Null) => Equal,
3034        (Value::Null, _) => core::cmp::Ordering::Greater, // NULLs last
3035        (_, Value::Null) => core::cmp::Ordering::Less,
3036        (Value::Int(x), Value::Int(y)) => x.cmp(y),
3037        (Value::BigInt(x), Value::BigInt(y)) => x.cmp(y),
3038        (Value::Int(x), Value::BigInt(y)) => i64::from(*x).cmp(y),
3039        (Value::BigInt(x), Value::Int(y)) => x.cmp(&i64::from(*y)),
3040        (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(Equal),
3041        (Value::Int(x), Value::Float(y)) => f64::from(*x).partial_cmp(y).unwrap_or(Equal),
3042        (Value::Float(x), Value::Int(y)) => x.partial_cmp(&f64::from(*y)).unwrap_or(Equal),
3043        (Value::BigInt(x), Value::Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Equal),
3044        (Value::Float(x), Value::BigInt(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Equal),
3045        (Value::Text(x), Value::Text(y)) => x.cmp(y),
3046        (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
3047        _ => Equal,
3048    }
3049}