1#![doc = include_str!("../README.md")]
2#![warn(clippy::all, clippy::pedantic)]
3#![allow(clippy::missing_errors_doc)]
4
5use anyhow::bail;
10use bytes::Bytes;
11use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
12use ordinary_storage::Storage;
13use ordinary_types::{
14 Kind, flexbuffer_reader_to_json, flexbuffer_reader_to_path_component,
15 flexbuffer_reader_to_string, json_to_flexbuffer_vec,
16};
17use ordinary_utils::headers::{log_request, log_response};
18use ordinary_utils::{SERVER, WrappedRedactedHashingAlg};
19use reqwest::redirect::Policy;
20use std::time::Instant;
21use std::{collections::BTreeMap, sync::Arc};
22use tokio::runtime::Handle;
23use tracing::{Instrument, Span, instrument};
24#[derive(Clone)]
27pub struct Integration {
28 client: reqwest::Client,
29 sender: crossbeam_channel::Sender<(Bytes, oneshot::Sender<Bytes>, Span)>,
30
31 pub config: IntegrationConfig,
32
33 log_headers: bool,
34 redacted_hash: Arc<Option<WrappedRedactedHashingAlg>>,
35}
36
37impl Integration {
38 #[instrument(skip_all, fields(i, nm), err)]
39 pub fn new(
40 config: IntegrationConfig,
41 storage: Arc<Storage>,
42 secrets: &BTreeMap<String, &Secret>,
43 log_headers: bool,
44 redacted_hash: Arc<Option<WrappedRedactedHashingAlg>>,
45 ) -> anyhow::Result<Integration> {
46 Span::current().record("i", config.idx);
47 Span::current().record("nm", tracing::field::display(&config.name));
48
49 let mut usable_config = config.clone();
50
51 if let Some(secret_names) = &config.secrets {
53 for secret_name in secret_names {
54 if let Some(secret) = secrets.get(secret_name) {
55 match secret.visibility {
56 ordinary_config::SecretVisibility::Integrations => {
57 let val = match &secret.source {
58 ordinary_config::SecretSource::Env => std::env::var(&secret.name)?,
59 ordinary_config::SecretSource::Stored => std::str::from_utf8(
60 storage.secrets.get(&secret.name)?.as_ref(),
61 )?
62 .to_string(),
63 };
64
65 let mut str_config = serde_json::to_string(&config)?;
67 str_config =
68 str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
69 usable_config = serde_json::from_str(&str_config)?;
70 }
71 }
72 }
73 }
74 }
75
76 let rt = Handle::try_current()?;
77
78 let client = reqwest::Client::builder()
79 .use_rustls_tls()
80 .brotli(true)
81 .zstd(true)
82 .deflate(true)
83 .gzip(true)
84 .redirect(Policy::none())
85 .user_agent(SERVER)
86 .build()?;
87
88 let (sender, receiver) = crossbeam_channel::bounded(128_000);
89
90 let integration = Integration {
91 client,
92 sender,
93
94 log_headers,
95 redacted_hash,
96
97 config: usable_config,
98 };
99
100 let integration_clone = Arc::new(integration.clone());
101
102 std::thread::spawn(move || {
103 while let Ok((payload, tx, span)) = receiver.recv() {
104 {
105 let integration = integration_clone.clone();
106
107 rt.spawn(async move {
108 async {
109 match integration.invoke(payload.as_ref()).await {
110 Ok(res) => {
111 if let Err(err) = tx.send(res) {
112 tracing::error!(%err);
113 }
114 }
115 Err(err) => {
116 tracing::error!(%err);
117 }
118 }
119 }
120 .instrument(span)
121 .await;
122 });
123 }
124 }
125 });
126
127 Ok(integration)
128 }
129
130 pub fn invoke_blocking(&self, payload: &[u8], span: Span) -> anyhow::Result<Bytes> {
131 let (tx, rx) = oneshot::channel();
132
133 self.sender
134 .send((Bytes::copy_from_slice(payload), tx, span))?;
135
136 let res = rx.recv()?;
137 Ok(res)
138 }
139
140 #[instrument(name = "int", skip_all, fields(i, nm), err)]
141 pub async fn invoke(&self, payload: &[u8]) -> anyhow::Result<Bytes> {
142 Span::current().record("i", self.config.idx);
143 Span::current().record("nm", tracing::field::display(&self.config.name));
144
145 match &self.config.protocol {
146 IntegrationProtocol::Http {
147 method,
148 headers,
149 send_encoding,
150 recv_encoding,
151 } => {
152 let root = flexbuffers::Reader::get_root(payload)?;
153
154 let body: Bytes = if method == "GET" {
155 Bytes::new()
156 } else {
157 match send_encoding {
158 ordinary_config::IntegrationProtocolHttpEncoding::None => {
159 Bytes::copy_from_slice(payload)
160 }
161 ordinary_config::IntegrationProtocolHttpEncoding::Json => {
162 flexbuffer_reader_to_json(&self.config.send, &root.as_vector().idx(0))?
163 .to_string()
164 .into()
165 }
166 ordinary_config::IntegrationProtocolHttpEncoding::Text => {
167 flexbuffer_reader_to_string(
168 &self.config.send,
169 &root.as_vector().idx(0),
170 )?
171 .into()
172 }
173 }
174 };
175
176 let mut endpoint = self.config.endpoint.clone();
177
178 if endpoint.contains("{send}") {
179 endpoint = endpoint.replace(
180 "{send}",
181 &flexbuffer_reader_to_path_component(
182 &self.config.send,
183 &root.as_vector().idx(0),
184 )?,
185 );
186 }
187
188 if endpoint.contains("{send.")
189 && let Kind::Object { name: _, fields } = &self.config.send
190 {
191 let send_vec = root.as_vector().idx(0).as_vector();
192
193 for field in fields {
194 endpoint = endpoint.replace(
195 &format!("{{send.{}}}", field.name),
196 &flexbuffer_reader_to_path_component(
197 &field.kind,
198 &send_vec.idx(field.idx as usize),
199 )?,
200 );
201 }
202 }
203
204 let res = self.http(&endpoint, method, headers, body).await?;
205
206 let mut builder =
207 flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
208 let mut builder_vec = builder.start_vector();
209
210 match recv_encoding {
211 ordinary_config::IntegrationProtocolHttpEncoding::None => {
212 builder_vec.push(flexbuffers::Blob(res.bytes().await?.as_ref()));
213 }
214 ordinary_config::IntegrationProtocolHttpEncoding::Json => {
215 json_to_flexbuffer_vec(
216 &self.config.recv,
217 &res.json().await?,
218 &mut builder_vec,
219 )?;
220 }
221 ordinary_config::IntegrationProtocolHttpEncoding::Text => {
222 builder_vec.push(res.text().await?.as_str());
223 }
224 }
225
226 builder_vec.end_vector();
227
228 Ok(Bytes::copy_from_slice(builder.view()))
229 } }
236 }
237
238 #[instrument(skip_all, fields(host, port, path, query), err)]
239 async fn http(
240 &self,
241 endpoint: &str,
242 method: &str,
243 headers: &Vec<(String, String)>,
244 body: Bytes,
245 ) -> anyhow::Result<reqwest::Response> {
246 let start = Instant::now();
247
248 let mut req = match method {
249 "GET" => self.client.get(endpoint),
250 "POST" => {
251 let req = self.client.post(endpoint);
252 req.body(body)
253 }
254 "PUT" => {
255 let req = self.client.put(endpoint);
256 req.body(body)
257 }
258 "PATCH" => {
259 let req = self.client.patch(endpoint);
260 req.body(body)
261 }
262 "DELETE" => self.client.delete(endpoint),
263 _ => bail!("invalid method"),
264 };
265
266 for (key, val) in headers {
267 req = req.header(key.clone(), val);
268 }
269
270 let req = req.build()?;
271 let url = req.url();
272
273 Span::current().record("host", url.host().map(tracing::field::display));
274 Span::current().record("port", url.port().map(tracing::field::display));
275 Span::current().record("path", tracing::field::display(url.path()));
276 Span::current().record("query", url.query().map(tracing::field::display));
277
278 log_request(
279 self.log_headers,
280 req.headers(),
281 &self.redacted_hash,
282 req.method(),
283 );
284
285 let res = self.client.execute(req).await?;
286
287 log_response(
288 res.status().as_u16(),
289 self.log_headers,
290 &self.redacted_hash,
291 start,
292 res.headers(),
293 res.version(),
294 );
295
296 Ok(res)
297 }
298}