#![doc = include_str!("../README.md")]
#![warn(clippy::all, clippy::pedantic)]
#![allow(clippy::missing_errors_doc)]
use anyhow::bail;
use bytes::Bytes;
use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
use ordinary_storage::Storage;
use ordinary_types::{
Kind, flexbuffer_reader_to_json, flexbuffer_reader_to_path_component,
flexbuffer_reader_to_string, json_to_flexbuffer_vec,
};
use ordinary_utils::headers::{log_request, log_response};
use ordinary_utils::{SERVER, WrappedRedactedHashingAlg};
use reqwest::redirect::Policy;
use std::time::Instant;
use std::{collections::BTreeMap, sync::Arc};
use tokio::runtime::Handle;
use tracing::{Instrument, Span, instrument};
/// Handle to a single configured outbound integration.
///
/// Instances are created by `Integration::new`. Calls can be made directly via
/// the async `invoke` method, or from synchronous code via `invoke_blocking`,
/// which goes through the channel held in `sender`.
#[derive(Clone)]
pub struct Integration {
/// HTTP client used to build and execute outgoing requests.
client: reqwest::Client,
/// Sending half of the request queue consumed by the thread spawned in
/// `Integration::new`. Each message carries the request payload, a oneshot
/// sender for the response bytes, and the tracing span to run the call under.
sender: crossbeam_channel::Sender<(Bytes, oneshot::Sender<Bytes>, Span)>,
/// Integration configuration as stored by `Integration::new`, which applies
/// `{{secret}}` placeholder substitution before storing it.
pub config: IntegrationConfig,
/// Flag forwarded to `log_request` / `log_response` for every HTTP call.
log_headers: bool,
/// Optional hashing algorithm forwarded to `log_request` / `log_response`.
redacted_hash: Arc<Option<WrappedRedactedHashingAlg>>,
}
impl Integration {
#[instrument(skip_all, fields(i, nm), err)]
pub fn new(
config: IntegrationConfig,
storage: Arc<Storage>,
secrets: &BTreeMap<String, &Secret>,
log_headers: bool,
redacted_hash: Arc<Option<WrappedRedactedHashingAlg>>,
) -> anyhow::Result<Integration> {
Span::current().record("i", config.idx);
Span::current().record("nm", tracing::field::display(&config.name));
let mut usable_config = config.clone();
if let Some(secret_names) = &config.secrets {
for secret_name in secret_names {
if let Some(secret) = secrets.get(secret_name) {
match secret.visibility {
ordinary_config::SecretVisibility::Integrations => {
let val = match &secret.source {
ordinary_config::SecretSource::Env => std::env::var(&secret.name)?,
ordinary_config::SecretSource::Stored => std::str::from_utf8(
storage.secrets.get(&secret.name)?.as_ref(),
)?
.to_string(),
};
let mut str_config = serde_json::to_string(&config)?;
str_config =
str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
usable_config = serde_json::from_str(&str_config)?;
}
}
}
}
}
let rt = Handle::try_current()?;
let client = reqwest::Client::builder()
.use_rustls_tls()
.brotli(true)
.zstd(true)
.deflate(true)
.gzip(true)
.redirect(Policy::none())
.user_agent(SERVER)
.build()?;
let (sender, receiver) = crossbeam_channel::bounded(128_000);
let integration = Integration {
client,
sender,
log_headers,
redacted_hash,
config: usable_config,
};
let integration_clone = Arc::new(integration.clone());
std::thread::spawn(move || {
while let Ok((payload, tx, span)) = receiver.recv() {
{
let integration = integration_clone.clone();
rt.spawn(async move {
async {
match integration.invoke(payload.as_ref()).await {
Ok(res) => {
if let Err(err) = tx.send(res) {
tracing::error!(%err);
}
}
Err(err) => {
tracing::error!(%err);
}
}
}
.instrument(span)
.await;
});
}
}
});
Ok(integration)
}
pub fn invoke_blocking(&self, payload: &[u8], span: Span) -> anyhow::Result<Bytes> {
let (tx, rx) = oneshot::channel();
self.sender
.send((Bytes::copy_from_slice(payload), tx, span))?;
let res = rx.recv()?;
Ok(res)
}
/// Performs one call to the configured integration.
///
/// `payload` is a flexbuffer whose root is read as a vector; element `0` of
/// that vector is the value to send.
///
/// For the HTTP protocol:
///
/// * **Body.** `GET` requests carry an empty body. Otherwise the body is the
///   raw payload (`None` encoding), the send value rendered as JSON, or the
///   send value rendered as text, according to `send_encoding`.
/// * **Endpoint.** A `{send}` placeholder is replaced by the whole send value
///   rendered as a path component. When the send kind is an object,
///   `{send.<field>}` placeholders are replaced by the corresponding field.
///   Only fields whose placeholder actually occurs in the endpoint are
///   converted, so a field that is not referenced cannot fail the call.
/// * **Response.** The response body is wrapped in a one-element flexbuffer
///   vector as a blob, a JSON-derived value, or a string, according to
///   `recv_encoding`.
///
/// # Errors
///
/// Fails when the payload is not a valid flexbuffer, when a value cannot be
/// converted for the body or the endpoint, when the HTTP request fails, or
/// when the response body cannot be read or decoded.
#[instrument(name = "int", skip_all, fields(i, nm), err)]
pub async fn invoke(&self, payload: &[u8]) -> anyhow::Result<Bytes> {
    Span::current().record("i", self.config.idx);
    Span::current().record("nm", tracing::field::display(&self.config.name));

    match &self.config.protocol {
        IntegrationProtocol::Http {
            method,
            headers,
            send_encoding,
            recv_encoding,
        } => {
            let root = flexbuffers::Reader::get_root(payload)?;
            // Element 0 of the root vector is the value being sent.
            let send_value = root.as_vector().idx(0);

            let body: Bytes = if method == "GET" {
                Bytes::new()
            } else {
                match send_encoding {
                    ordinary_config::IntegrationProtocolHttpEncoding::None => {
                        Bytes::copy_from_slice(payload)
                    }
                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
                        flexbuffer_reader_to_json(&self.config.send, &send_value)?
                            .to_string()
                            .into()
                    }
                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
                        flexbuffer_reader_to_string(&self.config.send, &send_value)?.into()
                    }
                }
            };

            let mut endpoint = self.config.endpoint.clone();
            if endpoint.contains("{send}") {
                let component =
                    flexbuffer_reader_to_path_component(&self.config.send, &send_value)?;
                endpoint = endpoint.replace("{send}", &component);
            }
            if endpoint.contains("{send.")
                && let Kind::Object { name: _, fields } = &self.config.send
            {
                let send_vec = send_value.as_vector();
                for field in fields {
                    let placeholder = format!("{{send.{}}}", field.name);
                    // Skip fields that are not referenced: converting them is
                    // wasted work and could fail the call for no reason.
                    if !endpoint.contains(&placeholder) {
                        continue;
                    }
                    let component = flexbuffer_reader_to_path_component(
                        &field.kind,
                        &send_vec.idx(field.idx as usize),
                    )?;
                    endpoint = endpoint.replace(&placeholder, &component);
                }
            }

            let res = self.http(&endpoint, method, headers, body).await?;

            let mut builder =
                flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
            let mut builder_vec = builder.start_vector();
            match recv_encoding {
                ordinary_config::IntegrationProtocolHttpEncoding::None => {
                    builder_vec.push(flexbuffers::Blob(res.bytes().await?.as_ref()));
                }
                ordinary_config::IntegrationProtocolHttpEncoding::Json => {
                    json_to_flexbuffer_vec(
                        &self.config.recv,
                        &res.json().await?,
                        &mut builder_vec,
                    )?;
                }
                ordinary_config::IntegrationProtocolHttpEncoding::Text => {
                    builder_vec.push(res.text().await?.as_str());
                }
            }
            builder_vec.end_vector();

            Ok(Bytes::copy_from_slice(builder.view()))
        }
    }
}
/// Builds and executes a single HTTP request and returns the raw response.
///
/// * `endpoint` – fully expanded request URL.
/// * `method` – one of `GET`, `POST`, `PUT`, `PATCH` or `DELETE` (exact,
///   upper-case match); anything else is rejected with `"invalid method"`.
/// * `headers` – header name/value pairs added to the request in order.
/// * `body` – request body; attached for `POST`, `PUT` and `PATCH` only.
///
/// The URL's host, port, path and query are recorded on the current span. The
/// request and the response are passed to `log_request` / `log_response`,
/// together with the instant captured at the start of this call.
///
/// # Errors
///
/// Fails on an unsupported method, when the request cannot be built (for
/// example an invalid URL or header), or when executing the request fails.
#[instrument(skip_all, fields(host, port, path, query), err)]
async fn http(
    &self,
    endpoint: &str,
    method: &str,
    headers: &[(String, String)],
    body: Bytes,
) -> anyhow::Result<reqwest::Response> {
    let start = Instant::now();

    let builder = match method {
        "GET" => self.client.get(endpoint),
        "POST" => self.client.post(endpoint).body(body),
        "PUT" => self.client.put(endpoint).body(body),
        "PATCH" => self.client.patch(endpoint).body(body),
        "DELETE" => self.client.delete(endpoint),
        _ => bail!("invalid method"),
    };

    // Borrow header names and values as `&str`; no per-header allocation needed.
    let req = headers
        .iter()
        .fold(builder, |acc, (key, val)| {
            acc.header(key.as_str(), val.as_str())
        })
        .build()?;

    let url = req.url();
    Span::current().record("host", url.host().map(tracing::field::display));
    Span::current().record("port", url.port().map(tracing::field::display));
    Span::current().record("path", tracing::field::display(url.path()));
    Span::current().record("query", url.query().map(tracing::field::display));

    log_request(
        self.log_headers,
        req.headers(),
        &self.redacted_hash,
        req.method(),
    );

    let res = self.client.execute(req).await?;

    log_response(
        res.status().as_u16(),
        self.log_headers,
        &self.redacted_hash,
        start,
        res.headers(),
        res.version(),
    );

    Ok(res)
}
}