exc_binance/http/
layer.rs

1use exc_core::retry::RetryPolicy;
2use futures::future::BoxFuture;
3use futures::{FutureExt, TryFutureExt};
4use tower::retry::Retry;
5
6use crate::types::key::BinanceKey;
7
8use super::error::RestError;
9use super::request::{Payload, Rest, RestEndpoint, RestRequest};
10use super::response::{Data, RestResponse};
11use std::task::{Context, Poll};
12use tower::{Layer, Service, ServiceBuilder};
13
/// Concrete retry policy type used throughout this module: an `exc_core` `RetryPolicy`
/// specialised to `RestRequest<Payload>` requests and `RestResponse<Data>` responses, whose
/// third parameter is a plain function pointer over `RestError` (the layer constructor
/// supplies `RestError::is_temporary` for it via `retry_on`).
14type Policy = RetryPolicy<RestRequest<Payload>, RestResponse<Data>, fn(&RestError) -> bool>;
15
16/// Binance REST API layer.
///
/// A `tower` layer that wraps an HTTP service into a `BinanceRestApi`. It carries the
/// configuration that is handed to every service it produces.
17#[derive(Clone)]
18pub struct BinanceRestApiLayer {
    /// Retry policy passed to the retry middleware that wraps the inner service.
19    retry: Policy,
    /// REST endpoint given to `RestRequest::to_http` when building HTTP requests.
20    endpoint: RestEndpoint,
    /// Optional key forwarded to `RestRequest::to_http`; `None` until `key` is called.
21    key: Option<BinanceKey>,
22}
23
24impl BinanceRestApiLayer {
25    /// USD-Margin Futures http endpoint.
26    pub fn usd_margin_futures() -> Self {
27        Self::new(RestEndpoint::UsdMarginFutures)
28    }
29
30    /// Create a new binance rest api layer.
31    pub fn new(endpoint: RestEndpoint) -> Self {
32        Self {
33            endpoint,
34            retry: RetryPolicy::default().retry_on(RestError::is_temporary),
35            key: None,
36        }
37    }
38
39    /// Set key.
40    pub fn key(mut self, key: BinanceKey) -> Self {
41        self.key = Some(key);
42        self
43    }
44}
45
46impl<S> Layer<S> for BinanceRestApiLayer {
47    type Service = BinanceRestApi<S>;
48
49    fn layer(&self, http: S) -> Self::Service {
50        let inner = ServiceBuilder::default()
51            .retry(self.retry)
52            .service(BinanceRestApiInner {
53                http,
54                endpoint: self.endpoint,
55                key: self.key.clone(),
56            });
57        BinanceRestApi { inner }
58    }
59}
60
61/// Binance REST API service, inner part.
///
/// Turns each `RestRequest` into an HTTP request, sends it through the wrapped HTTP
/// service, and converts the HTTP response with `RestResponse::from_http`. Retrying is
/// not done here; the retry middleware is applied around this service by the layer.
62#[derive(Debug, Clone)]
63pub struct BinanceRestApiInner<S> {
    /// REST endpoint passed to `RestRequest::to_http`.
64    endpoint: RestEndpoint,
    /// Underlying HTTP service that actually performs the request.
65    http: S,
    /// Optional key passed (by reference) to `RestRequest::to_http`.
66    key: Option<BinanceKey>,
67}
68
69impl<S, R> Service<RestRequest<R>> for BinanceRestApiInner<S>
70where
71    R: Rest,
72    S: Service<http::Request<hyper::Body>, Response = http::Response<hyper::Body>>,
73    S::Future: Send + 'static,
74    S::Error: 'static,
75    RestError: From<S::Error>,
76{
77    type Response = RestResponse<Data>;
78    type Error = RestError;
79    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
80
81    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82        self.http.poll_ready(cx).map_err(RestError::from)
83    }
84
85    fn call(&mut self, req: RestRequest<R>) -> Self::Future {
86        match req.to_http(&self.endpoint, self.key.as_ref()) {
87            Ok(req) => {
88                tracing::trace!("sent http request: {}", req.uri());
89                self.http
90                    .call(req)
91                    .map_err(RestError::from)
92                    .and_then(RestResponse::from_http)
93                    .boxed()
94            }
95            Err(err) => futures::future::ready(Err(err)).boxed(),
96        }
97    }
98}
99
100/// Binance REST API service.
///
/// Thin wrapper around a `BinanceRestApiInner` that has been wrapped by tower's `Retry`
/// middleware with this module's `Policy`. Produced by `BinanceRestApiLayer::layer`.
101#[derive(Clone)]
102pub struct BinanceRestApi<S> {
    /// The retrying service that all calls are delegated to.
103    inner: Retry<Policy, BinanceRestApiInner<S>>,
104}
105
106impl<S> Service<RestRequest<Payload>> for BinanceRestApi<S>
107where
108    S: Clone + Send + 'static,
109    S: Service<http::Request<hyper::Body>, Response = http::Response<hyper::Body>>,
110    S::Future: Send + 'static,
111    S::Error: 'static,
112    RestError: From<S::Error>,
113{
114    type Response = RestResponse<Data>;
115    type Error = RestError;
116    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
117
118    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119        self.inner.poll_ready(cx)
120    }
121
122    fn call(&mut self, req: RestRequest<Payload>) -> Self::Future {
123        self.inner.call(req).boxed()
124    }
125}