spg-engine 7.36.0

Execution engine for SPG: glues spg-sql parsing to spg-storage. Foreign keys, joins, vectors, cold tier.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
//! Compiled expressions — PG's ExprState idea (cut 30, extracted
//! from `eval.rs`; v7.32 perf knife D / architecture v2 P1).
//!
//! Walk the tree ONCE per query, pre-resolve column positions and
//! collation-fold decisions (both row-invariant), emit a flat
//! post-order step program; per-row evaluation is a linear loop —
//! no tree dispatch, no name resolution, no collation lookups.
//! Anything the compiler doesn't model becomes a `Step::Subtree`
//! that calls the interpreter for that node, so values AND error
//! behaviour stay bit-for-bit with `eval_expr` (invariant I3).

use alloc::format;
use alloc::vec::Vec;

use spg_sql::ast::{BinOp, ColumnName, Expr, Literal, UnOp};
use spg_storage::{Row, Value};

use super::{
    EvalContext, EvalError, apply_binary, apply_unary, column_collation, composite_eq, eval_expr,
    like_match_inner, literal_to_value,
};

pub(crate) enum Step {
    /// Pre-resolved column read (position into the row).
    Column(usize),
    /// Pre-converted literal.
    Lit(Value),
    /// Pops rhs then lhs, pushes the op result. Eager both-sides
    /// evaluation — same as the interpreter (no short-circuit).
    Binary(BinOp),
    /// Comparison whose operands referenced a CaseInsensitive
    /// column: ASCII-fold Text operands first (decided at compile
    /// time; the interpreter re-decides per row).
    BinaryCi(BinOp),
    Unary(UnOp),
    IsNull {
        negated: bool,
    },
    /// v7.32 (architecture v2, P1) — `needle [NOT] IN (literals…)`.
    /// The membership SET is a COMPILE PRODUCT, not a runtime cache:
    /// it lives in the step, so there is no "forgot to pass the
    /// memo" failure mode (the round-25 18.7 s accident is now
    /// unconstructable — see v7.32-executor-architecture-design.md
    /// invariant I2). The needle is the preceding sub-program; this
    /// step pops it. `fallback` is the whole InList node, used only
    /// when the runtime needle family doesn't match the set
    /// (e.g. Float needle vs Int set) — same escape the interpreter
    /// takes, evaluated cold.
    InSet {
        set: crate::memoize::InListSet,
        has_null: bool,
        negated: bool,
        fallback: Expr,
    },
    /// v7.32 (P1) — `text [NOT] [I]LIKE '<literal pattern>'`. The
    /// pattern (and its lowercased form for ILIKE) is compiled once;
    /// the step pops the text operand.
    Like {
        pattern: alloc::vec::Vec<char>,
        negated: bool,
        case_insensitive: bool,
    },
    /// v7.36 (perf — mailrs Ask 1) — pure scalar function call
    /// (LENGTH, COALESCE, UPPER, etc.) on already-pushed args.
    /// Pops `n_args` values, calls `apply_function(name, args, ctx)`,
    /// pushes the result. Replaces the Subtree fallback for the
    /// "function over bound columns" shape that aggregate arg paths
    /// like `SUM(LENGTH(text_body))` and `MAX(COALESCE(col, ''))`
    /// otherwise force the row-materialise eval path. Only the
    /// `fully_compilable` whitelist (PURE scalars — no NOW / RANDOM
    /// / sequence accessors) is emitted; everything else stays on
    /// `Step::Subtree`.
    /// `name_lower` is pre-lowercased at compile time so the per-
    /// row dispatch in `apply_function` skips an allocation on
    /// every input row.
    Function {
        name_lower: alloc::string::String,
        n_args: usize,
    },
    /// v7.36 (perf — mailrs Ask 1 SUM(LENGTH(text_body)) zero-copy)
    /// — `LENGTH(<column>)` / `CHAR_LENGTH(<column>)` /
    /// `CHARACTER_LENGTH(<column>)` over a bound column. Reads the
    /// cell by reference, computes the char length WITHOUT cloning
    /// the underlying `String` — the 1 KB text bodies in
    /// `user_storage_usage` otherwise pay 25 k × 1 KB heap allocs
    /// per query just to push a `Value::Text` onto the stack so the
    /// next Step pops it and asks `s.len()`.
    ColumnLength {
        pos: usize,
    },
    /// v7.36 — `OCTET_LENGTH(<column>)` — byte count, regardless of
    /// encoding. Even simpler than `ColumnLength` (no ASCII probe).
    ColumnOctetLength {
        pos: usize,
    },
    /// v7.36 — `CAST(<expr> AS <ty>)` over an already-pushed value.
    /// Pure / context-free conversion goes through the same
    /// `cast_value` dispatcher the interpreter uses.
    Cast {
        target: spg_sql::ast::CastTarget,
    },
    /// Fallback: interpret this subtree with eval_expr.
    Subtree(Expr),
}

