tesser-binance 0.6.0

Binance exchange connector for the Tesser framework
Documentation
use std::{
    str::FromStr,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

use binance_sdk::common::{
    config::ConfigurationWebsocketStreams,
    models::WebsocketEvent,
    websocket::{Subscription, WebsocketStream},
};
pub use binance_sdk::derivatives_trading_usds_futures::websocket_streams::UserDataStreamEventsResponse;
use binance_sdk::derivatives_trading_usds_futures::{
    self as binance_futures,
    websocket_streams::{
        self, AggregateTradeStreamsParams, AggregateTradeStreamsResponse,
        KlineCandlestickStreamsParams, KlineCandlestickStreamsResponse, OrderTradeUpdateO,
        PartialBookDepthStreamsParams, PartialBookDepthStreamsResponse,
    },
};
use chrono::Utc;
use rust_decimal::Decimal;
use serde::Serialize;
use tesser_broker::{BrokerError, BrokerInfo, BrokerResult, MarketStream};
use tesser_core::{Candle, Interval, OrderBook, OrderBookLevel, Side, Tick};
use tokio::sync::mpsc::error::TryRecvError;
use tokio::sync::{mpsc, Mutex};

use crate::{parse_decimal_opt, timestamp_from_ms};

#[derive(Clone, Debug, Serialize)]
pub enum BinanceSubscription {
    Trades { symbol: String },
    Kline { symbol: String, interval: Interval },
    OrderBook { symbol: String, depth: usize },
}

pub struct BinanceMarketStream {
    info: BrokerInfo,
    ws: Arc<websocket_streams::WebsocketStreams>,
    tick_tx: mpsc::Sender<Tick>,
    candle_tx: mpsc::Sender<Candle>,
    book_tx: mpsc::Sender<OrderBook>,
    tick_rx: Mutex<mpsc::Receiver<Tick>>,
    candle_rx: Mutex<mpsc::Receiver<Candle>>,
    book_rx: Mutex<mpsc::Receiver<OrderBook>>,
    handles: Vec<StreamHandle>,
    _event_subscription: Option<Subscription>,
}

#[allow(dead_code)]
enum StreamHandle {
    Trade(Arc<WebsocketStream<AggregateTradeStreamsResponse>>),
    Kline(Arc<WebsocketStream<KlineCandlestickStreamsResponse>>),
    Book(Arc<WebsocketStream<PartialBookDepthStreamsResponse>>),
}

impl BinanceMarketStream {
    pub async fn connect(
        ws_url: &str,
        connection_status: Option<Arc<AtomicBool>>,
    ) -> BrokerResult<Self> {
        let cfg = ConfigurationWebsocketStreams::builder()
            .ws_url(ws_url.to_string())
            .build()
            .map_err(|err| BrokerError::Transport(err.to_string()))?;
        let handle = binance_futures::DerivativesTradingUsdsFuturesWsStreams::from_config(cfg);
        let ws = handle
            .connect()
            .await
            .map_err(|err| BrokerError::Transport(err.to_string()))?;
        let ws = Arc::new(ws);
        if let Some(flag) = &connection_status {
            flag.store(true, Ordering::SeqCst);
        }
        let event_subscription = connection_status.clone().map(|flag| {
            ws.subscribe_on_ws_events(move |event| match event {
                WebsocketEvent::Open => flag.store(true, Ordering::SeqCst),
                WebsocketEvent::Close(_, _) | WebsocketEvent::Error(_) => {
                    flag.store(false, Ordering::SeqCst)
                }
                _ => {}
            })
        });
        let (tick_tx, tick_rx) = mpsc::channel(2048);
        let (candle_tx, candle_rx) = mpsc::channel(1024);
        let (book_tx, book_rx) = mpsc::channel(256);
        Ok(Self {
            info: BrokerInfo {
                name: "binance-market".into(),
                markets: vec!["usd_perp".into()],
                supports_testnet: ws_url.contains("testnet"),
            },
            ws,
            tick_tx,
            candle_tx,
            book_tx,
            tick_rx: Mutex::new(tick_rx),
            candle_rx: Mutex::new(candle_rx),
            book_rx: Mutex::new(book_rx),
            handles: Vec::new(),
            _event_subscription: event_subscription,
        })
    }

    async fn subscribe_trades(&mut self, symbol: String) -> BrokerResult<()> {
        let params = AggregateTradeStreamsParams::builder(symbol.to_lowercase())
            .build()
            .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
        let stream = self
            .ws
            .aggregate_trade_streams(params)
            .await
            .map_err(|err| BrokerError::Transport(err.to_string()))?;
        let tx = self.tick_tx.clone();
        stream.on_message(move |payload: AggregateTradeStreamsResponse| {
            if let Some(tick) = convert_trade(&payload) {
                let _ = tx.try_send(tick);
            }
        });
        self.handles.push(StreamHandle::Trade(stream));
        Ok(())
    }

    async fn subscribe_kline(&mut self, symbol: String, interval: Interval) -> BrokerResult<()> {
        let params =
            KlineCandlestickStreamsParams::builder(symbol.to_lowercase(), interval_label(interval))
                .build()
                .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
        let stream = self
            .ws
            .kline_candlestick_streams(params)
            .await
            .map_err(|err| BrokerError::Transport(err.to_string()))?;
        let tx = self.candle_tx.clone();
        stream.on_message(move |payload: KlineCandlestickStreamsResponse| {
            if let Some(candle) = convert_kline(&payload) {
                let _ = tx.try_send(candle);
            }
        });
        self.handles.push(StreamHandle::Kline(stream));
        Ok(())
    }

    async fn subscribe_order_book(&mut self, symbol: String, depth: usize) -> BrokerResult<()> {
        let clamped_depth = match depth {
            d if d <= 5 => 5,
            d if d <= 10 => 10,
            _ => 20,
        } as i64;
        let params = PartialBookDepthStreamsParams::builder(symbol.to_lowercase(), clamped_depth)
            .update_speed(Some("100ms".to_string()))
            .build()
            .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
        let stream = self
            .ws
            .partial_book_depth_streams(params)
            .await
            .map_err(|err| BrokerError::Transport(err.to_string()))?;
        let tx = self.book_tx.clone();
        stream.on_message(move |payload: PartialBookDepthStreamsResponse| {
            if let Some(book) = convert_book(&payload) {
                let _ = tx.try_send(book);
            }
        });
        self.handles.push(StreamHandle::Book(stream));
        Ok(())
    }
}

#[async_trait::async_trait]
impl MarketStream for BinanceMarketStream {
    type Subscription = BinanceSubscription;

    fn name(&self) -> &str {
        &self.info.name
    }

    fn info(&self) -> Option<&BrokerInfo> {
        Some(&self.info)
    }

    async fn subscribe(&mut self, subscription: Self::Subscription) -> BrokerResult<()> {
        match subscription {
            BinanceSubscription::Trades { symbol } => self.subscribe_trades(symbol).await,
            BinanceSubscription::Kline { symbol, interval } => {
                self.subscribe_kline(symbol, interval).await
            }
            BinanceSubscription::OrderBook { symbol, depth } => {
                self.subscribe_order_book(symbol, depth).await
            }
        }
    }

    async fn next_tick(&mut self) -> BrokerResult<Option<Tick>> {
        let mut rx = self.tick_rx.lock().await;
        match rx.try_recv() {
            Ok(tick) => Ok(Some(tick)),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => Ok(None),
        }
    }

    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
        let mut rx = self.candle_rx.lock().await;
        match rx.try_recv() {
            Ok(candle) => Ok(Some(candle)),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => Ok(None),
        }
    }

    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
        let mut rx = self.book_rx.lock().await;
        match rx.try_recv() {
            Ok(book) => Ok(Some(book)),
            Err(TryRecvError::Empty) => Ok(None),
            Err(TryRecvError::Disconnected) => Ok(None),
        }
    }
}

