barter_data/exchange/bybit/
message.rs

1use crate::{
2    Identifier,
3    event::MarketIter,
4    exchange::bybit::{channel::BybitChannel, subscription::BybitResponse, trade::BybitTrade},
5    subscription::trade::PublicTrade,
6};
7use barter_instrument::exchange::ExchangeId;
8use barter_integration::subscription::SubscriptionId;
9use chrono::{DateTime, Utc};
10use serde::{
11    Deserialize, Serialize,
12    de::{Error, Unexpected},
13};
14
15/// [`Bybit`](super::Bybit) websocket message supports both [`BybitTrade`] and [`BybitResponse`].
16#[derive(Debug, Serialize, Deserialize)]
17#[serde(untagged)]
18pub enum BybitMessage {
19    Response(BybitResponse),
20    Trade(BybitTrade),
21}
22
23/// ### Raw Payload Examples
24/// See docs: <https://bybit-exchange.github.io/docs/v5/websocket/public/trade>
25/// #### Spot Side::Buy Trade
26///```json
27/// {
28///     "topic": "publicTrade.BTCUSDT",
29///     "type": "snapshot",
30///     "ts": 1672304486868,
31///     "data": [
32///         {
33///             "T": 1672304486865,
34///             "s": "BTCUSDT",
35///             "S": "Buy",
36///             "v": "0.001",
37///             "p": "16578.50",
38///             "L": "PlusTick",
39///             "i": "20f43950-d8dd-5b31-9112-a178eb6023af",
40///             "BT": false
41///         }
42///     ]
43/// }
44/// ```
45#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Deserialize, Serialize)]
46pub struct BybitPayload<T> {
47    #[serde(alias = "topic", deserialize_with = "de_message_subscription_id")]
48    pub subscription_id: SubscriptionId,
49
50    #[serde(rename = "type")]
51    pub r#type: String,
52
53    #[serde(
54        alias = "ts",
55        deserialize_with = "barter_integration::de::de_u64_epoch_ms_as_datetime_utc"
56    )]
57    pub time: DateTime<Utc>,
58    pub data: T,
59}
60
61/// Deserialize a [`BybitPayload`] "s" (eg/ "publicTrade.BTCUSDT") as the associated
62/// [`SubscriptionId`].
63///
64/// eg/ "publicTrade|BTCUSDT"
65pub fn de_message_subscription_id<'de, D>(deserializer: D) -> Result<SubscriptionId, D::Error>
66where
67    D: serde::de::Deserializer<'de>,
68{
69    let input = <&str as serde::Deserialize>::deserialize(deserializer)?;
70    let mut tokens = input.split('.');
71
72    match (tokens.next(), tokens.next(), tokens.next()) {
73        (Some("publicTrade"), Some(market), None) => Ok(SubscriptionId::from(format!(
74            "{}|{market}",
75            BybitChannel::TRADES.0
76        ))),
77        _ => Err(Error::invalid_value(
78            Unexpected::Str(input),
79            &"invalid message type expected pattern: <type>.<symbol>",
80        )),
81    }
82}
83
84impl Identifier<Option<SubscriptionId>> for BybitMessage {
85    fn id(&self) -> Option<SubscriptionId> {
86        match self {
87            BybitMessage::Trade(trade) => Some(trade.subscription_id.clone()),
88            _ => None,
89        }
90    }
91}
92
93impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, BybitMessage)>
94    for MarketIter<InstrumentKey, PublicTrade>
95{
96    fn from((exchange_id, instrument, message): (ExchangeId, InstrumentKey, BybitMessage)) -> Self {
97        match message {
98            BybitMessage::Response(_) => Self(vec![]),
99            BybitMessage::Trade(trade) => Self::from((exchange_id, instrument, trade)),
100        }
101    }
102}
103
104#[cfg(test)]
105mod tests {
106    use super::*;
107
108    mod de {
109        use super::*;
110        use crate::exchange::bybit::subscription::BybitReturnMessage;
111        use barter_integration::error::SocketError;
112
113        #[test]
114        fn test_bybit_pong() {
115            struct TestCase {
116                input: &'static str,
117                expected: Result<BybitResponse, SocketError>,
118            }
119
120            let tests = vec![
121                // TC0: input BybitResponse(Pong) is deserialised
122                TestCase {
123                    input: r#"
124                        {
125                            "success": true,
126                            "ret_msg": "pong",
127                            "conn_id": "0970e817-426e-429a-a679-ff7f55e0b16a",
128                            "op": "ping"
129                        }
130                    "#,
131                    expected: Ok(BybitResponse {
132                        success: true,
133                        ret_msg: BybitReturnMessage::Pong,
134                    }),
135                },
136            ];
137
138            for (index, test) in tests.into_iter().enumerate() {
139                let actual = serde_json::from_str::<BybitResponse>(test.input);
140                match (actual, test.expected) {
141                    (Ok(actual), Ok(expected)) => {
142                        assert_eq!(actual, expected, "TC{} failed", index)
143                    }
144                    (Err(_), Err(_)) => {
145                        // Test passed
146                    }
147                    (actual, expected) => {
148                        // Test failed
149                        panic!(
150                            "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
151                        );
152                    }
153                }
154            }
155        }
156    }
157}