Skip to main content

lab_ops_natmap/
utils.rs

1//! HTTP client helpers for communicating with the natmap daemon over its Unix socket.
2
3use std::path::Path;
4
5use color_eyre::Result;
6use http_body_util::BodyExt;
7use http_body_util::Empty;
8use http_body_util::Full;
9use hyper::Method;
10use hyper::Request;
11use hyper_util::rt::TokioIo;
12use tokio::net::UnixStream;
13
14/// Sends an HTTP request to the natmap daemon over its Unix socket and deserializes the JSON response.
15///
16/// Generic over `T` (response type, must implement `DeserializeOwned`) and `R`
17/// (request body type, must implement `Serialize`). Pass `None` for `body` on
18/// GET and DELETE requests.
19///
20/// # Errors
21///
22/// Returns an error if the daemon is unreachable, returns a non-success status
23/// code, or if JSON deserialization fails.
24pub async fn request_json<T: serde::de::DeserializeOwned, R: serde::Serialize>(
25    socket_path: impl AsRef<Path>,
26    method: Method,
27    path: &str,
28    body: Option<R>,
29) -> Result<T> {
30    let socket_path = socket_path.as_ref();
31    let stream = UnixStream::connect(socket_path).await.map_err(|e| {
32        color_eyre::eyre::eyre!(
33            "Failed to connect to natmap daemon at {}: {}\nIs the daemon running?",
34            socket_path.to_string_lossy(),
35            e
36        )
37    })?;
38    let io = TokioIo::new(stream);
39    let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
40
41    tokio::task::spawn(async move {
42        if let Err(err) = conn.await {
43            tracing::error!("Connection failed: {:?}", err);
44        }
45    });
46
47    let mut req_builder = Request::builder()
48        .uri(format!("http://localhost{path}"))
49        .method(method)
50        .header("Host", "localhost");
51
52    let req = if let Some(b) = body {
53        req_builder = req_builder.header("Content-Type", "application/json");
54        let json = serde_json::to_vec(&b)?;
55        req_builder.body(Full::new(hyper::body::Bytes::from(json)).boxed())?
56    } else {
57        req_builder.body(Empty::<hyper::body::Bytes>::new().boxed())?
58    };
59
60    let res = sender.send_request(req).await?;
61    let status = res.status();
62    let body_bytes = res.into_body().collect().await?.to_bytes();
63
64    if !status.is_success() {
65        let err_msg = String::from_utf8_lossy(&body_bytes);
66        color_eyre::eyre::bail!("Daemon returned error {}: {}", status, err_msg);
67    }
68
69    if body_bytes.is_empty() {
70        return Ok(serde_json::from_str("null")
71            .unwrap_or_else(|_| serde_json::from_value(serde_json::Value::Null).unwrap()));
72    }
73
74    let parsed: T = serde_json::from_slice(&body_bytes)?;
75    Ok(parsed)
76}