Skip to main content

uni_plugin_host/
http_egress.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Blocking HTTP egress for capability-gated plugin host functions.
5//!
6//! Implements [`uni_plugin::HttpEgress`] with `reqwest::blocking`. The Rhai
7//! engine runs scripts synchronously inside DataFusion scalar/procedure
8//! execution, which is itself driven on Tokio worker threads — and
9//! `reqwest::blocking` panics if used from within a Tokio runtime context. So
10//! each request runs on a freshly-spawned OS thread (via [`std::thread::scope`])
11//! that carries no Tokio context; the calling thread blocks on its join. URL
12//! allow-listing is enforced by the caller against the plugin's granted
13//! [`uni_plugin::Capability::Network`]; this layer only honors the `timeout`
14//! and `max_bytes` it is handed.
15
16// Rust guideline compliant
17
18use std::io::Read as _;
19use std::time::Duration;
20
21use uni_plugin::{FnError, HttpEgress, HttpResponse};
22
/// [`FnError`] code reported when the `reqwest` client cannot be built.
const ERR_CLIENT_BUILD: u32 = 0x0B00;
/// [`FnError`] code reported for transport-level failures: connecting,
/// sending, timing out, or reading the response body.
const ERR_TRANSPORT: u32 = 0x0B01;
/// [`FnError`] code reported when the request worker thread panics.
const ERR_WORKER_PANIC: u32 = 0x0B02;
29
/// [`HttpEgress`] implementation backed by `reqwest::blocking` that is safe
/// to call from async contexts.
///
/// This is a unit struct: it holds no fields and therefore no state.
#[derive(Clone, Debug, Default)]
pub struct BlockingHttpEgress;

impl BlockingHttpEgress {
    /// Create an egress service.
    #[must_use]
    pub fn new() -> Self {
        // A unit struct has nothing to initialise.
        Self
    }
}
41
42impl HttpEgress for BlockingHttpEgress {
43    fn get(
44        &self,
45        url: &str,
46        timeout: Duration,
47        max_bytes: usize,
48        traceparent: Option<&str>,
49    ) -> Result<HttpResponse, FnError> {
50        run_on_dedicated_thread(url, None, timeout, max_bytes, traceparent)
51    }
52
53    fn post(
54        &self,
55        url: &str,
56        body: &[u8],
57        timeout: Duration,
58        max_bytes: usize,
59        traceparent: Option<&str>,
60    ) -> Result<HttpResponse, FnError> {
61        run_on_dedicated_thread(url, Some(body), timeout, max_bytes, traceparent)
62    }
63}
64
65/// Run a blocking request on a dedicated OS thread (no inherited Tokio context),
66/// blocking the caller until it completes.
67fn run_on_dedicated_thread(
68    url: &str,
69    body: Option<&[u8]>,
70    timeout: Duration,
71    max_bytes: usize,
72    traceparent: Option<&str>,
73) -> Result<HttpResponse, FnError> {
74    std::thread::scope(|scope| {
75        let handle = scope.spawn(|| do_request(url, body, timeout, max_bytes, traceparent));
76        match handle.join() {
77            Ok(result) => result,
78            Err(_) => Err(FnError::new(
79                ERR_WORKER_PANIC,
80                "http request worker thread panicked",
81            )),
82        }
83    })
84}
85
86/// Perform one blocking request, reading at most `max_bytes` of the body.
87fn do_request(
88    url: &str,
89    body: Option<&[u8]>,
90    timeout: Duration,
91    max_bytes: usize,
92    traceparent: Option<&str>,
93) -> Result<HttpResponse, FnError> {
94    let client = reqwest::blocking::Client::builder()
95        .timeout(timeout)
96        .build()
97        .map_err(|e| FnError::new(ERR_CLIENT_BUILD, format!("http client build: {e}")))?;
98    let mut request = match body {
99        Some(b) => client.post(url).body(b.to_vec()),
100        None => client.get(url),
101    };
102    // Propagate the host's trace context across the plugin boundary when present.
103    if let Some(tp) = traceparent {
104        request = request.header("traceparent", tp);
105    }
106    let response = request
107        .send()
108        .map_err(|e| FnError::new(ERR_TRANSPORT, format!("http send `{url}`: {e}")))?;
109    let status = response.status().as_u16();
110    // Bound the read so a hostile/large response can't exhaust memory: read one
111    // byte past the limit only to know nothing is silently dropped, then
112    // truncate to the cap.
113    let mut buf = Vec::new();
114    let cap = (max_bytes as u64).saturating_add(1);
115    response
116        .take(cap)
117        .read_to_end(&mut buf)
118        .map_err(|e| FnError::new(ERR_TRANSPORT, format!("http body `{url}`: {e}")))?;
119    buf.truncate(max_bytes);
120    Ok(HttpResponse { status, body: buf })
121}
122
#[cfg(test)]
mod tests {
    use super::*;

    /// Both construction paths — `new()` and the derived `Default` — work.
    #[test]
    fn build_and_default() {
        let _ = BlockingHttpEgress::new();
        // Go through the `Default` impl explicitly: the bare unit-struct
        // literal never calls it. The fully qualified form is used because
        // clippy's `default_constructed_unit_structs` lint flags the shorter
        // `BlockingHttpEgress::default()` spelling.
        let _ = <BlockingHttpEgress as Default>::default();
    }

    /// A failed connection surfaces as `ERR_TRANSPORT` instead of a panic.
    #[test]
    fn invalid_url_is_transport_error_not_panic() {
        // The URL is well-formed; it targets loopback port 1, where no
        // listener is expected, so the request must fail at the transport
        // level within the short timeout and come back as an `FnError`.
        //
        // NOTE(review): this is a plain `#[test]`, so no Tokio runtime is
        // entered on this thread. The "no panic inside a Tokio context"
        // property is therefore not exercised here; covering it would need
        // the call to be made from inside a runtime.
        let egress = BlockingHttpEgress::new();
        let err = egress
            .get(
                "http://127.0.0.1:1/",
                Duration::from_millis(200),
                1024,
                None,
            )
            .expect_err("connection to a dead port must fail");
        assert_eq!(err.code, ERR_TRANSPORT);
    }
}