use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::{Arc, Mutex as StdMutex, OnceLock};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures_util::{Stream, StreamExt, SinkExt};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::{mpsc, broadcast, Mutex};
use tokio_tungstenite::{connect_async, tungstenite::Message, WebSocketStream, MaybeTlsStream};
use crate::core::{
Credentials, AccountType,
ExchangeError, ExchangeResult,
ConnectionStatus, StreamEvent, StreamType, SubscriptionRequest,
timestamp_millis,
};
use crate::core::types::{WebSocketResult, WebSocketError};
use crate::core::traits::WebSocketConnector;
use crate::core::utils::SimpleRateLimiter;
use super::auth::BitfinexAuth;
use super::endpoints::{BitfinexUrls, format_symbol};
use super::parser::BitfinexParser;
const WS_PUBLIC_URL: &str = "wss://api-pub.bitfinex.com/ws/2";
const WS_PRIVATE_URL: &str = "wss://api.bitfinex.com/ws/2";
static GLOBAL_WS_PUBLIC_LIMITER: OnceLock<Arc<StdMutex<SimpleRateLimiter>>> = OnceLock::new();
static GLOBAL_WS_PRIVATE_LIMITER: OnceLock<Arc<StdMutex<SimpleRateLimiter>>> = OnceLock::new();
fn get_global_ws_public_limiter() -> Arc<StdMutex<SimpleRateLimiter>> {
GLOBAL_WS_PUBLIC_LIMITER.get_or_init(|| {
Arc::new(StdMutex::new(SimpleRateLimiter::new(20, Duration::from_secs(60))))
}).clone()
}
fn get_global_ws_private_limiter() -> Arc<StdMutex<SimpleRateLimiter>> {
GLOBAL_WS_PRIVATE_LIMITER.get_or_init(|| {
Arc::new(StdMutex::new(SimpleRateLimiter::new(5, Duration::from_secs(15))))
}).clone()
}
#[derive(Debug, Clone, Serialize)]
struct SubscribeMessage {
event: String,
channel: String,
#[serde(skip_serializing_if = "Option::is_none")]
symbol: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
key: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
struct UnsubscribeMessage {
event: String,
#[serde(rename = "chanId")]
chan_id: u64,
}
#[derive(Debug, Clone, Serialize)]
struct AuthMessage {
event: String,
#[serde(rename = "apiKey")]
api_key: String,
#[serde(rename = "authSig")]
auth_sig: String,
#[serde(rename = "authPayload")]
auth_payload: String,
#[serde(rename = "authNonce")]
auth_nonce: String,
}
#[derive(Debug, Clone, Deserialize)]
#[allow(dead_code)]
struct InfoMessage {
event: String,
version: Option<u32>,
platform: Option<Value>,
}
#[derive(Debug, Clone, Deserialize)]
struct SubscriptionResponse {
#[allow(dead_code)]
event: String,
#[serde(rename = "chanId")]
chan_id: u64,
channel: String,
#[serde(rename = "symbol")]
symbol: Option<String>,
#[serde(rename = "key")]
key: Option<String>,
}
#[derive(Debug, Clone, Deserialize)]
struct ErrorMessage {
#[allow(dead_code)]
event: String,
msg: String,
code: i32,
}
type ChannelMap = HashMap<u64, SubscriptionRequest>;
type PendingSubscriptions = HashMap<String, SubscriptionRequest>;
type WsStream = WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
pub struct BitfinexWebSocket {
auth: Option<BitfinexAuth>,
_urls: BitfinexUrls,
account_type: AccountType,
status: Arc<Mutex<ConnectionStatus>>,
subscriptions: Arc<Mutex<HashSet<SubscriptionRequest>>>,
channels: Arc<Mutex<ChannelMap>>,
pending_subs: Arc<Mutex<PendingSubscriptions>>,
event_tx: Arc<Mutex<Option<mpsc::UnboundedSender<WebSocketResult<StreamEvent>>>>>,
broadcast_tx: Arc<StdMutex<Option<broadcast::Sender<WebSocketResult<StreamEvent>>>>>,
ws_stream: Arc<Mutex<Option<WsStream>>>,
write_tx: Arc<Mutex<Option<mpsc::UnboundedSender<Message>>>>,
is_authenticated: Arc<Mutex<bool>>,
last_ping: Arc<Mutex<Instant>>,
ws_ping_rtt_ms: Arc<Mutex<u64>>,
}
impl BitfinexWebSocket {
pub async fn new(
credentials: Option<Credentials>,
_testnet: bool, account_type: AccountType,
) -> ExchangeResult<Self> {
let urls = BitfinexUrls::MAINNET;
let auth = credentials
.as_ref()
.map(BitfinexAuth::new)
.transpose()?;
Ok(Self {
auth,
_urls: urls,
account_type,
status: Arc::new(Mutex::new(ConnectionStatus::Disconnected)),
subscriptions: Arc::new(Mutex::new(HashSet::new())),
channels: Arc::new(Mutex::new(HashMap::new())),
pending_subs: Arc::new(Mutex::new(HashMap::new())),
event_tx: Arc::new(Mutex::new(None)),
broadcast_tx: Arc::new(StdMutex::new(None)),
ws_stream: Arc::new(Mutex::new(None)),
write_tx: Arc::new(Mutex::new(None)),
is_authenticated: Arc::new(Mutex::new(false)),
last_ping: Arc::new(Mutex::new(Instant::now())),
ws_ping_rtt_ms: Arc::new(Mutex::new(0)),
})
}
fn ws_url(&self) -> &'static str {
if self.auth.is_some() {
WS_PRIVATE_URL
} else {
WS_PUBLIC_URL
}
}
async fn connect_ws(&self) -> ExchangeResult<WsStream> {
let ws_url = self.ws_url();
let limiter = if self.auth.is_some() {
get_global_ws_private_limiter()
} else {
get_global_ws_public_limiter()
};
let wait_time = {
let mut limiter_guard = limiter.lock().expect("Mutex poisoned");
if !limiter_guard.try_acquire() {
limiter_guard.time_until_ready()
} else {
Duration::ZERO
}
};
if !wait_time.is_zero() {
tokio::time::sleep(wait_time).await;
let mut limiter_guard = limiter.lock().expect("Mutex poisoned");
limiter_guard.try_acquire();
}
let (ws_stream, _) = connect_async(ws_url).await
.map_err(|e| ExchangeError::Network(format!("WebSocket connection failed: {}", e)))?;
Ok(ws_stream)
}
async fn authenticate(&self, stream: &mut WsStream) -> ExchangeResult<()> {
let auth = self.auth.as_ref()
.ok_or_else(|| ExchangeError::Auth("No credentials provided".to_string()))?;
let nonce = timestamp_millis().to_string();
let auth_payload = format!("AUTH{}", nonce);
let signature = auth.sign_auth(&auth_payload);
let auth_msg = AuthMessage {
event: "auth".to_string(),
api_key: auth.api_key().to_string(),
auth_sig: signature,
auth_payload,
auth_nonce: nonce,
};
let msg_json = serde_json::to_string(&auth_msg)
.map_err(|e| ExchangeError::Parse(format!("Failed to serialize auth message: {}", e)))?;
stream.send(Message::Text(msg_json)).await
.map_err(|e| ExchangeError::Network(format!("Failed to send auth message: {}", e)))?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn start_message_handler(
ws_stream: Arc<Mutex<Option<WsStream>>>,
event_tx: mpsc::UnboundedSender<WebSocketResult<StreamEvent>>,
mut write_rx: mpsc::UnboundedReceiver<Message>,
status: Arc<Mutex<ConnectionStatus>>,
channels: Arc<Mutex<ChannelMap>>,
pending_subs: Arc<Mutex<PendingSubscriptions>>,
is_authenticated: Arc<Mutex<bool>>,
account_type: AccountType,
last_ping: Arc<Mutex<Instant>>,
ws_ping_rtt_ms: Arc<Mutex<u64>>,
) {
tokio::spawn(async move {
let stream = {
let mut guard = ws_stream.lock().await;
match guard.take() {
Some(s) => s,
None => return,
}
};
let (mut write, mut read) = stream.split();
let mut ping_timer = tokio::time::interval(Duration::from_secs(5));
ping_timer.tick().await;
loop {
tokio::select! {
msg = read.next() => {
match msg {
Some(Ok(Message::Text(text))) => {
if let Err(e) = Self::handle_message(
&text,
&event_tx,
&channels,
&pending_subs,
&is_authenticated,
account_type,
&last_ping,
&ws_ping_rtt_ms,
).await {
let _ = event_tx.send(Err(e));
}
}
Some(Ok(Message::Ping(data))) => {
let _ = write.send(Message::Pong(data)).await;
}
Some(Ok(Message::Pong(_))) => {
}
Some(Ok(Message::Close(_))) => {
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
Some(Err(e)) => {
let _ = event_tx.send(Err(WebSocketError::ConnectionError(e.to_string())));
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
None => {
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
_ => {
}
}
}
Some(msg) = write_rx.recv() => {
if write.send(msg).await.is_err() {
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
}
_ = ping_timer.tick() => {
let cid = timestamp_millis();
let ping_msg = format!(r#"{{"event":"ping","cid":{}}}"#, cid);
*last_ping.lock().await = Instant::now();
if write.send(Message::Text(ping_msg)).await.is_err() {
*status.lock().await = ConnectionStatus::Disconnected;
break;
}
}
}
}
});
}
#[allow(clippy::too_many_arguments)]
async fn handle_message(
text: &str,
event_tx: &mpsc::UnboundedSender<WebSocketResult<StreamEvent>>,
channels: &Arc<Mutex<ChannelMap>>,
pending_subs: &Arc<Mutex<PendingSubscriptions>>,
is_authenticated: &Arc<Mutex<bool>>,
account_type: AccountType,
last_ping: &Arc<Mutex<Instant>>,
ws_ping_rtt_ms: &Arc<Mutex<u64>>,
) -> WebSocketResult<()> {
let value: Value = serde_json::from_str(text)
.map_err(|e| WebSocketError::Parse(format!("Failed to parse message: {}", e)))?;
if value.is_object() {
Self::handle_event_message(&value, channels, pending_subs, is_authenticated, last_ping, ws_ping_rtt_ms).await?;
} else if value.is_array() {
Self::handle_data_message(&value, event_tx, channels, account_type).await?;
}
Ok(())
}
async fn handle_event_message(
value: &Value,
channels: &Arc<Mutex<ChannelMap>>,
pending_subs: &Arc<Mutex<PendingSubscriptions>>,
is_authenticated: &Arc<Mutex<bool>>,
last_ping: &Arc<Mutex<Instant>>,
ws_ping_rtt_ms: &Arc<Mutex<u64>>,
) -> WebSocketResult<()> {
let event = value.get("event")
.and_then(|e| e.as_str())
.unwrap_or("");
match event {
"info" => {
if let Some(code) = value.get("code").and_then(|c| c.as_u64()) {
match code {
20051 => {
return Err(WebSocketError::ConnectionError(
"Bitfinex requested reconnect (code 20051)".to_string()
));
}
20060 => {
return Err(WebSocketError::ConnectionError(
"Bitfinex entering maintenance (code 20060)".to_string()
));
}
20061 => {
return Err(WebSocketError::ConnectionError(
"Bitfinex maintenance ended, reconnect needed (code 20061)".to_string()
));
}
_ => {}
}
}
Ok(())
}
"subscribed" => {
let sub_resp: SubscriptionResponse = serde_json::from_value(value.clone())
.map_err(|e| WebSocketError::Parse(format!("Failed to parse subscribed: {}", e)))?;
let chan_id = sub_resp.chan_id;
let channel = &sub_resp.channel;
let symbol_or_key = sub_resp.symbol.as_deref()
.or(sub_resp.key.as_deref())
.unwrap_or("");
let key = format!("{}:{}", channel, symbol_or_key);
let mut pending = pending_subs.lock().await;
if let Some(request) = pending.remove(&key) {
channels.lock().await.insert(chan_id, request);
}
drop(pending);
Ok(())
}
"unsubscribed" => {
Ok(())
}
"auth" => {
let status = value.get("status")
.and_then(|s| s.as_str())
.unwrap_or("FAILED");
if status == "OK" {
*is_authenticated.lock().await = true;
Ok(())
} else {
Err(WebSocketError::Auth("Authentication failed".to_string()))
}
}
"pong" => {
let rtt = last_ping.lock().await.elapsed().as_millis() as u64;
*ws_ping_rtt_ms.lock().await = rtt;
Ok(())
}
"error" => {
let err_msg: ErrorMessage = serde_json::from_value(value.clone())
.map_err(|e| WebSocketError::Parse(format!("Failed to parse error: {}", e)))?;
Err(WebSocketError::ProtocolError(format!("Code {}: {}", err_msg.code, err_msg.msg)))
}
_ => {
Ok(())
}
}
}
async fn handle_data_message(
value: &Value,
event_tx: &mpsc::UnboundedSender<WebSocketResult<StreamEvent>>,
channels: &Arc<Mutex<ChannelMap>>,
account_type: AccountType,
) -> WebSocketResult<()> {
let arr = value.as_array()
.ok_or_else(|| WebSocketError::Parse("Expected array".to_string()))?;
if arr.is_empty() {
return Ok(());
}
let chan_id = arr[0].as_u64()
.ok_or_else(|| WebSocketError::Parse("Invalid channel ID".to_string()))?;
if arr.len() == 2 && arr[1].as_str() == Some("hb") {
return Ok(()); }
let channels_guard = channels.lock().await;
let subscription = channels_guard.get(&chan_id);
if let Some(sub) = subscription {
if let Some(event) = Self::parse_channel_data(arr, sub, account_type)? {
let _ = event_tx.send(Ok(event));
}
}
drop(channels_guard);
Ok(())
}
fn parse_channel_data(
arr: &[Value],
subscription: &SubscriptionRequest,
_account_type: AccountType,
) -> WebSocketResult<Option<StreamEvent>> {
if arr.len() < 2 {
return Ok(None);
}
match &subscription.stream_type {
StreamType::Ticker => {
if let Some(data) = arr[1].as_array() {
let ticker = BitfinexParser::parse_ws_ticker(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(StreamEvent::Ticker(ticker)))
} else {
Ok(None)
}
}
StreamType::Trade => {
if arr.len() >= 3 && arr[1].as_str() == Some("te") {
if let Some(data) = arr[2].as_array() {
let trade = BitfinexParser::parse_ws_trade(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(StreamEvent::Trade(trade)))
} else {
Ok(None)
}
} else if let Some(data) = arr[1].as_array() {
if let Some(first) = data.first() {
if first.is_array() {
Ok(None) } else {
let trade = BitfinexParser::parse_ws_trade(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(StreamEvent::Trade(trade)))
}
} else {
Ok(None)
}
} else {
Ok(None)
}
}
StreamType::Orderbook | StreamType::OrderbookDelta => {
if let Some(data) = arr[1].as_array() {
let event = BitfinexParser::parse_ws_orderbook_delta(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(event))
} else {
Ok(None)
}
}
StreamType::Kline { .. } => {
if let Some(data) = arr[1].as_array() {
let kline = BitfinexParser::parse_ws_kline(data)
.map_err(|e| WebSocketError::Parse(e.to_string()))?;
Ok(Some(StreamEvent::Kline(kline)))
} else {
Ok(None)
}
}
_ => Ok(None),
}
}
fn build_channel(request: &SubscriptionRequest, account_type: AccountType) -> (String, Option<String>) {
let symbol = format_symbol(&request.symbol.base, &request.symbol.quote, account_type);
match &request.stream_type {
StreamType::Ticker => ("ticker".to_string(), Some(symbol)),
StreamType::Trade => ("trades".to_string(), Some(symbol)),
StreamType::Orderbook | StreamType::OrderbookDelta => {
("book".to_string(), Some(symbol))
}
StreamType::Kline { interval } => {
let key = format!("trade:{}:{}", interval, symbol);
("candles".to_string(), Some(key))
}
_ => ("".to_string(), None),
}
}
}
#[async_trait]
impl WebSocketConnector for BitfinexWebSocket {
async fn connect(&mut self, account_type: AccountType) -> WebSocketResult<()> {
*self.status.lock().await = ConnectionStatus::Connecting;
self.account_type = account_type;
let mut ws_stream = self.connect_ws().await
.map_err(|e| WebSocketError::ConnectionError(e.to_string()))?;
if self.auth.is_some() {
self.authenticate(&mut ws_stream).await
.map_err(|e| WebSocketError::Auth(e.to_string()))?;
}
*self.ws_stream.lock().await = Some(ws_stream);
*self.status.lock().await = ConnectionStatus::Connected;
let (tx, mut rx) = mpsc::unbounded_channel();
*self.event_tx.lock().await = Some(tx.clone());
let (write_cmd_tx, write_cmd_rx) = mpsc::unbounded_channel();
*self.write_tx.lock().await = Some(write_cmd_tx);
Self::start_message_handler(
self.ws_stream.clone(),
tx,
write_cmd_rx,
self.status.clone(),
self.channels.clone(),
self.pending_subs.clone(),
self.is_authenticated.clone(),
account_type,
self.last_ping.clone(),
self.ws_ping_rtt_ms.clone(),
);
let (broadcast_sender, _) = broadcast::channel(1000);
*self.broadcast_tx.lock().unwrap() = Some(broadcast_sender);
let broadcast_tx = self.broadcast_tx.clone();
tokio::spawn(async move {
while let Some(event) = rx.recv().await {
if let Some(tx) = broadcast_tx.lock().unwrap().as_ref() {
let _ = tx.send(event);
}
}
let _ = broadcast_tx.lock().unwrap().take();
});
Ok(())
}
async fn disconnect(&mut self) -> WebSocketResult<()> {
*self.status.lock().await = ConnectionStatus::Disconnected;
*self.ws_stream.lock().await = None;
*self.event_tx.lock().await = None;
*self.write_tx.lock().await = None; let _ = self.broadcast_tx.lock().unwrap().take();
self.subscriptions.lock().await.clear();
self.channels.lock().await.clear();
Ok(())
}
fn connection_status(&self) -> ConnectionStatus {
match self.status.try_lock() {
Ok(status) => *status,
Err(_) => ConnectionStatus::Disconnected,
}
}
async fn subscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
let (channel, symbol_or_key) = Self::build_channel(&request, self.account_type);
if channel.is_empty() {
return Err(WebSocketError::ProtocolError("Unsupported stream type".to_string()));
}
let symbol_or_key_str = symbol_or_key.as_deref().unwrap_or("");
let pending_key = format!("{}:{}", channel, symbol_or_key_str);
self.pending_subs.lock().await.insert(pending_key, request.clone());
let msg = if channel == "candles" {
SubscribeMessage {
event: "subscribe".to_string(),
channel,
symbol: None,
key: symbol_or_key,
}
} else {
SubscribeMessage {
event: "subscribe".to_string(),
channel,
symbol: symbol_or_key,
key: None,
}
};
let msg_json = serde_json::to_string(&msg)
.map_err(|e| WebSocketError::ProtocolError(e.to_string()))?;
let write_tx_guard = self.write_tx.lock().await;
let tx = write_tx_guard.as_ref()
.ok_or_else(|| WebSocketError::ConnectionError("Not connected".to_string()))?;
tx.send(Message::Text(msg_json))
.map_err(|e| WebSocketError::ConnectionError(e.to_string()))?;
drop(write_tx_guard);
self.subscriptions.lock().await.insert(request);
Ok(())
}
async fn unsubscribe(&mut self, request: SubscriptionRequest) -> WebSocketResult<()> {
let channels_guard = self.channels.lock().await;
let chan_id = channels_guard.iter()
.find(|(_, sub)| *sub == &request)
.map(|(id, _)| *id);
drop(channels_guard);
if let Some(chan_id) = chan_id {
let msg = UnsubscribeMessage {
event: "unsubscribe".to_string(),
chan_id,
};
let msg_json = serde_json::to_string(&msg)
.map_err(|e| WebSocketError::ProtocolError(e.to_string()))?;
let write_tx_guard = self.write_tx.lock().await;
let tx = write_tx_guard.as_ref()
.ok_or_else(|| WebSocketError::ConnectionError("Not connected".to_string()))?;
tx.send(Message::Text(msg_json))
.map_err(|e| WebSocketError::ConnectionError(e.to_string()))?;
drop(write_tx_guard);
self.subscriptions.lock().await.remove(&request);
self.channels.lock().await.remove(&chan_id);
}
Ok(())
}
fn event_stream(&self) -> Pin<Box<dyn Stream<Item = WebSocketResult<StreamEvent>> + Send>> {
let rx = self.broadcast_tx.lock().unwrap().as_ref()
.map(|tx| tx.subscribe())
.unwrap_or_else(|| broadcast::channel(1).1);
Box::pin(tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(|result| async move {
match result {
Ok(event) => Some(event),
Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(_)) => {
Some(Err(WebSocketError::ConnectionError("Event stream lagged behind".to_string())))
}
}
}))
}
fn active_subscriptions(&self) -> Vec<SubscriptionRequest> {
match self.subscriptions.try_lock() {
Ok(subs) => subs.iter().cloned().collect(),
Err(_) => Vec::new(),
}
}
fn ping_rtt_handle(&self) -> Option<Arc<Mutex<u64>>> {
Some(self.ws_ping_rtt_ms.clone())
}
}