// rhei-olap 1.5.0
//
// Backend-agnostic OLAP dispatcher for the Rhei HTAP engine.
// Documentation (see the crate-level docs below).
//! Backend-agnostic OLAP and OLTP dispatcher for the Rhei HTAP engine.
//!
//! # Purpose
//!
//! This crate provides two enum dispatchers — [`OlapBackend`] and [`OltpBackend`] — that
//! forward every [`rhei_core::OlapEngine`] and [`rhei_core::OltpEngine`] method call to the
//! concrete engine selected at configuration time.  The consuming crates (`rhei`, `rhei-sync`,
//! `rhei-flight`) only ever see one type, so they are compiled once regardless of which backend
//! combination is enabled.
//!
//! Using an enum (instead of `Box<dyn OlapEngine>`) avoids heap allocation and virtual-dispatch
//! overhead in the hot sync path while still allowing the backend choice to be deferred until
//! runtime configuration.
//!
//! # OLAP feature flags
//!
//! | Feature | Default | Description |
//! |---------|---------|-------------|
//! | `datafusion-backend` | **yes** | Enables the [`OlapBackend::DataFusion`] variant backed by [`rhei_datafusion::DataFusionEngine`] |
//! | `duckdb-backend` | no | Enables the [`OlapBackend::DuckDb`] variant backed by [`rhei_duckdb::DuckDbEngine`] |
//! | `full` | no | Both `datafusion-backend` and `duckdb-backend` simultaneously |
//! | `cloud-storage` | no | Passes the `cloud-storage` flag through to `rhei-datafusion` (S3 / GCS Parquet backends) |
//!
//! At least one of `datafusion-backend` or `duckdb-backend` must be enabled; the crate will not
//! compile if both are absent.
//!
//! # Dispatch pattern
//!
//! Every async method on [`OlapBackend`] is a thin `match` over the active variant that calls the
//! same method on the inner engine and maps its error to [`OlapError`].  The same pattern applies
//! to [`OltpBackend`] and [`OltpError`].
//!
//! # Re-exports
//!
//! For consumer convenience this crate re-exports the concrete engine types and their errors so
//! that callers do not need to depend on `rhei-datafusion` or `rhei-duckdb` directly.

pub mod error;

pub use error::{OlapError, OltpError};

// Re-export OLAP backend types for convenience
#[cfg(feature = "datafusion-backend")]
pub use rhei_datafusion::{DataFusionEngine, DfOlapError, SharedDataFusionEngine, StorageMode};
#[cfg(feature = "duckdb-backend")]
pub use rhei_duckdb::{DuckDbEngine, DuckDbError, SharedDuckDbEngine};

// Re-export OLTP backend types
pub use rhei_oltp_rusqlite::{RusqliteCdcProducer, RusqliteEngine, RusqliteOltpError};

use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;

/// Single dispatch type covering every OLAP engine compiled into this crate.
///
/// Analytical work in the Rhei HTAP engine goes through an `OlapBackend`.  Its
/// [`rhei_core::OlapEngine`] implementation forwards each call to the wrapped engine and turns
/// the engine-specific error into an [`OlapError`].
///
/// Configuration decides which variant gets constructed (see the [`rhei_core`] config types),
/// and a value keeps that variant for its whole lifetime.  Dispatch is a plain `match` on the
/// variant rather than a trait-object call, so the compiler can inline straight through each
/// arm and no virtual call is involved.
///
/// # Feature gates
///
/// Every variant sits behind its own cargo feature:
///
/// - `datafusion-backend` (on by default) provides [`OlapBackend::DataFusion`];
/// - `duckdb-backend` provides [`OlapBackend::DuckDb`];
/// - `full` turns both on.
#[derive(Clone)]
pub enum OlapBackend {
    /// Apache DataFusion engine.
    ///
    /// Holds a [`rhei_datafusion::SharedDataFusionEngine`], an `Arc`-cloneable handle to a
    /// [`rhei_datafusion::DataFusionEngine`].  DataFusion is async-native, so its operations run
    /// directly on the Tokio runtime rather than through `spawn_blocking`.
    ///
    /// Only present when the `datafusion-backend` feature is enabled.
    #[cfg(feature = "datafusion-backend")]
    DataFusion(SharedDataFusionEngine),

    /// DuckDB engine.
    ///
    /// Holds a [`rhei_duckdb::SharedDuckDbEngine`], an `Arc`-cloneable handle to a
    /// [`rhei_duckdb::DuckDbEngine`].  The DuckDB C++ library is not async-native, so every
    /// operation is pushed onto `spawn_blocking`.
    ///
    /// Only present when the `duckdb-backend` feature is enabled.
    #[cfg(feature = "duckdb-backend")]
    DuckDb(SharedDuckDbEngine),
}

