barter_data/exchange/bybit/
mod.rs1use crate::{
2 ExchangeWsStream, NoInitialSnapshots,
3 exchange::{
4 Connector, ExchangeServer, PingInterval, StreamSelector,
5 bybit::{channel::BybitChannel, market::BybitMarket, subscription::BybitResponse},
6 subscription::ExchangeSub,
7 },
8 instrument::InstrumentData,
9 subscriber::{WebSocketSubscriber, validator::WebSocketSubValidator},
10 subscription::{
11 Map,
12 book::{OrderBooksL1, OrderBooksL2},
13 trade::PublicTrades,
14 },
15 transformer::stateless::StatelessTransformer,
16};
17use barter_instrument::exchange::ExchangeId;
18use barter_integration::protocol::websocket::{WebSocketSerdeParser, WsMessage};
19use book::{BybitOrderBookMessage, l2::BybitOrderBooksL2Transformer};
20use serde::de::{Error, Unexpected};
21use std::{fmt::Debug, marker::PhantomData, time::Duration};
22use tokio::time;
23use trade::BybitTrade;
24use url::Url;
25
26pub mod channel;
29
30pub mod futures;
33
34pub mod market;
37
38pub mod message;
41
42pub mod spot;
45
46pub mod subscription;
50
51pub mod trade;
54
55pub mod book;
58
59pub type BybitWsStream<Transformer> = ExchangeWsStream<WebSocketSerdeParser, Transformer>;
61
/// Zero-sized Bybit connector, generic over the `Server` it talks to.
///
/// The `Server` type parameter is carried only at the type level (via
/// [`PhantomData`]); when `Server: ExchangeServer`, it supplies the exchange id and
/// the WebSocket url used by the [`Connector`] implementation.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Bybit<Server> {
    /// Type-level marker only; no `Server` value is ever stored.
    server: PhantomData<Server>,
}
71
72impl<Server> Connector for Bybit<Server>
73where
74 Server: ExchangeServer,
75{
76 const ID: ExchangeId = Server::ID;
77 type Channel = BybitChannel;
78 type Market = BybitMarket;
79 type Subscriber = WebSocketSubscriber;
80 type SubValidator = WebSocketSubValidator;
81 type SubResponse = BybitResponse;
82
83 fn url() -> Result<Url, url::ParseError> {
84 Url::parse(Server::websocket_url())
85 }
86
87 fn ping_interval() -> Option<PingInterval> {
88 Some(PingInterval {
89 interval: time::interval(Duration::from_millis(5_000)),
90 ping: || {
91 WsMessage::text(
92 serde_json::json!({
93 "op": "ping",
94 })
95 .to_string(),
96 )
97 },
98 })
99 }
100
101 fn requests(exchange_subs: Vec<ExchangeSub<Self::Channel, Self::Market>>) -> Vec<WsMessage> {
102 let stream_names = exchange_subs
103 .into_iter()
104 .map(|sub| format!("{}.{}", sub.channel.as_ref(), sub.market.as_ref(),))
105 .collect::<Vec<String>>();
106
107 vec![WsMessage::text(
108 serde_json::json!({
109 "op": "subscribe",
110 "args": stream_names
111 })
112 .to_string(),
113 )]
114 }
115
116 fn expected_responses<InstrumentKey>(_: &Map<InstrumentKey>) -> usize {
117 1
118 }
119}
120
121impl<Instrument, Server> StreamSelector<Instrument, PublicTrades> for Bybit<Server>
122where
123 Instrument: InstrumentData,
124 Server: ExchangeServer + Debug + Send + Sync,
125{
126 type SnapFetcher = NoInitialSnapshots;
127 type Stream =
128 BybitWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, BybitTrade>>;
129}
130
131impl<Instrument, Server> StreamSelector<Instrument, OrderBooksL1> for Bybit<Server>
132where
133 Instrument: InstrumentData,
134 Server: ExchangeServer + Debug + Send + Sync,
135{
136 type SnapFetcher = NoInitialSnapshots;
137 type Stream = BybitWsStream<
138 StatelessTransformer<Self, Instrument::Key, OrderBooksL1, BybitOrderBookMessage>,
139 >;
140}
141
142impl<Instrument, Server> StreamSelector<Instrument, OrderBooksL2> for Bybit<Server>
143where
144 Instrument: InstrumentData,
145 Server: ExchangeServer + Debug + Send + Sync,
146{
147 type SnapFetcher = NoInitialSnapshots;
148 type Stream = BybitWsStream<BybitOrderBooksL2Transformer<Instrument::Key>>;
149}
150
151impl<'de, Server> serde::Deserialize<'de> for Bybit<Server>
152where
153 Server: ExchangeServer,
154{
155 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
156 where
157 D: serde::de::Deserializer<'de>,
158 {
159 let input = <&str as serde::Deserialize>::deserialize(deserializer)?;
160
161 if input == Self::ID.as_str() {
162 Ok(Self::default())
163 } else {
164 Err(Error::invalid_value(
165 Unexpected::Str(input),
166 &Self::ID.as_str(),
167 ))
168 }
169 }
170}
171
172impl<Server> serde::Serialize for Bybit<Server>
173where
174 Server: ExchangeServer,
175{
176 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
177 where
178 S: serde::ser::Serializer,
179 {
180 serializer.serialize_str(Self::ID.as_str())
181 }
182}