Skip to main content

ranvier_http/
ingress.rs

1//! # Ingress Module - Flat API Entry Point
2//!
3//! Implements Discussion 193: `Ranvier::http()` is an **Ingress Circuit Builder**, not a web server.
4//!
5//! ## API Surface (MVP)
6//!
7//! - `bind(addr)` — Execution unit
8//! - `route(path, circuit)` — Core wiring
9//! - `fallback(circuit)` — Circuit completeness
10//! - `into_raw_service()` — Escape hatch to Raw API
11//!
12//! ## Flat API Principle (Discussion 192)
13//!
14//! User code depth ≤ 2. Complexity is isolated, not hidden.
15
16use base64::Engine;
17use bytes::Bytes;
18use futures_util::{SinkExt, StreamExt};
19use http::{Method, Request, Response, StatusCode, Uri};
20use http_body::Body;
21use http_body_util::{BodyExt, Full};
22use hyper::body::Incoming;
23use hyper::server::conn::http1;
24use hyper::upgrade::Upgraded;
25use hyper_util::rt::TokioIo;
26use hyper_util::service::TowerToHyperService;
27use ranvier_core::event::{EventSink, EventSource};
28use ranvier_core::prelude::*;
29use ranvier_runtime::Axon;
30use serde::Serialize;
31use serde::de::DeserializeOwned;
32use sha1::{Digest, Sha1};
33use std::collections::HashMap;
34use std::convert::Infallible;
35use std::future::Future;
36use std::net::SocketAddr;
37use std::pin::Pin;
38use std::sync::Arc;
39use std::time::Duration;
40use tokio::net::TcpListener;
41use tokio::sync::Mutex;
42use tokio_tungstenite::WebSocketStream;
43use tokio_tungstenite::tungstenite::{Error as WsWireError, Message as WsWireMessage};
44use tower::util::BoxCloneService;
45use tower::{Layer, Service, ServiceExt, service_fn};
46use tower_http::compression::CompressionLayer;
47use tower_http::services::{ServeDir, ServeFile};
48use tracing::Instrument;
49
50use crate::response::{IntoResponse, outcome_to_response_with_error};
51
52/// The Ranvier Framework entry point.
53///
54/// `Ranvier` provides static methods to create Ingress builders for various protocols.
55/// Currently only HTTP is supported.
56pub struct Ranvier;
57
58impl Ranvier {
59    /// Create an HTTP Ingress Circuit Builder.
60    pub fn http<R>() -> HttpIngress<R>
61    where
62        R: ranvier_core::transition::ResourceRequirement + Clone,
63    {
64        HttpIngress::new()
65    }
66}
67
/// Route handler type: boxed async function returning Response
///
/// The handler receives the request by value and the shared resources by reference,
/// and yields a fully buffered response.
type RouteHandler<R> = Arc<
    dyn Fn(Request<Incoming>, &R) -> Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
        + Send
        + Sync,
>;

/// Type-erased, cloneable HTTP service whose error type is `Infallible`.
type BoxHttpService = BoxCloneService<Request<Incoming>, Response<Full<Bytes>>, Infallible>;
/// Middleware in closure form: takes a boxed service and returns the wrapped boxed service.
type ServiceLayer = Arc<dyn Fn(BoxHttpService) -> BoxHttpService + Send + Sync>;
/// Parameterless callback stored for lifecycle events (`on_start` / `on_shutdown`).
type LifecycleHook = Arc<dyn Fn() + Send + Sync>;
/// Hook that may read the incoming request and mutate the `Bus`.
type BusInjector = Arc<dyn Fn(&Request<Incoming>, &mut Bus) + Send + Sync>;
/// Boxed future produced by a WebSocket session handler.
type WsSessionFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
/// Shared WebSocket session handler: connection, shared resources and a `Bus` in, future out.
type WsSessionHandler<R> =
    Arc<dyn Fn(WebSocketConnection, Arc<R>, Bus) -> WsSessionFuture + Send + Sync>;