impl OlapBackend {
    /// Names the engine behind this value: `"DataFusion"` or `"DuckDB"`.
    ///
    /// The string is `'static`, which makes it handy for log lines, metric labels and other
    /// diagnostics.
    pub fn backend_name(&self) -> &'static str {
        match self {
            #[cfg(feature = "datafusion-backend")]
            OlapBackend::DataFusion(..) => "DataFusion",
            #[cfg(feature = "duckdb-backend")]
            OlapBackend::DuckDb(..) => "DuckDB",
        }
    }
}

impl rhei_core::OlapEngine for OlapBackend {
    type Error = OlapError;

    /// Runs a read-only SQL query on the wrapped engine and buffers every result batch.
    ///
    /// The engine-specific error is converted into [`OlapError`].
    async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
        match self {
            #[cfg(feature = "datafusion-backend")]
            Self::DataFusion(engine) => engine.query(sql).await.map_err(Into::into),
            #[cfg(feature = "duckdb-backend")]
            Self::DuckDb(engine) => engine.query(sql).await.map_err(Into::into),
        }
    }

    /// Runs a read-only SQL query and returns the results as a
    /// [`rhei_core::RecordBatchBoxStream`].
    ///
    /// DataFusion produces batches natively as a stream.  DuckDB buffers the full result first
    /// and then wraps it in a stream.
    async fn query_stream(
        &self,
        sql: &str,
    ) -> Result<rhei_core::RecordBatchBoxStream, Self::Error> {
        match self {
            #[cfg(feature = "datafusion-backend")]
            Self::DataFusion(engine) => engine.query_stream(sql).await.map_err(Into::into),
            #[cfg(feature = "duckdb-backend")]
            Self::DuckDb(engine) => engine.query_stream(sql).await.map_err(Into::into),
        }
    }

    /// Runs a DML/DDL statement on the wrapped engine and reports how many rows it affected.
    async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
        match self {
            #[cfg(feature = "datafusion-backend")]
            Self::DataFusion(engine) => engine.execute(sql).await.map_err(Into::into),
            #[cfg(feature = "duckdb-backend")]
            Self::DuckDb(engine) => engine.execute(sql).await.map_err(Into::into),
        }
    }

    /// Bulk-loads Arrow [`RecordBatch`]es into a table that already exists in the engine.
    ///
    /// DuckDB ingests through its native `Appender` API.  DataFusion merges the batches into
    /// its in-memory or on-disk representation.
    ///
    /// Yields the total number of rows loaded.
    async fn load_arrow(&self, table: &str, batches: &[RecordBatch]) -> Result<u64, Self::Error> {
        match self {
            #[cfg(feature = "datafusion-backend")]
            Self::DataFusion(engine) => engine
                .load_arrow(table, batches)
                .await
                .map_err(Into::into),
            #[cfg(feature = "duckdb-backend")]
            Self::DuckDb(engine) => engine
                .load_arrow(table, batches)
                .await
                .map_err(Into::into),
        }
    }

    /// Creates a table in the wrapped engine from an Arrow schema plus primary-key columns.
    async fn create_table(
        &self,
        table_name: &str,
        schema: &SchemaRef,
        primary_key: &[String],
    ) -> Result<(), Self::Error> {
        match self {
            #[cfg(feature = "datafusion-backend")]
            Self::DataFusion(engine) => engine
                .create_table(table_name, schema, primary_key)
                .await
                .map_err(Into::into),
            #[cfg(feature = "duckdb-backend")]
            Self::DuckDb(engine) => engine
                .create_table(table_name, schema, primary_key)
                .await
                .map_err(Into::into),
        }
    }

    /// Reports whether the wrapped engine knows a table called `table_name`.
    async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
        match self {
            #[cfg(feature = "datafusion-backend")]
            Self::DataFusion(engine) => engine.table_exists(table_name).await.map_err(Into::into),
            #[cfg(feature = "duckdb-backend")]
            Self::DuckDb(engine) => engine.table_exists(table_name).await.map_err(Into::into),
        }
    }

    /// Appends a column of Arrow type `data_type` to an existing table in the wrapped engine.
    async fn add_column(
        &self,
        table_name: &str,
        column_name: &str,
        data_type: &DataType,
    ) -> Result<(), Self::Error> {
        match self {
            #[cfg(feature = "datafusion-backend")]
            Self::DataFusion(engine) => engine
                .add_column(table_name, column_name, data_type)
                .await
                .map_err(Into::into),
            #[cfg(feature = "duckdb-backend")]
            Self::DuckDb(engine) => engine
                .add_column(table_name, column_name, data_type)
                .await
                .map_err(Into::into),
        }
    }

    /// Removes a column from an existing table in the wrapped engine.
    async fn drop_column(&self, table_name: &str, column_name: &str) -> Result<(), Self::Error> {
        match self {
            #[cfg(feature = "datafusion-backend")]
            Self::DataFusion(engine) => engine
                .drop_column(table_name, column_name)
                .await
                .map_err(Into::into),
            #[cfg(feature = "duckdb-backend")]
            Self::DuckDb(engine) => engine
                .drop_column(table_name, column_name)
                .await
                .map_err(Into::into),
        }
    }

    /// Reports whether the wrapped engine supports multi-statement transactions.
    ///
    /// The answer comes straight from the inner engine.  DuckDB has full ACID transactions
    /// (`BEGIN`/`COMMIT`/`ROLLBACK`) and answers `true`.  DataFusion has no transactions and
    /// answers `false`.
    ///
    /// The sync engine consults this flag when deciding whether to wrap each CDC sync cycle in
    /// an explicit `BEGIN..COMMIT` block.
    fn supports_transactions(&self) -> bool {
        match self {
            #[cfg(feature = "datafusion-backend")]
            Self::DataFusion(engine) => engine.supports_transactions(),
            #[cfg(feature = "duckdb-backend")]
            Self::DuckDb(engine) => engine.supports_transactions(),
        }
    }
}

