uni_plugin_host/
http_egress.rs1use std::io::Read as _;
19use std::time::Duration;
20
21use uni_plugin::{FnError, HttpEgress, HttpResponse};
22
/// Error code reported when the `reqwest` blocking client cannot be built
/// (see `do_request`).
const ERR_CLIENT_BUILD: u32 = 0xB00;
/// Error code reported when sending the request or reading the response body
/// fails (see `do_request`).
const ERR_TRANSPORT: u32 = 0xB01;
/// Error code reported when the worker thread running the request panics
/// (see `run_on_dedicated_thread`).
const ERR_WORKER_PANIC: u32 = 0xB02;
29
/// Stateless [`HttpEgress`] implementation that issues requests through
/// `reqwest`'s blocking client.
///
/// The type carries no fields, so every instance is interchangeable.
#[derive(Clone, Debug, Default)]
pub struct BlockingHttpEgress;
33
34impl BlockingHttpEgress {
35 #[must_use]
37 pub fn new() -> Self {
38 Self
39 }
40}
41
42impl HttpEgress for BlockingHttpEgress {
43 fn get(
44 &self,
45 url: &str,
46 timeout: Duration,
47 max_bytes: usize,
48 traceparent: Option<&str>,
49 ) -> Result<HttpResponse, FnError> {
50 run_on_dedicated_thread(url, None, timeout, max_bytes, traceparent)
51 }
52
53 fn post(
54 &self,
55 url: &str,
56 body: &[u8],
57 timeout: Duration,
58 max_bytes: usize,
59 traceparent: Option<&str>,
60 ) -> Result<HttpResponse, FnError> {
61 run_on_dedicated_thread(url, Some(body), timeout, max_bytes, traceparent)
62 }
63}
64
65fn run_on_dedicated_thread(
68 url: &str,
69 body: Option<&[u8]>,
70 timeout: Duration,
71 max_bytes: usize,
72 traceparent: Option<&str>,
73) -> Result<HttpResponse, FnError> {
74 std::thread::scope(|scope| {
75 let handle = scope.spawn(|| do_request(url, body, timeout, max_bytes, traceparent));
76 match handle.join() {
77 Ok(result) => result,
78 Err(_) => Err(FnError::new(
79 ERR_WORKER_PANIC,
80 "http request worker thread panicked",
81 )),
82 }
83 })
84}
85
86fn do_request(
88 url: &str,
89 body: Option<&[u8]>,
90 timeout: Duration,
91 max_bytes: usize,
92 traceparent: Option<&str>,
93) -> Result<HttpResponse, FnError> {
94 let client = reqwest::blocking::Client::builder()
95 .timeout(timeout)
96 .build()
97 .map_err(|e| FnError::new(ERR_CLIENT_BUILD, format!("http client build: {e}")))?;
98 let mut request = match body {
99 Some(b) => client.post(url).body(b.to_vec()),
100 None => client.get(url),
101 };
102 if let Some(tp) = traceparent {
104 request = request.header("traceparent", tp);
105 }
106 let response = request
107 .send()
108 .map_err(|e| FnError::new(ERR_TRANSPORT, format!("http send `{url}`: {e}")))?;
109 let status = response.status().as_u16();
110 let mut buf = Vec::new();
114 let cap = (max_bytes as u64).saturating_add(1);
115 response
116 .take(cap)
117 .read_to_end(&mut buf)
118 .map_err(|e| FnError::new(ERR_TRANSPORT, format!("http body `{url}`: {e}")))?;
119 buf.truncate(max_bytes);
120 Ok(HttpResponse { status, body: buf })
121}
122
#[cfg(test)]
mod tests {
    use super::*;

    /// The egress can be obtained both through `new` and as the bare unit value.
    #[test]
    fn build_and_default() {
        let _: BlockingHttpEgress = BlockingHttpEgress::new();
        let _: BlockingHttpEgress = BlockingHttpEgress;
    }

    /// A request to a local port expected to have no listener must come back
    /// as an `ERR_TRANSPORT` error rather than a panic.
    ///
    /// Despite the test name, the URL itself is well-formed; it is the
    /// connection that is expected to fail.
    #[test]
    fn invalid_url_is_transport_error_not_panic() {
        let timeout = Duration::from_millis(200);
        let outcome = BlockingHttpEgress::new().get("http://127.0.0.1:1/", timeout, 1024, None);
        let err = outcome.expect_err("connection to a dead port must fail");
        assert_eq!(err.code, ERR_TRANSPORT);
    }
}