allora_http/
http_outbound_adapter.rs1use allora_core::adapter::{BaseAdapter, OutboundAdapter, OutboundDispatchResult};
48use allora_core::{
49 error::{Error, Result},
50 Exchange, Payload,
51};
52use async_trait::async_trait;
53use reqwest::{Client, Method};
54use std::fmt::Debug;
55
56#[derive(Clone)]
57pub struct HttpOutboundAdapter {
58 id: String,
59 url: url::Url,
62 method: Method,
63 use_out_msg: bool,
64 client: Client,
65}
66
67impl Debug for HttpOutboundAdapter {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 f.debug_struct("HttpOutboundAdapter")
70 .field("id", &self.id)
71 .field("url", &self.url.as_str())
72 .field("method", &self.method.as_str())
73 .field("use_out_msg", &self.use_out_msg)
74 .finish()
75 }
76}
77
78impl HttpOutboundAdapter {
79 pub fn builder() -> HttpOutboundAdapterBuilder {
80 HttpOutboundAdapterBuilder::default()
81 }
82 pub fn url(&self) -> &url::Url {
84 &self.url
85 }
86 pub fn method(&self) -> &Method {
88 &self.method
89 }
90}
91
92impl BaseAdapter for HttpOutboundAdapter {
93 fn id(&self) -> &str {
94 &self.id
95 }
96}
97
98#[derive(Default)]
99pub struct HttpOutboundAdapterBuilder {
100 id: Option<String>,
101 url: Option<String>,
102 method: Option<Method>,
103 use_out_msg: Option<bool>,
104 dangerous_accept_invalid_certs: bool,
107}
108
109impl HttpOutboundAdapterBuilder {
110 pub fn id(mut self, v: impl Into<String>) -> Self {
111 self.id = Some(v.into());
112 self
113 }
114 pub fn url(mut self, v: impl Into<String>) -> Self {
116 self.url = Some(v.into());
117 self
118 }
119 pub fn method(mut self, m: Method) -> Self {
120 self.method = Some(m);
121 self
122 }
123 pub fn use_out_msg(mut self, flag: bool) -> Self {
124 self.use_out_msg = Some(flag);
125 self
126 }
127 pub fn dangerous_accept_invalid_certs(mut self, flag: bool) -> Self {
134 self.dangerous_accept_invalid_certs = flag;
135 self
136 }
137 pub fn build(self) -> Result<HttpOutboundAdapter> {
138 let raw_url = self.url.ok_or_else(|| Error::other("url required"))?;
139 let parsed = url::Url::parse(&raw_url)
140 .map_err(|e| Error::other(format!("invalid url '{raw_url}': {e}")))?;
141 match parsed.scheme() {
142 "http" | "https" => {}
143 other => {
144 return Err(Error::other(format!(
145 "unsupported url scheme '{other}' (expected http or https)"
146 )));
147 }
148 }
149 let id = self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
150 let method = self.method.unwrap_or(Method::POST);
151 let client = Client::builder()
152 .danger_accept_invalid_certs(self.dangerous_accept_invalid_certs)
153 .build()
154 .map_err(|e| Error::other(format!("failed to build http client: {e}")))?;
155 Ok(HttpOutboundAdapter {
156 id,
157 url: parsed,
158 method,
159 use_out_msg: self.use_out_msg.unwrap_or(true),
160 client,
161 })
162 }
163}
164
165#[async_trait]
166impl OutboundAdapter for HttpOutboundAdapter {
167 async fn dispatch(&self, exchange: &Exchange) -> Result<OutboundDispatchResult> {
168 let msg_ref = if self.use_out_msg {
169 exchange.out_msg.as_ref().unwrap_or(&exchange.in_msg)
170 } else {
171 &exchange.in_msg
172 };
173 let bytes: Vec<u8> = match &msg_ref.payload {
174 Payload::Text(s) => s.clone().into_bytes(),
175 Payload::Bytes(b) => b.clone(),
176 Payload::Json(v) => {
177 serde_json::to_vec(v).map_err(|e| Error::serialization(e.to_string()))?
178 }
179 Payload::Empty => Vec::new(),
180 };
181 let resp = self
182 .client
183 .request(self.method.clone(), self.url.clone())
184 .header("Content-Type", "application/octet-stream")
185 .body(bytes)
186 .send()
187 .await
188 .map_err(|e| Error::other(e.to_string()))?;
189 let status = resp.status();
190 let body_string = resp.text().await.map_err(|e| Error::other(e.to_string()))?;
191 Ok(OutboundDispatchResult {
192 acknowledged: status.is_success(),
193 message: Some(format!("HTTP {}", status)),
194 status_code: Some(status.as_u16()),
195 body: Some(body_string),
196 })
197 }
198}