use crate::account::Balance;
use crate::adapters::ExchangeClient;
use crate::error::Result;
use crate::exchange::ExchangeId;
use crate::fill::Fill;
use crate::instrument::Instrument;
#[cfg(feature = "bitget")]
use crate::market::OrderBookLevel;
use crate::market::{OrderBook, Ticker};
use crate::order::Order;
use crate::position::Position;
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[cfg(feature = "bitget")]
use serde_json::json;
#[cfg(feature = "bitget")]
use bitget_rs::api::websocket::{
BitgetWebsocketChannel, BitgetWebsocketEvent, BitgetWebsocketManager,
};
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum EventChannel {
Ticker,
OrderBook,
Trades,
Orders,
Account,
Positions,
Fills,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventSubscription {
pub channel: EventChannel,
pub instrument: Option<Instrument>,
pub depth_channel: Option<String>,
}
impl EventSubscription {
pub fn ticker(instrument: Instrument) -> Self {
Self::new(EventChannel::Ticker).with_instrument(instrument)
}
pub fn orderbook(instrument: Instrument) -> Self {
Self::new(EventChannel::OrderBook)
.with_instrument(instrument)
.with_depth_channel("books5")
}
pub fn trades(instrument: Instrument) -> Self {
Self::new(EventChannel::Trades).with_instrument(instrument)
}
pub fn orders() -> Self {
Self::new(EventChannel::Orders)
}
pub fn orders_for_instrument(instrument: Instrument) -> Self {
Self::orders().with_instrument(instrument)
}
pub fn account() -> Self {
Self::new(EventChannel::Account)
}
pub fn positions() -> Self {
Self::new(EventChannel::Positions)
}
pub fn positions_for_instrument(instrument: Instrument) -> Self {
Self::positions().with_instrument(instrument)
}
pub fn fills() -> Self {
Self::new(EventChannel::Fills)
}
pub fn fills_for_instrument(instrument: Instrument) -> Self {
Self::fills().with_instrument(instrument)
}
pub fn with_instrument(mut self, value: Instrument) -> Self {
self.instrument = Some(value);
self
}
pub fn with_depth_channel(mut self, value: impl Into<String>) -> Self {
self.depth_channel = Some(value.into());
self
}
fn new(channel: EventChannel) -> Self {
Self {
channel,
instrument: None,
depth_channel: None,
}
}
#[cfg(feature = "bitget")]
pub(crate) fn bitget_channel(&self) -> &str {
match self.channel {
EventChannel::Ticker => "ticker",
EventChannel::OrderBook => self.depth_channel.as_deref().unwrap_or("books5"),
EventChannel::Trades => "trade",
EventChannel::Orders => "orders",
EventChannel::Account => "account",
EventChannel::Positions => "positions",
EventChannel::Fills => "fill",
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TradeEvent {
pub exchange: ExchangeId,
pub instrument: Instrument,
pub exchange_symbol: String,
pub trade_id: Option<String>,
pub side: Option<String>,
pub price: String,
pub size: String,
pub timestamp: Option<u64>,
pub raw: Value,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ExchangeEvent {
SubscriptionAck {
exchange: ExchangeId,
subscription: EventSubscription,
raw: Value,
},
UnsubscriptionAck {
exchange: ExchangeId,
subscription: EventSubscription,
raw: Value,
},
Ticker {
exchange: ExchangeId,
subscription: EventSubscription,
items: Vec<Ticker>,
raw: Value,
},
OrderBook {
exchange: ExchangeId,
subscription: EventSubscription,
items: Vec<OrderBook>,
raw: Value,
},
Trades {
exchange: ExchangeId,
subscription: EventSubscription,
items: Vec<TradeEvent>,
raw: Value,
},
Orders {
exchange: ExchangeId,
subscription: EventSubscription,
items: Vec<Order>,
raw: Value,
},
Account {
exchange: ExchangeId,
subscription: EventSubscription,
items: Vec<Balance>,
raw: Value,
},
Positions {
exchange: ExchangeId,
subscription: EventSubscription,
items: Vec<Position>,
raw: Value,
},
Fills {
exchange: ExchangeId,
subscription: EventSubscription,
items: Vec<Fill>,
raw: Value,
},
Error {
exchange: ExchangeId,
code: Option<String>,
message: Option<String>,
raw: Value,
},
Raw {
exchange: ExchangeId,
raw: Value,
},
}
pub struct EventStreamFacade<'a> {
pub(crate) client: &'a ExchangeClient,
}
impl<'a> EventStreamFacade<'a> {
pub(crate) fn new(client: &'a ExchangeClient) -> Self {
Self { client }
}
pub async fn connect(&self, subscriptions: Vec<EventSubscription>) -> Result<EventStream> {
self.client.event_stream(subscriptions).await
}
}
pub struct EventStream {
inner: EventStreamInner,
}
enum EventStreamInner {
#[cfg(feature = "bitget")]
Bitget(BitgetEventStream),
#[cfg(not(feature = "bitget"))]
#[allow(dead_code)]
Unsupported,
}
#[cfg(feature = "bitget")]
struct BitgetEventStream {
manager: BitgetWebsocketManager,
rx: tokio::sync::mpsc::Receiver<BitgetWebsocketEvent>,
}
impl EventStream {
#[cfg(feature = "bitget")]
pub(crate) fn from_bitget(
manager: BitgetWebsocketManager,
rx: tokio::sync::mpsc::Receiver<BitgetWebsocketEvent>,
) -> Self {
Self {
inner: EventStreamInner::Bitget(BitgetEventStream { manager, rx }),
}
}
pub async fn recv(&mut self) -> Result<Option<ExchangeEvent>> {
match &mut self.inner {
#[cfg(feature = "bitget")]
EventStreamInner::Bitget(stream) => {
let Some(event) = stream.rx.recv().await else {
return Ok(None);
};
Ok(Some(map_bitget_event(event)?))
}
#[cfg(not(feature = "bitget"))]
EventStreamInner::Unsupported => Ok(None),
}
}
pub async fn stop(&mut self) -> Result<()> {
match &mut self.inner {
#[cfg(feature = "bitget")]
EventStreamInner::Bitget(stream) => {
stream.manager.stop().await;
Ok(())
}
#[cfg(not(feature = "bitget"))]
EventStreamInner::Unsupported => Ok(()),
}
}
}
#[cfg(feature = "bitget")]
fn map_bitget_event(event: BitgetWebsocketEvent) -> Result<ExchangeEvent> {
let exchange = ExchangeId::Bitget;
match event {
BitgetWebsocketEvent::Subscribed { arg, raw } => Ok(ExchangeEvent::SubscriptionAck {
exchange,
subscription: subscription_from_bitget_arg(&arg),
raw,
}),
BitgetWebsocketEvent::Unsubscribed { arg, raw } => Ok(ExchangeEvent::UnsubscriptionAck {
exchange,
subscription: subscription_from_bitget_arg(&arg),
raw,
}),
BitgetWebsocketEvent::Error { code, msg, raw } => Ok(ExchangeEvent::Error {
exchange,
code,
message: msg,
raw,
}),
BitgetWebsocketEvent::Ticker { arg, data, raw, .. } => {
let subscription = subscription_from_bitget_arg(&arg);
let items = data
.into_iter()
.map(|item| {
let exchange_symbol = item
.inst_id
.clone()
.or_else(|| arg.inst_id.clone())
.unwrap_or_default();
let instrument = instrument_from_linear_symbol(&exchange_symbol);
Ticker {
exchange,
instrument,
exchange_symbol,
last_price: item.last_price.unwrap_or_default(),
bid_price: item.bid_price,
ask_price: item.ask_price,
volume_24h: item.quote_volume.or(item.base_volume),
timestamp: parse_u64(item.timestamp.as_deref()),
raw: raw.clone(),
}
})
.collect();
Ok(ExchangeEvent::Ticker {
exchange,
subscription,
items,
raw,
})
}
BitgetWebsocketEvent::OrderBook { arg, data, raw, .. } => {
let subscription = subscription_from_bitget_arg(&arg);
let exchange_symbol = arg.inst_id.clone().unwrap_or_default();
let instrument = instrument_from_linear_symbol(&exchange_symbol);
let items = data
.into_iter()
.map(|item| OrderBook {
exchange,
instrument: instrument.clone(),
exchange_symbol: exchange_symbol.clone(),
bids: item
.bids
.into_iter()
.map(|level| OrderBookLevel {
raw: json!([level.price, level.size]),
price: level.price,
size: level.size,
})
.collect(),
asks: item
.asks
.into_iter()
.map(|level| OrderBookLevel {
raw: json!([level.price, level.size]),
price: level.price,
size: level.size,
})
.collect(),
timestamp: parse_u64(item.timestamp.as_deref()),
raw: raw.clone(),
})
.collect();
Ok(ExchangeEvent::OrderBook {
exchange,
subscription,
items,
raw,
})
}
BitgetWebsocketEvent::Trades { arg, data, raw, .. } => {
let subscription = subscription_from_bitget_arg(&arg);
let exchange_symbol = arg.inst_id.clone().unwrap_or_default();
let instrument = instrument_from_linear_symbol(&exchange_symbol);
let items = data
.into_iter()
.map(|item| TradeEvent {
exchange,
instrument: instrument.clone(),
exchange_symbol: exchange_symbol.clone(),
trade_id: item.trade_id,
side: item.side,
price: item.price.unwrap_or_default(),
size: item.size.unwrap_or_default(),
timestamp: parse_u64(item.timestamp.as_deref()),
raw: raw.clone(),
})
.collect();
Ok(ExchangeEvent::Trades {
exchange,
subscription,
items,
raw,
})
}
BitgetWebsocketEvent::Orders { arg, data, raw, .. } => {
let subscription = subscription_from_bitget_arg(&arg);
let items = data
.into_iter()
.map(|item| {
let exchange_symbol = item
.inst_id
.clone()
.or_else(|| arg.inst_id.clone())
.unwrap_or_default();
Order {
exchange,
instrument: instrument_from_linear_symbol(&exchange_symbol),
exchange_symbol,
order_id: item.order_id,
client_order_id: item.client_order_id,
side: item.side,
order_type: item.order_type,
price: item.price,
size: item.size,
filled_size: item.filled_size,
average_price: item.average_price,
status: item.status,
created_at: parse_u64(item.create_time.as_deref()),
updated_at: parse_u64(item.update_time.as_deref()),
raw: raw.clone(),
}
})
.collect();
Ok(ExchangeEvent::Orders {
exchange,
subscription,
items,
raw,
})
}
BitgetWebsocketEvent::Account { arg, data, raw, .. } => {
let subscription = subscription_from_bitget_arg(&arg);
let items = data
.into_iter()
.map(|item| Balance {
exchange,
asset: item.margin_coin.unwrap_or_default(),
total: item.equity.or(item.usdt_equity).unwrap_or_default(),
available: item.available.unwrap_or_default(),
frozen: item.frozen,
raw: raw.clone(),
})
.collect();
Ok(ExchangeEvent::Account {
exchange,
subscription,
items,
raw,
})
}
BitgetWebsocketEvent::Positions { arg, data, raw, .. } => {
let subscription = subscription_from_bitget_arg(&arg);
let items = data
.into_iter()
.map(|item| {
let exchange_symbol = item
.inst_id
.clone()
.or_else(|| arg.inst_id.clone())
.unwrap_or_default();
Position {
exchange,
instrument: instrument_from_linear_symbol(&exchange_symbol),
exchange_symbol,
side: item.hold_side,
size: item.total.unwrap_or_default(),
entry_price: item.open_price_avg,
mark_price: item.mark_price,
unrealized_pnl: item.unrealized_pl,
leverage: item.leverage,
margin_mode: item.margin_mode,
liquidation_price: item.liquidation_price,
raw: raw.clone(),
}
})
.collect();
Ok(ExchangeEvent::Positions {
exchange,
subscription,
items,
raw,
})
}
BitgetWebsocketEvent::Fill { arg, data, raw, .. } => {
let subscription = subscription_from_bitget_arg(&arg);
let items = data
.into_iter()
.map(|item| {
let exchange_symbol = item
.symbol
.clone()
.or_else(|| arg.inst_id.clone())
.unwrap_or_default();
let fee = item.fee_detail.first().and_then(|detail| {
detail
.total_fee
.clone()
.or_else(|| detail.total_deduction_fee.clone())
});
let fee_asset = item
.fee_detail
.first()
.and_then(|detail| detail.fee_coin.clone());
Fill {
exchange,
instrument: instrument_from_linear_symbol(&exchange_symbol),
exchange_symbol,
trade_id: item.trade_id,
order_id: item.order_id,
side: item.side,
price: item.price,
size: item.base_volume,
fee,
fee_asset,
role: item.trade_scope,
timestamp: parse_u64(item.create_time.as_deref()),
raw: raw.clone(),
}
})
.collect();
Ok(ExchangeEvent::Fills {
exchange,
subscription,
items,
raw,
})
}
BitgetWebsocketEvent::Pong => Ok(ExchangeEvent::Raw {
exchange,
raw: json!("pong"),
}),
BitgetWebsocketEvent::Raw(raw) | BitgetWebsocketEvent::Data { raw, .. } => {
Ok(ExchangeEvent::Raw { exchange, raw })
}
BitgetWebsocketEvent::Login { raw, .. }
| BitgetWebsocketEvent::Trade { raw, .. }
| BitgetWebsocketEvent::AlgoOrders { raw, .. }
| BitgetWebsocketEvent::Adl { raw, .. }
| BitgetWebsocketEvent::HistoryPositions { raw, .. }
| BitgetWebsocketEvent::Candles { raw, .. } => Ok(ExchangeEvent::Raw { exchange, raw }),
}
}
#[cfg(feature = "bitget")]
fn subscription_from_bitget_arg(arg: &BitgetWebsocketChannel) -> EventSubscription {
let instrument = arg.inst_id.as_deref().and_then(|value| {
if value == "default" {
None
} else {
Some(instrument_from_linear_symbol(value))
}
});
match arg.channel.as_str() {
"ticker" => {
EventSubscription::new(EventChannel::Ticker).with_optional_instrument(instrument)
}
channel if channel.starts_with("books") => EventSubscription::new(EventChannel::OrderBook)
.with_optional_instrument(instrument)
.with_depth_channel(channel),
"trade" => {
EventSubscription::new(EventChannel::Trades).with_optional_instrument(instrument)
}
"orders" => {
EventSubscription::new(EventChannel::Orders).with_optional_instrument(instrument)
}
"account" => EventSubscription::new(EventChannel::Account),
"positions" => {
EventSubscription::new(EventChannel::Positions).with_optional_instrument(instrument)
}
"fill" => EventSubscription::new(EventChannel::Fills).with_optional_instrument(instrument),
_ => EventSubscription::new(EventChannel::Ticker).with_optional_instrument(instrument),
}
}
#[cfg(feature = "bitget")]
impl EventSubscription {
fn with_optional_instrument(mut self, value: Option<Instrument>) -> Self {
self.instrument = value;
self
}
}
#[cfg(feature = "bitget")]
fn instrument_from_linear_symbol(symbol: &str) -> Instrument {
for quote in ["USDT", "USDC", "BUSD", "USD"] {
if let Some(base) = symbol.strip_suffix(quote) {
return Instrument::perp(base, quote);
}
}
Instrument::perp(symbol, "USDT")
}
#[cfg(feature = "bitget")]
fn parse_u64(value: Option<&str>) -> Option<u64> {
value.and_then(|value| value.parse::<u64>().ok())
}