rhei-flight 1.5.0

Arrow Flight SQL server for Rhei HTAP engine OLAP queries
Documentation
//! [`FlightSqlService`](arrow_flight::sql::server::FlightSqlService) implementation
//! that delegates to [`rhei_olap::OlapBackend`].
//!
//! The service follows a **deferred-execution** pattern:
//!
//! 1. `get_flight_info_*` encodes the SQL statement into a
//!    [`Ticket`](arrow_flight::Ticket) and returns a
//!    [`FlightInfo`](arrow_flight::FlightInfo) immediately — no query planning.
//! 2. `do_get_*` receives the ticket, decodes the SQL, calls
//!    [`rhei_core::OlapEngine::query_stream`] on the backend, and streams
//!    Arrow IPC record batches back to the client.
//!
//! Tickets are cheap (just opaque byte handles); deferring the actual query
//! plan to `do_get` means the `get_flight_info` round-trip adds negligible
//! latency and avoids holding resources while the client is not yet ready to
//! consume results.
//!
//! All write operations (`do_put_*`) return `tonic::Status::unimplemented`
//! because Rhei's OLAP layer is read-only — DML is handled by the OLTP engine
//! via a separate path.

use std::pin::Pin;

use arrow::ipc::writer::IpcWriteOptions;
use arrow_flight::encode::FlightDataEncoderBuilder;
use arrow_flight::flight_service_server::FlightService;
use arrow_flight::sql::server::FlightSqlService;
use arrow_flight::sql::server::PeekableFlightDataStream;
use arrow_flight::sql::{
    ActionClosePreparedStatementRequest, ActionCreatePreparedStatementRequest,
    ActionCreatePreparedStatementResult, CommandPreparedStatementQuery, CommandStatementQuery,
    CommandStatementUpdate, ProstMessageExt, TicketStatementQuery,
};
use arrow_flight::{
    Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse,
    Ticket,
};
use futures::stream;
use futures::TryStreamExt;
use prost::Message;
use rhei_core::OlapEngine;
use rhei_olap::OlapBackend;
use tonic::{Request, Response, Status, Streaming};
use tracing::{debug, warn};

/// Arrow Flight SQL service backed by Rhei's OLAP engine.
///
/// This struct implements [`arrow_flight::sql::server::FlightSqlService`] and
/// routes every analytical SQL query to the [`rhei_olap::OlapBackend`]
/// (DataFusion or DuckDB) using true streaming via
/// [`rhei_core::OlapEngine::query_stream`].  Record batches flow from the
/// engine directly into the gRPC Arrow IPC stream without buffering the full
/// result in memory.
///
/// # Deferred execution
///
/// The service uses a deferred-execution pattern:
///
/// - `get_flight_info_statement` encodes the SQL into an opaque
///   [`Ticket`](arrow_flight::Ticket) and returns immediately — no query is
///   planned yet.
/// - `do_get_statement` receives the ticket, decodes the SQL, executes it
///   against the OLAP backend, and streams the results.
///
/// This keeps `get_flight_info` latency minimal and avoids holding engine
/// resources while the client is not yet ready to consume data.
///
/// # Read-only constraint
///
/// Write operations (`do_put_*`) return `tonic::Status::unimplemented`.
/// DML (INSERT / UPDATE / DELETE) is handled by the OLTP engine through a
/// separate path; there are no DML semantics through Arrow Flight SQL today.
///
/// # Compression
///
/// Defaults to [`CompressionType::Zstd`].  Override at construction time with
/// [`RheiFlightSqlService::with_compression`].
///
/// # Authentication
///
/// When `auth_token` is `Some`, every RPC must carry
/// `authorization: Bearer {token}` in the gRPC metadata; requests without a
/// valid token are rejected with `tonic::Status::unauthenticated`.  When
/// `auth_token` is `None` (the default), all requests are allowed.
pub struct RheiFlightSqlService {
    /// OLAP backend every query is executed against (via `query_stream`).
    olap: OlapBackend,
    /// Codec applied to outgoing Arrow IPC record batches.
    compression: CompressionType,
    /// Expected bearer token; `None` disables the authentication check.
    auth_token: Option<String>,
}

