1use exc_core::{
2 types::{
3 instrument::{InstrumentMeta, SubscribeInstruments},
4 trading::{CancelOrder, OrderId, PlaceOrder},
5 BidAsk, Canceled, OrderUpdate, Placed, SubscribeBidAsk, SubscribeOrders, SubscribeTrades,
6 Trade,
7 },
8 Adaptor, ExchangeError,
9};
10use futures::{future::ready, stream::iter, FutureExt, StreamExt, TryStreamExt};
11use time::OffsetDateTime;
12
13use crate::{error::OkxError, utils::inst_tag::parse_inst_tag};
14
15use super::{
16 types::{
17 messages::{
18 event::{order::OkxOrder, Event, OkxInstrumentMeta, TradeResponse},
19 Args,
20 },
21 response::StatusKind,
22 },
23 Request, Response,
24};
25
26impl Adaptor<SubscribeInstruments> for Request {
27 fn from_request(req: SubscribeInstruments) -> Result<Self, exc_core::ExchangeError>
28 where
29 Self: Sized,
30 {
31 let (ty, _) = parse_inst_tag(&req.tag)?;
32 Ok(Self::subscribe(Args::subscribe_instruments(&ty)))
33 }
34
35 fn into_response(
36 resp: Self::Response,
37 ) -> Result<<SubscribeInstruments as exc_core::Request>::Response, ExchangeError> {
38 match resp {
39 Response::Error(err) => Err(ExchangeError::Other(anyhow::anyhow!("status: {err}"))),
40 Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
41 "invalid response kind"
42 ))),
43 Response::Streaming(stream) => {
44 let stream = stream
45 .skip(1)
46 .filter_map(|frame| {
47 ready(match frame {
48 Ok(frame) => frame.into_change().map(Ok),
49 Err(err) => Some(Err(err)),
50 })
51 })
52 .flat_map(|change| match change {
53 Ok(change) => iter(change.deserialize_data::<OkxInstrumentMeta>())
54 .filter_map(|m| match m {
55 Ok(m) => ready(Some(
56 InstrumentMeta::try_from(m).map_err(ExchangeError::from),
57 )),
58 Err(err) => {
59 error!("deserialize instrument meta error: {err}, skipped.");
60 ready(None)
61 }
62 })
63 .left_stream(),
64 Err(err) => {
65 futures::stream::once(
66 async move { Err(ExchangeError::Other(err.into())) },
67 )
68 .right_stream()
69 }
70 })
71 .boxed();
72 Ok(stream)
73 }
74 }
75 }
76}
77
78impl Adaptor<SubscribeOrders> for Request {
79 fn from_request(req: SubscribeOrders) -> Result<Self, exc_core::ExchangeError>
80 where
81 Self: Sized,
82 {
83 Ok(Self::subscribe(Args::subscribe_orders(&req.instrument)))
84 }
85
86 fn into_response(
87 resp: Self::Response,
88 ) -> Result<<SubscribeOrders as exc_core::Request>::Response, ExchangeError> {
89 match resp {
90 Response::Error(err) => Err(ExchangeError::Other(anyhow::anyhow!("status: {err}"))),
91 Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
92 "invalid response kind"
93 ))),
94 Response::Streaming(stream) => {
95 let stream = stream
96 .skip(1)
97 .filter_map(|frame| {
98 ready(match frame {
99 Ok(frame) => frame.into_change().map(Ok),
100 Err(err) => Some(Err(err)),
101 })
102 })
103 .flat_map(|change| match change {
104 Ok(change) => iter(change.deserialize_data::<OkxOrder>())
105 .filter_map(|m| {
106 match m.map_err(OkxError::from).and_then(OrderUpdate::try_from) {
107 Ok(m) => ready(Some(Ok(m))),
108 Err(err) => {
109 error!(%err, "deserialize order error, skipped.");
110 ready(None)
111 }
112 }
113 })
114 .left_stream(),
115 Err(err) => {
116 futures::stream::once(
117 async move { Err(ExchangeError::Other(err.into())) },
118 )
119 .right_stream()
120 }
121 })
122 .boxed();
123 Ok(stream)
124 }
125 }
126 }
127}
128
129impl Adaptor<PlaceOrder> for Request {
130 fn from_request(req: PlaceOrder) -> Result<Self, ExchangeError>
131 where
132 Self: Sized,
133 {
134 Ok(Self::order(&req))
135 }
136
137 fn into_response(
138 resp: Self::Response,
139 ) -> Result<<PlaceOrder as exc_core::Request>::Response, ExchangeError> {
140 let resp = resp.into_unary().map_err(OkxError::Api)?;
141
142 Ok(async move {
143 let event = resp.await?.inner;
144 let (ts, id) = if let Event::TradeResponse(TradeResponse::Order {
145 code,
146 msg,
147 mut data,
148 ..
149 }) = event
150 {
151 if code == "0" {
152 if let Some(data) = data.pop() {
153 #[cfg(not(feature = "prefer-client-id"))]
154 {
155 let id = OrderId::from(data.ord_id);
156 Ok((OffsetDateTime::now_utc(), id))
157 }
158 #[cfg(feature = "prefer-client-id")]
159 if let Some(id) = if data.cl_ord_id.is_empty() {
160 None
161 } else {
162 Some(data.cl_ord_id)
163 } {
164 Ok((OffsetDateTime::now_utc(), OrderId::from(id)))
165 } else {
166 Err(OkxError::MissingClientId)
167 }
168 } else {
169 Err(OkxError::Api(StatusKind::EmptyResponse))
170 }
171 } else if let Some(data) = data.pop() {
172 Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
173 "code={} msg={}",
174 data.s_code,
175 data.s_msg
176 ))))
177 } else {
178 Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
179 "code={code} msg={msg}"
180 ))))
181 }
182 } else {
183 Err(OkxError::UnexpectedDataType(anyhow::anyhow!("{event:?}")))
184 }?;
185 Ok(Placed {
186 id,
187 order: None,
188 ts,
189 })
190 }
191 .boxed())
192 }
193}
194
195impl Adaptor<CancelOrder> for Request {
196 fn from_request(req: CancelOrder) -> Result<Self, ExchangeError>
197 where
198 Self: Sized,
199 {
200 Ok(Self::cancel_order(&req.instrument, req.id.as_str()))
201 }
202
203 fn into_response(
204 resp: Self::Response,
205 ) -> Result<<CancelOrder as exc_core::Request>::Response, ExchangeError> {
206 let resp = resp.into_unary().map_err(OkxError::Api)?;
207
208 Ok(async move {
209 let event = resp.await?.inner;
210 if let Event::TradeResponse(TradeResponse::CancelOrder {
211 code,
212 msg,
213 mut data,
214 ..
215 }) = event
216 {
217 if code == "0" {
218 if let Some(_data) = data.pop() {
219 Ok(Canceled {
220 ts: OffsetDateTime::now_utc(),
221 order: None,
222 })
223 } else {
224 Err(OkxError::Api(StatusKind::EmptyResponse))
225 }
226 } else if let Some(data) = data.pop() {
227 Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
228 "code={} msg={}",
229 data.s_code,
230 data.s_msg
231 ))))
232 } else {
233 Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
234 "code={code} msg={msg}"
235 ))))
236 }
237 } else {
238 Err(OkxError::UnexpectedDataType(anyhow::anyhow!("{event:?}")))
239 }?;
240 Ok(Canceled {
241 ts: OffsetDateTime::now_utc(),
242 order: None,
243 })
244 }
245 .boxed())
246 }
247}
248
249impl Adaptor<SubscribeTrades> for Request {
250 fn from_request(req: SubscribeTrades) -> Result<Self, ExchangeError> {
251 Ok(Self::subscribe_trades(&req.instrument))
252 }
253
254 fn into_response(
255 resp: Self::Response,
256 ) -> Result<<SubscribeTrades as exc_core::Request>::Response, ExchangeError> {
257 match resp {
258 Response::Streaming(stream) => {
259 let stream = stream
260 .skip(1)
261 .flat_map(|frame| {
262 let res: Result<Vec<Result<Trade, OkxError>>, OkxError> =
263 frame.and_then(|f| f.inner.try_into());
264 match res {
265 Ok(tickers) => futures::stream::iter(tickers).left_stream(),
266 Err(err) => {
267 futures::stream::once(async move { Err(err) }).right_stream()
268 }
269 }
270 })
271 .map_err(ExchangeError::from)
272 .boxed();
273 Ok(stream)
274 }
275 Response::Error(status) => Err(OkxError::Api(status).into()),
276 Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
277 "invalid response kind"
278 ))),
279 }
280 }
281}
282
283impl Adaptor<SubscribeBidAsk> for Request {
284 fn from_request(req: SubscribeBidAsk) -> Result<Self, ExchangeError> {
285 Ok(Self::subscribe_bid_ask(&req.instrument))
286 }
287
288 fn into_response(
289 resp: Self::Response,
290 ) -> Result<<SubscribeBidAsk as exc_core::Request>::Response, ExchangeError> {
291 match resp {
292 Response::Streaming(stream) => {
293 let stream = stream
294 .skip(1)
295 .flat_map(|frame| {
296 let res: Result<Vec<Result<BidAsk, OkxError>>, OkxError> =
297 frame.and_then(|f| f.inner.try_into());
298 match res {
299 Ok(tickers) => futures::stream::iter(tickers).left_stream(),
300 Err(err) => {
301 futures::stream::once(async move { Err(err) }).right_stream()
302 }
303 }
304 })
305 .map_err(ExchangeError::from)
306 .boxed();
307 Ok(stream)
308 }
309 Response::Error(status) => Err(OkxError::Api(status).into()),
310 Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
311 "invalid response kind"
312 ))),
313 }
314 }
315}