exc-okx 0.7.3

OKX exchange services
Documentation
use exc_core::{
    types::{
        instrument::{InstrumentMeta, SubscribeInstruments},
        trading::{CancelOrder, OrderId, PlaceOrder},
        BidAsk, Canceled, OrderUpdate, Placed, SubscribeBidAsk, SubscribeOrders, SubscribeTrades,
        Trade,
    },
    Adaptor, ExchangeError,
};
use futures::{future::ready, stream::iter, FutureExt, StreamExt, TryStreamExt};
use time::OffsetDateTime;

use crate::{error::OkxError, utils::inst_tag::parse_inst_tag};

use super::{
    types::{
        messages::{
            event::{order::OkxOrder, Event, OkxInstrumentMeta, TradeResponse},
            Args,
        },
        response::StatusKind,
    },
    Request, Response,
};

impl Adaptor<SubscribeInstruments> for Request {
    fn from_request(req: SubscribeInstruments) -> Result<Self, exc_core::ExchangeError>
    where
        Self: Sized,
    {
        let (ty, _) = parse_inst_tag(&req.tag)?;
        Ok(Self::subscribe(Args::subscribe_instruments(&ty)))
    }

    fn into_response(
        resp: Self::Response,
    ) -> Result<<SubscribeInstruments as exc_core::Request>::Response, ExchangeError> {
        match resp {
            Response::Error(err) => Err(ExchangeError::Other(anyhow::anyhow!("status: {err}"))),
            Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
                "invalid response kind"
            ))),
            Response::Streaming(stream) => {
                let stream = stream
                    .skip(1)
                    .filter_map(|frame| {
                        ready(match frame {
                            Ok(frame) => frame.into_change().map(Ok),
                            Err(err) => Some(Err(err)),
                        })
                    })
                    .flat_map(|change| match change {
                        Ok(change) => iter(change.deserialize_data::<OkxInstrumentMeta>())
                            .filter_map(|m| match m {
                                Ok(m) => ready(Some(
                                    InstrumentMeta::try_from(m).map_err(ExchangeError::from),
                                )),
                                Err(err) => {
                                    error!("deserialize instrument meta error: {err}, skipped.");
                                    ready(None)
                                }
                            })
                            .left_stream(),
                        Err(err) => {
                            futures::stream::once(
                                async move { Err(ExchangeError::Other(err.into())) },
                            )
                            .right_stream()
                        }
                    })
                    .boxed();
                Ok(stream)
            }
        }
    }
}

impl Adaptor<SubscribeOrders> for Request {
    fn from_request(req: SubscribeOrders) -> Result<Self, exc_core::ExchangeError>
    where
        Self: Sized,
    {
        Ok(Self::subscribe(Args::subscribe_orders(&req.instrument)))
    }

    fn into_response(
        resp: Self::Response,
    ) -> Result<<SubscribeOrders as exc_core::Request>::Response, ExchangeError> {
        match resp {
            Response::Error(err) => Err(ExchangeError::Other(anyhow::anyhow!("status: {err}"))),
            Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
                "invalid response kind"
            ))),
            Response::Streaming(stream) => {
                let stream = stream
                    .skip(1)
                    .filter_map(|frame| {
                        ready(match frame {
                            Ok(frame) => frame.into_change().map(Ok),
                            Err(err) => Some(Err(err)),
                        })
                    })
                    .flat_map(|change| match change {
                        Ok(change) => iter(change.deserialize_data::<OkxOrder>())
                            .filter_map(|m| {
                                match m.map_err(OkxError::from).and_then(OrderUpdate::try_from) {
                                    Ok(m) => ready(Some(Ok(m))),
                                    Err(err) => {
                                        error!(%err, "deserialize order error, skipped.");
                                        ready(None)
                                    }
                                }
                            })
                            .left_stream(),
                        Err(err) => {
                            futures::stream::once(
                                async move { Err(ExchangeError::Other(err.into())) },
                            )
                            .right_stream()
                        }
                    })
                    .boxed();
                Ok(stream)
            }
        }
    }
}

impl Adaptor<PlaceOrder> for Request {
    fn from_request(req: PlaceOrder) -> Result<Self, ExchangeError>
    where
        Self: Sized,
    {
        Ok(Self::order(&req))
    }

