1#![warn(missing_docs)]
57
58pub use libduckdb_sys as ffi;
59
60use std::{
61 cell::RefCell,
62 convert,
63 ffi::CString,
64 fmt,
65 path::{Path, PathBuf},
66 result, str,
67};
68
69use crate::{cache::StatementCache, inner_connection::InnerConnection, raw_statement::RawStatement, types::ValueRef};
70
71#[cfg(feature = "r2d2")]
72pub use crate::r2d2::DuckdbConnectionManager;
73pub use crate::{
74 appender::Appender,
75 appender_params::{appender_params_from_iter, AppenderParams, AppenderParamsFromIter},
76 arrow_batch::{Arrow, ArrowStream},
77 cache::CachedStatement,
78 column::Column,
79 config::{AccessMode, Config, DefaultNullOrder, DefaultOrder},
80 error::Error,
81 ffi::ErrorCode,
82 inner_connection::InterruptHandle,
83 params::{params_from_iter, Params, ParamsFromIter},
84 row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows},
85 statement::Statement,
86 transaction::{DropBehavior, Transaction},
87 types::ToSql,
88};
89#[cfg(feature = "polars")]
90pub use polars_dataframe::Polars;
91
92pub use arrow;
94#[cfg(feature = "loadable-extension")]
95pub use duckdb_loadable_macros::duckdb_entrypoint_c_api;
96#[cfg(feature = "polars")]
97pub use polars;
98#[cfg(feature = "polars")]
99pub use polars_arrow as arrow2;
100
101pub mod core;
103
104#[macro_use]
105mod error;
106mod appender;
107mod appender_params;
108mod arrow_batch;
109mod arrow_scan;
110mod cache;
111mod column;
112mod config;
113mod inner_connection;
114mod params;
115#[cfg(feature = "polars")]
116mod polars_dataframe;
117mod pragma;
118#[cfg(feature = "r2d2")]
119mod r2d2;
120mod raw_statement;
121mod row;
122mod statement;
123mod transaction;
124
125#[cfg(feature = "extensions-full")]
126mod extension;
127
128pub mod types;
129#[cfg(feature = "vtab")]
131pub mod vtab;
132
133#[cfg(feature = "vscalar")]
135pub mod vscalar;
136
137#[cfg(test)]
138mod test_all_types;
139
/// Capacity handed to `StatementCache::with_capacity` whenever a [`Connection`] is created.
const STATEMENT_CACHE_DEFAULT_CAPACITY: usize = 16;
142
/// Builds a `&[&dyn ToSql]` parameter slice from a comma-separated list of expressions.
///
/// With no arguments the macro expands to an empty slice. Otherwise every expression is
/// borrowed and coerced to `&dyn ToSql`, so values of different types can be mixed in one
/// list. A trailing comma is accepted.
///
/// ```rust,ignore
/// conn.execute("UPDATE foo SET qux = ? WHERE bar = ?", params![1i32, &"baz"])?;
/// ```
#[macro_export]
macro_rules! params {
    // No parameters: an explicitly typed empty slice.
    () => {
        &[] as &[&dyn $crate::ToSql]
    };
    // One or more expressions, each borrowed as a trait object.
    ($($param:expr),+ $(,)?) => {
        &[$(&$param as &dyn $crate::ToSql),+] as &[&dyn $crate::ToSql]
    };
}
173
/// Result alias used throughout the crate; the error type defaults to [`Error`].
pub type Result<T, E = Error> = result::Result<T, E>;
176
/// Extension trait that turns a "no rows" failure into an absent value.
pub trait OptionalExt<T> {
    /// Converts `Result<T>` into `Result<Option<T>>`.
    ///
    /// The implementation for [`Result`] in this file maps
    /// `Err(Error::QueryReturnedNoRows)` to `Ok(None)` and leaves every other error untouched.
    fn optional(self) -> Result<Option<T>>;
}
186
187impl<T> OptionalExt<T> for Result<T> {
188 fn optional(self) -> Result<Option<T>> {
189 match self {
190 Ok(value) => Ok(Some(value)),
191 Err(Error::QueryReturnedNoRows) => Ok(None),
192 Err(e) => Err(e),
193 }
194 }
195}
196
/// Name of a database, rendered through [`fmt::Display`].
#[derive(Copy, Clone, Debug)]
pub enum DatabaseName<'a> {
    /// Displayed as `main`.
    Main,

    /// Displayed as `temp`.
    Temp,

    /// A caller-supplied database name (presumably one attached to the connection);
    /// displayed verbatim.
    Attached(&'a str),
}

impl fmt::Display for DatabaseName<'_> {
    /// Writes the database name as plain text, ignoring any width or fill flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match *self {
            DatabaseName::Main => "main",
            DatabaseName::Temp => "temp",
            DatabaseName::Attached(attached) => attached,
        };
        f.write_str(name)
    }
}
220
/// Shorthand for [`DatabaseName::Main`].
pub const MAIN_DB: DatabaseName<'static> = DatabaseName::Main;

