use std::{
ffi::{CStr, CString, c_void},
mem,
os::raw::c_char,
ptr, str,
sync::{Arc, Mutex},
};
use super::{Appender, Config, Connection, Result, ffi};
use crate::{
error::{
Error, result_from_duckdb_appender, result_from_duckdb_arrow, result_from_duckdb_extract,
result_from_duckdb_prepare,
},
raw_statement::RawStatement,
statement::Statement,
};
#[derive(Debug)]
struct DatabaseHandle {
db: ffi::duckdb_database,
close_on_drop: bool,
}
unsafe impl Send for DatabaseHandle {}
impl DatabaseHandle {
#[inline]
pub fn new(db: ffi::duckdb_database, close_on_drop: bool) -> Self {
Self { db, close_on_drop }
}
#[inline]
pub fn raw(&self) -> ffi::duckdb_database {
self.db
}
}
impl Drop for DatabaseHandle {
fn drop(&mut self) {
if !self.close_on_drop {
return;
}
if self.db.is_null() {
return;
}
unsafe {
ffi::duckdb_close(&mut self.db);
self.db = ptr::null_mut();
}
}
}
pub struct InnerConnection {
database: Arc<Mutex<DatabaseHandle>>,
pub con: ffi::duckdb_connection,
interrupt: Arc<InterruptHandle>,
}
impl InnerConnection {
#[inline]
unsafe fn new(database: Arc<Mutex<DatabaseHandle>>) -> Result<Self> {
unsafe {
let mut con: ffi::duckdb_connection = ptr::null_mut();
let db_raw = database.lock().expect("database handle mutex poisoned").raw();
let r = ffi::duckdb_connect(db_raw, &mut con);
if r != ffi::DuckDBSuccess {
ffi::duckdb_disconnect(&mut con);
return Err(Error::DuckDBFailure(
ffi::Error::new(r),
Some("connect error".to_owned()),
));
}
let interrupt = Arc::new(InterruptHandle::new(con));
Ok(Self {
database,
con,
interrupt,
})
}
}
pub fn open_with_flags(c_path: &CStr, config: Config) -> Result<Self> {
unsafe {
let mut db: ffi::duckdb_database = ptr::null_mut();
let mut c_err = std::ptr::null_mut();
let r = ffi::duckdb_open_ext(c_path.as_ptr(), &mut db, config.duckdb_config(), &mut c_err);
if r != ffi::DuckDBSuccess {
let msg = Some(CStr::from_ptr(c_err).to_string_lossy().to_string());
ffi::duckdb_free(c_err as *mut c_void);
return Err(Error::DuckDBFailure(ffi::Error::new(r), msg));
}
Self::new_from_raw_db(db, true)
}
}
#[inline]
pub(crate) unsafe fn new_from_raw_db(raw: ffi::duckdb_database, close_on_drop: bool) -> Result<Self> {
unsafe { Self::new(Arc::new(Mutex::new(DatabaseHandle::new(raw, close_on_drop)))) }
}
pub fn close(&mut self) -> Result<()> {
if self.con.is_null() {
return Ok(());
}
unsafe {
ffi::duckdb_disconnect(&mut self.con);
self.con = ptr::null_mut();
self.interrupt.clear();
}
Ok(())
}
pub fn try_clone(&self) -> Result<Self> {
unsafe { Self::new(self.database.clone()) }
}
pub fn execute(&mut self, sql: &str) -> Result<()> {
let c_str = CString::new(sql)?;
unsafe {
let mut out = mem::zeroed();
let r = ffi::duckdb_query_arrow(self.con, c_str.as_ptr() as *const c_char, &mut out);
result_from_duckdb_arrow(r, out)?;
ffi::duckdb_destroy_arrow(&mut out);
Ok(())
}
}
pub fn prepare<'a>(&mut self, conn: &'a Connection, sql: &str) -> Result<Statement<'a>> {
let c_str = CString::new(sql)?;
let mut extracted = ptr::null_mut();
let num_stmts =
unsafe { ffi::duckdb_extract_statements(self.con, c_str.as_ptr() as *const c_char, &mut extracted) };
result_from_duckdb_extract(num_stmts, extracted)?;
let _guard = ExtractedStatementsGuard(extracted);
for i in 0..num_stmts - 1 {
self.execute_extracted_statement(extracted, i)?;
}
let final_stmt = self.prepare_extracted_statement(extracted, num_stmts - 1)?;
Ok(Statement::new(conn, unsafe { RawStatement::new(final_stmt) }))
}
fn prepare_extracted_statement(
&self,
extracted: ffi::duckdb_extracted_statements,
index: ffi::idx_t,
) -> Result<ffi::duckdb_prepared_statement> {
let mut stmt = ptr::null_mut();
let res = unsafe { ffi::duckdb_prepare_extracted_statement(self.con, extracted, index, &mut stmt) };
result_from_duckdb_prepare(res, stmt)?;
Ok(stmt)
}
fn execute_extracted_statement(
&self,
extracted: ffi::duckdb_extracted_statements,
index: ffi::idx_t,
) -> Result<()> {
let mut stmt = self.prepare_extracted_statement(extracted, index)?;
let mut result = unsafe { mem::zeroed() };
let rc = unsafe { ffi::duckdb_execute_prepared(stmt, &mut result) };
let error = if rc != ffi::DuckDBSuccess {
unsafe {
let c_err = ffi::duckdb_result_error(&mut result as *mut _);
let msg = if c_err.is_null() {
None
} else {
Some(CStr::from_ptr(c_err).to_string_lossy().to_string())
};
Some(Error::DuckDBFailure(ffi::Error::new(rc), msg))
}
} else {
None
};
unsafe {
ffi::duckdb_destroy_prepare(&mut stmt);
ffi::duckdb_destroy_result(&mut result);
}
error.map_or(Ok(()), Err)
}
pub fn appender<'a>(&mut self, conn: &'a Connection, table: &str, schema: &str) -> Result<Appender<'a>> {
let mut c_app: ffi::duckdb_appender = ptr::null_mut();
let c_table = CString::new(table)?;
let c_schema = CString::new(schema)?;
let r = unsafe {
ffi::duckdb_appender_create(
self.con,
c_schema.as_ptr() as *const c_char,
c_table.as_ptr() as *const c_char,
&mut c_app,
)
};
result_from_duckdb_appender(r, &mut c_app)?;
Ok(Appender::new(conn, c_app))
}
pub fn appender_to_catalog_and_db<'a>(
&mut self,
conn: &'a Connection,
table: &str,
catalog: &str,
schema: &str,
) -> Result<Appender<'a>> {
let mut c_app: ffi::duckdb_appender = ptr::null_mut();
let c_table = CString::new(table)?;
let c_catalog = CString::new(catalog)?;
let c_schema = CString::new(schema)?;
let r = unsafe {
ffi::duckdb_appender_create_ext(
self.con,
c_catalog.as_ptr() as *const c_char,
c_schema.as_ptr() as *const c_char,
c_table.as_ptr() as *const c_char,
&mut c_app,
)
};
result_from_duckdb_appender(r, &mut c_app)?;
Ok(Appender::new(conn, c_app))
}
pub fn appender_with_columns<'a>(
&mut self,
conn: &'a Connection,
table: &str,
schema: &str,
catalog: Option<&str>,
columns: &[&str],
) -> Result<Appender<'a>> {
let mut appender = match catalog {
Some(catalog) => self.appender_to_catalog_and_db(conn, table, catalog, schema)?,
None => self.appender(conn, table, schema)?,
};
for column in columns {
appender.add_column(column)?;
}
Ok(appender)
}
pub fn get_interrupt_handle(&self) -> Arc<InterruptHandle> {
self.interrupt.clone()
}
#[inline]
pub fn is_autocommit(&self) -> bool {
true
}
}
struct ExtractedStatementsGuard(ffi::duckdb_extracted_statements);
impl Drop for ExtractedStatementsGuard {
fn drop(&mut self) {
unsafe { ffi::duckdb_destroy_extracted(&mut self.0) }
}
}
impl Drop for InnerConnection {
#[allow(unused_must_use)]
#[inline]
fn drop(&mut self) {
use std::thread::panicking;
if let Err(e) = self.close() {
if panicking() {
eprintln!("Error while closing DuckDB connection: {e:?}");
} else {
panic!("Error while closing DuckDB connection: {e:?}");
}
}
}
}
pub struct InterruptHandle {
conn: Mutex<ffi::duckdb_connection>,
}
unsafe impl Send for InterruptHandle {}
unsafe impl Sync for InterruptHandle {}
impl InterruptHandle {
fn new(conn: ffi::duckdb_connection) -> Self {
Self { conn: Mutex::new(conn) }
}
fn clear(&self) {
*(self.conn.lock().unwrap()) = ptr::null_mut();
}
pub fn interrupt(&self) {
let db_handle = self.conn.lock().unwrap();
if !db_handle.is_null() {
unsafe {
ffi::duckdb_interrupt(*db_handle);
}
}
}
}