oxisql_sqlite_compat/connection.rs
1//! [`SqliteConnection`] — Limbo-backed implementation of [`oxisql_core::Connection`].
2//!
3//! # Concurrency model
4//!
5//! `limbo::Connection` is internally `Arc<Mutex<Arc<limbo_core::Connection>>>` with
6//! `unsafe impl Send + Sync`, so it is safe to clone and share across async tasks.
//! `SqliteConnection` is a thin wrapper struct that holds:
//!
//! - `conn: limbo::Connection` — the Limbo connection handle.
//! - `txn_lock: Arc<tokio::sync::Mutex<()>>` — a lock that prevents two async tasks
//!   from issuing `BEGIN` concurrently on the same logical connection. SQLite does
//!   not support nested transactions, so only one task at a time may hold a
//!   transaction.
//! - `stmt_cache: StmtCache` — the LRU cache of compiled statements, shared by all
//!   clones of the connection (see "Prepared-statement cache" below).
//! - `path: String` — the path supplied to [`Builder::new_local`], retained for
//!   diagnostics.
16//!
17//! # Affected-row count
18//!
19//! After each DML statement we call `conn.changes()` to read the row count that
20//! was committed by the most-recent write transaction. DDL statements and
21//! `BEGIN`/`COMMIT`/`ROLLBACK` leave the counter at 0, which is the correct
22//! contract per OxiSQL and `sqlite3_changes()` semantics.
23//!
24//! # Parameter binding
25//!
26//! OxiSQL passes `$1`, `$2`, … positional parameters. SQLite / Limbo expects
27//! `?` placeholders. `types::rewrite_params` performs a quote-aware
28//! translation before each statement is prepared.
29//!
30//! # Schema introspection
31//!
32//! [`Connection::tables`] queries `sqlite_master`.
33//! [`Connection::columns`] uses `PRAGMA table_info`.
34//! [`Connection::indexes`] parses `sqlite_master` DDL (PRAGMA index_list/index_info are not
35//! yet implemented in Limbo 0.0.22).
36//! [`Connection::foreign_keys`] uses `PRAGMA foreign_key_list` — the engine now
37//! surfaces FK metadata from its in-memory schema.
38//!
39//! # Transactions
40//!
41//! [`Connection::transaction`] issues `BEGIN` and returns a [`SqliteTransaction`]
42//! that wraps the same `limbo::Connection`. The transaction holds a guard on
43//! `txn_lock` so that no other task can start a concurrent `BEGIN`.
44//! Dropping `SqliteTransaction` without calling `commit` or `rollback` will
45//! execute `ROLLBACK` (best-effort, via `Drop`).
46//!
47//! # Prepared-statement cache
48//!
49//! All DML and DDL statements pass through an LRU cache keyed by the
50//! **rewritten SQL** (after `$N`→`?` translation). The cache holds up to
51//! `STMT_CACHE_CAPACITY` (128) compiled `limbo::Statement` entries per connection
52//! (shared across clones of the same connection via `Arc<StdMutex<…>>`).
53//!
54//! On a cache hit the existing `limbo::Statement` is taken out of the cache,
55//! executed via `Statement::execute()` (which calls `reset()` before binding),
56//! and returned to the cache after execution. `Statement::reset()` now also
57//! zeroes `Program::n_change` (fixed in oxisqlite-core), so cached statement
58//! reuse produces correct per-execution change counts.
59//!
60//! # ROLLBACK
61//!
62//! `SqliteTransaction::rollback()` executes the SQL string `"ROLLBACK"` against
63//! the engine, exactly mirroring how `commit()` executes `"COMMIT"`. The engine
64//! emits an `AutoCommit { auto_commit: true, rollback: true }` VDBE instruction
65//! that discards all pending changes. The `Drop` impl also fires a best-effort
66//! ROLLBACK when the transaction is dropped without an explicit `commit()` or
67//! `rollback()`.
68//!
//! # Prepared-statement reuse (via SqlitePrepared)
//!
//! [`SqlitePrepared`] stores only the original SQL text and re-runs the `$N`→`?`
//! rewrite on every call. `execute` then goes through the shared statement
//! cache described above, so the compiled `limbo::Statement` is reused across
//! calls. `query` bypasses the cache and prepares a fresh statement each time.
75
76use std::num::NonZeroUsize;
77use std::sync::{Arc, Mutex as StdMutex};
78
79use async_trait::async_trait;
80use limbo::params::Params as LimboParams;
81use limbo::Builder;
82use tokio::sync::Mutex as TokioMutex;
83
84// ── statement-cache capacity ───────────────────────────────────────────────────
85
/// Maximum number of compiled statements retained in the per-connection LRU
/// cache. Statements are keyed by their rewritten SQL (`?`-placeholder form).
///
/// Must stay non-zero: [`new_stmt_cache`] converts this value to a
/// `NonZeroUsize` and would silently fall back to a capacity of 1 if it were 0.
const STMT_CACHE_CAPACITY: usize = 128;
89
90use oxisql_core::{
91 ColumnInfo, Connection, ForeignKeyInfo, IndexInfo, OxiSqlError, PreparedStatement, Row,
92 TableInfo, TableType, ToSqlValue, Transaction, Value,
93};
94
95use crate::error::SqliteCompatError;
96use crate::types::{limbo_to_core_typed, rewrite_params, split_statements};
97
98// ── helpers ───────────────────────────────────────────────────────────────────
99
/// A per-connection LRU cache from rewritten SQL → compiled `limbo::Statement`.
///
/// Wrapped in `Arc<StdMutex<…>>` so it can be cheaply shared when the
/// `SqliteConnection` is cloned. The std `Mutex` is deliberately chosen over
/// `tokio::sync::Mutex`: the critical section is very short (single hash-lookup
/// or insertion) and never held across an `.await` point.
///
/// The cache owns the `Statement` values themselves. `exec_rewritten` removes an
/// entry while it is executing and puts it back afterwards, so an entry is
/// temporarily absent from the cache while its statement is in use.
type StmtCache = Arc<StdMutex<lru::LruCache<String, limbo::Statement>>>;
107
108/// Construct a new, empty [`StmtCache`] with [`STMT_CACHE_CAPACITY`] slots.
109fn new_stmt_cache() -> StmtCache {
110 // SAFETY: STMT_CACHE_CAPACITY is a positive compile-time constant (128).
111 // `NonZeroUsize::new` returns `None` only for 0, which this is not.
112 let cap = NonZeroUsize::new(STMT_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
113 Arc::new(StdMutex::new(lru::LruCache::new(cap)))
114}
115
116/// Execute a SQL statement that has already been rewritten to `?` placeholders.
117///
118/// On a cache miss, compiles the statement via `conn.prepare()`, inserts it into
119/// the cache, then executes it via `stmt.execute()`. On a cache hit, retrieves
120/// the cached `limbo::Statement` and calls `stmt.execute()` directly —
121/// `Statement::execute()` calls `reset()` internally, which (after the engine
122/// fix in oxisqlite-core) also clears `n_change`, making reuse correct.
123///
124/// The affected-row count is read from `conn.changes()` after execution,
125/// which reflects the count committed by the most recent write transaction on
126/// this connection. DDL and `BEGIN`/`COMMIT`/`ROLLBACK` return 0, which is the
127/// correct value per the OxiSQL contract.
128///
129/// When no `cache` is provided (e.g., in unit tests that bypass the cache) the
130/// function falls back to `conn.execute()` followed by `conn.changes()`.
131async fn exec_rewritten(
132 conn: &limbo::Connection,
133 sql: &str,
134 limbo_params: Vec<limbo::Value>,
135 cache: Option<&StmtCache>,
136) -> Result<u64, SqliteCompatError> {
137 let lp = if limbo_params.is_empty() {
138 LimboParams::None
139 } else {
140 LimboParams::Positional(limbo_params)
141 };
142
143 match cache {
144 Some(c) => {
145 // ── cache path ─────────────────────────────────────────────────────
146 //
147 // Retrieve a mutable reference to the cached statement, or compile a
148 // fresh one and insert it. We hold the lock only for the duration of
149 // the lookup/insert, not across the `.await` in `stmt.execute()`.
150 //
151 // Because `StmtCache` holds the actual `Statement` value (not a
152 // reference-counted pointer to it), we need to take ownership on a
153 // cache miss and then put it back after execution.
154
155 // Try to remove a hit from the cache so we have owned access during
156 // the async execute call.
157 let cached_stmt: Option<limbo::Statement> = c
158 .lock()
159 .map_err(|e| SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}")))?
160 .pop(sql);
161
162 let mut stmt = match cached_stmt {
163 Some(s) => s,
164 None => {
165 // Cache miss — compile a new statement.
166 conn.prepare(sql).await.map_err(SqliteCompatError::from)?
167 }
168 };
169
170 // Execute the (possibly retrieved-from-cache) statement.
171 // `Statement::execute()` calls `reset()` before binding parameters,
172 // and `reset()` now also zeroes `n_change`, so reuse is correct.
173 stmt.execute(lp).await.map_err(SqliteCompatError::from)?;
174
175 // Return the statement to the cache for future reuse.
176 c.lock()
177 .map_err(|e| SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}")))?
178 .put(sql.to_owned(), stmt);
179
180 // Read the affected-row count from the connection's native counter.
181 // DDL and TCL statements leave this at 0.
182 let n = conn
183 .changes()
184 .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
185 Ok(n.max(0) as u64)
186 }
187 None => {
188 // ── no-cache path (uncommon; bypasses the cache entirely) ──────────
189 conn.execute(sql, lp)
190 .await
191 .map_err(SqliteCompatError::from)?;
192 let n = conn
193 .changes()
194 .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
195 Ok(n.max(0) as u64)
196 }
197 }
198}
199
200/// Execute a query that has already been rewritten to `?` placeholders and
201/// collect all result rows.
202///
203/// Column declared types (e.g. `"DATE"`, `"TIMESTAMP"`, `"UUID"`) are
204/// collected from the prepared statement and forwarded to [`limbo_to_core_typed`]
205/// so that richer [`Value`] variants are produced when appropriate.
206async fn query_rewritten(
207 conn: &limbo::Connection,
208 sql: &str,
209 limbo_params: Vec<limbo::Value>,
210) -> Result<Vec<Row>, SqliteCompatError> {
211 let lp = if limbo_params.is_empty() {
212 LimboParams::None
213 } else {
214 LimboParams::Positional(limbo_params)
215 };
216
217 let mut stmt = conn.prepare(sql).await.map_err(SqliteCompatError::from)?;
218
219 // Collect column names and declared types together.
220 let col_info: Vec<(String, Option<String>)> = stmt
221 .columns()
222 .iter()
223 .map(|c| (c.name().to_owned(), c.decl_type().map(str::to_owned)))
224 .collect();
225
226 let col_names: Vec<String> = col_info.iter().map(|(name, _)| name.clone()).collect();
227
228 let mut rows_iter = stmt.query(lp).await.map_err(SqliteCompatError::from)?;
229
230 let mut rows: Vec<Row> = Vec::new();
231 while let Some(limbo_row) = rows_iter.next().await.map_err(SqliteCompatError::from)? {
232 let mut values: Vec<Value> = Vec::with_capacity(col_info.len());
233 for idx in 0..limbo_row.column_count() {
234 let raw = limbo_row.get_value(idx).map_err(SqliteCompatError::from)?;
235 let decl = col_info.get(idx).and_then(|(_, dt)| dt.as_deref());
236 values.push(limbo_to_core_typed(raw, decl)?);
237 }
238 rows.push(Row::new(col_names.clone(), values));
239 }
240 Ok(rows)
241}
242
243// ── SqliteConnection ──────────────────────────────────────────────────────────
244
/// A Limbo-backed SQLite connection implementing [`Connection`].
///
/// Create via [`SqliteConnection::open`] (file path) or
/// [`SqliteConnection::open_memory`] (`:memory:`).
///
/// # Statement cache
///
/// Each `SqliteConnection` maintains an LRU cache of compiled `limbo::Statement`
/// objects (capacity: `STMT_CACHE_CAPACITY` = 128). The cache is shared across
/// clones of the same connection (the clones share the underlying
/// `limbo::Connection`) and is updated on every DML/DDL execution. Cache hits
/// save the per-statement parse-and-compile round-trip inside Limbo.
#[derive(Clone)]
pub struct SqliteConnection {
    /// Handle to the underlying Limbo connection.
    conn: limbo::Connection,
    /// Held for the lifetime of a [`SqliteTransaction`] so that only one
    /// transaction at a time is started through this connection or its clones.
    txn_lock: Arc<TokioMutex<()>>,
    /// Compiled-statement cache, shared with clones, transactions and
    /// prepared statements created from this connection.
    stmt_cache: StmtCache,
    /// The path passed to [`SqliteConnection::open`]; exposed via `path()` and
    /// the `Debug` output.
    path: String,
}
264
265impl std::fmt::Debug for SqliteConnection {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 let cache_len = self.stmt_cache.lock().map(|g| g.len()).unwrap_or(0);
268 f.debug_struct("SqliteConnection")
269 .field("path", &self.path)
270 .field("stmt_cache_len", &cache_len)
271 .finish_non_exhaustive()
272 }
273}
274
275impl SqliteConnection {
276 /// Open a Limbo database at the given file path.
277 ///
278 /// Pass `":memory:"` for an in-memory database, or use
279 /// [`open_memory`][Self::open_memory] for clarity.
280 ///
281 /// # Errors
282 ///
283 /// Returns [`OxiSqlError`] if the file cannot be opened or created.
284 pub async fn open(path: &str) -> Result<Self, OxiSqlError> {
285 let db = Builder::new_local(path)
286 .build()
287 .await
288 .map_err(|e| OxiSqlError::Other(format!("limbo open error: {e}")))?;
289 let conn = db
290 .connect()
291 .map_err(|e| OxiSqlError::Other(format!("limbo connect error: {e}")))?;
292 Ok(Self {
293 conn,
294 txn_lock: Arc::new(TokioMutex::new(())),
295 stmt_cache: new_stmt_cache(),
296 path: path.to_owned(),
297 })
298 }
299
300 /// Open a fresh in-memory Limbo database.
301 ///
302 /// # Errors
303 ///
304 /// Returns [`OxiSqlError`] if the engine cannot be initialised.
305 pub async fn open_memory() -> Result<Self, OxiSqlError> {
306 Self::open(":memory:").await
307 }
308
309 /// Return the path this connection was opened with.
310 pub fn path(&self) -> &str {
311 &self.path
312 }
313}
314
315// ── Connection impl ───────────────────────────────────────────────────────────
316
317#[async_trait]
318impl Connection for SqliteConnection {
319 async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
320 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
321 exec_rewritten(&self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
322 .await
323 .map_err(OxiSqlError::from)
324 }
325
326 async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
327 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
328 query_rewritten(&self.conn, &rewritten, limbo_params)
329 .await
330 .map_err(OxiSqlError::from)
331 }
332
333 async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
334 // Acquire the exclusive transaction lock before issuing BEGIN.
335 // This prevents a second task from starting a concurrent transaction
336 // on the same SqliteConnection clone.
337 let guard = self.txn_lock.lock().await;
338 self.conn
339 .execute("BEGIN", LimboParams::None)
340 .await
341 .map_err(|e| OxiSqlError::Other(format!("BEGIN failed: {e}")))?;
342 Ok(Box::new(SqliteTransaction {
343 conn: self.conn.clone(),
344 // Share the connection-level stmt_cache so that DML executed inside
345 // a transaction also benefits from cached compiled statements.
346 stmt_cache: Arc::clone(&self.stmt_cache),
347 // Transfer ownership of the mutex guard into the transaction.
348 // The guard is released when SqliteTransaction is dropped.
349 _guard: guard,
350 done: false,
351 }))
352 }
353
354 async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
355 // Token-aware split: honours `;` inside string literals, quoted
356 // identifiers, block comments, and line comments.
357 let stmts = split_statements(sql);
358 let mut total = 0u64;
359 for stmt in stmts {
360 total += self.execute(stmt, &[]).await?;
361 }
362 Ok(total)
363 }
364
365 async fn ping(&self) -> Result<(), OxiSqlError> {
366 self.query("SELECT 1", &[]).await?;
367 Ok(())
368 }
369
370 async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
371 Ok(Box::new(SqlitePrepared {
372 conn: &self.conn,
373 stmt_cache: Arc::clone(&self.stmt_cache),
374 sql: sql.to_owned(),
375 }))
376 }
377
378 // ── Schema introspection ──────────────────────────────────────────────────
379
380 async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
381 let rows = self
382 .query(
383 "SELECT name, type FROM sqlite_master \
384 WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' \
385 ORDER BY name",
386 &[],
387 )
388 .await?;
389
390 let infos = rows
391 .into_iter()
392 .map(|row| {
393 let name = row
394 .get_by_index(0)
395 .and_then(|v| {
396 if let Value::Text(s) = v {
397 Some(s.clone())
398 } else {
399 None
400 }
401 })
402 .unwrap_or_default();
403 let ttype_str = row
404 .get_by_index(1)
405 .and_then(|v| {
406 if let Value::Text(s) = v {
407 Some(s.as_str())
408 } else {
409 None
410 }
411 })
412 .unwrap_or("table");
413 let table_type = match ttype_str {
414 "view" => TableType::View,
415 _ => TableType::Base,
416 };
417 TableInfo {
418 name,
419 schema: None,
420 table_type,
421 }
422 })
423 .collect();
424 Ok(infos)
425 }
426
427 async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
428 // PRAGMA table_info returns: cid, name, type, notnull, dflt_value, pk
429 let sql = format!("PRAGMA table_info(\"{table}\")");
430 let rows = self.query(&sql, &[]).await?;
431
432 let infos = rows
433 .into_iter()
434 .map(|row| {
435 // Helper: get column by index as string or empty string.
436 let text_at = |r: &Row, idx: usize| -> String {
437 r.get_by_index(idx)
438 .and_then(|v| match v {
439 Value::Text(s) => Some(s.clone()),
440 Value::I64(n) => Some(n.to_string()),
441 Value::Null => Some(String::new()),
442 _ => None,
443 })
444 .unwrap_or_default()
445 };
446 let i64_at = |r: &Row, idx: usize| -> i64 {
447 r.get_by_index(idx)
448 .and_then(|v| {
449 if let Value::I64(n) = v {
450 Some(*n)
451 } else {
452 None
453 }
454 })
455 .unwrap_or(0)
456 };
457
458 let ordinal = i64_at(&row, 0) as u32 + 1; // cid is 0-based
459 let name = text_at(&row, 1);
460 let data_type = text_at(&row, 2);
461 let notnull = i64_at(&row, 3) != 0;
462 let default_val = row.get_by_index(4).and_then(|v| match v {
463 Value::Text(s) => Some(s.clone()),
464 Value::Null => None,
465 other => Some(format!("{other:?}")),
466 });
467
468 ColumnInfo {
469 name,
470 ordinal_position: ordinal,
471 data_type,
472 nullable: !notnull,
473 default: default_val,
474 max_length: None,
475 numeric_precision: None,
476 numeric_scale: None,
477 }
478 })
479 .collect();
480 Ok(infos)
481 }
482
483 async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
484 // PRAGMA index_list and PRAGMA index_info are not yet implemented in limbo 0.0.22.
485 // Fall back to sqlite_master for index names and uniqueness, then parse
486 // the index SQL to extract column names. This is best-effort: multi-column
487 // indexes and expression indexes may not parse perfectly.
488 let sql = "SELECT name, sql FROM sqlite_master \
489 WHERE type='index' AND tbl_name=$1 AND name NOT LIKE 'sqlite_%'";
490 let rows = self.query(sql, &[&table]).await?;
491
492 let mut infos: Vec<IndexInfo> = Vec::new();
493 for row in rows {
494 let name = row
495 .get_by_index(0)
496 .and_then(|v| {
497 if let Value::Text(s) = v {
498 Some(s.clone())
499 } else {
500 None
501 }
502 })
503 .unwrap_or_default();
504 let idx_sql = row
505 .get_by_index(1)
506 .and_then(|v| {
507 if let Value::Text(s) = v {
508 Some(s.clone())
509 } else {
510 None
511 }
512 })
513 .unwrap_or_default();
514
515 // Detect UNIQUE from the CREATE INDEX / CREATE UNIQUE INDEX statement.
516 let upper = idx_sql.to_ascii_uppercase();
517 let unique = upper.contains("UNIQUE");
518
519 // Extract column list between the last `(` and `)`.
520 let columns: Vec<String> =
521 if let (Some(open), Some(close)) = (idx_sql.rfind('('), idx_sql.rfind(')')) {
522 idx_sql[open + 1..close]
523 .split(',')
524 .map(|c| c.trim().to_string())
525 .filter(|c| !c.is_empty())
526 .collect()
527 } else {
528 vec![]
529 };
530
531 infos.push(IndexInfo {
532 name,
533 columns,
534 unique,
535 primary: false,
536 });
537 }
538 Ok(infos)
539 }
540
541 async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
542 // Use PRAGMA foreign_key_list — the engine now surfaces FK metadata
543 // directly from the in-memory schema, avoiding brittle DDL text parsing.
544 let escaped = table.replace('"', "\"\"");
545 let sql = format!("PRAGMA foreign_key_list(\"{}\")", escaped);
546 let rows = query_rewritten(&self.conn, &sql, vec![])
547 .await
548 .map_err(OxiSqlError::from)?;
549
550 // PRAGMA foreign_key_list columns (by index):
551 // 0: id INTEGER — FK index within the table
552 // 1: seq INTEGER — column position within a composite FK
553 // 2: table TEXT — parent table name
554 // 3: from TEXT — child column name
555 // 4: to TEXT/NULL — parent column name (NULL = implicit PK ref)
556 // 5: on_update TEXT
557 // 6: on_delete TEXT
558 // 7: match TEXT
559 let mut infos: Vec<ForeignKeyInfo> = Vec::with_capacity(rows.len());
560 for row in &rows {
561 let id = match row.get_by_index(0) {
562 Some(Value::I64(v)) => *v,
563 _ => 0,
564 };
565 let from_col = match row.get_by_index(3) {
566 Some(Value::Text(s)) => s.clone(),
567 _ => continue,
568 };
569 let foreign_table = match row.get_by_index(2) {
570 Some(Value::Text(s)) => s.clone(),
571 _ => continue,
572 };
573 let foreign_column = match row.get_by_index(4) {
574 Some(Value::Text(s)) => s.clone(),
575 _ => String::new(),
576 };
577 let on_update = match row.get_by_index(5) {
578 Some(Value::Text(s)) => Some(s.clone()),
579 _ => None,
580 };
581 let on_delete = match row.get_by_index(6) {
582 Some(Value::Text(s)) => Some(s.clone()),
583 _ => None,
584 };
585 let constraint_name = format!("fk_{table}_{id}");
586 infos.push(ForeignKeyInfo {
587 constraint_name,
588 column: from_col,
589 foreign_table,
590 foreign_column,
591 on_update,
592 on_delete,
593 });
594 }
595 Ok(infos)
596 }
597}
598
599// ── SqliteTransaction ─────────────────────────────────────────────────────────
600
/// A SQLite transaction backed by raw `BEGIN`/`COMMIT`/`ROLLBACK` statements.
///
/// Holds a guard on the connection-level transaction mutex so that no other
/// async task can start a concurrent `BEGIN` on the same `SqliteConnection`.
/// When dropped without an explicit `commit` or `rollback`, the transaction
/// attempts a best-effort `ROLLBACK` via a background task.
///
/// NOTE(review): the guard is released as soon as this struct is dropped,
/// while the background `ROLLBACK` runs later on a spawned task. Another task
/// could therefore issue `BEGIN` before that `ROLLBACK` executes — confirm
/// whether an owned guard moved into the task is needed.
pub struct SqliteTransaction<'a> {
    /// Clone of the parent connection's Limbo handle; all statements of the
    /// transaction run on it.
    conn: limbo::Connection,
    /// Statement cache shared with the parent connection.
    stmt_cache: StmtCache,
    /// Keeps the connection's transaction lock held for as long as this
    /// transaction value is alive. Never read; it exists only for its `Drop`.
    _guard: tokio::sync::MutexGuard<'a, ()>,
    /// Set once `commit` or `rollback` has been called, so that `Drop` does
    /// not issue another `ROLLBACK`.
    done: bool,
}
613
614impl<'a> Drop for SqliteTransaction<'a> {
615 fn drop(&mut self) {
616 if !self.done {
617 // Best-effort rollback on implicit drop. We cannot `.await` inside
618 // `drop`, so we spawn a fire-and-forget task. The mutex guard is
619 // released when `SqliteTransaction` is fully dropped (after this
620 // function body returns).
621 let conn = self.conn.clone();
622 tokio::spawn(async move {
623 if let Err(e) = conn.execute("ROLLBACK", LimboParams::None).await {
624 log::warn!("SqliteTransaction drop: ROLLBACK failed: {e}");
625 }
626 });
627 }
628 }
629}
630
631#[async_trait]
632impl<'a> Transaction for SqliteTransaction<'a> {
633 async fn execute(&mut self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
634 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
635 exec_rewritten(&self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
636 .await
637 .map_err(OxiSqlError::from)
638 }
639
640 async fn query(
641 &mut self,
642 sql: &str,
643 params: &[&dyn ToSqlValue],
644 ) -> Result<Vec<Row>, OxiSqlError> {
645 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
646 query_rewritten(&self.conn, &rewritten, limbo_params)
647 .await
648 .map_err(OxiSqlError::from)
649 }
650
651 async fn commit(mut self: Box<Self>) -> Result<(), OxiSqlError> {
652 self.done = true;
653 self.conn
654 .execute("COMMIT", LimboParams::None)
655 .await
656 .map_err(|e| OxiSqlError::Other(format!("COMMIT failed: {e}")))?;
657 Ok(())
658 }
659
660 async fn rollback(mut self: Box<Self>) -> Result<(), OxiSqlError> {
661 // Mark done so that Drop does not attempt a second ROLLBACK.
662 self.done = true;
663 self.conn
664 .execute("ROLLBACK", LimboParams::None)
665 .await
666 .map_err(|e| OxiSqlError::Other(format!("ROLLBACK failed: {e}")))?;
667 Ok(())
668 }
669}
670
671// ── SqlitePrepared ────────────────────────────────────────────────────────────
672
/// A prepared statement backed by the connection-level LRU cache.
///
/// On each `execute()` call the cached `limbo::Statement` is retrieved (or
/// compiled fresh on a miss), executed, and returned to the cache. Because
/// `Statement::reset()` now zeroes `n_change`, every execution sees a correct
/// change count without re-parsing the SQL.
///
/// `query()` does not use the cache; it prepares the statement on every call.
/// The `$N`→`?` rewrite of `sql` is repeated on every call of either method.
pub struct SqlitePrepared<'a> {
    /// Borrowed handle of the connection that created this statement.
    conn: &'a limbo::Connection,
    /// Statement cache shared with the parent connection.
    stmt_cache: StmtCache,
    /// The original SQL text as passed to `prepare` (with `$N` placeholders).
    sql: String,
}
684
685#[async_trait]
686impl<'a> PreparedStatement for SqlitePrepared<'a> {
687 async fn execute(&mut self, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
688 let (rewritten, limbo_params) =
689 rewrite_params(&self.sql, params).map_err(OxiSqlError::from)?;
690 exec_rewritten(self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
691 .await
692 .map_err(OxiSqlError::from)
693 }
694
695 async fn query(&mut self, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
696 let (rewritten, limbo_params) =
697 rewrite_params(&self.sql, params).map_err(OxiSqlError::from)?;
698 query_rewritten(self.conn, &rewritten, limbo_params)
699 .await
700 .map_err(OxiSqlError::from)
701 }
702
703 fn sql(&self) -> &str {
704 &self.sql
705 }
706}