use std::io::Read as _;
use std::time::Duration;
use uni_plugin::{FnError, HttpEgress, HttpResponse};
/// Error code attached to the `FnError` returned when the `reqwest` blocking
/// client cannot be built.
const ERR_CLIENT_BUILD: u32 = 0xB00;
/// Error code attached to the `FnError` returned when sending the request or
/// reading the response body fails.
const ERR_TRANSPORT: u32 = 0xB01;
/// Error code attached to the `FnError` returned when the worker thread that
/// runs the request panics.
const ERR_WORKER_PANIC: u32 = 0xB02;
/// Field-less handle for performing blocking HTTP requests.
///
/// The type carries no state, so every instance is interchangeable and
/// cloning it is free.
#[derive(Clone, Debug, Default)]
pub struct BlockingHttpEgress;

impl BlockingHttpEgress {
    /// Creates a new egress handle.
    ///
    /// Produces the same value as the derived `Default` implementation.
    #[must_use]
    pub fn new() -> Self {
        BlockingHttpEgress
    }
}
/// Both verbs forward to `run_on_dedicated_thread`; the only difference
/// between them is whether a request body is supplied.
impl HttpEgress for BlockingHttpEgress {
    /// Issues a GET request to `url`.
    ///
    /// `timeout` is handed to the HTTP client, the returned body holds at most
    /// `max_bytes` bytes, and `traceparent`, when present, is sent as the
    /// `traceparent` request header.
    ///
    /// # Errors
    ///
    /// Returns an `FnError` when the client cannot be built, the request or
    /// body read fails, or the worker thread panics.
    fn get(
        &self,
        url: &str,
        timeout: Duration,
        max_bytes: usize,
        traceparent: Option<&str>,
    ) -> Result<HttpResponse, FnError> {
        // A missing body is what selects GET further down the call chain.
        let no_body: Option<&[u8]> = None;
        run_on_dedicated_thread(url, no_body, timeout, max_bytes, traceparent)
    }

    /// Issues a POST request to `url` with `body` as the request payload.
    ///
    /// `timeout`, `max_bytes` and `traceparent` behave exactly as for
    /// [`get`](HttpEgress::get).
    ///
    /// # Errors
    ///
    /// Returns an `FnError` when the client cannot be built, the request or
    /// body read fails, or the worker thread panics.
    fn post(
        &self,
        url: &str,
        body: &[u8],
        timeout: Duration,
        max_bytes: usize,
        traceparent: Option<&str>,
    ) -> Result<HttpResponse, FnError> {
        // A present body (even an empty one) is what selects POST.
        let payload: Option<&[u8]> = Some(body);
        run_on_dedicated_thread(url, payload, timeout, max_bytes, traceparent)
    }
}
/// Runs `do_request` on a freshly spawned scoped thread and waits for it.
///
/// The scoped thread lets the worker borrow `url`, `body` and `traceparent`
/// directly, without copying them into owned values.
///
/// NOTE(review): presumably the extra thread keeps reqwest's blocking client
/// off a caller thread that may be inside an async runtime — confirm.
///
/// # Errors
///
/// Forwards whatever error `do_request` produced. If the worker thread
/// panics, the panic is not propagated; an `FnError` with code
/// `ERR_WORKER_PANIC` is returned instead.
fn run_on_dedicated_thread(
    url: &str,
    body: Option<&[u8]>,
    timeout: Duration,
    max_bytes: usize,
    traceparent: Option<&str>,
) -> Result<HttpResponse, FnError> {
    std::thread::scope(|s| {
        let worker = s.spawn(|| do_request(url, body, timeout, max_bytes, traceparent));
        // `join` yields `Err` only when the worker panicked; the payload is
        // intentionally dropped and replaced by a fixed error.
        worker.join().unwrap_or_else(|_panic_payload| {
            Err(FnError::new(
                ERR_WORKER_PANIC,
                "http request worker thread panicked",
            ))
        })
    })
}
/// Performs a single blocking HTTP exchange and returns the status code
/// together with a size-capped copy of the response body.
///
/// * `body` — `Some` sends a POST with a copy of the bytes, `None` sends a GET.
/// * `timeout` — passed to the client builder's `timeout` setting.
/// * `max_bytes` — upper bound on the length of the returned body.
/// * `traceparent` — when present, sent as the `traceparent` request header.
///
/// The status code is returned as-is; this function does not inspect it.
///
/// # Errors
///
/// * `ERR_CLIENT_BUILD` — the client builder failed.
/// * `ERR_TRANSPORT` — sending the request or reading the body failed.
fn do_request(
    url: &str,
    body: Option<&[u8]>,
    timeout: Duration,
    max_bytes: usize,
    traceparent: Option<&str>,
) -> Result<HttpResponse, FnError> {
    // A new client is built for every call, configured with this call's timeout.
    let client = match reqwest::blocking::Client::builder().timeout(timeout).build() {
        Ok(built) => built,
        Err(e) => {
            return Err(FnError::new(
                ERR_CLIENT_BUILD,
                format!("http client build: {e}"),
            ));
        }
    };

    // Presence of a body decides the verb.
    let base = if let Some(bytes) = body {
        client.post(url).body(bytes.to_vec())
    } else {
        client.get(url)
    };
    let prepared = match traceparent {
        Some(tp) => base.header("traceparent", tp),
        None => base,
    };

    let response = match prepared.send() {
        Ok(resp) => resp,
        Err(e) => {
            return Err(FnError::new(
                ERR_TRANSPORT,
                format!("http send `{url}`: {e}"),
            ));
        }
    };
    let status = response.status().as_u16();

    // Read at most one byte beyond the limit, then cut the buffer back to the
    // limit, so an oversized body never occupies more than `max_bytes + 1`.
    let read_limit = (max_bytes as u64).saturating_add(1);
    let mut limited = response.take(read_limit);
    let mut collected = Vec::new();
    if let Err(e) = limited.read_to_end(&mut collected) {
        return Err(FnError::new(
            ERR_TRANSPORT,
            format!("http body `{url}`: {e}"),
        ));
    }
    collected.truncate(max_bytes);

    Ok(HttpResponse {
        status,
        body: collected,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Both ways of obtaining an egress value must compile and run.
    #[test]
    fn build_and_default() {
        let _ = BlockingHttpEgress::new();
        let _ = BlockingHttpEgress;
    }

    /// A string that cannot be parsed as a URL must surface as a transport
    /// error from `get`, not as a panic. No network access is needed: the
    /// failure is reported when the request is sent.
    #[test]
    fn invalid_url_is_transport_error_not_panic() {
        let egress = BlockingHttpEgress::new();
        let err = egress
            .get("not a url", Duration::from_millis(200), 1024, None)
            .expect_err("a malformed URL must fail");
        assert_eq!(err.code, ERR_TRANSPORT);
    }

    /// A well-formed URL whose port has no listener must also surface as a
    /// transport error rather than a panic.
    #[test]
    fn refused_connection_is_transport_error_not_panic() {
        let egress = BlockingHttpEgress::new();
        let err = egress
            .get(
                "http://127.0.0.1:1/",
                Duration::from_millis(200),
                1024,
                None,
            )
            .expect_err("connection to a dead port must fail");
        assert_eq!(err.code, ERR_TRANSPORT);
    }
}