spg-embedded-tokio 7.35.0

Tokio-friendly async wrapper around spg-embedded. Single-writer Database serialised via tokio::sync::RwLock (writers take the write lock; snapshot reads take the read lock); blocking engine calls dispatched via spawn_blocking so the runtime never stalls on WAL fsync.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
//! Tokio-friendly async wrapper around `spg-embedded`.
//!
//! # Why this crate exists
//!
//! `spg-embedded`'s `Database::execute(&mut self, sql)` is sync
//! and may block on WAL fsync or cold-tier I/O. Calling it from
//! inside a `tokio::main` runtime triggers the `block_in_place`
//! warning and ties up a worker thread until the call returns.
//! mailrs's cement (entirely tokio-based) is the load-bearing
//! consumer that surfaced this.
//!
//! v7.18 — [`AsyncDatabase`] holds a `tokio::sync::RwLock<Database>`
//! (upgraded from Mutex). Writer calls take the write lock —
//! the engine is still single-writer, that invariant hasn't
//! changed. Snapshot-taking (`read_handle` init / refresh) only
//! needs read access to `clone_snapshot`, so it takes the read
//! lock and concurrent snapshot refreshes do not serialise.
//! `spawn_blocking` insulates the runtime's worker pool from
//! disk stalls the same way it did under Mutex.
//!
//! # Why a separate crate
//!
//! `spg-embedded` keeps the workspace's "0 external dependencies"
//! policy. `tokio` is the largest external dep we'd ever pull,
//! and gating it behind a Cargo feature flag still surfaces
//! `tokio` in downstream consumers' `Cargo.lock`. A separate
//! adapter crate is the clean answer: anyone who wants the
//! tokio shape opts in by adding `spg-embedded-tokio`; everyone
//! else stays untouched.

#![deny(missing_debug_implementations)]

use std::path::Path;
use std::sync::Arc;

pub use spg_embedded::{
    ColumnSchema, DataType, Database, EngineError, ParsedStatement, QueryResult, Statement, Value,
};
pub use spg_engine::CatalogSnapshot;

use tokio::sync::RwLock;
use tokio::task::JoinError;

/// Collapses the two error layers produced by awaiting a
/// `spawn_blocking` handle whose closure returns
/// `Result<T, EngineError>`.
///
/// v7.34.1 (mailrs prod report bug B): this replaced the previous
/// `.expect("spawn_blocking join")` shape, which panicked on
/// `JoinError::Cancelled` during runtime shutdown — a SIGKILL with any
/// readonly call in flight reliably reproduced it. Cancellation is the
/// expected state while the tokio runtime is being dropped, so it is
/// mapped to `EngineError::Cancelled` and the caller's `?` propagates a
/// clean "shutting down" error instead of a panic. A real panic inside
/// the blocking closure still surfaces — `resume_unwind` re-throws the
/// original payload so backtraces and any `catch_unwind` machinery keep
/// their semantics.
trait FlattenBlockingExt<T> {
    /// Returns the closure's own `Result` when the task joined cleanly,
    /// `Err(EngineError::Cancelled)` when the task was cancelled, and
    /// re-raises the original panic payload otherwise.
    fn flatten_blocking(self) -> Result<T, EngineError>;
}

impl<T> FlattenBlockingExt<T> for Result<Result<T, EngineError>, JoinError> {
    fn flatten_blocking(self) -> Result<T, EngineError> {
        // `Ok(inner)` passes straight through; only the join-error
        // side needs translating.
        self.unwrap_or_else(|join_err| {
            if join_err.is_cancelled() {
                Err(EngineError::Cancelled)
            } else {
                // Every non-cancellation join error is treated as a
                // panic and re-raised with its original payload.
                std::panic::resume_unwind(join_err.into_panic())
            }
        })
    }
}

/// Counterpart of `FlattenBlockingExt` for `spawn_blocking` closures
/// that return a bare `T` rather than a `Result`. Used by
/// `read_handle` / `refresh`, whose historical signature is `-> T`.
///
/// Because there is no error channel to report through, cancellation
/// surfaces as a panic — with an honest message rather than the
/// misleading "spawn_blocking join" string the old `expect` produced.
/// A Result-returning rework of those two methods is the API break
/// that follow-up work would carry.
trait UnwrapBlockingExt<T> {
    /// Returns the joined value; panics if the task was cancelled and
    /// re-raises the original payload if the task itself panicked.
    fn unwrap_blocking(self) -> T;
}

