datapress_core/handlers/mod.rs
1//! HTTP handler surface, organised by API version.
2//!
3//! ## Layout
4//!
5//! - This module hosts **unversioned** endpoints (liveness / readiness /
6//! `/health`) plus shared utilities used by every version (content
7//! negotiation, the [`BackendData`] extractor type, the Arrow IPC MIME
8//! constant).
9//! - Each API version lives in its own submodule ([`v1`], future
10//! `v2`, …). Versions expose a single [`actix_web::web::ServiceConfig`]
11//! registration function so the server can mount them under a scope:
12//!
13//! ```ignore
14//! App::new()
15//! .service(web::scope("/api/v1").configure(handlers::v1::configure))
16//! ```
17//!
18//! ## Adding a new version
19//!
20//! 1. Copy `v1.rs` to `v2.rs` and adjust the request / response handlers.
21//! 2. Add `pub mod v2;` below.
22//! 3. Mount it in [`crate::server::serve`] under `/api/v2`.
23//! 4. Decide whether `v1` should be kept (it usually is, for a deprecation
24//! window) or removed.
25//!
26//! Handlers inside a version module are plain `async fn` (no route
27//! macros) so the same handler can be re-mounted in multiple scopes —
28//! that's how the legacy un-versioned `/api/datasets/...` alias works
29//! without duplicating code.
30
31use std::sync::Arc;
32
33use actix_web::{HttpRequest, HttpResponse, get, web};
34
35use crate::backend::Backend;
36
37pub mod v1;
38
39/// Convenience alias — every handler extracts the backend through this.
40pub type BackendData = web::Data<Arc<dyn Backend>>;/// Query-related limits copied from `[server]` config into Actix app data.
41#[derive(Debug, Clone, Copy)]
42pub struct QueryLimits {
43 pub max_page_size: u64,
44}
45
46impl Default for QueryLimits {
47 fn default() -> Self {
48 Self {
49 max_page_size: 100_000,
50 }
51 }
52}
53
/// Settings for the raw-SQL endpoint, mirrored from the `[sql]` config
/// section into Actix app data. With `enabled` set to `false` the
/// `POST /api/v1/sql` handler answers `404` without touching the engine.
#[derive(Debug, Clone, Copy)]
pub struct SqlSettings {
    /// Whether the raw-SQL endpoint is switched on.
    pub enabled: bool,
    /// Row cap associated with the raw-SQL endpoint.
    pub max_rows: u64,
}

