1use std::future::Future;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::time::Duration;
8
9use actix_web::{App, HttpServer, middleware, web};
10
11use crate::backend::Backend;
12use crate::config::AppConfig;
13use crate::handlers;
14use crate::timeout::Timeout;
15
/// Boxed, sendable future that a host application resolves to request shutdown.
type ShutdownFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Selects what triggers a graceful stop of the running server.
enum Shutdown {
    /// Stop when the process receives an OS signal (see `wait_for_signal`).
    Signals,
    /// Stop when the wrapped future completes.
    External(ShutdownFuture),
}
28
29pub async fn serve(cfg: AppConfig, backend: Arc<dyn Backend>, label: &str) -> std::io::Result<()> {
39 run_server(cfg, backend, label, Shutdown::Signals).await
40}
41
42pub async fn serve_with_shutdown(
52 cfg: AppConfig,
53 backend: Arc<dyn Backend>,
54 label: &str,
55 shutdown: impl Future<Output = ()> + Send + 'static,
56) -> std::io::Result<()> {
57 run_server(cfg, backend, label, Shutdown::External(Box::pin(shutdown))).await
58}
59
/// Builds the actix-web application, binds the listener and runs the server
/// until it stops.
///
/// Steps, in order:
/// 1. Copy the settings needed later out of `cfg`.
/// 2. Warn about config sections that are enabled but whose Cargo feature was
///    not compiled in (`docs`, `swagger`, `auth`, `metrics`).
/// 3. With the `auth` feature and `cfg.auth.enabled`, bootstrap the JWKS cache.
/// 4. Log the listen address, the effective settings and the route table.
/// 5. With the `swagger` feature, try to resolve the OAuth2 settings.
/// 6. With the `metrics` feature, build the Prometheus middleware.
/// 7. Create the `HttpServer`, bind it, spawn `shutdown_listener` and await
///    the server future.
///
/// * `cfg` - application configuration.
/// * `backend` - shared backend, registered as app data and used for the
///   route listing.
/// * `label` - backend name; printed in the startup line and mapped to
///   `"DuckDB"`, `"DataFusion"` or `"unknown"` for `BuildInfo`.
/// * `shutdown` - what triggers the graceful stop.
///
/// # Errors
///
/// Returns an `std::io::Error` when the auth bootstrap fails, when the
/// metrics middleware cannot be built, when binding `addr` fails, or when the
/// running server itself resolves to an error.
async fn run_server(
    cfg: AppConfig,
    backend: Arc<dyn Backend>,
    label: &str,
    shutdown: Shutdown,
) -> std::io::Result<()> {
    // Copy/clone the individual settings into locals; most of them are
    // captured by the `move` closure passed to `HttpServer::new` below.
    let addr = (cfg.server.listen, cfg.server.port);
    let workers = cfg.server.workers;
    let prefix = cfg.server.prefix.clone();
    let compress = cfg.server.compress;
    let max_body = cfg.server.max_body_bytes;
    let max_page_size = cfg.server.max_page_size;
    let timeout_ms = cfg.server.request_timeout_ms;
    let shutdown_secs = cfg.server.shutdown_timeout_secs;
    let docs_cfg = cfg.docs.clone();
    let swagger_cfg = cfg.swagger.clone();
    let metrics_cfg = cfg.metrics.clone();

    // Each of the four blocks below is compiled only when the matching
    // feature is absent, and warns that the config asks for something this
    // binary cannot provide.
    #[cfg(not(feature = "docs"))]
    if docs_cfg.enabled {
        log::warn!(
            "[docs] enabled = true in config, but this binary was built \
            without --features docs; skipping docs site"
        );
    }
    #[cfg(not(feature = "swagger"))]
    if swagger_cfg.enabled {
        log::warn!(
            "[swagger] enabled = true in config, but this binary was built \
            without --features swagger; skipping Swagger UI"
        );
    }
    #[cfg(not(feature = "auth"))]
    if cfg.auth.enabled {
        log::warn!(
            "[auth] enabled = true in config, but this binary was built \
            without --features auth; skipping OIDC enforcement"
        );
    }
    #[cfg(not(feature = "metrics"))]
    if metrics_cfg.enabled {
        log::warn!(
            "[metrics] enabled = true in config, but this binary was built \
            without --features metrics; skipping Prometheus endpoint"
        );
    }

    // Auth bootstrap: a failure in `JwksCache::boot` is wrapped into an
    // `io::Error` and aborts startup via `?`. `None` means auth is disabled
    // in the config.
    #[cfg(feature = "auth")]
    let auth_state = if cfg.auth.enabled {
        let jwks = crate::auth::JwksCache::boot(&cfg.auth)
            .await
            .map_err(|e| std::io::Error::other(format!("auth bootstrap failed: {e}")))?;
        log::info!(
            "[auth] OIDC enforcement enabled (issuer = {}, audience = {}, read_scopes = {:?}, reload_scopes = {:?})",
            cfg.auth.issuer,
            if cfg.auth.audience.is_empty() {
                "<none>"
            } else {
                cfg.auth.audience.as_str()
            },
            cfg.auth.read_scopes,
            cfg.auth.reload_scopes,
        );
        Some(crate::auth::AuthState {
            cfg: Arc::new(cfg.auth.clone()),
            jwks,
        })
    } else {
        None
    };

    // Startup summary. An unset worker count is printed as "auto" and a
    // request timeout of 0 is printed as "off".
    log::info!(
        "Listening on http://{}:{}{} ({} backend, {} workers, compression {}, max-body {} bytes, max-page-size {}, timeout {}, shutdown grace {}s)",
        cfg.server.listen,
        cfg.server.port,
        if prefix.is_empty() {
            "".into()
        } else {
            format!("{prefix}/")
        },
        label,
        workers
            .map(|w| w.to_string())
            .unwrap_or_else(|| "auto".into()),
        if compress { "on" } else { "off" },
        max_body,
        max_page_size,
        if timeout_ms == 0 {
            "off".into()
        } else {
            format!("{timeout_ms} ms")
        },
        shutdown_secs,
    );

    log_routes(&prefix, backend.as_ref());

    // Extend the route listing with the optional docs / swagger mounts.
    #[cfg(feature = "docs")]
    if docs_cfg.enabled {
        log::info!(" {} (mkdocs site):", docs_cfg.path);
        log::info!(" GET {}/", docs_cfg.path);
        log::info!(" GET {}/{{path}}", docs_cfg.path);
    }

    #[cfg(feature = "swagger")]
    if swagger_cfg.enabled {
        log::info!(" {} (swagger UI):", swagger_cfg.path);
        log::info!(" GET {}/", swagger_cfg.path);
        log::info!(" GET {}/openapi.json", swagger_cfg.path);
    }

    // OAuth2 resolution for Swagger is best-effort: on failure a warning is
    // logged and the value stays `None` instead of aborting startup.
    #[cfg(feature = "swagger")]
    let swagger_oauth2 = if swagger_cfg.enabled {
        match swagger_cfg.oauth2.as_ref() {
            Some(o) => match crate::swagger::resolve_oauth2(o).await {
                Ok(resolved) => Some(resolved),
                Err(e) => {
                    log::warn!(
                        "[swagger.oauth2] OIDC discovery for issuer {} failed ({e}); \
                        serving docs without the Authorize button",
                        o.issuer
                    );
                    None
                }
            },
            None => None,
        }
    } else {
        None
    };

    // The Prometheus middleware is built whenever the feature is compiled in,
    // independent of `metrics_cfg.enabled`; a build error aborts startup.
    // Whether it is actually applied is decided by the `Condition` wrapper
    // inside the app factory.
    #[cfg(feature = "metrics")]
    let prometheus = {
        use actix_web_prom::PrometheusMetricsBuilder;
        PrometheusMetricsBuilder::new("datapress")
            .endpoint(metrics_cfg.path.as_str())
            .build()
            .map_err(|e| std::io::Error::other(format!("metrics init failed: {e}")))?
    };
    #[cfg(feature = "metrics")]
    let metrics_enabled = metrics_cfg.enabled;

    #[cfg(feature = "metrics")]
    if metrics_cfg.enabled {
        log::info!(" {} (prometheus metrics):", metrics_cfg.path);
        log::info!(" GET {}", metrics_cfg.path);
    }

    // Map the borrowed `label` onto a fixed set of string literals; any other
    // value becomes "unknown".
    // NOTE(review): presumably `BuildInfo::new` wants a `&'static str` — confirm.
    let build_info = web::Data::new(handlers::BuildInfo::new(
        match label {
            "DuckDB" => "DuckDB",
            "DataFusion" => "DataFusion",
            _ => "unknown",
        },
    ));

    // App factory. Everything it needs is moved in and cloned on each call so
    // every `App` it builds owns its own copies.
    let mut server = HttpServer::new(move || {
        let backend = backend.clone();
        let prefix = prefix.clone();
        // The same byte limit is applied to JSON bodies and raw payloads.
        let json_cfg = web::JsonConfig::default().limit(max_body);
        let pay_cfg = web::PayloadConfig::default().limit(max_body);
        let query_limits = handlers::QueryLimits { max_page_size };
        // The duration is clamped to at least 1 ms so `Timeout` is never built
        // with zero; when `timeout_ms == 0` the `Condition` below keeps the
        // middleware inactive anyway.
        let timeout = Timeout::new(Duration::from_millis(timeout_ms.max(1)));
        #[cfg(feature = "docs")]
        let docs_cfg = docs_cfg.clone();
        #[cfg(feature = "swagger")]
        let swagger_cfg = swagger_cfg.clone();
        #[cfg(feature = "swagger")]
        let swagger_oauth2 = swagger_oauth2.clone();
        #[cfg(feature = "auth")]
        let auth_state = auth_state.clone();
        #[cfg(feature = "metrics")]
        let prometheus = prometheus.clone();
        // NOTE: per the actix-web `App::wrap` documentation, the middleware
        // registered last runs first, so the order of the `wrap` calls below
        // (including the feature-gated ones) is significant.
        let app = App::new()
            .app_data(web::Data::new(backend))
            .app_data(build_info.clone())
            .app_data(web::Data::new(query_limits))
            .app_data(json_cfg)
            .app_data(pay_cfg)
            .wrap(middleware::Condition::new(timeout_ms > 0, timeout))
            .wrap(middleware::Condition::new(
                compress,
                middleware::Compress::default(),
            ))
            .wrap(middleware::Logger::new("%a \"%r\" %s %b bytes %Dms"));
        // With auth enabled the auth config is also exposed as app data;
        // otherwise a disabled `Auth` middleware is installed so both arms
        // produce the same `app` type.
        #[cfg(feature = "auth")]
        let app = match auth_state.clone() {
            Some(state) => app
                .app_data(web::Data::new(state.cfg.clone()))
                .wrap(crate::auth::Auth::new(state)),
            None => app.wrap(crate::auth::Auth::disabled()),
        };
        #[cfg(feature = "metrics")]
        let app = app.wrap(middleware::Condition::new(metrics_enabled, prometheus));
        // Probe and version endpoints are registered at the root, outside the
        // configurable prefix scope.
        let app = app
            .service(handlers::healthz)
            .service(handlers::readyz)
            .service(handlers::version);
        #[cfg(feature = "docs")]
        let app = if docs_cfg.enabled {
            app.configure(|c| crate::docs::configure(&docs_cfg.path, c))
        } else {
            app
        };
        #[cfg(feature = "swagger")]
        let app = if swagger_cfg.enabled {
            app.configure(|c| {
                crate::swagger::configure(&swagger_cfg.path, swagger_oauth2.as_ref(), c)
            })
        } else {
            app
        };
        // The v1 handlers are mounted twice under the prefix: at `/api/v1`
        // and at `/api`.
        // NOTE(review): `/api/v1` is registered before `/api`, presumably so
        // the more specific scope is matched first — confirm.
        app.service(
            web::scope(prefix.as_str())
                .service(handlers::health)
                .service(web::scope("/api/v1").configure(handlers::v1::configure))
                .service(web::scope("/api").configure(handlers::v1::configure)),
        )
    });
    // Only override the worker count when one was configured.
    if let Some(w) = workers {
        server = server.workers(w);
    }

    // actix's built-in signal handling is turned off; stopping is driven by
    // the `shutdown_listener` task through the server handle instead.
    let running = server
        .bind(addr)?
        .shutdown_timeout(shutdown_secs)
        .disable_signals()
        .run();
    let handle = running.handle();
    tokio::spawn(shutdown_listener(handle, shutdown_secs, shutdown));

    running.await
}
335
336async fn shutdown_listener(
339 handle: actix_web::dev::ServerHandle,
340 grace_secs: u64,
341 shutdown: Shutdown,
342) {
343 match shutdown {
344 Shutdown::Signals => {
345 let which = wait_for_signal().await;
346 log::info!(
347 "Received {which}, shutting down gracefully (up to {grace_secs}s for in-flight requests)..."
348 );
349 }
350 Shutdown::External(fut) => {
351 fut.await;
352 log::info!(
353 "Shutdown requested by host, draining in-flight requests (up to {grace_secs}s)..."
354 );
355 }
356 }
357 handle.stop(true).await;
358 log::info!("Shutdown complete.");
359}
360
361#[cfg(unix)]
362async fn wait_for_signal() -> &'static str {
363 use tokio::signal::unix::{SignalKind, signal};
364 let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM handler");
367 let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT handler");
368 tokio::select! {
369 _ = sigterm.recv() => "SIGTERM",
370 _ = sigint.recv() => "SIGINT",
371 }
372}
373
/// Resolves with `"Ctrl+C"` once `tokio::signal::ctrl_c` completes (non-Unix).
///
/// The result of the wait is discarded, so an error from `ctrl_c` also makes
/// this function return.
#[cfg(not(unix))]
async fn wait_for_signal() -> &'static str {
    tokio::signal::ctrl_c().await.ok();
    "Ctrl+C"
}
380
381fn log_routes(prefix: &str, backend: &dyn Backend) {
386 const METHOD_W: usize = 6;
389
390 let p = prefix; log::info!("Routes:");
393 log::info!(" general:");
394 for (method, path) in [
395 ("GET", "/healthz".to_string()),
396 ("GET", "/readyz".to_string()),
397 ("GET", "/version".to_string()),
398 ("GET", format!("{p}/health")),
399 ] {
400 log::info!(" {:<width$} {}", method, path, width = METHOD_W);
401 }
402
403 let mounts: &[(&str, &[(&str, &str)])] = &[
406 ("/api/v1", handlers::v1::ROUTES),
407 ("/api", handlers::v1::ROUTES), ];
409
410 let names = backend.names();
411 for (mount, routes) in mounts {
412 log::info!(" {p}{mount}:");
413 for (method, suffix) in *routes {
415 if !suffix.contains("{name}") {
416 log::info!(
417 " {:<width$} {p}{mount}{suffix}",
418 method,
419 width = METHOD_W,
420 );
421 }
422 }
423 if names.is_empty() {
424 log::info!(" (no datasets registered)");
425 continue;
426 }
427 for name in &names {
428 for (method, suffix) in *routes {
429 if let Some(rest) = suffix.strip_prefix("/datasets/{name}") {
430 log::info!(
431 " {:<width$} {p}{mount}/datasets/{name}{rest}",
432 method,
433 width = METHOD_W,
434 );
435 }
436 }
437 }
438 }
439}