pub(crate) struct CompiledExpr {
    steps: Vec<Step>,
}

impl CompiledExpr {
    /// v7.36 (perf — mailrs Phase 1, user_storage_usage hot loop) —
    /// shape inspector for the aggregate's tight inner. Returns
    /// `Some(pos)` iff this compiled expression is exactly the
    /// single step `ColumnLength { pos }` — i.e. `LENGTH(<column>)`
    /// on a bound text column with no surrounding work.
    pub(crate) fn as_single_column_length(&self) -> Option<usize> {
        if self.steps.len() == 1
            && let Step::ColumnLength { pos } = &self.steps[0]
        {
            Some(*pos)
        } else {
            None
        }
    }
}

/// Column-position resolution at compile time. Mirrors the happy
/// layers of `resolve_column`; ANY case that would reach an error
/// path, an ambiguity, or a miss returns None so the node falls
/// back to the interpreter (identical runtime error / NULL
/// semantics).
fn compile_column_pos(c: &ColumnName, ctx: &EvalContext<'_>) -> Option<usize> {
    if let Some(q) = &c.qualifier {
        if let Some(pos) = ctx
            .columns
            .iter()
            .position(|s| composite_eq(&s.name, q, &c.name))
        {
            return Some(pos);
        }
        // resolve_column's error layers live behind this point:
        // composites under the qualifier exist (ColumnNotFound) or
        // the qualifier is unknown (UnknownQualifier) — interpret.
        let prefix_exists = ctx.columns.iter().any(|s| {
            s.name.starts_with(q.as_str()) && s.name.as_bytes().get(q.len()) == Some(&b'.')
        });
        if prefix_exists {
            return None;
        }
        match ctx.table_alias {
            // Alias-accepted single-table reference: fall through
            // to the bare layers (the inner-subquery hot shape).
            Some(a) if a == q => {}
            _ => return None,
        }
    }
    if let Some(pos) = ctx.columns.iter().position(|s| s.name == c.name) {
        return Some(pos);
    }
    let mut matches = ctx.columns.iter().enumerate().filter(|(_, s)| {
        s.name.len() > c.name.len()
            && s.name.ends_with(c.name.as_str())
            && s.name.as_bytes()[s.name.len() - c.name.len() - 1] == b'.'
    });
    let first = matches.next();
    if matches.next().is_some() {
        return None; // ambiguous — interpreter owns the error text
    }
    first.map(|(i, _)| i)
}

