exc_binance/websocket/protocol/stream/
req_res.rs

1use std::time::Duration;
2
3use futures::{
4    future::ready,
5    stream::{once, BoxStream},
6    Stream, StreamExt,
7};
8
9use crate::websocket::{
10    error::WsError,
11    protocol::frame::{Name, RequestFrame, ServerFrame, StreamFrame},
12};
13
14use tokio::sync::broadcast;
15
16pub(crate) type ResponseToken = tokio::sync::oneshot::Receiver<()>;
17type RequestToken = tokio::sync::oneshot::Sender<()>;
18
19pub(crate) enum MultiplexRequestKind {
20    MainStream(Name),
21    SubStream {
22        token: RequestToken,
23        timeout: Option<Duration>,
24        stream: BoxStream<'static, RequestFrame>,
25    },
26}
27
28/// Multiplex request.
29pub struct MultiplexRequest {
30    pub(crate) id: usize,
31    pub(crate) kind: MultiplexRequestKind,
32}
33
34impl MultiplexRequest {
35    pub(crate) fn main_stream(name: Name) -> Self {
36        Self {
37            id: 0,
38            kind: MultiplexRequestKind::MainStream(name),
39        }
40    }
41
42    pub(crate) fn new<S, F>(stream: F) -> Self
43    where
44        F: FnOnce(ResponseToken) -> S,
45        S: Stream<Item = RequestFrame> + Send + 'static,
46    {
47        let (tx, rx) = tokio::sync::oneshot::channel();
48        let stream = stream(rx).boxed();
49        Self {
50            id: 0,
51            kind: MultiplexRequestKind::SubStream {
52                token: tx,
53                timeout: None,
54                stream,
55            },
56        }
57    }
58
59    pub(crate) fn timeout(mut self, duration: Duration) -> Self {
60        match &mut self.kind {
61            MultiplexRequestKind::MainStream(_) => {}
62            MultiplexRequestKind::SubStream { timeout, .. } => {
63                *timeout = Some(duration);
64            }
65        }
66        self
67    }
68}
69
70/// Multiplex response.
71#[derive(Debug)]
72pub enum MultiplexResponse {
73    /// Main stream.
74    MainStream(usize, Option<broadcast::Receiver<StreamFrame>>),
75    /// Sub stream.
76    SubStream {
77        /// Id.
78        id: usize,
79        /// Token.
80        token: RequestToken,
81        /// Rx.
82        rx: tokio::sync::mpsc::UnboundedReceiver<Result<ServerFrame, WsError>>,
83    },
84}
85
86impl MultiplexResponse {
87    pub(crate) fn into_stream(self) -> impl Stream<Item = Result<ServerFrame, WsError>> {
88        match self {
89            Self::MainStream(_, rx) => match rx {
90                Some(rx) => {
91                    let stream = tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
92                        |res| match res {
93                            Ok(frame) => ready(Some(Ok(ServerFrame::Stream(frame)))),
94                            Err(_) => ready(None),
95                        },
96                    );
97                    once(ready(Ok(ServerFrame::Empty)))
98                        .chain(stream)
99                        .left_stream()
100                }
101                None => once(ready(Err(WsError::MainStreamNotFound))).right_stream(),
102            }
103            .left_stream(),
104            Self::SubStream { token, rx, .. } => {
105                tokio_stream::wrappers::UnboundedReceiverStream::new(rx)
106                    .scan(token, |_, item| futures::future::ready(Some(item)))
107                    .right_stream()
108            }
109        }
110    }
111}