Skip to main content

ranvier_http/
ingress.rs

1//! # Ingress Module - Flat API Entry Point
2//!
3//! Implements Discussion 193: `Ranvier::http()` is an **Ingress Circuit Builder**, not a web server.
4//!
5//! ## API Surface (MVP)
6//!
7//! - `bind(addr)` — Execution unit
8//! - `route(path, circuit)` — Core wiring
9//! - `fallback(circuit)` — Circuit completeness
10//! - `into_raw_service()` — Escape hatch to Raw API
11//!
12//! ## Flat API Principle (Discussion 192)
13//!
14//! User code depth ≤ 2. Complexity is isolated, not hidden.
15
16use base64::Engine;
17use bytes::Bytes;
18use futures_util::{SinkExt, StreamExt};
19use http::{Method, Request, Response, StatusCode};
20use http_body_util::{BodyExt, Full};
21use hyper::body::Incoming;
22use hyper::server::conn::http1;
23use hyper::upgrade::Upgraded;
24use hyper_util::rt::TokioIo;
25use ranvier_core::event::{EventSink, EventSource};
26use ranvier_core::prelude::*;
27use ranvier_runtime::Axon;
28use serde::Serialize;
29use serde::de::DeserializeOwned;
30use sha1::{Digest, Sha1};
31use std::collections::HashMap;
32use std::convert::Infallible;
33use std::future::Future;
34use std::net::SocketAddr;
35use std::pin::Pin;
36use std::sync::Arc;
37use std::time::Duration;
38use tokio::net::TcpListener;
39use tokio::sync::Mutex;
40use tokio_tungstenite::WebSocketStream;
41use tokio_tungstenite::tungstenite::{Error as WsWireError, Message as WsWireMessage};
42use tracing::Instrument;
43
44use crate::response::{HttpResponse, IntoResponse, outcome_to_response_with_error};
45
46/// The Ranvier Framework entry point.
47///
48/// `Ranvier` provides static methods to create Ingress builders for various protocols.
49/// Currently only HTTP is supported.
50pub struct Ranvier;
51
52impl Ranvier {
53    /// Create an HTTP Ingress Circuit Builder.
54    pub fn http<R>() -> HttpIngress<R>
55    where
56        R: ranvier_core::transition::ResourceRequirement + Clone,
57    {
58        HttpIngress::new()
59    }
60}
61
/// Route handler type: a shared, type-erased async function returning a response.
///
/// The handler receives the request head (`http::request::Parts`) and a borrow
/// of the resources `R`, and returns a boxed `Send` future that resolves to an
/// `HttpResponse`. The `Arc` wrapper together with `Send + Sync` allows one
/// handler to be cloned and shared.
type RouteHandler<R> = Arc<
    dyn Fn(http::request::Parts, &R) -> Pin<Box<dyn Future<Output = HttpResponse> + Send>>
        + Send
        + Sync,
>;
68
69/// Type-erased cloneable HTTP service (replaces tower::util::BoxCloneService).
70#[derive(Clone)]
71struct BoxService(
72    Arc<
73        dyn Fn(Request<Incoming>) -> Pin<Box<dyn Future<Output = Result<HttpResponse, Infallible>> + Send>>
74            + Send
75            + Sync,
76    >,
77);
78
79impl BoxService {
80    fn new<F, Fut>(f: F) -> Self
81    where
82        F: Fn(Request<Incoming>) -> Fut + Send + Sync + 'static,
83        Fut: Future<Output = Result<HttpResponse, Infallible>> + Send + 'static,
84    {
85        Self(Arc::new(move |req| Box::pin(f(req))))
86    }
87
88    fn call(&self, req: Request<Incoming>) -> Pin<Box<dyn Future<Output = Result<HttpResponse, Infallible>> + Send>> {
89        (self.0)(req)
90    }
91}
92
93impl hyper::service::Service<Request<Incoming>> for BoxService {
94    type Response = HttpResponse;
95    type Error = Infallible;
96    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Infallible>> + Send>>;
97
98    fn call(&self, req: Request<Incoming>) -> Self::Future {
99        (self.0)(req)
100    }
101}
102
/// HTTP-specific name for the boxed service type.
type BoxHttpService = BoxService;
/// Middleware layer: takes a service and returns the service wrapping it.
type ServiceLayer = Arc<dyn Fn(BoxHttpService) -> BoxHttpService + Send + Sync>;
/// Argument-less callback stored for the start/shutdown lifecycle hooks.
type LifecycleHook = Arc<dyn Fn() + Send + Sync>;
/// Hook that reads the request head and writes into a `Bus`.
type BusInjector = Arc<dyn Fn(&http::request::Parts, &mut Bus) + Send + Sync + 'static>;
/// Boxed future produced by a WebSocket session handler.
type WsSessionFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Shared WebSocket session handler: `(connection, resources, bus) -> session future`.
type WsSessionHandler<R> =
    Arc<dyn Fn(WebSocketConnection, Arc<R>, Bus) -> WsSessionFuture + Send + Sync>;
/// Boxed future of a single health check; `Err` carries a message string.
type HealthCheckFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
/// Shared health-check function invoked with the shared resources.
type HealthCheckFn<R> = Arc<dyn Fn(Arc<R>) -> HealthCheckFuture + Send + Sync>;
/// Name of the header used to carry the request id on requests and responses.
const REQUEST_ID_HEADER: &str = "x-request-id";
/// Token checked in (and written to) the `Upgrade` header during the WebSocket handshake.
const WS_UPGRADE_TOKEN: &str = "websocket";
/// GUID hashed together with the client's `Sec-WebSocket-Key` to build the accept key (RFC 6455).
const WS_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
115
/// A health check paired with the name it is registered under.
#[derive(Clone)]
struct NamedHealthCheck<R> {
    /// Name identifying this check.
    name: String,
    /// The check itself; resolves to `Ok(())` or an error message.
    check: HealthCheckFn<R>,
}
121
122#[derive(Clone)]
123struct HealthConfig<R> {
124    health_path: Option<String>,
125    readiness_path: Option<String>,
126    liveness_path: Option<String>,
127    checks: Vec<NamedHealthCheck<R>>,
128}
129
130impl<R> Default for HealthConfig<R> {
131    fn default() -> Self {
132        Self {
133            health_path: None,
134            readiness_path: None,
135            liveness_path: None,
136            checks: Vec::new(),
137        }
138    }
139}
140
/// Static asset serving configuration collected by the builder.
#[derive(Clone, Default)]
struct StaticAssetsConfig {
    /// Directories mounted under route prefixes (see `serve_dir`).
    mounts: Vec<StaticMount>,
    /// File path configured through `spa_fallback`, if any.
    spa_fallback: Option<String>,
    /// `Cache-Control` value for static responses, if one has been set.
    cache_control: Option<String>,
    /// Set by `compression_layer`; `false` by default.
    enable_compression: bool,
}

/// One static directory mount: a route prefix mapped to a directory path.
#[derive(Clone)]
struct StaticMount {
    /// Route prefix, normalized to start with `/` when created by `serve_dir`.
    route_prefix: String,
    /// Directory path exactly as supplied by the caller.
    directory: String,
}
154
/// TLS configuration for HTTPS serving.
///
/// Only compiled when the `tls` feature is enabled.
#[cfg(feature = "tls")]
#[derive(Clone)]
struct TlsAcceptorConfig {
    /// Path to the certificate file.
    cert_path: String,
    /// Path to the private key file.
    key_path: String,
}
162
/// Serializable body of a health endpoint response.
#[derive(Serialize)]
struct HealthReport {
    /// Overall status label.
    status: &'static str,
    /// Label of the probe that produced this report.
    probe: &'static str,
    /// Per-check results.
    checks: Vec<HealthCheckReport>,
}

