1#![allow(
3 rustdoc::bare_urls,
4 rustdoc::invalid_html_tags,
5 rustdoc::broken_intra_doc_links
6)]
7#![allow(
8 clippy::collapsible_match,
9 clippy::doc_overindented_list_items,
10 clippy::from_over_into
11)]
12
13pub mod params;
14pub mod value;
15
16pub use value::Value;
17
18pub use params::params_from_iter;
19
20use crate::params::*;
21use std::fmt::Debug;
22use std::num::NonZero;
23use std::sync::{Arc, Mutex};
24
25#[derive(Debug, thiserror::Error)]
26pub enum Error {
27 #[error("SQL conversion failure: `{0}`")]
28 ToSqlConversionFailure(BoxError),
29 #[error("Mutex lock error: {0}")]
30 MutexError(String),
31 #[error("SQL execution failure: `{0}`")]
32 SqlExecutionFailure(String),
33 #[error("database schema has changed")]
36 SchemaChanged,
37}
38
39impl Error {
40 pub fn is_schema_changed(&self) -> bool {
44 matches!(self, Error::SchemaChanged)
45 }
46}
47
48impl From<limbo_core::LimboError> for Error {
49 fn from(err: limbo_core::LimboError) -> Self {
50 match err {
51 limbo_core::LimboError::SchemaChanged => Error::SchemaChanged,
52 other => Error::SqlExecutionFailure(other.to_string()),
53 }
54 }
55}
56
/// Boxed dynamic error that is `Send + Sync`; the payload type of
/// [`Error::ToSqlConversionFailure`].
pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>;

/// Result alias whose error type is this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
60pub struct Builder {
61 path: String,
62}
63
64impl Builder {
65 pub fn new_local(path: &str) -> Self {
66 Self {
67 path: path.to_string(),
68 }
69 }
70
71 #[allow(unused_variables, clippy::arc_with_non_send_sync)]
72 pub async fn build(self) -> Result<Database> {
73 match self.path.as_str() {
74 ":memory:" => {
75 let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::MemoryIO::new());
76 let db = limbo_core::Database::open_file(io, self.path.as_str(), false)?;
77 Ok(Database { inner: db })
78 }
79 path => {
80 let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new()?);
81 let db = limbo_core::Database::open_file(io, path, false)?;
82 Ok(Database { inner: db })
83 }
84 }
85 }
86}
87
88#[derive(Clone)]
89pub struct Database {
90 inner: Arc<limbo_core::Database>,
91}
92
93unsafe impl Send for Database {}
94unsafe impl Sync for Database {}
95
96impl Debug for Database {
97 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
98 f.debug_struct("Database").finish()
99 }
100}
101
102impl Database {
103 pub fn connect(&self) -> Result<Connection> {
104 let conn = self.inner.connect()?;
105 #[allow(clippy::arc_with_non_send_sync)]
106 let connection = Connection {
107 inner: Arc::new(Mutex::new(conn)),
108 };
109 Ok(connection)
110 }
111}
112
113pub struct Connection {
114 inner: Arc<Mutex<Arc<limbo_core::Connection>>>,
115}
116
117impl Clone for Connection {
118 fn clone(&self) -> Self {
119 Self {
120 inner: Arc::clone(&self.inner),
121 }
122 }
123}
124
125unsafe impl Send for Connection {}
126unsafe impl Sync for Connection {}
127
128impl Connection {
129 pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
130 let mut stmt = self.prepare(sql).await?;
131 stmt.query(params).await
132 }
133
134 pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<u64> {
135 let mut stmt = self.prepare(sql).await?;
136 stmt.execute(params).await
137 }
138
139 pub async fn prepare(&self, sql: &str) -> Result<Statement> {
140 let conn = self
141 .inner
142 .lock()
143 .map_err(|e| Error::MutexError(e.to_string()))?;
144
145 let stmt = conn.prepare(sql)?;
146
147 #[allow(clippy::arc_with_non_send_sync)]
148 let statement = Statement {
149 inner: Arc::new(Mutex::new(stmt)),
150 };
151 Ok(statement)
152 }
153
154 pub fn changes(&self) -> Result<i64> {
158 let conn = self
159 .inner
160 .lock()
161 .map_err(|e| Error::MutexError(e.to_string()))?;
162 Ok(conn.changes())
163 }
164
165 pub fn pragma_query<F>(&self, pragma_name: &str, mut f: F) -> Result<()>
166 where
167 F: FnMut(&Row) -> limbo_core::Result<()>,
168 {
169 let conn = self
170 .inner
171 .lock()
172 .map_err(|e| Error::MutexError(e.to_string()))?;
173
174 let rows: Vec<Row> = conn
175 .pragma_query(pragma_name)
176 .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?
177 .iter()
178 .map(|row| row.iter().collect::<Row>())
179 .collect();
180
181 rows.iter().try_for_each(|row| {
182 f(row).map_err(|e| {
183 Error::SqlExecutionFailure(format!("Error executing user defined function: {}", e))
184 })
185 })?;
186 Ok(())
187 }
188}
189
190pub struct Statement {
191 inner: Arc<Mutex<limbo_core::Statement>>,
192}
193
194impl Clone for Statement {
195 fn clone(&self) -> Self {
196 Self {
197 inner: Arc::clone(&self.inner),
198 }
199 }
200}
201
202unsafe impl Send for Statement {}
203unsafe impl Sync for Statement {}
204
205impl Statement {
206 pub async fn query(&mut self, params: impl IntoParams) -> Result<Rows> {
207 let params = params.into_params()?;
208 match params {
209 params::Params::None => (),
210 params::Params::Positional(values) => {
211 for (i, value) in values.into_iter().enumerate() {
212 let mut stmt = self
213 .inner
214 .lock()
215 .map_err(|e| Error::MutexError(e.to_string()))?;
216 if let Some(idx) = NonZero::new(i + 1) {
217 stmt.bind_at(idx, value.into());
218 }
219 }
220 }
221 params::Params::Named(_items) => todo!(),
222 }
223 #[allow(clippy::arc_with_non_send_sync)]
224 let rows = Rows {
225 inner: Arc::clone(&self.inner),
226 };
227 Ok(rows)
228 }
229
230 pub async fn execute(&mut self, params: impl IntoParams) -> Result<u64> {
231 {
232 self.inner
234 .lock()
235 .map_err(|e| Error::MutexError(e.to_string()))?
236 .reset();
237 }
238 let params = params.into_params()?;
239 match params {
240 params::Params::None => (),
241 params::Params::Positional(values) => {
242 for (i, value) in values.into_iter().enumerate() {
243 let mut stmt = self
244 .inner
245 .lock()
246 .map_err(|e| Error::MutexError(e.to_string()))?;
247 if let Some(idx) = NonZero::new(i + 1) {
248 stmt.bind_at(idx, value.into());
249 }
250 }
251 }
252 params::Params::Named(_items) => todo!(),
253 }
254 loop {
255 let mut stmt = self
256 .inner
257 .lock()
258 .map_err(|e| Error::MutexError(e.to_string()))?;
259 match stmt.step() {
260 Ok(limbo_core::StepResult::Row) => {
261 return Ok(2);
263 }
264 Ok(limbo_core::StepResult::Done) => {
265 return Ok(0);
266 }
267 Ok(limbo_core::StepResult::IO) => {
268 let _ = stmt.run_once();
269 }
271 Ok(limbo_core::StepResult::Busy) => {
272 return Ok(4);
273 }
274 Ok(limbo_core::StepResult::Interrupt) => {
275 return Ok(3);
276 }
277 Err(err) => {
278 return Err(err.into());
279 }
280 }
281 }
282 }
283
284 pub fn columns(&self) -> Vec<Column> {
285 let Ok(stmt) = self.inner.lock() else {
286 return Vec::new();
287 };
288
289 let n = stmt.num_columns();
290
291 let mut cols = Vec::with_capacity(n);
292
293 for i in 0..n {
294 let name = stmt.get_column_name(i).into_owned();
295 let decl_type = stmt.get_column_decl_type(i).map(|s| s.into_owned());
296 cols.push(Column { name, decl_type });
297 }
298
299 cols
300 }
301}
302
/// Metadata for one result column of a prepared statement.
pub struct Column {
    /// Column name as reported by the statement.
    name: String,
    /// Declared type of the column, when the statement reports one.
    decl_type: Option<String>,
}

