datapress-core 0.2.17

Backend-agnostic core types, config, routing, and HTTP handlers for the datapress dataset server.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
//! Shared actix-web bootstrap. Both backends call [`serve`] from their
//! own thin `serve(cfg)` entry point.

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use actix_web::{App, HttpServer, middleware, web};

use crate::backend::Backend;
use crate::config::AppConfig;
use crate::handlers;
use crate::timeout::Timeout;

/// Boxed, sendable future whose completion asks the server to stop.
type ShutdownFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// The trigger that starts a graceful shutdown of the running server.
enum Shutdown {
    /// Wait for an OS signal (`SIGINT`/`SIGTERM`, or `Ctrl+C`) and stop
    /// when it arrives. This is the mode for the standalone binaries,
    /// which own the process and therefore its signal disposition.
    Signals,
    /// Wait for the wrapped future to resolve and stop then. This is the
    /// mode for embedded use (e.g. the Python extension): the *host* owns
    /// signal handling and requests shutdown by completing the future, so
    /// no OS signal handlers are installed and the host's handlers are
    /// never contended.
    External(ShutdownFuture),
}

/// Run the HTTP server for `backend` until an OS shutdown signal arrives.
///
/// Binds the listener described by `cfg`, registers the generic handler
/// set against `backend`, and keeps serving until the process receives
/// `SIGINT` or `SIGTERM` (`Ctrl+C` on non-Unix targets).
///
/// The stop is **graceful**: once the signal is seen the server is asked
/// to stop gracefully, in-flight requests get up to
/// `cfg.server.shutdown_timeout_secs` seconds to finish, and only then
/// does this future resolve.
///
/// `label` is the human-readable backend name shown in the startup log
/// line (e.g. `"DuckDB"`, `"DataFusion"`).
pub async fn serve(cfg: AppConfig, backend: Arc<dyn Backend>, label: &str) -> std::io::Result<()> {
    // Standalone mode: this process owns its signal handlers.
    let trigger = Shutdown::Signals;
    run_server(cfg, backend, label, trigger).await
}

/// Like [`serve`], but driven to a graceful stop by `shutdown` instead of
/// OS signals.
///
/// Intended for embedding DataPress inside another runtime (the Python
/// extension's `DataPress.run()`), where installing process-global signal
/// handlers would race the host's own. The caller resolves `shutdown` —
/// for example when its asyncio task is cancelled by `Ctrl+C` — and the
/// server then drains in-flight requests within
/// `cfg.server.shutdown_timeout_secs` and returns.
pub async fn serve_with_shutdown(
    cfg: AppConfig,
    backend: Arc<dyn Backend>,
    label: &str,
    shutdown: impl Future<Output = ()> + Send + 'static,
) -> std::io::Result<()> {
    run_server(cfg, backend, label, Shutdown::External(Box::pin(shutdown))).await
}

