Skip to main content

limbo/
lib.rs

1// UPSTREAM: vendored Limbo fork — allow upstream style
2#![allow(
3    rustdoc::bare_urls,
4    rustdoc::invalid_html_tags,
5    rustdoc::broken_intra_doc_links
6)]
7#![allow(
8    clippy::collapsible_match,
9    clippy::doc_overindented_list_items,
10    clippy::from_over_into
11)]
12
13pub mod params;
14pub mod value;
15
16pub use value::Value;
17
18pub use params::params_from_iter;
19
20use crate::params::*;
21use std::fmt::Debug;
22use std::num::NonZero;
23use std::sync::{Arc, Mutex};
24
/// Errors returned by this crate's public API.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// A value could not be converted for use in SQL; wraps the underlying
    /// cause. Not constructed anywhere in this file (presumably raised by the
    /// `params` conversions — confirm).
    #[error("SQL conversion failure: `{0}`")]
    ToSqlConversionFailure(BoxError),
    /// Locking one of the internal `Mutex`es failed; carries the lock error
    /// rendered as text.
    #[error("Mutex lock error: {0}")]
    MutexError(String),
    /// The engine (or a user callback) reported a failure; carries the
    /// failure rendered as text.
    #[error("SQL execution failure: `{0}`")]
    SqlExecutionFailure(String),
    /// The database schema changed after this statement was compiled (SQLITE_SCHEMA).
    /// Re-prepare the statement and retry.
    #[error("database schema has changed")]
    SchemaChanged,
}
38
39impl Error {
40    /// Returns `true` if this error signals that the database schema changed after
41    /// the statement was compiled.  Callers should re-prepare the statement
42    /// against the refreshed schema and retry.
43    pub fn is_schema_changed(&self) -> bool {
44        matches!(self, Error::SchemaChanged)
45    }
46}
47
48impl From<limbo_core::LimboError> for Error {
49    fn from(err: limbo_core::LimboError) -> Self {
50        match err {
51            limbo_core::LimboError::SchemaChanged => Error::SchemaChanged,
52            other => Error::SqlExecutionFailure(other.to_string()),
53        }
54    }
55}
56
/// Boxed, thread-safe error object; the payload type of
/// [`Error::ToSqlConversionFailure`].
pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Result alias whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
/// Builder for a [`Database`]; it only records the location to open.
pub struct Builder {
    // Location of the database. `build` treats the literal `":memory:"` as a
    // request for an in-memory database and anything else as a file path.
    path: String,
}
63
64impl Builder {
65    pub fn new_local(path: &str) -> Self {
66        Self {
67            path: path.to_string(),
68        }
69    }
70
71    #[allow(unused_variables, clippy::arc_with_non_send_sync)]
72    pub async fn build(self) -> Result<Database> {
73        match self.path.as_str() {
74            ":memory:" => {
75                let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::MemoryIO::new());
76                let db = limbo_core::Database::open_file(io, self.path.as_str(), false)?;
77                Ok(Database { inner: db })
78            }
79            path => {
80                let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new()?);
81                let db = limbo_core::Database::open_file(io, path, false)?;
82                Ok(Database { inner: db })
83            }
84        }
85    }
86}
87
/// Handle to an opened database.
///
/// Cloning is cheap: clones share the same `limbo_core::Database` through the
/// inner `Arc`.
#[derive(Clone)]
pub struct Database {
    inner: Arc<limbo_core::Database>,
}

// NOTE(review): these impls assert thread-safety on behalf of the wrapped
// engine type. Nothing in this file shows why that is sound, and the
// `#[allow(clippy::arc_with_non_send_sync)]` on `Builder::build` suggests the
// inner type is not `Send + Sync` by itself — confirm before relying on
// cross-thread use.
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
95
96impl Debug for Database {
97    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98        f.debug_struct("Database").finish()
99    }
100}
101
102impl Database {
103    pub fn connect(&self) -> Result<Connection> {
104        let conn = self.inner.connect()?;
105        #[allow(clippy::arc_with_non_send_sync)]
106        let connection = Connection {
107            inner: Arc::new(Mutex::new(conn)),
108        };
109        Ok(connection)
110    }
111}
112
113pub struct Connection {
114    inner: Arc<Mutex<Arc<limbo_core::Connection>>>,
115}
116
117impl Clone for Connection {
118    fn clone(&self) -> Self {
119        Self {
120            inner: Arc::clone(&self.inner),
121        }
122    }
123}
124
125unsafe impl Send for Connection {}
126unsafe impl Sync for Connection {}
127
128impl Connection {
129    pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
130        let mut stmt = self.prepare(sql).await?;
131        stmt.query(params).await
132    }
133
134    pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<u64> {
135        let mut stmt = self.prepare(sql).await?;
136        stmt.execute(params).await
137    }
138
139    pub async fn prepare(&self, sql: &str) -> Result<Statement> {
140        let conn = self
141            .inner
142            .lock()
143            .map_err(|e| Error::MutexError(e.to_string()))?;
144
145        let stmt = conn.prepare(sql)?;
146
147        #[allow(clippy::arc_with_non_send_sync)]
148        let statement = Statement {
149            inner: Arc::new(Mutex::new(stmt)),
150        };
151        Ok(statement)
152    }
153
154    /// Return the number of rows changed by the most recent DML statement on
155    /// this connection.  Mirrors `sqlite3_changes()` semantics: DDL statements
156    /// and `BEGIN`/`COMMIT`/`ROLLBACK` return 0.
157    pub fn changes(&self) -> Result<i64> {
158        let conn = self
159            .inner
160            .lock()
161            .map_err(|e| Error::MutexError(e.to_string()))?;
162        Ok(conn.changes())
163    }
164
165    pub fn pragma_query<F>(&self, pragma_name: &str, mut f: F) -> Result<()>
166    where
167        F: FnMut(&Row) -> limbo_core::Result<()>,
168    {
169        let conn = self
170            .inner
171            .lock()
172            .map_err(|e| Error::MutexError(e.to_string()))?;
173
174        let rows: Vec<Row> = conn
175            .pragma_query(pragma_name)
176            .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?
177            .iter()
178            .map(|row| row.iter().collect::<Row>())
179            .collect();
180
181        rows.iter().try_for_each(|row| {
182            f(row).map_err(|e| {
183                Error::SqlExecutionFailure(format!("Error executing user defined function: {}", e))
184            })
185        })?;
186        Ok(())
187    }
188}
189
190pub struct Statement {
191    inner: Arc<Mutex<limbo_core::Statement>>,
192}
193
194impl Clone for Statement {
195    fn clone(&self) -> Self {
196        Self {
197            inner: Arc::clone(&self.inner),
198        }
199    }
200}
201
202unsafe impl Send for Statement {}
203unsafe impl Sync for Statement {}
204
205impl Statement {
206    pub async fn query(&mut self, params: impl IntoParams) -> Result<Rows> {
207        let params = params.into_params()?;
208        match params {
209            params::Params::None => (),
210            params::Params::Positional(values) => {
211                for (i, value) in values.into_iter().enumerate() {
212                    let mut stmt = self
213                        .inner
214                        .lock()
215                        .map_err(|e| Error::MutexError(e.to_string()))?;
216                    if let Some(idx) = NonZero::new(i + 1) {
217                        stmt.bind_at(idx, value.into());
218                    }
219                }
220            }
221            params::Params::Named(_items) => todo!(),
222        }
223        #[allow(clippy::arc_with_non_send_sync)]
224        let rows = Rows {
225            inner: Arc::clone(&self.inner),
226        };
227        Ok(rows)
228    }
229
230    pub async fn execute(&mut self, params: impl IntoParams) -> Result<u64> {
231        {
232            // Reset the statement before executing
233            self.inner
234                .lock()
235                .map_err(|e| Error::MutexError(e.to_string()))?
236                .reset();
237        }
238        let params = params.into_params()?;
239        match params {
240            params::Params::None => (),
241            params::Params::Positional(values) => {
242                for (i, value) in values.into_iter().enumerate() {
243                    let mut stmt = self
244                        .inner
245                        .lock()
246                        .map_err(|e| Error::MutexError(e.to_string()))?;
247                    if let Some(idx) = NonZero::new(i + 1) {
248                        stmt.bind_at(idx, value.into());
249                    }
250                }
251            }
252            params::Params::Named(_items) => todo!(),
253        }
254        loop {
255            let mut stmt = self
256                .inner
257                .lock()
258                .map_err(|e| Error::MutexError(e.to_string()))?;
259            match stmt.step() {
260                Ok(limbo_core::StepResult::Row) => {
261                    // unexpected row during execution, error out.
262                    return Ok(2);
263                }
264                Ok(limbo_core::StepResult::Done) => {
265                    return Ok(0);
266                }
267                Ok(limbo_core::StepResult::IO) => {
268                    let _ = stmt.run_once();
269                    //return Ok(1);
270                }
271                Ok(limbo_core::StepResult::Busy) => {
272                    return Ok(4);
273                }
274                Ok(limbo_core::StepResult::Interrupt) => {
275                    return Ok(3);
276                }
277                Err(err) => {
278                    return Err(err.into());
279                }
280            }
281        }
282    }
283
284    pub fn columns(&self) -> Vec<Column> {
285        let Ok(stmt) = self.inner.lock() else {
286            return Vec::new();
287        };
288
289        let n = stmt.num_columns();
290
291        let mut cols = Vec::with_capacity(n);
292
293        for i in 0..n {
294            let name = stmt.get_column_name(i).into_owned();
295            let decl_type = stmt.get_column_decl_type(i).map(|s| s.into_owned());
296            cols.push(Column { name, decl_type });
297        }
298
299        cols
300    }
301}
302
/// Description of one result column of a statement.
pub struct Column {
    name: String,
    decl_type: Option<String>,
}

