rhei_olap/lib.rs
1//! Backend-agnostic OLAP and OLTP dispatcher for the Rhei HTAP engine.
2//!
3//! # Purpose
4//!
5//! This crate provides two enum dispatchers — [`OlapBackend`] and [`OltpBackend`] — that
6//! forward every [`rhei_core::OlapEngine`] and [`rhei_core::OltpEngine`] method call to the
7//! concrete engine selected at configuration time. The consuming crates (`rhei`, `rhei-sync`,
8//! `rhei-flight`) only ever see one type, so they are compiled once regardless of which backend
9//! combination is enabled.
10//!
11//! Using an enum (instead of `Box<dyn OlapEngine>`) avoids heap allocation and virtual-dispatch
12//! overhead in the hot sync path while still allowing the backend choice to be deferred until
13//! runtime configuration.
14//!
15//! # OLAP feature flags
16//!
17//! | Feature | Default | Description |
18//! |---------|---------|-------------|
19//! | `datafusion-backend` | **yes** | Enables the [`OlapBackend::DataFusion`] variant backed by [`rhei_datafusion::DataFusionEngine`] |
20//! | `duckdb-backend` | no | Enables the [`OlapBackend::DuckDb`] variant backed by [`rhei_duckdb::DuckDbEngine`] |
21//! | `full` | no | Both `datafusion-backend` and `duckdb-backend` simultaneously |
22//! | `cloud-storage` | no | Passes the `cloud-storage` flag through to `rhei-datafusion` (S3 / GCS Parquet backends) |
23//!
24//! At least one of `datafusion-backend` or `duckdb-backend` must be enabled; the crate will not
25//! compile if both are absent.
26//!
27//! # Dispatch pattern
28//!
29//! Every async method on [`OlapBackend`] is a thin `match` over the active variant that calls the
30//! same method on the inner engine and maps its error to [`OlapError`]. The same pattern applies
31//! to [`OltpBackend`] and [`OltpError`].
32//!
33//! # Re-exports
34//!
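//! For consumer convenience this crate re-exports the concrete OLAP engine types and their
//! errors (each behind its backend feature flag), together with the rusqlite OLTP engine, its
//! CDC producer, and its error type, so that callers do not need to depend on
//! `rhei-datafusion`, `rhei-duckdb`, or the rusqlite OLTP crate directly.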
37
38pub mod error;
39
40pub use error::{OlapError, OltpError};
41
42// Re-export OLAP backend types for convenience
43#[cfg(feature = "datafusion-backend")]
44pub use rhei_datafusion::{DataFusionEngine, DfOlapError, SharedDataFusionEngine, StorageMode};
45#[cfg(feature = "duckdb-backend")]
46pub use rhei_duckdb::{DuckDbEngine, DuckDbError, SharedDuckDbEngine};
47
48// Re-export OLTP backend types
49pub use rhei_oltp_rusqlite::{RusqliteCdcProducer, RusqliteEngine, RusqliteOltpError};
50
51use arrow::datatypes::{DataType, SchemaRef};
52use arrow::record_batch::RecordBatch;
53
/// Enum dispatcher over the OLAP engines compiled into this crate.
///
/// Each variant wraps the handle of one concrete analytical engine. The
/// [`rhei_core::OlapEngine`] implementation for this type matches on the variant and forwards
/// the call to the wrapped engine, converting the engine-specific error into [`OlapError`].
///
/// Dispatch is a plain `match` on the variant; no `Box<dyn OlapEngine>` trait object is
/// involved. The type derives [`Clone`], so cloning clones the wrapped shared handle.
///
/// # Feature gates
///
/// A variant only exists when its Cargo feature is enabled:
///
/// * `duckdb-backend` — [`OlapBackend::DuckDb`]
/// * `datafusion-backend` — [`OlapBackend::DataFusion`] (documented as the default feature)
///
/// The crate-level documentation also lists a `full` feature that enables both.
#[derive(Clone)]
pub enum OlapBackend {
    /// DuckDB engine, held as a [`SharedDuckDbEngine`] handle.
    ///
    /// Compiled in only with the `duckdb-backend` feature.
    ///
    /// NOTE(review): the engine is described upstream as running its work through
    /// `spawn_blocking` because DuckDB is not async-native; that behaviour lives in
    /// `rhei_duckdb`, not in this dispatcher — confirm there.
    #[cfg(feature = "duckdb-backend")]
    DuckDb(SharedDuckDbEngine),

    /// Apache DataFusion engine, held as a [`SharedDataFusionEngine`] handle.
    ///
    /// Compiled in only with the `datafusion-backend` feature.
    ///
    /// NOTE(review): the engine is described upstream as natively async (no
    /// `spawn_blocking`); that is a property of `rhei_datafusion` — confirm there.
    #[cfg(feature = "datafusion-backend")]
    DataFusion(SharedDataFusionEngine),
}
91
92impl OlapBackend {
93 /// Returns a human-readable name for the active backend (`"DuckDB"` or `"DataFusion"`).
94 ///
95 /// Useful for logging, metrics labels, and diagnostic output.
96 pub fn backend_name(&self) -> &'static str {
97 match self {
98 #[cfg(feature = "duckdb-backend")]
99 Self::DuckDb(_) => "DuckDB",
100 #[cfg(feature = "datafusion-backend")]
101 Self::DataFusion(_) => "DataFusion",
102 }
103 }
104}
105
106impl rhei_core::OlapEngine for OlapBackend {
107 type Error = OlapError;
108
109 /// Executes a read-only SQL query and collects all result rows into memory.
110 ///
111 /// Dispatches to the inner engine's `query` implementation and maps its error to
112 /// [`OlapError`].
113 async fn query(&self, sql: &str) -> Result<Vec<RecordBatch>, Self::Error> {
114 match self {
115 #[cfg(feature = "duckdb-backend")]
116 Self::DuckDb(e) => e.query(sql).await.map_err(OlapError::from),
117 #[cfg(feature = "datafusion-backend")]
118 Self::DataFusion(e) => e.query(sql).await.map_err(OlapError::from),
119 }
120 }
121
122 /// Executes a read-only SQL query and returns results as a streaming
123 /// `Pin<Box<dyn Stream<Item = Result<RecordBatch>>>>`.
124 ///
125 /// Dispatches to the inner engine's `query_stream` implementation. DataFusion streams
126 /// natively; DuckDB collects results first and then wraps them in a stream.
127 async fn query_stream(
128 &self,
129 sql: &str,
130 ) -> Result<rhei_core::RecordBatchBoxStream, Self::Error> {
131 match self {
132 #[cfg(feature = "duckdb-backend")]
133 Self::DuckDb(e) => e.query_stream(sql).await.map_err(OlapError::from),
134 #[cfg(feature = "datafusion-backend")]
135 Self::DataFusion(e) => e.query_stream(sql).await.map_err(OlapError::from),
136 }
137 }
138
139 /// Executes a write (DML/DDL) SQL statement and returns the number of affected rows.
140 ///
141 /// Dispatches to the inner engine's `execute` implementation.
142 async fn execute(&self, sql: &str) -> Result<u64, Self::Error> {
143 match self {
144 #[cfg(feature = "duckdb-backend")]
145 Self::DuckDb(e) => e.execute(sql).await.map_err(OlapError::from),
146 #[cfg(feature = "datafusion-backend")]
147 Self::DataFusion(e) => e.execute(sql).await.map_err(OlapError::from),
148 }
149 }
150
151 /// Bulk-loads Arrow [`RecordBatch`]es into an existing OLAP table.
152 ///
153 /// Dispatches to the inner engine's `load_arrow` implementation. DuckDB uses the native
154 /// `Appender` API for zero-copy ingestion; DataFusion merges the batches into its in-memory
155 /// or on-disk representation.
156 ///
157 /// Returns the total number of rows loaded.
158 async fn load_arrow(&self, table: &str, batches: &[RecordBatch]) -> Result<u64, Self::Error> {
159 match self {
160 #[cfg(feature = "duckdb-backend")]
161 Self::DuckDb(e) => e.load_arrow(table, batches).await.map_err(OlapError::from),
162 #[cfg(feature = "datafusion-backend")]
163 Self::DataFusion(e) => e.load_arrow(table, batches).await.map_err(OlapError::from),
164 }
165 }
166
167 /// Creates a new table in the OLAP backend with the given Arrow schema and primary key.
168 ///
169 /// Dispatches to the inner engine's `create_table` implementation.
170 async fn create_table(
171 &self,
172 table_name: &str,
173 schema: &SchemaRef,
174 primary_key: &[String],
175 ) -> Result<(), Self::Error> {
176 match self {
177 #[cfg(feature = "duckdb-backend")]
178 Self::DuckDb(e) => e
179 .create_table(table_name, schema, primary_key)
180 .await
181 .map_err(OlapError::from),
182 #[cfg(feature = "datafusion-backend")]
183 Self::DataFusion(e) => e
184 .create_table(table_name, schema, primary_key)
185 .await
186 .map_err(OlapError::from),
187 }
188 }
189
190 /// Returns `true` if the named table exists in the OLAP backend.
191 ///
192 /// Dispatches to the inner engine's `table_exists` implementation.
193 async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
194 match self {
195 #[cfg(feature = "duckdb-backend")]
196 Self::DuckDb(e) => e.table_exists(table_name).await.map_err(OlapError::from),
197 #[cfg(feature = "datafusion-backend")]
198 Self::DataFusion(e) => e.table_exists(table_name).await.map_err(OlapError::from),
199 }
200 }
201
202 /// Adds a new column with the given Arrow [`DataType`] to an existing OLAP table.
203 ///
204 /// Dispatches to the inner engine's `add_column` implementation.
205 async fn add_column(
206 &self,
207 table_name: &str,
208 column_name: &str,
209 data_type: &DataType,
210 ) -> Result<(), Self::Error> {
211 match self {
212 #[cfg(feature = "duckdb-backend")]
213 Self::DuckDb(e) => e
214 .add_column(table_name, column_name, data_type)
215 .await
216 .map_err(OlapError::from),
217 #[cfg(feature = "datafusion-backend")]
218 Self::DataFusion(e) => e
219 .add_column(table_name, column_name, data_type)
220 .await
221 .map_err(OlapError::from),
222 }
223 }
224
225 /// Drops an existing column from an OLAP table.
226 ///
227 /// Dispatches to the inner engine's `drop_column` implementation.
228 async fn drop_column(&self, table_name: &str, column_name: &str) -> Result<(), Self::Error> {
229 match self {
230 #[cfg(feature = "duckdb-backend")]
231 Self::DuckDb(e) => e
232 .drop_column(table_name, column_name)
233 .await
234 .map_err(OlapError::from),
235 #[cfg(feature = "datafusion-backend")]
236 Self::DataFusion(e) => e
237 .drop_column(table_name, column_name)
238 .await
239 .map_err(OlapError::from),
240 }
241 }
242
243 /// Returns `true` if the backend supports multi-statement transactions.
244 ///
245 /// DuckDB supports full ACID transactions (`BEGIN`/`COMMIT`/`ROLLBACK`), so this returns
246 /// `true` for the [`OlapBackend::DuckDb`] variant. DataFusion does not support
247 /// transactions, so this returns `false` for [`OlapBackend::DataFusion`].
248 ///
249 /// The sync engine uses this flag to decide whether to wrap CDC sync cycles in an explicit
250 /// `BEGIN..COMMIT` block.
251 fn supports_transactions(&self) -> bool {
252 match self {
253 #[cfg(feature = "duckdb-backend")]
254 Self::DuckDb(e) => e.supports_transactions(),
255 #[cfg(feature = "datafusion-backend")]
256 Self::DataFusion(e) => e.supports_transactions(),
257 }
258 }
259}
260
261/// Unified OLTP backend enum.
262///
263/// Mirrors [`OlapBackend`] for the transactional layer: the `HtapEngine` facade and the sync
264/// engine work against this single type regardless of which OLTP engine is actually running.
265///
266/// Currently only [`OltpBackend::Rusqlite`] is provided. Future variants (PostgreSQL, MySQL,
267/// etc.) can be added without changing any consumer code.
268pub enum OltpBackend {
269 /// SQLite OLTP engine backed by [`RusqliteEngine`].
270 ///
271 /// Uses WAL mode, a single write connection, and a round-robin read pool.
272 /// Trigger-based CDC is installed by [`RusqliteCdcProducer`] on each registered table.
273 Rusqlite(RusqliteEngine),
274 // future: Postgres(PostgresEngine), MySQL(MySqlEngine), etc.
275}
276
277impl OltpBackend {
278 /// Returns a human-readable name for the active backend (currently always `"Rusqlite"`).
279 ///
280 /// Useful for logging and diagnostic output.
281 pub fn backend_name(&self) -> &'static str {
282 match self {
283 Self::Rusqlite(_) => "Rusqlite",
284 }
285 }
286
287 /// Returns a reference to the inner [`RusqliteEngine`], if this is the `Rusqlite` variant.
288 ///
289 /// Returns `None` for any other variant.
290 ///
291 /// Use this only for rusqlite-specific operations (CDC trigger setup, obtaining raw
292 /// connections). All trait-level OLTP operations should go through [`rhei_core::OltpEngine`].
293 pub fn as_rusqlite(&self) -> Option<&RusqliteEngine> {
294 match self {
295 Self::Rusqlite(e) => Some(e),
296 }
297 }
298}
299
300impl rhei_core::OltpEngine for OltpBackend {
301 type Error = OltpError;
302
303 /// Executes a parameterised read-only SQL query against the OLTP engine.
304 ///
305 /// Dispatches to the inner engine's `query` implementation and maps its error to
306 /// [`OltpError`].
307 async fn query(
308 &self,
309 sql: &str,
310 params: &[serde_json::Value],
311 ) -> Result<Vec<arrow::record_batch::RecordBatch>, Self::Error> {
312 match self {
313 Self::Rusqlite(e) => e.query(sql, params).await.map_err(OltpError::from),
314 }
315 }
316
317 /// Executes a parameterised write (DML/DDL) SQL statement against the OLTP engine.
318 ///
319 /// Dispatches to the inner engine's `execute` implementation and returns the number of
320 /// affected rows.
321 async fn execute(&self, sql: &str, params: &[serde_json::Value]) -> Result<u64, Self::Error> {
322 match self {
323 Self::Rusqlite(e) => e.execute(sql, params).await.map_err(OltpError::from),
324 }
325 }
326
327 /// Executes a batch of parameterised SQL statements as a single unit against the OLTP engine.
328 ///
329 /// Dispatches to the inner engine's `execute_batch` implementation. The entire batch is
330 /// wrapped in a transaction; if any statement fails the transaction is rolled back.
331 async fn execute_batch(
332 &self,
333 statements: &[(String, Vec<serde_json::Value>)],
334 ) -> Result<(), Self::Error> {
335 match self {
336 Self::Rusqlite(e) => e.execute_batch(statements).await.map_err(OltpError::from),
337 }
338 }
339
340 /// Returns `true` if the named table exists in the OLTP engine.
341 ///
342 /// Dispatches to the inner engine's `table_exists` implementation.
343 async fn table_exists(&self, table_name: &str) -> Result<bool, Self::Error> {
344 match self {
345 Self::Rusqlite(e) => e.table_exists(table_name).await.map_err(OltpError::from),
346 }
347 }
348}