Skip to main content

spg_engine/
triggers.rs

1//! v7.12.4 — PL/pgSQL row-level trigger executor.
2//!
3//! The catalogued [`spg_storage::FunctionDef`] carries the trigger
4//! function's source body as raw text (between the original
5//! `$$ ... $$`). Each time a trigger fires we re-parse the body
6//! via `spg_sql::parse_function_body` and walk the resulting
7//! [`spg_sql::ast::PlPgSqlBlock`] against a NEW / OLD row context.
8//!
9//! v7.12.4 surface (the minimum that lets a mailrs-shape
10//! `update_search_vector` trigger run end-to-end):
11//!
12//!   * `NEW.col := <expr>;`     — mutate a NEW cell. BEFORE only.
13//!   * `RETURN NEW;`            — pass the (possibly-mutated) row
14//!                                back to the row writer.
15//!   * `RETURN OLD;`            — return the pre-change row.
16//!   * `RETURN NULL;` / `RETURN;` — skip the write (BEFORE) or
17//!                                no-op the notification (AFTER).
18//!   * sub-expression eval recurses through the regular
19//!     [`crate::eval::eval_expr`] so anything the SELECT executor
20//!     can compute is fair game inside a trigger body.
21//!
22//! Out of scope for v7.12.4 (land in v7.12.5+):
23//!
24//!   * `DECLARE`'d local variables
25//!   * `IF / ELSIF / ELSE / END IF;` control flow
26//!   * Embedded SQL statements (`UPDATE … WHERE …`, `SELECT … INTO var`)
27//!   * `RAISE NOTICE / RAISE EXCEPTION`
28//!   * Loop constructs
29
30use alloc::collections::BTreeMap;
31use alloc::format;
32use alloc::string::String;
33use alloc::vec::Vec;
34use core::fmt;
35
36use spg_sql::ast::{AssignTarget, Expr, PlPgSqlDeclare, PlPgSqlStmt, RaiseLevel, ReturnTarget};
37use spg_storage::{ColumnSchema, FunctionDef, Row, TriggerDef, Value};
38
39use crate::eval::{self, EvalContext, EvalError};
40
41/// v7.12.7 — embedded SQL statement collected during a trigger
42/// fire, queued for execution after the firing DML completes.
43/// NEW / OLD / DECLARE-local references inside the statement's
44/// Expr tree have already been substituted with literals; the
45/// engine just feeds it to `execute_stmt_with_cancel`.
46#[derive(Debug, Clone, PartialEq)]
47pub struct DeferredEmbeddedStmt {
48    /// Trigger function the embedded SQL came from. Used to
49    /// label recursion errors precisely.
50    pub function: String,
51    /// Substituted statement, ready to execute.
52    pub stmt: spg_sql::ast::Statement,
53}
54
55/// What the trigger function returned. Drives the row-write path
56/// the trigger fired from.
57#[derive(Debug, Clone, PartialEq)]
58pub enum TriggerOutcome {
59    /// `RETURN NEW;` (or `RETURN OLD;`) — write this row.
60    /// For BEFORE triggers, the row may differ from the input
61    /// (e.g. `NEW.search_vector := …` rewrote a cell). For AFTER
62    /// triggers, the value is currently ignored — but we still
63    /// surface it for symmetric callers / future v7.12.5 use.
64    Row(Row),
65    /// `RETURN NULL;` or trigger fell off the end. For a BEFORE
66    /// trigger, the row writer must skip the affected row. For
67    /// an AFTER trigger, no-op.
68    Skip,
69}
70
71/// Result type the trigger executor exposes. Wraps `EvalError`
72/// at the eval-of-expressions layer and adds trigger-specific
73/// failure modes (`OLD.col := …`, unsupported PL/pgSQL feature,
74/// body that fails to re-parse, …).
75#[derive(Debug, Clone, PartialEq)]
76pub enum TriggerError {
77    /// Body source stored in the catalog can't be re-parsed.
78    /// Usually means the function was created against a newer
79    /// PL/pgSQL surface than the running engine knows about.
80    UnparseableBody { function: String, detail: String },
81    /// Trigger function uses a v7.12.5+ language feature
82    /// (DECLARE, IF, embedded SQL, RAISE, …). The error names
83    /// the construct so the operator can plan around it until
84    /// the feature lands.
85    UnsupportedConstruct { function: String, detail: String },
86    /// `OLD.col := <expr>` inside the body. PG itself rejects
87    /// this; we surface a clear message rather than silently
88    /// dropping the assignment.
89    OldIsReadOnly { function: String, column: String },
90    /// `NEW.col := <expr>` in an AFTER trigger — same rationale
91    /// as OLD: PG enforces "NEW is read-only after the row has
92    /// been written" and we mirror.
93    NewReadOnlyInAfterTrigger { function: String, column: String },
94    /// `NEW.col := <expr>` against a non-existent column.
95    /// Usually a schema-drift bug.
96    UnknownColumn {
97        function: String,
98        column: String,
99        table: String,
100    },
101    /// Sub-expression eval inside the trigger body failed. The
102    /// wrapped [`EvalError`] explains the underlying cause
103    /// (`ColumnNotFound`, `TypeMismatch`, …).
104    EvalFailed { function: String, cause: EvalError },
105    /// v7.12.6 — `RAISE EXCEPTION '<message>' [, args]*` in the
106    /// trigger body. The interpreter formats the args into the
107    /// message via PG-style `%` substitution and surfaces the
108    /// resolved text up to the caller.
109    RaiseException { function: String, message: String },
110}
111
112impl fmt::Display for TriggerError {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        match self {
115            Self::UnparseableBody { function, detail } => {
116                write!(
117                    f,
118                    "trigger function {function:?} body did not parse: {detail}"
119                )
120            }
121            Self::UnsupportedConstruct { function, detail } => {
122                write!(
123                    f,
124                    "trigger function {function:?} uses an unsupported PL/pgSQL construct: {detail}"
125                )
126            }
127            Self::OldIsReadOnly { function, column } => {
128                write!(
129                    f,
130                    "trigger function {function:?}: cannot assign to OLD.{column} (OLD is read-only — PG rule)"
131                )
132            }
133            Self::NewReadOnlyInAfterTrigger { function, column } => {
134                write!(
135                    f,
136                    "trigger function {function:?}: cannot assign to NEW.{column} inside an AFTER trigger \
137                     (NEW is read-only post-write — use BEFORE triggers for mutation, or an embedded UPDATE statement \
138                      in v7.12.5+)"
139                )
140            }
141            Self::UnknownColumn {
142                function,
143                column,
144                table,
145            } => {
146                write!(
147                    f,
148                    "trigger function {function:?}: target column {column:?} not in table {table:?} schema"
149                )
150            }
151            Self::EvalFailed { function, cause } => {
152                write!(
153                    f,
154                    "trigger function {function:?}: expression eval failed: {cause}"
155                )
156            }
157            Self::RaiseException { function, message } => {
158                write!(
159                    f,
160                    "trigger function {function:?}: RAISE EXCEPTION {message:?}"
161                )
162            }
163        }
164    }
165}
166
167/// Fire a single row-level trigger.
168///
169/// `is_after` is true for AFTER triggers; the executor enforces
170/// "NEW is read-only" by rejecting NEW.col assignments in that
171/// case. AFTER trigger return values are ignored by callers; the
172/// returned [`TriggerOutcome`] just carries the (possibly
173/// untouched) NEW row for symmetry.
174#[allow(clippy::too_many_arguments)] // the table_name / columns / params /
175// ts-config trio are independent; folding
176// them into a struct just shuffles the
177// boilerplate to the call sites without
178// material gain.
179pub fn fire_row_trigger(
180    function: &FunctionDef,
181    new_row: Option<Row>,
182    old_row: Option<&Row>,
183    table_name: &str,
184    columns: &[ColumnSchema],
185    params: &[Value],
186    default_text_search_config: Option<&str>,
187    is_after: bool,
188) -> Result<(TriggerOutcome, Vec<DeferredEmbeddedStmt>), TriggerError> {
189    if !function.language.eq_ignore_ascii_case("plpgsql") {
190        return Err(TriggerError::UnsupportedConstruct {
191            function: function.name.clone(),
192            detail: format!(
193                "v7.12.4 only invokes LANGUAGE plpgsql trigger functions; \
194                 {:?} declares LANGUAGE {}",
195                function.name, function.language
196            ),
197        });
198    }
199    let block = spg_sql::parse_function_body(&function.body).map_err(|e| {
200        TriggerError::UnparseableBody {
201            function: function.name.clone(),
202            detail: format!("{e}"),
203        }
204    })?;
205    // v7.12.6 — initialise local variable scope from the DECLARE
206    // block. Each init expr (if any) evaluates against the
207    // so-far-bound scope + the NEW/OLD context, so later DECLAREs
208    // can reference earlier ones.
209    let mut locals: BTreeMap<String, Value> = BTreeMap::new();
210    init_locals_from_declarations(
211        &block.declarations,
212        &mut locals,
213        new_row.as_ref(),
214        old_row,
215        columns,
216        table_name,
217        params,
218        default_text_search_config,
219        &function.name,
220    )?;
221    let mut current_new = new_row;
222    let ctx = BodyCtx {
223        function: &function.name,
224        table_name,
225        columns,
226        params,
227        default_text_search_config,
228        is_after,
229    };
230    let mut deferred: Vec<DeferredEmbeddedStmt> = Vec::new();
231    let outcome = match execute_stmts(
232        &block.statements,
233        &mut current_new,
234        old_row,
235        &mut locals,
236        &ctx,
237        &mut deferred,
238    )? {
239        BodyOutcome::Return(target) => resolve_return(target, current_new, old_row),
240        // Body fell off without an explicit RETURN. PL/pgSQL
241        // default is `RETURN NULL`; we mirror — the BEFORE
242        // trigger then skips the row.
243        BodyOutcome::FellThrough => TriggerOutcome::Skip,
244    };
245    Ok((outcome, deferred))
246}
247
248/// v7.12.6 — body-walk return signal. `Return(target)` short-
249/// circuits the caller; `FellThrough` means the statement list
250/// completed without a RETURN, equivalent to PL/pgSQL's implicit
251/// `RETURN NULL`.
252enum BodyOutcome {
253    Return(ReturnTarget),
254    FellThrough,
255}
256
257/// Shared parameters every body-stmt evaluation needs. Bundled so
258/// the recursive `execute_stmts` doesn't have to thread eight
259/// individual `&str` / `&[…]` args around.
260struct BodyCtx<'a> {
261    function: &'a str,
262    table_name: &'a str,
263    columns: &'a [ColumnSchema],
264    params: &'a [Value],
265    default_text_search_config: Option<&'a str>,
266    is_after: bool,
267}
268
269fn execute_stmts(
270    stmts: &[PlPgSqlStmt],
271    current_new: &mut Option<Row>,
272    old_row: Option<&Row>,
273    locals: &mut BTreeMap<String, Value>,
274    ctx: &BodyCtx<'_>,
275    deferred: &mut Vec<DeferredEmbeddedStmt>,
276) -> Result<BodyOutcome, TriggerError> {
277    for stmt in stmts {
278        match stmt {
279            PlPgSqlStmt::Assign { target, value } => {
280                let evaluated = eval_with_new_old_and_locals(
281                    value,
282                    current_new.as_ref(),
283                    old_row,
284                    locals,
285                    ctx.columns,
286                    ctx.table_name,
287                    ctx.params,
288                    ctx.default_text_search_config,
289                )
290                .map_err(|cause| TriggerError::EvalFailed {
291                    function: ctx.function.into(),
292                    cause,
293                })?;
294                match target {
295                    AssignTarget::NewColumn(col) => {
296                        if ctx.is_after {
297                            return Err(TriggerError::NewReadOnlyInAfterTrigger {
298                                function: ctx.function.into(),
299                                column: col.clone(),
300                            });
301                        }
302                        let pos = ctx
303                            .columns
304                            .iter()
305                            .position(|c| c.name.eq_ignore_ascii_case(col))
306                            .ok_or_else(|| TriggerError::UnknownColumn {
307                                function: ctx.function.into(),
308                                column: col.clone(),
309                                table: alloc::string::ToString::to_string(&ctx.table_name),
310                            })?;
311                        let row = current_new.as_mut().ok_or_else(|| {
312                            TriggerError::UnsupportedConstruct {
313                                function: ctx.function.into(),
314                                detail: format!(
315                                    "NEW.{col} := … requires a NEW row context \
316                                     (BEFORE INSERT / UPDATE only — not available on DELETE)"
317                                ),
318                            }
319                        })?;
320                        row.values[pos] = evaluated;
321                    }
322                    AssignTarget::OldColumn(col) => {
323                        return Err(TriggerError::OldIsReadOnly {
324                            function: ctx.function.into(),
325                            column: col.clone(),
326                        });
327                    }
328                    AssignTarget::Local(name) => {
329                        // v7.12.6 — write into the DECLARE scope.
330                        // Loose-typing: we don't enforce the
331                        // declared type at runtime (PG's INTO
332                        // coerces; v7.12.6 just stores the
333                        // evaluated Value as-is). Type coercion
334                        // tightens in a later release.
335                        locals.insert(name.clone(), evaluated);
336                    }
337                }
338            }
339            PlPgSqlStmt::Return(target) => {
340                return Ok(BodyOutcome::Return(target.clone()));
341            }
342            PlPgSqlStmt::If {
343                branches,
344                else_branch,
345            } => {
346                let mut matched = false;
347                for (cond_expr, body) in branches {
348                    let cond_val = eval_with_new_old_and_locals(
349                        cond_expr,
350                        current_new.as_ref(),
351                        old_row,
352                        locals,
353                        ctx.columns,
354                        ctx.table_name,
355                        ctx.params,
356                        ctx.default_text_search_config,
357                    )
358                    .map_err(|cause| TriggerError::EvalFailed {
359                        function: ctx.function.into(),
360                        cause,
361                    })?;
362                    if matches!(cond_val, Value::Bool(true)) {
363                        matched = true;
364                        match execute_stmts(body, current_new, old_row, locals, ctx, deferred)? {
365                            BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
366                            BodyOutcome::FellThrough => {}
367                        }
368                        break;
369                    }
370                }
371                if !matched && !else_branch.is_empty() {
372                    match execute_stmts(else_branch, current_new, old_row, locals, ctx, deferred)? {
373                        BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
374                        BodyOutcome::FellThrough => {}
375                    }
376                }
377            }
378            PlPgSqlStmt::Raise {
379                level,
380                message,
381                args,
382            } => {
383                // Resolve every %-format placeholder by evaluating
384                // each arg expression and rendering its Value.
385                let mut rendered_args: Vec<String> = Vec::with_capacity(args.len());
386                for a in args {
387                    let v = eval_with_new_old_and_locals(
388                        a,
389                        current_new.as_ref(),
390                        old_row,
391                        locals,
392                        ctx.columns,
393                        ctx.table_name,
394                        ctx.params,
395                        ctx.default_text_search_config,
396                    )
397                    .map_err(|cause| TriggerError::EvalFailed {
398                        function: ctx.function.into(),
399                        cause,
400                    })?;
401                    rendered_args.push(value_to_display_string(&v));
402                }
403                let resolved = format_raise_message(message, &rendered_args);
404                if matches!(level, RaiseLevel::Exception) {
405                    return Err(TriggerError::RaiseException {
406                        function: ctx.function.into(),
407                        message: resolved,
408                    });
409                }
410                // NOTICE / WARNING / INFO / LOG / DEBUG — log to
411                // stderr for v7.12.6. Wiring through the server's
412                // log channel is a v7.12.7+ polish item; the
413                // resolved message stays accessible regardless.
414                let _ = resolved;
415                let _ = level;
416            }
417            PlPgSqlStmt::EmbeddedSql(boxed_stmt) => {
418                // v7.12.7 — substitute NEW/OLD/locals into every
419                // Expr field of the statement, then queue for
420                // post-DML execution. The trigger interpreter
421                // doesn't call back into Engine::execute directly
422                // (that would deadlock the row-write mut borrow);
423                // the engine drains `deferred` after the firing
424                // INSERT/UPDATE/DELETE completes its main work.
425                let mut substituted = (**boxed_stmt).clone();
426                substitute_trigger_context_in_statement(
427                    &mut substituted,
428                    current_new.as_ref(),
429                    old_row,
430                    locals,
431                    ctx.columns,
432                )
433                .map_err(|cause| TriggerError::EvalFailed {
434                    function: ctx.function.into(),
435                    cause,
436                })?;
437                deferred.push(DeferredEmbeddedStmt {
438                    function: ctx.function.into(),
439                    stmt: substituted,
440                });
441            }
442        }
443    }
444    Ok(BodyOutcome::FellThrough)
445}
446
447fn resolve_return(
448    target: ReturnTarget,
449    current_new: Option<Row>,
450    old_row: Option<&Row>,
451) -> TriggerOutcome {
452    match target {
453        ReturnTarget::New => current_new.map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
454        ReturnTarget::Old => old_row
455            .cloned()
456            .map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
457        ReturnTarget::Null => TriggerOutcome::Skip,
458        // The scalar UDF surface in a later release handles
459        // RETURN <expr> properly; for now we fall through to Skip.
460        ReturnTarget::Expr(_) => TriggerOutcome::Skip,
461    }
462}
463
464#[allow(clippy::too_many_arguments)]
465fn init_locals_from_declarations(
466    decls: &[PlPgSqlDeclare],
467    locals: &mut BTreeMap<String, Value>,
468    new_row: Option<&Row>,
469    old_row: Option<&Row>,
470    columns: &[ColumnSchema],
471    table_name: &str,
472    params: &[Value],
473    default_text_search_config: Option<&str>,
474    function_name: &str,
475) -> Result<(), TriggerError> {
476    for d in decls {
477        let v = if let Some(init) = &d.default {
478            eval_with_new_old_and_locals(
479                init,
480                new_row,
481                old_row,
482                locals,
483                columns,
484                table_name,
485                params,
486                default_text_search_config,
487            )
488            .map_err(|cause| TriggerError::EvalFailed {
489                function: function_name.into(),
490                cause,
491            })?
492        } else {
493            Value::Null
494        };
495        locals.insert(d.name.clone(), v);
496    }
497    Ok(())
498}
499
500/// v7.12.6 — PG `%` format expansion for RAISE. Sequential
501/// positional substitution; `%%` produces a literal `%`.
502fn format_raise_message(fmt: &str, args: &[String]) -> String {
503    let mut out = String::with_capacity(fmt.len());
504    let mut iter = args.iter();
505    let mut chars = fmt.chars().peekable();
506    while let Some(c) = chars.next() {
507        if c == '%' {
508            match chars.peek() {
509                Some('%') => {
510                    out.push('%');
511                    chars.next();
512                }
513                _ => {
514                    if let Some(a) = iter.next() {
515                        out.push_str(a);
516                    } else {
517                        // Unconsumed placeholder — PG emits an
518                        // error here; we mirror by leaving the
519                        // bare `%` so the message stays readable.
520                        out.push('%');
521                    }
522                }
523            }
524        } else {
525            out.push(c);
526        }
527    }
528    out
529}
530
531/// v7.12.6 — Display rendering for a [`Value`] inside a RAISE
532/// message arg. Booleans / ints / floats render naturally;
533/// strings render unquoted; other types fall back to Debug.
534fn value_to_display_string(v: &Value) -> String {
535    use alloc::string::ToString;
536    match v {
537        Value::Null => String::new(),
538        Value::Bool(b) => b.to_string(),
539        Value::SmallInt(n) => n.to_string(),
540        Value::Int(n) => n.to_string(),
541        Value::BigInt(n) => n.to_string(),
542        Value::Float(x) => x.to_string(),
543        Value::Text(s) | Value::Json(s) => s.clone(),
544        other => format!("{other:?}"),
545    }
546}
547
548/// Evaluate a sub-expression against the NEW / OLD row context.
549/// Pre-walks the AST replacing every `NEW.col` / `OLD.col`
550/// reference with a literal of the actual value, then dispatches
551/// to the regular [`eval::eval_expr`]. Pre-walk strategy mirrors
552/// the existing [`substitute_in_expr`] used by correlated
553/// subqueries.
554/// v7.12.6 — same as [`eval_with_new_old`] but also substitutes
555/// qualifier-less `Column(<name>)` references whose name matches
556/// a `DECLARE`'d local variable. Locals shadow table-column refs
557/// (PG semantics — though a careful trigger function avoids the
558/// collision via naming convention).
559#[allow(clippy::too_many_arguments)]
560fn eval_with_new_old_and_locals(
561    expr: &Expr,
562    new_row: Option<&Row>,
563    old_row: Option<&Row>,
564    locals: &BTreeMap<String, Value>,
565    columns: &[ColumnSchema],
566    table_alias: &str,
567    params: &[Value],
568    default_text_search_config: Option<&str>,
569) -> Result<Value, EvalError> {
570    let mut rewritten = expr.clone();
571    substitute_locals(&mut rewritten, locals);
572    substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
573    let ctx = EvalContext::new(columns, Some(table_alias))
574        .with_params(params)
575        .with_default_text_search_config(default_text_search_config);
576    let empty = Row::new(Vec::new());
577    eval::eval_expr(&rewritten, &empty, &ctx)
578}
579
580/// v7.12.6 — in-place substitute every qualifier-less
581/// `Column(<name>)` whose name is in `locals` with that local's
582/// current Value as a literal. Runs before [`substitute_new_old`]
583/// so NEW.col / OLD.col references (which have a qualifier) take
584/// the NEW/OLD path normally.
585fn substitute_locals(expr: &mut Expr, locals: &BTreeMap<String, Value>) {
586    if let Expr::Column(c) = expr {
587        if c.qualifier.is_none()
588            && let Some(v) = locals.get(&c.name)
589        {
590            *expr = value_to_literal_expr(&[], 0, v.clone());
591            return;
592        }
593    }
594    match expr {
595        Expr::Binary { lhs, rhs, .. } => {
596            substitute_locals(lhs, locals);
597            substitute_locals(rhs, locals);
598        }
599        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
600            substitute_locals(expr, locals);
601        }
602        Expr::Like { expr, pattern, .. } => {
603            substitute_locals(expr, locals);
604            substitute_locals(pattern, locals);
605        }
606        Expr::FunctionCall { args, .. } => {
607            for a in args {
608                substitute_locals(a, locals);
609            }
610        }
611        Expr::Extract { source, .. } => substitute_locals(source, locals),
612        Expr::Array(items) => {
613            for elem in items {
614                substitute_locals(elem, locals);
615            }
616        }
617        Expr::ArraySubscript { target, index } => {
618            substitute_locals(target, locals);
619            substitute_locals(index, locals);
620        }
621        Expr::AnyAll { expr, array, .. } => {
622            substitute_locals(expr, locals);
623            substitute_locals(array, locals);
624        }
625        Expr::Literal(_)
626        | Expr::Placeholder(_)
627        | Expr::Column(_)
628        | Expr::WindowFunction { .. }
629        | Expr::ScalarSubquery(_)
630        | Expr::Exists { .. }
631        | Expr::InSubquery { .. } => {}
632    }
633}
634
635fn eval_with_new_old(
636    expr: &Expr,
637    new_row: Option<&Row>,
638    old_row: Option<&Row>,
639    columns: &[ColumnSchema],
640    table_alias: &str,
641    params: &[Value],
642    default_text_search_config: Option<&str>,
643) -> Result<Value, EvalError> {
644    let mut rewritten = expr.clone();
645    substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
646    let ctx = EvalContext::new(columns, Some(table_alias))
647        .with_params(params)
648        .with_default_text_search_config(default_text_search_config);
649    // Empty row — the substitution above eliminated every column
650    // reference that depended on NEW / OLD; any remaining column
651    // reference is a bug (would surface as ColumnNotFound).
652    let empty = Row::new(Vec::new());
653    eval::eval_expr(&rewritten, &empty, &ctx)
654}
655
656/// In-place walk: replace every `Column{qualifier=NEW|OLD,name=c}`
657/// reference with the corresponding row value, materialised as
658/// an `Expr::Literal`. Recurses through every Expr variant so
659/// `to_tsvector('english', NEW.subject || ' ' || NEW.sender)`
660/// substitutes cleanly even though the references nest inside
661/// function calls + binary operators.
662fn substitute_new_old(
663    expr: &mut Expr,
664    new_row: Option<&Row>,
665    old_row: Option<&Row>,
666    columns: &[ColumnSchema],
667) -> Result<(), EvalError> {
668    if let Expr::Column(c) = expr {
669        if let Some(q) = &c.qualifier {
670            let lower = q.to_ascii_lowercase();
671            if lower == "new" || lower == "old" {
672                let (row, side) = if lower == "new" {
673                    (new_row, "NEW")
674                } else {
675                    (old_row, "OLD")
676                };
677                let pos = columns
678                    .iter()
679                    .position(|sc| sc.name.eq_ignore_ascii_case(&c.name))
680                    .ok_or_else(|| EvalError::ColumnNotFound {
681                        name: format!("{side}.{}", c.name),
682                    })?;
683                let v = match row {
684                    Some(r) => r.values.get(pos).cloned().unwrap_or(Value::Null),
685                    None => Value::Null,
686                };
687                *expr = value_to_literal_expr(columns, pos, v);
688                return Ok(());
689            }
690        }
691    }
692    match expr {
693        Expr::Binary { lhs, rhs, .. } => {
694            substitute_new_old(lhs, new_row, old_row, columns)?;
695            substitute_new_old(rhs, new_row, old_row, columns)?;
696        }
697        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
698            substitute_new_old(expr, new_row, old_row, columns)?;
699        }
700        Expr::Like { expr, pattern, .. } => {
701            substitute_new_old(expr, new_row, old_row, columns)?;
702            substitute_new_old(pattern, new_row, old_row, columns)?;
703        }
704        Expr::FunctionCall { args, .. } => {
705            for a in args {
706                substitute_new_old(a, new_row, old_row, columns)?;
707            }
708        }
709        Expr::Extract { source, .. } => substitute_new_old(source, new_row, old_row, columns)?,
710        Expr::Array(items) => {
711            for elem in items {
712                substitute_new_old(elem, new_row, old_row, columns)?;
713            }
714        }
715        Expr::ArraySubscript { target, index } => {
716            substitute_new_old(target, new_row, old_row, columns)?;
717            substitute_new_old(index, new_row, old_row, columns)?;
718        }
719        Expr::AnyAll { expr, array, .. } => {
720            substitute_new_old(expr, new_row, old_row, columns)?;
721            substitute_new_old(array, new_row, old_row, columns)?;
722        }
723        // Leaves + variants we don't recurse into (sub-queries
724        // inside a trigger body would require correlated-query
725        // wiring; carved out of v7.12.4).
726        Expr::Literal(_)
727        | Expr::Placeholder(_)
728        | Expr::Column(_)
729        | Expr::WindowFunction { .. }
730        | Expr::ScalarSubquery(_)
731        | Expr::Exists { .. }
732        | Expr::InSubquery { .. } => {}
733    }
734    Ok(())
735}
736
737/// Turn a [`Value`] back into an [`Expr::Literal`]. Necessary
738/// because [`substitute_new_old`] inlines NEW/OLD cell values
739/// into the expression tree.
740fn value_to_literal_expr(_columns: &[ColumnSchema], _pos: usize, v: Value) -> Expr {
741    use spg_sql::ast::Literal;
742    let lit = match v {
743        Value::Null => Literal::Null,
744        Value::Bool(b) => Literal::Bool(b),
745        Value::SmallInt(n) => Literal::Integer(i64::from(n)),
746        Value::Int(n) => Literal::Integer(i64::from(n)),
747        Value::BigInt(n) => Literal::Integer(n),
748        Value::Float(x) => Literal::Float(x),
749        Value::Text(s) | Value::Json(s) => Literal::String(s),
750        // Other values (Vector, Date, Timestamp, TsVector, etc.)
751        // round-trip through the Display form back into a string
752        // literal. v7.12.5 will add typed-literal variants here
753        // so the cast layer doesn't need to re-parse from text.
754        other => Literal::String(format!("{other:?}")),
755    };
756    Expr::Literal(lit)
757}
758
759/// v7.12.7 — substitute NEW / OLD / DECLARE-local references in
760/// every `Expr` field of a [`Statement`]. Used to materialise an
761/// embedded SQL statement's NEW.col / OLD.col / local-var refs as
762/// literals so the engine can re-execute it without holding the
763/// trigger context.
764fn substitute_trigger_context_in_statement(
765    stmt: &mut spg_sql::ast::Statement,
766    new_row: Option<&Row>,
767    old_row: Option<&Row>,
768    locals: &BTreeMap<String, Value>,
769    columns: &[ColumnSchema],
770) -> Result<(), EvalError> {
771    use spg_sql::ast::Statement;
772    let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
773        substitute_locals(e, locals);
774        substitute_new_old(e, new_row, old_row, columns)?;
775        Ok(())
776    };
777    match stmt {
778        Statement::Insert(s) => {
779            for tuple in &mut s.rows {
780                for e in tuple {
781                    walk(e)?;
782                }
783            }
784        }
785        Statement::Update(s) => {
786            for (_col, e) in &mut s.assignments {
787                walk(e)?;
788            }
789            if let Some(w) = &mut s.where_ {
790                walk(w)?;
791            }
792        }
793        Statement::Delete(s) => {
794            if let Some(w) = &mut s.where_ {
795                walk(w)?;
796            }
797        }
798        Statement::Select(s) => {
799            substitute_trigger_context_in_select(s, new_row, old_row, locals, columns)?
800        }
801        // Other statement kinds (DDL, SHOW, etc.) inside a
802        // trigger body would only meaningfully reference NEW/OLD
803        // in error-message position; v7.12.7 doesn't recursively
804        // substitute their Expr fields. Future surfaces (e.g.
805        // RAISE ... USING) can add cases here.
806        _ => {}
807    }
808    Ok(())
809}
810
811fn substitute_trigger_context_in_select(
812    s: &mut spg_sql::ast::SelectStatement,
813    new_row: Option<&Row>,
814    old_row: Option<&Row>,
815    locals: &BTreeMap<String, Value>,
816    columns: &[ColumnSchema],
817) -> Result<(), EvalError> {
818    use spg_sql::ast::SelectItem;
819    let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
820        substitute_locals(e, locals);
821        substitute_new_old(e, new_row, old_row, columns)?;
822        Ok(())
823    };
824    for item in &mut s.items {
825        if let SelectItem::Expr { expr, .. } = item {
826            walk(expr)?;
827        }
828    }
829    if let Some(w) = &mut s.where_ {
830        walk(w)?;
831    }
832    if let Some(group_by) = &mut s.group_by {
833        for g in group_by {
834            walk(g)?;
835        }
836    }
837    if let Some(h) = &mut s.having {
838        walk(h)?;
839    }
840    for ob in &mut s.order_by {
841        walk(&mut ob.expr)?;
842    }
843    // LIMIT / OFFSET use `LimitExpr` (integer literal or
844    // placeholder); they don't carry an `Expr` to substitute
845    // into. Leave them alone.
846    let _ = &s.limit;
847    let _ = &s.offset;
848    Ok(())
849}
850
851/// v7.12.4 — find the triggers that should fire for a given
852/// `(table, event, timing)` tuple. Returns names so the caller
853/// can iterate without holding a borrow on the catalog while it
854/// mutates rows.
855pub fn matching_trigger_names<'a>(
856    triggers: &'a [TriggerDef],
857    table: &str,
858    event: &str,
859    timing: &str,
860) -> Vec<&'a TriggerDef> {
861    triggers
862        .iter()
863        .filter(|t| {
864            t.table == table
865                && t.timing.eq_ignore_ascii_case(timing)
866                && t.for_each.eq_ignore_ascii_case("row")
867                && t.events.iter().any(|e| e.eq_ignore_ascii_case(event))
868        })
869        .collect()
870}