// rhei_core/traits/olap.rs
1//! [`OlapEngine`] trait and [`RecordBatchBoxStream`] streaming type.
2//!
3//! Implementations include `rhei-duckdb` and `rhei-datafusion`. All data
4//! crosses the trait boundary as Arrow [`RecordBatch`]es for zero-copy interop
5//! with the rest of the Arrow ecosystem (Flight SQL, DataFusion, etc.).
6
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use arrow::datatypes::SchemaRef;
11use arrow::record_batch::RecordBatch;
12
13/// A boxed, pinned stream of `RecordBatch` results.
14///
15/// This is the backend-agnostic streaming type used by `query_stream()`.
16/// DataFusion produces this natively; DuckDB adapts via a channel-based bridge.
17pub type RecordBatchBoxStream = Pin<
18 Box<
19 dyn futures_core::Stream<
20 Item = Result<RecordBatch, Box<dyn std::error::Error + Send + Sync>>,
21 > + Send,
22 >,
23>;
24
25/// Wrap a `Vec<RecordBatch>` into a `RecordBatchBoxStream`.
26pub fn vec_to_stream(batches: Vec<RecordBatch>) -> RecordBatchBoxStream {
27 Box::pin(VecStream {
28 batches: batches.into_iter(),
29 })
30}
31
32/// Simple stream adapter over a `Vec<RecordBatch>` iterator.
33struct VecStream {
34 batches: std::vec::IntoIter<RecordBatch>,
35}
36
37impl futures_core::Stream for VecStream {
38 type Item = Result<RecordBatch, Box<dyn std::error::Error + Send + Sync>>;
39
40 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
41 Poll::Ready(self.batches.next().map(Ok))
42 }
43
44 fn size_hint(&self) -> (usize, Option<usize>) {
45 self.batches.size_hint()
46 }
47}
48
49/// Abstraction over an OLAP (analytical) query engine.
50///
51/// Implementations include `rhei-duckdb` and `rhei-datafusion`. All data
52/// flows through Arrow [`RecordBatch`]es for zero-copy interop.
53///
54/// # Contract for implementors
55///
56/// - `query` and `execute` must be callable concurrently from multiple async
57/// tasks (i.e., the engine must provide its own internal locking).
58/// - `load_arrow` should prefer a native bulk-insert path (e.g., DuckDB's
59/// `Appender` API) over generating SQL literals when possible.
60/// - `supports_transactions` must return the same value for the lifetime of
61/// the engine instance.
62pub trait OlapEngine: Send + Sync {
63 /// Engine-specific error type returned by all fallible methods.
64 type Error: std::error::Error + Send + Sync + 'static;
65
66 /// Execute an analytical SQL query and return results as Arrow
67 /// [`RecordBatch`]es buffered in memory.
68 ///
69 /// For streaming results without buffering the full result set, prefer
70 /// [`OlapEngine::query_stream`].
71 ///
72 /// # Errors
73 ///
74 /// Returns `Err(Self::Error)` on SQL parse failure, execution error, or
75 /// internal engine error.
76 fn query(
77 &self,
78 sql: &str,
79 ) -> impl std::future::Future<Output = Result<Vec<RecordBatch>, Self::Error>> + Send;
80
81 /// Execute a SQL query and return a lazy [`RecordBatchBoxStream`].
82 ///
83 /// Unlike [`OlapEngine::query`], this does not buffer the full result set.
84 /// Batches are produced as the engine generates them, which is important
85 /// for large result sets sent over Arrow Flight.
86 ///
87 /// The default implementation falls back to [`OlapEngine::query`] and
88 /// wraps the collected result in a stream. Backends that support native
89 /// streaming (e.g., DataFusion via `execute_stream()`) should override this.
90 ///
91 /// # Errors
92 ///
93 /// Returns `Err(Self::Error)` if query planning or execution fails before
94 /// the first batch is produced. Errors that occur mid-stream are surfaced
95 /// as `Err` items in the returned stream.
96 fn query_stream(
97 &self,
98 sql: &str,
99 ) -> impl std::future::Future<Output = Result<RecordBatchBoxStream, Self::Error>> + Send {
100 let sql = sql.to_string();
101 async move {
102 let batches = self.query(&sql).await?;
103 Ok(vec_to_stream(batches))
104 }
105 }
106
107 /// Execute a DDL or DML statement without returning rows.
108 ///
109 /// Returns the number of rows affected (0 for DDL).
110 ///
111 /// # Errors
112 ///
113 /// Returns `Err(Self::Error)` on SQL parse failure, constraint violation,
114 /// or internal engine error.
115 fn execute(
116 &self,
117 sql: &str,
118 ) -> impl std::future::Future<Output = Result<u64, Self::Error>> + Send;
119
120 /// Bulk-load Arrow [`RecordBatch`]es into `table`.
121 ///
122 /// Implementations should use a native zero-copy path when available (e.g.,
123 /// DuckDB's `Appender` API). Returns the total number of rows ingested.
124 ///
125 /// # Errors
126 ///
127 /// Returns `Err(Self::Error)` if the table does not exist, a batch's schema
128 /// does not match the table schema, or the engine rejects the data.
129 fn load_arrow(
130 &self,
131 table: &str,
132 batches: &[RecordBatch],
133 ) -> impl std::future::Future<Output = Result<u64, Self::Error>> + Send;
134
135 /// Create a table from an Arrow schema with an optional primary key.
136 ///
137 /// `primary_key` is a slice of column names. Backends that support PK
138 /// enforcement (e.g. DuckDB) emit a `PRIMARY KEY (col1, col2)` clause;
139 /// backends that do not (e.g. DataFusion) accept the parameter as
140 /// informational metadata only.
141 ///
142 /// # Errors
143 ///
144 /// Returns `Err(Self::Error)` if the table already exists or the schema
145 /// contains an unsupported data type.
146 fn create_table(
147 &self,
148 table_name: &str,
149 schema: &SchemaRef,
150 primary_key: &[String],
151 ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
152
153 /// Return `true` if `table_name` exists in the OLAP catalog.
154 ///
155 /// # Errors
156 ///
157 /// Returns `Err(Self::Error)` if the catalog query fails.
158 fn table_exists(
159 &self,
160 table_name: &str,
161 ) -> impl std::future::Future<Output = Result<bool, Self::Error>> + Send;
162
163 /// Add a nullable column to an existing table.
164 ///
165 /// # Errors
166 ///
167 /// Returns `Err(Self::Error)` if the table does not exist, the column
168 /// already exists, or the data type is unsupported.
169 fn add_column(
170 &self,
171 table_name: &str,
172 column_name: &str,
173 data_type: &arrow::datatypes::DataType,
174 ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
175
176 /// Remove a column from an existing table.
177 ///
178 /// # Errors
179 ///
180 /// Returns `Err(Self::Error)` if the table does not exist or the column
181 /// is not present.
182 fn drop_column(
183 &self,
184 table_name: &str,
185 column_name: &str,
186 ) -> impl std::future::Future<Output = Result<(), Self::Error>> + Send;
187
188 /// Returns `true` if this backend supports ACID transactions via
189 /// `BEGIN` / `COMMIT` / `ROLLBACK`.
190 ///
191 /// Backends that return `false` (e.g., DataFusion) treat these statements as
192 /// no-ops. The sync engine uses this signal to decide whether to wrap sync
193 /// cycles in transactions or to use idempotent per-statement semantics with
194 /// seq-based recovery.
195 ///
196 /// Default: `false` (conservative).
197 fn supports_transactions(&self) -> bool {
198 false
199 }
200}