1use bytes::Bytes;
17use http::{Method, Request, Response, StatusCode};
18use http_body_util::Full;
19use hyper::body::Incoming;
20use hyper::server::conn::http1;
21use hyper::service::service_fn;
22use hyper_util::rt::TokioIo;
23use ranvier_core::prelude::*;
24use ranvier_runtime::Axon;
25use std::collections::HashMap;
26use std::convert::Infallible;
27use std::future::Future;
28use std::net::SocketAddr;
29use std::pin::Pin;
30use std::sync::Arc;
31use tokio::net::TcpListener;
32use tower::Service;
33use tracing::Instrument;
34
/// Namespace-style entry point for building Ranvier ingresses.
///
/// The type carries no data; it only hosts associated constructor functions
/// such as `Ranvier::http`.
pub struct Ranvier;
40
41impl Ranvier {
42 pub fn http<R>() -> HttpIngress<R>
44 where
45 R: ranvier_core::transition::ResourceRequirement + Clone,
46 {
47 HttpIngress::new()
48 }
49}
50
51type RouteHandler<R> = Arc<
53 dyn Fn(Request<Incoming>, &R) -> Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
54 + Send
55 + Sync,
56>;
57
58pub struct HttpIngress<R = ()> {
64 addr: Option<String>,
66 routes: HashMap<(Method, String), RouteHandler<R>>,
68 fallback: Option<RouteHandler<R>>,
70 _phantom: std::marker::PhantomData<R>,
71}
72
73impl<R> HttpIngress<R>
74where
75 R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
76{
77 pub fn new() -> Self {
79 Self {
80 addr: None,
81 routes: HashMap::new(),
82 fallback: None,
83 _phantom: std::marker::PhantomData,
84 }
85 }
86
87 pub fn bind(mut self, addr: impl Into<String>) -> Self {
89 self.addr = Some(addr.into());
90 self
91 }
92
93 pub fn route<E>(mut self, path: impl Into<String>, circuit: Axon<(), String, E, R>) -> Self
95 where
96 E: Send + 'static + std::fmt::Debug,
97 {
98 let path_str: String = path.into();
99 let circuit = Arc::new(circuit);
100 let path_for_map = path_str.clone();
101 let path_for_handler = path_str;
102
103 let handler: RouteHandler<R> = Arc::new(move |_req: Request<Incoming>, res: &R| {
104 let circuit = circuit.clone();
105 let res = res.clone(); let path = path_for_handler.clone();
107
108 Box::pin(async move {
109 let request_id = uuid::Uuid::new_v4().to_string();
110 let span = tracing::info_span!(
111 "HTTPRequest",
112 ranvier.http.method = %Method::GET,
113 ranvier.http.path = %path,
114 ranvier.http.request_id = %request_id
115 );
116
117 async move {
118 let mut bus = Bus::new();
119 let result = circuit.execute((), &res, &mut bus).await;
120
121 match result {
122 Outcome::Next(body) => Response::builder()
123 .status(StatusCode::OK)
124 .body(Full::new(Bytes::from(body)))
125 .unwrap(),
126 Outcome::Fault(e) => Response::builder()
127 .status(StatusCode::INTERNAL_SERVER_ERROR)
128 .body(Full::new(Bytes::from(format!("Error: {:?}", e))))
129 .unwrap(),
130 _ => Response::builder()
131 .status(StatusCode::OK)
132 .body(Full::new(Bytes::from("OK")))
133 .unwrap(),
134 }
135 }
136 .instrument(span)
137 .await
138 }) as Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
139 });
140
141 self.routes.insert((Method::GET, path_for_map), handler);
142 self
143 }
144 pub fn route_method<E>(
153 mut self,
154 method: Method,
155 path: impl Into<String>,
156 circuit: Axon<(), String, E, R>,
157 ) -> Self
158 where
159 E: Send + 'static + std::fmt::Debug,
160 {
161 let path_str: String = path.into();
162 let circuit = Arc::new(circuit);
163 let path_for_map = path_str.clone();
164 let path_for_handler = path_str;
165 let method_for_map = method.clone();
166 let method_for_handler = method;
167
168 let handler: RouteHandler<R> = Arc::new(move |_req: Request<Incoming>, res: &R| {
169 let circuit = circuit.clone();
170 let res = res.clone();
171 let path = path_for_handler.clone();
172 let method = method_for_handler.clone();
173
174 Box::pin(async move {
175 let request_id = uuid::Uuid::new_v4().to_string();
176 let span = tracing::info_span!(
177 "HTTPRequest",
178 ranvier.http.method = %method,
179 ranvier.http.path = %path,
180 ranvier.http.request_id = %request_id
181 );
182
183 async move {
184 let mut bus = Bus::new();
185 let result = circuit.execute((), &res, &mut bus).await;
186
187 match result {
188 Outcome::Next(body) => Response::builder()
189 .status(StatusCode::OK)
190 .body(Full::new(Bytes::from(body)))
191 .unwrap(),
192 Outcome::Fault(e) => Response::builder()
193 .status(StatusCode::INTERNAL_SERVER_ERROR)
194 .body(Full::new(Bytes::from(format!("Error: {:?}", e))))
195 .unwrap(),
196 _ => Response::builder()
197 .status(StatusCode::OK)
198 .body(Full::new(Bytes::from("OK")))
199 .unwrap(),
200 }
201 }
202 .instrument(span)
203 .await
204 }) as Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
205 });
206
207 self.routes.insert((method_for_map, path_for_map), handler);
208 self
209 }
210
211 pub fn fallback<E>(mut self, circuit: Axon<(), String, E, R>) -> Self
222 where
223 E: Send + 'static + std::fmt::Debug,
224 {
225 let circuit = Arc::new(circuit);
226
227 let handler: RouteHandler<R> = Arc::new(move |_req: Request<Incoming>, res: &R| {
228 let circuit = circuit.clone();
229 let res = res.clone();
230 Box::pin(async move {
231 let request_id = uuid::Uuid::new_v4().to_string();
232 let span = tracing::info_span!(
233 "HTTPRequest",
234 ranvier.http.method = "FALLBACK",
235 ranvier.http.request_id = %request_id
236 );
237
238 async move {
239 let mut bus = Bus::new();
240 let result = circuit.execute((), &res, &mut bus).await;
241
242 match result {
243 Outcome::Next(body) => Response::builder()
244 .status(StatusCode::NOT_FOUND)
245 .body(Full::new(Bytes::from(body)))
246 .unwrap(),
247 _ => Response::builder()
248 .status(StatusCode::NOT_FOUND)
249 .body(Full::new(Bytes::from("Not Found")))
250 .unwrap(),
251 }
252 }
253 .instrument(span)
254 .await
255 }) as Pin<Box<dyn Future<Output = Response<Full<Bytes>>> + Send>>
256 });
257
258 self.fallback = Some(handler);
259 self
260 }
261
262 pub async fn run(self, resources: R) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
264 let addr_str = self.addr.as_deref().unwrap_or("127.0.0.1:3000");
265 let addr: SocketAddr = addr_str.parse()?;
266
267 let routes = Arc::new(self.routes);
268 let fallback = self.fallback;
269 let resources = Arc::new(resources);
270
271 let listener = TcpListener::bind(addr).await?;
272 tracing::info!("Ranvier HTTP Ingress listening on http://{}", addr);
273
274 loop {
275 let (stream, _) = listener.accept().await?;
276 let io = TokioIo::new(stream);
277
278 let routes = routes.clone();
279 let fallback = fallback.clone();
280 let resources = resources.clone();
281
282 tokio::task::spawn(async move {
283 let resources = resources.clone();
284 let service = service_fn(move |req: Request<Incoming>| {
285 let routes = routes.clone();
286 let fallback = fallback.clone();
287 let resources = resources.clone();
288
289 async move {
290 let method = req.method().clone();
291 let path = req.uri().path().to_string();
292
293 if let Some(handler) = routes.get(&(method.clone(), path.clone())) {
295 Ok::<_, Infallible>(handler(req, &resources).await)
296 } else if let Some(ref fb) = fallback {
297 Ok(fb(req, &resources).await)
298 } else {
299 Ok(Response::builder()
301 .status(StatusCode::NOT_FOUND)
302 .body(Full::new(Bytes::from("Not Found")))
303 .unwrap())
304 }
305 }
306 });
307
308 if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
309 tracing::error!("Error serving connection: {:?}", err);
310 }
311 });
312 }
313 }
314
315 pub fn into_raw_service(self, resources: R) -> RawIngressService<R> {
331 let routes = Arc::new(self.routes);
332 let fallback = self.fallback;
333 let resources = Arc::new(resources);
334
335 RawIngressService {
336 routes,
337 fallback,
338 resources,
339 }
340 }
341}
342
343impl<R> Default for HttpIngress<R>
344where
345 R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
346{
347 fn default() -> Self {
348 Self::new()
349 }
350}
351
352#[derive(Clone)]
354pub struct RawIngressService<R> {
355 routes: Arc<HashMap<(Method, String), RouteHandler<R>>>,
356 fallback: Option<RouteHandler<R>>,
357 resources: Arc<R>,
358}
359
360impl<R> Service<Request<Incoming>> for RawIngressService<R>
361where
362 R: ranvier_core::transition::ResourceRequirement + Clone + Send + Sync + 'static,
363{
364 type Response = Response<Full<Bytes>>;
365 type Error = Infallible;
366 type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
367
368 fn poll_ready(
369 &mut self,
370 _cx: &mut std::task::Context<'_>,
371 ) -> std::task::Poll<Result<(), Self::Error>> {
372 std::task::Poll::Ready(Ok(()))
373 }
374
375 fn call(&mut self, req: Request<Incoming>) -> Self::Future {
376 let routes = self.routes.clone();
377 let fallback = self.fallback.clone();
378 let resources = self.resources.clone();
379
380 Box::pin(async move {
381 let method = req.method().clone();
382 let path = req.uri().path().to_string();
383
384 if let Some(handler) = routes.get(&(method, path)) {
385 Ok(handler(req, &resources).await)
386 } else if let Some(ref fb) = fallback {
387 Ok(fb(req, &resources).await)
388 } else {
389 Ok(Response::builder()
390 .status(StatusCode::NOT_FOUND)
391 .body(Full::new(Bytes::from("Not Found")))
392 .unwrap())
393 }
394 })
395 }
396}