exc_binance/
service.rs

1use std::task::{Context, Poll};
2
3use exc_core::{transport::http::channel::HttpsChannel, ExchangeError};
4use futures::{future::BoxFuture, FutureExt, TryFutureExt};
5use tower::{
6    buffer::Buffer,
7    ready_cache::{error::Failed, ReadyCache},
8    util::Either,
9    Service,
10};
11
12use crate::{
13    endpoint::Endpoint,
14    http::{
15        request::{Payload, RestRequest},
16        BinanceRestApi,
17    },
18    types::{request::Request, response::Response},
19    websocket::{request::WsRequest, BinanceWebsocketApi},
20    Error, SpotOptions,
21};
22
23type Http = BinanceRestApi<HttpsChannel>;
24type Ws = BinanceWebsocketApi;
25
26pub(crate) const HTTP_KEY: &str = "http";
27pub(crate) const WS_KEY: &str = "ws";
28
29impl Service<Request> for Http {
30    type Response = Response;
31    type Error = Error;
32    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
33
34    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
35        Service::<RestRequest<Payload>>::poll_ready(self, cx).map_err(Error::from)
36    }
37
38    fn call(&mut self, req: Request) -> Self::Future {
39        if let Request::Http(req) = req {
40            Service::call(self, req)
41                .map_ok(|resp| Response::Http(Box::new(resp)))
42                .map_err(Error::from)
43                .boxed()
44        } else {
45            futures::future::ready(Err(Error::WrongResponseType)).boxed()
46        }
47    }
48}
49
50impl Service<Request> for Ws {
51    type Response = Response;
52    type Error = Error;
53    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
54
55    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
56        Service::<WsRequest>::poll_ready(self, cx).map_err(Error::from)
57    }
58
59    fn call(&mut self, req: Request) -> Self::Future {
60        if let Request::Ws(req) = req {
61            Service::call(self, req)
62                .map_ok(Response::Ws)
63                .map_err(Error::from)
64                .boxed()
65        } else {
66            futures::future::ready(Err(Error::WrongResponseType)).boxed()
67        }
68    }
69}
70
71pub(crate) struct BinanceInner {
72    pub(crate) svcs: ReadyCache<&'static str, Either<Http, Ws>, Request>,
73}
74
75impl Service<Request> for BinanceInner {
76    type Response = Response;
77    type Error = Error;
78    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
79
80    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81        self.svcs
82            .poll_pending(cx)
83            .map_err(|Failed(k, s)| Error::Unknown(anyhow::anyhow!("poll {k} ready error: {s}")))
84    }
85
86    fn call(&mut self, req: Request) -> Self::Future {
87        match &req {
88            Request::Http(_) => self
89                .svcs
90                .call_ready(&HTTP_KEY, req)
91                .map_err(|err| match err.downcast::<Error>() {
92                    Ok(err) => *err,
93                    Err(err) => Error::Unknown(anyhow::anyhow!("{}", err)),
94                })
95                .boxed(),
96            Request::Ws(_) => self
97                .svcs
98                .call_ready(&WS_KEY, req)
99                .map_err(|err| match err.downcast::<Error>() {
100                    Ok(err) => *err,
101                    Err(err) => Error::Unknown(anyhow::anyhow!("{}", err)),
102                })
103                .boxed(),
104        }
105    }
106}
107
108/// Binance.
109#[derive(Clone)]
110pub struct Binance {
111    pub(crate) inner: Buffer<BinanceInner, Request>,
112}
113
114impl Binance {
115    /// Usd-margin futures endpoint.
116    pub fn usd_margin_futures() -> Endpoint {
117        Endpoint::usd_margin_futures()
118    }
119
120    /// Spot endpoint.
121    pub fn spot() -> Endpoint {
122        Endpoint::spot_with_options(SpotOptions::default())
123    }
124
125    /// Spot endpoint.
126    pub fn spot_with_options(options: SpotOptions) -> Endpoint {
127        Endpoint::spot_with_options(options)
128    }
129
130    /// European options endpoint.
131    pub fn european_options() -> Endpoint {
132        Endpoint::european_options()
133    }
134}
135
136impl Service<Request> for Binance {
137    type Response = Response;
138    type Error = ExchangeError;
139    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
140
141    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
142        self.inner
143            .poll_ready(cx)
144            .map_err(|err| match err.downcast::<Error>() {
145                Ok(err) => *err,
146                Err(err) => Error::Unknown(anyhow::anyhow!("{}", err)),
147            })
148            .map_err(ExchangeError::from)
149    }
150
151    fn call(&mut self, req: Request) -> Self::Future {
152        self.inner
153            .call(req)
154            .map_err(|err| match err.downcast::<Error>() {
155                Ok(err) => *err,
156                Err(err) => Error::Unknown(anyhow::anyhow!("{}", err)),
157            })
158            .map_err(ExchangeError::from)
159            .boxed()
160    }
161}