Skip to main content

tesser_binance/
lib.rs

1use async_trait::async_trait;
2use binance_sdk::{
3    common::{config::ConfigurationRestApi, errors::ConnectorError, models::RestApiResponse},
4    derivatives_trading_usds_futures::{
5        self as binance_futures,
6        rest_api::{
7            self, NewOrderParams, NewOrderPriceMatchEnum, NewOrderSideEnum, NewOrderTimeInForceEnum,
8        },
9        websocket_streams,
10    },
11};
12use chrono::{DateTime, Utc};
13use rust_decimal::Decimal;
14use serde::Deserialize;
15use serde_json::Value;
16use std::{any::Any, collections::HashMap, num::NonZeroU32, sync::Arc, time::Duration};
17use tesser_broker::{
18    register_connector_factory, BrokerError, BrokerInfo, BrokerResult, ConnectorFactory,
19    ConnectorStream, ConnectorStreamConfig, ExecutionClient, MarketStream, Quota, RateLimiter,
20    RateLimiterError,
21};
22use tesser_core::{
23    AccountBalance, AssetId, Candle, ExchangeId, Fill, Instrument, InstrumentKind, Order,
24    OrderBook, OrderRequest, OrderStatus, OrderType, OrderUpdateRequest, Position, Side, Symbol,
25    TimeInForce,
26};
27use tokio::time::sleep;
28use uuid::Uuid;
29
30pub mod ws;
31
32pub use ws::{
33    extract_order_update, BinanceMarketStream, BinanceSubscription, BinanceUserDataStream,
34    UserDataStreamEventsResponse,
35};
36
/// Per-minute request-weight budget used when the configuration does not override it.
const BINANCE_DEFAULT_WEIGHT_LIMIT: u32 = 1_200;
/// Share of the per-minute limit at or above which a short pause is inserted after a response.
const WEIGHT_BACKOFF_RATIO: f32 = 0.9;
/// Units reserved from the local limiter before placing or amending an order.
const ORDER_WEIGHT: u32 = 1;
/// Units reserved from the local limiter before cancelling an order.
const CANCEL_WEIGHT: u32 = 1;
/// Units reserved from the local limiter before query-style calls (open orders, balances,
/// positions, instruments) and before user-stream listen-key calls.
const QUERY_WEIGHT: u32 = 5;
42
/// API key/secret pair attached to the REST configuration when a client is built with
/// credentials.
#[derive(Clone)]
pub struct BinanceCredentials {
    /// Value handed to the REST configuration builder's `api_key`.
    pub api_key: String,
    /// Value handed to the REST configuration builder's `api_secret`.
    pub api_secret: String,
}
48
/// Endpoint and throttling settings for a [`BinanceClient`].
pub struct BinanceConfig {
    /// Base path used for REST calls.
    pub rest_url: String,
    /// WebSocket endpoint exposed through `BinanceClient::ws_url`.
    pub ws_url: String,
    /// Sent as `recv_window` on signed REST requests.
    // NOTE(review): presumably milliseconds, per the Binance API — confirm.
    pub recv_window: u64,
    /// Request-weight budget per minute; `0` disables both the local limiter and the
    /// header-driven back-off.
    pub weight_limit_per_minute: u32,
}
55
56impl Default for BinanceConfig {
57    fn default() -> Self {
58        Self {
59            rest_url: "https://fapi.binance.com".to_string(),
60            ws_url: "wss://fstream.binance.com/stream".to_string(),
61            recv_window: 5_000,
62            weight_limit_per_minute: BINANCE_DEFAULT_WEIGHT_LIMIT,
63        }
64    }
65}
66
67impl BinanceConfig {
68    pub fn testnet() -> Self {
69        Self {
70            rest_url: "https://testnet.binancefuture.com".to_string(),
71            ws_url: "wss://stream.binancefuture.com/stream".to_string(),
72            ..Self::default()
73        }
74    }
75}
76
/// REST execution client for Binance futures with local request-weight throttling.
pub struct BinanceClient {
    /// Shared handle to the SDK REST API.
    rest: Arc<rest_api::RestApi>,
    /// Broker metadata returned from `ExecutionClient::info`.
    info: BrokerInfo,
    /// Credentials supplied at construction, if any.
    credentials: Option<BinanceCredentials>,
    /// Endpoint and throttling settings supplied at construction.
    config: BinanceConfig,
    /// Local weight limiter; `None` when the configured per-minute limit is zero.
    weight_limiter: Option<RateLimiter>,
    /// Exchange identifier stamped onto symbols and assets built from responses.
    exchange: ExchangeId,
}
85
86impl BinanceClient {
87    pub fn new(
88        config: BinanceConfig,
89        credentials: Option<BinanceCredentials>,
90        exchange: ExchangeId,
91    ) -> Self {
92        let mut builder = ConfigurationRestApi::builder();
93        builder = builder.base_path(config.rest_url.clone());
94        if let Some(creds) = credentials.as_ref() {
95            builder = builder
96                .api_key(creds.api_key.clone())
97                .api_secret(creds.api_secret.clone());
98        }
99        let rest_cfg = builder
100            .build()
101            .expect("failed to build Binance REST configuration");
102        let rest = binance_futures::DerivativesTradingUsdsFuturesRestApi::from_config(rest_cfg);
103        let supports_testnet = config.rest_url.contains("testnet");
104        let weight_limiter = NonZeroU32::new(config.weight_limit_per_minute)
105            .map(Quota::per_minute)
106            .map(RateLimiter::direct);
107        Self {
108            rest: Arc::new(rest),
109            info: BrokerInfo {
110                name: "binance".into(),
111                markets: vec!["usd_perp".into()],
112                supports_testnet,
113            },
114            credentials,
115            config,
116            weight_limiter,
117            exchange,
118        }
119    }
120
121    pub fn credentials(&self) -> Option<BinanceCredentials> {
122        self.credentials.clone()
123    }
124
125    pub fn ws_url(&self) -> &str {
126        &self.config.ws_url
127    }
128
129    pub fn rest_url(&self) -> &str {
130        &self.config.rest_url
131    }
132
133    pub fn recv_window(&self) -> u64 {
134        self.config.recv_window
135    }
136
137    pub fn rest(&self) -> Arc<rest_api::RestApi> {
138        Arc::clone(&self.rest)
139    }
140
141    pub fn exchange(&self) -> ExchangeId {
142        self.exchange
143    }
144
145    async fn throttle_weight(&self, units: u32) -> BrokerResult<()> {
146        if units == 0 {
147            return Ok(());
148        }
149        if let Some(limiter) = &self.weight_limiter {
150            if let Some(nonzero) = NonZeroU32::new(units) {
151                limiter
152                    .until_units_ready(nonzero)
153                    .await
154                    .map_err(Self::rate_limiter_error)?;
155            }
156        }
157        Ok(())
158    }
159
160    async fn handle_weight_headers(&self, headers: &HashMap<String, String>) {
161        if self.config.weight_limit_per_minute == 0 {
162            return;
163        }
164        if let Some(value) = headers.get("x-mbx-used-weight-1m") {
165            if let Ok(used) = value.parse::<u32>() {
166                self.backoff_if_needed(used).await;
167            }
168        }
169    }
170
171    async fn backoff_if_needed(&self, used: u32) {
172        let limit = self.config.weight_limit_per_minute;
173        if limit == 0 {
174            return;
175        }
176        if used >= limit {
177            sleep(Duration::from_secs(1)).await;
178        } else if (used as f32) / (limit as f32) >= WEIGHT_BACKOFF_RATIO {
179            sleep(Duration::from_millis(200)).await;
180        }
181    }
182
183    async fn parse_response<T>(&self, result: anyhow::Result<RestApiResponse<T>>) -> BrokerResult<T>
184    where
185        T: Send + 'static,
186    {
187        let response = result.map_err(|err| BrokerError::Transport(err.to_string()))?;
188        self.handle_weight_headers(&response.headers).await;
189        response.data().await.map_err(map_connector_error)
190    }
191
192    fn rate_limiter_error(err: RateLimiterError) -> BrokerError {
193        BrokerError::Other(format!("rate limited: {err}"))
194    }
195
196    pub async fn start_user_stream(&self) -> BrokerResult<String> {
197        self.throttle_weight(QUERY_WEIGHT).await?;
198        let response = self
199            .parse_response(self.rest.start_user_data_stream().await)
200            .await?;
201        response
202            .listen_key
203            .ok_or_else(|| BrokerError::Other("missing listenKey".into()))
204    }
205
206    pub async fn keepalive_user_stream(&self) -> BrokerResult<String> {
207        self.throttle_weight(QUERY_WEIGHT).await?;
208        let response = self
209            .parse_response(self.rest.keepalive_user_data_stream().await)
210            .await?;
211        response
212            .listen_key
213            .ok_or_else(|| BrokerError::Other("missing listenKey".into()))
214    }
215}
216
217#[async_trait]
218impl ExecutionClient for BinanceClient {
219    fn info(&self) -> BrokerInfo {
220        self.info.clone()
221    }
222
223    async fn place_order(&self, request: OrderRequest) -> BrokerResult<Order> {
224        self.throttle_weight(ORDER_WEIGHT).await?;
225        let side = map_side(request.side);
226        let order_type = map_order_type(request.order_type);
227        let symbol_code = request.symbol.code().to_string();
228        let mut builder = NewOrderParams::builder(symbol_code, side, order_type);
229        let mut client_request = request.clone();
230        if let Some(price) = request.price {
231            builder = builder.price(Some(price));
232        }
233        builder = builder.quantity(Some(request.quantity));
234        if let Some(trigger) = request.trigger_price {
235            builder = builder.stop_price(Some(trigger));
236        }
237        let tif = match request
238            .time_in_force
239            .or_else(|| default_time_in_force(request.order_type))
240        {
241            Some(TimeInForce::GoodTilCanceled) => Some(NewOrderTimeInForceEnum::Gtc),
242            Some(TimeInForce::ImmediateOrCancel) => Some(NewOrderTimeInForceEnum::Ioc),
243            Some(TimeInForce::FillOrKill) => Some(NewOrderTimeInForceEnum::Fok),
244            None => None,
245        };
246        if let Some(value) = tif {
247            builder = builder.time_in_force(Some(value));
248        }
249        let client_id = client_request
250            .client_order_id
251            .clone()
252            .unwrap_or_else(|| format!("tesser-{}", Uuid::new_v4()));
253        client_request.client_order_id = Some(client_id.clone());
254        builder = builder
255            .new_client_order_id(Some(client_id))
256            .price_match(Some(NewOrderPriceMatchEnum::None))
257            .recv_window(Some(self.config.recv_window as i64));
258
259        let params = builder
260            .build()
261            .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
262        let response = self
263            .parse_response(self.rest.new_order(params).await)
264            .await?;
265        Ok(build_order_from_response(response, client_request))
266    }
267
268    async fn cancel_order(
269        &self,
270        order_id: tesser_core::OrderId,
271        symbol: Symbol,
272    ) -> BrokerResult<()> {
273        self.throttle_weight(CANCEL_WEIGHT).await?;
274        let mut builder = rest_api::CancelOrderParams::builder(symbol.code().to_string())
275            .recv_window(Some(self.config.recv_window as i64));
276        if let Ok(id) = order_id.parse::<i64>() {
277            builder = builder.order_id(Some(id));
278        } else {
279            builder = builder.orig_client_order_id(Some(order_id));
280        }
281        let params = builder
282            .build()
283            .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
284        self.parse_response(self.rest.cancel_order(params).await)
285            .await?;
286        Ok(())
287    }
288
289    async fn amend_order(&self, request: OrderUpdateRequest) -> BrokerResult<Order> {
290        self.throttle_weight(ORDER_WEIGHT).await?;
291        let new_price = request.new_price.ok_or_else(|| {
292            BrokerError::InvalidRequest("amend requires new price for Binance".into())
293        })?;
294        let new_qty = request.new_quantity.ok_or_else(|| {
295            BrokerError::InvalidRequest("amend requires new quantity for Binance".into())
296        })?;
297        let mut builder = rest_api::ModifyOrderParams::builder(
298            request.symbol.code().to_string(),
299            map_modify_side(request.side),
300            new_qty,
301            new_price,
302        )
303        .recv_window(Some(self.config.recv_window as i64));
304        if let Ok(id) = request.order_id.parse::<i64>() {
305            builder = builder.order_id(Some(id));
306        } else {
307            builder = builder.orig_client_order_id(Some(request.order_id.clone()));
308        }
309        let params = builder
310            .build()
311            .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
312        let response = self
313            .parse_response(self.rest.modify_order(params).await)
314            .await?;
315        Ok(build_order_from_modify_response(response, &request))
316    }
317
318    async fn list_open_orders(&self, symbol: Symbol) -> BrokerResult<Vec<Order>> {
319        self.throttle_weight(QUERY_WEIGHT).await?;
320        let params = rest_api::CurrentAllOpenOrdersParams::builder()
321            .symbol(Some(symbol.code().to_string()))
322            .recv_window(Some(self.config.recv_window as i64))
323            .build()
324            .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
325        let raw = self
326            .parse_response(self.rest.current_all_open_orders(params).await)
327            .await?;
328        let mut orders = Vec::new();
329        for entry in raw {
330            if let Some(order) = order_from_open_order(self.exchange, &entry) {
331                orders.push(order);
332            }
333        }
334        Ok(orders)
335    }
336
337    async fn account_balances(&self) -> BrokerResult<Vec<AccountBalance>> {
338        self.throttle_weight(QUERY_WEIGHT).await?;
339        let params = rest_api::FuturesAccountBalanceV3Params::builder()
340            .recv_window(Some(self.config.recv_window as i64))
341            .build()
342            .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
343        let balances = self
344            .parse_response(self.rest.futures_account_balance_v3(params).await)
345            .await?;
346        Ok(balances
347            .into_iter()
348            .filter_map(|entry| balance_from_entry(self.exchange, &entry))
349            .collect())
350    }
351
352    async fn positions(&self, _symbols: Option<&Vec<Symbol>>) -> BrokerResult<Vec<Position>> {
353        self.throttle_weight(QUERY_WEIGHT).await?;
354        let params = rest_api::PositionInformationV2Params::builder()
355            .recv_window(Some(self.config.recv_window as i64))
356            .build()
357            .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
358        let positions = self
359            .parse_response(self.rest.position_information_v2(params).await)
360            .await?;
361        Ok(positions
362            .into_iter()
363            .filter_map(|entry| position_from_entry(self.exchange, &entry))
364            .collect())
365    }
366
367    async fn list_instruments(&self, _category: &str) -> BrokerResult<Vec<Instrument>> {
368        self.throttle_weight(QUERY_WEIGHT).await?;
369        let info = self
370            .parse_response(self.rest.exchange_information().await)
371            .await?;
372        let mut instruments = Vec::new();
373        for symbol in info.symbols.unwrap_or_default() {
374            if let Some(instr) = instrument_from_symbol(self.exchange, &symbol) {
375                instruments.push(instr);
376            }
377        }
378        Ok(instruments)
379    }
380
381    fn as_any(&self) -> &dyn Any {
382        self
383    }
384}
385
386fn build_order_from_response(response: rest_api::NewOrderResponse, request: OrderRequest) -> Order {
387    let status = response
388        .status
389        .as_deref()
390        .map(map_order_status)
391        .unwrap_or(OrderStatus::PendingNew);
392    Order {
393        id: response
394            .order_id
395            .map(|id| id.to_string())
396            .unwrap_or_else(|| Utc::now().timestamp_millis().to_string()),
397        request,
398        status,
399        filled_quantity: parse_decimal_opt(response.executed_qty.as_deref())
400            .unwrap_or(Decimal::ZERO),
401        avg_fill_price: parse_decimal_opt(response.avg_price.as_deref()),
402        created_at: timestamp_from_ms(response.update_time),
403        updated_at: timestamp_from_ms(response.update_time),
404    }
405}
406
407fn build_order_from_modify_response(
408    response: rest_api::ModifyOrderResponse,
409    update: &OrderUpdateRequest,
410) -> Order {
411    let symbol = response
412        .symbol
413        .clone()
414        .map(|code| Symbol::from_code(update.symbol.exchange, code))
415        .unwrap_or(update.symbol);
416    let order_type = response
417        .r#type
418        .as_deref()
419        .map(map_order_type_from_str)
420        .unwrap_or(OrderType::Limit);
421    let quantity = parse_decimal_opt(response.orig_qty.as_deref())
422        .or(update.new_quantity)
423        .unwrap_or(Decimal::ZERO);
424    let price = parse_decimal_opt(response.price.as_deref()).or(update.new_price);
425    let request = OrderRequest {
426        symbol,
427        side: response
428            .side
429            .as_deref()
430            .map(map_order_side)
431            .unwrap_or(update.side),
432        order_type,
433        quantity,
434        price,
435        trigger_price: parse_decimal_opt(response.stop_price.as_deref()),
436        time_in_force: response.time_in_force.as_deref().and_then(map_tif_from_str),
437        client_order_id: response.client_order_id.clone(),
438        take_profit: None,
439        stop_loss: None,
440        display_quantity: None,
441    };
442    Order {
443        id: response
444            .order_id
445            .map(|id| id.to_string())
446            .unwrap_or_else(|| update.order_id.clone()),
447        request,
448        status: response
449            .status
450            .as_deref()
451            .map(map_order_status)
452            .unwrap_or(OrderStatus::PendingNew),
453        filled_quantity: parse_decimal_opt(response.executed_qty.as_deref())
454            .unwrap_or(Decimal::ZERO),
455        avg_fill_price: parse_decimal_opt(response.avg_price.as_deref()),
456        created_at: timestamp_from_ms(response.update_time),
457        updated_at: timestamp_from_ms(response.update_time),
458    }
459}
460
461fn order_from_open_order(
462    exchange: ExchangeId,
463    entry: &rest_api::AllOrdersResponseInner,
464) -> Option<Order> {
465    let symbol = entry
466        .symbol
467        .as_deref()
468        .map(|code| Symbol::from_code(exchange, code))?;
469    let request = OrderRequest {
470        symbol,
471        side: entry
472            .side
473            .as_deref()
474            .map(map_order_side)
475            .unwrap_or(Side::Buy),
476        order_type: entry
477            .r#type
478            .as_deref()
479            .map(map_order_type_from_str)
480            .unwrap_or(OrderType::Limit),
481        quantity: parse_decimal_opt(entry.orig_qty.as_deref())?,
482        price: parse_decimal_opt(entry.price.as_deref()),
483        trigger_price: parse_decimal_opt(entry.stop_price.as_deref()),
484        time_in_force: entry.time_in_force.as_deref().and_then(map_tif_from_str),
485        client_order_id: entry.client_order_id.clone(),
486        take_profit: None,
487        stop_loss: None,
488        display_quantity: None,
489    };
490    Some(Order {
491        id: entry
492            .order_id
493            .map(|id| id.to_string())
494            .or_else(|| entry.client_order_id.clone())?,
495        request,
496        status: entry
497            .status
498            .as_deref()
499            .map(map_order_status)
500            .unwrap_or(OrderStatus::PendingNew),
501        filled_quantity: parse_decimal_opt(entry.executed_qty.as_deref()).unwrap_or(Decimal::ZERO),
502        avg_fill_price: parse_decimal_opt(entry.avg_price.as_deref()),
503        created_at: timestamp_from_ms(entry.time),
504        updated_at: timestamp_from_ms(entry.update_time),
505    })
506}
507
508fn balance_from_entry(
509    exchange: ExchangeId,
510    entry: &rest_api::FuturesAccountBalanceV2ResponseInner,
511) -> Option<AccountBalance> {
512    let asset = entry
513        .asset
514        .as_deref()
515        .map(|code| AssetId::from_code(exchange, code))?;
516    let total = parse_decimal_opt(entry.balance.as_deref())?;
517    let available = parse_decimal_opt(entry.available_balance.as_deref()).unwrap_or(total);
518    Some(AccountBalance {
519        exchange,
520        asset,
521        total,
522        available,
523        updated_at: timestamp_from_ms(entry.update_time),
524    })
525}
526
527fn position_from_entry(
528    exchange: ExchangeId,
529    entry: &rest_api::PositionInformationV2ResponseInner,
530) -> Option<Position> {
531    let qty = parse_decimal_opt(entry.position_amt.as_deref())?;
532    if qty.is_zero() {
533        return None;
534    }
535    let side = if qty.is_sign_positive() {
536        Some(Side::Buy)
537    } else {
538        Some(Side::Sell)
539    };
540    let symbol = entry
541        .symbol
542        .as_deref()
543        .map(|code| Symbol::from_code(exchange, code))?;
544    Some(Position {
545        symbol,
546        side,
547        quantity: qty.abs(),
548        entry_price: parse_decimal_opt(entry.entry_price.as_deref()),
549        unrealized_pnl: parse_decimal_opt(entry.un_realized_profit.as_deref())
550            .unwrap_or(Decimal::ZERO),
551        updated_at: timestamp_from_ms(entry.update_time),
552    })
553}
554
555fn instrument_from_symbol(
556    exchange: ExchangeId,
557    symbol: &rest_api::ExchangeInformationResponseSymbolsInner,
558) -> Option<Instrument> {
559    let symbol_name = symbol
560        .symbol
561        .as_deref()
562        .map(|code| Symbol::from_code(exchange, code))?;
563    let base = symbol
564        .base_asset
565        .as_deref()
566        .map(|code| AssetId::from_code(exchange, code))?;
567    let quote = symbol
568        .quote_asset
569        .as_deref()
570        .map(|code| AssetId::from_code(exchange, code))?;
571    let settlement = symbol
572        .margin_asset
573        .as_deref()
574        .map(|code| AssetId::from_code(exchange, code))
575        .unwrap_or(quote);
576    let mut tick_size = Decimal::ONE;
577    let mut lot_size = Decimal::ONE;
578    if let Some(filters) = &symbol.filters {
579        for filter in filters {
580            match filter.filter_type.as_deref() {
581                Some("PRICE_FILTER") => {
582                    if let Some(tick) = parse_decimal_opt(filter.tick_size.as_deref()) {
583                        tick_size = tick;
584                    }
585                }
586                Some("LOT_SIZE") => {
587                    if let Some(step) = parse_decimal_opt(filter.step_size.as_deref()) {
588                        lot_size = step;
589                    }
590                }
591                _ => {}
592            }
593        }
594    }
595
596    Some(Instrument {
597        symbol: symbol_name,
598        base,
599        quote,
600        kind: InstrumentKind::LinearPerpetual,
601        settlement_currency: settlement,
602        tick_size,
603        lot_size,
604    })
605}
606
607fn map_order_status(value: &str) -> OrderStatus {
608    match value {
609        "NEW" => OrderStatus::Accepted,
610        "PARTIALLY_FILLED" => OrderStatus::PartiallyFilled,
611        "FILLED" => OrderStatus::Filled,
612        "CANCELED" | "EXPIRED" => OrderStatus::Canceled,
613        "REJECTED" => OrderStatus::Rejected,
614        _ => OrderStatus::PendingNew,
615    }
616}
617
618fn map_order_side(value: &str) -> Side {
619    match value {
620        "SELL" => Side::Sell,
621        _ => Side::Buy,
622    }
623}
624
625fn map_side(value: Side) -> NewOrderSideEnum {
626    match value {
627        Side::Buy => NewOrderSideEnum::Buy,
628        Side::Sell => NewOrderSideEnum::Sell,
629    }
630}
631
632fn map_modify_side(value: Side) -> rest_api::ModifyOrderSideEnum {
633    match value {
634        Side::Buy => rest_api::ModifyOrderSideEnum::Buy,
635        Side::Sell => rest_api::ModifyOrderSideEnum::Sell,
636    }
637}
638
639fn map_order_type(order_type: OrderType) -> String {
640    match order_type {
641        OrderType::Market => "MARKET".into(),
642        OrderType::Limit => "LIMIT".into(),
643        OrderType::StopMarket => "STOP_MARKET".into(),
644    }
645}
646
647fn map_order_type_from_str(value: &str) -> OrderType {
648    match value {
649        "MARKET" => OrderType::Market,
650        "STOP_MARKET" | "TAKE_PROFIT_MARKET" => OrderType::StopMarket,
651        _ => OrderType::Limit,
652    }
653}
654
655fn map_tif_from_str(value: &str) -> Option<TimeInForce> {
656    match value {
657        "GTC" => Some(TimeInForce::GoodTilCanceled),
658        "IOC" => Some(TimeInForce::ImmediateOrCancel),
659        "FOK" => Some(TimeInForce::FillOrKill),
660        _ => None,
661    }
662}
663
664fn default_time_in_force(order_type: OrderType) -> Option<TimeInForce> {
665    match order_type {
666        OrderType::Limit => Some(TimeInForce::GoodTilCanceled),
667        _ => None,
668    }
669}
670
671pub fn parse_decimal_opt(value: Option<&str>) -> Option<Decimal> {
672    value.and_then(|v| v.parse::<Decimal>().ok())
673}
674
675pub fn timestamp_from_ms(value: Option<i64>) -> DateTime<Utc> {
676    value
677        .and_then(DateTime::<Utc>::from_timestamp_millis)
678        .unwrap_or_else(Utc::now)
679}
680
681pub fn fill_from_update(
682    exchange: ExchangeId,
683    order: &websocket_streams::OrderTradeUpdateO,
684) -> Option<Fill> {
685    let last_qty = parse_decimal_opt(order.l.as_deref())?;
686    if last_qty.is_zero() {
687        return None;
688    }
689    let price = parse_decimal_opt(order.l_uppercase.as_deref())
690        .or_else(|| parse_decimal_opt(order.p.as_deref()))?;
691    let side = order
692        .s_uppercase
693        .as_deref()
694        .map(map_order_side)
695        .unwrap_or(Side::Buy);
696    let symbol = order
697        .s
698        .as_deref()
699        .map(|code| Symbol::from_code(exchange, code))
700        .unwrap_or(Symbol::unspecified());
701    let fee_currency = order
702        .n_uppercase
703        .as_deref()
704        .filter(|code| !code.is_empty())
705        .map(|code| AssetId::from_code(exchange, code));
706    Some(Fill {
707        order_id: order.i.map(|id| id.to_string()).unwrap_or_default(),
708        symbol,
709        side,
710        fill_price: price,
711        fill_quantity: last_qty,
712        fee: parse_decimal_opt(order.n.as_deref()),
713        fee_asset: fee_currency,
714        timestamp: timestamp_from_ms(order.t_uppercase),
715    })
716}
717
718pub fn order_from_update(
719    exchange: ExchangeId,
720    order: &websocket_streams::OrderTradeUpdateO,
721) -> Option<Order> {
722    let symbol = order
723        .s
724        .as_deref()
725        .map(|code| Symbol::from_code(exchange, code))?;
726    let request = OrderRequest {
727        symbol,
728        side: order
729            .s_uppercase
730            .as_deref()
731            .map(map_order_side)
732            .unwrap_or(Side::Buy),
733        order_type: order
734            .o
735            .as_deref()
736            .map(map_order_type_from_str)
737            .unwrap_or(OrderType::Limit),
738        quantity: parse_decimal_opt(order.q.as_deref()).unwrap_or(Decimal::ZERO),
739        price: parse_decimal_opt(order.p.as_deref()),
740        trigger_price: parse_decimal_opt(order.sp.as_deref()),
741        time_in_force: order.f.as_deref().and_then(map_tif_from_str),
742        client_order_id: order.c.clone(),
743        take_profit: None,
744        stop_loss: None,
745        display_quantity: None,
746    };
747    Some(Order {
748        id: order.i.map(|v| v.to_string()).unwrap_or_default(),
749        request,
750        status: order
751            .x_uppercase
752            .as_deref()
753            .map(map_order_status)
754            .unwrap_or(OrderStatus::PendingNew),
755        filled_quantity: parse_decimal_opt(order.z.as_deref()).unwrap_or(Decimal::ZERO),
756        avg_fill_price: parse_decimal_opt(order.ap.as_deref()),
757        created_at: timestamp_from_ms(order.t_uppercase),
758        updated_at: timestamp_from_ms(order.t_uppercase),
759    })
760}
761
762#[derive(Clone, Debug, Deserialize)]
763struct BinanceConnectorConfig {
764    #[serde(default = "default_rest_url")]
765    rest_url: String,
766    #[serde(default = "default_ws_url")]
767    ws_url: String,
768    #[serde(default = "default_exchange_name")]
769    exchange: String,
770    #[serde(default)]
771    api_key: String,
772    #[serde(default)]
773    api_secret: String,
774    #[serde(default = "default_recv_window")]
775    recv_window: u64,
776    #[serde(default = "default_weight_limit_per_minute")]
777    weight_limit_per_minute: u32,
778}
779
/// Serde default for `rest_url`.
fn default_rest_url() -> String {
    String::from("https://fapi.binance.com")
}
783
/// Serde default for `ws_url`.
fn default_ws_url() -> String {
    String::from("wss://fstream.binance.com/stream")
}
787
/// Serde default for `exchange`.
fn default_exchange_name() -> String {
    String::from("binance_perp")
}
791
/// Serde default for `recv_window`.
fn default_recv_window() -> u64 {
    const DEFAULT_RECV_WINDOW: u64 = 5_000;
    DEFAULT_RECV_WINDOW
}
795
/// Serde default for `weight_limit_per_minute`; returns `BINANCE_DEFAULT_WEIGHT_LIMIT`.
fn default_weight_limit_per_minute() -> u32 {
    BINANCE_DEFAULT_WEIGHT_LIMIT
}
799
/// Connector factory registered under the name `"binance"`; builds execution clients and
/// market streams from a JSON configuration.
#[derive(Default)]
pub struct BinanceFactory;
802
803impl BinanceFactory {
804    fn parse_config(&self, value: &Value) -> BrokerResult<BinanceConnectorConfig> {
805        serde_json::from_value(value.clone()).map_err(|err| {
806            BrokerError::InvalidRequest(format!("invalid binance connector config: {err}"))
807        })
808    }
809
810    fn credentials(cfg: &BinanceConnectorConfig) -> Option<BinanceCredentials> {
811        if cfg.api_key.trim().is_empty() || cfg.api_secret.trim().is_empty() {
812            None
813        } else {
814            Some(BinanceCredentials {
815                api_key: cfg.api_key.clone(),
816                api_secret: cfg.api_secret.clone(),
817            })
818        }
819    }
820}
821
/// Order-book depth used when the stream metadata has no usable `orderbook_depth` entry.
const BINANCE_DEFAULT_DEPTH: usize = 50;
823
824pub fn register_factory() {
825    register_connector_factory(Arc::new(BinanceFactory));
826}
827
828#[async_trait]
829impl ConnectorFactory for BinanceFactory {
830    fn name(&self) -> &str {
831        "binance"
832    }
833
834    async fn create_execution_client(
835        &self,
836        config: &Value,
837    ) -> BrokerResult<Arc<dyn ExecutionClient>> {
838        let cfg = self.parse_config(config)?;
839        let exchange = ExchangeId::from(cfg.exchange.as_str());
840        let binance_cfg = BinanceConfig {
841            rest_url: cfg.rest_url.clone(),
842            ws_url: cfg.ws_url.clone(),
843            recv_window: cfg.recv_window,
844            weight_limit_per_minute: cfg.weight_limit_per_minute,
845        };
846        Ok(Arc::new(BinanceClient::new(
847            binance_cfg,
848            Self::credentials(&cfg),
849            exchange,
850        )))
851    }
852
853    async fn create_market_stream(
854        &self,
855        config: &Value,
856        stream_config: ConnectorStreamConfig,
857    ) -> BrokerResult<Box<dyn ConnectorStream>> {
858        let cfg = self.parse_config(config)?;
859        let exchange = ExchangeId::from(cfg.exchange.as_str());
860        let stream = BinanceMarketStream::connect(
861            &cfg.ws_url,
862            &cfg.rest_url,
863            stream_config.connection_status,
864            exchange,
865        )
866        .await?;
867        Ok(Box::new(BinanceConnectorStream::new(
868            stream,
869            stream_config
870                .metadata
871                .get("orderbook_depth")
872                .and_then(|v| v.as_u64())
873                .map(|v| v as usize)
874                .unwrap_or(BINANCE_DEFAULT_DEPTH),
875        )))
876    }
877}
878
/// Adapter exposing a [`BinanceMarketStream`] through the generic `ConnectorStream` trait.
struct BinanceConnectorStream {
    /// Underlying market stream that all calls are forwarded to.
    inner: BinanceMarketStream,
    /// Order-book depth requested for every subscribed symbol.
    depth: usize,
}
883
impl BinanceConnectorStream {
    /// Pairs a connected market stream with the order-book depth to request on subscribe.
    fn new(inner: BinanceMarketStream, depth: usize) -> Self {
        Self { inner, depth }
    }
}
889
890#[async_trait]
891impl ConnectorStream for BinanceConnectorStream {
892    async fn subscribe(
893        &mut self,
894        symbols: &[String],
895        interval: tesser_core::Interval,
896    ) -> BrokerResult<()> {
897        for symbol in symbols {
898            self.inner
899                .subscribe(BinanceSubscription::Trades {
900                    symbol: symbol.clone(),
901                })
902                .await?;
903            self.inner
904                .subscribe(BinanceSubscription::Kline {
905                    symbol: symbol.clone(),
906                    interval,
907                })
908                .await?;
909            self.inner
910                .subscribe(BinanceSubscription::OrderBook {
911                    symbol: symbol.clone(),
912                    depth: self.depth,
913                })
914                .await?;
915        }
916        Ok(())
917    }
918
919    async fn next_tick(&mut self) -> BrokerResult<Option<tesser_core::Tick>> {
920        self.inner.next_tick().await
921    }
922
923    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
924        self.inner.next_candle().await
925    }
926
927    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
928        self.inner.next_order_book().await
929    }
930}
931
932fn map_connector_error(err: ConnectorError) -> BrokerError {
933    match err {
934        ConnectorError::UnauthorizedError(msg) => BrokerError::Authentication(msg),
935        ConnectorError::ForbiddenError(msg)
936        | ConnectorError::TooManyRequestsError(msg)
937        | ConnectorError::RateLimitBanError(msg)
938        | ConnectorError::ServerError { msg, .. } => BrokerError::Exchange(msg),
939        ConnectorError::NetworkError(msg) => BrokerError::Transport(msg),
940        ConnectorError::NotFoundError(msg) | ConnectorError::BadRequestError(msg) => {
941            BrokerError::InvalidRequest(msg)
942        }
943        ConnectorError::ConnectorClientError(msg) => BrokerError::Other(msg),
944    }
945}