Skip to main content

spg_engine/
aggregate.rs

1//! Aggregate executor.
2//!
3//! Handles `SELECT … <aggs> … [GROUP BY …]` queries. The planning strategy
4//! is straightforward:
5//!
//! 1. Walk the SELECT, ORDER BY and HAVING expressions to find every
//!    aggregate function call. Dedupe by AST equality and assign each
//!    `__agg_<i>`.
8//! 2. Same for every `GROUP BY` expression: assign `__grp_<j>`.
9//! 3. Stream the WHERE-filtered rows, group by the tuple of GROUP BY
10//!    values, and update per-group aggregate state.
11//! 4. Materialise a synthetic per-group row containing
12//!    `[__grp_0..__grp_K, __agg_0..__agg_N]` and rewrite the user's
13//!    SELECT / ORDER BY expressions to reference those synthetic columns
14//!    instead of the originals.
15//! 5. Evaluate the rewritten expressions against the synthetic schema and
16//!    emit results.
17//!
18//! v1.8 implements `count(*)`, `count(expr)`, `sum`, `min`, `max`, `avg`.
19//! NULL semantics follow PG: aggregates skip NULL inputs (except
20//! `count(*)`, which counts rows). `sum(int)` widens to `BigInt`;
21//! `avg(int|bigint)` returns `Float`.
22
23use alloc::boxed::Box;
24use alloc::collections::BTreeSet;
25use alloc::format;
26use alloc::string::{String, ToString};
27use alloc::vec::Vec;
28
29use spg_sql::ast::{Expr, SelectItem, SelectStatement};
30use spg_storage::{ColumnSchema, DataType, Row, Value};
31
32use crate::eval::{self, EvalContext, EvalError};
33
34/// True if this statement should go through the aggregate path.
35pub fn uses_aggregate(stmt: &SelectStatement) -> bool {
36    if stmt.group_by.is_some() || stmt.having.is_some() {
37        return true;
38    }
39    for item in &stmt.items {
40        if let SelectItem::Expr { expr, .. } = item
41            && contains_aggregate(expr)
42        {
43            return true;
44        }
45    }
46    for o in &stmt.order_by {
47        if contains_aggregate(&o.expr) {
48            return true;
49        }
50    }
51    if let Some(h) = &stmt.having
52        && contains_aggregate(h)
53    {
54        return true;
55    }
56    false
57}
58
59pub fn contains_aggregate(e: &Expr) -> bool {
60    match e {
61        Expr::FunctionCall { name, args } => {
62            is_aggregate_name(name) || args.iter().any(contains_aggregate)
63        }
64        Expr::AggregateOrdered { .. } => true,
65        Expr::Binary { lhs, rhs, .. } => contains_aggregate(lhs) || contains_aggregate(rhs),
66        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
67            contains_aggregate(expr)
68        }
69        Expr::Like { expr, pattern, .. } => contains_aggregate(expr) || contains_aggregate(pattern),
70        Expr::Extract { source, .. } => contains_aggregate(source),
71        // v4.10 subqueries + v4.12 window functions / Literal /
72        // Column — all non-aggregate leaves from the regular
73        // aggregate planner's POV. Window-bearing projections are
74        // routed to exec_select_with_window before this runs.
75        Expr::ScalarSubquery(_)
76        | Expr::Exists { .. }
77        | Expr::InSubquery { .. }
78        | Expr::WindowFunction { .. }
79        | Expr::Literal(_)
80        | Expr::Placeholder(_)
81        | Expr::Column(_) => false,
82        // v7.10.10 — recurse into array constructor / subscript /
83        // ANY/ALL children. Aggregates inside `ARRAY[SUM(x)]` are
84        // valid PG and must be detected here.
85        Expr::Array(items) => items.iter().any(contains_aggregate),
86        Expr::ArraySubscript { target, index } => {
87            contains_aggregate(target) || contains_aggregate(index)
88        }
89        Expr::AnyAll { expr, array, .. } => contains_aggregate(expr) || contains_aggregate(array),
90        Expr::InList { expr, list, .. } => {
91            contains_aggregate(expr) || list.iter().any(contains_aggregate)
92        }
93        // v7.13.0 — CASE WHEN … END. Recurse into operand,
94        // every (WHEN, THEN) pair, and the ELSE branch.
95        Expr::Case {
96            operand,
97            branches,
98            else_branch,
99        } => {
100            operand.as_deref().is_some_and(contains_aggregate)
101                || branches
102                    .iter()
103                    .any(|(w, t)| contains_aggregate(w) || contains_aggregate(t))
104                || else_branch.as_deref().is_some_and(contains_aggregate)
105        }
106    }
107}
108
/// Returns `true` when `name` is one of the aggregate function names
/// this executor recognises, compared ASCII-case-insensitively.
///
/// The comparison is done in place with `str::eq_ignore_ascii_case`,
/// so no lowercase copy of `name` is allocated; this function is called
/// for every function-call node visited by the AST walkers.
pub fn is_aggregate_name(name: &str) -> bool {
    /// Recognised aggregate names, all lowercase.
    const AGGREGATE_NAMES: [&str; 11] = [
        "count",
        "count_star",
        "sum",
        "min",
        "max",
        "avg",
        // v7.17.0 — variadic / collection aggregates. ORM reports
        // (Hibernate / Rails / Django) emit these in GROUP BY rollups;
        // pre-7.17 SPG hit "unknown aggregate".
        "string_agg",
        "array_agg",
        // v7.17.0 — boolean aggregates. `every` is the SQL-standard
        // alias for `bool_and`.
        "bool_and",
        "bool_or",
        "every",
    ];
    AGGREGATE_NAMES
        .iter()
        .any(|known| name.eq_ignore_ascii_case(known))
}
131
132/// Per-aggregate running state.
133#[derive(Debug, Default, Clone)]
134struct AggState {
135    count: i64,
136    sum_int: i64,
137    sum_float: f64,
138    extreme: Option<Value>,
139    use_float: bool,
140    /// v7.17.0 — running collection for string_agg / array_agg.
141    /// Each entry is one row's contribution (NULL preserved as
142    /// `Value::Null`; string_agg's finalize step drops them, but
143    /// array_agg keeps them). Pushing in insertion order matches
144    /// PG behaviour when no `ORDER BY` is given inside the
145    /// aggregate call.
146    items: Vec<Value>,
147    /// v7.25 (round-17) — per-group dedupe set for DISTINCT
148    /// aggregates (encoded values; NULLs never reach it because
149    /// the caller's skip runs after the per-aggregate NULL rules).
150    seen: BTreeSet<String>,
151    /// v7.24 (round-16 A) — per-item ORDER BY key tuples, parallel
152    /// to `items` (pushed under the same skip/keep conditions).
153    /// Empty when the aggregate carries no internal ordering.
154    item_keys: Vec<Vec<Value>>,
155    /// v7.17.0 — captured separator for string_agg. PG accepts a
156    /// non-constant separator expression but in practice every
157    /// caller passes a literal; the engine snapshots the last
158    /// non-NULL text it sees, which matches PG's "use the latest
159    /// row's value" behaviour.
160    separator: Option<String>,
161    /// v7.17.0 — running boolean accumulator for bool_and /
162    /// bool_or / every. `None` until the first non-NULL input;
163    /// at finalize None → SQL NULL.
164    bool_acc: Option<bool>,
165}
166
167#[derive(Debug, Clone)]
168struct AggSpec {
169    name: String, // lowercased
170    /// First argument (value expression) for every aggregate
171    /// except `count(*)`. `None` for `count_star`.
172    arg: Option<Expr>,
173    /// v7.17.0 — second argument. Only `string_agg(value, sep)`
174    /// uses it today. `None` for every other aggregate (or for
175    /// `array_agg`, which is single-arg). Carried in the spec so
176    /// per-row evaluation can re-use the same separator
177    /// expression across calls.
178    arg2: Option<Expr>,
179    /// v7.25 (round-17) — `COUNT(DISTINCT x)` & friends: dedupe
180    /// the input stream per group before accumulation.
181    distinct: bool,
182    /// v7.24 (round-16 A) — aggregate-internal ORDER BY keys
183    /// (`array_agg(x ORDER BY y DESC NULLS LAST)`). Empty for the
184    /// plain form. Only the collection aggregates honour it;
185    /// other aggregates are order-insensitive and ignore it (PG
186    /// accepts the syntax everywhere too).
187    order_by: Vec<spg_sql::ast::OrderBy>,
188}
189
190/// Output of running the aggregate path. Schema describes one row per
191/// group; rows are not yet ORDER BY-sorted (caller does it).
192#[derive(Debug)]
193pub struct AggResult {
194    pub columns: Vec<ColumnSchema>,
195    pub rows: Vec<Row>,
196    /// v7.31 (perf — PG lesson #1, post-LIMIT subquery projection):
197    /// select-list items whose rewritten expr carries a subquery and
198    /// is referenced by neither ORDER BY nor HAVING. Their output
199    /// cells hold NULL placeholders; the caller truncates to
200    /// LIMIT+OFFSET first and only then evaluates these for the
201    /// surviving rows (PG runs the same shape with SubPlan loops=50
202    /// instead of loops=24000). `(output_col, rewritten_expr)`.
203    pub deferred: Vec<(usize, Expr)>,
204    /// Synthetic group rows aligned 1:1 with `rows`; populated only
205    /// when `deferred` is non-empty.
206    pub synth_rows: Vec<Row>,
207    /// Schema the deferred exprs evaluate against.
208    pub synth_schema: Vec<ColumnSchema>,
209}
210
211/// Execute aggregate logic against an already-WHERE-filtered iterator of
212/// rows. `table_alias` is the alias accepted by column resolution.
213#[allow(clippy::too_many_lines)]
214/// v7.25.2 (round-19 A) — caller-injected evaluator for synth-row
215/// expressions that still carry subquery nodes after the rewrite
216/// (correlated subqueries in the select list / HAVING / aggregate
217/// ORDER BY of a GROUP BY query). The engine passes its
218/// correlated-aware evaluator; pure-library callers pass None and
219/// surviving subqueries keep erroring loudly.
220pub type CorrelatedEval<'a> = &'a dyn Fn(&Expr, &Row, &EvalContext<'_>) -> Result<Value, EvalError>;
221
222pub fn run(
223    stmt: &SelectStatement,
224    rows: &[&Row],
225    schema_cols: &[ColumnSchema],
226    table_alias: Option<&str>,
227    correlated_eval: Option<CorrelatedEval<'_>>,
228) -> Result<AggResult, EvalError> {
229    let ctx = EvalContext::new(schema_cols, table_alias);
230    let group_exprs: Vec<Expr> = stmt.group_by.clone().unwrap_or_default();
231
232    // Collect aggregate sub-expressions across items + order_by.
233    let mut agg_specs: Vec<AggSpec> = Vec::new();
234    for item in &stmt.items {
235        if let SelectItem::Expr { expr, .. } = item {
236            collect_aggregates(expr, &mut agg_specs);
237        }
238    }
239    for o in &stmt.order_by {
240        collect_aggregates(&o.expr, &mut agg_specs);
241    }
242    if let Some(h) = &stmt.having {
243        collect_aggregates(h, &mut agg_specs);
244    }
245    // v7.17.0 — arity validation. The collector tolerates an
246    // arbitrary positional-arg count; here we enforce the
247    // per-aggregate contract so a malformed call (e.g.
248    // `array_agg()` or `string_agg(x)`) surfaces as a SQL error
249    // rather than silently coercing to a degenerate aggregate.
250    validate_agg_arities(stmt, &agg_specs)?;
251
252    // Map group key (vec of values, encoded as canonical string) -> group state.
253    // Order of insertion is preserved via a parallel Vec of keys.
254    // v7.29 - hash map (output order rides key_order, not map order).
255    let mut groups: hashbrown::HashMap<String, (Vec<Value>, Vec<AggState>)> =
256        hashbrown::HashMap::new();
257    let mut key_order: Vec<String> = Vec::new();
258    // When there are no GROUP BY exprs *and* there is at least one aggregate,
259    // every row collapses into a single anonymous group keyed by "".
260    if rows.is_empty() && group_exprs.is_empty() {
261        // Single empty-aggregate group: count=0, sum=0, max=NULL, etc.
262        let init: Vec<AggState> = (0..agg_specs.len()).map(|_| AggState::default()).collect();
263        groups.insert(String::new(), (Vec::new(), init));
264        key_order.push(String::new());
265    }
266
267    // v7.30 (perf campaign) - hoist the per-row work that doesn't
268    // depend on the row: which group exprs need collation folding
269    // (none, for most queries - the old code cloned the whole
270    // group_vals vec per row just in case).
271    // v7.30 (perf campaign) - the no-tax row loop. When a group
272    // expr or an aggregate argument is a bare column reference
273    // (the overwhelmingly common shape), bind its position ONCE
274    // and read row cells by offset in the loop - no per-row tree
275    // walk, no owned-Value clone out of resolve_column. Anything
276    // more complex keeps the eval path.
277    let col_pos = |e: &Expr| -> Option<usize> {
278        // Qualified references only: the bare-name resolver carries
279        // alias/ambiguity logic the bind-once path must not fork.
280        if let Expr::Column(c) = e
281            && c.qualifier.is_some()
282        {
283            eval::find_column_pos(c, &ctx)
284        } else {
285            None
286        }
287    };
288    let group_pos: Vec<Option<usize>> = group_exprs.iter().map(col_pos).collect();
289    let all_groups_bound = group_pos.iter().all(Option::is_some);
290    let arg_pos: Vec<Option<usize>> = agg_specs
291        .iter()
292        .map(|spec| spec.arg.as_ref().and_then(|e| col_pos(e)))
293        .collect();
294    let ci_positions: Vec<usize> = group_exprs
295        .iter()
296        .enumerate()
297        .filter(|(_, g)| {
298            matches!(
299                eval::column_collation(g, &ctx),
300                Some(spg_storage::Collation::CaseInsensitive)
301            )
302        })
303        .map(|(i, _)| i)
304        .collect();
305    // v7.31 (perf 3e) — per-row scratch buffers. The fast path used
306    // to allocate a key String (and a refs Vec) for EVERY row just
307    // to probe the group map; hits — the overwhelming case — now
308    // touch the allocator zero times.
309    let mut keybuf_s = String::new();
310    let mut dkeybuf = String::new();
311    let mut refs: Vec<&Value> = Vec::with_capacity(group_pos.len());
312    for row in rows {
313        // Fast key: bound positions + no ci folding -> encode
314        // straight from borrowed cells; group_vals materialise
315        // only when the group is NEW.
316        if all_groups_bound && ci_positions.is_empty() && !group_exprs.is_empty() {
317            refs.clear();
318            refs.extend(
319                group_pos
320                    .iter()
321                    .map(|p| row.values.get(p.unwrap()).unwrap_or(&Value::Null)),
322            );
323            encode_key_refs_into(&refs, &mut keybuf_s);
324            let entry = match groups.entry_ref(keybuf_s.as_str()) {
325                hashbrown::hash_map::EntryRef::Occupied(o) => o.into_mut(),
326                hashbrown::hash_map::EntryRef::Vacant(v) => {
327                    key_order.push(keybuf_s.clone());
328                    let init: Vec<AggState> =
329                        (0..agg_specs.len()).map(|_| AggState::default()).collect();
330                    let owned: Vec<Value> = refs.iter().map(|v| (*v).clone()).collect();
331                    v.insert((owned, init))
332                }
333            };
334            for (i, spec) in agg_specs.iter().enumerate() {
335                let arg_owned: Value;
336                let arg_ref: &Value = match (&arg_pos[i], &spec.arg) {
337                    (Some(p), _) => row.values.get(*p).unwrap_or(&Value::Null),
338                    (None, None) => {
339                        arg_owned = Value::Bool(true);
340                        &arg_owned
341                    }
342                    (None, Some(e)) => {
343                        arg_owned = eval::eval_expr(e, row, &ctx)?;
344                        &arg_owned
345                    }
346                };
347                let arg2_val = match &spec.arg2 {
348                    None => None,
349                    Some(e) => Some(eval::eval_expr(e, row, &ctx)?),
350                };
351                let order_keys = if spec.order_by.is_empty() {
352                    None
353                } else {
354                    let mut keys = Vec::with_capacity(spec.order_by.len());
355                    for o in &spec.order_by {
356                        keys.push(eval::eval_expr(&o.expr, row, &ctx)?);
357                    }
358                    Some(keys)
359                };
360                if spec.distinct {
361                    encode_key_refs_into(core::slice::from_ref(&arg_ref), &mut dkeybuf);
362                    if entry.1[i].seen.contains(dkeybuf.as_str()) {
363                        continue;
364                    }
365                    entry.1[i].seen.insert(dkeybuf.clone());
366                }
367                update_state(
368                    &mut entry.1[i],
369                    &spec.name,
370                    arg_ref,
371                    arg2_val.as_ref(),
372                    order_keys,
373                )?;
374            }
375            continue;
376        }
377        let group_vals: Vec<Value> = group_exprs
378            .iter()
379            .map(|g| eval::eval_expr(g, row, &ctx))
380            .collect::<Result<_, _>>()?;
381        // v7.17.0 Phase 2.5b — case-insensitive group keying: fold
382        // only the ci columns, and only when any exist. Display
383        // value (`group_vals`) stays original — only the key folds.
384        let key = if ci_positions.is_empty() {
385            encode_key(&group_vals)
386        } else {
387            let mut key_vals = group_vals.clone();
388            for &i in &ci_positions {
389                if let Value::Text(s) = &key_vals[i] {
390                    key_vals[i] = Value::Text(s.to_ascii_lowercase());
391                }
392            }
393            encode_key(&key_vals)
394        };
395        // entry_ref: no per-row key clone on the (dominant) hit path.
396        let entry = match groups.entry_ref(key.as_str()) {
397            hashbrown::hash_map::EntryRef::Occupied(o) => o.into_mut(),
398            hashbrown::hash_map::EntryRef::Vacant(v) => {
399                key_order.push(key.clone());
400                let init: Vec<AggState> =
401                    (0..agg_specs.len()).map(|_| AggState::default()).collect();
402                v.insert((group_vals.clone(), init))
403            }
404        };
405        for (i, spec) in agg_specs.iter().enumerate() {
406            let arg_val = match &spec.arg {
407                None => Value::Bool(true), // count_star: sentinel non-null
408                Some(e) => eval::eval_expr(e, row, &ctx)?,
409            };
410            // v7.17.0 — `string_agg(value, separator)` evaluates the
411            // separator per row but PG treats it as constant; we
412            // pass the per-row value into update_state so a future
413            // varying-separator caller still sees correct output,
414            // even though SPG (like PG) only uses the most recent.
415            let arg2_val = match &spec.arg2 {
416                None => None,
417                Some(e) => Some(eval::eval_expr(e, row, &ctx)?),
418            };
419            // v7.24 (round-16 A) — aggregate-internal ORDER BY:
420            // evaluate the key tuple against the source row.
421            let order_keys = if spec.order_by.is_empty() {
422                None
423            } else {
424                let mut keys = Vec::with_capacity(spec.order_by.len());
425                for o in &spec.order_by {
426                    keys.push(eval::eval_expr(&o.expr, row, &ctx)?);
427                }
428                Some(keys)
429            };
430            // v7.25 (round-17) — DISTINCT: drop repeated inputs
431            // before they reach the accumulator. NULLs flow through
432            // (each aggregate's own NULL rule applies; PG also
433            // treats NULL as a single distinct value for array_agg).
434            if spec.distinct {
435                let key = encode_key(core::slice::from_ref(&arg_val));
436                if !entry.1[i].seen.insert(key) {
437                    continue;
438                }
439            }
440            update_state(
441                &mut entry.1[i],
442                &spec.name,
443                &arg_val,
444                arg2_val.as_ref(),
445                order_keys,
446            )?;
447        }
448    }
449
450    // Build synthetic schema: __grp_0..K then __agg_0..N.
451    let group_types: Vec<DataType> = if rows.is_empty() {
452        // Use Text as a safe stand-in — empty result means schema isn't
453        // observable. Avoids needing to evaluate group exprs on no row.
454        group_exprs.iter().map(|_| DataType::Text).collect()
455    } else {
456        let probe = rows[0];
457        group_exprs
458            .iter()
459            .map(|g| {
460                eval::eval_expr(g, probe, &ctx).map(|v| v.data_type().unwrap_or(DataType::Text))
461            })
462            .collect::<Result<_, _>>()?
463    };
464    let agg_types: Vec<DataType> = agg_specs
465        .iter()
466        .map(|spec| infer_agg_type(spec, schema_cols))
467        .collect();
468    let mut synth_schema: Vec<ColumnSchema> = Vec::new();
469    for (i, ty) in group_types.iter().enumerate() {
470        synth_schema.push(ColumnSchema::new(format!("__grp_{i}"), *ty, true));
471    }
472    for (i, ty) in agg_types.iter().enumerate() {
473        synth_schema.push(ColumnSchema::new(format!("__agg_{i}"), *ty, true));
474    }
475
476    // Materialise synthetic rows.
477    let mut synth_rows: Vec<Row> = Vec::new();
478    for k in &key_order {
479        let (gvals, states) = &groups[k];
480        let mut values: Vec<Value> = Vec::with_capacity(synth_schema.len());
481        values.extend(gvals.iter().cloned());
482        for (i, st) in states.iter().enumerate() {
483            // v7.24 (round-16 A) — order the collected items per the
484            // aggregate-internal ORDER BY before finalize consumes
485            // them.
486            let st_sorted;
487            let st_final: &AggState =
488                if !agg_specs[i].order_by.is_empty() && st.item_keys.len() == st.items.len() {
489                    let mut idx: Vec<usize> = (0..st.items.len()).collect();
490                    let ob = &agg_specs[i].order_by;
491                    idx.sort_by(|&x, &y| {
492                        for (k, o) in ob.iter().enumerate() {
493                            let cmp = crate::order_by_value_cmp(
494                                o.desc,
495                                o.nulls_first,
496                                &st.item_keys[x][k],
497                                &st.item_keys[y][k],
498                            );
499                            if cmp != core::cmp::Ordering::Equal {
500                                return cmp;
501                            }
502                        }
503                        core::cmp::Ordering::Equal
504                    });
505                    let mut sorted = st.clone();
506                    sorted.items = idx.iter().map(|&j| st.items[j].clone()).collect();
507                    st_sorted = sorted;
508                    &st_sorted
509                } else {
510                    st
511                };
512            values.push(finalize(&agg_specs[i].name, st_final));
513        }
514        synth_rows.push(Row::new(values));
515    }
516
517    // Rewrite the user's SELECT items + ORDER BY to reference synthetic
518    // columns. After rewriting, every remaining `Expr::Column` must
519    // resolve against the synthetic schema (i.e. must have been a GROUP
520    // BY expression).
521    let columns: Vec<ColumnSchema> = stmt
522        .items
523        .iter()
524        .map(|item| match item {
525            SelectItem::Wildcard => Err(EvalError::TypeMismatch {
526                detail: "SELECT * with aggregates is not supported".into(),
527            }),
528            SelectItem::Expr { expr, alias } => {
529                let rewritten = rewrite_expr(expr, &group_exprs, &agg_specs);
530                let name = alias.clone().unwrap_or_else(|| expr.to_string());
531                Ok(ColumnSchema::new(
532                    name,
533                    agg_or_group_type(&rewritten, &synth_schema),
534                    true,
535                ))
536            }
537        })
538        .collect::<Result<_, _>>()?;
539
540    // Project per synthetic row. HAVING filters out groups *before*
541    // we keep the projected row — same semantics as PG: HAVING runs
542    // against the aggregated row (so `HAVING count(*) > 1` works) and
543    // sees only group-by'd columns plus aggregate values.
544    let synth_ctx = EvalContext::new(&synth_schema, None);
545    let having_rewritten = stmt
546        .having
547        .as_ref()
548        .map(|h| rewrite_expr(h, &group_exprs, &agg_specs));
549    // v7.30 (phase 3e-1) - rewrite SELECT items ONCE. This ran per
550    // GROUP (23.5k x 9 items of AST cloning = ~48% of the inbox
551    // query in sampled stacks); the rewrite is group-independent.
552    // Stable addresses also let the per-expression subquery plans
553    // (v7.29 3c) hit across groups instead of rebuilding.
554    let items_rewritten: alloc::vec::Vec<Option<Expr>> = stmt
555        .items
556        .iter()
557        .map(|item| match item {
558            SelectItem::Expr { expr, .. } => Some(rewrite_expr(expr, &group_exprs, &agg_specs)),
559            SelectItem::Wildcard => None,
560        })
561        .collect();
562    // v7.31 (perf — PG lesson #1): subquery-bearing select items
563    // deferred to post-LIMIT, when no sort/filter key can observe
564    // them. ORDER BY rewrites are hoisted here so the safety check
565    // and the sort below share one rewrite pass.
566    let order_rewritten: Vec<Expr> = stmt
567        .order_by
568        .iter()
569        .map(|o| rewrite_expr(&o.expr, &group_exprs, &agg_specs))
570        .collect();
571    let defer_enabled = correlated_eval.is_some()
572        && !stmt.distinct
573        && !having_rewritten
574            .as_ref()
575            .is_some_and(crate::expr_has_subquery)
576        && !order_rewritten.iter().any(crate::expr_has_subquery);
577    let deferred: Vec<(usize, Expr)> = if defer_enabled {
578        items_rewritten
579            .iter()
580            .enumerate()
581            .filter_map(|(i, r)| {
582                r.as_ref()
583                    .filter(|e| crate::expr_has_subquery(e))
584                    .map(|e| (i, e.clone()))
585            })
586            .collect()
587    } else {
588        Vec::new()
589    };
590    let mut kept_synth: Vec<Row> = Vec::new();
591    let mut out_rows: Vec<Row> = Vec::new();
592    for srow in synth_rows {
593        if let Some(h) = &having_rewritten {
594            let cond = match correlated_eval {
595                Some(f) if crate::expr_has_subquery(h) => f(h, &srow, &synth_ctx)?,
596                _ => eval::eval_expr(h, &srow, &synth_ctx)?,
597            };
598            if !matches!(cond, Value::Bool(true)) {
599                continue;
600            }
601        }
602        let mut values: Vec<Value> = Vec::with_capacity(columns.len());
603        for (i, rewritten) in items_rewritten.iter().enumerate() {
604            let Some(rewritten) = rewritten else { continue };
605            if deferred.iter().any(|(c, _)| *c == i) {
606                values.push(Value::Null);
607                continue;
608            }
609            values.push(match correlated_eval {
610                Some(f) if crate::expr_has_subquery(rewritten) => f(rewritten, &srow, &synth_ctx)?,
611                _ => eval::eval_expr(rewritten, &srow, &synth_ctx)?,
612            });
613        }
614        kept_synth.push(srow);
615        out_rows.push(Row::new(values));
616    }
617
618    // ORDER BY: evaluate the rewritten order_by against each synth row,
619    // sort, then drop the keys. Limit is applied by the caller.
620    if !stmt.order_by.is_empty() {
621        // v6.4.0 — multi-key ORDER BY on aggregate output. Each key
622        // gets its own rewrite + per-key DESC flag. (Rewrites hoisted
623        // above as `order_rewritten` — shared with the deferral
624        // safety check.)
625        let keys_meta: Vec<(bool, Option<bool>)> = stmt
626            .order_by
627            .iter()
628            .map(|o| (o.desc, o.nulls_first))
629            .collect();
630        // The synth row rides through the sort so deferred exprs can
631        // evaluate against the surviving groups after the caller's
632        // LIMIT truncation.
633        let mut tagged: Vec<(Vec<Value>, Row, Row)> = kept_synth
634            .into_iter()
635            .zip(out_rows)
636            .map(|(s, o)| {
637                let mut keys = Vec::with_capacity(order_rewritten.len());
638                for e in &order_rewritten {
639                    keys.push(match correlated_eval {
640                        Some(f) if crate::expr_has_subquery(e) => f(e, &s, &synth_ctx)?,
641                        _ => eval::eval_expr(e, &s, &synth_ctx)?,
642                    });
643                }
644                Ok::<_, EvalError>((keys, s, o))
645            })
646            .collect::<Result<_, _>>()?;
647        tagged.sort_by(|a, b| {
648            use core::cmp::Ordering;
649            for (i, (ka, kb)) in a.0.iter().zip(b.0.iter()).enumerate() {
650                let (desc, nf) = keys_meta[i];
651                let cmp = crate::order_by_value_cmp(desc, nf, ka, kb);
652                if cmp != Ordering::Equal {
653                    return cmp;
654                }
655            }
656            Ordering::Equal
657        });
658        kept_synth = Vec::with_capacity(tagged.len());
659        out_rows = Vec::with_capacity(tagged.len());
660        for (_, s, o) in tagged {
661            kept_synth.push(s);
662            out_rows.push(o);
663        }
664    }
665
666    let (synth_rows_out, synth_schema_out) = if deferred.is_empty() {
667        (Vec::new(), Vec::new())
668    } else {
669        (kept_synth, synth_schema.clone())
670    };
671    Ok(AggResult {
672        columns,
673        rows: out_rows,
674        deferred,
675        synth_rows: synth_rows_out,
676        synth_schema: synth_schema_out,
677    })
678}
679
680/// v7.17.0 — walk the statement again to validate the positional
681/// arity of every aggregate call site. Done after AST collection
682/// rather than inside `collect_aggregates` so the collector stays
683/// infallible; callers in `run()` can do a single early-error
684/// exit before any per-row work.
685fn validate_agg_arities(stmt: &SelectStatement, _specs: &[AggSpec]) -> Result<(), EvalError> {
686    fn walk(e: &Expr) -> Result<(), EvalError> {
687        if let Expr::FunctionCall { name, args } = e {
688            let lower = name.to_ascii_lowercase();
689            let expected: Option<usize> = match lower.as_str() {
690                "count_star" => Some(0),
691                "count" | "sum" | "avg" | "min" | "max" | "array_agg"
692                // v7.17.0 — boolean aggregates also take exactly
693                // one arg. `every` is an alias normalised inside
694                // collect_aggregates / rewrite_expr.
695                | "bool_and" | "bool_or" | "every" => Some(1),
696                "string_agg" => Some(2),
697                _ => None,
698            };
699            if let Some(want) = expected
700                && args.len() != want
701            {
702                return Err(EvalError::TypeMismatch {
703                    detail: alloc::format!("{lower}() takes {want} arg(s), got {}", args.len()),
704                });
705            }
706            for a in args {
707                walk(a)?;
708            }
709        } else if let Expr::Binary { lhs, rhs, .. } = e {
710            walk(lhs)?;
711            walk(rhs)?;
712        } else if let Expr::Unary { expr, .. }
713        | Expr::Cast { expr, .. }
714        | Expr::IsNull { expr, .. } = e
715        {
716            walk(expr)?;
717        }
718        Ok(())
719    }
720    for item in &stmt.items {
721        if let SelectItem::Expr { expr, .. } = item {
722            walk(expr)?;
723        }
724    }
725    for o in &stmt.order_by {
726        walk(&o.expr)?;
727    }
728    if let Some(h) = &stmt.having {
729        walk(h)?;
730    }
731    Ok(())
732}
733
734fn collect_aggregates(e: &Expr, out: &mut Vec<AggSpec>) {
735    match e {
736        // v7.24 (round-16 A) — ordered aggregate: register the inner
737        // call's spec with the ordering attached.
738        Expr::AggregateOrdered {
739            call,
740            order_by,
741            distinct,
742        } => {
743            if let Expr::FunctionCall { name, args } = call.as_ref() {
744                let lower = name.to_ascii_lowercase();
745                if is_aggregate_name(&lower) {
746                    let canonical = if lower == "every" {
747                        "bool_and".to_string()
748                    } else {
749                        lower
750                    };
751                    let spec = AggSpec {
752                        name: canonical,
753                        arg: args.first().cloned(),
754                        arg2: if name.eq_ignore_ascii_case("string_agg") {
755                            args.get(1).cloned()
756                        } else {
757                            None
758                        },
759                        distinct: *distinct,
760                        order_by: order_by.clone(),
761                    };
762                    if !out.iter().any(|s| {
763                        s.name == spec.name
764                            && s.arg == spec.arg
765                            && s.arg2 == spec.arg2
766                            && s.distinct == spec.distinct
767                            && s.order_by == spec.order_by
768                    }) {
769                        out.push(spec);
770                    }
771                    return;
772                }
773            }
774            collect_aggregates(call, out);
775            for o in order_by {
776                collect_aggregates(&o.expr, out);
777            }
778        }
779        Expr::FunctionCall { name, args } => {
780            let lower = name.to_ascii_lowercase();
781            if is_aggregate_name(&lower) {
782                let arg = if lower == "count_star" {
783                    None
784                } else {
785                    args.first().cloned()
786                };
787                // v7.17.0 — second positional arg for
788                // `string_agg(value, separator)`. Everything else
789                // ignores it.
790                let arg2 = if lower == "string_agg" {
791                    args.get(1).cloned()
792                } else {
793                    None
794                };
795                // v7.17.0 — `every` is the SQL-standard alias for
796                // `bool_and`; collapse at collection time so
797                // update_state / finalize need only one arm.
798                let canonical = if lower == "every" {
799                    "bool_and".to_string()
800                } else {
801                    lower
802                };
803                let spec = AggSpec {
804                    name: canonical,
805                    arg: arg.clone(),
806                    arg2: arg2.clone(),
807                    distinct: false,
808                    order_by: Vec::new(),
809                };
810                if !out.iter().any(|s| {
811                    s.name == spec.name
812                        && s.arg == spec.arg
813                        && s.arg2 == spec.arg2
814                        && !s.distinct
815                        && s.order_by == spec.order_by
816                }) {
817                    out.push(spec);
818                }
819                // Don't recurse into the arg — nested aggregates are
820                // illegal in standard SQL.
821            } else {
822                for a in args {
823                    collect_aggregates(a, out);
824                }
825            }
826        }
827        Expr::Binary { lhs, rhs, .. } => {
828            collect_aggregates(lhs, out);
829            collect_aggregates(rhs, out);
830        }
831        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
832            collect_aggregates(expr, out);
833        }
834        Expr::Like { expr, pattern, .. } => {
835            collect_aggregates(expr, out);
836            collect_aggregates(pattern, out);
837        }
838        Expr::InList { expr, list, .. } => {
839            collect_aggregates(expr, out);
840            for item in list {
841                collect_aggregates(item, out);
842            }
843        }
844        Expr::Extract { source, .. } => collect_aggregates(source, out),
845        // v4.10 subquery + v4.12 window / Literal / Column —
846        // non-recursing leaves for the aggregate collector.
847        Expr::ScalarSubquery(_)
848        | Expr::Exists { .. }
849        | Expr::InSubquery { .. }
850        | Expr::WindowFunction { .. }
851        | Expr::Literal(_)
852        | Expr::Placeholder(_)
853        | Expr::Column(_) => {}
854        // v7.10.10 — recurse into array constructor children +
855        // subscript / ANY/ALL operands.
856        Expr::Array(items) => {
857            for elem in items {
858                collect_aggregates(elem, out);
859            }
860        }
861        Expr::ArraySubscript { target, index } => {
862            collect_aggregates(target, out);
863            collect_aggregates(index, out);
864        }
865        Expr::AnyAll { expr, array, .. } => {
866            collect_aggregates(expr, out);
867            collect_aggregates(array, out);
868        }
869        Expr::Case {
870            operand,
871            branches,
872            else_branch,
873        } => {
874            if let Some(o) = operand {
875                collect_aggregates(o, out);
876            }
877            for (w, t) in branches {
878                collect_aggregates(w, out);
879                collect_aggregates(t, out);
880            }
881            if let Some(e) = else_branch {
882                collect_aggregates(e, out);
883            }
884        }
885    }
886}
887
888fn update_state(
889    st: &mut AggState,
890    name: &str,
891    v: &Value,
892    arg2: Option<&Value>,
893    order_keys: Option<Vec<Value>>,
894) -> Result<(), EvalError> {
895    let is_null = matches!(v, Value::Null);
896    match name {
897        "count_star" => st.count += 1,
898        "count" => {
899            if !is_null {
900                st.count += 1;
901            }
902        }
903        "sum" | "avg" => {
904            if is_null {
905                return Ok(());
906            }
907            st.count += 1;
908            match v {
909                Value::Int(n) => st.sum_int += i64::from(*n),
910                Value::BigInt(n) => st.sum_int += *n,
911                Value::Float(x) => {
912                    st.use_float = true;
913                    st.sum_float += *x;
914                }
915                other => {
916                    return Err(EvalError::TypeMismatch {
917                        detail: format!("sum/avg need numeric, got {:?}", other.data_type()),
918                    });
919                }
920            }
921        }
922        "min" => {
923            if is_null {
924                return Ok(());
925            }
926            match &st.extreme {
927                None => st.extreme = Some(v.clone()),
928                Some(cur) => {
929                    if value_cmp(v, cur) == core::cmp::Ordering::Less {
930                        st.extreme = Some(v.clone());
931                    }
932                }
933            }
934        }
935        "max" => {
936            if is_null {
937                return Ok(());
938            }
939            match &st.extreme {
940                None => st.extreme = Some(v.clone()),
941                Some(cur) => {
942                    if value_cmp(v, cur) == core::cmp::Ordering::Greater {
943                        st.extreme = Some(v.clone());
944                    }
945                }
946            }
947        }
948        // v7.17.0 — string_agg(value, separator). NULL value is
949        // skipped (PG aggregate-skip-null). Separator captured
950        // from the latest row that flows through; matches PG's
951        // semantics of evaluating the separator per row but using
952        // the last value at finalize time (in practice it's
953        // constant). count is bumped so we can distinguish "empty
954        // group → NULL" from "all-NULL group → NULL".
955        "string_agg" => {
956            if let Some(sep) = arg2
957                && let Value::Text(s) = sep
958            {
959                st.separator = Some(s.clone());
960            }
961            if is_null {
962                return Ok(());
963            }
964            if let Value::Text(s) = v {
965                st.items.push(Value::Text(s.clone()));
966                if let Some(k) = order_keys {
967                    st.item_keys.push(k);
968                }
969                st.count += 1;
970            } else {
971                return Err(EvalError::TypeMismatch {
972                    detail: format!("string_agg requires text value, got {:?}", v.data_type()),
973                });
974            }
975        }
976        // v7.17.0 — array_agg(value). Unlike string_agg, NULL
977        // elements are KEPT in the array (PG behaviour); the
978        // result is NULL only when ZERO rows fed in. Element type
979        // is locked from the first row's value type; subsequent
980        // rows must match (PG also rejects mixed-type array_agg).
981        "array_agg" => {
982            st.items.push(v.clone());
983            if let Some(k) = order_keys {
984                st.item_keys.push(k);
985            }
986            st.count += 1;
987        }
988        // v7.17.0 — bool_and(p): TRUE iff every non-NULL input is
989        // TRUE. NULL skipped; running accumulator stays at TRUE
990        // until the first non-NULL FALSE.
991        "bool_and" => {
992            if is_null {
993                return Ok(());
994            }
995            let b = match v {
996                Value::Bool(b) => *b,
997                other => {
998                    return Err(EvalError::TypeMismatch {
999                        detail: format!("bool_and requires bool, got {:?}", other.data_type()),
1000                    });
1001                }
1002            };
1003            st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc && b));
1004        }
1005        // v7.17.0 — bool_or(p): TRUE iff any non-NULL input is
1006        // TRUE. NULL skipped.
1007        "bool_or" => {
1008            if is_null {
1009                return Ok(());
1010            }
1011            let b = match v {
1012                Value::Bool(b) => *b,
1013                other => {
1014                    return Err(EvalError::TypeMismatch {
1015                        detail: format!("bool_or requires bool, got {:?}", other.data_type()),
1016                    });
1017                }
1018            };
1019            st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc || b));
1020        }
1021        _ => unreachable!("non-aggregate {name} in update_state"),
1022    }
1023    Ok(())
1024}
1025
1026#[allow(clippy::cast_precision_loss)]
1027fn finalize(name: &str, st: &AggState) -> Value {
1028    match name {
1029        "count" | "count_star" => Value::BigInt(st.count),
1030        "sum" => {
1031            if st.count == 0 {
1032                Value::Null
1033            } else if st.use_float {
1034                Value::Float(st.sum_float + (st.sum_int as f64))
1035            } else {
1036                Value::BigInt(st.sum_int)
1037            }
1038        }
1039        "avg" => {
1040            if st.count == 0 {
1041                Value::Null
1042            } else {
1043                let total = if st.use_float {
1044                    st.sum_float + (st.sum_int as f64)
1045                } else {
1046                    st.sum_int as f64
1047                };
1048                Value::Float(total / (st.count as f64))
1049            }
1050        }
1051        "min" | "max" => st.extreme.clone().unwrap_or(Value::Null),
1052        // v7.17.0 — string_agg: join all collected text items with
1053        // the captured separator. Empty / all-NULL group → NULL
1054        // (PG semantics).
1055        "string_agg" => {
1056            if st.items.is_empty() {
1057                return Value::Null;
1058            }
1059            let sep = st.separator.clone().unwrap_or_default();
1060            let mut out = String::new();
1061            for (i, item) in st.items.iter().enumerate() {
1062                if i > 0 {
1063                    out.push_str(&sep);
1064                }
1065                if let Value::Text(s) = item {
1066                    out.push_str(s);
1067                }
1068            }
1069            Value::Text(out)
1070        }
1071        // v7.17.0 — array_agg: collect into a typed array. NULL
1072        // elements are preserved per PG. Result type is decided
1073        // by the first non-NULL element seen (or Text fallback
1074        // when the whole group is NULL — PG would surface the
1075        // declared input type, but SPG hasn't yet wired the
1076        // aggregate's static input-type from `describe`).
1077        "array_agg" => {
1078            if st.items.is_empty() {
1079                return Value::Null;
1080            }
1081            let probe = st.items.iter().find(|v| !v.is_null());
1082            match probe.and_then(spg_storage::Value::data_type) {
1083                Some(DataType::Int) | Some(DataType::SmallInt) => {
1084                    let items: Vec<Option<i32>> = st
1085                        .items
1086                        .iter()
1087                        .map(|v| match v {
1088                            Value::Int(n) => Some(*n),
1089                            Value::SmallInt(n) => Some(i32::from(*n)),
1090                            _ => None,
1091                        })
1092                        .collect();
1093                    Value::IntArray(items)
1094                }
1095                Some(DataType::BigInt) => {
1096                    let items: Vec<Option<i64>> = st
1097                        .items
1098                        .iter()
1099                        .map(|v| match v {
1100                            Value::BigInt(n) => Some(*n),
1101                            _ => None,
1102                        })
1103                        .collect();
1104                    Value::BigIntArray(items)
1105                }
1106                _ => {
1107                    let items: Vec<Option<String>> = st
1108                        .items
1109                        .iter()
1110                        .map(|v| match v {
1111                            Value::Text(s) => Some(s.clone()),
1112                            Value::Null => None,
1113                            other => Some(format!("{other:?}")),
1114                        })
1115                        .collect();
1116                    Value::TextArray(items)
1117                }
1118            }
1119        }
1120        // v7.17.0 — bool_and / bool_or finalize: lazy-init pattern
1121        // means `None` is exactly "empty group or all-NULL", which
1122        // PG surfaces as SQL NULL.
1123        "bool_and" | "bool_or" => st.bool_acc.map_or(Value::Null, Value::Bool),
1124        _ => unreachable!(),
1125    }
1126}
1127
1128fn infer_agg_type(spec: &AggSpec, schema_cols: &[ColumnSchema]) -> DataType {
1129    // v7.26 (round-20 C) — the argument's statically-derived shape
1130    // types MIN/MAX/SUM/array_agg properly; RowDescription used to
1131    // report TEXT for these, breaking every sqlx typed decode.
1132    let arg_ty = spec
1133        .arg
1134        .as_ref()
1135        .and_then(|a| crate::describe::describe_expr(a, schema_cols))
1136        .map(|shape| shape.ty);
1137    match spec.name.as_str() {
1138        "count" | "count_star" => DataType::BigInt,
1139        "sum" => match arg_ty {
1140            Some(DataType::Float) => DataType::Float,
1141            _ => DataType::BigInt,
1142        },
1143        "avg" => DataType::Float,
1144        // v7.17.0 — string_agg always returns TEXT.
1145        "string_agg" => DataType::Text,
1146        "array_agg" => match arg_ty {
1147            Some(DataType::Int | DataType::SmallInt) => DataType::IntArray,
1148            Some(DataType::BigInt) => DataType::BigIntArray,
1149            _ => DataType::TextArray,
1150        },
1151        // v7.17.0 — boolean aggregates always return BOOL (nullable
1152        // — empty / all-NULL group → NULL).
1153        "bool_and" | "bool_or" => DataType::Bool,
1154        // min/max and anything pass-through: the argument's shape.
1155        _ => arg_ty.unwrap_or(DataType::Text),
1156    }
1157}
1158
1159fn agg_or_group_type(e: &Expr, synth: &[ColumnSchema]) -> DataType {
1160    if let Expr::Column(c) = e
1161        && let Some(s) = synth.iter().find(|s| s.name == c.name)
1162    {
1163        return s.ty;
1164    }
1165    // v7.26 (round-20 C) — compound expressions over aggregates
1166    // (COALESCE(BOOL_OR(…), false), (array_agg(…))[1], CASE …)
1167    // derive their shape statically against the synth schema; the
1168    // old Text fallback broke sqlx typed decodes of exactly these
1169    // columns.
1170    crate::describe::describe_expr(e, synth)
1171        .map(|shape| shape.ty)
1172        .unwrap_or(DataType::Text)
1173}
1174
1175fn rewrite_expr(e: &Expr, group_exprs: &[Expr], aggs: &[AggSpec]) -> Expr {
1176    // v7.24 (round-16 A) — ordered aggregate: match on the inner
1177    // call PLUS the ordering keys.
1178    if let Expr::AggregateOrdered {
1179        call,
1180        order_by,
1181        distinct,
1182    } = e
1183        && let Expr::FunctionCall { name, args } = call.as_ref()
1184    {
1185        let lower = name.to_ascii_lowercase();
1186        if is_aggregate_name(&lower) {
1187            let canonical: &str = if lower == "every" { "bool_and" } else { &lower };
1188            let arg = args.first().cloned();
1189            let arg2 = if lower == "string_agg" {
1190                args.get(1).cloned()
1191            } else {
1192                None
1193            };
1194            for (i, spec) in aggs.iter().enumerate() {
1195                if spec.name == canonical
1196                    && spec.arg == arg
1197                    && spec.arg2 == arg2
1198                    && spec.distinct == *distinct
1199                    && spec.order_by == *order_by
1200                {
1201                    return Expr::Column(spg_sql::ast::ColumnName {
1202                        qualifier: None,
1203                        name: format!("__agg_{i}"),
1204                    });
1205                }
1206            }
1207        }
1208    }
1209    // Match aggregate FunctionCalls first — they sit outside group_by.
1210    if let Expr::FunctionCall { name, args } = e {
1211        let lower = name.to_ascii_lowercase();
1212        if is_aggregate_name(&lower) {
1213            let arg = if lower == "count_star" {
1214                None
1215            } else {
1216                args.first().cloned()
1217            };
1218            // v7.17.0 — match the spec we registered for
1219            // string_agg(value, separator) on the full pair.
1220            let arg2 = if lower == "string_agg" {
1221                args.get(1).cloned()
1222            } else {
1223                None
1224            };
1225            // v7.17.0 — `every` collapses into `bool_and` at
1226            // collection; mirror that here so the rewrite finds
1227            // the matching synth column.
1228            let canonical: &str = if lower == "every" {
1229                "bool_and"
1230            } else {
1231                lower.as_str()
1232            };
1233            for (i, spec) in aggs.iter().enumerate() {
1234                if spec.name == canonical
1235                    && spec.arg == arg
1236                    && spec.arg2 == arg2
1237                    && !spec.distinct
1238                    && spec.order_by.is_empty()
1239                {
1240                    return Expr::Column(spg_sql::ast::ColumnName {
1241                        qualifier: None,
1242                        name: format!("__agg_{i}"),
1243                    });
1244                }
1245            }
1246        }
1247    }
1248    // Match a group_by expression by AST equality.
1249    for (i, g) in group_exprs.iter().enumerate() {
1250        if g == e {
1251            return Expr::Column(spg_sql::ast::ColumnName {
1252                qualifier: None,
1253                name: format!("__grp_{i}"),
1254            });
1255        }
1256    }
1257    // Recurse into children.
1258    match e {
1259        Expr::AggregateOrdered {
1260            call,
1261            order_by,
1262            distinct,
1263        } => Expr::AggregateOrdered {
1264            call: Box::new(rewrite_expr(call, group_exprs, aggs)),
1265            distinct: *distinct,
1266            order_by: order_by
1267                .iter()
1268                .map(|o| spg_sql::ast::OrderBy {
1269                    expr: rewrite_expr(&o.expr, group_exprs, aggs),
1270                    desc: o.desc,
1271                    nulls_first: o.nulls_first,
1272                })
1273                .collect(),
1274        },
1275        Expr::Binary { lhs, op, rhs } => Expr::Binary {
1276            lhs: Box::new(rewrite_expr(lhs, group_exprs, aggs)),
1277            op: *op,
1278            rhs: Box::new(rewrite_expr(rhs, group_exprs, aggs)),
1279        },
1280        Expr::Unary { op, expr } => Expr::Unary {
1281            op: *op,
1282            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1283        },
1284        Expr::Cast { expr, target } => Expr::Cast {
1285            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1286            target: *target,
1287        },
1288        Expr::IsNull { expr, negated } => Expr::IsNull {
1289            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1290            negated: *negated,
1291        },
1292        Expr::FunctionCall { name, args } => Expr::FunctionCall {
1293            name: name.clone(),
1294            args: args
1295                .iter()
1296                .map(|a| rewrite_expr(a, group_exprs, aggs))
1297                .collect(),
1298        },
1299        Expr::Like {
1300            expr,
1301            pattern,
1302            negated,
1303            case_insensitive,
1304        } => Expr::Like {
1305            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1306            pattern: Box::new(rewrite_expr(pattern, group_exprs, aggs)),
1307            negated: *negated,
1308            case_insensitive: *case_insensitive,
1309        },
1310        Expr::Extract { field, source } => Expr::Extract {
1311            field: *field,
1312            source: Box::new(rewrite_expr(source, group_exprs, aggs)),
1313        },
1314        // v7.25.2 (round-19 A) — subquery nodes: rewrite group-key
1315        // references INSIDE the body to `__grp_N` so the correlated
1316        // resolver can substitute them against the synthesised group
1317        // row (aggs are NOT matched inside the body — a COUNT in the
1318        // subquery is the subquery's own aggregate).
1319        Expr::ScalarSubquery(s) => {
1320            Expr::ScalarSubquery(Box::new(rewrite_group_keys_in_select(s, group_exprs)))
1321        }
1322        Expr::Exists { subquery, negated } => Expr::Exists {
1323            subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
1324            negated: *negated,
1325        },
1326        Expr::InSubquery {
1327            expr,
1328            subquery,
1329            negated,
1330        } => Expr::InSubquery {
1331            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1332            subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
1333            negated: *negated,
1334        },
1335        // v4.12 window / Literal / Column — clone-pass (these don't
1336        // participate in aggregate rewrite).
1337        Expr::WindowFunction { .. } | Expr::Literal(_) | Expr::Placeholder(_) | Expr::Column(_) => {
1338            e.clone()
1339        }
1340        // v7.10.10 — recurse children for array nodes.
1341        Expr::Array(items) => Expr::Array(
1342            items
1343                .iter()
1344                .map(|elem| rewrite_expr(elem, group_exprs, aggs))
1345                .collect(),
1346        ),
1347        Expr::ArraySubscript { target, index } => Expr::ArraySubscript {
1348            target: Box::new(rewrite_expr(target, group_exprs, aggs)),
1349            index: Box::new(rewrite_expr(index, group_exprs, aggs)),
1350        },
1351        Expr::AnyAll {
1352            expr,
1353            op,
1354            array,
1355            is_any,
1356        } => Expr::AnyAll {
1357            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1358            op: *op,
1359            array: Box::new(rewrite_expr(array, group_exprs, aggs)),
1360            is_any: *is_any,
1361        },
1362        Expr::InList {
1363            expr,
1364            list,
1365            negated,
1366        } => Expr::InList {
1367            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1368            list: list
1369                .iter()
1370                .map(|item| rewrite_expr(item, group_exprs, aggs))
1371                .collect(),
1372            negated: *negated,
1373        },
1374        Expr::Case {
1375            operand,
1376            branches,
1377            else_branch,
1378        } => Expr::Case {
1379            operand: operand
1380                .as_deref()
1381                .map(|o| Box::new(rewrite_expr(o, group_exprs, aggs))),
1382            branches: branches
1383                .iter()
1384                .map(|(w, t)| {
1385                    (
1386                        rewrite_expr(w, group_exprs, aggs),
1387                        rewrite_expr(t, group_exprs, aggs),
1388                    )
1389                })
1390                .collect(),
1391            else_branch: else_branch
1392                .as_deref()
1393                .map(|e| Box::new(rewrite_expr(e, group_exprs, aggs))),
1394        },
1395    }
1396}
1397
1398/// v7.25.2 (round-19 A) — rewrite group-key references inside a
1399/// subquery body to `__grp_N` synthetic columns (aggregates are
1400/// not touched: empty spec list). Runs through the canonical
1401/// Select walker so every expression slot is covered.
1402fn rewrite_group_keys_in_select(
1403    s: &spg_sql::ast::SelectStatement,
1404    group_exprs: &[Expr],
1405) -> spg_sql::ast::SelectStatement {
1406    let mut out = s.clone();
1407    let _ = crate::walk_select_exprs_mut(&mut out, &mut |e| {
1408        *e = rewrite_expr(e, group_exprs, &[]);
1409        Ok(())
1410    });
1411    out
1412}
1413
1414/// Canonical string key for a tuple of group values. Used as map key.
1415/// Per-value group-key encoding (shared by owned and borrowed paths).
1416fn encode_one(out: &mut String, v: &Value) {
1417    match v {
1418        Value::Null => out.push_str("N|"),
1419        Value::SmallInt(n) => {
1420            out.push('s');
1421            out.push_str(&n.to_string());
1422            out.push('|');
1423        }
1424        Value::Int(n) => {
1425            out.push('I');
1426            out.push_str(&n.to_string());
1427            out.push('|');
1428        }
1429        Value::BigInt(n) => {
1430            out.push('B');
1431            out.push_str(&n.to_string());
1432            out.push('|');
1433        }
1434        Value::Float(x) => {
1435            out.push('F');
1436            out.push_str(&x.to_string());
1437            out.push('|');
1438        }
1439        Value::Bool(b) => {
1440            out.push(if *b { 'T' } else { 'f' });
1441            out.push('|');
1442        }
1443        Value::Text(s) => {
1444            out.push('S');
1445            out.push_str(s);
1446            out.push('|');
1447        }
1448        Value::Vector(v) => {
1449            out.push('V');
1450            for x in v {
1451                out.push_str(&x.to_string());
1452                out.push(',');
1453            }
1454            out.push('|');
1455        }
1456        // v6.0.1: GROUP BY on a `VECTOR(N) USING SQ8` column.
1457        // Two cells with byte-identical `(min, max, bytes)`
1458        // share the same group; equivalence is byte-equality
1459        // (same as f32 grouping today — neither path tries to
1460        // normalise nan/-0).
1461        Value::Sq8Vector(q) => {
1462            out.push('Q');
1463            out.push_str(&q.min.to_string());
1464            out.push('@');
1465            out.push_str(&q.max.to_string());
1466            out.push(':');
1467            for b in &q.bytes {
1468                out.push_str(&b.to_string());
1469                out.push(',');
1470            }
1471            out.push('|');
1472        }
1473        // v6.0.3: GROUP BY on a `VECTOR(N) USING HALF` column.
1474        // Byte-equality over the raw u16 bits; matches the SQ8
1475        // path's byte-key model.
1476        Value::HalfVector(h) => {
1477            out.push('H');
1478            for b in &h.bytes {
1479                out.push_str(&b.to_string());
1480                out.push(',');
1481            }
1482            out.push('|');
1483        }
1484        Value::Numeric { scaled, scale } => {
1485            out.push('D');
1486            out.push_str(&scaled.to_string());
1487            out.push('@');
1488            out.push_str(&scale.to_string());
1489            out.push('|');
1490        }
1491        Value::Date(d) => {
1492            out.push('d');
1493            out.push_str(&d.to_string());
1494            out.push('|');
1495        }
1496        Value::Timestamp(t) => {
1497            out.push('t');
1498            out.push_str(&t.to_string());
1499            out.push('|');
1500        }
1501        Value::Interval { months, micros } => {
1502            out.push('i');
1503            out.push_str(&months.to_string());
1504            out.push('m');
1505            out.push_str(&micros.to_string());
1506            out.push('|');
1507        }
1508        Value::Json(s) => {
1509            out.push('j');
1510            out.push_str(s);
1511            out.push('|');
1512        }
1513        // v7.5.0 — Value is #[non_exhaustive] for downstream
1514        // forward-compat. Any future variant lacking explicit
1515        // handling here will share a debug-derived group key,
1516        // which is observably wrong but won't crash.
1517        _ => {
1518            out.push('?');
1519            out.push_str(&format!("{v:?}"));
1520            out.push('|');
1521        }
1522    }
1523}
1524
1525/// v7.30 (perf campaign) - encode from borrowed cells without
1526/// materialising an owned Vec<Value> first.
1527pub(crate) fn encode_key_refs(vals: &[&Value]) -> String {
1528    let mut out = String::new();
1529    for v in vals {
1530        encode_one(&mut out, v);
1531    }
1532    out
1533}
1534
1535/// v7.31 (perf 3e) — encode into a caller-owned scratch buffer.
1536/// The per-row key paths (group hash, DISTINCT set, join build/
1537/// probe) ran 24k+ String allocations per query through the
1538/// allocator just to LOOK UP a map; the scratch form allocates
1539/// only when a map actually has to take ownership (vacant insert).
1540pub(crate) fn encode_key_refs_into(vals: &[&Value], out: &mut String) {
1541    out.clear();
1542    for v in vals {
1543        encode_one(out, v);
1544    }
1545}
1546
1547pub(crate) fn encode_key(vals: &[Value]) -> String {
1548    let mut out = String::new();
1549    for v in vals {
1550        encode_one(&mut out, v);
1551    }
1552    out
1553}
1554
1555#[allow(clippy::cast_precision_loss)]
1556fn value_cmp(a: &Value, b: &Value) -> core::cmp::Ordering {
1557    use core::cmp::Ordering::Equal;
1558    match (a, b) {
1559        (Value::Null, Value::Null) => Equal,
1560        (Value::Null, _) => core::cmp::Ordering::Greater, // NULLs last
1561        (_, Value::Null) => core::cmp::Ordering::Less,
1562        (Value::Int(x), Value::Int(y)) => x.cmp(y),
1563        (Value::BigInt(x), Value::BigInt(y)) => x.cmp(y),
1564        (Value::Int(x), Value::BigInt(y)) => i64::from(*x).cmp(y),
1565        (Value::BigInt(x), Value::Int(y)) => x.cmp(&i64::from(*y)),
1566        (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(Equal),
1567        (Value::Int(x), Value::Float(y)) => f64::from(*x).partial_cmp(y).unwrap_or(Equal),
1568        (Value::Float(x), Value::Int(y)) => x.partial_cmp(&f64::from(*y)).unwrap_or(Equal),
1569        (Value::BigInt(x), Value::Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Equal),
1570        (Value::Float(x), Value::BigInt(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Equal),
1571        (Value::Text(x), Value::Text(y)) => x.cmp(y),
1572        (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
1573        _ => Equal,
1574    }
1575}