/// Shared implementation behind [`serve`] and [`serve_with_shutdown`].
///
/// In order, this:
///   1. snapshots the settings it needs from `cfg.server`, `cfg.docs`,
///      `cfg.swagger` and `cfg.metrics`;
///   2. warns about config sections that are enabled but whose cargo
///      feature was not compiled into this binary;
///   3. boots auth (feature `auth`, when enabled) before binding;
///   4. logs the startup banner and the route table;
///   5. builds the per-worker `App` factory, binds the listener, and runs
///      the server until `shutdown` triggers a graceful stop.
///
/// `label` is the backend name used in the startup log line and mapped to
/// the value handed to `handlers::BuildInfo`.
///
/// # Errors
///
/// Returns an `std::io::Error` when auth bootstrap fails (feature `auth`),
/// when the Prometheus middleware cannot be built (feature `metrics`),
/// when binding `addr` fails, or when the running server itself resolves
/// to an error.
async fn run_server(
    cfg: AppConfig,
    backend: Arc<dyn Backend>,
    label: &str,
    shutdown: Shutdown,
) -> std::io::Result<()> {
    // Copy/clone everything the `move` factory closure below needs, so the
    // closure does not have to capture `cfg` itself.
    let addr = (cfg.server.listen, cfg.server.port);
    let workers = cfg.server.workers;
    let prefix = cfg.server.prefix.clone();
    let compress = cfg.server.compress;
    let max_body = cfg.server.max_body_bytes;
    let max_page_size = cfg.server.max_page_size;
    let timeout_ms = cfg.server.request_timeout_ms;
    let shutdown_secs = cfg.server.shutdown_timeout_secs;
    let docs_cfg = cfg.docs.clone();
    let swagger_cfg = cfg.swagger.clone();
    let metrics_cfg = cfg.metrics.clone();

    // Warn (but don't fail) when the operator asked for docs in TOML but
    // this binary was built without the cargo feature that embeds them.
    // The same pattern is repeated for swagger, auth and metrics.
    #[cfg(not(feature = "docs"))]
    if docs_cfg.enabled {
        log::warn!(
            "[docs] enabled = true in config, but this binary was built \
             without --features docs; skipping docs site"
        );
    }
    #[cfg(not(feature = "swagger"))]
    if swagger_cfg.enabled {
        log::warn!(
            "[swagger] enabled = true in config, but this binary was built \
             without --features swagger; skipping Swagger UI"
        );
    }
    #[cfg(not(feature = "auth"))]
    if cfg.auth.enabled {
        log::warn!(
            "[auth] enabled = true in config, but this binary was built \
             without --features auth; skipping OIDC enforcement"
        );
    }
    #[cfg(not(feature = "metrics"))]
    if metrics_cfg.enabled {
        log::warn!(
            "[metrics] enabled = true in config, but this binary was built \
             without --features metrics; skipping Prometheus endpoint"
        );
    }

    // Boot the JWKS cache (and validate config) before binding the
    // listener. With `start_degraded = true` this only warns on an
    // unreachable IdP; with `false` it propagates the error and the
    // process exits non-zero.
    #[cfg(feature = "auth")]
    let auth_state = if cfg.auth.enabled {
        let jwks = crate::auth::JwksCache::boot(&cfg.auth)
            .await
            .map_err(|e| std::io::Error::other(format!("auth bootstrap failed: {e}")))?;
        log::info!(
            "[auth] OIDC enforcement enabled (issuer = {}, audience = {}, read_scopes = {:?}, reload_scopes = {:?})",
            cfg.auth.issuer,
            if cfg.auth.audience.is_empty() {
                "<none>"
            } else {
                cfg.auth.audience.as_str()
            },
            cfg.auth.read_scopes,
            cfg.auth.reload_scopes,
        );
        Some(crate::auth::AuthState {
            cfg: Arc::new(cfg.auth.clone()),
            jwks,
        })
    } else {
        None
    };

    // Startup banner: one line summarising the effective server settings.
    log::info!(
        "Listening on http://{}:{}{} ({} backend, {} workers, compression {}, max-body {} bytes, max-page-size {}, timeout {}, shutdown grace {}s)",
        cfg.server.listen,
        cfg.server.port,
        if prefix.is_empty() {
            "".into()
        } else {
            format!("{prefix}/")
        },
        label,
        workers
            .map(|w| w.to_string())
            .unwrap_or_else(|| "auto".into()),
        if compress { "on" } else { "off" },
        max_body,
        max_page_size,
        if timeout_ms == 0 {
            "off".into()
        } else {
            format!("{timeout_ms} ms")
        },
        shutdown_secs,
    );

    log_routes(&prefix, backend.as_ref());

    #[cfg(feature = "docs")]
    if docs_cfg.enabled {
        log::info!("  {} (mkdocs site):", docs_cfg.path);
        log::info!("    GET    {}/", docs_cfg.path);
        log::info!("    GET    {}/{{path}}", docs_cfg.path);
    }

    #[cfg(feature = "swagger")]
    if swagger_cfg.enabled {
        log::info!("  {} (swagger UI):", swagger_cfg.path);
        log::info!("    GET    {}/", swagger_cfg.path);
        log::info!("    GET    {}/openapi.json", swagger_cfg.path);
    }

    // Resolve the Swagger UI's OIDC login endpoints once, before binding.
    // We emit an explicit `oauth2` authorizationCode flow in the spec (see
    // `swagger::ResolvedOAuth2`); discovering the authorize/token URLs here
    // keeps the operator-facing config to just an `issuer`. On failure we
    // log and serve the docs *without* a login button rather than shipping
    // an empty Authorize dialog.
    #[cfg(feature = "swagger")]
    let swagger_oauth2 = if swagger_cfg.enabled {
        match swagger_cfg.oauth2.as_ref() {
            Some(o) => match crate::swagger::resolve_oauth2(o).await {
                Ok(resolved) => Some(resolved),
                Err(e) => {
                    log::warn!(
                        "[swagger.oauth2] OIDC discovery for issuer {} failed ({e}); \
                         serving docs without the Authorize button",
                        o.issuer
                    );
                    None
                }
            },
            None => None,
        }
    } else {
        None
    };

    // Build the Prometheus middleware once, outside the worker closure, so
    // every worker shares a single registry (counts aggregate correctly).
    // Constructed whenever the feature is compiled; the runtime `enabled`
    // flag gates whether it is actually wrapped (and the endpoint served).
    #[cfg(feature = "metrics")]
    let prometheus = {
        use actix_web_prom::PrometheusMetricsBuilder;
        PrometheusMetricsBuilder::new("datapress")
            .endpoint(metrics_cfg.path.as_str())
            .build()
            .map_err(|e| std::io::Error::other(format!("metrics init failed: {e}")))?
    };
    #[cfg(feature = "metrics")]
    let metrics_enabled = metrics_cfg.enabled;

    #[cfg(feature = "metrics")]
    if metrics_cfg.enabled {
        log::info!("  {} (prometheus metrics):", metrics_cfg.path);
        log::info!("    GET    {}", metrics_cfg.path);
    }

    let build_info = web::Data::new(handlers::BuildInfo::new(
        // `&'static str` so it fits BuildInfo's compile-time fields.
        // The match keeps this generic enough for future backends.
        // Any label other than the two listed is reported as "unknown".
        match label {
            "DuckDB" => "DuckDB",
            "DataFusion" => "DataFusion",
            _ => "unknown",
        },
    ));

    // App factory: the closure is handed to `HttpServer::new`, so each
    // invocation re-clones the shared state captured above.
    let mut server = HttpServer::new(move || {
        let backend = backend.clone();
        let prefix = prefix.clone();
        // The same `max_body` cap is applied to JSON bodies and raw payloads.
        let json_cfg = web::JsonConfig::default().limit(max_body);
        let pay_cfg = web::PayloadConfig::default().limit(max_body);
        let query_limits = handlers::QueryLimits { max_page_size };
        // `.max(1)` keeps the duration non-zero even when `timeout_ms == 0`;
        // in that case the middleware is built but not applied, because the
        // `Condition` below is only enabled for `timeout_ms > 0`.
        let timeout = Timeout::new(Duration::from_millis(timeout_ms.max(1)));
        #[cfg(feature = "docs")]
        let docs_cfg = docs_cfg.clone();
        #[cfg(feature = "swagger")]
        let swagger_cfg = swagger_cfg.clone();
        #[cfg(feature = "swagger")]
        let swagger_oauth2 = swagger_oauth2.clone();
        #[cfg(feature = "auth")]
        let auth_state = auth_state.clone();
        #[cfg(feature = "metrics")]
        let prometheus = prometheus.clone();
        let app = App::new()
            .app_data(web::Data::new(backend))
            .app_data(build_info.clone())
            .app_data(web::Data::new(query_limits))
            .app_data(json_cfg)
            .app_data(pay_cfg)
            .wrap(middleware::Condition::new(timeout_ms > 0, timeout))
            .wrap(middleware::Condition::new(
                compress,
                middleware::Compress::default(),
            ))
            .wrap(middleware::Logger::new("%a \"%r\" %s %b bytes %Dms"));
        // Auth middleware is wrapped at the `App` level, so it sits in
        // front of the docs + swagger services and the prefix scope.
        // The health/version probes are meant to remain unauthenticated
        // so load balancers can keep checking liveness.
        // NOTE(review): the probes are registered further down on this
        // same `App`, not before this wrap — presumably `crate::auth::Auth`
        // exempts those paths itself; confirm against the middleware.
        // When auth is disabled the middleware is a pass-through.
        #[cfg(feature = "auth")]
        let app = match auth_state.clone() {
            Some(state) => app
                .app_data(web::Data::new(state.cfg.clone()))
                .wrap(crate::auth::Auth::new(state)),
            None => app.wrap(crate::auth::Auth::disabled()),
        };
        // Prometheus middleware sits OUTERMOST (added last → runs first) so
        // it observes every request — including those auth rejects — and so
        // the `/metrics` scrape it serves bypasses the auth layer entirely.
        // `Condition` makes it a pass-through (and suppresses the endpoint)
        // when `[metrics].enabled = false`.
        #[cfg(feature = "metrics")]
        let app = app.wrap(middleware::Condition::new(metrics_enabled, prometheus));
        // Un-prefixed liveness/readiness/version probes.
        let app = app
            .service(handlers::healthz)
            .service(handlers::readyz)
            .service(handlers::version);
        // Docs + swagger are registered BEFORE the `web::scope(prefix)`
        // catch-all below. An empty `prefix` (the default) becomes
        // `web::scope("")` which matches every path and 404s any miss
        // *inside* the scope — so services registered after it become
        // unreachable. Keeping these at the top of the dispatch chain
        // sidesteps that.
        #[cfg(feature = "docs")]
        let app = if docs_cfg.enabled {
            app.configure(|c| crate::docs::configure(&docs_cfg.path, c))
        } else {
            app
        };
        #[cfg(feature = "swagger")]
        let app = if swagger_cfg.enabled {
            app.configure(|c| {
                crate::swagger::configure(&swagger_cfg.path, swagger_oauth2.as_ref(), c)
            })
        } else {
            app
        };
        app.service(
            web::scope(prefix.as_str())
                .service(handlers::health)
                // Canonical, versioned API.
                .service(web::scope("/api/v1").configure(handlers::v1::configure))
                // Legacy un-versioned alias. Kept around so older
                // clients (and the historical `/api/datasets/...`
                // URLs in docs / scripts) keep working. New code
                // should prefer `/api/v1/...`.
                .service(web::scope("/api").configure(handlers::v1::configure)),
        )
    });
    // `None` leaves the worker count at the server's default (logged as
    // "auto" in the banner above).
    if let Some(w) = workers {
        server = server.workers(w);
    }

    // Disable actix's built-in signal handling so we can log which signal
    // triggered shutdown, then drive the same `ServerHandle::stop(true)`
    // path it would have used internally. This also means no handlers are
    // installed at all when `shutdown` is `Shutdown::External`.
    let running = server
        .bind(addr)?
        .shutdown_timeout(shutdown_secs)
        .disable_signals()
        .run();
    let handle = running.handle();
    // Detached task (its join handle is dropped): it waits for the trigger
    // and then asks `handle` to stop the server gracefully.
    tokio::spawn(shutdown_listener(handle, shutdown_secs, shutdown));

    // Drive the server itself; its result is this function's result.
    running.await
}

