uni_plugin_host/
http_egress.rs1use std::io::Read as _;
19use std::time::Duration;
20
21use uni_plugin::{FnError, HttpEgress, HttpResponse};
22
/// Error code attached to the [`FnError`] returned when the blocking HTTP
/// client cannot be constructed.
const ERR_CLIENT_BUILD: u32 = 0xB00;

/// Error code attached to the [`FnError`] returned when sending the request
/// or reading the response body fails.
const ERR_TRANSPORT: u32 = 0xB01;

/// Error code attached to the [`FnError`] returned when the thread running
/// the request panics instead of producing a result.
const ERR_WORKER_PANIC: u32 = 0xB02;
29
/// [`HttpEgress`] implementation that carries no state of its own.
///
/// Every request made through it is executed synchronously on a thread
/// spawned for that single call, so values of this type are trivially
/// cloneable and can be created with [`BlockingHttpEgress::new`] or
/// [`Default`].
#[derive(Clone, Debug, Default)]
pub struct BlockingHttpEgress;

impl BlockingHttpEgress {
    /// Creates a new egress handle.
    #[must_use]
    pub fn new() -> Self {
        BlockingHttpEgress
    }
}
41
42impl HttpEgress for BlockingHttpEgress {
43 fn get(
44 &self,
45 url: &str,
46 timeout: Duration,
47 max_bytes: usize,
48 traceparent: Option<&str>,
49 ) -> Result<HttpResponse, FnError> {
50 run_on_dedicated_thread(url, None, timeout, max_bytes, traceparent)
51 }
52
53 fn post(
54 &self,
55 url: &str,
56 body: &[u8],
57 timeout: Duration,
58 max_bytes: usize,
59 traceparent: Option<&str>,
60 ) -> Result<HttpResponse, FnError> {
61 run_on_dedicated_thread(url, Some(body), timeout, max_bytes, traceparent)
62 }
63}
64
65fn run_on_dedicated_thread(
68 url: &str,
69 body: Option<&[u8]>,
70 timeout: Duration,
71 max_bytes: usize,
72 traceparent: Option<&str>,
73) -> Result<HttpResponse, FnError> {
74 std::thread::scope(|scope| {
75 let handle = scope.spawn(|| do_request(url, body, timeout, max_bytes, traceparent));
76 match handle.join() {
77 Ok(result) => result,
78 Err(_) => Err(FnError::new(
79 ERR_WORKER_PANIC,
80 "http request worker thread panicked",
81 )),
82 }
83 })
84}
85
86fn do_request(
88 url: &str,
89 body: Option<&[u8]>,
90 timeout: Duration,
91 max_bytes: usize,
92 traceparent: Option<&str>,
93) -> Result<HttpResponse, FnError> {
94 let client = reqwest::blocking::Client::builder()
95 .timeout(timeout)
96 .redirect(reqwest::redirect::Policy::none())
101 .build()
102 .map_err(|e| FnError::new(ERR_CLIENT_BUILD, format!("http client build: {e}")))?;
103 let mut request = match body {
104 Some(b) => client.post(url).body(b.to_vec()),
105 None => client.get(url),
106 };
107 if let Some(tp) = traceparent {
109 request = request.header("traceparent", tp);
110 }
111 let response = request
112 .send()
113 .map_err(|e| FnError::new(ERR_TRANSPORT, format!("http send `{url}`: {e}")))?;
114 let status = response.status().as_u16();
115 let mut buf = Vec::new();
119 let cap = (max_bytes as u64).saturating_add(1);
120 response
121 .take(cap)
122 .read_to_end(&mut buf)
123 .map_err(|e| FnError::new(ERR_TRANSPORT, format!("http body `{url}`: {e}")))?;
124 buf.truncate(max_bytes);
125 Ok(HttpResponse { status, body: buf })
126}
127
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing the egress through `new` and as a bare unit value both work.
    #[test]
    fn build_and_default() {
        let _ = BlockingHttpEgress::new();
        let _ = BlockingHttpEgress;
    }

    /// A 302 answer must be handed back to the caller instead of being chased.
    ///
    /// The canned response redirects to a link-local address (presumably
    /// standing in for a cloud metadata endpoint); the test only checks that
    /// the 302 itself is what `get` returns.
    #[test]
    fn redirect_is_not_followed() {
        use std::io::Write as _;
        use std::net::TcpListener;

        // Port 0 lets the OS pick a free port for the one-shot server.
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();

        let server = std::thread::spawn(move || {
            if let Ok((mut stream, _peer)) = listener.accept() {
                // Read the incoming request once before answering; the
                // result is deliberately ignored.
                let mut scratch = [0u8; 1024];
                let _ = std::io::Read::read(&mut stream, &mut scratch);
                let reply = concat!(
                    "HTTP/1.1 302 Found\r\n",
                    "Location: http://169.254.169.254/latest/meta-data\r\n",
                    "Content-Length: 0\r\n\r\n",
                );
                let _ = stream.write_all(reply.as_bytes());
            }
        });

        let target = format!("http://{addr}/");
        let outcome = BlockingHttpEgress::new().get(
            &target,
            Duration::from_millis(500),
            1024,
            None,
        );
        let resp = outcome.expect("redirect response should be returned, not followed");
        assert_eq!(resp.status, 302, "redirect must be surfaced, not chased");

        let _ = server.join();
    }

    /// A failed connection must come back as `ERR_TRANSPORT`, not as a panic.
    #[test]
    fn invalid_url_is_transport_error_not_panic() {
        let wait = Duration::from_millis(200);
        let outcome = BlockingHttpEgress::new().get("http://127.0.0.1:1/", wait, 1024, None);
        let err = outcome.expect_err("connection to a dead port must fail");
        assert_eq!(err.code, ERR_TRANSPORT);
    }
}