Skip to main content

datapress_core/
backend.rs

1//! Backend-agnostic interface used by the shared HTTP handlers.
2//!
3//! Both `datapress-duckdb` and `datapress-datafusion` implement [`Backend`]
4//! against their own dataset registry / store. The generic handlers in
5//! [`crate::handlers`] and the [`crate::server::serve`] helper then drive
6//! either backend through the same code path.
7
8use std::io::{self, Write};
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use futures_util::stream::{self, BoxStream, StreamExt};
14use serde::Serialize;
15use tokio::sync::mpsc;
16
17use crate::errors::AppError;
18use crate::models::{CountRequest, QueryRequest};
19use crate::schema::DatasetSchema;
20
21/// Stream of Arrow IPC response chunks emitted by a backend.
22pub type ArrowIpcStream = BoxStream<'static, Result<Bytes, AppError>>;
23
/// Number of buffered bytes at which one Arrow IPC response chunk is emitted.
///
/// Arrow's `StreamWriter` performs a large number of very small `write()`
/// calls (length prefixes, per-buffer padding, one call per column buffer).
/// Turning each of those into its own HTTP chunk yields hundreds of
/// micro-frames bouncing across the `spawn_blocking` ↔ async boundary, and
/// that overhead dominates the wire time. Batching the writes into chunks of
/// about 64 KiB keeps both the channel and the chunked transfer encoding
/// efficient.
const ARROW_CHUNK_TARGET: usize = 64 * 1024;
32
33/// Writer used by backend encoders to push Arrow IPC bytes into an HTTP
34/// response stream without accumulating one full response buffer. Small
35/// writes are buffered and flushed in ~[`ARROW_CHUNK_TARGET`]-byte chunks.
36pub struct ArrowIpcChunkWriter {
37    tx: mpsc::Sender<Result<Bytes, AppError>>,
38    buf: Vec<u8>,
39}
40
41impl ArrowIpcChunkWriter {
42    pub fn send_error(&mut self, err: AppError) {
43        // Drop any partial chunk so it can't trail the error on the wire.
44        self.buf.clear();
45        let _ = self.tx.blocking_send(Err(err));
46    }
47
48    /// Ship whatever is currently buffered as one chunk. No-op when empty.
49    fn send_buffered(&mut self) -> io::Result<()> {
50        if self.buf.is_empty() {
51            return Ok(());
52        }
53        let chunk = Bytes::from(std::mem::take(&mut self.buf));
54        self.tx
55            .blocking_send(Ok(chunk))
56            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "response stream closed"))
57    }
58}
59
60impl Write for ArrowIpcChunkWriter {
61    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
62        self.buf.extend_from_slice(buf);
63        if self.buf.len() >= ARROW_CHUNK_TARGET {
64            self.send_buffered()?;
65        }
66        Ok(buf.len())
67    }
68
69    fn flush(&mut self) -> io::Result<()> {
70        self.send_buffered()
71    }
72}
73
74impl Drop for ArrowIpcChunkWriter {
75    fn drop(&mut self) {
76        // Flush the tail the encoder didn't explicitly flush (e.g. after
77        // `StreamWriter::finish`). Any send error here is unobservable —
78        // the receiver has already gone away.
79        let _ = self.send_buffered();
80    }
81}
82
83pub fn arrow_ipc_stream_channel(capacity: usize) -> (ArrowIpcChunkWriter, ArrowIpcStream) {
84    let (tx, rx) = mpsc::channel(capacity);
85    let writer = ArrowIpcChunkWriter {
86        tx,
87        buf: Vec::with_capacity(ARROW_CHUNK_TARGET),
88    };
89    let stream = stream::unfold(rx, |mut rx| async move {
90        rx.recv().await.map(|item| (item, rx))
91    })
92    .boxed();
93    (writer, stream)
94}
95
96/// Outcome of a successful [`Backend::reload`].
97#[derive(Debug, Clone, Copy, Serialize)]
98pub struct ReloadStats {
99    pub rows: usize,
100    pub elapsed_ms: u128,
101}
102
103/// One entry in `GET /api/datasets`.
104#[derive(Debug, Clone, Serialize)]
105pub struct DatasetSummary {
106    pub name: String,
107    pub columns: usize,
108    pub rows: usize,
109}
110
111/// Read / reload interface every backend exposes to the HTTP layer.
112///
113/// All methods are async — synchronous backends (DuckDB) wrap their
114/// blocking calls in `actix_web::web::block` inside the impl.
115#[async_trait]
116pub trait Backend: Send + Sync + 'static {
117    /// Sorted list of dataset names.
118    fn names(&self) -> Vec<String>;
119
120    /// Cheap summary for the dataset listing endpoint. `Err(NotFound)`
121    /// on unknown name.
122    fn summary(&self, name: &str) -> Result<DatasetSummary, AppError>;
123
124    /// Full schema for `name`. `Err(NotFound)` on unknown name.
125    fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError>;
126
127    /// Names of columns the backend has built an equality index over,
128    /// for inclusion in the `/schema` response. Default impl returns
129    /// an empty vec — backends without per-column indexes (e.g.
130    /// DuckDB, which relies on the embedded database engine) need
131    /// not override.
132    fn indexed_columns(&self, _name: &str) -> Result<Vec<String>, AppError> {
133        Ok(Vec::new())
134    }
135
136    /// JSON for the first row of the dataset, or the literal string
137    /// `"null"` if the dataset is empty.
138    async fn sample(&self, name: &str) -> Result<String, AppError>;
139
140    /// Execute `req` against `name`, returning the JSON-encoded `data`
141    /// array (without the `{"data": …, "page": …}` envelope — that's
142    /// added by the handler).
143    async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError>;
144
145    /// Execute `req` against `name`, returning the result as an Arrow IPC
146    /// **stream** byte buffer (one schema message + zero or more
147    /// `RecordBatch` messages + EOS). The handler ships this verbatim
148    /// with `Content-Type: application/vnd.apache.arrow.stream`.
149    ///
150    /// Default impl errors with `InvalidValue` — backends that don't
151    /// produce Arrow natively (e.g. DuckDB today) reject the format and
152    /// the handler falls through to JSON. Override on backends where
153    /// batches are already Arrow.
154    async fn query_arrow(&self, _name: &str, _req: &QueryRequest) -> Result<Vec<u8>, AppError> {
155        Err(AppError::InvalidValue(
156            "Arrow IPC response format is not supported by this backend".into(),
157        ))
158    }
159
160    /// Execute `req` and stream the Arrow IPC bytes. The default adapter
161    /// preserves compatibility for backends that only implement
162    /// [`Backend::query_arrow`], but high-throughput backends should
163    /// override this to avoid building one full response buffer.
164    async fn query_arrow_stream(
165        &self,
166        name: &str,
167        req: &QueryRequest,
168    ) -> Result<ArrowIpcStream, AppError> {
169        let bytes = self.query_arrow(name, req).await?;
170        Ok(Box::pin(stream::once(
171            async move { Ok(Bytes::from(bytes)) },
172        )))
173    }
174
175    /// Execute `req` and stream all matching Arrow IPC batches in one HTTP
176    /// response. Unlike [`Backend::query_arrow_stream`], this is not page
177    /// scoped; `limit` may still cap the total rows returned.
178    async fn query_arrow_stream_all(
179        &self,
180        name: &str,
181        req: &QueryRequest,
182    ) -> Result<ArrowIpcStream, AppError> {
183        let bytes = self.query_arrow(name, req).await?;
184        Ok(Box::pin(stream::once(
185            async move { Ok(Bytes::from(bytes)) },
186        )))
187    }
188
189    /// Count rows in `name` matching `req.predicates`.
190    async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError>;
191
192    /// Execute a pre-validated raw `SELECT` and return the JSON-encoded
193    /// `data` array (same shape as [`Backend::query`] — the handler adds
194    /// the `{"data": …}` envelope).
195    ///
196    /// `sql` has already passed [`crate::sql::validate`]: it is a single
197    /// read-only query that references only registered datasets. The
198    /// backend wraps it in an outer `LIMIT max_rows` before executing so
199    /// the result size is bounded regardless of the user's own `LIMIT`.
200    ///
201    /// Default impl errors with `InvalidValue`; backends that support raw
202    /// SQL (DuckDB, DataFusion) override it.
203    async fn query_sql(&self, _sql: &str, _max_rows: u64) -> Result<String, AppError> {
204        Err(AppError::InvalidValue(
205            "raw SQL is not supported by this backend".into(),
206        ))
207    }
208
209    /// Execute a pre-validated raw `SELECT` and stream the result as Arrow
210    /// IPC bytes (one schema message + zero or more `RecordBatch` messages
211    /// + EOS), the same wire format as [`Backend::query_arrow_stream`].
212    ///
213    /// `sql` has already passed [`crate::sql::validate`]; the backend wraps
214    /// it in an outer `LIMIT max_rows` so the result is bounded regardless
215    /// of the caller's own clauses. Powers the Arrow content-negotiated
216    /// branch of `POST /api/v1/sql`.
217    ///
218    /// Default impl errors with `InvalidValue`; backends that support raw
219    /// SQL (DuckDB, DataFusion) override it.
220    async fn query_sql_arrow_stream(
221        &self,
222        _sql: &str,
223        _max_rows: u64,
224    ) -> Result<ArrowIpcStream, AppError> {
225        Err(AppError::InvalidValue(
226            "raw SQL is not supported by this backend".into(),
227        ))
228    }
229
230    /// Encode the **entire** dataset as a single self-contained Parquet
231    /// file, returned as in-memory bytes.
232    ///
233    /// Powers `GET /datasets/{name}/parquet`, which serves these bytes
234    /// with HTTP range support so external tools (DuckDB `httpfs`, pandas,
235    /// polars, …) can read the dataset straight over HTTP — e.g.
236    /// `SELECT count(*) FROM 'http://host/api/v1/datasets/accidents/parquet'`.
237    ///
238    /// The handler caches the result per dataset (and invalidates on
239    /// reload) so the repeated range requests a Parquet reader makes all
240    /// see identical, stable bytes. Default impl errors with
241    /// `InvalidValue`; every shipped backend overrides it.
242    async fn parquet(&self, _name: &str) -> Result<Bytes, AppError> {
243        Err(AppError::InvalidValue(
244            "Parquet export is not supported by this backend".into(),
245        ))
246    }
247
248    /// Rebuild `name` from its configured source and atomically swap it in.
249    async fn reload(&self, name: &str) -> Result<ReloadStats, AppError>;
250}