Skip to main content

allora_http/
http_inbound_adapter.rs

1//! HTTP Inbound Adapter: bridges incoming HTTP requests into Allora `Exchange`s.
2//! (see module docs above for overview; trimmed here for brevity)
3//!
4//! # Overview
5//! Translates inbound HTTP requests into `Exchange`s with a `Message` payload, then enqueues them
6//! on a configured channel. Optional endpoints (method/path registrations) can be attached for
7//! direct processing or routing augmentation.
8//!
9//! # Configuration (Builder)
10//! * `id` (optional) – stable identifier; auto-generated from socket if omitted.
11//! * `host` / `port` – listening address.
12//! * `base_path` – URL path prefix for registrations (e.g. `/api`).
13//! * `channel` – target ChannelRef (mandatory).
14//! * `mep` – message exchange pattern (request/reply vs fire-and-forget).
15//! * `register` / `register_any` – attach endpoints for specific method/path pairs.
16//!
17//! # Message Exchange Patterns
18//! * `InOut` – awaits downstream processing and echoes (or uses first endpoint mutation) in the HTTP response.
19//! * `InOnly202` – returns HTTP 202 immediately; processing continues asynchronously.
20//!
21//! Notes:
22//! * Async-only (Tokio) – no sync variant.
23//! * HTTP adapter always available (no feature flags).
24//! * Correlation header ensured automatically.
25//!
26//! # Example (Builder)
27//! ```rust
28//! use allora_core::adapter::Adapter;
29//! use allora_core::channel::QueueChannel;
30//! use allora_http::adapter_dsl::InboundHttpExt; // bring .http() into scope
31//! let channel = QueueChannel::with_id("http-pipe");
32//! let adapter = Adapter::inbound()
33//!     .http()
34//!     .host("127.0.0.1")
35//!     .port(0)
36//!     .channel(std::sync::Arc::new(channel))
37//!     .in_only_202()
38//!     .build();
39//! use allora_http::http_inbound_adapter::Mep;
40//! assert_eq!(adapter.mep(), Mep::InOnly202);
41//! ```
42
43use allora_core::adapter::{ensure_correlation, BaseAdapter, InboundAdapter};
44use allora_core::channel::{ChannelRef, QueueChannel};
45// for downcast when polling reply_channel
46use allora_core::endpoint::{EndpointSource, InMemoryEndpoint};
47use allora_core::error::Result;
48use allora_core::{Exchange, Message, Payload};
49use async_trait::async_trait;
50use hyper::service::{make_service_fn, service_fn};
51use hyper::{Body, Request, Response, Server, Version};
52use std::collections::HashMap;
53use std::net::SocketAddr;
54use std::pin::Pin;
55use std::sync::{Arc, Mutex, Weak};
56use std::task::{Context, Poll};
57use tracing::{debug, error, info, trace};
58
59// Reply to polling configuration (seconds to wait for a reply-channel before falling back, and interval between polls)
60const REPLY_TIMEOUT_SECS: u64 = 3;
61const REPLY_POLL_INTERVAL_MILLIS: u64 = 50;
62
63/// Message Exchange Pattern for HTTP inbound.
64#[derive(Clone, Copy, Debug, PartialEq, Eq)]
65pub enum Mep {
66    /// Request/Reply: wait for the route to complete and return its response body (legacy behavior).
67    InOut,
68    /// Fire-and-forget: return 202 immediately and dispatch the Exchange in the background.
69    InOnly202,
70}
71impl Default for Mep {
72    fn default() -> Self {
73        Mep::InOut
74    }
75}
76
77#[derive(Clone, Debug)]
78pub struct HttpInboundAdapter {
79    id: String,
80    addr: SocketAddr,
81    base_path: String,
82    channel: ChannelRef,
83    mep: Mep,
84    reply_channel: Option<ChannelRef>,
85    routes: Arc<Mutex<HashMap<(String, String), Vec<Weak<InMemoryEndpoint>>>>>,
86}
87
88pub struct HttpServerHandle {
89    join: tokio::task::JoinHandle<Result<()>>,
90}
91
92impl HttpServerHandle {
93    pub async fn wait(self) -> Result<()> {
94        self.join
95            .await
96            .unwrap_or_else(|e| Err(allora_core::error::Error::other(e.to_string())))
97    }
98    pub fn abort(self) {
99        self.join.abort();
100    }
101}
102
103impl std::future::Future for HttpServerHandle {
104    type Output = Result<()>;
105    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
106        let inner = unsafe { self.map_unchecked_mut(|s| &mut s.join) };
107        match inner.poll(cx) {
108            Poll::Ready(r) => Poll::Ready(
109                r.unwrap_or_else(|e| Err(allora_core::error::Error::other(e.to_string()))),
110            ),
111            Poll::Pending => Poll::Pending,
112        }
113    }
114}
115
116impl HttpInboundAdapter {
117    pub fn id(&self) -> &str {
118        &self.id
119    }
120    pub fn addr(&self) -> SocketAddr {
121        self.addr
122    }
123    pub fn base_path(&self) -> &str {
124        &self.base_path
125    }
126    pub fn mep(&self) -> Mep {
127        self.mep
128    }
129    /// Direct constructor bypassing the staged DSL builder.
130    /// Use this when you want to instantiate the adapter programmatically without chaining
131    /// `Adapter::inbound().http()...` calls. `id` is optional; if `None`, an id of the form
132    /// `http-inbound:<host>:<port>` is synthesized (matching the builder's auto pattern).
133    pub fn new(
134        host: impl Into<String>,
135        port: u16,
136        base_path: impl Into<String>,
137        channel: ChannelRef,
138        reply_channel: Option<ChannelRef>,
139        mep: Mep,
140        id: Option<String>,
141    ) -> Self {
142        let host_str = host.into();
143        let addr: SocketAddr = format!("{}:{}", host_str, port)
144            .parse()
145            .expect("invalid socket addr");
146        let base = {
147            let b = base_path.into();
148            if b.is_empty() {
149                "/".to_string()
150            } else {
151                b
152            }
153        };
154        let id_final = id.unwrap_or_else(|| format!("http-inbound:{}", addr));
155        trace!(adapter.id=%id_final, host=%host_str, port=%port, base_path=%base, mep=?mep, "constructing HttpInboundAdapter (direct)");
156        HttpInboundAdapter {
157            id: id_final,
158            addr,
159            base_path: base,
160            channel,
161            mep,
162            reply_channel,
163            routes: Arc::new(Mutex::new(HashMap::new())),
164        }
165    }
166    /// Convenience: construct InOut adapter (request/reply pattern).
167    pub fn new_in_out(
168        host: impl Into<String>,
169        port: u16,
170        base_path: impl Into<String>,
171        channel: ChannelRef,
172        reply_channel: Option<ChannelRef>,
173        id: Option<String>,
174    ) -> Self {
175        Self::new(
176            host,
177            port,
178            base_path,
179            channel,
180            reply_channel,
181            Mep::InOut,
182            id,
183        )
184    }
185    /// Convenience: construct InOnly202 adapter (fire-and-forget pattern).
186    pub fn new_in_only_202(
187        host: impl Into<String>,
188        port: u16,
189        base_path: impl Into<String>,
190        channel: ChannelRef,
191        id: Option<String>,
192    ) -> Self {
193        Self::new(host, port, base_path, channel, None, Mep::InOnly202, id)
194    }
195    // Builder entry via staged pattern: Adapter::inbound().http() returns builder
196}
197
198pub struct HttpInboundBuilder {
199    id: Option<String>,
200    host: String,
201    port: u16,
202    base_path: String,
203    channel: Option<ChannelRef>,
204    mep: Mep,
205    reply_channel: Option<ChannelRef>,
206    registrations: Vec<(String, String, Arc<InMemoryEndpoint>)>,
207}
208impl HttpInboundBuilder {
209    pub(crate) fn new() -> Self {
210        Self {
211            id: None,
212            host: String::new(),
213            port: 0,
214            base_path: String::new(),
215            channel: None,
216            mep: Mep::InOut,
217            reply_channel: None,
218            registrations: Vec::new(),
219        }
220    }
221    /// Register an endpoint for a specific HTTP method and path. Path relative to base_path.
222    pub fn register(mut self, method: &str, path: &str, endpoint: Arc<InMemoryEndpoint>) -> Self {
223        let norm = if path.starts_with('/') {
224            path.to_string()
225        } else {
226            format!("/{}", path)
227        };
228        self.registrations
229            .push((method.to_ascii_uppercase(), norm, endpoint));
230        self
231    }
232    /// Convenience: register endpoint for all methods (stored as ANY wildcard).
233    pub fn register_any(self, path: &str, endpoint: Arc<InMemoryEndpoint>) -> Self {
234        self.register("ANY", path, endpoint)
235    }
236    pub fn id(mut self, id: impl Into<String>) -> Self {
237        self.id = Some(id.into());
238        self
239    }
240    pub fn host(mut self, host: impl Into<String>) -> Self {
241        self.host = host.into();
242        self
243    }
244    pub fn port(mut self, port: u16) -> Self {
245        self.port = port;
246        self
247    }
248    pub fn base_path(mut self, path: impl Into<String>) -> Self {
249        self.base_path = path.into();
250        self
251    }
252    pub fn channel(mut self, ch: ChannelRef) -> Self {
253        self.channel = Some(ch);
254        self
255    }
256    pub fn reply_channel(mut self, ch: ChannelRef) -> Self {
257        self.reply_channel = Some(ch);
258        self
259    }
260
261    /// Set the message exchange pattern. Default is `InOut`.
262    pub fn mep(mut self, mep: Mep) -> Self {
263        self.mep = mep;
264        self
265    }
266
267    /// Convenience: respond 202 immediately and dispatch in background.
268    pub fn in_only_202(self) -> Self {
269        self.mep(Mep::InOnly202)
270    }
271
272    pub fn build(self) -> HttpInboundAdapter {
273        let addr: SocketAddr = format!("{}:{}", self.host, self.port)
274            .parse()
275            .expect("invalid socket addr");
276        let id = self.id.unwrap_or_else(|| format!("http-inbound:{}", addr));
277        let base_path = if self.base_path.is_empty() {
278            "/".to_string()
279        } else {
280            self.base_path
281        };
282        let channel = self
283            .channel
284            .expect("channel must be set on HttpInboundBuilder before build()");
285        let effective_mep = if self.reply_channel.is_some() {
286            Mep::InOut
287        } else {
288            self.mep
289        };
290        let adapter = HttpInboundAdapter {
291            id: id.clone(),
292            addr,
293            base_path: base_path.clone(),
294            channel,
295            mep: effective_mep,
296            reply_channel: self.reply_channel.clone(),
297            routes: Arc::new(Mutex::new(HashMap::new())),
298        };
299        info!(adapter.id=%adapter.id, addr=%adapter.addr, base_path=%adapter.base_path, mep=?adapter.mep, reply_channel=adapter.reply_channel.is_some(), "HttpInboundAdapter built via builder");
300        for (method, path, ep) in self.registrations.into_iter() {
301            adapter.register_endpoint(&method, &path, Arc::downgrade(&ep));
302        }
303        adapter
304    }
305}
306
307impl BaseAdapter for HttpInboundAdapter {
308    fn id(&self) -> &str {
309        &self.id
310    }
311}
312
313#[async_trait]
314impl InboundAdapter for HttpInboundAdapter {
315    async fn run(&self) -> Result<()> {
316        self.serve().await
317    }
318}
319
320fn normalize_path<'a>(base: &'a str, full: &'a str) -> &'a str {
321    if base == "/" {
322        return full;
323    }
324    match full.strip_prefix(base) {
325        Some(p) if p.is_empty() => "/",
326        Some(p) => {
327            if p.starts_with('/') {
328                p
329            } else {
330                "/"
331            }
332        }
333        None => full,
334    }
335}
336
337fn http_version_str(v: Version) -> &'static str {
338    match v {
339        Version::HTTP_09 => "0.9",
340        Version::HTTP_10 => "1.0",
341        Version::HTTP_11 => "1.1",
342        Version::HTTP_2 => "2.0",
343        Version::HTTP_3 => "3.0",
344        _ => "unknown",
345    }
346}
347
348async fn adapt_request(
349    adapter_id: String,
350    channel: ChannelRef,
351    reply_channel: Option<ChannelRef>,
352    req: Request<Body>,
353    base_path: String,
354    mep: Mep,
355    routes: Arc<Mutex<HashMap<(String, String), Vec<Weak<InMemoryEndpoint>>>>>,
356) -> Result<Response<Body>> {
357    let method = req.method().clone();
358    let path_full = req.uri().path().to_string();
359    let path_norm = normalize_path(&base_path, &path_full).to_string();
360    let query = req.uri().query().unwrap_or("").to_string();
361    let version = http_version_str(req.version()).to_string();
362    // Extract headers before consuming body.
363    let mut content_type = None::<String>;
364    let headers_clone: Vec<(String, String)> = req
365        .headers()
366        .iter()
367        .filter_map(|(name, val)| {
368            val.to_str()
369                .ok()
370                .map(|s| (name.as_str().to_ascii_lowercase(), s.to_string()))
371        })
372        .collect();
373    if let Some(ct) = headers_clone
374        .iter()
375        .find(|(k, _)| k == "content-type")
376        .map(|(_, v)| v.clone())
377    {
378        content_type = Some(ct);
379    }
380    // Consume body afterwards.
381    let body_bytes = hyper::body::to_bytes(req.into_body())
382        .await
383        .map_err(|e| allora_core::error::Error::other(e.to_string()))?;
384
385    // Build Message with raw bytes payload (no lossy conversion).
386    let mut msg = if let Ok(txt) = std::str::from_utf8(&body_bytes) {
387        Message::from_text(txt)
388    } else {
389        Message::new(Payload::Bytes(body_bytes.to_vec()))
390    };
391    msg.set_header("http.method", method.as_str());
392    msg.set_header("http.path", &path_norm);
393    if !query.is_empty() {
394        msg.set_header("http.query", &query);
395    }
396    msg.set_header("http.version", &version);
397    for (k, v) in headers_clone.iter() {
398        let key = format!("http.header.{}", k);
399        msg.set_header(&key, v);
400    }
401    if let Some(ct) = content_type {
402        msg.set_header("http.content_type", &ct);
403    }
404    if let Ok(txt) = std::str::from_utf8(&body_bytes) {
405        msg.set_header("http.body_text", txt);
406    }
407
408    // Prepare Exchange with correlation id.
409    let mut exchange = Exchange::new(msg);
410    ensure_correlation(&mut exchange);
411    debug!(adapter.id=%adapter_id, corr_id=?exchange.in_msg.header("corr_id"), "correlation ensured for inbound exchange");
412    match mep {
413        Mep::InOut => {
414            let key_exact = (method.as_str().to_ascii_uppercase(), path_norm.clone());
415            let key_any = ("ANY".to_string(), path_norm.clone());
416            let mut endpoints: Vec<Weak<InMemoryEndpoint>> = Vec::new();
417            if let Ok(map) = routes.lock() {
418                if let Some(list) = map.get(&key_exact) {
419                    endpoints.extend(list.iter().cloned());
420                }
421                if let Some(list) = map.get(&key_any) {
422                    endpoints.extend(list.iter().cloned());
423                }
424            }
425            if !endpoints.is_empty() {
426                debug!(adapter.id=%adapter_id, endpoints.count=endpoints.len(), path=%path_norm, "matched in-memory endpoints");
427                let mut response_body: Option<String> = None;
428                for weak_ep in endpoints.iter() {
429                    if let Some(ep) = weak_ep.upgrade() {
430                        if let Some(ch_ref) = ep.channel() {
431                            let mut ex_clone = exchange.clone();
432                            EndpointSource::Http {
433                                adapter_id: adapter_id.clone(),
434                                method: method.as_str().to_string(),
435                                path: path_norm.clone(),
436                            }
437                            .apply_headers(&mut ex_clone);
438                            ch_ref.send(ex_clone.clone()).await?;
439                            trace!(adapter.id=%adapter_id, endpoint.channel=%ch_ref.id(), method=%method, path=%path_norm, "dispatched exchange to endpoint channel");
440                            if response_body.is_none() {
441                                response_body = ex_clone.in_msg.body_text().map(|s| s.to_string());
442                            }
443                        }
444                    } else {
445                        trace!(adapter.id=%adapter_id, method=%method, path=%path_norm, "skipping stale endpoint");
446                    }
447                }
448                let body_final = response_body.unwrap_or_else(|| String::new());
449                return Ok(Response::new(Body::from(body_final)));
450            }
451            // Fallback: enqueue on primary channel only, then optionally wait for reply_channel.
452            trace!(adapter.id=%adapter_id, channel.id=?channel.id(), mep=?mep, "no endpoints matched; sending to primary channel");
453            channel.send(exchange.clone()).await?;
454            if let Some(rc) = reply_channel {
455                // Downcast to a pollable queue channel.
456                if let Some(qc) = rc.as_any().downcast_ref::<QueueChannel>() {
457                    use allora_core::PollableChannel;
458                    let start = std::time::Instant::now();
459                    while start.elapsed() < std::time::Duration::from_secs(REPLY_TIMEOUT_SECS) {
460                        if let Some(ex_reply) = qc.try_receive().await {
461                            let body = ex_reply
462                                .out_msg
463                                .as_ref()
464                                .and_then(|m| m.body_text())
465                                .or_else(|| ex_reply.in_msg.body_text())
466                                .unwrap_or("");
467                            return Ok(Response::new(Body::from(body.to_string())));
468                        }
469                        tokio::time::sleep(std::time::Duration::from_millis(
470                            REPLY_POLL_INTERVAL_MILLIS,
471                        ))
472                        .await;
473                    }
474                    trace!(adapter.id=%adapter_id, "reply-channel timeout; returning original inbound body");
475                } else {
476                    trace!(adapter.id=%adapter_id, "reply-channel present but not queue/pollable; skipping reply wait");
477                }
478            }
479            let response_body = exchange
480                .in_msg
481                .body_text()
482                .map(|s| s.to_string())
483                .unwrap_or_else(|| String::from_utf8_lossy(&body_bytes).to_string());
484            Ok(Response::new(Body::from(response_body)))
485        }
486        Mep::InOnly202 => {
487            trace!(adapter.id=%adapter_id, channel.id=?channel.id(), "IN_ONLY_202 mode: spawning background send");
488            // Fire-and-forget: dispatch in the background and ack now.
489            let ch = channel.clone();
490            tokio::spawn(async move {
491                let _ = ch.send(exchange).await;
492            });
493            Ok(Response::builder()
494                .status(202)
495                .body(Body::from("ok"))
496                .unwrap())
497        }
498    }
499}
500
501impl HttpInboundAdapter {
502    pub fn register_endpoint(&self, method: &str, path: &str, ep: Weak<InMemoryEndpoint>) {
503        let key = (method.to_ascii_uppercase(), path.to_string());
504        let mut map = self.routes.lock().unwrap();
505        map.entry(key).or_insert_with(Vec::new).push(ep);
506    }
507    pub fn register_endpoint_any(&self, path: &str, ep: Weak<InMemoryEndpoint>) {
508        for m in [
509            "GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD", "ANY",
510        ] {
511            self.register_endpoint(m, path, ep.clone());
512        }
513    }
514    pub async fn serve(&self) -> Result<()> {
515        let channel = self.channel.clone();
516        let reply_channel = self.reply_channel.clone();
517        let base = self.base_path.clone();
518        let mep = self.mep;
519        let adapter_id = self.id.clone();
520        let routes_arc = self.routes.clone();
521        let make = make_service_fn(move |_conn| {
522            let channel_clone = channel.clone();
523            let base_clone = base.clone();
524            let adapter_id_clone = adapter_id.clone();
525            let routes_ref = routes_arc.clone();
526            let reply_channel_outer = reply_channel.clone();
527            async move {
528                Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
529                    let c = channel_clone.clone();
530                    let b = base_clone.clone();
531                    let r = routes_ref.clone();
532                    let a = adapter_id_clone.clone();
533                    let rc = reply_channel_outer.clone();
534                    async move {
535                        match adapt_request(a, c, rc, req, b, mep, r).await {
536                            Ok(resp) => Ok::<_, hyper::Error>(resp),
537                            Err(e) => {
538                                error!(error=%e, "request handling failed");
539                                Ok(Response::builder()
540                                    .status(500)
541                                    .body(Body::from("internal error"))
542                                    .unwrap())
543                            }
544                        }
545                    }
546                }))
547            }
548        });
549        info!(address=%self.addr, mep=?self.mep, "starting HTTP inbound adapter (continuous)");
550        Server::bind(&self.addr)
551            .serve(make)
552            .await
553            .map_err(|e| allora_core::error::Error::other(e.to_string()))?;
554        Ok(())
555    }
556    pub async fn run_once(self) -> Result<()> {
557        let (tx, rx) = tokio::sync::oneshot::channel::<()>();
558        let channel = self.channel.clone();
559        let reply_channel = self.reply_channel.clone();
560        let base = self.base_path.clone();
561        let mep = self.mep;
562        let adapter_id = self.id.clone();
563        let routes_arc = self.routes.clone();
564        let shutdown_flag = Arc::new(Mutex::new(Some(tx)));
565        let make = make_service_fn(move |_conn| {
566            let channel_clone = channel.clone();
567            let base_clone = base.clone();
568            let adapter_id_clone = adapter_id.clone();
569            let routes_ref = routes_arc.clone();
570            let reply_channel_outer = reply_channel.clone();
571            let shutdown_inner = shutdown_flag.clone();
572            async move {
573                Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
574                    let c = channel_clone.clone();
575                    let b = base_clone.clone();
576                    let r = routes_ref.clone();
577                    let a = adapter_id_clone.clone();
578                    let rc = reply_channel_outer.clone();
579                    let shutdown_local = shutdown_inner.clone();
580                    async move {
581                        let result = adapt_request(a, c, rc, req, b, mep, r).await;
582                        if let Some(sender) = shutdown_local.lock().unwrap().take() {
583                            let _ = sender.send(());
584                        }
585                        match result {
586                            Ok(resp) => Ok::<_, hyper::Error>(resp),
587                            Err(e) => {
588                                error!(error=%e, "request handling failed (run_once)");
589                                Ok(Response::builder()
590                                    .status(500)
591                                    .body(Body::from("internal error"))
592                                    .unwrap())
593                            }
594                        }
595                    }
596                }))
597            }
598        });
599        info!(address=%self.addr, mep=?self.mep, "starting HTTP inbound adapter (single request)");
600        Server::bind(&self.addr)
601            .serve(make)
602            .with_graceful_shutdown(async {
603                let _ = rx.await;
604            })
605            .await
606            .map_err(|e| allora_core::error::Error::other(e.to_string()))?;
607        Ok(())
608    }
609    pub fn spawn_once(self) -> HttpServerHandle {
610        HttpServerHandle {
611            join: tokio::spawn(async move { self.run_once().await }),
612        }
613    }
614    pub fn spawn_serve(self) -> HttpServerHandle {
615        HttpServerHandle {
616            join: tokio::spawn(async move { self.serve().await }),
617        }
618    }
619}