rhei-core 1.5.0

Core traits and types for the Rhei HTAP engine
Documentation
//! [`OlapEngine`] trait and [`RecordBatchBoxStream`] streaming type.
//!
//! Implementations include `rhei-duckdb` and `rhei-datafusion`. All data
//! crosses the trait boundary as Arrow [`RecordBatch`]es for zero-copy interop
//! with the rest of the Arrow ecosystem (Flight SQL, DataFusion, etc.).

use std::pin::Pin;
use std::task::{Context, Poll};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;

/// A boxed, pinned stream of `RecordBatch` results.
///
/// This is the backend-agnostic streaming type used by `query_stream()`.
/// DataFusion produces this natively; DuckDB adapts via a channel-based bridge.
///
/// Each item is a `Result`: `Ok` carries one `RecordBatch`, `Err` carries a
/// type-erased `Box<dyn std::error::Error + Send + Sync>`. The trait object
/// itself is `Send`, so the stream can be moved to another thread.
pub type RecordBatchBoxStream = Pin<
    Box<
        dyn futures_core::Stream<
                Item = Result<RecordBatch, Box<dyn std::error::Error + Send + Sync>>,
            > + Send,
    >,
>;

/// Turn an already-materialised `Vec<RecordBatch>` into a
/// `RecordBatchBoxStream`.
///
/// The vector is consumed. The resulting stream yields every batch as an
/// `Ok` item, in the vector's order, and then terminates.
pub fn vec_to_stream(batches: Vec<RecordBatch>) -> RecordBatchBoxStream {
    let stream = VecStream {
        batches: batches.into_iter(),
    };
    Box::pin(stream)
}

/// Simple stream adapter over a `Vec<RecordBatch>` iterator.
///
/// Built by `vec_to_stream`, which moves the vector's contents into the
/// `batches` iterator; the `futures_core::Stream` impl then drains it.
struct VecStream {
    /// Owning iterator over the batches that have not been yielded yet.
    batches: std::vec::IntoIter<RecordBatch>,
}

/// `Stream` implementation that drains the wrapped iterator.
///
/// Every poll completes immediately: the stream never returns
/// `Poll::Pending` and never yields an `Err` item.
impl futures_core::Stream for VecStream {
    type Item = Result<RecordBatch, Box<dyn std::error::Error + Send + Sync>>;

    /// Yield the next buffered batch, or `None` once the iterator is empty.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `VecStream` only holds a `vec::IntoIter`, which is `Unpin`, so the
        // pin can be unwrapped safely to reach the iterator mutably.
        let this = self.get_mut();
        match this.batches.next() {
            Some(batch) => Poll::Ready(Some(Ok(batch))),
            None => Poll::Ready(None),
        }
    }

    /// Report the exact number of batches still to be yielded.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // `vec::IntoIter` is an `ExactSizeIterator`, so both bounds are the
        // remaining length.
        let remaining = self.batches.len();
        (remaining, Some(remaining))
    }
}

/// Abstraction over an OLAP (analytical) query engine.
///
/// Implementations include `rhei-duckdb` and `rhei-datafusion`. All data
/// flows through Arrow [`RecordBatch`]es for zero-copy interop.
///
/// Every async method returns an `impl Future + Send`, and the trait itself
/// requires `Send + Sync`, so an engine can be shared across tasks.
///
/// # Contract for implementors
///
/// - `query` and `execute` must be callable concurrently from multiple async
///   tasks (i.e., the engine must provide its own internal locking).
/// - `load_arrow` should prefer a native bulk-insert path (e.g., DuckDB's
///   `Appender` API) over generating SQL literals when possible.
/// - `supports_transactions` must return the same value for the lifetime of
///   the engine instance.
pub trait OlapEngine: Send + Sync {
    /// Engine-specific error type returned by all fallible methods.
    type Error: std::error::Error + Send + Sync + 'static;

    /// Execute an analytical SQL query and return results as Arrow
    /// [`RecordBatch`]es buffered in memory.
    ///
    /// For streaming results without buffering the full result set, prefer
    /// [`OlapEngine::query_stream`].
    ///
    /// # Errors
    ///
    /// Returns `Err(Self::Error)` on SQL parse failure, execution error, or
    /// internal engine error.
    fn query(
        &self,
        sql: &str,
    ) -> impl std::future::Future<Output = Result<Vec<RecordBatch>, Self::Error>> + Send;

