Skip to main content

spg_engine/
aggregate.rs

1//! Aggregate executor.
2//!
3//! Handles `SELECT … <aggs> … [GROUP BY …]` queries. The planning strategy
4//! is straightforward:
5//!
//! 1. Walk the SELECT, ORDER BY and HAVING expressions to find every
//!    aggregate function call. Dedupe by AST equality and assign each
//!    distinct call a synthetic column `__agg_<i>`.
8//! 2. Same for every `GROUP BY` expression: assign `__grp_<j>`.
9//! 3. Stream the WHERE-filtered rows, group by the tuple of GROUP BY
10//!    values, and update per-group aggregate state.
11//! 4. Materialise a synthetic per-group row containing
12//!    `[__grp_0..__grp_K, __agg_0..__agg_N]` and rewrite the user's
13//!    SELECT / ORDER BY expressions to reference those synthetic columns
14//!    instead of the originals.
15//! 5. Evaluate the rewritten expressions against the synthetic schema and
16//!    emit results.
17//!
//! v1.8 implements `count(*)`, `count(expr)`, `sum`, `min`, `max`, `avg`;
//! v7.17.0 adds `string_agg`, `array_agg`, `bool_and`, `bool_or` and `every`.
//! NULL semantics follow PG: aggregates skip NULL inputs (except
//! `count(*)`, which counts rows, and `array_agg`, which keeps NULL
//! elements). `sum(int)` widens to `BigInt`;
//! `avg(int|bigint)` returns `Float`.
22
23use alloc::boxed::Box;
24use alloc::collections::BTreeSet;
25use alloc::format;
26use alloc::string::{String, ToString};
27use alloc::vec::Vec;
28
29use spg_sql::ast::{Expr, SelectItem, SelectStatement};
30use spg_storage::{ColumnSchema, DataType, Row, Value};
31
32use crate::eval::{self, EvalContext, EvalError};
33
34/// True if this statement should go through the aggregate path.
35pub fn uses_aggregate(stmt: &SelectStatement) -> bool {
36    if stmt.group_by.is_some() || stmt.having.is_some() {
37        return true;
38    }
39    for item in &stmt.items {
40        if let SelectItem::Expr { expr, .. } = item
41            && contains_aggregate(expr)
42        {
43            return true;
44        }
45    }
46    for o in &stmt.order_by {
47        if contains_aggregate(&o.expr) {
48            return true;
49        }
50    }
51    if let Some(h) = &stmt.having
52        && contains_aggregate(h)
53    {
54        return true;
55    }
56    false
57}
58
59pub fn contains_aggregate(e: &Expr) -> bool {
60    match e {
61        Expr::FunctionCall { name, args } => {
62            is_aggregate_name(name) || args.iter().any(contains_aggregate)
63        }
64        Expr::AggregateOrdered { .. } => true,
65        Expr::Binary { lhs, rhs, .. } => contains_aggregate(lhs) || contains_aggregate(rhs),
66        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
67            contains_aggregate(expr)
68        }
69        Expr::Like { expr, pattern, .. } => contains_aggregate(expr) || contains_aggregate(pattern),
70        Expr::Extract { source, .. } => contains_aggregate(source),
71        // v4.10 subqueries + v4.12 window functions / Literal /
72        // Column — all non-aggregate leaves from the regular
73        // aggregate planner's POV. Window-bearing projections are
74        // routed to exec_select_with_window before this runs.
75        Expr::ScalarSubquery(_)
76        | Expr::Exists { .. }
77        | Expr::InSubquery { .. }
78        | Expr::WindowFunction { .. }
79        | Expr::Literal(_)
80        | Expr::Placeholder(_)
81        | Expr::Column(_) => false,
82        // v7.10.10 — recurse into array constructor / subscript /
83        // ANY/ALL children. Aggregates inside `ARRAY[SUM(x)]` are
84        // valid PG and must be detected here.
85        Expr::Array(items) => items.iter().any(contains_aggregate),
86        Expr::ArraySubscript { target, index } => {
87            contains_aggregate(target) || contains_aggregate(index)
88        }
89        Expr::AnyAll { expr, array, .. } => contains_aggregate(expr) || contains_aggregate(array),
90        Expr::InList { expr, list, .. } => {
91            contains_aggregate(expr) || list.iter().any(contains_aggregate)
92        }
93        // v7.13.0 — CASE WHEN … END. Recurse into operand,
94        // every (WHEN, THEN) pair, and the ELSE branch.
95        Expr::Case {
96            operand,
97            branches,
98            else_branch,
99        } => {
100            operand.as_deref().is_some_and(contains_aggregate)
101                || branches
102                    .iter()
103                    .any(|(w, t)| contains_aggregate(w) || contains_aggregate(t))
104                || else_branch.as_deref().is_some_and(contains_aggregate)
105        }
106    }
107}
108
/// Returns `true` when `name` is one of the aggregate function names
/// this executor recognises. The comparison ignores ASCII case.
pub fn is_aggregate_name(name: &str) -> bool {
    const AGGREGATE_NAMES: &[&str] = &[
        "count",
        "count_star",
        "sum",
        "min",
        "max",
        "avg",
        // v7.17.0 — variadic / collection aggregates. ORM reports
        // (Hibernate / Rails / Django) emit these in GROUP BY
        // rollups; pre-7.17 SPG hit "unknown aggregate".
        "string_agg",
        "array_agg",
        // v7.17.0 — boolean aggregates. `every` is the SQL-standard
        // alias for `bool_and`.
        "bool_and",
        "bool_or",
        "every",
    ];
    AGGREGATE_NAMES
        .iter()
        .any(|known| name.eq_ignore_ascii_case(known))
}
131
/// Per-aggregate running state.
///
/// `run` keeps one default-initialised `AggState` per aggregate spec
/// inside every group, feeds it through `update_state` for each input
/// row, and hands it to `finalize` to produce the group's output value.
#[derive(Debug, Default, Clone)]
struct AggState {
    // NOTE(review): `count`, `sum_int`, `sum_float`, `extreme` and
    // `use_float` are presumably maintained by `update_state` and read
    // by `finalize`; neither function is visible from this definition,
    // so confirm their exact roles there.
    count: i64,
    sum_int: i64,
    sum_float: f64,
    extreme: Option<Value>,
    use_float: bool,
    /// v7.17.0 — running collection for string_agg / array_agg.
    /// Each entry is one row's contribution (NULL preserved as
    /// `Value::Null`; string_agg's finalize step drops them, but
    /// array_agg keeps them). Pushing in insertion order matches
    /// PG behaviour when no `ORDER BY` is given inside the
    /// aggregate call.
    items: Vec<Value>,
    /// v7.25 (round-17) — per-group dedupe set for DISTINCT
    /// aggregates, holding the encoded form of every input already
    /// forwarded. `run` consults it *before* calling `update_state`,
    /// so a NULL input is encoded and recorded like any other value:
    /// the first NULL flows through to the aggregate's own NULL rule
    /// and later NULLs are dropped as duplicates.
    seen: BTreeSet<String>,
    /// v7.24 (round-16 A) — per-item ORDER BY key tuples, parallel
    /// to `items` (pushed under the same skip/keep conditions).
    /// Empty when the aggregate carries no internal ordering.
    /// `run` only sorts `items` when both vectors have equal length.
    item_keys: Vec<Vec<Value>>,
    /// v7.17.0 — captured separator for string_agg. PG accepts a
    /// non-constant separator expression but in practice every
    /// caller passes a literal; the engine snapshots the last
    /// non-NULL text it sees, which matches PG's "use the latest
    /// row's value" behaviour.
    separator: Option<String>,
    /// v7.17.0 — running boolean accumulator for bool_and /
    /// bool_or / every. `None` until the first non-NULL input;
    /// at finalize None → SQL NULL.
    bool_acc: Option<bool>,
}
166
/// Description of one distinct aggregate call found in the statement.
/// Its index in the collected spec list is the `<i>` of the synthetic
/// `__agg_<i>` column that carries its result.
#[derive(Debug, Clone)]
struct AggSpec {
    /// Lowercased aggregate name. `every` is stored as `bool_and`
    /// so downstream code needs a single arm for both spellings.
    name: String,
    /// First argument (value expression) for every aggregate
    /// except `count(*)`. `None` for `count_star`.
    arg: Option<Expr>,
    /// v7.17.0 — second argument. Only `string_agg(value, sep)`
    /// uses it today. `None` for every other aggregate (or for
    /// `array_agg`, which is single-arg). Carried in the spec so
    /// per-row evaluation can re-use the same separator
    /// expression across calls.
    arg2: Option<Expr>,
    /// v7.25 (round-17) — `COUNT(DISTINCT x)` & friends: dedupe
    /// the input stream per group before accumulation.
    distinct: bool,
    /// v7.24 (round-16 A) — aggregate-internal ORDER BY keys
    /// (`array_agg(x ORDER BY y DESC NULLS LAST)`). Empty for the
    /// plain form. Only the collection aggregates honour it;
    /// other aggregates are order-insensitive and ignore it (PG
    /// accepts the syntax everywhere too).
    order_by: Vec<spg_sql::ast::OrderBy>,
}
189
/// Output of running the aggregate path. `columns` describes the
/// projected SELECT items and `rows` holds one row per group that
/// survived HAVING. When the statement has an ORDER BY, `run` has
/// already sorted `rows` by it; LIMIT is left to the caller.
#[derive(Debug)]
pub struct AggResult {
    /// Output schema, one entry per SELECT item.
    pub columns: Vec<ColumnSchema>,
    /// Projected result rows, one per surviving group.
    pub rows: Vec<Row>,
}
197
198/// Execute aggregate logic against an already-WHERE-filtered iterator of
199/// rows. `table_alias` is the alias accepted by column resolution.
200#[allow(clippy::too_many_lines)]
201/// v7.25.2 (round-19 A) — caller-injected evaluator for synth-row
202/// expressions that still carry subquery nodes after the rewrite
203/// (correlated subqueries in the select list / HAVING / aggregate
204/// ORDER BY of a GROUP BY query). The engine passes its
205/// correlated-aware evaluator; pure-library callers pass None and
206/// surviving subqueries keep erroring loudly.
207pub type CorrelatedEval<'a> = &'a dyn Fn(&Expr, &Row, &EvalContext<'_>) -> Result<Value, EvalError>;
208
209pub fn run(
210    stmt: &SelectStatement,
211    rows: &[&Row],
212    schema_cols: &[ColumnSchema],
213    table_alias: Option<&str>,
214    correlated_eval: Option<CorrelatedEval<'_>>,
215) -> Result<AggResult, EvalError> {
216    let ctx = EvalContext::new(schema_cols, table_alias);
217    let group_exprs: Vec<Expr> = stmt.group_by.clone().unwrap_or_default();
218
219    // Collect aggregate sub-expressions across items + order_by.
220    let mut agg_specs: Vec<AggSpec> = Vec::new();
221    for item in &stmt.items {
222        if let SelectItem::Expr { expr, .. } = item {
223            collect_aggregates(expr, &mut agg_specs);
224        }
225    }
226    for o in &stmt.order_by {
227        collect_aggregates(&o.expr, &mut agg_specs);
228    }
229    if let Some(h) = &stmt.having {
230        collect_aggregates(h, &mut agg_specs);
231    }
232    // v7.17.0 — arity validation. The collector tolerates an
233    // arbitrary positional-arg count; here we enforce the
234    // per-aggregate contract so a malformed call (e.g.
235    // `array_agg()` or `string_agg(x)`) surfaces as a SQL error
236    // rather than silently coercing to a degenerate aggregate.
237    validate_agg_arities(stmt, &agg_specs)?;
238
239    // Map group key (vec of values, encoded as canonical string) -> group state.
240    // Order of insertion is preserved via a parallel Vec of keys.
241    // v7.29 - hash map (output order rides key_order, not map order).
242    let mut groups: hashbrown::HashMap<String, (Vec<Value>, Vec<AggState>)> =
243        hashbrown::HashMap::new();
244    let mut key_order: Vec<String> = Vec::new();
245    // When there are no GROUP BY exprs *and* there is at least one aggregate,
246    // every row collapses into a single anonymous group keyed by "".
247    if rows.is_empty() && group_exprs.is_empty() {
248        // Single empty-aggregate group: count=0, sum=0, max=NULL, etc.
249        let init: Vec<AggState> = (0..agg_specs.len()).map(|_| AggState::default()).collect();
250        groups.insert(String::new(), (Vec::new(), init));
251        key_order.push(String::new());
252    }
253
254    // v7.30 (perf campaign) - hoist the per-row work that doesn't
255    // depend on the row: which group exprs need collation folding
256    // (none, for most queries - the old code cloned the whole
257    // group_vals vec per row just in case).
258    // v7.30 (perf campaign) - the no-tax row loop. When a group
259    // expr or an aggregate argument is a bare column reference
260    // (the overwhelmingly common shape), bind its position ONCE
261    // and read row cells by offset in the loop - no per-row tree
262    // walk, no owned-Value clone out of resolve_column. Anything
263    // more complex keeps the eval path.
264    let col_pos = |e: &Expr| -> Option<usize> {
265        // Qualified references only: the bare-name resolver carries
266        // alias/ambiguity logic the bind-once path must not fork.
267        if let Expr::Column(c) = e
268            && c.qualifier.is_some()
269        {
270            eval::find_column_pos(c, &ctx)
271        } else {
272            None
273        }
274    };
275    let group_pos: Vec<Option<usize>> = group_exprs.iter().map(col_pos).collect();
276    let all_groups_bound = group_pos.iter().all(Option::is_some);
277    let arg_pos: Vec<Option<usize>> = agg_specs
278        .iter()
279        .map(|spec| spec.arg.as_ref().and_then(|e| col_pos(e)))
280        .collect();
281    let ci_positions: Vec<usize> = group_exprs
282        .iter()
283        .enumerate()
284        .filter(|(_, g)| {
285            matches!(
286                eval::column_collation(g, &ctx),
287                Some(spg_storage::Collation::CaseInsensitive)
288            )
289        })
290        .map(|(i, _)| i)
291        .collect();
292    for row in rows {
293        // Fast key: bound positions + no ci folding -> encode
294        // straight from borrowed cells; group_vals materialise
295        // only when the group is NEW.
296        if all_groups_bound && ci_positions.is_empty() && !group_exprs.is_empty() {
297            let refs: Vec<&Value> = group_pos
298                .iter()
299                .map(|p| row.values.get(p.unwrap()).unwrap_or(&Value::Null))
300                .collect();
301            let key = encode_key_refs(&refs);
302            let entry = match groups.entry_ref(key.as_str()) {
303                hashbrown::hash_map::EntryRef::Occupied(o) => o.into_mut(),
304                hashbrown::hash_map::EntryRef::Vacant(v) => {
305                    key_order.push(key.clone());
306                    let init: Vec<AggState> =
307                        (0..agg_specs.len()).map(|_| AggState::default()).collect();
308                    let owned: Vec<Value> = refs.iter().map(|v| (*v).clone()).collect();
309                    v.insert((owned, init))
310                }
311            };
312            for (i, spec) in agg_specs.iter().enumerate() {
313                let arg_owned: Value;
314                let arg_ref: &Value = match (&arg_pos[i], &spec.arg) {
315                    (Some(p), _) => row.values.get(*p).unwrap_or(&Value::Null),
316                    (None, None) => {
317                        arg_owned = Value::Bool(true);
318                        &arg_owned
319                    }
320                    (None, Some(e)) => {
321                        arg_owned = eval::eval_expr(e, row, &ctx)?;
322                        &arg_owned
323                    }
324                };
325                let arg2_val = match &spec.arg2 {
326                    None => None,
327                    Some(e) => Some(eval::eval_expr(e, row, &ctx)?),
328                };
329                let order_keys = if spec.order_by.is_empty() {
330                    None
331                } else {
332                    let mut keys = Vec::with_capacity(spec.order_by.len());
333                    for o in &spec.order_by {
334                        keys.push(eval::eval_expr(&o.expr, row, &ctx)?);
335                    }
336                    Some(keys)
337                };
338                if spec.distinct {
339                    let dkey = encode_key_refs(core::slice::from_ref(&arg_ref));
340                    if !entry.1[i].seen.insert(dkey) {
341                        continue;
342                    }
343                }
344                update_state(
345                    &mut entry.1[i],
346                    &spec.name,
347                    arg_ref,
348                    arg2_val.as_ref(),
349                    order_keys,
350                )?;
351            }
352            continue;
353        }
354        let group_vals: Vec<Value> = group_exprs
355            .iter()
356            .map(|g| eval::eval_expr(g, row, &ctx))
357            .collect::<Result<_, _>>()?;
358        // v7.17.0 Phase 2.5b — case-insensitive group keying: fold
359        // only the ci columns, and only when any exist. Display
360        // value (`group_vals`) stays original — only the key folds.
361        let key = if ci_positions.is_empty() {
362            encode_key(&group_vals)
363        } else {
364            let mut key_vals = group_vals.clone();
365            for &i in &ci_positions {
366                if let Value::Text(s) = &key_vals[i] {
367                    key_vals[i] = Value::Text(s.to_ascii_lowercase());
368                }
369            }
370            encode_key(&key_vals)
371        };
372        // entry_ref: no per-row key clone on the (dominant) hit path.
373        let entry = match groups.entry_ref(key.as_str()) {
374            hashbrown::hash_map::EntryRef::Occupied(o) => o.into_mut(),
375            hashbrown::hash_map::EntryRef::Vacant(v) => {
376                key_order.push(key.clone());
377                let init: Vec<AggState> =
378                    (0..agg_specs.len()).map(|_| AggState::default()).collect();
379                v.insert((group_vals.clone(), init))
380            }
381        };
382        for (i, spec) in agg_specs.iter().enumerate() {
383            let arg_val = match &spec.arg {
384                None => Value::Bool(true), // count_star: sentinel non-null
385                Some(e) => eval::eval_expr(e, row, &ctx)?,
386            };
387            // v7.17.0 — `string_agg(value, separator)` evaluates the
388            // separator per row but PG treats it as constant; we
389            // pass the per-row value into update_state so a future
390            // varying-separator caller still sees correct output,
391            // even though SPG (like PG) only uses the most recent.
392            let arg2_val = match &spec.arg2 {
393                None => None,
394                Some(e) => Some(eval::eval_expr(e, row, &ctx)?),
395            };
396            // v7.24 (round-16 A) — aggregate-internal ORDER BY:
397            // evaluate the key tuple against the source row.
398            let order_keys = if spec.order_by.is_empty() {
399                None
400            } else {
401                let mut keys = Vec::with_capacity(spec.order_by.len());
402                for o in &spec.order_by {
403                    keys.push(eval::eval_expr(&o.expr, row, &ctx)?);
404                }
405                Some(keys)
406            };
407            // v7.25 (round-17) — DISTINCT: drop repeated inputs
408            // before they reach the accumulator. NULLs flow through
409            // (each aggregate's own NULL rule applies; PG also
410            // treats NULL as a single distinct value for array_agg).
411            if spec.distinct {
412                let key = encode_key(core::slice::from_ref(&arg_val));
413                if !entry.1[i].seen.insert(key) {
414                    continue;
415                }
416            }
417            update_state(
418                &mut entry.1[i],
419                &spec.name,
420                &arg_val,
421                arg2_val.as_ref(),
422                order_keys,
423            )?;
424        }
425    }
426
427    // Build synthetic schema: __grp_0..K then __agg_0..N.
428    let group_types: Vec<DataType> = if rows.is_empty() {
429        // Use Text as a safe stand-in — empty result means schema isn't
430        // observable. Avoids needing to evaluate group exprs on no row.
431        group_exprs.iter().map(|_| DataType::Text).collect()
432    } else {
433        let probe = rows[0];
434        group_exprs
435            .iter()
436            .map(|g| {
437                eval::eval_expr(g, probe, &ctx).map(|v| v.data_type().unwrap_or(DataType::Text))
438            })
439            .collect::<Result<_, _>>()?
440    };
441    let agg_types: Vec<DataType> = agg_specs
442        .iter()
443        .map(|spec| infer_agg_type(spec, schema_cols))
444        .collect();
445    let mut synth_schema: Vec<ColumnSchema> = Vec::new();
446    for (i, ty) in group_types.iter().enumerate() {
447        synth_schema.push(ColumnSchema::new(format!("__grp_{i}"), *ty, true));
448    }
449    for (i, ty) in agg_types.iter().enumerate() {
450        synth_schema.push(ColumnSchema::new(format!("__agg_{i}"), *ty, true));
451    }
452
453    // Materialise synthetic rows.
454    let mut synth_rows: Vec<Row> = Vec::new();
455    for k in &key_order {
456        let (gvals, states) = &groups[k];
457        let mut values: Vec<Value> = Vec::with_capacity(synth_schema.len());
458        values.extend(gvals.iter().cloned());
459        for (i, st) in states.iter().enumerate() {
460            // v7.24 (round-16 A) — order the collected items per the
461            // aggregate-internal ORDER BY before finalize consumes
462            // them.
463            let st_sorted;
464            let st_final: &AggState =
465                if !agg_specs[i].order_by.is_empty() && st.item_keys.len() == st.items.len() {
466                    let mut idx: Vec<usize> = (0..st.items.len()).collect();
467                    let ob = &agg_specs[i].order_by;
468                    idx.sort_by(|&x, &y| {
469                        for (k, o) in ob.iter().enumerate() {
470                            let cmp = crate::order_by_value_cmp(
471                                o.desc,
472                                o.nulls_first,
473                                &st.item_keys[x][k],
474                                &st.item_keys[y][k],
475                            );
476                            if cmp != core::cmp::Ordering::Equal {
477                                return cmp;
478                            }
479                        }
480                        core::cmp::Ordering::Equal
481                    });
482                    let mut sorted = st.clone();
483                    sorted.items = idx.iter().map(|&j| st.items[j].clone()).collect();
484                    st_sorted = sorted;
485                    &st_sorted
486                } else {
487                    st
488                };
489            values.push(finalize(&agg_specs[i].name, st_final));
490        }
491        synth_rows.push(Row::new(values));
492    }
493
494    // Rewrite the user's SELECT items + ORDER BY to reference synthetic
495    // columns. After rewriting, every remaining `Expr::Column` must
496    // resolve against the synthetic schema (i.e. must have been a GROUP
497    // BY expression).
498    let columns: Vec<ColumnSchema> = stmt
499        .items
500        .iter()
501        .map(|item| match item {
502            SelectItem::Wildcard => Err(EvalError::TypeMismatch {
503                detail: "SELECT * with aggregates is not supported".into(),
504            }),
505            SelectItem::Expr { expr, alias } => {
506                let rewritten = rewrite_expr(expr, &group_exprs, &agg_specs);
507                let name = alias.clone().unwrap_or_else(|| expr.to_string());
508                Ok(ColumnSchema::new(
509                    name,
510                    agg_or_group_type(&rewritten, &synth_schema),
511                    true,
512                ))
513            }
514        })
515        .collect::<Result<_, _>>()?;
516
517    // Project per synthetic row. HAVING filters out groups *before*
518    // we keep the projected row — same semantics as PG: HAVING runs
519    // against the aggregated row (so `HAVING count(*) > 1` works) and
520    // sees only group-by'd columns plus aggregate values.
521    let synth_ctx = EvalContext::new(&synth_schema, None);
522    let having_rewritten = stmt
523        .having
524        .as_ref()
525        .map(|h| rewrite_expr(h, &group_exprs, &agg_specs));
526    // v7.30 (phase 3e-1) - rewrite SELECT items ONCE. This ran per
527    // GROUP (23.5k x 9 items of AST cloning = ~48% of the inbox
528    // query in sampled stacks); the rewrite is group-independent.
529    // Stable addresses also let the per-expression subquery plans
530    // (v7.29 3c) hit across groups instead of rebuilding.
531    let items_rewritten: alloc::vec::Vec<Option<Expr>> = stmt
532        .items
533        .iter()
534        .map(|item| match item {
535            SelectItem::Expr { expr, .. } => Some(rewrite_expr(expr, &group_exprs, &agg_specs)),
536            SelectItem::Wildcard => None,
537        })
538        .collect();
539    let mut kept_synth: Vec<Row> = Vec::new();
540    let mut out_rows: Vec<Row> = Vec::new();
541    for srow in synth_rows {
542        if let Some(h) = &having_rewritten {
543            let cond = match correlated_eval {
544                Some(f) if crate::expr_has_subquery(h) => f(h, &srow, &synth_ctx)?,
545                _ => eval::eval_expr(h, &srow, &synth_ctx)?,
546            };
547            if !matches!(cond, Value::Bool(true)) {
548                continue;
549            }
550        }
551        let mut values: Vec<Value> = Vec::with_capacity(columns.len());
552        for rewritten in items_rewritten.iter().flatten() {
553            values.push(match correlated_eval {
554                Some(f) if crate::expr_has_subquery(rewritten) => f(rewritten, &srow, &synth_ctx)?,
555                _ => eval::eval_expr(rewritten, &srow, &synth_ctx)?,
556            });
557        }
558        kept_synth.push(srow);
559        out_rows.push(Row::new(values));
560    }
561
562    // ORDER BY: evaluate the rewritten order_by against each synth row,
563    // sort, then drop the keys. Limit is applied by the caller.
564    if !stmt.order_by.is_empty() {
565        // v6.4.0 — multi-key ORDER BY on aggregate output. Each key
566        // gets its own rewrite + per-key DESC flag.
567        let rewritten: Vec<Expr> = stmt
568            .order_by
569            .iter()
570            .map(|o| rewrite_expr(&o.expr, &group_exprs, &agg_specs))
571            .collect();
572        let keys_meta: Vec<(bool, Option<bool>)> = stmt
573            .order_by
574            .iter()
575            .map(|o| (o.desc, o.nulls_first))
576            .collect();
577        let mut tagged: Vec<(Vec<Value>, Row)> = kept_synth
578            .into_iter()
579            .zip(out_rows)
580            .map(|(s, o)| {
581                let mut keys = Vec::with_capacity(rewritten.len());
582                for e in &rewritten {
583                    keys.push(match correlated_eval {
584                        Some(f) if crate::expr_has_subquery(e) => f(e, &s, &synth_ctx)?,
585                        _ => eval::eval_expr(e, &s, &synth_ctx)?,
586                    });
587                }
588                Ok::<_, EvalError>((keys, o))
589            })
590            .collect::<Result<_, _>>()?;
591        tagged.sort_by(|a, b| {
592            use core::cmp::Ordering;
593            for (i, (ka, kb)) in a.0.iter().zip(b.0.iter()).enumerate() {
594                let (desc, nf) = keys_meta[i];
595                let cmp = crate::order_by_value_cmp(desc, nf, ka, kb);
596                if cmp != Ordering::Equal {
597                    return cmp;
598                }
599            }
600            Ordering::Equal
601        });
602        out_rows = tagged.into_iter().map(|(_, o)| o).collect();
603    }
604
605    Ok(AggResult {
606        columns,
607        rows: out_rows,
608    })
609}
610
611/// v7.17.0 — walk the statement again to validate the positional
612/// arity of every aggregate call site. Done after AST collection
613/// rather than inside `collect_aggregates` so the collector stays
614/// infallible; callers in `run()` can do a single early-error
615/// exit before any per-row work.
616fn validate_agg_arities(stmt: &SelectStatement, _specs: &[AggSpec]) -> Result<(), EvalError> {
617    fn walk(e: &Expr) -> Result<(), EvalError> {
618        if let Expr::FunctionCall { name, args } = e {
619            let lower = name.to_ascii_lowercase();
620            let expected: Option<usize> = match lower.as_str() {
621                "count_star" => Some(0),
622                "count" | "sum" | "avg" | "min" | "max" | "array_agg"
623                // v7.17.0 — boolean aggregates also take exactly
624                // one arg. `every` is an alias normalised inside
625                // collect_aggregates / rewrite_expr.
626                | "bool_and" | "bool_or" | "every" => Some(1),
627                "string_agg" => Some(2),
628                _ => None,
629            };
630            if let Some(want) = expected
631                && args.len() != want
632            {
633                return Err(EvalError::TypeMismatch {
634                    detail: alloc::format!("{lower}() takes {want} arg(s), got {}", args.len()),
635                });
636            }
637            for a in args {
638                walk(a)?;
639            }
640        } else if let Expr::Binary { lhs, rhs, .. } = e {
641            walk(lhs)?;
642            walk(rhs)?;
643        } else if let Expr::Unary { expr, .. }
644        | Expr::Cast { expr, .. }
645        | Expr::IsNull { expr, .. } = e
646        {
647            walk(expr)?;
648        }
649        Ok(())
650    }
651    for item in &stmt.items {
652        if let SelectItem::Expr { expr, .. } = item {
653            walk(expr)?;
654        }
655    }
656    for o in &stmt.order_by {
657        walk(&o.expr)?;
658    }
659    if let Some(h) = &stmt.having {
660        walk(h)?;
661    }
662    Ok(())
663}
664
665fn collect_aggregates(e: &Expr, out: &mut Vec<AggSpec>) {
666    match e {
667        // v7.24 (round-16 A) — ordered aggregate: register the inner
668        // call's spec with the ordering attached.
669        Expr::AggregateOrdered {
670            call,
671            order_by,
672            distinct,
673        } => {
674            if let Expr::FunctionCall { name, args } = call.as_ref() {
675                let lower = name.to_ascii_lowercase();
676                if is_aggregate_name(&lower) {
677                    let canonical = if lower == "every" {
678                        "bool_and".to_string()
679                    } else {
680                        lower
681                    };
682                    let spec = AggSpec {
683                        name: canonical,
684                        arg: args.first().cloned(),
685                        arg2: if name.eq_ignore_ascii_case("string_agg") {
686                            args.get(1).cloned()
687                        } else {
688                            None
689                        },
690                        distinct: *distinct,
691                        order_by: order_by.clone(),
692                    };
693                    if !out.iter().any(|s| {
694                        s.name == spec.name
695                            && s.arg == spec.arg
696                            && s.arg2 == spec.arg2
697                            && s.distinct == spec.distinct
698                            && s.order_by == spec.order_by
699                    }) {
700                        out.push(spec);
701                    }
702                    return;
703                }
704            }
705            collect_aggregates(call, out);
706            for o in order_by {
707                collect_aggregates(&o.expr, out);
708            }
709        }
710        Expr::FunctionCall { name, args } => {
711            let lower = name.to_ascii_lowercase();
712            if is_aggregate_name(&lower) {
713                let arg = if lower == "count_star" {
714                    None
715                } else {
716                    args.first().cloned()
717                };
718                // v7.17.0 — second positional arg for
719                // `string_agg(value, separator)`. Everything else
720                // ignores it.
721                let arg2 = if lower == "string_agg" {
722                    args.get(1).cloned()
723                } else {
724                    None
725                };
726                // v7.17.0 — `every` is the SQL-standard alias for
727                // `bool_and`; collapse at collection time so
728                // update_state / finalize need only one arm.
729                let canonical = if lower == "every" {
730                    "bool_and".to_string()
731                } else {
732                    lower
733                };
734                let spec = AggSpec {
735                    name: canonical,
736                    arg: arg.clone(),
737                    arg2: arg2.clone(),
738                    distinct: false,
739                    order_by: Vec::new(),
740                };
741                if !out.iter().any(|s| {
742                    s.name == spec.name
743                        && s.arg == spec.arg
744                        && s.arg2 == spec.arg2
745                        && !s.distinct
746                        && s.order_by == spec.order_by
747                }) {
748                    out.push(spec);
749                }
750                // Don't recurse into the arg — nested aggregates are
751                // illegal in standard SQL.
752            } else {
753                for a in args {
754                    collect_aggregates(a, out);
755                }
756            }
757        }
758        Expr::Binary { lhs, rhs, .. } => {
759            collect_aggregates(lhs, out);
760            collect_aggregates(rhs, out);
761        }
762        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
763            collect_aggregates(expr, out);
764        }
765        Expr::Like { expr, pattern, .. } => {
766            collect_aggregates(expr, out);
767            collect_aggregates(pattern, out);
768        }
769        Expr::InList { expr, list, .. } => {
770            collect_aggregates(expr, out);
771            for item in list {
772                collect_aggregates(item, out);
773            }
774        }
775        Expr::Extract { source, .. } => collect_aggregates(source, out),
776        // v4.10 subquery + v4.12 window / Literal / Column —
777        // non-recursing leaves for the aggregate collector.
778        Expr::ScalarSubquery(_)
779        | Expr::Exists { .. }
780        | Expr::InSubquery { .. }
781        | Expr::WindowFunction { .. }
782        | Expr::Literal(_)
783        | Expr::Placeholder(_)
784        | Expr::Column(_) => {}
785        // v7.10.10 — recurse into array constructor children +
786        // subscript / ANY/ALL operands.
787        Expr::Array(items) => {
788            for elem in items {
789                collect_aggregates(elem, out);
790            }
791        }
792        Expr::ArraySubscript { target, index } => {
793            collect_aggregates(target, out);
794            collect_aggregates(index, out);
795        }
796        Expr::AnyAll { expr, array, .. } => {
797            collect_aggregates(expr, out);
798            collect_aggregates(array, out);
799        }
800        Expr::Case {
801            operand,
802            branches,
803            else_branch,
804        } => {
805            if let Some(o) = operand {
806                collect_aggregates(o, out);
807            }
808            for (w, t) in branches {
809                collect_aggregates(w, out);
810                collect_aggregates(t, out);
811            }
812            if let Some(e) = else_branch {
813                collect_aggregates(e, out);
814            }
815        }
816    }
817}
818
819fn update_state(
820    st: &mut AggState,
821    name: &str,
822    v: &Value,
823    arg2: Option<&Value>,
824    order_keys: Option<Vec<Value>>,
825) -> Result<(), EvalError> {
826    let is_null = matches!(v, Value::Null);
827    match name {
828        "count_star" => st.count += 1,
829        "count" => {
830            if !is_null {
831                st.count += 1;
832            }
833        }
834        "sum" | "avg" => {
835            if is_null {
836                return Ok(());
837            }
838            st.count += 1;
839            match v {
840                Value::Int(n) => st.sum_int += i64::from(*n),
841                Value::BigInt(n) => st.sum_int += *n,
842                Value::Float(x) => {
843                    st.use_float = true;
844                    st.sum_float += *x;
845                }
846                other => {
847                    return Err(EvalError::TypeMismatch {
848                        detail: format!("sum/avg need numeric, got {:?}", other.data_type()),
849                    });
850                }
851            }
852        }
853        "min" => {
854            if is_null {
855                return Ok(());
856            }
857            match &st.extreme {
858                None => st.extreme = Some(v.clone()),
859                Some(cur) => {
860                    if value_cmp(v, cur) == core::cmp::Ordering::Less {
861                        st.extreme = Some(v.clone());
862                    }
863                }
864            }
865        }
866        "max" => {
867            if is_null {
868                return Ok(());
869            }
870            match &st.extreme {
871                None => st.extreme = Some(v.clone()),
872                Some(cur) => {
873                    if value_cmp(v, cur) == core::cmp::Ordering::Greater {
874                        st.extreme = Some(v.clone());
875                    }
876                }
877            }
878        }
879        // v7.17.0 — string_agg(value, separator). NULL value is
880        // skipped (PG aggregate-skip-null). Separator captured
881        // from the latest row that flows through; matches PG's
882        // semantics of evaluating the separator per row but using
883        // the last value at finalize time (in practice it's
884        // constant). count is bumped so we can distinguish "empty
885        // group → NULL" from "all-NULL group → NULL".
886        "string_agg" => {
887            if let Some(sep) = arg2
888                && let Value::Text(s) = sep
889            {
890                st.separator = Some(s.clone());
891            }
892            if is_null {
893                return Ok(());
894            }
895            if let Value::Text(s) = v {
896                st.items.push(Value::Text(s.clone()));
897                if let Some(k) = order_keys {
898                    st.item_keys.push(k);
899                }
900                st.count += 1;
901            } else {
902                return Err(EvalError::TypeMismatch {
903                    detail: format!("string_agg requires text value, got {:?}", v.data_type()),
904                });
905            }
906        }
907        // v7.17.0 — array_agg(value). Unlike string_agg, NULL
908        // elements are KEPT in the array (PG behaviour); the
909        // result is NULL only when ZERO rows fed in. Element type
910        // is locked from the first row's value type; subsequent
911        // rows must match (PG also rejects mixed-type array_agg).
912        "array_agg" => {
913            st.items.push(v.clone());
914            if let Some(k) = order_keys {
915                st.item_keys.push(k);
916            }
917            st.count += 1;
918        }
919        // v7.17.0 — bool_and(p): TRUE iff every non-NULL input is
920        // TRUE. NULL skipped; running accumulator stays at TRUE
921        // until the first non-NULL FALSE.
922        "bool_and" => {
923            if is_null {
924                return Ok(());
925            }
926            let b = match v {
927                Value::Bool(b) => *b,
928                other => {
929                    return Err(EvalError::TypeMismatch {
930                        detail: format!("bool_and requires bool, got {:?}", other.data_type()),
931                    });
932                }
933            };
934            st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc && b));
935        }
936        // v7.17.0 — bool_or(p): TRUE iff any non-NULL input is
937        // TRUE. NULL skipped.
938        "bool_or" => {
939            if is_null {
940                return Ok(());
941            }
942            let b = match v {
943                Value::Bool(b) => *b,
944                other => {
945                    return Err(EvalError::TypeMismatch {
946                        detail: format!("bool_or requires bool, got {:?}", other.data_type()),
947                    });
948                }
949            };
950            st.bool_acc = Some(st.bool_acc.map_or(b, |acc| acc || b));
951        }
952        _ => unreachable!("non-aggregate {name} in update_state"),
953    }
954    Ok(())
955}
956
957#[allow(clippy::cast_precision_loss)]
958fn finalize(name: &str, st: &AggState) -> Value {
959    match name {
960        "count" | "count_star" => Value::BigInt(st.count),
961        "sum" => {
962            if st.count == 0 {
963                Value::Null
964            } else if st.use_float {
965                Value::Float(st.sum_float + (st.sum_int as f64))
966            } else {
967                Value::BigInt(st.sum_int)
968            }
969        }
970        "avg" => {
971            if st.count == 0 {
972                Value::Null
973            } else {
974                let total = if st.use_float {
975                    st.sum_float + (st.sum_int as f64)
976                } else {
977                    st.sum_int as f64
978                };
979                Value::Float(total / (st.count as f64))
980            }
981        }
982        "min" | "max" => st.extreme.clone().unwrap_or(Value::Null),
983        // v7.17.0 — string_agg: join all collected text items with
984        // the captured separator. Empty / all-NULL group → NULL
985        // (PG semantics).
986        "string_agg" => {
987            if st.items.is_empty() {
988                return Value::Null;
989            }
990            let sep = st.separator.clone().unwrap_or_default();
991            let mut out = String::new();
992            for (i, item) in st.items.iter().enumerate() {
993                if i > 0 {
994                    out.push_str(&sep);
995                }
996                if let Value::Text(s) = item {
997                    out.push_str(s);
998                }
999            }
1000            Value::Text(out)
1001        }
1002        // v7.17.0 — array_agg: collect into a typed array. NULL
1003        // elements are preserved per PG. Result type is decided
1004        // by the first non-NULL element seen (or Text fallback
1005        // when the whole group is NULL — PG would surface the
1006        // declared input type, but SPG hasn't yet wired the
1007        // aggregate's static input-type from `describe`).
1008        "array_agg" => {
1009            if st.items.is_empty() {
1010                return Value::Null;
1011            }
1012            let probe = st.items.iter().find(|v| !v.is_null());
1013            match probe.and_then(spg_storage::Value::data_type) {
1014                Some(DataType::Int) | Some(DataType::SmallInt) => {
1015                    let items: Vec<Option<i32>> = st
1016                        .items
1017                        .iter()
1018                        .map(|v| match v {
1019                            Value::Int(n) => Some(*n),
1020                            Value::SmallInt(n) => Some(i32::from(*n)),
1021                            _ => None,
1022                        })
1023                        .collect();
1024                    Value::IntArray(items)
1025                }
1026                Some(DataType::BigInt) => {
1027                    let items: Vec<Option<i64>> = st
1028                        .items
1029                        .iter()
1030                        .map(|v| match v {
1031                            Value::BigInt(n) => Some(*n),
1032                            _ => None,
1033                        })
1034                        .collect();
1035                    Value::BigIntArray(items)
1036                }
1037                _ => {
1038                    let items: Vec<Option<String>> = st
1039                        .items
1040                        .iter()
1041                        .map(|v| match v {
1042                            Value::Text(s) => Some(s.clone()),
1043                            Value::Null => None,
1044                            other => Some(format!("{other:?}")),
1045                        })
1046                        .collect();
1047                    Value::TextArray(items)
1048                }
1049            }
1050        }
1051        // v7.17.0 — bool_and / bool_or finalize: lazy-init pattern
1052        // means `None` is exactly "empty group or all-NULL", which
1053        // PG surfaces as SQL NULL.
1054        "bool_and" | "bool_or" => st.bool_acc.map_or(Value::Null, Value::Bool),
1055        _ => unreachable!(),
1056    }
1057}
1058
1059fn infer_agg_type(spec: &AggSpec, schema_cols: &[ColumnSchema]) -> DataType {
1060    // v7.26 (round-20 C) — the argument's statically-derived shape
1061    // types MIN/MAX/SUM/array_agg properly; RowDescription used to
1062    // report TEXT for these, breaking every sqlx typed decode.
1063    let arg_ty = spec
1064        .arg
1065        .as_ref()
1066        .and_then(|a| crate::describe::describe_expr(a, schema_cols))
1067        .map(|shape| shape.ty);
1068    match spec.name.as_str() {
1069        "count" | "count_star" => DataType::BigInt,
1070        "sum" => match arg_ty {
1071            Some(DataType::Float) => DataType::Float,
1072            _ => DataType::BigInt,
1073        },
1074        "avg" => DataType::Float,
1075        // v7.17.0 — string_agg always returns TEXT.
1076        "string_agg" => DataType::Text,
1077        "array_agg" => match arg_ty {
1078            Some(DataType::Int | DataType::SmallInt) => DataType::IntArray,
1079            Some(DataType::BigInt) => DataType::BigIntArray,
1080            _ => DataType::TextArray,
1081        },
1082        // v7.17.0 — boolean aggregates always return BOOL (nullable
1083        // — empty / all-NULL group → NULL).
1084        "bool_and" | "bool_or" => DataType::Bool,
1085        // min/max and anything pass-through: the argument's shape.
1086        _ => arg_ty.unwrap_or(DataType::Text),
1087    }
1088}
1089
1090fn agg_or_group_type(e: &Expr, synth: &[ColumnSchema]) -> DataType {
1091    if let Expr::Column(c) = e
1092        && let Some(s) = synth.iter().find(|s| s.name == c.name)
1093    {
1094        return s.ty;
1095    }
1096    // v7.26 (round-20 C) — compound expressions over aggregates
1097    // (COALESCE(BOOL_OR(…), false), (array_agg(…))[1], CASE …)
1098    // derive their shape statically against the synth schema; the
1099    // old Text fallback broke sqlx typed decodes of exactly these
1100    // columns.
1101    crate::describe::describe_expr(e, synth)
1102        .map(|shape| shape.ty)
1103        .unwrap_or(DataType::Text)
1104}
1105
1106fn rewrite_expr(e: &Expr, group_exprs: &[Expr], aggs: &[AggSpec]) -> Expr {
1107    // v7.24 (round-16 A) — ordered aggregate: match on the inner
1108    // call PLUS the ordering keys.
1109    if let Expr::AggregateOrdered {
1110        call,
1111        order_by,
1112        distinct,
1113    } = e
1114        && let Expr::FunctionCall { name, args } = call.as_ref()
1115    {
1116        let lower = name.to_ascii_lowercase();
1117        if is_aggregate_name(&lower) {
1118            let canonical: &str = if lower == "every" { "bool_and" } else { &lower };
1119            let arg = args.first().cloned();
1120            let arg2 = if lower == "string_agg" {
1121                args.get(1).cloned()
1122            } else {
1123                None
1124            };
1125            for (i, spec) in aggs.iter().enumerate() {
1126                if spec.name == canonical
1127                    && spec.arg == arg
1128                    && spec.arg2 == arg2
1129                    && spec.distinct == *distinct
1130                    && spec.order_by == *order_by
1131                {
1132                    return Expr::Column(spg_sql::ast::ColumnName {
1133                        qualifier: None,
1134                        name: format!("__agg_{i}"),
1135                    });
1136                }
1137            }
1138        }
1139    }
1140    // Match aggregate FunctionCalls first — they sit outside group_by.
1141    if let Expr::FunctionCall { name, args } = e {
1142        let lower = name.to_ascii_lowercase();
1143        if is_aggregate_name(&lower) {
1144            let arg = if lower == "count_star" {
1145                None
1146            } else {
1147                args.first().cloned()
1148            };
1149            // v7.17.0 — match the spec we registered for
1150            // string_agg(value, separator) on the full pair.
1151            let arg2 = if lower == "string_agg" {
1152                args.get(1).cloned()
1153            } else {
1154                None
1155            };
1156            // v7.17.0 — `every` collapses into `bool_and` at
1157            // collection; mirror that here so the rewrite finds
1158            // the matching synth column.
1159            let canonical: &str = if lower == "every" {
1160                "bool_and"
1161            } else {
1162                lower.as_str()
1163            };
1164            for (i, spec) in aggs.iter().enumerate() {
1165                if spec.name == canonical
1166                    && spec.arg == arg
1167                    && spec.arg2 == arg2
1168                    && !spec.distinct
1169                    && spec.order_by.is_empty()
1170                {
1171                    return Expr::Column(spg_sql::ast::ColumnName {
1172                        qualifier: None,
1173                        name: format!("__agg_{i}"),
1174                    });
1175                }
1176            }
1177        }
1178    }
1179    // Match a group_by expression by AST equality.
1180    for (i, g) in group_exprs.iter().enumerate() {
1181        if g == e {
1182            return Expr::Column(spg_sql::ast::ColumnName {
1183                qualifier: None,
1184                name: format!("__grp_{i}"),
1185            });
1186        }
1187    }
1188    // Recurse into children.
1189    match e {
1190        Expr::AggregateOrdered {
1191            call,
1192            order_by,
1193            distinct,
1194        } => Expr::AggregateOrdered {
1195            call: Box::new(rewrite_expr(call, group_exprs, aggs)),
1196            distinct: *distinct,
1197            order_by: order_by
1198                .iter()
1199                .map(|o| spg_sql::ast::OrderBy {
1200                    expr: rewrite_expr(&o.expr, group_exprs, aggs),
1201                    desc: o.desc,
1202                    nulls_first: o.nulls_first,
1203                })
1204                .collect(),
1205        },
1206        Expr::Binary { lhs, op, rhs } => Expr::Binary {
1207            lhs: Box::new(rewrite_expr(lhs, group_exprs, aggs)),
1208            op: *op,
1209            rhs: Box::new(rewrite_expr(rhs, group_exprs, aggs)),
1210        },
1211        Expr::Unary { op, expr } => Expr::Unary {
1212            op: *op,
1213            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1214        },
1215        Expr::Cast { expr, target } => Expr::Cast {
1216            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1217            target: *target,
1218        },
1219        Expr::IsNull { expr, negated } => Expr::IsNull {
1220            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1221            negated: *negated,
1222        },
1223        Expr::FunctionCall { name, args } => Expr::FunctionCall {
1224            name: name.clone(),
1225            args: args
1226                .iter()
1227                .map(|a| rewrite_expr(a, group_exprs, aggs))
1228                .collect(),
1229        },
1230        Expr::Like {
1231            expr,
1232            pattern,
1233            negated,
1234            case_insensitive,
1235        } => Expr::Like {
1236            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1237            pattern: Box::new(rewrite_expr(pattern, group_exprs, aggs)),
1238            negated: *negated,
1239            case_insensitive: *case_insensitive,
1240        },
1241        Expr::Extract { field, source } => Expr::Extract {
1242            field: *field,
1243            source: Box::new(rewrite_expr(source, group_exprs, aggs)),
1244        },
1245        // v7.25.2 (round-19 A) — subquery nodes: rewrite group-key
1246        // references INSIDE the body to `__grp_N` so the correlated
1247        // resolver can substitute them against the synthesised group
1248        // row (aggs are NOT matched inside the body — a COUNT in the
1249        // subquery is the subquery's own aggregate).
1250        Expr::ScalarSubquery(s) => {
1251            Expr::ScalarSubquery(Box::new(rewrite_group_keys_in_select(s, group_exprs)))
1252        }
1253        Expr::Exists { subquery, negated } => Expr::Exists {
1254            subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
1255            negated: *negated,
1256        },
1257        Expr::InSubquery {
1258            expr,
1259            subquery,
1260            negated,
1261        } => Expr::InSubquery {
1262            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1263            subquery: Box::new(rewrite_group_keys_in_select(subquery, group_exprs)),
1264            negated: *negated,
1265        },
1266        // v4.12 window / Literal / Column — clone-pass (these don't
1267        // participate in aggregate rewrite).
1268        Expr::WindowFunction { .. } | Expr::Literal(_) | Expr::Placeholder(_) | Expr::Column(_) => {
1269            e.clone()
1270        }
1271        // v7.10.10 — recurse children for array nodes.
1272        Expr::Array(items) => Expr::Array(
1273            items
1274                .iter()
1275                .map(|elem| rewrite_expr(elem, group_exprs, aggs))
1276                .collect(),
1277        ),
1278        Expr::ArraySubscript { target, index } => Expr::ArraySubscript {
1279            target: Box::new(rewrite_expr(target, group_exprs, aggs)),
1280            index: Box::new(rewrite_expr(index, group_exprs, aggs)),
1281        },
1282        Expr::AnyAll {
1283            expr,
1284            op,
1285            array,
1286            is_any,
1287        } => Expr::AnyAll {
1288            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1289            op: *op,
1290            array: Box::new(rewrite_expr(array, group_exprs, aggs)),
1291            is_any: *is_any,
1292        },
1293        Expr::InList {
1294            expr,
1295            list,
1296            negated,
1297        } => Expr::InList {
1298            expr: Box::new(rewrite_expr(expr, group_exprs, aggs)),
1299            list: list
1300                .iter()
1301                .map(|item| rewrite_expr(item, group_exprs, aggs))
1302                .collect(),
1303            negated: *negated,
1304        },
1305        Expr::Case {
1306            operand,
1307            branches,
1308            else_branch,
1309        } => Expr::Case {
1310            operand: operand
1311                .as_deref()
1312                .map(|o| Box::new(rewrite_expr(o, group_exprs, aggs))),
1313            branches: branches
1314                .iter()
1315                .map(|(w, t)| {
1316                    (
1317                        rewrite_expr(w, group_exprs, aggs),
1318                        rewrite_expr(t, group_exprs, aggs),
1319                    )
1320                })
1321                .collect(),
1322            else_branch: else_branch
1323                .as_deref()
1324                .map(|e| Box::new(rewrite_expr(e, group_exprs, aggs))),
1325        },
1326    }
1327}
1328
1329/// v7.25.2 (round-19 A) — rewrite group-key references inside a
1330/// subquery body to `__grp_N` synthetic columns (aggregates are
1331/// not touched: empty spec list). Runs through the canonical
1332/// Select walker so every expression slot is covered.
1333fn rewrite_group_keys_in_select(
1334    s: &spg_sql::ast::SelectStatement,
1335    group_exprs: &[Expr],
1336) -> spg_sql::ast::SelectStatement {
1337    let mut out = s.clone();
1338    let _ = crate::walk_select_exprs_mut(&mut out, &mut |e| {
1339        *e = rewrite_expr(e, group_exprs, &[]);
1340        Ok(())
1341    });
1342    out
1343}
1344
1345/// Canonical string key for a tuple of group values. Used as map key.
1346/// Per-value group-key encoding (shared by owned and borrowed paths).
1347fn encode_one(out: &mut String, v: &Value) {
1348    match v {
1349        Value::Null => out.push_str("N|"),
1350        Value::SmallInt(n) => {
1351            out.push('s');
1352            out.push_str(&n.to_string());
1353            out.push('|');
1354        }
1355        Value::Int(n) => {
1356            out.push('I');
1357            out.push_str(&n.to_string());
1358            out.push('|');
1359        }
1360        Value::BigInt(n) => {
1361            out.push('B');
1362            out.push_str(&n.to_string());
1363            out.push('|');
1364        }
1365        Value::Float(x) => {
1366            out.push('F');
1367            out.push_str(&x.to_string());
1368            out.push('|');
1369        }
1370        Value::Bool(b) => {
1371            out.push(if *b { 'T' } else { 'f' });
1372            out.push('|');
1373        }
1374        Value::Text(s) => {
1375            out.push('S');
1376            out.push_str(s);
1377            out.push('|');
1378        }
1379        Value::Vector(v) => {
1380            out.push('V');
1381            for x in v {
1382                out.push_str(&x.to_string());
1383                out.push(',');
1384            }
1385            out.push('|');
1386        }
1387        // v6.0.1: GROUP BY on a `VECTOR(N) USING SQ8` column.
1388        // Two cells with byte-identical `(min, max, bytes)`
1389        // share the same group; equivalence is byte-equality
1390        // (same as f32 grouping today — neither path tries to
1391        // normalise nan/-0).
1392        Value::Sq8Vector(q) => {
1393            out.push('Q');
1394            out.push_str(&q.min.to_string());
1395            out.push('@');
1396            out.push_str(&q.max.to_string());
1397            out.push(':');
1398            for b in &q.bytes {
1399                out.push_str(&b.to_string());
1400                out.push(',');
1401            }
1402            out.push('|');
1403        }
1404        // v6.0.3: GROUP BY on a `VECTOR(N) USING HALF` column.
1405        // Byte-equality over the raw u16 bits; matches the SQ8
1406        // path's byte-key model.
1407        Value::HalfVector(h) => {
1408            out.push('H');
1409            for b in &h.bytes {
1410                out.push_str(&b.to_string());
1411                out.push(',');
1412            }
1413            out.push('|');
1414        }
1415        Value::Numeric { scaled, scale } => {
1416            out.push('D');
1417            out.push_str(&scaled.to_string());
1418            out.push('@');
1419            out.push_str(&scale.to_string());
1420            out.push('|');
1421        }
1422        Value::Date(d) => {
1423            out.push('d');
1424            out.push_str(&d.to_string());
1425            out.push('|');
1426        }
1427        Value::Timestamp(t) => {
1428            out.push('t');
1429            out.push_str(&t.to_string());
1430            out.push('|');
1431        }
1432        Value::Interval { months, micros } => {
1433            out.push('i');
1434            out.push_str(&months.to_string());
1435            out.push('m');
1436            out.push_str(&micros.to_string());
1437            out.push('|');
1438        }
1439        Value::Json(s) => {
1440            out.push('j');
1441            out.push_str(s);
1442            out.push('|');
1443        }
1444        // v7.5.0 — Value is #[non_exhaustive] for downstream
1445        // forward-compat. Any future variant lacking explicit
1446        // handling here will share a debug-derived group key,
1447        // which is observably wrong but won't crash.
1448        _ => {
1449            out.push('?');
1450            out.push_str(&format!("{v:?}"));
1451            out.push('|');
1452        }
1453    }
1454}
1455
1456/// v7.30 (perf campaign) - encode from borrowed cells without
1457/// materialising an owned Vec<Value> first.
1458fn encode_key_refs(vals: &[&Value]) -> String {
1459    let mut out = String::new();
1460    for v in vals {
1461        encode_one(&mut out, v);
1462    }
1463    out
1464}
1465
1466pub(crate) fn encode_key(vals: &[Value]) -> String {
1467    let mut out = String::new();
1468    for v in vals {
1469        encode_one(&mut out, v);
1470    }
1471    out
1472}
1473
1474#[allow(clippy::cast_precision_loss)]
1475fn value_cmp(a: &Value, b: &Value) -> core::cmp::Ordering {
1476    use core::cmp::Ordering::Equal;
1477    match (a, b) {
1478        (Value::Null, Value::Null) => Equal,
1479        (Value::Null, _) => core::cmp::Ordering::Greater, // NULLs last
1480        (_, Value::Null) => core::cmp::Ordering::Less,
1481        (Value::Int(x), Value::Int(y)) => x.cmp(y),
1482        (Value::BigInt(x), Value::BigInt(y)) => x.cmp(y),
1483        (Value::Int(x), Value::BigInt(y)) => i64::from(*x).cmp(y),
1484        (Value::BigInt(x), Value::Int(y)) => x.cmp(&i64::from(*y)),
1485        (Value::Float(x), Value::Float(y)) => x.partial_cmp(y).unwrap_or(Equal),
1486        (Value::Int(x), Value::Float(y)) => f64::from(*x).partial_cmp(y).unwrap_or(Equal),
1487        (Value::Float(x), Value::Int(y)) => x.partial_cmp(&f64::from(*y)).unwrap_or(Equal),
1488        (Value::BigInt(x), Value::Float(y)) => (*x as f64).partial_cmp(y).unwrap_or(Equal),
1489        (Value::Float(x), Value::BigInt(y)) => x.partial_cmp(&(*y as f64)).unwrap_or(Equal),
1490        (Value::Text(x), Value::Text(y)) => x.cmp(y),
1491        (Value::Bool(x), Value::Bool(y)) => x.cmp(y),
1492        _ => Equal,
1493    }
1494}