1use std::{
2 error, fmt,
3 pin::Pin,
4 sync::{
5 atomic::{AtomicUsize, Ordering},
6 Arc,
7 },
8};
9
10use futures_core::{
11 task::{Context, Poll},
12 Future,
13};
14use futures_util::TryFutureExt;
15use hyper::client::HttpConnector;
16use hyper::{
17 body::to_bytes,
18 header::{AUTHORIZATION, CONTENT_TYPE},
19 Body, Client as HyperClient, Error as HyperError, Request as HttpRequest,
20 Response as HttpResponse,
21};
22use hyper_tls::HttpsConnector;
23use tower_service::Service;
24use tower_util::ServiceExt;
25
26use super::{Error, RequestFactory};
27use crate::objects::{Request, RequestBuilder, Response};
28
29pub type HttpError<E> = Error<ConnectionError<E>>;
30
31#[derive(Debug)]
33pub enum ConnectionError<E> {
34 Poll(E),
35 Service(E),
36 Body(HyperError),
37}
38
39impl<E: fmt::Display> fmt::Display for ConnectionError<E> {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 match self {
42 Self::Poll(err) => write!(f, "polling error, {}", err),
43 Self::Service(err) => write!(f, "service error, {}", err),
44 Self::Body(err) => write!(f, "body error, {}", err),
45 }
46 }
47}
48
49impl<E: fmt::Display + fmt::Debug> error::Error for ConnectionError<E> {}
50
51#[derive(Debug, Clone, PartialEq, Eq)]
52pub struct Credentials {
53 url: String,
54 user: Option<String>,
55 password: Option<String>,
56}
57
58#[derive(Clone, Debug)]
60pub struct Client<S> {
61 credentials: Arc<Credentials>,
62 nonce: Arc<AtomicUsize>,
63 inner_service: S,
64}
65
66impl<S> Client<S> {
67 pub fn from_service(
71 service: S,
72 url: String,
73 user: Option<String>,
74 password: Option<String>,
75 ) -> Self {
76 let credentials = Arc::new(Credentials {
77 url,
78 user,
79 password,
80 });
81 Client {
82 credentials,
83 inner_service: service,
84 nonce: Arc::new(AtomicUsize::new(0)),
85 }
86 }
87
88 pub fn next_nonce(&self) -> usize {
90 self.nonce.load(Ordering::AcqRel)
91 }
92}
93
94impl Client<HyperClient<HttpConnector>> {
95 pub fn new(url: String, user: Option<String>, password: Option<String>) -> Self {
97 Self::from_service(HyperClient::new(), url, user, password)
98 }
99}
100
101impl Client<HyperClient<HttpsConnector<HttpConnector>>> {
102 pub fn new_tls(url: String, user: Option<String>, password: Option<String>) -> Self {
104 let https = HttpsConnector::new();
105 let service = HyperClient::builder().build::<_, Body>(https);
106 Self::from_service(service, url, user, password)
107 }
108}
109
110type FutResponse<R, E> = Pin<Box<dyn Future<Output = Result<R, E>> + 'static + Send>>;
111
112impl<S> Service<Request> for Client<S>
113where
114 S: Service<HttpRequest<Body>, Response = HttpResponse<Body>>,
115 S::Error: 'static,
116 S::Future: Send + 'static,
117{
118 type Response = Response;
119 type Error = Error<ConnectionError<S::Error>>;
120 type Future = FutResponse<Self::Response, Self::Error>;
121
122 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
123 self.inner_service
124 .poll_ready(cx)
125 .map_err(ConnectionError::Poll)
126 .map_err(Error::Connection)
127 }
128
129 fn call(&mut self, request: Request) -> Self::Future {
130 let json_raw = serde_json::to_vec(&request).unwrap(); let body = Body::from(json_raw);
132 let mut builder = hyper::Request::post(&self.credentials.url);
133
134 if let Some(ref user) = self.credentials.user {
136 let pass_str = match &self.credentials.password {
137 Some(some) => some,
138 None => "",
139 };
140 builder = builder.header(
141 AUTHORIZATION,
142 format!(
143 "Basic {}",
144 base64::encode(&format!("{}:{}", user, pass_str))
145 ),
146 )
147 };
148
149 let request = builder
151 .header(CONTENT_TYPE, "application/json")
152 .body(body)
153 .unwrap(); let fut = self
157 .inner_service
158 .call(request)
159 .map_err(ConnectionError::Service)
160 .map_err(Error::Connection)
161 .and_then(|response| async move {
162 let body = to_bytes(response.into_body())
163 .await
164 .map_err(ConnectionError::Body)
165 .map_err(Error::Connection)?;
166 Ok(serde_json::from_slice(&body).map_err(Error::Json)?)
167 });
168
169 Box::pin(fut)
170 }
171}
172
173impl<S> Client<S>
174where
175 S: Service<HttpRequest<Body>, Response = HttpResponse<Body>> + Clone,
176 S::Error: 'static,
177 S::Future: Send + 'static,
178{
179 pub async fn send(
180 &self,
181 request: Request,
182 ) -> Result<Response, Error<ConnectionError<S::Error>>> {
183 self.clone().oneshot(request).await
184 }
185}
186
187impl<C> RequestFactory for Client<C> {
188 fn build_request(&self) -> RequestBuilder {
190 let id = serde_json::Value::Number(self.nonce.fetch_add(1, Ordering::AcqRel).into());
191 Request::build().id(id)
192 }
193}