allora_http/
http_outbound_adapter.rs

1//! HTTP Outbound Adapter: dispatches an `Exchange` to an external HTTP endpoint.
2//!
3//! # Overview
4//! `HttpOutboundAdapter` implements [`OutboundAdapter`], taking the (optionally transformed)
5//! message from an `Exchange` and POSTing (default) it to a configured remote endpoint.
6//! A builder is provided for ergonomic configuration.
7//!
8//! # Selection of Message
//! By default the adapter sends `exchange.out_msg` when it is present and falls back to
//! `exchange.in_msg` otherwise. Call `use_out_msg(false)` on the builder to disable this
//! preference and always send the inbound message.
11//!
12//! # Builder Fields
13//! * id (optional) – stable identifier; auto-generated UUID if omitted.
14//! * host (required) – remote host or IP.
15//! * port (required) – remote TCP port.
16//! * base_path (optional) – leading path (default "/").
17//! * path (optional) – trailing path segment appended to base_path.
18//! * method (optional) – HTTP method (default POST).
19//! * use_out_msg (optional) – whether to prioritize `out_msg` (default true).
20//!
21//! # Error Semantics
//! Network / protocol errors map to `Error::other`; a JSON payload that fails to serialize
//! maps to `Error::serialization`. A non-success HTTP status is not an error: the call still
//! returns `OutboundDispatchResult` (acknowledged = status.is_success()).
24//!
25//! # Example
26//! ```no_run
27//! use allora_core::{adapter::OutboundAdapter, Exchange, Message};
28//! use allora_http::HttpOutboundAdapter;
29//! # async fn demo() {
30//! let adapter = HttpOutboundAdapter::builder()
31//!     .id("http-public")
//!     .host("127.0.0.1")
33//!     .port(8080)
34//!     .base_path("/")
35//!     .build().unwrap();
//! let exchange = Exchange::new(Message::from_text("ping"));
37//! #[cfg(feature = "async")]
38//! let _res = adapter.dispatch(&exchange).await.unwrap();
39//! # }
40//! ```
41//!
42//! # Future Extensions
43//! * Header propagation (copy selected message headers to HTTP request).
44//! * Custom serialization strategies (content-type aware).
45//! * Retry / backoff policies & circuit breaking.
46//! * Metrics emission (latency, status codes).
47
48use allora_core::adapter::{BaseAdapter, OutboundAdapter, OutboundDispatchResult};
49use allora_core::{
50    error::{Error, Result},
51    Exchange, Payload,
52};
53use async_trait::async_trait;
54use hyper::client::HttpConnector;
55use hyper::{Body, Client, Method, Request};
56use std::fmt::Debug;
57
58#[derive(Clone)]
59pub struct HttpOutboundAdapter {
60    id: String,
61    host: String,
62    port: u16,
63    base_path: String,
64    path: Option<String>,
65    method: Method,
66    use_out_msg: bool,
67    client: Client<HttpConnector>,
68}
69
70impl Debug for HttpOutboundAdapter {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("HttpOutboundAdapter")
73            .field("id", &self.id)
74            .field("host", &self.host)
75            .field("port", &self.port)
76            .field("base_path", &self.base_path)
77            .field("path", &self.path)
78            .field("method", &self.method.as_str())
79            .field("use_out_msg", &self.use_out_msg)
80            .finish()
81    }
82}
83
84impl HttpOutboundAdapter {
85    pub fn builder() -> HttpOutboundAdapterBuilder {
86        HttpOutboundAdapterBuilder::default()
87    }
88    fn full_url(&self) -> String {
89        let mut base = self.base_path.clone();
90        if !base.starts_with('/') {
91            base.insert(0, '/');
92        }
93        if base.len() > 1 && base.ends_with('/') {
94            base.pop();
95        }
96        let mut full = base;
97        if let Some(p) = &self.path {
98            if !p.is_empty() {
99                if !p.starts_with('/') {
100                    full.push('/');
101                }
102                full.push_str(p);
103            }
104        }
105        format!("http://{}:{}{}", self.host, self.port, full)
106    }
107}
108
109impl BaseAdapter for HttpOutboundAdapter {
110    fn id(&self) -> &str {
111        &self.id
112    }
113}
114
115#[derive(Default)]
116pub struct HttpOutboundAdapterBuilder {
117    id: Option<String>,
118    host: Option<String>,
119    port: Option<u16>,
120    base_path: Option<String>,
121    path: Option<String>,
122    method: Option<Method>,
123    use_out_msg: Option<bool>,
124}
125
126impl HttpOutboundAdapterBuilder {
127    pub fn id(mut self, v: impl Into<String>) -> Self {
128        self.id = Some(v.into());
129        self
130    }
131    pub fn host(mut self, v: impl Into<String>) -> Self {
132        self.host = Some(v.into());
133        self
134    }
135    pub fn port(mut self, v: u16) -> Self {
136        self.port = Some(v);
137        self
138    }
139    pub fn base_path(mut self, v: impl Into<String>) -> Self {
140        self.base_path = Some(v.into());
141        self
142    }
143    pub fn path(mut self, v: impl Into<String>) -> Self {
144        self.path = Some(v.into());
145        self
146    }
147    pub fn method(mut self, m: Method) -> Self {
148        self.method = Some(m);
149        self
150    }
151    pub fn use_out_msg(mut self, flag: bool) -> Self {
152        self.use_out_msg = Some(flag);
153        self
154    }
155    pub fn build(self) -> Result<HttpOutboundAdapter> {
156        let host = self.host.ok_or_else(|| Error::other("host required"))?;
157        let port = self.port.ok_or_else(|| Error::other("port required"))?;
158        let id = self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
159        let base_path = self.base_path.unwrap_or_else(|| "/".to_string());
160        let method = self.method.unwrap_or(Method::POST);
161        Ok(HttpOutboundAdapter {
162            id,
163            host,
164            port,
165            base_path,
166            path: self.path,
167            method,
168            use_out_msg: self.use_out_msg.unwrap_or(true),
169            client: Client::new(),
170        })
171    }
172}
173
174#[async_trait]
175impl OutboundAdapter for HttpOutboundAdapter {
176    async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult> {
177        let msg_ref = if self.use_out_msg {
178            exchange.out_msg.as_ref().unwrap_or(&exchange.in_msg)
179        } else {
180            &exchange.in_msg
181        };
182        let bytes: Vec<u8> = match &msg_ref.payload {
183            Payload::Text(s) => s.clone().into_bytes(),
184            Payload::Bytes(b) => b.clone(),
185            Payload::Json(v) => {
186                serde_json::to_vec(v).map_err(|e| Error::serialization(e.to_string()))?
187            }
188            Payload::Empty => Vec::new(),
189        };
190        let req = Request::builder()
191            .method(self.method.clone())
192            .uri(self.full_url())
193            .header("Content-Type", "application/octet-stream")
194            .body(Body::from(bytes))
195            .map_err(|e| Error::other(e.to_string()))?;
196        let resp = self
197            .client
198            .request(req)
199            .await
200            .map_err(|e| Error::other(e.to_string()))?;
201        let status = resp.status();
202        let body_bytes = hyper::body::to_bytes(resp.into_body())
203            .await
204            .map_err(|e| Error::other(e.to_string()))?;
205        let body_string = String::from_utf8_lossy(&body_bytes).to_string();
206        Ok(OutboundDispatchResult {
207            acknowledged: status.is_success(),
208            message: Some(format!("HTTP {}", status)),
209            status_code: Some(status.as_u16()),
210            body: Some(body_string),
211        })
212    }
213}