impl<T> UnwrapBlockingExt<T> for Result<T, JoinError> {
    fn unwrap_blocking(self) -> T {
        self.unwrap_or_else(|join_err| {
            if join_err.is_cancelled() {
                panic!("spg-embedded-tokio: snapshot helper cancelled during runtime shutdown");
            }
            // Not a cancellation: forward the blocking closure's panic.
            std::panic::resume_unwind(join_err.into_panic())
        })
    }
}

/// Tokio-friendly handle to an embedded SPG database. Clone-cheap
/// (`Arc` inside); every clone shares the same underlying engine.
///
/// v7.18 — backed by a `tokio::sync::RwLock`. Writer-path methods in
/// this file (`execute`, `query`, `prepare`, `checkpoint`, ...) take
/// the write lock, so they serialise (engine single-writer
/// invariant). Snapshot-only operations (`read_handle` init /
/// `refresh` / `clone_snapshot_inline`, which just clone the catalog
/// trie roots) take the read lock and run concurrently with each
/// other.
#[derive(Debug, Clone)]
pub struct AsyncDatabase {
    // Shared, lock-protected engine; cloning the handle only clones
    // this `Arc`.
    inner: Arc<RwLock<Database>>,
}

/// v7.16.0 — Tokio-flavoured prepared-statement handle. Wraps
/// the sync `spg_embedded::Statement` in an `Arc` so the AST is
/// shared (not cloned) across `execute_prepared` /
/// `query_prepared` calls, and so the handle is `Clone + Send`
/// without copying the AST per bind. The engine's per-bind
/// internal clone still happens — that's where placeholder
/// substitution lands — but the spg-embedded-tokio surface
/// avoids the second clone the naive shape would force.
///
/// Holding an `AsyncStatement` does NOT pin the database: the
/// handle stores only the shared `Statement` and no reference to
/// any `AsyncDatabase`. Once the last `AsyncDatabase` clone is
/// dropped the handle stops being useful; while any clone is
/// alive, `execute_prepared` still goes through that clone's
/// lock as usual.
/// NOTE(review): the earlier wording said a bind against a dropped
/// database "surfaces as the underlying `EngineError`" — that path
/// is not visible in this file; confirm against `spg-embedded`.
#[derive(Debug, Clone)]
pub struct AsyncStatement {
    // Shared prepared statement; cloned by refcount for each bind call.
    inner: Arc<crate::Statement>,
}

/// v7.16.0 — adapter escape hatch: hand back the inner
/// `Arc<Statement>`. Used by the `spg-sqlx` crate to plug the
/// engine-side prepared handle into sqlx's `Statement<'q>` trait
/// without going through another clone. Not intended for
/// application code.
#[doc(hidden)]
#[must_use]
pub fn async_statement_inner(stmt: &AsyncStatement) -> Arc<crate::Statement> {
    Arc::clone(&stmt.inner)
}

impl AsyncDatabase {
    /// Creates a handle over a fresh in-memory database. No WAL, no
    /// catalog snapshot on disk. Clones of the returned handle share
    /// the same engine; drop the last clone to release it.
    #[must_use]
    pub fn open_in_memory() -> Self {
        let engine = Database::open_in_memory();
        let inner = Arc::new(RwLock::new(engine));
        Self { inner }
    }

    /// Opens (or creates) the file-backed database at `path`.
    /// Mirrors `Database::open_path`, but the open — which can stat
    /// the file and replay the WAL — runs inside `spawn_blocking`
    /// so the runtime stays responsive.
    ///
    /// # Errors
    /// Returns whatever `Database::open_path` reports on the sync
    /// path (IO errors, format errors, etc.), or
    /// `EngineError::Cancelled` if the blocking task is cancelled.
    pub async fn open_path<P: AsRef<Path>>(path: P) -> Result<Self, EngineError> {
        // Own the path so the `'static` blocking closure can capture it.
        let owned_path = path.as_ref().to_path_buf();
        let joined = tokio::task::spawn_blocking(move || Database::open_path(owned_path)).await;
        joined.flatten_blocking().map(|db| Self {
            inner: Arc::new(RwLock::new(db)),
        })
    }

