1use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use actix_web::{App, HttpServer, middleware, web};
10
11use crate::backend::Backend;
12use crate::config::AppConfig;
13use crate::handlers;
14use crate::timeout::Timeout;
15
/// Source of the "please stop" notification for a running server.
enum Shutdown {
    /// Stop when the process receives an OS signal (see `wait_for_signal`).
    Signals,
    /// Stop when the caller-supplied future completes.
    External(Pin<Box<dyn Future<Output = ()> + Send + 'static>>),
}
28
29pub async fn serve(cfg: AppConfig, backend: Arc<dyn Backend>, label: &str) -> std::io::Result<()> {
39 run_server(cfg, backend, label, Shutdown::Signals).await
40}
41
42pub async fn serve_with_shutdown(
52 cfg: AppConfig,
53 backend: Arc<dyn Backend>,
54 label: &str,
55 shutdown: impl Future<Output = ()> + Send + 'static,
56) -> std::io::Result<()> {
57 run_server(cfg, backend, label, Shutdown::External(Box::pin(shutdown))).await
58}
59
60async fn run_server(
61 cfg: AppConfig,
62 backend: Arc<dyn Backend>,
63 label: &str,
64 shutdown: Shutdown,
65) -> std::io::Result<()> {
66 let addr = (cfg.server.listen, cfg.server.port);
67 let workers = cfg.server.workers;
68 let prefix = cfg.server.prefix.clone();
69 let compress = cfg.server.compress;
70 let max_body = cfg.server.max_body_bytes;
71 let max_page_size = cfg.server.max_page_size;
72 let timeout_ms = cfg.server.request_timeout_ms;
73 let shutdown_secs = cfg.server.shutdown_timeout_secs;
74 let sql_settings = handlers::SqlSettings {
75 enabled: cfg.sql.enabled,
76 max_rows: cfg.sql.max_rows.max(1),
77 };
78 let docs_cfg = cfg.docs.clone();
79 let swagger_cfg = cfg.swagger.clone();
80 let metrics_cfg = cfg.metrics.clone();
81 let explorer_cfg = cfg.explorer.clone();
82
83 #[cfg(not(feature = "docs"))]
86 if docs_cfg.enabled {
87 log::warn!(
88 "[docs] enabled = true in config, but this binary was built \
89 without --features docs; skipping docs site"
90 );
91 }
92 #[cfg(not(feature = "swagger"))]
93 if swagger_cfg.enabled {
94 log::warn!(
95 "[swagger] enabled = true in config, but this binary was built \
96 without --features swagger; skipping Swagger UI"
97 );
98 }
99 #[cfg(not(feature = "auth"))]
100 if cfg.auth.enabled {
101 log::warn!(
102 "[auth] enabled = true in config, but this binary was built \
103 without --features auth; skipping OIDC enforcement"
104 );
105 }
106 #[cfg(not(feature = "metrics"))]
107 if metrics_cfg.enabled {
108 log::warn!(
109 "[metrics] enabled = true in config, but this binary was built \
110 without --features metrics; skipping Prometheus endpoint"
111 );
112 }
113 #[cfg(not(feature = "explorer"))]
114 if explorer_cfg.enabled {
115 log::warn!(
116 "[explorer] enabled = true in config, but this binary was built \
117 without --features explorer; skipping explorer UI"
118 );
119 }
120
121 #[cfg(feature = "auth")]
126 let auth_state = if cfg.auth.enabled {
127 let jwks = crate::auth::JwksCache::boot(&cfg.auth)
128 .await
129 .map_err(|e| std::io::Error::other(format!("auth bootstrap failed: {e}")))?;
130 log::info!(
131 "[auth] OIDC enforcement enabled (issuer = {}, audience = {}, read_scopes = {:?}, reload_scopes = {:?})",
132 cfg.auth.issuer,
133 if cfg.auth.audience.is_empty() {
134 "<none>"
135 } else {
136 cfg.auth.audience.as_str()
137 },
138 cfg.auth.read_scopes,
139 cfg.auth.reload_scopes,
140 );
141 Some(crate::auth::AuthState {
142 cfg: Arc::new(cfg.auth.clone()),
143 jwks,
144 })
145 } else {
146 None
147 };
148
149 log::info!(
150 "Listening on http://{}:{}{} ({} backend, {} workers, compression {}, max-body {} bytes, max-page-size {}, timeout {}, shutdown grace {}s)",
151 cfg.server.listen,
152 cfg.server.port,
153 if prefix.is_empty() {
154 "".into()
155 } else {
156 format!("{prefix}/")
157 },
158 label,
159 workers
160 .map(|w| w.to_string())
161 .unwrap_or_else(|| "auto".into()),
162 if compress { "on" } else { "off" },
163 max_body,
164 max_page_size,
165 if timeout_ms == 0 {
166 "off".into()
167 } else {
168 format!("{timeout_ms} ms")
169 },
170 shutdown_secs,
171 );
172
173 log_routes(&prefix, backend.as_ref());
174
175 #[cfg(feature = "docs")]
176 if docs_cfg.enabled {
177 log::info!(" {} (mkdocs site):", docs_cfg.path);
178 log::info!(" GET {}/", docs_cfg.path);
179 log::info!(" GET {}/{{path}}", docs_cfg.path);
180 }
181
182 #[cfg(feature = "swagger")]
183 if swagger_cfg.enabled {
184 log::info!(" {} (swagger UI):", swagger_cfg.path);
185 log::info!(" GET {}/", swagger_cfg.path);
186 log::info!(" GET {}/openapi.json", swagger_cfg.path);
187 }
188
189 #[cfg(feature = "explorer")]
190 if explorer_cfg.enabled {
191 log::info!(" {} (explorer UI):", explorer_cfg.path);
192 log::info!(" GET {}/", explorer_cfg.path);
193 log::info!(" GET {}/datasets/{{name}}", explorer_cfg.path);
194 }
195
196 #[cfg(feature = "swagger")]
203 let swagger_oauth2 = if swagger_cfg.enabled {
204 match swagger_cfg.oauth2.as_ref() {
205 Some(o) => match crate::swagger::resolve_oauth2(o).await {
206 Ok(resolved) => Some(resolved),
207 Err(e) => {
208 log::warn!(
209 "[swagger.oauth2] OIDC discovery for issuer {} failed ({e}); \
210 serving docs without the Authorize button",
211 o.issuer
212 );
213 None
214 }
215 },
216 None => None,
217 }
218 } else {
219 None
220 };
221
222 #[cfg(feature = "metrics")]
227 let prometheus = {
228 use actix_web_prom::PrometheusMetricsBuilder;
229 PrometheusMetricsBuilder::new("datapress")
230 .endpoint(metrics_cfg.path.as_str())
231 .build()
232 .map_err(|e| std::io::Error::other(format!("metrics init failed: {e}")))?
233 };
234 #[cfg(feature = "metrics")]
235 let metrics_enabled = metrics_cfg.enabled;
236
237 #[cfg(feature = "metrics")]
238 if metrics_cfg.enabled {
239 log::info!(" {} (prometheus metrics):", metrics_cfg.path);
240 log::info!(" GET {}", metrics_cfg.path);
241 }
242
243 let build_info = web::Data::new(handlers::BuildInfo::new(
244 match label {
247 "DuckDB" => "DuckDB",
248 "DataFusion" => "DataFusion",
249 _ => "unknown",
250 },
251 ));
252
253 let parquet_cache = web::Data::new(handlers::ParquetCache::default());
257
258 #[cfg(feature = "explorer")]
261 let explorer_state = if explorer_cfg.enabled {
262 let docs_url = {
265 #[cfg(feature = "docs")]
266 {
267 if docs_cfg.enabled {
268 docs_cfg.path.clone()
269 } else {
270 "https://docs.datap-rs.org".to_string()
271 }
272 }
273 #[cfg(not(feature = "docs"))]
274 {
275 "https://docs.datap-rs.org".to_string()
276 }
277 };
278 let swagger_url = {
281 #[cfg(feature = "swagger")]
282 {
283 if swagger_cfg.enabled {
284 Some(format!("{}/", swagger_cfg.path))
285 } else {
286 None
287 }
288 }
289 #[cfg(not(feature = "swagger"))]
290 {
291 None::<String>
292 }
293 };
294 Some(web::Data::new(crate::explorer::ExplorerState {
295 backend: backend.clone(),
296 datasets: cfg.datasets.clone(),
297 explorer_base: explorer_cfg.path.clone(),
298 api_base: format!("{prefix}/api/v1"),
299 backend_label: label.to_string(),
300 sql_enabled: cfg.sql.enabled,
301 docs_url,
302 swagger_url,
303 }))
304 } else {
305 None
306 };
307
308 let mut server = HttpServer::new(move || {
309 let backend = backend.clone();
310 let prefix = prefix.clone();
311 let json_cfg = web::JsonConfig::default().limit(max_body);
312 let pay_cfg = web::PayloadConfig::default().limit(max_body);
313 let query_limits = handlers::QueryLimits { max_page_size };
314 let timeout = Timeout::new(Duration::from_millis(timeout_ms.max(1)));
315 #[cfg(feature = "docs")]
316 let docs_cfg = docs_cfg.clone();
317 #[cfg(feature = "explorer")]
318 let explorer_state = explorer_state.clone();
319 #[cfg(feature = "swagger")]
320 let swagger_cfg = swagger_cfg.clone();
321 #[cfg(feature = "swagger")]
322 let swagger_oauth2 = swagger_oauth2.clone();
323 #[cfg(feature = "auth")]
324 let auth_state = auth_state.clone();
325 #[cfg(feature = "metrics")]
326 let prometheus = prometheus.clone();
327 let app = App::new()
328 .app_data(web::Data::new(backend))
329 .app_data(build_info.clone())
330 .app_data(web::Data::new(query_limits))
331 .app_data(web::Data::new(sql_settings))
332 .app_data(parquet_cache.clone())
333 .app_data(json_cfg)
334 .app_data(pay_cfg)
335 .wrap(middleware::Condition::new(timeout_ms > 0, timeout))
336 .wrap(middleware::Condition::new(
337 compress,
338 middleware::Compress::default(),
339 ))
340 .wrap(middleware::Logger::new("%a \"%r\" %s %b bytes %Dms"));
341 #[cfg(feature = "auth")]
347 let app = match auth_state.clone() {
348 Some(state) => app
349 .app_data(web::Data::new(state.cfg.clone()))
350 .wrap(crate::auth::Auth::new(state)),
351 None => app.wrap(crate::auth::Auth::disabled()),
352 };
353 #[cfg(feature = "metrics")]
359 let app = app.wrap(middleware::Condition::new(metrics_enabled, prometheus));
360 let app = app
361 .service(handlers::healthz)
362 .service(handlers::readyz)
363 .service(handlers::version);
364 #[cfg(feature = "docs")]
371 let app = if docs_cfg.enabled {
372 app.configure(|c| crate::docs::configure(&docs_cfg.path, c))
373 } else {
374 app
375 };
376 #[cfg(feature = "swagger")]
377 let app = if swagger_cfg.enabled {
378 app.configure(|c| {
379 crate::swagger::configure(&swagger_cfg.path, swagger_oauth2.as_ref(), c)
380 })
381 } else {
382 app
383 };
384 #[cfg(feature = "explorer")]
387 let app = match explorer_state {
388 Some(state) => app.configure(|c| crate::explorer::configure(state, c)),
389 None => app,
390 };
391 app.service(
392 web::scope(prefix.as_str())
393 .service(handlers::health)
394 .service(web::scope("/api/v1").configure(handlers::v1::configure))
396 .service(web::scope("/api").configure(handlers::v1::configure)),
401 )
402 });
403 if let Some(w) = workers {
404 server = server.workers(w);
405 }
406
407 let running = server
411 .bind(addr)?
412 .shutdown_timeout(shutdown_secs)
413 .disable_signals()
414 .run();
415 let handle = running.handle();
416 tokio::spawn(shutdown_listener(handle, shutdown_secs, shutdown));
417
418 running.await
419}
420
421async fn shutdown_listener(
424 handle: actix_web::dev::ServerHandle,
425 grace_secs: u64,
426 shutdown: Shutdown,
427) {
428 match shutdown {
429 Shutdown::Signals => {
430 let which = wait_for_signal().await;
431 log::info!(
432 "Received {which}, shutting down gracefully (up to {grace_secs}s for in-flight requests)..."
433 );
434 }
435 Shutdown::External(fut) => {
436 fut.await;
437 log::info!(
438 "Shutdown requested by host, draining in-flight requests (up to {grace_secs}s)..."
439 );
440 }
441 }
442 handle.stop(true).await;
443 log::info!("Shutdown complete.");
444}
445
446#[cfg(unix)]
447async fn wait_for_signal() -> &'static str {
448 use tokio::signal::unix::{SignalKind, signal};
449 let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
452 let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
453 tokio::select! {
454 _ = sigterm.recv() => "SIGTERM",
455 _ = sigint.recv() => "SIGINT",
456 }
457}
458
/// Resolves with `"Ctrl+C"` once the process receives a Ctrl+C notification.
///
/// If the Ctrl+C listener cannot be registered, the error is logged and the
/// future never resolves. Returning in that case would make the caller treat
/// the registration failure as a real signal and stop the server right after
/// startup.
#[cfg(not(unix))]
async fn wait_for_signal() -> &'static str {
    match tokio::signal::ctrl_c().await {
        Ok(()) => "Ctrl+C",
        Err(e) => {
            log::error!(
                "failed to listen for Ctrl+C ({e}); signal-driven shutdown is unavailable"
            );
            // Park forever instead of reporting a signal that never arrived.
            std::future::pending::<&'static str>().await
        }
    }
}
465
466fn log_routes(prefix: &str, backend: &dyn Backend) {
471 const METHOD_W: usize = 6;
474
475 let p = prefix; log::info!("Routes:");
478 log::info!(" general:");
479 for (method, path) in [
480 ("GET", "/healthz".to_string()),
481 ("GET", "/readyz".to_string()),
482 ("GET", "/version".to_string()),
483 ("GET", format!("{p}/health")),
484 ] {
485 log::info!(" {:<width$} {}", method, path, width = METHOD_W);
486 }
487
488 let mounts: &[(&str, &[(&str, &str)])] = &[
491 ("/api/v1", handlers::v1::ROUTES),
492 ("/api", handlers::v1::ROUTES), ];
494
495 let names = backend.names();
496 for (mount, routes) in mounts {
497 log::info!(" {p}{mount}:");
498 for (method, suffix) in *routes {
500 if !suffix.contains("{name}") {
501 log::info!(
502 " {:<width$} {p}{mount}{suffix}",
503 method,
504 width = METHOD_W,
505 );
506 }
507 }
508 if names.is_empty() {
509 log::info!(" (no datasets registered)");
510 continue;
511 }
512 for name in &names {
513 for (method, suffix) in *routes {
514 if let Some(rest) = suffix.strip_prefix("/datasets/{name}") {
515 log::info!(
516 " {:<width$} {p}{mount}/datasets/{name}{rest}",
517 method,
518 width = METHOD_W,
519 );
520 }
521 }
522 }
523 }
524}