/// Compression algorithm applied to Arrow IPC record batches sent over gRPC.
///
/// Compression is negotiated per-stream: the server applies the chosen codec
/// when encoding IPC buffers with `FlightDataEncoderBuilder`.
///
/// # Choosing a codec
///
/// - Use [`Zstd`](CompressionType::Zstd) for most deployments — it offers the
///   best size/CPU tradeoff for typical columnar workloads and is the default.
/// - Use [`Lz4`](CompressionType::Lz4) when the client is CPU-constrained and
///   decompression speed matters more than wire size.
/// - Use [`None`](CompressionType::None) on loopback / LAN links where network
///   bandwidth is not a bottleneck and you want zero codec overhead.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum CompressionType {
    /// No compression.  Zero CPU cost; highest wire size.
    ///
    /// Suitable for loopback or high-bandwidth LAN deployments where encoding
    /// overhead outweighs any bandwidth saving.
    None,
    /// Zstandard compression (the default, also returned by
    /// [`CompressionType::default`]).
    ///
    /// Best size/CPU tradeoff for typical columnar Arrow workloads.  Zstd
    /// achieves higher compression ratios than LZ4 at comparable or lower
    /// decompression cost, making it the right default for most deployments.
    #[default]
    Zstd,
    /// LZ4 (frame format) compression.
    ///
    /// Faster decompression than Zstd at the cost of a larger wire footprint.
    /// Prefer this when the client is CPU-constrained and decompression speed
    /// is more important than reducing bytes on the wire.
    Lz4,
}

impl RheiFlightSqlService {
    /// Create a new service backed by `olap` with [`CompressionType::Zstd`] (the default).
    ///
    /// Authentication is disabled: all requests are accepted without a bearer
    /// token.  Call [`RheiFlightSqlService::with_auth_token`] on the returned
    /// value to enable token-based authentication.
    pub fn new(olap: OlapBackend) -> Self {
        Self {
            olap,
            compression: CompressionType::Zstd,
            auth_token: None,
        }
    }

    /// Build a service over `olap` that encodes record batches with the given
    /// [`CompressionType`].
    ///
    /// No auth token is configured on the returned value; chain
    /// [`RheiFlightSqlService::with_auth_token`] to require bearer-token auth.
    pub fn with_compression(olap: OlapBackend, compression: CompressionType) -> Self {
        // Authentication starts out disabled.
        let auth_token = None;
        Self {
            olap,
            compression,
            auth_token,
        }
    }

    /// Require clients to present `Authorization: Bearer {token}` on every RPC.
    ///
    /// Consumes the service and returns it with the token configured.
    ///
    /// An empty `token` is not accepted as a credential: a warning is logged
    /// and the service is left in no-auth mode.  This guards against a
    /// misconfigured `RHEI_FLIGHT_AUTH_TOKEN=""` environment variable being
    /// stored as a token that looks like protection but protects nothing.
    pub fn with_auth_token(mut self, token: impl Into<String>) -> Self {
        let candidate: String = token.into();
        self.auth_token = if candidate.is_empty() {
            tracing::warn!(
                "FlightSQL auth token is empty; ignoring configuration and running without auth"
            );
            None
        } else {
            Some(candidate)
        };
        self
    }

    /// Verify the `authorization` metadata header on an incoming request.
    ///
    /// Returns `Ok(())` when no token is configured.  Otherwise the header
    /// must be exactly `Bearer {token}`.
    ///
    /// # Errors
    ///
    /// Returns `Status::unauthenticated` when the header is missing, is not
    /// valid ASCII, lacks the `Bearer ` prefix, carries the wrong token, or
    /// when the configured token is (unexpectedly) empty.
    fn check_auth<T>(&self, request: &Request<T>) -> Result<(), Status> {
        let Some(expected) = self.auth_token.as_deref() else {
            return Ok(());
        };
        // Defense in depth: reject an empty configured token so a misrouted
        // empty string never authorizes requests with no/empty bearer tokens.
        if expected.is_empty() {
            return Err(Status::unauthenticated(
                "server auth misconfigured (empty token)",
            ));
        }
        let presented = request
            .metadata()
            .get("authorization")
            .and_then(|v| v.to_str().ok())
            .and_then(|header| header.strip_prefix("Bearer "))
            .unwrap_or("");

        // Compare without short-circuiting on the first differing byte, so the
        // response time does not reveal how long a matching prefix the caller
        // guessed.  Best effort only: the length is still observable, and the
        // optimizer gives no hard constant-time guarantee.  A missing/empty
        // token always differs in length from the (non-empty) expected one.
        let (got, want) = (presented.as_bytes(), expected.as_bytes());
        let diff = got
            .iter()
            .zip(want.iter())
            .fold(got.len() ^ want.len(), |acc, (g, w)| acc | usize::from(g ^ w));
        if diff != 0 {
            return Err(Status::unauthenticated("invalid or missing bearer token"));
        }
        Ok(())
    }