    /// Runs a single SQL statement on the writer path.
    ///
    /// v7.20 P2 — group-commit: the engine mutation and WAL enqueue
    /// (`execute_buffered`) run under the write lock (~1 µs), and
    /// the lock is released BEFORE the fsync wait. N concurrent
    /// writers' mutations pipeline behind each other while the WAL
    /// leader fsyncs once for the whole batch — profile_breakdown
    /// measured fsync at 99.2% of the durable write path, so this
    /// is where the concurrency comes back.
    ///
    /// # Errors
    /// Propagates `EngineError` unchanged from the sync engine; a
    /// failed batch flush poisons the WAL loudly for all waiters.
    /// Returns `EngineError::Cancelled` if the blocking task is
    /// cancelled.
    pub async fn execute(&self, sql: &str) -> Result<QueryResult, EngineError> {
        let db = Arc::clone(&self.inner);
        let statement = sql.to_owned();
        let job = move || -> Result<QueryResult, EngineError> {
            let mut writer = db.blocking_write();
            let (outcome, fsync_ticket) = writer.execute_buffered(&statement)?;
            // Give the write lock back before waiting on durability so
            // other writers can enqueue behind this one.
            drop(writer);
            if let Some(ticket) = fsync_ticket {
                ticket.wait()?; // group-commit: shared fsync
            }
            Ok(outcome)
        };
        tokio::task::spawn_blocking(job).await.flatten_blocking()
    }

    /// v7.21 — runs a multi-statement script with PG simple-query
    /// semantics (one implicit transaction; see
    /// `spg_embedded::Database::execute_script`).
    ///
    /// The write lock is held for the WHOLE script: the engine's
    /// transaction slot is shared, so releasing the lock mid-script
    /// would let another writer's statements join the script's
    /// implicit transaction.
    ///
    /// # Errors
    /// Propagates the first failing statement's `EngineError` after
    /// the implicit rollback, or `EngineError::Cancelled` if the
    /// blocking task is cancelled.
    pub async fn execute_script(&self, sql: &str) -> Result<Vec<QueryResult>, EngineError> {
        let db = Arc::clone(&self.inner);
        let script = sql.to_owned();
        let run_script = move || {
            // The guard lives until the closure returns, i.e. across
            // every statement of the script.
            let mut writer = db.blocking_write();
            writer.execute_script(&script)
        };
        tokio::task::spawn_blocking(run_script).await.flatten_blocking()
    }

    /// Runs a SELECT and returns its rows as `Vec<Vec<Value>>`.
    /// Dispatched like `execute`: the call takes the write lock
    /// inside a `spawn_blocking` task.
    ///
    /// # Errors
    /// Propagates `EngineError` from the engine, or
    /// `EngineError::Cancelled` if the blocking task is cancelled.
    pub async fn query(&self, sql: &str) -> Result<Vec<Vec<Value>>, EngineError> {
        let db = Arc::clone(&self.inner);
        let select = sql.to_owned();
        let run_query = move || {
            let mut writer = db.blocking_write();
            writer.query(&select)
        };
        tokio::task::spawn_blocking(run_query).await.flatten_blocking()
    }

    /// v7.16.0 — parses and plans a SQL string once, returning an
    /// [`AsyncStatement`] that later `execute_prepared` /
    /// `query_prepared` calls can re-bind without re-parsing. The
    /// handle is cheap to `Clone` — the underlying AST sits behind
    /// an `Arc`, so one plan can drive many concurrent bind calls.
    ///
    /// # Errors
    /// Propagates `EngineError` from the underlying
    /// `Database::prepare`, or `EngineError::Cancelled` if the
    /// blocking task is cancelled.
    pub async fn prepare(&self, sql: &str) -> Result<AsyncStatement, EngineError> {
        let db = Arc::clone(&self.inner);
        let text = sql.to_owned();
        let plan = move || -> Result<AsyncStatement, EngineError> {
            let mut writer = db.blocking_write();
            let parsed = writer.prepare(&text)?;
            Ok(AsyncStatement {
                inner: Arc::new(parsed),
            })
        };
        tokio::task::spawn_blocking(plan).await.flatten_blocking()
    }

