barter_data/exchange/bybit/
mod.rs1use crate::{
2 ExchangeWsStream, NoInitialSnapshots,
3 exchange::{
4 Connector, ExchangeServer, PingInterval, StreamSelector,
5 bybit::{
6 channel::BybitChannel, market::BybitMarket, message::BybitMessage,
7 subscription::BybitResponse,
8 },
9 subscription::ExchangeSub,
10 },
11 instrument::InstrumentData,
12 subscriber::{WebSocketSubscriber, validator::WebSocketSubValidator},
13 subscription::{Map, trade::PublicTrades},
14 transformer::stateless::StatelessTransformer,
15};
16use barter_instrument::exchange::ExchangeId;
17use barter_integration::{error::SocketError, protocol::websocket::WsMessage};
18use serde::de::{Error, Unexpected};
19use std::{fmt::Debug, marker::PhantomData, time::Duration};
20use tokio::time;
21use url::Url;
22
23pub mod channel;
26
27pub mod futures;
30
31pub mod market;
34
35pub mod message;
38
39pub mod spot;
42
43pub mod subscription;
47
48pub mod trade;
51
/// Generic Bybit exchange connector, parameterised by a `Server` marker type.
///
/// The struct carries no runtime data: `Server` is only tracked at the type level via
/// [`PhantomData`], so values of this type are zero-sized.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Bybit<Server> {
    /// Type-level marker for the `Server` this connector is specialised for.
    server: PhantomData<Server>,
}
61
62impl<Server> Connector for Bybit<Server>
63where
64 Server: ExchangeServer,
65{
66 const ID: ExchangeId = Server::ID;
67 type Channel = BybitChannel;
68 type Market = BybitMarket;
69 type Subscriber = WebSocketSubscriber;
70 type SubValidator = WebSocketSubValidator;
71 type SubResponse = BybitResponse;
72
73 fn url() -> Result<Url, SocketError> {
74 Url::parse(Server::websocket_url()).map_err(SocketError::UrlParse)
75 }
76
77 fn ping_interval() -> Option<PingInterval> {
78 Some(PingInterval {
79 interval: time::interval(Duration::from_millis(5_000)),
80 ping: || {
81 WsMessage::text(
82 serde_json::json!({
83 "op": "ping",
84 })
85 .to_string(),
86 )
87 },
88 })
89 }
90
91 fn requests(exchange_subs: Vec<ExchangeSub<Self::Channel, Self::Market>>) -> Vec<WsMessage> {
92 let stream_names = exchange_subs
93 .into_iter()
94 .map(|sub| format!("{}.{}", sub.channel.as_ref(), sub.market.as_ref(),))
95 .collect::<Vec<String>>();
96
97 vec![WsMessage::text(
98 serde_json::json!({
99 "op": "subscribe",
100 "args": stream_names
101 })
102 .to_string(),
103 )]
104 }
105
106 fn expected_responses<InstrumentKey>(_: &Map<InstrumentKey>) -> usize {
107 1
108 }
109}
110
111impl<Instrument, Server> StreamSelector<Instrument, PublicTrades> for Bybit<Server>
112where
113 Instrument: InstrumentData,
114 Server: ExchangeServer + Debug + Send + Sync,
115{
116 type SnapFetcher = NoInitialSnapshots;
117 type Stream =
118 ExchangeWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, BybitMessage>>;
119}
120
121impl<'de, Server> serde::Deserialize<'de> for Bybit<Server>
122where
123 Server: ExchangeServer,
124{
125 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
126 where
127 D: serde::de::Deserializer<'de>,
128 {
129 let input = <&str as serde::Deserialize>::deserialize(deserializer)?;
130
131 if input == Self::ID.as_str() {
132 Ok(Self::default())
133 } else {
134 Err(Error::invalid_value(
135 Unexpected::Str(input),
136 &Self::ID.as_str(),
137 ))
138 }
139 }
140}
141
142impl<Server> serde::Serialize for Bybit<Server>
143where
144 Server: ExchangeServer,
145{
146 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
147 where
148 S: serde::ser::Serializer,
149 {
150 serializer.serialize_str(Self::ID.as_str())
151 }
152}