exc_binance/websocket/response/
mod.rs1use std::fmt;
2
3use futures::{stream::BoxStream, Stream, StreamExt, TryStreamExt};
4
5use crate::websocket::protocol::frame::ServerFrame;
6
7use super::{
8 error::WsError,
9 protocol::{frame::StreamFrame, stream::MultiplexResponse},
10};
11
12pub enum WsResponse {
14 Raw(MultiplexResponse),
16 Stream(BoxStream<'static, Result<StreamFrame, WsError>>),
18 Reconnected,
20}
21
22impl fmt::Debug for WsResponse {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 match self {
25 Self::Raw(resp) => write!(f, "Raw({resp:?})"),
26 Self::Stream(_) => write!(f, "Stream(_)"),
27 Self::Reconnected => write!(f, "Reconneced"),
28 }
29 }
30}
31
32impl WsResponse {
33 pub fn into_stream<T>(self) -> Option<impl Stream<Item = Result<T, WsError>>>
35 where
36 T: TryFrom<StreamFrame, Error = WsError>,
37 {
38 match self {
39 Self::Raw(_) | Self::Reconnected => None,
40 Self::Stream(stream) => {
41 Some(stream.and_then(|frame| async move { T::try_from(frame) }))
42 }
43 }
44 }
45
46 pub(crate) async fn stream(self) -> Result<Self, WsError> {
47 match self {
48 Self::Raw(resp) => {
49 let mut stream = resp.into_stream();
50 if let Some(header) = stream.try_next().await? {
51 tracing::trace!("ws response: header={header:?}");
52 Ok(Self::Stream(
53 stream
54 .filter_map(|frame| {
55 let res = match frame {
56 Ok(ServerFrame::Stream(frame)) => Some(Ok(frame)),
57 Ok(ServerFrame::Response(resp)) => {
58 tracing::trace!("received a response frame: {resp:?}");
59 None
60 }
61 Ok(ServerFrame::Empty) => {
62 tracing::trace!("received a empty frame");
63 None
64 }
65 Err(err) => Some(Err(err)),
66 };
67 futures::future::ready(res)
68 })
69 .boxed(),
70 ))
71 } else {
72 Err(WsError::NoResponse)
73 }
74 }
75 Self::Stream(stream) => Ok(Self::Stream(stream)),
76 Self::Reconnected => Err(WsError::NoResponse),
77 }
78 }
79}
80
81impl From<MultiplexResponse> for WsResponse {
82 fn from(inner: MultiplexResponse) -> Self {
83 Self::Raw(inner)
84 }
85}