    /// v7.17.0 Phase 3.P0-66 — async wrapper for
    /// [`Database::describe`]. Yields `(parameter_oids,
    /// output_columns)` for a SQL string without executing it.
    /// Backs the spg-sqlx adapter's `Executor::describe`, so
    /// `sqlx::query!()` compile-time validation can resolve
    /// column types.
    ///
    /// # Errors
    /// Propagates `EngineError` from the prepare path (typically
    /// `ParseError`), or `EngineError::Cancelled` if the blocking
    /// task is cancelled.
    pub async fn describe(
        &self,
        sql: &str,
    ) -> Result<(Vec<u32>, Vec<spg_embedded::ColumnSchema>), EngineError> {
        let db = Arc::clone(&self.inner);
        let text = sql.to_owned();
        let inspect = move || {
            let mut writer = db.blocking_write();
            writer.describe(&text)
        };
        tokio::task::spawn_blocking(inspect).await.flatten_blocking()
    }

    /// v7.16.0 — executes a prepared statement with bound params.
    /// `params` is taken by value because the `spawn_blocking`
    /// closure needs a `'static` capture; the cost is one
    /// `Vec::clone`-equivalent ownership transfer, dwarfed by the
    /// engine's per-bind work.
    ///
    /// v7.20 P2 — group-commit, same shape as `execute`: the
    /// mutation runs under the write lock and the fsync wait
    /// happens after the lock is released.
    ///
    /// # Errors
    /// Propagates engine errors; arity mismatch surfaces as
    /// "parameter \$N referenced but only M bound by client".
    /// Returns `EngineError::Cancelled` if the blocking task is
    /// cancelled.
    pub async fn execute_prepared(
        &self,
        stmt: &AsyncStatement,
        params: Vec<Value>,
    ) -> Result<QueryResult, EngineError> {
        let db = Arc::clone(&self.inner);
        let prepared = Arc::clone(&stmt.inner);
        let job = move || -> Result<QueryResult, EngineError> {
            let mut writer = db.blocking_write();
            let (outcome, fsync_ticket) = writer.execute_prepared_buffered(&prepared, &params)?;
            // Lock goes back before the durability wait (group-commit).
            drop(writer);
            if let Some(ticket) = fsync_ticket {
                ticket.wait()?;
            }
            Ok(outcome)
        };
        tokio::task::spawn_blocking(job).await.flatten_blocking()
    }

    /// v7.16.0 — runs a prepared SELECT with bound params and
    /// returns its rows as `Vec<Vec<Value>>`. Errors when the
    /// prepared statement isn't a SELECT.
    ///
    /// # Errors
    /// Propagates `EngineError` from the underlying
    /// `Database::query_prepared`, or `EngineError::Cancelled` if
    /// the blocking task is cancelled.
    pub async fn query_prepared(
        &self,
        stmt: &AsyncStatement,
        params: Vec<Value>,
    ) -> Result<Vec<Vec<Value>>, EngineError> {
        let db = Arc::clone(&self.inner);
        let prepared = Arc::clone(&stmt.inner);
        let run_query = move || {
            let mut writer = db.blocking_write();
            writer.query_prepared(&prepared, &params)
        };
        tokio::task::spawn_blocking(run_query).await.flatten_blocking()
    }

    /// v7.16.0 — column-aware variant of `query`. Returns the
    /// SELECT's column schema vec together with the rows, so
    /// adapters (most notably the spg-sqlx fetch path) can do
    /// name- and type-based column lookups.
    ///
    /// # Errors
    /// Same shape as `query` — errors when the SQL isn't a SELECT
    /// or the engine returns one. Returns `EngineError::Cancelled`
    /// if the blocking task is cancelled.
    pub async fn query_with_columns(
        &self,
        sql: &str,
    ) -> Result<(Vec<spg_embedded::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
        let db = Arc::clone(&self.inner);
        let select = sql.to_owned();
        let run_query = move || {
            let mut writer = db.blocking_write();
            writer.query_with_columns(&select)
        };
        tokio::task::spawn_blocking(run_query).await.flatten_blocking()
    }

    /// v7.16.0 — column-aware variant of `query_prepared`. Same
    /// result shape as `query_with_columns`, but driven from a
    /// prepared `AsyncStatement` plus bound params.
    ///
    /// # Errors
    /// Propagates `EngineError`; errors when the prepared
    /// statement isn't a SELECT. Returns `EngineError::Cancelled`
    /// if the blocking task is cancelled.
    pub async fn query_prepared_with_columns(
        &self,
        stmt: &AsyncStatement,
        params: Vec<Value>,
    ) -> Result<(Vec<spg_embedded::ColumnSchema>, Vec<Vec<Value>>), EngineError> {
        let db = Arc::clone(&self.inner);
        let prepared = Arc::clone(&stmt.inner);
        let run_query = move || {
            let mut writer = db.blocking_write();
            writer.query_prepared_with_columns(&prepared, &params)
        };
        tokio::task::spawn_blocking(run_query).await.flatten_blocking()
    }

