Skip to main content

ranvier_http/
ingress.rs

//! # Ingress Module - Flat API Entry Point
//!
//! Implements Discussion 193: `Ranvier::http()` is an **Ingress Circuit Builder**, not a web server.
//!
//! ## API Surface (MVP)
//!
//! - `bind(addr)` — Execution unit
//! - `route(path, circuit)` — Core wiring
//! - `fallback(circuit)` — Circuit completeness
//! - `into_raw_service()` — Escape hatch to Raw API
//!
//! ## Flat API Principle (Discussion 192)
//!
//! User code depth ≤ 2. Complexity is isolated, not hidden.
15
16use base64::Engine;
17use bytes::Bytes;
18use futures_util::{SinkExt, StreamExt};
19use http::{Method, Request, Response, StatusCode};
20use http_body_util::{BodyExt, Full};
21use hyper::body::Incoming;
22use hyper::server::conn::http1;
23use hyper::upgrade::Upgraded;
24use hyper_util::rt::TokioIo;
25use ranvier_core::event::{EventSink, EventSource};
26use ranvier_core::prelude::*;
27use ranvier_runtime::Axon;
28use serde::Serialize;
29use serde::de::DeserializeOwned;
30use sha1::{Digest, Sha1};
31use std::collections::HashMap;
32use std::convert::Infallible;
33use std::future::Future;
34use std::net::SocketAddr;
35use std::pin::Pin;
36use std::sync::Arc;
37use std::time::Duration;
38use tokio::net::TcpListener;
39use tokio::sync::Mutex;
40use tokio_tungstenite::WebSocketStream;
41use tokio_tungstenite::tungstenite::{Error as WsWireError, Message as WsWireMessage};
42use tracing::Instrument;
43
44use crate::response::{HttpResponse, IntoResponse, outcome_to_response_with_error};
45
46/// The Ranvier Framework entry point.
47///
48/// `Ranvier` provides static methods to create Ingress builders for various protocols.
49/// Currently only HTTP is supported.
50pub struct Ranvier;
51
52impl Ranvier {
53    /// Create an HTTP Ingress Circuit Builder.
54    pub fn http<R>() -> HttpIngress<R>
55    where
56        R: ranvier_core::transition::ResourceRequirement + Clone,
57    {
58        HttpIngress::new()
59    }
60}
61
62/// Route handler type: boxed async function returning Response
63type RouteHandler<R> = Arc<
64    dyn Fn(http::request::Parts, &R) -> Pin<Box<dyn Future<Output = HttpResponse> + Send>>
65        + Send
66        + Sync,
67>;
68
69/// Type-erased cloneable HTTP service (replaces tower::util::BoxCloneService).
70#[derive(Clone)]
71struct BoxService(
72    Arc<
73        dyn Fn(Request<Incoming>) -> Pin<Box<dyn Future<Output = Result<HttpResponse, Infallible>> + Send>>
74            + Send
75            + Sync,
76    >,
77);
78
79impl BoxService {
80    fn new<F, Fut>(f: F) -> Self
81    where
82        F: Fn(Request<Incoming>) -> Fut + Send + Sync + 'static,
83        Fut: Future<Output = Result<HttpResponse, Infallible>> + Send + 'static,
84    {
85        Self(Arc::new(move |req| Box::pin(f(req))))
86    }
87
88    fn call(&self, req: Request<Incoming>) -> Pin<Box<dyn Future<Output = Result<HttpResponse, Infallible>> + Send>> {
89        (self.0)(req)
90    }
91}
92
93impl hyper::service::Service<Request<Incoming>> for BoxService {
94    type Response = HttpResponse;
95    type Error = Infallible;
96    type Future = Pin<Box<dyn Future<Output = Result<HttpResponse, Infallible>> + Send>>;
97
98    fn call(&self, req: Request<Incoming>) -> Self::Future {
99        (self.0)(req)
100    }
101}
102
103type BoxHttpService = BoxService;
104type ServiceLayer = Arc<dyn Fn(BoxHttpService) -> BoxHttpService + Send + Sync>;
105type LifecycleHook = Arc<dyn Fn() + Send + Sync>;
106type BusInjector = Arc<dyn Fn(&http::request::Parts, &mut Bus) + Send + Sync + 'static>;
107type WsSessionFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
108type WsSessionHandler<R> =
109    Arc<dyn Fn(WebSocketConnection, Arc<R>, Bus) -> WsSessionFuture + Send + Sync>;
110type HealthCheckFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
111type HealthCheckFn<R> = Arc<dyn Fn(Arc<R>) -> HealthCheckFuture + Send + Sync>;
112const REQUEST_ID_HEADER: &str = "x-request-id";
113const WS_UPGRADE_TOKEN: &str = "websocket";
114const WS_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
115
116#[derive(Clone)]
117struct NamedHealthCheck<R> {
118    name: String,
119    check: HealthCheckFn<R>,
120}
121
122#[derive(Clone)]
123struct HealthConfig<R> {
124    health_path: Option<String>,
125    readiness_path: Option<String>,
126    liveness_path: Option<String>,
127    checks: Vec<NamedHealthCheck<R>>,
128}
129
130impl<R> Default for HealthConfig<R> {
131    fn default() -> Self {
132        Self {
133            health_path: None,
134            readiness_path: None,
135            liveness_path: None,
136            checks: Vec::new(),
137        }
138    }
139}
140
/// Static asset serving settings collected by the builder.
#[derive(Clone, Default)]
struct StaticAssetsConfig {
    /// Directory mounts, in registration order.
    mounts: Vec<StaticMount>,
    /// File path used as SPA fallback, if configured.
    spa_fallback: Option<String>,
    /// `Cache-Control` value for static responses, if configured.
    cache_control: Option<String>,
    /// Whether compression of static responses was requested.
    enable_compression: bool,
}

