spg_engine/readonly.rs
1//! Read-only / snapshot execution, split out of `lib.rs` (lib.rs split
2//! 18). Two entry families share one module: the live read path
3//! (`execute_readonly` / `_with_cancel`, taken by the server under an
4//! `RwLock::read()` so SELECTs run in parallel) and the snapshot path
5//! (`execute_readonly_on_snapshot` / the prepared + describe variants /
6//! `is_readonly_sql` / `prepare_on_snapshot`), which run against a
7//! `CatalogSnapshot` without borrowing the engine. Both reject DDL/DML
8//! with `WriteRequired` and route SELECT / SHOW / EXPLAIN to the same
9//! domain handlers as the write path. Whole `impl Engine` methods; the
10//! public surface is unchanged, and `enforce_row_limit` stays in the
11//! crate root (shared with `execute.rs`, reached via self).
12
13use alloc::vec::Vec;
14
15use spg_sql::ast::Statement;
16use spg_sql::parser::{self, ParseError};
17use spg_storage::{ColumnSchema, Value};
18
19use crate::describe;
20use crate::{
21 CancelToken, CatalogSnapshot, Engine, EngineError, QueryResult, expand_group_by_all, reorder,
22 resolve_order_by_position, rewrite_clock_calls, substitute_placeholders,
23};
24
25impl Engine {
26 /// v7.11.1 — execute a read-only SQL statement against a
27 /// `CatalogSnapshot` without touching this engine. Same
28 /// semantics as `execute_readonly` but parameterised on the
29 /// snapshot's catalog. Reject DDL/DML the same way
30 /// `execute_readonly` does. Static-on-Self so the caller can
31 /// dispatch without holding an `Engine` borrow alongside the
32 /// snapshot.
33 pub fn execute_readonly_on_snapshot(
34 snapshot: &CatalogSnapshot,
35 sql: &str,
36 ) -> Result<QueryResult, EngineError> {
37 Self::execute_readonly_on_snapshot_with_cancel(snapshot, sql, CancelToken::none())
38 }
39
40 /// v7.11.1 — `execute_readonly_on_snapshot` with cooperative
41 /// cancellation. Builds a transient `Engine` over the snapshot
42 /// state, runs `execute_readonly_with_cancel`, drops. The
43 /// transient engine is cheap to construct (no I/O; everything
44 /// is just struct moves) and lets the existing read path stay
45 /// untouched.
46 pub fn execute_readonly_on_snapshot_with_cancel(
47 snapshot: &CatalogSnapshot,
48 sql: &str,
49 cancel: CancelToken<'_>,
50 ) -> Result<QueryResult, EngineError> {
51 let transient = Engine {
52 catalog: snapshot.catalog.clone(),
53 statistics: snapshot.statistics.clone(),
54 clock: snapshot.clock,
55 max_query_rows: snapshot.max_query_rows,
56 ..Engine::default()
57 };
58 transient.execute_readonly_with_cancel(sql, cancel)
59 }
60
61 /// v7.18 — execute a previously-prepared `Statement` against a
62 /// `CatalogSnapshot` in read-only mode. Mirror of
63 /// [`Engine::execute_prepared`] for the fan-out read path:
64 /// substitutes `Expr::Placeholder(n)` nodes from `params`, then
65 /// dispatches through [`Engine::execute_readonly_stmt_with_cancel`]
66 /// (writes / DDL hit `EngineError::WriteRequired`). Static-on-Self
67 /// so multiple readonly threads can dispatch against the same
68 /// snapshot concurrently without an `Engine` borrow.
69 ///
70 /// **Schema drift contract**. The `Statement` was prepared against
71 /// some prior catalog. If the snapshot's catalog has since
72 /// diverged (DDL renamed / dropped a referenced column / table),
73 /// execution surfaces the normal `EngineError` — same shape as
74 /// PG's "cached plan must not change result type". Caller decides
75 /// whether to re-prepare; engine does NOT auto-retry.
76 pub fn execute_readonly_prepared_on_snapshot(
77 snapshot: &CatalogSnapshot,
78 stmt: Statement,
79 params: &[Value],
80 ) -> Result<QueryResult, EngineError> {
81 Self::execute_readonly_prepared_on_snapshot_with_cancel(
82 snapshot,
83 stmt,
84 params,
85 CancelToken::none(),
86 )
87 }
88
89 /// v7.18 — cancellable variant of
90 /// [`Engine::execute_readonly_prepared_on_snapshot`].
91 pub fn execute_readonly_prepared_on_snapshot_with_cancel(
92 snapshot: &CatalogSnapshot,
93 mut stmt: Statement,
94 params: &[Value],
95 cancel: CancelToken<'_>,
96 ) -> Result<QueryResult, EngineError> {
97 cancel.check()?;
98 substitute_placeholders(&mut stmt, params)?;
99 let transient = Engine {
100 catalog: snapshot.catalog.clone(),
101 statistics: snapshot.statistics.clone(),
102 clock: snapshot.clock,
103 max_query_rows: snapshot.max_query_rows,
104 ..Engine::default()
105 };
106 transient.execute_readonly_stmt_with_cancel(stmt, cancel)
107 }
108
109 /// v7.18 — describe a prepared `Statement` against a
110 /// `CatalogSnapshot`. Same `(parameter_oids, output_columns)`
111 /// shape as [`Engine::describe_prepared`]; resolves names
112 /// against the snapshot's catalog instead of `self`. Pure
113 /// function — no engine state read.
114 pub fn describe_prepared_on_snapshot(
115 snapshot: &CatalogSnapshot,
116 stmt: &Statement,
117 ) -> (Vec<u32>, Vec<ColumnSchema>) {
118 describe::describe_prepared(stmt, &snapshot.catalog)
119 }
120
121 /// v7.18 — does this SQL string classify as read-only? Parses
122 /// `sql` with the engine parser and consults
123 /// `Statement::is_readonly()`. A parse error returns `false`
124 /// (route to the writer path so the user sees the canonical
125 /// parse error from the writer's simple-query dispatch).
126 /// Static-on-Self so the spg-sqlx connection layer can ask
127 /// without an `Engine` borrow.
128 #[must_use]
129 pub fn is_readonly_sql(sql: &str) -> bool {
130 parser::parse_statement(sql)
131 .as_ref()
132 .map(spg_sql::ast::Statement::is_readonly)
133 .unwrap_or(false)
134 }
135
136 /// v7.18 — parse + plan a SQL string against a
137 /// `CatalogSnapshot`. Mirror of [`Engine::prepare`] for the
138 /// readonly fan-out path: applies the same prepare-time
139 /// transforms (clock rewrite, `GROUP BY ALL` expansion, ORDER
140 /// BY position resolve, cost-based JOIN reorder) but resolves
141 /// catalog + statistics against the snapshot, not a live
142 /// engine. Static-on-Self — `AsyncReadHandle::prepare` calls
143 /// this without taking the writer lock so multiple read
144 /// handles can prepare concurrently against frozen views.
145 ///
146 /// # Errors
147 /// Propagates [`ParseError`] from the parser. Schema
148 /// validation deferred to execute time, same as
149 /// [`Engine::prepare`].
150 pub fn prepare_on_snapshot(
151 snapshot: &CatalogSnapshot,
152 sql: &str,
153 ) -> Result<Statement, ParseError> {
154 let mut stmt = parser::parse_statement(sql)?;
155 let now_micros = snapshot.clock.map(|f| f());
156 rewrite_clock_calls(&mut stmt, now_micros);
157 if let Statement::Select(s) = &mut stmt {
158 expand_group_by_all(s);
159 resolve_order_by_position(s);
160 reorder::reorder_joins(s, &snapshot.catalog, &snapshot.statistics);
161 }
162 Ok(stmt)
163 }
164
165 /// **v4.0 concurrency**: this is the entry point the server takes
166 /// under an `RwLock::read()` so multiple `SELECT` clients run in
167 /// parallel without serialising on a single mutex.
168 pub fn execute_readonly(&self, sql: &str) -> Result<QueryResult, EngineError> {
169 self.execute_readonly_with_cancel(sql, CancelToken::none())
170 }
171
172 /// v4.5 — read path with cooperative cancellation. Token's
173 /// `is_cancelled` is checked at the start (so a watchdog that
174 /// already fired returns Cancelled immediately) and at row-loop
175 /// checkpoints inside `exec_select`. SHOW paths are O(small) and
176 /// don't bother checking.
177 pub fn execute_readonly_with_cancel(
178 &self,
179 sql: &str,
180 cancel: CancelToken<'_>,
181 ) -> Result<QueryResult, EngineError> {
182 cancel.check()?;
183 let mut stmt = parser::parse_statement_with(sql, self.backslash_escapes)?;
184 let now_micros = self.clock.map(|f| f());
185 rewrite_clock_calls(&mut stmt, now_micros);
186 if let Statement::Select(s) = &mut stmt {
187 resolve_order_by_position(s);
188 // v6.2.3 — cost-based JOIN reorder (read path).
189 reorder::reorder_joins(s, &self.catalog, &self.statistics);
190 }
191 self.execute_readonly_stmt_with_cancel(stmt, cancel)
192 }
193
194 /// v7.18 — readonly dispatch on a pre-parsed `Statement`.
195 /// Internal helper shared by the SQL-string path
196 /// ([`Engine::execute_readonly_with_cancel`]) and the prepared-
197 /// statement path ([`Engine::execute_readonly_prepared_on_snapshot_with_cancel`]).
198 /// Statement-level transforms (clock rewrite, ORDER BY position,
199 /// JOIN reorder, placeholder substitution) are the caller's
200 /// responsibility — this helper assumes the AST is already
201 /// execution-ready. Writes / DDL hit
202 /// [`EngineError::WriteRequired`] the same way the SQL path does.
203 fn execute_readonly_stmt_with_cancel(
204 &self,
205 stmt: Statement,
206 cancel: CancelToken<'_>,
207 ) -> Result<QueryResult, EngineError> {
208 let result = match stmt {
209 Statement::Select(s) => self.exec_select_cancel(&s, cancel),
210 Statement::ShowTables => Ok(self.exec_show_tables()),
211 Statement::ShowDatabases => Ok(self.exec_show_databases()),
212 Statement::ShowCreateTable(name) => self.exec_show_create_table(&name),
213 Statement::ShowIndexes(name) => self.exec_show_indexes(&name),
214 Statement::ShowStatus => Ok(self.exec_show_status()),
215 Statement::ShowVariables => Ok(self.exec_show_variables()),
216 Statement::ShowProcesslist => Ok(self.exec_show_processlist()),
217 Statement::ShowColumns(table) => self.exec_show_columns(&table),
218 Statement::ShowUsers => Ok(self.exec_show_users()),
219 Statement::ShowPublications => Ok(self.exec_show_publications()),
220 Statement::ShowSubscriptions => Ok(self.exec_show_subscriptions()),
221 Statement::WaitForWalPosition { .. } => Err(EngineError::Unsupported(
222 "WAIT FOR WAL POSITION must be handled by the server layer".into(),
223 )),
224 Statement::Explain(e) => self.exec_explain(&e, cancel),
225 _ => Err(EngineError::WriteRequired),
226 };
227 self.enforce_row_limit(result)
228 }
229}