1use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse};
25use axum::routing::get;
26
27use crate::core::engine::{
28 DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
29};
30use crate::core::plugins::{
31 build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
32};
33use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
34use crate::realtime::gateway::GatewayDescriptor;
35use crate::realtime::{ws_route, ConnectionRegistry};
36use crate::web::boundary::adapt;
37
38#[derive(Clone, Debug)]
49pub struct LaunchConfig {
50 pub drain_budget: Duration,
55 pub request_timeout: Duration,
59 pub max_in_flight: usize,
63 pub max_body_bytes: usize,
65 pub cache_max_entries: usize,
68 pub cache_sweep_interval: Duration,
70 pub ws_drain_deadline: Duration,
75 pub cors: Option<crate::web::cors::CorsConfig>,
77}
78
79impl Default for LaunchConfig {
80 fn default() -> Self {
81 Self {
82 drain_budget: Duration::from_secs(5),
83 request_timeout: Duration::from_secs(30),
84 max_in_flight: 0,
85 max_body_bytes: 8 * 1024 * 1024,
86 cache_max_entries: 10_000,
87 cache_sweep_interval: Duration::from_secs(30),
88 ws_drain_deadline: Duration::from_secs(10),
89 cors: None,
90 }
91 }
92}
93
94pub struct App;
95
96impl App {
97 pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
98 let info = OpenApiInfo {
99 title: "arcly-http service",
100 version: env!("CARGO_PKG_VERSION"),
101 ..Default::default()
102 };
103 Self::launch_with_info::<RootMod>(addr, info).await
104 }
105
106 pub async fn launch_named<RootMod: Module>(
107 addr: &str,
108 title: &'static str,
109 version: &'static str,
110 ) -> std::io::Result<()> {
111 let info = OpenApiInfo {
112 title,
113 version,
114 ..Default::default()
115 };
116 Self::launch_with_info::<RootMod>(addr, info).await
117 }
118
119 pub async fn launch_with_info<RootMod: Module>(
120 addr: &str,
121 info: OpenApiInfo,
122 ) -> std::io::Result<()> {
123 Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
124 }
125
126 pub async fn launch_with_plugins<RootMod: Module>(
129 addr: &str,
130 info: OpenApiInfo,
131 plugins: Vec<Box<dyn ArclyPlugin>>,
132 ) -> std::io::Result<()> {
133 Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
134 }
135
136 pub async fn launch_configured<RootMod: Module>(
138 addr: &str,
139 info: OpenApiInfo,
140 mut plugins: Vec<Box<dyn ArclyPlugin>>,
141 config: LaunchConfig,
142 ) -> std::io::Result<()> {
143 let _root: PhantomData<RootMod> = PhantomData;
144
145 let reachable_modules = collect_reachable_modules(RootMod::descriptor());
147 let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
148 .iter()
149 .flat_map(|m| m.controllers.iter().copied())
150 .collect();
151
152 let mut b = DiContainerBuilder::new();
154 for m in &reachable_modules {
155 for p in m.providers {
156 b.add_provider(p);
157 }
158 }
159
160 let mut plugin_ctx = ArclyPluginContext::new();
162 for p in plugins.iter_mut() {
163 plugin_ctx.current_plugin = p.name();
164 if let Err(e) = p.on_init(&mut plugin_ctx).await {
165 return Err(plugin_io_err(e));
166 }
167 }
168
169 for f in plugin_ctx.pending_providers.drain(..) {
171 f(&mut b);
172 }
173
174 b.register(crate::web::dynamic::DynamicRouteTable::new());
178 let container = b.freeze();
179
180 let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
182 for mutator in plugin_ctx.openapi_mutators.drain(..) {
183 mutator(&mut spec_value);
184 }
185 let spec_bytes: &'static [u8] = Box::leak(
188 serde_json::to_vec(&spec_value)
189 .unwrap_or_else(|e| {
190 panic!("Arcly: OpenAPI spec serialization failed: {e}")
193 })
194 .into_boxed_slice(),
195 );
196
197 let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
201 Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
202 let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
204 Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());
205
206 let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
207 axum::Router::new();
208 let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
209 std::collections::HashSet::new();
210 mounted.insert(("/openapi.json", HttpMethod::GET));
212 mounted.insert(("/docs", HttpMethod::GET));
213 for rt in inventory::iter::<&'static RouteDescriptor> {
214 if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
217 continue;
218 }
219 mounted.insert((rt.path, rt.method));
220 router = router.route(rt.path, adapt(rt, globals, filters));
221 }
222 let mut app = router.with_state(container);
223 for r in &plugin_ctx.extra_routes {
224 if !mounted.insert((r.path, r.method)) {
227 return Err(plugin_io_err(PluginError::new(
228 r.plugin,
229 PluginStage::Init,
230 format!(
231 "route `{:?} {}` is already mounted by another route or plugin",
232 r.method, r.path
233 ),
234 )));
235 }
236 app = app.route(r.path, build_plugin_route(container, r, globals, filters));
237 }
238
239 app = app.route(
241 "/_plugins/*rest",
242 crate::web::dynamic::dynamic_dispatch_route(container, globals, filters),
243 );
244
245 let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
249 .iter()
250 .flat_map(|m| m.gateways.iter().copied())
251 .collect();
252 let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
253 for gd in inventory::iter::<&'static GatewayDescriptor> {
254 if !allowed_gateways.contains(gd.name) {
255 continue;
256 }
257 let runtime = (gd.build)(container);
258 app = app.route(gd.path, ws_route(runtime, registry, container));
259 }
260
261 let mut app = app
262 .route(
263 "/openapi.json",
264 get(move || async move {
265 (
266 [(axum::http::header::CONTENT_TYPE, "application/json")],
267 spec_bytes,
268 )
269 .into_response()
270 }),
271 )
272 .route(
273 "/docs",
274 get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
275 )
276 .layer(axum::middleware::from_fn(
277 crate::security::apply_security_headers,
278 ));
279
280 if let Some(cors_cfg) = config.cors.clone() {
285 let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
286 app = app.layer(axum::middleware::from_fn(
287 move |req: axum::extract::Request, next: axum::middleware::Next| {
288 crate::web::cors::apply_cors(cors_cfg, req, next)
289 },
290 ));
291 }
292 let gov = crate::web::governor::Governor::new(config.request_timeout, config.max_in_flight);
293 let app = app.layer(axum::middleware::from_fn(
294 move |req: axum::extract::Request, next: axum::middleware::Next| {
295 crate::web::governor::govern(Arc::clone(&gov), req, next)
296 },
297 ));
298
299 crate::web::boundary::set_max_body(config.max_body_bytes);
301 crate::web::cache::set_capacity(config.cache_max_entries);
302 crate::web::cache::spawn_sweeper(config.cache_sweep_interval);
303
304 let listener = tokio::net::TcpListener::bind(addr).await?;
306
307 let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
315 let mut started = 0usize;
316 #[allow(clippy::explicit_counter_loop)] for p in plugins_arc.iter() {
318 if let Err(e) = p.on_start(container).await {
319 for already in plugins_arc[..started].iter().rev() {
320 drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
321 .await;
322 }
323 return Err(plugin_io_err(e));
324 }
325 started += 1;
326 }
327
328 let plugins_for_draining = Arc::clone(&plugins_arc);
334 let draining_budget = config.drain_budget;
335 let ws_deadline = config.ws_drain_deadline;
336 let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
337 shutdown_signal().await;
338 tracing::info!("shutdown signal received — HTTP draining");
339 tokio::spawn(async move {
342 tokio::time::sleep(ws_deadline).await;
343 let closed = registry.close_all();
344 if closed > 0 {
345 tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
346 }
347 });
348 tokio::spawn(async move {
353 for p in plugins_for_draining.iter() {
354 match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
355 Ok(Ok(())) => {}
356 Ok(Err(e)) => {
357 tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
358 }
359 Err(_) => tracing::warn!(
360 plugin = p.name(),
361 budget = ?draining_budget,
362 "plugin on_draining exceeded budget"
363 ),
364 }
365 }
366 });
367 });
368 let serve_res = serve.await;
369
370 tracing::info!(
372 budget = ?config.drain_budget,
373 "HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
374 );
375 for p in plugins_arc.iter().rev() {
376 drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
377 }
378 serve_res
379 }
380}
381
382fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
386 use std::collections::HashSet;
387 let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
388 let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
389 std::collections::VecDeque::new();
390 let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
391 queue.push_back(root);
392 while let Some(m) = queue.pop_front() {
393 if !visited.insert(m as *const _) {
394 continue;
395 }
396 order.push(m);
397 for getter in m.imports {
398 queue.push_back(getter());
399 }
400 }
401 order
402}
403
404#[cfg(unix)]
409async fn shutdown_signal() {
410 use tokio::signal::unix::{signal, SignalKind};
411 match signal(SignalKind::terminate()) {
412 Ok(mut sigterm) => {
413 tokio::select! {
414 _ = tokio::signal::ctrl_c() => {}
415 _ = sigterm.recv() => {}
416 }
417 }
418 Err(e) => {
419 tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
420 let _ = tokio::signal::ctrl_c().await;
421 }
422 }
423}
424
425#[cfg(not(unix))]
426async fn shutdown_signal() {
427 let _ = tokio::signal::ctrl_c().await;
428}
429
430async fn drain_plugin(
434 p: &dyn ArclyPlugin,
435 container: &'static crate::core::engine::FrozenDiContainer,
436 phase: &str,
437 budget: Duration,
438) {
439 match tokio::time::timeout(budget, p.on_shutdown(container)).await {
440 Ok(Ok(())) => {}
441 Ok(Err(e)) => {
442 tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
443 }
444 Err(_) => tracing::warn!(
445 plugin = p.name(),
446 phase,
447 budget = ?budget,
448 "plugin shutdown exceeded budget — skipped"
449 ),
450 }
451}
452
453fn plugin_io_err(e: PluginError) -> std::io::Error {
454 let kind = match e.stage {
455 PluginStage::Init => std::io::ErrorKind::InvalidInput,
456 PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
457 PluginStage::Shutdown => std::io::ErrorKind::Other,
458 };
459 std::io::Error::new(kind, e)
460}