Skip to main content

ordinary_integration/
lib.rs

1#![doc = include_str!("../README.md")]
2#![warn(clippy::all, clippy::pedantic)]
3#![allow(clippy::missing_errors_doc)]
4
5// Copyright (C) 2026 Ordinary Labs, LLC.
6//
7// SPDX-License-Identifier: AGPL-3.0-only
8
9use anyhow::bail;
10use bytes::Bytes;
11use ordinary_config::{IntegrationConfig, IntegrationProtocol, Secret};
12use ordinary_storage::Storage;
13use ordinary_types::{
14    Kind, flexbuffer_reader_to_json, flexbuffer_reader_to_path_component,
15    flexbuffer_reader_to_string, json_to_flexbuffer_vec,
16};
17use ordinary_utils::headers::{log_request, log_response};
18use ordinary_utils::{SERVER, WrappedRedactedHashingAlg};
19use reqwest::redirect::Policy;
20use std::time::Instant;
21use std::{collections::BTreeMap, sync::Arc};
22use tokio::runtime::Handle;
23use tracing::{Instrument, Span, instrument};
24// for gRPC: https://docs.rs/protobuf/latest/protobuf/reflect/
25
26#[derive(Clone)]
27pub struct Integration {
28    client: reqwest::Client,
29    sender: crossbeam_channel::Sender<(Bytes, oneshot::Sender<Bytes>, Span)>,
30
31    pub config: IntegrationConfig,
32
33    log_headers: bool,
34    redacted_hash: Arc<Option<WrappedRedactedHashingAlg>>,
35}
36
37impl Integration {
38    #[instrument(skip_all, fields(i, nm), err)]
39    pub fn new(
40        config: IntegrationConfig,
41        storage: Arc<Storage>,
42        secrets: &BTreeMap<String, &Secret>,
43        log_headers: bool,
44        redacted_hash: Arc<Option<WrappedRedactedHashingAlg>>,
45    ) -> anyhow::Result<Integration> {
46        Span::current().record("i", config.idx);
47        Span::current().record("nm", tracing::field::display(&config.name));
48
49        let mut usable_config = config.clone();
50
51        // todo: make it possible to update secrets without a restart
52        if let Some(secret_names) = &config.secrets {
53            for secret_name in secret_names {
54                if let Some(secret) = secrets.get(secret_name) {
55                    match secret.visibility {
56                        ordinary_config::SecretVisibility::Integrations => {
57                            let val = match &secret.source {
58                                ordinary_config::SecretSource::Env => std::env::var(&secret.name)?,
59                                ordinary_config::SecretSource::Stored => std::str::from_utf8(
60                                    storage.secrets.get(&secret.name)?.as_ref(),
61                                )?
62                                .to_string(),
63                            };
64
65                            // ?? probably the silliest way to go about this
66                            let mut str_config = serde_json::to_string(&config)?;
67                            str_config =
68                                str_config.replace(&format!("{{{{{}}}}}", secret.name), &val);
69                            usable_config = serde_json::from_str(&str_config)?;
70                        }
71                    }
72                }
73            }
74        }
75
76        let rt = Handle::try_current()?;
77
78        let client = reqwest::Client::builder()
79            .use_rustls_tls()
80            .brotli(true)
81            .zstd(true)
82            .deflate(true)
83            .gzip(true)
84            .redirect(Policy::none())
85            .user_agent(SERVER)
86            .build()?;
87
88        let (sender, receiver) = crossbeam_channel::bounded(128_000);
89
90        let integration = Integration {
91            client,
92            sender,
93
94            log_headers,
95            redacted_hash,
96
97            config: usable_config,
98        };
99
100        let integration_clone = Arc::new(integration.clone());
101
102        std::thread::spawn(move || {
103            while let Ok((payload, tx, span)) = receiver.recv() {
104                {
105                    let integration = integration_clone.clone();
106
107                    rt.spawn(async move {
108                        async {
109                            match integration.invoke(payload.as_ref()).await {
110                                Ok(res) => {
111                                    if let Err(err) = tx.send(res) {
112                                        tracing::error!(%err);
113                                    }
114                                }
115                                Err(err) => {
116                                    tracing::error!(%err);
117                                }
118                            }
119                        }
120                        .instrument(span)
121                        .await;
122                    });
123                }
124            }
125        });
126
127        Ok(integration)
128    }
129
130    pub fn invoke_blocking(&self, payload: &[u8], span: Span) -> anyhow::Result<Bytes> {
131        let (tx, rx) = oneshot::channel();
132
133        self.sender
134            .send((Bytes::copy_from_slice(payload), tx, span))?;
135
136        let res = rx.recv()?;
137        Ok(res)
138    }
139
140    #[instrument(name = "int", skip_all, fields(i, nm), err)]
141    pub async fn invoke(&self, payload: &[u8]) -> anyhow::Result<Bytes> {
142        Span::current().record("i", self.config.idx);
143        Span::current().record("nm", tracing::field::display(&self.config.name));
144
145        match &self.config.protocol {
146            IntegrationProtocol::Http {
147                method,
148                headers,
149                send_encoding,
150                recv_encoding,
151            } => {
152                let root = flexbuffers::Reader::get_root(payload)?;
153
154                let body: Bytes = if method == "GET" {
155                    Bytes::new()
156                } else {
157                    match send_encoding {
158                        ordinary_config::IntegrationProtocolHttpEncoding::None => {
159                            Bytes::copy_from_slice(payload)
160                        }
161                        ordinary_config::IntegrationProtocolHttpEncoding::Json => {
162                            flexbuffer_reader_to_json(&self.config.send, &root.as_vector().idx(0))?
163                                .to_string()
164                                .into()
165                        }
166                        ordinary_config::IntegrationProtocolHttpEncoding::Text => {
167                            flexbuffer_reader_to_string(
168                                &self.config.send,
169                                &root.as_vector().idx(0),
170                            )?
171                            .into()
172                        }
173                    }
174                };
175
176                let mut endpoint = self.config.endpoint.clone();
177
178                if endpoint.contains("{send}") {
179                    endpoint = endpoint.replace(
180                        "{send}",
181                        &flexbuffer_reader_to_path_component(
182                            &self.config.send,
183                            &root.as_vector().idx(0),
184                        )?,
185                    );
186                }
187
188                if endpoint.contains("{send.")
189                    && let Kind::Object { name: _, fields } = &self.config.send
190                {
191                    let send_vec = root.as_vector().idx(0).as_vector();
192
193                    for field in fields {
194                        endpoint = endpoint.replace(
195                            &format!("{{send.{}}}", field.name),
196                            &flexbuffer_reader_to_path_component(
197                                &field.kind,
198                                &send_vec.idx(field.idx as usize),
199                            )?,
200                        );
201                    }
202                }
203
204                let res = self.http(&endpoint, method, headers, body).await?;
205
206                let mut builder =
207                    flexbuffers::Builder::new(&flexbuffers::BuilderOptions::SHARE_NONE);
208                let mut builder_vec = builder.start_vector();
209
210                match recv_encoding {
211                    ordinary_config::IntegrationProtocolHttpEncoding::None => {
212                        builder_vec.push(flexbuffers::Blob(res.bytes().await?.as_ref()));
213                    }
214                    ordinary_config::IntegrationProtocolHttpEncoding::Json => {
215                        json_to_flexbuffer_vec(
216                            &self.config.recv,
217                            &res.json().await?,
218                            &mut builder_vec,
219                        )?;
220                    }
221                    ordinary_config::IntegrationProtocolHttpEncoding::Text => {
222                        builder_vec.push(res.text().await?.as_str());
223                    }
224                }
225
226                builder_vec.end_vector();
227
228                Ok(Bytes::copy_from_slice(builder.view()))
229            } // IntegrationProtocol::Grpc { metadata: _ } => Ok(Bytes::new()),
230              // IntegrationProtocol::CapnProto => Ok(Bytes::new()),
231              // IntegrationProtocol::GraphQL => Ok(Bytes::new()),
232              // IntegrationProtocol::Postgres { statement: _ } => Ok(Bytes::new()),
233              // IntegrationProtocol::OpenSearch => Ok(Bytes::new()),
234              // IntegrationProtocol::Smtp => Ok(Bytes::new()),
235        }
236    }
237
238    #[instrument(skip_all, fields(host, port, path, query), err)]
239    async fn http(
240        &self,
241        endpoint: &str,
242        method: &str,
243        headers: &Vec<(String, String)>,
244        body: Bytes,
245    ) -> anyhow::Result<reqwest::Response> {
246        let start = Instant::now();
247
248        let mut req = match method {
249            "GET" => self.client.get(endpoint),
250            "POST" => {
251                let req = self.client.post(endpoint);
252                req.body(body)
253            }
254            "PUT" => {
255                let req = self.client.put(endpoint);
256                req.body(body)
257            }
258            "PATCH" => {
259                let req = self.client.patch(endpoint);
260                req.body(body)
261            }
262            "DELETE" => self.client.delete(endpoint),
263            _ => bail!("invalid method"),
264        };
265
266        for (key, val) in headers {
267            req = req.header(key.clone(), val);
268        }
269
270        let req = req.build()?;
271        let url = req.url();
272
273        Span::current().record("host", url.host().map(tracing::field::display));
274        Span::current().record("port", url.port().map(tracing::field::display));
275        Span::current().record("path", tracing::field::display(url.path()));
276        Span::current().record("query", url.query().map(tracing::field::display));
277
278        log_request(
279            self.log_headers,
280            req.headers(),
281            &self.redacted_hash,
282            req.method(),
283        );
284
285        let res = self.client.execute(req).await?;
286
287        log_response(
288            res.status().as_u16(),
289            self.log_headers,
290            &self.redacted_hash,
291            start,
292            res.headers(),
293            res.version(),
294        );
295
296        Ok(res)
297    }
298}