Skip to main content

sqlmodel_sqlite/
connection.rs

1//! SQLite connection implementation.
2//!
3//! This module provides safe wrappers around SQLite's C API and implements
4//! the Connection trait from sqlmodel-core.
5//!
6//! # Console Integration
7//!
8//! When the `console` feature is enabled, the connection can report status
9//! during operations. Use the `ConsoleAware` trait to attach a console.
10//!
11//! ```rust,ignore
12//! use sqlmodel_sqlite::SqliteConnection;
13//! use sqlmodel_console::{SqlModelConsole, ConsoleAware};
14//! use std::sync::Arc;
15//!
16//! let console = Arc::new(SqlModelConsole::new());
17//! let mut conn = SqliteConnection::open_memory().unwrap();
18//! conn.set_console(Some(console));
19//! ```
20
21// Allow casts in FFI code where we need to match C types exactly
22#![allow(clippy::cast_possible_truncation)]
23#![allow(clippy::cast_sign_loss)]
24#![allow(clippy::cast_lossless)]
25#![allow(clippy::result_large_err)] // Error type is defined in sqlmodel-core
26#![allow(clippy::borrow_as_ptr)] // FFI requires raw pointers
27#![allow(clippy::if_not_else)] // Clearer for error handling
28#![allow(clippy::implicit_clone)] // Minor optimization
29#![allow(clippy::map_unwrap_or)] // Clearer for optional formatting
30#![allow(clippy::redundant_closure)] // format_value requires context
31
32use crate::ffi;
33use crate::types;
34use sqlmodel_core::{
35    Connection, Cx, Error, IsolationLevel, Outcome, PreparedStatement, Row, TransactionOps, Value,
36    error::{ConnectionError, ConnectionErrorKind, QueryError, QueryErrorKind},
37    row::ColumnInfo,
38};
39use std::ffi::{CStr, CString, c_int};
40use std::future::Future;
41use std::ptr;
42use std::sync::{Arc, Mutex};
43
44#[cfg(feature = "console")]
45use sqlmodel_console::{ConsoleAware, SqlModelConsole};
46
47/// Configuration for opening SQLite connections.
48#[derive(Debug, Clone)]
49pub struct SqliteConfig {
50    /// Path to the database file, or ":memory:" for in-memory database.
51    pub path: String,
52    /// Open flags (read-only, read-write, create, etc.)
53    pub flags: OpenFlags,
54    /// Busy timeout in milliseconds.
55    pub busy_timeout_ms: u32,
56}
57
58/// Flags controlling how the database is opened.
59#[derive(Debug, Clone, Copy, Default)]
60pub struct OpenFlags {
61    /// Open for reading only.
62    pub read_only: bool,
63    /// Open for reading and writing.
64    pub read_write: bool,
65    /// Create the database if it doesn't exist.
66    pub create: bool,
67    /// Enable URI filename interpretation.
68    pub uri: bool,
69    /// Open in multi-thread mode (connections not shared between threads).
70    pub no_mutex: bool,
71    /// Open in serialized mode (connections can be shared).
72    pub full_mutex: bool,
73    /// Enable shared cache mode.
74    pub shared_cache: bool,
75    /// Disable shared cache mode.
76    pub private_cache: bool,
77}
78
79impl OpenFlags {
80    /// Create flags for read-only access.
81    pub fn read_only() -> Self {
82        Self {
83            read_only: true,
84            ..Default::default()
85        }
86    }
87
88    /// Create flags for read-write access (database must exist).
89    pub fn read_write() -> Self {
90        Self {
91            read_write: true,
92            ..Default::default()
93        }
94    }
95
96    /// Create flags for read-write access with creation if needed.
97    pub fn create_read_write() -> Self {
98        Self {
99            read_write: true,
100            create: true,
101            ..Default::default()
102        }
103    }
104
105    fn to_sqlite_flags(self) -> c_int {
106        let mut flags = 0;
107
108        if self.read_only {
109            flags |= ffi::SQLITE_OPEN_READONLY;
110        }
111        if self.read_write {
112            flags |= ffi::SQLITE_OPEN_READWRITE;
113        }
114        if self.create {
115            flags |= ffi::SQLITE_OPEN_CREATE;
116        }
117        if self.uri {
118            flags |= ffi::SQLITE_OPEN_URI;
119        }
120        if self.no_mutex {
121            flags |= ffi::SQLITE_OPEN_NOMUTEX;
122        }
123        if self.full_mutex {
124            flags |= ffi::SQLITE_OPEN_FULLMUTEX;
125        }
126        if self.shared_cache {
127            flags |= ffi::SQLITE_OPEN_SHAREDCACHE;
128        }
129        if self.private_cache {
130            flags |= ffi::SQLITE_OPEN_PRIVATECACHE;
131        }
132
133        // Default to read-write if no mode specified
134        if flags & (ffi::SQLITE_OPEN_READONLY | ffi::SQLITE_OPEN_READWRITE) == 0 {
135            flags |= ffi::SQLITE_OPEN_READWRITE | ffi::SQLITE_OPEN_CREATE;
136        }
137
138        flags
139    }
140}
141
142impl Default for SqliteConfig {
143    fn default() -> Self {
144        Self {
145            path: ":memory:".to_string(),
146            flags: OpenFlags::create_read_write(),
147            busy_timeout_ms: 5000,
148        }
149    }
150}
151
152impl SqliteConfig {
153    /// Create a new config for a file-based database.
154    pub fn file(path: impl Into<String>) -> Self {
155        Self {
156            path: path.into(),
157            flags: OpenFlags::create_read_write(),
158            busy_timeout_ms: 5000,
159        }
160    }
161
162    /// Create a new config for an in-memory database.
163    pub fn memory() -> Self {
164        Self::default()
165    }
166
167    /// Set open flags.
168    pub fn flags(mut self, flags: OpenFlags) -> Self {
169        self.flags = flags;
170        self
171    }
172
173    /// Set busy timeout.
174    pub fn busy_timeout(mut self, ms: u32) -> Self {
175        self.busy_timeout_ms = ms;
176        self
177    }
178}
179
180/// Inner state of the SQLite connection, protected by a mutex for thread safety.
181struct SqliteInner {
182    db: *mut ffi::sqlite3,
183    in_transaction: bool,
184}
185
186// SAFETY: SQLite handles can be safely sent between threads when using
187// SQLITE_OPEN_FULLMUTEX (serialized mode) or when properly synchronized.
188// We use a Mutex to ensure synchronization.
189unsafe impl Send for SqliteInner {}
190
191/// A connection to a SQLite database.
192///
193/// This is a thread-safe wrapper around a SQLite database handle.
194pub struct SqliteConnection {
195    inner: Mutex<SqliteInner>,
196    path: String,
197    /// Optional console for rich output
198    #[cfg(feature = "console")]
199    console: Option<Arc<SqlModelConsole>>,
200}
201
202// SqliteConnection is Send + Sync because all access goes through the Mutex
203unsafe impl Send for SqliteConnection {}
204unsafe impl Sync for SqliteConnection {}
205
206impl SqliteConnection {
207    /// Open a new SQLite connection with the given configuration.
208    pub fn open(config: &SqliteConfig) -> Result<Self, Error> {
209        let c_path = CString::new(config.path.as_str()).map_err(|_| {
210            Error::Connection(ConnectionError {
211                kind: ConnectionErrorKind::Connect,
212                message: "Invalid path: contains null byte".to_string(),
213                source: None,
214            })
215        })?;
216
217        let mut db: *mut ffi::sqlite3 = ptr::null_mut();
218        let flags = config.flags.to_sqlite_flags();
219
220        // SAFETY: We pass valid pointers and check the return value
221        let rc = unsafe { ffi::sqlite3_open_v2(c_path.as_ptr(), &mut db, flags, ptr::null()) };
222
223        if rc != ffi::SQLITE_OK {
224            let msg = if !db.is_null() {
225                // SAFETY: db is valid, errmsg returns a valid C string
226                unsafe {
227                    let err_ptr = ffi::sqlite3_errmsg(db);
228                    let msg = CStr::from_ptr(err_ptr).to_string_lossy().into_owned();
229                    ffi::sqlite3_close(db);
230                    msg
231                }
232            } else {
233                ffi::error_string(rc).to_string()
234            };
235
236            return Err(Error::Connection(ConnectionError {
237                kind: ConnectionErrorKind::Connect,
238                message: format!("Failed to open database: {}", msg),
239                source: None,
240            }));
241        }
242
243        // Set busy timeout
244        if config.busy_timeout_ms > 0 {
245            // SAFETY: db is valid
246            unsafe {
247                ffi::sqlite3_busy_timeout(db, config.busy_timeout_ms as c_int);
248            }
249        }
250
251        Ok(Self {
252            inner: Mutex::new(SqliteInner {
253                db,
254                in_transaction: false,
255            }),
256            path: config.path.clone(),
257            #[cfg(feature = "console")]
258            console: None,
259        })
260    }
261
262    /// Open an in-memory database.
263    pub fn open_memory() -> Result<Self, Error> {
264        Self::open(&SqliteConfig::memory())
265    }
266
267    /// Open a file-based database.
268    pub fn open_file(path: impl Into<String>) -> Result<Self, Error> {
269        Self::open(&SqliteConfig::file(path))
270    }
271
272    /// Get the database path.
273    pub fn path(&self) -> &str {
274        &self.path
275    }
276
277    /// Execute SQL directly without preparing (for DDL, etc.)
278    pub fn execute_raw(&self, sql: &str) -> Result<(), Error> {
279        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
280        let c_sql = CString::new(sql).map_err(|_| {
281            Error::Query(QueryError {
282                kind: QueryErrorKind::Syntax,
283                sql: Some(sql.to_string()),
284                sqlstate: None,
285                message: "SQL contains null byte".to_string(),
286                detail: None,
287                hint: None,
288                position: None,
289                source: None,
290            })
291        })?;
292
293        let mut errmsg: *mut std::ffi::c_char = ptr::null_mut();
294
295        // SAFETY: All pointers are valid
296        let rc = unsafe {
297            ffi::sqlite3_exec(inner.db, c_sql.as_ptr(), None, ptr::null_mut(), &mut errmsg)
298        };
299
300        if rc != ffi::SQLITE_OK {
301            let msg = if !errmsg.is_null() {
302                // SAFETY: errmsg is valid
303                let msg = unsafe { CStr::from_ptr(errmsg).to_string_lossy().into_owned() };
304                unsafe { ffi::sqlite3_free(errmsg.cast()) };
305                msg
306            } else {
307                ffi::error_string(rc).to_string()
308            };
309
310            return Err(Error::Query(QueryError {
311                kind: error_code_to_kind(rc),
312                sql: Some(sql.to_string()),
313                sqlstate: None,
314                message: msg,
315                detail: None,
316                hint: None,
317                position: None,
318                source: None,
319            }));
320        }
321
322        Ok(())
323    }
324
325    /// Get the last insert rowid.
326    pub fn last_insert_rowid(&self) -> i64 {
327        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
328        // SAFETY: db is valid
329        unsafe { ffi::sqlite3_last_insert_rowid(inner.db) }
330    }
331
332    /// Get the number of rows changed by the last statement.
333    pub fn changes(&self) -> i32 {
334        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
335        // SAFETY: db is valid
336        unsafe { ffi::sqlite3_changes(inner.db) }
337    }
338
339    /// Prepare and execute a query synchronously, returning all rows.
340    ///
341    /// This is a blocking operation suitable for simple use cases.
342    /// For async usage, use the `Connection` trait methods instead.
343    pub fn query_sync(&self, sql: &str, params: &[Value]) -> Result<Vec<Row>, Error> {
344        #[cfg(feature = "console")]
345        let start = std::time::Instant::now();
346
347        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
348        let stmt = prepare_stmt(inner.db, sql)?;
349
350        // Bind parameters
351        for (i, param) in params.iter().enumerate() {
352            // SAFETY: stmt is valid, index is 1-based
353            let rc = unsafe { types::bind_value(stmt, (i + 1) as c_int, param) };
354            if rc != ffi::SQLITE_OK {
355                // SAFETY: stmt is valid
356                unsafe { ffi::sqlite3_finalize(stmt) };
357                return Err(bind_error(inner.db, sql, i + 1));
358            }
359        }
360
361        // Fetch column names
362        // SAFETY: stmt is valid
363        let col_count = unsafe { ffi::sqlite3_column_count(stmt) };
364        let mut col_names = Vec::with_capacity(col_count as usize);
365        for i in 0..col_count {
366            let name =
367                unsafe { types::column_name(stmt, i) }.unwrap_or_else(|| format!("col{}", i));
368            col_names.push(name);
369        }
370        let columns = Arc::new(ColumnInfo::new(col_names.clone()));
371
372        // Fetch rows
373        let mut rows = Vec::new();
374        loop {
375            // SAFETY: stmt is valid
376            let rc = unsafe { ffi::sqlite3_step(stmt) };
377            match rc {
378                ffi::SQLITE_ROW => {
379                    let mut values = Vec::with_capacity(col_count as usize);
380                    for i in 0..col_count {
381                        // SAFETY: stmt is valid, we just got SQLITE_ROW
382                        let value = unsafe { types::read_column(stmt, i) };
383                        values.push(value);
384                    }
385                    rows.push(Row::with_columns(Arc::clone(&columns), values));
386                }
387                ffi::SQLITE_DONE => break,
388                _ => {
389                    // SAFETY: stmt is valid
390                    unsafe { ffi::sqlite3_finalize(stmt) };
391                    return Err(step_error(inner.db, sql));
392                }
393            }
394        }
395
396        // SAFETY: stmt is valid
397        unsafe { ffi::sqlite3_finalize(stmt) };
398
399        // Emit console output for PRAGMA queries and timing
400        #[cfg(feature = "console")]
401        {
402            let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
403            self.emit_query_result(sql, &col_names, &rows, elapsed_ms);
404        }
405
406        Ok(rows)
407    }
408
409    /// Prepare and execute a statement synchronously, returning rows affected.
410    ///
411    /// This is a blocking operation suitable for simple use cases.
412    /// For async usage, use the `Connection` trait methods instead.
413    pub fn execute_sync(&self, sql: &str, params: &[Value]) -> Result<u64, Error> {
414        #[cfg(feature = "console")]
415        let start = std::time::Instant::now();
416
417        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
418        let stmt = prepare_stmt(inner.db, sql)?;
419
420        // Bind parameters
421        for (i, param) in params.iter().enumerate() {
422            // SAFETY: stmt is valid
423            let rc = unsafe { types::bind_value(stmt, (i + 1) as c_int, param) };
424            if rc != ffi::SQLITE_OK {
425                // SAFETY: stmt is valid
426                unsafe { ffi::sqlite3_finalize(stmt) };
427                return Err(bind_error(inner.db, sql, i + 1));
428            }
429        }
430
431        // Execute
432        // SAFETY: stmt is valid
433        let rc = unsafe { ffi::sqlite3_step(stmt) };
434
435        // SAFETY: stmt is valid
436        unsafe { ffi::sqlite3_finalize(stmt) };
437
438        match rc {
439            ffi::SQLITE_DONE | ffi::SQLITE_ROW => {
440                // SAFETY: db is valid
441                let changes = unsafe { ffi::sqlite3_changes(inner.db) };
442
443                #[cfg(feature = "console")]
444                {
445                    let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
446                    self.emit_execute_timing(sql, changes as u64, elapsed_ms);
447                }
448
449                Ok(changes as u64)
450            }
451            _ => Err(step_error(inner.db, sql)),
452        }
453    }
454
455    /// Execute an INSERT and return the last inserted rowid.
456    fn insert_sync(&self, sql: &str, params: &[Value]) -> Result<i64, Error> {
457        self.execute_sync(sql, params)?;
458        Ok(self.last_insert_rowid())
459    }
460
461    /// Begin a transaction.
462    fn begin_sync(&self, isolation: IsolationLevel) -> Result<(), Error> {
463        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
464        if inner.in_transaction {
465            return Err(Error::Query(QueryError {
466                kind: QueryErrorKind::Database,
467                sql: None,
468                sqlstate: None,
469                message: "Already in a transaction".to_string(),
470                detail: None,
471                hint: None,
472                position: None,
473                source: None,
474            }));
475        }
476
477        // SQLite doesn't support isolation levels in the same way as PostgreSQL,
478        // but we can approximate with different transaction types
479        let begin_sql = match isolation {
480            IsolationLevel::Serializable => "BEGIN EXCLUSIVE",
481            IsolationLevel::RepeatableRead | IsolationLevel::ReadCommitted => "BEGIN IMMEDIATE",
482            IsolationLevel::ReadUncommitted => "BEGIN DEFERRED",
483        };
484
485        drop(inner); // Release lock before calling execute_raw
486        self.execute_raw(begin_sql)?;
487
488        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
489        inner.in_transaction = true;
490        self.emit_transaction_state("BEGIN");
491        Ok(())
492    }
493
494    /// Commit the current transaction.
495    fn commit_sync(&self) -> Result<(), Error> {
496        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
497        if !inner.in_transaction {
498            return Err(Error::Query(QueryError {
499                kind: QueryErrorKind::Database,
500                sql: None,
501                sqlstate: None,
502                message: "Not in a transaction".to_string(),
503                detail: None,
504                hint: None,
505                position: None,
506                source: None,
507            }));
508        }
509
510        drop(inner);
511        self.execute_raw("COMMIT")?;
512
513        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
514        inner.in_transaction = false;
515        self.emit_transaction_state("COMMIT");
516        Ok(())
517    }
518
519    /// Rollback the current transaction.
520    fn rollback_sync(&self) -> Result<(), Error> {
521        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
522        if !inner.in_transaction {
523            return Err(Error::Query(QueryError {
524                kind: QueryErrorKind::Database,
525                sql: None,
526                sqlstate: None,
527                message: "Not in a transaction".to_string(),
528                detail: None,
529                hint: None,
530                position: None,
531                source: None,
532            }));
533        }
534
535        drop(inner);
536        self.execute_raw("ROLLBACK")?;
537
538        let mut inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
539        inner.in_transaction = false;
540        self.emit_transaction_state("ROLLBACK");
541        Ok(())
542    }
543}
544
545impl Drop for SqliteConnection {
546    fn drop(&mut self) {
547        if let Ok(inner) = self.inner.lock() {
548            if !inner.db.is_null() {
549                // SAFETY: db is valid
550                unsafe {
551                    ffi::sqlite3_close_v2(inner.db);
552                }
553            }
554        }
555    }
556}
557
558/// A SQLite transaction.
559pub struct SqliteTransaction<'conn> {
560    conn: &'conn SqliteConnection,
561    committed: bool,
562}
563
564impl<'conn> SqliteTransaction<'conn> {
565    fn new(conn: &'conn SqliteConnection) -> Self {
566        Self {
567            conn,
568            committed: false,
569        }
570    }
571}
572
573impl Drop for SqliteTransaction<'_> {
574    fn drop(&mut self) {
575        if !self.committed {
576            // Auto-rollback on drop if not committed
577            let _ = self.conn.rollback_sync();
578        }
579    }
580}
581
582// Implement Connection trait for SqliteConnection
583impl Connection for SqliteConnection {
584    type Tx<'conn>
585        = SqliteTransaction<'conn>
586    where
587        Self: 'conn;
588
589    fn dialect(&self) -> sqlmodel_core::Dialect {
590        sqlmodel_core::Dialect::Sqlite
591    }
592
593    fn query(
594        &self,
595        _cx: &Cx,
596        sql: &str,
597        params: &[Value],
598    ) -> impl Future<Output = Outcome<Vec<Row>, Error>> + Send {
599        let result = self.query_sync(sql, params);
600        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
601    }
602
603    fn query_one(
604        &self,
605        _cx: &Cx,
606        sql: &str,
607        params: &[Value],
608    ) -> impl Future<Output = Outcome<Option<Row>, Error>> + Send {
609        let result = self.query_sync(sql, params).map(|mut rows| rows.pop());
610        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
611    }
612
613    fn execute(
614        &self,
615        _cx: &Cx,
616        sql: &str,
617        params: &[Value],
618    ) -> impl Future<Output = Outcome<u64, Error>> + Send {
619        let result = self.execute_sync(sql, params);
620        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
621    }
622
623    fn insert(
624        &self,
625        _cx: &Cx,
626        sql: &str,
627        params: &[Value],
628    ) -> impl Future<Output = Outcome<i64, Error>> + Send {
629        let result = self.insert_sync(sql, params);
630        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
631    }
632
633    fn batch(
634        &self,
635        _cx: &Cx,
636        statements: &[(String, Vec<Value>)],
637    ) -> impl Future<Output = Outcome<Vec<u64>, Error>> + Send {
638        let mut results = Vec::with_capacity(statements.len());
639        let mut error = None;
640
641        for (sql, params) in statements {
642            match self.execute_sync(sql, params) {
643                Ok(n) => results.push(n),
644                Err(e) => {
645                    error = Some(e);
646                    break;
647                }
648            }
649        }
650
651        async move {
652            match error {
653                Some(e) => Outcome::Err(e),
654                None => Outcome::Ok(results),
655            }
656        }
657    }
658
659    fn begin(&self, cx: &Cx) -> impl Future<Output = Outcome<Self::Tx<'_>, Error>> + Send {
660        self.begin_with(cx, IsolationLevel::default())
661    }
662
663    fn begin_with(
664        &self,
665        _cx: &Cx,
666        isolation: IsolationLevel,
667    ) -> impl Future<Output = Outcome<Self::Tx<'_>, Error>> + Send {
668        let result = self
669            .begin_sync(isolation)
670            .map(|()| SqliteTransaction::new(self));
671        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
672    }
673
674    fn prepare(
675        &self,
676        _cx: &Cx,
677        sql: &str,
678    ) -> impl Future<Output = Outcome<PreparedStatement, Error>> + Send {
679        let inner = self.inner.lock().unwrap_or_else(|e| e.into_inner());
680        let result = prepare_stmt(inner.db, sql).map(|stmt| {
681            // SAFETY: stmt is valid
682            let param_count = unsafe { ffi::sqlite3_bind_parameter_count(stmt) } as usize;
683            let col_count = unsafe { ffi::sqlite3_column_count(stmt) } as c_int;
684
685            let mut columns = Vec::with_capacity(col_count as usize);
686            for i in 0..col_count {
687                if let Some(name) = unsafe { types::column_name(stmt, i) } {
688                    columns.push(name);
689                }
690            }
691
692            // SAFETY: stmt is valid
693            unsafe { ffi::sqlite3_finalize(stmt) };
694
695            // Use address as pseudo-ID since we don't cache statements yet
696            let id = sql.as_ptr() as u64;
697            PreparedStatement::with_columns(id, sql.to_string(), param_count, columns)
698        });
699
700        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
701    }
702
703    fn query_prepared(
704        &self,
705        cx: &Cx,
706        stmt: &PreparedStatement,
707        params: &[Value],
708    ) -> impl Future<Output = Outcome<Vec<Row>, Error>> + Send {
709        // For now, just re-execute the SQL
710        // Future optimization: cache prepared statements
711        self.query(cx, stmt.sql(), params)
712    }
713
714    fn execute_prepared(
715        &self,
716        cx: &Cx,
717        stmt: &PreparedStatement,
718        params: &[Value],
719    ) -> impl Future<Output = Outcome<u64, Error>> + Send {
720        self.execute(cx, stmt.sql(), params)
721    }
722
723    fn ping(&self, _cx: &Cx) -> impl Future<Output = Outcome<(), Error>> + Send {
724        // Simple ping: execute a trivial query
725        let result = self.query_sync("SELECT 1", &[]).map(|_| ());
726        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
727    }
728
729    async fn close(self, _cx: &Cx) -> sqlmodel_core::Result<()> {
730        // Connection is closed on drop
731        Ok(())
732    }
733}
734
735// Implement TransactionOps for SqliteTransaction
736impl TransactionOps for SqliteTransaction<'_> {
737    fn query(
738        &self,
739        _cx: &Cx,
740        sql: &str,
741        params: &[Value],
742    ) -> impl Future<Output = Outcome<Vec<Row>, Error>> + Send {
743        let result = self.conn.query_sync(sql, params);
744        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
745    }
746
747    fn query_one(
748        &self,
749        _cx: &Cx,
750        sql: &str,
751        params: &[Value],
752    ) -> impl Future<Output = Outcome<Option<Row>, Error>> + Send {
753        let result = self.conn.query_sync(sql, params).map(|mut rows| rows.pop());
754        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
755    }
756
757    fn execute(
758        &self,
759        _cx: &Cx,
760        sql: &str,
761        params: &[Value],
762    ) -> impl Future<Output = Outcome<u64, Error>> + Send {
763        let result = self.conn.execute_sync(sql, params);
764        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
765    }
766
767    fn savepoint(&self, _cx: &Cx, name: &str) -> impl Future<Output = Outcome<(), Error>> + Send {
768        // Quote identifier to prevent SQL injection
769        let quoted_name = format!("\"{}\"", name.replace('"', "\"\""));
770        let sql = format!("SAVEPOINT {}", quoted_name);
771        let result = self.conn.execute_raw(&sql);
772        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
773    }
774
775    fn rollback_to(&self, _cx: &Cx, name: &str) -> impl Future<Output = Outcome<(), Error>> + Send {
776        // Quote identifier to prevent SQL injection
777        let quoted_name = format!("\"{}\"", name.replace('"', "\"\""));
778        let sql = format!("ROLLBACK TO {}", quoted_name);
779        let result = self.conn.execute_raw(&sql);
780        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
781    }
782
783    fn release(&self, _cx: &Cx, name: &str) -> impl Future<Output = Outcome<(), Error>> + Send {
784        // Quote identifier to prevent SQL injection
785        let quoted_name = format!("\"{}\"", name.replace('"', "\"\""));
786        let sql = format!("RELEASE {}", quoted_name);
787        let result = self.conn.execute_raw(&sql);
788        async move { result.map_or_else(Outcome::Err, Outcome::Ok) }
789    }
790
791    async fn commit(mut self, _cx: &Cx) -> Outcome<(), Error> {
792        self.committed = true;
793        self.conn
794            .commit_sync()
795            .map_or_else(Outcome::Err, Outcome::Ok)
796    }
797
798    async fn rollback(mut self, _cx: &Cx) -> Outcome<(), Error> {
799        self.committed = true; // Prevent double rollback in drop
800        self.conn
801            .rollback_sync()
802            .map_or_else(Outcome::Err, Outcome::Ok)
803    }
804}
805
806// Helper functions
807
808fn prepare_stmt(db: *mut ffi::sqlite3, sql: &str) -> Result<*mut ffi::sqlite3_stmt, Error> {
809    let c_sql = CString::new(sql).map_err(|_| {
810        Error::Query(QueryError {
811            kind: QueryErrorKind::Syntax,
812            sql: Some(sql.to_string()),
813            sqlstate: None,
814            message: "SQL contains null byte".to_string(),
815            detail: None,
816            hint: None,
817            position: None,
818            source: None,
819        })
820    })?;
821
822    let mut stmt: *mut ffi::sqlite3_stmt = ptr::null_mut();
823
824    // SAFETY: All pointers are valid
825    let rc = unsafe {
826        ffi::sqlite3_prepare_v2(
827            db,
828            c_sql.as_ptr(),
829            c_sql.as_bytes().len() as c_int,
830            &mut stmt,
831            ptr::null_mut(),
832        )
833    };
834
835    if rc != ffi::SQLITE_OK {
836        return Err(prepare_error(db, sql));
837    }
838
839    Ok(stmt)
840}
841
842fn prepare_error(db: *mut ffi::sqlite3, sql: &str) -> Error {
843    // SAFETY: db is valid
844    let msg = unsafe {
845        let ptr = ffi::sqlite3_errmsg(db);
846        CStr::from_ptr(ptr).to_string_lossy().into_owned()
847    };
848    let code = unsafe { ffi::sqlite3_errcode(db) };
849
850    Error::Query(QueryError {
851        kind: error_code_to_kind(code),
852        sql: Some(sql.to_string()),
853        sqlstate: None,
854        message: msg,
855        detail: None,
856        hint: None,
857        position: None,
858        source: None,
859    })
860}
861
862fn bind_error(db: *mut ffi::sqlite3, sql: &str, param_index: usize) -> Error {
863    // SAFETY: db is valid
864    let msg = unsafe {
865        let ptr = ffi::sqlite3_errmsg(db);
866        CStr::from_ptr(ptr).to_string_lossy().into_owned()
867    };
868
869    Error::Query(QueryError {
870        kind: QueryErrorKind::Database,
871        sql: Some(sql.to_string()),
872        sqlstate: None,
873        message: format!("Failed to bind parameter {}: {}", param_index, msg),
874        detail: None,
875        hint: None,
876        position: None,
877        source: None,
878    })
879}
880
881fn step_error(db: *mut ffi::sqlite3, sql: &str) -> Error {
882    // SAFETY: db is valid
883    let msg = unsafe {
884        let ptr = ffi::sqlite3_errmsg(db);
885        CStr::from_ptr(ptr).to_string_lossy().into_owned()
886    };
887    let code = unsafe { ffi::sqlite3_errcode(db) };
888
889    Error::Query(QueryError {
890        kind: error_code_to_kind(code),
891        sql: Some(sql.to_string()),
892        sqlstate: None,
893        message: msg,
894        detail: None,
895        hint: None,
896        position: None,
897        source: None,
898    })
899}
900
901fn error_code_to_kind(code: c_int) -> QueryErrorKind {
902    match code {
903        ffi::SQLITE_CONSTRAINT => QueryErrorKind::Constraint,
904        ffi::SQLITE_BUSY | ffi::SQLITE_LOCKED => QueryErrorKind::Deadlock,
905        ffi::SQLITE_PERM | ffi::SQLITE_AUTH => QueryErrorKind::Permission,
906        ffi::SQLITE_NOTFOUND => QueryErrorKind::NotFound,
907        ffi::SQLITE_TOOBIG => QueryErrorKind::DataTruncation,
908        ffi::SQLITE_INTERRUPT => QueryErrorKind::Cancelled,
909        _ => QueryErrorKind::Database,
910    }
911}
912
913/// Format a Value for display in console output.
914#[allow(dead_code)]
915fn format_value(value: &Value) -> String {
916    match value {
917        Value::Null => "NULL".to_string(),
918        Value::Bool(b) => if *b { "true" } else { "false" }.to_string(),
919        Value::TinyInt(n) => n.to_string(),
920        Value::SmallInt(n) => n.to_string(),
921        Value::Int(n) => n.to_string(),
922        Value::BigInt(n) => n.to_string(),
923        Value::Float(n) => format!("{:.6}", n),
924        Value::Double(n) => format!("{:.6}", n),
925        Value::Text(s) => s.clone(),
926        Value::Bytes(b) => format!("[BLOB: {} bytes]", b.len()),
927        Value::Date(d) => d.to_string(),
928        Value::Time(t) => t.to_string(),
929        Value::Timestamp(ts) => ts.to_string(),
930        Value::TimestampTz(ts) => ts.to_string(),
931        Value::Json(j) => j.to_string(),
932        Value::Uuid(u) => {
933            // Format UUID as hex string: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
934            format!(
935                "{:02x}{:02x}{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}-{:02x}{:02x}{:02x}{:02x}{:02x}{:02x}",
936                u[0],
937                u[1],
938                u[2],
939                u[3],
940                u[4],
941                u[5],
942                u[6],
943                u[7],
944                u[8],
945                u[9],
946                u[10],
947                u[11],
948                u[12],
949                u[13],
950                u[14],
951                u[15]
952            )
953        }
954        Value::Decimal(d) => d.to_string(),
955        Value::Array(arr) => format!("[{} items]", arr.len()),
956        Value::Default => "DEFAULT".to_string(),
957    }
958}
959
960// ==================== Console Support ====================
961
962#[cfg(feature = "console")]
963impl ConsoleAware for SqliteConnection {
964    fn set_console(&mut self, console: Option<Arc<SqlModelConsole>>) {
965        self.console = console;
966        // Emit database status when console is attached
967        self.emit_open_status();
968    }
969
970    fn console(&self) -> Option<&Arc<SqlModelConsole>> {
971        self.console.as_ref()
972    }
973
974    fn has_console(&self) -> bool {
975        self.console.is_some()
976    }
977}
978
979impl SqliteConnection {
980    /// Emit database open status to console if available.
981    #[cfg(feature = "console")]
982    fn emit_open_status(&self) {
983        if let Some(console) = &self.console {
984            // Get database info
985            let mode = if self.path == ":memory:" {
986                "in-memory"
987            } else {
988                "file"
989            };
990
991            // Query journal mode if we can
992            let journal_mode = self
993                .query_sync("PRAGMA journal_mode", &[])
994                .ok()
995                .and_then(|rows| rows.first().and_then(|r| r.get_as::<String>(0).ok()));
996
997            let page_size = self
998                .query_sync("PRAGMA page_size", &[])
999                .ok()
1000                .and_then(|rows| rows.first().and_then(|r| r.get_as::<i64>(0).ok()));
1001
1002            if console.mode().is_plain() {
1003                // Plain text output for agents
1004                let journal = journal_mode.as_deref().unwrap_or("unknown");
1005                console.status(&format!(
1006                    "Opened SQLite database: {} ({} mode, journal: {})",
1007                    self.path, mode, journal
1008                ));
1009            } else {
1010                // Rich output
1011                console.status(&format!("SQLite database: {}", self.path));
1012                console.status(&format!("  Mode: {}", mode));
1013                if let Some(journal) = journal_mode {
1014                    console.status(&format!("  Journal: {}", journal.to_uppercase()));
1015                }
1016                if let Some(size) = page_size {
1017                    console.status(&format!("  Page size: {} bytes", size));
1018                }
1019            }
1020        }
1021    }
1022
1023    /// Emit transaction state to console if available.
1024    #[cfg(feature = "console")]
1025    fn emit_transaction_state(&self, state: &str) {
1026        if let Some(console) = &self.console {
1027            if console.mode().is_plain() {
1028                console.status(&format!("Transaction: {}", state));
1029            } else {
1030                console.status(&format!("[{}] Transaction {}", state, state.to_lowercase()));
1031            }
1032        }
1033    }
1034
1035    /// Emit query timing to console if available.
1036    #[cfg(feature = "console")]
1037    fn emit_query_timing(&self, elapsed_ms: f64, rows: usize) {
1038        if let Some(console) = &self.console {
1039            console.status(&format!("Query: {:.1}ms, {} rows", elapsed_ms, rows));
1040        }
1041    }
1042
1043    /// Emit query results with PRAGMA-aware formatting.
1044    #[cfg(feature = "console")]
1045    fn emit_query_result(&self, sql: &str, col_names: &[String], rows: &[Row], elapsed_ms: f64) {
1046        if let Some(console) = &self.console {
1047            // Check if this is a PRAGMA query for special formatting
1048            let sql_upper = sql.trim().to_uppercase();
1049            let is_pragma = sql_upper.starts_with("PRAGMA");
1050
1051            if is_pragma && !rows.is_empty() {
1052                // Format PRAGMA results as a table
1053                if console.mode().is_plain() {
1054                    // Plain text format for agents
1055                    console.status(&format!("{}:", sql.trim()));
1056                    // Header
1057                    console.status(&format!("  {}", col_names.join("|")));
1058                    // Rows
1059                    for row in rows.iter().take(20) {
1060                        let values: Vec<String> = (0..col_names.len())
1061                            .map(|i| {
1062                                row.get(i)
1063                                    .map(|v| format_value(v))
1064                                    .unwrap_or_else(|| "NULL".to_string())
1065                            })
1066                            .collect();
1067                        console.status(&format!("  {}", values.join("|")));
1068                    }
1069                    if rows.len() > 20 {
1070                        console.status(&format!("  ... and {} more rows", rows.len() - 20));
1071                    }
1072                    console.status(&format!("  ({:.1}ms)", elapsed_ms));
1073                } else {
1074                    // Rich format with table rendering
1075                    let mut table_output = String::new();
1076                    table_output.push_str(&format!("PRAGMA Query Results ({:.1}ms)\n", elapsed_ms));
1077
1078                    // Calculate column widths
1079                    let mut widths: Vec<usize> = col_names.iter().map(|c| c.len()).collect();
1080                    for row in rows.iter().take(20) {
1081                        for (i, w) in widths.iter_mut().enumerate() {
1082                            let val_len = row.get(i).map(|v| format_value(v).len()).unwrap_or(4); // "NULL".len()
1083                            if val_len > *w {
1084                                *w = val_len;
1085                            }
1086                        }
1087                    }
1088
1089                    // Build header separator
1090                    let sep: String = widths
1091                        .iter()
1092                        .map(|w| "-".repeat(*w + 2))
1093                        .collect::<Vec<_>>()
1094                        .join("+");
1095                    table_output.push_str(&format!("+{}+\n", sep));
1096
1097                    // Header row
1098                    let header: String = col_names
1099                        .iter()
1100                        .enumerate()
1101                        .map(|(i, name)| format!(" {:width$} ", name, width = widths[i]))
1102                        .collect::<Vec<_>>()
1103                        .join("|");
1104                    table_output.push_str(&format!("|{}|\n", header));
1105                    table_output.push_str(&format!("+{}+\n", sep));
1106
1107                    // Data rows
1108                    for row in rows.iter().take(20) {
1109                        let data: String = (0..col_names.len())
1110                            .map(|i| {
1111                                let val = row
1112                                    .get(i)
1113                                    .map(|v| format_value(v))
1114                                    .unwrap_or_else(|| "NULL".to_string());
1115                                format!(" {:width$} ", val, width = widths[i])
1116                            })
1117                            .collect::<Vec<_>>()
1118                            .join("|");
1119                        table_output.push_str(&format!("|{}|\n", data));
1120                    }
1121                    table_output.push_str(&format!("+{}+", sep));
1122
1123                    if rows.len() > 20 {
1124                        table_output.push_str(&format!("\n... and {} more rows", rows.len() - 20));
1125                    }
1126
1127                    console.status(&table_output);
1128                }
1129            } else {
1130                // Regular query timing
1131                self.emit_query_timing(elapsed_ms, rows.len());
1132            }
1133        }
1134    }
1135
1136    /// Emit execute operation timing to console.
1137    #[cfg(feature = "console")]
1138    fn emit_execute_timing(&self, sql: &str, rows_affected: u64, elapsed_ms: f64) {
1139        if let Some(console) = &self.console {
1140            let sql_upper = sql.trim().to_uppercase();
1141
1142            // Provide contextual message based on operation type
1143            let op_type = if sql_upper.starts_with("INSERT") {
1144                "Insert"
1145            } else if sql_upper.starts_with("UPDATE") {
1146                "Update"
1147            } else if sql_upper.starts_with("DELETE") {
1148                "Delete"
1149            } else if sql_upper.starts_with("CREATE") {
1150                "Create"
1151            } else if sql_upper.starts_with("DROP") {
1152                "Drop"
1153            } else if sql_upper.starts_with("ALTER") {
1154                "Alter"
1155            } else {
1156                "Execute"
1157            };
1158
1159            if console.mode().is_plain() {
1160                console.status(&format!(
1161                    "{}: {} rows affected ({:.1}ms)",
1162                    op_type, rows_affected, elapsed_ms
1163                ));
1164            } else {
1165                console.status(&format!(
1166                    "[{}] {} rows affected ({:.1}ms)",
1167                    op_type.to_uppercase(),
1168                    rows_affected,
1169                    elapsed_ms
1170                ));
1171            }
1172        }
1173    }
1174
1175    /// Emit busy waiting status to console.
1176    #[cfg(feature = "console")]
1177    pub fn emit_busy_waiting(&self, elapsed_secs: f64) {
1178        if let Some(console) = &self.console {
1179            if console.mode().is_plain() {
1180                console.status(&format!(
1181                    "Waiting for database lock... ({:.1}s)",
1182                    elapsed_secs
1183                ));
1184            } else {
1185                console.status(&format!(
1186                    "[..] Waiting for database lock... ({:.1}s)",
1187                    elapsed_secs
1188                ));
1189            }
1190        }
1191    }
1192
1193    /// Emit WAL checkpoint progress to console.
1194    #[cfg(feature = "console")]
1195    pub fn emit_checkpoint_progress(&self, pages_done: u32, pages_total: u32) {
1196        if let Some(console) = &self.console {
1197            let pct = if pages_total > 0 {
1198                (pages_done as f64 / pages_total as f64) * 100.0
1199            } else {
1200                100.0
1201            };
1202
1203            if console.mode().is_plain() {
1204                console.status(&format!(
1205                    "WAL checkpoint: {:.0}% ({}/{} pages)",
1206                    pct, pages_done, pages_total
1207                ));
1208            } else {
1209                // ASCII progress bar for rich mode
1210                let bar_width: usize = 20;
1211                let filled = ((pct / 100.0) * bar_width as f64).round() as usize;
1212                let empty = bar_width.saturating_sub(filled);
1213                let bar = format!("[{}{}]", "=".repeat(filled), " ".repeat(empty));
1214                console.status(&format!(
1215                    "WAL checkpoint: {} {:.0}% ({}/{} pages)",
1216                    bar, pct, pages_done, pages_total
1217                ));
1218            }
1219        }
1220    }
1221
1222    /// No-op when console feature is disabled.
1223    #[cfg(not(feature = "console"))]
1224    #[allow(dead_code)]
1225    fn emit_open_status(&self) {}
1226
1227    /// No-op when console feature is disabled.
1228    #[cfg(not(feature = "console"))]
1229    fn emit_transaction_state(&self, _state: &str) {}
1230
1231    /// No-op when console feature is disabled.
1232    #[cfg(not(feature = "console"))]
1233    #[allow(dead_code)]
1234    fn emit_query_timing(&self, _elapsed_ms: f64, _rows: usize) {}
1235
1236    /// No-op when console feature is disabled.
1237    #[cfg(not(feature = "console"))]
1238    #[allow(dead_code)]
1239    fn emit_query_result(
1240        &self,
1241        _sql: &str,
1242        _col_names: &[String],
1243        _rows: &[Row],
1244        _elapsed_ms: f64,
1245    ) {
1246    }
1247
1248    /// No-op when console feature is disabled.
1249    #[cfg(not(feature = "console"))]
1250    #[allow(dead_code)]
1251    fn emit_execute_timing(&self, _sql: &str, _rows_affected: u64, _elapsed_ms: f64) {}
1252
1253    /// No-op when console feature is disabled.
1254    #[cfg(not(feature = "console"))]
1255    pub fn emit_busy_waiting(&self, _elapsed_secs: f64) {}
1256
1257    /// No-op when console feature is disabled.
1258    #[cfg(not(feature = "console"))]
1259    pub fn emit_checkpoint_progress(&self, _pages_done: u32, _pages_total: u32) {}
1260}
1261
1262#[cfg(test)]
1263mod tests {
1264    use super::*;
1265
1266    #[test]
1267    fn test_open_memory() {
1268        let conn = SqliteConnection::open_memory().unwrap();
1269        assert_eq!(conn.path(), ":memory:");
1270    }
1271
1272    #[test]
1273    fn test_execute_raw() {
1274        let conn = SqliteConnection::open_memory().unwrap();
1275        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1276            .unwrap();
1277        conn.execute_raw("INSERT INTO test (name) VALUES ('Alice')")
1278            .unwrap();
1279        assert_eq!(conn.changes(), 1);
1280        assert_eq!(conn.last_insert_rowid(), 1);
1281    }
1282
1283    #[test]
1284    fn test_query_sync() {
1285        let conn = SqliteConnection::open_memory().unwrap();
1286        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1287            .unwrap();
1288        conn.execute_raw("INSERT INTO test (name) VALUES ('Alice'), ('Bob')")
1289            .unwrap();
1290
1291        let rows = conn
1292            .query_sync("SELECT * FROM test ORDER BY id", &[])
1293            .unwrap();
1294        assert_eq!(rows.len(), 2);
1295
1296        assert_eq!(rows[0].get_named::<i32>("id").unwrap(), 1);
1297        assert_eq!(rows[0].get_named::<String>("name").unwrap(), "Alice");
1298        assert_eq!(rows[1].get_named::<i32>("id").unwrap(), 2);
1299        assert_eq!(rows[1].get_named::<String>("name").unwrap(), "Bob");
1300    }
1301
1302    #[test]
1303    fn test_parameterized_query() {
1304        let conn = SqliteConnection::open_memory().unwrap();
1305        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT, age INTEGER)")
1306            .unwrap();
1307
1308        conn.execute_sync(
1309            "INSERT INTO test (name, age) VALUES (?, ?)",
1310            &[Value::Text("Alice".to_string()), Value::Int(30)],
1311        )
1312        .unwrap();
1313
1314        let rows = conn
1315            .query_sync(
1316                "SELECT * FROM test WHERE name = ?",
1317                &[Value::Text("Alice".to_string())],
1318            )
1319            .unwrap();
1320
1321        assert_eq!(rows.len(), 1);
1322        assert_eq!(rows[0].get_named::<String>("name").unwrap(), "Alice");
1323        assert_eq!(rows[0].get_named::<i32>("age").unwrap(), 30);
1324    }
1325
1326    #[test]
1327    fn test_null_handling() {
1328        let conn = SqliteConnection::open_memory().unwrap();
1329        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1330            .unwrap();
1331
1332        conn.execute_sync("INSERT INTO test (name) VALUES (?)", &[Value::Null])
1333            .unwrap();
1334
1335        let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1336        assert_eq!(rows.len(), 1);
1337        assert_eq!(rows[0].get_named::<Option<String>>("name").unwrap(), None);
1338    }
1339
1340    #[test]
1341    fn test_transaction() {
1342        let conn = SqliteConnection::open_memory().unwrap();
1343        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1344            .unwrap();
1345
1346        // Start transaction, insert, rollback
1347        conn.begin_sync(IsolationLevel::default()).unwrap();
1348        conn.execute_sync(
1349            "INSERT INTO test (name) VALUES (?)",
1350            &[Value::Text("Alice".to_string())],
1351        )
1352        .unwrap();
1353        conn.rollback_sync().unwrap();
1354
1355        // Verify rollback worked
1356        let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1357        assert_eq!(rows.len(), 0);
1358
1359        // Start transaction, insert, commit
1360        conn.begin_sync(IsolationLevel::default()).unwrap();
1361        conn.execute_sync(
1362            "INSERT INTO test (name) VALUES (?)",
1363            &[Value::Text("Bob".to_string())],
1364        )
1365        .unwrap();
1366        conn.commit_sync().unwrap();
1367
1368        // Verify commit worked
1369        let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1370        assert_eq!(rows.len(), 1);
1371        assert_eq!(rows[0].get_named::<String>("name").unwrap(), "Bob");
1372    }
1373
1374    #[test]
1375    fn test_insert_rowid() {
1376        let conn = SqliteConnection::open_memory().unwrap();
1377        conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1378            .unwrap();
1379
1380        let rowid = conn
1381            .insert_sync(
1382                "INSERT INTO test (name) VALUES (?)",
1383                &[Value::Text("Alice".to_string())],
1384            )
1385            .unwrap();
1386        assert_eq!(rowid, 1);
1387
1388        let rowid = conn
1389            .insert_sync(
1390                "INSERT INTO test (name) VALUES (?)",
1391                &[Value::Text("Bob".to_string())],
1392            )
1393            .unwrap();
1394        assert_eq!(rowid, 2);
1395    }
1396
1397    #[test]
1398    #[allow(clippy::approx_constant)]
1399    fn test_type_conversions() {
1400        let conn = SqliteConnection::open_memory().unwrap();
1401        conn.execute_raw(
1402            "CREATE TABLE types (
1403                b BOOLEAN,
1404                i INTEGER,
1405                f REAL,
1406                t TEXT,
1407                bl BLOB
1408            )",
1409        )
1410        .unwrap();
1411
1412        conn.execute_sync(
1413            "INSERT INTO types VALUES (?, ?, ?, ?, ?)",
1414            &[
1415                Value::Bool(true),
1416                Value::BigInt(42),
1417                Value::Double(3.14),
1418                Value::Text("hello".to_string()),
1419                Value::Bytes(vec![1, 2, 3]),
1420            ],
1421        )
1422        .unwrap();
1423
1424        let rows = conn.query_sync("SELECT * FROM types", &[]).unwrap();
1425        assert_eq!(rows.len(), 1);
1426
1427        // SQLite stores booleans as integers
1428        let b: i32 = rows[0].get_named("b").unwrap();
1429        assert_eq!(b, 1);
1430
1431        let i: i32 = rows[0].get_named("i").unwrap();
1432        assert_eq!(i, 42);
1433
1434        let f: f64 = rows[0].get_named("f").unwrap();
1435        assert!((f - 3.14).abs() < 0.001);
1436
1437        let t: String = rows[0].get_named("t").unwrap();
1438        assert_eq!(t, "hello");
1439
1440        let bl: Vec<u8> = rows[0].get_named("bl").unwrap();
1441        assert_eq!(bl, vec![1, 2, 3]);
1442    }
1443
1444    #[test]
1445    fn test_open_flags() {
1446        // Test creating a database with create flag
1447        let tmp = std::env::temp_dir().join("sqlmodel_test.db");
1448        let _ = std::fs::remove_file(&tmp); // Ensure it doesn't exist
1449
1450        let config = SqliteConfig::file(tmp.to_string_lossy().to_string())
1451            .flags(OpenFlags::create_read_write());
1452        let conn = SqliteConnection::open(&config).unwrap();
1453        conn.execute_raw("CREATE TABLE test (id INTEGER)").unwrap();
1454        drop(conn);
1455
1456        // Open as read-only
1457        let config =
1458            SqliteConfig::file(tmp.to_string_lossy().to_string()).flags(OpenFlags::read_only());
1459        let conn = SqliteConnection::open(&config).unwrap();
1460
1461        // Reading should work
1462        let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1463        assert_eq!(rows.len(), 0);
1464
1465        // Writing should fail
1466        let result = conn.execute_raw("INSERT INTO test VALUES (1)");
1467        assert!(result.is_err());
1468
1469        drop(conn);
1470        let _ = std::fs::remove_file(&tmp);
1471    }
1472
1473    // ==================== Console Integration Tests ====================
1474
1475    #[cfg(feature = "console")]
1476    mod console_tests {
1477        use super::*;
1478
1479        /// Test that ConsoleAware trait is properly implemented.
1480        #[test]
1481        fn test_console_aware_trait_impl() {
1482            let mut conn = SqliteConnection::open_memory().unwrap();
1483
1484            // Initially no console
1485            assert!(!conn.has_console());
1486            assert!(conn.console().is_none());
1487
1488            // Attach console
1489            let console = Arc::new(SqlModelConsole::with_mode(
1490                sqlmodel_console::OutputMode::Plain,
1491            ));
1492            conn.set_console(Some(console.clone()));
1493
1494            // Verify console is attached
1495            assert!(conn.has_console());
1496            assert!(conn.console().is_some());
1497
1498            // Detach console
1499            conn.set_console(None);
1500            assert!(!conn.has_console());
1501        }
1502
1503        /// Test database open feedback is emitted when console is attached.
1504        #[test]
1505        fn test_database_open_feedback() {
1506            let mut conn = SqliteConnection::open_memory().unwrap();
1507
1508            // Attaching console should emit open status
1509            // (output goes to stderr, we just verify no panic)
1510            let console = Arc::new(SqlModelConsole::with_mode(
1511                sqlmodel_console::OutputMode::Plain,
1512            ));
1513            conn.set_console(Some(console));
1514
1515            // No panic means success
1516        }
1517
1518        /// Test PRAGMA query formatting.
1519        #[test]
1520        fn test_pragma_formatting() {
1521            let mut conn = SqliteConnection::open_memory().unwrap();
1522
1523            // Create a table to have something in pragma_table_info
1524            conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1525                .unwrap();
1526
1527            // Attach console for formatted output
1528            let console = Arc::new(SqlModelConsole::with_mode(
1529                sqlmodel_console::OutputMode::Plain,
1530            ));
1531            conn.set_console(Some(console));
1532
1533            // Execute PRAGMA query - should format as table
1534            let rows = conn.query_sync("PRAGMA table_info(test)", &[]).unwrap();
1535
1536            // Verify we got the expected columns
1537            assert!(!rows.is_empty());
1538        }
1539
1540        /// Test transaction state display.
1541        #[test]
1542        fn test_transaction_state() {
1543            let mut conn = SqliteConnection::open_memory().unwrap();
1544            conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY)")
1545                .unwrap();
1546
1547            // Attach console
1548            let console = Arc::new(SqlModelConsole::with_mode(
1549                sqlmodel_console::OutputMode::Plain,
1550            ));
1551            conn.set_console(Some(console));
1552
1553            // Transaction operations should emit state
1554            conn.begin_sync(IsolationLevel::default()).unwrap();
1555            conn.execute_sync("INSERT INTO test (id) VALUES (?)", &[Value::Int(1)])
1556                .unwrap();
1557            conn.commit_sync().unwrap();
1558
1559            // Verify the transaction worked
1560            let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1561            assert_eq!(rows.len(), 1);
1562        }
1563
1564        /// Test WAL checkpoint progress output.
1565        #[test]
1566        fn test_wal_checkpoint_progress() {
1567            let conn = SqliteConnection::open_memory().unwrap();
1568
1569            // emit_checkpoint_progress should not panic
1570            conn.emit_checkpoint_progress(50, 100);
1571            conn.emit_checkpoint_progress(100, 100);
1572            conn.emit_checkpoint_progress(0, 0);
1573        }
1574
1575        /// Test busy timeout feedback output.
1576        #[test]
1577        fn test_busy_timeout_feedback() {
1578            let conn = SqliteConnection::open_memory().unwrap();
1579
1580            // emit_busy_waiting should not panic
1581            conn.emit_busy_waiting(0.5);
1582            conn.emit_busy_waiting(2.1);
1583        }
1584
1585        /// Test that console disabled produces no output (no panic).
1586        #[test]
1587        fn test_console_disabled_no_output() {
1588            let conn = SqliteConnection::open_memory().unwrap();
1589
1590            // Without console, all emit methods should be no-ops
1591            conn.emit_busy_waiting(1.0);
1592            conn.emit_checkpoint_progress(10, 100);
1593
1594            // Query should work without console
1595            conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY)")
1596                .unwrap();
1597            let rows = conn.query_sync("SELECT * FROM test", &[]).unwrap();
1598            assert_eq!(rows.len(), 0);
1599        }
1600
1601        /// Test plain mode output format (parseable by agents).
1602        #[test]
1603        fn test_plain_mode_output() {
1604            let mut conn = SqliteConnection::open_memory().unwrap();
1605
1606            // Attach plain mode console
1607            let console = Arc::new(SqlModelConsole::with_mode(
1608                sqlmodel_console::OutputMode::Plain,
1609            ));
1610            conn.set_console(Some(console.clone()));
1611
1612            // Verify plain mode is active
1613            assert!(conn.console().unwrap().is_plain());
1614
1615            // Execute operations (output should be plain text)
1616            conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY, name TEXT)")
1617                .unwrap();
1618            conn.execute_sync(
1619                "INSERT INTO test (name) VALUES (?)",
1620                &[Value::Text("Alice".to_string())],
1621            )
1622            .unwrap();
1623
1624            let rows = conn.query_sync("PRAGMA table_info(test)", &[]).unwrap();
1625            assert!(!rows.is_empty());
1626        }
1627
1628        /// Test rich mode output format.
1629        #[test]
1630        fn test_rich_mode_output() {
1631            let mut conn = SqliteConnection::open_memory().unwrap();
1632
1633            // Attach rich mode console
1634            let console = Arc::new(SqlModelConsole::with_mode(
1635                sqlmodel_console::OutputMode::Rich,
1636            ));
1637            conn.set_console(Some(console.clone()));
1638
1639            // Verify rich mode is active
1640            assert!(conn.console().unwrap().is_rich());
1641
1642            // Execute operations (output should have formatting)
1643            conn.execute_raw("CREATE TABLE test (id INTEGER PRIMARY KEY)")
1644                .unwrap();
1645            conn.emit_checkpoint_progress(50, 100);
1646        }
1647    }
1648}