/// Single dispatch type for the transactional (OLTP) side of the engine.
///
/// This is the OLTP counterpart of [`OlapBackend`].  The `HtapEngine` facade and the sync engine
/// are written against this one type, whichever concrete OLTP engine is actually running.
///
/// [`OltpBackend::Rusqlite`] is the only variant today.  Further engines (PostgreSQL, MySQL,
/// and so on) can be added as new variants without touching consumer code.
pub enum OltpBackend {
    /// SQLite engine, implemented by [`RusqliteEngine`].
    ///
    /// Runs in WAL mode with one write connection and a round-robin pool of read connections.
    /// Change capture uses triggers that [`RusqliteCdcProducer`] installs on each registered
    /// table.
    Rusqlite(RusqliteEngine),
    // Planned variants: Postgres(PostgresEngine), MySQL(MySqlEngine), and so on.
}

impl OltpBackend {
    /// Returns a human-readable name for the active backend (currently always `"Rusqlite"`).
    ///
    /// Useful for logging and diagnostic output.
    pub fn backend_name(&self) -> &'static str {
        match self {
            Self::Rusqlite(_) => "Rusqlite",
        }
    }

    /// Returns a reference to the inner [`RusqliteEngine`], if this is the `Rusqlite` variant.
    ///
    /// Returns `None` for any other variant.
    ///
    /// Use this only for rusqlite-specific operations (CDC trigger setup, obtaining raw
    /// connections).  All trait-level OLTP operations should go through [`rhei_core::OltpEngine`].
    pub fn as_rusqlite(&self) -> Option<&RusqliteEngine> {
        match self {
            Self::Rusqlite(e) => Some(e),
        }
    }
}

impl rhei_core::OltpEngine for OltpBackend {
    type Error = OltpError;

    /// Executes a parameterised read-only SQL query against the OLTP engine.
    ///
    /// Dispatches to the inner engine's `query` implementation and maps its error to
    /// [`OltpError`].
    async fn query(
        &self,
        sql: &str,
        params: &[serde_json::Value],
    ) -> Result<Vec<arrow::record_batch::RecordBatch>, Self::Error> {
        match self {
            Self::Rusqlite(e) => e.query(sql, params).await.map_err(OltpError::from),
        }
    }

    /// Executes a parameterised write (DML/DDL) SQL statement against the OLTP engine.
    ///
    /// Dispatches to the inner engine's `execute` implementation and returns the number of
    /// affected rows.
    async fn execute(&self, sql: &str, params: &[serde_json::Value]) -> Result<u64, Self::Error> {
        match self {
            Self::Rusqlite(e) => e.execute(sql, params).await.map_err(OltpError::from),
        }
    }

    /// Executes a batch of parameterised SQL statements as a single unit against the OLTP engine.
    ///
    /// Dispatches to the inner engine's `execute_batch` implementation.  The entire batch is
    /// wrapped in a transaction; if any statement fails the transaction is rolled back.
    async fn execute_batch(
        &self,
        statements: &[(String, Vec<serde_json::Value>)],
    ) -> Result<(), Self::Error> {
        match self {
            Self::Rusqlite(e) => e.execute_batch(statements).await.map_err(OltpError::from),
        }
    }

    /// Returns `true` if the named table exists in the OLTP engine.
    ///
    /// Dispatches to the inner engine's `table_exists` implementation.
    async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
        match self {
            Self::Rusqlite(e) => e.table_exists(table_name).await.map_err(OltpError::from),
        }
    }
}