use std::ffi::{c_char, CString};
use crate::arrow_stream::{ArrowArray, ArrowSchema, ArrowStream};
use crate::bindings;
use crate::error::{Error, Result};
use crate::format::OutputFormat;
use crate::query_result::QueryResult;
/// Owning wrapper around a raw chdb connection pointer.
///
/// The pointer is obtained from `chdb_connect` in [`Connection::open`] and is handed to
/// `chdb_close_conn` when the value is dropped.
#[derive(Debug)]
pub struct Connection {
/// Pointer returned by `chdb_connect`; it is dereferenced to obtain the handle that the
/// other `chdb_*` calls take as their first argument.
inner: *mut bindings::chdb_connection,
}
// Raw pointers are not `Send` by default, so this impl is what allows a `Connection` to be
// moved to another thread. No `Sync` impl appears alongside it here.
// SAFETY: NOTE(review): soundness depends on the chdb C library tolerating use of a
// connection from a thread other than the one that created it — nothing visible here
// establishes that; confirm against the chdb documentation.
unsafe impl Send for Connection {}
impl Connection {
pub fn open(args: &[&str]) -> Result<Self> {
let c_args: Vec<CString> = args
.iter()
.map(|s| CString::new(*s))
.collect::<std::result::Result<Vec<_>, _>>()?;
let mut argv: Vec<*mut c_char> = c_args.iter().map(|s| s.as_ptr() as *mut c_char).collect();
let conn_ptr = unsafe { bindings::chdb_connect(argv.len() as i32, argv.as_mut_ptr()) };
if conn_ptr.is_null() {
return Err(Error::ConnectionFailed);
}
let conn = unsafe { *conn_ptr };
if conn.is_null() {
return Err(Error::ConnectionFailed);
}
Ok(Self { inner: conn_ptr })
}
pub fn open_in_memory() -> Result<Self> {
Self::open(&["clickhouse"])
}
pub fn open_with_path(path: &str) -> Result<Self> {
let path_arg = format!("--path={path}");
Self::open(&["clickhouse", &path_arg])
}
pub fn query(&self, sql: &str, format: OutputFormat) -> Result<QueryResult> {
let query_cstr = CString::new(sql)?;
let format_cstr = CString::new(format.as_str())?;
let conn = unsafe { *self.inner };
let result_ptr =
unsafe { bindings::chdb_query(conn, query_cstr.as_ptr(), format_cstr.as_ptr()) };
if result_ptr.is_null() {
return Err(Error::NoResult);
}
let result = QueryResult::new(result_ptr);
result.check_error()
}
pub fn register_arrow_stream(
&self,
table_name: &str,
arrow_stream: &ArrowStream,
) -> Result<()> {
let table_name_cstr = CString::new(table_name)?;
let conn = unsafe { *self.inner };
let state = unsafe {
bindings::chdb_arrow_scan(conn, table_name_cstr.as_ptr(), arrow_stream.as_raw())
};
if state == bindings::chdb_state_CHDBSuccess {
Ok(())
} else {
Err(Error::QueryError(format!(
"Failed to register Arrow stream as table '{}'",
table_name
)))
}
}
pub fn register_arrow_array(
&self,
table_name: &str,
arrow_schema: &ArrowSchema,
arrow_array: &ArrowArray,
) -> Result<()> {
let table_name_cstr = CString::new(table_name)?;
let conn = unsafe { *self.inner };
let state = unsafe {
bindings::chdb_arrow_array_scan(
conn,
table_name_cstr.as_ptr(),
arrow_schema.as_raw(),
arrow_array.as_raw(),
)
};
if state == bindings::chdb_state_CHDBSuccess {
Ok(())
} else {
Err(Error::QueryError(format!(
"Failed to register Arrow array as table '{}'",
table_name
)))
}
}
pub fn unregister_arrow_table(&self, table_name: &str) -> Result<()> {
let table_name_cstr = CString::new(table_name)?;
let conn = unsafe { *self.inner };
let state =
unsafe { bindings::chdb_arrow_unregister_table(conn, table_name_cstr.as_ptr()) };
if state == bindings::chdb_state_CHDBSuccess {
Ok(())
} else {
Err(Error::QueryError(format!(
"Failed to unregister Arrow table '{}'",
table_name
)))
}
}
}
impl Drop for Connection {
    /// Passes the stored connection pointer to `chdb_close_conn`, unless it is null.
    fn drop(&mut self) {
        // Nothing to release when no pointer is stored.
        if self.inner.is_null() {
            return;
        }

        // SAFETY: `inner` is non-null here. `drop` runs at most once per value, so the
        // pointer is handed to `chdb_close_conn` a single time from this impl.
        unsafe { bindings::chdb_close_conn(self.inner) };
    }
}