use allora_core::adapter::{BaseAdapter, OutboundAdapter, OutboundDispatchResult};
use allora_core::{
error::{Error, Result},
Exchange, Payload,
};
use async_trait::async_trait;
use hyper::client::HttpConnector;
use hyper::{Body, Client, Method, Request};
use std::fmt::Debug;
#[derive(Clone)]
pub struct HttpOutboundAdapter {
id: String,
host: String,
port: u16,
base_path: String,
path: Option<String>,
method: Method,
use_out_msg: bool,
client: Client<HttpConnector>,
}
impl Debug for HttpOutboundAdapter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HttpOutboundAdapter")
.field("id", &self.id)
.field("host", &self.host)
.field("port", &self.port)
.field("base_path", &self.base_path)
.field("path", &self.path)
.field("method", &self.method.as_str())
.field("use_out_msg", &self.use_out_msg)
.finish()
}
}
impl HttpOutboundAdapter {
pub fn builder() -> HttpOutboundAdapterBuilder {
HttpOutboundAdapterBuilder::default()
}
fn full_url(&self) -> String {
let mut base = self.base_path.clone();
if !base.starts_with('/') {
base.insert(0, '/');
}
if base.len() > 1 && base.ends_with('/') {
base.pop();
}
let mut full = base;
if let Some(p) = &self.path {
if !p.is_empty() {
if !p.starts_with('/') {
full.push('/');
}
full.push_str(p);
}
}
format!("http://{}:{}{}", self.host, self.port, full)
}
}
impl BaseAdapter for HttpOutboundAdapter {
fn id(&self) -> &str {
&self.id
}
}
#[derive(Default)]
pub struct HttpOutboundAdapterBuilder {
id: Option<String>,
host: Option<String>,
port: Option<u16>,
base_path: Option<String>,
path: Option<String>,
method: Option<Method>,
use_out_msg: Option<bool>,
}
impl HttpOutboundAdapterBuilder {
pub fn id(mut self, v: impl Into<String>) -> Self {
self.id = Some(v.into());
self
}
pub fn host(mut self, v: impl Into<String>) -> Self {
self.host = Some(v.into());
self
}
pub fn port(mut self, v: u16) -> Self {
self.port = Some(v);
self
}
pub fn base_path(mut self, v: impl Into<String>) -> Self {
self.base_path = Some(v.into());
self
}
pub fn path(mut self, v: impl Into<String>) -> Self {
self.path = Some(v.into());
self
}
pub fn method(mut self, m: Method) -> Self {
self.method = Some(m);
self
}
pub fn use_out_msg(mut self, flag: bool) -> Self {
self.use_out_msg = Some(flag);
self
}
pub fn build(self) -> Result<HttpOutboundAdapter> {
let host = self.host.ok_or_else(|| Error::other("host required"))?;
let port = self.port.ok_or_else(|| Error::other("port required"))?;
let id = self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let base_path = self.base_path.unwrap_or_else(|| "/".to_string());
let method = self.method.unwrap_or(Method::POST);
Ok(HttpOutboundAdapter {
id,
host,
port,
base_path,
path: self.path,
method,
use_out_msg: self.use_out_msg.unwrap_or(true),
client: Client::new(),
})
}
}
#[async_trait]
impl OutboundAdapter for HttpOutboundAdapter {
async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult> {
let msg_ref = if self.use_out_msg {
exchange.out_msg.as_ref().unwrap_or(&exchange.in_msg)
} else {
&exchange.in_msg
};
let bytes: Vec<u8> = match &msg_ref.payload {
Payload::Text(s) => s.clone().into_bytes(),
Payload::Bytes(b) => b.clone(),
Payload::Json(v) => {
serde_json::to_vec(v).map_err(|e| Error::serialization(e.to_string()))?
}
Payload::Empty => Vec::new(),
};
let req = Request::builder()
.method(self.method.clone())
.uri(self.full_url())
.header("Content-Type", "application/octet-stream")
.body(Body::from(bytes))
.map_err(|e| Error::other(e.to_string()))?;
let resp = self
.client
.request(req)
.await
.map_err(|e| Error::other(e.to_string()))?;
let status = resp.status();
let body_bytes = hyper::body::to_bytes(resp.into_body())
.await
.map_err(|e| Error::other(e.to_string()))?;
let body_string = String::from_utf8_lossy(&body_bytes).to_string();
Ok(OutboundDispatchResult {
acknowledged: status.is_success(),
message: Some(format!("HTTP {}", status)),
status_code: Some(status.as_u16()),
body: Some(body_string),
})
}
}