fn convert_trade(payload: &AggregateTradeStreamsResponse) -> Option<Tick> {
    let symbol = payload.s.clone()?;
    let price = parse_decimal_opt(payload.p.as_deref())?;
    let quantity = parse_decimal_opt(payload.q.as_deref())?;
    let side = match payload.m.unwrap_or(false) {
        true => Side::Sell,
        false => Side::Buy,
    };
    Some(Tick {
        symbol,
        price,
        size: quantity,
        side,
        exchange_timestamp: timestamp_from_ms(payload.t_uppercase),
        received_at: Utc::now(),
    })
}

fn convert_kline(payload: &KlineCandlestickStreamsResponse) -> Option<Candle> {
    let kline = payload.k.as_ref()?;
    let symbol = kline.s.clone()?;
    let interval = Interval::from_str(kline.i.as_deref().unwrap_or("1m")).ok()?;
    let open = parse_decimal_opt(kline.o.as_deref())?;
    let high = parse_decimal_opt(kline.h.as_deref())?;
    let low = parse_decimal_opt(kline.l.as_deref())?;
    let close = parse_decimal_opt(kline.c.as_deref())?;
    let volume = parse_decimal_opt(kline.v.as_deref()).unwrap_or(Decimal::ZERO);
    Some(Candle {
        symbol,
        interval,
        open,
        high,
        low,
        close,
        volume,
        timestamp: timestamp_from_ms(kline.t),
    })
}

