exc_okx/websocket/transport/protocol/
message.rs

1use crate::websocket::types::messages::{
2    event::Event,
3    request::{WsRequest, WsRequestMessage},
4};
5use futures::{Sink, SinkExt, Stream, StreamExt};
6use thiserror::Error;
7
8/// Okx websocket message layer errors.
9#[derive(Debug, Error)]
10pub enum MessageError<E> {
11    /// Json error.
12    #[error("[message] serializing: {0}")]
13    Serializing(serde_json::Error),
14    /// Transport error.
15    #[error("{0}")]
16    Transport(#[from] E),
17}
18
19pub(super) fn layer<T, E>(
20    transport: T,
21) -> impl Sink<WsRequest, Error = MessageError<E>> + Stream<Item = Result<Event, MessageError<E>>>
22where
23    T: Sink<String, Error = E>,
24    T: Stream<Item = Result<String, E>>,
25{
26    transport
27        .sink_map_err(MessageError::from)
28        .with(|msg: WsRequest| async move {
29            let msg: WsRequestMessage = msg.into();
30            let msg = serde_json::to_string(&msg).map_err(MessageError::Serializing)?;
31            Ok(msg)
32        })
33        .filter_map(|msg| async move {
34            match msg {
35                Ok(msg) => match serde_json::from_str::<Event>(&msg) {
36                    Ok(event) => {
37                        trace!("message layer; received event={event:?}");
38                        Some(Ok(event))
39                    }
40                    Err(err) => {
41                        warn!("message layer; deserializing message error: {err}, msg={msg}; ignored");
42                        None
43                    }
44                },
45                Err(err) => Some(Err(err.into())),
46            }
47        })
48}