use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use axum::response::{Html, IntoResponse};
use axum::routing::get;
use crate::core::engine::{
DiContainerBuilder, HttpMethod, Module, ModuleDescriptor, RouteDescriptor,
};
use crate::core::plugins::{
build_plugin_route, ArclyPlugin, ArclyPluginContext, PluginError, PluginStage,
};
use crate::openapi::{build_spec_filtered, OpenApiInfo, SWAGGER_UI_HTML};
use crate::realtime::gateway::GatewayDescriptor;
use crate::realtime::{ws_route, ConnectionRegistry};
use crate::web::boundary::adapt;
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct LaunchConfig {
pub drain_budget: Duration,
pub request_timeout: Duration,
pub max_in_flight: usize,
pub max_body_bytes: usize,
pub cache_max_entries: usize,
pub cache_sweep_interval: Duration,
pub ws_drain_deadline: Duration,
pub cors: Option<crate::web::cors::CorsConfig>,
pub shutdown_trigger: Option<std::sync::Arc<tokio::sync::Notify>>,
pub expose_docs: bool,
pub ws_outbound_buffer: usize,
pub ws_max_connections: usize,
pub ws_ping_interval: Duration,
pub adaptive_shed_target: Duration,
pub ws_idle_timeout: Duration,
}
impl Default for LaunchConfig {
fn default() -> Self {
Self {
drain_budget: Duration::from_secs(5),
request_timeout: Duration::from_secs(30),
max_in_flight: 0,
max_body_bytes: 8 * 1024 * 1024,
cache_max_entries: 10_000,
cache_sweep_interval: Duration::from_secs(30),
ws_drain_deadline: Duration::from_secs(10),
adaptive_shed_target: Duration::ZERO,
cors: None,
shutdown_trigger: None,
expose_docs: true,
ws_outbound_buffer: 256,
ws_max_connections: 0,
ws_ping_interval: Duration::from_secs(20),
ws_idle_timeout: Duration::from_secs(60),
}
}
}
impl LaunchConfig {
pub fn drain_budget(mut self, v: Duration) -> Self {
self.drain_budget = v;
self
}
pub fn request_timeout(mut self, v: Duration) -> Self {
self.request_timeout = v;
self
}
pub fn max_in_flight(mut self, v: usize) -> Self {
self.max_in_flight = v;
self
}
pub fn max_body_bytes(mut self, v: usize) -> Self {
self.max_body_bytes = v;
self
}
pub fn cache_max_entries(mut self, v: usize) -> Self {
self.cache_max_entries = v;
self
}
pub fn cache_sweep_interval(mut self, v: Duration) -> Self {
self.cache_sweep_interval = v;
self
}
pub fn ws_drain_deadline(mut self, v: Duration) -> Self {
self.ws_drain_deadline = v;
self
}
pub fn adaptive_shed_target(mut self, v: Duration) -> Self {
self.adaptive_shed_target = v;
self
}
pub fn cors(mut self, v: crate::web::cors::CorsConfig) -> Self {
self.cors = Some(v);
self
}
pub fn shutdown_trigger(mut self, v: std::sync::Arc<tokio::sync::Notify>) -> Self {
self.shutdown_trigger = Some(v);
self
}
pub fn expose_docs(mut self, v: bool) -> Self {
self.expose_docs = v;
self
}
pub fn ws_outbound_buffer(mut self, v: usize) -> Self {
self.ws_outbound_buffer = v;
self
}
pub fn ws_max_connections(mut self, v: usize) -> Self {
self.ws_max_connections = v;
self
}
pub fn ws_ping_interval(mut self, v: Duration) -> Self {
self.ws_ping_interval = v;
self
}
pub fn ws_idle_timeout(mut self, v: Duration) -> Self {
self.ws_idle_timeout = v;
self
}
pub fn with_env_overrides(self) -> Self {
self.apply_overrides(|k| std::env::var(k).ok())
}
pub(crate) fn apply_overrides(mut self, get: impl Fn(&str) -> Option<String>) -> Self {
fn parse<T: std::str::FromStr>(key: &str, raw: String) -> Option<T> {
match raw.parse() {
Ok(v) => Some(v),
Err(_) => {
tracing::warn!(key, value = raw, "ignoring unparseable ARCLY_* override");
None
}
}
}
if let Some(v) =
get("ARCLY_REQUEST_TIMEOUT_MS").and_then(|r| parse("ARCLY_REQUEST_TIMEOUT_MS", r))
{
self.request_timeout = Duration::from_millis(v);
}
if let Some(v) = get("ARCLY_MAX_IN_FLIGHT").and_then(|r| parse("ARCLY_MAX_IN_FLIGHT", r)) {
self.max_in_flight = v;
}
if let Some(v) = get("ARCLY_MAX_BODY_BYTES").and_then(|r| parse("ARCLY_MAX_BODY_BYTES", r))
{
self.max_body_bytes = v;
}
if let Some(v) =
get("ARCLY_CACHE_MAX_ENTRIES").and_then(|r| parse("ARCLY_CACHE_MAX_ENTRIES", r))
{
self.cache_max_entries = v;
}
if let Some(v) =
get("ARCLY_WS_DRAIN_DEADLINE_MS").and_then(|r| parse("ARCLY_WS_DRAIN_DEADLINE_MS", r))
{
self.ws_drain_deadline = Duration::from_millis(v);
}
if let Some(v) =
get("ARCLY_DRAIN_BUDGET_MS").and_then(|r| parse("ARCLY_DRAIN_BUDGET_MS", r))
{
self.drain_budget = Duration::from_millis(v);
}
if let Some(v) =
get("ARCLY_WS_OUTBOUND_BUFFER").and_then(|r| parse("ARCLY_WS_OUTBOUND_BUFFER", r))
{
self.ws_outbound_buffer = v;
}
if let Some(v) =
get("ARCLY_WS_MAX_CONNECTIONS").and_then(|r| parse("ARCLY_WS_MAX_CONNECTIONS", r))
{
self.ws_max_connections = v;
}
if let Some(v) =
get("ARCLY_WS_PING_INTERVAL_MS").and_then(|r| parse("ARCLY_WS_PING_INTERVAL_MS", r))
{
self.ws_ping_interval = Duration::from_millis(v);
}
if let Some(v) =
get("ARCLY_WS_IDLE_TIMEOUT_MS").and_then(|r| parse("ARCLY_WS_IDLE_TIMEOUT_MS", r))
{
self.ws_idle_timeout = Duration::from_millis(v);
}
if let Some(v) = get("ARCLY_ADAPTIVE_SHED_TARGET_MS")
.and_then(|r| parse("ARCLY_ADAPTIVE_SHED_TARGET_MS", r))
{
self.adaptive_shed_target = Duration::from_millis(v);
}
if let Some(raw) = get("ARCLY_EXPOSE_DOCS") {
match raw.as_str() {
"true" | "1" => self.expose_docs = true,
"false" | "0" => self.expose_docs = false,
_ => tracing::warn!(value = raw, "ignoring unparseable ARCLY_EXPOSE_DOCS"),
}
}
self
}
}
pub struct App;
impl App {
pub async fn launch<RootMod: Module>(addr: &str) -> std::io::Result<()> {
let info = OpenApiInfo::new("arcly-http service", env!("CARGO_PKG_VERSION"));
Self::launch_with_info::<RootMod>(addr, info).await
}
pub async fn launch_named<RootMod: Module>(
addr: &str,
title: &'static str,
version: &'static str,
) -> std::io::Result<()> {
let info = OpenApiInfo::new(title, version);
Self::launch_with_info::<RootMod>(addr, info).await
}
pub async fn launch_with_info<RootMod: Module>(
addr: &str,
info: OpenApiInfo,
) -> std::io::Result<()> {
Self::launch_with_plugins::<RootMod>(addr, info, Vec::new()).await
}
pub async fn launch_with_plugins<RootMod: Module>(
addr: &str,
info: OpenApiInfo,
plugins: Vec<Box<dyn ArclyPlugin>>,
) -> std::io::Result<()> {
Self::launch_configured::<RootMod>(addr, info, plugins, LaunchConfig::default()).await
}
pub async fn launch_configured<RootMod: Module>(
addr: &str,
info: OpenApiInfo,
plugins: Vec<Box<dyn ArclyPlugin>>,
config: LaunchConfig,
) -> std::io::Result<()> {
let listener = tokio::net::TcpListener::bind(addr).await?;
Self::launch_on_listener::<RootMod>(listener, info, plugins, config).await
}
pub async fn launch_on_listener<RootMod: Module>(
listener: tokio::net::TcpListener,
info: OpenApiInfo,
mut plugins: Vec<Box<dyn ArclyPlugin>>,
config: LaunchConfig,
) -> std::io::Result<()> {
let _root: PhantomData<RootMod> = PhantomData;
let config = config.with_env_overrides();
tracing::info!(
request_timeout = ?config.request_timeout,
max_in_flight = config.max_in_flight,
max_body_bytes = config.max_body_bytes,
expose_docs = config.expose_docs,
"launch config (effective)"
);
let reachable_modules = collect_reachable_modules(RootMod::descriptor());
let allowed_controllers: std::collections::HashSet<&'static str> = reachable_modules
.iter()
.flat_map(|m| m.controllers.iter().copied())
.collect();
let mut b = DiContainerBuilder::new();
for m in &reachable_modules {
for p in m.providers {
b.add_provider(p);
}
}
let mut plugin_ctx = ArclyPluginContext::new();
for p in plugins.iter_mut() {
plugin_ctx.current_plugin = p.name();
if let Err(e) = p.on_init(&mut plugin_ctx).await {
return Err(plugin_io_err(e));
}
}
for f in plugin_ctx.pending_providers.drain(..) {
f(&mut b);
}
b.register(crate::web::dynamic::DynamicRouteTable::new());
b.register(crate::web::boundary::BodyLimit(config.max_body_bytes));
let container = b.freeze();
let mut spec_value = build_spec_filtered(&info, Some(&allowed_controllers));
for mutator in plugin_ctx.openapi_mutators.drain(..) {
mutator(&mut spec_value);
}
let spec_bytes: &'static [u8] = Box::leak(
serde_json::to_vec(&spec_value)
.unwrap_or_else(|e| {
panic!("Arcly: OpenAPI spec serialization failed: {e}")
})
.into_boxed_slice(),
);
let globals: &'static [&'static dyn crate::web::interceptors::Interceptor] =
Box::leak(std::mem::take(&mut plugin_ctx.global_interceptors).into_boxed_slice());
let filters: &'static [&'static dyn crate::web::boundary::BoundaryFilter] =
Box::leak(std::mem::take(&mut plugin_ctx.boundary_filters).into_boxed_slice());
let mut router: axum::Router<&'static crate::core::engine::FrozenDiContainer> =
axum::Router::new();
let mut mounted: std::collections::HashSet<(&'static str, HttpMethod)> =
std::collections::HashSet::new();
mounted.insert(("/openapi.json", HttpMethod::GET));
mounted.insert(("/docs", HttpMethod::GET));
for rt in inventory::iter::<&'static RouteDescriptor> {
if !rt.controller.is_empty() && !allowed_controllers.contains(rt.controller) {
continue;
}
mounted.insert((rt.path, rt.method));
router = router.route(axum_path_static(rt.path), adapt(rt, globals, filters));
if rt.path.len() > 1 && rt.path.ends_with('/') {
let trimmed: &'static str =
Box::leak(rt.path[..rt.path.len() - 1].to_owned().into_boxed_str());
if mounted.insert((trimmed, rt.method)) {
router = router.route(axum_path_static(trimmed), adapt(rt, globals, filters));
}
}
}
let mut app = router.with_state(container);
for r in &plugin_ctx.extra_routes {
if !mounted.insert((r.path, r.method)) {
return Err(plugin_io_err(PluginError::new(
r.plugin,
PluginStage::Init,
format!(
"route `{:?} {}` is already mounted by another route or plugin",
r.method, r.path
),
)));
}
app = app.route(
axum_path_static(r.path),
build_plugin_route(container, r, globals, filters),
);
}
container
.get::<crate::web::dynamic::DynamicRouteTable>()
.set_globals(globals);
app = app.route(
"/_plugins/{*rest}",
crate::web::dynamic::dynamic_dispatch_route(container, filters),
);
let allowed_gateways: std::collections::HashSet<&'static str> = reachable_modules
.iter()
.flat_map(|m| m.gateways.iter().copied())
.collect();
let registry: &'static ConnectionRegistry = Box::leak(Box::new(ConnectionRegistry::new()));
let ws_tuning = crate::realtime::ws::WsTuning {
outbound_buffer: config.ws_outbound_buffer,
max_connections: config.ws_max_connections,
ping_interval: config.ws_ping_interval,
};
if !config.ws_idle_timeout.is_zero() {
let idle = config.ws_idle_timeout;
tokio::spawn(async move {
let mut tick = tokio::time::interval(idle / 2);
tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tick.tick().await;
let reaped = registry.sweep_idle(idle.as_secs());
if !reaped.is_empty() {
tracing::info!(count = reaped.len(), "reaped idle WebSocket connections");
}
}
});
}
for gd in inventory::iter::<&'static GatewayDescriptor> {
if !allowed_gateways.contains(gd.name) {
continue;
}
let runtime = (gd.build)(container);
app = app.route(
axum_path_static(gd.path),
ws_route(runtime, registry, container, ws_tuning),
);
}
if config.expose_docs {
app = app
.route(
"/openapi.json",
get(move || async move {
(
[(axum::http::header::CONTENT_TYPE, "application/json")],
spec_bytes,
)
.into_response()
}),
)
.route(
"/docs",
get(|| async { Html(SWAGGER_UI_HTML).into_response() }),
);
}
let mut app = app.layer(axum::middleware::from_fn(
crate::security::apply_security_headers,
));
if let Some(cors_cfg) = config.cors.clone() {
let cors_cfg: &'static crate::web::cors::CorsConfig = Box::leak(Box::new(cors_cfg));
app = app.layer(axum::middleware::from_fn(
move |req: axum::extract::Request, next: axum::middleware::Next| {
crate::web::cors::apply_cors(cors_cfg, req, next)
},
));
}
let gov = crate::web::governor::Governor::new(
config.request_timeout,
config.max_in_flight,
config.adaptive_shed_target,
);
let app = app.layer(axum::middleware::from_fn(
move |req: axum::extract::Request, next: axum::middleware::Next| {
crate::web::governor::govern(Arc::clone(&gov), req, next)
},
));
crate::web::cache::set_capacity(config.cache_max_entries);
crate::web::cache::spawn_sweeper(config.cache_sweep_interval);
let plugins_arc: Arc<Vec<Box<dyn ArclyPlugin>>> = Arc::new(plugins);
let mut started = 0usize;
#[allow(clippy::explicit_counter_loop)] for p in plugins_arc.iter() {
if let Err(e) = p.on_start(container).await {
for already in plugins_arc[..started].iter().rev() {
drain_plugin(already.as_ref(), container, "rollback", config.drain_budget)
.await;
}
return Err(plugin_io_err(e));
}
started += 1;
}
let plugins_for_draining = Arc::clone(&plugins_arc);
let draining_budget = config.drain_budget;
let ws_deadline = config.ws_drain_deadline;
let trigger = config.shutdown_trigger.clone();
let serve = axum::serve(listener, app).with_graceful_shutdown(async move {
match trigger {
Some(n) => {
tokio::select! {
_ = shutdown_signal() => {}
_ = n.notified() => {}
}
}
None => shutdown_signal().await,
}
crate::observability::health::set_draining(true);
tracing::info!("shutdown signal received — HTTP draining");
tokio::spawn(async move {
tokio::time::sleep(ws_deadline).await;
let closed = registry.close_all();
if closed > 0 {
tracing::warn!(closed, "WS drain deadline reached — closed live sockets");
}
});
tokio::spawn(async move {
for p in plugins_for_draining.iter() {
match tokio::time::timeout(draining_budget, p.on_draining(container)).await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::error!(plugin = p.name(), error = %e, "plugin draining error")
}
Err(_) => tracing::warn!(
plugin = p.name(),
budget = ?draining_budget,
"plugin on_draining exceeded budget"
),
}
}
});
});
let serve_res = serve.await;
tracing::info!(
budget = ?config.drain_budget,
"HTTP fully drained — running plugin on_shutdown (per-plugin budget)"
);
for p in plugins_arc.iter().rev() {
drain_plugin(p.as_ref(), container, "shutdown", config.drain_budget).await;
}
serve_res
}
}
fn axum_path(path: &str) -> String {
path.split('/')
.map(|seg| {
if let Some(name) = seg.strip_prefix(':') {
format!("{{{name}}}")
} else if let Some(name) = seg.strip_prefix('*') {
format!("{{*{name}}}")
} else {
seg.to_owned()
}
})
.collect::<Vec<_>>()
.join("/")
}
fn axum_path_static(path: &str) -> &'static str {
Box::leak(axum_path(path).into_boxed_str())
}
fn collect_reachable_modules(root: &'static ModuleDescriptor) -> Vec<&'static ModuleDescriptor> {
use std::collections::HashSet;
let mut visited: HashSet<*const ModuleDescriptor> = HashSet::new();
let mut queue: std::collections::VecDeque<&'static ModuleDescriptor> =
std::collections::VecDeque::new();
let mut order: Vec<&'static ModuleDescriptor> = Vec::new();
queue.push_back(root);
while let Some(m) = queue.pop_front() {
if !visited.insert(m as *const _) {
continue;
}
order.push(m);
for getter in m.imports {
queue.push_back(getter());
}
}
order
}
#[cfg(unix)]
async fn shutdown_signal() {
use tokio::signal::unix::{signal, SignalKind};
match signal(SignalKind::terminate()) {
Ok(mut sigterm) => {
tokio::select! {
_ = tokio::signal::ctrl_c() => {}
_ = sigterm.recv() => {}
}
}
Err(e) => {
tracing::warn!(error = %e, "SIGTERM handler unavailable, falling back to SIGINT only");
let _ = tokio::signal::ctrl_c().await;
}
}
}
#[cfg(not(unix))]
async fn shutdown_signal() {
let _ = tokio::signal::ctrl_c().await;
}
async fn drain_plugin(
p: &dyn ArclyPlugin,
container: &'static crate::core::engine::FrozenDiContainer,
phase: &str,
budget: Duration,
) {
match tokio::time::timeout(budget, p.on_shutdown(container)).await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
tracing::error!(plugin = p.name(), phase, error = %e, "plugin shutdown error")
}
Err(_) => tracing::warn!(
plugin = p.name(),
phase,
budget = ?budget,
"plugin shutdown exceeded budget — skipped"
),
}
}
fn plugin_io_err(e: PluginError) -> std::io::Error {
let kind = match e.stage {
PluginStage::Init => std::io::ErrorKind::InvalidInput,
PluginStage::Start => std::io::ErrorKind::ConnectionRefused,
PluginStage::Shutdown => std::io::ErrorKind::Other,
};
std::io::Error::new(kind, e)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn env_overrides_apply_and_ignore_garbage() {
let cfg = LaunchConfig::default().apply_overrides(|k| match k {
"ARCLY_REQUEST_TIMEOUT_MS" => Some("1500".into()),
"ARCLY_MAX_IN_FLIGHT" => Some("notanumber".into()), "ARCLY_MAX_BODY_BYTES" => Some("1024".into()),
"ARCLY_EXPOSE_DOCS" => Some("false".into()),
_ => None,
});
assert_eq!(cfg.request_timeout, Duration::from_millis(1500));
assert_eq!(cfg.max_in_flight, 0, "unparseable override is ignored");
assert_eq!(cfg.max_body_bytes, 1024);
assert!(!cfg.expose_docs);
assert_eq!(cfg.drain_budget, Duration::from_secs(5));
}
}