Skip to main content

spg_engine/
triggers.rs

1//! v7.12.4 — PL/pgSQL row-level trigger executor.
2//!
3//! The catalogued [`spg_storage::FunctionDef`] carries the trigger
4//! function's source body as raw text (between the original
5//! `$$ ... $$`). Each time a trigger fires we re-parse the body
6//! via `spg_sql::parse_function_body` and walk the resulting
7//! [`spg_sql::ast::PlPgSqlBlock`] against a NEW / OLD row context.
8//!
9//! v7.12.4 surface (the minimum that lets a mailrs-shape
10//! `update_search_vector` trigger run end-to-end):
11//!
12//!   * `NEW.col := <expr>;`     — mutate a NEW cell. BEFORE only.
13//!   * `RETURN NEW;`            — pass the (possibly-mutated) row
14//!                                back to the row writer.
15//!   * `RETURN OLD;`            — return the pre-change row.
16//!   * `RETURN NULL;` / `RETURN;` — skip the write (BEFORE) or
17//!                                no-op the notification (AFTER).
18//!   * sub-expression eval recurses through the regular
19//!     [`crate::eval::eval_expr`] so anything the SELECT executor
20//!     can compute is fair game inside a trigger body.
21//!
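//! Out of scope for v7.12.4 itself; most of these have since landed,
//! as the version-tagged comments further down this file record:
//!
//!   * `DECLARE`'d local variables (v7.12.6)
//!   * `IF / ELSIF / ELSE / END IF;` control flow (handled by the body walker)
//!   * Embedded SQL statements (`UPDATE … WHERE …` deferred since v7.12.7;
//!     `SELECT … INTO var` inside DO blocks since v7.16.2)
//!   * `RAISE NOTICE / RAISE EXCEPTION` (v7.12.6)
//!   * Loop constructs (not handled by the body walker in this file)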
29
30use alloc::collections::BTreeMap;
31use alloc::format;
32use alloc::string::String;
33use alloc::vec::Vec;
34use core::fmt;
35
36use spg_sql::ast::{AssignTarget, Expr, PlPgSqlDeclare, PlPgSqlStmt, RaiseLevel, ReturnTarget};
37use spg_storage::{ColumnSchema, FunctionDef, Row, StorageError, TriggerDef, Value};
38
39use crate::eval::{self, EvalContext, EvalError};
40use crate::{CancelToken, Engine, EngineError, MAX_TRIGGER_RECURSION};
41
/// v7.12.7 — embedded SQL statement collected during a trigger
/// fire, queued for execution after the firing DML completes.
/// NEW / OLD / DECLARE-local references inside the statement's
/// Expr tree have already been substituted with literals; the
/// engine just feeds it to `execute_stmt_with_cancel`.
///
/// The body walker pushes one of these for every embedded SQL
/// statement it reaches, in the order the statements are reached.
#[derive(Debug, Clone, PartialEq)]
pub struct DeferredEmbeddedStmt {
    /// Name of the function whose body contained the statement
    /// (the DO-block walker uses the fixed label `"DO"`). Used to
    /// label recursion errors precisely.
    pub function: String,
    /// Substituted statement, ready to execute.
    pub stmt: spg_sql::ast::Statement,
}
55
/// What the trigger function returned. Drives the row-write path
/// the trigger fired from.
#[derive(Debug, Clone, PartialEq)]
pub enum TriggerOutcome {
    /// `RETURN NEW;` (or `RETURN OLD;`) when the corresponding row
    /// exists — write this row.
    /// For BEFORE triggers, the row may differ from the input
    /// (e.g. `NEW.search_vector := …` rewrote a cell). For AFTER
    /// triggers, the value is currently ignored — but we still
    /// surface it for symmetric callers / future v7.12.5 use.
    Row(Row),
    /// `RETURN NULL;`, `RETURN <expr>;`, a body that fell off the
    /// end, or `RETURN NEW;` / `RETURN OLD;` when that row is not
    /// available. For a BEFORE trigger, the row writer must skip
    /// the affected row. For an AFTER trigger, no-op.
    Skip,
}
71
/// Error type the trigger executor exposes. Wraps `EvalError`
/// at the eval-of-expressions layer and adds trigger-specific
/// failure modes (`OLD.col := …`, unsupported PL/pgSQL feature,
/// body that fails to re-parse, …). Every variant carries the
/// name of the function that was running so messages can be
/// attributed precisely.
#[derive(Debug, Clone, PartialEq)]
pub enum TriggerError {
    /// Body source stored in the catalog can't be re-parsed.
    /// Usually means the function was created against a newer
    /// PL/pgSQL surface than the running engine knows about.
    /// `detail` is the parser error rendered via `Display`.
    UnparseableBody { function: String, detail: String },
    /// The function asks for something this executor does not
    /// handle. Raised, among other places, for a function whose
    /// LANGUAGE is not `plpgsql`, for `NEW.col := …` when there
    /// is no NEW row, and for `SELECT … INTO` when no resolver is
    /// registered. The error names the construct so the operator
    /// can plan around it until the feature lands.
    UnsupportedConstruct { function: String, detail: String },
    /// `OLD.col := <expr>` inside the body. PG itself rejects
    /// this; we surface a clear message rather than silently
    /// dropping the assignment.
    OldIsReadOnly { function: String, column: String },
    /// `NEW.col := <expr>` in an AFTER trigger — same rationale
    /// as OLD: PG enforces "NEW is read-only after the row has
    /// been written" and we mirror.
    NewReadOnlyInAfterTrigger { function: String, column: String },
    /// `NEW.col := <expr>` against a non-existent column (the
    /// name is matched ASCII-case-insensitively against the
    /// table schema). Usually a schema-drift bug.
    UnknownColumn {
        function: String,
        column: String,
        table: String,
    },
    /// Sub-expression eval inside the trigger body failed. The
    /// wrapped [`EvalError`] explains the underlying cause
    /// (`ColumnNotFound`, `TypeMismatch`, …).
    EvalFailed { function: String, cause: EvalError },
    /// v7.12.6 — `RAISE EXCEPTION '<message>' [, args]*` in the
    /// trigger body. The interpreter formats the args into the
    /// message via PG-style `%` substitution and surfaces the
    /// resolved text up to the caller.
    RaiseException { function: String, message: String },
}
112
113impl fmt::Display for TriggerError {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        match self {
116            Self::UnparseableBody { function, detail } => {
117                write!(
118                    f,
119                    "trigger function {function:?} body did not parse: {detail}"
120                )
121            }
122            Self::UnsupportedConstruct { function, detail } => {
123                write!(
124                    f,
125                    "trigger function {function:?} uses an unsupported PL/pgSQL construct: {detail}"
126                )
127            }
128            Self::OldIsReadOnly { function, column } => {
129                write!(
130                    f,
131                    "trigger function {function:?}: cannot assign to OLD.{column} (OLD is read-only — PG rule)"
132                )
133            }
134            Self::NewReadOnlyInAfterTrigger { function, column } => {
135                write!(
136                    f,
137                    "trigger function {function:?}: cannot assign to NEW.{column} inside an AFTER trigger \
138                     (NEW is read-only post-write — use BEFORE triggers for mutation, or an embedded UPDATE statement \
139                      in v7.12.5+)"
140                )
141            }
142            Self::UnknownColumn {
143                function,
144                column,
145                table,
146            } => {
147                write!(
148                    f,
149                    "trigger function {function:?}: target column {column:?} not in table {table:?} schema"
150                )
151            }
152            Self::EvalFailed { function, cause } => {
153                write!(
154                    f,
155                    "trigger function {function:?}: expression eval failed: {cause}"
156                )
157            }
158            Self::RaiseException { function, message } => {
159                write!(
160                    f,
161                    "trigger function {function:?}: RAISE EXCEPTION {message:?}"
162                )
163            }
164        }
165    }
166}
167
168/// Fire a single row-level trigger.
169///
170/// `is_after` is true for AFTER triggers; the executor enforces
171/// "NEW is read-only" by rejecting NEW.col assignments in that
172/// case. AFTER trigger return values are ignored by callers; the
173/// returned [`TriggerOutcome`] just carries the (possibly
174/// untouched) NEW row for symmetry.
175#[allow(clippy::too_many_arguments)] // the table_name / columns / params /
176// ts-config trio are independent; folding
177// them into a struct just shuffles the
178// boilerplate to the call sites without
179// material gain.
180pub fn fire_row_trigger(
181    function: &FunctionDef,
182    new_row: Option<Row>,
183    old_row: Option<&Row>,
184    table_name: &str,
185    columns: &[ColumnSchema],
186    params: &[Value],
187    default_text_search_config: Option<&str>,
188    is_after: bool,
189) -> Result<(TriggerOutcome, Vec<DeferredEmbeddedStmt>), TriggerError> {
190    if !function.language.eq_ignore_ascii_case("plpgsql") {
191        return Err(TriggerError::UnsupportedConstruct {
192            function: function.name.clone(),
193            detail: format!(
194                "v7.12.4 only invokes LANGUAGE plpgsql trigger functions; \
195                 {:?} declares LANGUAGE {}",
196                function.name, function.language
197            ),
198        });
199    }
200    let block = spg_sql::parse_function_body(&function.body).map_err(|e| {
201        TriggerError::UnparseableBody {
202            function: function.name.clone(),
203            detail: format!("{e}"),
204        }
205    })?;
206    // v7.12.6 — initialise local variable scope from the DECLARE
207    // block. Each init expr (if any) evaluates against the
208    // so-far-bound scope + the NEW/OLD context, so later DECLAREs
209    // can reference earlier ones.
210    let mut locals: BTreeMap<String, Value> = BTreeMap::new();
211    init_locals_from_declarations(
212        &block.declarations,
213        &mut locals,
214        new_row.as_ref(),
215        old_row,
216        columns,
217        table_name,
218        params,
219        default_text_search_config,
220        &function.name,
221    )?;
222    let mut current_new = new_row;
223    let ctx = BodyCtx {
224        function: &function.name,
225        table_name,
226        columns,
227        params,
228        default_text_search_config,
229        is_after,
230        select_into_resolver: None,
231    };
232    let mut deferred: Vec<DeferredEmbeddedStmt> = Vec::new();
233    let outcome = match execute_stmts(
234        &block.statements,
235        &mut current_new,
236        old_row,
237        &mut locals,
238        &ctx,
239        &mut deferred,
240    )? {
241        BodyOutcome::Return(target) => resolve_return(target, current_new, old_row),
242        // Body fell off without an explicit RETURN. PL/pgSQL
243        // default is `RETURN NULL`; we mirror — the BEFORE
244        // trigger then skips the row.
245        BodyOutcome::FellThrough => TriggerOutcome::Skip,
246    };
247    Ok((outcome, deferred))
248}
249
/// v7.12.6 — body-walk return signal. `Return(target)` short-
/// circuits the caller; `FellThrough` means the statement list
/// completed without a RETURN, equivalent to PL/pgSQL's implicit
/// `RETURN NULL`.
enum BodyOutcome {
    /// A `RETURN` statement was reached; carries a clone of its
    /// target. Nested IF / ELSE bodies propagate this upward
    /// unchanged, so no later statement runs.
    Return(ReturnTarget),
    /// Every statement in the list ran and none of them returned.
    FellThrough,
}
258
/// Shared parameters every body-stmt evaluation needs. Bundled so
/// the recursive `execute_stmts` doesn't have to thread eight
/// individual `&str` / `&[…]` args around.
struct BodyCtx<'a> {
    /// Name used to label every error raised during the walk
    /// (the DO-block path passes the fixed label `"DO"`).
    function: &'a str,
    /// Table the trigger fired on; handed to the expression
    /// evaluator as the table alias and reported in
    /// `UnknownColumn` errors. Empty for DO blocks.
    table_name: &'a str,
    /// Schema used to resolve `NEW.col` assignment targets to a
    /// cell position and to substitute NEW / OLD references.
    columns: &'a [ColumnSchema],
    /// Parameter values forwarded to the expression evaluator.
    params: &'a [Value],
    /// Text-search configuration forwarded to the expression
    /// evaluator, if any.
    default_text_search_config: Option<&'a str>,
    /// True for AFTER triggers: `NEW.col := …` is rejected.
    is_after: bool,
    /// v7.16.2 — synchronous SELECT … INTO resolver. Provided
    /// by `Engine::exec_do_block` so the walker can run a
    /// SELECT against the engine right when SelectInto is
    /// reached (so subsequent IF reads of the local see the
    /// fresh value). `None` for trigger paths where SelectInto
    /// isn't yet supported.
    select_into_resolver: Option<&'a SelectIntoResolver<'a>>,
}
277
/// v7.16.2 — callback shape the DO-block executor registers
/// on `BodyCtx`. Runs the supplied SELECT statement against
/// the engine, returns the first row's first column.
///
/// The walker hands it the statement after NEW / OLD / local
/// references have been substituted, stores the returned
/// [`Value`] into the target local, and propagates any
/// [`TriggerError`] it returns unchanged.
pub type SelectIntoResolver<'a> =
    dyn Fn(&spg_sql::ast::Statement) -> Result<Value, TriggerError> + 'a;
283
284fn execute_stmts(
285    stmts: &[PlPgSqlStmt],
286    current_new: &mut Option<Row>,
287    old_row: Option<&Row>,
288    locals: &mut BTreeMap<String, Value>,
289    ctx: &BodyCtx<'_>,
290    deferred: &mut Vec<DeferredEmbeddedStmt>,
291) -> Result<BodyOutcome, TriggerError> {
292    for stmt in stmts {
293        match stmt {
294            PlPgSqlStmt::Assign { target, value } => {
295                let evaluated = eval_with_new_old_and_locals(
296                    value,
297                    current_new.as_ref(),
298                    old_row,
299                    locals,
300                    ctx.columns,
301                    ctx.table_name,
302                    ctx.params,
303                    ctx.default_text_search_config,
304                )
305                .map_err(|cause| TriggerError::EvalFailed {
306                    function: ctx.function.into(),
307                    cause,
308                })?;
309                match target {
310                    AssignTarget::NewColumn(col) => {
311                        if ctx.is_after {
312                            return Err(TriggerError::NewReadOnlyInAfterTrigger {
313                                function: ctx.function.into(),
314                                column: col.clone(),
315                            });
316                        }
317                        let pos = ctx
318                            .columns
319                            .iter()
320                            .position(|c| c.name.eq_ignore_ascii_case(col))
321                            .ok_or_else(|| TriggerError::UnknownColumn {
322                                function: ctx.function.into(),
323                                column: col.clone(),
324                                table: alloc::string::ToString::to_string(&ctx.table_name),
325                            })?;
326                        let row = current_new.as_mut().ok_or_else(|| {
327                            TriggerError::UnsupportedConstruct {
328                                function: ctx.function.into(),
329                                detail: format!(
330                                    "NEW.{col} := … requires a NEW row context \
331                                     (BEFORE INSERT / UPDATE only — not available on DELETE)"
332                                ),
333                            }
334                        })?;
335                        row.values[pos] = evaluated;
336                    }
337                    AssignTarget::OldColumn(col) => {
338                        return Err(TriggerError::OldIsReadOnly {
339                            function: ctx.function.into(),
340                            column: col.clone(),
341                        });
342                    }
343                    AssignTarget::Local(name) => {
344                        // v7.12.6 — write into the DECLARE scope.
345                        // Loose-typing: we don't enforce the
346                        // declared type at runtime (PG's INTO
347                        // coerces; v7.12.6 just stores the
348                        // evaluated Value as-is). Type coercion
349                        // tightens in a later release.
350                        locals.insert(name.clone(), evaluated);
351                    }
352                }
353            }
354            PlPgSqlStmt::Return(target) => {
355                return Ok(BodyOutcome::Return(target.clone()));
356            }
357            PlPgSqlStmt::If {
358                branches,
359                else_branch,
360            } => {
361                let mut matched = false;
362                for (cond_expr, body) in branches {
363                    let cond_val = eval_with_new_old_and_locals(
364                        cond_expr,
365                        current_new.as_ref(),
366                        old_row,
367                        locals,
368                        ctx.columns,
369                        ctx.table_name,
370                        ctx.params,
371                        ctx.default_text_search_config,
372                    )
373                    .map_err(|cause| TriggerError::EvalFailed {
374                        function: ctx.function.into(),
375                        cause,
376                    })?;
377                    if matches!(cond_val, Value::Bool(true)) {
378                        matched = true;
379                        match execute_stmts(body, current_new, old_row, locals, ctx, deferred)? {
380                            BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
381                            BodyOutcome::FellThrough => {}
382                        }
383                        break;
384                    }
385                }
386                if !matched && !else_branch.is_empty() {
387                    match execute_stmts(else_branch, current_new, old_row, locals, ctx, deferred)? {
388                        BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
389                        BodyOutcome::FellThrough => {}
390                    }
391                }
392            }
393            PlPgSqlStmt::Raise {
394                level,
395                message,
396                args,
397            } => {
398                // Resolve every %-format placeholder by evaluating
399                // each arg expression and rendering its Value.
400                let mut rendered_args: Vec<String> = Vec::with_capacity(args.len());
401                for a in args {
402                    let v = eval_with_new_old_and_locals(
403                        a,
404                        current_new.as_ref(),
405                        old_row,
406                        locals,
407                        ctx.columns,
408                        ctx.table_name,
409                        ctx.params,
410                        ctx.default_text_search_config,
411                    )
412                    .map_err(|cause| TriggerError::EvalFailed {
413                        function: ctx.function.into(),
414                        cause,
415                    })?;
416                    rendered_args.push(value_to_display_string(&v));
417                }
418                let resolved = format_raise_message(message, &rendered_args);
419                if matches!(level, RaiseLevel::Exception) {
420                    return Err(TriggerError::RaiseException {
421                        function: ctx.function.into(),
422                        message: resolved,
423                    });
424                }
425                // NOTICE / WARNING / INFO / LOG / DEBUG — log to
426                // stderr for v7.12.6. Wiring through the server's
427                // log channel is a v7.12.7+ polish item; the
428                // resolved message stays accessible regardless.
429                let _ = resolved;
430                let _ = level;
431            }
432            PlPgSqlStmt::SelectInto { var, body } => {
433                // v7.16.2 — execute via the engine callback the
434                // caller (Engine::exec_do_block) registered on
435                // ctx, assign the result to the local. Trigger
436                // path (no callback) errors loudly: SELECT INTO
437                // doesn't fit in a row-write loop.
438                let mut substituted = spg_sql::ast::Statement::Select((**body).clone());
439                substitute_trigger_context_in_statement(
440                    &mut substituted,
441                    current_new.as_ref(),
442                    old_row,
443                    locals,
444                    ctx.columns,
445                )
446                .map_err(|cause| TriggerError::EvalFailed {
447                    function: ctx.function.into(),
448                    cause,
449                })?;
450                let resolver =
451                    ctx.select_into_resolver.ok_or_else(|| TriggerError::UnsupportedConstruct {
452                        function: ctx.function.into(),
453                        detail: alloc::format!(
454                            "SELECT … INTO {var}: only supported inside DO blocks (not trigger bodies) in v7.16.2"
455                        ),
456                    })?;
457                let value = resolver(&substituted)?;
458                locals.insert(var.clone(), value);
459            }
460            PlPgSqlStmt::EmbeddedSql(boxed_stmt) => {
461                // v7.12.7 — substitute NEW/OLD/locals into every
462                // Expr field of the statement, then queue for
463                // post-DML execution. The trigger interpreter
464                // doesn't call back into Engine::execute directly
465                // (that would deadlock the row-write mut borrow);
466                // the engine drains `deferred` after the firing
467                // INSERT/UPDATE/DELETE completes its main work.
468                let mut substituted = (**boxed_stmt).clone();
469                substitute_trigger_context_in_statement(
470                    &mut substituted,
471                    current_new.as_ref(),
472                    old_row,
473                    locals,
474                    ctx.columns,
475                )
476                .map_err(|cause| TriggerError::EvalFailed {
477                    function: ctx.function.into(),
478                    cause,
479                })?;
480                deferred.push(DeferredEmbeddedStmt {
481                    function: ctx.function.into(),
482                    stmt: substituted,
483                });
484            }
485        }
486    }
487    Ok(BodyOutcome::FellThrough)
488}
489
490/// v7.16.2 — execute a DO block's PlPgSqlBlock at top level.
491/// Different from `fire_row_trigger` in three ways:
492///   1. No NEW/OLD row context — DO blocks aren't row-scoped.
493///   2. EmbeddedSql statements collected into the returned vec
494///      so the caller (`Engine::exec_do_block`) can dispatch
495///      them via `Engine::execute_in_with_cancel` IMMEDIATELY,
496///      not defer. Triggers defer because they fire inside a
497///      row-write `&mut Catalog` borrow; DO has no such borrow.
498///   3. Embedded condition Expr (e.g. `IF EXISTS (SELECT ...)`)
499///      evaluation happens inline against the engine's
500///      current state — the caller resolves the subquery
501///      result before walking the body. We do that by
502///      collecting the IF / Assign / RAISE statements and
503///      letting the caller-side evaluator decide; v7.16.2's
504///      simple path lets `eval_with_new_old_and_locals` do
505///      it inline, falling back to the embedded sub-engine
506///      for SELECT subqueries via the regular eval path.
507///
508/// Returns the deferred SQL list in execution order. Errors
509/// from the walk propagate verbatim (parse / eval / engine).
510pub fn execute_do_block_top_level<'a>(
511    block: &spg_sql::ast::PlPgSqlBlock,
512    default_text_search_config: Option<&'a str>,
513    select_into_resolver: Option<&'a SelectIntoResolver<'a>>,
514) -> Result<Vec<spg_sql::ast::Statement>, TriggerError> {
515    let mut locals: BTreeMap<String, Value> = BTreeMap::new();
516    let empty_cols: &[ColumnSchema] = &[];
517    init_locals_from_declarations(
518        &block.declarations,
519        &mut locals,
520        None,
521        None,
522        empty_cols,
523        "",
524        &[],
525        default_text_search_config,
526        "DO",
527    )?;
528    let ctx = BodyCtx {
529        function: "DO",
530        table_name: "",
531        columns: empty_cols,
532        params: &[],
533        default_text_search_config,
534        is_after: false,
535        select_into_resolver,
536    };
537    let mut current_new: Option<Row> = None;
538    let mut deferred: Vec<DeferredEmbeddedStmt> = Vec::new();
539    // execute_stmts returns BodyOutcome — for DO top-level we
540    // ignore the return target (RETURN inside DO is a no-op
541    // by PG semantics: the block's outer scope has no return
542    // contract).
543    let _ = execute_stmts(
544        &block.statements,
545        &mut current_new,
546        None,
547        &mut locals,
548        &ctx,
549        &mut deferred,
550    )?;
551    Ok(deferred.into_iter().map(|d| d.stmt).collect())
552}
553
554fn resolve_return(
555    target: ReturnTarget,
556    current_new: Option<Row>,
557    old_row: Option<&Row>,
558) -> TriggerOutcome {
559    match target {
560        ReturnTarget::New => current_new.map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
561        ReturnTarget::Old => old_row
562            .cloned()
563            .map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
564        ReturnTarget::Null => TriggerOutcome::Skip,
565        // The scalar UDF surface in a later release handles
566        // RETURN <expr> properly; for now we fall through to Skip.
567        ReturnTarget::Expr(_) => TriggerOutcome::Skip,
568    }
569}
570
571#[allow(clippy::too_many_arguments)]
572fn init_locals_from_declarations(
573    decls: &[PlPgSqlDeclare],
574    locals: &mut BTreeMap<String, Value>,
575    new_row: Option<&Row>,
576    old_row: Option<&Row>,
577    columns: &[ColumnSchema],
578    table_name: &str,
579    params: &[Value],
580    default_text_search_config: Option<&str>,
581    function_name: &str,
582) -> Result<(), TriggerError> {
583    for d in decls {
584        let v = if let Some(init) = &d.default {
585            eval_with_new_old_and_locals(
586                init,
587                new_row,
588                old_row,
589                locals,
590                columns,
591                table_name,
592                params,
593                default_text_search_config,
594            )
595            .map_err(|cause| TriggerError::EvalFailed {
596                function: function_name.into(),
597                cause,
598            })?
599        } else {
600            Value::Null
601        };
602        locals.insert(d.name.clone(), v);
603    }
604    Ok(())
605}
606
/// v7.12.6 — PG-style `%` expansion for RAISE.
///
/// Scans `fmt` left to right. `%%` emits a single literal `%`; any
/// other `%` is replaced by the next unused entry of `args`. When
/// the arguments run out, the placeholder is left in the output as a
/// bare `%` so the message stays readable (PG rejects that case
/// outright; this function does not). Surplus arguments are ignored.
fn format_raise_message(fmt: &str, args: &[String]) -> String {
    let mut rendered = String::with_capacity(fmt.len());
    let mut pending = args.iter();
    let mut cursor = fmt.chars().peekable();
    while let Some(ch) = cursor.next() {
        if ch != '%' {
            rendered.push(ch);
            continue;
        }
        // Escaped percent sign: swallow the second `%`.
        if cursor.next_if_eq(&'%').is_some() {
            rendered.push('%');
            continue;
        }
        match pending.next() {
            Some(arg) => rendered.push_str(arg),
            // Placeholder without a matching argument.
            None => rendered.push('%'),
        }
    }
    rendered
}
637
638/// v7.12.6 — Display rendering for a [`Value`] inside a RAISE
639/// message arg. Booleans / ints / floats render naturally;
640/// strings render unquoted; other types fall back to Debug.
641fn value_to_display_string(v: &Value) -> String {
642    use alloc::string::ToString;
643    match v {
644        Value::Null => String::new(),
645        Value::Bool(b) => b.to_string(),
646        Value::SmallInt(n) => n.to_string(),
647        Value::Int(n) => n.to_string(),
648        Value::BigInt(n) => n.to_string(),
649        Value::Float(x) => x.to_string(),
650        Value::Text(s) | Value::Json(s) => s.clone(),
651        other => format!("{other:?}"),
652    }
653}
654
655/// Evaluate a sub-expression against the NEW / OLD row context.
656/// Pre-walks the AST replacing every `NEW.col` / `OLD.col`
657/// reference with a literal of the actual value, then dispatches
658/// to the regular [`eval::eval_expr`]. Pre-walk strategy mirrors
659/// the existing [`substitute_in_expr`] used by correlated
660/// subqueries.
661/// v7.12.6 — same as [`eval_with_new_old`] but also substitutes
662/// qualifier-less `Column(<name>)` references whose name matches
663/// a `DECLARE`'d local variable. Locals shadow table-column refs
664/// (PG semantics — though a careful trigger function avoids the
665/// collision via naming convention).
666#[allow(clippy::too_many_arguments)]
667fn eval_with_new_old_and_locals(
668    expr: &Expr,
669    new_row: Option<&Row>,
670    old_row: Option<&Row>,
671    locals: &BTreeMap<String, Value>,
672    columns: &[ColumnSchema],
673    table_alias: &str,
674    params: &[Value],
675    default_text_search_config: Option<&str>,
676) -> Result<Value, EvalError> {
677    let mut rewritten = expr.clone();
678    substitute_locals(&mut rewritten, locals);
679    substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
680    let ctx = EvalContext::new(columns, Some(table_alias))
681        .with_params(params)
682        .with_default_text_search_config(default_text_search_config);
683    let empty = Row::new(Vec::new());
684    eval::eval_expr(&rewritten, &empty, &ctx)
685}
686
687/// v7.12.6 — in-place substitute every qualifier-less
688/// `Column(<name>)` whose name is in `locals` with that local's
689/// current Value as a literal. Runs before [`substitute_new_old`]
690/// so NEW.col / OLD.col references (which have a qualifier) take
691/// the NEW/OLD path normally.
692fn substitute_locals(expr: &mut Expr, locals: &BTreeMap<String, Value>) {
693    if let Expr::Column(c) = expr {
694        if c.qualifier.is_none()
695            && let Some(v) = locals.get(&c.name)
696        {
697            *expr = value_to_literal_expr(&[], 0, v.clone());
698            return;
699        }
700    }
701    match expr {
702        Expr::AggregateOrdered { call, order_by, .. } => {
703            substitute_locals(call, locals);
704            for o in order_by.iter_mut() {
705                substitute_locals(&mut o.expr, locals);
706            }
707        }
708        Expr::Binary { lhs, rhs, .. } => {
709            substitute_locals(lhs, locals);
710            substitute_locals(rhs, locals);
711        }
712        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
713            substitute_locals(expr, locals);
714        }
715        Expr::Like { expr, pattern, .. } => {
716            substitute_locals(expr, locals);
717            substitute_locals(pattern, locals);
718        }
719        Expr::FunctionCall { args, .. } => {
720            for a in args {
721                substitute_locals(a, locals);
722            }
723        }
724        Expr::Extract { source, .. } => substitute_locals(source, locals),
725        Expr::Array(items) => {
726            for elem in items {
727                substitute_locals(elem, locals);
728            }
729        }
730        Expr::ArraySubscript { target, index } => {
731            substitute_locals(target, locals);
732            substitute_locals(index, locals);
733        }
734        Expr::AnyAll { expr, array, .. } => {
735            substitute_locals(expr, locals);
736            substitute_locals(array, locals);
737        }
738        Expr::InList { expr, list, .. } => {
739            substitute_locals(expr, locals);
740            for item in list {
741                substitute_locals(item, locals);
742            }
743        }
744        Expr::Case {
745            operand,
746            branches,
747            else_branch,
748        } => {
749            if let Some(o) = operand {
750                substitute_locals(o, locals);
751            }
752            for (w, t) in branches {
753                substitute_locals(w, locals);
754                substitute_locals(t, locals);
755            }
756            if let Some(e) = else_branch {
757                substitute_locals(e, locals);
758            }
759        }
760        Expr::Literal(_)
761        | Expr::Placeholder(_)
762        | Expr::Column(_)
763        | Expr::WindowFunction { .. }
764        | Expr::ScalarSubquery(_)
765        | Expr::Exists { .. }
766        | Expr::InSubquery { .. } => {}
767    }
768}
769
770fn eval_with_new_old(
771    expr: &Expr,
772    new_row: Option<&Row>,
773    old_row: Option<&Row>,
774    columns: &[ColumnSchema],
775    table_alias: &str,
776    params: &[Value],
777    default_text_search_config: Option<&str>,
778) -> Result<Value, EvalError> {
779    let mut rewritten = expr.clone();
780    substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
781    let ctx = EvalContext::new(columns, Some(table_alias))
782        .with_params(params)
783        .with_default_text_search_config(default_text_search_config);
784    // Empty row — the substitution above eliminated every column
785    // reference that depended on NEW / OLD; any remaining column
786    // reference is a bug (would surface as ColumnNotFound).
787    let empty = Row::new(Vec::new());
788    eval::eval_expr(&rewritten, &empty, &ctx)
789}
790
791/// In-place walk: replace every `Column{qualifier=NEW|OLD,name=c}`
792/// reference with the corresponding row value, materialised as
793/// an `Expr::Literal`. Recurses through every Expr variant so
794/// `to_tsvector('english', NEW.subject || ' ' || NEW.sender)`
795/// substitutes cleanly even though the references nest inside
796/// function calls + binary operators.
797fn substitute_new_old(
798    expr: &mut Expr,
799    new_row: Option<&Row>,
800    old_row: Option<&Row>,
801    columns: &[ColumnSchema],
802) -> Result<(), EvalError> {
803    if let Expr::Column(c) = expr {
804        if let Some(q) = &c.qualifier {
805            let lower = q.to_ascii_lowercase();
806            if lower == "new" || lower == "old" {
807                let (row, side) = if lower == "new" {
808                    (new_row, "NEW")
809                } else {
810                    (old_row, "OLD")
811                };
812                let pos = columns
813                    .iter()
814                    .position(|sc| sc.name.eq_ignore_ascii_case(&c.name))
815                    .ok_or_else(|| EvalError::ColumnNotFound {
816                        name: format!("{side}.{}", c.name),
817                    })?;
818                let v = match row {
819                    Some(r) => r.values.get(pos).cloned().unwrap_or(Value::Null),
820                    None => Value::Null,
821                };
822                *expr = value_to_literal_expr(columns, pos, v);
823                return Ok(());
824            }
825        }
826    }
827    match expr {
828        Expr::AggregateOrdered { call, order_by, .. } => {
829            substitute_new_old(call, new_row, old_row, columns)?;
830            for o in order_by.iter_mut() {
831                substitute_new_old(&mut o.expr, new_row, old_row, columns)?;
832            }
833        }
834        Expr::Binary { lhs, rhs, .. } => {
835            substitute_new_old(lhs, new_row, old_row, columns)?;
836            substitute_new_old(rhs, new_row, old_row, columns)?;
837        }
838        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
839            substitute_new_old(expr, new_row, old_row, columns)?;
840        }
841        Expr::Like { expr, pattern, .. } => {
842            substitute_new_old(expr, new_row, old_row, columns)?;
843            substitute_new_old(pattern, new_row, old_row, columns)?;
844        }
845        Expr::FunctionCall { args, .. } => {
846            for a in args {
847                substitute_new_old(a, new_row, old_row, columns)?;
848            }
849        }
850        Expr::Extract { source, .. } => substitute_new_old(source, new_row, old_row, columns)?,
851        Expr::Array(items) => {
852            for elem in items {
853                substitute_new_old(elem, new_row, old_row, columns)?;
854            }
855        }
856        Expr::ArraySubscript { target, index } => {
857            substitute_new_old(target, new_row, old_row, columns)?;
858            substitute_new_old(index, new_row, old_row, columns)?;
859        }
860        Expr::AnyAll { expr, array, .. } => {
861            substitute_new_old(expr, new_row, old_row, columns)?;
862            substitute_new_old(array, new_row, old_row, columns)?;
863        }
864        Expr::InList { expr, list, .. } => {
865            substitute_new_old(expr, new_row, old_row, columns)?;
866            for item in list {
867                substitute_new_old(item, new_row, old_row, columns)?;
868            }
869        }
870        Expr::Case {
871            operand,
872            branches,
873            else_branch,
874        } => {
875            if let Some(o) = operand {
876                substitute_new_old(o, new_row, old_row, columns)?;
877            }
878            for (w, t) in branches {
879                substitute_new_old(w, new_row, old_row, columns)?;
880                substitute_new_old(t, new_row, old_row, columns)?;
881            }
882            if let Some(e) = else_branch {
883                substitute_new_old(e, new_row, old_row, columns)?;
884            }
885        }
886        // Leaves + variants we don't recurse into (sub-queries
887        // inside a trigger body would require correlated-query
888        // wiring; carved out of v7.12.4).
889        Expr::Literal(_)
890        | Expr::Placeholder(_)
891        | Expr::Column(_)
892        | Expr::WindowFunction { .. }
893        | Expr::ScalarSubquery(_)
894        | Expr::Exists { .. }
895        | Expr::InSubquery { .. } => {}
896    }
897    Ok(())
898}
899
900/// Turn a [`Value`] back into an [`Expr::Literal`]. Necessary
901/// because [`substitute_new_old`] inlines NEW/OLD cell values
902/// into the expression tree.
903fn value_to_literal_expr(_columns: &[ColumnSchema], _pos: usize, v: Value) -> Expr {
904    use spg_sql::ast::Literal;
905    let lit = match v {
906        Value::Null => Literal::Null,
907        Value::Bool(b) => Literal::Bool(b),
908        Value::SmallInt(n) => Literal::Integer(i64::from(n)),
909        Value::Int(n) => Literal::Integer(i64::from(n)),
910        Value::BigInt(n) => Literal::Integer(n),
911        Value::Float(x) => Literal::Float(x),
912        Value::Text(s) | Value::Json(s) => Literal::String(s),
913        // Other values (Vector, Date, Timestamp, TsVector, etc.)
914        // round-trip through the Display form back into a string
915        // literal. v7.12.5 will add typed-literal variants here
916        // so the cast layer doesn't need to re-parse from text.
917        other => Literal::String(format!("{other:?}")),
918    };
919    Expr::Literal(lit)
920}
921
922/// v7.12.7 — substitute NEW / OLD / DECLARE-local references in
923/// every `Expr` field of a [`Statement`]. Used to materialise an
924/// embedded SQL statement's NEW.col / OLD.col / local-var refs as
925/// literals so the engine can re-execute it without holding the
926/// trigger context.
927fn substitute_trigger_context_in_statement(
928    stmt: &mut spg_sql::ast::Statement,
929    new_row: Option<&Row>,
930    old_row: Option<&Row>,
931    locals: &BTreeMap<String, Value>,
932    columns: &[ColumnSchema],
933) -> Result<(), EvalError> {
934    use spg_sql::ast::Statement;
935    let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
936        substitute_locals(e, locals);
937        substitute_new_old(e, new_row, old_row, columns)?;
938        Ok(())
939    };
940    match stmt {
941        Statement::Insert(s) => {
942            for tuple in &mut s.rows {
943                for e in tuple {
944                    walk(e)?;
945                }
946            }
947        }
948        Statement::Update(s) => {
949            for (_col, e) in &mut s.assignments {
950                walk(e)?;
951            }
952            if let Some(w) = &mut s.where_ {
953                walk(w)?;
954            }
955        }
956        Statement::Delete(s) => {
957            if let Some(w) = &mut s.where_ {
958                walk(w)?;
959            }
960        }
961        Statement::Select(s) => {
962            substitute_trigger_context_in_select(s, new_row, old_row, locals, columns)?
963        }
964        // Other statement kinds (DDL, SHOW, etc.) inside a
965        // trigger body would only meaningfully reference NEW/OLD
966        // in error-message position; v7.12.7 doesn't recursively
967        // substitute their Expr fields. Future surfaces (e.g.
968        // RAISE ... USING) can add cases here.
969        _ => {}
970    }
971    Ok(())
972}
973
974fn substitute_trigger_context_in_select(
975    s: &mut spg_sql::ast::SelectStatement,
976    new_row: Option<&Row>,
977    old_row: Option<&Row>,
978    locals: &BTreeMap<String, Value>,
979    columns: &[ColumnSchema],
980) -> Result<(), EvalError> {
981    use spg_sql::ast::SelectItem;
982    let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
983        substitute_locals(e, locals);
984        substitute_new_old(e, new_row, old_row, columns)?;
985        Ok(())
986    };
987    for item in &mut s.items {
988        if let SelectItem::Expr { expr, .. } = item {
989            walk(expr)?;
990        }
991    }
992    if let Some(w) = &mut s.where_ {
993        walk(w)?;
994    }
995    if let Some(group_by) = &mut s.group_by {
996        for g in group_by {
997            walk(g)?;
998        }
999    }
1000    if let Some(h) = &mut s.having {
1001        walk(h)?;
1002    }
1003    for ob in &mut s.order_by {
1004        walk(&mut ob.expr)?;
1005    }
1006    // LIMIT / OFFSET use `LimitExpr` (integer literal or
1007    // placeholder); they don't carry an `Expr` to substitute
1008    // into. Leave them alone.
1009    let _ = &s.limit;
1010    let _ = &s.offset;
1011    Ok(())
1012}
1013
1014/// v7.12.4 — find the triggers that should fire for a given
1015/// `(table, event, timing)` tuple. Returns names so the caller
1016/// can iterate without holding a borrow on the catalog while it
1017/// mutates rows.
1018pub fn matching_trigger_names<'a>(
1019    triggers: &'a [TriggerDef],
1020    table: &str,
1021    event: &str,
1022    timing: &str,
1023) -> Vec<&'a TriggerDef> {
1024    triggers
1025        .iter()
1026        .filter(|t| {
1027            t.table == table
1028                && t.timing.eq_ignore_ascii_case(timing)
1029                && t.for_each.eq_ignore_ascii_case("row")
1030                && t.events.iter().any(|e| e.eq_ignore_ascii_case(event))
1031        })
1032        .collect()
1033}
1034
1035impl Engine {
1036    /// v7.12.4 — snapshot every row-level trigger on `table` that
1037    /// fires for `event` (`"INSERT"` / `"UPDATE"` / `"DELETE"`) at
1038    /// the given `timing` (`"BEFORE"` / `"AFTER"`), and clone its
1039    /// referenced function definition. Returned as a vec of owned
1040    /// `FunctionDef` so the row-write loop can fire them without
1041    /// holding a borrow on the catalog (which would conflict with
1042    /// the table.insert / update_row / delete mutable borrows).
1043    pub(crate) fn snapshot_row_triggers(
1044        &self,
1045        table: &str,
1046        event: &str,
1047        timing: &str,
1048    ) -> Vec<spg_storage::FunctionDef> {
1049        let cat = self.active_catalog();
1050        cat.triggers()
1051            .iter()
1052            .filter(|t| {
1053                // v7.16.1 — skip disabled triggers (mailrs
1054                // round-9 A.2.b — pg_dump --disable-triggers).
1055                t.enabled
1056                    && t.table == table
1057                    && t.timing.eq_ignore_ascii_case(timing)
1058                    && t.for_each.eq_ignore_ascii_case("row")
1059                    && t.events.iter().any(|e| e.eq_ignore_ascii_case(event))
1060            })
1061            .filter_map(|t| cat.functions().get(&t.function).cloned())
1062            .collect()
1063    }
1064
1065    /// v7.13.0 — UPDATE-side snapshot that pairs each trigger's
1066    /// function with its `UPDATE OF cols` filter (mailrs round-5
1067    /// G7). Empty filter Vec means "fire unconditionally", matching
1068    /// the v7.12 behaviour.
1069    pub(crate) fn snapshot_update_row_triggers(
1070        &self,
1071        table: &str,
1072        timing: &str,
1073    ) -> Vec<(spg_storage::FunctionDef, Vec<String>)> {
1074        let cat = self.active_catalog();
1075        cat.triggers()
1076            .iter()
1077            .filter(|t| {
1078                // v7.16.1 — skip disabled triggers.
1079                t.enabled
1080                    && t.table == table
1081                    && t.timing.eq_ignore_ascii_case(timing)
1082                    && t.for_each.eq_ignore_ascii_case("row")
1083                    && t.events.iter().any(|e| e.eq_ignore_ascii_case("UPDATE"))
1084            })
1085            .filter_map(|t| {
1086                cat.functions()
1087                    .get(&t.function)
1088                    .cloned()
1089                    .map(|fd| (fd, t.update_columns.clone()))
1090            })
1091            .collect()
1092    }
1093
1094    /// v7.12.7 — drain the trigger-emitted embedded SQL queue.
1095    /// Called by the INSERT / UPDATE / DELETE executors after
1096    /// their main row-write loop returns. Each statement runs
1097    /// inside the same cancel scope as the firing DML and bumps
1098    /// the recursion counter; nested embedded SQL beyond
1099    /// [`MAX_TRIGGER_RECURSION`] errors with a clear message so
1100    /// a trigger-graph cycle surfaces as a query failure instead
1101    /// of stack-blowing the engine.
1102    pub(crate) fn execute_deferred_trigger_stmts(
1103        &mut self,
1104        deferred: Vec<DeferredEmbeddedStmt>,
1105        cancel: CancelToken<'_>,
1106    ) -> Result<(), EngineError> {
1107        for d in deferred {
1108            if self.trigger_recursion_depth >= MAX_TRIGGER_RECURSION {
1109                return Err(EngineError::Storage(StorageError::Corrupt(alloc::format!(
1110                    "trigger embedded SQL recursion depth {} exceeded (trigger function \
1111                     {:?} would push past the {} cap — check for trigger cycles)",
1112                    self.trigger_recursion_depth,
1113                    d.function,
1114                    MAX_TRIGGER_RECURSION,
1115                ))));
1116            }
1117            self.trigger_recursion_depth += 1;
1118            let res = self.execute_stmt_with_cancel(d.stmt, cancel);
1119            self.trigger_recursion_depth -= 1;
1120            res?;
1121        }
1122        Ok(())
1123    }
1124}