ordinary_integration/
lib.rs1#![doc = include_str!("../README.md")]
2#![warn(clippy::all, clippy::pedantic)]
3#![allow(clippy::missing_errors_doc)]
4
5use anyhow::anyhow;
10use bytes::Bytes;
11use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
12use ordinary_storage::Storage;
13use ordinary_types::{
14 flexbuffer_reader_to_json, flexbuffer_reader_to_string, json_to_flexbuffer_vec,
15};
16use reqwest::blocking::Response;
17use std::{collections::BTreeMap, sync::Arc};
18use tracing::instrument;
19
20#[derive(Clone)]
23pub struct Integration {
24 pub config: IntegrationConfig,
25}
26
27impl Integration {
28 #[instrument(skip(config, storage, secrets), fields(i, nm), err)]
29 pub fn new(
30 config: IntegrationConfig,
31 storage: Arc<Storage>,
32 secrets: &BTreeMap<String, &Secret>,
33 ) -> anyhow::Result<Integration> {
34 tracing::Span::current().record("i", config.idx);
35 tracing::Span::current().record("nm", tracing::field::display(&config.name));
36
37 let mut usable_config = config.clone();
38
39 if let Some(secret_names) = &config.secrets {
40 for secret_name in secret_names {
41 if let Some(secret) = secrets.get(secret_name) {
42 match secret.visibility {
43 ordinary_config::SecretVisibility::Integrations => {
44 let val = match &secret.source {
45 ordinary_config::SecretSource::Env(name) => std::env::var(name)?,
46 ordinary_config::SecretSource::Stored(name) => {
47 storage.secrets.retrieve_secret(name)?
48 }
49 };
50
51 let mut str_config = serde_json::to_string(&config)?;
53 str_config =
54 str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
55 usable_config = serde_json::from_str(&str_config)?;
56 }
57 }
58 }
59 }
60 }
61
62 Ok(Integration {
63 config: usable_config,
64 })
65 }
66
67 #[instrument(skip(self, payload), fields(i, nm), err)]
68 pub fn invoke(&self, payload: &[u8]) -> anyhow::Result<Bytes> {
69 tracing::Span::current().record("i", self.config.idx);
70 tracing::Span::current().record("nm", tracing::field::display(&self.config.name));
71
72 match &self.config.protocol {
73 IntegrationProtocol::Http {
74 method,
75 headers,
76 send_encoding,
77 recv_encoding,
78 } => {
79 let root = flexbuffers::Reader::get_root(payload)?;
80
81 let body = match send_encoding {
82 ordinary_config::IntegrationProtocolHttpEncoding::Json => {
83 flexbuffer_reader_to_json(&self.config.send, &root.as_vector().idx(0))?
84 .to_string()
85 }
86 ordinary_config::IntegrationProtocolHttpEncoding::Text => {
87 flexbuffer_reader_to_string(&self.config.send, &root.as_vector().idx(0))?
88 }
89 };
90
91 let res = self.http(method, headers, body)?;
92
93 let mut builder =
94 flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
95 let mut builder_vec = builder.start_vector();
96
97 match recv_encoding {
98 ordinary_config::IntegrationProtocolHttpEncoding::Json => {
99 json_to_flexbuffer_vec(&self.config.recv, &res.json()?, &mut builder_vec)?;
100 }
101 ordinary_config::IntegrationProtocolHttpEncoding::Text => {
102 builder_vec.push(res.text()?.as_str());
103 }
104 }
105
106 builder_vec.end_vector();
107
108 Ok(Bytes::copy_from_slice(builder.view()))
109 } }
116 }
117
118 #[instrument(skip(self, body, headers), fields(endpoint), err)]
119 fn http(
120 &self,
121 method: &String,
122 headers: &Vec<(String, String)>,
123 body: String,
124 ) -> anyhow::Result<Response> {
125 tracing::Span::current().record("endpoint", &self.config.endpoint);
126
127 let client = reqwest::blocking::Client::new();
128
129 match method.as_str() {
130 "GET" => {
131 let mut req = client.get(self.config.endpoint.clone());
132
133 for (key, val) in headers {
134 req = req.header(key, val);
135 }
136
137 let res = req.send()?;
138
139 Ok(res)
140 }
141 "POST" => {
142 let mut req = client.post(self.config.endpoint.clone()).body(body);
143
144 for (key, val) in headers {
145 req = req.header(key, val);
146 }
147
148 let res = req.send()?;
149
150 Ok(res)
151 }
152 "PUT" => {
153 let mut req = client.put(self.config.endpoint.clone()).body(body);
154
155 for (key, val) in headers {
156 req = req.header(key, val);
157 }
158
159 let res = req.send()?;
160
161 Ok(res)
162 }
163 "PATCH" => {
164 let mut req = client.patch(self.config.endpoint.clone()).body(body);
165
166 for (key, val) in headers {
167 req = req.header(key, val);
168 }
169
170 let res = req.send()?;
171
172 Ok(res)
173 }
174 "DELETE" => {
175 let mut req = client.delete(self.config.endpoint.clone()).body(body);
176
177 for (key, val) in headers {
178 req = req.header(key, val);
179 }
180
181 let res = req.send()?;
182
183 Ok(res)
184 }
185 _ => Err(anyhow!("invalid method")),
186 }
187 }
188}