barter_data/exchange/bybit/
message.rs1use crate::{
2 Identifier,
3 event::MarketIter,
4 exchange::bybit::{channel::BybitChannel, subscription::BybitResponse, trade::BybitTrade},
5 subscription::trade::PublicTrade,
6};
7use barter_instrument::exchange::ExchangeId;
8use barter_integration::subscription::SubscriptionId;
9use chrono::{DateTime, Utc};
10use serde::{
11 Deserialize, Serialize,
12 de::{Error, Unexpected},
13};
14
15#[derive(Debug, Serialize, Deserialize)]
17#[serde(untagged)]
18pub enum BybitMessage {
19 Response(BybitResponse),
20 Trade(BybitTrade),
21}
22
23#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Deserialize, Serialize)]
46pub struct BybitPayload<T> {
47 #[serde(alias = "topic", deserialize_with = "de_message_subscription_id")]
48 pub subscription_id: SubscriptionId,
49
50 #[serde(rename = "type")]
51 pub r#type: String,
52
53 #[serde(
54 alias = "ts",
55 deserialize_with = "barter_integration::de::de_u64_epoch_ms_as_datetime_utc"
56 )]
57 pub time: DateTime<Utc>,
58 pub data: T,
59}
60
61pub fn de_message_subscription_id<'de, D>(deserializer: D) -> Result<SubscriptionId, D::Error>
66where
67 D: serde::de::Deserializer<'de>,
68{
69 let input = <&str as serde::Deserialize>::deserialize(deserializer)?;
70 let mut tokens = input.split('.');
71
72 match (tokens.next(), tokens.next(), tokens.next()) {
73 (Some("publicTrade"), Some(market), None) => Ok(SubscriptionId::from(format!(
74 "{}|{market}",
75 BybitChannel::TRADES.0
76 ))),
77 _ => Err(Error::invalid_value(
78 Unexpected::Str(input),
79 &"invalid message type expected pattern: <type>.<symbol>",
80 )),
81 }
82}
83
84impl Identifier<Option<SubscriptionId>> for BybitMessage {
85 fn id(&self) -> Option<SubscriptionId> {
86 match self {
87 BybitMessage::Trade(trade) => Some(trade.subscription_id.clone()),
88 _ => None,
89 }
90 }
91}
92
93impl<InstrumentKey: Clone> From<(ExchangeId, InstrumentKey, BybitMessage)>
94 for MarketIter<InstrumentKey, PublicTrade>
95{
96 fn from((exchange_id, instrument, message): (ExchangeId, InstrumentKey, BybitMessage)) -> Self {
97 match message {
98 BybitMessage::Response(_) => Self(vec![]),
99 BybitMessage::Trade(trade) => Self::from((exchange_id, instrument, trade)),
100 }
101 }
102}
103
104#[cfg(test)]
105mod tests {
106 use super::*;
107
108 mod de {
109 use super::*;
110 use crate::exchange::bybit::subscription::BybitReturnMessage;
111 use barter_integration::error::SocketError;
112
113 #[test]
114 fn test_bybit_pong() {
115 struct TestCase {
116 input: &'static str,
117 expected: Result<BybitResponse, SocketError>,
118 }
119
120 let tests = vec![
121 TestCase {
123 input: r#"
124 {
125 "success": true,
126 "ret_msg": "pong",
127 "conn_id": "0970e817-426e-429a-a679-ff7f55e0b16a",
128 "op": "ping"
129 }
130 "#,
131 expected: Ok(BybitResponse {
132 success: true,
133 ret_msg: BybitReturnMessage::Pong,
134 }),
135 },
136 ];
137
138 for (index, test) in tests.into_iter().enumerate() {
139 let actual = serde_json::from_str::<BybitResponse>(test.input);
140 match (actual, test.expected) {
141 (Ok(actual), Ok(expected)) => {
142 assert_eq!(actual, expected, "TC{} failed", index)
143 }
144 (Err(_), Err(_)) => {
145 }
147 (actual, expected) => {
148 panic!(
150 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
151 );
152 }
153 }
154 }
155 }
156 }
157}