#![warn(missing_docs)]
pub use libduckdb_sys as ffi;
use std::{
cell::RefCell,
convert,
ffi::CString,
fmt,
path::{Path, PathBuf},
result, str,
};
use crate::{cache::StatementCache, inner_connection::InnerConnection, raw_statement::RawStatement, types::ValueRef};
#[cfg(feature = "r2d2")]
pub use crate::r2d2::DuckdbConnectionManager;
pub use crate::{
appender::Appender,
appender_params::{AppenderParams, AppenderParamsFromIter, appender_params_from_iter},
arrow_batch::{Arrow, ArrowStream},
cache::CachedStatement,
column::Column,
config::{AccessMode, Config, DefaultNullOrder, DefaultOrder},
error::Error,
ffi::ErrorCode,
inner_connection::InterruptHandle,
params::{Params, ParamsFromIter, params_from_iter},
row::{AndThenRows, Map, MappedRows, Row, RowIndex, Rows},
statement::Statement,
transaction::{DropBehavior, Transaction},
types::ToSql,
};
#[cfg(feature = "polars")]
pub use polars_dataframe::Polars;
pub use arrow;
#[cfg(feature = "loadable-extension")]
pub use duckdb_loadable_macros::duckdb_entrypoint_c_api;
#[cfg(feature = "polars")]
pub use polars;
pub mod core;
#[macro_use]
mod error;
mod appender;
mod appender_params;
mod arrow_batch;
mod cache;
mod column;
mod config;
mod inner_connection;
mod params;
#[cfg(feature = "polars")]
mod polars_dataframe;
mod pragma;
#[cfg(feature = "r2d2")]
mod r2d2;
mod raw_statement;
mod row;
mod statement;
mod transaction;
#[cfg(any(
feature = "autocomplete",
feature = "icu",
feature = "json",
feature = "parquet",
feature = "tpcds",
feature = "tpch"
))]
mod extension;
pub mod profiling;
pub mod types;
#[cfg(feature = "vtab")]
pub mod vtab;
#[cfg(feature = "vscalar")]
pub mod vscalar;
#[cfg(test)]
mod test_all_types;
/// Capacity passed to `StatementCache::with_capacity` whenever a [`Connection`] is constructed.
const STATEMENT_CACHE_DEFAULT_CAPACITY: usize = 16;
/// Builds a `&[&dyn ToSql]` slice from a comma-separated list of expressions.
///
/// With no arguments the macro expands to an empty slice. Otherwise each expression is
/// borrowed and coerced to `&dyn ToSql`, so values of different types can be mixed in a
/// single call. A trailing comma is accepted.
#[macro_export]
macro_rules! params {
// No arguments: an empty, correctly typed slice.
() => {
&[] as &[&dyn $crate::ToSql]
};
// One or more expressions, optionally followed by a trailing comma.
($($param:expr),+ $(,)?) => {
&[$(&$param as &dyn $crate::ToSql),+] as &[&dyn $crate::ToSql]
};
}
/// Result alias used throughout the crate; the error type defaults to [`Error`].
pub type Result<T, E = Error> = result::Result<T, E>;
/// Extension trait that turns a "no rows" failure into an absent value.
pub trait OptionalExt<T> {
/// Converts `self` into a `Result<Option<T>>`.
///
/// The implementation for [`Result<T>`] maps `Err(Error::QueryReturnedNoRows)` to
/// `Ok(None)`, wraps a success in `Some`, and passes every other error through.
fn optional(self) -> Result<Option<T>>;
}
impl<T> OptionalExt<T> for Result<T> {
fn optional(self) -> Result<Option<T>> {
match self {
Ok(value) => Ok(Some(value)),
Err(Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(e),
}
}
}
/// Name of a database, as rendered by its `Display` implementation.
#[derive(Copy, Clone, Debug)]
pub enum DatabaseName<'a> {
/// Displayed as `main`.
Main,
/// Displayed as `temp`.
Temp,
/// A caller-supplied name, displayed verbatim.
Attached(&'a str),
}
impl fmt::Display for DatabaseName<'_> {
    /// Writes `main`, `temp`, or the attached database's name verbatim.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered = match *self {
            DatabaseName::Main => "main",
            DatabaseName::Temp => "temp",
            DatabaseName::Attached(name) => name,
        };
        f.write_str(rendered)
    }
}
/// Shorthand for [`DatabaseName::Main`].
pub const MAIN_DB: DatabaseName<'static> = DatabaseName::Main;
/// Shorthand for [`DatabaseName::Temp`].
pub const TEMP_DB: DatabaseName<'static> = DatabaseName::Temp;
/// A connection to a DuckDB database.
pub struct Connection {
// Inner connection; the `RefCell` lets `&self` methods borrow it mutably.
db: RefCell<InnerConnection>,
// Statement cache, created with `STATEMENT_CACHE_DEFAULT_CAPACITY`.
cache: StatementCache,
// Path passed to `open_with_flags`; `None` for connections built by `open_from_raw`.
path: Option<PathBuf>,
}
// SAFETY: NOTE(review): the justification is not visible in this file. Presumably the inner
// connection may be moved to another thread as long as only one thread uses it at a time
// (the `RefCell` field keeps `Connection` from being `Sync`) — confirm against `InnerConnection`.
unsafe impl Send for Connection {}
impl Connection {
    /// Opens the database at `path` with the default [`Config`].
    ///
    /// Shorthand for [`Connection::open_with_flags`] with `Config::default()`.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Connection::open_with_flags`].
    #[inline]
    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
        Self::open_with_flags(path, Config::default())
    }

    /// Opens an in-memory database with the default [`Config`].
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Connection::open_in_memory_with_flags`].
    #[inline]
    pub fn open_in_memory() -> Result<Self> {
        Self::open_in_memory_with_flags(Config::default())
    }

    /// Wraps an existing raw `duckdb_database` handle in a [`Connection`].
    ///
    /// The resulting connection has no associated path ([`Connection::path`] returns `None`).
    ///
    /// # Safety
    ///
    /// `raw` is handed straight to the FFI layer, so it must be a valid, open database
    /// handle. NOTE(review): the `false` argument presumably marks the handle as *not*
    /// owned by the connection, leaving the caller responsible for closing it — confirm
    /// against `InnerConnection::new_from_raw_db`.
    ///
    /// # Errors
    ///
    /// Returns an error if the inner connection cannot be created from `raw`.
    #[inline]
    pub unsafe fn open_from_raw(raw: ffi::duckdb_database) -> Result<Self> {
        // SAFETY: the caller upholds the contract documented under `# Safety`.
        let inner = unsafe { InnerConnection::new_from_raw_db(raw, false) }?;
        Ok(Self {
            db: RefCell::new(inner),
            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
            path: None,
        })
    }

    /// Opens the database at `path` using the supplied `config`.
    ///
    /// The `duckdb_api` option is set to `rust` on `config` before the database is opened,
    /// and `path` is remembered so it can later be retrieved with [`Connection::path`].
    ///
    /// # Errors
    ///
    /// Returns an error if
    /// * `path` cannot be converted to a C string (it contains a NUL byte, or, on
    ///   non-Unix platforms, it is not valid UTF-8, which yields `Error::InvalidPath`),
    /// * setting the `duckdb_api` option on `config` fails, or
    /// * the underlying open call fails.
    #[inline]
    pub fn open_with_flags<P: AsRef<Path>>(path: P, config: Config) -> Result<Self> {
        // On Unix the raw bytes of the path are used, so non-UTF-8 file names work.
        #[cfg(unix)]
        fn path_to_cstring(p: &Path) -> Result<CString> {
            use std::os::unix::ffi::OsStrExt;
            Ok(CString::new(p.as_os_str().as_bytes())?)
        }
        // Elsewhere the path must be valid UTF-8.
        #[cfg(not(unix))]
        fn path_to_cstring(p: &Path) -> Result<CString> {
            let s = p.to_str().ok_or_else(|| Error::InvalidPath(p.to_owned()))?;
            Ok(CString::new(s)?)
        }

        let path = path.as_ref();
        let c_path = path_to_cstring(path)?;
        // Propagate a configuration failure instead of panicking inside library code.
        let config = config.with("duckdb_api", "rust")?;
        let inner = InnerConnection::open_with_flags(&c_path, config)?;
        Ok(Self {
            db: RefCell::new(inner),
            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
            path: Some(path.to_path_buf()),
        })
    }

    /// Opens an in-memory database using the supplied `config`.
    ///
    /// Implemented as [`Connection::open_with_flags`] with the path `:memory:`, so
    /// [`Connection::path`] reports `:memory:` for such connections.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Connection::open_with_flags`].
    #[inline]
    pub fn open_in_memory_with_flags(config: Config) -> Result<Self> {
        Self::open_with_flags(":memory:", config)
    }

    /// Executes `sql` directly on the inner connection, without binding parameters
    /// or returning rows. The string may contain several `;`-separated statements.
    ///
    /// # Errors
    ///
    /// Returns an error if the inner connection fails to execute `sql`.
    pub fn execute_batch(&self, sql: &str) -> Result<()> {
        self.db.borrow_mut().execute(sql)
    }

    /// Prepares `sql`, executes it with `params`, and returns the count reported by
    /// [`Statement::execute`].
    ///
    /// # Errors
    ///
    /// Returns an error if preparing or executing the statement fails.
    #[inline]
    pub fn execute<P: Params>(&self, sql: &str, params: P) -> Result<usize> {
        self.prepare(sql).and_then(|mut stmt| stmt.execute(params))
    }

    /// Returns the path this connection was opened with, or `None` if it was created
    /// by [`Connection::open_from_raw`].
    #[inline]
    pub fn path(&self) -> Option<&Path> {
        self.path.as_deref()
    }

    /// Prepares `sql`, runs it with `params`, and passes the resulting row to `f`.
    ///
    /// Delegates to [`Statement::query_row`].
    ///
    /// # Errors
    ///
    /// Returns an error if preparing or running the query fails, or if `f` returns an
    /// error. A query that yields no rows produces `Error::QueryReturnedNoRows`, which
    /// [`OptionalExt::optional`] can turn into `Ok(None)`.
    #[inline]
    pub fn query_row<T, P, F>(&self, sql: &str, params: P, f: F) -> Result<T>
    where
        P: Params,
        F: FnOnce(&Row<'_>) -> Result<T>,
    {
        self.prepare(sql)?.query_row(params, f)
    }

    /// Like [`Connection::query_row`], but `f` may return any error type `E` that
    /// can be built from [`Error`].
    ///
    /// # Errors
    ///
    /// Errors raised while preparing or running the query are converted with
    /// `E::from`; errors returned by `f` are passed through unchanged.
    #[inline]
    pub fn query_row_and_then<T, E, P, F>(&self, sql: &str, params: P, f: F) -> Result<T, E>
    where
        P: Params,
        F: FnOnce(&Row<'_>) -> Result<T, E>,
        E: convert::From<Error>,
    {
        self.prepare(sql)?
            .query(params)?
            .get_expected_row()
            .map_err(E::from)
            .and_then(f)
    }

    /// Prepares `sql` and returns the resulting [`Statement`], which borrows this connection.
    ///
    /// # Errors
    ///
    /// Returns an error if the inner connection cannot prepare `sql`.
    #[inline]
    pub fn prepare(&self, sql: &str) -> Result<Statement<'_>> {
        self.db.borrow_mut().prepare(self, sql)
    }

    /// Creates an [`Appender`] for `table` in the `main` schema.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Connection::appender_to_db`].
    pub fn appender(&self, table: &str) -> Result<Appender<'_>> {
        self.appender_to_db(table, &DatabaseName::Main.to_string())
    }

    /// Creates an [`Appender`] for `table` in the given `schema`.
    ///
    /// # Errors
    ///
    /// Returns an error if the inner connection cannot create the appender.
    pub fn appender_to_db(&self, table: &str, schema: &str) -> Result<Appender<'_>> {
        self.db.borrow_mut().appender(self, table, schema)
    }

    /// Creates an [`Appender`] for `table` in the given `catalog` and `schema`.
    ///
    /// # Errors
    ///
    /// Returns an error if the inner connection cannot create the appender.
    pub fn appender_to_catalog_and_db(&self, table: &str, catalog: &str, schema: &str) -> Result<Appender<'_>> {
        self.db
            .borrow_mut()
            .appender_to_catalog_and_db(self, table, catalog, schema)
    }

    /// Creates an [`Appender`] for the listed `columns` of `table` in the `main` schema.
    ///
    /// # Errors
    ///
    /// Fails under the same conditions as [`Connection::appender_with_columns_to_db`].
    pub fn appender_with_columns(&self, table: &str, columns: &[&str]) -> Result<Appender<'_>> {
        self.appender_with_columns_to_db(table, &DatabaseName::Main.to_string(), columns)
    }

    /// Creates an [`Appender`] for the listed `columns` of `table` in the given `schema`
    /// (no catalog is specified).
    ///
    /// # Errors
    ///
    /// Returns an error if the inner connection cannot create the appender.
    pub fn appender_with_columns_to_db(&self, table: &str, schema: &str, columns: &[&str]) -> Result<Appender<'_>> {
        self.db
            .borrow_mut()
            .appender_with_columns(self, table, schema, None, columns)
    }

    /// Creates an [`Appender`] for the listed `columns` of `table` in the given
    /// `catalog` and `schema`.
    ///
    /// # Errors
    ///
    /// Returns an error if the inner connection cannot create the appender.
    pub fn appender_with_columns_to_catalog_and_db(
        &self,
        table: &str,
        catalog: &str,
        schema: &str,
        columns: &[&str],
    ) -> Result<Appender<'_>> {
        self.db
            .borrow_mut()
            .appender_with_columns(self, table, schema, Some(catalog), columns)
    }

    /// Returns a shared [`InterruptHandle`] obtained from the inner connection.
    pub fn interrupt_handle(&self) -> std::sync::Arc<InterruptHandle> {
        self.db.borrow().get_interrupt_handle()
    }

    /// Closes the connection, consuming it.
    ///
    /// # Errors
    ///
    /// If closing fails, the connection is handed back together with the error so the
    /// caller can retry or drop it.
    #[inline]
    // The error tuple carries the whole `Connection` by design, hence the large `Err` type.
    #[allow(clippy::result_large_err)]
    pub fn close(self) -> Result<(), (Self, Error)> {
        // Bind the outcome first so the `RefCell` borrow has ended before `self` is
        // moved into the error tuple.
        let r = self.db.borrow_mut().close();
        r.map_err(move |err| (self, err))
    }

    /// Reports whether the inner connection is currently in autocommit mode.
    #[inline]
    pub fn is_autocommit(&self) -> bool {
        self.db.borrow().is_autocommit()
    }

    /// Creates another [`Connection`] from a clone of the inner connection.
    ///
    /// The new connection gets its own, empty statement cache and a copy of this
    /// connection's path.
    ///
    /// # Errors
    ///
    /// Returns an error if the inner connection cannot be cloned.
    pub fn try_clone(&self) -> Result<Self> {
        let inner = self.db.borrow().try_clone()?;
        Ok(Self {
            db: RefCell::new(inner),
            cache: StatementCache::with_capacity(STATEMENT_CACHE_DEFAULT_CAPACITY),
            path: self.path.clone(),
        })
    }

    /// Returns the first column of the row produced by `PRAGMA version`.
    ///
    /// # Errors
    ///
    /// Returns an error if the query fails or the value cannot be read as a `String`.
    pub fn version(&self) -> Result<String> {
        self.query_row("PRAGMA version", [], |row| row.get(0))
    }
}
impl fmt::Debug for Connection {
    /// Renders the connection as `Connection { path: .. }`; only the path is shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("Connection");
        builder.field("path", &self.path);
        builder.finish()
    }
}
// Only under `cfg(doctest)`: feeds the README to `doc_comment::doctest!`.
#[cfg(doctest)]
doc_comment::doctest!("../../../README.md");
#[cfg(test)]
mod test {
use crate::types::Value;
use super::*;
use std::{error::Error as StdError, fmt};
use arrow::{array::Int32Array, datatypes::DataType, record_batch::RecordBatch};
use fallible_iterator::FallibleIterator;
// Compile-time check only: this function is never called, but its body is still
// type-checked, so the recursive call with `Connection` requires `Connection: Send`.
#[allow(dead_code, unconditional_recursion, clippy::extra_unused_type_parameters)]
fn ensure_send<T: Send>() {
ensure_send::<Connection>();
}
/// Opens a fresh in-memory database for a test, panicking if that fails.
pub fn checked_memory_handle() -> Connection {
    let opened = Connection::open_in_memory();
    opened.unwrap()
}
/// `params!` accepts a value and a reference of different types in one call.
#[test]
fn test_params_of_vary_types() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN;
CREATE TABLE foo(bar TEXT, qux INTEGER);
INSERT INTO foo VALUES ('baz', 1), ('baz', 2), ('baz', 3);
END;",
    )?;
    let updated_rows = conn.execute("UPDATE foo SET qux = ? WHERE bar = ?", params![1i32, &"baz"])?;
    assert_eq!(updated_rows, 3);
    Ok(())
}
/// Two read-write connections to the same file each run a transaction; whatever the
/// outcome of the competing writes, both connections can start a new transaction afterwards.
#[test]
#[cfg_attr(windows, ignore = "Windows doesn't allow concurrent writes to a file")]
fn test_concurrent_transactions_busy_commit() -> Result<()> {
    let scratch = tempfile::tempdir().unwrap();
    let path = scratch.path().join("transactions.db3");
    {
        // Seed the file, then let the seeding connection go away.
        let seed = Connection::open(&path)?;
        seed.execute_batch(
            "
BEGIN;
CREATE TABLE foo(x INTEGER);
INSERT INTO foo VALUES(42);
END;",
        )?;
    }
    let open_read_write = || -> Result<Connection> {
        Connection::open_with_flags(&path, Config::default().access_mode(config::AccessMode::ReadWrite)?)
    };
    let mut db1 = open_read_write()?;
    let mut db2 = open_read_write()?;
    {
        let tx1 = db1.transaction()?;
        let tx2 = db2.transaction()?;
        tx1.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
        tx2.query_row("SELECT x FROM foo LIMIT 1", [], |_| Ok(()))?;
        tx1.execute("INSERT INTO foo VALUES(?1)", [1])?;
        // The second write and both commits are allowed to fail; their results are ignored.
        let _ = tx2.execute("INSERT INTO foo VALUES(?1)", [2]);
        let _ = tx1.commit();
        let _ = tx2.commit();
    }
    let _ = db1.transaction().expect("commit should have closed transaction");
    let _ = db2.transaction().expect("commit should have closed transaction");
    Ok(())
}
/// Data written through one connection is visible after reopening the same file.
#[test]
fn test_persistence() -> Result<()> {
    let scratch = tempfile::tempdir().unwrap();
    let db_file = scratch.path().join("test.db3");
    {
        // The writer is dropped at the end of this scope, before the file is reopened.
        let writer = Connection::open(&db_file)?;
        writer.execute_batch(
            "BEGIN;
CREATE TABLE foo(x INTEGER);
INSERT INTO foo VALUES(42);
END;",
        )?;
    }
    // Reopen through a `&str` path rather than a `PathBuf`.
    let reader = Connection::open(db_file.to_str().unwrap())?;
    let the_answer: i64 = reader.query_row("SELECT x FROM foo", [], |r| r.get(0))?;
    assert_eq!(42i64, the_answer);
    Ok(())
}
/// In-memory databases can be opened repeatedly and closed explicitly.
#[test]
fn test_open() {
    let first = Connection::open_in_memory();
    match &first {
        Ok(_) => {}
        Err(e) => panic!("open error {e}"),
    }
    assert!(Connection::open_in_memory().is_ok());
    let closed = checked_memory_handle().close();
    assert!(closed.is_ok());
    let _ = checked_memory_handle();
    let _ = checked_memory_handle();
}
// Opens a database through the raw FFI, wraps it with `Connection::open_from_raw`, and
// checks that a clone keeps working after the wrapping connection is dropped.
#[test]
fn test_open_from_raw() {
unsafe {
use std::{ffi::c_void, os::raw::c_char, ptr};
let mut db: ffi::duckdb_database = ptr::null_mut();
let mut c_err: *mut c_char = ptr::null_mut();
let r = ffi::duckdb_open_ext(
c":memory:".as_ptr(),
&mut db,
Config::default().duckdb_config(),
&mut c_err,
);
if r != ffi::DuckDBSuccess {
// Release the error string (if one was set) before failing the test.
if !c_err.is_null() {
ffi::duckdb_free(c_err as *mut c_void);
}
panic!("duckdb_open_ext failed: {r:?}");
}
let conn = Connection::open_from_raw(db).unwrap();
conn.execute_batch("SELECT 1").unwrap();
let cloned = conn.try_clone().unwrap();
drop(conn);
cloned.execute_batch("SELECT 2").unwrap();
cloned.close().unwrap();
// The test closes the raw handle itself after the wrapping connections are gone.
ffi::duckdb_close(&mut db);
}
}
/// Opening a missing file read-only fails with a `DuckDBFailure` that names the file.
#[test]
fn test_open_failure() -> Result<()> {
    let filename = "no_such_file.db";
    let read_only = Config::default().access_mode(config::AccessMode::ReadOnly)?;
    let result = Connection::open_with_flags(filename, read_only);
    assert!(result.is_err());
    match result.err().unwrap() {
        Error::DuckDBFailure(_code, Some(msg)) => assert!(
            msg.contains(filename),
            "error message '{msg}' does not contain '{filename}'"
        ),
        _ => panic!("DuckDBFailure expected"),
    }
    Ok(())
}
/// On Unix, a database whose file name is not valid UTF-8 can be created and reopened.
#[cfg(unix)]
#[test]
fn test_invalid_unicode_file_names() -> Result<()> {
    use std::{ffi::OsStr, fs::File, os::unix::ffi::OsStrExt};
    let scratch = tempfile::tempdir().unwrap();
    let dir = scratch.path();
    // Probe first: if such a file name cannot be created here, there is nothing to test.
    let probe = dir.join(OsStr::from_bytes(&[0xFE]));
    if File::create(probe).is_err() {
        return Ok(());
    }
    let db_path = dir.join(OsStr::from_bytes(&[0xFF]));
    {
        let writer = Connection::open(&db_path)?;
        writer.execute_batch(
            "BEGIN;
CREATE TABLE foo(x INTEGER);
INSERT INTO foo VALUES(42);
END;",
        )?;
    }
    let reader = Connection::open(&db_path)?;
    let the_answer: i64 = reader.query_row("SELECT x FROM foo", [], |r| r.get(0))?;
    assert_eq!(42i64, the_answer);
    Ok(())
}
/// Closing a freshly opened in-memory connection succeeds.
#[test]
fn test_close_always_ok() -> Result<()> {
    let conn = checked_memory_handle();
    let outcome = conn.close();
    outcome.unwrap();
    Ok(())
}
/// `execute_batch` runs multi-statement SQL and reports an error for invalid SQL.
#[test]
fn test_execute_batch() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN;
CREATE TABLE foo(x INTEGER);
INSERT INTO foo VALUES(1);
INSERT INTO foo VALUES(2);
INSERT INTO foo VALUES(3);
INSERT INTO foo VALUES(4);
END;",
    )?;
    conn.execute_batch("UPDATE foo SET x = 3 WHERE x < 3")?;
    let invalid = conn.execute_batch("INVALID SQL");
    assert!(invalid.is_err());
    Ok(())
}
/// `execute` returns the number of inserted rows, and the inserted values sum as expected.
#[test]
fn test_execute_single() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch("CREATE TABLE foo(x INTEGER)")?;
    let inserted_three = conn.execute("INSERT INTO foo(x) VALUES (?), (?), (?)", [1i32, 2i32, 3i32])?;
    assert_eq!(3, inserted_three);
    let inserted_one = conn.execute("INSERT INTO foo(x) VALUES (?)", [4i32])?;
    assert_eq!(1, inserted_one);
    let total = conn.query_row::<i32, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?;
    assert_eq!(10i32, total);
    Ok(())
}
/// After execution, a statement reports the column count and names of its result.
#[test]
fn test_prepare_column_names() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch("CREATE TABLE foo(x INTEGER);")?;
    let cases = [
        ("SELECT * FROM foo", vec!["x"]),
        ("SELECT x AS a, x AS b FROM foo", vec!["a", "b"]),
    ];
    for (sql, expected_names) in cases {
        let mut stmt = conn.prepare(sql)?;
        stmt.execute([])?;
        assert_eq!(stmt.column_count(), expected_names.len());
        assert_eq!(stmt.column_names(), expected_names);
    }
    Ok(())
}
/// A prepared statement can be executed repeatedly with different parameters.
#[test]
fn test_prepare_execute() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch("CREATE TABLE foo(x INTEGER);")?;
    let mut insert_stmt = conn.prepare("INSERT INTO foo(x) VALUES(?)")?;
    for value in [1i32, 2i32, 3i32] {
        assert_eq!(insert_stmt.execute([value])?, 1);
    }
    // A string cannot be inserted into the INTEGER column.
    assert!(insert_stmt.execute(["hello"]).is_err());
    let mut update_stmt = conn.prepare("UPDATE foo SET x=? WHERE x<?")?;
    let updates = [([3i32, 3i32], 2usize), ([3i32, 3i32], 0), ([8i32, 8i32], 3)];
    for (args, expected_changes) in updates {
        assert_eq!(update_stmt.execute(args)?, expected_changes);
    }
    Ok(())
}
/// A prepared query can be re-run with different parameters and yields rows in order.
#[test]
fn test_prepare_query() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch("CREATE TABLE foo(x INTEGER);")?;
    let mut insert_stmt = conn.prepare("INSERT INTO foo(x) VALUES(?)")?;
    for value in [1i32, 2i32, 3i32] {
        assert_eq!(insert_stmt.execute([value])?, 1);
    }
    let mut query = conn.prepare("SELECT x FROM foo WHERE x < ? ORDER BY x DESC")?;
    // Runs the prepared query with the given upper bound and gathers the first column.
    let mut values_below = |upper: i32| -> Result<Vec<i32>> {
        let mut rows = query.query([upper])?;
        let mut found = Vec::<i32>::new();
        while let Some(row) = rows.next()? {
            found.push(row.get(0)?);
        }
        Ok(found)
    };
    assert_eq!(values_below(4i32)?, [3i32, 2, 1]);
    assert_eq!(values_below(3i32)?, [2i32, 1]);
    Ok(())
}
/// Mapping over query rows collects the second column in query order.
#[test]
fn test_query_map() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN;
CREATE TABLE foo(x INTEGER, y TEXT);
INSERT INTO foo VALUES(4, 'hello');
INSERT INTO foo VALUES(3, ', ');
INSERT INTO foo VALUES(2, 'world');
INSERT INTO foo VALUES(1, '!');
END;",
    )?;
    let mut stmt = conn.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
    let pieces: Result<Vec<String>> = stmt.query([])?.map(|row| row.get(1)).collect();
    let sentence = pieces?.concat();
    assert_eq!(sentence, "hello, world!");
    Ok(())
}
/// `query_row` returns a value, reports `QueryReturnedNoRows` for an empty result,
/// and fails on invalid SQL.
#[test]
fn test_query_row() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN;
CREATE TABLE foo(x INTEGER);
INSERT INTO foo VALUES(1);
INSERT INTO foo VALUES(2);
INSERT INTO foo VALUES(3);
INSERT INTO foo VALUES(4);
END;",
    )?;
    let total = conn.query_row::<i64, _, _>("SELECT SUM(x) FROM foo", [], |r| r.get(0))?;
    assert_eq!(10i64, total);
    let no_rows: Result<i64> = conn.query_row("SELECT x FROM foo WHERE x > 5", [], |r| r.get(0));
    let err = no_rows.unwrap_err();
    if !matches!(err, Error::QueryReturnedNoRows) {
        panic!("Unexpected error {err}");
    }
    let bad_query_result = conn.query_row("NOT A PROPER QUERY; test123", [], |_| Ok(()));
    assert!(bad_query_result.is_err());
    Ok(())
}
/// `optional` maps "no rows" to `None`, keeps a found value, and keeps other errors.
#[test]
fn test_optional() -> Result<()> {
    let conn = checked_memory_handle();
    let missing: Result<i64> = conn.query_row("SELECT 1 WHERE 0 <> 0", [], |r| r.get(0));
    if missing.optional()?.is_some() {
        panic!("Unexpected result");
    }
    let present: Result<i64> = conn.query_row("SELECT 1 WHERE 0 == 0", [], |r| r.get(0));
    if present.optional()? != Some(1) {
        panic!("Unexpected result");
    }
    let bad_query_result: Result<i64> = conn.query_row("NOT A PROPER QUERY", [], |r| r.get(0));
    assert!(bad_query_result.optional().is_err());
    Ok(())
}
/// Preparing a query against a table that does not exist fails.
#[test]
fn test_prepare_failures() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch("CREATE TABLE foo(x INTEGER);")?;
    let attempt = conn.prepare("SELECT * FROM does_not_exist");
    let _ = attempt.unwrap_err();
    Ok(())
}
/// A fresh connection reports autocommit mode.
#[test]
fn test_is_autocommit() {
    let conn = checked_memory_handle();
    let autocommit = conn.is_autocommit();
    assert!(autocommit, "autocommit expected to be active by default");
}
/// Debug-formatting a prepared statement is expected to panic with "not supported".
#[test]
#[should_panic(expected = "not supported")]
fn test_statement_debugging() {
    let conn = checked_memory_handle();
    let query = "SELECT 12345";
    let stmt = conn.prepare(query).unwrap();
    let rendered = format!("{stmt:?}");
    assert!(rendered.contains(query));
}
/// Violating a NOT NULL constraint yields a `DuckDBFailure` with `ErrorCode::Unknown`.
#[test]
fn test_notnull_constraint_error() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch("CREATE TABLE foo(x TEXT NOT NULL)")?;
    let result = conn.execute("INSERT INTO foo (x) VALUES (NULL)", []);
    assert!(result.is_err());
    let failure = result.unwrap_err();
    if let Error::DuckDBFailure(err, _) = failure {
        assert_eq!(err.code, ErrorCode::Unknown);
    } else {
        panic!("Unexpected error {failure}");
    }
    Ok(())
}
/// A connection and its clone can be closed in either order.
#[test]
fn test_clone() -> Result<()> {
    // Case 1: the clone is closed first, then the original.
    {
        let original = checked_memory_handle();
        {
            let duplicate = original.try_clone().unwrap();
            duplicate.execute_batch("create table test (c1 bigint)")?;
            duplicate.close().unwrap();
        }
        original.execute_batch("create table test2 (c1 bigint)")?;
        original.close().unwrap();
    }
    // Case 2: the original is closed first and the clone outlives it.
    {
        let survivor = {
            let original = checked_memory_handle();
            let duplicate = original.try_clone().unwrap();
            original.execute_batch("create table test (c1 bigint)")?;
            original.close().unwrap();
            duplicate
        };
        survivor.execute_batch("create table test2 (c1 bigint)")?;
        survivor.close().unwrap();
    }
    Ok(())
}
/// A clone can itself be cloned after the original connection is dropped, and the
/// clones see each other's data.
#[test]
fn test_try_clone_after_owner_drop() -> Result<()> {
    let first_clone = {
        let owner = checked_memory_handle();
        let cloned = owner.try_clone()?;
        drop(owner);
        cloned
    };
    let second_clone = first_clone.try_clone()?;
    second_clone.execute_batch("CREATE TABLE t312(i INTEGER); INSERT INTO t312 VALUES (1);")?;
    let seen: i32 = first_clone.query_row("SELECT i FROM t312", [], |r| r.get(0))?;
    assert_eq!(seen, 1);
    Ok(())
}
mod query_and_then_tests {
use super::*;
// Caller-defined error type used to exercise the `*_and_then` APIs.
#[derive(Debug)]
enum CustomError {
// An error that does not originate from the database layer.
SomeError,
// Wraps the crate's own `Error` (see the `From<Error>` impl).
Sqlite(Error),
}
impl fmt::Display for CustomError {
    // Prints a fixed message, followed by the wrapped error when there is one.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
        match self {
            Self::Sqlite(inner) => write!(f, "my custom error: {inner}"),
            Self::SomeError => write!(f, "my custom error"),
        }
    }
}
impl StdError for CustomError {
    // Exposes the wrapped database error as the source of this error.
    // `source` replaces the deprecated `description`/`cause` pair; the human-readable
    // text is provided by the `Display` impl.
    fn source(&self) -> Option<&(dyn StdError + 'static)> {
        match self {
            Self::SomeError => None,
            Self::Sqlite(se) => Some(se),
        }
    }
}
impl From<Error> for CustomError {
fn from(se: Error) -> Self {
Self::Sqlite(se)
}
}
// Result alias whose error type is the test-local `CustomError`.
type CustomResult<T> = Result<T, CustomError>;
/// `query_and_then` maps each row through a fallible closure, in query order.
#[test]
fn test_query_and_then() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN;
CREATE TABLE foo(x INTEGER, y TEXT);
INSERT INTO foo VALUES(4, 'hello');
INSERT INTO foo VALUES(3, ', ');
INSERT INTO foo VALUES(2, 'world');
INSERT INTO foo VALUES(1, '!');
END;",
    )?;
    let mut stmt = conn.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
    let pieces: Result<Vec<String>> = stmt.query_and_then([], |row| row.get(1))?.collect();
    let sentence = pieces?.concat();
    assert_eq!(sentence, "hello, world!");
    Ok(())
}
/// `query_and_then` surfaces column type and column index errors from the closure.
#[test]
fn test_query_and_then_fails() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN;
CREATE TABLE foo(x INTEGER, y TEXT);
INSERT INTO foo VALUES(4, 'hello');
INSERT INTO foo VALUES(3, ', ');
INSERT INTO foo VALUES(2, 'world');
INSERT INTO foo VALUES(1, '!');
END;",
    )?;
    let mut stmt = conn.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;

    // Reading the TEXT column as f64 must fail with a type error.
    let bad_type: Result<Vec<f64>> = stmt.query_and_then([], |row| row.get(1))?.collect();
    let err = bad_type.unwrap_err();
    if !matches!(err, Error::InvalidColumnType(..)) {
        panic!("Unexpected error {err}");
    }

    // Column 3 does not exist in a two-column result.
    let bad_idx: Result<Vec<String>> = stmt.query_and_then([], |row| row.get(3))?.collect();
    let err = bad_idx.unwrap_err();
    if !matches!(err, Error::InvalidColumnIndex(_)) {
        panic!("Unexpected error {err}");
    }
    Ok(())
}
/// `query_and_then` works with a closure that returns a caller-defined error type.
#[test]
fn test_query_and_then_custom_error() -> CustomResult<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN;
CREATE TABLE foo(x INTEGER, y TEXT);
INSERT INTO foo VALUES(4, 'hello');
INSERT INTO foo VALUES(3, ', ');
INSERT INTO foo VALUES(2, 'world');
INSERT INTO foo VALUES(1, '!');
END;",
    )?;
    let mut stmt = conn.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;
    let pieces: CustomResult<Vec<String>> = stmt
        .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
        .collect();
    let sentence = pieces?.concat();
    assert_eq!(sentence, "hello, world!");
    Ok(())
}
/// With a custom error type, `query_and_then` reports wrapped database errors and
/// errors produced by the closure itself.
#[test]
fn test_query_and_then_custom_error_fails() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN;
CREATE TABLE foo(x INTEGER, y TEXT);
INSERT INTO foo VALUES(4, 'hello');
INSERT INTO foo VALUES(3, ', ');
INSERT INTO foo VALUES(2, 'world');
INSERT INTO foo VALUES(1, '!');
END;",
    )?;
    let mut stmt = conn.prepare("SELECT x, y FROM foo ORDER BY x DESC")?;

    // Wrong target type for the TEXT column.
    let bad_type: CustomResult<Vec<f64>> = stmt
        .query_and_then([], |row| row.get(1).map_err(CustomError::Sqlite))?
        .collect();
    let err = bad_type.unwrap_err();
    if !matches!(err, CustomError::Sqlite(Error::InvalidColumnType(..))) {
        panic!("Unexpected error {err}");
    }

    // Column index outside the result.
    let bad_idx: CustomResult<Vec<String>> = stmt
        .query_and_then([], |row| row.get(3).map_err(CustomError::Sqlite))?
        .collect();
    let err = bad_idx.unwrap_err();
    if !matches!(err, CustomError::Sqlite(Error::InvalidColumnIndex(_))) {
        panic!("Unexpected error {err}");
    }

    // Error produced by the closure, unrelated to the database.
    let non_sqlite_err: CustomResult<Vec<String>> =
        stmt.query_and_then([], |_| Err(CustomError::SomeError))?.collect();
    let err = non_sqlite_err.unwrap_err();
    if !matches!(err, CustomError::SomeError) {
        panic!("Unexpected error {err}");
    }
    Ok(())
}
/// `query_row_and_then` returns the closure's value when a custom error type is used.
#[test]
fn test_query_row_and_then_custom_error() -> CustomResult<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN;
CREATE TABLE foo(x INTEGER, y TEXT);
INSERT INTO foo VALUES(4, 'hello');
END;",
    )?;
    let select = "SELECT x, y FROM foo ORDER BY x DESC";
    let greeting: CustomResult<String> =
        conn.query_row_and_then(select, [], |row| row.get(1).map_err(CustomError::Sqlite));
    assert_eq!(greeting?, "hello");
    Ok(())
}
/// `query_row_and_then` reports wrapped database errors and closure errors through
/// the custom error type.
#[test]
fn test_query_row_and_then_custom_error_fails() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN;
CREATE TABLE foo(x INTEGER, y TEXT);
INSERT INTO foo VALUES(4, 'hello');
END;",
    )?;
    let select = "SELECT x, y FROM foo ORDER BY x DESC";

    // Wrong target type for the TEXT column.
    let bad_type: CustomResult<f64> =
        conn.query_row_and_then(select, [], |row| row.get(1).map_err(CustomError::Sqlite));
    let err = bad_type.unwrap_err();
    if !matches!(err, CustomError::Sqlite(Error::InvalidColumnType(..))) {
        panic!("Unexpected error {err}");
    }

    // Column index outside the result.
    let bad_idx: CustomResult<String> =
        conn.query_row_and_then(select, [], |row| row.get(3).map_err(CustomError::Sqlite));
    let err = bad_idx.unwrap_err();
    if !matches!(err, CustomError::Sqlite(Error::InvalidColumnIndex(_))) {
        panic!("Unexpected error {err}");
    }

    // Error produced by the closure, unrelated to the database.
    let non_sqlite_err: CustomResult<String> =
        conn.query_row_and_then(select, [], |_| Err(CustomError::SomeError));
    let err = non_sqlite_err.unwrap_err();
    if !matches!(err, CustomError::SomeError) {
        panic!("Unexpected error {err}");
    }
    Ok(())
}
/// `Rows::and_then` accepts a closure with a custom error type; `?` inside the closure
/// converts database errors through `From<Error>`.
#[test]
fn test_rows_and_then_with_custom_error() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch("CREATE TABLE test (value INTEGER)")?;
    conn.execute_batch("INSERT INTO test VALUES (1), (3), (5)")?;
    let mut stmt = conn.prepare("SELECT value FROM test ORDER BY value")?;
    let rows = stmt.query([])?;
    let results: Vec<i32> = rows
        .and_then(|row| -> CustomResult<i32> {
            let val: i32 = row.get(0)?;
            // Values above 10 are rejected with the non-database error variant.
            match val {
                v if v > 10 => Err(CustomError::SomeError),
                v => Ok(v),
            }
        })
        .collect::<CustomResult<Vec<_>>>()
        .unwrap();
    assert_eq!(results, vec![1, 3, 5]);
    Ok(())
}
}
/// The statement behind a row reports the number of selected columns.
#[test]
fn test_dynamic() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN;
CREATE TABLE foo(x INTEGER, y TEXT);
INSERT INTO foo VALUES(4, 'hello');
END;",
    )?;
    conn.query_row("SELECT * FROM foo", [], |r| {
        let columns = r.as_ref().column_count();
        assert_eq!(2, columns);
        Ok(())
    })
}
/// A boxed `dyn ToSql` value can be bound as a parameter.
#[test]
fn test_dyn_box() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch("CREATE TABLE foo(x INTEGER);")?;
    let boxed: Box<dyn ToSql> = Box::new(5);
    conn.execute("INSERT INTO foo VALUES(?)", [boxed])?;
    conn.query_row("SELECT x FROM foo", [], |r| {
        let stored = r.get_unwrap::<_, i32>(0);
        assert_eq!(5, stored);
        Ok(())
    })
}
/// `ALTER TABLE ... RENAME` can be run through `execute`.
#[test]
fn test_alter_table() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch("CREATE TABLE x(t INTEGER);")?;
    let _changed = conn.execute("ALTER TABLE x RENAME TO y;", [])?;
    Ok(())
}
/// A small result is delivered as a single Arrow record batch with the expected
/// schema and values.
#[test]
fn test_query_arrow_record_batch_small() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "BEGIN TRANSACTION;
CREATE TABLE test(t INTEGER);
INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);
END TRANSACTION;",
    )?;
    let mut stmt = conn.prepare("select t from test order by t desc")?;
    let mut batches = stmt.query_arrow([])?;

    let schema = batches.get_schema();
    assert_eq!(schema.fields().len(), 1);
    let field = schema.field(0);
    assert_eq!(field.name(), "t");
    assert_eq!(field.data_type(), &DataType::Int32);

    let batch = batches.next().unwrap();
    let column = batch.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(column.len(), 5);
    for (idx, expected) in [5, 4, 3, 2, 1].into_iter().enumerate() {
        assert_eq!(column.value(idx), expected);
    }
    // Everything fits in one batch.
    assert!(batches.next().is_none());
    Ok(())
}
/// A 3000-row result, possibly split over several record batches, keeps its row
/// count and value sum.
#[test]
fn test_query_arrow_record_batch_large() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch("BEGIN TRANSACTION")?;
    conn.execute_batch("CREATE TABLE test(t INTEGER);")?;
    for _ in 0..600 {
        conn.execute_batch("INSERT INTO test VALUES (1); INSERT INTO test VALUES (2); INSERT INTO test VALUES (3); INSERT INTO test VALUES (4); INSERT INTO test VALUES (5);")?;
    }
    conn.execute_batch("END TRANSACTION")?;
    let rbs: Vec<RecordBatch> = conn.prepare("select t from test order by t")?.query_arrow([])?.collect();

    let total_rows: usize = rbs.iter().map(|rb| rb.num_rows()).sum();
    assert_eq!(total_rows, 3000);

    let mut value_sum = 0i32;
    for rb in &rbs {
        let column = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
        value_sum += column.iter().map(|v| v.unwrap()).sum::<i32>();
    }
    assert_eq!(value_sum, 9000);
    Ok(())
}
/// `stream_arrow` works for a `CALL` of a table macro when given an explicit schema.
#[test]
fn test_stream_arrow_with_call() -> Result<()> {
    use arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;
    let conn = checked_memory_handle();
    conn.execute_batch(
        "CREATE TABLE test_data(id INTEGER, name VARCHAR);
INSERT INTO test_data VALUES (1, 'Alice'), (2, 'Bob'), (3, 'Charlie');",
    )?;
    conn.execute_batch("CREATE MACRO test_func() AS TABLE SELECT * FROM test_data;")?;

    let fields = vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
    ];
    let schema = Arc::new(Schema::new(fields));

    let mut stmt = conn.prepare("CALL test_func()")?;
    let rbs: Vec<RecordBatch> = stmt.stream_arrow([], schema)?.collect();
    assert!(!rbs.is_empty(), "Expected at least one record batch");

    let total_rows: usize = rbs.iter().map(|rb| rb.num_rows()).sum();
    assert_eq!(total_rows, 3);

    let first_batch = &rbs[0];
    let id_column = first_batch.column(0).as_any().downcast_ref::<Int32Array>().unwrap();
    assert_eq!(id_column.value(0), 1);
    Ok(())
}
/// An `INTERVAL` value bound as a parameter is read back unchanged.
#[test]
fn round_trip_interval() -> Result<()> {
    let db = checked_memory_handle();
    db.execute_batch("CREATE TABLE foo (t INTERVAL);")?;
    // Built through a closure so the expected value can be rebuilt after the first
    // copy has been moved into `execute`.
    // NOTE(review): `nanos` is a whole number of microseconds on the assumption that
    // DuckDB's INTERVAL has microsecond resolution, so nothing is lost on the way in
    // or out — confirm against the parameter binding code.
    let make_interval = || Value::Interval {
        months: 1,
        days: 2,
        nanos: 3_000,
    };
    db.execute("INSERT INTO foo VALUES (?)", [make_interval()])?;
    let mut stmt = db.prepare("SELECT t FROM foo")?;
    let mut rows = stmt.query([])?;
    let row = rows.next()?.unwrap();
    let actual: Value = row.get_unwrap(0);
    // Compare against the inserted value; the read-back value must not shadow it.
    assert_eq!(actual, make_interval());
    Ok(())
}
/// Each `DatabaseName` variant renders to the expected string.
#[test]
fn test_database_name_to_string() -> Result<()> {
    let cases = [
        (DatabaseName::Main, "main"),
        (DatabaseName::Temp, "temp"),
        (DatabaseName::Attached("abc"), "abc"),
    ];
    for (name, expected) in cases {
        assert_eq!(name.to_string(), expected);
    }
    Ok(())
}
// Starts a long-running query on a worker thread, interrupts it from the test thread,
// and expects the query to fail with an error mentioning "INTERRUPT".
#[test]
fn test_interrupt() -> Result<()> {
let db = checked_memory_handle();
// Obtain the handle before `db` is moved into the worker thread.
let db_interrupt = db.interrupt_handle();
let (tx, rx) = std::sync::mpsc::channel();
std::thread::spawn(move || {
let mut stmt = db
.prepare("select count(*) from range(10000000) t1, range(1000000) t2")
.unwrap();
tx.send(stmt.execute([])).unwrap();
});
// Wait briefly before interrupting, presumably so the query has started running.
std::thread::sleep(std::time::Duration::from_millis(100));
db_interrupt.interrupt();
// Fail the test if the worker has not reported back within five seconds.
let result = rx.recv_timeout(std::time::Duration::from_secs(5)).unwrap();
assert!(result.is_err_and(|err| err.to_string().contains("INTERRUPT")));
Ok(())
}
/// Calling `interrupt` after the connection has been dropped must not crash.
#[test]
fn test_interrupt_on_dropped_db() {
    let handle = {
        let conn = checked_memory_handle();
        let handle = conn.interrupt_handle();
        drop(conn);
        handle
    };
    handle.interrupt();
}
/// `produce_arrow_string_view` alone yields `Utf8`; combined with
/// `arrow_output_version` 1.4 it yields `Utf8View`.
#[test]
fn test_arrow_string_view_setting() -> Result<()> {
    // Opens an in-memory database with `config` and returns the Arrow type of a
    // VARCHAR column in the first result batch.
    let varchar_arrow_type = |config: Config| -> Result<DataType> {
        let conn = Connection::open_in_memory_with_flags(config)?;
        let mut query = conn.prepare("SELECT 'test'::varchar AS str")?;
        let stream = query.query_arrow([])?;
        let batch = stream.into_iter().next().expect("Expected at least one batch");
        let data_type = batch.schema().field(0).data_type().clone();
        Ok(data_type)
    };

    let view_only = Config::default().with("produce_arrow_string_view", "true")?;
    assert_eq!(&varchar_arrow_type(view_only)?, &DataType::Utf8);

    let view_with_version = Config::default()
        .with("produce_arrow_string_view", "true")?
        .with("arrow_output_version", "1.4")?;
    assert_eq!(&varchar_arrow_type(view_with_version)?, &DataType::Utf8View);
    Ok(())
}
/// `prepare` accepts several statements in one string; the result of the final
/// statement is what `query_row` returns.
#[test]
fn test_prepare_multi_statement() -> Result<()> {
    let conn = checked_memory_handle();
    {
        let mut stmt =
            conn.prepare("CREATE TABLE test(x INTEGER); INSERT INTO test VALUES (42); SELECT x FROM test;")?;
        let selected: i32 = stmt.query_row([], |row| row.get(0))?;
        assert_eq!(selected, 42);
    }
    {
        let mut stmt = conn.prepare(
            "CREATE TEMP TABLE temp_data(id INTEGER, value TEXT);
INSERT INTO temp_data VALUES (1, 'first'), (2, 'second');
SELECT COUNT(*) FROM temp_data;",
        )?;
        let row_total: i32 = stmt.query_row([], |row| row.get(0))?;
        assert_eq!(row_total, 2);
    }
    Ok(())
}
/// A `PIVOT` statement can be prepared and yields one row per city.
#[test]
fn test_pivot_query() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch(
        "CREATE TABLE cities(city VARCHAR, year INTEGER, population INTEGER);
INSERT INTO cities VALUES
('Amsterdam', 2000, 1005),
('Amsterdam', 2010, 1065),
('Amsterdam', 2020, 1158),
('Berlin', 2000, 3382),
('Berlin', 2010, 3460),
('Berlin', 2020, 3576);",
    )?;
    let mut stmt = conn.prepare("PIVOT cities ON year USING sum(population);")?;
    let mut rows = stmt.query([])?;
    let mut seen = 0;
    while rows.next()?.is_some() {
        seen += 1;
    }
    assert_eq!(seen, 2);
    Ok(())
}
/// Separately opened in-memory databases are independent; a clone shares its
/// original's database.
#[test]
fn test_multiple_memory_databases() -> Result<()> {
    // Two independent in-memory databases may hold same-named tables with different data.
    {
        let first = Connection::open_in_memory()?;
        let second = Connection::open_in_memory()?;
        first.execute_batch("CREATE TABLE test (id INTEGER)")?;
        first.execute("INSERT INTO test VALUES (1)", [])?;
        second.execute_batch("CREATE TABLE test (id INTEGER)")?;
        second.execute("INSERT INTO test VALUES (2)", [])?;
        let from_first: i32 = first.query_row("SELECT id FROM test", [], |r| r.get(0))?;
        assert_eq!(from_first, 1);
        let from_second: i32 = second.query_row("SELECT id FROM test", [], |r| r.get(0))?;
        assert_eq!(from_second, 2);
    }
    // A clone reads and writes the same database as the connection it came from.
    {
        let original = Connection::open_in_memory()?;
        original.execute_batch("CREATE TABLE shared_table (id INTEGER)")?;
        original.execute("INSERT INTO shared_table VALUES (123)", [])?;
        let duplicate = original.try_clone()?;
        let seen_by_clone: i32 = duplicate.query_row("SELECT id FROM shared_table", [], |r| r.get(0))?;
        assert_eq!(seen_by_clone, 123);
        duplicate.execute("INSERT INTO shared_table VALUES (456)", [])?;
        let rows_in_original: i64 = original.query_row("SELECT COUNT(*) FROM shared_table", [], |r| r.get(0))?;
        assert_eq!(rows_in_original, 2);
    }
    Ok(())
}
/// An appender can target a table in an attached catalog.
#[test]
fn test_appender_with_catalog() -> Result<()> {
    let conn = checked_memory_handle();
    let scratch = tempfile::tempdir().unwrap();
    let attached_path = scratch.path().join("attached.db");
    let attach_sql = format!("ATTACH '{}' AS attached_db", attached_path.display());
    conn.execute_batch(&attach_sql)?;
    conn.execute_batch("CREATE TABLE attached_db.main.test_table (id INTEGER, name TEXT)")?;
    {
        // The appender is dropped at the end of this scope, before the table is queried.
        let mut app = conn.appender_to_catalog_and_db("test_table", "attached_db", "main")?;
        for (id, name) in [(1, "Alice"), (2, "Bob"), (3, "Charlie")] {
            app.append_row(params![id, name])?;
        }
    }
    let row_total: i64 = conn.query_row("SELECT COUNT(*) FROM attached_db.main.test_table", [], |r| r.get(0))?;
    assert_eq!(row_total, 3);
    let second_name: String =
        conn.query_row("SELECT name FROM attached_db.main.test_table WHERE id = ?", [2], |r| r.get(0))?;
    assert_eq!(second_name, "Bob");
    Ok(())
}
/// Appends to two same-named tables living in different schemas of one
/// attached catalog and checks that each table received only its own rows.
#[test]
fn test_appender_with_catalog_multiple_schemas() -> Result<()> {
    let conn = checked_memory_handle();

    let scratch_dir = tempfile::tempdir().unwrap();
    let db_file = scratch_dir.path().join("multi_schema.db");
    let attach_sql = format!("ATTACH '{}' AS my_catalog", db_file.display());
    conn.execute_batch(&attach_sql)?;

    // Two schemas, each with a table called `data`.
    for ddl in [
        "CREATE SCHEMA my_catalog.schema1",
        "CREATE SCHEMA my_catalog.schema2",
        "CREATE TABLE my_catalog.schema1.data (value INTEGER)",
        "CREATE TABLE my_catalog.schema2.data (value INTEGER)",
    ] {
        conn.execute_batch(ddl)?;
    }

    // Each appender is a temporary that is dropped at the end of its statement,
    // i.e. before the sums below are computed.
    conn.appender_to_catalog_and_db("data", "my_catalog", "schema1")?
        .append_rows([[10], [20], [30]])?;
    conn.appender_to_catalog_and_db("data", "my_catalog", "schema2")?
        .append_rows([[100], [200]])?;

    let schema1_sum = conn.query_row("SELECT SUM(value) FROM my_catalog.schema1.data", [], |row| {
        row.get::<_, i64>(0)
    })?;
    assert_eq!(schema1_sum, 60);

    let schema2_sum = conn.query_row("SELECT SUM(value) FROM my_catalog.schema2.data", [], |row| {
        row.get::<_, i64>(0)
    })?;
    assert_eq!(schema2_sum, 300);
    Ok(())
}
/// Appends to a `test` table through the `memory` catalog and to a same-named
/// table in an attached catalog, then checks that each holds two rows.
///
/// NOTE(review): the test reads the first table back without a catalog
/// qualifier, so it presumably relies on `memory` naming the connection's
/// default catalog — confirm.
#[test]
fn test_appender_with_catalog_main_vs_attached() -> Result<()> {
    let conn = checked_memory_handle();
    conn.execute_batch("CREATE TABLE test (id INTEGER)")?;

    let scratch_dir = tempfile::tempdir().unwrap();
    let db_file = scratch_dir.path().join("other.db");
    let attach_sql = format!("ATTACH '{}' AS other_db", db_file.display());
    conn.execute_batch(&attach_sql)?;
    conn.execute_batch("CREATE TABLE other_db.main.test (id INTEGER)")?;

    // Each appender is a temporary that is dropped at the end of its statement,
    // i.e. before the counts below are taken.
    conn.appender_to_catalog_and_db("test", "memory", "main")?
        .append_rows([[1], [2]])?;
    conn.appender_to_catalog_and_db("test", "other_db", "main")?
        .append_rows([[100], [200]])?;

    let rows_in_main = conn.query_row("SELECT COUNT(*) FROM test", [], |row| row.get::<_, i64>(0))?;
    assert_eq!(rows_in_main, 2);

    let rows_in_attached = conn.query_row("SELECT COUNT(*) FROM other_db.main.test", [], |row| {
        row.get::<_, i64>(0)
    })?;
    assert_eq!(rows_in_attached, 2);
    Ok(())
}
/// Requesting an appender for a catalog that was never attached must yield an
/// error rather than an appender.
#[test]
fn test_appender_with_catalog_error_invalid_catalog() -> Result<()> {
    let conn = checked_memory_handle();
    assert!(conn
        .appender_to_catalog_and_db("test", "nonexistent_catalog", "main")
        .is_err());
    Ok(())
}
/// Requesting an appender for an existing table under a schema name that was
/// never created in the attached catalog must yield an error.
#[test]
fn test_appender_with_catalog_error_invalid_schema() -> Result<()> {
    let conn = checked_memory_handle();

    let scratch_dir = tempfile::tempdir().unwrap();
    let db_file = scratch_dir.path().join("test.db");
    let attach_sql = format!("ATTACH '{}' AS my_db", db_file.display());
    conn.execute_batch(&attach_sql)?;
    // The table exists, but only under `my_db.main`.
    conn.execute_batch("CREATE TABLE my_db.main.test (id INTEGER)")?;

    assert!(conn
        .appender_to_catalog_and_db("test", "my_db", "nonexistent_schema")
        .is_err());
    Ok(())
}
/// Interleaves `append_row` and explicit `flush` calls on an appender bound to
/// an attached catalog and checks that all three rows end up in the table.
#[test]
fn test_appender_with_catalog_flush() -> Result<()> {
    let conn = checked_memory_handle();

    let scratch_dir = tempfile::tempdir().unwrap();
    let db_file = scratch_dir.path().join("flush_test.db");
    let attach_sql = format!("ATTACH '{}' AS flush_db", db_file.display());
    conn.execute_batch(&attach_sql)?;
    conn.execute_batch("CREATE TABLE flush_db.main.test (id INTEGER)")?;

    let mut appender = conn.appender_to_catalog_and_db("test", "flush_db", "main")?;
    // First batch: two rows, then a flush.
    for id in [1, 2] {
        appender.append_row([id])?;
    }
    appender.flush()?;
    // Second batch: the appender must remain usable after the first flush.
    appender.append_row([3])?;
    appender.flush()?;
    // Release the appender before reading the table back.
    drop(appender);

    let row_total = conn.query_row("SELECT COUNT(*) FROM flush_db.main.test", [], |row| row.get::<_, i64>(0))?;
    assert_eq!(row_total, 3);
    Ok(())
}
/// Reads rows from a table whose first column is an `ENUM` and checks that the
/// enum values can be fetched as `String`s next to an `INTEGER` column.
#[test]
fn test_enum_read() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch(
        r#"
CREATE TABLE stats (
name ENUM('CA', 'NY'),
value INTEGER,
);
INSERT INTO stats VALUES ('CA', 10), ('CA', 20), ('NY', 4);
"#,
    )?;

    let mut stmt = conn.prepare("SELECT * FROM stats")?;
    // Collect fallibly: a row that fails to convert is returned as this test's
    // error through `?` instead of panicking inside an iterator adaptor.
    let results = stmt
        .query_map([], |row| {
            let name: String = row.get(0)?;
            let value: i32 = row.get(1)?;
            Ok((name, value))
        })?
        .collect::<Result<Vec<(String, i32)>>>()?;

    // One whole-vector comparison covers both the length and every element,
    // and cannot panic on an out-of-range index if fewer rows come back.
    assert_eq!(
        results,
        vec![
            ("CA".to_string(), 10),
            ("CA".to_string(), 20),
            ("NY".to_string(), 4),
        ]
    );
    Ok(())
}
/// Reads an `ENUM` column that contains a `NULL` and checks that the values
/// come back as `Option<String>`, with `None` in the position of the `NULL`.
#[test]
fn test_enum_read_nullable() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch(
        r#"
CREATE TABLE stats (name ENUM('CA', 'NY'));
INSERT INTO stats VALUES ('CA'), (NULL), ('NY');
"#,
    )?;

    let mut stmt = conn.prepare("SELECT name FROM stats")?;
    // Collect fallibly: a row that fails to convert is returned as this test's
    // error through `?` instead of panicking inside an iterator adaptor.
    let results = stmt
        .query_map([], |row| row.get::<_, Option<String>>(0))?
        .collect::<Result<Vec<_>>>()?;

    assert_eq!(
        results,
        vec![Some("CA".to_string()), None, Some("NY".to_string())]
    );
    Ok(())
}
}