allora_http/http_outbound_adapter.rs
1//! HTTP Outbound Adapter: dispatches an `Exchange` to an external HTTP(S) endpoint.
2//!
3//! # Overview
//! `HttpOutboundAdapter` implements [`OutboundAdapter`]: it takes the (optionally transformed)
//! message from an `Exchange` and sends it to a configured remote endpoint (POST by default).
6//! A builder is provided for ergonomic configuration.
7//!
8//! As of 0.0.9 the adapter is configured by a single `url` (string parsed into
9//! [`url::Url`]) rather than the previous `host` + `port` + `base-path` triple,
10//! and the underlying HTTP client is [`reqwest::Client`] — which transparently
11//! supports both `http://` and `https://` schemes via `rustls`.
12//!
13//! # Selection of Message
//! By default the adapter prefers `exchange.out_msg` (if present), falling back to `exchange.in_msg`.
//! Calling `use_out_msg(false)` disables this preference so the inbound message is always sent.
16//!
17//! # Builder Fields
18//! * id (optional) – stable identifier; auto-generated UUID if omitted.
19//! * url (required) – full target URL including scheme (e.g. `https://api.example.com/submit`).
20//! * method (optional) – HTTP method (default POST).
21//! * use_out_msg (optional) – whether to prioritize `out_msg` (default true).
22//!
23//! # Error Semantics
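//! Network / protocol errors map to `Error::other`; a `Payload::Json` body that cannot be
//! serialized maps to `Error::serialization`. A non-success HTTP status is not an error: it
//! still returns `OutboundDispatchResult` (acknowledged = status.is_success()).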
26//!
27//! # Observability (0.0.10+)
28//! Every failure path in [`HttpOutboundAdapter::dispatch`] emits a structured
29//! `tracing::warn!` event so operators can diagnose silent dispatch problems
30//! in logs (e.g. CloudWatch). The four failure classes are:
31//!
32//! 1. **Serialization** of a `Payload::Json` body fails.
33//! 2. **Transport/connection** error from `reqwest` (DNS, TCP, TLS, …).
34//! 3. **Body read** failure after a response was received.
35//! 4. **Non-success HTTP status** (any non-2xx); the first 512 chars of the
36//! response body are included as `body_preview`.
37//!
38//! Successful dispatches are intentionally silent — the happy path runs at
39//! high frequency and `info!` here would flood log sinks. The dispatch
40//! contract (return type and channel-propagation semantics) is unchanged;
41//! the new log calls are purely additive.
42//!
43//! # Example
44//! ```no_run
45//! use allora_core::{adapter::OutboundAdapter, Exchange, Message};
46//! use allora_http::HttpOutboundAdapter;
47//! # async fn demo() {
48//! let adapter = HttpOutboundAdapter::builder()
49//! .id("http-public")
50//! .url("https://api.example.com/submit")
51//! .build().unwrap();
//! let exchange = Exchange::new(Message::from_text("ping"));
53//! let _res = adapter.dispatch(&exchange).await.unwrap();
54//! # }
55//! ```
56//!
57//! # Future Extensions
58//! * Header propagation (copy selected message headers to HTTP request).
59//! * Custom serialization strategies (content-type aware).
60//! * Retry / backoff policies & circuit breaking.
61//! * Metrics emission (latency, status codes).
62
63use allora_core::adapter::{BaseAdapter, OutboundAdapter, OutboundDispatchResult};
64use allora_core::{
65 error::{Error, Result},
66 Exchange, Payload,
67};
68use async_trait::async_trait;
69use reqwest::{Client, Method};
70use std::fmt::Debug;
71
72#[derive(Clone)]
73pub struct HttpOutboundAdapter {
74 id: String,
75 /// Parsed, validated target URL. Cached at construction; the per-dispatch
76 /// hot path just clones this `Url` and hands it to `reqwest`.
77 url: url::Url,
78 method: Method,
79 use_out_msg: bool,
80 client: Client,
81}
82
83impl Debug for HttpOutboundAdapter {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("HttpOutboundAdapter")
86 .field("id", &self.id)
87 .field("url", &self.url.as_str())
88 .field("method", &self.method.as_str())
89 .field("use_out_msg", &self.use_out_msg)
90 .finish()
91 }
92}
93
94impl HttpOutboundAdapter {
95 pub fn builder() -> HttpOutboundAdapterBuilder {
96 HttpOutboundAdapterBuilder::default()
97 }
98 /// Configured target URL (parsed). Useful for tests and structured logging.
99 pub fn url(&self) -> &url::Url {
100 &self.url
101 }
102 /// Configured HTTP method (default POST).
103 pub fn method(&self) -> &Method {
104 &self.method
105 }
106}
107
108impl BaseAdapter for HttpOutboundAdapter {
109 fn id(&self) -> &str {
110 &self.id
111 }
112}
113
114#[derive(Default)]
115pub struct HttpOutboundAdapterBuilder {
116 id: Option<String>,
117 url: Option<String>,
118 method: Option<Method>,
119 use_out_msg: Option<bool>,
120 /// Test-only escape hatch — see the `dangerous_accept_invalid_certs`
121 /// setter. Off by default. **Do not enable in production.**
122 dangerous_accept_invalid_certs: bool,
123}
124
125impl HttpOutboundAdapterBuilder {
126 pub fn id(mut self, v: impl Into<String>) -> Self {
127 self.id = Some(v.into());
128 self
129 }
130 /// Set the full target URL (must include scheme; either `http://` or `https://`).
131 pub fn url(mut self, v: impl Into<String>) -> Self {
132 self.url = Some(v.into());
133 self
134 }
135 pub fn method(mut self, m: Method) -> Self {
136 self.method = Some(m);
137 self
138 }
139 pub fn use_out_msg(mut self, flag: bool) -> Self {
140 self.use_out_msg = Some(flag);
141 self
142 }
143 /// Disable TLS certificate validation on the underlying `reqwest::Client`.
144 ///
145 /// **Intended for tests only.** Enabling this against a real endpoint
146 /// defeats TLS — an active network attacker can read or modify all
147 /// traffic. Production configuration loaded from YAML never sets this
148 /// flag; it has no corresponding spec field.
149 pub fn dangerous_accept_invalid_certs(mut self, flag: bool) -> Self {
150 self.dangerous_accept_invalid_certs = flag;
151 self
152 }
153 pub fn build(self) -> Result<HttpOutboundAdapter> {
154 let raw_url = self.url.ok_or_else(|| Error::other("url required"))?;
155 let parsed = url::Url::parse(&raw_url)
156 .map_err(|e| Error::other(format!("invalid url '{raw_url}': {e}")))?;
157 match parsed.scheme() {
158 "http" | "https" => {}
159 other => {
160 return Err(Error::other(format!(
161 "unsupported url scheme '{other}' (expected http or https)"
162 )));
163 }
164 }
165 let id = self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
166 let method = self.method.unwrap_or(Method::POST);
167 let client = Client::builder()
168 .danger_accept_invalid_certs(self.dangerous_accept_invalid_certs)
169 .build()
170 .map_err(|e| Error::other(format!("failed to build http client: {e}")))?;
171 Ok(HttpOutboundAdapter {
172 id,
173 url: parsed,
174 method,
175 use_out_msg: self.use_out_msg.unwrap_or(true),
176 client,
177 })
178 }
179}
180
181#[async_trait]
182impl OutboundAdapter for HttpOutboundAdapter {
183 async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult> {
184 let msg_ref = if self.use_out_msg {
185 exchange.out_msg.as_ref().unwrap_or(&exchange.in_msg)
186 } else {
187 &exchange.in_msg
188 };
189 let bytes: Vec<u8> = match &msg_ref.payload {
190 Payload::Text(s) => s.clone().into_bytes(),
191 Payload::Bytes(b) => b.clone(),
192 Payload::Json(v) => match serde_json::to_vec(v) {
193 Ok(b) => b,
194 Err(e) => {
195 tracing::warn!(
196 url = %self.url.as_str(),
197 error = %e,
198 "HttpOutboundAdapter: failed to serialize JSON payload"
199 );
200 return Err(Error::serialization(e.to_string()));
201 }
202 },
203 Payload::Empty => Vec::new(),
204 };
205 let resp = match self
206 .client
207 .request(self.method.clone(), self.url.clone())
208 .header("Content-Type", "application/octet-stream")
209 .body(bytes)
210 .send()
211 .await
212 {
213 Ok(r) => r,
214 Err(e) => {
215 tracing::warn!(
216 url = %self.url.as_str(),
217 method = %self.method,
218 error = %e,
219 "HttpOutboundAdapter: transport error during dispatch"
220 );
221 return Err(Error::other(e.to_string()));
222 }
223 };
224 let status = resp.status();
225 let body_string = match resp.text().await {
226 Ok(b) => b,
227 Err(e) => {
228 tracing::warn!(
229 url = %self.url.as_str(),
230 method = %self.method,
231 status = status.as_u16(),
232 error = %e,
233 "HttpOutboundAdapter: failed to read response body"
234 );
235 return Err(Error::other(e.to_string()));
236 }
237 };
238 if !status.is_success() {
239 // Char-aware truncation: byte slicing here would panic on a
240 // multi-byte UTF-8 boundary if the server returns non-ASCII.
241 let body_preview: String = body_string.chars().take(512).collect();
242 tracing::warn!(
243 url = %self.url.as_str(),
244 method = %self.method,
245 status = status.as_u16(),
246 body_preview = %body_preview,
247 "HttpOutboundAdapter: non-success HTTP status"
248 );
249 }
250 Ok(OutboundDispatchResult {
251 acknowledged: status.is_success(),
252 message: Some(format!("HTTP {}", status)),
253 status_code: Some(status.as_u16()),
254 body: Some(body_string),
255 })
256 }
257}