use std::cell::Cell;
use std::collections::HashMap;
use std::sync::{Arc, Mutex as StdMutex, OnceLock};
use std::time::Duration;
use serde_json::{json, Value};
use url::Url;
use crate::core::rt::WsFrame;
use crate::core::traits::Credentials;
use crate::core::types::{
AccountType, Kline, OrderbookDelta as OrderbookDeltaData, StreamEvent, TradeSide,
WebSocketError, WebSocketResult,
};
use crate::core::websocket::{KlineInterval, StreamKind, StreamSpec, TopicKey, TopicRegistry, WsProtocol};
use super::endpoints::format_symbol;
use super::parser::BitfinexParser;
const WS_PUBLIC_URL: &str = "wss://api-pub.bitfinex.com/ws/2";
thread_local! {
static CURRENT_SYMBOL: Cell<Option<String>> = const { Cell::new(None) };
static CURRENT_INTERVAL: Cell<Option<String>> = const { Cell::new(None) };
}
fn set_current_symbol(sym: impl Into<String>) {
CURRENT_SYMBOL.with(|c| c.set(Some(sym.into())));
}
fn take_current_symbol() -> String {
CURRENT_SYMBOL.with(|c| c.take()).unwrap_or_default()
}
fn set_current_interval(interval: impl Into<String>) {
CURRENT_INTERVAL.with(|c| c.set(Some(interval.into())));
}
fn take_current_interval() -> String {
CURRENT_INTERVAL.with(|c| c.take()).unwrap_or_default()
}
static REGISTRY: OnceLock<TopicRegistry> = OnceLock::new();
pub struct BitfinexProtocol {
chan_map: Arc<StdMutex<HashMap<u64, TopicKey>>>,
}
impl BitfinexProtocol {
pub fn new(_testnet: bool) -> Self {
Self {
chan_map: Arc::new(StdMutex::new(HashMap::new())),
}
}
fn wire_symbol(spec: &StreamSpec) -> String {
let raw = spec.symbol.to_string();
if raw.starts_with('t') || raw.starts_with('f') {
return raw;
}
if raw.contains('/') {
let mut parts = raw.splitn(2, '/');
let base = parts.next().unwrap_or("");
let quote = parts.next().unwrap_or("USD");
return format_symbol(base, quote, AccountType::Spot);
}
format!("t{}", raw)
}
fn topic_from_ack(channel: &str, symbol: Option<&str>, key: Option<&str>) -> Option<TopicKey> {
match channel {
"ticker" | "trades" | "book" => {
let sym = symbol?;
Some(TopicKey::new(format!("{}:{}", channel, sym)))
}
"candles" => {
let k = key?;
let stripped = k.strip_prefix("trade:").unwrap_or(k);
Some(TopicKey::new(format!("candles:{}", stripped)))
}
"status" => {
let k = key?;
Some(TopicKey::new(format!("status:{}", k)))
}
_ => None,
}
}
fn symbol_from_topic(key: &TopicKey) -> &str {
key.as_str()
.rsplit(':')
.next()
.unwrap_or("")
}
fn interval_from_candles_topic(key: &TopicKey) -> &str {
let s = key.as_str();
let mut parts = s.splitn(3, ':');
parts.next(); parts.next().unwrap_or("") }
}
impl WsProtocol for BitfinexProtocol {
fn name(&self) -> &'static str {
"bitfinex"
}
fn endpoint(&self, _account_type: AccountType, _testnet: bool) -> Url {
Url::parse(WS_PUBLIC_URL).expect("bitfinex ws endpoint is valid")
}
fn ping_frame(&self) -> Option<WsFrame> {
Some(WsFrame::Text(r#"{"event":"ping","cid":0}"#.to_string()))
}
fn ping_interval(&self) -> Duration {
Duration::from_secs(20)
}
fn subscribe_frame(&self, spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
let sym = Self::wire_symbol(spec);
let frame = match &spec.kind {
StreamKind::Ticker => json!({
"event": "subscribe",
"channel": "ticker",
"symbol": sym,
}),
StreamKind::Trade => json!({
"event": "subscribe",
"channel": "trades",
"symbol": sym,
}),
StreamKind::Orderbook | StreamKind::OrderbookDelta => json!({
"event": "subscribe",
"channel": "book",
"symbol": sym,
"prec": "P0",
}),
StreamKind::Kline { interval } => {
let key = format!("trade:{}:{}", interval.as_str(), sym);
json!({
"event": "subscribe",
"channel": "candles",
"key": key,
})
}
StreamKind::Liquidation => json!({
"event": "subscribe",
"channel": "status",
"key": "liq:global",
}),
other => {
return Err(WebSocketError::NotSupported(format!(
"Bitfinex public WS has no channel for {:?}",
other
)))
}
};
Ok(WsFrame::Text(frame.to_string()))
}
fn unsubscribe_frame(&self, spec: &StreamSpec) -> Result<WsFrame, WebSocketError> {
let sym = Self::wire_symbol(spec);
let topic = match &spec.kind {
StreamKind::Ticker => format!("ticker:{}", sym),
StreamKind::Trade => format!("trades:{}", sym),
StreamKind::Orderbook | StreamKind::OrderbookDelta => format!("book:{}", sym),
StreamKind::Kline { interval } => {
format!("candles:{}:{}", interval.as_str(), sym)
}
StreamKind::Liquidation => "status:liq:global".to_string(),
other => {
return Err(WebSocketError::NotSupported(format!(
"Bitfinex public WS has no channel for {:?}",
other
)))
}
};
let topic_key = TopicKey::new(&topic);
let chan_map = self.chan_map.lock().expect("bitfinex chan_map poisoned");
let chan_id = chan_map
.iter()
.find(|(_, v)| **v == topic_key)
.map(|(k, _)| *k);
drop(chan_map);
match chan_id {
Some(id) => Ok(WsFrame::Text(json!({"event":"unsubscribe","chanId": id}).to_string())),
None => Err(WebSocketError::NotSupported(format!(
"bitfinex: cannot unsubscribe from {} — chanId not yet known (ack pending?)",
topic
))),
}
}
fn auth_frame(&self, _credentials: &Credentials) -> Option<Result<WsFrame, WebSocketError>> {
None
}
fn is_pong(&self, raw: &Value) -> bool {
raw.get("event").and_then(|v| v.as_str()) == Some("pong")
}
fn is_subscribe_ack(&self, raw: &Value) -> bool {
let event = match raw.get("event").and_then(|v| v.as_str()) {
Some(e) => e,
None => return false,
};
match event {
"subscribed" => {
if let Some(chan_id) = raw.get("chanId").and_then(|v| v.as_u64()) {
let channel = raw.get("channel").and_then(|v| v.as_str()).unwrap_or("");
let symbol = raw.get("symbol").and_then(|v| v.as_str());
let key = raw.get("key").and_then(|v| v.as_str());
if let Some(topic) = Self::topic_from_ack(channel, symbol, key) {
let mut map = self.chan_map.lock().expect("bitfinex chan_map poisoned");
map.insert(chan_id, topic);
}
}
true
}
"unsubscribed" => {
if let Some(chan_id) = raw.get("chanId").and_then(|v| v.as_u64()) {
let mut map = self.chan_map.lock().expect("bitfinex chan_map poisoned");
map.remove(&chan_id);
}
true
}
"info" => true,
"error" => false,
_ => false,
}
}
fn is_server_ping(&self, _raw: &Value) -> bool {
false
}
fn extract_topic(&self, raw: &Value) -> Option<TopicKey> {
let arr = raw.as_array()?;
if arr.is_empty() {
return None;
}
let chan_id = arr[0].as_u64()?;
if arr.len() >= 2 && arr[1].as_str() == Some("hb") {
return None;
}
let map = self.chan_map.lock().expect("bitfinex chan_map poisoned");
let topic = map.get(&chan_id).cloned()?;
drop(map);
let sym = Self::symbol_from_topic(&topic).to_string();
set_current_symbol(sym);
if topic.as_str().starts_with("candles:") {
let interval = Self::interval_from_candles_topic(&topic).to_string();
set_current_interval(interval);
}
Some(topic)
}
fn topic_registry(&self, _account_type: AccountType) -> &TopicRegistry {
REGISTRY.get_or_init(build_registry)
}
}
fn build_registry() -> TopicRegistry {
let at = AccountType::Spot;
TopicRegistry::builder()
.register(StreamKind::Ticker, at, "ticker:*", parse_ticker_frame)
.register(StreamKind::Trade, at, "trades:*", parse_trade_frame)
.register(StreamKind::Orderbook, at, "book:*", parse_book_frame)
.register(StreamKind::OrderbookDelta, at, "book:*", parse_book_frame)
.register(
StreamKind::Kline { interval: KlineInterval::new("") },
at,
"candles:*",
parse_candle_frame,
)
.register(StreamKind::Liquidation, at, "status:*", parse_liq_frame)
.build()
}
pub(crate) fn parse_ticker_frame(raw: &Value) -> WebSocketResult<StreamEvent> {
let arr = raw
.as_array()
.ok_or_else(|| WebSocketError::Parse("bitfinex ticker: expected array".into()))?;
if arr.len() < 2 {
return Err(WebSocketError::Parse("bitfinex ticker: array too short".into()));
}
let data = arr[1]
.as_array()
.ok_or_else(|| WebSocketError::Parse("bitfinex ticker: data[1] not array".into()))?;
let symbol = take_current_symbol();
let ticker = BitfinexParser::parse_ws_ticker(data)
.map_err(|e| WebSocketError::Parse(format!("bitfinex ticker: {}", e)))?;
Ok(StreamEvent::Ticker { symbol, ticker })
}
pub(crate) fn parse_trade_frame(raw: &Value) -> WebSocketResult<StreamEvent> {
let arr = raw
.as_array()
.ok_or_else(|| WebSocketError::Parse("bitfinex trade: expected array".into()))?;
if arr.len() < 2 {
return Err(WebSocketError::FieldAbsent("trade data".into()));
}
let symbol = take_current_symbol();
if arr.len() >= 3 && arr[1].as_str() == Some("te") {
let data = arr[2]
.as_array()
.ok_or_else(|| WebSocketError::FieldAbsent("te trade data array".into()))?;
let trade = BitfinexParser::parse_ws_trade(data)
.map_err(|e| WebSocketError::Parse(format!("bitfinex trade te: {}", e)))?;
return Ok(StreamEvent::Trade { symbol, trade });
}
if arr.len() >= 2 && arr[1].as_str() == Some("tu") {
return Err(WebSocketError::FieldAbsent("trade: tu suppressed (duplicate of te)".into()));
}
if let Some(data) = arr[1].as_array() {
if data.first().map(|v| v.is_array()).unwrap_or(false) {
if let Some(inner) = data.first().and_then(|v| v.as_array()) {
let trade = BitfinexParser::parse_ws_trade(inner)
.map_err(|e| WebSocketError::Parse(format!("bitfinex trade snapshot: {}", e)))?;
return Ok(StreamEvent::Trade { symbol, trade });
}
return Err(WebSocketError::FieldAbsent("trade snapshot inner".into()));
}
let trade = BitfinexParser::parse_ws_trade(data)
.map_err(|e| WebSocketError::Parse(format!("bitfinex trade single: {}", e)))?;
return Ok(StreamEvent::Trade { symbol, trade });
}
Err(WebSocketError::FieldAbsent("trade data".into()))
}
pub(crate) fn parse_book_frame(raw: &Value) -> WebSocketResult<StreamEvent> {
let arr = raw
.as_array()
.ok_or_else(|| WebSocketError::Parse("bitfinex book: expected array".into()))?;
if arr.len() < 2 {
return Err(WebSocketError::FieldAbsent("book data".into()));
}
let symbol = take_current_symbol();
let data = arr[1]
.as_array()
.ok_or_else(|| WebSocketError::FieldAbsent("book data[1] not array".into()))?;
let delta: OrderbookDeltaData =
BitfinexParser::parse_ws_orderbook_delta(data)
.map_err(|e| WebSocketError::Parse(format!("bitfinex book: {}", e)))?;
if delta.bids.is_empty() && delta.asks.is_empty() {
return Err(WebSocketError::FieldAbsent(
"bitfinex book: pure-remove delta suppressed".into(),
));
}
Ok(StreamEvent::OrderbookDelta { symbol, delta })
}
pub(crate) fn parse_candle_frame(raw: &Value) -> WebSocketResult<StreamEvent> {
let arr = raw
.as_array()
.ok_or_else(|| WebSocketError::Parse("bitfinex candle: expected array".into()))?;
if arr.len() < 2 {
return Err(WebSocketError::FieldAbsent("candle data".into()));
}
let symbol = take_current_symbol();
let interval_str = take_current_interval();
let interval = KlineInterval::new(&interval_str);
let data = arr[1]
.as_array()
.ok_or_else(|| WebSocketError::FieldAbsent("candle data[1] not array".into()))?;
let kline_data: &[Value] = if data.first().map(|v| v.is_array()).unwrap_or(false) {
match data.first().and_then(|v| v.as_array()) {
Some(inner) => inner,
None => return Err(WebSocketError::FieldAbsent("candle snapshot inner".into())),
}
} else {
data
};
let kline: Kline = BitfinexParser::parse_ws_kline(kline_data)
.map_err(|e| WebSocketError::Parse(format!("bitfinex candle: {}", e)))?;
Ok(StreamEvent::Kline { symbol, interval, kline })
}
pub(crate) fn parse_liq_frame(raw: &Value) -> WebSocketResult<StreamEvent> {
let arr = raw
.as_array()
.ok_or_else(|| WebSocketError::Parse("bitfinex liq: expected array".into()))?;
if arr.len() < 2 {
return Err(WebSocketError::FieldAbsent("liq data".into()));
}
let outer = arr[1]
.as_array()
.ok_or_else(|| WebSocketError::FieldAbsent("liq outer data not array".into()))?;
if outer.is_empty() {
return Err(WebSocketError::FieldAbsent("liq: empty entries".into()));
}
let entry: &[Value] = if outer[0].is_array() {
outer[0]
.as_array()
.ok_or_else(|| WebSocketError::FieldAbsent("liq inner entry not array".into()))?
} else {
outer
};
if entry.len() < 12 {
return Err(WebSocketError::Parse(format!(
"bitfinex liq: entry too short ({} fields, need 12)",
entry.len()
)));
}
let symbol = entry[4]
.as_str()
.ok_or_else(|| WebSocketError::FieldAbsent("liq entry[4] symbol".into()))?
.to_string();
let amount = entry[5]
.as_f64()
.ok_or_else(|| WebSocketError::FieldAbsent("liq entry[5] amount".into()))?;
let side = if amount >= 0.0 {
TradeSide::Buy } else {
TradeSide::Sell };
let price = entry[11]
.as_f64()
.ok_or_else(|| WebSocketError::FieldAbsent("liq entry[11] liquidation_price".into()))?;
let base_price = entry[6].as_f64();
let timestamp = entry[2].as_i64().unwrap_or(0);
let quantity = amount.abs();
let value = base_price.map(|bp| bp * quantity);
Ok(StreamEvent::Liquidation {
symbol,
side,
price,
quantity,
timestamp,
value,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::types::OwnedSymbolInput;
use crate::core::websocket::StreamSpec;
fn make_proto() -> BitfinexProtocol {
BitfinexProtocol::new(false)
}
fn make_spec(kind: StreamKind, sym: &str) -> StreamSpec {
StreamSpec {
kind,
symbol: OwnedSymbolInput::Raw(sym.to_string()),
account_type: AccountType::Spot,
depth: None,
speed_ms: None,
}
}
#[test]
fn endpoint_returns_public_url() {
let proto = make_proto();
let url = proto.endpoint(AccountType::Spot, false);
assert_eq!(url.as_str(), "wss://api-pub.bitfinex.com/ws/2");
}
#[test]
fn ping_frame_is_json_event_ping() {
let proto = make_proto();
let frame = proto.ping_frame().expect("ping_frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["event"], "ping");
} else {
panic!("expected Text frame");
}
}
#[test]
fn ping_interval_is_20_seconds() {
let proto = make_proto();
assert_eq!(proto.ping_interval(), Duration::from_secs(20));
}
#[test]
fn is_pong_true_for_pong_event() {
let proto = make_proto();
let raw = serde_json::json!({"event": "pong", "ts": 12345, "cid": 0});
assert!(proto.is_pong(&raw));
}
#[test]
fn is_pong_false_for_data_array() {
let proto = make_proto();
let raw = serde_json::json!([17, [50000.0, 1.0, 50001.0, 1.0, 0.0, 0.0, 50000.0, 100.0, 51000.0, 49000.0]]);
assert!(!proto.is_pong(&raw));
}
#[test]
fn is_subscribe_ack_returns_true_for_subscribed() {
let proto = make_proto();
let raw = serde_json::json!({
"event": "subscribed",
"channel": "ticker",
"chanId": 17,
"symbol": "tBTCUSD",
});
assert!(proto.is_subscribe_ack(&raw));
}
#[test]
fn is_subscribe_ack_populates_chan_map() {
let proto = make_proto();
let raw = serde_json::json!({
"event": "subscribed",
"channel": "ticker",
"chanId": 17,
"symbol": "tBTCUSD",
});
proto.is_subscribe_ack(&raw);
let map = proto.chan_map.lock().unwrap();
assert_eq!(map.get(&17), Some(&TopicKey::new("ticker:tBTCUSD")));
}
#[test]
fn is_subscribe_ack_candles_maps_key() {
let proto = make_proto();
let raw = serde_json::json!({
"event": "subscribed",
"channel": "candles",
"chanId": 42,
"key": "trade:1m:tBTCUSD",
});
proto.is_subscribe_ack(&raw);
let map = proto.chan_map.lock().unwrap();
assert_eq!(map.get(&42), Some(&TopicKey::new("candles:1m:tBTCUSD")));
}
#[test]
fn is_subscribe_ack_unsubscribed_removes_entry() {
let proto = make_proto();
let sub = serde_json::json!({
"event": "subscribed",
"channel": "ticker",
"chanId": 17,
"symbol": "tBTCUSD",
});
proto.is_subscribe_ack(&sub);
let unsub = serde_json::json!({
"event": "unsubscribed",
"chanId": 17,
"status": "OK",
});
proto.is_subscribe_ack(&unsub);
let map = proto.chan_map.lock().unwrap();
assert!(!map.contains_key(&17));
}
#[test]
fn is_subscribe_ack_true_for_info() {
let proto = make_proto();
let raw = serde_json::json!({"event": "info", "version": 2});
assert!(proto.is_subscribe_ack(&raw));
}
#[test]
fn extract_topic_returns_topic_for_known_chan() {
let proto = make_proto();
let ack = serde_json::json!({
"event": "subscribed",
"channel": "ticker",
"chanId": 17,
"symbol": "tBTCUSD",
});
proto.is_subscribe_ack(&ack);
let data = serde_json::json!([17, [50000.0, 1.0, 50001.0, 1.0, 0.0, 0.0, 50000.0, 100.0, 51000.0, 49000.0]]);
assert_eq!(
proto.extract_topic(&data),
Some(TopicKey::new("ticker:tBTCUSD"))
);
}
#[test]
fn extract_topic_sets_thread_local_symbol() {
let proto = make_proto();
let ack = serde_json::json!({
"event": "subscribed",
"channel": "ticker",
"chanId": 17,
"symbol": "tBTCUSD",
});
proto.is_subscribe_ack(&ack);
let data = serde_json::json!([17, [50000.0, 1.0, 50001.0, 1.0, 0.0, 0.0, 50000.0, 100.0, 51000.0, 49000.0]]);
proto.extract_topic(&data);
let sym = take_current_symbol();
assert_eq!(sym, "tBTCUSD");
}
#[test]
fn extract_topic_returns_none_for_heartbeat() {
let proto = make_proto();
let ack = serde_json::json!({
"event": "subscribed",
"channel": "ticker",
"chanId": 17,
"symbol": "tBTCUSD",
});
proto.is_subscribe_ack(&ack);
let hb = serde_json::json!([17, "hb"]);
assert_eq!(proto.extract_topic(&hb), None);
}
#[test]
fn extract_topic_returns_none_for_unknown_chan() {
let proto = make_proto();
let data = serde_json::json!([999, [50000.0]]);
assert_eq!(proto.extract_topic(&data), None);
}
#[test]
fn extract_topic_returns_none_for_object_frame() {
let proto = make_proto();
let raw = serde_json::json!({"event": "subscribed", "chanId": 17});
assert_eq!(proto.extract_topic(&raw), None);
}
#[test]
fn subscribe_frame_ticker() {
let proto = make_proto();
let spec = make_spec(StreamKind::Ticker, "tBTCUSD");
let frame = proto.subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["event"], "subscribe");
assert_eq!(v["channel"], "ticker");
assert_eq!(v["symbol"], "tBTCUSD");
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_trade() {
let proto = make_proto();
let spec = make_spec(StreamKind::Trade, "tBTCUSD");
let frame = proto.subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["channel"], "trades");
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_orderbook() {
let proto = make_proto();
let spec = make_spec(StreamKind::Orderbook, "tBTCUSD");
let frame = proto.subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["channel"], "book");
assert_eq!(v["symbol"], "tBTCUSD");
assert_eq!(v["prec"], "P0");
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_kline() {
let proto = make_proto();
let spec = make_spec(
StreamKind::Kline { interval: KlineInterval::new("1m") },
"tBTCUSD",
);
let frame = proto.subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["channel"], "candles");
assert_eq!(v["key"], "trade:1m:tBTCUSD");
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_plain_symbol_gets_t_prefix() {
let proto = make_proto();
let spec = make_spec(StreamKind::Ticker, "BTCUSD");
let frame = proto.subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["symbol"], "tBTCUSD");
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_liquidation_emits_status_liq_global() {
let proto = make_proto();
let spec = make_spec(StreamKind::Liquidation, "tBTCUSD");
let frame = proto.subscribe_frame(&spec).expect("subscribe frame");
if let WsFrame::Text(s) = frame {
let v: Value = serde_json::from_str(&s).expect("valid json");
assert_eq!(v["event"], "subscribe");
assert_eq!(v["channel"], "status");
assert_eq!(v["key"], "liq:global");
} else {
panic!("expected Text frame");
}
}
#[test]
fn subscribe_frame_truly_absent_returns_not_supported() {
let proto = make_proto();
let spec = make_spec(StreamKind::OpenInterest, "tBTCUSD");
assert!(matches!(
proto.subscribe_frame(&spec),
Err(WebSocketError::NotSupported(_))
));
}
#[test]
fn is_subscribe_ack_status_liq_global_maps_chan_id() {
let proto = make_proto();
let ack = serde_json::json!({
"event": "subscribed",
"channel": "status",
"chanId": 5410,
"key": "liq:global"
});
proto.is_subscribe_ack(&ack);
let map = proto.chan_map.lock().unwrap();
assert_eq!(map.get(&5410), Some(&TopicKey::new("status:liq:global")));
}
#[test]
fn extract_topic_status_liq_global() {
let proto = make_proto();
let ack = serde_json::json!({
"event": "subscribed",
"channel": "status",
"chanId": 5410,
"key": "liq:global"
});
proto.is_subscribe_ack(&ack);
let data = serde_json::json!([5410, [["pos", 191941265, 1779998542354i64, null, "tETHF0:USTF0", 19.0, 2027.1, null, 1, 1, null, 2013.5]]]);
assert_eq!(
proto.extract_topic(&data),
Some(TopicKey::new("status:liq:global"))
);
}
#[test]
fn parse_liq_frame_live_format() {
let raw = serde_json::json!([
5410,
[["pos", 191941265, 1779998542354i64, null, "tETHF0:USTF0", 19.00019049, 2027.1, null, 1, 1, null, 2013.5]]
]);
let event = parse_liq_frame(&raw).expect("should parse");
if let StreamEvent::Liquidation { symbol, side, price, quantity, timestamp, value } = event {
assert_eq!(symbol, "tETHF0:USTF0");
assert_eq!(side, TradeSide::Buy); assert!((price - 2013.5).abs() < 1e-6);
assert!((quantity - 19.00019049).abs() < 1e-6);
assert_eq!(timestamp, 1779998542354);
assert!(value.is_some());
} else {
panic!("expected Liquidation event");
}
}
#[test]
fn parse_liq_frame_negative_amount_is_sell() {
let raw = serde_json::json!([
5410,
[["pos", 12345, 1780000000000i64, null, "tBTCUSD", -0.5, 95000.0, null, 1, 1, null, 94500.0]]
]);
let event = parse_liq_frame(&raw).expect("should parse");
if let StreamEvent::Liquidation { side, quantity, price, .. } = event {
assert_eq!(side, TradeSide::Sell);
assert!((quantity - 0.5).abs() < 1e-9);
assert!((price - 94500.0).abs() < 1e-6);
} else {
panic!("expected Liquidation event");
}
}
#[test]
fn parse_liq_frame_short_entry_returns_err() {
let raw = serde_json::json!([5410, [["pos", 1, 2, null, "tBTCUSD", 1.0]]]);
assert!(parse_liq_frame(&raw).is_err());
}
#[test]
fn topic_registry_covers_public_channels() {
let proto = make_proto();
let reg = proto.topic_registry(AccountType::Spot);
let at = AccountType::Spot;
assert!(reg.supports(&StreamKind::Ticker, at), "Ticker");
assert!(reg.supports(&StreamKind::Trade, at), "Trade");
assert!(reg.supports(&StreamKind::Orderbook, at), "Orderbook");
assert!(reg.supports(&StreamKind::OrderbookDelta, at), "OrderbookDelta");
assert!(
reg.supports(&StreamKind::Kline { interval: KlineInterval::new("") }, at),
"Kline"
);
assert!(reg.supports(&StreamKind::Liquidation, at), "Liquidation");
}
#[test]
fn topic_registry_wildcard_matches_per_symbol_keys() {
let proto = make_proto();
let reg = proto.topic_registry(AccountType::Spot);
assert!(
reg.dispatch(&TopicKey::new("ticker:tBTCUSD")).is_some(),
"ticker:tBTCUSD"
);
assert!(
reg.dispatch(&TopicKey::new("trades:tETHUSD")).is_some(),
"trades:tETHUSD"
);
assert!(
reg.dispatch(&TopicKey::new("book:tBTCUSD")).is_some(),
"book:tBTCUSD"
);
assert!(
reg.dispatch(&TopicKey::new("candles:1m:tBTCUSD")).is_some(),
"candles:1m:tBTCUSD"
);
assert!(
reg.dispatch(&TopicKey::new("status:liq:global")).is_some(),
"status:liq:global"
);
}
}