    /// Build IPC write options with the configured compression.
    fn ipc_options(&self) -> IpcWriteOptions {
        let options = IpcWriteOptions::default();
        match self.compression {
            CompressionType::None => options,
            CompressionType::Zstd => options
                .try_with_compression(Some(arrow::ipc::CompressionType::ZSTD))
                .unwrap_or_default(),
            CompressionType::Lz4 => options
                .try_with_compression(Some(arrow::ipc::CompressionType::LZ4_FRAME))
                .unwrap_or_default(),
        }
    }

    /// Execute a streaming query and return a Flight data stream.
    ///
    /// Uses `query_stream()` for true streaming (DataFusion) or the default
    /// fallback (DuckDB: collect then stream).
    async fn execute_streaming(
        &self,
        sql: &str,
    ) -> Result<
        Pin<Box<dyn futures::Stream<Item = Result<arrow_flight::FlightData, Status>> + Send>>,
        Status,
    > {
        let batch_stream = self.olap.query_stream(sql).await.map_err(|e| {
            warn!(error = %e, sql, "OLAP query failed");
            Status::internal(format!("query error: {e}"))
        })?;

        // Map Box<dyn Error> to FlightError for the encoder
        let mapped = batch_stream.map_err(arrow_flight::error::FlightError::ExternalError);

        let flight_stream = FlightDataEncoderBuilder::new()
            .with_options(self.ipc_options())
            .build(mapped)
            .map_err(|e| Status::internal(e.to_string()));

        Ok(Box::pin(flight_stream))
    }
}

#[tonic::async_trait]
impl FlightSqlService for RheiFlightSqlService {
    type FlightService = RheiFlightSqlService;

    // -----------------------------------------------------------------------
    // Handshake — validates bearer token when auth is configured
    // -----------------------------------------------------------------------

    /// Handshake RPC: checks the bearer token (when one is configured) and
    /// answers with a single empty [`HandshakeResponse`].
    async fn do_handshake(
        &self,
        request: Request<Streaming<HandshakeRequest>>,
    ) -> Result<
        Response<Pin<Box<dyn futures::Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
        Status,
    > {
        self.check_auth(&request)?;

        // The response body is one message with an empty payload.
        let reply: Result<HandshakeResponse, Status> = Ok(HandshakeResponse {
            protocol_version: 0,
            payload: bytes::Bytes::new(),
        });
        let body: Pin<Box<dyn futures::Stream<Item = Result<HandshakeResponse, Status>> + Send>> =
            Box::pin(stream::once(async move { reply }));
        let mut response = Response::new(body);

        // Echo the bearer token back in the response *metadata* (not the
        // payload) so that FlightSqlServiceClient.handshake() picks it up from
        // the `authorization` header.  For callers that already set the token
        // directly this is a no-op.  A token that cannot be encoded as a
        // header value is simply not echoed.
        if let Some(token) = &self.auth_token {
            if let Ok(val) = format!("Bearer {token}").parse() {
                response.metadata_mut().insert("authorization", val);
            }
        }

        Ok(response)
    }

    // -----------------------------------------------------------------------
    // Statement queries — streaming execution
    // -----------------------------------------------------------------------

    /// Return a [`FlightInfo`] whose single endpoint ticket carries the SQL
    /// text; nothing is executed here.
    async fn get_flight_info_statement(
        &self,
        query: CommandStatementQuery,
        request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        self.check_auth(&request)?;
        debug!(sql = query.query, "get_flight_info_statement");

        // The statement handle is the SQL bytes themselves; do_get_statement
        // decodes them and runs the query.
        let handle = TicketStatementQuery {
            statement_handle: query.query.into_bytes().into(),
        };
        let ticket = Ticket::new(handle.as_any().encode_to_vec());
        let info = FlightInfo::new().with_endpoint(FlightEndpoint::new().with_ticket(ticket));

        Ok(Response::new(info))
    }

