Skip to main content

datapress_core/
server.rs

//! Shared actix-web bootstrap. Both backends call [`serve`] from their
//! own thin `serve(cfg)` entry point; embedders that own signal handling
//! call [`serve_with_shutdown`] instead.
3
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use actix_web::{App, HttpServer, middleware, web};
10
11use crate::backend::Backend;
12use crate::config::AppConfig;
13use crate::handlers;
14use crate::timeout::Timeout;
15
/// The trigger that tells a running server to start its graceful shutdown.
enum Shutdown {
    /// Wait for an OS signal: `SIGINT`/`SIGTERM` on Unix, `Ctrl+C`
    /// elsewhere. Chosen by the standalone binaries, which own the process
    /// and therefore its signal disposition.
    Signals,
    /// Wait for the wrapped future to resolve. Chosen when DataPress is
    /// embedded (e.g. the Python extension): the *host* owns signal
    /// handling and requests shutdown by completing this future. No OS
    /// signal handlers are installed in this mode, so we never compete
    /// with the host's handlers.
    External(Pin<Box<dyn Future<Output = ()> + Send>>),
}
28
29/// Bind the HTTP server, register the generic handler set against
30/// `backend`, and run until the process receives `SIGINT` or `SIGTERM`.
31///
32/// Shutdown is **graceful**: on signal the listening socket is closed,
33/// existing connections get up to `cfg.server.shutdown_timeout_secs`
34/// seconds to drain in-flight requests, then workers are stopped.
35///
36/// `label` is the human-readable backend name used in the startup log
37/// line (e.g. `"DuckDB"`, `"DataFusion"`).
38pub async fn serve(cfg: AppConfig, backend: Arc<dyn Backend>, label: &str) -> std::io::Result<()> {
39    run_server(cfg, backend, label, Shutdown::Signals).await
40}
41
42/// Like [`serve`], but driven to a graceful stop by `shutdown` instead of
43/// OS signals.
44///
45/// Intended for embedding DataPress inside another runtime (the Python
46/// extension's `DataPress.run()`), where installing process-global signal
47/// handlers would race the host's own. The caller resolves `shutdown` —
48/// for example when its asyncio task is cancelled by `Ctrl+C` — and the
49/// server then drains in-flight requests within
50/// `cfg.server.shutdown_timeout_secs` and returns.
51pub async fn serve_with_shutdown(
52    cfg: AppConfig,
53    backend: Arc<dyn Backend>,
54    label: &str,
55    shutdown: impl Future<Output = ()> + Send + 'static,
56) -> std::io::Result<()> {
57    run_server(cfg, backend, label, Shutdown::External(Box::pin(shutdown))).await
58}
59
60async fn run_server(
61    cfg: AppConfig,
62    backend: Arc<dyn Backend>,
63    label: &str,
64    shutdown: Shutdown,
65) -> std::io::Result<()> {
66    let addr = (cfg.server.listen, cfg.server.port);
67    let workers = cfg.server.workers;
68    let prefix = cfg.server.prefix.clone();
69    let compress = cfg.server.compress;
70    let max_body = cfg.server.max_body_bytes;
71    let max_page_size = cfg.server.max_page_size;
72    let timeout_ms = cfg.server.request_timeout_ms;
73    let shutdown_secs = cfg.server.shutdown_timeout_secs;
74    let sql_settings = handlers::SqlSettings {
75        enabled: cfg.sql.enabled,
76        max_rows: cfg.sql.max_rows.max(1),
77    };
78    let docs_cfg = cfg.docs.clone();
79    let swagger_cfg = cfg.swagger.clone();
80    let metrics_cfg = cfg.metrics.clone();
81    let explorer_cfg = cfg.explorer.clone();
82
83    // Warn (but don't fail) when the operator asked for docs in TOML but
84    // this binary was built without the cargo feature that embeds them.
85    #[cfg(not(feature = "docs"))]
86    if docs_cfg.enabled {
87        log::warn!(
88            "[docs] enabled = true in config, but this binary was built \
89             without --features docs; skipping docs site"
90        );
91    }
92    #[cfg(not(feature = "swagger"))]
93    if swagger_cfg.enabled {
94        log::warn!(
95            "[swagger] enabled = true in config, but this binary was built \
96             without --features swagger; skipping Swagger UI"
97        );
98    }
99    #[cfg(not(feature = "auth"))]
100    if cfg.auth.enabled {
101        log::warn!(
102            "[auth] enabled = true in config, but this binary was built \
103             without --features auth; skipping OIDC enforcement"
104        );
105    }
106    #[cfg(not(feature = "metrics"))]
107    if metrics_cfg.enabled {
108        log::warn!(
109            "[metrics] enabled = true in config, but this binary was built \
110             without --features metrics; skipping Prometheus endpoint"
111        );
112    }
113    #[cfg(not(feature = "explorer"))]
114    if explorer_cfg.enabled {
115        log::warn!(
116            "[explorer] enabled = true in config, but this binary was built \
117             without --features explorer; skipping explorer UI"
118        );
119    }
120
121    // Boot the JWKS cache (and validate config) before binding the
122    // listener. With `start_degraded = true` this only warns on an
123    // unreachable IdP; with `false` it propagates the error and the
124    // process exits non-zero.
125    #[cfg(feature = "auth")]
126    let auth_state = if cfg.auth.enabled {
127        let jwks = crate::auth::JwksCache::boot(&cfg.auth)
128            .await
129            .map_err(|e| std::io::Error::other(format!("auth bootstrap failed: {e}")))?;
130        log::info!(
131            "[auth] OIDC enforcement enabled (issuer = {}, audience = {}, read_scopes = {:?}, reload_scopes = {:?})",
132            cfg.auth.issuer,
133            if cfg.auth.audience.is_empty() {
134                "<none>"
135            } else {
136                cfg.auth.audience.as_str()
137            },
138            cfg.auth.read_scopes,
139            cfg.auth.reload_scopes,
140        );
141        Some(crate::auth::AuthState {
142            cfg: Arc::new(cfg.auth.clone()),
143            jwks,
144        })
145    } else {
146        None
147    };
148
149    log::info!(
150        "Listening on http://{}:{}{} ({} backend, {} workers, compression {}, max-body {} bytes, max-page-size {}, timeout {}, shutdown grace {}s)",
151        cfg.server.listen,
152        cfg.server.port,
153        if prefix.is_empty() {
154            "".into()
155        } else {
156            format!("{prefix}/")
157        },
158        label,
159        workers
160            .map(|w| w.to_string())
161            .unwrap_or_else(|| "auto".into()),
162        if compress { "on" } else { "off" },
163        max_body,
164        max_page_size,
165        if timeout_ms == 0 {
166            "off".into()
167        } else {
168            format!("{timeout_ms} ms")
169        },
170        shutdown_secs,
171    );
172
173    log_routes(&prefix, backend.as_ref());
174
175    #[cfg(feature = "docs")]
176    if docs_cfg.enabled {
177        log::info!("  {} (mkdocs site):", docs_cfg.path);
178        log::info!("    GET    {}/", docs_cfg.path);
179        log::info!("    GET    {}/{{path}}", docs_cfg.path);
180    }
181
182    #[cfg(feature = "swagger")]
183    if swagger_cfg.enabled {
184        log::info!("  {} (swagger UI):", swagger_cfg.path);
185        log::info!("    GET    {}/", swagger_cfg.path);
186        log::info!("    GET    {}/openapi.json", swagger_cfg.path);
187    }
188
189    #[cfg(feature = "explorer")]
190    if explorer_cfg.enabled {
191        log::info!("  {} (explorer UI):", explorer_cfg.path);
192        log::info!("    GET    {}/", explorer_cfg.path);
193        log::info!("    GET    {}/datasets/{{name}}", explorer_cfg.path);
194    }
195
196    // Resolve the Swagger UI's OIDC login endpoints once, before binding.
197    // We emit an explicit `oauth2` authorizationCode flow in the spec (see
198    // `swagger::ResolvedOAuth2`); discovering the authorize/token URLs here
199    // keeps the operator-facing config to just an `issuer`. On failure we
200    // log and serve the docs *without* a login button rather than shipping
201    // an empty Authorize dialog.
202    #[cfg(feature = "swagger")]
203    let swagger_oauth2 = if swagger_cfg.enabled {
204        match swagger_cfg.oauth2.as_ref() {
205            Some(o) => match crate::swagger::resolve_oauth2(o).await {
206                Ok(resolved) => Some(resolved),
207                Err(e) => {
208                    log::warn!(
209                        "[swagger.oauth2] OIDC discovery for issuer {} failed ({e}); \
210                         serving docs without the Authorize button",
211                        o.issuer
212                    );
213                    None
214                }
215            },
216            None => None,
217        }
218    } else {
219        None
220    };
221
222    // Build the Prometheus middleware once, outside the worker closure, so
223    // every worker shares a single registry (counts aggregate correctly).
224    // Constructed whenever the feature is compiled; the runtime `enabled`
225    // flag gates whether it is actually wrapped (and the endpoint served).
226    #[cfg(feature = "metrics")]
227    let prometheus = {
228        use actix_web_prom::PrometheusMetricsBuilder;
229        PrometheusMetricsBuilder::new("datapress")
230            .endpoint(metrics_cfg.path.as_str())
231            .build()
232            .map_err(|e| std::io::Error::other(format!("metrics init failed: {e}")))?
233    };
234    #[cfg(feature = "metrics")]
235    let metrics_enabled = metrics_cfg.enabled;
236
237    #[cfg(feature = "metrics")]
238    if metrics_cfg.enabled {
239        log::info!("  {} (prometheus metrics):", metrics_cfg.path);
240        log::info!("    GET    {}", metrics_cfg.path);
241    }
242
243    let build_info = web::Data::new(handlers::BuildInfo::new(
244        // `&'static str` so it fits BuildInfo's compile-time fields.
245        // The match keeps this generic enough for future backends.
246        match label {
247            "DuckDB" => "DuckDB",
248            "DataFusion" => "DataFusion",
249            _ => "unknown",
250        },
251    ));
252
253    // One Parquet export cache shared across all workers (it wraps an Arc),
254    // so a dataset is encoded at most once and every worker serves the same
255    // bytes for the ranged requests a Parquet reader makes.
256    let parquet_cache = web::Data::new(handlers::ParquetCache::default());
257
258    // One shared explorer state across all workers (it wraps an Arc backend).
259    // Built once here; each worker clones the `web::Data` handle.
260    #[cfg(feature = "explorer")]
261    let explorer_state = if explorer_cfg.enabled {
262        // "Docs" link target: the locally-mounted MkDocs site when it is
263        // both compiled in and enabled, otherwise the public docs site.
264        let docs_url = {
265            #[cfg(feature = "docs")]
266            {
267                if docs_cfg.enabled {
268                    docs_cfg.path.clone()
269                } else {
270                    "https://docs.datap-rs.org".to_string()
271                }
272            }
273            #[cfg(not(feature = "docs"))]
274            {
275                "https://docs.datap-rs.org".to_string()
276            }
277        };
278        // "API" link target: the locally-mounted Swagger UI when it is both
279        // compiled in and enabled; `None` hides the link.
280        let swagger_url = {
281            #[cfg(feature = "swagger")]
282            {
283                if swagger_cfg.enabled {
284                    Some(format!("{}/", swagger_cfg.path))
285                } else {
286                    None
287                }
288            }
289            #[cfg(not(feature = "swagger"))]
290            {
291                None::<String>
292            }
293        };
294        Some(web::Data::new(crate::explorer::ExplorerState {
295            backend: backend.clone(),
296            datasets: cfg.datasets.clone(),
297            explorer_base: explorer_cfg.path.clone(),
298            api_base: format!("{prefix}/api/v1"),
299            backend_label: label.to_string(),
300            sql_enabled: cfg.sql.enabled,
301            docs_url,
302            swagger_url,
303        }))
304    } else {
305        None
306    };
307
308    let mut server = HttpServer::new(move || {
309        let backend = backend.clone();
310        let prefix = prefix.clone();
311        let json_cfg = web::JsonConfig::default().limit(max_body);
312        let pay_cfg = web::PayloadConfig::default().limit(max_body);
313        let query_limits = handlers::QueryLimits { max_page_size };
314        let timeout = Timeout::new(Duration::from_millis(timeout_ms.max(1)));
315        #[cfg(feature = "docs")]
316        let docs_cfg = docs_cfg.clone();
317        #[cfg(feature = "explorer")]
318        let explorer_state = explorer_state.clone();
319        #[cfg(feature = "swagger")]
320        let swagger_cfg = swagger_cfg.clone();
321        #[cfg(feature = "swagger")]
322        let swagger_oauth2 = swagger_oauth2.clone();
323        #[cfg(feature = "auth")]
324        let auth_state = auth_state.clone();
325        #[cfg(feature = "metrics")]
326        let prometheus = prometheus.clone();
327        let app = App::new()
328            .app_data(web::Data::new(backend))
329            .app_data(build_info.clone())
330            .app_data(web::Data::new(query_limits))
331            .app_data(web::Data::new(sql_settings))
332            .app_data(parquet_cache.clone())
333            .app_data(json_cfg)
334            .app_data(pay_cfg)
335            .wrap(middleware::Condition::new(timeout_ms > 0, timeout))
336            .wrap(middleware::Condition::new(
337                compress,
338                middleware::Compress::default(),
339            ))
340            .wrap(middleware::Logger::new("%a \"%r\" %s %b bytes %Dms"));
341        // Auth middleware wraps everything below — including the docs +
342        // swagger services and the prefix scope. Health/version probes
343        // are registered above and remain unauthenticated by design so
344        // load balancers can keep checking liveness. When auth is
345        // disabled the middleware is a pass-through.
346        #[cfg(feature = "auth")]
347        let app = match auth_state.clone() {
348            Some(state) => app
349                .app_data(web::Data::new(state.cfg.clone()))
350                .wrap(crate::auth::Auth::new(state)),
351            None => app.wrap(crate::auth::Auth::disabled()),
352        };
353        // Prometheus middleware sits OUTERMOST (added last → runs first) so
354        // it observes every request — including those auth rejects — and so
355        // the `/metrics` scrape it serves bypasses the auth layer entirely.
356        // `Condition` makes it a pass-through (and suppresses the endpoint)
357        // when `[metrics].enabled = false`.
358        #[cfg(feature = "metrics")]
359        let app = app.wrap(middleware::Condition::new(metrics_enabled, prometheus));
360        let app = app
361            .service(handlers::healthz)
362            .service(handlers::readyz)
363            .service(handlers::version);
364        // Docs + swagger are registered BEFORE the `web::scope(prefix)`
365        // catch-all below. An empty `prefix` (the default) becomes
366        // `web::scope("")` which matches every path and 404s any miss
367        // *inside* the scope — so services registered after it become
368        // unreachable. Keeping these at the top of the dispatch chain
369        // sidesteps that.
370        #[cfg(feature = "docs")]
371        let app = if docs_cfg.enabled {
372            app.configure(|c| crate::docs::configure(&docs_cfg.path, c))
373        } else {
374            app
375        };
376        #[cfg(feature = "swagger")]
377        let app = if swagger_cfg.enabled {
378            app.configure(|c| {
379                crate::swagger::configure(&swagger_cfg.path, swagger_oauth2.as_ref(), c)
380            })
381        } else {
382            app
383        };
384        // Explorer UI — registered (like docs/swagger) BEFORE the
385        // `web::scope(prefix)` catch-all so an empty prefix can't shadow it.
386        #[cfg(feature = "explorer")]
387        let app = match explorer_state {
388            Some(state) => app.configure(|c| crate::explorer::configure(state, c)),
389            None => app,
390        };
391        app.service(
392            web::scope(prefix.as_str())
393                .service(handlers::health)
394                // Canonical, versioned API.
395                .service(web::scope("/api/v1").configure(handlers::v1::configure))
396                // Legacy un-versioned alias. Kept around so older
397                // clients (and the historical `/api/datasets/...`
398                // URLs in docs / scripts) keep working. New code
399                // should prefer `/api/v1/...`.
400                .service(web::scope("/api").configure(handlers::v1::configure)),
401        )
402    });
403    if let Some(w) = workers {
404        server = server.workers(w);
405    }
406
407    // Disable actix's built-in signal handling so we can log which signal
408    // triggered shutdown, then drive the same `ServerHandle::stop(true)`
409    // path it would have used internally.
410    let running = server
411        .bind(addr)?
412        .shutdown_timeout(shutdown_secs)
413        .disable_signals()
414        .run();
415    let handle = running.handle();
416    tokio::spawn(shutdown_listener(handle, shutdown_secs, shutdown));
417
418    running.await
419}
420
421/// Wait for the configured shutdown trigger (OS signal or an external
422/// future), log it, then ask the actix server handle to stop gracefully.
423async fn shutdown_listener(
424    handle: actix_web::dev::ServerHandle,
425    grace_secs: u64,
426    shutdown: Shutdown,
427) {
428    match shutdown {
429        Shutdown::Signals => {
430            let which = wait_for_signal().await;
431            log::info!(
432                "Received {which}, shutting down gracefully (up to {grace_secs}s for in-flight requests)..."
433            );
434        }
435        Shutdown::External(fut) => {
436            fut.await;
437            log::info!(
438                "Shutdown requested by host, draining in-flight requests (up to {grace_secs}s)..."
439            );
440        }
441    }
442    handle.stop(true).await;
443    log::info!("Shutdown complete.");
444}
445
446#[cfg(unix)]
447async fn wait_for_signal() -> &'static str {
448    use tokio::signal::unix::{SignalKind, signal};
449    // `expect` is OK here: failing to install a signal handler at startup
450    // is a misconfigured runtime, not a recoverable condition.
451    let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
452    let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
453    tokio::select! {
454        _ = sigterm.recv() => "SIGTERM",
455        _ = sigint.recv()  => "SIGINT",
456    }
457}
458
/// Resolve when the process receives `Ctrl+C`, returning `"Ctrl+C"` for the
/// shutdown log line.
///
/// If the `Ctrl+C` listener cannot be registered, the error is logged and
/// this future never resolves. Treating that error as "signal received"
/// would shut the server down immediately at startup.
#[cfg(not(unix))]
async fn wait_for_signal() -> &'static str {
    // Windows / other: only Ctrl+C is portably available through tokio.
    match tokio::signal::ctrl_c().await {
        Ok(()) => "Ctrl+C",
        Err(e) => {
            log::error!(
                "failed to listen for Ctrl+C ({e}); graceful shutdown via Ctrl+C is unavailable"
            );
            // Keep serving: without a working listener there is no signal
            // to report, so park this task instead of triggering shutdown.
            std::future::pending::<&'static str>().await
        }
    }
}
465
466/// Pretty-print the route table at startup. Two sections:
467///   - general routes (health, probes)
468///   - per-dataset routes for every mounted API version (canonical
469///     `/api/v1/...` + the legacy un-versioned `/api/...` alias).
470fn log_routes(prefix: &str, backend: &dyn Backend) {
471    // Column widths chosen to fit the longest method + a comfortable
472    // path column. Names are inlined into the per-dataset paths.
473    const METHOD_W: usize = 6;
474
475    let p = prefix; // already validated to start with '/' or be empty
476
477    log::info!("Routes:");
478    log::info!("  general:");
479    for (method, path) in [
480        ("GET", "/healthz".to_string()),
481        ("GET", "/readyz".to_string()),
482        ("GET", "/version".to_string()),
483        ("GET", format!("{p}/health")),
484    ] {
485        log::info!("    {:<width$} {}", method, path, width = METHOD_W);
486    }
487
488    // Each API version is mounted under its own scope; we currently
489    // also expose v1 under the un-versioned `/api` for back-compat.
490    let mounts: &[(&str, &[(&str, &str)])] = &[
491        ("/api/v1", handlers::v1::ROUTES),
492        ("/api", handlers::v1::ROUTES), // legacy alias
493    ];
494
495    let names = backend.names();
496    for (mount, routes) in mounts {
497        log::info!("  {p}{mount}:");
498        // Top-level (non-dataset-scoped) routes for this version.
499        for (method, suffix) in *routes {
500            if !suffix.contains("{name}") {
501                log::info!(
502                    "    {:<width$} {p}{mount}{suffix}",
503                    method,
504                    width = METHOD_W,
505                );
506            }
507        }
508        if names.is_empty() {
509            log::info!("    (no datasets registered)");
510            continue;
511        }
512        for name in &names {
513            for (method, suffix) in *routes {
514                if let Some(rest) = suffix.strip_prefix("/datasets/{name}") {
515                    log::info!(
516                        "    {:<width$} {p}{mount}/datasets/{name}{rest}",
517                        method,
518                        width = METHOD_W,
519                    );
520                }
521            }
522        }
523    }
524}