Skip to main content

spg_engine/
aggregate.rs

1//! Aggregate executor.
2//!
3//! Handles `SELECT … <aggs> … [GROUP BY …]` queries. The planning strategy
4//! is straightforward:
5//!
6//! 1. Walk the SELECT (and ORDER BY) expressions to find every aggregate
7//!    function call. Dedupe by AST equality and assign each `__agg_<i>`.
8//! 2. Same for every `GROUP BY` expression: assign `__grp_<j>`.
9//! 3. Stream the WHERE-filtered rows, group by the tuple of GROUP BY
10//!    values, and update per-group aggregate state.
11//! 4. Materialise a synthetic per-group row containing
12//!    `[__grp_0..__grp_K, __agg_0..__agg_N]` and rewrite the user's
13//!    SELECT / ORDER BY expressions to reference those synthetic columns
14//!    instead of the originals.
15//! 5. Evaluate the rewritten expressions against the synthetic schema and
16//!    emit results.
17//!
18//! v1.8 implements `count(*)`, `count(expr)`, `sum`, `min`, `max`, `avg`.
19//! NULL semantics follow PG: aggregates skip NULL inputs (except
20//! `count(*)`, which counts rows). `sum(int)` widens to `BigInt`;
21//! `avg(int|bigint)` returns `Float`.
22
23use alloc::borrow::Cow;
24use alloc::boxed::Box;
25use alloc::collections::BTreeSet;
26use alloc::format;
27use alloc::string::{String, ToString};
28use alloc::vec::Vec;
29
30use spg_sql::ast::{Expr, SelectItem, SelectStatement};
31use spg_storage::{ColumnSchema, DataType, Row, Value};
32
33use crate::eval::{self, EvalContext, EvalError};
34use crate::join::RowRef;
35
36/// True if this statement should go through the aggregate path.
37pub fn uses_aggregate(stmt: &SelectStatement) -> bool {
38    if stmt.group_by.is_some() || stmt.having.is_some() {
39        return true;
40    }
41    for item in &stmt.items {
42        if let SelectItem::Expr { expr, .. } = item
43            && contains_aggregate(expr)
44        {
45            return true;
46        }
47    }
48    for o in &stmt.order_by {
49        if contains_aggregate(&o.expr) {
50            return true;
51        }
52    }
53    if let Some(h) = &stmt.having
54        && contains_aggregate(h)
55    {
56        return true;
57    }
58    false
59}
60
61pub fn contains_aggregate(e: &Expr) -> bool {
62    match e {
63        Expr::FunctionCall { name, args } => {
64            is_aggregate_name(name) || args.iter().any(contains_aggregate)
65        }
66        Expr::AggregateOrdered { .. } => true,
67        Expr::Binary { lhs, rhs, .. } => contains_aggregate(lhs) || contains_aggregate(rhs),
68        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
69            contains_aggregate(expr)
70        }
71        Expr::Like { expr, pattern, .. } => contains_aggregate(expr) || contains_aggregate(pattern),
72        Expr::Extract { source, .. } => contains_aggregate(source),
73        // v4.10 subqueries + v4.12 window functions / Literal /
74        // Column — all non-aggregate leaves from the regular
75        // aggregate planner's POV. Window-bearing projections are
76        // routed to exec_select_with_window before this runs.
77        Expr::ScalarSubquery(_)
78        | Expr::Exists { .. }
79        | Expr::InSubquery { .. }
80        | Expr::WindowFunction { .. }
81        | Expr::Literal(_)
82        | Expr::Placeholder(_)
83        | Expr::Column(_) => false,
84        // v7.10.10 — recurse into array constructor / subscript /
85        // ANY/ALL children. Aggregates inside `ARRAY[SUM(x)]` are
86        // valid PG and must be detected here.
87        Expr::Array(items) => items.iter().any(contains_aggregate),
88        Expr::ArraySubscript { target, index } => {
89            contains_aggregate(target) || contains_aggregate(index)
90        }
91        Expr::AnyAll { expr, array, .. } => contains_aggregate(expr) || contains_aggregate(array),
92        Expr::InList { expr, list, .. } => {
93            contains_aggregate(expr) || list.iter().any(contains_aggregate)
94        }
95        // v7.13.0 — CASE WHEN … END. Recurse into operand,
96        // every (WHEN, THEN) pair, and the ELSE branch.
97        Expr::Case {
98            operand,
99            branches,
100            else_branch,
101        } => {
102            operand.as_deref().is_some_and(contains_aggregate)
103                || branches
104                    .iter()
105                    .any(|(w, t)| contains_aggregate(w) || contains_aggregate(t))
106                || else_branch.as_deref().is_some_and(contains_aggregate)
107        }
108    }
109}
110
111pub fn is_aggregate_name(name: &str) -> bool {
112    matches!(
113        name.to_ascii_lowercase().as_str(),
114        "count"
115            | "count_star"
116            | "sum"
117            | "min"
118            | "max"
119            | "avg"
120            // v7.17.0 — variadic / collection aggregates. ORM
121            // reports (Hibernate / Rails / Django) emit these in
122            // GROUP BY rollups; pre-7.17 SPG hit "unknown
123            // aggregate".
124            | "string_agg"
125            | "array_agg"
126            // v7.17.0 — boolean aggregates. `every` is SQL-standard
127            // alias for `bool_and`.
128            | "bool_and"
129            | "bool_or"
130            | "every"
131            // v7.32 (round-29) — statistical aggregates (every BI /
132            // dashboard emits these in rollups).
133            | "stddev" | "stddev_samp" | "stddev_pop"
134            | "variance" | "var_samp" | "var_pop"
135            // v7.32 (round-29) — bitwise aggregates.
136            | "bit_and" | "bit_or" | "bit_xor"
137            // v7.32 (round-29) — ordered-set aggregates (used with
138            // `WITHIN GROUP (ORDER BY …)`).
139            | "percentile_cont" | "percentile_disc" | "mode"
140            // v7.32 (round-29) — hypothetical-set aggregates (also
141            // `WITHIN GROUP`): the rank the direct args WOULD have.
142            | "rank" | "dense_rank" | "percent_rank" | "cume_dist"
143            // v7.32 (round-29) — two-argument regression family.
144            | "covar_pop" | "covar_samp" | "corr"
145            | "regr_count" | "regr_avgx" | "regr_avgy" | "regr_slope"
146            | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy"
147            // v7.32 (round-29) — JSON aggregates.
148            | "json_agg" | "jsonb_agg" | "json_object_agg" | "jsonb_object_agg"
149    )
150}
151
152/// v7.32 (round-29) — two-argument regression aggregates `f(Y, X)`.
153fn is_regression_name(name: &str) -> bool {
154    matches!(
155        name,
156        "covar_pop"
157            | "covar_samp"
158            | "corr"
159            | "regr_count"
160            | "regr_avgx"
161            | "regr_avgy"
162            | "regr_slope"
163            | "regr_intercept"
164            | "regr_r2"
165            | "regr_sxx"
166            | "regr_syy"
167            | "regr_sxy"
168    )
169}
170
171/// v7.32 (round-29) — aggregates that consume a second positional
172/// argument: `string_agg(v, sep)`, the regression family `f(Y, X)`, and
173/// `json_object_agg(key, value)`.
174fn agg_uses_second_arg(name: &str) -> bool {
175    name == "string_agg"
176        || name == "json_object_agg"
177        || name == "jsonb_object_agg"
178        || is_regression_name(name)
179}
180
181/// v7.32 (round-29) — ordered-set aggregates: the value to aggregate
182/// comes from the `WITHIN GROUP (ORDER BY …)` sort spec, and any
183/// in-parens arguments are *direct* arguments (the percentile fraction).
184/// `mode()` takes no direct argument.
185pub fn is_ordered_set_name(name: &str) -> bool {
186    // v7.32 — `eq_ignore_ascii_case` instead of `to_ascii_lowercase()`:
187    // these classifiers run in the aggregate row/group loop, where the
188    // old per-call `String` allocation showed up as ~16% of the inbox's
189    // aggregate path in a sampled profile (the names are constant).
190    ["percentile_cont", "percentile_disc", "mode"]
191        .iter()
192        .any(|k| name.eq_ignore_ascii_case(k))
193}
194
195/// v7.32 (round-29) — hypothetical-set aggregates: `rank(args) WITHIN
196/// GROUP (ORDER BY …)` and friends compute the rank the hypothetical
197/// row would have. Like ordered-set, the value stream comes from the
198/// sort spec and the in-parens args are direct (the hypothetical row).
199pub fn is_hypothetical_set_name(name: &str) -> bool {
200    ["rank", "dense_rank", "percent_rank", "cume_dist"]
201        .iter()
202        .any(|k| name.eq_ignore_ascii_case(k))
203}
204
205/// v7.32 (round-29) — every aggregate that takes its value stream from
206/// a `WITHIN GROUP (ORDER BY …)` clause (ordered-set + hypothetical-set).
207pub fn is_within_group_name(name: &str) -> bool {
208    is_ordered_set_name(name) || is_hypothetical_set_name(name)
209}
210
211/// Per-aggregate running state.
212#[derive(Debug, Default, Clone)]
213struct AggState {
214    count: i64,
215    sum_int: i64,
216    sum_float: f64,
217    extreme: Option<Value>,
218    use_float: bool,
219    /// v7.17.0 — running collection for string_agg / array_agg.
220    /// Each entry is one row's contribution (NULL preserved as
221    /// `Value::Null`; string_agg's finalize step drops them, but
222    /// array_agg keeps them). Pushing in insertion order matches
223    /// PG behaviour when no `ORDER BY` is given inside the
224    /// aggregate call.
225    items: Vec<Value>,
226    /// v7.25 (round-17) — per-group dedupe set for DISTINCT
227    /// aggregates (encoded values; NULLs never reach it because
228    /// the caller's skip runs after the per-aggregate NULL rules).
229    seen: BTreeSet<String>,
230    /// v7.24 (round-16 A) — per-item ORDER BY key tuples, parallel
231    /// to `items` (pushed under the same skip/keep conditions).
232    /// Empty when the aggregate carries no internal ordering.
233    item_keys: Vec<Vec<Value>>,
234    /// v7.17.0 — captured separator for string_agg. PG accepts a
235    /// non-constant separator expression but in practice every
236    /// caller passes a literal; the engine snapshots the last
237    /// non-NULL text it sees, which matches PG's "use the latest
238    /// row's value" behaviour.
239    separator: Option<String>,
240    /// v7.17.0 — running boolean accumulator for bool_and /
241    /// bool_or / every. `None` until the first non-NULL input;
242    /// at finalize None → SQL NULL.
243    bool_acc: Option<bool>,
244    /// v7.32 (round-29) — sum of squares for the variance / stddev
245    /// family (`sum_float` carries the running sum; `count` the n).
246    sum_sq: f64,
247    /// v7.32 (round-29) — running accumulator for bit_and / bit_or /
248    /// bit_xor. `None` until the first non-NULL input → SQL NULL.
249    bit_acc: Option<i64>,
250    /// v7.32 (round-29) — two-argument regression family
251    /// (`covar_*` / `corr` / `regr_*`), PG arg order `f(Y, X)`. Only
252    /// rows where BOTH inputs are non-NULL contribute (`count` is the
253    /// paired n, independent of the single-arg `sum_*`).
254    reg_n: i64,
255    reg_sx: f64,
256    reg_sy: f64,
257    reg_sxx: f64,
258    reg_syy: f64,
259    reg_sxy: f64,
260    /// v7.32 (round-29) — second value stream for `json_object_agg`
261    /// (`items` holds the keys, `aux_items` the values).
262    aux_items: Vec<Value>,
263    /// v7.33 (array_agg argmax) — for a `first_ordered` spec
264    /// (`(array_agg(x ORDER BY y))[1]`), the running first-by-order
265    /// (sort-key tuple, value). Replaced only when a new row's key sorts
266    /// strictly before the current best (ties keep the earliest row, =
267    /// the stable-sort `[1]`). No items/item_keys array is built.
268    first_best: Option<(Vec<Value>, Value)>,
269}
270
271#[derive(Debug, Clone)]
272struct AggSpec {
273    name: String, // lowercased
274    /// First argument (value expression) for every aggregate
275    /// except `count(*)`. `None` for `count_star`.
276    arg: Option<Expr>,
277    /// v7.17.0 — second argument. Only `string_agg(value, sep)`
278    /// uses it today. `None` for every other aggregate (or for
279    /// `array_agg`, which is single-arg). Carried in the spec so
280    /// per-row evaluation can re-use the same separator
281    /// expression across calls.
282    arg2: Option<Expr>,
283    /// v7.25 (round-17) — `COUNT(DISTINCT x)` & friends: dedupe
284    /// the input stream per group before accumulation.
285    distinct: bool,
286    /// v7.24 (round-16 A) — aggregate-internal ORDER BY keys
287    /// (`array_agg(x ORDER BY y DESC NULLS LAST)`). Empty for the
288    /// plain form. Only the collection aggregates honour it;
289    /// other aggregates are order-insensitive and ignore it (PG
290    /// accepts the syntax everywhere too).
291    order_by: Vec<spg_sql::ast::OrderBy>,
292    /// v7.32 (round-29) — `FILTER (WHERE cond)`: a per-row predicate
293    /// evaluated against the source row before accumulation. A row
294    /// whose `cond` is not TRUE (false or NULL) is excluded from this
295    /// aggregate only. `None` for the unfiltered form.
296    filter: Option<Expr>,
297    /// v7.32 (round-29) — ordered-set aggregates only: the *direct*
298    /// argument (the percentile fraction for `percentile_cont/disc`).
299    /// PG requires it constant, so it is evaluated once. `None` for
300    /// `mode()` and for every non-ordered-set aggregate.
301    direct_arg: Option<Expr>,
302    /// v7.33 (array_agg argmax) — set when this spec came from
303    /// `(array_agg(x ORDER BY y))[1]`: accumulate only the first-by-order
304    /// element (a running argmax/argmin) and finalise to that scalar
305    /// value, instead of collecting + sorting + materialising the whole
306    /// per-group array just to take element 1. Returns the element type,
307    /// not the array type.
308    first_ordered: bool,
309}
310
311/// Output of running the aggregate path. Schema describes one row per
312/// group; rows are not yet ORDER BY-sorted (caller does it).
313#[derive(Debug)]
314pub struct AggResult {
315    pub columns: Vec<ColumnSchema>,
316    pub rows: Vec<Row>,
317    /// v7.31 (perf — PG lesson #1, post-LIMIT subquery projection):
318    /// select-list items whose rewritten expr carries a subquery and
319    /// is referenced by neither ORDER BY nor HAVING. Their output
320    /// cells hold NULL placeholders; the caller truncates to
321    /// LIMIT+OFFSET first and only then evaluates these for the
322    /// surviving rows (PG runs the same shape with SubPlan loops=50
323    /// instead of loops=24000). `(output_col, rewritten_expr)`.
324    pub deferred: Vec<(usize, Expr)>,
325    /// Synthetic group rows aligned 1:1 with `rows`; populated only
326    /// when `deferred` is non-empty.
327    pub synth_rows: Vec<Row>,
328    /// Schema the deferred exprs evaluate against.
329    pub synth_schema: Vec<ColumnSchema>,
330}
331
332/// Execute aggregate logic against an already-WHERE-filtered iterator of
333/// rows. `table_alias` is the alias accepted by column resolution.
334#[allow(clippy::too_many_lines)]
335/// v7.25.2 (round-19 A) — caller-injected evaluator for synth-row
336/// expressions that still carry subquery nodes after the rewrite
337/// (correlated subqueries in the select list / HAVING / aggregate
338/// ORDER BY of a GROUP BY query). The engine passes its
339/// correlated-aware evaluator; pure-library callers pass None and
340/// surviving subqueries keep erroring loudly.
341pub type CorrelatedEval<'a> = &'a dyn Fn(&Expr, &Row, &EvalContext<'_>) -> Result<Value, EvalError>;
342
343/// Output of the per-group projection stage (`project_groups`): the
344/// output schema, the projected rows, the synth rows kept alongside
345/// them for post-LIMIT deferred evaluation, the deferred subquery
346/// items, and the rewritten ORDER BY exprs (shared with the sort).
347struct Projection {
348    columns: Vec<ColumnSchema>,
349    out_rows: Vec<Row>,
350    kept_synth: Vec<Row>,
351    deferred: Vec<(usize, Expr)>,
352    order_rewritten: Vec<Expr>,
353}
354
355pub(crate) fn run(
356    stmt: &SelectStatement,
357    rows: &[RowRef<'_>],
358    schema_cols: &[ColumnSchema],
359    table_alias: Option<&str>,
360    correlated_eval: Option<CorrelatedEval<'_>>,
361) -> Result<AggResult, EvalError> {
362    let group_exprs: Vec<Expr> = stmt.group_by.clone().unwrap_or_default();
363
364    // Collect aggregate sub-expressions across items + order_by.
365    let mut agg_specs: Vec<AggSpec> = Vec::new();
366    for item in &stmt.items {
367        if let SelectItem::Expr { expr, .. } = item {
368            collect_aggregates(expr, &mut agg_specs);
369        }
370    }
371    for o in &stmt.order_by {
372        collect_aggregates(&o.expr, &mut agg_specs);
373    }
374    if let Some(h) = &stmt.having {
375        collect_aggregates(h, &mut agg_specs);
376    }
377    // v7.17.0 — arity validation. The collector tolerates an
378    // arbitrary positional-arg count; here we enforce the
379    // per-aggregate contract so a malformed call (e.g.
380    // `array_agg()` or `string_agg(x)`) surfaces as a SQL error
381    // rather than silently coercing to a degenerate aggregate.
382    validate_agg_arities(stmt, &agg_specs)?;
383    validate_within_group(&agg_specs)?;
384
385    // (1) Stream the WHERE-filtered rows into insertion-ordered group state.
386    let order = accumulate_groups(
387        rows,
388        &group_exprs,
389        &agg_specs,
390        schema_cols,
391        table_alias,
392        correlated_eval,
393    )?;
394
395    // (2) Build the synthetic per-group schema and finalise each group's row.
396    let synth_schema =
397        build_synth_schema(rows, &group_exprs, &agg_specs, schema_cols, table_alias)?;
398    let synth_rows = finalize_synth_rows(
399        &order,
400        &agg_specs,
401        &synth_schema,
402        rows,
403        schema_cols,
404        table_alias,
405    )?;
406
407    // (3) Rewrite the user's expressions, filter groups by HAVING and project.
408    let Projection {
409        columns,
410        mut out_rows,
411        mut kept_synth,
412        deferred,
413        order_rewritten,
414    } = project_groups(
415        synth_rows,
416        stmt,
417        &group_exprs,
418        &agg_specs,
419        &synth_schema,
420        correlated_eval,
421    )?;
422
423    // (4) ORDER BY on the aggregated output (the caller applies LIMIT).
424    if !stmt.order_by.is_empty() {
425        let (sorted_synth, sorted_out) = sort_synth_by_order_by(
426            &synth_schema,
427            &stmt.order_by,
428            &order_rewritten,
429            kept_synth,
430            out_rows,
431            correlated_eval,
432        )?;
433        kept_synth = sorted_synth;
434        out_rows = sorted_out;
435    }
436
437    let (synth_rows_out, synth_schema_out) = if deferred.is_empty() {
438        (Vec::new(), Vec::new())
439    } else {
440        (kept_synth, synth_schema.clone())
441    };
442    Ok(AggResult {
443        columns,
444        rows: out_rows,
445        deferred,
446        synth_rows: synth_rows_out,
447        synth_schema: synth_schema_out,
448    })
449}
450
451/// v7.32 (round-29) — validate the structural requirements of WITHIN
452/// GROUP (ordered-set / hypothetical-set) aggregates up front, so a
453/// malformed call surfaces as a SQL error rather than a silently
454/// degenerate aggregate.
455fn validate_within_group(agg_specs: &[AggSpec]) -> Result<(), EvalError> {
456    // v7.32 (round-29) — WITHIN GROUP aggregates require the clause (PG
457    // raises a hard error otherwise rather than silently degrading), and
458    // SPG supports the single-sort-key form only.
459    for spec in agg_specs {
460        if is_within_group_name(&spec.name) {
461            if spec.order_by.is_empty() {
462                return Err(EvalError::TypeMismatch {
463                    detail: format!("{}() requires WITHIN GROUP (ORDER BY …)", spec.name),
464                });
465            }
466            // mode() is the only WITHIN GROUP aggregate with no direct
467            // argument; the rest carry one (percentile fraction /
468            // hypothetical value).
469            if spec.name != "mode" && spec.direct_arg.is_none() {
470                return Err(EvalError::TypeMismatch {
471                    detail: format!("{}() requires a direct argument", spec.name),
472                });
473            }
474            // Multi-key WITHIN GROUP (multiple sort keys / hypothetical
475            // args) is not supported yet — error loudly instead of
476            // silently using only the first key.
477            if spec.order_by.len() > 1 {
478                return Err(EvalError::TypeMismatch {
479                    detail: format!(
480                        "{}() with multiple WITHIN GROUP sort keys is not supported yet",
481                        spec.name
482                    ),
483                });
484            }
485        }
486    }
487    Ok(())
488}
489
490/// (1) Stream the WHERE-filtered rows, group by the GROUP BY value
491/// tuple, and update per-group aggregate state. Returns the groups in
492/// insertion order. See `run` for the bind-once fast path rationale.
493#[allow(clippy::too_many_lines, clippy::type_complexity)]
494fn accumulate_groups(
495    rows: &[RowRef<'_>],
496    group_exprs: &[Expr],
497    agg_specs: &[AggSpec],
498    schema_cols: &[ColumnSchema],
499    table_alias: Option<&str>,
500    correlated_eval: Option<CorrelatedEval<'_>>,
501) -> Result<Vec<(Vec<Value>, Vec<AggState>)>, EvalError> {
502    let ctx = EvalContext::new(schema_cols, table_alias);
503    // Map group key (vec of values, encoded as canonical string) -> group state.
504    // v7.32 (architecture v2, P2b) — insertion-ordered group state in
505    // a Vec; the hash map only maps key → index. Removes the parallel
506    // `key_order: Vec<String>` (a second per-group key clone) and the
507    // per-group re-probe `groups[k]` at finalize (24k hash lookups for
508    // the inbox shape). The map owns its key once on vacant insert.
509    let mut order: Vec<(Vec<Value>, Vec<AggState>)> = Vec::new();
510    let mut groups: hashbrown::HashMap<String, usize> = hashbrown::HashMap::new();
511    // When there are no GROUP BY exprs *and* there is at least one aggregate,
512    // every row collapses into a single anonymous group keyed by "".
513    if rows.is_empty() && group_exprs.is_empty() {
514        // Single empty-aggregate group: count=0, sum=0, max=NULL, etc.
515        // No rows follow, so the map is never probed — seed `order` only.
516        let init: Vec<AggState> = (0..agg_specs.len()).map(|_| AggState::default()).collect();
517        order.push((Vec::new(), init));
518    }
519
520    // v7.30 (perf campaign) - hoist the per-row work that doesn't
521    // depend on the row: which group exprs need collation folding
522    // (none, for most queries - the old code cloned the whole
523    // group_vals vec per row just in case).
524    // v7.30 (perf campaign) - the no-tax row loop. When a group
525    // expr or an aggregate argument is a bare column reference
526    // (the overwhelmingly common shape), bind its position ONCE
527    // and read row cells by offset in the loop - no per-row tree
528    // walk, no owned-Value clone out of resolve_column. Anything
529    // more complex keeps the eval path.
530    let col_pos = |e: &Expr| -> Option<usize> {
531        // Qualified references only: the bare-name resolver carries
532        // alias/ambiguity logic the bind-once path must not fork.
533        if let Expr::Column(c) = e
534            && c.qualifier.is_some()
535        {
536            eval::find_column_pos(c, &ctx)
537        } else {
538            None
539        }
540    };
541    let group_pos: Vec<Option<usize>> = group_exprs.iter().map(col_pos).collect();
542    let all_groups_bound = group_pos.iter().all(Option::is_some);
543    let arg_pos: Vec<Option<usize>> = agg_specs
544        .iter()
545        .map(|spec| spec.arg.as_ref().and_then(|e| col_pos(e)))
546        .collect();
547    // v7.33 (array_agg perf) — bound positions for each spec's internal
548    // ORDER BY keys, so an ordered aggregate (`array_agg(x ORDER BY y)`)
549    // reads the sort key by reference (RowRef::get) instead of
550    // materialising the whole combined join row per input row just to
551    // eval one bound column. Mirrors arg_pos. On the inbox shape this
552    // turned 24k full-row (~1 KB each) clones into 24k single-cell reads.
553    let order_pos: Vec<Vec<Option<usize>>> = agg_specs
554        .iter()
555        .map(|spec| spec.order_by.iter().map(|o| col_pos(&o.expr)).collect())
556        .collect();
557    // Does any spec need the fully-materialised row in the bound fast
558    // path — a FILTER, a non-bound value arg, a second arg, or a non-bound
559    // ORDER key? When false (every aggregate arg/key is a bound column —
560    // the inbox shape) the bound fast path never materialises a row.
561    let needs_mat = agg_specs.iter().enumerate().any(|(i, s)| {
562        s.filter.is_some()
563            || (s.arg.is_some() && arg_pos[i].is_none())
564            || s.arg2.is_some()
565            || order_pos[i].iter().any(Option::is_none)
566    });
567    let ci_positions: Vec<usize> = group_exprs
568        .iter()
569        .enumerate()
570        .filter(|(_, g)| {
571            matches!(
572                eval::column_collation(g, &ctx),
573                Some(spg_storage::Collation::CaseInsensitive)
574            )
575        })
576        .map(|(i, _)| i)
577        .collect();
578    // v7.31 (perf 3e) — per-row scratch buffers. The fast path used
579    // to allocate a key String (and a refs Vec) for EVERY row just
580    // to probe the group map; hits — the overwhelming case — now
581    // touch the allocator zero times.
582    let mut keybuf_s = String::new();
583    let mut dkeybuf = String::new();
584    let mut refs: Vec<&Value> = Vec::with_capacity(group_pos.len());
585    // v7.32 (round-31) — an aggregate's argument / FILTER / second arg /
586    // ORDER key may itself be a *correlated* subquery, e.g.
587    // `MAX((SELECT i.v FROM inner i WHERE i.fk = o.id))`. A non-correlated
588    // subquery is pre-resolved to a literal before this loop, but a
589    // correlated one survives as a subquery node and must be evaluated per
590    // outer row through the correlated evaluator — the same hook the
591    // select-list / HAVING / ORDER finalisers already use below. Plain
592    // `eval_expr` would hit "subquery reached row eval".
593    //
594    // The `any_agg_subquery` gate is computed once here so the common case
595    // (no subquery anywhere in the aggregate args — including every hot
596    // scan/group aggregate) short-circuits before the per-row
597    // `expr_has_subquery` walk: `eval_arg` is then exactly `eval_expr`.
598    let any_agg_subquery = correlated_eval.is_some()
599        && agg_specs.iter().any(|s| {
600            s.filter
601                .as_ref()
602                .is_some_and(|e| crate::expr_has_subquery(e))
603                || s.arg.as_ref().is_some_and(|e| crate::expr_has_subquery(e))
604                || s.arg2.as_ref().is_some_and(|e| crate::expr_has_subquery(e))
605                || s.order_by.iter().any(|o| crate::expr_has_subquery(&o.expr))
606        });
607    let eval_arg = |e: &Expr, r: &Row, c: &EvalContext<'_>| -> Result<Value, EvalError> {
608        match correlated_eval {
609            Some(f) if any_agg_subquery && crate::expr_has_subquery(e) => f(e, r, c),
610            _ => eval::eval_expr(e, r, c),
611        }
612    };
613    for row in rows {
614        // Fast key: bound positions + no ci folding -> encode
615        // straight from borrowed cells; group_vals materialise
616        // only when the group is NEW.
617        if all_groups_bound && ci_positions.is_empty() && !group_exprs.is_empty() {
618            refs.clear();
619            refs.extend(
620                group_pos
621                    .iter()
622                    .map(|p| row.get(p.unwrap()).unwrap_or(&Value::Null)),
623            );
624            encode_key_refs_into(&refs, &mut keybuf_s);
625            let idx = match groups.get(keybuf_s.as_str()) {
626                Some(&i) => i,
627                None => {
628                    let i = order.len();
629                    let init: Vec<AggState> =
630                        (0..agg_specs.len()).map(|_| AggState::default()).collect();
631                    let owned: Vec<Value> = refs.iter().map(|v| (*v).clone()).collect();
632                    order.push((owned, init));
633                    groups.insert(keybuf_s.clone(), i);
634                    i
635                }
636            };
637            let entry = &mut order[idx];
638            // v7.33 (array_agg perf) — materialise the combined row AT
639            // MOST once per input row, and only when a spec actually
640            // needs the eval path (FILTER / non-bound arg / arg2 / non-
641            // bound ORDER key). Bound args and bound ORDER keys read
642            // cells by reference below, so the inbox shape (all bound)
643            // never materialises — killing the per-row ~1 KB clone that
644            // dominated the ordered-aggregate cost.
645            let mat: Option<Cow<'_, Row>> = if needs_mat { Some(row.as_row()) } else { None };
646            for (i, spec) in agg_specs.iter().enumerate() {
647                // v7.32 (round-29) — FILTER (WHERE cond): exclude rows
648                // where cond is not TRUE before they reach this
649                // aggregate's accumulator (and before DISTINCT dedup).
650                if let Some(f) = &spec.filter
651                    && !matches!(
652                        eval_arg(f, mat.as_deref().expect("needs_mat for FILTER"), &ctx)?,
653                        Value::Bool(true)
654                    )
655                {
656                    continue;
657                }
658                let arg_owned: Value;
659                let arg_ref: &Value = match (&arg_pos[i], &spec.arg) {
660                    (Some(p), _) => row.get(*p).unwrap_or(&Value::Null),
661                    (None, None) => {
662                        arg_owned = Value::Bool(true);
663                        &arg_owned
664                    }
665                    (None, Some(e)) => {
666                        arg_owned = eval_arg(
667                            e,
668                            mat.as_deref().expect("needs_mat for non-bound arg"),
669                            &ctx,
670                        )?;
671                        &arg_owned
672                    }
673                };
674                let arg2_val = match &spec.arg2 {
675                    None => None,
676                    Some(e) => Some(eval_arg(
677                        e,
678                        mat.as_deref().expect("needs_mat for arg2"),
679                        &ctx,
680                    )?),
681                };
682                let order_keys = if spec.order_by.is_empty() {
683                    None
684                } else {
685                    let mut keys = Vec::with_capacity(spec.order_by.len());
686                    for (k, o) in spec.order_by.iter().enumerate() {
687                        // Bound ORDER key → read the cell by reference; only
688                        // a non-bound key falls to the materialised eval path.
689                        keys.push(match order_pos[i][k] {
690                            Some(p) => row.get(p).cloned().unwrap_or(Value::Null),
691                            None => eval_arg(
692                                &o.expr,
693                                mat.as_deref().expect("needs_mat for non-bound ORDER key"),
694                                &ctx,
695                            )?,
696                        });
697                    }
698                    Some(keys)
699                };
700                // v7.33 (array_agg argmax) — first_ordered: keep only the
701                // running first-by-order element (strict-less replacement
702                // = ties keep the earliest row, matching the stable-sort
703                // `[1]`), no array build.
704                if spec.first_ordered {
705                    if let Some(keys) = order_keys {
706                        let st = &mut entry.1[i];
707                        let better = match &st.first_best {
708                            None => true,
709                            Some((bk, _)) => {
710                                cmp_order_keys(&spec.order_by, &keys, bk)
711                                    == core::cmp::Ordering::Less
712                            }
713                        };
714                        if better {
715                            st.first_best = Some((keys, arg_ref.clone()));
716                        }
717                    }
718                    continue;
719                }
720                if spec.distinct {
721                    encode_key_refs_into(core::slice::from_ref(&arg_ref), &mut dkeybuf);
722                    if entry.1[i].seen.contains(dkeybuf.as_str()) {
723                        continue;
724                    }
725                    entry.1[i].seen.insert(dkeybuf.clone());
726                }
727                update_state(
728                    &mut entry.1[i],
729                    &spec.name,
730                    arg_ref,
731                    arg2_val.as_ref(),
732                    order_keys,
733                )?;
734            }
735            continue;
736        }
737        // v7.32 (P4 increment 2) — eval (non-bound) path: present the
738        // row as a borrowed Row once (Owned → zero-cost borrow; a join
739        // tuple materialises here exactly once, never on the bound fast
740        // path above), then the original eval loop runs unchanged.
741        let row_materialised = row.as_row();
742        let row: &Row = &row_materialised;
743        let group_vals: Vec<Value> = group_exprs
744            .iter()
745            .map(|g| eval::eval_expr(g, row, &ctx))
746            .collect::<Result<_, _>>()?;
747        // v7.17.0 Phase 2.5b — case-insensitive group keying: fold
748        // only the ci columns, and only when any exist. Display
749        // value (`group_vals`) stays original — only the key folds.
750        let key = if ci_positions.is_empty() {
751            encode_key(&group_vals)
752        } else {
753            let mut key_vals = group_vals.clone();
754            for &i in &ci_positions {
755                if let Value::Text(s) = &key_vals[i] {
756                    key_vals[i] = Value::Text(s.to_ascii_lowercase());
757                }
758            }
759            encode_key(&key_vals)
760        };
761        // Probe by index; the map owns the key once on vacant insert.
762        let idx = match groups.get(key.as_str()) {
763            Some(&i) => i,
764            None => {
765                let i = order.len();
766                let init: Vec<AggState> =
767                    (0..agg_specs.len()).map(|_| AggState::default()).collect();
768                order.push((group_vals.clone(), init));
769                groups.insert(key, i);
770                i
771            }
772        };
773        let entry = &mut order[idx];
774        for (i, spec) in agg_specs.iter().enumerate() {
775            // v7.32 (round-29) — FILTER (WHERE cond): exclude rows where
776            // cond is not TRUE before accumulation (and before DISTINCT).
777            if let Some(f) = &spec.filter
778                && !matches!(eval_arg(f, row, &ctx)?, Value::Bool(true))
779            {
780                continue;
781            }
782            let arg_val = match &spec.arg {
783                None => Value::Bool(true), // count_star: sentinel non-null
784                Some(e) => eval_arg(e, row, &ctx)?,
785            };
786            // v7.17.0 — `string_agg(value, separator)` evaluates the
787            // separator per row but PG treats it as constant; we
788            // pass the per-row value into update_state so a future
789            // varying-separator caller still sees correct output,
790            // even though SPG (like PG) only uses the most recent.
791            let arg2_val = match &spec.arg2 {
792                None => None,
793                Some(e) => Some(eval_arg(e, row, &ctx)?),
794            };
795            // v7.24 (round-16 A) — aggregate-internal ORDER BY:
796            // evaluate the key tuple against the source row.
797            let order_keys = if spec.order_by.is_empty() {
798                None
799            } else {
800                let mut keys = Vec::with_capacity(spec.order_by.len());
801                for o in &spec.order_by {
802                    keys.push(eval_arg(&o.expr, row, &ctx)?);
803                }
804                Some(keys)
805            };
806            // v7.33 (array_agg argmax) — first_ordered: keep the running
807            // first-by-order element only (mirrors the bound fast path).
808            if spec.first_ordered {
809                if let Some(keys) = order_keys {
810                    let st = &mut entry.1[i];
811                    let better = match &st.first_best {
812                        None => true,
813                        Some((bk, _)) => {
814                            cmp_order_keys(&spec.order_by, &keys, bk) == core::cmp::Ordering::Less
815                        }
816                    };
817                    if better {
818                        st.first_best = Some((keys, arg_val.clone()));
819                    }
820                }
821                continue;
822            }
823            // v7.25 (round-17) — DISTINCT: drop repeated inputs
824            // before they reach the accumulator. NULLs flow through
825            // (each aggregate's own NULL rule applies; PG also
826            // treats NULL as a single distinct value for array_agg).
827            if spec.distinct {
828                let key = encode_key(core::slice::from_ref(&arg_val));
829                if !entry.1[i].seen.insert(key) {
830                    continue;
831                }
832            }
833            update_state(
834                &mut entry.1[i],
835                &spec.name,
836                &arg_val,
837                arg2_val.as_ref(),
838                order_keys,
839            )?;
840        }
841    }
842    Ok(order)
843}
844
845/// (2a) Build the synthetic per-group schema: `__grp_0..K` then
846/// `__agg_0..N`. Group types are probed from the first row; aggregate
847/// types from each spec.
848fn build_synth_schema(
849    rows: &[RowRef<'_>],
850    group_exprs: &[Expr],
851    agg_specs: &[AggSpec],
852    schema_cols: &[ColumnSchema],
853    table_alias: Option<&str>,
854) -> Result<Vec<ColumnSchema>, EvalError> {
855    let ctx = EvalContext::new(schema_cols, table_alias);
856    // Build synthetic schema: __grp_0..K then __agg_0..N.
857    let group_types: Vec<DataType> = if rows.is_empty() {
858        // Use Text as a safe stand-in — empty result means schema isn't
859        // observable. Avoids needing to evaluate group exprs on no row.
860        group_exprs.iter().map(|_| DataType::Text).collect()
861    } else {
862        let probe_row = rows[0].as_row();
863        let probe: &Row = &probe_row;
864        group_exprs
865            .iter()
866            .map(|g| {
867                eval::eval_expr(g, probe, &ctx).map(|v| v.data_type().unwrap_or(DataType::Text))
868            })
869            .collect::<Result<_, _>>()?
870    };
871    let agg_types: Vec<DataType> = agg_specs
872        .iter()
873        .map(|spec| infer_agg_type(spec, schema_cols))
874        .collect();
875    let mut synth_schema: Vec<ColumnSchema> = Vec::new();
876    for (i, ty) in group_types.iter().enumerate() {
877        synth_schema.push(ColumnSchema::new(format!("__grp_{i}"), *ty, true));
878    }
879    for (i, ty) in agg_types.iter().enumerate() {
880        synth_schema.push(ColumnSchema::new(format!("__agg_{i}"), *ty, true));
881    }
882    Ok(synth_schema)
883}
884
885/// (2b) Materialise one synthetic row per group (insertion order):
886/// apply each aggregate's internal ORDER BY, then finalise the running
887/// state into the group + aggregate cells.
888/// v7.33 — compare two aggregate-internal ORDER BY key tuples under the
889/// per-key DESC / NULLS directives. This is the exact comparator the
890/// finalize sort uses, factored out so the `first_ordered` argmax
891/// accumulator's "keep first" decision is provably identical to taking
892/// element `[1]` of the fully-sorted array.
893fn cmp_order_keys(
894    order_by: &[spg_sql::ast::OrderBy],
895    a: &[Value],
896    b: &[Value],
897) -> core::cmp::Ordering {
898    for (k, o) in order_by.iter().enumerate() {
899        let cmp = crate::order_by_value_cmp(o.desc, o.nulls_first, &a[k], &b[k]);
900        if cmp != core::cmp::Ordering::Equal {
901            return cmp;
902        }
903    }
904    core::cmp::Ordering::Equal
905}
906
907fn finalize_synth_rows(
908    order: &[(Vec<Value>, Vec<AggState>)],
909    agg_specs: &[AggSpec],
910    synth_schema: &[ColumnSchema],
911    rows: &[RowRef<'_>],
912    schema_cols: &[ColumnSchema],
913    table_alias: Option<&str>,
914) -> Result<Vec<Row>, EvalError> {
915    let ctx = EvalContext::new(schema_cols, table_alias);
916    // v7.32 (round-29) — ordered-set direct arguments (the percentile
917    // fraction) are constant per PG, so evaluate each once up front.
918    let direct_arg_vals: Vec<Option<Value>> = agg_specs
919        .iter()
920        .map(|spec| match (&spec.direct_arg, rows.first()) {
921            (Some(e), Some(r)) => eval::eval_expr(e, &r.as_row(), &ctx).map(Some),
922            _ => Ok(None),
923        })
924        .collect::<Result<_, _>>()?;
925
926    // Materialise synthetic rows (insertion order = `order`).
927    let mut synth_rows: Vec<Row> = Vec::new();
928    for (gvals, states) in order {
929        let mut values: Vec<Value> = Vec::with_capacity(synth_schema.len());
930        values.extend(gvals.iter().cloned());
931        for (i, st) in states.iter().enumerate() {
932            // v7.33 (array_agg argmax) — first_ordered: the running
933            // first-by-order value IS the result; no array build/sort.
934            if agg_specs[i].first_ordered {
935                values.push(
936                    st.first_best
937                        .as_ref()
938                        .map_or(Value::Null, |(_, v)| v.clone()),
939                );
940                continue;
941            }
942            // v7.24 (round-16 A) — order the collected items per the
943            // aggregate-internal ORDER BY before finalize consumes
944            // them.
945            let st_sorted;
946            let st_final: &AggState =
947                if !agg_specs[i].order_by.is_empty() && st.item_keys.len() == st.items.len() {
948                    let mut idx: Vec<usize> = (0..st.items.len()).collect();
949                    let ob = &agg_specs[i].order_by;
950                    idx.sort_by(|&x, &y| cmp_order_keys(ob, &st.item_keys[x], &st.item_keys[y]));
951                    let mut sorted = st.clone();
952                    sorted.items = idx.iter().map(|&j| st.items[j].clone()).collect();
953                    st_sorted = sorted;
954                    &st_sorted
955                } else {
956                    st
957                };
958            // Ordered-set aggregates compute from the sorted items + the
959            // direct fraction; everything else uses the running state.
960            let v = if is_within_group_name(&agg_specs[i].name) {
961                finalize_ordered_set(
962                    &agg_specs[i].name,
963                    st_final,
964                    direct_arg_vals[i].as_ref(),
965                    agg_specs[i].order_by.first(),
966                )
967            } else {
968                finalize(&agg_specs[i].name, st_final)
969            };
970            values.push(v);
971        }
972        synth_rows.push(Row::new(values));
973    }
974    Ok(synth_rows)
975}
976
977/// (3) Rewrite the user's SELECT items + HAVING to reference the
978/// synthetic columns, filter groups by HAVING, and project each
979/// surviving group into an output row. The synth rows ride alongside
980/// (`kept_synth`) so post-LIMIT deferred subqueries can evaluate later.
981#[allow(clippy::too_many_lines)]
982fn project_groups(
983    synth_rows: Vec<Row>,
984    stmt: &SelectStatement,
985    group_exprs: &[Expr],
986    agg_specs: &[AggSpec],
987    synth_schema: &[ColumnSchema],
988    correlated_eval: Option<CorrelatedEval<'_>>,
989) -> Result<Projection, EvalError> {
990    // Rewrite the user's SELECT items + ORDER BY to reference synthetic
991    // columns. After rewriting, every remaining `Expr::Column` must
992    // resolve against the synthetic schema (i.e. must have been a GROUP
993    // BY expression).
994    let columns: Vec<ColumnSchema> = stmt
995        .items
996        .iter()
997        .map(|item| match item {
998            SelectItem::Wildcard => Err(EvalError::TypeMismatch {
999                detail: "SELECT * with aggregates is not supported".into(),
1000            }),
1001            SelectItem::Expr { expr, alias } => {
1002                let rewritten = rewrite_expr(expr, group_exprs, agg_specs);
1003                let name = alias.clone().unwrap_or_else(|| expr.to_string());
1004                Ok(ColumnSchema::new(
1005                    name,
1006                    agg_or_group_type(&rewritten, synth_schema),
1007                    true,
1008                ))
1009            }
1010        })
1011        .collect::<Result<_, _>>()?;
1012
1013    // Project per synthetic row. HAVING filters out groups *before*
1014    // we keep the projected row — same semantics as PG: HAVING runs
1015    // against the aggregated row (so `HAVING count(*) > 1` works) and
1016    // sees only group-by'd columns plus aggregate values.
1017    let synth_ctx = EvalContext::new(synth_schema, None);
1018    let having_rewritten = stmt
1019        .having
1020        .as_ref()
1021        .map(|h| rewrite_expr(h, group_exprs, agg_specs));
1022    // v7.30 (phase 3e-1) - rewrite SELECT items ONCE. This ran per
1023    // GROUP (23.5k x 9 items of AST cloning = ~48% of the inbox
1024    // query in sampled stacks); the rewrite is group-independent.
1025    // Stable addresses also let the per-expression subquery plans
1026    // (v7.29 3c) hit across groups instead of rebuilding.
1027    let items_rewritten: alloc::vec::Vec<Option<Expr>> = stmt
1028        .items
1029        .iter()
1030        .map(|item| match item {
1031            SelectItem::Expr { expr, .. } => Some(rewrite_expr(expr, group_exprs, agg_specs)),
1032            SelectItem::Wildcard => None,
1033        })
1034        .collect();
1035    // v7.31 (perf — PG lesson #1): subquery-bearing select items
1036    // deferred to post-LIMIT, when no sort/filter key can observe
1037    // them. ORDER BY rewrites are hoisted here so the safety check
1038    // and the sort below share one rewrite pass.
1039    let order_rewritten: Vec<Expr> = stmt
1040        .order_by
1041        .iter()
1042        .map(|o| rewrite_expr(&o.expr, group_exprs, agg_specs))
1043        .collect();
1044    let defer_enabled = correlated_eval.is_some()
1045        && !stmt.distinct
1046        && !having_rewritten
1047            .as_ref()
1048            .is_some_and(crate::expr_has_subquery)
1049        && !order_rewritten.iter().any(crate::expr_has_subquery);
1050    let deferred: Vec<(usize, Expr)> = if defer_enabled {
1051        items_rewritten
1052            .iter()
1053            .enumerate()
1054            .filter_map(|(i, r)| {
1055                r.as_ref()
1056                    .filter(|e| crate::expr_has_subquery(e))
1057                    .map(|e| (i, e.clone()))
1058            })
1059            .collect()
1060    } else {
1061        Vec::new()
1062    };
1063    // v7.32 (architecture v2, P2) — compile the per-group synth-row
1064    // expressions ONCE. The projection / HAVING here run per GROUP
1065    // (24k for the inbox shape) × per item; the rewritten exprs are
1066    // mostly `Column(__agg_N)` / `Column(__grp_K)` against the synth
1067    // schema — flat step programs, no tree walk per group.
1068    let having_compiled = having_rewritten
1069        .as_ref()
1070        .filter(|h| eval::fully_compilable(h))
1071        .map(|h| eval::compile_expr(h, &synth_ctx));
1072    let items_compiled: Vec<Option<eval::CompiledExpr>> = items_rewritten
1073        .iter()
1074        .enumerate()
1075        .map(|(i, r)| {
1076            r.as_ref()
1077                .filter(|e| !deferred.iter().any(|(c, _)| *c == i) && eval::fully_compilable(e))
1078                .map(|e| eval::compile_expr(e, &synth_ctx))
1079        })
1080        .collect();
1081    let mut kept_synth: Vec<Row> = Vec::new();
1082    let mut out_rows: Vec<Row> = Vec::new();
1083    let mut stack: Vec<Value> = Vec::new();
1084    for srow in synth_rows {
1085        if let Some(hc) = &having_compiled {
1086            let cond = eval::eval_compiled(hc, &srow, &synth_ctx, &mut stack)?;
1087            if !matches!(cond, Value::Bool(true)) {
1088                continue;
1089            }
1090        } else if let Some(h) = &having_rewritten {
1091            let cond = match correlated_eval {
1092                Some(f) if crate::expr_has_subquery(h) => f(h, &srow, &synth_ctx)?,
1093                _ => eval::eval_expr(h, &srow, &synth_ctx)?,
1094            };
1095            if !matches!(cond, Value::Bool(true)) {
1096                continue;
1097            }
1098        }
1099        let mut values: Vec<Value> = Vec::with_capacity(columns.len());
1100        for (i, rewritten) in items_rewritten.iter().enumerate() {
1101            let Some(rewritten) = rewritten else { continue };
1102            if deferred.iter().any(|(c, _)| *c == i) {
1103                values.push(Value::Null);
1104                continue;
1105            }
1106            values.push(if let Some(cc) = &items_compiled[i] {
1107                eval::eval_compiled(cc, &srow, &synth_ctx, &mut stack)?
1108            } else {
1109                match correlated_eval {
1110                    Some(f) if crate::expr_has_subquery(rewritten) => {
1111                        f(rewritten, &srow, &synth_ctx)?
1112                    }
1113                    _ => eval::eval_expr(rewritten, &srow, &synth_ctx)?,
1114                }
1115            });
1116        }
1117        kept_synth.push(srow);
1118        out_rows.push(Row::new(values));
1119    }
1120    Ok(Projection {
1121        columns,
1122        out_rows,
1123        kept_synth,
1124        deferred,
1125        order_rewritten,
1126    })
1127}
1128
1129/// (4) Sort the projected output by the rewritten ORDER BY keys. The
1130/// synth rows ride through the sort so deferred subqueries evaluate
1131/// against the surviving groups after the caller's LIMIT truncation.
1132fn sort_synth_by_order_by(
1133    synth_schema: &[ColumnSchema],
1134    order_by: &[spg_sql::ast::OrderBy],
1135    order_rewritten: &[Expr],
1136    mut kept_synth: Vec<Row>,
1137    mut out_rows: Vec<Row>,
1138    correlated_eval: Option<CorrelatedEval<'_>>,
1139) -> Result<(Vec<Row>, Vec<Row>), EvalError> {
1140    let synth_ctx = EvalContext::new(synth_schema, None);
1141    // v6.4.0 — multi-key ORDER BY on aggregate output. Each key
1142    // gets its own rewrite + per-key DESC flag. (Rewrites hoisted
1143    // above as `order_rewritten` — shared with the deferral
1144    // safety check.)
1145    let keys_meta: Vec<(bool, Option<bool>)> =
1146        order_by.iter().map(|o| (o.desc, o.nulls_first)).collect();
1147    // P2: compile order-by keys once (per-group sort keys are
1148    // the same `__agg_N` / `__grp_K` shape as the projection).
1149    let order_compiled: Vec<Option<eval::CompiledExpr>> = order_rewritten
1150        .iter()
1151        .map(|e| {
1152            Some(e)
1153                .filter(|e| eval::fully_compilable(e))
1154                .map(|e| eval::compile_expr(e, &synth_ctx))
1155        })
1156        .collect();
1157    // The synth row rides through the sort so deferred exprs can
1158    // evaluate against the surviving groups after the caller's
1159    // LIMIT truncation.
1160    let mut keystack: Vec<Value> = Vec::new();
1161    let mut tagged: Vec<(Vec<Value>, Row, Row)> = Vec::with_capacity(kept_synth.len());
1162    for (s, o) in kept_synth.into_iter().zip(out_rows) {
1163        let mut keys = Vec::with_capacity(order_rewritten.len());
1164        for (e, oc) in order_rewritten.iter().zip(&order_compiled) {
1165            keys.push(if let Some(oc) = oc {
1166                eval::eval_compiled(oc, &s, &synth_ctx, &mut keystack)?
1167            } else {
1168                match correlated_eval {
1169                    Some(f) if crate::expr_has_subquery(e) => f(e, &s, &synth_ctx)?,
1170                    _ => eval::eval_expr(e, &s, &synth_ctx)?,
1171                }
1172            });
1173        }
1174        tagged.push((keys, s, o));
1175    }
1176    tagged.sort_by(|a, b| {
1177        use core::cmp::Ordering;
1178        for (i, (ka, kb)) in a.0.iter().zip(b.0.iter()).enumerate() {
1179            let (desc, nf) = keys_meta[i];
1180            let cmp = crate::order_by_value_cmp(desc, nf, ka, kb);
1181            if cmp != Ordering::Equal {
1182                return cmp;
1183            }
1184        }
1185        Ordering::Equal
1186    });
1187    kept_synth = Vec::with_capacity(tagged.len());
1188    out_rows = Vec::with_capacity(tagged.len());
1189    for (_, s, o) in tagged {
1190        kept_synth.push(s);
1191        out_rows.push(o);
1192    }
1193    Ok((kept_synth, out_rows))
1194}
1195
1196/// v7.17.0 — walk the statement again to validate the positional
1197/// arity of every aggregate call site. Done after AST collection
1198/// rather than inside `collect_aggregates` so the collector stays
1199/// infallible; callers in `run()` can do a single early-error
1200/// exit before any per-row work.
1201fn validate_agg_arities(stmt: &SelectStatement, _specs: &[AggSpec]) -> Result<(), EvalError> {
1202    fn walk(e: &Expr) -> Result<(), EvalError> {
1203        if let Expr::FunctionCall { name, args } = e {
1204            let lower = name.to_ascii_lowercase();
1205            let expected: Option<usize> = match lower.as_str() {
1206                "count_star" => Some(0),
1207                "count" | "sum" | "avg" | "min" | "max" | "array_agg"
1208                // v7.17.0 — boolean aggregates also take exactly
1209                // one arg. `every` is an alias normalised inside
1210                // collect_aggregates / rewrite_expr.
1211                | "bool_and" | "bool_or" | "every"
1212                // v7.32 (round-29) — statistical + bitwise aggregates
1213                // + single-arg JSON aggregate.
1214                | "stddev" | "stddev_samp" | "stddev_pop"
1215                | "variance" | "var_samp" | "var_pop"
1216                | "bit_and" | "bit_or" | "bit_xor"
1217                | "json_agg" | "jsonb_agg" => Some(1),
1218                // v7.32 (round-29) — two-argument aggregates: string_agg,
1219                // the regression family f(Y, X), and json_object_agg.
1220                "string_agg"
1221                | "covar_pop" | "covar_samp" | "corr"
1222                | "regr_count" | "regr_avgx" | "regr_avgy" | "regr_slope"
1223                | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy"
1224                | "json_object_agg" | "jsonb_object_agg" => Some(2),
1225                _ => None,
1226            };
1227            if let Some(want) = expected
1228                && args.len() != want
1229            {
1230                return Err(EvalError::TypeMismatch {
1231                    detail: alloc::format!("{lower}() takes {want} arg(s), got {}", args.len()),
1232                });
1233            }
1234            for a in args {
1235                walk(a)?;
1236            }
1237        } else if let Expr::Binary { lhs, rhs, .. } = e {
1238            walk(lhs)?;
1239            walk(rhs)?;
1240        } else if let Expr::Unary { expr, .. }
1241        | Expr::Cast { expr, .. }
1242        | Expr::IsNull { expr, .. } = e
1243        {
1244            walk(expr)?;
1245        }
1246        Ok(())
1247    }
1248    for item in &stmt.items {
1249        if let SelectItem::Expr { expr, .. } = item {
1250            walk(expr)?;
1251        }
1252    }
1253    for o in &stmt.order_by {
1254        walk(&o.expr)?;
1255    }
1256    if let Some(h) = &stmt.having {
1257        walk(h)?;
1258    }
1259    Ok(())
1260}
1261
1262/// v7.33 (array_agg argmax) — recognise `(array_agg(x ORDER BY y))[1]`,
1263/// the argmax/argmin idiom: a non-DISTINCT ordered `array_agg`
1264/// subscripted by the constant 1. Returns `(value_arg, order_by,
1265/// filter)` on a match. When matched, the whole per-group array build +
1266/// sort + materialise is replaced by a running first-by-order scalar
1267/// accumulator and the subscript node is consumed (replaced by the
1268/// synthetic column). collect_aggregates and rewrite_expr share this one
1269/// matcher so their `__agg_<i>` assignment stays in lockstep.
1270fn first_ordered_array_agg(e: &Expr) -> Option<(&Expr, &[spg_sql::ast::OrderBy], Option<&Expr>)> {
1271    let Expr::ArraySubscript { target, index } = e else {
1272        return None;
1273    };
1274    if !matches!(
1275        index.as_ref(),
1276        Expr::Literal(spg_sql::ast::Literal::Integer(1))
1277    ) {
1278        return None;
1279    }
1280    let Expr::AggregateOrdered {
1281        call,
1282        order_by,
1283        distinct,
1284        filter,
1285    } = target.as_ref()
1286    else {
1287        return None;
1288    };
1289    if *distinct || order_by.is_empty() {
1290        return None;
1291    }
1292    let Expr::FunctionCall { name, args } = call.as_ref() else {
1293        return None;
1294    };
1295    if !name.eq_ignore_ascii_case("array_agg") || args.len() != 1 {
1296        return None;
1297    }
1298    Some((&args[0], order_by, filter.as_deref()))
1299}
1300
1301fn collect_aggregates(e: &Expr, out: &mut Vec<AggSpec>) {
1302    match e {
1303        // v7.24 (round-16 A) — ordered aggregate: register the inner
1304        // call's spec with the ordering attached.
1305        Expr::AggregateOrdered {
1306            call,
1307            order_by,
1308            distinct,
1309            filter,
1310        } => {
1311            if let Expr::FunctionCall { name, args } = call.as_ref() {
1312                let lower = name.to_ascii_lowercase();
1313                if is_aggregate_name(&lower) {
1314                    let canonical = if lower == "every" {
1315                        "bool_and".to_string()
1316                    } else {
1317                        lower
1318                    };
1319                    // Ordered-set aggregates (`percentile_cont(f)
1320                    // WITHIN GROUP (ORDER BY x)`) take the value to
1321                    // aggregate from the sort spec and the in-parens
1322                    // arg as the direct (fraction) argument.
1323                    let ordered_set = is_within_group_name(&canonical);
1324                    let (arg, direct_arg) = if ordered_set {
1325                        (
1326                            order_by.first().map(|o| o.expr.clone()),
1327                            args.first().cloned(),
1328                        )
1329                    } else {
1330                        (args.first().cloned(), None)
1331                    };
1332                    let spec = AggSpec {
1333                        name: canonical.clone(),
1334                        arg,
1335                        arg2: if agg_uses_second_arg(&canonical) {
1336                            args.get(1).cloned()
1337                        } else {
1338                            None
1339                        },
1340                        distinct: *distinct,
1341                        order_by: order_by.clone(),
1342                        filter: filter.as_deref().cloned(),
1343                        direct_arg,
1344                        first_ordered: false,
1345                    };
1346                    if !out.iter().any(|s| {
1347                        s.name == spec.name
1348                            && s.arg == spec.arg
1349                            && s.arg2 == spec.arg2
1350                            && s.distinct == spec.distinct
1351                            && s.order_by == spec.order_by
1352                            && s.filter == spec.filter
1353                            && s.direct_arg == spec.direct_arg
1354                            && s.first_ordered == spec.first_ordered
1355                    }) {
1356                        out.push(spec);
1357                    }
1358                    return;
1359                }
1360            }
1361            collect_aggregates(call, out);
1362            for o in order_by {
1363                collect_aggregates(&o.expr, out);
1364            }
1365        }
1366        Expr::FunctionCall { name, args } => {
1367            let lower = name.to_ascii_lowercase();
1368            if is_aggregate_name(&lower) {
1369                let arg = if lower == "count_star" {
1370                    None
1371                } else {
1372                    args.first().cloned()
1373                };
1374                // v7.17.0 — second positional arg for
1375                // `string_agg(value, separator)`; v7.32 — also the
1376                // regression family `f(Y, X)` and `json_object_agg`.
1377                let arg2 = if agg_uses_second_arg(&lower) {
1378                    args.get(1).cloned()
1379                } else {
1380                    None
1381                };
1382                // v7.17.0 — `every` is the SQL-standard alias for
1383                // `bool_and`; collapse at collection time so
1384                // update_state / finalize need only one arm.
1385                let canonical = if lower == "every" {
1386                    "bool_and".to_string()
1387                } else {
1388                    lower
1389                };
1390                let spec = AggSpec {
1391                    name: canonical,
1392                    arg: arg.clone(),
1393                    arg2: arg2.clone(),
1394                    distinct: false,
1395                    order_by: Vec::new(),
1396                    filter: None,
1397                    direct_arg: None,
1398                    first_ordered: false,
1399                };
1400                if !out.iter().any(|s| {
1401                    s.name == spec.name
1402                        && s.arg == spec.arg
1403                        && s.arg2 == spec.arg2
1404                        && !s.distinct
1405                        && s.order_by == spec.order_by
1406                        && s.filter.is_none()
1407                        && !s.first_ordered
1408                }) {
1409                    out.push(spec);
1410                }
1411                // Don't recurse into the arg — nested aggregates are
1412                // illegal in standard SQL.
1413            } else {
1414                for a in args {
1415                    collect_aggregates(a, out);
1416                }
1417            }
1418        }
1419        Expr::Binary { lhs, rhs, .. } => {
1420            collect_aggregates(lhs, out);
1421            collect_aggregates(rhs, out);
1422        }
1423        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
1424            collect_aggregates(expr, out);
1425        }
1426        Expr::Like { expr, pattern, .. } => {
1427            collect_aggregates(expr, out);
1428            collect_aggregates(pattern, out);
1429        }
1430        Expr::InList { expr, list, .. } => {
1431            collect_aggregates(expr, out);
1432            for item in list {
1433                collect_aggregates(item, out);
1434            }
1435        }
1436        Expr::Extract { source, .. } => collect_aggregates(source, out),
1437        // v4.10 subquery + v4.12 window / Literal / Column —
1438        // non-recursing leaves for the aggregate collector.
1439        Expr::ScalarSubquery(_)
1440        | Expr::Exists { .. }
1441        | Expr::InSubquery { .. }
1442        | Expr::WindowFunction { .. }
1443        | Expr::Literal(_)
1444        | Expr::Placeholder(_)
1445        | Expr::Column(_) => {}
1446        // v7.10.10 — recurse into array constructor children +
1447        // subscript / ANY/ALL operands.
1448        Expr::Array(items) => {
1449            for elem in items {
1450                collect_aggregates(elem, out);
1451            }
1452        }
1453        Expr::ArraySubscript { target, index } => {
1454            // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]`
1455            // collects as a first_ordered spec; the subscript is consumed
1456            // here (do NOT recurse into the array_agg, or it would also
1457            // register a plain full-array spec).
1458            if let Some((arg, order_by, filter)) = first_ordered_array_agg(e) {
1459                let spec = AggSpec {
1460                    name: "array_agg".to_string(),
1461                    arg: Some(arg.clone()),
1462                    arg2: None,
1463                    distinct: false,
1464                    order_by: order_by.to_vec(),
1465                    filter: filter.cloned(),
1466                    direct_arg: None,
1467                    first_ordered: true,
1468                };
1469                if !out.iter().any(|s| {
1470                    s.name == spec.name
1471                        && s.arg == spec.arg
1472                        && s.order_by == spec.order_by
1473                        && s.filter == spec.filter
1474                        && s.first_ordered
1475                }) {
1476                    out.push(spec);
1477                }
1478                return;
1479            }
1480            collect_aggregates(target, out);
1481            collect_aggregates(index, out);
1482        }
1483        Expr::AnyAll { expr, array, .. } => {
1484            collect_aggregates(expr, out);
1485            collect_aggregates(array, out);
1486        }
1487        Expr::Case {
1488            operand,
1489            branches,
1490            else_branch,
1491        } => {
1492            if let Some(o) = operand {
1493                collect_aggregates(o, out);
1494            }
1495            for (w, t) in branches {
1496                collect_aggregates(w, out);
1497                collect_aggregates(t, out);
1498            }
1499            if let Some(e) = else_branch {
1500                collect_aggregates(e, out);
1501            }
1502        }
1503    }
1504}
1505
1506fn update_state(
1507    st: &mut AggState,
1508    name: &str,
1509    v: &Value,
1510    arg2: Option<&Value>,
1511    order_keys: Option<Vec<Value>>,
1512) -> Result<(), EvalError> {
1513    let is_null = matches!(v, Value::Null);
1514    match name {
1515        "count_star" => st.count += 1,
1516        "count" => {
1517            if !is_null {
1518                st.count += 1;
1519            }
1520        }
1521        "sum" | "avg" => {
1522            if is_null {
1523                return Ok(());
1524            }
1525            st.count += 1;
1526            match v {
1527                Value::Int(n) => st.sum_int += i64::from(*n),
1528                Value::BigInt(n) => st.sum_int += *n,
1529                Value::Float(x) => {
1530                    st.use_float = true;
1531                    st.sum_float += *x;
1532                }
1533                other => {
1534                    return Err(EvalError::TypeMismatch {
1535                        detail: format!("sum/avg need numeric, got {:?}", other.data_type()),
1536                    });
1537                }
1538            }
1539        }
1540        "min" => {
1541            if is_null {
1542                return Ok(());
1543            }
1544            match &st.extreme {
1545                None => st.extreme = Some(v.clone()),
1546                Some(cur) => {
1547                    if value_cmp(v, cur) == core::cmp::Ordering::Less {
1548                        st.extreme = Some(v.clone());
1549                    }
1550                }
1551            }
1552        }
1553        "max" => {
1554            if is_null {
1555                return Ok(());
1556            }
1557            match &st.extreme {
1558                None => st.extreme = Some(v.clone()),
1559                Some(cur) => {
1560                    if value_cmp(v, cur) == core::cmp::Ordering::Greater {
1561                        st.extreme = Some(v.clone());
1562                    }
1563                }
1564            }
1565        }
1566        // v7.17.0 — string_agg(value, separator). NULL value is
1567        // skipped (PG aggregate-skip-null). Separator captured
1568        // from the latest row that flows through; matches PG's
1569        // semantics of evaluating the separator per row but using
1570        // the last value at finalize time (in practice it's
1571        // constant). count is bumped so we can distinguish "empty
1572        // group → NULL" from "all-NULL group → NULL".
1573        "string_agg" => {
1574            if let Some(sep) = arg2
1575                && let Value::Text(s) = sep
1576            {
1577                st.separator = Some(s.clone());
1578            }
1579            if is_null {
1580                return Ok(());
1581            }
1582            if let Value::Text(s) = v {
1583                st.items.push(Value::Text(s.clone()));
1584                if let Some(k) = order_keys {
1585                    st.item_keys.push(k);
1586                }
1587                st.count += 1;
1588            } else {
1589                return Err(EvalError::TypeMismatch {
1590                    detail: format!("string_agg requires text value, got {:?}", v.data_type()),
1591                });
1592            }
1593        }
1594        // v7.17.0 — array_agg(value). Unlike string_agg, NULL
1595        // elements are KEPT in the array (PG behaviour); the
1596        // result is NULL only when ZERO rows fed in. Element type
1597        // is locked from the first row's value type; subsequent
1598        // rows must match (PG also rejects mixed-type array_agg).
1599        "array_agg" => {
1600            st.items.push(v.clone());
1601            if let Some(k) = order_keys {
1602                st.item_keys.push(k);
1603            }
1604            st.count += 1;
1605        }
1606        // v7.17.0 — bool_and(p): TRUE iff every non-NULL input is
1607        // TRUE. NULL skipped; running accumulator stays at TRUE
1608        // until the first non-NULL FALSE.
1609        "bool_and" => {
1610            if is_null {
1611                return Ok(());
1612            }
1613            let b = match v {
1614                Value::Bool(b) => *b,
1615                other => {
1616                    return Err(EvalError::TypeMismatch {
1617                        detail: format!("bool_and requires bool, got {:?}", other.data_type()),
1618                    });
1619                }
1620            };
1621            st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc && b));
1622        }
1623        // v7.17.0 — bool_or(p): TRUE iff any non-NULL input is
1624        // TRUE. NULL skipped.
1625        "bool_or" => {
1626            if is_null {
1627                return Ok(());
1628            }
1629            let b = match v {
1630                Value::Bool(b) => *b,
1631                other => {
1632                    return Err(EvalError::TypeMismatch {
1633                        detail: format!("bool_or requires bool, got {:?}", other.data_type()),
1634                    });
1635                }
1636            };
1637            st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc || b));
1638        }
1639        // v7.32 (round-29) — variance / stddev family. Accumulate the
1640        // running sum (sum_float) and sum of squares (sum_sq) over the
1641        // non-NULL numeric inputs; finalize divides by n or n-1.
1642        "stddev" | "stddev_samp" | "stddev_pop" | "variance" | "var_samp" | "var_pop" => {
1643            if is_null {
1644                return Ok(());
1645            }
1646            let x = match v {
1647                Value::Int(n) => f64::from(*n),
1648                Value::SmallInt(n) => f64::from(*n),
1649                Value::BigInt(n) => *n as f64,
1650                Value::Float(x) => *x,
1651                other => {
1652                    return Err(EvalError::TypeMismatch {
1653                        detail: format!("{name} needs numeric, got {:?}", other.data_type()),
1654                    });
1655                }
1656            };
1657            st.count += 1;
1658            st.sum_float += x;
1659            st.sum_sq += x * x;
1660        }
1661        // v7.32 (round-29) — bitwise aggregates over integer inputs.
1662        "bit_and" | "bit_or" | "bit_xor" => {
1663            if is_null {
1664                return Ok(());
1665            }
1666            let n = match v {
1667                Value::Int(n) => i64::from(*n),
1668                Value::SmallInt(n) => i64::from(*n),
1669                Value::BigInt(n) => *n,
1670                other => {
1671                    return Err(EvalError::TypeMismatch {
1672                        detail: format!("{name} needs integer, got {:?}", other.data_type()),
1673                    });
1674                }
1675            };
1676            st.bit_acc = Some(match (st.bit_acc, name) {
1677                (None, _) => n,
1678                (Some(acc), "bit_and") => acc & n,
1679                (Some(acc), "bit_or") => acc | n,
1680                (Some(acc), _) => acc ^ n, // bit_xor
1681            });
1682        }
1683        // v7.32 (round-29) — WITHIN GROUP aggregates (ordered-set +
1684        // hypothetical-set) collect the sort value (NULLs ignored, per
1685        // PG) into `items`, sorted at finalize by the parallel
1686        // `item_keys`.
1687        n if is_within_group_name(n) => {
1688            if is_null {
1689                return Ok(());
1690            }
1691            st.items.push(v.clone());
1692            if let Some(k) = order_keys {
1693                st.item_keys.push(k);
1694            }
1695            st.count += 1;
1696        }
1697        // v7.32 (round-29) — regression family f(Y, X). Only rows with
1698        // BOTH inputs non-NULL contribute (PG semantics). `v` is Y,
1699        // `arg2` is X.
1700        n if is_regression_name(n) => {
1701            let (Some(y), Some(x)) = (agg_value_to_f64(v), arg2.and_then(agg_value_to_f64)) else {
1702                return Ok(()); // NULL (or non-numeric) in either input
1703            };
1704            st.reg_n += 1;
1705            st.reg_sx += x;
1706            st.reg_sy += y;
1707            st.reg_sxx += x * x;
1708            st.reg_syy += y * y;
1709            st.reg_sxy += x * y;
1710        }
1711        // v7.32 (round-29) — json_agg / jsonb_agg collect every input
1712        // (NULL becomes JSON null, per PG) in row order.
1713        "json_agg" | "jsonb_agg" => {
1714            st.items.push(v.clone());
1715            st.count += 1;
1716        }
1717        // v7.32 (round-29) — json_object_agg(key, value): keys in
1718        // `items`, values in `aux_items`. A NULL key is skipped (PG
1719        // raises; we drop it rather than abort the whole query).
1720        "json_object_agg" | "jsonb_object_agg" => {
1721            if is_null {
1722                return Ok(());
1723            }
1724            st.items.push(v.clone());
1725            st.aux_items.push(arg2.cloned().unwrap_or(Value::Null));
1726            st.count += 1;
1727        }
1728        _ => unreachable!("non-aggregate {name} in update_state"),
1729    }
1730    Ok(())
1731}
1732
1733#[allow(clippy::cast_precision_loss)]
1734fn finalize(name: &str, st: &AggState) -> Value {
1735    match name {
1736        "count" | "count_star" => Value::BigInt(st.count),
1737        "sum" => {
1738            if st.count == 0 {
1739                Value::Null
1740            } else if st.use_float {
1741                Value::Float(st.sum_float + (st.sum_int as f64))
1742            } else {
1743                Value::BigInt(st.sum_int)
1744            }
1745        }
1746        "avg" => {
1747            if st.count == 0 {
1748                Value::Null
1749            } else {
1750                let total = if st.use_float {
1751                    st.sum_float + (st.sum_int as f64)
1752                } else {
1753                    st.sum_int as f64
1754                };
1755                Value::Float(total / (st.count as f64))
1756            }
1757        }
1758        "min" | "max" => st.extreme.clone().unwrap_or(Value::Null),
1759        // v7.17.0 — string_agg: join all collected text items with
1760        // the captured separator. Empty / all-NULL group → NULL
1761        // (PG semantics).
1762        "string_agg" => {
1763            if st.items.is_empty() {
1764                return Value::Null;
1765            }
1766            let sep = st.separator.clone().unwrap_or_default();
1767            let mut out = String::new();
1768            for (i, item) in st.items.iter().enumerate() {
1769                if i > 0 {
1770                    out.push_str(&sep);
1771                }
1772                if let Value::Text(s) = item {
1773                    out.push_str(s);
1774                }
1775            }
1776            Value::Text(out)
1777        }
1778        // v7.17.0 — array_agg: collect into a typed array. NULL
1779        // elements are preserved per PG. Result type is decided
1780        // by the first non-NULL element seen (or Text fallback
1781        // when the whole group is NULL — PG would surface the
1782        // declared input type, but SPG hasn't yet wired the
1783        // aggregate's static input-type from `describe`).
1784        "array_agg" => {
1785            if st.items.is_empty() {
1786                return Value::Null;
1787            }
1788            let probe = st.items.iter().find(|v| !v.is_null());
1789            match probe.and_then(spg_storage::Value::data_type) {
1790                Some(DataType::Int) | Some(DataType::SmallInt) => {
1791                    let items: Vec<Option<i32>> = st
1792                        .items
1793                        .iter()
1794                        .map(|v| match v {
1795                            Value::Int(n) => Some(*n),
1796                            Value::SmallInt(n) => Some(i32::from(*n)),
1797                            _ => None,
1798                        })
1799                        .collect();
1800                    Value::IntArray(items)
1801                }
1802                Some(DataType::BigInt) => {
1803                    let items: Vec<Option<i64>> = st
1804                        .items
1805                        .iter()
1806                        .map(|v| match v {
1807                            Value::BigInt(n) => Some(*n),
1808                            _ => None,
1809                        })
1810                        .collect();
1811                    Value::BigIntArray(items)
1812                }
1813                _ => {
1814                    let items: Vec<Option<String>> = st
1815                        .items
1816                        .iter()
1817                        .map(|v| match v {
1818                            Value::Text(s) => Some(s.clone()),
1819                            Value::Null => None,
1820                            other => Some(format!("{other:?}")),
1821                        })
1822                        .collect();
1823                    Value::TextArray(items)
1824                }
1825            }
1826        }
1827        // v7.17.0 — bool_and / bool_or finalize: lazy-init pattern
1828        // means `None` is exactly "empty group or all-NULL", which
1829        // PG surfaces as SQL NULL.
1830        "bool_and" | "bool_or" => st.bool_acc.map_or(Value::Null, Value::Bool),
1831        // v7.32 (round-29) — variance / stddev. PG: `variance` ==
1832        // `var_samp`, `stddev` == `stddev_samp`. samp needs n >= 2
1833        // (n < 2 → NULL); pop needs n >= 1 (n == 1 → 0).
1834        "variance" | "var_samp" | "var_pop" | "stddev" | "stddev_samp" | "stddev_pop" => {
1835            let n = st.count;
1836            if n == 0 {
1837                return Value::Null;
1838            }
1839            let nf = n as f64;
1840            // Sum of squared deviations from the mean.
1841            let ss = st.sum_sq - (st.sum_float * st.sum_float) / nf;
1842            let pop = name.ends_with("_pop");
1843            let denom = if pop { nf } else { nf - 1.0 };
1844            if denom <= 0.0 {
1845                // var_samp / stddev (samp) with n == 1 → NULL.
1846                return Value::Null;
1847            }
1848            let var = (ss / denom).max(0.0); // clamp fp noise below 0
1849            if name.starts_with("stddev") {
1850                Value::Float(crate::eval::f64_sqrt(var))
1851            } else {
1852                Value::Float(var)
1853            }
1854        }
1855        // v7.32 (round-29) — bitwise aggregates: None (empty / all-NULL)
1856        // → SQL NULL.
1857        "bit_and" | "bit_or" | "bit_xor" => st.bit_acc.map_or(Value::Null, Value::BigInt),
1858        // v7.32 (round-29) — regression family. `regr_count` is the
1859        // paired n; everything else is NULL over an empty set. Terms
1860        // are the mean-centred sums of squares / cross-products.
1861        "regr_count" => Value::BigInt(st.reg_n),
1862        "covar_pop" | "covar_samp" | "corr" | "regr_avgx" | "regr_avgy" | "regr_slope"
1863        | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy" => {
1864            let n = st.reg_n;
1865            if n == 0 {
1866                return Value::Null;
1867            }
1868            let nf = n as f64;
1869            let sxx = st.reg_sxx - st.reg_sx * st.reg_sx / nf;
1870            let syy = st.reg_syy - st.reg_sy * st.reg_sy / nf;
1871            let sxy = st.reg_sxy - st.reg_sx * st.reg_sy / nf;
1872            let avgx = st.reg_sx / nf;
1873            let avgy = st.reg_sy / nf;
1874            let out = match name {
1875                "regr_avgx" => Some(avgx),
1876                "regr_avgy" => Some(avgy),
1877                "regr_sxx" => Some(sxx),
1878                "regr_syy" => Some(syy),
1879                "regr_sxy" => Some(sxy),
1880                "covar_pop" => Some(sxy / nf),
1881                "covar_samp" => (n >= 2).then(|| sxy / (nf - 1.0)),
1882                "regr_slope" => (sxx != 0.0).then(|| sxy / sxx),
1883                "regr_intercept" => (sxx != 0.0).then(|| avgy - (sxy / sxx) * avgx),
1884                "corr" => {
1885                    let d = sxx * syy;
1886                    (d > 0.0).then(|| sxy / crate::eval::f64_sqrt(d))
1887                }
1888                // PG: NULL when sxx==0; 1 when syy==0 (and sxx>0).
1889                "regr_r2" => {
1890                    if sxx == 0.0 {
1891                        None
1892                    } else if syy == 0.0 {
1893                        Some(1.0)
1894                    } else {
1895                        Some((sxy * sxy) / (sxx * syy))
1896                    }
1897                }
1898                _ => None,
1899            };
1900            out.map_or(Value::Null, Value::Float)
1901        }
1902        // v7.32 (round-29) — json_agg / jsonb_agg: a JSON array of every
1903        // collected element in row order; empty set → SQL NULL.
1904        "json_agg" | "jsonb_agg" => {
1905            if st.items.is_empty() {
1906                return Value::Null;
1907            }
1908            let mut out = String::from("[");
1909            for (i, item) in st.items.iter().enumerate() {
1910                if i > 0 {
1911                    out.push_str(", ");
1912                }
1913                out.push_str(&crate::json::value_to_json_text(item));
1914            }
1915            out.push(']');
1916            Value::Json(out)
1917        }
1918        // v7.32 (round-29) — json_object_agg: a JSON object built from
1919        // the parallel key (`items`) / value (`aux_items`) streams.
1920        "json_object_agg" | "jsonb_object_agg" => {
1921            if st.items.is_empty() {
1922                return Value::Null;
1923            }
1924            let mut out = String::from("{");
1925            for (i, key) in st.items.iter().enumerate() {
1926                if i > 0 {
1927                    out.push_str(", ");
1928                }
1929                // Object keys are always JSON strings (PG coerces).
1930                let key_text = match key {
1931                    Value::Text(s) | Value::Json(s) => s.clone(),
1932                    other => crate::json::value_to_json_text(other),
1933                };
1934                out.push_str(&crate::json::value_to_json_text(&Value::Text(key_text)));
1935                out.push_str(": ");
1936                let val = st.aux_items.get(i).unwrap_or(&Value::Null);
1937                out.push_str(&crate::json::value_to_json_text(val));
1938            }
1939            out.push('}');
1940            Value::Json(out)
1941        }
1942        // Ordered-set aggregates are finalized in `run` (they need the
1943        // sorted items + the direct fraction argument), never here.
1944        _ => unreachable!(),
1945    }
1946}
1947
1948/// v7.32 (round-29) — numeric coercion for the percentile interpolation.
1949fn agg_value_to_f64(v: &Value) -> Option<f64> {
1950    match v {
1951        Value::Int(n) => Some(f64::from(*n)),
1952        Value::SmallInt(n) => Some(f64::from(*n)),
1953        Value::BigInt(n) => Some(*n as f64),
1954        Value::Float(x) => Some(*x),
1955        _ => None,
1956    }
1957}
1958
1959/// v7.32 (round-29) — finalize a WITHIN GROUP aggregate. `st.items` is
1960/// already sorted by the `WITHIN GROUP (ORDER BY …)` spec. `direct` is
1961/// the evaluated direct argument: the fraction for `percentile_*`, the
1962/// hypothetical value for the hypothetical-set family (`rank` etc.),
1963/// and unused by `mode`. `order` is the (single) sort key, needed by
1964/// the hypothetical-set family to compare in the sort direction.
1965#[allow(
1966    clippy::cast_precision_loss,
1967    clippy::cast_possible_truncation,
1968    clippy::cast_sign_loss
1969)]
1970fn finalize_ordered_set(
1971    name: &str,
1972    st: &AggState,
1973    direct: Option<&Value>,
1974    order: Option<&spg_sql::ast::OrderBy>,
1975) -> Value {
1976    let fraction = direct;
1977    let items = &st.items;
1978    if items.is_empty() {
1979        // A hypothetical row ranks first over an empty group; the
1980        // distribution functions are 0 / divide-by-(n+1).
1981        return match name {
1982            "rank" | "dense_rank" => Value::BigInt(1),
1983            "percent_rank" => Value::Float(0.0),
1984            "cume_dist" => Value::Float(1.0),
1985            _ => Value::Null,
1986        };
1987    }
1988    let n = items.len();
1989    match name {
1990        // v7.32 (round-29) — hypothetical-set: the rank the direct value
1991        // would have if inserted into the group, in the sort direction.
1992        "rank" | "dense_rank" | "percent_rank" | "cume_dist" => {
1993            let Some(h) = fraction else {
1994                return Value::Null;
1995            };
1996            let (desc, nulls_first) = order.map_or((false, None), |o| (o.desc, o.nulls_first));
1997            let mut before = 0usize; // sort strictly before h
1998            let mut before_or_eq = 0usize; // sort before-or-peer with h
1999            let mut distinct_before = 0usize;
2000            let mut last_before: Option<&Value> = None;
2001            for it in items {
2002                match crate::order_by_value_cmp(desc, nulls_first, it, h) {
2003                    core::cmp::Ordering::Less => {
2004                        before += 1;
2005                        before_or_eq += 1;
2006                        if last_before
2007                            .is_none_or(|p| value_cmp(p, it) != core::cmp::Ordering::Equal)
2008                        {
2009                            distinct_before += 1;
2010                            last_before = Some(it);
2011                        }
2012                    }
2013                    core::cmp::Ordering::Equal => before_or_eq += 1,
2014                    core::cmp::Ordering::Greater => {}
2015                }
2016            }
2017            let nn = n as f64;
2018            match name {
2019                "rank" => Value::BigInt((before + 1) as i64),
2020                "dense_rank" => Value::BigInt((distinct_before + 1) as i64),
2021                "percent_rank" => Value::Float(before as f64 / nn),
2022                "cume_dist" => Value::Float((before_or_eq as f64 + 1.0) / (nn + 1.0)),
2023                _ => unreachable!(),
2024            }
2025        }
2026        // Most frequent value; equal values are adjacent in the sorted
2027        // run, and a frequency tie resolves to the earliest run (the
2028        // smallest value under an ascending sort), matching PG.
2029        "mode" => {
2030            let (mut best_i, mut best_cnt) = (0usize, 1usize);
2031            let (mut run_i, mut run_cnt) = (0usize, 1usize);
2032            for i in 1..n {
2033                if value_cmp(&items[i], &items[run_i]) == core::cmp::Ordering::Equal {
2034                    run_cnt += 1;
2035                } else {
2036                    run_i = i;
2037                    run_cnt = 1;
2038                }
2039                if run_cnt > best_cnt {
2040                    best_cnt = run_cnt;
2041                    best_i = run_i;
2042                }
2043            }
2044            items[best_i].clone()
2045        }
2046        // The first value whose cumulative fraction reaches `f`.
2047        "percentile_disc" => {
2048            let f = fraction
2049                .and_then(agg_value_to_f64)
2050                .unwrap_or(0.0)
2051                .clamp(0.0, 1.0);
2052            let idx = if f <= 0.0 {
2053                0
2054            } else {
2055                (crate::eval::f64_ceil(f * n as f64) as usize)
2056                    .saturating_sub(1)
2057                    .min(n - 1)
2058            };
2059            items[idx].clone()
2060        }
2061        // Linear interpolation between the two bracketing values.
2062        "percentile_cont" => {
2063            let f = fraction
2064                .and_then(agg_value_to_f64)
2065                .unwrap_or(0.0)
2066                .clamp(0.0, 1.0);
2067            let Some(nums) = items
2068                .iter()
2069                .map(agg_value_to_f64)
2070                .collect::<Option<Vec<f64>>>()
2071            else {
2072                return Value::Null; // non-numeric ordered set
2073            };
2074            if n == 1 {
2075                return Value::Float(nums[0]);
2076            }
2077            let rank = f * (n as f64 - 1.0);
2078            let lo = crate::eval::f64_floor(rank) as usize;
2079            let hi = crate::eval::f64_ceil(rank) as usize;
2080            let frac = rank - lo as f64;
2081            Value::Float(nums[lo] + (nums[hi] - nums[lo]) * frac)
2082        }
2083        _ => unreachable!(),
2084    }
2085}
2086
2087fn infer_agg_type(spec: &AggSpec, schema_cols: &[ColumnSchema]) -> DataType {
2088    // v7.26 (round-20 C) — the argument's statically-derived shape
2089    // types MIN/MAX/SUM/array_agg properly; RowDescription used to
2090    // report TEXT for these, breaking every sqlx typed decode.
2091    let arg_ty = spec
2092        .arg
2093        .as_ref()
2094        .and_then(|a| crate::describe::describe_expr(a, schema_cols))
2095        .map(|shape| shape.ty);
2096    // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]` yields the
2097    // ELEMENT type (x), not the array type.
2098    if spec.first_ordered {
2099        return arg_ty.unwrap_or(DataType::Text);
2100    }
2101    match spec.name.as_str() {
2102        "count" | "count_star" => DataType::BigInt,
2103        "sum" => match arg_ty {
2104            Some(DataType::Float) => DataType::Float,
2105            _ => DataType::BigInt,
2106        },
2107        "avg" => DataType::Float,
2108        // v7.17.0 — string_agg always returns TEXT.
2109        "string_agg" => DataType::Text,
2110        "array_agg" => match arg_ty {
2111            Some(DataType::Int | DataType::SmallInt) => DataType::IntArray,
2112            Some(DataType::BigInt) => DataType::BigIntArray,
2113            _ => DataType::TextArray,
2114        },
2115        // v7.17.0 — boolean aggregates always return BOOL (nullable
2116        // — empty / all-NULL group → NULL).
2117        "bool_and" | "bool_or" => DataType::Bool,
2118        // v7.32 (round-29) — variance / stddev are floating point;
2119        // percentile_cont interpolates to float; the regression family
2120        // (except regr_count) is floating point.
2121        "stddev" | "stddev_samp" | "stddev_pop" | "variance" | "var_samp" | "var_pop"
2122        | "percentile_cont" | "covar_pop" | "covar_samp" | "corr" | "regr_avgx" | "regr_avgy"
2123        | "regr_slope" | "regr_intercept" | "regr_r2" | "regr_sxx" | "regr_syy" | "regr_sxy" => {
2124            DataType::Float
2125        }
2126        // v7.32 (round-29) — bitwise aggregates, regr_count, and the
2127        // integer hypothetical-set ranks return an integer.
2128        "bit_and" | "bit_or" | "bit_xor" | "regr_count" | "rank" | "dense_rank" => DataType::BigInt,
2129        // v7.32 (round-29) — hypothetical-set distribution functions.
2130        "percent_rank" | "cume_dist" => DataType::Float,
2131        // v7.32 (round-29) — JSON aggregates return JSON.
2132        "json_agg" | "jsonb_agg" | "json_object_agg" | "jsonb_object_agg" => DataType::Json,
2133        // min/max, percentile_disc, mode, and anything pass-through:
2134        // the argument's shape (for ordered-set aggs `spec.arg` is the
2135        // WITHIN GROUP value expression).
2136        _ => arg_ty.unwrap_or(DataType::Text),
2137    }
2138}
2139
2140fn agg_or_group_type(e: &Expr, synth: &[ColumnSchema]) -> DataType {
2141    if let Expr::Column(c) = e
2142        && let Some(s) = synth.iter().find(|s| s.name == c.name)
2143    {
2144        return s.ty;
2145    }
2146    // v7.26 (round-20 C) — compound expressions over aggregates
2147    // (COALESCE(BOOL_OR(…), false), (array_agg(…))[1], CASE …)
2148    // derive their shape statically against the synth schema; the
2149    // old Text fallback broke sqlx typed decodes of exactly these
2150    // columns.
2151    crate::describe::describe_expr(e, synth)
2152        .map(|shape| shape.ty)
2153        .unwrap_or(DataType::Text)
2154}
2155
2156fn rewrite_expr(e: &Expr, group_exprs: &[Expr], aggs: &[AggSpec]) -> Expr {
2157    // v7.33 (array_agg argmax) — `(array_agg(x ORDER BY y))[1]` rewrites
2158    // to its first_ordered synth column, consuming the subscript. Checked
2159    // before the AggregateOrdered/recursion arms (which would otherwise
2160    // rewrite the inner array_agg and leave the subscript). Same matcher
2161    // as collect_aggregates, so the spec it finds is the one collected.
2162    if let Some((arg, order_by, filter)) = first_ordered_array_agg(e) {
2163        let arg_owned = Some(arg.clone());
2164        let filter_owned = filter.cloned();
2165        for (i, spec) in aggs.iter().enumerate() {
2166            if spec.first_ordered
2167                && spec.name == "array_agg"
2168                && spec.arg == arg_owned
2169                && spec.order_by == *order_by
2170                && spec.filter == filter_owned
2171            {
2172                return Expr::Column(spg_sql::ast::ColumnName {
2173                    qualifier: None,
2174                    name: format!("__agg_{i}"),
2175                });
2176            }
2177        }
2178    }
2179    // v7.24 (round-16 A) — ordered aggregate: match on the inner
2180    // call PLUS the ordering keys.
2181    if let Expr::AggregateOrdered {
2182        call,
2183        order_by,
2184        distinct,
2185        filter,
2186    } = e
2187        && let Expr::FunctionCall { name, args } = call.as_ref()
2188    {
2189        let lower = name.to_ascii_lowercase();
2190        if is_aggregate_name(&lower) {
2191            let canonical: &str = if lower == "every" { "bool_and" } else { &lower };
2192            // Mirror collect_aggregates: ordered-set aggregates take the
2193            // value from the sort spec and the in-parens arg as direct.
2194            let (arg, direct_arg) = if is_within_group_name(canonical) {
2195                (
2196                    order_by.first().map(|o| o.expr.clone()),
2197                    args.first().cloned(),
2198                )
2199            } else {
2200                (args.first().cloned(), None)
2201            };
2202            let arg2 = if agg_uses_second_arg(canonical) {
2203                args.get(1).cloned()
2204            } else {
2205                None
2206            };
2207            let filter_owned = filter.as_deref().cloned();
2208            for (i, spec) in aggs.iter().enumerate() {
2209                if spec.name == canonical
2210                    && spec.arg == arg
2211                    && spec.arg2 == arg2
2212                    && spec.distinct == *distinct
2213                    && spec.order_by == *order_by
2214                    && spec.filter == filter_owned
2215                    && spec.direct_arg == direct_arg
2216                {
2217                    return Expr::Column(spg_sql::ast::ColumnName {
2218                        qualifier: None,
2219                        name: format!("__agg_{i}"),
2220                    });
2221                }
2222            }
2223        }
2224    }
2225    // Match aggregate FunctionCalls first — they sit outside group_by.
2226    if let Expr::FunctionCall { name, args } = e {
2227        let lower = name.to_ascii_lowercase();
2228        if is_aggregate_name(&lower) {
2229            let arg = if lower == "count_star" {
2230                None
2231            } else {
2232                args.first().cloned()
2233            };
2234            // v7.17.0 — match the spec we registered for
2235            // string_agg(value, separator) on the full pair; v7.32 also
2236            // the regression family and json_object_agg.
2237            let arg2 = if agg_uses_second_arg(&lower) {
2238                args.get(1).cloned()
2239            } else {
2240                None
2241            };
2242            // v7.17.0 — `every` collapses into `bool_and` at
2243            // collection; mirror that here so the rewrite finds
2244            // the matching synth column.
2245            let canonical: &str = if lower == "every" {
2246                "bool_and"
2247            } else {
2248                lower.as_str()
2249            };
2250            for (i, spec) in aggs.iter().enumerate() {
2251                if spec.name == canonical
2252                    && spec.arg == arg
2253                    && spec.arg2 == arg2
2254                    && !spec.distinct
2255                    && spec.order_by.is_empty()
2256                {
2257                    return Expr::Column(spg_sql::ast::ColumnName {
2258                        qualifier: None,
2259                        name: format!("__agg_{i}"),
2260                    });
2261                }
2262            }
2263        }
2264    }
2265    // Match a group_by expression by AST equality.
2266    for (i, g) in group_exprs.iter().enumerate() {
2267        if g == e {
2268            return Expr::Column(spg_sql::ast::ColumnName {
2269                qualifier: None,
2270                name: format!("__grp_{i}"),
2271            });
2272        }
2273    }
2274    // Recurse into children.
2275    match e {
2276        Expr::AggregateOrdered {
2277            call,
2278            order_by,
2279            distinct,
2280            filter,
2281        } => Expr::AggregateOrdered {
2282            call: Box::new(rewrite_expr(call, group_exprs, aggs)),
2283            distinct: *distinct,
2284            order_by: order_by
2285                .iter()
2286                .map(|o| spg_sql::ast::OrderBy {
2287                    expr: rewrite_expr(&o.expr, group_exprs, aggs),
2288                    desc: o.desc,
2289                    nulls_first: o.nulls_first,
2290                })
2291                .collect(),
2292            // The filter is evaluated against SOURCE rows during
2293            // accumulation, never against synth rows — keep it as-is.
2294            filter: filter.clone(),
2295        },
2296        Expr::Binary { lhs, op, rhs } => Expr::Binary {
2297            lhs: Box::new(rewrite_expr(lhs, group_exprs, aggs)),
2298            op: *op,
2299            rhs: Box::new(rewrite_expr(rhs, group_exprs, aggs)),
2300        },
2301        Expr::Unary { op, expr } => Expr::Unary {
2302            op: *op,
2303            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2304        },
2305        Expr::Cast { expr, target } => Expr::Cast {
2306            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2307            target: *target,
2308        },
2309        Expr::IsNull { expr, negated } => Expr::IsNull {
2310            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2311            negated: *negated,
2312        },
2313        Expr::FunctionCall { name, args } => Expr::FunctionCall {
2314            name: name.clone(),
2315            args: args
2316                .iter()
2317                .map(|a| rewrite_expr(a, group_exprs, aggs))
2318                .collect(),
2319        },
2320        Expr::Like {
2321            expr,
2322            pattern,
2323            negated,
2324            case_insensitive,
2325        } => Expr::Like {
2326            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2327            pattern: Box::new(rewrite_expr(pattern, group_exprs, aggs)),
2328            negated: *negated,
2329            case_insensitive: *case_insensitive,
2330        },
2331        Expr::Extract { field, source } => Expr::Extract {
2332            field: *field,
2333            source: Box::new(rewrite_expr(source, group_exprs, aggs)),
2334        },
2335        // v7.25.2 (round-19 A) — subquery nodes: rewrite group-key
2336        // references INSIDE the body to `__grp_N` so the correlated
2337        // resolver can substitute them against the synthesised group
2338        // row (aggs are NOT matched inside the body — a COUNT in the
2339        // subquery is the subquery's own aggregate).
2340        Expr::ScalarSubquery(s) => {
2341            Expr::ScalarSubquery(Box::new(rewrite_group_keys_in_select(s, group_exprs)))
2342        }
2343        Expr::Exists { subquery, negated } => Expr::Exists {
2344            subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
2345            negated: *negated,
2346        },
2347        Expr::InSubquery {
2348            expr,
2349            subquery,
2350            negated,
2351        } => Expr::InSubquery {
2352            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2353            subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
2354            negated: *negated,
2355        },
2356        // v4.12 window / Literal / Column — clone-pass (these don't
2357        // participate in aggregate rewrite).
2358        Expr::WindowFunction { .. } | Expr::Literal(_) | Expr::Placeholder(_) | Expr::Column(_) => {
2359            e.clone()
2360        }
2361        // v7.10.10 — recurse children for array nodes.
2362        Expr::Array(items) => Expr::Array(
2363            items
2364                .iter()
2365                .map(|elem| rewrite_expr(elem, group_exprs, aggs))
2366                .collect(),
2367        ),
2368        Expr::ArraySubscript { target, index } => Expr::ArraySubscript {
2369            target: Box::new(rewrite_expr(target, group_exprs, aggs)),
2370            index: Box::new(rewrite_expr(index, group_exprs, aggs)),
2371        },
2372        Expr::AnyAll {
2373            expr,
2374            op,
2375            array,
2376            is_any,
2377        } => Expr::AnyAll {
2378            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2379            op: *op,
2380            array: Box::new(rewrite_expr(array, group_exprs, aggs)),
2381            is_any: *is_any,
2382        },
2383        Expr::InList {
2384            expr,
2385            list,
2386            negated,
2387        } => Expr::InList {
2388            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
2389            list: list
2390                .iter()
2391                .map(|item| rewrite_expr(item, group_exprs, aggs))
2392                .collect(),
2393            negated: *negated,
2394        },
2395        Expr::Case {
2396            operand,
2397            branches,
2398            else_branch,
2399        } => Expr::Case {
2400            operand: operand
2401                .as_deref()
2402                .map(|o| Box::new(rewrite_expr(o, group_exprs, aggs))),
2403            branches: branches
2404                .iter()
2405                .map(|(w, t)| {
2406                    (
2407                        rewrite_expr(w, group_exprs, aggs),
2408                        rewrite_expr(t, group_exprs, aggs),
2409                    )
2410                })
2411                .collect(),
2412            else_branch: else_branch
2413                .as_deref()
2414                .map(|e| Box::new(rewrite_expr(e, group_exprs, aggs))),
2415        },
2416    }
2417}
2418
2419/// v7.25.2 (round-19 A) — rewrite group-key references inside a
2420/// subquery body to `__grp_N` synthetic columns (aggregates are
2421/// not touched: empty spec list). Runs through the canonical
2422/// Select walker so every expression slot is covered.
2423fn rewrite_group_keys_in_select(
2424    s: &spg_sql::ast::SelectStatement,
2425    group_exprs: &[Expr],
2426) -> spg_sql::ast::SelectStatement {
2427    let mut out = s.clone();
2428    let _ = crate::walk_select_exprs_mut(&mut out, &mut |e| {
2429        *e = rewrite_expr(e, group_exprs, &[]);
2430        Ok(())
2431    });
2432    out
2433}
2434
2435/// Canonical string key for a tuple of group values. Used as map key.
2436/// Per-value group-key encoding (shared by owned and borrowed paths).
2437fn encode_one(out: &mut String, v: &Value) {
2438    match v {
2439        Value::Null => out.push_str("N|"),
2440        Value::SmallInt(n) => {
2441            out.push('s');
2442            out.push_str(&n.to_string());
2443            out.push('|');
2444        }
2445        Value::Int(n) => {
2446            out.push('I');
2447            out.push_str(&n.to_string());
2448            out.push('|');
2449        }
2450        Value::BigInt(n) => {
2451            out.push('B');
2452            out.push_str(&n.to_string());
2453            out.push('|');
2454        }
2455        Value::Float(x) => {
2456            out.push('F');
2457            out.push_str(&x.to_string());
2458            out.push('|');
2459        }
2460        Value::Bool(b) => {
2461            out.push(if *b { 'T' } else { 'f' });
2462            out.push('|');
2463        }
2464        Value::Text(s) => {
2465            out.push('S');
2466            out.push_str(s);
2467            out.push('|');
2468        }
2469        Value::Vector(v) => {
2470            out.push('V');
2471            for x in v {
2472                out.push_str(&x.to_string());
2473                out.push(',');
2474            }
2475            out.push('|');
2476        }
2477        // v6.0.1: GROUP BY on a `VECTOR(N) USING SQ8` column.
2478        // Two cells with byte-identical `(min, max, bytes)`
2479        // share the same group; equivalence is byte-equality
2480        // (same as f32 grouping today — neither path tries to
2481        // normalise nan/-0).
2482        Value::Sq8Vector(q) => {
2483            out.push('Q');
2484            out.push_str(&q.min.to_string());
2485            out.push('@');
2486            out.push_str(&q.max.to_string());
2487            out.push(':');
2488            for b in &q.bytes {
2489                out.push_str(&b.to_string());
2490                out.push(',');
2491            }
2492            out.push('|');
2493        }
2494        // v6.0.3: GROUP BY on a `VECTOR(N) USING HALF` column.
2495        // Byte-equality over the raw u16 bits; matches the SQ8
2496        // path's byte-key model.
2497        Value::HalfVector(h) => {
2498            out.push('H');
2499            for b in &h.bytes {
2500                out.push_str(&b.to_string());
2501                out.push(',');
2502            }
2503            out.push('|');
2504        }
2505        Value::Numeric { scaled, scale } => {
2506            out.push('D');
2507            out.push_str(&scaled.to_string());
2508            out.push('@');
2509            out.push_str(&scale.to_string());
2510            out.push('|');
2511        }
2512        Value::Date(d) => {
2513            out.push('d');
2514            out.push_str(&d.to_string());
2515            out.push('|');
2516        }
2517        Value::Timestamp(t) => {
2518            out.push('t');
2519            out.push_str(&t.to_string());
2520            out.push('|');
2521        }
2522        Value::Interval { months, micros } => {
2523            out.push('i');
2524            out.push_str(&months.to_string());
2525            out.push('m');
2526            out.push_str(&micros.to_string());
2527            out.push('|');
2528        }
2529        Value::Json(s) => {
2530            out.push('j');
2531            out.push_str(s);
2532            out.push('|');
2533        }
2534        // v7.5.0 — Value is #[non_exhaustive] for downstream
2535        // forward-compat. Any future variant lacking explicit
2536        // handling here will share a debug-derived group key,
2537        // which is observably wrong but won't crash.
2538        _ => {
2539            out.push('?');
2540            out.push_str(&format!("{v:?}"));
2541            out.push('|');
2542        }
2543    }
2544}
2545
2546/// v7.30 (perf campaign) - encode from borrowed cells without
2547/// materialising an owned Vec<Value> first.
2548pub(crate) fn encode_key_refs(vals: &[&Value]) -> String {
2549    let mut out = String::new();
2550    for v in vals {
2551        encode_one(&mut out, v);
2552    }
2553    out
2554}
2555
2556/// v7.31 (perf 3e) — encode into a caller-owned scratch buffer.
2557/// The per-row key paths (group hash, DISTINCT set, join build/
2558/// probe) ran 24k+ String allocations per query through the
2559/// allocator just to LOOK UP a map; the scratch form allocates
2560/// only when a map actually has to take ownership (vacant insert).
2561pub(crate) fn encode_key_refs_into(vals: &[&Value], out: &mut String) {
2562    out.clear();
2563    for v in vals {
2564        encode_one(out, v);
2565    }
2566}
2567
2568pub(crate) fn encode_key(vals: &[Value]) -> String {
2569    let mut out = String::new();
2570    for v in vals {
2571        encode_one(&mut out, v);
2572    }
2573    out
2574}
2575
2576#[allow(clippy::cast_precision_loss)]
2577fn value_cmp(a: &Value, b: &Value) -> core::cmp::Ordering {
2578    use core::cmp::Ordering::Equal;
2579    match (a, b) {
2580        (Value::Null, Value::Null) => Equal,
2581        (Value::Null, _) => core::cmp::Ordering::Greater, // NULLs last
2582        (_, Value::Null) => core::cmp::Ordering::Less,
2583        (Value::Int(x), Value::Int(y)) => x.cmp(y),
2584        (Value::BigInt(x), Value::BigInt(y)) => x.cmp(y),
2585        (Value::Int(x), Value::BigInt(y)) => i64::from(*x).cmp(y),
2586        (Value::BigInt(x), Value::Int(y)) => x.cmp(&i64::from(*y)),
2587        (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(Equal),
2588        (Value::Int(x), Value::Float(y)) => f64::from(*x).partial_cmp(y).unwrap_or(Equal),
2589        (Value::Float(x), Value::Int(y)) => x.partial_cmp(&f64::from(*y)).unwrap_or(Equal),
2590        (Value::BigInt(x), Value::Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Equal),
2591        (Value::Float(x), Value::BigInt(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Equal),
2592        (Value::Text(x), Value::Text(y)) => x.cmp(y),
2593        (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
2594        _ => Equal,
2595    }
2596}