Skip to main content

rhei_olap/
lib.rs

1//! Backend-agnostic OLAP and OLTP dispatcher for the Rhei HTAP engine.
2//!
3//! # Purpose
4//!
5//! This crate provides two enum dispatchers — [`OlapBackend`] and [`OltpBackend`] — that
6//! forward every [`rhei_core::OlapEngine`] and [`rhei_core::OltpEngine`] method call to the
//! concrete engine selected at configuration time.  The consuming crates (`rhei`, `rhei-sync`,
//! `rhei-flight`) only ever see one concrete type per layer (`OlapBackend` and `OltpBackend`), so
//! they need neither a generic engine parameter nor backend-specific code paths, whichever backend
//! combination is enabled.
10//!
11//! Using an enum (instead of `Box<dyn OlapEngine>`) avoids heap allocation and virtual-dispatch
12//! overhead in the hot sync path while still allowing the backend choice to be deferred until
13//! runtime configuration.
14//!
15//! # OLAP feature flags
16//!
17//! | Feature | Default | Description |
18//! |---------|---------|-------------|
19//! | `datafusion-backend` | **yes** | Enables the [`OlapBackend::DataFusion`] variant backed by [`rhei_datafusion::DataFusionEngine`] |
20//! | `duckdb-backend` | no | Enables the [`OlapBackend::DuckDb`] variant backed by [`rhei_duckdb::DuckDbEngine`] |
21//! | `full` | no | Both `datafusion-backend` and `duckdb-backend` simultaneously |
22//! | `cloud-storage` | no | Passes the `cloud-storage` flag through to `rhei-datafusion` (S3 / GCS Parquet backends) |
23//!
24//! At least one of `datafusion-backend` or `duckdb-backend` must be enabled; the crate will not
25//! compile if both are absent.
26//!
27//! # Dispatch pattern
28//!
29//! Every async method on [`OlapBackend`] is a thin `match` over the active variant that calls the
30//! same method on the inner engine and maps its error to [`OlapError`].  The same pattern applies
31//! to [`OltpBackend`] and [`OltpError`].
32//!
33//! # Re-exports
34//!
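//! For consumer convenience this crate re-exports the concrete engine types and their errors so
//! that callers do not need to depend on `rhei-datafusion`, `rhei-duckdb`, or
//! `rhei-oltp-rusqlite` directly.  The OLAP re-exports follow the feature flags above: each one
//! is only present when its backend feature is enabled.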
37
38pub mod error;
39
40pub use error::{OlapError, OltpError};
41
42// Re-export OLAP backend types for convenience
43#[cfg(feature = "datafusion-backend")]
44pub use rhei_datafusion::{DataFusionEngine, DfOlapError, SharedDataFusionEngine, StorageMode};
45#[cfg(feature = "duckdb-backend")]
46pub use rhei_duckdb::{DuckDbEngine, DuckDbError, SharedDuckDbEngine};
47
48// Re-export OLTP backend types
49pub use rhei_oltp_rusqlite::{RusqliteCdcProducer, RusqliteEngine, RusqliteOltpError};
50
51use arrow::datatypes::{DataType, SchemaRef};
52use arrow::record_batch::RecordBatch;
53
/// Enum dispatcher over every compiled-in analytical (OLAP) engine.
///
/// This is the single type through which the Rhei HTAP engine runs analytical work. Its
/// [`rhei_core::OlapEngine`] implementation hands each call to the wrapped engine and turns
/// that engine's own error type into [`OlapError`].
///
/// The variant is chosen once, when the engine is configured (see the config types in
/// [`rhei_core`]), and does not change afterwards. Dispatch is a plain `match` on the variant
/// rather than a trait-object call, so the compiler is free to inline through it and no vtable
/// lookup is involved.
///
/// # Feature gates
///
/// Every variant sits behind its own crate feature:
/// - `datafusion-backend` (on by default) provides the `DataFusion` variant.
/// - `duckdb-backend` provides the `DuckDb` variant.
/// - `full` enables both.
#[derive(Clone)]
pub enum OlapBackend {
    /// Handle to a DuckDB engine.
    ///
    /// The payload is [`rhei_duckdb::SharedDuckDbEngine`], an `Arc`-backed, cheaply cloneable
    /// handle to a [`rhei_duckdb::DuckDbEngine`]. DuckDB is a synchronous C++ library, so its
    /// operations are run through `spawn_blocking` rather than directly on the async runtime.
    ///
    /// Only present with the `duckdb-backend` crate feature.
    #[cfg(feature = "duckdb-backend")]
    DuckDb(rhei_duckdb::SharedDuckDbEngine),

    /// Handle to an Apache DataFusion engine.
    ///
    /// The payload is [`rhei_datafusion::SharedDataFusionEngine`], an `Arc`-backed, cheaply
    /// cloneable handle to a [`rhei_datafusion::DataFusionEngine`]. DataFusion is async-native,
    /// so its operations execute directly on the Tokio runtime with no `spawn_blocking` hop.
    ///
    /// Only present with the `datafusion-backend` crate feature.
    #[cfg(feature = "datafusion-backend")]
    DataFusion(rhei_datafusion::SharedDataFusionEngine),
}
91
92impl OlapBackend {
93    /// Returns a human-readable name for the active backend (`"DuckDB"` or `"DataFusion"`).
94    ///
95    /// Useful for logging, metrics labels, and diagnostic output.
96    pub fn backend_name(&self) -> &'static str {
97        match self {
98            #[cfg(feature = "duckdb-backend")]
99            Self::DuckDb(_) => "DuckDB",
100            #[cfg(feature = "datafusion-backend")]
101            Self::DataFusion(_) => "DataFusion",
102        }
103    }
104}
105
106impl rhei_core::OlapEngine for OlapBackend {
107    type Error = OlapError;
108
109    /// Executes a read-only SQL query and collects all result rows into memory.
110    ///
111    /// Dispatches to the inner engine's `query` implementation and maps its error to
112    /// [`OlapError`].
113    async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
114        match self {
115            #[cfg(feature = "duckdb-backend")]
116            Self::DuckDb(e) => e.query(sql).await.map_err(OlapError::from),
117            #[cfg(feature = "datafusion-backend")]
118            Self::DataFusion(e) => e.query(sql).await.map_err(OlapError::from),
119        }
120    }
121
122    /// Executes a read-only SQL query and returns results as a streaming
123    /// `Pin<Box<dyn Stream<Item = Result<RecordBatch>>>>`.
124    ///
125    /// Dispatches to the inner engine's `query_stream` implementation.  DataFusion streams
126    /// natively; DuckDB collects results first and then wraps them in a stream.
127    async fn query_stream(
128        &self,
129        sql: &str,
130    ) -> Result<rhei_core::RecordBatchBoxStream, Self::Error> {
131        match self {
132            #[cfg(feature = "duckdb-backend")]
133            Self::DuckDb(e) => e.query_stream(sql).await.map_err(OlapError::from),
134            #[cfg(feature = "datafusion-backend")]
135            Self::DataFusion(e) => e.query_stream(sql).await.map_err(OlapError::from),
136        }
137    }
138
139    /// Executes a write (DML/DDL) SQL statement and returns the number of affected rows.
140    ///
141    /// Dispatches to the inner engine's `execute` implementation.
142    async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
143        match self {
144            #[cfg(feature = "duckdb-backend")]
145            Self::DuckDb(e) => e.execute(sql).await.map_err(OlapError::from),
146            #[cfg(feature = "datafusion-backend")]
147            Self::DataFusion(e) => e.execute(sql).await.map_err(OlapError::from),
148        }
149    }
150
151    /// Bulk-loads Arrow [`RecordBatch`]es into an existing OLAP table.
152    ///
153    /// Dispatches to the inner engine's `load_arrow` implementation.  DuckDB uses the native
154    /// `Appender` API for zero-copy ingestion; DataFusion merges the batches into its in-memory
155    /// or on-disk representation.
156    ///
157    /// Returns the total number of rows loaded.
158    async fn load_arrow(&self, table: &str, batches: &[RecordBatch]) -> Result<u64, Self::Error> {
159        match self {
160            #[cfg(feature = "duckdb-backend")]
161            Self::DuckDb(e) => e.load_arrow(table, batches).await.map_err(OlapError::from),
162            #[cfg(feature = "datafusion-backend")]
163            Self::DataFusion(e) => e.load_arrow(table, batches).await.map_err(OlapError::from),
164        }
165    }
166
167    /// Creates a new table in the OLAP backend with the given Arrow schema and primary key.
168    ///
169    /// Dispatches to the inner engine's `create_table` implementation.
170    async fn create_table(
171        &self,
172        table_name: &str,
173        schema: &SchemaRef,
174        primary_key: &[String],
175    ) -> Result<(), Self::Error> {
176        match self {
177            #[cfg(feature = "duckdb-backend")]
178            Self::DuckDb(e) => e
179                .create_table(table_name, schema, primary_key)
180                .await
181                .map_err(OlapError::from),
182            #[cfg(feature = "datafusion-backend")]
183            Self::DataFusion(e) => e
184                .create_table(table_name, schema, primary_key)
185                .await
186                .map_err(OlapError::from),
187        }
188    }
189
190    /// Returns `true` if the named table exists in the OLAP backend.
191    ///
192    /// Dispatches to the inner engine's `table_exists` implementation.
193    async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
194        match self {
195            #[cfg(feature = "duckdb-backend")]
196            Self::DuckDb(e) => e.table_exists(table_name).await.map_err(OlapError::from),
197            #[cfg(feature = "datafusion-backend")]
198            Self::DataFusion(e) => e.table_exists(table_name).await.map_err(OlapError::from),
199        }
200    }
201
202    /// Adds a new column with the given Arrow [`DataType`] to an existing OLAP table.
203    ///
204    /// Dispatches to the inner engine's `add_column` implementation.
205    async fn add_column(
206        &self,
207        table_name: &str,
208        column_name: &str,
209        data_type: &DataType,
210    ) -> Result<(), Self::Error> {
211        match self {
212            #[cfg(feature = "duckdb-backend")]
213            Self::DuckDb(e) => e
214                .add_column(table_name, column_name, data_type)
215                .await
216                .map_err(OlapError::from),
217            #[cfg(feature = "datafusion-backend")]
218            Self::DataFusion(e) => e
219                .add_column(table_name, column_name, data_type)
220                .await
221                .map_err(OlapError::from),
222        }
223    }
224
225    /// Drops an existing column from an OLAP table.
226    ///
227    /// Dispatches to the inner engine's `drop_column` implementation.
228    async fn drop_column(&self, table_name: &str, column_name: &str) -> Result<(), Self::Error> {
229        match self {
230            #[cfg(feature = "duckdb-backend")]
231            Self::DuckDb(e) => e
232                .drop_column(table_name, column_name)
233                .await
234                .map_err(OlapError::from),
235            #[cfg(feature = "datafusion-backend")]
236            Self::DataFusion(e) => e
237                .drop_column(table_name, column_name)
238                .await
239                .map_err(OlapError::from),
240        }
241    }
242
243    /// Returns `true` if the backend supports multi-statement transactions.
244    ///
245    /// DuckDB supports full ACID transactions (`BEGIN`/`COMMIT`/`ROLLBACK`), so this returns
246    /// `true` for the [`OlapBackend::DuckDb`] variant.  DataFusion does not support
247    /// transactions, so this returns `false` for [`OlapBackend::DataFusion`].
248    ///
249    /// The sync engine uses this flag to decide whether to wrap CDC sync cycles in an explicit
250    /// `BEGIN..COMMIT` block.
251    fn supports_transactions(&self) -> bool {
252        match self {
253            #[cfg(feature = "duckdb-backend")]
254            Self::DuckDb(e) => e.supports_transactions(),
255            #[cfg(feature = "datafusion-backend")]
256            Self::DataFusion(e) => e.supports_transactions(),
257        }
258    }
259}
260
261/// Unified OLTP backend enum.
262///
263/// Mirrors [`OlapBackend`] for the transactional layer: the `HtapEngine` facade and the sync
264/// engine work against this single type regardless of which OLTP engine is actually running.
265///
266/// Currently only [`OltpBackend::Rusqlite`] is provided.  Future variants (PostgreSQL, MySQL,
267/// etc.) can be added without changing any consumer code.
268pub enum OltpBackend {
269    /// SQLite OLTP engine backed by [`RusqliteEngine`].
270    ///
271    /// Uses WAL mode, a single write connection, and a round-robin read pool.
272    /// Trigger-based CDC is installed by [`RusqliteCdcProducer`] on each registered table.
273    Rusqlite(RusqliteEngine),
274    // future: Postgres(PostgresEngine), MySQL(MySqlEngine), etc.
275}
276
277impl OltpBackend {
278    /// Returns a human-readable name for the active backend (currently always `"Rusqlite"`).
279    ///
280    /// Useful for logging and diagnostic output.
281    pub fn backend_name(&self) -> &'static str {
282        match self {
283            Self::Rusqlite(_) => "Rusqlite",
284        }
285    }
286
287    /// Returns a reference to the inner [`RusqliteEngine`], if this is the `Rusqlite` variant.
288    ///
289    /// Returns `None` for any other variant.
290    ///
291    /// Use this only for rusqlite-specific operations (CDC trigger setup, obtaining raw
292    /// connections).  All trait-level OLTP operations should go through [`rhei_core::OltpEngine`].
293    pub fn as_rusqlite(&self) -> Option<&RusqliteEngine> {
294        match self {
295            Self::Rusqlite(e) => Some(e),
296        }
297    }
298}
299
300impl rhei_core::OltpEngine for OltpBackend {
301    type Error = OltpError;
302
303    /// Executes a parameterised read-only SQL query against the OLTP engine.
304    ///
305    /// Dispatches to the inner engine's `query` implementation and maps its error to
306    /// [`OltpError`].
307    async fn query(
308        &self,
309        sql: &str,
310        params: &[serde_json::Value],
311    ) -> Result<Vec<arrow::record_batch::RecordBatch>, Self::Error> {
312        match self {
313            Self::Rusqlite(e) => e.query(sql, params).await.map_err(OltpError::from),
314        }
315    }
316
317    /// Executes a parameterised write (DML/DDL) SQL statement against the OLTP engine.
318    ///
319    /// Dispatches to the inner engine's `execute` implementation and returns the number of
320    /// affected rows.
321    async fn execute(&self, sql: &str, params: &[serde_json::Value]) -> Result<u64, Self::Error> {
322        match self {
323            Self::Rusqlite(e) => e.execute(sql, params).await.map_err(OltpError::from),
324        }
325    }
326
327    /// Executes a batch of parameterised SQL statements as a single unit against the OLTP engine.
328    ///
329    /// Dispatches to the inner engine's `execute_batch` implementation.  The entire batch is
330    /// wrapped in a transaction; if any statement fails the transaction is rolled back.
331    async fn execute_batch(
332        &self,
333        statements: &[(String, Vec<serde_json::Value>)],
334    ) -> Result<(), Self::Error> {
335        match self {
336            Self::Rusqlite(e) => e.execute_batch(statements).await.map_err(OltpError::from),
337        }
338    }
339
340    /// Returns `true` if the named table exists in the OLTP engine.
341    ///
342    /// Dispatches to the inner engine's `table_exists` implementation.
343    async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
344        match self {
345            Self::Rusqlite(e) => e.table_exists(table_name).await.map_err(OltpError::from),
346        }
347    }
348}