use crate::{
clients::wss::WssDecoder,
errors::ExchangeError,
sources::bybit::events::BybitWssEvent,
sources::bybit::responses::{
liquidations::BybitLiquidationResponse, orderbooks::BybitOrderbookResponse,
tickers::BybitTickerResponse, trades::BybitTradeResponse,
},
};
use serde_json::Value;
pub struct BybitDecoder;
impl WssDecoder for BybitDecoder {
type Event = BybitWssEvent;
fn decode(text: &str) -> Result<Option<Self::Event>, Box<ExchangeError>> {
let json: Value =
serde_json::from_str(text).map_err(|e| Box::new(ExchangeError::from(e)))?;
if let Some(op) = json.get("op") {
let op_str = op.as_str().unwrap_or("");
match op_str {
"pong" => return Ok(None),
"subscribe" => {
let success = json
.get("success")
.and_then(|v| v.as_bool())
.unwrap_or(false);
let ret_msg = json
.get("ret_msg")
.and_then(|v| v.as_str())
.unwrap_or("");
let conn_id = json
.get("conn_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown");
if success {
tracing::info!(
conn_id = conn_id,
ret_msg = ret_msg,
"bybit.subscription_confirmed"
);
} else {
tracing::error!(
conn_id = conn_id,
ret_msg = ret_msg,
raw = text,
"bybit.subscription_failed"
);
}
return Ok(None);
}
_ => {
tracing::debug!(op = op_str, "bybit.control_frame");
return Ok(None);
}
}
}
if let Some(success) = json.get("success") {
if success.as_bool() == Some(true) {
tracing::info!("bybit.subscription_confirmed_legacy");
return Ok(None);
}
}
if let Some(topic) = json.get("topic") {
if let Some(topic_str) = topic.as_str() {
if topic_str.starts_with("allLiquidation.") {
match serde_json::from_str::<BybitLiquidationResponse>(text) {
Ok(env) => {
if let Some(liq_data) = env.data.into_iter().next() {
Ok(Some(Self::Event::LiquidationData(liq_data)))
} else {
Ok(None)
}
}
Err(e) => {
tracing::warn!("Failed to parse liquidation data: {}", e);
Ok(None)
}
}
} else if topic_str.starts_with("publicTrade.") {
match serde_json::from_str::<BybitTradeResponse>(text) {
Ok(env) => {
if let Some(trade_data) = env.data.into_iter().next() {
Ok(Some(Self::Event::TradeData(trade_data)))
} else {
Ok(None)
}
}
Err(e) => {
tracing::warn!("Failed to parse public trade data: {}", e);
Ok(None)
}
}
} else if topic_str.starts_with("orderbook.") {
match serde_json::from_str::<BybitOrderbookResponse>(text) {
Ok(env) => {
if let Some(orderbook_data) = Some(env) {
Ok(Some(Self::Event::OrderbookData(orderbook_data)))
} else {
Ok(None)
}
}
Err(e) => {
tracing::warn!("Failed to parse order book data: {}", e);
Ok(None)
}
}
} else if topic_str.starts_with("tickers") {
match serde_json::from_str::<BybitTickerResponse>(text) {
Ok(env) => Ok(Some(Self::Event::TickerData(env.data))),
Err(e) => {
tracing::warn!("Failed to parse ticker data: {}", e);
Ok(None)
}
}
} else {
Ok(None) }
} else {
Ok(None)
}
} else {
tracing::debug!("Received unknown message type: {}", text);
Ok(None)
}
}
}