Skip to main content

ranvier_http/
ingress.rs

1//! # Ingress Module - Flat API Entry Point
2//!
3//! Implements Discussion 193: `Ranvier::http()` is an **Ingress Circuit Builder**, not a web server.
4//!
5//! ## API Surface (MVP)
6//!
7//! - `bind(addr)` — Execution unit
8//! - `route(path, circuit)` — Core wiring
9//! - `fallback(circuit)` — Circuit completeness
//! - `into_raw_service(resources)` — Escape hatch to Raw API
11//!
12//! ## Flat API Principle (Discussion 192)
13//!
14//! User code depth ≤ 2. Complexity is isolated, not hidden.
15
16use bytes::Bytes;
17use http::{Method, Request, Response, StatusCode};
18use http_body_util::Full;
19use hyper::body::Incoming;
20use hyper::server::conn::http1;
21use hyper::service::service_fn;
22use hyper_util::rt::TokioIo;
23use ranvier_core::prelude::*;
24use ranvier_runtime::Axon;
25use std::collections::HashMap;
26use std::convert::Infallible;
27use std::future::Future;
28use std::net::SocketAddr;
29use std::pin::Pin;
30use std::sync::Arc;
31use tokio::net::TcpListener;
32use tower::Service;
33use tracing::Instrument;
34
/// The Ranvier Framework entry point.
///
/// `Ranvier` is a zero-sized marker type: it carries no state and exists only
/// as a namespace for the static constructors that create Ingress builders.
/// Currently only HTTP is supported (see `Ranvier::http`).
///
/// The standard marker-type traits are derived so the type can be printed,
/// copied, and embedded in other `Debug`/`Clone` types without friction.
#[derive(Debug, Clone, Copy)]
pub struct Ranvier;
40
41impl Ranvier {
42    /// Create an HTTP Ingress Circuit Builder.
43    pub fn http<R>() -> HttpIngress<R>
44    where
45        R: ranvier_core::transition::ResourceRequirement + Clone,
46    {
47        HttpIngress::new()
48    }
49}
50
51/// Route handler type: boxed async function returning Response
52type RouteHandler<R> = Arc<
53    dyn Fn(Request<Incoming>, &R) -> Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
54        + Send
55        + Sync,
56>;
57
/// HTTP Ingress Circuit Builder.
///
/// Wires HTTP inputs to Ranvier Circuits. This is NOT a web server—it's a circuit wiring tool.
///
/// **Ingress is part of Schematic** (separate layer: Ingress → Circuit → Egress)
///
/// The builder is consumed either by `run`, which serves the wired routes over
/// HTTP/1, or by `into_raw_service`, which hands the routing table out as a
/// Tower service. `R` is the resource type borrowed by every handler; it
/// defaults to `()`.
pub struct HttpIngress<R = ()> {
    /// Bind address (e.g., "127.0.0.1:3000"). When `None`, `run` falls back to
    /// "127.0.0.1:3000". The string is parsed as a `SocketAddr` by `run`.
    addr: Option<String>,
    /// Routes: (Method, Path) -> Handler. Lookup is an exact match on both the
    /// method and the request URI path.
    routes: HashMap<(Method, String), RouteHandler<R>>,
    /// Fallback circuit for unmatched routes. When `None`, unmatched requests
    /// receive a plain "Not Found" 404 response.
    fallback: Option<RouteHandler<R>>,
    // Zero-sized marker for `R`.
    // NOTE(review): `R` already appears in `RouteHandler<R>`, so this marker
    // looks redundant — confirm before removing.
    _phantom: std::marker::PhantomData<R>,
}
72
73impl<R> HttpIngress<R>
74where
75    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
76{
77    /// Create a new empty HttpIngress builder.
78    pub fn new() -> Self {
79        Self {
80            addr: None,
81            routes: HashMap::new(),
82            fallback: None,
83            _phantom: std::marker::PhantomData,
84        }
85    }
86
87    /// Set the bind address for the server.
88    pub fn bind(mut self, addr: impl Into<String>) -> Self {
89        self.addr = Some(addr.into());
90        self
91    }
92
93    /// Register a route with a circuit.
94    pub fn route<E>(mut self, path: impl Into<String>, circuit: Axon<(), String, E, R>) -> Self
95    where
96        E: Send + 'static + std::fmt::Debug,
97    {
98        let path_str: String = path.into();
99        let circuit = Arc::new(circuit);
100        let path_for_map = path_str.clone();
101        let path_for_handler = path_str;
102
103        let handler: RouteHandler<R> = Arc::new(move |_req: Request<Incoming>, res: &R| {
104            let circuit = circuit.clone();
105            let res = res.clone(); // R must be Clone
106            let path = path_for_handler.clone();
107
108            Box::pin(async move {
109                let request_id = uuid::Uuid::new_v4().to_string();
110                let span = tracing::info_span!(
111                    "HTTPRequest",
112                    ranvier.http.method = %Method::GET,
113                    ranvier.http.path = %path,
114                    ranvier.http.request_id = %request_id
115                );
116
117                async move {
118                    let mut bus = Bus::new();
119                    let result = circuit.execute((), &res, &mut bus).await;
120
121                    match result {
122                        Outcome::Next(body) => Response::builder()
123                            .status(StatusCode::OK)
124                            .body(Full::new(Bytes::from(body)))
125                            .unwrap(),
126                        Outcome::Fault(e) => Response::builder()
127                            .status(StatusCode::INTERNAL_SERVER_ERROR)
128                            .body(Full::new(Bytes::from(format!("Error: {:?}", e))))
129                            .unwrap(),
130                        _ => Response::builder()
131                            .status(StatusCode::OK)
132                            .body(Full::new(Bytes::from("OK")))
133                            .unwrap(),
134                    }
135                }
136                .instrument(span)
137                .await
138            }) as Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
139        });
140
141        self.routes.insert((Method::GET, path_for_map), handler);
142        self
143    }
144    /// Register a route with a specific HTTP method.
145    ///
146    /// # Example
147    ///
148    /// ```rust,ignore
149    /// Ranvier::http()
150    ///     .route_method(Method::POST, "/users", create_user_circuit)
151    /// ```
152    pub fn route_method<E>(
153        mut self,
154        method: Method,
155        path: impl Into<String>,
156        circuit: Axon<(), String, E, R>,
157    ) -> Self
158    where
159        E: Send + 'static + std::fmt::Debug,
160    {
161        let path_str: String = path.into();
162        let circuit = Arc::new(circuit);
163        let path_for_map = path_str.clone();
164        let path_for_handler = path_str;
165        let method_for_map = method.clone();
166        let method_for_handler = method;
167
168        let handler: RouteHandler<R> = Arc::new(move |_req: Request<Incoming>, res: &R| {
169            let circuit = circuit.clone();
170            let res = res.clone();
171            let path = path_for_handler.clone();
172            let method = method_for_handler.clone();
173
174            Box::pin(async move {
175                let request_id = uuid::Uuid::new_v4().to_string();
176                let span = tracing::info_span!(
177                    "HTTPRequest",
178                    ranvier.http.method = %method,
179                    ranvier.http.path = %path,
180                    ranvier.http.request_id = %request_id
181                );
182
183                async move {
184                    let mut bus = Bus::new();
185                    let result = circuit.execute((), &res, &mut bus).await;
186
187                    match result {
188                        Outcome::Next(body) => Response::builder()
189                            .status(StatusCode::OK)
190                            .body(Full::new(Bytes::from(body)))
191                            .unwrap(),
192                        Outcome::Fault(e) => Response::builder()
193                            .status(StatusCode::INTERNAL_SERVER_ERROR)
194                            .body(Full::new(Bytes::from(format!("Error: {:?}", e))))
195                            .unwrap(),
196                        _ => Response::builder()
197                            .status(StatusCode::OK)
198                            .body(Full::new(Bytes::from("OK")))
199                            .unwrap(),
200                    }
201                }
202                .instrument(span)
203                .await
204            }) as Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
205        });
206
207        self.routes.insert((method_for_map, path_for_map), handler);
208        self
209    }
210
211    /// Set a fallback circuit for unmatched routes.
212    ///
213    /// # Example
214    ///
215    /// ```rust,ignore
216    /// let not_found = Axon::new("NotFound").then(|_| async { "404 Not Found" });
217    /// Ranvier::http()
218    ///     .route("/", home)
219    ///     .fallback(not_found)
220    /// ```
221    pub fn fallback<E>(mut self, circuit: Axon<(), String, E, R>) -> Self
222    where
223        E: Send + 'static + std::fmt::Debug,
224    {
225        let circuit = Arc::new(circuit);
226
227        let handler: RouteHandler<R> = Arc::new(move |_req: Request<Incoming>, res: &R| {
228            let circuit = circuit.clone();
229            let res = res.clone();
230            Box::pin(async move {
231                let request_id = uuid::Uuid::new_v4().to_string();
232                let span = tracing::info_span!(
233                    "HTTPRequest",
234                    ranvier.http.method = "FALLBACK",
235                    ranvier.http.request_id = %request_id
236                );
237
238                async move {
239                    let mut bus = Bus::new();
240                    let result = circuit.execute((), &res, &mut bus).await;
241
242                    match result {
243                        Outcome::Next(body) => Response::builder()
244                            .status(StatusCode::NOT_FOUND)
245                            .body(Full::new(Bytes::from(body)))
246                            .unwrap(),
247                        _ => Response::builder()
248                            .status(StatusCode::NOT_FOUND)
249                            .body(Full::new(Bytes::from("Not Found")))
250                            .unwrap(),
251                    }
252                }
253                .instrument(span)
254                .await
255            }) as Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
256        });
257
258        self.fallback = Some(handler);
259        self
260    }
261
262    /// Run the HTTP server with required resources.
263    pub async fn run(self, resources: R) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
264        let addr_str = self.addr.as_deref().unwrap_or("127.0.0.1:3000");
265        let addr: SocketAddr = addr_str.parse()?;
266
267        let routes = Arc::new(self.routes);
268        let fallback = self.fallback;
269        let resources = Arc::new(resources);
270
271        let listener = TcpListener::bind(addr).await?;
272        tracing::info!("Ranvier HTTP Ingress listening on http://{}", addr);
273
274        loop {
275            let (stream, _) = listener.accept().await?;
276            let io = TokioIo::new(stream);
277
278            let routes = routes.clone();
279            let fallback = fallback.clone();
280            let resources = resources.clone();
281
282            tokio::task::spawn(async move {
283                let resources = resources.clone();
284                let service = service_fn(move |req: Request<Incoming>| {
285                    let routes = routes.clone();
286                    let fallback = fallback.clone();
287                    let resources = resources.clone();
288
289                    async move {
290                        let method = req.method().clone();
291                        let path = req.uri().path().to_string();
292
293                        // Try to find a matching route
294                        if let Some(handler) = routes.get(&(method.clone(), path.clone())) {
295                            Ok::<_, Infallible>(handler(req, &resources).await)
296                        } else if let Some(ref fb) = fallback {
297                            Ok(fb(req, &resources).await)
298                        } else {
299                            // Default 404
300                            Ok(Response::builder()
301                                .status(StatusCode::NOT_FOUND)
302                                .body(Full::new(Bytes::from("Not Found")))
303                                .unwrap())
304                        }
305                    }
306                });
307
308                if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
309                    tracing::error!("Error serving connection: {:?}", err);
310                }
311            });
312        }
313    }
314
315    /// Convert to a raw Tower Service for integration with existing Tower stacks.
316    ///
317    /// This is the "escape hatch" per Discussion 193:
318    /// > "Raw API는 Flat API의 탈출구다."
319    ///
320    /// # Example
321    ///
322    /// ```rust,ignore
323    /// let ingress = Ranvier::http()
324    ///     .bind(":3000")
325    ///     .route("/", circuit);
326    ///
327    /// let raw_service = ingress.into_raw_service();
328    /// // Use raw_service with existing Tower infrastructure
329    /// ```
330    pub fn into_raw_service(self, resources: R) -> RawIngressService<R> {
331        let routes = Arc::new(self.routes);
332        let fallback = self.fallback;
333        let resources = Arc::new(resources);
334
335        RawIngressService {
336            routes,
337            fallback,
338            resources,
339        }
340    }
341}
342
343impl<R> Default for HttpIngress<R>
344where
345    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
346{
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
/// Service type returned by `into_raw_service()`.
///
/// Holds the routing table, the optional fallback handler and the resources
/// behind `Arc`s. The `Clone` impl is derived, so it is only available when
/// `R: Clone`.
#[derive(Clone)]
pub struct RawIngressService<R> {
    /// Exact-match routing table keyed by (method, path).
    routes: Arc<HashMap<(Method, String), RouteHandler<R>>>,
    /// Handler used when no route matches; `None` yields a plain 404.
    fallback: Option<RouteHandler<R>>,
    /// Resources borrowed by every handler invocation.
    resources: Arc<R>,
}
359
360impl<R> Service<Request<Incoming>> for RawIngressService<R>
361where
362    R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
363{
364    type Response = Response<Full<Bytes>>;
365    type Error = Infallible;
366    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
367
368    fn poll_ready(
369        &mut self,
370        _cx: &mut std::task::Context<'_>,
371    ) -> std::task::Poll<Result<(), Self::Error>> {
372        std::task::Poll::Ready(Ok(()))
373    }
374
375    fn call(&mut self, req: Request<Incoming>) -> Self::Future {
376        let routes = self.routes.clone();
377        let fallback = self.fallback.clone();
378        let resources = self.resources.clone();
379
380        Box::pin(async move {
381            let method = req.method().clone();
382            let path = req.uri().path().to_string();
383
384            if let Some(handler) = routes.get(&(method, path)) {
385                Ok(handler(req, &resources).await)
386            } else if let Some(ref fb) = fallback {
387                Ok(fb(req, &resources).await)
388            } else {
389                Ok(Response::builder()
390                    .status(StatusCode::NOT_FOUND)
391                    .body(Full::new(Bytes::from("Not Found")))
392                    .unwrap())
393            }
394        })
395    }
396}