1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use crate::{ExchangeTransformer, ExchangeTransformerId, MarketData, Subscriber, exchange::ftx::model::{FtxSubResponse, FtxMessage}, Identifiable};
use barter_integration::{
StreamKind, Subscription, InstrumentKind, SubscriptionId, SubscriptionIds, SubscriptionMeta,
socket::{
Transformer,
error::SocketError,
protocol::websocket::WsMessage,
}
};
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_json::json;
mod model;
/// FTX exchange handle.
///
/// In this module it implements `Subscriber`, `ExchangeTransformer` and
/// `Transformer<MarketData>`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Ftx {
    /// Subscription id map; `transform` queries it to find the instrument
    /// associated with an incoming message.
    pub ids: SubscriptionIds,
}
impl Subscriber for Ftx {
    type SubResponse = FtxSubResponse;

    /// WebSocket URL used as the base for FTX connections.
    fn base_url() -> &'static str {
        "wss://ftx.com/ws/"
    }

    /// Builds the `SubscriptionMeta` for the provided `subscriptions`.
    ///
    /// For every input `Subscription` this produces one subscribe payload and
    /// records the `Subscription` in the id map, keyed by its market string.
    /// The number of expected responses equals the number of payloads built.
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error reported by
    /// `Ftx::get_channel_meta`.
    fn build_subscription_meta(
        subscriptions: &[Subscription],
    ) -> Result<SubscriptionMeta, SocketError> {
        let mut ids = SubscriptionIds(HashMap::with_capacity(subscriptions.len()));
        let mut payloads = Vec::with_capacity(subscriptions.len());

        for subscription in subscriptions {
            let (channel, market) = Self::get_channel_meta(subscription)?;
            payloads.push(Self::subscription(channel, &market));
            // The market string doubles as the lookup key for this subscription.
            ids.0.insert(SubscriptionId(market), subscription.clone());
        }

        let expected_responses = payloads.len();
        Ok(SubscriptionMeta {
            ids,
            expected_responses,
            subscriptions: payloads,
        })
    }
}
impl ExchangeTransformer for Ftx {
const EXCHANGE: ExchangeTransformerId = ExchangeTransformerId::Ftx;
fn new(ids: SubscriptionIds) -> Self { Self { ids } }
}
impl Transformer<MarketData> for Ftx {
    type Input = FtxMessage;
    type OutputIter = Vec<Result<MarketData, SocketError>>;

    /// Converts one `FtxMessage` into zero or more `MarketData` results.
    ///
    /// The instrument is looked up from `self.ids` using the message id. If
    /// that lookup fails, the output is a single-element vector holding the
    /// error. Otherwise every trade in the message yields one `Ok` item.
    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
        let lookup = self.ids.find_instrument(input.id());
        let instrument = match lookup {
            Err(error) => return vec![Err(error)],
            Ok(found) => found,
        };

        match input {
            FtxMessage::Trades { trades, .. } => {
                let mut output: Self::OutputIter = Vec::new();
                for trade in trades {
                    // Each item needs its own copy of the instrument.
                    let item = MarketData::from((Self::EXCHANGE, instrument.clone(), trade));
                    output.push(Ok(item));
                }
                output
            }
        }
    }
}
impl Ftx {
    /// Derives the `(channel, market)` pair for a `Subscription`.
    ///
    /// * `channel` is `"trades"` for `StreamKind::Trade`.
    /// * `market` is the upper-cased `base/quote` for spot instruments and the
    ///   upper-cased `base-PERP` for perpetual futures.
    ///
    /// # Errors
    ///
    /// Returns `SocketError::Unsupported` for any other stream kind, naming
    /// this exchange as the entity and the stream kind as the item.
    fn get_channel_meta(sub: &Subscription) -> Result<(&str, String), SocketError> {
        let channel = match &sub.kind {
            StreamKind::Trade => "trades",
            unsupported => {
                let entity = Self::EXCHANGE.as_str();
                let item = unsupported.to_string();
                return Err(SocketError::Unsupported { entity, item });
            }
        };

        let instrument = &sub.instrument;
        let raw_market = match &instrument.kind {
            InstrumentKind::Spot => format!("{}/{}", instrument.base, instrument.quote),
            InstrumentKind::FuturePerpetual => format!("{}-PERP", instrument.base),
        };

        Ok((channel, raw_market.to_uppercase()))
    }

    /// Builds the text `WsMessage` that subscribes to `channel` for `market`.
    ///
    /// The payload is a JSON object with the keys `op` (always `"subscribe"`),
    /// `channel` and `market`.
    fn subscription(channel: &str, market: &str) -> WsMessage {
        let payload = json!({
            "op": "subscribe",
            "channel": channel,
            "market": market,
        });

        WsMessage::Text(payload.to_string())
    }
}