binance_async_api/websocket/
mod.rs1pub mod coinm;
2pub mod spot;
3pub mod usdm;
4
5use crate::{
6 client::{BinanceClient, Product},
7 errors::BinanceError,
8};
9use futures_util::{stream::Stream, StreamExt};
10use reqwest::Url;
11use serde::de::DeserializeOwned;
12use serde_json::{from_str, Value};
13use std::{
14 marker::PhantomData, pin::Pin, str::FromStr, task::{Context, Poll}
15};
16use tokio::net::TcpStream;
17use tokio_tungstenite::{
18 connect_async,
19 tungstenite::{self, Message},
20 MaybeTlsStream, WebSocketStream,
21};
22
23pub trait StreamTopic {
24 const PRODUCT: Product;
25 fn endpoint(&self) -> String;
26 type Event: DeserializeOwned + Unpin;
27}
28
29type WSStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
30
31pub struct BinanceWebsocket<E> {
32 stream: WSStream,
33 _phantom: PhantomData<E>,
34}
35
36impl<E: DeserializeOwned + Unpin> Stream for BinanceWebsocket<E> {
37 type Item = Result<E, BinanceError>;
38
39 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
40 let msg = match self.stream.poll_next_unpin(cx) {
41 Poll::Ready(Some(Ok(c))) => c,
42 Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e.into()))),
43 Poll::Pending => return Poll::Pending,
44 Poll::Ready(None) => return Poll::Ready(None),
45 };
46 let text = match msg {
47 Message::Text(msg) => msg,
48 Message::Binary(_) | Message::Frame(_) | Message::Pong(_) | Message::Ping(_) => {
49 return Poll::Pending;
50 }
51 Message::Close(_) => return Poll::Ready(None),
52 };
53
54 let event: E = match from_str(&text) {
55 Ok(r) => r,
56 Err(e) => {
57 let val = Value::from_str(&text).unwrap();
58 eprintln!("Failed to parse event:");
59 eprintln!("{:#?}", val.as_object().unwrap());
60 panic!("parsing error: {}", e);
61 },
62 };
63
64 Poll::Ready(Some(Ok(event)))
65 }
66}
67
68impl BinanceClient {
69 pub async fn connect_stream<T: StreamTopic>(
70 &self,
71 topic: T,
72 ) -> Result<BinanceWebsocket<T::Event>, BinanceError> {
73 let base = match T::PRODUCT {
74 Product::Spot => &self.config.ws_endpoint,
75 Product::UsdMFutures => &self.config.usdm_futures_ws_endpoint,
76 Product::CoinMFutures => &self.config.coinm_futures_ws_endpoint,
77 };
78 let endpoint = topic.endpoint();
79 let url = Url::parse(&format!("{}{}", base, endpoint)).unwrap();
80 let (stream, _) = match connect_async(url).await {
81 Ok(v) => v,
82 Err(tungstenite::Error::Http(http)) => {
83 return Err(BinanceError::StartWebsocketError {
84 status_code: http.status(),
85 headers: http.headers().clone(),
86 body: String::from_utf8_lossy(http.body().as_deref().unwrap_or_default()).to_string(),
87 })
88 }
89 Err(e) => return Err(e.into()),
90 };
91 Ok(BinanceWebsocket {
92 stream,
93 _phantom: PhantomData,
94 })
95 }
96}