    /// Runs a checkpoint (flush the WAL into the catalog snapshot,
    /// then truncate the WAL back to zero). This is blocking work,
    /// so it is dispatched the same way as `execute`: under the
    /// write lock inside `spawn_blocking`.
    ///
    /// # Errors
    /// Propagates `EngineError` from the engine / IO layer, or
    /// `EngineError::Cancelled` if the blocking task is cancelled.
    pub async fn checkpoint(&self) -> Result<(), EngineError> {
        let db = Arc::clone(&self.inner);
        let run_checkpoint = move || {
            let mut writer = db.blocking_write();
            writer.checkpoint()
        };
        tokio::task::spawn_blocking(run_checkpoint)
            .await
            .flatten_blocking()
    }

    /// v7.20 P3 — inline snapshot clone for the read fan-out hot
    /// path. Awaits the async read lock directly instead of going
    /// through `blocking_read` + `spawn_blocking`: the clone is an
    /// Arc-bump of the catalog trie roots (~0 µs per
    /// profile_breakdown), far below tokio's inline-work
    /// threshold. spg-sqlx's per-statement read-committed refresh
    /// runs through here; pairing it with
    /// `Database::{prepare,execute_prepared}_on_snapshot` keeps the
    /// whole readonly statement on the async executor with zero
    /// thread hops.
    pub async fn clone_snapshot_inline(&self) -> CatalogSnapshot {
        // The read guard is a temporary that is released once the
        // snapshot has been cloned out.
        self.inner.read().await.engine().clone_snapshot()
    }

    /// v7.11.2 — fan-out reader. Clones the engine's committed
    /// catalog under the read lock (v7.18: read lock, not the
    /// write lock), releases it, and returns an `AsyncReadHandle`
    /// that runs SELECTs against that snapshot **without ever
    /// acquiring the writer lock**. Multiple read handles can run
    /// concurrently — they share nothing mutable. mailrs's IMAP
    /// fetch pattern lands here.
    ///
    /// Contract: the snapshot is frozen at the moment this call
    /// returns. Subsequent writes are NOT visible. Call
    /// `AsyncReadHandle::refresh().await` to re-snapshot when you
    /// need fresher data.
    ///
    /// # Panics
    /// Panics if the blocking snapshot task is cancelled during
    /// runtime shutdown; a panic inside the task is re-raised.
    pub async fn read_handle(&self) -> AsyncReadHandle {
        let db = Arc::clone(&self.inner);
        let lock_for_task = Arc::clone(&db);
        let take_snapshot = move || {
            let reader = lock_for_task.blocking_read();
            reader.engine().clone_snapshot()
        };
        let snapshot = tokio::task::spawn_blocking(take_snapshot)
            .await
            .unwrap_blocking();
        AsyncReadHandle { db, snapshot }
    }
}

/// v7.11.2 — read-only handle backed by a frozen
/// `CatalogSnapshot`. Multiple handles can run concurrently; they
/// don't acquire the writer lock at query time. Refresh-on-demand
/// — the contract is that the handle reflects committed state at
/// the moment of construction or the last `refresh()`.
///
/// v7.18 — holds a reference to the underlying `AsyncDatabase`
/// (via the shared `Arc<RwLock<Database>>`) only so `refresh()`
/// can briefly take the read lock to clone a fresh snapshot.
/// Read paths never touch the Database directly. Snapshot
/// cloning is a trie-root `Arc` copy, so a busy writer barely
/// affects refresh latency.
#[derive(Debug)]
pub struct AsyncReadHandle {
    // Shared engine; used only by `refresh()` to re-clone the snapshot.
    db: Arc<RwLock<Database>>,
    // Frozen catalog that every read method on this handle runs against.
    snapshot: CatalogSnapshot,
}

