1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use std::fmt;

use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};

use crate::websocket::protocol::frame::ServerFrame;

use super::{
    error::WsError,
    protocol::{frame::StreamFrame, stream::MultiplexResponse},
};

/// Binance websocket response.
pub enum WsResponse {
    /// Raw.
    Raw(MultiplexResponse),
    /// Stream.
    Stream(BoxStream<'static, Result<StreamFrame, WsError>>),
    /// Reconnected.
    Reconnected,
}

impl fmt::Debug for WsResponse {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Raw(resp) => write!(f, "Raw({resp:?})"),
            Self::Stream(_) => write!(f, "Stream(_)"),
            Self::Reconnected => write!(f, "Reconneced"),
        }
    }
}

impl WsResponse {
    /// As a stream of the given type.
    pub fn into_stream<T>(self) -> Option<impl Stream<Item = Result<T, WsError>>>
    where
        T: TryFrom<StreamFrame, Error = WsError>,
    {
        match self {
            Self::Raw(_) | Self::Reconnected => None,
            Self::Stream(stream) => {
                Some(stream.and_then(|frame| async move { T::try_from(frame) }))
            }
        }
    }

    pub(crate) async fn stream(self) -> Result<Self, WsError> {
        match self {
            Self::Raw(resp) => {
                let mut stream = resp.into_stream();
                if let Some(header) = stream.try_next().await? {
                    tracing::trace!("ws response: header={header:?}");
                    Ok(Self::Stream(
                        stream
                            .filter_map(|frame| {
                                let res = match frame {
                                    Ok(ServerFrame::Stream(frame)) => Some(Ok(frame)),
                                    Ok(ServerFrame::Response(resp)) => {
                                        tracing::trace!("received a response frame: {resp:?}");
                                        None
                                    }
                                    Ok(ServerFrame::Empty) => {
                                        tracing::trace!("received a empty frame");
                                        None
                                    }
                                    Err(err) => Some(Err(err)),
                                };
                                futures::future::ready(res)
                            })
                            .boxed(),
                    ))
                } else {
                    Err(WsError::NoResponse)
                }
            }
            Self::Stream(stream) => Ok(Self::Stream(stream)),
            Self::Reconnected => Err(WsError::NoResponse),
        }
    }
}

impl From<MultiplexResponse> for WsResponse {
    fn from(inner: MultiplexResponse) -> Self {
        Self::Raw(inner)
    }
}