1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
use self::{
book::l1::BinanceOrderBookL1, channel::BinanceChannel, market::BinanceMarket,
subscription::BinanceSubResponse, trade::BinanceTrade,
};
use crate::{
exchange::{Connector, ExchangeId, ExchangeServer, ExchangeSub, StreamSelector},
subscriber::{validator::WebSocketSubValidator, WebSocketSubscriber},
subscription::{book::OrderBooksL1, trade::PublicTrades, Map},
transformer::stateless::StatelessTransformer,
ExchangeWsStream,
};
use barter_integration::{error::SocketError, model::Instrument, protocol::websocket::WsMessage};
use std::{fmt::Debug, marker::PhantomData};
use url::Url;
pub mod book;
pub mod channel;
pub mod futures;
pub mod market;
pub mod spot;
pub mod subscription;
pub mod trade;
/// Binance exchange type, generic over a `Server` type parameter.
///
/// The struct stores no runtime data: its only field is a zero-sized [`PhantomData`]
/// that ties a value to its `Server` type. The [`Connector`] implementation in this
/// module requires `Server: ExchangeServer` and takes both the exchange id
/// (`Server::ID`) and the WebSocket url (`Server::websocket_url()`) from it.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Default)]
pub struct Binance<Server> {
// Zero-sized marker carrying the `Server` type; no `Server` value is ever stored.
server: PhantomData<Server>,
}
/// [`Connector`] implementation for [`Binance`], parameterised by the `Server` it talks to.
///
/// The exchange id and WebSocket url are both delegated to the `Server` type; the
/// channel, market, subscriber, validator and subscription-response types are fixed.
impl<Server> Connector for Binance<Server>
where
    Server: ExchangeServer,
{
    /// Exchange identifier, forwarded from the `Server` type.
    const ID: ExchangeId = Server::ID;

    type Channel = BinanceChannel;
    type Market = BinanceMarket;
    type Subscriber = WebSocketSubscriber;
    type SubValidator = WebSocketSubValidator;
    type SubResponse = BinanceSubResponse;

    /// Parses `Server::websocket_url()` into a [`Url`].
    ///
    /// # Errors
    /// Returns [`SocketError::UrlParse`] if the server's url string fails to parse.
    fn url() -> Result<Url, SocketError> {
        let endpoint = Server::websocket_url();
        Url::parse(endpoint).map_err(SocketError::UrlParse)
    }

    /// Builds the subscription request for the provided `exchange_subs`.
    ///
    /// Exactly one text message is returned. Its JSON payload has `"method": "SUBSCRIBE"`,
    /// `"id": 1`, and a `"params"` array with one stream name per subscription, in input
    /// order.
    fn requests(exchange_subs: Vec<ExchangeSub<Self::Channel, Self::Market>>) -> Vec<WsMessage> {
        let mut stream_names: Vec<String> = Vec::with_capacity(exchange_subs.len());

        for sub in exchange_subs {
            // A stream name is the lowercased market immediately followed by the channel
            // string; the channel is appended as-is.
            let market = sub.market.as_ref().to_lowercase();
            stream_names.push(format!("{}{}", market, sub.channel.as_ref()));
        }

        let payload = serde_json::json!({
            "method": "SUBSCRIBE",
            "params": stream_names,
            "id": 1
        });

        vec![WsMessage::Text(payload.to_string())]
    }

    /// Number of subscription responses to expect: always `1`, whatever the map contains.
    ///
    /// NOTE(review): this lines up with `requests` sending a single message; presumably
    /// the exchange acknowledges that message once - confirm against the exchange docs.
    fn expected_responses(_: &Map<Instrument>) -> usize {
        1
    }
}
/// Selects the stream type used for [`PublicTrades`] subscriptions on [`Binance`].
///
/// Only available when `Server` is `ExchangeServer + Debug + Send + Sync`.
impl<Server> StreamSelector<PublicTrades> for Binance<Server>
where
Server: ExchangeServer + Debug + Send + Sync,
{
// An `ExchangeWsStream` driven by a `StatelessTransformer` that is parameterised with
// this exchange, the `PublicTrades` kind and the `BinanceTrade` type.
type Stream = ExchangeWsStream<StatelessTransformer<Self, PublicTrades, BinanceTrade>>;
}
/// Selects the stream type used for [`OrderBooksL1`] subscriptions on [`Binance`].
///
/// Only available when `Server` is `ExchangeServer + Debug + Send + Sync`.
impl<Server> StreamSelector<OrderBooksL1> for Binance<Server>
where
Server: ExchangeServer + Debug + Send + Sync,
{
// An `ExchangeWsStream` driven by a `StatelessTransformer` that is parameterised with
// this exchange, the `OrderBooksL1` kind and the `BinanceOrderBookL1` type.
type Stream = ExchangeWsStream<StatelessTransformer<Self, OrderBooksL1, BinanceOrderBookL1>>;
}
impl<'de, Server> serde::Deserialize<'de> for Binance<Server>
where
Server: ExchangeServer,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::de::Deserializer<'de>,
{
let input = <String as serde::Deserialize>::deserialize(deserializer)?;
let expected = Self::ID.as_str();
if input.as_str() == Self::ID.as_str() {
Ok(Self::default())
} else {
Err(serde::de::Error::invalid_value(
serde::de::Unexpected::Str(input.as_str()),
&expected,
))
}
}
}
/// Serialises a [`Binance`] as its exchange id string (`Self::ID.as_str()`).
///
/// This is the same string the `Deserialize` implementation for this type accepts.
impl<Server> serde::Serialize for Binance<Server>
where
    Server: ExchangeServer,
{
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::ser::Serializer,
    {
        serializer.serialize_str(Self::ID.as_str())
    }
}