use std::pin::Pin;
use std::sync::Arc;
use async_trait::async_trait;
use futures_util::Stream;
use tokio::sync::Mutex as TokioMutex;
use crate::core::traits::{Credentials, WebSocketConnector};
use crate::core::types::{
AccountType, ConnectionStatus, ExchangeResult,
OrderbookCapabilities, StreamEvent, SubscriptionRequest, WebSocketResult,
WsBookChannel,
};
use crate::core::websocket::{StreamSpec, UniversalWsTransport};
use super::protocol::BybitProtocol;
/// Bybit WebSocket connector.
///
/// A thin adapter that exposes a [`UniversalWsTransport`] specialised with
/// [`BybitProtocol`] through the [`WebSocketConnector`] trait.
pub struct BybitWebSocket {
    /// Generic transport that every connector operation is forwarded to.
    inner: UniversalWsTransport<BybitProtocol>,

    /// Account type supplied at construction time.
    ///
    /// Stored but never read in the code visible here (hence the leading
    /// underscore).
    _account_type: AccountType,
}
impl BybitWebSocket {
    /// Creates a connector for the given account type.
    ///
    /// A [`BybitProtocol`] is built from `account_type` and `testnet`, then
    /// handed to [`UniversalWsTransport::new`] together with the same
    /// `account_type` / `testnet` pair and the optional `credentials`.
    ///
    /// # Errors
    ///
    /// The visible body has no failing path: it always returns `Ok`. The
    /// `ExchangeResult` wrapper (and the `async` qualifier, even though nothing
    /// is awaited) are kept as part of the public signature.
    pub async fn new(
        credentials: Option<Credentials>,
        testnet: bool,
        account_type: AccountType,
    ) -> ExchangeResult<Self> {
        let inner = UniversalWsTransport::new(
            BybitProtocol::new(account_type, testnet),
            account_type,
            testnet,
            credentials,
        );

        let connector = Self {
            inner,
            _account_type: account_type,
        };
        Ok(connector)
    }
}
#[async_trait]
impl WebSocketConnector for BybitWebSocket {
    /// Connects by delegating to the wrapped transport.
    ///
    /// The `_account_type` argument is ignored; the transport is called
    /// without it.
    async fn connect(&self, _account_type: AccountType) -> WebSocketResult<()> {
        self.inner.connect().await
    }

    /// Disconnects by delegating to the wrapped transport.
    async fn disconnect(&self) -> WebSocketResult<()> {
        self.inner.disconnect().await
    }

    /// Reports the connection status of the wrapped transport.
    fn connection_status(&self) -> ConnectionStatus {
        self.inner.connection_status()
    }

    /// Subscribes to the stream described by `request`.
    ///
    /// # Errors
    ///
    /// Returns early if `request` cannot be converted into a [`StreamSpec`];
    /// otherwise returns whatever the transport's `subscribe` returns.
    async fn subscribe(&self, request: SubscriptionRequest) -> WebSocketResult<()> {
        self.inner.subscribe(StreamSpec::try_from(request)?).await
    }

    /// Unsubscribes from the stream described by `request`.
    ///
    /// # Errors
    ///
    /// Returns early if `request` cannot be converted into a [`StreamSpec`];
    /// otherwise returns whatever the transport's `unsubscribe` returns.
    async fn unsubscribe(&self, request: SubscriptionRequest) -> WebSocketResult<()> {
        self.inner.unsubscribe(StreamSpec::try_from(request)?).await
    }

    /// Returns the transport's event stream as a pinned, boxed trait object.
    fn event_stream(&self) -> Pin<Box<dyn Stream<Item = WebSocketResult<StreamEvent>> + Send>> {
        let events = self.inner.event_stream();
        Box::pin(events)
    }

    /// Lists the transport's active subscriptions, each converted back into a
    /// [`SubscriptionRequest`].
    fn active_subscriptions(&self) -> Vec<SubscriptionRequest> {
        let specs = self.inner.active_subscriptions();
        specs.into_iter().map(Into::into).collect()
    }

    /// Always `None`: this connector does not hand out a ping round-trip handle.
    fn ping_rtt_handle(&self) -> Option<Arc<TokioMutex<u64>>> {
        None
    }

    /// Describes the order-book feeds advertised for `account_type`.
    ///
    /// Three tables exist: one for `AccountType::Options`, one for
    /// `AccountType::Spot`, and a fallback used for every other account type
    /// (its channel list is named `LINEAR_CHANNELS`).
    fn orderbook_capabilities(&self, account_type: AccountType) -> OrderbookCapabilities {
        // NOTE(review): the two optional numbers passed to `WsBookChannel::delta`
        // line up with `ws_depths` and `update_speeds_ms` below, so they are
        // presumably (depth, push interval in ms) — confirm against the
        // `WsBookChannel` definition.
        static OPTION_CHANNELS: &[WsBookChannel] = &[
            WsBookChannel::delta("orderbook.25", Some(25), Some(20)),
            WsBookChannel::delta("orderbook.100", Some(100), Some(100)),
        ];
        static SPOT_CHANNELS: &[WsBookChannel] = &[
            WsBookChannel::snapshot("orderbook.1", 1, 10),
            WsBookChannel::delta("orderbook.50", Some(50), Some(20)),
            WsBookChannel::delta("orderbook.200", Some(200), Some(100)),
            WsBookChannel::delta("orderbook.1000", Some(1000), Some(200)),
        ];
        static LINEAR_CHANNELS: &[WsBookChannel] = &[
            WsBookChannel::snapshot("orderbook.1", 1, 10),
            WsBookChannel::delta("orderbook.50", Some(50), Some(20)),
            WsBookChannel::delta("orderbook.200", Some(200), Some(100)),
            WsBookChannel::delta("orderbook.1000", Some(1000), Some(200)),
        ];

        // Fallback table; the other two tables are expressed as overrides of it.
        let linear = OrderbookCapabilities {
            ws_depths: &[1, 50, 200, 1000],
            ws_default_depth: Some(50),
            rest_max_depth: Some(500),
            rest_depth_values: &[],
            supports_snapshot: true,
            supports_delta: true,
            update_speeds_ms: &[10, 20, 100, 200],
            default_speed_ms: Some(20),
            ws_channels: LINEAR_CHANNELS,
            checksum: None,
            has_sequence: true,
            has_prev_sequence: false,
            supports_aggregation: false,
            aggregation_levels: &[],
        };

        match account_type {
            // Options differ in depths, REST limit, update speeds and channels.
            AccountType::Options => OrderbookCapabilities {
                ws_depths: &[25, 100],
                ws_default_depth: Some(25),
                rest_max_depth: Some(25),
                update_speeds_ms: &[20, 100],
                ws_channels: OPTION_CHANNELS,
                ..linear
            },
            // Spot differs from the fallback only in REST limit and channel list.
            AccountType::Spot => OrderbookCapabilities {
                rest_max_depth: Some(200),
                ws_channels: SPOT_CHANNELS,
                ..linear
            },
            _ => linear,
        }
    }
}