1use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse};
25use axum::routing::get;
26
27use crate::core::engine::{
28 DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
29};
30use crate::core::plugins::{
31 build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
32};
33use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
34use crate::realtime::gateway::GatewayDescriptor;
35use crate::realtime::{ws_route, ConnectionRegistry};
36use crate::web::boundary::adapt;
37
38#[derive(Clone, Debug)]
49pub struct LaunchConfig {
50 pub drain_budget: Duration,
55 pub request_timeout: Duration,
59 pub max_in_flight: usize,
63 pub max_body_bytes: usize,
65 pub cache_max_entries: usize,
68 pub cache_sweep_interval: Duration,
70 pub ws_drain_deadline: Duration,
75 pub cors: Option<crate::web::cors::CorsConfig>,
77 pub expose_docs: bool,
80 pub ws_outbound_buffer: usize,
83 pub ws_max_connections: usize,
87 pub ws_ping_interval: Duration,
90 pub ws_idle_timeout: Duration,
94}
95
96impl Default for LaunchConfig {
97 fn default() -> Self {
98 Self {
99 drain_budget: Duration::from_secs(5),
100 request_timeout: Duration::from_secs(30),
101 max_in_flight: 0,
102 max_body_bytes: 8 * 1024 * 1024,
103 cache_max_entries: 10_000,
104 cache_sweep_interval: Duration::from_secs(30),
105 ws_drain_deadline: Duration::from_secs(10),
106 cors: None,
107 expose_docs: true,
108 ws_outbound_buffer: 256,
109 ws_max_connections: 0,
110 ws_ping_interval: Duration::from_secs(20),
111 ws_idle_timeout: Duration::from_secs(60),
112 }
113 }
114}
115
116impl LaunchConfig {
117 pub fn with_env_overrides(self) -> Self {
134 self.apply_overrides(|k| std::env::var(k).ok())
135 }
136
137 pub(crate) fn apply_overrides(mut self, get: impl Fn(&str) -> Option<String>) -> Self {
139 fn parse<T: std::str::FromStr>(key: &str, raw: String) -> Option<T> {
140 match raw.parse() {
141 Ok(v) => Some(v),
142 Err(_) => {
143 tracing::warn!(key, value = raw, "ignoring unparseable ARCLY_* override");
144 None
145 }
146 }
147 }
148 if let Some(v) =
149 get("ARCLY_REQUEST_TIMEOUT_MS").and_then(|r| parse("ARCLY_REQUEST_TIMEOUT_MS", r))
150 {
151 self.request_timeout = Duration::from_millis(v);
152 }
153 if let Some(v) = get("ARCLY_MAX_IN_FLIGHT").and_then(|r| parse("ARCLY_MAX_IN_FLIGHT", r)) {
154 self.max_in_flight = v;
155 }
156 if let Some(v) = get("ARCLY_MAX_BODY_BYTES").and_then(|r| parse("ARCLY_MAX_BODY_BYTES", r))
157 {
158 self.max_body_bytes = v;
159 }
160 if let Some(v) =
161 get("ARCLY_CACHE_MAX_ENTRIES").and_then(|r| parse("ARCLY_CACHE_MAX_ENTRIES", r))
162 {
163 self.cache_max_entries = v;
164 }
165 if let Some(v) =
166 get("ARCLY_WS_DRAIN_DEADLINE_MS").and_then(|r| parse("ARCLY_WS_DRAIN_DEADLINE_MS", r))
167 {
168 self.ws_drain_deadline = Duration::from_millis(v);
169 }
170 if let Some(v) =
171 get("ARCLY_DRAIN_BUDGET_MS").and_then(|r| parse("ARCLY_DRAIN_BUDGET_MS", r))
172 {
173 self.drain_budget = Duration::from_millis(v);
174 }
175 if let Some(v) =
176 get("ARCLY_WS_OUTBOUND_BUFFER").and_then(|r| parse("ARCLY_WS_OUTBOUND_BUFFER", r))
177 {
178 self.ws_outbound_buffer = v;
179 }
180 if let Some(v) =
181 get("ARCLY_WS_MAX_CONNECTIONS").and_then(|r| parse("ARCLY_WS_MAX_CONNECTIONS", r))
182 {
183 self.ws_max_connections = v;
184 }
185 if let Some(v) =
186 get("ARCLY_WS_PING_INTERVAL_MS").and_then(|r| parse("ARCLY_WS_PING_INTERVAL_MS", r))
187 {
188 self.ws_ping_interval = Duration::from_millis(v);
189 }
190 if let Some(v) =
191 get("ARCLY_WS_IDLE_TIMEOUT_MS").and_then(|r| parse("ARCLY_WS_IDLE_TIMEOUT_MS", r))
192 {
193 self.ws_idle_timeout = Duration::from_millis(v);
194 }
195 if let Some(raw) = get("ARCLY_EXPOSE_DOCS") {
196 match raw.as_str() {
197 "true" | "1" => self.expose_docs = true,
198 "false" | "0" => self.expose_docs = false,
199 _ => tracing::warn!(value = raw, "ignoring unparseable ARCLY_EXPOSE_DOCS"),
200 }
201 }
202 self
203 }
204}
205
206pub struct App;
207
208impl App {
209 pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
210 let info = OpenApiInfo {
211 title: "arcly-http service",
212 version: env!("CARGO_PKG_VERSION"),
213 ..Default::default()
214 };
215 Self::launch_with_info::<RootMod>(addr, info).await
216 }
217
218 pub async fn launch_named<RootMod: Module>(
219 addr: &str,
220 title: &'static str,
221 version: &'static str,
222 ) -> std::io::Result<()> {
223 let info = OpenApiInfo {
224 title,
225 version,
226 ..Default::default()
227 };
228 Self::launch_with_info::<RootMod>(addr, info).await
229 }
230
231 pub async fn launch_with_info<RootMod: Module>(
232 addr: &str,
233 info: OpenApiInfo,
234 ) -> std::io::Result<()> {
235 Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
236 }
237
238 pub async fn launch_with_plugins<RootMod: Module>(
241 addr: &str,
242 info: OpenApiInfo,
243 plugins: Vec<Box<dyn ArclyPlugin>>,
244 ) -> std::io::Result<()> {
245 Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
246 }
247
248 pub async fn launch_configured<RootMod: Module>(
250 addr: &str,
251 info: OpenApiInfo,
252 mut plugins: Vec<Box<dyn ArclyPlugin>>,
253 config: LaunchConfig,
254 ) -> std::io::Result<()> {
255 let _root: PhantomData<RootMod> = PhantomData;
256 let config = config.with_env_overrides();
258 tracing::info!(
259 request_timeout = ?config.request_timeout,
260 max_in_flight = config.max_in_flight,
261 max_body_bytes = config.max_body_bytes,
262 expose_docs = config.expose_docs,
263 "launch config (effective)"
264 );
265
266 let reachable_modules = collect_reachable_modules(RootMod::descriptor());
268 let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
269 .iter()
270 .flat_map(|m| m.controllers.iter().copied())
271 .collect();
272
273 let mut b = DiContainerBuilder::new();
275 for m in &reachable_modules {
276 for p in m.providers {
277 b.add_provider(p);
278 }
279 }
280
281 let mut plugin_ctx = ArclyPluginContext::new();
283 for p in plugins.iter_mut() {
284 plugin_ctx.current_plugin = p.name();
285 if let Err(e) = p.on_init(&mut plugin_ctx).await {
286 return Err(plugin_io_err(e));
287 }
288 }
289
290 for f in plugin_ctx.pending_providers.drain(..) {
292 f(&mut b);
293 }
294
295 b.register(crate::web::dynamic::DynamicRouteTable::new());
299 b.register(crate::web::boundary::BodyLimit(config.max_body_bytes));
302 let container = b.freeze();
303
304 let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
306 for mutator in plugin_ctx.openapi_mutators.drain(..) {
307 mutator(&mut spec_value);
308 }
309 let spec_bytes: &'static [u8] = Box::leak(
312 serde_json::to_vec(&spec_value)
313 .unwrap_or_else(|e| {
314 panic!("Arcly: OpenAPI spec serialization failed: {e}")
317 })
318 .into_boxed_slice(),
319 );
320
321 let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
325 Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
326 let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
328 Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());
329
330 let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
331 axum::Router::new();
332 let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
333 std::collections::HashSet::new();
334 mounted.insert(("/openapi.json", HttpMethod::GET));
336 mounted.insert(("/docs", HttpMethod::GET));
337 for rt in inventory::iter::<&'static RouteDescriptor> {
338 if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
341 continue;
342 }
343 mounted.insert((rt.path, rt.method));
344 router = router.route(rt.path, adapt(rt, globals, filters));
345 }
346 let mut app = router.with_state(container);
347 for r in &plugin_ctx.extra_routes {
348 if !mounted.insert((r.path, r.method)) {
351 return Err(plugin_io_err(PluginError::new(
352 r.plugin,
353 PluginStage::Init,
354 format!(
355 "route `{:?} {}` is already mounted by another route or plugin",
356 r.method, r.path
357 ),
358 )));
359 }
360 app = app.route(r.path, build_plugin_route(container, r, globals, filters));
361 }
362
363 app = app.route(
365 "/_plugins/*rest",
366 crate::web::dynamic::dynamic_dispatch_route(container, globals, filters),
367 );
368
369 let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
373 .iter()
374 .flat_map(|m| m.gateways.iter().copied())
375 .collect();
376 let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
377 let ws_tuning = crate::realtime::ws::WsTuning {
378 outbound_buffer: config.ws_outbound_buffer,
379 max_connections: config.ws_max_connections,
380 ping_interval: config.ws_ping_interval,
381 };
382 if !config.ws_idle_timeout.is_zero() {
386 let idle = config.ws_idle_timeout;
387 tokio::spawn(async move {
388 let mut tick = tokio::time::interval(idle / 2);
389 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
390 loop {
391 tick.tick().await;
392 let reaped = registry.sweep_idle(idle.as_secs());
393 if !reaped.is_empty() {
394 tracing::info!(count = reaped.len(), "reaped idle WebSocket connections");
395 }
396 }
397 });
398 }
399 for gd in inventory::iter::<&'static GatewayDescriptor> {
400 if !allowed_gateways.contains(gd.name) {
401 continue;
402 }
403 let runtime = (gd.build)(container);
404 app = app.route(gd.path, ws_route(runtime, registry, container, ws_tuning));
405 }
406
407 if config.expose_docs {
411 app = app
412 .route(
413 "/openapi.json",
414 get(move || async move {
415 (
416 [(axum::http::header::CONTENT_TYPE, "application/json")],
417 spec_bytes,
418 )
419 .into_response()
420 }),
421 )
422 .route(
423 "/docs",
424 get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
425 );
426 }
427 let mut app = app.layer(axum::middleware::from_fn(
428 crate::security::apply_security_headers,
429 ));
430
431 if let Some(cors_cfg) = config.cors.clone() {
436 let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
437 app = app.layer(axum::middleware::from_fn(
438 move |req: axum::extract::Request, next: axum::middleware::Next| {
439 crate::web::cors::apply_cors(cors_cfg, req, next)
440 },
441 ));
442 }
443 let gov = crate::web::governor::Governor::new(config.request_timeout, config.max_in_flight);
444 let app = app.layer(axum::middleware::from_fn(
445 move |req: axum::extract::Request, next: axum::middleware::Next| {
446 crate::web::governor::govern(Arc::clone(&gov), req, next)
447 },
448 ));
449
450 crate::web::cache::set_capacity(config.cache_max_entries);
452 crate::web::cache::spawn_sweeper(config.cache_sweep_interval);
453
454 let listener = tokio::net::TcpListener::bind(addr).await?;
456
457 let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
465 let mut started = 0usize;
466 #[allow(clippy::explicit_counter_loop)] for p in plugins_arc.iter() {
468 if let Err(e) = p.on_start(container).await {
469 for already in plugins_arc[..started].iter().rev() {
470 drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
471 .await;
472 }
473 return Err(plugin_io_err(e));
474 }
475 started += 1;
476 }
477
478 let plugins_for_draining = Arc::clone(&plugins_arc);
484 let draining_budget = config.drain_budget;
485 let ws_deadline = config.ws_drain_deadline;
486 let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
487 shutdown_signal().await;
488 crate::observability::health::set_draining(true);
491 tracing::info!("shutdown signal received — HTTP draining");
492 tokio::spawn(async move {
495 tokio::time::sleep(ws_deadline).await;
496 let closed = registry.close_all();
497 if closed > 0 {
498 tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
499 }
500 });
501 tokio::spawn(async move {
506 for p in plugins_for_draining.iter() {
507 match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
508 Ok(Ok(())) => {}
509 Ok(Err(e)) => {
510 tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
511 }
512 Err(_) => tracing::warn!(
513 plugin = p.name(),
514 budget = ?draining_budget,
515 "plugin on_draining exceeded budget"
516 ),
517 }
518 }
519 });
520 });
521 let serve_res = serve.await;
522
523 tracing::info!(
525 budget = ?config.drain_budget,
526 "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
527 );
528 for p in plugins_arc.iter().rev() {
529 drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
530 }
531 serve_res
532 }
533}
534
535fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
539 use std::collections::HashSet;
540 let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
541 let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
542 std::collections::VecDeque::new();
543 let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
544 queue.push_back(root);
545 while let Some(m) = queue.pop_front() {
546 if !visited.insert(m as *const _) {
547 continue;
548 }
549 order.push(m);
550 for getter in m.imports {
551 queue.push_back(getter());
552 }
553 }
554 order
555}
556
557#[cfg(unix)]
562async fn shutdown_signal() {
563 use tokio::signal::unix::{signal, SignalKind};
564 match signal(SignalKind::terminate()) {
565 Ok(mut sigterm) => {
566 tokio::select! {
567 _ = tokio::signal::ctrl_c() => {}
568 _ = sigterm.recv() => {}
569 }
570 }
571 Err(e) => {
572 tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
573 let _ = tokio::signal::ctrl_c().await;
574 }
575 }
576}
577
/// Resolves once the Ctrl-C wait completes; used on non-Unix targets.
#[cfg(not(unix))]
async fn shutdown_signal() {
    // The outcome is deliberately discarded: success and failure both end the wait.
    drop(tokio::signal::ctrl_c().await);
}
582
583async fn drain_plugin(
587 p: &dyn ArclyPlugin,
588 container: &'static crate::core::engine::FrozenDiContainer,
589 phase: &str,
590 budget: Duration,
591) {
592 match tokio::time::timeout(budget, p.on_shutdown(container)).await {
593 Ok(Ok(())) => {}
594 Ok(Err(e)) => {
595 tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
596 }
597 Err(_) => tracing::warn!(
598 plugin = p.name(),
599 phase,
600 budget = ?budget,
601 "plugin shutdown exceeded budget — skipped"
602 ),
603 }
604}
605
606fn plugin_io_err(e: PluginError) -> std::io::Error {
607 let kind = match e.stage {
608 PluginStage::Init => std::io::ErrorKind::InvalidInput,
609 PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
610 PluginStage::Shutdown => std::io::ErrorKind::Other,
611 };
612 std::io::Error::new(kind, e)
613}
614
#[cfg(test)]
mod tests {
    use super::*;

    /// Valid overrides replace the defaults, an unparseable one is skipped, and keys
    /// that are not supplied keep their default value.
    #[test]
    fn env_overrides_apply_and_ignore_garbage() {
        // Stand-in for the process environment.
        let fake_env = |key: &str| -> Option<String> {
            let value = match key {
                "ARCLY_REQUEST_TIMEOUT_MS" => "1500",
                "ARCLY_MAX_IN_FLIGHT" => "notanumber",
                "ARCLY_MAX_BODY_BYTES" => "1024",
                "ARCLY_EXPOSE_DOCS" => "false",
                _ => return None,
            };
            Some(value.into())
        };

        let cfg = LaunchConfig::default().apply_overrides(fake_env);

        assert_eq!(cfg.request_timeout, Duration::from_millis(1500));
        assert_eq!(cfg.max_in_flight, 0, "unparseable override is ignored");
        assert_eq!(cfg.max_body_bytes, 1024);
        assert!(!cfg.expose_docs);
        assert_eq!(cfg.drain_budget, Duration::from_secs(5));
    }
}