1use crate::connection::{
2 AsyncConnection, BulkInsert, ConnectOptions, ExecutionSummary, ForeignKey, QueryResult,
3 SchemaInfo, StatementResult,
4};
5use crate::error::SqlError;
6use crate::stream::{BoxRowStream, DEFAULT_CURSOR_CAPACITY, channel_stream};
7use crate::url::DatabaseUrl;
8use crate::value::Row;
9use crate::value::{ColumnInfo, TypeHint, Value};
10use async_trait::async_trait;
11use rusqlite::Connection as SqliteConn;
12use rusqlite::types::Value as SqliteValue;
13
/// SQLite-backed connection used by this module's `AsyncConnection` implementation.
pub struct SqliteConnection {
    /// The underlying `rusqlite` connection behind an `Arc<Mutex<..>>`.
    ///
    /// Each operation clones the `Arc`, moves the clone into a
    /// `tokio::task::spawn_blocking` closure and locks the mutex there, so the
    /// SQLite calls run on a blocking thread rather than inside the async fn.
    conn: std::sync::Arc<std::sync::Mutex<SqliteConn>>,
}
17
18#[async_trait]
19impl AsyncConnection for SqliteConnection {
20 async fn execute(&mut self, sql: &str) -> Result<ExecutionSummary, SqlError> {
21 let sql = sql.to_string();
22 let conn = self.conn.clone();
23 tokio::task::spawn_blocking(move || {
24 let guard = conn.lock().unwrap();
25 let affected = guard
26 .execute(&sql, [])
27 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
28 Ok(ExecutionSummary {
29 rows_affected: Some(affected as u64),
30 command_tag: None,
31 })
32 })
33 .await
34 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
35 }
36
37 async fn query(&mut self, sql: &str) -> Result<QueryResult, SqlError> {
38 let sql = sql.to_string();
39 let conn = self.conn.clone();
40 tokio::task::spawn_blocking(move || {
41 let guard = conn.lock().unwrap();
42 let mut stmt = guard
43 .prepare(&sql)
44 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
45 let col_names = stmt.column_names();
46 if col_names.is_empty() {
47 return Err(SqlError::QueryFailed(
48 "Statement does not return rows".to_string(),
49 ));
50 }
51 let columns: Vec<ColumnInfo> = col_names
52 .iter()
53 .map(|name| ColumnInfo {
54 name: name.to_string(),
55 type_hint: TypeHint::Other,
56 nullable: true,
57 })
58 .collect();
59
60 let mut rows = Vec::new();
61 let mut rows_iter = stmt
62 .query([])
63 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
64 while let Some(row) = rows_iter
65 .next()
66 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
67 {
68 let mut values = Vec::with_capacity(columns.len());
69 for i in 0..columns.len() {
70 let val: SqliteValue = row
71 .get(i)
72 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
73 values.push(sqlite_to_value(val));
74 }
75 rows.push(values);
76 }
77
78 Ok(QueryResult { columns, rows })
79 })
80 .await
81 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
82 }
83
84 async fn query_stream(
95 &mut self,
96 sql: &str,
97 ) -> Result<(Vec<ColumnInfo>, BoxRowStream<'_>), SqlError> {
98 let sql = sql.to_string();
99 let conn = self.conn.clone();
100 let (col_tx, col_rx) = tokio::sync::oneshot::channel::<Result<Vec<ColumnInfo>, SqlError>>();
101 let (row_tx, row_rx) =
102 tokio::sync::mpsc::channel::<Result<Row, SqlError>>(DEFAULT_CURSOR_CAPACITY);
103
104 tokio::task::spawn_blocking(move || {
105 let guard = match conn.lock() {
106 Ok(g) => g,
107 Err(_) => {
108 let _ = col_tx.send(Err(SqlError::QueryFailed(
109 "SQLite connection mutex poisoned".to_string(),
110 )));
111 return;
112 }
113 };
114 let mut stmt = match guard.prepare(&sql) {
115 Ok(s) => s,
116 Err(e) => {
117 let _ = col_tx.send(Err(SqlError::QueryFailed(e.to_string())));
118 return;
119 }
120 };
121 let col_names = stmt.column_names();
122 if col_names.is_empty() {
123 let _ = col_tx.send(Err(SqlError::QueryFailed(
124 "Statement does not return rows".to_string(),
125 )));
126 return;
127 }
128 let columns: Vec<ColumnInfo> = col_names
129 .iter()
130 .map(|name| ColumnInfo {
131 name: name.to_string(),
132 type_hint: TypeHint::Other,
133 nullable: true,
134 })
135 .collect();
136 let ncols = columns.len();
137 if col_tx.send(Ok(columns)).is_err() {
139 return;
140 }
141
142 let mut rows_iter = match stmt.query([]) {
143 Ok(r) => r,
144 Err(e) => {
145 let _ = row_tx.blocking_send(Err(SqlError::QueryFailed(e.to_string())));
146 return;
147 }
148 };
149 loop {
150 match rows_iter.next() {
151 Ok(Some(row)) => {
152 let mut values = Vec::with_capacity(ncols);
153 let mut decode_err = None;
154 for i in 0..ncols {
155 match row.get::<_, SqliteValue>(i) {
156 Ok(val) => values.push(sqlite_to_value(val)),
157 Err(e) => {
158 decode_err = Some(SqlError::QueryFailed(e.to_string()));
159 break;
160 }
161 }
162 }
163 let msg = decode_err.map_or(Ok(values), Err);
164 if row_tx.blocking_send(msg).is_err() {
167 return;
169 }
170 }
171 Ok(None) => return,
172 Err(e) => {
173 let _ = row_tx.blocking_send(Err(SqlError::QueryFailed(e.to_string())));
174 return;
175 }
176 }
177 }
178 });
179
180 let columns = col_rx
181 .await
182 .map_err(|_| SqlError::QueryFailed("SQLite cursor producer dropped".to_string()))??;
183 Ok((columns, channel_stream(row_rx)))
184 }
185
186 async fn execute_multi(&mut self, sql: &str) -> Result<Vec<StatementResult>, SqlError> {
189 let statements =
190 split_sqlite_statements(sql).map_err(|e| SqlError::QueryFailed(e.to_string()))?;
191 let mut results = Vec::with_capacity(statements.len());
192 for stmt in statements {
193 let stmt = stmt.trim();
194 if stmt.is_empty() {
195 continue;
196 }
197 match self.query(stmt).await {
198 Ok(result) => results.push(StatementResult::Query(result)),
199 Err(SqlError::QueryFailed(_)) => {
200 let summary = self.execute(stmt).await?;
201 results.push(StatementResult::Summary(summary));
202 }
203 Err(e) => return Err(e),
204 }
205 }
206 Ok(results)
207 }
208
209 async fn ping(&mut self) -> Result<(), SqlError> {
210 let _ = self.query("SELECT 1").await?;
211 Ok(())
212 }
213
214 async fn list_tables(&mut self, _schema: Option<&str>) -> Result<Vec<String>, SqlError> {
215 let conn = self.conn.clone();
216 tokio::task::spawn_blocking(move || {
217 let guard = conn.lock().unwrap();
218 let mut stmt = guard
219 .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
220 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
221 let names: Vec<String> = stmt
222 .query_map([], |row| row.get(0))
223 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
224 .collect::<Result<Vec<_>, _>>()
225 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
226 Ok(names)
227 })
228 .await
229 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
230 }
231
232 async fn list_schemas(&mut self) -> Result<Vec<SchemaInfo>, SqlError> {
233 let conn = self.conn.clone();
234 tokio::task::spawn_blocking(move || {
235 let guard = conn.lock().unwrap();
236 let mut stmt = guard
240 .prepare("PRAGMA database_list")
241 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
242 let schemas: Vec<SchemaInfo> = stmt
243 .query_map([], |row| row.get::<_, String>(1))
244 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
245 .collect::<Result<Vec<String>, _>>()
246 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
247 .into_iter()
248 .map(|name| {
249 let is_default = name == "main";
250 SchemaInfo { name, is_default }
251 })
252 .collect();
253 Ok(schemas)
254 })
255 .await
256 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
257 }
258
259 async fn describe_table(
260 &mut self,
261 _schema: Option<&str>,
262 table: &str,
263 ) -> Result<QueryResult, SqlError> {
264 let table = table.to_string();
265 let conn = self.conn.clone();
266 tokio::task::spawn_blocking(move || {
267 let guard = conn.lock().unwrap();
268 let sql = format!("PRAGMA table_info({})", escape_sqlite_identifier(&table));
269 let mut stmt = guard
270 .prepare(&sql)
271 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
272 let col_names = stmt.column_names();
273 let columns: Vec<ColumnInfo> = col_names
274 .iter()
275 .map(|name| ColumnInfo {
276 name: name.to_string(),
277 type_hint: TypeHint::String,
278 nullable: true,
279 })
280 .collect();
281 let mut rows = Vec::new();
282 let mut rows_iter = stmt
283 .query([])
284 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
285 while let Some(row) = rows_iter
286 .next()
287 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
288 {
289 let mut values = Vec::with_capacity(columns.len());
290 for i in 0..columns.len() {
291 let val: SqliteValue = row
292 .get(i)
293 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
294 values.push(sqlite_to_value(val));
295 }
296 rows.push(values);
297 }
298 Ok(QueryResult { columns, rows })
299 })
300 .await
301 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
302 }
303
304 async fn primary_key(
305 &mut self,
306 _schema: Option<&str>,
307 table: &str,
308 ) -> Result<Vec<String>, SqlError> {
309 let table = table.to_string();
310 let conn = self.conn.clone();
311 tokio::task::spawn_blocking(move || {
312 let guard = conn.lock().unwrap();
313 let sql = format!("PRAGMA table_info({})", escape_sqlite_identifier(&table));
316 let mut stmt = guard
317 .prepare(&sql)
318 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
319 let mut rows = stmt
320 .query([])
321 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
322 let mut keyed: Vec<(i64, String)> = Vec::new();
323 while let Some(row) = rows
324 .next()
325 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
326 {
327 let name: String = row
328 .get("name")
329 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
330 let pk: i64 = row
331 .get("pk")
332 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
333 if pk > 0 {
334 keyed.push((pk, name));
335 }
336 }
337 keyed.sort_by_key(|(pos, _)| *pos);
338 Ok(keyed.into_iter().map(|(_, n)| n).collect())
339 })
340 .await
341 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
342 }
343
344 async fn list_foreign_keys(
345 &mut self,
346 _schema: Option<&str>,
347 ) -> Result<Vec<ForeignKey>, SqlError> {
348 let conn = self.conn.clone();
349 tokio::task::spawn_blocking(move || {
350 let guard = conn.lock().unwrap();
351 let tables: Vec<String> = {
354 let mut stmt = guard
355 .prepare("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
356 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
357 let names: Result<Vec<String>, _> = stmt
358 .query_map([], |row| row.get::<_, String>(0))
359 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
360 .collect();
361 names.map_err(|e| SqlError::QueryFailed(e.to_string()))?
362 };
363 let mut out: Vec<ForeignKey> = Vec::new();
364 for child_table in tables {
365 let sql = format!(
366 "PRAGMA foreign_key_list({})",
367 escape_sqlite_identifier(&child_table)
368 );
369 let mut stmt = guard
370 .prepare(&sql)
371 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
372 let mut rows = stmt
373 .query([])
374 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
375 let mut by_id: indexmap::IndexMap<i64, ForeignKey> = indexmap::IndexMap::new();
376 while let Some(row) = rows
377 .next()
378 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
379 {
380 let id: i64 = row
381 .get("id")
382 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
383 let parent_table: String = row
384 .get("table")
385 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
386 let child_col: String = row
387 .get("from")
388 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
389 let parent_col: String = row
390 .get("to")
391 .map_err(|e| SqlError::QueryFailed(e.to_string()))?;
392 let on_delete: Option<String> = row.get("on_delete").ok();
393 let entry = by_id.entry(id).or_insert_with(|| ForeignKey {
394 child_table: child_table.clone(),
395 child_columns: Vec::new(),
396 parent_table: parent_table.clone(),
397 parent_columns: Vec::new(),
398 on_delete: on_delete.filter(|s| !s.is_empty() && s != "NO ACTION"),
399 });
400 entry.child_columns.push(child_col);
401 entry.parent_columns.push(parent_col);
402 }
403 out.extend(by_id.into_values());
404 }
405 Ok(out)
406 })
407 .await
408 .map_err(|e| SqlError::QueryFailed(e.to_string()))?
409 }
410
411 async fn bulk_insert_rows(&mut self, _target: BulkInsert<'_>) -> Result<usize, SqlError> {
412 Err(SqlError::BulkUnavailable(
417 "SQLite has no native bulk loader; multi-row INSERT is already optimal".into(),
418 ))
419 }
420}
421
422pub(crate) async fn connect(
423 _url: &DatabaseUrl,
424 _opts: &ConnectOptions,
425) -> Result<SqliteConnection, SqlError> {
426 let path = _url.path().to_string();
427 tokio::task::spawn_blocking(move || {
428 let conn =
429 SqliteConn::open(&path).map_err(|e| SqlError::ConnectionFailed(e.to_string()))?;
430 Ok(SqliteConnection {
431 conn: std::sync::Arc::new(std::sync::Mutex::new(conn)),
432 })
433 })
434 .await
435 .map_err(|e| SqlError::ConnectionFailed(e.to_string()))?
436}
437
438fn sqlite_to_value(v: SqliteValue) -> Value {
439 match v {
440 SqliteValue::Null => Value::Null,
441 SqliteValue::Integer(i) => Value::Int64(i),
442 SqliteValue::Real(f) => Value::Float64(f),
443 SqliteValue::Text(s) => Value::String(s),
444 SqliteValue::Blob(b) => Value::Bytes(b),
445 }
446}
447
/// Wraps `name` in double quotes, doubling any embedded double quote so the
/// result is a single quoted SQL identifier.
fn escape_sqlite_identifier(name: &str) -> String {
    let mut quoted = String::with_capacity(name.len() + 2);
    quoted.push('"');
    for ch in name.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
451
/// Where the scanner is within the statement currently being read.
///
/// The states and transitions follow the token-level state machine that
/// SQLite's `sqlite3_complete` uses, so semicolons inside a
/// `CREATE TRIGGER ... BEGIN ... END` body do not end the statement.
#[derive(Clone, Copy, PartialEq)]
enum SplitState {
    /// Only whitespace and comments seen so far.
    Start,
    /// Inside an ordinary statement.
    Normal,
    /// After a leading `EXPLAIN` (and any `QUERY PLAN` words).
    Explain,
    /// After `CREATE` and any `TEMP` / `TEMPORARY`.
    Create,
    /// Inside a `CREATE TRIGGER` statement.
    Trigger,
    /// Just after a `;` inside a trigger.
    TriggerSemi,
    /// Saw `END` right after a `;` inside a trigger.
    TriggerEnd,
}

/// Token classes that matter for finding statement boundaries.
enum SplitToken {
    Semi,
    Other,
    Explain,
    Create,
    Temp,
    Trigger,
    End,
}

/// Returns the state reached from `state` after reading `token`.
fn advance_split_state(state: SplitState, token: SplitToken) -> SplitState {
    use SplitState::{Create, Explain, Normal, Start, Trigger, TriggerEnd, TriggerSemi};
    match (state, token) {
        // Inside a trigger a `;` only ends one body statement.
        (Trigger | TriggerSemi, SplitToken::Semi) => TriggerSemi,
        (_, SplitToken::Semi) => Start,
        (Start, SplitToken::Explain) => Explain,
        (Start | Explain, SplitToken::Create) => Create,
        (Start | Normal, _) => Normal,
        (Explain, SplitToken::Other) => Explain,
        (Explain, _) => Normal,
        (Create, SplitToken::Temp) => Create,
        (Create, SplitToken::Trigger) => Trigger,
        (Create, _) => Normal,
        (TriggerSemi, SplitToken::End) => TriggerEnd,
        (Trigger | TriggerSemi | TriggerEnd, _) => Trigger,
    }
}

/// Returns the index just past the quoted token opening at `open`; a doubled
/// `quote` inside the token is an escape. Unterminated tokens run to the end.
fn skip_quoted(bytes: &[u8], open: usize, quote: u8) -> usize {
    let mut i = open + 1;
    while i < bytes.len() {
        if bytes[i] != quote {
            i += 1;
        } else if bytes.get(i + 1) == Some(&quote) {
            i += 2;
        } else {
            return i + 1;
        }
    }
    bytes.len()
}

/// Classifies a bare word, matching the boundary keywords case-insensitively.
fn classify_word(word: &[u8]) -> SplitToken {
    if word.eq_ignore_ascii_case(b"CREATE") {
        SplitToken::Create
    } else if word.eq_ignore_ascii_case(b"TRIGGER") {
        SplitToken::Trigger
    } else if word.eq_ignore_ascii_case(b"TEMP") || word.eq_ignore_ascii_case(b"TEMPORARY") {
        SplitToken::Temp
    } else if word.eq_ignore_ascii_case(b"END") {
        SplitToken::End
    } else if word.eq_ignore_ascii_case(b"EXPLAIN") {
        SplitToken::Explain
    } else {
        SplitToken::Other
    }
}

/// Splits a SQL script into its individual statements.
///
/// Each returned slice borrows from `sql`, keeps its leading whitespace and
/// includes the terminating `;` when there is one; a final statement without
/// `;` is returned with trailing whitespace removed.
///
/// Semicolons are ignored inside `'...'`, `"..."`, `` `...` `` and `[...]`
/// tokens, inside `--` and `/* */` comments, and inside the body of a
/// `CREATE TRIGGER` statement. Segments that contain nothing but whitespace
/// and comments (for example `;;` or a trailing `-- note`) are dropped.
///
/// The function currently never returns `Err`: an unterminated quote or
/// block comment simply extends to the end of the input.
fn split_sqlite_statements(sql: &str) -> Result<Vec<&str>, String> {
    let bytes = sql.as_bytes();
    let mut statements = Vec::new();
    let mut state = SplitState::Start;
    let mut start = 0usize;
    let mut i = 0usize;

    while i < bytes.len() {
        let token = match bytes[i] {
            b';' => {
                i += 1;
                SplitToken::Semi
            }
            quote @ (b'\'' | b'"' | b'`') => {
                i = skip_quoted(bytes, i, quote);
                SplitToken::Other
            }
            b'[' => {
                // Bracketed identifier: runs to the next `]`, no escapes.
                i = bytes[i + 1..]
                    .iter()
                    .position(|&b| b == b']')
                    .map_or(bytes.len(), |off| i + off + 2);
                SplitToken::Other
            }
            b'-' if bytes.get(i + 1) == Some(&b'-') => {
                // Line comment: stop at the newline, which is then skipped
                // as ordinary whitespace.
                i = bytes[i..]
                    .iter()
                    .position(|&b| b == b'\n')
                    .map_or(bytes.len(), |off| i + off);
                continue;
            }
            b'/' if bytes.get(i + 1) == Some(&b'*') => {
                // Block comment: an unterminated one swallows the rest.
                i = bytes[i + 2..]
                    .windows(2)
                    .position(|pair| pair == b"*/")
                    .map_or(bytes.len(), |off| i + off + 4);
                continue;
            }
            b if b.is_ascii_whitespace() => {
                i += 1;
                continue;
            }
            b if b.is_ascii_alphabetic() || b == b'_' || b >= 0x80 => {
                let word_start = i;
                while i < bytes.len() {
                    let c = bytes[i];
                    if c.is_ascii_alphanumeric() || c == b'_' || c == b'$' || c >= 0x80 {
                        i += 1;
                    } else {
                        break;
                    }
                }
                classify_word(&bytes[word_start..i])
            }
            _ => {
                i += 1;
                SplitToken::Other
            }
        };

        let ends_statement = matches!(token, SplitToken::Semi);
        let previous = state;
        state = advance_split_state(state, token);
        if ends_statement && state == SplitState::Start {
            // `i` already points past the `;`, which is ASCII, so both slice
            // bounds fall on character boundaries.
            if previous != SplitState::Start {
                statements.push(&sql[start..i]);
            }
            start = i;
        }
    }

    // Anything left over is a final statement without a terminating `;`.
    if state != SplitState::Start {
        statements.push(sql[start..].trim_end());
    }

    Ok(statements)
}
528
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    static TEST_COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Scratch database file that is deleted when the guard is dropped, so
    /// the file is cleaned up even when a test assertion panics.
    struct TempDb {
        path: std::path::PathBuf,
    }

    impl TempDb {
        /// Picks a path unique to this process and test, removing any
        /// leftover file from an earlier run.
        fn create() -> Self {
            let pid = std::process::id();
            let n = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
            let path = std::env::temp_dir().join(format!("ferrule-sqlite-test-{pid}-{n}.db"));
            let _ = std::fs::remove_file(&path);
            Self { path }
        }

        /// The `sqlite://` URL pointing at the scratch file.
        fn url(&self) -> String {
            format!("sqlite://{}", self.path.display())
        }
    }

    impl Drop for TempDb {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Opens a connection to a fresh scratch database.
    ///
    /// Bind the result as `let (_db, mut conn) = fresh_conn();` so the
    /// connection is dropped before the guard deletes the file.
    fn fresh_conn() -> (TempDb, Box<dyn crate::Connection>) {
        let db = TempDb::create();
        let url = DatabaseUrl::parse(&db.url()).expect("parse sqlite URL");
        let conn =
            crate::connect(&url, &ConnectOptions::default(), None).expect("connect should succeed");
        (db, conn)
    }

    /// Creates `test_users` and inserts the rows for Alice and Bob.
    fn seed_test_users(conn: &mut dyn crate::Connection) {
        conn.execute(
            "CREATE TABLE test_users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT,
                age INTEGER,
                score REAL,
                active INTEGER,
                meta TEXT
            )",
        )
        .expect("create table");
        conn.execute("INSERT INTO test_users (name, age, score, active, meta) VALUES ('Alice', 30, 99.5, 1, '{\"role\":\"admin\"}')")
            .expect("insert alice");
        conn.execute("INSERT INTO test_users (name, age, score, active, meta) VALUES ('Bob', 25, 88.25, 0, '{\"role\":\"user\"}')")
            .expect("insert bob");
    }

    #[test]
    fn test_sqlite_ping() {
        let (_db, mut conn) = fresh_conn();
        conn.ping().expect("ping should succeed");
    }

    #[test]
    fn test_sqlite_query() {
        let (_db, mut conn) = fresh_conn();
        seed_test_users(&mut conn);
        let result = conn
            .query("SELECT * FROM test_users ORDER BY id")
            .expect("query should succeed");
        assert_eq!(result.columns.len(), 6, "expected 6 columns");
        assert_eq!(result.rows.len(), 2, "expected 2 seeded rows");
    }

    #[test]
    fn test_sqlite_execute() {
        let (_db, mut conn) = fresh_conn();
        seed_test_users(&mut conn);
        let summary = conn
            .execute("INSERT INTO test_users (name, age) VALUES ('Charlie', 35)")
            .expect("execute should succeed");
        assert_eq!(
            summary.rows_affected,
            Some(1),
            "expected exactly one row inserted"
        );
    }

    #[test]
    fn test_sqlite_list_tables() {
        let (_db, mut conn) = fresh_conn();
        seed_test_users(&mut conn);
        conn.execute("CREATE TABLE other (id INTEGER)")
            .expect("create other");
        let tables = conn.list_tables(None).expect("list_tables");
        assert!(tables.contains(&"test_users".to_string()));
        assert!(tables.contains(&"other".to_string()));
    }

    #[test]
    fn test_sqlite_list_schemas() {
        let (_db, mut conn) = fresh_conn();
        let schemas = conn.list_schemas().expect("list_schemas");
        assert_eq!(
            schemas.len(),
            1,
            "fresh sqlite conn should report one schema, got: {schemas:?}"
        );
        assert_eq!(schemas[0].name, "main");
        assert!(schemas[0].is_default, "main should be the default schema");
    }

    #[test]
    fn test_sqlite_describe_table() {
        let (_db, mut conn) = fresh_conn();
        seed_test_users(&mut conn);
        let result = conn.describe_table(None, "test_users").expect("describe");
        assert!(
            result.rows.len() >= 6,
            "expected >=6 columns in test_users, got {}",
            result.rows.len()
        );
    }

    /// Each SQLite storage class maps onto the expected `Value` variant.
    #[test]
    fn test_sqlite_type_mapping() {
        let (_db, mut conn) = fresh_conn();
        conn.execute(
            "CREATE TABLE typed (
                i INTEGER,
                r REAL,
                t TEXT,
                b BLOB,
                n INTEGER
            )",
        )
        .expect("create typed");
        conn.execute("INSERT INTO typed VALUES (42, 2.5, 'hi', x'deadbeef', NULL)")
            .expect("insert typed");

        let result = conn
            .query("SELECT i, r, t, b, n FROM typed")
            .expect("query typed");
        let row = &result.rows[0];
        assert!(matches!(row[0], Value::Int64(42)), "i should be Int64(42)");
        assert!(
            matches!(row[1], Value::Float64(f) if (f - 2.5).abs() < 1e-9),
            "r should be Float64(~2.5)"
        );
        assert!(
            matches!(&row[2], Value::String(s) if s == "hi"),
            "t should be String('hi')"
        );
        assert!(
            matches!(&row[3], Value::Bytes(b) if b == &vec![0xde, 0xad, 0xbe, 0xef]),
            "b should be Bytes(0xDEADBEEF)"
        );
        assert!(matches!(row[4], Value::Null), "n should be Null");
    }

    #[test]
    fn test_sqlite_execute_multi() {
        let (_db, mut conn) = fresh_conn();
        let results = conn
            .execute_multi(
                "CREATE TABLE m (id INTEGER); \
                 INSERT INTO m VALUES (1); \
                 INSERT INTO m VALUES (2); \
                 SELECT COUNT(*) AS c FROM m;",
            )
            .expect("execute_multi");
        assert_eq!(results.len(), 4, "expected 4 statement results");
        match results.last().unwrap() {
            StatementResult::Query(qr) => {
                assert_eq!(qr.rows.len(), 1);
                assert!(matches!(qr.rows[0][0], Value::Int64(2)));
            }
            other => panic!("last result should be Query, got {:?}", other),
        }
    }

    #[test]
    fn test_escape_sqlite_identifier_doubles_quotes() {
        assert_eq!(escape_sqlite_identifier("plain"), "\"plain\"");
        assert_eq!(escape_sqlite_identifier("a\"b"), "\"a\"\"b\"");
    }

    #[test]
    fn test_sqlite_primary_key() {
        let (_db, mut conn) = fresh_conn();
        seed_test_users(&mut conn);
        let pk = conn.primary_key(None, "test_users").expect("primary_key");
        assert_eq!(pk, vec!["id".to_string()]);
    }

    /// Composite keys come back in their declared key order.
    #[test]
    fn test_sqlite_primary_key_composite_in_order() {
        let (_db, mut conn) = fresh_conn();
        conn.execute(
            "CREATE TABLE membership (
                tenant TEXT,
                resource TEXT,
                role TEXT,
                PRIMARY KEY (tenant, resource)
            )",
        )
        .expect("create membership");
        let pk = conn.primary_key(None, "membership").expect("primary_key");
        assert_eq!(pk, vec!["tenant".to_string(), "resource".to_string()]);
    }

    #[test]
    fn test_sqlite_primary_key_none() {
        let (_db, mut conn) = fresh_conn();
        conn.execute("CREATE TABLE no_pk (a INTEGER, b TEXT)")
            .expect("create no_pk");
        let pk = conn.primary_key(None, "no_pk").expect("primary_key");
        assert!(pk.is_empty(), "expected no PK columns, got {pk:?}");
    }

    #[test]
    fn test_sqlite_list_foreign_keys() {
        let (_db, mut conn) = fresh_conn();
        seed_test_users(&mut conn);
        conn.execute(
            "CREATE TABLE test_orders (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER REFERENCES test_users(id) ON DELETE CASCADE,
                total REAL
            )",
        )
        .expect("create test_orders");
        let fks = conn.list_foreign_keys(None).expect("list_foreign_keys");
        assert_eq!(fks.len(), 1, "expected one FK edge, got {fks:?}");
        let fk = &fks[0];
        assert_eq!(fk.child_table, "test_orders");
        assert_eq!(fk.child_columns, vec!["user_id".to_string()]);
        assert_eq!(fk.parent_table, "test_users");
        assert_eq!(fk.parent_columns, vec!["id".to_string()]);
        assert_eq!(fk.on_delete.as_deref(), Some("CASCADE"));
    }

    /// A multi-column foreign key is reported as one edge with both columns.
    #[test]
    fn test_sqlite_list_foreign_keys_composite() {
        let (_db, mut conn) = fresh_conn();
        conn.execute(
            "CREATE TABLE parent (
                a INTEGER, b INTEGER,
                PRIMARY KEY (a, b)
            )",
        )
        .expect("create parent");
        conn.execute(
            "CREATE TABLE child (
                x INTEGER, y INTEGER,
                FOREIGN KEY (x, y) REFERENCES parent(a, b)
            )",
        )
        .expect("create child");
        let fks = conn.list_foreign_keys(None).expect("list_foreign_keys");
        assert_eq!(fks.len(), 1);
        assert_eq!(fks[0].child_columns, vec!["x".to_string(), "y".to_string()]);
        assert_eq!(
            fks[0].parent_columns,
            vec!["a".to_string(), "b".to_string()]
        );
    }

    /// Pulls a large generated result set in fixed-size batches and checks
    /// the batch count and sizes.
    #[test]
    fn test_sqlite_cursor_streams_in_bounded_batches() {
        let (_db, mut conn) = fresh_conn();
        const TOTAL: i64 = 250_000;
        const BATCH: usize = 128;
        let cte = format!(
            "WITH RECURSIVE seq(i) AS (\
             SELECT 1 UNION ALL SELECT i + 1 FROM seq WHERE i < {TOTAL}\
             ) SELECT i, i * 2 AS doubled FROM seq"
        );
        let mut cursor = conn.query_cursor(&cte).expect("open cursor");
        assert_eq!(cursor.columns().len(), 2, "two projected columns");

        let mut total: u64 = 0;
        let mut batches: u64 = 0;
        let mut max_batch_len = 0usize;
        loop {
            let batch = cursor.next_batch(BATCH).expect("pull batch");
            if batch.is_empty() {
                break;
            }
            assert!(
                batch.len() <= BATCH,
                "a streamed batch ({}) must never exceed the requested size {}",
                batch.len(),
                BATCH
            );
            max_batch_len = max_batch_len.max(batch.len());
            total += batch.len() as u64;
            batches += 1;
        }
        assert_eq!(total, TOTAL as u64, "streamed every row exactly once");
        assert!(
            max_batch_len <= BATCH,
            "peak in-flight batch stayed bounded by batch size"
        );
        let expected_batches = (TOTAL as u64).div_ceil(BATCH as u64);
        assert_eq!(
            batches, expected_batches,
            "exactly ceil(total/batch) batches — proves batch-at-a-time, not full buffering"
        );
    }

    /// Iterates a cursor over many 4 KiB-wide rows and checks every row.
    #[test]
    fn test_sqlite_cursor_wide_rows_stay_bounded() {
        let (_db, mut conn) = fresh_conn();
        const TOTAL: i64 = 100_000;
        let cte = format!(
            "WITH RECURSIVE seq(i) AS (\
             SELECT 1 UNION ALL SELECT i + 1 FROM seq WHERE i < {TOTAL}\
             ) SELECT i, printf('%.*c', 4096, 'x') AS payload FROM seq"
        );
        let cursor = conn.query_cursor(&cte).expect("open cursor");
        let mut count: u64 = 0;
        for row in cursor {
            let row = row.expect("row ok");
            assert_eq!(row.len(), 2);
            if let Value::String(ref payload) = row[1] {
                assert_eq!(payload.len(), 4096);
            } else {
                panic!("payload column should be a String");
            }
            count += 1;
        }
        assert_eq!(count, TOTAL as u64, "iterator drained every wide row");
    }

    /// The streaming path yields the same rows as the eager path.
    #[test]
    fn test_sqlite_cursor_matches_eager_query() {
        let (_db, mut conn) = fresh_conn();
        seed_test_users(&mut conn);
        let eager = conn
            .query("SELECT id, name, age FROM test_users ORDER BY id")
            .expect("eager query");
        let streamed: Vec<crate::value::Row> = conn
            .query_cursor("SELECT id, name, age FROM test_users ORDER BY id")
            .expect("cursor")
            .collect::<Result<Vec<_>, _>>()
            .expect("collect streamed rows");
        assert_eq!(eager.rows, streamed, "cursor data == eager data");
    }

    /// Asking for a zero-row batch returns nothing and consumes nothing.
    #[test]
    fn test_sqlite_cursor_next_batch_zero_is_noop() {
        let (_db, mut conn) = fresh_conn();
        seed_test_users(&mut conn);
        let mut cursor = conn
            .query_cursor("SELECT id FROM test_users ORDER BY id")
            .expect("cursor");
        assert!(cursor.next_batch(0).expect("zero batch").is_empty());
        let first = cursor.next_batch(1).expect("first row");
        assert_eq!(first.len(), 1);
    }

    /// An eager query with a cell above `max_cell_bytes` reports `CellTooLarge`.
    #[test]
    fn test_sqlite_query_cell_guard_fails_fast() {
        let (_db, mut conn) = fresh_conn();
        conn.set_size_guards(crate::SizeGuards {
            max_cell_bytes: 1024,
            max_row_bytes: 0,
            max_total_buffered_bytes: 0,
        });
        let err = conn
            .query("SELECT printf('%.*c', 8192, 'x') AS big")
            .expect_err("oversized cell must fail fast, not OOM");
        match err {
            SqlError::CellTooLarge {
                column, size, cap, ..
            } => {
                assert_eq!(column, "big");
                assert_eq!(size, 8192);
                assert_eq!(cap, 1024);
            }
            other => panic!("expected CellTooLarge, got {other:?}"),
        }
    }

    /// A cursor opens fine but the oversized cell fails on the first batch.
    #[test]
    fn test_sqlite_cursor_cell_guard_fails_fast() {
        let (_db, mut conn) = fresh_conn();
        conn.set_size_guards(crate::SizeGuards {
            max_cell_bytes: 1024,
            max_row_bytes: 0,
            max_total_buffered_bytes: 0,
        });
        let mut cursor = conn
            .query_cursor("SELECT printf('%.*c', 8192, 'x') AS big")
            .expect("cursor opens (guard fires per-row, not at open)");
        let err = cursor
            .next_batch(1)
            .expect_err("streamed oversized cell must fail fast");
        assert!(matches!(err, SqlError::CellTooLarge { ref column, .. } if column == "big"));
    }

    /// An eager query trips the total-buffer cap before buffering every row.
    #[test]
    fn test_sqlite_query_total_buffer_guard_fails_fast() {
        let (_db, mut conn) = fresh_conn();
        conn.set_size_guards(crate::SizeGuards {
            max_cell_bytes: 0,
            max_row_bytes: 0,
            max_total_buffered_bytes: 64 * 1024,
        });
        let cte = "WITH RECURSIVE seq(i) AS (\
                   SELECT 1 UNION ALL SELECT i + 1 FROM seq WHERE i < 10000\
                   ) SELECT i, printf('%.*c', 256, 'y') AS pad FROM seq";
        let err = conn
            .query(cte)
            .expect_err("total-buffer cap must trip before full materialization");
        match err {
            SqlError::BufferTooLarge { rows_buffered, cap } => {
                assert_eq!(cap, 64 * 1024);
                assert!(
                    rows_buffered < 10_000,
                    "guard tripped before buffering all rows ({rows_buffered})"
                );
            }
            other => panic!("expected BufferTooLarge, got {other:?}"),
        }
    }

    /// The same cap does not stop a cursor from streaming every row.
    #[test]
    fn test_sqlite_cursor_ignores_total_buffer_cap() {
        let (_db, mut conn) = fresh_conn();
        conn.set_size_guards(crate::SizeGuards {
            max_cell_bytes: 0,
            max_row_bytes: 0,
            max_total_buffered_bytes: 64 * 1024,
        });
        let cte = "WITH RECURSIVE seq(i) AS (\
                   SELECT 1 UNION ALL SELECT i + 1 FROM seq WHERE i < 10000\
                   ) SELECT i, printf('%.*c', 256, 'y') AS pad FROM seq";
        let cursor = conn.query_cursor(cte).expect("cursor");
        let count = cursor.into_iter().filter(|r| r.is_ok()).count();
        assert_eq!(count, 10_000, "cursor streams past the total-buffer cap");
    }
}