allora_http/
http_outbound_adapter.rs1use allora_core::adapter::{BaseAdapter, OutboundAdapter, OutboundDispatchResult};
49use allora_core::{
50 error::{Error, Result},
51 Exchange, Payload,
52};
53use async_trait::async_trait;
54use hyper::client::HttpConnector;
55use hyper::{Body, Client, Method, Request};
56use std::fmt::Debug;
57
58#[derive(Clone)]
59pub struct HttpOutboundAdapter {
60 id: String,
61 host: String,
62 port: u16,
63 base_path: String,
64 path: Option<String>,
65 method: Method,
66 use_out_msg: bool,
67 client: Client<HttpConnector>,
68}
69
70impl Debug for HttpOutboundAdapter {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("HttpOutboundAdapter")
73 .field("id", &self.id)
74 .field("host", &self.host)
75 .field("port", &self.port)
76 .field("base_path", &self.base_path)
77 .field("path", &self.path)
78 .field("method", &self.method.as_str())
79 .field("use_out_msg", &self.use_out_msg)
80 .finish()
81 }
82}
83
84impl HttpOutboundAdapter {
85 pub fn builder() -> HttpOutboundAdapterBuilder {
86 HttpOutboundAdapterBuilder::default()
87 }
88 fn full_url(&self) -> String {
89 let mut base = self.base_path.clone();
90 if !base.starts_with('/') {
91 base.insert(0, '/');
92 }
93 if base.len() > 1 && base.ends_with('/') {
94 base.pop();
95 }
96 let mut full = base;
97 if let Some(p) = &self.path {
98 if !p.is_empty() {
99 if !p.starts_with('/') {
100 full.push('/');
101 }
102 full.push_str(p);
103 }
104 }
105 format!("http://{}:{}{}", self.host, self.port, full)
106 }
107}
108
109impl BaseAdapter for HttpOutboundAdapter {
110 fn id(&self) -> &str {
111 &self.id
112 }
113}
114
115#[derive(Default)]
116pub struct HttpOutboundAdapterBuilder {
117 id: Option<String>,
118 host: Option<String>,
119 port: Option<u16>,
120 base_path: Option<String>,
121 path: Option<String>,
122 method: Option<Method>,
123 use_out_msg: Option<bool>,
124}
125
126impl HttpOutboundAdapterBuilder {
127 pub fn id(mut self, v: impl Into<String>) -> Self {
128 self.id = Some(v.into());
129 self
130 }
131 pub fn host(mut self, v: impl Into<String>) -> Self {
132 self.host = Some(v.into());
133 self
134 }
135 pub fn port(mut self, v: u16) -> Self {
136 self.port = Some(v);
137 self
138 }
139 pub fn base_path(mut self, v: impl Into<String>) -> Self {
140 self.base_path = Some(v.into());
141 self
142 }
143 pub fn path(mut self, v: impl Into<String>) -> Self {
144 self.path = Some(v.into());
145 self
146 }
147 pub fn method(mut self, m: Method) -> Self {
148 self.method = Some(m);
149 self
150 }
151 pub fn use_out_msg(mut self, flag: bool) -> Self {
152 self.use_out_msg = Some(flag);
153 self
154 }
155 pub fn build(self) -> Result<HttpOutboundAdapter> {
156 let host = self.host.ok_or_else(|| Error::other("host required"))?;
157 let port = self.port.ok_or_else(|| Error::other("port required"))?;
158 let id = self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
159 let base_path = self.base_path.unwrap_or_else(|| "/".to_string());
160 let method = self.method.unwrap_or(Method::POST);
161 Ok(HttpOutboundAdapter {
162 id,
163 host,
164 port,
165 base_path,
166 path: self.path,
167 method,
168 use_out_msg: self.use_out_msg.unwrap_or(true),
169 client: Client::new(),
170 })
171 }
172}
173
174#[async_trait]
175impl OutboundAdapter for HttpOutboundAdapter {
176 async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult> {
177 let msg_ref = if self.use_out_msg {
178 exchange.out_msg.as_ref().unwrap_or(&exchange.in_msg)
179 } else {
180 &exchange.in_msg
181 };
182 let bytes: Vec<u8> = match &msg_ref.payload {
183 Payload::Text(s) => s.clone().into_bytes(),
184 Payload::Bytes(b) => b.clone(),
185 Payload::Json(v) => {
186 serde_json::to_vec(v).map_err(|e| Error::serialization(e.to_string()))?
187 }
188 Payload::Empty => Vec::new(),
189 };
190 let req = Request::builder()
191 .method(self.method.clone())
192 .uri(self.full_url())
193 .header("Content-Type", "application/octet-stream")
194 .body(Body::from(bytes))
195 .map_err(|e| Error::other(e.to_string()))?;
196 let resp = self
197 .client
198 .request(req)
199 .await
200 .map_err(|e| Error::other(e.to_string()))?;
201 let status = resp.status();
202 let body_bytes = hyper::body::to_bytes(resp.into_body())
203 .await
204 .map_err(|e| Error::other(e.to_string()))?;
205 let body_string = String::from_utf8_lossy(&body_bytes).to_string();
206 Ok(OutboundDispatchResult {
207 acknowledged: status.is_success(),
208 message: Some(format!("HTTP {}", status)),
209 status_code: Some(status.as_u16()),
210 body: Some(body_string),
211 })
212 }
213}