exc_binance/websocket/protocol/stream/
req_res.rs1use std::time::Duration;
2
3use futures::{
4 future::ready,
5 stream::{once, BoxStream},
6 Stream, StreamExt,
7};
8
9use crate::websocket::{
10 error::WsError,
11 protocol::frame::{Name, RequestFrame, ServerFrame, StreamFrame},
12};
13
14use tokio::sync::broadcast;
15
16pub(crate) type ResponseToken = tokio::sync::oneshot::Receiver<()>;
17type RequestToken = tokio::sync::oneshot::Sender<()>;
18
19pub(crate) enum MultiplexRequestKind {
20 MainStream(Name),
21 SubStream {
22 token: RequestToken,
23 timeout: Option<Duration>,
24 stream: BoxStream<'static, RequestFrame>,
25 },
26}
27
28pub struct MultiplexRequest {
30 pub(crate) id: usize,
31 pub(crate) kind: MultiplexRequestKind,
32}
33
34impl MultiplexRequest {
35 pub(crate) fn main_stream(name: Name) -> Self {
36 Self {
37 id: 0,
38 kind: MultiplexRequestKind::MainStream(name),
39 }
40 }
41
42 pub(crate) fn new<S, F>(stream: F) -> Self
43 where
44 F: FnOnce(ResponseToken) -> S,
45 S: Stream<Item = RequestFrame> + Send + 'static,
46 {
47 let (tx, rx) = tokio::sync::oneshot::channel();
48 let stream = stream(rx).boxed();
49 Self {
50 id: 0,
51 kind: MultiplexRequestKind::SubStream {
52 token: tx,
53 timeout: None,
54 stream,
55 },
56 }
57 }
58
59 pub(crate) fn timeout(mut self, duration: Duration) -> Self {
60 match &mut self.kind {
61 MultiplexRequestKind::MainStream(_) => {}
62 MultiplexRequestKind::SubStream { timeout, .. } => {
63 *timeout = Some(duration);
64 }
65 }
66 self
67 }
68}
69
70#[derive(Debug)]
72pub enum MultiplexResponse {
73 MainStream(usize, Option<broadcast::Receiver<StreamFrame>>),
75 SubStream {
77 id: usize,
79 token: RequestToken,
81 rx: tokio::sync::mpsc::UnboundedReceiver<Result<ServerFrame, WsError>>,
83 },
84}
85
86impl MultiplexResponse {
87 pub(crate) fn into_stream(self) -> impl Stream<Item = Result<ServerFrame, WsError>> {
88 match self {
89 Self::MainStream(_, rx) => match rx {
90 Some(rx) => {
91 let stream = tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(
92 |res| match res {
93 Ok(frame) => ready(Some(Ok(ServerFrame::Stream(frame)))),
94 Err(_) => ready(None),
95 },
96 );
97 once(ready(Ok(ServerFrame::Empty)))
98 .chain(stream)
99 .left_stream()
100 }
101 None => once(ready(Err(WsError::MainStreamNotFound))).right_stream(),
102 }
103 .left_stream(),
104 Self::SubStream { token, rx, .. } => {
105 tokio_stream::wrappers::UnboundedReceiverStream::new(rx)
106 .scan(token, |_, item| futures::future::ready(Some(item)))
107 .right_stream()
108 }
109 }
110 }
111}