use crate::{
clients::{disconnect::WssExitReason, wss::WssDecoder},
sources::bybit::decoder::BybitDecoder,
};
use futures_util::{SinkExt, StreamExt};
use std::sync::Arc;
use tokio::{
sync::{Mutex, mpsc},
time::{Duration, sleep},
};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};
use tracing::{error, info, warn};
use url::Url;
use crate::sources::bybit::events::BybitWssEvent;
const BYBIT_WSS_URL: &str = "wss://stream.bybit.com/v5/public/linear";
const HEARTBEAT_INTERVAL_SECS: u64 = 25;
pub struct BybitWssClient {
base_url: String,
streams: Vec<String>,
}
impl BybitWssClient {
pub fn new(streams: Vec<String>) -> Self {
Self {
base_url: BYBIT_WSS_URL.to_string(),
streams,
}
}
pub fn with_url(base_url: impl Into<String>, streams: Vec<String>) -> Self {
Self {
base_url: base_url.into(),
streams,
}
}
pub async fn receive_data(
&self,
tx: mpsc::Sender<BybitWssEvent>,
) -> WssExitReason {
let url = match Url::parse(&self.base_url) {
Ok(u) => u,
Err(e) => return WssExitReason::ConnectionFailed(format!("URL parse: {e}")),
};
let (ws_stream, _) = match connect_async(url).await {
Ok(s) => s,
Err(e) => return WssExitReason::Transport(e),
};
info!("WebSocket connected to Bybit");
let (writer_half, mut reader_half) = ws_stream.split();
let writer = Arc::new(Mutex::new(writer_half));
let sub_msg = serde_json::json!({
"op": "subscribe",
"args": self.streams
})
.to_string();
if let Err(e) = writer.lock().await.send(Message::Text(sub_msg)).await {
return WssExitReason::Transport(e);
}
let hb_writer = writer.clone();
let hb_handle = tokio::spawn(async move {
loop {
sleep(Duration::from_secs(HEARTBEAT_INTERVAL_SECS)).await;
if hb_writer
.lock()
.await
.send(Message::Text(r#"{"op":"ping"}"#.into()))
.await
.is_err()
{
break;
}
}
});
let pong_writer = writer.clone();
let mut exit_reason = WssExitReason::StreamEnded;
while let Some(msg) = reader_half.next().await {
match msg {
Ok(Message::Text(txt)) => {
if txt.contains(r#""op":"ping""#) {
let _ = pong_writer
.lock()
.await
.send(Message::Text(r#"{"op":"pong"}"#.into()))
.await;
continue;
}
match BybitDecoder::decode(&txt) {
Ok(Some(event)) => {
if tx.send(event).await.is_err() {
exit_reason = WssExitReason::ReceiverDropped;
break;
}
}
Ok(None) => {} Err(e) => warn!("Decode error: {}", e),
}
}
Ok(Message::Ping(p)) => {
let _ = pong_writer.lock().await.send(Message::Pong(p)).await;
}
Ok(Message::Close(f)) => {
info!("Bybit server closed connection: {:?}", f);
exit_reason = WssExitReason::ServerClose(f);
break;
}
Err(e) => {
error!("Bybit ws error: {}", e);
exit_reason = WssExitReason::Transport(e);
break;
}
_ => {}
}
}
hb_handle.abort();
exit_reason
}
}