Skip to main content

allora_http/
http_outbound_adapter.rs

1//! HTTP Outbound Adapter: dispatches an `Exchange` to an external HTTP(S) endpoint.
2//!
3//! # Overview
4//! `HttpOutboundAdapter` implements [`OutboundAdapter`], taking the (optionally transformed)
5//! message from an `Exchange` and POSTing (default) it to a configured remote endpoint.
6//! A builder is provided for ergonomic configuration.
7//!
8//! As of 0.0.9 the adapter is configured by a single `url` (string parsed into
9//! [`url::Url`]) rather than the previous `host` + `port` + `base-path` triple,
10//! and the underlying HTTP client is [`reqwest::Client`] — which transparently
11//! supports both `http://` and `https://` schemes via `rustls`.
12//!
13//! # Selection of Message
14//! By default the adapter prefers `exchange.out_msg` (if present) falling back to `exchange.in_msg`.
15//! This can be inverted via `use_out_msg(false)` to always send the inbound message.
16//!
17//! # Builder Fields
18//! * id (optional) – stable identifier; auto-generated UUID if omitted.
19//! * url (required) – full target URL including scheme (e.g. `https://api.example.com/submit`).
20//! * method (optional) – HTTP method (default POST).
21//! * use_out_msg (optional) – whether to prioritize `out_msg` (default true).
22//!
23//! # Error Semantics
24//! Network / protocol errors map to `Error::other`. Non-success HTTP status still returns
25//! `OutboundDispatchResult` (acknowledged = status.is_success()).
26//!
27//! # Example
28//! ```no_run
29//! use allora_core::{adapter::OutboundAdapter, Exchange, Message};
30//! use allora_http::HttpOutboundAdapter;
31//! # async fn demo() {
32//! let adapter = HttpOutboundAdapter::builder()
33//!     .id("http-public")
34//!     .url("https://api.example.com/submit")
35//!     .build().unwrap();
36//! let mut exchange = Exchange::new(Message::from_text("ping"));
37//! let _res = adapter.dispatch(&exchange).await.unwrap();
38//! # }
39//! ```
40//!
41//! # Future Extensions
42//! * Header propagation (copy selected message headers to HTTP request).
43//! * Custom serialization strategies (content-type aware).
44//! * Retry / backoff policies & circuit breaking.
45//! * Metrics emission (latency, status codes).
46
47use allora_core::adapter::{BaseAdapter, OutboundAdapter, OutboundDispatchResult};
48use allora_core::{
49    error::{Error, Result},
50    Exchange, Payload,
51};
52use async_trait::async_trait;
53use reqwest::{Client, Method};
54use std::fmt::Debug;
55
56#[derive(Clone)]
57pub struct HttpOutboundAdapter {
58    id: String,
59    /// Parsed, validated target URL. Cached at construction; the per-dispatch
60    /// hot path just clones this `Url` and hands it to `reqwest`.
61    url: url::Url,
62    method: Method,
63    use_out_msg: bool,
64    client: Client,
65}
66
67impl Debug for HttpOutboundAdapter {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        f.debug_struct("HttpOutboundAdapter")
70            .field("id", &self.id)
71            .field("url", &self.url.as_str())
72            .field("method", &self.method.as_str())
73            .field("use_out_msg", &self.use_out_msg)
74            .finish()
75    }
76}
77
78impl HttpOutboundAdapter {
79    pub fn builder() -> HttpOutboundAdapterBuilder {
80        HttpOutboundAdapterBuilder::default()
81    }
82    /// Configured target URL (parsed). Useful for tests and structured logging.
83    pub fn url(&self) -> &url::Url {
84        &self.url
85    }
86    /// Configured HTTP method (default POST).
87    pub fn method(&self) -> &Method {
88        &self.method
89    }
90}
91
92impl BaseAdapter for HttpOutboundAdapter {
93    fn id(&self) -> &str {
94        &self.id
95    }
96}
97
98#[derive(Default)]
99pub struct HttpOutboundAdapterBuilder {
100    id: Option<String>,
101    url: Option<String>,
102    method: Option<Method>,
103    use_out_msg: Option<bool>,
104    /// Test-only escape hatch — see the `dangerous_accept_invalid_certs`
105    /// setter. Off by default. **Do not enable in production.**
106    dangerous_accept_invalid_certs: bool,
107}
108
109impl HttpOutboundAdapterBuilder {
110    pub fn id(mut self, v: impl Into<String>) -> Self {
111        self.id = Some(v.into());
112        self
113    }
114    /// Set the full target URL (must include scheme; either `http://` or `https://`).
115    pub fn url(mut self, v: impl Into<String>) -> Self {
116        self.url = Some(v.into());
117        self
118    }
119    pub fn method(mut self, m: Method) -> Self {
120        self.method = Some(m);
121        self
122    }
123    pub fn use_out_msg(mut self, flag: bool) -> Self {
124        self.use_out_msg = Some(flag);
125        self
126    }
127    /// Disable TLS certificate validation on the underlying `reqwest::Client`.
128    ///
129    /// **Intended for tests only.** Enabling this against a real endpoint
130    /// defeats TLS — an active network attacker can read or modify all
131    /// traffic. Production configuration loaded from YAML never sets this
132    /// flag; it has no corresponding spec field.
133    pub fn dangerous_accept_invalid_certs(mut self, flag: bool) -> Self {
134        self.dangerous_accept_invalid_certs = flag;
135        self
136    }
137    pub fn build(self) -> Result<HttpOutboundAdapter> {
138        let raw_url = self.url.ok_or_else(|| Error::other("url required"))?;
139        let parsed = url::Url::parse(&raw_url)
140            .map_err(|e| Error::other(format!("invalid url '{raw_url}': {e}")))?;
141        match parsed.scheme() {
142            "http" | "https" => {}
143            other => {
144                return Err(Error::other(format!(
145                    "unsupported url scheme '{other}' (expected http or https)"
146                )));
147            }
148        }
149        let id = self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
150        let method = self.method.unwrap_or(Method::POST);
151        let client = Client::builder()
152            .danger_accept_invalid_certs(self.dangerous_accept_invalid_certs)
153            .build()
154            .map_err(|e| Error::other(format!("failed to build http client: {e}")))?;
155        Ok(HttpOutboundAdapter {
156            id,
157            url: parsed,
158            method,
159            use_out_msg: self.use_out_msg.unwrap_or(true),
160            client,
161        })
162    }
163}
164
165#[async_trait]
166impl OutboundAdapter for HttpOutboundAdapter {
167    async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult> {
168        let msg_ref = if self.use_out_msg {
169            exchange.out_msg.as_ref().unwrap_or(&exchange.in_msg)
170        } else {
171            &exchange.in_msg
172        };
173        let bytes: Vec<u8> = match &msg_ref.payload {
174            Payload::Text(s) => s.clone().into_bytes(),
175            Payload::Bytes(b) => b.clone(),
176            Payload::Json(v) => {
177                serde_json::to_vec(v).map_err(|e| Error::serialization(e.to_string()))?
178            }
179            Payload::Empty => Vec::new(),
180        };
181        let resp = self
182            .client
183            .request(self.method.clone(), self.url.clone())
184            .header("Content-Type", "application/octet-stream")
185            .body(bytes)
186            .send()
187            .await
188            .map_err(|e| Error::other(e.to_string()))?;
189        let status = resp.status();
190        let body_string = resp.text().await.map_err(|e| Error::other(e.to_string()))?;
191        Ok(OutboundDispatchResult {
192            acknowledged: status.is_success(),
193            message: Some(format!("HTTP {}", status)),
194            status_code: Some(status.as_u16()),
195            body: Some(body_string),
196        })
197    }
198}