Skip to main content

sqlmodel_sqlite/
connection.rs

1//! SQLite connection implementation.
2//!
3//! This module provides safe wrappers around SQLite's C API and implements
4//! the Connection trait from sqlmodel-core.
5//!
6//! # Console Integration
7//!
8//! When the `console` feature is enabled, the connection can report status
9//! during operations. Use the `ConsoleAware` trait to attach a console.
10//!
11//! ```rust,ignore
12//! use sqlmodel_sqlite::SqliteConnection;
13//! use sqlmodel_console::{SqlModelConsole, ConsoleAware};
14//! use std::sync::Arc;
15//!
16//! let console = Arc::new(SqlModelConsole::new());
17//! let mut conn = SqliteConnection::open_memory().unwrap();
18//! conn.set_console(Some(console));
19//! ```
20
21// Allow casts in FFI code where we need to match C types exactly
22#![allow(clippy::cast_possible_truncation)]
23#![allow(clippy::cast_sign_loss)]
24#![allow(clippy::cast_lossless)]
25#![allow(clippy::result_large_err)] // Error type is defined in sqlmodel-core
26#![allow(clippy::borrow_as_ptr)] // FFI requires raw pointers
27#![allow(clippy::if_not_else)] // Clearer for error handling
28#![allow(clippy::implicit_clone)] // Minor optimization
29#![allow(clippy::map_unwrap_or)] // Clearer for optional formatting
30#![allow(clippy::redundant_closure)] // format_value requires context
31
32use crate::ffi;
33use crate::types;
34use sqlmodel_core::{
35    Connection, Cx, Error, IsolationLevel, Outcome, PreparedStatement, Row, TransactionOps, Value,
36    error::{ConnectionError, ConnectionErrorKind, QueryError, QueryErrorKind},
37    row::ColumnInfo,
38};
39use std::ffi::{CStr, CString, c_int};
40use std::future::Future;
41use std::ptr;
42use std::sync::{Arc, Mutex};
43use std::time::Duration;
44
45#[cfg(feature = "console")]
46use sqlmodel_console::{ConsoleAware, SqlModelConsole};
47
48/// Configuration for opening SQLite connections.
49#[derive(Debug, Clone)]
50pub struct SqliteConfig {
51    /// Path to the database file, or ":memory:" for in-memory database.
52    pub path: String,
53    /// Open flags (read-only, read-write, create, etc.)
54    pub flags: OpenFlags,
55    /// Busy timeout in milliseconds.
56    pub busy_timeout_ms: u32,
57}
58
59/// Flags controlling how the database is opened.
60#[derive(Debug, Clone, Copy, Default)]
61pub struct OpenFlags {
62    /// Open for reading only.
63    pub read_only: bool,
64    /// Open for reading and writing.
65    pub read_write: bool,
66    /// Create the database if it doesn't exist.
67    pub create: bool,
68    /// Enable URI filename interpretation.
69    pub uri: bool,
70    /// Open in multi-thread mode (connections not shared between threads).
71    pub no_mutex: bool,
72    /// Open in serialized mode (connections can be shared).
73    pub full_mutex: bool,
74    /// Enable shared cache mode.
75    pub shared_cache: bool,
76    /// Disable shared cache mode.
77    pub private_cache: bool,
78}
79
80impl OpenFlags {
81    /// Create flags for read-only access.
82    pub fn read_only() -> Self {
83        Self {
84            read_only: true,
85            ..Default::default()
86        }
87    }
88
89    /// Create flags for read-write access (database must exist).
90    pub fn read_write() -> Self {
91        Self {
92            read_write: true,
93            ..Default::default()
94        }
95    }
96
97    /// Create flags for read-write access with creation if needed.
98    pub fn create_read_write() -> Self {
99        Self {
100            read_write: true,
101            create: true,
102            ..Default::default()
103        }
104    }
105
106    fn to_sqlite_flags(self) -> c_int {
107        let mut flags = 0;
108
109        if self.read_only {
110            flags |= ffi::SQLITE_OPEN_READONLY;
111        }
112        if self.read_write {
113            flags |= ffi::SQLITE_OPEN_READWRITE;
114        }
115        if self.create {
116            flags |= ffi::SQLITE_OPEN_CREATE;
117        }
118        if self.uri {
119            flags |= ffi::SQLITE_OPEN_URI;
120        }
121        if self.no_mutex {
122            flags |= ffi::SQLITE_OPEN_NOMUTEX;
123        }
124        if self.full_mutex {
125            flags |= ffi::SQLITE_OPEN_FULLMUTEX;
126        }
127        if self.shared_cache {
128            flags |= ffi::SQLITE_OPEN_SHAREDCACHE;
129        }
130        if self.private_cache {
131            flags |= ffi::SQLITE_OPEN_PRIVATECACHE;
132        }
133
134        // Default to read-write if no mode specified
135        if flags & (ffi::SQLITE_OPEN_READONLY | ffi::SQLITE_OPEN_READWRITE) == 0 {
136            flags |= ffi::SQLITE_OPEN_READWRITE | ffi::SQLITE_OPEN_CREATE;
137        }
138
139        flags
140    }
141}
142
143impl Default for SqliteConfig {
144    fn default() -> Self {
145        Self {
146            path: ":memory:".to_string(),
147            flags: OpenFlags::create_read_write(),
148            busy_timeout_ms: 5000,
149        }
150    }
151}
152
153impl SqliteConfig {
154    /// Create a new config for a file-based database.
155    pub fn file(path: impl Into<String>) -> Self {
156        Self {
157            path: path.into(),
158            flags: OpenFlags::create_read_write(),
159            busy_timeout_ms: 5000,
160        }
161    }
162
163    /// Create a new config for an in-memory database.
164    pub fn memory() -> Self {
165        Self::default()
166    }
167
168    /// Set open flags.
169    pub fn flags(mut self, flags: OpenFlags) -> Self {
170        self.flags = flags;
171        self
172    }
173
174    /// Set busy timeout.
175    pub fn busy_timeout(mut self, ms: u32) -> Self {
176        self.busy_timeout_ms = ms;
177        self
178    }
179}
180
181/// Inner state of the SQLite connection, protected by a mutex for thread safety.
182struct SqliteInner {
183    db: *mut ffi::sqlite3,
184    in_transaction: bool,
185}
186
187// SAFETY: SQLite handles can be safely sent between threads when using
188// SQLITE_OPEN_FULLMUTEX (serialized mode) or when properly synchronized.
189// We use a Mutex to ensure synchronization.
190unsafe impl Send for SqliteInner {}
191
192/// A connection to a SQLite database.
193///
194/// This is a thread-safe wrapper around a SQLite database handle.
195pub struct SqliteConnection {
196    inner: Mutex<SqliteInner>,
197    path: String,
198    /// Optional console for rich output
199    #[cfg(feature = "console")]
200    console: Option<Arc<SqlModelConsole>>,
201}
202
203// SqliteConnection is Send + Sync because all access goes through the Mutex
204unsafe impl Send for SqliteConnection {}
205unsafe impl Sync for SqliteConnection {}
206
207impl SqliteConnection {
208    /// Open a new SQLite connection with the given configuration.
209    pub fn open(config: &SqliteConfig) -> Result<Self, Error> {
210        let c_path = CString::new(config.path.as_str()).map_err(|_| {
211            Error::Connection(ConnectionError {
212                kind: ConnectionErrorKind::Connect,
213                message: "Invalid path: contains null byte".to_string(),
214                source: None,
215            })
216        })?;
217
218        let mut db: *mut ffi::sqlite3 = ptr::null_mut();
219        let flags = config.flags.to_sqlite_flags();
220
221        // SAFETY: We pass valid pointers and check the return value
222        let rc = unsafe { ffi::sqlite3_open_v2(c_path.as_ptr(), &mut db, flags, ptr::null()) };
223
224        if rc != ffi::SQLITE_OK {
225            let msg = if !db.is_null() {
226                // SAFETY: db is valid, errmsg returns a valid C string
227                unsafe {
228                    let err_ptr = ffi::sqlite3_errmsg(db);
229                    let msg = CStr::from_ptr(err_ptr).to_string_lossy().into_owned();
230                    ffi::sqlite3_close(db);
231                    msg
232                }
233            } else {
234                ffi::error_string(rc).to_string()
235            };
236
237            return Err(Error::Connection(ConnectionError {
238                kind: ConnectionErrorKind::Connect,
239                message: format!("Failed to open database: {}", msg),
240                source: None,
241            }));
242        }
243
244        // Set busy timeout
245        if config.busy_timeout_ms > 0 {
246            // SAFETY: db is valid
247            unsafe {
248                ffi::sqlite3_busy_timeout(db, config.busy_timeout_ms as c_int);
249            }
250        }
251
252        Ok(Self {
253            inner: Mutex::new(SqliteInner {
254                db,
255                in_transaction: false,
256            }),
257            path: config.path.clone(),
258            #[cfg(feature = "console")]
259            console: None,
260        })
261    }
262
263    /// Open an in-memory database.
264    pub fn open_memory() -> Result<Self, Error> {
265        Self::open(&SqliteConfig::memory())
266    }
267
268    /// Open a file-based database.
269    pub fn open_file(path: impl Into<String>) -> Result<Self, Error> {
270        Self::open(&SqliteConfig::file(path))
271    }
272
273    /// Get the database path.
274    pub fn path(&self) -> &str {
275        &self.path
276    }
277
278    /// Execute SQL directly without preparing (for DDL, etc.)
279    pub fn execute_raw(&self, sql: &str) -> Result<(), Error> {
280        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
281        let c_sql = CString::new(sql).map_err(|_| {
282            Error::Query(QueryError {
283                kind: QueryErrorKind::Syntax,
284                sql: Some(sql.to_string()),
285                sqlstate: None,
286                message: "SQL contains null byte".to_string(),
287                detail: None,
288                hint: None,
289                position: None,
290                source: None,
291            })
292        })?;
293
294        let mut errmsg: *mut std::ffi::c_char = ptr::null_mut();
295
296        // SAFETY: All pointers are valid
297        let rc = unsafe {
298            ffi::sqlite3_exec(inner.db, c_sql.as_ptr(), None, ptr::null_mut(), &mut errmsg)
299        };
300
301        if rc != ffi::SQLITE_OK {
302            let msg = if !errmsg.is_null() {
303                // SAFETY: errmsg is valid
304                let msg = unsafe { CStr::from_ptr(errmsg).to_string_lossy().into_owned() };
305                unsafe { ffi::sqlite3_free(errmsg.cast()) };
306                msg
307            } else {
308                ffi::error_string(rc).to_string()
309            };
310
311            return Err(Error::Query(QueryError {
312                kind: error_code_to_kind(rc),
313                sql: Some(sql.to_string()),
314                sqlstate: None,
315                message: msg,
316                detail: None,
317                hint: None,
318                position: None,
319                source: None,
320            }));
321        }
322
323        Ok(())
324    }
325
326    /// Backup the current database to a destination path using the SQLite backup API.
327    ///
328    /// This opens (or creates) the destination database and performs an online backup
329    /// from this connection's `main` database into the destination's `main` database.
330    pub fn backup_to_path(&self, dest_path: impl AsRef<str>) -> Result<(), Error> {
331        let dest = SqliteConnection::open(
332            &SqliteConfig::file(dest_path.as_ref()).flags(OpenFlags::create_read_write()),
333        )?;
334        self.backup_to_connection(&dest)
335    }
336
337    /// Backup the current database to another open SQLite connection.
338    pub fn backup_to_connection(&self, dest: &SqliteConnection) -> Result<(), Error> {
339        let self_first = (std::ptr::from_ref(self) as usize) <= (std::ptr::from_ref(dest) as usize);
340        let (source_guard, dest_guard) = if self_first {
341            let source_guard = self.inner.lock().unwrap_or_else(|e| e.into_inner());
342            let dest_guard = dest.inner.lock().unwrap_or_else(|e| e.into_inner());
343            (source_guard, dest_guard)
344        } else {
345            let dest_guard = dest.inner.lock().unwrap_or_else(|e| e.into_inner());
346            let source_guard = self.inner.lock().unwrap_or_else(|e| e.into_inner());
347            (source_guard, dest_guard)
348        };
349
350        let source_db = source_guard.db;
351        let dest_db = dest_guard.db;
352
353        let main = CString::new("main").expect("static sqlite db name");
354
355        // SAFETY: We hold locks on both connections; db pointers are valid.
356        let backup =
357            unsafe { ffi::sqlite3_backup_init(dest_db, main.as_ptr(), source_db, main.as_ptr()) };
358        if backup.is_null() {
359            let msg = unsafe { CStr::from_ptr(ffi::sqlite3_errmsg(dest_db)) }
360                .to_string_lossy()
361                .into_owned();
362            return Err(Error::Connection(ConnectionError {
363                kind: ConnectionErrorKind::Connect,
364                message: format!("SQLite backup init failed: {msg}"),
365                source: None,
366            }));
367        }
368
369        let mut rc = unsafe { ffi::sqlite3_backup_step(backup, 100) };
370        loop {
371            if rc == ffi::SQLITE_DONE {
372                break;
373            }
374            if rc == ffi::SQLITE_OK {
375                rc = unsafe { ffi::sqlite3_backup_step(backup, 100) };
376                continue;
377            }
378            if rc == ffi::SQLITE_BUSY || rc == ffi::SQLITE_LOCKED {
379                std::thread::sleep(Duration::from_millis(50));
380                rc = unsafe { ffi::sqlite3_backup_step(backup, 100) };
381                continue;
382            }
383            break;
384        }
385
386        let finish_rc = unsafe { ffi::sqlite3_backup_finish(backup) };
387
388        if rc != ffi::SQLITE_DONE && rc != ffi::SQLITE_OK {
389            let msg = unsafe { CStr::from_ptr(ffi::sqlite3_errmsg(dest_db)) }
390                .to_string_lossy()
391                .into_owned();
392            return Err(Error::Connection(ConnectionError {
393                kind: ConnectionErrorKind::Connect,
394                message: format!("SQLite backup failed: {} ({})", msg, ffi::error_string(rc)),
395                source: None,
396            }));
397        }
398
399        if finish_rc != ffi::SQLITE_OK {
400            let msg = unsafe { CStr::from_ptr(ffi::sqlite3_errmsg(dest_db)) }
401                .to_string_lossy()
402                .into_owned();
403            return Err(Error::Connection(ConnectionError {
404                kind: ConnectionErrorKind::Connect,
405                message: format!(
406                    "SQLite backup finish failed: {} ({})",
407                    msg,
408                    ffi::error_string(finish_rc)
409                ),
410                source: None,
411            }));
412        }
413
414        Ok(())
415    }
416
417    /// Get the last insert rowid.
418    pub fn last_insert_rowid(&self) -> i64 {
419        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
420        // SAFETY: db is valid
421        unsafe { ffi::sqlite3_last_insert_rowid(inner.db) }
422    }
423
424    /// Get the number of rows changed by the last statement.
425    pub fn changes(&self) -> i32 {
426        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
427        // SAFETY: db is valid
428        unsafe { ffi::sqlite3_changes(inner.db) }
429    }
430
431    /// Prepare and execute a query synchronously, returning all rows.
432    ///
433    /// This is a blocking operation suitable for simple use cases.
434    /// For async usage, use the `Connection` trait methods instead.
435    pub fn query_sync(&self, sql: &str, params: &[Value]) -> Result<Vec<Row>, Error> {
436        #[cfg(feature = "console")]
437        let start = std::time::Instant::now();
438
439        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
440        let stmt = prepare_stmt(inner.db, sql)?;
441
442        // Bind parameters
443        for (i, param) in params.iter().enumerate() {
444            // SAFETY: stmt is valid, index is 1-based
445            let rc = unsafe { types::bind_value(stmt, (i + 1) as c_int, param) };
446            if rc != ffi::SQLITE_OK {
447                // SAFETY: stmt is valid
448                unsafe { ffi::sqlite3_finalize(stmt) };
449                return Err(bind_error(inner.db, sql, i + 1));
450            }
451        }
452
453        // Fetch column names
454        // SAFETY: stmt is valid
455        let col_count = unsafe { ffi::sqlite3_column_count(stmt) };
456        let mut col_names = Vec::with_capacity(col_count as usize);
457        for i in 0..col_count {
458            let name =
459                unsafe { types::column_name(stmt, i) }.unwrap_or_else(|| format!("col{}", i));
460            col_names.push(name);
461        }
462        let columns = Arc::new(ColumnInfo::new(col_names.clone()));
463
464        // Fetch rows
465        let mut rows = Vec::new();
466        loop {
467            // SAFETY: stmt is valid
468            let rc = unsafe { ffi::sqlite3_step(stmt) };
469            match rc {
470                ffi::SQLITE_ROW => {
471                    let mut values = Vec::with_capacity(col_count as usize);
472                    for i in 0..col_count {
473                        // SAFETY: stmt is valid, we just got SQLITE_ROW
474                        let value = unsafe { types::read_column(stmt, i) };
475                        values.push(value);
476                    }
477                    rows.push(Row::with_columns(Arc::clone(&columns), values));
478                }
479                ffi::SQLITE_DONE => break,
480                _ => {
481                    // SAFETY: stmt is valid
482                    unsafe { ffi::sqlite3_finalize(stmt) };
483                    return Err(step_error(inner.db, sql));
484                }
485            }
486        }
487
488        // SAFETY: stmt is valid
489        unsafe { ffi::sqlite3_finalize(stmt) };
490
491        // Emit console output for PRAGMA queries and timing
492        #[cfg(feature = "console")]
493        {
494            let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
495            self.emit_query_result(sql, &col_names, &rows, elapsed_ms);
496        }
497
498        Ok(rows)
499    }
500
501    /// Prepare and execute a statement synchronously, returning rows affected.
502    ///
503    /// This is a blocking operation suitable for simple use cases.
504    /// For async usage, use the `Connection` trait methods instead.
505    pub fn execute_sync(&self, sql: &str, params: &[Value]) -> Result<u64, Error> {
506        #[cfg(feature = "console")]
507        let start = std::time::Instant::now();
508
509        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
510        let stmt = prepare_stmt(inner.db, sql)?;
511
512        // Bind parameters
513        for (i, param) in params.iter().enumerate() {
514            // SAFETY: stmt is valid
515            let rc = unsafe { types::bind_value(stmt, (i + 1) as c_int, param) };
516            if rc != ffi::SQLITE_OK {
517                // SAFETY: stmt is valid
518                unsafe { ffi::sqlite3_finalize(stmt) };
519                return Err(bind_error(inner.db, sql, i + 1));
520            }
521        }
522
523        // Execute
524        // SAFETY: stmt is valid
525        let rc = unsafe { ffi::sqlite3_step(stmt) };
526
527        // SAFETY: stmt is valid
528        unsafe { ffi::sqlite3_finalize(stmt) };
529
530        match rc {
531            ffi::SQLITE_DONE | ffi::SQLITE_ROW => {
532                // SAFETY: db is valid
533                let changes = unsafe { ffi::sqlite3_changes(inner.db) };
534
535                #[cfg(feature = "console")]
536                {
537                    let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
538                    self.emit_execute_timing(sql, changes as u64, elapsed_ms);
539                }
540
541                Ok(changes as u64)
542            }
543            _ => Err(step_error(inner.db, sql)),
544        }
545    }
546
547    /// Execute an INSERT and return the last inserted rowid.
548    fn insert_sync(&self, sql: &str, params: &[Value]) -> Result<i64, Error> {
549        self.execute_sync(sql, params)?;
550        Ok(self.last_insert_rowid())
551    }
552
553    /// Begin a transaction.
554    fn begin_sync(&self, isolation: IsolationLevel) -> Result<(), Error> {
555        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
556        if inner.in_transaction {
557            return Err(Error::Query(QueryError {
558                kind: QueryErrorKind::Database,
559                sql: None,
560                sqlstate: None,
561                message: "Already in a transaction".to_string(),
562                detail: None,
563                hint: None,
564                position: None,
565                source: None,
566            }));
567        }
568
569        // SQLite doesn't support isolation levels in the same way as PostgreSQL,
570        // but we can approximate with different transaction types
571        let begin_sql = match isolation {
572            IsolationLevel::Serializable => "BEGIN EXCLUSIVE",
573            IsolationLevel::RepeatableRead | IsolationLevel::ReadCommitted => "BEGIN IMMEDIATE",
574            IsolationLevel::ReadUncommitted => "BEGIN DEFERRED",
575        };
576
577        drop(inner); // Release lock before calling execute_raw
578        self.execute_raw(begin_sql)?;
579
580        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
581        inner.in_transaction = true;
582        self.emit_transaction_state("BEGIN");
583        Ok(())
584    }
585
586    /// Commit the current transaction.
587    fn commit_sync(&self) -> Result<(), Error> {
588        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
589        if !inner.in_transaction {
590            return Err(Error::Query(QueryError {
591                kind: QueryErrorKind::Database,
592                sql: None,
593                sqlstate: None,
594                message: "Not in a transaction".to_string(),
595                detail: None,
596                hint: None,
597                position: None,
598                source: None,
599            }));
600        }
601
602        drop(inner);
603        self.execute_raw("COMMIT")?;
604
605        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
606        inner.in_transaction = false;
607        self.emit_transaction_state("COMMIT");
608        Ok(())
609    }
610
611    /// Rollback the current transaction.
612    fn rollback_sync(&self) -> Result<(), Error> {
613        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
614        if !inner.in_transaction {
615            return Err(Error::Query(QueryError {
616                kind: QueryErrorKind::Database,
617                sql: None,
618                sqlstate: None,
619                message: "Not in a transaction".to_string(),
620                detail: None,
621                hint: None,
622                position: None,
623                source: None,
624            }));
625        }
626
627        drop(inner);
628        self.execute_raw("ROLLBACK")?;
629
630        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
631        inner.in_transaction = false;
632        self.emit_transaction_state("ROLLBACK");
633        Ok(())
634    }
635}
636
637impl Drop for SqliteConnection {
638    fn drop(&mut self) {
639        if let Ok(inner) = self.inner.lock() {
640            if !inner.db.is_null() {
641                // SAFETY: db is valid
642                unsafe {
643                    ffi::sqlite3_close_v2(inner.db);
644                }
645            }
646        }
647    }
648}
649
650/// A SQLite transaction.
651pub struct SqliteTransaction<'conn> {
652    conn: &'conn SqliteConnection,
653    committed: bool,
654}
655
656impl<'conn> SqliteTransaction<'conn> {
657    fn new(conn: &'conn SqliteConnection) -> Self {
658        Self {
659            conn,
660            committed: false,
661        }
662    }
663}
664
665impl Drop for SqliteTransaction<'_> {
666    fn drop(&mut self) {
667        if !self.committed {
668            // Auto-rollback on drop if not committed
669            let _ = self.conn.rollback_sync();
670        }
671    }
672}
673
674// Implement Connection trait for SqliteConnection
675impl Connection for SqliteConnection {
676    type Tx<'conn>
677        = SqliteTransaction<'conn>
678    where
679        Self: 'conn;
680
681    fn dialect(&self) -> sqlmodel_core::Dialect {
682        sqlmodel_core::Dialect::Sqlite
683    }
684
685    fn query(
686        &self,
687        _cx: &Cx,
688        sql: &str,
689        params: &[Value],
690    ) -> impl Future<Output = Outcome<Vec<Row>, Error>> + Send {
691        let result = self.query_sync(sql, params);
692        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
693    }
694
695    fn query_one(
696        &self,
697        _cx: &Cx,
698        sql: &str,
699        params: &[Value],
700    ) -> impl Future<Output = Outcome<Option<Row>, Error>> + Send {
701        let result = self.query_sync(sql, params).map(|mut rows| rows.pop());
702        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
703    }
704
705    fn execute(
706        &self,
707        _cx: &Cx,
708        sql: &str,
709        params: &[Value],
710    ) -> impl Future<Output = Outcome<u64, Error>> + Send {
711        let result = self.execute_sync(sql, params);
712        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
713    }
714
715    fn insert(
716        &self,
717        _cx: &Cx,
718        sql: &str,
719        params: &[Value],
720    ) -> impl Future<Output = Outcome<i64, Error>> + Send {
721        let result = self.insert_sync(sql, params);
722        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
723    }
724
725    fn batch(
726        &self,
727        _cx: &Cx,
728        statements: &[(String, Vec<Value>)],
729    ) -> impl Future<Output = Outcome<Vec<u64>, Error>> + Send {
730        let mut results = Vec::with_capacity(statements.len());
731        let mut error = None;
732
733        for (sql, params) in statements {
734            match self.execute_sync(sql, params) {
735                Ok(n) => results.push(n),
736                Err(e) => {
737                    error = Some(e);
738                    break;
739                }
740            }
741        }
742
743        async move {
744            match error {
745                Some(e) => Outcome::Err(e),
746                None => Outcome::Ok(results),
747            }
748        }
749    }
750
751    fn begin(&self, cx: &Cx) -> impl Future<Output = Outcome<Self::Tx<'_>, Error>> + Send {
752        self.begin_with(cx, IsolationLevel::default())
753    }
754
755    fn begin_with(
756        &self,
757        _cx: &Cx,
758        isolation: IsolationLevel,
759    ) -> impl Future<Output = Outcome<Self::Tx<'_>, Error>> + Send {
760        let result = self
761            .begin_sync(isolation)
762            .map(|()| SqliteTransaction::new(self));
763        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
764    }
765
766    fn prepare(
767        &self,
768        _cx: &Cx,
769        sql: &str,
770    ) -> impl Future<Output = Outcome<PreparedStatement, Error>> + Send {
771        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
772        let result = prepare_stmt(inner.db, sql).map(|stmt| {
773            // SAFETY: stmt is valid
774            let param_count = unsafe { ffi::sqlite3_bind_parameter_count(stmt) } as usize;
775            let col_count = unsafe { ffi::sqlite3_column_count(stmt) } as c_int;
776
777            let mut columns = Vec::with_capacity(col_count as usize);
778            for i in 0..col_count {
779                if let Some(name) = unsafe { types::column_name(stmt, i) } {
780                    columns.push(name);
781                }
782            }
783
784            // SAFETY: stmt is valid
785            unsafe { ffi::sqlite3_finalize(stmt) };
786
787            // Use address as pseudo-ID since we don't cache statements yet
788            let id = sql.as_ptr() as u64;
789            PreparedStatement::with_columns(id, sql.to_string(), param_count, columns)
790        });
791
792        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
793    }
794
795    fn query_prepared(
796        &self,
797        cx: &Cx,
798        stmt: &PreparedStatement,
799        params: &[Value],
800    ) -> impl Future<Output = Outcome<Vec<Row>, Error>> + Send {
801        // For now, just re-execute the SQL
802        // Future optimization: cache prepared statements
803        self.query(cx, stmt.sql(), params)
804    }
805
806    fn execute_prepared(
807        &self,
808        cx: &Cx,
809        stmt: &PreparedStatement,
810        params: &[Value],
811    ) -> impl Future<Output = Outcome<u64, Error>> + Send {
812        self.execute(cx, stmt.sql(), params)
813    }
814
815    fn ping(&self, _cx: &Cx) -> impl Future<Output = Outcome<(), Error>> + Send {
816        // Simple ping: execute a trivial query
817        let result = self.query_sync("SELECT 1", &[]).map(|_| ());
818        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
819    }
820
821    async fn close(self, _cx: &Cx) -> sqlmodel_core::Result<()> {
822        // Connection is closed on drop
823        Ok(())
824    }
825}
826
827// Implement TransactionOps for SqliteTransaction
828impl TransactionOps for SqliteTransaction<'_> {
829    fn query(
830        &self,
831        _cx: &Cx,
832        sql: &str,
833        params: &[Value],
834    ) -> impl Future<Output = Outcome<Vec<Row>, Error>> + Send {
835        let result = self.conn.query_sync(sql, params);
836        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
837    }
838
839    fn query_one(
840        &self,
841        _cx: &Cx,
842        sql: &str,
843        params: &[Value],
844    ) -> impl Future<Output = Outcome<Option<Row>, Error>> + Send {
845        let result = self.conn.query_sync(sql, params).map(|mut rows| rows.pop());
846        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
847    }
848
849    fn execute(
850        &self,
851        _cx: &Cx,
852        sql: &str,
853        params: &[Value],
854    ) -> impl Future<Output = Outcome<u64, Error>> + Send {
855        let result = self.conn.execute_sync(sql, params);
856        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
857    }
858
859    fn savepoint(&self, _cx: &Cx, name: &str) -> impl Future<Output = Outcome<(), Error>> + Send {
860        // Quote identifier to prevent SQL injection
861        let quoted_name = format!("\"{}\"", name.replace('"', "\"\""));
862        let sql = format!("SAVEPOINT {}", quoted_name);
863        let result = self.conn.execute_raw(&sql);
864        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
865    }
866
867    fn rollback_to(&self, _cx: &Cx, name: &str) -> impl Future<Output = Outcome<(), Error>> + Send {
868        // Quote identifier to prevent SQL injection
869        let quoted_name = format!("\"{}\"", name.replace('"', "\"\""));
870        let sql = format!("ROLLBACK TO {}", quoted_name);
871        let result = self.conn.execute_raw(&sql);
872        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
873    }
874
875    fn release(&self, _cx: &Cx, name: &str) -> impl Future<Output = Outcome<(), Error>> + Send {
876        // Quote identifier to prevent SQL injection
877        let quoted_name = format!("\"{}\"", name.replace('"', "\"\""));
878        let sql = format!("RELEASE {}", quoted_name);
879        let result = self.conn.execute_raw(&sql);
880        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
881    }
882
883    async fn commit(mut self, _cx: &Cx) -> Outcome<(), Error> {
884        self.committed = true;
885        self.conn
886            .commit_sync()
887            .map_or_else(Outcome::Err, Outcome::Ok)
888    }
889
890    async fn rollback(mut self, _cx: &Cx) -> Outcome<(), Error> {
891        self.committed = true; // Prevent double rollback in drop
892        self.conn
893            .rollback_sync()
894            .map_or_else(Outcome::Err, Outcome::Ok)
895    }
896}
897
898// Helper functions
899
900fn prepare_stmt(db: *mut ffi::sqlite3, sql: &str) -> Result<*mut ffi::sqlite3_stmt, Error> {
901    let c_sql = CString::new(sql).map_err(|_| {
902        Error::Query(QueryError {
903            kind: QueryErrorKind::Syntax,
904            sql: Some(sql.to_string()),
905            sqlstate: None,
906            message: "SQL contains null byte".to_string(),
907            detail: None,
908            hint: None,
909            position: None,
910            source: None,
911        })
912    })?;
913
914    let mut stmt: *mut ffi::sqlite3_stmt = ptr::null_mut();
915
916    // SAFETY: All pointers are valid
917    let rc = unsafe {
918        ffi::sqlite3_prepare_v2(
919            db,
920            c_sql.as_ptr(),
921            c_sql.as_bytes().len() as c_int,
922            &mut stmt,
923            ptr::null_mut(),
924        )
925    };
926
927    if rc != ffi::SQLITE_OK {
928        return Err(prepare_error(db, sql));
929    }
930
931    Ok(stmt)
932}
933
934fn prepare_error(db: *mut ffi::sqlite3, sql: &str) -> Error {
935    // SAFETY: db is valid
936    let msg = unsafe {
937        let ptr = ffi::sqlite3_errmsg(db);
938        CStr::from_ptr(ptr).to_string_lossy().into_owned()
939    };
940    let code = unsafe { ffi::sqlite3_errcode(db) };
941
942    Error::Query(QueryError {
943        kind: error_code_to_kind(code),
944        sql: Some(sql.to_string()),
945        sqlstate: None,
946        message: msg,
947        detail: None,
948        hint: None,
949        position: None,
950        source: None,
951    })
952}
953
954fn bind_error(db: *mut ffi::sqlite3, sql: &str, param_index: usize) -> Error {
955    // SAFETY: db is valid
956    let msg = unsafe {
957        let ptr = ffi::sqlite3_errmsg(db);
958        CStr::from_ptr(ptr).to_string_lossy().into_owned()
959    };
960
961    Error::Query(QueryError {
962        kind: QueryErrorKind::Database,
963        sql: Some(sql.to_string()),
964        sqlstate: None,
965        message: format!("Failed to bind parameter {}: {}", param_index, msg),
966        detail: None,
967        hint: None,
968        position: None,
969        source: None,
970    })
971}
972
973fn step_error(db: *mut ffi::sqlite3, sql: &str) -> Error {
974    // SAFETY: db is valid
975    let msg = unsafe {
976        let ptr = ffi::sqlite3_errmsg(db);
977        CStr::from_ptr(ptr).to_string_lossy().into_owned()
978    };
979    let code = unsafe { ffi::sqlite3_errcode(db) };
980
981    Error::Query(QueryError {
982        kind: error_code_to_kind(code),
983        sql: Some(sql.to_string()),
984        sqlstate: None,
985        message: msg,
986        detail: None,
987        hint: None,
988        position: None,
989        source: None,
990    })
991}
992
993fn error_code_to_kind(code: c_int) -> QueryErrorKind {
994    match code {
995        ffi::SQLITE_CONSTRAINT => QueryErrorKind::Constraint,
996        ffi::SQLITE_BUSY | ffi::SQLITE_LOCKED => QueryErrorKind::Deadlock,
997        ffi::SQLITE_PERM | ffi::SQLITE_AUTH => QueryErrorKind::Permission,
998        ffi::SQLITE_NOTFOUND => QueryErrorKind::NotFound,
999        ffi::SQLITE_TOOBIG => QueryErrorKind::DataTruncation,
1000        ffi::SQLITE_INTERRUPT => QueryErrorKind::Cancelled,
1001        _ => QueryErrorKind::Database,
1002    }
1003}
1004
1005/// Format a Value for display in console output.
1006#[allow(dead_code)]
1007fn format_value(value: &Value) -> String {
1008    match value {
1009        Value::Null => "NULL".to_string(),
1010        Value::Bool(b) => if *b { "true" } else { "false" }.to_string(),
1011        Value::TinyInt(n) => n.to_string(),
1012        Value::SmallInt(n) => n.to_string(),
1013        Value::Int(n) => n.to_string(),
1014        Value::BigInt(n) => n.to_string(),
1015        Value::Float(n) => format!("{:.6}", n),
1016        Value::Double(n) => format!("{:.6}", n),
1017        Value::Text(s) => s.clone(),
1018        Value::Bytes(b) => format!("[BLOB: {} bytes]", b.len()),
1019        Value::Date(d) => d.to_string(),
1020        Value::Time(t) => t.to_string(),
1021        Value::Timestamp(ts) => ts.to_string(),
1022        Value::TimestampTz(ts) => ts.to_string(),
1023        Value::Json(j) => j.to_string(),
1024        Value::Uuid(u) => {
1025            // Format UUID as hex string: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
1026            format!(
1027                "{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
1028                u[0],
1029                u[1],
1030                u[2],
1031                u[3],
1032                u[4],
1033                u[5],
1034                u[6],
1035                u[7],
1036                u[8],
1037                u[9],
1038                u[10],
1039                u[11],
1040                u[12],
1041                u[13],
1042                u[14],
1043                u[15]
1044            )
1045        }
1046        Value::Decimal(d) => d.to_string(),
1047        Value::Array(arr) => format!("[{} items]", arr.len()),
1048        Value::Default => "DEFAULT".to_string(),
1049    }
1050}
1051
1052// ==================== Console Support ====================
1053
1054#[cfg(feature = "console")]
1055impl ConsoleAware for SqliteConnection {
1056    fn set_console(&mut self, console: Option<Arc<SqlModelConsole>>) {
1057        self.console = console;
1058        // Emit database status when console is attached
1059        self.emit_open_status();
1060    }
1061
1062    fn console(&self) -> Option<&Arc<SqlModelConsole>> {
1063        self.console.as_ref()
1064    }
1065
1066    fn has_console(&self) -> bool {
1067        self.console.is_some()
1068    }
1069}
1070
1071impl SqliteConnection {
1072    /// Emit database open status to console if available.
1073    #[cfg(feature = "console")]
1074    fn emit_open_status(&self) {
1075        if let Some(console) = &self.console {
1076            // Get database info
1077            let mode = if self.path == ":memory:" {
1078                "in-memory"
1079            } else {
1080                "file"
1081            };
1082
1083            // Query journal mode if we can
1084            let journal_mode = self
1085                .query_sync("PRAGMA journal_mode", &[])
1086                .ok()
1087                .and_then(|rows| rows.first().and_then(|r| r.get_as::<String>(0).ok()));
1088
1089            let page_size = self
1090                .query_sync("PRAGMA page_size", &[])
1091                .ok()
1092                .and_then(|rows| rows.first().and_then(|r| r.get_as::<i64>(0).ok()));
1093
1094            if console.mode().is_plain() {
1095                // Plain text output for agents
1096                let journal = journal_mode.as_deref().unwrap_or("unknown");
1097                console.status(&format!(
1098                    "Opened SQLite database: {} ({} mode, journal: {})",
1099                    self.path, mode, journal
1100                ));
1101            } else {
1102                // Rich output
1103                console.status(&format!("SQLite database: {}", self.path));
1104                console.status(&format!("  Mode: {}", mode));
1105                if let Some(journal) = journal_mode {
1106                    console.status(&format!("  Journal: {}", journal.to_uppercase()));
1107                }
1108                if let Some(size) = page_size {
1109                    console.status(&format!("  Page size: {} bytes", size));
1110                }
1111            }
1112        }
1113    }
1114
1115    /// Emit transaction state to console if available.
1116    #[cfg(feature = "console")]
1117    fn emit_transaction_state(&self, state: &str) {
1118        if let Some(console) = &self.console {
1119            if console.mode().is_plain() {
1120                console.status(&format!("Transaction: {}", state));
1121            } else {
1122                console.status(&format!("[{}] Transaction {}", state, state.to_lowercase()));
1123            }
1124        }
1125    }
1126
1127    /// Emit query timing to console if available.
1128    #[cfg(feature = "console")]
1129    fn emit_query_timing(&self, elapsed_ms: f64, rows: usize) {
1130        if let Some(console) = &self.console {
1131            console.status(&format!("Query: {:.1}ms, {} rows", elapsed_ms, rows));
1132        }
1133    }
1134
1135    /// Emit query results with PRAGMA-aware formatting.
1136    #[cfg(feature = "console")]
1137    fn emit_query_result(&self, sql: &str, col_names: &[String], rows: &[Row], elapsed_ms: f64) {
1138        if let Some(console) = &self.console {
1139            // Check if this is a PRAGMA query for special formatting
1140            let sql_upper = sql.trim().to_uppercase();
1141            let is_pragma = sql_upper.starts_with("PRAGMA");
1142
1143            if is_pragma && !rows.is_empty() {
1144                // Format PRAGMA results as a table
1145                if console.mode().is_plain() {
1146                    // Plain text format for agents
1147                    console.status(&format!("{}:", sql.trim()));
1148                    // Header
1149                    console.status(&format!("  {}", col_names.join("|")));
1150                    // Rows
1151                    for row in rows.iter().take(20) {
1152                        let values: Vec<String> = (0..col_names.len())
1153                            .map(|i| {
1154                                row.get(i)
1155                                    .map(|v| format_value(v))
1156                                    .unwrap_or_else(|| "NULL".to_string())
1157                            })
1158                            .collect();
1159                        console.status(&format!("  {}", values.join("|")));
1160                    }
1161                    if rows.len() > 20 {
1162                        console.status(&format!("  ... and {} more rows", rows.len() - 20));
1163                    }
1164                    console.status(&format!("  ({:.1}ms)", elapsed_ms));
1165                } else {
1166                    // Rich format with table rendering
1167                    let mut table_output = String::new();
1168                    table_output.push_str(&format!("PRAGMA Query Results ({:.1}ms)\n", elapsed_ms));
1169
1170                    // Calculate column widths
1171                    let mut widths: Vec<usize> = col_names.iter().map(|c| c.len()).collect();
1172                    for row in rows.iter().take(20) {
1173                        for (i, w) in widths.iter_mut().enumerate() {
1174                            let val_len = row.get(i).map(|v| format_value(v).len()).unwrap_or(4); // "NULL".len()
1175                            if val_len > *w {
1176                                *w = val_len;
1177                            }
1178                        }
1179                    }
1180
1181                    // Build header separator
1182                    let sep: String = widths
1183                        .iter()
1184                        .map(|w| "-".repeat(*w + 2))
1185                        .collect::<Vec<_>>()
1186                        .join("+");
1187                    table_output.push_str(&format!("+{}+\n", sep));
1188
1189                    // Header row
1190                    let header: String = col_names
1191                        .iter()
1192                        .enumerate()
1193                        .map(|(i, name)| format!(" {:width$} ", name, width = widths[i]))
1194                        .collect::<Vec<_>>()
1195                        .join("|");
1196                    table_output.push_str(&format!("|{}|\n", header));
1197                    table_output.push_str(&format!("+{}+\n", sep));
1198
1199                    // Data rows
1200                    for row in rows.iter().take(20) {
1201                        let data: String = (0..col_names.len())
1202                            .map(|i| {
1203                                let val = row
1204                                    .get(i)
1205                                    .map(|v| format_value(v))
1206                                    .unwrap_or_else(|| "NULL".to_string());
1207                                format!(" {:width$} ", val, width = widths[i])
1208                            })
1209                            .collect::<Vec<_>>()
1210                            .join("|");
1211                        table_output.push_str(&format!("|{}|\n", data));
1212                    }
1213                    table_output.push_str(&format!("+{}+", sep));
1214
1215                    if rows.len() > 20 {
1216                        table_output.push_str(&format!("\n... and {} more rows", rows.len() - 20));
1217                    }
1218
1219                    console.status(&table_output);
1220                }
1221            } else {
1222                // Regular query timing
1223                self.emit_query_timing(elapsed_ms, rows.len());
1224            }
1225        }
1226    }
1227
1228    /// Emit execute operation timing to console.
1229    #[cfg(feature = "console")]
1230    fn emit_execute_timing(&self, sql: &str, rows_affected: u64, elapsed_ms: f64) {
1231        if let Some(console) = &self.console {
1232            let sql_upper = sql.trim().to_uppercase();
1233
1234            // Provide contextual message based on operation type
1235            let op_type = if sql_upper.starts_with("INSERT") {
1236                "Insert"
1237            } else if sql_upper.starts_with("UPDATE") {
1238                "Update"
1239            } else if sql_upper.starts_with("DELETE") {
1240                "Delete"
1241            } else if sql_upper.starts_with("CREATE") {
1242                "Create"
1243            } else if sql_upper.starts_with("DROP") {
1244                "Drop"
1245            } else if sql_upper.starts_with("ALTER") {
1246                "Alter"
1247            } else {
1248                "Execute"
1249            };
1250
1251            if console.mode().is_plain() {
1252                console.status(&format!(
1253                    "{}: {} rows affected ({:.1}ms)",
1254                    op_type, rows_affected, elapsed_ms
1255                ));
1256            } else {
1257                console.status(&format!(
1258                    "[{}] {} rows affected ({:.1}ms)",
1259                    op_type.to_uppercase(),
1260                    rows_affected,
1261                    elapsed_ms
1262                ));
1263            }
1264        }
1265    }
1266
1267    /// Emit busy waiting status to console.
1268    #[cfg(feature = "console")]
1269    pub fn emit_busy_waiting(&self, elapsed_secs: f64) {
1270        if let Some(console) = &self.console {
1271            if console.mode().is_plain() {
1272                console.status(&format!(
1273                    "Waiting for database lock... ({:.1}s)",
1274                    elapsed_secs
1275                ));
1276            } else {
1277                console.status(&format!(
1278                    "[..] Waiting for database lock... ({:.1}s)",
1279                    elapsed_secs
1280                ));
1281            }
1282        }
1283    }
1284
1285    /// Emit WAL checkpoint progress to console.
1286    #[cfg(feature = "console")]
1287    pub fn emit_checkpoint_progress(&self, pages_done: u32, pages_total: u32) {
1288        if let Some(console) = &self.console {
1289            let pct = if pages_total > 0 {
1290                (pages_done as f64 / pages_total as f64) * 100.0
1291            } else {
1292                100.0
1293            };
1294
1295            if console.mode().is_plain() {
1296                console.status(&format!(
1297                    "WAL checkpoint: {:.0}% ({}/{} pages)",
1298                    pct, pages_done, pages_total
1299                ));
1300            } else {
1301                // ASCII progress bar for rich mode
1302                let bar_width: usize = 20;
1303                let filled = ((pct / 100.0) * bar_width as f64).round() as usize;
1304                let empty = bar_width.saturating_sub(filled);
1305                let bar = format!("[{}{}]", "=".repeat(filled), " ".repeat(empty));
1306                console.status(&format!(
1307                    "WAL checkpoint: {} {:.0}% ({}/{} pages)",
1308                    bar, pct, pages_done, pages_total
1309                ));
1310            }
1311        }
1312    }
1313
1314    /// No-op when console feature is disabled.
1315    #[cfg(not(feature = "console"))]
1316    #[allow(dead_code)]
1317    fn emit_open_status(&self) {}
1318
1319    /// No-op when console feature is disabled.
1320    #[cfg(not(feature = "console"))]
1321    fn emit_transaction_state(&self, _state: &str) {}
1322
1323    /// No-op when console feature is disabled.
1324    #[cfg(not(feature = "console"))]
1325    #[allow(dead_code)]
1326    fn emit_query_timing(&self, _elapsed_ms: f64, _rows: usize) {}
1327
1328    /// No-op when console feature is disabled.
1329    #[cfg(not(feature = "console"))]
1330    #[allow(dead_code)]
1331    fn emit_query_result(
1332        &self,
1333        _sql: &str,
1334        _col_names: &[String],
1335        _rows: &[Row],
1336        _elapsed_ms: f64,
1337    ) {
1338    }
1339
1340    /// No-op when console feature is disabled.
1341    #[cfg(not(feature = "console"))]
1342    #[allow(dead_code)]
1343    fn emit_execute_timing(&self, _sql: &str, _rows_affected: u64, _elapsed_ms: f64) {}
1344
1345    /// No-op when console feature is disabled.
1346    #[cfg(not(feature = "console"))]
1347    pub fn emit_busy_waiting(&self, _elapsed_secs: f64) {}
1348
1349    /// No-op when console feature is disabled.
1350    #[cfg(not(feature = "console"))]
1351    pub fn emit_checkpoint_progress(&self, _pages_done: u32, _pages_total: u32) {}
1352}
1353
1354#[cfg(test)]
1355mod tests {
1356    use super::*;
1357
1358    #[test]
1359    fn test_open_memory() {
1360        let conn = SqliteConnection::open_memory().unwrap();
1361        assert_eq!(conn.path(), ":memory:");
1362    }
1363
1364    #[test]
1365    fn test_execute_raw() {
1366        let conn = SqliteConnection::open_memory().unwrap();
1367        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1368            .unwrap();
1369        conn.execute_raw("INSERT INTO test (name) VALUES ('Alice')")
1370            .unwrap();
1371        assert_eq!(conn.changes(), 1);
1372        assert_eq!(conn.last_insert_rowid(), 1);
1373    }
1374
1375    #[test]
1376    fn test_query_sync() {
1377        let conn = SqliteConnection::open_memory().unwrap();
1378        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1379            .unwrap();
1380        conn.execute_raw("INSERT INTO test (name) VALUES ('Alice'), ('Bob')")
1381            .unwrap();
1382
1383        let rows = conn
1384            .query_sync("SELECT * FROM test ORDER BY id", &[])
1385            .unwrap();
1386        assert_eq!(rows.len(), 2);
1387
1388        assert_eq!(rows[0].get_named::<i32>("id").unwrap(), 1);
1389        assert_eq!(rows[0].get_named::<String>("name").unwrap(), "Alice");
1390        assert_eq!(rows[1].get_named::<i32>("id").unwrap(), 2);
1391        assert_eq!(rows[1].get_named::<String>("name").unwrap(), "Bob");
1392    }
1393
1394    #[test]
1395    fn test_parameterized_query() {
1396        let conn = SqliteConnection::open_memory().unwrap();
1397        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")
1398            .unwrap();
1399
1400        conn.execute_sync(
1401            "INSERT INTO test (name, age) VALUES (?, ?)",
1402            &[Value::Text("Alice".to_string()), Value::Int(30)],
1403        )
1404        .unwrap();
1405
1406        let rows = conn
1407            .query_sync(
1408                "SELECT * FROM test WHERE name = ?",
1409                &[Value::Text("Alice".to_string())],
1410            )
1411            .unwrap();
1412
1413        assert_eq!(rows.len(), 1);
1414        assert_eq!(rows[0].get_named::<String>("name").unwrap(), "Alice");
1415        assert_eq!(rows[0].get_named::<i32>("age").unwrap(), 30);
1416    }
1417
1418    #[test]
1419    fn test_null_handling() {
1420        let conn = SqliteConnection::open_memory().unwrap();
1421        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1422            .unwrap();
1423
1424        conn.execute_sync("INSERT INTO test (name) VALUES (?)", &[Value::Null])
1425            .unwrap();
1426
1427        let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1428        assert_eq!(rows.len(), 1);
1429        assert_eq!(rows[0].get_named::<Option<String>>("name").unwrap(), None);
1430    }
1431
1432    #[test]
1433    fn test_transaction() {
1434        let conn = SqliteConnection::open_memory().unwrap();
1435        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1436            .unwrap();
1437
1438        // Start transaction, insert, rollback
1439        conn.begin_sync(IsolationLevel::default()).unwrap();
1440        conn.execute_sync(
1441            "INSERT INTO test (name) VALUES (?)",
1442            &[Value::Text("Alice".to_string())],
1443        )
1444        .unwrap();
1445        conn.rollback_sync().unwrap();
1446
1447        // Verify rollback worked
1448        let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1449        assert_eq!(rows.len(), 0);
1450
1451        // Start transaction, insert, commit
1452        conn.begin_sync(IsolationLevel::default()).unwrap();
1453        conn.execute_sync(
1454            "INSERT INTO test (name) VALUES (?)",
1455            &[Value::Text("Bob".to_string())],
1456        )
1457        .unwrap();
1458        conn.commit_sync().unwrap();
1459
1460        // Verify commit worked
1461        let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1462        assert_eq!(rows.len(), 1);
1463        assert_eq!(rows[0].get_named::<String>("name").unwrap(), "Bob");
1464    }
1465
1466    #[test]
1467    fn test_insert_rowid() {
1468        let conn = SqliteConnection::open_memory().unwrap();
1469        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1470            .unwrap();
1471
1472        let rowid = conn
1473            .insert_sync(
1474                "INSERT INTO test (name) VALUES (?)",
1475                &[Value::Text("Alice".to_string())],
1476            )
1477            .unwrap();
1478        assert_eq!(rowid, 1);
1479
1480        let rowid = conn
1481            .insert_sync(
1482                "INSERT INTO test (name) VALUES (?)",
1483                &[Value::Text("Bob".to_string())],
1484            )
1485            .unwrap();
1486        assert_eq!(rowid, 2);
1487    }
1488
1489    #[test]
1490    #[allow(clippy::approx_constant)]
1491    fn test_type_conversions() {
1492        let conn = SqliteConnection::open_memory().unwrap();
1493        conn.execute_raw(
1494            "CREATE TABLE types (
1495                b BOOLEAN,
1496                i INTEGER,
1497                f REAL,
1498                t TEXT,
1499                bl BLOB
1500            )",
1501        )
1502        .unwrap();
1503
1504        conn.execute_sync(
1505            "INSERT INTO types VALUES (?, ?, ?, ?, ?)",
1506            &[
1507                Value::Bool(true),
1508                Value::BigInt(42),
1509                Value::Double(3.14),
1510                Value::Text("hello".to_string()),
1511                Value::Bytes(vec![1, 2, 3]),
1512            ],
1513        )
1514        .unwrap();
1515
1516        let rows = conn.query_sync("SELECT * FROM types", &[]).unwrap();
1517        assert_eq!(rows.len(), 1);
1518
1519        // SQLite stores booleans as integers
1520        let b: i32 = rows[0].get_named("b").unwrap();
1521        assert_eq!(b, 1);
1522
1523        let i: i32 = rows[0].get_named("i").unwrap();
1524        assert_eq!(i, 42);
1525
1526        let f: f64 = rows[0].get_named("f").unwrap();
1527        assert!((f - 3.14).abs() < 0.001);
1528
1529        let t: String = rows[0].get_named("t").unwrap();
1530        assert_eq!(t, "hello");
1531
1532        let bl: Vec<u8> = rows[0].get_named("bl").unwrap();
1533        assert_eq!(bl, vec![1, 2, 3]);
1534    }
1535
1536    #[test]
1537    fn test_open_flags() {
1538        // Test creating a database with create flag
1539        let tmp = std::env::temp_dir().join("sqlmodel_test.db");
1540        let _ = std::fs::remove_file(&tmp); // Ensure it doesn't exist
1541
1542        let config = SqliteConfig::file(tmp.to_string_lossy().to_string())
1543            .flags(OpenFlags::create_read_write());
1544        let conn = SqliteConnection::open(&config).unwrap();
1545        conn.execute_raw("CREATE TABLE test (id INTEGER)").unwrap();
1546        drop(conn);
1547
1548        // Open as read-only
1549        let config =
1550            SqliteConfig::file(tmp.to_string_lossy().to_string()).flags(OpenFlags::read_only());
1551        let conn = SqliteConnection::open(&config).unwrap();
1552
1553        // Reading should work
1554        let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1555        assert_eq!(rows.len(), 0);
1556
1557        // Writing should fail
1558        let result = conn.execute_raw("INSERT INTO test VALUES (1)");
1559        assert!(result.is_err());
1560
1561        drop(conn);
1562        let _ = std::fs::remove_file(&tmp);
1563    }
1564
1565    // ==================== Console Integration Tests ====================
1566
1567    #[cfg(feature = "console")]
1568    mod console_tests {
1569        use super::*;
1570
1571        /// Test that ConsoleAware trait is properly implemented.
1572        #[test]
1573        fn test_console_aware_trait_impl() {
1574            let mut conn = SqliteConnection::open_memory().unwrap();
1575
1576            // Initially no console
1577            assert!(!conn.has_console());
1578            assert!(conn.console().is_none());
1579
1580            // Attach console
1581            let console = Arc::new(SqlModelConsole::with_mode(
1582                sqlmodel_console::OutputMode::Plain,
1583            ));
1584            conn.set_console(Some(console.clone()));
1585
1586            // Verify console is attached
1587            assert!(conn.has_console());
1588            assert!(conn.console().is_some());
1589
1590            // Detach console
1591            conn.set_console(None);
1592            assert!(!conn.has_console());
1593        }
1594
1595        /// Test database open feedback is emitted when console is attached.
1596        #[test]
1597        fn test_database_open_feedback() {
1598            let mut conn = SqliteConnection::open_memory().unwrap();
1599
1600            // Attaching console should emit open status
1601            // (output goes to stderr, we just verify no panic)
1602            let console = Arc::new(SqlModelConsole::with_mode(
1603                sqlmodel_console::OutputMode::Plain,
1604            ));
1605            conn.set_console(Some(console));
1606
1607            // No panic means success
1608        }
1609
1610        /// Test PRAGMA query formatting.
1611        #[test]
1612        fn test_pragma_formatting() {
1613            let mut conn = SqliteConnection::open_memory().unwrap();
1614
1615            // Create a table to have something in pragma_table_info
1616            conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1617                .unwrap();
1618
1619            // Attach console for formatted output
1620            let console = Arc::new(SqlModelConsole::with_mode(
1621                sqlmodel_console::OutputMode::Plain,
1622            ));
1623            conn.set_console(Some(console));
1624
1625            // Execute PRAGMA query - should format as table
1626            let rows = conn.query_sync("PRAGMA table_info(test)", &[]).unwrap();
1627
1628            // Verify we got the expected columns
1629            assert!(!rows.is_empty());
1630        }
1631
1632        /// Test transaction state display.
1633        #[test]
1634        fn test_transaction_state() {
1635            let mut conn = SqliteConnection::open_memory().unwrap();
1636            conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY)")
1637                .unwrap();
1638
1639            // Attach console
1640            let console = Arc::new(SqlModelConsole::with_mode(
1641                sqlmodel_console::OutputMode::Plain,
1642            ));
1643            conn.set_console(Some(console));
1644
1645            // Transaction operations should emit state
1646            conn.begin_sync(IsolationLevel::default()).unwrap();
1647            conn.execute_sync("INSERT INTO test (id) VALUES (?)", &[Value::Int(1)])
1648                .unwrap();
1649            conn.commit_sync().unwrap();
1650
1651            // Verify the transaction worked
1652            let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1653            assert_eq!(rows.len(), 1);
1654        }
1655
1656        /// Test WAL checkpoint progress output.
1657        #[test]
1658        fn test_wal_checkpoint_progress() {
1659            let conn = SqliteConnection::open_memory().unwrap();
1660
1661            // emit_checkpoint_progress should not panic
1662            conn.emit_checkpoint_progress(50, 100);
1663            conn.emit_checkpoint_progress(100, 100);
1664            conn.emit_checkpoint_progress(0, 0);
1665        }
1666
1667        /// Test busy timeout feedback output.
1668        #[test]
1669        fn test_busy_timeout_feedback() {
1670            let conn = SqliteConnection::open_memory().unwrap();
1671
1672            // emit_busy_waiting should not panic
1673            conn.emit_busy_waiting(0.5);
1674            conn.emit_busy_waiting(2.1);
1675        }
1676
1677        /// Test that console disabled produces no output (no panic).
1678        #[test]
1679        fn test_console_disabled_no_output() {
1680            let conn = SqliteConnection::open_memory().unwrap();
1681
1682            // Without console, all emit methods should be no-ops
1683            conn.emit_busy_waiting(1.0);
1684            conn.emit_checkpoint_progress(10, 100);
1685
1686            // Query should work without console
1687            conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY)")
1688                .unwrap();
1689            let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1690            assert_eq!(rows.len(), 0);
1691        }
1692
1693        /// Test plain mode output format (parseable by agents).
1694        #[test]
1695        fn test_plain_mode_output() {
1696            let mut conn = SqliteConnection::open_memory().unwrap();
1697
1698            // Attach plain mode console
1699            let console = Arc::new(SqlModelConsole::with_mode(
1700                sqlmodel_console::OutputMode::Plain,
1701            ));
1702            conn.set_console(Some(console.clone()));
1703
1704            // Verify plain mode is active
1705            assert!(conn.console().unwrap().is_plain());
1706
1707            // Execute operations (output should be plain text)
1708            conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1709                .unwrap();
1710            conn.execute_sync(
1711                "INSERT INTO test (name) VALUES (?)",
1712                &[Value::Text("Alice".to_string())],
1713            )
1714            .unwrap();
1715
1716            let rows = conn.query_sync("PRAGMA table_info(test)", &[]).unwrap();
1717            assert!(!rows.is_empty());
1718        }
1719
1720        /// Test rich mode output format.
1721        #[test]
1722        fn test_rich_mode_output() {
1723            let mut conn = SqliteConnection::open_memory().unwrap();
1724
1725            // Attach rich mode console
1726            let console = Arc::new(SqlModelConsole::with_mode(
1727                sqlmodel_console::OutputMode::Rich,
1728            ));
1729            conn.set_console(Some(console.clone()));
1730
1731            // Verify rich mode is active
1732            assert!(conn.console().unwrap().is_rich());
1733
1734            // Execute operations (output should have formatting)
1735            conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY)")
1736                .unwrap();
1737            conn.emit_checkpoint_progress(50, 100);
1738        }
1739    }
1740}