Skip to main content

datapress_core/handlers/
mod.rs

1//! HTTP handler surface, organised by API version.
2//!
3//! ## Layout
4//!
//! - This module hosts **unversioned** endpoints (liveness / readiness /
//!   `/health` / `/version`) plus shared utilities used by every version
//!   (content negotiation, the [`BackendData`] extractor type, the Arrow IPC
//!   and Parquet MIME constants, the Parquet export cache, and HTTP range
//!   serving).
9//! - Each API version lives in its own submodule ([`v1`], future
10//!   `v2`, …). Versions expose a single [`actix_web::web::ServiceConfig`]
11//!   registration function so the server can mount them under a scope:
12//!
13//!   ```ignore
14//!   App::new()
15//!       .service(web::scope("/api/v1").configure(handlers::v1::configure))
16//!   ```
17//!
18//! ## Adding a new version
19//!
20//! 1. Copy `v1.rs` to `v2.rs` and adjust the request / response handlers.
21//! 2. Add `pub mod v2;` below.
22//! 3. Mount it in [`crate::server::serve`] under `/api/v2`.
23//! 4. Decide whether `v1` should be kept (it usually is, for a deprecation
24//!    window) or removed.
25//!
26//! Handlers inside a version module are plain `async fn` (no route
27//! macros) so the same handler can be re-mounted in multiple scopes —
28//! that's how the legacy un-versioned `/api/datasets/...` alias works
29//! without duplicating code.
30
31use std::sync::Arc;
32
33use actix_web::{HttpRequest, HttpResponse, get, web};
34
35use crate::backend::Backend;
36
37pub mod v1;
38
39/// Convenience alias — every handler extracts the backend through this.
40pub type BackendData = web::Data<Arc<dyn Backend>>;/// Query-related limits copied from `[server]` config into Actix app data.
#[derive(Debug, Clone, Copy)]
pub struct QueryLimits {
    /// Maximum page size accepted for paginated queries (presumably a row
    /// count — confirm against the handlers that read it).
    pub max_page_size: u64,
}

impl Default for QueryLimits {
    /// A page-size ceiling of 100 000.
    fn default() -> Self {
        let max_page_size = 100_000;
        QueryLimits { max_page_size }
    }
}
53
/// Settings for the raw-SQL endpoint, copied from the `[sql]` config section
/// into Actix app data. While `enabled` is false the `POST /api/v1/sql`
/// handler answers `404` and never touches the engine.
#[derive(Debug, Clone, Copy)]
pub struct SqlSettings {
    /// Whether the raw-SQL endpoint is served at all.
    pub enabled: bool,
    /// Row cap for raw-SQL results (presumably enforced by the SQL handler —
    /// confirm there).
    pub max_rows: u64,
}

