1use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use actix_web::{App, HttpServer, middleware, web};
10
11use crate::backend::Backend;
12use crate::config::AppConfig;
13use crate::handlers;
14use crate::timeout::Timeout;
15
/// Boxed, sendable future whose completion is treated as a shutdown request.
type ShutdownFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Selects what the server waits for before it starts shutting down.
enum Shutdown {
    /// Wait for a process signal (see `wait_for_signal`).
    Signals,
    /// Wait for a caller-supplied future to complete.
    External(ShutdownFuture),
}
28
29pub async fn serve(cfg: AppConfig, backend: Arc<dyn Backend>, label: &str) -> std::io::Result<()> {
39 run_server(cfg, backend, label, Shutdown::Signals).await
40}
41
42pub async fn serve_with_shutdown(
52 cfg: AppConfig,
53 backend: Arc<dyn Backend>,
54 label: &str,
55 shutdown: impl Future<Output = ()> + Send + 'static,
56) -> std::io::Result<()> {
57 run_server(cfg, backend, label, Shutdown::External(Box::pin(shutdown))).await
58}
59
60async fn run_server(
61 cfg: AppConfig,
62 backend: Arc<dyn Backend>,
63 label: &str,
64 shutdown: Shutdown,
65) -> std::io::Result<()> {
66 let addr = (cfg.server.listen, cfg.server.port);
67 let workers = cfg.server.workers;
68 let prefix = cfg.server.prefix.clone();
69 let compress = cfg.server.compress;
70 let max_body = cfg.server.max_body_bytes;
71 let max_page_size = cfg.server.max_page_size;
72 let timeout_ms = cfg.server.request_timeout_ms;
73 let shutdown_secs = cfg.server.shutdown_timeout_secs;
74 let sql_settings = handlers::SqlSettings {
75 enabled: cfg.sql.enabled,
76 max_rows: cfg.sql.max_rows.max(1),
77 };
78 let docs_cfg = cfg.docs.clone();
79 let swagger_cfg = cfg.swagger.clone();
80 let metrics_cfg = cfg.metrics.clone();
81 let explorer_cfg = cfg.explorer.clone();
82
83 #[cfg(not(feature = "docs"))]
86 if docs_cfg.enabled {
87 log::warn!(
88 "[docs] enabled = true in config, but this binary was built \
89 without --features docs; skipping docs site"
90 );
91 }
92 #[cfg(not(feature = "swagger"))]
93 if swagger_cfg.enabled {
94 log::warn!(
95 "[swagger] enabled = true in config, but this binary was built \
96 without --features swagger; skipping Swagger UI"
97 );
98 }
99 #[cfg(not(feature = "auth"))]
100 if cfg.auth.enabled {
101 log::warn!(
102 "[auth] enabled = true in config, but this binary was built \
103 without --features auth; skipping OIDC enforcement"
104 );
105 }
106 #[cfg(not(feature = "metrics"))]
107 if metrics_cfg.enabled {
108 log::warn!(
109 "[metrics] enabled = true in config, but this binary was built \
110 without --features metrics; skipping Prometheus endpoint"
111 );
112 }
113 #[cfg(not(feature = "explorer"))]
114 if explorer_cfg.enabled {
115 log::warn!(
116 "[explorer] enabled = true in config, but this binary was built \
117 without --features explorer; skipping explorer UI"
118 );
119 }
120
121 #[cfg(feature = "auth")]
126 let auth_state = if cfg.auth.enabled {
127 let jwks = crate::auth::JwksCache::boot(&cfg.auth)
128 .await
129 .map_err(|e| std::io::Error::other(format!("auth bootstrap failed: {e}")))?;
130 log::info!(
131 "[auth] OIDC enforcement enabled (issuer = {}, audience = {}, read_scopes = {:?}, reload_scopes = {:?})",
132 cfg.auth.issuer,
133 if cfg.auth.audience.is_empty() {
134 "<none>"
135 } else {
136 cfg.auth.audience.as_str()
137 },
138 cfg.auth.read_scopes,
139 cfg.auth.reload_scopes,
140 );
141 Some(crate::auth::AuthState {
142 cfg: Arc::new(cfg.auth.clone()),
143 jwks,
144 })
145 } else {
146 None
147 };
148
149 log::info!(
150 "Listening on http://{}:{}{} ({} backend, {} workers, compression {}, max-body {} bytes, max-page-size {}, timeout {}, shutdown grace {}s)",
151 cfg.server.listen,
152 cfg.server.port,
153 if prefix.is_empty() {
154 "".into()
155 } else {
156 format!("{prefix}/")
157 },
158 label,
159 workers
160 .map(|w| w.to_string())
161 .unwrap_or_else(|| "auto".into()),
162 if compress { "on" } else { "off" },
163 max_body,
164 max_page_size,
165 if timeout_ms == 0 {
166 "off".into()
167 } else {
168 format!("{timeout_ms} ms")
169 },
170 shutdown_secs,
171 );
172
173 log_routes(&prefix, backend.as_ref());
174
175 #[cfg(feature = "docs")]
176 if docs_cfg.enabled {
177 log::info!(" {} (mkdocs site):", docs_cfg.path);
178 log::info!(" GET {}/", docs_cfg.path);
179 log::info!(" GET {}/{{path}}", docs_cfg.path);
180 }
181
182 #[cfg(feature = "swagger")]
183 if swagger_cfg.enabled {
184 log::info!(" {} (swagger UI):", swagger_cfg.path);
185 log::info!(" GET {}/", swagger_cfg.path);
186 log::info!(" GET {}/openapi.json", swagger_cfg.path);
187 }
188
189 #[cfg(feature = "explorer")]
190 if explorer_cfg.enabled {
191 log::info!(" {} (explorer UI):", explorer_cfg.path);
192 log::info!(" GET {}/", explorer_cfg.path);
193 log::info!(" GET {}/datasets/{{name}}", explorer_cfg.path);
194 }
195
196 #[cfg(feature = "swagger")]
203 let swagger_oauth2 = if swagger_cfg.enabled {
204 match swagger_cfg.oauth2.as_ref() {
205 Some(o) => match crate::swagger::resolve_oauth2(o).await {
206 Ok(resolved) => Some(resolved),
207 Err(e) => {
208 log::warn!(
209 "[swagger.oauth2] OIDC discovery for issuer {} failed ({e}); \
210 serving docs without the Authorize button",
211 o.issuer
212 );
213 None
214 }
215 },
216 None => None,
217 }
218 } else {
219 None
220 };
221
222 #[cfg(feature = "metrics")]
227 let prometheus = {
228 use actix_web_prom::PrometheusMetricsBuilder;
229 PrometheusMetricsBuilder::new("datapress")
230 .endpoint(metrics_cfg.path.as_str())
231 .build()
232 .map_err(|e| std::io::Error::other(format!("metrics init failed: {e}")))?
233 };
234 #[cfg(feature = "metrics")]
235 let metrics_enabled = metrics_cfg.enabled;
236
237 #[cfg(feature = "metrics")]
238 if metrics_cfg.enabled {
239 log::info!(" {} (prometheus metrics):", metrics_cfg.path);
240 log::info!(" GET {}", metrics_cfg.path);
241 }
242
243 let build_info = web::Data::new(handlers::BuildInfo::new(
244 match label {
247 "DuckDB" => "DuckDB",
248 "DataFusion" => "DataFusion",
249 _ => "unknown",
250 },
251 ));
252
253 let parquet_cache = web::Data::new(handlers::ParquetCache::default());
257
258 #[cfg(feature = "explorer")]
261 let explorer_state = if explorer_cfg.enabled {
262 Some(web::Data::new(crate::explorer::ExplorerState {
263 backend: backend.clone(),
264 datasets: cfg.datasets.clone(),
265 explorer_base: explorer_cfg.path.clone(),
266 api_base: format!("{prefix}/api/v1"),
267 backend_label: label.to_string(),
268 sql_enabled: cfg.sql.enabled,
269 }))
270 } else {
271 None
272 };
273
274 let mut server = HttpServer::new(move || {
275 let backend = backend.clone();
276 let prefix = prefix.clone();
277 let json_cfg = web::JsonConfig::default().limit(max_body);
278 let pay_cfg = web::PayloadConfig::default().limit(max_body);
279 let query_limits = handlers::QueryLimits { max_page_size };
280 let timeout = Timeout::new(Duration::from_millis(timeout_ms.max(1)));
281 #[cfg(feature = "docs")]
282 let docs_cfg = docs_cfg.clone();
283 #[cfg(feature = "explorer")]
284 let explorer_state = explorer_state.clone();
285 #[cfg(feature = "swagger")]
286 let swagger_cfg = swagger_cfg.clone();
287 #[cfg(feature = "swagger")]
288 let swagger_oauth2 = swagger_oauth2.clone();
289 #[cfg(feature = "auth")]
290 let auth_state = auth_state.clone();
291 #[cfg(feature = "metrics")]
292 let prometheus = prometheus.clone();
293 let app = App::new()
294 .app_data(web::Data::new(backend))
295 .app_data(build_info.clone())
296 .app_data(web::Data::new(query_limits))
297 .app_data(web::Data::new(sql_settings))
298 .app_data(parquet_cache.clone())
299 .app_data(json_cfg)
300 .app_data(pay_cfg)
301 .wrap(middleware::Condition::new(timeout_ms > 0, timeout))
302 .wrap(middleware::Condition::new(
303 compress,
304 middleware::Compress::default(),
305 ))
306 .wrap(middleware::Logger::new("%a \"%r\" %s %b bytes %Dms"));
307 #[cfg(feature = "auth")]
313 let app = match auth_state.clone() {
314 Some(state) => app
315 .app_data(web::Data::new(state.cfg.clone()))
316 .wrap(crate::auth::Auth::new(state)),
317 None => app.wrap(crate::auth::Auth::disabled()),
318 };
319 #[cfg(feature = "metrics")]
325 let app = app.wrap(middleware::Condition::new(metrics_enabled, prometheus));
326 let app = app
327 .service(handlers::healthz)
328 .service(handlers::readyz)
329 .service(handlers::version);
330 #[cfg(feature = "docs")]
337 let app = if docs_cfg.enabled {
338 app.configure(|c| crate::docs::configure(&docs_cfg.path, c))
339 } else {
340 app
341 };
342 #[cfg(feature = "swagger")]
343 let app = if swagger_cfg.enabled {
344 app.configure(|c| {
345 crate::swagger::configure(&swagger_cfg.path, swagger_oauth2.as_ref(), c)
346 })
347 } else {
348 app
349 };
350 #[cfg(feature = "explorer")]
353 let app = match explorer_state {
354 Some(state) => app.configure(|c| crate::explorer::configure(state, c)),
355 None => app,
356 };
357 app.service(
358 web::scope(prefix.as_str())
359 .service(handlers::health)
360 .service(web::scope("/api/v1").configure(handlers::v1::configure))
362 .service(web::scope("/api").configure(handlers::v1::configure)),
367 )
368 });
369 if let Some(w) = workers {
370 server = server.workers(w);
371 }
372
373 let running = server
377 .bind(addr)?
378 .shutdown_timeout(shutdown_secs)
379 .disable_signals()
380 .run();
381 let handle = running.handle();
382 tokio::spawn(shutdown_listener(handle, shutdown_secs, shutdown));
383
384 running.await
385}
386
387async fn shutdown_listener(
390 handle: actix_web::dev::ServerHandle,
391 grace_secs: u64,
392 shutdown: Shutdown,
393) {
394 match shutdown {
395 Shutdown::Signals => {
396 let which = wait_for_signal().await;
397 log::info!(
398 "Received {which}, shutting down gracefully (up to {grace_secs}s for in-flight requests)..."
399 );
400 }
401 Shutdown::External(fut) => {
402 fut.await;
403 log::info!(
404 "Shutdown requested by host, draining in-flight requests (up to {grace_secs}s)..."
405 );
406 }
407 }
408 handle.stop(true).await;
409 log::info!("Shutdown complete.");
410}
411
412#[cfg(unix)]
413async fn wait_for_signal() -> &'static str {
414 use tokio::signal::unix::{SignalKind, signal};
415 let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
418 let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
419 tokio::select! {
420 _ = sigterm.recv() => "SIGTERM",
421 _ = sigint.recv() => "SIGINT",
422 }
423}
424
/// Resolves with `"Ctrl+C"` once a Ctrl+C notification is received.
///
/// If the Ctrl+C listener cannot be set up, the error is logged and this
/// future never resolves. A setup failure must not be mistaken for a
/// received signal, which would shut the server down right after startup.
#[cfg(not(unix))]
async fn wait_for_signal() -> &'static str {
    match tokio::signal::ctrl_c().await {
        Ok(()) => "Ctrl+C",
        Err(err) => {
            log::error!(
                "failed to listen for Ctrl+C ({err}); signal-triggered graceful shutdown is unavailable"
            );
            // Park forever: no signal will ever be reported from here.
            std::future::pending::<&'static str>().await
        }
    }
}
431
432fn log_routes(prefix: &str, backend: &dyn Backend) {
437 const METHOD_W: usize = 6;
440
441 let p = prefix; log::info!("Routes:");
444 log::info!(" general:");
445 for (method, path) in [
446 ("GET", "/healthz".to_string()),
447 ("GET", "/readyz".to_string()),
448 ("GET", "/version".to_string()),
449 ("GET", format!("{p}/health")),
450 ] {
451 log::info!(" {:<width$} {}", method, path, width = METHOD_W);
452 }
453
454 let mounts: &[(&str, &[(&str, &str)])] = &[
457 ("/api/v1", handlers::v1::ROUTES),
458 ("/api", handlers::v1::ROUTES), ];
460
461 let names = backend.names();
462 for (mount, routes) in mounts {
463 log::info!(" {p}{mount}:");
464 for (method, suffix) in *routes {
466 if !suffix.contains("{name}") {
467 log::info!(
468 " {:<width$} {p}{mount}{suffix}",
469 method,
470 width = METHOD_W,
471 );
472 }
473 }
474 if names.is_empty() {
475 log::info!(" (no datasets registered)");
476 continue;
477 }
478 for name in &names {
479 for (method, suffix) in *routes {
480 if let Some(rest) = suffix.strip_prefix("/datasets/{name}") {
481 log::info!(
482 " {:<width$} {p}{mount}/datasets/{name}{rest}",
483 method,
484 width = METHOD_W,
485 );
486 }
487 }
488 }
489 }
490}