Skip to main content

spg_embedded_tokio/
lib.rs

//! Tokio-friendly async wrapper around `spg-embedded`.
//!
//! # Why this crate exists
//!
//! `spg-embedded`'s `Database::execute(&mut self, sql)` is sync
//! and may block on WAL fsync or cold-tier I/O. Calling it from
//! inside a `tokio::main` runtime triggers the `block_in_place`
//! warning and ties up a worker thread until the call returns.
//! mailrs's cement (entirely tokio-based) is the load-bearing
//! consumer that surfaced this.
//!
//! v7.18 — [`AsyncDatabase`] holds a `tokio::sync::RwLock<Database>`
//! (upgraded from Mutex). Writer calls take the write lock —
//! the engine is still single-writer, that invariant hasn't
//! changed. Snapshot-taking (`read_handle` init / refresh) only
//! needs read access to `clone_snapshot`, so it takes the read
//! lock and concurrent snapshot refreshes do not serialise.
//! `spawn_blocking` insulates the runtime's worker pool from
//! disk stalls the same way it did under Mutex.
//!
//! # Why a separate crate
//!
//! `spg-embedded` keeps the workspace's "0 external dependencies"
//! policy. `tokio` is the largest external dep we'd ever pull,
//! and gating it behind a Cargo feature flag still surfaces
//! `tokio` in downstream consumers' `Cargo.lock`. A separate
//! adapter crate is the clean answer: anyone who wants the
//! tokio shape opts in by adding `spg-embedded-tokio`; everyone
//! else stays untouched.
30
31#![deny(missing_debug_implementations)]
32
33use std::path::Path;
34use std::sync::Arc;
35
36pub use spg_embedded::{
37    ColumnSchema, DataType, Database, EngineError, ParsedStatement, QueryResult, Statement, Value,
38};
39pub use spg_engine::CatalogSnapshot;
40
41use tokio::sync::RwLock;
42use tokio::task::JoinError;
43
44/// v7.34.1 (mailrs prod report bug B): drop the previous
45/// `.expect("spawn_blocking join")` shape that panicked on
46/// `JoinError::Cancelled` during runtime shutdown — a SIGKILL with any
47/// readonly call in flight reliably reproduced it. Cancelled is the
48/// expected state when the tokio runtime is being dropped; map it to
49/// `EngineError::Cancelled` so the caller's `?` propagates a clean
50/// "shutting down" error instead of a panic. A real panic inside the
51/// blocking closure still surfaces — `resume_unwind` re-throws the
52/// original payload so backtraces and any `catch_unwind` machinery
53/// keeps its semantics.
54trait FlattenBlockingExt<T> {
55    /// Result-returning closures: flatten `JoinHandle`'s outer error
56    /// into `EngineError`; an in-flight cancellation becomes
57    /// `Err(EngineError::Cancelled)`, panics propagate verbatim.
58    fn flatten_blocking(self) -> Result<T, EngineError>;
59}
60
61impl<T> FlattenBlockingExt<T> for Result<Result<T, EngineError>, JoinError> {
62    fn flatten_blocking(self) -> Result<T, EngineError> {
63        match self {
64            Ok(inner) => inner,
65            Err(je) if je.is_cancelled() => Err(EngineError::Cancelled),
66            Err(je) => std::panic::resume_unwind(je.into_panic()),
67        }
68    }
69}
70
/// Same idea for `spawn_blocking` closures whose return type is a bare
/// `T` (not a `Result`). Used by `read_handle` / `refresh` where the
/// historical signature is `-> T`. Cancellation here surfaces as a
/// panic with an honest message rather than the misleading
/// "spawn_blocking join" string the old expect produced — a
/// Result-returning rework of those two methods is the API-break that
/// follow-up work would carry.
trait UnwrapBlockingExt<T> {
    /// Unwraps the awaited `spawn_blocking` outcome.
    ///
    /// # Panics
    /// Panics when the task was cancelled, and re-raises the closure's
    /// own panic payload when the closure panicked.
    fn unwrap_blocking(self) -> T;
}
81
82impl<T> UnwrapBlockingExt<T> for Result<T, JoinError> {
83    fn unwrap_blocking(self) -> T {
84        match self {
85            Ok(v) => v,
86            Err(je) if je.is_cancelled() => {
87                panic!("spg-embedded-tokio: snapshot helper cancelled during runtime shutdown")
88            }
89            Err(je) => std::panic::resume_unwind(je.into_panic()),
90        }
91    }
92}
93
94/// Tokio-friendly handle to an embedded SPG database. Clone-cheap
95/// (`Arc` inside); every clone shares the same underlying engine.
96///
97/// v7.18 — backed by a `tokio::sync::RwLock` so writer calls
98/// serialise (engine single-writer invariant) but snapshot-only
99/// operations (`read_handle` init / refresh, which just clone
100/// the catalog trie roots) take the read lock and run
101/// concurrently with each other.
102#[derive(Debug, Clone)]
103pub struct AsyncDatabase {
104    inner: Arc<RwLock<Database>>,
105}
106
107/// v7.16.0 — Tokio-flavoured prepared-statement handle. Wraps
108/// the sync `spg_embedded::Statement` in an `Arc` so the AST is
109/// shared (not cloned) across `execute_prepared` /
110/// `query_prepared` calls, and so the handle is `Clone + Send`
111/// without copying the AST per bind. The engine's per-bind
112/// internal clone still happens — that's where placeholder
113/// substitution lands — but the spg-embedded-tokio surface
114/// avoids the second clone the naive shape would force.
115///
116/// Holding an `AsyncStatement` does NOT pin the database; drop
117/// the last `AsyncDatabase` clone and the handle stops being
118/// useful (the next `execute_prepared` call would still find a
119/// locked `Database` if any other clone is alive, but bind
120/// against a dropped database surfaces as the underlying
121/// `EngineError`).
122#[derive(Debug, Clone)]
123pub struct AsyncStatement {
124    inner: Arc<crate::Statement>,
125}
126
127/// v7.16.0 — adapter escape hatch: hand back the inner
128/// `Arc<Statement>`. Used by the `spg-sqlx` crate to plug the
129/// engine-side prepared handle into sqlx's `Statement<'q>` trait
130/// without going through another clone. Not intended for
131/// application code.
132#[doc(hidden)]
133#[must_use]
134pub fn async_statement_inner(stmt: &AsyncStatement) -> Arc<crate::Statement> {
135    Arc::clone(&stmt.inner)
136}
137
138impl AsyncDatabase {
139    /// In-memory database. No WAL, no catalog snapshot on disk.
140    /// `Clone` shares the engine; drop the last clone to release.
141    #[must_use]
142    pub fn open_in_memory() -> Self {
143        Self {
144            inner: Arc::new(RwLock::new(Database::open_in_memory())),
145        }
146    }
147
148    /// Open or create a file-backed database at `path`. The open
149    /// itself can stat the file + replay the WAL, so the call is
150    /// dispatched via `spawn_blocking` to keep the runtime
151    /// responsive. Mirrors `Database::open_path`.
152    ///
153    /// # Errors
154    /// Propagates whatever `Database::open_path` returns on the
155    /// sync path (IO errors, format errors, etc.).
156    pub async fn open_path<P: AsRef<Path>>(path: P) -> Result<Self, EngineError> {
157        let path = path.as_ref().to_path_buf();
158        let db = tokio::task::spawn_blocking(move || Database::open_path(path))
159            .await
160            .flatten_blocking()?;
161        Ok(Self {
162            inner: Arc::new(RwLock::new(db)),
163        })
164    }
165
166    /// Execute a single SQL statement.
167    ///
168    /// v7.20 P2 — group-commit: the engine mutation + WAL enqueue
169    /// run under the write lock (~1 µs), then the lock DROPS
170    /// before the fsync wait. N concurrent writers' mutations
171    /// pipeline behind each other while the WAL leader fsyncs
172    /// once for the whole batch — profile_breakdown measured
173    /// fsync at 99.2% of the durable write path, so this is
174    /// where the concurrency comes back.
175    ///
176    /// # Errors
177    /// Propagates `EngineError` unchanged from the sync engine;
178    /// a failed batch flush poisons the WAL loudly for all
179    /// waiters.
180    pub async fn execute(&self, sql: &str) -> Result<QueryResult, EngineError> {
181        let inner = Arc::clone(&self.inner);
182        let sql = sql.to_string();
183        tokio::task::spawn_blocking(move || {
184            let (result, ticket) = {
185                let mut guard = inner.blocking_write();
186                guard.execute_buffered(&sql)?
187            }; // ← write lock released here
188            if let Some(t) = ticket {
189                t.wait()?; // group-commit: shared fsync
190            }
191            Ok(result)
192        })
193        .await
194        .flatten_blocking()
195    }
196
197    /// v7.21 — run a multi-statement script with PG simple-query
198    /// semantics (one implicit transaction; see
199    /// `spg_embedded::Database::execute_script`). The write lock is
200    /// held across the WHOLE script: the engine's transaction slot
201    /// is shared, so releasing the lock mid-script would let another
202    /// writer's statements join the script's implicit transaction.
203    ///
204    /// # Errors
205    /// Propagates the first failing statement's `EngineError` after
206    /// the implicit rollback.
207    pub async fn execute_script(&self, sql: &str) -> Result<Vec<QueryResult>, EngineError> {
208        let inner = Arc::clone(&self.inner);
209        let sql = sql.to_string();
210        tokio::task::spawn_blocking(move || {
211            let mut guard = inner.blocking_write();
212            guard.execute_script(&sql)
213        })
214        .await
215        .flatten_blocking()
216    }
217
218    /// Run a SELECT and return rows as `Vec<Vec<Value>>`. Same
219    /// dispatch shape as `execute` — lock + spawn_blocking.
220    ///
221    /// # Errors
222    /// Propagates `EngineError` from the engine.
223    pub async fn query(&self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
224        let inner = Arc::clone(&self.inner);
225        let sql = sql.to_string();
226        tokio::task::spawn_blocking(move || {
227            let mut guard = inner.blocking_write();
228            guard.query(&sql)
229        })
230        .await
231        .flatten_blocking()
232    }
233
234    /// v7.16.0 — parse + plan a SQL string once. Returns an
235    /// [`AsyncStatement`] handle that subsequent
236    /// `execute_prepared` / `query_prepared` calls can re-bind
237    /// without re-parsing. Cheap to `Clone` — the underlying AST
238    /// sits behind an `Arc`, so the same plan can drive many
239    /// concurrent bind calls.
240    ///
241    /// # Errors
242    /// Propagates `EngineError` from the underlying
243    /// `Database::prepare`.
244    pub async fn prepare(&self, sql: &str) -> Result<AsyncStatement, EngineError> {
245        let inner = Arc::clone(&self.inner);
246        let sql = sql.to_string();
247        tokio::task::spawn_blocking(move || {
248            let mut guard = inner.blocking_write();
249            guard.prepare(&sql).map(|stmt| AsyncStatement {
250                inner: Arc::new(stmt),
251            })
252        })
253        .await
254        .flatten_blocking()
255    }
256
257    /// v7.17.0 Phase 3.P0-66 — async wrapper for
258    /// [`Database::describe`]. Returns `(parameter_oids,
259    /// output_columns)` for a prepared SQL string without
260    /// executing it. Drives the spg-sqlx adapter's
261    /// `Executor::describe` so `sqlx::query!()` compile-time
262    /// validation can resolve column types.
263    ///
264    /// # Errors
265    /// Propagates `EngineError` from the prepare path
266    /// (typically `ParseError`).
267    pub async fn describe(
268        &self,
269        sql: &str,
270    ) -> Result<(Vec<u32>, Vec<spg_embedded::ColumnSchema>), EngineError> {
271        let inner = Arc::clone(&self.inner);
272        let sql = sql.to_string();
273        tokio::task::spawn_blocking(move || {
274            let mut guard = inner.blocking_write();
275            guard.describe(&sql)
276        })
277        .await
278        .flatten_blocking()
279    }
280
281    /// v7.16.0 — execute a prepared statement with bound params.
282    /// `params` is taken by value because the spawn_blocking
283    /// closure needs a `'static` capture; the cost is one
284    /// `Vec::clone`-equivalent ownership transfer, dwarfed by
285    /// the engine's per-bind work.
286    ///
287    /// # Errors
288    /// Propagates engine errors; arity mismatch surfaces as
289    /// "parameter \$N referenced but only M bound by client".
290    pub async fn execute_prepared(
291        &self,
292        stmt: &AsyncStatement,
293        params: Vec<Value>,
294    ) -> Result<QueryResult, EngineError> {
295        let inner = Arc::clone(&self.inner);
296        let stmt_inner = Arc::clone(&stmt.inner);
297        tokio::task::spawn_blocking(move || {
298            // v7.20 P2 — group-commit (see `execute`): mutation
299            // under the lock, fsync wait after release.
300            let (result, ticket) = {
301                let mut guard = inner.blocking_write();
302                guard.execute_prepared_buffered(&stmt_inner, &params)?
303            };
304            if let Some(t) = ticket {
305                t.wait()?;
306            }
307            Ok(result)
308        })
309        .await
310        .flatten_blocking()
311    }
312
313    /// v7.16.0 — run a prepared SELECT with bound params and
314    /// return rows as `Vec<Vec<Value>>`. Errors when the prepared
315    /// statement isn't a SELECT.
316    ///
317    /// # Errors
318    /// Propagates `EngineError` from the underlying
319    /// `Database::query_prepared`.
320    pub async fn query_prepared(
321        &self,
322        stmt: &AsyncStatement,
323        params: Vec<Value>,
324    ) -> Result<Vec<Vec<Value>>, EngineError> {
325        let inner = Arc::clone(&self.inner);
326        let stmt_inner = Arc::clone(&stmt.inner);
327        tokio::task::spawn_blocking(move || {
328            let mut guard = inner.blocking_write();
329            guard.query_prepared(&stmt_inner, &params)
330        })
331        .await
332        .flatten_blocking()
333    }
334
335    /// v7.16.0 — column-aware variant of `query`. Returns the
336    /// SELECT's column schema vec alongside the rows so adapters
337    /// (the spg-sqlx fetch path most notably) can drive name +
338    /// type-based column lookups.
339    ///
340    /// # Errors
341    /// Same shape as `query` — errors when the SQL isn't a SELECT
342    /// or the engine returns one.
343    pub async fn query_with_columns(
344        &self,
345        sql: &str,
346    ) -> Result<(Vec<spg_embedded::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
347        let inner = Arc::clone(&self.inner);
348        let sql = sql.to_string();
349        tokio::task::spawn_blocking(move || {
350            let mut guard = inner.blocking_write();
351            guard.query_with_columns(&sql)
352        })
353        .await
354        .flatten_blocking()
355    }
356
357    /// v7.16.0 — column-aware variant of `query_prepared`. Same
358    /// shape as `query_with_columns` but driven from a prepared
359    /// AsyncStatement + bound params.
360    ///
361    /// # Errors
362    /// Propagates `EngineError`; errors when the prepared
363    /// statement isn't a SELECT.
364    pub async fn query_prepared_with_columns(
365        &self,
366        stmt: &AsyncStatement,
367        params: Vec<Value>,
368    ) -> Result<(Vec<spg_embedded::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
369        let inner = Arc::clone(&self.inner);
370        let stmt_inner = Arc::clone(&stmt.inner);
371        tokio::task::spawn_blocking(move || {
372            let mut guard = inner.blocking_write();
373            guard.query_prepared_with_columns(&stmt_inner, &params)
374        })
375        .await
376        .flatten_blocking()
377    }
378
379    /// Run a checkpoint (flush WAL into the catalog snapshot +
380    /// truncate the WAL back to zero). Blocking work — dispatched
381    /// the same way as `execute`.
382    ///
383    /// # Errors
384    /// Propagates `EngineError` from the engine / IO layer.
385    pub async fn checkpoint(&self) -> Result<(), EngineError> {
386        let inner = Arc::clone(&self.inner);
387        tokio::task::spawn_blocking(move || {
388            let mut guard = inner.blocking_write();
389            guard.checkpoint()
390        })
391        .await
392        .flatten_blocking()
393    }
394
395    /// v7.20 P3 — inline snapshot clone for the read fan-out hot
396    /// path. Takes the async read lock (not `blocking_read` +
397    /// `spawn_blocking` — the clone is an Arc-bump of the catalog
398    /// trie roots, ~0 µs per profile_breakdown, far below tokio's
399    /// inline-work threshold). spg-sqlx's per-statement
400    /// read-committed refresh runs through here; pairing it with
401    /// `Database::{prepare,execute_prepared}_on_snapshot` keeps
402    /// the whole readonly statement on the async executor with
403    /// zero thread hops.
404    pub async fn clone_snapshot_inline(&self) -> CatalogSnapshot {
405        let guard = self.inner.read().await;
406        guard.engine().clone_snapshot()
407    }
408
409    /// v7.11.2 — fan-out reader. Clones the engine's committed
410    /// catalog under the writer lock, releases the lock, and
411    /// hands back an `AsyncReadHandle` that runs SELECTs against
412    /// the snapshot **without ever re-acquiring the writer
413    /// lock**. Multiple read handles can run concurrently — they
414    /// share nothing mutable. mailrs's IMAP fetch pattern lands
415    /// here.
416    ///
417    /// Contract: the snapshot is frozen at the moment this call
418    /// returns. Subsequent writes are NOT visible. Call
419    /// `AsyncReadHandle::refresh().await` to re-snapshot when
420    /// you need fresher data.
421    pub async fn read_handle(&self) -> AsyncReadHandle {
422        let inner = Arc::clone(&self.inner);
423        let snapshot = tokio::task::spawn_blocking(move || {
424            let guard = inner.blocking_read();
425            guard.engine().clone_snapshot()
426        })
427        .await
428        .unwrap_blocking();
429        AsyncReadHandle {
430            db: Arc::clone(&self.inner),
431            snapshot,
432        }
433    }
434}
435
436/// v7.11.2 — read-only handle backed by a frozen
437/// `CatalogSnapshot`. Multiple handles can run concurrently; they
438/// don't acquire the writer lock at query time. Refresh-on-demand
439/// — the contract is that the handle reflects committed state at
440/// the moment of construction or the last `refresh()`.
441///
442/// v7.18 — holds a reference to the underlying `AsyncDatabase`
443/// (via the shared `Arc<RwLock<Database>>`) only so `refresh()`
444/// can briefly take the read lock to clone a fresh snapshot.
445/// Read paths never touch the Database directly. Snapshot
446/// cloning is a trie-root `Arc` copy, so a busy writer barely
447/// affects refresh latency.
448#[derive(Debug)]
449pub struct AsyncReadHandle {
450    db: Arc<RwLock<Database>>,
451    snapshot: CatalogSnapshot,
452}
453
454impl AsyncReadHandle {
455    /// Run a read-only SQL statement against the frozen snapshot.
456    /// DDL / DML reject with `EngineError::WriteRequired`.
457    ///
458    /// # Errors
459    /// Propagates `EngineError` from the engine's read path.
460    pub async fn query(&self, sql: &str) -> Result<QueryResult, EngineError> {
461        let snapshot = self.snapshot.clone();
462        let sql = sql.to_string();
463        tokio::task::spawn_blocking(move || {
464            spg_engine::Engine::execute_readonly_on_snapshot(&snapshot, &sql)
465        })
466        .await
467        .flatten_blocking()
468    }
469
470    /// v7.18 — parse + plan a SQL string against this handle's
471    /// frozen snapshot. Mirror of [`AsyncDatabase::prepare`] for
472    /// the readonly fan-out path: clock rewrite + JOIN reorder +
473    /// position resolve happen against the snapshot's catalog +
474    /// statistics, no writer lock acquired. Multiple read handles
475    /// can prepare concurrently; the returned [`AsyncStatement`]
476    /// is `Clone + Send`.
477    ///
478    /// # Errors
479    /// Propagates [`EngineError`] from the parser
480    /// (`EngineError::Parse`).
481    pub async fn prepare(&self, sql: &str) -> Result<AsyncStatement, EngineError> {
482        let snapshot = self.snapshot.clone();
483        let sql = sql.to_string();
484        tokio::task::spawn_blocking(move || {
485            Database::prepare_on_snapshot(&snapshot, &sql).map(|stmt| AsyncStatement {
486                inner: Arc::new(stmt),
487            })
488        })
489        .await
490        .flatten_blocking()
491    }
492
493    /// v7.18 — execute a prepared statement against this handle's
494    /// frozen snapshot with bound params. Mirror of
495    /// [`AsyncDatabase::execute_prepared`] on the readonly path —
496    /// writes / DDL hit `EngineError::WriteRequired` so the caller
497    /// can route them to the writer mutex. No writer lock
498    /// acquired; multiple handles run truly concurrently.
499    ///
500    /// # Errors
501    /// Propagates engine errors (placeholder arity mismatch,
502    /// schema drift surfacing as catalog lookups, etc.).
503    pub async fn execute_prepared(
504        &self,
505        stmt: &AsyncStatement,
506        params: Vec<Value>,
507    ) -> Result<QueryResult, EngineError> {
508        let snapshot = self.snapshot.clone();
509        let stmt_inner = Arc::clone(&stmt.inner);
510        tokio::task::spawn_blocking(move || {
511            Database::execute_prepared_on_snapshot(&snapshot, &stmt_inner, &params)
512        })
513        .await
514        .flatten_blocking()
515    }
516
517    /// v7.18 — describe a prepared SQL string against this
518    /// handle's frozen snapshot. Returns `(parameter_oids,
519    /// output_columns)`. Drives the spg-sqlx adapter's readonly
520    /// `Executor::describe` path so `sqlx::query!()` compile-time
521    /// validation can resolve column types without touching the
522    /// writer engine.
523    ///
524    /// # Errors
525    /// Propagates [`EngineError`] from the parser
526    /// (`EngineError::Parse`).
527    pub async fn describe(
528        &self,
529        sql: &str,
530    ) -> Result<(Vec<u32>, Vec<spg_embedded::ColumnSchema>), EngineError> {
531        let snapshot = self.snapshot.clone();
532        let sql = sql.to_string();
533        tokio::task::spawn_blocking(move || Database::describe_on_snapshot(&snapshot, &sql))
534            .await
535            .flatten_blocking()
536    }
537
538    /// Re-snapshot the underlying engine. Briefly takes the
539    /// writer lock; subsequent `query()` calls see the new state.
540    /// Idempotent on a quiet engine (clones the same trie roots).
541    pub async fn refresh(&mut self) {
542        let inner = Arc::clone(&self.db);
543        let new_snapshot = tokio::task::spawn_blocking(move || {
544            let guard = inner.blocking_read();
545            guard.engine().clone_snapshot()
546        })
547        .await
548        .unwrap_blocking();
549        self.snapshot = new_snapshot;
550    }
551}