1use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use actix_web::{App, HttpServer, middleware, web};
10
11use crate::backend::Backend;
12use crate::config::AppConfig;
13use crate::handlers;
14use crate::timeout::Timeout;
15
/// Describes what the server waits for before it begins shutting down.
///
/// Consumed by `shutdown_listener`, which awaits the chosen trigger and then
/// stops the server through its handle.
enum Shutdown {
    /// Wait for a process signal (see `wait_for_signal`).
    Signals,
    /// Wait for a caller-supplied future to complete. The future is boxed and
    /// pinned so that any concrete future type can be stored here.
    External(Pin<Box<dyn Future<Output = ()> + Send + 'static>>),
}
28
29pub async fn serve(cfg: AppConfig, backend: Arc<dyn Backend>, label: &str) -> std::io::Result<()> {
39 run_server(cfg, backend, label, Shutdown::Signals).await
40}
41
42pub async fn serve_with_shutdown(
52 cfg: AppConfig,
53 backend: Arc<dyn Backend>,
54 label: &str,
55 shutdown: impl Future<Output = ()> + Send + 'static,
56) -> std::io::Result<()> {
57 run_server(cfg, backend, label, Shutdown::External(Box::pin(shutdown))).await
58}
59
60async fn run_server(
61 cfg: AppConfig,
62 backend: Arc<dyn Backend>,
63 label: &str,
64 shutdown: Shutdown,
65) -> std::io::Result<()> {
66 let addr = (cfg.server.listen, cfg.server.port);
67 let workers = cfg.server.workers;
68 let prefix = cfg.server.prefix.clone();
69 let compress = cfg.server.compress;
70 let max_body = cfg.server.max_body_bytes;
71 let max_page_size = cfg.server.max_page_size;
72 let timeout_ms = cfg.server.request_timeout_ms;
73 let shutdown_secs = cfg.server.shutdown_timeout_secs;
74 let docs_cfg = cfg.docs.clone();
75 let swagger_cfg = cfg.swagger.clone();
76 let metrics_cfg = cfg.metrics.clone();
77 let explorer_cfg = cfg.explorer.clone();
78
79 #[cfg(not(feature = "docs"))]
82 if docs_cfg.enabled {
83 log::warn!(
84 "[docs] enabled = true in config, but this binary was built \
85 without --features docs; skipping docs site"
86 );
87 }
88 #[cfg(not(feature = "swagger"))]
89 if swagger_cfg.enabled {
90 log::warn!(
91 "[swagger] enabled = true in config, but this binary was built \
92 without --features swagger; skipping Swagger UI"
93 );
94 }
95 #[cfg(not(feature = "auth"))]
96 if cfg.auth.enabled {
97 log::warn!(
98 "[auth] enabled = true in config, but this binary was built \
99 without --features auth; skipping OIDC enforcement"
100 );
101 }
102 #[cfg(not(feature = "metrics"))]
103 if metrics_cfg.enabled {
104 log::warn!(
105 "[metrics] enabled = true in config, but this binary was built \
106 without --features metrics; skipping Prometheus endpoint"
107 );
108 }
109 #[cfg(not(feature = "explorer"))]
110 if explorer_cfg.enabled {
111 log::warn!(
112 "[explorer] enabled = true in config, but this binary was built \
113 without --features explorer; skipping explorer UI"
114 );
115 }
116
117 #[cfg(feature = "auth")]
122 let auth_state = if cfg.auth.enabled {
123 let jwks = crate::auth::JwksCache::boot(&cfg.auth)
124 .await
125 .map_err(|e| std::io::Error::other(format!("auth bootstrap failed: {e}")))?;
126 log::info!(
127 "[auth] OIDC enforcement enabled (issuer = {}, audience = {}, read_scopes = {:?}, reload_scopes = {:?})",
128 cfg.auth.issuer,
129 if cfg.auth.audience.is_empty() {
130 "<none>"
131 } else {
132 cfg.auth.audience.as_str()
133 },
134 cfg.auth.read_scopes,
135 cfg.auth.reload_scopes,
136 );
137 Some(crate::auth::AuthState {
138 cfg: Arc::new(cfg.auth.clone()),
139 jwks,
140 })
141 } else {
142 None
143 };
144
145 log::info!(
146 "Listening on http://{}:{}{} ({} backend, {} workers, compression {}, max-body {} bytes, max-page-size {}, timeout {}, shutdown grace {}s)",
147 cfg.server.listen,
148 cfg.server.port,
149 if prefix.is_empty() {
150 "".into()
151 } else {
152 format!("{prefix}/")
153 },
154 label,
155 workers
156 .map(|w| w.to_string())
157 .unwrap_or_else(|| "auto".into()),
158 if compress { "on" } else { "off" },
159 max_body,
160 max_page_size,
161 if timeout_ms == 0 {
162 "off".into()
163 } else {
164 format!("{timeout_ms} ms")
165 },
166 shutdown_secs,
167 );
168
169 log_routes(&prefix, backend.as_ref());
170
171 #[cfg(feature = "docs")]
172 if docs_cfg.enabled {
173 log::info!(" {} (mkdocs site):", docs_cfg.path);
174 log::info!(" GET {}/", docs_cfg.path);
175 log::info!(" GET {}/{{path}}", docs_cfg.path);
176 }
177
178 #[cfg(feature = "swagger")]
179 if swagger_cfg.enabled {
180 log::info!(" {} (swagger UI):", swagger_cfg.path);
181 log::info!(" GET {}/", swagger_cfg.path);
182 log::info!(" GET {}/openapi.json", swagger_cfg.path);
183 }
184
185 #[cfg(feature = "explorer")]
186 if explorer_cfg.enabled {
187 log::info!(" {} (explorer UI):", explorer_cfg.path);
188 log::info!(" GET {}/", explorer_cfg.path);
189 log::info!(" GET {}/datasets/{{name}}", explorer_cfg.path);
190 }
191
192 #[cfg(feature = "swagger")]
199 let swagger_oauth2 = if swagger_cfg.enabled {
200 match swagger_cfg.oauth2.as_ref() {
201 Some(o) => match crate::swagger::resolve_oauth2(o).await {
202 Ok(resolved) => Some(resolved),
203 Err(e) => {
204 log::warn!(
205 "[swagger.oauth2] OIDC discovery for issuer {} failed ({e}); \
206 serving docs without the Authorize button",
207 o.issuer
208 );
209 None
210 }
211 },
212 None => None,
213 }
214 } else {
215 None
216 };
217
218 #[cfg(feature = "metrics")]
223 let prometheus = {
224 use actix_web_prom::PrometheusMetricsBuilder;
225 PrometheusMetricsBuilder::new("datapress")
226 .endpoint(metrics_cfg.path.as_str())
227 .build()
228 .map_err(|e| std::io::Error::other(format!("metrics init failed: {e}")))?
229 };
230 #[cfg(feature = "metrics")]
231 let metrics_enabled = metrics_cfg.enabled;
232
233 #[cfg(feature = "metrics")]
234 if metrics_cfg.enabled {
235 log::info!(" {} (prometheus metrics):", metrics_cfg.path);
236 log::info!(" GET {}", metrics_cfg.path);
237 }
238
239 let build_info = web::Data::new(handlers::BuildInfo::new(
240 match label {
243 "DuckDB" => "DuckDB",
244 "DataFusion" => "DataFusion",
245 _ => "unknown",
246 },
247 ));
248
249 let parquet_cache = web::Data::new(handlers::ParquetCache::default());
253
254 #[cfg(feature = "explorer")]
257 let explorer_state = if explorer_cfg.enabled {
258 Some(web::Data::new(crate::explorer::ExplorerState {
259 backend: backend.clone(),
260 datasets: cfg.datasets.clone(),
261 explorer_base: explorer_cfg.path.clone(),
262 api_base: format!("{prefix}/api/v1"),
263 backend_label: label.to_string(),
264 }))
265 } else {
266 None
267 };
268
269 let mut server = HttpServer::new(move || {
270 let backend = backend.clone();
271 let prefix = prefix.clone();
272 let json_cfg = web::JsonConfig::default().limit(max_body);
273 let pay_cfg = web::PayloadConfig::default().limit(max_body);
274 let query_limits = handlers::QueryLimits { max_page_size };
275 let timeout = Timeout::new(Duration::from_millis(timeout_ms.max(1)));
276 #[cfg(feature = "docs")]
277 let docs_cfg = docs_cfg.clone();
278 #[cfg(feature = "explorer")]
279 let explorer_state = explorer_state.clone();
280 #[cfg(feature = "swagger")]
281 let swagger_cfg = swagger_cfg.clone();
282 #[cfg(feature = "swagger")]
283 let swagger_oauth2 = swagger_oauth2.clone();
284 #[cfg(feature = "auth")]
285 let auth_state = auth_state.clone();
286 #[cfg(feature = "metrics")]
287 let prometheus = prometheus.clone();
288 let app = App::new()
289 .app_data(web::Data::new(backend))
290 .app_data(build_info.clone())
291 .app_data(web::Data::new(query_limits))
292 .app_data(parquet_cache.clone())
293 .app_data(json_cfg)
294 .app_data(pay_cfg)
295 .wrap(middleware::Condition::new(timeout_ms > 0, timeout))
296 .wrap(middleware::Condition::new(
297 compress,
298 middleware::Compress::default(),
299 ))
300 .wrap(middleware::Logger::new("%a \"%r\" %s %b bytes %Dms"));
301 #[cfg(feature = "auth")]
307 let app = match auth_state.clone() {
308 Some(state) => app
309 .app_data(web::Data::new(state.cfg.clone()))
310 .wrap(crate::auth::Auth::new(state)),
311 None => app.wrap(crate::auth::Auth::disabled()),
312 };
313 #[cfg(feature = "metrics")]
319 let app = app.wrap(middleware::Condition::new(metrics_enabled, prometheus));
320 let app = app
321 .service(handlers::healthz)
322 .service(handlers::readyz)
323 .service(handlers::version);
324 #[cfg(feature = "docs")]
331 let app = if docs_cfg.enabled {
332 app.configure(|c| crate::docs::configure(&docs_cfg.path, c))
333 } else {
334 app
335 };
336 #[cfg(feature = "swagger")]
337 let app = if swagger_cfg.enabled {
338 app.configure(|c| {
339 crate::swagger::configure(&swagger_cfg.path, swagger_oauth2.as_ref(), c)
340 })
341 } else {
342 app
343 };
344 #[cfg(feature = "explorer")]
347 let app = match explorer_state {
348 Some(state) => app.configure(|c| crate::explorer::configure(state, c)),
349 None => app,
350 };
351 app.service(
352 web::scope(prefix.as_str())
353 .service(handlers::health)
354 .service(web::scope("/api/v1").configure(handlers::v1::configure))
356 .service(web::scope("/api").configure(handlers::v1::configure)),
361 )
362 });
363 if let Some(w) = workers {
364 server = server.workers(w);
365 }
366
367 let running = server
371 .bind(addr)?
372 .shutdown_timeout(shutdown_secs)
373 .disable_signals()
374 .run();
375 let handle = running.handle();
376 tokio::spawn(shutdown_listener(handle, shutdown_secs, shutdown));
377
378 running.await
379}
380
381async fn shutdown_listener(
384 handle: actix_web::dev::ServerHandle,
385 grace_secs: u64,
386 shutdown: Shutdown,
387) {
388 match shutdown {
389 Shutdown::Signals => {
390 let which = wait_for_signal().await;
391 log::info!(
392 "Received {which}, shutting down gracefully (up to {grace_secs}s for in-flight requests)..."
393 );
394 }
395 Shutdown::External(fut) => {
396 fut.await;
397 log::info!(
398 "Shutdown requested by host, draining in-flight requests (up to {grace_secs}s)..."
399 );
400 }
401 }
402 handle.stop(true).await;
403 log::info!("Shutdown complete.");
404}
405
406#[cfg(unix)]
407async fn wait_for_signal() -> &'static str {
408 use tokio::signal::unix::{SignalKind, signal};
409 let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
412 let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
413 tokio::select! {
414 _ = sigterm.recv() => "SIGTERM",
415 _ = sigint.recv() => "SIGINT",
416 }
417}
418
/// Resolves once `tokio::signal::ctrl_c()` completes and returns `"Ctrl+C"`.
///
/// The result of `ctrl_c()` is deliberately ignored, so an error from it also
/// makes this function return.
#[cfg(not(unix))]
async fn wait_for_signal() -> &'static str {
    match tokio::signal::ctrl_c().await {
        // Success and failure are treated identically.
        Ok(()) | Err(_) => "Ctrl+C",
    }
}
425
426fn log_routes(prefix: &str, backend: &dyn Backend) {
431 const METHOD_W: usize = 6;
434
435 let p = prefix; log::info!("Routes:");
438 log::info!(" general:");
439 for (method, path) in [
440 ("GET", "/healthz".to_string()),
441 ("GET", "/readyz".to_string()),
442 ("GET", "/version".to_string()),
443 ("GET", format!("{p}/health")),
444 ] {
445 log::info!(" {:<width$} {}", method, path, width = METHOD_W);
446 }
447
448 let mounts: &[(&str, &[(&str, &str)])] = &[
451 ("/api/v1", handlers::v1::ROUTES),
452 ("/api", handlers::v1::ROUTES), ];
454
455 let names = backend.names();
456 for (mount, routes) in mounts {
457 log::info!(" {p}{mount}:");
458 for (method, suffix) in *routes {
460 if !suffix.contains("{name}") {
461 log::info!(
462 " {:<width$} {p}{mount}{suffix}",
463 method,
464 width = METHOD_W,
465 );
466 }
467 }
468 if names.is_empty() {
469 log::info!(" (no datasets registered)");
470 continue;
471 }
472 for name in &names {
473 for (method, suffix) in *routes {
474 if let Some(rest) = suffix.strip_prefix("/datasets/{name}") {
475 log::info!(
476 " {:<width$} {p}{mount}/datasets/{name}{rest}",
477 method,
478 width = METHOD_W,
479 );
480 }
481 }
482 }
483 }
484}