Skip to main content

ordinary_integration/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(clippy::all, clippy::pedantic)]
3#![allow(clippy::missing_errors_doc)]
4
5// Copyright (C) 2026 Ordinary Labs, LLC.
6//
7// SPDX-License-Identifier: AGPL-3.0-only
8
9use anyhow::anyhow;
10use bytes::Bytes;
11use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
12use ordinary_storage::Storage;
13use ordinary_types::{
14    Kind, flexbuffer_reader_to_json, flexbuffer_reader_to_path_component,
15    flexbuffer_reader_to_string, json_to_flexbuffer_vec,
16};
17use std::{collections::BTreeMap, sync::Arc};
18use tracing::instrument;
19use ureq::Body;
20// for gRPC: https://docs.rs/protobuf/latest/protobuf/reflect/
21
22#[derive(Clone)]
23pub struct Integration {
24    pub config: IntegrationConfig,
25}
26
27impl Integration {
28    #[instrument(skip(config, storage, secrets), fields(i, nm), err)]
29    pub fn new(
30        config: IntegrationConfig,
31        storage: Arc<Storage>,
32        secrets: &BTreeMap<String, &Secret>,
33    ) -> anyhow::Result<Integration> {
34        tracing::Span::current().record("i", config.idx);
35        tracing::Span::current().record("nm", tracing::field::display(&config.name));
36
37        let mut usable_config = config.clone();
38
39        // todo: make it possible to update secrets without a restart
40        if let Some(secret_names) = &config.secrets {
41            for secret_name in secret_names {
42                if let Some(secret) = secrets.get(secret_name) {
43                    match secret.visibility {
44                        ordinary_config::SecretVisibility::Integrations => {
45                            let val = match &secret.source {
46                                ordinary_config::SecretSource::Env => std::env::var(&secret.name)?,
47                                ordinary_config::SecretSource::Stored => std::str::from_utf8(
48                                    storage.secrets.get(&secret.name)?.as_ref(),
49                                )?
50                                .to_string(),
51                            };
52
53                            // ?? probably the silliest way to go about this
54                            let mut str_config = serde_json::to_string(&config)?;
55                            str_config =
56                                str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
57                            usable_config = serde_json::from_str(&str_config)?;
58                        }
59                    }
60                }
61            }
62        }
63
64        Ok(Integration {
65            config: usable_config,
66        })
67    }
68
69    #[instrument(skip(self, payload), fields(i, nm), err)]
70    pub fn invoke(&self, payload: &[u8]) -> anyhow::Result<Bytes> {
71        tracing::Span::current().record("i", self.config.idx);
72        tracing::Span::current().record("nm", tracing::field::display(&self.config.name));
73
74        match &self.config.protocol {
75            IntegrationProtocol::Http {
76                method,
77                headers,
78                send_encoding,
79                recv_encoding,
80            } => {
81                let root = flexbuffers::Reader::get_root(payload)?;
82
83                let body: Bytes = if method == "GET" {
84                    Bytes::new()
85                } else {
86                    match send_encoding {
87                        ordinary_config::IntegrationProtocolHttpEncoding::None => {
88                            Bytes::copy_from_slice(payload)
89                        }
90                        ordinary_config::IntegrationProtocolHttpEncoding::Json => {
91                            flexbuffer_reader_to_json(&self.config.send, &root.as_vector().idx(0))?
92                                .to_string()
93                                .into()
94                        }
95                        ordinary_config::IntegrationProtocolHttpEncoding::Text => {
96                            flexbuffer_reader_to_string(
97                                &self.config.send,
98                                &root.as_vector().idx(0),
99                            )?
100                            .into()
101                        }
102                    }
103                };
104
105                let mut endpoint = self.config.endpoint.clone();
106
107                if endpoint.contains("{send}") {
108                    endpoint = endpoint.replace(
109                        "{send}",
110                        &flexbuffer_reader_to_path_component(
111                            &self.config.send,
112                            &root.as_vector().idx(0),
113                        )?,
114                    );
115                }
116
117                if endpoint.contains("{send.")
118                    && let Kind::Object { name: _, fields } = &self.config.send
119                {
120                    let send_vec = root.as_vector().idx(0).as_vector();
121
122                    for field in fields {
123                        endpoint = endpoint.replace(
124                            &format!("{{send.{}}}", field.name),
125                            &flexbuffer_reader_to_path_component(
126                                &field.kind,
127                                &send_vec.idx(field.idx as usize),
128                            )?,
129                        );
130                    }
131                }
132
133                let mut res = Self::http(&endpoint, method, headers, body)?;
134
135                let mut builder =
136                    flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
137                let mut builder_vec = builder.start_vector();
138
139                match recv_encoding {
140                    ordinary_config::IntegrationProtocolHttpEncoding::None => {
141                        builder_vec.push(flexbuffers::Blob(res.body_mut().read_to_vec()?.as_ref()));
142                    }
143                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
144                        json_to_flexbuffer_vec(
145                            &self.config.recv,
146                            &res.body_mut().read_json::<serde_json::Value>()?,
147                            &mut builder_vec,
148                        )?;
149                    }
150                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
151                        builder_vec.push(res.body_mut().read_to_string()?.as_str());
152                    }
153                }
154
155                builder_vec.end_vector();
156
157                Ok(Bytes::copy_from_slice(builder.view()))
158            } // IntegrationProtocol::Grpc { metadata: _ } => Ok(Bytes::new()),
159              // IntegrationProtocol::CapnProto => Ok(Bytes::new()),
160              // IntegrationProtocol::GraphQL => Ok(Bytes::new()),
161              // IntegrationProtocol::Postgres { statement: _ } => Ok(Bytes::new()),
162              // IntegrationProtocol::OpenSearch => Ok(Bytes::new()),
163              // IntegrationProtocol::Smtp => Ok(Bytes::new()),
164        }
165    }
166
167    #[instrument(skip(body, headers), err)]
168    fn http(
169        endpoint: &String,
170        method: &String,
171        headers: &Vec<(String, String)>,
172        body: Bytes,
173    ) -> anyhow::Result<http::Response<Body>> {
174        // todo: switch back to reqwest when actions are async
175
176        match method.as_str() {
177            "GET" => {
178                let mut req = ureq::get(endpoint);
179
180                for (key, val) in headers {
181                    req = req.header(key, val);
182                }
183
184                let res = req.call()?;
185
186                Ok(res)
187            }
188            "POST" => {
189                let mut req = ureq::post(endpoint);
190
191                for (key, val) in headers {
192                    req = req.header(key, val);
193                }
194
195                let res = req.send(body.as_ref())?;
196
197                Ok(res)
198            }
199            "PUT" => {
200                let mut req = ureq::put(endpoint);
201
202                for (key, val) in headers {
203                    req = req.header(key, val);
204                }
205
206                let res = req.send(body.as_ref())?;
207
208                Ok(res)
209            }
210            "PATCH" => {
211                let mut req = ureq::patch(endpoint);
212
213                for (key, val) in headers {
214                    req = req.header(key, val);
215                }
216
217                let res = req.send(body.as_ref())?;
218
219                Ok(res)
220            }
221            "DELETE" => {
222                let mut req = ureq::delete(endpoint);
223
224                for (key, val) in headers {
225                    req = req.header(key, val);
226                }
227
228                let res = req.call()?;
229
230                Ok(res)
231            }
232            _ => Err(anyhow!("invalid method")),
233        }
234    }
235}