1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use super::{BinanceSubResponse, BinanceMessage};
use crate::{ExchangeTransformerId, Subscriber, ExchangeTransformer, Subscription, SubscriptionMeta, SubscriptionIds, model::MarketData, Identifiable};
use barter_integration::{StreamKind, socket::{
    Transformer, error::SocketError
}, SubscriptionId};
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use serde_json::json;
use barter_integration::socket::protocol::websocket::WsMessage;

#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
pub struct BinanceFutures {
    pub ids: SubscriptionIds,
}

impl Subscriber for BinanceFutures {
    type SubResponse = BinanceSubResponse;

    fn base_url() -> &'static str { "wss://fstream.binance.com/ws" }

    fn build_subscription_meta(subscriptions: &[Subscription]) -> Result<SubscriptionMeta, SocketError> {
        // Allocate SubscriptionIds HashMap to track identifiers for each actioned Subscription
        let mut ids = SubscriptionIds(HashMap::with_capacity(subscriptions.len()));

        // Map Barter Subscriptions to BinanceFutures channels
        let channels = subscriptions
            .iter()
            .map(|subscription| {
                // Determine the BinanceFutures specific channel for this Barter Subscription
                let channel = Self::get_channel_id(subscription)?;

                // Use channel as the SubscriptionId key in the SubscriptionIds
                ids.0.insert(SubscriptionId(channel.clone()), subscription.clone());

                Ok(channel)
            })
            .collect::<Result<Vec<_>, SocketError>>()?;

        // Use channels to construct a Binance subscription WsMessage
        let subscriptions = Self::subscriptions(channels);

        Ok(SubscriptionMeta {
            ids,
            expected_responses: subscriptions.len(),
            subscriptions,
        })
    }
}

impl ExchangeTransformer for BinanceFutures {
    const EXCHANGE: ExchangeTransformerId = ExchangeTransformerId::BinanceFutures;
    fn new(ids: SubscriptionIds) -> Self { Self { ids } }
}

impl Transformer<MarketData> for BinanceFutures {
    type Input = BinanceMessage;
    type OutputIter = Vec<Result<MarketData, SocketError>>;

    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
        let market_data = self
            .ids
            .find_instrument(input.id())
            .map(|instrument| {
                MarketData::from((BinanceFutures::EXCHANGE, instrument, input))
            });

        vec![market_data]
    }
}

impl BinanceFutures {
    /// Determine the Binance channel identifier associated with an input Barter [`Subscription`].
    fn get_channel_id(sub: &Subscription) -> Result<String, SocketError> {
        match &sub.kind {
            StreamKind::Trade => Ok(format!("{}{}@aggTrade", sub.instrument.base, sub.instrument.quote)),
            other =>  Err(SocketError::Unsupported {
                entity: BinanceFutures::EXCHANGE.as_str(),
                item: other.to_string()
            })
        }
    }

    /// Build a `BinanceFutures` compatible subscription message using the channels provided.
    fn subscriptions(channels: Vec<String>) -> Vec<WsMessage> {
        vec![WsMessage::Text(
            json!({
                "method": "SUBSCRIBE",
                "params": channels,
                "id": 1
            }).to_string(),
        )]
    }
}