Skip to main content

rhei_flight/
lib.rs

1//! Arrow Flight SQL gRPC server for the Rhei HTAP engine.
2//!
3//! This crate exposes the OLAP backend (DataFusion or DuckDB) over the
4//! [Arrow Flight SQL](https://arrow.apache.org/docs/format/FlightSql.html)
5//! protocol, letting clients such as `adbc_driver_flightsql` (Python), DBeaver,
6//! or any standard Arrow Flight SQL driver run analytical queries via gRPC.
7//!
8//! # Architecture
9//!
10//! The crate is built on `arrow-flight 58` and `tonic 0.14`.  The core service
11//! is [`RheiFlightSqlService`], which implements `FlightSqlService` and
12//! delegates all query execution to [`rhei_olap::OlapBackend`].
13//!
14//! ## Deferred-execution pattern
15//!
16//! `get_flight_info_*` encodes the SQL string into an opaque
17//! ticket and returns a `FlightInfo` immediately — no query planning occurs
18//! yet.  When the client calls `do_get_*` with that ticket, the service
19//! decodes the SQL, calls [`rhei_core::OlapEngine::query_stream`], and streams
20//! Arrow IPC record batches back over the gRPC connection.
21//!
22//! Tickets are cheap opaque byte handles; deferring query planning to
23//! `do_get` keeps `get_flight_info` latency negligible and avoids holding
24//! engine resources while the client is not yet ready to consume data.
25//!
26//! ## Compression
27//!
28//! Arrow IPC record batches are compressed before transmission.
29//! [`CompressionType::Zstd`] is the default — it offers the best
30//! size/CPU tradeoff for typical columnar workloads.
31//! [`CompressionType::Lz4`] is available for CPU-constrained clients that
32//! need faster decompression.  [`CompressionType::None`] disables compression
33//! entirely, which is useful on loopback or high-bandwidth LAN links.
34//!
35//! ## Read-only constraint
36//!
37//! All write operations (`do_put_*`) return `tonic::Status::unimplemented`.
38//! Rhei's OLAP layer carries no DML semantics — INSERT / UPDATE / DELETE go
39//! through the OLTP engine (Rusqlite) on a separate path.
40//!
41//! # Quick start
42//!
43//! ```rust,no_run
44//! use std::net::SocketAddr;
45//! use rhei_flight::serve_flight_sql;
46//! # async fn example(olap: rhei_olap::OlapBackend) -> Result<(), Box<dyn std::error::Error>> {
47//! let addr: SocketAddr = "0.0.0.0:50051".parse()?;
48//! serve_flight_sql(olap, addr).await?;
49//! # Ok(()) }
50//! ```
51
52mod service;
53
54pub use service::{CompressionType, RheiFlightSqlService};
55
56use std::net::SocketAddr;
57
58use tonic::transport::Server;
59use tracing::info;
60
61/// Start the Arrow Flight SQL gRPC server with [`CompressionType::Zstd`] (the default).
62///
63/// Binds a tonic gRPC server on `addr` and serves Arrow Flight SQL queries
64/// against the provided [`rhei_olap::OlapBackend`].  This is a convenience
65/// wrapper around [`serve_flight_sql_with_compression`] that pre-selects Zstd.
66///
67/// The call blocks until the server shuts down or returns an error.
68pub async fn serve_flight_sql(
69    olap: rhei_olap::OlapBackend,
70    addr: SocketAddr,
71) -> Result<(), Box<dyn std::error::Error>> {
72    serve_flight_sql_with_compression(olap, addr, CompressionType::Zstd).await
73}
74
75/// Start the Arrow Flight SQL gRPC server with the specified [`CompressionType`].
76///
77/// Binds a tonic gRPC server on `addr` and serves Arrow Flight SQL queries
78/// against the provided [`rhei_olap::OlapBackend`].  Use this variant when
79/// you need to override the default Zstd compression — for example, to select
80/// [`CompressionType::Lz4`] on CPU-constrained clients or
81/// [`CompressionType::None`] on loopback deployments.
82///
83/// The call blocks until the server shuts down or returns an error.
84pub async fn serve_flight_sql_with_compression(
85    olap: rhei_olap::OlapBackend,
86    addr: SocketAddr,
87    compression: CompressionType,
88) -> Result<(), Box<dyn std::error::Error>> {
89    let service = RheiFlightSqlService::with_compression(olap, compression);
90    let svc = arrow_flight::flight_service_server::FlightServiceServer::new(service);
91
92    info!(%addr, ?compression, "Arrow Flight SQL server listening");
93
94    Server::builder().add_service(svc).serve(addr).await?;
95
96    Ok(())
97}
98
99/// Start the Arrow Flight SQL gRPC server with bearer-token authentication.
100///
101/// Identical to [`serve_flight_sql`] but requires every client RPC to carry
102/// `authorization: Bearer {token}` in the gRPC metadata.  Requests that omit
103/// or present a wrong token are rejected with `tonic::Status::unauthenticated`.
104///
105/// Uses [`CompressionType::Zstd`] by default.  To combine custom compression
106/// with authentication, construct a [`RheiFlightSqlService`] manually using
107/// [`RheiFlightSqlService::with_compression`] and
108/// [`RheiFlightSqlService::with_auth_token`].
109///
110/// The call blocks until the server shuts down or returns an error.
111pub async fn serve_flight_sql_with_auth(
112    olap: rhei_olap::OlapBackend,
113    addr: SocketAddr,
114    auth_token: String,
115) -> Result<(), Box<dyn std::error::Error>> {
116    let service = RheiFlightSqlService::with_compression(olap, CompressionType::Zstd)
117        .with_auth_token(auth_token);
118    let svc = arrow_flight::flight_service_server::FlightServiceServer::new(service);
119
120    info!(%addr, "Arrow Flight SQL server listening (bearer-token auth enabled)");
121
122    Server::builder().add_service(svc).serve(addr).await?;
123
124    Ok(())
125}