1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use crate::{ExchangeTransformer, ExchangeTransformerId, MarketData, Subscriber, exchange::ftx::model::{FtxSubResponse, FtxMessage}, Identifiable};
use barter_integration::{
    StreamKind, Subscription, InstrumentKind, SubscriptionId, SubscriptionIds, SubscriptionMeta,
    socket::{
        Transformer,
        error::SocketError,
        protocol::websocket::WsMessage,
    }
};
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_json::json;

mod model;

/// `Ftx` exchange handler.
///
/// Holds the [`SubscriptionIds`] used to look up the instrument associated with each
/// incoming `Ftx` message.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Ftx {
    /// Identifiers of the actioned Barter `Subscription`s.
    pub ids: SubscriptionIds,
}

impl Subscriber for Ftx {
    type SubResponse = FtxSubResponse;

    /// Base url of the `Ftx` WebSocket server.
    fn base_url() -> &'static str {
        "wss://ftx.com/ws/"
    }

    /// Translate the provided Barter [`Subscription`]s into `Ftx` subscription messages.
    ///
    /// Returns a [`SubscriptionMeta`] containing the generated messages, the
    /// [`SubscriptionIds`] keyed by `Ftx` market, and the number of expected responses
    /// (one per generated message).
    ///
    /// # Errors
    /// Returns the first [`SocketError`] produced while determining the channel metadata
    /// of a [`Subscription`].
    fn build_subscription_meta(subscriptions: &[Subscription]) -> Result<SubscriptionMeta, SocketError> {
        // Tracks the identifier of every actioned Subscription
        let mut ids = SubscriptionIds(HashMap::with_capacity(subscriptions.len()));

        // Ftx subscription messages, one per Barter Subscription
        let mut messages = Vec::with_capacity(subscriptions.len());

        for subscription in subscriptions {
            // Ftx specific channel & market for this Barter Subscription
            let (channel, market) = Self::get_channel_meta(subscription)?;

            // Ftx specific subscription message
            messages.push(Self::subscription(channel, &market));

            // The market is the SubscriptionId key in the SubscriptionIds
            ids.0.insert(SubscriptionId(market), subscription.clone());
        }

        Ok(SubscriptionMeta {
            ids,
            expected_responses: messages.len(),
            subscriptions: messages,
        })
    }
}

impl ExchangeTransformer for Ftx {
    const EXCHANGE: ExchangeTransformerId = ExchangeTransformerId::Ftx;
    fn new(ids: SubscriptionIds) -> Self { Self { ids } }
}

impl Transformer<MarketData> for Ftx {
    type Input = FtxMessage;
    type OutputIter = Vec<Result<MarketData, SocketError>>;

    /// Transform an [`FtxMessage`] into a collection of [`MarketData`] results.
    ///
    /// If no instrument can be found for the identifier of the input message, the returned
    /// collection contains only the lookup error.
    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
        // Bail out early when the message cannot be associated with an instrument
        let instrument = match self.ids.find_instrument(input.id()) {
            Err(error) => return vec![Err(error)],
            Ok(found) => found,
        };

        match input {
            FtxMessage::Trades { trades, .. } => {
                // Each trade becomes one MarketData tagged with the exchange & instrument
                let mut events = Vec::new();
                for trade in trades {
                    let event = MarketData::from((Self::EXCHANGE, instrument.clone(), trade));
                    events.push(Ok(event));
                }
                events
            }
        }
    }
}

impl Ftx {
    /// Derive the `Ftx` channel metadata for the provided Barter [`Subscription`].
    ///
    /// The metadata is a tuple of the `Ftx` channel (`&str`) and the uppercase `Ftx` market
    /// identifier (`String`), both of which are required to build an `Ftx` subscription
    /// payload.
    ///
    /// Example Ok Return: Ok(("trades", "BTC/USDT"))
    /// where channel == "trades" & market == "BTC/USDT".
    ///
    /// # Errors
    /// Returns [`SocketError::Unsupported`] if the [`StreamKind`] is not `Trade`.
    fn get_channel_meta(sub: &Subscription) -> Result<(&str, String), SocketError> {
        // The Subscription StreamKind selects the Ftx channel
        let channel = match &sub.kind {
            StreamKind::Trade => Ok("trades"),
            unsupported => Err(SocketError::Unsupported {
                entity: Self::EXCHANGE.as_str(),
                item: unsupported.to_string(),
            }),
        }?;

        // The InstrumentKind selects the Ftx market format; markets are always uppercase
        let instrument = &sub.instrument;
        let market = match &instrument.kind {
            InstrumentKind::Spot => format!("{}/{}", instrument.base, instrument.quote),
            InstrumentKind::FuturePerpetual => format!("{}-PERP", instrument.base),
        }
        .to_uppercase();

        Ok((channel, market))
    }

    /// Build an `Ftx` compatible subscription message for the channel & market provided.
    ///
    /// The message is a [`WsMessage::Text`] containing a JSON object with the "op",
    /// "channel" and "market" fields.
    fn subscription(channel: &str, market: &str) -> WsMessage {
        let payload = json!({
            "op": "subscribe",
            "channel": channel,
            "market": market,
        });

        WsMessage::Text(payload.to_string())
    }
}