Skip to main content

spg_sqlx/
connection.rs

1//! v7.16.0 — `sqlx::Connection` for SPG.
2//!
3//! Wraps [`spg_embedded_tokio::AsyncDatabase`]. The engine is
4//! single-writer at the lock level, but read-only statements
5//! short-circuit through [`spg_embedded_tokio::AsyncReadHandle`]
6//! — each `SpgConnection` lazily attaches a read handle on first
7//! readonly statement and refreshes it per-statement so PG
8//! read-committed semantics hold (every statement sees the
9//! latest committed state). Writes / DDL / TX-control take the
10//! writer lock as before.
11//!
12//! Result: `SpgPoolOptions::new().max_connections(20)` behaves
13//! like its `PgPool` analogue — concurrent SELECTs run truly in
14//! parallel, transactions serialise. Stock sqlx code drops in
15//! unchanged.
16
17use std::sync::Arc;
18
19use futures_core::future::BoxFuture;
20use futures_core::stream::BoxStream;
21use sqlx_core::HashMap;
22use sqlx_core::connection::Connection;
23use sqlx_core::error::Error;
24use sqlx_core::executor::Executor;
25use sqlx_core::transaction::Transaction;
26
27use spg_embedded::QueryResult as EngineQueryResult;
28use spg_embedded_tokio::AsyncDatabase;
29
30use crate::column::SpgColumn;
31use crate::database::Spg;
32use crate::error::engine_to_sqlx;
33use crate::options::SpgConnectOptions;
34use crate::query_result::SpgQueryResult;
35use crate::row::SpgRow;
36use crate::type_info::SpgTypeInfo;
37
38/// v7.20 P3 — cached per-connection statement: the parse +
39/// readonly classification + prepare-time transforms run once
40/// per distinct SQL string. Repeated `query!()` call sites (the
41/// sqlx norm — fixed SQL, varying binds) hit the cache and pay
42/// zero parses.
43#[derive(Debug, Clone)]
44pub(crate) struct CachedStmt {
45    pub(crate) readonly: bool,
46    pub(crate) stmt: std::sync::Arc<spg_embedded::Statement>,
47}
48
/// Upper bound on entries in the per-connection statement cache.
///
/// Sqlx applications issue a finite set of static SQL strings, so
/// 256 sits far above any real workload. When the bound is reached
/// the cache is emptied in one go: simple and predictable, and an
/// LRU would buy nothing at this size.
const STMT_CACHE_CAP: usize = 256;
54
55/// One sqlx connection backed by an in-process SPG.
56///
57/// - `inner: AsyncDatabase` — writer path. Used for DDL / DML /
58///   transaction control and statements inside a transaction.
59/// - readonly statements run INLINE on the async executor
60///   (v7.20 P3): per-statement snapshot via
61///   `clone_snapshot_inline` (~0 µs Arc bump) + static
62///   `Database::*_on_snapshot` calls (~2 µs CPU). No
63///   spawn_blocking on the read path — profile_breakdown
64///   measured the 3× thread-hop round-trips at 15-48 µs against
65///   2 µs of actual work.
66#[derive(Debug)]
67pub struct SpgConnection {
68    pub(crate) inner: AsyncDatabase,
69    /// v7.20 P3 — per-connection statement cache.
70    pub(crate) stmt_cache: HashMap<String, CachedStmt>,
71    pub(crate) tx_depth: usize,
72    pub(crate) pending_rollback: bool,
73}
74
75impl Clone for SpgConnection {
76    fn clone(&self) -> Self {
77        // Fresh cache per clone — Statements are cheap to
78        // rebuild and the cache is an optimisation, not state.
79        Self {
80            inner: self.inner.clone(),
81            stmt_cache: HashMap::new(),
82            tx_depth: self.tx_depth,
83            pending_rollback: self.pending_rollback,
84        }
85    }
86}
87
88impl SpgConnection {
89    /// Build a connection from a ready `AsyncDatabase`. Called
90    /// internally by [`SpgConnectOptions::connect`] and by
91    /// [`crate::SpgPool::connect_in_memory`].
92    pub fn new(inner: AsyncDatabase) -> Self {
93        Self {
94            inner,
95            stmt_cache: HashMap::new(),
96            tx_depth: 0,
97            pending_rollback: false,
98        }
99    }
100
101    /// Borrow the underlying `AsyncDatabase`. Lets advanced
102    /// callers reach for the spg-embedded API directly.
103    #[must_use]
104    pub const fn engine(&self) -> &AsyncDatabase {
105        &self.inner
106    }
107
108    /// v7.20 P3 — look up (or build + cache) the parsed
109    /// statement and readonly classification for `sql`. One
110    /// parse per distinct SQL string per connection.
111    pub(crate) async fn cached_stmt(
112        &mut self,
113        sql: &str,
114    ) -> Result<CachedStmt, spg_embedded::EngineError> {
115        if let Some(c) = self.stmt_cache.get(sql) {
116            return Ok(c.clone());
117        }
118        let readonly = spg_embedded::Engine::is_readonly_sql(sql);
119        // Build the prepared Statement against a current snapshot
120        // (prepare-time transforms read the catalog for JOIN
121        // reorder). The AST stays valid across snapshots — schema
122        // drift surfaces at execute time exactly like PG's
123        // "cached plan must not change result type".
124        let snap = self.inner.clone_snapshot_inline().await;
125        let stmt = spg_embedded::Database::prepare_on_snapshot(&snap, sql)?;
126        let cached = CachedStmt {
127            readonly,
128            stmt: std::sync::Arc::new(stmt),
129        };
130        if self.stmt_cache.len() >= STMT_CACHE_CAP {
131            self.stmt_cache.clear();
132        }
133        self.stmt_cache.insert(sql.to_string(), cached.clone());
134        Ok(cached)
135    }
136}
137
138impl Connection for SpgConnection {
139    type Database = Spg;
140    type Options = SpgConnectOptions;
141
142    fn close(self) -> BoxFuture<'static, Result<(), Error>> {
143        // In-process — dropping the last `AsyncDatabase` clone
144        // releases the engine. Nothing to send.
145        Box::pin(async move { Ok(()) })
146    }
147
148    fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
149        Box::pin(async move { Ok(()) })
150    }
151
152    fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
153        // The engine doesn't time-out; a quick no-op query
154        // exercises the lock path.
155        Box::pin(async move {
156            self.inner
157                .execute("SELECT 1")
158                .await
159                .map_err(engine_to_sqlx)?;
160            Ok(())
161        })
162    }
163
164    fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
165    where
166        Self: Sized,
167    {
168        Transaction::begin(self, None)
169    }
170
171    fn shrink_buffers(&mut self) {
172        // No-op — engine doesn't expose per-connection buffers.
173    }
174
175    fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
176        Box::pin(async move { Ok(()) })
177    }
178
179    fn should_flush(&self) -> bool {
180        false
181    }
182}
183
184// v7.16.0 — Executor on &mut SpgConnection. fetch_many returns
185// `Either<QueryResult, Row>` per sqlx-core's stream contract.
186//
187// v7.18 — fetch_many / fetch_optional take `&mut SpgConnection`
188// across the future (allowed by sqlx's `'c: 'e` bound) so the
189// run_one routing can lazy-init / refresh the per-connection
190// read handle without cloning state. Readonly statements
191// outside a transaction fan out through the snapshot path;
192// writer statements + everything inside BEGIN keep using the
193// writer mutex.
194impl<'c> Executor<'c> for &'c mut SpgConnection {
195    type Database = Spg;
196
197    fn fetch_many<'e, 'q: 'e, E>(
198        self,
199        mut query: E,
200    ) -> BoxStream<
201        'e,
202        Result<
203            either::Either<
204                <Self::Database as sqlx_core::database::Database>::QueryResult,
205                crate::SpgRow,
206            >,
207            Error,
208        >,
209    >
210    where
211        'c: 'e,
212        E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
213    {
214        use futures_util::stream::{self, StreamExt};
215        let sql = query.sql().to_string();
216        let arguments = match query.take_arguments() {
217            Ok(args) => args,
218            Err(e) => {
219                return Box::pin(stream::iter(std::iter::once(Err(Error::Encode(e)))));
220            }
221        };
222        let outcome_fut = async move {
223            match arguments {
224                // Bind parameters imply exactly one statement (PG
225                // rejects multi-statement extended queries too).
226                Some(args) => run_one(self, &sql, Some(args)).await.map(|o| vec![o]),
227                // No parameters = sqlx's simple-query / `raw_sql`
228                // path. PG executes every `;`-separated statement of
229                // the message server-side inside ONE implicit
230                // transaction; `Database::execute_script` owns both
231                // the splitting and the transaction semantics
232                // (mailrs embed round-12 + v7.21 polish).
233                None => Ok(self
234                    .inner
235                    .execute_script(&sql)
236                    .await
237                    .map_err(engine_to_sqlx)?
238                    .into_iter()
239                    .map(outcome_from)
240                    .collect()),
241            }
242        };
243        Box::pin(stream::once(outcome_fut).flat_map(|outcome| {
244            let items: Vec<Result<either::Either<SpgQueryResult, SpgRow>, Error>> = match outcome {
245                Ok(outcomes) => outcomes
246                    .into_iter()
247                    .flat_map(|o| match o {
248                        Outcome::Affected(qr) => vec![Ok(either::Either::Left(qr))],
249                        Outcome::Rows(rows) => rows
250                            .into_iter()
251                            .map(|r| Ok(either::Either::Right(r)))
252                            .collect::<Vec<_>>(),
253                    })
254                    .collect(),
255                Err(e) => vec![Err(e)],
256            };
257            stream::iter(items)
258        }))
259    }
260
261    fn fetch_optional<'e, 'q: 'e, E>(
262        self,
263        mut query: E,
264    ) -> BoxFuture<'e, Result<Option<crate::SpgRow>, Error>>
265    where
266        'c: 'e,
267        E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
268    {
269        let sql = query.sql().to_string();
270        let arguments = query.take_arguments();
271        Box::pin(async move {
272            let args = arguments.map_err(Error::Encode)?;
273            match run_one(self, &sql, args).await? {
274                Outcome::Rows(mut rows) => Ok(rows.drain(..).next()),
275                Outcome::Affected(_) => Ok(None),
276            }
277        })
278    }
279
280    fn prepare_with<'e, 'q: 'e>(
281        self,
282        sql: &'q str,
283        _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
284    ) -> BoxFuture<
285        'e,
286        Result<<Self::Database as sqlx_core::database::Database>::Statement<'q>, Error>,
287    >
288    where
289        'c: 'e,
290    {
291        let inner = self.inner.clone();
292        let sql_str = sql.to_string();
293        Box::pin(async move {
294            let stmt = inner.prepare(&sql_str).await.map_err(engine_to_sqlx)?;
295            // The AsyncStatement wraps the embedded::Statement
296            // in Arc — pull it out for the sqlx-side handle.
297            // We expose the underlying handle via a tiny adapter
298            // method on AsyncStatement (added on the
299            // spg-embedded-tokio side).
300            let inner_stmt = spg_embedded_tokio::async_statement_inner(&stmt);
301            Ok(crate::SpgStatement {
302                sql: std::borrow::Cow::Owned(sql_str),
303                inner: Some(inner_stmt),
304                columns: std::sync::Arc::new(Vec::new()),
305                by_name: std::sync::Arc::new(sqlx_core::HashMap::new()),
306            })
307        })
308    }
309
310    fn describe<'e, 'q: 'e>(
311        self,
312        sql: &'q str,
313    ) -> BoxFuture<'e, Result<sqlx_core::describe::Describe<Self::Database>, Error>>
314    where
315        'c: 'e,
316    {
317        // v7.17.0 Phase 3.P0-66 — real Describe wired through to
318        // `Engine::describe_prepared`. Surfaces column metadata
319        // (name / type / nullable) and a parameter count for
320        // `sqlx::query!()` compile-time validation. Statement
321        // shapes the describe planner can't resolve (JOIN /
322        // subquery / unknown table) return an empty `columns`
323        // vec — sqlx tolerates this as "no row description
324        // available" and the macros fall back to offline mode
325        // for those shapes.
326        let inner = self.inner.clone();
327        let sql_str = sql.to_string();
328        Box::pin(async move {
329            let (params, cols) = inner.describe(&sql_str).await.map_err(engine_to_sqlx)?;
330            let nullable: Vec<Option<bool>> = cols.iter().map(|c| Some(c.nullable)).collect();
331            let columns: Vec<SpgColumn> = cols
332                .iter()
333                .enumerate()
334                .map(|(i, c)| {
335                    let ti = SpgTypeInfo::from_data_type(c.ty);
336                    SpgColumn::new(i, c.name.clone(), ti)
337                })
338                .collect();
339            let parameters = if params.is_empty() {
340                None
341            } else {
342                Some(either::Either::Right(params.len()))
343            };
344            Ok(sqlx_core::describe::Describe {
345                columns,
346                parameters,
347                nullable,
348            })
349        })
350    }
351}
352
353/// Outcome of a single dispatch — either rows-affected (DML / DDL)
354/// or a row stream (SELECT). The fetch helpers below convert this
355/// to sqlx's `Either<QueryResult, Row>` stream shape.
356enum Outcome {
357    /// DML / DDL statement returning a rows-affected counter.
358    Affected(SpgQueryResult),
359    /// SELECT result — every row already converted to an
360    /// [`SpgRow`].
361    Rows(Vec<SpgRow>),
362}
363
364async fn run_one(
365    conn: &mut SpgConnection,
366    sql: &str,
367    arguments: Option<crate::SpgArguments<'_>>,
368) -> Result<Outcome, Error> {
369    // v7.18 routing + v7.20 P3 inline read path. Inside a
370    // transaction the writer lock has to stay held end-to-end so
371    // the user's isolation contract holds; we never route to the
372    // snapshot path there. Outside a transaction the cached
373    // statement's readonly flag decides. A parse error falls
374    // through to the writer path so the user sees the same parse
375    // error they'd get from a simple-query SELECT.
376    let in_tx = conn.tx_depth > 0;
377    let cached = if in_tx {
378        None
379    } else {
380        // Parse + classify once per distinct SQL per connection.
381        conn.cached_stmt(sql).await.ok()
382    };
383
384    let result: EngineQueryResult = if let Some(c) = cached.filter(|c| c.readonly) {
385        // v7.20 P3 — readonly statements run INLINE on the async
386        // executor: snapshot clone is an Arc bump (~0 µs), the
387        // prepared execute is ~2 µs CPU. No spawn_blocking, no
388        // thread hop. Per-statement snapshot = PG read-committed.
389        //
390        // v7.28 (round-22) — INLINE WITH A BUDGET. Inline is perfect
391        // at 2 µs and fatal at 60 s: four slow inbox queries
392        // saturated mailrs's entire tokio worker pool, starving
393        // every other endpoint including /logout. The inline run is
394        // bounded (SPG_SQLX_INLINE_BUDGET_MS, default 25 ms); on
395        // budget expiry the SAME snapshot + statement re-runs to
396        // completion on the blocking pool, off the async workers.
397        let snap = conn.inner.clone_snapshot_inline().await;
398        let params = arguments.map(crate::SpgArguments::into_engine_values);
399        let budget_ms: u64 = std::env::var("SPG_SQLX_INLINE_BUDGET_MS")
400            .ok()
401            .and_then(|v| v.parse().ok())
402            .unwrap_or(25);
403        let started = std::time::Instant::now();
404        let inline = spg_embedded::Database::execute_prepared_on_snapshot_with_budget(
405            &snap,
406            &c.stmt,
407            params.as_deref().unwrap_or(&[]),
408            budget_ms.saturating_mul(1_000),
409        );
410        match inline {
411            Ok(r) => r,
412            Err(spg_embedded::EngineError::Cancelled) => {
413                let stmt = c.stmt.clone();
414                let params_owned: Vec<spg_embedded::Value> =
415                    params.as_deref().unwrap_or(&[]).to_vec();
416                let result = tokio::task::spawn_blocking(move || {
417                    spg_embedded::Database::execute_prepared_on_snapshot(
418                        &snap,
419                        &stmt,
420                        &params_owned,
421                    )
422                })
423                .await
424                .map_err(|e| Error::Protocol(format!("blocking-pool join: {e}")))?
425                .map_err(engine_to_sqlx)?;
426                // v7.35.0 (mailrs ask #2, 3 reports running) — embed
427                // the total elapsed time in the budget-exceeded log
428                // line so the consumer can distinguish e.g. an 82 ms
429                // NOT-IN form from a 200 ms correlated scan. mailrs
430                // greps for "exceeded … inline budget" — the prefix
431                // is unchanged, the `elapsed_ms=N` suffix is purely
432                // additive.
433                let elapsed_ms = started.elapsed().as_millis();
434                eprintln!(
435                    "spg-sqlx: readonly query exceeded the {budget_ms} ms inline budget; \
436                     continuing on the blocking pool: elapsed_ms={elapsed_ms} sql={}",
437                    &sql[..sql.len().min(120)]
438                );
439                result
440            }
441            Err(e) => return Err(engine_to_sqlx(e)),
442        }
443    } else {
444        let db = &conn.inner;
445        if let Some(args) = arguments {
446            let stmt = db.prepare(sql).await.map_err(engine_to_sqlx)?;
447            db.execute_prepared(&stmt, args.into_engine_values())
448                .await
449                .map_err(engine_to_sqlx)?
450        } else {
451            db.execute(sql).await.map_err(engine_to_sqlx)?
452        }
453    };
454    Ok(outcome_from(result))
455}
456
457fn outcome_from(result: EngineQueryResult) -> Outcome {
458    match result {
459        EngineQueryResult::Rows { columns, rows } => {
460            let row_values: Vec<Vec<spg_embedded::Value>> =
461                rows.into_iter().map(|r| r.values).collect();
462            Outcome::Rows(build_rows(&columns, row_values))
463        }
464        EngineQueryResult::CommandOk { affected, .. } => {
465            Outcome::Affected(SpgQueryResult::new(u64::try_from(affected).unwrap_or(0)))
466        }
467        _ => Outcome::Affected(SpgQueryResult::default()),
468    }
469}
470
471#[allow(dead_code)]
472fn affected_from(qr: &EngineQueryResult) -> u64 {
473    match qr {
474        EngineQueryResult::CommandOk { affected, .. } => u64::try_from(*affected).unwrap_or(0),
475        EngineQueryResult::Rows { rows, .. } => u64::try_from(rows.len()).unwrap_or(0),
476        _ => 0,
477    }
478}
479
480fn build_rows(
481    cols: &[spg_embedded::ColumnSchema],
482    rows: Vec<Vec<spg_embedded::Value>>,
483) -> Vec<SpgRow> {
484    let columns: Arc<Vec<SpgColumn>> = Arc::new(
485        cols.iter()
486            .enumerate()
487            .map(|(i, c)| SpgColumn::new(i, c.name.clone(), SpgTypeInfo::from_data_type(c.ty)))
488            .collect(),
489    );
490    let mut by_name: HashMap<String, usize> = HashMap::new();
491    for (i, c) in cols.iter().enumerate() {
492        by_name.insert(c.name.clone(), i);
493    }
494    let by_name = Arc::new(by_name);
495    rows.into_iter()
496        .map(|values| SpgRow::new(Arc::clone(&columns), Arc::clone(&by_name), values))
497        .collect()
498}