1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
use exc_core::retry::RetryPolicy;
use futures::future::BoxFuture;
use futures::{FutureExt, TryFutureExt};
use tower::retry::Retry;
use crate::types::key::BinanceKey;
use super::error::RestError;
use super::request::{Payload, Rest, RestEndpoint, RestRequest};
use super::response::{Data, RestResponse};
use std::task::{Context, Poll};
use tower::{Layer, Service, ServiceBuilder};
type Policy = RetryPolicy<RestRequest<Payload>, RestResponse<Data>, fn(&RestError) -> bool>;
#[derive(Clone)]
pub struct BinanceRestApiLayer {
retry: Policy,
endpoint: RestEndpoint,
key: Option<BinanceKey>,
}
impl BinanceRestApiLayer {
pub fn usd_margin_futures() -> Self {
Self::new(RestEndpoint::UsdMarginFutures)
}
pub fn new(endpoint: RestEndpoint) -> Self {
Self {
endpoint,
retry: RetryPolicy::On {
f: RestError::is_temporary,
times: 0,
},
key: None,
}
}
pub fn key(mut self, key: BinanceKey) -> Self {
self.key = Some(key);
self
}
}
impl<S> Layer<S> for BinanceRestApiLayer {
type Service = BinanceRestApi<S>;
fn layer(&self, http: S) -> Self::Service {
let inner = ServiceBuilder::default()
.retry(self.retry)
.service(BinanceRestApiInner {
http,
endpoint: self.endpoint,
key: self.key.clone(),
});
BinanceRestApi { inner }
}
}
#[derive(Debug, Clone)]
pub struct BinanceRestApiInner<S> {
endpoint: RestEndpoint,
http: S,
key: Option<BinanceKey>,
}
impl<S, R> Service<RestRequest<R>> for BinanceRestApiInner<S>
where
R: Rest,
S: Service<http::Request<hyper::Body>, Response = http::Response<hyper::Body>>,
S::Future: Send + 'static,
S::Error: 'static,
RestError: From<S::Error>,
{
type Response = RestResponse<Data>;
type Error = RestError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: RestRequest<R>) -> Self::Future {
match req.to_http(&self.endpoint, self.key.as_ref()) {
Ok(req) => self
.http
.call(req)
.map_err(RestError::from)
.and_then(RestResponse::from_http)
.boxed(),
Err(err) => futures::future::ready(Err(err)).boxed(),
}
}
}
#[derive(Clone)]
pub struct BinanceRestApi<S> {
inner: Retry<Policy, BinanceRestApiInner<S>>,
}
impl<S> Service<RestRequest<Payload>> for BinanceRestApi<S>
where
S: Clone + Send + 'static,
S: Service<http::Request<hyper::Body>, Response = http::Response<hyper::Body>>,
S::Future: Send + 'static,
S::Error: 'static,
RestError: From<S::Error>,
{
type Response = RestResponse<Data>;
type Error = RestError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: RestRequest<Payload>) -> Self::Future {
self.inner.call(req).boxed()
}
}