fn compile_into(e: &Expr, ctx: &EvalContext<'_>, steps: &mut Vec<Step>) {
    match e {
        Expr::Literal(l) => steps.push(Step::Lit(literal_to_value(l))),
        Expr::Column(c) => match compile_column_pos(c, ctx) {
            Some(pos) => steps.push(Step::Column(pos)),
            None => steps.push(Step::Subtree(e.clone())),
        },
        Expr::Binary { lhs, op, rhs } => {
            compile_into(lhs, ctx, steps);
            compile_into(rhs, ctx, steps);
            let cmp = matches!(
                op,
                BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::LtEq | BinOp::Gt | BinOp::GtEq
            );
            let ci = cmp
                && (matches!(
                    column_collation(lhs, ctx),
                    Some(spg_storage::Collation::CaseInsensitive)
                ) || matches!(
                    column_collation(rhs, ctx),
                    Some(spg_storage::Collation::CaseInsensitive)
                ));
            steps.push(if ci {
                Step::BinaryCi(*op)
            } else {
                Step::Binary(*op)
            });
        }
        Expr::Unary { op, expr } => {
            compile_into(expr, ctx, steps);
            steps.push(Step::Unary(*op));
        }
        Expr::IsNull { expr, negated } => {
            compile_into(expr, ctx, steps);
            steps.push(Step::IsNull { negated: *negated });
        }
        Expr::InList {
            expr,
            list,
            negated,
        } => {
            // I2: the set is built at compile time. The gate
            // (`fully_compilable`) guarantees we only reach here
            // when the list builds a set and the needle compiles —
            // but keep the Subtree fallback for defence in depth.
            match crate::build_in_list_set(list) {
                Some(entry) if fully_compilable(expr) => {
                    compile_into(expr, ctx, steps);
                    steps.push(Step::InSet {
                        set: entry.set,
                        has_null: entry.has_null,
                        negated: *negated,
                        fallback: e.clone(),
                    });
                }
                _ => steps.push(Step::Subtree(e.clone())),
            }
        }
        Expr::Like {
            expr,
            pattern,
            negated,
            case_insensitive,
        } => match literal_text_pattern(pattern) {
            Some(pat) if fully_compilable(expr) => {
                // v7.36 (perf — mailrs Phase 1, get_contacts hot
                // inner) — trivial all-`%` pattern (`%`, `%%`, …)
                // matches every non-NULL text. Collapse the LIKE
                // into a `lhs IS NOT NULL` check: emit the operand
                // then `IsNull { negated: !*negated }`. For ILIKE
                // `%%` on 25 k rows the per-row `like_match_inner`
                // → 2-char walk (~30 ns each) becomes a tag check
                // (~3 ns); the operand still gets evaluated for the
                // NULL semantics that SQL `LIKE` requires.
                if !pat.is_empty() && pat.chars().all(|c| c == '%') {
                    compile_into(expr, ctx, steps);
                    steps.push(Step::IsNull { negated: !*negated });
                    return;
                }
                compile_into(expr, ctx, steps);
                let chars: alloc::vec::Vec<char> = if *case_insensitive {
                    pat.to_lowercase().chars().collect()
                } else {
                    pat.chars().collect()
                };
                steps.push(Step::Like {
                    pattern: chars,
                    negated: *negated,
                    case_insensitive: *case_insensitive,
                });
            }
            _ => steps.push(Step::Subtree(e.clone())),
        },
        // v7.36 — PURE scalar function call: emit args then a
        // single Function step that pops them. `fully_compilable`
        // gates the whitelist + recurses into args, so this branch
        // only fires when the entire subtree is compilable.
        Expr::FunctionCall { name, args } if is_pure_scalar_function(name) => {
            // v7.36 — specialise `LENGTH(<column>)` /
            // `OCTET_LENGTH(<column>)` so the column's `Value::Text`
            // isn't cloned just to read its length. The general
            // `Step::Function` path goes through `apply_function`,
            // which can't borrow off the stack — it copies.
            let lower = name.to_ascii_lowercase();
            if args.len() == 1 {
                if let Expr::Column(c) = &args[0]
                    && let Some(pos) = compile_column_pos(c, ctx)
                {
                    match lower.as_str() {
                        "length" | "char_length" | "character_length" => {
                            steps.push(Step::ColumnLength { pos });
                            return;
                        }
                        "octet_length" => {
                            steps.push(Step::ColumnOctetLength { pos });
                            return;
                        }
                        _ => {}
                    }
                }
            }
            for a in args {
                compile_into(a, ctx, steps);
            }
            steps.push(Step::Function {
                name_lower: lower,
                n_args: args.len(),
            });
        }
        Expr::Cast { expr, target } => {
            compile_into(expr, ctx, steps);
            steps.push(Step::Cast { target: *target });
        }
        other => steps.push(Step::Subtree(other.clone())),
    }
}

