datapress_core/handlers/mod.rs
1//! HTTP handler surface, organised by API version.
2//!
3//! ## Layout
4//!
5//! - This module hosts **unversioned** endpoints (liveness / readiness /
6//! `/health`) plus shared utilities used by every version (content
7//! negotiation, the [`BackendData`] extractor type, the Arrow IPC MIME
8//! constant).
9//! - Each API version lives in its own submodule ([`v1`], future
10//! `v2`, …). Versions expose a single [`actix_web::web::ServiceConfig`]
11//! registration function so the server can mount them under a scope:
12//!
13//! ```ignore
14//! App::new()
15//! .service(web::scope("/api/v1").configure(handlers::v1::configure))
16//! ```
17//!
18//! ## Adding a new version
19//!
20//! 1. Copy `v1.rs` to `v2.rs` and adjust the request / response handlers.
21//! 2. Add `pub mod v2;` below.
22//! 3. Mount it in [`crate::server::serve`] under `/api/v2`.
23//! 4. Decide whether `v1` should be kept (it usually is, for a deprecation
24//! window) or removed.
25//!
26//! Handlers inside a version module are plain `async fn` (no route
27//! macros) so the same handler can be re-mounted in multiple scopes —
28//! that's how the legacy un-versioned `/api/datasets/...` alias works
29//! without duplicating code.
30
31use std::sync::Arc;
32
33use actix_web::{HttpRequest, HttpResponse, get, web};
34
35use crate::backend::Backend;
36
37pub mod v1;
38
39/// Convenience alias — every handler extracts the backend through this.
40pub type BackendData = web::Data<Arc<dyn Backend>>;/// Query-related limits copied from `[server]` config into Actix app data.
41#[derive(Debug, Clone, Copy)]
42pub struct QueryLimits {
43 pub max_page_size: u64,
44}
45
46impl Default for QueryLimits {
47 fn default() -> Self {
48 Self {
49 max_page_size: 100_000,
50 }
51 }
52}
53
/// Settings for the raw-SQL endpoint, copied out of the `[sql]` config
/// section and placed into Actix app data.
///
/// While `enabled` is `false` the `POST /api/v1/sql` handler answers `404`
/// without ever touching the engine.
#[derive(Clone, Copy, Debug)]
pub struct SqlSettings {
    /// Whether the raw-SQL endpoint is switched on.
    pub enabled: bool,
    /// Row limit for the endpoint. NOTE(review): presumably a cap on the
    /// rows one statement may return — confirm against the SQL handler.
    pub max_rows: u64,
}

