binance_async_api/websocket/
mod.rs

1pub mod coinm;
2pub mod spot;
3pub mod usdm;
4
5use crate::{
6    client::{BinanceClient, Product},
7    errors::BinanceError,
8};
9use futures_util::{stream::Stream, StreamExt};
10use reqwest::Url;
11use serde::de::DeserializeOwned;
12use serde_json::{from_str, Value};
13use std::{
14    marker::PhantomData, pin::Pin, str::FromStr, task::{Context, Poll}
15};
16use tokio::net::TcpStream;
17use tokio_tungstenite::{
18    connect_async,
19    tungstenite::{self, Message},
20    MaybeTlsStream, WebSocketStream,
21};
22
23pub trait StreamTopic {
24    const PRODUCT: Product;
25    fn endpoint(&self) -> String;
26    type Event: DeserializeOwned + Unpin;
27}
28
29type WSStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
30
31pub struct BinanceWebsocket<E> {
32    stream: WSStream,
33    _phantom: PhantomData<E>,
34}
35
36impl<E: DeserializeOwned + Unpin> Stream for BinanceWebsocket<E> {
37    type Item = Result<E, BinanceError>;
38
39    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
40        let msg = match self.stream.poll_next_unpin(cx) {
41            Poll::Ready(Some(Ok(c))) => c,
42            Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e.into()))),
43            Poll::Pending => return Poll::Pending,
44            Poll::Ready(None) => return Poll::Ready(None),
45        };
46        let text = match msg {
47            Message::Text(msg) => msg,
48            Message::Binary(_) | Message::Frame(_) | Message::Pong(_) | Message::Ping(_) => {
49                return Poll::Pending;
50            }
51            Message::Close(_) => return Poll::Ready(None),
52        };
53
54        let event: E = match from_str(&text) {
55            Ok(r) => r,
56            Err(e) => {
57                let val = Value::from_str(&text).unwrap();
58                eprintln!("Failed to parse event:");
59                eprintln!("{:#?}", val.as_object().unwrap());
60                panic!("parsing error: {}", e);
61            },
62        };
63
64        Poll::Ready(Some(Ok(event)))
65    }
66}
67
68impl BinanceClient {
69    pub async fn connect_stream<T: StreamTopic>(
70        &self,
71        topic: T,
72    ) -> Result<BinanceWebsocket<T::Event>, BinanceError> {
73        let base = match T::PRODUCT {
74            Product::Spot => &self.config.ws_endpoint,
75            Product::UsdMFutures => &self.config.usdm_futures_ws_endpoint,
76            Product::CoinMFutures => &self.config.coinm_futures_ws_endpoint,
77        };
78        let endpoint = topic.endpoint();
79        let url = Url::parse(&format!("{}{}", base, endpoint)).unwrap();
80        let (stream, _) = match connect_async(url).await {
81            Ok(v) => v,
82            Err(tungstenite::Error::Http(http)) => {
83                return Err(BinanceError::StartWebsocketError {
84                    status_code: http.status(),
85                    headers: http.headers().clone(),
86                    body: String::from_utf8_lossy(http.body().as_deref().unwrap_or_default()).to_string(),
87                })
88            }
89            Err(e) => return Err(e.into()),
90        };
91        Ok(BinanceWebsocket {
92            stream,
93            _phantom: PhantomData,
94        })
95    }
96}