1use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use actix_web::{App, HttpServer, middleware, web};
10
11use crate::backend::Backend;
12use crate::config::AppConfig;
13use crate::handlers;
14use crate::timeout::Timeout;
15
16enum Shutdown {
18 Signals,
22 External(Pin<Box<dyn Future<Output = ()> + Send>>),
27}
28
29pub async fn serve(cfg: AppConfig, backend: Arc<dyn Backend>, label: &str) -> std::io::Result<()> {
39 run_server(cfg, backend, label, Shutdown::Signals).await
40}
41
42pub async fn serve_with_shutdown(
52 cfg: AppConfig,
53 backend: Arc<dyn Backend>,
54 label: &str,
55 shutdown: impl Future<Output = ()> + Send + 'static,
56) -> std::io::Result<()> {
57 run_server(cfg, backend, label, Shutdown::External(Box::pin(shutdown))).await
58}
59
60async fn run_server(
61 cfg: AppConfig,
62 backend: Arc<dyn Backend>,
63 label: &str,
64 shutdown: Shutdown,
65) -> std::io::Result<()> {
66 let addr = (cfg.server.listen, cfg.server.port);
67 let workers = cfg.server.workers;
68 let prefix = cfg.server.prefix.clone();
69 let compress = cfg.server.compress;
70 let max_body = cfg.server.max_body_bytes;
71 let max_page_size = cfg.server.max_page_size;
72 let timeout_ms = cfg.server.request_timeout_ms;
73 let shutdown_secs = cfg.server.shutdown_timeout_secs;
74 let sql_settings = handlers::SqlSettings {
75 enabled: cfg.sql.enabled,
76 max_rows: cfg.sql.max_rows.max(1),
77 };
78 let docs_cfg = cfg.docs.clone();
79 let swagger_cfg = cfg.swagger.clone();
80 let metrics_cfg = cfg.metrics.clone();
81 let explorer_cfg = cfg.explorer.clone();
82
83 #[cfg(not(feature = "docs"))]
86 if docs_cfg.enabled {
87 log::warn!(
88 "[docs] enabled = true in config, but this binary was built \
89 without --features docs; skipping docs site"
90 );
91 }
92 #[cfg(not(feature = "swagger"))]
93 if swagger_cfg.enabled {
94 log::warn!(
95 "[swagger] enabled = true in config, but this binary was built \
96 without --features swagger; skipping Swagger UI"
97 );
98 }
99 #[cfg(not(feature = "auth"))]
100 if cfg.auth.enabled {
101 log::warn!(
102 "[auth] enabled = true in config, but this binary was built \
103 without --features auth; skipping OIDC enforcement"
104 );
105 }
106 #[cfg(not(feature = "metrics"))]
107 if metrics_cfg.enabled {
108 log::warn!(
109 "[metrics] enabled = true in config, but this binary was built \
110 without --features metrics; skipping Prometheus endpoint"
111 );
112 }
113 #[cfg(not(feature = "explorer"))]
114 if explorer_cfg.enabled {
115 log::warn!(
116 "[explorer] enabled = true in config, but this binary was built \
117 without --features explorer; skipping explorer UI"
118 );
119 }
120
121 #[cfg(feature = "auth")]
126 let auth_state = if cfg.auth.enabled {
127 let jwks = crate::auth::JwksCache::boot(&cfg.auth)
128 .await
129 .map_err(|e| std::io::Error::other(format!("auth bootstrap failed: {e}")))?;
130 log::info!(
131 "[auth] OIDC enforcement enabled (issuer = {}, audience = {}, read_scopes = {:?}, reload_scopes = {:?})",
132 cfg.auth.issuer,
133 if cfg.auth.audience.is_empty() {
134 "<none>"
135 } else {
136 cfg.auth.audience.as_str()
137 },
138 cfg.auth.read_scopes,
139 cfg.auth.reload_scopes,
140 );
141 Some(crate::auth::AuthState {
142 cfg: Arc::new(cfg.auth.clone()),
143 jwks,
144 })
145 } else {
146 None
147 };
148
149 log::info!(
150 "Listening on http://{}:{}{} ({} backend, {} workers, compression {}, max-body {} bytes, max-page-size {}, timeout {}, shutdown grace {}s)",
151 cfg.server.listen,
152 cfg.server.port,
153 if prefix.is_empty() {
154 "".into()
155 } else {
156 format!("{prefix}/")
157 },
158 label,
159 workers
160 .map(|w| w.to_string())
161 .unwrap_or_else(|| "auto".into()),
162 if compress { "on" } else { "off" },
163 max_body,
164 max_page_size,
165 if timeout_ms == 0 {
166 "off".into()
167 } else {
168 format!("{timeout_ms} ms")
169 },
170 shutdown_secs,
171 );
172
173 log_routes(&prefix, backend.as_ref());
174
175 #[cfg(feature = "docs")]
176 if docs_cfg.enabled {
177 log::info!(" {} (mkdocs site):", docs_cfg.path);
178 log::info!(" GET {}/", docs_cfg.path);
179 log::info!(" GET {}/{{path}}", docs_cfg.path);
180 }
181
182 #[cfg(feature = "swagger")]
183 if swagger_cfg.enabled {
184 log::info!(" {} (swagger UI):", swagger_cfg.path);
185 log::info!(" GET {}/", swagger_cfg.path);
186 log::info!(" GET {}/openapi.json", swagger_cfg.path);
187 }
188
189 #[cfg(feature = "explorer")]
190 if explorer_cfg.enabled {
191 log::info!(" {} (explorer UI):", explorer_cfg.path);
192 log::info!(" GET {}/", explorer_cfg.path);
193 log::info!(" GET {}/datasets/{{name}}", explorer_cfg.path);
194 }
195
196 #[cfg(feature = "swagger")]
203 let swagger_oauth2 = if swagger_cfg.enabled {
204 match swagger_cfg.oauth2.as_ref() {
205 Some(o) => match crate::swagger::resolve_oauth2(o).await {
206 Ok(resolved) => Some(resolved),
207 Err(e) => {
208 log::warn!(
209 "[swagger.oauth2] OIDC discovery for issuer {} failed ({e}); \
210 serving docs without the Authorize button",
211 o.issuer
212 );
213 None
214 }
215 },
216 None => None,
217 }
218 } else {
219 None
220 };
221
222 #[cfg(feature = "metrics")]
227 let prometheus = {
228 use actix_web_prom::PrometheusMetricsBuilder;
229 PrometheusMetricsBuilder::new("datapress")
230 .endpoint(metrics_cfg.path.as_str())
231 .build()
232 .map_err(|e| std::io::Error::other(format!("metrics init failed: {e}")))?
233 };
234 #[cfg(feature = "metrics")]
235 let metrics_enabled = metrics_cfg.enabled;
236
237 #[cfg(feature = "metrics")]
238 if metrics_cfg.enabled {
239 log::info!(" {} (prometheus metrics):", metrics_cfg.path);
240 log::info!(" GET {}", metrics_cfg.path);
241 }
242
243 let build_info = web::Data::new(handlers::BuildInfo::new(
244 match label {
247 "DuckDB" => "DuckDB",
248 "DataFusion" => "DataFusion",
249 _ => "unknown",
250 },
251 ));
252
253 let parquet_cache = web::Data::new(handlers::ParquetCache::default());
257
258 #[cfg(feature = "explorer")]
261 let explorer_state = if explorer_cfg.enabled {
262 Some(web::Data::new(crate::explorer::ExplorerState {
263 backend: backend.clone(),
264 datasets: cfg.datasets.clone(),
265 explorer_base: explorer_cfg.path.clone(),
266 api_base: format!("{prefix}/api/v1"),
267 backend_label: label.to_string(),
268 }))
269 } else {
270 None
271 };
272
273 let mut server = HttpServer::new(move || {
274 let backend = backend.clone();
275 let prefix = prefix.clone();
276 let json_cfg = web::JsonConfig::default().limit(max_body);
277 let pay_cfg = web::PayloadConfig::default().limit(max_body);
278 let query_limits = handlers::QueryLimits { max_page_size };
279 let timeout = Timeout::new(Duration::from_millis(timeout_ms.max(1)));
280 #[cfg(feature = "docs")]
281 let docs_cfg = docs_cfg.clone();
282 #[cfg(feature = "explorer")]
283 let explorer_state = explorer_state.clone();
284 #[cfg(feature = "swagger")]
285 let swagger_cfg = swagger_cfg.clone();
286 #[cfg(feature = "swagger")]
287 let swagger_oauth2 = swagger_oauth2.clone();
288 #[cfg(feature = "auth")]
289 let auth_state = auth_state.clone();
290 #[cfg(feature = "metrics")]
291 let prometheus = prometheus.clone();
292 let app = App::new()
293 .app_data(web::Data::new(backend))
294 .app_data(build_info.clone())
295 .app_data(web::Data::new(query_limits))
296 .app_data(web::Data::new(sql_settings))
297 .app_data(parquet_cache.clone())
298 .app_data(json_cfg)
299 .app_data(pay_cfg)
300 .wrap(middleware::Condition::new(timeout_ms > 0, timeout))
301 .wrap(middleware::Condition::new(
302 compress,
303 middleware::Compress::default(),
304 ))
305 .wrap(middleware::Logger::new("%a \"%r\" %s %b bytes %Dms"));
306 #[cfg(feature = "auth")]
312 let app = match auth_state.clone() {
313 Some(state) => app
314 .app_data(web::Data::new(state.cfg.clone()))
315 .wrap(crate::auth::Auth::new(state)),
316 None => app.wrap(crate::auth::Auth::disabled()),
317 };
318 #[cfg(feature = "metrics")]
324 let app = app.wrap(middleware::Condition::new(metrics_enabled, prometheus));
325 let app = app
326 .service(handlers::healthz)
327 .service(handlers::readyz)
328 .service(handlers::version);
329 #[cfg(feature = "docs")]
336 let app = if docs_cfg.enabled {
337 app.configure(|c| crate::docs::configure(&docs_cfg.path, c))
338 } else {
339 app
340 };
341 #[cfg(feature = "swagger")]
342 let app = if swagger_cfg.enabled {
343 app.configure(|c| {
344 crate::swagger::configure(&swagger_cfg.path, swagger_oauth2.as_ref(), c)
345 })
346 } else {
347 app
348 };
349 #[cfg(feature = "explorer")]
352 let app = match explorer_state {
353 Some(state) => app.configure(|c| crate::explorer::configure(state, c)),
354 None => app,
355 };
356 app.service(
357 web::scope(prefix.as_str())
358 .service(handlers::health)
359 .service(web::scope("/api/v1").configure(handlers::v1::configure))
361 .service(web::scope("/api").configure(handlers::v1::configure)),
366 )
367 });
368 if let Some(w) = workers {
369 server = server.workers(w);
370 }
371
372 let running = server
376 .bind(addr)?
377 .shutdown_timeout(shutdown_secs)
378 .disable_signals()
379 .run();
380 let handle = running.handle();
381 tokio::spawn(shutdown_listener(handle, shutdown_secs, shutdown));
382
383 running.await
384}
385
386async fn shutdown_listener(
389 handle: actix_web::dev::ServerHandle,
390 grace_secs: u64,
391 shutdown: Shutdown,
392) {
393 match shutdown {
394 Shutdown::Signals => {
395 let which = wait_for_signal().await;
396 log::info!(
397 "Received {which}, shutting down gracefully (up to {grace_secs}s for in-flight requests)..."
398 );
399 }
400 Shutdown::External(fut) => {
401 fut.await;
402 log::info!(
403 "Shutdown requested by host, draining in-flight requests (up to {grace_secs}s)..."
404 );
405 }
406 }
407 handle.stop(true).await;
408 log::info!("Shutdown complete.");
409}
410
411#[cfg(unix)]
412async fn wait_for_signal() -> &'static str {
413 use tokio::signal::unix::{SignalKind, signal};
414 let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
417 let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
418 tokio::select! {
419 _ = sigterm.recv() => "SIGTERM",
420 _ = sigint.recv() => "SIGINT",
421 }
422}
423
424#[cfg(not(unix))]
425async fn wait_for_signal() -> &'static str {
426 let _ = tokio::signal::ctrl_c().await;
428 "Ctrl+C"
429}
430
431fn log_routes(prefix: &str, backend: &dyn Backend) {
436 const METHOD_W: usize = 6;
439
440 let p = prefix; log::info!("Routes:");
443 log::info!(" general:");
444 for (method, path) in [
445 ("GET", "/healthz".to_string()),
446 ("GET", "/readyz".to_string()),
447 ("GET", "/version".to_string()),
448 ("GET", format!("{p}/health")),
449 ] {
450 log::info!(" {:<width$} {}", method, path, width = METHOD_W);
451 }
452
453 let mounts: &[(&str, &[(&str, &str)])] = &[
456 ("/api/v1", handlers::v1::ROUTES),
457 ("/api", handlers::v1::ROUTES), ];
459
460 let names = backend.names();
461 for (mount, routes) in mounts {
462 log::info!(" {p}{mount}:");
463 for (method, suffix) in *routes {
465 if !suffix.contains("{name}") {
466 log::info!(
467 " {:<width$} {p}{mount}{suffix}",
468 method,
469 width = METHOD_W,
470 );
471 }
472 }
473 if names.is_empty() {
474 log::info!(" (no datasets registered)");
475 continue;
476 }
477 for name in &names {
478 for (method, suffix) in *routes {
479 if let Some(rest) = suffix.strip_prefix("/datasets/{name}") {
480 log::info!(
481 " {:<width$} {p}{mount}/datasets/{name}{rest}",
482 method,
483 width = METHOD_W,
484 );
485 }
486 }
487 }
488 }
489}