umbral_socket/
client.rs

1use http_body_util::BodyExt;
2use http_body_util::Empty;
3use http_body_util::Full;
4use http_body_util::combinators::BoxBody;
5use hyper::Method;
6use hyper::Request;
7use hyper::body::{Buf, Bytes};
8use hyper::header;
9use hyper_util::client::legacy::Client;
10use hyper_util::rt::tokio::TokioExecutor;
11use hyperlocal::UnixConnector;
12use hyperlocal::Uri;
13use serde::Serialize;
14use serde::de::DeserializeOwned;
15
16use std::convert::Infallible;
17use std::error::Error;
18
19pub struct SocketResponse<T: DeserializeOwned> {
20    pub status: u16,
21    pub response: Option<T>,
22}
23
24impl<T: DeserializeOwned> SocketResponse<T> {
25    fn new(status: u16, response: Option<T>) -> Self {
26        return SocketResponse {
27            status: status,
28            response: response,
29        };
30    }
31}
32
33type BoxedBody = BoxBody<Bytes, Infallible>;
34type HyperClient = Client<UnixConnector, BoxedBody>;
35
36#[derive(Clone)]
37pub struct SocketClient {
38    socket: String,
39    client: HyperClient,
40}
41
42impl SocketClient {
43    pub fn new(socket: &str) -> SocketClient {
44        let connector = UnixConnector;
45        return SocketClient {
46            socket: String::from(socket),
47            client: Client::builder(TokioExecutor::new()).build(connector),
48        };
49    }
50
51    pub async fn get<U: DeserializeOwned>(
52        &self,
53        endpoint: &str,
54    ) -> Result<SocketResponse<U>, Box<dyn Error>> {
55        let uri = hyperlocal::Uri::new(&self.socket, endpoint).into();
56        let response = self.client.get(uri).await?;
57        let status = response.status().as_u16();
58        let body_bytes = response.into_body().collect().await?.to_bytes();
59        let data: U = serde_json::from_reader(body_bytes.reader())?;
60        return Ok(SocketResponse::new(status, Some(data)));
61    }
62
63    pub async fn post<T: Serialize, U: DeserializeOwned>(
64        &self,
65        endpoint: &str,
66        request: &T,
67    ) -> Result<SocketResponse<U>, Box<dyn Error>> {
68        let body_bytes = serde_json::to_vec(&*request).unwrap();
69        let request_body = Full::new(Bytes::from(body_bytes))
70            .map_err(|e| match e {})
71            .boxed();
72
73        let request = Request::builder()
74            .method(Method::POST)
75            .uri(Uri::new(&self.socket, endpoint))
76            .header(header::CONTENT_TYPE, "application/json")
77            .body(request_body)?;
78
79        let response = self.client.request(request).await?;
80        let status = response.status().as_u16();
81        let body_bytes = response.into_body().collect().await?.to_bytes();
82        let data: U = serde_json::from_reader(body_bytes.reader())?;
83        return Ok(SocketResponse::new(status, Some(data)));
84    }
85
86    pub async fn post_raw<T: Serialize>(
87        &self,
88        endpoint: &str,
89        request: &T,
90    ) -> Result<SocketResponse<()>, Box<dyn Error>> {
91        let body_bytes = serde_json::to_vec(&*request)?;
92        let request_body = Full::new(Bytes::from(body_bytes))
93            .map_err(|e| match e {})
94            .boxed();
95
96        let request = Request::builder()
97            .method(Method::POST)
98            .uri(Uri::new(&self.socket, endpoint))
99            .header(header::CONTENT_TYPE, "application/json")
100            .body(request_body)?;
101
102        let response = self.client.request(request).await?;
103        let status = response.status().as_u16();
104        response.into_body().collect().await?;
105        return Ok(SocketResponse::new(status, None));
106    }
107
108    pub async fn post_trigger(&self, endpoint: &str) -> Result<SocketResponse<()>, Box<dyn Error>> {
109        let request_body = Empty::new().map_err(|e| match e {}).boxed();
110        let request = Request::builder()
111            .method(Method::POST)
112            .uri(Uri::new(&self.socket, endpoint))
113            .header(header::CONTENT_LENGTH, "0")
114            .body(request_body)?;
115
116        let response = self.client.request(request).await?;
117        let status = response.status().as_u16();
118        response.into_body().collect().await?;
119        return Ok(SocketResponse::new(status, None));
120    }
121
122    pub async fn post_raw_bytes(
123        &self,
124        endpoint: &str,
125        body: Bytes,
126    ) -> Result<SocketResponse<()>, Box<dyn Error>> {
127        let request_body = Full::new(body).map_err(|e| match e {}).boxed();
128        let request = Request::builder()
129            .method(Method::POST)
130            .uri(Uri::new(&self.socket, endpoint))
131            .header(header::CONTENT_TYPE, "application/json")
132            .body(request_body)?;
133        let response = self.client.request(request).await?;
134        let status = response.status();
135        response.into_body().collect().await?;
136        Ok(SocketResponse::new(status.as_u16(), None))
137    }
138}