Skip to main content

spg_engine/
readonly.rs

1//! Read-only / snapshot execution, split out of `lib.rs` (lib.rs split
2//! 18). Two entry families share one module: the live read path
3//! (`execute_readonly` / `_with_cancel`, taken by the server under an
4//! `RwLock::read()` so SELECTs run in parallel) and the snapshot path
5//! (`execute_readonly_on_snapshot` / the prepared + describe variants /
6//! `is_readonly_sql` / `prepare_on_snapshot`), which run against a
7//! `CatalogSnapshot` without borrowing the engine. Both reject DDL/DML
8//! with `WriteRequired` and route SELECT / SHOW / EXPLAIN to the same
9//! domain handlers as the write path. Whole `impl Engine` methods; the
10//! public surface is unchanged, and `enforce_row_limit` stays in the
11//! crate root (shared with `execute.rs`, reached via self).
12
13use alloc::vec::Vec;
14
15use spg_sql::ast::Statement;
16use spg_sql::parser::{self, ParseError};
17use spg_storage::{ColumnSchema, Value};
18
19use crate::describe;
20use crate::{
21    CancelToken, CatalogSnapshot, Engine, EngineError, QueryResult, expand_group_by_all, reorder,
22    resolve_order_by_position, rewrite_clock_calls, substitute_placeholders,
23};
24
25impl Engine {
26    /// v7.11.1 — execute a read-only SQL statement against a
27    /// `CatalogSnapshot` without touching this engine. Same
28    /// semantics as `execute_readonly` but parameterised on the
29    /// snapshot's catalog. Reject DDL/DML the same way
30    /// `execute_readonly` does. Static-on-Self so the caller can
31    /// dispatch without holding an `Engine` borrow alongside the
32    /// snapshot.
33    pub fn execute_readonly_on_snapshot(
34        snapshot: &CatalogSnapshot,
35        sql: &str,
36    ) -> Result<QueryResult, EngineError> {
37        Self::execute_readonly_on_snapshot_with_cancel(snapshot, sql, CancelToken::none())
38    }
39
40    /// v7.11.1 — `execute_readonly_on_snapshot` with cooperative
41    /// cancellation. Builds a transient `Engine` over the snapshot
42    /// state, runs `execute_readonly_with_cancel`, drops. The
43    /// transient engine is cheap to construct (no I/O; everything
44    /// is just struct moves) and lets the existing read path stay
45    /// untouched.
46    pub fn execute_readonly_on_snapshot_with_cancel(
47        snapshot: &CatalogSnapshot,
48        sql: &str,
49        cancel: CancelToken<'_>,
50    ) -> Result<QueryResult, EngineError> {
51        let transient = Engine {
52            catalog: snapshot.catalog.clone(),
53            statistics: snapshot.statistics.clone(),
54            clock: snapshot.clock,
55            max_query_rows: snapshot.max_query_rows,
56            ..Engine::default()
57        };
58        transient.execute_readonly_with_cancel(sql, cancel)
59    }
60
61    /// v7.18 — execute a previously-prepared `Statement` against a
62    /// `CatalogSnapshot` in read-only mode. Mirror of
63    /// [`Engine::execute_prepared`] for the fan-out read path:
64    /// substitutes `Expr::Placeholder(n)` nodes from `params`, then
65    /// dispatches through [`Engine::execute_readonly_stmt_with_cancel`]
66    /// (writes / DDL hit `EngineError::WriteRequired`). Static-on-Self
67    /// so multiple readonly threads can dispatch against the same
68    /// snapshot concurrently without an `Engine` borrow.
69    ///
70    /// **Schema drift contract**. The `Statement` was prepared against
71    /// some prior catalog. If the snapshot's catalog has since
72    /// diverged (DDL renamed / dropped a referenced column / table),
73    /// execution surfaces the normal `EngineError` — same shape as
74    /// PG's "cached plan must not change result type". Caller decides
75    /// whether to re-prepare; engine does NOT auto-retry.
76    pub fn execute_readonly_prepared_on_snapshot(
77        snapshot: &CatalogSnapshot,
78        stmt: Statement,
79        params: &[Value],
80    ) -> Result<QueryResult, EngineError> {
81        Self::execute_readonly_prepared_on_snapshot_with_cancel(
82            snapshot,
83            stmt,
84            params,
85            CancelToken::none(),
86        )
87    }
88
89    /// v7.18 — cancellable variant of
90    /// [`Engine::execute_readonly_prepared_on_snapshot`].
91    pub fn execute_readonly_prepared_on_snapshot_with_cancel(
92        snapshot: &CatalogSnapshot,
93        mut stmt: Statement,
94        params: &[Value],
95        cancel: CancelToken<'_>,
96    ) -> Result<QueryResult, EngineError> {
97        cancel.check()?;
98        substitute_placeholders(&mut stmt, params)?;
99        let transient = Engine {
100            catalog: snapshot.catalog.clone(),
101            statistics: snapshot.statistics.clone(),
102            clock: snapshot.clock,
103            max_query_rows: snapshot.max_query_rows,
104            ..Engine::default()
105        };
106        transient.execute_readonly_stmt_with_cancel(stmt, cancel)
107    }
108
109    /// v7.18 — describe a prepared `Statement` against a
110    /// `CatalogSnapshot`. Same `(parameter_oids, output_columns)`
111    /// shape as [`Engine::describe_prepared`]; resolves names
112    /// against the snapshot's catalog instead of `self`. Pure
113    /// function — no engine state read.
114    pub fn describe_prepared_on_snapshot(
115        snapshot: &CatalogSnapshot,
116        stmt: &Statement,
117    ) -> (Vec<u32>, Vec<ColumnSchema>) {
118        describe::describe_prepared(stmt, &snapshot.catalog)
119    }
120
121    /// v7.18 — does this SQL string classify as read-only? Parses
122    /// `sql` with the engine parser and consults
123    /// `Statement::is_readonly()`. A parse error returns `false`
124    /// (route to the writer path so the user sees the canonical
125    /// parse error from the writer's simple-query dispatch).
126    /// Static-on-Self so the spg-sqlx connection layer can ask
127    /// without an `Engine` borrow.
128    #[must_use]
129    pub fn is_readonly_sql(sql: &str) -> bool {
130        parser::parse_statement(sql)
131            .as_ref()
132            .map(spg_sql::ast::Statement::is_readonly)
133            .unwrap_or(false)
134    }
135
136    /// v7.18 — parse + plan a SQL string against a
137    /// `CatalogSnapshot`. Mirror of [`Engine::prepare`] for the
138    /// readonly fan-out path: applies the same prepare-time
139    /// transforms (clock rewrite, `GROUP BY ALL` expansion, ORDER
140    /// BY position resolve, cost-based JOIN reorder) but resolves
141    /// catalog + statistics against the snapshot, not a live
142    /// engine. Static-on-Self — `AsyncReadHandle::prepare` calls
143    /// this without taking the writer lock so multiple read
144    /// handles can prepare concurrently against frozen views.
145    ///
146    /// # Errors
147    /// Propagates [`ParseError`] from the parser. Schema
148    /// validation deferred to execute time, same as
149    /// [`Engine::prepare`].
150    pub fn prepare_on_snapshot(
151        snapshot: &CatalogSnapshot,
152        sql: &str,
153    ) -> Result<Statement, ParseError> {
154        let mut stmt = parser::parse_statement(sql)?;
155        let now_micros = snapshot.clock.map(|f| f());
156        rewrite_clock_calls(&mut stmt, now_micros);
157        if let Statement::Select(s) = &mut stmt {
158            expand_group_by_all(s);
159            resolve_order_by_position(s);
160            reorder::reorder_joins(s, &snapshot.catalog, &snapshot.statistics);
161        }
162        Ok(stmt)
163    }
164
165    /// **v4.0 concurrency**: this is the entry point the server takes
166    /// under an `RwLock::read()` so multiple `SELECT` clients run in
167    /// parallel without serialising on a single mutex.
168    pub fn execute_readonly(&self, sql: &str) -> Result<QueryResult, EngineError> {
169        self.execute_readonly_with_cancel(sql, CancelToken::none())
170    }
171
172    /// v4.5 — read path with cooperative cancellation. Token's
173    /// `is_cancelled` is checked at the start (so a watchdog that
174    /// already fired returns Cancelled immediately) and at row-loop
175    /// checkpoints inside `exec_select`. SHOW paths are O(small) and
176    /// don't bother checking.
177    pub fn execute_readonly_with_cancel(
178        &self,
179        sql: &str,
180        cancel: CancelToken<'_>,
181    ) -> Result<QueryResult, EngineError> {
182        cancel.check()?;
183        let mut stmt = parser::parse_statement_with(sql, self.backslash_escapes)?;
184        let now_micros = self.clock.map(|f| f());
185        rewrite_clock_calls(&mut stmt, now_micros);
186        if let Statement::Select(s) = &mut stmt {
187            resolve_order_by_position(s);
188            // v6.2.3 — cost-based JOIN reorder (read path).
189            reorder::reorder_joins(s, &self.catalog, &self.statistics);
190        }
191        self.execute_readonly_stmt_with_cancel(stmt, cancel)
192    }
193
194    /// v7.18 — readonly dispatch on a pre-parsed `Statement`.
195    /// Internal helper shared by the SQL-string path
196    /// ([`Engine::execute_readonly_with_cancel`]) and the prepared-
197    /// statement path ([`Engine::execute_readonly_prepared_on_snapshot_with_cancel`]).
198    /// Statement-level transforms (clock rewrite, ORDER BY position,
199    /// JOIN reorder, placeholder substitution) are the caller's
200    /// responsibility — this helper assumes the AST is already
201    /// execution-ready. Writes / DDL hit
202    /// [`EngineError::WriteRequired`] the same way the SQL path does.
203    fn execute_readonly_stmt_with_cancel(
204        &self,
205        stmt: Statement,
206        cancel: CancelToken<'_>,
207    ) -> Result<QueryResult, EngineError> {
208        let result = match stmt {
209            Statement::Select(s) => self.exec_select_cancel(&s, cancel),
210            Statement::ShowTables => Ok(self.exec_show_tables()),
211            Statement::ShowDatabases => Ok(self.exec_show_databases()),
212            Statement::ShowCreateTable(name) => self.exec_show_create_table(&name),
213            Statement::ShowIndexes(name) => self.exec_show_indexes(&name),
214            Statement::ShowStatus => Ok(self.exec_show_status()),
215            Statement::ShowVariables => Ok(self.exec_show_variables()),
216            Statement::ShowProcesslist => Ok(self.exec_show_processlist()),
217            Statement::ShowColumns(table) => self.exec_show_columns(&table),
218            Statement::ShowUsers => Ok(self.exec_show_users()),
219            Statement::ShowPublications => Ok(self.exec_show_publications()),
220            Statement::ShowSubscriptions => Ok(self.exec_show_subscriptions()),
221            Statement::WaitForWalPosition { .. } => Err(EngineError::Unsupported(
222                "WAIT FOR WAL POSITION must be handled by the server layer".into(),
223            )),
224            Statement::Explain(e) => self.exec_explain(&e, cancel),
225            _ => Err(EngineError::WriteRequired),
226        };
227        self.enforce_row_limit(result)
228    }
229}