ordinary_integration/
lib.rs1#![doc = include_str!("../README.md")]
2#![warn(clippy::all, clippy::pedantic)]
3#![allow(clippy::missing_errors_doc)]
4
5use bytes::Bytes;
10use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
11use ordinary_storage::Storage;
12use ordinary_types::{
13 flexbuffer_reader_to_json, flexbuffer_reader_to_string, json_to_flexbuffer_vec,
14};
15use reqwest::blocking::Response;
16use std::{collections::BTreeMap, error::Error, sync::Arc};
17use tracing::instrument;
18
19#[derive(Clone)]
22pub struct Integration {
23 pub config: IntegrationConfig,
24}
25
26impl Integration {
27 #[instrument(skip(config, storage, secrets), fields(i, nm), err)]
28 pub fn new(
29 config: IntegrationConfig,
30 storage: Arc<Storage>,
31 secrets: &BTreeMap<String, &Secret>,
32 ) -> Result<Integration, Box<dyn Error>> {
33 tracing::Span::current().record("i", config.idx);
34 tracing::Span::current().record("nm", tracing::field::display(&config.name));
35
36 let mut usable_config = config.clone();
37
38 if let Some(secret_names) = &config.secrets {
39 for secret_name in secret_names {
40 if let Some(secret) = secrets.get(secret_name) {
41 match secret.visibility {
42 ordinary_config::SecretVisibility::Integrations => {
43 let val = match &secret.source {
44 ordinary_config::SecretSource::Env(name) => std::env::var(name)?,
45 ordinary_config::SecretSource::Stored(name) => {
46 storage.secrets.retrieve_secret(name)?
47 }
48 };
49
50 let mut str_config = serde_json::to_string(&config)?;
52 str_config =
53 str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
54 usable_config = serde_json::from_str(&str_config)?;
55 }
56 }
57 }
58 }
59 }
60
61 Ok(Integration {
62 config: usable_config,
63 })
64 }
65
66 #[instrument(skip(self, payload), fields(i, nm), err)]
67 pub fn invoke(&self, payload: &[u8]) -> Result<Bytes, Box<dyn Error>> {
68 tracing::Span::current().record("i", self.config.idx);
69 tracing::Span::current().record("nm", tracing::field::display(&self.config.name));
70
71 match &self.config.protocol {
72 IntegrationProtocol::Http {
73 method,
74 headers,
75 send_encoding,
76 recv_encoding,
77 } => {
78 let root = flexbuffers::Reader::get_root(payload)?;
79
80 let body = match send_encoding {
81 ordinary_config::IntegrationProtocolHttpEncoding::Json => {
82 flexbuffer_reader_to_json(&self.config.send, &root.as_vector().idx(0))?
83 .to_string()
84 }
85 ordinary_config::IntegrationProtocolHttpEncoding::Text => {
86 flexbuffer_reader_to_string(&self.config.send, &root.as_vector().idx(0))?
87 }
88 };
89
90 let res = self.http(method, headers, body)?;
91
92 let mut builder =
93 flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
94 let mut builder_vec = builder.start_vector();
95
96 match recv_encoding {
97 ordinary_config::IntegrationProtocolHttpEncoding::Json => {
98 json_to_flexbuffer_vec(&self.config.recv, &res.json()?, &mut builder_vec)?;
99 }
100 ordinary_config::IntegrationProtocolHttpEncoding::Text => {
101 builder_vec.push(res.text()?.as_str());
102 }
103 }
104
105 builder_vec.end_vector();
106
107 Ok(Bytes::copy_from_slice(builder.view()))
108 } }
115 }
116
117 #[instrument(skip(self, body, headers), fields(endpoint), err)]
118 fn http(
119 &self,
120 method: &String,
121 headers: &Vec<(String, String)>,
122 body: String,
123 ) -> Result<Response, Box<dyn Error>> {
124 tracing::Span::current().record("endpoint", &self.config.endpoint);
125
126 let client = reqwest::blocking::Client::new();
127
128 match method.as_str() {
129 "GET" => {
130 let mut req = client.get(self.config.endpoint.clone());
131
132 for (key, val) in headers {
133 req = req.header(key, val);
134 }
135
136 let res = req.send()?;
137
138 Ok(res)
139 }
140 "POST" => {
141 let mut req = client.post(self.config.endpoint.clone()).body(body);
142
143 for (key, val) in headers {
144 req = req.header(key, val);
145 }
146
147 let res = req.send()?;
148
149 Ok(res)
150 }
151 "PUT" => {
152 let mut req = client.put(self.config.endpoint.clone()).body(body);
153
154 for (key, val) in headers {
155 req = req.header(key, val);
156 }
157
158 let res = req.send()?;
159
160 Ok(res)
161 }
162 "PATCH" => {
163 let mut req = client.patch(self.config.endpoint.clone()).body(body);
164
165 for (key, val) in headers {
166 req = req.header(key, val);
167 }
168
169 let res = req.send()?;
170
171 Ok(res)
172 }
173 "DELETE" => {
174 let mut req = client.delete(self.config.endpoint.clone()).body(body);
175
176 for (key, val) in headers {
177 req = req.header(key, val);
178 }
179
180 let res = req.send()?;
181
182 Ok(res)
183 }
184 _ => Err("invalid method".into()),
185 }
186 }
187}