impl Default for SqlSettings {
    /// Endpoint disabled, with a 100 000-row cap.
    fn default() -> Self {
        let enabled = false;
        let max_rows = 100_000;
        SqlSettings { enabled, max_rows }
    }
}
71
72/// MIME type used for Arrow IPC stream responses.
73pub const ARROW_IPC_MIME: &str = "application/vnd.apache.arrow.stream";#[get("/health")]
74pub async fn health() -> HttpResponse {
75    HttpResponse::Ok()
76        .content_type("application/json")
77        .body(r#"{"status":"ok"}"#)
78}
79
80/// Liveness probe. Mounted outside the configured `prefix` at a fixed
81/// path so orchestrators don't need to know how the server is exposed.
82#[get("/healthz")]
83pub async fn healthz() -> HttpResponse {
84    HttpResponse::Ok()
85        .content_type("application/json")
86        .body(r#"{"status":"ok"}"#)
87}
88
89/// Readiness probe. Returns `200` once at least one dataset is registered
90/// (i.e. the registry finished loading at startup), `503` otherwise.
91#[get("/readyz")]
92pub async fn readyz(backend: BackendData) -> HttpResponse {
93    let names = backend.names();
94    if names.is_empty() {
95        HttpResponse::ServiceUnavailable()
96            .content_type("application/json")
97            .body(r#"{"status":"not ready","reason":"no datasets registered"}"#)
98    } else {
99        let body = format!(r#"{{"status":"ready","datasets":{}}}"#, names.len());
100        HttpResponse::Ok()
101            .content_type("application/json")
102            .body(body)
103    }
104}
105
106/// Build / version metadata published by [`version`] at `/version`.
107///
108/// Populated once by [`crate::server::serve`] from compile-time
109/// constants (`CARGO_PKG_*`) and optional build-time env vars
110/// (`DATAPRESS_GIT_SHA`, `DATAPRESS_BUILD_TIME`), and stored in actix
111/// app data. The handler just serialises it to JSON.
112#[derive(Clone, Debug, serde::Serialize)]
113pub struct BuildInfo {
114    /// Crate name (e.g. `"datapress-core"`).
115    pub name: &'static str,
116    /// Crate version from `CARGO_PKG_VERSION` (e.g. `"0.1.17"`).
117    pub version: &'static str,
118    /// Human-readable backend label — `"DuckDB"` or `"DataFusion"`.
119    pub backend: &'static str,
120    /// Git commit SHA the binary was built from. `None` when
121    /// `DATAPRESS_GIT_SHA` was not set at build time.
122    #[serde(skip_serializing_if = "Option::is_none")]
123    pub git_sha: Option<&'static str>,
124    /// ISO-8601 build timestamp. `None` when `DATAPRESS_BUILD_TIME`
125    /// was not set at build time.
126    #[serde(skip_serializing_if = "Option::is_none")]
127    pub build_time: Option<&'static str>,
128    /// `"debug"` or `"release"`, derived from `cfg!(debug_assertions)`.
129    pub profile: &'static str,
130    /// Rust target triple the binary was built for (e.g.
131    /// `"aarch64-apple-darwin"`). `None` when `DATAPRESS_TARGET` was
132    /// not set at build time.
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub target: Option<&'static str>,
135}
136
137impl BuildInfo {
138    /// Build a `BuildInfo` populated from compile-time constants. The
139    /// caller supplies the backend `label` (the binaries know which
140    /// they are; this crate doesn't).
141    pub fn new(backend: &'static str) -> Self {
142        Self {
143            name: env!("CARGO_PKG_NAME"),
144            version: env!("CARGO_PKG_VERSION"),
145            backend,
146            git_sha: option_env!("DATAPRESS_GIT_SHA"),
147            build_time: option_env!("DATAPRESS_BUILD_TIME"),
148            profile: if cfg!(debug_assertions) {
149                "debug"
150            } else {
151                "release"
152            },
153            target: option_env!("DATAPRESS_TARGET"),
154        }
155    }
156}
157
158/// Build / version info. Mounted unprefixed so orchestrators and
159/// release-tracking tools can hit it without knowing how the server
160/// is exposed. Always returns `200` with a JSON object.
161#[get("/version")]
162pub async fn version(info: web::Data<BuildInfo>) -> HttpResponse {
163    HttpResponse::Ok().json(info.get_ref())
164}
165
166/// True if the caller wants Arrow IPC: either `?format=arrow` in the
167/// query string, or `Accept` lists `application/vnd.apache.arrow.stream`.
168/// A bare `Accept: */*` does **not** count — JSON stays the default.
169pub(crate) fn wants_arrow(http: &HttpRequest) -> bool {
170    let qs = http.query_string();
171    if !qs.is_empty()
172        && qs.split('&').any(|kv| matches!(kv.split_once('='), Some(("format", v)) if v.eq_ignore_ascii_case("arrow")))
173    {
174        return true;
175    }
176    http.headers()
177        .get(actix_web::http::header::ACCEPT)
178        .and_then(|h| h.to_str().ok())
179        .map(|s| {
180            s.split(',').any(|part| {
181                part.split(';')
182                    .next()
183                    .unwrap_or("")
184                    .trim()
185                    .eq_ignore_ascii_case(ARROW_IPC_MIME)
186            })
187        })
188        .unwrap_or(false)
189}
190
191/// MIME type used for Parquet export responses.
192pub const PARQUET_MIME: &str = "application/vnd.apache.parquet";
193
194/// Process-wide cache of encoded Parquet exports, keyed by dataset name.
195///
196/// The `/datasets/{name}/parquet` endpoint serves a complete Parquet file
197/// with HTTP range support. A single client (e.g. DuckDB `httpfs`) issues
198/// several requests against it — a `HEAD` for the length, then ranged
199/// `GET`s for the footer and any row-group metadata — so every request
200/// must observe the *same* bytes. Caching the encoded file makes those
201/// requests cheap and consistent; [`crate::handlers::v1::reload_dataset`]
202/// drops the entry after a successful reload so a fresh export is built
203/// on next access.
204#[derive(Default)]
205pub struct ParquetCache {
206    inner: std::sync::RwLock<std::collections::HashMap<String, Arc<bytes::Bytes>>>,
207}
208
209impl ParquetCache {
210    /// Return the cached export for `name`, if one has been built.
211    pub fn get(&self, name: &str) -> Option<Arc<bytes::Bytes>> {
212        self.inner.read().ok()?.get(name).cloned()
213    }
214
215    /// Store `bytes` as the export for `name`, returning the stored handle.
216    pub fn insert(&self, name: &str, bytes: bytes::Bytes) -> Arc<bytes::Bytes> {
217        let shared = Arc::new(bytes);
218        if let Ok(mut map) = self.inner.write() {
219            map.insert(name.to_string(), shared.clone());
220        }
221        shared
222    }
223
224    /// Drop the cached export for `name` (no-op if absent).
225    pub fn invalidate(&self, name: &str) {
226        if let Ok(mut map) = self.inner.write() {
227            map.remove(name);
228        }
229    }
230}
231
/// A single byte range resolved against a body of `total` bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct ByteRange {
    /// Inclusive start offset.
    start: u64,
    /// Inclusive end offset.
    end: u64,
}

/// Syntactic form of one `range-spec` (RFC 9110 §14.1.1), before it is
/// resolved against the body length.
enum RangeSpec {
    /// `first-` or `first-last`.
    Int { first: u64, last: Option<u64> },
    /// `-len`: the final `len` bytes.
    Suffix(u64),
}

/// Parse an RFC 9110 byte position (`1*DIGIT`).
///
/// Anything that is not purely ASCII digits yields `None`. That includes a
/// leading `+`, which `str::parse` would otherwise accept. Values too large
/// for `u64` saturate to `u64::MAX`, which is past any body we could serve.
fn parse_byte_pos(digits: &str) -> Option<u64> {
    if digits.is_empty() || !digits.bytes().all(|b| b.is_ascii_digit()) {
        return None;
    }
    Some(digits.parse::<u64>().unwrap_or(u64::MAX))
}

/// Parse one `range-spec`; whitespace around the positions is tolerated.
///
/// Returns `None` when it is syntactically invalid. That includes an
/// int-range whose last position precedes its first, which RFC 9110
/// §14.1.1 classes as invalid.
fn parse_range_spec(spec: &str) -> Option<RangeSpec> {
    let (first, last) = spec.split_once('-')?;
    let (first, last) = (first.trim(), last.trim());
    if first.is_empty() {
        return parse_byte_pos(last).map(RangeSpec::Suffix);
    }
    let first = parse_byte_pos(first)?;
    let last = if last.is_empty() {
        None
    } else {
        Some(parse_byte_pos(last)?)
    };
    if matches!(last, Some(l) if l < first) {
        return None;
    }
    Some(RangeSpec::Int { first, last })
}

/// Parse a single-range HTTP `Range: bytes=…` header against a body of
/// `total` bytes.
///
/// Returns:
/// - `Ok(None)` when the header should be ignored and the full body served
///   with `200`. This covers a unit other than `bytes` (matched
///   case-insensitively), a syntactically invalid range, or more than one
///   range. Multi-range `multipart/byteranges` responses are intentionally
///   not implemented, and ignoring `Range` is always permitted
///   (RFC 9110 §14.2).
/// - `Ok(Some(range))` for a satisfiable single range — serve `206`. An end
///   past the body is clamped to the last byte, and a suffix longer than the
///   body selects the whole body.
/// - `Err(())` when the single range is well-formed but unsatisfiable — the
///   caller should answer `416`. This happens when:
///   - the range starts at or beyond `total`,
///   - the suffix length is zero (`-0`), or
///   - the body is empty.
fn parse_byte_range(header: &str, total: u64) -> Result<Option<ByteRange>, ()> {
    const UNIT: &str = "bytes=";

    let header = header.trim();
    let range_set = match header.get(..UNIT.len()) {
        Some(unit) if unit.eq_ignore_ascii_case(UNIT) => &header[UNIT.len()..],
        _ => return Ok(None),
    };

    // Empty list elements are allowed by the list syntax and carry no meaning.
    let mut specs = range_set
        .split(',')
        .map(str::trim)
        .filter(|spec| !spec.is_empty());
    let spec = match (specs.next(), specs.next()) {
        (Some(only), None) => only,
        // No range at all, or several ranges: ignore the header.
        _ => return Ok(None),
    };
    let spec = match parse_range_spec(spec) {
        Some(spec) => spec,
        None => return Ok(None),
    };

    if total == 0 {
        return Err(());
    }
    let last_byte = total - 1;
    match spec {
        RangeSpec::Suffix(0) => Err(()),
        RangeSpec::Suffix(len) => Ok(Some(ByteRange {
            start: total - len.min(total),
            end: last_byte,
        })),
        RangeSpec::Int { first, .. } if first >= total => Err(()),
        RangeSpec::Int { first, last } => Ok(Some(ByteRange {
            start: first,
            end: last.map_or(last_byte, |l| l.min(last_byte)),
        })),
    }
}
291
292/// Serve `body` as an HTTP response with range + `HEAD` support.
293///
294/// Honours a single `Range: bytes=…` header (`206 Partial Content` with a
295/// `Content-Range`), advertises `Accept-Ranges: bytes`, and lets actix's
296/// dispatcher answer `HEAD` with the same headers (including the computed
297/// `Content-Length`) but no body. This is what lets DuckDB's `httpfs` read
298/// only the Parquet footer for a `count(*)` instead of the whole file.
299pub fn serve_bytes_with_range(
300    http: &HttpRequest,
301    body: Arc<bytes::Bytes>,
302    content_type: &str,
303) -> HttpResponse {
304    use actix_web::http::header;
305
306    let total = body.len() as u64;
307
308    let range = http
309        .headers()
310        .get(header::RANGE)
311        .and_then(|h| h.to_str().ok());
312
313    match range.map(|r| parse_byte_range(r, total)) {
314        // Unsatisfiable byte range → 416 with the total size.
315        Some(Err(())) => HttpResponse::RangeNotSatisfiable()
316            .insert_header((header::CONTENT_RANGE, format!("bytes */{total}")))
317            .insert_header((header::ACCEPT_RANGES, "bytes"))
318            .finish(),
319        // Satisfiable single range → 206 Partial Content. For a HEAD the
320        // dispatcher drops the body bytes but keeps the Content-Length
321        // derived from the slice, so range probes still see the right size.
322        Some(Ok(Some(ByteRange { start, end }))) => HttpResponse::PartialContent()
323            .insert_header((header::CONTENT_TYPE, content_type.to_string()))
324            .insert_header((header::ACCEPT_RANGES, "bytes"))
325            .insert_header((header::CONTENT_RANGE, format!("bytes {start}-{end}/{total}")))
326            .body(body.slice(start as usize..(end as usize + 1))),
327        // No (parseable) range → full body with 200.
328        _ => HttpResponse::Ok()
329            .insert_header((header::CONTENT_TYPE, content_type.to_string()))
330            .insert_header((header::ACCEPT_RANGES, "bytes"))
331            .body(bytes::Bytes::clone(&body)),
332    }
333}