exc_binance/http/
layer.rs1use exc_core::retry::RetryPolicy;
2use futures::future::BoxFuture;
3use futures::{FutureExt, TryFutureExt};
4use tower::retry::Retry;
5
6use crate::types::key::BinanceKey;
7
8use super::error::RestError;
9use super::request::{Payload, Rest, RestEndpoint, RestRequest};
10use super::response::{Data, RestResponse};
11use std::task::{Context, Poll};
12use tower::{Layer, Service, ServiceBuilder};
13
14type Policy = RetryPolicy<RestRequest<Payload>, RestResponse<Data>, fn(&RestError) -> bool>;
15
16#[derive(Clone)]
18pub struct BinanceRestApiLayer {
19 retry: Policy,
20 endpoint: RestEndpoint,
21 key: Option<BinanceKey>,
22}
23
24impl BinanceRestApiLayer {
25 pub fn usd_margin_futures() -> Self {
27 Self::new(RestEndpoint::UsdMarginFutures)
28 }
29
30 pub fn new(endpoint: RestEndpoint) -> Self {
32 Self {
33 endpoint,
34 retry: RetryPolicy::default().retry_on(RestError::is_temporary),
35 key: None,
36 }
37 }
38
39 pub fn key(mut self, key: BinanceKey) -> Self {
41 self.key = Some(key);
42 self
43 }
44}
45
46impl<S> Layer<S> for BinanceRestApiLayer {
47 type Service = BinanceRestApi<S>;
48
49 fn layer(&self, http: S) -> Self::Service {
50 let inner = ServiceBuilder::default()
51 .retry(self.retry)
52 .service(BinanceRestApiInner {
53 http,
54 endpoint: self.endpoint,
55 key: self.key.clone(),
56 });
57 BinanceRestApi { inner }
58 }
59}
60
61#[derive(Debug, Clone)]
63pub struct BinanceRestApiInner<S> {
64 endpoint: RestEndpoint,
65 http: S,
66 key: Option<BinanceKey>,
67}
68
69impl<S, R> Service<RestRequest<R>> for BinanceRestApiInner<S>
70where
71 R: Rest,
72 S: Service<http::Request<hyper::Body>, Response = http::Response<hyper::Body>>,
73 S::Future: Send + 'static,
74 S::Error: 'static,
75 RestError: From<S::Error>,
76{
77 type Response = RestResponse<Data>;
78 type Error = RestError;
79 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
80
81 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
82 self.http.poll_ready(cx).map_err(RestError::from)
83 }
84
85 fn call(&mut self, req: RestRequest<R>) -> Self::Future {
86 match req.to_http(&self.endpoint, self.key.as_ref()) {
87 Ok(req) => {
88 tracing::trace!("sent http request: {}", req.uri());
89 self.http
90 .call(req)
91 .map_err(RestError::from)
92 .and_then(RestResponse::from_http)
93 .boxed()
94 }
95 Err(err) => futures::future::ready(Err(err)).boxed(),
96 }
97 }
98}
99
100#[derive(Clone)]
102pub struct BinanceRestApi<S> {
103 inner: Retry<Policy, BinanceRestApiInner<S>>,
104}
105
106impl<S> Service<RestRequest<Payload>> for BinanceRestApi<S>
107where
108 S: Clone + Send + 'static,
109 S: Service<http::Request<hyper::Body>, Response = http::Response<hyper::Body>>,
110 S::Future: Send + 'static,
111 S::Error: 'static,
112 RestError: From<S::Error>,
113{
114 type Response = RestResponse<Data>;
115 type Error = RestError;
116 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
117
118 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119 self.inner.poll_ready(cx)
120 }
121
122 fn call(&mut self, req: RestRequest<Payload>) -> Self::Future {
123 self.inner.call(req).boxed()
124 }
125}