Skip to main content

barter_data/exchange/bybit/
mod.rs

1use crate::{
2    ExchangeWsStream, NoInitialSnapshots,
3    exchange::{
4        Connector, ExchangeServer, PingInterval, StreamSelector,
5        bybit::{channel::BybitChannel, market::BybitMarket, subscription::BybitResponse},
6        subscription::ExchangeSub,
7    },
8    instrument::InstrumentData,
9    subscriber::{WebSocketSubscriber, validator::WebSocketSubValidator},
10    subscription::{
11        Map,
12        book::{OrderBooksL1, OrderBooksL2},
13        trade::PublicTrades,
14    },
15    transformer::stateless::StatelessTransformer,
16};
17use barter_instrument::exchange::ExchangeId;
18use barter_integration::protocol::websocket::{WebSocketSerdeParser, WsMessage};
19use book::{BybitOrderBookMessage, l2::BybitOrderBooksL2Transformer};
20use serde::de::{Error, Unexpected};
21use std::{fmt::Debug, marker::PhantomData, time::Duration};
22use tokio::time;
23use trade::BybitTrade;
24use url::Url;
25
26/// Defines the type that translates a Barter [`Subscription`](crate::subscription::Subscription)
27/// into an exchange [`Connector`] specific channel used for generating [`Connector::requests`].
28pub mod channel;
29
/// [`ExchangeServer`] and [`StreamSelector`] implementations for
/// [`BybitPerpetualsUsd`](futures::BybitPerpetualsUsd).
32pub mod futures;
33
34/// Defines the type that translates a Barter [`Subscription`](crate::subscription::Subscription)
35/// into an exchange [`Connector`] specific market used for generating [`Connector::requests`].
36pub mod market;
37
38/// Generic [`BybitPayload<T>`](message::BybitPayload) type common to
39/// [`BybitSpot`](spot::BybitSpot)
40pub mod message;
41
42/// [`ExchangeServer`] and [`StreamSelector`] implementations for
43/// [`BybitSpot`](spot::BybitSpot).
44pub mod spot;
45
/// [`Subscription`](crate::subscription::Subscription) response type and response
/// [`Validator`](barter_integration::Validator) common to both [`BybitSpot`](spot::BybitSpot)
/// and [`BybitPerpetualsUsd`](futures::BybitPerpetualsUsd).
49pub mod subscription;
50
/// Public trade types common to both [`BybitSpot`](spot::BybitSpot) and
/// [`BybitPerpetualsUsd`](futures::BybitPerpetualsUsd).
53pub mod trade;
54
/// Orderbook types common to both [`BybitSpot`](spot::BybitSpot) and
/// [`BybitPerpetualsUsd`](futures::BybitPerpetualsUsd).
57pub mod book;
58
59/// Convenient type alias for a Bybit [`ExchangeWsStream`] using [`WebSocketSerdeParser`](barter_integration::protocol::websocket::WebSocketSerdeParser).
60pub type BybitWsStream<Transformer> = ExchangeWsStream<WebSocketSerdeParser, Transformer>;
61
/// Bybit exchange type, generic over the `Server` it targets.
///
/// ### Notes
/// `Server` [`ExchangeServer`] implementations exist for
/// [`BybitSpot`](spot::BybitSpot) and [`BybitPerpetualsUsd`](futures::BybitPerpetualsUsd).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Bybit<Server> {
    /// Zero-sized marker that ties this type to its `Server` parameter.
    server: PhantomData<Server>,
}
71
72impl<Server> Connector for Bybit<Server>
73where
74    Server: ExchangeServer,
75{
76    const ID: ExchangeId = Server::ID;
77    type Channel = BybitChannel;
78    type Market = BybitMarket;
79    type Subscriber = WebSocketSubscriber;
80    type SubValidator = WebSocketSubValidator;
81    type SubResponse = BybitResponse;
82
83    fn url() -> Result<Url, url::ParseError> {
84        Url::parse(Server::websocket_url())
85    }
86
87    fn ping_interval() -> Option<PingInterval> {
88        Some(PingInterval {
89            interval: time::interval(Duration::from_millis(5_000)),
90            ping: || {
91                WsMessage::text(
92                    serde_json::json!({
93                        "op": "ping",
94                    })
95                    .to_string(),
96                )
97            },
98        })
99    }
100
101    fn requests(exchange_subs: Vec<ExchangeSub<Self::Channel, Self::Market>>) -> Vec<WsMessage> {
102        let stream_names = exchange_subs
103            .into_iter()
104            .map(|sub| format!("{}.{}", sub.channel.as_ref(), sub.market.as_ref(),))
105            .collect::<Vec<String>>();
106
107        vec![WsMessage::text(
108            serde_json::json!({
109                "op": "subscribe",
110                "args": stream_names
111            })
112            .to_string(),
113        )]
114    }
115
116    fn expected_responses<InstrumentKey>(_: &Map<InstrumentKey>) -> usize {
117        1
118    }
119}
120
121impl<Instrument, Server> StreamSelector<Instrument, PublicTrades> for Bybit<Server>
122where
123    Instrument: InstrumentData,
124    Server: ExchangeServer + Debug + Send + Sync,
125{
126    type SnapFetcher = NoInitialSnapshots;
127    type Stream =
128        BybitWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, BybitTrade>>;
129}
130
131impl<Instrument, Server> StreamSelector<Instrument, OrderBooksL1> for Bybit<Server>
132where
133    Instrument: InstrumentData,
134    Server: ExchangeServer + Debug + Send + Sync,
135{
136    type SnapFetcher = NoInitialSnapshots;
137    type Stream = BybitWsStream<
138        StatelessTransformer<Self, Instrument::Key, OrderBooksL1, BybitOrderBookMessage>,
139    >;
140}
141
142impl<Instrument, Server> StreamSelector<Instrument, OrderBooksL2> for Bybit<Server>
143where
144    Instrument: InstrumentData,
145    Server: ExchangeServer + Debug + Send + Sync,
146{
147    type SnapFetcher = NoInitialSnapshots;
148    type Stream = BybitWsStream<BybitOrderBooksL2Transformer<Instrument::Key>>;
149}
150
151impl<'de, Server> serde::Deserialize<'de> for Bybit<Server>
152where
153    Server: ExchangeServer,
154{
155    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
156    where
157        D: serde::de::Deserializer<'de>,
158    {
159        let input = <&str as serde::Deserialize>::deserialize(deserializer)?;
160
161        if input == Self::ID.as_str() {
162            Ok(Self::default())
163        } else {
164            Err(Error::invalid_value(
165                Unexpected::Str(input),
166                &Self::ID.as_str(),
167            ))
168        }
169    }
170}
171
172impl<Server> serde::Serialize for Bybit<Server>
173where
174    Server: ExchangeServer,
175{
176    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
177    where
178        S: serde::ser::Serializer,
179    {
180        serializer.serialize_str(Self::ID.as_str())
181    }
182}