Skip to main content

spg_engine/
triggers.rs

1//! v7.12.4 — PL/pgSQL row-level trigger executor.
2//!
3//! The catalogued [`spg_storage::FunctionDef`] carries the trigger
4//! function's source body as raw text (between the original
5//! `$$ ... $$`). Each time a trigger fires we re-parse the body
6//! via `spg_sql::parse_function_body` and walk the resulting
7//! [`spg_sql::ast::PlPgSqlBlock`] against a NEW / OLD row context.
8//!
9//! v7.12.4 surface (the minimum that lets a mailrs-shape
10//! `update_search_vector` trigger run end-to-end):
11//!
12//!   * `NEW.col := <expr>;`     — mutate a NEW cell. BEFORE only.
13//!   * `RETURN NEW;`            — pass the (possibly-mutated) row
14//!                                back to the row writer.
15//!   * `RETURN OLD;`            — return the pre-change row.
16//!   * `RETURN NULL;` / `RETURN;` — skip the write (BEFORE) or
17//!                                no-op the notification (AFTER).
18//!   * sub-expression eval recurses through the regular
19//!     [`crate::eval::eval_expr`] so anything the SELECT executor
20//!     can compute is fair game inside a trigger body.
21//!
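//! Originally out of scope for v7.12.4. Status as of the code in this file:
//!
//!   * `DECLARE`'d local variables — handled (v7.12.6)
//!   * `IF / ELSIF / ELSE / END IF;` control flow — handled
//!   * Embedded SQL statements (`UPDATE … WHERE …`) — substituted and queued
//!     for execution after the firing DML (v7.12.7); `SELECT … INTO var`
//!     works inside DO blocks only (v7.16.2)
//!   * `RAISE NOTICE / RAISE EXCEPTION` — handled (v7.12.6); only
//!     `EXCEPTION` has an observable effect, other levels are discarded
//!   * Loop constructs — still not handled by this interpreter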
29
30use alloc::collections::BTreeMap;
31use alloc::format;
32use alloc::string::String;
33use alloc::vec::Vec;
34use core::fmt;
35
36use spg_sql::ast::{AssignTarget, Expr, PlPgSqlDeclare, PlPgSqlStmt, RaiseLevel, ReturnTarget};
37use spg_storage::{ColumnSchema, FunctionDef, Row, TriggerDef, Value};
38
39use crate::eval::{self, EvalContext, EvalError};
40
/// v7.12.7 — embedded SQL statement collected during a trigger
/// fire, queued for execution after the firing DML completes.
/// NEW / OLD / DECLARE-local references inside the statement's
/// Expr tree have already been substituted with literals; the
/// engine just feeds it to `execute_stmt_with_cancel`.
///
/// The DO-block walker in this file collects the same entries and
/// hands only their `stmt` payloads back to its caller.
#[derive(Debug, Clone, PartialEq)]
pub struct DeferredEmbeddedStmt {
    /// Name of the trigger function the embedded SQL came from
    /// (the DO-block path stores the literal label `"DO"`). Used
    /// to label recursion errors precisely.
    pub function: String,
    /// Substituted statement, ready to execute.
    pub stmt: spg_sql::ast::Statement,
}
54
/// What the trigger function returned. Drives the row-write path
/// the trigger fired from.
#[derive(Debug, Clone, PartialEq)]
pub enum TriggerOutcome {
    /// `RETURN NEW;` (or `RETURN OLD;`) — write this row.
    /// For BEFORE triggers, the row may differ from the input
    /// (e.g. `NEW.search_vector := …` rewrote a cell). For AFTER
    /// triggers, the value is currently ignored — but we still
    /// surface it for symmetric callers / future use.
    Row(Row),
    /// `RETURN NULL;` or trigger fell off the end. For a BEFORE
    /// trigger, the row writer must skip the affected row. For
    /// an AFTER trigger, no-op.
    ///
    /// Also produced when `RETURN NEW` / `RETURN OLD` names a row
    /// that is absent for this firing, and for `RETURN <expr>`,
    /// which the trigger path does not evaluate.
    Skip,
}
70
/// Error type the trigger executor exposes. Wraps `EvalError`
/// at the eval-of-expressions layer and adds trigger-specific
/// failure modes (`OLD.col := …`, unsupported PL/pgSQL feature,
/// body that fails to re-parse, …).
#[derive(Debug, Clone, PartialEq)]
pub enum TriggerError {
    /// Body source stored in the catalog can't be re-parsed.
    /// Usually means the function was created against a newer
    /// PL/pgSQL surface than the running engine knows about.
    UnparseableBody { function: String, detail: String },
    /// The trigger function needs something the interpreter can't
    /// provide. In this file that is: a function whose LANGUAGE is
    /// not plpgsql, a `NEW.col := …` assignment when no NEW row
    /// exists, or `SELECT … INTO` outside a DO block. `detail`
    /// names the construct so the operator can plan around it.
    UnsupportedConstruct { function: String, detail: String },
    /// `OLD.col := <expr>` inside the body. PG itself rejects
    /// this; we surface a clear message rather than silently
    /// dropping the assignment.
    OldIsReadOnly { function: String, column: String },
    /// `NEW.col := <expr>` in an AFTER trigger — same rationale
    /// as OLD: PG enforces "NEW is read-only after the row has
    /// been written" and we mirror.
    NewReadOnlyInAfterTrigger { function: String, column: String },
    /// `NEW.col := <expr>` against a non-existent column.
    /// Usually a schema-drift bug.
    UnknownColumn {
        function: String,
        column: String,
        table: String,
    },
    /// Sub-expression eval inside the trigger body failed. The
    /// wrapped [`EvalError`] explains the underlying cause
    /// (`ColumnNotFound`, `TypeMismatch`, …).
    EvalFailed { function: String, cause: EvalError },
    /// v7.12.6 — `RAISE EXCEPTION '<message>' [, args]*` in the
    /// trigger body. The interpreter formats the args into the
    /// message via PG-style `%` substitution and surfaces the
    /// resolved text up to the caller.
    RaiseException { function: String, message: String },
}
111
112impl fmt::Display for TriggerError {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        match self {
115            Self::UnparseableBody { function, detail } => {
116                write!(
117                    f,
118                    "trigger function {function:?} body did not parse: {detail}"
119                )
120            }
121            Self::UnsupportedConstruct { function, detail } => {
122                write!(
123                    f,
124                    "trigger function {function:?} uses an unsupported PL/pgSQL construct: {detail}"
125                )
126            }
127            Self::OldIsReadOnly { function, column } => {
128                write!(
129                    f,
130                    "trigger function {function:?}: cannot assign to OLD.{column} (OLD is read-only — PG rule)"
131                )
132            }
133            Self::NewReadOnlyInAfterTrigger { function, column } => {
134                write!(
135                    f,
136                    "trigger function {function:?}: cannot assign to NEW.{column} inside an AFTER trigger \
137                     (NEW is read-only post-write — use BEFORE triggers for mutation, or an embedded UPDATE statement \
138                      in v7.12.5+)"
139                )
140            }
141            Self::UnknownColumn {
142                function,
143                column,
144                table,
145            } => {
146                write!(
147                    f,
148                    "trigger function {function:?}: target column {column:?} not in table {table:?} schema"
149                )
150            }
151            Self::EvalFailed { function, cause } => {
152                write!(
153                    f,
154                    "trigger function {function:?}: expression eval failed: {cause}"
155                )
156            }
157            Self::RaiseException { function, message } => {
158                write!(
159                    f,
160                    "trigger function {function:?}: RAISE EXCEPTION {message:?}"
161                )
162            }
163        }
164    }
165}
166
167/// Fire a single row-level trigger.
168///
169/// `is_after` is true for AFTER triggers; the executor enforces
170/// "NEW is read-only" by rejecting NEW.col assignments in that
171/// case. AFTER trigger return values are ignored by callers; the
172/// returned [`TriggerOutcome`] just carries the (possibly
173/// untouched) NEW row for symmetry.
174#[allow(clippy::too_many_arguments)] // the table_name / columns / params /
175// ts-config trio are independent; folding
176// them into a struct just shuffles the
177// boilerplate to the call sites without
178// material gain.
179pub fn fire_row_trigger(
180    function: &FunctionDef,
181    new_row: Option<Row>,
182    old_row: Option<&Row>,
183    table_name: &str,
184    columns: &[ColumnSchema],
185    params: &[Value],
186    default_text_search_config: Option<&str>,
187    is_after: bool,
188) -> Result<(TriggerOutcome, Vec<DeferredEmbeddedStmt>), TriggerError> {
189    if !function.language.eq_ignore_ascii_case("plpgsql") {
190        return Err(TriggerError::UnsupportedConstruct {
191            function: function.name.clone(),
192            detail: format!(
193                "v7.12.4 only invokes LANGUAGE plpgsql trigger functions; \
194                 {:?} declares LANGUAGE {}",
195                function.name, function.language
196            ),
197        });
198    }
199    let block = spg_sql::parse_function_body(&function.body).map_err(|e| {
200        TriggerError::UnparseableBody {
201            function: function.name.clone(),
202            detail: format!("{e}"),
203        }
204    })?;
205    // v7.12.6 — initialise local variable scope from the DECLARE
206    // block. Each init expr (if any) evaluates against the
207    // so-far-bound scope + the NEW/OLD context, so later DECLAREs
208    // can reference earlier ones.
209    let mut locals: BTreeMap<String, Value> = BTreeMap::new();
210    init_locals_from_declarations(
211        &block.declarations,
212        &mut locals,
213        new_row.as_ref(),
214        old_row,
215        columns,
216        table_name,
217        params,
218        default_text_search_config,
219        &function.name,
220    )?;
221    let mut current_new = new_row;
222    let ctx = BodyCtx {
223        function: &function.name,
224        table_name,
225        columns,
226        params,
227        default_text_search_config,
228        is_after,
229        select_into_resolver: None,
230    };
231    let mut deferred: Vec<DeferredEmbeddedStmt> = Vec::new();
232    let outcome = match execute_stmts(
233        &block.statements,
234        &mut current_new,
235        old_row,
236        &mut locals,
237        &ctx,
238        &mut deferred,
239    )? {
240        BodyOutcome::Return(target) => resolve_return(target, current_new, old_row),
241        // Body fell off without an explicit RETURN. PL/pgSQL
242        // default is `RETURN NULL`; we mirror — the BEFORE
243        // trigger then skips the row.
244        BodyOutcome::FellThrough => TriggerOutcome::Skip,
245    };
246    Ok((outcome, deferred))
247}
248
/// v7.12.6 — body-walk return signal. `Return(target)` short-
/// circuits the caller; `FellThrough` means the statement list
/// completed without a RETURN, equivalent to PL/pgSQL's implicit
/// `RETURN NULL`.
enum BodyOutcome {
    /// A `RETURN` statement was reached; carries what it named.
    Return(ReturnTarget),
    /// Every statement ran and none of them was a `RETURN`.
    FellThrough,
}
257
/// Shared parameters every body-stmt evaluation needs. Bundled so
/// the recursive `execute_stmts` doesn't have to thread eight
/// individual `&str` / `&[…]` args around.
struct BodyCtx<'a> {
    /// Name used to label errors and deferred statements: the
    /// trigger function's name, or `"DO"` for DO blocks.
    function: &'a str,
    /// Table the trigger fired on (empty for DO blocks). Passed to
    /// the expression evaluator as the table alias and reported in
    /// `UnknownColumn` errors.
    table_name: &'a str,
    /// Schema of the NEW / OLD rows; used to resolve column names
    /// to positions. Empty for DO blocks.
    columns: &'a [ColumnSchema],
    /// Parameter values forwarded to the expression evaluator.
    params: &'a [Value],
    /// Text-search configuration forwarded to the expression
    /// evaluator, if any.
    default_text_search_config: Option<&'a str>,
    /// True for AFTER triggers; `NEW.col := …` is rejected then.
    is_after: bool,
    /// v7.16.2 — synchronous SELECT … INTO resolver. Provided
    /// by `Engine::exec_do_block` so the walker can run a
    /// SELECT against the engine right when SelectInto is
    /// reached (so subsequent IF reads of the local see the
    /// fresh value). `None` for trigger paths where SelectInto
    /// isn't yet supported.
    select_into_resolver: Option<&'a SelectIntoResolver<'a>>,
}
276
/// v7.16.2 — callback shape the DO-block executor registers
/// on `BodyCtx`. Runs the supplied SELECT statement against
/// the engine, returns the first row's first column.
///
/// The statement handed to the callback has already had NEW / OLD
/// / local references substituted by the body walker. An `Err`
/// from the callback aborts the walk and is propagated unchanged.
pub type SelectIntoResolver<'a> =
    dyn Fn(&spg_sql::ast::Statement) -> Result<Value, TriggerError> + 'a;
282
283fn execute_stmts(
284    stmts: &[PlPgSqlStmt],
285    current_new: &mut Option<Row>,
286    old_row: Option<&Row>,
287    locals: &mut BTreeMap<String, Value>,
288    ctx: &BodyCtx<'_>,
289    deferred: &mut Vec<DeferredEmbeddedStmt>,
290) -> Result<BodyOutcome, TriggerError> {
291    for stmt in stmts {
292        match stmt {
293            PlPgSqlStmt::Assign { target, value } => {
294                let evaluated = eval_with_new_old_and_locals(
295                    value,
296                    current_new.as_ref(),
297                    old_row,
298                    locals,
299                    ctx.columns,
300                    ctx.table_name,
301                    ctx.params,
302                    ctx.default_text_search_config,
303                )
304                .map_err(|cause| TriggerError::EvalFailed {
305                    function: ctx.function.into(),
306                    cause,
307                })?;
308                match target {
309                    AssignTarget::NewColumn(col) => {
310                        if ctx.is_after {
311                            return Err(TriggerError::NewReadOnlyInAfterTrigger {
312                                function: ctx.function.into(),
313                                column: col.clone(),
314                            });
315                        }
316                        let pos = ctx
317                            .columns
318                            .iter()
319                            .position(|c| c.name.eq_ignore_ascii_case(col))
320                            .ok_or_else(|| TriggerError::UnknownColumn {
321                                function: ctx.function.into(),
322                                column: col.clone(),
323                                table: alloc::string::ToString::to_string(&ctx.table_name),
324                            })?;
325                        let row = current_new.as_mut().ok_or_else(|| {
326                            TriggerError::UnsupportedConstruct {
327                                function: ctx.function.into(),
328                                detail: format!(
329                                    "NEW.{col} := … requires a NEW row context \
330                                     (BEFORE INSERT / UPDATE only — not available on DELETE)"
331                                ),
332                            }
333                        })?;
334                        row.values[pos] = evaluated;
335                    }
336                    AssignTarget::OldColumn(col) => {
337                        return Err(TriggerError::OldIsReadOnly {
338                            function: ctx.function.into(),
339                            column: col.clone(),
340                        });
341                    }
342                    AssignTarget::Local(name) => {
343                        // v7.12.6 — write into the DECLARE scope.
344                        // Loose-typing: we don't enforce the
345                        // declared type at runtime (PG's INTO
346                        // coerces; v7.12.6 just stores the
347                        // evaluated Value as-is). Type coercion
348                        // tightens in a later release.
349                        locals.insert(name.clone(), evaluated);
350                    }
351                }
352            }
353            PlPgSqlStmt::Return(target) => {
354                return Ok(BodyOutcome::Return(target.clone()));
355            }
356            PlPgSqlStmt::If {
357                branches,
358                else_branch,
359            } => {
360                let mut matched = false;
361                for (cond_expr, body) in branches {
362                    let cond_val = eval_with_new_old_and_locals(
363                        cond_expr,
364                        current_new.as_ref(),
365                        old_row,
366                        locals,
367                        ctx.columns,
368                        ctx.table_name,
369                        ctx.params,
370                        ctx.default_text_search_config,
371                    )
372                    .map_err(|cause| TriggerError::EvalFailed {
373                        function: ctx.function.into(),
374                        cause,
375                    })?;
376                    if matches!(cond_val, Value::Bool(true)) {
377                        matched = true;
378                        match execute_stmts(body, current_new, old_row, locals, ctx, deferred)? {
379                            BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
380                            BodyOutcome::FellThrough => {}
381                        }
382                        break;
383                    }
384                }
385                if !matched && !else_branch.is_empty() {
386                    match execute_stmts(else_branch, current_new, old_row, locals, ctx, deferred)? {
387                        BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
388                        BodyOutcome::FellThrough => {}
389                    }
390                }
391            }
392            PlPgSqlStmt::Raise {
393                level,
394                message,
395                args,
396            } => {
397                // Resolve every %-format placeholder by evaluating
398                // each arg expression and rendering its Value.
399                let mut rendered_args: Vec<String> = Vec::with_capacity(args.len());
400                for a in args {
401                    let v = eval_with_new_old_and_locals(
402                        a,
403                        current_new.as_ref(),
404                        old_row,
405                        locals,
406                        ctx.columns,
407                        ctx.table_name,
408                        ctx.params,
409                        ctx.default_text_search_config,
410                    )
411                    .map_err(|cause| TriggerError::EvalFailed {
412                        function: ctx.function.into(),
413                        cause,
414                    })?;
415                    rendered_args.push(value_to_display_string(&v));
416                }
417                let resolved = format_raise_message(message, &rendered_args);
418                if matches!(level, RaiseLevel::Exception) {
419                    return Err(TriggerError::RaiseException {
420                        function: ctx.function.into(),
421                        message: resolved,
422                    });
423                }
424                // NOTICE / WARNING / INFO / LOG / DEBUG — log to
425                // stderr for v7.12.6. Wiring through the server's
426                // log channel is a v7.12.7+ polish item; the
427                // resolved message stays accessible regardless.
428                let _ = resolved;
429                let _ = level;
430            }
431            PlPgSqlStmt::SelectInto { var, body } => {
432                // v7.16.2 — execute via the engine callback the
433                // caller (Engine::exec_do_block) registered on
434                // ctx, assign the result to the local. Trigger
435                // path (no callback) errors loudly: SELECT INTO
436                // doesn't fit in a row-write loop.
437                let mut substituted = spg_sql::ast::Statement::Select((**body).clone());
438                substitute_trigger_context_in_statement(
439                    &mut substituted,
440                    current_new.as_ref(),
441                    old_row,
442                    locals,
443                    ctx.columns,
444                )
445                .map_err(|cause| TriggerError::EvalFailed {
446                    function: ctx.function.into(),
447                    cause,
448                })?;
449                let resolver =
450                    ctx.select_into_resolver.ok_or_else(|| TriggerError::UnsupportedConstruct {
451                        function: ctx.function.into(),
452                        detail: alloc::format!(
453                            "SELECT … INTO {var}: only supported inside DO blocks (not trigger bodies) in v7.16.2"
454                        ),
455                    })?;
456                let value = resolver(&substituted)?;
457                locals.insert(var.clone(), value);
458            }
459            PlPgSqlStmt::EmbeddedSql(boxed_stmt) => {
460                // v7.12.7 — substitute NEW/OLD/locals into every
461                // Expr field of the statement, then queue for
462                // post-DML execution. The trigger interpreter
463                // doesn't call back into Engine::execute directly
464                // (that would deadlock the row-write mut borrow);
465                // the engine drains `deferred` after the firing
466                // INSERT/UPDATE/DELETE completes its main work.
467                let mut substituted = (**boxed_stmt).clone();
468                substitute_trigger_context_in_statement(
469                    &mut substituted,
470                    current_new.as_ref(),
471                    old_row,
472                    locals,
473                    ctx.columns,
474                )
475                .map_err(|cause| TriggerError::EvalFailed {
476                    function: ctx.function.into(),
477                    cause,
478                })?;
479                deferred.push(DeferredEmbeddedStmt {
480                    function: ctx.function.into(),
481                    stmt: substituted,
482                });
483            }
484        }
485    }
486    Ok(BodyOutcome::FellThrough)
487}
488
489/// v7.16.2 — execute a DO block's PlPgSqlBlock at top level.
490/// Different from `fire_row_trigger` in three ways:
491///   1. No NEW/OLD row context — DO blocks aren't row-scoped.
492///   2. EmbeddedSql statements collected into the returned vec
493///      so the caller (`Engine::exec_do_block`) can dispatch
494///      them via `Engine::execute_in_with_cancel` IMMEDIATELY,
495///      not defer. Triggers defer because they fire inside a
496///      row-write `&mut Catalog` borrow; DO has no such borrow.
497///   3. Embedded condition Expr (e.g. `IF EXISTS (SELECT ...)`)
498///      evaluation happens inline against the engine's
499///      current state — the caller resolves the subquery
500///      result before walking the body. We do that by
501///      collecting the IF / Assign / RAISE statements and
502///      letting the caller-side evaluator decide; v7.16.2's
503///      simple path lets `eval_with_new_old_and_locals` do
504///      it inline, falling back to the embedded sub-engine
505///      for SELECT subqueries via the regular eval path.
506///
507/// Returns the deferred SQL list in execution order. Errors
508/// from the walk propagate verbatim (parse / eval / engine).
509pub fn execute_do_block_top_level<'a>(
510    block: &spg_sql::ast::PlPgSqlBlock,
511    default_text_search_config: Option<&'a str>,
512    select_into_resolver: Option<&'a SelectIntoResolver<'a>>,
513) -> Result<Vec<spg_sql::ast::Statement>, TriggerError> {
514    let mut locals: BTreeMap<String, Value> = BTreeMap::new();
515    let empty_cols: &[ColumnSchema] = &[];
516    init_locals_from_declarations(
517        &block.declarations,
518        &mut locals,
519        None,
520        None,
521        empty_cols,
522        "",
523        &[],
524        default_text_search_config,
525        "DO",
526    )?;
527    let ctx = BodyCtx {
528        function: "DO",
529        table_name: "",
530        columns: empty_cols,
531        params: &[],
532        default_text_search_config,
533        is_after: false,
534        select_into_resolver,
535    };
536    let mut current_new: Option<Row> = None;
537    let mut deferred: Vec<DeferredEmbeddedStmt> = Vec::new();
538    // execute_stmts returns BodyOutcome — for DO top-level we
539    // ignore the return target (RETURN inside DO is a no-op
540    // by PG semantics: the block's outer scope has no return
541    // contract).
542    let _ = execute_stmts(
543        &block.statements,
544        &mut current_new,
545        None,
546        &mut locals,
547        &ctx,
548        &mut deferred,
549    )?;
550    Ok(deferred.into_iter().map(|d| d.stmt).collect())
551}
552
553fn resolve_return(
554    target: ReturnTarget,
555    current_new: Option<Row>,
556    old_row: Option<&Row>,
557) -> TriggerOutcome {
558    match target {
559        ReturnTarget::New => current_new.map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
560        ReturnTarget::Old => old_row
561            .cloned()
562            .map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
563        ReturnTarget::Null => TriggerOutcome::Skip,
564        // The scalar UDF surface in a later release handles
565        // RETURN <expr> properly; for now we fall through to Skip.
566        ReturnTarget::Expr(_) => TriggerOutcome::Skip,
567    }
568}
569
570#[allow(clippy::too_many_arguments)]
571fn init_locals_from_declarations(
572    decls: &[PlPgSqlDeclare],
573    locals: &mut BTreeMap<String, Value>,
574    new_row: Option<&Row>,
575    old_row: Option<&Row>,
576    columns: &[ColumnSchema],
577    table_name: &str,
578    params: &[Value],
579    default_text_search_config: Option<&str>,
580    function_name: &str,
581) -> Result<(), TriggerError> {
582    for d in decls {
583        let v = if let Some(init) = &d.default {
584            eval_with_new_old_and_locals(
585                init,
586                new_row,
587                old_row,
588                locals,
589                columns,
590                table_name,
591                params,
592                default_text_search_config,
593            )
594            .map_err(|cause| TriggerError::EvalFailed {
595                function: function_name.into(),
596                cause,
597            })?
598        } else {
599            Value::Null
600        };
601        locals.insert(d.name.clone(), v);
602    }
603    Ok(())
604}
605
/// v7.12.6 — PG-style `%` expansion for RAISE messages.
///
/// Each lone `%` is replaced by the next entry of `args`, in
/// order; `%%` collapses to a literal `%`. When the placeholders
/// outnumber the args, the surplus `%` characters are kept as-is
/// (PG rejects such a statement; this keeps the message readable
/// instead). Surplus args are silently ignored.
fn format_raise_message(fmt: &str, args: &[String]) -> String {
    let mut rendered = String::with_capacity(fmt.len());
    let mut pending = args.iter();
    let mut rest = fmt;
    // Hop from one `%` to the next, copying the text in between.
    while let Some(at) = rest.find('%') {
        rendered.push_str(&rest[..at]);
        // `%` is a single byte, so `at + 1` is a char boundary.
        let tail = &rest[at + 1..];
        if let Some(after_escape) = tail.strip_prefix('%') {
            rendered.push('%');
            rest = after_escape;
        } else {
            match pending.next() {
                Some(arg) => rendered.push_str(arg),
                None => rendered.push('%'),
            }
            rest = tail;
        }
    }
    rendered.push_str(rest);
    rendered
}
636
637/// v7.12.6 — Display rendering for a [`Value`] inside a RAISE
638/// message arg. Booleans / ints / floats render naturally;
639/// strings render unquoted; other types fall back to Debug.
640fn value_to_display_string(v: &Value) -> String {
641    use alloc::string::ToString;
642    match v {
643        Value::Null => String::new(),
644        Value::Bool(b) => b.to_string(),
645        Value::SmallInt(n) => n.to_string(),
646        Value::Int(n) => n.to_string(),
647        Value::BigInt(n) => n.to_string(),
648        Value::Float(x) => x.to_string(),
649        Value::Text(s) | Value::Json(s) => s.clone(),
650        other => format!("{other:?}"),
651    }
652}
653
654/// Evaluate a sub-expression against the NEW / OLD row context.
655/// Pre-walks the AST replacing every `NEW.col` / `OLD.col`
656/// reference with a literal of the actual value, then dispatches
657/// to the regular [`eval::eval_expr`]. Pre-walk strategy mirrors
658/// the existing [`substitute_in_expr`] used by correlated
659/// subqueries.
660/// v7.12.6 — same as [`eval_with_new_old`] but also substitutes
661/// qualifier-less `Column(<name>)` references whose name matches
662/// a `DECLARE`'d local variable. Locals shadow table-column refs
663/// (PG semantics — though a careful trigger function avoids the
664/// collision via naming convention).
665#[allow(clippy::too_many_arguments)]
666fn eval_with_new_old_and_locals(
667    expr: &Expr,
668    new_row: Option<&Row>,
669    old_row: Option<&Row>,
670    locals: &BTreeMap<String, Value>,
671    columns: &[ColumnSchema],
672    table_alias: &str,
673    params: &[Value],
674    default_text_search_config: Option<&str>,
675) -> Result<Value, EvalError> {
676    let mut rewritten = expr.clone();
677    substitute_locals(&mut rewritten, locals);
678    substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
679    let ctx = EvalContext::new(columns, Some(table_alias))
680        .with_params(params)
681        .with_default_text_search_config(default_text_search_config);
682    let empty = Row::new(Vec::new());
683    eval::eval_expr(&rewritten, &empty, &ctx)
684}
685
686/// v7.12.6 — in-place substitute every qualifier-less
687/// `Column(<name>)` whose name is in `locals` with that local's
688/// current Value as a literal. Runs before [`substitute_new_old`]
689/// so NEW.col / OLD.col references (which have a qualifier) take
690/// the NEW/OLD path normally.
691fn substitute_locals(expr: &mut Expr, locals: &BTreeMap<String, Value>) {
692    if let Expr::Column(c) = expr {
693        if c.qualifier.is_none()
694            && let Some(v) = locals.get(&c.name)
695        {
696            *expr = value_to_literal_expr(&[], 0, v.clone());
697            return;
698        }
699    }
700    match expr {
701        Expr::AggregateOrdered { call, order_by } => {
702            substitute_locals(call, locals);
703            for o in order_by.iter_mut() {
704                substitute_locals(&mut o.expr, locals);
705            }
706        }
707        Expr::Binary { lhs, rhs, .. } => {
708            substitute_locals(lhs, locals);
709            substitute_locals(rhs, locals);
710        }
711        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
712            substitute_locals(expr, locals);
713        }
714        Expr::Like { expr, pattern, .. } => {
715            substitute_locals(expr, locals);
716            substitute_locals(pattern, locals);
717        }
718        Expr::FunctionCall { args, .. } => {
719            for a in args {
720                substitute_locals(a, locals);
721            }
722        }
723        Expr::Extract { source, .. } => substitute_locals(source, locals),
724        Expr::Array(items) => {
725            for elem in items {
726                substitute_locals(elem, locals);
727            }
728        }
729        Expr::ArraySubscript { target, index } => {
730            substitute_locals(target, locals);
731            substitute_locals(index, locals);
732        }
733        Expr::AnyAll { expr, array, .. } => {
734            substitute_locals(expr, locals);
735            substitute_locals(array, locals);
736        }
737        Expr::Case {
738            operand,
739            branches,
740            else_branch,
741        } => {
742            if let Some(o) = operand {
743                substitute_locals(o, locals);
744            }
745            for (w, t) in branches {
746                substitute_locals(w, locals);
747                substitute_locals(t, locals);
748            }
749            if let Some(e) = else_branch {
750                substitute_locals(e, locals);
751            }
752        }
753        Expr::Literal(_)
754        | Expr::Placeholder(_)
755        | Expr::Column(_)
756        | Expr::WindowFunction { .. }
757        | Expr::ScalarSubquery(_)
758        | Expr::Exists { .. }
759        | Expr::InSubquery { .. } => {}
760    }
761}
762
763fn eval_with_new_old(
764    expr: &Expr,
765    new_row: Option<&Row>,
766    old_row: Option<&Row>,
767    columns: &[ColumnSchema],
768    table_alias: &str,
769    params: &[Value],
770    default_text_search_config: Option<&str>,
771) -> Result<Value, EvalError> {
772    let mut rewritten = expr.clone();
773    substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
774    let ctx = EvalContext::new(columns, Some(table_alias))
775        .with_params(params)
776        .with_default_text_search_config(default_text_search_config);
777    // Empty row — the substitution above eliminated every column
778    // reference that depended on NEW / OLD; any remaining column
779    // reference is a bug (would surface as ColumnNotFound).
780    let empty = Row::new(Vec::new());
781    eval::eval_expr(&rewritten, &empty, &ctx)
782}
783
784/// In-place walk: replace every `Column{qualifier=NEW|OLD,name=c}`
785/// reference with the corresponding row value, materialised as
786/// an `Expr::Literal`. Recurses through every Expr variant so
787/// `to_tsvector('english', NEW.subject || ' ' || NEW.sender)`
788/// substitutes cleanly even though the references nest inside
789/// function calls + binary operators.
790fn substitute_new_old(
791    expr: &mut Expr,
792    new_row: Option<&Row>,
793    old_row: Option<&Row>,
794    columns: &[ColumnSchema],
795) -> Result<(), EvalError> {
796    if let Expr::Column(c) = expr {
797        if let Some(q) = &c.qualifier {
798            let lower = q.to_ascii_lowercase();
799            if lower == "new" || lower == "old" {
800                let (row, side) = if lower == "new" {
801                    (new_row, "NEW")
802                } else {
803                    (old_row, "OLD")
804                };
805                let pos = columns
806                    .iter()
807                    .position(|sc| sc.name.eq_ignore_ascii_case(&c.name))
808                    .ok_or_else(|| EvalError::ColumnNotFound {
809                        name: format!("{side}.{}", c.name),
810                    })?;
811                let v = match row {
812                    Some(r) => r.values.get(pos).cloned().unwrap_or(Value::Null),
813                    None => Value::Null,
814                };
815                *expr = value_to_literal_expr(columns, pos, v);
816                return Ok(());
817            }
818        }
819    }
820    match expr {
821        Expr::AggregateOrdered { call, order_by } => {
822            substitute_new_old(call, new_row, old_row, columns)?;
823            for o in order_by.iter_mut() {
824                substitute_new_old(&mut o.expr, new_row, old_row, columns)?;
825            }
826        }
827        Expr::Binary { lhs, rhs, .. } => {
828            substitute_new_old(lhs, new_row, old_row, columns)?;
829            substitute_new_old(rhs, new_row, old_row, columns)?;
830        }
831        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
832            substitute_new_old(expr, new_row, old_row, columns)?;
833        }
834        Expr::Like { expr, pattern, .. } => {
835            substitute_new_old(expr, new_row, old_row, columns)?;
836            substitute_new_old(pattern, new_row, old_row, columns)?;
837        }
838        Expr::FunctionCall { args, .. } => {
839            for a in args {
840                substitute_new_old(a, new_row, old_row, columns)?;
841            }
842        }
843        Expr::Extract { source, .. } => substitute_new_old(source, new_row, old_row, columns)?,
844        Expr::Array(items) => {
845            for elem in items {
846                substitute_new_old(elem, new_row, old_row, columns)?;
847            }
848        }
849        Expr::ArraySubscript { target, index } => {
850            substitute_new_old(target, new_row, old_row, columns)?;
851            substitute_new_old(index, new_row, old_row, columns)?;
852        }
853        Expr::AnyAll { expr, array, .. } => {
854            substitute_new_old(expr, new_row, old_row, columns)?;
855            substitute_new_old(array, new_row, old_row, columns)?;
856        }
857        Expr::Case {
858            operand,
859            branches,
860            else_branch,
861        } => {
862            if let Some(o) = operand {
863                substitute_new_old(o, new_row, old_row, columns)?;
864            }
865            for (w, t) in branches {
866                substitute_new_old(w, new_row, old_row, columns)?;
867                substitute_new_old(t, new_row, old_row, columns)?;
868            }
869            if let Some(e) = else_branch {
870                substitute_new_old(e, new_row, old_row, columns)?;
871            }
872        }
873        // Leaves + variants we don't recurse into (sub-queries
874        // inside a trigger body would require correlated-query
875        // wiring; carved out of v7.12.4).
876        Expr::Literal(_)
877        | Expr::Placeholder(_)
878        | Expr::Column(_)
879        | Expr::WindowFunction { .. }
880        | Expr::ScalarSubquery(_)
881        | Expr::Exists { .. }
882        | Expr::InSubquery { .. } => {}
883    }
884    Ok(())
885}
886
887/// Turn a [`Value`] back into an [`Expr::Literal`]. Necessary
888/// because [`substitute_new_old`] inlines NEW/OLD cell values
889/// into the expression tree.
890fn value_to_literal_expr(_columns: &[ColumnSchema], _pos: usize, v: Value) -> Expr {
891    use spg_sql::ast::Literal;
892    let lit = match v {
893        Value::Null => Literal::Null,
894        Value::Bool(b) => Literal::Bool(b),
895        Value::SmallInt(n) => Literal::Integer(i64::from(n)),
896        Value::Int(n) => Literal::Integer(i64::from(n)),
897        Value::BigInt(n) => Literal::Integer(n),
898        Value::Float(x) => Literal::Float(x),
899        Value::Text(s) | Value::Json(s) => Literal::String(s),
900        // Other values (Vector, Date, Timestamp, TsVector, etc.)
901        // round-trip through the Display form back into a string
902        // literal. v7.12.5 will add typed-literal variants here
903        // so the cast layer doesn't need to re-parse from text.
904        other => Literal::String(format!("{other:?}")),
905    };
906    Expr::Literal(lit)
907}
908
909/// v7.12.7 — substitute NEW / OLD / DECLARE-local references in
910/// every `Expr` field of a [`Statement`]. Used to materialise an
911/// embedded SQL statement's NEW.col / OLD.col / local-var refs as
912/// literals so the engine can re-execute it without holding the
913/// trigger context.
914fn substitute_trigger_context_in_statement(
915    stmt: &mut spg_sql::ast::Statement,
916    new_row: Option<&Row>,
917    old_row: Option<&Row>,
918    locals: &BTreeMap<String, Value>,
919    columns: &[ColumnSchema],
920) -> Result<(), EvalError> {
921    use spg_sql::ast::Statement;
922    let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
923        substitute_locals(e, locals);
924        substitute_new_old(e, new_row, old_row, columns)?;
925        Ok(())
926    };
927    match stmt {
928        Statement::Insert(s) => {
929            for tuple in &mut s.rows {
930                for e in tuple {
931                    walk(e)?;
932                }
933            }
934        }
935        Statement::Update(s) => {
936            for (_col, e) in &mut s.assignments {
937                walk(e)?;
938            }
939            if let Some(w) = &mut s.where_ {
940                walk(w)?;
941            }
942        }
943        Statement::Delete(s) => {
944            if let Some(w) = &mut s.where_ {
945                walk(w)?;
946            }
947        }
948        Statement::Select(s) => {
949            substitute_trigger_context_in_select(s, new_row, old_row, locals, columns)?
950        }
951        // Other statement kinds (DDL, SHOW, etc.) inside a
952        // trigger body would only meaningfully reference NEW/OLD
953        // in error-message position; v7.12.7 doesn't recursively
954        // substitute their Expr fields. Future surfaces (e.g.
955        // RAISE ... USING) can add cases here.
956        _ => {}
957    }
958    Ok(())
959}
960
961fn substitute_trigger_context_in_select(
962    s: &mut spg_sql::ast::SelectStatement,
963    new_row: Option<&Row>,
964    old_row: Option<&Row>,
965    locals: &BTreeMap<String, Value>,
966    columns: &[ColumnSchema],
967) -> Result<(), EvalError> {
968    use spg_sql::ast::SelectItem;
969    let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
970        substitute_locals(e, locals);
971        substitute_new_old(e, new_row, old_row, columns)?;
972        Ok(())
973    };
974    for item in &mut s.items {
975        if let SelectItem::Expr { expr, .. } = item {
976            walk(expr)?;
977        }
978    }
979    if let Some(w) = &mut s.where_ {
980        walk(w)?;
981    }
982    if let Some(group_by) = &mut s.group_by {
983        for g in group_by {
984            walk(g)?;
985        }
986    }
987    if let Some(h) = &mut s.having {
988        walk(h)?;
989    }
990    for ob in &mut s.order_by {
991        walk(&mut ob.expr)?;
992    }
993    // LIMIT / OFFSET use `LimitExpr` (integer literal or
994    // placeholder); they don't carry an `Expr` to substitute
995    // into. Leave them alone.
996    let _ = &s.limit;
997    let _ = &s.offset;
998    Ok(())
999}
1000
1001/// v7.12.4 — find the triggers that should fire for a given
1002/// `(table, event, timing)` tuple. Returns names so the caller
1003/// can iterate without holding a borrow on the catalog while it
1004/// mutates rows.
1005pub fn matching_trigger_names<'a>(
1006    triggers: &'a [TriggerDef],
1007    table: &str,
1008    event: &str,
1009    timing: &str,
1010) -> Vec<&'a TriggerDef> {
1011    triggers
1012        .iter()
1013        .filter(|t| {
1014            t.table == table
1015                && t.timing.eq_ignore_ascii_case(timing)
1016                && t.for_each.eq_ignore_ascii_case("row")
1017                && t.events.iter().any(|e| e.eq_ignore_ascii_case(event))
1018        })
1019        .collect()
1020}