datapress_core/backend.rs
1//! Backend-agnostic interface used by the shared HTTP handlers.
2//!
3//! Both `datapress-duckdb` and `datapress-datafusion` implement [`Backend`]
4//! against their own dataset registry / store. The generic handlers in
5//! [`crate::handlers`] and the [`crate::server::serve`] helper then drive
6//! either backend through the same code path.
7
8use std::io::{self, Write};
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use bytes::Bytes;
13use futures_util::stream::{self, BoxStream, StreamExt};
14use serde::Serialize;
15use tokio::sync::mpsc;
16
17use crate::errors::AppError;
18use crate::models::{CountRequest, QueryRequest};
19use crate::schema::DatasetSchema;
20
21/// Stream of Arrow IPC response chunks emitted by a backend.
22pub type ArrowIpcStream = BoxStream<'static, Result<Bytes, AppError>>;
23
24/// Writer used by backend encoders to push Arrow IPC bytes into an HTTP
25/// response stream without accumulating one full response buffer.
26pub struct ArrowIpcChunkWriter {
27 tx: mpsc::Sender<Result<Bytes, AppError>>,
28}
29
30impl ArrowIpcChunkWriter {
31 pub fn send_error(&self, err: AppError) {
32 let _ = self.tx.blocking_send(Err(err));
33 }
34}
35
36impl Write for ArrowIpcChunkWriter {
37 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
38 self.tx
39 .blocking_send(Ok(Bytes::copy_from_slice(buf)))
40 .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "response stream closed"))?;
41 Ok(buf.len())
42 }
43
44 fn flush(&mut self) -> io::Result<()> {
45 Ok(())
46 }
47}
48
49pub fn arrow_ipc_stream_channel(capacity: usize) -> (ArrowIpcChunkWriter, ArrowIpcStream) {
50 let (tx, rx) = mpsc::channel(capacity);
51 let writer = ArrowIpcChunkWriter { tx };
52 let stream = stream::unfold(rx, |mut rx| async move {
53 rx.recv().await.map(|item| (item, rx))
54 })
55 .boxed();
56 (writer, stream)
57}
58
59/// Outcome of a successful [`Backend::reload`].
60#[derive(Debug, Clone, Copy, Serialize)]
61pub struct ReloadStats {
62 pub rows: usize,
63 pub elapsed_ms: u128,
64}
65
66/// One entry in `GET /api/datasets`.
67#[derive(Debug, Clone, Serialize)]
68pub struct DatasetSummary {
69 pub name: String,
70 pub columns: usize,
71 pub rows: usize,
72}
73
74/// Read / reload interface every backend exposes to the HTTP layer.
75///
76/// All methods are async — synchronous backends (DuckDB) wrap their
77/// blocking calls in `actix_web::web::block` inside the impl.
78#[async_trait]
79pub trait Backend: Send + Sync + 'static {
80 /// Sorted list of dataset names.
81 fn names(&self) -> Vec<String>;
82
83 /// Cheap summary for the dataset listing endpoint. `Err(NotFound)`
84 /// on unknown name.
85 fn summary(&self, name: &str) -> Result<DatasetSummary, AppError>;
86
87 /// Full schema for `name`. `Err(NotFound)` on unknown name.
88 fn schema(&self, name: &str) -> Result<Arc<DatasetSchema>, AppError>;
89
90 /// Names of columns the backend has built an equality index over,
91 /// for inclusion in the `/schema` response. Default impl returns
92 /// an empty vec — backends without per-column indexes (e.g.
93 /// DuckDB, which relies on the embedded database engine) need
94 /// not override.
95 fn indexed_columns(&self, _name: &str) -> Result<Vec<String>, AppError> {
96 Ok(Vec::new())
97 }
98
99 /// JSON for the first row of the dataset, or the literal string
100 /// `"null"` if the dataset is empty.
101 async fn sample(&self, name: &str) -> Result<String, AppError>;
102
103 /// Execute `req` against `name`, returning the JSON-encoded `data`
104 /// array (without the `{"data": …, "page": …}` envelope — that's
105 /// added by the handler).
106 async fn query(&self, name: &str, req: &QueryRequest) -> Result<String, AppError>;
107
108 /// Execute `req` against `name`, returning the result as an Arrow IPC
109 /// **stream** byte buffer (one schema message + zero or more
110 /// `RecordBatch` messages + EOS). The handler ships this verbatim
111 /// with `Content-Type: application/vnd.apache.arrow.stream`.
112 ///
113 /// Default impl errors with `InvalidValue` — backends that don't
114 /// produce Arrow natively (e.g. DuckDB today) reject the format and
115 /// the handler falls through to JSON. Override on backends where
116 /// batches are already Arrow.
117 async fn query_arrow(&self, _name: &str, _req: &QueryRequest) -> Result<Vec<u8>, AppError> {
118 Err(AppError::InvalidValue(
119 "Arrow IPC response format is not supported by this backend".into(),
120 ))
121 }
122
123 /// Execute `req` and stream the Arrow IPC bytes. The default adapter
124 /// preserves compatibility for backends that only implement
125 /// [`Backend::query_arrow`], but high-throughput backends should
126 /// override this to avoid building one full response buffer.
127 async fn query_arrow_stream(
128 &self,
129 name: &str,
130 req: &QueryRequest,
131 ) -> Result<ArrowIpcStream, AppError> {
132 let bytes = self.query_arrow(name, req).await?;
133 Ok(Box::pin(stream::once(
134 async move { Ok(Bytes::from(bytes)) },
135 )))
136 }
137
138 /// Execute `req` and stream all matching Arrow IPC batches in one HTTP
139 /// response. Unlike [`Backend::query_arrow_stream`], this is not page
140 /// scoped; `limit` may still cap the total rows returned.
141 async fn query_arrow_stream_all(
142 &self,
143 name: &str,
144 req: &QueryRequest,
145 ) -> Result<ArrowIpcStream, AppError> {
146 let bytes = self.query_arrow(name, req).await?;
147 Ok(Box::pin(stream::once(
148 async move { Ok(Bytes::from(bytes)) },
149 )))
150 }
151
152 /// Count rows in `name` matching `req.predicates`.
153 async fn count(&self, name: &str, req: &CountRequest) -> Result<i64, AppError>;
154
155 /// Encode the **entire** dataset as a single self-contained Parquet
156 /// file, returned as in-memory bytes.
157 ///
158 /// Powers `GET /datasets/{name}/parquet`, which serves these bytes
159 /// with HTTP range support so external tools (DuckDB `httpfs`, pandas,
160 /// polars, …) can read the dataset straight over HTTP — e.g.
161 /// `SELECT count(*) FROM 'http://host/api/v1/datasets/accidents/parquet'`.
162 ///
163 /// The handler caches the result per dataset (and invalidates on
164 /// reload) so the repeated range requests a Parquet reader makes all
165 /// see identical, stable bytes. Default impl errors with
166 /// `InvalidValue`; every shipped backend overrides it.
167 async fn parquet(&self, _name: &str) -> Result<Bytes, AppError> {
168 Err(AppError::InvalidValue(
169 "Parquet export is not supported by this backend".into(),
170 ))
171 }
172
173 /// Rebuild `name` from its configured source and atomically swap it in.
174 async fn reload(&self, name: &str) -> Result<ReloadStats, AppError>;
175}