ordinary-integration 0.9.0

Integration for Ordinary
#![doc = include_str!("../README.md")]
#![warn(clippy::all, clippy::pedantic)]
#![allow(clippy::missing_errors_doc)]

// Copyright (C) 2026 Ordinary Labs, LLC.
//
// SPDX-License-Identifier: AGPL-3.0-only

use anyhow::bail;
use bytes::Bytes;
use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
use ordinary_storage::Storage;
use ordinary_types::{
    Kind, flexbuffer_reader_to_json, flexbuffer_reader_to_path_component,
    flexbuffer_reader_to_string, json_to_flexbuffer_vec,
};
use ordinary_utils::headers::{log_request, log_response};
use ordinary_utils::{SERVER, WrappedRedactedHashingAlg};
use reqwest::redirect::Policy;
use std::time::Instant;
use std::{collections::BTreeMap, sync::Arc};
use tokio::runtime::Handle;
use tracing::{Instrument, Span, instrument};
// for gRPC: https://docs.rs/protobuf/latest/protobuf/reflect/

#[derive(Clone)]
pub struct Integration {
    client: reqwest::Client,
    sender: crossbeam_channel::Sender<(Bytes, oneshot::Sender<Bytes>, Span)>,

    pub config: IntegrationConfig,

    log_headers: bool,
    redacted_hash: Arc<Option<WrappedRedactedHashingAlg>>,
}

impl Integration {
    #[instrument(skip_all, fields(i, nm), err)]
    pub fn new(
        config: IntegrationConfig,
        storage: Arc<Storage>,
        secrets: &BTreeMap<String, &Secret>,
        log_headers: bool,
        redacted_hash: Arc<Option<WrappedRedactedHashingAlg>>,
    ) -> anyhow::Result<Integration> {
        Span::current().record("i", config.idx);
        Span::current().record("nm", tracing::field::display(&config.name));

        let mut usable_config = config.clone();

        // todo: make it possible to update secrets without a restart
        if let Some(secret_names) = &config.secrets {
            for secret_name in secret_names {
                if let Some(secret) = secrets.get(secret_name) {
                    match secret.visibility {
                        ordinary_config::SecretVisibility::Integrations => {
                            let val = match &secret.source {
                                ordinary_config::SecretSource::Env => std::env::var(&secret.name)?,
                                ordinary_config::SecretSource::Stored => std::str::from_utf8(
                                    storage.secrets.get(&secret.name)?.as_ref(),
                                )?
                                .to_string(),
                            };

                            // ?? probably the silliest way to go about this
                            let mut str_config = serde_json::to_string(&config)?;
                            str_config =
                                str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
                            usable_config = serde_json::from_str(&str_config)?;
                        }
                    }
                }
            }
        }

        let rt = Handle::try_current()?;

        let client = reqwest::Client::builder()
            .use_rustls_tls()
            .brotli(true)
            .zstd(true)
            .deflate(true)
            .gzip(true)
            .redirect(Policy::none())
            .user_agent(SERVER)
            .build()?;

        let (sender, receiver) = crossbeam_channel::bounded(128_000);

        let integration = Integration {
            client,
            sender,

            log_headers,
            redacted_hash,

            config: usable_config,
        };

        let integration_clone = Arc::new(integration.clone());

        std::thread::spawn(move || {
            while let Ok((payload, tx, span)) = receiver.recv() {
                {
                    let integration = integration_clone.clone();

                    rt.spawn(async move {
                        async {
                            match integration.invoke(payload.as_ref()).await {
                                Ok(res) => {
                                    if let Err(err) = tx.send(res) {
                                        tracing::error!(%err);
                                    }
                                }
                                Err(err) => {
                                    tracing::error!(%err);
                                }
                            }
                        }
                        .instrument(span)
                        .await;
                    });
                }
            }
        });

        Ok(integration)
    }

    pub fn invoke_blocking(&self, payload: &[u8], span: Span) -> anyhow::Result<Bytes> {
        let (tx, rx) = oneshot::channel();

        self.sender
            .send((Bytes::copy_from_slice(payload), tx, span))?;

        let res = rx.recv()?;
        Ok(res)
    }

