umbral_socket/
lib.rs

1use http_body_util::BodyExt;
2use http_body_util::Empty;
3use http_body_util::Full;
4use hyper::Method;
5use hyper::Request;
6use hyper::body::{Buf, Bytes};
7use hyper::header;
8use hyper_util::client::legacy::Client;
9use hyper_util::rt::tokio::TokioExecutor;
10use hyperlocal::UnixConnector;
11use hyperlocal::Uri;
12use serde::Serialize;
13use serde::de::DeserializeOwned;
14
15use std::error::Error;
16
17pub mod http;
18pub mod stream;
19
/// Outcome of a request made through [`UmbralSocket`].
///
/// The type itself places no bound on `T`; the `DeserializeOwned` requirement
/// lives on the `impl` blocks and functions that actually decode a body.
#[derive(Debug, Clone, PartialEq)]
pub struct UmbralResponse<T> {
    /// Numeric HTTP status code reported by the server.
    pub status: u16,
    /// Decoded response payload, or `None` when the body was not decoded.
    pub response: Option<T>,
}
24
25impl<T: DeserializeOwned> UmbralResponse<T> {
26    fn new(status: u16, response: Option<T>) -> Self {
27        return UmbralResponse {
28            status: status,
29            response: response,
30        };
31    }
32}
33
/// Unix-socket HTTP client whose request body type is `Empty<Bytes>`.
type EmptyClient = Client<UnixConnector, Empty<Bytes>>;
/// Unix-socket HTTP client whose request body type is `Full<Bytes>`.
type FullClient = Client<UnixConnector, Full<Bytes>>;
36
/// HTTP client bound to a single Unix domain socket.
///
/// Two underlying clients are kept because the request body type is part of
/// the client's type: one for body-less requests and one for requests that
/// carry a payload.
#[derive(Clone)]
pub struct UmbralSocket {
    /// Socket path, combined with each endpoint to form the request URI.
    socket: String,
    /// Client used for requests with an empty body.
    empty: EmptyClient,
    /// Client used for requests with a full (in-memory) body.
    full: FullClient,
}
43
44impl UmbralSocket {
45    pub fn new(socket: &str) -> UmbralSocket {
46        let connector = UnixConnector;
47        return UmbralSocket {
48            socket: String::from(socket),
49            empty: Client::builder(TokioExecutor::new()).build(connector),
50            full: Client::builder(TokioExecutor::new()).build(connector),
51        };
52    }
53
54    pub async fn get<U: DeserializeOwned>(
55        &self,
56        endpoint: &str,
57    ) -> Result<UmbralResponse<U>, Box<dyn Error>> {
58        let uri = hyperlocal::Uri::new(&self.socket, endpoint).into();
59        let response = self.empty.get(uri).await?;
60        let status = response.status().as_u16();
61        let body_bytes = response.into_body().collect().await?.to_bytes();
62        let data: U = serde_json::from_reader(body_bytes.reader())?;
63        return Ok(UmbralResponse::new(status, Some(data)));
64    }
65
66    pub async fn post<T: Serialize, U: DeserializeOwned>(
67        &self,
68        endpoint: &str,
69        request: &T,
70    ) -> Result<UmbralResponse<U>, Box<dyn Error>> {
71        let body_bytes = serde_json::to_vec(&*request).unwrap();
72        let request_body = Full::new(Bytes::from(body_bytes));
73
74        let request = Request::builder()
75            .method(Method::POST)
76            .uri(Uri::new(&self.socket, endpoint))
77            .header(header::CONTENT_TYPE, "application/json")
78            .body(request_body)?;
79
80        let response = self.full.request(request).await?;
81        let status = response.status().as_u16();
82        let body_bytes = response.into_body().collect().await?.to_bytes();
83        let data: U = serde_json::from_reader(body_bytes.reader())?;
84        return Ok(UmbralResponse::new(status, Some(data)));
85    }
86
87    pub async fn post_raw<T: Serialize>(
88        &self,
89        endpoint: &str,
90        request: &T,
91    ) -> Result<UmbralResponse<()>, Box<dyn Error>> {
92        let body_bytes = serde_json::to_vec(&*request)?;
93        let request_body = Full::new(Bytes::from(body_bytes));
94
95        let request = Request::builder()
96            .method(Method::POST)
97            .uri(Uri::new(&self.socket, endpoint))
98            .header(header::CONTENT_TYPE, "application/json")
99            .body(request_body)?;
100
101        let response = self.full.request(request).await?;
102        let status = response.status().as_u16();
103        response.into_body().collect().await?;
104        return Ok(UmbralResponse::new(status, None));
105    }
106
107    pub async fn post_trigger(&self, endpoint: &str) -> Result<UmbralResponse<()>, Box<dyn Error>> {
108        let request_body = Empty::<Bytes>::new();
109        let request = Request::builder()
110            .method(Method::POST)
111            .uri(Uri::new(&self.socket, endpoint))
112            .header(header::CONTENT_LENGTH, "0")
113            .body(request_body)?;
114
115        let response = self.empty.request(request).await?;
116        let status = response.status().as_u16();
117        response.into_body().collect().await?;
118        return Ok(UmbralResponse::new(status, None));
119    }
120
121    pub async fn post_raw_bytes(
122        &self,
123        endpoint: &str,
124        body: Bytes,
125    ) -> Result<UmbralResponse<()>, Box<dyn Error>> {
126        let request_body = Full::new(body);
127        let request = Request::builder()
128            .method(Method::POST)
129            .uri(Uri::new(&self.socket, endpoint))
130            .header(header::CONTENT_TYPE, "application/json")
131            .body(request_body)?;
132        let response = self.full.request(request).await?;
133        let status = response.status();
134        response.into_body().collect().await?;
135        Ok(UmbralResponse::new(status.as_u16(), None))
136    }
137}