1#![allow(
3 rustdoc::bare_urls,
4 rustdoc::invalid_html_tags,
5 rustdoc::broken_intra_doc_links
6)]
7#![allow(
8 clippy::collapsible_match,
9 clippy::doc_overindented_list_items,
10 clippy::from_over_into
11)]
12
13pub mod params;
14pub mod value;
15
16pub use value::Value;
17
18pub use params::params_from_iter;
19
20use crate::params::*;
21use std::fmt::Debug;
22use std::num::NonZero;
23use std::sync::{Arc, Mutex};
24
25#[derive(Debug, thiserror::Error)]
26pub enum Error {
27 #[error("SQL conversion failure: `{0}`")]
28 ToSqlConversionFailure(BoxError),
29 #[error("Mutex lock error: {0}")]
30 MutexError(String),
31 #[error("SQL execution failure: `{0}`")]
32 SqlExecutionFailure(String),
33}
34
35impl From<limbo_core::LimboError> for Error {
36 fn from(err: limbo_core::LimboError) -> Self {
37 Error::SqlExecutionFailure(err.to_string())
38 }
39}
40
41pub(crate) type BoxError = Box<dyn std::error::Error + Send + Sync>;
42
43pub type Result<T> = std::result::Result<T, Error>;
44pub struct Builder {
45 path: String,
46}
47
48impl Builder {
49 pub fn new_local(path: &str) -> Self {
50 Self {
51 path: path.to_string(),
52 }
53 }
54
55 #[allow(unused_variables, clippy::arc_with_non_send_sync)]
56 pub async fn build(self) -> Result<Database> {
57 match self.path.as_str() {
58 ":memory:" => {
59 let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::MemoryIO::new());
60 let db = limbo_core::Database::open_file(io, self.path.as_str(), false)?;
61 Ok(Database { inner: db })
62 }
63 path => {
64 let io: Arc<dyn limbo_core::IO> = Arc::new(limbo_core::PlatformIO::new()?);
65 let db = limbo_core::Database::open_file(io, path, false)?;
66 Ok(Database { inner: db })
67 }
68 }
69 }
70}
71
72#[derive(Clone)]
73pub struct Database {
74 inner: Arc<limbo_core::Database>,
75}
76
77unsafe impl Send for Database {}
78unsafe impl Sync for Database {}
79
80impl Debug for Database {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 f.debug_struct("Database").finish()
83 }
84}
85
86impl Database {
87 pub fn connect(&self) -> Result<Connection> {
88 let conn = self.inner.connect()?;
89 #[allow(clippy::arc_with_non_send_sync)]
90 let connection = Connection {
91 inner: Arc::new(Mutex::new(conn)),
92 };
93 Ok(connection)
94 }
95}
96
97pub struct Connection {
98 inner: Arc<Mutex<Arc<limbo_core::Connection>>>,
99}
100
101impl Clone for Connection {
102 fn clone(&self) -> Self {
103 Self {
104 inner: Arc::clone(&self.inner),
105 }
106 }
107}
108
109unsafe impl Send for Connection {}
110unsafe impl Sync for Connection {}
111
112impl Connection {
113 pub async fn query(&self, sql: &str, params: impl IntoParams) -> Result<Rows> {
114 let mut stmt = self.prepare(sql).await?;
115 stmt.query(params).await
116 }
117
118 pub async fn execute(&self, sql: &str, params: impl IntoParams) -> Result<u64> {
119 let mut stmt = self.prepare(sql).await?;
120 stmt.execute(params).await
121 }
122
123 pub async fn prepare(&self, sql: &str) -> Result<Statement> {
124 let conn = self
125 .inner
126 .lock()
127 .map_err(|e| Error::MutexError(e.to_string()))?;
128
129 let stmt = conn.prepare(sql)?;
130
131 #[allow(clippy::arc_with_non_send_sync)]
132 let statement = Statement {
133 inner: Arc::new(Mutex::new(stmt)),
134 };
135 Ok(statement)
136 }
137
138 pub fn changes(&self) -> Result<i64> {
142 let conn = self
143 .inner
144 .lock()
145 .map_err(|e| Error::MutexError(e.to_string()))?;
146 Ok(conn.changes())
147 }
148
149 pub fn pragma_query<F>(&self, pragma_name: &str, mut f: F) -> Result<()>
150 where
151 F: FnMut(&Row) -> limbo_core::Result<()>,
152 {
153 let conn = self
154 .inner
155 .lock()
156 .map_err(|e| Error::MutexError(e.to_string()))?;
157
158 let rows: Vec<Row> = conn
159 .pragma_query(pragma_name)
160 .map_err(|e| Error::SqlExecutionFailure(e.to_string()))?
161 .iter()
162 .map(|row| row.iter().collect::<Row>())
163 .collect();
164
165 rows.iter().try_for_each(|row| {
166 f(row).map_err(|e| {
167 Error::SqlExecutionFailure(format!("Error executing user defined function: {}", e))
168 })
169 })?;
170 Ok(())
171 }
172}
173
174pub struct Statement {
175 inner: Arc<Mutex<limbo_core::Statement>>,
176}
177
178impl Clone for Statement {
179 fn clone(&self) -> Self {
180 Self {
181 inner: Arc::clone(&self.inner),
182 }
183 }
184}
185
186unsafe impl Send for Statement {}
187unsafe impl Sync for Statement {}
188
189impl Statement {
190 pub async fn query(&mut self, params: impl IntoParams) -> Result<Rows> {
191 let params = params.into_params()?;
192 match params {
193 params::Params::None => (),
194 params::Params::Positional(values) => {
195 for (i, value) in values.into_iter().enumerate() {
196 let mut stmt = self.inner.lock().unwrap();
197 stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
198 }
199 }
200 params::Params::Named(_items) => todo!(),
201 }
202 #[allow(clippy::arc_with_non_send_sync)]
203 let rows = Rows {
204 inner: Arc::clone(&self.inner),
205 };
206 Ok(rows)
207 }
208
209 pub async fn execute(&mut self, params: impl IntoParams) -> Result<u64> {
210 {
211 self.inner.lock().unwrap().reset();
213 }
214 let params = params.into_params()?;
215 match params {
216 params::Params::None => (),
217 params::Params::Positional(values) => {
218 for (i, value) in values.into_iter().enumerate() {
219 let mut stmt = self.inner.lock().unwrap();
220 stmt.bind_at(NonZero::new(i + 1).unwrap(), value.into());
221 }
222 }
223 params::Params::Named(_items) => todo!(),
224 }
225 loop {
226 let mut stmt = self.inner.lock().unwrap();
227 match stmt.step() {
228 Ok(limbo_core::StepResult::Row) => {
229 return Ok(2);
231 }
232 Ok(limbo_core::StepResult::Done) => {
233 return Ok(0);
234 }
235 Ok(limbo_core::StepResult::IO) => {
236 let _ = stmt.run_once();
237 }
239 Ok(limbo_core::StepResult::Busy) => {
240 return Ok(4);
241 }
242 Ok(limbo_core::StepResult::Interrupt) => {
243 return Ok(3);
244 }
245 Err(err) => {
246 return Err(err.into());
247 }
248 }
249 }
250 }
251
252 pub fn columns(&self) -> Vec<Column> {
253 let stmt = self.inner.lock().unwrap();
254
255 let n = stmt.num_columns();
256
257 let mut cols = Vec::with_capacity(n);
258
259 for i in 0..n {
260 let name = stmt.get_column_name(i).into_owned();
261 let decl_type = stmt.get_column_decl_type(i).map(|s| s.into_owned());
262 cols.push(Column { name, decl_type });
263 }
264
265 cols
266 }
267}
268
269pub struct Column {
270 name: String,
271 decl_type: Option<String>,
272}
273
274impl Column {
275 pub fn name(&self) -> &str {
276 &self.name
277 }
278
279 pub fn decl_type(&self) -> Option<&str> {
280 self.decl_type.as_deref()
281 }
282}
283
284pub trait IntoValue {
285 fn into_value(self) -> Result<Value>;
286}
287
288#[derive(Debug, Clone)]
289pub enum Params {
290 None,
291 Positional(Vec<Value>),
292 Named(Vec<(String, Value)>),
293}
294pub struct Transaction {}
295
296pub struct Rows {
297 inner: Arc<Mutex<limbo_core::Statement>>,
298}
299
300impl Clone for Rows {
301 fn clone(&self) -> Self {
302 Self {
303 inner: Arc::clone(&self.inner),
304 }
305 }
306}
307
308unsafe impl Send for Rows {}
309unsafe impl Sync for Rows {}
310
311impl Rows {
312 pub async fn next(&mut self) -> Result<Option<Row>> {
313 loop {
314 let mut stmt = self
315 .inner
316 .lock()
317 .map_err(|e| Error::MutexError(e.to_string()))?;
318 match stmt.step() {
319 Ok(limbo_core::StepResult::Row) => {
320 let row = stmt.row().unwrap();
321 return Ok(Some(Row {
322 values: row.get_values().map(|v| v.to_owned()).collect(),
323 }));
324 }
325 Ok(limbo_core::StepResult::Done) => return Ok(None),
326 Ok(limbo_core::StepResult::IO) => {
327 if let Err(e) = stmt.run_once() {
328 return Err(e.into());
329 }
330 continue;
331 }
332 Ok(limbo_core::StepResult::Busy) => return Ok(None),
333 Ok(limbo_core::StepResult::Interrupt) => return Ok(None),
334 _ => return Ok(None),
335 }
336 }
337 }
338}
339
340#[derive(Debug)]
341pub struct Row {
342 values: Vec<limbo_core::Value>,
343}
344
345unsafe impl Send for Row {}
346unsafe impl Sync for Row {}
347
348impl Row {
349 pub fn get_value(&self, index: usize) -> Result<Value> {
350 let value = &self.values[index];
351 match value {
352 limbo_core::Value::Integer(i) => Ok(Value::Integer(*i)),
353 limbo_core::Value::Null => Ok(Value::Null),
354 limbo_core::Value::Float(f) => Ok(Value::Real(*f)),
355 limbo_core::Value::Text(text) => Ok(Value::Text(text.to_string())),
356 limbo_core::Value::Blob(items) => Ok(Value::Blob(items.to_vec())),
357 }
358 }
359
360 pub fn column_count(&self) -> usize {
361 self.values.len()
362 }
363}
364
365impl<'a> FromIterator<&'a limbo_core::Value> for Row {
366 fn from_iter<T: IntoIterator<Item = &'a limbo_core::Value>>(iter: T) -> Self {
367 let values = iter
368 .into_iter()
369 .map(|v| match v {
370 limbo_core::Value::Integer(i) => limbo_core::Value::Integer(*i),
371 limbo_core::Value::Null => limbo_core::Value::Null,
372 limbo_core::Value::Float(f) => limbo_core::Value::Float(*f),
373 limbo_core::Value::Text(s) => limbo_core::Value::Text(s.clone()),
374 limbo_core::Value::Blob(b) => limbo_core::Value::Blob(b.clone()),
375 })
376 .collect();
377
378 Row { values }
379 }
380}
381
382#[cfg(test)]
383mod tests {
384 use super::*;
385 use tempfile::NamedTempFile;
386
387 #[tokio::test]
388 async fn test_database_persistence() -> Result<()> {
389 let temp_file = NamedTempFile::new().unwrap();
390 let db_path = temp_file.path().to_str().unwrap();
391
392 {
394 let db = Builder::new_local(db_path).build().await?;
395 let conn = db.connect()?;
396 conn.execute(
397 "CREATE TABLE test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
398 (),
399 )
400 .await?;
401 conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
402 .await?;
403 conn.execute("INSERT INTO test_persistence (name) VALUES ('Bob');", ())
404 .await?;
405 } let db = Builder::new_local(db_path).build().await?;
409 let conn = db.connect()?;
410
411 let mut rows = conn
412 .query("SELECT name FROM test_persistence ORDER BY id;", ())
413 .await?;
414
415 let row1 = rows.next().await?.expect("Expected first row");
416 assert_eq!(row1.get_value(0)?, Value::Text("Alice".to_string()));
417
418 let row2 = rows.next().await?.expect("Expected second row");
419 assert_eq!(row2.get_value(0)?, Value::Text("Bob".to_string()));
420
421 assert!(rows.next().await?.is_none(), "Expected no more rows");
422
423 Ok(())
424 }
425
426 #[tokio::test]
427 async fn test_database_persistence_many_frames() -> Result<()> {
428 let temp_file = NamedTempFile::new().unwrap();
429 let db_path = temp_file.path().to_str().unwrap();
430
431 const NUM_INSERTS: usize = 100;
432 const TARGET_STRING_LEN: usize = 1024; let mut original_data = Vec::with_capacity(NUM_INSERTS);
435 for i in 0..NUM_INSERTS {
436 let prefix = format!("test_string_{:04}_", i);
437 let padding_len = TARGET_STRING_LEN.saturating_sub(prefix.len());
438 let padding: String = "A".repeat(padding_len);
439 original_data.push(format!("{}{}", prefix, padding));
440 }
441
442 {
444 let db = Builder::new_local(db_path).build().await?;
445 let conn = db.connect()?;
446 conn.execute(
447 "CREATE TABLE test_large_persistence (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT NOT NULL);",
448 (),
449 )
450 .await?;
451
452 for data_val in &original_data {
453 conn.execute(
454 "INSERT INTO test_large_persistence (data) VALUES (?);",
455 params::Params::Positional(vec![Value::Text(data_val.clone())]),
456 )
457 .await?;
458 }
459 } let db = Builder::new_local(db_path).build().await?;
463 let conn = db.connect()?;
464
465 let mut rows = conn
466 .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
467 .await?;
468
469 for (i, expected) in original_data.iter().enumerate().take(NUM_INSERTS) {
470 let row = rows
471 .next()
472 .await?
473 .unwrap_or_else(|| panic!("Expected row {} but found None", i));
474 assert_eq!(
475 row.get_value(0)?,
476 Value::Text(expected.clone()),
477 "Mismatch in retrieved data for row {}",
478 i
479 );
480 }
481
482 assert!(
483 rows.next().await?.is_none(),
484 "Expected no more rows after retrieving all inserted data"
485 );
486
487 let wal_path = format!("{}-wal", db_path);
489 std::fs::remove_file(&wal_path)
490 .map_err(|e| eprintln!("Warning: Failed to delete WAL file for test: {}", e))
491 .unwrap();
492
493 let db_after_wal_delete = Builder::new_local(db_path).build().await?;
495 let conn_after_wal_delete = db_after_wal_delete.connect()?;
496
497 let query_result_after_wal_delete = conn_after_wal_delete
498 .query("SELECT data FROM test_large_persistence ORDER BY id;", ())
499 .await;
500
501 match query_result_after_wal_delete {
502 Ok(_) => panic!("Query succeeded after WAL deletion and DB reopen, but was expected to fail because the table definition should have been in the WAL."),
503 Err(Error::SqlExecutionFailure(msg)) => {
504 assert!(
505 msg.contains("test_large_persistence not found"),
506 "Expected 'test_large_persistence not found' error, but got: {}",
507 msg
508 );
509 }
510 Err(e) => panic!(
511 "Expected SqlExecutionFailure for 'no such table', but got a different error: {:?}",
512 e
513 ),
514 }
515
516 Ok(())
517 }
518
519 #[tokio::test]
520 async fn test_database_persistence_write_one_frame_many_times() -> Result<()> {
521 let temp_file = NamedTempFile::new().unwrap();
522 let db_path = temp_file.path().to_str().unwrap();
523
524 for i in 0..100 {
525 {
526 let db = Builder::new_local(db_path).build().await?;
527 let conn = db.connect()?;
528
529 conn.execute("CREATE TABLE IF NOT EXISTS test_persistence (id INTEGER PRIMARY KEY, name TEXT NOT NULL);", ()).await?;
530 conn.execute("INSERT INTO test_persistence (name) VALUES ('Alice');", ())
531 .await?;
532 }
533 {
534 let db = Builder::new_local(db_path).build().await?;
535 let conn = db.connect()?;
536
537 let mut rows_iter = conn
538 .query("SELECT count(*) FROM test_persistence;", ())
539 .await?;
540 let rows = rows_iter.next().await?.unwrap();
541 assert_eq!(rows.get_value(0)?, Value::Integer(i as i64 + 1));
542 assert!(rows_iter.next().await?.is_none());
543 }
544 }
545
546 Ok(())
547 }
548
549 async fn query_scalar_i64(conn: &Connection, sql: &str) -> Result<i64> {
555 let mut rows = conn.query(sql, ()).await?;
556 let row = rows
557 .next()
558 .await?
559 .unwrap_or_else(|| panic!("expected a row from `{sql}`"));
560 match row.get_value(0)? {
561 Value::Integer(i) => Ok(i),
562 other => panic!("expected Integer from `{sql}`, got {other:?}"),
563 }
564 }
565
566 #[tokio::test]
567 async fn test_application_id_write_read_round_trip() -> Result<()> {
568 let db = Builder::new_local(":memory:").build().await?;
569 let conn = db.connect()?;
570
571 assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, 0);
573
574 conn.execute("PRAGMA application_id = 1196444487;", ())
576 .await?;
577 assert_eq!(
578 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
579 1196444487
580 );
581
582 conn.execute("PRAGMA application_id = 42;", ()).await?;
584 assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, 42);
585
586 Ok(())
587 }
588
589 #[tokio::test]
590 async fn test_application_id_negative_round_trip() -> Result<()> {
591 let db = Builder::new_local(":memory:").build().await?;
594 let conn = db.connect()?;
595
596 conn.execute("PRAGMA application_id = -1;", ()).await?;
597 assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, -1);
598
599 conn.execute("PRAGMA application_id = -2147483648;", ())
600 .await?;
601 assert_eq!(
602 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
603 -2147483648
604 );
605
606 Ok(())
607 }
608
609 struct TempDbPath {
615 path: std::path::PathBuf,
616 }
617
618 impl TempDbPath {
619 fn new(tag: &str) -> Self {
620 use std::sync::atomic::{AtomicU64, Ordering};
621 static COUNTER: AtomicU64 = AtomicU64::new(0);
622 let n = COUNTER.fetch_add(1, Ordering::Relaxed);
623 let mut path = std::env::temp_dir();
624 path.push(format!(
625 "oxisqlite_dur_{}_{}_{}.db",
626 tag,
627 std::process::id(),
628 n
629 ));
630 let _ = std::fs::remove_file(&path);
632 let _ = std::fs::remove_file(format!("{}-wal", path.display()));
633 Self { path }
634 }
635
636 fn as_str(&self) -> &str {
637 self.path
638 .to_str()
639 .expect("temp db path is valid UTF-8 on the test platforms")
640 }
641 }
642
643 impl Drop for TempDbPath {
644 fn drop(&mut self) {
645 let _ = std::fs::remove_file(&self.path);
646 let _ = std::fs::remove_file(format!("{}-wal", self.path.display()));
647 }
648 }
649
650 #[tokio::test]
656 async fn test_application_id_persistence() -> Result<()> {
657 let temp = TempDbPath::new("app_id_persist");
658 let db_path = temp.as_str();
659
660 {
661 let db = Builder::new_local(db_path).build().await?;
662 let conn = db.connect()?;
663 conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY);", ())
664 .await?;
665 conn.execute("PRAGMA application_id = -12345;", ()).await?;
666
667 assert_eq!(
669 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
670 -12345
671 );
672 } let db = Builder::new_local(db_path).build().await?;
676 let conn = db.connect()?;
677 assert_eq!(
678 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
679 -12345,
680 "application_id must survive close/reopen"
681 );
682
683 Ok(())
684 }
685
686 #[tokio::test]
689 async fn test_application_id_durable_reopen() -> Result<()> {
690 let temp = TempDbPath::new("app_id_reopen");
691 let db_path = temp.as_str();
692
693 {
694 let db = Builder::new_local(db_path).build().await?;
695 let conn = db.connect()?;
696 conn.execute("PRAGMA application_id = 12345;", ()).await?;
697 assert_eq!(
698 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
699 12345
700 );
701 }
702
703 let db = Builder::new_local(db_path).build().await?;
704 let conn = db.connect()?;
705 assert_eq!(
706 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
707 12345,
708 "application_id = 12345 must survive close/reopen"
709 );
710
711 Ok(())
712 }
713
714 #[tokio::test]
717 async fn test_user_version_durable_reopen() -> Result<()> {
718 let temp = TempDbPath::new("user_version_reopen");
719 let db_path = temp.as_str();
720
721 {
722 let db = Builder::new_local(db_path).build().await?;
723 let conn = db.connect()?;
724 conn.execute("PRAGMA user_version = 12345;", ()).await?;
725 assert_eq!(
726 query_scalar_i64(&conn, "PRAGMA user_version;").await?,
727 12345
728 );
729 }
730
731 let db = Builder::new_local(db_path).build().await?;
732 let conn = db.connect()?;
733 assert_eq!(
734 query_scalar_i64(&conn, "PRAGMA user_version;").await?,
735 12345,
736 "user_version = 12345 must survive close/reopen"
737 );
738
739 Ok(())
740 }
741
742 #[tokio::test]
746 async fn test_application_id_negative_durable_reopen() -> Result<()> {
747 let temp = TempDbPath::new("app_id_negative_reopen");
748 let db_path = temp.as_str();
749
750 {
751 let db = Builder::new_local(db_path).build().await?;
752 let conn = db.connect()?;
753 conn.execute("PRAGMA application_id = -1;", ()).await?;
754 assert_eq!(query_scalar_i64(&conn, "PRAGMA application_id;").await?, -1);
755 }
756
757 let db = Builder::new_local(db_path).build().await?;
758 let conn = db.connect()?;
759 assert_eq!(
760 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
761 -1,
762 "application_id = -1 must survive close/reopen as the signed value -1"
763 );
764
765 let conn = db.connect()?;
768 let _ = conn.execute("PRAGMA wal_checkpoint;", ()).await;
769 drop(conn);
770 drop(db);
771 let bytes = std::fs::read(db_path).expect("read database file");
772 assert!(bytes.len() >= 72, "database file shorter than the header");
773 assert_eq!(
774 &bytes[68..72],
775 &0xFFFF_FFFFu32.to_be_bytes(),
776 "application_id = -1 must be encoded as 0xFFFFFFFF at offset 68"
777 );
778
779 Ok(())
780 }
781
782 #[tokio::test]
787 async fn test_application_id_byte_level_on_disk() -> Result<()> {
788 const GPKG_MAGIC: u32 = 1196444487; const USER_VERSION: i32 = 10201; let temp = TempDbPath::new("app_id_bytes");
792 let db_path = temp.as_str();
793
794 {
795 let db = Builder::new_local(db_path).build().await?;
796 let conn = db.connect()?;
797 conn.execute("CREATE TABLE gpkg_contents (id INTEGER PRIMARY KEY);", ())
799 .await?;
800 conn.execute(&format!("PRAGMA application_id = {GPKG_MAGIC};"), ())
801 .await?;
802 conn.execute(&format!("PRAGMA user_version = {USER_VERSION};"), ())
803 .await?;
804 let _ = conn.execute("PRAGMA wal_checkpoint;", ()).await;
809 }
810
811 let bytes = std::fs::read(db_path).expect("read database file");
812 assert!(
813 bytes.len() >= 72,
814 "database file is shorter than the 100-byte header"
815 );
816
817 assert_eq!(
819 &bytes[68..72],
820 &GPKG_MAGIC.to_be_bytes(),
821 "GPKG magic must be stored big-endian at file offset 68"
822 );
823 assert_eq!(
824 u32::from_be_bytes([bytes[68], bytes[69], bytes[70], bytes[71]]),
825 0x4750_4B47,
826 "application_id bytes must decode to 0x47504B47"
827 );
828
829 assert_eq!(
831 &bytes[60..64],
832 &USER_VERSION.to_be_bytes(),
833 "user_version must be stored big-endian at file offset 60"
834 );
835
836 let db = Builder::new_local(db_path).build().await?;
838 let conn = db.connect()?;
839 assert_eq!(
840 query_scalar_i64(&conn, "PRAGMA application_id;").await?,
841 GPKG_MAGIC as i64
842 );
843 assert_eq!(
844 query_scalar_i64(&conn, "PRAGMA user_version;").await?,
845 USER_VERSION as i64
846 );
847
848 Ok(())
849 }
850
851 #[tokio::test]
856 async fn test_insert_or_ignore_rowid_conflict_skipped() -> Result<()> {
857 let db = Builder::new_local(":memory:").build().await?;
858 let conn = db.connect()?;
859 conn.execute(
860 "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
861 (),
862 )
863 .await?;
864 conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
865 .await?;
866
867 conn.execute("INSERT OR IGNORE INTO t (id, name) VALUES (1, 'Bob');", ())
869 .await?;
870
871 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
873 let mut rows = conn
874 .query("SELECT name FROM t WHERE id = 1;", ())
875 .await?;
876 let row = rows.next().await?.expect("row");
877 assert_eq!(row.get_value(0)?, Value::Text("Alice".to_string()));
878
879 Ok(())
880 }
881
882 #[tokio::test]
883 async fn test_insert_or_ignore_multi_row_other_rows_land() -> Result<()> {
884 let db = Builder::new_local(":memory:").build().await?;
885 let conn = db.connect()?;
886 conn.execute(
887 "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
888 (),
889 )
890 .await?;
891 conn.execute("INSERT INTO t (id, name) VALUES (2, 'Two');", ())
892 .await?;
893
894 conn.execute(
897 "INSERT OR IGNORE INTO t (id, name) VALUES (1, 'One'), (2, 'Dup'), (3, 'Three');",
898 (),
899 )
900 .await?;
901
902 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 3);
903 let mut rows = conn
905 .query("SELECT name FROM t WHERE id = 2;", ())
906 .await?;
907 assert_eq!(
908 rows.next().await?.expect("row").get_value(0)?,
909 Value::Text("Two".to_string())
910 );
911 let mut rows = conn
913 .query("SELECT id FROM t ORDER BY id;", ())
914 .await?;
915 assert_eq!(
916 rows.next().await?.expect("row").get_value(0)?,
917 Value::Integer(1)
918 );
919 assert_eq!(
920 rows.next().await?.expect("row").get_value(0)?,
921 Value::Integer(2)
922 );
923 assert_eq!(
924 rows.next().await?.expect("row").get_value(0)?,
925 Value::Integer(3)
926 );
927
928 Ok(())
929 }
930
931 #[cfg(feature = "index_experimental")]
932 #[tokio::test]
933 async fn test_insert_or_ignore_unique_index_conflict_skipped() -> Result<()> {
934 let db = Builder::new_local(":memory:").build().await?;
935 let conn = db.connect()?;
936 conn.execute(
937 "CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT);",
938 (),
939 )
940 .await?;
941 conn.execute("CREATE UNIQUE INDEX idx_email ON t (email);", ())
942 .await?;
943 conn.execute(
944 "INSERT INTO t (id, email) VALUES (1, 'a@example.com');",
945 (),
946 )
947 .await?;
948
949 conn.execute(
952 "INSERT OR IGNORE INTO t (id, email) VALUES (2, 'a@example.com');",
953 (),
954 )
955 .await?;
956
957 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
958 let mut rows = conn.query("SELECT id FROM t ORDER BY id;", ()).await?;
960 assert_eq!(
961 rows.next().await?.expect("row").get_value(0)?,
962 Value::Integer(1)
963 );
964 assert!(rows.next().await?.is_none());
965
966 Ok(())
967 }
968
969 #[tokio::test]
974 async fn test_insert_or_replace_rowid_conflict_replaces() -> Result<()> {
975 let db = Builder::new_local(":memory:").build().await?;
976 let conn = db.connect()?;
977 conn.execute(
978 "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
979 (),
980 )
981 .await?;
982 conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
983 .await?;
984
985 conn.execute(
987 "INSERT OR REPLACE INTO t (id, name) VALUES (1, 'Bob');",
988 (),
989 )
990 .await?;
991
992 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
993 let mut rows = conn.query("SELECT name FROM t WHERE id = 1;", ()).await?;
994 assert_eq!(
995 rows.next().await?.expect("row").get_value(0)?,
996 Value::Text("Bob".to_string())
997 );
998
999 Ok(())
1000 }
1001
1002 #[tokio::test]
1003 async fn test_insert_or_replace_multi_row_conflict_with_prior_row() -> Result<()> {
1004 let db = Builder::new_local(":memory:").build().await?;
1005 let conn = db.connect()?;
1006 conn.execute(
1007 "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
1008 (),
1009 )
1010 .await?;
1011
1012 conn.execute(
1015 "INSERT OR REPLACE INTO t (id, name) VALUES (1, 'First'), (1, 'Second'), (2, 'Other');",
1016 (),
1017 )
1018 .await?;
1019
1020 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 2);
1021 let mut rows = conn.query("SELECT name FROM t WHERE id = 1;", ()).await?;
1022 assert_eq!(
1023 rows.next().await?.expect("row").get_value(0)?,
1024 Value::Text("Second".to_string())
1025 );
1026
1027 Ok(())
1028 }
1029
1030 #[cfg(feature = "index_experimental")]
1031 #[tokio::test]
1032 async fn test_insert_or_replace_single_unique_index_conflict() -> Result<()> {
1033 let db = Builder::new_local(":memory:").build().await?;
1034 let conn = db.connect()?;
1035 conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, email TEXT);", ())
1036 .await?;
1037 conn.execute("CREATE UNIQUE INDEX idx_email ON t (email);", ())
1038 .await?;
1039 conn.execute(
1040 "INSERT INTO t (id, email) VALUES (1, 'a@example.com');",
1041 (),
1042 )
1043 .await?;
1044
1045 conn.execute(
1048 "INSERT OR REPLACE INTO t (id, email) VALUES (2, 'a@example.com');",
1049 (),
1050 )
1051 .await?;
1052
1053 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1054 let mut rows = conn
1056 .query("SELECT id FROM t WHERE email = 'a@example.com';", ())
1057 .await?;
1058 assert_eq!(
1059 rows.next().await?.expect("row").get_value(0)?,
1060 Value::Integer(2)
1061 );
1062 assert!(rows.next().await?.is_none());
1063
1064 Ok(())
1065 }
1066
1067 #[cfg(feature = "index_experimental")]
1068 #[tokio::test]
1069 async fn test_insert_or_replace_multiple_unique_indexes_different_victims() -> Result<()> {
1070 let db = Builder::new_local(":memory:").build().await?;
1074 let conn = db.connect()?;
1075 conn.execute(
1076 "CREATE TABLE t (id INTEGER PRIMARY KEY, a TEXT, b TEXT);",
1077 (),
1078 )
1079 .await?;
1080 conn.execute("CREATE UNIQUE INDEX idx_a ON t (a);", ())
1081 .await?;
1082 conn.execute("CREATE UNIQUE INDEX idx_b ON t (b);", ())
1083 .await?;
1084
1085 conn.execute("INSERT INTO t (id, a, b) VALUES (1, 'A1', 'B1');", ())
1088 .await?;
1089 conn.execute("INSERT INTO t (id, a, b) VALUES (2, 'A2', 'B2');", ())
1090 .await?;
1091
1092 conn.execute(
1093 "INSERT OR REPLACE INTO t (id, a, b) VALUES (3, 'A1', 'B2');",
1094 (),
1095 )
1096 .await?;
1097
1098 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1100 let mut rows = conn.query("SELECT id, a, b FROM t;", ()).await?;
1101 let row = rows.next().await?.expect("row");
1102 assert_eq!(row.get_value(0)?, Value::Integer(3));
1103 assert_eq!(row.get_value(1)?, Value::Text("A1".to_string()));
1104 assert_eq!(row.get_value(2)?, Value::Text("B2".to_string()));
1105 assert!(rows.next().await?.is_none());
1106
1107 assert_eq!(
1109 query_scalar_i64(&conn, "SELECT id FROM t WHERE a = 'A1';").await?,
1110 3
1111 );
1112 assert_eq!(
1113 query_scalar_i64(&conn, "SELECT id FROM t WHERE b = 'B2';").await?,
1114 3
1115 );
1116
1117 Ok(())
1118 }
1119
1120 #[tokio::test]
1125 async fn test_plain_insert_rowid_conflict_still_errors() -> Result<()> {
1126 let db = Builder::new_local(":memory:").build().await?;
1127 let conn = db.connect()?;
1128 conn.execute(
1129 "CREATE TABLE t (id INTEGER PRIMARY KEY, name TEXT NOT NULL);",
1130 (),
1131 )
1132 .await?;
1133 conn.execute("INSERT INTO t (id, name) VALUES (1, 'Alice');", ())
1134 .await?;
1135
1136 let result = conn
1138 .execute("INSERT INTO t (id, name) VALUES (1, 'Bob');", ())
1139 .await;
1140 assert!(
1141 result.is_err(),
1142 "plain INSERT on duplicate PRIMARY KEY must error, got Ok"
1143 );
1144
1145 assert_eq!(query_scalar_i64(&conn, "SELECT count(*) FROM t;").await?, 1);
1147
1148 Ok(())
1149 }
1150}