/// Block until the chosen shutdown trigger fires, then stop the server.
///
/// For [`Shutdown::Signals`] this waits on [`wait_for_signal`] and logs
/// which signal arrived; for [`Shutdown::External`] it awaits the host's
/// future and logs that the host requested shutdown. Either way it then
/// calls `stop(true)` on `handle` and logs completion once that call
/// returns. `grace_secs` is only used in the log messages.
async fn shutdown_listener(
    handle: actix_web::dev::ServerHandle,
    grace_secs: u64,
    shutdown: Shutdown,
) {
    match shutdown {
        Shutdown::External(host_request) => {
            // Embedded mode: the host decides when we stop.
            host_request.await;
            log::info!(
                "Shutdown requested by host, draining in-flight requests (up to {grace_secs}s)..."
            );
        }
        Shutdown::Signals => {
            // Standalone mode: we installed the handlers ourselves.
            let which = wait_for_signal().await;
            log::info!(
                "Received {which}, shutting down gracefully (up to {grace_secs}s for in-flight requests)..."
            );
        }
    }

    let graceful = true;
    handle.stop(graceful).await;
    log::info!("Shutdown complete.");
}

/// Unix: resolve with the name of the first of `SIGTERM` / `SIGINT` to be
/// delivered to the process.
///
/// # Panics
///
/// Panics if either signal handler cannot be installed.
#[cfg(unix)]
async fn wait_for_signal() -> &'static str {
    use tokio::signal::unix::{SignalKind, signal};

    // Not being able to install a handler means the runtime is
    // misconfigured; there is nothing sensible to recover to, hence
    // `expect` rather than error propagation.
    let mut term_stream = signal(SignalKind::terminate()).expect("install SIGTERM handler");
    let mut int_stream = signal(SignalKind::interrupt()).expect("install SIGINT handler");

    // Whichever stream yields first names the signal we report.
    let received = tokio::select! {
        _ = term_stream.recv() => "SIGTERM",
        _ = int_stream.recv() => "SIGINT",
    };
    received
}

