Skip to main content

rhei_flight/
service.rs

1//! [`FlightSqlService`](arrow_flight::sql::server::FlightSqlService) implementation
2//! that delegates to [`rhei_olap::OlapBackend`].
3//!
4//! The service follows a **deferred-execution** pattern:
5//!
6//! 1. `get_flight_info_*` encodes the SQL statement into a
7//!    [`Ticket`](arrow_flight::Ticket) and returns a
8//!    [`FlightInfo`](arrow_flight::FlightInfo) immediately — no query planning.
9//! 2. `do_get_*` receives the ticket, decodes the SQL, calls
10//!    [`rhei_core::OlapEngine::query_stream`] on the backend, and streams
11//!    Arrow IPC record batches back to the client.
12//!
//! Tickets are cheap: each one simply carries the SQL text as an opaque byte
//! handle, so no server-side state is created when a ticket is issued.
//! Deferring query planning to `do_get` means the `get_flight_info` round-trip
//! adds negligible latency and avoids holding resources while the client is
//! not yet ready to consume results.
17//!
18//! All write operations (`do_put_*`) return `tonic::Status::unimplemented`
19//! because Rhei's OLAP layer is read-only — DML is handled by the OLTP engine
20//! via a separate path.
21
22use std::pin::Pin;
23
24use arrow::ipc::writer::IpcWriteOptions;
25use arrow_flight::encode::FlightDataEncoderBuilder;
26use arrow_flight::flight_service_server::FlightService;
27use arrow_flight::sql::server::FlightSqlService;
28use arrow_flight::sql::server::PeekableFlightDataStream;
29use arrow_flight::sql::{
30    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
31    ActionCreatePreparedStatementResult, CommandPreparedStatementQuery, CommandStatementQuery,
32    CommandStatementUpdate, ProstMessageExt, TicketStatementQuery,
33};
34use arrow_flight::{
35    Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse,
36    Ticket,
37};
38use futures::stream;
39use futures::TryStreamExt;
40use prost::Message;
41use rhei_core::OlapEngine;
42use rhei_olap::OlapBackend;
43use tonic::{Request, Response, Status, Streaming};
44use tracing::{debug, warn};
45
46/// Arrow Flight SQL service backed by Rhei's OLAP engine.
47///
48/// This struct implements [`arrow_flight::sql::server::FlightSqlService`] and
49/// routes every analytical SQL query to the [`rhei_olap::OlapBackend`]
50/// (DataFusion or DuckDB) using true streaming via
51/// [`rhei_core::OlapEngine::query_stream`].  Record batches flow from the
52/// engine directly into the gRPC Arrow IPC stream without buffering the full
53/// result in memory.
54///
55/// # Deferred execution
56///
57/// The service uses a deferred-execution pattern:
58///
59/// - `get_flight_info_statement` encodes the SQL into an opaque
60///   [`Ticket`](arrow_flight::Ticket) and returns immediately — no query is
61///   planned yet.
62/// - `do_get_statement` receives the ticket, decodes the SQL, executes it
63///   against the OLAP backend, and streams the results.
64///
65/// This keeps `get_flight_info` latency minimal and avoids holding engine
66/// resources while the client is not yet ready to consume data.
67///
68/// # Read-only constraint
69///
70/// Write operations (`do_put_*`) return `tonic::Status::unimplemented`.
71/// DML (INSERT / UPDATE / DELETE) is handled by the OLTP engine through a
72/// separate path; there are no DML semantics through Arrow Flight SQL today.
73///
74/// # Compression
75///
76/// Defaults to [`CompressionType::Zstd`].  Override at construction time with
77/// [`RheiFlightSqlService::with_compression`].
78///
79/// # Authentication
80///
81/// When `auth_token` is `Some`, every RPC must carry
82/// `authorization: Bearer {token}` in the gRPC metadata; requests without a
83/// valid token are rejected with `tonic::Status::unauthenticated`.  When
84/// `auth_token` is `None` (the default), all requests are allowed.
85pub struct RheiFlightSqlService {
86    olap: OlapBackend,
87    compression: CompressionType,
88    auth_token: Option<String>,
89}
90
/// Compression algorithm applied to Arrow IPC record batches sent over gRPC.
///
/// The codec is not negotiated with the client. The server applies the codec
/// configured on the service to every stream it encodes with
/// `FlightDataEncoderBuilder`, so clients must be able to decode it. The codec
/// is recorded in each IPC record-batch message, so a client simply needs an
/// Arrow build that supports it.
///
/// `CompressionType::default()` is [`Zstd`](CompressionType::Zstd).
///
/// # Choosing a codec
///
/// - Use [`Zstd`](CompressionType::Zstd) for most deployments. It offers the
///   best size/CPU tradeoff for typical columnar workloads and is the default.
/// - Use [`Lz4`](CompressionType::Lz4) when the client is CPU-constrained and
///   decompression speed matters more than wire size.
/// - Use [`None`](CompressionType::None) on loopback / LAN links where network
///   bandwidth is not a bottleneck and you want zero codec overhead.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum CompressionType {
    /// No compression. Zero CPU cost; highest wire size.
    ///
    /// Suitable for loopback or high-bandwidth LAN deployments where encoding
    /// overhead outweighs any bandwidth saving.
    None,
    /// Zstandard compression (the default).
    ///
    /// Best size/CPU tradeoff for typical columnar Arrow workloads. Zstd
    /// usually achieves noticeably higher compression ratios than LZ4, at the
    /// price of somewhat more CPU on both ends. That makes it the right default
    /// when bytes on the wire matter.
    #[default]
    Zstd,
    /// LZ4 (frame format) compression.
    ///
    /// Faster decompression than Zstd at the cost of a larger wire footprint.
    /// Prefer this when the client is CPU-constrained and decompression speed
    /// is more important than reducing bytes on the wire.
    Lz4,
}
124
125impl RheiFlightSqlService {
126    /// Create a new service backed by `olap` with [`CompressionType::Zstd`] (the default).
127    ///
128    /// Authentication is disabled: all requests are accepted without a bearer
129    /// token.  Call [`RheiFlightSqlService::with_auth_token`] on the returned
130    /// value to enable token-based authentication.
131    pub fn new(olap: OlapBackend) -> Self {
132        Self {
133            olap,
134            compression: CompressionType::Zstd,
135            auth_token: None,
136        }
137    }
138
139    /// Create a new service backed by `olap` with the given [`CompressionType`].
140    ///
141    /// Authentication is disabled by default.  Chain
142    /// [`RheiFlightSqlService::with_auth_token`] to require bearer-token auth.
143    pub fn with_compression(olap: OlapBackend, compression: CompressionType) -> Self {
144        Self {
145            olap,
146            compression,
147            auth_token: None,
148        }
149    }
150
151    /// Require clients to present `Authorization: Bearer {token}` on every RPC.
152    ///
153    /// Returns a new service instance with the token configured. When the
154    /// token is `None` all requests are allowed (default, no-auth mode).
155    ///
156    /// Empty strings are silently ignored (treated as "no auth configured")
157    /// to handle misconfigured `RHEI_FLIGHT_AUTH_TOKEN=""` env vars safely —
158    /// otherwise an empty-string expected token would match every request
159    /// that presented no `authorization` header, effectively disabling auth
160    /// while giving operators a false sense of security.
161    pub fn with_auth_token(mut self, token: impl Into<String>) -> Self {
162        let token = token.into();
163        if token.is_empty() {
164            tracing::warn!(
165                "FlightSQL auth token is empty; ignoring configuration and running without auth"
166            );
167            self.auth_token = None;
168        } else {
169            self.auth_token = Some(token);
170        }
171        self
172    }
173
174    // Verify the `authorization` metadata header on an incoming request.
175    // No-op when `auth_token` is None; otherwise expects `Bearer {token}`.
176    fn check_auth<T>(&self, request: &Request<T>) -> Result<(), Status> {
177        let Some(expected) = &self.auth_token else {
178            return Ok(());
179        };
180        // Defense in depth: reject an empty configured token so a misrouted
181        // empty string never authorizes requests with no/empty bearer tokens.
182        if expected.is_empty() {
183            return Err(Status::unauthenticated(
184                "server auth misconfigured (empty token)",
185            ));
186        }
187        let got = request
188            .metadata()
189            .get("authorization")
190            .and_then(|v| v.to_str().ok())
191            .unwrap_or("");
192        let token = got.strip_prefix("Bearer ").unwrap_or("");
193        if token.is_empty() || token != expected {
194            return Err(Status::unauthenticated("invalid or missing bearer token"));
195        }
196        Ok(())
197    }
198
199    /// Build IPC write options with the configured compression.
200    fn ipc_options(&self) -> IpcWriteOptions {
201        let options = IpcWriteOptions::default();
202        match self.compression {
203            CompressionType::None => options,
204            CompressionType::Zstd => options
205                .try_with_compression(Some(arrow::ipc::CompressionType::ZSTD))
206                .unwrap_or_default(),
207            CompressionType::Lz4 => options
208                .try_with_compression(Some(arrow::ipc::CompressionType::LZ4_FRAME))
209                .unwrap_or_default(),
210        }
211    }
212
213    /// Execute a streaming query and return a Flight data stream.
214    ///
215    /// Uses `query_stream()` for true streaming (DataFusion) or the default
216    /// fallback (DuckDB: collect then stream).
217    async fn execute_streaming(
218        &self,
219        sql: &str,
220    ) -> Result<
221        Pin<Box<dyn futures::Stream<Item = Result<arrow_flight::FlightData, Status>> + Send>>,
222        Status,
223    > {
224        let batch_stream = self.olap.query_stream(sql).await.map_err(|e| {
225            warn!(error = %e, sql, "OLAP query failed");
226            Status::internal(format!("query error: {e}"))
227        })?;
228
229        // Map Box<dyn Error> to FlightError for the encoder
230        let mapped = batch_stream.map_err(arrow_flight::error::FlightError::ExternalError);
231
232        let flight_stream = FlightDataEncoderBuilder::new()
233            .with_options(self.ipc_options())
234            .build(mapped)
235            .map_err(|e| Status::internal(e.to_string()));
236
237        Ok(Box::pin(flight_stream))
238    }
239}
240
241#[tonic::async_trait]
242impl FlightSqlService for RheiFlightSqlService {
243    type FlightService = RheiFlightSqlService;
244
245    // -----------------------------------------------------------------------
246    // Handshake — validates bearer token when auth is configured
247    // -----------------------------------------------------------------------
248
249    async fn do_handshake(
250        &self,
251        request: Request<Streaming<HandshakeRequest>>,
252    ) -> Result<
253        Response<Pin<Box<dyn futures::Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
254        Status,
255    > {
256        self.check_auth(&request)?;
257
258        // Echo the bearer token back in the response payload so that
259        // FlightSqlServiceClient.handshake() automatically picks it up via
260        // the `authorization` header it inspects in the response metadata.
261        // For callers that already set the token directly this is a no-op.
262        let mut response = Response::new(Box::pin(stream::once(async {
263            Ok(HandshakeResponse {
264                protocol_version: 0,
265                payload: bytes::Bytes::new(),
266            })
267        }))
268            as Pin<Box<dyn futures::Stream<Item = Result<HandshakeResponse, Status>> + Send>>);
269
270        if let Some(token) = &self.auth_token {
271            if let Ok(val) = format!("Bearer {token}").parse() {
272                response.metadata_mut().insert("authorization", val);
273            }
274        }
275
276        Ok(response)
277    }
278
279    // -----------------------------------------------------------------------
280    // Statement queries — streaming execution
281    // -----------------------------------------------------------------------
282
283    async fn get_flight_info_statement(
284        &self,
285        query: CommandStatementQuery,
286        request: Request<FlightDescriptor>,
287    ) -> Result<Response<FlightInfo>, Status> {
288        self.check_auth(&request)?;
289        let sql = &query.query;
290        debug!(sql, "get_flight_info_statement");
291
292        // Defer execution to do_get_statement.
293        let ticket = TicketStatementQuery {
294            statement_handle: sql.as_bytes().to_vec().into(),
295        };
296        let any = ticket.as_any();
297        let ticket_bytes = any.encode_to_vec();
298
299        let endpoint = FlightEndpoint::new().with_ticket(Ticket::new(ticket_bytes));
300        let info = FlightInfo::new().with_endpoint(endpoint);
301
302        Ok(Response::new(info))
303    }
304
305    async fn do_get_statement(
306        &self,
307        ticket: TicketStatementQuery,
308        request: Request<Ticket>,
309    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
310        self.check_auth(&request)?;
311        let sql = String::from_utf8(ticket.statement_handle.to_vec())
312            .map_err(|_| Status::internal("invalid statement handle"))?;
313
314        debug!(sql, "do_get_statement (streaming)");
315        let stream = self.execute_streaming(&sql).await?;
316        Ok(Response::new(stream))
317    }
318
319    // -----------------------------------------------------------------------
320    // Prepared statements
321    // -----------------------------------------------------------------------
322
323    async fn get_flight_info_prepared_statement(
324        &self,
325        cmd: CommandPreparedStatementQuery,
326        request: Request<FlightDescriptor>,
327    ) -> Result<Response<FlightInfo>, Status> {
328        self.check_auth(&request)?;
329        debug!("get_flight_info_prepared_statement");
330
331        let any = cmd.as_any();
332        let ticket_bytes = any.encode_to_vec();
333        let endpoint = FlightEndpoint::new().with_ticket(Ticket::new(ticket_bytes));
334        let info = FlightInfo::new().with_endpoint(endpoint);
335
336        Ok(Response::new(info))
337    }
338
339    async fn do_action_create_prepared_statement(
340        &self,
341        query: ActionCreatePreparedStatementRequest,
342        request: Request<Action>,
343    ) -> Result<ActionCreatePreparedStatementResult, Status> {
344        self.check_auth(&request)?;
345        debug!(sql = query.query, "create_prepared_statement");
346
347        Ok(ActionCreatePreparedStatementResult {
348            prepared_statement_handle: query.query.into_bytes().into(),
349            dataset_schema: bytes::Bytes::new(),
350            parameter_schema: bytes::Bytes::new(),
351        })
352    }
353
354    async fn do_action_close_prepared_statement(
355        &self,
356        _query: ActionClosePreparedStatementRequest,
357        request: Request<Action>,
358    ) -> Result<(), Status> {
359        self.check_auth(&request)?;
360        Ok(())
361    }
362
363    async fn do_get_prepared_statement(
364        &self,
365        query: CommandPreparedStatementQuery,
366        request: Request<Ticket>,
367    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
368        self.check_auth(&request)?;
369        let sql = String::from_utf8(query.prepared_statement_handle.to_vec())
370            .map_err(|_| Status::internal("invalid prepared statement handle"))?;
371
372        debug!(sql, "do_get_prepared_statement (streaming)");
373        let stream = self.execute_streaming(&sql).await?;
374        Ok(Response::new(stream))
375    }
376
377    // -----------------------------------------------------------------------
378    // Write operations — read-only, return Unimplemented
379    // -----------------------------------------------------------------------
380
381    async fn do_put_statement_update(
382        &self,
383        _ticket: CommandStatementUpdate,
384        request: Request<PeekableFlightDataStream>,
385    ) -> Result<i64, Status> {
386        self.check_auth(&request)?;
387        Err(Status::unimplemented(
388            "write operations not supported — OLAP is read-only",
389        ))
390    }
391
392    async fn register_sql_info(&self, _id: i32, _result: &arrow_flight::sql::SqlInfo) {}
393}