exc_okx/websocket/
adaptations.rs

1use exc_core::{
2    types::{
3        instrument::{InstrumentMeta, SubscribeInstruments},
4        trading::{CancelOrder, OrderId, PlaceOrder},
5        BidAsk, Canceled, OrderUpdate, Placed, SubscribeBidAsk, SubscribeOrders, SubscribeTrades,
6        Trade,
7    },
8    Adaptor, ExchangeError,
9};
10use futures::{future::ready, stream::iter, FutureExt, StreamExt, TryStreamExt};
11use time::OffsetDateTime;
12
13use crate::{error::OkxError, utils::inst_tag::parse_inst_tag};
14
15use super::{
16    types::{
17        messages::{
18            event::{order::OkxOrder, Event, OkxInstrumentMeta, TradeResponse},
19            Args,
20        },
21        response::StatusKind,
22    },
23    Request, Response,
24};
25
26impl Adaptor<SubscribeInstruments> for Request {
27    fn from_request(req: SubscribeInstruments) -> Result<Self, exc_core::ExchangeError>
28    where
29        Self: Sized,
30    {
31        let (ty, _) = parse_inst_tag(&req.tag)?;
32        Ok(Self::subscribe(Args::subscribe_instruments(&ty)))
33    }
34
35    fn into_response(
36        resp: Self::Response,
37    ) -> Result<<SubscribeInstruments as exc_core::Request>::Response, ExchangeError> {
38        match resp {
39            Response::Error(err) => Err(ExchangeError::Other(anyhow::anyhow!("status: {err}"))),
40            Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
41                "invalid response kind"
42            ))),
43            Response::Streaming(stream) => {
44                let stream = stream
45                    .skip(1)
46                    .filter_map(|frame| {
47                        ready(match frame {
48                            Ok(frame) => frame.into_change().map(Ok),
49                            Err(err) => Some(Err(err)),
50                        })
51                    })
52                    .flat_map(|change| match change {
53                        Ok(change) => iter(change.deserialize_data::<OkxInstrumentMeta>())
54                            .filter_map(|m| match m {
55                                Ok(m) => ready(Some(
56                                    InstrumentMeta::try_from(m).map_err(ExchangeError::from),
57                                )),
58                                Err(err) => {
59                                    error!("deserialize instrument meta error: {err}, skipped.");
60                                    ready(None)
61                                }
62                            })
63                            .left_stream(),
64                        Err(err) => {
65                            futures::stream::once(
66                                async move { Err(ExchangeError::Other(err.into())) },
67                            )
68                            .right_stream()
69                        }
70                    })
71                    .boxed();
72                Ok(stream)
73            }
74        }
75    }
76}
77
78impl Adaptor<SubscribeOrders> for Request {
79    fn from_request(req: SubscribeOrders) -> Result<Self, exc_core::ExchangeError>
80    where
81        Self: Sized,
82    {
83        Ok(Self::subscribe(Args::subscribe_orders(&req.instrument)))
84    }
85
86    fn into_response(
87        resp: Self::Response,
88    ) -> Result<<SubscribeOrders as exc_core::Request>::Response, ExchangeError> {
89        match resp {
90            Response::Error(err) => Err(ExchangeError::Other(anyhow::anyhow!("status: {err}"))),
91            Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
92                "invalid response kind"
93            ))),
94            Response::Streaming(stream) => {
95                let stream = stream
96                    .skip(1)
97                    .filter_map(|frame| {
98                        ready(match frame {
99                            Ok(frame) => frame.into_change().map(Ok),
100                            Err(err) => Some(Err(err)),
101                        })
102                    })
103                    .flat_map(|change| match change {
104                        Ok(change) => iter(change.deserialize_data::<OkxOrder>())
105                            .filter_map(|m| {
106                                match m.map_err(OkxError::from).and_then(OrderUpdate::try_from) {
107                                    Ok(m) => ready(Some(Ok(m))),
108                                    Err(err) => {
109                                        error!(%err, "deserialize order error, skipped.");
110                                        ready(None)
111                                    }
112                                }
113                            })
114                            .left_stream(),
115                        Err(err) => {
116                            futures::stream::once(
117                                async move { Err(ExchangeError::Other(err.into())) },
118                            )
119                            .right_stream()
120                        }
121                    })
122                    .boxed();
123                Ok(stream)
124            }
125        }
126    }
127}
128
129impl Adaptor<PlaceOrder> for Request {
130    fn from_request(req: PlaceOrder) -> Result<Self, ExchangeError>
131    where
132        Self: Sized,
133    {
134        Ok(Self::order(&req))
135    }
136
137    fn into_response(
138        resp: Self::Response,
139    ) -> Result<<PlaceOrder as exc_core::Request>::Response, ExchangeError> {
140        let resp = resp.into_unary().map_err(OkxError::Api)?;
141
142        Ok(async move {
143            let event = resp.await?.inner;
144            let (ts, id) = if let Event::TradeResponse(TradeResponse::Order {
145                code,
146                msg,
147                mut data,
148                ..
149            }) = event
150            {
151                if code == "0" {
152                    if let Some(data) = data.pop() {
153                        #[cfg(not(feature = "prefer-client-id"))]
154                        {
155                            let id = OrderId::from(data.ord_id);
156                            Ok((OffsetDateTime::now_utc(), id))
157                        }
158                        #[cfg(feature = "prefer-client-id")]
159                        if let Some(id) = if data.cl_ord_id.is_empty() {
160                            None
161                        } else {
162                            Some(data.cl_ord_id)
163                        } {
164                            Ok((OffsetDateTime::now_utc(), OrderId::from(id)))
165                        } else {
166                            Err(OkxError::MissingClientId)
167                        }
168                    } else {
169                        Err(OkxError::Api(StatusKind::EmptyResponse))
170                    }
171                } else if let Some(data) = data.pop() {
172                    Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
173                        "code={} msg={}",
174                        data.s_code,
175                        data.s_msg
176                    ))))
177                } else {
178                    Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
179                        "code={code} msg={msg}"
180                    ))))
181                }
182            } else {
183                Err(OkxError::UnexpectedDataType(anyhow::anyhow!("{event:?}")))
184            }?;
185            Ok(Placed {
186                id,
187                order: None,
188                ts,
189            })
190        }
191        .boxed())
192    }
193}
194
195impl Adaptor<CancelOrder> for Request {
196    fn from_request(req: CancelOrder) -> Result<Self, ExchangeError>
197    where
198        Self: Sized,
199    {
200        Ok(Self::cancel_order(&req.instrument, req.id.as_str()))
201    }
202
203    fn into_response(
204        resp: Self::Response,
205    ) -> Result<<CancelOrder as exc_core::Request>::Response, ExchangeError> {
206        let resp = resp.into_unary().map_err(OkxError::Api)?;
207
208        Ok(async move {
209            let event = resp.await?.inner;
210            if let Event::TradeResponse(TradeResponse::CancelOrder {
211                code,
212                msg,
213                mut data,
214                ..
215            }) = event
216            {
217                if code == "0" {
218                    if let Some(_data) = data.pop() {
219                        Ok(Canceled {
220                            ts: OffsetDateTime::now_utc(),
221                            order: None,
222                        })
223                    } else {
224                        Err(OkxError::Api(StatusKind::EmptyResponse))
225                    }
226                } else if let Some(data) = data.pop() {
227                    Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
228                        "code={} msg={}",
229                        data.s_code,
230                        data.s_msg
231                    ))))
232                } else {
233                    Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
234                        "code={code} msg={msg}"
235                    ))))
236                }
237            } else {
238                Err(OkxError::UnexpectedDataType(anyhow::anyhow!("{event:?}")))
239            }?;
240            Ok(Canceled {
241                ts: OffsetDateTime::now_utc(),
242                order: None,
243            })
244        }
245        .boxed())
246    }
247}
248
249impl Adaptor<SubscribeTrades> for Request {
250    fn from_request(req: SubscribeTrades) -> Result<Self, ExchangeError> {
251        Ok(Self::subscribe_trades(&req.instrument))
252    }
253
254    fn into_response(
255        resp: Self::Response,
256    ) -> Result<<SubscribeTrades as exc_core::Request>::Response, ExchangeError> {
257        match resp {
258            Response::Streaming(stream) => {
259                let stream = stream
260                    .skip(1)
261                    .flat_map(|frame| {
262                        let res: Result<Vec<Result<Trade, OkxError>>, OkxError> =
263                            frame.and_then(|f| f.inner.try_into());
264                        match res {
265                            Ok(tickers) => futures::stream::iter(tickers).left_stream(),
266                            Err(err) => {
267                                futures::stream::once(async move { Err(err) }).right_stream()
268                            }
269                        }
270                    })
271                    .map_err(ExchangeError::from)
272                    .boxed();
273                Ok(stream)
274            }
275            Response::Error(status) => Err(OkxError::Api(status).into()),
276            Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
277                "invalid response kind"
278            ))),
279        }
280    }
281}
282
283impl Adaptor<SubscribeBidAsk> for Request {
284    fn from_request(req: SubscribeBidAsk) -> Result<Self, ExchangeError> {
285        Ok(Self::subscribe_bid_ask(&req.instrument))
286    }
287
288    fn into_response(
289        resp: Self::Response,
290    ) -> Result<<SubscribeBidAsk as exc_core::Request>::Response, ExchangeError> {
291        match resp {
292            Response::Streaming(stream) => {
293                let stream = stream
294                    .skip(1)
295                    .flat_map(|frame| {
296                        let res: Result<Vec<Result<BidAsk, OkxError>>, OkxError> =
297                            frame.and_then(|f| f.inner.try_into());
298                        match res {
299                            Ok(tickers) => futures::stream::iter(tickers).left_stream(),
300                            Err(err) => {
301                                futures::stream::once(async move { Err(err) }).right_stream()
302                            }
303                        }
304                    })
305                    .map_err(ExchangeError::from)
306                    .boxed();
307                Ok(stream)
308            }
309            Response::Error(status) => Err(OkxError::Api(status).into()),
310            Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
311                "invalid response kind"
312            ))),
313        }
314    }
315}