Skip to main content

spg_engine/
triggers.rs

1//! v7.12.4 — PL/pgSQL row-level trigger executor.
2//!
3//! The catalogued [`spg_storage::FunctionDef`] carries the trigger
4//! function's source body as raw text (between the original
5//! `$$ ... $$`). Each time a trigger fires we re-parse the body
6//! via `spg_sql::parse_function_body` and walk the resulting
7//! [`spg_sql::ast::PlPgSqlBlock`] against a NEW / OLD row context.
8//!
9//! v7.12.4 surface (the minimum that lets a mailrs-shape
10//! `update_search_vector` trigger run end-to-end):
11//!
12//!   * `NEW.col := <expr>;`     — mutate a NEW cell. BEFORE only.
13//!   * `RETURN NEW;`            — pass the (possibly-mutated) row
14//!                                back to the row writer.
15//!   * `RETURN OLD;`            — return the pre-change row.
16//!   * `RETURN NULL;` / `RETURN;` — skip the write (BEFORE) or
17//!                                no-op the notification (AFTER).
18//!   * sub-expression eval recurses through the regular
19//!     [`crate::eval::eval_expr`] so anything the SELECT executor
20//!     can compute is fair game inside a trigger body.
21//!
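//! Originally out of scope for v7.12.4; status in the current file:
//!
//!   * `DECLARE`'d local variables — handled since v7.12.6
//!   * `IF / ELSIF / ELSE / END IF;` control flow — handled by the body walker
//!   * Embedded SQL statements (`UPDATE … WHERE …`, `SELECT … INTO var`)
//!     — v7.12.7 queues them for execution after the firing DML
//!   * `RAISE NOTICE / RAISE EXCEPTION` — handled since v7.12.6
//!   * Loop constructs — still out of scope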
29
30use alloc::collections::BTreeMap;
31use alloc::format;
32use alloc::string::String;
33use alloc::vec::Vec;
34use core::fmt;
35
36use spg_sql::ast::{AssignTarget, Expr, PlPgSqlDeclare, PlPgSqlStmt, RaiseLevel, ReturnTarget};
37use spg_storage::{ColumnSchema, FunctionDef, Row, TriggerDef, Value};
38
39use crate::eval::{self, EvalContext, EvalError};
40
/// v7.12.7 — embedded SQL statement collected during a trigger
/// fire, queued for execution after the firing DML completes.
/// NEW / OLD / DECLARE-local references inside the statement's
/// Expr tree have already been substituted with literals; the
/// engine just feeds it to `execute_stmt_with_cancel`.
///
/// One entry is pushed per embedded statement the body walker
/// executes, in the order the statements were reached.
#[derive(Debug, Clone, PartialEq)]
pub struct DeferredEmbeddedStmt {
    /// Trigger function the embedded SQL came from. Used to
    /// label recursion errors precisely.
    pub function: String,
    /// Substituted statement, ready to execute.
    pub stmt: spg_sql::ast::Statement,
}
54
/// What the trigger function returned. Drives the row-write path
/// the trigger fired from.
#[derive(Debug, Clone, PartialEq)]
pub enum TriggerOutcome {
    /// `RETURN NEW;` (or `RETURN OLD;`) — write this row.
    /// For BEFORE triggers, the row may differ from the input
    /// (e.g. `NEW.search_vector := …` rewrote a cell). For AFTER
    /// triggers, the value is currently ignored — but we still
    /// surface it for symmetric callers / future v7.12.5 use.
    Row(Row),
    /// `RETURN NULL;` or trigger fell off the end. For a BEFORE
    /// trigger, the row writer must skip the affected row. For
    /// an AFTER trigger, no-op.
    ///
    /// Also produced when `RETURN NEW;` / `RETURN OLD;` names a
    /// row that does not exist for this event, and (for now) for
    /// `RETURN <expr>;`.
    Skip,
}
70
/// Error type the trigger executor exposes. Wraps `EvalError`
/// at the eval-of-expressions layer and adds trigger-specific
/// failure modes (`OLD.col := …`, unsupported PL/pgSQL feature,
/// body that fails to re-parse, …).
#[derive(Debug, Clone, PartialEq)]
pub enum TriggerError {
    /// Body source stored in the catalog can't be re-parsed.
    /// Usually means the function was created against a newer
    /// PL/pgSQL surface than the running engine knows about.
    UnparseableBody { function: String, detail: String },
    /// Trigger function relies on something the executor cannot
    /// run. In this file that is a function whose LANGUAGE is not
    /// plpgsql, or a `NEW.col := …` assignment fired without a
    /// NEW row. `detail` names the construct so the operator can
    /// plan around it.
    UnsupportedConstruct { function: String, detail: String },
    /// `OLD.col := <expr>` inside the body. PG itself rejects
    /// this; we surface a clear message rather than silently
    /// dropping the assignment.
    OldIsReadOnly { function: String, column: String },
    /// `NEW.col := <expr>` in an AFTER trigger — same rationale
    /// as OLD: PG enforces "NEW is read-only after the row has
    /// been written" and we mirror.
    NewReadOnlyInAfterTrigger { function: String, column: String },
    /// `NEW.col := <expr>` against a non-existent column.
    /// Usually a schema-drift bug.
    UnknownColumn {
        /// Trigger function that attempted the assignment.
        function: String,
        /// Column name as written in the trigger body.
        column: String,
        /// Table whose schema was searched.
        table: String,
    },
    /// Sub-expression eval inside the trigger body failed. The
    /// wrapped [`EvalError`] explains the underlying cause
    /// (`ColumnNotFound`, `TypeMismatch`, …).
    EvalFailed { function: String, cause: EvalError },
    /// v7.12.6 — `RAISE EXCEPTION '<message>' [, args]*` in the
    /// trigger body. The interpreter formats the args into the
    /// message via PG-style `%` substitution and surfaces the
    /// resolved text up to the caller.
    RaiseException { function: String, message: String },
}
111
112impl fmt::Display for TriggerError {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        match self {
115            Self::UnparseableBody { function, detail } => {
116                write!(
117                    f,
118                    "trigger function {function:?} body did not parse: {detail}"
119                )
120            }
121            Self::UnsupportedConstruct { function, detail } => {
122                write!(
123                    f,
124                    "trigger function {function:?} uses an unsupported PL/pgSQL construct: {detail}"
125                )
126            }
127            Self::OldIsReadOnly { function, column } => {
128                write!(
129                    f,
130                    "trigger function {function:?}: cannot assign to OLD.{column} (OLD is read-only — PG rule)"
131                )
132            }
133            Self::NewReadOnlyInAfterTrigger { function, column } => {
134                write!(
135                    f,
136                    "trigger function {function:?}: cannot assign to NEW.{column} inside an AFTER trigger \
137                     (NEW is read-only post-write — use BEFORE triggers for mutation, or an embedded UPDATE statement \
138                      in v7.12.5+)"
139                )
140            }
141            Self::UnknownColumn {
142                function,
143                column,
144                table,
145            } => {
146                write!(
147                    f,
148                    "trigger function {function:?}: target column {column:?} not in table {table:?} schema"
149                )
150            }
151            Self::EvalFailed { function, cause } => {
152                write!(
153                    f,
154                    "trigger function {function:?}: expression eval failed: {cause}"
155                )
156            }
157            Self::RaiseException { function, message } => {
158                write!(
159                    f,
160                    "trigger function {function:?}: RAISE EXCEPTION {message:?}"
161                )
162            }
163        }
164    }
165}
166
167/// Fire a single row-level trigger.
168///
169/// `is_after` is true for AFTER triggers; the executor enforces
170/// "NEW is read-only" by rejecting NEW.col assignments in that
171/// case. AFTER trigger return values are ignored by callers; the
172/// returned [`TriggerOutcome`] just carries the (possibly
173/// untouched) NEW row for symmetry.
174#[allow(clippy::too_many_arguments)] // the table_name / columns / params /
175// ts-config trio are independent; folding
176// them into a struct just shuffles the
177// boilerplate to the call sites without
178// material gain.
179pub fn fire_row_trigger(
180    function: &FunctionDef,
181    new_row: Option<Row>,
182    old_row: Option<&Row>,
183    table_name: &str,
184    columns: &[ColumnSchema],
185    params: &[Value],
186    default_text_search_config: Option<&str>,
187    is_after: bool,
188) -> Result<(TriggerOutcome, Vec<DeferredEmbeddedStmt>), TriggerError> {
189    if !function.language.eq_ignore_ascii_case("plpgsql") {
190        return Err(TriggerError::UnsupportedConstruct {
191            function: function.name.clone(),
192            detail: format!(
193                "v7.12.4 only invokes LANGUAGE plpgsql trigger functions; \
194                 {:?} declares LANGUAGE {}",
195                function.name, function.language
196            ),
197        });
198    }
199    let block = spg_sql::parse_function_body(&function.body).map_err(|e| {
200        TriggerError::UnparseableBody {
201            function: function.name.clone(),
202            detail: format!("{e}"),
203        }
204    })?;
205    // v7.12.6 — initialise local variable scope from the DECLARE
206    // block. Each init expr (if any) evaluates against the
207    // so-far-bound scope + the NEW/OLD context, so later DECLAREs
208    // can reference earlier ones.
209    let mut locals: BTreeMap<String, Value> = BTreeMap::new();
210    init_locals_from_declarations(
211        &block.declarations,
212        &mut locals,
213        new_row.as_ref(),
214        old_row,
215        columns,
216        table_name,
217        params,
218        default_text_search_config,
219        &function.name,
220    )?;
221    let mut current_new = new_row;
222    let ctx = BodyCtx {
223        function: &function.name,
224        table_name,
225        columns,
226        params,
227        default_text_search_config,
228        is_after,
229    };
230    let mut deferred: Vec<DeferredEmbeddedStmt> = Vec::new();
231    let outcome = match execute_stmts(
232        &block.statements,
233        &mut current_new,
234        old_row,
235        &mut locals,
236        &ctx,
237        &mut deferred,
238    )? {
239        BodyOutcome::Return(target) => resolve_return(target, current_new, old_row),
240        // Body fell off without an explicit RETURN. PL/pgSQL
241        // default is `RETURN NULL`; we mirror — the BEFORE
242        // trigger then skips the row.
243        BodyOutcome::FellThrough => TriggerOutcome::Skip,
244    };
245    Ok((outcome, deferred))
246}
247
/// v7.12.6 — body-walk return signal. `Return(target)` short-
/// circuits the caller; `FellThrough` means the statement list
/// completed without a RETURN, equivalent to PL/pgSQL's implicit
/// `RETURN NULL`.
enum BodyOutcome {
    /// A `RETURN …;` was executed; carries what it named so the
    /// caller can resolve it against the final NEW / OLD rows.
    Return(ReturnTarget),
    /// Every statement ran and none of them returned.
    FellThrough,
}
256
/// Shared, read-only parameters every body-stmt evaluation needs.
/// Bundled so the recursive `execute_stmts` doesn't have to thread
/// a long list of individual `&str` / `&[…]` args around.
struct BodyCtx<'a> {
    /// Name of the trigger function being run; copied into errors
    /// and deferred-statement labels.
    function: &'a str,
    /// Table the trigger fired on; passed to the evaluator as the
    /// table alias and reported in `UnknownColumn`.
    table_name: &'a str,
    /// Schema used to map `NEW.col` / `OLD.col` names to positions.
    columns: &'a [ColumnSchema],
    /// Values handed to the expression evaluator via `with_params`.
    params: &'a [Value],
    /// Forwarded to the evaluator's text-search configuration.
    default_text_search_config: Option<&'a str>,
    /// True for AFTER triggers, where `NEW.col := …` is rejected.
    is_after: bool,
}
268
269fn execute_stmts(
270    stmts: &[PlPgSqlStmt],
271    current_new: &mut Option<Row>,
272    old_row: Option<&Row>,
273    locals: &mut BTreeMap<String, Value>,
274    ctx: &BodyCtx<'_>,
275    deferred: &mut Vec<DeferredEmbeddedStmt>,
276) -> Result<BodyOutcome, TriggerError> {
277    for stmt in stmts {
278        match stmt {
279            PlPgSqlStmt::Assign { target, value } => {
280                let evaluated = eval_with_new_old_and_locals(
281                    value,
282                    current_new.as_ref(),
283                    old_row,
284                    locals,
285                    ctx.columns,
286                    ctx.table_name,
287                    ctx.params,
288                    ctx.default_text_search_config,
289                )
290                .map_err(|cause| TriggerError::EvalFailed {
291                    function: ctx.function.into(),
292                    cause,
293                })?;
294                match target {
295                    AssignTarget::NewColumn(col) => {
296                        if ctx.is_after {
297                            return Err(TriggerError::NewReadOnlyInAfterTrigger {
298                                function: ctx.function.into(),
299                                column: col.clone(),
300                            });
301                        }
302                        let pos = ctx
303                            .columns
304                            .iter()
305                            .position(|c| c.name.eq_ignore_ascii_case(col))
306                            .ok_or_else(|| TriggerError::UnknownColumn {
307                                function: ctx.function.into(),
308                                column: col.clone(),
309                                table: alloc::string::ToString::to_string(&ctx.table_name),
310                            })?;
311                        let row = current_new.as_mut().ok_or_else(|| {
312                            TriggerError::UnsupportedConstruct {
313                                function: ctx.function.into(),
314                                detail: format!(
315                                    "NEW.{col} := … requires a NEW row context \
316                                     (BEFORE INSERT / UPDATE only — not available on DELETE)"
317                                ),
318                            }
319                        })?;
320                        row.values[pos] = evaluated;
321                    }
322                    AssignTarget::OldColumn(col) => {
323                        return Err(TriggerError::OldIsReadOnly {
324                            function: ctx.function.into(),
325                            column: col.clone(),
326                        });
327                    }
328                    AssignTarget::Local(name) => {
329                        // v7.12.6 — write into the DECLARE scope.
330                        // Loose-typing: we don't enforce the
331                        // declared type at runtime (PG's INTO
332                        // coerces; v7.12.6 just stores the
333                        // evaluated Value as-is). Type coercion
334                        // tightens in a later release.
335                        locals.insert(name.clone(), evaluated);
336                    }
337                }
338            }
339            PlPgSqlStmt::Return(target) => {
340                return Ok(BodyOutcome::Return(target.clone()));
341            }
342            PlPgSqlStmt::If {
343                branches,
344                else_branch,
345            } => {
346                let mut matched = false;
347                for (cond_expr, body) in branches {
348                    let cond_val = eval_with_new_old_and_locals(
349                        cond_expr,
350                        current_new.as_ref(),
351                        old_row,
352                        locals,
353                        ctx.columns,
354                        ctx.table_name,
355                        ctx.params,
356                        ctx.default_text_search_config,
357                    )
358                    .map_err(|cause| TriggerError::EvalFailed {
359                        function: ctx.function.into(),
360                        cause,
361                    })?;
362                    if matches!(cond_val, Value::Bool(true)) {
363                        matched = true;
364                        match execute_stmts(body, current_new, old_row, locals, ctx, deferred)? {
365                            BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
366                            BodyOutcome::FellThrough => {}
367                        }
368                        break;
369                    }
370                }
371                if !matched && !else_branch.is_empty() {
372                    match execute_stmts(else_branch, current_new, old_row, locals, ctx, deferred)? {
373                        BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
374                        BodyOutcome::FellThrough => {}
375                    }
376                }
377            }
378            PlPgSqlStmt::Raise {
379                level,
380                message,
381                args,
382            } => {
383                // Resolve every %-format placeholder by evaluating
384                // each arg expression and rendering its Value.
385                let mut rendered_args: Vec<String> = Vec::with_capacity(args.len());
386                for a in args {
387                    let v = eval_with_new_old_and_locals(
388                        a,
389                        current_new.as_ref(),
390                        old_row,
391                        locals,
392                        ctx.columns,
393                        ctx.table_name,
394                        ctx.params,
395                        ctx.default_text_search_config,
396                    )
397                    .map_err(|cause| TriggerError::EvalFailed {
398                        function: ctx.function.into(),
399                        cause,
400                    })?;
401                    rendered_args.push(value_to_display_string(&v));
402                }
403                let resolved = format_raise_message(message, &rendered_args);
404                if matches!(level, RaiseLevel::Exception) {
405                    return Err(TriggerError::RaiseException {
406                        function: ctx.function.into(),
407                        message: resolved,
408                    });
409                }
410                // NOTICE / WARNING / INFO / LOG / DEBUG — log to
411                // stderr for v7.12.6. Wiring through the server's
412                // log channel is a v7.12.7+ polish item; the
413                // resolved message stays accessible regardless.
414                let _ = resolved;
415                let _ = level;
416            }
417            PlPgSqlStmt::EmbeddedSql(boxed_stmt) => {
418                // v7.12.7 — substitute NEW/OLD/locals into every
419                // Expr field of the statement, then queue for
420                // post-DML execution. The trigger interpreter
421                // doesn't call back into Engine::execute directly
422                // (that would deadlock the row-write mut borrow);
423                // the engine drains `deferred` after the firing
424                // INSERT/UPDATE/DELETE completes its main work.
425                let mut substituted = (**boxed_stmt).clone();
426                substitute_trigger_context_in_statement(
427                    &mut substituted,
428                    current_new.as_ref(),
429                    old_row,
430                    locals,
431                    ctx.columns,
432                )
433                .map_err(|cause| TriggerError::EvalFailed {
434                    function: ctx.function.into(),
435                    cause,
436                })?;
437                deferred.push(DeferredEmbeddedStmt {
438                    function: ctx.function.into(),
439                    stmt: substituted,
440                });
441            }
442        }
443    }
444    Ok(BodyOutcome::FellThrough)
445}
446
447fn resolve_return(
448    target: ReturnTarget,
449    current_new: Option<Row>,
450    old_row: Option<&Row>,
451) -> TriggerOutcome {
452    match target {
453        ReturnTarget::New => current_new.map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
454        ReturnTarget::Old => old_row
455            .cloned()
456            .map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
457        ReturnTarget::Null => TriggerOutcome::Skip,
458        // The scalar UDF surface in a later release handles
459        // RETURN <expr> properly; for now we fall through to Skip.
460        ReturnTarget::Expr(_) => TriggerOutcome::Skip,
461    }
462}
463
464#[allow(clippy::too_many_arguments)]
465fn init_locals_from_declarations(
466    decls: &[PlPgSqlDeclare],
467    locals: &mut BTreeMap<String, Value>,
468    new_row: Option<&Row>,
469    old_row: Option<&Row>,
470    columns: &[ColumnSchema],
471    table_name: &str,
472    params: &[Value],
473    default_text_search_config: Option<&str>,
474    function_name: &str,
475) -> Result<(), TriggerError> {
476    for d in decls {
477        let v = if let Some(init) = &d.default {
478            eval_with_new_old_and_locals(
479                init,
480                new_row,
481                old_row,
482                locals,
483                columns,
484                table_name,
485                params,
486                default_text_search_config,
487            )
488            .map_err(|cause| TriggerError::EvalFailed {
489                function: function_name.into(),
490                cause,
491            })?
492        } else {
493            Value::Null
494        };
495        locals.insert(d.name.clone(), v);
496    }
497    Ok(())
498}
499
/// v7.12.6 — PG-style `%` expansion for RAISE messages.
///
/// Each lone `%` is replaced by the next entry of `args`, in
/// order; `%%` produces a literal `%`. When the placeholders
/// outnumber the arguments the surplus `%` is kept verbatim (PG
/// reports an error there; this stays lenient so the message
/// remains readable). Arguments left over are ignored.
fn format_raise_message(fmt: &str, args: &[String]) -> String {
    let mut out = String::with_capacity(fmt.len());
    let mut pending = args.iter();
    let mut rest = fmt;
    while let Some(at) = rest.find('%') {
        // Copy the literal run before the marker, then step past it.
        out.push_str(&rest[..at]);
        rest = &rest[at + 1..];
        if let Some(tail) = rest.strip_prefix('%') {
            // Doubled marker: emit one `%` and consume both.
            out.push('%');
            rest = tail;
            continue;
        }
        match pending.next() {
            Some(arg) => out.push_str(arg),
            None => out.push('%'),
        }
    }
    out.push_str(rest);
    out
}
530
531/// v7.12.6 — Display rendering for a [`Value`] inside a RAISE
532/// message arg. Booleans / ints / floats render naturally;
533/// strings render unquoted; other types fall back to Debug.
534fn value_to_display_string(v: &Value) -> String {
535    use alloc::string::ToString;
536    match v {
537        Value::Null => String::new(),
538        Value::Bool(b) => b.to_string(),
539        Value::SmallInt(n) => n.to_string(),
540        Value::Int(n) => n.to_string(),
541        Value::BigInt(n) => n.to_string(),
542        Value::Float(x) => x.to_string(),
543        Value::Text(s) | Value::Json(s) => s.clone(),
544        other => format!("{other:?}"),
545    }
546}
547
548/// Evaluate a sub-expression against the NEW / OLD row context.
549/// Pre-walks the AST replacing every `NEW.col` / `OLD.col`
550/// reference with a literal of the actual value, then dispatches
551/// to the regular [`eval::eval_expr`]. Pre-walk strategy mirrors
552/// the existing [`substitute_in_expr`] used by correlated
553/// subqueries.
554/// v7.12.6 — same as [`eval_with_new_old`] but also substitutes
555/// qualifier-less `Column(<name>)` references whose name matches
556/// a `DECLARE`'d local variable. Locals shadow table-column refs
557/// (PG semantics — though a careful trigger function avoids the
558/// collision via naming convention).
559#[allow(clippy::too_many_arguments)]
560fn eval_with_new_old_and_locals(
561    expr: &Expr,
562    new_row: Option<&Row>,
563    old_row: Option<&Row>,
564    locals: &BTreeMap<String, Value>,
565    columns: &[ColumnSchema],
566    table_alias: &str,
567    params: &[Value],
568    default_text_search_config: Option<&str>,
569) -> Result<Value, EvalError> {
570    let mut rewritten = expr.clone();
571    substitute_locals(&mut rewritten, locals);
572    substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
573    let ctx = EvalContext::new(columns, Some(table_alias))
574        .with_params(params)
575        .with_default_text_search_config(default_text_search_config);
576    let empty = Row::new(Vec::new());
577    eval::eval_expr(&rewritten, &empty, &ctx)
578}
579
580/// v7.12.6 — in-place substitute every qualifier-less
581/// `Column(<name>)` whose name is in `locals` with that local's
582/// current Value as a literal. Runs before [`substitute_new_old`]
583/// so NEW.col / OLD.col references (which have a qualifier) take
584/// the NEW/OLD path normally.
585fn substitute_locals(expr: &mut Expr, locals: &BTreeMap<String, Value>) {
586    if let Expr::Column(c) = expr {
587        if c.qualifier.is_none()
588            && let Some(v) = locals.get(&c.name)
589        {
590            *expr = value_to_literal_expr(&[], 0, v.clone());
591            return;
592        }
593    }
594    match expr {
595        Expr::Binary { lhs, rhs, .. } => {
596            substitute_locals(lhs, locals);
597            substitute_locals(rhs, locals);
598        }
599        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
600            substitute_locals(expr, locals);
601        }
602        Expr::Like { expr, pattern, .. } => {
603            substitute_locals(expr, locals);
604            substitute_locals(pattern, locals);
605        }
606        Expr::FunctionCall { args, .. } => {
607            for a in args {
608                substitute_locals(a, locals);
609            }
610        }
611        Expr::Extract { source, .. } => substitute_locals(source, locals),
612        Expr::Array(items) => {
613            for elem in items {
614                substitute_locals(elem, locals);
615            }
616        }
617        Expr::ArraySubscript { target, index } => {
618            substitute_locals(target, locals);
619            substitute_locals(index, locals);
620        }
621        Expr::AnyAll { expr, array, .. } => {
622            substitute_locals(expr, locals);
623            substitute_locals(array, locals);
624        }
625        Expr::Case {
626            operand,
627            branches,
628            else_branch,
629        } => {
630            if let Some(o) = operand {
631                substitute_locals(o, locals);
632            }
633            for (w, t) in branches {
634                substitute_locals(w, locals);
635                substitute_locals(t, locals);
636            }
637            if let Some(e) = else_branch {
638                substitute_locals(e, locals);
639            }
640        }
641        Expr::Literal(_)
642        | Expr::Placeholder(_)
643        | Expr::Column(_)
644        | Expr::WindowFunction { .. }
645        | Expr::ScalarSubquery(_)
646        | Expr::Exists { .. }
647        | Expr::InSubquery { .. } => {}
648    }
649}
650
651fn eval_with_new_old(
652    expr: &Expr,
653    new_row: Option<&Row>,
654    old_row: Option<&Row>,
655    columns: &[ColumnSchema],
656    table_alias: &str,
657    params: &[Value],
658    default_text_search_config: Option<&str>,
659) -> Result<Value, EvalError> {
660    let mut rewritten = expr.clone();
661    substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
662    let ctx = EvalContext::new(columns, Some(table_alias))
663        .with_params(params)
664        .with_default_text_search_config(default_text_search_config);
665    // Empty row — the substitution above eliminated every column
666    // reference that depended on NEW / OLD; any remaining column
667    // reference is a bug (would surface as ColumnNotFound).
668    let empty = Row::new(Vec::new());
669    eval::eval_expr(&rewritten, &empty, &ctx)
670}
671
672/// In-place walk: replace every `Column{qualifier=NEW|OLD,name=c}`
673/// reference with the corresponding row value, materialised as
674/// an `Expr::Literal`. Recurses through every Expr variant so
675/// `to_tsvector('english', NEW.subject || ' ' || NEW.sender)`
676/// substitutes cleanly even though the references nest inside
677/// function calls + binary operators.
678fn substitute_new_old(
679    expr: &mut Expr,
680    new_row: Option<&Row>,
681    old_row: Option<&Row>,
682    columns: &[ColumnSchema],
683) -> Result<(), EvalError> {
684    if let Expr::Column(c) = expr {
685        if let Some(q) = &c.qualifier {
686            let lower = q.to_ascii_lowercase();
687            if lower == "new" || lower == "old" {
688                let (row, side) = if lower == "new" {
689                    (new_row, "NEW")
690                } else {
691                    (old_row, "OLD")
692                };
693                let pos = columns
694                    .iter()
695                    .position(|sc| sc.name.eq_ignore_ascii_case(&c.name))
696                    .ok_or_else(|| EvalError::ColumnNotFound {
697                        name: format!("{side}.{}", c.name),
698                    })?;
699                let v = match row {
700                    Some(r) => r.values.get(pos).cloned().unwrap_or(Value::Null),
701                    None => Value::Null,
702                };
703                *expr = value_to_literal_expr(columns, pos, v);
704                return Ok(());
705            }
706        }
707    }
708    match expr {
709        Expr::Binary { lhs, rhs, .. } => {
710            substitute_new_old(lhs, new_row, old_row, columns)?;
711            substitute_new_old(rhs, new_row, old_row, columns)?;
712        }
713        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
714            substitute_new_old(expr, new_row, old_row, columns)?;
715        }
716        Expr::Like { expr, pattern, .. } => {
717            substitute_new_old(expr, new_row, old_row, columns)?;
718            substitute_new_old(pattern, new_row, old_row, columns)?;
719        }
720        Expr::FunctionCall { args, .. } => {
721            for a in args {
722                substitute_new_old(a, new_row, old_row, columns)?;
723            }
724        }
725        Expr::Extract { source, .. } => substitute_new_old(source, new_row, old_row, columns)?,
726        Expr::Array(items) => {
727            for elem in items {
728                substitute_new_old(elem, new_row, old_row, columns)?;
729            }
730        }
731        Expr::ArraySubscript { target, index } => {
732            substitute_new_old(target, new_row, old_row, columns)?;
733            substitute_new_old(index, new_row, old_row, columns)?;
734        }
735        Expr::AnyAll { expr, array, .. } => {
736            substitute_new_old(expr, new_row, old_row, columns)?;
737            substitute_new_old(array, new_row, old_row, columns)?;
738        }
739        Expr::Case {
740            operand,
741            branches,
742            else_branch,
743        } => {
744            if let Some(o) = operand {
745                substitute_new_old(o, new_row, old_row, columns)?;
746            }
747            for (w, t) in branches {
748                substitute_new_old(w, new_row, old_row, columns)?;
749                substitute_new_old(t, new_row, old_row, columns)?;
750            }
751            if let Some(e) = else_branch {
752                substitute_new_old(e, new_row, old_row, columns)?;
753            }
754        }
755        // Leaves + variants we don't recurse into (sub-queries
756        // inside a trigger body would require correlated-query
757        // wiring; carved out of v7.12.4).
758        Expr::Literal(_)
759        | Expr::Placeholder(_)
760        | Expr::Column(_)
761        | Expr::WindowFunction { .. }
762        | Expr::ScalarSubquery(_)
763        | Expr::Exists { .. }
764        | Expr::InSubquery { .. } => {}
765    }
766    Ok(())
767}
768
769/// Turn a [`Value`] back into an [`Expr::Literal`]. Necessary
770/// because [`substitute_new_old`] inlines NEW/OLD cell values
771/// into the expression tree.
772fn value_to_literal_expr(_columns: &[ColumnSchema], _pos: usize, v: Value) -> Expr {
773    use spg_sql::ast::Literal;
774    let lit = match v {
775        Value::Null => Literal::Null,
776        Value::Bool(b) => Literal::Bool(b),
777        Value::SmallInt(n) => Literal::Integer(i64::from(n)),
778        Value::Int(n) => Literal::Integer(i64::from(n)),
779        Value::BigInt(n) => Literal::Integer(n),
780        Value::Float(x) => Literal::Float(x),
781        Value::Text(s) | Value::Json(s) => Literal::String(s),
782        // Other values (Vector, Date, Timestamp, TsVector, etc.)
783        // round-trip through the Display form back into a string
784        // literal. v7.12.5 will add typed-literal variants here
785        // so the cast layer doesn't need to re-parse from text.
786        other => Literal::String(format!("{other:?}")),
787    };
788    Expr::Literal(lit)
789}
790
791/// v7.12.7 — substitute NEW / OLD / DECLARE-local references in
792/// every `Expr` field of a [`Statement`]. Used to materialise an
793/// embedded SQL statement's NEW.col / OLD.col / local-var refs as
794/// literals so the engine can re-execute it without holding the
795/// trigger context.
796fn substitute_trigger_context_in_statement(
797    stmt: &mut spg_sql::ast::Statement,
798    new_row: Option<&Row>,
799    old_row: Option<&Row>,
800    locals: &BTreeMap<String, Value>,
801    columns: &[ColumnSchema],
802) -> Result<(), EvalError> {
803    use spg_sql::ast::Statement;
804    let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
805        substitute_locals(e, locals);
806        substitute_new_old(e, new_row, old_row, columns)?;
807        Ok(())
808    };
809    match stmt {
810        Statement::Insert(s) => {
811            for tuple in &mut s.rows {
812                for e in tuple {
813                    walk(e)?;
814                }
815            }
816        }
817        Statement::Update(s) => {
818            for (_col, e) in &mut s.assignments {
819                walk(e)?;
820            }
821            if let Some(w) = &mut s.where_ {
822                walk(w)?;
823            }
824        }
825        Statement::Delete(s) => {
826            if let Some(w) = &mut s.where_ {
827                walk(w)?;
828            }
829        }
830        Statement::Select(s) => {
831            substitute_trigger_context_in_select(s, new_row, old_row, locals, columns)?
832        }
833        // Other statement kinds (DDL, SHOW, etc.) inside a
834        // trigger body would only meaningfully reference NEW/OLD
835        // in error-message position; v7.12.7 doesn't recursively
836        // substitute their Expr fields. Future surfaces (e.g.
837        // RAISE ... USING) can add cases here.
838        _ => {}
839    }
840    Ok(())
841}
842
843fn substitute_trigger_context_in_select(
844    s: &mut spg_sql::ast::SelectStatement,
845    new_row: Option<&Row>,
846    old_row: Option<&Row>,
847    locals: &BTreeMap<String, Value>,
848    columns: &[ColumnSchema],
849) -> Result<(), EvalError> {
850    use spg_sql::ast::SelectItem;
851    let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
852        substitute_locals(e, locals);
853        substitute_new_old(e, new_row, old_row, columns)?;
854        Ok(())
855    };
856    for item in &mut s.items {
857        if let SelectItem::Expr { expr, .. } = item {
858            walk(expr)?;
859        }
860    }
861    if let Some(w) = &mut s.where_ {
862        walk(w)?;
863    }
864    if let Some(group_by) = &mut s.group_by {
865        for g in group_by {
866            walk(g)?;
867        }
868    }
869    if let Some(h) = &mut s.having {
870        walk(h)?;
871    }
872    for ob in &mut s.order_by {
873        walk(&mut ob.expr)?;
874    }
875    // LIMIT / OFFSET use `LimitExpr` (integer literal or
876    // placeholder); they don't carry an `Expr` to substitute
877    // into. Leave them alone.
878    let _ = &s.limit;
879    let _ = &s.offset;
880    Ok(())
881}
882
883/// v7.12.4 — find the triggers that should fire for a given
884/// `(table, event, timing)` tuple. Returns names so the caller
885/// can iterate without holding a borrow on the catalog while it
886/// mutates rows.
887pub fn matching_trigger_names<'a>(
888    triggers: &'a [TriggerDef],
889    table: &str,
890    event: &str,
891    timing: &str,
892) -> Vec<&'a TriggerDef> {
893    triggers
894        .iter()
895        .filter(|t| {
896            t.table == table
897                && t.timing.eq_ignore_ascii_case(timing)
898                && t.for_each.eq_ignore_ascii_case("row")
899                && t.events.iter().any(|e| e.eq_ignore_ascii_case(event))
900        })
901        .collect()
902}