Skip to main content

limbo/
lib.rs

1// UPSTREAM: vendored Limbo fork — allow upstream style
2#![allow(
3    rustdoc::bare_urls,
4    rustdoc::invalid_html_tags,
5    rustdoc::broken_intra_doc_links
6)]
7#![allow(
8    clippy::collapsible_match,
9    clippy::doc_overindented_list_items,
10    clippy::from_over_into
11)]
12
13pub mod params;
14pub mod value;
15
16pub use value::Value;
17
18pub use params::params_from_iter;
19
20use crate::params::*;
21use std::fmt::Debug;
22use std::num::NonZero;
23use std::sync::{Arc, Mutex};
24
25#[derive(Debug, thiserror::Error)]
26pub enum Error {
27    #[error("SQL conversion failure: `{0}`")]
28    ToSqlConversionFailure(BoxError),
29    #[error("Mutex lock error: {0}")]
30    MutexError(String),
31    #[error("SQL execution failure: `{0}`")]
32    SqlExecutionFailure(String),
33}
34
35impl From<limbo_core::LimboError> for Error {
36    fn from(err: limbo_core::LimboError) -> Self {
37        Error::SqlExecutionFailure(err.to_string())
38    }
39}
40
41pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>;
42
43pub type Result<T> = std::result::Result<T, Error>;
44pub struct Builder {
45    path: String,
46}
47
48impl Builder {
49    pub fn new_local(path: &str) -> Self {
50        Self {
51            path: path.to_string(),
52        }
53    }
54
55    #[allow(unused_variables, clippy::arc_with_non_send_sync)]
56    pub async fn build(self) -> Result<Database> {
57        match self.path.as_str() {
58            ":memory:" => {
59                let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::MemoryIO::new());
60                let db = limbo_core::Database::open_file(io, self.path.as_str(), false)?;
61                Ok(Database { inner: db })
62            }
63            path => {
64                let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new()?);
65                let db = limbo_core::Database::open_file(io, path, false)?;
66                Ok(Database { inner: db })
67            }
68        }
69    }
70}
71
72#[derive(Clone)]
73pub struct Database {
74    inner: Arc<limbo_core::Database>,
75}
76
77unsafe impl Send for Database {}
78unsafe impl Sync for Database {}
79
80impl Debug for Database {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        f.debug_struct("Database").finish()
83    }
84}
85
86impl Database {
87    pub fn connect(&self) -> Result<Connection> {
88        let conn = self.inner.connect()?;
89        #[allow(clippy::arc_with_non_send_sync)]
90        let connection = Connection {
91            inner: Arc::new(Mutex::new(conn)),
92        };
93        Ok(connection)
94    }
95}
96
97pub struct Connection {
98    inner: Arc<Mutex<Arc<limbo_core::Connection>>>,
99}
100
101impl Clone for Connection {
102    fn clone(&self) -> Self {
103        Self {
104            inner: Arc::clone(&self.inner),
105        }
106    }
107}
108
109unsafe impl Send for Connection {}
110unsafe impl Sync for Connection {}
111
112impl Connection {
113    pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
114        let mut stmt = self.prepare(sql).await?;
115        stmt.query(params).await
116    }
117
118    pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<u64> {
119        let mut stmt = self.prepare(sql).await?;
120        stmt.execute(params).await
121    }
122
123    pub async fn prepare(&self, sql: &str) -> Result<Statement> {
124        let conn = self
125            .inner
126            .lock()
127            .map_err(|e| Error::MutexError(e.to_string()))?;
128
129        let stmt = conn.prepare(sql)?;
130
131        #[allow(clippy::arc_with_non_send_sync)]
132        let statement = Statement {
133            inner: Arc::new(Mutex::new(stmt)),
134        };
135        Ok(statement)
136    }
137
138    /// Return the number of rows changed by the most recent DML statement on
139    /// this connection.  Mirrors `sqlite3_changes()` semantics: DDL statements
140    /// and `BEGIN`/`COMMIT`/`ROLLBACK` return 0.
141    pub fn changes(&self) -> Result<i64> {
142        let conn = self
143            .inner
144            .lock()
145            .map_err(|e| Error::MutexError(e.to_string()))?;
146        Ok(conn.changes())
147    }
148
149    pub fn pragma_query<F>(&self, pragma_name: &str, mut f: F) -> Result<()>
150    where
151        F: FnMut(&Row) -> limbo_core::Result<()>,
152    {
153        let conn = self
154            .inner
155            .lock()
156            .map_err(|e| Error::MutexError(e.to_string()))?;
157
158        let rows: Vec<Row> = conn
159            .pragma_query(pragma_name)
160            .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?
161            .iter()
162            .map(|row| row.iter().collect::<Row>())
163            .collect();
164
165        rows.iter().try_for_each(|row| {
166            f(row).map_err(|e| {
167                Error::SqlExecutionFailure(format!("Error executing user defined function: {}", e))
168            })
169        })?;
170        Ok(())
171    }
172}
173
174pub struct Statement {
175    inner: Arc<Mutex<limbo_core::Statement>>,
176}
177
178impl Clone for Statement {
179    fn clone(&self) -> Self {
180        Self {
181            inner: Arc::clone(&self.inner),
182        }
183    }
184}
185
186unsafe impl Send for Statement {}
187unsafe impl Sync for Statement {}
188
189impl Statement {
190    pub async fn query(&mut self, params: impl IntoParams) -> Result<Rows> {
191        let params = params.into_params()?;
192        match params {
193            params::Params::None => (),
194            params::Params::Positional(values) => {
195                for (i, value) in values.into_iter().enumerate() {
196                    let mut stmt = self.inner.lock().unwrap();
197                    stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
198                }
199            }
200            params::Params::Named(_items) => todo!(),
201        }
202        #[allow(clippy::arc_with_non_send_sync)]
203        let rows = Rows {
204            inner: Arc::clone(&self.inner),
205        };
206        Ok(rows)
207    }
208
209    pub async fn execute(&mut self, params: impl IntoParams) -> Result<u64> {
210        {
211            // Reset the statement before executing
212            self.inner.lock().unwrap().reset();
213        }
214        let params = params.into_params()?;
215        match params {
216            params::Params::None => (),
217            params::Params::Positional(values) => {
218                for (i, value) in values.into_iter().enumerate() {
219                    let mut stmt = self.inner.lock().unwrap();
220                    stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
221                }
222            }
223            params::Params::Named(_items) => todo!(),
224        }
225        loop {
226            let mut stmt = self.inner.lock().unwrap();
227            match stmt.step() {
228                Ok(limbo_core::StepResult::Row) => {
229                    // unexpected row during execution, error out.
230                    return Ok(2);
231                }
232                Ok(limbo_core::StepResult::Done) => {
233                    return Ok(0);
234                }
235                Ok(limbo_core::StepResult::IO) => {
236                    let _ = stmt.run_once();
237                    //return Ok(1);
238                }
239                Ok(limbo_core::StepResult::Busy) => {
240                    return Ok(4);
241                }
242                Ok(limbo_core::StepResult::Interrupt) => {
243                    return Ok(3);
244                }
245                Err(err) => {
246                    return Err(err.into());
247                }
248            }
249        }
250    }
251
252    pub fn columns(&self) -> Vec<Column> {
253        let stmt = self.inner.lock().unwrap();
254
255        let n = stmt.num_columns();
256
257        let mut cols = Vec::with_capacity(n);
258
259        for i in 0..n {
260            let name = stmt.get_column_name(i).into_owned();
261            let decl_type = stmt.get_column_decl_type(i).map(|s| s.into_owned());
262            cols.push(Column { name, decl_type });
263        }
264
265        cols
266    }
267}
268
269pub struct Column {
270    name: String,
271    decl_type: Option<String>,
272}
273
274impl Column {
275    pub fn name(&self) -> &str {
276        &self.name
277    }
278
279    pub fn decl_type(&self) -> Option<&str> {
280        self.decl_type.as_deref()
281    }
282}
283
284pub trait IntoValue {
285    fn into_value(self) -> Result<Value>;
286}
287
288#[derive(Debug, Clone)]
289pub enum Params {
290    None,
291    Positional(Vec<Value>),
292    Named(Vec<(String, Value)>),
293}
294pub struct Transaction {}
295
296pub struct Rows {
297    inner: Arc<Mutex<limbo_core::Statement>>,
298}
299
300impl Clone for Rows {
301    fn clone(&self) -> Self {
302        Self {
303            inner: Arc::clone(&self.inner),
304        }
305    }
306}
307
308unsafe impl Send for Rows {}
309unsafe impl Sync for Rows {}
310
311impl Rows {
312    pub async fn next(&mut self) -> Result<Option<Row>> {
313        loop {
314            let mut stmt = self
315                .inner
316                .lock()
317                .map_err(|e| Error::MutexError(e.to_string()))?;
318            match stmt.step() {
319                Ok(limbo_core::StepResult::Row) => {
320                    let row = stmt.row().unwrap();
321                    return Ok(Some(Row {
322                        values: row.get_values().map(|v| v.to_owned()).collect(),
323                    }));
324                }
325                Ok(limbo_core::StepResult::Done) => return Ok(None),
326                Ok(limbo_core::StepResult::IO) => {
327                    if let Err(e) = stmt.run_once() {
328                        return Err(e.into());
329                    }
330                    continue;
331                }
332                Ok(limbo_core::StepResult::Busy) => return Ok(None),
333                Ok(limbo_core::StepResult::Interrupt) => return Ok(None),
334                _ => return Ok(None),
335            }
336        }
337    }
338}
339
340#[derive(Debug)]
341pub struct Row {
342    values: Vec<limbo_core::Value>,
343}
344
345unsafe impl Send for Row {}
346unsafe impl Sync for Row {}
347
348impl Row {
349    pub fn get_value(&self, index: usize) -> Result<Value> {
350        let value = &self.values[index];
351        match value {
352            limbo_core::Value::Integer(i) => Ok(Value::Integer(*i)),
353            limbo_core::Value::Null => Ok(Value::Null),
354            limbo_core::Value::Float(f) => Ok(Value::Real(*f)),
355            limbo_core::Value::Text(text) => Ok(Value::Text(text.to_string())),
356            limbo_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())),
357        }
358    }
359
360    pub fn column_count(&self) -> usize {
361        self.values.len()
362    }
363}
364
365impl<'a> FromIterator<&'a limbo_core::Value> for Row {
366    fn from_iter<T: IntoIterator<Item = &'a limbo_core::Value>>(iter: T) -> Self {
367        let values = iter
368            .into_iter()
369            .map(|v| match v {
370                limbo_core::Value::Integer(i) => limbo_core::Value::Integer(*i),
371                limbo_core::Value::Null => limbo_core::Value::Null,
372                limbo_core::Value::Float(f) => limbo_core::Value::Float(*f),
373                limbo_core::Value::Text(s) => limbo_core::Value::Text(s.clone()),
374                limbo_core::Value::Blob(b) => limbo_core::Value::Blob(b.clone()),
375            })
376            .collect();
377
378        Row { values }
379    }
380}
381
382#[cfg(test)]
383mod tests {
384    use super::*;
385    use tempfile::NamedTempFile;
386
387    #[tokio::test]
388    async fn test_database_persistence() -> Result<()> {
389        let temp_file = NamedTempFile::new().unwrap();
390        let db_path = temp_file.path().to_str().unwrap();
391
392        // First, create the database, a table, and insert some data
393        {
394            let db = Builder::new_local(db_path).build().await?;
395            let conn = db.connect()?;
396            conn.execute(
397                "CREATE TABLE test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
398                (),
399            )
400            .await?;
401            conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
402                .await?;
403            conn.execute("INSERT INTO test_persistence (name) VALUES ('Bob');", ())
404                .await?;
405        } // db and conn are dropped here, simulating closing
406
407        // Now, re-open the database and check if the data is still there
408        let db = Builder::new_local(db_path).build().await?;
409        let conn = db.connect()?;
410
411        let mut rows = conn
412            .query("SELECT name FROM test_persistence ORDER BY id;", ())
413            .await?;
414
415        let row1 = rows.next().await?.expect("Expected first row");
416        assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string()));
417
418        let row2 = rows.next().await?.expect("Expected second row");
419        assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string()));
420
421        assert!(rows.next().await?.is_none(), "Expected no more rows");
422
423        Ok(())
424    }
425
426    #[tokio::test]
427    async fn test_database_persistence_many_frames() -> Result<()> {
428        let temp_file = NamedTempFile::new().unwrap();
429        let db_path = temp_file.path().to_str().unwrap();
430
431        const NUM_INSERTS: usize = 100;
432        const TARGET_STRING_LEN: usize = 1024; // 1KB
433
434        let mut original_data = Vec::with_capacity(NUM_INSERTS);
435        for i in 0..NUM_INSERTS {
436            let prefix = format!("test_string_{:04}_", i);
437            let padding_len = TARGET_STRING_LEN.saturating_sub(prefix.len());
438            let padding: String = "A".repeat(padding_len);
439            original_data.push(format!("{}{}", prefix, padding));
440        }
441
442        // First, create the database, a table, and insert many large strings
443        {
444            let db = Builder::new_local(db_path).build().await?;
445            let conn = db.connect()?;
446            conn.execute(
447                "CREATE TABLE test_large_persistence (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT NOT NULL);",
448                (),
449            )
450            .await?;
451
452            for data_val in &original_data {
453                conn.execute(
454                    "INSERT INTO test_large_persistence (data) VALUES (?);",
455                    params::Params::Positional(vec![Value::Text(data_val.clone())]),
456                )
457                .await?;
458            }
459        } // db and conn are dropped here, simulating closing
460
461        // Now, re-open the database and check if the data is still there
462        let db = Builder::new_local(db_path).build().await?;
463        let conn = db.connect()?;
464
465        let mut rows = conn
466            .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
467            .await?;
468
469        for (i, expected) in original_data.iter().enumerate().take(NUM_INSERTS) {
470            let row = rows
471                .next()
472                .await?
473                .unwrap_or_else(|| panic!("Expected row {} but found None", i));
474            assert_eq!(
475                row.get_value(0)?,
476                Value::Text(expected.clone()),
477                "Mismatch in retrieved data for row {}",
478                i
479            );
480        }
481
482        assert!(
483            rows.next().await?.is_none(),
484            "Expected no more rows after retrieving all inserted data"
485        );
486
487        // Delete the WAL file only and try to re-open and query
488        let wal_path = format!("{}-wal", db_path);
489        std::fs::remove_file(&wal_path)
490            .map_err(|e| eprintln!("Warning: Failed to delete WAL file for test: {}", e))
491            .unwrap();
492
493        // Attempt to re-open the database after deleting WAL and assert that table is missing.
494        let db_after_wal_delete = Builder::new_local(db_path).build().await?;
495        let conn_after_wal_delete = db_after_wal_delete.connect()?;
496
497        let query_result_after_wal_delete = conn_after_wal_delete
498            .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
499            .await;
500
501        match query_result_after_wal_delete {
502            Ok(_) => panic!("Query succeeded after WAL deletion and DB reopen, but was expected to fail because the table definition should have been in the WAL."),
503            Err(Error::SqlExecutionFailure(msg)) => {
504                assert!(
505                    msg.contains("test_large_persistence not found"),
506                    "Expected 'test_large_persistence not found' error, but got: {}",
507                    msg
508                );
509            }
510            Err(e) => panic!(
511                "Expected SqlExecutionFailure for 'no such table', but got a different error: {:?}",
512                e
513            ),
514        }
515
516        Ok(())
517    }
518
519    #[tokio::test]
520    async fn test_database_persistence_write_one_frame_many_times() -> Result<()> {
521        let temp_file = NamedTempFile::new().unwrap();
522        let db_path = temp_file.path().to_str().unwrap();
523
524        for i in 0..100 {
525            {
526                let db = Builder::new_local(db_path).build().await?;
527                let conn = db.connect()?;
528
529                conn.execute("CREATE TABLE IF NOT EXISTS test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);", ()).await?;
530                conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
531                    .await?;
532            }
533            {
534                let db = Builder::new_local(db_path).build().await?;
535                let conn = db.connect()?;
536
537                let mut rows_iter = conn
538                    .query("SELECT count(*) FROM test_persistence;", ())
539                    .await?;
540                let rows = rows_iter.next().await?.unwrap();
541                assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1));
542                assert!(rows_iter.next().await?.is_none());
543            }
544        }
545
546        Ok(())
547    }
548
549    // ------------------------------------------------------------------
550    // A1: PRAGMA application_id
551    // ------------------------------------------------------------------
552
553    /// Read a single scalar integer value produced by a query (e.g. a PRAGMA).
554    async fn query_scalar_i64(conn: &Connection, sql: &str) -> Result<i64> {
555        let mut rows = conn.query(sql, ()).await?;
556        let row = rows
557            .next()
558            .await?
559            .unwrap_or_else(|| panic!("expected a row from `{sql}`"));
560        match row.get_value(0)? {
561            Value::Integer(i) => Ok(i),
562            other => panic!("expected Integer from `{sql}`, got {other:?}"),
563        }
564    }
565
566    #[tokio::test]
567    async fn test_application_id_write_read_round_trip() -> Result<()> {
568        let db = Builder::new_local(":memory:").build().await?;
569        let conn = db.connect()?;
570
571        // Default is 0.
572        assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, 0);
573
574        // GPKG magic (0x47504B47 = 1196444487), a large positive identifier.
575        conn.execute("PRAGMA application_id = 1196444487;", ())
576            .await?;
577        assert_eq!(
578            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
579            1196444487
580        );
581
582        // Overwrite with another value.
583        conn.execute("PRAGMA application_id = 42;", ()).await?;
584        assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, 42);
585
586        Ok(())
587    }
588
589    #[tokio::test]
590    async fn test_application_id_negative_round_trip() -> Result<()> {
591        // SQLite presents application_id as a SIGNED 32-bit integer, so -1 must
592        // round-trip as -1 (not 4294967295).
593        let db = Builder::new_local(":memory:").build().await?;
594        let conn = db.connect()?;
595
596        conn.execute("PRAGMA application_id = -1;", ()).await?;
597        assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, -1);
598
599        conn.execute("PRAGMA application_id = -2147483648;", ())
600            .await?;
601        assert_eq!(
602            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
603            -2147483648
604        );
605
606        Ok(())
607    }
608
609    /// Build a unique, file-backed database path under the OS temp directory.
610    ///
611    /// Uses [`std::env::temp_dir`] plus the process id and an atomically
612    /// incrementing counter so concurrently-running tests never collide, and
613    /// cleans up the database file together with its `-wal` sidecar on drop.
614    struct TempDbPath {
615        path: std::path::PathBuf,
616    }
617
618    impl TempDbPath {
619        fn new(tag: &str) -> Self {
620            use std::sync::atomic::{AtomicU64, Ordering};
621            static COUNTER: AtomicU64 = AtomicU64::new(0);
622            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
623            let mut path = std::env::temp_dir();
624            path.push(format!(
625                "oxisqlite_dur_{}_{}_{}.db",
626                tag,
627                std::process::id(),
628                n
629            ));
630            // Ensure a clean slate even if a previous run left files behind.
631            let _ = std::fs::remove_file(&path);
632            let _ = std::fs::remove_file(format!("{}-wal", path.display()));
633            Self { path }
634        }
635
636        fn as_str(&self) -> &str {
637            self.path
638                .to_str()
639                .expect("temp db path is valid UTF-8 on the test platforms")
640        }
641    }
642
643    impl Drop for TempDbPath {
644        fn drop(&mut self) {
645            let _ = std::fs::remove_file(&self.path);
646            let _ = std::fs::remove_file(format!("{}-wal", self.path.display()));
647        }
648    }
649
650    /// `application_id` survives a real close/reopen cycle for a file-backed
651    /// database (regression test for the header-cookie durability bug: the
652    /// in-memory header was previously re-read straight from the main DB file
653    /// at open time, bypassing the WAL, so a cookie that lived only in the WAL
654    /// reset to 0 on reopen).
655    #[tokio::test]
656    async fn test_application_id_persistence() -> Result<()> {
657        let temp = TempDbPath::new("app_id_persist");
658        let db_path = temp.as_str();
659
660        {
661            let db = Builder::new_local(db_path).build().await?;
662            let conn = db.connect()?;
663            conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);", ())
664                .await?;
665            conn.execute("PRAGMA application_id = -12345;", ()).await?;
666
667            // Within the same open database the value (and its sign) is retained.
668            assert_eq!(
669                query_scalar_i64(&conn, "PRAGMA application_id;").await?,
670                -12345
671            );
672        } // connection + database dropped here, simulating a close.
673
674        // Reopen and assert the value is durably restored from the WAL.
675        let db = Builder::new_local(db_path).build().await?;
676        let conn = db.connect()?;
677        assert_eq!(
678            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
679            -12345,
680            "application_id must survive close/reopen"
681        );
682
683        Ok(())
684    }
685
686    /// `application_id` set to a large positive identifier round-trips across a
687    /// close/reopen for a file-backed database.
688    #[tokio::test]
689    async fn test_application_id_durable_reopen() -> Result<()> {
690        let temp = TempDbPath::new("app_id_reopen");
691        let db_path = temp.as_str();
692
693        {
694            let db = Builder::new_local(db_path).build().await?;
695            let conn = db.connect()?;
696            conn.execute("PRAGMA application_id = 12345;", ()).await?;
697            assert_eq!(
698                query_scalar_i64(&conn, "PRAGMA application_id;").await?,
699                12345
700            );
701        }
702
703        let db = Builder::new_local(db_path).build().await?;
704        let conn = db.connect()?;
705        assert_eq!(
706            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
707            12345,
708            "application_id = 12345 must survive close/reopen"
709        );
710
711        Ok(())
712    }
713
714    /// `user_version` (the canonical cookie mirror of `application_id`) survives
715    /// a close/reopen identically.
716    #[tokio::test]
717    async fn test_user_version_durable_reopen() -> Result<()> {
718        let temp = TempDbPath::new("user_version_reopen");
719        let db_path = temp.as_str();
720
721        {
722            let db = Builder::new_local(db_path).build().await?;
723            let conn = db.connect()?;
724            conn.execute("PRAGMA user_version = 12345;", ()).await?;
725            assert_eq!(
726                query_scalar_i64(&conn, "PRAGMA user_version;").await?,
727                12345
728            );
729        }
730
731        let db = Builder::new_local(db_path).build().await?;
732        let conn = db.connect()?;
733        assert_eq!(
734            query_scalar_i64(&conn, "PRAGMA user_version;").await?,
735            12345,
736            "user_version = 12345 must survive close/reopen"
737        );
738
739        Ok(())
740    }
741
742    /// A negative `application_id` (e.g. -1) is stored on disk as 0xFFFFFFFF but
743    /// must read back as the signed value -1 after a durable close/reopen, just
744    /// like SQLite.
745    #[tokio::test]
746    async fn test_application_id_negative_durable_reopen() -> Result<()> {
747        let temp = TempDbPath::new("app_id_negative_reopen");
748        let db_path = temp.as_str();
749
750        {
751            let db = Builder::new_local(db_path).build().await?;
752            let conn = db.connect()?;
753            conn.execute("PRAGMA application_id = -1;", ()).await?;
754            assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, -1);
755        }
756
757        let db = Builder::new_local(db_path).build().await?;
758        let conn = db.connect()?;
759        assert_eq!(
760            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
761            -1,
762            "application_id = -1 must survive close/reopen as the signed value -1"
763        );
764
765        // And the on-disk bytes (after a checkpoint flushes the WAL into the
766        // main file) must be the 32-bit two's-complement big-endian 0xFFFFFFFF.
767        let conn = db.connect()?;
768        let _ = conn.execute("PRAGMA wal_checkpoint;", ()).await;
769        drop(conn);
770        drop(db);
771        let bytes = std::fs::read(db_path).expect("read database file");
772        assert!(bytes.len() >= 72, "database file shorter than the header");
773        assert_eq!(
774            &bytes[68..72],
775            &0xFFFF_FFFFu32.to_be_bytes(),
776            "application_id = -1 must be encoded as 0xFFFFFFFF at offset 68"
777        );
778
779        Ok(())
780    }
781
782    /// Byte-level GeoPackage check: writing the GPKG magic via
783    /// `PRAGMA application_id = 1196444487` (0x47504B47) and checkpointing must
784    /// land the big-endian magic at file offset 68, and a `user_version` write
785    /// must land at offset 60 — the exact layout GeoPackage requires.
786    #[tokio::test]
787    async fn test_application_id_byte_level_on_disk() -> Result<()> {
788        const GPKG_MAGIC: u32 = 1196444487; // 0x47504B47, "GPKG".
789        const USER_VERSION: i32 = 10201; // arbitrary GeoPackage-style version.
790
791        let temp = TempDbPath::new("app_id_bytes");
792        let db_path = temp.as_str();
793
794        {
795            let db = Builder::new_local(db_path).build().await?;
796            let conn = db.connect()?;
797            // A table forces real page allocation so the file is a valid db.
798            conn.execute("CREATE TABLE gpkg_contents (id INTEGER PRIMARY KEY);", ())
799                .await?;
800            conn.execute(&format!("PRAGMA application_id = {GPKG_MAGIC};"), ())
801                .await?;
802            conn.execute(&format!("PRAGMA user_version = {USER_VERSION};"), ())
803                .await?;
804            // Checkpoint so the WAL's page-1 frame is copied into the main
805            // database file: in WAL mode the header bytes only reach the main
806            // file after a checkpoint (this is the same requirement SQLite has
807            // for a byte-valid GeoPackage on disk).
808            let _ = conn.execute("PRAGMA wal_checkpoint;", ()).await;
809        }
810
811        let bytes = std::fs::read(db_path).expect("read database file");
812        assert!(
813            bytes.len() >= 72,
814            "database file is shorter than the 100-byte header"
815        );
816
817        // application_id at offset [68..72], big-endian == 0x47504B47.
818        assert_eq!(
819            &bytes[68..72],
820            &GPKG_MAGIC.to_be_bytes(),
821            "GPKG magic must be stored big-endian at file offset 68"
822        );
823        assert_eq!(
824            u32::from_be_bytes([bytes[68], bytes[69], bytes[70], bytes[71]]),
825            0x4750_4B47,
826            "application_id bytes must decode to 0x47504B47"
827        );
828
829        // user_version at offset [60..64], big-endian.
830        assert_eq!(
831            &bytes[60..64],
832            &USER_VERSION.to_be_bytes(),
833            "user_version must be stored big-endian at file offset 60"
834        );
835
836        // The value is also readable through PRAGMA after reopen.
837        let db = Builder::new_local(db_path).build().await?;
838        let conn = db.connect()?;
839        assert_eq!(
840            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
841            GPKG_MAGIC as i64
842        );
843        assert_eq!(
844            query_scalar_i64(&conn, "PRAGMA user_version;").await?,
845            USER_VERSION as i64
846        );
847
848        Ok(())
849    }
850
851    // ------------------------------------------------------------------
852    // A2: INSERT OR IGNORE
853    // ------------------------------------------------------------------
854
855    #[tokio::test]
856    async fn test_insert_or_ignore_rowid_conflict_skipped() -> Result<()> {
857        let db = Builder::new_local(":memory:").build().await?;
858        let conn = db.connect()?;
859        conn.execute(
860            "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
861            (),
862        )
863        .await?;
864        conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
865            .await?;
866
867        // Conflicting rowid is silently ignored, not an error.
868        conn.execute("INSERT OR IGNORE INTO t (id, name) VALUES (1, 'Bob');", ())
869            .await?;
870
871        // Original row is untouched.
872        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
873        let mut rows = conn
874            .query("SELECT name FROM t WHERE id = 1;", ())
875            .await?;
876        let row = rows.next().await?.expect("row");
877        assert_eq!(row.get_value(0)?, Value::Text("Alice".to_string()));
878
879        Ok(())
880    }
881
882    #[tokio::test]
883    async fn test_insert_or_ignore_multi_row_other_rows_land() -> Result<()> {
884        let db = Builder::new_local(":memory:").build().await?;
885        let conn = db.connect()?;
886        conn.execute(
887            "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
888            (),
889        )
890        .await?;
891        conn.execute("INSERT INTO t (id, name) VALUES (2, 'Two');", ())
892            .await?;
893
894        // Multi-row INSERT OR IGNORE: row id=2 conflicts and is skipped, but ids
895        // 1 and 3 must still land.
896        conn.execute(
897            "INSERT OR IGNORE INTO t (id, name) VALUES (1, 'One'), (2, 'Dup'), (3, 'Three');",
898            (),
899        )
900        .await?;
901
902        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 3);
903        // The conflicting row keeps its original value.
904        let mut rows = conn
905            .query("SELECT name FROM t WHERE id = 2;", ())
906            .await?;
907        assert_eq!(
908            rows.next().await?.expect("row").get_value(0)?,
909            Value::Text("Two".to_string())
910        );
911        // The non-conflicting rows are present.
912        let mut rows = conn
913            .query("SELECT id FROM t ORDER BY id;", ())
914            .await?;
915        assert_eq!(
916            rows.next().await?.expect("row").get_value(0)?,
917            Value::Integer(1)
918        );
919        assert_eq!(
920            rows.next().await?.expect("row").get_value(0)?,
921            Value::Integer(2)
922        );
923        assert_eq!(
924            rows.next().await?.expect("row").get_value(0)?,
925            Value::Integer(3)
926        );
927
928        Ok(())
929    }
930
931    #[cfg(feature = "index_experimental")]
932    #[tokio::test]
933    async fn test_insert_or_ignore_unique_index_conflict_skipped() -> Result<()> {
934        let db = Builder::new_local(":memory:").build().await?;
935        let conn = db.connect()?;
936        conn.execute(
937            "CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT);",
938            (),
939        )
940        .await?;
941        conn.execute("CREATE UNIQUE INDEX idx_email ON t (email);", ())
942            .await?;
943        conn.execute(
944            "INSERT INTO t (id, email) VALUES (1, 'a@example.com');",
945            (),
946        )
947        .await?;
948
949        // Different rowid but conflicting unique-index value -> skipped, no error
950        // and crucially no partial index/table state.
951        conn.execute(
952            "INSERT OR IGNORE INTO t (id, email) VALUES (2, 'a@example.com');",
953            (),
954        )
955        .await?;
956
957        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
958        // Row id=2 must NOT exist.
959        let mut rows = conn.query("SELECT id FROM t ORDER BY id;", ()).await?;
960        assert_eq!(
961            rows.next().await?.expect("row").get_value(0)?,
962            Value::Integer(1)
963        );
964        assert!(rows.next().await?.is_none());
965
966        Ok(())
967    }
968
969    // ------------------------------------------------------------------
970    // A3: INSERT OR REPLACE
971    // ------------------------------------------------------------------
972
973    #[tokio::test]
974    async fn test_insert_or_replace_rowid_conflict_replaces() -> Result<()> {
975        let db = Builder::new_local(":memory:").build().await?;
976        let conn = db.connect()?;
977        conn.execute(
978            "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
979            (),
980        )
981        .await?;
982        conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
983            .await?;
984
985        // Same rowid -> old row replaced by new one.
986        conn.execute(
987            "INSERT OR REPLACE INTO t (id, name) VALUES (1, 'Bob');",
988            (),
989        )
990        .await?;
991
992        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
993        let mut rows = conn.query("SELECT name FROM t WHERE id = 1;", ()).await?;
994        assert_eq!(
995            rows.next().await?.expect("row").get_value(0)?,
996            Value::Text("Bob".to_string())
997        );
998
999        Ok(())
1000    }
1001
1002    #[tokio::test]
1003    async fn test_insert_or_replace_multi_row_conflict_with_prior_row() -> Result<()> {
1004        let db = Builder::new_local(":memory:").build().await?;
1005        let conn = db.connect()?;
1006        conn.execute(
1007            "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
1008            (),
1009        )
1010        .await?;
1011
1012        // Row N (id=1, 'Second') conflicts with just-inserted row N-1 (id=1,
1013        // 'First') within the same multi-row statement -> the later one wins.
1014        conn.execute(
1015            "INSERT OR REPLACE INTO t (id, name) VALUES (1, 'First'), (1, 'Second'), (2, 'Other');",
1016            (),
1017        )
1018        .await?;
1019
1020        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 2);
1021        let mut rows = conn.query("SELECT name FROM t WHERE id = 1;", ()).await?;
1022        assert_eq!(
1023            rows.next().await?.expect("row").get_value(0)?,
1024            Value::Text("Second".to_string())
1025        );
1026
1027        Ok(())
1028    }
1029
1030    #[cfg(feature = "index_experimental")]
1031    #[tokio::test]
1032    async fn test_insert_or_replace_single_unique_index_conflict() -> Result<()> {
1033        let db = Builder::new_local(":memory:").build().await?;
1034        let conn = db.connect()?;
1035        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT);", ())
1036            .await?;
1037        conn.execute("CREATE UNIQUE INDEX idx_email ON t (email);", ())
1038            .await?;
1039        conn.execute(
1040            "INSERT INTO t (id, email) VALUES (1, 'a@example.com');",
1041            (),
1042        )
1043        .await?;
1044
1045        // New rowid (2) but conflicting unique-index value -> the victim (id=1)
1046        // is deleted and replaced by the new row (id=2).
1047        conn.execute(
1048            "INSERT OR REPLACE INTO t (id, email) VALUES (2, 'a@example.com');",
1049            (),
1050        )
1051        .await?;
1052
1053        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1054        // Only id=2 remains, and the unique index still resolves it.
1055        let mut rows = conn
1056            .query("SELECT id FROM t WHERE email = 'a@example.com';", ())
1057            .await?;
1058        assert_eq!(
1059            rows.next().await?.expect("row").get_value(0)?,
1060            Value::Integer(2)
1061        );
1062        assert!(rows.next().await?.is_none());
1063
1064        Ok(())
1065    }
1066
1067    #[cfg(feature = "index_experimental")]
1068    #[tokio::test]
1069    async fn test_insert_or_replace_multiple_unique_indexes_different_victims() -> Result<()> {
1070        // SQLite OR REPLACE semantics: a new row that conflicts on MULTIPLE
1071        // unique indexes pointing at DIFFERENT existing rows must delete EVERY
1072        // victim, leaving exactly the new row.
1073        let db = Builder::new_local(":memory:").build().await?;
1074        let conn = db.connect()?;
1075        conn.execute(
1076            "CREATE TABLE t (id INTEGER PRIMARY KEY, a TEXT, b TEXT);",
1077            (),
1078        )
1079        .await?;
1080        conn.execute("CREATE UNIQUE INDEX idx_a ON t (a);", ())
1081            .await?;
1082        conn.execute("CREATE UNIQUE INDEX idx_b ON t (b);", ())
1083            .await?;
1084
1085        // Two distinct existing rows; the new row collides with row 1 on column a
1086        // and with row 2 on column b.
1087        conn.execute("INSERT INTO t (id, a, b) VALUES (1, 'A1', 'B1');", ())
1088            .await?;
1089        conn.execute("INSERT INTO t (id, a, b) VALUES (2, 'A2', 'B2');", ())
1090            .await?;
1091
1092        conn.execute(
1093            "INSERT OR REPLACE INTO t (id, a, b) VALUES (3, 'A1', 'B2');",
1094            (),
1095        )
1096        .await?;
1097
1098        // Both victims (id=1 and id=2) are gone; exactly the new row remains.
1099        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1100        let mut rows = conn.query("SELECT id, a, b FROM t;", ()).await?;
1101        let row = rows.next().await?.expect("row");
1102        assert_eq!(row.get_value(0)?, Value::Integer(3));
1103        assert_eq!(row.get_value(1)?, Value::Text("A1".to_string()));
1104        assert_eq!(row.get_value(2)?, Value::Text("B2".to_string()));
1105        assert!(rows.next().await?.is_none());
1106
1107        // Indexes resolve only the surviving row.
1108        assert_eq!(
1109            query_scalar_i64(&conn, "SELECT id FROM t WHERE a = 'A1';").await?,
1110            3
1111        );
1112        assert_eq!(
1113            query_scalar_i64(&conn, "SELECT id FROM t WHERE b = 'B2';").await?,
1114            3
1115        );
1116
1117        Ok(())
1118    }
1119
1120    // ------------------------------------------------------------------
1121    // Regression: plain INSERT conflict must still error (no Halt regression).
1122    // ------------------------------------------------------------------
1123
1124    #[tokio::test]
1125    async fn test_plain_insert_rowid_conflict_still_errors() -> Result<()> {
1126        let db = Builder::new_local(":memory:").build().await?;
1127        let conn = db.connect()?;
1128        conn.execute(
1129            "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
1130            (),
1131        )
1132        .await?;
1133        conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
1134            .await?;
1135
1136        // A plain INSERT (no OR clause) on a duplicate rowid must still fail.
1137        let result = conn
1138            .execute("INSERT INTO t (id, name) VALUES (1, 'Bob');", ())
1139            .await;
1140        assert!(
1141            result.is_err(),
1142            "plain INSERT on duplicate PRIMARY KEY must error, got Ok"
1143        );
1144
1145        // The original row is intact and no second row was written.
1146        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1147
1148        Ok(())
1149    }
1150}