impl Column {
    /// The column's name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// The column's declared type, or `None` when there is none.
    pub fn decl_type(&self) -> Option<&str> {
        let declared = self.decl_type.as_ref()?;
        Some(declared.as_str())
    }
}
317
/// Fallible conversion of a Rust value into this crate's [`Value`].
///
/// No implementations are visible in this file (presumably they live in the
/// `value`/`params` modules — confirm).
pub trait IntoValue {
    /// Consumes `self` and produces the corresponding [`Value`], or an error
    /// if the conversion is not possible.
    fn into_value(self) -> Result<Value>;
}
321
/// Parameters to bind to a statement.
///
/// NOTE(review): the statement code in this file matches on
/// `params::Params` (from the `params` module), not on this enum, so this
/// looks like a parallel definition — confirm whether it is still needed.
#[derive(Debug, Clone)]
pub enum Params {
    /// No parameters.
    None,
    /// Values listed in order.
    Positional(Vec<Value>),
    /// `(name, value)` pairs.
    Named(Vec<(String, Value)>),
}
/// Placeholder transaction type; it has no fields and no methods in this file.
pub struct Transaction {}
329
330pub struct Rows {
331    inner: Arc<Mutex<limbo_core::Statement>>,
332}
333
334impl Clone for Rows {
335    fn clone(&self) -> Self {
336        Self {
337            inner: Arc::clone(&self.inner),
338        }
339    }
340}
341
342unsafe impl Send for Rows {}
343unsafe impl Sync for Rows {}
344
345impl Rows {
346    pub async fn next(&mut self) -> Result<Option<Row>> {
347        loop {
348            let mut stmt = self
349                .inner
350                .lock()
351                .map_err(|e| Error::MutexError(e.to_string()))?;
352            match stmt.step() {
353                Ok(limbo_core::StepResult::Row) => {
354                    let row = stmt.row().ok_or_else(|| {
355                        Error::SqlExecutionFailure(
356                            "row unavailable after Row step result".to_string(),
357                        )
358                    })?;
359                    return Ok(Some(Row {
360                        values: row.get_values().map(|v| v.to_owned()).collect(),
361                    }));
362                }
363                Ok(limbo_core::StepResult::Done) => return Ok(None),
364                Ok(limbo_core::StepResult::IO) => {
365                    if let Err(e) = stmt.run_once() {
366                        return Err(e.into());
367                    }
368                    continue;
369                }
370                Ok(limbo_core::StepResult::Busy) => return Ok(None),
371                Ok(limbo_core::StepResult::Interrupt) => return Ok(None),
372                _ => return Ok(None),
373            }
374        }
375    }
376}
377
/// One result row, holding owned copies of the engine values in column order.
#[derive(Debug)]
pub struct Row {
    values: Vec<limbo_core::Value>,
}

