1use alloc::collections::BTreeMap;
31use alloc::format;
32use alloc::string::String;
33use alloc::vec::Vec;
34use core::fmt;
35
36use spg_sql::ast::{AssignTarget, Expr, PlPgSqlDeclare, PlPgSqlStmt, RaiseLevel, ReturnTarget};
37use spg_storage::{ColumnSchema, FunctionDef, Row, StorageError, TriggerDef, Value};
38
39use crate::eval::{self, EvalContext, EvalError};
40use crate::{CancelToken, Engine, EngineError, MAX_TRIGGER_RECURSION};
41
42#[derive(Debug, Clone, PartialEq)]
48pub struct DeferredEmbeddedStmt {
49 pub function: String,
52 pub stmt: spg_sql::ast::Statement,
54}
55
56#[derive(Debug, Clone, PartialEq)]
59pub enum TriggerOutcome {
60 Row(Row),
66 Skip,
70}
71
72#[derive(Debug, Clone, PartialEq)]
77pub enum TriggerError {
78 UnparseableBody { function: String, detail: String },
82 UnsupportedConstruct { function: String, detail: String },
87 OldIsReadOnly { function: String, column: String },
91 NewReadOnlyInAfterTrigger { function: String, column: String },
95 UnknownColumn {
98 function: String,
99 column: String,
100 table: String,
101 },
102 EvalFailed { function: String, cause: EvalError },
106 RaiseException { function: String, message: String },
111}
112
113impl fmt::Display for TriggerError {
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 match self {
116 Self::UnparseableBody { function, detail } => {
117 write!(
118 f,
119 "trigger function {function:?} body did not parse: {detail}"
120 )
121 }
122 Self::UnsupportedConstruct { function, detail } => {
123 write!(
124 f,
125 "trigger function {function:?} uses an unsupported PL/pgSQL construct: {detail}"
126 )
127 }
128 Self::OldIsReadOnly { function, column } => {
129 write!(
130 f,
131 "trigger function {function:?}: cannot assign to OLD.{column} (OLD is read-only — PG rule)"
132 )
133 }
134 Self::NewReadOnlyInAfterTrigger { function, column } => {
135 write!(
136 f,
137 "trigger function {function:?}: cannot assign to NEW.{column} inside an AFTER trigger \
138 (NEW is read-only post-write — use BEFORE triggers for mutation, or an embedded UPDATE statement \
139 in v7.12.5+)"
140 )
141 }
142 Self::UnknownColumn {
143 function,
144 column,
145 table,
146 } => {
147 write!(
148 f,
149 "trigger function {function:?}: target column {column:?} not in table {table:?} schema"
150 )
151 }
152 Self::EvalFailed { function, cause } => {
153 write!(
154 f,
155 "trigger function {function:?}: expression eval failed: {cause}"
156 )
157 }
158 Self::RaiseException { function, message } => {
159 write!(
160 f,
161 "trigger function {function:?}: RAISE EXCEPTION {message:?}"
162 )
163 }
164 }
165 }
166}
167
168#[allow(clippy::too_many_arguments)] pub fn fire_row_trigger(
181 function: &FunctionDef,
182 new_row: Option<Row>,
183 old_row: Option<&Row>,
184 table_name: &str,
185 columns: &[ColumnSchema],
186 params: &[Value],
187 default_text_search_config: Option<&str>,
188 is_after: bool,
189) -> Result<(TriggerOutcome, Vec<DeferredEmbeddedStmt>), TriggerError> {
190 if !function.language.eq_ignore_ascii_case("plpgsql") {
191 return Err(TriggerError::UnsupportedConstruct {
192 function: function.name.clone(),
193 detail: format!(
194 "v7.12.4 only invokes LANGUAGE plpgsql trigger functions; \
195 {:?} declares LANGUAGE {}",
196 function.name, function.language
197 ),
198 });
199 }
200 let block = spg_sql::parse_function_body(&function.body).map_err(|e| {
201 TriggerError::UnparseableBody {
202 function: function.name.clone(),
203 detail: format!("{e}"),
204 }
205 })?;
206 let mut locals: BTreeMap<String, Value> = BTreeMap::new();
211 init_locals_from_declarations(
212 &block.declarations,
213 &mut locals,
214 new_row.as_ref(),
215 old_row,
216 columns,
217 table_name,
218 params,
219 default_text_search_config,
220 &function.name,
221 )?;
222 let mut current_new = new_row;
223 let ctx = BodyCtx {
224 function: &function.name,
225 table_name,
226 columns,
227 params,
228 default_text_search_config,
229 is_after,
230 select_into_resolver: None,
231 };
232 let mut deferred: Vec<DeferredEmbeddedStmt> = Vec::new();
233 let outcome = match execute_stmts(
234 &block.statements,
235 &mut current_new,
236 old_row,
237 &mut locals,
238 &ctx,
239 &mut deferred,
240 )? {
241 BodyOutcome::Return(target) => resolve_return(target, current_new, old_row),
242 BodyOutcome::FellThrough => TriggerOutcome::Skip,
246 };
247 Ok((outcome, deferred))
248}
249
250enum BodyOutcome {
255 Return(ReturnTarget),
256 FellThrough,
257}
258
259struct BodyCtx<'a> {
263 function: &'a str,
264 table_name: &'a str,
265 columns: &'a [ColumnSchema],
266 params: &'a [Value],
267 default_text_search_config: Option<&'a str>,
268 is_after: bool,
269 select_into_resolver: Option<&'a SelectIntoResolver<'a>>,
276}
277
278pub type SelectIntoResolver<'a> =
282 dyn Fn(&spg_sql::ast::Statement) -> Result<Value, TriggerError> + 'a;
283
284fn execute_stmts(
285 stmts: &[PlPgSqlStmt],
286 current_new: &mut Option<Row>,
287 old_row: Option<&Row>,
288 locals: &mut BTreeMap<String, Value>,
289 ctx: &BodyCtx<'_>,
290 deferred: &mut Vec<DeferredEmbeddedStmt>,
291) -> Result<BodyOutcome, TriggerError> {
292 for stmt in stmts {
293 match stmt {
294 PlPgSqlStmt::Assign { target, value } => {
295 let evaluated = eval_with_new_old_and_locals(
296 value,
297 current_new.as_ref(),
298 old_row,
299 locals,
300 ctx.columns,
301 ctx.table_name,
302 ctx.params,
303 ctx.default_text_search_config,
304 )
305 .map_err(|cause| TriggerError::EvalFailed {
306 function: ctx.function.into(),
307 cause,
308 })?;
309 match target {
310 AssignTarget::NewColumn(col) => {
311 if ctx.is_after {
312 return Err(TriggerError::NewReadOnlyInAfterTrigger {
313 function: ctx.function.into(),
314 column: col.clone(),
315 });
316 }
317 let pos = ctx
318 .columns
319 .iter()
320 .position(|c| c.name.eq_ignore_ascii_case(col))
321 .ok_or_else(|| TriggerError::UnknownColumn {
322 function: ctx.function.into(),
323 column: col.clone(),
324 table: alloc::string::ToString::to_string(&ctx.table_name),
325 })?;
326 let row = current_new.as_mut().ok_or_else(|| {
327 TriggerError::UnsupportedConstruct {
328 function: ctx.function.into(),
329 detail: format!(
330 "NEW.{col} := … requires a NEW row context \
331 (BEFORE INSERT / UPDATE only — not available on DELETE)"
332 ),
333 }
334 })?;
335 row.values[pos] = evaluated;
336 }
337 AssignTarget::OldColumn(col) => {
338 return Err(TriggerError::OldIsReadOnly {
339 function: ctx.function.into(),
340 column: col.clone(),
341 });
342 }
343 AssignTarget::Local(name) => {
344 locals.insert(name.clone(), evaluated);
351 }
352 }
353 }
354 PlPgSqlStmt::Return(target) => {
355 return Ok(BodyOutcome::Return(target.clone()));
356 }
357 PlPgSqlStmt::If {
358 branches,
359 else_branch,
360 } => {
361 let mut matched = false;
362 for (cond_expr, body) in branches {
363 let cond_val = eval_with_new_old_and_locals(
364 cond_expr,
365 current_new.as_ref(),
366 old_row,
367 locals,
368 ctx.columns,
369 ctx.table_name,
370 ctx.params,
371 ctx.default_text_search_config,
372 )
373 .map_err(|cause| TriggerError::EvalFailed {
374 function: ctx.function.into(),
375 cause,
376 })?;
377 if matches!(cond_val, Value::Bool(true)) {
378 matched = true;
379 match execute_stmts(body, current_new, old_row, locals, ctx, deferred)? {
380 BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
381 BodyOutcome::FellThrough => {}
382 }
383 break;
384 }
385 }
386 if !matched && !else_branch.is_empty() {
387 match execute_stmts(else_branch, current_new, old_row, locals, ctx, deferred)? {
388 BodyOutcome::Return(t) => return Ok(BodyOutcome::Return(t)),
389 BodyOutcome::FellThrough => {}
390 }
391 }
392 }
393 PlPgSqlStmt::Raise {
394 level,
395 message,
396 args,
397 } => {
398 let mut rendered_args: Vec<String> = Vec::with_capacity(args.len());
401 for a in args {
402 let v = eval_with_new_old_and_locals(
403 a,
404 current_new.as_ref(),
405 old_row,
406 locals,
407 ctx.columns,
408 ctx.table_name,
409 ctx.params,
410 ctx.default_text_search_config,
411 )
412 .map_err(|cause| TriggerError::EvalFailed {
413 function: ctx.function.into(),
414 cause,
415 })?;
416 rendered_args.push(value_to_display_string(&v));
417 }
418 let resolved = format_raise_message(message, &rendered_args);
419 if matches!(level, RaiseLevel::Exception) {
420 return Err(TriggerError::RaiseException {
421 function: ctx.function.into(),
422 message: resolved,
423 });
424 }
425 let _ = resolved;
430 let _ = level;
431 }
432 PlPgSqlStmt::SelectInto { var, body } => {
433 let mut substituted = spg_sql::ast::Statement::Select((**body).clone());
439 substitute_trigger_context_in_statement(
440 &mut substituted,
441 current_new.as_ref(),
442 old_row,
443 locals,
444 ctx.columns,
445 )
446 .map_err(|cause| TriggerError::EvalFailed {
447 function: ctx.function.into(),
448 cause,
449 })?;
450 let resolver =
451 ctx.select_into_resolver.ok_or_else(|| TriggerError::UnsupportedConstruct {
452 function: ctx.function.into(),
453 detail: alloc::format!(
454 "SELECT … INTO {var}: only supported inside DO blocks (not trigger bodies) in v7.16.2"
455 ),
456 })?;
457 let value = resolver(&substituted)?;
458 locals.insert(var.clone(), value);
459 }
460 PlPgSqlStmt::EmbeddedSql(boxed_stmt) => {
461 let mut substituted = (**boxed_stmt).clone();
469 substitute_trigger_context_in_statement(
470 &mut substituted,
471 current_new.as_ref(),
472 old_row,
473 locals,
474 ctx.columns,
475 )
476 .map_err(|cause| TriggerError::EvalFailed {
477 function: ctx.function.into(),
478 cause,
479 })?;
480 deferred.push(DeferredEmbeddedStmt {
481 function: ctx.function.into(),
482 stmt: substituted,
483 });
484 }
485 }
486 }
487 Ok(BodyOutcome::FellThrough)
488}
489
490pub fn execute_do_block_top_level<'a>(
511 block: &spg_sql::ast::PlPgSqlBlock,
512 default_text_search_config: Option<&'a str>,
513 select_into_resolver: Option<&'a SelectIntoResolver<'a>>,
514) -> Result<Vec<spg_sql::ast::Statement>, TriggerError> {
515 let mut locals: BTreeMap<String, Value> = BTreeMap::new();
516 let empty_cols: &[ColumnSchema] = &[];
517 init_locals_from_declarations(
518 &block.declarations,
519 &mut locals,
520 None,
521 None,
522 empty_cols,
523 "",
524 &[],
525 default_text_search_config,
526 "DO",
527 )?;
528 let ctx = BodyCtx {
529 function: "DO",
530 table_name: "",
531 columns: empty_cols,
532 params: &[],
533 default_text_search_config,
534 is_after: false,
535 select_into_resolver,
536 };
537 let mut current_new: Option<Row> = None;
538 let mut deferred: Vec<DeferredEmbeddedStmt> = Vec::new();
539 let _ = execute_stmts(
544 &block.statements,
545 &mut current_new,
546 None,
547 &mut locals,
548 &ctx,
549 &mut deferred,
550 )?;
551 Ok(deferred.into_iter().map(|d| d.stmt).collect())
552}
553
554fn resolve_return(
555 target: ReturnTarget,
556 current_new: Option<Row>,
557 old_row: Option<&Row>,
558) -> TriggerOutcome {
559 match target {
560 ReturnTarget::New => current_new.map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
561 ReturnTarget::Old => old_row
562 .cloned()
563 .map_or(TriggerOutcome::Skip, TriggerOutcome::Row),
564 ReturnTarget::Null => TriggerOutcome::Skip,
565 ReturnTarget::Expr(_) => TriggerOutcome::Skip,
568 }
569}
570
571#[allow(clippy::too_many_arguments)]
572fn init_locals_from_declarations(
573 decls: &[PlPgSqlDeclare],
574 locals: &mut BTreeMap<String, Value>,
575 new_row: Option<&Row>,
576 old_row: Option<&Row>,
577 columns: &[ColumnSchema],
578 table_name: &str,
579 params: &[Value],
580 default_text_search_config: Option<&str>,
581 function_name: &str,
582) -> Result<(), TriggerError> {
583 for d in decls {
584 let v = if let Some(init) = &d.default {
585 eval_with_new_old_and_locals(
586 init,
587 new_row,
588 old_row,
589 locals,
590 columns,
591 table_name,
592 params,
593 default_text_search_config,
594 )
595 .map_err(|cause| TriggerError::EvalFailed {
596 function: function_name.into(),
597 cause,
598 })?
599 } else {
600 Value::Null
601 };
602 locals.insert(d.name.clone(), v);
603 }
604 Ok(())
605}
606
607fn format_raise_message(fmt: &str, args: &[String]) -> String {
610 let mut out = String::with_capacity(fmt.len());
611 let mut iter = args.iter();
612 let mut chars = fmt.chars().peekable();
613 while let Some(c) = chars.next() {
614 if c == '%' {
615 match chars.peek() {
616 Some('%') => {
617 out.push('%');
618 chars.next();
619 }
620 _ => {
621 if let Some(a) = iter.next() {
622 out.push_str(a);
623 } else {
624 out.push('%');
628 }
629 }
630 }
631 } else {
632 out.push(c);
633 }
634 }
635 out
636}
637
638fn value_to_display_string(v: &Value) -> String {
642 use alloc::string::ToString;
643 match v {
644 Value::Null => String::new(),
645 Value::Bool(b) => b.to_string(),
646 Value::SmallInt(n) => n.to_string(),
647 Value::Int(n) => n.to_string(),
648 Value::BigInt(n) => n.to_string(),
649 Value::Float(x) => x.to_string(),
650 Value::Text(s) | Value::Json(s) => s.clone(),
651 other => format!("{other:?}"),
652 }
653}
654
655#[allow(clippy::too_many_arguments)]
667fn eval_with_new_old_and_locals(
668 expr: &Expr,
669 new_row: Option<&Row>,
670 old_row: Option<&Row>,
671 locals: &BTreeMap<String, Value>,
672 columns: &[ColumnSchema],
673 table_alias: &str,
674 params: &[Value],
675 default_text_search_config: Option<&str>,
676) -> Result<Value, EvalError> {
677 let mut rewritten = expr.clone();
678 substitute_locals(&mut rewritten, locals);
679 substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
680 let ctx = EvalContext::new(columns, Some(table_alias))
681 .with_params(params)
682 .with_default_text_search_config(default_text_search_config);
683 let empty = Row::new(Vec::new());
684 eval::eval_expr(&rewritten, &empty, &ctx)
685}
686
687fn substitute_locals(expr: &mut Expr, locals: &BTreeMap<String, Value>) {
693 if let Expr::Column(c) = expr {
694 if c.qualifier.is_none()
695 && let Some(v) = locals.get(&c.name)
696 {
697 *expr = value_to_literal_expr(&[], 0, v.clone());
698 return;
699 }
700 }
701 match expr {
702 Expr::AggregateOrdered { call, order_by, .. } => {
703 substitute_locals(call, locals);
704 for o in order_by.iter_mut() {
705 substitute_locals(&mut o.expr, locals);
706 }
707 }
708 Expr::Binary { lhs, rhs, .. } => {
709 substitute_locals(lhs, locals);
710 substitute_locals(rhs, locals);
711 }
712 Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
713 substitute_locals(expr, locals);
714 }
715 Expr::Like { expr, pattern, .. } => {
716 substitute_locals(expr, locals);
717 substitute_locals(pattern, locals);
718 }
719 Expr::FunctionCall { args, .. } => {
720 for a in args {
721 substitute_locals(a, locals);
722 }
723 }
724 Expr::Extract { source, .. } => substitute_locals(source, locals),
725 Expr::Array(items) => {
726 for elem in items {
727 substitute_locals(elem, locals);
728 }
729 }
730 Expr::ArraySubscript { target, index } => {
731 substitute_locals(target, locals);
732 substitute_locals(index, locals);
733 }
734 Expr::AnyAll { expr, array, .. } => {
735 substitute_locals(expr, locals);
736 substitute_locals(array, locals);
737 }
738 Expr::InList { expr, list, .. } => {
739 substitute_locals(expr, locals);
740 for item in list {
741 substitute_locals(item, locals);
742 }
743 }
744 Expr::Case {
745 operand,
746 branches,
747 else_branch,
748 } => {
749 if let Some(o) = operand {
750 substitute_locals(o, locals);
751 }
752 for (w, t) in branches {
753 substitute_locals(w, locals);
754 substitute_locals(t, locals);
755 }
756 if let Some(e) = else_branch {
757 substitute_locals(e, locals);
758 }
759 }
760 Expr::Literal(_)
761 | Expr::Placeholder(_)
762 | Expr::Column(_)
763 | Expr::WindowFunction { .. }
764 | Expr::ScalarSubquery(_)
765 | Expr::Exists { .. }
766 | Expr::InSubquery { .. } => {}
767 }
768}
769
770fn eval_with_new_old(
771 expr: &Expr,
772 new_row: Option<&Row>,
773 old_row: Option<&Row>,
774 columns: &[ColumnSchema],
775 table_alias: &str,
776 params: &[Value],
777 default_text_search_config: Option<&str>,
778) -> Result<Value, EvalError> {
779 let mut rewritten = expr.clone();
780 substitute_new_old(&mut rewritten, new_row, old_row, columns)?;
781 let ctx = EvalContext::new(columns, Some(table_alias))
782 .with_params(params)
783 .with_default_text_search_config(default_text_search_config);
784 let empty = Row::new(Vec::new());
788 eval::eval_expr(&rewritten, &empty, &ctx)
789}
790
791fn substitute_new_old(
798 expr: &mut Expr,
799 new_row: Option<&Row>,
800 old_row: Option<&Row>,
801 columns: &[ColumnSchema],
802) -> Result<(), EvalError> {
803 if let Expr::Column(c) = expr {
804 if let Some(q) = &c.qualifier {
805 let lower = q.to_ascii_lowercase();
806 if lower == "new" || lower == "old" {
807 let (row, side) = if lower == "new" {
808 (new_row, "NEW")
809 } else {
810 (old_row, "OLD")
811 };
812 let pos = columns
813 .iter()
814 .position(|sc| sc.name.eq_ignore_ascii_case(&c.name))
815 .ok_or_else(|| EvalError::ColumnNotFound {
816 name: format!("{side}.{}", c.name),
817 })?;
818 let v = match row {
819 Some(r) => r.values.get(pos).cloned().unwrap_or(Value::Null),
820 None => Value::Null,
821 };
822 *expr = value_to_literal_expr(columns, pos, v);
823 return Ok(());
824 }
825 }
826 }
827 match expr {
828 Expr::AggregateOrdered { call, order_by, .. } => {
829 substitute_new_old(call, new_row, old_row, columns)?;
830 for o in order_by.iter_mut() {
831 substitute_new_old(&mut o.expr, new_row, old_row, columns)?;
832 }
833 }
834 Expr::Binary { lhs, rhs, .. } => {
835 substitute_new_old(lhs, new_row, old_row, columns)?;
836 substitute_new_old(rhs, new_row, old_row, columns)?;
837 }
838 Expr::Unary { expr, .. } | Expr::Cast { expr, .. } | Expr::IsNull { expr, .. } => {
839 substitute_new_old(expr, new_row, old_row, columns)?;
840 }
841 Expr::Like { expr, pattern, .. } => {
842 substitute_new_old(expr, new_row, old_row, columns)?;
843 substitute_new_old(pattern, new_row, old_row, columns)?;
844 }
845 Expr::FunctionCall { args, .. } => {
846 for a in args {
847 substitute_new_old(a, new_row, old_row, columns)?;
848 }
849 }
850 Expr::Extract { source, .. } => substitute_new_old(source, new_row, old_row, columns)?,
851 Expr::Array(items) => {
852 for elem in items {
853 substitute_new_old(elem, new_row, old_row, columns)?;
854 }
855 }
856 Expr::ArraySubscript { target, index } => {
857 substitute_new_old(target, new_row, old_row, columns)?;
858 substitute_new_old(index, new_row, old_row, columns)?;
859 }
860 Expr::AnyAll { expr, array, .. } => {
861 substitute_new_old(expr, new_row, old_row, columns)?;
862 substitute_new_old(array, new_row, old_row, columns)?;
863 }
864 Expr::InList { expr, list, .. } => {
865 substitute_new_old(expr, new_row, old_row, columns)?;
866 for item in list {
867 substitute_new_old(item, new_row, old_row, columns)?;
868 }
869 }
870 Expr::Case {
871 operand,
872 branches,
873 else_branch,
874 } => {
875 if let Some(o) = operand {
876 substitute_new_old(o, new_row, old_row, columns)?;
877 }
878 for (w, t) in branches {
879 substitute_new_old(w, new_row, old_row, columns)?;
880 substitute_new_old(t, new_row, old_row, columns)?;
881 }
882 if let Some(e) = else_branch {
883 substitute_new_old(e, new_row, old_row, columns)?;
884 }
885 }
886 Expr::Literal(_)
890 | Expr::Placeholder(_)
891 | Expr::Column(_)
892 | Expr::WindowFunction { .. }
893 | Expr::ScalarSubquery(_)
894 | Expr::Exists { .. }
895 | Expr::InSubquery { .. } => {}
896 }
897 Ok(())
898}
899
900fn value_to_literal_expr(_columns: &[ColumnSchema], _pos: usize, v: Value) -> Expr {
904 use spg_sql::ast::Literal;
905 let lit = match v {
906 Value::Null => Literal::Null,
907 Value::Bool(b) => Literal::Bool(b),
908 Value::SmallInt(n) => Literal::Integer(i64::from(n)),
909 Value::Int(n) => Literal::Integer(i64::from(n)),
910 Value::BigInt(n) => Literal::Integer(n),
911 Value::Float(x) => Literal::Float(x),
912 Value::Text(s) | Value::Json(s) => Literal::String(s),
913 other => Literal::String(format!("{other:?}")),
918 };
919 Expr::Literal(lit)
920}
921
922fn substitute_trigger_context_in_statement(
928 stmt: &mut spg_sql::ast::Statement,
929 new_row: Option<&Row>,
930 old_row: Option<&Row>,
931 locals: &BTreeMap<String, Value>,
932 columns: &[ColumnSchema],
933) -> Result<(), EvalError> {
934 use spg_sql::ast::Statement;
935 let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
936 substitute_locals(e, locals);
937 substitute_new_old(e, new_row, old_row, columns)?;
938 Ok(())
939 };
940 match stmt {
941 Statement::Insert(s) => {
942 for tuple in &mut s.rows {
943 for e in tuple {
944 walk(e)?;
945 }
946 }
947 }
948 Statement::Update(s) => {
949 for (_col, e) in &mut s.assignments {
950 walk(e)?;
951 }
952 if let Some(w) = &mut s.where_ {
953 walk(w)?;
954 }
955 }
956 Statement::Delete(s) => {
957 if let Some(w) = &mut s.where_ {
958 walk(w)?;
959 }
960 }
961 Statement::Select(s) => {
962 substitute_trigger_context_in_select(s, new_row, old_row, locals, columns)?
963 }
964 _ => {}
970 }
971 Ok(())
972}
973
974fn substitute_trigger_context_in_select(
975 s: &mut spg_sql::ast::SelectStatement,
976 new_row: Option<&Row>,
977 old_row: Option<&Row>,
978 locals: &BTreeMap<String, Value>,
979 columns: &[ColumnSchema],
980) -> Result<(), EvalError> {
981 use spg_sql::ast::SelectItem;
982 let mut walk = |e: &mut Expr| -> Result<(), EvalError> {
983 substitute_locals(e, locals);
984 substitute_new_old(e, new_row, old_row, columns)?;
985 Ok(())
986 };
987 for item in &mut s.items {
988 if let SelectItem::Expr { expr, .. } = item {
989 walk(expr)?;
990 }
991 }
992 if let Some(w) = &mut s.where_ {
993 walk(w)?;
994 }
995 if let Some(group_by) = &mut s.group_by {
996 for g in group_by {
997 walk(g)?;
998 }
999 }
1000 if let Some(h) = &mut s.having {
1001 walk(h)?;
1002 }
1003 for ob in &mut s.order_by {
1004 walk(&mut ob.expr)?;
1005 }
1006 let _ = &s.limit;
1010 let _ = &s.offset;
1011 Ok(())
1012}
1013
1014pub fn matching_trigger_names<'a>(
1019 triggers: &'a [TriggerDef],
1020 table: &str,
1021 event: &str,
1022 timing: &str,
1023) -> Vec<&'a TriggerDef> {
1024 triggers
1025 .iter()
1026 .filter(|t| {
1027 t.table == table
1028 && t.timing.eq_ignore_ascii_case(timing)
1029 && t.for_each.eq_ignore_ascii_case("row")
1030 && t.events.iter().any(|e| e.eq_ignore_ascii_case(event))
1031 })
1032 .collect()
1033}
1034
1035impl Engine {
1036 pub(crate) fn snapshot_row_triggers(
1044 &self,
1045 table: &str,
1046 event: &str,
1047 timing: &str,
1048 ) -> Vec<spg_storage::FunctionDef> {
1049 let cat = self.active_catalog();
1050 cat.triggers()
1051 .iter()
1052 .filter(|t| {
1053 t.enabled
1056 && t.table == table
1057 && t.timing.eq_ignore_ascii_case(timing)
1058 && t.for_each.eq_ignore_ascii_case("row")
1059 && t.events.iter().any(|e| e.eq_ignore_ascii_case(event))
1060 })
1061 .filter_map(|t| cat.functions().get(&t.function).cloned())
1062 .collect()
1063 }
1064
1065 pub(crate) fn snapshot_update_row_triggers(
1070 &self,
1071 table: &str,
1072 timing: &str,
1073 ) -> Vec<(spg_storage::FunctionDef, Vec<String>)> {
1074 let cat = self.active_catalog();
1075 cat.triggers()
1076 .iter()
1077 .filter(|t| {
1078 t.enabled
1080 && t.table == table
1081 && t.timing.eq_ignore_ascii_case(timing)
1082 && t.for_each.eq_ignore_ascii_case("row")
1083 && t.events.iter().any(|e| e.eq_ignore_ascii_case("UPDATE"))
1084 })
1085 .filter_map(|t| {
1086 cat.functions()
1087 .get(&t.function)
1088 .cloned()
1089 .map(|fd| (fd, t.update_columns.clone()))
1090 })
1091 .collect()
1092 }
1093
1094 pub(crate) fn execute_deferred_trigger_stmts(
1103 &mut self,
1104 deferred: Vec<DeferredEmbeddedStmt>,
1105 cancel: CancelToken<'_>,
1106 ) -> Result<(), EngineError> {
1107 for d in deferred {
1108 if self.trigger_recursion_depth >= MAX_TRIGGER_RECURSION {
1109 return Err(EngineError::Storage(StorageError::Corrupt(alloc::format!(
1110 "trigger embedded SQL recursion depth {} exceeded (trigger function \
1111 {:?} would push past the {} cap — check for trigger cycles)",
1112 self.trigger_recursion_depth,
1113 d.function,
1114 MAX_TRIGGER_RECURSION,
1115 ))));
1116 }
1117 self.trigger_recursion_depth += 1;
1118 let res = self.execute_stmt_with_cancel(d.stmt, cancel);
1119 self.trigger_recursion_depth -= 1;
1120 res?;
1121 }
1122 Ok(())
1123 }
1124}