impl AsyncReadHandle {
    /// Runs a read-only SQL statement against the frozen snapshot.
    /// DDL / DML are rejected with `EngineError::WriteRequired`.
    ///
    /// # Errors
    /// Propagates `EngineError` from the engine's read path, or
    /// `EngineError::Cancelled` if the blocking task is cancelled.
    pub async fn query(&self, sql: &str) -> Result<QueryResult, EngineError> {
        // The closure must be `'static`, so it gets its own copies of
        // the snapshot and the SQL text.
        let frozen = self.snapshot.clone();
        let text = sql.to_owned();
        let run = move || spg_engine::Engine::execute_readonly_on_snapshot(&frozen, &text);
        tokio::task::spawn_blocking(run).await.flatten_blocking()
    }

    /// v7.18 — parses and plans a SQL string against this handle's
    /// frozen snapshot. Mirror of [`AsyncDatabase::prepare`] for
    /// the readonly fan-out path: clock rewrite + JOIN reorder +
    /// position resolve happen against the snapshot's catalog +
    /// statistics, and no writer lock is acquired. Multiple read
    /// handles can prepare concurrently; the returned
    /// [`AsyncStatement`] is `Clone + Send`.
    ///
    /// # Errors
    /// Propagates [`EngineError`] from the parser
    /// (`EngineError::Parse`), or `EngineError::Cancelled` if the
    /// blocking task is cancelled.
    pub async fn prepare(&self, sql: &str) -> Result<AsyncStatement, EngineError> {
        let frozen = self.snapshot.clone();
        let text = sql.to_owned();
        let plan = move || -> Result<AsyncStatement, EngineError> {
            let parsed = Database::prepare_on_snapshot(&frozen, &text)?;
            Ok(AsyncStatement {
                inner: Arc::new(parsed),
            })
        };
        tokio::task::spawn_blocking(plan).await.flatten_blocking()
    }

    /// v7.18 — executes a prepared statement against this handle's
    /// frozen snapshot with bound params. Mirror of
    /// [`AsyncDatabase::execute_prepared`] on the readonly path —
    /// writes / DDL hit `EngineError::WriteRequired` so the caller
    /// can route them to the writer lock. No writer lock is
    /// acquired here; multiple handles run truly concurrently.
    ///
    /// # Errors
    /// Propagates engine errors (placeholder arity mismatch,
    /// schema drift surfacing as catalog lookups, etc.), or
    /// `EngineError::Cancelled` if the blocking task is cancelled.
    pub async fn execute_prepared(
        &self,
        stmt: &AsyncStatement,
        params: Vec<Value>,
    ) -> Result<QueryResult, EngineError> {
        let frozen = self.snapshot.clone();
        let prepared = Arc::clone(&stmt.inner);
        let run = move || Database::execute_prepared_on_snapshot(&frozen, &prepared, &params);
        tokio::task::spawn_blocking(run).await.flatten_blocking()
    }

    /// v7.18 — describes a SQL string against this handle's frozen
    /// snapshot, returning `(parameter_oids, output_columns)`.
    /// Backs the spg-sqlx adapter's readonly `Executor::describe`
    /// path so `sqlx::query!()` compile-time validation can
    /// resolve column types without touching the writer engine.
    ///
    /// # Errors
    /// Propagates [`EngineError`] from the parser
    /// (`EngineError::Parse`), or `EngineError::Cancelled` if the
    /// blocking task is cancelled.
    pub async fn describe(
        &self,
        sql: &str,
    ) -> Result<(Vec<u32>, Vec<spg_embedded::ColumnSchema>), EngineError> {
        let frozen = self.snapshot.clone();
        let text = sql.to_owned();
        let inspect = move || Database::describe_on_snapshot(&frozen, &text);
        tokio::task::spawn_blocking(inspect).await.flatten_blocking()
    }

    /// Re-snapshots the underlying engine. Briefly takes the read
    /// lock (not the write lock) to clone the current catalog;
    /// subsequent `query()` calls see the new state. Idempotent on
    /// a quiet engine (clones the same trie roots).
    ///
    /// # Panics
    /// Panics if the blocking snapshot task is cancelled during
    /// runtime shutdown; a panic inside the task is re-raised.
    pub async fn refresh(&mut self) {
        let db = Arc::clone(&self.db);
        let take_snapshot = move || {
            let reader = db.blocking_read();
            reader.engine().clone_snapshot()
        };
        // The old snapshot is replaced only after the new one has
        // been obtained.
        self.snapshot = tokio::task::spawn_blocking(take_snapshot)
            .await
            .unwrap_blocking();
    }
}