exc_okx/service/
mod.rs

1use exc_core::retry::RetryPolicy;
2use exc_core::transport::http::channel::HttpsChannel;
3use exc_core::{ExchangeError, Request};
4use futures::future::{ready, BoxFuture};
5use futures::{FutureExt, TryFutureExt};
6use tower::buffer::Buffer;
7use tower::ready_cache::ReadyCache;
8use tower::retry::Retry;
9use tower::util::Either;
10use tower::Service;
11
12use crate::http::layer::OkxHttpApi;
13use crate::http::types::{request::HttpRequest, response::HttpResponse};
14use crate::websocket::transport::channel::Channel as WsChannel;
15use crate::websocket::types::messages::Args;
16use crate::websocket::{Request as WsRequest, Response as WsResponse};
17
18use self::endpoint::Endpoint;
19
20/// Endpoint.
21pub mod endpoint;
22
23mod adaptation;
24
25/// Okx request.
26pub enum OkxRequest {
27    /// Request of HTTP API.
28    Http(HttpRequest),
29    /// Request of WS API.
30    Ws(WsRequest),
31}
32
33impl OkxRequest {
34    /// Subscribe to orders channel.
35    pub fn subscribe_orders(inst: &str) -> Self {
36        Self::Ws(WsRequest::subscribe_orders(inst))
37    }
38
39    /// Subscribe with the given args.
40    pub fn subscribe(args: Args) -> Self {
41        Self::Ws(WsRequest::subscribe(args))
42    }
43}
44
45/// Okx response.
46pub enum OkxResponse {
47    /// Response from HTTP API.
48    Http(HttpResponse),
49    /// Response from WS API.
50    Ws(WsResponse),
51}
52
53impl OkxResponse {
54    /// Convert into http response.
55    pub fn http(self) -> Result<HttpResponse, ExchangeError> {
56        if let Self::Http(res) = self {
57            Ok(res)
58        } else {
59            Err(ExchangeError::Other(anyhow::anyhow!(
60                "unexpected response type `ws`"
61            )))
62        }
63    }
64
65    /// Convert into websocket response.
66    pub fn ws(self) -> Result<WsResponse, ExchangeError> {
67        if let Self::Ws(res) = self {
68            Ok(res)
69        } else {
70            Err(ExchangeError::Other(anyhow::anyhow!(
71                "unexpected response type `http`"
72            )))
73        }
74    }
75}
76
77impl Request for OkxRequest {
78    type Response = OkxResponse;
79}
80
81type HttpInner = OkxHttpApi<HttpsChannel>;
82type Http = Retry<RetryPolicy<HttpRequest, HttpResponse, fn(&ExchangeError) -> bool>, HttpInner>;
83type Ws = WsChannel;
84
85impl Service<OkxRequest> for Http {
86    type Response = OkxResponse;
87
88    type Error = ExchangeError;
89
90    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
91
92    fn poll_ready(
93        &mut self,
94        cx: &mut std::task::Context<'_>,
95    ) -> std::task::Poll<Result<(), Self::Error>> {
96        Service::<HttpRequest>::poll_ready(self, cx)
97    }
98
99    fn call(&mut self, req: OkxRequest) -> Self::Future {
100        if let OkxRequest::Http(req) = req {
101            Service::call(self, req).map_ok(OkxResponse::Http).boxed()
102        } else {
103            ready(Err(ExchangeError::Other(anyhow::anyhow!(
104                "Invalid request type"
105            ))))
106            .boxed()
107        }
108    }
109}
110
111impl Service<OkxRequest> for Ws {
112    type Response = OkxResponse;
113
114    type Error = ExchangeError;
115
116    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
117
118    fn poll_ready(
119        &mut self,
120        cx: &mut std::task::Context<'_>,
121    ) -> std::task::Poll<Result<(), Self::Error>> {
122        Service::<WsRequest>::poll_ready(self, cx)
123    }
124
125    fn call(&mut self, req: OkxRequest) -> Self::Future {
126        if let OkxRequest::Ws(req) = req {
127            Service::call(self, req).map_ok(OkxResponse::Ws).boxed()
128        } else {
129            ready(Err(ExchangeError::Other(anyhow::anyhow!(
130                "Invalid request type"
131            ))))
132            .boxed()
133        }
134    }
135}
136
137struct Inner {
138    svcs: ReadyCache<&'static str, Either<Http, Ws>, OkxRequest>,
139}
140
141const HTTP_KEY: &str = "http";
142const WS_KEY: &str = "ws";
143
144impl Inner {
145    fn new(ws: Ws, http: Http) -> Self {
146        let mut svcs = ReadyCache::default();
147        svcs.push(WS_KEY, Either::B(ws));
148        svcs.push(HTTP_KEY, Either::A(http));
149        Inner { svcs }
150    }
151}
152
153impl Service<OkxRequest> for Inner {
154    type Response = OkxResponse;
155    type Error = ExchangeError;
156    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
157
158    fn poll_ready(
159        &mut self,
160        cx: &mut std::task::Context<'_>,
161    ) -> std::task::Poll<Result<(), Self::Error>> {
162        self.svcs
163            .poll_pending(cx)
164            .map_err(|err| ExchangeError::Unavailable(err.into()))
165    }
166
167    fn call(&mut self, req: OkxRequest) -> Self::Future {
168        let key = match &req {
169            OkxRequest::Http(_) => HTTP_KEY,
170            OkxRequest::Ws(_) => WS_KEY,
171        };
172        self.svcs
173            .call_ready(&key, req)
174            .map_err(ExchangeError::layer)
175            .boxed()
176    }
177}
178
179/// Okx API.
180#[derive(Clone)]
181pub struct Okx {
182    inner: Buffer<Inner, OkxRequest>,
183}
184
185impl Okx {
186    fn new(ws: Ws, http: Http, cap: usize) -> Self {
187        Self {
188            inner: Buffer::new(Inner::new(ws, http), cap),
189        }
190    }
191
192    /// Create a default endpoint (the [`Okx`] builder).
193    pub fn endpoint() -> Endpoint {
194        Endpoint::default()
195    }
196}
197
198impl Service<OkxRequest> for Okx {
199    type Response = OkxResponse;
200
201    type Error = ExchangeError;
202
203    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
204
205    fn poll_ready(
206        &mut self,
207        cx: &mut std::task::Context<'_>,
208    ) -> std::task::Poll<Result<(), Self::Error>> {
209        self.inner.poll_ready(cx).map_err(ExchangeError::layer)
210    }
211
212    fn call(&mut self, req: OkxRequest) -> Self::Future {
213        self.inner.call(req).map_err(ExchangeError::layer).boxed()
214    }
215}