Skip to main content

datapress_core/handlers/
mod.rs

//! HTTP handler surface, organised by API version.
//!
//! ## Layout
//!
//! - This module hosts the **unversioned** endpoints (`/health`, the
//!   `/healthz` liveness and `/readyz` readiness probes, and `/version`)
//!   plus shared utilities used by every version (content negotiation,
//!   the [`BackendData`] extractor type, the Arrow IPC and Parquet MIME
//!   constants, the Parquet export cache and HTTP byte-range serving).
//! - Each API version lives in its own submodule ([`v1`], future
//!   `v2`, …). Versions expose a single [`actix_web::web::ServiceConfig`]
//!   registration function so the server can mount them under a scope:
//!
//!   ```ignore
//!   App::new()
//!       .service(web::scope("/api/v1").configure(handlers::v1::configure))
//!   ```
//!
//! ## Adding a new version
//!
//! 1. Copy `v1.rs` to `v2.rs` and adjust the request / response handlers.
//! 2. Add `pub mod v2;` below.
//! 3. Mount it in [`crate::server::serve`] under `/api/v2`.
//! 4. Decide whether `v1` should be kept (it usually is, for a deprecation
//!    window) or removed.
//!
//! Handlers inside a version module are plain `async fn` (no route
//! macros) so the same handler can be re-mounted in multiple scopes —
//! that's how the legacy un-versioned `/api/datasets/...` alias works
//! without duplicating code.

31use std::sync::Arc;
32
33use actix_web::{HttpRequest, HttpResponse, get, web};
34
35use crate::backend::Backend;
36
37pub mod v1;
38
39/// Convenience alias — every handler extracts the backend through this.
40pub type BackendData = web::Data<Arc<dyn Backend>>;/// Query-related limits copied from `[server]` config into Actix app data.
#[derive(Debug, Clone, Copy)]
pub struct QueryLimits {
    /// Largest page size query handlers will accept (presumably in rows —
    /// confirm against the versioned handlers that read it).
    pub max_page_size: u64,
}

impl QueryLimits {
    /// Value used for `max_page_size` when no explicit limit is configured.
    const DEFAULT_MAX_PAGE_SIZE: u64 = 100_000;
}

impl Default for QueryLimits {
    /// Limits used when the config does not override them:
    /// `max_page_size = 100_000`.
    fn default() -> Self {
        QueryLimits {
            max_page_size: Self::DEFAULT_MAX_PAGE_SIZE,
        }
    }
}

