ordinary_integration/
lib.rs1#![doc = include_str!("../README.md")]
2#![warn(clippy::all, clippy::pedantic)]
3#![allow(clippy::missing_errors_doc)]
4
5use anyhow::anyhow;
10use bytes::Bytes;
11use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
12use ordinary_storage::Storage;
13use ordinary_types::{
14 Kind, flexbuffer_reader_to_json, flexbuffer_reader_to_path_component,
15 flexbuffer_reader_to_string, json_to_flexbuffer_vec,
16};
17use std::{collections::BTreeMap, sync::Arc};
18use tracing::instrument;
19use ureq::Body;
/// A configured integration that can be invoked with a flexbuffer payload.
///
/// Values built through `Integration::new` hold a configuration in which the
/// `{{SECRET_NAME}}` placeholders of the resolvable secrets have already been
/// replaced by the secret values.
#[derive(Clone)]
pub struct Integration {
    /// The configuration used by `invoke` (endpoint, protocol, send/recv kinds).
    pub config: IntegrationConfig,
}
26
27impl Integration {
28 #[instrument(skip(config, storage, secrets), fields(i, nm), err)]
29 pub fn new(
30 config: IntegrationConfig,
31 storage: Arc<Storage>,
32 secrets: &BTreeMap<String, &Secret>,
33 ) -> anyhow::Result<Integration> {
34 tracing::Span::current().record("i", config.idx);
35 tracing::Span::current().record("nm", tracing::field::display(&config.name));
36
37 let mut usable_config = config.clone();
38
39 if let Some(secret_names) = &config.secrets {
41 for secret_name in secret_names {
42 if let Some(secret) = secrets.get(secret_name) {
43 match secret.visibility {
44 ordinary_config::SecretVisibility::Integrations => {
45 let val = match &secret.source {
46 ordinary_config::SecretSource::Env => std::env::var(&secret.name)?,
47 ordinary_config::SecretSource::Stored => std::str::from_utf8(
48 storage.secrets.get(&secret.name)?.as_ref(),
49 )?
50 .to_string(),
51 };
52
53 let mut str_config = serde_json::to_string(&config)?;
55 str_config =
56 str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
57 usable_config = serde_json::from_str(&str_config)?;
58 }
59 }
60 }
61 }
62 }
63
64 Ok(Integration {
65 config: usable_config,
66 })
67 }
68
69 #[instrument(skip(self, payload), fields(i, nm), err)]
70 pub fn invoke(&self, payload: &[u8]) -> anyhow::Result<Bytes> {
71 tracing::Span::current().record("i", self.config.idx);
72 tracing::Span::current().record("nm", tracing::field::display(&self.config.name));
73
74 match &self.config.protocol {
75 IntegrationProtocol::Http {
76 method,
77 headers,
78 send_encoding,
79 recv_encoding,
80 } => {
81 let root = flexbuffers::Reader::get_root(payload)?;
82
83 let body: Bytes = if method == "GET" {
84 Bytes::new()
85 } else {
86 match send_encoding {
87 ordinary_config::IntegrationProtocolHttpEncoding::None => {
88 Bytes::copy_from_slice(payload)
89 }
90 ordinary_config::IntegrationProtocolHttpEncoding::Json => {
91 flexbuffer_reader_to_json(&self.config.send, &root.as_vector().idx(0))?
92 .to_string()
93 .into()
94 }
95 ordinary_config::IntegrationProtocolHttpEncoding::Text => {
96 flexbuffer_reader_to_string(
97 &self.config.send,
98 &root.as_vector().idx(0),
99 )?
100 .into()
101 }
102 }
103 };
104
105 let mut endpoint = self.config.endpoint.clone();
106
107 if endpoint.contains("{send}") {
108 endpoint = endpoint.replace(
109 "{send}",
110 &flexbuffer_reader_to_path_component(
111 &self.config.send,
112 &root.as_vector().idx(0),
113 )?,
114 );
115 }
116
117 if endpoint.contains("{send.")
118 && let Kind::Object { name: _, fields } = &self.config.send
119 {
120 let send_vec = root.as_vector().idx(0).as_vector();
121
122 for field in fields {
123 endpoint = endpoint.replace(
124 &format!("{{send.{}}}", field.name),
125 &flexbuffer_reader_to_path_component(
126 &field.kind,
127 &send_vec.idx(field.idx as usize),
128 )?,
129 );
130 }
131 }
132
133 let mut res = Self::http(&endpoint, method, headers, body)?;
134
135 let mut builder =
136 flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
137 let mut builder_vec = builder.start_vector();
138
139 match recv_encoding {
140 ordinary_config::IntegrationProtocolHttpEncoding::None => {
141 builder_vec.push(flexbuffers::Blob(res.body_mut().read_to_vec()?.as_ref()));
142 }
143 ordinary_config::IntegrationProtocolHttpEncoding::Json => {
144 json_to_flexbuffer_vec(
145 &self.config.recv,
146 &res.body_mut().read_json::<serde_json::Value>()?,
147 &mut builder_vec,
148 )?;
149 }
150 ordinary_config::IntegrationProtocolHttpEncoding::Text => {
151 builder_vec.push(res.body_mut().read_to_string()?.as_str());
152 }
153 }
154
155 builder_vec.end_vector();
156
157 Ok(Bytes::copy_from_slice(builder.view()))
158 } }
165 }
166
167 #[instrument(skip(body, headers), err)]
168 fn http(
169 endpoint: &String,
170 method: &String,
171 headers: &Vec<(String, String)>,
172 body: Bytes,
173 ) -> anyhow::Result<http::Response<Body>> {
174 match method.as_str() {
177 "GET" => {
178 let mut req = ureq::get(endpoint);
179
180 for (key, val) in headers {
181 req = req.header(key, val);
182 }
183
184 let res = req.call()?;
185
186 Ok(res)
187 }
188 "POST" => {
189 let mut req = ureq::post(endpoint);
190
191 for (key, val) in headers {
192 req = req.header(key, val);
193 }
194
195 let res = req.send(body.as_ref())?;
196
197 Ok(res)
198 }
199 "PUT" => {
200 let mut req = ureq::put(endpoint);
201
202 for (key, val) in headers {
203 req = req.header(key, val);
204 }
205
206 let res = req.send(body.as_ref())?;
207
208 Ok(res)
209 }
210 "PATCH" => {
211 let mut req = ureq::patch(endpoint);
212
213 for (key, val) in headers {
214 req = req.header(key, val);
215 }
216
217 let res = req.send(body.as_ref())?;
218
219 Ok(res)
220 }
221 "DELETE" => {
222 let mut req = ureq::delete(endpoint);
223
224 for (key, val) in headers {
225 req = req.header(key, val);
226 }
227
228 let res = req.call()?;
229
230 Ok(res)
231 }
232 _ => Err(anyhow!("invalid method")),
233 }
234 }
235}