Skip to main content

ordinary_integration/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(clippy::all, clippy::pedantic)]
3#![allow(clippy::missing_errors_doc)]
4
5// Copyright (C) 2026 Ordinary Labs, LLC.
6//
7// SPDX-License-Identifier: AGPL-3.0-only
8
9use anyhow::anyhow;
10use bytes::Bytes;
11use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
12use ordinary_storage::Storage;
13use ordinary_types::{
14    flexbuffer_reader_to_json, flexbuffer_reader_to_string, json_to_flexbuffer_vec,
15};
16use reqwest::blocking::Response;
17use std::{collections::BTreeMap, sync::Arc};
18use tracing::instrument;
19
20// for gRPC: https://docs.rs/protobuf/latest/protobuf/reflect/
21
22#[derive(Clone)]
23pub struct Integration {
24    pub config: IntegrationConfig,
25}
26
27impl Integration {
28    #[instrument(skip(config, storage, secrets), fields(i, nm), err)]
29    pub fn new(
30        config: IntegrationConfig,
31        storage: Arc<Storage>,
32        secrets: &BTreeMap<String, &Secret>,
33    ) -> anyhow::Result<Integration> {
34        tracing::Span::current().record("i", config.idx);
35        tracing::Span::current().record("nm", tracing::field::display(&config.name));
36
37        let mut usable_config = config.clone();
38
39        if let Some(secret_names) = &config.secrets {
40            for secret_name in secret_names {
41                if let Some(secret) = secrets.get(secret_name) {
42                    match secret.visibility {
43                        ordinary_config::SecretVisibility::Integrations => {
44                            let val = match &secret.source {
45                                ordinary_config::SecretSource::Env(name) => std::env::var(name)?,
46                                ordinary_config::SecretSource::Stored(name) => {
47                                    storage.secrets.retrieve_secret(name)?
48                                }
49                            };
50
51                            // ?? probably the silliest way to go about this
52                            let mut str_config = serde_json::to_string(&config)?;
53                            str_config =
54                                str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
55                            usable_config = serde_json::from_str(&str_config)?;
56                        }
57                    }
58                }
59            }
60        }
61
62        Ok(Integration {
63            config: usable_config,
64        })
65    }
66
67    #[instrument(skip(self, payload), fields(i, nm), err)]
68    pub fn invoke(&self, payload: &[u8]) -> anyhow::Result<Bytes> {
69        tracing::Span::current().record("i", self.config.idx);
70        tracing::Span::current().record("nm", tracing::field::display(&self.config.name));
71
72        match &self.config.protocol {
73            IntegrationProtocol::Http {
74                method,
75                headers,
76                send_encoding,
77                recv_encoding,
78            } => {
79                let root = flexbuffers::Reader::get_root(payload)?;
80
81                let body = match send_encoding {
82                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
83                        flexbuffer_reader_to_json(&self.config.send, &root.as_vector().idx(0))?
84                            .to_string()
85                    }
86                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
87                        flexbuffer_reader_to_string(&self.config.send, &root.as_vector().idx(0))?
88                    }
89                };
90
91                let res = self.http(method, headers, body)?;
92
93                let mut builder =
94                    flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
95                let mut builder_vec = builder.start_vector();
96
97                match recv_encoding {
98                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
99                        json_to_flexbuffer_vec(&self.config.recv, &res.json()?, &mut builder_vec)?;
100                    }
101                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
102                        builder_vec.push(res.text()?.as_str());
103                    }
104                }
105
106                builder_vec.end_vector();
107
108                Ok(Bytes::copy_from_slice(builder.view()))
109            } // IntegrationProtocol::Grpc { metadata: _ } => Ok(Bytes::new()),
110              // IntegrationProtocol::CapnProto => Ok(Bytes::new()),
111              // IntegrationProtocol::GraphQL => Ok(Bytes::new()),
112              // IntegrationProtocol::Postgres { statement: _ } => Ok(Bytes::new()),
113              // IntegrationProtocol::OpenSearch => Ok(Bytes::new()),
114              // IntegrationProtocol::Smtp => Ok(Bytes::new()),
115        }
116    }
117
118    #[instrument(skip(self, body, headers), fields(endpoint), err)]
119    fn http(
120        &self,
121        method: &String,
122        headers: &Vec<(String, String)>,
123        body: String,
124    ) -> anyhow::Result<Response> {
125        tracing::Span::current().record("endpoint", &self.config.endpoint);
126
127        let client = reqwest::blocking::Client::new();
128
129        match method.as_str() {
130            "GET" => {
131                let mut req = client.get(self.config.endpoint.clone());
132
133                for (key, val) in headers {
134                    req = req.header(key, val);
135                }
136
137                let res = req.send()?;
138
139                Ok(res)
140            }
141            "POST" => {
142                let mut req = client.post(self.config.endpoint.clone()).body(body);
143
144                for (key, val) in headers {
145                    req = req.header(key, val);
146                }
147
148                let res = req.send()?;
149
150                Ok(res)
151            }
152            "PUT" => {
153                let mut req = client.put(self.config.endpoint.clone()).body(body);
154
155                for (key, val) in headers {
156                    req = req.header(key, val);
157                }
158
159                let res = req.send()?;
160
161                Ok(res)
162            }
163            "PATCH" => {
164                let mut req = client.patch(self.config.endpoint.clone()).body(body);
165
166                for (key, val) in headers {
167                    req = req.header(key, val);
168                }
169
170                let res = req.send()?;
171
172                Ok(res)
173            }
174            "DELETE" => {
175                let mut req = client.delete(self.config.endpoint.clone()).body(body);
176
177                for (key, val) in headers {
178                    req = req.header(key, val);
179                }
180
181                let res = req.send()?;
182
183                Ok(res)
184            }
185            _ => Err(anyhow!("invalid method")),
186        }
187    }
188}