exc_binance/websocket/response/
mod.rs

1use std::fmt;
2
3use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
4
5use crate::websocket::protocol::frame::ServerFrame;
6
7use super::{
8    error::WsError,
9    protocol::{frame::StreamFrame, stream::MultiplexResponse},
10};
11
12/// Binance websocket response.
13pub enum WsResponse {
14    /// Raw.
15    Raw(MultiplexResponse),
16    /// Stream.
17    Stream(BoxStream<'static, Result<StreamFrame, WsError>>),
18    /// Reconnected.
19    Reconnected,
20}
21
22impl fmt::Debug for WsResponse {
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        match self {
25            Self::Raw(resp) => write!(f, "Raw({resp:?})"),
26            Self::Stream(_) => write!(f, "Stream(_)"),
27            Self::Reconnected => write!(f, "Reconneced"),
28        }
29    }
30}
31
32impl WsResponse {
33    /// As a stream of the given type.
34    pub fn into_stream<T>(self) -> Option<impl Stream<Item = Result<T, WsError>>>
35    where
36        T: TryFrom<StreamFrame, Error = WsError>,
37    {
38        match self {
39            Self::Raw(_) | Self::Reconnected => None,
40            Self::Stream(stream) => {
41                Some(stream.and_then(|frame| async move { T::try_from(frame) }))
42            }
43        }
44    }
45
46    pub(crate) async fn stream(self) -> Result<Self, WsError> {
47        match self {
48            Self::Raw(resp) => {
49                let mut stream = resp.into_stream();
50                if let Some(header) = stream.try_next().await? {
51                    tracing::trace!("ws response: header={header:?}");
52                    Ok(Self::Stream(
53                        stream
54                            .filter_map(|frame| {
55                                let res = match frame {
56                                    Ok(ServerFrame::Stream(frame)) => Some(Ok(frame)),
57                                    Ok(ServerFrame::Response(resp)) => {
58                                        tracing::trace!("received a response frame: {resp:?}");
59                                        None
60                                    }
61                                    Ok(ServerFrame::Empty) => {
62                                        tracing::trace!("received a empty frame");
63                                        None
64                                    }
65                                    Err(err) => Some(Err(err)),
66                                };
67                                futures::future::ready(res)
68                            })
69                            .boxed(),
70                    ))
71                } else {
72                    Err(WsError::NoResponse)
73                }
74            }
75            Self::Stream(stream) => Ok(Self::Stream(stream)),
76            Self::Reconnected => Err(WsError::NoResponse),
77        }
78    }
79}
80
81impl From<MultiplexResponse> for WsResponse {
82    fn from(inner: MultiplexResponse) -> Self {
83        Self::Raw(inner)
84    }
85}