ordinary-integration 0.6.0-pre.2

Integration for Ordinary
Documentation
#![doc = include_str!("../README.md")]
#![warn(clippy::all, clippy::pedantic)]
#![allow(clippy::missing_errors_doc)]

// Copyright (C) 2026 Ordinary Labs, LLC.
//
// SPDX-License-Identifier: AGPL-3.0-only

use bytes::Bytes;
use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
use ordinary_storage::Storage;
use ordinary_types::{
    flexbuffer_reader_to_json, flexbuffer_reader_to_string, json_to_flexbuffer_vec,
};
use reqwest::blocking::Response;
use std::{collections::BTreeMap, error::Error, sync::Arc};
use tracing::instrument;

// for gRPC: https://docs.rs/protobuf/latest/protobuf/reflect/

#[derive(Clone)]
pub struct Integration {
    pub config: IntegrationConfig,
}

impl Integration {
    #[instrument(skip(config, storage, secrets), fields(i, nm), err)]
    pub fn new(
        config: IntegrationConfig,
        storage: Arc<Storage>,
        secrets: &BTreeMap<String, &Secret>,
    ) -> Result<Integration, Box<dyn Error>> {
        tracing::Span::current().record("i", config.idx);
        tracing::Span::current().record("nm", tracing::field::display(&config.name));

        let mut usable_config = config.clone();

        if let Some(secret_names) = &config.secrets {
            for secret_name in secret_names {
                if let Some(secret) = secrets.get(secret_name) {
                    match secret.visibility {
                        ordinary_config::SecretVisibility::Integrations => {
                            let val = match &secret.source {
                                ordinary_config::SecretSource::Env(name) => std::env::var(name)?,
                                ordinary_config::SecretSource::Stored(name) => {
                                    storage.secrets.retrieve_secret(name)?
                                }
                            };

                            // ?? probably the silliest way to go about this
                            let mut str_config = serde_json::to_string(&config)?;
                            str_config =
                                str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
                            usable_config = serde_json::from_str(&str_config)?;
                        }
                    }
                }
            }
        }

        Ok(Integration {
            config: usable_config,
        })
    }

    #[instrument(skip(self, payload), fields(i, nm), err)]
    pub fn invoke(&self, payload: &[u8]) -> Result<Bytes, Box<dyn Error>> {
        tracing::Span::current().record("i", self.config.idx);
        tracing::Span::current().record("nm", tracing::field::display(&self.config.name));

        match &self.config.protocol {
            IntegrationProtocol::Http {
                method,
                headers,
                send_encoding,
                recv_encoding,
            } => {
                let root = flexbuffers::Reader::get_root(payload)?;

                let body = match send_encoding {
                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
                        flexbuffer_reader_to_json(&self.config.send, &root.as_vector().idx(0))?
                            .to_string()
                    }
                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
                        flexbuffer_reader_to_string(&self.config.send, &root.as_vector().idx(0))?
                    }
                };

                let res = self.http(method, headers, body)?;

                let mut builder =
                    flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
                let mut builder_vec = builder.start_vector();

                match recv_encoding {
                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
                        json_to_flexbuffer_vec(&self.config.recv, &res.json()?, &mut builder_vec)?;
                    }
                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
                        builder_vec.push(res.text()?.as_str());
                    }
                }

                builder_vec.end_vector();

                Ok(Bytes::copy_from_slice(builder.view()))
            } // IntegrationProtocol::Grpc { metadata: _ } => Ok(Bytes::new()),
              // IntegrationProtocol::CapnProto => Ok(Bytes::new()),
              // IntegrationProtocol::GraphQL => Ok(Bytes::new()),
              // IntegrationProtocol::Postgres { statement: _ } => Ok(Bytes::new()),
              // IntegrationProtocol::OpenSearch => Ok(Bytes::new()),
              // IntegrationProtocol::Smtp => Ok(Bytes::new()),
        }
    }

    #[instrument(skip(self, body, headers), fields(endpoint), err)]
    fn http(
        &self,
        method: &String,
        headers: &Vec<(String, String)>,
        body: String,
    ) -> Result<Response, Box<dyn Error>> {
        tracing::Span::current().record("endpoint", &self.config.endpoint);

        let client = reqwest::blocking::Client::new();

        match method.as_str() {
            "GET" => {
                let mut req = client.get(self.config.endpoint.clone());

                for (key, val) in headers {
                    req = req.header(key, val);
                }

                let res = req.send()?;

                Ok(res)
            }
            "POST" => {
                let mut req = client.post(self.config.endpoint.clone()).body(body);

                for (key, val) in headers {
                    req = req.header(key, val);
                }

                let res = req.send()?;

                Ok(res)
            }
            "PUT" => {
                let mut req = client.put(self.config.endpoint.clone()).body(body);

                for (key, val) in headers {
                    req = req.header(key, val);
                }

                let res = req.send()?;

                Ok(res)
            }
            "PATCH" => {
                let mut req = client.patch(self.config.endpoint.clone()).body(body);

                for (key, val) in headers {
                    req = req.header(key, val);
                }

                let res = req.send()?;

                Ok(res)
            }
            "DELETE" => {
                let mut req = client.delete(self.config.endpoint.clone()).body(body);

                for (key, val) in headers {
                    req = req.header(key, val);
                }

                let res = req.send()?;

                Ok(res)
            }
            _ => Err("invalid method".into()),
        }
    }
}