Skip to main content

uni_plugin_host/
http_egress.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Blocking HTTP egress for capability-gated plugin host functions.
5//!
6//! Implements [`uni_plugin::HttpEgress`] with `reqwest::blocking`. The Rhai
7//! engine runs scripts synchronously inside DataFusion scalar/procedure
8//! execution, which is itself driven on Tokio worker threads — and
9//! `reqwest::blocking` panics if used from within a Tokio runtime context. So
10//! each request runs on a freshly-spawned OS thread (via [`std::thread::scope`])
11//! that carries no Tokio context; the calling thread blocks on its join. URL
12//! allow-listing is enforced by the caller against the plugin's granted
13//! [`uni_plugin::Capability::Network`]; this layer only honors the `timeout`
14//! and `max_bytes` it is handed.
15
16// Rust guideline compliant
17
18use std::io::Read as _;
19use std::time::Duration;
20
21use uni_plugin::{FnError, HttpEgress, HttpResponse};
22
/// Error code carried by the [`FnError`] returned when the `reqwest` client
/// itself cannot be built.
const ERR_CLIENT_BUILD: u32 = 0x0B00;
/// Error code carried by the [`FnError`] returned for any failure while
/// sending the request or reading the response (connection, timeout, body).
const ERR_TRANSPORT: u32 = 0x0B01;
/// Error code carried by the [`FnError`] returned when the dedicated request
/// worker thread panics instead of producing a result.
const ERR_WORKER_PANIC: u32 = 0x0B02;
29
/// [`HttpEgress`] implementation built on `reqwest::blocking` that can be
/// called from async contexts.
///
/// The type is a field-less unit struct, so every value is interchangeable.
#[derive(Clone, Debug, Default)]
pub struct BlockingHttpEgress;

impl BlockingHttpEgress {
    /// Create an egress service.
    ///
    /// The struct holds no state, so this simply yields the unit value.
    #[must_use]
    pub fn new() -> Self {
        BlockingHttpEgress
    }
}
41
42impl HttpEgress for BlockingHttpEgress {
43    fn get(
44        &self,
45        url: &str,
46        timeout: Duration,
47        max_bytes: usize,
48        traceparent: Option<&str>,
49    ) -> Result<HttpResponse, FnError> {
50        run_on_dedicated_thread(url, None, timeout, max_bytes, traceparent)
51    }
52
53    fn post(
54        &self,
55        url: &str,
56        body: &[u8],
57        timeout: Duration,
58        max_bytes: usize,
59        traceparent: Option<&str>,
60    ) -> Result<HttpResponse, FnError> {
61        run_on_dedicated_thread(url, Some(body), timeout, max_bytes, traceparent)
62    }
63}
64
65/// Run a blocking request on a dedicated OS thread (no inherited Tokio context),
66/// blocking the caller until it completes.
67fn run_on_dedicated_thread(
68    url: &str,
69    body: Option<&[u8]>,
70    timeout: Duration,
71    max_bytes: usize,
72    traceparent: Option<&str>,
73) -> Result<HttpResponse, FnError> {
74    std::thread::scope(|scope| {
75        let handle = scope.spawn(|| do_request(url, body, timeout, max_bytes, traceparent));
76        match handle.join() {
77            Ok(result) => result,
78            Err(_) => Err(FnError::new(
79                ERR_WORKER_PANIC,
80                "http request worker thread panicked",
81            )),
82        }
83    })
84}
85
86/// Perform one blocking request, reading at most `max_bytes` of the body.
87fn do_request(
88    url: &str,
89    body: Option<&[u8]>,
90    timeout: Duration,
91    max_bytes: usize,
92    traceparent: Option<&str>,
93) -> Result<HttpResponse, FnError> {
94    let client = reqwest::blocking::Client::builder()
95        .timeout(timeout)
96        // Do NOT auto-follow redirects: the URL allow-list is enforced by the
97        // caller against the *initial* URL only, so a 3xx `Location` to an
98        // internal/link-local host (e.g. 169.254.169.254) would bypass it
99        // (SSRF). Surface the redirect response to the caller instead. (review H14)
100        .redirect(reqwest::redirect::Policy::none())
101        .build()
102        .map_err(|e| FnError::new(ERR_CLIENT_BUILD, format!("http client build: {e}")))?;
103    let mut request = match body {
104        Some(b) => client.post(url).body(b.to_vec()),
105        None => client.get(url),
106    };
107    // Propagate the host's trace context across the plugin boundary when present.
108    if let Some(tp) = traceparent {
109        request = request.header("traceparent", tp);
110    }
111    let response = request
112        .send()
113        .map_err(|e| FnError::new(ERR_TRANSPORT, format!("http send `{url}`: {e}")))?;
114    let status = response.status().as_u16();
115    // Bound the read so a hostile/large response can't exhaust memory: read one
116    // byte past the limit only to know nothing is silently dropped, then
117    // truncate to the cap.
118    let mut buf = Vec::new();
119    let cap = (max_bytes as u64).saturating_add(1);
120    response
121        .take(cap)
122        .read_to_end(&mut buf)
123        .map_err(|e| FnError::new(ERR_TRANSPORT, format!("http body `{url}`: {e}")))?;
124    buf.truncate(max_bytes);
125    Ok(HttpResponse { status, body: buf })
126}
127
#[cfg(test)]
mod tests {
    use std::io::Write as _;
    use std::net::{SocketAddr, TcpListener};
    use std::thread::JoinHandle;

