#![doc = include_str!("../README.md")]
#![warn(clippy::all, clippy::pedantic)]
#![allow(clippy::missing_errors_doc)]
use bytes::Bytes;
use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
use ordinary_storage::Storage;
use ordinary_types::{
flexbuffer_reader_to_json, flexbuffer_reader_to_string, json_to_flexbuffer_vec,
};
use reqwest::blocking::Response;
use std::{collections::BTreeMap, error::Error, sync::Arc};
use tracing::instrument;
/// An outbound integration that can be invoked with a FlexBuffer payload.
///
/// Instances are normally built with `Integration::new`, which resolves
/// secret placeholders in the configuration before storing it.
#[derive(Clone)]
pub struct Integration {
/// Configuration used for every invocation (endpoint, protocol, send/recv
/// schemas). When built through `Integration::new`, `{{secret}}`
/// placeholders have already been replaced with the secret values.
pub config: IntegrationConfig,
}
impl Integration {
    /// Builds an [`Integration`] from `config`, resolving secret placeholders.
    ///
    /// For every name in `config.secrets` that is also present in `secrets`,
    /// the secret value is read (from the environment or from `storage`,
    /// depending on its source) and every `{{<secret.name>}}` placeholder in
    /// the JSON-serialized configuration is replaced with that value. Names
    /// that are not present in `secrets` are skipped. If nothing is
    /// substituted the configuration is stored unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if a secret cannot be read from its source, or if the
    /// configuration cannot be serialized to / deserialized from JSON.
    #[instrument(skip(config, storage, secrets), fields(i, nm), err)]
    pub fn new(
        config: IntegrationConfig,
        storage: Arc<Storage>,
        secrets: &BTreeMap<String, &Secret>,
    ) -> Result<Integration, Box<dyn Error>> {
        let span = tracing::Span::current();
        span.record("i", config.idx);
        span.record("nm", tracing::field::display(&config.name));

        // JSON text of the config with the substitutions applied so far.
        // Stays `None` until the first secret is actually substituted, so a
        // config without matching secrets is never round-tripped through JSON.
        let mut rendered: Option<String> = None;

        if let Some(secret_names) = &config.secrets {
            for secret_name in secret_names {
                if let Some(secret) = secrets.get(secret_name) {
                    match secret.visibility {
                        ordinary_config::SecretVisibility::Integrations => {
                            let val = match &secret.source {
                                ordinary_config::SecretSource::Env(name) => std::env::var(name)?,
                                ordinary_config::SecretSource::Stored(name) => {
                                    storage.secrets.retrieve_secret(name)?
                                }
                            };

                            // The placeholder lives inside a JSON string, so the
                            // value must be JSON-escaped before it is spliced in;
                            // otherwise quotes, backslashes or control characters
                            // would corrupt the document. `to_string` yields a
                            // quoted literal, so the outer quotes are removed.
                            let quoted = serde_json::to_string(&val)?;
                            let escaped = quoted
                                .strip_prefix('"')
                                .and_then(|inner| inner.strip_suffix('"'))
                                .unwrap_or(quoted.as_str());

                            // Continue from the previously substituted text so
                            // that earlier secrets are not lost when several
                            // secrets are configured.
                            let template = match rendered.take() {
                                Some(text) => text,
                                None => serde_json::to_string(&config)?,
                            };
                            rendered = Some(
                                template.replace(&format!("{{{{{}}}}}", secret.name), escaped),
                            );
                        }
                    }
                }
            }
        }

        let config: IntegrationConfig = match rendered {
            Some(text) => serde_json::from_str(&text)?,
            None => config,
        };

        Ok(Integration { config })
    }

    /// Invokes the integration with a FlexBuffer-encoded `payload`.
    ///
    /// The first element of the payload's root vector is converted to the
    /// request body according to `config.send` and the protocol's send
    /// encoding. The response body is decoded according to the receive
    /// encoding and returned as a FlexBuffer vector.
    ///
    /// # Errors
    ///
    /// Returns an error if the payload is not a readable FlexBuffer, if the
    /// body conversion fails, if the HTTP request fails or uses an
    /// unsupported method, or if the response body cannot be decoded.
    #[instrument(skip(self, payload), fields(i, nm), err)]
    pub fn invoke(&self, payload: &[u8]) -> Result<Bytes, Box<dyn Error>> {
        let span = tracing::Span::current();
        span.record("i", self.config.idx);
        span.record("nm", tracing::field::display(&self.config.name));

        match &self.config.protocol {
            IntegrationProtocol::Http {
                method,
                headers,
                send_encoding,
                recv_encoding,
            } => {
                let root = flexbuffers::Reader::get_root(payload)?;
                // Only element 0 of the root vector is used as the request input.
                let input = root.as_vector().idx(0);

                let body = match send_encoding {
                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
                        flexbuffer_reader_to_json(&self.config.send, &input)?.to_string()
                    }
                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
                        flexbuffer_reader_to_string(&self.config.send, &input)?
                    }
                };

                // NOTE(review): the response status code is not inspected here,
                // so error responses appear to be decoded like any other reply.
                // Confirm this is intended.
                let res = self.http(method, headers, body)?;

                let mut builder =
                    flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
                let mut builder_vec = builder.start_vector();

                match recv_encoding {
                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
                        json_to_flexbuffer_vec(&self.config.recv, &res.json()?, &mut builder_vec)?;
                    }
                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
                        builder_vec.push(res.text()?.as_str());
                    }
                }

                builder_vec.end_vector();

                Ok(Bytes::copy_from_slice(builder.view()))
            }
        }
    }

    /// Sends one blocking HTTP request to `config.endpoint`.
    ///
    /// `method` must be one of `GET`, `POST`, `PUT`, `PATCH` or `DELETE`
    /// (exact, upper-case). `GET` requests are sent without a body; every
    /// other method sends `body`. All `headers` are added to the request.
    ///
    /// # Errors
    ///
    /// Returns `"invalid method"` for any other method string, or the
    /// underlying `reqwest` error if the request cannot be sent.
    #[instrument(skip(self, body, headers), fields(endpoint), err)]
    fn http(
        &self,
        method: &str,
        headers: &[(String, String)],
        body: String,
    ) -> Result<Response, Box<dyn Error>> {
        tracing::Span::current().record("endpoint", &self.config.endpoint);

        // A fresh client is built for every call.
        let client = reqwest::blocking::Client::new();
        let url = self.config.endpoint.clone();

        // Pick the request builder for the method; everything after this is
        // shared by all methods.
        let request = match method {
            "GET" => client.get(url),
            "POST" => client.post(url).body(body),
            "PUT" => client.put(url).body(body),
            "PATCH" => client.patch(url).body(body),
            "DELETE" => client.delete(url).body(body),
            _ => return Err("invalid method".into()),
        };

        let request = headers
            .iter()
            .fold(request, |req, (key, val)| req.header(key, val));

        Ok(request.send()?)
    }
}