1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use super::{BinanceSubResponse, BinanceMessage};
use crate::{ExchangeTransformerId, Subscriber, ExchangeTransformer, Subscription, SubscriptionMeta, SubscriptionIds, model::MarketData, Identifiable};
use barter_integration::{StreamKind, socket::{
Transformer, error::SocketError
}, SubscriptionId};
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_json::json;
use barter_integration::socket::protocol::websocket::WsMessage;
/// Binance Futures exchange integration state.
///
/// Holds the subscription lookup table built when subscribing, which
/// `transform` consults to resolve the instrument of each incoming message.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BinanceFutures {
    /// Map of [`SubscriptionId`] to the [`Subscription`] it was generated from.
    pub ids: SubscriptionIds,
}
impl Subscriber for BinanceFutures {
    type SubResponse = BinanceSubResponse;

    /// WebSocket endpoint the subscriber connects to.
    fn base_url() -> &'static str {
        "wss://fstream.binance.com/ws"
    }

    /// Builds the [`SubscriptionMeta`] for the provided `subscriptions`.
    ///
    /// Every [`Subscription`] is translated to its channel identifier, which is
    /// recorded in the [`SubscriptionIds`] map (keyed by the channel wrapped in a
    /// [`SubscriptionId`]) and collected into the outgoing subscription payloads.
    ///
    /// # Errors
    ///
    /// Returns the first [`SocketError`] produced by `get_channel_id`, i.e. when a
    /// [`Subscription`] requests a stream kind this exchange does not support.
    fn build_subscription_meta(
        subscriptions: &[Subscription],
    ) -> Result<SubscriptionMeta, SocketError> {
        let capacity = subscriptions.len();
        let mut ids = SubscriptionIds(HashMap::with_capacity(capacity));
        let mut channels = Vec::with_capacity(capacity);

        for subscription in subscriptions {
            // Bail out on the first unsupported subscription.
            let channel = Self::get_channel_id(subscription)?;
            ids.0
                .insert(SubscriptionId(channel.clone()), subscription.clone());
            channels.push(channel);
        }

        let payloads = Self::subscriptions(channels);
        // One response is expected per payload sent to the exchange.
        let expected_responses = payloads.len();

        Ok(SubscriptionMeta {
            ids,
            expected_responses,
            subscriptions: payloads,
        })
    }
}
impl ExchangeTransformer for BinanceFutures {
const EXCHANGE: ExchangeTransformerId = ExchangeTransformerId::BinanceFutures;
fn new(ids: SubscriptionIds) -> Self { Self { ids } }
}
impl Transformer<MarketData> for BinanceFutures {
    type Input = BinanceMessage;
    type OutputIter = Vec<Result<MarketData, SocketError>>;

    /// Converts one [`BinanceMessage`] into a single-element collection holding
    /// either the resulting [`MarketData`] or the [`SocketError`] produced when
    /// the message identifier cannot be resolved to an instrument.
    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
        // Resolve the instrument associated with the message identifier first...
        let lookup = self.ids.find_instrument(input.id());

        // ...then, on success, consume the message to build the MarketData.
        let outcome =
            lookup.map(|instrument| MarketData::from((Self::EXCHANGE, instrument, input)));

        vec![outcome]
    }
}
impl BinanceFutures {
    /// Determines the channel identifier for the provided [`Subscription`].
    ///
    /// For [`StreamKind::Trade`] the identifier is the instrument's base and quote
    /// concatenated and suffixed with `@aggTrade`.
    ///
    /// # Errors
    ///
    /// Returns [`SocketError::Unsupported`] for any other stream kind.
    fn get_channel_id(sub: &Subscription) -> Result<String, SocketError> {
        // Guard: only trade streams are supported by this integration.
        if !matches!(sub.kind, StreamKind::Trade) {
            return Err(SocketError::Unsupported {
                entity: BinanceFutures::EXCHANGE.as_str(),
                item: sub.kind.to_string(),
            });
        }

        let instrument = &sub.instrument;
        Ok(format!("{}{}@aggTrade", instrument.base, instrument.quote))
    }

    /// Builds the subscription payloads to send over the WebSocket.
    ///
    /// All `channels` are bundled into the `params` of a single `SUBSCRIBE`
    /// request, so the returned `Vec` always contains exactly one text message.
    fn subscriptions(channels: Vec<String>) -> Vec<WsMessage> {
        let request = json!({
            "method": "SUBSCRIBE",
            "params": channels,
            "id": 1
        });

        vec![WsMessage::Text(request.to_string())]
    }
}