1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use super::{BinanceMessage, BinanceSubResponse};
use crate::{
    model::{MarketEvent, SubKind},
    ExchangeId, ExchangeTransformer, Subscriber, Subscription, SubscriptionIds, SubscriptionMeta,
};
use barter_integration::{
    error::SocketError, model::SubscriptionId, protocol::websocket::WsMessage, Transformer,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use tokio::sync::mpsc;

/// `BinanceFuturesUsd` [`Subscriber`](crate::Subscriber) &
/// [`ExchangeTransformer`](crate::ExchangeTransformer) implementor for the collection
/// of `Futures` data.
///
/// The only state held is the [`SubscriptionIds`] lookup, which the
/// [`Transformer`] implementation queries (via `find_instrument`) for every incoming
/// [`BinanceMessage`] before converting it into a [`MarketEvent`].
#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)]
pub struct BinanceFuturesUsd {
    /// Lookup used to resolve the instrument associated with an incoming message.
    ///
    /// NOTE(review): presumably this is the map produced by `build_subscription_meta`,
    /// i.e. keyed by the Binance channel string wrapped in a [`SubscriptionId`] — confirm
    /// against the caller of `ExchangeTransformer::new`.
    pub ids: SubscriptionIds,
}

impl Subscriber for BinanceFuturesUsd {
    type SubResponse = BinanceSubResponse;

    /// Base websocket URL returned for this [`Subscriber`].
    fn base_url() -> &'static str {
        "wss://fstream.binance.com/ws"
    }

    /// Translate Barter [`Subscription`]s into the [`SubscriptionMeta`] required to action them.
    ///
    /// Each [`Subscription`] is mapped to its Binance channel identifier, which doubles as the
    /// [`SubscriptionId`] key under which the [`Subscription`] is stored. All channels are then
    /// bundled into the outgoing subscription payload.
    ///
    /// # Errors
    /// Propagates the first [`SocketError`] returned by `get_channel_id` (eg/ an unsupported
    /// [`SubKind`]); no [`SubscriptionMeta`] is produced in that case.
    fn build_subscription_meta(
        subscriptions: &[Subscription],
    ) -> Result<SubscriptionMeta, SocketError> {
        let num_subscriptions = subscriptions.len();

        // Pre-size both collections: one entry per requested Subscription
        let mut ids = SubscriptionIds(HashMap::with_capacity(num_subscriptions));
        let mut channels = Vec::with_capacity(num_subscriptions);

        for subscription in subscriptions {
            // Bail out on the first Subscription that has no BinanceFuturesUsd channel
            let channel = Self::get_channel_id(subscription)?;

            // The channel is both the lookup key and part of the outgoing payload
            ids.insert(SubscriptionId(channel.clone()), subscription.clone());
            channels.push(channel);
        }

        // Bundle every channel into the Binance subscription WsMessage(s)
        let messages = Self::subscriptions(channels);

        // Expected responses are counted per outgoing WsMessage, not per Subscription
        let expected_responses = messages.len();

        Ok(SubscriptionMeta {
            ids,
            expected_responses,
            subscriptions: messages,
        })
    }
}

impl ExchangeTransformer for BinanceFuturesUsd {
    const EXCHANGE: ExchangeId = ExchangeId::BinanceFuturesUsd;
    fn new(_: mpsc::UnboundedSender<WsMessage>, ids: SubscriptionIds) -> Self {
        Self { ids }
    }
}

impl Transformer<MarketEvent> for BinanceFuturesUsd {
    type Input = BinanceMessage;
    type OutputIter = Vec<Result<MarketEvent, SocketError>>;

    /// Convert a single [`BinanceMessage`] into a one-element collection holding either the
    /// resulting [`MarketEvent`] or the [`SocketError`] raised by the instrument lookup.
    fn transform(&mut self, input: Self::Input) -> Self::OutputIter {
        // Resolve the instrument first so the borrow of `input` ends before it is consumed
        let lookup = self.ids.find_instrument(&input);

        let outcome = match lookup {
            Ok(instrument) => Ok(MarketEvent::from((
                BinanceFuturesUsd::EXCHANGE,
                instrument,
                input,
            ))),
            Err(error) => Err(error),
        };

        vec![outcome]
    }
}

impl BinanceFuturesUsd {
    /// Map an input Barter [`Subscription`] to its Binance channel identifier.
    ///
    /// Only [`SubKind::Trade`] is handled, producing `"<base><quote>@aggTrade"`.
    ///
    /// # Errors
    /// Returns [`SocketError::Unsupported`] naming this exchange and the offending
    /// [`SubKind`] for every other kind of [`Subscription`].
    fn get_channel_id(sub: &Subscription) -> Result<String, SocketError> {
        let kind = &sub.kind;

        // Guard: reject anything other than trades up front
        if !matches!(kind, SubKind::Trade) {
            return Err(SocketError::Unsupported {
                entity: BinanceFuturesUsd::EXCHANGE.as_str(),
                item: kind.to_string(),
            });
        }

        let instrument = &sub.instrument;
        Ok(format!(
            "{}{}@aggTrade",
            instrument.base, instrument.quote
        ))
    }

    /// Wrap the provided channels in a single [`BinanceFuturesUsd`] compatible `SUBSCRIBE`
    /// text message.
    ///
    /// The returned `Vec` always contains exactly one [`WsMessage`], regardless of how many
    /// channels are passed in.
    fn subscriptions(channels: Vec<String>) -> Vec<WsMessage> {
        let payload = json!({
            "method": "SUBSCRIBE",
            "params": channels,
            "id": 1
        });

        vec![WsMessage::Text(payload.to_string())]
    }
}