Skip to main content

sqlmodel_frankensqlite/
connection.rs

1//! FrankenSQLite connection implementing `sqlmodel_core::Connection`.
2//!
3//! Wraps `fsqlite::Connection` (which is `!Send` due to `Rc<RefCell<>>`) in
4//! `Arc<Mutex<>>` to satisfy the `Connection: Send + Sync` requirement.
5//! All operations execute synchronously under the mutex, matching the pattern
6//! used by `sqlmodel-sqlite` for its FFI-based wrapper.
7
8#![allow(clippy::cast_possible_truncation)]
9#![allow(clippy::cast_sign_loss)]
10#![allow(clippy::result_large_err)]
11
12use crate::value::{sqlite_to_value, value_to_sqlite};
13use fsqlite_types::value::SqliteValue;
14use sqlmodel_core::{
15    Connection, Cx, IsolationLevel, Outcome, PreparedStatement, Row, TransactionOps, Value,
16    error::{ConnectionError, ConnectionErrorKind, Error, QueryError, QueryErrorKind},
17    row::ColumnInfo,
18};
19use std::future::Future;
20use std::sync::{Arc, Mutex};
21
/// Inner state guarded by a mutex.
///
/// An instance lives inside the `Arc<Mutex<..>>` owned by
/// `FrankenConnection`; the connection's helper methods lock that mutex
/// before touching any of these fields.
struct FrankenInner {
    /// The underlying frankensqlite connection (`!Send`, hence wrapped).
    conn: fsqlite::Connection,
    /// Whether we are currently inside a transaction.
    ///
    /// Set after a successful `BEGIN ...` and cleared after a successful
    /// `COMMIT` or `ROLLBACK`.
    in_transaction: bool,
    /// The last inserted rowid (tracked manually since frankensqlite stubs it).
    ///
    /// Starts at 0 and is refreshed after INSERT statements run through
    /// `execute_sync`.
    last_insert_rowid: i64,
}

// SAFETY: All access to `FrankenInner` goes through the `Mutex`, which
// serializes access. The `Rc<RefCell<>>` inside `fsqlite::Connection` is
// never shared across threads — the mutex ensures single-threaded access.
// NOTE(review): this argument also needs that no `Rc` handle obtained from
// `conn` outlives the lock guard — confirm against the fsqlite API.
unsafe impl Send for FrankenInner {}
36
/// A SQLite connection backed by FrankenSQLite (pure Rust).
///
/// Implements `sqlmodel_core::Connection` and provides sync helper methods
/// (`execute_raw`, `query_sync`, `execute_sync`, etc.) matching the
/// `SqliteConnection` API for drop-in replacement.
pub struct FrankenConnection {
    /// Mutex-guarded connection state (engine handle plus bookkeeping).
    inner: Arc<Mutex<FrankenInner>>,
    /// The path the connection was opened with (`":memory:"` for in-memory).
    path: String,
}

