1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
//! PL/pgSQL `DO` block execution. The top-level DO executor walks
//! a parsed PlPgSqlBlock, pre-resolves the subqueries embedded in its
//! expression slots, then drives the block through the shared
//! `triggers` interpreter. Split out of `lib.rs` (cut 22).
use alloc::string::String;
use spg_storage::{StorageError, Value};
use crate::{CancelToken, Engine, EngineError, QueryResult, eval, triggers};
impl Engine {
/// v7.16.2 — runs a top-level PL/pgSQL `DO` block.
///
/// The work happens in three phases:
///
/// 1. Subqueries reachable from the block's expression slots are
///    pre-resolved with `resolve_plpgsql_block_subqueries`. The
///    top-level SELECT path gets this from
///    `resolve_select_subqueries`; for plpgsql it has to happen
///    before the body walker runs, otherwise idioms such as
///    `IF EXISTS (SELECT 1 FROM information_schema.columns WHERE …)
///    THEN …` hit unresolved Exists/ScalarSubquery/InSubquery nodes.
/// 2. The block is walked by
///    [`triggers::execute_do_block_top_level`]. The walker is given
///    a resolver closure that executes a `SELECT … INTO` statement
///    through the engine right away and hands back the first column
///    of the first row (`Value::Null` when there is no row or no
///    column).
/// 3. Every statement the walker returned is executed, in order,
///    through `execute_stmt_with_cancel`.
///
/// # Errors
///
/// * A failure during pre-resolution is returned unchanged.
/// * A failure reported by the walker (including resolver failures)
///   is wrapped as `EngineError::Storage(StorageError::Corrupt(..))`
///   with the message prefixed by `DO: `.
/// * A failure of any collected statement stops the loop and is
///   returned via `?`.
///
/// On success the result is `QueryResult::CommandOk` with
/// `affected: 0` and `modified_catalog` set to
/// `!self.in_transaction()`.
pub(crate) fn exec_do_block(
    &mut self,
    body: spg_sql::ast::PlPgSqlBlock,
) -> Result<QueryResult, EngineError> {
    /// Builds the `TriggerError` reported when a `SELECT … INTO`
    /// step cannot produce a value.
    fn select_into_error(detail: String) -> triggers::TriggerError {
        triggers::TriggerError::EvalFailed {
            function: "DO".into(),
            cause: eval::EvalError::TypeMismatch { detail },
        }
    }

    // Phase 1: the walker's evaluator must not see raw subquery
    // nodes, so resolve them on a mutable copy of the block first.
    let mut block = body;
    self.resolve_plpgsql_block_subqueries(&mut block, CancelToken::none())?;

    // Copied into an owned String so no borrow of `self` is held
    // while the engine is lent to the resolver below.
    let text_search_config = self
        .session_param("default_text_search_config")
        .map(String::from);

    // Phase 2: SELECT … INTO resolver. The walker receives the
    // closure by shared reference, so it has to be callable as
    // `Fn`; the `RefCell` provides the mutable engine access the
    // closure needs. There is no `unsafe` involved — `borrow_mut`
    // is checked at runtime, and the guard is released at the end
    // of each resolver call.
    let engine_slot = core::cell::RefCell::new(&mut *self);
    let select_into_resolver =
        |stmt: &spg_sql::ast::Statement| -> Result<Value, triggers::TriggerError> {
            let mut engine = engine_slot.borrow_mut();
            let outcome = match engine.execute_stmt_with_cancel(stmt.clone(), CancelToken::none())
            {
                Ok(outcome) => outcome,
                Err(e) => {
                    return Err(select_into_error(alloc::format!(
                        "SELECT … INTO failed: {e}"
                    )));
                }
            };
            match outcome {
                // Only the first column of the first row is used;
                // an empty result (or an empty row) yields NULL.
                QueryResult::Rows { rows, .. } => Ok(rows
                    .into_iter()
                    .next()
                    .and_then(|row| row.values.into_iter().next())
                    .unwrap_or(Value::Null)),
                _ => Err(select_into_error(String::from(
                    "SELECT … INTO body must be a SELECT",
                ))),
            }
        };

    let walked = triggers::execute_do_block_top_level(
        &block,
        text_search_config.as_deref(),
        Some(&select_into_resolver),
    );
    let pending = match walked {
        Ok(stmts) => stmts,
        Err(e) => {
            return Err(EngineError::Storage(StorageError::Corrupt(
                alloc::format!("DO: {e}"),
            )));
        }
    };

    // Phase 3: `engine_slot` and the resolver are not used past
    // this point, so `self` is usable directly again. Each
    // collected statement goes through the ordinary execute path;
    // this function does not open or close a transaction itself.
    for embedded in pending {
        self.execute_stmt_with_cancel(embedded, CancelToken::none())?;
    }

    let modified_catalog = !self.in_transaction();
    Ok(QueryResult::CommandOk {
        affected: 0,
        modified_catalog,
    })
}
/// v7.16.2 — pre-resolves the subqueries found in a
/// PlPgSqlBlock's expression slots, so the downstream
/// trigger-flavoured evaluator (which expects pre-resolved
/// Expr::Literal / Binary chains) doesn't trip on raw
/// Exists/ScalarSubquery nodes.
///
/// Declaration defaults are resolved here first, in declaration
/// order; the block's statements are then handed to
/// `resolve_plpgsql_stmts_subqueries`. The first error stops the
/// walk and is returned.
fn resolve_plpgsql_block_subqueries(
    &self,
    block: &mut spg_sql::ast::PlPgSqlBlock,
    cancel: CancelToken<'_>,
) -> Result<(), EngineError> {
    // Declarations without a default have nothing to resolve.
    let declared_defaults = block
        .declarations
        .iter_mut()
        .filter_map(|decl| decl.default.as_mut());
    for default_expr in declared_defaults {
        self.resolve_expr_subqueries(default_expr, cancel)?;
    }
    self.resolve_plpgsql_stmts_subqueries(&mut block.statements, cancel)
}
/// Resolves subqueries in a list of PL/pgSQL statements, in place.
///
/// Handled per statement kind:
/// * `Assign` — the assigned value expression.
/// * `Return` with an expression target — that expression.
/// * `If` — for each branch the condition, then the branch body
///   (recursively); the else body last.
/// * `Raise` — each argument expression.
/// * `SelectInto` — the SELECT body, via
///   `resolve_select_subqueries`.
/// * Other `Return` targets and `EmbeddedSql` — left untouched.
///
/// The first error stops the walk and is returned.
fn resolve_plpgsql_stmts_subqueries(
    &self,
    stmts: &mut [spg_sql::ast::PlPgSqlStmt],
    cancel: CancelToken<'_>,
) -> Result<(), EngineError> {
    use spg_sql::ast::{PlPgSqlStmt, ReturnTarget};

    for node in stmts.iter_mut() {
        match node {
            PlPgSqlStmt::Assign { value, .. } => {
                self.resolve_expr_subqueries(value, cancel)?;
            }
            PlPgSqlStmt::Return(ReturnTarget::Expr(returned)) => {
                self.resolve_expr_subqueries(returned, cancel)?;
            }
            PlPgSqlStmt::If {
                branches,
                else_branch,
            } => {
                for (condition, then_body) in branches.iter_mut() {
                    self.resolve_expr_subqueries(condition, cancel)?;
                    self.resolve_plpgsql_stmts_subqueries(then_body, cancel)?;
                }
                self.resolve_plpgsql_stmts_subqueries(else_branch, cancel)?;
            }
            PlPgSqlStmt::Raise { args, .. } => {
                for arg in args.iter_mut() {
                    self.resolve_expr_subqueries(arg, cancel)?;
                }
            }
            PlPgSqlStmt::SelectInto { body, .. } => {
                // The statement itself is executed later by the
                // engine; its SELECT body is still walked now so
                // nested subqueries are resolved before evaluation.
                self.resolve_select_subqueries(body, cancel)?;
            }
            // Non-expression RETURN targets carry nothing to
            // resolve here. Embedded SQL goes back through
            // execute_stmt_with_cancel, which runs the SELECT-side
            // resolver itself.
            PlPgSqlStmt::Return(_) | PlPgSqlStmt::EmbeddedSql(_) => {}
        }
    }
    Ok(())
}
}