use allora_core::adapter::{BaseAdapter, OutboundAdapter, OutboundDispatchResult};
use allora_core::{
error::{Error, Result},
Exchange, Payload,
};
use async_trait::async_trait;
use reqwest::{Client, Method};
use std::fmt::Debug;
/// Outbound adapter that sends an exchange's message payload to a fixed URL over HTTP.
///
/// Instances are created through [`HttpOutboundAdapter::builder`].
#[derive(Clone)]
pub struct HttpOutboundAdapter {
/// Adapter identifier returned by `BaseAdapter::id`; the builder generates a
/// UUID v4 string when none is supplied.
id: String,
/// Parsed target URL; the builder only accepts the `http` and `https` schemes.
url: url::Url,
/// HTTP method used for every dispatch; the builder defaults it to `POST`.
method: Method,
/// When `true`, dispatch sends `exchange.out_msg` if it is present and falls
/// back to `exchange.in_msg` otherwise; when `false`, `in_msg` is always sent.
use_out_msg: bool,
/// `reqwest` client constructed by the builder and used to issue requests.
client: Client,
}
/// Hand-written `Debug` that prints the adapter's configuration fields.
///
/// The `client` field is not part of the output.
impl Debug for HttpOutboundAdapter {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // URL and method are rendered through their string forms.
        let endpoint = self.url.as_str();
        let verb = self.method.as_str();

        let mut repr = f.debug_struct("HttpOutboundAdapter");
        repr.field("id", &self.id);
        repr.field("url", &endpoint);
        repr.field("method", &verb);
        repr.field("use_out_msg", &self.use_out_msg);
        repr.finish()
    }
}
impl HttpOutboundAdapter {
    /// Returns a fresh [`HttpOutboundAdapterBuilder`] in its default state.
    pub fn builder() -> HttpOutboundAdapterBuilder {
        Default::default()
    }

    /// Returns the parsed URL that requests are sent to.
    pub fn url(&self) -> &url::Url {
        let Self { url, .. } = self;
        url
    }

    /// Returns the HTTP method used for requests.
    pub fn method(&self) -> &Method {
        let Self { method, .. } = self;
        method
    }
}
impl BaseAdapter for HttpOutboundAdapter {
    /// Returns this adapter's identifier as a string slice.
    fn id(&self) -> &str {
        self.id.as_str()
    }
}
/// Builder for [`HttpOutboundAdapter`].
///
/// Every option starts unset (`None` / `false`) via `Default`; only `url` is
/// mandatory when `build` is called.
#[derive(Default)]
pub struct HttpOutboundAdapterBuilder {
/// Optional adapter id; `build` generates a UUID v4 string when this is `None`.
id: Option<String>,
/// Raw, not-yet-parsed target URL; `build` fails when this is `None`.
url: Option<String>,
/// Optional HTTP method; `build` falls back to `POST`.
method: Option<Method>,
/// Optional message-selection flag; `build` falls back to `true`.
use_out_msg: Option<bool>,
/// Forwarded to `reqwest`'s `danger_accept_invalid_certs` when the client is built.
dangerous_accept_invalid_certs: bool,
}
impl HttpOutboundAdapterBuilder {
pub fn id(mut self, v: impl Into<String>) -> Self {
self.id = Some(v.into());
self
}
pub fn url(mut self, v: impl Into<String>) -> Self {
self.url = Some(v.into());
self
}
pub fn method(mut self, m: Method) -> Self {
self.method = Some(m);
self
}
pub fn use_out_msg(mut self, flag: bool) -> Self {
self.use_out_msg = Some(flag);
self
}
pub fn dangerous_accept_invalid_certs(mut self, flag: bool) -> Self {
self.dangerous_accept_invalid_certs = flag;
self
}
pub fn build(self) -> Result<HttpOutboundAdapter> {
let raw_url = self.url.ok_or_else(|| Error::other("url required"))?;
let parsed = url::Url::parse(&raw_url)
.map_err(|e| Error::other(format!("invalid url '{raw_url}': {e}")))?;
match parsed.scheme() {
"http" | "https" => {}
other => {
return Err(Error::other(format!(
"unsupported url scheme '{other}' (expected http or https)"
)));
}
}
let id = self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let method = self.method.unwrap_or(Method::POST);
let client = Client::builder()
.danger_accept_invalid_certs(self.dangerous_accept_invalid_certs)
.build()
.map_err(|e| Error::other(format!("failed to build http client: {e}")))?;
Ok(HttpOutboundAdapter {
id,
url: parsed,
method,
use_out_msg: self.use_out_msg.unwrap_or(true),
client,
})
}
}
#[async_trait]
impl OutboundAdapter for HttpOutboundAdapter {
async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult> {
let msg_ref = if self.use_out_msg {
exchange.out_msg.as_ref().unwrap_or(&exchange.in_msg)
} else {
&exchange.in_msg
};
let bytes: Vec<u8> = match &msg_ref.payload {
Payload::Text(s) => s.clone().into_bytes(),
Payload::Bytes(b) => b.clone(),
Payload::Json(v) => match serde_json::to_vec(v) {
Ok(b) => b,
Err(e) => {
tracing::warn!(
url = %self.url.as_str(),
error = %e,
"HttpOutboundAdapter: failed to serialize JSON payload"
);
return Err(Error::serialization(e.to_string()));
}
},
Payload::Empty => Vec::new(),
};
let resp = match self
.client
.request(self.method.clone(), self.url.clone())
.header("Content-Type", "application/octet-stream")
.body(bytes)
.send()
.await
{
Ok(r) => r,
Err(e) => {
tracing::warn!(
url = %self.url.as_str(),
method = %self.method,
error = %e,
"HttpOutboundAdapter: transport error during dispatch"
);
return Err(Error::other(e.to_string()));
}
};
let status = resp.status();
let body_string = match resp.text().await {
Ok(b) => b,
Err(e) => {
tracing::warn!(
url = %self.url.as_str(),
method = %self.method,
status = status.as_u16(),
error = %e,
"HttpOutboundAdapter: failed to read response body"
);
return Err(Error::other(e.to_string()));
}
};
if !status.is_success() {
let body_preview: String = body_string.chars().take(512).collect();
tracing::warn!(
url = %self.url.as_str(),
method = %self.method,
status = status.as_u16(),
body_preview = %body_preview,
"HttpOutboundAdapter: non-success HTTP status"
);
}
Ok(OutboundDispatchResult {
acknowledged: status.is_success(),
message: Some(format!("HTTP {}", status)),
status_code: Some(status.as_u16()),
body: Some(body_string),
})
}
}