    fn into_response(
        resp: Self::Response,
    ) -> Result<<PlaceOrder as exc_core::Request>::Response, ExchangeError> {
        let resp = resp.into_unary().map_err(OkxError::Api)?;

        Ok(async move {
            let event = resp.await?.inner;
            let (ts, id) = if let Event::TradeResponse(TradeResponse::Order {
                code,
                msg,
                mut data,
                ..
            }) = event
            {
                if code == "0" {
                    if let Some(data) = data.pop() {
                        #[cfg(not(feature = "prefer-client-id"))]
                        {
                            let id = OrderId::from(data.ord_id);
                            Ok((OffsetDateTime::now_utc(), id))
                        }
                        #[cfg(feature = "prefer-client-id")]
                        if let Some(id) = if data.cl_ord_id.is_empty() {
                            None
                        } else {
                            Some(data.cl_ord_id)
                        } {
                            Ok((OffsetDateTime::now_utc(), OrderId::from(id)))
                        } else {
                            Err(OkxError::MissingClientId)
                        }
                    } else {
                        Err(OkxError::Api(StatusKind::EmptyResponse))
                    }
                } else if let Some(data) = data.pop() {
                    Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
                        "code={} msg={}",
                        data.s_code,
                        data.s_msg
                    ))))
                } else {
                    Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
                        "code={code} msg={msg}"
                    ))))
                }
            } else {
                Err(OkxError::UnexpectedDataType(anyhow::anyhow!("{event:?}")))
            }?;
            Ok(Placed {
                id,
                order: None,
                ts,
            })
        }
        .boxed())
    }
}

impl Adaptor<CancelOrder> for Request {
    fn from_request(req: CancelOrder) -> Result<Self, ExchangeError>
    where
        Self: Sized,
    {
        Ok(Self::cancel_order(&req.instrument, req.id.as_str()))
    }

    fn into_response(
        resp: Self::Response,
    ) -> Result<<CancelOrder as exc_core::Request>::Response, ExchangeError> {
        let resp = resp.into_unary().map_err(OkxError::Api)?;

        Ok(async move {
            let event = resp.await?.inner;
            if let Event::TradeResponse(TradeResponse::CancelOrder {
                code,
                msg,
                mut data,
                ..
            }) = event
            {
                if code == "0" {
                    if let Some(_data) = data.pop() {
                        Ok(Canceled {
                            ts: OffsetDateTime::now_utc(),
                            order: None,
                        })
                    } else {
                        Err(OkxError::Api(StatusKind::EmptyResponse))
                    }
                } else if let Some(data) = data.pop() {
                    Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
                        "code={} msg={}",
                        data.s_code,
                        data.s_msg
                    ))))
                } else {
                    Err(OkxError::Api(StatusKind::Other(anyhow::anyhow!(
                        "code={code} msg={msg}"
                    ))))
                }
            } else {
                Err(OkxError::UnexpectedDataType(anyhow::anyhow!("{event:?}")))
            }?;
            Ok(Canceled {
                ts: OffsetDateTime::now_utc(),
                order: None,
            })
        }
        .boxed())
    }
}

impl Adaptor<SubscribeTrades> for Request {
    fn from_request(req: SubscribeTrades) -> Result<Self, ExchangeError> {
        Ok(Self::subscribe_trades(&req.instrument))
    }

    fn into_response(
        resp: Self::Response,
    ) -> Result<<SubscribeTrades as exc_core::Request>::Response, ExchangeError> {
        match resp {
            Response::Streaming(stream) => {
                let stream = stream
                    .skip(1)
                    .flat_map(|frame| {
                        let res: Result<Vec<Result<Trade, OkxError>>, OkxError> =
                            frame.and_then(|f| f.inner.try_into());
                        match res {
                            Ok(tickers) => futures::stream::iter(tickers).left_stream(),
                            Err(err) => {
                                futures::stream::once(async move { Err(err) }).right_stream()
                            }
                        }
                    })
                    .map_err(ExchangeError::from)
                    .boxed();
                Ok(stream)
            }
            Response::Error(status) => Err(OkxError::Api(status).into()),
            Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
                "invalid response kind"
            ))),
        }
    }
}

impl Adaptor<SubscribeBidAsk> for Request {
    fn from_request(req: SubscribeBidAsk) -> Result<Self, ExchangeError> {
        Ok(Self::subscribe_bid_ask(&req.instrument))
    }

    fn into_response(
        resp: Self::Response,
    ) -> Result<<SubscribeBidAsk as exc_core::Request>::Response, ExchangeError> {
        match resp {
            Response::Streaming(stream) => {
                let stream = stream
                    .skip(1)
                    .flat_map(|frame| {
                        let res: Result<Vec<Result<BidAsk, OkxError>>, OkxError> =
                            frame.and_then(|f| f.inner.try_into());
                        match res {
                            Ok(tickers) => futures::stream::iter(tickers).left_stream(),
                            Err(err) => {
                                futures::stream::once(async move { Err(err) }).right_stream()
                            }
                        }
                    })
                    .map_err(ExchangeError::from)
                    .boxed();
                Ok(stream)
            }
            Response::Error(status) => Err(OkxError::Api(status).into()),
            Response::Reconnected => Err(ExchangeError::Other(anyhow::anyhow!(
                "invalid response kind"
            ))),
        }
    }
}