Skip to main content

ferrule_sql/backends/
sqlite.rs

1use crate::connection::{
2    AsyncConnection, BulkInsert, ConnectOptions, ExecutionSummary, ForeignKey, QueryResult,
3    SchemaInfo, StatementResult,
4};
5use crate::error::SqlError;
6use crate::stream::{BoxRowStream, DEFAULT_CURSOR_CAPACITY, channel_stream};
7use crate::url::DatabaseUrl;
8use crate::value::Row;
9use crate::value::{ColumnInfo, TypeHint, Value};
10use async_trait::async_trait;
11use rusqlite::Connection as SqliteConn;
12use rusqlite::types::Value as SqliteValue;
13
/// SQLite backend connection.
///
/// Holds a synchronous `rusqlite` connection behind `Arc<Mutex<_>>` so each
/// async method can clone the handle and do its work on a blocking thread.
pub struct SqliteConnection {
    // Shared handle to the underlying `rusqlite` connection; every operation
    // locks it for as long as it runs.
    conn: std::sync::Arc<std::sync::Mutex<SqliteConn>>,
}
17
18#[async_trait]
19impl AsyncConnection for SqliteConnection {
20    async fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
21        let sql = sql.to_string();
22        let conn = self.conn.clone();
23        tokio::task::spawn_blocking(move || {
24            let guard = conn.lock().unwrap();
25            let affected = guard
26                .execute(&sql, [])
27                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
28            Ok(ExecutionSummary {
29                rows_affected: Some(affected as u64),
30                command_tag: None,
31            })
32        })
33        .await
34        .map_err(|e| SqlError::QueryFailed(e.to_string()))?
35    }
36
37    async fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
38        let sql = sql.to_string();
39        let conn = self.conn.clone();
40        tokio::task::spawn_blocking(move || {
41            let guard = conn.lock().unwrap();
42            let mut stmt = guard
43                .prepare(&sql)
44                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
45            let col_names = stmt.column_names();
46            if col_names.is_empty() {
47                return Err(SqlError::QueryFailed(
48                    "Statement does not return rows".to_string(),
49                ));
50            }
51            let columns: Vec<ColumnInfo> = col_names
52                .iter()
53                .map(|name| ColumnInfo {
54                    name: name.to_string(),
55                    type_hint: TypeHint::Other,
56                    nullable: true,
57                })
58                .collect();
59
60            let mut rows = Vec::new();
61            let mut rows_iter = stmt
62                .query([])
63                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
64            while let Some(row) = rows_iter
65                .next()
66                .map_err(|e| SqlError::QueryFailed(e.to_string()))?
67            {
68                let mut values = Vec::with_capacity(columns.len());
69                for i in 0..columns.len() {
70                    let val: SqliteValue = row
71                        .get(i)
72                        .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
73                    values.push(sqlite_to_value(val));
74                }
75                rows.push(values);
76            }
77
78            Ok(QueryResult { columns, rows })
79        })
80        .await
81        .map_err(|e| SqlError::QueryFailed(e.to_string()))?
82    }
83
84    /// Stream rows from a native `rusqlite` cursor at bounded memory.
85    ///
86    /// `rusqlite` is synchronous, so the row-stepping loop runs on a
87    /// `spawn_blocking` thread that pushes each decoded row through a
88    /// **bounded** `tokio::sync::mpsc` channel (`DEFAULT_CURSOR_CAPACITY`
89    /// rows). When the channel fills, the producer blocks on
90    /// `blocking_send`, so at most that many rows are buffered ahead of
91    /// the consumer — peak memory is `O(cap)`, never `O(total rows)`.
92    /// Column metadata is delivered up front through a oneshot so the
93    /// caller has it before pulling any row.
94    async fn query_stream(
95        &mut self,
96        sql: &str,
97    ) -> Result<(Vec<ColumnInfo>, BoxRowStream<'_>), SqlError> {
98        let sql = sql.to_string();
99        let conn = self.conn.clone();
100        let (col_tx, col_rx) = tokio::sync::oneshot::channel::<Result<Vec<ColumnInfo>, SqlError>>();
101        let (row_tx, row_rx) =
102            tokio::sync::mpsc::channel::<Result<Row, SqlError>>(DEFAULT_CURSOR_CAPACITY);
103
104        tokio::task::spawn_blocking(move || {
105            let guard = match conn.lock() {
106                Ok(g) => g,
107                Err(_) => {
108                    let _ = col_tx.send(Err(SqlError::QueryFailed(
109                        "SQLite connection mutex poisoned".to_string(),
110                    )));
111                    return;
112                }
113            };
114            let mut stmt = match guard.prepare(&sql) {
115                Ok(s) => s,
116                Err(e) => {
117                    let _ = col_tx.send(Err(SqlError::QueryFailed(e.to_string())));
118                    return;
119                }
120            };
121            let col_names = stmt.column_names();
122            if col_names.is_empty() {
123                let _ = col_tx.send(Err(SqlError::QueryFailed(
124                    "Statement does not return rows".to_string(),
125                )));
126                return;
127            }
128            let columns: Vec<ColumnInfo> = col_names
129                .iter()
130                .map(|name| ColumnInfo {
131                    name: name.to_string(),
132                    type_hint: TypeHint::Other,
133                    nullable: true,
134                })
135                .collect();
136            let ncols = columns.len();
137            // Send columns first; if the consumer already hung up, stop.
138            if col_tx.send(Ok(columns)).is_err() {
139                return;
140            }
141
142            let mut rows_iter = match stmt.query([]) {
143                Ok(r) => r,
144                Err(e) => {
145                    let _ = row_tx.blocking_send(Err(SqlError::QueryFailed(e.to_string())));
146                    return;
147                }
148            };
149            loop {
150                match rows_iter.next() {
151                    Ok(Some(row)) => {
152                        let mut values = Vec::with_capacity(ncols);
153                        let mut decode_err = None;
154                        for i in 0..ncols {
155                            match row.get::<_, SqliteValue>(i) {
156                                Ok(val) => values.push(sqlite_to_value(val)),
157                                Err(e) => {
158                                    decode_err = Some(SqlError::QueryFailed(e.to_string()));
159                                    break;
160                                }
161                            }
162                        }
163                        let msg = decode_err.map_or(Ok(values), Err);
164                        // blocking_send applies back-pressure: it parks
165                        // this thread until the consumer drains a slot.
166                        if row_tx.blocking_send(msg).is_err() {
167                            // Consumer dropped the cursor; stop stepping.
168                            return;
169                        }
170                    }
171                    Ok(None) => return,
172                    Err(e) => {
173                        let _ = row_tx.blocking_send(Err(SqlError::QueryFailed(e.to_string())));
174                        return;
175                    }
176                }
177            }
178        });
179
180        let columns = col_rx
181            .await
182            .map_err(|_| SqlError::QueryFailed("SQLite cursor producer dropped".to_string()))??;
183        Ok((columns, channel_stream(row_rx)))
184    }
185
186    // execute_multi uses the default impl: tries query(), falls back to execute()
187
188    async fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
189        let statements =
190            split_sqlite_statements(sql).map_err(|e| SqlError::QueryFailed(e.to_string()))?;
191        let mut results = Vec::with_capacity(statements.len());
192        for stmt in statements {
193            let stmt = stmt.trim();
194            if stmt.is_empty() {
195                continue;
196            }
197            match self.query(stmt).await {
198                Ok(result) => results.push(StatementResult::Query(result)),
199                Err(SqlError::QueryFailed(_)) => {
200                    let summary = self.execute(stmt).await?;
201                    results.push(StatementResult::Summary(summary));
202                }
203                Err(e) => return Err(e),
204            }
205        }
206        Ok(results)
207    }
208
209    async fn ping(&mut self) -> Result<(), SqlError> {
210        let _ = self.query("SELECT 1").await?;
211        Ok(())
212    }
213
214    async fn list_tables(&mut self, _schema: Option<&str>) -> Result<Vec<String>, SqlError> {
215        let conn = self.conn.clone();
216        tokio::task::spawn_blocking(move || {
217            let guard = conn.lock().unwrap();
218            let mut stmt = guard
219                .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
220                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
221            let names: Vec<String> = stmt
222                .query_map([], |row| row.get(0))
223                .map_err(|e| SqlError::QueryFailed(e.to_string()))?
224                .collect::<Result<Vec<_>, _>>()
225                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
226            Ok(names)
227        })
228        .await
229        .map_err(|e| SqlError::QueryFailed(e.to_string()))?
230    }
231
232    async fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError> {
233        let conn = self.conn.clone();
234        tokio::task::spawn_blocking(move || {
235            let guard = conn.lock().unwrap();
236            // `PRAGMA database_list` returns (seq, name, file) rows: the
237            // main database plus any ATTACHed ones. `main` is the schema
238            // unqualified objects resolve against.
239            let mut stmt = guard
240                .prepare("PRAGMA database_list")
241                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
242            let schemas: Vec<SchemaInfo> = stmt
243                .query_map([], |row| row.get::<_, String>(1))
244                .map_err(|e| SqlError::QueryFailed(e.to_string()))?
245                .collect::<Result<Vec<String>, _>>()
246                .map_err(|e| SqlError::QueryFailed(e.to_string()))?
247                .into_iter()
248                .map(|name| {
249                    let is_default = name == "main";
250                    SchemaInfo { name, is_default }
251                })
252                .collect();
253            Ok(schemas)
254        })
255        .await
256        .map_err(|e| SqlError::QueryFailed(e.to_string()))?
257    }
258
259    async fn describe_table(
260        &mut self,
261        _schema: Option<&str>,
262        table: &str,
263    ) -> Result<QueryResult, SqlError> {
264        let table = table.to_string();
265        let conn = self.conn.clone();
266        tokio::task::spawn_blocking(move || {
267            let guard = conn.lock().unwrap();
268            let sql = format!("PRAGMA table_info({})", escape_sqlite_identifier(&table));
269            let mut stmt = guard
270                .prepare(&sql)
271                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
272            let col_names = stmt.column_names();
273            let columns: Vec<ColumnInfo> = col_names
274                .iter()
275                .map(|name| ColumnInfo {
276                    name: name.to_string(),
277                    type_hint: TypeHint::String,
278                    nullable: true,
279                })
280                .collect();
281            let mut rows = Vec::new();
282            let mut rows_iter = stmt
283                .query([])
284                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
285            while let Some(row) = rows_iter
286                .next()
287                .map_err(|e| SqlError::QueryFailed(e.to_string()))?
288            {
289                let mut values = Vec::with_capacity(columns.len());
290                for i in 0..columns.len() {
291                    let val: SqliteValue = row
292                        .get(i)
293                        .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
294                    values.push(sqlite_to_value(val));
295                }
296                rows.push(values);
297            }
298            Ok(QueryResult { columns, rows })
299        })
300        .await
301        .map_err(|e| SqlError::QueryFailed(e.to_string()))?
302    }
303
304    async fn primary_key(
305        &mut self,
306        _schema: Option<&str>,
307        table: &str,
308    ) -> Result<Vec<String>, SqlError> {
309        let table = table.to_string();
310        let conn = self.conn.clone();
311        tokio::task::spawn_blocking(move || {
312            let guard = conn.lock().unwrap();
313            // `PRAGMA table_info(t)` exposes a `pk` column: 0 for
314            // non-PK columns, otherwise the 1-based key position.
315            let sql = format!("PRAGMA table_info({})", escape_sqlite_identifier(&table));
316            let mut stmt = guard
317                .prepare(&sql)
318                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
319            let mut rows = stmt
320                .query([])
321                .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
322            let mut keyed: Vec<(i64, String)> = Vec::new();
323            while let Some(row) = rows
324                .next()
325                .map_err(|e| SqlError::QueryFailed(e.to_string()))?
326            {
327                let name: String = row
328                    .get("name")
329                    .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
330                let pk: i64 = row
331                    .get("pk")
332                    .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
333                if pk > 0 {
334                    keyed.push((pk, name));
335                }
336            }
337            keyed.sort_by_key(|(pos, _)| *pos);
338            Ok(keyed.into_iter().map(|(_, n)| n).collect())
339        })
340        .await
341        .map_err(|e| SqlError::QueryFailed(e.to_string()))?
342    }
343
344    async fn list_foreign_keys(
345        &mut self,
346        _schema: Option<&str>,
347    ) -> Result<Vec<ForeignKey>, SqlError> {
348        let conn = self.conn.clone();
349        tokio::task::spawn_blocking(move || {
350            let guard = conn.lock().unwrap();
351            // SQLite has no schema-wide FK catalog; enumerate tables
352            // then call `PRAGMA foreign_key_list(t)` per table.
353            let tables: Vec<String> = {
354                let mut stmt = guard
355                    .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
356                    .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
357                let names: Result<Vec<String>, _> = stmt
358                    .query_map([], |row| row.get::<_, String>(0))
359                    .map_err(|e| SqlError::QueryFailed(e.to_string()))?
360                    .collect();
361                names.map_err(|e| SqlError::QueryFailed(e.to_string()))?
362            };
363            let mut out: Vec<ForeignKey> = Vec::new();
364            for child_table in tables {
365                let sql = format!(
366                    "PRAGMA foreign_key_list({})",
367                    escape_sqlite_identifier(&child_table)
368                );
369                let mut stmt = guard
370                    .prepare(&sql)
371                    .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
372                let mut rows = stmt
373                    .query([])
374                    .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
375                let mut by_id: indexmap::IndexMap<i64, ForeignKey> = indexmap::IndexMap::new();
376                while let Some(row) = rows
377                    .next()
378                    .map_err(|e| SqlError::QueryFailed(e.to_string()))?
379                {
380                    let id: i64 = row
381                        .get("id")
382                        .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
383                    let parent_table: String = row
384                        .get("table")
385                        .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
386                    let child_col: String = row
387                        .get("from")
388                        .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
389                    let parent_col: String = row
390                        .get("to")
391                        .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
392                    let on_delete: Option<String> = row.get("on_delete").ok();
393                    let entry = by_id.entry(id).or_insert_with(|| ForeignKey {
394                        child_table: child_table.clone(),
395                        child_columns: Vec::new(),
396                        parent_table: parent_table.clone(),
397                        parent_columns: Vec::new(),
398                        on_delete: on_delete.filter(|s| !s.is_empty() && s != "NO ACTION"),
399                    });
400                    entry.child_columns.push(child_col);
401                    entry.parent_columns.push(parent_col);
402                }
403                out.extend(by_id.into_values());
404            }
405            Ok(out)
406        })
407        .await
408        .map_err(|e| SqlError::QueryFailed(e.to_string()))?
409    }
410
411    async fn bulk_insert_rows(&mut self, _target: BulkInsert<'_>) -> Result<usize, SqlError> {
412        // SQLite has no protocol-level bulk loader; its bottleneck
413        // is fsync, not parse/plan. The generic multi-row INSERT in
414        // copy.rs is already optimal for it. Always degrade to the
415        // generic path.
416        Err(SqlError::BulkUnavailable(
417            "SQLite has no native bulk loader; multi-row INSERT is already optimal".into(),
418        ))
419    }
420}
421
422pub(crate) async fn connect(
423    _url: &DatabaseUrl,
424    _opts: &ConnectOptions,
425) -> Result<SqliteConnection, SqlError> {
426    let path = _url.path().to_string();
427    tokio::task::spawn_blocking(move || {
428        let conn =
429            SqliteConn::open(&path).map_err(|e| SqlError::ConnectionFailed(e.to_string()))?;
430        Ok(SqliteConnection {
431            conn: std::sync::Arc::new(std::sync::Mutex::new(conn)),
432        })
433    })
434    .await
435    .map_err(|e| SqlError::ConnectionFailed(e.to_string()))?
436}
437
438fn sqlite_to_value(v: SqliteValue) -> Value {
439    match v {
440        SqliteValue::Null => Value::Null,
441        SqliteValue::Integer(i) => Value::Int64(i),
442        SqliteValue::Real(f) => Value::Float64(f),
443        SqliteValue::Text(s) => Value::String(s),
444        SqliteValue::Blob(b) => Value::Bytes(b),
445    }
446}
447
/// Quote `name` as a double-quoted SQL identifier.
///
/// Every embedded `"` is doubled, so the result can be spliced into a
/// statement as a single identifier token.
fn escape_sqlite_identifier(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // Double the quote so it stays part of the identifier.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
451
/// Token classes that matter when looking for statement boundaries.
#[derive(Clone, Copy, PartialEq, Eq)]
enum SplitToken {
    Semi,
    Explain,
    Create,
    Temp,
    Trigger,
    End,
    Other,
}

/// Scanner state, following the state machine of SQLite's `sqlite3_complete`.
#[derive(Clone, Copy, PartialEq, Eq)]
enum SplitState {
    /// At the start of a statement (nothing but whitespace/comments seen).
    Start,
    /// Inside an ordinary statement; the next `;` ends it.
    Normal,
    /// After a leading `EXPLAIN` (a `CREATE` may still follow).
    Explain,
    /// After `CREATE` and any `TEMP`/`TEMPORARY`; `TRIGGER` may follow.
    Create,
    /// Inside a `CREATE TRIGGER` statement.
    Trigger,
    /// Just after a `;` inside a trigger.
    TriggerSemi,
    /// Just after `; END` inside a trigger; the next `;` ends the trigger.
    TriggerEnd,
}

impl SplitState {
    /// Return the state reached from `self` after seeing `token`.
    fn advance(self, token: SplitToken) -> Self {
        use SplitState as S;
        use SplitToken as T;
        match (self, token) {
            // Inside a trigger a `;` only separates the body's statements.
            (S::Trigger | S::TriggerSemi, T::Semi) => S::TriggerSemi,
            // Everywhere else a `;` completes the statement.
            (_, T::Semi) => S::Start,
            // A trigger is closed by the token sequence `; END ;`.
            (S::TriggerSemi, T::End) => S::TriggerEnd,
            (S::Trigger | S::TriggerSemi | S::TriggerEnd, _) => S::Trigger,
            (S::Start, T::Explain) => S::Explain,
            (S::Start | S::Explain, T::Create) => S::Create,
            // Stay in `Explain` across e.g. `QUERY PLAN`.
            (S::Explain, T::Other) => S::Explain,
            (S::Create, T::Temp) => S::Create,
            (S::Create, T::Trigger) => S::Trigger,
            _ => S::Normal,
        }
    }
}

/// Whether `b` can be part of a bare identifier or keyword.
fn is_sql_word_byte(b: u8) -> bool {
    b.is_ascii_alphanumeric() || b == b'_' || b == b'$' || b >= 0x80
}

/// Classify a bare word, recognising only the keywords the splitter tracks.
fn classify_sql_word(word: &[u8]) -> SplitToken {
    let is = |kw: &str| word.eq_ignore_ascii_case(kw.as_bytes());
    if is("CREATE") {
        SplitToken::Create
    } else if is("TEMP") || is("TEMPORARY") {
        SplitToken::Temp
    } else if is("TRIGGER") {
        SplitToken::Trigger
    } else if is("END") {
        SplitToken::End
    } else if is("EXPLAIN") {
        SplitToken::Explain
    } else {
        SplitToken::Other
    }
}

/// Split a SQL string into individual statements for SQLite.
///
/// A statement ends at a top-level `;`, which is kept as part of the returned
/// slice. Semicolons are ignored inside:
///
/// - single-quoted strings (`''` escape),
/// - double-quoted and backtick-quoted identifiers (doubled-delimiter escape),
/// - `[bracketed]` identifiers,
/// - `--` line comments and `/* */` block comments,
/// - the body of a `CREATE [TEMP|TEMPORARY] TRIGGER` statement, which only
///   ends at the token sequence `; END ;`.
///
/// Text after the last `;` is returned as a final statement (trailing
/// whitespace trimmed) unless it is blank. Unterminated quotes and comments
/// run to the end of the input.
///
/// The function currently never returns `Err`; the `Result` is part of the
/// existing signature.
fn split_sqlite_statements(sql: &str) -> Result<Vec<&str>, String> {
    let bytes = sql.as_bytes();
    let len = bytes.len();
    let mut statements = Vec::new();
    let mut state = SplitState::Start;
    let mut start = 0usize;
    let mut i = 0usize;

    while i < len {
        let token = match bytes[i] {
            quote @ (b'\'' | b'"' | b'`') => {
                i += 1;
                while i < len {
                    if bytes[i] != quote {
                        i += 1;
                    } else if i + 1 < len && bytes[i + 1] == quote {
                        // Doubled delimiter: an escaped quote, keep scanning.
                        i += 2;
                    } else {
                        i += 1;
                        break;
                    }
                }
                SplitToken::Other
            }
            b'[' => {
                i += 1;
                while i < len && bytes[i] != b']' {
                    i += 1;
                }
                if i < len {
                    i += 1;
                }
                SplitToken::Other
            }
            b'-' if i + 1 < len && bytes[i + 1] == b'-' => {
                i += 2;
                while i < len && bytes[i] != b'\n' {
                    i += 1;
                }
                // Comments are not tokens; the state is left untouched.
                continue;
            }
            b'/' if i + 1 < len && bytes[i + 1] == b'*' => {
                i += 2;
                loop {
                    if i + 1 >= len {
                        // Unterminated comment swallows the rest of the input.
                        i = len;
                        break;
                    }
                    if bytes[i] == b'*' && bytes[i + 1] == b'/' {
                        i += 2;
                        break;
                    }
                    i += 1;
                }
                continue;
            }
            b';' => {
                i += 1;
                SplitToken::Semi
            }
            b if b.is_ascii_whitespace() => {
                i += 1;
                continue;
            }
            b if is_sql_word_byte(b) => {
                let word_start = i;
                while i < len && is_sql_word_byte(bytes[i]) {
                    i += 1;
                }
                classify_sql_word(&bytes[word_start..i])
            }
            _ => {
                i += 1;
                SplitToken::Other
            }
        };

        state = state.advance(token);
        if token == SplitToken::Semi && state == SplitState::Start {
            // `i` is just past the `;`, which is ASCII, so both slice
            // bounds fall on character boundaries.
            statements.push(&sql[start..i]);
            start = i;
        }
    }

    let tail = &sql[start..];
    if !tail.trim().is_empty() {
        statements.push(tail.trim_end());
    }

    Ok(statements)
}
528
529#[cfg(test)]
530mod tests {
531    use super::*;
532    use std::sync::atomic::{AtomicU64, Ordering};
533
534    static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);
535
536    /// Returns a fresh on-disk SQLite URL. Each call yields a unique path so
537    /// concurrent tests do not collide.
538    fn fresh_test_url() -> (String, std::path::PathBuf) {
539        let pid = std::process::id();
540        let n = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
541        let path = std::env::temp_dir().join(format!("ferrule-sqlite-test-{pid}-{n}.db"));
542        let _ = std::fs::remove_file(&path);
543        let url = format!("sqlite://{}", path.display());
544        (url, path)
545    }
546
547    /// Connect to a fresh on-disk SQLite database, returning the connection
548    /// and the path so the caller can clean up.
549    fn fresh_conn() -> (Box<dyn crate::Connection>, std::path::PathBuf) {
550        let (raw_url, path) = fresh_test_url();
551        let url = DatabaseUrl::parse(&raw_url).expect("parse sqlite URL");
552        let conn =
553            crate::connect(&url, &ConnectOptions::default(), None).expect("connect should succeed");
554        (conn, path)
555    }
556
557    /// Seed the standard test_users table; mirrors the schemas used for the
558    /// other backends (see CLAUDE.md "How to Test").
559    fn seed_test_users(conn: &mut dyn crate::Connection) {
560        conn.execute(
561            "CREATE TABLE test_users (
562                id INTEGER PRIMARY KEY AUTOINCREMENT,
563                name TEXT,
564                age INTEGER,
565                score REAL,
566                active INTEGER,
567                meta TEXT
568            )",
569        )
570        .expect("create table");
571        conn.execute("INSERT INTO test_users (name, age, score, active, meta) VALUES ('Alice', 30, 99.5, 1, '{\"role\":\"admin\"}')")
572            .expect("insert alice");
573        conn.execute("INSERT INTO test_users (name, age, score, active, meta) VALUES ('Bob', 25, 88.25, 0, '{\"role\":\"user\"}')")
574            .expect("insert bob");
575    }
576
577    #[test]
578    fn test_sqlite_ping() {
579        let (mut conn, path) = fresh_conn();
580        conn.ping().expect("ping should succeed");
581        let _ = std::fs::remove_file(&path);
582    }
583
584    #[test]
585    fn test_sqlite_query() {
586        let (mut conn, path) = fresh_conn();
587        seed_test_users(&mut conn);
588        let result = conn
589            .query("SELECT * FROM test_users ORDER BY id")
590            .expect("query should succeed");
591        assert_eq!(result.columns.len(), 6, "expected 6 columns");
592        assert_eq!(result.rows.len(), 2, "expected 2 seeded rows");
593        let _ = std::fs::remove_file(&path);
594    }
595
596    #[test]
597    fn test_sqlite_execute() {
598        let (mut conn, path) = fresh_conn();
599        seed_test_users(&mut conn);
600        let summary = conn
601            .execute("INSERT INTO test_users (name, age) VALUES ('Charlie', 35)")
602            .expect("execute should succeed");
603        assert_eq!(
604            summary.rows_affected,
605            Some(1),
606            "expected exactly one row inserted"
607        );
608        let _ = std::fs::remove_file(&path);
609    }
610
611    #[test]
612    fn test_sqlite_list_tables() {
613        let (mut conn, path) = fresh_conn();
614        seed_test_users(&mut conn);
615        conn.execute("CREATE TABLE other (id INTEGER)")
616            .expect("create other");
617        let tables = conn.list_tables(None).expect("list_tables");
618        assert!(tables.contains(&"test_users".to_string()));
619        assert!(tables.contains(&"other".to_string()));
620        let _ = std::fs::remove_file(&path);
621    }
622
623    #[test]
624    fn test_sqlite_list_schemas() {
625        let (mut conn, path) = fresh_conn();
626        let schemas = conn.list_schemas().expect("list_schemas");
627        // A fresh connection with no ATTACHed databases reports exactly
628        // one schema, `main`, which is the default.
629        assert_eq!(
630            schemas.len(),
631            1,
632            "fresh sqlite conn should report one schema, got: {schemas:?}"
633        );
634        assert_eq!(schemas[0].name, "main");
635        assert!(schemas[0].is_default, "main should be the default schema");
636        let _ = std::fs::remove_file(&path);
637    }
638
639    #[test]
640    fn test_sqlite_describe_table() {
641        let (mut conn, path) = fresh_conn();
642        seed_test_users(&mut conn);
643        let result = conn.describe_table(None, "test_users").expect("describe");
644        // PRAGMA table_info returns one row per column: cid, name, type, notnull, dflt_value, pk.
645        assert!(
646            result.rows.len() >= 6,
647            "expected >=6 columns in test_users, got {}",
648            result.rows.len()
649        );
650        let _ = std::fs::remove_file(&path);
651    }
652
653    #[test]
654    fn test_sqlite_type_mapping() {
655        let (mut conn, path) = fresh_conn();
656        // Build a row that exercises each SqliteValue branch in sqlite_to_value.
657        conn.execute(
658            "CREATE TABLE typed (
659                i INTEGER,
660                r REAL,
661                t TEXT,
662                b BLOB,
663                n INTEGER
664            )",
665        )
666        .expect("create typed");
667        conn.execute("INSERT INTO typed VALUES (42, 2.5, 'hi', x'deadbeef', NULL)")
668            .expect("insert typed");
669
670        let result = conn
671            .query("SELECT i, r, t, b, n FROM typed")
672            .expect("query typed");
673        let row = &result.rows[0];
674        assert!(matches!(row[0], Value::Int64(42)), "i should be Int64(42)");
675        assert!(
676            matches!(row[1], Value::Float64(f) if (f - 2.5).abs() < 1e-9),
677            "r should be Float64(~2.5)"
678        );
679        assert!(
680            matches!(&row[2], Value::String(s) if s == "hi"),
681            "t should be String('hi')"
682        );
683        assert!(
684            matches!(&row[3], Value::Bytes(b) if b == &vec![0xde, 0xad, 0xbe, 0xef]),
685            "b should be Bytes(0xDEADBEEF)"
686        );
687        assert!(matches!(row[4], Value::Null), "n should be Null");
688        let _ = std::fs::remove_file(&path);
689    }
690
691    #[test]
692    fn test_sqlite_execute_multi() {
693        let (mut conn, path) = fresh_conn();
694        let results = conn
695            .execute_multi(
696                "CREATE TABLE m (id INTEGER); \
697                 INSERT INTO m VALUES (1); \
698                 INSERT INTO m VALUES (2); \
699                 SELECT COUNT(*) AS c FROM m;",
700            )
701            .expect("execute_multi");
702        assert_eq!(results.len(), 4, "expected 4 statement results");
703        match results.last().unwrap() {
704            StatementResult::Query(qr) => {
705                assert_eq!(qr.rows.len(), 1);
706                assert!(matches!(qr.rows[0][0], Value::Int64(2)));
707            }
708            other => panic!("last result should be Query, got {:?}", other),
709        }
710        let _ = std::fs::remove_file(&path);
711    }
712
713    #[test]
714    fn test_escape_sqlite_identifier_doubles_quotes() {
715        assert_eq!(escape_sqlite_identifier("plain"), "\"plain\"");
716        assert_eq!(escape_sqlite_identifier("a\"b"), "\"a\"\"b\"");
717    }
718
719    #[test]
720    fn test_sqlite_primary_key() {
721        let (mut conn, path) = fresh_conn();
722        seed_test_users(&mut conn);
723        let pk = conn.primary_key(None, "test_users").expect("primary_key");
724        assert_eq!(pk, vec!["id".to_string()]);
725        let _ = std::fs::remove_file(&path);
726    }
727
728    #[test]
729    fn test_sqlite_primary_key_composite_in_order() {
730        let (mut conn, path) = fresh_conn();
731        // SQLite uses the column order in the PRIMARY KEY clause for
732        // the `pk` ordinal: tenant first, then resource.
733        conn.execute(
734            "CREATE TABLE membership (
735                tenant TEXT,
736                resource TEXT,
737                role TEXT,
738                PRIMARY KEY (tenant, resource)
739            )",
740        )
741        .expect("create membership");
742        let pk = conn.primary_key(None, "membership").expect("primary_key");
743        assert_eq!(pk, vec!["tenant".to_string(), "resource".to_string()]);
744        let _ = std::fs::remove_file(&path);
745    }
746
747    #[test]
748    fn test_sqlite_primary_key_none() {
749        let (mut conn, path) = fresh_conn();
750        conn.execute("CREATE TABLE no_pk (a INTEGER, b TEXT)")
751            .expect("create no_pk");
752        let pk = conn.primary_key(None, "no_pk").expect("primary_key");
753        assert!(pk.is_empty(), "expected no PK columns, got {pk:?}");
754        let _ = std::fs::remove_file(&path);
755    }
756
757    #[test]
758    fn test_sqlite_list_foreign_keys() {
759        let (mut conn, path) = fresh_conn();
760        seed_test_users(&mut conn);
761        conn.execute(
762            "CREATE TABLE test_orders (
763                id INTEGER PRIMARY KEY AUTOINCREMENT,
764                user_id INTEGER REFERENCES test_users(id) ON DELETE CASCADE,
765                total REAL
766            )",
767        )
768        .expect("create test_orders");
769        let fks = conn.list_foreign_keys(None).expect("list_foreign_keys");
770        assert_eq!(fks.len(), 1, "expected one FK edge, got {fks:?}");
771        let fk = &fks[0];
772        assert_eq!(fk.child_table, "test_orders");
773        assert_eq!(fk.child_columns, vec!["user_id".to_string()]);
774        assert_eq!(fk.parent_table, "test_users");
775        assert_eq!(fk.parent_columns, vec!["id".to_string()]);
776        assert_eq!(fk.on_delete.as_deref(), Some("CASCADE"));
777        let _ = std::fs::remove_file(&path);
778    }
779
780    #[test]
781    fn test_sqlite_list_foreign_keys_composite() {
782        let (mut conn, path) = fresh_conn();
783        conn.execute(
784            "CREATE TABLE parent (
785                a INTEGER, b INTEGER,
786                PRIMARY KEY (a, b)
787            )",
788        )
789        .expect("create parent");
790        conn.execute(
791            "CREATE TABLE child (
792                x INTEGER, y INTEGER,
793                FOREIGN KEY (x, y) REFERENCES parent(a, b)
794            )",
795        )
796        .expect("create child");
797        let fks = conn.list_foreign_keys(None).expect("list_foreign_keys");
798        assert_eq!(fks.len(), 1);
799        assert_eq!(fks[0].child_columns, vec!["x".to_string(), "y".to_string()]);
800        assert_eq!(
801            fks[0].parent_columns,
802            vec!["a".to_string(), "b".to_string()]
803        );
804        let _ = std::fs::remove_file(&path);
805    }
806
807    // --- #65 streaming cursor: bounded-memory reads (no container) ---
808
809    /// Stream a large synthetic result through `query_cursor` in small
810    /// fixed batches and assert the cursor pulls **batch-at-a-time** —
811    /// every batch is `<= batch_size`, the batch count is exactly
812    /// `ceil(total / batch_size)`, and the row total is exact. A full
813    /// materialization (`query`) would instead buffer all rows at once;
814    /// this proves the cursor never does, so peak in-flight memory is
815    /// `O(batch + channel cap)`, not `O(total rows)`.
816    #[test]
817    fn test_sqlite_cursor_streams_in_bounded_batches() {
818        let (mut conn, path) = fresh_conn();
819        const TOTAL: i64 = 250_000;
820        const BATCH: usize = 128;
821        let cte = format!(
822            "WITH RECURSIVE seq(i) AS (\
823                 SELECT 1 UNION ALL SELECT i + 1 FROM seq WHERE i < {TOTAL}\
824             ) SELECT i, i * 2 AS doubled FROM seq"
825        );
826        let mut cursor = conn.query_cursor(&cte).expect("open cursor");
827        assert_eq!(cursor.columns().len(), 2, "two projected columns");
828
829        let mut total: u64 = 0;
830        let mut batches: u64 = 0;
831        let mut max_batch_len = 0usize;
832        loop {
833            let batch = cursor.next_batch(BATCH).expect("pull batch");
834            if batch.is_empty() {
835                break;
836            }
837            assert!(
838                batch.len() <= BATCH,
839                "a streamed batch ({}) must never exceed the requested size {}",
840                batch.len(),
841                BATCH
842            );
843            max_batch_len = max_batch_len.max(batch.len());
844            total += batch.len() as u64;
845            batches += 1;
846        }
847        assert_eq!(total, TOTAL as u64, "streamed every row exactly once");
848        assert!(
849            max_batch_len <= BATCH,
850            "peak in-flight batch stayed bounded by batch size"
851        );
852        let expected_batches = (TOTAL as u64).div_ceil(BATCH as u64);
853        assert_eq!(
854            batches, expected_batches,
855            "exactly ceil(total/batch) batches — proves batch-at-a-time, not full buffering"
856        );
857        let _ = std::fs::remove_file(&path);
858    }
859
860    /// Streaming a result whose every row carries a wide payload at a
861    /// tiny batch size completes without buffering the whole thing.
862    /// 100k rows x ~4 KiB would be ~400 MiB if materialized; the cursor
863    /// holds at most `batch + channel-cap` rows, so this stays bounded.
864    #[test]
865    fn test_sqlite_cursor_wide_rows_stay_bounded() {
866        let (mut conn, path) = fresh_conn();
867        const TOTAL: i64 = 100_000;
868        // Each row interpolates a ~4 KiB text payload.
869        let cte = format!(
870            "WITH RECURSIVE seq(i) AS (\
871                 SELECT 1 UNION ALL SELECT i + 1 FROM seq WHERE i < {TOTAL}\
872             ) SELECT i, printf('%.*c', 4096, 'x') AS payload FROM seq"
873        );
874        let cursor = conn.query_cursor(&cte).expect("open cursor");
875        let mut count: u64 = 0;
876        for row in cursor {
877            let row = row.expect("row ok");
878            assert_eq!(row.len(), 2);
879            // Confirm the wide payload actually rode through the stream.
880            if let Value::String(ref payload) = row[1] {
881                assert_eq!(payload.len(), 4096);
882            } else {
883                panic!("payload column should be a String");
884            }
885            count += 1;
886        }
887        assert_eq!(count, TOTAL as u64, "iterator drained every wide row");
888        let _ = std::fs::remove_file(&path);
889    }
890
891    /// The eager `query` and the streaming cursor return identical data
892    /// for a modest result, so the cursor is a drop-in for ingestion.
893    #[test]
894    fn test_sqlite_cursor_matches_eager_query() {
895        let (mut conn, path) = fresh_conn();
896        seed_test_users(&mut conn);
897        let eager = conn
898            .query("SELECT id, name, age FROM test_users ORDER BY id")
899            .expect("eager query");
900        let streamed: Vec<crate::value::Row> = conn
901            .query_cursor("SELECT id, name, age FROM test_users ORDER BY id")
902            .expect("cursor")
903            .collect::<Result<Vec<_>, _>>()
904            .expect("collect streamed rows");
905        assert_eq!(eager.rows, streamed, "cursor data == eager data");
906        let _ = std::fs::remove_file(&path);
907    }
908
909    /// `next_batch(0)` is a no-op that returns an empty batch without
910    /// advancing the cursor; the next real pull still sees row 0.
911    #[test]
912    fn test_sqlite_cursor_next_batch_zero_is_noop() {
913        let (mut conn, path) = fresh_conn();
914        seed_test_users(&mut conn);
915        let mut cursor = conn
916            .query_cursor("SELECT id FROM test_users ORDER BY id")
917            .expect("cursor");
918        assert!(cursor.next_batch(0).expect("zero batch").is_empty());
919        let first = cursor.next_batch(1).expect("first row");
920        assert_eq!(first.len(), 1);
921        let _ = std::fs::remove_file(&path);
922    }
923
924    // --- #91 size guards: fail fast instead of OOM (no container) ---
925
926    /// An oversized cell trips the `max_cell_bytes` guard on the eager
927    /// `query` path and fails fast with a structured
928    /// [`SqlError::CellTooLarge`] naming the offending column and cap —
929    /// rather than materializing the giant value into the result `Vec`.
930    #[test]
931    fn test_sqlite_query_cell_guard_fails_fast() {
932        let (mut conn, path) = fresh_conn();
933        conn.set_size_guards(crate::SizeGuards {
934            max_cell_bytes: 1024,
935            max_row_bytes: 0,
936            max_total_buffered_bytes: 0,
937        });
938        // A single ~8 KiB text cell, well over the 1 KiB cell cap.
939        let err = conn
940            .query("SELECT printf('%.*c', 8192, 'x') AS big")
941            .expect_err("oversized cell must fail fast, not OOM");
942        match err {
943            SqlError::CellTooLarge {
944                column, size, cap, ..
945            } => {
946                assert_eq!(column, "big");
947                assert_eq!(size, 8192);
948                assert_eq!(cap, 1024);
949            }
950            other => panic!("expected CellTooLarge, got {other:?}"),
951        }
952        let _ = std::fs::remove_file(&path);
953    }
954
955    /// The same `max_cell_bytes` guard fires on the **streaming** path:
956    /// pulling an oversized cell through the cursor yields a structured
957    /// error item, so a pathological row cannot OOM a streaming ingest.
958    #[test]
959    fn test_sqlite_cursor_cell_guard_fails_fast() {
960        let (mut conn, path) = fresh_conn();
961        conn.set_size_guards(crate::SizeGuards {
962            max_cell_bytes: 1024,
963            max_row_bytes: 0,
964            max_total_buffered_bytes: 0,
965        });
966        let mut cursor = conn
967            .query_cursor("SELECT printf('%.*c', 8192, 'x') AS big")
968            .expect("cursor opens (guard fires per-row, not at open)");
969        let err = cursor
970            .next_batch(1)
971            .expect_err("streamed oversized cell must fail fast");
972        assert!(matches!(err, SqlError::CellTooLarge { ref column, .. } if column == "big"));
973        let _ = std::fs::remove_file(&path);
974    }
975
976    /// The `max_total_buffered_bytes` guard caps the eager `query`'s
977    /// running tally: a result whose summed row bytes cross the cap
978    /// aborts with [`SqlError::BufferTooLarge`] instead of buffering an
979    /// unbounded `Vec<Row>`. This is the guard the CLI eager table path
980    /// relies on so a huge table is not collected whole.
981    #[test]
982    fn test_sqlite_query_total_buffer_guard_fails_fast() {
983        let (mut conn, path) = fresh_conn();
984        // 10k rows x ~256 bytes ~= 2.5 MiB; cap at 64 KiB so it trips
985        // partway through without ever buffering the full result.
986        conn.set_size_guards(crate::SizeGuards {
987            max_cell_bytes: 0,
988            max_row_bytes: 0,
989            max_total_buffered_bytes: 64 * 1024,
990        });
991        let cte = "WITH RECURSIVE seq(i) AS (\
992                       SELECT 1 UNION ALL SELECT i + 1 FROM seq WHERE i < 10000\
993                   ) SELECT i, printf('%.*c', 256, 'y') AS pad FROM seq";
994        let err = conn
995            .query(cte)
996            .expect_err("total-buffer cap must trip before full materialization");
997        match err {
998            SqlError::BufferTooLarge { rows_buffered, cap } => {
999                assert_eq!(cap, 64 * 1024);
1000                assert!(
1001                    rows_buffered < 10_000,
1002                    "guard tripped before buffering all rows ({rows_buffered})"
1003                );
1004            }
1005            other => panic!("expected BufferTooLarge, got {other:?}"),
1006        }
1007        let _ = std::fs::remove_file(&path);
1008    }
1009
1010    /// The **streaming** cursor is bounded by construction and so does
1011    /// **not** apply the total-buffer cap — the same huge synthetic
1012    /// result that trips the eager `query` total cap streams to
1013    /// completion through the cursor under those guards.
1014    #[test]
1015    fn test_sqlite_cursor_ignores_total_buffer_cap() {
1016        let (mut conn, path) = fresh_conn();
1017        conn.set_size_guards(crate::SizeGuards {
1018            max_cell_bytes: 0,
1019            max_row_bytes: 0,
1020            max_total_buffered_bytes: 64 * 1024,
1021        });
1022        let cte = "WITH RECURSIVE seq(i) AS (\
1023                       SELECT 1 UNION ALL SELECT i + 1 FROM seq WHERE i < 10000\
1024                   ) SELECT i, printf('%.*c', 256, 'y') AS pad FROM seq";
1025        let cursor = conn.query_cursor(cte).expect("cursor");
1026        let count = cursor.into_iter().filter(|r| r.is_ok()).count();
1027        assert_eq!(count, 10_000, "cursor streams past the total-buffer cap");
1028        let _ = std::fs::remove_file(&path);
1029    }
1030}