#![doc = include_str!("../README.md")]
#![warn(clippy::all, clippy::pedantic)]
#![allow(clippy::missing_errors_doc)]
use anyhow::anyhow;
use bytes::Bytes;
use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
use ordinary_storage::Storage;
use ordinary_types::{
Kind, flexbuffer_reader_to_json, flexbuffer_reader_to_path_component,
flexbuffer_reader_to_string, json_to_flexbuffer_vec,
};
use std::{collections::BTreeMap, sync::Arc};
use tracing::instrument;
use ureq::Body;
/// An integration described by an [`IntegrationConfig`].
///
/// Values are created with `Integration::new` and executed with
/// `Integration::invoke`.
#[derive(Clone)]
pub struct Integration {
/// The configuration read by `invoke` (protocol, endpoint, `send`/`recv`
/// kinds). `new` stores the config it was given after replacing
/// `{{SECRET_NAME}}` placeholders with the corresponding secret values.
pub config: IntegrationConfig,
}
impl Integration {
#[instrument(skip_all, fields(i, nm), err)]
pub fn new(
config: IntegrationConfig,
storage: Arc<Storage>,
secrets: &BTreeMap<String, &Secret>,
) -> anyhow::Result<Integration> {
tracing::Span::current().record("i", config.idx);
tracing::Span::current().record("nm", tracing::field::display(&config.name));
let mut usable_config = config.clone();
if let Some(secret_names) = &config.secrets {
for secret_name in secret_names {
if let Some(secret) = secrets.get(secret_name) {
match secret.visibility {
ordinary_config::SecretVisibility::Integrations => {
let val = match &secret.source {
ordinary_config::SecretSource::Env => std::env::var(&secret.name)?,
ordinary_config::SecretSource::Stored => std::str::from_utf8(
storage.secrets.get(&secret.name)?.as_ref(),
)?
.to_string(),
};
let mut str_config = serde_json::to_string(&config)?;
str_config =
str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
usable_config = serde_json::from_str(&str_config)?;
}
}
}
}
}
Ok(Integration {
config: usable_config,
})
}
#[instrument(skip_all, fields(i, nm), err)]
pub fn invoke(&self, payload: &[u8]) -> anyhow::Result<Bytes> {
tracing::Span::current().record("i", self.config.idx);
tracing::Span::current().record("nm", tracing::field::display(&self.config.name));
match &self.config.protocol {
IntegrationProtocol::Http {
method,
headers,
send_encoding,
recv_encoding,
} => {
let root = flexbuffers::Reader::get_root(payload)?;
let body: Bytes = if method == "GET" {
Bytes::new()
} else {
match send_encoding {
ordinary_config::IntegrationProtocolHttpEncoding::None => {
Bytes::copy_from_slice(payload)
}
ordinary_config::IntegrationProtocolHttpEncoding::Json => {
flexbuffer_reader_to_json(&self.config.send, &root.as_vector().idx(0))?
.to_string()
.into()
}
ordinary_config::IntegrationProtocolHttpEncoding::Text => {
flexbuffer_reader_to_string(
&self.config.send,
&root.as_vector().idx(0),
)?
.into()
}
}
};
let mut endpoint = self.config.endpoint.clone();
if endpoint.contains("{send}") {
endpoint = endpoint.replace(
"{send}",
&flexbuffer_reader_to_path_component(
&self.config.send,
&root.as_vector().idx(0),
)?,
);
}
if endpoint.contains("{send.")
&& let Kind::Object { name: _, fields } = &self.config.send
{
let send_vec = root.as_vector().idx(0).as_vector();
for field in fields {
endpoint = endpoint.replace(
&format!("{{send.{}}}", field.name),
&flexbuffer_reader_to_path_component(
&field.kind,
&send_vec.idx(field.idx as usize),
)?,
);
}
}
let mut res = Self::http(&endpoint, method, headers, body)?;
let mut builder =
flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
let mut builder_vec = builder.start_vector();
match recv_encoding {
ordinary_config::IntegrationProtocolHttpEncoding::None => {
builder_vec.push(flexbuffers::Blob(res.body_mut().read_to_vec()?.as_ref()));
}
ordinary_config::IntegrationProtocolHttpEncoding::Json => {
json_to_flexbuffer_vec(
&self.config.recv,
&res.body_mut().read_json::<serde_json::Value>()?,
&mut builder_vec,
)?;
}
ordinary_config::IntegrationProtocolHttpEncoding::Text => {
builder_vec.push(res.body_mut().read_to_string()?.as_str());
}
}
builder_vec.end_vector();
Ok(Bytes::copy_from_slice(builder.view()))
} }
}
#[instrument(skip(body, headers), err)]
fn http(
endpoint: &String,
method: &String,
headers: &Vec<(String, String)>,
body: Bytes,
) -> anyhow::Result<http::Response<Body>> {
match method.as_str() {
"GET" => {
let mut req = ureq::get(endpoint);
for (key, val) in headers {
req = req.header(key, val);
}
let res = req.call()?;
Ok(res)
}
"POST" => {
let mut req = ureq::post(endpoint);
for (key, val) in headers {
req = req.header(key, val);
}
let res = req.send(body.as_ref())?;
Ok(res)
}
"PUT" => {
let mut req = ureq::put(endpoint);
for (key, val) in headers {
req = req.header(key, val);
}
let res = req.send(body.as_ref())?;
Ok(res)
}
"PATCH" => {
let mut req = ureq::patch(endpoint);
for (key, val) in headers {
req = req.header(key, val);
}
let res = req.send(body.as_ref())?;
Ok(res)
}
"DELETE" => {
let mut req = ureq::delete(endpoint);
for (key, val) in headers {
req = req.header(key, val);
}
let res = req.call()?;
Ok(res)
}
_ => Err(anyhow!("invalid method")),
}
}
}