    #[instrument(name = "int", skip_all, fields(i, nm), err)]
    pub async fn invoke(&self, payload: &[u8]) -> anyhow::Result<Bytes> {
        Span::current().record("i", self.config.idx);
        Span::current().record("nm", tracing::field::display(&self.config.name));

        match &self.config.protocol {
            IntegrationProtocol::Http {
                method,
                headers,
                send_encoding,
                recv_encoding,
            } => {
                let root = flexbuffers::Reader::get_root(payload)?;

                let body: Bytes = if method == "GET" {
                    Bytes::new()
                } else {
                    match send_encoding {
                        ordinary_config::IntegrationProtocolHttpEncoding::None => {
                            Bytes::copy_from_slice(payload)
                        }
                        ordinary_config::IntegrationProtocolHttpEncoding::Json => {
                            flexbuffer_reader_to_json(&self.config.send, &root.as_vector().idx(0))?
                                .to_string()
                                .into()
                        }
                        ordinary_config::IntegrationProtocolHttpEncoding::Text => {
                            flexbuffer_reader_to_string(
                                &self.config.send,
                                &root.as_vector().idx(0),
                            )?
                            .into()
                        }
                    }
                };

                let mut endpoint = self.config.endpoint.clone();

                if endpoint.contains("{send}") {
                    endpoint = endpoint.replace(
                        "{send}",
                        &flexbuffer_reader_to_path_component(
                            &self.config.send,
                            &root.as_vector().idx(0),
                        )?,
                    );
                }

                if endpoint.contains("{send.")
                    && let Kind::Object { name: _, fields } = &self.config.send
                {
                    let send_vec = root.as_vector().idx(0).as_vector();

                    for field in fields {
                        endpoint = endpoint.replace(
                            &format!("{{send.{}}}", field.name),
                            &flexbuffer_reader_to_path_component(
                                &field.kind,
                                &send_vec.idx(field.idx as usize),
                            )?,
                        );
                    }
                }

                let res = self.http(&endpoint, method, headers, body).await?;

                let mut builder =
                    flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
                let mut builder_vec = builder.start_vector();

                match recv_encoding {
                    ordinary_config::IntegrationProtocolHttpEncoding::None => {
                        builder_vec.push(flexbuffers::Blob(res.bytes().await?.as_ref()));
                    }
                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
                        json_to_flexbuffer_vec(
                            &self.config.recv,
                            &res.json().await?,
                            &mut builder_vec,
                        )?;
                    }
                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
                        builder_vec.push(res.text().await?.as_str());
                    }
                }

                builder_vec.end_vector();

                Ok(Bytes::copy_from_slice(builder.view()))
            } // IntegrationProtocol::Grpc { metadata: _ } => Ok(Bytes::new()),
              // IntegrationProtocol::CapnProto => Ok(Bytes::new()),
              // IntegrationProtocol::GraphQL => Ok(Bytes::new()),
              // IntegrationProtocol::Postgres { statement: _ } => Ok(Bytes::new()),
              // IntegrationProtocol::OpenSearch => Ok(Bytes::new()),
              // IntegrationProtocol::Smtp => Ok(Bytes::new()),
        }
    }

    #[instrument(skip_all, fields(host, port, path, query), err)]
    async fn http(
        &self,
        endpoint: &str,
        method: &str,
        headers: &Vec<(String, String)>,
        body: Bytes,
    ) -> anyhow::Result<reqwest::Response> {
        let start = Instant::now();

        let mut req = match method {
            "GET" => self.client.get(endpoint),
            "POST" => {
                let req = self.client.post(endpoint);
                req.body(body)
            }
            "PUT" => {
                let req = self.client.put(endpoint);
                req.body(body)
            }
            "PATCH" => {
                let req = self.client.patch(endpoint);
                req.body(body)
            }
            "DELETE" => self.client.delete(endpoint),
            _ => bail!("invalid method"),
        };

        for (key, val) in headers {
            req = req.header(key.clone(), val);
        }

        let req = req.build()?;
        let url = req.url();

        Span::current().record("host", url.host().map(tracing::field::display));
        Span::current().record("port", url.port().map(tracing::field::display));
        Span::current().record("path", tracing::field::display(url.path()));
        Span::current().record("query", url.query().map(tracing::field::display));

        log_request(
            self.log_headers,
            req.headers(),
            &self.redacted_hash,
            req.method(),
        );

        let res = self.client.execute(req).await?;

        log_response(
            res.status().as_u16(),
            self.log_headers,
            &self.redacted_hash,
            start,
            res.headers(),
            res.version(),
        );

        Ok(res)
    }
}