spg_embedded_tokio/lib.rs
1//! Tokio-friendly async wrapper around `spg-embedded`.
2//!
3//! # Why this crate exists
4//!
//! `spg-embedded`'s `Database::execute(&mut self, sql)` is sync
//! and may block on WAL fsync or cold-tier I/O. Calling it from
//! inside a `tokio::main` runtime triggers the `block_in_place`
//! warning and ties up a worker thread until the call returns.
//! mailrs's cement (entirely tokio-based) is the load-bearing
//! consumer that surfaced this.
11//!
12//! v7.18 — [`AsyncDatabase`] holds a `tokio::sync::RwLock<Database>`
13//! (upgraded from Mutex). Writer calls take the write lock —
14//! the engine is still single-writer, that invariant hasn't
15//! changed. Snapshot-taking (`read_handle` init / refresh) only
16//! needs read access to `clone_snapshot`, so it takes the read
17//! lock and concurrent snapshot refreshes do not serialise.
18//! `spawn_blocking` insulates the runtime's worker pool from
19//! disk stalls the same way it did under Mutex.
20//!
21//! # Why a separate crate
22//!
23//! `spg-embedded` keeps the workspace's "0 external dependencies"
24//! policy. `tokio` is the largest external dep we'd ever pull,
25//! and gating it behind a Cargo feature flag still surfaces
26//! `tokio` in downstream consumers' `Cargo.lock`. A separate
27//! adapter crate is the clean answer: anyone who wants the
28//! tokio shape opts in by adding `spg-embedded-tokio`; everyone
29//! else stays untouched.
30
31#![deny(missing_debug_implementations)]
32
33use std::path::Path;
34use std::sync::Arc;
35
36pub use spg_embedded::{
37 ColumnSchema, DataType, Database, EngineError, ParsedStatement, QueryResult, Statement, Value,
38};
39pub use spg_engine::CatalogSnapshot;
40
41use tokio::sync::RwLock;
42use tokio::task::JoinError;
43
44/// v7.34.1 (mailrs prod report bug B): drop the previous
45/// `.expect("spawn_blocking join")` shape that panicked on
46/// `JoinError::Cancelled` during runtime shutdown — a SIGKILL with any
47/// readonly call in flight reliably reproduced it. Cancelled is the
48/// expected state when the tokio runtime is being dropped; map it to
49/// `EngineError::Cancelled` so the caller's `?` propagates a clean
50/// "shutting down" error instead of a panic. A real panic inside the
51/// blocking closure still surfaces — `resume_unwind` re-throws the
52/// original payload so backtraces and any `catch_unwind` machinery
53/// keeps its semantics.
54trait FlattenBlockingExt<T> {
55 /// Result-returning closures: flatten `JoinHandle`'s outer error
56 /// into `EngineError`; an in-flight cancellation becomes
57 /// `Err(EngineError::Cancelled)`, panics propagate verbatim.
58 fn flatten_blocking(self) -> Result<T, EngineError>;
59}
60
61impl<T> FlattenBlockingExt<T> for Result<Result<T, EngineError>, JoinError> {
62 fn flatten_blocking(self) -> Result<T, EngineError> {
63 match self {
64 Ok(inner) => inner,
65 Err(je) if je.is_cancelled() => Err(EngineError::Cancelled),
66 Err(je) => std::panic::resume_unwind(je.into_panic()),
67 }
68 }
69}
70
71/// Same idea for `spawn_blocking` closures whose return type is a bare
72/// `T` (not a `Result`). Used by `read_handle` / `refresh` where the
73/// historical signature is `-> T`. Cancellation here surfaces as a
74/// panic with an honest message rather than the misleading
75/// "spawn_blocking join" string the old expect produced — a
76/// Result-returning rework of those two methods is the API-break that
77/// follow-up work would carry.
78trait UnwrapBlockingExt<T> {
79 fn unwrap_blocking(self) -> T;
80}
81
82impl<T> UnwrapBlockingExt<T> for Result<T, JoinError> {
83 fn unwrap_blocking(self) -> T {
84 match self {
85 Ok(v) => v,
86 Err(je) if je.is_cancelled() => {
87 panic!("spg-embedded-tokio: snapshot helper cancelled during runtime shutdown")
88 }
89 Err(je) => std::panic::resume_unwind(je.into_panic()),
90 }
91 }
92}
93
94/// Tokio-friendly handle to an embedded SPG database. Clone-cheap
95/// (`Arc` inside); every clone shares the same underlying engine.
96///
97/// v7.18 — backed by a `tokio::sync::RwLock` so writer calls
98/// serialise (engine single-writer invariant) but snapshot-only
99/// operations (`read_handle` init / refresh, which just clone
100/// the catalog trie roots) take the read lock and run
101/// concurrently with each other.
102#[derive(Debug, Clone)]
103pub struct AsyncDatabase {
104 inner: Arc<RwLock<Database>>,
105}
106
107/// v7.16.0 — Tokio-flavoured prepared-statement handle. Wraps
108/// the sync `spg_embedded::Statement` in an `Arc` so the AST is
109/// shared (not cloned) across `execute_prepared` /
110/// `query_prepared` calls, and so the handle is `Clone + Send`
111/// without copying the AST per bind. The engine's per-bind
112/// internal clone still happens — that's where placeholder
113/// substitution lands — but the spg-embedded-tokio surface
114/// avoids the second clone the naive shape would force.
115///
116/// Holding an `AsyncStatement` does NOT pin the database; drop
117/// the last `AsyncDatabase` clone and the handle stops being
118/// useful (the next `execute_prepared` call would still find a
119/// locked `Database` if any other clone is alive, but bind
120/// against a dropped database surfaces as the underlying
121/// `EngineError`).
122#[derive(Debug, Clone)]
123pub struct AsyncStatement {
124 inner: Arc<crate::Statement>,
125}
126
127/// v7.16.0 — adapter escape hatch: hand back the inner
128/// `Arc<Statement>`. Used by the `spg-sqlx` crate to plug the
129/// engine-side prepared handle into sqlx's `Statement<'q>` trait
130/// without going through another clone. Not intended for
131/// application code.
132#[doc(hidden)]
133#[must_use]
134pub fn async_statement_inner(stmt: &AsyncStatement) -> Arc<crate::Statement> {
135 Arc::clone(&stmt.inner)
136}
137
138impl AsyncDatabase {
139 /// In-memory database. No WAL, no catalog snapshot on disk.
140 /// `Clone` shares the engine; drop the last clone to release.
141 #[must_use]
142 pub fn open_in_memory() -> Self {
143 Self {
144 inner: Arc::new(RwLock::new(Database::open_in_memory())),
145 }
146 }
147
148 /// Open or create a file-backed database at `path`. The open
149 /// itself can stat the file + replay the WAL, so the call is
150 /// dispatched via `spawn_blocking` to keep the runtime
151 /// responsive. Mirrors `Database::open_path`.
152 ///
153 /// # Errors
154 /// Propagates whatever `Database::open_path` returns on the
155 /// sync path (IO errors, format errors, etc.).
156 pub async fn open_path<P: AsRef<Path>>(path: P) -> Result<Self, EngineError> {
157 let path = path.as_ref().to_path_buf();
158 let db = tokio::task::spawn_blocking(move || Database::open_path(path))
159 .await
160 .flatten_blocking()?;
161 Ok(Self {
162 inner: Arc::new(RwLock::new(db)),
163 })
164 }
165
166 /// Execute a single SQL statement.
167 ///
168 /// v7.20 P2 — group-commit: the engine mutation + WAL enqueue
169 /// run under the write lock (~1 µs), then the lock DROPS
170 /// before the fsync wait. N concurrent writers' mutations
171 /// pipeline behind each other while the WAL leader fsyncs
172 /// once for the whole batch — profile_breakdown measured
173 /// fsync at 99.2% of the durable write path, so this is
174 /// where the concurrency comes back.
175 ///
176 /// # Errors
177 /// Propagates `EngineError` unchanged from the sync engine;
178 /// a failed batch flush poisons the WAL loudly for all
179 /// waiters.
180 pub async fn execute(&self, sql: &str) -> Result<QueryResult, EngineError> {
181 let inner = Arc::clone(&self.inner);
182 let sql = sql.to_string();
183 tokio::task::spawn_blocking(move || {
184 let (result, ticket) = {
185 let mut guard = inner.blocking_write();
186 guard.execute_buffered(&sql)?
187 }; // ← write lock released here
188 if let Some(t) = ticket {
189 t.wait()?; // group-commit: shared fsync
190 }
191 Ok(result)
192 })
193 .await
194 .flatten_blocking()
195 }
196
197 /// v7.21 — run a multi-statement script with PG simple-query
198 /// semantics (one implicit transaction; see
199 /// `spg_embedded::Database::execute_script`). The write lock is
200 /// held across the WHOLE script: the engine's transaction slot
201 /// is shared, so releasing the lock mid-script would let another
202 /// writer's statements join the script's implicit transaction.
203 ///
204 /// # Errors
205 /// Propagates the first failing statement's `EngineError` after
206 /// the implicit rollback.
207 pub async fn execute_script(&self, sql: &str) -> Result<Vec<QueryResult>, EngineError> {
208 let inner = Arc::clone(&self.inner);
209 let sql = sql.to_string();
210 tokio::task::spawn_blocking(move || {
211 let mut guard = inner.blocking_write();
212 guard.execute_script(&sql)
213 })
214 .await
215 .flatten_blocking()
216 }
217
218 /// Run a SELECT and return rows as `Vec<Vec<Value>>`. Same
219 /// dispatch shape as `execute` — lock + spawn_blocking.
220 ///
221 /// # Errors
222 /// Propagates `EngineError` from the engine.
223 pub async fn query(&self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
224 let inner = Arc::clone(&self.inner);
225 let sql = sql.to_string();
226 tokio::task::spawn_blocking(move || {
227 let mut guard = inner.blocking_write();
228 guard.query(&sql)
229 })
230 .await
231 .flatten_blocking()
232 }
233
234 /// v7.16.0 — parse + plan a SQL string once. Returns an
235 /// [`AsyncStatement`] handle that subsequent
236 /// `execute_prepared` / `query_prepared` calls can re-bind
237 /// without re-parsing. Cheap to `Clone` — the underlying AST
238 /// sits behind an `Arc`, so the same plan can drive many
239 /// concurrent bind calls.
240 ///
241 /// # Errors
242 /// Propagates `EngineError` from the underlying
243 /// `Database::prepare`.
244 pub async fn prepare(&self, sql: &str) -> Result<AsyncStatement, EngineError> {
245 let inner = Arc::clone(&self.inner);
246 let sql = sql.to_string();
247 tokio::task::spawn_blocking(move || {
248 let mut guard = inner.blocking_write();
249 guard.prepare(&sql).map(|stmt| AsyncStatement {
250 inner: Arc::new(stmt),
251 })
252 })
253 .await
254 .flatten_blocking()
255 }
256
257 /// v7.17.0 Phase 3.P0-66 — async wrapper for
258 /// [`Database::describe`]. Returns `(parameter_oids,
259 /// output_columns)` for a prepared SQL string without
260 /// executing it. Drives the spg-sqlx adapter's
261 /// `Executor::describe` so `sqlx::query!()` compile-time
262 /// validation can resolve column types.
263 ///
264 /// # Errors
265 /// Propagates `EngineError` from the prepare path
266 /// (typically `ParseError`).
267 pub async fn describe(
268 &self,
269 sql: &str,
270 ) -> Result<(Vec<u32>, Vec<spg_embedded::ColumnSchema>), EngineError> {
271 let inner = Arc::clone(&self.inner);
272 let sql = sql.to_string();
273 tokio::task::spawn_blocking(move || {
274 let mut guard = inner.blocking_write();
275 guard.describe(&sql)
276 })
277 .await
278 .flatten_blocking()
279 }
280
281 /// v7.16.0 — execute a prepared statement with bound params.
282 /// `params` is taken by value because the spawn_blocking
283 /// closure needs a `'static` capture; the cost is one
284 /// `Vec::clone`-equivalent ownership transfer, dwarfed by
285 /// the engine's per-bind work.
286 ///
287 /// # Errors
288 /// Propagates engine errors; arity mismatch surfaces as
289 /// "parameter \$N referenced but only M bound by client".
290 pub async fn execute_prepared(
291 &self,
292 stmt: &AsyncStatement,
293 params: Vec<Value>,
294 ) -> Result<QueryResult, EngineError> {
295 let inner = Arc::clone(&self.inner);
296 let stmt_inner = Arc::clone(&stmt.inner);
297 tokio::task::spawn_blocking(move || {
298 // v7.20 P2 — group-commit (see `execute`): mutation
299 // under the lock, fsync wait after release.
300 let (result, ticket) = {
301 let mut guard = inner.blocking_write();
302 guard.execute_prepared_buffered(&stmt_inner, ¶ms)?
303 };
304 if let Some(t) = ticket {
305 t.wait()?;
306 }
307 Ok(result)
308 })
309 .await
310 .flatten_blocking()
311 }
312
313 /// v7.16.0 — run a prepared SELECT with bound params and
314 /// return rows as `Vec<Vec<Value>>`. Errors when the prepared
315 /// statement isn't a SELECT.
316 ///
317 /// # Errors
318 /// Propagates `EngineError` from the underlying
319 /// `Database::query_prepared`.
320 pub async fn query_prepared(
321 &self,
322 stmt: &AsyncStatement,
323 params: Vec<Value>,
324 ) -> Result<Vec<Vec<Value>>, EngineError> {
325 let inner = Arc::clone(&self.inner);
326 let stmt_inner = Arc::clone(&stmt.inner);
327 tokio::task::spawn_blocking(move || {
328 let mut guard = inner.blocking_write();
329 guard.query_prepared(&stmt_inner, ¶ms)
330 })
331 .await
332 .flatten_blocking()
333 }
334
335 /// v7.16.0 — column-aware variant of `query`. Returns the
336 /// SELECT's column schema vec alongside the rows so adapters
337 /// (the spg-sqlx fetch path most notably) can drive name +
338 /// type-based column lookups.
339 ///
340 /// # Errors
341 /// Same shape as `query` — errors when the SQL isn't a SELECT
342 /// or the engine returns one.
343 pub async fn query_with_columns(
344 &self,
345 sql: &str,
346 ) -> Result<(Vec<spg_embedded::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
347 let inner = Arc::clone(&self.inner);
348 let sql = sql.to_string();
349 tokio::task::spawn_blocking(move || {
350 let mut guard = inner.blocking_write();
351 guard.query_with_columns(&sql)
352 })
353 .await
354 .flatten_blocking()
355 }
356
357 /// v7.16.0 — column-aware variant of `query_prepared`. Same
358 /// shape as `query_with_columns` but driven from a prepared
359 /// AsyncStatement + bound params.
360 ///
361 /// # Errors
362 /// Propagates `EngineError`; errors when the prepared
363 /// statement isn't a SELECT.
364 pub async fn query_prepared_with_columns(
365 &self,
366 stmt: &AsyncStatement,
367 params: Vec<Value>,
368 ) -> Result<(Vec<spg_embedded::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
369 let inner = Arc::clone(&self.inner);
370 let stmt_inner = Arc::clone(&stmt.inner);
371 tokio::task::spawn_blocking(move || {
372 let mut guard = inner.blocking_write();
373 guard.query_prepared_with_columns(&stmt_inner, ¶ms)
374 })
375 .await
376 .flatten_blocking()
377 }
378
379 /// Run a checkpoint (flush WAL into the catalog snapshot +
380 /// truncate the WAL back to zero). Blocking work — dispatched
381 /// the same way as `execute`.
382 ///
383 /// # Errors
384 /// Propagates `EngineError` from the engine / IO layer.
385 pub async fn checkpoint(&self) -> Result<(), EngineError> {
386 let inner = Arc::clone(&self.inner);
387 tokio::task::spawn_blocking(move || {
388 let mut guard = inner.blocking_write();
389 guard.checkpoint()
390 })
391 .await
392 .flatten_blocking()
393 }
394
395 /// v7.20 P3 — inline snapshot clone for the read fan-out hot
396 /// path. Takes the async read lock (not `blocking_read` +
397 /// `spawn_blocking` — the clone is an Arc-bump of the catalog
398 /// trie roots, ~0 µs per profile_breakdown, far below tokio's
399 /// inline-work threshold). spg-sqlx's per-statement
400 /// read-committed refresh runs through here; pairing it with
401 /// `Database::{prepare,execute_prepared}_on_snapshot` keeps
402 /// the whole readonly statement on the async executor with
403 /// zero thread hops.
404 pub async fn clone_snapshot_inline(&self) -> CatalogSnapshot {
405 let guard = self.inner.read().await;
406 guard.engine().clone_snapshot()
407 }
408
409 /// v7.11.2 — fan-out reader. Clones the engine's committed
410 /// catalog under the writer lock, releases the lock, and
411 /// hands back an `AsyncReadHandle` that runs SELECTs against
412 /// the snapshot **without ever re-acquiring the writer
413 /// lock**. Multiple read handles can run concurrently — they
414 /// share nothing mutable. mailrs's IMAP fetch pattern lands
415 /// here.
416 ///
417 /// Contract: the snapshot is frozen at the moment this call
418 /// returns. Subsequent writes are NOT visible. Call
419 /// `AsyncReadHandle::refresh().await` to re-snapshot when
420 /// you need fresher data.
421 pub async fn read_handle(&self) -> AsyncReadHandle {
422 let inner = Arc::clone(&self.inner);
423 let snapshot = tokio::task::spawn_blocking(move || {
424 let guard = inner.blocking_read();
425 guard.engine().clone_snapshot()
426 })
427 .await
428 .unwrap_blocking();
429 AsyncReadHandle {
430 db: Arc::clone(&self.inner),
431 snapshot,
432 }
433 }
434}
435
436/// v7.11.2 — read-only handle backed by a frozen
437/// `CatalogSnapshot`. Multiple handles can run concurrently; they
438/// don't acquire the writer lock at query time. Refresh-on-demand
439/// — the contract is that the handle reflects committed state at
440/// the moment of construction or the last `refresh()`.
441///
442/// v7.18 — holds a reference to the underlying `AsyncDatabase`
443/// (via the shared `Arc<RwLock<Database>>`) only so `refresh()`
444/// can briefly take the read lock to clone a fresh snapshot.
445/// Read paths never touch the Database directly. Snapshot
446/// cloning is a trie-root `Arc` copy, so a busy writer barely
447/// affects refresh latency.
448#[derive(Debug)]
449pub struct AsyncReadHandle {
450 db: Arc<RwLock<Database>>,
451 snapshot: CatalogSnapshot,
452}
453
454impl AsyncReadHandle {
455 /// Run a read-only SQL statement against the frozen snapshot.
456 /// DDL / DML reject with `EngineError::WriteRequired`.
457 ///
458 /// # Errors
459 /// Propagates `EngineError` from the engine's read path.
460 pub async fn query(&self, sql: &str) -> Result<QueryResult, EngineError> {
461 let snapshot = self.snapshot.clone();
462 let sql = sql.to_string();
463 tokio::task::spawn_blocking(move || {
464 spg_engine::Engine::execute_readonly_on_snapshot(&snapshot, &sql)
465 })
466 .await
467 .flatten_blocking()
468 }
469
470 /// v7.18 — parse + plan a SQL string against this handle's
471 /// frozen snapshot. Mirror of [`AsyncDatabase::prepare`] for
472 /// the readonly fan-out path: clock rewrite + JOIN reorder +
473 /// position resolve happen against the snapshot's catalog +
474 /// statistics, no writer lock acquired. Multiple read handles
475 /// can prepare concurrently; the returned [`AsyncStatement`]
476 /// is `Clone + Send`.
477 ///
478 /// # Errors
479 /// Propagates [`EngineError`] from the parser
480 /// (`EngineError::Parse`).
481 pub async fn prepare(&self, sql: &str) -> Result<AsyncStatement, EngineError> {
482 let snapshot = self.snapshot.clone();
483 let sql = sql.to_string();
484 tokio::task::spawn_blocking(move || {
485 Database::prepare_on_snapshot(&snapshot, &sql).map(|stmt| AsyncStatement {
486 inner: Arc::new(stmt),
487 })
488 })
489 .await
490 .flatten_blocking()
491 }
492
493 /// v7.18 — execute a prepared statement against this handle's
494 /// frozen snapshot with bound params. Mirror of
495 /// [`AsyncDatabase::execute_prepared`] on the readonly path —
496 /// writes / DDL hit `EngineError::WriteRequired` so the caller
497 /// can route them to the writer mutex. No writer lock
498 /// acquired; multiple handles run truly concurrently.
499 ///
500 /// # Errors
501 /// Propagates engine errors (placeholder arity mismatch,
502 /// schema drift surfacing as catalog lookups, etc.).
503 pub async fn execute_prepared(
504 &self,
505 stmt: &AsyncStatement,
506 params: Vec<Value>,
507 ) -> Result<QueryResult, EngineError> {
508 let snapshot = self.snapshot.clone();
509 let stmt_inner = Arc::clone(&stmt.inner);
510 tokio::task::spawn_blocking(move || {
511 Database::execute_prepared_on_snapshot(&snapshot, &stmt_inner, ¶ms)
512 })
513 .await
514 .flatten_blocking()
515 }
516
517 /// v7.18 — describe a prepared SQL string against this
518 /// handle's frozen snapshot. Returns `(parameter_oids,
519 /// output_columns)`. Drives the spg-sqlx adapter's readonly
520 /// `Executor::describe` path so `sqlx::query!()` compile-time
521 /// validation can resolve column types without touching the
522 /// writer engine.
523 ///
524 /// # Errors
525 /// Propagates [`EngineError`] from the parser
526 /// (`EngineError::Parse`).
527 pub async fn describe(
528 &self,
529 sql: &str,
530 ) -> Result<(Vec<u32>, Vec<spg_embedded::ColumnSchema>), EngineError> {
531 let snapshot = self.snapshot.clone();
532 let sql = sql.to_string();
533 tokio::task::spawn_blocking(move || Database::describe_on_snapshot(&snapshot, &sql))
534 .await
535 .flatten_blocking()
536 }
537
538 /// Re-snapshot the underlying engine. Briefly takes the
539 /// writer lock; subsequent `query()` calls see the new state.
540 /// Idempotent on a quiet engine (clones the same trie roots).
541 pub async fn refresh(&mut self) {
542 let inner = Arc::clone(&self.db);
543 let new_snapshot = tokio::task::spawn_blocking(move || {
544 let guard = inner.blocking_read();
545 guard.engine().clone_snapshot()
546 })
547 .await
548 .unwrap_blocking();
549 self.snapshot = new_snapshot;
550 }
551}