/// Non-Unix (Windows / other): resolve with `"Ctrl+C"` once the process
/// receives a Ctrl+C notification — the only signal tokio exposes
/// portably.
///
/// If the Ctrl+C listener cannot be registered, the error is logged and
/// this future never resolves. Previously the error was discarded and the
/// function returned `"Ctrl+C"` right away, which made the caller shut the
/// server down immediately at startup while logging a signal that never
/// happened.
#[cfg(not(unix))]
async fn wait_for_signal() -> &'static str {
    match tokio::signal::ctrl_c().await {
        Ok(()) => "Ctrl+C",
        Err(e) => {
            log::error!(
                "failed to install Ctrl+C handler ({e}); graceful shutdown on Ctrl+C is unavailable"
            );
            // Never report a signal that was not delivered: stay pending
            // so the server keeps running instead of stopping spuriously.
            std::future::pending::<&'static str>().await
        }
    }
}

/// Log the route table once at startup, via `log::info!`.
///
/// Output is organised as:
///   - a `general` section with the fixed health / probe / version
///     endpoints;
///   - one section per mounted API version (canonical `/api/v1` plus the
///     legacy un-versioned `/api` alias). Each lists the routes that are
///     not scoped to a dataset first, then every `/datasets/{name}...`
///     route expanded once per dataset name reported by `backend`.
fn log_routes(prefix: &str, backend: &dyn Backend) {
    // Method column width: wide enough for the longest method printed.
    // Dataset names are substituted straight into the logged paths.
    const METHOD_W: usize = 6;

    let p = prefix; // already validated to start with '/' or be empty

    log::info!("Routes:");
    log::info!("  general:");
    let general_routes = [
        ("GET", "/healthz".to_string()),
        ("GET", "/readyz".to_string()),
        ("GET", "/version".to_string()),
        ("GET", format!("{p}/health")),
    ];
    for (method, path) in general_routes {
        log::info!("    {:<width$} {}", method, path, width = METHOD_W);
    }

    // Every API version lives under its own scope; v1 is additionally
    // reachable under the bare `/api` mount for backwards compatibility.
    let mounts: &[(&str, &[(&str, &str)])] = &[
        ("/api/v1", handlers::v1::ROUTES),
        ("/api", handlers::v1::ROUTES), // legacy alias
    ];

    let names = backend.names();
    for (mount, routes) in mounts {
        log::info!("  {p}{mount}:");

        // Routes of this version that are not tied to a dataset name.
        let top_level = routes
            .iter()
            .filter(|(_, suffix)| !suffix.contains("{name}"));
        for (method, suffix) in top_level {
            log::info!(
                "    {:<width$} {p}{mount}{suffix}",
                method,
                width = METHOD_W,
            );
        }

        if names.is_empty() {
            log::info!("    (no datasets registered)");
        } else {
            for name in &names {
                // Keep only dataset-scoped routes, carrying along whatever
                // follows the `/datasets/{name}` placeholder.
                let dataset_routes = routes.iter().filter_map(|(method, suffix)| {
                    suffix
                        .strip_prefix("/datasets/{name}")
                        .map(|rest| (method, rest))
                });
                for (method, rest) in dataset_routes {
                    log::info!(
                        "    {:<width$} {p}{mount}/datasets/{name}{rest}",
                        method,
                        width = METHOD_W,
                    );
                }
            }
        }
    }
}