Skip to main content

allora_http/
http_outbound_adapter.rs

1//! HTTP Outbound Adapter: dispatches an `Exchange` to an external HTTP(S) endpoint.
2//!
3//! # Overview
4//! `HttpOutboundAdapter` implements [`OutboundAdapter`], taking the (optionally transformed)
5//! message from an `Exchange` and POSTing (default) it to a configured remote endpoint.
6//! A builder is provided for ergonomic configuration.
7//!
8//! As of 0.0.9 the adapter is configured by a single `url` (string parsed into
9//! [`url::Url`]) rather than the previous `host` + `port` + `base-path` triple,
10//! and the underlying HTTP client is [`reqwest::Client`] — which transparently
11//! supports both `http://` and `https://` schemes via `rustls`.
12//!
13//! # Selection of Message
14//! By default the adapter prefers `exchange.out_msg` (if present) falling back to `exchange.in_msg`.
15//! This can be inverted via `use_out_msg(false)` to always send the inbound message.
16//!
17//! # Builder Fields
18//! * id (optional) – stable identifier; auto-generated UUID if omitted.
19//! * url (required) – full target URL including scheme (e.g. `https://api.example.com/submit`).
20//! * method (optional) – HTTP method (default POST).
21//! * use_out_msg (optional) – whether to prioritize `out_msg` (default true).
22//!
23//! # Error Semantics
24//! Network / protocol errors map to `Error::other`. Non-success HTTP status still returns
25//! `OutboundDispatchResult` (acknowledged = status.is_success()).
26//!
27//! # Observability (0.0.10+)
28//! Every failure path in [`HttpOutboundAdapter::dispatch`] emits a structured
29//! `tracing::warn!` event so operators can diagnose silent dispatch problems
30//! in logs (e.g. CloudWatch). The four failure classes are:
31//!
32//! 1. **Serialization** of a `Payload::Json` body fails.
33//! 2. **Transport/connection** error from `reqwest` (DNS, TCP, TLS, …).
34//! 3. **Body read** failure after a response was received.
35//! 4. **Non-success HTTP status** (any non-2xx); the first 512 chars of the
36//!    response body are included as `body_preview`.
37//!
38//! Successful dispatches are intentionally silent — the happy path runs at
39//! high frequency and `info!` here would flood log sinks. The dispatch
40//! contract (return type and channel-propagation semantics) is unchanged;
41//! the new log calls are purely additive.
42//!
43//! # Example
44//! ```no_run
45//! use allora_core::{adapter::OutboundAdapter, Exchange, Message};
46//! use allora_http::HttpOutboundAdapter;
47//! # async fn demo() {
48//! let adapter = HttpOutboundAdapter::builder()
49//!     .id("http-public")
50//!     .url("https://api.example.com/submit")
51//!     .build().unwrap();
52//! let mut exchange = Exchange::new(Message::from_text("ping"));
53//! let _res = adapter.dispatch(&exchange).await.unwrap();
54//! # }
55//! ```
56//!
57//! # Future Extensions
58//! * Header propagation (copy selected message headers to HTTP request).
59//! * Custom serialization strategies (content-type aware).
60//! * Retry / backoff policies & circuit breaking.
61//! * Metrics emission (latency, status codes).
62
63use allora_core::adapter::{BaseAdapter, OutboundAdapter, OutboundDispatchResult};
64use allora_core::{
65    error::{Error, Result},
66    Exchange, Payload,
67};
68use async_trait::async_trait;
69use reqwest::{Client, Method};
70use std::fmt::Debug;
71
72#[derive(Clone)]
73pub struct HttpOutboundAdapter {
74    id: String,
75    /// Parsed, validated target URL. Cached at construction; the per-dispatch
76    /// hot path just clones this `Url` and hands it to `reqwest`.
77    url: url::Url,
78    method: Method,
79    use_out_msg: bool,
80    client: Client,
81}
82
83impl Debug for HttpOutboundAdapter {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("HttpOutboundAdapter")
86            .field("id", &self.id)
87            .field("url", &self.url.as_str())
88            .field("method", &self.method.as_str())
89            .field("use_out_msg", &self.use_out_msg)
90            .finish()
91    }
92}
93
94impl HttpOutboundAdapter {
95    pub fn builder() -> HttpOutboundAdapterBuilder {
96        HttpOutboundAdapterBuilder::default()
97    }
98    /// Configured target URL (parsed). Useful for tests and structured logging.
99    pub fn url(&self) -> &url::Url {
100        &self.url
101    }
102    /// Configured HTTP method (default POST).
103    pub fn method(&self) -> &Method {
104        &self.method
105    }
106}
107
108impl BaseAdapter for HttpOutboundAdapter {
109    fn id(&self) -> &str {
110        &self.id
111    }
112}
113
114#[derive(Default)]
115pub struct HttpOutboundAdapterBuilder {
116    id: Option<String>,
117    url: Option<String>,
118    method: Option<Method>,
119    use_out_msg: Option<bool>,
120    /// Test-only escape hatch — see the `dangerous_accept_invalid_certs`
121    /// setter. Off by default. **Do not enable in production.**
122    dangerous_accept_invalid_certs: bool,
123}
124
125impl HttpOutboundAdapterBuilder {
126    pub fn id(mut self, v: impl Into<String>) -> Self {
127        self.id = Some(v.into());
128        self
129    }
130    /// Set the full target URL (must include scheme; either `http://` or `https://`).
131    pub fn url(mut self, v: impl Into<String>) -> Self {
132        self.url = Some(v.into());
133        self
134    }
135    pub fn method(mut self, m: Method) -> Self {
136        self.method = Some(m);
137        self
138    }
139    pub fn use_out_msg(mut self, flag: bool) -> Self {
140        self.use_out_msg = Some(flag);
141        self
142    }
143    /// Disable TLS certificate validation on the underlying `reqwest::Client`.
144    ///
145    /// **Intended for tests only.** Enabling this against a real endpoint
146    /// defeats TLS — an active network attacker can read or modify all
147    /// traffic. Production configuration loaded from YAML never sets this
148    /// flag; it has no corresponding spec field.
149    pub fn dangerous_accept_invalid_certs(mut self, flag: bool) -> Self {
150        self.dangerous_accept_invalid_certs = flag;
151        self
152    }
153    pub fn build(self) -> Result<HttpOutboundAdapter> {
154        let raw_url = self.url.ok_or_else(|| Error::other("url required"))?;
155        let parsed = url::Url::parse(&raw_url)
156            .map_err(|e| Error::other(format!("invalid url '{raw_url}': {e}")))?;
157        match parsed.scheme() {
158            "http" | "https" => {}
159            other => {
160                return Err(Error::other(format!(
161                    "unsupported url scheme '{other}' (expected http or https)"
162                )));
163            }
164        }
165        let id = self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
166        let method = self.method.unwrap_or(Method::POST);
167        let client = Client::builder()
168            .danger_accept_invalid_certs(self.dangerous_accept_invalid_certs)
169            .build()
170            .map_err(|e| Error::other(format!("failed to build http client: {e}")))?;
171        Ok(HttpOutboundAdapter {
172            id,
173            url: parsed,
174            method,
175            use_out_msg: self.use_out_msg.unwrap_or(true),
176            client,
177        })
178    }
179}
180
181#[async_trait]
182impl OutboundAdapter for HttpOutboundAdapter {
183    async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult> {
184        let msg_ref = if self.use_out_msg {
185            exchange.out_msg.as_ref().unwrap_or(&exchange.in_msg)
186        } else {
187            &exchange.in_msg
188        };
189        let bytes: Vec<u8> = match &msg_ref.payload {
190            Payload::Text(s) => s.clone().into_bytes(),
191            Payload::Bytes(b) => b.clone(),
192            Payload::Json(v) => match serde_json::to_vec(v) {
193                Ok(b) => b,
194                Err(e) => {
195                    tracing::warn!(
196                        url = %self.url.as_str(),
197                        error = %e,
198                        "HttpOutboundAdapter: failed to serialize JSON payload"
199                    );
200                    return Err(Error::serialization(e.to_string()));
201                }
202            },
203            Payload::Empty => Vec::new(),
204        };
205        let resp = match self
206            .client
207            .request(self.method.clone(), self.url.clone())
208            .header("Content-Type", "application/octet-stream")
209            .body(bytes)
210            .send()
211            .await
212        {
213            Ok(r) => r,
214            Err(e) => {
215                tracing::warn!(
216                    url = %self.url.as_str(),
217                    method = %self.method,
218                    error = %e,
219                    "HttpOutboundAdapter: transport error during dispatch"
220                );
221                return Err(Error::other(e.to_string()));
222            }
223        };
224        let status = resp.status();
225        let body_string = match resp.text().await {
226            Ok(b) => b,
227            Err(e) => {
228                tracing::warn!(
229                    url = %self.url.as_str(),
230                    method = %self.method,
231                    status = status.as_u16(),
232                    error = %e,
233                    "HttpOutboundAdapter: failed to read response body"
234                );
235                return Err(Error::other(e.to_string()));
236            }
237        };
238        if !status.is_success() {
239            // Char-aware truncation: byte slicing here would panic on a
240            // multi-byte UTF-8 boundary if the server returns non-ASCII.
241            let body_preview: String = body_string.chars().take(512).collect();
242            tracing::warn!(
243                url = %self.url.as_str(),
244                method = %self.method,
245                status = status.as_u16(),
246                body_preview = %body_preview,
247                "HttpOutboundAdapter: non-success HTTP status"
248            );
249        }
250        Ok(OutboundDispatchResult {
251            acknowledged: status.is_success(),
252            message: Some(format!("HTTP {}", status)),
253            status_code: Some(status.as_u16()),
254            body: Some(body_string),
255        })
256    }
257}