Skip to main content

ordinary_integration/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(clippy::all, clippy::pedantic)]
3#![allow(clippy::missing_errors_doc)]
4
5// Copyright (C) 2026 Ordinary Labs, LLC.
6//
7// SPDX-License-Identifier: AGPL-3.0-only
8
9use bytes::Bytes;
10use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
11use ordinary_storage::Storage;
12use ordinary_types::{
13    flexbuffer_reader_to_json, flexbuffer_reader_to_string, json_to_flexbuffer_vec,
14};
15use reqwest::blocking::Response;
16use std::{collections::BTreeMap, error::Error, sync::Arc};
17use tracing::instrument;
18
19// for gRPC: https://docs.rs/protobuf/latest/protobuf/reflect/
20
/// A configured integration, wrapping the [`IntegrationConfig`] its methods operate on.
21#[derive(Clone)]
22pub struct Integration {
    /// Configuration read by this integration's methods (index, name, endpoint,
    /// protocol, and the `send`/`recv` descriptions used when invoking it).
23    pub config: IntegrationConfig,
24}
25
26impl Integration {
27    #[instrument(skip(config, storage, secrets), fields(i, nm), err)]
28    pub fn new(
29        config: IntegrationConfig,
30        storage: Arc<Storage>,
31        secrets: &BTreeMap<String, &Secret>,
32    ) -> Result<Integration, Box<dyn Error>> {
33        tracing::Span::current().record("i", config.idx);
34        tracing::Span::current().record("nm", tracing::field::display(&config.name));
35
36        let mut usable_config = config.clone();
37
38        if let Some(secret_names) = &config.secrets {
39            for secret_name in secret_names {
40                if let Some(secret) = secrets.get(secret_name) {
41                    match secret.visibility {
42                        ordinary_config::SecretVisibility::Integrations => {
43                            let val = match &secret.source {
44                                ordinary_config::SecretSource::Env(name) => std::env::var(name)?,
45                                ordinary_config::SecretSource::Stored(name) => {
46                                    storage.secrets.retrieve_secret(name)?
47                                }
48                            };
49
50                            // ?? probably the silliest way to go about this
51                            let mut str_config = serde_json::to_string(&config)?;
52                            str_config =
53                                str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
54                            usable_config = serde_json::from_str(&str_config)?;
55                        }
56                    }
57                }
58            }
59        }
60
61        Ok(Integration {
62            config: usable_config,
63        })
64    }
65
66    #[instrument(skip(self, payload), fields(i, nm), err)]
67    pub fn invoke(&self, payload: &[u8]) -> Result<Bytes, Box<dyn Error>> {
68        tracing::Span::current().record("i", self.config.idx);
69        tracing::Span::current().record("nm", tracing::field::display(&self.config.name));
70
71        match &self.config.protocol {
72            IntegrationProtocol::Http {
73                method,
74                headers,
75                send_encoding,
76                recv_encoding,
77            } => {
78                let root = flexbuffers::Reader::get_root(payload)?;
79
80                let body = match send_encoding {
81                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
82                        flexbuffer_reader_to_json(&self.config.send, &root.as_vector().idx(0))?
83                            .to_string()
84                    }
85                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
86                        flexbuffer_reader_to_string(&self.config.send, &root.as_vector().idx(0))?
87                    }
88                };
89
90                let res = self.http(method, headers, body)?;
91
92                let mut builder =
93                    flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
94                let mut builder_vec = builder.start_vector();
95
96                match recv_encoding {
97                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
98                        json_to_flexbuffer_vec(&self.config.recv, &res.json()?, &mut builder_vec)?;
99                    }
100                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
101                        builder_vec.push(res.text()?.as_str());
102                    }
103                }
104
105                builder_vec.end_vector();
106
107                Ok(Bytes::copy_from_slice(builder.view()))
108            } // IntegrationProtocol::Grpc { metadata: _ } => Ok(Bytes::new()),
109              // IntegrationProtocol::CapnProto => Ok(Bytes::new()),
110              // IntegrationProtocol::GraphQL => Ok(Bytes::new()),
111              // IntegrationProtocol::Postgres { statement: _ } => Ok(Bytes::new()),
112              // IntegrationProtocol::OpenSearch => Ok(Bytes::new()),
113              // IntegrationProtocol::Smtp => Ok(Bytes::new()),
114        }
115    }
116
117    #[instrument(skip(self, body, headers), fields(endpoint), err)]
118    fn http(
119        &self,
120        method: &String,
121        headers: &Vec<(String, String)>,
122        body: String,
123    ) -> Result<Response, Box<dyn Error>> {
124        tracing::Span::current().record("endpoint", &self.config.endpoint);
125
126        let client = reqwest::blocking::Client::new();
127
128        match method.as_str() {
129            "GET" => {
130                let mut req = client.get(self.config.endpoint.clone());
131
132                for (key, val) in headers {
133                    req = req.header(key, val);
134                }
135
136                let res = req.send()?;
137
138                Ok(res)
139            }
140            "POST" => {
141                let mut req = client.post(self.config.endpoint.clone()).body(body);
142
143                for (key, val) in headers {
144                    req = req.header(key, val);
145                }
146
147                let res = req.send()?;
148
149                Ok(res)
150            }
151            "PUT" => {
152                let mut req = client.put(self.config.endpoint.clone()).body(body);
153
154                for (key, val) in headers {
155                    req = req.header(key, val);
156                }
157
158                let res = req.send()?;
159
160                Ok(res)
161            }
162            "PATCH" => {
163                let mut req = client.patch(self.config.endpoint.clone()).body(body);
164
165                for (key, val) in headers {
166                    req = req.header(key, val);
167                }
168
169                let res = req.send()?;
170
171                Ok(res)
172            }
173            "DELETE" => {
174                let mut req = client.delete(self.config.endpoint.clone()).body(body);
175
176                for (key, val) in headers {
177                    req = req.header(key, val);
178                }
179
180                let res = req.send()?;
181
182                Ok(res)
183            }
184            _ => Err("invalid method".into()),
185        }
186    }
187}