fn convert_book(payload: &PartialBookDepthStreamsResponse) -> Option<OrderBook> {
    let symbol = payload.s.clone()?;
    let bids = parse_levels(payload.b.as_ref()?);
    let asks = parse_levels(payload.a.as_ref()?);
    Some(OrderBook {
        symbol,
        bids,
        asks,
        timestamp: timestamp_from_ms(payload.t_uppercase),
    })
}

fn parse_levels(levels: &[Vec<String>]) -> Vec<OrderBookLevel> {
    levels
        .iter()
        .filter_map(|level| {
            let mut values = level.iter();
            let price = values.next()?.parse::<Decimal>().ok()?;
            let size = values.next()?.parse::<Decimal>().ok()?;
            Some(OrderBookLevel { price, size })
        })
        .collect()
}

pub struct BinanceUserDataStream {
    ws: Arc<websocket_streams::WebsocketStreams>,
    stream: Arc<WebsocketStream<UserDataStreamEventsResponse>>,
}

impl BinanceUserDataStream {
    pub async fn connect(ws_url: &str, listen_key: &str) -> BrokerResult<Self> {
        let cfg = ConfigurationWebsocketStreams::builder()
            .ws_url(ws_url.to_string())
            .build()
            .map_err(|err| BrokerError::Transport(err.to_string()))?;
        let handle = binance_futures::DerivativesTradingUsdsFuturesWsStreams::from_config(cfg);
        let ws = handle
            .connect()
            .await
            .map_err(|err| BrokerError::Transport(err.to_string()))?;
        let ws = Arc::new(ws);
        let stream = ws
            .user_data(listen_key.to_string(), None)
            .await
            .map_err(|err| BrokerError::Transport(err.to_string()))?;
        Ok(Self { ws, stream })
    }

    pub fn on_event<F>(&self, callback: F)
    where
        F: Fn(UserDataStreamEventsResponse) + Send + Sync + 'static,
    {
        self.stream.on_message(callback);
    }

    pub async fn unsubscribe(&self) {
        self.stream.unsubscribe().await;
        let _ = self.ws.disconnect().await;
    }
}

pub fn extract_order_update(event: &UserDataStreamEventsResponse) -> Option<&OrderTradeUpdateO> {
    match event {
        UserDataStreamEventsResponse::OrderTradeUpdate(payload) => payload.o.as_deref(),
        _ => None,
    }
}

fn interval_label(interval: Interval) -> String {
    interval.to_binance().to_string()
}