1use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse};
25use axum::routing::get;
26
27use crate::core::engine::{
28 DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
29};
30use crate::core::plugins::{
31 build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
32};
33use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
34use crate::realtime::gateway::GatewayDescriptor;
35use crate::realtime::{ws_route, ConnectionRegistry};
36use crate::web::boundary::adapt;
37
38#[derive(Clone, Debug)]
49#[non_exhaustive]
50pub struct LaunchConfig {
51 pub drain_budget: Duration,
56 pub request_timeout: Duration,
60 pub max_in_flight: usize,
64 pub max_body_bytes: usize,
66 pub cache_max_entries: usize,
69 pub cache_sweep_interval: Duration,
71 pub ws_drain_deadline: Duration,
76 pub cors: Option<crate::web::cors::CorsConfig>,
78 pub shutdown_trigger: Option<std::sync::Arc<tokio::sync::Notify>>,
83 pub expose_docs: bool,
86 pub ws_outbound_buffer: usize,
89 pub ws_max_connections: usize,
93 pub ws_ping_interval: Duration,
96 pub adaptive_shed_target: Duration,
101 pub ws_idle_timeout: Duration,
105}
106
107impl Default for LaunchConfig {
108 fn default() -> Self {
109 Self {
110 drain_budget: Duration::from_secs(5),
111 request_timeout: Duration::from_secs(30),
112 max_in_flight: 0,
113 max_body_bytes: 8 * 1024 * 1024,
114 cache_max_entries: 10_000,
115 cache_sweep_interval: Duration::from_secs(30),
116 ws_drain_deadline: Duration::from_secs(10),
117 adaptive_shed_target: Duration::ZERO,
118 cors: None,
119 shutdown_trigger: None,
120 expose_docs: true,
121 ws_outbound_buffer: 256,
122 ws_max_connections: 0,
123 ws_ping_interval: Duration::from_secs(20),
124 ws_idle_timeout: Duration::from_secs(60),
125 }
126 }
127}
128
129impl LaunchConfig {
130 pub fn drain_budget(mut self, v: Duration) -> Self {
131 self.drain_budget = v;
132 self
133 }
134 pub fn request_timeout(mut self, v: Duration) -> Self {
135 self.request_timeout = v;
136 self
137 }
138 pub fn max_in_flight(mut self, v: usize) -> Self {
139 self.max_in_flight = v;
140 self
141 }
142 pub fn max_body_bytes(mut self, v: usize) -> Self {
143 self.max_body_bytes = v;
144 self
145 }
146 pub fn cache_max_entries(mut self, v: usize) -> Self {
147 self.cache_max_entries = v;
148 self
149 }
150 pub fn cache_sweep_interval(mut self, v: Duration) -> Self {
151 self.cache_sweep_interval = v;
152 self
153 }
154 pub fn ws_drain_deadline(mut self, v: Duration) -> Self {
155 self.ws_drain_deadline = v;
156 self
157 }
158 pub fn adaptive_shed_target(mut self, v: Duration) -> Self {
159 self.adaptive_shed_target = v;
160 self
161 }
162 pub fn cors(mut self, v: crate::web::cors::CorsConfig) -> Self {
163 self.cors = Some(v);
164 self
165 }
166 pub fn shutdown_trigger(mut self, v: std::sync::Arc<tokio::sync::Notify>) -> Self {
167 self.shutdown_trigger = Some(v);
168 self
169 }
170 pub fn expose_docs(mut self, v: bool) -> Self {
171 self.expose_docs = v;
172 self
173 }
174 pub fn ws_outbound_buffer(mut self, v: usize) -> Self {
175 self.ws_outbound_buffer = v;
176 self
177 }
178 pub fn ws_max_connections(mut self, v: usize) -> Self {
179 self.ws_max_connections = v;
180 self
181 }
182 pub fn ws_ping_interval(mut self, v: Duration) -> Self {
183 self.ws_ping_interval = v;
184 self
185 }
186 pub fn ws_idle_timeout(mut self, v: Duration) -> Self {
187 self.ws_idle_timeout = v;
188 self
189 }
190
191 pub fn with_env_overrides(self) -> Self {
208 self.apply_overrides(|k| std::env::var(k).ok())
209 }
210
211 pub(crate) fn apply_overrides(mut self, get: impl Fn(&str) -> Option<String>) -> Self {
213 fn parse<T: std::str::FromStr>(key: &str, raw: String) -> Option<T> {
214 match raw.parse() {
215 Ok(v) => Some(v),
216 Err(_) => {
217 tracing::warn!(key, value = raw, "ignoring unparseable ARCLY_* override");
218 None
219 }
220 }
221 }
222 if let Some(v) =
223 get("ARCLY_REQUEST_TIMEOUT_MS").and_then(|r| parse("ARCLY_REQUEST_TIMEOUT_MS", r))
224 {
225 self.request_timeout = Duration::from_millis(v);
226 }
227 if let Some(v) = get("ARCLY_MAX_IN_FLIGHT").and_then(|r| parse("ARCLY_MAX_IN_FLIGHT", r)) {
228 self.max_in_flight = v;
229 }
230 if let Some(v) = get("ARCLY_MAX_BODY_BYTES").and_then(|r| parse("ARCLY_MAX_BODY_BYTES", r))
231 {
232 self.max_body_bytes = v;
233 }
234 if let Some(v) =
235 get("ARCLY_CACHE_MAX_ENTRIES").and_then(|r| parse("ARCLY_CACHE_MAX_ENTRIES", r))
236 {
237 self.cache_max_entries = v;
238 }
239 if let Some(v) =
240 get("ARCLY_WS_DRAIN_DEADLINE_MS").and_then(|r| parse("ARCLY_WS_DRAIN_DEADLINE_MS", r))
241 {
242 self.ws_drain_deadline = Duration::from_millis(v);
243 }
244 if let Some(v) =
245 get("ARCLY_DRAIN_BUDGET_MS").and_then(|r| parse("ARCLY_DRAIN_BUDGET_MS", r))
246 {
247 self.drain_budget = Duration::from_millis(v);
248 }
249 if let Some(v) =
250 get("ARCLY_WS_OUTBOUND_BUFFER").and_then(|r| parse("ARCLY_WS_OUTBOUND_BUFFER", r))
251 {
252 self.ws_outbound_buffer = v;
253 }
254 if let Some(v) =
255 get("ARCLY_WS_MAX_CONNECTIONS").and_then(|r| parse("ARCLY_WS_MAX_CONNECTIONS", r))
256 {
257 self.ws_max_connections = v;
258 }
259 if let Some(v) =
260 get("ARCLY_WS_PING_INTERVAL_MS").and_then(|r| parse("ARCLY_WS_PING_INTERVAL_MS", r))
261 {
262 self.ws_ping_interval = Duration::from_millis(v);
263 }
264 if let Some(v) =
265 get("ARCLY_WS_IDLE_TIMEOUT_MS").and_then(|r| parse("ARCLY_WS_IDLE_TIMEOUT_MS", r))
266 {
267 self.ws_idle_timeout = Duration::from_millis(v);
268 }
269 if let Some(v) = get("ARCLY_ADAPTIVE_SHED_TARGET_MS")
270 .and_then(|r| parse("ARCLY_ADAPTIVE_SHED_TARGET_MS", r))
271 {
272 self.adaptive_shed_target = Duration::from_millis(v);
273 }
274 if let Some(raw) = get("ARCLY_EXPOSE_DOCS") {
275 match raw.as_str() {
276 "true" | "1" => self.expose_docs = true,
277 "false" | "0" => self.expose_docs = false,
278 _ => tracing::warn!(value = raw, "ignoring unparseable ARCLY_EXPOSE_DOCS"),
279 }
280 }
281 self
282 }
283}
284
285pub struct App;
286
287impl App {
288 pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
289 let info = OpenApiInfo::new("arcly-http service", env!("CARGO_PKG_VERSION"));
290 Self::launch_with_info::<RootMod>(addr, info).await
291 }
292
293 pub async fn launch_named<RootMod: Module>(
294 addr: &str,
295 title: &'static str,
296 version: &'static str,
297 ) -> std::io::Result<()> {
298 let info = OpenApiInfo::new(title, version);
299 Self::launch_with_info::<RootMod>(addr, info).await
300 }
301
302 pub async fn launch_with_info<RootMod: Module>(
303 addr: &str,
304 info: OpenApiInfo,
305 ) -> std::io::Result<()> {
306 Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
307 }
308
309 pub async fn launch_with_plugins<RootMod: Module>(
312 addr: &str,
313 info: OpenApiInfo,
314 plugins: Vec<Box<dyn ArclyPlugin>>,
315 ) -> std::io::Result<()> {
316 Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
317 }
318
319 pub async fn launch_configured<RootMod: Module>(
321 addr: &str,
322 info: OpenApiInfo,
323 plugins: Vec<Box<dyn ArclyPlugin>>,
324 config: LaunchConfig,
325 ) -> std::io::Result<()> {
326 let listener = tokio::net::TcpListener::bind(addr).await?;
327 Self::launch_on_listener::<RootMod>(listener, info, plugins, config).await
328 }
329
330 pub async fn launch_on_listener<RootMod: Module>(
336 listener: tokio::net::TcpListener,
337 info: OpenApiInfo,
338 mut plugins: Vec<Box<dyn ArclyPlugin>>,
339 config: LaunchConfig,
340 ) -> std::io::Result<()> {
341 let _root: PhantomData<RootMod> = PhantomData;
342 let config = config.with_env_overrides();
344 tracing::info!(
345 request_timeout = ?config.request_timeout,
346 max_in_flight = config.max_in_flight,
347 max_body_bytes = config.max_body_bytes,
348 expose_docs = config.expose_docs,
349 "launch config (effective)"
350 );
351
352 let reachable_modules = collect_reachable_modules(RootMod::descriptor());
354 let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
355 .iter()
356 .flat_map(|m| m.controllers.iter().copied())
357 .collect();
358
359 let mut b = DiContainerBuilder::new();
361 for m in &reachable_modules {
362 for p in m.providers {
363 b.add_provider(p);
364 }
365 }
366
367 let mut plugin_ctx = ArclyPluginContext::new();
369 for p in plugins.iter_mut() {
370 plugin_ctx.current_plugin = p.name();
371 if let Err(e) = p.on_init(&mut plugin_ctx).await {
372 return Err(plugin_io_err(e));
373 }
374 }
375
376 for f in plugin_ctx.pending_providers.drain(..) {
378 f(&mut b);
379 }
380
381 b.register(crate::web::dynamic::DynamicRouteTable::new());
385 b.register(crate::web::boundary::BodyLimit(config.max_body_bytes));
388 let container = b.freeze();
389
390 let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
392 for mutator in plugin_ctx.openapi_mutators.drain(..) {
393 mutator(&mut spec_value);
394 }
395 let spec_bytes: &'static [u8] = Box::leak(
398 serde_json::to_vec(&spec_value)
399 .unwrap_or_else(|e| {
400 panic!("Arcly: OpenAPI spec serialization failed: {e}")
403 })
404 .into_boxed_slice(),
405 );
406
407 let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
411 Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
412 let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
414 Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());
415
416 let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
417 axum::Router::new();
418 let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
419 std::collections::HashSet::new();
420 mounted.insert(("/openapi.json", HttpMethod::GET));
422 mounted.insert(("/docs", HttpMethod::GET));
423 for rt in inventory::iter::<&'static RouteDescriptor> {
424 if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
427 continue;
428 }
429 mounted.insert((rt.path, rt.method));
430 router = router.route(axum_path_static(rt.path), adapt(rt, globals, filters));
431 if rt.path.len() > 1 && rt.path.ends_with('/') {
435 let trimmed: &'static str =
436 Box::leak(rt.path[..rt.path.len() - 1].to_owned().into_boxed_str());
437 if mounted.insert((trimmed, rt.method)) {
438 router = router.route(axum_path_static(trimmed), adapt(rt, globals, filters));
439 }
440 }
441 }
442 let mut app = router.with_state(container);
443 for r in &plugin_ctx.extra_routes {
444 if !mounted.insert((r.path, r.method)) {
447 return Err(plugin_io_err(PluginError::new(
448 r.plugin,
449 PluginStage::Init,
450 format!(
451 "route `{:?} {}` is already mounted by another route or plugin",
452 r.method, r.path
453 ),
454 )));
455 }
456 app = app.route(
457 axum_path_static(r.path),
458 build_plugin_route(container, r, globals, filters),
459 );
460 }
461
462 container
466 .get::<crate::web::dynamic::DynamicRouteTable>()
467 .set_globals(globals);
468 app = app.route(
469 "/_plugins/{*rest}",
470 crate::web::dynamic::dynamic_dispatch_route(container, filters),
471 );
472
473 let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
477 .iter()
478 .flat_map(|m| m.gateways.iter().copied())
479 .collect();
480 let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
481 let ws_tuning = crate::realtime::ws::WsTuning {
482 outbound_buffer: config.ws_outbound_buffer,
483 max_connections: config.ws_max_connections,
484 ping_interval: config.ws_ping_interval,
485 };
486 if !config.ws_idle_timeout.is_zero() {
490 let idle = config.ws_idle_timeout;
491 tokio::spawn(async move {
492 let mut tick = tokio::time::interval(idle / 2);
493 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
494 loop {
495 tick.tick().await;
496 let reaped = registry.sweep_idle(idle.as_secs());
497 if !reaped.is_empty() {
498 tracing::info!(count = reaped.len(), "reaped idle WebSocket connections");
499 }
500 }
501 });
502 }
503 for gd in inventory::iter::<&'static GatewayDescriptor> {
504 if !allowed_gateways.contains(gd.name) {
505 continue;
506 }
507 let runtime = (gd.build)(container);
508 app = app.route(
509 axum_path_static(gd.path),
510 ws_route(runtime, registry, container, ws_tuning),
511 );
512 }
513
514 if config.expose_docs {
518 app = app
519 .route(
520 "/openapi.json",
521 get(move || async move {
522 (
523 [(axum::http::header::CONTENT_TYPE, "application/json")],
524 spec_bytes,
525 )
526 .into_response()
527 }),
528 )
529 .route(
530 "/docs",
531 get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
532 );
533 }
534 let mut app = app.layer(axum::middleware::from_fn(
535 crate::security::apply_security_headers,
536 ));
537
538 if let Some(cors_cfg) = config.cors.clone() {
543 let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
544 app = app.layer(axum::middleware::from_fn(
545 move |req: axum::extract::Request, next: axum::middleware::Next| {
546 crate::web::cors::apply_cors(cors_cfg, req, next)
547 },
548 ));
549 }
550 let gov = crate::web::governor::Governor::new(
551 config.request_timeout,
552 config.max_in_flight,
553 config.adaptive_shed_target,
554 );
555 let app = app.layer(axum::middleware::from_fn(
556 move |req: axum::extract::Request, next: axum::middleware::Next| {
557 crate::web::governor::govern(Arc::clone(&gov), req, next)
558 },
559 ));
560
561 crate::web::cache::set_capacity(config.cache_max_entries);
563 crate::web::cache::spawn_sweeper(config.cache_sweep_interval);
564
565 let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
576 let mut started = 0usize;
577 #[allow(clippy::explicit_counter_loop)] for p in plugins_arc.iter() {
579 if let Err(e) = p.on_start(container).await {
580 for already in plugins_arc[..started].iter().rev() {
581 drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
582 .await;
583 }
584 return Err(plugin_io_err(e));
585 }
586 started += 1;
587 }
588
589 let plugins_for_draining = Arc::clone(&plugins_arc);
595 let draining_budget = config.drain_budget;
596 let ws_deadline = config.ws_drain_deadline;
597 let trigger = config.shutdown_trigger.clone();
598 let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
599 match trigger {
600 Some(n) => {
601 tokio::select! {
602 _ = shutdown_signal() => {}
603 _ = n.notified() => {}
604 }
605 }
606 None => shutdown_signal().await,
607 }
608 crate::observability::health::set_draining(true);
611 tracing::info!("shutdown signal received — HTTP draining");
612 tokio::spawn(async move {
615 tokio::time::sleep(ws_deadline).await;
616 let closed = registry.close_all();
617 if closed > 0 {
618 tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
619 }
620 });
621 tokio::spawn(async move {
626 for p in plugins_for_draining.iter() {
627 match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
628 Ok(Ok(())) => {}
629 Ok(Err(e)) => {
630 tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
631 }
632 Err(_) => tracing::warn!(
633 plugin = p.name(),
634 budget = ?draining_budget,
635 "plugin on_draining exceeded budget"
636 ),
637 }
638 }
639 });
640 });
641 let serve_res = serve.await;
642
643 tracing::info!(
645 budget = ?config.drain_budget,
646 "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
647 );
648 for p in plugins_arc.iter().rev() {
649 drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
650 }
651 serve_res
652 }
653}
654
/// Rewrites a route pattern into axum's brace syntax.
///
/// Each `/`-separated segment is inspected on its own: a segment starting with `:` becomes
/// `{name}`, a segment starting with `*` becomes `{*name}`, and anything else is copied
/// through unchanged. Leading, trailing and repeated slashes are preserved.
fn axum_path(path: &str) -> String {
    let mut converted = String::with_capacity(path.len() + 2);
    for (position, segment) in path.split('/').enumerate() {
        // Re-insert the separator consumed by `split` between consecutive segments.
        if position != 0 {
            converted.push('/');
        }
        if let Some(param) = segment.strip_prefix(':') {
            converted.push('{');
            converted.push_str(param);
            converted.push('}');
        } else if let Some(wildcard) = segment.strip_prefix('*') {
            converted.push_str("{*");
            converted.push_str(wildcard);
            converted.push('}');
        } else {
            converted.push_str(segment);
        }
    }
    converted
}
677
678fn axum_path_static(path: &str) -> &'static str {
679 Box::leak(axum_path(path).into_boxed_str())
680}
681
682fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
683 use std::collections::HashSet;
684 let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
685 let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
686 std::collections::VecDeque::new();
687 let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
688 queue.push_back(root);
689 while let Some(m) = queue.pop_front() {
690 if !visited.insert(m as *const _) {
691 continue;
692 }
693 order.push(m);
694 for getter in m.imports {
695 queue.push_back(getter());
696 }
697 }
698 order
699}
700
701#[cfg(unix)]
706async fn shutdown_signal() {
707 use tokio::signal::unix::{signal, SignalKind};
708 match signal(SignalKind::terminate()) {
709 Ok(mut sigterm) => {
710 tokio::select! {
711 _ = tokio::signal::ctrl_c() => {}
712 _ = sigterm.recv() => {}
713 }
714 }
715 Err(e) => {
716 tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
717 let _ = tokio::signal::ctrl_c().await;
718 }
719 }
720}
721
/// Resolves when the process receives Ctrl-C; used on targets without Unix signals.
#[cfg(not(unix))]
async fn shutdown_signal() {
    // Both outcomes end the wait: a listener error is deliberately ignored.
    match tokio::signal::ctrl_c().await {
        Ok(()) | Err(_) => {}
    }
}
726
727async fn drain_plugin(
731 p: &dyn ArclyPlugin,
732 container: &'static crate::core::engine::FrozenDiContainer,
733 phase: &str,
734 budget: Duration,
735) {
736 match tokio::time::timeout(budget, p.on_shutdown(container)).await {
737 Ok(Ok(())) => {}
738 Ok(Err(e)) => {
739 tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
740 }
741 Err(_) => tracing::warn!(
742 plugin = p.name(),
743 phase,
744 budget = ?budget,
745 "plugin shutdown exceeded budget — skipped"
746 ),
747 }
748}
749
750fn plugin_io_err(e: PluginError) -> std::io::Error {
751 let kind = match e.stage {
752 PluginStage::Init => std::io::ErrorKind::InvalidInput,
753 PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
754 PluginStage::Shutdown => std::io::ErrorKind::Other,
755 };
756 std::io::Error::new(kind, e)
757}
758
#[cfg(test)]
mod tests {
    use super::*;

    /// Well-formed overrides replace the defaults, a malformed one is ignored, and keys that
    /// are not supplied leave their defaults untouched.
    #[test]
    fn env_overrides_apply_and_ignore_garbage() {
        // Stand-in for the process environment: a fixed table of key/value pairs.
        const OVERRIDES: [(&str, &str); 4] = [
            ("ARCLY_REQUEST_TIMEOUT_MS", "1500"),
            ("ARCLY_MAX_IN_FLIGHT", "notanumber"),
            ("ARCLY_MAX_BODY_BYTES", "1024"),
            ("ARCLY_EXPOSE_DOCS", "false"),
        ];
        let lookup = |key: &str| -> Option<String> {
            OVERRIDES
                .iter()
                .find(|(name, _)| *name == key)
                .map(|(_, value)| (*value).to_owned())
        };

        let cfg = LaunchConfig::default().apply_overrides(lookup);

        assert_eq!(cfg.request_timeout, Duration::from_millis(1500));
        assert_eq!(cfg.max_in_flight, 0, "unparseable override is ignored");
        assert_eq!(cfg.max_body_bytes, 1024);
        assert!(!cfg.expose_docs);
        assert_eq!(cfg.drain_budget, Duration::from_secs(5));
    }
}