Skip to main content

datapress_core/
server.rs

//! Shared actix-web bootstrap. Both backends call [`serve`] from their
//! own thin `serve(cfg)` entry point.
3
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use actix_web::{App, HttpServer, middleware, web};
10
11use crate::backend::Backend;
12use crate::config::AppConfig;
13use crate::handlers;
14use crate::timeout::Timeout;
15
/// Boxed, pinned future a host resolves to request a graceful stop.
type ShutdownFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Selects what triggers the graceful shutdown of a running server.
enum Shutdown {
    /// The server installs its own `SIGINT`/`SIGTERM` (or `Ctrl+C`)
    /// handlers and stops when one of them fires. This is the mode for
    /// the standalone binaries, which own the process and its signal
    /// disposition.
    Signals,
    /// The server stops once the wrapped future resolves. This is the
    /// mode for embedded use (e.g. the Python extension): the *host*
    /// keeps ownership of signal handling and triggers shutdown by
    /// completing the future. No OS signal handlers are installed in this
    /// mode, so the host's handlers are never contended.
    External(ShutdownFuture),
}
28
29/// Bind the HTTP server, register the generic handler set against
30/// `backend`, and run until the process receives `SIGINT` or `SIGTERM`.
31///
32/// Shutdown is **graceful**: on signal the listening socket is closed,
33/// existing connections get up to `cfg.server.shutdown_timeout_secs`
34/// seconds to drain in-flight requests, then workers are stopped.
35///
36/// `label` is the human-readable backend name used in the startup log
37/// line (e.g. `"DuckDB"`, `"DataFusion"`).
38pub async fn serve(cfg: AppConfig, backend: Arc<dyn Backend>, label: &str) -> std::io::Result<()> {
39    run_server(cfg, backend, label, Shutdown::Signals).await
40}
41
42/// Like [`serve`], but driven to a graceful stop by `shutdown` instead of
43/// OS signals.
44///
45/// Intended for embedding DataPress inside another runtime (the Python
46/// extension's `DataPress.run()`), where installing process-global signal
47/// handlers would race the host's own. The caller resolves `shutdown` —
48/// for example when its asyncio task is cancelled by `Ctrl+C` — and the
49/// server then drains in-flight requests within
50/// `cfg.server.shutdown_timeout_secs` and returns.
51pub async fn serve_with_shutdown(
52    cfg: AppConfig,
53    backend: Arc<dyn Backend>,
54    label: &str,
55    shutdown: impl Future<Output = ()> + Send + 'static,
56) -> std::io::Result<()> {
57    run_server(cfg, backend, label, Shutdown::External(Box::pin(shutdown))).await
58}
59
60async fn run_server(
61    cfg: AppConfig,
62    backend: Arc<dyn Backend>,
63    label: &str,
64    shutdown: Shutdown,
65) -> std::io::Result<()> {
66    let addr = (cfg.server.listen, cfg.server.port);
67    let workers = cfg.server.workers;
68    let prefix = cfg.server.prefix.clone();
69    let compress = cfg.server.compress;
70    let max_body = cfg.server.max_body_bytes;
71    let max_page_size = cfg.server.max_page_size;
72    let timeout_ms = cfg.server.request_timeout_ms;
73    let shutdown_secs = cfg.server.shutdown_timeout_secs;
74    let docs_cfg = cfg.docs.clone();
75    let swagger_cfg = cfg.swagger.clone();
76    let metrics_cfg = cfg.metrics.clone();
77
78    // Warn (but don't fail) when the operator asked for docs in TOML but
79    // this binary was built without the cargo feature that embeds them.
80    #[cfg(not(feature = "docs"))]
81    if docs_cfg.enabled {
82        log::warn!(
83            "[docs] enabled = true in config, but this binary was built \
84             without --features docs; skipping docs site"
85        );
86    }
87    #[cfg(not(feature = "swagger"))]
88    if swagger_cfg.enabled {
89        log::warn!(
90            "[swagger] enabled = true in config, but this binary was built \
91             without --features swagger; skipping Swagger UI"
92        );
93    }
94    #[cfg(not(feature = "auth"))]
95    if cfg.auth.enabled {
96        log::warn!(
97            "[auth] enabled = true in config, but this binary was built \
98             without --features auth; skipping OIDC enforcement"
99        );
100    }
101    #[cfg(not(feature = "metrics"))]
102    if metrics_cfg.enabled {
103        log::warn!(
104            "[metrics] enabled = true in config, but this binary was built \
105             without --features metrics; skipping Prometheus endpoint"
106        );
107    }
108
109    // Boot the JWKS cache (and validate config) before binding the
110    // listener. With `start_degraded = true` this only warns on an
111    // unreachable IdP; with `false` it propagates the error and the
112    // process exits non-zero.
113    #[cfg(feature = "auth")]
114    let auth_state = if cfg.auth.enabled {
115        let jwks = crate::auth::JwksCache::boot(&cfg.auth)
116            .await
117            .map_err(|e| std::io::Error::other(format!("auth bootstrap failed: {e}")))?;
118        log::info!(
119            "[auth] OIDC enforcement enabled (issuer = {}, audience = {}, read_scopes = {:?}, reload_scopes = {:?})",
120            cfg.auth.issuer,
121            if cfg.auth.audience.is_empty() {
122                "<none>"
123            } else {
124                cfg.auth.audience.as_str()
125            },
126            cfg.auth.read_scopes,
127            cfg.auth.reload_scopes,
128        );
129        Some(crate::auth::AuthState {
130            cfg: Arc::new(cfg.auth.clone()),
131            jwks,
132        })
133    } else {
134        None
135    };
136
137    log::info!(
138        "Listening on http://{}:{}{} ({} backend, {} workers, compression {}, max-body {} bytes, max-page-size {}, timeout {}, shutdown grace {}s)",
139        cfg.server.listen,
140        cfg.server.port,
141        if prefix.is_empty() {
142            "".into()
143        } else {
144            format!("{prefix}/")
145        },
146        label,
147        workers
148            .map(|w| w.to_string())
149            .unwrap_or_else(|| "auto".into()),
150        if compress { "on" } else { "off" },
151        max_body,
152        max_page_size,
153        if timeout_ms == 0 {
154            "off".into()
155        } else {
156            format!("{timeout_ms} ms")
157        },
158        shutdown_secs,
159    );
160
161    log_routes(&prefix, backend.as_ref());
162
163    #[cfg(feature = "docs")]
164    if docs_cfg.enabled {
165        log::info!("  {} (mkdocs site):", docs_cfg.path);
166        log::info!("    GET    {}/", docs_cfg.path);
167        log::info!("    GET    {}/{{path}}", docs_cfg.path);
168    }
169
170    #[cfg(feature = "swagger")]
171    if swagger_cfg.enabled {
172        log::info!("  {} (swagger UI):", swagger_cfg.path);
173        log::info!("    GET    {}/", swagger_cfg.path);
174        log::info!("    GET    {}/openapi.json", swagger_cfg.path);
175    }
176
177    // Resolve the Swagger UI's OIDC login endpoints once, before binding.
178    // We emit an explicit `oauth2` authorizationCode flow in the spec (see
179    // `swagger::ResolvedOAuth2`); discovering the authorize/token URLs here
180    // keeps the operator-facing config to just an `issuer`. On failure we
181    // log and serve the docs *without* a login button rather than shipping
182    // an empty Authorize dialog.
183    #[cfg(feature = "swagger")]
184    let swagger_oauth2 = if swagger_cfg.enabled {
185        match swagger_cfg.oauth2.as_ref() {
186            Some(o) => match crate::swagger::resolve_oauth2(o).await {
187                Ok(resolved) => Some(resolved),
188                Err(e) => {
189                    log::warn!(
190                        "[swagger.oauth2] OIDC discovery for issuer {} failed ({e}); \
191                         serving docs without the Authorize button",
192                        o.issuer
193                    );
194                    None
195                }
196            },
197            None => None,
198        }
199    } else {
200        None
201    };
202
203    // Build the Prometheus middleware once, outside the worker closure, so
204    // every worker shares a single registry (counts aggregate correctly).
205    // Constructed whenever the feature is compiled; the runtime `enabled`
206    // flag gates whether it is actually wrapped (and the endpoint served).
207    #[cfg(feature = "metrics")]
208    let prometheus = {
209        use actix_web_prom::PrometheusMetricsBuilder;
210        PrometheusMetricsBuilder::new("datapress")
211            .endpoint(metrics_cfg.path.as_str())
212            .build()
213            .map_err(|e| std::io::Error::other(format!("metrics init failed: {e}")))?
214    };
215    #[cfg(feature = "metrics")]
216    let metrics_enabled = metrics_cfg.enabled;
217
218    #[cfg(feature = "metrics")]
219    if metrics_cfg.enabled {
220        log::info!("  {} (prometheus metrics):", metrics_cfg.path);
221        log::info!("    GET    {}", metrics_cfg.path);
222    }
223
224    let build_info = web::Data::new(handlers::BuildInfo::new(
225        // `&'static str` so it fits BuildInfo's compile-time fields.
226        // The match keeps this generic enough for future backends.
227        match label {
228            "DuckDB" => "DuckDB",
229            "DataFusion" => "DataFusion",
230            _ => "unknown",
231        },
232    ));
233
234    let mut server = HttpServer::new(move || {
235        let backend = backend.clone();
236        let prefix = prefix.clone();
237        let json_cfg = web::JsonConfig::default().limit(max_body);
238        let pay_cfg = web::PayloadConfig::default().limit(max_body);
239        let query_limits = handlers::QueryLimits { max_page_size };
240        let timeout = Timeout::new(Duration::from_millis(timeout_ms.max(1)));
241        #[cfg(feature = "docs")]
242        let docs_cfg = docs_cfg.clone();
243        #[cfg(feature = "swagger")]
244        let swagger_cfg = swagger_cfg.clone();
245        #[cfg(feature = "swagger")]
246        let swagger_oauth2 = swagger_oauth2.clone();
247        #[cfg(feature = "auth")]
248        let auth_state = auth_state.clone();
249        #[cfg(feature = "metrics")]
250        let prometheus = prometheus.clone();
251        let app = App::new()
252            .app_data(web::Data::new(backend))
253            .app_data(build_info.clone())
254            .app_data(web::Data::new(query_limits))
255            .app_data(json_cfg)
256            .app_data(pay_cfg)
257            .wrap(middleware::Condition::new(timeout_ms > 0, timeout))
258            .wrap(middleware::Condition::new(
259                compress,
260                middleware::Compress::default(),
261            ))
262            .wrap(middleware::Logger::new("%a \"%r\" %s %b bytes %Dms"));
263        // Auth middleware wraps everything below — including the docs +
264        // swagger services and the prefix scope. Health/version probes
265        // are registered above and remain unauthenticated by design so
266        // load balancers can keep checking liveness. When auth is
267        // disabled the middleware is a pass-through.
268        #[cfg(feature = "auth")]
269        let app = match auth_state.clone() {
270            Some(state) => app
271                .app_data(web::Data::new(state.cfg.clone()))
272                .wrap(crate::auth::Auth::new(state)),
273            None => app.wrap(crate::auth::Auth::disabled()),
274        };
275        // Prometheus middleware sits OUTERMOST (added last → runs first) so
276        // it observes every request — including those auth rejects — and so
277        // the `/metrics` scrape it serves bypasses the auth layer entirely.
278        // `Condition` makes it a pass-through (and suppresses the endpoint)
279        // when `[metrics].enabled = false`.
280        #[cfg(feature = "metrics")]
281        let app = app.wrap(middleware::Condition::new(metrics_enabled, prometheus));
282        let app = app
283            .service(handlers::healthz)
284            .service(handlers::readyz)
285            .service(handlers::version);
286        // Docs + swagger are registered BEFORE the `web::scope(prefix)`
287        // catch-all below. An empty `prefix` (the default) becomes
288        // `web::scope("")` which matches every path and 404s any miss
289        // *inside* the scope — so services registered after it become
290        // unreachable. Keeping these at the top of the dispatch chain
291        // sidesteps that.
292        #[cfg(feature = "docs")]
293        let app = if docs_cfg.enabled {
294            app.configure(|c| crate::docs::configure(&docs_cfg.path, c))
295        } else {
296            app
297        };
298        #[cfg(feature = "swagger")]
299        let app = if swagger_cfg.enabled {
300            app.configure(|c| {
301                crate::swagger::configure(&swagger_cfg.path, swagger_oauth2.as_ref(), c)
302            })
303        } else {
304            app
305        };
306        app.service(
307            web::scope(prefix.as_str())
308                .service(handlers::health)
309                // Canonical, versioned API.
310                .service(web::scope("/api/v1").configure(handlers::v1::configure))
311                // Legacy un-versioned alias. Kept around so older
312                // clients (and the historical `/api/datasets/...`
313                // URLs in docs / scripts) keep working. New code
314                // should prefer `/api/v1/...`.
315                .service(web::scope("/api").configure(handlers::v1::configure)),
316        )
317    });
318    if let Some(w) = workers {
319        server = server.workers(w);
320    }
321
322    // Disable actix's built-in signal handling so we can log which signal
323    // triggered shutdown, then drive the same `ServerHandle::stop(true)`
324    // path it would have used internally.
325    let running = server
326        .bind(addr)?
327        .shutdown_timeout(shutdown_secs)
328        .disable_signals()
329        .run();
330    let handle = running.handle();
331    tokio::spawn(shutdown_listener(handle, shutdown_secs, shutdown));
332
333    running.await
334}
335
336/// Wait for the configured shutdown trigger (OS signal or an external
337/// future), log it, then ask the actix server handle to stop gracefully.
338async fn shutdown_listener(
339    handle: actix_web::dev::ServerHandle,
340    grace_secs: u64,
341    shutdown: Shutdown,
342) {
343    match shutdown {
344        Shutdown::Signals => {
345            let which = wait_for_signal().await;
346            log::info!(
347                "Received {which}, shutting down gracefully (up to {grace_secs}s for in-flight requests)..."
348            );
349        }
350        Shutdown::External(fut) => {
351            fut.await;
352            log::info!(
353                "Shutdown requested by host, draining in-flight requests (up to {grace_secs}s)..."
354            );
355        }
356    }
357    handle.stop(true).await;
358    log::info!("Shutdown complete.");
359}
360
361#[cfg(unix)]
362async fn wait_for_signal() -> &'static str {
363    use tokio::signal::unix::{SignalKind, signal};
364    // `expect` is OK here: failing to install a signal handler at startup
365    // is a misconfigured runtime, not a recoverable condition.
366    let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
367    let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
368    tokio::select! {
369        _ = sigterm.recv() => "SIGTERM",
370        _ = sigint.recv()  => "SIGINT",
371    }
372}
373
/// Non-Unix (Windows / other): resolve with `"Ctrl+C"` once a Ctrl+C
/// notification is delivered — the only signal portably available
/// through tokio.
///
/// If the Ctrl+C handler cannot be installed, the error is logged and
/// this future never resolves. Returning in that case would make the
/// caller believe a signal had arrived and shut the server down right
/// after startup.
#[cfg(not(unix))]
async fn wait_for_signal() -> &'static str {
    if let Err(e) = tokio::signal::ctrl_c().await {
        log::error!(
            "failed to install Ctrl+C handler ({e}); signal-driven shutdown is unavailable"
        );
        // Park forever: no signal was actually received, so we must not
        // report one.
        std::future::pending::<()>().await;
    }
    "Ctrl+C"
}
380
381/// Pretty-print the route table at startup. Two sections:
382///   - general routes (health, probes)
383///   - per-dataset routes for every mounted API version (canonical
384///     `/api/v1/...` + the legacy un-versioned `/api/...` alias).
385fn log_routes(prefix: &str, backend: &dyn Backend) {
386    // Column widths chosen to fit the longest method + a comfortable
387    // path column. Names are inlined into the per-dataset paths.
388    const METHOD_W: usize = 6;
389
390    let p = prefix; // already validated to start with '/' or be empty
391
392    log::info!("Routes:");
393    log::info!("  general:");
394    for (method, path) in [
395        ("GET", "/healthz".to_string()),
396        ("GET", "/readyz".to_string()),
397        ("GET", "/version".to_string()),
398        ("GET", format!("{p}/health")),
399    ] {
400        log::info!("    {:<width$} {}", method, path, width = METHOD_W);
401    }
402
403    // Each API version is mounted under its own scope; we currently
404    // also expose v1 under the un-versioned `/api` for back-compat.
405    let mounts: &[(&str, &[(&str, &str)])] = &[
406        ("/api/v1", handlers::v1::ROUTES),
407        ("/api", handlers::v1::ROUTES), // legacy alias
408    ];
409
410    let names = backend.names();
411    for (mount, routes) in mounts {
412        log::info!("  {p}{mount}:");
413        // Top-level (non-dataset-scoped) routes for this version.
414        for (method, suffix) in *routes {
415            if !suffix.contains("{name}") {
416                log::info!(
417                    "    {:<width$} {p}{mount}{suffix}",
418                    method,
419                    width = METHOD_W,
420                );
421            }
422        }
423        if names.is_empty() {
424            log::info!("    (no datasets registered)");
425            continue;
426        }
427        for name in &names {
428            for (method, suffix) in *routes {
429                if let Some(rest) = suffix.strip_prefix("/datasets/{name}") {
430                    log::info!(
431                        "    {:<width$} {p}{mount}/datasets/{name}{rest}",
432                        method,
433                        width = METHOD_W,
434                    );
435                }
436            }
437        }
438    }
439}