rhei_flight/service.rs
1//! [`FlightSqlService`](arrow_flight::sql::server::FlightSqlService) implementation
2//! that delegates to [`rhei_olap::OlapBackend`].
3//!
4//! The service follows a **deferred-execution** pattern:
5//!
6//! 1. `get_flight_info_*` encodes the SQL statement into a
7//! [`Ticket`](arrow_flight::Ticket) and returns a
8//! [`FlightInfo`](arrow_flight::FlightInfo) immediately — no query planning.
9//! 2. `do_get_*` receives the ticket, decodes the SQL, calls
10//! [`rhei_core::OlapEngine::query_stream`] on the backend, and streams
11//! Arrow IPC record batches back to the client.
12//!
13//! Tickets are cheap (just opaque byte handles); deferring the actual query
14//! plan to `do_get` means the `get_flight_info` round-trip adds negligible
15//! latency and avoids holding resources while the client is not yet ready to
16//! consume results.
17//!
18//! All write operations (`do_put_*`) return `tonic::Status::unimplemented`
19//! because Rhei's OLAP layer is read-only — DML is handled by the OLTP engine
20//! via a separate path.
21
22use std::pin::Pin;
23
24use arrow::ipc::writer::IpcWriteOptions;
25use arrow_flight::encode::FlightDataEncoderBuilder;
26use arrow_flight::flight_service_server::FlightService;
27use arrow_flight::sql::server::FlightSqlService;
28use arrow_flight::sql::server::PeekableFlightDataStream;
29use arrow_flight::sql::{
30 ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
31 ActionCreatePreparedStatementResult, CommandPreparedStatementQuery, CommandStatementQuery,
32 CommandStatementUpdate, ProstMessageExt, TicketStatementQuery,
33};
34use arrow_flight::{
35 Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse,
36 Ticket,
37};
38use futures::stream;
39use futures::TryStreamExt;
40use prost::Message;
41use rhei_core::OlapEngine;
42use rhei_olap::OlapBackend;
43use tonic::{Request, Response, Status, Streaming};
44use tracing::{debug, warn};
45
/// Arrow Flight SQL service backed by Rhei's OLAP engine.
///
/// This struct implements [`arrow_flight::sql::server::FlightSqlService`] and
/// routes every analytical SQL query to the [`rhei_olap::OlapBackend`]
/// (DataFusion or DuckDB) using true streaming via
/// [`rhei_core::OlapEngine::query_stream`]. Record batches flow from the
/// engine directly into the gRPC Arrow IPC stream without buffering the full
/// result in memory.
///
/// # Deferred execution
///
/// The service uses a deferred-execution pattern:
///
/// - `get_flight_info_statement` encodes the SQL into an opaque
///   [`Ticket`](arrow_flight::Ticket) and returns immediately — no query is
///   planned yet.
/// - `do_get_statement` receives the ticket, decodes the SQL, executes it
///   against the OLAP backend, and streams the results.
///
/// This keeps `get_flight_info` latency minimal and avoids holding engine
/// resources while the client is not yet ready to consume data.
///
/// # Read-only constraint
///
/// Write operations (`do_put_*`) return `tonic::Status::unimplemented`.
/// DML (INSERT / UPDATE / DELETE) is handled by the OLTP engine through a
/// separate path; there are no DML semantics through Arrow Flight SQL today.
///
/// # Compression
///
/// Defaults to [`CompressionType::Zstd`]. Override at construction time with
/// [`RheiFlightSqlService::with_compression`].
///
/// # Authentication
///
/// When `auth_token` is `Some`, every RPC handler implemented by this
/// service requires `authorization: Bearer {token}` in the gRPC metadata;
/// requests without a valid token are rejected with
/// `tonic::Status::unauthenticated`. When `auth_token` is `None` (the
/// default), all requests are allowed.
pub struct RheiFlightSqlService {
    /// Backend that executes the SQL carried by Flight tickets.
    olap: OlapBackend,
    /// Codec applied to the Arrow IPC buffers of every result stream.
    compression: CompressionType,
    /// Expected bearer token; `None` disables authentication.
    auth_token: Option<String>,
}
90
/// Compression algorithm applied to Arrow IPC record batches sent over gRPC.
///
/// The codec is chosen server-side when the service is constructed; it is
/// applied when encoding IPC buffers with `FlightDataEncoderBuilder`.
///
/// [`CompressionType::default`] returns [`Zstd`](CompressionType::Zstd).
///
/// # Choosing a codec
///
/// - Use [`Zstd`](CompressionType::Zstd) for most deployments — it offers the
///   best size/CPU tradeoff for typical columnar workloads and is the default.
/// - Use [`Lz4`](CompressionType::Lz4) when the client is CPU-constrained and
///   decompression speed matters more than wire size.
/// - Use [`None`](CompressionType::None) on loopback / LAN links where network
///   bandwidth is not a bottleneck and you want zero codec overhead.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum CompressionType {
    /// No compression. Zero CPU cost; highest wire size.
    ///
    /// Suitable for loopback or high-bandwidth LAN deployments where encoding
    /// overhead outweighs any bandwidth saving.
    None,
    /// Zstandard compression (the default).
    ///
    /// Best size/CPU tradeoff for typical columnar Arrow workloads. Zstd
    /// achieves higher compression ratios than LZ4 in exchange for somewhat
    /// more CPU work, making it the right default for most deployments.
    #[default]
    Zstd,
    /// LZ4 (frame format) compression.
    ///
    /// Faster decompression than Zstd at the cost of a larger wire footprint.
    /// Prefer this when the client is CPU-constrained and decompression speed
    /// is more important than reducing bytes on the wire.
    Lz4,
}
124
125impl RheiFlightSqlService {
126 /// Create a new service backed by `olap` with [`CompressionType::Zstd`] (the default).
127 ///
128 /// Authentication is disabled: all requests are accepted without a bearer
129 /// token. Call [`RheiFlightSqlService::with_auth_token`] on the returned
130 /// value to enable token-based authentication.
131 pub fn new(olap: OlapBackend) -> Self {
132 Self {
133 olap,
134 compression: CompressionType::Zstd,
135 auth_token: None,
136 }
137 }
138
139 /// Create a new service backed by `olap` with the given [`CompressionType`].
140 ///
141 /// Authentication is disabled by default. Chain
142 /// [`RheiFlightSqlService::with_auth_token`] to require bearer-token auth.
143 pub fn with_compression(olap: OlapBackend, compression: CompressionType) -> Self {
144 Self {
145 olap,
146 compression,
147 auth_token: None,
148 }
149 }
150
151 /// Require clients to present `Authorization: Bearer {token}` on every RPC.
152 ///
153 /// Returns a new service instance with the token configured. When the
154 /// token is `None` all requests are allowed (default, no-auth mode).
155 ///
156 /// Empty strings are silently ignored (treated as "no auth configured")
157 /// to handle misconfigured `RHEI_FLIGHT_AUTH_TOKEN=""` env vars safely —
158 /// otherwise an empty-string expected token would match every request
159 /// that presented no `authorization` header, effectively disabling auth
160 /// while giving operators a false sense of security.
161 pub fn with_auth_token(mut self, token: impl Into<String>) -> Self {
162 let token = token.into();
163 if token.is_empty() {
164 tracing::warn!(
165 "FlightSQL auth token is empty; ignoring configuration and running without auth"
166 );
167 self.auth_token = None;
168 } else {
169 self.auth_token = Some(token);
170 }
171 self
172 }
173
174 // Verify the `authorization` metadata header on an incoming request.
175 // No-op when `auth_token` is None; otherwise expects `Bearer {token}`.
176 fn check_auth<T>(&self, request: &Request<T>) -> Result<(), Status> {
177 let Some(expected) = &self.auth_token else {
178 return Ok(());
179 };
180 // Defense in depth: reject an empty configured token so a misrouted
181 // empty string never authorizes requests with no/empty bearer tokens.
182 if expected.is_empty() {
183 return Err(Status::unauthenticated(
184 "server auth misconfigured (empty token)",
185 ));
186 }
187 let got = request
188 .metadata()
189 .get("authorization")
190 .and_then(|v| v.to_str().ok())
191 .unwrap_or("");
192 let token = got.strip_prefix("Bearer ").unwrap_or("");
193 if token.is_empty() || token != expected {
194 return Err(Status::unauthenticated("invalid or missing bearer token"));
195 }
196 Ok(())
197 }
198
199 /// Build IPC write options with the configured compression.
200 fn ipc_options(&self) -> IpcWriteOptions {
201 let options = IpcWriteOptions::default();
202 match self.compression {
203 CompressionType::None => options,
204 CompressionType::Zstd => options
205 .try_with_compression(Some(arrow::ipc::CompressionType::ZSTD))
206 .unwrap_or_default(),
207 CompressionType::Lz4 => options
208 .try_with_compression(Some(arrow::ipc::CompressionType::LZ4_FRAME))
209 .unwrap_or_default(),
210 }
211 }
212
213 /// Execute a streaming query and return a Flight data stream.
214 ///
215 /// Uses `query_stream()` for true streaming (DataFusion) or the default
216 /// fallback (DuckDB: collect then stream).
217 async fn execute_streaming(
218 &self,
219 sql: &str,
220 ) -> Result<
221 Pin<Box<dyn futures::Stream<Item = Result<arrow_flight::FlightData, Status>> + Send>>,
222 Status,
223 > {
224 let batch_stream = self.olap.query_stream(sql).await.map_err(|e| {
225 warn!(error = %e, sql, "OLAP query failed");
226 Status::internal(format!("query error: {e}"))
227 })?;
228
229 // Map Box<dyn Error> to FlightError for the encoder
230 let mapped = batch_stream.map_err(arrow_flight::error::FlightError::ExternalError);
231
232 let flight_stream = FlightDataEncoderBuilder::new()
233 .with_options(self.ipc_options())
234 .build(mapped)
235 .map_err(|e| Status::internal(e.to_string()));
236
237 Ok(Box::pin(flight_stream))
238 }
239}
240
241#[tonic::async_trait]
242impl FlightSqlService for RheiFlightSqlService {
243 type FlightService = RheiFlightSqlService;
244
245 // -----------------------------------------------------------------------
246 // Handshake — validates bearer token when auth is configured
247 // -----------------------------------------------------------------------
248
249 async fn do_handshake(
250 &self,
251 request: Request<Streaming<HandshakeRequest>>,
252 ) -> Result<
253 Response<Pin<Box<dyn futures::Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
254 Status,
255 > {
256 self.check_auth(&request)?;
257
258 // Echo the bearer token back in the response payload so that
259 // FlightSqlServiceClient.handshake() automatically picks it up via
260 // the `authorization` header it inspects in the response metadata.
261 // For callers that already set the token directly this is a no-op.
262 let mut response = Response::new(Box::pin(stream::once(async {
263 Ok(HandshakeResponse {
264 protocol_version: 0,
265 payload: bytes::Bytes::new(),
266 })
267 }))
268 as Pin<Box<dyn futures::Stream<Item = Result<HandshakeResponse, Status>> + Send>>);
269
270 if let Some(token) = &self.auth_token {
271 if let Ok(val) = format!("Bearer {token}").parse() {
272 response.metadata_mut().insert("authorization", val);
273 }
274 }
275
276 Ok(response)
277 }
278
279 // -----------------------------------------------------------------------
280 // Statement queries — streaming execution
281 // -----------------------------------------------------------------------
282
283 async fn get_flight_info_statement(
284 &self,
285 query: CommandStatementQuery,
286 request: Request<FlightDescriptor>,
287 ) -> Result<Response<FlightInfo>, Status> {
288 self.check_auth(&request)?;
289 let sql = &query.query;
290 debug!(sql, "get_flight_info_statement");
291
292 // Defer execution to do_get_statement.
293 let ticket = TicketStatementQuery {
294 statement_handle: sql.as_bytes().to_vec().into(),
295 };
296 let any = ticket.as_any();
297 let ticket_bytes = any.encode_to_vec();
298
299 let endpoint = FlightEndpoint::new().with_ticket(Ticket::new(ticket_bytes));
300 let info = FlightInfo::new().with_endpoint(endpoint);
301
302 Ok(Response::new(info))
303 }
304
305 async fn do_get_statement(
306 &self,
307 ticket: TicketStatementQuery,
308 request: Request<Ticket>,
309 ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
310 self.check_auth(&request)?;
311 let sql = String::from_utf8(ticket.statement_handle.to_vec())
312 .map_err(|_| Status::internal("invalid statement handle"))?;
313
314 debug!(sql, "do_get_statement (streaming)");
315 let stream = self.execute_streaming(&sql).await?;
316 Ok(Response::new(stream))
317 }
318
319 // -----------------------------------------------------------------------
320 // Prepared statements
321 // -----------------------------------------------------------------------
322
323 async fn get_flight_info_prepared_statement(
324 &self,
325 cmd: CommandPreparedStatementQuery,
326 request: Request<FlightDescriptor>,
327 ) -> Result<Response<FlightInfo>, Status> {
328 self.check_auth(&request)?;
329 debug!("get_flight_info_prepared_statement");
330
331 let any = cmd.as_any();
332 let ticket_bytes = any.encode_to_vec();
333 let endpoint = FlightEndpoint::new().with_ticket(Ticket::new(ticket_bytes));
334 let info = FlightInfo::new().with_endpoint(endpoint);
335
336 Ok(Response::new(info))
337 }
338
339 async fn do_action_create_prepared_statement(
340 &self,
341 query: ActionCreatePreparedStatementRequest,
342 request: Request<Action>,
343 ) -> Result<ActionCreatePreparedStatementResult, Status> {
344 self.check_auth(&request)?;
345 debug!(sql = query.query, "create_prepared_statement");
346
347 Ok(ActionCreatePreparedStatementResult {
348 prepared_statement_handle: query.query.into_bytes().into(),
349 dataset_schema: bytes::Bytes::new(),
350 parameter_schema: bytes::Bytes::new(),
351 })
352 }
353
354 async fn do_action_close_prepared_statement(
355 &self,
356 _query: ActionClosePreparedStatementRequest,
357 request: Request<Action>,
358 ) -> Result<(), Status> {
359 self.check_auth(&request)?;
360 Ok(())
361 }
362
363 async fn do_get_prepared_statement(
364 &self,
365 query: CommandPreparedStatementQuery,
366 request: Request<Ticket>,
367 ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
368 self.check_auth(&request)?;
369 let sql = String::from_utf8(query.prepared_statement_handle.to_vec())
370 .map_err(|_| Status::internal("invalid prepared statement handle"))?;
371
372 debug!(sql, "do_get_prepared_statement (streaming)");
373 let stream = self.execute_streaming(&sql).await?;
374 Ok(Response::new(stream))
375 }
376
377 // -----------------------------------------------------------------------
378 // Write operations — read-only, return Unimplemented
379 // -----------------------------------------------------------------------
380
381 async fn do_put_statement_update(
382 &self,
383 _ticket: CommandStatementUpdate,
384 request: Request<PeekableFlightDataStream>,
385 ) -> Result<i64, Status> {
386 self.check_auth(&request)?;
387 Err(Status::unimplemented(
388 "write operations not supported — OLAP is read-only",
389 ))
390 }
391
392 async fn register_sql_info(&self, _id: i32, _result: &arrow_flight::sql::SqlInfo) {}
393}