/// Literal text pattern behind a LIKE/ILIKE, if any.
fn literal_text_pattern(pattern: &Expr) -> Option<&str> {
    match pattern {
        Expr::Literal(Literal::String(s)) => Some(s.as_str()),
        _ => None,
    }
}

/// True when the whole tree consists of nodes the compiler models
/// natively. Mixed trees stay on the interpreted path: a Subtree
/// fallback would run WITHOUT the per-query MemoizeCache, and
/// memo-dependent nodes (InList set fast path — round-25) rebuild
/// per row there. Measured: compiling a search WHERE with an
/// InList subtree regressed 634 ms → 18.7 s.
pub(crate) fn fully_compilable(e: &Expr) -> bool {
    match e {
        Expr::Literal(_) | Expr::Column(_) => true,
        Expr::Binary { lhs, rhs, .. } => fully_compilable(lhs) && fully_compilable(rhs),
        Expr::Unary { expr, .. } | Expr::IsNull { expr, .. } => fully_compilable(expr),
        // I2: an InList is compilable ONLY when it becomes a real
        // InSet (all-literal list + compilable needle). A
        // non-set-able InList must keep the whole tree off the
        // compiled path so it never degrades to a memo-less,
        // O(list) per-row Subtree (the round-25 18.7 s trap).
        Expr::InList { expr, list, .. } => {
            fully_compilable(expr) && crate::build_in_list_set(list).is_some()
        }
        Expr::Like { expr, pattern, .. } => {
            fully_compilable(expr) && literal_text_pattern(pattern).is_some()
        }
        // v7.36 (perf — mailrs Ask 1) — PURE scalar functions over
        // compilable args go to `Step::Function`. The whitelist
        // covers the high-traffic / non-volatile cases; anything
        // outside (NOW, RANDOM, sequence accessors, EXTRACT-with-
        // context-dependent fields, etc.) stays on Subtree where
        // the interpreter has the full ctx.
        Expr::FunctionCall { name, args } => {
            is_pure_scalar_function(name) && args.iter().all(fully_compilable)
        }
        // v7.36 — CAST over a compilable expression. `cast_value`
        // is pure / context-free for the scalar targets we care
        // about (text, ints, floats, bool, dates).
        Expr::Cast { expr, .. } => fully_compilable(expr),
        _ => false,
    }
}

/// v7.36 — PURE scalar function whitelist for `Step::Function`.
/// "Pure" means: deterministic, context-independent, no side
/// effects. Aggregate names (sum / count / max / …) are filtered
/// upstream by the caller — they never reach the compiler. NOW /
/// RANDOM / sequence accessors are excluded because they need the
/// `EvalContext`'s clock / sequence resolver and aren't
/// deterministic. EXTRACT is excluded because the field kind is
/// parsed off the Expr tree, not an arg.
fn is_pure_scalar_function(name: &str) -> bool {
    matches!(
        name.to_ascii_lowercase().as_str(),
        // string length + slicing
        "length"
            | "char_length"
            | "character_length"
            | "octet_length"
            | "upper"
            | "lower"
            | "trim"
            | "ltrim"
            | "rtrim"
            | "btrim"
            | "left"
            | "right"
            | "substring"
            | "substr"
            | "replace"
            | "position"
            | "strpos"
            | "concat"
            | "concat_ws"
            | "reverse"
            | "repeat"
            | "lpad"
            | "rpad"
            | "split_part"
            // null/conditional
            | "coalesce"
            | "nullif"
            | "greatest"
            | "least"
            | "ifnull"
            | "isnull"
            | "nvl"
            // numeric
            | "abs"
            | "ceil"
            | "ceiling"
            | "floor"
            | "round"
            | "trunc"
            | "sqrt"
            | "power"
            | "pow"
            | "mod"
            | "sign"
            | "log"
            | "log10"
            | "exp"
            | "ln"
            // boolean / cast helpers
            | "cast"
    )
}

