barter_data/exchange/bybit/
mod.rs

1use crate::{
2    ExchangeWsStream, NoInitialSnapshots,
3    exchange::{
4        Connector, ExchangeServer, PingInterval, StreamSelector,
5        bybit::{
6            channel::BybitChannel, market::BybitMarket, message::BybitMessage,
7            subscription::BybitResponse,
8        },
9        subscription::ExchangeSub,
10    },
11    instrument::InstrumentData,
12    subscriber::{WebSocketSubscriber, validator::WebSocketSubValidator},
13    subscription::{Map, trade::PublicTrades},
14    transformer::stateless::StatelessTransformer,
15};
16use barter_instrument::exchange::ExchangeId;
17use barter_integration::{error::SocketError, protocol::websocket::WsMessage};
18use serde::de::{Error, Unexpected};
19use std::{fmt::Debug, marker::PhantomData, time::Duration};
20use tokio::time;
21use url::Url;
22
23/// Defines the type that translates a Barter [`Subscription`](crate::subscription::Subscription)
24/// into an execution [`Connector`] specific channel used for generating [`Connector::requests`].
25pub mod channel;
26
27/// [`ExchangeServer`] and [`StreamSelector`] implementations for
28/// [`BybitFuturesUsd`](futures::BybitPerpetualsUsd).
29pub mod futures;
30
31/// Defines the type that translates a Barter [`Subscription`](crate::subscription::Subscription)
32/// into an execution [`Connector`] specific market used for generating [`Connector::requests`].
33pub mod market;
34
35/// Generic [`BybitPayload<T>`](message::BybitPayload) type common to
36/// [`BybitSpot`](spot::BybitSpot)
37pub mod message;
38
39/// [`ExchangeServer`] and [`StreamSelector`] implementations for
40/// [`BybitSpot`](spot::BybitSpot).
41pub mod spot;
42
43/// [`Subscription`](crate::subscription::Subscription) response type and response
44/// [`Validator`](barter_integration::Validator) common to both [`BybitSpot`](spot::BybitSpot)
45/// and [`BybitFuturesUsd`](futures::BybitPerpetualsUsd).
46pub mod subscription;
47
48/// Public trade types common to both [`BybitSpot`](spot::BybitSpot) and
49/// [`BybitFuturesUsd`](futures::BybitPerpetualsUsd).
50pub mod trade;
51
52/// Generic [`Bybit<Server>`](Bybit) execution.
53///
54/// ### Notes
55/// A `Server` [`ExchangeServer`] implementations exists for
56/// [`BybitSpot`](spot::BybitSpot) and [`BybitFuturesUsd`](futures::BybitPerpetualsUsd).
57#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Default)]
58pub struct Bybit<Server> {
59    server: PhantomData<Server>,
60}
61
62impl<Server> Connector for Bybit<Server>
63where
64    Server: ExchangeServer,
65{
66    const ID: ExchangeId = Server::ID;
67    type Channel = BybitChannel;
68    type Market = BybitMarket;
69    type Subscriber = WebSocketSubscriber;
70    type SubValidator = WebSocketSubValidator;
71    type SubResponse = BybitResponse;
72
73    fn url() -> Result<Url, SocketError> {
74        Url::parse(Server::websocket_url()).map_err(SocketError::UrlParse)
75    }
76
77    fn ping_interval() -> Option<PingInterval> {
78        Some(PingInterval {
79            interval: time::interval(Duration::from_millis(5_000)),
80            ping: || {
81                WsMessage::text(
82                    serde_json::json!({
83                        "op": "ping",
84                    })
85                    .to_string(),
86                )
87            },
88        })
89    }
90
91    fn requests(exchange_subs: Vec<ExchangeSub<Self::Channel, Self::Market>>) -> Vec<WsMessage> {
92        let stream_names = exchange_subs
93            .into_iter()
94            .map(|sub| format!("{}.{}", sub.channel.as_ref(), sub.market.as_ref(),))
95            .collect::<Vec<String>>();
96
97        vec![WsMessage::text(
98            serde_json::json!({
99                "op": "subscribe",
100                "args": stream_names
101            })
102            .to_string(),
103        )]
104    }
105
106    fn expected_responses<InstrumentKey>(_: &Map<InstrumentKey>) -> usize {
107        1
108    }
109}
110
111impl<Instrument, Server> StreamSelector<Instrument, PublicTrades> for Bybit<Server>
112where
113    Instrument: InstrumentData,
114    Server: ExchangeServer + Debug + Send + Sync,
115{
116    type SnapFetcher = NoInitialSnapshots;
117    type Stream =
118        ExchangeWsStream<StatelessTransformer<Self, Instrument::Key, PublicTrades, BybitMessage>>;
119}
120
121impl<'de, Server> serde::Deserialize<'de> for Bybit<Server>
122where
123    Server: ExchangeServer,
124{
125    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
126    where
127        D: serde::de::Deserializer<'de>,
128    {
129        let input = <&str as serde::Deserialize>::deserialize(deserializer)?;
130
131        if input == Self::ID.as_str() {
132            Ok(Self::default())
133        } else {
134            Err(Error::invalid_value(
135                Unexpected::Str(input),
136                &Self::ID.as_str(),
137            ))
138        }
139    }
140}
141
142impl<Server> serde::Serialize for Bybit<Server>
143where
144    Server: ExchangeServer,
145{
146    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
147    where
148        S: serde::ser::Serializer,
149    {
150        serializer.serialize_str(Self::ID.as_str())
151    }
152}