// NOTE(review): these impls assert that the owned engine values may cross
// threads; nothing in this file shows why `limbo_core::Value` would not be
// `Send + Sync` by itself, or why overriding that is sound — confirm.
unsafe impl Send for Row {}
unsafe impl Sync for Row {}
385
386impl Row {
387    pub fn get_value(&self, index: usize) -> Result<Value> {
388        let value = &self.values[index];
389        match value {
390            limbo_core::Value::Integer(i) => Ok(Value::Integer(*i)),
391            limbo_core::Value::Null => Ok(Value::Null),
392            limbo_core::Value::Float(f) => Ok(Value::Real(*f)),
393            limbo_core::Value::Text(text) => Ok(Value::Text(text.to_string())),
394            limbo_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())),
395        }
396    }
397
398    pub fn column_count(&self) -> usize {
399        self.values.len()
400    }
401}
402
403impl<'a> FromIterator<&'a limbo_core::Value> for Row {
404    fn from_iter<T: IntoIterator<Item = &'a limbo_core::Value>>(iter: T) -> Self {
405        let values = iter
406            .into_iter()
407            .map(|v| match v {
408                limbo_core::Value::Integer(i) => limbo_core::Value::Integer(*i),
409                limbo_core::Value::Null => limbo_core::Value::Null,
410                limbo_core::Value::Float(f) => limbo_core::Value::Float(*f),
411                limbo_core::Value::Text(s) => limbo_core::Value::Text(s.clone()),
412                limbo_core::Value::Blob(b) => limbo_core::Value::Blob(b.clone()),
413            })
414            .collect();
415
416        Row { values }
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use super::*;
423    use tempfile::NamedTempFile;
424
425    #[tokio::test]
426    async fn test_database_persistence() -> Result<()> {
427        let temp_file = NamedTempFile::new().unwrap();
428        let db_path = temp_file.path().to_str().unwrap();
429
430        // First, create the database, a table, and insert some data
431        {
432            let db = Builder::new_local(db_path).build().await?;
433            let conn = db.connect()?;
434            conn.execute(
435                "CREATE TABLE test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
436                (),
437            )
438            .await?;
439            conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
440                .await?;
441            conn.execute("INSERT INTO test_persistence (name) VALUES ('Bob');", ())
442                .await?;
443        } // db and conn are dropped here, simulating closing
444
445        // Now, re-open the database and check if the data is still there
446        let db = Builder::new_local(db_path).build().await?;
447        let conn = db.connect()?;
448
449        let mut rows = conn
450            .query("SELECT name FROM test_persistence ORDER BY id;", ())
451            .await?;
452
453        let row1 = rows.next().await?.expect("Expected first row");
454        assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string()));
455
456        let row2 = rows.next().await?.expect("Expected second row");
457        assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string()));
458
459        assert!(rows.next().await?.is_none(), "Expected no more rows");
460
461        Ok(())
462    }
463
464    #[tokio::test]
465    async fn test_database_persistence_many_frames() -> Result<()> {
466        let temp_file = NamedTempFile::new().unwrap();
467        let db_path = temp_file.path().to_str().unwrap();
468
469        const NUM_INSERTS: usize = 100;
470        const TARGET_STRING_LEN: usize = 1024; // 1KB
471
472        let mut original_data = Vec::with_capacity(NUM_INSERTS);
473        for i in 0..NUM_INSERTS {
474            let prefix = format!("test_string_{:04}_", i);
475            let padding_len = TARGET_STRING_LEN.saturating_sub(prefix.len());
476            let padding: String = "A".repeat(padding_len);
477            original_data.push(format!("{}{}", prefix, padding));
478        }
479
480        // First, create the database, a table, and insert many large strings
481        {
482            let db = Builder::new_local(db_path).build().await?;
483            let conn = db.connect()?;
484            conn.execute(
485                "CREATE TABLE test_large_persistence (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT NOT NULL);",
486                (),
487            )
488            .await?;
489
490            for data_val in &original_data {
491                conn.execute(
492                    "INSERT INTO test_large_persistence (data) VALUES (?);",
493                    params::Params::Positional(vec![Value::Text(data_val.clone())]),
494                )
495                .await?;
496            }
497        } // db and conn are dropped here, simulating closing
498
499        // Now, re-open the database and check if the data is still there
500        let db = Builder::new_local(db_path).build().await?;
501        let conn = db.connect()?;
502
503        let mut rows = conn
504            .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
505            .await?;
506
507        for (i, expected) in original_data.iter().enumerate().take(NUM_INSERTS) {
508            let row = rows
509                .next()
510                .await?
511                .unwrap_or_else(|| panic!("Expected row {} but found None", i));
512            assert_eq!(
513                row.get_value(0)?,
514                Value::Text(expected.clone()),
515                "Mismatch in retrieved data for row {}",
516                i
517            );
518        }
519
520        assert!(
521            rows.next().await?.is_none(),
522            "Expected no more rows after retrieving all inserted data"
523        );
524
525        // Delete the WAL file only and try to re-open and query
526        let wal_path = format!("{}-wal", db_path);
527        std::fs::remove_file(&wal_path)
528            .map_err(|e| eprintln!("Warning: Failed to delete WAL file for test: {}", e))
529            .unwrap();
530
531        // Re-open the database after deleting the WAL and assert the data is still
532        // fully intact. The clean close above (dropping the connection) triggers a
533        // checkpoint-on-close, which writes all WAL frames into the main `.db` file
534        // and truncates the WAL. As a result the `-wal` file is no longer
535        // load-bearing after a clean close: deleting it must NOT lose any data.
536        let db_after_wal_delete = Builder::new_local(db_path).build().await?;
537        let conn_after_wal_delete = db_after_wal_delete.connect()?;
538
539        let mut rows_after_wal_delete = conn_after_wal_delete
540            .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
541            .await?;
542
543        for (i, expected) in original_data.iter().enumerate().take(NUM_INSERTS) {
544            let row = rows_after_wal_delete.next().await?.unwrap_or_else(|| {
545                panic!(
546                    "Expected row {} after WAL deletion but found None; \
547                         checkpoint-on-close should have persisted it into the main DB",
548                    i
549                )
550            });
551            assert_eq!(
552                row.get_value(0)?,
553                Value::Text(expected.clone()),
554                "Mismatch in retrieved data for row {} after WAL deletion",
555                i
556            );
557        }
558
559        assert!(
560            rows_after_wal_delete.next().await?.is_none(),
561            "Expected no more rows after WAL deletion once all checkpointed data was retrieved"
562        );
563
564        Ok(())
565    }
566
567    #[tokio::test]
568    async fn test_database_persistence_write_one_frame_many_times() -> Result<()> {
569        let temp_file = NamedTempFile::new().unwrap();
570        let db_path = temp_file.path().to_str().unwrap();
571
572        for i in 0..100 {
573            {
574                let db = Builder::new_local(db_path).build().await?;
575                let conn = db.connect()?;
576
577                conn.execute("CREATE TABLE IF NOT EXISTS test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);", ()).await?;
578                conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
579                    .await?;
580            }
581            {
582                let db = Builder::new_local(db_path).build().await?;
583                let conn = db.connect()?;
584
585                let mut rows_iter = conn
586                    .query("SELECT count(*) FROM test_persistence;", ())
587                    .await?;
588                let rows = rows_iter.next().await?.unwrap();
589                assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1));
590                assert!(rows_iter.next().await?.is_none());
591            }
592        }
593
594        Ok(())
595    }
596
597    // ------------------------------------------------------------------
598    // A1: PRAGMA application_id
599    // ------------------------------------------------------------------
600
601    /// Read a single scalar integer value produced by a query (e.g. a PRAGMA).
602    async fn query_scalar_i64(conn: &Connection, sql: &str) -> Result<i64> {
603        let mut rows = conn.query(sql, ()).await?;
604        let row = rows
605            .next()
606            .await?
607            .unwrap_or_else(|| panic!("expected a row from `{sql}`"));
608        match row.get_value(0)? {
609            Value::Integer(i) => Ok(i),
610            other => panic!("expected Integer from `{sql}`, got {other:?}"),
611        }
612    }
613
614    #[tokio::test]
615    async fn test_application_id_write_read_round_trip() -> Result<()> {
616        let db = Builder::new_local(":memory:").build().await?;
617        let conn = db.connect()?;
618
619        // Default is 0.
620        assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, 0);
621
622        // GPKG magic (0x47504B47 = 1196444487), a large positive identifier.
623        conn.execute("PRAGMA application_id = 1196444487;", ())
624            .await?;
625        assert_eq!(
626            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
627            1196444487
628        );
629
630        // Overwrite with another value.
631        conn.execute("PRAGMA application_id = 42;", ()).await?;
632        assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, 42);
633
634        Ok(())
635    }
636
637    #[tokio::test]
638    async fn test_application_id_negative_round_trip() -> Result<()> {
639        // SQLite presents application_id as a SIGNED 32-bit integer, so -1 must
640        // round-trip as -1 (not 4294967295).
641        let db = Builder::new_local(":memory:").build().await?;
642        let conn = db.connect()?;
643
644        conn.execute("PRAGMA application_id = -1;", ()).await?;
645        assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, -1);
646
647        conn.execute("PRAGMA application_id = -2147483648;", ())
648            .await?;
649        assert_eq!(
650            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
651            -2147483648
652        );
653
654        Ok(())
655    }
656
657    /// Build a unique, file-backed database path under the OS temp directory.
658    ///
659    /// Uses [`std::env::temp_dir`] plus the process id and an atomically
660    /// incrementing counter so concurrently-running tests never collide, and
661    /// cleans up the database file together with its `-wal` sidecar on drop.
662    struct TempDbPath {
663        path: std::path::PathBuf,
664    }
665
666    impl TempDbPath {
667        fn new(tag: &str) -> Self {
668            use std::sync::atomic::{AtomicU64, Ordering};
669            static COUNTER: AtomicU64 = AtomicU64::new(0);
670            let n = COUNTER.fetch_add(1, Ordering::Relaxed);
671            let mut path = std::env::temp_dir();
672            path.push(format!(
673                "oxisqlite_dur_{}_{}_{}.db",
674                tag,
675                std::process::id(),
676                n
677            ));
678            // Ensure a clean slate even if a previous run left files behind.
679            let _ = std::fs::remove_file(&path);
680            let _ = std::fs::remove_file(format!("{}-wal", path.display()));
681            Self { path }
682        }
683
684        fn as_str(&self) -> &str {
685            self.path
686                .to_str()
687                .expect("temp db path is valid UTF-8 on the test platforms")
688        }
689    }
690
691    impl Drop for TempDbPath {
692        fn drop(&mut self) {
693            let _ = std::fs::remove_file(&self.path);
694            let _ = std::fs::remove_file(format!("{}-wal", self.path.display()));
695        }
696    }
697
698    /// `application_id` survives a real close/reopen cycle for a file-backed
699    /// database (regression test for the header-cookie durability bug: the
700    /// in-memory header was previously re-read straight from the main DB file
701    /// at open time, bypassing the WAL, so a cookie that lived only in the WAL
702    /// reset to 0 on reopen).
703    #[tokio::test]
704    async fn test_application_id_persistence() -> Result<()> {
705        let temp = TempDbPath::new("app_id_persist");
706        let db_path = temp.as_str();
707
708        {
709            let db = Builder::new_local(db_path).build().await?;
710            let conn = db.connect()?;
711            conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);", ())
712                .await?;
713            conn.execute("PRAGMA application_id = -12345;", ()).await?;
714
715            // Within the same open database the value (and its sign) is retained.
716            assert_eq!(
717                query_scalar_i64(&conn, "PRAGMA application_id;").await?,
718                -12345
719            );
720        } // connection + database dropped here, simulating a close.
721
722        // Reopen and assert the value is durably restored from the WAL.
723        let db = Builder::new_local(db_path).build().await?;
724        let conn = db.connect()?;
725        assert_eq!(
726            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
727            -12345,
728            "application_id must survive close/reopen"
729        );
730
731        Ok(())
732    }
733
734    /// `application_id` set to a large positive identifier round-trips across a
735    /// close/reopen for a file-backed database.
736    #[tokio::test]
737    async fn test_application_id_durable_reopen() -> Result<()> {
738        let temp = TempDbPath::new("app_id_reopen");
739        let db_path = temp.as_str();
740
741        {
742            let db = Builder::new_local(db_path).build().await?;
743            let conn = db.connect()?;
744            conn.execute("PRAGMA application_id = 12345;", ()).await?;
745            assert_eq!(
746                query_scalar_i64(&conn, "PRAGMA application_id;").await?,
747                12345
748            );
749        }
750
751        let db = Builder::new_local(db_path).build().await?;
752        let conn = db.connect()?;
753        assert_eq!(
754            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
755            12345,
756            "application_id = 12345 must survive close/reopen"
757        );
758
759        Ok(())
760    }
761
762    /// `user_version` (the canonical cookie mirror of `application_id`) survives
763    /// a close/reopen identically.
764    #[tokio::test]
765    async fn test_user_version_durable_reopen() -> Result<()> {
766        let temp = TempDbPath::new("user_version_reopen");
767        let db_path = temp.as_str();
768
769        {
770            let db = Builder::new_local(db_path).build().await?;
771            let conn = db.connect()?;
772            conn.execute("PRAGMA user_version = 12345;", ()).await?;
773            assert_eq!(
774                query_scalar_i64(&conn, "PRAGMA user_version;").await?,
775                12345
776            );
777        }
778
779        let db = Builder::new_local(db_path).build().await?;
780        let conn = db.connect()?;
781        assert_eq!(
782            query_scalar_i64(&conn, "PRAGMA user_version;").await?,
783            12345,
784            "user_version = 12345 must survive close/reopen"
785        );
786
787        Ok(())
788    }
789
    /// A negative `application_id` (e.g. -1) is stored on disk as 0xFFFFFFFF but
    /// must read back as the signed value -1 after a durable close/reopen, just
    /// like SQLite.
    ///
    /// The test has three phases: write and verify within one open database,
    /// reopen and verify the signed value, then checkpoint and inspect the raw
    /// header bytes of the database file.
    #[tokio::test]
    async fn test_application_id_negative_durable_reopen() -> Result<()> {
        let temp = TempDbPath::new("app_id_negative_reopen");
        let db_path = temp.as_str();

        // Phase 1: write -1 and confirm it within the same open database; the
        // handles are dropped at the end of the block.
        {
            let db = Builder::new_local(db_path).build().await?;
            let conn = db.connect()?;
            conn.execute("PRAGMA application_id = -1;", ()).await?;
            assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, -1);
        }

        // Phase 2: reopen and confirm the signed value survived.
        let db = Builder::new_local(db_path).build().await?;
        let conn = db.connect()?;
        assert_eq!(
            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
            -1,
            "application_id = -1 must survive close/reopen as the signed value -1"
        );

        // And the on-disk bytes (after a checkpoint flushes the WAL into the
        // main file) must be the 32-bit two's-complement big-endian 0xFFFFFFFF.
        //
        // NOTE(review): this `let conn` shadows the connection from phase 2
        // without dropping it, so that earlier connection stays alive until the
        // function returns; only the second connection and `db` are dropped
        // below — confirm this is intended.
        let conn = db.connect()?;
        // Best-effort: the checkpoint result is deliberately ignored; the byte
        // assertions below are what decide the outcome.
        let _ = conn.execute("PRAGMA wal_checkpoint;", ()).await;
        drop(conn);
        drop(db);
        let bytes = std::fs::read(db_path).expect("read database file");
        // Guard the slice below: bytes 68..72 must exist.
        assert!(bytes.len() >= 72, "database file shorter than the header");
        assert_eq!(
            &bytes[68..72],
            &0xFFFF_FFFFu32.to_be_bytes(),
            "application_id = -1 must be encoded as 0xFFFFFFFF at offset 68"
        );

        Ok(())
    }