    /// Decode the SQL text carried in the ticket's statement handle, execute
    /// it, and stream the results.
    ///
    /// # Errors
    ///
    /// - `Status::unauthenticated` if the auth check fails.
    /// - `Status::invalid_argument` if the handle is not valid UTF-8.  The
    ///   handle is supplied by the client, so this is a caller error rather
    ///   than a server fault.
    /// - Any status returned by `execute_streaming`.
    async fn do_get_statement(
        &self,
        ticket: TicketStatementQuery,
        request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        self.check_auth(&request)?;
        // Validate in place; no need to copy the handle into a new String.
        let sql = std::str::from_utf8(&ticket.statement_handle)
            .map_err(|_| Status::invalid_argument("invalid statement handle"))?;

        debug!(sql, "do_get_statement (streaming)");
        let stream = self.execute_streaming(sql).await?;
        Ok(Response::new(stream))
    }

    // -----------------------------------------------------------------------
    // Prepared statements
    // -----------------------------------------------------------------------

    /// Return a [`FlightInfo`] whose single endpoint ticket is the encoded
    /// prepared-statement command; nothing is executed here.
    async fn get_flight_info_prepared_statement(
        &self,
        cmd: CommandPreparedStatementQuery,
        request: Request<FlightDescriptor>,
    ) -> Result<Response<FlightInfo>, Status> {
        self.check_auth(&request)?;
        debug!("get_flight_info_prepared_statement");

        // Re-encode the incoming command as the ticket.
        let ticket = Ticket::new(cmd.as_any().encode_to_vec());
        let info = FlightInfo::new().with_endpoint(FlightEndpoint::new().with_ticket(ticket));

        Ok(Response::new(info))
    }

    async fn do_action_create_prepared_statement(
        &self,
        query: ActionCreatePreparedStatementRequest,
        request: Request<Action>,
    ) -> Result<ActionCreatePreparedStatementResult, Status> {
        self.check_auth(&request)?;
        debug!(sql = query.query, "create_prepared_statement");

        Ok(ActionCreatePreparedStatementResult {
            prepared_statement_handle: query.query.into_bytes().into(),
            dataset_schema: bytes::Bytes::new(),
            parameter_schema: bytes::Bytes::new(),
        })
    }

    /// Close a prepared statement.  Only the auth check runs; the request
    /// itself is ignored.
    async fn do_action_close_prepared_statement(
        &self,
        _query: ActionClosePreparedStatementRequest,
        request: Request<Action>,
    ) -> Result<(), Status> {
        // The auth result is the whole outcome: Ok(()) or Unauthenticated.
        self.check_auth(&request)
    }

    /// Decode the SQL text carried in the prepared-statement handle, execute
    /// it, and stream the results.
    ///
    /// # Errors
    ///
    /// - `Status::unauthenticated` if the auth check fails.
    /// - `Status::invalid_argument` if the handle is not valid UTF-8.  The
    ///   handle is supplied by the client, so this is a caller error rather
    ///   than a server fault.
    /// - Any status returned by `execute_streaming`.
    async fn do_get_prepared_statement(
        &self,
        query: CommandPreparedStatementQuery,
        request: Request<Ticket>,
    ) -> Result<Response<<Self as FlightService>::DoGetStream>, Status> {
        self.check_auth(&request)?;
        // Validate in place; no need to copy the handle into a new String.
        let sql = std::str::from_utf8(&query.prepared_statement_handle)
            .map_err(|_| Status::invalid_argument("invalid prepared statement handle"))?;

        debug!(sql, "do_get_prepared_statement (streaming)");
        let stream = self.execute_streaming(sql).await?;
        Ok(Response::new(stream))
    }

    // -----------------------------------------------------------------------
    // Write operations — read-only, return Unimplemented
    // -----------------------------------------------------------------------

    /// Reject statement updates: after the auth check this always fails with
    /// `Status::unimplemented`.
    async fn do_put_statement_update(
        &self,
        _ticket: CommandStatementUpdate,
        request: Request<PeekableFlightDataStream>,
    ) -> Result<i64, Status> {
        self.check_auth(&request)?;
        let rejection =
            Status::unimplemented("write operations not supported — OLAP is read-only");
        Err(rejection)
    }

    /// Intentionally a no-op: both arguments are ignored and this service
    /// records no SQL info.
    async fn register_sql_info(&self, _id: i32, _result: &arrow_flight::sql::SqlInfo) {}
}