1use allora_core::adapter::{ensure_correlation, BaseAdapter, InboundAdapter};
44use allora_core::channel::{ChannelRef, QueueChannel};
45use allora_core::endpoint::{EndpointSource, InMemoryEndpoint};
47use allora_core::error::Result;
48use allora_core::{Exchange, Message, Payload};
49use async_trait::async_trait;
50use hyper::service::{make_service_fn, service_fn};
51use hyper::{Body, Request, Response, Server, Version};
52use std::collections::HashMap;
53use std::net::SocketAddr;
54use std::pin::Pin;
55use std::sync::{Arc, Mutex, Weak};
56use std::task::{Context, Poll};
57use tracing::{debug, error, info, trace};
58
59const REPLY_TIMEOUT_SECS: u64 = 3;
61const REPLY_POLL_INTERVAL_MILLIS: u64 = 50;
62
63#[derive(Clone, Copy, Debug, PartialEq, Eq)]
65pub enum Mep {
66 InOut,
68 InOnly202,
70}
71impl Default for Mep {
72 fn default() -> Self {
73 Mep::InOut
74 }
75}
76
77#[derive(Clone, Debug)]
78pub struct HttpInboundAdapter {
79 id: String,
80 addr: SocketAddr,
81 base_path: String,
82 channel: ChannelRef,
83 mep: Mep,
84 reply_channel: Option<ChannelRef>,
85 routes: Arc<Mutex<HashMap<(String, String), Vec<Weak<InMemoryEndpoint>>>>>,
86}
87
88pub struct HttpServerHandle {
89 join: tokio::task::JoinHandle<Result<()>>,
90}
91
92impl HttpServerHandle {
93 pub async fn wait(self) -> Result<()> {
94 self.join
95 .await
96 .unwrap_or_else(|e| Err(allora_core::error::Error::other(e.to_string())))
97 }
98 pub fn abort(self) {
99 self.join.abort();
100 }
101}
102
103impl std::future::Future for HttpServerHandle {
104 type Output = Result<()>;
105 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106 let inner = unsafe { self.map_unchecked_mut(|s| &mut s.join) };
107 match inner.poll(cx) {
108 Poll::Ready(r) => Poll::Ready(
109 r.unwrap_or_else(|e| Err(allora_core::error::Error::other(e.to_string()))),
110 ),
111 Poll::Pending => Poll::Pending,
112 }
113 }
114}
115
116impl HttpInboundAdapter {
117 pub fn id(&self) -> &str {
118 &self.id
119 }
120 pub fn addr(&self) -> SocketAddr {
121 self.addr
122 }
123 pub fn base_path(&self) -> &str {
124 &self.base_path
125 }
126 pub fn mep(&self) -> Mep {
127 self.mep
128 }
129 pub fn new(
134 host: impl Into<String>,
135 port: u16,
136 base_path: impl Into<String>,
137 channel: ChannelRef,
138 reply_channel: Option<ChannelRef>,
139 mep: Mep,
140 id: Option<String>,
141 ) -> Self {
142 let host_str = host.into();
143 let addr: SocketAddr = format!("{}:{}", host_str, port)
144 .parse()
145 .expect("invalid socket addr");
146 let base = {
147 let b = base_path.into();
148 if b.is_empty() {
149 "/".to_string()
150 } else {
151 b
152 }
153 };
154 let id_final = id.unwrap_or_else(|| format!("http-inbound:{}", addr));
155 trace!(adapter.id=%id_final, host=%host_str, port=%port, base_path=%base, mep=?mep, "constructing HttpInboundAdapter (direct)");
156 HttpInboundAdapter {
157 id: id_final,
158 addr,
159 base_path: base,
160 channel,
161 mep,
162 reply_channel,
163 routes: Arc::new(Mutex::new(HashMap::new())),
164 }
165 }
166 pub fn new_in_out(
168 host: impl Into<String>,
169 port: u16,
170 base_path: impl Into<String>,
171 channel: ChannelRef,
172 reply_channel: Option<ChannelRef>,
173 id: Option<String>,
174 ) -> Self {
175 Self::new(
176 host,
177 port,
178 base_path,
179 channel,
180 reply_channel,
181 Mep::InOut,
182 id,
183 )
184 }
185 pub fn new_in_only_202(
187 host: impl Into<String>,
188 port: u16,
189 base_path: impl Into<String>,
190 channel: ChannelRef,
191 id: Option<String>,
192 ) -> Self {
193 Self::new(host, port, base_path, channel, None, Mep::InOnly202, id)
194 }
195 }
197
198pub struct HttpInboundBuilder {
199 id: Option<String>,
200 host: String,
201 port: u16,
202 base_path: String,
203 channel: Option<ChannelRef>,
204 mep: Mep,
205 reply_channel: Option<ChannelRef>,
206 registrations: Vec<(String, String, Arc<InMemoryEndpoint>)>,
207}
208impl HttpInboundBuilder {
209 pub(crate) fn new() -> Self {
210 Self {
211 id: None,
212 host: String::new(),
213 port: 0,
214 base_path: String::new(),
215 channel: None,
216 mep: Mep::InOut,
217 reply_channel: None,
218 registrations: Vec::new(),
219 }
220 }
221 pub fn register(mut self, method: &str, path: &str, endpoint: Arc<InMemoryEndpoint>) -> Self {
223 let norm = if path.starts_with('/') {
224 path.to_string()
225 } else {
226 format!("/{}", path)
227 };
228 self.registrations
229 .push((method.to_ascii_uppercase(), norm, endpoint));
230 self
231 }
232 pub fn register_any(self, path: &str, endpoint: Arc<InMemoryEndpoint>) -> Self {
234 self.register("ANY", path, endpoint)
235 }
236 pub fn id(mut self, id: impl Into<String>) -> Self {
237 self.id = Some(id.into());
238 self
239 }
240 pub fn host(mut self, host: impl Into<String>) -> Self {
241 self.host = host.into();
242 self
243 }
244 pub fn port(mut self, port: u16) -> Self {
245 self.port = port;
246 self
247 }
248 pub fn base_path(mut self, path: impl Into<String>) -> Self {
249 self.base_path = path.into();
250 self
251 }
252 pub fn channel(mut self, ch: ChannelRef) -> Self {
253 self.channel = Some(ch);
254 self
255 }
256 pub fn reply_channel(mut self, ch: ChannelRef) -> Self {
257 self.reply_channel = Some(ch);
258 self
259 }
260
261 pub fn mep(mut self, mep: Mep) -> Self {
263 self.mep = mep;
264 self
265 }
266
267 pub fn in_only_202(self) -> Self {
269 self.mep(Mep::InOnly202)
270 }
271
272 pub fn build(self) -> HttpInboundAdapter {
273 let addr: SocketAddr = format!("{}:{}", self.host, self.port)
274 .parse()
275 .expect("invalid socket addr");
276 let id = self.id.unwrap_or_else(|| format!("http-inbound:{}", addr));
277 let base_path = if self.base_path.is_empty() {
278 "/".to_string()
279 } else {
280 self.base_path
281 };
282 let channel = self
283 .channel
284 .expect("channel must be set on HttpInboundBuilder before build()");
285 let effective_mep = if self.reply_channel.is_some() {
286 Mep::InOut
287 } else {
288 self.mep
289 };
290 let adapter = HttpInboundAdapter {
291 id: id.clone(),
292 addr,
293 base_path: base_path.clone(),
294 channel,
295 mep: effective_mep,
296 reply_channel: self.reply_channel.clone(),
297 routes: Arc::new(Mutex::new(HashMap::new())),
298 };
299 info!(adapter.id=%adapter.id, addr=%adapter.addr, base_path=%adapter.base_path, mep=?adapter.mep, reply_channel=adapter.reply_channel.is_some(), "HttpInboundAdapter built via builder");
300 for (method, path, ep) in self.registrations.into_iter() {
301 adapter.register_endpoint(&method, &path, Arc::downgrade(&ep));
302 }
303 adapter
304 }
305}
306
307impl BaseAdapter for HttpInboundAdapter {
308 fn id(&self) -> &str {
309 &self.id
310 }
311}
312
313#[async_trait]
314impl InboundAdapter for HttpInboundAdapter {
315 async fn run(&self) -> Result<()> {
316 self.serve().await
317 }
318}
319
320fn normalize_path<'a>(base: &'a str, full: &'a str) -> &'a str {
321 if base == "/" {
322 return full;
323 }
324 match full.strip_prefix(base) {
325 Some(p) if p.is_empty() => "/",
326 Some(p) => {
327 if p.starts_with('/') {
328 p
329 } else {
330 "/"
331 }
332 }
333 None => full,
334 }
335}
336
337fn http_version_str(v: Version) -> &'static str {
338 match v {
339 Version::HTTP_09 => "0.9",
340 Version::HTTP_10 => "1.0",
341 Version::HTTP_11 => "1.1",
342 Version::HTTP_2 => "2.0",
343 Version::HTTP_3 => "3.0",
344 _ => "unknown",
345 }
346}
347
348async fn adapt_request(
349 adapter_id: String,
350 channel: ChannelRef,
351 reply_channel: Option<ChannelRef>,
352 req: Request<Body>,
353 base_path: String,
354 mep: Mep,
355 routes: Arc<Mutex<HashMap<(String, String), Vec<Weak<InMemoryEndpoint>>>>>,
356) -> Result<Response<Body>> {
357 let method = req.method().clone();
358 let path_full = req.uri().path().to_string();
359 let path_norm = normalize_path(&base_path, &path_full).to_string();
360 let query = req.uri().query().unwrap_or("").to_string();
361 let version = http_version_str(req.version()).to_string();
362 let mut content_type = None::<String>;
364 let headers_clone: Vec<(String, String)> = req
365 .headers()
366 .iter()
367 .filter_map(|(name, val)| {
368 val.to_str()
369 .ok()
370 .map(|s| (name.as_str().to_ascii_lowercase(), s.to_string()))
371 })
372 .collect();
373 if let Some(ct) = headers_clone
374 .iter()
375 .find(|(k, _)| k == "content-type")
376 .map(|(_, v)| v.clone())
377 {
378 content_type = Some(ct);
379 }
380 let body_bytes = hyper::body::to_bytes(req.into_body())
382 .await
383 .map_err(|e| allora_core::error::Error::other(e.to_string()))?;
384
385 let mut msg = if let Ok(txt) = std::str::from_utf8(&body_bytes) {
387 Message::from_text(txt)
388 } else {
389 Message::new(Payload::Bytes(body_bytes.to_vec()))
390 };
391 msg.set_header("http.method", method.as_str());
392 msg.set_header("http.path", &path_norm);
393 if !query.is_empty() {
394 msg.set_header("http.query", &query);
395 }
396 msg.set_header("http.version", &version);
397 for (k, v) in headers_clone.iter() {
398 let key = format!("http.header.{}", k);
399 msg.set_header(&key, v);
400 }
401 if let Some(ct) = content_type {
402 msg.set_header("http.content_type", &ct);
403 }
404 if let Ok(txt) = std::str::from_utf8(&body_bytes) {
405 msg.set_header("http.body_text", txt);
406 }
407
408 let mut exchange = Exchange::new(msg);
410 ensure_correlation(&mut exchange);
411 debug!(adapter.id=%adapter_id, corr_id=?exchange.in_msg.header("corr_id"), "correlation ensured for inbound exchange");
412 match mep {
413 Mep::InOut => {
414 let key_exact = (method.as_str().to_ascii_uppercase(), path_norm.clone());
415 let key_any = ("ANY".to_string(), path_norm.clone());
416 let mut endpoints: Vec<Weak<InMemoryEndpoint>> = Vec::new();
417 if let Ok(map) = routes.lock() {
418 if let Some(list) = map.get(&key_exact) {
419 endpoints.extend(list.iter().cloned());
420 }
421 if let Some(list) = map.get(&key_any) {
422 endpoints.extend(list.iter().cloned());
423 }
424 }
425 if !endpoints.is_empty() {
426 debug!(adapter.id=%adapter_id, endpoints.count=endpoints.len(), path=%path_norm, "matched in-memory endpoints");
427 let mut response_body: Option<String> = None;
428 for weak_ep in endpoints.iter() {
429 if let Some(ep) = weak_ep.upgrade() {
430 if let Some(ch_ref) = ep.channel() {
431 let mut ex_clone = exchange.clone();
432 EndpointSource::Http {
433 adapter_id: adapter_id.clone(),
434 method: method.as_str().to_string(),
435 path: path_norm.clone(),
436 }
437 .apply_headers(&mut ex_clone);
438 ch_ref.send(ex_clone.clone()).await?;
439 trace!(adapter.id=%adapter_id, endpoint.channel=%ch_ref.id(), method=%method, path=%path_norm, "dispatched exchange to endpoint channel");
440 if response_body.is_none() {
441 response_body = ex_clone.in_msg.body_text().map(|s| s.to_string());
442 }
443 }
444 } else {
445 trace!(adapter.id=%adapter_id, method=%method, path=%path_norm, "skipping stale endpoint");
446 }
447 }
448 let body_final = response_body.unwrap_or_else(|| String::new());
449 return Ok(Response::new(Body::from(body_final)));
450 }
451 trace!(adapter.id=%adapter_id, channel.id=?channel.id(), mep=?mep, "no endpoints matched; sending to primary channel");
453 channel.send(exchange.clone()).await?;
454 if let Some(rc) = reply_channel {
455 if let Some(qc) = rc.as_any().downcast_ref::<QueueChannel>() {
457 use allora_core::PollableChannel;
458 let start = std::time::Instant::now();
459 while start.elapsed() < std::time::Duration::from_secs(REPLY_TIMEOUT_SECS) {
460 if let Some(ex_reply) = qc.try_receive().await {
461 let body = ex_reply
462 .out_msg
463 .as_ref()
464 .and_then(|m| m.body_text())
465 .or_else(|| ex_reply.in_msg.body_text())
466 .unwrap_or("");
467 return Ok(Response::new(Body::from(body.to_string())));
468 }
469 tokio::time::sleep(std::time::Duration::from_millis(
470 REPLY_POLL_INTERVAL_MILLIS,
471 ))
472 .await;
473 }
474 trace!(adapter.id=%adapter_id, "reply-channel timeout; returning original inbound body");
475 } else {
476 trace!(adapter.id=%adapter_id, "reply-channel present but not queue/pollable; skipping reply wait");
477 }
478 }
479 let response_body = exchange
480 .in_msg
481 .body_text()
482 .map(|s| s.to_string())
483 .unwrap_or_else(|| String::from_utf8_lossy(&body_bytes).to_string());
484 Ok(Response::new(Body::from(response_body)))
485 }
486 Mep::InOnly202 => {
487 trace!(adapter.id=%adapter_id, channel.id=?channel.id(), "IN_ONLY_202 mode: spawning background send");
488 let ch = channel.clone();
490 tokio::spawn(async move {
491 let _ = ch.send(exchange).await;
492 });
493 Ok(Response::builder()
494 .status(202)
495 .body(Body::from("ok"))
496 .unwrap())
497 }
498 }
499}
500
501impl HttpInboundAdapter {
502 pub fn register_endpoint(&self, method: &str, path: &str, ep: Weak<InMemoryEndpoint>) {
503 let key = (method.to_ascii_uppercase(), path.to_string());
504 let mut map = self.routes.lock().unwrap();
505 map.entry(key).or_insert_with(Vec::new).push(ep);
506 }
507 pub fn register_endpoint_any(&self, path: &str, ep: Weak<InMemoryEndpoint>) {
508 for m in [
509 "GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "ANY",
510 ] {
511 self.register_endpoint(m, path, ep.clone());
512 }
513 }
514 pub async fn serve(&self) -> Result<()> {
515 let channel = self.channel.clone();
516 let reply_channel = self.reply_channel.clone();
517 let base = self.base_path.clone();
518 let mep = self.mep;
519 let adapter_id = self.id.clone();
520 let routes_arc = self.routes.clone();
521 let make = make_service_fn(move |_conn| {
522 let channel_clone = channel.clone();
523 let base_clone = base.clone();
524 let adapter_id_clone = adapter_id.clone();
525 let routes_ref = routes_arc.clone();
526 let reply_channel_outer = reply_channel.clone();
527 async move {
528 Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
529 let c = channel_clone.clone();
530 let b = base_clone.clone();
531 let r = routes_ref.clone();
532 let a = adapter_id_clone.clone();
533 let rc = reply_channel_outer.clone();
534 async move {
535 match adapt_request(a, c, rc, req, b, mep, r).await {
536 Ok(resp) => Ok::<_, hyper::Error>(resp),
537 Err(e) => {
538 error!(error=%e, "request handling failed");
539 Ok(Response::builder()
540 .status(500)
541 .body(Body::from("internal error"))
542 .unwrap())
543 }
544 }
545 }
546 }))
547 }
548 });
549 info!(address=%self.addr, mep=?self.mep, "starting HTTP inbound adapter (continuous)");
550 Server::bind(&self.addr)
551 .serve(make)
552 .await
553 .map_err(|e| allora_core::error::Error::other(e.to_string()))?;
554 Ok(())
555 }
556 pub async fn run_once(self) -> Result<()> {
557 let (tx, rx) = tokio::sync::oneshot::channel::<()>();
558 let channel = self.channel.clone();
559 let reply_channel = self.reply_channel.clone();
560 let base = self.base_path.clone();
561 let mep = self.mep;
562 let adapter_id = self.id.clone();
563 let routes_arc = self.routes.clone();
564 let shutdown_flag = Arc::new(Mutex::new(Some(tx)));
565 let make = make_service_fn(move |_conn| {
566 let channel_clone = channel.clone();
567 let base_clone = base.clone();
568 let adapter_id_clone = adapter_id.clone();
569 let routes_ref = routes_arc.clone();
570 let reply_channel_outer = reply_channel.clone();
571 let shutdown_inner = shutdown_flag.clone();
572 async move {
573 Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
574 let c = channel_clone.clone();
575 let b = base_clone.clone();
576 let r = routes_ref.clone();
577 let a = adapter_id_clone.clone();
578 let rc = reply_channel_outer.clone();
579 let shutdown_local = shutdown_inner.clone();
580 async move {
581 let result = adapt_request(a, c, rc, req, b, mep, r).await;
582 if let Some(sender) = shutdown_local.lock().unwrap().take() {
583 let _ = sender.send(());
584 }
585 match result {
586 Ok(resp) => Ok::<_, hyper::Error>(resp),
587 Err(e) => {
588 error!(error=%e, "request handling failed (run_once)");
589 Ok(Response::builder()
590 .status(500)
591 .body(Body::from("internal error"))
592 .unwrap())
593 }
594 }
595 }
596 }))
597 }
598 });
599 info!(address=%self.addr, mep=?self.mep, "starting HTTP inbound adapter (single request)");
600 Server::bind(&self.addr)
601 .serve(make)
602 .with_graceful_shutdown(async {
603 let _ = rx.await;
604 })
605 .await
606 .map_err(|e| allora_core::error::Error::other(e.to_string()))?;
607 Ok(())
608 }
609 pub fn spawn_once(self) -> HttpServerHandle {
610 HttpServerHandle {
611 join: tokio::spawn(async move { self.run_once().await }),
612 }
613 }
614 pub fn spawn_serve(self) -> HttpServerHandle {
615 HttpServerHandle {
616 join: tokio::spawn(async move { self.serve().await }),
617 }
618 }
619}