829
830    /// Byte-level GeoPackage check: writing the GPKG magic via
831    /// `PRAGMA application_id = 1196444487` (0x47504B47) and checkpointing must
832    /// land the big-endian magic at file offset 68, and a `user_version` write
833    /// must land at offset 60 — the exact layout GeoPackage requires.
834    #[tokio::test]
835    async fn test_application_id_byte_level_on_disk() -> Result<()> {
836        const GPKG_MAGIC: u32 = 1196444487; // 0x47504B47, "GPKG".
837        const USER_VERSION: i32 = 10201; // arbitrary GeoPackage-style version.
838
839        let temp = TempDbPath::new("app_id_bytes");
840        let db_path = temp.as_str();
841
842        {
843            let db = Builder::new_local(db_path).build().await?;
844            let conn = db.connect()?;
845            // A table forces real page allocation so the file is a valid db.
846            conn.execute("CREATE TABLE gpkg_contents (id INTEGER PRIMARY KEY);", ())
847                .await?;
848            conn.execute(&format!("PRAGMA application_id = {GPKG_MAGIC};"), ())
849                .await?;
850            conn.execute(&format!("PRAGMA user_version = {USER_VERSION};"), ())
851                .await?;
852            // Checkpoint so the WAL's page-1 frame is copied into the main
853            // database file: in WAL mode the header bytes only reach the main
854            // file after a checkpoint (this is the same requirement SQLite has
855            // for a byte-valid GeoPackage on disk).
856            let _ = conn.execute("PRAGMA wal_checkpoint;", ()).await;
857        }
858
859        let bytes = std::fs::read(db_path).expect("read database file");
860        assert!(
861            bytes.len() >= 72,
862            "database file is shorter than the 100-byte header"
863        );
864
865        // application_id at offset [68..72], big-endian == 0x47504B47.
866        assert_eq!(
867            &bytes[68..72],
868            &GPKG_MAGIC.to_be_bytes(),
869            "GPKG magic must be stored big-endian at file offset 68"
870        );
871        assert_eq!(
872            u32::from_be_bytes([bytes[68], bytes[69], bytes[70], bytes[71]]),
873            0x4750_4B47,
874            "application_id bytes must decode to 0x47504B47"
875        );
876
877        // user_version at offset [60..64], big-endian.
878        assert_eq!(
879            &bytes[60..64],
880            &USER_VERSION.to_be_bytes(),
881            "user_version must be stored big-endian at file offset 60"
882        );
883
884        // The value is also readable through PRAGMA after reopen.
885        let db = Builder::new_local(db_path).build().await?;
886        let conn = db.connect()?;
887        assert_eq!(
888            query_scalar_i64(&conn, "PRAGMA application_id;").await?,
889            GPKG_MAGIC as i64
890        );
891        assert_eq!(
892            query_scalar_i64(&conn, "PRAGMA user_version;").await?,
893            USER_VERSION as i64
894        );
895
896        Ok(())
897    }
898
899    // ------------------------------------------------------------------
900    // A2: INSERT OR IGNORE
901    // ------------------------------------------------------------------
902
903    #[tokio::test]
904    async fn test_insert_or_ignore_rowid_conflict_skipped() -> Result<()> {
905        let db = Builder::new_local(":memory:").build().await?;
906        let conn = db.connect()?;
907        conn.execute(
908            "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
909            (),
910        )
911        .await?;
912        conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
913            .await?;
914
915        // Conflicting rowid is silently ignored, not an error.
916        conn.execute("INSERT OR IGNORE INTO t (id, name) VALUES (1, 'Bob');", ())
917            .await?;
918
919        // Original row is untouched.
920        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
921        let mut rows = conn.query("SELECT name FROM t WHERE id = 1;", ()).await?;
922        let row = rows.next().await?.expect("row");
923        assert_eq!(row.get_value(0)?, Value::Text("Alice".to_string()));
924
925        Ok(())
926    }
927
928    #[tokio::test]
929    async fn test_insert_or_ignore_multi_row_other_rows_land() -> Result<()> {
930        let db = Builder::new_local(":memory:").build().await?;
931        let conn = db.connect()?;
932        conn.execute(
933            "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
934            (),
935        )
936        .await?;
937        conn.execute("INSERT INTO t (id, name) VALUES (2, 'Two');", ())
938            .await?;
939
940        // Multi-row INSERT OR IGNORE: row id=2 conflicts and is skipped, but ids
941        // 1 and 3 must still land.
942        conn.execute(
943            "INSERT OR IGNORE INTO t (id, name) VALUES (1, 'One'), (2, 'Dup'), (3, 'Three');",
944            (),
945        )
946        .await?;
947
948        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 3);
949        // The conflicting row keeps its original value.
950        let mut rows = conn.query("SELECT name FROM t WHERE id = 2;", ()).await?;
951        assert_eq!(
952            rows.next().await?.expect("row").get_value(0)?,
953            Value::Text("Two".to_string())
954        );
955        // The non-conflicting rows are present.
956        let mut rows = conn.query("SELECT id FROM t ORDER BY id;", ()).await?;
957        assert_eq!(
958            rows.next().await?.expect("row").get_value(0)?,
959            Value::Integer(1)
960        );
961        assert_eq!(
962            rows.next().await?.expect("row").get_value(0)?,
963            Value::Integer(2)
964        );
965        assert_eq!(
966            rows.next().await?.expect("row").get_value(0)?,
967            Value::Integer(3)
968        );
969
970        Ok(())
971    }
972
973    #[cfg(feature = "index_experimental")]
974    #[tokio::test]
975    async fn test_insert_or_ignore_unique_index_conflict_skipped() -> Result<()> {
976        let db = Builder::new_local(":memory:").build().await?;
977        let conn = db.connect()?;
978        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT);", ())
979            .await?;
980        conn.execute("CREATE UNIQUE INDEX idx_email ON t (email);", ())
981            .await?;
982        conn.execute("INSERT INTO t (id, email) VALUES (1, 'a@example.com');", ())
983            .await?;
984
985        // Different rowid but conflicting unique-index value -> skipped, no error
986        // and crucially no partial index/table state.
987        conn.execute(
988            "INSERT OR IGNORE INTO t (id, email) VALUES (2, 'a@example.com');",
989            (),
990        )
991        .await?;
992
993        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
994        // Row id=2 must NOT exist.
995        let mut rows = conn.query("SELECT id FROM t ORDER BY id;", ()).await?;
996        assert_eq!(
997            rows.next().await?.expect("row").get_value(0)?,
998            Value::Integer(1)
999        );
1000        assert!(rows.next().await?.is_none());
1001
1002        Ok(())
1003    }
1004
1005    // ------------------------------------------------------------------
1006    // A3: INSERT OR REPLACE
1007    // ------------------------------------------------------------------
1008
1009    #[tokio::test]
1010    async fn test_insert_or_replace_rowid_conflict_replaces() -> Result<()> {
1011        let db = Builder::new_local(":memory:").build().await?;
1012        let conn = db.connect()?;
1013        conn.execute(
1014            "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
1015            (),
1016        )
1017        .await?;
1018        conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
1019            .await?;
1020
1021        // Same rowid -> old row replaced by new one.
1022        conn.execute("INSERT OR REPLACE INTO t (id, name) VALUES (1, 'Bob');", ())
1023            .await?;
1024
1025        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1026        let mut rows = conn.query("SELECT name FROM t WHERE id = 1;", ()).await?;
1027        assert_eq!(
1028            rows.next().await?.expect("row").get_value(0)?,
1029            Value::Text("Bob".to_string())
1030        );
1031
1032        Ok(())
1033    }
1034
1035    #[tokio::test]
1036    async fn test_insert_or_replace_multi_row_conflict_with_prior_row() -> Result<()> {
1037        let db = Builder::new_local(":memory:").build().await?;
1038        let conn = db.connect()?;
1039        conn.execute(
1040            "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
1041            (),
1042        )
1043        .await?;
1044
1045        // Row N (id=1, 'Second') conflicts with just-inserted row N-1 (id=1,
1046        // 'First') within the same multi-row statement -> the later one wins.
1047        conn.execute(
1048            "INSERT OR REPLACE INTO t (id, name) VALUES (1, 'First'), (1, 'Second'), (2, 'Other');",
1049            (),
1050        )
1051        .await?;
1052
1053        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 2);
1054        let mut rows = conn.query("SELECT name FROM t WHERE id = 1;", ()).await?;
1055        assert_eq!(
1056            rows.next().await?.expect("row").get_value(0)?,
1057            Value::Text("Second".to_string())
1058        );
1059
1060        Ok(())
1061    }
1062
1063    #[cfg(feature = "index_experimental")]
1064    #[tokio::test]
1065    async fn test_insert_or_replace_single_unique_index_conflict() -> Result<()> {
1066        let db = Builder::new_local(":memory:").build().await?;
1067        let conn = db.connect()?;
1068        conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT);", ())
1069            .await?;
1070        conn.execute("CREATE UNIQUE INDEX idx_email ON t (email);", ())
1071            .await?;
1072        conn.execute("INSERT INTO t (id, email) VALUES (1, 'a@example.com');", ())
1073            .await?;
1074
1075        // New rowid (2) but conflicting unique-index value -> the victim (id=1)
1076        // is deleted and replaced by the new row (id=2).
1077        conn.execute(
1078            "INSERT OR REPLACE INTO t (id, email) VALUES (2, 'a@example.com');",
1079            (),
1080        )
1081        .await?;
1082
1083        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1084        // Only id=2 remains, and the unique index still resolves it.
1085        let mut rows = conn
1086            .query("SELECT id FROM t WHERE email = 'a@example.com';", ())
1087            .await?;
1088        assert_eq!(
1089            rows.next().await?.expect("row").get_value(0)?,
1090            Value::Integer(2)
1091        );
1092        assert!(rows.next().await?.is_none());
1093
1094        Ok(())
1095    }
1096
1097    #[cfg(feature = "index_experimental")]
1098    #[tokio::test]
1099    async fn test_insert_or_replace_multiple_unique_indexes_different_victims() -> Result<()> {
1100        // SQLite OR REPLACE semantics: a new row that conflicts on MULTIPLE
1101        // unique indexes pointing at DIFFERENT existing rows must delete EVERY
1102        // victim, leaving exactly the new row.
1103        let db = Builder::new_local(":memory:").build().await?;
1104        let conn = db.connect()?;
1105        conn.execute(
1106            "CREATE TABLE t (id INTEGER PRIMARY KEY, a TEXT, b TEXT);",
1107            (),
1108        )
1109        .await?;
1110        conn.execute("CREATE UNIQUE INDEX idx_a ON t (a);", ())
1111            .await?;
1112        conn.execute("CREATE UNIQUE INDEX idx_b ON t (b);", ())
1113            .await?;
1114
1115        // Two distinct existing rows; the new row collides with row 1 on column a
1116        // and with row 2 on column b.
1117        conn.execute("INSERT INTO t (id, a, b) VALUES (1, 'A1', 'B1');", ())
1118            .await?;
1119        conn.execute("INSERT INTO t (id, a, b) VALUES (2, 'A2', 'B2');", ())
1120            .await?;
1121
1122        conn.execute(
1123            "INSERT OR REPLACE INTO t (id, a, b) VALUES (3, 'A1', 'B2');",
1124            (),
1125        )
1126        .await?;
1127
1128        // Both victims (id=1 and id=2) are gone; exactly the new row remains.
1129        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1130        let mut rows = conn.query("SELECT id, a, b FROM t;", ()).await?;
1131        let row = rows.next().await?.expect("row");
1132        assert_eq!(row.get_value(0)?, Value::Integer(3));
1133        assert_eq!(row.get_value(1)?, Value::Text("A1".to_string()));
1134        assert_eq!(row.get_value(2)?, Value::Text("B2".to_string()));
1135        assert!(rows.next().await?.is_none());
1136
1137        // Indexes resolve only the surviving row.
1138        assert_eq!(
1139            query_scalar_i64(&conn, "SELECT id FROM t WHERE a = 'A1';").await?,
1140            3
1141        );
1142        assert_eq!(
1143            query_scalar_i64(&conn, "SELECT id FROM t WHERE b = 'B2';").await?,
1144            3
1145        );
1146
1147        Ok(())
1148    }
1149
1150    // ------------------------------------------------------------------
1151    // Regression: plain INSERT conflict must still error (no Halt regression).
1152    // ------------------------------------------------------------------
1153
1154    #[tokio::test]
1155    async fn test_plain_insert_rowid_conflict_still_errors() -> Result<()> {
1156        let db = Builder::new_local(":memory:").build().await?;
1157        let conn = db.connect()?;
1158        conn.execute(
1159            "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
1160            (),
1161        )
1162        .await?;
1163        conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
1164            .await?;
1165
1166        // A plain INSERT (no OR clause) on a duplicate rowid must still fail.
1167        let result = conn
1168            .execute("INSERT INTO t (id, name) VALUES (1, 'Bob');", ())
1169            .await;
1170        assert!(
1171            result.is_err(),
1172            "plain INSERT on duplicate PRIMARY KEY must error, got Ok"
1173        );
1174
1175        // The original row is intact and no second row was written.
1176        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1177
1178        Ok(())
1179    }
1180
1181    // ------------------------------------------------------------------
1182    // Regression: orphaned-WAL row duplication.
1183    //
1184    // A previous session leaves a populated `-wal` behind; the main `.db`
1185    // file is then deleted (or otherwise recreated empty) while the `-wal`
1186    // survives. On reopen the engine must NOT replay that orphaned WAL — doing
1187    // so resurrects the previous session's committed pages on top of the fresh
1188    // database, so every row count grows by the stale content on each reopen.
1189    //
1190    // This mirrors the downstream `oxiaero-ros2` rosbag2 roundtrip failure:
1191    // AUTOINCREMENT PRIMARY KEY + two NON-UNIQUE secondary indexes + a BLOB
1192    // column, two single-row INSERTs, then a `SELECT ... ORDER BY <indexed col>`
1193    // read-back that returned 4 (then 6, 8, ...) rows instead of 2 because the
1194    // index-driven scan walked the resurrected + new index entries.
1195    //
1196    // Index maintenance only runs under `index_experimental` (a plain INSERT
1197    // into an indexed table is rejected without it — exactly the feature the
1198    // `oxisql-sqlite-compat` consumer enables), so these tests are gated on it.
1199    // ------------------------------------------------------------------
1200
1201    /// Count rows returned by `sql` by draining the cursor (works for both
1202    /// table scans and index-driven scans like `ORDER BY <indexed column>`).
1203    #[cfg(feature = "index_experimental")]
1204    async fn query_row_count(conn: &Connection, sql: &str) -> Result<i64> {
1205        let mut rows = conn.query(sql, ()).await?;
1206        let mut n = 0i64;
1207        while rows.next().await?.is_some() {
1208            n += 1;
1209        }
1210        Ok(n)
1211    }
1212
1213    /// Apply the exact consumer schema (idempotent — `IF NOT EXISTS`) to `conn`.
1214    #[cfg(feature = "index_experimental")]
1215    async fn create_messages_schema(conn: &Connection) -> Result<()> {
1216        conn.execute(
1217            "CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT NOT NULL, timestamp INTEGER NOT NULL, data BLOB NOT NULL);",
1218            (),
1219        )
1220        .await?;
1221        conn.execute(
1222            "CREATE INDEX IF NOT EXISTS idx_messages_topic ON messages (topic);",
1223            (),
1224        )
1225        .await?;
1226        conn.execute(
1227            "CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages (timestamp);",
1228            (),
1229        )
1230        .await?;
1231        Ok(())
1232    }
1233
1234    /// Insert one message row via positional parameters (mirrors the consumer's
1235    /// `INSERT INTO messages (topic, timestamp, data) VALUES (?, ?, ?)`).
1236    #[cfg(feature = "index_experimental")]
1237    async fn insert_message(
1238        conn: &Connection,
1239        topic: &str,
1240        timestamp: i64,
1241        data: Vec<u8>,
1242    ) -> Result<()> {
1243        conn.execute(
1244            "INSERT INTO messages (topic, timestamp, data) VALUES (?, ?, ?);",
1245            params::Params::Positional(vec![
1246                Value::Text(topic.to_string()),
1247                Value::Integer(timestamp),
1248                Value::Blob(data),
1249            ]),
1250        )
1251        .await?;
1252        Ok(())
1253    }
1254
1255    /// The full downstream consumer reproduction: write 2 rows to a file-backed
1256    /// DB, delete ONLY the main `.db` (leaving the populated `-wal`, exactly what
1257    /// the consumer's test harness does between runs), recreate + write 2 rows
1258    /// again, then read back. Must be exactly 2 rows — both via a plain table
1259    /// scan AND via the consumer's `ORDER BY timestamp` (index-driven) read — and
1260    /// must not accumulate across repeated cycles.
1261    #[cfg(feature = "index_experimental")]
1262    #[tokio::test]
1263    async fn test_orphaned_wal_does_not_duplicate_rows_two_indexes() -> Result<()> {
1264        let dir = std::env::temp_dir().join(format!(
1265            "oxisqlite_orphan_wal_{}_{:?}",
1266            std::process::id(),
1267            std::thread::current().id()
1268        ));
1269        let _ = std::fs::remove_dir_all(&dir);
1270        std::fs::create_dir_all(&dir).expect("create temp dir");
1271        let db_path = dir.join("messages.db3");
1272        let p = db_path.to_str().expect("utf-8 path").to_string();
1273
1274        // One write-then-readback cycle that recreates the DB while leaving any
1275        // pre-existing `-wal` in place.
1276        async fn cycle(p: &str) -> Result<(i64, i64)> {
1277            // Recreate the main DB file but keep a stale `-wal` if present.
1278            let _ = std::fs::remove_file(p);
1279            {
1280                let db = Builder::new_local(p).build().await?;
1281                let conn = db.connect()?;
1282                create_messages_schema(&conn).await?;
1283                insert_message(&conn, "/imu", 1_000_000_000, vec![0xDE, 0xAD]).await?;
1284                insert_message(&conn, "/gps", 2_000_000_000, vec![0xBE, 0xEF]).await?;
1285            }
1286            let db = Builder::new_local(p).build().await?;
1287            let conn = db.connect()?;
1288            create_messages_schema(&conn).await?; // consumer re-runs schema on open
1289            let scan =
1290                query_row_count(&conn, "SELECT timestamp, topic, data FROM messages;").await?;
1291            let ordered = query_row_count(
1292                &conn,
1293                "SELECT timestamp, topic, data FROM messages ORDER BY timestamp;",
1294            )
1295            .await?;
1296            Ok((scan, ordered))
1297        }
1298
1299        // First cycle starts clean.
1300        let (scan1, ord1) = cycle(&p).await?;
1301        assert_eq!(scan1, 2, "cycle 1 table scan");
1302        assert_eq!(ord1, 2, "cycle 1 ORDER BY timestamp (index scan)");
1303
1304        // Subsequent cycles each find a populated stale `-wal`; the orphaned WAL
1305        // must be discarded, so counts stay at 2 (pre-fix they were 4, 6, ...).
1306        for c in 2..=3 {
1307            let (scan, ord) = cycle(&p).await?;
1308            assert_eq!(scan, 2, "cycle {c} table scan must stay 2");
1309            assert_eq!(ord, 2, "cycle {c} ORDER BY timestamp must stay 2");
1310        }
1311
1312        let _ = std::fs::remove_dir_all(&dir);
1313        Ok(())
1314    }
1315
1316    /// Consumer-equivalent roundtrip on a clean DB: AUTOINCREMENT + 2 non-unique
1317    /// indexes + BLOB, 2 writes -> exactly 2 rows, with correct column values via
1318    /// the index-driven `ORDER BY timestamp` read.
1319    #[cfg(feature = "index_experimental")]
1320    #[tokio::test]
1321    async fn test_two_non_unique_indexes_roundtrip_values() -> Result<()> {
1322        let db = Builder::new_local(":memory:").build().await?;
1323        let conn = db.connect()?;
1324        create_messages_schema(&conn).await?;
1325        insert_message(&conn, "/imu", 1_000_000_000, vec![0xDE, 0xAD]).await?;
1326        insert_message(&conn, "/gps", 2_000_000_000, vec![0xBE, 0xEF]).await?;
1327
1328        assert_eq!(
1329            query_scalar_i64(&conn, "SELECT count(*) FROM messages;").await?,
1330            2
1331        );
1332
1333        let mut rows = conn
1334            .query(
1335                "SELECT timestamp, topic, data FROM messages ORDER BY timestamp;",
1336                (),
1337            )
1338            .await?;
1339        let r0 = rows.next().await?.expect("row 0");
1340        assert_eq!(r0.get_value(0)?, Value::Integer(1_000_000_000));
1341        assert_eq!(r0.get_value(1)?, Value::Text("/imu".to_string()));
1342        assert_eq!(r0.get_value(2)?, Value::Blob(vec![0xDE, 0xAD]));
1343        let r1 = rows.next().await?.expect("row 1");
1344        assert_eq!(r1.get_value(0)?, Value::Integer(2_000_000_000));
1345        assert_eq!(r1.get_value(1)?, Value::Text("/gps".to_string()));
1346        assert_eq!(r1.get_value(2)?, Value::Blob(vec![0xBE, 0xEF]));
1347        assert!(rows.next().await?.is_none(), "exactly two rows");
1348
1349        Ok(())
1350    }
1351
1352    /// Index-count matrix: 0/1/2/3 NON-UNIQUE secondary indexes, two single-row
1353    /// INSERTs each -> exactly 2 rows, verified by BOTH a plain table scan and an
1354    /// index-driven `ORDER BY a` scan (which is what surfaces duplicate index
1355    /// entries).
1356    #[cfg(feature = "index_experimental")]
1357    #[tokio::test]
1358    async fn test_index_count_matrix_single_inserts() -> Result<()> {
1359        for n_idx in 0..=3usize {
1360            let db = Builder::new_local(":memory:").build().await?;
1361            let conn = db.connect()?;
1362            conn.execute(
1363                "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, a INTEGER, b INTEGER, c INTEGER);",
1364                (),
1365            )
1366            .await?;
1367            let cols = ["a", "b", "c"];
1368            for col in cols.iter().take(n_idx) {
1369                conn.execute(&format!("CREATE INDEX idx_{col} ON t ({col});"), ())
1370                    .await?;
1371            }
1372            conn.execute("INSERT INTO t (a, b, c) VALUES (1, 1, 1);", ())
1373                .await?;
1374            conn.execute("INSERT INTO t (a, b, c) VALUES (2, 2, 2);", ())
1375                .await?;
1376
1377            assert_eq!(
1378                query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?,
1379                2,
1380                "n_idx={n_idx}: count(*)"
1381            );
1382            assert_eq!(
1383                query_row_count(&conn, "SELECT a FROM t;").await?,
1384                2,
1385                "n_idx={n_idx}: table scan"
1386            );
1387            assert_eq!(
1388                query_row_count(&conn, "SELECT a FROM t ORDER BY a;").await?,
1389                2,
1390                "n_idx={n_idx}: ORDER BY a (index scan)"
1391            );
1392        }
1393        Ok(())
1394    }
1395
1396    /// Multi-row `INSERT ... VALUES (..),(..),(..)` into a table with two
1397    /// non-unique indexes -> exactly 3 rows (table scan and index scan agree).
1398    #[cfg(feature = "index_experimental")]
1399    #[tokio::test]
1400    async fn test_multi_row_insert_two_indexes() -> Result<()> {
1401        let db = Builder::new_local(":memory:").build().await?;
1402        let conn = db.connect()?;
1403        conn.execute(
1404            "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, a INTEGER, b INTEGER);",
1405            (),
1406        )
1407        .await?;
1408        conn.execute("CREATE INDEX idx_a ON t (a);", ()).await?;
1409        conn.execute("CREATE INDEX idx_b ON t (b);", ()).await?;
1410        conn.execute("INSERT INTO t (a, b) VALUES (1, 10), (2, 20), (3, 30);", ())
1411            .await?;
1412
1413        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 3);
1414        assert_eq!(query_row_count(&conn, "SELECT a FROM t;").await?, 3);
1415        assert_eq!(
1416            query_row_count(&conn, "SELECT a FROM t ORDER BY a;").await?,
1417            3
1418        );
1419        assert_eq!(
1420            query_row_count(&conn, "SELECT b FROM t ORDER BY b;").await?,
1421            3
1422        );
1423        Ok(())
1424    }
1425
1426    /// `INSERT OR IGNORE` into a table with two non-unique indexes: a plain
1427    /// (non-unique) secondary index never causes a conflict, so all rows land
1428    /// exactly once.
1429    #[cfg(feature = "index_experimental")]
1430    #[tokio::test]
1431    async fn test_insert_or_ignore_two_non_unique_indexes() -> Result<()> {
1432        let db = Builder::new_local(":memory:").build().await?;
1433        let conn = db.connect()?;
1434        conn.execute(
1435            "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, a INTEGER, b INTEGER);",
1436            (),
1437        )
1438        .await?;
1439        conn.execute("CREATE INDEX idx_a ON t (a);", ()).await?;
1440        conn.execute("CREATE INDEX idx_b ON t (b);", ()).await?;
1441
1442        // Duplicate (a,b) values are fine for non-unique indexes; nothing is
1443        // ignored and nothing is duplicated.
1444        conn.execute("INSERT OR IGNORE INTO t (a, b) VALUES (1, 10);", ())
1445            .await?;
1446        conn.execute("INSERT OR IGNORE INTO t (a, b) VALUES (1, 10);", ())
1447            .await?;
1448
1449        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 2);
1450        assert_eq!(
1451            query_row_count(&conn, "SELECT a FROM t ORDER BY a;").await?,
1452            2
1453        );
1454        Ok(())
1455    }
1456
1457    /// `INSERT OR REPLACE` into a table that has both a UNIQUE index and a
1458    /// secondary NON-UNIQUE index: replacing on the unique-index conflict must
1459    /// delete the victim's entry from EVERY index, leaving exactly one row and
1460    /// no duplicate index entries.
1461    #[cfg(feature = "index_experimental")]
1462    #[tokio::test]
1463    async fn test_insert_or_replace_unique_plus_non_unique_index() -> Result<()> {
1464        let db = Builder::new_local(":memory:").build().await?;
1465        let conn = db.connect()?;
1466        conn.execute(
1467            "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT, tag INTEGER);",
1468            (),
1469        )
1470        .await?;
1471        conn.execute("CREATE UNIQUE INDEX idx_email ON t (email);", ())
1472            .await?;
1473        conn.execute("CREATE INDEX idx_tag ON t (tag);", ()).await?;
1474        conn.execute(
1475            "INSERT INTO t (id, email, tag) VALUES (1, 'a@example.com', 7);",
1476            (),
1477        )
1478        .await?;
1479
1480        // New rowid, same unique email -> victim id=1 replaced by id=2.
1481        conn.execute(
1482            "INSERT OR REPLACE INTO t (id, email, tag) VALUES (2, 'a@example.com', 9);",
1483            (),
1484        )
1485        .await?;
1486
1487        assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1488        // The non-unique secondary index must resolve only the surviving row
1489        // (no orphaned victim entry left behind).
1490        assert_eq!(
1491            query_row_count(&conn, "SELECT id FROM t ORDER BY tag;").await?,
1492            1
1493        );
1494        assert_eq!(
1495            query_scalar_i64(&conn, "SELECT id FROM t WHERE email = 'a@example.com';").await?,
1496            2
1497        );
1498        assert_eq!(
1499            query_scalar_i64(&conn, "SELECT id FROM t WHERE tag = 9;").await?,
1500            2
1501        );
1502        // The old tag value must no longer resolve any row.
1503        assert_eq!(
1504            query_row_count(&conn, "SELECT id FROM t WHERE tag = 7;").await?,
1505            0
1506        );
1507        Ok(())
1508    }
1509}