Skip to main content

duckdb/
lib.rs

1//! duckdb-rs is an ergonomic wrapper for using DuckDB from Rust. It attempts to
2//! expose an interface similar to [rusqlite](https://github.com/rusqlite/rusqlite).
3//!
4//! ```rust
5//! use duckdb::{params, Connection, Result};
6//! use duckdb::arrow::record_batch::RecordBatch;
7//! use duckdb::arrow::util::pretty::print_batches;
8//!
9//! #[derive(Debug)]
10//! struct Person {
11//!     id: i32,
12//!     name: String,
13//!     data: Option<Vec<u8>>,
14//! }
15//!
16//! fn main() -> Result<()> {
17//!     let conn = Connection::open_in_memory()?;
18//!
19//!     conn.execute_batch(
20//!         r"CREATE SEQUENCE seq;
21//!           CREATE TABLE person (
22//!                   id              INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq'),
23//!                   name            TEXT NOT NULL,
24//!                   data            BLOB
25//!                   );
26//!          ")?;
27//!     let me = Person {
28//!         id: 0,
29//!         name: "Steven".to_string(),
30//!         data: None,
31//!     };
32//!     conn.execute(
33//!         "INSERT INTO person (name, data) VALUES (?, ?)",
34//!         params![me.name, me.data],
35//!     )?;
36//!
37//!     let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
38//!     let person_iter = stmt.query_map([], |row| {
39//!         Ok(Person {
40//!             id: row.get(0)?,
41//!             name: row.get(1)?,
42//!             data: row.get(2)?,
43//!         })
44//!     })?;
45//!
46//!     for person in person_iter {
47//!         println!("Found person {:?}", person.unwrap());
48//!     }
49//!
50//!     // query table by arrow
51//!     let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
52//!     print_batches(&rbs);
53//!     Ok(())
54//! }
55//! ```
56#![warn(missing_docs)]
57
58pub use libduckdb_sys as ffi;
59
60use std::{
61    cell::RefCell,
62    convert,
63    ffi::CString,
64    fmt,
65    path::{Path, PathBuf},
66    result, str,
67};
68
69use crate::{cache::StatementCache, inner_connection::InnerConnection, raw_statement::RawStatement, types::ValueRef};
70
71#[cfg(feature = "r2d2")]
72pub use crate::r2d2::DuckdbConnectionManager;
73pub use crate::{
74    appender::Appender,
75    appender_params::{appender_params_from_iter, AppenderParams, AppenderParamsFromIter},
76    arrow_batch::{Arrow, ArrowStream},
77    cache::CachedStatement,
78    column::Column,
79    config::{AccessMode, Config, DefaultNullOrder, DefaultOrder},
80    error::Error,
81    ffi::ErrorCode,
82    inner_connection::InterruptHandle,
83    params::{params_from_iter, Params, ParamsFromIter},
84    row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows},
85    statement::Statement,
86    transaction::{DropBehavior, Transaction},
87    types::ToSql,
88};
89#[cfg(feature = "polars")]
90pub use polars_dataframe::Polars;
91
92// re-export dependencies to minimise version maintenance for crate users
93pub use arrow;
94#[cfg(feature = "loadable-extension")]
95pub use duckdb_loadable_macros::duckdb_entrypoint_c_api;
96#[cfg(feature = "polars")]
97pub use polars;
98#[cfg(feature = "polars")]
99pub use polars_arrow as arrow2;
100
101/// The core module contains the main functionality of the DuckDB crate.
102pub mod core;
103
104#[macro_use]
105mod error;
106mod appender;
107mod appender_params;
108mod arrow_batch;
109mod arrow_scan;
110mod cache;
111mod column;
112mod config;
113mod inner_connection;
114mod params;
115#[cfg(feature = "polars")]
116mod polars_dataframe;
117mod pragma;
118#[cfg(feature = "r2d2")]
119mod r2d2;
120mod raw_statement;
121mod row;
122mod statement;
123mod transaction;
124
125#[cfg(feature = "extensions-full")]
126mod extension;
127
128pub mod types;
129/// The duckdb table function interface
130#[cfg(feature = "vtab")]
131pub mod vtab;
132
133/// The duckdb scalar function interface
134#[cfg(feature = "vscalar")]
135pub mod vscalar;
136
137#[cfg(test)]
138mod test_all_types;
139
140// Number of cached prepared statements we'll hold on to.
141const STATEMENT_CACHE_DEFAULT_CAPACITY: usize = 16;
142
143/// A macro making it more convenient to pass heterogeneous or long lists of
144/// parameters as a `&[&dyn ToSql]`.
145///
146/// # Example
147///
148/// ```rust,no_run
149/// # use duckdb::{Result, Connection, params};
150///
151/// struct Person {
152///     name: String,
153///     age_in_years: u8,
154///     data: Option<Vec<u8>>,
155/// }
156///
157/// fn add_person(conn: &Connection, person: &Person) -> Result<()> {
158///     conn.execute("INSERT INTO person (name, age_in_years, data)
159///                   VALUES (?1, ?2, ?3)",
160///                  params![person.name, person.age_in_years, person.data])?;
161///     Ok(())
162/// }
163/// ```
164#[macro_export]
165macro_rules! params {
166    () => {
167        &[] as &[&dyn $crate::ToSql]
168    };
169    ($($param:expr),+ $(,)?) => {
170        &[$(&$param as &dyn $crate::ToSql),+] as &[&dyn $crate::ToSql]
171    };
172}
173
174/// A typedef of the result returned by many methods.
175pub type Result<T, E = Error> = result::Result<T, E>;
176
177/// See the [method documentation](#tymethod.optional).
178pub trait OptionalExt<T> {
179    /// Converts a `Result<T>` into a `Result<Option<T>>`.
180    ///
181    /// By default, duckdb-rs treats 0 rows being returned from a query that is
182    /// expected to return 1 row as an error. This method will
183    /// handle that error, and give you back an `Option<T>` instead.
184    fn optional(self) -> Result<Option<T>>;
185}
186
187impl<T> OptionalExt<T> for Result<T> {
188    fn optional(self) -> Result<Option<T>> {
189        match self {
190            Ok(value) => Ok(Some(value)),
191            Err(Error::QueryReturnedNoRows) => Ok(None),
192            Err(e) => Err(e),
193        }
194    }
195}
196
197/// Name for a database within a DuckDB connection.
198#[derive(Copy, Clone, Debug)]
199pub enum DatabaseName<'a> {
200    /// The main database.
201    Main,
202
203    /// The temporary database (e.g., any "CREATE TEMPORARY TABLE" tables).
204    Temp,
205
206    /// A database that has been attached via "ATTACH DATABASE ...".
207    Attached(&'a str),
208}
209
210#[allow(clippy::needless_lifetimes)]
211impl<'a> fmt::Display for DatabaseName<'a> {
212    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213        match *self {
214            DatabaseName::Main => write!(f, "main"),
215            DatabaseName::Temp => write!(f, "temp"),
216            DatabaseName::Attached(s) => write!(f, "{s}"),
217        }
218    }
219}
220
221/// Shorthand for [`DatabaseName::Main`].
222pub const MAIN_DB: DatabaseName<'static> = DatabaseName::Main;
223
224/// Shorthand for [`DatabaseName::Temp`].
225pub const TEMP_DB: DatabaseName<'static> = DatabaseName::Temp;
226
227/// A connection to a DuckDB database.
228pub struct Connection {
229    db: RefCell<InnerConnection>,
230    cache: StatementCache,
231    path: Option<PathBuf>,
232}
233
234unsafe impl Send for Connection {}
235
236impl Connection {
237    /// Open a new connection to a DuckDB database.
238    ///
239    /// `Connection::open(path)` is equivalent to
240    /// `Connection::open_with_flags(path,
241    /// Config::default())`.
242    ///
243    /// ```rust,no_run
244    /// # use duckdb::{Connection, Result};
245    /// fn open_my_db() -> Result<()> {
246    ///     let path = "./my_db.db3";
247    ///     let db = Connection::open(&path)?;
248    ///     println!("{}", db.is_autocommit());
249    ///     Ok(())
250    /// }
251    /// ```
252    ///
253    /// # Failure
254    ///
255    /// Will return `Err` if `path` cannot be converted to a C-compatible
256    /// string or if the underlying DuckDB open call fails.
257    #[inline]
258    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
259        Self::open_with_flags(path, Config::default())
260    }
261
262    /// Open a new connection to an in-memory DuckDB database.
263    ///
264    /// # Failure
265    ///
266    /// Will return `Err` if the underlying DuckDB open call fails.
267    #[inline]
268    pub fn open_in_memory() -> Result<Self> {
269        Self::open_in_memory_with_flags(Config::default())
270    }
271
272    /// Open a new connection to an ffi database.
273    ///
274    /// # Failure
275    ///
276    /// Will return `Err` if the underlying DuckDB open call fails.
277    /// # Safety
278    ///
279    /// Need to pass in a valid db instance
280    #[inline]
281    pub unsafe fn open_from_raw(raw: ffi::duckdb_database) -> Result<Self> {
282        InnerConnection::new(raw, false).map(|db| Self {
283            db: RefCell::new(db),
284            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
285            path: None, // Can we know the path from connection?
286        })
287    }
288
289    /// Open a new connection to a DuckDB database.
290    ///
291    /// # Failure
292    ///
293    /// Will return `Err` if `path` cannot be converted to a C-compatible
294    /// string or if the underlying DuckDB open call fails.
295    #[inline]
296    pub fn open_with_flags<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
297        #[cfg(unix)]
298        fn path_to_cstring(p: &Path) -> Result<CString> {
299            use std::os::unix::ffi::OsStrExt;
300            Ok(CString::new(p.as_os_str().as_bytes())?)
301        }
302
303        #[cfg(not(unix))]
304        fn path_to_cstring(p: &Path) -> Result<CString> {
305            let s = p.to_str().ok_or_else(|| Error::InvalidPath(p.to_owned()))?;
306            Ok(CString::new(s)?)
307        }
308
309        let c_path = path_to_cstring(path.as_ref())?;
310        let config = config.with("duckdb_api", "rust").unwrap();
311        InnerConnection::open_with_flags(&c_path, config).map(|db| Self {
312            db: RefCell::new(db),
313            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
314            path: Some(path.as_ref().to_path_buf()),
315        })
316    }
317
318    /// Open a new connection to an in-memory DuckDB database.
319    ///
320    /// # Failure
321    ///
322    /// Will return `Err` if the underlying DuckDB open call fails.
323    #[inline]
324    pub fn open_in_memory_with_flags(config: Config) -> Result<Self> {
325        Self::open_with_flags(":memory:", config)
326    }
327
328    /// Convenience method to run multiple SQL statements (that cannot take any
329    /// parameters).
330    ///
331    /// ## Example
332    ///
333    /// ```rust,no_run
334    /// # use duckdb::{Connection, Result};
335    /// fn create_tables(conn: &Connection) -> Result<()> {
336    ///     conn.execute_batch("BEGIN;
337    ///                         CREATE TABLE foo(x INTEGER);
338    ///                         CREATE TABLE bar(y TEXT);
339    ///                         COMMIT;",
340    ///     )
341    /// }
342    /// ```
343    ///
344    /// # Failure
345    ///
346    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
347    /// or if the underlying DuckDB call fails.
348    pub fn execute_batch(&self, sql: &str) -> Result<()> {
349        self.db.borrow_mut().execute(sql)
350    }
351
352    /// Convenience method to prepare and execute a single SQL statement.
353    ///
354    /// On success, returns the number of rows that were changed or inserted or
355    /// deleted.
356    ///
357    /// ## Example
358    ///
359    /// ### With params
360    ///
361    /// ```rust,no_run
362    /// # use duckdb::{Connection};
363    /// fn update_rows(conn: &Connection) {
364    ///     match conn.execute("UPDATE foo SET bar = 'baz' WHERE qux = ?", [1i32]) {
365    ///         Ok(updated) => println!("{} rows were updated", updated),
366    ///         Err(err) => println!("update failed: {}", err),
367    ///     }
368    /// }
369    /// ```
370    ///
371    /// ### With params of varying types
372    ///
373    /// ```rust,no_run
374    /// # use duckdb::{Connection, params};
375    /// fn update_rows(conn: &Connection) {
376    ///     match conn.execute("UPDATE foo SET bar = ? WHERE qux = ?", params![&"baz", 1i32]) {
377    ///         Ok(updated) => println!("{} rows were updated", updated),
378    ///         Err(err) => println!("update failed: {}", err),
379    ///     }
380    /// }
381    /// ```
382    ///
383    /// # Failure
384    ///
385    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
386    /// or if the underlying DuckDB call fails.
387    #[inline]
388    pub fn execute<P: Params>(&self, sql: &str, params: P) -> Result<usize> {
389        self.prepare(sql).and_then(|mut stmt| stmt.execute(params))
390    }
391
392    /// Returns the path to the database file, if one exists and is known.
393    #[inline]
394    pub fn path(&self) -> Option<&Path> {
395        self.path.as_deref()
396    }
397
398    /// Convenience method to execute a query that is expected to return a
399    /// single row.
400    ///
401    /// ## Example
402    ///
403    /// ```rust,no_run
404    /// # use duckdb::{Result, Connection};
405    /// fn preferred_locale(conn: &Connection) -> Result<String> {
406    ///     conn.query_row(
407    ///         "SELECT value FROM preferences WHERE name='locale'",
408    ///         [],
409    ///         |row| row.get(0),
410    ///     )
411    /// }
412    /// ```
413    ///
414    /// If the query returns more than one row, all rows except the first are
415    /// ignored.
416    ///
417    /// Returns `Err(QueryReturnedNoRows)` if no results are returned. If the
418    /// query truly is optional, you can call `.optional()` on the result of
419    /// this to get a `Result<Option<T>>`.
420    ///
421    /// # Failure
422    ///
423    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
424    /// or if the underlying DuckDB call fails.
425    #[inline]
426    pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T>
427    where
428        P: Params,
429        F: FnOnce(&Row<'_>) -> Result<T>,
430    {
431        self.prepare(sql)?.query_row(params, f)
432    }
433
434    /// Convenience method to execute a query that is expected to return a
435    /// single row, and execute a mapping via `f` on that returned row with
436    /// the possibility of failure. The `Result` type of `f` must implement
437    /// `std::convert::From<Error>`.
438    ///
439    /// ## Example
440    ///
441    /// ```rust,no_run
442    /// # use duckdb::{Result, Connection};
443    /// fn preferred_locale(conn: &Connection) -> Result<String> {
444    ///     conn.query_row_and_then(
445    ///         "SELECT value FROM preferences WHERE name='locale'",
446    ///         [],
447    ///         |row| row.get(0),
448    ///     )
449    /// }
450    /// ```
451    ///
452    /// If the query returns more than one row, all rows except the first are
453    /// ignored.
454    ///
455    /// # Failure
456    ///
457    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
458    /// or if the underlying DuckDB call fails.
459    #[inline]
460    pub fn query_row_and_then<T, E, P, F>(&self, sql: &str, params: P, f: F) -> Result<T, E>
461    where
462        P: Params,
463        F: FnOnce(&Row<'_>) -> Result<T, E>,
464        E: convert::From<Error>,
465    {
466        self.prepare(sql)?
467            .query(params)?
468            .get_expected_row()
469            .map_err(E::from)
470            .and_then(f)
471    }
472
473    /// Prepare a SQL statement for execution.
474    ///
475    /// ## Example
476    ///
477    /// ```rust,no_run
478    /// # use duckdb::{Connection, Result};
479    /// fn insert_new_people(conn: &Connection) -> Result<()> {
480    ///     let mut stmt = conn.prepare("INSERT INTO People (name) VALUES (?)")?;
481    ///     stmt.execute(["Joe Smith"])?;
482    ///     stmt.execute(["Bob Jones"])?;
483    ///     Ok(())
484    /// }
485    /// ```
486    ///
487    /// # Failure
488    ///
489    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
490    /// or if the underlying DuckDB call fails.
491    #[inline]
492    pub fn prepare(&self, sql: &str) -> Result<Statement<'_>> {
493        self.db.borrow_mut().prepare(self, sql)
494    }
495
496    /// Create an Appender for fast import data
497    /// default to use `DatabaseName::Main`
498    ///
499    /// ## Example
500    ///
501    /// ```rust,no_run
502    /// # use duckdb::{Connection, Result, params};
503    /// fn insert_rows(conn: &Connection) -> Result<()> {
504    ///     let mut app = conn.appender("foo")?;
505    ///     app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
506    ///     Ok(())
507    /// }
508    /// ```
509    ///
510    /// # Failure
511    ///
512    /// Will return `Err` if `table` not exists
513    pub fn appender(&self, table: &str) -> Result<Appender<'_>> {
514        self.appender_to_db(table, &DatabaseName::Main.to_string())
515    }
516
517    /// Create an Appender for fast import data
518    ///
519    /// ## Example
520    ///
521    /// ```rust,no_run
522    /// # use duckdb::{Connection, Result, params, DatabaseName};
523    /// fn insert_rows(conn: &Connection) -> Result<()> {
524    ///     let mut app = conn.appender_to_db("foo", &DatabaseName::Main.to_string())?;
525    ///     app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
526    ///     Ok(())
527    /// }
528    /// ```
529    ///
530    /// # Failure
531    ///
532    /// Will return `Err` if `table` not exists
533    pub fn appender_to_db(&self, table: &str, schema: &str) -> Result<Appender<'_>> {
534        self.db.borrow_mut().appender(self, table, schema)
535    }
536
537    /// Create an Appender for fast import data with provided catalog, schema and table
538    ///
539    /// ## Example
540    ///
541    /// ```rust,no_run
542    /// # use duckdb::{Connection, Result, params, DatabaseName};
543    /// fn insert_rows(conn: &Connection) -> Result<()> {
544    ///     let mut app = conn.appender_to_catalog_and_db("catalog", &DatabaseName::Main.to_string(), "foo")?;
545    ///     app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
546    ///     Ok(())
547    /// }
548    /// ```
549    ///
550    /// # Failure
551    ///
552    /// Will return `Err` if `catalog` or `schema` not exists
553    pub fn appender_to_catalog_and_db(&self, table: &str, catalog: &str, schema: &str) -> Result<Appender<'_>> {
554        self.db
555            .borrow_mut()
556            .appender_to_catalog_and_db(self, table, catalog, schema)
557    }
558
559    /// Create an Appender that only provides values for specific columns.
560    ///
561    /// Columns not in the list will use their DEFAULT value, or NULL if no default.
562    /// This supports all types of DEFAULT expressions including non-deterministic
563    /// ones like `random()`, `current_timestamp`, or sequences.
564    ///
565    /// ## Example
566    ///
567    /// ```rust,no_run
568    /// # use duckdb::{Connection, Result};
569    /// fn insert_partial(conn: &Connection) -> Result<()> {
570    ///     // Table: CREATE TABLE foo(id INT DEFAULT nextval('seq'), name TEXT, created TIMESTAMP DEFAULT current_timestamp)
571    ///     let mut app = conn.appender_with_columns("foo", &["name"])?;
572    ///     // Only provide name; id and created use their defaults
573    ///     app.append_row(["Alice"])?;
574    ///     app.append_row(["Bob"])?;
575    ///     Ok(())
576    /// }
577    /// ```
578    ///
579    /// # Failure
580    ///
581    /// Will return `Err` if `table` does not exist or a column name is invalid.
582    pub fn appender_with_columns(&self, table: &str, columns: &[&str]) -> Result<Appender<'_>> {
583        self.appender_with_columns_to_db(table, &DatabaseName::Main.to_string(), columns)
584    }
585
586    /// Create an Appender that only provides values for specific columns, with schema.
587    ///
588    /// See [`appender_with_columns`](Connection::appender_with_columns) for details.
589    pub fn appender_with_columns_to_db(&self, table: &str, schema: &str, columns: &[&str]) -> Result<Appender<'_>> {
590        self.db
591            .borrow_mut()
592            .appender_with_columns(self, table, schema, None, columns)
593    }
594
595    /// Create an Appender that only provides values for specific columns, with catalog and schema.
596    ///
597    /// See [`appender_with_columns`](Connection::appender_with_columns) for details.
598    pub fn appender_with_columns_to_catalog_and_db(
599        &self,
600        table: &str,
601        catalog: &str,
602        schema: &str,
603        columns: &[&str],
604    ) -> Result<Appender<'_>> {
605        self.db
606            .borrow_mut()
607            .appender_with_columns(self, table, schema, Some(catalog), columns)
608    }
609
610    /// Get a handle to interrupt long-running queries.
611    ///
612    /// ## Example
613    ///
614    /// ```rust,no_run
615    /// # use duckdb::{Connection, Result};
616    /// fn run_query(conn: Connection) -> Result<()> {
617    ///   let interrupt_handle = conn.interrupt_handle();
618    ///   let join_handle = std::thread::spawn(move || { conn.execute("expensive query", []) });
619    ///
620    ///   // Arbitrary wait for query to start
621    ///   std::thread::sleep(std::time::Duration::from_millis(100));
622    ///
623    ///   interrupt_handle.interrupt();
624    ///
625    ///   let query_result = join_handle.join().unwrap();
626    ///   assert!(query_result.is_err());
627    ///
628    ///   Ok(())
629    /// }
630    pub fn interrupt_handle(&self) -> std::sync::Arc<InterruptHandle> {
631        self.db.borrow().get_interrupt_handle()
632    }
633
634    /// Close the DuckDB connection.
635    ///
636    /// This is functionally equivalent to the `Drop` implementation for
637    /// `Connection` except that on failure, it returns an error and the
638    /// connection itself (presumably so closing can be attempted again).
639    ///
640    /// # Failure
641    ///
642    /// Will return `Err` if the underlying DuckDB call fails.
643    #[inline]
644    #[allow(clippy::result_large_err)]
645    pub fn close(self) -> Result<(), (Self, Error)> {
646        let r = self.db.borrow_mut().close();
647        r.map_err(move |err| (self, err))
648    }
649
650    /// Test for auto-commit mode.
651    /// Autocommit mode is on by default.
652    #[inline]
653    pub fn is_autocommit(&self) -> bool {
654        self.db.borrow().is_autocommit()
655    }
656
657    /// Creates a new connection to the already-opened database.
658    pub fn try_clone(&self) -> Result<Self> {
659        let inner = self.db.borrow().try_clone()?;
660        Ok(Self {
661            db: RefCell::new(inner),
662            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
663            path: self.path.clone(),
664        })
665    }
666
667    /// Returns the version of the DuckDB library
668    pub fn version(&self) -> Result<String> {
669        self.query_row("PRAGMA version", [], |row| row.get(0))
670    }
671}
672
673impl fmt::Debug for Connection {
674    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
675        f.debug_struct("Connection").field("path", &self.path).finish()
676    }
677}
678
679#[cfg(doctest)]
680doc_comment::doctest!("../../../README.md");
681
682#[cfg(test)]
683mod test {
684    use crate::types::Value;
685
686    use super::*;
687    use std::{error::Error as StdError, fmt};
688
689    use arrow::{array::Int32Array, datatypes::DataType, record_batch::RecordBatch};
690    use fallible_iterator::FallibleIterator;
691
692    // this function is never called, but is still type checked; in
693    // particular, calls with specific instantiations will require
694    // that those types are `Send`.
695    #[allow(dead_code, unconditional_recursion, clippy::extra_unused_type_parameters)]
696    fn ensure_send<T: Send>() {
697        ensure_send::<Connection>();
698    }
699
700    pub fn checked_memory_handle() -> Connection {
701        Connection::open_in_memory().unwrap()
702    }
703
704    #[test]
705    fn test_params_of_vary_types() -> Result<()> {
706        let db = checked_memory_handle();
707        let sql = "BEGIN;
708                   CREATE TABLE foo(bar TEXT, qux INTEGER);
709                   INSERT INTO foo VALUES ('baz', 1), ('baz', 2), ('baz', 3);
710                   END;";
711        db.execute_batch(sql)?;
712
713        let changed = db.execute("UPDATE foo SET qux = ? WHERE bar = ?", params![1i32, &"baz"])?;
714        assert_eq!(changed, 3);
715        Ok(())
716    }
717
718    #[test]
719    #[cfg_attr(windows, ignore = "Windows doesn't allow concurrent writes to a file")]
720    fn test_concurrent_transactions_busy_commit() -> Result<()> {
721        let tmp = tempfile::tempdir().unwrap();
722        let path = tmp.path().join("transactions.db3");
723
724        Connection::open(&path)?.execute_batch(
725            "
726            BEGIN;
727            CREATE TABLE foo(x INTEGER);
728            INSERT INTO foo VALUES(42);
729            END;",
730        )?;
731
732        let mut db1 =
733            Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;
734        let mut db2 =
735            Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;
736
737        {
738            let tx1 = db1.transaction()?;
739            let tx2 = db2.transaction()?;
740
741            // SELECT first makes sqlite lock with a shared lock
742            tx1.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
743            tx2.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
744
745            tx1.execute("INSERT INTO foo VALUES(?1)", [1])?;
746            let _ = tx2.execute("INSERT INTO foo VALUES(?1)", [2]);
747
748            let _ = tx1.commit();
749            let _ = tx2.commit();
750        }
751
752        let _ = db1.transaction().expect("commit should have closed transaction");
753        let _ = db2.transaction().expect("commit should have closed transaction");
754        Ok(())
755    }
756
757    #[test]
758    fn test_persistence() -> Result<()> {
759        let temp_dir = tempfile::tempdir().unwrap();
760        let path = temp_dir.path().join("test.db3");
761
762        {
763            let db = Connection::open(&path)?;
764            let sql = "BEGIN;
765                   CREATE TABLE foo(x INTEGER);
766                   INSERT INTO foo VALUES(42);
767                   END;";
768            db.execute_batch(sql)?;
769        }
770
771        let path_string = path.to_str().unwrap();
772        let db = Connection::open(path_string)?;
773        let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
774
775        assert_eq!(42i64, the_answer?);
776        Ok(())
777    }
778
779    #[test]
780    fn test_open() {
781        let con = Connection::open_in_memory();
782        if let Err(e) = con {
783            panic!("open error {e}");
784        }
785        assert!(Connection::open_in_memory().is_ok());
786        let db = checked_memory_handle();
787        assert!(db.close().is_ok());
788        let _ = checked_memory_handle();
789        let _ = checked_memory_handle();
790    }
791
792    #[test]
793    fn test_open_from_raw() {
794        let con = Connection::open_in_memory();
795        assert!(con.is_ok());
796        let inner_con: InnerConnection = con.unwrap().db.into_inner();
797        unsafe {
798            assert!(Connection::open_from_raw(inner_con.db).is_ok());
799        }
800    }
801
802    #[test]
803    fn test_open_failure() -> Result<()> {
804        let filename = "no_such_file.db";
805        let result =
806            Connection::open_with_flags(filename, Config::default().access_mode(config::AccessMode::ReadOnly)?);
807        assert!(result.is_err());
808        let err = result.err().unwrap();
809        if let Error::DuckDBFailure(_e, Some(msg)) = err {
810            // TODO: update error code
811            // assert_eq!(ErrorCode::CannotOpen, e.code);
812            assert!(
813                msg.contains(filename),
814                "error message '{msg}' does not contain '{filename}'"
815            );
816        } else {
817            panic!("DuckDBFailure expected");
818        }
819        Ok(())
820    }
821
822    #[cfg(unix)]
823    #[test]
824    fn test_invalid_unicode_file_names() -> Result<()> {
825        use std::{ffi::OsStr, fs::File, os::unix::ffi::OsStrExt};
826        let temp_dir = tempfile::tempdir().unwrap();
827
828        let path = temp_dir.path();
829        if File::create(path.join(OsStr::from_bytes(&[0xFE]))).is_err() {
830            // Skip test, filesystem doesn't support invalid Unicode
831            return Ok(());
832        }
833        let db_path = path.join(OsStr::from_bytes(&[0xFF]));
834        {
835            let db = Connection::open(&db_path)?;
836            let sql = "BEGIN;
837                   CREATE TABLE foo(x INTEGER);
838                   INSERT INTO foo VALUES(42);
839                   END;";
840            db.execute_batch(sql)?;
841        }
842
843        let db = Connection::open(&db_path)?;
844        let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
845
846        assert_eq!(42i64, the_answer?);
847        Ok(())
848    }
849
850    #[test]
851    fn test_close_always_ok() -> Result<()> {
852        let db = checked_memory_handle();
853
854        // TODO: prepare a query but not execute it
855
856        db.close().unwrap();
857        Ok(())
858    }
859
860    #[test]
861    fn test_execute_batch() -> Result<()> {
862        let db = checked_memory_handle();
863        let sql = "BEGIN;
864                   CREATE TABLE foo(x INTEGER);
865                   INSERT INTO foo VALUES(1);
866                   INSERT INTO foo VALUES(2);
867                   INSERT INTO foo VALUES(3);
868                   INSERT INTO foo VALUES(4);
869                   END;";
870        db.execute_batch(sql)?;
871
872        db.execute_batch("UPDATE foo SET x = 3 WHERE x < 3")?;
873
874        assert!(db.execute_batch("INVALID SQL").is_err());
875        Ok(())
876    }
877
878    #[test]
879    fn test_execute_single() -> Result<()> {
880        let db = checked_memory_handle();
881        db.execute_batch("CREATE TABLE foo(x INTEGER)")?;
882
883        assert_eq!(
884            3,
885            db.execute("INSERT INTO foo(x) VALUES (?), (?), (?)", [1i32, 2i32, 3i32])?
886        );
887        assert_eq!(1, db.execute("INSERT INTO foo(x) VALUES (?)", [4i32])?);
888
889        assert_eq!(
890            10i32,
891            db.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
892        );
893        Ok(())
894    }
895
896    #[test]
897    fn test_prepare_column_names() -> Result<()> {
898        let db = checked_memory_handle();
899        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
900
901        let mut stmt = db.prepare("SELECT * FROM foo")?;
902        stmt.execute([])?;
903        assert_eq!(stmt.column_count(), 1);
904        assert_eq!(stmt.column_names(), vec!["x"]);
905
906        let mut stmt = db.prepare("SELECT x AS a, x AS b FROM foo")?;
907        stmt.execute([])?;
908        assert_eq!(stmt.column_count(), 2);
909        assert_eq!(stmt.column_names(), vec!["a", "b"]);
910        Ok(())
911    }
912
913    #[test]
914    fn test_prepare_execute() -> Result<()> {
915        let db = checked_memory_handle();
916        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
917
918        let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
919        assert_eq!(insert_stmt.execute([1i32])?, 1);
920        assert_eq!(insert_stmt.execute([2i32])?, 1);
921        assert_eq!(insert_stmt.execute([3i32])?, 1);
922
923        assert!(insert_stmt.execute(["hello"]).is_err());
924        // NOTE: can't execute on errored stmt
925        // assert!(insert_stmt.execute(["goodbye"]).is_err());
926        // assert!(insert_stmt.execute([types::Null]).is_err());
927
928        let mut update_stmt = db.prepare("UPDATE foo SET x=? WHERE x<?")?;
929        assert_eq!(update_stmt.execute([3i32, 3i32])?, 2);
930        assert_eq!(update_stmt.execute([3i32, 3i32])?, 0);
931        assert_eq!(update_stmt.execute([8i32, 8i32])?, 3);
932        Ok(())
933    }
934
935    #[test]
936    fn test_prepare_query() -> Result<()> {
937        let db = checked_memory_handle();
938        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
939
940        let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
941        assert_eq!(insert_stmt.execute([1i32])?, 1);
942        assert_eq!(insert_stmt.execute([2i32])?, 1);
943        assert_eq!(insert_stmt.execute([3i32])?, 1);
944
945        let mut query = db.prepare("SELECT x FROM foo WHERE x < ? ORDER BY x DESC")?;
946        {
947            let mut rows = query.query([4i32])?;
948            let mut v = Vec::<i32>::new();
949
950            while let Some(row) = rows.next()? {
951                v.push(row.get(0)?);
952            }
953
954            assert_eq!(v, [3i32, 2, 1]);
955        }
956
957        {
958            let mut rows = query.query([3i32])?;
959            let mut v = Vec::<i32>::new();
960
961            while let Some(row) = rows.next()? {
962                v.push(row.get(0)?);
963            }
964
965            assert_eq!(v, [2i32, 1]);
966        }
967        Ok(())
968    }
969
970    #[test]
971    fn test_query_map() -> Result<()> {
972        let db = checked_memory_handle();
973        let sql = "BEGIN;
974                   CREATE TABLE foo(x INTEGER, y TEXT);
975                   INSERT INTO foo VALUES(4, 'hello');
976                   INSERT INTO foo VALUES(3, ', ');
977                   INSERT INTO foo VALUES(2, 'world');
978                   INSERT INTO foo VALUES(1, '!');
979                   END;";
980        db.execute_batch(sql)?;
981
982        let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
983        let results: Result<Vec<String>> = query.query([])?.map(|row| row.get(1)).collect();
984
985        assert_eq!(results?.concat(), "hello, world!");
986        Ok(())
987    }
988
989    #[test]
990    fn test_query_row() -> Result<()> {
991        let db = checked_memory_handle();
992        let sql = "BEGIN;
993                   CREATE TABLE foo(x INTEGER);
994                   INSERT INTO foo VALUES(1);
995                   INSERT INTO foo VALUES(2);
996                   INSERT INTO foo VALUES(3);
997                   INSERT INTO foo VALUES(4);
998                   END;";
999        db.execute_batch(sql)?;
1000
1001        assert_eq!(
1002            10i64,
1003            db.query_row::<i64, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
1004        );
1005
1006        let result: Result<i64> = db.query_row("SELECT x FROM foo WHERE x > 5", [], |r| r.get(0));
1007        match result.unwrap_err() {
1008            Error::QueryReturnedNoRows => (),
1009            err => panic!("Unexpected error {err}"),
1010        }
1011
1012        let bad_query_result = db.query_row("NOT A PROPER QUERY; test123", [], |_| Ok(()));
1013
1014        assert!(bad_query_result.is_err());
1015        Ok(())
1016    }
1017
1018    #[test]
1019    fn test_optional() -> Result<()> {
1020        let db = checked_memory_handle();
1021
1022        let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 <> 0", [], |r| r.get(0));
1023        let result = result.optional();
1024        match result? {
1025            None => (),
1026            _ => panic!("Unexpected result"),
1027        }
1028
1029        let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 == 0", [], |r| r.get(0));
1030        let result = result.optional();
1031        match result? {
1032            Some(1) => (),
1033            _ => panic!("Unexpected result"),
1034        }
1035
1036        let bad_query_result: Result<i64> = db.query_row("NOT A PROPER QUERY", [], |r| r.get(0));
1037        let bad_query_result = bad_query_result.optional();
1038        assert!(bad_query_result.is_err());
1039        Ok(())
1040    }
1041
1042    #[test]
1043    fn test_prepare_failures() -> Result<()> {
1044        let db = checked_memory_handle();
1045        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1046
1047        let _ = db.prepare("SELECT * FROM does_not_exist").unwrap_err();
1048        // assert!(format!("{}", err).contains("does_not_exist"));
1049        Ok(())
1050    }
1051
1052    #[test]
1053    fn test_is_autocommit() {
1054        let db = checked_memory_handle();
1055        assert!(db.is_autocommit(), "autocommit expected to be active by default");
1056    }
1057
1058    #[test]
1059    #[should_panic(expected = "not supported")]
1060    fn test_statement_debugging() {
1061        let db = checked_memory_handle();
1062        let query = "SELECT 12345";
1063        let stmt = db.prepare(query).unwrap();
1064
1065        assert!(format!("{stmt:?}").contains(query));
1066    }
1067
1068    #[test]
1069    fn test_notnull_constraint_error() -> Result<()> {
1070        let db = checked_memory_handle();
1071        db.execute_batch("CREATE TABLE foo(x TEXT NOT NULL)")?;
1072
1073        let result = db.execute("INSERT INTO foo (x) VALUES (NULL)", []);
1074        assert!(result.is_err());
1075
1076        match result.unwrap_err() {
1077            Error::DuckDBFailure(err, _) => {
1078                // TODO(wangfenjin): Update errorcode
1079                assert_eq!(err.code, ErrorCode::Unknown);
1080            }
1081            err => panic!("Unexpected error {err}"),
1082        }
1083        Ok(())
1084    }
1085
1086    #[test]
1087    fn test_clone() -> Result<()> {
1088        // 1. Drop the cloned connection first. The original connection should still be able to run queries.
1089        {
1090            let owned_con = checked_memory_handle();
1091            {
1092                let cloned_con = owned_con.try_clone().unwrap();
1093                cloned_con.execute_batch("create table test (c1 bigint)")?;
1094                cloned_con.close().unwrap();
1095            }
1096            owned_con.execute_batch("create table test2 (c1 bigint)")?;
1097            owned_con.close().unwrap();
1098        }
1099
1100        // 2. Close the original connection first. The cloned connection should still be able to run queries.
1101        {
1102            let cloned_con = {
1103                let owned_con = checked_memory_handle();
1104                let clone = owned_con.try_clone().unwrap();
1105                owned_con.execute_batch("create table test (c1 bigint)")?;
1106                owned_con.close().unwrap();
1107                clone
1108            };
1109            cloned_con.execute_batch("create table test2 (c1 bigint)")?;
1110            cloned_con.close().unwrap();
1111        }
1112        Ok(())
1113    }
1114
1115    mod query_and_then_tests {
1116        use super::*;
1117
1118        #[derive(Debug)]
1119        enum CustomError {
1120            SomeError,
1121            Sqlite(Error),
1122        }
1123
1124        impl fmt::Display for CustomError {
1125            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1126                match *self {
1127                    Self::SomeError => write!(f, "my custom error"),
1128                    Self::Sqlite(ref se) => write!(f, "my custom error: {se}"),
1129                }
1130            }
1131        }
1132
1133        impl StdError for CustomError {
1134            fn description(&self) -> &str {
1135                "my custom error"
1136            }
1137
1138            fn cause(&self) -> Option<&dyn StdError> {
1139                match *self {
1140                    Self::SomeError => None,
1141                    Self::Sqlite(ref se) => Some(se),
1142                }
1143            }
1144        }
1145
1146        impl From<Error> for CustomError {
1147            fn from(se: Error) -> Self {
1148                Self::Sqlite(se)
1149            }
1150        }
1151
1152        type CustomResult<T> = Result<T, CustomError>;
1153
1154        #[test]
1155        fn test_query_and_then() -> Result<()> {
1156            let db = checked_memory_handle();
1157            let sql = "BEGIN;
1158                       CREATE TABLE foo(x INTEGER, y TEXT);
1159                       INSERT INTO foo VALUES(4, 'hello');
1160                       INSERT INTO foo VALUES(3, ', ');
1161                       INSERT INTO foo VALUES(2, 'world');
1162                       INSERT INTO foo VALUES(1, '!');
1163                       END;";
1164            db.execute_batch(sql)?;
1165
1166            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1167            let results: Result<Vec<String>> = query.query_and_then([], |row| row.get(1))?.collect();
1168
1169            assert_eq!(results?.concat(), "hello, world!");
1170            Ok(())
1171        }
1172
1173        #[test]
1174        fn test_query_and_then_fails() -> Result<()> {
1175            let db = checked_memory_handle();
1176            let sql = "BEGIN;
1177                       CREATE TABLE foo(x INTEGER, y TEXT);
1178                       INSERT INTO foo VALUES(4, 'hello');
1179                       INSERT INTO foo VALUES(3, ', ');
1180                       INSERT INTO foo VALUES(2, 'world');
1181                       INSERT INTO foo VALUES(1, '!');
1182                       END;";
1183            db.execute_batch(sql)?;
1184
1185            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1186            let bad_type: Result<Vec<f64>> = query.query_and_then([], |row| row.get(1))?.collect();
1187
1188            match bad_type.unwrap_err() {
1189                Error::InvalidColumnType(..) => (),
1190                err => panic!("Unexpected error {err}"),
1191            }
1192
1193            let bad_idx: Result<Vec<String>> = query.query_and_then([], |row| row.get(3))?.collect();
1194
1195            match bad_idx.unwrap_err() {
1196                Error::InvalidColumnIndex(_) => (),
1197                err => panic!("Unexpected error {err}"),
1198            }
1199            Ok(())
1200        }
1201
1202        #[test]
1203        fn test_query_and_then_custom_error() -> CustomResult<()> {
1204            let db = checked_memory_handle();
1205            let sql = "BEGIN;
1206                       CREATE TABLE foo(x INTEGER, y TEXT);
1207                       INSERT INTO foo VALUES(4, 'hello');
1208                       INSERT INTO foo VALUES(3, ', ');
1209                       INSERT INTO foo VALUES(2, 'world');
1210                       INSERT INTO foo VALUES(1, '!');
1211                       END;";
1212            db.execute_batch(sql)?;
1213
1214            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1215            let results: CustomResult<Vec<String>> = query
1216                .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1217                .collect();
1218
1219            assert_eq!(results?.concat(), "hello, world!");
1220            Ok(())
1221        }
1222
1223        #[test]
1224        fn test_query_and_then_custom_error_fails() -> Result<()> {
1225            let db = checked_memory_handle();
1226            let sql = "BEGIN;
1227                       CREATE TABLE foo(x INTEGER, y TEXT);
1228                       INSERT INTO foo VALUES(4, 'hello');
1229                       INSERT INTO foo VALUES(3, ', ');
1230                       INSERT INTO foo VALUES(2, 'world');
1231                       INSERT INTO foo VALUES(1, '!');
1232                       END;";
1233            db.execute_batch(sql)?;
1234
1235            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1236            let bad_type: CustomResult<Vec<f64>> = query
1237                .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1238                .collect();
1239
1240            match bad_type.unwrap_err() {
1241                CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1242                err => panic!("Unexpected error {err}"),
1243            }
1244
1245            let bad_idx: CustomResult<Vec<String>> = query
1246                .query_and_then([], |row| row.get(3).map_err(CustomError::Sqlite))?
1247                .collect();
1248
1249            match bad_idx.unwrap_err() {
1250                CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1251                err => panic!("Unexpected error {err}"),
1252            }
1253
1254            let non_sqlite_err: CustomResult<Vec<String>> =
1255                query.query_and_then([], |_| Err(CustomError::SomeError))?.collect();
1256
1257            match non_sqlite_err.unwrap_err() {
1258                CustomError::SomeError => (),
1259                err => panic!("Unexpected error {err}"),
1260            }
1261            Ok(())
1262        }
1263
1264        #[test]
1265        fn test_query_row_and_then_custom_error() -> CustomResult<()> {
1266            let db = checked_memory_handle();
1267            let sql = "BEGIN;
1268                       CREATE TABLE foo(x INTEGER, y TEXT);
1269                       INSERT INTO foo VALUES(4, 'hello');
1270                       END;";
1271            db.execute_batch(sql)?;
1272
1273            let query = "SELECT x, y FROM foo ORDER BY x DESC";
1274            let results: CustomResult<String> =
1275                db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1276
1277            assert_eq!(results?, "hello");
1278            Ok(())
1279        }
1280
1281        #[test]
1282        fn test_query_row_and_then_custom_error_fails() -> Result<()> {
1283            let db = checked_memory_handle();
1284            let sql = "BEGIN;
1285                       CREATE TABLE foo(x INTEGER, y TEXT);
1286                       INSERT INTO foo VALUES(4, 'hello');
1287                       END;";
1288            db.execute_batch(sql)?;
1289
1290            let query = "SELECT x, y FROM foo ORDER BY x DESC";
1291            let bad_type: CustomResult<f64> =
1292                db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1293
1294            match bad_type.unwrap_err() {
1295                CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1296                err => panic!("Unexpected error {err}"),
1297            }
1298
1299            let bad_idx: CustomResult<String> =
1300                db.query_row_and_then(query, [], |row| row.get(3).map_err(CustomError::Sqlite));
1301
1302            match bad_idx.unwrap_err() {
1303                CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1304                err => panic!("Unexpected error {err}"),
1305            }
1306
1307            let non_sqlite_err: CustomResult<String> =
1308                db.query_row_and_then(query, [], |_| Err(CustomError::SomeError));
1309
1310            match non_sqlite_err.unwrap_err() {
1311                CustomError::SomeError => (),
1312                err => panic!("Unexpected error {err}"),
1313            }
1314            Ok(())
1315        }
1316
1317        #[test]
1318        fn test_rows_and_then_with_custom_error() -> Result<()> {
1319            let db = checked_memory_handle();
1320            db.execute_batch("CREATE TABLE test (value INTEGER)")?;
1321            db.execute_batch("INSERT INTO test VALUES (1), (3), (5)")?;
1322
1323            let mut stmt = db.prepare("SELECT value FROM test ORDER BY value")?;
1324            let rows = stmt.query([])?;
1325
1326            // Use and_then to apply custom validation with custom error type
1327            let results: Vec<i32> = rows
1328                .and_then(|row| -> CustomResult<i32> {
1329                    let val: i32 = row.get(0)?; // duckdb::Error automatically converted via From trait
1330                    if val > 10 {
1331                        Err(CustomError::SomeError) // Custom application-specific error
1332                    } else {
1333                        Ok(val)
1334                    }
1335                })
1336                .collect::<CustomResult<Vec<_>>>()
1337                .unwrap();
1338
1339            assert_eq!(results, vec![1, 3, 5]);
1340            Ok(())
1341        }
1342    }
1343
1344    #[test]
1345    fn test_dynamic() -> Result<()> {
1346        let db = checked_memory_handle();
1347        let sql = "BEGIN;
1348                       CREATE TABLE foo(x INTEGER, y TEXT);
1349                       INSERT INTO foo VALUES(4, 'hello');
1350                       END;";
1351        db.execute_batch(sql)?;
1352
1353        db.query_row("SELECT * FROM foo", [], |r| {
1354            assert_eq!(2, r.as_ref().column_count());
1355            Ok(())
1356        })
1357    }
1358    #[test]
1359    fn test_dyn_box() -> Result<()> {
1360        let db = checked_memory_handle();
1361        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1362        let b: Box<dyn ToSql> = Box::new(5);
1363        db.execute("INSERT INTO foo VALUES(?)", [b])?;
1364        db.query_row("SELECT x FROM foo", [], |r| {
1365            assert_eq!(5, r.get_unwrap::<_, i32>(0));
1366            Ok(())
1367        })
1368    }
1369
1370    #[test]
1371    fn test_alter_table() -> Result<()> {
1372        let db = checked_memory_handle();
1373        db.execute_batch("CREATE TABLE x(t INTEGER);")?;
1374        // `execute_batch` should be used but `execute` should also work
1375        db.execute("ALTER TABLE x RENAME TO y;", [])?;
1376        Ok(())
1377    }
1378
1379    #[test]
1380    fn test_query_arrow_record_batch_small() -> Result<()> {
1381        let db = checked_memory_handle();
1382        let sql = "BEGIN TRANSACTION;
1383                   CREATE TABLE test(t INTEGER);
1384                   INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);
1385                   END TRANSACTION;";
1386        db.execute_batch(sql)?;
1387        let mut stmt = db.prepare("select t from test order by t desc")?;
1388        let mut arr = stmt.query_arrow([])?;
1389
1390        let schema = arr.get_schema();
1391        assert_eq!(schema.fields().len(), 1);
1392        assert_eq!(schema.field(0).name(), "t");
1393        assert_eq!(schema.field(0).data_type(), &DataType::Int32);
1394
1395        let rb = arr.next().unwrap();
1396        let column = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1397        assert_eq!(column.len(), 5);
1398        assert_eq!(column.value(0), 5);
1399        assert_eq!(column.value(1), 4);
1400        assert_eq!(column.value(2), 3);
1401        assert_eq!(column.value(3), 2);
1402        assert_eq!(column.value(4), 1);
1403
1404        assert!(arr.next().is_none());
1405        Ok(())
1406    }
1407
1408    #[test]
1409    fn test_query_arrow_record_batch_large() -> Result<()> {
1410        let db = checked_memory_handle();
1411        db.execute_batch("BEGIN TRANSACTION")?;
1412        db.execute_batch("CREATE TABLE test(t INTEGER);")?;
1413        for _ in 0..600 {
1414            db.execute_batch("INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);")?;
1415        }
1416        db.execute_batch("END TRANSACTION")?;
1417        let rbs: Vec<RecordBatch> = db.prepare("select t from test order by t")?.query_arrow([])?.collect();
1418        // batch size is not stable
1419        // assert_eq!(rbs.len(), 3);
1420        assert_eq!(rbs.iter().map(|rb| rb.num_rows()).sum::<usize>(), 3000);
1421        assert_eq!(
1422            rbs.iter()
1423                .map(|rb| rb
1424                    .column(0)
1425                    .as_any()
1426                    .downcast_ref::<Int32Array>()
1427                    .unwrap()
1428                    .iter()
1429                    .map(|i| i.unwrap())
1430                    .sum::<i32>())
1431                .sum::<i32>(),
1432            9000
1433        );
1434        Ok(())
1435    }
1436
1437    #[test]
1438    fn test_stream_arrow_with_call() -> Result<()> {
1439        use arrow::datatypes::{DataType, Field, Schema};
1440        use std::sync::Arc;
1441
1442        let db = checked_memory_handle();
1443
1444        db.execute_batch(
1445            "CREATE TABLE test_data(id INTEGER, name VARCHAR);
1446             INSERT INTO test_data VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie');",
1447        )?;
1448
1449        db.execute_batch("CREATE MACRO test_func() AS TABLE SELECT * FROM test_data;")?;
1450
1451        let schema = Arc::new(Schema::new(vec![
1452            Field::new("id", DataType::Int32, true),
1453            Field::new("name", DataType::Utf8, true),
1454        ]));
1455
1456        let mut stmt = db.prepare("CALL test_func()")?;
1457        let rbs: Vec<RecordBatch> = stmt.stream_arrow([], schema)?.collect();
1458
1459        // Verify we got results
1460        assert!(!rbs.is_empty(), "Expected at least one record batch");
1461        let total_rows: usize = rbs.iter().map(|rb| rb.num_rows()).sum();
1462        assert_eq!(total_rows, 3);
1463
1464        let id_column = rbs[0].column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1465        assert_eq!(id_column.value(0), 1);
1466
1467        Ok(())
1468    }
1469
1470    #[test]
1471    fn round_trip_interval() -> Result<()> {
1472        let db = checked_memory_handle();
1473        db.execute_batch("CREATE TABLE foo (t INTERVAL);")?;
1474
1475        let d = Value::Interval {
1476            months: 1,
1477            days: 2,
1478            nanos: 3,
1479        };
1480        db.execute("INSERT INTO foo VALUES (?)", [d])?;
1481
1482        let mut stmt = db.prepare("SELECT t FROM foo")?;
1483        let mut rows = stmt.query([])?;
1484        let row = rows.next()?.unwrap();
1485        let d: Value = row.get_unwrap(0);
1486        assert_eq!(d, d);
1487        Ok(())
1488    }
1489
1490    #[test]
1491    fn test_database_name_to_string() -> Result<()> {
1492        assert_eq!(DatabaseName::Main.to_string(), "main");
1493        assert_eq!(DatabaseName::Temp.to_string(), "temp");
1494        assert_eq!(DatabaseName::Attached("abc").to_string(), "abc");
1495        Ok(())
1496    }
1497
1498    #[test]
1499    fn test_interrupt() -> Result<()> {
1500        let db = checked_memory_handle();
1501        let db_interrupt = db.interrupt_handle();
1502
1503        let (tx, rx) = std::sync::mpsc::channel();
1504        std::thread::spawn(move || {
1505            let mut stmt = db
1506                .prepare("select count(*) from range(10000000) t1, range(1000000) t2")
1507                .unwrap();
1508            tx.send(stmt.execute([])).unwrap();
1509        });
1510
1511        std::thread::sleep(std::time::Duration::from_millis(100));
1512        db_interrupt.interrupt();
1513
1514        let result = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap();
1515        assert!(result.is_err_and(|err| err.to_string().contains("INTERRUPT")));
1516        Ok(())
1517    }
1518
1519    #[test]
1520    fn test_interrupt_on_dropped_db() {
1521        let db = checked_memory_handle();
1522        let db_interrupt = db.interrupt_handle();
1523
1524        drop(db);
1525        db_interrupt.interrupt();
1526    }
1527
1528    #[cfg(feature = "bundled")]
1529    #[test]
1530    fn test_version() -> Result<()> {
1531        let db = checked_memory_handle();
1532        let expected: String = format!("v{}", env!("CARGO_PKG_VERSION"));
1533        let actual = db.version()?;
1534        assert_eq!(expected, actual);
1535        Ok(())
1536    }
1537
1538    #[test]
1539    fn test_arrow_string_view_setting() -> Result<()> {
1540        // Test that only one setting doesn't work (missing arrow_output_version)
1541        {
1542            let config = Config::default().with("produce_arrow_string_view", "true")?;
1543            let conn = Connection::open_in_memory_with_flags(config)?;
1544
1545            let mut query = conn.prepare("SELECT 'test'::varchar AS str")?;
1546            let arrow = query.query_arrow([])?;
1547
1548            let batch = arrow.into_iter().next().expect("Expected at least one batch");
1549            assert_eq!(batch.schema().field(0).data_type(), &DataType::Utf8);
1550        }
1551
1552        {
1553            let config = Config::default()
1554                .with("produce_arrow_string_view", "true")?
1555                .with("arrow_output_version", "1.4")?;
1556            let conn = Connection::open_in_memory_with_flags(config)?;
1557
1558            let mut query = conn.prepare("SELECT 'test'::varchar AS str")?;
1559            let arrow = query.query_arrow([])?;
1560
1561            let batch = arrow.into_iter().next().expect("Expected at least one batch");
1562            assert_eq!(batch.schema().field(0).data_type(), &DataType::Utf8View);
1563        }
1564
1565        Ok(())
1566    }
1567
1568    #[test]
1569    fn test_prepare_multi_statement() -> Result<()> {
1570        let db = checked_memory_handle();
1571
1572        {
1573            let mut stmt =
1574                db.prepare("CREATE TABLE test(x INTEGER); INSERT INTO test VALUES (42); SELECT x FROM test;")?;
1575            let result: i32 = stmt.query_row([], |row| row.get(0))?;
1576            assert_eq!(result, 42);
1577        }
1578
1579        {
1580            let mut stmt = db.prepare(
1581                "CREATE TEMP TABLE temp_data(id INTEGER, value TEXT);
1582                INSERT INTO temp_data VALUES (1, 'first'), (2, 'second');
1583                SELECT COUNT(*) FROM temp_data;",
1584            )?;
1585            let count: i32 = stmt.query_row([], |row| row.get(0))?;
1586            assert_eq!(count, 2);
1587        }
1588
1589        Ok(())
1590    }
1591
1592    #[test]
1593    fn test_pivot_query() -> Result<()> {
1594        let db = checked_memory_handle();
1595
1596        db.execute_batch(
1597            "CREATE TABLE cities(city VARCHAR, year INTEGER, population INTEGER);
1598             INSERT INTO cities VALUES
1599               ('Amsterdam', 2000, 1005),
1600               ('Amsterdam', 2010, 1065),
1601               ('Amsterdam', 2020, 1158),
1602               ('Berlin', 2000, 3382),
1603               ('Berlin', 2010, 3460),
1604               ('Berlin', 2020, 3576);",
1605        )?;
1606
1607        // PIVOT queries internally expand to multiple statements
1608        let mut stmt = db.prepare("PIVOT cities ON year USING sum(population);")?;
1609        let mut rows = stmt.query([])?;
1610
1611        let mut row_count = 0;
1612        while let Some(_row) = rows.next()? {
1613            row_count += 1;
1614        }
1615        assert_eq!(row_count, 2);
1616
1617        Ok(())
1618    }
1619
1620    #[test]
1621    fn test_multiple_memory_databases() -> Result<()> {
1622        // Unnamed :memory: connections are isolated
1623        {
1624            let mem1 = Connection::open_in_memory()?;
1625            let mem2 = Connection::open_in_memory()?;
1626
1627            mem1.execute_batch("CREATE TABLE test (id INTEGER)")?;
1628            mem1.execute("INSERT INTO test VALUES (1)", [])?;
1629
1630            mem2.execute_batch("CREATE TABLE test (id INTEGER)")?;
1631            mem2.execute("INSERT INTO test VALUES (2)", [])?;
1632
1633            let value1: i32 = mem1.query_row("SELECT id FROM test", [], |r| r.get(0))?;
1634            assert_eq!(value1, 1);
1635
1636            let value2: i32 = mem2.query_row("SELECT id FROM test", [], |r| r.get(0))?;
1637            assert_eq!(value2, 2);
1638        }
1639
1640        // try_clone() shares the same database
1641        {
1642            let shared = Connection::open_in_memory()?;
1643
1644            shared.execute_batch("CREATE TABLE shared_table (id INTEGER)")?;
1645            shared.execute("INSERT INTO shared_table VALUES (123)", [])?;
1646
1647            let cloned = shared.try_clone()?;
1648
1649            // Cloned connection can see the original's tables
1650            let value: i32 = cloned.query_row("SELECT id FROM shared_table", [], |r| r.get(0))?;
1651            assert_eq!(value, 123);
1652
1653            cloned.execute("INSERT INTO shared_table VALUES (456)", [])?;
1654
1655            // Original connection can see cloned's insert
1656            let count: i64 = shared.query_row("SELECT COUNT(*) FROM shared_table", [], |r| r.get(0))?;
1657            assert_eq!(count, 2);
1658        }
1659
1660        Ok(())
1661    }
1662
1663    #[test]
1664    fn test_appender_with_catalog() -> Result<()> {
1665        let db = checked_memory_handle();
1666
1667        // Attach a new database to use as a catalog
1668        let temp_dir = tempfile::tempdir().unwrap();
1669        let attached_path = temp_dir.path().join("attached.db");
1670        db.execute_batch(&format!("ATTACH '{}' AS attached_db", attached_path.display()))?;
1671
1672        // Create a table in the attached database
1673        db.execute_batch("CREATE TABLE attached_db.main.test_table (id INTEGER, name TEXT)")?;
1674
1675        // Use appender with catalog
1676        {
1677            let mut app = db.appender_to_catalog_and_db("test_table", "attached_db", "main")?;
1678            app.append_row(params![1, "Alice"])?;
1679            app.append_row(params![2, "Bob"])?;
1680            app.append_row(params![3, "Charlie"])?;
1681        }
1682
1683        // Verify data was inserted into the correct table
1684        let count: i64 = db.query_row("SELECT COUNT(*) FROM attached_db.main.test_table", [], |r| r.get(0))?;
1685        assert_eq!(count, 3);
1686
1687        let name: String = db.query_row("SELECT name FROM attached_db.main.test_table WHERE id = ?", [2], |r| {
1688            r.get(0)
1689        })?;
1690        assert_eq!(name, "Bob");
1691
1692        Ok(())
1693    }
1694
1695    #[test]
1696    fn test_appender_with_catalog_multiple_schemas() -> Result<()> {
1697        let db = checked_memory_handle();
1698
1699        // Attach a new database
1700        let temp_dir = tempfile::tempdir().unwrap();
1701        let attached_path = temp_dir.path().join("multi_schema.db");
1702        db.execute_batch(&format!("ATTACH '{}' AS my_catalog", attached_path.display()))?;
1703
1704        // Create multiple schemas and tables
1705        db.execute_batch("CREATE SCHEMA my_catalog.schema1")?;
1706        db.execute_batch("CREATE SCHEMA my_catalog.schema2")?;
1707        db.execute_batch("CREATE TABLE my_catalog.schema1.data (value INTEGER)")?;
1708        db.execute_batch("CREATE TABLE my_catalog.schema2.data (value INTEGER)")?;
1709
1710        // Append to schema1
1711        {
1712            let mut app = db.appender_to_catalog_and_db("data", "my_catalog", "schema1")?;
1713            app.append_rows([[10], [20], [30]])?;
1714        }
1715
1716        // Append to schema2
1717        {
1718            let mut app = db.appender_to_catalog_and_db("data", "my_catalog", "schema2")?;
1719            app.append_rows([[100], [200]])?;
1720        }
1721
1722        // Verify data in schema1
1723        let sum1: i64 = db.query_row("SELECT SUM(value) FROM my_catalog.schema1.data", [], |r| r.get(0))?;
1724        assert_eq!(sum1, 60);
1725
1726        // Verify data in schema2
1727        let sum2: i64 = db.query_row("SELECT SUM(value) FROM my_catalog.schema2.data", [], |r| r.get(0))?;
1728        assert_eq!(sum2, 300);
1729
1730        Ok(())
1731    }
1732
1733    #[test]
1734    fn test_appender_with_catalog_main_vs_attached() -> Result<()> {
1735        let db = checked_memory_handle();
1736
1737        // Create table in main database
1738        db.execute_batch("CREATE TABLE test (id INTEGER)")?;
1739
1740        // Attach another database
1741        let temp_dir = tempfile::tempdir().unwrap();
1742        let attached_path = temp_dir.path().join("other.db");
1743        db.execute_batch(&format!("ATTACH '{}' AS other_db", attached_path.display()))?;
1744        db.execute_batch("CREATE TABLE other_db.main.test (id INTEGER)")?;
1745
1746        // Append to main catalog (memory)
1747        {
1748            let mut app = db.appender_to_catalog_and_db("test", "memory", "main")?;
1749            app.append_rows([[1], [2]])?;
1750        }
1751
1752        // Append to attached catalog
1753        {
1754            let mut app = db.appender_to_catalog_and_db("test", "other_db", "main")?;
1755            app.append_rows([[100], [200]])?;
1756        }
1757
1758        // Verify main database
1759        let count_main: i64 = db.query_row("SELECT COUNT(*) FROM test", [], |r| r.get(0))?;
1760        assert_eq!(count_main, 2);
1761
1762        // Verify attached database
1763        let count_attached: i64 = db.query_row("SELECT COUNT(*) FROM other_db.main.test", [], |r| r.get(0))?;
1764        assert_eq!(count_attached, 2);
1765
1766        Ok(())
1767    }
1768
1769    #[test]
1770    fn test_appender_with_catalog_error_invalid_catalog() -> Result<()> {
1771        let db = checked_memory_handle();
1772
1773        // Try to create appender with non-existent catalog
1774        let result = db.appender_to_catalog_and_db("test", "nonexistent_catalog", "main");
1775        assert!(result.is_err());
1776
1777        Ok(())
1778    }
1779
1780    #[test]
1781    fn test_appender_with_catalog_error_invalid_schema() -> Result<()> {
1782        let db = checked_memory_handle();
1783
1784        // Attach a database
1785        let temp_dir = tempfile::tempdir().unwrap();
1786        let attached_path = temp_dir.path().join("test.db");
1787        db.execute_batch(&format!("ATTACH '{}' AS my_db", attached_path.display()))?;
1788
1789        db.execute_batch("CREATE TABLE my_db.main.test (id INTEGER)")?;
1790
1791        // Try to create appender with non-existent schema
1792        let result = db.appender_to_catalog_and_db("test", "my_db", "nonexistent_schema");
1793        assert!(result.is_err());
1794
1795        Ok(())
1796    }
1797
1798    #[test]
1799    fn test_appender_with_catalog_flush() -> Result<()> {
1800        let db = checked_memory_handle();
1801
1802        // Attach database
1803        let temp_dir = tempfile::tempdir().unwrap();
1804        let attached_path = temp_dir.path().join("flush_test.db");
1805        db.execute_batch(&format!("ATTACH '{}' AS flush_db", attached_path.display()))?;
1806
1807        db.execute_batch("CREATE TABLE flush_db.main.test (id INTEGER)")?;
1808
1809        // Use appender with explicit flush
1810        {
1811            let mut app = db.appender_to_catalog_and_db("test", "flush_db", "main")?;
1812            app.append_row([1])?;
1813            app.append_row([2])?;
1814            app.flush()?;
1815            app.append_row([3])?;
1816            app.flush()?;
1817        }
1818
1819        // Verify all rows were flushed
1820        let count: i64 = db.query_row("SELECT COUNT(*) FROM flush_db.main.test", [], |r| r.get(0))?;
1821        assert_eq!(count, 3);
1822
1823        Ok(())
1824    }
1825}