spg_sqlx/connection.rs
1//! v7.16.0 — `sqlx::Connection` for SPG.
2//!
//! Wraps [`spg_embedded_tokio::AsyncDatabase`]. The engine is
//! single-writer at the lock level, but read-only statements
//! issued outside a transaction bypass the writer path: each
//! one takes a fresh snapshot of the database and executes
//! against it, so PG read-committed semantics hold (every
//! statement sees the latest committed state). Writes / DDL /
//! TX-control take the writer lock as before.
//!
11//!
12//! Result: `SpgPoolOptions::new().max_connections(20)` behaves
13//! like its `PgPool` analogue — concurrent SELECTs run truly in
14//! parallel, transactions serialise. Stock sqlx code drops in
15//! unchanged.
16
17use std::sync::Arc;
18
19use futures_core::future::BoxFuture;
20use futures_core::stream::BoxStream;
21use sqlx_core::HashMap;
22use sqlx_core::connection::Connection;
23use sqlx_core::error::Error;
24use sqlx_core::executor::Executor;
25use sqlx_core::transaction::Transaction;
26
27use spg_embedded::QueryResult as EngineQueryResult;
28use spg_embedded_tokio::AsyncDatabase;
29
30use crate::column::SpgColumn;
31use crate::database::Spg;
32use crate::error::engine_to_sqlx;
33use crate::options::SpgConnectOptions;
34use crate::query_result::SpgQueryResult;
35use crate::row::SpgRow;
36use crate::type_info::SpgTypeInfo;
37
38/// v7.20 P3 — cached per-connection statement: the parse +
39/// readonly classification + prepare-time transforms run once
40/// per distinct SQL string. Repeated `query!()` call sites (the
41/// sqlx norm — fixed SQL, varying binds) hit the cache and pay
42/// zero parses.
43#[derive(Debug, Clone)]
44pub(crate) struct CachedStmt {
45 pub(crate) readonly: bool,
46 pub(crate) stmt: std::sync::Arc<spg_embedded::Statement>,
47}
48
/// Maximum number of entries kept in a connection's statement cache.
///
/// Once the cache holds this many entries, the next insertion empties
/// it completely before adding the new one. The set of static SQL
/// strings in an sqlx application is expected to stay far below this
/// limit, so wholesale clearing was chosen over an LRU for being
/// simple and predictable.
const STMT_CACHE_CAP: usize = 256;
54
55/// One sqlx connection backed by an in-process SPG.
56///
57/// - `inner: AsyncDatabase` — writer path. Used for DDL / DML /
58/// transaction control and statements inside a transaction.
59/// - readonly statements run INLINE on the async executor
60/// (v7.20 P3): per-statement snapshot via
61/// `clone_snapshot_inline` (~0 µs Arc bump) + static
62/// `Database::*_on_snapshot` calls (~2 µs CPU). No
63/// spawn_blocking on the read path — profile_breakdown
64/// measured the 3× thread-hop round-trips at 15-48 µs against
65/// 2 µs of actual work.
66#[derive(Debug)]
67pub struct SpgConnection {
68 pub(crate) inner: AsyncDatabase,
69 /// v7.20 P3 — per-connection statement cache.
70 pub(crate) stmt_cache: HashMap<String, CachedStmt>,
71 pub(crate) tx_depth: usize,
72 pub(crate) pending_rollback: bool,
73}
74
75impl Clone for SpgConnection {
76 fn clone(&self) -> Self {
77 // Fresh cache per clone — Statements are cheap to
78 // rebuild and the cache is an optimisation, not state.
79 Self {
80 inner: self.inner.clone(),
81 stmt_cache: HashMap::new(),
82 tx_depth: self.tx_depth,
83 pending_rollback: self.pending_rollback,
84 }
85 }
86}
87
88impl SpgConnection {
89 /// Build a connection from a ready `AsyncDatabase`. Called
90 /// internally by [`SpgConnectOptions::connect`] and by
91 /// [`crate::SpgPool::connect_in_memory`].
92 pub fn new(inner: AsyncDatabase) -> Self {
93 Self {
94 inner,
95 stmt_cache: HashMap::new(),
96 tx_depth: 0,
97 pending_rollback: false,
98 }
99 }
100
101 /// Borrow the underlying `AsyncDatabase`. Lets advanced
102 /// callers reach for the spg-embedded API directly.
103 #[must_use]
104 pub const fn engine(&self) -> &AsyncDatabase {
105 &self.inner
106 }
107
108 /// v7.20 P3 — look up (or build + cache) the parsed
109 /// statement and readonly classification for `sql`. One
110 /// parse per distinct SQL string per connection.
111 pub(crate) async fn cached_stmt(
112 &mut self,
113 sql: &str,
114 ) -> Result<CachedStmt, spg_embedded::EngineError> {
115 if let Some(c) = self.stmt_cache.get(sql) {
116 return Ok(c.clone());
117 }
118 let readonly = spg_embedded::Engine::is_readonly_sql(sql);
119 // Build the prepared Statement against a current snapshot
120 // (prepare-time transforms read the catalog for JOIN
121 // reorder). The AST stays valid across snapshots — schema
122 // drift surfaces at execute time exactly like PG's
123 // "cached plan must not change result type".
124 let snap = self.inner.clone_snapshot_inline().await;
125 let stmt = spg_embedded::Database::prepare_on_snapshot(&snap, sql)?;
126 let cached = CachedStmt {
127 readonly,
128 stmt: std::sync::Arc::new(stmt),
129 };
130 if self.stmt_cache.len() >= STMT_CACHE_CAP {
131 self.stmt_cache.clear();
132 }
133 self.stmt_cache.insert(sql.to_string(), cached.clone());
134 Ok(cached)
135 }
136}
137
138impl Connection for SpgConnection {
139 type Database = Spg;
140 type Options = SpgConnectOptions;
141
142 fn close(self) -> BoxFuture<'static, Result<(), Error>> {
143 // In-process — dropping the last `AsyncDatabase` clone
144 // releases the engine. Nothing to send.
145 Box::pin(async move { Ok(()) })
146 }
147
148 fn close_hard(self) -> BoxFuture<'static, Result<(), Error>> {
149 Box::pin(async move { Ok(()) })
150 }
151
152 fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
153 // The engine doesn't time-out; a quick no-op query
154 // exercises the lock path.
155 Box::pin(async move {
156 self.inner
157 .execute("SELECT 1")
158 .await
159 .map_err(engine_to_sqlx)?;
160 Ok(())
161 })
162 }
163
164 fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
165 where
166 Self: Sized,
167 {
168 Transaction::begin(self, None)
169 }
170
171 fn shrink_buffers(&mut self) {
172 // No-op — engine doesn't expose per-connection buffers.
173 }
174
175 fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
176 Box::pin(async move { Ok(()) })
177 }
178
179 fn should_flush(&self) -> bool {
180 false
181 }
182}
183
184// v7.16.0 — Executor on &mut SpgConnection. fetch_many returns
185// `Either<QueryResult, Row>` per sqlx-core's stream contract.
186//
187// v7.18 — fetch_many / fetch_optional take `&mut SpgConnection`
188// across the future (allowed by sqlx's `'c: 'e` bound) so the
189// run_one routing can lazy-init / refresh the per-connection
190// read handle without cloning state. Readonly statements
191// outside a transaction fan out through the snapshot path;
192// writer statements + everything inside BEGIN keep using the
193// writer mutex.
194impl<'c> Executor<'c> for &'c mut SpgConnection {
195 type Database = Spg;
196
197 fn fetch_many<'e, 'q: 'e, E>(
198 self,
199 mut query: E,
200 ) -> BoxStream<
201 'e,
202 Result<
203 either::Either<
204 <Self::Database as sqlx_core::database::Database>::QueryResult,
205 crate::SpgRow,
206 >,
207 Error,
208 >,
209 >
210 where
211 'c: 'e,
212 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
213 {
214 use futures_util::stream::{self, StreamExt};
215 let sql = query.sql().to_string();
216 let arguments = match query.take_arguments() {
217 Ok(args) => args,
218 Err(e) => {
219 return Box::pin(stream::iter(std::iter::once(Err(Error::Encode(e)))));
220 }
221 };
222 let outcome_fut = async move {
223 match arguments {
224 // Bind parameters imply exactly one statement (PG
225 // rejects multi-statement extended queries too).
226 Some(args) => run_one(self, &sql, Some(args)).await.map(|o| vec![o]),
227 // No parameters = sqlx's simple-query / `raw_sql`
228 // path. PG executes every `;`-separated statement of
229 // the message server-side inside ONE implicit
230 // transaction; `Database::execute_script` owns both
231 // the splitting and the transaction semantics
232 // (mailrs embed round-12 + v7.21 polish).
233 None => Ok(self
234 .inner
235 .execute_script(&sql)
236 .await
237 .map_err(engine_to_sqlx)?
238 .into_iter()
239 .map(outcome_from)
240 .collect()),
241 }
242 };
243 Box::pin(stream::once(outcome_fut).flat_map(|outcome| {
244 let items: Vec<Result<either::Either<SpgQueryResult, SpgRow>, Error>> = match outcome {
245 Ok(outcomes) => outcomes
246 .into_iter()
247 .flat_map(|o| match o {
248 Outcome::Affected(qr) => vec![Ok(either::Either::Left(qr))],
249 Outcome::Rows(rows) => rows
250 .into_iter()
251 .map(|r| Ok(either::Either::Right(r)))
252 .collect::<Vec<_>>(),
253 })
254 .collect(),
255 Err(e) => vec![Err(e)],
256 };
257 stream::iter(items)
258 }))
259 }
260
261 fn fetch_optional<'e, 'q: 'e, E>(
262 self,
263 mut query: E,
264 ) -> BoxFuture<'e, Result<Option<crate::SpgRow>, Error>>
265 where
266 'c: 'e,
267 E: 'q + sqlx_core::executor::Execute<'q, Self::Database>,
268 {
269 let sql = query.sql().to_string();
270 let arguments = query.take_arguments();
271 Box::pin(async move {
272 let args = arguments.map_err(Error::Encode)?;
273 match run_one(self, &sql, args).await? {
274 Outcome::Rows(mut rows) => Ok(rows.drain(..).next()),
275 Outcome::Affected(_) => Ok(None),
276 }
277 })
278 }
279
280 fn prepare_with<'e, 'q: 'e>(
281 self,
282 sql: &'q str,
283 _parameters: &'e [<Self::Database as sqlx_core::database::Database>::TypeInfo],
284 ) -> BoxFuture<
285 'e,
286 Result<<Self::Database as sqlx_core::database::Database>::Statement<'q>, Error>,
287 >
288 where
289 'c: 'e,
290 {
291 let inner = self.inner.clone();
292 let sql_str = sql.to_string();
293 Box::pin(async move {
294 let stmt = inner.prepare(&sql_str).await.map_err(engine_to_sqlx)?;
295 // The AsyncStatement wraps the embedded::Statement
296 // in Arc — pull it out for the sqlx-side handle.
297 // We expose the underlying handle via a tiny adapter
298 // method on AsyncStatement (added on the
299 // spg-embedded-tokio side).
300 let inner_stmt = spg_embedded_tokio::async_statement_inner(&stmt);
301 Ok(crate::SpgStatement {
302 sql: std::borrow::Cow::Owned(sql_str),
303 inner: Some(inner_stmt),
304 columns: std::sync::Arc::new(Vec::new()),
305 by_name: std::sync::Arc::new(sqlx_core::HashMap::new()),
306 })
307 })
308 }
309
310 fn describe<'e, 'q: 'e>(
311 self,
312 sql: &'q str,
313 ) -> BoxFuture<'e, Result<sqlx_core::describe::Describe<Self::Database>, Error>>
314 where
315 'c: 'e,
316 {
317 // v7.17.0 Phase 3.P0-66 — real Describe wired through to
318 // `Engine::describe_prepared`. Surfaces column metadata
319 // (name / type / nullable) and a parameter count for
320 // `sqlx::query!()` compile-time validation. Statement
321 // shapes the describe planner can't resolve (JOIN /
322 // subquery / unknown table) return an empty `columns`
323 // vec — sqlx tolerates this as "no row description
324 // available" and the macros fall back to offline mode
325 // for those shapes.
326 let inner = self.inner.clone();
327 let sql_str = sql.to_string();
328 Box::pin(async move {
329 let (params, cols) = inner.describe(&sql_str).await.map_err(engine_to_sqlx)?;
330 let nullable: Vec<Option<bool>> = cols.iter().map(|c| Some(c.nullable)).collect();
331 let columns: Vec<SpgColumn> = cols
332 .iter()
333 .enumerate()
334 .map(|(i, c)| {
335 let ti = SpgTypeInfo::from_data_type(c.ty);
336 SpgColumn::new(i, c.name.clone(), ti)
337 })
338 .collect();
339 let parameters = if params.is_empty() {
340 None
341 } else {
342 Some(either::Either::Right(params.len()))
343 };
344 Ok(sqlx_core::describe::Describe {
345 columns,
346 parameters,
347 nullable,
348 })
349 })
350 }
351}
352
353/// Outcome of a single dispatch — either rows-affected (DML / DDL)
354/// or a row stream (SELECT). The fetch helpers below convert this
355/// to sqlx's `Either<QueryResult, Row>` stream shape.
356enum Outcome {
357 /// DML / DDL statement returning a rows-affected counter.
358 Affected(SpgQueryResult),
359 /// SELECT result — every row already converted to an
360 /// [`SpgRow`].
361 Rows(Vec<SpgRow>),
362}
363
364async fn run_one(
365 conn: &mut SpgConnection,
366 sql: &str,
367 arguments: Option<crate::SpgArguments<'_>>,
368) -> Result<Outcome, Error> {
369 // v7.18 routing + v7.20 P3 inline read path. Inside a
370 // transaction the writer lock has to stay held end-to-end so
371 // the user's isolation contract holds; we never route to the
372 // snapshot path there. Outside a transaction the cached
373 // statement's readonly flag decides. A parse error falls
374 // through to the writer path so the user sees the same parse
375 // error they'd get from a simple-query SELECT.
376 let in_tx = conn.tx_depth > 0;
377 let cached = if in_tx {
378 None
379 } else {
380 // Parse + classify once per distinct SQL per connection.
381 conn.cached_stmt(sql).await.ok()
382 };
383
384 let result: EngineQueryResult = if let Some(c) = cached.filter(|c| c.readonly) {
385 // v7.20 P3 — readonly statements run INLINE on the async
386 // executor: snapshot clone is an Arc bump (~0 µs), the
387 // prepared execute is ~2 µs CPU. No spawn_blocking, no
388 // thread hop. Per-statement snapshot = PG read-committed.
389 //
390 // v7.28 (round-22) — INLINE WITH A BUDGET. Inline is perfect
391 // at 2 µs and fatal at 60 s: four slow inbox queries
392 // saturated mailrs's entire tokio worker pool, starving
393 // every other endpoint including /logout. The inline run is
394 // bounded (SPG_SQLX_INLINE_BUDGET_MS, default 25 ms); on
395 // budget expiry the SAME snapshot + statement re-runs to
396 // completion on the blocking pool, off the async workers.
397 let snap = conn.inner.clone_snapshot_inline().await;
398 let params = arguments.map(crate::SpgArguments::into_engine_values);
399 // v7.37.3 (mailrs prod /api/contacts 3.21× wall-clock gap) —
400 // default inline budget raised from 25 ms → 1000 ms. The
401 // v7.28 25 ms default was set to catch a runaway 60 s query
402 // family that saturated mailrs's tokio worker pool; the
403 // side-effect was that any query > 25 ms (including every
404 // inbox-listing-shape query: contacts, /api/conversations,
405 // /api/mail/stats — all 30-400 ms range in prod) burned the
406 // 25 ms inline budget, then cancelled, then re-executed
407 // from scratch on `tokio::spawn_blocking`. That's a
408 // structural double-execute the spg-sqlx adapter ate on
409 // every slow query. 1000 ms covers mailrs's reported worst
410 // p99 (~ 374 ms contacts) with 2.5× headroom and still
411 // catches the genuine catastrophic 10s+ runaway shapes the
412 // v7.28 protection was originally aimed at. SPG-side
413 // env-only knob — clients keep writing `sqlx::query!()`
414 // exactly the same as against `sqlx-postgres`.
415 let budget_ms: u64 = std::env::var("SPG_SQLX_INLINE_BUDGET_MS")
416 .ok()
417 .and_then(|v| v.parse().ok())
418 .unwrap_or(1000);
419 let started = std::time::Instant::now();
420 let inline = spg_embedded::Database::execute_prepared_on_snapshot_with_budget(
421 &snap,
422 &c.stmt,
423 params.as_deref().unwrap_or(&[]),
424 budget_ms.saturating_mul(1_000),
425 );
426 match inline {
427 Ok(r) => r,
428 Err(spg_embedded::EngineError::Cancelled) => {
429 let stmt = c.stmt.clone();
430 let params_owned: Vec<spg_embedded::Value> =
431 params.as_deref().unwrap_or(&[]).to_vec();
432 let result = tokio::task::spawn_blocking(move || {
433 spg_embedded::Database::execute_prepared_on_snapshot(
434 &snap,
435 &stmt,
436 ¶ms_owned,
437 )
438 })
439 .await
440 .map_err(|e| Error::Protocol(format!("blocking-pool join: {e}")))?
441 .map_err(engine_to_sqlx)?;
442 // v7.35.0 (mailrs ask #2, 3 reports running) — embed
443 // the total elapsed time in the budget-exceeded log
444 // line so the consumer can distinguish e.g. an 82 ms
445 // NOT-IN form from a 200 ms correlated scan. mailrs
446 // greps for "exceeded … inline budget" — the prefix
447 // is unchanged, the `elapsed_ms=N` suffix is purely
448 // additive.
449 let elapsed_ms = started.elapsed().as_millis();
450 eprintln!(
451 "spg-sqlx: readonly query exceeded the {budget_ms} ms inline budget; \
452 continuing on the blocking pool: elapsed_ms={elapsed_ms} sql={}",
453 &sql[..sql.len().min(120)]
454 );
455 result
456 }
457 Err(e) => return Err(engine_to_sqlx(e)),
458 }
459 } else {
460 let db = &conn.inner;
461 if let Some(args) = arguments {
462 let stmt = db.prepare(sql).await.map_err(engine_to_sqlx)?;
463 db.execute_prepared(&stmt, args.into_engine_values())
464 .await
465 .map_err(engine_to_sqlx)?
466 } else {
467 db.execute(sql).await.map_err(engine_to_sqlx)?
468 }
469 };
470 Ok(outcome_from(result))
471}
472
473fn outcome_from(result: EngineQueryResult) -> Outcome {
474 match result {
475 EngineQueryResult::Rows { columns, rows } => {
476 let row_values: Vec<Vec<spg_embedded::Value>> =
477 rows.into_iter().map(|r| r.values).collect();
478 Outcome::Rows(build_rows(&columns, row_values))
479 }
480 EngineQueryResult::CommandOk { affected, .. } => {
481 Outcome::Affected(SpgQueryResult::new(u64::try_from(affected).unwrap_or(0)))
482 }
483 _ => Outcome::Affected(SpgQueryResult::default()),
484 }
485}
486
487#[allow(dead_code)]
488fn affected_from(qr: &EngineQueryResult) -> u64 {
489 match qr {
490 EngineQueryResult::CommandOk { affected, .. } => u64::try_from(*affected).unwrap_or(0),
491 EngineQueryResult::Rows { rows, .. } => u64::try_from(rows.len()).unwrap_or(0),
492 _ => 0,
493 }
494}
495
496fn build_rows(
497 cols: &[spg_embedded::ColumnSchema],
498 rows: Vec<Vec<spg_embedded::Value>>,
499) -> Vec<SpgRow> {
500 let columns: Arc<Vec<SpgColumn>> = Arc::new(
501 cols.iter()
502 .enumerate()
503 .map(|(i, c)| SpgColumn::new(i, c.name.clone(), SpgTypeInfo::from_data_type(c.ty)))
504 .collect(),
505 );
506 let mut by_name: HashMap<String, usize> = HashMap::new();
507 for (i, c) in cols.iter().enumerate() {
508 by_name.insert(c.name.clone(), i);
509 }
510 let by_name = Arc::new(by_name);
511 rows.into_iter()
512 .map(|values| SpgRow::new(Arc::clone(&columns), Arc::clone(&by_name), values))
513 .collect()
514}