1use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse};
25use axum::routing::get;
26
27use crate::core::engine::{
28 DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
29};
30use crate::core::plugins::{
31 build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
32};
33use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
34use crate::realtime::gateway::GatewayDescriptor;
35use crate::realtime::{ws_route, ConnectionRegistry};
36use crate::web::boundary::adapt;
37
38#[derive(Clone, Debug)]
49#[non_exhaustive]
50pub struct LaunchConfig {
51 pub drain_budget: Duration,
56 pub request_timeout: Duration,
60 pub max_in_flight: usize,
64 pub max_body_bytes: usize,
66 pub cache_max_entries: usize,
69 pub cache_sweep_interval: Duration,
71 pub ws_drain_deadline: Duration,
76 pub cors: Option<crate::web::cors::CorsConfig>,
78 pub expose_docs: bool,
81 pub ws_outbound_buffer: usize,
84 pub ws_max_connections: usize,
88 pub ws_ping_interval: Duration,
91 pub adaptive_shed_target: Duration,
96 pub ws_idle_timeout: Duration,
100}
101
102impl Default for LaunchConfig {
103 fn default() -> Self {
104 Self {
105 drain_budget: Duration::from_secs(5),
106 request_timeout: Duration::from_secs(30),
107 max_in_flight: 0,
108 max_body_bytes: 8 * 1024 * 1024,
109 cache_max_entries: 10_000,
110 cache_sweep_interval: Duration::from_secs(30),
111 ws_drain_deadline: Duration::from_secs(10),
112 adaptive_shed_target: Duration::ZERO,
113 cors: None,
114 expose_docs: true,
115 ws_outbound_buffer: 256,
116 ws_max_connections: 0,
117 ws_ping_interval: Duration::from_secs(20),
118 ws_idle_timeout: Duration::from_secs(60),
119 }
120 }
121}
122
123impl LaunchConfig {
124 pub fn drain_budget(mut self, v: Duration) -> Self {
125 self.drain_budget = v;
126 self
127 }
128 pub fn request_timeout(mut self, v: Duration) -> Self {
129 self.request_timeout = v;
130 self
131 }
132 pub fn max_in_flight(mut self, v: usize) -> Self {
133 self.max_in_flight = v;
134 self
135 }
136 pub fn max_body_bytes(mut self, v: usize) -> Self {
137 self.max_body_bytes = v;
138 self
139 }
140 pub fn cache_max_entries(mut self, v: usize) -> Self {
141 self.cache_max_entries = v;
142 self
143 }
144 pub fn cache_sweep_interval(mut self, v: Duration) -> Self {
145 self.cache_sweep_interval = v;
146 self
147 }
148 pub fn ws_drain_deadline(mut self, v: Duration) -> Self {
149 self.ws_drain_deadline = v;
150 self
151 }
152 pub fn adaptive_shed_target(mut self, v: Duration) -> Self {
153 self.adaptive_shed_target = v;
154 self
155 }
156 pub fn cors(mut self, v: crate::web::cors::CorsConfig) -> Self {
157 self.cors = Some(v);
158 self
159 }
160 pub fn expose_docs(mut self, v: bool) -> Self {
161 self.expose_docs = v;
162 self
163 }
164 pub fn ws_outbound_buffer(mut self, v: usize) -> Self {
165 self.ws_outbound_buffer = v;
166 self
167 }
168 pub fn ws_max_connections(mut self, v: usize) -> Self {
169 self.ws_max_connections = v;
170 self
171 }
172 pub fn ws_ping_interval(mut self, v: Duration) -> Self {
173 self.ws_ping_interval = v;
174 self
175 }
176 pub fn ws_idle_timeout(mut self, v: Duration) -> Self {
177 self.ws_idle_timeout = v;
178 self
179 }
180
181 pub fn with_env_overrides(self) -> Self {
198 self.apply_overrides(|k| std::env::var(k).ok())
199 }
200
201 pub(crate) fn apply_overrides(mut self, get: impl Fn(&str) -> Option<String>) -> Self {
203 fn parse<T: std::str::FromStr>(key: &str, raw: String) -> Option<T> {
204 match raw.parse() {
205 Ok(v) => Some(v),
206 Err(_) => {
207 tracing::warn!(key, value = raw, "ignoring unparseable ARCLY_* override");
208 None
209 }
210 }
211 }
212 if let Some(v) =
213 get("ARCLY_REQUEST_TIMEOUT_MS").and_then(|r| parse("ARCLY_REQUEST_TIMEOUT_MS", r))
214 {
215 self.request_timeout = Duration::from_millis(v);
216 }
217 if let Some(v) = get("ARCLY_MAX_IN_FLIGHT").and_then(|r| parse("ARCLY_MAX_IN_FLIGHT", r)) {
218 self.max_in_flight = v;
219 }
220 if let Some(v) = get("ARCLY_MAX_BODY_BYTES").and_then(|r| parse("ARCLY_MAX_BODY_BYTES", r))
221 {
222 self.max_body_bytes = v;
223 }
224 if let Some(v) =
225 get("ARCLY_CACHE_MAX_ENTRIES").and_then(|r| parse("ARCLY_CACHE_MAX_ENTRIES", r))
226 {
227 self.cache_max_entries = v;
228 }
229 if let Some(v) =
230 get("ARCLY_WS_DRAIN_DEADLINE_MS").and_then(|r| parse("ARCLY_WS_DRAIN_DEADLINE_MS", r))
231 {
232 self.ws_drain_deadline = Duration::from_millis(v);
233 }
234 if let Some(v) =
235 get("ARCLY_DRAIN_BUDGET_MS").and_then(|r| parse("ARCLY_DRAIN_BUDGET_MS", r))
236 {
237 self.drain_budget = Duration::from_millis(v);
238 }
239 if let Some(v) =
240 get("ARCLY_WS_OUTBOUND_BUFFER").and_then(|r| parse("ARCLY_WS_OUTBOUND_BUFFER", r))
241 {
242 self.ws_outbound_buffer = v;
243 }
244 if let Some(v) =
245 get("ARCLY_WS_MAX_CONNECTIONS").and_then(|r| parse("ARCLY_WS_MAX_CONNECTIONS", r))
246 {
247 self.ws_max_connections = v;
248 }
249 if let Some(v) =
250 get("ARCLY_WS_PING_INTERVAL_MS").and_then(|r| parse("ARCLY_WS_PING_INTERVAL_MS", r))
251 {
252 self.ws_ping_interval = Duration::from_millis(v);
253 }
254 if let Some(v) =
255 get("ARCLY_WS_IDLE_TIMEOUT_MS").and_then(|r| parse("ARCLY_WS_IDLE_TIMEOUT_MS", r))
256 {
257 self.ws_idle_timeout = Duration::from_millis(v);
258 }
259 if let Some(v) = get("ARCLY_ADAPTIVE_SHED_TARGET_MS")
260 .and_then(|r| parse("ARCLY_ADAPTIVE_SHED_TARGET_MS", r))
261 {
262 self.adaptive_shed_target = Duration::from_millis(v);
263 }
264 if let Some(raw) = get("ARCLY_EXPOSE_DOCS") {
265 match raw.as_str() {
266 "true" | "1" => self.expose_docs = true,
267 "false" | "0" => self.expose_docs = false,
268 _ => tracing::warn!(value = raw, "ignoring unparseable ARCLY_EXPOSE_DOCS"),
269 }
270 }
271 self
272 }
273}
274
275pub struct App;
276
277impl App {
278 pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
279 let info = OpenApiInfo::new("arcly-http service", env!("CARGO_PKG_VERSION"));
280 Self::launch_with_info::<RootMod>(addr, info).await
281 }
282
283 pub async fn launch_named<RootMod: Module>(
284 addr: &str,
285 title: &'static str,
286 version: &'static str,
287 ) -> std::io::Result<()> {
288 let info = OpenApiInfo::new(title, version);
289 Self::launch_with_info::<RootMod>(addr, info).await
290 }
291
292 pub async fn launch_with_info<RootMod: Module>(
293 addr: &str,
294 info: OpenApiInfo,
295 ) -> std::io::Result<()> {
296 Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
297 }
298
299 pub async fn launch_with_plugins<RootMod: Module>(
302 addr: &str,
303 info: OpenApiInfo,
304 plugins: Vec<Box<dyn ArclyPlugin>>,
305 ) -> std::io::Result<()> {
306 Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
307 }
308
309 pub async fn launch_configured<RootMod: Module>(
311 addr: &str,
312 info: OpenApiInfo,
313 mut plugins: Vec<Box<dyn ArclyPlugin>>,
314 config: LaunchConfig,
315 ) -> std::io::Result<()> {
316 let _root: PhantomData<RootMod> = PhantomData;
317 let config = config.with_env_overrides();
319 tracing::info!(
320 request_timeout = ?config.request_timeout,
321 max_in_flight = config.max_in_flight,
322 max_body_bytes = config.max_body_bytes,
323 expose_docs = config.expose_docs,
324 "launch config (effective)"
325 );
326
327 let reachable_modules = collect_reachable_modules(RootMod::descriptor());
329 let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
330 .iter()
331 .flat_map(|m| m.controllers.iter().copied())
332 .collect();
333
334 let mut b = DiContainerBuilder::new();
336 for m in &reachable_modules {
337 for p in m.providers {
338 b.add_provider(p);
339 }
340 }
341
342 let mut plugin_ctx = ArclyPluginContext::new();
344 for p in plugins.iter_mut() {
345 plugin_ctx.current_plugin = p.name();
346 if let Err(e) = p.on_init(&mut plugin_ctx).await {
347 return Err(plugin_io_err(e));
348 }
349 }
350
351 for f in plugin_ctx.pending_providers.drain(..) {
353 f(&mut b);
354 }
355
356 b.register(crate::web::dynamic::DynamicRouteTable::new());
360 b.register(crate::web::boundary::BodyLimit(config.max_body_bytes));
363 let container = b.freeze();
364
365 let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
367 for mutator in plugin_ctx.openapi_mutators.drain(..) {
368 mutator(&mut spec_value);
369 }
370 let spec_bytes: &'static [u8] = Box::leak(
373 serde_json::to_vec(&spec_value)
374 .unwrap_or_else(|e| {
375 panic!("Arcly: OpenAPI spec serialization failed: {e}")
378 })
379 .into_boxed_slice(),
380 );
381
382 let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
386 Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
387 let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
389 Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());
390
391 let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
392 axum::Router::new();
393 let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
394 std::collections::HashSet::new();
395 mounted.insert(("/openapi.json", HttpMethod::GET));
397 mounted.insert(("/docs", HttpMethod::GET));
398 for rt in inventory::iter::<&'static RouteDescriptor> {
399 if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
402 continue;
403 }
404 mounted.insert((rt.path, rt.method));
405 router = router.route(axum_path_static(rt.path), adapt(rt, globals, filters));
406 if rt.path.len() > 1 && rt.path.ends_with('/') {
410 let trimmed: &'static str =
411 Box::leak(rt.path[..rt.path.len() - 1].to_owned().into_boxed_str());
412 if mounted.insert((trimmed, rt.method)) {
413 router = router.route(axum_path_static(trimmed), adapt(rt, globals, filters));
414 }
415 }
416 }
417 let mut app = router.with_state(container);
418 for r in &plugin_ctx.extra_routes {
419 if !mounted.insert((r.path, r.method)) {
422 return Err(plugin_io_err(PluginError::new(
423 r.plugin,
424 PluginStage::Init,
425 format!(
426 "route `{:?} {}` is already mounted by another route or plugin",
427 r.method, r.path
428 ),
429 )));
430 }
431 app = app.route(
432 axum_path_static(r.path),
433 build_plugin_route(container, r, globals, filters),
434 );
435 }
436
437 container
441 .get::<crate::web::dynamic::DynamicRouteTable>()
442 .set_globals(globals);
443 app = app.route(
444 "/_plugins/{*rest}",
445 crate::web::dynamic::dynamic_dispatch_route(container, filters),
446 );
447
448 let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
452 .iter()
453 .flat_map(|m| m.gateways.iter().copied())
454 .collect();
455 let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
456 let ws_tuning = crate::realtime::ws::WsTuning {
457 outbound_buffer: config.ws_outbound_buffer,
458 max_connections: config.ws_max_connections,
459 ping_interval: config.ws_ping_interval,
460 };
461 if !config.ws_idle_timeout.is_zero() {
465 let idle = config.ws_idle_timeout;
466 tokio::spawn(async move {
467 let mut tick = tokio::time::interval(idle / 2);
468 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
469 loop {
470 tick.tick().await;
471 let reaped = registry.sweep_idle(idle.as_secs());
472 if !reaped.is_empty() {
473 tracing::info!(count = reaped.len(), "reaped idle WebSocket connections");
474 }
475 }
476 });
477 }
478 for gd in inventory::iter::<&'static GatewayDescriptor> {
479 if !allowed_gateways.contains(gd.name) {
480 continue;
481 }
482 let runtime = (gd.build)(container);
483 app = app.route(
484 axum_path_static(gd.path),
485 ws_route(runtime, registry, container, ws_tuning),
486 );
487 }
488
489 if config.expose_docs {
493 app = app
494 .route(
495 "/openapi.json",
496 get(move || async move {
497 (
498 [(axum::http::header::CONTENT_TYPE, "application/json")],
499 spec_bytes,
500 )
501 .into_response()
502 }),
503 )
504 .route(
505 "/docs",
506 get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
507 );
508 }
509 let mut app = app.layer(axum::middleware::from_fn(
510 crate::security::apply_security_headers,
511 ));
512
513 if let Some(cors_cfg) = config.cors.clone() {
518 let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
519 app = app.layer(axum::middleware::from_fn(
520 move |req: axum::extract::Request, next: axum::middleware::Next| {
521 crate::web::cors::apply_cors(cors_cfg, req, next)
522 },
523 ));
524 }
525 let gov = crate::web::governor::Governor::new(
526 config.request_timeout,
527 config.max_in_flight,
528 config.adaptive_shed_target,
529 );
530 let app = app.layer(axum::middleware::from_fn(
531 move |req: axum::extract::Request, next: axum::middleware::Next| {
532 crate::web::governor::govern(Arc::clone(&gov), req, next)
533 },
534 ));
535
536 crate::web::cache::set_capacity(config.cache_max_entries);
538 crate::web::cache::spawn_sweeper(config.cache_sweep_interval);
539
540 let listener = tokio::net::TcpListener::bind(addr).await?;
542
543 let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
551 let mut started = 0usize;
552 #[allow(clippy::explicit_counter_loop)] for p in plugins_arc.iter() {
554 if let Err(e) = p.on_start(container).await {
555 for already in plugins_arc[..started].iter().rev() {
556 drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
557 .await;
558 }
559 return Err(plugin_io_err(e));
560 }
561 started += 1;
562 }
563
564 let plugins_for_draining = Arc::clone(&plugins_arc);
570 let draining_budget = config.drain_budget;
571 let ws_deadline = config.ws_drain_deadline;
572 let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
573 shutdown_signal().await;
574 crate::observability::health::set_draining(true);
577 tracing::info!("shutdown signal received — HTTP draining");
578 tokio::spawn(async move {
581 tokio::time::sleep(ws_deadline).await;
582 let closed = registry.close_all();
583 if closed > 0 {
584 tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
585 }
586 });
587 tokio::spawn(async move {
592 for p in plugins_for_draining.iter() {
593 match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
594 Ok(Ok(())) => {}
595 Ok(Err(e)) => {
596 tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
597 }
598 Err(_) => tracing::warn!(
599 plugin = p.name(),
600 budget = ?draining_budget,
601 "plugin on_draining exceeded budget"
602 ),
603 }
604 }
605 });
606 });
607 let serve_res = serve.await;
608
609 tracing::info!(
611 budget = ?config.drain_budget,
612 "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
613 );
614 for p in plugins_arc.iter().rev() {
615 drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
616 }
617 serve_res
618 }
619}
620
/// Rewrites colon/star path parameters into brace syntax, segment by segment.
///
/// A segment starting with `:` becomes `{name}`, a segment starting with `*` becomes
/// `{*name}`, and every other segment (including one already written with braces) is
/// copied through unchanged. Empty segments are preserved, so leading and trailing
/// slashes survive the conversion.
fn axum_path(path: &str) -> String {
    // Each converted segment grows by at most one byte; a small head start is enough.
    let mut converted = String::with_capacity(path.len() + 2);
    for (index, segment) in path.split('/').enumerate() {
        if index != 0 {
            converted.push('/');
        }
        // `:` is checked before `*`, so a segment such as `:*x` is treated as a named
        // parameter called `*x`.
        if let Some(param) = segment.strip_prefix(':') {
            converted.push('{');
            converted.push_str(param);
            converted.push('}');
        } else if let Some(wildcard) = segment.strip_prefix('*') {
            converted.push_str("{*");
            converted.push_str(wildcard);
            converted.push('}');
        } else {
            converted.push_str(segment);
        }
    }
    converted
}
643
644fn axum_path_static(path: &str) -> &'static str {
645 Box::leak(axum_path(path).into_boxed_str())
646}
647
648fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
649 use std::collections::HashSet;
650 let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
651 let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
652 std::collections::VecDeque::new();
653 let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
654 queue.push_back(root);
655 while let Some(m) = queue.pop_front() {
656 if !visited.insert(m as *const _) {
657 continue;
658 }
659 order.push(m);
660 for getter in m.imports {
661 queue.push_back(getter());
662 }
663 }
664 order
665}
666
667#[cfg(unix)]
672async fn shutdown_signal() {
673 use tokio::signal::unix::{signal, SignalKind};
674 match signal(SignalKind::terminate()) {
675 Ok(mut sigterm) => {
676 tokio::select! {
677 _ = tokio::signal::ctrl_c() => {}
678 _ = sigterm.recv() => {}
679 }
680 }
681 Err(e) => {
682 tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
683 let _ = tokio::signal::ctrl_c().await;
684 }
685 }
686}
687
/// Resolves when the process receives Ctrl-C; the only signal awaited on non-Unix
/// targets. An error from the handler is ignored and also resolves the future.
#[cfg(not(unix))]
async fn shutdown_signal() {
    tokio::signal::ctrl_c().await.ok();
}
692
693async fn drain_plugin(
697 p: &dyn ArclyPlugin,
698 container: &'static crate::core::engine::FrozenDiContainer,
699 phase: &str,
700 budget: Duration,
701) {
702 match tokio::time::timeout(budget, p.on_shutdown(container)).await {
703 Ok(Ok(())) => {}
704 Ok(Err(e)) => {
705 tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
706 }
707 Err(_) => tracing::warn!(
708 plugin = p.name(),
709 phase,
710 budget = ?budget,
711 "plugin shutdown exceeded budget — skipped"
712 ),
713 }
714}
715
716fn plugin_io_err(e: PluginError) -> std::io::Error {
717 let kind = match e.stage {
718 PluginStage::Init => std::io::ErrorKind::InvalidInput,
719 PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
720 PluginStage::Shutdown => std::io::ErrorKind::Other,
721 };
722 std::io::Error::new(kind, e)
723}
724
#[cfg(test)]
mod tests {
    use super::*;

    /// Valid overrides are applied, an unparseable one is ignored, and keys that are not
    /// supplied leave the defaults untouched.
    #[test]
    fn env_overrides_apply_and_ignore_garbage() {
        // Stand-in for the process environment.
        const FAKE_ENV: [(&str, &str); 4] = [
            ("ARCLY_REQUEST_TIMEOUT_MS", "1500"),
            ("ARCLY_MAX_IN_FLIGHT", "notanumber"),
            ("ARCLY_MAX_BODY_BYTES", "1024"),
            ("ARCLY_EXPOSE_DOCS", "false"),
        ];
        let lookup = |wanted: &str| {
            FAKE_ENV
                .iter()
                .find(|(name, _)| *name == wanted)
                .map(|(_, value)| (*value).to_owned())
        };

        let cfg = LaunchConfig::default().apply_overrides(lookup);

        assert_eq!(cfg.request_timeout, Duration::from_millis(1500));
        assert_eq!(cfg.max_in_flight, 0, "unparseable override is ignored");
        assert_eq!(cfg.max_body_bytes, 1024);
        assert!(!cfg.expose_docs);
        assert_eq!(cfg.drain_budget, Duration::from_secs(5));
    }
}