binance_async/websocket/
mod.rs1pub mod coinm;
2mod models;
3pub mod spot;
4pub mod usdm;
5
6use crate::{
7 error::BinanceError::{self, *},
8 models::Product,
9 Config,
10};
11use fehler::{throw, throws};
12use futures::{stream::Stream, StreamExt};
13use log::debug;
14pub use models::*;
15use reqwest::Url;
16use serde::{Deserialize, Serialize};
17use serde_json::{from_str, value::RawValue};
18use std::{
19 marker::PhantomData,
20 pin::Pin,
21 task::{Context, Poll},
22};
23use tokio::net::TcpStream;
24use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
25use tungstenite::Message;
26
27type WSStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
28
29pub trait ParseMessage: Sized {
30 const PRODUCT: Product;
31
32 fn parse(stream: &str, data: &str) -> Result<Self, BinanceError>;
33}
34
35pub struct BinanceWebsocket<M> {
36 stream: WSStream,
37 _phantom: PhantomData<M>,
38}
39
40impl<M> BinanceWebsocket<M>
41where
42 M: ParseMessage,
43{
44 #[throws(BinanceError)]
45 pub async fn new<I, S>(topics: I) -> BinanceWebsocket<M>
46 where
47 I: IntoIterator<Item = S>,
48 S: AsRef<str>,
49 {
50 let config = Config::default();
51 Self::with_config(&config, topics).await?
52 }
53
54 #[throws(BinanceError)]
55 pub async fn with_config<I, S>(config: &Config, topics: I) -> BinanceWebsocket<M>
56 where
57 I: IntoIterator<Item = S>,
58 S: AsRef<str>,
59 {
60 let mut combined = String::new();
61 for topic in topics {
62 if !combined.is_empty() {
63 combined.push('/');
64 }
65
66 combined.push_str(topic.as_ref())
67 }
68
69 if combined.is_empty() {
70 throw!(EmptyTopics)
71 }
72
73 let base = match M::PRODUCT {
74 Product::Spot => &config.ws_endpoint,
75 Product::UsdMFutures => &config.usdm_futures_ws_endpoint,
76 Product::CoinMFutures => &config.coinm_futures_ws_endpoint,
77 Product::EuropeanOptions => &config.european_options_ws_endpoint,
78 };
79 let endpoint = Url::parse(&format!("{}/stream?streams={}", base, combined)).unwrap();
80 debug!("ws endpoint: {endpoint:?}");
81 let (stream, _) = match connect_async(endpoint).await {
82 Ok(v) => v,
83 Err(tungstenite::Error::Http(ref http)) => throw!(StartWebsocketError(
84 http.status(),
85 String::from_utf8_lossy(http.body().as_deref().unwrap_or_default()).to_string()
86 )),
87 Err(e) => throw!(e),
88 };
89 Self {
90 stream,
91 _phantom: PhantomData,
92 }
93 }
94}
95
96#[derive(Deserialize)]
97struct MessageWithTopic<'a> {
98 stream: String,
99 #[serde(borrow)]
100 data: &'a RawValue,
101}
102
103impl<M> Stream for BinanceWebsocket<M>
104where
105 M: ParseMessage + Unpin + std::fmt::Debug,
106{
107 type Item = Result<M, BinanceError>;
108
109 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
110 let c = match self.stream.poll_next_unpin(cx) {
111 Poll::Ready(Some(Ok(c))) => c,
112 Poll::Ready(Some(Err(e))) => return dbg!(Poll::Ready(Some(Err(e.into())))),
113 Poll::Pending => return Poll::Pending,
114 Poll::Ready(None) => return Poll::Ready(None),
115 };
116 let msg = match c {
117 Message::Text(msg) => msg,
118 Message::Binary(_) | Message::Frame(_) | Message::Pong(..) | Message::Ping(..) => {
119 return Poll::Pending
120 }
121 Message::Close(_) => return Poll::Ready(None),
122 };
123
124 let t: MessageWithTopic = match from_str(&msg) {
125 Ok(v) => v,
126 Err(e) => return Poll::Ready(Some(Err(e.into()))),
127 };
128
129 Poll::Ready(Some(M::parse(&t.stream, t.data.get())))
130 }
131}
132
133#[derive(Debug, Clone, Serialize, Deserialize)]
134#[serde(untagged)]
135enum Either<L, R> {
136 Left(L),
137 Right(R),
138}