/// Boxed future of a single health check; the `Err` side carries a `String`
/// (presumably a failure description — confirm against the check runner).
type HealthCheckFuture = Pin<Box<dyn Future<Output = Result<(), String>> + Send>>;
/// Shared async health check that receives the resources as `Arc<R>`.
type HealthCheckFn<R> = Arc<dyn Fn(Arc<R>) -> HealthCheckFuture + Send + Sync>;
/// Header name read and written by `RequestIdService`.
const REQUEST_ID_HEADER: &str = "x-request-id";
/// Token matched against the request `Upgrade` header and echoed in the upgrade response.
const WS_UPGRADE_TOKEN: &str = "websocket";
/// Fixed GUID hashed together with the client key in `websocket_accept_key`.
const WS_GUID: &str = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
87
/// A health check function paired with the name it is identified by.
#[derive(Clone)]
struct NamedHealthCheck<R> {
    /// Identifier of the check.
    name: String,
    /// The async check itself.
    check: HealthCheckFn<R>,
}
93
/// Configuration of the built-in health endpoints.
///
/// Every path starts out as `None` (see the `Default` impl).
#[derive(Clone)]
struct HealthConfig<R> {
    /// Path of the general health endpoint; set by `HttpIngress::health_endpoint`.
    health_path: Option<String>,
    /// Path of the readiness probe, if one is configured.
    readiness_path: Option<String>,
    /// Path of the liveness probe, if one is configured.
    liveness_path: Option<String>,
    /// Named health checks.
    checks: Vec<NamedHealthCheck<R>>,
}
101
102impl<R> Default for HealthConfig<R> {
103    fn default() -> Self {
104        Self {
105            health_path: None,
106            readiness_path: None,
107            liveness_path: None,
108            checks: Vec::new(),
109        }
110    }
111}
112
/// Static asset serving settings collected by the builder.
#[derive(Clone, Default)]
struct StaticAssetsConfig {
    /// Directory mounts, in the order `serve_dir` was called.
    mounts: Vec<StaticMount>,
    /// File path given to `spa_fallback`, if any.
    spa_fallback: Option<String>,
    /// Cache-Control value; `serve_dir` fills in `"public, max-age=3600"` when unset and
    /// `static_cache_control` overrides it.
    cache_control: Option<String>,
    /// Set to `true` by `compression_layer`.
    enable_compression: bool,
}
120
/// One static directory mount created by `serve_dir`.
#[derive(Clone)]
struct StaticMount {
    /// Route prefix, normalized by `normalize_route_path` to start with `/`.
    route_prefix: String,
    /// Directory string exactly as passed to `serve_dir`.
    directory: String,
}
126
/// Serializable summary of a health probe.
///
/// NOTE(review): presumably the JSON body of the health endpoints — the code that
/// builds it is not visible in this part of the file.
#[derive(Serialize)]
struct HealthReport {
    /// Overall status label.
    status: &'static str,
    /// Label of the probe that produced this report.
    probe: &'static str,
    /// Per-check results.
    checks: Vec<HealthCheckReport>,
}
133
/// Serializable result of a single named health check.
#[derive(Serialize)]
struct HealthCheckReport {
    /// Name of the check.
    name: String,
    /// Status label of the check.
    status: &'static str,
    /// Error text; left out of the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    error: Option<String>,
}
141
142#[derive(Clone)]
143struct TimeoutService {
144    inner: BoxHttpService,
145    timeout: Duration,
146}
147
148impl Service<Request<Incoming>> for TimeoutService {
149    type Response = Response<Full<Bytes>>;
150    type Error = Infallible;
151    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
152
153    fn poll_ready(
154        &mut self,
155        cx: &mut std::task::Context<'_>,
156    ) -> std::task::Poll<Result<(), Self::Error>> {
157        self.inner.poll_ready(cx)
158    }
159
160    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
161        let timeout = self.timeout;
162        let fut = self.inner.call(req);
163        Box::pin(async move {
164            match tokio::time::timeout(timeout, fut).await {
165                Ok(response) => response,
166                Err(_) => Ok(Response::builder()
167                    .status(StatusCode::REQUEST_TIMEOUT)
168                    .body(Full::new(Bytes::from("Request Timeout")))
169                    .unwrap()),
170            }
171        })
172    }
173}
174
175#[derive(Clone)]
176struct RequestIdService {
177    inner: BoxHttpService,
178}
179
180impl Service<Request<Incoming>> for RequestIdService {
181    type Response = Response<Full<Bytes>>;
182    type Error = Infallible;
183    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
184
185    fn poll_ready(
186        &mut self,
187        cx: &mut std::task::Context<'_>,
188    ) -> std::task::Poll<Result<(), Self::Error>> {
189        self.inner.poll_ready(cx)
190    }
191
192    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
193        let mut req = req;
194        let request_id = req
195            .headers()
196            .get(REQUEST_ID_HEADER)
197            .cloned()
198            .unwrap_or_else(|| {
199                http::HeaderValue::from_str(&uuid::Uuid::new_v4().to_string())
200                    .unwrap_or_else(|_| http::HeaderValue::from_static("request-id-unavailable"))
201            });
202
203        req.headers_mut()
204            .insert(REQUEST_ID_HEADER, request_id.clone());
205
206        let fut = self.inner.call(req);
207        Box::pin(async move {
208            let mut response = fut.await?;
209            response.headers_mut().insert(REQUEST_ID_HEADER, request_id);
210            Ok(response)
211        })
212    }
213}
214
215fn to_service_layer<L>(layer: L) -> ServiceLayer
216where
217    L: Layer<BoxHttpService> + Clone + Send + Sync + 'static,
218    L::Service: Service<Request<Incoming>, Response = Response<Full<Bytes>>, Error = Infallible>
219        + Clone
220        + Send
221        + 'static,
222    <L::Service as Service<Request<Incoming>>>::Future: Send + 'static,
223{
224    Arc::new(move |service: BoxHttpService| BoxCloneService::new(layer.clone().layer(service)))
225}
226
/// Path parameters captured while matching a route pattern, keyed by parameter name.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct PathParams {
    /// Captured values: parameter or wildcard name mapped to the matched path text.
    values: HashMap<String, String>,
}
231
232/// Public route descriptor snapshot for tooling integrations (e.g., OpenAPI generation).
233#[derive(Clone, Debug, PartialEq, Eq)]
234pub struct HttpRouteDescriptor {
235    method: Method,
236    path_pattern: String,
237}
238
239impl HttpRouteDescriptor {
240    pub fn new(method: Method, path_pattern: impl Into<String>) -> Self {
241        Self {
242            method,
243            path_pattern: path_pattern.into(),
244        }
245    }
246
247    pub fn method(&self) -> &Method {
248        &self.method
249    }
250
251    pub fn path_pattern(&self) -> &str {
252        &self.path_pattern
253    }
254}
255
256/// Connection metadata injected into Bus for each accepted WebSocket session.
257#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
258pub struct WebSocketSessionContext {
259    connection_id: uuid::Uuid,
260    path: String,
261    query: Option<String>,
262}
263
264impl WebSocketSessionContext {
265    pub fn connection_id(&self) -> uuid::Uuid {
266        self.connection_id
267    }
268
269    pub fn path(&self) -> &str {
270        &self.path
271    }
272
273    pub fn query(&self) -> Option<&str> {
274        self.query.as_deref()
275    }
276}
277
278/// Logical WebSocket message model used by Ranvier EventSource/EventSink bridge.
279#[derive(Clone, Debug, PartialEq, Eq)]
280pub enum WebSocketEvent {
281    Text(String),
282    Binary(Vec<u8>),
283    Ping(Vec<u8>),
284    Pong(Vec<u8>),
285    Close,
286}
287
288impl WebSocketEvent {
289    pub fn text(value: impl Into<String>) -> Self {
290        Self::Text(value.into())
291    }
292
293    pub fn binary(value: impl Into<Vec<u8>>) -> Self {
294        Self::Binary(value.into())
295    }
296
297    pub fn json<T>(value: &T) -> Result<Self, serde_json::Error>
298    where
299        T: Serialize,
300    {
301        let text = serde_json::to_string(value)?;
302        Ok(Self::Text(text))
303    }
304}
305
/// Errors surfaced by [`WebSocketConnection`] send/receive helpers.
#[derive(Debug, thiserror::Error)]
pub enum WebSocketError {
    /// Failure reported by the underlying WebSocket transport.
    #[error("websocket wire error: {0}")]
    Wire(#[from] WsWireError),
    /// An outgoing value could not be serialized to JSON.
    #[error("json serialization failed: {0}")]
    JsonSerialize(#[source] serde_json::Error),
    /// An incoming text/binary payload could not be parsed as JSON.
    #[error("json deserialization failed: {0}")]
    JsonDeserialize(#[source] serde_json::Error),
    /// A ping, pong or close frame arrived where a JSON payload was expected.
    #[error("expected text or binary frame for json payload")]
    NonDataFrame,
}
317
/// WebSocket stream running over an upgraded hyper connection.
type WsServerStream = WebSocketStream<TokioIo<Upgraded>>;
/// Write half obtained by splitting a `WsServerStream`.
type WsServerSink = futures_util::stream::SplitSink<WsServerStream, WsWireMessage>;
/// Read half obtained by splitting a `WsServerStream`.
type WsServerSource = futures_util::stream::SplitStream<WsServerStream>;
321
322/// WebSocket connection adapter bridging wire frames and EventSource/EventSink traits.
323pub struct WebSocketConnection {
324    sink: Mutex<WsServerSink>,
325    source: Mutex<WsServerSource>,
326    session: WebSocketSessionContext,
327}
328
329impl WebSocketConnection {
330    fn new(stream: WsServerStream, session: WebSocketSessionContext) -> Self {
331        let (sink, source) = stream.split();
332        Self {
333            sink: Mutex::new(sink),
334            source: Mutex::new(source),
335            session,
336        }
337    }
338
339    pub fn session(&self) -> &WebSocketSessionContext {
340        &self.session
341    }
342
343    pub async fn send(&self, event: WebSocketEvent) -> Result<(), WebSocketError> {
344        let mut sink = self.sink.lock().await;
345        sink.send(event.into_wire_message()).await?;
346        Ok(())
347    }
348
349    pub async fn send_json<T>(&self, value: &T) -> Result<(), WebSocketError>
350    where
351        T: Serialize,
352    {
353        let event = WebSocketEvent::json(value).map_err(WebSocketError::JsonSerialize)?;
354        self.send(event).await
355    }
356
357    pub async fn next_json<T>(&mut self) -> Result<Option<T>, WebSocketError>
358    where
359        T: DeserializeOwned,
360    {
361        let Some(event) = self.recv_event().await? else {
362            return Ok(None);
363        };
364        match event {
365            WebSocketEvent::Text(text) => serde_json::from_str(&text)
366                .map(Some)
367                .map_err(WebSocketError::JsonDeserialize),
368            WebSocketEvent::Binary(bytes) => serde_json::from_slice(&bytes)
369                .map(Some)
370                .map_err(WebSocketError::JsonDeserialize),
371            _ => Err(WebSocketError::NonDataFrame),
372        }
373    }
374
375    async fn recv_event(&mut self) -> Result<Option<WebSocketEvent>, WsWireError> {
376        let mut source = self.source.lock().await;
377        while let Some(item) = source.next().await {
378            let message = item?;
379            if let Some(event) = WebSocketEvent::from_wire_message(message) {
380                return Ok(Some(event));
381            }
382        }
383        Ok(None)
384    }
385}
386
387impl WebSocketEvent {
388    fn from_wire_message(message: WsWireMessage) -> Option<Self> {
389        match message {
390            WsWireMessage::Text(value) => Some(Self::Text(value.to_string())),
391            WsWireMessage::Binary(value) => Some(Self::Binary(value.to_vec())),
392            WsWireMessage::Ping(value) => Some(Self::Ping(value.to_vec())),
393            WsWireMessage::Pong(value) => Some(Self::Pong(value.to_vec())),
394            WsWireMessage::Close(_) => Some(Self::Close),
395            WsWireMessage::Frame(_) => None,
396        }
397    }
398
399    fn into_wire_message(self) -> WsWireMessage {
400        match self {
401            Self::Text(value) => WsWireMessage::Text(value.into()),
402            Self::Binary(value) => WsWireMessage::Binary(value.into()),
403            Self::Ping(value) => WsWireMessage::Ping(value.into()),
404            Self::Pong(value) => WsWireMessage::Pong(value.into()),
405            Self::Close => WsWireMessage::Close(None),
406        }
407    }
408}
409
410#[async_trait::async_trait]
411impl EventSource<WebSocketEvent> for WebSocketConnection {
412    async fn next_event(&mut self) -> Option<WebSocketEvent> {
413        match self.recv_event().await {
414            Ok(event) => event,
415            Err(error) => {
416                tracing::warn!(ranvier.ws.error = %error, "websocket source read failed");
417                None
418            }
419        }
420    }
421}
422
423#[async_trait::async_trait]
424impl EventSink<WebSocketEvent> for WebSocketConnection {
425    type Error = WebSocketError;
426
427    async fn send_event(&self, event: WebSocketEvent) -> Result<(), Self::Error> {
428        self.send(event).await
429    }
430}
431
432#[async_trait::async_trait]
433impl EventSink<String> for WebSocketConnection {
434    type Error = WebSocketError;
435
436    async fn send_event(&self, event: String) -> Result<(), Self::Error> {
437        self.send(WebSocketEvent::Text(event)).await
438    }
439}
440
441#[async_trait::async_trait]
442impl EventSink<Vec<u8>> for WebSocketConnection {
443    type Error = WebSocketError;
444
445    async fn send_event(&self, event: Vec<u8>) -> Result<(), Self::Error> {
446        self.send(WebSocketEvent::Binary(event)).await
447    }
448}
449
450impl PathParams {
451    pub fn new(values: HashMap<String, String>) -> Self {
452        Self { values }
453    }
454
455    pub fn get(&self, key: &str) -> Option<&str> {
456        self.values.get(key).map(String::as_str)
457    }
458
459    pub fn as_map(&self) -> &HashMap<String, String> {
460        &self.values
461    }
462
463    pub fn into_inner(self) -> HashMap<String, String> {
464        self.values
465    }
466}
467
/// One `/`-separated piece of a parsed route pattern.
#[derive(Clone, Debug, PartialEq, Eq)]
enum RouteSegment {
    /// Literal text that must equal the path segment.
    Static(String),
    /// `:name` — captures exactly one path segment under `name`.
    Param(String),
    /// `*name` — captures all remaining path segments, joined with `/`, under `name`.
    Wildcard(String),
}
474
/// A route path parsed into matchable segments.
#[derive(Clone, Debug, PartialEq, Eq)]
struct RoutePattern {
    /// The pattern text exactly as it was given to `parse`.
    raw: String,
    /// Segments of `raw`, in order.
    segments: Vec<RouteSegment>,
}
480
481impl RoutePattern {
482    fn parse(path: &str) -> Self {
483        let segments = path_segments(path)
484            .into_iter()
485            .map(|segment| {
486                if let Some(name) = segment.strip_prefix(':') {
487                    if !name.is_empty() {
488                        return RouteSegment::Param(name.to_string());
489                    }
490                }
491                if let Some(name) = segment.strip_prefix('*') {
492                    if !name.is_empty() {
493                        return RouteSegment::Wildcard(name.to_string());
494                    }
495                }
496                RouteSegment::Static(segment.to_string())
497            })
498            .collect();
499
500        Self {
501            raw: path.to_string(),
502            segments,
503        }
504    }
505
506    fn match_path(&self, path: &str) -> Option<PathParams> {
507        let mut params = HashMap::new();
508        let path_segments = path_segments(path);
509        let mut pattern_index = 0usize;
510        let mut path_index = 0usize;
511
512        while pattern_index < self.segments.len() {
513            match &self.segments[pattern_index] {
514                RouteSegment::Static(expected) => {
515                    let actual = path_segments.get(path_index)?;
516                    if actual != expected {
517                        return None;
518                    }
519                    pattern_index += 1;
520                    path_index += 1;
521                }
522                RouteSegment::Param(name) => {
523                    let actual = path_segments.get(path_index)?;
524                    params.insert(name.clone(), (*actual).to_string());
525                    pattern_index += 1;
526                    path_index += 1;
527                }
528                RouteSegment::Wildcard(name) => {
529                    let remaining = path_segments[path_index..].join("/");
530                    params.insert(name.clone(), remaining);
531                    pattern_index += 1;
532                    path_index = path_segments.len();
533                    break;
534                }
535            }
536        }
537
538        if pattern_index == self.segments.len() && path_index == path_segments.len() {
539            Some(PathParams::new(params))
540        } else {
541            None
542        }
543    }
544}
545
/// A registered route: method, pattern, handler and its middleware settings.
#[derive(Clone)]
struct RouteEntry<R> {
    /// HTTP method the route answers to.
    method: Method,
    /// Parsed path pattern used for matching.
    pattern: RoutePattern,
    /// Handler invoked for a matching request.
    handler: RouteHandler<R>,
    /// Route-specific middleware layers.
    layers: Arc<Vec<ServiceLayer>>,
    /// Whether the builder's global layers also apply to this route
    /// (NOTE(review): the consumer of this flag is outside this part of the file).
    apply_global_layers: bool,
}
554
/// Split a path into its non-empty `/`-separated segments.
///
/// Leading, trailing and repeated slashes produce no segments, so `"/"` and `""`
/// both yield an empty vector.
fn path_segments(path: &str) -> Vec<&str> {
    let mut segments = Vec::new();
    for piece in path.split('/') {
        if !piece.is_empty() {
            segments.push(piece);
        }
    }
    segments
}
565
/// Ensure a route path starts with `/`.
///
/// An empty path becomes `"/"`; a path that already starts with `/` is returned
/// untouched; anything else gets a `/` prepended.
fn normalize_route_path(path: String) -> String {
    match path.chars().next() {
        None => String::from("/"),
        Some('/') => path,
        Some(_) => format!("/{path}"),
    }
}
576
577fn find_matching_route<'a, R>(
578    routes: &'a [RouteEntry<R>],
579    method: &Method,
580    path: &str,
581) -> Option<(&'a RouteEntry<R>, PathParams)> {
582    for entry in routes {
583        if &entry.method != method {
584            continue;
585        }
586        if let Some(params) = entry.pattern.match_path(path) {
587            return Some((entry, params));
588        }
589    }
590    None
591}
592
593fn header_contains_token(
594    headers: &http::HeaderMap,
595    name: http::header::HeaderName,
596    token: &str,
597) -> bool {
598    headers
599        .get(name)
600        .and_then(|value| value.to_str().ok())
601        .map(|value| {
602            value
603                .split(',')
604                .any(|part| part.trim().eq_ignore_ascii_case(token))
605        })
606        .unwrap_or(false)
607}
608
609fn websocket_session_from_request(req: &Request<Incoming>) -> WebSocketSessionContext {
610    WebSocketSessionContext {
611        connection_id: uuid::Uuid::new_v4(),
612        path: req.uri().path().to_string(),
613        query: req.uri().query().map(str::to_string),
614    }
615}
616
617fn websocket_accept_key(client_key: &str) -> String {
618    let mut hasher = Sha1::new();
619    hasher.update(client_key.as_bytes());
620    hasher.update(WS_GUID.as_bytes());
621    let digest = hasher.finalize();
622    base64::engine::general_purpose::STANDARD.encode(digest)
623}
624
625fn websocket_bad_request(message: &'static str) -> Response<Full<Bytes>> {
626    Response::builder()
627        .status(StatusCode::BAD_REQUEST)
628        .body(Full::new(Bytes::from(message)))
629        .unwrap_or_else(|_| Response::new(Full::new(Bytes::new())))
630}
631
632fn websocket_upgrade_response(
633    req: &mut Request<Incoming>,
634) -> Result<(Response<Full<Bytes>>, hyper::upgrade::OnUpgrade), Response<Full<Bytes>>> {
635    if req.method() != Method::GET {
636        return Err(websocket_bad_request(
637            "WebSocket upgrade requires GET method",
638        ));
639    }
640
641    if !header_contains_token(req.headers(), http::header::CONNECTION, "upgrade") {
642        return Err(websocket_bad_request(
643            "Missing Connection: upgrade header for WebSocket",
644        ));
645    }
646
647    if !header_contains_token(req.headers(), http::header::UPGRADE, WS_UPGRADE_TOKEN) {
648        return Err(websocket_bad_request("Missing Upgrade: websocket header"));
649    }
650
651    if let Some(version) = req.headers().get("sec-websocket-version") {
652        if version != "13" {
653            return Err(websocket_bad_request(
654                "Unsupported Sec-WebSocket-Version (expected 13)",
655            ));
656        }
657    }
658
659    let Some(client_key) = req
660        .headers()
661        .get("sec-websocket-key")
662        .and_then(|value| value.to_str().ok())
663    else {
664        return Err(websocket_bad_request(
665            "Missing Sec-WebSocket-Key header for WebSocket",
666        ));
667    };
668
669    let accept_key = websocket_accept_key(client_key);
670    let on_upgrade = hyper::upgrade::on(req);
671    let response = Response::builder()
672        .status(StatusCode::SWITCHING_PROTOCOLS)
673        .header(http::header::UPGRADE, WS_UPGRADE_TOKEN)
674        .header(http::header::CONNECTION, "Upgrade")
675        .header("sec-websocket-accept", accept_key)
676        .body(Full::new(Bytes::new()))
677        .unwrap_or_else(|_| Response::new(Full::new(Bytes::new())));
678
679    Ok((response, on_upgrade))
680}
681
/// HTTP Ingress Circuit Builder.
///
/// Wires HTTP inputs to Ranvier Circuits. This is NOT a web server—it's a circuit wiring tool.
///
/// **Ingress is part of Schematic** (separate layer: Ingress → Circuit → Egress)
///
/// `R` is the shared resource type handed to route handlers; it defaults to `()`.
pub struct HttpIngress<R = ()> {
    /// Bind address (e.g., "127.0.0.1:3000")
    addr: Option<String>,
    /// Routes: (Method, RoutePattern, Handler)
    routes: Vec<RouteEntry<R>>,
    /// Fallback circuit for unmatched routes
    fallback: Option<RouteHandler<R>>,
    /// Global middleware layers (LIFO execution on request path).
    layers: Vec<ServiceLayer>,
    /// Lifecycle callback invoked after listener bind succeeds.
    on_start: Option<LifecycleHook>,
    /// Lifecycle callback invoked when graceful shutdown finishes.
    on_shutdown: Option<LifecycleHook>,
    /// Maximum time to wait for in-flight requests to drain.
    graceful_shutdown_timeout: Duration,
    /// Request-context to Bus injection hooks executed before each circuit run.
    bus_injectors: Vec<BusInjector>,
    /// Static asset serving configuration (serve_dir + SPA fallback).
    static_assets: StaticAssetsConfig,
    /// Built-in health endpoint configuration.
    health: HealthConfig<R>,
    /// Marker for the resource type `R`; carries no data.
    _phantom: std::marker::PhantomData<R>,
}
710
711impl<R> HttpIngress<R>
712where
713    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
714{
715    /// Create a new empty HttpIngress builder.
716    pub fn new() -> Self {
717        Self {
718            addr: None,
719            routes: Vec::new(),
720            fallback: None,
721            layers: Vec::new(),
722            on_start: None,
723            on_shutdown: None,
724            graceful_shutdown_timeout: Duration::from_secs(30),
725            bus_injectors: Vec::new(),
726            static_assets: StaticAssetsConfig::default(),
727            health: HealthConfig::default(),
728            _phantom: std::marker::PhantomData,
729        }
730    }
731
732    /// Set the bind address for the server.
733    pub fn bind(mut self, addr: impl Into<String>) -> Self {
734        self.addr = Some(addr.into());
735        self
736    }
737
738    /// Register a lifecycle callback invoked when the server starts listening.
739    pub fn on_start<F>(mut self, callback: F) -> Self
740    where
741        F: Fn() + Send + Sync + 'static,
742    {
743        self.on_start = Some(Arc::new(callback));
744        self
745    }
746
747    /// Register a lifecycle callback invoked after graceful shutdown completes.
748    pub fn on_shutdown<F>(mut self, callback: F) -> Self
749    where
750        F: Fn() + Send + Sync + 'static,
751    {
752        self.on_shutdown = Some(Arc::new(callback));
753        self
754    }
755
756    /// Configure graceful shutdown timeout for in-flight request draining.
757    pub fn graceful_shutdown(mut self, timeout: Duration) -> Self {
758        self.graceful_shutdown_timeout = timeout;
759        self
760    }
761
762    /// Add a global Tower layer to the ingress service stack.
763    ///
764    /// Layers execute in LIFO order on the request path:
765    /// the last layer added is the first to receive the request.
766    pub fn layer<L>(mut self, layer: L) -> Self
767    where
768        L: Layer<BoxHttpService> + Clone + Send + Sync + 'static,
769        L::Service: Service<Request<Incoming>, Response = Response<Full<Bytes>>, Error = Infallible>
770            + Clone
771            + Send
772            + 'static,
773        <L::Service as Service<Request<Incoming>>>::Future: Send + 'static,
774    {
775        self.layers.push(to_service_layer(layer));
776        self
777    }
778
779    /// Add built-in timeout middleware that returns `408 Request Timeout`
780    /// when the inner service call exceeds `timeout`.
781    pub fn timeout_layer(mut self, timeout: Duration) -> Self {
782        self.layers.push(Arc::new(move |service: BoxHttpService| {
783            BoxCloneService::new(TimeoutService {
784                inner: service,
785                timeout,
786            })
787        }));
788        self
789    }
790
791    /// Add built-in request-id middleware.
792    ///
793    /// Ensures `x-request-id` exists on request and response headers.
794    pub fn request_id_layer(mut self) -> Self {
795        self.layers.push(Arc::new(move |service: BoxHttpService| {
796            BoxCloneService::new(RequestIdService { inner: service })
797        }));
798        self
799    }
800
801    /// Register a request-context injector executed before each circuit run.
802    ///
803    /// Use this to bridge adapter-layer context (request extensions/headers)
804    /// into explicit Bus resources consumed by Transitions.
805    pub fn bus_injector<F>(mut self, injector: F) -> Self
806    where
807        F: Fn(&Request<Incoming>, &mut Bus) + Send + Sync + 'static,
808    {
809        self.bus_injectors.push(Arc::new(injector));
810        self
811    }
812
813    /// Export route metadata snapshot for external tooling.
814    pub fn route_descriptors(&self) -> Vec<HttpRouteDescriptor> {
815        let mut descriptors = self
816            .routes
817            .iter()
818            .map(|entry| HttpRouteDescriptor::new(entry.method.clone(), entry.pattern.raw.clone()))
819            .collect::<Vec<_>>();
820
821        if let Some(path) = &self.health.health_path {
822            descriptors.push(HttpRouteDescriptor::new(Method::GET, path.clone()));
823        }
824        if let Some(path) = &self.health.readiness_path {
825            descriptors.push(HttpRouteDescriptor::new(Method::GET, path.clone()));
826        }
827        if let Some(path) = &self.health.liveness_path {
828            descriptors.push(HttpRouteDescriptor::new(Method::GET, path.clone()));
829        }
830
831        descriptors
832    }
833
834    /// Mount a static directory under a path prefix.
835    ///
836    /// Example: `.serve_dir("/static", "./public")`.
837    pub fn serve_dir(
838        mut self,
839        route_prefix: impl Into<String>,
840        directory: impl Into<String>,
841    ) -> Self {
842        self.static_assets.mounts.push(StaticMount {
843            route_prefix: normalize_route_path(route_prefix.into()),
844            directory: directory.into(),
845        });
846        if self.static_assets.cache_control.is_none() {
847            self.static_assets.cache_control = Some("public, max-age=3600".to_string());
848        }
849        self
850    }
851
852    /// Configure SPA fallback file for unmatched GET/HEAD routes.
853    ///
854    /// Example: `.spa_fallback("./public/index.html")`.
855    pub fn spa_fallback(mut self, file_path: impl Into<String>) -> Self {
856        self.static_assets.spa_fallback = Some(file_path.into());
857        self
858    }
859
860    /// Override default Cache-Control for static responses.
861    pub fn static_cache_control(mut self, cache_control: impl Into<String>) -> Self {
862        self.static_assets.cache_control = Some(cache_control.into());
863        self
864    }
865
866    /// Add gzip/brotli response compression via `tower-http::CompressionLayer`.
867    pub fn compression_layer(mut self) -> Self {
868        self.static_assets.enable_compression = true;
869        self
870    }
871
    /// Register a WebSocket upgrade endpoint and session handler.
    ///
    /// The handler receives:
    /// 1) a `WebSocketConnection` implementing `EventSource`/`EventSink`,
    /// 2) shared resources (`Arc<R>`),
    /// 3) a connection-scoped `Bus` with request injectors + `WebSocketSessionContext`.
    ///
    /// The route is registered for `GET` with no route-level layers. Only the bus
    /// injectors registered before this call are captured for the endpoint.
    pub fn ws<H, Fut>(mut self, path: impl Into<String>, handler: H) -> Self
    where
        H: Fn(WebSocketConnection, Arc<R>, Bus) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = ()> + Send + 'static,
    {
        let path_str: String = path.into();
        // Erase the handler's concrete future type so it fits `WsSessionHandler`.
        let ws_handler: WsSessionHandler<R> = Arc::new(move |connection, resources, bus| {
            Box::pin(handler(connection, resources, bus))
        });
        // Snapshot of the injectors as they are right now.
        let bus_injectors = Arc::new(self.bus_injectors.clone());
        let path_for_pattern = path_str.clone();
        let path_for_handler = path_str;

        let route_handler: RouteHandler<R> =
            Arc::new(move |mut req: Request<Incoming>, res: &R| {
                // Per-request clones so the returned future owns everything it uses.
                let ws_handler = ws_handler.clone();
                let bus_injectors = bus_injectors.clone();
                let resources = Arc::new(res.clone());
                let path = path_for_handler.clone();

                Box::pin(async move {
                    // Freshly generated id used only for the tracing span below.
                    let request_id = uuid::Uuid::new_v4().to_string();
                    let span = tracing::info_span!(
                        "WebSocketUpgrade",
                        ranvier.ws.path = %path,
                        ranvier.ws.request_id = %request_id
                    );

                    async move {
                        // The Bus is populated before the handshake is validated.
                        let mut bus = Bus::new();
                        for injector in bus_injectors.iter() {
                            injector(&req, &mut bus);
                        }

                        let session = websocket_session_from_request(&req);
                        bus.insert(session.clone());

                        // An invalid handshake short-circuits with the error response.
                        let (response, on_upgrade) = match websocket_upgrade_response(&mut req) {
                            Ok(result) => result,
                            Err(error_response) => return error_response,
                        };

                        // The session runs in its own task; the upgrade response below is
                        // returned without waiting for it.
                        tokio::spawn(async move {
                            match on_upgrade.await {
                                Ok(upgraded) => {
                                    let stream = WebSocketStream::from_raw_socket(
                                        TokioIo::new(upgraded),
                                        tokio_tungstenite::tungstenite::protocol::Role::Server,
                                        None,
                                    )
                                    .await;
                                    let connection = WebSocketConnection::new(stream, session);
                                    ws_handler(connection, resources, bus).await;
                                }
                                Err(error) => {
                                    tracing::warn!(
                                        ranvier.ws.path = %path,
                                        ranvier.ws.error = %error,
                                        "websocket upgrade failed"
                                    );
                                }
                            }
                        });

                        response
                    }
                    .instrument(span)
                    .await
                }) as Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
            });

        self.routes.push(RouteEntry {
            method: Method::GET,
            pattern: RoutePattern::parse(&path_for_pattern),
            handler: route_handler,
            layers: Arc::new(Vec::new()),
            apply_global_layers: true,
        });

        self
    }
959
960    /// Enable built-in health endpoint at the given path.
961    ///
962    /// The endpoint returns JSON with status and check results.
963    /// If no checks are registered, status is always `ok`.
964    pub fn health_endpoint(mut self, path: impl Into<String>) -> Self {
965        self.health.health_path = Some(normalize_route_path(path.into()));
966        self
967    }
968
969    /// Register an async health check used by `/health` and `/ready` probes.
970    ///
971    /// `Err` values are converted to strings and surfaced in the JSON response.
972    pub fn health_check<F, Fut, Err>(mut self, name: impl Into<String>, check: F) -> Self
973    where
974        F: Fn(Arc<R>) -> Fut + Send + Sync + 'static,
975        Fut: Future<Output = Result<(), Err>> + Send + 'static,
976        Err: ToString + Send + 'static,
977    {
978        if self.health.health_path.is_none() {
979            self.health.health_path = Some("/health".to_string());
980        }
981
982        let check_fn: HealthCheckFn<R> = Arc::new(move |resources: Arc<R>| {
983            let fut = check(resources);
984            Box::pin(async move { fut.await.map_err(|error| error.to_string()) })
985        });
986
987        self.health.checks.push(NamedHealthCheck {
988            name: name.into(),
989            check: check_fn,
990        });
991        self
992    }
993
994    /// Enable readiness/liveness probe separation with explicit paths.
995    pub fn readiness_liveness(
996        mut self,
997        readiness_path: impl Into<String>,
998        liveness_path: impl Into<String>,
999    ) -> Self {
1000        self.health.readiness_path = Some(normalize_route_path(readiness_path.into()));
1001        self.health.liveness_path = Some(normalize_route_path(liveness_path.into()));
1002        self
1003    }
1004
1005    /// Enable readiness/liveness probes at `/ready` and `/live`.
1006    pub fn readiness_liveness_default(self) -> Self {
1007        self.readiness_liveness("/ready", "/live")
1008    }
1009
1010    /// Register a route with GET method.
1011    pub fn route<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1012    where
1013        Out: IntoResponse + Send + Sync + 'static,
1014        E: Send + 'static + std::fmt::Debug,
1015    {
1016        self.route_method(Method::GET, path, circuit)
1017    }
1018    /// Register a route with a specific HTTP method.
1019    ///
1020    /// # Example
1021    ///
1022    /// ```rust,ignore
1023    /// Ranvier::http()
1024    ///     .route_method(Method::POST, "/users", create_user_circuit)
1025    /// ```
1026    pub fn route_method<Out, E>(
1027        self,
1028        method: Method,
1029        path: impl Into<String>,
1030        circuit: Axon<(), Out, E, R>,
1031    ) -> Self
1032    where
1033        Out: IntoResponse + Send + Sync + 'static,
1034        E: Send + 'static + std::fmt::Debug,
1035    {
1036        self.route_method_with_error(method, path, circuit, |error| {
1037            (
1038                StatusCode::INTERNAL_SERVER_ERROR,
1039                format!("Error: {:?}", error),
1040            )
1041                .into_response()
1042        })
1043    }
1044
1045    pub fn route_method_with_error<Out, E, H>(
1046        self,
1047        method: Method,
1048        path: impl Into<String>,
1049        circuit: Axon<(), Out, E, R>,
1050        error_handler: H,
1051    ) -> Self
1052    where
1053        Out: IntoResponse + Send + Sync + 'static,
1054        E: Send + 'static + std::fmt::Debug,
1055        H: Fn(&E) -> Response<Full<Bytes>> + Send + Sync + 'static,
1056    {
1057        self.route_method_with_error_and_layers(
1058            method,
1059            path,
1060            circuit,
1061            error_handler,
1062            Arc::new(Vec::new()),
1063            true,
1064        )
1065    }
1066
1067    pub fn route_method_with_layer<Out, E, L>(
1068        self,
1069        method: Method,
1070        path: impl Into<String>,
1071        circuit: Axon<(), Out, E, R>,
1072        layer: L,
1073    ) -> Self
1074    where
1075        Out: IntoResponse + Send + Sync + 'static,
1076        E: Send + 'static + std::fmt::Debug,
1077        L: Layer<BoxHttpService> + Clone + Send + Sync + 'static,
1078        L::Service: Service<Request<Incoming>, Response = Response<Full<Bytes>>, Error = Infallible>
1079            + Clone
1080            + Send
1081            + 'static,
1082        <L::Service as Service<Request<Incoming>>>::Future: Send + 'static,
1083    {
1084        self.route_method_with_error_and_layers(
1085            method,
1086            path,
1087            circuit,
1088            |error| {
1089                (
1090                    StatusCode::INTERNAL_SERVER_ERROR,
1091                    format!("Error: {:?}", error),
1092                )
1093                    .into_response()
1094            },
1095            Arc::new(vec![to_service_layer(layer)]),
1096            true,
1097        )
1098    }
1099
1100    pub fn route_method_with_layer_override<Out, E, L>(
1101        self,
1102        method: Method,
1103        path: impl Into<String>,
1104        circuit: Axon<(), Out, E, R>,
1105        layer: L,
1106    ) -> Self
1107    where
1108        Out: IntoResponse + Send + Sync + 'static,
1109        E: Send + 'static + std::fmt::Debug,
1110        L: Layer<BoxHttpService> + Clone + Send + Sync + 'static,
1111        L::Service: Service<Request<Incoming>, Response = Response<Full<Bytes>>, Error = Infallible>
1112            + Clone
1113            + Send
1114            + 'static,
1115        <L::Service as Service<Request<Incoming>>>::Future: Send + 'static,
1116    {
1117        self.route_method_with_error_and_layers(
1118            method,
1119            path,
1120            circuit,
1121            |error| {
1122                (
1123                    StatusCode::INTERNAL_SERVER_ERROR,
1124                    format!("Error: {:?}", error),
1125                )
1126                    .into_response()
1127            },
1128            Arc::new(vec![to_service_layer(layer)]),
1129            false,
1130        )
1131    }
1132
1133    fn route_method_with_error_and_layers<Out, E, H>(
1134        mut self,
1135        method: Method,
1136        path: impl Into<String>,
1137        circuit: Axon<(), Out, E, R>,
1138        error_handler: H,
1139        route_layers: Arc<Vec<ServiceLayer>>,
1140        apply_global_layers: bool,
1141    ) -> Self
1142    where
1143        Out: IntoResponse + Send + Sync + 'static,
1144        E: Send + 'static + std::fmt::Debug,
1145        H: Fn(&E) -> Response<Full<Bytes>> + Send + Sync + 'static,
1146    {
1147        let path_str: String = path.into();
1148        let circuit = Arc::new(circuit);
1149        let error_handler = Arc::new(error_handler);
1150        let route_bus_injectors = Arc::new(self.bus_injectors.clone());
1151        let path_for_pattern = path_str.clone();
1152        let path_for_handler = path_str;
1153        let method_for_pattern = method.clone();
1154        let method_for_handler = method;
1155
1156        let handler: RouteHandler<R> = Arc::new(move |req: Request<Incoming>, res: &R| {
1157            let circuit = circuit.clone();
1158            let error_handler = error_handler.clone();
1159            let route_bus_injectors = route_bus_injectors.clone();
1160            let res = res.clone();
1161            let path = path_for_handler.clone();
1162            let method = method_for_handler.clone();
1163
1164            Box::pin(async move {
1165                let request_id = uuid::Uuid::new_v4().to_string();
1166                let span = tracing::info_span!(
1167                    "HTTPRequest",
1168                    ranvier.http.method = %method,
1169                    ranvier.http.path = %path,
1170                    ranvier.http.request_id = %request_id
1171                );
1172
1173                async move {
1174                    let mut bus = Bus::new();
1175                    for injector in route_bus_injectors.iter() {
1176                        injector(&req, &mut bus);
1177                    }
1178                    let result = circuit.execute((), &res, &mut bus).await;
1179                    outcome_to_response_with_error(result, |error| error_handler(error))
1180                }
1181                .instrument(span)
1182                .await
1183            }) as Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
1184        });
1185
1186        self.routes.push(RouteEntry {
1187            method: method_for_pattern,
1188            pattern: RoutePattern::parse(&path_for_pattern),
1189            handler,
1190            layers: route_layers,
1191            apply_global_layers,
1192        });
1193        self
1194    }
1195
1196    pub fn get<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1197    where
1198        Out: IntoResponse + Send + Sync + 'static,
1199        E: Send + 'static + std::fmt::Debug,
1200    {
1201        self.route_method(Method::GET, path, circuit)
1202    }
1203
1204    pub fn get_with_error<Out, E, H>(
1205        self,
1206        path: impl Into<String>,
1207        circuit: Axon<(), Out, E, R>,
1208        error_handler: H,
1209    ) -> Self
1210    where
1211        Out: IntoResponse + Send + Sync + 'static,
1212        E: Send + 'static + std::fmt::Debug,
1213        H: Fn(&E) -> Response<Full<Bytes>> + Send + Sync + 'static,
1214    {
1215        self.route_method_with_error(Method::GET, path, circuit, error_handler)
1216    }
1217
1218    pub fn get_with_layer<Out, E, L>(
1219        self,
1220        path: impl Into<String>,
1221        circuit: Axon<(), Out, E, R>,
1222        layer: L,
1223    ) -> Self
1224    where
1225        Out: IntoResponse + Send + Sync + 'static,
1226        E: Send + 'static + std::fmt::Debug,
1227        L: Layer<BoxHttpService> + Clone + Send + Sync + 'static,
1228        L::Service: Service<Request<Incoming>, Response = Response<Full<Bytes>>, Error = Infallible>
1229            + Clone
1230            + Send
1231            + 'static,
1232        <L::Service as Service<Request<Incoming>>>::Future: Send + 'static,
1233    {
1234        self.route_method_with_layer(Method::GET, path, circuit, layer)
1235    }
1236
1237    pub fn get_with_layer_override<Out, E, L>(
1238        self,
1239        path: impl Into<String>,
1240        circuit: Axon<(), Out, E, R>,
1241        layer: L,
1242    ) -> Self
1243    where
1244        Out: IntoResponse + Send + Sync + 'static,
1245        E: Send + 'static + std::fmt::Debug,
1246        L: Layer<BoxHttpService> + Clone + Send + Sync + 'static,
1247        L::Service: Service<Request<Incoming>, Response = Response<Full<Bytes>>, Error = Infallible>
1248            + Clone
1249            + Send
1250            + 'static,
1251        <L::Service as Service<Request<Incoming>>>::Future: Send + 'static,
1252    {
1253        self.route_method_with_layer_override(Method::GET, path, circuit, layer)
1254    }
1255
1256    pub fn post<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1257    where
1258        Out: IntoResponse + Send + Sync + 'static,
1259        E: Send + 'static + std::fmt::Debug,
1260    {
1261        self.route_method(Method::POST, path, circuit)
1262    }
1263
1264    pub fn put<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1265    where
1266        Out: IntoResponse + Send + Sync + 'static,
1267        E: Send + 'static + std::fmt::Debug,
1268    {
1269        self.route_method(Method::PUT, path, circuit)
1270    }
1271
1272    pub fn delete<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1273    where
1274        Out: IntoResponse + Send + Sync + 'static,
1275        E: Send + 'static + std::fmt::Debug,
1276    {
1277        self.route_method(Method::DELETE, path, circuit)
1278    }
1279
1280    pub fn patch<Out, E>(self, path: impl Into<String>, circuit: Axon<(), Out, E, R>) -> Self
1281    where
1282        Out: IntoResponse + Send + Sync + 'static,
1283        E: Send + 'static + std::fmt::Debug,
1284    {
1285        self.route_method(Method::PATCH, path, circuit)
1286    }
1287
1288    /// Set a fallback circuit for unmatched routes.
1289    ///
1290    /// # Example
1291    ///
1292    /// ```rust,ignore
1293    /// let not_found = Axon::new("NotFound").then(|_| async { "404 Not Found" });
1294    /// Ranvier::http()
1295    ///     .route("/", home)
1296    ///     .fallback(not_found)
1297    /// ```
1298    pub fn fallback<Out, E>(mut self, circuit: Axon<(), Out, E, R>) -> Self
1299    where
1300        Out: IntoResponse + Send + Sync + 'static,
1301        E: Send + 'static + std::fmt::Debug,
1302    {
1303        let circuit = Arc::new(circuit);
1304        let fallback_bus_injectors = Arc::new(self.bus_injectors.clone());
1305
1306        let handler: RouteHandler<R> = Arc::new(move |req: Request<Incoming>, res: &R| {
1307            let circuit = circuit.clone();
1308            let fallback_bus_injectors = fallback_bus_injectors.clone();
1309            let res = res.clone();
1310            Box::pin(async move {
1311                let request_id = uuid::Uuid::new_v4().to_string();
1312                let span = tracing::info_span!(
1313                    "HTTPRequest",
1314                    ranvier.http.method = "FALLBACK",
1315                    ranvier.http.request_id = %request_id
1316                );
1317
1318                async move {
1319                    let mut bus = Bus::new();
1320                    for injector in fallback_bus_injectors.iter() {
1321                        injector(&req, &mut bus);
1322                    }
1323                    let result = circuit.execute((), &res, &mut bus).await;
1324
1325                    match result {
1326                        Outcome::Next(output) => {
1327                            let mut response = output.into_response();
1328                            *response.status_mut() = StatusCode::NOT_FOUND;
1329                            response
1330                        }
1331                        _ => Response::builder()
1332                            .status(StatusCode::NOT_FOUND)
1333                            .body(Full::new(Bytes::from("Not Found")))
1334                            .unwrap(),
1335                    }
1336                }
1337                .instrument(span)
1338                .await
1339            }) as Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
1340        });
1341
1342        self.fallback = Some(handler);
1343        self
1344    }
1345
1346    /// Run the HTTP server with required resources.
1347    pub async fn run(self, resources: R) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1348        self.run_with_shutdown_signal(resources, shutdown_signal())
1349            .await
1350    }
1351
1352    async fn run_with_shutdown_signal<S>(
1353        self,
1354        resources: R,
1355        shutdown_signal: S,
1356    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
1357    where
1358        S: Future<Output = ()> + Send,
1359    {
1360        let addr_str = self.addr.as_deref().unwrap_or("127.0.0.1:3000");
1361        let addr: SocketAddr = addr_str.parse()?;
1362
1363        let routes = Arc::new(self.routes);
1364        let fallback = self.fallback;
1365        let layers = Arc::new(self.layers);
1366        let health = Arc::new(self.health);
1367        let static_assets = Arc::new(self.static_assets);
1368        let on_start = self.on_start;
1369        let on_shutdown = self.on_shutdown;
1370        let graceful_shutdown_timeout = self.graceful_shutdown_timeout;
1371        let resources = Arc::new(resources);
1372
1373        let listener = TcpListener::bind(addr).await?;
1374        tracing::info!("Ranvier HTTP Ingress listening on http://{}", addr);
1375        if let Some(callback) = on_start.as_ref() {
1376            callback();
1377        }
1378
1379        tokio::pin!(shutdown_signal);
1380        let mut connections = tokio::task::JoinSet::new();
1381
1382        loop {
1383            tokio::select! {
1384                _ = &mut shutdown_signal => {
1385                    tracing::info!("Shutdown signal received. Draining in-flight connections.");
1386                    break;
1387                }
1388                accept_result = listener.accept() => {
1389                    let (stream, _) = accept_result?;
1390                    let io = TokioIo::new(stream);
1391
1392                    let routes = routes.clone();
1393                    let fallback = fallback.clone();
1394                    let resources = resources.clone();
1395                    let layers = layers.clone();
1396                    let health = health.clone();
1397                    let static_assets = static_assets.clone();
1398
1399                    connections.spawn(async move {
1400                        let service = build_http_service(
1401                            routes,
1402                            fallback,
1403                            resources,
1404                            layers,
1405                            health,
1406                            static_assets,
1407                        );
1408                        let hyper_service = TowerToHyperService::new(service);
1409                        if let Err(err) = http1::Builder::new()
1410                            .serve_connection(io, hyper_service)
1411                            .with_upgrades()
1412                            .await
1413                        {
1414                            tracing::error!("Error serving connection: {:?}", err);
1415                        }
1416                    });
1417                }
1418                Some(join_result) = connections.join_next(), if !connections.is_empty() => {
1419                    if let Err(err) = join_result {
1420                        tracing::warn!("Connection task join error: {:?}", err);
1421                    }
1422                }
1423            }
1424        }
1425
1426        let _timed_out = drain_connections(&mut connections, graceful_shutdown_timeout).await;
1427
1428        drop(resources);
1429        if let Some(callback) = on_shutdown.as_ref() {
1430            callback();
1431        }
1432
1433        Ok(())
1434    }
1435
1436    /// Convert to a raw Tower Service for integration with existing Tower stacks.
1437    ///
1438    /// This is the "escape hatch" per Discussion 193:
1439    /// > "Raw API는 Flat API의 탈출구다."
1440    ///
1441    /// # Example
1442    ///
1443    /// ```rust,ignore
1444    /// let ingress = Ranvier::http()
1445    ///     .bind(":3000")
1446    ///     .route("/", circuit);
1447    ///
1448    /// let raw_service = ingress.into_raw_service();
1449    /// // Use raw_service with existing Tower infrastructure
1450    /// ```
1451    pub fn into_raw_service(self, resources: R) -> RawIngressService<R> {
1452        let routes = Arc::new(self.routes);
1453        let fallback = self.fallback;
1454        let layers = Arc::new(self.layers);
1455        let health = Arc::new(self.health);
1456        let static_assets = Arc::new(self.static_assets);
1457        let resources = Arc::new(resources);
1458
1459        RawIngressService {
1460            routes,
1461            fallback,
1462            layers,
1463            health,
1464            static_assets,
1465            resources,
1466        }
1467    }
1468}
1469
/// Build the top-level request dispatcher as a boxed, cloneable Tower service.
///
/// For every request the service tries, in order:
///
/// 1. the health/readiness/liveness probes (`maybe_handle_health_request`),
/// 2. the first registered route accepted by `find_matching_route`,
/// 3. static assets / SPA fallback (`maybe_handle_static_request`),
/// 4. the configured `fallback` handler,
/// 5. a plain `404 Not Found`.
///
/// Health responses are returned directly without passing through any layer.
/// Matched routes run through their effective layers; the fallback handler
/// runs through the global `layers` only.
fn build_http_service<R>(
    routes: Arc<Vec<RouteEntry<R>>>,
    fallback: Option<RouteHandler<R>>,
    resources: Arc<R>,
    layers: Arc<Vec<ServiceLayer>>,
    health: Arc<HealthConfig<R>>,
    static_assets: Arc<StaticAssetsConfig>,
) -> BoxHttpService
where
    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
{
    let base_service = service_fn(move |req: Request<Incoming>| {
        // Each request future gets its own handles to the shared state.
        let routes = routes.clone();
        let fallback = fallback.clone();
        let resources = resources.clone();
        let layers = layers.clone();
        let health = health.clone();
        let static_assets = static_assets.clone();

        async move {
            let mut req = req;
            // Copy method and path out so they stay usable after `req` is moved.
            let method = req.method().clone();
            let path = req.uri().path().to_string();

            if let Some(response) =
                maybe_handle_health_request(&method, &path, &health, resources.clone()).await
            {
                return Ok::<_, Infallible>(response);
            }

            if let Some((entry, params)) = find_matching_route(routes.as_slice(), &method, &path) {
                // Expose the values produced by route matching through request extensions.
                req.extensions_mut().insert(params);
                // Routes registered with the override flag skip the global layers.
                let effective_layers = if entry.apply_global_layers {
                    merge_layers(&layers, &entry.layers)
                } else {
                    entry.layers.clone()
                };

                if effective_layers.is_empty() {
                    // No layers: call the handler directly without building a service.
                    Ok::<_, Infallible>((entry.handler)(req, &resources).await)
                } else {
                    let route_service = build_route_service(
                        entry.handler.clone(),
                        resources.clone(),
                        effective_layers,
                    );
                    route_service.oneshot(req).await
                }
            } else {
                // `Err` carries a finished static response; `Ok` hands the request back.
                let req =
                    match maybe_handle_static_request(req, &method, &path, static_assets.as_ref())
                        .await
                    {
                        Ok(req) => req,
                        Err(response) => return Ok(response),
                    };

                if let Some(ref fb) = fallback {
                    if layers.is_empty() {
                        Ok(fb(req, &resources).await)
                    } else {
                        let fallback_service =
                            build_route_service(fb.clone(), resources.clone(), layers.clone());
                        fallback_service.oneshot(req).await
                    }
                } else {
                    Ok(Response::builder()
                        .status(StatusCode::NOT_FOUND)
                        .body(Full::new(Bytes::from("Not Found")))
                        .unwrap())
                }
            }
        }
    });

    BoxCloneService::new(base_service)
}
1547
1548fn build_route_service<R>(
1549    handler: RouteHandler<R>,
1550    resources: Arc<R>,
1551    layers: Arc<Vec<ServiceLayer>>,
1552) -> BoxHttpService
1553where
1554    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
1555{
1556    let base_service = service_fn(move |req: Request<Incoming>| {
1557        let handler = handler.clone();
1558        let resources = resources.clone();
1559        async move { Ok::<_, Infallible>(handler(req, &resources).await) }
1560    });
1561
1562    let mut service = BoxCloneService::new(base_service);
1563    for layer in layers.iter() {
1564        service = layer(service);
1565    }
1566    service
1567}
1568
1569fn merge_layers(
1570    global_layers: &Arc<Vec<ServiceLayer>>,
1571    route_layers: &Arc<Vec<ServiceLayer>>,
1572) -> Arc<Vec<ServiceLayer>> {
1573    if global_layers.is_empty() {
1574        return route_layers.clone();
1575    }
1576    if route_layers.is_empty() {
1577        return global_layers.clone();
1578    }
1579
1580    let mut combined = Vec::with_capacity(global_layers.len() + route_layers.len());
1581    combined.extend(global_layers.iter().cloned());
1582    combined.extend(route_layers.iter().cloned());
1583    Arc::new(combined)
1584}
1585
1586async fn maybe_handle_health_request<R>(
1587    method: &Method,
1588    path: &str,
1589    health: &HealthConfig<R>,
1590    resources: Arc<R>,
1591) -> Option<Response<Full<Bytes>>>
1592where
1593    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
1594{
1595    if method != Method::GET {
1596        return None;
1597    }
1598
1599    if let Some(liveness_path) = health.liveness_path.as_ref() {
1600        if path == liveness_path {
1601            return Some(health_json_response("liveness", true, Vec::new()));
1602        }
1603    }
1604
1605    if let Some(readiness_path) = health.readiness_path.as_ref() {
1606        if path == readiness_path {
1607            let (healthy, checks) = run_named_health_checks(&health.checks, resources).await;
1608            return Some(health_json_response("readiness", healthy, checks));
1609        }
1610    }
1611
1612    if let Some(health_path) = health.health_path.as_ref() {
1613        if path == health_path {
1614            let (healthy, checks) = run_named_health_checks(&health.checks, resources).await;
1615            return Some(health_json_response("health", healthy, checks));
1616        }
1617    }
1618
1619    None
1620}
1621
1622async fn maybe_handle_static_request(
1623    req: Request<Incoming>,
1624    method: &Method,
1625    path: &str,
1626    static_assets: &StaticAssetsConfig,
1627) -> Result<Request<Incoming>, Response<Full<Bytes>>> {
1628    if method != Method::GET && method != Method::HEAD {
1629        return Ok(req);
1630    }
1631
1632    if let Some(mount) = static_assets
1633        .mounts
1634        .iter()
1635        .find(|mount| strip_mount_prefix(path, &mount.route_prefix).is_some())
1636    {
1637        let accept_encoding = req.headers().get(http::header::ACCEPT_ENCODING).cloned();
1638        let Some(stripped_path) = strip_mount_prefix(path, &mount.route_prefix) else {
1639            return Ok(req);
1640        };
1641        let rewritten = rewrite_request_path(req, &stripped_path);
1642        let service = ServeDir::new(&mount.directory);
1643        let response = match service.oneshot(rewritten).await {
1644            Ok(response) => response,
1645            Err(_) => {
1646                return Err(Response::builder()
1647                    .status(StatusCode::INTERNAL_SERVER_ERROR)
1648                    .body(Full::new(Bytes::from("Failed to serve static asset")))
1649                    .unwrap_or_else(|_| Response::new(Full::new(Bytes::new()))));
1650            }
1651        };
1652        let response =
1653            collect_static_response(response, static_assets.cache_control.as_deref()).await;
1654        return Err(maybe_compress_static_response(
1655            response,
1656            accept_encoding,
1657            static_assets.enable_compression,
1658        )
1659        .await);
1660    }
1661
1662    if let Some(spa_file) = static_assets.spa_fallback.as_ref() {
1663        if looks_like_spa_request(path) {
1664            let accept_encoding = req.headers().get(http::header::ACCEPT_ENCODING).cloned();
1665            let service = ServeFile::new(spa_file);
1666            let response = match service.oneshot(req).await {
1667                Ok(response) => response,
1668                Err(_) => {
1669                    return Err(Response::builder()
1670                        .status(StatusCode::INTERNAL_SERVER_ERROR)
1671                        .body(Full::new(Bytes::from("Failed to serve SPA fallback")))
1672                        .unwrap_or_else(|_| Response::new(Full::new(Bytes::new()))));
1673                }
1674            };
1675            let response =
1676                collect_static_response(response, static_assets.cache_control.as_deref()).await;
1677            return Err(maybe_compress_static_response(
1678                response,
1679                accept_encoding,
1680                static_assets.enable_compression,
1681            )
1682            .await);
1683        }
1684    }
1685
1686    Ok(req)
1687}
1688
/// Strip a static mount's route prefix from a request path.
///
/// Returns the remaining path (always starting with `/`) when `path` lies
/// under `prefix`, or `None` when it does not. Trailing slashes on `prefix`
/// are ignored, and the prefix only matches on a whole-segment boundary
/// (`/static` does not match `/staticfoo`). A prefix of `/` matches every
/// path and returns it unchanged.
fn strip_mount_prefix(path: &str, prefix: &str) -> Option<String> {
    if prefix == "/" {
        return Some(path.to_owned());
    }

    let mount = prefix.trim_end_matches('/');
    match path.strip_prefix(mount) {
        // The request targets the mount root itself.
        Some("") => Some("/".to_owned()),
        // The remainder already begins with the separating slash.
        Some(rest) if rest.starts_with('/') => Some(rest.to_owned()),
        _ => None,
    }
}
1708
1709fn rewrite_request_path(mut req: Request<Incoming>, new_path: &str) -> Request<Incoming> {
1710    let query = req.uri().query().map(str::to_string);
1711    let path_and_query = match query {
1712        Some(query) => format!("{new_path}?{query}"),
1713        None => new_path.to_string(),
1714    };
1715
1716    let mut parts = req.uri().clone().into_parts();
1717    if let Ok(parsed_path_and_query) = path_and_query.parse() {
1718        parts.path_and_query = Some(parsed_path_and_query);
1719        if let Ok(uri) = Uri::from_parts(parts) {
1720            *req.uri_mut() = uri;
1721        }
1722    }
1723
1724    req
1725}
1726
1727async fn collect_static_response<B>(
1728    response: Response<B>,
1729    cache_control: Option<&str>,
1730) -> Response<Full<Bytes>>
1731where
1732    B: Body<Data = Bytes> + Send + 'static,
1733    B::Error: std::fmt::Display,
1734{
1735    let status = response.status();
1736    let headers = response.headers().clone();
1737    let body = response.into_body();
1738    let collected = body.collect().await;
1739
1740    let bytes = match collected {
1741        Ok(value) => value.to_bytes(),
1742        Err(error) => Bytes::from(error.to_string()),
1743    };
1744
1745    let mut builder = Response::builder().status(status);
1746    for (name, value) in headers.iter() {
1747        builder = builder.header(name, value);
1748    }
1749
1750    let mut response = builder
1751        .body(Full::new(bytes))
1752        .unwrap_or_else(|_| Response::new(Full::new(Bytes::new())));
1753
1754    if status == StatusCode::OK {
1755        if let Some(value) = cache_control {
1756            if !response.headers().contains_key(http::header::CACHE_CONTROL) {
1757                if let Ok(header_value) = http::HeaderValue::from_str(value) {
1758                    response
1759                        .headers_mut()
1760                        .insert(http::header::CACHE_CONTROL, header_value);
1761                }
1762            }
1763        }
1764    }
1765
1766    response
1767}
1768
/// Heuristic for SPA navigation requests.
///
/// Returns `true` when the text after the last `/` in `path` (or the whole string when
/// there is no `/`) contains no `.`, i.e. it does not look like a file name with an
/// extension.
fn looks_like_spa_request(path: &str) -> bool {
    let last_segment = match path.rfind('/') {
        // '/' is a single byte, so `slash + 1` is always a valid char boundary.
        Some(slash) => &path[slash + 1..],
        None => path,
    };
    last_segment.bytes().all(|byte| byte != b'.')
}
1773
1774async fn maybe_compress_static_response(
1775    response: Response<Full<Bytes>>,
1776    accept_encoding: Option<http::HeaderValue>,
1777    enable_compression: bool,
1778) -> Response<Full<Bytes>> {
1779    if !enable_compression {
1780        return response;
1781    }
1782
1783    let Some(accept_encoding) = accept_encoding else {
1784        return response;
1785    };
1786
1787    let mut request = Request::builder()
1788        .uri("/")
1789        .body(Full::new(Bytes::new()))
1790        .unwrap_or_else(|_| Request::new(Full::new(Bytes::new())));
1791    request
1792        .headers_mut()
1793        .insert(http::header::ACCEPT_ENCODING, accept_encoding);
1794
1795    let service = CompressionLayer::new().layer(service_fn({
1796        let response = response.clone();
1797        move |_req: Request<Full<Bytes>>| {
1798            let response = response.clone();
1799            async move { Ok::<_, Infallible>(response) }
1800        }
1801    }));
1802
1803    match service.oneshot(request).await {
1804        Ok(compressed) => collect_static_response(compressed, None).await,
1805        Err(_) => response,
1806    }
1807}
1808
1809async fn run_named_health_checks<R>(
1810    checks: &[NamedHealthCheck<R>],
1811    resources: Arc<R>,
1812) -> (bool, Vec<HealthCheckReport>)
1813where
1814    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
1815{
1816    let mut reports = Vec::with_capacity(checks.len());
1817    let mut healthy = true;
1818
1819    for check in checks {
1820        match (check.check)(resources.clone()).await {
1821            Ok(()) => reports.push(HealthCheckReport {
1822                name: check.name.clone(),
1823                status: "ok",
1824                error: None,
1825            }),
1826            Err(error) => {
1827                healthy = false;
1828                reports.push(HealthCheckReport {
1829                    name: check.name.clone(),
1830                    status: "error",
1831                    error: Some(error),
1832                });
1833            }
1834        }
1835    }
1836
1837    (healthy, reports)
1838}
1839
1840fn health_json_response(
1841    probe: &'static str,
1842    healthy: bool,
1843    checks: Vec<HealthCheckReport>,
1844) -> Response<Full<Bytes>> {
1845    let status_code = if healthy {
1846        StatusCode::OK
1847    } else {
1848        StatusCode::SERVICE_UNAVAILABLE
1849    };
1850    let status = if healthy { "ok" } else { "degraded" };
1851    let payload = HealthReport {
1852        status,
1853        probe,
1854        checks,
1855    };
1856
1857    let body = serde_json::to_vec(&payload)
1858        .unwrap_or_else(|_| br#"{"status":"error","probe":"health"}"#.to_vec());
1859
1860    Response::builder()
1861        .status(status_code)
1862        .header(http::header::CONTENT_TYPE, "application/json")
1863        .body(Full::new(Bytes::from(body)))
1864        .unwrap()
1865}
1866
1867async fn shutdown_signal() {
1868    #[cfg(unix)]
1869    {
1870        use tokio::signal::unix::{SignalKind, signal};
1871
1872        match signal(SignalKind::terminate()) {
1873            Ok(mut terminate) => {
1874                tokio::select! {
1875                    _ = tokio::signal::ctrl_c() => {}
1876                    _ = terminate.recv() => {}
1877                }
1878            }
1879            Err(err) => {
1880                tracing::warn!("Failed to install SIGTERM handler: {:?}", err);
1881                if let Err(ctrl_c_err) = tokio::signal::ctrl_c().await {
1882                    tracing::warn!("Failed to listen for Ctrl+C: {:?}", ctrl_c_err);
1883                }
1884            }
1885        }
1886    }
1887
1888    #[cfg(not(unix))]
1889    {
1890        if let Err(err) = tokio::signal::ctrl_c().await {
1891            tracing::warn!("Failed to listen for Ctrl+C: {:?}", err);
1892        }
1893    }
1894}
1895
1896async fn drain_connections(
1897    connections: &mut tokio::task::JoinSet<()>,
1898    graceful_shutdown_timeout: Duration,
1899) -> bool {
1900    if connections.is_empty() {
1901        return false;
1902    }
1903
1904    let drain_result = tokio::time::timeout(graceful_shutdown_timeout, async {
1905        while let Some(join_result) = connections.join_next().await {
1906            if let Err(err) = join_result {
1907                tracing::warn!("Connection task join error during shutdown: {:?}", err);
1908            }
1909        }
1910    })
1911    .await;
1912
1913    if drain_result.is_err() {
1914        tracing::warn!(
1915            "Graceful shutdown timeout reached ({:?}). Aborting remaining connections.",
1916            graceful_shutdown_timeout
1917        );
1918        connections.abort_all();
1919        while let Some(join_result) = connections.join_next().await {
1920            if let Err(err) = join_result {
1921                tracing::warn!("Connection task abort join error: {:?}", err);
1922            }
1923        }
1924        true
1925    } else {
1926        false
1927    }
1928}
1929
1930impl<R> Default for HttpIngress<R>
1931where
1932    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
1933{
1934    fn default() -> Self {
1935        Self::new()
1936    }
1937}
1938
1939/// Internal service type for `into_raw_service()`
1940#[derive(Clone)]
1941pub struct RawIngressService<R> {
1942    routes: Arc<Vec<RouteEntry<R>>>,
1943    fallback: Option<RouteHandler<R>>,
1944    layers: Arc<Vec<ServiceLayer>>,
1945    health: Arc<HealthConfig<R>>,
1946    static_assets: Arc<StaticAssetsConfig>,
1947    resources: Arc<R>,
1948}
1949
1950impl<R> Service<Request<Incoming>> for RawIngressService<R>
1951where
1952    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
1953{
1954    type Response = Response<Full<Bytes>>;
1955    type Error = Infallible;
1956    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
1957
1958    fn poll_ready(
1959        &mut self,
1960        _cx: &mut std::task::Context<'_>,
1961    ) -> std::task::Poll<Result<(), Self::Error>> {
1962        std::task::Poll::Ready(Ok(()))
1963    }
1964
1965    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
1966        let routes = self.routes.clone();
1967        let fallback = self.fallback.clone();
1968        let layers = self.layers.clone();
1969        let health = self.health.clone();
1970        let static_assets = self.static_assets.clone();
1971        let resources = self.resources.clone();
1972
1973        Box::pin(async move {
1974            let service =
1975                build_http_service(routes, fallback, resources, layers, health, static_assets);
1976            service.oneshot(req).await
1977        })
1978    }
1979}
1980
1981#[cfg(test)]
1982mod tests {
1983    use super::*;
1984    use async_trait::async_trait;
1985    use futures_util::{SinkExt, StreamExt};
1986    use ranvier_observe::{HttpMetrics, HttpMetricsLayer, IncomingTraceContext, TraceContextLayer};
1987    use serde::Deserialize;
1988    use std::fs;
1989    use std::sync::atomic::{AtomicBool, Ordering};
1990    use tempfile::tempdir;
1991    use tokio::io::{AsyncReadExt, AsyncWriteExt};
1992    use tokio_tungstenite::tungstenite::Message as WsClientMessage;
1993    use tokio_tungstenite::tungstenite::client::IntoClientRequest;
1994
1995    async fn connect_with_retry(addr: std::net::SocketAddr) -> tokio::net::TcpStream {
1996        let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
1997
1998        loop {
1999            match tokio::net::TcpStream::connect(addr).await {
2000                Ok(stream) => return stream,
2001                Err(error) => {
2002                    if tokio::time::Instant::now() >= deadline {
2003                        panic!("connect server: {error}");
2004                    }
2005                    tokio::time::sleep(Duration::from_millis(25)).await;
2006                }
2007            }
2008        }
2009    }
2010
2011    #[test]
2012    fn route_pattern_matches_static_path() {
2013        let pattern = RoutePattern::parse("/orders/list");
2014        let params = pattern.match_path("/orders/list").expect("should match");
2015        assert!(params.into_inner().is_empty());
2016    }
2017
2018    #[test]
2019    fn route_pattern_matches_param_segments() {
2020        let pattern = RoutePattern::parse("/orders/:id/items/:item_id");
2021        let params = pattern
2022            .match_path("/orders/42/items/sku-123")
2023            .expect("should match");
2024        assert_eq!(params.get("id"), Some("42"));
2025        assert_eq!(params.get("item_id"), Some("sku-123"));
2026    }
2027
2028    #[test]
2029    fn route_pattern_matches_wildcard_segment() {
2030        let pattern = RoutePattern::parse("/assets/*path");
2031        let params = pattern
2032            .match_path("/assets/css/theme/light.css")
2033            .expect("should match");
2034        assert_eq!(params.get("path"), Some("css/theme/light.css"));
2035    }
2036
2037    #[test]
2038    fn route_pattern_rejects_non_matching_path() {
2039        let pattern = RoutePattern::parse("/orders/:id");
2040        assert!(pattern.match_path("/users/42").is_none());
2041    }
2042
2043    #[test]
2044    fn graceful_shutdown_timeout_defaults_to_30_seconds() {
2045        let ingress = HttpIngress::<()>::new();
2046        assert_eq!(ingress.graceful_shutdown_timeout, Duration::from_secs(30));
2047        assert!(ingress.layers.is_empty());
2048        assert!(ingress.bus_injectors.is_empty());
2049        assert!(ingress.static_assets.mounts.is_empty());
2050        assert!(ingress.on_start.is_none());
2051        assert!(ingress.on_shutdown.is_none());
2052    }
2053
2054    #[test]
2055    fn layer_registration_stacks_globally() {
2056        let ingress = HttpIngress::<()>::new()
2057            .layer(tower::layer::util::Identity::new())
2058            .layer(tower::layer::util::Identity::new());
2059        assert_eq!(ingress.layers.len(), 2);
2060    }
2061
2062    #[test]
2063    fn layer_accepts_tower_http_cors_layer() {
2064        let ingress = HttpIngress::<()>::new().layer(tower_http::cors::CorsLayer::permissive());
2065        assert_eq!(ingress.layers.len(), 1);
2066    }
2067
2068    #[test]
2069    fn route_without_layer_keeps_empty_route_middleware_stack() {
2070        let ingress =
2071            HttpIngress::<()>::new().get("/ping", Axon::<(), (), Infallible, ()>::new("Ping"));
2072        assert_eq!(ingress.routes.len(), 1);
2073        assert!(ingress.routes[0].layers.is_empty());
2074        assert!(ingress.routes[0].apply_global_layers);
2075    }
2076
2077    #[test]
2078    fn route_with_layer_registers_route_middleware_stack() {
2079        let ingress = HttpIngress::<()>::new().get_with_layer(
2080            "/ping",
2081            Axon::<(), (), Infallible, ()>::new("Ping"),
2082            tower::layer::util::Identity::new(),
2083        );
2084        assert_eq!(ingress.routes.len(), 1);
2085        assert_eq!(ingress.routes[0].layers.len(), 1);
2086        assert!(ingress.routes[0].apply_global_layers);
2087    }
2088
2089    #[test]
2090    fn route_with_layer_override_disables_global_layers() {
2091        let ingress = HttpIngress::<()>::new().get_with_layer_override(
2092            "/ping",
2093            Axon::<(), (), Infallible, ()>::new("Ping"),
2094            tower::layer::util::Identity::new(),
2095        );
2096        assert_eq!(ingress.routes.len(), 1);
2097        assert_eq!(ingress.routes[0].layers.len(), 1);
2098        assert!(!ingress.routes[0].apply_global_layers);
2099    }
2100
2101    #[test]
2102    fn timeout_layer_registers_builtin_middleware() {
2103        let ingress = HttpIngress::<()>::new().timeout_layer(Duration::from_secs(1));
2104        assert_eq!(ingress.layers.len(), 1);
2105    }
2106
2107    #[test]
2108    fn request_id_layer_registers_builtin_middleware() {
2109        let ingress = HttpIngress::<()>::new().request_id_layer();
2110        assert_eq!(ingress.layers.len(), 1);
2111    }
2112
2113    #[test]
2114    fn compression_layer_registers_builtin_middleware() {
2115        let ingress = HttpIngress::<()>::new().compression_layer();
2116        assert!(ingress.static_assets.enable_compression);
2117    }
2118
2119    #[test]
2120    fn bus_injector_registration_adds_hook() {
2121        let ingress = HttpIngress::<()>::new().bus_injector(|_req, bus| {
2122            bus.insert("ok".to_string());
2123        });
2124        assert_eq!(ingress.bus_injectors.len(), 1);
2125    }
2126
2127    #[test]
2128    fn ws_route_registers_get_route_pattern() {
2129        let ingress =
2130            HttpIngress::<()>::new().ws("/ws/events", |_socket, _resources, _bus| async {});
2131        assert_eq!(ingress.routes.len(), 1);
2132        assert_eq!(ingress.routes[0].method, Method::GET);
2133        assert_eq!(ingress.routes[0].pattern.raw, "/ws/events");
2134    }
2135
2136    #[derive(Debug, Deserialize)]
2137    struct WsWelcomeFrame {
2138        connection_id: String,
2139        path: String,
2140        tenant: String,
2141    }
2142
2143    #[tokio::test]
2144    async fn ws_route_upgrades_and_bridges_event_source_sink_with_connection_bus() {
2145        let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("bind probe");
2146        let addr = probe.local_addr().expect("local addr");
2147        drop(probe);
2148
2149        let ingress = HttpIngress::<()>::new()
2150            .bind(addr.to_string())
2151            .bus_injector(|req, bus| {
2152                if let Some(value) = req
2153                    .headers()
2154                    .get("x-tenant-id")
2155                    .and_then(|v| v.to_str().ok())
2156                {
2157                    bus.insert(value.to_string());
2158                }
2159            })
2160            .ws("/ws/echo", |mut socket, _resources, bus| async move {
2161                let tenant = bus
2162                    .read::<String>()
2163                    .cloned()
2164                    .unwrap_or_else(|| "unknown".to_string());
2165                if let Some(session) = bus.read::<WebSocketSessionContext>() {
2166                    let welcome = serde_json::json!({
2167                        "connection_id": session.connection_id().to_string(),
2168                        "path": session.path(),
2169                        "tenant": tenant,
2170                    });
2171                    let _ = socket.send_json(&welcome).await;
2172                }
2173
2174                while let Some(event) = socket.next_event().await {
2175                    match event {
2176                        WebSocketEvent::Text(text) => {
2177                            let _ = socket.send_event(format!("echo:{text}")).await;
2178                        }
2179                        WebSocketEvent::Binary(bytes) => {
2180                            let _ = socket.send_event(bytes).await;
2181                        }
2182                        WebSocketEvent::Close => break,
2183                        WebSocketEvent::Ping(_) | WebSocketEvent::Pong(_) => {}
2184                    }
2185                }
2186            });
2187
2188        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
2189        let server = tokio::spawn(async move {
2190            ingress
2191                .run_with_shutdown_signal((), async move {
2192                    let _ = shutdown_rx.await;
2193                })
2194                .await
2195        });
2196
2197        let ws_uri = format!("ws://{addr}/ws/echo?room=alpha");
2198        let mut ws_request = ws_uri
2199            .as_str()
2200            .into_client_request()
2201            .expect("ws client request");
2202        ws_request
2203            .headers_mut()
2204            .insert("x-tenant-id", http::HeaderValue::from_static("acme"));
2205        let (mut client, _response) = tokio_tungstenite::connect_async(ws_request)
2206            .await
2207            .expect("websocket connect");
2208
2209        let welcome = client
2210            .next()
2211            .await
2212            .expect("welcome frame")
2213            .expect("welcome frame ok");
2214        let welcome_text = match welcome {
2215            WsClientMessage::Text(text) => text.to_string(),
2216            other => panic!("expected text welcome frame, got {other:?}"),
2217        };
2218        let welcome_payload: WsWelcomeFrame =
2219            serde_json::from_str(&welcome_text).expect("welcome json");
2220        assert_eq!(welcome_payload.path, "/ws/echo");
2221        assert_eq!(welcome_payload.tenant, "acme");
2222        assert!(!welcome_payload.connection_id.is_empty());
2223
2224        client
2225            .send(WsClientMessage::Text("hello".into()))
2226            .await
2227            .expect("send text");
2228        let echo_text = client
2229            .next()
2230            .await
2231            .expect("echo text frame")
2232            .expect("echo text frame ok");
2233        assert_eq!(echo_text, WsClientMessage::Text("echo:hello".into()));
2234
2235        client
2236            .send(WsClientMessage::Binary(vec![1, 2, 3, 4].into()))
2237            .await
2238            .expect("send binary");
2239        let echo_binary = client
2240            .next()
2241            .await
2242            .expect("echo binary frame")
2243            .expect("echo binary frame ok");
2244        assert_eq!(
2245            echo_binary,
2246            WsClientMessage::Binary(vec![1, 2, 3, 4].into())
2247        );
2248
2249        client.close(None).await.expect("close websocket");
2250
2251        let _ = shutdown_tx.send(());
2252        server
2253            .await
2254            .expect("server join")
2255            .expect("server shutdown should succeed");
2256    }
2257
2258    #[derive(Clone)]
2259    struct EchoTrace;
2260
2261    #[async_trait]
2262    impl Transition<(), String> for EchoTrace {
2263        type Error = Infallible;
2264        type Resources = ();
2265
2266        async fn run(
2267            &self,
2268            _state: (),
2269            _resources: &Self::Resources,
2270            bus: &mut Bus,
2271        ) -> Outcome<String, Self::Error> {
2272            let trace_id = bus
2273                .read::<String>()
2274                .cloned()
2275                .unwrap_or_else(|| "missing-trace".to_string());
2276            Outcome::next(trace_id)
2277        }
2278    }
2279
2280    #[tokio::test]
2281    async fn observe_trace_context_and_metrics_layers_work_with_ingress() {
2282        let metrics = HttpMetrics::default();
2283        let ingress = HttpIngress::<()>::new()
2284            .layer(TraceContextLayer::new())
2285            .layer(HttpMetricsLayer::new(metrics.clone()))
2286            .bus_injector(|req, bus| {
2287                if let Some(trace) = req.extensions().get::<IncomingTraceContext>() {
2288                    bus.insert(trace.trace_id().to_string());
2289                }
2290            })
2291            .get(
2292                "/trace",
2293                Axon::<(), (), Infallible, ()>::new("EchoTrace").then(EchoTrace),
2294            );
2295
2296        let app = crate::test_harness::TestApp::new(ingress, ());
2297        let response = app
2298            .send(crate::test_harness::TestRequest::get("/trace").header(
2299                "traceparent",
2300                "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
2301            ))
2302            .await
2303            .expect("request should succeed");
2304
2305        assert_eq!(response.status(), StatusCode::OK);
2306        assert_eq!(
2307            response.text().expect("utf8 response"),
2308            "4bf92f3577b34da6a3ce929d0e0e4736"
2309        );
2310
2311        let snapshot = metrics.snapshot();
2312        assert_eq!(snapshot.requests_total, 1);
2313        assert_eq!(snapshot.requests_error, 0);
2314    }
2315
2316    #[test]
2317    fn route_descriptors_export_http_and_health_paths() {
2318        let ingress = HttpIngress::<()>::new()
2319            .get(
2320                "/orders/:id",
2321                Axon::<(), (), Infallible, ()>::new("OrderById"),
2322            )
2323            .health_endpoint("/healthz")
2324            .readiness_liveness("/readyz", "/livez");
2325
2326        let descriptors = ingress.route_descriptors();
2327
2328        assert!(
2329            descriptors
2330                .iter()
2331                .any(|descriptor| descriptor.method() == Method::GET
2332                    && descriptor.path_pattern() == "/orders/:id")
2333        );
2334        assert!(
2335            descriptors
2336                .iter()
2337                .any(|descriptor| descriptor.method() == Method::GET
2338                    && descriptor.path_pattern() == "/healthz")
2339        );
2340        assert!(
2341            descriptors
2342                .iter()
2343                .any(|descriptor| descriptor.method() == Method::GET
2344                    && descriptor.path_pattern() == "/readyz")
2345        );
2346        assert!(
2347            descriptors
2348                .iter()
2349                .any(|descriptor| descriptor.method() == Method::GET
2350                    && descriptor.path_pattern() == "/livez")
2351        );
2352    }
2353
2354    #[tokio::test]
2355    async fn lifecycle_hooks_fire_on_start_and_shutdown() {
2356        let started = Arc::new(AtomicBool::new(false));
2357        let shutdown = Arc::new(AtomicBool::new(false));
2358
2359        let started_flag = started.clone();
2360        let shutdown_flag = shutdown.clone();
2361
2362        let ingress = HttpIngress::<()>::new()
2363            .bind("127.0.0.1:0")
2364            .on_start(move || {
2365                started_flag.store(true, Ordering::SeqCst);
2366            })
2367            .on_shutdown(move || {
2368                shutdown_flag.store(true, Ordering::SeqCst);
2369            })
2370            .graceful_shutdown(Duration::from_millis(50));
2371
2372        ingress
2373            .run_with_shutdown_signal((), async {
2374                tokio::time::sleep(Duration::from_millis(20)).await;
2375            })
2376            .await
2377            .expect("server should exit gracefully");
2378
2379        assert!(started.load(Ordering::SeqCst));
2380        assert!(shutdown.load(Ordering::SeqCst));
2381    }
2382
2383    #[tokio::test]
2384    async fn graceful_shutdown_drains_in_flight_requests_before_exit() {
2385        #[derive(Clone)]
2386        struct SlowDrainRoute;
2387
2388        #[async_trait]
2389        impl Transition<(), &'static str> for SlowDrainRoute {
2390            type Error = Infallible;
2391            type Resources = ();
2392
2393            async fn run(
2394                &self,
2395                _state: (),
2396                _resources: &Self::Resources,
2397                _bus: &mut Bus,
2398            ) -> Outcome<&'static str, Self::Error> {
2399                tokio::time::sleep(Duration::from_millis(120)).await;
2400                Outcome::next("drained-ok")
2401            }
2402        }
2403
2404        let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("bind probe");
2405        let addr = probe.local_addr().expect("local addr");
2406        drop(probe);
2407
2408        let ingress = HttpIngress::<()>::new()
2409            .bind(addr.to_string())
2410            .graceful_shutdown(Duration::from_millis(500))
2411            .get(
2412                "/drain",
2413                Axon::<(), (), Infallible, ()>::new("SlowDrain").then(SlowDrainRoute),
2414            );
2415
2416        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
2417        let server = tokio::spawn(async move {
2418            ingress
2419                .run_with_shutdown_signal((), async move {
2420                    let _ = shutdown_rx.await;
2421                })
2422                .await
2423        });
2424
2425        let mut stream = connect_with_retry(addr).await;
2426        stream
2427            .write_all(b"GET /drain HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
2428            .await
2429            .expect("write request");
2430
2431        tokio::time::sleep(Duration::from_millis(20)).await;
2432        let _ = shutdown_tx.send(());
2433
2434        let mut buf = Vec::new();
2435        stream.read_to_end(&mut buf).await.expect("read response");
2436        let response = String::from_utf8_lossy(&buf);
2437        assert!(response.starts_with("HTTP/1.1 200"), "{response}");
2438        assert!(response.contains("drained-ok"), "{response}");
2439
2440        server
2441            .await
2442            .expect("server join")
2443            .expect("server shutdown should succeed");
2444    }
2445
2446    #[tokio::test]
2447    async fn serve_dir_serves_static_file_with_cache_and_metadata_headers() {
2448        let temp = tempdir().expect("tempdir");
2449        let root = temp.path().join("public");
2450        fs::create_dir_all(&root).expect("create dir");
2451        let file = root.join("hello.txt");
2452        fs::write(&file, "hello static").expect("write file");
2453
2454        let ingress =
2455            Ranvier::http::<()>().serve_dir("/static", root.to_string_lossy().to_string());
2456        let app = crate::test_harness::TestApp::new(ingress, ());
2457        let response = app
2458            .send(crate::test_harness::TestRequest::get("/static/hello.txt"))
2459            .await
2460            .expect("request should succeed");
2461
2462        assert_eq!(response.status(), StatusCode::OK);
2463        assert_eq!(response.text().expect("utf8"), "hello static");
2464        assert!(response.header("cache-control").is_some());
2465        let has_metadata_header =
2466            response.header("etag").is_some() || response.header("last-modified").is_some();
2467        assert!(has_metadata_header);
2468    }
2469
2470    #[tokio::test]
2471    async fn spa_fallback_returns_index_for_unmatched_path() {
2472        let temp = tempdir().expect("tempdir");
2473        let index = temp.path().join("index.html");
2474        fs::write(&index, "<html><body>spa</body></html>").expect("write index");
2475
2476        let ingress = Ranvier::http::<()>().spa_fallback(index.to_string_lossy().to_string());
2477        let app = crate::test_harness::TestApp::new(ingress, ());
2478        let response = app
2479            .send(crate::test_harness::TestRequest::get("/dashboard/settings"))
2480            .await
2481            .expect("request should succeed");
2482
2483        assert_eq!(response.status(), StatusCode::OK);
2484        assert!(response.text().expect("utf8").contains("spa"));
2485    }
2486
2487    #[tokio::test]
2488    async fn static_compression_layer_sets_content_encoding_for_gzip_client() {
2489        let temp = tempdir().expect("tempdir");
2490        let root = temp.path().join("public");
2491        fs::create_dir_all(&root).expect("create dir");
2492        let file = root.join("compressed.txt");
2493        fs::write(&file, "compress me ".repeat(400)).expect("write file");
2494
2495        let ingress = Ranvier::http::<()>()
2496            .serve_dir("/static", root.to_string_lossy().to_string())
2497            .compression_layer();
2498        let app = crate::test_harness::TestApp::new(ingress, ());
2499        let response = app
2500            .send(
2501                crate::test_harness::TestRequest::get("/static/compressed.txt")
2502                    .header("accept-encoding", "gzip"),
2503            )
2504            .await
2505            .expect("request should succeed");
2506
2507        assert_eq!(response.status(), StatusCode::OK);
2508        assert_eq!(
2509            response
2510                .header("content-encoding")
2511                .and_then(|value| value.to_str().ok()),
2512            Some("gzip")
2513        );
2514    }
2515
2516    #[tokio::test]
2517    async fn drain_connections_completes_before_timeout() {
2518        let mut connections = tokio::task::JoinSet::new();
2519        connections.spawn(async {
2520            tokio::time::sleep(Duration::from_millis(20)).await;
2521        });
2522
2523        let timed_out = drain_connections(&mut connections, Duration::from_millis(200)).await;
2524        assert!(!timed_out);
2525        assert!(connections.is_empty());
2526    }
2527
2528    #[tokio::test]
2529    async fn drain_connections_times_out_and_aborts() {
2530        let mut connections = tokio::task::JoinSet::new();
2531        connections.spawn(async {
2532            tokio::time::sleep(Duration::from_secs(10)).await;
2533        });
2534
2535        let timed_out = drain_connections(&mut connections, Duration::from_millis(10)).await;
2536        assert!(timed_out);
2537        assert!(connections.is_empty());
2538    }
2539
2540    #[tokio::test]
2541    async fn timeout_layer_returns_408_for_slow_route() {
2542        #[derive(Clone)]
2543        struct SlowRoute;
2544
2545        #[async_trait]
2546        impl Transition<(), &'static str> for SlowRoute {
2547            type Error = Infallible;
2548            type Resources = ();
2549
2550            async fn run(
2551                &self,
2552                _state: (),
2553                _resources: &Self::Resources,
2554                _bus: &mut Bus,
2555            ) -> Outcome<&'static str, Self::Error> {
2556                tokio::time::sleep(Duration::from_millis(80)).await;
2557                Outcome::next("slow-ok")
2558            }
2559        }
2560
2561        let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("bind probe");
2562        let addr = probe.local_addr().expect("local addr");
2563        drop(probe);
2564
2565        let ingress = HttpIngress::<()>::new()
2566            .bind(addr.to_string())
2567            .timeout_layer(Duration::from_millis(10))
2568            .get(
2569                "/slow",
2570                Axon::<(), (), Infallible, ()>::new("Slow").then(SlowRoute),
2571            );
2572
2573        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
2574        let server = tokio::spawn(async move {
2575            ingress
2576                .run_with_shutdown_signal((), async move {
2577                    let _ = shutdown_rx.await;
2578                })
2579                .await
2580        });
2581
2582        let mut stream = connect_with_retry(addr).await;
2583        stream
2584            .write_all(b"GET /slow HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
2585            .await
2586            .expect("write request");
2587
2588        let mut buf = Vec::new();
2589        stream.read_to_end(&mut buf).await.expect("read response");
2590        let response = String::from_utf8_lossy(&buf);
2591        assert!(response.starts_with("HTTP/1.1 408"), "{response}");
2592
2593        let _ = shutdown_tx.send(());
2594        server
2595            .await
2596            .expect("server join")
2597            .expect("server shutdown should succeed");
2598    }
2599
2600    #[tokio::test]
2601    async fn route_layer_override_bypasses_global_timeout() {
2602        #[derive(Clone)]
2603        struct SlowRoute;
2604
2605        #[async_trait]
2606        impl Transition<(), &'static str> for SlowRoute {
2607            type Error = Infallible;
2608            type Resources = ();
2609
2610            async fn run(
2611                &self,
2612                _state: (),
2613                _resources: &Self::Resources,
2614                _bus: &mut Bus,
2615            ) -> Outcome<&'static str, Self::Error> {
2616                tokio::time::sleep(Duration::from_millis(60)).await;
2617                Outcome::next("override-ok")
2618            }
2619        }
2620
2621        let probe = std::net::TcpListener::bind("127.0.0.1:0").expect("bind probe");
2622        let addr = probe.local_addr().expect("local addr");
2623        drop(probe);
2624
2625        let ingress = HttpIngress::<()>::new()
2626            .bind(addr.to_string())
2627            .timeout_layer(Duration::from_millis(10))
2628            .get_with_layer_override(
2629                "/slow",
2630                Axon::<(), (), Infallible, ()>::new("SlowOverride").then(SlowRoute),
2631                tower::layer::util::Identity::new(),
2632            );
2633
2634        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
2635        let server = tokio::spawn(async move {
2636            ingress
2637                .run_with_shutdown_signal((), async move {
2638                    let _ = shutdown_rx.await;
2639                })
2640                .await
2641        });
2642
2643        let mut stream = connect_with_retry(addr).await;
2644        stream
2645            .write_all(b"GET /slow HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n")
2646            .await
2647            .expect("write request");
2648
2649        let mut buf = Vec::new();
2650        stream.read_to_end(&mut buf).await.expect("read response");
2651        let response = String::from_utf8_lossy(&buf);
2652        assert!(response.starts_with("HTTP/1.1 200"), "{response}");
2653        assert!(response.contains("override-ok"), "{response}");
2654
2655        let _ = shutdown_tx.send(());
2656        server
2657            .await
2658            .expect("server join")
2659            .expect("server shutdown should succeed");
2660    }
2661}