impl Column {
    /// The column's name.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }

    /// The column's declared type, or `None` when it has none.
    pub fn decl_type(&self) -> Option<&str> {
        match &self.decl_type {
            Some(ty) => Some(ty.as_str()),
            None => None,
        }
    }
}
317
/// Fallible, consuming conversion into a [`Value`].
pub trait IntoValue {
    /// Consumes `self` and returns the corresponding [`Value`], or an error
    /// when the conversion is not possible.
    fn into_value(self) -> Result<Value>;
}
321
/// Parameter set for a statement.
///
/// NOTE(review): the statement code in this file matches on `params::Params`,
/// not on this enum; confirm whether this crate-root type is still used.
#[derive(Debug, Clone)]
pub enum Params {
    /// No parameters.
    None,
    /// Values listed by position.
    Positional(Vec<Value>),
    /// `(name, value)` pairs.
    Named(Vec<(String, Value)>),
}
/// Empty placeholder type; no fields or behaviour are defined for it in this
/// part of the file.
pub struct Transaction {}
329
330pub struct Rows {
331 inner: Arc<Mutex<limbo_core::Statement>>,
332}
333
334impl Clone for Rows {
335 fn clone(&self) -> Self {
336 Self {
337 inner: Arc::clone(&self.inner),
338 }
339 }
340}
341
342unsafe impl Send for Rows {}
343unsafe impl Sync for Rows {}
344
345impl Rows {
346 pub async fn next(&mut self) -> Result<Option<Row>> {
347 loop {
348 let mut stmt = self
349 .inner
350 .lock()
351 .map_err(|e| Error::MutexError(e.to_string()))?;
352 match stmt.step() {
353 Ok(limbo_core::StepResult::Row) => {
354 let row = stmt.row().ok_or_else(|| {
355 Error::SqlExecutionFailure(
356 "row unavailable after Row step result".to_string(),
357 )
358 })?;
359 return Ok(Some(Row {
360 values: row.get_values().map(|v| v.to_owned()).collect(),
361 }));
362 }
363 Ok(limbo_core::StepResult::Done) => return Ok(None),
364 Ok(limbo_core::StepResult::IO) => {
365 if let Err(e) = stmt.run_once() {
366 return Err(e.into());
367 }
368 continue;
369 }
370 Ok(limbo_core::StepResult::Busy) => return Ok(None),
371 Ok(limbo_core::StepResult::Interrupt) => return Ok(None),
372 _ => return Ok(None),
373 }
374 }
375 }
376}
377
378#[derive(Debug)]
379pub struct Row {
380 values: Vec<limbo_core::Value>,
381}
382
383unsafe impl Send for Row {}
384unsafe impl Sync for Row {}
385
386impl Row {
387 pub fn get_value(&self, index: usize) -> Result<Value> {
388 let value = &self.values[index];
389 match value {
390 limbo_core::Value::Integer(i) => Ok(Value::Integer(*i)),
391 limbo_core::Value::Null => Ok(Value::Null),
392 limbo_core::Value::Float(f) => Ok(Value::Real(*f)),
393 limbo_core::Value::Text(text) => Ok(Value::Text(text.to_string())),
394 limbo_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())),
395 }
396 }
397
398 pub fn column_count(&self) -> usize {
399 self.values.len()
400 }
401}
402
403impl<'a> FromIterator<&'a limbo_core::Value> for Row {
404 fn from_iter<T: IntoIterator<Item = &'a limbo_core::Value>>(iter: T) -> Self {
405 let values = iter
406 .into_iter()
407 .map(|v| match v {
408 limbo_core::Value::Integer(i) => limbo_core::Value::Integer(*i),
409 limbo_core::Value::Null => limbo_core::Value::Null,
410 limbo_core::Value::Float(f) => limbo_core::Value::Float(*f),
411 limbo_core::Value::Text(s) => limbo_core::Value::Text(s.clone()),
412 limbo_core::Value::Blob(b) => limbo_core::Value::Blob(b.clone()),
413 })
414 .collect();
415
416 Row { values }
417 }
418}
419
420#[cfg(test)]
421mod tests {
422 use super::*;
423 use tempfile::NamedTempFile;
424
425 #[tokio::test]
426 async fn test_database_persistence() -> Result<()> {
427 let temp_file = NamedTempFile::new().unwrap();
428 let db_path = temp_file.path().to_str().unwrap();
429
430 {
432 let db = Builder::new_local(db_path).build().await?;
433 let conn = db.connect()?;
434 conn.execute(
435 "CREATE TABLE test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
436 (),
437 )
438 .await?;
439 conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
440 .await?;
441 conn.execute("INSERT INTO test_persistence (name) VALUES ('Bob');", ())
442 .await?;
443 } let db = Builder::new_local(db_path).build().await?;
447 let conn = db.connect()?;
448
449 let mut rows = conn
450 .query("SELECT name FROM test_persistence ORDER BY id;", ())
451 .await?;
452
453 let row1 = rows.next().await?.expect("Expected first row");
454 assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string()));
455
456 let row2 = rows.next().await?.expect("Expected second row");
457 assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string()));
458
459 assert!(rows.next().await?.is_none(), "Expected no more rows");
460
461 Ok(())
462 }
463
464 #[tokio::test]
465 async fn test_database_persistence_many_frames() -> Result<()> {
466 let temp_file = NamedTempFile::new().unwrap();
467 let db_path = temp_file.path().to_str().unwrap();
468
469 const NUM_INSERTS: usize = 100;
470 const TARGET_STRING_LEN: usize = 1024; let mut original_data = Vec::with_capacity(NUM_INSERTS);
473 for i in 0..NUM_INSERTS {
474 let prefix = format!("test_string_{:04}_", i);
475 let padding_len = TARGET_STRING_LEN.saturating_sub(prefix.len());
476 let padding: String = "A".repeat(padding_len);
477 original_data.push(format!("{}{}", prefix, padding));
478 }
479
480 {
482 let db = Builder::new_local(db_path).build().await?;
483 let conn = db.connect()?;
484 conn.execute(
485 "CREATE TABLE test_large_persistence (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT NOT NULL);",
486 (),
487 )
488 .await?;
489
490 for data_val in &original_data {
491 conn.execute(
492 "INSERT INTO test_large_persistence (data) VALUES (?);",
493 params::Params::Positional(vec![Value::Text(data_val.clone())]),
494 )
495 .await?;
496 }
497 } let db = Builder::new_local(db_path).build().await?;
501 let conn = db.connect()?;
502
503 let mut rows = conn
504 .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
505 .await?;
506
507 for (i, expected) in original_data.iter().enumerate().take(NUM_INSERTS) {
508 let row = rows
509 .next()
510 .await?
511 .unwrap_or_else(|| panic!("Expected row {} but found None", i));
512 assert_eq!(
513 row.get_value(0)?,
514 Value::Text(expected.clone()),
515 "Mismatch in retrieved data for row {}",
516 i
517 );
518 }
519
520 assert!(
521 rows.next().await?.is_none(),
522 "Expected no more rows after retrieving all inserted data"
523 );
524
525 let wal_path = format!("{}-wal", db_path);
527 std::fs::remove_file(&wal_path)
528 .map_err(|e| eprintln!("Warning: Failed to delete WAL file for test: {}", e))
529 .unwrap();
530
531 let db_after_wal_delete = Builder::new_local(db_path).build().await?;
537 let conn_after_wal_delete = db_after_wal_delete.connect()?;
538
539 let mut rows_after_wal_delete = conn_after_wal_delete
540 .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
541 .await?;
542
543 for (i, expected) in original_data.iter().enumerate().take(NUM_INSERTS) {
544 let row = rows_after_wal_delete.next().await?.unwrap_or_else(|| {
545 panic!(
546 "Expected row {} after WAL deletion but found None; \
547 checkpoint-on-close should have persisted it into the main DB",
548 i
549 )
550 });
551 assert_eq!(
552 row.get_value(0)?,
553 Value::Text(expected.clone()),
554 "Mismatch in retrieved data for row {} after WAL deletion",
555 i
556 );
557 }
558
559 assert!(
560 rows_after_wal_delete.next().await?.is_none(),
561 "Expected no more rows after WAL deletion once all checkpointed data was retrieved"
562 );
563
564 Ok(())
565 }
566
567 #[tokio::test]
568 async fn test_database_persistence_write_one_frame_many_times() -> Result<()> {
569 let temp_file = NamedTempFile::new().unwrap();
570 let db_path = temp_file.path().to_str().unwrap();
571
572 for i in 0..100 {
573 {
574 let db = Builder::new_local(db_path).build().await?;
575 let conn = db.connect()?;
576
577 conn.execute("CREATE TABLE IF NOT EXISTS test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);", ()).await?;
578 conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
579 .await?;
580 }
581 {
582 let db = Builder::new_local(db_path).build().await?;
583 let conn = db.connect()?;
584
585 let mut rows_iter = conn
586 .query("SELECT count(*) FROM test_persistence;", ())
587 .await?;
588 let rows = rows_iter.next().await?.unwrap();
589 assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1));
590 assert!(rows_iter.next().await?.is_none());
591 }
592 }
593
594 Ok(())
595 }
596
597 async fn query_scalar_i64(conn: &Connection, sql: &str) -> Result<i64> {
603 let mut rows = conn.query(sql, ()).await?;
604 let row = rows
605 .next()
606 .await?
607 .unwrap_or_else(|| panic!("expected a row from `{sql}`"));
608 match row.get_value(0)? {
609 Value::Integer(i) => Ok(i),
610 other => panic!("expected Integer from `{sql}`, got {other:?}"),
611 }
612 }
613
614 #[tokio::test]
615 async fn test_application_id_write_read_round_trip() -> Result<()> {
616 let db = Builder::new_local(":memory:").build().await?;
617 let conn = db.connect()?;
618
619 assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, 0);
621
622 conn.execute("PRAGMA application_id = 1196444487;", ())
624 .await?;
625 assert_eq!(
626 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
627 1196444487
628 );
629
630 conn.execute("PRAGMA application_id = 42;", ()).await?;
632 assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, 42);
633
634 Ok(())
635 }
636
637 #[tokio::test]
638 async fn test_application_id_negative_round_trip() -> Result<()> {
639 let db = Builder::new_local(":memory:").build().await?;
642 let conn = db.connect()?;
643
644 conn.execute("PRAGMA application_id = -1;", ()).await?;
645 assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, -1);
646
647 conn.execute("PRAGMA application_id = -2147483648;", ())
648 .await?;
649 assert_eq!(
650 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
651 -2147483648
652 );
653
654 Ok(())
655 }
656
657 struct TempDbPath {
663 path: std::path::PathBuf,
664 }
665
666 impl TempDbPath {
667 fn new(tag: &str) -> Self {
668 use std::sync::atomic::{AtomicU64, Ordering};
669 static COUNTER: AtomicU64 = AtomicU64::new(0);
670 let n = COUNTER.fetch_add(1, Ordering::Relaxed);
671 let mut path = std::env::temp_dir();
672 path.push(format!(
673 "oxisqlite_dur_{}_{}_{}.db",
674 tag,
675 std::process::id(),
676 n
677 ));
678 let _ = std::fs::remove_file(&path);
680 let _ = std::fs::remove_file(format!("{}-wal", path.display()));
681 Self { path }
682 }
683
684 fn as_str(&self) -> &str {
685 self.path
686 .to_str()
687 .expect("temp db path is valid UTF-8 on the test platforms")
688 }
689 }
690
691 impl Drop for TempDbPath {
692 fn drop(&mut self) {
693 let _ = std::fs::remove_file(&self.path);
694 let _ = std::fs::remove_file(format!("{}-wal", self.path.display()));
695 }
696 }
697
698 #[tokio::test]
704 async fn test_application_id_persistence() -> Result<()> {
705 let temp = TempDbPath::new("app_id_persist");
706 let db_path = temp.as_str();
707
708 {
709 let db = Builder::new_local(db_path).build().await?;
710 let conn = db.connect()?;
711 conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);", ())
712 .await?;
713 conn.execute("PRAGMA application_id = -12345;", ()).await?;
714
715 assert_eq!(
717 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
718 -12345
719 );
720 } let db = Builder::new_local(db_path).build().await?;
724 let conn = db.connect()?;
725 assert_eq!(
726 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
727 -12345,
728 "application_id must survive close/reopen"
729 );
730
731 Ok(())
732 }
733
734 #[tokio::test]
737 async fn test_application_id_durable_reopen() -> Result<()> {
738 let temp = TempDbPath::new("app_id_reopen");
739 let db_path = temp.as_str();
740
741 {
742 let db = Builder::new_local(db_path).build().await?;
743 let conn = db.connect()?;
744 conn.execute("PRAGMA application_id = 12345;", ()).await?;
745 assert_eq!(
746 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
747 12345
748 );
749 }
750
751 let db = Builder::new_local(db_path).build().await?;
752 let conn = db.connect()?;
753 assert_eq!(
754 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
755 12345,
756 "application_id = 12345 must survive close/reopen"
757 );
758
759 Ok(())
760 }
761
762 #[tokio::test]
765 async fn test_user_version_durable_reopen() -> Result<()> {
766 let temp = TempDbPath::new("user_version_reopen");
767 let db_path = temp.as_str();
768
769 {
770 let db = Builder::new_local(db_path).build().await?;
771 let conn = db.connect()?;
772 conn.execute("PRAGMA user_version = 12345;", ()).await?;
773 assert_eq!(
774 query_scalar_i64(&conn, "PRAGMA user_version;").await?,
775 12345
776 );
777 }
778
779 let db = Builder::new_local(db_path).build().await?;
780 let conn = db.connect()?;
781 assert_eq!(
782 query_scalar_i64(&conn, "PRAGMA user_version;").await?,
783 12345,
784 "user_version = 12345 must survive close/reopen"
785 );
786
787 Ok(())
788 }
789
790 #[tokio::test]
794 async fn test_application_id_negative_durable_reopen() -> Result<()> {
795 let temp = TempDbPath::new("app_id_negative_reopen");
796 let db_path = temp.as_str();
797
798 {
799 let db = Builder::new_local(db_path).build().await?;
800 let conn = db.connect()?;
801 conn.execute("PRAGMA application_id = -1;", ()).await?;
802 assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, -1);
803 }
804
805 let db = Builder::new_local(db_path).build().await?;
806 let conn = db.connect()?;
807 assert_eq!(
808 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
809 -1,
810 "application_id = -1 must survive close/reopen as the signed value -1"
811 );
812
813 let conn = db.connect()?;
816 let _ = conn.execute("PRAGMA wal_checkpoint;", ()).await;
817 drop(conn);
818 drop(db);
819 let bytes = std::fs::read(db_path).expect("read database file");
820 assert!(bytes.len() >= 72, "database file shorter than the header");
821 assert_eq!(
822 &bytes[68..72],
823 &0xFFFF_FFFFu32.to_be_bytes(),
824 "application_id = -1 must be encoded as 0xFFFFFFFF at offset 68"
825 );
826
827 Ok(())
828 }
829
830 #[tokio::test]
835 async fn test_application_id_byte_level_on_disk() -> Result<()> {
836 const GPKG_MAGIC: u32 = 1196444487; const USER_VERSION: i32 = 10201; let temp = TempDbPath::new("app_id_bytes");
840 let db_path = temp.as_str();
841
842 {
843 let db = Builder::new_local(db_path).build().await?;
844 let conn = db.connect()?;
845 conn.execute("CREATE TABLE gpkg_contents (id INTEGER PRIMARY KEY);", ())
847 .await?;
848 conn.execute(&format!("PRAGMA application_id = {GPKG_MAGIC};"), ())
849 .await?;
850 conn.execute(&format!("PRAGMA user_version = {USER_VERSION};"), ())
851 .await?;
852 let _ = conn.execute("PRAGMA wal_checkpoint;", ()).await;
857 }
858
859 let bytes = std::fs::read(db_path).expect("read database file");
860 assert!(
861 bytes.len() >= 72,
862 "database file is shorter than the 100-byte header"
863 );
864
865 assert_eq!(
867 &bytes[68..72],
868 &GPKG_MAGIC.to_be_bytes(),
869 "GPKG magic must be stored big-endian at file offset 68"
870 );
871 assert_eq!(
872 u32::from_be_bytes([bytes[68], bytes[69], bytes[70], bytes[71]]),
873 0x4750_4B47,
874 "application_id bytes must decode to 0x47504B47"
875 );
876
877 assert_eq!(
879 &bytes[60..64],
880 &USER_VERSION.to_be_bytes(),
881 "user_version must be stored big-endian at file offset 60"
882 );
883
884 let db = Builder::new_local(db_path).build().await?;
886 let conn = db.connect()?;
887 assert_eq!(
888 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
889 GPKG_MAGIC as i64
890 );
891 assert_eq!(
892 query_scalar_i64(&conn, "PRAGMA user_version;").await?,
893 USER_VERSION as i64
894 );
895
896 Ok(())
897 }
898
899 #[tokio::test]
904 async fn test_insert_or_ignore_rowid_conflict_skipped() -> Result<()> {
905 let db = Builder::new_local(":memory:").build().await?;
906 let conn = db.connect()?;
907 conn.execute(
908 "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
909 (),
910 )
911 .await?;
912 conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
913 .await?;
914
915 conn.execute("INSERT OR IGNORE INTO t (id, name) VALUES (1, 'Bob');", ())
917 .await?;
918
919 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
921 let mut rows = conn.query("SELECT name FROM t WHERE id = 1;", ()).await?;
922 let row = rows.next().await?.expect("row");
923 assert_eq!(row.get_value(0)?, Value::Text("Alice".to_string()));
924
925 Ok(())
926 }
927
928 #[tokio::test]
929 async fn test_insert_or_ignore_multi_row_other_rows_land() -> Result<()> {
930 let db = Builder::new_local(":memory:").build().await?;
931 let conn = db.connect()?;
932 conn.execute(
933 "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
934 (),
935 )
936 .await?;
937 conn.execute("INSERT INTO t (id, name) VALUES (2, 'Two');", ())
938 .await?;
939
940 conn.execute(
943 "INSERT OR IGNORE INTO t (id, name) VALUES (1, 'One'), (2, 'Dup'), (3, 'Three');",
944 (),
945 )
946 .await?;
947
948 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 3);
949 let mut rows = conn.query("SELECT name FROM t WHERE id = 2;", ()).await?;
951 assert_eq!(
952 rows.next().await?.expect("row").get_value(0)?,
953 Value::Text("Two".to_string())
954 );
955 let mut rows = conn.query("SELECT id FROM t ORDER BY id;", ()).await?;
957 assert_eq!(
958 rows.next().await?.expect("row").get_value(0)?,
959 Value::Integer(1)
960 );
961 assert_eq!(
962 rows.next().await?.expect("row").get_value(0)?,
963 Value::Integer(2)
964 );
965 assert_eq!(
966 rows.next().await?.expect("row").get_value(0)?,
967 Value::Integer(3)
968 );
969
970 Ok(())
971 }
972
973 #[cfg(feature = "index_experimental")]
974 #[tokio::test]
975 async fn test_insert_or_ignore_unique_index_conflict_skipped() -> Result<()> {
976 let db = Builder::new_local(":memory:").build().await?;
977 let conn = db.connect()?;
978 conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT);", ())
979 .await?;
980 conn.execute("CREATE UNIQUE INDEX idx_email ON t (email);", ())
981 .await?;
982 conn.execute("INSERT INTO t (id, email) VALUES (1, 'a@example.com');", ())
983 .await?;
984
985 conn.execute(
988 "INSERT OR IGNORE INTO t (id, email) VALUES (2, 'a@example.com');",
989 (),
990 )
991 .await?;
992
993 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
994 let mut rows = conn.query("SELECT id FROM t ORDER BY id;", ()).await?;
996 assert_eq!(
997 rows.next().await?.expect("row").get_value(0)?,
998 Value::Integer(1)
999 );
1000 assert!(rows.next().await?.is_none());
1001
1002 Ok(())
1003 }
1004
1005 #[tokio::test]
1010 async fn test_insert_or_replace_rowid_conflict_replaces() -> Result<()> {
1011 let db = Builder::new_local(":memory:").build().await?;
1012 let conn = db.connect()?;
1013 conn.execute(
1014 "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
1015 (),
1016 )
1017 .await?;
1018 conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
1019 .await?;
1020
1021 conn.execute("INSERT OR REPLACE INTO t (id, name) VALUES (1, 'Bob');", ())
1023 .await?;
1024
1025 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1026 let mut rows = conn.query("SELECT name FROM t WHERE id = 1;", ()).await?;
1027 assert_eq!(
1028 rows.next().await?.expect("row").get_value(0)?,
1029 Value::Text("Bob".to_string())
1030 );
1031
1032 Ok(())
1033 }
1034
1035 #[tokio::test]
1036 async fn test_insert_or_replace_multi_row_conflict_with_prior_row() -> Result<()> {
1037 let db = Builder::new_local(":memory:").build().await?;
1038 let conn = db.connect()?;
1039 conn.execute(
1040 "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
1041 (),
1042 )
1043 .await?;
1044
1045 conn.execute(
1048 "INSERT OR REPLACE INTO t (id, name) VALUES (1, 'First'), (1, 'Second'), (2, 'Other');",
1049 (),
1050 )
1051 .await?;
1052
1053 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 2);
1054 let mut rows = conn.query("SELECT name FROM t WHERE id = 1;", ()).await?;
1055 assert_eq!(
1056 rows.next().await?.expect("row").get_value(0)?,
1057 Value::Text("Second".to_string())
1058 );
1059
1060 Ok(())
1061 }
1062
1063 #[cfg(feature = "index_experimental")]
1064 #[tokio::test]
1065 async fn test_insert_or_replace_single_unique_index_conflict() -> Result<()> {
1066 let db = Builder::new_local(":memory:").build().await?;
1067 let conn = db.connect()?;
1068 conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT);", ())
1069 .await?;
1070 conn.execute("CREATE UNIQUE INDEX idx_email ON t (email);", ())
1071 .await?;
1072 conn.execute("INSERT INTO t (id, email) VALUES (1, 'a@example.com');", ())
1073 .await?;
1074
1075 conn.execute(
1078 "INSERT OR REPLACE INTO t (id, email) VALUES (2, 'a@example.com');",
1079 (),
1080 )
1081 .await?;
1082
1083 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1084 let mut rows = conn
1086 .query("SELECT id FROM t WHERE email = 'a@example.com';", ())
1087 .await?;
1088 assert_eq!(
1089 rows.next().await?.expect("row").get_value(0)?,
1090 Value::Integer(2)
1091 );
1092 assert!(rows.next().await?.is_none());
1093
1094 Ok(())
1095 }
1096
1097 #[cfg(feature = "index_experimental")]
1098 #[tokio::test]
1099 async fn test_insert_or_replace_multiple_unique_indexes_different_victims() -> Result<()> {
1100 let db = Builder::new_local(":memory:").build().await?;
1104 let conn = db.connect()?;
1105 conn.execute(
1106 "CREATE TABLE t (id INTEGER PRIMARY KEY, a TEXT, b TEXT);",
1107 (),
1108 )
1109 .await?;
1110 conn.execute("CREATE UNIQUE INDEX idx_a ON t (a);", ())
1111 .await?;
1112 conn.execute("CREATE UNIQUE INDEX idx_b ON t (b);", ())
1113 .await?;
1114
1115 conn.execute("INSERT INTO t (id, a, b) VALUES (1, 'A1', 'B1');", ())
1118 .await?;
1119 conn.execute("INSERT INTO t (id, a, b) VALUES (2, 'A2', 'B2');", ())
1120 .await?;
1121
1122 conn.execute(
1123 "INSERT OR REPLACE INTO t (id, a, b) VALUES (3, 'A1', 'B2');",
1124 (),
1125 )
1126 .await?;
1127
1128 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1130 let mut rows = conn.query("SELECT id, a, b FROM t;", ()).await?;
1131 let row = rows.next().await?.expect("row");
1132 assert_eq!(row.get_value(0)?, Value::Integer(3));
1133 assert_eq!(row.get_value(1)?, Value::Text("A1".to_string()));
1134 assert_eq!(row.get_value(2)?, Value::Text("B2".to_string()));
1135 assert!(rows.next().await?.is_none());
1136
1137 assert_eq!(
1139 query_scalar_i64(&conn, "SELECT id FROM t WHERE a = 'A1';").await?,
1140 3
1141 );
1142 assert_eq!(
1143 query_scalar_i64(&conn, "SELECT id FROM t WHERE b = 'B2';").await?,
1144 3
1145 );
1146
1147 Ok(())
1148 }
1149
1150 #[tokio::test]
1155 async fn test_plain_insert_rowid_conflict_still_errors() -> Result<()> {
1156 let db = Builder::new_local(":memory:").build().await?;
1157 let conn = db.connect()?;
1158 conn.execute(
1159 "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
1160 (),
1161 )
1162 .await?;
1163 conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
1164 .await?;
1165
1166 let result = conn
1168 .execute("INSERT INTO t (id, name) VALUES (1, 'Bob');", ())
1169 .await;
1170 assert!(
1171 result.is_err(),
1172 "plain INSERT on duplicate PRIMARY KEY must error, got Ok"
1173 );
1174
1175 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1177
1178 Ok(())
1179 }
1180
1181 #[cfg(feature = "index_experimental")]
1204 async fn query_row_count(conn: &Connection, sql: &str) -> Result<i64> {
1205 let mut rows = conn.query(sql, ()).await?;
1206 let mut n = 0i64;
1207 while rows.next().await?.is_some() {
1208 n += 1;
1209 }
1210 Ok(n)
1211 }
1212
1213 #[cfg(feature = "index_experimental")]
1215 async fn create_messages_schema(conn: &Connection) -> Result<()> {
1216 conn.execute(
1217 "CREATE TABLE IF NOT EXISTS messages (id INTEGER PRIMARY KEY AUTOINCREMENT, topic TEXT NOT NULL, timestamp INTEGER NOT NULL, data BLOB NOT NULL);",
1218 (),
1219 )
1220 .await?;
1221 conn.execute(
1222 "CREATE INDEX IF NOT EXISTS idx_messages_topic ON messages (topic);",
1223 (),
1224 )
1225 .await?;
1226 conn.execute(
1227 "CREATE INDEX IF NOT EXISTS idx_messages_timestamp ON messages (timestamp);",
1228 (),
1229 )
1230 .await?;
1231 Ok(())
1232 }
1233
1234 #[cfg(feature = "index_experimental")]
1237 async fn insert_message(
1238 conn: &Connection,
1239 topic: &str,
1240 timestamp: i64,
1241 data: Vec<u8>,
1242 ) -> Result<()> {
1243 conn.execute(
1244 "INSERT INTO messages (topic, timestamp, data) VALUES (?, ?, ?);",
1245 params::Params::Positional(vec![
1246 Value::Text(topic.to_string()),
1247 Value::Integer(timestamp),
1248 Value::Blob(data),
1249 ]),
1250 )
1251 .await?;
1252 Ok(())
1253 }
1254
/// Recreates an on-disk database three times and checks that it always holds two rows.
///
/// Every cycle deletes only the main database file (nothing else in the scratch directory
/// is touched, so whatever the previous cycle left beside it — the WAL, going by the test
/// name — stays in place), writes two rows through one connection, drops that connection,
/// then reopens the file and counts rows with and without `ORDER BY timestamp`. Both counts
/// must be 2 in every cycle.
///
/// The scratch directory is removed on every exit path, including `?` early returns and
/// assertion panics.
#[cfg(feature = "index_experimental")]
#[tokio::test]
async fn test_orphaned_wal_does_not_duplicate_rows_two_indexes() -> Result<()> {
    /// Owns the scratch directory and deletes it when dropped.
    struct ScratchDir(std::path::PathBuf);

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not mask the real test outcome.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Runs one delete / write / reopen / count cycle against the database at `p`.
    ///
    /// Returns `(rows from the plain SELECT, rows from the ORDER BY timestamp SELECT)`.
    async fn cycle(p: &str) -> Result<(i64, i64)> {
        // Only the main database file is removed; a missing file is not an error.
        let _ = std::fs::remove_file(p);

        // First session: build the schema and write two rows. The scope ends the session
        // by dropping the connection and then the database handle.
        {
            let db = Builder::new_local(p).build().await?;
            let conn = db.connect()?;
            create_messages_schema(&conn).await?;
            insert_message(&conn, "/imu", 1_000_000_000, vec![0xDE, 0xAD]).await?;
            insert_message(&conn, "/gps", 2_000_000_000, vec![0xBE, 0xEF]).await?;
        }

        // Second session: reopen the same path and count what is visible.
        let db = Builder::new_local(p).build().await?;
        let conn = db.connect()?;
        create_messages_schema(&conn).await?;
        let scan =
            query_row_count(&conn, "SELECT timestamp, topic, data FROM messages;").await?;
        let ordered = query_row_count(
            &conn,
            "SELECT timestamp, topic, data FROM messages ORDER BY timestamp;",
        )
        .await?;
        Ok((scan, ordered))
    }

    // Process id plus thread id keeps concurrently running tests in separate directories.
    let dir = std::env::temp_dir().join(format!(
        "oxisqlite_orphan_wal_{}_{:?}",
        std::process::id(),
        std::thread::current().id()
    ));
    // Start from an empty directory even if an earlier run left one behind.
    let _ = std::fs::remove_dir_all(&dir);
    std::fs::create_dir_all(&dir).expect("create temp dir");
    let scratch = ScratchDir(dir);

    let db_path = scratch.0.join("messages.db3");
    let p = db_path.to_str().expect("utf-8 path").to_string();

    let (scan1, ord1) = cycle(&p).await?;
    assert_eq!(scan1, 2, "cycle 1 table scan");
    assert_eq!(ord1, 2, "cycle 1 ORDER BY timestamp (index scan)");

    for c in 2..=3 {
        let (scan, ord) = cycle(&p).await?;
        assert_eq!(scan, 2, "cycle {c} table scan must stay 2");
        assert_eq!(ord, 2, "cycle {c} ORDER BY timestamp must stay 2");
    }

    Ok(())
}
1315
/// Writes two messages to an in-memory database and checks that they read back unchanged.
///
/// Verifies `count(*)`, then walks `SELECT ... ORDER BY timestamp` and compares every
/// column of both rows against the inserted values, and finally asserts that no third row
/// is produced.
#[cfg(feature = "index_experimental")]
#[tokio::test]
async fn test_two_non_unique_indexes_roundtrip_values() -> Result<()> {
    let db = Builder::new_local(":memory:").build().await?;
    let conn = db.connect()?;
    create_messages_schema(&conn).await?;
    insert_message(&conn, "/imu", 1_000_000_000, vec![0xDE, 0xAD]).await?;
    insert_message(&conn, "/gps", 2_000_000_000, vec![0xBE, 0xEF]).await?;

    let total = query_scalar_i64(&conn, "SELECT count(*) FROM messages;").await?;
    assert_eq!(total, 2);

    // (panic message when the row is missing, timestamp, topic, payload), in the order the
    // query is expected to return them.
    let expected_rows = [
        ("row 0", 1_000_000_000_i64, "/imu", vec![0xDE_u8, 0xAD]),
        ("row 1", 2_000_000_000_i64, "/gps", vec![0xBE_u8, 0xEF]),
    ];

    let mut rows = conn
        .query(
            "SELECT timestamp, topic, data FROM messages ORDER BY timestamp;",
            (),
        )
        .await?;
    for (missing_msg, timestamp, topic, payload) in expected_rows {
        let row = rows.next().await?.expect(missing_msg);
        assert_eq!(row.get_value(0)?, Value::Integer(timestamp));
        assert_eq!(row.get_value(1)?, Value::Text(topic.to_string()));
        assert_eq!(row.get_value(2)?, Value::Blob(payload));
    }
    assert!(rows.next().await?.is_none(), "exactly two rows");

    Ok(())
}
1351
/// Checks that two single-row inserts yield two rows with zero, one, two or three indexes.
///
/// For each index count a fresh in-memory database gets table `t`, indexes on the first
/// `n_idx` of the columns `a`, `b`, `c`, and two inserted rows. The row count is then
/// checked through `count(*)`, a plain `SELECT`, and a `SELECT ... ORDER BY a`.
#[cfg(feature = "index_experimental")]
#[tokio::test]
async fn test_index_count_matrix_single_inserts() -> Result<()> {
    let indexable_columns = ["a", "b", "c"];

    for n_idx in 0..=3usize {
        let db = Builder::new_local(":memory:").build().await?;
        let conn = db.connect()?;
        conn.execute(
            "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, a INTEGER, b INTEGER, c INTEGER);",
            (),
        )
        .await?;

        // Index only the first `n_idx` columns; `n_idx` never exceeds the array length.
        for col in &indexable_columns[..n_idx] {
            let create_index = format!("CREATE INDEX idx_{col} ON t ({col});");
            conn.execute(&create_index, ()).await?;
        }

        // Two separate single-row inserts, not one multi-row statement.
        for insert in [
            "INSERT INTO t (a, b, c) VALUES (1, 1, 1);",
            "INSERT INTO t (a, b, c) VALUES (2, 2, 2);",
        ] {
            conn.execute(insert, ()).await?;
        }

        let counted = query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?;
        assert_eq!(counted, 2, "n_idx={n_idx}: count(*)");

        let scanned = query_row_count(&conn, "SELECT a FROM t;").await?;
        assert_eq!(scanned, 2, "n_idx={n_idx}: table scan");

        let ordered = query_row_count(&conn, "SELECT a FROM t ORDER BY a;").await?;
        assert_eq!(ordered, 2, "n_idx={n_idx}: ORDER BY a (index scan)");
    }
    Ok(())
}
1395
/// Checks that one three-row `INSERT ... VALUES` stores exactly three rows when the table
/// has two secondary indexes.
///
/// The count is verified through `count(*)`, a plain `SELECT`, and one `ORDER BY` query
/// per indexed column.
#[cfg(feature = "index_experimental")]
#[tokio::test]
async fn test_multi_row_insert_two_indexes() -> Result<()> {
    let db = Builder::new_local(":memory:").build().await?;
    let conn = db.connect()?;

    // Schema, both indexes, then the single multi-row insert under test.
    for statement in [
        "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, a INTEGER, b INTEGER);",
        "CREATE INDEX idx_a ON t (a);",
        "CREATE INDEX idx_b ON t (b);",
        "INSERT INTO t (a, b) VALUES (1, 10), (2, 20), (3, 30);",
    ] {
        conn.execute(statement, ()).await?;
    }

    let counted = query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?;
    assert_eq!(counted, 3);

    let scanned = query_row_count(&conn, "SELECT a FROM t;").await?;
    assert_eq!(scanned, 3);

    let ordered_by_a = query_row_count(&conn, "SELECT a FROM t ORDER BY a;").await?;
    assert_eq!(ordered_by_a, 3);

    let ordered_by_b = query_row_count(&conn, "SELECT b FROM t ORDER BY b;").await?;
    assert_eq!(ordered_by_b, 3);

    Ok(())
}
1425
/// Checks that `INSERT OR IGNORE` keeps both of two identical rows when neither index on
/// the table is declared `UNIQUE`.
///
/// The same `(a, b)` pair is inserted twice without an explicit `id`; the test expects
/// two rows from `count(*)` and from a `SELECT ... ORDER BY a`.
#[cfg(feature = "index_experimental")]
#[tokio::test]
async fn test_insert_or_ignore_two_non_unique_indexes() -> Result<()> {
    let db = Builder::new_local(":memory:").build().await?;
    let conn = db.connect()?;

    for setup in [
        "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, a INTEGER, b INTEGER);",
        "CREATE INDEX idx_a ON t (a);",
        "CREATE INDEX idx_b ON t (b);",
    ] {
        conn.execute(setup, ()).await?;
    }

    // Run the identical statement twice.
    for _ in 0..2 {
        conn.execute("INSERT OR IGNORE INTO t (a, b) VALUES (1, 10);", ())
            .await?;
    }

    let counted = query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?;
    assert_eq!(counted, 2);

    let ordered = query_row_count(&conn, "SELECT a FROM t ORDER BY a;").await?;
    assert_eq!(ordered, 2);

    Ok(())
}
1456
/// Checks that `INSERT OR REPLACE` colliding on a unique index swaps the old row for the
/// new one, and that the non-unique index on `tag` reflects the swap.
///
/// Row `id = 1` is inserted first; a second insert with `id = 2` reuses its email. The test
/// then expects a single row overall, reachable as `id = 2` through both the email and
/// `tag = 9`, and no row left for the old `tag = 7`.
#[cfg(feature = "index_experimental")]
#[tokio::test]
async fn test_insert_or_replace_unique_plus_non_unique_index() -> Result<()> {
    let db = Builder::new_local(":memory:").build().await?;
    let conn = db.connect()?;

    // Table, unique index on `email`, plain index on `tag`, then the original row.
    for setup in [
        "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT, tag INTEGER);",
        "CREATE UNIQUE INDEX idx_email ON t (email);",
        "CREATE INDEX idx_tag ON t (tag);",
        "INSERT INTO t (id, email, tag) VALUES (1, 'a@example.com', 7);",
    ] {
        conn.execute(setup, ()).await?;
    }

    // Same email, different id and tag: this collides on `idx_email`.
    conn.execute(
        "INSERT OR REPLACE INTO t (id, email, tag) VALUES (2, 'a@example.com', 9);",
        (),
    )
    .await?;

    let total = query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?;
    assert_eq!(total, 1);

    let rows_by_tag_order = query_row_count(&conn, "SELECT id FROM t ORDER BY tag;").await?;
    assert_eq!(rows_by_tag_order, 1);

    let id_by_email =
        query_scalar_i64(&conn, "SELECT id FROM t WHERE email = 'a@example.com';").await?;
    assert_eq!(id_by_email, 2);

    let id_by_new_tag = query_scalar_i64(&conn, "SELECT id FROM t WHERE tag = 9;").await?;
    assert_eq!(id_by_new_tag, 2);

    let rows_with_old_tag = query_row_count(&conn, "SELECT id FROM t WHERE tag = 7;").await?;
    assert_eq!(rows_with_old_tag, 0);

    Ok(())
}
1509}