1use http_body_util::BodyExt;
2use http_body_util::Empty;
3use http_body_util::Full;
4use hyper::Method;
5use hyper::Request;
6use hyper::body::{Buf, Bytes};
7use hyper::header;
8use hyper_util::client::legacy::Client;
9use hyper_util::rt::tokio::TokioExecutor;
10use hyperlocal::UnixConnector;
11use hyperlocal::Uri;
12use serde::Serialize;
13use serde::de::DeserializeOwned;
14
15use std::error::Error;
16
17pub mod http;
18pub mod stream;
19
20pub struct UmbralResponse<T: DeserializeOwned> {
21 pub status: u16,
22 pub response: Option<T>,
23}
24
25impl<T: DeserializeOwned> UmbralResponse<T> {
26 fn new(status: u16, response: Option<T>) -> Self {
27 return UmbralResponse {
28 status: status,
29 response: response,
30 };
31 }
32}
33
34type EmptyClient = Client<UnixConnector, Empty<Bytes>>;
35type FullClient = Client<UnixConnector, Full<Bytes>>;
36
37#[derive(Clone)]
38pub struct UmbralSocket {
39 socket: String,
40 empty: EmptyClient,
41 full: FullClient,
42}
43
44impl UmbralSocket {
45 pub fn new(socket: &str) -> UmbralSocket {
46 let connector = UnixConnector;
47 return UmbralSocket {
48 socket: String::from(socket),
49 empty: Client::builder(TokioExecutor::new()).build(connector),
50 full: Client::builder(TokioExecutor::new()).build(connector),
51 };
52 }
53
54 pub async fn get<U: DeserializeOwned>(
55 &self,
56 endpoint: &str,
57 ) -> Result<UmbralResponse<U>, Box<dyn Error>> {
58 let uri = hyperlocal::Uri::new(&self.socket, endpoint).into();
59 let response = self.empty.get(uri).await?;
60 let status = response.status().as_u16();
61 let body_bytes = response.into_body().collect().await?.to_bytes();
62 let data: U = serde_json::from_reader(body_bytes.reader())?;
63 return Ok(UmbralResponse::new(status, Some(data)));
64 }
65
66 pub async fn post<T: Serialize, U: DeserializeOwned>(
67 &self,
68 endpoint: &str,
69 request: &T,
70 ) -> Result<UmbralResponse<U>, Box<dyn Error>> {
71 let body_bytes = serde_json::to_vec(&*request).unwrap();
72 let request_body = Full::new(Bytes::from(body_bytes));
73
74 let request = Request::builder()
75 .method(Method::POST)
76 .uri(Uri::new(&self.socket, endpoint))
77 .header(header::CONTENT_TYPE, "application/json")
78 .body(request_body)?;
79
80 let response = self.full.request(request).await?;
81 let status = response.status().as_u16();
82 let body_bytes = response.into_body().collect().await?.to_bytes();
83 let data: U = serde_json::from_reader(body_bytes.reader())?;
84 return Ok(UmbralResponse::new(status, Some(data)));
85 }
86
87 pub async fn post_raw<T: Serialize>(
88 &self,
89 endpoint: &str,
90 request: &T,
91 ) -> Result<UmbralResponse<()>, Box<dyn Error>> {
92 let body_bytes = serde_json::to_vec(&*request)?;
93 let request_body = Full::new(Bytes::from(body_bytes));
94
95 let request = Request::builder()
96 .method(Method::POST)
97 .uri(Uri::new(&self.socket, endpoint))
98 .header(header::CONTENT_TYPE, "application/json")
99 .body(request_body)?;
100
101 let response = self.full.request(request).await?;
102 let status = response.status().as_u16();
103 response.into_body().collect().await?;
104 return Ok(UmbralResponse::new(status, None));
105 }
106
107 pub async fn post_trigger(&self, endpoint: &str) -> Result<UmbralResponse<()>, Box<dyn Error>> {
108 let request_body = Empty::<Bytes>::new();
109 let request = Request::builder()
110 .method(Method::POST)
111 .uri(Uri::new(&self.socket, endpoint))
112 .header(header::CONTENT_LENGTH, "0")
113 .body(request_body)?;
114
115 let response = self.empty.request(request).await?;
116 let status = response.status().as_u16();
117 response.into_body().collect().await?;
118 return Ok(UmbralResponse::new(status, None));
119 }
120
121 pub async fn post_raw_bytes(
122 &self,
123 endpoint: &str,
124 body: Bytes,
125 ) -> Result<UmbralResponse<()>, Box<dyn Error>> {
126 let request_body = Full::new(body);
127 let request = Request::builder()
128 .method(Method::POST)
129 .uri(Uri::new(&self.socket, endpoint))
130 .header(header::CONTENT_TYPE, "application/json")
131 .body(request_body)?;
132 let response = self.full.request(request).await?;
133 let status = response.status();
134 response.into_body().collect().await?;
135 Ok(UmbralResponse::new(status.as_u16(), None))
136 }
137}