    /// Execute a SQL query and return a lazy [`RecordBatchBoxStream`].
    ///
    /// Unlike [`OlapEngine::query`], this does not buffer the full result set.
    /// Batches are produced as the engine generates them, which is important
    /// for large result sets sent over Arrow Flight.
    ///
    /// The default implementation falls back to [`OlapEngine::query`] and
    /// wraps the collected result in a stream, so it still buffers the whole
    /// result set in memory and never yields `Err` items. Backends that
    /// support native streaming (e.g., DataFusion via `execute_stream()`)
    /// should override this.
    ///
    /// # Errors
    ///
    /// Returns `Err(Self::Error)` if query planning or execution fails before
    /// the first batch is produced. Errors that occur mid-stream are surfaced
    /// as `Err` items in the returned stream.
    fn query_stream(
        &self,
        sql: &str,
    ) -> impl std::future::Future<Output = Result<RecordBatchBoxStream, Self::Error>> + Send {
        // The returned future already captures the lifetimes of `self` and
        // `sql` (return-position `impl Trait` in traits captures every
        // in-scope lifetime), so the query text is borrowed directly rather
        // than copied into a fresh `String` on every call.
        async move {
            let batches = self.query(sql).await?;
            Ok(vec_to_stream(batches))
        }
    }

    /// Execute a DDL or DML statement without returning rows.
    ///
    /// Returns the number of rows affected (0 for DDL).
    ///
    /// # Errors
    ///
    /// Returns `Err(Self::Error)` on SQL parse failure, constraint violation,
    /// or internal engine error.
    fn execute(
        &self,
        sql: &str,
    ) -> impl std::future::Future<Output = Result<u64, Self::Error>> + Send;

    /// Bulk-load Arrow [`RecordBatch`]es into `table`.
    ///
    /// Implementations should use a native zero-copy path when available (e.g.,
    /// DuckDB's `Appender` API). Returns the total number of rows ingested.
    ///
    /// # Errors
    ///
    /// Returns `Err(Self::Error)` if the table does not exist, a batch's schema
    /// does not match the table schema, or the engine rejects the data.
    fn load_arrow(
        &self,
        table: &str,
        batches: &[RecordBatch],
    ) -> impl std::future::Future<Output = Result<u64, Self::Error>> + Send;

    /// Create a table from an Arrow schema with an optional primary key.
    ///
    /// `primary_key` is a slice of column names. Backends that support PK
    /// enforcement (e.g. DuckDB) emit a `PRIMARY KEY (col1, col2)` clause;
    /// backends that do not (e.g. DataFusion) accept the parameter as
    /// informational metadata only.
    ///
    /// # Errors
    ///
    /// Returns `Err(Self::Error)` if the table already exists or the schema
    /// contains an unsupported data type.
    fn create_table(
        &self,
        table_name: &str,
        schema: &SchemaRef,
        primary_key: &[String],
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;

    /// Return `true` if `table_name` exists in the OLAP catalog.
    ///
    /// # Errors
    ///
    /// Returns `Err(Self::Error)` if the catalog query fails.
    fn table_exists(
        &self,
        table_name: &str,
    ) -> impl std::future::Future<Output = Result<bool, Self::Error>> + Send;

    /// Add a nullable column to an existing table.
    ///
    /// # Errors
    ///
    /// Returns `Err(Self::Error)` if the table does not exist, the column
    /// already exists, or the data type is unsupported.
    fn add_column(
        &self,
        table_name: &str,
        column_name: &str,
        data_type: &arrow::datatypes::DataType,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;

    /// Remove a column from an existing table.
    ///
    /// # Errors
    ///
    /// Returns `Err(Self::Error)` if the table does not exist or the column
    /// is not present.
    fn drop_column(
        &self,
        table_name: &str,
        column_name: &str,
    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;

    /// Returns `true` if this backend supports ACID transactions via
    /// `BEGIN` / `COMMIT` / `ROLLBACK`.
    ///
    /// Backends that return `false` (e.g., DataFusion) treat these statements as
    /// no-ops. The sync engine uses this signal to decide whether to wrap sync
    /// cycles in transactions or to use idempotent per-statement semantics with
    /// seq-based recovery.
    ///
    /// Default: `false` (conservative).
    fn supports_transactions(&self) -> bool {
        false
    }
}