1use exc_core::retry::RetryPolicy;
2use exc_core::transport::http::channel::HttpsChannel;
3use exc_core::{ExchangeError, Request};
4use futures::future::{ready, BoxFuture};
5use futures::{FutureExt, TryFutureExt};
6use tower::buffer::Buffer;
7use tower::ready_cache::ReadyCache;
8use tower::retry::Retry;
9use tower::util::Either;
10use tower::Service;
11
12use crate::http::layer::OkxHttpApi;
13use crate::http::types::{request::HttpRequest, response::HttpResponse};
14use crate::websocket::transport::channel::Channel as WsChannel;
15use crate::websocket::types::messages::Args;
16use crate::websocket::{Request as WsRequest, Response as WsResponse};
17
18use self::endpoint::Endpoint;
19
20pub mod endpoint;
22
23mod adaptation;
24
25pub enum OkxRequest {
27 Http(HttpRequest),
29 Ws(WsRequest),
31}
32
33impl OkxRequest {
34 pub fn subscribe_orders(inst: &str) -> Self {
36 Self::Ws(WsRequest::subscribe_orders(inst))
37 }
38
39 pub fn subscribe(args: Args) -> Self {
41 Self::Ws(WsRequest::subscribe(args))
42 }
43}
44
45pub enum OkxResponse {
47 Http(HttpResponse),
49 Ws(WsResponse),
51}
52
53impl OkxResponse {
54 pub fn http(self) -> Result<HttpResponse, ExchangeError> {
56 if let Self::Http(res) = self {
57 Ok(res)
58 } else {
59 Err(ExchangeError::Other(anyhow::anyhow!(
60 "unexpected response type `ws`"
61 )))
62 }
63 }
64
65 pub fn ws(self) -> Result<WsResponse, ExchangeError> {
67 if let Self::Ws(res) = self {
68 Ok(res)
69 } else {
70 Err(ExchangeError::Other(anyhow::anyhow!(
71 "unexpected response type `http`"
72 )))
73 }
74 }
75}
76
77impl Request for OkxRequest {
78 type Response = OkxResponse;
79}
80
81type HttpInner = OkxHttpApi<HttpsChannel>;
82type Http = Retry<RetryPolicy<HttpRequest, HttpResponse, fn(&ExchangeError) -> bool>, HttpInner>;
83type Ws = WsChannel;
84
85impl Service<OkxRequest> for Http {
86 type Response = OkxResponse;
87
88 type Error = ExchangeError;
89
90 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
91
92 fn poll_ready(
93 &mut self,
94 cx: &mut std::task::Context<'_>,
95 ) -> std::task::Poll<Result<(), Self::Error>> {
96 Service::<HttpRequest>::poll_ready(self, cx)
97 }
98
99 fn call(&mut self, req: OkxRequest) -> Self::Future {
100 if let OkxRequest::Http(req) = req {
101 Service::call(self, req).map_ok(OkxResponse::Http).boxed()
102 } else {
103 ready(Err(ExchangeError::Other(anyhow::anyhow!(
104 "Invalid request type"
105 ))))
106 .boxed()
107 }
108 }
109}
110
111impl Service<OkxRequest> for Ws {
112 type Response = OkxResponse;
113
114 type Error = ExchangeError;
115
116 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
117
118 fn poll_ready(
119 &mut self,
120 cx: &mut std::task::Context<'_>,
121 ) -> std::task::Poll<Result<(), Self::Error>> {
122 Service::<WsRequest>::poll_ready(self, cx)
123 }
124
125 fn call(&mut self, req: OkxRequest) -> Self::Future {
126 if let OkxRequest::Ws(req) = req {
127 Service::call(self, req).map_ok(OkxResponse::Ws).boxed()
128 } else {
129 ready(Err(ExchangeError::Other(anyhow::anyhow!(
130 "Invalid request type"
131 ))))
132 .boxed()
133 }
134 }
135}
136
137struct Inner {
138 svcs: ReadyCache<&'static str, Either<Http, Ws>, OkxRequest>,
139}
140
141const HTTP_KEY: &str = "http";
142const WS_KEY: &str = "ws";
143
144impl Inner {
145 fn new(ws: Ws, http: Http) -> Self {
146 let mut svcs = ReadyCache::default();
147 svcs.push(WS_KEY, Either::B(ws));
148 svcs.push(HTTP_KEY, Either::A(http));
149 Inner { svcs }
150 }
151}
152
153impl Service<OkxRequest> for Inner {
154 type Response = OkxResponse;
155 type Error = ExchangeError;
156 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
157
158 fn poll_ready(
159 &mut self,
160 cx: &mut std::task::Context<'_>,
161 ) -> std::task::Poll<Result<(), Self::Error>> {
162 self.svcs
163 .poll_pending(cx)
164 .map_err(|err| ExchangeError::Unavailable(err.into()))
165 }
166
167 fn call(&mut self, req: OkxRequest) -> Self::Future {
168 let key = match &req {
169 OkxRequest::Http(_) => HTTP_KEY,
170 OkxRequest::Ws(_) => WS_KEY,
171 };
172 self.svcs
173 .call_ready(&key, req)
174 .map_err(ExchangeError::layer)
175 .boxed()
176 }
177}
178
179#[derive(Clone)]
181pub struct Okx {
182 inner: Buffer<Inner, OkxRequest>,
183}
184
185impl Okx {
186 fn new(ws: Ws, http: Http, cap: usize) -> Self {
187 Self {
188 inner: Buffer::new(Inner::new(ws, http), cap),
189 }
190 }
191
192 pub fn endpoint() -> Endpoint {
194 Endpoint::default()
195 }
196}
197
198impl Service<OkxRequest> for Okx {
199 type Response = OkxResponse;
200
201 type Error = ExchangeError;
202
203 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
204
205 fn poll_ready(
206 &mut self,
207 cx: &mut std::task::Context<'_>,
208 ) -> std::task::Poll<Result<(), Self::Error>> {
209 self.inner.poll_ready(cx).map_err(ExchangeError::layer)
210 }
211
212 fn call(&mut self, req: OkxRequest) -> Self::Future {
213 self.inner.call(req).map_err(ExchangeError::layer).boxed()
214 }
215}