impl Default for SqlSettings {
    /// Endpoint disabled, with a 100 000-row cap.
    fn default() -> Self {
        const DEFAULT_MAX_ROWS: u64 = 100_000;

        Self {
            enabled: false,
            max_rows: DEFAULT_MAX_ROWS,
        }
    }
}
71
72/// MIME type used for Arrow IPC stream responses.
73pub const ARROW_IPC_MIME: &str = "application/vnd.apache.arrow.stream";#[get("/health")]
74pub async fn health() -> HttpResponse {
75 HttpResponse::Ok()
76 .content_type("application/json")
77 .body(r#"{"status":"ok"}"#)
78}
79
80/// Liveness probe. Mounted outside the configured `prefix` at a fixed
81/// path so orchestrators don't need to know how the server is exposed.
82#[get("/healthz")]
83pub async fn healthz() -> HttpResponse {
84 HttpResponse::Ok()
85 .content_type("application/json")
86 .body(r#"{"status":"ok"}"#)
87}
88
89/// Readiness probe. Returns `200` once at least one dataset is registered
90/// (i.e. the registry finished loading at startup), `503` otherwise.
91#[get("/readyz")]
92pub async fn readyz(backend: BackendData) -> HttpResponse {
93 let names = backend.names();
94 if names.is_empty() {
95 HttpResponse::ServiceUnavailable()
96 .content_type("application/json")
97 .body(r#"{"status":"not ready","reason":"no datasets registered"}"#)
98 } else {
99 let body = format!(r#"{{"status":"ready","datasets":{}}}"#, names.len());
100 HttpResponse::Ok()
101 .content_type("application/json")
102 .body(body)
103 }
104}
105
106/// Build / version metadata published by [`version`] at `/version`.
107///
108/// Populated once by [`crate::server::serve`] from compile-time
109/// constants (`CARGO_PKG_*`) and optional build-time env vars
110/// (`DATAPRESS_GIT_SHA`, `DATAPRESS_BUILD_TIME`), and stored in actix
111/// app data. The handler just serialises it to JSON.
112#[derive(Clone, Debug, serde::Serialize)]
113pub struct BuildInfo {
114 /// Crate name (e.g. `"datapress-core"`).
115 pub name: &'static str,
116 /// Crate version from `CARGO_PKG_VERSION` (e.g. `"0.1.17"`).
117 pub version: &'static str,
118 /// Human-readable backend label — `"DuckDB"` or `"DataFusion"`.
119 pub backend: &'static str,
120 /// Git commit SHA the binary was built from. `None` when
121 /// `DATAPRESS_GIT_SHA` was not set at build time.
122 #[serde(skip_serializing_if = "Option::is_none")]
123 pub git_sha: Option<&'static str>,
124 /// ISO-8601 build timestamp. `None` when `DATAPRESS_BUILD_TIME`
125 /// was not set at build time.
126 #[serde(skip_serializing_if = "Option::is_none")]
127 pub build_time: Option<&'static str>,
128 /// `"debug"` or `"release"`, derived from `cfg!(debug_assertions)`.
129 pub profile: &'static str,
130 /// Rust target triple the binary was built for (e.g.
131 /// `"aarch64-apple-darwin"`). `None` when `DATAPRESS_TARGET` was
132 /// not set at build time.
133 #[serde(skip_serializing_if = "Option::is_none")]
134 pub target: Option<&'static str>,
135}
136
137impl BuildInfo {
138 /// Build a `BuildInfo` populated from compile-time constants. The
139 /// caller supplies the backend `label` (the binaries know which
140 /// they are; this crate doesn't).
141 pub fn new(backend: &'static str) -> Self {
142 Self {
143 name: env!("CARGO_PKG_NAME"),
144 version: env!("CARGO_PKG_VERSION"),
145 backend,
146 git_sha: option_env!("DATAPRESS_GIT_SHA"),
147 build_time: option_env!("DATAPRESS_BUILD_TIME"),
148 profile: if cfg!(debug_assertions) {
149 "debug"
150 } else {
151 "release"
152 },
153 target: option_env!("DATAPRESS_TARGET"),
154 }
155 }
156}
157
158/// Build / version info. Mounted unprefixed so orchestrators and
159/// release-tracking tools can hit it without knowing how the server
160/// is exposed. Always returns `200` with a JSON object.
161#[get("/version")]
162pub async fn version(info: web::Data<BuildInfo>) -> HttpResponse {
163 HttpResponse::Ok().json(info.get_ref())
164}
165
166/// True if the caller wants Arrow IPC: either `?format=arrow` in the
167/// query string, or `Accept` lists `application/vnd.apache.arrow.stream`.
168/// A bare `Accept: */*` does **not** count — JSON stays the default.
169pub(crate) fn wants_arrow(http: &HttpRequest) -> bool {
170 let qs = http.query_string();
171 if !qs.is_empty()
172 && qs.split('&').any(|kv| matches!(kv.split_once('='), Some(("format", v)) if v.eq_ignore_ascii_case("arrow")))
173 {
174 return true;
175 }
176 http.headers()
177 .get(actix_web::http::header::ACCEPT)
178 .and_then(|h| h.to_str().ok())
179 .map(|s| {
180 s.split(',').any(|part| {
181 part.split(';')
182 .next()
183 .unwrap_or("")
184 .trim()
185 .eq_ignore_ascii_case(ARROW_IPC_MIME)
186 })
187 })
188 .unwrap_or(false)
189}
190
191/// True if the caller wants to skip server-side HTTP compression for this
192/// response: either `?compress=false` (also `0`/`no`/`off`) in the query
193/// string, or an `X-No-Compress` request header. Browsers can't override
194/// `Accept-Encoding` from `fetch`, so this gives a page-settable escape
195/// hatch — handlers translate it into a `Content-Encoding: identity`
196/// response header, which actix's `Compress` middleware treats as a
197/// signal to leave the body untouched.
198pub(crate) fn wants_no_compression(http: &HttpRequest) -> bool {
199 let qs = http.query_string();
200 if !qs.is_empty()
201 && qs.split('&').any(|kv| {
202 matches!(
203 kv.split_once('='),
204 Some(("compress", v))
205 if v.eq_ignore_ascii_case("false")
206 || v == "0"
207 || v.eq_ignore_ascii_case("no")
208 || v.eq_ignore_ascii_case("off")
209 )
210 })
211 {
212 return true;
213 }
214 http.headers().contains_key("x-no-compress")
215}
216
217/// MIME type used for Parquet export responses.
218pub const PARQUET_MIME: &str = "application/vnd.apache.parquet";
219
220/// Process-wide cache of encoded Parquet exports, keyed by dataset name.
221///
222/// The `/datasets/{name}/parquet` endpoint serves a complete Parquet file
223/// with HTTP range support. A single client (e.g. DuckDB `httpfs`) issues
224/// several requests against it — a `HEAD` for the length, then ranged
225/// `GET`s for the footer and any row-group metadata — so every request
226/// must observe the *same* bytes. Caching the encoded file makes those
227/// requests cheap and consistent; [`crate::handlers::v1::reload_dataset`]
228/// drops the entry after a successful reload so a fresh export is built
229/// on next access.
230#[derive(Default)]
231pub struct ParquetCache {
232 inner: std::sync::RwLock<std::collections::HashMap<String, Arc<bytes::Bytes>>>,
233}
234
235impl ParquetCache {
236 /// Return the cached export for `name`, if one has been built.
237 pub fn get(&self, name: &str) -> Option<Arc<bytes::Bytes>> {
238 self.inner.read().ok()?.get(name).cloned()
239 }
240
241 /// Store `bytes` as the export for `name`, returning the stored handle.
242 pub fn insert(&self, name: &str, bytes: bytes::Bytes) -> Arc<bytes::Bytes> {
243 let shared = Arc::new(bytes);
244 if let Ok(mut map) = self.inner.write() {
245 map.insert(name.to_string(), shared.clone());
246 }
247 shared
248 }
249
250 /// Drop the cached export for `name` (no-op if absent).
251 pub fn invalidate(&self, name: &str) {
252 if let Ok(mut map) = self.inner.write() {
253 map.remove(name);
254 }
255 }
256}
257
/// A single byte range resolved against a body of `total` bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ByteRange {
    /// Offset of the first byte served.
    start: u64,
    /// Inclusive end offset.
    end: u64,
}

/// Parse one position (or suffix length) of a byte range.
///
/// Only a non-empty run of ASCII digits is accepted, so signs such as `+5`
/// are rejected. Digit strings too large for `u64` saturate to `u64::MAX`;
/// the caller clamps them against the body length.
fn parse_range_position(text: &str) -> Option<u64> {
    if text.is_empty() || !text.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    // An all-digit string can only fail to parse through overflow.
    Some(text.parse().unwrap_or(u64::MAX))
}

/// Parse a single-range HTTP `Range: bytes=…` header against a body of
/// `total` bytes.
///
/// Returns:
/// - `Ok(None)` when the header carries no parseable byte range (unknown
///   range unit, missing `-`, non-numeric positions, a bare `-`) — the
///   caller should ignore the header and serve the full body with `200`.
/// - `Ok(Some(range))` for a satisfiable range — serve `206`. An end
///   offset past the body, or a suffix longer than the body, is clamped.
/// - `Err(())` when the range is well-formed but cannot be satisfied
///   (start at or past the end of the body, end before start, a zero-length
///   suffix, or any range against an empty body) — the caller should
///   answer `416`.
///
/// The range unit is matched case-insensitively. Only the first range of a
/// multi-range header is honoured: `multipart/byteranges` responses are
/// intentionally not implemented, so a multi-range request is answered as
/// if only its first range had been sent.
fn parse_byte_range(header: &str, total: u64) -> Result<Option<ByteRange>, ()> {
    let spec = match header.trim().split_once('=') {
        Some((unit, rest)) if unit.eq_ignore_ascii_case("bytes") => rest.trim(),
        _ => return Ok(None),
    };

    // Take the first range only.
    let first = spec.split(',').next().unwrap_or("").trim();
    let Some((start_s, end_s)) = first.split_once('-') else {
        return Ok(None);
    };
    let (start_s, end_s) = (start_s.trim(), end_s.trim());

    // Syntax first: anything that is not `N-`, `N-M` or `-N` is ignored
    // rather than rejected, whatever the body size.
    let first_pos = if start_s.is_empty() {
        None
    } else {
        let Some(pos) = parse_range_position(start_s) else {
            return Ok(None);
        };
        Some(pos)
    };
    let last_pos = if end_s.is_empty() {
        None
    } else {
        let Some(pos) = parse_range_position(end_s) else {
            return Ok(None);
        };
        Some(pos)
    };

    let (start, end) = match (first_pos, last_pos) {
        // A bare `-` names no range at all.
        (None, None) => return Ok(None),
        // Nothing can be served from an empty body; this guard also keeps
        // the `total - 1` computations below from underflowing.
        _ if total == 0 => return Err(()),
        // Suffix range `-N`: the last N bytes; `-0` selects nothing.
        (None, Some(0)) => return Err(()),
        (None, Some(suffix_len)) => {
            let len = suffix_len.min(total);
            (total - len, total - 1)
        }
        // Open-ended range `N-`: from N to the end of the body.
        (Some(start), None) => (start, total - 1),
        // Closed range `N-M`, with M clamped to the last byte.
        (Some(start), Some(end)) => (start, end.min(total - 1)),
    };

    // `end` never exceeds `total - 1`, so this rejects both inverted ranges
    // and starts at or beyond the end of the body.
    if start > end {
        return Err(());
    }
    Ok(Some(ByteRange { start, end }))
}
317
318/// Serve `body` as an HTTP response with range + `HEAD` support.
319///
320/// Honours a single `Range: bytes=…` header (`206 Partial Content` with a
321/// `Content-Range`), advertises `Accept-Ranges: bytes`, and lets actix's
322/// dispatcher answer `HEAD` with the same headers (including the computed
323/// `Content-Length`) but no body. This is what lets DuckDB's `httpfs` read
324/// only the Parquet footer for a `count(*)` instead of the whole file.
325pub fn serve_bytes_with_range(
326 http: &HttpRequest,
327 body: Arc<bytes::Bytes>,
328 content_type: &str,
329) -> HttpResponse {
330 use actix_web::http::header;
331
332 let total = body.len() as u64;
333
334 let range = http
335 .headers()
336 .get(header::RANGE)
337 .and_then(|h| h.to_str().ok());
338
339 match range.map(|r| parse_byte_range(r, total)) {
340 // Unsatisfiable byte range → 416 with the total size.
341 Some(Err(())) => HttpResponse::RangeNotSatisfiable()
342 .insert_header((header::CONTENT_RANGE, format!("bytes */{total}")))
343 .insert_header((header::ACCEPT_RANGES, "bytes"))
344 .finish(),
345 // Satisfiable single range → 206 Partial Content. For a HEAD the
346 // dispatcher drops the body bytes but keeps the Content-Length
347 // derived from the slice, so range probes still see the right size.
348 Some(Ok(Some(ByteRange { start, end }))) => HttpResponse::PartialContent()
349 .insert_header((header::CONTENT_TYPE, content_type.to_string()))
350 .insert_header((header::ACCEPT_RANGES, "bytes"))
351 .insert_header((header::CONTENT_RANGE, format!("bytes {start}-{end}/{total}")))
352 .body(body.slice(start as usize..(end as usize + 1))),
353 // No (parseable) range → full body with 200.
354 _ => HttpResponse::Ok()
355 .insert_header((header::CONTENT_TYPE, content_type.to_string()))
356 .insert_header((header::ACCEPT_RANGES, "bytes"))
357 .body(bytes::Bytes::clone(&body)),
358 }
359}