    use super::*;

    /// Start a loopback server that answers exactly one connection with
    /// `response` (a complete raw HTTP/1.1 message) and then exits.
    ///
    /// Returns the bound address and the server thread's join handle.
    fn serve_once(response: &'static str) -> (SocketAddr, JoinHandle<()>) {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        let server = std::thread::spawn(move || {
            if let Ok((mut stream, _)) = listener.accept() {
                // Drain the request line/headers enough to respond.
                let mut buf = [0u8; 1024];
                let _ = std::io::Read::read(&mut stream, &mut buf);
                let _ = stream.write_all(response.as_bytes());
            }
        });
        (addr, server)
    }

    #[test]
    fn build_and_default() {
        let _ = BlockingHttpEgress::new();
        let _ = BlockingHttpEgress;
    }

    /// H14: a 3xx response must NOT be auto-followed — the allow-list only
    /// covers the initial URL, so following a `Location` to an internal host
    /// would be SSRF. The client must return the redirect's status (302) rather
    /// than chasing the (here, unroutable link-local) target.
    #[test]
    fn redirect_is_not_followed() {
        // 302 to a link-local address that must never be fetched.
        let (addr, server) = serve_once(
            "HTTP/1.1 302 Found\r\n\
             Location: http://169.254.169.254/latest/meta-data\r\n\
             Content-Length: 0\r\n\r\n",
        );

        let egress = BlockingHttpEgress::new();
        let url = format!("http://{addr}/");
        // Short timeout: if redirects WERE followed, the connect to the
        // unroutable 169.254.169.254 would hang/error rather than return 302.
        let resp = egress
            .get(&url, Duration::from_millis(500), 1024, None)
            .expect("redirect response should be returned, not followed");
        assert_eq!(resp.status, 302, "redirect must be surfaced, not chased");
        let _ = server.join();
    }

    /// The body handed back must never exceed `max_bytes`, even when the server
    /// sends more: the surplus is dropped and the request still succeeds.
    #[test]
    fn body_is_capped_at_max_bytes() {
        let (addr, server) = serve_once(
            "HTTP/1.1 200 OK\r\n\
             Content-Length: 10\r\n\
             Connection: close\r\n\r\n\
             0123456789",
        );

        let egress = BlockingHttpEgress::new();
        let url = format!("http://{addr}/");
        let resp = egress
            .get(&url, Duration::from_secs(2), 4, None)
            .expect("an over-long body should be truncated, not rejected");
        assert_eq!(resp.status, 200);
        assert_eq!(resp.body, b"0123".to_vec(), "body must be cut to max_bytes");
        let _ = server.join();
    }

    #[test]
    fn dead_port_is_transport_error_not_panic() {
        // The URL is well-formed but nothing listens on port 1, so the connect
        // fails. That must surface as a transport `FnError` rather than a panic
        // or a worker-thread failure.
        let egress = BlockingHttpEgress::new();
        let err = egress
            .get(
                "http://127.0.0.1:1/",
                Duration::from_millis(200),
                1024,
                None,
            )
            .expect_err("connection to a dead port must fail");
        assert_eq!(err.code, ERR_TRANSPORT);
    }
}