impl Default for SqlSettings {
    /// Defaults: endpoint disabled, `max_rows` of 100 000.
    fn default() -> Self {
        SqlSettings {
            max_rows: 100_000,
            enabled: false,
        }
    }
}
71
72/// MIME type used for Arrow IPC stream responses.
73pub const ARROW_IPC_MIME: &str = "application/vnd.apache.arrow.stream";#[get("/health")]
74pub async fn health() -> HttpResponse {
75 HttpResponse::Ok()
76 .content_type("application/json")
77 .body(r#"{"status":"ok"}"#)
78}
79
80/// Liveness probe. Mounted outside the configured `prefix` at a fixed
81/// path so orchestrators don't need to know how the server is exposed.
82#[get("/healthz")]
83pub async fn healthz() -> HttpResponse {
84 HttpResponse::Ok()
85 .content_type("application/json")
86 .body(r#"{"status":"ok"}"#)
87}
88
89/// Readiness probe. Returns `200` once at least one dataset is registered
90/// (i.e. the registry finished loading at startup), `503` otherwise.
91#[get("/readyz")]
92pub async fn readyz(backend: BackendData) -> HttpResponse {
93 let names = backend.names();
94 if names.is_empty() {
95 HttpResponse::ServiceUnavailable()
96 .content_type("application/json")
97 .body(r#"{"status":"not ready","reason":"no datasets registered"}"#)
98 } else {
99 let body = format!(r#"{{"status":"ready","datasets":{}}}"#, names.len());
100 HttpResponse::Ok()
101 .content_type("application/json")
102 .body(body)
103 }
104}
105
106/// Build / version metadata published by [`version`] at `/version`.
107///
108/// Populated once by [`crate::server::serve`] from compile-time
109/// constants (`CARGO_PKG_*`) and optional build-time env vars
110/// (`DATAPRESS_GIT_SHA`, `DATAPRESS_BUILD_TIME`), and stored in actix
111/// app data. The handler just serialises it to JSON.
112#[derive(Clone, Debug, serde::Serialize)]
113pub struct BuildInfo {
114 /// Crate name (e.g. `"datapress-core"`).
115 pub name: &'static str,
116 /// Crate version from `CARGO_PKG_VERSION` (e.g. `"0.1.17"`).
117 pub version: &'static str,
118 /// Human-readable backend label — `"DuckDB"` or `"DataFusion"`.
119 pub backend: &'static str,
120 /// Git commit SHA the binary was built from. `None` when
121 /// `DATAPRESS_GIT_SHA` was not set at build time.
122 #[serde(skip_serializing_if = "Option::is_none")]
123 pub git_sha: Option<&'static str>,
124 /// ISO-8601 build timestamp. `None` when `DATAPRESS_BUILD_TIME`
125 /// was not set at build time.
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub build_time: Option<&'static str>,
128 /// `"debug"` or `"release"`, derived from `cfg!(debug_assertions)`.
129 pub profile: &'static str,
130 /// Rust target triple the binary was built for (e.g.
131 /// `"aarch64-apple-darwin"`). `None` when `DATAPRESS_TARGET` was
132 /// not set at build time.
133 #[serde(skip_serializing_if = "Option::is_none")]
134 pub target: Option<&'static str>,
135}
136
137impl BuildInfo {
138 /// Build a `BuildInfo` populated from compile-time constants. The
139 /// caller supplies the backend `label` (the binaries know which
140 /// they are; this crate doesn't).
141 pub fn new(backend: &'static str) -> Self {
142 Self {
143 name: env!("CARGO_PKG_NAME"),
144 version: env!("CARGO_PKG_VERSION"),
145 backend,
146 git_sha: option_env!("DATAPRESS_GIT_SHA"),
147 build_time: option_env!("DATAPRESS_BUILD_TIME"),
148 profile: if cfg!(debug_assertions) {
149 "debug"
150 } else {
151 "release"
152 },
153 target: option_env!("DATAPRESS_TARGET"),
154 }
155 }
156}
157
158/// Build / version info. Mounted unprefixed so orchestrators and
159/// release-tracking tools can hit it without knowing how the server
160/// is exposed. Always returns `200` with a JSON object.
161#[get("/version")]
162pub async fn version(info: web::Data<BuildInfo>) -> HttpResponse {
163 HttpResponse::Ok().json(info.get_ref())
164}
165
166/// True if the caller wants Arrow IPC: either `?format=arrow` in the
167/// query string, or `Accept` lists `application/vnd.apache.arrow.stream`.
168/// A bare `Accept: */*` does **not** count — JSON stays the default.
169pub(crate) fn wants_arrow(http: &HttpRequest) -> bool {
170 let qs = http.query_string();
171 if !qs.is_empty()
172 && qs.split('&').any(|kv| matches!(kv.split_once('='), Some(("format", v)) if v.eq_ignore_ascii_case("arrow")))
173 {
174 return true;
175 }
176 http.headers()
177 .get(actix_web::http::header::ACCEPT)
178 .and_then(|h| h.to_str().ok())
179 .map(|s| {
180 s.split(',').any(|part| {
181 part.split(';')
182 .next()
183 .unwrap_or("")
184 .trim()
185 .eq_ignore_ascii_case(ARROW_IPC_MIME)
186 })
187 })
188 .unwrap_or(false)
189}
190
191/// MIME type used for Parquet export responses.
192pub const PARQUET_MIME: &str = "application/vnd.apache.parquet";
193
194/// Process-wide cache of encoded Parquet exports, keyed by dataset name.
195///
196/// The `/datasets/{name}/parquet` endpoint serves a complete Parquet file
197/// with HTTP range support. A single client (e.g. DuckDB `httpfs`) issues
198/// several requests against it — a `HEAD` for the length, then ranged
199/// `GET`s for the footer and any row-group metadata — so every request
200/// must observe the *same* bytes. Caching the encoded file makes those
201/// requests cheap and consistent; [`crate::handlers::v1::reload_dataset`]
202/// drops the entry after a successful reload so a fresh export is built
203/// on next access.
204#[derive(Default)]
205pub struct ParquetCache {
206 inner: std::sync::RwLock<std::collections::HashMap<String, Arc<bytes::Bytes>>>,
207}
208
209impl ParquetCache {
210 /// Return the cached export for `name`, if one has been built.
211 pub fn get(&self, name: &str) -> Option<Arc<bytes::Bytes>> {
212 self.inner.read().ok()?.get(name).cloned()
213 }
214
215 /// Store `bytes` as the export for `name`, returning the stored handle.
216 pub fn insert(&self, name: &str, bytes: bytes::Bytes) -> Arc<bytes::Bytes> {
217 let shared = Arc::new(bytes);
218 if let Ok(mut map) = self.inner.write() {
219 map.insert(name.to_string(), shared.clone());
220 }
221 shared
222 }
223
224 /// Drop the cached export for `name` (no-op if absent).
225 pub fn invalidate(&self, name: &str) {
226 if let Ok(mut map) = self.inner.write() {
227 map.remove(name);
228 }
229 }
230}
231
/// A single byte range resolved against a body of `total` bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ByteRange {
    /// Offset of the first byte in the range.
    start: u64,
    /// Inclusive end offset.
    end: u64,
}

/// Parse a single-range HTTP `Range: bytes=…` header against a body of
/// `total` bytes.
///
/// Returns:
/// - `Ok(None)` when the header is not a byte-range request at all (a
///   different range unit, or no `first-last` pair) — the caller should
///   serve the full body with `200`.
/// - `Ok(Some(range))` for a satisfiable single range — serve `206`. A
///   `last-pos` or suffix length that reaches past the end of the body is
///   clamped to the body.
/// - `Err(())` when the header is a `bytes=` range whose positions are
///   malformed or which cannot be satisfied (empty body, zero-length
///   suffix, start at or past the end, start after end) — the caller
///   should answer `416`.
///
/// The `bytes` unit is matched case-insensitively, as range-unit names are
/// case-insensitive in HTTP. Positions must be plain ASCII digits.
///
/// Only the first range of a multi-range header is honoured, and it is
/// served as an ordinary single range; `multipart/byteranges` responses are
/// intentionally not implemented.
fn parse_byte_range(header: &str, total: u64) -> Result<Option<ByteRange>, ()> {
    /// Parse one position / suffix-length field (`1*DIGIT`).
    fn parse_pos(field: &str) -> Result<u64, ()> {
        let digits = field.trim();
        // `str::parse::<u64>` would accept a leading `+`, which the HTTP
        // grammar does not allow, so insist on digits only.
        if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
            return Err(());
        }
        // An all-digit string can only fail to parse by overflowing `u64`.
        // Such a value is necessarily past the end of any body, so
        // saturating keeps the clamping / satisfiability logic correct.
        Ok(digits.parse().unwrap_or(u64::MAX))
    }

    let spec = match header.trim().split_once('=') {
        Some((unit, rest)) if unit.eq_ignore_ascii_case("bytes") => rest.trim(),
        _ => return Ok(None),
    };
    // Take the first range only.
    let first = spec.split(',').next().unwrap_or("").trim();
    let (start_s, end_s) = match first.split_once('-') {
        Some(parts) => parts,
        None => return Ok(None),
    };

    // No byte of an empty body can be addressed.
    if total == 0 {
        return Err(());
    }

    let (start, end) = if start_s.is_empty() {
        // Suffix range: `-N` → last N bytes.
        let n = parse_pos(end_s)?;
        if n == 0 {
            return Err(());
        }
        let n = n.min(total);
        (total - n, total - 1)
    } else {
        let start = parse_pos(start_s)?;
        let end = if end_s.trim().is_empty() {
            // Open-ended range: `N-` → from N to the last byte.
            total - 1
        } else {
            parse_pos(end_s)?.min(total - 1)
        };
        (start, end)
    };

    if start > end || start >= total {
        return Err(());
    }
    Ok(Some(ByteRange { start, end }))
}
291
292/// Serve `body` as an HTTP response with range + `HEAD` support.
293///
294/// Honours a single `Range: bytes=…` header (`206 Partial Content` with a
295/// `Content-Range`), advertises `Accept-Ranges: bytes`, and lets actix's
296/// dispatcher answer `HEAD` with the same headers (including the computed
297/// `Content-Length`) but no body. This is what lets DuckDB's `httpfs` read
298/// only the Parquet footer for a `count(*)` instead of the whole file.
299pub fn serve_bytes_with_range(
300 http: &HttpRequest,
301 body: Arc<bytes::Bytes>,
302 content_type: &str,
303) -> HttpResponse {
304 use actix_web::http::header;
305
306 let total = body.len() as u64;
307
308 let range = http
309 .headers()
310 .get(header::RANGE)
311 .and_then(|h| h.to_str().ok());
312
313 match range.map(|r| parse_byte_range(r, total)) {
314 // Unsatisfiable byte range → 416 with the total size.
315 Some(Err(())) => HttpResponse::RangeNotSatisfiable()
316 .insert_header((header::CONTENT_RANGE, format!("bytes */{total}")))
317 .insert_header((header::ACCEPT_RANGES, "bytes"))
318 .finish(),
319 // Satisfiable single range → 206 Partial Content. For a HEAD the
320 // dispatcher drops the body bytes but keeps the Content-Length
321 // derived from the slice, so range probes still see the right size.
322 Some(Ok(Some(ByteRange { start, end }))) => HttpResponse::PartialContent()
323 .insert_header((header::CONTENT_TYPE, content_type.to_string()))
324 .insert_header((header::ACCEPT_RANGES, "bytes"))
325 .insert_header((header::CONTENT_RANGE, format!("bytes {start}-{end}/{total}")))
326 .body(body.slice(start as usize..(end as usize + 1))),
327 // No (parseable) range → full body with 200.
328 _ => HttpResponse::Ok()
329 .insert_header((header::CONTENT_TYPE, content_type.to_string()))
330 .insert_header((header::ACCEPT_RANGES, "bytes"))
331 .body(bytes::Bytes::clone(&body)),
332 }
333}