todc_net/
lib.rs

1//! Algorithms for message-passing (HTTP) distributed systems.
2use bytes::Bytes;
3use http_body_util::combinators::BoxBody;
4use http_body_util::{BodyExt, Full};
5use hyper::body::Incoming;
6use hyper::http::StatusCode;
7use hyper::{Method, Request, Response, Uri};
8use serde_json::{json, Value as JSON};
9
10use crate::net::TcpStream;
11
12pub(crate) mod net;
13pub mod register;
14
15// NOTE: This module adds a local copy of some helper types that for integrating
16// tokio with Hyper 1.0. Hopefully, once Hyper 1.0 is released, there will be
17// a more standard way to integrate and this module can be deleted.
18// See: https://github.com/hyperium/hyper/issues/3110
19mod hyper_util_tokio_io;
20use hyper_util_tokio_io::TokioIo;
21
22type GenericError = Box<dyn std::error::Error + Send + Sync>;
23type ResponseResult = Result<Response<Incoming>, GenericError>;
24
25/// Submits a GET request to the URL.
26pub(crate) async fn get(url: Uri) -> ResponseResult {
27    make_request(url, Method::GET, json!(null)).await
28}
29
30/// Submits a POST request, along with a JSON body, to the URL.
31pub(crate) async fn post(url: Uri, body: JSON) -> ResponseResult {
32    make_request(url, Method::POST, body).await
33}
34
35/// Makes a request to the URL, including a JSON body.
36async fn make_request(url: Uri, method: Method, body: JSON) -> ResponseResult {
37    let authority = url.authority().ok_or("Invalid URL")?.as_str();
38    let stream = TcpStream::connect(authority).await?;
39
40    // Use adapter to access something implementing tokio::io as if they
41    // implement hyper::rt.
42    // See: https://github.com/hyperium/hyper/issues/3110
43    let io = TokioIo::new(stream);
44    let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
45
46    tokio::task::spawn(async move {
47        if let Err(err) = conn.await {
48            println!("Connection failed: {err}");
49        }
50    });
51
52    let req = Request::builder()
53        .header(hyper::header::HOST, authority)
54        .uri(url)
55        .method(method)
56        .body(full(body))?;
57
58    Ok(sender.send_request(req).await?)
59}
60
61/// Creates a response containing a JSON value.
62pub(crate) fn mk_response(
63    status: StatusCode,
64    body: JSON,
65) -> Result<Response<Full<Bytes>>, Box<dyn std::error::Error + Send + Sync>> {
66    Ok(Response::builder()
67        .status(status)
68        .body(Full::new(Bytes::from(body.to_string())))
69        .unwrap())
70}
71
72/// Returns a JSON body.
73fn full(value: JSON) -> BoxBody<Bytes, hyper::Error> {
74    Full::<Bytes>::new(Bytes::from(value.to_string()))
75        .map_err(|never| match never {})
76        .boxed()
77}