/// Shorthand for [`DatabaseName::Temp`].
pub const TEMP_DB: DatabaseName<'static> = DatabaseName::Temp;
226
/// A connection to a DuckDB database.
pub struct Connection {
    // Inner connection state; the `RefCell` lets `&self` methods borrow it mutably.
    db: RefCell<InnerConnection>,
    // Per-connection statement cache.
    cache: StatementCache,
    // Path the connection was opened with, when one is known.
    path: Option<PathBuf>,
}
233
// SAFETY: NOTE(review) — this asserts that a `Connection` may be moved to another thread.
// The justification depends on `InnerConnection` and `StatementCache`, which are defined
// elsewhere; confirm there that neither holds thread-affine state.
unsafe impl Send for Connection {}
235
236impl Connection {
237 #[inline]
258 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
259 Self::open_with_flags(path, Config::default())
260 }
261
262 #[inline]
268 pub fn open_in_memory() -> Result<Self> {
269 Self::open_in_memory_with_flags(Config::default())
270 }
271
272 #[inline]
281 pub unsafe fn open_from_raw(raw: ffi::duckdb_database) -> Result<Self> {
282 InnerConnection::new(raw, false).map(|db| Self {
283 db: RefCell::new(db),
284 cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
285 path: None, })
287 }
288
289 #[inline]
296 pub fn open_with_flags<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
297 #[cfg(unix)]
298 fn path_to_cstring(p: &Path) -> Result<CString> {
299 use std::os::unix::ffi::OsStrExt;
300 Ok(CString::new(p.as_os_str().as_bytes())?)
301 }
302
303 #[cfg(not(unix))]
304 fn path_to_cstring(p: &Path) -> Result<CString> {
305 let s = p.to_str().ok_or_else(|| Error::InvalidPath(p.to_owned()))?;
306 Ok(CString::new(s)?)
307 }
308
309 let c_path = path_to_cstring(path.as_ref())?;
310 let config = config.with("duckdb_api", "rust").unwrap();
311 InnerConnection::open_with_flags(&c_path, config).map(|db| Self {
312 db: RefCell::new(db),
313 cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
314 path: Some(path.as_ref().to_path_buf()),
315 })
316 }
317
318 #[inline]
324 pub fn open_in_memory_with_flags(config: Config) -> Result<Self> {
325 Self::open_with_flags(":memory:", config)
326 }
327
328 pub fn execute_batch(&self, sql: &str) -> Result<()> {
349 self.db.borrow_mut().execute(sql)
350 }
351
352 #[inline]
388 pub fn execute<P: Params>(&self, sql: &str, params: P) -> Result<usize> {
389 self.prepare(sql).and_then(|mut stmt| stmt.execute(params))
390 }
391
392 #[inline]
394 pub fn path(&self) -> Option<&Path> {
395 self.path.as_deref()
396 }
397
398 #[inline]
426 pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T>
427 where
428 P: Params,
429 F: FnOnce(&Row<'_>) -> Result<T>,
430 {
431 self.prepare(sql)?.query_row(params, f)
432 }
433
434 #[inline]
460 pub fn query_row_and_then<T, E, P, F>(&self, sql: &str, params: P, f: F) -> Result<T, E>
461 where
462 P: Params,
463 F: FnOnce(&Row<'_>) -> Result<T, E>,
464 E: convert::From<Error>,
465 {
466 self.prepare(sql)?
467 .query(params)?
468 .get_expected_row()
469 .map_err(E::from)
470 .and_then(f)
471 }
472
473 #[inline]
492 pub fn prepare(&self, sql: &str) -> Result<Statement<'_>> {
493 self.db.borrow_mut().prepare(self, sql)
494 }
495
496 pub fn appender(&self, table: &str) -> Result<Appender<'_>> {
514 self.appender_to_db(table, &DatabaseName::Main.to_string())
515 }
516
517 pub fn appender_to_db(&self, table: &str, schema: &str) -> Result<Appender<'_>> {
534 self.db.borrow_mut().appender(self, table, schema)
535 }
536
537 pub fn appender_to_catalog_and_db(&self, table: &str, catalog: &str, schema: &str) -> Result<Appender<'_>> {
554 self.db
555 .borrow_mut()
556 .appender_to_catalog_and_db(self, table, catalog, schema)
557 }
558
559 pub fn appender_with_columns(&self, table: &str, columns: &[&str]) -> Result<Appender<'_>> {
583 self.appender_with_columns_to_db(table, &DatabaseName::Main.to_string(), columns)
584 }
585
586 pub fn appender_with_columns_to_db(&self, table: &str, schema: &str, columns: &[&str]) -> Result<Appender<'_>> {
590 self.db
591 .borrow_mut()
592 .appender_with_columns(self, table, schema, None, columns)
593 }
594
595 pub fn appender_with_columns_to_catalog_and_db(
599 &self,
600 table: &str,
601 catalog: &str,
602 schema: &str,
603 columns: &[&str],
604 ) -> Result<Appender<'_>> {
605 self.db
606 .borrow_mut()
607 .appender_with_columns(self, table, schema, Some(catalog), columns)
608 }
609
610 pub fn interrupt_handle(&self) -> std::sync::Arc<InterruptHandle> {
631 self.db.borrow().get_interrupt_handle()
632 }
633
634 #[inline]
644 #[allow(clippy::result_large_err)]
645 pub fn close(self) -> Result<(), (Self, Error)> {
646 let r = self.db.borrow_mut().close();
647 r.map_err(move |err| (self, err))
648 }
649
650 #[inline]
653 pub fn is_autocommit(&self) -> bool {
654 self.db.borrow().is_autocommit()
655 }
656
657 pub fn try_clone(&self) -> Result<Self> {
659 let inner = self.db.borrow().try_clone()?;
660 Ok(Self {
661 db: RefCell::new(inner),
662 cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
663 path: self.path.clone(),
664 })
665 }
666
667 pub fn version(&self) -> Result<String> {
669 self.query_row("PRAGMA version", [], |row| row.get(0))
670 }
671}
672
673impl fmt::Debug for Connection {
674 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
675 f.debug_struct("Connection").field("path", &self.path).finish()
676 }
677}
678
679#[cfg(doctest)]
680doc_comment::doctest!("../../../README.md");
681
682#[cfg(test)]
683mod test {
684 use crate::types::Value;
685
686 use super::*;
687 use std::{error::Error as StdError, fmt};
688
689 use arrow::{array::Int32Array, datatypes::DataType, record_batch::RecordBatch};
690 use fallible_iterator::FallibleIterator;
691
    // Compile-time check that `Connection` is `Send`. The function is never called (hence
    // `dead_code`); the recursive call exists only to instantiate the `T: Send` bound with
    // `Connection`, so the build fails if the `Send` impl ever goes away.
    #[allow(dead_code, unconditional_recursion, clippy::extra_unused_type_parameters)]
    fn ensure_send<T: Send>() {
        ensure_send::<Connection>();
    }
699
700 pub fn checked_memory_handle() -> Connection {
701 Connection::open_in_memory().unwrap()
702 }
703
704 #[test]
705 fn test_params_of_vary_types() -> Result<()> {
706 let db = checked_memory_handle();
707 let sql = "BEGIN;
708 CREATE TABLE foo(bar TEXT, qux INTEGER);
709 INSERT INTO foo VALUES ('baz', 1), ('baz', 2), ('baz', 3);
710 END;";
711 db.execute_batch(sql)?;
712
713 let changed = db.execute("UPDATE foo SET qux = ? WHERE bar = ?", params![1i32, &"baz"])?;
714 assert_eq!(changed, 3);
715 Ok(())
716 }
717
718 #[test]
719 #[cfg_attr(windows, ignore = "Windows doesn't allow concurrent writes to a file")]
720 fn test_concurrent_transactions_busy_commit() -> Result<()> {
721 let tmp = tempfile::tempdir().unwrap();
722 let path = tmp.path().join("transactions.db3");
723
724 Connection::open(&path)?.execute_batch(
725 "
726 BEGIN;
727 CREATE TABLE foo(x INTEGER);
728 INSERT INTO foo VALUES(42);
729 END;",
730 )?;
731
732 let mut db1 =
733 Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;
734 let mut db2 =
735 Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;
736
737 {
738 let tx1 = db1.transaction()?;
739 let tx2 = db2.transaction()?;
740
741 tx1.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
743 tx2.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
744
745 tx1.execute("INSERT INTO foo VALUES(?1)", [1])?;
746 let _ = tx2.execute("INSERT INTO foo VALUES(?1)", [2]);
747
748 let _ = tx1.commit();
749 let _ = tx2.commit();
750 }
751
752 let _ = db1.transaction().expect("commit should have closed transaction");
753 let _ = db2.transaction().expect("commit should have closed transaction");
754 Ok(())
755 }
756
757 #[test]
758 fn test_persistence() -> Result<()> {
759 let temp_dir = tempfile::tempdir().unwrap();
760 let path = temp_dir.path().join("test.db3");
761
762 {
763 let db = Connection::open(&path)?;
764 let sql = "BEGIN;
765 CREATE TABLE foo(x INTEGER);
766 INSERT INTO foo VALUES(42);
767 END;";
768 db.execute_batch(sql)?;
769 }
770
771 let path_string = path.to_str().unwrap();
772 let db = Connection::open(path_string)?;
773 let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
774
775 assert_eq!(42i64, the_answer?);
776 Ok(())
777 }
778
779 #[test]
780 fn test_open() {
781 let con = Connection::open_in_memory();
782 if let Err(e) = con {
783 panic!("open error {e}");
784 }
785 assert!(Connection::open_in_memory().is_ok());
786 let db = checked_memory_handle();
787 assert!(db.close().is_ok());
788 let _ = checked_memory_handle();
789 let _ = checked_memory_handle();
790 }
791
792 #[test]
793 fn test_open_from_raw() {
794 let con = Connection::open_in_memory();
795 assert!(con.is_ok());
796 let inner_con: InnerConnection = con.unwrap().db.into_inner();
797 unsafe {
798 assert!(Connection::open_from_raw(inner_con.db).is_ok());
799 }
800 }
801
802 #[test]
803 fn test_open_failure() -> Result<()> {
804 let filename = "no_such_file.db";
805 let result =
806 Connection::open_with_flags(filename, Config::default().access_mode(config::AccessMode::ReadOnly)?);
807 assert!(result.is_err());
808 let err = result.err().unwrap();
809 if let Error::DuckDBFailure(_e, Some(msg)) = err {
810 assert!(
813 msg.contains(filename),
814 "error message '{msg}' does not contain '{filename}'"
815 );
816 } else {
817 panic!("DuckDBFailure expected");
818 }
819 Ok(())
820 }
821
822 #[cfg(unix)]
823 #[test]
824 fn test_invalid_unicode_file_names() -> Result<()> {
825 use std::{ffi::OsStr, fs::File, os::unix::ffi::OsStrExt};
826 let temp_dir = tempfile::tempdir().unwrap();
827
828 let path = temp_dir.path();
829 if File::create(path.join(OsStr::from_bytes(&[0xFE]))).is_err() {
830 return Ok(());
832 }
833 let db_path = path.join(OsStr::from_bytes(&[0xFF]));
834 {
835 let db = Connection::open(&db_path)?;
836 let sql = "BEGIN;
837 CREATE TABLE foo(x INTEGER);
838 INSERT INTO foo VALUES(42);
839 END;";
840 db.execute_batch(sql)?;
841 }
842
843 let db = Connection::open(&db_path)?;
844 let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
845
846 assert_eq!(42i64, the_answer?);
847 Ok(())
848 }
849
850 #[test]
851 fn test_close_always_ok() -> Result<()> {
852 let db = checked_memory_handle();
853
854 db.close().unwrap();
857 Ok(())
858 }
859
860 #[test]
861 fn test_execute_batch() -> Result<()> {
862 let db = checked_memory_handle();
863 let sql = "BEGIN;
864 CREATE TABLE foo(x INTEGER);
865 INSERT INTO foo VALUES(1);
866 INSERT INTO foo VALUES(2);
867 INSERT INTO foo VALUES(3);
868 INSERT INTO foo VALUES(4);
869 END;";
870 db.execute_batch(sql)?;
871
872 db.execute_batch("UPDATE foo SET x = 3 WHERE x < 3")?;
873
874 assert!(db.execute_batch("INVALID SQL").is_err());
875 Ok(())
876 }
877
878 #[test]
879 fn test_execute_single() -> Result<()> {
880 let db = checked_memory_handle();
881 db.execute_batch("CREATE TABLE foo(x INTEGER)")?;
882
883 assert_eq!(
884 3,
885 db.execute("INSERT INTO foo(x) VALUES (?), (?), (?)", [1i32, 2i32, 3i32])?
886 );
887 assert_eq!(1, db.execute("INSERT INTO foo(x) VALUES (?)", [4i32])?);
888
889 assert_eq!(
890 10i32,
891 db.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
892 );
893 Ok(())
894 }
895
896 #[test]
897 fn test_prepare_column_names() -> Result<()> {
898 let db = checked_memory_handle();
899 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
900
901 let mut stmt = db.prepare("SELECT * FROM foo")?;
902 stmt.execute([])?;
903 assert_eq!(stmt.column_count(), 1);
904 assert_eq!(stmt.column_names(), vec!["x"]);
905
906 let mut stmt = db.prepare("SELECT x AS a, x AS b FROM foo")?;
907 stmt.execute([])?;
908 assert_eq!(stmt.column_count(), 2);
909 assert_eq!(stmt.column_names(), vec!["a", "b"]);
910 Ok(())
911 }
912
913 #[test]
914 fn test_prepare_execute() -> Result<()> {
915 let db = checked_memory_handle();
916 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
917
918 let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
919 assert_eq!(insert_stmt.execute([1i32])?, 1);
920 assert_eq!(insert_stmt.execute([2i32])?, 1);
921 assert_eq!(insert_stmt.execute([3i32])?, 1);
922
923 assert!(insert_stmt.execute(["hello"]).is_err());
924 let mut update_stmt = db.prepare("UPDATE foo SET x=? WHERE x<?")?;
929 assert_eq!(update_stmt.execute([3i32, 3i32])?, 2);
930 assert_eq!(update_stmt.execute([3i32, 3i32])?, 0);
931 assert_eq!(update_stmt.execute([8i32, 8i32])?, 3);
932 Ok(())
933 }
934
935 #[test]
936 fn test_prepare_query() -> Result<()> {
937 let db = checked_memory_handle();
938 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
939
940 let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
941 assert_eq!(insert_stmt.execute([1i32])?, 1);
942 assert_eq!(insert_stmt.execute([2i32])?, 1);
943 assert_eq!(insert_stmt.execute([3i32])?, 1);
944
945 let mut query = db.prepare("SELECT x FROM foo WHERE x < ? ORDER BY x DESC")?;
946 {
947 let mut rows = query.query([4i32])?;
948 let mut v = Vec::<i32>::new();
949
950 while let Some(row) = rows.next()? {
951 v.push(row.get(0)?);
952 }
953
954 assert_eq!(v, [3i32, 2, 1]);
955 }
956
957 {
958 let mut rows = query.query([3i32])?;
959 let mut v = Vec::<i32>::new();
960
961 while let Some(row) = rows.next()? {
962 v.push(row.get(0)?);
963 }
964
965 assert_eq!(v, [2i32, 1]);
966 }
967 Ok(())
968 }
969
970 #[test]
971 fn test_query_map() -> Result<()> {
972 let db = checked_memory_handle();
973 let sql = "BEGIN;
974 CREATE TABLE foo(x INTEGER, y TEXT);
975 INSERT INTO foo VALUES(4, 'hello');
976 INSERT INTO foo VALUES(3, ', ');
977 INSERT INTO foo VALUES(2, 'world');
978 INSERT INTO foo VALUES(1, '!');
979 END;";
980 db.execute_batch(sql)?;
981
982 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
983 let results: Result<Vec<String>> = query.query([])?.map(|row| row.get(1)).collect();
984
985 assert_eq!(results?.concat(), "hello, world!");
986 Ok(())
987 }
988
989 #[test]
990 fn test_query_row() -> Result<()> {
991 let db = checked_memory_handle();
992 let sql = "BEGIN;
993 CREATE TABLE foo(x INTEGER);
994 INSERT INTO foo VALUES(1);
995 INSERT INTO foo VALUES(2);
996 INSERT INTO foo VALUES(3);
997 INSERT INTO foo VALUES(4);
998 END;";
999 db.execute_batch(sql)?;
1000
1001 assert_eq!(
1002 10i64,
1003 db.query_row::<i64, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
1004 );
1005
1006 let result: Result<i64> = db.query_row("SELECT x FROM foo WHERE x > 5", [], |r| r.get(0));
1007 match result.unwrap_err() {
1008 Error::QueryReturnedNoRows => (),
1009 err => panic!("Unexpected error {err}"),
1010 }
1011
1012 let bad_query_result = db.query_row("NOT A PROPER QUERY; test123", [], |_| Ok(()));
1013
1014 assert!(bad_query_result.is_err());
1015 Ok(())
1016 }
1017
1018 #[test]
1019 fn test_optional() -> Result<()> {
1020 let db = checked_memory_handle();
1021
1022 let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 <> 0", [], |r| r.get(0));
1023 let result = result.optional();
1024 match result? {
1025 None => (),
1026 _ => panic!("Unexpected result"),
1027 }
1028
1029 let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 == 0", [], |r| r.get(0));
1030 let result = result.optional();
1031 match result? {
1032 Some(1) => (),
1033 _ => panic!("Unexpected result"),
1034 }
1035
1036 let bad_query_result: Result<i64> = db.query_row("NOT A PROPER QUERY", [], |r| r.get(0));
1037 let bad_query_result = bad_query_result.optional();
1038 assert!(bad_query_result.is_err());
1039 Ok(())
1040 }
1041
1042 #[test]
1043 fn test_prepare_failures() -> Result<()> {
1044 let db = checked_memory_handle();
1045 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1046
1047 let _ = db.prepare("SELECT * FROM does_not_exist").unwrap_err();
1048 Ok(())
1050 }
1051
1052 #[test]
1053 fn test_is_autocommit() {
1054 let db = checked_memory_handle();
1055 assert!(db.is_autocommit(), "autocommit expected to be active by default");
1056 }
1057
1058 #[test]
1059 #[should_panic(expected = "not supported")]
1060 fn test_statement_debugging() {
1061 let db = checked_memory_handle();
1062 let query = "SELECT 12345";
1063 let stmt = db.prepare(query).unwrap();
1064
1065 assert!(format!("{stmt:?}").contains(query));
1066 }
1067
1068 #[test]
1069 fn test_notnull_constraint_error() -> Result<()> {
1070 let db = checked_memory_handle();
1071 db.execute_batch("CREATE TABLE foo(x TEXT NOT NULL)")?;
1072
1073 let result = db.execute("INSERT INTO foo (x) VALUES (NULL)", []);
1074 assert!(result.is_err());
1075
1076 match result.unwrap_err() {
1077 Error::DuckDBFailure(err, _) => {
1078 assert_eq!(err.code, ErrorCode::Unknown);
1080 }
1081 err => panic!("Unexpected error {err}"),
1082 }
1083 Ok(())
1084 }
1085
1086 #[test]
1087 fn test_clone() -> Result<()> {
1088 {
1090 let owned_con = checked_memory_handle();
1091 {
1092 let cloned_con = owned_con.try_clone().unwrap();
1093 cloned_con.execute_batch("create table test (c1 bigint)")?;
1094 cloned_con.close().unwrap();
1095 }
1096 owned_con.execute_batch("create table test2 (c1 bigint)")?;
1097 owned_con.close().unwrap();
1098 }
1099
1100 {
1102 let cloned_con = {
1103 let owned_con = checked_memory_handle();
1104 let clone = owned_con.try_clone().unwrap();
1105 owned_con.execute_batch("create table test (c1 bigint)")?;
1106 owned_con.close().unwrap();
1107 clone
1108 };
1109 cloned_con.execute_batch("create table test2 (c1 bigint)")?;
1110 cloned_con.close().unwrap();
1111 }
1112 Ok(())
1113 }
1114
1115 mod query_and_then_tests {
1116 use super::*;
1117
1118 #[derive(Debug)]
1119 enum CustomError {
1120 SomeError,
1121 Sqlite(Error),
1122 }
1123
1124 impl fmt::Display for CustomError {
1125 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1126 match *self {
1127 Self::SomeError => write!(f, "my custom error"),
1128 Self::Sqlite(ref se) => write!(f, "my custom error: {se}"),
1129 }
1130 }
1131 }
1132
1133 impl StdError for CustomError {
1134 fn description(&self) -> &str {
1135 "my custom error"
1136 }
1137
1138 fn cause(&self) -> Option<&dyn StdError> {
1139 match *self {
1140 Self::SomeError => None,
1141 Self::Sqlite(ref se) => Some(se),
1142 }
1143 }
1144 }
1145
1146 impl From<Error> for CustomError {
1147 fn from(se: Error) -> Self {
1148 Self::Sqlite(se)
1149 }
1150 }
1151
1152 type CustomResult<T> = Result<T, CustomError>;
1153
1154 #[test]
1155 fn test_query_and_then() -> Result<()> {
1156 let db = checked_memory_handle();
1157 let sql = "BEGIN;
1158 CREATE TABLE foo(x INTEGER, y TEXT);
1159 INSERT INTO foo VALUES(4, 'hello');
1160 INSERT INTO foo VALUES(3, ', ');
1161 INSERT INTO foo VALUES(2, 'world');
1162 INSERT INTO foo VALUES(1, '!');
1163 END;";
1164 db.execute_batch(sql)?;
1165
1166 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1167 let results: Result<Vec<String>> = query.query_and_then([], |row| row.get(1))?.collect();
1168
1169 assert_eq!(results?.concat(), "hello, world!");
1170 Ok(())
1171 }
1172
1173 #[test]
1174 fn test_query_and_then_fails() -> Result<()> {
1175 let db = checked_memory_handle();
1176 let sql = "BEGIN;
1177 CREATE TABLE foo(x INTEGER, y TEXT);
1178 INSERT INTO foo VALUES(4, 'hello');
1179 INSERT INTO foo VALUES(3, ', ');
1180 INSERT INTO foo VALUES(2, 'world');
1181 INSERT INTO foo VALUES(1, '!');
1182 END;";
1183 db.execute_batch(sql)?;
1184
1185 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1186 let bad_type: Result<Vec<f64>> = query.query_and_then([], |row| row.get(1))?.collect();
1187
1188 match bad_type.unwrap_err() {
1189 Error::InvalidColumnType(..) => (),
1190 err => panic!("Unexpected error {err}"),
1191 }
1192
1193 let bad_idx: Result<Vec<String>> = query.query_and_then([], |row| row.get(3))?.collect();
1194
1195 match bad_idx.unwrap_err() {
1196 Error::InvalidColumnIndex(_) => (),
1197 err => panic!("Unexpected error {err}"),
1198 }
1199 Ok(())
1200 }
1201
1202 #[test]
1203 fn test_query_and_then_custom_error() -> CustomResult<()> {
1204 let db = checked_memory_handle();
1205 let sql = "BEGIN;
1206 CREATE TABLE foo(x INTEGER, y TEXT);
1207 INSERT INTO foo VALUES(4, 'hello');
1208 INSERT INTO foo VALUES(3, ', ');
1209 INSERT INTO foo VALUES(2, 'world');
1210 INSERT INTO foo VALUES(1, '!');
1211 END;";
1212 db.execute_batch(sql)?;
1213
1214 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1215 let results: CustomResult<Vec<String>> = query
1216 .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1217 .collect();
1218
1219 assert_eq!(results?.concat(), "hello, world!");
1220 Ok(())
1221 }
1222
1223 #[test]
1224 fn test_query_and_then_custom_error_fails() -> Result<()> {
1225 let db = checked_memory_handle();
1226 let sql = "BEGIN;
1227 CREATE TABLE foo(x INTEGER, y TEXT);
1228 INSERT INTO foo VALUES(4, 'hello');
1229 INSERT INTO foo VALUES(3, ', ');
1230 INSERT INTO foo VALUES(2, 'world');
1231 INSERT INTO foo VALUES(1, '!');
1232 END;";
1233 db.execute_batch(sql)?;
1234
1235 let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1236 let bad_type: CustomResult<Vec<f64>> = query
1237 .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1238 .collect();
1239
1240 match bad_type.unwrap_err() {
1241 CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1242 err => panic!("Unexpected error {err}"),
1243 }
1244
1245 let bad_idx: CustomResult<Vec<String>> = query
1246 .query_and_then([], |row| row.get(3).map_err(CustomError::Sqlite))?
1247 .collect();
1248
1249 match bad_idx.unwrap_err() {
1250 CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1251 err => panic!("Unexpected error {err}"),
1252 }
1253
1254 let non_sqlite_err: CustomResult<Vec<String>> =
1255 query.query_and_then([], |_| Err(CustomError::SomeError))?.collect();
1256
1257 match non_sqlite_err.unwrap_err() {
1258 CustomError::SomeError => (),
1259 err => panic!("Unexpected error {err}"),
1260 }
1261 Ok(())
1262 }
1263
1264 #[test]
1265 fn test_query_row_and_then_custom_error() -> CustomResult<()> {
1266 let db = checked_memory_handle();
1267 let sql = "BEGIN;
1268 CREATE TABLE foo(x INTEGER, y TEXT);
1269 INSERT INTO foo VALUES(4, 'hello');
1270 END;";
1271 db.execute_batch(sql)?;
1272
1273 let query = "SELECT x, y FROM foo ORDER BY x DESC";
1274 let results: CustomResult<String> =
1275 db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1276
1277 assert_eq!(results?, "hello");
1278 Ok(())
1279 }
1280
1281 #[test]
1282 fn test_query_row_and_then_custom_error_fails() -> Result<()> {
1283 let db = checked_memory_handle();
1284 let sql = "BEGIN;
1285 CREATE TABLE foo(x INTEGER, y TEXT);
1286 INSERT INTO foo VALUES(4, 'hello');
1287 END;";
1288 db.execute_batch(sql)?;
1289
1290 let query = "SELECT x, y FROM foo ORDER BY x DESC";
1291 let bad_type: CustomResult<f64> =
1292 db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1293
1294 match bad_type.unwrap_err() {
1295 CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1296 err => panic!("Unexpected error {err}"),
1297 }
1298
1299 let bad_idx: CustomResult<String> =
1300 db.query_row_and_then(query, [], |row| row.get(3).map_err(CustomError::Sqlite));
1301
1302 match bad_idx.unwrap_err() {
1303 CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1304 err => panic!("Unexpected error {err}"),
1305 }
1306
1307 let non_sqlite_err: CustomResult<String> =
1308 db.query_row_and_then(query, [], |_| Err(CustomError::SomeError));
1309
1310 match non_sqlite_err.unwrap_err() {
1311 CustomError::SomeError => (),
1312 err => panic!("Unexpected error {err}"),
1313 }
1314 Ok(())
1315 }
1316
1317 #[test]
1318 fn test_rows_and_then_with_custom_error() -> Result<()> {
1319 let db = checked_memory_handle();
1320 db.execute_batch("CREATE TABLE test (value INTEGER)")?;
1321 db.execute_batch("INSERT INTO test VALUES (1), (3), (5)")?;
1322
1323 let mut stmt = db.prepare("SELECT value FROM test ORDER BY value")?;
1324 let rows = stmt.query([])?;
1325
1326 let results: Vec<i32> = rows
1328 .and_then(|row| -> CustomResult<i32> {
1329 let val: i32 = row.get(0)?; if val > 10 {
1331 Err(CustomError::SomeError) } else {
1333 Ok(val)
1334 }
1335 })
1336 .collect::<CustomResult<Vec<_>>>()
1337 .unwrap();
1338
1339 assert_eq!(results, vec![1, 3, 5]);
1340 Ok(())
1341 }
1342 }
1343
1344 #[test]
1345 fn test_dynamic() -> Result<()> {
1346 let db = checked_memory_handle();
1347 let sql = "BEGIN;
1348 CREATE TABLE foo(x INTEGER, y TEXT);
1349 INSERT INTO foo VALUES(4, 'hello');
1350 END;";
1351 db.execute_batch(sql)?;
1352
1353 db.query_row("SELECT * FROM foo", [], |r| {
1354 assert_eq!(2, r.as_ref().column_count());
1355 Ok(())
1356 })
1357 }
1358 #[test]
1359 fn test_dyn_box() -> Result<()> {
1360 let db = checked_memory_handle();
1361 db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1362 let b: Box<dyn ToSql> = Box::new(5);
1363 db.execute("INSERT INTO foo VALUES(?)", [b])?;
1364 db.query_row("SELECT x FROM foo", [], |r| {
1365 assert_eq!(5, r.get_unwrap::<_, i32>(0));
1366 Ok(())
1367 })
1368 }
1369
1370 #[test]
1371 fn test_alter_table() -> Result<()> {
1372 let db = checked_memory_handle();
1373 db.execute_batch("CREATE TABLE x(t INTEGER);")?;
1374 db.execute("ALTER TABLE x RENAME TO y;", [])?;
1376 Ok(())
1377 }
1378
1379 #[test]
1380 fn test_query_arrow_record_batch_small() -> Result<()> {
1381 let db = checked_memory_handle();
1382 let sql = "BEGIN TRANSACTION;
1383 CREATE TABLE test(t INTEGER);
1384 INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);
1385 END TRANSACTION;";
1386 db.execute_batch(sql)?;
1387 let mut stmt = db.prepare("select t from test order by t desc")?;
1388 let mut arr = stmt.query_arrow([])?;
1389
1390 let schema = arr.get_schema();
1391 assert_eq!(schema.fields().len(), 1);
1392 assert_eq!(schema.field(0).name(), "t");
1393 assert_eq!(schema.field(0).data_type(), &DataType::Int32);
1394
1395 let rb = arr.next().unwrap();
1396 let column = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1397 assert_eq!(column.len(), 5);
1398 assert_eq!(column.value(0), 5);
1399 assert_eq!(column.value(1), 4);
1400 assert_eq!(column.value(2), 3);
1401 assert_eq!(column.value(3), 2);
1402 assert_eq!(column.value(4), 1);
1403
1404 assert!(arr.next().is_none());
1405 Ok(())
1406 }
1407
1408 #[test]
1409 fn test_query_arrow_record_batch_large() -> Result<()> {
1410 let db = checked_memory_handle();
1411 db.execute_batch("BEGIN TRANSACTION")?;
1412 db.execute_batch("CREATE TABLE test(t INTEGER);")?;
1413 for _ in 0..600 {
1414 db.execute_batch("INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);")?;
1415 }
1416 db.execute_batch("END TRANSACTION")?;
1417 let rbs: Vec<RecordBatch> = db.prepare("select t from test order by t")?.query_arrow([])?.collect();
1418 assert_eq!(rbs.iter().map(|rb| rb.num_rows()).sum::<usize>(), 3000);
1421 assert_eq!(
1422 rbs.iter()
1423 .map(|rb| rb
1424 .column(0)
1425 .as_any()
1426 .downcast_ref::<Int32Array>()
1427 .unwrap()
1428 .iter()
1429 .map(|i| i.unwrap())
1430 .sum::<i32>())
1431 .sum::<i32>(),
1432 9000
1433 );
1434 Ok(())
1435 }
1436
1437 #[test]
1438 fn test_stream_arrow_with_call() -> Result<()> {
1439 use arrow::datatypes::{DataType, Field, Schema};
1440 use std::sync::Arc;
1441
1442 let db = checked_memory_handle();
1443
1444 db.execute_batch(
1445 "CREATE TABLE test_data(id INTEGER, name VARCHAR);
1446 INSERT INTO test_data VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie');",
1447 )?;
1448
1449 db.execute_batch("CREATE MACRO test_func() AS TABLE SELECT * FROM test_data;")?;
1450
1451 let schema = Arc::new(Schema::new(vec![
1452 Field::new("id", DataType::Int32, true),
1453 Field::new("name", DataType::Utf8, true),
1454 ]));
1455
1456 let mut stmt = db.prepare("CALL test_func()")?;
1457 let rbs: Vec<RecordBatch> = stmt.stream_arrow([], schema)?.collect();
1458
1459 assert!(!rbs.is_empty(), "Expected at least one record batch");
1461 let total_rows: usize = rbs.iter().map(|rb| rb.num_rows()).sum();
1462 assert_eq!(total_rows, 3);
1463
1464 let id_column = rbs[0].column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1465 assert_eq!(id_column.value(0), 1);
1466
1467 Ok(())
1468 }
1469
1470 #[test]
1471 fn round_trip_interval() -> Result<()> {
1472 let db = checked_memory_handle();
1473 db.execute_batch("CREATE TABLE foo (t INTERVAL);")?;
1474
1475 let d = Value::Interval {
1476 months: 1,
1477 days: 2,
1478 nanos: 3,
1479 };
1480 db.execute("INSERT INTO foo VALUES (?)", [d])?;
1481
1482 let mut stmt = db.prepare("SELECT t FROM foo")?;
1483 let mut rows = stmt.query([])?;
1484 let row = rows.next()?.unwrap();
1485 let d: Value = row.get_unwrap(0);
1486 assert_eq!(d, d);
1487 Ok(())
1488 }
1489
1490 #[test]
1491 fn test_database_name_to_string() -> Result<()> {
1492 assert_eq!(DatabaseName::Main.to_string(), "main");
1493 assert_eq!(DatabaseName::Temp.to_string(), "temp");
1494 assert_eq!(DatabaseName::Attached("abc").to_string(), "abc");
1495 Ok(())
1496 }
1497
/// Runs an expensive query on a worker thread, interrupts it from the test
/// thread, and expects the query to come back as an error mentioning
/// `INTERRUPT`.
#[test]
fn test_interrupt() -> Result<()> {
    use std::{sync::mpsc, thread, time::Duration};

    let db = checked_memory_handle();
    let handle = db.interrupt_handle();

    // The connection moves into the worker; only the handle stays behind.
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        let mut query = db
            .prepare("select count(*) from range(10000000) t1, range(1000000) t2")
            .unwrap();
        let outcome = query.execute([]);
        sender.send(outcome).unwrap();
    });

    // Give the worker a moment to start executing before interrupting it.
    thread::sleep(Duration::from_millis(100));
    handle.interrupt();

    let outcome = receiver.recv_timeout(Duration::from_secs(5)).unwrap();
    let interrupted = matches!(&outcome, Err(err) if err.to_string().contains("INTERRUPT"));
    assert!(interrupted);
    Ok(())
}
1518
/// Calls `interrupt` on a handle whose connection has already been dropped;
/// the test passes as long as that call does not panic.
#[test]
fn test_interrupt_on_dropped_db() {
    let handle = {
        let db = checked_memory_handle();
        let handle = db.interrupt_handle();
        handle
        // `db` goes out of scope here, before the handle is used.
    };

    handle.interrupt();
}
1527
/// With the `bundled` feature, the version reported by the connection is
/// expected to equal this crate's package version prefixed with `v`.
#[cfg(feature = "bundled")]
#[test]
fn test_version() -> Result<()> {
    let conn = checked_memory_handle();
    let wanted = format!("v{}", env!("CARGO_PKG_VERSION"));
    let reported = conn.version()?;
    assert_eq!(wanted, reported);
    Ok(())
}
1537
/// Checks which Arrow type a VARCHAR column is reported as, depending on the
/// `produce_arrow_string_view` and `arrow_output_version` settings.
#[test]
fn test_arrow_string_view_setting() -> Result<()> {
    // Opens an in-memory database with `config`, runs a one-row VARCHAR query
    // and returns the Arrow data type of its single column.
    let varchar_arrow_type = |config: Config| -> Result<DataType> {
        let conn = Connection::open_in_memory_with_flags(config)?;
        let mut query = conn.prepare("SELECT 'test'::varchar AS str")?;
        let arrow = query.query_arrow([])?;
        let batch = arrow.into_iter().next().expect("Expected at least one batch");
        let data_type = batch.schema().field(0).data_type().clone();
        Ok(data_type)
    };

    // String view requested alone: the column is still reported as Utf8.
    let view_only = Config::default().with("produce_arrow_string_view", "true")?;
    assert_eq!(varchar_arrow_type(view_only)?, DataType::Utf8);

    // String view together with arrow_output_version 1.4: Utf8View is produced.
    let view_with_version = Config::default()
        .with("produce_arrow_string_view", "true")?
        .with("arrow_output_version", "1.4")?;
    assert_eq!(varchar_arrow_type(view_with_version)?, DataType::Utf8View);

    Ok(())
}
1567
/// Prepares SQL text containing several statements and checks that the row
/// returned comes from the final `SELECT`, which reads data written by the
/// statements before it.
#[test]
fn test_prepare_multi_statement() -> Result<()> {
    let db = checked_memory_handle();

    // Single-line script: create, insert, then select the inserted value.
    {
        let mut script =
            db.prepare("CREATE TABLE test(x INTEGER); INSERT INTO test VALUES (42); SELECT x FROM test;")?;
        let x = script.query_row([], |row| row.get::<_, i32>(0))?;
        assert_eq!(x, 42);
    }

    // Multi-line script ending in an aggregate over the rows it inserted.
    {
        let mut script = db.prepare(
            "CREATE TEMP TABLE temp_data(id INTEGER, value TEXT);
             INSERT INTO temp_data VALUES (1, 'first'), (2, 'second');
             SELECT COUNT(*) FROM temp_data;",
        )?;
        let inserted = script.query_row([], |row| row.get::<_, i32>(0))?;
        assert_eq!(inserted, 2);
    }

    Ok(())
}
1591
/// Prepares and runs a `PIVOT` statement and counts the rows it yields.
#[test]
fn test_pivot_query() -> Result<()> {
    let db = checked_memory_handle();

    // Two cities, three years each.
    db.execute_batch(
        "CREATE TABLE cities(city VARCHAR, year INTEGER, population INTEGER);
         INSERT INTO cities VALUES
             ('Amsterdam', 2000, 1005),
             ('Amsterdam', 2010, 1065),
             ('Amsterdam', 2020, 1158),
             ('Berlin', 2000, 3382),
             ('Berlin', 2010, 3460),
             ('Berlin', 2020, 3576);",
    )?;

    let mut pivot = db.prepare("PIVOT cities ON year USING sum(population);")?;
    let mut rows = pivot.query([])?;

    // Drain the result set, counting rows; the test expects two of them.
    let mut seen = 0;
    loop {
        if rows.next()?.is_none() {
            break;
        }
        seen += 1;
    }
    assert_eq!(seen, 2);

    Ok(())
}
1619
/// Checks that separately opened in-memory connections do not see each
/// other's tables, while a connection obtained through `try_clone` reads and
/// writes the same data as the connection it was cloned from.
#[test]
fn test_multiple_memory_databases() -> Result<()> {
    // Two independent opens: identically named tables hold different rows.
    {
        let first = Connection::open_in_memory()?;
        let second = Connection::open_in_memory()?;

        first.execute_batch("CREATE TABLE test (id INTEGER)")?;
        first.execute("INSERT INTO test VALUES (1)", [])?;

        second.execute_batch("CREATE TABLE test (id INTEGER)")?;
        second.execute("INSERT INTO test VALUES (2)", [])?;

        let from_first = first.query_row("SELECT id FROM test", [], |row| row.get::<_, i32>(0))?;
        assert_eq!(from_first, 1);

        let from_second = second.query_row("SELECT id FROM test", [], |row| row.get::<_, i32>(0))?;
        assert_eq!(from_second, 2);
    }

    // A cloned connection: writes on either side are visible on the other.
    {
        let original = Connection::open_in_memory()?;

        original.execute_batch("CREATE TABLE shared_table (id INTEGER)")?;
        original.execute("INSERT INTO shared_table VALUES (123)", [])?;

        let clone = original.try_clone()?;

        let seen_by_clone = clone.query_row("SELECT id FROM shared_table", [], |row| row.get::<_, i32>(0))?;
        assert_eq!(seen_by_clone, 123);

        clone.execute("INSERT INTO shared_table VALUES (456)", [])?;

        let total = original.query_row("SELECT COUNT(*) FROM shared_table", [], |row| row.get::<_, i64>(0))?;
        assert_eq!(total, 2);
    }

    Ok(())
}
1662
/// Appends rows to a table in an attached database file through
/// `appender_to_catalog_and_db` and reads them back with ordinary queries.
#[test]
fn test_appender_with_catalog() -> Result<()> {
    let db = checked_memory_handle();

    // Attach a database file that lives in a temporary directory.
    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("attached.db");
    let attach_sql = format!("ATTACH '{}' AS attached_db", db_file.display());
    db.execute_batch(&attach_sql)?;

    db.execute_batch("CREATE TABLE attached_db.main.test_table (id INTEGER, name TEXT)")?;

    // The appender is dropped before the table is queried (presumably so that
    // any buffered rows are written out first).
    let mut appender = db.appender_to_catalog_and_db("test_table", "attached_db", "main")?;
    appender.append_row(params![1, "Alice"])?;
    appender.append_row(params![2, "Bob"])?;
    appender.append_row(params![3, "Charlie"])?;
    drop(appender);

    let rows = db.query_row("SELECT COUNT(*) FROM attached_db.main.test_table", [], |row| {
        row.get::<_, i64>(0)
    })?;
    assert_eq!(rows, 3);

    let second_name = db.query_row("SELECT name FROM attached_db.main.test_table WHERE id = ?", [2], |row| {
        row.get::<_, String>(0)
    })?;
    assert_eq!(second_name, "Bob");

    Ok(())
}
1694
/// Appends to two same-named tables that live in different schemas of one
/// attached catalog and checks that each table received only its own rows.
#[test]
fn test_appender_with_catalog_multiple_schemas() -> Result<()> {
    let db = checked_memory_handle();

    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("multi_schema.db");
    let attach_sql = format!("ATTACH '{}' AS my_catalog", db_file.display());
    db.execute_batch(&attach_sql)?;

    // Two schemas, each with a table called `data`.
    db.execute_batch("CREATE SCHEMA my_catalog.schema1")?;
    db.execute_batch("CREATE SCHEMA my_catalog.schema2")?;
    db.execute_batch("CREATE TABLE my_catalog.schema1.data (value INTEGER)")?;
    db.execute_batch("CREATE TABLE my_catalog.schema2.data (value INTEGER)")?;

    // Each appender is dropped before the next step runs.
    let mut into_schema1 = db.appender_to_catalog_and_db("data", "my_catalog", "schema1")?;
    into_schema1.append_rows([[10], [20], [30]])?;
    drop(into_schema1);

    let mut into_schema2 = db.appender_to_catalog_and_db("data", "my_catalog", "schema2")?;
    into_schema2.append_rows([[100], [200]])?;
    drop(into_schema2);

    let total1 = db.query_row("SELECT SUM(value) FROM my_catalog.schema1.data", [], |row| {
        row.get::<_, i64>(0)
    })?;
    assert_eq!(total1, 60);

    let total2 = db.query_row("SELECT SUM(value) FROM my_catalog.schema2.data", [], |row| {
        row.get::<_, i64>(0)
    })?;
    assert_eq!(total2, 300);

    Ok(())
}
1732
/// Creates a table named `test` both in the in-memory database and in an
/// attached database, appends to each through its catalog name, and checks
/// that the row counts stay separate.
#[test]
fn test_appender_with_catalog_main_vs_attached() -> Result<()> {
    let db = checked_memory_handle();

    // Table in the in-memory database itself.
    db.execute_batch("CREATE TABLE test (id INTEGER)")?;

    // Same table name inside an attached database file.
    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("other.db");
    let attach_sql = format!("ATTACH '{}' AS other_db", db_file.display());
    db.execute_batch(&attach_sql)?;
    db.execute_batch("CREATE TABLE other_db.main.test (id INTEGER)")?;

    // The in-memory database is addressed through the catalog name "memory".
    let mut into_memory = db.appender_to_catalog_and_db("test", "memory", "main")?;
    into_memory.append_rows([[1], [2]])?;
    drop(into_memory);

    let mut into_attached = db.appender_to_catalog_and_db("test", "other_db", "main")?;
    into_attached.append_rows([[100], [200]])?;
    drop(into_attached);

    let rows_in_memory = db.query_row("SELECT COUNT(*) FROM test", [], |row| row.get::<_, i64>(0))?;
    assert_eq!(rows_in_memory, 2);

    let rows_in_attached = db.query_row("SELECT COUNT(*) FROM other_db.main.test", [], |row| {
        row.get::<_, i64>(0)
    })?;
    assert_eq!(rows_in_attached, 2);

    Ok(())
}
1768
/// Requesting an appender for a catalog that was never attached must yield
/// an error rather than an appender.
#[test]
fn test_appender_with_catalog_error_invalid_catalog() -> Result<()> {
    let db = checked_memory_handle();

    let outcome = db.appender_to_catalog_and_db("test", "nonexistent_catalog", "main");
    let failed = outcome.is_err();
    assert!(failed);

    Ok(())
}
1779
/// Requesting an appender for an existing catalog and table name but a schema
/// that does not exist must yield an error.
#[test]
fn test_appender_with_catalog_error_invalid_schema() -> Result<()> {
    let db = checked_memory_handle();

    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("test.db");
    let attach_sql = format!("ATTACH '{}' AS my_db", db_file.display());
    db.execute_batch(&attach_sql)?;

    // The table exists, but only in schema `main`.
    db.execute_batch("CREATE TABLE my_db.main.test (id INTEGER)")?;

    let outcome = db.appender_to_catalog_and_db("test", "my_db", "nonexistent_schema");
    let failed = outcome.is_err();
    assert!(failed);

    Ok(())
}
1797
/// Interleaves `append_row` and explicit `flush` calls on an appender bound
/// to an attached database and checks that every appended row is stored.
#[test]
fn test_appender_with_catalog_flush() -> Result<()> {
    let db = checked_memory_handle();

    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("flush_test.db");
    let attach_sql = format!("ATTACH '{}' AS flush_db", db_file.display());
    db.execute_batch(&attach_sql)?;

    db.execute_batch("CREATE TABLE flush_db.main.test (id INTEGER)")?;

    // Two rows, flush, one more row, flush again; then release the appender
    // before counting.
    let mut appender = db.appender_to_catalog_and_db("test", "flush_db", "main")?;
    appender.append_row([1])?;
    appender.append_row([2])?;
    appender.flush()?;
    appender.append_row([3])?;
    appender.flush()?;
    drop(appender);

    let rows = db.query_row("SELECT COUNT(*) FROM flush_db.main.test", [], |row| row.get::<_, i64>(0))?;
    assert_eq!(rows, 3);

    Ok(())
}
1825}