Skip to main content

rhei_core/traits/
olap.rs

1//! [`OlapEngine`] trait and [`RecordBatchBoxStream`] streaming type.
2//!
3//! Implementations include `rhei-duckdb` and `rhei-datafusion`. All data
4//! crosses the trait boundary as Arrow [`RecordBatch`]es for zero-copy interop
5//! with the rest of the Arrow ecosystem (Flight SQL, DataFusion, etc.).
6
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use arrow::datatypes::SchemaRef;
11use arrow::record_batch::RecordBatch;
12
13/// A boxed, pinned stream of `RecordBatch` results.
14///
15/// This is the backend-agnostic streaming type used by `query_stream()`.
16/// DataFusion produces this natively; DuckDB adapts via a channel-based bridge.
17pub type RecordBatchBoxStream = Pin<
18    Box<
19        dyn futures_core::Stream<
20                Item = Result<RecordBatch, Box<dyn std::error::Error + Send + Sync>>,
21            > + Send,
22    >,
23>;
24
25/// Wrap a `Vec<RecordBatch>` into a `RecordBatchBoxStream`.
26pub fn vec_to_stream(batches: Vec<RecordBatch>) -> RecordBatchBoxStream {
27    Box::pin(VecStream {
28        batches: batches.into_iter(),
29    })
30}
31
32/// Simple stream adapter over a `Vec<RecordBatch>` iterator.
33struct VecStream {
34    batches: std::vec::IntoIter<RecordBatch>,
35}
36
37impl futures_core::Stream for VecStream {
38    type Item = Result<RecordBatch, Box<dyn std::error::Error + Send + Sync>>;
39
40    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41        Poll::Ready(self.batches.next().map(Ok))
42    }
43
44    fn size_hint(&self) -> (usize, Option<usize>) {
45        self.batches.size_hint()
46    }
47}
48
49/// Abstraction over an OLAP (analytical) query engine.
50///
51/// Implementations include `rhei-duckdb` and `rhei-datafusion`. All data
52/// flows through Arrow [`RecordBatch`]es for zero-copy interop.
53///
54/// # Contract for implementors
55///
56/// - `query` and `execute` must be callable concurrently from multiple async
57///   tasks (i.e., the engine must provide its own internal locking).
58/// - `load_arrow` should prefer a native bulk-insert path (e.g., DuckDB's
59///   `Appender` API) over generating SQL literals when possible.
60/// - `supports_transactions` must return the same value for the lifetime of
61///   the engine instance.
62pub trait OlapEngine: Send + Sync {
63    /// Engine-specific error type returned by all fallible methods.
64    type Error: std::error::Error + Send + Sync + 'static;
65
66    /// Execute an analytical SQL query and return results as Arrow
67    /// [`RecordBatch`]es buffered in memory.
68    ///
69    /// For streaming results without buffering the full result set, prefer
70    /// [`OlapEngine::query_stream`].
71    ///
72    /// # Errors
73    ///
74    /// Returns `Err(Self::Error)` on SQL parse failure, execution error, or
75    /// internal engine error.
76    fn query(
77        &self,
78        sql: &str,
79    ) -> impl std::future::Future<Output = Result<Vec<RecordBatch>, Self::Error>> + Send;
80
81    /// Execute a SQL query and return a lazy [`RecordBatchBoxStream`].
82    ///
83    /// Unlike [`OlapEngine::query`], this does not buffer the full result set.
84    /// Batches are produced as the engine generates them, which is important
85    /// for large result sets sent over Arrow Flight.
86    ///
87    /// The default implementation falls back to [`OlapEngine::query`] and
88    /// wraps the collected result in a stream. Backends that support native
89    /// streaming (e.g., DataFusion via `execute_stream()`) should override this.
90    ///
91    /// # Errors
92    ///
93    /// Returns `Err(Self::Error)` if query planning or execution fails before
94    /// the first batch is produced. Errors that occur mid-stream are surfaced
95    /// as `Err` items in the returned stream.
96    fn query_stream(
97        &self,
98        sql: &str,
99    ) -> impl std::future::Future<Output = Result<RecordBatchBoxStream, Self::Error>> + Send {
100        let sql = sql.to_string();
101        async move {
102            let batches = self.query(&sql).await?;
103            Ok(vec_to_stream(batches))
104        }
105    }
106
107    /// Execute a DDL or DML statement without returning rows.
108    ///
109    /// Returns the number of rows affected (0 for DDL).
110    ///
111    /// # Errors
112    ///
113    /// Returns `Err(Self::Error)` on SQL parse failure, constraint violation,
114    /// or internal engine error.
115    fn execute(
116        &self,
117        sql: &str,
118    ) -> impl std::future::Future<Output = Result<u64, Self::Error>> + Send;
119
120    /// Bulk-load Arrow [`RecordBatch`]es into `table`.
121    ///
122    /// Implementations should use a native zero-copy path when available (e.g.,
123    /// DuckDB's `Appender` API). Returns the total number of rows ingested.
124    ///
125    /// # Errors
126    ///
127    /// Returns `Err(Self::Error)` if the table does not exist, a batch's schema
128    /// does not match the table schema, or the engine rejects the data.
129    fn load_arrow(
130        &self,
131        table: &str,
132        batches: &[RecordBatch],
133    ) -> impl std::future::Future<Output = Result<u64, Self::Error>> + Send;
134
135    /// Create a table from an Arrow schema with an optional primary key.
136    ///
137    /// `primary_key` is a slice of column names. Backends that support PK
138    /// enforcement (e.g. DuckDB) emit a `PRIMARY KEY (col1, col2)` clause;
139    /// backends that do not (e.g. DataFusion) accept the parameter as
140    /// informational metadata only.
141    ///
142    /// # Errors
143    ///
144    /// Returns `Err(Self::Error)` if the table already exists or the schema
145    /// contains an unsupported data type.
146    fn create_table(
147        &self,
148        table_name: &str,
149        schema: &SchemaRef,
150        primary_key: &[String],
151    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
152
153    /// Return `true` if `table_name` exists in the OLAP catalog.
154    ///
155    /// # Errors
156    ///
157    /// Returns `Err(Self::Error)` if the catalog query fails.
158    fn table_exists(
159        &self,
160        table_name: &str,
161    ) -> impl std::future::Future<Output = Result<bool, Self::Error>> + Send;
162
163    /// Add a nullable column to an existing table.
164    ///
165    /// # Errors
166    ///
167    /// Returns `Err(Self::Error)` if the table does not exist, the column
168    /// already exists, or the data type is unsupported.
169    fn add_column(
170        &self,
171        table_name: &str,
172        column_name: &str,
173        data_type: &arrow::datatypes::DataType,
174    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
175
176    /// Remove a column from an existing table.
177    ///
178    /// # Errors
179    ///
180    /// Returns `Err(Self::Error)` if the table does not exist or the column
181    /// is not present.
182    fn drop_column(
183        &self,
184        table_name: &str,
185        column_name: &str,
186    ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
187
188    /// Returns `true` if this backend supports ACID transactions via
189    /// `BEGIN` / `COMMIT` / `ROLLBACK`.
190    ///
191    /// Backends that return `false` (e.g., DataFusion) treat these statements as
192    /// no-ops. The sync engine uses this signal to decide whether to wrap sync
193    /// cycles in transactions or to use idempotent per-statement semantics with
194    /// seq-based recovery.
195    ///
196    /// Default: `false` (conservative).
197    fn supports_transactions(&self) -> bool {
198        false
199    }
200}