binance_async/websocket/
mod.rs

1pub mod coinm;
2mod models;
3pub mod spot;
4pub mod usdm;
5
6use crate::{
7    error::BinanceError::{self, *},
8    models::Product,
9    Config,
10};
11use fehler::{throw, throws};
12use futures::{stream::Stream, StreamExt};
13use log::debug;
14pub use models::*;
15use reqwest::Url;
16use serde::{Deserialize, Serialize};
17use serde_json::{from_str, value::RawValue};
18use std::{
19    marker::PhantomData,
20    pin::Pin,
21    task::{Context, Poll},
22};
23use tokio::net::TcpStream;
24use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
25use tungstenite::Message;
26
27type WSStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
28
29pub trait ParseMessage: Sized {
30    const PRODUCT: Product;
31
32    fn parse(stream: &str, data: &str) -> Result<Self, BinanceError>;
33}
34
35pub struct BinanceWebsocket<M> {
36    stream: WSStream,
37    _phantom: PhantomData<M>,
38}
39
40impl<M> BinanceWebsocket<M>
41where
42    M: ParseMessage,
43{
44    #[throws(BinanceError)]
45    pub async fn new<I, S>(topics: I) -> BinanceWebsocket<M>
46    where
47        I: IntoIterator<Item = S>,
48        S: AsRef<str>,
49    {
50        let config = Config::default();
51        Self::with_config(&config, topics).await?
52    }
53
54    #[throws(BinanceError)]
55    pub async fn with_config<I, S>(config: &Config, topics: I) -> BinanceWebsocket<M>
56    where
57        I: IntoIterator<Item = S>,
58        S: AsRef<str>,
59    {
60        let mut combined = String::new();
61        for topic in topics {
62            if !combined.is_empty() {
63                combined.push('/');
64            }
65
66            combined.push_str(topic.as_ref())
67        }
68
69        if combined.is_empty() {
70            throw!(EmptyTopics)
71        }
72
73        let base = match M::PRODUCT {
74            Product::Spot => &config.ws_endpoint,
75            Product::UsdMFutures => &config.usdm_futures_ws_endpoint,
76            Product::CoinMFutures => &config.coinm_futures_ws_endpoint,
77            Product::EuropeanOptions => &config.european_options_ws_endpoint,
78        };
79        let endpoint = Url::parse(&format!("{}/stream?streams={}", base, combined)).unwrap();
80        debug!("ws endpoint: {endpoint:?}");
81        let (stream, _) = match connect_async(endpoint).await {
82            Ok(v) => v,
83            Err(tungstenite::Error::Http(ref http)) => throw!(StartWebsocketError(
84                http.status(),
85                String::from_utf8_lossy(http.body().as_deref().unwrap_or_default()).to_string()
86            )),
87            Err(e) => throw!(e),
88        };
89        Self {
90            stream,
91            _phantom: PhantomData,
92        }
93    }
94}
95
96#[derive(Deserialize)]
97struct MessageWithTopic<'a> {
98    stream: String,
99    #[serde(borrow)]
100    data: &'a RawValue,
101}
102
103impl<M> Stream for BinanceWebsocket<M>
104where
105    M: ParseMessage + Unpin + std::fmt::Debug,
106{
107    type Item = Result<M, BinanceError>;
108
109    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
110        let c = match self.stream.poll_next_unpin(cx) {
111            Poll::Ready(Some(Ok(c))) => c,
112            Poll::Ready(Some(Err(e))) => return dbg!(Poll::Ready(Some(Err(e.into())))),
113            Poll::Pending => return Poll::Pending,
114            Poll::Ready(None) => return Poll::Ready(None),
115        };
116        let msg = match c {
117            Message::Text(msg) => msg,
118            Message::Binary(_) | Message::Frame(_) | Message::Pong(..) | Message::Ping(..) => {
119                return Poll::Pending
120            }
121            Message::Close(_) => return Poll::Ready(None),
122        };
123
124        let t: MessageWithTopic = match from_str(&msg) {
125            Ok(v) => v,
126            Err(e) => return Poll::Ready(Some(Err(e.into()))),
127        };
128
129        Poll::Ready(Some(M::parse(&t.stream, t.data.get())))
130    }
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
134#[serde(untagged)]
135enum Either<L, R> {
136    Left(L),
137    Right(R),
138}