54/// MIME type used for Arrow IPC stream responses.
55pub const ARROW_IPC_MIME: &str = "application/vnd.apache.arrow.stream";#[get("/health")]
56pub async fn health() -> HttpResponse {
57    HttpResponse::Ok()
58        .content_type("application/json")
59        .body(r#"{"status":"ok"}"#)
60}
61
62/// Liveness probe. Mounted outside the configured `prefix` at a fixed
63/// path so orchestrators don't need to know how the server is exposed.
64#[get("/healthz")]
65pub async fn healthz() -> HttpResponse {
66    HttpResponse::Ok()
67        .content_type("application/json")
68        .body(r#"{"status":"ok"}"#)
69}
70
71/// Readiness probe. Returns `200` once at least one dataset is registered
72/// (i.e. the registry finished loading at startup), `503` otherwise.
73#[get("/readyz")]
74pub async fn readyz(backend: BackendData) -> HttpResponse {
75    let names = backend.names();
76    if names.is_empty() {
77        HttpResponse::ServiceUnavailable()
78            .content_type("application/json")
79            .body(r#"{"status":"not ready","reason":"no datasets registered"}"#)
80    } else {
81        let body = format!(r#"{{"status":"ready","datasets":{}}}"#, names.len());
82        HttpResponse::Ok()
83            .content_type("application/json")
84            .body(body)
85    }
86}
87
88/// Build / version metadata published by [`version`] at `/version`.
89///
90/// Populated once by [`crate::server::serve`] from compile-time
91/// constants (`CARGO_PKG_*`) and optional build-time env vars
92/// (`DATAPRESS_GIT_SHA`, `DATAPRESS_BUILD_TIME`), and stored in actix
93/// app data. The handler just serialises it to JSON.
94#[derive(Clone, Debug, serde::Serialize)]
95pub struct BuildInfo {
96    /// Crate name (e.g. `"datapress-core"`).
97    pub name: &'static str,
98    /// Crate version from `CARGO_PKG_VERSION` (e.g. `"0.1.17"`).
99    pub version: &'static str,
100    /// Human-readable backend label — `"DuckDB"` or `"DataFusion"`.
101    pub backend: &'static str,
102    /// Git commit SHA the binary was built from. `None` when
103    /// `DATAPRESS_GIT_SHA` was not set at build time.
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub git_sha: Option<&'static str>,
106    /// ISO-8601 build timestamp. `None` when `DATAPRESS_BUILD_TIME`
107    /// was not set at build time.
108    #[serde(skip_serializing_if = "Option::is_none")]
109    pub build_time: Option<&'static str>,
110    /// `"debug"` or `"release"`, derived from `cfg!(debug_assertions)`.
111    pub profile: &'static str,
112    /// Rust target triple the binary was built for (e.g.
113    /// `"aarch64-apple-darwin"`). `None` when `DATAPRESS_TARGET` was
114    /// not set at build time.
115    #[serde(skip_serializing_if = "Option::is_none")]
116    pub target: Option<&'static str>,
117}
118
119impl BuildInfo {
120    /// Build a `BuildInfo` populated from compile-time constants. The
121    /// caller supplies the backend `label` (the binaries know which
122    /// they are; this crate doesn't).
123    pub fn new(backend: &'static str) -> Self {
124        Self {
125            name: env!("CARGO_PKG_NAME"),
126            version: env!("CARGO_PKG_VERSION"),
127            backend,
128            git_sha: option_env!("DATAPRESS_GIT_SHA"),
129            build_time: option_env!("DATAPRESS_BUILD_TIME"),
130            profile: if cfg!(debug_assertions) {
131                "debug"
132            } else {
133                "release"
134            },
135            target: option_env!("DATAPRESS_TARGET"),
136        }
137    }
138}
139
140/// Build / version info. Mounted unprefixed so orchestrators and
141/// release-tracking tools can hit it without knowing how the server
142/// is exposed. Always returns `200` with a JSON object.
143#[get("/version")]
144pub async fn version(info: web::Data<BuildInfo>) -> HttpResponse {
145    HttpResponse::Ok().json(info.get_ref())
146}
147
148/// True if the caller wants Arrow IPC: either `?format=arrow` in the
149/// query string, or `Accept` lists `application/vnd.apache.arrow.stream`.
150/// A bare `Accept: */*` does **not** count — JSON stays the default.
151pub(crate) fn wants_arrow(http: &HttpRequest) -> bool {
152    let qs = http.query_string();
153    if !qs.is_empty()
154        && qs.split('&').any(|kv| matches!(kv.split_once('='), Some(("format", v)) if v.eq_ignore_ascii_case("arrow")))
155    {
156        return true;
157    }
158    http.headers()
159        .get(actix_web::http::header::ACCEPT)
160        .and_then(|h| h.to_str().ok())
161        .map(|s| {
162            s.split(',').any(|part| {
163                part.split(';')
164                    .next()
165                    .unwrap_or("")
166                    .trim()
167                    .eq_ignore_ascii_case(ARROW_IPC_MIME)
168            })
169        })
170        .unwrap_or(false)
171}
172
/// Media type attached to Parquet export responses.
pub const PARQUET_MIME: &str = "application/vnd.apache.parquet";

176/// Process-wide cache of encoded Parquet exports, keyed by dataset name.
177///
178/// The `/datasets/{name}/parquet` endpoint serves a complete Parquet file
179/// with HTTP range support. A single client (e.g. DuckDB `httpfs`) issues
180/// several requests against it — a `HEAD` for the length, then ranged
181/// `GET`s for the footer and any row-group metadata — so every request
182/// must observe the *same* bytes. Caching the encoded file makes those
183/// requests cheap and consistent; [`crate::handlers::v1::reload_dataset`]
184/// drops the entry after a successful reload so a fresh export is built
185/// on next access.
186#[derive(Default)]
187pub struct ParquetCache {
188    inner: std::sync::RwLock<std::collections::HashMap<String, Arc<bytes::Bytes>>>,
189}
190
191impl ParquetCache {
192    /// Return the cached export for `name`, if one has been built.
193    pub fn get(&self, name: &str) -> Option<Arc<bytes::Bytes>> {
194        self.inner.read().ok()?.get(name).cloned()
195    }
196
197    /// Store `bytes` as the export for `name`, returning the stored handle.
198    pub fn insert(&self, name: &str, bytes: bytes::Bytes) -> Arc<bytes::Bytes> {
199        let shared = Arc::new(bytes);
200        if let Ok(mut map) = self.inner.write() {
201            map.insert(name.to_string(), shared.clone());
202        }
203        shared
204    }
205
206    /// Drop the cached export for `name` (no-op if absent).
207    pub fn invalidate(&self, name: &str) {
208        if let Ok(mut map) = self.inner.write() {
209            map.remove(name);
210        }
211    }
212}
213
/// A single byte range resolved against a body of `total` bytes.
///
/// Both offsets are inclusive, matching the `Content-Range` wire format
/// (`bytes {start}-{end}/{total}`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ByteRange {
    /// Offset of the first byte to serve.
    start: u64,
    /// Inclusive end offset.
    end: u64,
}

/// Parse a single-range HTTP `Range: bytes=…` header against a body of
/// `total` bytes.
///
/// Returns:
/// - `Ok(None)` when the header is not a well-formed byte range (another
///   range unit, no `-`, or positions that are not plain decimal digits).
///   The caller should ignore it and serve the full body with `200`.
/// - `Ok(Some(range))` for a satisfiable range. Serve `206`.
/// - `Err(())` when the header is a well-formed `bytes=` range that cannot
///   be satisfied. Answer `416`. This covers a start at or past the end, an
///   explicit `last < first`, a zero-length suffix, and any range against an
///   empty body.
///
/// The `bytes` unit is matched case-insensitively (RFC 9110 §14.1). Only
/// the first range of a multi-range header is considered. Multi-range
/// `multipart/byteranges` responses are intentionally not implemented, so
/// such requests receive a single-part `206` for their first range.
fn parse_byte_range(header: &str, total: u64) -> Result<Option<ByteRange>, ()> {
    // Parse a first-pos / last-pos / suffix-length: one or more ASCII digits.
    // `u64::from_str` alone would also accept a leading `+`.
    fn parse_pos(digits: &str) -> Option<u64> {
        if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
            return None;
        }
        // All-digit input can only fail by overflowing. Saturate so an absurdly
        // large position means "past the end" rather than "malformed".
        Some(digits.parse().unwrap_or(u64::MAX))
    }

    let header = header.trim();
    // `get` returns None (instead of panicking) if byte 6 is not a char boundary.
    let spec = match header.get(..6) {
        Some(unit) if unit.eq_ignore_ascii_case("bytes=") => header[6..].trim(),
        _ => return Ok(None),
    };

    // Take the first range only.
    let first = spec.split(',').next().unwrap_or("").trim();
    let (first_s, last_s) = match first.split_once('-') {
        Some((a, b)) => (a.trim(), b.trim()),
        None => return Ok(None),
    };

    let range = if first_s.is_empty() {
        // Suffix range `-N`: the last N bytes of the body.
        let n = match parse_pos(last_s) {
            Some(n) => n,
            None => return Ok(None),
        };
        // A zero-length suffix selects nothing; neither does any suffix of an
        // empty body.
        if n == 0 || total == 0 {
            return Err(());
        }
        ByteRange {
            start: total - n.min(total),
            end: total - 1,
        }
    } else {
        let start = match parse_pos(first_s) {
            Some(s) => s,
            None => return Ok(None),
        };
        let last = if last_s.is_empty() {
            None
        } else {
            match parse_pos(last_s) {
                Some(l) => Some(l),
                None => return Ok(None),
            }
        };
        // `start >= total` also covers the empty body, so `total - 1` below
        // cannot underflow.
        if matches!(last, Some(l) if l < start) || start >= total {
            return Err(());
        }
        ByteRange {
            start,
            end: last.map_or(total - 1, |l| l.min(total - 1)),
        }
    };

    Ok(Some(range))
}

274/// Serve `body` as an HTTP response with range + `HEAD` support.
275///
276/// Honours a single `Range: bytes=…` header (`206 Partial Content` with a
277/// `Content-Range`), advertises `Accept-Ranges: bytes`, and lets actix's
278/// dispatcher answer `HEAD` with the same headers (including the computed
279/// `Content-Length`) but no body. This is what lets DuckDB's `httpfs` read
280/// only the Parquet footer for a `count(*)` instead of the whole file.
281pub fn serve_bytes_with_range(
282    http: &HttpRequest,
283    body: Arc<bytes::Bytes>,
284    content_type: &str,
285) -> HttpResponse {
286    use actix_web::http::header;
287
288    let total = body.len() as u64;
289
290    let range = http
291        .headers()
292        .get(header::RANGE)
293        .and_then(|h| h.to_str().ok());
294
295    match range.map(|r| parse_byte_range(r, total)) {
296        // Unsatisfiable byte range → 416 with the total size.
297        Some(Err(())) => HttpResponse::RangeNotSatisfiable()
298            .insert_header((header::CONTENT_RANGE, format!("bytes */{total}")))
299            .insert_header((header::ACCEPT_RANGES, "bytes"))
300            .finish(),
301        // Satisfiable single range → 206 Partial Content. For a HEAD the
302        // dispatcher drops the body bytes but keeps the Content-Length
303        // derived from the slice, so range probes still see the right size.
304        Some(Ok(Some(ByteRange { start, end }))) => HttpResponse::PartialContent()
305            .insert_header((header::CONTENT_TYPE, content_type.to_string()))
306            .insert_header((header::ACCEPT_RANGES, "bytes"))
307            .insert_header((header::CONTENT_RANGE, format!("bytes {start}-{end}/{total}")))
308            .body(body.slice(start as usize..(end as usize + 1))),
309        // No (parseable) range → full body with 200.
310        _ => HttpResponse::Ok()
311            .insert_header((header::CONTENT_TYPE, content_type.to_string()))
312            .insert_header((header::ACCEPT_RANGES, "bytes"))
313            .body(bytes::Bytes::clone(&body)),
314    }
315}