// SAFETY: All access goes through Arc<Mutex<>> — single-thread serialization.
// NOTE(review): since `FrankenInner` is declared `Send`, `Arc<Mutex<FrankenInner>>`
// and `String` should already make this type `Send + Sync` automatically, so
// these two impls look redundant — confirm before removing them.
unsafe impl Send for FrankenConnection {}
unsafe impl Sync for FrankenConnection {}
50
51impl FrankenConnection {
52    /// Open a connection with the given path.
53    ///
54    /// Use `":memory:"` for an in-memory database, or a file path for
55    /// persistent storage.
56    pub fn open(path: impl Into<String>) -> Result<Self, Error> {
57        let path = path.into();
58        let conn = fsqlite::Connection::open(&path).map_err(|e| franken_to_conn_error(&e))?;
59        Ok(Self {
60            inner: Arc::new(Mutex::new(FrankenInner {
61                conn,
62                in_transaction: false,
63                last_insert_rowid: 0,
64            })),
65            path,
66        })
67    }
68
69    /// Open an in-memory database.
70    pub fn open_memory() -> Result<Self, Error> {
71        Self::open(":memory:")
72    }
73
74    /// Open a file-based database.
75    pub fn open_file(path: impl Into<String>) -> Result<Self, Error> {
76        Self::open(path)
77    }
78
79    /// Get the database path.
80    pub fn path(&self) -> &str {
81        &self.path
82    }
83
84    fn close_inner(inner: Arc<Mutex<FrankenInner>>) -> Result<(), Error> {
85        match Arc::try_unwrap(inner) {
86            Ok(mutex) => {
87                let inner = mutex
88                    .into_inner()
89                    .unwrap_or_else(|poisoned| poisoned.into_inner());
90                inner.conn.close().map_err(|e| franken_to_conn_error(&e))
91            }
92            Err(inner) => Err(Error::Connection(ConnectionError {
93                kind: ConnectionErrorKind::Disconnected,
94                message: format!(
95                    "cannot close FrankenConnection cleanly while {} strong references remain",
96                    Arc::strong_count(&inner)
97                ),
98                source: None,
99            })),
100        }
101    }
102
103    /// Close the underlying frankensqlite connection synchronously.
104    pub fn close_sync(self) -> Result<(), Error> {
105        let Self { inner, path: _ } = self;
106        Self::close_inner(inner)
107    }
108
109    /// Execute SQL directly without parameter binding (for DDL, PRAGMAs, etc.)
110    pub fn execute_raw(&self, sql: &str) -> Result<(), Error> {
111        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
112        inner
113            .conn
114            .execute(sql)
115            .map_err(|e| franken_to_query_error(&e, sql))?;
116        Ok(())
117    }
118
119    /// Prepare and execute a query synchronously, returning all rows.
120    pub fn query_sync(&self, sql: &str, params: &[Value]) -> Result<Vec<Row>, Error> {
121        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
122        let sqlite_params: Vec<SqliteValue> = params.iter().map(value_to_sqlite).collect();
123
124        let franken_rows = if sqlite_params.is_empty() {
125            inner.conn.query(sql)
126        } else {
127            inner.conn.query_with_params(sql, &sqlite_params)
128        }
129        .map_err(|e| franken_to_query_error(&e, sql))?;
130
131        // For RETURNING *, get column names from table schema
132        let schema_columns = self.get_returning_star_columns(sql, &inner.conn);
133        Ok(convert_rows_with_schema(
134            &franken_rows,
135            sql,
136            schema_columns.as_deref(),
137        ))
138    }
139
140    /// Get column names for RETURNING * from the table schema.
141    fn get_returning_star_columns(
142        &self,
143        sql: &str,
144        conn: &fsqlite_core::connection::Connection,
145    ) -> Option<Vec<String>> {
146        let upper = sql.to_uppercase();
147
148        // Check if this is a RETURNING * query
149        if !upper.contains(" RETURNING *") && !upper.ends_with("RETURNING *") {
150            return None;
151        }
152
153        // Extract table name
154        let table_name = extract_table_name_for_returning(sql)?;
155
156        // Query PRAGMA table_info to get column names
157        let pragma_sql = format!("PRAGMA table_info({})", table_name);
158        let pragma_rows = match conn.query(&pragma_sql) {
159            Ok(rows) => rows,
160            Err(_) => return None,
161        };
162
163        // PRAGMA table_info returns: cid, name, type, notnull, dflt_value, pk
164        // Column index 1 is the name
165        let columns: Vec<String> = pragma_rows
166            .iter()
167            .filter_map(|row| {
168                row.values().get(1).and_then(|v| match v {
169                    SqliteValue::Text(s) => Some(s.to_string()),
170                    _ => None,
171                })
172            })
173            .collect();
174
175        if columns.is_empty() {
176            None
177        } else {
178            Some(columns)
179        }
180    }
181
182    /// Prepare and execute a statement synchronously, returning rows affected.
183    pub fn execute_sync(&self, sql: &str, params: &[Value]) -> Result<u64, Error> {
184        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
185        let sqlite_params: Vec<SqliteValue> = params.iter().map(value_to_sqlite).collect();
186
187        let count = if sqlite_params.is_empty() {
188            inner.conn.execute(sql)
189        } else {
190            inner.conn.execute_with_params(sql, &sqlite_params)
191        }
192        .map_err(|e| franken_to_query_error(&e, sql))?;
193
194        // Track last_insert_rowid for INSERT statements
195        if is_insert_sql(sql) {
196            // After an INSERT, query last_insert_rowid()
197            if let Ok(rows) = inner.conn.query("SELECT last_insert_rowid()") {
198                if let Some(row) = rows.first() {
199                    if let Some(SqliteValue::Integer(id)) = row.get(0) {
200                        inner.last_insert_rowid = *id;
201                    }
202                }
203            }
204        }
205
206        Ok(count as u64)
207    }
208
209    /// Get the last inserted rowid.
210    pub fn last_insert_rowid(&self) -> i64 {
211        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
212        inner.last_insert_rowid
213    }
214
215    /// Get the number of rows changed by the last statement.
216    pub fn changes(&self) -> i64 {
217        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
218        if let Ok(rows) = inner.conn.query("SELECT changes()") {
219            if let Some(row) = rows.first() {
220                if let Some(SqliteValue::Integer(n)) = row.get(0) {
221                    return *n;
222                }
223            }
224        }
225        0
226    }
227
228    /// Execute an INSERT and return the last inserted rowid.
229    fn insert_sync(&self, sql: &str, params: &[Value]) -> Result<i64, Error> {
230        self.execute_sync(sql, params)?;
231        Ok(self.last_insert_rowid())
232    }
233
234    /// Begin a transaction.
235    fn begin_sync(&self, isolation: IsolationLevel) -> Result<(), Error> {
236        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
237        if inner.in_transaction {
238            return Err(Error::Query(QueryError {
239                kind: QueryErrorKind::Database,
240                sql: None,
241                sqlstate: None,
242                message: "Already in a transaction".to_string(),
243                detail: None,
244                hint: None,
245                position: None,
246                source: None,
247            }));
248        }
249
250        let begin_sql = match isolation {
251            IsolationLevel::Serializable => "BEGIN EXCLUSIVE",
252            IsolationLevel::RepeatableRead | IsolationLevel::ReadCommitted => "BEGIN IMMEDIATE",
253            IsolationLevel::ReadUncommitted => "BEGIN DEFERRED",
254        };
255
256        inner
257            .conn
258            .execute(begin_sql)
259            .map_err(|e| franken_to_query_error(&e, begin_sql))?;
260
261        inner.in_transaction = true;
262        Ok(())
263    }
264
265    /// Commit the current transaction.
266    fn commit_sync(&self) -> Result<(), Error> {
267        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
268        if !inner.in_transaction {
269            return Err(Error::Query(QueryError {
270                kind: QueryErrorKind::Database,
271                sql: None,
272                sqlstate: None,
273                message: "Not in a transaction".to_string(),
274                detail: None,
275                hint: None,
276                position: None,
277                source: None,
278            }));
279        }
280
281        inner
282            .conn
283            .execute("COMMIT")
284            .map_err(|e| franken_to_query_error(&e, "COMMIT"))?;
285
286        inner.in_transaction = false;
287        Ok(())
288    }
289
290    /// Rollback the current transaction.
291    fn rollback_sync(&self) -> Result<(), Error> {
292        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
293        if !inner.in_transaction {
294            return Err(Error::Query(QueryError {
295                kind: QueryErrorKind::Database,
296                sql: None,
297                sqlstate: None,
298                message: "Not in a transaction".to_string(),
299                detail: None,
300                hint: None,
301                position: None,
302                source: None,
303            }));
304        }
305
306        inner
307            .conn
308            .execute("ROLLBACK")
309            .map_err(|e| franken_to_query_error(&e, "ROLLBACK"))?;
310
311        inner.in_transaction = false;
312        Ok(())
313    }
314}
315
316// ── Connection trait impl ─────────────────────────────────────────────────
317
318impl Connection for FrankenConnection {
319    type Tx<'conn>
320        = FrankenTransaction<'conn>
321    where
322        Self: 'conn;
323
324    fn dialect(&self) -> sqlmodel_core::Dialect {
325        sqlmodel_core::Dialect::Sqlite
326    }
327
328    fn query(
329        &self,
330        _cx: &Cx,
331        sql: &str,
332        params: &[Value],
333    ) -> impl Future<Output = Outcome<Vec<Row>, Error>> + Send {
334        let result = self.query_sync(sql, params);
335        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
336    }
337
338    fn query_one(
339        &self,
340        _cx: &Cx,
341        sql: &str,
342        params: &[Value],
343    ) -> impl Future<Output = Outcome<Option<Row>, Error>> + Send {
344        let result = self.query_sync(sql, params).map(|mut rows| rows.pop());
345        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
346    }
347
348    fn execute(
349        &self,
350        _cx: &Cx,
351        sql: &str,
352        params: &[Value],
353    ) -> impl Future<Output = Outcome<u64, Error>> + Send {
354        let result = self.execute_sync(sql, params);
355        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
356    }
357
358    fn insert(
359        &self,
360        _cx: &Cx,
361        sql: &str,
362        params: &[Value],
363    ) -> impl Future<Output = Outcome<i64, Error>> + Send {
364        let result = self.insert_sync(sql, params);
365        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
366    }
367
368    fn batch(
369        &self,
370        _cx: &Cx,
371        statements: &[(String, Vec<Value>)],
372    ) -> impl Future<Output = Outcome<Vec<u64>, Error>> + Send {
373        let mut results = Vec::with_capacity(statements.len());
374        let mut error = None;
375
376        for (sql, params) in statements {
377            match self.execute_sync(sql, params) {
378                Ok(n) => results.push(n),
379                Err(e) => {
380                    error = Some(e);
381                    break;
382                }
383            }
384        }
385
386        async move {
387            match error {
388                Some(e) => Outcome::Err(e),
389                None => Outcome::Ok(results),
390            }
391        }
392    }
393
394    fn begin(&self, cx: &Cx) -> impl Future<Output = Outcome<Self::Tx<'_>, Error>> + Send {
395        self.begin_with(cx, IsolationLevel::default())
396    }
397
398    fn begin_with(
399        &self,
400        _cx: &Cx,
401        isolation: IsolationLevel,
402    ) -> impl Future<Output = Outcome<Self::Tx<'_>, Error>> + Send {
403        let result = self
404            .begin_sync(isolation)
405            .map(|()| FrankenTransaction::new(self));
406        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
407    }
408
409    fn prepare(
410        &self,
411        _cx: &Cx,
412        sql: &str,
413    ) -> impl Future<Output = Outcome<PreparedStatement, Error>> + Send {
414        // Count parameters (simple heuristic: count ?N placeholders)
415        let param_count = count_params(sql);
416        let id = sql.as_ptr() as u64;
417
418        // Try to infer column names from the SQL
419        let columns = infer_column_names(sql);
420
421        let stmt = if columns.is_empty() {
422            PreparedStatement::new(id, sql.to_string(), param_count)
423        } else {
424            PreparedStatement::with_columns(id, sql.to_string(), param_count, columns)
425        };
426
427        async move { Outcome::Ok(stmt) }
428    }
429
430    fn query_prepared(
431        &self,
432        cx: &Cx,
433        stmt: &PreparedStatement,
434        params: &[Value],
435    ) -> impl Future<Output = Outcome<Vec<Row>, Error>> + Send {
436        self.query(cx, stmt.sql(), params)
437    }
438
439    fn execute_prepared(
440        &self,
441        cx: &Cx,
442        stmt: &PreparedStatement,
443        params: &[Value],
444    ) -> impl Future<Output = Outcome<u64, Error>> + Send {
445        self.execute(cx, stmt.sql(), params)
446    }
447
448    fn ping(&self, _cx: &Cx) -> impl Future<Output = Outcome<(), Error>> + Send {
449        let result = self.query_sync("SELECT 1", &[]).map(|_| ());
450        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
451    }
452
453    async fn close(self, _cx: &Cx) -> sqlmodel_core::Result<()> {
454        self.close_sync()
455    }
456}
457
458// ── Transaction ───────────────────────────────────────────────────────────
459
/// A FrankenSQLite transaction.
///
/// Borrows the connection it was started on. Dropping the value while
/// `committed` is still `false` triggers a best-effort rollback.
pub struct FrankenTransaction<'conn> {
    /// The connection this transaction runs on.
    conn: &'conn FrankenConnection,
    /// Set once the transaction has been finished explicitly; despite the
    /// name it is also set by `rollback`, to keep `Drop` from rolling back
    /// a second time.
    committed: bool,
}
465
impl<'conn> FrankenTransaction<'conn> {
    /// Wrap `conn` in a transaction handle that is not yet finished.
    ///
    /// This does not issue `BEGIN` itself; the visible caller (`begin_with`)
    /// only constructs the handle after `begin_sync` has succeeded.
    fn new(conn: &'conn FrankenConnection) -> Self {
        Self {
            conn,
            committed: false,
        }
    }
}
474
475impl Drop for FrankenTransaction<'_> {
476    fn drop(&mut self) {
477        if !self.committed {
478            let _ = self.conn.rollback_sync();
479        }
480    }
481}
482
483impl TransactionOps for FrankenTransaction<'_> {
484    fn query(
485        &self,
486        _cx: &Cx,
487        sql: &str,
488        params: &[Value],
489    ) -> impl Future<Output = Outcome<Vec<Row>, Error>> + Send {
490        let result = self.conn.query_sync(sql, params);
491        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
492    }
493
494    fn query_one(
495        &self,
496        _cx: &Cx,
497        sql: &str,
498        params: &[Value],
499    ) -> impl Future<Output = Outcome<Option<Row>, Error>> + Send {
500        let result = self.conn.query_sync(sql, params).map(|mut rows| rows.pop());
501        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
502    }
503
504    fn execute(
505        &self,
506        _cx: &Cx,
507        sql: &str,
508        params: &[Value],
509    ) -> impl Future<Output = Outcome<u64, Error>> + Send {
510        let result = self.conn.execute_sync(sql, params);
511        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
512    }
513
514    fn savepoint(&self, _cx: &Cx, name: &str) -> impl Future<Output = Outcome<(), Error>> + Send {
515        let quoted = format!("\"{}\"", name.replace('"', "\"\""));
516        let sql = format!("SAVEPOINT {quoted}");
517        let result = self.conn.execute_raw(&sql);
518        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
519    }
520
521    fn rollback_to(&self, _cx: &Cx, name: &str) -> impl Future<Output = Outcome<(), Error>> + Send {
522        let quoted = format!("\"{}\"", name.replace('"', "\"\""));
523        let sql = format!("ROLLBACK TO {quoted}");
524        let result = self.conn.execute_raw(&sql);
525        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
526    }
527
528    fn release(&self, _cx: &Cx, name: &str) -> impl Future<Output = Outcome<(), Error>> + Send {
529        let quoted = format!("\"{}\"", name.replace('"', "\"\""));
530        let sql = format!("RELEASE {quoted}");
531        let result = self.conn.execute_raw(&sql);
532        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
533    }
534
535    async fn commit(mut self, _cx: &Cx) -> Outcome<(), Error> {
536        self.committed = true;
537        self.conn
538            .commit_sync()
539            .map_or_else(Outcome::Err, Outcome::Ok)
540    }
541
542    async fn rollback(mut self, _cx: &Cx) -> Outcome<(), Error> {
543        self.committed = true; // Prevent double rollback in drop
544        self.conn
545            .rollback_sync()
546            .map_or_else(Outcome::Err, Outcome::Ok)
547    }
548}
549
550// ── Helper functions ──────────────────────────────────────────────────────
551
/// Convert frankensqlite rows to sqlmodel-core rows.
///
/// frankensqlite `Row` has no column names, so we infer them from the SQL
/// or fall back to positional names (`_c0`, `_c1`, ...).
///
/// Thin wrapper around `convert_rows_with_schema` that passes no
/// schema-provided column names.
// Not called anywhere in the visible part of this file, hence the allow.
#[allow(dead_code)]
fn convert_rows(franken_rows: &[fsqlite_core::connection::Row], sql: &str) -> Vec<Row> {
    convert_rows_with_schema(franken_rows, sql, None)
}
560
561/// Convert frankensqlite rows to sqlmodel-core rows with optional schema-provided column names.
562///
563/// If `schema_columns` is provided (e.g., from PRAGMA table_info for RETURNING *),
564/// those names are used instead of inferring from SQL.
565fn convert_rows_with_schema(
566    franken_rows: &[fsqlite_core::connection::Row],
567    sql: &str,
568    schema_columns: Option<&[String]>,
569) -> Vec<Row> {
570    if franken_rows.is_empty() {
571        return Vec::new();
572    }
573
574    // Determine column count from first row
575    let col_count = franken_rows[0].values().len();
576
577    // Use schema columns if provided, otherwise infer from SQL
578    let mut col_names = if let Some(schema_cols) = schema_columns {
579        schema_cols.to_vec()
580    } else {
581        infer_column_names(sql)
582    };
583
584    // Pad or trim to match actual column count
585    while col_names.len() < col_count {
586        col_names.push(format!("_c{}", col_names.len()));
587    }
588    col_names.truncate(col_count);
589
590    let columns = Arc::new(ColumnInfo::new(col_names));
591
592    franken_rows
593        .iter()
594        .map(|fr| {
595            let values: Vec<Value> = fr.values().iter().map(sqlite_to_value).collect();
596            Row::with_columns(Arc::clone(&columns), values)
597        })
598        .collect()
599}
600
601/// Infer column names from SQL text.
602///
603/// Handles common patterns:
604/// - `SELECT col1, col2 AS alias, ...`
605/// - `PRAGMA table_info(...)` and other PRAGMA results
606/// - Expression-only SELECT with aliases
607///
608/// Falls back to empty vec if parsing fails.
609fn infer_column_names(sql: &str) -> Vec<String> {
610    let trimmed = sql.trim();
611    let upper = trimmed.to_uppercase();
612
613    // PRAGMA column name lookup
614    if upper.starts_with("PRAGMA") {
615        return infer_pragma_columns(&upper);
616    }
617
618    // For SELECT, try to extract column names from the result columns
619    if upper.starts_with("SELECT") || upper.starts_with("WITH") {
620        return infer_select_columns(trimmed);
621    }
622
623    // For INSERT/UPDATE/DELETE with RETURNING clause
624    if upper.contains(" RETURNING ") || upper.ends_with(" RETURNING *") {
625        return infer_returning_columns(trimmed);
626    }
627
628    Vec::new()
629}
630
/// Infer column names for PRAGMA results.
///
/// `upper_sql` must already be upper-cased and start with `PRAGMA`. A schema
/// qualifier (`PRAGMA MAIN.TABLE_INFO(T)`) is ignored when looking up the
/// pragma.
///
/// Known table-valued pragmas return their documented column list. Any other
/// pragma written without arguments or assignment returns its own lower-cased
/// name as the single column; everything else returns an empty vec.
fn infer_pragma_columns(upper_sql: &str) -> Vec<String> {
    /// Turn a list of literals into owned column names.
    fn names(list: &[&str]) -> Vec<String> {
        list.iter().map(|name| (*name).to_string()).collect()
    }

    // Extract PRAGMA name (e.g., "PRAGMA table_info(x)" -> "table_info")
    let after_pragma = upper_sql
        .strip_prefix("PRAGMA")
        .unwrap_or(upper_sql)
        .trim();
    let qualified_name = after_pragma
        .split(|c: char| c == '(' || c == ';' || c == '=' || c.is_whitespace())
        .next()
        .unwrap_or("")
        .trim();
    // `schema.pragma` -> `pragma`
    let pragma_name = qualified_name.rsplit('.').next().unwrap_or(qualified_name);

    match pragma_name {
        "TABLE_INFO" => names(&["cid", "name", "type", "notnull", "dflt_value", "pk"]),
        // table_xinfo reports the table_info columns plus `hidden`.
        "TABLE_XINFO" => names(&[
            "cid",
            "name",
            "type",
            "notnull",
            "dflt_value",
            "pk",
            "hidden",
        ]),
        "INDEX_LIST" => names(&["seq", "name", "unique", "origin", "partial"]),
        "INDEX_INFO" => names(&["seqno", "cid", "name"]),
        // index_xinfo extends index_info with sort order, collation and key flag.
        "INDEX_XINFO" => names(&["seqno", "cid", "name", "desc", "coll", "key"]),
        "FOREIGN_KEY_LIST" => names(&[
            "id",
            "seq",
            "table",
            "from",
            "to",
            "on_update",
            "on_delete",
            "match",
        ]),
        "DATABASE_LIST" => names(&["seq", "name", "file"]),
        "COMPILE_OPTIONS" => names(&["compile_option"]),
        "COLLATION_LIST" => names(&["seq", "name"]),
        "INTEGRITY_CHECK" => names(&["integrity_check"]),
        "QUICK_CHECK" => names(&["quick_check"]),
        "WAL_CHECKPOINT" => names(&["busy", "log", "checkpointed"]),
        "FREELIST_COUNT" => names(&["freelist_count"]),
        "PAGE_COUNT" => names(&["page_count"]),
        // Unknown pragma with arguments or an assignment: nothing to infer.
        _ if after_pragma.contains('(') || after_pragma.contains('=') => Vec::new(),
        // For simple PRAGMA (e.g., PRAGMA journal_mode), return the pragma name
        _ => vec![pragma_name.to_lowercase()],
    }
}
690
691/// Infer column names from a SELECT statement.
692///
693/// Extracts aliases and bare column references from the result column list.
694fn infer_select_columns(sql: &str) -> Vec<String> {
695    // Find the columns between SELECT and FROM (or end of statement)
696    let upper = sql.to_uppercase();
697
698    // Skip past WITH clause if present
699    let select_start = if upper.starts_with("WITH") {
700        // Find the actual SELECT after the CTE
701        if let Some(pos) = find_main_select(&upper) {
702            pos
703        } else {
704            return Vec::new();
705        }
706    } else {
707        0
708    };
709
710    let after_select = &sql[select_start..];
711    let upper_after = &upper[select_start..];
712
713    // Skip SELECT [DISTINCT] keyword
714    let col_start = if upper_after.starts_with("SELECT DISTINCT") {
715        15
716    } else if upper_after.starts_with("SELECT ALL") {
717        10
718    } else if upper_after.starts_with("SELECT") {
719        6
720    } else {
721        return Vec::new();
722    };
723
724    let cols_str = &after_select[col_start..];
725
726    // Find the FROM clause (respecting parentheses depth)
727    let from_pos = find_keyword_at_depth_zero(cols_str, "FROM");
728    let cols_region = if let Some(pos) = from_pos {
729        &cols_str[..pos]
730    } else {
731        // No FROM: everything after SELECT is result columns (minus ORDER BY, LIMIT, etc.)
732        let end_pos = find_keyword_at_depth_zero(cols_str, "ORDER")
733            .or_else(|| find_keyword_at_depth_zero(cols_str, "LIMIT"))
734            .or_else(|| find_keyword_at_depth_zero(cols_str, "GROUP"))
735            .or_else(|| find_keyword_at_depth_zero(cols_str, "HAVING"))
736            .or_else(|| cols_str.find(';'));
737        if let Some(pos) = end_pos {
738            &cols_str[..pos]
739        } else {
740            cols_str
741        }
742    };
743
744    // Split by commas (respecting parentheses depth)
745    let columns = split_at_depth_zero(cols_region, ',');
746
747    columns
748        .iter()
749        .map(|col| extract_column_name(col.trim()))
750        .collect()
751}
752
753/// Infer column names from a RETURNING clause in INSERT/UPDATE/DELETE.
754///
755/// For `RETURNING *`, we return `["*"]` and let the caller handle expansion.
756/// For explicit columns, we parse them like SELECT columns.
757fn infer_returning_columns(sql: &str) -> Vec<String> {
758    let upper = sql.to_uppercase();
759
760    // Find RETURNING keyword
761    let returning_pos = if let Some(pos) = find_keyword_at_depth_zero(&upper, "RETURNING") {
762        pos
763    } else {
764        return Vec::new();
765    };
766
767    // Extract the part after RETURNING
768    let after_returning = &sql[returning_pos + 9..].trim_start();
769
770    // Handle "RETURNING *"
771    if after_returning.trim() == "*"
772        || after_returning.starts_with("* ")
773        || after_returning.starts_with("*;")
774    {
775        // For RETURNING *, we need to get column names from the table.
776        // Extract table name from INSERT INTO or UPDATE or DELETE FROM.
777        if let Some(table_name) = extract_table_name_for_returning(sql) {
778            // Return a marker that indicates we need schema lookup
779            return vec![format!("__returning_star_table:{table_name}")];
780        }
781        return vec!["*".to_string()];
782    }
783
784    // Parse explicit column list (same logic as SELECT columns)
785    // Find end markers (semicolon or end of string)
786    let end_pos = after_returning.find(';').unwrap_or(after_returning.len());
787    let cols_region = &after_returning[..end_pos];
788
789    // Split by commas at depth 0
790    let columns = split_at_depth_zero(cols_region, ',');
791
792    columns
793        .iter()
794        .map(|col| extract_column_name(col.trim()))
795        .collect()
796}
797
798/// Extract the table name from INSERT INTO, UPDATE, or DELETE FROM for RETURNING.
799fn extract_table_name_for_returning(sql: &str) -> Option<String> {
800    let upper = sql.to_uppercase();
801
802    // INSERT INTO table_name (...)
803    if upper.starts_with("INSERT") {
804        if let Some(into_pos) = upper.find(" INTO ") {
805            let after_into = &sql[into_pos + 6..].trim_start();
806            // Table name is the next word (may be quoted)
807            let table = extract_identifier(after_into);
808            if !table.is_empty() {
809                return Some(table);
810            }
811        }
812    }
813
814    // UPDATE table_name SET ...
815    if upper.starts_with("UPDATE") {
816        let after_update = &sql[6..].trim_start();
817        let table = extract_identifier(after_update);
818        if !table.is_empty() {
819            return Some(table);
820        }
821    }
822
823    // DELETE FROM table_name ...
824    if upper.starts_with("DELETE") {
825        if let Some(from_pos) = upper.find(" FROM ") {
826            let after_from = &sql[from_pos + 6..].trim_start();
827            let table = extract_identifier(after_from);
828            if !table.is_empty() {
829                return Some(table);
830            }
831        }
832    }
833
834    None
835}
836
/// Extract an identifier (table/column name) from the start of a string.
///
/// Leading whitespace is skipped. Three quoting styles are understood:
/// `"name"`, `` `name` `` and `[name]`. Inside the first two, a doubled quote
/// character stands for one literal quote (`"a""b"` is `a"b`). The returned
/// name never includes the surrounding quotes.
///
/// An unquoted identifier is the leading run of alphanumeric characters and
/// underscores.
///
/// Returns an empty string when the input is blank, a quoted identifier is
/// not terminated, or the text does not start with an identifier character.
fn extract_identifier(s: &str) -> String {
    let trimmed = s.trim_start();

    let closing = match trimmed.chars().next() {
        None => return String::new(),
        Some('"') => '"',
        Some('`') => '`',
        Some('[') => ']',
        Some(_) => {
            // Unquoted identifier
            let end = trimmed
                .find(|c: char| !c.is_alphanumeric() && c != '_')
                .unwrap_or(trimmed.len());
            return trimmed[..end].to_string();
        }
    };

    // Quoted identifier: every opening quote handled above is one byte long.
    let mut name = String::new();
    let mut rest = trimmed[1..].chars().peekable();
    while let Some(c) = rest.next() {
        if c != closing {
            name.push(c);
            continue;
        }
        // A doubled quote is an escaped quote; brackets have no such escape.
        if closing != ']' && rest.peek() == Some(&closing) {
            rest.next();
            name.push(closing);
            continue;
        }
        return name;
    }

    // Ran out of input before the closing quote.
    String::new()
}
859
860/// Extract a column name or alias from a result column expression.
861fn extract_column_name(col_expr: &str) -> String {
862    let trimmed = col_expr.trim();
863
864    // Check for AS alias (case-insensitive) — search backwards to handle
865    // expressions containing "AS" in sub-expressions.
866    // We need to find " AS " at depth 0.
867    if let Some(as_pos) = find_last_as_at_depth_zero(trimmed) {
868        let alias = trimmed[as_pos + 4..].trim().trim_matches('"');
869        return alias.to_string();
870    }
871
872    // Star expansion — return *
873    if trimmed == "*" {
874        return "*".to_string();
875    }
876
877    // Table.column — return just column
878    if let Some(dot_pos) = trimmed.rfind('.') {
879        return trimmed[dot_pos + 1..].trim_matches('"').to_string();
880    }
881
882    // Bare identifier
883    trimmed.trim_matches('"').to_string()
884}
885
/// Locate the final ` AS ` (space, `A`/`a`, `S`/`s`, space) that is not
/// nested inside parentheses.
///
/// Returns the byte index of the leading space, or `None` if there is no
/// such occurrence. Only the space character counts as a separator here;
/// tabs and newlines do not.
fn find_last_as_at_depth_zero(s: &str) -> Option<usize> {
    let raw = s.as_bytes();
    let mut nesting = 0i32;
    let mut found = None;

    for (idx, &byte) in raw.iter().enumerate() {
        match byte {
            b'(' => nesting += 1,
            b')' => nesting -= 1,
            // A candidate starts at a space outside all parentheses and
            // needs three more bytes: "as " in either case.
            b' ' if nesting == 0 => {
                let is_alias_marker = raw
                    .get(idx + 1..idx + 4)
                    .is_some_and(|tail| tail.eq_ignore_ascii_case(b"as "));
                if is_alias_marker {
                    found = Some(idx);
                }
            }
            _ => {}
        }
    }

    found
}
916
/// Find a keyword at parentheses depth 0.
///
/// The comparison is ASCII case-insensitive and a match is only accepted when
/// it is not part of a longer word (ASCII letters, digits and `_` count as
/// word characters). Returns the byte offset of the first match **within
/// `s`**, so the result can be used directly to slice `s`.
fn find_keyword_at_depth_zero(s: &str, keyword: &str) -> Option<usize> {
    // Work on the original bytes: Unicode upper-casing can change the byte
    // length of non-ASCII text, which would shift every later offset.
    let haystack = s.as_bytes();
    let needle = keyword.as_bytes();
    let is_word_char = |b: u8| b.is_ascii_alphanumeric() || b == b'_';
    let mut depth = 0i32;

    for (i, c) in s.char_indices() {
        match c {
            '(' => depth += 1,
            ')' => depth -= 1,
            _ => {}
        }
        if depth != 0 {
            continue;
        }

        let end = i + needle.len();
        let is_match = haystack
            .get(i..end)
            .map_or(false, |window| window.eq_ignore_ascii_case(needle));
        if !is_match {
            continue;
        }

        // Ensure it's a word boundary on both sides.
        let before_ok = i == 0 || !is_word_char(haystack[i - 1]);
        let after_ok = end >= haystack.len() || !is_word_char(haystack[end]);
        if before_ok && after_ok {
            return Some(i);
        }
    }
    None
}
942
/// Split `s` on every `delim` that sits outside all parentheses.
///
/// The delimiter itself is dropped; the returned slices borrow from `s` and
/// always include the (possibly empty) remainder after the last delimiter.
fn split_at_depth_zero(s: &str, delim: char) -> Vec<&str> {
    let mut nesting = 0i32;
    let mut segment_start = 0;
    let mut pieces: Vec<&str> = Vec::new();

    for (idx, ch) in s.char_indices() {
        if ch == '(' {
            nesting += 1;
        } else if ch == ')' {
            nesting -= 1;
        } else if ch == delim && nesting == 0 {
            pieces.push(&s[segment_start..idx]);
            // Resume right after the delimiter, whatever its encoded width.
            segment_start = idx + ch.len_utf8();
        }
    }

    pieces.push(&s[segment_start..]);
    pieces
}
963
/// Find the position of the main SELECT in a WITH ... SELECT statement.
///
/// `upper` is expected to be the upper-cased statement beginning with `WITH`;
/// the first four bytes are skipped unconditionally. Returns the byte offset
/// of the first `SELECT` keyword outside all parentheses, or `None` if there
/// is none. `SELECT` only counts when it stands alone as a word, so CTE names
/// such as `SELECTED` or `T_SELECT` are not mistaken for the keyword.
fn find_main_select(upper: &str) -> Option<usize> {
    const KEYWORD: &[u8] = b"SELECT";
    let bytes = upper.as_bytes();
    let is_word_byte = |b: u8| b.is_ascii_alphanumeric() || b == b'_';
    let mut depth = 0i32;

    // Walk past CTE definitions (respecting parentheses); skip(4) jumps "WITH".
    for (i, &b) in bytes.iter().enumerate().skip(4) {
        match b {
            b'(' => depth += 1,
            b')' => depth -= 1,
            b'S' if depth == 0 && bytes[i..].starts_with(KEYWORD) => {
                // i >= 4 here, so looking one byte back is always in bounds.
                let before_ok = !is_word_byte(bytes[i - 1]);
                let after_ok = bytes
                    .get(i + KEYWORD.len())
                    .map_or(true, |&next| !is_word_byte(next));
                if before_ok && after_ok {
                    return Some(i);
                }
            }
            _ => {}
        }
    }
    None
}
984
/// Report whether `sql`, ignoring surrounding whitespace and letter case,
/// begins with one of the row-inserting statement prefixes.
fn is_insert_sql(sql: &str) -> bool {
    const PREFIXES: [&str; 3] = ["INSERT", "REPLACE", "INSERT OR"];

    let normalized = sql.trim().to_uppercase();
    PREFIXES
        .iter()
        .any(|&prefix| normalized.starts_with(prefix))
}
992
/// Count parameter placeholders in SQL (?1, ?2, etc. or bare ?).
///
/// Follows SQLite's numbering rule: `?NNN` refers to parameter `NNN`, and a
/// bare `?` takes the number one greater than the largest assigned so far.
/// The result is the largest parameter number, i.e. how many values must be
/// bound. Placeholders inside single- or double-quoted text, `-- ...` line
/// comments and `/* ... */` block comments are ignored. Oversized numbers
/// saturate at `usize::MAX` instead of overflowing.
fn count_params(sql: &str) -> usize {
    let bytes = sql.as_bytes();
    let mut highest = 0usize;
    let mut i = 0;

    while i < bytes.len() {
        match bytes[i] {
            // Quoted text: skip to the closing quote. A doubled quote simply
            // closes one region and immediately opens the next.
            quote @ (b'\'' | b'"') => {
                i += 1;
                while i < bytes.len() && bytes[i] != quote {
                    i += 1;
                }
                i += 1;
            }
            // Line comment: skip to end of line.
            b'-' if bytes.get(i + 1) == Some(&b'-') => {
                while i < bytes.len() && bytes[i] != b'\n' {
                    i += 1;
                }
            }
            // Block comment: skip past the terminating "*/".
            b'/' if bytes.get(i + 1) == Some(&b'*') => {
                i += 2;
                while i < bytes.len() && !(bytes[i] == b'*' && bytes.get(i + 1) == Some(&b'/')) {
                    i += 1;
                }
                i += 2;
            }
            b'?' => {
                i += 1;
                let digits_start = i;
                let mut num = 0usize;
                while i < bytes.len() && bytes[i].is_ascii_digit() {
                    num = num
                        .saturating_mul(10)
                        .saturating_add(usize::from(bytes[i] - b'0'));
                    i += 1;
                }
                highest = if i == digits_start {
                    // Bare `?`: next number after the largest seen so far.
                    highest.saturating_add(1)
                } else {
                    highest.max(num)
                };
            }
            _ => i += 1,
        }
    }

    highest
}
1022
1023// ── Error conversion ──────────────────────────────────────────────────────
1024
1025fn franken_to_conn_error(e: &fsqlite_error::FrankenError) -> Error {
1026    Error::Connection(ConnectionError {
1027        kind: ConnectionErrorKind::Connect,
1028        message: e.to_string(),
1029        source: None,
1030    })
1031}
1032
1033fn franken_to_query_error(e: &fsqlite_error::FrankenError, sql: &str) -> Error {
1034    use fsqlite_error::FrankenError;
1035
1036    let kind = match e {
1037        FrankenError::UniqueViolation { .. } | FrankenError::NotNullViolation { .. } => {
1038            QueryErrorKind::Constraint
1039        }
1040        FrankenError::ForeignKeyViolation { .. } | FrankenError::CheckViolation { .. } => {
1041            QueryErrorKind::Constraint
1042        }
1043        FrankenError::WriteConflict { .. } | FrankenError::SerializationFailure { .. } => {
1044            QueryErrorKind::Deadlock
1045        }
1046        FrankenError::SyntaxError { .. } => QueryErrorKind::Syntax,
1047        FrankenError::QueryReturnedNoRows => QueryErrorKind::NotFound,
1048        _ => QueryErrorKind::Database,
1049    };
1050
1051    Error::Query(QueryError {
1052        kind,
1053        sql: Some(sql.to_string()),
1054        sqlstate: None,
1055        message: e.to_string(),
1056        detail: None,
1057        hint: None,
1058        position: None,
1059        source: None,
1060    })
1061}
1062
1063#[cfg(test)]
1064mod tests {
1065    use super::*;
1066
1067    #[test]
1068    fn open_memory_succeeds() {
1069        let conn = FrankenConnection::open_memory().expect("should open in-memory db");
1070        assert_eq!(conn.path(), ":memory:");
1071    }
1072
1073    #[test]
1074    fn close_sync_succeeds() {
1075        let conn = FrankenConnection::open_memory().expect("should open in-memory db");
1076        conn.close_sync()
1077            .expect("close_sync should close the underlying frankensqlite connection");
1078    }
1079
1080    #[test]
1081    fn execute_raw_create_table() {
1082        let conn = FrankenConnection::open_memory().unwrap();
1083        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT)")
1084            .unwrap();
1085    }
1086
1087    #[test]
1088    fn query_sync_basic() {
1089        let conn = FrankenConnection::open_memory().unwrap();
1090        let rows = conn.query_sync("SELECT 1 + 2, 'hello'", &[]).unwrap();
1091        assert_eq!(rows.len(), 1);
1092        assert_eq!(rows[0].get(0), Some(&Value::BigInt(3)));
1093        assert_eq!(rows[0].get(1), Some(&Value::Text("hello".into())));
1094    }
1095
1096    #[test]
1097    fn execute_sync_insert() {
1098        let conn = FrankenConnection::open_memory().unwrap();
1099        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1100            .unwrap();
1101        let count = conn
1102            .execute_sync(
1103                "INSERT INTO t (val) VALUES (?1)",
1104                &[Value::Text("test".into())],
1105            )
1106            .unwrap();
1107        assert_eq!(count, 1);
1108    }
1109
1110    #[test]
1111    fn query_with_params() {
1112        let conn = FrankenConnection::open_memory().unwrap();
1113        let rows = conn
1114            .query_sync("SELECT ?1 + ?2", &[Value::BigInt(10), Value::BigInt(20)])
1115            .unwrap();
1116        assert_eq!(rows.len(), 1);
1117        assert_eq!(rows[0].get(0), Some(&Value::BigInt(30)));
1118    }
1119
1120    #[test]
1121    fn transaction_commit() {
1122        let conn = FrankenConnection::open_memory().unwrap();
1123        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1124            .unwrap();
1125
1126        conn.begin_sync(IsolationLevel::ReadCommitted).unwrap();
1127        conn.execute_sync(
1128            "INSERT INTO t (val) VALUES (?1)",
1129            &[Value::Text("a".into())],
1130        )
1131        .unwrap();
1132        conn.commit_sync().unwrap();
1133
1134        let rows = conn.query_sync("SELECT val FROM t", &[]).unwrap();
1135        assert_eq!(rows.len(), 1);
1136    }
1137
1138    #[test]
1139    fn transaction_rollback() {
1140        let conn = FrankenConnection::open_memory().unwrap();
1141        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1142            .unwrap();
1143
1144        conn.begin_sync(IsolationLevel::ReadCommitted).unwrap();
1145        conn.execute_sync(
1146            "INSERT INTO t (val) VALUES (?1)",
1147            &[Value::Text("a".into())],
1148        )
1149        .unwrap();
1150        conn.rollback_sync().unwrap();
1151
1152        let rows = conn.query_sync("SELECT val FROM t", &[]).unwrap();
1153        assert_eq!(rows.len(), 0);
1154    }
1155
1156    #[test]
1157    fn dialect_is_sqlite() {
1158        let conn = FrankenConnection::open_memory().unwrap();
1159        assert_eq!(conn.dialect(), sqlmodel_core::Dialect::Sqlite);
1160    }
1161
1162    #[test]
1163    fn count_params_numbered() {
1164        assert_eq!(count_params("SELECT ?1, ?2, ?3"), 3);
1165        assert_eq!(count_params("INSERT INTO t VALUES (?1, ?2)"), 2);
1166    }
1167
1168    #[test]
1169    fn count_params_bare() {
1170        assert_eq!(count_params("SELECT ?, ?"), 2);
1171    }
1172
1173    #[test]
1174    fn count_params_none() {
1175        assert_eq!(count_params("SELECT 1"), 0);
1176    }
1177
1178    #[test]
1179    fn infer_select_column_names() {
1180        let names = infer_column_names("SELECT id, name AS username, count(*) AS total FROM t");
1181        assert_eq!(names, vec!["id", "username", "total"]);
1182    }
1183
1184    #[test]
1185    fn infer_pragma_table_info() {
1186        let names = infer_column_names("PRAGMA table_info(users)");
1187        assert!(names.contains(&"name".to_string()));
1188        assert!(names.contains(&"type".to_string()));
1189    }
1190
1191    #[test]
1192    fn infer_expression_select() {
1193        let names = infer_column_names("SELECT 1 + 2 AS result");
1194        assert_eq!(names, vec!["result"]);
1195    }
1196
1197    #[test]
1198    fn ping_succeeds() {
1199        let conn = FrankenConnection::open_memory().unwrap();
1200        let result = conn.query_sync("SELECT 1", &[]);
1201        assert!(result.is_ok());
1202    }
1203
1204    #[test]
1205    fn multiple_statements_in_execute_raw() {
1206        let conn = FrankenConnection::open_memory().unwrap();
1207        conn.execute_raw(
1208            "CREATE TABLE a (id INTEGER PRIMARY KEY); CREATE TABLE b (id INTEGER PRIMARY KEY)",
1209        )
1210        .unwrap();
1211        // Verify both tables exist by inserting into them
1212        conn.execute_sync("INSERT INTO a (id) VALUES (1)", &[])
1213            .unwrap();
1214        conn.execute_sync("INSERT INTO b (id) VALUES (1)", &[])
1215            .unwrap();
1216    }
1217
1218    #[test]
1219    fn insert_returns_rowid() {
1220        let conn = FrankenConnection::open_memory().unwrap();
1221        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1222            .unwrap();
1223        // Insert and verify via query
1224        conn.execute_sync(
1225            "INSERT INTO t (val) VALUES (?1)",
1226            &[Value::Text("a".into())],
1227        )
1228        .unwrap();
1229        let rows = conn.query_sync("SELECT id FROM t", &[]).unwrap();
1230        assert_eq!(rows.len(), 1);
1231        // Verify we got a row back (auto-increment may not produce the
1232        // same values as C SQLite, but row should exist)
1233        assert!(rows[0].get(0).is_some());
1234    }
1235
1236    // ── BEGIN CONCURRENT tests ────────────────────────────────────────────
1237
1238    #[test]
1239    fn begin_concurrent_basic() {
1240        let conn = FrankenConnection::open_memory().unwrap();
1241        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1242            .unwrap();
1243        conn.execute_raw("BEGIN CONCURRENT").unwrap();
1244        conn.execute_raw("INSERT INTO t VALUES (1, 'hello')")
1245            .unwrap();
1246        conn.execute_raw("COMMIT").unwrap();
1247
1248        let rows = conn
1249            .query_sync("SELECT val FROM t WHERE id = 1", &[])
1250            .unwrap();
1251        assert_eq!(rows.len(), 1);
1252        assert_eq!(rows[0].get(0), Some(&Value::Text("hello".into())));
1253    }
1254
1255    #[test]
1256    fn begin_concurrent_rollback() {
1257        let conn = FrankenConnection::open_memory().unwrap();
1258        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1259            .unwrap();
1260        conn.execute_raw("BEGIN CONCURRENT").unwrap();
1261        conn.execute_raw("INSERT INTO t VALUES (1, 'gone')")
1262            .unwrap();
1263        conn.execute_raw("ROLLBACK").unwrap();
1264
1265        let rows = conn.query_sync("SELECT count(*) FROM t", &[]).unwrap();
1266        assert_eq!(rows[0].get(0), Some(&Value::BigInt(0)));
1267    }
1268
1269    #[test]
1270    fn begin_concurrent_with_params() {
1271        let conn = FrankenConnection::open_memory().unwrap();
1272        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1273            .unwrap();
1274        conn.execute_raw("BEGIN CONCURRENT").unwrap();
1275        conn.execute_sync(
1276            "INSERT INTO t VALUES (?1, ?2)",
1277            &[Value::BigInt(1), Value::Text("parameterized".into())],
1278        )
1279        .unwrap();
1280        conn.execute_raw("COMMIT").unwrap();
1281
1282        let rows = conn
1283            .query_sync("SELECT val FROM t WHERE id = ?1", &[Value::BigInt(1)])
1284            .unwrap();
1285        assert_eq!(rows.len(), 1);
1286        assert_eq!(rows[0].get(0), Some(&Value::Text("parameterized".into())));
1287    }
1288
1289    #[test]
1290    fn begin_concurrent_multiple_inserts() {
1291        let conn = FrankenConnection::open_memory().unwrap();
1292        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1293            .unwrap();
1294        conn.execute_raw("BEGIN CONCURRENT").unwrap();
1295        for i in 1..=100 {
1296            conn.execute_sync(
1297                "INSERT INTO t VALUES (?1, ?2)",
1298                &[Value::BigInt(i), Value::Text(format!("row_{i}"))],
1299            )
1300            .unwrap();
1301        }
1302        conn.execute_raw("COMMIT").unwrap();
1303
1304        let rows = conn.query_sync("SELECT count(*) FROM t", &[]).unwrap();
1305        assert_eq!(rows[0].get(0), Some(&Value::BigInt(100)));
1306    }
1307
1308    // ── Isolation level tests ─────────────────────────────────────────────
1309
1310    #[test]
1311    fn begin_serializable_uses_exclusive() {
1312        let conn = FrankenConnection::open_memory().unwrap();
1313        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY)")
1314            .unwrap();
1315        conn.begin_sync(IsolationLevel::Serializable).unwrap();
1316        conn.execute_sync("INSERT INTO t VALUES (1)", &[]).unwrap();
1317        conn.commit_sync().unwrap();
1318        let rows = conn.query_sync("SELECT count(*) FROM t", &[]).unwrap();
1319        assert_eq!(rows[0].get(0), Some(&Value::BigInt(1)));
1320    }
1321
1322    #[test]
1323    fn begin_read_uncommitted_uses_deferred() {
1324        let conn = FrankenConnection::open_memory().unwrap();
1325        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY)")
1326            .unwrap();
1327        conn.begin_sync(IsolationLevel::ReadUncommitted).unwrap();
1328        conn.execute_sync("INSERT INTO t VALUES (1)", &[]).unwrap();
1329        conn.commit_sync().unwrap();
1330        let rows = conn.query_sync("SELECT count(*) FROM t", &[]).unwrap();
1331        assert_eq!(rows[0].get(0), Some(&Value::BigInt(1)));
1332    }
1333
1334    #[test]
1335    fn double_begin_returns_error() {
1336        let conn = FrankenConnection::open_memory().unwrap();
1337        conn.begin_sync(IsolationLevel::ReadCommitted).unwrap();
1338        let err = conn.begin_sync(IsolationLevel::ReadCommitted).unwrap_err();
1339        assert!(err.to_string().contains("Already in a transaction"));
1340    }
1341
1342    #[test]
1343    fn commit_without_begin_returns_error() {
1344        let conn = FrankenConnection::open_memory().unwrap();
1345        let err = conn.commit_sync().unwrap_err();
1346        assert!(err.to_string().contains("Not in a transaction"));
1347    }
1348
1349    #[test]
1350    fn rollback_without_begin_returns_error() {
1351        let conn = FrankenConnection::open_memory().unwrap();
1352        let err = conn.rollback_sync().unwrap_err();
1353        assert!(err.to_string().contains("Not in a transaction"));
1354    }
1355
1356    // ── Savepoint tests ──────────────────────────────────────────────────
1357
1358    #[test]
1359    fn savepoint_and_release() {
1360        let conn = FrankenConnection::open_memory().unwrap();
1361        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1362            .unwrap();
1363        conn.execute_raw("BEGIN CONCURRENT").unwrap();
1364        conn.execute_raw("INSERT INTO t VALUES (1, 'a')").unwrap();
1365        conn.execute_raw("SAVEPOINT sp1").unwrap();
1366        conn.execute_raw("INSERT INTO t VALUES (2, 'b')").unwrap();
1367        conn.execute_raw("RELEASE sp1").unwrap();
1368        conn.execute_raw("COMMIT").unwrap();
1369
1370        let rows = conn.query_sync("SELECT count(*) FROM t", &[]).unwrap();
1371        assert_eq!(rows[0].get(0), Some(&Value::BigInt(2)));
1372    }
1373
1374    #[test]
1375    fn savepoint_rollback_to() {
1376        let conn = FrankenConnection::open_memory().unwrap();
1377        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1378            .unwrap();
1379        conn.execute_raw("BEGIN CONCURRENT").unwrap();
1380        conn.execute_raw("INSERT INTO t VALUES (1, 'keep')")
1381            .unwrap();
1382        conn.execute_raw("SAVEPOINT sp1").unwrap();
1383        conn.execute_raw("INSERT INTO t VALUES (2, 'discard')")
1384            .unwrap();
1385        conn.execute_raw("ROLLBACK TO sp1").unwrap();
1386        conn.execute_raw("COMMIT").unwrap();
1387
1388        let rows = conn.query_sync("SELECT count(*) FROM t", &[]).unwrap();
1389        assert_eq!(rows[0].get(0), Some(&Value::BigInt(1)));
1390        let rows = conn
1391            .query_sync("SELECT val FROM t WHERE id = 1", &[])
1392            .unwrap();
1393        assert_eq!(rows[0].get(0), Some(&Value::Text("keep".into())));
1394    }
1395
1396    // ── File-based connection test ────────────────────────────────────────
1397
1398    #[test]
1399    fn file_based_connection() {
1400        let dir = std::env::temp_dir().join("sqlmodel_franken_test");
1401        let _ = std::fs::create_dir_all(&dir);
1402        let db_path = dir.join("test_file.db");
1403        let path_str = db_path.display().to_string();
1404
1405        // Clean up from previous runs
1406        let _ = std::fs::remove_file(&db_path);
1407
1408        {
1409            let conn = FrankenConnection::open_file(&path_str).unwrap();
1410            conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1411                .unwrap();
1412            conn.execute_raw("BEGIN CONCURRENT").unwrap();
1413            conn.execute_sync("INSERT INTO t VALUES (1, 'persistent')", &[])
1414                .unwrap();
1415            conn.execute_raw("COMMIT").unwrap();
1416        }
1417
1418        // Reopen and verify data persisted
1419        {
1420            let conn = FrankenConnection::open_file(&path_str).unwrap();
1421            let rows = conn
1422                .query_sync("SELECT val FROM t WHERE id = 1", &[])
1423                .unwrap();
1424            assert_eq!(rows.len(), 1);
1425            assert_eq!(rows[0].get(0), Some(&Value::Text("persistent".into())));
1426        }
1427
1428        let _ = std::fs::remove_file(&db_path);
1429    }
1430
1431    // ── Error mapping tests ──────────────────────────────────────────────
1432
1433    #[test]
1434    fn invalid_sql_returns_query_error() {
1435        let conn = FrankenConnection::open_memory().unwrap();
1436        let err = conn.execute_raw("SELECTT 1").unwrap_err();
1437        // frankensqlite returns a Database-level error for unrecognized statements
1438        match &err {
1439            Error::Query(qe) => {
1440                assert!(
1441                    qe.kind == QueryErrorKind::Syntax || qe.kind == QueryErrorKind::Database,
1442                    "expected Syntax or Database, got: {:?}",
1443                    qe.kind
1444                );
1445            }
1446            other => panic!("expected Query error, got: {other}"),
1447        }
1448    }
1449
1450    #[test]
1451    fn error_type_mapping_write_conflict() {
1452        // Verify that WriteConflict maps to Deadlock kind
1453        use fsqlite_error::FrankenError;
1454        let err = FrankenError::WriteConflict {
1455            page: 42,
1456            holder: 99,
1457        };
1458        let mapped = franken_to_query_error(&err, "COMMIT");
1459        match mapped {
1460            Error::Query(qe) => assert_eq!(qe.kind, QueryErrorKind::Deadlock),
1461            other => panic!("expected Deadlock error, got: {other}"),
1462        }
1463    }
1464
1465    #[test]
1466    fn error_type_mapping_serialization_failure() {
1467        use fsqlite_error::FrankenError;
1468        let err = FrankenError::SerializationFailure { page: 7 };
1469        let mapped = franken_to_query_error(&err, "COMMIT");
1470        match mapped {
1471            Error::Query(qe) => assert_eq!(qe.kind, QueryErrorKind::Deadlock),
1472            other => panic!("expected Deadlock error, got: {other}"),
1473        }
1474    }
1475
1476    // ── Column inference edge cases ──────────────────────────────────────
1477
1478    #[test]
1479    fn infer_columns_star_select() {
1480        let names = infer_column_names("SELECT * FROM t");
1481        assert_eq!(names, vec!["*"]);
1482    }
1483
1484    #[test]
1485    fn infer_columns_table_qualified() {
1486        let names = infer_column_names("SELECT t.id, t.name FROM t");
1487        assert_eq!(names, vec!["id", "name"]);
1488    }
1489
1490    #[test]
1491    fn infer_columns_table_qualified_with_alias() {
1492        // This is the pattern used in mcp-agent-mail-db queries
1493        let names = infer_column_names(
1494            "SELECT m.id, m.subject, a.name as from_name, m.body_md FROM messages m JOIN agents a ON a.id = m.sender_id",
1495        );
1496        assert_eq!(names, vec!["id", "subject", "from_name", "body_md"]);
1497    }
1498
1499    #[test]
1500    fn infer_columns_lowercase_as() {
1501        let names = infer_column_names("SELECT a.name as alias_name FROM t");
1502        assert_eq!(names, vec!["alias_name"]);
1503    }
1504
1505    #[test]
1506    fn infer_columns_with_cte() {
1507        let names = infer_column_names("WITH cte AS (SELECT 1 AS x) SELECT x, x + 1 AS y FROM cte");
1508        assert_eq!(names, vec!["x", "y"]);
1509    }
1510
1511    #[test]
1512    fn infer_columns_subquery_alias() {
1513        let names = infer_column_names("SELECT (SELECT 1) AS sub, 2 AS plain");
1514        assert_eq!(names, vec!["sub", "plain"]);
1515    }
1516
1517    #[test]
1518    fn infer_columns_no_from() {
1519        let names = infer_column_names("SELECT 1 AS a, 2 AS b, 3 AS c");
1520        assert_eq!(names, vec!["a", "b", "c"]);
1521    }
1522
1523    #[test]
1524    fn infer_pragma_database_list() {
1525        let names = infer_column_names("PRAGMA database_list");
1526        assert_eq!(names, vec!["seq", "name", "file"]);
1527    }
1528
1529    #[test]
1530    fn infer_pragma_integrity_check() {
1531        let names = infer_column_names("PRAGMA integrity_check");
1532        assert_eq!(names, vec!["integrity_check"]);
1533    }
1534
1535    #[test]
1536    fn infer_pragma_quick_check() {
1537        let names = infer_column_names("PRAGMA quick_check");
1538        assert_eq!(names, vec!["quick_check"]);
1539    }
1540
1541    #[test]
1542    fn infer_pragma_simple_value() {
1543        let names = infer_column_names("PRAGMA journal_mode");
1544        assert_eq!(names, vec!["journal_mode"]);
1545    }
1546
1547    // ── changes() test ───────────────────────────────────────────────────
1548
1549    #[test]
1550    fn changes_returns_value() {
1551        // frankensqlite's changes() may return 0 for non-INSERT statements;
1552        // verify it at least doesn't panic and returns a non-negative value
1553        let conn = FrankenConnection::open_memory().unwrap();
1554        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1555            .unwrap();
1556        conn.execute_sync("INSERT INTO t VALUES (1, 'a')", &[])
1557            .unwrap();
1558        let c = conn.changes();
1559        assert!(c >= 0, "changes() should be non-negative, got {c}");
1560    }
1561
1562    // ── last_insert_rowid tracking ───────────────────────────────────────
1563
1564    #[test]
1565    fn last_insert_rowid_accessible() {
1566        // frankensqlite may not update last_insert_rowid() the same way as C SQLite;
1567        // verify the method is callable and returns a consistent value
1568        let conn = FrankenConnection::open_memory().unwrap();
1569        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1570            .unwrap();
1571        conn.execute_sync("INSERT INTO t (val) VALUES ('a')", &[])
1572            .unwrap();
1573        let rowid = conn.last_insert_rowid();
1574        // At minimum, should not panic; value may be 0 if frankensqlite
1575        // doesn't support last_insert_rowid() via SELECT
1576        assert!(rowid >= 0, "last_insert_rowid should be >= 0, got {rowid}");
1577    }
1578
1579    // ── Transaction + Connection trait async bridge ──────────────────────
1580
1581    #[test]
1582    fn connection_trait_query_async_bridge() {
1583        use sqlmodel_core::Cx;
1584        let conn = FrankenConnection::open_memory().unwrap();
1585        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1586            .unwrap();
1587        conn.execute_sync("INSERT INTO t VALUES (1, 'async')", &[])
1588            .unwrap();
1589
1590        let cx = Cx::for_testing();
1591        // Test that the async Connection::query method works correctly
1592        let result = asupersync::runtime::RuntimeBuilder::current_thread()
1593            .build()
1594            .unwrap()
1595            .block_on(async { Connection::query(&conn, &cx, "SELECT val FROM t", &[]).await });
1596        match result {
1597            Outcome::Ok(rows) => {
1598                assert_eq!(rows.len(), 1);
1599                assert_eq!(rows[0].get(0), Some(&Value::Text("async".into())));
1600            }
1601            other => panic!("expected Ok, got: {other:?}"),
1602        }
1603    }
1604
1605    #[test]
1606    fn connection_trait_begin_and_commit() {
1607        use sqlmodel_core::Cx;
1608        let conn = FrankenConnection::open_memory().unwrap();
1609        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY)")
1610            .unwrap();
1611
1612        let rt = asupersync::runtime::RuntimeBuilder::current_thread()
1613            .build()
1614            .unwrap();
1615        let cx = Cx::for_testing();
1616
1617        rt.block_on(async {
1618            let tx = conn.begin(&cx).await.into_result().unwrap();
1619            TransactionOps::execute(&tx, &cx, "INSERT INTO t VALUES (1)", &[])
1620                .await
1621                .into_result()
1622                .unwrap();
1623            tx.commit(&cx).await.into_result().unwrap();
1624        });
1625
1626        let rows = conn.query_sync("SELECT count(*) FROM t", &[]).unwrap();
1627        assert_eq!(rows[0].get(0), Some(&Value::BigInt(1)));
1628    }
1629
1630    #[test]
1631    fn transaction_drop_auto_rollback() {
1632        let conn = FrankenConnection::open_memory().unwrap();
1633        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY)")
1634            .unwrap();
1635
1636        let rt = asupersync::runtime::RuntimeBuilder::current_thread()
1637            .build()
1638            .unwrap();
1639        let cx = Cx::for_testing();
1640
1641        rt.block_on(async {
1642            let tx = conn.begin(&cx).await.into_result().unwrap();
1643            TransactionOps::execute(&tx, &cx, "INSERT INTO t VALUES (1)", &[])
1644                .await
1645                .into_result()
1646                .unwrap();
1647            // Drop tx without commit — should auto-rollback
1648            drop(tx);
1649        });
1650
1651        let rows = conn.query_sync("SELECT count(*) FROM t", &[]).unwrap();
1652        assert_eq!(rows[0].get(0), Some(&Value::BigInt(0)));
1653    }
1654
1655    // ── Batch execution ──────────────────────────────────────────────────
1656
1657    #[test]
1658    fn batch_multiple_statements() {
1659        let conn = FrankenConnection::open_memory().unwrap();
1660        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1661            .unwrap();
1662
1663        let rt = asupersync::runtime::RuntimeBuilder::current_thread()
1664            .build()
1665            .unwrap();
1666        let cx = Cx::for_testing();
1667
1668        let results = rt.block_on(async {
1669            Connection::batch(
1670                &conn,
1671                &cx,
1672                &[
1673                    ("INSERT INTO t VALUES (1, 'a')".to_string(), vec![]),
1674                    ("INSERT INTO t VALUES (2, 'b')".to_string(), vec![]),
1675                    ("INSERT INTO t VALUES (3, 'c')".to_string(), vec![]),
1676                ],
1677            )
1678            .await
1679            .into_result()
1680            .unwrap()
1681        });
1682
1683        assert_eq!(results.len(), 3);
1684        let rows = conn.query_sync("SELECT count(*) FROM t", &[]).unwrap();
1685        assert_eq!(rows[0].get(0), Some(&Value::BigInt(3)));
1686    }
1687
1688    // ── NULL handling ────────────────────────────────────────────────────
1689
1690    #[test]
1691    fn null_values_round_trip() {
1692        let conn = FrankenConnection::open_memory().unwrap();
1693        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT)")
1694            .unwrap();
1695        conn.execute_sync(
1696            "INSERT INTO t VALUES (?1, ?2)",
1697            &[Value::BigInt(1), Value::Null],
1698        )
1699        .unwrap();
1700        let rows = conn
1701            .query_sync("SELECT val FROM t WHERE id = 1", &[])
1702            .unwrap();
1703        assert_eq!(rows[0].get(0), Some(&Value::Null));
1704    }
1705
1706    // ── Blob handling ────────────────────────────────────────────────────
1707
1708    #[test]
1709    fn blob_values_round_trip() {
1710        let conn = FrankenConnection::open_memory().unwrap();
1711        conn.execute_raw("CREATE TABLE t (id INTEGER PRIMARY KEY, data BLOB)")
1712            .unwrap();
1713        let blob = vec![0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0xFF];
1714        conn.execute_sync(
1715            "INSERT INTO t VALUES (1, ?1)",
1716            &[Value::Bytes(blob.clone())],
1717        )
1718        .unwrap();
1719        let rows = conn
1720            .query_sync("SELECT data FROM t WHERE id = 1", &[])
1721            .unwrap();
1722        assert_eq!(rows[0].get(0), Some(&Value::Bytes(blob)));
1723    }
1724
1725    // br-22iss: Test UPDATE with numbered placeholders matching E2E failure scenario
1726    #[test]
1727    fn update_with_numbered_placeholders_in_where() {
1728        let conn = FrankenConnection::open_memory().unwrap();
1729        conn.execute_raw(
1730            "CREATE TABLE agents (
1731                id INTEGER PRIMARY KEY,
1732                project_id INTEGER,
1733                name TEXT,
1734                contact_policy TEXT
1735            )",
1736        )
1737        .unwrap();
1738
1739        // Insert two agents
1740        conn.execute_sync(
1741            "INSERT INTO agents (project_id, name, contact_policy) VALUES (?1, ?2, ?3)",
1742            &[
1743                Value::BigInt(1),
1744                Value::Text("BlueLake".into()),
1745                Value::Text("auto".into()),
1746            ],
1747        )
1748        .unwrap();
1749        conn.execute_sync(
1750            "INSERT INTO agents (project_id, name, contact_policy) VALUES (?1, ?2, ?3)",
1751            &[
1752                Value::BigInt(1),
1753                Value::Text("RedFox".into()),
1754                Value::Text("auto".into()),
1755            ],
1756        )
1757        .unwrap();
1758
1759        // Verify both agents exist
1760        let rows = conn
1761            .query_sync(
1762                "SELECT * FROM agents WHERE project_id = ?1",
1763                &[Value::BigInt(1)],
1764            )
1765            .unwrap();
1766        assert_eq!(rows.len(), 2, "should have 2 agents");
1767
1768        // Update RedFox's contact_policy - this is the failing pattern from E2E
1769        let affected = conn
1770            .execute_sync(
1771                "UPDATE agents SET contact_policy = ?1 WHERE project_id = ?2 AND name = ?3",
1772                &[
1773                    Value::Text("open".into()),
1774                    Value::BigInt(1),
1775                    Value::Text("RedFox".into()),
1776                ],
1777            )
1778            .unwrap();
1779        assert_eq!(affected, 1, "should affect 1 row");
1780
1781        // Verify the update worked
1782        let rows = conn
1783            .query_sync(
1784                "SELECT contact_policy FROM agents WHERE name = ?1",
1785                &[Value::Text("RedFox".into())],
1786            )
1787            .unwrap();
1788        assert_eq!(rows[0].get(0), Some(&Value::Text("open".into())));
1789    }
1790
1791    // br-22iss: Test UPDATE with 4 numbered placeholders matching exact E2E scenario
1792    #[test]
1793    fn update_with_four_numbered_placeholders_in_where() {
1794        let conn = FrankenConnection::open_memory().unwrap();
1795        conn.execute_raw(
1796            "CREATE TABLE agents (
1797                id INTEGER PRIMARY KEY,
1798                project_id INTEGER,
1799                name TEXT,
1800                contact_policy TEXT,
1801                last_active_ts INTEGER
1802            )",
1803        )
1804        .unwrap();
1805
1806        // Insert two agents
1807        conn.execute_sync(
1808            "INSERT INTO agents (project_id, name, contact_policy, last_active_ts) VALUES (?1, ?2, ?3, ?4)",
1809            &[Value::BigInt(1), Value::Text("BlueLake".into()), Value::Text("auto".into()), Value::BigInt(1000)],
1810        )
1811        .unwrap();
1812        conn.execute_sync(
1813            "INSERT INTO agents (project_id, name, contact_policy, last_active_ts) VALUES (?1, ?2, ?3, ?4)",
1814            &[Value::BigInt(1), Value::Text("RedFox".into()), Value::Text("auto".into()), Value::BigInt(1000)],
1815        )
1816        .unwrap();
1817
1818        // Verify both agents exist
1819        let rows = conn
1820            .query_sync(
1821                "SELECT * FROM agents WHERE project_id = ?1",
1822                &[Value::BigInt(1)],
1823            )
1824            .unwrap();
1825        assert_eq!(rows.len(), 2, "should have 2 agents");
1826
1827        // Exact E2E scenario: UPDATE agents SET contact_policy = ?1, last_active_ts = ?2 WHERE project_id = ?3 AND name = ?4
1828        let affected = conn
1829            .execute_sync(
1830                "UPDATE agents SET contact_policy = ?1, last_active_ts = ?2 WHERE project_id = ?3 AND name = ?4",
1831                &[Value::Text("open".into()), Value::BigInt(2000), Value::BigInt(1), Value::Text("RedFox".into())],
1832            )
1833            .unwrap();
1834        assert_eq!(affected, 1, "should affect 1 row");
1835
1836        // Verify the update worked
1837        let rows = conn
1838            .query_sync(
1839                "SELECT contact_policy, last_active_ts FROM agents WHERE name = ?1",
1840                &[Value::Text("RedFox".into())],
1841            )
1842            .unwrap();
1843        assert_eq!(rows[0].get(0), Some(&Value::Text("open".into())));
1844        assert_eq!(rows[0].get(1), Some(&Value::BigInt(2000)));
1845    }
1846}