1use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse};
25use axum::routing::get;
26
27use crate::core::engine::{
28 DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
29};
30use crate::core::plugins::{
31 build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
32};
33use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
34use crate::realtime::gateway::GatewayDescriptor;
35use crate::realtime::{ws_route, ConnectionRegistry};
36use crate::web::boundary::adapt;
37
38#[derive(Clone, Debug)]
49#[non_exhaustive]
50pub struct LaunchConfig {
51 pub drain_budget: Duration,
56 pub request_timeout: Duration,
60 pub max_in_flight: usize,
64 pub max_body_bytes: usize,
66 pub cache_max_entries: usize,
69 pub cache_sweep_interval: Duration,
71 pub ws_drain_deadline: Duration,
76 pub cors: Option<crate::web::cors::CorsConfig>,
78 pub shutdown_trigger: Option<std::sync::Arc<tokio::sync::Notify>>,
83 pub expose_docs: bool,
86 pub ws_outbound_buffer: usize,
89 pub ws_max_connections: usize,
93 pub ws_ping_interval: Duration,
96 pub adaptive_shed_target: Duration,
101 pub ws_idle_timeout: Duration,
105}
106
107impl Default for LaunchConfig {
108 fn default() -> Self {
109 Self {
110 drain_budget: Duration::from_secs(5),
111 request_timeout: Duration::from_secs(30),
112 max_in_flight: 0,
113 max_body_bytes: 8 * 1024 * 1024,
114 cache_max_entries: 10_000,
115 cache_sweep_interval: Duration::from_secs(30),
116 ws_drain_deadline: Duration::from_secs(10),
117 adaptive_shed_target: Duration::ZERO,
118 cors: None,
119 shutdown_trigger: None,
120 expose_docs: true,
121 ws_outbound_buffer: 256,
122 ws_max_connections: 0,
123 ws_ping_interval: Duration::from_secs(20),
124 ws_idle_timeout: Duration::from_secs(60),
125 }
126 }
127}
128
129impl LaunchConfig {
130 pub fn drain_budget(mut self, v: Duration) -> Self {
131 self.drain_budget = v;
132 self
133 }
134 pub fn request_timeout(mut self, v: Duration) -> Self {
135 self.request_timeout = v;
136 self
137 }
138 pub fn max_in_flight(mut self, v: usize) -> Self {
139 self.max_in_flight = v;
140 self
141 }
142 pub fn max_body_bytes(mut self, v: usize) -> Self {
143 self.max_body_bytes = v;
144 self
145 }
146 pub fn cache_max_entries(mut self, v: usize) -> Self {
147 self.cache_max_entries = v;
148 self
149 }
150 pub fn cache_sweep_interval(mut self, v: Duration) -> Self {
151 self.cache_sweep_interval = v;
152 self
153 }
154 pub fn ws_drain_deadline(mut self, v: Duration) -> Self {
155 self.ws_drain_deadline = v;
156 self
157 }
158 pub fn adaptive_shed_target(mut self, v: Duration) -> Self {
159 self.adaptive_shed_target = v;
160 self
161 }
162 pub fn cors(mut self, v: crate::web::cors::CorsConfig) -> Self {
163 self.cors = Some(v);
164 self
165 }
166 pub fn shutdown_trigger(mut self, v: std::sync::Arc<tokio::sync::Notify>) -> Self {
167 self.shutdown_trigger = Some(v);
168 self
169 }
170 pub fn expose_docs(mut self, v: bool) -> Self {
171 self.expose_docs = v;
172 self
173 }
174 pub fn ws_outbound_buffer(mut self, v: usize) -> Self {
175 self.ws_outbound_buffer = v;
176 self
177 }
178 pub fn ws_max_connections(mut self, v: usize) -> Self {
179 self.ws_max_connections = v;
180 self
181 }
182 pub fn ws_ping_interval(mut self, v: Duration) -> Self {
183 self.ws_ping_interval = v;
184 self
185 }
186 pub fn ws_idle_timeout(mut self, v: Duration) -> Self {
187 self.ws_idle_timeout = v;
188 self
189 }
190
191 pub fn with_env_overrides(self) -> Self {
208 self.apply_overrides(|k| std::env::var(k).ok())
209 }
210
211 pub(crate) fn apply_overrides(mut self, get: impl Fn(&str) -> Option<String>) -> Self {
213 fn parse<T: std::str::FromStr>(key: &str, raw: String) -> Option<T> {
214 match raw.parse() {
215 Ok(v) => Some(v),
216 Err(_) => {
217 tracing::warn!(key, value = raw, "ignoring unparseable ARCLY_* override");
218 None
219 }
220 }
221 }
222 if let Some(v) =
223 get("ARCLY_REQUEST_TIMEOUT_MS").and_then(|r| parse("ARCLY_REQUEST_TIMEOUT_MS", r))
224 {
225 self.request_timeout = Duration::from_millis(v);
226 }
227 if let Some(v) = get("ARCLY_MAX_IN_FLIGHT").and_then(|r| parse("ARCLY_MAX_IN_FLIGHT", r)) {
228 self.max_in_flight = v;
229 }
230 if let Some(v) = get("ARCLY_MAX_BODY_BYTES").and_then(|r| parse("ARCLY_MAX_BODY_BYTES", r))
231 {
232 self.max_body_bytes = v;
233 }
234 if let Some(v) =
235 get("ARCLY_CACHE_MAX_ENTRIES").and_then(|r| parse("ARCLY_CACHE_MAX_ENTRIES", r))
236 {
237 self.cache_max_entries = v;
238 }
239 if let Some(v) =
240 get("ARCLY_WS_DRAIN_DEADLINE_MS").and_then(|r| parse("ARCLY_WS_DRAIN_DEADLINE_MS", r))
241 {
242 self.ws_drain_deadline = Duration::from_millis(v);
243 }
244 if let Some(v) =
245 get("ARCLY_DRAIN_BUDGET_MS").and_then(|r| parse("ARCLY_DRAIN_BUDGET_MS", r))
246 {
247 self.drain_budget = Duration::from_millis(v);
248 }
249 if let Some(v) =
250 get("ARCLY_WS_OUTBOUND_BUFFER").and_then(|r| parse("ARCLY_WS_OUTBOUND_BUFFER", r))
251 {
252 self.ws_outbound_buffer = v;
253 }
254 if let Some(v) =
255 get("ARCLY_WS_MAX_CONNECTIONS").and_then(|r| parse("ARCLY_WS_MAX_CONNECTIONS", r))
256 {
257 self.ws_max_connections = v;
258 }
259 if let Some(v) =
260 get("ARCLY_WS_PING_INTERVAL_MS").and_then(|r| parse("ARCLY_WS_PING_INTERVAL_MS", r))
261 {
262 self.ws_ping_interval = Duration::from_millis(v);
263 }
264 if let Some(v) =
265 get("ARCLY_WS_IDLE_TIMEOUT_MS").and_then(|r| parse("ARCLY_WS_IDLE_TIMEOUT_MS", r))
266 {
267 self.ws_idle_timeout = Duration::from_millis(v);
268 }
269 if let Some(v) = get("ARCLY_ADAPTIVE_SHED_TARGET_MS")
270 .and_then(|r| parse("ARCLY_ADAPTIVE_SHED_TARGET_MS", r))
271 {
272 self.adaptive_shed_target = Duration::from_millis(v);
273 }
274 if let Some(raw) = get("ARCLY_EXPOSE_DOCS") {
275 match raw.as_str() {
276 "true" | "1" => self.expose_docs = true,
277 "false" | "0" => self.expose_docs = false,
278 _ => tracing::warn!(value = raw, "ignoring unparseable ARCLY_EXPOSE_DOCS"),
279 }
280 }
281 self
282 }
283}
284
285pub struct App;
286
287impl App {
288 pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
289 let info = OpenApiInfo::new("arcly-http service", env!("CARGO_PKG_VERSION"));
290 Self::launch_with_info::<RootMod>(addr, info).await
291 }
292
293 pub async fn launch_named<RootMod: Module>(
294 addr: &str,
295 title: &'static str,
296 version: &'static str,
297 ) -> std::io::Result<()> {
298 let info = OpenApiInfo::new(title, version);
299 Self::launch_with_info::<RootMod>(addr, info).await
300 }
301
302 pub async fn launch_with_info<RootMod: Module>(
303 addr: &str,
304 info: OpenApiInfo,
305 ) -> std::io::Result<()> {
306 Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
307 }
308
309 pub async fn launch_with_plugins<RootMod: Module>(
312 addr: &str,
313 info: OpenApiInfo,
314 plugins: Vec<Box<dyn ArclyPlugin>>,
315 ) -> std::io::Result<()> {
316 Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
317 }
318
319 pub async fn launch_configured<RootMod: Module>(
321 addr: &str,
322 info: OpenApiInfo,
323 mut plugins: Vec<Box<dyn ArclyPlugin>>,
324 config: LaunchConfig,
325 ) -> std::io::Result<()> {
326 let _root: PhantomData<RootMod> = PhantomData;
327 let config = config.with_env_overrides();
329 tracing::info!(
330 request_timeout = ?config.request_timeout,
331 max_in_flight = config.max_in_flight,
332 max_body_bytes = config.max_body_bytes,
333 expose_docs = config.expose_docs,
334 "launch config (effective)"
335 );
336
337 let reachable_modules = collect_reachable_modules(RootMod::descriptor());
339 let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
340 .iter()
341 .flat_map(|m| m.controllers.iter().copied())
342 .collect();
343
344 let mut b = DiContainerBuilder::new();
346 for m in &reachable_modules {
347 for p in m.providers {
348 b.add_provider(p);
349 }
350 }
351
352 let mut plugin_ctx = ArclyPluginContext::new();
354 for p in plugins.iter_mut() {
355 plugin_ctx.current_plugin = p.name();
356 if let Err(e) = p.on_init(&mut plugin_ctx).await {
357 return Err(plugin_io_err(e));
358 }
359 }
360
361 for f in plugin_ctx.pending_providers.drain(..) {
363 f(&mut b);
364 }
365
366 b.register(crate::web::dynamic::DynamicRouteTable::new());
370 b.register(crate::web::boundary::BodyLimit(config.max_body_bytes));
373 let container = b.freeze();
374
375 let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
377 for mutator in plugin_ctx.openapi_mutators.drain(..) {
378 mutator(&mut spec_value);
379 }
380 let spec_bytes: &'static [u8] = Box::leak(
383 serde_json::to_vec(&spec_value)
384 .unwrap_or_else(|e| {
385 panic!("Arcly: OpenAPI spec serialization failed: {e}")
388 })
389 .into_boxed_slice(),
390 );
391
392 let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
396 Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
397 let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
399 Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());
400
401 let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
402 axum::Router::new();
403 let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
404 std::collections::HashSet::new();
405 mounted.insert(("/openapi.json", HttpMethod::GET));
407 mounted.insert(("/docs", HttpMethod::GET));
408 for rt in inventory::iter::<&'static RouteDescriptor> {
409 if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
412 continue;
413 }
414 mounted.insert((rt.path, rt.method));
415 router = router.route(axum_path_static(rt.path), adapt(rt, globals, filters));
416 if rt.path.len() > 1 && rt.path.ends_with('/') {
420 let trimmed: &'static str =
421 Box::leak(rt.path[..rt.path.len() - 1].to_owned().into_boxed_str());
422 if mounted.insert((trimmed, rt.method)) {
423 router = router.route(axum_path_static(trimmed), adapt(rt, globals, filters));
424 }
425 }
426 }
427 let mut app = router.with_state(container);
428 for r in &plugin_ctx.extra_routes {
429 if !mounted.insert((r.path, r.method)) {
432 return Err(plugin_io_err(PluginError::new(
433 r.plugin,
434 PluginStage::Init,
435 format!(
436 "route `{:?} {}` is already mounted by another route or plugin",
437 r.method, r.path
438 ),
439 )));
440 }
441 app = app.route(
442 axum_path_static(r.path),
443 build_plugin_route(container, r, globals, filters),
444 );
445 }
446
447 container
451 .get::<crate::web::dynamic::DynamicRouteTable>()
452 .set_globals(globals);
453 app = app.route(
454 "/_plugins/{*rest}",
455 crate::web::dynamic::dynamic_dispatch_route(container, filters),
456 );
457
458 let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
462 .iter()
463 .flat_map(|m| m.gateways.iter().copied())
464 .collect();
465 let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
466 let ws_tuning = crate::realtime::ws::WsTuning {
467 outbound_buffer: config.ws_outbound_buffer,
468 max_connections: config.ws_max_connections,
469 ping_interval: config.ws_ping_interval,
470 };
471 if !config.ws_idle_timeout.is_zero() {
475 let idle = config.ws_idle_timeout;
476 tokio::spawn(async move {
477 let mut tick = tokio::time::interval(idle / 2);
478 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
479 loop {
480 tick.tick().await;
481 let reaped = registry.sweep_idle(idle.as_secs());
482 if !reaped.is_empty() {
483 tracing::info!(count = reaped.len(), "reaped idle WebSocket connections");
484 }
485 }
486 });
487 }
488 for gd in inventory::iter::<&'static GatewayDescriptor> {
489 if !allowed_gateways.contains(gd.name) {
490 continue;
491 }
492 let runtime = (gd.build)(container);
493 app = app.route(
494 axum_path_static(gd.path),
495 ws_route(runtime, registry, container, ws_tuning),
496 );
497 }
498
499 if config.expose_docs {
503 app = app
504 .route(
505 "/openapi.json",
506 get(move || async move {
507 (
508 [(axum::http::header::CONTENT_TYPE, "application/json")],
509 spec_bytes,
510 )
511 .into_response()
512 }),
513 )
514 .route(
515 "/docs",
516 get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
517 );
518 }
519 let mut app = app.layer(axum::middleware::from_fn(
520 crate::security::apply_security_headers,
521 ));
522
523 if let Some(cors_cfg) = config.cors.clone() {
528 let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
529 app = app.layer(axum::middleware::from_fn(
530 move |req: axum::extract::Request, next: axum::middleware::Next| {
531 crate::web::cors::apply_cors(cors_cfg, req, next)
532 },
533 ));
534 }
535 let gov = crate::web::governor::Governor::new(
536 config.request_timeout,
537 config.max_in_flight,
538 config.adaptive_shed_target,
539 );
540 let app = app.layer(axum::middleware::from_fn(
541 move |req: axum::extract::Request, next: axum::middleware::Next| {
542 crate::web::governor::govern(Arc::clone(&gov), req, next)
543 },
544 ));
545
546 crate::web::cache::set_capacity(config.cache_max_entries);
548 crate::web::cache::spawn_sweeper(config.cache_sweep_interval);
549
550 let listener = tokio::net::TcpListener::bind(addr).await?;
552
553 let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
561 let mut started = 0usize;
562 #[allow(clippy::explicit_counter_loop)] for p in plugins_arc.iter() {
564 if let Err(e) = p.on_start(container).await {
565 for already in plugins_arc[..started].iter().rev() {
566 drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
567 .await;
568 }
569 return Err(plugin_io_err(e));
570 }
571 started += 1;
572 }
573
574 let plugins_for_draining = Arc::clone(&plugins_arc);
580 let draining_budget = config.drain_budget;
581 let ws_deadline = config.ws_drain_deadline;
582 let trigger = config.shutdown_trigger.clone();
583 let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
584 match trigger {
585 Some(n) => {
586 tokio::select! {
587 _ = shutdown_signal() => {}
588 _ = n.notified() => {}
589 }
590 }
591 None => shutdown_signal().await,
592 }
593 crate::observability::health::set_draining(true);
596 tracing::info!("shutdown signal received — HTTP draining");
597 tokio::spawn(async move {
600 tokio::time::sleep(ws_deadline).await;
601 let closed = registry.close_all();
602 if closed > 0 {
603 tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
604 }
605 });
606 tokio::spawn(async move {
611 for p in plugins_for_draining.iter() {
612 match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
613 Ok(Ok(())) => {}
614 Ok(Err(e)) => {
615 tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
616 }
617 Err(_) => tracing::warn!(
618 plugin = p.name(),
619 budget = ?draining_budget,
620 "plugin on_draining exceeded budget"
621 ),
622 }
623 }
624 });
625 });
626 let serve_res = serve.await;
627
628 tracing::info!(
630 budget = ?config.drain_budget,
631 "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
632 );
633 for p in plugins_arc.iter().rev() {
634 drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
635 }
636 serve_res
637 }
638}
639
/// Rewrites a route path from `:name` / `*name` segment syntax into brace
/// syntax: `:id` becomes `{id}` and `*rest` becomes `{*rest}`.
///
/// The path is split on `/`; every other segment, including empty ones, is
/// copied through unchanged, so leading, trailing and doubled slashes survive.
fn axum_path(path: &str) -> String {
    // Each converted segment grows by at most two bytes; this is only a hint.
    let mut converted = String::with_capacity(path.len() + 2);
    for (index, segment) in path.split('/').enumerate() {
        if index > 0 {
            converted.push('/');
        }
        if let Some(param) = segment.strip_prefix(':') {
            converted.push('{');
            converted.push_str(param);
            converted.push('}');
        } else if let Some(wildcard) = segment.strip_prefix('*') {
            converted.push_str("{*");
            converted.push_str(wildcard);
            converted.push('}');
        } else {
            converted.push_str(segment);
        }
    }
    converted
}
662
663fn axum_path_static(path: &str) -> &'static str {
664 Box::leak(axum_path(path).into_boxed_str())
665}
666
667fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
668 use std::collections::HashSet;
669 let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
670 let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
671 std::collections::VecDeque::new();
672 let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
673 queue.push_back(root);
674 while let Some(m) = queue.pop_front() {
675 if !visited.insert(m as *const _) {
676 continue;
677 }
678 order.push(m);
679 for getter in m.imports {
680 queue.push_back(getter());
681 }
682 }
683 order
684}
685
686#[cfg(unix)]
691async fn shutdown_signal() {
692 use tokio::signal::unix::{signal, SignalKind};
693 match signal(SignalKind::terminate()) {
694 Ok(mut sigterm) => {
695 tokio::select! {
696 _ = tokio::signal::ctrl_c() => {}
697 _ = sigterm.recv() => {}
698 }
699 }
700 Err(e) => {
701 tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
702 let _ = tokio::signal::ctrl_c().await;
703 }
704 }
705}
706
/// Resolves when the Ctrl-C future completes; on non-Unix targets that is the
/// only shutdown signal awaited.
///
/// The future's result is ignored, so an error resolves this function just as
/// a delivered signal does.
#[cfg(not(unix))]
async fn shutdown_signal() {
    let _ctrl_c_outcome = tokio::signal::ctrl_c().await;
}
711
712async fn drain_plugin(
716 p: &dyn ArclyPlugin,
717 container: &'static crate::core::engine::FrozenDiContainer,
718 phase: &str,
719 budget: Duration,
720) {
721 match tokio::time::timeout(budget, p.on_shutdown(container)).await {
722 Ok(Ok(())) => {}
723 Ok(Err(e)) => {
724 tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
725 }
726 Err(_) => tracing::warn!(
727 plugin = p.name(),
728 phase,
729 budget = ?budget,
730 "plugin shutdown exceeded budget — skipped"
731 ),
732 }
733}
734
735fn plugin_io_err(e: PluginError) -> std::io::Error {
736 let kind = match e.stage {
737 PluginStage::Init => std::io::ErrorKind::InvalidInput,
738 PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
739 PluginStage::Shutdown => std::io::ErrorKind::Other,
740 };
741 std::io::Error::new(kind, e)
742}
743
#[cfg(test)]
mod tests {
    use super::*;

    /// Valid overrides are applied, an unparseable one is ignored, and keys
    /// that are absent leave the defaults untouched.
    #[test]
    fn env_overrides_apply_and_ignore_garbage() {
        // Stand-in for the process environment.
        let fake_env: [(&str, &str); 4] = [
            ("ARCLY_REQUEST_TIMEOUT_MS", "1500"),
            ("ARCLY_MAX_IN_FLIGHT", "notanumber"),
            ("ARCLY_MAX_BODY_BYTES", "1024"),
            ("ARCLY_EXPOSE_DOCS", "false"),
        ];
        let lookup = |key: &str| {
            fake_env
                .iter()
                .find(|(name, _)| *name == key)
                .map(|(_, value)| (*value).to_owned())
        };

        let cfg = LaunchConfig::default().apply_overrides(lookup);

        assert_eq!(cfg.request_timeout, Duration::from_millis(1500));
        assert_eq!(cfg.max_in_flight, 0, "unparseable override is ignored");
        assert_eq!(cfg.max_body_bytes, 1024);
        assert!(!cfg.expose_docs);
        // Not present in the fake environment, so the default must survive.
        assert_eq!(cfg.drain_budget, Duration::from_secs(5));
    }
}