/// Serializable result of one named health check.
#[derive(Serialize)]
struct HealthCheckReport {
    /// Name of the check.
    name: String,
    /// Status label of this check.
    status: &'static str,
    /// Error message, left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}
177
178fn timeout_middleware(timeout: Duration) -> ServiceLayer {
179    Arc::new(move |inner: BoxHttpService| {
180        BoxService::new(move |req: Request<Incoming>| {
181            let inner = inner.clone();
182            async move {
183                match tokio::time::timeout(timeout, inner.call(req)).await {
184                    Ok(response) => response,
185                    Err(_) => Ok(Response::builder()
186                        .status(StatusCode::REQUEST_TIMEOUT)
187                        .body(
188                            Full::new(Bytes::from("Request Timeout"))
189                                .map_err(|never| match never {})
190                                .boxed(),
191                        )
192                        .expect("valid HTTP response construction")),
193                }
194            }
195        })
196    })
197}
198
199fn request_id_middleware() -> ServiceLayer {
200    Arc::new(move |inner: BoxHttpService| {
201        BoxService::new(move |req: Request<Incoming>| {
202            let inner = inner.clone();
203            async move {
204                let mut req = req;
205                let request_id = req
206                    .headers()
207                    .get(REQUEST_ID_HEADER)
208                    .cloned()
209                    .unwrap_or_else(|| {
210                        http::HeaderValue::from_str(&uuid::Uuid::new_v4().to_string())
211                            .unwrap_or_else(|_| {
212                                http::HeaderValue::from_static("request-id-unavailable")
213                            })
214                    });
215                req.headers_mut()
216                    .insert(REQUEST_ID_HEADER, request_id.clone());
217                let mut response = inner.call(req).await?;
218                response
219                    .headers_mut()
220                    .insert(REQUEST_ID_HEADER, request_id);
221                Ok(response)
222            }
223        })
224    })
225}
226
/// Named path parameters captured while matching a route pattern.
///
/// Keys are the parameter names from the pattern (`:name` / `*name`); values
/// are the matched path text.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PathParams {
    /// Parameter name -> captured value.
    values: HashMap<String, String>,
}
231
232/// Public route descriptor snapshot for tooling integrations (e.g., OpenAPI generation).
233#[derive(Clone, Debug, PartialEq, Eq)]
234pub struct HttpRouteDescriptor {
235    method: Method,
236    path_pattern: String,
237}
238
239impl HttpRouteDescriptor {
240    pub fn new(method: Method, path_pattern: impl Into<String>) -> Self {
241        Self {
242            method,
243            path_pattern: path_pattern.into(),
244        }
245    }
246
247    pub fn method(&self) -> &Method {
248        &self.method
249    }
250
251    pub fn path_pattern(&self) -> &str {
252        &self.path_pattern
253    }
254}
255
256/// Connection metadata injected into Bus for each accepted WebSocket session.
257#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
258pub struct WebSocketSessionContext {
259    connection_id: uuid::Uuid,
260    path: String,
261    query: Option<String>,
262}
263
264impl WebSocketSessionContext {
265    pub fn connection_id(&self) -> uuid::Uuid {
266        self.connection_id
267    }
268
269    pub fn path(&self) -> &str {
270        &self.path
271    }
272
273    pub fn query(&self) -> Option<&str> {
274        self.query.as_deref()
275    }
276}
277
278/// Logical WebSocket message model used by Ranvier EventSource/EventSink bridge.
279#[derive(Clone, Debug, PartialEq, Eq)]
280pub enum WebSocketEvent {
281    Text(String),
282    Binary(Vec<u8>),
283    Ping(Vec<u8>),
284    Pong(Vec<u8>),
285    Close,
286}
287
288impl WebSocketEvent {
289    pub fn text(value: impl Into<String>) -> Self {
290        Self::Text(value.into())
291    }
292
293    pub fn binary(value: impl Into<Vec<u8>>) -> Self {
294        Self::Binary(value.into())
295    }
296
297    pub fn json<T>(value: &T) -> Result<Self, serde_json::Error>
298    where
299        T: Serialize,
300    {
301        let text = serde_json::to_string(value)?;
302        Ok(Self::Text(text))
303    }
304}
305
/// Errors surfaced by `WebSocketConnection` send/receive helpers.
#[derive(Debug, thiserror::Error)]
pub enum WebSocketError {
    /// Failure reported by the underlying WebSocket stream.
    #[error("websocket wire error: {0}")]
    Wire(#[from] WsWireError),
    /// An outgoing value could not be serialized to JSON.
    #[error("json serialization failed: {0}")]
    JsonSerialize(#[source] serde_json::Error),
    /// An incoming payload could not be deserialized from JSON.
    #[error("json deserialization failed: {0}")]
    JsonDeserialize(#[source] serde_json::Error),
    /// A frame other than text/binary arrived where a JSON payload was expected.
    #[error("expected text or binary frame for json payload")]
    NonDataFrame,
}
317
/// Server-side WebSocket stream running over an upgraded hyper connection.
type WsServerStream = WebSocketStream<TokioIo<Upgraded>>;
/// Write half obtained by splitting `WsServerStream`.
type WsServerSink = futures_util::stream::SplitSink<WsServerStream, WsWireMessage>;
/// Read half obtained by splitting `WsServerStream`.
type WsServerSource = futures_util::stream::SplitStream<WsServerStream>;
321
322/// WebSocket connection adapter bridging wire frames and EventSource/EventSink traits.
323pub struct WebSocketConnection {
324    sink: Mutex<WsServerSink>,
325    source: Mutex<WsServerSource>,
326    session: WebSocketSessionContext,
327}
328
329impl WebSocketConnection {
330    fn new(stream: WsServerStream, session: WebSocketSessionContext) -> Self {
331        let (sink, source) = stream.split();
332        Self {
333            sink: Mutex::new(sink),
334            source: Mutex::new(source),
335            session,
336        }
337    }
338
339    pub fn session(&self) -> &WebSocketSessionContext {
340        &self.session
341    }
342
343    pub async fn send(&self, event: WebSocketEvent) -> Result<(), WebSocketError> {
344        let mut sink = self.sink.lock().await;
345        sink.send(event.into_wire_message()).await?;
346        Ok(())
347    }
348
349    pub async fn send_json<T>(&self, value: &T) -> Result<(), WebSocketError>
350    where
351        T: Serialize,
352    {
353        let event = WebSocketEvent::json(value).map_err(WebSocketError::JsonSerialize)?;
354        self.send(event).await
355    }
356
357    pub async fn next_json<T>(&mut self) -> Result<Option<T>, WebSocketError>
358    where
359        T: DeserializeOwned,
360    {
361        let Some(event) = self.recv_event().await? else {
362            return Ok(None);
363        };
364        match event {
365            WebSocketEvent::Text(text) => serde_json::from_str(&text)
366                .map(Some)
367                .map_err(WebSocketError::JsonDeserialize),
368            WebSocketEvent::Binary(bytes) => serde_json::from_slice(&bytes)
369                .map(Some)
370                .map_err(WebSocketError::JsonDeserialize),
371            _ => Err(WebSocketError::NonDataFrame),
372        }
373    }
374
375    async fn recv_event(&mut self) -> Result<Option<WebSocketEvent>, WsWireError> {
376        let mut source = self.source.lock().await;
377        while let Some(item) = source.next().await {
378            let message = item?;
379            if let Some(event) = WebSocketEvent::from_wire_message(message) {
380                return Ok(Some(event));
381            }
382        }
383        Ok(None)
384    }
385}
386
387impl WebSocketEvent {
388    fn from_wire_message(message: WsWireMessage) -> Option<Self> {
389        match message {
390            WsWireMessage::Text(value) => Some(Self::Text(value.to_string())),
391            WsWireMessage::Binary(value) => Some(Self::Binary(value.to_vec())),
392            WsWireMessage::Ping(value) => Some(Self::Ping(value.to_vec())),
393            WsWireMessage::Pong(value) => Some(Self::Pong(value.to_vec())),
394            WsWireMessage::Close(_) => Some(Self::Close),
395            WsWireMessage::Frame(_) => None,
396        }
397    }
398
399    fn into_wire_message(self) -> WsWireMessage {
400        match self {
401            Self::Text(value) => WsWireMessage::Text(value),
402            Self::Binary(value) => WsWireMessage::Binary(value),
403            Self::Ping(value) => WsWireMessage::Ping(value),
404            Self::Pong(value) => WsWireMessage::Pong(value),
405            Self::Close => WsWireMessage::Close(None),
406        }
407    }
408}
409
410#[async_trait::async_trait]
411impl EventSource<WebSocketEvent> for WebSocketConnection {
412    async fn next_event(&mut self) -> Option<WebSocketEvent> {
413        match self.recv_event().await {
414            Ok(event) => event,
415            Err(error) => {
416                tracing::warn!(ranvier.ws.error = %error, "websocket source read failed");
417                None
418            }
419        }
420    }
421}
422
423#[async_trait::async_trait]
424impl EventSink<WebSocketEvent> for WebSocketConnection {
425    type Error = WebSocketError;
426
427    async fn send_event(&self, event: WebSocketEvent) -> Result<(), Self::Error> {
428        self.send(event).await
429    }
430}
431
432#[async_trait::async_trait]
433impl EventSink<String> for WebSocketConnection {
434    type Error = WebSocketError;
435
436    async fn send_event(&self, event: String) -> Result<(), Self::Error> {
437        self.send(WebSocketEvent::Text(event)).await
438    }
439}
440
441#[async_trait::async_trait]
442impl EventSink<Vec<u8>> for WebSocketConnection {
443    type Error = WebSocketError;
444
445    async fn send_event(&self, event: Vec<u8>) -> Result<(), Self::Error> {
446        self.send(WebSocketEvent::Binary(event)).await
447    }
448}
449
450impl PathParams {
451    pub fn new(values: HashMap<String, String>) -> Self {
452        Self { values }
453    }
454
455    pub fn get(&self, key: &str) -> Option<&str> {
456        self.values.get(key).map(String::as_str)
457    }
458
459    pub fn as_map(&self) -> &HashMap<String, String> {
460        &self.values
461    }
462
463    pub fn into_inner(self) -> HashMap<String, String> {
464        self.values
465    }
466}
467
/// One parsed segment of a route pattern.
#[derive(Clone, Debug, PartialEq, Eq)]
enum RouteSegment {
    /// Literal segment that must equal the path segment exactly.
    Static(String),
    /// `:name` segment capturing exactly one path segment under `name`.
    Param(String),
    /// `*name` segment capturing all remaining path segments (joined with `/`) under `name`.
    Wildcard(String),
}
474
475#[derive(Clone, Debug, PartialEq, Eq)]
476struct RoutePattern {
477    raw: String,
478    segments: Vec<RouteSegment>,
479}
480
481impl RoutePattern {
482    fn parse(path: &str) -> Self {
483        let segments = path_segments(path)
484            .into_iter()
485            .map(|segment| {
486                if let Some(name) = segment.strip_prefix(':') {
487                    if !name.is_empty() {
488                        return RouteSegment::Param(name.to_string());
489                    }
490                }
491                if let Some(name) = segment.strip_prefix('*') {
492                    if !name.is_empty() {
493                        return RouteSegment::Wildcard(name.to_string());
494                    }
495                }
496                RouteSegment::Static(segment.to_string())
497            })
498            .collect();
499
500        Self {
501            raw: path.to_string(),
502            segments,
503        }
504    }
505
506    fn match_path(&self, path: &str) -> Option<PathParams> {
507        let mut params = HashMap::new();
508        let path_segments = path_segments(path);
509        let mut pattern_index = 0usize;
510        let mut path_index = 0usize;
511
512        while pattern_index < self.segments.len() {
513            match &self.segments[pattern_index] {
514                RouteSegment::Static(expected) => {
515                    let actual = path_segments.get(path_index)?;
516                    if actual != expected {
517                        return None;
518                    }
519                    pattern_index += 1;
520                    path_index += 1;
521                }
522                RouteSegment::Param(name) => {
523                    let actual = path_segments.get(path_index)?;
524                    params.insert(name.clone(), (*actual).to_string());
525                    pattern_index += 1;
526                    path_index += 1;
527                }
528                RouteSegment::Wildcard(name) => {
529                    let remaining = path_segments[path_index..].join("/");
530                    params.insert(name.clone(), remaining);
531                    pattern_index += 1;
532                    path_index = path_segments.len();
533                    break;
534                }
535            }
536        }
537
538        if pattern_index == self.segments.len() && path_index == path_segments.len() {
539            Some(PathParams::new(params))
540        } else {
541            None
542        }
543    }
544}
545
/// One registered route.
#[derive(Clone)]
struct RouteEntry<R> {
    /// HTTP method the route answers to.
    method: Method,
    /// Parsed path pattern the request path is matched against.
    pattern: RoutePattern,
    /// Handler invoked for matching requests.
    handler: RouteHandler<R>,
    /// Middleware layers attached to this route.
    layers: Arc<Vec<ServiceLayer>>,
    /// NOTE(review): presumably controls whether the builder's global layers
    /// also wrap this route — confirm at the use site.
    apply_global_layers: bool,
}
554
/// Split a path into its non-empty `/`-separated segments.
///
/// Leading, trailing and repeated slashes produce no segments, so `"/"` and
/// `""` both yield an empty vector.
fn path_segments(path: &str) -> Vec<&str> {
    path.split('/')
        .filter(|piece| !piece.is_empty())
        .collect()
}
565
/// Ensure a route path starts with `/`.
///
/// An empty path becomes `"/"`. A path already starting with `/` is returned
/// untouched. Anything else gets a `/` prepended.
fn normalize_route_path(path: String) -> String {
    match path.chars().next() {
        None => String::from("/"),
        Some('/') => path,
        Some(_) => format!("/{path}"),
    }
}
576
577fn find_matching_route<'a, R>(
578    routes: &'a [RouteEntry<R>],
579    method: &Method,
580    path: &str,
581) -> Option<(&'a RouteEntry<R>, PathParams)> {
582    for entry in routes {
583        if entry.method != *method {
584            continue;
585        }
586        if let Some(params) = entry.pattern.match_path(path) {
587            return Some((entry, params));
588        }
589    }
590    None
591}
592
593fn header_contains_token(
594    headers: &http::HeaderMap,
595    name: http::header::HeaderName,
596    token: &str,
597) -> bool {
598    headers
599        .get(name)
600        .and_then(|value| value.to_str().ok())
601        .map(|value| {
602            value
603                .split(',')
604                .any(|part| part.trim().eq_ignore_ascii_case(token))
605        })
606        .unwrap_or(false)
607}
608
609fn websocket_session_from_request<B>(req: &Request<B>) -> WebSocketSessionContext {
610    WebSocketSessionContext {
611        connection_id: uuid::Uuid::new_v4(),
612        path: req.uri().path().to_string(),
613        query: req.uri().query().map(str::to_string),
614    }
615}
616
617fn websocket_accept_key(client_key: &str) -> String {
618    let mut hasher = Sha1::new();
619    hasher.update(client_key.as_bytes());
620    hasher.update(WS_GUID.as_bytes());
621    let digest = hasher.finalize();
622    base64::engine::general_purpose::STANDARD.encode(digest)
623}
624
625fn websocket_bad_request(message: &'static str) -> HttpResponse {
626    Response::builder()
627        .status(StatusCode::BAD_REQUEST)
628        .body(
629            Full::new(Bytes::from(message))
630                .map_err(|never| match never {})
631                .boxed(),
632        )
633        .unwrap_or_else(|_| {
634            Response::new(
635                Full::new(Bytes::new())
636                    .map_err(|never| match never {})
637                    .boxed(),
638            )
639        })
640}
641
642fn websocket_upgrade_response<B>(
643    req: &mut Request<B>,
644) -> Result<(HttpResponse, hyper::upgrade::OnUpgrade), HttpResponse> {
645    if req.method() != Method::GET {
646        return Err(websocket_bad_request(
647            "WebSocket upgrade requires GET method",
648        ));
649    }
650
651    if !header_contains_token(req.headers(), http::header::CONNECTION, "upgrade") {
652        return Err(websocket_bad_request(
653            "Missing Connection: upgrade header for WebSocket",
654        ));
655    }
656
657    if !header_contains_token(req.headers(), http::header::UPGRADE, WS_UPGRADE_TOKEN) {
658        return Err(websocket_bad_request("Missing Upgrade: websocket header"));
659    }
660
661    if let Some(version) = req.headers().get("sec-websocket-version") {
662        if version != "13" {
663            return Err(websocket_bad_request(
664                "Unsupported Sec-WebSocket-Version (expected 13)",
665            ));
666        }
667    }
668
669    let Some(client_key) = req
670        .headers()
671        .get("sec-websocket-key")
672        .and_then(|value| value.to_str().ok())
673    else {
674        return Err(websocket_bad_request(
675            "Missing Sec-WebSocket-Key header for WebSocket",
676        ));
677    };
678
679    let accept_key = websocket_accept_key(client_key);
680    let on_upgrade = hyper::upgrade::on(req);
681    let response = Response::builder()
682        .status(StatusCode::SWITCHING_PROTOCOLS)
683        .header(http::header::UPGRADE, WS_UPGRADE_TOKEN)
684        .header(http::header::CONNECTION, "Upgrade")
685        .header("sec-websocket-accept", accept_key)
686        .body(
687            Full::new(Bytes::new())
688                .map_err(|never| match never {})
689                .boxed(),
690        )
691        .unwrap_or_else(|_| {
692            Response::new(
693                Full::new(Bytes::new())
694                    .map_err(|never| match never {})
695                    .boxed(),
696            )
697        });
698
699    Ok((response, on_upgrade))
700}
701
/// HTTP Ingress Circuit Builder.
///
/// Wires HTTP inputs to Ranvier Circuits. This is NOT a web server—it's a circuit wiring tool.
///
/// **Ingress is part of Schematic** (separate layer: Ingress → Circuit → Egress)
///
/// All fields are private and are filled in through the consuming builder
/// methods (`bind`, `serve_dir`, `timeout_layer`, ...).
pub struct HttpIngress<R = ()> {
    /// Bind address (e.g., "127.0.0.1:3000")
    addr: Option<String>,
    /// Routes: (Method, RoutePattern, Handler)
    routes: Vec<RouteEntry<R>>,
    /// Fallback circuit for unmatched routes
    fallback: Option<RouteHandler<R>>,
    /// Global middleware layers (LIFO execution on request path).
    layers: Vec<ServiceLayer>,
    /// Lifecycle callback invoked after listener bind succeeds.
    on_start: Option<LifecycleHook>,
    /// Lifecycle callback invoked when graceful shutdown finishes.
    on_shutdown: Option<LifecycleHook>,
    /// Maximum time to wait for in-flight requests to drain.
    graceful_shutdown_timeout: Duration,
    /// Request-context to Bus injection hooks executed before each circuit run.
    bus_injectors: Vec<BusInjector>,
    /// Static asset serving configuration (serve_dir + SPA fallback).
    static_assets: StaticAssetsConfig,
    /// Built-in health endpoint configuration.
    health: HealthConfig<R>,
    /// HTTP/3 configuration set via `enable_http3` (feature-gated: `http3`)
    #[cfg(feature = "http3")]
    http3_config: Option<crate::http3::Http3Config>,
    /// Port set via `alt_svc_h3` for the `Alt-Svc` response header (feature-gated: `http3`)
    #[cfg(feature = "http3")]
    alt_svc_h3_port: Option<u16>,
    /// TLS configuration (feature-gated: `tls`)
    #[cfg(feature = "tls")]
    tls_config: Option<TlsAcceptorConfig>,
    /// Features: enable active intervention system routes
    active_intervention: bool,
    /// Optional policy registry for hot-reloads
    policy_registry: Option<ranvier_core::policy::PolicyRegistry>,
    /// Zero-sized marker tying the builder to the resource type `R`.
    _phantom: std::marker::PhantomData<R>,
}
741
742impl<R> HttpIngress<R>
743where
744    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
745{
746    /// Create a new empty HttpIngress builder.
747    pub fn new() -> Self {
748        Self {
749            addr: None,
750            routes: Vec::new(),
751            fallback: None,
752            layers: Vec::new(),
753            on_start: None,
754            on_shutdown: None,
755            graceful_shutdown_timeout: Duration::from_secs(30),
756            bus_injectors: Vec::new(),
757            static_assets: StaticAssetsConfig::default(),
758            health: HealthConfig::default(),
759            #[cfg(feature = "tls")]
760            tls_config: None,
761            #[cfg(feature = "http3")]
762            http3_config: None,
763            #[cfg(feature = "http3")]
764            alt_svc_h3_port: None,
765            active_intervention: false,
766            policy_registry: None,
767            _phantom: std::marker::PhantomData,
768        }
769    }
770
771    /// Set the bind address for the server.
772    pub fn bind(mut self, addr: impl Into<String>) -> Self {
773        self.addr = Some(addr.into());
774        self
775    }
776
777    /// Enable active intervention endpoints (`/_system/intervene/*`).
778    /// These endpoints allow external tooling (like Ranvier Studio) to pause,
779    /// inspect, and forcefully resume or re-route in-flight workflow instances.
780    pub fn active_intervention(mut self) -> Self {
781        self.active_intervention = true;
782        self
783    }
784
785    /// Attach a policy registry for hot-reloads.
786    pub fn policy_registry(mut self, registry: ranvier_core::policy::PolicyRegistry) -> Self {
787        self.policy_registry = Some(registry);
788        self
789    }
790
791    /// Register a lifecycle callback invoked when the server starts listening.
792    pub fn on_start<F>(mut self, callback: F) -> Self
793    where
794        F: Fn() + Send + Sync + 'static,
795    {
796        self.on_start = Some(Arc::new(callback));
797        self
798    }
799
800    /// Register a lifecycle callback invoked after graceful shutdown completes.
801    pub fn on_shutdown<F>(mut self, callback: F) -> Self
802    where
803        F: Fn() + Send + Sync + 'static,
804    {
805        self.on_shutdown = Some(Arc::new(callback));
806        self
807    }
808
809    /// Configure graceful shutdown timeout for in-flight request draining.
810    pub fn graceful_shutdown(mut self, timeout: Duration) -> Self {
811        self.graceful_shutdown_timeout = timeout;
812        self
813    }
814
815    /// Apply a `RanvierConfig` to this builder.
816    ///
817    /// Reads server settings (bind address, shutdown timeout) from the config
818    /// and initializes telemetry if an OTLP endpoint is configured.
819    /// Logging should be initialized separately via `config.init_logging()`.
820    pub fn config(mut self, config: &ranvier_core::config::RanvierConfig) -> Self {
821        self.addr = Some(config.bind_addr());
822        self.graceful_shutdown_timeout = config.shutdown_timeout();
823        config.init_telemetry();
824        self
825    }
826
827    /// Enable TLS with certificate and key PEM files (requires `tls` feature).
828    #[cfg(feature = "tls")]
829    pub fn tls(mut self, cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
830        self.tls_config = Some(TlsAcceptorConfig {
831            cert_path: cert_path.into(),
832            key_path: key_path.into(),
833        });
834        self
835    }
836
837    /// Add built-in timeout middleware that returns `408 Request Timeout`
838    /// when the inner service call exceeds `timeout`.
839    pub fn timeout_layer(mut self, timeout: Duration) -> Self {
840        self.layers.push(timeout_middleware(timeout));
841        self
842    }
843
844    /// Add built-in request-id middleware.
845    ///
846    /// Ensures `x-request-id` exists on request and response headers.
847    pub fn request_id_layer(mut self) -> Self {
848        self.layers.push(request_id_middleware());
849        self
850    }
851
852    /// Register a request-context injector executed before each circuit run.
853    ///
854    /// Use this to bridge adapter-layer context (request extensions/headers)
855    /// into explicit Bus resources consumed by Transitions.
856    pub fn bus_injector<F>(mut self, injector: F) -> Self
857    where
858        F: Fn(&http::request::Parts, &mut Bus) + Send + Sync + 'static,
859    {
860        self.bus_injectors.push(Arc::new(injector));
861        self
862    }
863
864    /// Configure HTTP/3 QUIC support.
865    #[cfg(feature = "http3")]
866    pub fn enable_http3(mut self, config: crate::http3::Http3Config) -> Self {
867        self.http3_config = Some(config);
868        self
869    }
870
871    /// Automatically injects the `Alt-Svc` header into responses to signal HTTP/3 availability.
872    #[cfg(feature = "http3")]
873    pub fn alt_svc_h3(mut self, port: u16) -> Self {
874        self.alt_svc_h3_port = Some(port);
875        self
876    }
877
878    /// Export route metadata snapshot for external tooling.
879    pub fn route_descriptors(&self) -> Vec<HttpRouteDescriptor> {
880        let mut descriptors = self
881            .routes
882            .iter()
883            .map(|entry| HttpRouteDescriptor::new(entry.method.clone(), entry.pattern.raw.clone()))
884            .collect::<Vec<_>>();
885
886        if let Some(path) = &self.health.health_path {
887            descriptors.push(HttpRouteDescriptor::new(Method::GET, path.clone()));
888        }
889        if let Some(path) = &self.health.readiness_path {
890            descriptors.push(HttpRouteDescriptor::new(Method::GET, path.clone()));
891        }
892        if let Some(path) = &self.health.liveness_path {
893            descriptors.push(HttpRouteDescriptor::new(Method::GET, path.clone()));
894        }
895
896        descriptors
897    }
898
899    /// Mount a static directory under a path prefix.
900    ///
901    /// Example: `.serve_dir("/static", "./public")`.
902    pub fn serve_dir(
903        mut self,
904        route_prefix: impl Into<String>,
905        directory: impl Into<String>,
906    ) -> Self {
907        self.static_assets.mounts.push(StaticMount {
908            route_prefix: normalize_route_path(route_prefix.into()),
909            directory: directory.into(),
910        });
911        if self.static_assets.cache_control.is_none() {
912            self.static_assets.cache_control = Some("public, max-age=3600".to_string());
913        }
914        self
915    }
916
917    /// Configure SPA fallback file for unmatched GET/HEAD routes.
918    ///
919    /// Example: `.spa_fallback("./public/index.html")`.
920    pub fn spa_fallback(mut self, file_path: impl Into<String>) -> Self {
921        self.static_assets.spa_fallback = Some(file_path.into());
922        self
923    }
924
925    /// Override default Cache-Control for static responses.
926    pub fn static_cache_control(mut self, cache_control: impl Into<String>) -> Self {
927        self.static_assets.cache_control = Some(cache_control.into());
928        self
929    }
930
931    /// Enable gzip response compression for static assets.
932    pub fn compression_layer(mut self) -> Self {
933        self.static_assets.enable_compression = true;
934        self
935    }
936
937    /// Register a WebSocket upgrade endpoint and session handler.
938    ///
939    /// The handler receives:
940    /// 1) a `WebSocketConnection` implementing `EventSource`/`EventSink`,
941    /// 2) shared resources (`Arc<R>`),
942    /// 3) a connection-scoped `Bus` with request injectors + `WebSocketSessionContext`.
943    pub fn ws<H, Fut>(mut self, path: impl Into<String>, handler: H) -> Self
944    where
945        H: Fn(WebSocketConnection, Arc<R>, Bus) -> Fut + Send + Sync + 'static,
946        Fut: Future<Output = ()> + Send + 'static,
947    {
948        let path_str: String = path.into();
949        let ws_handler: WsSessionHandler<R> = Arc::new(move |connection, resources, bus| {
950            Box::pin(handler(connection, resources, bus))
951        });
952        let bus_injectors = Arc::new(self.bus_injectors.clone());
953        let path_for_pattern = path_str.clone();
954        let path_for_handler = path_str;
955
956        let route_handler: RouteHandler<R> =
957            Arc::new(move |parts: http::request::Parts, res: &R| {
958                let ws_handler = ws_handler.clone();
959                let bus_injectors = bus_injectors.clone();
960                let resources = Arc::new(res.clone());
961                let path = path_for_handler.clone();
962
963                Box::pin(async move {
964                    let request_id = uuid::Uuid::new_v4().to_string();
965                    let span = tracing::info_span!(
966                        "WebSocketUpgrade",
967                        ranvier.ws.path = %path,
968                        ranvier.ws.request_id = %request_id
969                    );
970
971                    async move {
972                        let mut bus = Bus::new();
973                        for injector in bus_injectors.iter() {
974                            injector(&parts, &mut bus);
975                        }
976
977                        // Reconstruct a dummy Request for WebSocket extraction
978                        let mut req = Request::from_parts(parts, ());
979                        let session = websocket_session_from_request(&req);
980                        bus.insert(session.clone());
981
982                        let (response, on_upgrade) = match websocket_upgrade_response(&mut req) {
983                            Ok(result) => result,
984                            Err(error_response) => return error_response,
985                        };
986
987                        tokio::spawn(async move {
988                            match on_upgrade.await {
989                                Ok(upgraded) => {
990                                    let stream = WebSocketStream::from_raw_socket(
991                                        TokioIo::new(upgraded),
992                                        tokio_tungstenite::tungstenite::protocol::Role::Server,
993                                        None,
994                                    )
995                                    .await;
996                                    let connection = WebSocketConnection::new(stream, session);
997                                    ws_handler(connection, resources, bus).await;
998                                }
999                                Err(error) => {
1000                                    tracing::warn!(
1001                                        ranvier.ws.path = %path,
1002                                        ranvier.ws.error = %error,
1003                                        "websocket upgrade failed"
1004                                    );
1005                                }
1006                            }
1007                        });
1008
1009                        response
1010                    }
1011                    .instrument(span)
1012                    .await
1013                }) as Pin<Box<dyn Future<Output = HttpResponse> + Send>>
1014            });
1015
1016        self.routes.push(RouteEntry {
1017            method: Method::GET,
1018            pattern: RoutePattern::parse(&path_for_pattern),
1019            handler: route_handler,
1020            layers: Arc::new(Vec::new()),
1021            apply_global_layers: true,
1022        });
1023
1024        self
1025    }
1026
1027    /// Enable built-in health endpoint at the given path.
1028    ///
1029    /// The endpoint returns JSON with status and check results.
1030    /// If no checks are registered, status is always `ok`.
1031    pub fn health_endpoint(mut self, path: impl Into<String>) -> Self {
1032        self.health.health_path = Some(normalize_route_path(path.into()));
1033        self
1034    }
1035
1036    /// Register an async health check used by `/health` and `/ready` probes.
1037    ///
1038    /// `Err` values are converted to strings and surfaced in the JSON response.
1039    pub fn health_check<F, Fut, Err>(mut self, name: impl Into<String>, check: F) -> Self
1040    where
1041        F: Fn(Arc<R>) -> Fut + Send + Sync + 'static,
1042        Fut: Future<Output = Result<(), Err>> + Send + 'static,
1043        Err: ToString + Send + 'static,
1044    {
1045        if self.health.health_path.is_none() {
1046            self.health.health_path = Some("/health".to_string());
1047        }
1048
1049        let check_fn: HealthCheckFn<R> = Arc::new(move |resources: Arc<R>| {
1050            let fut = check(resources);
1051            Box::pin(async move { fut.await.map_err(|error| error.to_string()) })
1052        });
1053
1054        self.health.checks.push(NamedHealthCheck {
1055            name: name.into(),
1056            check: check_fn,
1057        });
1058        self
1059    }
1060
1061    /// Enable readiness/liveness probe separation with explicit paths.
1062    pub fn readiness_liveness(
1063        mut self,
1064        readiness_path: impl Into<String>,
1065        liveness_path: impl Into<String>,
1066    ) -> Self {
1067        self.health.readiness_path = Some(normalize_route_path(readiness_path.into()));
1068        self.health.liveness_path = Some(normalize_route_path(liveness_path.into()));
1069        self
1070    }
1071
1072    /// Enable readiness/liveness probes at `/ready` and `/live`.
1073    pub fn readiness_liveness_default(self) -> Self {
1074        self.readiness_liveness("/ready", "/live")
1075    }
1076
1077    /// Register a route with GET method.
1078    pub fn route<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1079    where
1080        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1081        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1082    {
1083        self.route_method(Method::GET, path, circuit)
1084    }
1085    /// Register a route with a specific HTTP method.
1086    ///
1087    /// # Example
1088    ///
1089    /// ```rust,ignore
1090    /// Ranvier::http()
1091    ///     .route_method(Method::POST, "/users", create_user_circuit)
1092    /// ```
1093    pub fn route_method<Out, E>(
1094        self,
1095        method: Method,
1096        path: impl Into<String>,
1097        circuit: Axon<(), Out, E, R>,
1098    ) -> Self
1099    where
1100        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1101        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1102    {
1103        self.route_method_with_error(method, path, circuit, |error| {
1104            (
1105                StatusCode::INTERNAL_SERVER_ERROR,
1106                format!("Error: {:?}", error),
1107            )
1108                .into_response()
1109        })
1110    }
1111
1112    pub fn route_method_with_error<Out, E, H>(
1113        self,
1114        method: Method,
1115        path: impl Into<String>,
1116        circuit: Axon<(), Out, E, R>,
1117        error_handler: H,
1118    ) -> Self
1119    where
1120        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1121        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1122        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1123    {
1124        self.route_method_with_error_and_layers(
1125            method,
1126            path,
1127            circuit,
1128            error_handler,
1129            Arc::new(Vec::new()),
1130            true,
1131        )
1132    }
1133
1134
1135
1136    fn route_method_with_error_and_layers<Out, E, H>(
1137        mut self,
1138        method: Method,
1139        path: impl Into<String>,
1140        circuit: Axon<(), Out, E, R>,
1141        error_handler: H,
1142        route_layers: Arc<Vec<ServiceLayer>>,
1143        apply_global_layers: bool,
1144    ) -> Self
1145    where
1146        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1147        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1148        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1149    {
1150        let path_str: String = path.into();
1151        let circuit = Arc::new(circuit);
1152        let error_handler = Arc::new(error_handler);
1153        let route_bus_injectors = Arc::new(self.bus_injectors.clone());
1154        let path_for_pattern = path_str.clone();
1155        let path_for_handler = path_str;
1156        let method_for_pattern = method.clone();
1157        let method_for_handler = method;
1158
1159        let handler: RouteHandler<R> = Arc::new(move |parts: http::request::Parts, res: &R| {
1160            let circuit = circuit.clone();
1161            let error_handler = error_handler.clone();
1162            let route_bus_injectors = route_bus_injectors.clone();
1163            let res = res.clone();
1164            let path = path_for_handler.clone();
1165            let method = method_for_handler.clone();
1166
1167            Box::pin(async move {
1168                let request_id = uuid::Uuid::new_v4().to_string();
1169                let span = tracing::info_span!(
1170                    "HTTPRequest",
1171                    ranvier.http.method = %method,
1172                    ranvier.http.path = %path,
1173                    ranvier.http.request_id = %request_id
1174                );
1175
1176                async move {
1177                    let mut bus = Bus::new();
1178                    for injector in route_bus_injectors.iter() {
1179                        injector(&parts, &mut bus);
1180                    }
1181                    let result = circuit.execute((), &res, &mut bus).await;
1182                    outcome_to_response_with_error(result, |error| error_handler(error))
1183                }
1184                .instrument(span)
1185                .await
1186            }) as Pin<Box<dyn Future<Output = HttpResponse> + Send>>
1187        });
1188
1189        self.routes.push(RouteEntry {
1190            method: method_for_pattern,
1191            pattern: RoutePattern::parse(&path_for_pattern),
1192            handler,
1193            layers: route_layers,
1194            apply_global_layers,
1195        });
1196        self
1197    }
1198
1199    pub fn get<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1200    where
1201        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1202        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1203    {
1204        self.route_method(Method::GET, path, circuit)
1205    }
1206
1207    pub fn get_with_error<Out, E, H>(
1208        self,
1209        path: impl Into<String>,
1210        circuit: Axon<(), Out, E, R>,
1211        error_handler: H,
1212    ) -> Self
1213    where
1214        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1215        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1216        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1217    {
1218        self.route_method_with_error(Method::GET, path, circuit, error_handler)
1219    }
1220
1221    pub fn post<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1222    where
1223        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1224        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1225    {
1226        self.route_method(Method::POST, path, circuit)
1227    }
1228
1229    pub fn put<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1230    where
1231        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1232        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1233    {
1234        self.route_method(Method::PUT, path, circuit)
1235    }
1236
1237    pub fn delete<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1238    where
1239        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1240        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1241    {
1242        self.route_method(Method::DELETE, path, circuit)
1243    }
1244
1245    pub fn patch<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1246    where
1247        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1248        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1249    {
1250        self.route_method(Method::PATCH, path, circuit)
1251    }
1252
1253    pub fn post_with_error<Out, E, H>(
1254        self,
1255        path: impl Into<String>,
1256        circuit: Axon<(), Out, E, R>,
1257        error_handler: H,
1258    ) -> Self
1259    where
1260        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1261        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1262        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1263    {
1264        self.route_method_with_error(Method::POST, path, circuit, error_handler)
1265    }
1266
1267    pub fn put_with_error<Out, E, H>(
1268        self,
1269        path: impl Into<String>,
1270        circuit: Axon<(), Out, E, R>,
1271        error_handler: H,
1272    ) -> Self
1273    where
1274        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1275        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1276        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1277    {
1278        self.route_method_with_error(Method::PUT, path, circuit, error_handler)
1279    }
1280
1281    pub fn delete_with_error<Out, E, H>(
1282        self,
1283        path: impl Into<String>,
1284        circuit: Axon<(), Out, E, R>,
1285        error_handler: H,
1286    ) -> Self
1287    where
1288        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1289        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1290        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1291    {
1292        self.route_method_with_error(Method::DELETE, path, circuit, error_handler)
1293    }
1294
1295    pub fn patch_with_error<Out, E, H>(
1296        self,
1297        path: impl Into<String>,
1298        circuit: Axon<(), Out, E, R>,
1299        error_handler: H,
1300    ) -> Self
1301    where
1302        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1303        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1304        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1305    {
1306        self.route_method_with_error(Method::PATCH, path, circuit, error_handler)
1307    }
1308
1309    /// Set a fallback circuit for unmatched routes.
1310    ///
1311    /// # Example
1312    ///
1313    /// ```rust,ignore
1314    /// let not_found = Axon::new("NotFound").then(|_| async { "404 Not Found" });
1315    /// Ranvier::http()
1316    ///     .route("/", home)
1317    ///     .fallback(not_found)
1318    /// ```
1319    pub fn fallback<Out, E>(mut self, circuit: Axon<(), Out, E, R>) -> Self
1320    where
1321        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1322        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1323    {
1324        let circuit = Arc::new(circuit);
1325        let fallback_bus_injectors = Arc::new(self.bus_injectors.clone());
1326
1327        let handler: RouteHandler<R> = Arc::new(move |parts: http::request::Parts, res: &R| {
1328            let circuit = circuit.clone();
1329            let fallback_bus_injectors = fallback_bus_injectors.clone();
1330            let res = res.clone();
1331            Box::pin(async move {
1332                let request_id = uuid::Uuid::new_v4().to_string();
1333                let span = tracing::info_span!(
1334                    "HTTPRequest",
1335                    ranvier.http.method = "FALLBACK",
1336                    ranvier.http.request_id = %request_id
1337                );
1338
1339                async move {
1340                    let mut bus = Bus::new();
1341                    for injector in fallback_bus_injectors.iter() {
1342                        injector(&parts, &mut bus);
1343                    }
1344                    let result: ranvier_core::Outcome<Out, E> =
1345                        circuit.execute((), &res, &mut bus).await;
1346
1347                    match result {
1348                        Outcome::Next(output) => {
1349                            let mut response = output.into_response();
1350                            *response.status_mut() = StatusCode::NOT_FOUND;
1351                            response
1352                        }
1353                        _ => Response::builder()
1354                            .status(StatusCode::NOT_FOUND)
1355                            .body(
1356                                Full::new(Bytes::from("Not Found"))
1357                                    .map_err(|never| match never {})
1358                                    .boxed(),
1359                            )
1360                            .expect("valid HTTP response construction"),
1361                    }
1362                }
1363                .instrument(span)
1364                .await
1365            }) as Pin<Box<dyn Future<Output = HttpResponse> + Send>>
1366        });
1367
1368        self.fallback = Some(handler);
1369        self
1370    }
1371
1372    /// Run the HTTP server with required resources.
1373    pub async fn run(self, resources: R) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1374        self.run_with_shutdown_signal(resources, shutdown_signal())
1375            .await
1376    }
1377
1378    async fn run_with_shutdown_signal<S>(
1379        self,
1380        resources: R,
1381        shutdown_signal: S,
1382    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
1383    where
1384        S: Future<Output = ()> + Send,
1385    {
1386        let addr_str = self.addr.as_deref().unwrap_or("127.0.0.1:3000");
1387        let addr: SocketAddr = addr_str.parse()?;
1388
1389        let mut raw_routes = self.routes;
1390        if self.active_intervention {
1391            let handler: RouteHandler<R> = Arc::new(|_parts, _res| {
1392                Box::pin(async move {
1393                    Response::builder()
1394                        .status(StatusCode::OK)
1395                        .body(
1396                            Full::new(Bytes::from("Intervention accepted"))
1397                                .map_err(|never| match never {} as Infallible)
1398                                .boxed(),
1399                        )
1400                        .expect("valid HTTP response construction")
1401                }) as Pin<Box<dyn Future<Output = HttpResponse> + Send>>
1402            });
1403
1404            raw_routes.push(RouteEntry {
1405                method: Method::POST,
1406                pattern: RoutePattern::parse("/_system/intervene/force_resume"),
1407                handler,
1408                layers: Arc::new(Vec::new()),
1409                apply_global_layers: true,
1410            });
1411        }
1412
1413        if let Some(registry) = self.policy_registry.clone() {
1414            let handler: RouteHandler<R> = Arc::new(move |_parts, _res| {
1415                let _registry = registry.clone();
1416                Box::pin(async move {
1417                    // This is a simplified reload endpoint.
1418                    // In a real implementation, it would parse JSON from the body.
1419                    // For now, we provide the infrastructure.
1420                    Response::builder()
1421                        .status(StatusCode::OK)
1422                        .body(
1423                            Full::new(Bytes::from("Policy registry active"))
1424                                .map_err(|never| match never {} as Infallible)
1425                                .boxed(),
1426                        )
1427                        .expect("valid HTTP response construction")
1428                }) as Pin<Box<dyn Future<Output = HttpResponse> + Send>>
1429            });
1430
1431            raw_routes.push(RouteEntry {
1432                method: Method::POST,
1433                pattern: RoutePattern::parse("/_system/policy/reload"),
1434                handler,
1435                layers: Arc::new(Vec::new()),
1436                apply_global_layers: true,
1437            });
1438        }
1439        let routes = Arc::new(raw_routes);
1440        let fallback = self.fallback;
1441        let layers = Arc::new(self.layers);
1442        let health = Arc::new(self.health);
1443        let static_assets = Arc::new(self.static_assets);
1444        let on_start = self.on_start;
1445        let on_shutdown = self.on_shutdown;
1446        let graceful_shutdown_timeout = self.graceful_shutdown_timeout;
1447        let resources = Arc::new(resources);
1448
1449        let listener = TcpListener::bind(addr).await?;
1450
1451        // Build optional TLS acceptor
1452        #[cfg(feature = "tls")]
1453        let tls_acceptor = if let Some(ref tls_cfg) = self.tls_config {
1454            let acceptor = build_tls_acceptor(&tls_cfg.cert_path, &tls_cfg.key_path)?;
1455            tracing::info!("Ranvier HTTP Ingress listening on https://{}", addr);
1456            Some(acceptor)
1457        } else {
1458            tracing::info!("Ranvier HTTP Ingress listening on http://{}", addr);
1459            None
1460        };
1461        #[cfg(not(feature = "tls"))]
1462        tracing::info!("Ranvier HTTP Ingress listening on http://{}", addr);
1463
1464        if let Some(callback) = on_start.as_ref() {
1465            callback();
1466        }
1467
1468        tokio::pin!(shutdown_signal);
1469        let mut connections = tokio::task::JoinSet::new();
1470
1471        loop {
1472            tokio::select! {
1473                _ = &mut shutdown_signal => {
1474                    tracing::info!("Shutdown signal received. Draining in-flight connections.");
1475                    break;
1476                }
1477                accept_result = listener.accept() => {
1478                    let (stream, _) = accept_result?;
1479
1480                    let routes = routes.clone();
1481                    let fallback = fallback.clone();
1482                    let resources = resources.clone();
1483                    let layers = layers.clone();
1484                    let health = health.clone();
1485                    let static_assets = static_assets.clone();
1486                    #[cfg(feature = "http3")]
1487                    let alt_svc_h3_port = self.alt_svc_h3_port;
1488
1489                    #[cfg(feature = "tls")]
1490                    let tls_acceptor = tls_acceptor.clone();
1491
1492                    connections.spawn(async move {
1493                        let service = build_http_service(
1494                            routes,
1495                            fallback,
1496                            resources,
1497                            layers,
1498                            health,
1499                            static_assets,
1500                            #[cfg(feature = "http3")] alt_svc_h3_port,
1501                        );
1502
1503                        #[cfg(feature = "tls")]
1504                        if let Some(acceptor) = tls_acceptor {
1505                            match acceptor.accept(stream).await {
1506                                Ok(tls_stream) => {
1507                                    let io = TokioIo::new(tls_stream);
1508                                    if let Err(err) = http1::Builder::new()
1509                                        .serve_connection(io, service)
1510                                        .with_upgrades()
1511                                        .await
1512                                    {
1513                                        tracing::error!("Error serving TLS connection: {:?}", err);
1514                                    }
1515                                }
1516                                Err(err) => {
1517                                    tracing::warn!("TLS handshake failed: {:?}", err);
1518                                }
1519                            }
1520                            return;
1521                        }
1522
1523                        let io = TokioIo::new(stream);
1524                        if let Err(err) = http1::Builder::new()
1525                            .serve_connection(io, service)
1526                            .with_upgrades()
1527                            .await
1528                        {
1529                            tracing::error!("Error serving connection: {:?}", err);
1530                        }
1531                    });
1532                }
1533                Some(join_result) = connections.join_next(), if !connections.is_empty() => {
1534                    if let Err(err) = join_result {
1535                        tracing::warn!("Connection task join error: {:?}", err);
1536                    }
1537                }
1538            }
1539        }
1540
1541        let _timed_out = drain_connections(&mut connections, graceful_shutdown_timeout).await;
1542
1543        drop(resources);
1544        if let Some(callback) = on_shutdown.as_ref() {
1545            callback();
1546        }
1547
1548        Ok(())
1549    }
1550
1551    /// Convert to a raw Hyper Service for integration with existing infrastructure.
1552    ///
1553    /// This is the "escape hatch" per Discussion 193:
1554    /// > "Raw API는 Flat API의 탈출구다."
1555    ///
1556    /// # Example
1557    ///
1558    /// ```rust,ignore
1559    /// let ingress = Ranvier::http()
1560    ///     .bind(":3000")
1561    ///     .route("/", circuit);
1562    ///
1563    /// let raw_service = ingress.into_raw_service();
1564    /// // Use raw_service with existing Hyper infrastructure
1565    /// ```
1566    pub fn into_raw_service(self, resources: R) -> RawIngressService<R> {
1567        let routes = Arc::new(self.routes);
1568        let fallback = self.fallback;
1569        let layers = Arc::new(self.layers);
1570        let health = Arc::new(self.health);
1571        let static_assets = Arc::new(self.static_assets);
1572        let resources = Arc::new(resources);
1573
1574        RawIngressService {
1575            routes,
1576            fallback,
1577            layers,
1578            health,
1579            static_assets,
1580            resources,
1581        }
1582    }
1583}
1584
1585fn build_http_service<R>(
1586    routes: Arc<Vec<RouteEntry<R>>>,
1587    fallback: Option<RouteHandler<R>>,
1588    resources: Arc<R>,
1589    layers: Arc<Vec<ServiceLayer>>,
1590    health: Arc<HealthConfig<R>>,
1591    static_assets: Arc<StaticAssetsConfig>,
1592    #[cfg(feature = "http3")] alt_svc_port: Option<u16>,
1593) -> BoxHttpService
1594where
1595    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
1596{
1597    BoxService::new(move |req: Request<Incoming>| {
1598        let routes = routes.clone();
1599        let fallback = fallback.clone();
1600        let resources = resources.clone();
1601        let layers = layers.clone();
1602        let health = health.clone();
1603        let static_assets = static_assets.clone();
1604
1605        async move {
1606            let mut req = req;
1607            let method = req.method().clone();
1608            let path = req.uri().path().to_string();
1609
1610            if let Some(response) =
1611                maybe_handle_health_request(&method, &path, &health, resources.clone()).await
1612            {
1613                return Ok::<_, Infallible>(response.into_response());
1614            }
1615
1616            if let Some((entry, params)) = find_matching_route(routes.as_slice(), &method, &path) {
1617                req.extensions_mut().insert(params);
1618                let effective_layers = if entry.apply_global_layers {
1619                    merge_layers(&layers, &entry.layers)
1620                } else {
1621                    entry.layers.clone()
1622                };
1623
1624                if effective_layers.is_empty() {
1625                    let (parts, _) = req.into_parts();
1626                    #[allow(unused_mut)]
1627                    let mut res = (entry.handler)(parts, &resources).await;
1628                    #[cfg(feature = "http3")]
1629                    if let Some(port) = alt_svc_port {
1630                        if let Ok(val) =
1631                            http::HeaderValue::from_str(&format!("h3=\":{}\"; ma=86400", port))
1632                        {
1633                            res.headers_mut().insert(http::header::ALT_SVC, val);
1634                        }
1635                    }
1636                    Ok::<_, Infallible>(res)
1637                } else {
1638                    let route_service = build_route_service(
1639                        entry.handler.clone(),
1640                        resources.clone(),
1641                        effective_layers,
1642                    );
1643                    #[allow(unused_mut)]
1644                    let mut res = route_service.call(req).await;
1645                    #[cfg(feature = "http3")]
1646                    #[allow(irrefutable_let_patterns)]
1647                    if let Ok(ref mut r) = res {
1648                        if let Some(port) = alt_svc_port {
1649                            if let Ok(val) =
1650                                http::HeaderValue::from_str(&format!("h3=\":{}\"; ma=86400", port))
1651                            {
1652                                r.headers_mut().insert(http::header::ALT_SVC, val);
1653                            }
1654                        }
1655                    }
1656                    res
1657                }
1658            } else {
1659                let req =
1660                    match maybe_handle_static_request(req, &method, &path, static_assets.as_ref())
1661                        .await
1662                    {
1663                        Ok(req) => req,
1664                        Err(response) => return Ok(response),
1665                    };
1666
1667                #[allow(unused_mut)]
1668                let mut fallback_res = if let Some(ref fb) = fallback {
1669                    if layers.is_empty() {
1670                        let (parts, _) = req.into_parts();
1671                        Ok(fb(parts, &resources).await)
1672                    } else {
1673                        let fallback_service =
1674                            build_route_service(fb.clone(), resources.clone(), layers.clone());
1675                        fallback_service.call(req).await
1676                    }
1677                } else {
1678                    Ok(Response::builder()
1679                        .status(StatusCode::NOT_FOUND)
1680                        .body(
1681                            Full::new(Bytes::from("Not Found"))
1682                                .map_err(|never| match never {})
1683                                .boxed(),
1684                        )
1685                        .expect("valid HTTP response construction"))
1686                };
1687
1688                #[cfg(feature = "http3")]
1689                if let Ok(r) = fallback_res.as_mut() {
1690                    if let Some(port) = alt_svc_port {
1691                        if let Ok(val) =
1692                            http::HeaderValue::from_str(&format!("h3=\":{}\"; ma=86400", port))
1693                        {
1694                            r.headers_mut().insert(http::header::ALT_SVC, val);
1695                        }
1696                    }
1697                }
1698
1699                fallback_res
1700            }
1701        }
1702    })
1703}
1704
1705fn build_route_service<R>(
1706    handler: RouteHandler<R>,
1707    resources: Arc<R>,
1708    layers: Arc<Vec<ServiceLayer>>,
1709) -> BoxHttpService
1710where
1711    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
1712{
1713    let mut service = BoxService::new(move |req: Request<Incoming>| {
1714        let handler = handler.clone();
1715        let resources = resources.clone();
1716        async move {
1717            let (parts, _) = req.into_parts();
1718            Ok::<_, Infallible>(handler(parts, &resources).await)
1719        }
1720    });
1721
1722    for layer in layers.iter() {
1723        service = layer(service);
1724    }
1725    service
1726}
1727
1728fn merge_layers(
1729    global_layers: &Arc<Vec<ServiceLayer>>,
1730    route_layers: &Arc<Vec<ServiceLayer>>,
1731) -> Arc<Vec<ServiceLayer>> {
1732    if global_layers.is_empty() {
1733        return route_layers.clone();
1734    }
1735    if route_layers.is_empty() {
1736        return global_layers.clone();
1737    }
1738
1739    let mut combined = Vec::with_capacity(global_layers.len() + route_layers.len());
1740    combined.extend(global_layers.iter().cloned());
1741    combined.extend(route_layers.iter().cloned());
1742    Arc::new(combined)
1743}
1744
1745async fn maybe_handle_health_request<R>(
1746    method: &Method,
1747    path: &str,
1748    health: &HealthConfig<R>,
1749    resources: Arc<R>,
1750) -> Option<HttpResponse>
1751where
1752    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
1753{
1754    if method != Method::GET {
1755        return None;
1756    }
1757
1758    if let Some(liveness_path) = health.liveness_path.as_ref() {
1759        if path == liveness_path {
1760            return Some(health_json_response("liveness", true, Vec::new()));
1761        }
1762    }
1763
1764    if let Some(readiness_path) = health.readiness_path.as_ref() {
1765        if path == readiness_path {
1766            let (healthy, checks) = run_named_health_checks(&health.checks, resources).await;
1767            return Some(health_json_response("readiness", healthy, checks));
1768        }
1769    }
1770
1771    if let Some(health_path) = health.health_path.as_ref() {
1772        if path == health_path {
1773            let (healthy, checks) = run_named_health_checks(&health.checks, resources).await;
1774            return Some(health_json_response("health", healthy, checks));
1775        }
1776    }
1777
1778    None
1779}
1780
1781/// Serve a single file from the filesystem with MIME type detection and ETag.
1782async fn serve_single_file(file_path: &str) -> Result<Response<Full<Bytes>>, std::io::Error> {
1783    let path = std::path::Path::new(file_path);
1784    let content = tokio::fs::read(path).await?;
1785    let mime = guess_mime(file_path);
1786    let mut response = Response::new(Full::new(Bytes::from(content)));
1787    if let Ok(value) = http::HeaderValue::from_str(mime) {
1788        response
1789            .headers_mut()
1790            .insert(http::header::CONTENT_TYPE, value);
1791    }
1792    if let Ok(metadata) = tokio::fs::metadata(path).await {
1793        if let Ok(modified) = metadata.modified() {
1794            if let Ok(duration) = modified.duration_since(std::time::UNIX_EPOCH) {
1795                let etag = format!("\"{}\"", duration.as_secs());
1796                if let Ok(value) = http::HeaderValue::from_str(&etag) {
1797                    response.headers_mut().insert(http::header::ETAG, value);
1798                }
1799            }
1800        }
1801    }
1802    Ok(response)
1803}
1804
1805/// Serve a file from a static directory with path traversal protection.
1806async fn serve_static_file(
1807    directory: &str,
1808    file_subpath: &str,
1809) -> Result<Response<Full<Bytes>>, std::io::Error> {
1810    let subpath = file_subpath.trim_start_matches('/');
1811    if subpath.is_empty() || subpath == "/" {
1812        return Err(std::io::Error::new(
1813            std::io::ErrorKind::NotFound,
1814            "empty path",
1815        ));
1816    }
1817    let full_path = std::path::Path::new(directory).join(subpath);
1818    // Path traversal protection
1819    let canonical = tokio::fs::canonicalize(&full_path).await?;
1820    let dir_canonical = tokio::fs::canonicalize(directory).await?;
1821    if !canonical.starts_with(&dir_canonical) {
1822        return Err(std::io::Error::new(
1823            std::io::ErrorKind::PermissionDenied,
1824            "path traversal detected",
1825        ));
1826    }
1827    let content = tokio::fs::read(&canonical).await?;
1828    let mime = guess_mime(canonical.to_str().unwrap_or(""));
1829    let mut response = Response::new(Full::new(Bytes::from(content)));
1830    if let Ok(value) = http::HeaderValue::from_str(mime) {
1831        response
1832            .headers_mut()
1833            .insert(http::header::CONTENT_TYPE, value);
1834    }
1835    if let Ok(metadata) = tokio::fs::metadata(&canonical).await {
1836        if let Ok(modified) = metadata.modified() {
1837            if let Ok(duration) = modified.duration_since(std::time::UNIX_EPOCH) {
1838                let etag = format!("\"{}\"", duration.as_secs());
1839                if let Ok(value) = http::HeaderValue::from_str(&etag) {
1840                    response.headers_mut().insert(http::header::ETAG, value);
1841                }
1842            }
1843        }
1844    }
1845    Ok(response)
1846}
1847
/// Guesses a MIME type from the file extension of `path`.
///
/// The extension is taken from the final path component only and compared
/// case-insensitively, so `PHOTO.JPG` is recognised while an extension-less file that
/// happens to be called `html` is not. Unknown or missing extensions map to
/// `application/octet-stream`.
fn guess_mime(path: &str) -> &'static str {
    let extension = std::path::Path::new(path)
        .extension()
        .and_then(|ext| ext.to_str())
        .map(str::to_ascii_lowercase)
        .unwrap_or_default();

    match extension.as_str() {
        "html" | "htm" => "text/html; charset=utf-8",
        "css" => "text/css; charset=utf-8",
        "js" | "mjs" => "application/javascript; charset=utf-8",
        "json" => "application/json; charset=utf-8",
        "png" => "image/png",
        "jpg" | "jpeg" => "image/jpeg",
        "gif" => "image/gif",
        "svg" => "image/svg+xml",
        "ico" => "image/x-icon",
        "woff" => "font/woff",
        "woff2" => "font/woff2",
        "ttf" => "font/ttf",
        "txt" => "text/plain; charset=utf-8",
        "xml" => "application/xml; charset=utf-8",
        "wasm" => "application/wasm",
        "pdf" => "application/pdf",
        _ => "application/octet-stream",
    }
}
1869
1870fn apply_cache_control(
1871    mut response: Response<Full<Bytes>>,
1872    cache_control: Option<&str>,
1873) -> Response<Full<Bytes>> {
1874    if response.status() == StatusCode::OK {
1875        if let Some(value) = cache_control {
1876            if !response.headers().contains_key(http::header::CACHE_CONTROL) {
1877                if let Ok(header_value) = http::HeaderValue::from_str(value) {
1878                    response
1879                        .headers_mut()
1880                        .insert(http::header::CACHE_CONTROL, header_value);
1881                }
1882            }
1883        }
1884    }
1885    response
1886}
1887
1888async fn maybe_handle_static_request(
1889    req: Request<Incoming>,
1890    method: &Method,
1891    path: &str,
1892    static_assets: &StaticAssetsConfig,
1893) -> Result<Request<Incoming>, HttpResponse> {
1894    if method != Method::GET && method != Method::HEAD {
1895        return Ok(req);
1896    }
1897
1898    if let Some(mount) = static_assets
1899        .mounts
1900        .iter()
1901        .find(|mount| strip_mount_prefix(path, &mount.route_prefix).is_some())
1902    {
1903        let accept_encoding = req.headers().get(http::header::ACCEPT_ENCODING).cloned();
1904        let Some(stripped_path) = strip_mount_prefix(path, &mount.route_prefix) else {
1905            return Ok(req);
1906        };
1907        let response = match serve_static_file(&mount.directory, &stripped_path).await {
1908            Ok(response) => response,
1909            Err(_) => {
1910                return Err(Response::builder()
1911                    .status(StatusCode::INTERNAL_SERVER_ERROR)
1912                    .body(
1913                        Full::new(Bytes::from("Failed to serve static asset"))
1914                            .map_err(|never| match never {})
1915                            .boxed(),
1916                    )
1917                    .unwrap_or_else(|_| {
1918                        Response::new(
1919                            Full::new(Bytes::new())
1920                                .map_err(|never| match never {})
1921                                .boxed(),
1922                        )
1923                    }));
1924            }
1925        };
1926        let mut response = apply_cache_control(response, static_assets.cache_control.as_deref());
1927        response = maybe_compress_static_response(
1928            response,
1929            accept_encoding,
1930            static_assets.enable_compression,
1931        );
1932        let (parts, body) = response.into_parts();
1933        return Err(Response::from_parts(
1934            parts,
1935            body.map_err(|never| match never {}).boxed(),
1936        ));
1937    }
1938
1939    if let Some(spa_file) = static_assets.spa_fallback.as_ref() {
1940        if looks_like_spa_request(path) {
1941            let accept_encoding = req.headers().get(http::header::ACCEPT_ENCODING).cloned();
1942            let response = match serve_single_file(spa_file).await {
1943                Ok(response) => response,
1944                Err(_) => {
1945                    return Err(Response::builder()
1946                        .status(StatusCode::INTERNAL_SERVER_ERROR)
1947                        .body(
1948                            Full::new(Bytes::from("Failed to serve SPA fallback"))
1949                                .map_err(|never| match never {})
1950                                .boxed(),
1951                        )
1952                        .unwrap_or_else(|_| {
1953                            Response::new(
1954                                Full::new(Bytes::new())
1955                                    .map_err(|never| match never {})
1956                                    .boxed(),
1957                            )
1958                        }));
1959                }
1960            };
1961            let mut response =
1962                apply_cache_control(response, static_assets.cache_control.as_deref());
1963            response = maybe_compress_static_response(
1964                response,
1965                accept_encoding,
1966                static_assets.enable_compression,
1967            );
1968            let (parts, body) = response.into_parts();
1969            return Err(Response::from_parts(
1970                parts,
1971                body.map_err(|never| match never {}).boxed(),
1972            ));
1973        }
1974    }
1975
1976    Ok(req)
1977}
1978
/// Returns the part of `path` below the mount `prefix`, always starting with `/`.
///
/// - The root mount `"/"` matches every path and returns it unchanged.
/// - Trailing slashes on any other prefix are ignored.
/// - A path equal to the prefix maps to `"/"`.
/// - The prefix must end on a segment boundary: `/static` matches `/static/app.js`
///   but not `/staticfoo/x`.
///
/// Returns `None` when `path` is not under `prefix`.
fn strip_mount_prefix(path: &str, prefix: &str) -> Option<String> {
    if prefix == "/" {
        return Some(path.to_string());
    }

    let mount = prefix.trim_end_matches('/');
    let remainder = path.strip_prefix(mount)?;

    if remainder.is_empty() {
        Some("/".to_string())
    } else if remainder.starts_with('/') {
        Some(remainder.to_string())
    } else {
        // The prefix matched only part of a segment (e.g. "/static" vs "/staticfoo").
        None
    }
}
1998
/// Returns `true` when the last path segment contains no `.`, i.e. the path does not
/// look like a request for a file with an extension.
fn looks_like_spa_request(path: &str) -> bool {
    let last_segment = path.rsplit_once('/').map_or(path, |(_, tail)| tail);
    !last_segment.contains('.')
}
2003
2004fn maybe_compress_static_response(
2005    response: Response<Full<Bytes>>,
2006    accept_encoding: Option<http::HeaderValue>,
2007    enable_compression: bool,
2008) -> Response<Full<Bytes>> {
2009    if !enable_compression {
2010        return response;
2011    }
2012
2013    let Some(accept_encoding) = accept_encoding else {
2014        return response;
2015    };
2016
2017    let accept_str = accept_encoding.to_str().unwrap_or("");
2018    if !accept_str.contains("gzip") {
2019        return response;
2020    }
2021
2022    let status = response.status();
2023    let headers = response.headers().clone();
2024    let body = response.into_body();
2025
2026    // Full<Bytes> resolves immediately — collect synchronously via now_or_never()
2027    let data = futures_util::FutureExt::now_or_never(BodyExt::collect(body))
2028        .and_then(|r| r.ok())
2029        .map(|collected| collected.to_bytes())
2030        .unwrap_or_default();
2031
2032    // Compress with gzip
2033    let compressed = {
2034        use flate2::write::GzEncoder;
2035        use flate2::Compression;
2036        use std::io::Write;
2037        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2038        let _ = encoder.write_all(&data);
2039        encoder.finish().unwrap_or_default()
2040    };
2041
2042    let mut builder = Response::builder().status(status);
2043    for (name, value) in headers.iter() {
2044        if name != http::header::CONTENT_LENGTH && name != http::header::CONTENT_ENCODING {
2045            builder = builder.header(name, value);
2046        }
2047    }
2048    builder
2049        .header(http::header::CONTENT_ENCODING, "gzip")
2050        .body(Full::new(Bytes::from(compressed)))
2051        .unwrap_or_else(|_| Response::new(Full::new(Bytes::new())))
2052}
2053
2054async fn run_named_health_checks<R>(
2055    checks: &[NamedHealthCheck<R>],
2056    resources: Arc<R>,
2057) -> (bool, Vec<HealthCheckReport>)
2058where
2059    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
2060{
2061    let mut reports = Vec::with_capacity(checks.len());
2062    let mut healthy = true;
2063
2064    for check in checks {
2065        match (check.check)(resources.clone()).await {
2066            Ok(()) => reports.push(HealthCheckReport {
2067                name: check.name.clone(),
2068                status: "ok",
2069                error: None,
2070            }),
2071            Err(error) => {
2072                healthy = false;
2073                reports.push(HealthCheckReport {
2074                    name: check.name.clone(),
2075                    status: "error",
2076                    error: Some(error),
2077                });
2078            }
2079        }
2080    }
2081
2082    (healthy, reports)
2083}
2084
2085fn health_json_response(
2086    probe: &'static str,
2087    healthy: bool,
2088    checks: Vec<HealthCheckReport>,
2089) -> HttpResponse {
2090    let status_code = if healthy {
2091        StatusCode::OK
2092    } else {
2093        StatusCode::SERVICE_UNAVAILABLE
2094    };
2095    let status = if healthy { "ok" } else { "degraded" };
2096    let payload = HealthReport {
2097        status,
2098        probe,
2099        checks,
2100    };
2101
2102    let body = serde_json::to_vec(&payload)
2103        .unwrap_or_else(|_| br#"{"status":"error","probe":"health"}"#.to_vec());
2104
2105    Response::builder()
2106        .status(status_code)
2107        .header(http::header::CONTENT_TYPE, "application/json")
2108        .body(
2109            Full::new(Bytes::from(body))
2110                .map_err(|never| match never {})
2111                .boxed(),
2112        )
2113        .expect("valid HTTP response construction")
2114}
2115
2116async fn shutdown_signal() {
2117    #[cfg(unix)]
2118    {
2119        use tokio::signal::unix::{SignalKind, signal};
2120
2121        match signal(SignalKind::terminate()) {
2122            Ok(mut terminate) => {
2123                tokio::select! {
2124                    _ = tokio::signal::ctrl_c() => {}
2125                    _ = terminate.recv() => {}
2126                }
2127            }
2128            Err(err) => {
2129                tracing::warn!("Failed to install SIGTERM handler: {:?}", err);
2130                if let Err(ctrl_c_err) = tokio::signal::ctrl_c().await {
2131                    tracing::warn!("Failed to listen for Ctrl+C: {:?}", ctrl_c_err);
2132                }
2133            }
2134        }
2135    }
2136
2137    #[cfg(not(unix))]
2138    {
2139        if let Err(err) = tokio::signal::ctrl_c().await {
2140            tracing::warn!("Failed to listen for Ctrl+C: {:?}", err);
2141        }
2142    }
2143}
2144
2145async fn drain_connections(
2146    connections: &mut tokio::task::JoinSet<()>,
2147    graceful_shutdown_timeout: Duration,
2148) -> bool {
2149    if connections.is_empty() {
2150        return false;
2151    }
2152
2153    let drain_result = tokio::time::timeout(graceful_shutdown_timeout, async {
2154        while let Some(join_result) = connections.join_next().await {
2155            if let Err(err) = join_result {
2156                tracing::warn!("Connection task join error during shutdown: {:?}", err);
2157            }
2158        }
2159    })
2160    .await;
2161
2162    if drain_result.is_err() {
2163        tracing::warn!(
2164            "Graceful shutdown timeout reached ({:?}). Aborting remaining connections.",
2165            graceful_shutdown_timeout
2166        );
2167        connections.abort_all();
2168        while let Some(join_result) = connections.join_next().await {
2169            if let Err(err) = join_result {
2170                tracing::warn!("Connection task abort join error: {:?}", err);
2171            }
2172        }
2173        true
2174    } else {
2175        false
2176    }
2177}
2178
/// Build a TLS acceptor from PEM certificate and key files.
///
/// Both files are opened first, then the certificate chain and the first private key
/// are parsed, and finally a server configuration without client authentication is
/// created from them.
///
/// # Errors
///
/// Returns a descriptive error when either file cannot be opened, the PEM data cannot
/// be parsed, the key file contains no private key, or the TLS configuration is
/// rejected.
#[cfg(feature = "tls")]
fn build_tls_acceptor(
    cert_path: &str,
    key_path: &str,
) -> Result<tokio_rustls::TlsAcceptor, Box<dyn std::error::Error + Send + Sync>> {
    use rustls::ServerConfig;
    use rustls_pemfile::{certs, private_key};
    use std::io::BufReader;
    use tokio_rustls::TlsAcceptor;

    let cert_file = std::fs::File::open(cert_path)
        .map_err(|e| format!("Failed to open certificate file '{}': {}", cert_path, e))?;
    let key_file = std::fs::File::open(key_path)
        .map_err(|e| format!("Failed to open key file '{}': {}", key_path, e))?;
    let mut cert_reader = BufReader::new(cert_file);
    let mut key_reader = BufReader::new(key_file);

    let chain = certs(&mut cert_reader)
        .collect::<Result<Vec<_>, _>>()
        .map_err(|e| format!("Failed to parse certificate PEM: {}", e))?;

    let signing_key = private_key(&mut key_reader)
        .map_err(|e| format!("Failed to parse private key PEM: {}", e))?
        .ok_or("No private key found in key file")?;

    let server_config = ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(chain, signing_key)
        .map_err(|e| format!("TLS configuration error: {}", e))?;

    Ok(TlsAcceptor::from(Arc::new(server_config)))
}
2210
2211impl<R> Default for HttpIngress<R>
2212where
2213    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
2214{
2215    fn default() -> Self {
2216        Self::new()
2217    }
2218}
2219
/// Internal service type for `into_raw_service()`
///
/// Holds shared handles to the ingress configuration. Its `hyper` `Service`
/// implementation clones these fields on every call and passes them to
/// `build_http_service`.
#[derive(Clone)]
pub struct RawIngressService<R> {
    /// Registered route entries.
    routes: Arc<Vec<RouteEntry<R>>>,
    /// Optional handler forwarded as the fallback argument of `build_http_service`.
    fallback: Option<RouteHandler<R>>,
    /// Layer stack forwarded to `build_http_service`
    /// (presumably the ingress-wide middleware — confirm against the builder).
    layers: Arc<Vec<ServiceLayer>>,
    /// Health probe configuration.
    health: Arc<HealthConfig<R>>,
    /// Static asset configuration.
    static_assets: Arc<StaticAssetsConfig>,
    /// Shared resources forwarded to `build_http_service`.
    resources: Arc<R>,
}
2230
2231impl<R> hyper::service::Service<Request<Incoming>> for RawIngressService<R>
2232where
2233    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
2234{
2235    type Response = HttpResponse;
2236    type Error = Infallible;
2237    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
2238
2239    fn call(&self, req: Request<Incoming>) -> Self::Future {
2240        let routes = self.routes.clone();
2241        let fallback = self.fallback.clone();
2242        let layers = self.layers.clone();
2243        let health = self.health.clone();
2244        let static_assets = self.static_assets.clone();
2245        let resources = self.resources.clone();
2246
2247        Box::pin(async move {
2248            let service = build_http_service(
2249                routes,
2250                fallback,
2251                resources,
2252                layers,
2253                health,
2254                static_assets,
2255                #[cfg(feature = "http3")]
2256                None,
2257            );
2258            service.call(req).await
2259        })
2260    }
2261}
2262
2263#[cfg(test)]
2264mod tests {
2265    use super::*;
2266    use async_trait::async_trait;
2267    use futures_util::{SinkExt, StreamExt};
2268    use serde::Deserialize;
2269    use std::fs;
2270    use std::sync::atomic::{AtomicBool, Ordering};
2271    use tempfile::tempdir;
2272    use tokio::io::{AsyncReadExt, AsyncWriteExt};
2273    use tokio_tungstenite::tungstenite::Message as WsClientMessage;
2274    use tokio_tungstenite::tungstenite::client::IntoClientRequest;
2275
2276    async fn connect_with_retry(addr: std::net::SocketAddr) -> tokio::net::TcpStream {
2277        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
2278
2279        loop {
2280            match tokio::net::TcpStream::connect(addr).await {
2281                Ok(stream) => return stream,
2282                Err(error) => {
2283                    if tokio::time::Instant::now() >= deadline {
2284                        panic!("connect server: {error}");
2285                    }
2286                    tokio::time::sleep(Duration::from_millis(25)).await;
2287                }
2288            }
2289        }
2290    }
2291
2292    #[test]
2293    fn route_pattern_matches_static_path() {
2294        let pattern = RoutePattern::parse("/orders/list");
2295        let params = pattern.match_path("/orders/list").expect("should match");
2296        assert!(params.into_inner().is_empty());
2297    }
2298
2299    #[test]
2300    fn route_pattern_matches_param_segments() {
2301        let pattern = RoutePattern::parse("/orders/:id/items/:item_id");
2302        let params = pattern
2303            .match_path("/orders/42/items/sku-123")
2304            .expect("should match");
2305        assert_eq!(params.get("id"), Some("42"));
2306        assert_eq!(params.get("item_id"), Some("sku-123"));
2307    }
2308
2309    #[test]
2310    fn route_pattern_matches_wildcard_segment() {
2311        let pattern = RoutePattern::parse("/assets/*path");
2312        let params = pattern
2313            .match_path("/assets/css/theme/light.css")
2314            .expect("should match");
2315        assert_eq!(params.get("path"), Some("css/theme/light.css"));
2316    }
2317
2318    #[test]
2319    fn route_pattern_rejects_non_matching_path() {
2320        let pattern = RoutePattern::parse("/orders/:id");
2321        assert!(pattern.match_path("/users/42").is_none());
2322    }
2323
2324    #[test]
2325    fn graceful_shutdown_timeout_defaults_to_30_seconds() {
2326        let ingress = HttpIngress::<()>::new();
2327        assert_eq!(ingress.graceful_shutdown_timeout, Duration::from_secs(30));
2328        assert!(ingress.layers.is_empty());
2329        assert!(ingress.bus_injectors.is_empty());
2330        assert!(ingress.static_assets.mounts.is_empty());
2331        assert!(ingress.on_start.is_none());
2332        assert!(ingress.on_shutdown.is_none());
2333    }
2334
2335    #[test]
2336    fn route_without_layer_keeps_empty_route_middleware_stack() {
2337        let ingress =
2338            HttpIngress::<()>::new().get("/ping", Axon::<(), (), String, ()>::new("Ping"));
2339        assert_eq!(ingress.routes.len(), 1);
2340        assert!(ingress.routes[0].layers.is_empty());
2341        assert!(ingress.routes[0].apply_global_layers);
2342    }
2343
2344    #[test]
2345    fn timeout_layer_registers_builtin_middleware() {
2346        let ingress = HttpIngress::<()>::new().timeout_layer(Duration::from_secs(1));
2347        assert_eq!(ingress.layers.len(), 1);
2348    }
2349
2350    #[test]
2351    fn request_id_layer_registers_builtin_middleware() {
2352        let ingress = HttpIngress::<()>::new().request_id_layer();
2353        assert_eq!(ingress.layers.len(), 1);
2354    }
2355
2356    #[test]
2357    fn compression_layer_registers_builtin_middleware() {
2358        let ingress = HttpIngress::<()>::new().compression_layer();
2359        assert!(ingress.static_assets.enable_compression);
2360    }
2361
2362    #[test]
2363    fn bus_injector_registration_adds_hook() {
2364        let ingress = HttpIngress::<()>::new().bus_injector(|_req, bus| {
2365            bus.insert("ok".to_string());
2366        });
2367        assert_eq!(ingress.bus_injectors.len(), 1);
2368    }
2369
2370    #[test]
2371    fn ws_route_registers_get_route_pattern() {
2372        let ingress =
2373            HttpIngress::<()>::new().ws("/ws/events", |_socket, _resources, _bus| async {});
2374        assert_eq!(ingress.routes.len(), 1);
2375        assert_eq!(ingress.routes[0].method, Method::GET);
2376        assert_eq!(ingress.routes[0].pattern.raw, "/ws/events");
2377    }
2378
    /// Shape of the JSON welcome frame that the WebSocket echo test expects as the
    /// first message from the server.
    #[derive(Debug, Deserialize)]
    struct WsWelcomeFrame {
        /// Value of the `connection_id` key in the welcome frame.
        connection_id: String,
        /// Value of the `path` key in the welcome frame.
        path: String,
        /// Value of the `tenant` key in the welcome frame.
        tenant: String,
    }
2385
    /// End-to-end WebSocket test: starts a real server, connects a client, and checks
    /// the welcome frame, a text echo, a binary echo and a clean shutdown.
    #[tokio::test]
    async fn ws_route_upgrades_and_bridges_event_source_sink_with_connection_bus() {
        // Ask the OS for a free port, then release it so the server can bind it.
        // NOTE(review): another process could grab the port in between — confirm this
        // race is acceptable for the test environment.
        let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("bind probe");
        let addr = probe.local_addr().expect("local addr");
        drop(probe);

        let ingress = HttpIngress::<()>::new()
            .bind(addr.to_string())
            // Copy the `x-tenant-id` request header (if present and valid text) into the bus.
            .bus_injector(|req, bus| {
                if let Some(value) = req.headers.get("x-tenant-id").and_then(|v| v.to_str().ok()) {
                    bus.insert(value.to_string());
                }
            })
            .ws("/ws/echo", |mut socket, _resources, bus| async move {
                // The tenant is whatever `String` the injector stored, or "unknown".
                let tenant = bus
                    .read::<String>()
                    .cloned()
                    .unwrap_or_else(|| "unknown".to_string());
                // Send the welcome frame only when a session context is on the bus.
                if let Some(session) = bus.read::<WebSocketSessionContext>() {
                    let welcome = serde_json::json!({
                        "connection_id": session.connection_id().to_string(),
                        "path": session.path(),
                        "tenant": tenant,
                    });
                    let _ = socket.send_json(&welcome).await;
                }

                // Echo loop: text is prefixed with "echo:", binary is returned as-is,
                // ping/pong are ignored, and a close event ends the handler.
                while let Some(event) = socket.next_event().await {
                    match event {
                        WebSocketEvent::Text(text) => {
                            let _ = socket.send_event(format!("echo:{text}")).await;
                        }
                        WebSocketEvent::Binary(bytes) => {
                            let _ = socket.send_event(bytes).await;
                        }
                        WebSocketEvent::Close => break,
                        WebSocketEvent::Ping(_) | WebSocketEvent::Pong(_) => {}
                    }
                }
            });

        // The server runs until the oneshot sender fires (or is dropped).
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
        let server = tokio::spawn(async move {
            ingress
                .run_with_shutdown_signal((), async move {
                    let _ = shutdown_rx.await;
                })
                .await
        });

        // Connect with a query string and the tenant header set.
        let ws_uri = format!("ws://{addr}/ws/echo?room=alpha");
        let mut ws_request = ws_uri
            .as_str()
            .into_client_request()
            .expect("ws client request");
        ws_request
            .headers_mut()
            .insert("x-tenant-id", http::HeaderValue::from_static("acme"));
        let (mut client, _response) = tokio_tungstenite::connect_async(ws_request)
            .await
            .expect("websocket connect");

        // First frame: the JSON welcome message.
        let welcome = client
            .next()
            .await
            .expect("welcome frame")
            .expect("welcome frame ok");
        let welcome_text = match welcome {
            WsClientMessage::Text(text) => text.to_string(),
            other => panic!("expected text welcome frame, got {other:?}"),
        };
        let welcome_payload: WsWelcomeFrame =
            serde_json::from_str(&welcome_text).expect("welcome json");
        // The reported path excludes the query string; the tenant comes from the header.
        assert_eq!(welcome_payload.path, "/ws/echo");
        assert_eq!(welcome_payload.tenant, "acme");
        assert!(!welcome_payload.connection_id.is_empty());

        // Text round trip.
        client
            .send(WsClientMessage::Text("hello".into()))
            .await
            .expect("send text");
        let echo_text = client
            .next()
            .await
            .expect("echo text frame")
            .expect("echo text frame ok");
        assert_eq!(echo_text, WsClientMessage::Text("echo:hello".into()));

        // Binary round trip.
        client
            .send(WsClientMessage::Binary(vec![1, 2, 3, 4].into()))
            .await
            .expect("send binary");
        let echo_binary = client
            .next()
            .await
            .expect("echo binary frame")
            .expect("echo binary frame ok");
        assert_eq!(
            echo_binary,
            WsClientMessage::Binary(vec![1, 2, 3, 4].into())
        );

        client.close(None).await.expect("close websocket");

        // Stop the server and make sure it exits without error.
        let _ = shutdown_tx.send(());
        server
            .await
            .expect("server join")
            .expect("server shutdown should succeed");
    }
2496
/// Verifies that `route_descriptors()` lists a GET entry for the explicitly
/// registered route and for each path passed to `health_endpoint` and
/// `readiness_liveness`.
#[test]
fn route_descriptors_export_http_and_health_paths() {
    let ingress = HttpIngress::<()>::new()
        .get("/orders/:id", Axon::<(), (), String, ()>::new("OrderById"))
        .health_endpoint("/healthz")
        .readiness_liveness("/readyz", "/livez");

    let descriptors = ingress.route_descriptors();

    // True when at least one descriptor is a GET route with exactly this pattern.
    let has_get_route = |pattern: &str| {
        descriptors
            .iter()
            .any(|entry| entry.method() == Method::GET && entry.path_pattern() == pattern)
    };

    // One assertion per path so a failure points at the missing route.
    assert!(has_get_route("/orders/:id"));
    assert!(has_get_route("/healthz"));
    assert!(has_get_route("/readyz"));
    assert!(has_get_route("/livez"));
}
2534
2535    #[tokio::test]
2536    async fn lifecycle_hooks_fire_on_start_and_shutdown() {
2537        let started = Arc::new(AtomicBool::new(false));
2538        let shutdown = Arc::new(AtomicBool::new(false));
2539
2540        let started_flag = started.clone();
2541        let shutdown_flag = shutdown.clone();
2542
2543        let ingress = HttpIngress::<()>::new()
2544            .bind("127.0.0.1:0")
2545            .on_start(move || {
2546                started_flag.store(true, Ordering::SeqCst);
2547            })
2548            .on_shutdown(move || {
2549                shutdown_flag.store(true, Ordering::SeqCst);
2550            })
2551            .graceful_shutdown(Duration::from_millis(50));
2552
2553        ingress
2554            .run_with_shutdown_signal((), async {
2555                tokio::time::sleep(Duration::from_millis(20)).await;
2556            })
2557            .await
2558            .expect("server should exit gracefully");
2559
2560        assert!(started.load(Ordering::SeqCst));
2561        assert!(shutdown.load(Ordering::SeqCst));
2562    }
2563
2564    #[tokio::test]
2565    async fn graceful_shutdown_drains_in_flight_requests_before_exit() {
2566        #[derive(Clone)]
2567        struct SlowDrainRoute;
2568
2569        #[async_trait]
2570        impl Transition<(), String> for SlowDrainRoute {
2571            type Error = String;
2572            type Resources = ();
2573
2574            async fn run(
2575                &self,
2576                _state: (),
2577                _resources: &Self::Resources,
2578                _bus: &mut Bus,
2579            ) -> Outcome<String, Self::Error> {
2580                tokio::time::sleep(Duration::from_millis(120)).await;
2581                Outcome::next("drained-ok".to_string())
2582            }
2583        }
2584
2585        let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("bind probe");
2586        let addr = probe.local_addr().expect("local addr");
2587        drop(probe);
2588
2589        let ingress = HttpIngress::<()>::new()
2590            .bind(addr.to_string())
2591            .graceful_shutdown(Duration::from_millis(500))
2592            .get(
2593                "/drain",
2594                Axon::<(), (), String, ()>::new("SlowDrain").then(SlowDrainRoute),
2595            );
2596
2597        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
2598        let server = tokio::spawn(async move {
2599            ingress
2600                .run_with_shutdown_signal((), async move {
2601                    let _ = shutdown_rx.await;
2602                })
2603                .await
2604        });
2605
2606        let mut stream = connect_with_retry(addr).await;
2607        stream
2608            .write_all(b"GET /drain HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
2609            .await
2610            .expect("write request");
2611
2612        tokio::time::sleep(Duration::from_millis(20)).await;
2613        let _ = shutdown_tx.send(());
2614
2615        let mut buf = Vec::new();
2616        stream.read_to_end(&mut buf).await.expect("read response");
2617        let response = String::from_utf8_lossy(&buf);
2618        assert!(response.starts_with("HTTP/1.1 200"), "{response}");
2619        assert!(response.contains("drained-ok"), "{response}");
2620
2621        server
2622            .await
2623            .expect("server join")
2624            .expect("server shutdown should succeed");
2625    }
2626
2627    #[tokio::test]
2628    async fn serve_dir_serves_static_file_with_cache_and_metadata_headers() {
2629        let temp = tempdir().expect("tempdir");
2630        let root = temp.path().join("public");
2631        fs::create_dir_all(&root).expect("create dir");
2632        let file = root.join("hello.txt");
2633        fs::write(&file, "hello static").expect("write file");
2634
2635        let ingress =
2636            Ranvier::http::<()>().serve_dir("/static", root.to_string_lossy().to_string());
2637        let app = crate::test_harness::TestApp::new(ingress, ());
2638        let response = app
2639            .send(crate::test_harness::TestRequest::get("/static/hello.txt"))
2640            .await
2641            .expect("request should succeed");
2642
2643        assert_eq!(response.status(), StatusCode::OK);
2644        assert_eq!(response.text().expect("utf8"), "hello static");
2645        assert!(response.header("cache-control").is_some());
2646        let has_metadata_header =
2647            response.header("etag").is_some() || response.header("last-modified").is_some();
2648        assert!(has_metadata_header);
2649    }
2650
2651    #[tokio::test]
2652    async fn spa_fallback_returns_index_for_unmatched_path() {
2653        let temp = tempdir().expect("tempdir");
2654        let index = temp.path().join("index.html");
2655        fs::write(&index, "<html><body>spa</body></html>").expect("write index");
2656
2657        let ingress = Ranvier::http::<()>().spa_fallback(index.to_string_lossy().to_string());
2658        let app = crate::test_harness::TestApp::new(ingress, ());
2659        let response = app
2660            .send(crate::test_harness::TestRequest::get("/dashboard/settings"))
2661            .await
2662            .expect("request should succeed");
2663
2664        assert_eq!(response.status(), StatusCode::OK);
2665        assert!(response.text().expect("utf8").contains("spa"));
2666    }
2667
2668    #[tokio::test]
2669    async fn static_compression_layer_sets_content_encoding_for_gzip_client() {
2670        let temp = tempdir().expect("tempdir");
2671        let root = temp.path().join("public");
2672        fs::create_dir_all(&root).expect("create dir");
2673        let file = root.join("compressed.txt");
2674        fs::write(&file, "compress me ".repeat(400)).expect("write file");
2675
2676        let ingress = Ranvier::http::<()>()
2677            .serve_dir("/static", root.to_string_lossy().to_string())
2678            .compression_layer();
2679        let app = crate::test_harness::TestApp::new(ingress, ());
2680        let response = app
2681            .send(
2682                crate::test_harness::TestRequest::get("/static/compressed.txt")
2683                    .header("accept-encoding", "gzip"),
2684            )
2685            .await
2686            .expect("request should succeed");
2687
2688        assert_eq!(response.status(), StatusCode::OK);
2689        assert_eq!(
2690            response
2691                .header("content-encoding")
2692                .and_then(|value| value.to_str().ok()),
2693            Some("gzip")
2694        );
2695    }
2696
2697    #[tokio::test]
2698    async fn drain_connections_completes_before_timeout() {
2699        let mut connections = tokio::task::JoinSet::new();
2700        connections.spawn(async {
2701            tokio::time::sleep(Duration::from_millis(20)).await;
2702        });
2703
2704        let timed_out = drain_connections(&mut connections, Duration::from_millis(200)).await;
2705        assert!(!timed_out);
2706        assert!(connections.is_empty());
2707    }
2708
2709    #[tokio::test]
2710    async fn drain_connections_times_out_and_aborts() {
2711        let mut connections = tokio::task::JoinSet::new();
2712        connections.spawn(async {
2713            tokio::time::sleep(Duration::from_secs(10)).await;
2714        });
2715
2716        let timed_out = drain_connections(&mut connections, Duration::from_millis(10)).await;
2717        assert!(timed_out);
2718        assert!(connections.is_empty());
2719    }
2720
2721    #[tokio::test]
2722    async fn timeout_layer_returns_408_for_slow_route() {
2723        #[derive(Clone)]
2724        struct SlowRoute;
2725
2726        #[async_trait]
2727        impl Transition<(), String> for SlowRoute {
2728            type Error = String;
2729            type Resources = ();
2730
2731            async fn run(
2732                &self,
2733                _state: (),
2734                _resources: &Self::Resources,
2735                _bus: &mut Bus,
2736            ) -> Outcome<String, Self::Error> {
2737                tokio::time::sleep(Duration::from_millis(80)).await;
2738                Outcome::next("slow-ok".to_string())
2739            }
2740        }
2741
2742        let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("bind probe");
2743        let addr = probe.local_addr().expect("local addr");
2744        drop(probe);
2745
2746        let ingress = HttpIngress::<()>::new()
2747            .bind(addr.to_string())
2748            .timeout_layer(Duration::from_millis(10))
2749            .get(
2750                "/slow",
2751                Axon::<(), (), String, ()>::new("Slow").then(SlowRoute),
2752            );
2753
2754        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
2755        let server = tokio::spawn(async move {
2756            ingress
2757                .run_with_shutdown_signal((), async move {
2758                    let _ = shutdown_rx.await;
2759                })
2760                .await
2761        });
2762
2763        let mut stream = connect_with_retry(addr).await;
2764        stream
2765            .write_all(b"GET /slow HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
2766            .await
2767            .expect("write request");
2768
2769        let mut buf = Vec::new();
2770        stream.read_to_end(&mut buf).await.expect("read response");
2771        let response = String::from_utf8_lossy(&buf);
2772        assert!(response.starts_with("HTTP/1.1 408"), "{response}");
2773
2774        let _ = shutdown_tx.send(());
2775        server
2776            .await
2777            .expect("server join")
2778            .expect("server shutdown should succeed");
2779    }
2780
2781}