Skip to main content

spg_engine/
triggers.rs

//! v7.12.4 — PL/pgSQL row-level trigger executor.
//!
//! The catalogued [`spg_storage::FunctionDef`] carries the trigger
//! function's source body as raw text (between the original
//! `$$ ... $$`). Each time a trigger fires we re-parse the body
//! via `spg_sql::parse_function_body` and walk the resulting
//! [`spg_sql::ast::PlPgSqlBlock`] against a NEW / OLD row context.
//!
//! v7.12.4 surface (the minimum that lets a mailrs-shape
//! `update_search_vector` trigger run end-to-end):
//!
//!   * `NEW.col := <expr>;`     — mutate a NEW cell. BEFORE only.
//!   * `RETURN NEW;`            — pass the (possibly-mutated) row
//!                                back to the row writer.
//!   * `RETURN OLD;`            — return the pre-change row.
//!   * `RETURN NULL;` / `RETURN;` — skip the write (BEFORE) or
//!                                no-op the notification (AFTER).
//!   * sub-expression eval recurses through the regular
//!     [`crate::eval::eval_expr`] so anything the SELECT executor
//!     can compute is fair game inside a trigger body.
//!
//! Out of scope for v7.12.4 (land in v7.12.5+):
//!
//!   * `DECLARE`'d local variables
//!   * `IF / ELSIF / ELSE / END IF;` control flow
//!   * Embedded SQL statements (`UPDATE … WHERE …`, `SELECT … INTO var`)
//!   * `RAISE NOTICE / RAISE EXCEPTION`
//!   * Loop constructs
29
30use alloc::collections::BTreeMap;
31use alloc::format;
32use alloc::string::String;
33use alloc::vec::Vec;
34use core::fmt;
35
36use spg_sql::ast::{AssignTarget, Expr, PlPgSqlDeclare, PlPgSqlStmt, RaiseLevel, ReturnTarget};
37use spg_storage::{ColumnSchema, FunctionDef, Row, TriggerDef, Value};
38
39use crate::eval::{self, EvalContext, EvalError};
40
41/// v7.12.7 — embedded SQL statement collected during a trigger
42/// fire, queued for execution after the firing DML completes.
43/// NEW / OLD / DECLARE-local references inside the statement's
44/// Expr tree have already been substituted with literals; the
45/// engine just feeds it to `execute_stmt_with_cancel`.
46#[derive(Debug, Clone, PartialEq)]
47pub struct DeferredEmbeddedStmt {
48    /// Trigger function the embedded SQL came from. Used to
49    /// label recursion errors precisely.
50    pub function: String,
51    /// Substituted statement, ready to execute.
52    pub stmt: spg_sql::ast::Statement,
53}
54
55/// What the trigger function returned. Drives the row-write path
56/// the trigger fired from.
57#[derive(Debug, Clone, PartialEq)]
58pub enum TriggerOutcome {
59    /// `RETURN NEW;` (or `RETURN OLD;`) — write this row.
60    /// For BEFORE triggers, the row may differ from the input
61    /// (e.g. `NEW.search_vector := …` rewrote a cell). For AFTER
62    /// triggers, the value is currently ignored — but we still
63    /// surface it for symmetric callers / future v7.12.5 use.
64    Row(Row),
65    /// `RETURN NULL;` or trigger fell off the end. For a BEFORE
66    /// trigger, the row writer must skip the affected row. For
67    /// an AFTER trigger, no-op.
68    Skip,
69}
70
71/// Result type the trigger executor exposes. Wraps `EvalError`
72/// at the eval-of-expressions layer and adds trigger-specific
73/// failure modes (`OLD.col := …`, unsupported PL/pgSQL feature,
74/// body that fails to re-parse, …).
75#[derive(Debug, Clone, PartialEq)]
76pub enum TriggerError {
77    /// Body source stored in the catalog can't be re-parsed.
78    /// Usually means the function was created against a newer
79    /// PL/pgSQL surface than the running engine knows about.
80    UnparseableBody { function: String, detail: String },
81    /// Trigger function uses a v7.12.5+ language feature
82    /// (DECLARE, IF, embedded SQL, RAISE, …). The error names
83    /// the construct so the operator can plan around it until
84    /// the feature lands.
85    UnsupportedConstruct { function: String, detail: String },
86    /// `OLD.col := <expr>` inside the body. PG itself rejects
87    /// this; we surface a clear message rather than silently
88    /// dropping the assignment.
89    OldIsReadOnly { function: String, column: String },
90    /// `NEW.col := <expr>` in an AFTER trigger — same rationale
91    /// as OLD: PG enforces "NEW is read-only after the row has
92    /// been written" and we mirror.
93    NewReadOnlyInAfterTrigger { function: String, column: String },
94    /// `NEW.col := <expr>` against a non-existent column.
95    /// Usually a schema-drift bug.
96    UnknownColumn {
97        function: String,
98        column: String,
99        table: String,
100    },
101    /// Sub-expression eval inside the trigger body failed. The
102    /// wrapped [`EvalError`] explains the underlying cause
103    /// (`ColumnNotFound`, `TypeMismatch`, …).
104    EvalFailed { function: String, cause: EvalError },
105    /// v7.12.6 — `RAISE EXCEPTION '<message>' [, args]*` in the
106    /// trigger body. The interpreter formats the args into the
107    /// message via PG-style `%` substitution and surfaces the
108    /// resolved text up to the caller.
109    RaiseException { function: String, message: String },
110}
111
112impl fmt::Display for TriggerError {
113    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
114        match self {
115            Self::UnparseableBody { function, detail } => {
116                write!(
117                    f,
118                    "trigger function {function:?} body did not parse: {detail}"
119                )
120            }
121            Self::UnsupportedConstruct { function, detail } => {
122                write!(
123                    f,
124                    "trigger function {function:?} uses an unsupported PL/pgSQL construct: {detail}"
125                )
126            }
127            Self::OldIsReadOnly { function, column } => {
128                write!(
129                    f,
130                    "trigger function {function:?}: cannot assign to OLD.{column} (OLD is read-only — PG rule)"
131                )
132            }
133            Self::NewReadOnlyInAfterTrigger { function, column } => {
134                write!(
135                    f,
136                    "trigger function {function:?}: cannot assign to NEW.{column} inside an AFTER trigger \
137                     (NEW is read-only post-write — use BEFORE triggers for mutation, or an embedded UPDATE statement \
138                      in v7.12.5+)"
139                )
140            }
141            Self::UnknownColumn {
142                function,
143                column,
144                table,
145            } => {
146                write!(
147                    f,
148                    "trigger function {function:?}: target column {column:?} not in table {table:?} schema"
149                )
150            }
151            Self::EvalFailed { function, cause } => {
152                write!(
153                    f,
154                    "trigger function {function:?}: expression eval failed: {cause}"
155                )
156            }
157            Self::RaiseException { function, message } => {
158                write!(
159                    f,
160                    "trigger function {function:?}: RAISE EXCEPTION {message:?}"
161                )
162            }
163        }
164    }
165}
166
167/// Fire a single row-level trigger.
168///
169/// `is_after` is true for AFTER triggers; the executor enforces
170/// "NEW is read-only" by rejecting NEW.col assignments in that
171/// case. AFTER trigger return values are ignored by callers; the
172/// returned [`TriggerOutcome`] just carries the (possibly
173/// untouched) NEW row for symmetry.
174#[allow(clippy::too_many_arguments)] // the table_name / columns / params /
175// ts-config trio are independent; folding
176// them into a struct just shuffles the
177// boilerplate to the call sites without
178// material gain.
179pub fn fire_row_trigger(
180    function: &FunctionDef,
181    new_row: Option<Row>,
182    old_row: Option<&Row>,
183    table_name: &str,
184    columns: &[ColumnSchema],
185    params: &[Value],
186    default_text_search_config: Option<&str>,
187    is_after: bool,
188) -> Result<(TriggerOutcome, Vec<DeferredEmbeddedStmt>), TriggerError> {
189    if !function.language.eq_ignore_ascii_case("plpgsql") {
190        return Err(TriggerError::UnsupportedConstruct {
191            function: function.name.clone(),
192            detail: format!(
193                "v7.12.4 only invokes LANGUAGE plpgsql trigger functions; \
194                 {:?} declares LANGUAGE {}",
195                function.name, function.language
196            ),
197        });
198    }
199    let block = spg_sql::parse_function_body(&function.body).map_err(|e| {
200        TriggerError::UnparseableBody {
201            function: function.name.clone(),
202            detail: format!("{e}"),
203        }
204    })?;
205    // v7.12.6 — initialise local variable scope from the DECLARE
206    // block. Each init expr (if any) evaluates against the
207    // so-far-bound scope + the NEW/OLD context, so later DECLAREs
208    // can reference earlier ones.
209    let mut locals: BTreeMap<String, Value> = BTreeMap::new();
210    init_locals_from_declarations(
211        &block.declarations,
212        &mut locals,
213        new_row.as_ref(),
214        old_row,
215        columns,
216        table_name,
217        params,
218        default_text_search_config,
219        &function.name,
220    )?;
221    let mut current_new = new_row;
222    let ctx = BodyCtx {
223        function: &function.name,
224        table_name,
225        columns,
226        params,
227        default_text_search_config,
228        is_after,
229        select_into_resolver: None,
230    };
231    let mut deferred: Vec<DeferredEmbeddedStmt> = Vec::new();
232    let outcome = match execute_stmts(
233        &block.statements,
234        &mut current_new,
235        old_row,
236        &mut locals,
237        &ctx,
238        &mut deferred,
239    )? {
240        BodyOutcome::Return(target) => resolve_return(target, current_new, old_row),
241        // Body fell off without an explicit RETURN. PL/pgSQL
242        // default is `RETURN NULL`; we mirror — the BEFORE
243        // trigger then skips the row.
244        BodyOutcome::FellThrough => TriggerOutcome::Skip,
245    };
246    Ok((outcome, deferred))
247}
248
249/// v7.12.6 — body-walk return signal. `Return(target)` short-
250/// circuits the caller; `FellThrough` means the statement list
251/// completed without a RETURN, equivalent to PL/pgSQL's implicit
252/// `RETURN NULL`.
253enum BodyOutcome {
254    Return(ReturnTarget),
255    FellThrough,
256}
257
258/// Shared parameters every body-stmt evaluation needs. Bundled so
259/// the recursive `execute_stmts` doesn't have to thread eight
260/// individual `&str` / `&[…]` args around.
261struct BodyCtx<'a> {
262    function: &'a str,
263    table_name: &'a str,
264    columns: &'a [ColumnSchema],
265    params: &'a [Value],
266    default_text_search_config: Option<&'a str>,
267    is_after: bool,
268    /// v7.16.2 — synchronous SELECT … INTO resolver. Provided
269    /// by `Engine::exec_do_block` so the walker can run a
270    /// SELECT against the engine right when SelectInto is
271    /// reached (so subsequent IF reads of the local see the
272    /// fresh value). `None` for trigger paths where SelectInto
273    /// isn't yet supported.
274    select_into_resolver: Option<&'a SelectIntoResolver<'a>>,
275}
276
277/// v7.16.2 — callback shape the DO-block executor registers
278/// on `BodyCtx`. Runs the supplied SELECT statement against
279/// the engine, returns the first row's first column.
280pub type SelectIntoResolver<'a> =
281    dyn Fn(&spg_sql::ast::Statement) -> Result<Value, TriggerError> + 'a;
282
283fn execute_stmts(
284    stmts: &[PlPgSqlStmt],
285    current_new: &mut Option<Row>,
286    old_row: Option<&Row>,
287    locals: &mut BTreeMap<String, Value>,
288    ctx: &BodyCtx<'_>,
289    deferred: &mut Vec<DeferredEmbeddedStmt>,
290) -> Result<BodyOutcome, TriggerError> {
291    for stmt in stmts {
292        match stmt {
293            PlPgSqlStmt::Assign { target, value } => {
294                let evaluated = eval_with_new_old_and_locals(
295                    value,
296                    current_new.as_ref(),
297                    old_row,
298                    locals,
299                    ctx.columns,
300                    ctx.table_name,
301                    ctx.params,
302                    ctx.default_text_search_config,
303                )
304                .map_err(|cause| TriggerError::EvalFailed {
305                    function: ctx.function.into(),
306                    cause,
307                })?;
308                match target {
309                    AssignTarget::NewColumn(col) => {
310                        if ctx.is_after {
311                            return Err(TriggerError::NewReadOnlyInAfterTrigger {
312                                function: ctx.function.into(),
313                                column: col.clone(),
314                            });
315                        }
316                        let pos = ctx
317                            .columns
318                            .iter()
319                            .position(|c| c.name.eq_ignore_ascii_case(col))
320                            .ok_or_else(|| TriggerError::UnknownColumn {
321                                function: ctx.function.into(),
322                                column: col.clone(),
323                                table: alloc::string::ToString::to_string(&ctx.table_name),
324                            })?;
325                        let row = current_new.as_mut().ok_or_else(|| {
326                            TriggerError::UnsupportedConstruct {
327                                function: ctx.function.into(),
328                                detail: format!(
329                                    "NEW.{col} := … requires a NEW row context \
330                                     (BEFORE INSERT / UPDATE only — not available on DELETE)"
331                                ),
332                            }
333                        })?;
334                        row.values[pos] = evaluated;
335                    }
336                    AssignTarget::OldColumn(col) => {
337                        return Err(TriggerError::OldIsReadOnly {
338                            function: ctx.function.into(),
339                            column: col.clone(),
340                        });
341                    }
342                    AssignTarget::Local(name) => {
343                        // v7.12.6 — write into the DECLARE scope.
344                        // Loose-typing: we don't enforce the
345                        // declared type at runtime (PG's INTO
346                        // coerces; v7.12.6 just stores the
347                        // evaluated Value as-is). Type coercion
348                        // tightens in a later release.
349                        locals.insert(name.clone(), evaluated);
350                    }
351                }
352            }
353            PlPgSqlStmt::Return(target) => {
354                return Ok(BodyOutcome::Return(target.clone()));
355            }
356            PlPgSqlStmt::If {
357                branches,
358                else_branch,
359            } => {
360                let mut matched = false;
361                for (cond_expr, body) in branches {
362                    let cond_val = eval_with_new_old_and_locals(
363                        cond_expr,
364                        current_new.as_ref(),
365                        old_row,
366                        locals,
367                        ctx.columns,
368                        ctx.table_name,
369                        ctx.params,
370                        ctx.default_text_search_config,
371                    )
372                    .map_err(|cause| TriggerError::EvalFailed {
373                        function: ctx.function.into(),
374                        cause,
375                    })?;
376                    if matches!(cond_val, Value::Bool(true)) {
377                        matched = true;
378                        match execute_stmts(body, current_new, old_row, locals, ctx, deferred)? {
379                            BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
380                            BodyOutcome::FellThrough => {}
381                        }
382                        break;
383                    }
384                }
385                if !matched && !else_branch.is_empty() {
386                    match execute_stmts(else_branch, current_new, old_row, locals, ctx, deferred)? {
387                        BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
388                        BodyOutcome::FellThrough => {}
389                    }
390                }
391            }
392            PlPgSqlStmt::Raise {
393                level,
394                message,
395                args,
396            } => {
397                // Resolve every %-format placeholder by evaluating
398                // each arg expression and rendering its Value.
399                let mut rendered_args: Vec<String> = Vec::with_capacity(args.len());
400                for a in args {
401                    let v = eval_with_new_old_and_locals(
402                        a,
403                        current_new.as_ref(),
404                        old_row,
405                        locals,
406                        ctx.columns,
407                        ctx.table_name,
408                        ctx.params,
409                        ctx.default_text_search_config,
410                    )
411                    .map_err(|cause| TriggerError::EvalFailed {
412                        function: ctx.function.into(),
413                        cause,
414                    })?;
415                    rendered_args.push(value_to_display_string(&v));
416                }
417                let resolved = format_raise_message(message, &rendered_args);
418                if matches!(level, RaiseLevel::Exception) {
419                    return Err(TriggerError::RaiseException {
420                        function: ctx.function.into(),
421                        message: resolved,
422                    });
423                }
424                // NOTICE / WARNING / INFO / LOG / DEBUG — log to
425                // stderr for v7.12.6. Wiring through the server's
426                // log channel is a v7.12.7+ polish item; the
427                // resolved message stays accessible regardless.
428                let _ = resolved;
429                let _ = level;
430            }
431            PlPgSqlStmt::SelectInto { var, body } => {
432                // v7.16.2 — execute via the engine callback the
433                // caller (Engine::exec_do_block) registered on
434                // ctx, assign the result to the local. Trigger
435                // path (no callback) errors loudly: SELECT INTO
436                // doesn't fit in a row-write loop.
437                let mut substituted = spg_sql::ast::Statement::Select((**body).clone());
438                substitute_trigger_context_in_statement(
439                    &mut substituted,
440                    current_new.as_ref(),
441                    old_row,
442                    locals,
443                    ctx.columns,
444                )
445                .map_err(|cause| TriggerError::EvalFailed {
446                    function: ctx.function.into(),
447                    cause,
448                })?;
449                let resolver =
450                    ctx.select_into_resolver.ok_or_else(|| TriggerError::UnsupportedConstruct {
451                        function: ctx.function.into(),
452                        detail: alloc::format!(
453                            "SELECT … INTO {var}: only supported inside DO blocks (not trigger bodies) in v7.16.2"
454                        ),
455                    })?;
456                let value = resolver(&substituted)?;
457                locals.insert(var.clone(), value);
458            }
459            PlPgSqlStmt::EmbeddedSql(boxed_stmt) => {
460                // v7.12.7 — substitute NEW/OLD/locals into every
461                // Expr field of the statement, then queue for
462                // post-DML execution. The trigger interpreter
463                // doesn't call back into Engine::execute directly
464                // (that would deadlock the row-write mut borrow);
465                // the engine drains `deferred` after the firing
466                // INSERT/UPDATE/DELETE completes its main work.
467                let mut substituted = (**boxed_stmt).clone();
468                substitute_trigger_context_in_statement(
469                    &mut substituted,
470                    current_new.as_ref(),
471                    old_row,
472                    locals,
473                    ctx.columns,
474                )
475                .map_err(|cause| TriggerError::EvalFailed {
476                    function: ctx.function.into(),
477                    cause,
478                })?;
479                deferred.push(DeferredEmbeddedStmt {
480                    function: ctx.function.into(),
481                    stmt: substituted,
482                });
483            }
484        }
485    }
486    Ok(BodyOutcome::FellThrough)
487}
488
489/// v7.16.2 — execute a DO block's PlPgSqlBlock at top level.
490/// Different from `fire_row_trigger` in three ways:
491///   1. No NEW/OLD row context — DO blocks aren't row-scoped.
492///   2. EmbeddedSql statements collected into the returned vec
493///      so the caller (`Engine::exec_do_block`) can dispatch
494///      them via `Engine::execute_in_with_cancel` IMMEDIATELY,
495///      not defer. Triggers defer because they fire inside a
496///      row-write `&mut Catalog` borrow; DO has no such borrow.
497///   3. Embedded condition Expr (e.g. `IF EXISTS (SELECT ...)`)
498///      evaluation happens inline against the engine's
499///      current state — the caller resolves the subquery
500///      result before walking the body. We do that by
501///      collecting the IF / Assign / RAISE statements and
502///      letting the caller-side evaluator decide; v7.16.2's
503///      simple path lets `eval_with_new_old_and_locals` do
504///      it inline, falling back to the embedded sub-engine
505///      for SELECT subqueries via the regular eval path.
506///
507/// Returns the deferred SQL list in execution order. Errors
508/// from the walk propagate verbatim (parse / eval / engine).
509pub fn execute_do_block_top_level<'a>(
510    block: &spg_sql::ast::PlPgSqlBlock,
511    default_text_search_config: Option<&'a str>,
512    select_into_resolver: Option<&'a SelectIntoResolver<'a>>,
513) -> Result<Vec<spg_sql::ast::Statement>, TriggerError> {
514    let mut locals: BTreeMap<String, Value> = BTreeMap::new();
515    let empty_cols: &[ColumnSchema] = &[];
516    init_locals_from_declarations(
517        &block.declarations,
518        &mut locals,
519        None,
520        None,
521        empty_cols,
522        "",
523        &[],
524        default_text_search_config,
525        "DO",
526    )?;
527    let ctx = BodyCtx {
528        function: "DO",
529        table_name: "",
530        columns: empty_cols,
531        params: &[],
532        default_text_search_config,
533        is_after: false,
534        select_into_resolver,
535    };
536    let mut current_new: Option<Row> = None;
537    let mut deferred: Vec<DeferredEmbeddedStmt> = Vec::new();
538    // execute_stmts returns BodyOutcome — for DO top-level we
539    // ignore the return target (RETURN inside DO is a no-op
540    // by PG semantics: the block's outer scope has no return
541    // contract).
542    let _ = execute_stmts(
543        &block.statements,
544        &mut current_new,
545        None,
546        &mut locals,
547        &ctx,
548        &mut deferred,
549    )?;
550    Ok(deferred.into_iter().map(|d| d.stmt).collect())
551}
552
553fn resolve_return(
554    target: ReturnTarget,
555    current_new: Option<Row>,
556    old_row: Option<&Row>,
557) -> TriggerOutcome {
558    match target {
559        ReturnTarget::New => current_new.map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
560        ReturnTarget::Old => old_row
561            .cloned()
562            .map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
563        ReturnTarget::Null => TriggerOutcome::Skip,
564        // The scalar UDF surface in a later release handles
565        // RETURN <expr> properly; for now we fall through to Skip.
566        ReturnTarget::Expr(_) => TriggerOutcome::Skip,
567    }
568}
569
570#[allow(clippy::too_many_arguments)]
571fn init_locals_from_declarations(
572    decls: &[PlPgSqlDeclare],
573    locals: &mut BTreeMap<String, Value>,
574    new_row: Option<&Row>,
575    old_row: Option<&Row>,
576    columns: &[ColumnSchema],
577    table_name: &str,
578    params: &[Value],
579    default_text_search_config: Option<&str>,
580    function_name: &str,
581) -> Result<(), TriggerError> {
582    for d in decls {
583        let v = if let Some(init) = &d.default {
584            eval_with_new_old_and_locals(
585                init,
586                new_row,
587                old_row,
588                locals,
589                columns,
590                table_name,
591                params,
592                default_text_search_config,
593            )
594            .map_err(|cause| TriggerError::EvalFailed {
595                function: function_name.into(),
596                cause,
597            })?
598        } else {
599            Value::Null
600        };
601        locals.insert(d.name.clone(), v);
602    }
603    Ok(())
604}
605
/// v7.12.6 — expand PG-style `%` placeholders in a RAISE message.
///
/// Placeholders consume `args` left to right; `%%` yields a
/// literal `%`. A placeholder with no argument left is kept as a
/// bare `%`, and arguments beyond the last placeholder are
/// ignored.
fn format_raise_message(fmt: &str, args: &[String]) -> String {
    let mut rendered = String::with_capacity(fmt.len());
    let mut pending = args.iter();
    let mut input = fmt.chars().peekable();
    while let Some(ch) = input.next() {
        if ch != '%' {
            rendered.push(ch);
            continue;
        }
        // `%%` — escaped percent sign; swallow the second one.
        if input.next_if_eq(&'%').is_some() {
            rendered.push('%');
            continue;
        }
        match pending.next() {
            Some(arg) => rendered.push_str(arg),
            // Placeholder without an argument. PG reports an error
            // for this; here the bare `%` is kept instead so the
            // message stays readable.
            None => rendered.push('%'),
        }
    }
    rendered
}
636
637/// v7.12.6 — Display rendering for a [`Value`] inside a RAISE
638/// message arg. Booleans / ints / floats render naturally;
639/// strings render unquoted; other types fall back to Debug.
640fn value_to_display_string(v: &Value) -> String {
641    use alloc::string::ToString;
642    match v {
643        Value::Null => String::new(),
644        Value::Bool(b) => b.to_string(),
645        Value::SmallInt(n) => n.to_string(),
646        Value::Int(n) => n.to_string(),
647        Value::BigInt(n) => n.to_string(),
648        Value::Float(x) => x.to_string(),
649        Value::Text(s) | Value::Json(s) => s.clone(),
650        other => format!("{other:?}"),
651    }
652}
653
654/// Evaluate a sub-expression against the NEW / OLD row context.
655/// Pre-walks the AST replacing every `NEW.col` / `OLD.col`
656/// reference with a literal of the actual value, then dispatches
657/// to the regular [`eval::eval_expr`]. Pre-walk strategy mirrors
658/// the existing [`substitute_in_expr`] used by correlated
659/// subqueries.
660/// v7.12.6 — same as [`eval_with_new_old`] but also substitutes
661/// qualifier-less `Column(<name>)` references whose name matches
662/// a `DECLARE`'d local variable. Locals shadow table-column refs
663/// (PG semantics — though a careful trigger function avoids the
664/// collision via naming convention).
665#[allow(clippy::too_many_arguments)]
666fn eval_with_new_old_and_locals(
667    expr: &Expr,
668    new_row: Option<&Row>,
669    old_row: Option<&Row>,
670    locals: &BTreeMap<String, Value>,
671    columns: &[ColumnSchema],
672    table_alias: &str,
673    params: &[Value],
674    default_text_search_config: Option<&str>,
675) -> Result<Value, EvalError> {
676    let mut rewritten = expr.clone();
677    substitute_locals(&mut rewritten, locals);
678    substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
679    let ctx = EvalContext::new(columns, Some(table_alias))
680        .with_params(params)
681        .with_default_text_search_config(default_text_search_config);
682    let empty = Row::new(Vec::new());
683    eval::eval_expr(&rewritten, &empty, &ctx)
684}
685
686/// v7.12.6 — in-place substitute every qualifier-less
687/// `Column(<name>)` whose name is in `locals` with that local's
688/// current Value as a literal. Runs before [`substitute_new_old`]
689/// so NEW.col / OLD.col references (which have a qualifier) take
690/// the NEW/OLD path normally.
691fn substitute_locals(expr: &mut Expr, locals: &BTreeMap<String, Value>) {
692    if let Expr::Column(c) = expr {
693        if c.qualifier.is_none()
694            && let Some(v) = locals.get(&c.name)
695        {
696            *expr = value_to_literal_expr(&[], 0, v.clone());
697            return;
698        }
699    }
700    match expr {
701        Expr::Binary { lhs, rhs, .. } => {
702            substitute_locals(lhs, locals);
703            substitute_locals(rhs, locals);
704        }
705        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
706            substitute_locals(expr, locals);
707        }
708        Expr::Like { expr, pattern, .. } => {
709            substitute_locals(expr, locals);
710            substitute_locals(pattern, locals);
711        }
712        Expr::FunctionCall { args, .. } => {
713            for a in args {
714                substitute_locals(a, locals);
715            }
716        }
717        Expr::Extract { source, .. } => substitute_locals(source, locals),
718        Expr::Array(items) => {
719            for elem in items {
720                substitute_locals(elem, locals);
721            }
722        }
723        Expr::ArraySubscript { target, index } => {
724            substitute_locals(target, locals);
725            substitute_locals(index, locals);
726        }
727        Expr::AnyAll { expr, array, .. } => {
728            substitute_locals(expr, locals);
729            substitute_locals(array, locals);
730        }
731        Expr::Case {
732            operand,
733            branches,
734            else_branch,
735        } => {
736            if let Some(o) = operand {
737                substitute_locals(o, locals);
738            }
739            for (w, t) in branches {
740                substitute_locals(w, locals);
741                substitute_locals(t, locals);
742            }
743            if let Some(e) = else_branch {
744                substitute_locals(e, locals);
745            }
746        }
747        Expr::Literal(_)
748        | Expr::Placeholder(_)
749        | Expr::Column(_)
750        | Expr::WindowFunction { .. }
751        | Expr::ScalarSubquery(_)
752        | Expr::Exists { .. }
753        | Expr::InSubquery { .. } => {}
754    }
755}
756
757fn eval_with_new_old(
758    expr: &Expr,
759    new_row: Option<&Row>,
760    old_row: Option<&Row>,
761    columns: &[ColumnSchema],
762    table_alias: &str,
763    params: &[Value],
764    default_text_search_config: Option<&str>,
765) -> Result<Value, EvalError> {
766    let mut rewritten = expr.clone();
767    substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
768    let ctx = EvalContext::new(columns, Some(table_alias))
769        .with_params(params)
770        .with_default_text_search_config(default_text_search_config);
771    // Empty row — the substitution above eliminated every column
772    // reference that depended on NEW / OLD; any remaining column
773    // reference is a bug (would surface as ColumnNotFound).
774    let empty = Row::new(Vec::new());
775    eval::eval_expr(&rewritten, &empty, &ctx)
776}
777
778/// In-place walk: replace every `Column{qualifier=NEW|OLD,name=c}`
779/// reference with the corresponding row value, materialised as
780/// an `Expr::Literal`. Recurses through every Expr variant so
781/// `to_tsvector('english', NEW.subject || ' ' || NEW.sender)`
782/// substitutes cleanly even though the references nest inside
783/// function calls + binary operators.
784fn substitute_new_old(
785    expr: &mut Expr,
786    new_row: Option<&Row>,
787    old_row: Option<&Row>,
788    columns: &[ColumnSchema],
789) -> Result<(), EvalError> {
790    if let Expr::Column(c) = expr {
791        if let Some(q) = &c.qualifier {
792            let lower = q.to_ascii_lowercase();
793            if lower == "new" || lower == "old" {
794                let (row, side) = if lower == "new" {
795                    (new_row, "NEW")
796                } else {
797                    (old_row, "OLD")
798                };
799                let pos = columns
800                    .iter()
801                    .position(|sc| sc.name.eq_ignore_ascii_case(&c.name))
802                    .ok_or_else(|| EvalError::ColumnNotFound {
803                        name: format!("{side}.{}", c.name),
804                    })?;
805                let v = match row {
806                    Some(r) => r.values.get(pos).cloned().unwrap_or(Value::Null),
807                    None => Value::Null,
808                };
809                *expr = value_to_literal_expr(columns, pos, v);
810                return Ok(());
811            }
812        }
813    }
814    match expr {
815        Expr::Binary { lhs, rhs, .. } => {
816            substitute_new_old(lhs, new_row, old_row, columns)?;
817            substitute_new_old(rhs, new_row, old_row, columns)?;
818        }
819        Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
820            substitute_new_old(expr, new_row, old_row, columns)?;
821        }
822        Expr::Like { expr, pattern, .. } => {
823            substitute_new_old(expr, new_row, old_row, columns)?;
824            substitute_new_old(pattern, new_row, old_row, columns)?;
825        }
826        Expr::FunctionCall { args, .. } => {
827            for a in args {
828                substitute_new_old(a, new_row, old_row, columns)?;
829            }
830        }
831        Expr::Extract { source, .. } => substitute_new_old(source, new_row, old_row, columns)?,
832        Expr::Array(items) => {
833            for elem in items {
834                substitute_new_old(elem, new_row, old_row, columns)?;
835            }
836        }
837        Expr::ArraySubscript { target, index } => {
838            substitute_new_old(target, new_row, old_row, columns)?;
839            substitute_new_old(index, new_row, old_row, columns)?;
840        }
841        Expr::AnyAll { expr, array, .. } => {
842            substitute_new_old(expr, new_row, old_row, columns)?;
843            substitute_new_old(array, new_row, old_row, columns)?;
844        }
845        Expr::Case {
846            operand,
847            branches,
848            else_branch,
849        } => {
850            if let Some(o) = operand {
851                substitute_new_old(o, new_row, old_row, columns)?;
852            }
853            for (w, t) in branches {
854                substitute_new_old(w, new_row, old_row, columns)?;
855                substitute_new_old(t, new_row, old_row, columns)?;
856            }
857            if let Some(e) = else_branch {
858                substitute_new_old(e, new_row, old_row, columns)?;
859            }
860        }
861        // Leaves + variants we don't recurse into (sub-queries
862        // inside a trigger body would require correlated-query
863        // wiring; carved out of v7.12.4).
864        Expr::Literal(_)
865        | Expr::Placeholder(_)
866        | Expr::Column(_)
867        | Expr::WindowFunction { .. }
868        | Expr::ScalarSubquery(_)
869        | Expr::Exists { .. }
870        | Expr::InSubquery { .. } => {}
871    }
872    Ok(())
873}
874
875/// Turn a [`Value`] back into an [`Expr::Literal`]. Necessary
876/// because [`substitute_new_old`] inlines NEW/OLD cell values
877/// into the expression tree.
878fn value_to_literal_expr(_columns: &[ColumnSchema], _pos: usize, v: Value) -> Expr {
879    use spg_sql::ast::Literal;
880    let lit = match v {
881        Value::Null => Literal::Null,
882        Value::Bool(b) => Literal::Bool(b),
883        Value::SmallInt(n) => Literal::Integer(i64::from(n)),
884        Value::Int(n) => Literal::Integer(i64::from(n)),
885        Value::BigInt(n) => Literal::Integer(n),
886        Value::Float(x) => Literal::Float(x),
887        Value::Text(s) | Value::Json(s) => Literal::String(s),
888        // Other values (Vector, Date, Timestamp, TsVector, etc.)
889        // round-trip through the Display form back into a string
890        // literal. v7.12.5 will add typed-literal variants here
891        // so the cast layer doesn't need to re-parse from text.
892        other => Literal::String(format!("{other:?}")),
893    };
894    Expr::Literal(lit)
895}
896
897/// v7.12.7 — substitute NEW / OLD / DECLARE-local references in
898/// every `Expr` field of a [`Statement`]. Used to materialise an
899/// embedded SQL statement's NEW.col / OLD.col / local-var refs as
900/// literals so the engine can re-execute it without holding the
901/// trigger context.
902fn substitute_trigger_context_in_statement(
903    stmt: &mut spg_sql::ast::Statement,
904    new_row: Option<&Row>,
905    old_row: Option<&Row>,
906    locals: &BTreeMap<String, Value>,
907    columns: &[ColumnSchema],
908) -> Result<(), EvalError> {
909    use spg_sql::ast::Statement;
910    let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
911        substitute_locals(e, locals);
912        substitute_new_old(e, new_row, old_row, columns)?;
913        Ok(())
914    };
915    match stmt {
916        Statement::Insert(s) => {
917            for tuple in &mut s.rows {
918                for e in tuple {
919                    walk(e)?;
920                }
921            }
922        }
923        Statement::Update(s) => {
924            for (_col, e) in &mut s.assignments {
925                walk(e)?;
926            }
927            if let Some(w) = &mut s.where_ {
928                walk(w)?;
929            }
930        }
931        Statement::Delete(s) => {
932            if let Some(w) = &mut s.where_ {
933                walk(w)?;
934            }
935        }
936        Statement::Select(s) => {
937            substitute_trigger_context_in_select(s, new_row, old_row, locals, columns)?
938        }
939        // Other statement kinds (DDL, SHOW, etc.) inside a
940        // trigger body would only meaningfully reference NEW/OLD
941        // in error-message position; v7.12.7 doesn't recursively
942        // substitute their Expr fields. Future surfaces (e.g.
943        // RAISE ... USING) can add cases here.
944        _ => {}
945    }
946    Ok(())
947}
948
949fn substitute_trigger_context_in_select(
950    s: &mut spg_sql::ast::SelectStatement,
951    new_row: Option<&Row>,
952    old_row: Option<&Row>,
953    locals: &BTreeMap<String, Value>,
954    columns: &[ColumnSchema],
955) -> Result<(), EvalError> {
956    use spg_sql::ast::SelectItem;
957    let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
958        substitute_locals(e, locals);
959        substitute_new_old(e, new_row, old_row, columns)?;
960        Ok(())
961    };
962    for item in &mut s.items {
963        if let SelectItem::Expr { expr, .. } = item {
964            walk(expr)?;
965        }
966    }
967    if let Some(w) = &mut s.where_ {
968        walk(w)?;
969    }
970    if let Some(group_by) = &mut s.group_by {
971        for g in group_by {
972            walk(g)?;
973        }
974    }
975    if let Some(h) = &mut s.having {
976        walk(h)?;
977    }
978    for ob in &mut s.order_by {
979        walk(&mut ob.expr)?;
980    }
981    // LIMIT / OFFSET use `LimitExpr` (integer literal or
982    // placeholder); they don't carry an `Expr` to substitute
983    // into. Leave them alone.
984    let _ = &s.limit;
985    let _ = &s.offset;
986    Ok(())
987}
988
989/// v7.12.4 — find the triggers that should fire for a given
990/// `(table, event, timing)` tuple. Returns names so the caller
991/// can iterate without holding a borrow on the catalog while it
992/// mutates rows.
993pub fn matching_trigger_names<'a>(
994    triggers: &'a [TriggerDef],
995    table: &str,
996    event: &str,
997    timing: &str,
998) -> Vec<&'a TriggerDef> {
999    triggers
1000        .iter()
1001        .filter(|t| {
1002            t.table == table
1003                && t.timing.eq_ignore_ascii_case(timing)
1004                && t.for_each.eq_ignore_ascii_case("row")
1005                && t.events.iter().any(|e| e.eq_ignore_ascii_case(event))
1006        })
1007        .collect()
1008}