1#![warn(missing_docs)]
57
58pub use libduckdb_sys as ffi;
59
60use std::{
61 cell::RefCell,
62 convert,
63 ffi::CString,
64 fmt,
65 path::{Path, PathBuf},
66 result, str,
67};
68
69use crate::{cache::StatementCache, inner_connection::InnerConnection, raw_statement::RawStatement, types::ValueRef};
70
71#[cfg(feature = "r2d2")]
72pub use crate::r2d2::DuckdbConnectionManager;
73pub use crate::{
74 appender::Appender,
75 appender_params::{AppenderParams, AppenderParamsFromIter, appender_params_from_iter},
76 arrow_batch::{Arrow, ArrowStream},
77 cache::CachedStatement,
78 column::Column,
79 config::{AccessMode, Config, DefaultNullOrder, DefaultOrder},
80 error::Error,
81 ffi::ErrorCode,
82 inner_connection::InterruptHandle,
83 params::{Params, ParamsFromIter, params_from_iter},
84 row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows},
85 statement::Statement,
86 transaction::{DropBehavior, Transaction},
87 types::ToSql,
88};
89#[cfg(feature = "polars")]
90pub use polars_dataframe::Polars;
91
92pub use arrow;
94#[cfg(feature = "loadable-extension")]
95pub use duckdb_loadable_macros::duckdb_entrypoint_c_api;
96#[cfg(feature = "polars")]
97pub use polars;
98
99pub mod core;
101
102#[macro_use]
103mod error;
104mod appender;
105mod appender_params;
106mod arrow_batch;
107mod cache;
108mod column;
109mod config;
110mod inner_connection;
111mod params;
112
113#[cfg(feature = "polars")]
114mod polars_dataframe;
115mod pragma;
116#[cfg(feature = "r2d2")]
117mod r2d2;
118mod raw_statement;
119mod row;
120mod statement;
121mod transaction;
122
123#[cfg(feature = "extensions-full")]
124mod extension;
125
126pub mod profiling;
127pub mod types;
128#[cfg(feature = "vtab")]
130pub mod vtab;
131
132#[cfg(feature = "vscalar")]
134pub mod vscalar;
135
136#[cfg(test)]
137mod test_all_types;
138
139const STATEMENT_CACHE_DEFAULT_CAPACITY: usize = 16;
141
142#[macro_export]
164macro_rules! params {
165 () => {
166 &[] as &[&dyn $crate::ToSql]
167 };
168 ($($param:expr),+ $(,)?) => {
169 &[$(&$param as &dyn $crate::ToSql),+] as &[&dyn $crate::ToSql]
170 };
171}
172
173pub type Result<T, E = Error> = result::Result<T, E>;
175
176pub trait OptionalExt<T> {
178 fn optional(self) -> Result<Option<T>>;
184}
185
186impl<T> OptionalExt<T> for Result<T> {
187 fn optional(self) -> Result<Option<T>> {
188 match self {
189 Ok(value) => Ok(Some(value)),
190 Err(Error::QueryReturnedNoRows) => Ok(None),
191 Err(e) => Err(e),
192 }
193 }
194}
195
196#[derive(Copy, Clone, Debug)]
198pub enum DatabaseName<'a> {
199 Main,
201
202 Temp,
204
205 Attached(&'a str),
207}
208
209#[allow(clippy::needless_lifetimes)]
210impl<'a> fmt::Display for DatabaseName<'a> {
211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212 match *self {
213 DatabaseName::Main => write!(f, "main"),
214 DatabaseName::Temp => write!(f, "temp"),
215 DatabaseName::Attached(s) => write!(f, "{s}"),
216 }
217 }
218}
219
220pub const MAIN_DB: DatabaseName<'static> = DatabaseName::Main;
222
223pub const TEMP_DB: DatabaseName<'static> = DatabaseName::Temp;
225
226pub struct Connection {
228 db: RefCell<InnerConnection>,
229 cache: StatementCache,
230 path: Option<PathBuf>,
231}
232
233unsafe impl Send for Connection {}
234
235impl Connection {
236 #[inline]
257 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
258 Self::open_with_flags(path, Config::default())
259 }
260
261 #[inline]
267 pub fn open_in_memory() -> Result<Self> {
268 Self::open_in_memory_with_flags(Config::default())
269 }
270
271 #[inline]
280 pub unsafe fn open_from_raw(raw: ffi::duckdb_database) -> Result<Self> {
281 unsafe { InnerConnection::new_from_raw_db(raw, false) }.map(|db| Self {
282 db: RefCell::new(db),
283 cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
284 path: None, })
286 }
287
288 #[inline]
295 pub fn open_with_flags<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
296 #[cfg(unix)]
297 fn path_to_cstring(p: &Path) -> Result<CString> {
298 use std::os::unix::ffi::OsStrExt;
299 Ok(CString::new(p.as_os_str().as_bytes())?)
300 }
301
302 #[cfg(not(unix))]
303 fn path_to_cstring(p: &Path) -> Result<CString> {
304 let s = p.to_str().ok_or_else(|| Error::InvalidPath(p.to_owned()))?;
305 Ok(CString::new(s)?)
306 }
307
308 let c_path = path_to_cstring(path.as_ref())?;
309 let config = config.with("duckdb_api", "rust").unwrap();
310 InnerConnection::open_with_flags(&c_path, config).map(|db| Self {
311 db: RefCell::new(db),
312 cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
313 path: Some(path.as_ref().to_path_buf()),
314 })
315 }
316
317 #[inline]
323 pub fn open_in_memory_with_flags(config: Config) -> Result<Self> {
324 Self::open_with_flags(":memory:", config)
325 }
326
327 pub fn execute_batch(&self, sql: &str) -> Result<()> {
348 self.db.borrow_mut().execute(sql)
349 }
350
351 #[inline]
387 pub fn execute<P: Params>(&self, sql: &str, params: P) -> Result<usize> {
388 self.prepare(sql).and_then(|mut stmt| stmt.execute(params))
389 }
390
391 #[inline]
393 pub fn path(&self) -> Option<&Path> {
394 self.path.as_deref()
395 }
396
397 #[inline]
425 pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T>
426 where
427 P: Params,
428 F: FnOnce(&Row<'_>) -> Result<T>,
429 {
430 self.prepare(sql)?.query_row(params, f)
431 }
432
433 #[inline]
459 pub fn query_row_and_then<T, E, P, F>(&self, sql: &str, params: P, f: F) -> Result<T, E>
460 where
461 P: Params,
462 F: FnOnce(&Row<'_>) -> Result<T, E>,
463 E: convert::From<Error>,
464 {
465 self.prepare(sql)?
466 .query(params)?
467 .get_expected_row()
468 .map_err(E::from)
469 .and_then(f)
470 }
471
472 #[inline]
491 pub fn prepare(&self, sql: &str) -> Result<Statement<'_>> {
492 self.db.borrow_mut().prepare(self, sql)
493 }
494
495 pub fn appender(&self, table: &str) -> Result<Appender<'_>> {
513 self.appender_to_db(table, &DatabaseName::Main.to_string())
514 }
515
516 pub fn appender_to_db(&self, table: &str, schema: &str) -> Result<Appender<'_>> {
533 self.db.borrow_mut().appender(self, table, schema)
534 }
535
536 pub fn appender_to_catalog_and_db(&self, table: &str, catalog: &str, schema: &str) -> Result<Appender<'_>> {
553 self.db
554 .borrow_mut()
555 .appender_to_catalog_and_db(self, table, catalog, schema)
556 }
557
558 pub fn appender_with_columns(&self, table: &str, columns: &[&str]) -> Result<Appender<'_>> {
582 self.appender_with_columns_to_db(table, &DatabaseName::Main.to_string(), columns)
583 }
584
585 pub fn appender_with_columns_to_db(&self, table: &str, schema: &str, columns: &[&str]) -> Result<Appender<'_>> {
589 self.db
590 .borrow_mut()
591 .appender_with_columns(self, table, schema, None, columns)
592 }
593
594 pub fn appender_with_columns_to_catalog_and_db(
598 &self,
599 table: &str,
600 catalog: &str,
601 schema: &str,
602 columns: &[&str],
603 ) -> Result<Appender<'_>> {
604 self.db
605 .borrow_mut()
606 .appender_with_columns(self, table, schema, Some(catalog), columns)
607 }
608
609 pub fn interrupt_handle(&self) -> std::sync::Arc<InterruptHandle> {
630 self.db.borrow().get_interrupt_handle()
631 }
632
633 #[inline]
643 #[allow(clippy::result_large_err)]
644 pub fn close(self) -> Result<(), (Self, Error)> {
645 let r = self.db.borrow_mut().close();
646 r.map_err(move |err| (self, err))
647 }
648
649 #[inline]
652 pub fn is_autocommit(&self) -> bool {
653 self.db.borrow().is_autocommit()
654 }
655
656 pub fn try_clone(&self) -> Result<Self> {
658 let inner = self.db.borrow().try_clone()?;
659 Ok(Self {
660 db: RefCell::new(inner),
661 cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
662 path: self.path.clone(),
663 })
664 }
665
666 pub fn version(&self) -> Result<String> {
668 self.query_row("PRAGMA version", [], |row| row.get(0))
669 }
670}
671
672impl fmt::Debug for Connection {
673 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
674 f.debug_struct("Connection").field("path", &self.path).finish()
675 }
676}
677
678#[cfg(doctest)]
679doc_comment::doctest!("../../../README.md");
680
681#[cfg(test)]
682mod test {
683 use crate::types::Value;
684
685 use super::*;
686 use std::{error::Error as StdError, fmt};
687
688 use arrow::{array::Int32Array, datatypes::DataType, record_batch::RecordBatch};
689 use fallible_iterator::FallibleIterator;
690
691 #[allow(dead_code, unconditional_recursion, clippy::extra_unused_type_parameters)]
695 fn ensure_send<T: Send>() {
696 ensure_send::<Connection>();
697 }
698
699 pub fn checked_memory_handle() -> Connection {
700 Connection::open_in_memory().unwrap()
701 }
702
703 #[test]
704 fn test_params_of_vary_types() -> Result<()> {
705 let db = checked_memory_handle();
706 let sql = "BEGIN;
707 CREATE TABLE foo(bar TEXT, qux INTEGER);
708 INSERT INTO foo VALUES ('baz', 1), ('baz', 2), ('baz', 3);
709 END;";
710 db.execute_batch(sql)?;
711
712 let changed = db.execute("UPDATE foo SET qux = ? WHERE bar = ?", params![1i32, &"baz"])?;
713 assert_eq!(changed, 3);
714 Ok(())
715 }
716
717 #[test]
718 #[cfg_attr(windows, ignore = "Windows doesn't allow concurrent writes to a file")]
719 fn test_concurrent_transactions_busy_commit() -> Result<()> {
720 let tmp = tempfile::tempdir().unwrap();
721 let path = tmp.path().join("transactions.db3");
722
723 Connection::open(&path)?.execute_batch(
724 "
725 BEGIN;
726 CREATE TABLE foo(x INTEGER);
727 INSERT INTO foo VALUES(42);
728 END;",
729 )?;
730
731 let mut db1 =
732 Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;
733 let mut db2 =
734 Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;
735
736 {
737 let tx1 = db1.transaction()?;
738 let tx2 = db2.transaction()?;
739
740 tx1.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
742 tx2.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
743
744 tx1.execute("INSERT INTO foo VALUES(?1)", [1])?;
745 let _ = tx2.execute("INSERT INTO foo VALUES(?1)", [2]);
746
747 let _ = tx1.commit();
748 let _ = tx2.commit();
749 }
750
751 let _ = db1.transaction().expect("commit should have closed transaction");
752 let _ = db2.transaction().expect("commit should have closed transaction");
753 Ok(())
754 }
755
756 #[test]
757 fn test_persistence() -> Result<()> {
758 let temp_dir = tempfile::tempdir().unwrap();
759 let path = temp_dir.path().join("test.db3");
760
761 {
762 let db = Connection::open(&path)?;
763 let sql = "BEGIN;
764 CREATE TABLE foo(x INTEGER);
765 INSERT INTO foo VALUES(42);
766 END;";
767 db.execute_batch(sql)?;
768 }
769
770 let path_string = path.to_str().unwrap();
771 let db = Connection::open(path_string)?;
772 let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
773
774 assert_eq!(42i64, the_answer?);
775 Ok(())
776 }
777
778 #[test]
779 fn test_open() {
780 let con = Connection::open_in_memory();
781 if let Err(e) = con {
782 panic!("open error {e}");
783 }
784 assert!(Connection::open_in_memory().is_ok());
785 let db = checked_memory_handle();
786 assert!(db.close().is_ok());
787 let _ = checked_memory_handle();
788 let _ = checked_memory_handle();
789 }
790
791 #[test]
792 fn test_open_from_raw() {
793 unsafe {
794 use std::{ffi::c_void, os::raw::c_char, ptr};
795
796 let mut db: ffi::duckdb_database = ptr::null_mut();
797 let mut c_err: *mut c_char = ptr::null_mut();
798 let r = ffi::duckdb_open_ext(
799 c":memory:".as_ptr(),
800 &mut db,
801 Config::default().duckdb_config(),
802 &mut c_err,
803 );
804 if r != ffi::DuckDBSuccess {
805 if !c_err.is_null() {
806 ffi::duckdb_free(c_err as *mut c_void);
807 }
808 panic!("duckdb_open_ext failed: {r:?}");
809 }
810
811 let conn = Connection::open_from_raw(db).unwrap();
812 conn.execute_batch("SELECT 1").unwrap();
813 let cloned = conn.try_clone().unwrap();
814 drop(conn);
815 cloned.execute_batch("SELECT 2").unwrap();
816 cloned.close().unwrap();
817
818 ffi::duckdb_close(&mut db);
819 }
820 }
821
822 #[test]
823 fn test_open_failure() -> Result<()> {
824 let filename = "no_such_file.db";
825 let result =
826 Connection::open_with_flags(filename, Config::default().access_mode(config::AccessMode::ReadOnly)?);
827 assert!(result.is_err());
828 let err = result.err().unwrap();
829 if let Error::DuckDBFailure(_e, Some(msg)) = err {
830 assert!(
833 msg.contains(filename),
834 "error message '{msg}' does not contain '{filename}'"
835 );
836 } else {
837 panic!("DuckDBFailure expected");
838 }
839 Ok(())
840 }
841
842 #[cfg(unix)]
843 #[test]
844 fn test_invalid_unicode_file_names() -> Result<()> {
845 use std::{ffi::OsStr, fs::File, os::unix::ffi::OsStrExt};
846 let temp_dir = tempfile::tempdir().unwrap();
847
848 let path = temp_dir.path();
849 if File::create(path.join(OsStr::from_bytes(&[0xFE]))).is_err() {
850 return Ok(());
852 }
853 let db_path = path.join(OsStr::from_bytes(&[0xFF]));
854 {
855 let db = Connection::open(&db_path)?;
856 let sql = "BEGIN;
857 CREATE TABLE foo(x INTEGER);
858 INSERT INTO foo VALUES(42);
859 END;";
860 db.execute_batch(sql)?;
861 }
862
863 let db = Connection::open(&db_path)?;
864 let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
865
866 assert_eq!(42i64, the_answer?);
867 Ok(())
868 }
869
870 #[test]
871 fn test_close_always_ok() -> Result<()> {
872 let db = checked_memory_handle();
873
874 db.close().unwrap();
877 Ok(())
878 }
879
880 #[test]
881 fn test_execute_batch() -> Result<()> {
882 let db = checked_memory_handle();
883 let sql = "BEGIN;
884 CREATE TABLE foo(x INTEGER);
885 INSERT INTO foo VALUES(1);
886 INSERT INTO foo VALUES(2);
887 INSERT INTO foo VALUES(3);
888 INSERT INTO foo VALUES(4);
889 END;";
890 db.execute_batch(sql)?;
891
892 db.execute_batch("UPDATE foo SET x = 3 WHERE x < 3")?;
893
894 assert!(db.execute_batch("INVALID SQL").is_err());
895 Ok(())
896 }
897
898 #[test]
899 fn test_execute_single() -> Result<()> {
900 let db = checked_memory_handle();
901 db.execute_batch("CREATE TABLE foo(x INTEGER)")?;
902
903 assert_eq!(
904 3,
905 db.execute("INSERT INTO foo(x) VALUES (?), (?), (?)", [1i32, 2i32, 3i32])?
906 );
907 assert_eq!(1, db.execute("INSERT INTO foo(x) VALUES (?)", [4i32])?);
908
909 assert_eq!(
910 10i32,
911 db.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
912 );
913 Ok(())
914 }
915
916 #[test]
917 fn test_prepare_column_names() -> Result<()> {
918 let db = checked_memory_handle();
919 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
920
921 let mut stmt = db.prepare("SELECT * FROM foo")?;
922 stmt.execute([])?;
923 assert_eq!(stmt.column_count(), 1);
924 assert_eq!(stmt.column_names(), vec!["x"]);
925
926 let mut stmt = db.prepare("SELECT x AS a, x AS b FROM foo")?;
927 stmt.execute([])?;
928 assert_eq!(stmt.column_count(), 2);
929 assert_eq!(stmt.column_names(), vec!["a", "b"]);
930 Ok(())
931 }
932
933 #[test]
934 fn test_prepare_execute() -> Result<()> {
935 let db = checked_memory_handle();
936 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
937
938 let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
939 assert_eq!(insert_stmt.execute([1i32])?, 1);
940 assert_eq!(insert_stmt.execute([2i32])?, 1);
941 assert_eq!(insert_stmt.execute([3i32])?, 1);
942
943 assert!(insert_stmt.execute(["hello"]).is_err());
944 let mut update_stmt = db.prepare("UPDATE foo SET x=? WHERE x<?")?;
949 assert_eq!(update_stmt.execute([3i32, 3i32])?, 2);
950 assert_eq!(update_stmt.execute([3i32, 3i32])?, 0);
951 assert_eq!(update_stmt.execute([8i32, 8i32])?, 3);
952 Ok(())
953 }
954
955 #[test]
956 fn test_prepare_query() -> Result<()> {
957 let db = checked_memory_handle();
958 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
959
960 let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
961 assert_eq!(insert_stmt.execute([1i32])?, 1);
962 assert_eq!(insert_stmt.execute([2i32])?, 1);
963 assert_eq!(insert_stmt.execute([3i32])?, 1);
964
965 let mut query = db.prepare("SELECT x FROM foo WHERE x < ? ORDER BY x DESC")?;
966 {
967 let mut rows = query.query([4i32])?;
968 let mut v = Vec::<i32>::new();
969
970 while let Some(row) = rows.next()? {
971 v.push(row.get(0)?);
972 }
973
974 assert_eq!(v, [3i32, 2, 1]);
975 }
976
977 {
978 let mut rows = query.query([3i32])?;
979 let mut v = Vec::<i32>::new();
980
981 while let Some(row) = rows.next()? {
982 v.push(row.get(0)?);
983 }
984
985 assert_eq!(v, [2i32, 1]);
986 }
987 Ok(())
988 }
989
990 #[test]
991 fn test_query_map() -> Result<()> {
992 let db = checked_memory_handle();
993 let sql = "BEGIN;
994 CREATE TABLE foo(x INTEGER, y TEXT);
995 INSERT INTO foo VALUES(4, 'hello');
996 INSERT INTO foo VALUES(3, ', ');
997 INSERT INTO foo VALUES(2, 'world');
998 INSERT INTO foo VALUES(1, '!');
999 END;";
1000 db.execute_batch(sql)?;
1001
1002 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1003 let results: Result<Vec<String>> = query.query([])?.map(|row| row.get(1)).collect();
1004
1005 assert_eq!(results?.concat(), "hello, world!");
1006 Ok(())
1007 }
1008
1009 #[test]
1010 fn test_query_row() -> Result<()> {
1011 let db = checked_memory_handle();
1012 let sql = "BEGIN;
1013 CREATE TABLE foo(x INTEGER);
1014 INSERT INTO foo VALUES(1);
1015 INSERT INTO foo VALUES(2);
1016 INSERT INTO foo VALUES(3);
1017 INSERT INTO foo VALUES(4);
1018 END;";
1019 db.execute_batch(sql)?;
1020
1021 assert_eq!(
1022 10i64,
1023 db.query_row::<i64, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
1024 );
1025
1026 let result: Result<i64> = db.query_row("SELECT x FROM foo WHERE x > 5", [], |r| r.get(0));
1027 match result.unwrap_err() {
1028 Error::QueryReturnedNoRows => (),
1029 err => panic!("Unexpected error {err}"),
1030 }
1031
1032 let bad_query_result = db.query_row("NOT A PROPER QUERY; test123", [], |_| Ok(()));
1033
1034 assert!(bad_query_result.is_err());
1035 Ok(())
1036 }
1037
1038 #[test]
1039 fn test_optional() -> Result<()> {
1040 let db = checked_memory_handle();
1041
1042 let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 <> 0", [], |r| r.get(0));
1043 let result = result.optional();
1044 match result? {
1045 None => (),
1046 _ => panic!("Unexpected result"),
1047 }
1048
1049 let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 == 0", [], |r| r.get(0));
1050 let result = result.optional();
1051 match result? {
1052 Some(1) => (),
1053 _ => panic!("Unexpected result"),
1054 }
1055
1056 let bad_query_result: Result<i64> = db.query_row("NOT A PROPER QUERY", [], |r| r.get(0));
1057 let bad_query_result = bad_query_result.optional();
1058 assert!(bad_query_result.is_err());
1059 Ok(())
1060 }
1061
1062 #[test]
1063 fn test_prepare_failures() -> Result<()> {
1064 let db = checked_memory_handle();
1065 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1066
1067 let _ = db.prepare("SELECT * FROM does_not_exist").unwrap_err();
1068 Ok(())
1070 }
1071
1072 #[test]
1073 fn test_is_autocommit() {
1074 let db = checked_memory_handle();
1075 assert!(db.is_autocommit(), "autocommit expected to be active by default");
1076 }
1077
1078 #[test]
1079 #[should_panic(expected = "not supported")]
1080 fn test_statement_debugging() {
1081 let db = checked_memory_handle();
1082 let query = "SELECT 12345";
1083 let stmt = db.prepare(query).unwrap();
1084
1085 assert!(format!("{stmt:?}").contains(query));
1086 }
1087
1088 #[test]
1089 fn test_notnull_constraint_error() -> Result<()> {
1090 let db = checked_memory_handle();
1091 db.execute_batch("CREATE TABLE foo(x TEXT NOT NULL)")?;
1092
1093 let result = db.execute("INSERT INTO foo (x) VALUES (NULL)", []);
1094 assert!(result.is_err());
1095
1096 match result.unwrap_err() {
1097 Error::DuckDBFailure(err, _) => {
1098 assert_eq!(err.code, ErrorCode::Unknown);
1100 }
1101 err => panic!("Unexpected error {err}"),
1102 }
1103 Ok(())
1104 }
1105
1106 #[test]
1107 fn test_clone() -> Result<()> {
1108 {
1110 let owned_con = checked_memory_handle();
1111 {
1112 let cloned_con = owned_con.try_clone().unwrap();
1113 cloned_con.execute_batch("create table test (c1 bigint)")?;
1114 cloned_con.close().unwrap();
1115 }
1116 owned_con.execute_batch("create table test2 (c1 bigint)")?;
1117 owned_con.close().unwrap();
1118 }
1119
1120 {
1122 let cloned_con = {
1123 let owned_con = checked_memory_handle();
1124 let clone = owned_con.try_clone().unwrap();
1125 owned_con.execute_batch("create table test (c1 bigint)")?;
1126 owned_con.close().unwrap();
1127 clone
1128 };
1129 cloned_con.execute_batch("create table test2 (c1 bigint)")?;
1130 cloned_con.close().unwrap();
1131 }
1132 Ok(())
1133 }
1134
1135 #[test]
1136 fn test_try_clone_after_owner_drop() -> Result<()> {
1137 let clone1 = {
1138 let owned = checked_memory_handle();
1139 let clone1 = owned.try_clone()?;
1140 drop(owned);
1141 clone1
1142 };
1143
1144 let clone2 = clone1.try_clone()?;
1145 clone2.execute_batch("CREATE TABLE t312(i INTEGER); INSERT INTO t312 VALUES (1);")?;
1146 let value: i32 = clone1.query_row("SELECT i FROM t312", [], |r| r.get(0))?;
1147 assert_eq!(value, 1);
1148 Ok(())
1149 }
1150
1151 mod query_and_then_tests {
1152 use super::*;
1153
1154 #[derive(Debug)]
1155 enum CustomError {
1156 SomeError,
1157 Sqlite(Error),
1158 }
1159
1160 impl fmt::Display for CustomError {
1161 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1162 match *self {
1163 Self::SomeError => write!(f, "my custom error"),
1164 Self::Sqlite(ref se) => write!(f, "my custom error: {se}"),
1165 }
1166 }
1167 }
1168
1169 impl StdError for CustomError {
1170 fn description(&self) -> &str {
1171 "my custom error"
1172 }
1173
1174 fn cause(&self) -> Option<&dyn StdError> {
1175 match *self {
1176 Self::SomeError => None,
1177 Self::Sqlite(ref se) => Some(se),
1178 }
1179 }
1180 }
1181
1182 impl From<Error> for CustomError {
1183 fn from(se: Error) -> Self {
1184 Self::Sqlite(se)
1185 }
1186 }
1187
1188 type CustomResult<T> = Result<T, CustomError>;
1189
1190 #[test]
1191 fn test_query_and_then() -> Result<()> {
1192 let db = checked_memory_handle();
1193 let sql = "BEGIN;
1194 CREATE TABLE foo(x INTEGER, y TEXT);
1195 INSERT INTO foo VALUES(4, 'hello');
1196 INSERT INTO foo VALUES(3, ', ');
1197 INSERT INTO foo VALUES(2, 'world');
1198 INSERT INTO foo VALUES(1, '!');
1199 END;";
1200 db.execute_batch(sql)?;
1201
1202 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1203 let results: Result<Vec<String>> = query.query_and_then([], |row| row.get(1))?.collect();
1204
1205 assert_eq!(results?.concat(), "hello, world!");
1206 Ok(())
1207 }
1208
1209 #[test]
1210 fn test_query_and_then_fails() -> Result<()> {
1211 let db = checked_memory_handle();
1212 let sql = "BEGIN;
1213 CREATE TABLE foo(x INTEGER, y TEXT);
1214 INSERT INTO foo VALUES(4, 'hello');
1215 INSERT INTO foo VALUES(3, ', ');
1216 INSERT INTO foo VALUES(2, 'world');
1217 INSERT INTO foo VALUES(1, '!');
1218 END;";
1219 db.execute_batch(sql)?;
1220
1221 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1222 let bad_type: Result<Vec<f64>> = query.query_and_then([], |row| row.get(1))?.collect();
1223
1224 match bad_type.unwrap_err() {
1225 Error::InvalidColumnType(..) => (),
1226 err => panic!("Unexpected error {err}"),
1227 }
1228
1229 let bad_idx: Result<Vec<String>> = query.query_and_then([], |row| row.get(3))?.collect();
1230
1231 match bad_idx.unwrap_err() {
1232 Error::InvalidColumnIndex(_) => (),
1233 err => panic!("Unexpected error {err}"),
1234 }
1235 Ok(())
1236 }
1237
1238 #[test]
1239 fn test_query_and_then_custom_error() -> CustomResult<()> {
1240 let db = checked_memory_handle();
1241 let sql = "BEGIN;
1242 CREATE TABLE foo(x INTEGER, y TEXT);
1243 INSERT INTO foo VALUES(4, 'hello');
1244 INSERT INTO foo VALUES(3, ', ');
1245 INSERT INTO foo VALUES(2, 'world');
1246 INSERT INTO foo VALUES(1, '!');
1247 END;";
1248 db.execute_batch(sql)?;
1249
1250 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1251 let results: CustomResult<Vec<String>> = query
1252 .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1253 .collect();
1254
1255 assert_eq!(results?.concat(), "hello, world!");
1256 Ok(())
1257 }
1258
1259 #[test]
1260 fn test_query_and_then_custom_error_fails() -> Result<()> {
1261 let db = checked_memory_handle();
1262 let sql = "BEGIN;
1263 CREATE TABLE foo(x INTEGER, y TEXT);
1264 INSERT INTO foo VALUES(4, 'hello');
1265 INSERT INTO foo VALUES(3, ', ');
1266 INSERT INTO foo VALUES(2, 'world');
1267 INSERT INTO foo VALUES(1, '!');
1268 END;";
1269 db.execute_batch(sql)?;
1270
1271 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1272 let bad_type: CustomResult<Vec<f64>> = query
1273 .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1274 .collect();
1275
1276 match bad_type.unwrap_err() {
1277 CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1278 err => panic!("Unexpected error {err}"),
1279 }
1280
1281 let bad_idx: CustomResult<Vec<String>> = query
1282 .query_and_then([], |row| row.get(3).map_err(CustomError::Sqlite))?
1283 .collect();
1284
1285 match bad_idx.unwrap_err() {
1286 CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1287 err => panic!("Unexpected error {err}"),
1288 }
1289
1290 let non_sqlite_err: CustomResult<Vec<String>> =
1291 query.query_and_then([], |_| Err(CustomError::SomeError))?.collect();
1292
1293 match non_sqlite_err.unwrap_err() {
1294 CustomError::SomeError => (),
1295 err => panic!("Unexpected error {err}"),
1296 }
1297 Ok(())
1298 }
1299
1300 #[test]
1301 fn test_query_row_and_then_custom_error() -> CustomResult<()> {
1302 let db = checked_memory_handle();
1303 let sql = "BEGIN;
1304 CREATE TABLE foo(x INTEGER, y TEXT);
1305 INSERT INTO foo VALUES(4, 'hello');
1306 END;";
1307 db.execute_batch(sql)?;
1308
1309 let query = "SELECT x, y FROM foo ORDER BY x DESC";
1310 let results: CustomResult<String> =
1311 db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1312
1313 assert_eq!(results?, "hello");
1314 Ok(())
1315 }
1316
1317 #[test]
1318 fn test_query_row_and_then_custom_error_fails() -> Result<()> {
1319 let db = checked_memory_handle();
1320 let sql = "BEGIN;
1321 CREATE TABLE foo(x INTEGER, y TEXT);
1322 INSERT INTO foo VALUES(4, 'hello');
1323 END;";
1324 db.execute_batch(sql)?;
1325
1326 let query = "SELECT x, y FROM foo ORDER BY x DESC";
1327 let bad_type: CustomResult<f64> =
1328 db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1329
1330 match bad_type.unwrap_err() {
1331 CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1332 err => panic!("Unexpected error {err}"),
1333 }
1334
1335 let bad_idx: CustomResult<String> =
1336 db.query_row_and_then(query, [], |row| row.get(3).map_err(CustomError::Sqlite));
1337
1338 match bad_idx.unwrap_err() {
1339 CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1340 err => panic!("Unexpected error {err}"),
1341 }
1342
1343 let non_sqlite_err: CustomResult<String> =
1344 db.query_row_and_then(query, [], |_| Err(CustomError::SomeError));
1345
1346 match non_sqlite_err.unwrap_err() {
1347 CustomError::SomeError => (),
1348 err => panic!("Unexpected error {err}"),
1349 }
1350 Ok(())
1351 }
1352
1353 #[test]
1354 fn test_rows_and_then_with_custom_error() -> Result<()> {
1355 let db = checked_memory_handle();
1356 db.execute_batch("CREATE TABLE test (value INTEGER)")?;
1357 db.execute_batch("INSERT INTO test VALUES (1), (3), (5)")?;
1358
1359 let mut stmt = db.prepare("SELECT value FROM test ORDER BY value")?;
1360 let rows = stmt.query([])?;
1361
1362 let results: Vec<i32> = rows
1364 .and_then(|row| -> CustomResult<i32> {
1365 let val: i32 = row.get(0)?; if val > 10 {
1367 Err(CustomError::SomeError) } else {
1369 Ok(val)
1370 }
1371 })
1372 .collect::<CustomResult<Vec<_>>>()
1373 .unwrap();
1374
1375 assert_eq!(results, vec![1, 3, 5]);
1376 Ok(())
1377 }
1378 }
1379
1380 #[test]
1381 fn test_dynamic() -> Result<()> {
1382 let db = checked_memory_handle();
1383 let sql = "BEGIN;
1384 CREATE TABLE foo(x INTEGER, y TEXT);
1385 INSERT INTO foo VALUES(4, 'hello');
1386 END;";
1387 db.execute_batch(sql)?;
1388
1389 db.query_row("SELECT * FROM foo", [], |r| {
1390 assert_eq!(2, r.as_ref().column_count());
1391 Ok(())
1392 })
1393 }
1394 #[test]
1395 fn test_dyn_box() -> Result<()> {
1396 let db = checked_memory_handle();
1397 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1398 let b: Box<dyn ToSql> = Box::new(5);
1399 db.execute("INSERT INTO foo VALUES(?)", [b])?;
1400 db.query_row("SELECT x FROM foo", [], |r| {
1401 assert_eq!(5, r.get_unwrap::<_, i32>(0));
1402 Ok(())
1403 })
1404 }
1405
1406 #[test]
1407 fn test_alter_table() -> Result<()> {
1408 let db = checked_memory_handle();
1409 db.execute_batch("CREATE TABLE x(t INTEGER);")?;
1410 db.execute("ALTER TABLE x RENAME TO y;", [])?;
1412 Ok(())
1413 }
1414
1415 #[test]
1416 fn test_query_arrow_record_batch_small() -> Result<()> {
1417 let db = checked_memory_handle();
1418 let sql = "BEGIN TRANSACTION;
1419 CREATE TABLE test(t INTEGER);
1420 INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);
1421 END TRANSACTION;";
1422 db.execute_batch(sql)?;
1423 let mut stmt = db.prepare("select t from test order by t desc")?;
1424 let mut arr = stmt.query_arrow([])?;
1425
1426 let schema = arr.get_schema();
1427 assert_eq!(schema.fields().len(), 1);
1428 assert_eq!(schema.field(0).name(), "t");
1429 assert_eq!(schema.field(0).data_type(), &DataType::Int32);
1430
1431 let rb = arr.next().unwrap();
1432 let column = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1433 assert_eq!(column.len(), 5);
1434 assert_eq!(column.value(0), 5);
1435 assert_eq!(column.value(1), 4);
1436 assert_eq!(column.value(2), 3);
1437 assert_eq!(column.value(3), 2);
1438 assert_eq!(column.value(4), 1);
1439
1440 assert!(arr.next().is_none());
1441 Ok(())
1442 }
1443
1444 #[test]
1445 fn test_query_arrow_record_batch_large() -> Result<()> {
1446 let db = checked_memory_handle();
1447 db.execute_batch("BEGIN TRANSACTION")?;
1448 db.execute_batch("CREATE TABLE test(t INTEGER);")?;
1449 for _ in 0..600 {
1450 db.execute_batch("INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);")?;
1451 }
1452 db.execute_batch("END TRANSACTION")?;
1453 let rbs: Vec<RecordBatch> = db.prepare("select t from test order by t")?.query_arrow([])?.collect();
1454 assert_eq!(rbs.iter().map(|rb| rb.num_rows()).sum::<usize>(), 3000);
1457 assert_eq!(
1458 rbs.iter()
1459 .map(|rb| rb
1460 .column(0)
1461 .as_any()
1462 .downcast_ref::<Int32Array>()
1463 .unwrap()
1464 .iter()
1465 .map(|i| i.unwrap())
1466 .sum::<i32>())
1467 .sum::<i32>(),
1468 9000
1469 );
1470 Ok(())
1471 }
1472
1473 #[test]
1474 fn test_stream_arrow_with_call() -> Result<()> {
1475 use arrow::datatypes::{DataType, Field, Schema};
1476 use std::sync::Arc;
1477
1478 let db = checked_memory_handle();
1479
1480 db.execute_batch(
1481 "CREATE TABLE test_data(id INTEGER, name VARCHAR);
1482 INSERT INTO test_data VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie');",
1483 )?;
1484
1485 db.execute_batch("CREATE MACRO test_func() AS TABLE SELECT * FROM test_data;")?;
1486
1487 let schema = Arc::new(Schema::new(vec![
1488 Field::new("id", DataType::Int32, true),
1489 Field::new("name", DataType::Utf8, true),
1490 ]));
1491
1492 let mut stmt = db.prepare("CALL test_func()")?;
1493 let rbs: Vec<RecordBatch> = stmt.stream_arrow([], schema)?.collect();
1494
1495 assert!(!rbs.is_empty(), "Expected at least one record batch");
1497 let total_rows: usize = rbs.iter().map(|rb| rb.num_rows()).sum();
1498 assert_eq!(total_rows, 3);
1499
1500 let id_column = rbs[0].column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1501 assert_eq!(id_column.value(0), 1);
1502
1503 Ok(())
1504 }
1505
1506 #[test]
1507 fn round_trip_interval() -> Result<()> {
1508 let db = checked_memory_handle();
1509 db.execute_batch("CREATE TABLE foo (t INTERVAL);")?;
1510
1511 let d = Value::Interval {
1512 months: 1,
1513 days: 2,
1514 nanos: 3,
1515 };
1516 db.execute("INSERT INTO foo VALUES (?)", [d])?;
1517
1518 let mut stmt = db.prepare("SELECT t FROM foo")?;
1519 let mut rows = stmt.query([])?;
1520 let row = rows.next()?.unwrap();
1521 let d: Value = row.get_unwrap(0);
1522 assert_eq!(d, d);
1523 Ok(())
1524 }
1525
1526 #[test]
1527 fn test_database_name_to_string() -> Result<()> {
1528 assert_eq!(DatabaseName::Main.to_string(), "main");
1529 assert_eq!(DatabaseName::Temp.to_string(), "temp");
1530 assert_eq!(DatabaseName::Attached("abc").to_string(), "abc");
1531 Ok(())
1532 }
1533
1534 #[test]
1535 fn test_interrupt() -> Result<()> {
1536 let db = checked_memory_handle();
1537 let db_interrupt = db.interrupt_handle();
1538
1539 let (tx, rx) = std::sync::mpsc::channel();
1540 std::thread::spawn(move || {
1541 let mut stmt = db
1542 .prepare("select count(*) from range(10000000) t1, range(1000000) t2")
1543 .unwrap();
1544 tx.send(stmt.execute([])).unwrap();
1545 });
1546
1547 std::thread::sleep(std::time::Duration::from_millis(100));
1548 db_interrupt.interrupt();
1549
1550 let result = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap();
1551 assert!(result.is_err_and(|err| err.to_string().contains("INTERRUPT")));
1552 Ok(())
1553 }
1554
1555 #[test]
1556 fn test_interrupt_on_dropped_db() {
1557 let db = checked_memory_handle();
1558 let db_interrupt = db.interrupt_handle();
1559
1560 drop(db);
1561 db_interrupt.interrupt();
1562 }
1563
1564 #[test]
1565 fn test_arrow_string_view_setting() -> Result<()> {
1566 {
1568 let config = Config::default().with("produce_arrow_string_view", "true")?;
1569 let conn = Connection::open_in_memory_with_flags(config)?;
1570
1571 let mut query = conn.prepare("SELECT 'test'::varchar AS str")?;
1572 let arrow = query.query_arrow([])?;
1573
1574 let batch = arrow.into_iter().next().expect("Expected at least one batch");
1575 assert_eq!(batch.schema().field(0).data_type(), &DataType::Utf8);
1576 }
1577
1578 {
1579 let config = Config::default()
1580 .with("produce_arrow_string_view", "true")?
1581 .with("arrow_output_version", "1.4")?;
1582 let conn = Connection::open_in_memory_with_flags(config)?;
1583
1584 let mut query = conn.prepare("SELECT 'test'::varchar AS str")?;
1585 let arrow = query.query_arrow([])?;
1586
1587 let batch = arrow.into_iter().next().expect("Expected at least one batch");
1588 assert_eq!(batch.schema().field(0).data_type(), &DataType::Utf8View);
1589 }
1590
1591 Ok(())
1592 }
1593
1594 #[test]
1595 fn test_prepare_multi_statement() -> Result<()> {
1596 let db = checked_memory_handle();
1597
1598 {
1599 let mut stmt =
1600 db.prepare("CREATE TABLE test(x INTEGER); INSERT INTO test VALUES (42); SELECT x FROM test;")?;
1601 let result: i32 = stmt.query_row([], |row| row.get(0))?;
1602 assert_eq!(result, 42);
1603 }
1604
1605 {
1606 let mut stmt = db.prepare(
1607 "CREATE TEMP TABLE temp_data(id INTEGER, value TEXT);
1608 INSERT INTO temp_data VALUES (1, 'first'), (2, 'second');
1609 SELECT COUNT(*) FROM temp_data;",
1610 )?;
1611 let count: i32 = stmt.query_row([], |row| row.get(0))?;
1612 assert_eq!(count, 2);
1613 }
1614
1615 Ok(())
1616 }
1617
1618 #[test]
1619 fn test_pivot_query() -> Result<()> {
1620 let db = checked_memory_handle();
1621
1622 db.execute_batch(
1623 "CREATE TABLE cities(city VARCHAR, year INTEGER, population INTEGER);
1624 INSERT INTO cities VALUES
1625 ('Amsterdam', 2000, 1005),
1626 ('Amsterdam', 2010, 1065),
1627 ('Amsterdam', 2020, 1158),
1628 ('Berlin', 2000, 3382),
1629 ('Berlin', 2010, 3460),
1630 ('Berlin', 2020, 3576);",
1631 )?;
1632
1633 let mut stmt = db.prepare("PIVOT cities ON year USING sum(population);")?;
1635 let mut rows = stmt.query([])?;
1636
1637 let mut row_count = 0;
1638 while let Some(_row) = rows.next()? {
1639 row_count += 1;
1640 }
1641 assert_eq!(row_count, 2);
1642
1643 Ok(())
1644 }
1645
1646 #[test]
1647 fn test_multiple_memory_databases() -> Result<()> {
1648 {
1650 let mem1 = Connection::open_in_memory()?;
1651 let mem2 = Connection::open_in_memory()?;
1652
1653 mem1.execute_batch("CREATE TABLE test (id INTEGER)")?;
1654 mem1.execute("INSERT INTO test VALUES (1)", [])?;
1655
1656 mem2.execute_batch("CREATE TABLE test (id INTEGER)")?;
1657 mem2.execute("INSERT INTO test VALUES (2)", [])?;
1658
1659 let value1: i32 = mem1.query_row("SELECT id FROM test", [], |r| r.get(0))?;
1660 assert_eq!(value1, 1);
1661
1662 let value2: i32 = mem2.query_row("SELECT id FROM test", [], |r| r.get(0))?;
1663 assert_eq!(value2, 2);
1664 }
1665
1666 {
1668 let shared = Connection::open_in_memory()?;
1669
1670 shared.execute_batch("CREATE TABLE shared_table (id INTEGER)")?;
1671 shared.execute("INSERT INTO shared_table VALUES (123)", [])?;
1672
1673 let cloned = shared.try_clone()?;
1674
1675 let value: i32 = cloned.query_row("SELECT id FROM shared_table", [], |r| r.get(0))?;
1677 assert_eq!(value, 123);
1678
1679 cloned.execute("INSERT INTO shared_table VALUES (456)", [])?;
1680
1681 let count: i64 = shared.query_row("SELECT COUNT(*) FROM shared_table", [], |r| r.get(0))?;
1683 assert_eq!(count, 2);
1684 }
1685
1686 Ok(())
1687 }
1688
1689 #[test]
1690 fn test_appender_with_catalog() -> Result<()> {
1691 let db = checked_memory_handle();
1692
1693 let temp_dir = tempfile::tempdir().unwrap();
1695 let attached_path = temp_dir.path().join("attached.db");
1696 db.execute_batch(&format!("ATTACH '{}' AS attached_db", attached_path.display()))?;
1697
1698 db.execute_batch("CREATE TABLE attached_db.main.test_table (id INTEGER, name TEXT)")?;
1700
1701 {
1703 let mut app = db.appender_to_catalog_and_db("test_table", "attached_db", "main")?;
1704 app.append_row(params![1, "Alice"])?;
1705 app.append_row(params![2, "Bob"])?;
1706 app.append_row(params![3, "Charlie"])?;
1707 }
1708
1709 let count: i64 = db.query_row("SELECT COUNT(*) FROM attached_db.main.test_table", [], |r| r.get(0))?;
1711 assert_eq!(count, 3);
1712
1713 let name: String = db.query_row("SELECT name FROM attached_db.main.test_table WHERE id = ?", [2], |r| {
1714 r.get(0)
1715 })?;
1716 assert_eq!(name, "Bob");
1717
1718 Ok(())
1719 }
1720
1721 #[test]
1722 fn test_appender_with_catalog_multiple_schemas() -> Result<()> {
1723 let db = checked_memory_handle();
1724
1725 let temp_dir = tempfile::tempdir().unwrap();
1727 let attached_path = temp_dir.path().join("multi_schema.db");
1728 db.execute_batch(&format!("ATTACH '{}' AS my_catalog", attached_path.display()))?;
1729
1730 db.execute_batch("CREATE SCHEMA my_catalog.schema1")?;
1732 db.execute_batch("CREATE SCHEMA my_catalog.schema2")?;
1733 db.execute_batch("CREATE TABLE my_catalog.schema1.data (value INTEGER)")?;
1734 db.execute_batch("CREATE TABLE my_catalog.schema2.data (value INTEGER)")?;
1735
1736 {
1738 let mut app = db.appender_to_catalog_and_db("data", "my_catalog", "schema1")?;
1739 app.append_rows([[10], [20], [30]])?;
1740 }
1741
1742 {
1744 let mut app = db.appender_to_catalog_and_db("data", "my_catalog", "schema2")?;
1745 app.append_rows([[100], [200]])?;
1746 }
1747
1748 let sum1: i64 = db.query_row("SELECT SUM(value) FROM my_catalog.schema1.data", [], |r| r.get(0))?;
1750 assert_eq!(sum1, 60);
1751
1752 let sum2: i64 = db.query_row("SELECT SUM(value) FROM my_catalog.schema2.data", [], |r| r.get(0))?;
1754 assert_eq!(sum2, 300);
1755
1756 Ok(())
1757 }
1758
1759 #[test]
1760 fn test_appender_with_catalog_main_vs_attached() -> Result<()> {
1761 let db = checked_memory_handle();
1762
1763 db.execute_batch("CREATE TABLE test (id INTEGER)")?;
1765
1766 let temp_dir = tempfile::tempdir().unwrap();
1768 let attached_path = temp_dir.path().join("other.db");
1769 db.execute_batch(&format!("ATTACH '{}' AS other_db", attached_path.display()))?;
1770 db.execute_batch("CREATE TABLE other_db.main.test (id INTEGER)")?;
1771
1772 {
1774 let mut app = db.appender_to_catalog_and_db("test", "memory", "main")?;
1775 app.append_rows([[1], [2]])?;
1776 }
1777
1778 {
1780 let mut app = db.appender_to_catalog_and_db("test", "other_db", "main")?;
1781 app.append_rows([[100], [200]])?;
1782 }
1783
1784 let count_main: i64 = db.query_row("SELECT COUNT(*) FROM test", [], |r| r.get(0))?;
1786 assert_eq!(count_main, 2);
1787
1788 let count_attached: i64 = db.query_row("SELECT COUNT(*) FROM other_db.main.test", [], |r| r.get(0))?;
1790 assert_eq!(count_attached, 2);
1791
1792 Ok(())
1793 }
1794
1795 #[test]
1796 fn test_appender_with_catalog_error_invalid_catalog() -> Result<()> {
1797 let db = checked_memory_handle();
1798
1799 let result = db.appender_to_catalog_and_db("test", "nonexistent_catalog", "main");
1801 assert!(result.is_err());
1802
1803 Ok(())
1804 }
1805
1806 #[test]
1807 fn test_appender_with_catalog_error_invalid_schema() -> Result<()> {
1808 let db = checked_memory_handle();
1809
1810 let temp_dir = tempfile::tempdir().unwrap();
1812 let attached_path = temp_dir.path().join("test.db");
1813 db.execute_batch(&format!("ATTACH '{}' AS my_db", attached_path.display()))?;
1814
1815 db.execute_batch("CREATE TABLE my_db.main.test (id INTEGER)")?;
1816
1817 let result = db.appender_to_catalog_and_db("test", "my_db", "nonexistent_schema");
1819 assert!(result.is_err());
1820
1821 Ok(())
1822 }
1823
1824 #[test]
1825 fn test_appender_with_catalog_flush() -> Result<()> {
1826 let db = checked_memory_handle();
1827
1828 let temp_dir = tempfile::tempdir().unwrap();
1830 let attached_path = temp_dir.path().join("flush_test.db");
1831 db.execute_batch(&format!("ATTACH '{}' AS flush_db", attached_path.display()))?;
1832
1833 db.execute_batch("CREATE TABLE flush_db.main.test (id INTEGER)")?;
1834
1835 {
1837 let mut app = db.appender_to_catalog_and_db("test", "flush_db", "main")?;
1838 app.append_row([1])?;
1839 app.append_row([2])?;
1840 app.flush()?;
1841 app.append_row([3])?;
1842 app.flush()?;
1843 }
1844
1845 let count: i64 = db.query_row("SELECT COUNT(*) FROM flush_db.main.test", [], |r| r.get(0))?;
1847 assert_eq!(count, 3);
1848
1849 Ok(())
1850 }
1851
1852 #[test]
1854 fn test_enum_read() -> Result<()> {
1855 let conn = Connection::open_in_memory()?;
1856 conn.execute_batch(
1857 r#"
1858 CREATE TABLE stats (
1859 name ENUM('CA', 'NY'),
1860 value INTEGER,
1861 );
1862 INSERT INTO stats VALUES ('CA', 10), ('CA', 20), ('NY', 4);
1863 "#,
1864 )?;
1865
1866 let mut stmt = conn.prepare("SELECT * FROM stats")?;
1867 let results: Vec<(String, i32)> = stmt
1868 .query_map([], |row| {
1869 let name: String = row.get(0)?;
1870 let value: i32 = row.get(1)?;
1871 Ok((name, value))
1872 })?
1873 .map(|r| r.unwrap())
1874 .collect();
1875
1876 assert_eq!(results.len(), 3);
1877 assert_eq!(results[0], ("CA".to_string(), 10));
1878 assert_eq!(results[1], ("CA".to_string(), 20));
1879 assert_eq!(results[2], ("NY".to_string(), 4));
1880 Ok(())
1881 }
1882
1883 #[test]
1884 fn test_enum_read_nullable() -> Result<()> {
1885 let conn = Connection::open_in_memory()?;
1886 conn.execute_batch(
1887 r#"
1888 CREATE TABLE stats (name ENUM('CA', 'NY'));
1889 INSERT INTO stats VALUES ('CA'), (NULL), ('NY');
1890 "#,
1891 )?;
1892
1893 let mut stmt = conn.prepare("SELECT name FROM stats")?;
1894 let results: Vec<Option<String>> = stmt.query_map([], |row| row.get(0))?.map(|r| r.unwrap()).collect();
1895
1896 assert_eq!(results, vec![Some("CA".into()), None, Some("NY".into())]);
1897 Ok(())
1898 }
1899}