/// One directory mounted under a route prefix.
#[derive(Clone)]
struct StaticMount {
    /// Route prefix the directory is served under.
    route_prefix: String,
    /// Directory the files are read from.
    directory: String,
}
154
/// Certificate and key locations used for HTTPS serving (only with the `tls` feature).
#[cfg(feature = "tls")]
#[derive(Clone)]
struct TlsAcceptorConfig {
    /// Path to the certificate file.
    cert_path: String,
    /// Path to the private key file.
    key_path: String,
}
162
163#[derive(Serialize)]
164struct HealthReport {
165    status: &'static str,
166    probe: &'static str,
167    checks: Vec<HealthCheckReport>,
168}
169
170#[derive(Serialize)]
171struct HealthCheckReport {
172    name: String,
173    status: &'static str,
174    #[serde(skip_serializing_if = "Option::is_none")]
175    error: Option<String>,
176}
177
178fn timeout_middleware(timeout: Duration) -> ServiceLayer {
179    Arc::new(move |inner: BoxHttpService| {
180        BoxService::new(move |req: Request<Incoming>| {
181            let inner = inner.clone();
182            async move {
183                match tokio::time::timeout(timeout, inner.call(req)).await {
184                    Ok(response) => response,
185                    Err(_) => Ok(Response::builder()
186                        .status(StatusCode::REQUEST_TIMEOUT)
187                        .body(
188                            Full::new(Bytes::from("Request Timeout"))
189                                .map_err(|never| match never {})
190                                .boxed(),
191                        )
192                        .expect("valid HTTP response construction")),
193                }
194            }
195        })
196    })
197}
198
199fn request_id_middleware() -> ServiceLayer {
200    Arc::new(move |inner: BoxHttpService| {
201        BoxService::new(move |req: Request<Incoming>| {
202            let inner = inner.clone();
203            async move {
204                let mut req = req;
205                let request_id = req
206                    .headers()
207                    .get(REQUEST_ID_HEADER)
208                    .cloned()
209                    .unwrap_or_else(|| {
210                        http::HeaderValue::from_str(&uuid::Uuid::new_v4().to_string())
211                            .unwrap_or_else(|_| {
212                                http::HeaderValue::from_static("request-id-unavailable")
213                            })
214                    });
215                req.headers_mut()
216                    .insert(REQUEST_ID_HEADER, request_id.clone());
217                let mut response = inner.call(req).await?;
218                response
219                    .headers_mut()
220                    .insert(REQUEST_ID_HEADER, request_id);
221                Ok(response)
222            }
223        })
224    })
225}
226
/// Named values captured from a request path by a route pattern.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PathParams {
    /// Captured values keyed by parameter name.
    values: HashMap<String, String>,
}
231
232/// Public route descriptor snapshot for tooling integrations (e.g., OpenAPI generation).
233#[derive(Clone, Debug, PartialEq, Eq)]
234pub struct HttpRouteDescriptor {
235    method: Method,
236    path_pattern: String,
237}
238
239impl HttpRouteDescriptor {
240    pub fn new(method: Method, path_pattern: impl Into<String>) -> Self {
241        Self {
242            method,
243            path_pattern: path_pattern.into(),
244        }
245    }
246
247    pub fn method(&self) -> &Method {
248        &self.method
249    }
250
251    pub fn path_pattern(&self) -> &str {
252        &self.path_pattern
253    }
254}
255
256/// Connection metadata injected into Bus for each accepted WebSocket session.
257#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
258pub struct WebSocketSessionContext {
259    connection_id: uuid::Uuid,
260    path: String,
261    query: Option<String>,
262}
263
264impl WebSocketSessionContext {
265    pub fn connection_id(&self) -> uuid::Uuid {
266        self.connection_id
267    }
268
269    pub fn path(&self) -> &str {
270        &self.path
271    }
272
273    pub fn query(&self) -> Option<&str> {
274        self.query.as_deref()
275    }
276}
277
278/// Logical WebSocket message model used by Ranvier EventSource/EventSink bridge.
279#[derive(Clone, Debug, PartialEq, Eq)]
280pub enum WebSocketEvent {
281    Text(String),
282    Binary(Vec<u8>),
283    Ping(Vec<u8>),
284    Pong(Vec<u8>),
285    Close,
286}
287
288impl WebSocketEvent {
289    pub fn text(value: impl Into<String>) -> Self {
290        Self::Text(value.into())
291    }
292
293    pub fn binary(value: impl Into<Vec<u8>>) -> Self {
294        Self::Binary(value.into())
295    }
296
297    pub fn json<T>(value: &T) -> Result<Self, serde_json::Error>
298    where
299        T: Serialize,
300    {
301        let text = serde_json::to_string(value)?;
302        Ok(Self::Text(text))
303    }
304}
305
306#[derive(Debug, thiserror::Error)]
307pub enum WebSocketError {
308    #[error("websocket wire error: {0}")]
309    Wire(#[from] WsWireError),
310    #[error("json serialization failed: {0}")]
311    JsonSerialize(#[source] serde_json::Error),
312    #[error("json deserialization failed: {0}")]
313    JsonDeserialize(#[source] serde_json::Error),
314    #[error("expected text or binary frame for json payload")]
315    NonDataFrame,
316}
317
318type WsServerStream = WebSocketStream<TokioIo<Upgraded>>;
319type WsServerSink = futures_util::stream::SplitSink<WsServerStream, WsWireMessage>;
320type WsServerSource = futures_util::stream::SplitStream<WsServerStream>;
321
322/// WebSocket connection adapter bridging wire frames and EventSource/EventSink traits.
323pub struct WebSocketConnection {
324    sink: Mutex<WsServerSink>,
325    source: Mutex<WsServerSource>,
326    session: WebSocketSessionContext,
327}
328
329impl WebSocketConnection {
330    fn new(stream: WsServerStream, session: WebSocketSessionContext) -> Self {
331        let (sink, source) = stream.split();
332        Self {
333            sink: Mutex::new(sink),
334            source: Mutex::new(source),
335            session,
336        }
337    }
338
339    pub fn session(&self) -> &WebSocketSessionContext {
340        &self.session
341    }
342
343    pub async fn send(&self, event: WebSocketEvent) -> Result<(), WebSocketError> {
344        let mut sink = self.sink.lock().await;
345        sink.send(event.into_wire_message()).await?;
346        Ok(())
347    }
348
349    pub async fn send_json<T>(&self, value: &T) -> Result<(), WebSocketError>
350    where
351        T: Serialize,
352    {
353        let event = WebSocketEvent::json(value).map_err(WebSocketError::JsonSerialize)?;
354        self.send(event).await
355    }
356
357    pub async fn next_json<T>(&mut self) -> Result<Option<T>, WebSocketError>
358    where
359        T: DeserializeOwned,
360    {
361        let Some(event) = self.recv_event().await? else {
362            return Ok(None);
363        };
364        match event {
365            WebSocketEvent::Text(text) => serde_json::from_str(&text)
366                .map(Some)
367                .map_err(WebSocketError::JsonDeserialize),
368            WebSocketEvent::Binary(bytes) => serde_json::from_slice(&bytes)
369                .map(Some)
370                .map_err(WebSocketError::JsonDeserialize),
371            _ => Err(WebSocketError::NonDataFrame),
372        }
373    }
374
375    async fn recv_event(&mut self) -> Result<Option<WebSocketEvent>, WsWireError> {
376        let mut source = self.source.lock().await;
377        while let Some(item) = source.next().await {
378            let message = item?;
379            if let Some(event) = WebSocketEvent::from_wire_message(message) {
380                return Ok(Some(event));
381            }
382        }
383        Ok(None)
384    }
385}
386
387impl WebSocketEvent {
388    fn from_wire_message(message: WsWireMessage) -> Option<Self> {
389        match message {
390            WsWireMessage::Text(value) => Some(Self::Text(value.to_string())),
391            WsWireMessage::Binary(value) => Some(Self::Binary(value.to_vec())),
392            WsWireMessage::Ping(value) => Some(Self::Ping(value.to_vec())),
393            WsWireMessage::Pong(value) => Some(Self::Pong(value.to_vec())),
394            WsWireMessage::Close(_) => Some(Self::Close),
395            WsWireMessage::Frame(_) => None,
396        }
397    }
398
399    fn into_wire_message(self) -> WsWireMessage {
400        match self {
401            Self::Text(value) => WsWireMessage::Text(value),
402            Self::Binary(value) => WsWireMessage::Binary(value),
403            Self::Ping(value) => WsWireMessage::Ping(value),
404            Self::Pong(value) => WsWireMessage::Pong(value),
405            Self::Close => WsWireMessage::Close(None),
406        }
407    }
408}
409
410#[async_trait::async_trait]
411impl EventSource<WebSocketEvent> for WebSocketConnection {
412    async fn next_event(&mut self) -> Option<WebSocketEvent> {
413        match self.recv_event().await {
414            Ok(event) => event,
415            Err(error) => {
416                tracing::warn!(ranvier.ws.error = %error, "websocket source read failed");
417                None
418            }
419        }
420    }
421}
422
423#[async_trait::async_trait]
424impl EventSink<WebSocketEvent> for WebSocketConnection {
425    type Error = WebSocketError;
426
427    async fn send_event(&self, event: WebSocketEvent) -> Result<(), Self::Error> {
428        self.send(event).await
429    }
430}
431
432#[async_trait::async_trait]
433impl EventSink<String> for WebSocketConnection {
434    type Error = WebSocketError;
435
436    async fn send_event(&self, event: String) -> Result<(), Self::Error> {
437        self.send(WebSocketEvent::Text(event)).await
438    }
439}
440
441#[async_trait::async_trait]
442impl EventSink<Vec<u8>> for WebSocketConnection {
443    type Error = WebSocketError;
444
445    async fn send_event(&self, event: Vec<u8>) -> Result<(), Self::Error> {
446        self.send(WebSocketEvent::Binary(event)).await
447    }
448}
449
450impl PathParams {
451    pub fn new(values: HashMap<String, String>) -> Self {
452        Self { values }
453    }
454
455    pub fn get(&self, key: &str) -> Option<&str> {
456        self.values.get(key).map(String::as_str)
457    }
458
459    pub fn as_map(&self) -> &HashMap<String, String> {
460        &self.values
461    }
462
463    pub fn into_inner(self) -> HashMap<String, String> {
464        self.values
465    }
466}
467
/// One parsed segment of a route pattern.
#[derive(Clone, Debug, PartialEq, Eq)]
enum RouteSegment {
    /// Literal segment that must match the path segment exactly.
    Static(String),
    /// `:name` segment capturing exactly one path segment under `name`.
    Param(String),
    /// `*name` segment capturing all remaining path segments under `name`.
    Wildcard(String),
}
474
475#[derive(Clone, Debug, PartialEq, Eq)]
476struct RoutePattern {
477    raw: String,
478    segments: Vec<RouteSegment>,
479}
480
481impl RoutePattern {
482    fn parse(path: &str) -> Self {
483        let segments = path_segments(path)
484            .into_iter()
485            .map(|segment| {
486                if let Some(name) = segment.strip_prefix(':') {
487                    if !name.is_empty() {
488                        return RouteSegment::Param(name.to_string());
489                    }
490                }
491                if let Some(name) = segment.strip_prefix('*') {
492                    if !name.is_empty() {
493                        return RouteSegment::Wildcard(name.to_string());
494                    }
495                }
496                RouteSegment::Static(segment.to_string())
497            })
498            .collect();
499
500        Self {
501            raw: path.to_string(),
502            segments,
503        }
504    }
505
506    fn match_path(&self, path: &str) -> Option<PathParams> {
507        let mut params = HashMap::new();
508        let path_segments = path_segments(path);
509        let mut pattern_index = 0usize;
510        let mut path_index = 0usize;
511
512        while pattern_index < self.segments.len() {
513            match &self.segments[pattern_index] {
514                RouteSegment::Static(expected) => {
515                    let actual = path_segments.get(path_index)?;
516                    if actual != expected {
517                        return None;
518                    }
519                    pattern_index += 1;
520                    path_index += 1;
521                }
522                RouteSegment::Param(name) => {
523                    let actual = path_segments.get(path_index)?;
524                    params.insert(name.clone(), (*actual).to_string());
525                    pattern_index += 1;
526                    path_index += 1;
527                }
528                RouteSegment::Wildcard(name) => {
529                    let remaining = path_segments[path_index..].join("/");
530                    params.insert(name.clone(), remaining);
531                    pattern_index += 1;
532                    path_index = path_segments.len();
533                    break;
534                }
535            }
536        }
537
538        if pattern_index == self.segments.len() && path_index == path_segments.len() {
539            Some(PathParams::new(params))
540        } else {
541            None
542        }
543    }
544}
545
546#[derive(Clone)]
547struct RouteEntry<R> {
548    method: Method,
549    pattern: RoutePattern,
550    handler: RouteHandler<R>,
551    layers: Arc<Vec<ServiceLayer>>,
552    apply_global_layers: bool,
553}
554
/// Split `path` on `/` into its non-empty segments.
///
/// Leading, trailing and repeated slashes produce no segments, so `"/"` and
/// `""` both yield an empty vector.
fn path_segments(path: &str) -> Vec<&str> {
    let mut segments = Vec::new();
    for piece in path.split('/') {
        if !piece.is_empty() {
            segments.push(piece);
        }
    }
    segments
}
565
/// Ensure a route path starts with `/`.
///
/// An empty path becomes `"/"`, a path already starting with `/` is returned
/// unchanged, and anything else gets a `/` prepended.
fn normalize_route_path(path: String) -> String {
    let first = path.chars().next();
    match first {
        None => "/".to_string(),
        Some('/') => path,
        Some(_) => format!("/{path}"),
    }
}
576
577fn find_matching_route<'a, R>(
578    routes: &'a [RouteEntry<R>],
579    method: &Method,
580    path: &str,
581) -> Option<(&'a RouteEntry<R>, PathParams)> {
582    for entry in routes {
583        if entry.method != *method {
584            continue;
585        }
586        if let Some(params) = entry.pattern.match_path(path) {
587            return Some((entry, params));
588        }
589    }
590    None
591}
592
593fn header_contains_token(
594    headers: &http::HeaderMap,
595    name: http::header::HeaderName,
596    token: &str,
597) -> bool {
598    headers
599        .get(name)
600        .and_then(|value| value.to_str().ok())
601        .map(|value| {
602            value
603                .split(',')
604                .any(|part| part.trim().eq_ignore_ascii_case(token))
605        })
606        .unwrap_or(false)
607}
608
609fn websocket_session_from_request<B>(req: &Request<B>) -> WebSocketSessionContext {
610    WebSocketSessionContext {
611        connection_id: uuid::Uuid::new_v4(),
612        path: req.uri().path().to_string(),
613        query: req.uri().query().map(str::to_string),
614    }
615}
616
617fn websocket_accept_key(client_key: &str) -> String {
618    let mut hasher = Sha1::new();
619    hasher.update(client_key.as_bytes());
620    hasher.update(WS_GUID.as_bytes());
621    let digest = hasher.finalize();
622    base64::engine::general_purpose::STANDARD.encode(digest)
623}
624
625fn websocket_bad_request(message: &'static str) -> HttpResponse {
626    Response::builder()
627        .status(StatusCode::BAD_REQUEST)
628        .body(
629            Full::new(Bytes::from(message))
630                .map_err(|never| match never {})
631                .boxed(),
632        )
633        .unwrap_or_else(|_| {
634            Response::new(
635                Full::new(Bytes::new())
636                    .map_err(|never| match never {})
637                    .boxed(),
638            )
639        })
640}
641
642fn websocket_upgrade_response<B>(
643    req: &mut Request<B>,
644) -> Result<(HttpResponse, hyper::upgrade::OnUpgrade), HttpResponse> {
645    if req.method() != Method::GET {
646        return Err(websocket_bad_request(
647            "WebSocket upgrade requires GET method",
648        ));
649    }
650
651    if !header_contains_token(req.headers(), http::header::CONNECTION, "upgrade") {
652        return Err(websocket_bad_request(
653            "Missing Connection: upgrade header for WebSocket",
654        ));
655    }
656
657    if !header_contains_token(req.headers(), http::header::UPGRADE, WS_UPGRADE_TOKEN) {
658        return Err(websocket_bad_request("Missing Upgrade: websocket header"));
659    }
660
661    if let Some(version) = req.headers().get("sec-websocket-version") {
662        if version != "13" {
663            return Err(websocket_bad_request(
664                "Unsupported Sec-WebSocket-Version (expected 13)",
665            ));
666        }
667    }
668
669    let Some(client_key) = req
670        .headers()
671        .get("sec-websocket-key")
672        .and_then(|value| value.to_str().ok())
673    else {
674        return Err(websocket_bad_request(
675            "Missing Sec-WebSocket-Key header for WebSocket",
676        ));
677    };
678
679    let accept_key = websocket_accept_key(client_key);
680    let on_upgrade = hyper::upgrade::on(req);
681    let response = Response::builder()
682        .status(StatusCode::SWITCHING_PROTOCOLS)
683        .header(http::header::UPGRADE, WS_UPGRADE_TOKEN)
684        .header(http::header::CONNECTION, "Upgrade")
685        .header("sec-websocket-accept", accept_key)
686        .body(
687            Full::new(Bytes::new())
688                .map_err(|never| match never {})
689                .boxed(),
690        )
691        .unwrap_or_else(|_| {
692            Response::new(
693                Full::new(Bytes::new())
694                    .map_err(|never| match never {})
695                    .boxed(),
696            )
697        });
698
699    Ok((response, on_upgrade))
700}
701
702/// HTTP Ingress Circuit Builder.
703///
704/// Wires HTTP inputs to Ranvier Circuits. This is NOT a web server—it's a circuit wiring tool.
705///
706/// **Ingress is part of Schematic** (separate layer: Ingress → Circuit → Egress)
707pub struct HttpIngress<R = ()> {
708    /// Bind address (e.g., "127.0.0.1:3000")
709    addr: Option<String>,
710    /// Routes: (Method, RoutePattern, Handler)
711    routes: Vec<RouteEntry<R>>,
712    /// Fallback circuit for unmatched routes
713    fallback: Option<RouteHandler<R>>,
714    /// Global middleware layers (LIFO execution on request path).
715    layers: Vec<ServiceLayer>,
716    /// Lifecycle callback invoked after listener bind succeeds.
717    on_start: Option<LifecycleHook>,
718    /// Lifecycle callback invoked when graceful shutdown finishes.
719    on_shutdown: Option<LifecycleHook>,
720    /// Maximum time to wait for in-flight requests to drain.
721    graceful_shutdown_timeout: Duration,
722    /// Request-context to Bus injection hooks executed before each circuit run.
723    bus_injectors: Vec<BusInjector>,
724    /// Static asset serving configuration (serve_dir + SPA fallback).
725    static_assets: StaticAssetsConfig,
726    /// Built-in health endpoint configuration.
727    health: HealthConfig<R>,
728    #[cfg(feature = "http3")]
729    http3_config: Option<crate::http3::Http3Config>,
730    #[cfg(feature = "http3")]
731    alt_svc_h3_port: Option<u16>,
732    /// TLS configuration (feature-gated: `tls`)
733    #[cfg(feature = "tls")]
734    tls_config: Option<TlsAcceptorConfig>,
735    /// Features: enable active intervention system routes
736    active_intervention: bool,
737    /// Optional policy registry for hot-reloads
738    policy_registry: Option<ranvier_core::policy::PolicyRegistry>,
739    _phantom: std::marker::PhantomData<R>,
740}
741
742impl<R> HttpIngress<R>
743where
744    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
745{
746    /// Create a new empty HttpIngress builder.
747    pub fn new() -> Self {
748        Self {
749            addr: None,
750            routes: Vec::new(),
751            fallback: None,
752            layers: Vec::new(),
753            on_start: None,
754            on_shutdown: None,
755            graceful_shutdown_timeout: Duration::from_secs(30),
756            bus_injectors: Vec::new(),
757            static_assets: StaticAssetsConfig::default(),
758            health: HealthConfig::default(),
759            #[cfg(feature = "tls")]
760            tls_config: None,
761            #[cfg(feature = "http3")]
762            http3_config: None,
763            #[cfg(feature = "http3")]
764            alt_svc_h3_port: None,
765            active_intervention: false,
766            policy_registry: None,
767            _phantom: std::marker::PhantomData,
768        }
769    }
770
771    /// Set the bind address for the server.
772    pub fn bind(mut self, addr: impl Into<String>) -> Self {
773        self.addr = Some(addr.into());
774        self
775    }
776
777    /// Enable active intervention endpoints (`/_system/intervene/*`).
778    /// These endpoints allow external tooling (like Ranvier Studio) to pause,
779    /// inspect, and forcefully resume or re-route in-flight workflow instances.
780    pub fn active_intervention(mut self) -> Self {
781        self.active_intervention = true;
782        self
783    }
784
785    /// Attach a policy registry for hot-reloads.
786    pub fn policy_registry(mut self, registry: ranvier_core::policy::PolicyRegistry) -> Self {
787        self.policy_registry = Some(registry);
788        self
789    }
790
791    /// Register a lifecycle callback invoked when the server starts listening.
792    pub fn on_start<F>(mut self, callback: F) -> Self
793    where
794        F: Fn() + Send + Sync + 'static,
795    {
796        self.on_start = Some(Arc::new(callback));
797        self
798    }
799
800    /// Register a lifecycle callback invoked after graceful shutdown completes.
801    pub fn on_shutdown<F>(mut self, callback: F) -> Self
802    where
803        F: Fn() + Send + Sync + 'static,
804    {
805        self.on_shutdown = Some(Arc::new(callback));
806        self
807    }
808
809    /// Configure graceful shutdown timeout for in-flight request draining.
810    pub fn graceful_shutdown(mut self, timeout: Duration) -> Self {
811        self.graceful_shutdown_timeout = timeout;
812        self
813    }
814
815    /// Apply a `RanvierConfig` to this builder.
816    ///
817    /// Reads server settings (bind address, shutdown timeout) from the config.
818    /// Logging should be initialized separately via `config.init_logging()`.
819    pub fn config(mut self, config: &ranvier_core::config::RanvierConfig) -> Self {
820        self.addr = Some(config.bind_addr());
821        self.graceful_shutdown_timeout = config.shutdown_timeout();
822        self
823    }
824
825    /// Enable TLS with certificate and key PEM files (requires `tls` feature).
826    #[cfg(feature = "tls")]
827    pub fn tls(mut self, cert_path: impl Into<String>, key_path: impl Into<String>) -> Self {
828        self.tls_config = Some(TlsAcceptorConfig {
829            cert_path: cert_path.into(),
830            key_path: key_path.into(),
831        });
832        self
833    }
834
835    /// Add built-in timeout middleware that returns `408 Request Timeout`
836    /// when the inner service call exceeds `timeout`.
837    pub fn timeout_layer(mut self, timeout: Duration) -> Self {
838        self.layers.push(timeout_middleware(timeout));
839        self
840    }
841
842    /// Add built-in request-id middleware.
843    ///
844    /// Ensures `x-request-id` exists on request and response headers.
845    pub fn request_id_layer(mut self) -> Self {
846        self.layers.push(request_id_middleware());
847        self
848    }
849
850    /// Register a request-context injector executed before each circuit run.
851    ///
852    /// Use this to bridge adapter-layer context (request extensions/headers)
853    /// into explicit Bus resources consumed by Transitions.
854    pub fn bus_injector<F>(mut self, injector: F) -> Self
855    where
856        F: Fn(&http::request::Parts, &mut Bus) + Send + Sync + 'static,
857    {
858        self.bus_injectors.push(Arc::new(injector));
859        self
860    }
861
862    /// Configure HTTP/3 QUIC support.
863    #[cfg(feature = "http3")]
864    pub fn enable_http3(mut self, config: crate::http3::Http3Config) -> Self {
865        self.http3_config = Some(config);
866        self
867    }
868
869    /// Automatically injects the `Alt-Svc` header into responses to signal HTTP/3 availability.
870    #[cfg(feature = "http3")]
871    pub fn alt_svc_h3(mut self, port: u16) -> Self {
872        self.alt_svc_h3_port = Some(port);
873        self
874    }
875
876    /// Export route metadata snapshot for external tooling.
877    pub fn route_descriptors(&self) -> Vec<HttpRouteDescriptor> {
878        let mut descriptors = self
879            .routes
880            .iter()
881            .map(|entry| HttpRouteDescriptor::new(entry.method.clone(), entry.pattern.raw.clone()))
882            .collect::<Vec<_>>();
883
884        if let Some(path) = &self.health.health_path {
885            descriptors.push(HttpRouteDescriptor::new(Method::GET, path.clone()));
886        }
887        if let Some(path) = &self.health.readiness_path {
888            descriptors.push(HttpRouteDescriptor::new(Method::GET, path.clone()));
889        }
890        if let Some(path) = &self.health.liveness_path {
891            descriptors.push(HttpRouteDescriptor::new(Method::GET, path.clone()));
892        }
893
894        descriptors
895    }
896
897    /// Mount a static directory under a path prefix.
898    ///
899    /// Example: `.serve_dir("/static", "./public")`.
900    pub fn serve_dir(
901        mut self,
902        route_prefix: impl Into<String>,
903        directory: impl Into<String>,
904    ) -> Self {
905        self.static_assets.mounts.push(StaticMount {
906            route_prefix: normalize_route_path(route_prefix.into()),
907            directory: directory.into(),
908        });
909        if self.static_assets.cache_control.is_none() {
910            self.static_assets.cache_control = Some("public, max-age=3600".to_string());
911        }
912        self
913    }
914
915    /// Configure SPA fallback file for unmatched GET/HEAD routes.
916    ///
917    /// Example: `.spa_fallback("./public/index.html")`.
918    pub fn spa_fallback(mut self, file_path: impl Into<String>) -> Self {
919        self.static_assets.spa_fallback = Some(file_path.into());
920        self
921    }
922
923    /// Override default Cache-Control for static responses.
924    pub fn static_cache_control(mut self, cache_control: impl Into<String>) -> Self {
925        self.static_assets.cache_control = Some(cache_control.into());
926        self
927    }
928
929    /// Enable gzip response compression for static assets.
930    pub fn compression_layer(mut self) -> Self {
931        self.static_assets.enable_compression = true;
932        self
933    }
934
935    /// Register a WebSocket upgrade endpoint and session handler.
936    ///
937    /// The handler receives:
938    /// 1) a `WebSocketConnection` implementing `EventSource`/`EventSink`,
939    /// 2) shared resources (`Arc<R>`),
940    /// 3) a connection-scoped `Bus` with request injectors + `WebSocketSessionContext`.
941    pub fn ws<H, Fut>(mut self, path: impl Into<String>, handler: H) -> Self
942    where
943        H: Fn(WebSocketConnection, Arc<R>, Bus) -> Fut + Send + Sync + 'static,
944        Fut: Future<Output = ()> + Send + 'static,
945    {
946        let path_str: String = path.into();
947        let ws_handler: WsSessionHandler<R> = Arc::new(move |connection, resources, bus| {
948            Box::pin(handler(connection, resources, bus))
949        });
950        let bus_injectors = Arc::new(self.bus_injectors.clone());
951        let path_for_pattern = path_str.clone();
952        let path_for_handler = path_str;
953
954        let route_handler: RouteHandler<R> =
955            Arc::new(move |parts: http::request::Parts, res: &R| {
956                let ws_handler = ws_handler.clone();
957                let bus_injectors = bus_injectors.clone();
958                let resources = Arc::new(res.clone());
959                let path = path_for_handler.clone();
960
961                Box::pin(async move {
962                    let request_id = uuid::Uuid::new_v4().to_string();
963                    let span = tracing::info_span!(
964                        "WebSocketUpgrade",
965                        ranvier.ws.path = %path,
966                        ranvier.ws.request_id = %request_id
967                    );
968
969                    async move {
970                        let mut bus = Bus::new();
971                        for injector in bus_injectors.iter() {
972                            injector(&parts, &mut bus);
973                        }
974
975                        // Reconstruct a dummy Request for WebSocket extraction
976                        let mut req = Request::from_parts(parts, ());
977                        let session = websocket_session_from_request(&req);
978                        bus.insert(session.clone());
979
980                        let (response, on_upgrade) = match websocket_upgrade_response(&mut req) {
981                            Ok(result) => result,
982                            Err(error_response) => return error_response,
983                        };
984
985                        tokio::spawn(async move {
986                            match on_upgrade.await {
987                                Ok(upgraded) => {
988                                    let stream = WebSocketStream::from_raw_socket(
989                                        TokioIo::new(upgraded),
990                                        tokio_tungstenite::tungstenite::protocol::Role::Server,
991                                        None,
992                                    )
993                                    .await;
994                                    let connection = WebSocketConnection::new(stream, session);
995                                    ws_handler(connection, resources, bus).await;
996                                }
997                                Err(error) => {
998                                    tracing::warn!(
999                                        ranvier.ws.path = %path,
1000                                        ranvier.ws.error = %error,
1001                                        "websocket upgrade failed"
1002                                    );
1003                                }
1004                            }
1005                        });
1006
1007                        response
1008                    }
1009                    .instrument(span)
1010                    .await
1011                }) as Pin<Box<dyn Future<Output = HttpResponse> + Send>>
1012            });
1013
1014        self.routes.push(RouteEntry {
1015            method: Method::GET,
1016            pattern: RoutePattern::parse(&path_for_pattern),
1017            handler: route_handler,
1018            layers: Arc::new(Vec::new()),
1019            apply_global_layers: true,
1020        });
1021
1022        self
1023    }
1024
1025    /// Enable built-in health endpoint at the given path.
1026    ///
1027    /// The endpoint returns JSON with status and check results.
1028    /// If no checks are registered, status is always `ok`.
1029    pub fn health_endpoint(mut self, path: impl Into<String>) -> Self {
1030        self.health.health_path = Some(normalize_route_path(path.into()));
1031        self
1032    }
1033
1034    /// Register an async health check used by `/health` and `/ready` probes.
1035    ///
1036    /// `Err` values are converted to strings and surfaced in the JSON response.
1037    pub fn health_check<F, Fut, Err>(mut self, name: impl Into<String>, check: F) -> Self
1038    where
1039        F: Fn(Arc<R>) -> Fut + Send + Sync + 'static,
1040        Fut: Future<Output = Result<(), Err>> + Send + 'static,
1041        Err: ToString + Send + 'static,
1042    {
1043        if self.health.health_path.is_none() {
1044            self.health.health_path = Some("/health".to_string());
1045        }
1046
1047        let check_fn: HealthCheckFn<R> = Arc::new(move |resources: Arc<R>| {
1048            let fut = check(resources);
1049            Box::pin(async move { fut.await.map_err(|error| error.to_string()) })
1050        });
1051
1052        self.health.checks.push(NamedHealthCheck {
1053            name: name.into(),
1054            check: check_fn,
1055        });
1056        self
1057    }
1058
1059    /// Enable readiness/liveness probe separation with explicit paths.
1060    pub fn readiness_liveness(
1061        mut self,
1062        readiness_path: impl Into<String>,
1063        liveness_path: impl Into<String>,
1064    ) -> Self {
1065        self.health.readiness_path = Some(normalize_route_path(readiness_path.into()));
1066        self.health.liveness_path = Some(normalize_route_path(liveness_path.into()));
1067        self
1068    }
1069
1070    /// Enable readiness/liveness probes at `/ready` and `/live`.
1071    pub fn readiness_liveness_default(self) -> Self {
1072        self.readiness_liveness("/ready", "/live")
1073    }
1074
1075    /// Register a route with GET method.
1076    pub fn route<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1077    where
1078        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1079        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1080    {
1081        self.route_method(Method::GET, path, circuit)
1082    }
1083    /// Register a route with a specific HTTP method.
1084    ///
1085    /// # Example
1086    ///
1087    /// ```rust,ignore
1088    /// Ranvier::http()
1089    ///     .route_method(Method::POST, "/users", create_user_circuit)
1090    /// ```
1091    pub fn route_method<Out, E>(
1092        self,
1093        method: Method,
1094        path: impl Into<String>,
1095        circuit: Axon<(), Out, E, R>,
1096    ) -> Self
1097    where
1098        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1099        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1100    {
1101        self.route_method_with_error(method, path, circuit, |error| {
1102            (
1103                StatusCode::INTERNAL_SERVER_ERROR,
1104                format!("Error: {:?}", error),
1105            )
1106                .into_response()
1107        })
1108    }
1109
1110    pub fn route_method_with_error<Out, E, H>(
1111        self,
1112        method: Method,
1113        path: impl Into<String>,
1114        circuit: Axon<(), Out, E, R>,
1115        error_handler: H,
1116    ) -> Self
1117    where
1118        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1119        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1120        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1121    {
1122        self.route_method_with_error_and_layers(
1123            method,
1124            path,
1125            circuit,
1126            error_handler,
1127            Arc::new(Vec::new()),
1128            true,
1129        )
1130    }
1131
1132
1133
1134    fn route_method_with_error_and_layers<Out, E, H>(
1135        mut self,
1136        method: Method,
1137        path: impl Into<String>,
1138        circuit: Axon<(), Out, E, R>,
1139        error_handler: H,
1140        route_layers: Arc<Vec<ServiceLayer>>,
1141        apply_global_layers: bool,
1142    ) -> Self
1143    where
1144        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1145        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1146        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1147    {
1148        let path_str: String = path.into();
1149        let circuit = Arc::new(circuit);
1150        let error_handler = Arc::new(error_handler);
1151        let route_bus_injectors = Arc::new(self.bus_injectors.clone());
1152        let path_for_pattern = path_str.clone();
1153        let path_for_handler = path_str;
1154        let method_for_pattern = method.clone();
1155        let method_for_handler = method;
1156
1157        let handler: RouteHandler<R> = Arc::new(move |parts: http::request::Parts, res: &R| {
1158            let circuit = circuit.clone();
1159            let error_handler = error_handler.clone();
1160            let route_bus_injectors = route_bus_injectors.clone();
1161            let res = res.clone();
1162            let path = path_for_handler.clone();
1163            let method = method_for_handler.clone();
1164
1165            Box::pin(async move {
1166                let request_id = uuid::Uuid::new_v4().to_string();
1167                let span = tracing::info_span!(
1168                    "HTTPRequest",
1169                    ranvier.http.method = %method,
1170                    ranvier.http.path = %path,
1171                    ranvier.http.request_id = %request_id
1172                );
1173
1174                async move {
1175                    let mut bus = Bus::new();
1176                    for injector in route_bus_injectors.iter() {
1177                        injector(&parts, &mut bus);
1178                    }
1179                    let result = circuit.execute((), &res, &mut bus).await;
1180                    outcome_to_response_with_error(result, |error| error_handler(error))
1181                }
1182                .instrument(span)
1183                .await
1184            }) as Pin<Box<dyn Future<Output = HttpResponse> + Send>>
1185        });
1186
1187        self.routes.push(RouteEntry {
1188            method: method_for_pattern,
1189            pattern: RoutePattern::parse(&path_for_pattern),
1190            handler,
1191            layers: route_layers,
1192            apply_global_layers,
1193        });
1194        self
1195    }
1196
1197    pub fn get<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1198    where
1199        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1200        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1201    {
1202        self.route_method(Method::GET, path, circuit)
1203    }
1204
1205    pub fn get_with_error<Out, E, H>(
1206        self,
1207        path: impl Into<String>,
1208        circuit: Axon<(), Out, E, R>,
1209        error_handler: H,
1210    ) -> Self
1211    where
1212        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1213        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1214        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1215    {
1216        self.route_method_with_error(Method::GET, path, circuit, error_handler)
1217    }
1218
1219    pub fn post<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1220    where
1221        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1222        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1223    {
1224        self.route_method(Method::POST, path, circuit)
1225    }
1226
1227    pub fn put<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1228    where
1229        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1230        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1231    {
1232        self.route_method(Method::PUT, path, circuit)
1233    }
1234
1235    pub fn delete<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1236    where
1237        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1238        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1239    {
1240        self.route_method(Method::DELETE, path, circuit)
1241    }
1242
1243    pub fn patch<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1244    where
1245        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1246        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1247    {
1248        self.route_method(Method::PATCH, path, circuit)
1249    }
1250
1251    pub fn post_with_error<Out, E, H>(
1252        self,
1253        path: impl Into<String>,
1254        circuit: Axon<(), Out, E, R>,
1255        error_handler: H,
1256    ) -> Self
1257    where
1258        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1259        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1260        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1261    {
1262        self.route_method_with_error(Method::POST, path, circuit, error_handler)
1263    }
1264
1265    pub fn put_with_error<Out, E, H>(
1266        self,
1267        path: impl Into<String>,
1268        circuit: Axon<(), Out, E, R>,
1269        error_handler: H,
1270    ) -> Self
1271    where
1272        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1273        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1274        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1275    {
1276        self.route_method_with_error(Method::PUT, path, circuit, error_handler)
1277    }
1278
1279    pub fn delete_with_error<Out, E, H>(
1280        self,
1281        path: impl Into<String>,
1282        circuit: Axon<(), Out, E, R>,
1283        error_handler: H,
1284    ) -> Self
1285    where
1286        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1287        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1288        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1289    {
1290        self.route_method_with_error(Method::DELETE, path, circuit, error_handler)
1291    }
1292
1293    pub fn patch_with_error<Out, E, H>(
1294        self,
1295        path: impl Into<String>,
1296        circuit: Axon<(), Out, E, R>,
1297        error_handler: H,
1298    ) -> Self
1299    where
1300        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1301        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1302        H: Fn(&E) -> HttpResponse + Send + Sync + 'static,
1303    {
1304        self.route_method_with_error(Method::PATCH, path, circuit, error_handler)
1305    }
1306
1307    /// Set a fallback circuit for unmatched routes.
1308    ///
1309    /// # Example
1310    ///
1311    /// ```rust,ignore
1312    /// let not_found = Axon::new("NotFound").then(|_| async { "404 Not Found" });
1313    /// Ranvier::http()
1314    ///     .route("/", home)
1315    ///     .fallback(not_found)
1316    /// ```
1317    pub fn fallback<Out, E>(mut self, circuit: Axon<(), Out, E, R>) -> Self
1318    where
1319        Out: IntoResponse + Send + Sync + serde::Serialize + serde::de::DeserializeOwned + 'static,
1320        E: Send + Sync + serde::Serialize + serde::de::DeserializeOwned + std::fmt::Debug + 'static,
1321    {
1322        let circuit = Arc::new(circuit);
1323        let fallback_bus_injectors = Arc::new(self.bus_injectors.clone());
1324
1325        let handler: RouteHandler<R> = Arc::new(move |parts: http::request::Parts, res: &R| {
1326            let circuit = circuit.clone();
1327            let fallback_bus_injectors = fallback_bus_injectors.clone();
1328            let res = res.clone();
1329            Box::pin(async move {
1330                let request_id = uuid::Uuid::new_v4().to_string();
1331                let span = tracing::info_span!(
1332                    "HTTPRequest",
1333                    ranvier.http.method = "FALLBACK",
1334                    ranvier.http.request_id = %request_id
1335                );
1336
1337                async move {
1338                    let mut bus = Bus::new();
1339                    for injector in fallback_bus_injectors.iter() {
1340                        injector(&parts, &mut bus);
1341                    }
1342                    let result: ranvier_core::Outcome<Out, E> =
1343                        circuit.execute((), &res, &mut bus).await;
1344
1345                    match result {
1346                        Outcome::Next(output) => {
1347                            let mut response = output.into_response();
1348                            *response.status_mut() = StatusCode::NOT_FOUND;
1349                            response
1350                        }
1351                        _ => Response::builder()
1352                            .status(StatusCode::NOT_FOUND)
1353                            .body(
1354                                Full::new(Bytes::from("Not Found"))
1355                                    .map_err(|never| match never {})
1356                                    .boxed(),
1357                            )
1358                            .expect("valid HTTP response construction"),
1359                    }
1360                }
1361                .instrument(span)
1362                .await
1363            }) as Pin<Box<dyn Future<Output = HttpResponse> + Send>>
1364        });
1365
1366        self.fallback = Some(handler);
1367        self
1368    }
1369
1370    /// Run the HTTP server with required resources.
1371    pub async fn run(self, resources: R) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1372        self.run_with_shutdown_signal(resources, shutdown_signal())
1373            .await
1374    }
1375
1376    async fn run_with_shutdown_signal<S>(
1377        self,
1378        resources: R,
1379        shutdown_signal: S,
1380    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
1381    where
1382        S: Future<Output = ()> + Send,
1383    {
1384        let addr_str = self.addr.as_deref().unwrap_or("127.0.0.1:3000");
1385        let addr: SocketAddr = addr_str.parse()?;
1386
1387        let mut raw_routes = self.routes;
1388        if self.active_intervention {
1389            let handler: RouteHandler<R> = Arc::new(|_parts, _res| {
1390                Box::pin(async move {
1391                    Response::builder()
1392                        .status(StatusCode::OK)
1393                        .body(
1394                            Full::new(Bytes::from("Intervention accepted"))
1395                                .map_err(|never| match never {} as Infallible)
1396                                .boxed(),
1397                        )
1398                        .expect("valid HTTP response construction")
1399                }) as Pin<Box<dyn Future<Output = HttpResponse> + Send>>
1400            });
1401
1402            raw_routes.push(RouteEntry {
1403                method: Method::POST,
1404                pattern: RoutePattern::parse("/_system/intervene/force_resume"),
1405                handler,
1406                layers: Arc::new(Vec::new()),
1407                apply_global_layers: true,
1408            });
1409        }
1410
1411        if let Some(registry) = self.policy_registry.clone() {
1412            let handler: RouteHandler<R> = Arc::new(move |_parts, _res| {
1413                let _registry = registry.clone();
1414                Box::pin(async move {
1415                    // This is a simplified reload endpoint.
1416                    // In a real implementation, it would parse JSON from the body.
1417                    // For now, we provide the infrastructure.
1418                    Response::builder()
1419                        .status(StatusCode::OK)
1420                        .body(
1421                            Full::new(Bytes::from("Policy registry active"))
1422                                .map_err(|never| match never {} as Infallible)
1423                                .boxed(),
1424                        )
1425                        .expect("valid HTTP response construction")
1426                }) as Pin<Box<dyn Future<Output = HttpResponse> + Send>>
1427            });
1428
1429            raw_routes.push(RouteEntry {
1430                method: Method::POST,
1431                pattern: RoutePattern::parse("/_system/policy/reload"),
1432                handler,
1433                layers: Arc::new(Vec::new()),
1434                apply_global_layers: true,
1435            });
1436        }
1437        let routes = Arc::new(raw_routes);
1438        let fallback = self.fallback;
1439        let layers = Arc::new(self.layers);
1440        let health = Arc::new(self.health);
1441        let static_assets = Arc::new(self.static_assets);
1442        let on_start = self.on_start;
1443        let on_shutdown = self.on_shutdown;
1444        let graceful_shutdown_timeout = self.graceful_shutdown_timeout;
1445        let resources = Arc::new(resources);
1446
1447        let listener = TcpListener::bind(addr).await?;
1448
1449        // Build optional TLS acceptor
1450        #[cfg(feature = "tls")]
1451        let tls_acceptor = if let Some(ref tls_cfg) = self.tls_config {
1452            let acceptor = build_tls_acceptor(&tls_cfg.cert_path, &tls_cfg.key_path)?;
1453            tracing::info!("Ranvier HTTP Ingress listening on https://{}", addr);
1454            Some(acceptor)
1455        } else {
1456            tracing::info!("Ranvier HTTP Ingress listening on http://{}", addr);
1457            None
1458        };
1459        #[cfg(not(feature = "tls"))]
1460        tracing::info!("Ranvier HTTP Ingress listening on http://{}", addr);
1461
1462        if let Some(callback) = on_start.as_ref() {
1463            callback();
1464        }
1465
1466        tokio::pin!(shutdown_signal);
1467        let mut connections = tokio::task::JoinSet::new();
1468
1469        loop {
1470            tokio::select! {
1471                _ = &mut shutdown_signal => {
1472                    tracing::info!("Shutdown signal received. Draining in-flight connections.");
1473                    break;
1474                }
1475                accept_result = listener.accept() => {
1476                    let (stream, _) = accept_result?;
1477
1478                    let routes = routes.clone();
1479                    let fallback = fallback.clone();
1480                    let resources = resources.clone();
1481                    let layers = layers.clone();
1482                    let health = health.clone();
1483                    let static_assets = static_assets.clone();
1484                    #[cfg(feature = "http3")]
1485                    let alt_svc_h3_port = self.alt_svc_h3_port;
1486
1487                    #[cfg(feature = "tls")]
1488                    let tls_acceptor = tls_acceptor.clone();
1489
1490                    connections.spawn(async move {
1491                        let service = build_http_service(
1492                            routes,
1493                            fallback,
1494                            resources,
1495                            layers,
1496                            health,
1497                            static_assets,
1498                            #[cfg(feature = "http3")] alt_svc_h3_port,
1499                        );
1500
1501                        #[cfg(feature = "tls")]
1502                        if let Some(acceptor) = tls_acceptor {
1503                            match acceptor.accept(stream).await {
1504                                Ok(tls_stream) => {
1505                                    let io = TokioIo::new(tls_stream);
1506                                    if let Err(err) = http1::Builder::new()
1507                                        .serve_connection(io, service)
1508                                        .with_upgrades()
1509                                        .await
1510                                    {
1511                                        tracing::error!("Error serving TLS connection: {:?}", err);
1512                                    }
1513                                }
1514                                Err(err) => {
1515                                    tracing::warn!("TLS handshake failed: {:?}", err);
1516                                }
1517                            }
1518                            return;
1519                        }
1520
1521                        let io = TokioIo::new(stream);
1522                        if let Err(err) = http1::Builder::new()
1523                            .serve_connection(io, service)
1524                            .with_upgrades()
1525                            .await
1526                        {
1527                            tracing::error!("Error serving connection: {:?}", err);
1528                        }
1529                    });
1530                }
1531                Some(join_result) = connections.join_next(), if !connections.is_empty() => {
1532                    if let Err(err) = join_result {
1533                        tracing::warn!("Connection task join error: {:?}", err);
1534                    }
1535                }
1536            }
1537        }
1538
1539        let _timed_out = drain_connections(&mut connections, graceful_shutdown_timeout).await;
1540
1541        drop(resources);
1542        if let Some(callback) = on_shutdown.as_ref() {
1543            callback();
1544        }
1545
1546        Ok(())
1547    }
1548
1549    /// Convert to a raw Hyper Service for integration with existing infrastructure.
1550    ///
1551    /// This is the "escape hatch" per Discussion 193:
1552    /// > "Raw API는 Flat API의 탈출구다."
1553    ///
1554    /// # Example
1555    ///
1556    /// ```rust,ignore
1557    /// let ingress = Ranvier::http()
1558    ///     .bind(":3000")
1559    ///     .route("/", circuit);
1560    ///
1561    /// let raw_service = ingress.into_raw_service();
1562    /// // Use raw_service with existing Hyper infrastructure
1563    /// ```
1564    pub fn into_raw_service(self, resources: R) -> RawIngressService<R> {
1565        let routes = Arc::new(self.routes);
1566        let fallback = self.fallback;
1567        let layers = Arc::new(self.layers);
1568        let health = Arc::new(self.health);
1569        let static_assets = Arc::new(self.static_assets);
1570        let resources = Arc::new(resources);
1571
1572        RawIngressService {
1573            routes,
1574            fallback,
1575            layers,
1576            health,
1577            static_assets,
1578            resources,
1579        }
1580    }
1581}
1582
/// Build the per-connection HTTP service that dispatches each request.
///
/// Dispatch order for a request:
/// 1. Health probes (`maybe_handle_health_request`); a hit is returned immediately.
/// 2. The first matching registered route, run directly or through its
///    effective layer stack.
/// 3. Static assets (`maybe_handle_static_request`); a produced response is
///    returned immediately.
/// 4. The fallback handler if one is configured, otherwise a plain
///    `404 Not Found`.
///
/// With the `http3` feature, responses from steps 2 and 4 get an `Alt-Svc`
/// header advertising `alt_svc_port` when it is `Some`; the early returns for
/// health and static responses do not add it.
fn build_http_service<R>(
    routes: Arc<Vec<RouteEntry<R>>>,
    fallback: Option<RouteHandler<R>>,
    resources: Arc<R>,
    layers: Arc<Vec<ServiceLayer>>,
    health: Arc<HealthConfig<R>>,
    static_assets: Arc<StaticAssetsConfig>,
    #[cfg(feature = "http3")] alt_svc_port: Option<u16>,
) -> BoxHttpService
where
    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
{
    BoxService::new(move |req: Request<Incoming>| {
        // Clone the shared handles so the per-request future owns its own copies.
        let routes = routes.clone();
        let fallback = fallback.clone();
        let resources = resources.clone();
        let layers = layers.clone();
        let health = health.clone();
        let static_assets = static_assets.clone();

        async move {
            let mut req = req;
            let method = req.method().clone();
            let path = req.uri().path().to_string();

            // Health probes take precedence over user routes.
            if let Some(response) =
                maybe_handle_health_request(&method, &path, &health, resources.clone()).await
            {
                return Ok::<_, Infallible>(response.into_response());
            }

            if let Some((entry, params)) = find_matching_route(routes.as_slice(), &method, &path) {
                // Expose the matched path parameters through the request extensions.
                req.extensions_mut().insert(params);
                // Routes either combine global and route layers, or use only their own.
                let effective_layers = if entry.apply_global_layers {
                    merge_layers(&layers, &entry.layers)
                } else {
                    entry.layers.clone()
                };

                if effective_layers.is_empty() {
                    // No layers: call the handler directly. Only the request
                    // parts are passed on; the body is dropped here.
                    let (parts, _) = req.into_parts();
                    #[allow(unused_mut)]
                    let mut res = (entry.handler)(parts, &resources).await;
                    #[cfg(feature = "http3")]
                    if let Some(port) = alt_svc_port {
                        if let Ok(val) =
                            http::HeaderValue::from_str(&format!("h3=\":{}\"; ma=86400", port))
                        {
                            res.headers_mut().insert(http::header::ALT_SVC, val);
                        }
                    }
                    Ok::<_, Infallible>(res)
                } else {
                    // Layers present: wrap the handler in a layered service and
                    // pass the whole request through it.
                    let route_service = build_route_service(
                        entry.handler.clone(),
                        resources.clone(),
                        effective_layers,
                    );
                    #[allow(unused_mut)]
                    let mut res = route_service.call(req).await;
                    #[cfg(feature = "http3")]
                    #[allow(irrefutable_let_patterns)]
                    if let Ok(ref mut r) = res {
                        if let Some(port) = alt_svc_port {
                            if let Ok(val) =
                                http::HeaderValue::from_str(&format!("h3=\":{}\"; ma=86400", port))
                            {
                                r.headers_mut().insert(http::header::ALT_SVC, val);
                            }
                        }
                    }
                    res
                }
            } else {
                // No route matched. The static handler either returns the
                // request (`Ok`) so dispatch continues, or a finished response (`Err`).
                let req =
                    match maybe_handle_static_request(req, &method, &path, static_assets.as_ref())
                        .await
                    {
                        Ok(req) => req,
                        Err(response) => return Ok(response),
                    };

                #[allow(unused_mut)]
                let mut fallback_res = if let Some(ref fb) = fallback {
                    // The fallback is only ever wrapped in the global layers.
                    if layers.is_empty() {
                        let (parts, _) = req.into_parts();
                        Ok(fb(parts, &resources).await)
                    } else {
                        let fallback_service =
                            build_route_service(fb.clone(), resources.clone(), layers.clone());
                        fallback_service.call(req).await
                    }
                } else {
                    // No fallback configured: built-in plain-text 404.
                    Ok(Response::builder()
                        .status(StatusCode::NOT_FOUND)
                        .body(
                            Full::new(Bytes::from("Not Found"))
                                .map_err(|never| match never {})
                                .boxed(),
                        )
                        .expect("valid HTTP response construction"))
                };

                #[cfg(feature = "http3")]
                if let Ok(r) = fallback_res.as_mut() {
                    if let Some(port) = alt_svc_port {
                        if let Ok(val) =
                            http::HeaderValue::from_str(&format!("h3=\":{}\"; ma=86400", port))
                        {
                            r.headers_mut().insert(http::header::ALT_SVC, val);
                        }
                    }
                }

                fallback_res
            }
        }
    })
}
1702
1703fn build_route_service<R>(
1704    handler: RouteHandler<R>,
1705    resources: Arc<R>,
1706    layers: Arc<Vec<ServiceLayer>>,
1707) -> BoxHttpService
1708where
1709    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
1710{
1711    let mut service = BoxService::new(move |req: Request<Incoming>| {
1712        let handler = handler.clone();
1713        let resources = resources.clone();
1714        async move {
1715            let (parts, _) = req.into_parts();
1716            Ok::<_, Infallible>(handler(parts, &resources).await)
1717        }
1718    });
1719
1720    for layer in layers.iter() {
1721        service = layer(service);
1722    }
1723    service
1724}
1725
1726fn merge_layers(
1727    global_layers: &Arc<Vec<ServiceLayer>>,
1728    route_layers: &Arc<Vec<ServiceLayer>>,
1729) -> Arc<Vec<ServiceLayer>> {
1730    if global_layers.is_empty() {
1731        return route_layers.clone();
1732    }
1733    if route_layers.is_empty() {
1734        return global_layers.clone();
1735    }
1736
1737    let mut combined = Vec::with_capacity(global_layers.len() + route_layers.len());
1738    combined.extend(global_layers.iter().cloned());
1739    combined.extend(route_layers.iter().cloned());
1740    Arc::new(combined)
1741}
1742
1743async fn maybe_handle_health_request<R>(
1744    method: &Method,
1745    path: &str,
1746    health: &HealthConfig<R>,
1747    resources: Arc<R>,
1748) -> Option<HttpResponse>
1749where
1750    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
1751{
1752    if method != Method::GET {
1753        return None;
1754    }
1755
1756    if let Some(liveness_path) = health.liveness_path.as_ref() {
1757        if path == liveness_path {
1758            return Some(health_json_response("liveness", true, Vec::new()));
1759        }
1760    }
1761
1762    if let Some(readiness_path) = health.readiness_path.as_ref() {
1763        if path == readiness_path {
1764            let (healthy, checks) = run_named_health_checks(&health.checks, resources).await;
1765            return Some(health_json_response("readiness", healthy, checks));
1766        }
1767    }
1768
1769    if let Some(health_path) = health.health_path.as_ref() {
1770        if path == health_path {
1771            let (healthy, checks) = run_named_health_checks(&health.checks, resources).await;
1772            return Some(health_json_response("health", healthy, checks));
1773        }
1774    }
1775
1776    None
1777}
1778
1779/// Serve a single file from the filesystem with MIME type detection and ETag.
1780async fn serve_single_file(file_path: &str) -> Result<Response<Full<Bytes>>, std::io::Error> {
1781    let path = std::path::Path::new(file_path);
1782    let content = tokio::fs::read(path).await?;
1783    let mime = guess_mime(file_path);
1784    let mut response = Response::new(Full::new(Bytes::from(content)));
1785    if let Ok(value) = http::HeaderValue::from_str(mime) {
1786        response
1787            .headers_mut()
1788            .insert(http::header::CONTENT_TYPE, value);
1789    }
1790    if let Ok(metadata) = tokio::fs::metadata(path).await {
1791        if let Ok(modified) = metadata.modified() {
1792            if let Ok(duration) = modified.duration_since(std::time::UNIX_EPOCH) {
1793                let etag = format!("\"{}\"", duration.as_secs());
1794                if let Ok(value) = http::HeaderValue::from_str(&etag) {
1795                    response.headers_mut().insert(http::header::ETAG, value);
1796                }
1797            }
1798        }
1799    }
1800    Ok(response)
1801}
1802
1803/// Serve a file from a static directory with path traversal protection.
1804async fn serve_static_file(
1805    directory: &str,
1806    file_subpath: &str,
1807) -> Result<Response<Full<Bytes>>, std::io::Error> {
1808    let subpath = file_subpath.trim_start_matches('/');
1809    if subpath.is_empty() || subpath == "/" {
1810        return Err(std::io::Error::new(
1811            std::io::ErrorKind::NotFound,
1812            "empty path",
1813        ));
1814    }
1815    let full_path = std::path::Path::new(directory).join(subpath);
1816    // Path traversal protection
1817    let canonical = tokio::fs::canonicalize(&full_path).await?;
1818    let dir_canonical = tokio::fs::canonicalize(directory).await?;
1819    if !canonical.starts_with(&dir_canonical) {
1820        return Err(std::io::Error::new(
1821            std::io::ErrorKind::PermissionDenied,
1822            "path traversal detected",
1823        ));
1824    }
1825    let content = tokio::fs::read(&canonical).await?;
1826    let mime = guess_mime(canonical.to_str().unwrap_or(""));
1827    let mut response = Response::new(Full::new(Bytes::from(content)));
1828    if let Ok(value) = http::HeaderValue::from_str(mime) {
1829        response
1830            .headers_mut()
1831            .insert(http::header::CONTENT_TYPE, value);
1832    }
1833    if let Ok(metadata) = tokio::fs::metadata(&canonical).await {
1834        if let Ok(modified) = metadata.modified() {
1835            if let Ok(duration) = modified.duration_since(std::time::UNIX_EPOCH) {
1836                let etag = format!("\"{}\"", duration.as_secs());
1837                if let Ok(value) = http::HeaderValue::from_str(&etag) {
1838                    response.headers_mut().insert(http::header::ETAG, value);
1839                }
1840            }
1841        }
1842    }
1843    Ok(response)
1844}
1845
/// Guess a MIME type from the extension of the final path component.
///
/// The extension is whatever follows the last `.` in the file name (the text
/// after the last `/` or `\`), compared case-insensitively. A file name without
/// any `.` has no extension, so a file literally named `html` is not mistaken
/// for an HTML document. Unknown or missing extensions yield
/// `application/octet-stream`.
fn guess_mime(path: &str) -> &'static str {
    const FALLBACK: &str = "application/octet-stream";

    // Look only at the file name so dots in directory names are ignored.
    let file_name = path.rsplit(['/', '\\']).next().unwrap_or(path);
    let Some((_, extension)) = file_name.rsplit_once('.') else {
        return FALLBACK;
    };

    match extension.to_ascii_lowercase().as_str() {
        "html" | "htm" => "text/html; charset=utf-8",
        "css" => "text/css; charset=utf-8",
        "js" | "mjs" => "application/javascript; charset=utf-8",
        "json" => "application/json; charset=utf-8",
        "png" => "image/png",
        "jpg" | "jpeg" => "image/jpeg",
        "gif" => "image/gif",
        "svg" => "image/svg+xml",
        "ico" => "image/x-icon",
        "woff" => "font/woff",
        "woff2" => "font/woff2",
        "ttf" => "font/ttf",
        "txt" => "text/plain; charset=utf-8",
        "xml" => "application/xml; charset=utf-8",
        "wasm" => "application/wasm",
        "pdf" => "application/pdf",
        _ => FALLBACK,
    }
}
1867
1868fn apply_cache_control(
1869    mut response: Response<Full<Bytes>>,
1870    cache_control: Option<&str>,
1871) -> Response<Full<Bytes>> {
1872    if response.status() == StatusCode::OK {
1873        if let Some(value) = cache_control {
1874            if !response.headers().contains_key(http::header::CACHE_CONTROL) {
1875                if let Ok(header_value) = http::HeaderValue::from_str(value) {
1876                    response
1877                        .headers_mut()
1878                        .insert(http::header::CACHE_CONTROL, header_value);
1879                }
1880            }
1881        }
1882    }
1883    response
1884}
1885
1886async fn maybe_handle_static_request(
1887    req: Request<Incoming>,
1888    method: &Method,
1889    path: &str,
1890    static_assets: &StaticAssetsConfig,
1891) -> Result<Request<Incoming>, HttpResponse> {
1892    if method != Method::GET && method != Method::HEAD {
1893        return Ok(req);
1894    }
1895
1896    if let Some(mount) = static_assets
1897        .mounts
1898        .iter()
1899        .find(|mount| strip_mount_prefix(path, &mount.route_prefix).is_some())
1900    {
1901        let accept_encoding = req.headers().get(http::header::ACCEPT_ENCODING).cloned();
1902        let Some(stripped_path) = strip_mount_prefix(path, &mount.route_prefix) else {
1903            return Ok(req);
1904        };
1905        let response = match serve_static_file(&mount.directory, &stripped_path).await {
1906            Ok(response) => response,
1907            Err(_) => {
1908                return Err(Response::builder()
1909                    .status(StatusCode::INTERNAL_SERVER_ERROR)
1910                    .body(
1911                        Full::new(Bytes::from("Failed to serve static asset"))
1912                            .map_err(|never| match never {})
1913                            .boxed(),
1914                    )
1915                    .unwrap_or_else(|_| {
1916                        Response::new(
1917                            Full::new(Bytes::new())
1918                                .map_err(|never| match never {})
1919                                .boxed(),
1920                        )
1921                    }));
1922            }
1923        };
1924        let mut response = apply_cache_control(response, static_assets.cache_control.as_deref());
1925        response = maybe_compress_static_response(
1926            response,
1927            accept_encoding,
1928            static_assets.enable_compression,
1929        );
1930        let (parts, body) = response.into_parts();
1931        return Err(Response::from_parts(
1932            parts,
1933            body.map_err(|never| match never {}).boxed(),
1934        ));
1935    }
1936
1937    if let Some(spa_file) = static_assets.spa_fallback.as_ref() {
1938        if looks_like_spa_request(path) {
1939            let accept_encoding = req.headers().get(http::header::ACCEPT_ENCODING).cloned();
1940            let response = match serve_single_file(spa_file).await {
1941                Ok(response) => response,
1942                Err(_) => {
1943                    return Err(Response::builder()
1944                        .status(StatusCode::INTERNAL_SERVER_ERROR)
1945                        .body(
1946                            Full::new(Bytes::from("Failed to serve SPA fallback"))
1947                                .map_err(|never| match never {})
1948                                .boxed(),
1949                        )
1950                        .unwrap_or_else(|_| {
1951                            Response::new(
1952                                Full::new(Bytes::new())
1953                                    .map_err(|never| match never {})
1954                                    .boxed(),
1955                            )
1956                        }));
1957                }
1958            };
1959            let mut response =
1960                apply_cache_control(response, static_assets.cache_control.as_deref());
1961            response = maybe_compress_static_response(
1962                response,
1963                accept_encoding,
1964                static_assets.enable_compression,
1965            );
1966            let (parts, body) = response.into_parts();
1967            return Err(Response::from_parts(
1968                parts,
1969                body.map_err(|never| match never {}).boxed(),
1970            ));
1971        }
1972    }
1973
1974    Ok(req)
1975}
1976
/// Remove a mount prefix from a request path.
///
/// Returns the remainder with a leading `/`, or `None` when `path` is not under
/// `prefix`. The root prefix `/` matches everything and returns the path
/// unchanged. Trailing slashes on `prefix` are ignored, and the match must end
/// on a segment boundary (`/static` does not match `/staticfoo`).
fn strip_mount_prefix(path: &str, prefix: &str) -> Option<String> {
    if prefix == "/" {
        return Some(path.to_owned());
    }

    let mount = prefix.trim_end_matches('/');
    let remainder = path.strip_prefix(mount)?;

    if remainder.is_empty() {
        // The request targets the mount point itself.
        Some("/".to_owned())
    } else if remainder.starts_with('/') {
        Some(remainder.to_owned())
    } else {
        None
    }
}
1996
/// Report whether a path looks like a client-side route rather than a file.
///
/// A path qualifies when its final segment (the text after the last `/`)
/// contains no `.`.
fn looks_like_spa_request(path: &str) -> bool {
    let last_segment = match path.rfind('/') {
        Some(slash) => &path[slash + 1..],
        None => path,
    };
    !last_segment.contains('.')
}
2001
2002fn maybe_compress_static_response(
2003    response: Response<Full<Bytes>>,
2004    accept_encoding: Option<http::HeaderValue>,
2005    enable_compression: bool,
2006) -> Response<Full<Bytes>> {
2007    if !enable_compression {
2008        return response;
2009    }
2010
2011    let Some(accept_encoding) = accept_encoding else {
2012        return response;
2013    };
2014
2015    let accept_str = accept_encoding.to_str().unwrap_or("");
2016    if !accept_str.contains("gzip") {
2017        return response;
2018    }
2019
2020    let status = response.status();
2021    let headers = response.headers().clone();
2022    let body = response.into_body();
2023
2024    // Full<Bytes> resolves immediately — collect synchronously via now_or_never()
2025    let data = futures_util::FutureExt::now_or_never(BodyExt::collect(body))
2026        .and_then(|r| r.ok())
2027        .map(|collected| collected.to_bytes())
2028        .unwrap_or_default();
2029
2030    // Compress with gzip
2031    let compressed = {
2032        use flate2::write::GzEncoder;
2033        use flate2::Compression;
2034        use std::io::Write;
2035        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
2036        let _ = encoder.write_all(&data);
2037        encoder.finish().unwrap_or_default()
2038    };
2039
2040    let mut builder = Response::builder().status(status);
2041    for (name, value) in headers.iter() {
2042        if name != http::header::CONTENT_LENGTH && name != http::header::CONTENT_ENCODING {
2043            builder = builder.header(name, value);
2044        }
2045    }
2046    builder
2047        .header(http::header::CONTENT_ENCODING, "gzip")
2048        .body(Full::new(Bytes::from(compressed)))
2049        .unwrap_or_else(|_| Response::new(Full::new(Bytes::new())))
2050}
2051
2052async fn run_named_health_checks<R>(
2053    checks: &[NamedHealthCheck<R>],
2054    resources: Arc<R>,
2055) -> (bool, Vec<HealthCheckReport>)
2056where
2057    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
2058{
2059    let mut reports = Vec::with_capacity(checks.len());
2060    let mut healthy = true;
2061
2062    for check in checks {
2063        match (check.check)(resources.clone()).await {
2064            Ok(()) => reports.push(HealthCheckReport {
2065                name: check.name.clone(),
2066                status: "ok",
2067                error: None,
2068            }),
2069            Err(error) => {
2070                healthy = false;
2071                reports.push(HealthCheckReport {
2072                    name: check.name.clone(),
2073                    status: "error",
2074                    error: Some(error),
2075                });
2076            }
2077        }
2078    }
2079
2080    (healthy, reports)
2081}
2082
2083fn health_json_response(
2084    probe: &'static str,
2085    healthy: bool,
2086    checks: Vec<HealthCheckReport>,
2087) -> HttpResponse {
2088    let status_code = if healthy {
2089        StatusCode::OK
2090    } else {
2091        StatusCode::SERVICE_UNAVAILABLE
2092    };
2093    let status = if healthy { "ok" } else { "degraded" };
2094    let payload = HealthReport {
2095        status,
2096        probe,
2097        checks,
2098    };
2099
2100    let body = serde_json::to_vec(&payload)
2101        .unwrap_or_else(|_| br#"{"status":"error","probe":"health"}"#.to_vec());
2102
2103    Response::builder()
2104        .status(status_code)
2105        .header(http::header::CONTENT_TYPE, "application/json")
2106        .body(
2107            Full::new(Bytes::from(body))
2108                .map_err(|never| match never {})
2109                .boxed(),
2110        )
2111        .expect("valid HTTP response construction")
2112}
2113
2114async fn shutdown_signal() {
2115    #[cfg(unix)]
2116    {
2117        use tokio::signal::unix::{SignalKind, signal};
2118
2119        match signal(SignalKind::terminate()) {
2120            Ok(mut terminate) => {
2121                tokio::select! {
2122                    _ = tokio::signal::ctrl_c() => {}
2123                    _ = terminate.recv() => {}
2124                }
2125            }
2126            Err(err) => {
2127                tracing::warn!("Failed to install SIGTERM handler: {:?}", err);
2128                if let Err(ctrl_c_err) = tokio::signal::ctrl_c().await {
2129                    tracing::warn!("Failed to listen for Ctrl+C: {:?}", ctrl_c_err);
2130                }
2131            }
2132        }
2133    }
2134
2135    #[cfg(not(unix))]
2136    {
2137        if let Err(err) = tokio::signal::ctrl_c().await {
2138            tracing::warn!("Failed to listen for Ctrl+C: {:?}", err);
2139        }
2140    }
2141}
2142
2143async fn drain_connections(
2144    connections: &mut tokio::task::JoinSet<()>,
2145    graceful_shutdown_timeout: Duration,
2146) -> bool {
2147    if connections.is_empty() {
2148        return false;
2149    }
2150
2151    let drain_result = tokio::time::timeout(graceful_shutdown_timeout, async {
2152        while let Some(join_result) = connections.join_next().await {
2153            if let Err(err) = join_result {
2154                tracing::warn!("Connection task join error during shutdown: {:?}", err);
2155            }
2156        }
2157    })
2158    .await;
2159
2160    if drain_result.is_err() {
2161        tracing::warn!(
2162            "Graceful shutdown timeout reached ({:?}). Aborting remaining connections.",
2163            graceful_shutdown_timeout
2164        );
2165        connections.abort_all();
2166        while let Some(join_result) = connections.join_next().await {
2167            if let Err(err) = join_result {
2168                tracing::warn!("Connection task abort join error: {:?}", err);
2169            }
2170        }
2171        true
2172    } else {
2173        false
2174    }
2175}
2176
/// Create a TLS acceptor from a PEM certificate chain and a PEM private key.
///
/// Both files are opened before either is parsed. The resulting server
/// configuration performs no client authentication.
///
/// # Errors
///
/// Returns a descriptive error when a file cannot be opened, the PEM data
/// cannot be parsed, the key file contains no private key, or the TLS
/// configuration rejects the certificate/key pair.
#[cfg(feature = "tls")]
fn build_tls_acceptor(
    cert_path: &str,
    key_path: &str,
) -> Result<tokio_rustls::TlsAcceptor, Box<dyn std::error::Error + Send + Sync>> {
    use rustls::ServerConfig;
    use rustls_pemfile::{certs, private_key};
    use std::io::BufReader;
    use tokio_rustls::TlsAcceptor;

    let mut cert_reader = BufReader::new(
        std::fs::File::open(cert_path)
            .map_err(|e| format!("Failed to open certificate file '{}': {}", cert_path, e))?,
    );
    let mut key_reader = BufReader::new(
        std::fs::File::open(key_path)
            .map_err(|e| format!("Failed to open key file '{}': {}", key_path, e))?,
    );

    let chain: Vec<_> = certs(&mut cert_reader)
        .collect::<Result<Vec<_>, _>>()
        .map_err(|e| format!("Failed to parse certificate PEM: {}", e))?;

    let Some(signing_key) = private_key(&mut key_reader)
        .map_err(|e| format!("Failed to parse private key PEM: {}", e))?
    else {
        return Err("No private key found in key file".into());
    };

    let server_config = ServerConfig::builder()
        .with_no_client_auth()
        .with_single_cert(chain, signing_key)
        .map_err(|e| format!("TLS configuration error: {}", e))?;

    Ok(TlsAcceptor::from(Arc::new(server_config)))
}
2208
2209impl<R> Default for HttpIngress<R>
2210where
2211    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
2212{
2213    fn default() -> Self {
2214        Self::new()
2215    }
2216}
2217
/// Internal service type for `into_raw_service()`
///
/// Holds shared handles to the ingress configuration. Its `Service`
/// implementation passes these fields to `build_http_service` for every
/// request.
#[derive(Clone)]
pub struct RawIngressService<R> {
    /// Registered route entries.
    routes: Arc<Vec<RouteEntry<R>>>,
    /// Optional handler for requests that match no route.
    fallback: Option<RouteHandler<R>>,
    /// Service layers handed to `build_http_service` alongside the routes.
    layers: Arc<Vec<ServiceLayer>>,
    /// Health endpoint configuration.
    health: Arc<HealthConfig<R>>,
    /// Static asset configuration.
    static_assets: Arc<StaticAssetsConfig>,
    /// Shared resources passed to handlers.
    resources: Arc<R>,
}
2228
2229impl<R> hyper::service::Service<Request<Incoming>> for RawIngressService<R>
2230where
2231    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
2232{
2233    type Response = HttpResponse;
2234    type Error = Infallible;
2235    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
2236
2237    fn call(&self, req: Request<Incoming>) -> Self::Future {
2238        let routes = self.routes.clone();
2239        let fallback = self.fallback.clone();
2240        let layers = self.layers.clone();
2241        let health = self.health.clone();
2242        let static_assets = self.static_assets.clone();
2243        let resources = self.resources.clone();
2244
2245        Box::pin(async move {
2246            let service = build_http_service(
2247                routes,
2248                fallback,
2249                resources,
2250                layers,
2251                health,
2252                static_assets,
2253                #[cfg(feature = "http3")]
2254                None,
2255            );
2256            service.call(req).await
2257        })
2258    }
2259}
2260
2261#[cfg(test)]
2262mod tests {
2263    use super::*;
2264    use async_trait::async_trait;
2265    use futures_util::{SinkExt, StreamExt};
2266    use serde::Deserialize;
2267    use std::fs;
2268    use std::sync::atomic::{AtomicBool, Ordering};
2269    use tempfile::tempdir;
2270    use tokio::io::{AsyncReadExt, AsyncWriteExt};
2271    use tokio_tungstenite::tungstenite::Message as WsClientMessage;
2272    use tokio_tungstenite::tungstenite::client::IntoClientRequest;
2273
2274    async fn connect_with_retry(addr: std::net::SocketAddr) -> tokio::net::TcpStream {
2275        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
2276
2277        loop {
2278            match tokio::net::TcpStream::connect(addr).await {
2279                Ok(stream) => return stream,
2280                Err(error) => {
2281                    if tokio::time::Instant::now() >= deadline {
2282                        panic!("connect server: {error}");
2283                    }
2284                    tokio::time::sleep(Duration::from_millis(25)).await;
2285                }
2286            }
2287        }
2288    }
2289
2290    #[test]
2291    fn route_pattern_matches_static_path() {
2292        let pattern = RoutePattern::parse("/orders/list");
2293        let params = pattern.match_path("/orders/list").expect("should match");
2294        assert!(params.into_inner().is_empty());
2295    }
2296
2297    #[test]
2298    fn route_pattern_matches_param_segments() {
2299        let pattern = RoutePattern::parse("/orders/:id/items/:item_id");
2300        let params = pattern
2301            .match_path("/orders/42/items/sku-123")
2302            .expect("should match");
2303        assert_eq!(params.get("id"), Some("42"));
2304        assert_eq!(params.get("item_id"), Some("sku-123"));
2305    }
2306
2307    #[test]
2308    fn route_pattern_matches_wildcard_segment() {
2309        let pattern = RoutePattern::parse("/assets/*path");
2310        let params = pattern
2311            .match_path("/assets/css/theme/light.css")
2312            .expect("should match");
2313        assert_eq!(params.get("path"), Some("css/theme/light.css"));
2314    }
2315
2316    #[test]
2317    fn route_pattern_rejects_non_matching_path() {
2318        let pattern = RoutePattern::parse("/orders/:id");
2319        assert!(pattern.match_path("/users/42").is_none());
2320    }
2321
2322    #[test]
2323    fn graceful_shutdown_timeout_defaults_to_30_seconds() {
2324        let ingress = HttpIngress::<()>::new();
2325        assert_eq!(ingress.graceful_shutdown_timeout, Duration::from_secs(30));
2326        assert!(ingress.layers.is_empty());
2327        assert!(ingress.bus_injectors.is_empty());
2328        assert!(ingress.static_assets.mounts.is_empty());
2329        assert!(ingress.on_start.is_none());
2330        assert!(ingress.on_shutdown.is_none());
2331    }
2332
2333    #[test]
2334    fn route_without_layer_keeps_empty_route_middleware_stack() {
2335        let ingress =
2336            HttpIngress::<()>::new().get("/ping", Axon::<(), (), String, ()>::new("Ping"));
2337        assert_eq!(ingress.routes.len(), 1);
2338        assert!(ingress.routes[0].layers.is_empty());
2339        assert!(ingress.routes[0].apply_global_layers);
2340    }
2341
2342    #[test]
2343    fn timeout_layer_registers_builtin_middleware() {
2344        let ingress = HttpIngress::<()>::new().timeout_layer(Duration::from_secs(1));
2345        assert_eq!(ingress.layers.len(), 1);
2346    }
2347
2348    #[test]
2349    fn request_id_layer_registers_builtin_middleware() {
2350        let ingress = HttpIngress::<()>::new().request_id_layer();
2351        assert_eq!(ingress.layers.len(), 1);
2352    }
2353
2354    #[test]
2355    fn compression_layer_registers_builtin_middleware() {
2356        let ingress = HttpIngress::<()>::new().compression_layer();
2357        assert!(ingress.static_assets.enable_compression);
2358    }
2359
2360    #[test]
2361    fn bus_injector_registration_adds_hook() {
2362        let ingress = HttpIngress::<()>::new().bus_injector(|_req, bus| {
2363            bus.insert("ok".to_string());
2364        });
2365        assert_eq!(ingress.bus_injectors.len(), 1);
2366    }
2367
2368    #[test]
2369    fn ws_route_registers_get_route_pattern() {
2370        let ingress =
2371            HttpIngress::<()>::new().ws("/ws/events", |_socket, _resources, _bus| async {});
2372        assert_eq!(ingress.routes.len(), 1);
2373        assert_eq!(ingress.routes[0].method, Method::GET);
2374        assert_eq!(ingress.routes[0].pattern.raw, "/ws/events");
2375    }
2376
    /// Shape of the JSON welcome message sent by the echo websocket handler in
    /// the integration test that follows.
    #[derive(Debug, Deserialize)]
    struct WsWelcomeFrame {
        /// Connection identifier reported by the session context.
        connection_id: String,
        /// Request path reported by the session context.
        path: String,
        /// Tenant string read from the bus, or `"unknown"` when absent.
        tenant: String,
    }
2383
    /// End-to-end websocket test: starts a real server, upgrades a client
    /// connection, and checks the welcome frame plus text and binary echoes.
    ///
    /// Also verifies that a bus injector sees the upgrade request's headers
    /// (the `x-tenant-id` value shows up in the welcome frame).
    #[tokio::test]
    async fn ws_route_upgrades_and_bridges_event_source_sink_with_connection_bus() {
        // Bind to port 0 to obtain a free port, then release it for the server.
        let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("bind probe");
        let addr = probe.local_addr().expect("local addr");
        drop(probe);

        let ingress = HttpIngress::<()>::new()
            .bind(addr.to_string())
            // Copy the tenant header into the bus so the handler can read it.
            .bus_injector(|req, bus| {
                if let Some(value) = req.headers.get("x-tenant-id").and_then(|v| v.to_str().ok()) {
                    bus.insert(value.to_string());
                }
            })
            .ws("/ws/echo", |mut socket, _resources, bus| async move {
                let tenant = bus
                    .read::<String>()
                    .cloned()
                    .unwrap_or_else(|| "unknown".to_string());
                // Greet the client with connection metadata when a session context exists.
                if let Some(session) = bus.read::<WebSocketSessionContext>() {
                    let welcome = serde_json::json!({
                        "connection_id": session.connection_id().to_string(),
                        "path": session.path(),
                        "tenant": tenant,
                    });
                    let _ = socket.send_json(&welcome).await;
                }

                // Echo loop: text is prefixed with "echo:", binary is returned as-is.
                while let Some(event) = socket.next_event().await {
                    match event {
                        WebSocketEvent::Text(text) => {
                            let _ = socket.send_event(format!("echo:{text}")).await;
                        }
                        WebSocketEvent::Binary(bytes) => {
                            let _ = socket.send_event(bytes).await;
                        }
                        WebSocketEvent::Close => break,
                        WebSocketEvent::Ping(_) | WebSocketEvent::Pong(_) => {}
                    }
                }
            });

        // Run the server in the background until the oneshot channel fires.
        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
        let server = tokio::spawn(async move {
            ingress
                .run_with_shutdown_signal((), async move {
                    let _ = shutdown_rx.await;
                })
                .await
        });

        // Build the client handshake request, including the tenant header.
        let ws_uri = format!("ws://{addr}/ws/echo?room=alpha");
        let mut ws_request = ws_uri
            .as_str()
            .into_client_request()
            .expect("ws client request");
        ws_request
            .headers_mut()
            .insert("x-tenant-id", http::HeaderValue::from_static("acme"));
        let (mut client, _response) = tokio_tungstenite::connect_async(ws_request)
            .await
            .expect("websocket connect");

        // The first frame must be the JSON welcome message.
        let welcome = client
            .next()
            .await
            .expect("welcome frame")
            .expect("welcome frame ok");
        let welcome_text = match welcome {
            WsClientMessage::Text(text) => text.to_string(),
            other => panic!("expected text welcome frame, got {other:?}"),
        };
        let welcome_payload: WsWelcomeFrame =
            serde_json::from_str(&welcome_text).expect("welcome json");
        // The reported path excludes the query string.
        assert_eq!(welcome_payload.path, "/ws/echo");
        assert_eq!(welcome_payload.tenant, "acme");
        assert!(!welcome_payload.connection_id.is_empty());

        // Text round trip.
        client
            .send(WsClientMessage::Text("hello".into()))
            .await
            .expect("send text");
        let echo_text = client
            .next()
            .await
            .expect("echo text frame")
            .expect("echo text frame ok");
        assert_eq!(echo_text, WsClientMessage::Text("echo:hello".into()));

        // Binary round trip.
        client
            .send(WsClientMessage::Binary(vec![1, 2, 3, 4].into()))
            .await
            .expect("send binary");
        let echo_binary = client
            .next()
            .await
            .expect("echo binary frame")
            .expect("echo binary frame ok");
        assert_eq!(
            echo_binary,
            WsClientMessage::Binary(vec![1, 2, 3, 4].into())
        );

        client.close(None).await.expect("close websocket");

        // Stop the server and make sure it shut down without error.
        let _ = shutdown_tx.send(());
        server
            .await
            .expect("server join")
            .expect("server shutdown should succeed");
    }
2494
2495    #[test]
2496    fn route_descriptors_export_http_and_health_paths() {
2497        let ingress = HttpIngress::<()>::new()
2498            .get(
2499                "/orders/:id",
2500                Axon::<(), (), String, ()>::new("OrderById"),
2501            )
2502            .health_endpoint("/healthz")
2503            .readiness_liveness("/readyz", "/livez");
2504
2505        let descriptors = ingress.route_descriptors();
2506
2507        assert!(
2508            descriptors
2509                .iter()
2510                .any(|descriptor| descriptor.method() == Method::GET
2511                    && descriptor.path_pattern() == "/orders/:id")
2512        );
2513        assert!(
2514            descriptors
2515                .iter()
2516                .any(|descriptor| descriptor.method() == Method::GET
2517                    && descriptor.path_pattern() == "/healthz")
2518        );
2519        assert!(
2520            descriptors
2521                .iter()
2522                .any(|descriptor| descriptor.method() == Method::GET
2523                    && descriptor.path_pattern() == "/readyz")
2524        );
2525        assert!(
2526            descriptors
2527                .iter()
2528                .any(|descriptor| descriptor.method() == Method::GET
2529                    && descriptor.path_pattern() == "/livez")
2530        );
2531    }
2532
2533    #[tokio::test]
2534    async fn lifecycle_hooks_fire_on_start_and_shutdown() {
2535        let started = Arc::new(AtomicBool::new(false));
2536        let shutdown = Arc::new(AtomicBool::new(false));
2537
2538        let started_flag = started.clone();
2539        let shutdown_flag = shutdown.clone();
2540
2541        let ingress = HttpIngress::<()>::new()
2542            .bind("127.0.0.1:0")
2543            .on_start(move || {
2544                started_flag.store(true, Ordering::SeqCst);
2545            })
2546            .on_shutdown(move || {
2547                shutdown_flag.store(true, Ordering::SeqCst);
2548            })
2549            .graceful_shutdown(Duration::from_millis(50));
2550
2551        ingress
2552            .run_with_shutdown_signal((), async {
2553                tokio::time::sleep(Duration::from_millis(20)).await;
2554            })
2555            .await
2556            .expect("server should exit gracefully");
2557
2558        assert!(started.load(Ordering::SeqCst));
2559        assert!(shutdown.load(Ordering::SeqCst));
2560    }
2561
2562    #[tokio::test]
2563    async fn graceful_shutdown_drains_in_flight_requests_before_exit() {
2564        #[derive(Clone)]
2565        struct SlowDrainRoute;
2566
2567        #[async_trait]
2568        impl Transition<(), String> for SlowDrainRoute {
2569            type Error = String;
2570            type Resources = ();
2571
2572            async fn run(
2573                &self,
2574                _state: (),
2575                _resources: &Self::Resources,
2576                _bus: &mut Bus,
2577            ) -> Outcome<String, Self::Error> {
2578                tokio::time::sleep(Duration::from_millis(120)).await;
2579                Outcome::next("drained-ok".to_string())
2580            }
2581        }
2582
2583        let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("bind probe");
2584        let addr = probe.local_addr().expect("local addr");
2585        drop(probe);
2586
2587        let ingress = HttpIngress::<()>::new()
2588            .bind(addr.to_string())
2589            .graceful_shutdown(Duration::from_millis(500))
2590            .get(
2591                "/drain",
2592                Axon::<(), (), String, ()>::new("SlowDrain").then(SlowDrainRoute),
2593            );
2594
2595        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
2596        let server = tokio::spawn(async move {
2597            ingress
2598                .run_with_shutdown_signal((), async move {
2599                    let _ = shutdown_rx.await;
2600                })
2601                .await
2602        });
2603
2604        let mut stream = connect_with_retry(addr).await;
2605        stream
2606            .write_all(b"GET /drain HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
2607            .await
2608            .expect("write request");
2609
2610        tokio::time::sleep(Duration::from_millis(20)).await;
2611        let _ = shutdown_tx.send(());
2612
2613        let mut buf = Vec::new();
2614        stream.read_to_end(&mut buf).await.expect("read response");
2615        let response = String::from_utf8_lossy(&buf);
2616        assert!(response.starts_with("HTTP/1.1 200"), "{response}");
2617        assert!(response.contains("drained-ok"), "{response}");
2618
2619        server
2620            .await
2621            .expect("server join")
2622            .expect("server shutdown should succeed");
2623    }
2624
2625    #[tokio::test]
2626    async fn serve_dir_serves_static_file_with_cache_and_metadata_headers() {
2627        let temp = tempdir().expect("tempdir");
2628        let root = temp.path().join("public");
2629        fs::create_dir_all(&root).expect("create dir");
2630        let file = root.join("hello.txt");
2631        fs::write(&file, "hello static").expect("write file");
2632
2633        let ingress =
2634            Ranvier::http::<()>().serve_dir("/static", root.to_string_lossy().to_string());
2635        let app = crate::test_harness::TestApp::new(ingress, ());
2636        let response = app
2637            .send(crate::test_harness::TestRequest::get("/static/hello.txt"))
2638            .await
2639            .expect("request should succeed");
2640
2641        assert_eq!(response.status(), StatusCode::OK);
2642        assert_eq!(response.text().expect("utf8"), "hello static");
2643        assert!(response.header("cache-control").is_some());
2644        let has_metadata_header =
2645            response.header("etag").is_some() || response.header("last-modified").is_some();
2646        assert!(has_metadata_header);
2647    }
2648
2649    #[tokio::test]
2650    async fn spa_fallback_returns_index_for_unmatched_path() {
2651        let temp = tempdir().expect("tempdir");
2652        let index = temp.path().join("index.html");
2653        fs::write(&index, "<html><body>spa</body></html>").expect("write index");
2654
2655        let ingress = Ranvier::http::<()>().spa_fallback(index.to_string_lossy().to_string());
2656        let app = crate::test_harness::TestApp::new(ingress, ());
2657        let response = app
2658            .send(crate::test_harness::TestRequest::get("/dashboard/settings"))
2659            .await
2660            .expect("request should succeed");
2661
2662        assert_eq!(response.status(), StatusCode::OK);
2663        assert!(response.text().expect("utf8").contains("spa"));
2664    }
2665
2666    #[tokio::test]
2667    async fn static_compression_layer_sets_content_encoding_for_gzip_client() {
2668        let temp = tempdir().expect("tempdir");
2669        let root = temp.path().join("public");
2670        fs::create_dir_all(&root).expect("create dir");
2671        let file = root.join("compressed.txt");
2672        fs::write(&file, "compress me ".repeat(400)).expect("write file");
2673
2674        let ingress = Ranvier::http::<()>()
2675            .serve_dir("/static", root.to_string_lossy().to_string())
2676            .compression_layer();
2677        let app = crate::test_harness::TestApp::new(ingress, ());
2678        let response = app
2679            .send(
2680                crate::test_harness::TestRequest::get("/static/compressed.txt")
2681                    .header("accept-encoding", "gzip"),
2682            )
2683            .await
2684            .expect("request should succeed");
2685
2686        assert_eq!(response.status(), StatusCode::OK);
2687        assert_eq!(
2688            response
2689                .header("content-encoding")
2690                .and_then(|value| value.to_str().ok()),
2691            Some("gzip")
2692        );
2693    }
2694
2695    #[tokio::test]
2696    async fn drain_connections_completes_before_timeout() {
2697        let mut connections = tokio::task::JoinSet::new();
2698        connections.spawn(async {
2699            tokio::time::sleep(Duration::from_millis(20)).await;
2700        });
2701
2702        let timed_out = drain_connections(&mut connections, Duration::from_millis(200)).await;
2703        assert!(!timed_out);
2704        assert!(connections.is_empty());
2705    }
2706
2707    #[tokio::test]
2708    async fn drain_connections_times_out_and_aborts() {
2709        let mut connections = tokio::task::JoinSet::new();
2710        connections.spawn(async {
2711            tokio::time::sleep(Duration::from_secs(10)).await;
2712        });
2713
2714        let timed_out = drain_connections(&mut connections, Duration::from_millis(10)).await;
2715        assert!(timed_out);
2716        assert!(connections.is_empty());
2717    }
2718
2719    #[tokio::test]
2720    async fn timeout_layer_returns_408_for_slow_route() {
2721        #[derive(Clone)]
2722        struct SlowRoute;
2723
2724        #[async_trait]
2725        impl Transition<(), String> for SlowRoute {
2726            type Error = String;
2727            type Resources = ();
2728
2729            async fn run(
2730                &self,
2731                _state: (),
2732                _resources: &Self::Resources,
2733                _bus: &mut Bus,
2734            ) -> Outcome<String, Self::Error> {
2735                tokio::time::sleep(Duration::from_millis(80)).await;
2736                Outcome::next("slow-ok".to_string())
2737            }
2738        }
2739
2740        let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("bind probe");
2741        let addr = probe.local_addr().expect("local addr");
2742        drop(probe);
2743
2744        let ingress = HttpIngress::<()>::new()
2745            .bind(addr.to_string())
2746            .timeout_layer(Duration::from_millis(10))
2747            .get(
2748                "/slow",
2749                Axon::<(), (), String, ()>::new("Slow").then(SlowRoute),
2750            );
2751
2752        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
2753        let server = tokio::spawn(async move {
2754            ingress
2755                .run_with_shutdown_signal((), async move {
2756                    let _ = shutdown_rx.await;
2757                })
2758                .await
2759        });
2760
2761        let mut stream = connect_with_retry(addr).await;
2762        stream
2763            .write_all(b"GET /slow HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
2764            .await
2765            .expect("write request");
2766
2767        let mut buf = Vec::new();
2768        stream.read_to_end(&mut buf).await.expect("read response");
2769        let response = String::from_utf8_lossy(&buf);
2770        assert!(response.starts_with("HTTP/1.1 408"), "{response}");
2771
2772        let _ = shutdown_tx.send(());
2773        server
2774            .await
2775            .expect("server join")
2776            .expect("server shutdown should succeed");
2777    }
2778
2779}