oxisql_sqlite_compat/connection.rs
1//! [`SqliteConnection`] — Limbo-backed implementation of [`oxisql_core::Connection`].
2//!
3//! # Concurrency model
4//!
5//! `limbo::Connection` is internally `Arc<Mutex<Arc<limbo_core::Connection>>>` with
6//! `unsafe impl Send + Sync`, so it is safe to clone and share across async tasks.
7//! `SqliteConnection` is a thin newtype that holds:
8//!
9//! - `conn: limbo::Connection` — the Limbo connection handle.
10//! - `txn_lock: Arc<tokio::sync::Mutex<()>>` — a guard that prevents two async tasks
11//! from issuing `BEGIN` concurrently on the same logical connection. SQLite does
12//! not support nested transactions, so only one task at a time may hold a
13//! transaction.
14//! - `path: String` — the path supplied to [`Builder::new_local`], retained for
15//! diagnostics.
16//!
17//! # Affected-row count
18//!
19//! After each DML statement we call `conn.changes()` to read the row count that
20//! was committed by the most-recent write transaction. DDL statements and
21//! `BEGIN`/`COMMIT`/`ROLLBACK` leave the counter at 0, which is the correct
22//! contract per OxiSQL and `sqlite3_changes()` semantics.
23//!
24//! # Parameter binding
25//!
26//! OxiSQL passes `$1`, `$2`, … positional parameters. SQLite / Limbo expects
27//! `?` placeholders. `types::rewrite_params` performs a quote-aware
28//! translation before each statement is prepared.
29//!
30//! # Schema introspection
31//!
32//! [`Connection::tables`] queries `sqlite_master`.
33//! [`Connection::columns`] uses `PRAGMA table_info`.
34//! [`Connection::indexes`] parses `sqlite_master` DDL (PRAGMA index_list/index_info are not
35//! yet implemented in Limbo 0.0.22).
36//! [`Connection::foreign_keys`] uses `PRAGMA foreign_key_list` — the engine now
37//! surfaces FK metadata from its in-memory schema.
38//!
39//! # Transactions
40//!
41//! [`Connection::transaction`] issues `BEGIN` and returns a [`SqliteTransaction`]
42//! that wraps the same `limbo::Connection`. The transaction holds a guard on
43//! `txn_lock` so that no other task can start a concurrent `BEGIN`.
44//! Dropping `SqliteTransaction` without calling `commit` or `rollback` will
45//! execute `ROLLBACK` (best-effort, via `Drop`).
46//!
47//! # Prepared-statement cache
48//!
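//! DML statements run through the `execute` paths pass through an LRU cache
//! keyed by the **rewritten SQL** (after `$N`→`?` translation). DDL-like
//! statements (`CREATE`, `DROP`, `ALTER`, `PRAGMA`, `VACUUM`) bypass the cache
//! and are compiled fresh on every call, and the `query` paths always prepare a
//! new statement. The cache holds up to `STMT_CACHE_CAPACITY` (128) compiled
//! `limbo::Statement` entries per connection (shared across clones of the same
//! connection via `Arc<StdMutex<…>>`).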
53//!
54//! On a cache hit the existing `limbo::Statement` is taken out of the cache,
55//! executed via `Statement::execute()` (which calls `reset()` before binding),
56//! and returned to the cache after execution. `Statement::reset()` now also
57//! zeroes `Program::n_change` (fixed in oxisqlite-core), so cached statement
58//! reuse produces correct per-execution change counts.
59//!
60//! # ROLLBACK
61//!
62//! `SqliteTransaction::rollback()` executes the SQL string `"ROLLBACK"` against
63//! the engine, exactly mirroring how `commit()` executes `"COMMIT"`. The engine
64//! emits an `AutoCommit { auto_commit: true, rollback: true }` VDBE instruction
65//! that discards all pending changes. The `Drop` impl also fires a best-effort
66//! ROLLBACK when the transaction is dropped without an explicit `commit()` or
67//! `rollback()`.
68//!
69//! # Prepared-statement reuse (via SqlitePrepared)
70//!
//! `SqlitePrepared::execute` goes through the connection-level statement cache
//! described above, so repeated executions of the same SQL reuse the compiled
//! `limbo::Statement` instead of re-preparing it. `SqlitePrepared::query` still
//! prepares a fresh statement on every call, because the query path does not
//! use the cache. Parameter rewriting (`$N`→`?`) is repeated on every call in
//! both cases.
75
76use std::num::NonZeroUsize;
77use std::sync::{Arc, Mutex as StdMutex};
78
79use async_trait::async_trait;
80use limbo::params::Params as LimboParams;
81use limbo::Builder;
82use tokio::sync::Mutex as TokioMutex;
83
84// ── statement-cache capacity ───────────────────────────────────────────────────
85
/// Maximum number of compiled statements retained in the per-connection LRU
/// cache. Statements are keyed by their rewritten SQL (`?`-placeholder form).
///
/// The value must be non-zero: `new_stmt_cache` converts it to a
/// `NonZeroUsize` and silently falls back to a capacity of 1 if it is 0.
const STMT_CACHE_CAPACITY: usize = 128;
89
90use oxisql_core::{
91 ColumnInfo, Connection, ForeignKeyInfo, IndexInfo, OxiSqlError, PreparedStatement, Row,
92 TableInfo, TableType, ToSqlValue, Transaction, Value,
93};
94
95use crate::error::SqliteCompatError;
96use crate::types::{limbo_to_core_typed, rewrite_params, split_statements};
97
98// ── helpers ───────────────────────────────────────────────────────────────────
99
/// A per-connection LRU cache from rewritten SQL → compiled `limbo::Statement`.
///
/// The key is the SQL text after `$N` → `?` rewriting; the value is the owned
/// compiled statement. Callers take a statement *out* of the cache while it is
/// executing and put it back afterwards, so an entry is absent from the map
/// for the duration of its own execution.
///
/// Wrapped in `Arc<StdMutex<…>>` so it can be cheaply shared when the
/// `SqliteConnection` is cloned. The std `Mutex` is deliberately chosen over
/// `tokio::sync::Mutex`: the critical section is very short (single hash-lookup
/// or insertion) and never held across an `.await` point.
type StmtCache = Arc<StdMutex<lru::LruCache<String, limbo::Statement>>>;
107
108/// Construct a new, empty [`StmtCache`] with [`STMT_CACHE_CAPACITY`] slots.
109fn new_stmt_cache() -> StmtCache {
110 // SAFETY: STMT_CACHE_CAPACITY is a positive compile-time constant (128).
111 // `NonZeroUsize::new` returns `None` only for 0, which this is not.
112 let cap = NonZeroUsize::new(STMT_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
113 Arc::new(StdMutex::new(lru::LruCache::new(cap)))
114}
115
116/// Execute a SQL statement that has already been rewritten to `?` placeholders.
117///
118/// On a cache miss, compiles the statement via `conn.prepare()`, inserts it into
119/// the cache, then executes it via `stmt.execute()`. On a cache hit, retrieves
120/// the cached `limbo::Statement` and calls `stmt.execute()` directly —
121/// `Statement::execute()` calls `reset()` internally, which (after the engine
122/// fix in oxisqlite-core) also clears `n_change`, making reuse correct.
123///
124/// The affected-row count is read from `conn.changes()` after execution,
125/// which reflects the count committed by the most recent write transaction on
126/// this connection. DDL and `BEGIN`/`COMMIT`/`ROLLBACK` return 0, which is the
127/// correct value per the OxiSQL contract.
128///
129/// When no `cache` is provided (e.g., in unit tests that bypass the cache) the
130/// function falls back to `conn.execute()` followed by `conn.changes()`.
131async fn exec_rewritten(
132 conn: &limbo::Connection,
133 sql: &str,
134 limbo_params: Vec<limbo::Value>,
135 cache: Option<&StmtCache>,
136) -> Result<u64, SqliteCompatError> {
137 let lp = if limbo_params.is_empty() {
138 LimboParams::None
139 } else {
140 LimboParams::Positional(limbo_params)
141 };
142
143 match cache {
144 Some(c) => {
145 // ── cache path ─────────────────────────────────────────────────────
146 //
147 // DDL statements (CREATE, DROP, ALTER, PRAGMA, VACUUM) must bypass
148 // the statement cache entirely. Their compiled programs embed
149 // schema-state decisions (e.g., IF NOT EXISTS checks, ParseSchema
150 // opcodes) that are only valid at compile time. Re-executing a
151 // stale DDL program against a changed schema causes internal
152 // assertion failures inside the engine.
153 let is_ddl = {
154 let upper = sql.trim_start().to_ascii_uppercase();
155 upper.starts_with("CREATE")
156 || upper.starts_with("DROP")
157 || upper.starts_with("ALTER")
158 || upper.starts_with("PRAGMA")
159 || upper.starts_with("VACUUM")
160 };
161
162 if is_ddl {
163 // DDL: always compile fresh, never read from / write to cache.
164 let mut stmt = conn.prepare(sql).await.map_err(SqliteCompatError::from)?;
165 stmt.execute(lp).await.map_err(SqliteCompatError::from)?;
166 let n = conn
167 .changes()
168 .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
169 return Ok(n.max(0) as u64);
170 }
171
172 // ── DML / query cache path ─────────────────────────────────────────
173 //
174 // Retrieve a mutable reference to the cached statement, or compile a
175 // fresh one and insert it. We hold the lock only for the duration of
176 // the lookup/insert, not across the `.await` in `stmt.execute()`.
177 //
178 // Because `StmtCache` holds the actual `Statement` value (not a
179 // reference-counted pointer to it), we need to take ownership on a
180 // cache miss and then put it back after execution.
181
182 // Try to remove a hit from the cache so we have owned access during
183 // the async execute call.
184 let cached_stmt: Option<limbo::Statement> = c
185 .lock()
186 .map_err(|e| SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}")))?
187 .pop(sql);
188
189 let mut stmt = match cached_stmt {
190 Some(s) => s,
191 None => {
192 // Cache miss — compile a new statement.
193 conn.prepare(sql).await.map_err(SqliteCompatError::from)?
194 }
195 };
196
197 // Execute the (possibly retrieved-from-cache) statement.
198 // `Statement::execute()` calls `reset()` before binding parameters,
199 // and `reset()` now also zeroes `n_change`, so reuse is correct.
200 stmt.execute(lp).await.map_err(SqliteCompatError::from)?;
201
202 // Return the statement to the cache for future reuse (DML only).
203 c.lock()
204 .map_err(|e| SqliteCompatError::Other(format!("stmt_cache lock poisoned: {e}")))?
205 .put(sql.to_owned(), stmt);
206
207 // Read the affected-row count from the connection's native counter.
208 let n = conn
209 .changes()
210 .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
211 Ok(n.max(0) as u64)
212 }
213 None => {
214 // ── no-cache path (uncommon; bypasses the cache entirely) ──────────
215 conn.execute(sql, lp)
216 .await
217 .map_err(SqliteCompatError::from)?;
218 let n = conn
219 .changes()
220 .map_err(|e| SqliteCompatError::Other(format!("changes() failed: {e}")))?;
221 Ok(n.max(0) as u64)
222 }
223 }
224}
225
226/// Execute a query that has already been rewritten to `?` placeholders and
227/// collect all result rows.
228///
229/// Column declared types (e.g. `"DATE"`, `"TIMESTAMP"`, `"UUID"`) are
230/// collected from the prepared statement and forwarded to [`limbo_to_core_typed`]
231/// so that richer [`Value`] variants are produced when appropriate.
232async fn query_rewritten(
233 conn: &limbo::Connection,
234 sql: &str,
235 limbo_params: Vec<limbo::Value>,
236) -> Result<Vec<Row>, SqliteCompatError> {
237 let lp = if limbo_params.is_empty() {
238 LimboParams::None
239 } else {
240 LimboParams::Positional(limbo_params)
241 };
242
243 let mut stmt = conn.prepare(sql).await.map_err(SqliteCompatError::from)?;
244
245 // Collect column names and declared types together.
246 let col_info: Vec<(String, Option<String>)> = stmt
247 .columns()
248 .iter()
249 .map(|c| (c.name().to_owned(), c.decl_type().map(str::to_owned)))
250 .collect();
251
252 let col_names: Vec<String> = col_info.iter().map(|(name, _)| name.clone()).collect();
253
254 let mut rows_iter = stmt.query(lp).await.map_err(SqliteCompatError::from)?;
255
256 let mut rows: Vec<Row> = Vec::new();
257 while let Some(limbo_row) = rows_iter.next().await.map_err(SqliteCompatError::from)? {
258 let mut values: Vec<Value> = Vec::with_capacity(col_info.len());
259 for idx in 0..limbo_row.column_count() {
260 let raw = limbo_row.get_value(idx).map_err(SqliteCompatError::from)?;
261 let decl = col_info.get(idx).and_then(|(_, dt)| dt.as_deref());
262 values.push(limbo_to_core_typed(raw, decl)?);
263 }
264 rows.push(Row::new(col_names.clone(), values));
265 }
266 Ok(rows)
267}
268
269// ── SqliteConnection ──────────────────────────────────────────────────────────
270
/// A Limbo-backed SQLite connection implementing [`Connection`].
///
/// Create via [`SqliteConnection::open`] (file path) or
/// [`SqliteConnection::open_memory`] (`:memory:`).
///
/// # Statement cache
///
/// Each `SqliteConnection` maintains an LRU cache of compiled `limbo::Statement`
/// objects (capacity: `STMT_CACHE_CAPACITY` = 128). The cache is shared across
/// clones of the same connection (the clones share the underlying
/// `limbo::Connection`) and is used by the `execute` paths for non-DDL
/// statements; DDL-like statements (`CREATE`, `DROP`, `ALTER`, `PRAGMA`,
/// `VACUUM`) are always compiled fresh, and `query` prepares a new statement
/// on every call. Cache hits save the per-statement parse-and-compile
/// round-trip inside Limbo.
#[derive(Clone)]
pub struct SqliteConnection {
    /// Underlying Limbo connection handle.
    conn: limbo::Connection,
    /// Locked for the lifetime of a [`SqliteTransaction`] so that only one
    /// transaction at a time is started through this connection or its clones.
    txn_lock: Arc<TokioMutex<()>>,
    /// Compiled-statement cache shared with clones, transactions and prepared
    /// statements created from this connection.
    stmt_cache: StmtCache,
    /// Path the connection was opened with; retained for diagnostics.
    path: String,
}
290
291impl std::fmt::Debug for SqliteConnection {
292 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
293 let cache_len = self.stmt_cache.lock().map(|g| g.len()).unwrap_or(0);
294 f.debug_struct("SqliteConnection")
295 .field("path", &self.path)
296 .field("stmt_cache_len", &cache_len)
297 .finish_non_exhaustive()
298 }
299}
300
301impl SqliteConnection {
302 /// Open a Limbo database at the given file path.
303 ///
304 /// Pass `":memory:"` for an in-memory database, or use
305 /// [`open_memory`][Self::open_memory] for clarity.
306 ///
307 /// # Errors
308 ///
309 /// Returns [`OxiSqlError`] if the file cannot be opened or created.
310 pub async fn open(path: &str) -> Result<Self, OxiSqlError> {
311 let db = Builder::new_local(path)
312 .build()
313 .await
314 .map_err(|e| OxiSqlError::Other(format!("limbo open error: {e}")))?;
315 let conn = db
316 .connect()
317 .map_err(|e| OxiSqlError::Other(format!("limbo connect error: {e}")))?;
318 Ok(Self {
319 conn,
320 txn_lock: Arc::new(TokioMutex::new(())),
321 stmt_cache: new_stmt_cache(),
322 path: path.to_owned(),
323 })
324 }
325
326 /// Open a fresh in-memory Limbo database.
327 ///
328 /// # Errors
329 ///
330 /// Returns [`OxiSqlError`] if the engine cannot be initialised.
331 pub async fn open_memory() -> Result<Self, OxiSqlError> {
332 Self::open(":memory:").await
333 }
334
335 /// Return the path this connection was opened with.
336 pub fn path(&self) -> &str {
337 &self.path
338 }
339}
340
341// ── Connection impl ───────────────────────────────────────────────────────────
342
343#[async_trait]
344impl Connection for SqliteConnection {
345 async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
346 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
347 exec_rewritten(&self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
348 .await
349 .map_err(OxiSqlError::from)
350 }
351
352 async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
353 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
354 query_rewritten(&self.conn, &rewritten, limbo_params)
355 .await
356 .map_err(OxiSqlError::from)
357 }
358
359 async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
360 // Acquire the exclusive transaction lock before issuing BEGIN.
361 // This prevents a second task from starting a concurrent transaction
362 // on the same SqliteConnection clone.
363 let guard = self.txn_lock.lock().await;
364 self.conn
365 .execute("BEGIN", LimboParams::None)
366 .await
367 .map_err(|e| OxiSqlError::Other(format!("BEGIN failed: {e}")))?;
368 Ok(Box::new(SqliteTransaction {
369 conn: self.conn.clone(),
370 // Share the connection-level stmt_cache so that DML executed inside
371 // a transaction also benefits from cached compiled statements.
372 stmt_cache: Arc::clone(&self.stmt_cache),
373 // Transfer ownership of the mutex guard into the transaction.
374 // The guard is released when SqliteTransaction is dropped.
375 _guard: guard,
376 done: false,
377 }))
378 }
379
380 async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
381 // Token-aware split: honours `;` inside string literals, quoted
382 // identifiers, block comments, and line comments.
383 let stmts = split_statements(sql);
384 let mut total = 0u64;
385 for stmt in stmts {
386 total += self.execute(stmt, &[]).await?;
387 }
388 Ok(total)
389 }
390
391 async fn ping(&self) -> Result<(), OxiSqlError> {
392 self.query("SELECT 1", &[]).await?;
393 Ok(())
394 }
395
396 async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
397 Ok(Box::new(SqlitePrepared {
398 conn: &self.conn,
399 stmt_cache: Arc::clone(&self.stmt_cache),
400 sql: sql.to_owned(),
401 }))
402 }
403
404 // ── Schema introspection ──────────────────────────────────────────────────
405
406 async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
407 let rows = self
408 .query(
409 "SELECT name, type FROM sqlite_master \
410 WHERE type IN ('table','view') AND name NOT LIKE 'sqlite_%' \
411 ORDER BY name",
412 &[],
413 )
414 .await?;
415
416 let infos = rows
417 .into_iter()
418 .map(|row| {
419 let name = row
420 .get_by_index(0)
421 .and_then(|v| {
422 if let Value::Text(s) = v {
423 Some(s.clone())
424 } else {
425 None
426 }
427 })
428 .unwrap_or_default();
429 let ttype_str = row
430 .get_by_index(1)
431 .and_then(|v| {
432 if let Value::Text(s) = v {
433 Some(s.as_str())
434 } else {
435 None
436 }
437 })
438 .unwrap_or("table");
439 let table_type = match ttype_str {
440 "view" => TableType::View,
441 _ => TableType::Base,
442 };
443 TableInfo {
444 name,
445 schema: None,
446 table_type,
447 }
448 })
449 .collect();
450 Ok(infos)
451 }
452
453 async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
454 // PRAGMA table_info returns: cid, name, type, notnull, dflt_value, pk
455 let sql = format!("PRAGMA table_info(\"{table}\")");
456 let rows = self.query(&sql, &[]).await?;
457
458 let infos = rows
459 .into_iter()
460 .map(|row| {
461 // Helper: get column by index as string or empty string.
462 let text_at = |r: &Row, idx: usize| -> String {
463 r.get_by_index(idx)
464 .and_then(|v| match v {
465 Value::Text(s) => Some(s.clone()),
466 Value::I64(n) => Some(n.to_string()),
467 Value::Null => Some(String::new()),
468 _ => None,
469 })
470 .unwrap_or_default()
471 };
472 let i64_at = |r: &Row, idx: usize| -> i64 {
473 r.get_by_index(idx)
474 .and_then(|v| {
475 if let Value::I64(n) = v {
476 Some(*n)
477 } else {
478 None
479 }
480 })
481 .unwrap_or(0)
482 };
483
484 let ordinal = i64_at(&row, 0) as u32 + 1; // cid is 0-based
485 let name = text_at(&row, 1);
486 let data_type = text_at(&row, 2);
487 let notnull = i64_at(&row, 3) != 0;
488 let default_val = row.get_by_index(4).and_then(|v| match v {
489 Value::Text(s) => Some(s.clone()),
490 Value::Null => None,
491 other => Some(format!("{other:?}")),
492 });
493
494 ColumnInfo {
495 name,
496 ordinal_position: ordinal,
497 data_type,
498 nullable: !notnull,
499 default: default_val,
500 max_length: None,
501 numeric_precision: None,
502 numeric_scale: None,
503 }
504 })
505 .collect();
506 Ok(infos)
507 }
508
509 async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
510 // PRAGMA index_list and PRAGMA index_info are not yet implemented in limbo 0.0.22.
511 // Fall back to sqlite_master for index names and uniqueness, then parse
512 // the index SQL to extract column names. This is best-effort: multi-column
513 // indexes and expression indexes may not parse perfectly.
514 let sql = "SELECT name, sql FROM sqlite_master \
515 WHERE type='index' AND tbl_name=$1 AND name NOT LIKE 'sqlite_%'";
516 let rows = self.query(sql, &[&table]).await?;
517
518 let mut infos: Vec<IndexInfo> = Vec::new();
519 for row in rows {
520 let name = row
521 .get_by_index(0)
522 .and_then(|v| {
523 if let Value::Text(s) = v {
524 Some(s.clone())
525 } else {
526 None
527 }
528 })
529 .unwrap_or_default();
530 let idx_sql = row
531 .get_by_index(1)
532 .and_then(|v| {
533 if let Value::Text(s) = v {
534 Some(s.clone())
535 } else {
536 None
537 }
538 })
539 .unwrap_or_default();
540
541 // Detect UNIQUE from the CREATE INDEX / CREATE UNIQUE INDEX statement.
542 let upper = idx_sql.to_ascii_uppercase();
543 let unique = upper.contains("UNIQUE");
544
545 // Extract column list between the last `(` and `)`.
546 let columns: Vec<String> =
547 if let (Some(open), Some(close)) = (idx_sql.rfind('('), idx_sql.rfind(')')) {
548 idx_sql[open + 1..close]
549 .split(',')
550 .map(|c| c.trim().to_string())
551 .filter(|c| !c.is_empty())
552 .collect()
553 } else {
554 vec![]
555 };
556
557 infos.push(IndexInfo {
558 name,
559 columns,
560 unique,
561 primary: false,
562 });
563 }
564 Ok(infos)
565 }
566
567 async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
568 // Use PRAGMA foreign_key_list — the engine now surfaces FK metadata
569 // directly from the in-memory schema, avoiding brittle DDL text parsing.
570 let escaped = table.replace('"', "\"\"");
571 let sql = format!("PRAGMA foreign_key_list(\"{}\")", escaped);
572 let rows = query_rewritten(&self.conn, &sql, vec![])
573 .await
574 .map_err(OxiSqlError::from)?;
575
576 // PRAGMA foreign_key_list columns (by index):
577 // 0: id INTEGER — FK index within the table
578 // 1: seq INTEGER — column position within a composite FK
579 // 2: table TEXT — parent table name
580 // 3: from TEXT — child column name
581 // 4: to TEXT/NULL — parent column name (NULL = implicit PK ref)
582 // 5: on_update TEXT
583 // 6: on_delete TEXT
584 // 7: match TEXT
585 let mut infos: Vec<ForeignKeyInfo> = Vec::with_capacity(rows.len());
586 for row in &rows {
587 let id = match row.get_by_index(0) {
588 Some(Value::I64(v)) => *v,
589 _ => 0,
590 };
591 let from_col = match row.get_by_index(3) {
592 Some(Value::Text(s)) => s.clone(),
593 _ => continue,
594 };
595 let foreign_table = match row.get_by_index(2) {
596 Some(Value::Text(s)) => s.clone(),
597 _ => continue,
598 };
599 let foreign_column = match row.get_by_index(4) {
600 Some(Value::Text(s)) => s.clone(),
601 _ => String::new(),
602 };
603 let on_update = match row.get_by_index(5) {
604 Some(Value::Text(s)) => Some(s.clone()),
605 _ => None,
606 };
607 let on_delete = match row.get_by_index(6) {
608 Some(Value::Text(s)) => Some(s.clone()),
609 _ => None,
610 };
611 let constraint_name = format!("fk_{table}_{id}");
612 infos.push(ForeignKeyInfo {
613 constraint_name,
614 column: from_col,
615 foreign_table,
616 foreign_column,
617 on_update,
618 on_delete,
619 });
620 }
621 Ok(infos)
622 }
623}
624
625// ── SqliteTransaction ─────────────────────────────────────────────────────────
626
/// A SQLite transaction backed by raw `BEGIN`/`COMMIT`/`ROLLBACK` statements.
///
/// Holds a guard on the connection-level transaction mutex so that no other
/// async task can start a concurrent `BEGIN` on the same `SqliteConnection`.
/// When dropped without an explicit `commit` or `rollback`, the transaction
/// attempts a best-effort `ROLLBACK` via a background task.
pub struct SqliteTransaction<'a> {
    /// Clone of the connection handle on which `BEGIN` was issued.
    conn: limbo::Connection,
    /// Statement cache shared with the originating `SqliteConnection`.
    stmt_cache: StmtCache,
    /// Guard on the connection's transaction mutex. Never read; it is held
    /// only so the lock is released when this value is dropped.
    _guard: tokio::sync::MutexGuard<'a, ()>,
    /// Tells `Drop` to skip its best-effort `ROLLBACK` once `commit` or
    /// `rollback` has taken over finishing the transaction.
    done: bool,
}
639
640impl<'a> Drop for SqliteTransaction<'a> {
641 fn drop(&mut self) {
642 if !self.done {
643 // Best-effort rollback on implicit drop. We cannot `.await` inside
644 // `drop`, so we spawn a fire-and-forget task. The mutex guard is
645 // released when `SqliteTransaction` is fully dropped (after this
646 // function body returns).
647 let conn = self.conn.clone();
648 tokio::spawn(async move {
649 if let Err(e) = conn.execute("ROLLBACK", LimboParams::None).await {
650 log::warn!("SqliteTransaction drop: ROLLBACK failed: {e}");
651 }
652 });
653 }
654 }
655}
656
657#[async_trait]
658impl<'a> Transaction for SqliteTransaction<'a> {
659 async fn execute(&mut self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
660 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
661 exec_rewritten(&self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
662 .await
663 .map_err(OxiSqlError::from)
664 }
665
666 async fn query(
667 &mut self,
668 sql: &str,
669 params: &[&dyn ToSqlValue],
670 ) -> Result<Vec<Row>, OxiSqlError> {
671 let (rewritten, limbo_params) = rewrite_params(sql, params).map_err(OxiSqlError::from)?;
672 query_rewritten(&self.conn, &rewritten, limbo_params)
673 .await
674 .map_err(OxiSqlError::from)
675 }
676
677 async fn commit(mut self: Box<Self>) -> Result<(), OxiSqlError> {
678 self.done = true;
679 self.conn
680 .execute("COMMIT", LimboParams::None)
681 .await
682 .map_err(|e| OxiSqlError::Other(format!("COMMIT failed: {e}")))?;
683 Ok(())
684 }
685
686 async fn rollback(mut self: Box<Self>) -> Result<(), OxiSqlError> {
687 // Mark done so that Drop does not attempt a second ROLLBACK.
688 self.done = true;
689 self.conn
690 .execute("ROLLBACK", LimboParams::None)
691 .await
692 .map_err(|e| OxiSqlError::Other(format!("ROLLBACK failed: {e}")))?;
693 Ok(())
694 }
695}
696
697// ── SqlitePrepared ────────────────────────────────────────────────────────────
698
/// A prepared statement backed by the connection-level LRU cache.
///
/// On each `execute()` call the cached `limbo::Statement` is retrieved (or
/// compiled fresh on a miss), executed, and returned to the cache. Because
/// `Statement::reset()` now zeroes `n_change`, every execution sees a correct
/// change count without re-parsing the SQL.
///
/// `query()` does not use the cache: it prepares a new statement on every
/// call.
pub struct SqlitePrepared<'a> {
    /// Connection the statement runs against, borrowed from the
    /// `SqliteConnection` that created it.
    conn: &'a limbo::Connection,
    /// Statement cache shared with the originating connection.
    stmt_cache: StmtCache,
    /// SQL text exactly as passed to `prepare`; placeholder rewriting is
    /// applied on each `execute`/`query` call.
    sql: String,
}
710
711#[async_trait]
712impl<'a> PreparedStatement for SqlitePrepared<'a> {
713 async fn execute(&mut self, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
714 let (rewritten, limbo_params) =
715 rewrite_params(&self.sql, params).map_err(OxiSqlError::from)?;
716 exec_rewritten(self.conn, &rewritten, limbo_params, Some(&self.stmt_cache))
717 .await
718 .map_err(OxiSqlError::from)
719 }
720
721 async fn query(&mut self, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
722 let (rewritten, limbo_params) =
723 rewrite_params(&self.sql, params).map_err(OxiSqlError::from)?;
724 query_rewritten(self.conn, &rewritten, limbo_params)
725 .await
726 .map_err(OxiSqlError::from)
727 }
728
729 fn sql(&self) -> &str {
730 &self.sql
731 }
732}