Skip to main content

duckdb/
lib.rs

1//! duckdb-rs is an ergonomic wrapper for using DuckDB from Rust. It attempts to
2//! expose an interface similar to [rusqlite](https://github.com/rusqlite/rusqlite).
3//!
4//! ```rust
5//! use duckdb::{params, Connection, Result};
6//! use duckdb::arrow::record_batch::RecordBatch;
7//! use duckdb::arrow::util::pretty::print_batches;
8//!
9//! #[derive(Debug)]
10//! struct Person {
11//!     id: i32,
12//!     name: String,
13//!     data: Option<Vec<u8>>,
14//! }
15//!
16//! fn main() -> Result<()> {
17//!     let conn = Connection::open_in_memory()?;
18//!
19//!     conn.execute_batch(
20//!         r"CREATE SEQUENCE seq;
21//!           CREATE TABLE person (
22//!                   id              INTEGER PRIMARY KEY DEFAULT NEXTVAL('seq'),
23//!                   name            TEXT NOT NULL,
24//!                   data            BLOB
25//!                   );
26//!          ")?;
27//!     let me = Person {
28//!         id: 0,
29//!         name: "Steven".to_string(),
30//!         data: None,
31//!     };
32//!     conn.execute(
33//!         "INSERT INTO person (name, data) VALUES (?, ?)",
34//!         params![me.name, me.data],
35//!     )?;
36//!
37//!     let mut stmt = conn.prepare("SELECT id, name, data FROM person")?;
38//!     let person_iter = stmt.query_map([], |row| {
39//!         Ok(Person {
40//!             id: row.get(0)?,
41//!             name: row.get(1)?,
42//!             data: row.get(2)?,
43//!         })
44//!     })?;
45//!
46//!     for person in person_iter {
47//!         println!("Found person {:?}", person.unwrap());
48//!     }
49//!
50//!     // query table by arrow
51//!     let rbs: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
52//!     print_batches(&rbs);
53//!     Ok(())
54//! }
55//! ```
56#![warn(missing_docs)]
57
58pub use libduckdb_sys as ffi;
59
60use std::{
61    cell::RefCell,
62    convert,
63    ffi::CString,
64    fmt,
65    path::{Path, PathBuf},
66    result, str,
67};
68
69use crate::{cache::StatementCache, inner_connection::InnerConnection, raw_statement::RawStatement, types::ValueRef};
70
71#[cfg(feature = "r2d2")]
72pub use crate::r2d2::DuckdbConnectionManager;
73pub use crate::{
74    appender::Appender,
75    appender_params::{AppenderParams, AppenderParamsFromIter, appender_params_from_iter},
76    arrow_batch::{Arrow, ArrowStream},
77    cache::CachedStatement,
78    column::Column,
79    config::{AccessMode, Config, DefaultNullOrder, DefaultOrder},
80    error::Error,
81    ffi::ErrorCode,
82    inner_connection::InterruptHandle,
83    params::{Params, ParamsFromIter, params_from_iter},
84    row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows},
85    statement::Statement,
86    transaction::{DropBehavior, Transaction},
87    types::ToSql,
88};
89#[cfg(feature = "polars")]
90pub use polars_dataframe::Polars;
91
92// re-export dependencies to minimise version maintenance for crate users
93pub use arrow;
94#[cfg(feature = "loadable-extension")]
95pub use duckdb_loadable_macros::duckdb_entrypoint_c_api;
96#[cfg(feature = "polars")]
97pub use polars;
98
99/// The core module contains the main functionality of the DuckDB crate.
100pub mod core;
101
102#[macro_use]
103mod error;
104mod appender;
105mod appender_params;
106mod arrow_batch;
107mod cache;
108mod column;
109mod config;
110mod inner_connection;
111mod params;
112
113#[cfg(feature = "polars")]
114mod polars_dataframe;
115mod pragma;
116#[cfg(feature = "r2d2")]
117mod r2d2;
118mod raw_statement;
119mod row;
120mod statement;
121mod transaction;
122
123#[cfg(feature = "extensions-full")]
124mod extension;
125
126pub mod profiling;
127pub mod types;
128/// The duckdb table function interface
129#[cfg(feature = "vtab")]
130pub mod vtab;
131
132/// The duckdb scalar function interface
133#[cfg(feature = "vscalar")]
134pub mod vscalar;
135
136#[cfg(test)]
137mod test_all_types;
138
139// Number of cached prepared statements we'll hold on to.
140const STATEMENT_CACHE_DEFAULT_CAPACITY: usize = 16;
141
142/// A macro making it more convenient to pass heterogeneous or long lists of
143/// parameters as a `&[&dyn ToSql]`.
144///
145/// # Example
146///
147/// ```rust,no_run
148/// # use duckdb::{Result, Connection, params};
149///
150/// struct Person {
151///     name: String,
152///     age_in_years: u8,
153///     data: Option<Vec<u8>>,
154/// }
155///
156/// fn add_person(conn: &Connection, person: &Person) -> Result<()> {
157///     conn.execute("INSERT INTO person (name, age_in_years, data)
158///                   VALUES (?1, ?2, ?3)",
159///                  params![person.name, person.age_in_years, person.data])?;
160///     Ok(())
161/// }
162/// ```
163#[macro_export]
164macro_rules! params {
165    () => {
166        &[] as &[&dyn $crate::ToSql]
167    };
168    ($($param:expr),+ $(,)?) => {
169        &[$(&$param as &dyn $crate::ToSql),+] as &[&dyn $crate::ToSql]
170    };
171}
172
173/// A typedef of the result returned by many methods.
174pub type Result<T, E = Error> = result::Result<T, E>;
175
176/// See the [method documentation](#tymethod.optional).
177pub trait OptionalExt<T> {
178    /// Converts a `Result<T>` into a `Result<Option<T>>`.
179    ///
180    /// By default, duckdb-rs treats 0 rows being returned from a query that is
181    /// expected to return 1 row as an error. This method will
182    /// handle that error, and give you back an `Option<T>` instead.
183    fn optional(self) -> Result<Option<T>>;
184}
185
186impl<T> OptionalExt<T> for Result<T> {
187    fn optional(self) -> Result<Option<T>> {
188        match self {
189            Ok(value) => Ok(Some(value)),
190            Err(Error::QueryReturnedNoRows) => Ok(None),
191            Err(e) => Err(e),
192        }
193    }
194}
195
196/// Name for a database within a DuckDB connection.
197#[derive(Copy, Clone, Debug)]
198pub enum DatabaseName<'a> {
199    /// The main database.
200    Main,
201
202    /// The temporary database (e.g., any "CREATE TEMPORARY TABLE" tables).
203    Temp,
204
205    /// A database that has been attached via "ATTACH DATABASE ...".
206    Attached(&'a str),
207}
208
209#[allow(clippy::needless_lifetimes)]
210impl<'a> fmt::Display for DatabaseName<'a> {
211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
212        match *self {
213            DatabaseName::Main => write!(f, "main"),
214            DatabaseName::Temp => write!(f, "temp"),
215            DatabaseName::Attached(s) => write!(f, "{s}"),
216        }
217    }
218}
219
220/// Shorthand for [`DatabaseName::Main`].
221pub const MAIN_DB: DatabaseName<'static> = DatabaseName::Main;
222
223/// Shorthand for [`DatabaseName::Temp`].
224pub const TEMP_DB: DatabaseName<'static> = DatabaseName::Temp;
225
226/// A connection to a DuckDB database.
227pub struct Connection {
228    db: RefCell<InnerConnection>,
229    cache: StatementCache,
230    path: Option<PathBuf>,
231}
232
233unsafe impl Send for Connection {}
234
235impl Connection {
236    /// Open a new connection to a DuckDB database.
237    ///
238    /// `Connection::open(path)` is equivalent to
239    /// `Connection::open_with_flags(path,
240    /// Config::default())`.
241    ///
242    /// ```rust,no_run
243    /// # use duckdb::{Connection, Result};
244    /// fn open_my_db() -> Result<()> {
245    ///     let path = "./my_db.db3";
246    ///     let db = Connection::open(&path)?;
247    ///     println!("{}", db.is_autocommit());
248    ///     Ok(())
249    /// }
250    /// ```
251    ///
252    /// # Failure
253    ///
254    /// Will return `Err` if `path` cannot be converted to a C-compatible
255    /// string or if the underlying DuckDB open call fails.
256    #[inline]
257    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
258        Self::open_with_flags(path, Config::default())
259    }
260
261    /// Open a new connection to an in-memory DuckDB database.
262    ///
263    /// # Failure
264    ///
265    /// Will return `Err` if the underlying DuckDB open call fails.
266    #[inline]
267    pub fn open_in_memory() -> Result<Self> {
268        Self::open_in_memory_with_flags(Config::default())
269    }
270
271    /// Open a new connection to an ffi database.
272    ///
273    /// # Failure
274    ///
275    /// Will return `Err` if the underlying DuckDB open call fails.
276    /// # Safety
277    ///
278    /// Need to pass in a valid db instance
279    #[inline]
280    pub unsafe fn open_from_raw(raw: ffi::duckdb_database) -> Result<Self> {
281        unsafe { InnerConnection::new_from_raw_db(raw, false) }.map(|db| Self {
282            db: RefCell::new(db),
283            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
284            path: None, // Can we know the path from connection?
285        })
286    }
287
288    /// Open a new connection to a DuckDB database.
289    ///
290    /// # Failure
291    ///
292    /// Will return `Err` if `path` cannot be converted to a C-compatible
293    /// string or if the underlying DuckDB open call fails.
294    #[inline]
295    pub fn open_with_flags<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
296        #[cfg(unix)]
297        fn path_to_cstring(p: &Path) -> Result<CString> {
298            use std::os::unix::ffi::OsStrExt;
299            Ok(CString::new(p.as_os_str().as_bytes())?)
300        }
301
302        #[cfg(not(unix))]
303        fn path_to_cstring(p: &Path) -> Result<CString> {
304            let s = p.to_str().ok_or_else(|| Error::InvalidPath(p.to_owned()))?;
305            Ok(CString::new(s)?)
306        }
307
308        let c_path = path_to_cstring(path.as_ref())?;
309        let config = config.with("duckdb_api", "rust").unwrap();
310        InnerConnection::open_with_flags(&c_path, config).map(|db| Self {
311            db: RefCell::new(db),
312            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
313            path: Some(path.as_ref().to_path_buf()),
314        })
315    }
316
317    /// Open a new connection to an in-memory DuckDB database.
318    ///
319    /// # Failure
320    ///
321    /// Will return `Err` if the underlying DuckDB open call fails.
322    #[inline]
323    pub fn open_in_memory_with_flags(config: Config) -> Result<Self> {
324        Self::open_with_flags(":memory:", config)
325    }
326
327    /// Convenience method to run multiple SQL statements (that cannot take any
328    /// parameters).
329    ///
330    /// ## Example
331    ///
332    /// ```rust,no_run
333    /// # use duckdb::{Connection, Result};
334    /// fn create_tables(conn: &Connection) -> Result<()> {
335    ///     conn.execute_batch("BEGIN;
336    ///                         CREATE TABLE foo(x INTEGER);
337    ///                         CREATE TABLE bar(y TEXT);
338    ///                         COMMIT;",
339    ///     )
340    /// }
341    /// ```
342    ///
343    /// # Failure
344    ///
345    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
346    /// or if the underlying DuckDB call fails.
347    pub fn execute_batch(&self, sql: &str) -> Result<()> {
348        self.db.borrow_mut().execute(sql)
349    }
350
351    /// Convenience method to prepare and execute a single SQL statement.
352    ///
353    /// On success, returns the number of rows that were changed or inserted or
354    /// deleted.
355    ///
356    /// ## Example
357    ///
358    /// ### With params
359    ///
360    /// ```rust,no_run
361    /// # use duckdb::{Connection};
362    /// fn update_rows(conn: &Connection) {
363    ///     match conn.execute("UPDATE foo SET bar = 'baz' WHERE qux = ?", [1i32]) {
364    ///         Ok(updated) => println!("{} rows were updated", updated),
365    ///         Err(err) => println!("update failed: {}", err),
366    ///     }
367    /// }
368    /// ```
369    ///
370    /// ### With params of varying types
371    ///
372    /// ```rust,no_run
373    /// # use duckdb::{Connection, params};
374    /// fn update_rows(conn: &Connection) {
375    ///     match conn.execute("UPDATE foo SET bar = ? WHERE qux = ?", params![&"baz", 1i32]) {
376    ///         Ok(updated) => println!("{} rows were updated", updated),
377    ///         Err(err) => println!("update failed: {}", err),
378    ///     }
379    /// }
380    /// ```
381    ///
382    /// # Failure
383    ///
384    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
385    /// or if the underlying DuckDB call fails.
386    #[inline]
387    pub fn execute<P: Params>(&self, sql: &str, params: P) -> Result<usize> {
388        self.prepare(sql).and_then(|mut stmt| stmt.execute(params))
389    }
390
391    /// Returns the path to the database file, if one exists and is known.
392    #[inline]
393    pub fn path(&self) -> Option<&Path> {
394        self.path.as_deref()
395    }
396
397    /// Convenience method to execute a query that is expected to return a
398    /// single row.
399    ///
400    /// ## Example
401    ///
402    /// ```rust,no_run
403    /// # use duckdb::{Result, Connection};
404    /// fn preferred_locale(conn: &Connection) -> Result<String> {
405    ///     conn.query_row(
406    ///         "SELECT value FROM preferences WHERE name='locale'",
407    ///         [],
408    ///         |row| row.get(0),
409    ///     )
410    /// }
411    /// ```
412    ///
413    /// If the query returns more than one row, all rows except the first are
414    /// ignored.
415    ///
416    /// Returns `Err(QueryReturnedNoRows)` if no results are returned. If the
417    /// query truly is optional, you can call `.optional()` on the result of
418    /// this to get a `Result<Option<T>>`.
419    ///
420    /// # Failure
421    ///
422    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
423    /// or if the underlying DuckDB call fails.
424    #[inline]
425    pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T>
426    where
427        P: Params,
428        F: FnOnce(&Row<'_>) -> Result<T>,
429    {
430        self.prepare(sql)?.query_row(params, f)
431    }
432
433    /// Convenience method to execute a query that is expected to return a
434    /// single row, and execute a mapping via `f` on that returned row with
435    /// the possibility of failure. The `Result` type of `f` must implement
436    /// `std::convert::From<Error>`.
437    ///
438    /// ## Example
439    ///
440    /// ```rust,no_run
441    /// # use duckdb::{Result, Connection};
442    /// fn preferred_locale(conn: &Connection) -> Result<String> {
443    ///     conn.query_row_and_then(
444    ///         "SELECT value FROM preferences WHERE name='locale'",
445    ///         [],
446    ///         |row| row.get(0),
447    ///     )
448    /// }
449    /// ```
450    ///
451    /// If the query returns more than one row, all rows except the first are
452    /// ignored.
453    ///
454    /// # Failure
455    ///
456    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
457    /// or if the underlying DuckDB call fails.
458    #[inline]
459    pub fn query_row_and_then<T, E, P, F>(&self, sql: &str, params: P, f: F) -> Result<T, E>
460    where
461        P: Params,
462        F: FnOnce(&Row<'_>) -> Result<T, E>,
463        E: convert::From<Error>,
464    {
465        self.prepare(sql)?
466            .query(params)?
467            .get_expected_row()
468            .map_err(E::from)
469            .and_then(f)
470    }
471
472    /// Prepare a SQL statement for execution.
473    ///
474    /// ## Example
475    ///
476    /// ```rust,no_run
477    /// # use duckdb::{Connection, Result};
478    /// fn insert_new_people(conn: &Connection) -> Result<()> {
479    ///     let mut stmt = conn.prepare("INSERT INTO People (name) VALUES (?)")?;
480    ///     stmt.execute(["Joe Smith"])?;
481    ///     stmt.execute(["Bob Jones"])?;
482    ///     Ok(())
483    /// }
484    /// ```
485    ///
486    /// # Failure
487    ///
488    /// Will return `Err` if `sql` cannot be converted to a C-compatible string
489    /// or if the underlying DuckDB call fails.
490    #[inline]
491    pub fn prepare(&self, sql: &str) -> Result<Statement<'_>> {
492        self.db.borrow_mut().prepare(self, sql)
493    }
494
495    /// Create an Appender for fast import data
496    /// default to use `DatabaseName::Main`
497    ///
498    /// ## Example
499    ///
500    /// ```rust,no_run
501    /// # use duckdb::{Connection, Result, params};
502    /// fn insert_rows(conn: &Connection) -> Result<()> {
503    ///     let mut app = conn.appender("foo")?;
504    ///     app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
505    ///     Ok(())
506    /// }
507    /// ```
508    ///
509    /// # Failure
510    ///
511    /// Will return `Err` if `table` not exists
512    pub fn appender(&self, table: &str) -> Result<Appender<'_>> {
513        self.appender_to_db(table, &DatabaseName::Main.to_string())
514    }
515
516    /// Create an Appender for fast import data
517    ///
518    /// ## Example
519    ///
520    /// ```rust,no_run
521    /// # use duckdb::{Connection, Result, params, DatabaseName};
522    /// fn insert_rows(conn: &Connection) -> Result<()> {
523    ///     let mut app = conn.appender_to_db("foo", &DatabaseName::Main.to_string())?;
524    ///     app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
525    ///     Ok(())
526    /// }
527    /// ```
528    ///
529    /// # Failure
530    ///
531    /// Will return `Err` if `table` not exists
532    pub fn appender_to_db(&self, table: &str, schema: &str) -> Result<Appender<'_>> {
533        self.db.borrow_mut().appender(self, table, schema)
534    }
535
536    /// Create an Appender for fast import data with provided catalog, schema and table
537    ///
538    /// ## Example
539    ///
540    /// ```rust,no_run
541    /// # use duckdb::{Connection, Result, params, DatabaseName};
542    /// fn insert_rows(conn: &Connection) -> Result<()> {
543    ///     let mut app = conn.appender_to_catalog_and_db("catalog", &DatabaseName::Main.to_string(), "foo")?;
544    ///     app.append_rows([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]])?;
545    ///     Ok(())
546    /// }
547    /// ```
548    ///
549    /// # Failure
550    ///
551    /// Will return `Err` if `catalog` or `schema` not exists
552    pub fn appender_to_catalog_and_db(&self, table: &str, catalog: &str, schema: &str) -> Result<Appender<'_>> {
553        self.db
554            .borrow_mut()
555            .appender_to_catalog_and_db(self, table, catalog, schema)
556    }
557
558    /// Create an Appender that only provides values for specific columns.
559    ///
560    /// Columns not in the list will use their DEFAULT value, or NULL if no default.
561    /// This supports all types of DEFAULT expressions including non-deterministic
562    /// ones like `random()`, `current_timestamp`, or sequences.
563    ///
564    /// ## Example
565    ///
566    /// ```rust,no_run
567    /// # use duckdb::{Connection, Result};
568    /// fn insert_partial(conn: &Connection) -> Result<()> {
569    ///     // Table: CREATE TABLE foo(id INT DEFAULT nextval('seq'), name TEXT, created TIMESTAMP DEFAULT current_timestamp)
570    ///     let mut app = conn.appender_with_columns("foo", &["name"])?;
571    ///     // Only provide name; id and created use their defaults
572    ///     app.append_row(["Alice"])?;
573    ///     app.append_row(["Bob"])?;
574    ///     Ok(())
575    /// }
576    /// ```
577    ///
578    /// # Failure
579    ///
580    /// Will return `Err` if `table` does not exist or a column name is invalid.
581    pub fn appender_with_columns(&self, table: &str, columns: &[&str]) -> Result<Appender<'_>> {
582        self.appender_with_columns_to_db(table, &DatabaseName::Main.to_string(), columns)
583    }
584
585    /// Create an Appender that only provides values for specific columns, with schema.
586    ///
587    /// See [`appender_with_columns`](Connection::appender_with_columns) for details.
588    pub fn appender_with_columns_to_db(&self, table: &str, schema: &str, columns: &[&str]) -> Result<Appender<'_>> {
589        self.db
590            .borrow_mut()
591            .appender_with_columns(self, table, schema, None, columns)
592    }
593
594    /// Create an Appender that only provides values for specific columns, with catalog and schema.
595    ///
596    /// See [`appender_with_columns`](Connection::appender_with_columns) for details.
597    pub fn appender_with_columns_to_catalog_and_db(
598        &self,
599        table: &str,
600        catalog: &str,
601        schema: &str,
602        columns: &[&str],
603    ) -> Result<Appender<'_>> {
604        self.db
605            .borrow_mut()
606            .appender_with_columns(self, table, schema, Some(catalog), columns)
607    }
608
609    /// Get a handle to interrupt long-running queries.
610    ///
611    /// ## Example
612    ///
613    /// ```rust,no_run
614    /// # use duckdb::{Connection, Result};
615    /// fn run_query(conn: Connection) -> Result<()> {
616    ///   let interrupt_handle = conn.interrupt_handle();
617    ///   let join_handle = std::thread::spawn(move || { conn.execute("expensive query", []) });
618    ///
619    ///   // Arbitrary wait for query to start
620    ///   std::thread::sleep(std::time::Duration::from_millis(100));
621    ///
622    ///   interrupt_handle.interrupt();
623    ///
624    ///   let query_result = join_handle.join().unwrap();
625    ///   assert!(query_result.is_err());
626    ///
627    ///   Ok(())
628    /// }
629    pub fn interrupt_handle(&self) -> std::sync::Arc<InterruptHandle> {
630        self.db.borrow().get_interrupt_handle()
631    }
632
633    /// Close the DuckDB connection.
634    ///
635    /// This is functionally equivalent to the `Drop` implementation for
636    /// `Connection` except that on failure, it returns an error and the
637    /// connection itself (presumably so closing can be attempted again).
638    ///
639    /// # Failure
640    ///
641    /// Will return `Err` if the underlying DuckDB call fails.
642    #[inline]
643    #[allow(clippy::result_large_err)]
644    pub fn close(self) -> Result<(), (Self, Error)> {
645        let r = self.db.borrow_mut().close();
646        r.map_err(move |err| (self, err))
647    }
648
649    /// Test for auto-commit mode.
650    /// Autocommit mode is on by default.
651    #[inline]
652    pub fn is_autocommit(&self) -> bool {
653        self.db.borrow().is_autocommit()
654    }
655
656    /// Creates a new connection to the already-opened database.
657    pub fn try_clone(&self) -> Result<Self> {
658        let inner = self.db.borrow().try_clone()?;
659        Ok(Self {
660            db: RefCell::new(inner),
661            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
662            path: self.path.clone(),
663        })
664    }
665
666    /// Returns the version of the DuckDB library
667    pub fn version(&self) -> Result<String> {
668        self.query_row("PRAGMA version", [], |row| row.get(0))
669    }
670}
671
672impl fmt::Debug for Connection {
673    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
674        f.debug_struct("Connection").field("path", &self.path).finish()
675    }
676}
677
678#[cfg(doctest)]
679doc_comment::doctest!("../../../README.md");
680
681#[cfg(test)]
682mod test {
683    use crate::types::Value;
684
685    use super::*;
686    use std::{error::Error as StdError, fmt};
687
688    use arrow::{array::Int32Array, datatypes::DataType, record_batch::RecordBatch};
689    use fallible_iterator::FallibleIterator;
690
691    // this function is never called, but is still type checked; in
692    // particular, calls with specific instantiations will require
693    // that those types are `Send`.
694    #[allow(dead_code, unconditional_recursion, clippy::extra_unused_type_parameters)]
695    fn ensure_send<T: Send>() {
696        ensure_send::<Connection>();
697    }
698
699    pub fn checked_memory_handle() -> Connection {
700        Connection::open_in_memory().unwrap()
701    }
702
703    #[test]
704    fn test_params_of_vary_types() -> Result<()> {
705        let db = checked_memory_handle();
706        let sql = "BEGIN;
707                   CREATE TABLE foo(bar TEXT, qux INTEGER);
708                   INSERT INTO foo VALUES ('baz', 1), ('baz', 2), ('baz', 3);
709                   END;";
710        db.execute_batch(sql)?;
711
712        let changed = db.execute("UPDATE foo SET qux = ? WHERE bar = ?", params![1i32, &"baz"])?;
713        assert_eq!(changed, 3);
714        Ok(())
715    }
716
717    #[test]
718    #[cfg_attr(windows, ignore = "Windows doesn't allow concurrent writes to a file")]
719    fn test_concurrent_transactions_busy_commit() -> Result<()> {
720        let tmp = tempfile::tempdir().unwrap();
721        let path = tmp.path().join("transactions.db3");
722
723        Connection::open(&path)?.execute_batch(
724            "
725            BEGIN;
726            CREATE TABLE foo(x INTEGER);
727            INSERT INTO foo VALUES(42);
728            END;",
729        )?;
730
731        let mut db1 =
732            Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;
733        let mut db2 =
734            Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)?;
735
736        {
737            let tx1 = db1.transaction()?;
738            let tx2 = db2.transaction()?;
739
740            // SELECT first makes sqlite lock with a shared lock
741            tx1.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
742            tx2.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
743
744            tx1.execute("INSERT INTO foo VALUES(?1)", [1])?;
745            let _ = tx2.execute("INSERT INTO foo VALUES(?1)", [2]);
746
747            let _ = tx1.commit();
748            let _ = tx2.commit();
749        }
750
751        let _ = db1.transaction().expect("commit should have closed transaction");
752        let _ = db2.transaction().expect("commit should have closed transaction");
753        Ok(())
754    }
755
756    #[test]
757    fn test_persistence() -> Result<()> {
758        let temp_dir = tempfile::tempdir().unwrap();
759        let path = temp_dir.path().join("test.db3");
760
761        {
762            let db = Connection::open(&path)?;
763            let sql = "BEGIN;
764                   CREATE TABLE foo(x INTEGER);
765                   INSERT INTO foo VALUES(42);
766                   END;";
767            db.execute_batch(sql)?;
768        }
769
770        let path_string = path.to_str().unwrap();
771        let db = Connection::open(path_string)?;
772        let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
773
774        assert_eq!(42i64, the_answer?);
775        Ok(())
776    }
777
778    #[test]
779    fn test_open() {
780        let con = Connection::open_in_memory();
781        if let Err(e) = con {
782            panic!("open error {e}");
783        }
784        assert!(Connection::open_in_memory().is_ok());
785        let db = checked_memory_handle();
786        assert!(db.close().is_ok());
787        let _ = checked_memory_handle();
788        let _ = checked_memory_handle();
789    }
790
791    #[test]
792    fn test_open_from_raw() {
793        unsafe {
794            use std::{ffi::c_void, os::raw::c_char, ptr};
795
796            let mut db: ffi::duckdb_database = ptr::null_mut();
797            let mut c_err: *mut c_char = ptr::null_mut();
798            let r = ffi::duckdb_open_ext(
799                c":memory:".as_ptr(),
800                &mut db,
801                Config::default().duckdb_config(),
802                &mut c_err,
803            );
804            if r != ffi::DuckDBSuccess {
805                if !c_err.is_null() {
806                    ffi::duckdb_free(c_err as *mut c_void);
807                }
808                panic!("duckdb_open_ext failed: {r:?}");
809            }
810
811            let conn = Connection::open_from_raw(db).unwrap();
812            conn.execute_batch("SELECT 1").unwrap();
813            let cloned = conn.try_clone().unwrap();
814            drop(conn);
815            cloned.execute_batch("SELECT 2").unwrap();
816            cloned.close().unwrap();
817
818            ffi::duckdb_close(&mut db);
819        }
820    }
821
822    #[test]
823    fn test_open_failure() -> Result<()> {
824        let filename = "no_such_file.db";
825        let result =
826            Connection::open_with_flags(filename, Config::default().access_mode(config::AccessMode::ReadOnly)?);
827        assert!(result.is_err());
828        let err = result.err().unwrap();
829        if let Error::DuckDBFailure(_e, Some(msg)) = err {
830            // TODO: update error code
831            // assert_eq!(ErrorCode::CannotOpen, e.code);
832            assert!(
833                msg.contains(filename),
834                "error message '{msg}' does not contain '{filename}'"
835            );
836        } else {
837            panic!("DuckDBFailure expected");
838        }
839        Ok(())
840    }
841
842    #[cfg(unix)]
843    #[test]
844    fn test_invalid_unicode_file_names() -> Result<()> {
845        use std::{ffi::OsStr, fs::File, os::unix::ffi::OsStrExt};
846        let temp_dir = tempfile::tempdir().unwrap();
847
848        let path = temp_dir.path();
849        if File::create(path.join(OsStr::from_bytes(&[0xFE]))).is_err() {
850            // Skip test, filesystem doesn't support invalid Unicode
851            return Ok(());
852        }
853        let db_path = path.join(OsStr::from_bytes(&[0xFF]));
854        {
855            let db = Connection::open(&db_path)?;
856            let sql = "BEGIN;
857                   CREATE TABLE foo(x INTEGER);
858                   INSERT INTO foo VALUES(42);
859                   END;";
860            db.execute_batch(sql)?;
861        }
862
863        let db = Connection::open(&db_path)?;
864        let the_answer: Result<i64> = db.query_row("SELECT x FROM foo", [], |r| r.get(0));
865
866        assert_eq!(42i64, the_answer?);
867        Ok(())
868    }
869
870    #[test]
871    fn test_close_always_ok() -> Result<()> {
872        let db = checked_memory_handle();
873
874        // TODO: prepare a query but not execute it
875
876        db.close().unwrap();
877        Ok(())
878    }
879
880    #[test]
881    fn test_execute_batch() -> Result<()> {
882        let db = checked_memory_handle();
883        let sql = "BEGIN;
884                   CREATE TABLE foo(x INTEGER);
885                   INSERT INTO foo VALUES(1);
886                   INSERT INTO foo VALUES(2);
887                   INSERT INTO foo VALUES(3);
888                   INSERT INTO foo VALUES(4);
889                   END;";
890        db.execute_batch(sql)?;
891
892        db.execute_batch("UPDATE foo SET x = 3 WHERE x < 3")?;
893
894        assert!(db.execute_batch("INVALID SQL").is_err());
895        Ok(())
896    }
897
898    #[test]
899    fn test_execute_single() -> Result<()> {
900        let db = checked_memory_handle();
901        db.execute_batch("CREATE TABLE foo(x INTEGER)")?;
902
903        assert_eq!(
904            3,
905            db.execute("INSERT INTO foo(x) VALUES (?), (?), (?)", [1i32, 2i32, 3i32])?
906        );
907        assert_eq!(1, db.execute("INSERT INTO foo(x) VALUES (?)", [4i32])?);
908
909        assert_eq!(
910            10i32,
911            db.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
912        );
913        Ok(())
914    }
915
916    #[test]
917    fn test_prepare_column_names() -> Result<()> {
918        let db = checked_memory_handle();
919        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
920
921        let mut stmt = db.prepare("SELECT * FROM foo")?;
922        stmt.execute([])?;
923        assert_eq!(stmt.column_count(), 1);
924        assert_eq!(stmt.column_names(), vec!["x"]);
925
926        let mut stmt = db.prepare("SELECT x AS a, x AS b FROM foo")?;
927        stmt.execute([])?;
928        assert_eq!(stmt.column_count(), 2);
929        assert_eq!(stmt.column_names(), vec!["a", "b"]);
930        Ok(())
931    }
932
933    #[test]
934    fn test_prepare_execute() -> Result<()> {
935        let db = checked_memory_handle();
936        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
937
938        let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
939        assert_eq!(insert_stmt.execute([1i32])?, 1);
940        assert_eq!(insert_stmt.execute([2i32])?, 1);
941        assert_eq!(insert_stmt.execute([3i32])?, 1);
942
943        assert!(insert_stmt.execute(["hello"]).is_err());
944        // NOTE: can't execute on errored stmt
945        // assert!(insert_stmt.execute(["goodbye"]).is_err());
946        // assert!(insert_stmt.execute([types::Null]).is_err());
947
948        let mut update_stmt = db.prepare("UPDATE foo SET x=? WHERE x<?")?;
949        assert_eq!(update_stmt.execute([3i32, 3i32])?, 2);
950        assert_eq!(update_stmt.execute([3i32, 3i32])?, 0);
951        assert_eq!(update_stmt.execute([8i32, 8i32])?, 3);
952        Ok(())
953    }
954
955    #[test]
956    fn test_prepare_query() -> Result<()> {
957        let db = checked_memory_handle();
958        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
959
960        let mut insert_stmt = db.prepare("INSERT INTO foo(x) VALUES(?)")?;
961        assert_eq!(insert_stmt.execute([1i32])?, 1);
962        assert_eq!(insert_stmt.execute([2i32])?, 1);
963        assert_eq!(insert_stmt.execute([3i32])?, 1);
964
965        let mut query = db.prepare("SELECT x FROM foo WHERE x < ? ORDER BY x DESC")?;
966        {
967            let mut rows = query.query([4i32])?;
968            let mut v = Vec::<i32>::new();
969
970            while let Some(row) = rows.next()? {
971                v.push(row.get(0)?);
972            }
973
974            assert_eq!(v, [3i32, 2, 1]);
975        }
976
977        {
978            let mut rows = query.query([3i32])?;
979            let mut v = Vec::<i32>::new();
980
981            while let Some(row) = rows.next()? {
982                v.push(row.get(0)?);
983            }
984
985            assert_eq!(v, [2i32, 1]);
986        }
987        Ok(())
988    }
989
990    #[test]
991    fn test_query_map() -> Result<()> {
992        let db = checked_memory_handle();
993        let sql = "BEGIN;
994                   CREATE TABLE foo(x INTEGER, y TEXT);
995                   INSERT INTO foo VALUES(4, 'hello');
996                   INSERT INTO foo VALUES(3, ', ');
997                   INSERT INTO foo VALUES(2, 'world');
998                   INSERT INTO foo VALUES(1, '!');
999                   END;";
1000        db.execute_batch(sql)?;
1001
1002        let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1003        let results: Result<Vec<String>> = query.query([])?.map(|row| row.get(1)).collect();
1004
1005        assert_eq!(results?.concat(), "hello, world!");
1006        Ok(())
1007    }
1008
1009    #[test]
1010    fn test_query_row() -> Result<()> {
1011        let db = checked_memory_handle();
1012        let sql = "BEGIN;
1013                   CREATE TABLE foo(x INTEGER);
1014                   INSERT INTO foo VALUES(1);
1015                   INSERT INTO foo VALUES(2);
1016                   INSERT INTO foo VALUES(3);
1017                   INSERT INTO foo VALUES(4);
1018                   END;";
1019        db.execute_batch(sql)?;
1020
1021        assert_eq!(
1022            10i64,
1023            db.query_row::<i64, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?
1024        );
1025
1026        let result: Result<i64> = db.query_row("SELECT x FROM foo WHERE x > 5", [], |r| r.get(0));
1027        match result.unwrap_err() {
1028            Error::QueryReturnedNoRows => (),
1029            err => panic!("Unexpected error {err}"),
1030        }
1031
1032        let bad_query_result = db.query_row("NOT A PROPER QUERY; test123", [], |_| Ok(()));
1033
1034        assert!(bad_query_result.is_err());
1035        Ok(())
1036    }
1037
1038    #[test]
1039    fn test_optional() -> Result<()> {
1040        let db = checked_memory_handle();
1041
1042        let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 <> 0", [], |r| r.get(0));
1043        let result = result.optional();
1044        match result? {
1045            None => (),
1046            _ => panic!("Unexpected result"),
1047        }
1048
1049        let result: Result<i64> = db.query_row("SELECT 1 WHERE 0 == 0", [], |r| r.get(0));
1050        let result = result.optional();
1051        match result? {
1052            Some(1) => (),
1053            _ => panic!("Unexpected result"),
1054        }
1055
1056        let bad_query_result: Result<i64> = db.query_row("NOT A PROPER QUERY", [], |r| r.get(0));
1057        let bad_query_result = bad_query_result.optional();
1058        assert!(bad_query_result.is_err());
1059        Ok(())
1060    }
1061
1062    #[test]
1063    fn test_prepare_failures() -> Result<()> {
1064        let db = checked_memory_handle();
1065        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1066
1067        let _ = db.prepare("SELECT * FROM does_not_exist").unwrap_err();
1068        // assert!(format!("{}", err).contains("does_not_exist"));
1069        Ok(())
1070    }
1071
1072    #[test]
1073    fn test_is_autocommit() {
1074        let db = checked_memory_handle();
1075        assert!(db.is_autocommit(), "autocommit expected to be active by default");
1076    }
1077
1078    #[test]
1079    #[should_panic(expected = "not supported")]
1080    fn test_statement_debugging() {
1081        let db = checked_memory_handle();
1082        let query = "SELECT 12345";
1083        let stmt = db.prepare(query).unwrap();
1084
1085        assert!(format!("{stmt:?}").contains(query));
1086    }
1087
1088    #[test]
1089    fn test_notnull_constraint_error() -> Result<()> {
1090        let db = checked_memory_handle();
1091        db.execute_batch("CREATE TABLE foo(x TEXT NOT NULL)")?;
1092
1093        let result = db.execute("INSERT INTO foo (x) VALUES (NULL)", []);
1094        assert!(result.is_err());
1095
1096        match result.unwrap_err() {
1097            Error::DuckDBFailure(err, _) => {
1098                // TODO(wangfenjin): Update errorcode
1099                assert_eq!(err.code, ErrorCode::Unknown);
1100            }
1101            err => panic!("Unexpected error {err}"),
1102        }
1103        Ok(())
1104    }
1105
1106    #[test]
1107    fn test_clone() -> Result<()> {
1108        // 1. Drop the cloned connection first. The original connection should still be able to run queries.
1109        {
1110            let owned_con = checked_memory_handle();
1111            {
1112                let cloned_con = owned_con.try_clone().unwrap();
1113                cloned_con.execute_batch("create table test (c1 bigint)")?;
1114                cloned_con.close().unwrap();
1115            }
1116            owned_con.execute_batch("create table test2 (c1 bigint)")?;
1117            owned_con.close().unwrap();
1118        }
1119
1120        // 2. Close the original connection first. The cloned connection should still be able to run queries.
1121        {
1122            let cloned_con = {
1123                let owned_con = checked_memory_handle();
1124                let clone = owned_con.try_clone().unwrap();
1125                owned_con.execute_batch("create table test (c1 bigint)")?;
1126                owned_con.close().unwrap();
1127                clone
1128            };
1129            cloned_con.execute_batch("create table test2 (c1 bigint)")?;
1130            cloned_con.close().unwrap();
1131        }
1132        Ok(())
1133    }
1134
1135    #[test]
1136    fn test_try_clone_after_owner_drop() -> Result<()> {
1137        let clone1 = {
1138            let owned = checked_memory_handle();
1139            let clone1 = owned.try_clone()?;
1140            drop(owned);
1141            clone1
1142        };
1143
1144        let clone2 = clone1.try_clone()?;
1145        clone2.execute_batch("CREATE TABLE t312(i INTEGER); INSERT INTO t312 VALUES (1);")?;
1146        let value: i32 = clone1.query_row("SELECT i FROM t312", [], |r| r.get(0))?;
1147        assert_eq!(value, 1);
1148        Ok(())
1149    }
1150
1151    mod query_and_then_tests {
1152        use super::*;
1153
1154        #[derive(Debug)]
1155        enum CustomError {
1156            SomeError,
1157            Sqlite(Error),
1158        }
1159
1160        impl fmt::Display for CustomError {
1161            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
1162                match *self {
1163                    Self::SomeError => write!(f, "my custom error"),
1164                    Self::Sqlite(ref se) => write!(f, "my custom error: {se}"),
1165                }
1166            }
1167        }
1168
1169        impl StdError for CustomError {
1170            fn description(&self) -> &str {
1171                "my custom error"
1172            }
1173
1174            fn cause(&self) -> Option<&dyn StdError> {
1175                match *self {
1176                    Self::SomeError => None,
1177                    Self::Sqlite(ref se) => Some(se),
1178                }
1179            }
1180        }
1181
1182        impl From<Error> for CustomError {
1183            fn from(se: Error) -> Self {
1184                Self::Sqlite(se)
1185            }
1186        }
1187
1188        type CustomResult<T> = Result<T, CustomError>;
1189
1190        #[test]
1191        fn test_query_and_then() -> Result<()> {
1192            let db = checked_memory_handle();
1193            let sql = "BEGIN;
1194                       CREATE TABLE foo(x INTEGER, y TEXT);
1195                       INSERT INTO foo VALUES(4, 'hello');
1196                       INSERT INTO foo VALUES(3, ', ');
1197                       INSERT INTO foo VALUES(2, 'world');
1198                       INSERT INTO foo VALUES(1, '!');
1199                       END;";
1200            db.execute_batch(sql)?;
1201
1202            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1203            let results: Result<Vec<String>> = query.query_and_then([], |row| row.get(1))?.collect();
1204
1205            assert_eq!(results?.concat(), "hello, world!");
1206            Ok(())
1207        }
1208
1209        #[test]
1210        fn test_query_and_then_fails() -> Result<()> {
1211            let db = checked_memory_handle();
1212            let sql = "BEGIN;
1213                       CREATE TABLE foo(x INTEGER, y TEXT);
1214                       INSERT INTO foo VALUES(4, 'hello');
1215                       INSERT INTO foo VALUES(3, ', ');
1216                       INSERT INTO foo VALUES(2, 'world');
1217                       INSERT INTO foo VALUES(1, '!');
1218                       END;";
1219            db.execute_batch(sql)?;
1220
1221            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1222            let bad_type: Result<Vec<f64>> = query.query_and_then([], |row| row.get(1))?.collect();
1223
1224            match bad_type.unwrap_err() {
1225                Error::InvalidColumnType(..) => (),
1226                err => panic!("Unexpected error {err}"),
1227            }
1228
1229            let bad_idx: Result<Vec<String>> = query.query_and_then([], |row| row.get(3))?.collect();
1230
1231            match bad_idx.unwrap_err() {
1232                Error::InvalidColumnIndex(_) => (),
1233                err => panic!("Unexpected error {err}"),
1234            }
1235            Ok(())
1236        }
1237
1238        #[test]
1239        fn test_query_and_then_custom_error() -> CustomResult<()> {
1240            let db = checked_memory_handle();
1241            let sql = "BEGIN;
1242                       CREATE TABLE foo(x INTEGER, y TEXT);
1243                       INSERT INTO foo VALUES(4, 'hello');
1244                       INSERT INTO foo VALUES(3, ', ');
1245                       INSERT INTO foo VALUES(2, 'world');
1246                       INSERT INTO foo VALUES(1, '!');
1247                       END;";
1248            db.execute_batch(sql)?;
1249
1250            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1251            let results: CustomResult<Vec<String>> = query
1252                .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1253                .collect();
1254
1255            assert_eq!(results?.concat(), "hello, world!");
1256            Ok(())
1257        }
1258
1259        #[test]
1260        fn test_query_and_then_custom_error_fails() -> Result<()> {
1261            let db = checked_memory_handle();
1262            let sql = "BEGIN;
1263                       CREATE TABLE foo(x INTEGER, y TEXT);
1264                       INSERT INTO foo VALUES(4, 'hello');
1265                       INSERT INTO foo VALUES(3, ', ');
1266                       INSERT INTO foo VALUES(2, 'world');
1267                       INSERT INTO foo VALUES(1, '!');
1268                       END;";
1269            db.execute_batch(sql)?;
1270
1271            let mut query = db.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
1272            let bad_type: CustomResult<Vec<f64>> = query
1273                .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
1274                .collect();
1275
1276            match bad_type.unwrap_err() {
1277                CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1278                err => panic!("Unexpected error {err}"),
1279            }
1280
1281            let bad_idx: CustomResult<Vec<String>> = query
1282                .query_and_then([], |row| row.get(3).map_err(CustomError::Sqlite))?
1283                .collect();
1284
1285            match bad_idx.unwrap_err() {
1286                CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1287                err => panic!("Unexpected error {err}"),
1288            }
1289
1290            let non_sqlite_err: CustomResult<Vec<String>> =
1291                query.query_and_then([], |_| Err(CustomError::SomeError))?.collect();
1292
1293            match non_sqlite_err.unwrap_err() {
1294                CustomError::SomeError => (),
1295                err => panic!("Unexpected error {err}"),
1296            }
1297            Ok(())
1298        }
1299
1300        #[test]
1301        fn test_query_row_and_then_custom_error() -> CustomResult<()> {
1302            let db = checked_memory_handle();
1303            let sql = "BEGIN;
1304                       CREATE TABLE foo(x INTEGER, y TEXT);
1305                       INSERT INTO foo VALUES(4, 'hello');
1306                       END;";
1307            db.execute_batch(sql)?;
1308
1309            let query = "SELECT x, y FROM foo ORDER BY x DESC";
1310            let results: CustomResult<String> =
1311                db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1312
1313            assert_eq!(results?, "hello");
1314            Ok(())
1315        }
1316
1317        #[test]
1318        fn test_query_row_and_then_custom_error_fails() -> Result<()> {
1319            let db = checked_memory_handle();
1320            let sql = "BEGIN;
1321                       CREATE TABLE foo(x INTEGER, y TEXT);
1322                       INSERT INTO foo VALUES(4, 'hello');
1323                       END;";
1324            db.execute_batch(sql)?;
1325
1326            let query = "SELECT x, y FROM foo ORDER BY x DESC";
1327            let bad_type: CustomResult<f64> =
1328                db.query_row_and_then(query, [], |row| row.get(1).map_err(CustomError::Sqlite));
1329
1330            match bad_type.unwrap_err() {
1331                CustomError::Sqlite(Error::InvalidColumnType(..)) => (),
1332                err => panic!("Unexpected error {err}"),
1333            }
1334
1335            let bad_idx: CustomResult<String> =
1336                db.query_row_and_then(query, [], |row| row.get(3).map_err(CustomError::Sqlite));
1337
1338            match bad_idx.unwrap_err() {
1339                CustomError::Sqlite(Error::InvalidColumnIndex(_)) => (),
1340                err => panic!("Unexpected error {err}"),
1341            }
1342
1343            let non_sqlite_err: CustomResult<String> =
1344                db.query_row_and_then(query, [], |_| Err(CustomError::SomeError));
1345
1346            match non_sqlite_err.unwrap_err() {
1347                CustomError::SomeError => (),
1348                err => panic!("Unexpected error {err}"),
1349            }
1350            Ok(())
1351        }
1352
1353        #[test]
1354        fn test_rows_and_then_with_custom_error() -> Result<()> {
1355            let db = checked_memory_handle();
1356            db.execute_batch("CREATE TABLE test (value INTEGER)")?;
1357            db.execute_batch("INSERT INTO test VALUES (1), (3), (5)")?;
1358
1359            let mut stmt = db.prepare("SELECT value FROM test ORDER BY value")?;
1360            let rows = stmt.query([])?;
1361
1362            // Use and_then to apply custom validation with custom error type
1363            let results: Vec<i32> = rows
1364                .and_then(|row| -> CustomResult<i32> {
1365                    let val: i32 = row.get(0)?; // duckdb::Error automatically converted via From trait
1366                    if val > 10 {
1367                        Err(CustomError::SomeError) // Custom application-specific error
1368                    } else {
1369                        Ok(val)
1370                    }
1371                })
1372                .collect::<CustomResult<Vec<_>>>()
1373                .unwrap();
1374
1375            assert_eq!(results, vec![1, 3, 5]);
1376            Ok(())
1377        }
1378    }
1379
1380    #[test]
1381    fn test_dynamic() -> Result<()> {
1382        let db = checked_memory_handle();
1383        let sql = "BEGIN;
1384                       CREATE TABLE foo(x INTEGER, y TEXT);
1385                       INSERT INTO foo VALUES(4, 'hello');
1386                       END;";
1387        db.execute_batch(sql)?;
1388
1389        db.query_row("SELECT * FROM foo", [], |r| {
1390            assert_eq!(2, r.as_ref().column_count());
1391            Ok(())
1392        })
1393    }
1394    #[test]
1395    fn test_dyn_box() -> Result<()> {
1396        let db = checked_memory_handle();
1397        db.execute_batch("CREATE TABLE foo(x INTEGER);")?;
1398        let b: Box<dyn ToSql> = Box::new(5);
1399        db.execute("INSERT INTO foo VALUES(?)", [b])?;
1400        db.query_row("SELECT x FROM foo", [], |r| {
1401            assert_eq!(5, r.get_unwrap::<_, i32>(0));
1402            Ok(())
1403        })
1404    }
1405
1406    #[test]
1407    fn test_alter_table() -> Result<()> {
1408        let db = checked_memory_handle();
1409        db.execute_batch("CREATE TABLE x(t INTEGER);")?;
1410        // `execute_batch` should be used but `execute` should also work
1411        db.execute("ALTER TABLE x RENAME TO y;", [])?;
1412        Ok(())
1413    }
1414
1415    #[test]
1416    fn test_query_arrow_record_batch_small() -> Result<()> {
1417        let db = checked_memory_handle();
1418        let sql = "BEGIN TRANSACTION;
1419                   CREATE TABLE test(t INTEGER);
1420                   INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);
1421                   END TRANSACTION;";
1422        db.execute_batch(sql)?;
1423        let mut stmt = db.prepare("select t from test order by t desc")?;
1424        let mut arr = stmt.query_arrow([])?;
1425
1426        let schema = arr.get_schema();
1427        assert_eq!(schema.fields().len(), 1);
1428        assert_eq!(schema.field(0).name(), "t");
1429        assert_eq!(schema.field(0).data_type(), &DataType::Int32);
1430
1431        let rb = arr.next().unwrap();
1432        let column = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1433        assert_eq!(column.len(), 5);
1434        assert_eq!(column.value(0), 5);
1435        assert_eq!(column.value(1), 4);
1436        assert_eq!(column.value(2), 3);
1437        assert_eq!(column.value(3), 2);
1438        assert_eq!(column.value(4), 1);
1439
1440        assert!(arr.next().is_none());
1441        Ok(())
1442    }
1443
1444    #[test]
1445    fn test_query_arrow_record_batch_large() -> Result<()> {
1446        let db = checked_memory_handle();
1447        db.execute_batch("BEGIN TRANSACTION")?;
1448        db.execute_batch("CREATE TABLE test(t INTEGER);")?;
1449        for _ in 0..600 {
1450            db.execute_batch("INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);")?;
1451        }
1452        db.execute_batch("END TRANSACTION")?;
1453        let rbs: Vec<RecordBatch> = db.prepare("select t from test order by t")?.query_arrow([])?.collect();
1454        // batch size is not stable
1455        // assert_eq!(rbs.len(), 3);
1456        assert_eq!(rbs.iter().map(|rb| rb.num_rows()).sum::<usize>(), 3000);
1457        assert_eq!(
1458            rbs.iter()
1459                .map(|rb| rb
1460                    .column(0)
1461                    .as_any()
1462                    .downcast_ref::<Int32Array>()
1463                    .unwrap()
1464                    .iter()
1465                    .map(|i| i.unwrap())
1466                    .sum::<i32>())
1467                .sum::<i32>(),
1468            9000
1469        );
1470        Ok(())
1471    }
1472
1473    #[test]
1474    fn test_stream_arrow_with_call() -> Result<()> {
1475        use arrow::datatypes::{DataType, Field, Schema};
1476        use std::sync::Arc;
1477
1478        let db = checked_memory_handle();
1479
1480        db.execute_batch(
1481            "CREATE TABLE test_data(id INTEGER, name VARCHAR);
1482             INSERT INTO test_data VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie');",
1483        )?;
1484
1485        db.execute_batch("CREATE MACRO test_func() AS TABLE SELECT * FROM test_data;")?;
1486
1487        let schema = Arc::new(Schema::new(vec![
1488            Field::new("id", DataType::Int32, true),
1489            Field::new("name", DataType::Utf8, true),
1490        ]));
1491
1492        let mut stmt = db.prepare("CALL test_func()")?;
1493        let rbs: Vec<RecordBatch> = stmt.stream_arrow([], schema)?.collect();
1494
1495        // Verify we got results
1496        assert!(!rbs.is_empty(), "Expected at least one record batch");
1497        let total_rows: usize = rbs.iter().map(|rb| rb.num_rows()).sum();
1498        assert_eq!(total_rows, 3);
1499
1500        let id_column = rbs[0].column(0).as_any().downcast_ref::<Int32Array>().unwrap();
1501        assert_eq!(id_column.value(0), 1);
1502
1503        Ok(())
1504    }
1505
1506    #[test]
1507    fn round_trip_interval() -> Result<()> {
1508        let db = checked_memory_handle();
1509        db.execute_batch("CREATE TABLE foo (t INTERVAL);")?;
1510
1511        let d = Value::Interval {
1512            months: 1,
1513            days: 2,
1514            nanos: 3,
1515        };
1516        db.execute("INSERT INTO foo VALUES (?)", [d])?;
1517
1518        let mut stmt = db.prepare("SELECT t FROM foo")?;
1519        let mut rows = stmt.query([])?;
1520        let row = rows.next()?.unwrap();
1521        let d: Value = row.get_unwrap(0);
1522        assert_eq!(d, d);
1523        Ok(())
1524    }
1525
1526    #[test]
1527    fn test_database_name_to_string() -> Result<()> {
1528        assert_eq!(DatabaseName::Main.to_string(), "main");
1529        assert_eq!(DatabaseName::Temp.to_string(), "temp");
1530        assert_eq!(DatabaseName::Attached("abc").to_string(), "abc");
1531        Ok(())
1532    }
1533
1534    #[test]
1535    fn test_interrupt() -> Result<()> {
1536        let db = checked_memory_handle();
1537        let db_interrupt = db.interrupt_handle();
1538
1539        let (tx, rx) = std::sync::mpsc::channel();
1540        std::thread::spawn(move || {
1541            let mut stmt = db
1542                .prepare("select count(*) from range(10000000) t1, range(1000000) t2")
1543                .unwrap();
1544            tx.send(stmt.execute([])).unwrap();
1545        });
1546
1547        std::thread::sleep(std::time::Duration::from_millis(100));
1548        db_interrupt.interrupt();
1549
1550        let result = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap();
1551        assert!(result.is_err_and(|err| err.to_string().contains("INTERRUPT")));
1552        Ok(())
1553    }
1554
1555    #[test]
1556    fn test_interrupt_on_dropped_db() {
1557        let db = checked_memory_handle();
1558        let db_interrupt = db.interrupt_handle();
1559
1560        drop(db);
1561        db_interrupt.interrupt();
1562    }
1563
1564    #[test]
1565    fn test_arrow_string_view_setting() -> Result<()> {
1566        // Test that only one setting doesn't work (missing arrow_output_version)
1567        {
1568            let config = Config::default().with("produce_arrow_string_view", "true")?;
1569            let conn = Connection::open_in_memory_with_flags(config)?;
1570
1571            let mut query = conn.prepare("SELECT 'test'::varchar AS str")?;
1572            let arrow = query.query_arrow([])?;
1573
1574            let batch = arrow.into_iter().next().expect("Expected at least one batch");
1575            assert_eq!(batch.schema().field(0).data_type(), &DataType::Utf8);
1576        }
1577
1578        {
1579            let config = Config::default()
1580                .with("produce_arrow_string_view", "true")?
1581                .with("arrow_output_version", "1.4")?;
1582            let conn = Connection::open_in_memory_with_flags(config)?;
1583
1584            let mut query = conn.prepare("SELECT 'test'::varchar AS str")?;
1585            let arrow = query.query_arrow([])?;
1586
1587            let batch = arrow.into_iter().next().expect("Expected at least one batch");
1588            assert_eq!(batch.schema().field(0).data_type(), &DataType::Utf8View);
1589        }
1590
1591        Ok(())
1592    }
1593
1594    #[test]
1595    fn test_prepare_multi_statement() -> Result<()> {
1596        let db = checked_memory_handle();
1597
1598        {
1599            let mut stmt =
1600                db.prepare("CREATE TABLE test(x INTEGER); INSERT INTO test VALUES (42); SELECT x FROM test;")?;
1601            let result: i32 = stmt.query_row([], |row| row.get(0))?;
1602            assert_eq!(result, 42);
1603        }
1604
1605        {
1606            let mut stmt = db.prepare(
1607                "CREATE TEMP TABLE temp_data(id INTEGER, value TEXT);
1608                INSERT INTO temp_data VALUES (1, 'first'), (2, 'second');
1609                SELECT COUNT(*) FROM temp_data;",
1610            )?;
1611            let count: i32 = stmt.query_row([], |row| row.get(0))?;
1612            assert_eq!(count, 2);
1613        }
1614
1615        Ok(())
1616    }
1617
1618    #[test]
1619    fn test_pivot_query() -> Result<()> {
1620        let db = checked_memory_handle();
1621
1622        db.execute_batch(
1623            "CREATE TABLE cities(city VARCHAR, year INTEGER, population INTEGER);
1624             INSERT INTO cities VALUES
1625               ('Amsterdam', 2000, 1005),
1626               ('Amsterdam', 2010, 1065),
1627               ('Amsterdam', 2020, 1158),
1628               ('Berlin', 2000, 3382),
1629               ('Berlin', 2010, 3460),
1630               ('Berlin', 2020, 3576);",
1631        )?;
1632
1633        // PIVOT queries internally expand to multiple statements
1634        let mut stmt = db.prepare("PIVOT cities ON year USING sum(population);")?;
1635        let mut rows = stmt.query([])?;
1636
1637        let mut row_count = 0;
1638        while let Some(_row) = rows.next()? {
1639            row_count += 1;
1640        }
1641        assert_eq!(row_count, 2);
1642
1643        Ok(())
1644    }
1645
1646    #[test]
1647    fn test_multiple_memory_databases() -> Result<()> {
1648        // Unnamed :memory: connections are isolated
1649        {
1650            let mem1 = Connection::open_in_memory()?;
1651            let mem2 = Connection::open_in_memory()?;
1652
1653            mem1.execute_batch("CREATE TABLE test (id INTEGER)")?;
1654            mem1.execute("INSERT INTO test VALUES (1)", [])?;
1655
1656            mem2.execute_batch("CREATE TABLE test (id INTEGER)")?;
1657            mem2.execute("INSERT INTO test VALUES (2)", [])?;
1658
1659            let value1: i32 = mem1.query_row("SELECT id FROM test", [], |r| r.get(0))?;
1660            assert_eq!(value1, 1);
1661
1662            let value2: i32 = mem2.query_row("SELECT id FROM test", [], |r| r.get(0))?;
1663            assert_eq!(value2, 2);
1664        }
1665
1666        // try_clone() shares the same database
1667        {
1668            let shared = Connection::open_in_memory()?;
1669
1670            shared.execute_batch("CREATE TABLE shared_table (id INTEGER)")?;
1671            shared.execute("INSERT INTO shared_table VALUES (123)", [])?;
1672
1673            let cloned = shared.try_clone()?;
1674
1675            // Cloned connection can see the original's tables
1676            let value: i32 = cloned.query_row("SELECT id FROM shared_table", [], |r| r.get(0))?;
1677            assert_eq!(value, 123);
1678
1679            cloned.execute("INSERT INTO shared_table VALUES (456)", [])?;
1680
1681            // Original connection can see cloned's insert
1682            let count: i64 = shared.query_row("SELECT COUNT(*) FROM shared_table", [], |r| r.get(0))?;
1683            assert_eq!(count, 2);
1684        }
1685
1686        Ok(())
1687    }
1688
1689    #[test]
1690    fn test_appender_with_catalog() -> Result<()> {
1691        let db = checked_memory_handle();
1692
1693        // Attach a new database to use as a catalog
1694        let temp_dir = tempfile::tempdir().unwrap();
1695        let attached_path = temp_dir.path().join("attached.db");
1696        db.execute_batch(&format!("ATTACH '{}' AS attached_db", attached_path.display()))?;
1697
1698        // Create a table in the attached database
1699        db.execute_batch("CREATE TABLE attached_db.main.test_table (id INTEGER, name TEXT)")?;
1700
1701        // Use appender with catalog
1702        {
1703            let mut app = db.appender_to_catalog_and_db("test_table", "attached_db", "main")?;
1704            app.append_row(params![1, "Alice"])?;
1705            app.append_row(params![2, "Bob"])?;
1706            app.append_row(params![3, "Charlie"])?;
1707        }
1708
1709        // Verify data was inserted into the correct table
1710        let count: i64 = db.query_row("SELECT COUNT(*) FROM attached_db.main.test_table", [], |r| r.get(0))?;
1711        assert_eq!(count, 3);
1712
1713        let name: String = db.query_row("SELECT name FROM attached_db.main.test_table WHERE id = ?", [2], |r| {
1714            r.get(0)
1715        })?;
1716        assert_eq!(name, "Bob");
1717
1718        Ok(())
1719    }
1720
1721    #[test]
1722    fn test_appender_with_catalog_multiple_schemas() -> Result<()> {
1723        let db = checked_memory_handle();
1724
1725        // Attach a new database
1726        let temp_dir = tempfile::tempdir().unwrap();
1727        let attached_path = temp_dir.path().join("multi_schema.db");
1728        db.execute_batch(&format!("ATTACH '{}' AS my_catalog", attached_path.display()))?;
1729
1730        // Create multiple schemas and tables
1731        db.execute_batch("CREATE SCHEMA my_catalog.schema1")?;
1732        db.execute_batch("CREATE SCHEMA my_catalog.schema2")?;
1733        db.execute_batch("CREATE TABLE my_catalog.schema1.data (value INTEGER)")?;
1734        db.execute_batch("CREATE TABLE my_catalog.schema2.data (value INTEGER)")?;
1735
1736        // Append to schema1
1737        {
1738            let mut app = db.appender_to_catalog_and_db("data", "my_catalog", "schema1")?;
1739            app.append_rows([[10], [20], [30]])?;
1740        }
1741
1742        // Append to schema2
1743        {
1744            let mut app = db.appender_to_catalog_and_db("data", "my_catalog", "schema2")?;
1745            app.append_rows([[100], [200]])?;
1746        }
1747
1748        // Verify data in schema1
1749        let sum1: i64 = db.query_row("SELECT SUM(value) FROM my_catalog.schema1.data", [], |r| r.get(0))?;
1750        assert_eq!(sum1, 60);
1751
1752        // Verify data in schema2
1753        let sum2: i64 = db.query_row("SELECT SUM(value) FROM my_catalog.schema2.data", [], |r| r.get(0))?;
1754        assert_eq!(sum2, 300);
1755
1756        Ok(())
1757    }
1758
1759    #[test]
1760    fn test_appender_with_catalog_main_vs_attached() -> Result<()> {
1761        let db = checked_memory_handle();
1762
1763        // Create table in main database
1764        db.execute_batch("CREATE TABLE test (id INTEGER)")?;
1765
1766        // Attach another database
1767        let temp_dir = tempfile::tempdir().unwrap();
1768        let attached_path = temp_dir.path().join("other.db");
1769        db.execute_batch(&format!("ATTACH '{}' AS other_db", attached_path.display()))?;
1770        db.execute_batch("CREATE TABLE other_db.main.test (id INTEGER)")?;
1771
1772        // Append to main catalog (memory)
1773        {
1774            let mut app = db.appender_to_catalog_and_db("test", "memory", "main")?;
1775            app.append_rows([[1], [2]])?;
1776        }
1777
1778        // Append to attached catalog
1779        {
1780            let mut app = db.appender_to_catalog_and_db("test", "other_db", "main")?;
1781            app.append_rows([[100], [200]])?;
1782        }
1783
1784        // Verify main database
1785        let count_main: i64 = db.query_row("SELECT COUNT(*) FROM test", [], |r| r.get(0))?;
1786        assert_eq!(count_main, 2);
1787
1788        // Verify attached database
1789        let count_attached: i64 = db.query_row("SELECT COUNT(*) FROM other_db.main.test", [], |r| r.get(0))?;
1790        assert_eq!(count_attached, 2);
1791
1792        Ok(())
1793    }
1794
1795    #[test]
1796    fn test_appender_with_catalog_error_invalid_catalog() -> Result<()> {
1797        let db = checked_memory_handle();
1798
1799        // Try to create appender with non-existent catalog
1800        let result = db.appender_to_catalog_and_db("test", "nonexistent_catalog", "main");
1801        assert!(result.is_err());
1802
1803        Ok(())
1804    }
1805
1806    #[test]
1807    fn test_appender_with_catalog_error_invalid_schema() -> Result<()> {
1808        let db = checked_memory_handle();
1809
1810        // Attach a database
1811        let temp_dir = tempfile::tempdir().unwrap();
1812        let attached_path = temp_dir.path().join("test.db");
1813        db.execute_batch(&format!("ATTACH '{}' AS my_db", attached_path.display()))?;
1814
1815        db.execute_batch("CREATE TABLE my_db.main.test (id INTEGER)")?;
1816
1817        // Try to create appender with non-existent schema
1818        let result = db.appender_to_catalog_and_db("test", "my_db", "nonexistent_schema");
1819        assert!(result.is_err());
1820
1821        Ok(())
1822    }
1823
1824    #[test]
1825    fn test_appender_with_catalog_flush() -> Result<()> {
1826        let db = checked_memory_handle();
1827
1828        // Attach database
1829        let temp_dir = tempfile::tempdir().unwrap();
1830        let attached_path = temp_dir.path().join("flush_test.db");
1831        db.execute_batch(&format!("ATTACH '{}' AS flush_db", attached_path.display()))?;
1832
1833        db.execute_batch("CREATE TABLE flush_db.main.test (id INTEGER)")?;
1834
1835        // Use appender with explicit flush
1836        {
1837            let mut app = db.appender_to_catalog_and_db("test", "flush_db", "main")?;
1838            app.append_row([1])?;
1839            app.append_row([2])?;
1840            app.flush()?;
1841            app.append_row([3])?;
1842            app.flush()?;
1843        }
1844
1845        // Verify all rows were flushed
1846        let count: i64 = db.query_row("SELECT COUNT(*) FROM flush_db.main.test", [], |r| r.get(0))?;
1847        assert_eq!(count, 3);
1848
1849        Ok(())
1850    }
1851
1852    /// Enum values should reflect actual row data, not just iterate over enum variants.
1853    #[test]
1854    fn test_enum_read() -> Result<()> {
1855        let conn = Connection::open_in_memory()?;
1856        conn.execute_batch(
1857            r#"
1858            CREATE TABLE stats (
1859                name ENUM('CA', 'NY'),
1860                value INTEGER,
1861            );
1862            INSERT INTO stats VALUES ('CA', 10), ('CA', 20), ('NY', 4);
1863            "#,
1864        )?;
1865
1866        let mut stmt = conn.prepare("SELECT * FROM stats")?;
1867        let results: Vec<(String, i32)> = stmt
1868            .query_map([], |row| {
1869                let name: String = row.get(0)?;
1870                let value: i32 = row.get(1)?;
1871                Ok((name, value))
1872            })?
1873            .map(|r| r.unwrap())
1874            .collect();
1875
1876        assert_eq!(results.len(), 3);
1877        assert_eq!(results[0], ("CA".to_string(), 10));
1878        assert_eq!(results[1], ("CA".to_string(), 20));
1879        assert_eq!(results[2], ("NY".to_string(), 4));
1880        Ok(())
1881    }
1882
1883    #[test]
1884    fn test_enum_read_nullable() -> Result<()> {
1885        let conn = Connection::open_in_memory()?;
1886        conn.execute_batch(
1887            r#"
1888            CREATE TABLE stats (name ENUM('CA', 'NY'));
1889            INSERT INTO stats VALUES ('CA'), (NULL), ('NY');
1890            "#,
1891        )?;
1892
1893        let mut stmt = conn.prepare("SELECT name FROM stats")?;
1894        let results: Vec<Option<String>> = stmt.query_map([], |row| row.get(0))?.map(|r| r.unwrap()).collect();
1895
1896        assert_eq!(results, vec![Some("CA".into()), None, Some("NY".into())]);
1897        Ok(())
1898    }
1899}