exc_okx/websocket/transport/protocol/
message.rs1use crate::websocket::types::messages::{
2 event::Event,
3 request::{WsRequest, WsRequestMessage},
4};
5use futures::{Sink, SinkExt, Stream, StreamExt};
6use thiserror::Error;
7
8#[derive(Debug, Error)]
10pub enum MessageError<E> {
11 #[error("[message] serializing: {0}")]
13 Serializing(serde_json::Error),
14 #[error("{0}")]
16 Transport(#[from] E),
17}
18
19pub(super) fn layer<T, E>(
20 transport: T,
21) -> impl Sink<WsRequest, Error = MessageError<E>> + Stream<Item = Result<Event, MessageError<E>>>
22where
23 T: Sink<String, Error = E>,
24 T: Stream<Item = Result<String, E>>,
25{
26 transport
27 .sink_map_err(MessageError::from)
28 .with(|msg: WsRequest| async move {
29 let msg: WsRequestMessage = msg.into();
30 let msg = serde_json::to_string(&msg).map_err(MessageError::Serializing)?;
31 Ok(msg)
32 })
33 .filter_map(|msg| async move {
34 match msg {
35 Ok(msg) => match serde_json::from_str::<Event>(&msg) {
36 Ok(event) => {
37 trace!("message layer; received event={event:?}");
38 Some(Ok(event))
39 }
40 Err(err) => {
41 warn!("message layer; deserializing message error: {err}, msg={msg}; ignored");
42 None
43 }
44 },
45 Err(err) => Some(Err(err.into())),
46 }
47 })
48}