oxisql_sqlite_compat/connection.rs
1//! [`SqliteConnection`] — Limbo-backed implementation of [`oxisql_core::Connection`].
2//!
3//! # Concurrency model
4//!
5//! `limbo::Connection` is internally `Arc<Mutex<Arc<limbo_core::Connection>>>` with
6//! `unsafe impl Send + Sync`, so it is safe to clone and share across async tasks.
//! `SqliteConnection` is a thin wrapper that holds:
//!
//! - `conn: limbo::Connection` — the Limbo connection handle.
//! - `txn_lock: Arc<tokio::sync::Mutex<()>>` — a guard that prevents two async tasks
//!   from issuing `BEGIN` concurrently on the same logical connection. SQLite does
//!   not support nested transactions, so only one task at a time may hold a
//!   transaction.
//! - `stmt_cache` — the LRU cache of compiled statements, shared across clones
//!   (see "Prepared-statement cache" below).
//! - `path: String` — the path supplied to [`Builder::new_local`], retained for
//!   diagnostics.
16//!
17//! # Affected-row count
18//!
19//! After each DML statement we call `conn.changes()` to read the row count that
20//! was committed by the most-recent write transaction. DDL statements and
21//! `BEGIN`/`COMMIT`/`ROLLBACK` leave the counter at 0, which is the correct
22//! contract per OxiSQL and `sqlite3_changes()` semantics.
23//!
24//! # Parameter binding
25//!
26//! OxiSQL passes `$1`, `$2`, … positional parameters. SQLite / Limbo expects
27//! `?` placeholders. `types::rewrite_params` performs a quote-aware
28//! translation before each statement is prepared.
29//!
30//! # Schema introspection
31//!
32//! [`Connection::tables`] queries `sqlite_master`.
33//! [`Connection::columns`] uses `PRAGMA table_info`.
34//! [`Connection::indexes`] parses `sqlite_master` DDL (PRAGMA index_list/index_info are not
35//! yet implemented in Limbo 0.0.22).
36//! [`Connection::foreign_keys`] uses `PRAGMA foreign_key_list` — the engine now
37//! surfaces FK metadata from its in-memory schema.
38//!
39//! # Transactions
40//!
41//! [`Connection::transaction`] issues `BEGIN` and returns a [`SqliteTransaction`]
42//! that wraps the same `limbo::Connection`. The transaction holds a guard on
43//! `txn_lock` so that no other task can start a concurrent `BEGIN`.
44//! Dropping `SqliteTransaction` without calling `commit` or `rollback` will
45//! execute `ROLLBACK` (best-effort, via `Drop`).
46//!
47//! # Prepared-statement cache
48//!
49//! All DML and DDL statements pass through an LRU cache keyed by the
50//! **rewritten SQL** (after `$N`→`?` translation). The cache holds up to
51//! `STMT_CACHE_CAPACITY` (128) compiled `limbo::Statement` entries per connection
52//! (shared across clones of the same connection via `Arc<StdMutex<…>>`).
53//!
54//! On a cache hit the existing `limbo::Statement` is taken out of the cache,
55//! executed via `Statement::execute()` (which calls `reset()` before binding),
56//! and returned to the cache after execution. `Statement::reset()` now also
57//! zeroes `Program::n_change` (fixed in oxisqlite-core), so cached statement
58//! reuse produces correct per-execution change counts.
59//!
60//! # ROLLBACK
61//!
62//! `SqliteTransaction::rollback()` executes the SQL string `"ROLLBACK"` against
63//! the engine, exactly mirroring how `commit()` executes `"COMMIT"`. The engine
64//! emits an `AutoCommit { auto_commit: true, rollback: true }` VDBE instruction
65//! that discards all pending changes. The `Drop` impl also fires a best-effort
66//! ROLLBACK when the transaction is dropped without an explicit `commit()` or
67//! `rollback()`.
68//!
//! # Prepared-statement reuse (via SqlitePrepared)
//!
//! [`PreparedStatement::execute`] goes through the same connection-level LRU
//! cache as [`Connection::execute`], so repeated executions of the same SQL
//! reuse the compiled `limbo::Statement`. [`PreparedStatement::query`] does not
//! use the cache: it prepares a fresh statement on every call, as do all other
//! query paths in this module.
75
76use std::num::NonZeroUsize;
77use std::sync::{Arc, Mutex as StdMutex};
78
79use async_trait::async_trait;
80use limbo::params::Params as LimboParams;
81use limbo::Builder;
82use tokio::sync::Mutex as TokioMutex;
83
84// ── statement-cache capacity ───────────────────────────────────────────────────
85
/// Maximum number of compiled statements retained in the per-connection LRU
/// cache. Statements are keyed by their rewritten SQL (`?`-placeholder form).
///
/// Must be non-zero: the LRU cache takes its capacity as a `NonZeroUsize`.
const STMT_CACHE_CAPACITY: usize = 128;
89
90use oxisql_core::{
91 ColumnInfo, Connection, ForeignKeyInfo, IndexInfo, OxiSqlError, PreparedStatement, Row,
92 TableInfo, TableType, ToSqlValue, Transaction, Value,
93};
94
95use crate::error::SqliteCompatError;
96use crate::types::{limbo_to_core_typed, rewrite_params, split_statements};
97
98// ── helpers ───────────────────────────────────────────────────────────────────
99
/// A per-connection LRU cache from rewritten SQL → compiled `limbo::Statement`.
///
/// Wrapped in `Arc<StdMutex<…>>` so it can be cheaply shared when the
/// `SqliteConnection` is cloned. The std `Mutex` is deliberately chosen over
/// `tokio::sync::Mutex`: the critical section is very short (single hash-lookup
/// or insertion) and never held across an `.await` point.
///
/// Statements are moved *out* of the cache while they execute and put back
/// afterwards, so the lock only protects the map itself, not statement use.
type StmtCache = Arc<StdMutex<lru::LruCache<String, limbo::Statement>>>;
107
108/// Construct a new, empty [`StmtCache`] with [`STMT_CACHE_CAPACITY`] slots.
109fn new_stmt_cache() -> StmtCache {
110 // SAFETY: STMT_CACHE_CAPACITY is a positive compile-time constant (128).
111 // `NonZeroUsize::new` returns `None` only for 0, which this is not.
112 let cap = NonZeroUsize::new(STMT_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
113 Arc::new(StdMutex::new(lru::LruCache::new(cap)))
114}
115
116/// Execute a SQL statement that has already been rewritten to `?` placeholders.
117///
118/// All statements (DML and DDL) pass through the statement cache uniformly.
119/// On a cache miss the statement is compiled via `conn.prepare()`, executed,
120/// and stored for future reuse. On a cache hit the existing `limbo::Statement`
121/// is retrieved, executed via `stmt.execute()` (which calls `reset()` before
122/// binding, zeroing `n_change` so reuse produces correct per-execution change
123/// counts), and returned to the cache.
124///
125/// If the cached statement was compiled before a schema change (DDL, ALTER,
126/// CREATE INDEX, etc.), the engine's `op_transaction` cookie check fires on
127/// the first `step()` and returns `SchemaChanged`. This function catches that
128/// error, discards the stale compiled program, re-prepares against the
129/// refreshed schema, and retries exactly once. This transparent re-prepare
130/// replaces the old `is_ddl` keyword-prefix heuristic that failed on
131/// comment-prefixed DDL and left DML statements stale after schema changes.
132///
133/// The affected-row count is read from `conn.changes()` after execution,
134/// which reflects the count committed by the most recent write transaction on
135/// this connection. DDL and `BEGIN`/`COMMIT`/`ROLLBACK` return 0, which is the
136/// correct value per the OxiSQL contract.
137///
138/// When no `cache` is provided (e.g., in unit tests that bypass the cache) the
139/// function falls back to `conn.execute()` followed by `conn.changes()`.
140async fn exec_rewritten(
141 conn: &limbo::Connection,
142 sql: &str,
143 limbo_params: Vec<limbo::Value>,
144 cache: Option<&StmtCache>,
145) -> Result<u64, SqliteCompatError> {
146 match cache {
147 Some(c) => {
148 // Clone before consuming so we can rebuild the parameter list for a
149 // re-prepare-and-retry if the engine signals SchemaChanged.
150 let retry_params = limbo_params.clone();
151 let lp = if limbo_params.is_empty() {
152 LimboParams::None
153 } else {
154 LimboParams::Positional(limbo_params)
155 };
156
157 // Take the compiled statement out of the cache (if present).
158 // The lock is held only for this short lookup; never across `.await`.
159 let cached = {
160 let mut guard = c.lock().map_err(|e| {
161 SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}"))
162 })?;
163 guard.pop(sql)
164 };
165
166 let mut stmt = match cached {
167 Some(s) => s,
168 None => conn.prepare(sql).await.map_err(SqliteCompatError::from)?,
169 };
170
171 match stmt.execute(lp).await {
172 Ok(_) => {
173 // Execution succeeded — return the statement to the cache.
174 c.lock()
175 .map_err(|e| {
176 SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}"))
177 })?
178 .put(sql.to_owned(), stmt);
179 }
180 Err(e) if e.is_schema_changed() => {
181 // The schema changed after this statement was compiled. Drop
182 // the stale program, re-compile against the refreshed schema,
183 // and retry exactly once.
184 drop(stmt);
185 let retry_lp = if retry_params.is_empty() {
186 LimboParams::None
187 } else {
188 LimboParams::Positional(retry_params)
189 };
190 let mut fresh = conn.prepare(sql).await.map_err(SqliteCompatError::from)?;
191 fresh
192 .execute(retry_lp)
193 .await
194 .map_err(SqliteCompatError::from)?;
195 c.lock()
196 .map_err(|e| {
197 SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}"))
198 })?
199 .put(sql.to_owned(), fresh);
200 }
201 Err(e) => return Err(SqliteCompatError::from(e)),
202 }
203
204 let n = conn
205 .changes()
206 .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
207 Ok(n.max(0) as u64)
208 }
209 None => {
210 // ── no-cache path (uncommon; bypasses the cache entirely) ──────────
211 let lp = if limbo_params.is_empty() {
212 LimboParams::None
213 } else {
214 LimboParams::Positional(limbo_params)
215 };
216 conn.execute(sql, lp)
217 .await
218 .map_err(SqliteCompatError::from)?;
219 let n = conn
220 .changes()
221 .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
222 Ok(n.max(0) as u64)
223 }
224 }
225}
226
227/// Execute a query that has already been rewritten to `?` placeholders and
228/// collect all result rows.
229///
230/// Column declared types (e.g. `"DATE"`, `"TIMESTAMP"`, `"UUID"`) are
231/// collected from the prepared statement and forwarded to [`limbo_to_core_typed`]
232/// so that richer [`Value`] variants are produced when appropriate.
233async fn query_rewritten(
234 conn: &limbo::Connection,
235 sql: &str,
236 limbo_params: Vec<limbo::Value>,
237) -> Result<Vec<Row>, SqliteCompatError> {
238 let lp = if limbo_params.is_empty() {
239 LimboParams::None
240 } else {
241 LimboParams::Positional(limbo_params)
242 };
243
244 let mut stmt = conn.prepare(sql).await.map_err(SqliteCompatError::from)?;
245
246 // Collect column names and declared types together.
247 let col_info: Vec<(String, Option<String>)> = stmt
248 .columns()
249 .iter()
250 .map(|c| (c.name().to_owned(), c.decl_type().map(str::to_owned)))
251 .collect();
252
253 let col_names: Vec<String> = col_info.iter().map(|(name, _)| name.clone()).collect();
254
255 let mut rows_iter = stmt.query(lp).await.map_err(SqliteCompatError::from)?;
256
257 let mut rows: Vec<Row> = Vec::new();
258 while let Some(limbo_row) = rows_iter.next().await.map_err(SqliteCompatError::from)? {
259 let mut values: Vec<Value> = Vec::with_capacity(col_info.len());
260 for idx in 0..limbo_row.column_count() {
261 let raw = limbo_row.get_value(idx).map_err(SqliteCompatError::from)?;
262 let decl = col_info.get(idx).and_then(|(_, dt)| dt.as_deref());
263 values.push(limbo_to_core_typed(raw, decl)?);
264 }
265 rows.push(Row::new(col_names.clone(), values));
266 }
267 Ok(rows)
268}
269
270// ── SqliteConnection ──────────────────────────────────────────────────────────
271
/// A Limbo-backed SQLite connection implementing [`Connection`].
///
/// Create via [`SqliteConnection::open`] (file path) or
/// [`SqliteConnection::open_memory`] (`:memory:`).
///
/// # Statement cache
///
/// Each `SqliteConnection` maintains an LRU cache of compiled `limbo::Statement`
/// objects (capacity: `STMT_CACHE_CAPACITY` = 128). The cache is shared across
/// clones of the same connection (the clones share the underlying
/// `limbo::Connection`) and is updated on every DML/DDL execution. Cache hits
/// save the per-statement parse-and-compile round-trip inside Limbo.
#[derive(Clone)]
pub struct SqliteConnection {
    /// Handle to the underlying Limbo connection; cloned into transactions.
    conn: limbo::Connection,
    /// Locked for the lifetime of a [`SqliteTransaction`] so that only one
    /// transaction can be open on this connection (and its clones) at a time.
    txn_lock: Arc<TokioMutex<()>>,
    /// LRU cache of compiled statements, keyed by rewritten SQL.
    stmt_cache: StmtCache,
    /// Path passed to [`SqliteConnection::open`]; returned by [`SqliteConnection::path`].
    path: String,
}
291
292impl std::fmt::Debug for SqliteConnection {
293 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
294 let cache_len = self.stmt_cache.lock().map(|g| g.len()).unwrap_or(0);
295 f.debug_struct("SqliteConnection")
296 .field("path", &self.path)
297 .field("stmt_cache_len", &cache_len)
298 .finish_non_exhaustive()
299 }
300}
301
302impl SqliteConnection {
303 /// Open a Limbo database at the given file path.
304 ///
305 /// Pass `":memory:"` for an in-memory database, or use
306 /// [`open_memory`][Self::open_memory] for clarity.
307 ///
308 /// # Errors
309 ///
310 /// Returns [`OxiSqlError`] if the file cannot be opened or created.
311 pub async fn open(path: &str) -> Result<Self, OxiSqlError> {
312 let db = Builder::new_local(path)
313 .build()
314 .await
315 .map_err(|e| OxiSqlError::Other(format!("limbo open error: {e}")))?;
316 let conn = db
317 .connect()
318 .map_err(|e| OxiSqlError::Other(format!("limbo connect error: {e}")))?;
319 Ok(Self {
320 conn,
321 txn_lock: Arc::new(TokioMutex::new(())),
322 stmt_cache: new_stmt_cache(),
323 path: path.to_owned(),
324 })
325 }
326
327 /// Open a fresh in-memory Limbo database.
328 ///
329 /// # Errors
330 ///
331 /// Returns [`OxiSqlError`] if the engine cannot be initialised.
332 pub async fn open_memory() -> Result<Self, OxiSqlError> {
333 Self::open(":memory:").await
334 }
335
336 /// Return the path this connection was opened with.
337 pub fn path(&self) -> &str {
338 &self.path
339 }
340}
341
342// ── Connection impl ───────────────────────────────────────────────────────────
343
344#[async_trait]
345impl Connection for SqliteConnection {
346 async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
347 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
348 exec_rewritten(&self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
349 .await
350 .map_err(OxiSqlError::from)
351 }
352
353 async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
354 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
355 query_rewritten(&self.conn, &rewritten, limbo_params)
356 .await
357 .map_err(OxiSqlError::from)
358 }
359
360 async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
361 // Acquire the exclusive transaction lock before issuing BEGIN.
362 // This prevents a second task from starting a concurrent transaction
363 // on the same SqliteConnection clone.
364 let guard = self.txn_lock.lock().await;
365 self.conn
366 .execute("BEGIN", LimboParams::None)
367 .await
368 .map_err(|e| OxiSqlError::Other(format!("BEGIN failed: {e}")))?;
369 Ok(Box::new(SqliteTransaction {
370 conn: self.conn.clone(),
371 // Share the connection-level stmt_cache so that DML executed inside
372 // a transaction also benefits from cached compiled statements.
373 stmt_cache: Arc::clone(&self.stmt_cache),
374 // Transfer ownership of the mutex guard into the transaction.
375 // The guard is released when SqliteTransaction is dropped.
376 _guard: guard,
377 done: false,
378 }))
379 }
380
381 async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
382 // Token-aware split: honours `;` inside string literals, quoted
383 // identifiers, block comments, and line comments.
384 let stmts = split_statements(sql);
385 let mut total = 0u64;
386 for stmt in stmts {
387 total += self.execute(stmt, &[]).await?;
388 }
389 Ok(total)
390 }
391
392 async fn ping(&self) -> Result<(), OxiSqlError> {
393 self.query("SELECT 1", &[]).await?;
394 Ok(())
395 }
396
397 async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
398 Ok(Box::new(SqlitePrepared {
399 conn: &self.conn,
400 stmt_cache: Arc::clone(&self.stmt_cache),
401 sql: sql.to_owned(),
402 }))
403 }
404
405 // ── Schema introspection ──────────────────────────────────────────────────
406
407 async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
408 let rows = self
409 .query(
410 "SELECT name, type FROM sqlite_master \
411 WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' \
412 ORDER BY name",
413 &[],
414 )
415 .await?;
416
417 let infos = rows
418 .into_iter()
419 .map(|row| {
420 let name = row
421 .get_by_index(0)
422 .and_then(|v| {
423 if let Value::Text(s) = v {
424 Some(s.clone())
425 } else {
426 None
427 }
428 })
429 .unwrap_or_default();
430 let ttype_str = row
431 .get_by_index(1)
432 .and_then(|v| {
433 if let Value::Text(s) = v {
434 Some(s.as_str())
435 } else {
436 None
437 }
438 })
439 .unwrap_or("table");
440 let table_type = match ttype_str {
441 "view" => TableType::View,
442 _ => TableType::Base,
443 };
444 TableInfo {
445 name,
446 schema: None,
447 table_type,
448 }
449 })
450 .collect();
451 Ok(infos)
452 }
453
454 async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
455 // PRAGMA table_info returns: cid, name, type, notnull, dflt_value, pk
456 let sql = format!("PRAGMA table_info(\"{table}\")");
457 let rows = self.query(&sql, &[]).await?;
458
459 let infos = rows
460 .into_iter()
461 .map(|row| {
462 // Helper: get column by index as string or empty string.
463 let text_at = |r: &Row, idx: usize| -> String {
464 r.get_by_index(idx)
465 .and_then(|v| match v {
466 Value::Text(s) => Some(s.clone()),
467 Value::I64(n) => Some(n.to_string()),
468 Value::Null => Some(String::new()),
469 _ => None,
470 })
471 .unwrap_or_default()
472 };
473 let i64_at = |r: &Row, idx: usize| -> i64 {
474 r.get_by_index(idx)
475 .and_then(|v| {
476 if let Value::I64(n) = v {
477 Some(*n)
478 } else {
479 None
480 }
481 })
482 .unwrap_or(0)
483 };
484
485 let ordinal = i64_at(&row, 0) as u32 + 1; // cid is 0-based
486 let name = text_at(&row, 1);
487 let data_type = text_at(&row, 2);
488 let notnull = i64_at(&row, 3) != 0;
489 let default_val = row.get_by_index(4).and_then(|v| match v {
490 Value::Text(s) => Some(s.clone()),
491 Value::Null => None,
492 other => Some(format!("{other:?}")),
493 });
494
495 ColumnInfo {
496 name,
497 ordinal_position: ordinal,
498 data_type,
499 nullable: !notnull,
500 default: default_val,
501 max_length: None,
502 numeric_precision: None,
503 numeric_scale: None,
504 }
505 })
506 .collect();
507 Ok(infos)
508 }
509
510 async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
511 // PRAGMA index_list and PRAGMA index_info are not yet implemented in limbo 0.0.22.
512 // Fall back to sqlite_master for index names and uniqueness, then parse
513 // the index SQL to extract column names. This is best-effort: multi-column
514 // indexes and expression indexes may not parse perfectly.
515 let sql = "SELECT name, sql FROM sqlite_master \
516 WHERE type='index' AND tbl_name=$1 AND name NOT LIKE 'sqlite_%'";
517 let rows = self.query(sql, &[&table]).await?;
518
519 let mut infos: Vec<IndexInfo> = Vec::new();
520 for row in rows {
521 let name = row
522 .get_by_index(0)
523 .and_then(|v| {
524 if let Value::Text(s) = v {
525 Some(s.clone())
526 } else {
527 None
528 }
529 })
530 .unwrap_or_default();
531 let idx_sql = row
532 .get_by_index(1)
533 .and_then(|v| {
534 if let Value::Text(s) = v {
535 Some(s.clone())
536 } else {
537 None
538 }
539 })
540 .unwrap_or_default();
541
542 // Detect UNIQUE from the CREATE INDEX / CREATE UNIQUE INDEX statement.
543 let upper = idx_sql.to_ascii_uppercase();
544 let unique = upper.contains("UNIQUE");
545
546 // Extract column list between the last `(` and `)`.
547 let columns: Vec<String> =
548 if let (Some(open), Some(close)) = (idx_sql.rfind('('), idx_sql.rfind(')')) {
549 idx_sql[open + 1..close]
550 .split(',')
551 .map(|c| c.trim().to_string())
552 .filter(|c| !c.is_empty())
553 .collect()
554 } else {
555 vec![]
556 };
557
558 infos.push(IndexInfo {
559 name,
560 columns,
561 unique,
562 primary: false,
563 });
564 }
565 Ok(infos)
566 }
567
568 async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
569 // Use PRAGMA foreign_key_list — the engine now surfaces FK metadata
570 // directly from the in-memory schema, avoiding brittle DDL text parsing.
571 let escaped = table.replace('"', "\"\"");
572 let sql = format!("PRAGMA foreign_key_list(\"{}\")", escaped);
573 let rows = query_rewritten(&self.conn, &sql, vec![])
574 .await
575 .map_err(OxiSqlError::from)?;
576
577 // PRAGMA foreign_key_list columns (by index):
578 // 0: id INTEGER — FK index within the table
579 // 1: seq INTEGER — column position within a composite FK
580 // 2: table TEXT — parent table name
581 // 3: from TEXT — child column name
582 // 4: to TEXT/NULL — parent column name (NULL = implicit PK ref)
583 // 5: on_update TEXT
584 // 6: on_delete TEXT
585 // 7: match TEXT
586 let mut infos: Vec<ForeignKeyInfo> = Vec::with_capacity(rows.len());
587 for row in &rows {
588 let id = match row.get_by_index(0) {
589 Some(Value::I64(v)) => *v,
590 _ => 0,
591 };
592 let from_col = match row.get_by_index(3) {
593 Some(Value::Text(s)) => s.clone(),
594 _ => continue,
595 };
596 let foreign_table = match row.get_by_index(2) {
597 Some(Value::Text(s)) => s.clone(),
598 _ => continue,
599 };
600 let foreign_column = match row.get_by_index(4) {
601 Some(Value::Text(s)) => s.clone(),
602 _ => String::new(),
603 };
604 let on_update = match row.get_by_index(5) {
605 Some(Value::Text(s)) => Some(s.clone()),
606 _ => None,
607 };
608 let on_delete = match row.get_by_index(6) {
609 Some(Value::Text(s)) => Some(s.clone()),
610 _ => None,
611 };
612 let constraint_name = format!("fk_{table}_{id}");
613 infos.push(ForeignKeyInfo {
614 constraint_name,
615 column: from_col,
616 foreign_table,
617 foreign_column,
618 on_update,
619 on_delete,
620 });
621 }
622 Ok(infos)
623 }
624}
625
626// ── SqliteTransaction ─────────────────────────────────────────────────────────
627
/// A SQLite transaction backed by raw `BEGIN`/`COMMIT`/`ROLLBACK` statements.
///
/// Holds a guard on the connection-level transaction mutex so that no other
/// async task can start a concurrent `BEGIN` on the same `SqliteConnection`.
/// When dropped without an explicit `commit` or `rollback`, the transaction
/// attempts a best-effort `ROLLBACK` via a background task.
pub struct SqliteTransaction<'a> {
    /// Clone of the owning connection's Limbo handle; all statements of the
    /// transaction are issued through it.
    conn: limbo::Connection,
    /// The connection-level compiled-statement cache, shared with the owner.
    stmt_cache: StmtCache,
    /// Guard on the connection's transaction mutex; never read, only held so
    /// the lock is released when this value is dropped.
    _guard: tokio::sync::MutexGuard<'a, ()>,
    /// Set once `commit` or `rollback` has been called; when still `false` at
    /// drop time the `Drop` impl issues a best-effort `ROLLBACK`.
    done: bool,
}
640
641impl<'a> Drop for SqliteTransaction<'a> {
642 fn drop(&mut self) {
643 if !self.done {
644 // Best-effort rollback on implicit drop. We cannot `.await` inside
645 // `drop`, so we spawn a fire-and-forget task. The mutex guard is
646 // released when `SqliteTransaction` is fully dropped (after this
647 // function body returns).
648 let conn = self.conn.clone();
649 tokio::spawn(async move {
650 if let Err(e) = conn.execute("ROLLBACK", LimboParams::None).await {
651 log::warn!("SqliteTransaction drop: ROLLBACK failed: {e}");
652 }
653 });
654 }
655 }
656}
657
658#[async_trait]
659impl<'a> Transaction for SqliteTransaction<'a> {
660 async fn execute(&mut self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
661 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
662 exec_rewritten(&self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
663 .await
664 .map_err(OxiSqlError::from)
665 }
666
667 async fn query(
668 &mut self,
669 sql: &str,
670 params: &[&dyn ToSqlValue],
671 ) -> Result<Vec<Row>, OxiSqlError> {
672 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
673 query_rewritten(&self.conn, &rewritten, limbo_params)
674 .await
675 .map_err(OxiSqlError::from)
676 }
677
678 async fn commit(mut self: Box<Self>) -> Result<(), OxiSqlError> {
679 self.done = true;
680 self.conn
681 .execute("COMMIT", LimboParams::None)
682 .await
683 .map_err(|e| OxiSqlError::Other(format!("COMMIT failed: {e}")))?;
684 Ok(())
685 }
686
687 async fn rollback(mut self: Box<Self>) -> Result<(), OxiSqlError> {
688 // Mark done so that Drop does not attempt a second ROLLBACK.
689 self.done = true;
690 self.conn
691 .execute("ROLLBACK", LimboParams::None)
692 .await
693 .map_err(|e| OxiSqlError::Other(format!("ROLLBACK failed: {e}")))?;
694 Ok(())
695 }
696}
697
698// ── SqlitePrepared ────────────────────────────────────────────────────────────
699
/// A prepared statement backed by the connection-level LRU cache.
///
/// On each `execute()` call the cached `limbo::Statement` is retrieved (or
/// compiled fresh on a miss), executed, and returned to the cache. Because
/// `Statement::reset()` now zeroes `n_change`, every execution sees a correct
/// change count without re-parsing the SQL.
///
/// `query()` does not go through the cache; it prepares the statement anew on
/// every call.
pub struct SqlitePrepared<'a> {
    /// Borrowed handle of the connection that created this statement.
    conn: &'a limbo::Connection,
    /// The connection-level compiled-statement cache, shared with the owner.
    stmt_cache: StmtCache,
    /// Original SQL text as supplied by the caller (still in `$N` form); it is
    /// rewritten to `?` placeholders on every call.
    sql: String,
}
711
712#[async_trait]
713impl<'a> PreparedStatement for SqlitePrepared<'a> {
714 async fn execute(&mut self, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
715 let (rewritten, limbo_params) =
716 rewrite_params(&self.sql, params).map_err(OxiSqlError::from)?;
717 exec_rewritten(self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
718 .await
719 .map_err(OxiSqlError::from)
720 }
721
722 async fn query(&mut self, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
723 let (rewritten, limbo_params) =
724 rewrite_params(&self.sql, params).map_err(OxiSqlError::from)?;
725 query_rewritten(self.conn, &rewritten, limbo_params)
726 .await
727 .map_err(OxiSqlError::from)
728 }
729
730 fn sql(&self) -> &str {
731 &self.sql
732 }
733}