pub(crate) fn compile_expr(e: &Expr, ctx: &EvalContext<'_>) -> CompiledExpr {
    let mut steps = Vec::new();
    compile_into(e, ctx, &mut steps);
    CompiledExpr { steps }
}

/// Run a compiled program. `stack` is caller-owned scratch
/// (cleared here) so tight row loops never touch the allocator
/// for the machine itself.
pub(crate) fn eval_compiled(
    c: &CompiledExpr,
    row: &Row,
    ctx: &EvalContext<'_>,
    stack: &mut Vec<Value>,
) -> Result<Value, EvalError> {
    eval_compiled_ref(c, &crate::join::RowRef::Owned(row), ctx, stack)
}

/// v7.32 (P4 borrow channel, increment 2) — the RowRef-borrowing form of
/// `eval_compiled`. `Step::Column` borrows its cell straight from the
/// RowRef (a join tuple resolves it via `tuple_value`, never
/// materialising a combined Row); only the rare Subtree / InSet
/// cross-family fallback materialises the row once. Bit-for-bit
/// equivalent to the Owned path — `eval_compiled` above is now a thin
/// `RowRef::Owned` wrapper, so there is a single interpreter (invariant
/// I3); a differential test pins the equivalence.
pub(crate) fn eval_compiled_ref(
    c: &CompiledExpr,
    row: &crate::join::RowRef<'_>,
    ctx: &EvalContext<'_>,
    stack: &mut Vec<Value>,
) -> Result<Value, EvalError> {
    stack.clear();
    for step in &c.steps {
        match step {
            Step::Column(pos) => {
                stack.push(row.get(*pos).cloned().unwrap_or(Value::Null));
            }
            Step::Lit(v) => stack.push(v.clone()),
            Step::Binary(op) => {
                let r = stack.pop().unwrap_or(Value::Null);
                let l = stack.pop().unwrap_or(Value::Null);
                stack.push(apply_binary(*op, l, r)?);
            }
            Step::BinaryCi(op) => {
                let fold = |v: Value| match v {
                    Value::Text(s) => Value::Text(s.to_ascii_lowercase()),
                    other => other,
                };
                let r = fold(stack.pop().unwrap_or(Value::Null));
                let l = fold(stack.pop().unwrap_or(Value::Null));
                stack.push(apply_binary(*op, l, r)?);
            }
            Step::Unary(op) => {
                let v = stack.pop().unwrap_or(Value::Null);
                stack.push(apply_unary(*op, v)?);
            }
            Step::IsNull { negated } => {
                let v = stack.pop().unwrap_or(Value::Null);
                let is_null = matches!(v, Value::Null);
                stack.push(Value::Bool(if *negated { !is_null } else { is_null }));
            }
            Step::InSet {
                set,
                has_null,
                negated,
                fallback,
            } => {
                let needle = stack.pop().unwrap_or(Value::Null);
                let contained = match (&needle, set) {
                    // Non-empty list + NULL needle → NULL (NOT NULL
                    // is still NULL) — matches the interpreter and
                    // eval_with_in_sets.
                    (Value::Null, _) => {
                        stack.push(Value::Null);
                        continue;
                    }
                    (Value::SmallInt(n), crate::memoize::InListSet::Int(s)) => {
                        s.contains(&i64::from(*n))
                    }
                    (Value::Int(n), crate::memoize::InListSet::Int(s)) => {
                        s.contains(&i64::from(*n))
                    }
                    (Value::BigInt(n), crate::memoize::InListSet::Int(s)) => s.contains(n),
                    (Value::Text(t), crate::memoize::InListSet::Text(s)) => s.contains(t.as_str()),
                    // Cross-family needle: take the interpreter's
                    // exact coercion / error path on the whole node.
                    _ => {
                        stack.push(eval_expr(fallback, &row.as_row(), ctx)?);
                        continue;
                    }
                };
                let inner = if contained {
                    Value::Bool(true)
                } else if *has_null {
                    Value::Null
                } else {
                    Value::Bool(false)
                };
                stack.push(match (negated, inner) {
                    (true, Value::Bool(b)) => Value::Bool(!b),
                    (_, v) => v,
                });
            }
            Step::Like {
                pattern,
                negated,
                case_insensitive,
            } => {
                let v = stack.pop().unwrap_or(Value::Null);
                match v {
                    Value::Null => stack.push(Value::Null),
                    Value::Text(t) => {
                        let text: Vec<char> = if *case_insensitive {
                            t.to_lowercase().chars().collect()
                        } else {
                            t.chars().collect()
                        };
                        let m = like_match_inner(&text, 0, pattern, 0);
                        stack.push(Value::Bool(if *negated { !m } else { m }));
                    }
                    other => {
                        return Err(EvalError::TypeMismatch {
                            detail: format!(
                                "LIKE requires text operands, got {:?}",
                                other.data_type()
                            ),
                        });
                    }
                }
            }
            Step::ColumnLength { pos } => {
                // v7.36 — zero-copy LENGTH on a column. Read the
                // cell by reference; compute char count without
                // cloning the underlying `String`. Saves 25 k ×
                // ~1 KB heap clones on the user_storage_usage shape.
                let v = row.get(*pos).unwrap_or(&Value::Null);
                let pushed = match v {
                    Value::Null => Value::Null,
                    Value::Text(s) => {
                        let n = if s.is_ascii() {
                            i32::try_from(s.len()).unwrap_or(i32::MAX)
                        } else {
                            i32::try_from(s.chars().count()).unwrap_or(i32::MAX)
                        };
                        Value::Int(n)
                    }
                    Value::Bytes(b) => Value::Int(i32::try_from(b.len()).unwrap_or(i32::MAX)),
                    other => {
                        return Err(EvalError::TypeMismatch {
                            detail: format!(
                                "length() needs text or bytea, got {:?}",
                                other.data_type()
                            ),
                        });
                    }
                };
                stack.push(pushed);
            }
            Step::ColumnOctetLength { pos } => {
                let v = row.get(*pos).unwrap_or(&Value::Null);
                let pushed = match v {
                    Value::Null => Value::Null,
                    Value::Text(s) => Value::Int(i32::try_from(s.len()).unwrap_or(i32::MAX)),
                    Value::Bytes(b) => Value::Int(i32::try_from(b.len()).unwrap_or(i32::MAX)),
                    other => {
                        return Err(EvalError::TypeMismatch {
                            detail: format!(
                                "octet_length() needs text or bytea, got {:?}",
                                other.data_type()
                            ),
                        });
                    }
                };
                stack.push(pushed);
            }
            Step::Function { name_lower, n_args } => {
                let start = stack.len().saturating_sub(*n_args);
                // `apply_function` borrows the trailing `n_args`
                // values off the stack; we then truncate + push the
                // result. `name_lower` is pre-lowercased at compile
                // time, so dispatch skips the per-row
                // `to_ascii_lowercase()` allocation.
                let result =
                    super::functions::apply_function_lower(name_lower, &stack[start..], ctx)?;
                stack.truncate(start);
                stack.push(result);
            }
            Step::Cast { target } => {
                let v = stack.pop().unwrap_or(Value::Null);
                stack.push(super::cast::cast_value(v, *target)?);
            }
            Step::Subtree(e) => stack.push(eval_expr(e, &row.as_row(), ctx)?),
        }
    }
    Ok(stack.pop().unwrap_or(Value::Null))
}