1use std::marker::PhantomData;
21use std::sync::Arc;
22use std::time::Duration;
23
24use axum::response::{Html, IntoResponse, Json};
25use axum::routing::get;
26
27use crate::core::engine::{DiContainerBuilder, Module, ModuleDescriptor, RouteDescriptor};
28use crate::core::plugins::{
29 build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
30};
31use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
32use crate::realtime::gateway::GatewayDescriptor;
33use crate::realtime::{ws_route, ConnectionRegistry};
34use crate::web::boundary::adapt;
35
36pub struct App;
37
38impl App {
39 pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
40 let info = OpenApiInfo {
41 title: "arcly-http service",
42 version: env!("CARGO_PKG_VERSION"),
43 ..Default::default()
44 };
45 Self::launch_with_info::<RootMod>(addr, info).await
46 }
47
48 pub async fn launch_named<RootMod: Module>(
49 addr: &str,
50 title: &'static str,
51 version: &'static str,
52 ) -> std::io::Result<()> {
53 let info = OpenApiInfo {
54 title,
55 version,
56 ..Default::default()
57 };
58 Self::launch_with_info::<RootMod>(addr, info).await
59 }
60
61 pub async fn launch_with_info<RootMod: Module>(
62 addr: &str,
63 info: OpenApiInfo,
64 ) -> std::io::Result<()> {
65 Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
66 }
67
68 pub async fn launch_with_plugins<RootMod: Module>(
71 addr: &str,
72 info: OpenApiInfo,
73 mut plugins: Vec<Box<dyn ArclyPlugin>>,
74 ) -> std::io::Result<()> {
75 let _root: PhantomData<RootMod> = PhantomData;
76
77 let reachable_modules = collect_reachable_modules(RootMod::descriptor());
79 let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
80 .iter()
81 .flat_map(|m| m.controllers.iter().copied())
82 .collect();
83
84 let mut b = DiContainerBuilder::new();
86 for m in &reachable_modules {
87 for p in m.providers {
88 b.add_provider(p);
89 }
90 }
91
92 let mut plugin_ctx = ArclyPluginContext::new();
94 for p in plugins.iter_mut() {
95 if let Err(e) = p.on_init(&mut plugin_ctx).await {
96 return Err(plugin_io_err(e));
97 }
98 }
99
100 for f in plugin_ctx.pending_providers.drain(..) {
102 f(&mut b);
103 }
104
105 let container = b.freeze();
107
108 let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
110 for mutator in plugin_ctx.openapi_mutators.drain(..) {
111 mutator(&mut spec_value);
112 }
113 let spec: &'static serde_json::Value = Box::leak(Box::new(spec_value));
114
115 let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
117 axum::Router::new();
118 for rt in inventory::iter::<&'static RouteDescriptor> {
119 if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
122 continue;
123 }
124 router = router.route(rt.path, adapt(rt));
125 }
126 let mut app = router.with_state(container);
127 for r in &plugin_ctx.extra_routes {
128 app = app.route(r.path, build_plugin_route(container, r));
129 }
130
131 let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
135 .iter()
136 .flat_map(|m| m.gateways.iter().copied())
137 .collect();
138 let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
139 for gd in inventory::iter::<&'static GatewayDescriptor> {
140 if !allowed_gateways.contains(gd.name) {
141 continue;
142 }
143 let runtime = (gd.build)(container);
144 app = app.route(gd.path, ws_route(runtime, registry, container));
145 }
146
147 let app = app
148 .route(
149 "/openapi.json",
150 get(move || async move { Json(spec.clone()) }),
151 )
152 .route(
153 "/docs",
154 get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
155 )
156 .layer(axum::middleware::from_fn(
157 crate::security::apply_security_headers,
158 ));
159
160 let listener = tokio::net::TcpListener::bind(addr).await?;
162
163 let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
170 let rollback_budget = Duration::from_secs(5);
171 let mut started = 0usize;
172 #[allow(clippy::explicit_counter_loop)] for p in plugins_arc.iter() {
174 if let Err(e) = p.on_start(container).await {
175 let already_started = Arc::clone(&plugins_arc);
176 let rolled = tokio::time::timeout(rollback_budget, async move {
177 for already in already_started[..started].iter().rev() {
178 if let Err(se) = already.on_shutdown(container).await {
179 eprintln!("[arcly] plugin `{}` cleanup error: {}", already.name(), se);
180 }
181 }
182 })
183 .await;
184 if rolled.is_err() {
185 eprintln!("[arcly] plugin rollback exceeded {rollback_budget:?} — forced");
186 }
187 return Err(plugin_io_err(e));
188 }
189 started += 1;
190 }
191
192 let serve = axum::serve(listener, app).with_graceful_shutdown(async {
198 shutdown_signal().await;
199 eprintln!("[arcly] shutdown signal received — HTTP draining");
200 });
201 let serve_res = serve.await;
202
203 eprintln!("[arcly] HTTP fully drained — running plugin on_shutdown (5s budget)");
205 let drain_budget = Duration::from_secs(5);
206 let plugins_for_drain = Arc::clone(&plugins_arc);
207 let drained = tokio::time::timeout(drain_budget, async move {
208 for p in plugins_for_drain.iter().rev() {
209 if let Err(e) = p.on_shutdown(container).await {
210 eprintln!("[arcly] plugin `{}` shutdown error: {}", p.name(), e);
211 }
212 }
213 })
214 .await;
215 if drained.is_err() {
216 eprintln!("[arcly] plugin drain exceeded {drain_budget:?} — forced exit");
217 }
218 serve_res
219 }
220}
221
222fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
226 use std::collections::HashSet;
227 let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
228 let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
229 std::collections::VecDeque::new();
230 let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
231 queue.push_back(root);
232 while let Some(m) = queue.pop_front() {
233 if !visited.insert(m as *const _) {
234 continue;
235 }
236 order.push(m);
237 for getter in m.imports {
238 queue.push_back(getter());
239 }
240 }
241 order
242}
243
244#[cfg(unix)]
249async fn shutdown_signal() {
250 use tokio::signal::unix::{signal, SignalKind};
251 match signal(SignalKind::terminate()) {
252 Ok(mut sigterm) => {
253 tokio::select! {
254 _ = tokio::signal::ctrl_c() => {}
255 _ = sigterm.recv() => {}
256 }
257 }
258 Err(e) => {
259 eprintln!("[arcly] SIGTERM handler unavailable ({e}), falling back to SIGINT only");
260 let _ = tokio::signal::ctrl_c().await;
261 }
262 }
263}
264
265#[cfg(not(unix))]
266async fn shutdown_signal() {
267 let _ = tokio::signal::ctrl_c().await;
268}
269
270fn plugin_io_err(e: PluginError) -> std::io::Error {
271 let kind = match e.stage {
272 PluginStage::Init => std::io::ErrorKind::InvalidInput,
273 PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
274 PluginStage::Shutdown => std::io::ErrorKind::Other,
275 };
276 std::io::Error::new(kind, e)
277}