pub mod error;
pub use error::{OlapError, OltpError};
#[cfg(feature = "datafusion-backend")]
pub use rhei_datafusion::{DataFusionEngine, DfOlapError, SharedDataFusionEngine, StorageMode};
#[cfg(feature = "duckdb-backend")]
pub use rhei_duckdb::{DuckDbEngine, DuckDbError, SharedDuckDbEngine};
pub use rhei_oltp_rusqlite::{RusqliteCdcProducer, RusqliteEngine, RusqliteOltpError};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
#[derive(Clone)]
pub enum OlapBackend {
#[cfg(feature = "duckdb-backend")]
DuckDb(SharedDuckDbEngine),
#[cfg(feature = "datafusion-backend")]
DataFusion(SharedDataFusionEngine),
}
impl OlapBackend {
pub fn backend_name(&self) -> &'static str {
match self {
#[cfg(feature = "duckdb-backend")]
Self::DuckDb(_) => "DuckDB",
#[cfg(feature = "datafusion-backend")]
Self::DataFusion(_) => "DataFusion",
}
}
}
impl rhei_core::OlapEngine for OlapBackend {
type Error = OlapError;
async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
match self {
#[cfg(feature = "duckdb-backend")]
Self::DuckDb(e) => e.query(sql).await.map_err(OlapError::from),
#[cfg(feature = "datafusion-backend")]
Self::DataFusion(e) => e.query(sql).await.map_err(OlapError::from),
}
}
async fn query_stream(
&self,
sql: &str,
) -> Result<rhei_core::RecordBatchBoxStream, Self::Error> {
match self {
#[cfg(feature = "duckdb-backend")]
Self::DuckDb(e) => e.query_stream(sql).await.map_err(OlapError::from),
#[cfg(feature = "datafusion-backend")]
Self::DataFusion(e) => e.query_stream(sql).await.map_err(OlapError::from),
}
}
async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
match self {
#[cfg(feature = "duckdb-backend")]
Self::DuckDb(e) => e.execute(sql).await.map_err(OlapError::from),
#[cfg(feature = "datafusion-backend")]
Self::DataFusion(e) => e.execute(sql).await.map_err(OlapError::from),
}
}
async fn load_arrow(&self, table: &str, batches: &[RecordBatch]) -> Result<u64, Self::Error> {
match self {
#[cfg(feature = "duckdb-backend")]
Self::DuckDb(e) => e.load_arrow(table, batches).await.map_err(OlapError::from),
#[cfg(feature = "datafusion-backend")]
Self::DataFusion(e) => e.load_arrow(table, batches).await.map_err(OlapError::from),
}
}
async fn create_table(
&self,
table_name: &str,
schema: &SchemaRef,
primary_key: &[String],
) -> Result<(), Self::Error> {
match self {
#[cfg(feature = "duckdb-backend")]
Self::DuckDb(e) => e
.create_table(table_name, schema, primary_key)
.await
.map_err(OlapError::from),
#[cfg(feature = "datafusion-backend")]
Self::DataFusion(e) => e
.create_table(table_name, schema, primary_key)
.await
.map_err(OlapError::from),
}
}
async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
match self {
#[cfg(feature = "duckdb-backend")]
Self::DuckDb(e) => e.table_exists(table_name).await.map_err(OlapError::from),
#[cfg(feature = "datafusion-backend")]
Self::DataFusion(e) => e.table_exists(table_name).await.map_err(OlapError::from),
}
}
async fn add_column(
&self,
table_name: &str,
column_name: &str,
data_type: &DataType,
) -> Result<(), Self::Error> {
match self {
#[cfg(feature = "duckdb-backend")]
Self::DuckDb(e) => e
.add_column(table_name, column_name, data_type)
.await
.map_err(OlapError::from),
#[cfg(feature = "datafusion-backend")]
Self::DataFusion(e) => e
.add_column(table_name, column_name, data_type)
.await
.map_err(OlapError::from),
}
}
async fn drop_column(&self, table_name: &str, column_name: &str) -> Result<(), Self::Error> {
match self {
#[cfg(feature = "duckdb-backend")]
Self::DuckDb(e) => e
.drop_column(table_name, column_name)
.await
.map_err(OlapError::from),
#[cfg(feature = "datafusion-backend")]
Self::DataFusion(e) => e
.drop_column(table_name, column_name)
.await
.map_err(OlapError::from),
}
}
fn supports_transactions(&self) -> bool {
match self {
#[cfg(feature = "duckdb-backend")]
Self::DuckDb(e) => e.supports_transactions(),
#[cfg(feature = "datafusion-backend")]
Self::DataFusion(e) => e.supports_transactions(),
}
}
}
pub enum OltpBackend {
Rusqlite(RusqliteEngine),
}
impl OltpBackend {
pub fn backend_name(&self) -> &'static str {
match self {
Self::Rusqlite(_) => "Rusqlite",
}
}
pub fn as_rusqlite(&self) -> Option<&RusqliteEngine> {
match self {
Self::Rusqlite(e) => Some(e),
}
}
}
impl rhei_core::OltpEngine for OltpBackend {
type Error = OltpError;
async fn query(
&self,
sql: &str,
params: &[serde_json::Value],
) -> Result<Vec<arrow::record_batch::RecordBatch>, Self::Error> {
match self {
Self::Rusqlite(e) => e.query(sql, params).await.map_err(OltpError::from),
}
}
async fn execute(&self, sql: &str, params: &[serde_json::Value]) -> Result<u64, Self::Error> {
match self {
Self::Rusqlite(e) => e.execute(sql, params).await.map_err(OltpError::from),
}
}
async fn execute_batch(
&self,
statements: &[(String, Vec<serde_json::Value>)],
) -> Result<(), Self::Error> {
match self {
Self::Rusqlite(e) => e.execute_batch(statements).await.map_err(OltpError::from),
}
}
async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
match self {
Self::Rusqlite(e) => e.table_exists(table_name).await.map_err(OltpError::from),
}
}
}