// barter_data/exchange/bybit/mod.rs
use crate::{
2 ExchangeWsStream, NoInitialSnapshots,
3 exchange::{
4 Connector, ExchangeServer, PingInterval, StreamSelector,
5 bybit::{channel::BybitChannel, market::BybitMarket, subscription::BybitResponse},
6 subscription::ExchangeSub,
7 },
8 instrument::InstrumentData,
9 subscriber::{WebSocketSubscriber, validator::WebSocketSubValidator},
10 subscription::{
11 Map,
12 book::{OrderBooksL1, OrderBooksL2},
13 trade::PublicTrades,
14 },
15 transformer::stateless::StatelessTransformer,
16};
17use barter_instrument::exchange::ExchangeId;
18use barter_integration::{error::SocketError, protocol::websocket::WsMessage};
19use book::{BybitOrderBookMessage, l2::BybitOrderBooksL2Transformer};
20use serde::de::{Error, Unexpected};
21use std::{fmt::Debug, marker::PhantomData, time::Duration};
22use tokio::time;
23use trade::BybitTrade;
24use url::Url;
25
26pub mod channel;
29
30pub mod futures;
33
34pub mod market;
37
38pub mod message;
41
42pub mod spot;
45
46pub mod subscription;
50
51pub mod trade;
54
55pub mod book;
58
/// Marker type for the Bybit exchange, generic over a `Server` type parameter.
///
/// `Server` is carried only through [`PhantomData`]; no value of that type is stored.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Bybit<Server> {
    server: PhantomData<Server>,
}
68
69impl<Server> Connector for Bybit<Server>
70where
71 Server: ExchangeServer,
72{
73 const ID: ExchangeId = Server::ID;
74 type Channel = BybitChannel;
75 type Market = BybitMarket;
76 type Subscriber = WebSocketSubscriber;
77 type SubValidator = WebSocketSubValidator;
78 type SubResponse = BybitResponse;
79
80 fn url() -> Result<Url, SocketError> {
81 Url::parse(Server::websocket_url()).map_err(SocketError::UrlParse)
82 }
83
84 fn ping_interval() -> Option<PingInterval> {
85 Some(PingInterval {
86 interval: time::interval(Duration::from_millis(5_000)),
87 ping: || {
88 WsMessage::text(
89 serde_json::json!({
90 "op": "ping",
91 })
92 .to_string(),
93 )
94 },
95 })
96 }
97
98 fn requests(exchange_subs: Vec<ExchangeSub<Self::Channel, Self::Market>>) -> Vec<WsMessage> {
99 let stream_names = exchange_subs
100 .into_iter()
101 .map(|sub| format!("{}.{}", sub.channel.as_ref(), sub.market.as_ref(),))
102 .collect::<Vec<String>>();
103
104 vec![WsMessage::text(
105 serde_json::json!({
106 "op": "subscribe",
107 "args": stream_names
108 })
109 .to_string(),
110 )]
111 }
112
113 fn expected_responses<InstrumentKey>(_: &Map<InstrumentKey>) -> usize {
114 1
115 }
116}
117
118impl<Instrument, Server> StreamSelector<Instrument, PublicTrades> for Bybit<Server>
119where
120 Instrument: InstrumentData,
121 Server: ExchangeServer + Debug + Send + Sync,
122{
123 type SnapFetcher = NoInitialSnapshots;
124 type Stream =
125 ExchangeWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, BybitTrade>>;
126}
127
128impl<Instrument, Server> StreamSelector<Instrument, OrderBooksL1> for Bybit<Server>
129where
130 Instrument: InstrumentData,
131 Server: ExchangeServer + Debug + Send + Sync,
132{
133 type SnapFetcher = NoInitialSnapshots;
134 type Stream = ExchangeWsStream<
135 StatelessTransformer<Self, Instrument::Key, OrderBooksL1, BybitOrderBookMessage>,
136 >;
137}
138
139impl<Instrument, Server> StreamSelector<Instrument, OrderBooksL2> for Bybit<Server>
140where
141 Instrument: InstrumentData,
142 Server: ExchangeServer + Debug + Send + Sync,
143{
144 type SnapFetcher = NoInitialSnapshots;
145 type Stream = ExchangeWsStream<BybitOrderBooksL2Transformer<Instrument::Key>>;
146}
147
148impl<'de, Server> serde::Deserialize<'de> for Bybit<Server>
149where
150 Server: ExchangeServer,
151{
152 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
153 where
154 D: serde::de::Deserializer<'de>,
155 {
156 let input = <&str as serde::Deserialize>::deserialize(deserializer)?;
157
158 if input == Self::ID.as_str() {
159 Ok(Self::default())
160 } else {
161 Err(Error::invalid_value(
162 Unexpected::Str(input),
163 &Self::ID.as_str(),
164 ))
165 }
166 }
167}
168
169impl<Server> serde::Serialize for Bybit<Server>
170where
171 Server: ExchangeServer,
172{
173 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
174 where
175 S: serde::ser::Serializer,
176 {
177 serializer.serialize_str(Self::ID.as_str())
178 }
179}