Skip to main content

bat_markets_binance/
lib.rs

1//! Binance linear futures adapter.
2
3pub mod native;
4
5use std::{
6    sync::Arc,
7    time::{SystemTime, UNIX_EPOCH},
8};
9
10use parking_lot::RwLock;
11use rust_decimal::Decimal;
12
13use bat_markets_core::{
14    AccountSnapshot, AggressorSide, AssetCode, Balance, BatMarketsConfig, CapabilitySet,
15    CommandOperation, CommandReceipt, CommandStatus, ErrorKind, Execution, FastBookTop, FastKline,
16    FastMarkPrice, FastOrderBookDelta, FastTicker, FastTrade, FetchOhlcvRequest,
17    FetchTradesRequest, FundingRate, InstrumentCatalog, InstrumentId, InstrumentSpec,
18    InstrumentStatus, InstrumentSupport, Kline, KlineInterval, Leverage, Liquidity, MarginMode,
19    MarketError, MarketType, Notional, OpenInterest, Order, OrderId, OrderStatus, OrderType,
20    Position, PositionDirection, PositionId, PositionMode, Price, PrivateLaneEvent, Product,
21    PublicLaneEvent, Quantity, Rate, RequestId, Result, Side, Ticker, TimeInForce, TimestampMs,
22    TradeId, Venue, VenueAdapter,
23};
24
25/// Binance linear futures adapter with a handwritten, fixture-backed contract.
26#[derive(Clone, Debug)]
27pub struct BinanceLinearFuturesAdapter {
28    config: BatMarketsConfig,
29    capabilities: CapabilitySet,
30    lane_set: bat_markets_core::LaneSet,
31    instruments: Arc<RwLock<InstrumentCatalog>>,
32}
33
34impl Default for BinanceLinearFuturesAdapter {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl BinanceLinearFuturesAdapter {
41    #[must_use]
42    pub fn new() -> Self {
43        Self::with_config(BatMarketsConfig::new(Venue::Binance, Product::LinearUsdt))
44    }
45
46    #[must_use]
47    pub fn with_config(config: BatMarketsConfig) -> Self {
48        Self {
49            config,
50            capabilities: CapabilitySet::linear_futures_defaults(),
51            lane_set: bat_markets_core::LaneSet::linear_futures_defaults(),
52            instruments: Arc::new(RwLock::new(InstrumentCatalog::new([
53                btc_spec(),
54                eth_spec(),
55            ]))),
56        }
57    }
58
59    pub fn replace_instruments(&self, instruments: Vec<InstrumentSpec>) {
60        self.instruments.write().replace(instruments);
61    }
62
63    pub fn parse_native_public(&self, payload: &str) -> Result<native::PublicMessage> {
64        serde_json::from_str(payload).map_err(|error| {
65            MarketError::new(
66                ErrorKind::DecodeError,
67                format!("failed to parse binance public payload: {error}"),
68            )
69            .with_venue(Venue::Binance, Product::LinearUsdt)
70            .with_operation("binance.parse_native_public")
71        })
72    }
73
74    pub fn parse_native_private(&self, payload: &str) -> Result<native::PrivateMessage> {
75        serde_json::from_str(payload).map_err(|error| {
76            MarketError::new(
77                ErrorKind::DecodeError,
78                format!("failed to parse binance private payload: {error}"),
79            )
80            .with_venue(Venue::Binance, Product::LinearUsdt)
81            .with_operation("binance.parse_native_private")
82        })
83    }
84
85    pub fn parse_server_time(&self, payload: &str) -> Result<TimestampMs> {
86        let response =
87            serde_json::from_str::<native::ServerTimeResponse>(payload).map_err(|error| {
88                MarketError::new(
89                    ErrorKind::DecodeError,
90                    format!("failed to parse binance server-time response: {error}"),
91                )
92            })?;
93        Ok(TimestampMs::new(response.server_time))
94    }
95
96    pub fn parse_metadata_snapshot(&self, payload: &str) -> Result<Vec<InstrumentSpec>> {
97        let response =
98            serde_json::from_str::<native::ExchangeInfoResponse>(payload).map_err(|error| {
99                MarketError::new(
100                    ErrorKind::DecodeError,
101                    format!("failed to parse binance exchangeInfo response: {error}"),
102                )
103                .with_venue(Venue::Binance, Product::LinearUsdt)
104                .with_operation("binance.parse_metadata_snapshot")
105            })?;
106
107        let mut instruments = Vec::new();
108        for symbol in response.symbols {
109            if symbol.contract_type != "PERPETUAL" || symbol.quote_asset != "USDT" {
110                continue;
111            }
112
113            let tick_size = require_filter_decimal(&symbol.filters, "PRICE_FILTER", |filter| {
114                filter.tick_size.as_deref()
115            })?;
116            let step_size = require_filter_decimal(&symbol.filters, "LOT_SIZE", |filter| {
117                filter.step_size.as_deref()
118            })?;
119            let min_qty = require_filter_decimal(&symbol.filters, "LOT_SIZE", |filter| {
120                filter.min_qty.as_deref()
121            })?;
122            let min_notional = require_filter_decimal(&symbol.filters, "MIN_NOTIONAL", |filter| {
123                filter.notional.as_deref()
124            })?;
125
126            let price_scale = decimal_scale(tick_size);
127            let qty_scale = decimal_scale(step_size);
128            let quote_scale = symbol
129                .quote_precision
130                .max(price_scale.saturating_add(qty_scale))
131                .max(decimal_scale(min_notional));
132
133            instruments.push(InstrumentSpec {
134                venue: Venue::Binance,
135                product: Product::LinearUsdt,
136                market_type: MarketType::LinearPerpetual,
137                instrument_id: InstrumentId::from(canonical_symbol(
138                    &symbol.base_asset,
139                    &symbol.quote_asset,
140                    &symbol.margin_asset,
141                )),
142                canonical_symbol: canonical_symbol(
143                    &symbol.base_asset,
144                    &symbol.quote_asset,
145                    &symbol.margin_asset,
146                )
147                .into(),
148                native_symbol: symbol.symbol.into(),
149                base: AssetCode::from(symbol.base_asset),
150                quote: AssetCode::from(symbol.quote_asset),
151                settle: AssetCode::from(symbol.margin_asset),
152                contract_size: Quantity::new(Decimal::ONE),
153                tick_size: Price::new(tick_size),
154                step_size: Quantity::new(step_size),
155                min_qty: Quantity::new(min_qty),
156                min_notional: Notional::new(min_notional),
157                price_scale,
158                qty_scale,
159                quote_scale,
160                max_leverage: None,
161                support: InstrumentSupport {
162                    public_streams: true,
163                    private_trading: true,
164                    leverage_set: true,
165                    margin_mode_set: true,
166                    funding_rate: true,
167                    open_interest: true,
168                },
169                status: parse_instrument_status(&symbol.status),
170            });
171        }
172
173        Ok(instruments)
174    }
175
176    pub fn parse_account_snapshot(
177        &self,
178        payload: &str,
179        observed_at: TimestampMs,
180    ) -> Result<(AccountSnapshot, Vec<Position>)> {
181        let response =
182            serde_json::from_str::<native::AccountInfoResponse>(payload).map_err(|error| {
183                MarketError::new(
184                    ErrorKind::DecodeError,
185                    format!("failed to parse binance account snapshot: {error}"),
186                )
187                .with_venue(Venue::Binance, Product::LinearUsdt)
188                .with_operation("binance.parse_account_snapshot")
189            })?;
190
191        let balances = response
192            .assets
193            .into_iter()
194            .map(|asset| {
195                Ok(Balance {
196                    asset: AssetCode::from(asset.asset),
197                    wallet_balance: balance_amount(&asset.wallet_balance)?,
198                    available_balance: balance_amount(&asset.available_balance)?,
199                    updated_at: observed_at,
200                })
201            })
202            .collect::<Result<Vec<_>>>()?;
203
204        let positions = response
205            .positions
206            .into_iter()
207            .filter_map(|position| match parse_decimal(&position.position_amount) {
208                Ok(size) if size.is_zero() => None,
209                Ok(size) => Some(self.position_from_account_snapshot(position, observed_at, size)),
210                Err(error) => Some(Err(error)),
211            })
212            .collect::<Result<Vec<_>>>()?;
213
214        let account = AccountSnapshot {
215            balances,
216            summary: Some(bat_markets_core::AccountSummary {
217                total_wallet_balance: balance_amount(&response.total_wallet_balance)?,
218                total_available_balance: balance_amount(&response.available_balance)?,
219                total_unrealized_pnl: balance_amount(&response.total_unrealized_profit)?,
220                updated_at: observed_at,
221            }),
222        };
223
224        Ok((account, positions))
225    }
226
227    pub fn parse_open_orders_snapshot(
228        &self,
229        payload: &str,
230        observed_at: TimestampMs,
231    ) -> Result<Vec<Order>> {
232        let snapshots =
233            serde_json::from_str::<Vec<native::OrderSnapshot>>(payload).map_err(|error| {
234                MarketError::new(
235                    ErrorKind::DecodeError,
236                    format!("failed to parse binance open-orders snapshot: {error}"),
237                )
238                .with_venue(Venue::Binance, Product::LinearUsdt)
239                .with_operation("binance.parse_open_orders_snapshot")
240            })?;
241
242        snapshots
243            .into_iter()
244            .map(|snapshot| self.order_from_snapshot(snapshot, observed_at))
245            .collect()
246    }
247
248    pub fn parse_open_algo_orders_snapshot(
249        &self,
250        payload: &str,
251        _observed_at: TimestampMs,
252    ) -> Result<Vec<Order>> {
253        let snapshots =
254            serde_json::from_str::<Vec<native::AlgoOrderSnapshot>>(payload).map_err(|error| {
255                MarketError::new(
256                    ErrorKind::DecodeError,
257                    format!("failed to parse binance open-algo-orders snapshot: {error}"),
258                )
259                .with_venue(Venue::Binance, Product::LinearUsdt)
260                .with_operation("binance.parse_open_algo_orders_snapshot")
261            })?;
262
263        snapshots
264            .into_iter()
265            .map(|snapshot| self.algo_order_from_snapshot(snapshot))
266            .collect()
267    }
268
269    pub fn parse_order_snapshot(&self, payload: &str, observed_at: TimestampMs) -> Result<Order> {
270        if let Ok(snapshot) = serde_json::from_str::<native::OrderSnapshot>(payload) {
271            return self.order_from_snapshot(snapshot, observed_at);
272        }
273        let snapshot =
274            serde_json::from_str::<native::AlgoOrderSnapshot>(payload).map_err(|error| {
275                MarketError::new(
276                    ErrorKind::DecodeError,
277                    format!("failed to parse binance order snapshot: {error}"),
278                )
279                .with_venue(Venue::Binance, Product::LinearUsdt)
280                .with_operation("binance.parse_order_snapshot")
281            })?;
282        self.algo_order_from_snapshot(snapshot)
283    }
284
285    pub fn parse_order_history_snapshot(
286        &self,
287        payload: &str,
288        observed_at: TimestampMs,
289    ) -> Result<Vec<Order>> {
290        self.parse_open_orders_snapshot(payload, observed_at)
291    }
292
293    pub fn parse_executions_snapshot(&self, payload: &str) -> Result<Vec<Execution>> {
294        let snapshots =
295            serde_json::from_str::<Vec<native::UserTradeSnapshot>>(payload).map_err(|error| {
296                MarketError::new(
297                    ErrorKind::DecodeError,
298                    format!("failed to parse binance user-trades snapshot: {error}"),
299                )
300                .with_venue(Venue::Binance, Product::LinearUsdt)
301                .with_operation("binance.parse_executions_snapshot")
302            })?;
303
304        snapshots
305            .into_iter()
306            .map(|snapshot| self.execution_from_snapshot(snapshot))
307            .collect()
308    }
309
310    pub fn parse_ticker_snapshot(
311        &self,
312        payload: &str,
313        instrument_id: &InstrumentId,
314    ) -> Result<Ticker> {
315        let snapshot =
316            serde_json::from_str::<native::TickerSnapshot>(payload).map_err(|error| {
317                MarketError::new(
318                    ErrorKind::DecodeError,
319                    format!("failed to parse binance ticker snapshot: {error}"),
320                )
321                .with_venue(Venue::Binance, Product::LinearUsdt)
322                .with_operation("binance.parse_ticker_snapshot")
323            })?;
324        let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
325            MarketError::new(
326                ErrorKind::Unsupported,
327                format!("unsupported binance instrument '{}'", instrument_id),
328            )
329            .with_venue(Venue::Binance, Product::LinearUsdt)
330        })?;
331
332        Ok(FastTicker {
333            instrument_id: spec.instrument_id.clone(),
334            last_price: Price::new(parse_decimal(&snapshot.last_price)?)
335                .quantize(spec.price_scale)?,
336            mark_price: None,
337            index_price: None,
338            volume_24h: Some(
339                Quantity::new(parse_decimal(&snapshot.volume)?).quantize(spec.qty_scale)?,
340            ),
341            turnover_24h: quantize_optional_notional(
342                parse_decimal(&snapshot.quote_volume)?,
343                spec.quote_scale,
344            ),
345            event_time: TimestampMs::new(snapshot.close_time),
346        }
347        .to_unified(&spec))
348    }
349
350    pub fn parse_trades_snapshot(
351        &self,
352        payload: &str,
353        request: &FetchTradesRequest,
354    ) -> Result<Vec<bat_markets_core::TradeTick>> {
355        let spec = self
356            .resolve_instrument(&request.instrument_id)
357            .ok_or_else(|| {
358                MarketError::new(
359                    ErrorKind::Unsupported,
360                    format!("unsupported binance instrument '{}'", request.instrument_id),
361                )
362                .with_venue(Venue::Binance, Product::LinearUsdt)
363            })?;
364        let snapshots =
365            serde_json::from_str::<Vec<native::AggTradeSnapshot>>(payload).map_err(|error| {
366                MarketError::new(
367                    ErrorKind::DecodeError,
368                    format!("failed to parse binance trades snapshot: {error}"),
369                )
370                .with_venue(Venue::Binance, Product::LinearUsdt)
371                .with_operation("binance.parse_trades_snapshot")
372            })?;
373
374        snapshots
375            .into_iter()
376            .map(|snapshot| {
377                Ok(FastTrade {
378                    instrument_id: spec.instrument_id.clone(),
379                    trade_id: TradeId::from(snapshot.agg_trade_id.to_string()),
380                    price: Price::new(parse_decimal(&snapshot.price)?)
381                        .quantize(spec.price_scale)?,
382                    quantity: Quantity::new(parse_decimal(&snapshot.quantity)?)
383                        .quantize(spec.qty_scale)?,
384                    aggressor_side: if snapshot.is_buyer_maker {
385                        AggressorSide::Seller
386                    } else {
387                        AggressorSide::Buyer
388                    },
389                    event_time: TimestampMs::new(snapshot.trade_time),
390                }
391                .to_unified(&spec))
392            })
393            .collect()
394    }
395
396    pub fn parse_book_top_snapshot(
397        &self,
398        payload: &str,
399        instrument_id: &InstrumentId,
400    ) -> Result<bat_markets_core::BookTop> {
401        let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
402            MarketError::new(
403                ErrorKind::Unsupported,
404                format!("unsupported binance instrument '{}'", instrument_id),
405            )
406            .with_venue(Venue::Binance, Product::LinearUsdt)
407        })?;
408        let snapshot =
409            serde_json::from_str::<native::BookTickerSnapshot>(payload).map_err(|error| {
410                MarketError::new(
411                    ErrorKind::DecodeError,
412                    format!("failed to parse binance book-ticker snapshot: {error}"),
413                )
414                .with_venue(Venue::Binance, Product::LinearUsdt)
415                .with_operation("binance.parse_book_top_snapshot")
416            })?;
417
418        Ok(FastBookTop {
419            instrument_id: spec.instrument_id.clone(),
420            bid_price: Price::new(parse_decimal(&snapshot.bid_price)?)
421                .quantize(spec.price_scale)?,
422            bid_quantity: Quantity::new(parse_decimal(&snapshot.bid_qty)?)
423                .quantize(spec.qty_scale)?,
424            ask_price: Price::new(parse_decimal(&snapshot.ask_price)?)
425                .quantize(spec.price_scale)?,
426            ask_quantity: Quantity::new(parse_decimal(&snapshot.ask_qty)?)
427                .quantize(spec.qty_scale)?,
428            event_time: TimestampMs::new(snapshot.time),
429        }
430        .to_unified(&spec))
431    }
432
433    pub fn parse_ohlcv_snapshot(
434        &self,
435        payload: &str,
436        request: &FetchOhlcvRequest,
437    ) -> Result<Vec<Kline>> {
438        let interval = KlineInterval::parse(request.interval.as_ref()).ok_or_else(|| {
439            MarketError::new(
440                ErrorKind::Unsupported,
441                format!("unsupported binance OHLCV interval '{}'", request.interval),
442            )
443            .with_venue(Venue::Binance, Product::LinearUsdt)
444        })?;
445        let instrument_id = request.single_instrument_id()?;
446        let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
447            MarketError::new(
448                ErrorKind::Unsupported,
449                format!("unsupported binance instrument '{}'", instrument_id),
450            )
451            .with_venue(Venue::Binance, Product::LinearUsdt)
452        })?;
453        let rows =
454            serde_json::from_str::<Vec<Vec<serde_json::Value>>>(payload).map_err(|error| {
455                MarketError::new(
456                    ErrorKind::DecodeError,
457                    format!("failed to parse binance klines snapshot: {error}"),
458                )
459                .with_venue(Venue::Binance, Product::LinearUsdt)
460                .with_operation("binance.parse_ohlcv_snapshot")
461            })?;
462
463        let mut klines = rows
464            .into_iter()
465            .map(|row| parse_binance_kline_row(&spec, interval, row))
466            .collect::<Result<Vec<_>>>()?;
467        klines.sort_by_key(|kline| kline.open_time.value());
468        Ok(klines)
469    }
470
471    fn position_from_account_snapshot(
472        &self,
473        position: native::AccountPositionSnapshot,
474        observed_at: TimestampMs,
475        size: Decimal,
476    ) -> Result<Position> {
477        let spec = self.require_native_symbol(&position.symbol)?;
478        Ok(Position {
479            position_id: PositionId::from(format!(
480                "binance:{}:{}",
481                position.symbol, position.position_side
482            )),
483            instrument_id: spec.instrument_id.clone(),
484            direction: decimal_direction(size),
485            size: Quantity::new(size.abs()),
486            entry_price: parse_optional_decimal(position.entry_price.as_deref())?.map(Price::new),
487            mark_price: None,
488            unrealized_pnl: position
489                .unrealized_profit
490                .as_deref()
491                .map(balance_amount)
492                .transpose()?,
493            leverage: parse_optional_decimal(position.leverage.as_deref())?.map(Leverage::new),
494            margin_mode: parse_margin_mode_snapshot(
495                position.margin_type.as_deref(),
496                position.isolated,
497                position.isolated_margin.as_deref(),
498                position.isolated_wallet.as_deref(),
499            )?,
500            position_mode: parse_position_mode(&position.position_side),
501            updated_at: observed_at,
502        })
503    }
504
505    fn order_from_snapshot(
506        &self,
507        snapshot: native::OrderSnapshot,
508        observed_at: TimestampMs,
509    ) -> Result<Order> {
510        let spec = self.require_native_symbol(&snapshot.symbol)?;
511        let average_fill_price = if snapshot.average_price == "0" {
512            None
513        } else {
514            Some(Price::new(parse_decimal(&snapshot.average_price)?))
515        };
516        Ok(Order {
517            order_id: OrderId::from(snapshot.order_id.to_string()),
518            client_order_id: Some(snapshot.client_order_id.into()),
519            instrument_id: spec.instrument_id.clone(),
520            side: parse_side(&snapshot.side)?,
521            order_type: parse_order_type(&snapshot.order_type)?,
522            time_in_force: Some(parse_time_in_force(&snapshot.time_in_force)?),
523            status: parse_order_status(&snapshot.status)?,
524            price: Some(Price::new(parse_decimal(&snapshot.price)?)),
525            quantity: Quantity::new(parse_decimal(&snapshot.original_quantity)?),
526            filled_quantity: Quantity::new(parse_decimal(&snapshot.executed_quantity)?),
527            average_fill_price,
528            reduce_only: snapshot.reduce_only,
529            post_only: matches!(snapshot.time_in_force.as_str(), "GTX"),
530            created_at: snapshot
531                .created_time
532                .map(TimestampMs::new)
533                .unwrap_or(observed_at),
534            updated_at: TimestampMs::new(snapshot.update_time),
535            venue_status: Some(snapshot.status.into()),
536        })
537    }
538
539    fn algo_order_from_snapshot(&self, snapshot: native::AlgoOrderSnapshot) -> Result<Order> {
540        let spec = self.require_native_symbol(&snapshot.symbol)?;
541        Ok(Order {
542            order_id: binance_algo_order_id(snapshot.algo_id),
543            client_order_id: Some(snapshot.client_algo_id.into()),
544            instrument_id: spec.instrument_id.clone(),
545            side: parse_side(&snapshot.side)?,
546            order_type: parse_order_type(&snapshot.order_type)?,
547            time_in_force: parse_optional_time_in_force(snapshot.time_in_force.as_deref())?,
548            status: parse_algo_order_status(&snapshot.algo_status, Decimal::ZERO),
549            price: parse_optional_price_or_empty(snapshot.price.as_deref())?,
550            quantity: Quantity::new(parse_decimal(&snapshot.quantity)?),
551            filled_quantity: Quantity::new(Decimal::ZERO),
552            average_fill_price: parse_optional_price_or_empty(snapshot.actual_price.as_deref())?,
553            reduce_only: snapshot.reduce_only.unwrap_or(false),
554            post_only: matches!(snapshot.time_in_force.as_deref(), Some("GTX")),
555            created_at: TimestampMs::new(snapshot.create_time),
556            updated_at: TimestampMs::new(snapshot.update_time),
557            venue_status: Some(snapshot.algo_status.into()),
558        })
559    }
560
561    fn algo_order_from_update_event(&self, event: native::AlgoOrderUpdateEvent) -> Result<Order> {
562        let spec = self.require_native_symbol(&event.order.symbol)?;
563        let filled_quantity =
564            parse_optional_decimal_or_empty(event.order.executed_quantity.as_deref())?
565                .unwrap_or(Decimal::ZERO);
566        let created_at = event
567            .order
568            .trigger_time
569            .filter(|time| *time > 0)
570            .unwrap_or(event.transaction_time);
571        Ok(Order {
572            order_id: binance_algo_order_id(event.order.algo_id),
573            client_order_id: Some(event.order.client_algo_id.into()),
574            instrument_id: spec.instrument_id.clone(),
575            side: parse_side(&event.order.side)?,
576            order_type: parse_order_type(&event.order.order_type)?,
577            time_in_force: parse_optional_time_in_force(event.order.time_in_force.as_deref())?,
578            status: parse_algo_order_status(&event.order.algo_status, filled_quantity),
579            price: parse_optional_price_or_empty(event.order.price.as_deref())?,
580            quantity: Quantity::new(parse_decimal(&event.order.quantity)?),
581            filled_quantity: Quantity::new(filled_quantity),
582            average_fill_price: parse_optional_price_or_empty(
583                event.order.average_price.as_deref(),
584            )?,
585            reduce_only: event.order.reduce_only,
586            post_only: matches!(event.order.time_in_force.as_deref(), Some("GTX")),
587            created_at: TimestampMs::new(created_at),
588            updated_at: TimestampMs::new(event.event_time.max(event.transaction_time)),
589            venue_status: Some(event.order.algo_status.into()),
590        })
591    }
592
593    fn execution_from_snapshot(&self, snapshot: native::UserTradeSnapshot) -> Result<Execution> {
594        let spec = self.require_native_symbol(&snapshot.symbol)?;
595        Ok(Execution {
596            execution_id: TradeId::from(snapshot.id.to_string()),
597            order_id: OrderId::from(snapshot.order_id.to_string()),
598            client_order_id: None,
599            instrument_id: spec.instrument_id.clone(),
600            side: parse_side(&snapshot.side)?,
601            quantity: Quantity::new(parse_decimal(&snapshot.qty)?),
602            price: Price::new(parse_decimal(&snapshot.price)?),
603            fee: Some(balance_amount(&snapshot.commission)?),
604            fee_asset: Some(AssetCode::from(snapshot.commission_asset)),
605            liquidity: Some(if snapshot.maker {
606                bat_markets_core::Liquidity::Maker
607            } else {
608                bat_markets_core::Liquidity::Taker
609            }),
610            executed_at: TimestampMs::new(snapshot.time),
611        })
612    }
613
614    fn require_native_symbol(&self, native_symbol: &str) -> Result<InstrumentSpec> {
615        self.resolve_native_symbol(native_symbol).ok_or_else(|| {
616            MarketError::new(
617                ErrorKind::Unsupported,
618                format!("unsupported binance symbol '{native_symbol}'"),
619            )
620            .with_venue(Venue::Binance, Product::LinearUsdt)
621        })
622    }
623}
624
625impl VenueAdapter for BinanceLinearFuturesAdapter {
626    fn venue(&self) -> Venue {
627        Venue::Binance
628    }
629
630    fn product(&self) -> Product {
631        Product::LinearUsdt
632    }
633
634    fn config(&self) -> &BatMarketsConfig {
635        &self.config
636    }
637
638    fn capabilities(&self) -> CapabilitySet {
639        self.capabilities
640    }
641
642    fn lane_set(&self) -> bat_markets_core::LaneSet {
643        self.lane_set
644    }
645
646    fn instrument_specs(&self) -> Vec<InstrumentSpec> {
647        self.instruments.read().all()
648    }
649
650    fn resolve_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentSpec> {
651        self.instruments.read().get(instrument_id)
652    }
653
654    fn resolve_native_symbol(&self, native_symbol: &str) -> Option<InstrumentSpec> {
655        self.instruments.read().by_native_symbol(native_symbol)
656    }
657
658    fn parse_public(&self, payload: &str) -> Result<Vec<PublicLaneEvent>> {
659        if let Ok(snapshot) = serde_json::from_str::<native::OpenInterestSnapshot>(payload) {
660            let spec = self.require_native_symbol(&snapshot.symbol)?;
661            let value = Quantity::new(parse_decimal(&snapshot.open_interest)?);
662            return Ok(vec![PublicLaneEvent::OpenInterest(OpenInterest {
663                instrument_id: spec.instrument_id.clone(),
664                value,
665                event_time: TimestampMs::new(snapshot.time),
666            })]);
667        }
668
669        let message = self.parse_native_public(payload)?;
670        match message {
671            native::PublicMessage::Ticker(event) => {
672                let spec = self.require_native_symbol(&event.symbol)?;
673                Ok(vec![PublicLaneEvent::Ticker(FastTicker {
674                    instrument_id: spec.instrument_id.clone(),
675                    last_price: Price::new(parse_decimal(&event.last_price)?)
676                        .quantize(spec.price_scale)?,
677                    mark_price: None,
678                    index_price: None,
679                    volume_24h: Some(
680                        Quantity::new(parse_decimal(&event.volume_24h)?)
681                            .quantize(spec.qty_scale)?,
682                    ),
683                    turnover_24h: quantize_optional_notional(
684                        parse_decimal(&event.quote_volume_24h)?,
685                        spec.quote_scale,
686                    ),
687                    event_time: TimestampMs::new(event.event_time),
688                })])
689            }
690            native::PublicMessage::AggTrade(event) => {
691                let spec = self.require_native_symbol(&event.symbol)?;
692                Ok(vec![PublicLaneEvent::Trade(FastTrade {
693                    instrument_id: spec.instrument_id.clone(),
694                    trade_id: TradeId::from(event.agg_trade_id.to_string()),
695                    price: Price::new(parse_decimal(&event.price)?).quantize(spec.price_scale)?,
696                    quantity: Quantity::new(parse_decimal(&event.quantity)?)
697                        .quantize(spec.qty_scale)?,
698                    aggressor_side: if event.is_buyer_maker {
699                        AggressorSide::Seller
700                    } else {
701                        AggressorSide::Buyer
702                    },
703                    event_time: TimestampMs::new(event.trade_time),
704                })])
705            }
706            native::PublicMessage::BookTicker(event) => {
707                let spec = self.require_native_symbol(&event.symbol)?;
708                Ok(vec![PublicLaneEvent::BookTop(FastBookTop {
709                    instrument_id: spec.instrument_id.clone(),
710                    bid_price: Price::new(parse_decimal(&event.best_bid_price)?)
711                        .quantize(spec.price_scale)?,
712                    bid_quantity: Quantity::new(parse_decimal(&event.best_bid_qty)?)
713                        .quantize(spec.qty_scale)?,
714                    ask_price: Price::new(parse_decimal(&event.best_ask_price)?)
715                        .quantize(spec.price_scale)?,
716                    ask_quantity: Quantity::new(parse_decimal(&event.best_ask_qty)?)
717                        .quantize(spec.qty_scale)?,
718                    event_time: TimestampMs::new(event.transaction_time),
719                })])
720            }
721            native::PublicMessage::Depth(event) => {
722                let spec = self.require_native_symbol(&event.symbol)?;
723                let best_bid = event.bids.first().ok_or_else(|| {
724                    MarketError::new(ErrorKind::DecodeError, "missing binance best bid")
725                })?;
726                let best_ask = event.asks.first().ok_or_else(|| {
727                    MarketError::new(ErrorKind::DecodeError, "missing binance best ask")
728                })?;
729                Ok(vec![
730                    PublicLaneEvent::BookTop(FastBookTop {
731                        instrument_id: spec.instrument_id.clone(),
732                        bid_price: Price::new(parse_decimal(&best_bid[0])?)
733                            .quantize(spec.price_scale)?,
734                        bid_quantity: Quantity::new(parse_decimal(&best_bid[1])?)
735                            .quantize(spec.qty_scale)?,
736                        ask_price: Price::new(parse_decimal(&best_ask[0])?)
737                            .quantize(spec.price_scale)?,
738                        ask_quantity: Quantity::new(parse_decimal(&best_ask[1])?)
739                            .quantize(spec.qty_scale)?,
740                        event_time: TimestampMs::new(event.event_time),
741                    }),
742                    PublicLaneEvent::OrderBookDelta(FastOrderBookDelta {
743                        instrument_id: spec.instrument_id.clone(),
744                        bids: event
745                            .bids
746                            .iter()
747                            .map(|level| {
748                                Ok((
749                                    Price::new(parse_decimal(&level[0])?)
750                                        .quantize(spec.price_scale)?,
751                                    Quantity::new(parse_decimal(&level[1])?)
752                                        .quantize(spec.qty_scale)?,
753                                ))
754                            })
755                            .collect::<Result<Vec<_>>>()?,
756                        asks: event
757                            .asks
758                            .iter()
759                            .map(|level| {
760                                Ok((
761                                    Price::new(parse_decimal(&level[0])?)
762                                        .quantize(spec.price_scale)?,
763                                    Quantity::new(parse_decimal(&level[1])?)
764                                        .quantize(spec.qty_scale)?,
765                                ))
766                            })
767                            .collect::<Result<Vec<_>>>()?,
768                        event_time: TimestampMs::new(event.event_time),
769                    }),
770                ])
771            }
772            native::PublicMessage::Kline(event) => {
773                let spec = self.require_native_symbol(&event.symbol)?;
774                Ok(vec![PublicLaneEvent::Kline(FastKline {
775                    instrument_id: spec.instrument_id.clone(),
776                    interval: event.kline.interval.into(),
777                    open: Price::new(parse_decimal(&event.kline.open)?)
778                        .quantize(spec.price_scale)?,
779                    high: Price::new(parse_decimal(&event.kline.high)?)
780                        .quantize(spec.price_scale)?,
781                    low: Price::new(parse_decimal(&event.kline.low)?).quantize(spec.price_scale)?,
782                    close: Price::new(parse_decimal(&event.kline.close)?)
783                        .quantize(spec.price_scale)?,
784                    volume: Quantity::new(parse_decimal(&event.kline.volume)?)
785                        .quantize(spec.qty_scale)?,
786                    open_time: TimestampMs::new(event.kline.open_time),
787                    close_time: TimestampMs::new(event.kline.close_time),
788                    closed: event.kline.closed,
789                })])
790            }
791            native::PublicMessage::MarkPrice(event) => {
792                let spec = self.require_native_symbol(&event.symbol)?;
793                Ok(vec![
794                    PublicLaneEvent::MarkPrice(FastMarkPrice {
795                        instrument_id: spec.instrument_id.clone(),
796                        price: Price::new(parse_decimal(&event.mark_price)?)
797                            .quantize(spec.price_scale)?,
798                        funding_rate: Some(Rate::new(parse_decimal(&event.funding_rate)?)),
799                        event_time: TimestampMs::new(event.event_time),
800                    }),
801                    PublicLaneEvent::FundingRate(FundingRate {
802                        instrument_id: spec.instrument_id.clone(),
803                        value: Rate::new(parse_decimal(&event.funding_rate)?),
804                        mark_price: Some(Price::new(parse_decimal(&event.mark_price)?)),
805                        event_time: TimestampMs::new(event.event_time),
806                    }),
807                ])
808            }
809            native::PublicMessage::ForceOrder(event) => {
810                let spec = self.require_native_symbol(&event.order.symbol)?;
811                let quantity_raw = if event.order.cumulative_filled_qty.is_empty() {
812                    &event.order.quantity
813                } else {
814                    &event.order.cumulative_filled_qty
815                };
816                let price_raw = if event.order.average_price == "0" {
817                    &event.order.price
818                } else {
819                    &event.order.average_price
820                };
821                Ok(vec![PublicLaneEvent::Liquidation(
822                    bat_markets_core::FastLiquidation {
823                        instrument_id: spec.instrument_id.clone(),
824                        side: parse_side(&event.order.side)?,
825                        price: Price::new(parse_decimal(price_raw)?).quantize(spec.price_scale)?,
826                        quantity: Quantity::new(parse_decimal(quantity_raw)?)
827                            .quantize(spec.qty_scale)?,
828                        event_time: TimestampMs::new(event.order.trade_time.max(event.event_time)),
829                    },
830                )])
831            }
832        }
833    }
834
835    fn parse_private(&self, payload: &str) -> Result<Vec<PrivateLaneEvent>> {
836        let message = self.parse_native_private(payload)?;
837        match message {
838            native::PrivateMessage::AccountUpdate(event) => {
839                let mut events = Vec::new();
840
841                for balance in event.account.balances {
842                    events.push(PrivateLaneEvent::Balance(Balance {
843                        asset: AssetCode::from(balance.asset),
844                        wallet_balance: balance_amount(&balance.wallet_balance)?,
845                        available_balance: balance_amount(&balance.cross_wallet_balance)?,
846                        updated_at: TimestampMs::new(event.transaction_time),
847                    }));
848                }
849
850                for position in event.account.positions {
851                    let spec = self.require_native_symbol(&position.symbol)?;
852                    let size = parse_decimal(&position.position_amount)?;
853                    events.push(PrivateLaneEvent::Position(Position {
854                        position_id: PositionId::from(format!(
855                            "binance:{}:{}",
856                            position.symbol, position.position_side
857                        )),
858                        instrument_id: spec.instrument_id.clone(),
859                        direction: decimal_direction(size),
860                        size: Quantity::new(size.abs()),
861                        entry_price: parse_optional_decimal(position.entry_price.as_deref())?
862                            .map(Price::new),
863                        mark_price: None,
864                        unrealized_pnl: Some(balance_amount(&position.unrealized_pnl)?),
865                        leverage: None,
866                        margin_mode: parse_margin_mode(&position.margin_type)?,
867                        position_mode: parse_position_mode(&position.position_side),
868                        updated_at: TimestampMs::new(event.event_time),
869                    }));
870                }
871
872                Ok(events)
873            }
874            native::PrivateMessage::OrderTradeUpdate(event) => {
875                let spec = self.require_native_symbol(&event.order.symbol)?;
876                let mut events = Vec::new();
877                let order_id = OrderId::from(event.order.order_id.to_string());
878                let client_order_id = Some(event.order.client_order_id.clone().into());
879                let average_fill_price =
880                    parse_optional_decimal(event.order.average_price.as_deref())?.map(Price::new);
881                let updated_at = TimestampMs::new(event.order.trade_time);
882
883                events.push(PrivateLaneEvent::Order(Order {
884                    order_id: order_id.clone(),
885                    client_order_id: client_order_id.clone(),
886                    instrument_id: spec.instrument_id.clone(),
887                    side: parse_side(&event.order.side)?,
888                    order_type: parse_order_type(&event.order.order_type)?,
889                    time_in_force: Some(parse_time_in_force(&event.order.time_in_force)?),
890                    status: parse_order_status(&event.order.order_status)?,
891                    price: Some(Price::new(parse_decimal(&event.order.price)?)),
892                    quantity: Quantity::new(parse_decimal(&event.order.original_quantity)?),
893                    filled_quantity: Quantity::new(parse_decimal(
894                        &event.order.cumulative_filled_qty,
895                    )?),
896                    average_fill_price,
897                    reduce_only: event.order.reduce_only,
898                    post_only: false,
899                    created_at: TimestampMs::new(
900                        event
901                            .order
902                            .order_trade_time
903                            .unwrap_or(event.order.trade_time),
904                    ),
905                    updated_at,
906                    venue_status: Some(event.order.execution_type.into()),
907                }));
908
909                if parse_decimal(&event.order.last_filled_qty)? > Decimal::ZERO {
910                    events.push(PrivateLaneEvent::Execution(Execution {
911                        execution_id: TradeId::from(
912                            event
913                                .order
914                                .trade_id
915                                .unwrap_or(event.order.trade_time)
916                                .to_string(),
917                        ),
918                        order_id,
919                        client_order_id,
920                        instrument_id: spec.instrument_id.clone(),
921                        side: parse_side(&event.order.side)?,
922                        quantity: Quantity::new(parse_decimal(&event.order.last_filled_qty)?),
923                        price: Price::new(parse_decimal(&event.order.last_filled_price)?),
924                        fee: parse_optional_decimal(event.order.commission.as_deref())?
925                            .map(Into::into),
926                        fee_asset: event.order.commission_asset.map(AssetCode::from),
927                        liquidity: None,
928                        executed_at: updated_at,
929                    }));
930                }
931
932                Ok(events)
933            }
934            native::PrivateMessage::TradeLite(event) => {
935                let spec = self.require_native_symbol(&event.symbol)?;
936                Ok(vec![PrivateLaneEvent::Execution(Execution {
937                    execution_id: TradeId::from(event.trade_id.to_string()),
938                    order_id: OrderId::from(event.order_id.to_string()),
939                    client_order_id: Some(event.client_order_id.into()),
940                    instrument_id: spec.instrument_id.clone(),
941                    side: parse_side(&event.side)?,
942                    quantity: Quantity::new(parse_decimal(&event.last_filled_qty)?),
943                    price: Price::new(parse_decimal(&event.last_filled_price)?),
944                    fee: None,
945                    fee_asset: None,
946                    liquidity: Some(if event.is_maker {
947                        Liquidity::Maker
948                    } else {
949                        Liquidity::Taker
950                    }),
951                    executed_at: TimestampMs::new(event.trade_time.max(event.event_time)),
952                })])
953            }
954            native::PrivateMessage::AlgoUpdate(event) => Ok(vec![PrivateLaneEvent::Order(
955                self.algo_order_from_update_event(*event)?,
956            )]),
957        }
958    }
959
960    fn classify_command(
961        &self,
962        operation: CommandOperation,
963        payload: Option<&str>,
964        request_id: Option<RequestId>,
965    ) -> Result<CommandReceipt> {
966        let Some(payload) = payload else {
967            return Ok(CommandReceipt {
968                operation,
969                status: CommandStatus::UnknownExecution,
970                venue: Venue::Binance,
971                product: Product::LinearUsdt,
972                instrument_id: None,
973                order_id: None,
974                client_order_id: None,
975                request_id,
976                message: Some("command outcome requires reconcile".into()),
977                native_code: None,
978                retriable: true,
979            });
980        };
981
982        if let Ok(value) = serde_json::from_str::<serde_json::Value>(payload)
983            && let Some(error) = value.get("error").cloned()
984            && let Ok(error) = serde_json::from_value::<native::ErrorResponse>(error)
985        {
986            return Ok(CommandReceipt {
987                operation,
988                status: CommandStatus::Rejected,
989                venue: Venue::Binance,
990                product: Product::LinearUsdt,
991                instrument_id: None,
992                order_id: None,
993                client_order_id: None,
994                request_id,
995                message: Some(error.message.into()),
996                native_code: Some(error.code.to_string().into()),
997                retriable: false,
998            });
999        }
1000
1001        if let Ok(error) = serde_json::from_str::<native::ErrorResponse>(payload) {
1002            return Ok(CommandReceipt {
1003                operation,
1004                status: CommandStatus::Rejected,
1005                venue: Venue::Binance,
1006                product: Product::LinearUsdt,
1007                instrument_id: None,
1008                order_id: None,
1009                client_order_id: None,
1010                request_id,
1011                message: Some(error.message.into()),
1012                native_code: Some(error.code.to_string().into()),
1013                retriable: false,
1014            });
1015        }
1016
1017        match operation {
1018            CommandOperation::CreateOrder
1019            | CommandOperation::AmendOrder
1020            | CommandOperation::CancelOrder
1021            | CommandOperation::ClosePosition
1022            | CommandOperation::GetOrder => {
1023                let response = parse_binance_command_identity(payload).map_err(|error| {
1024                    MarketError::new(
1025                        ErrorKind::DecodeError,
1026                        format!("failed to classify binance order response: {error}"),
1027                    )
1028                    .with_venue(Venue::Binance, Product::LinearUsdt)
1029                    .with_operation("binance.classify_command")
1030                })?;
1031                let (symbol, order_id, client_order_id) = match response {
1032                    BinanceAcceptedCommand::Order(response) => (
1033                        Some(response.symbol),
1034                        Some(OrderId::from(response.order_id.to_string())),
1035                        Some(response.client_order_id.into()),
1036                    ),
1037                    BinanceAcceptedCommand::AlgoOrder(response) => (
1038                        Some(response.symbol),
1039                        Some(binance_algo_order_id(response.algo_id)),
1040                        Some(response.client_algo_id.into()),
1041                    ),
1042                    BinanceAcceptedCommand::CancelAlgo(response) => (
1043                        None,
1044                        Some(binance_algo_order_id(response.algo_id)),
1045                        Some(response.client_algo_id.into()),
1046                    ),
1047                };
1048                let instrument_id = match symbol {
1049                    Some(symbol) => {
1050                        Some(self.require_native_symbol(&symbol)?.instrument_id.clone())
1051                    }
1052                    None => None,
1053                };
1054                Ok(CommandReceipt {
1055                    operation,
1056                    status: CommandStatus::Accepted,
1057                    venue: Venue::Binance,
1058                    product: Product::LinearUsdt,
1059                    instrument_id,
1060                    order_id,
1061                    client_order_id,
1062                    request_id,
1063                    message: Some("accepted".into()),
1064                    native_code: None,
1065                    retriable: false,
1066                })
1067            }
1068            CommandOperation::CreateOrders
1069            | CommandOperation::AmendOrders
1070            | CommandOperation::CancelOrders
1071            | CommandOperation::CancelAllOrders
1072            | CommandOperation::ValidateOrder
1073            | CommandOperation::SetPositionMode => Ok(CommandReceipt {
1074                operation,
1075                status: CommandStatus::Accepted,
1076                venue: Venue::Binance,
1077                product: Product::LinearUsdt,
1078                instrument_id: None,
1079                order_id: None,
1080                client_order_id: None,
1081                request_id,
1082                message: Some("accepted".into()),
1083                native_code: None,
1084                retriable: false,
1085            }),
1086            CommandOperation::SetLeverage => {
1087                let response = serde_json::from_str::<native::SetLeverageResponse>(payload)
1088                    .map_err(|error| {
1089                        MarketError::new(
1090                            ErrorKind::DecodeError,
1091                            format!("failed to classify binance leverage response: {error}"),
1092                        )
1093                    })?;
1094                let spec = self.require_native_symbol(&response.symbol)?;
1095                Ok(CommandReceipt {
1096                    operation,
1097                    status: CommandStatus::Accepted,
1098                    venue: Venue::Binance,
1099                    product: Product::LinearUsdt,
1100                    instrument_id: Some(spec.instrument_id.clone()),
1101                    order_id: None,
1102                    client_order_id: None,
1103                    request_id,
1104                    message: Some(format!("leverage set to {}", response.leverage).into()),
1105                    native_code: None,
1106                    retriable: false,
1107                })
1108            }
1109            CommandOperation::SetMarginMode => {
1110                let response =
1111                    serde_json::from_str::<native::SuccessResponse>(payload).map_err(|error| {
1112                        MarketError::new(
1113                            ErrorKind::DecodeError,
1114                            format!("failed to classify binance margin-mode response: {error}"),
1115                        )
1116                    })?;
1117                Ok(CommandReceipt {
1118                    operation,
1119                    status: CommandStatus::Accepted,
1120                    venue: Venue::Binance,
1121                    product: Product::LinearUsdt,
1122                    instrument_id: None,
1123                    order_id: None,
1124                    client_order_id: None,
1125                    request_id,
1126                    message: Some(response.message.into()),
1127                    native_code: response.code.map(|value| value.to_string().into()),
1128                    retriable: false,
1129                })
1130            }
1131        }
1132    }
1133}
1134
1135enum BinanceAcceptedCommand {
1136    Order(native::OrderResponse),
1137    AlgoOrder(Box<native::AlgoOrderSnapshot>),
1138    CancelAlgo(native::CancelAlgoOrderResponse),
1139}
1140
1141fn parse_binance_command_identity(
1142    payload: &str,
1143) -> std::result::Result<BinanceAcceptedCommand, serde_json::Error> {
1144    if let Ok(response) = parse_binance_order_response(payload) {
1145        return Ok(BinanceAcceptedCommand::Order(response));
1146    }
1147    if let Ok(response) = parse_binance_algo_order_response(payload) {
1148        return Ok(BinanceAcceptedCommand::AlgoOrder(Box::new(response)));
1149    }
1150    parse_binance_cancel_algo_order_response(payload).map(BinanceAcceptedCommand::CancelAlgo)
1151}
1152
1153fn parse_binance_order_response(
1154    payload: &str,
1155) -> std::result::Result<native::OrderResponse, serde_json::Error> {
1156    if let Ok(value) = serde_json::from_str::<serde_json::Value>(payload)
1157        && let Some(result) = value.get("result").cloned()
1158    {
1159        return serde_json::from_value(result);
1160    }
1161    serde_json::from_str(payload)
1162}
1163
1164fn parse_binance_algo_order_response(
1165    payload: &str,
1166) -> std::result::Result<native::AlgoOrderSnapshot, serde_json::Error> {
1167    serde_json::from_str(payload)
1168}
1169
1170fn parse_binance_cancel_algo_order_response(
1171    payload: &str,
1172) -> std::result::Result<native::CancelAlgoOrderResponse, serde_json::Error> {
1173    serde_json::from_str(payload)
1174}
1175
1176fn btc_spec() -> InstrumentSpec {
1177    instrument_spec(("BTC", "USDT", "USDT"), "BTCUSDT", 2, 3, 5, Some(125))
1178}
1179
1180fn eth_spec() -> InstrumentSpec {
1181    instrument_spec(("ETH", "USDT", "USDT"), "ETHUSDT", 2, 3, 5, Some(100))
1182}
1183
1184fn instrument_spec(
1185    assets: (&str, &str, &str),
1186    native_symbol: &str,
1187    price_scale: u32,
1188    qty_scale: u32,
1189    quote_scale: u32,
1190    max_leverage: Option<i64>,
1191) -> InstrumentSpec {
1192    let (base, quote, settle) = assets;
1193    InstrumentSpec {
1194        venue: Venue::Binance,
1195        product: Product::LinearUsdt,
1196        market_type: MarketType::LinearPerpetual,
1197        instrument_id: InstrumentId::from(canonical_symbol(base, quote, settle)),
1198        canonical_symbol: canonical_symbol(base, quote, settle).into(),
1199        native_symbol: native_symbol.into(),
1200        base: AssetCode::from(base),
1201        quote: AssetCode::from(quote),
1202        settle: AssetCode::from(settle),
1203        contract_size: Quantity::new(Decimal::ONE),
1204        tick_size: Price::new(Decimal::new(1, price_scale)),
1205        step_size: Quantity::new(Decimal::new(1, qty_scale)),
1206        min_qty: Quantity::new(Decimal::new(1, qty_scale)),
1207        min_notional: Notional::new(Decimal::new(5, quote_scale)),
1208        price_scale,
1209        qty_scale,
1210        quote_scale,
1211        max_leverage: max_leverage.map(|value| Leverage::new(Decimal::new(value, 0))),
1212        support: InstrumentSupport {
1213            public_streams: true,
1214            private_trading: true,
1215            leverage_set: true,
1216            margin_mode_set: true,
1217            funding_rate: true,
1218            open_interest: true,
1219        },
1220        status: InstrumentStatus::Active,
1221    }
1222}
1223
1224fn canonical_symbol(base: &str, quote: &str, settle: &str) -> String {
1225    format!("{base}/{quote}:{settle}")
1226}
1227
1228fn parse_decimal(raw: &str) -> Result<Decimal> {
1229    raw.parse::<Decimal>().map_err(|error| {
1230        MarketError::new(
1231            ErrorKind::DecodeError,
1232            format!("invalid decimal '{raw}': {error}"),
1233        )
1234        .with_venue(Venue::Binance, Product::LinearUsdt)
1235    })
1236}
1237
1238fn parse_optional_decimal(raw: Option<&str>) -> Result<Option<Decimal>> {
1239    raw.map(parse_decimal).transpose()
1240}
1241
1242fn parse_optional_decimal_or_empty(raw: Option<&str>) -> Result<Option<Decimal>> {
1243    raw.filter(|value| !value.is_empty())
1244        .map(parse_decimal)
1245        .transpose()
1246}
1247
1248fn parse_optional_price_or_empty(raw: Option<&str>) -> Result<Option<Price>> {
1249    parse_optional_decimal_or_empty(raw).map(|value| {
1250        value.and_then(|price| {
1251            if price.is_zero() {
1252                None
1253            } else {
1254                Some(Price::new(price))
1255            }
1256        })
1257    })
1258}
1259
1260fn parse_side(raw: &str) -> Result<Side> {
1261    match raw {
1262        "BUY" => Ok(Side::Buy),
1263        "SELL" => Ok(Side::Sell),
1264        other => Err(MarketError::new(
1265            ErrorKind::DecodeError,
1266            format!("unsupported binance side '{other}'"),
1267        )),
1268    }
1269}
1270
1271fn parse_order_type(raw: &str) -> Result<OrderType> {
1272    match raw {
1273        "MARKET" => Ok(OrderType::Market),
1274        "LIMIT" => Ok(OrderType::Limit),
1275        "STOP_MARKET" => Ok(OrderType::StopMarket),
1276        "STOP" => Ok(OrderType::StopLimit),
1277        "TAKE_PROFIT_MARKET" => Ok(OrderType::TakeProfitMarket),
1278        "TAKE_PROFIT" => Ok(OrderType::TakeProfitLimit),
1279        other => Err(MarketError::new(
1280            ErrorKind::DecodeError,
1281            format!("unsupported binance order type '{other}'"),
1282        )),
1283    }
1284}
1285
1286fn parse_time_in_force(raw: &str) -> Result<TimeInForce> {
1287    match raw {
1288        "GTC" => Ok(TimeInForce::Gtc),
1289        "IOC" => Ok(TimeInForce::Ioc),
1290        "FOK" => Ok(TimeInForce::Fok),
1291        "GTX" => Ok(TimeInForce::PostOnly),
1292        other => Err(MarketError::new(
1293            ErrorKind::DecodeError,
1294            format!("unsupported binance time in force '{other}'"),
1295        )),
1296    }
1297}
1298
1299fn parse_optional_time_in_force(raw: Option<&str>) -> Result<Option<TimeInForce>> {
1300    raw.filter(|value| !value.is_empty())
1301        .map(parse_time_in_force)
1302        .transpose()
1303}
1304
1305fn parse_order_status(raw: &str) -> Result<OrderStatus> {
1306    match raw {
1307        "NEW" => Ok(OrderStatus::New),
1308        "PARTIALLY_FILLED" => Ok(OrderStatus::PartiallyFilled),
1309        "FILLED" => Ok(OrderStatus::Filled),
1310        "CANCELED" => Ok(OrderStatus::Canceled),
1311        "REJECTED" => Ok(OrderStatus::Rejected),
1312        "EXPIRED" => Ok(OrderStatus::Expired),
1313        "PENDING_CANCEL" => Ok(OrderStatus::PendingCancel),
1314        other => Err(MarketError::new(
1315            ErrorKind::DecodeError,
1316            format!("unsupported binance order status '{other}'"),
1317        )),
1318    }
1319}
1320
1321fn parse_algo_order_status(raw: &str, filled_quantity: Decimal) -> OrderStatus {
1322    match raw {
1323        "NEW" => OrderStatus::New,
1324        "CANCELED" => OrderStatus::Canceled,
1325        "TRIGGERING" | "TRIGGERED" => {
1326            if filled_quantity > Decimal::ZERO {
1327                OrderStatus::PartiallyFilled
1328            } else {
1329                OrderStatus::New
1330            }
1331        }
1332        "FINISHED" => {
1333            if filled_quantity > Decimal::ZERO {
1334                OrderStatus::Filled
1335            } else {
1336                OrderStatus::Canceled
1337            }
1338        }
1339        "REJECTED" => OrderStatus::Rejected,
1340        "EXPIRED" => OrderStatus::Expired,
1341        _ => OrderStatus::New,
1342    }
1343}
1344
1345fn binance_algo_order_id(algo_id: i64) -> OrderId {
1346    OrderId::from(format!("binance-algo:{algo_id}"))
1347}
1348
1349fn parse_margin_mode(raw: &str) -> Result<MarginMode> {
1350    match raw {
1351        "isolated" | "ISOLATED" => Ok(MarginMode::Isolated),
1352        "cross" | "crossed" | "CROSSED" => Ok(MarginMode::Cross),
1353        other => Err(MarketError::new(
1354            ErrorKind::DecodeError,
1355            format!("unsupported binance margin type '{other}'"),
1356        )),
1357    }
1358}
1359
1360fn parse_margin_mode_snapshot(
1361    raw: Option<&str>,
1362    isolated: Option<bool>,
1363    isolated_margin: Option<&str>,
1364    isolated_wallet: Option<&str>,
1365) -> Result<MarginMode> {
1366    if let Some(raw) = raw {
1367        return parse_margin_mode(raw);
1368    }
1369
1370    if let Some(isolated) = isolated {
1371        return Ok(if isolated {
1372            MarginMode::Isolated
1373        } else {
1374            MarginMode::Cross
1375        });
1376    }
1377
1378    let isolated_margin = parse_optional_decimal(isolated_margin)?;
1379    let isolated_wallet = parse_optional_decimal(isolated_wallet)?;
1380    if isolated_margin.is_some() || isolated_wallet.is_some() {
1381        return Ok(
1382            if isolated_margin.unwrap_or_default().is_zero()
1383                && isolated_wallet.unwrap_or_default().is_zero()
1384            {
1385                MarginMode::Cross
1386            } else {
1387                MarginMode::Isolated
1388            },
1389        );
1390    }
1391
1392    Err(MarketError::new(
1393        ErrorKind::DecodeError,
1394        "missing binance margin mode in account snapshot",
1395    ))
1396}
1397
1398fn parse_position_mode(raw: &str) -> PositionMode {
1399    match raw {
1400        "LONG" | "SHORT" => PositionMode::Hedge,
1401        _ => PositionMode::OneWay,
1402    }
1403}
1404
1405fn parse_instrument_status(raw: &str) -> InstrumentStatus {
1406    match raw {
1407        "TRADING" => InstrumentStatus::Active,
1408        "SETTLING" | "CLOSE" | "PENDING_TRADING" => InstrumentStatus::Halted,
1409        _ => InstrumentStatus::Halted,
1410    }
1411}
1412
1413fn decimal_direction(value: Decimal) -> PositionDirection {
1414    if value > Decimal::ZERO {
1415        PositionDirection::Long
1416    } else if value < Decimal::ZERO {
1417        PositionDirection::Short
1418    } else {
1419        PositionDirection::Flat
1420    }
1421}
1422
1423fn balance_amount(raw: &str) -> Result<bat_markets_core::Amount> {
1424    parse_decimal(raw).map(Into::into)
1425}
1426
1427fn quantize_optional_notional(
1428    value: Decimal,
1429    scale: u32,
1430) -> Option<bat_markets_core::FastNotional> {
1431    Notional::new(value).quantize(scale).ok()
1432}
1433
1434fn decimal_scale(value: Decimal) -> u32 {
1435    value.normalize().scale()
1436}
1437
1438fn require_filter_decimal(
1439    filters: &[native::ExchangeFilter],
1440    filter_type: &str,
1441    select: impl Fn(&native::ExchangeFilter) -> Option<&str>,
1442) -> Result<Decimal> {
1443    let raw = filters
1444        .iter()
1445        .find(|filter| filter.filter_type == filter_type)
1446        .and_then(select)
1447        .ok_or_else(|| {
1448            MarketError::new(
1449                ErrorKind::DecodeError,
1450                format!("missing binance {filter_type} filter"),
1451            )
1452        })?;
1453    parse_decimal(raw)
1454}
1455
1456fn parse_binance_kline_row(
1457    spec: &InstrumentSpec,
1458    interval: KlineInterval,
1459    row: Vec<serde_json::Value>,
1460) -> Result<Kline> {
1461    if row.len() < 7 {
1462        return Err(MarketError::new(
1463            ErrorKind::DecodeError,
1464            format!(
1465                "binance kline row has {} fields, expected at least 7",
1466                row.len()
1467            ),
1468        )
1469        .with_venue(Venue::Binance, Product::LinearUsdt));
1470    }
1471
1472    let open_time = parse_i64_value(&row[0], "open_time")?;
1473    let close_time = parse_i64_value(&row[6], "close_time")?;
1474
1475    Ok(Kline {
1476        instrument_id: spec.instrument_id.clone(),
1477        interval: Box::<str>::from(interval),
1478        open: spec.price_from_fast(
1479            Price::new(parse_decimal(parse_str_value(&row[1], "open")?)?)
1480                .quantize(spec.price_scale)?,
1481        ),
1482        high: spec.price_from_fast(
1483            Price::new(parse_decimal(parse_str_value(&row[2], "high")?)?)
1484                .quantize(spec.price_scale)?,
1485        ),
1486        low: spec.price_from_fast(
1487            Price::new(parse_decimal(parse_str_value(&row[3], "low")?)?)
1488                .quantize(spec.price_scale)?,
1489        ),
1490        close: spec.price_from_fast(
1491            Price::new(parse_decimal(parse_str_value(&row[4], "close")?)?)
1492                .quantize(spec.price_scale)?,
1493        ),
1494        volume: spec.quantity_from_fast(
1495            Quantity::new(parse_decimal(parse_str_value(&row[5], "volume")?)?)
1496                .quantize(spec.qty_scale)?,
1497        ),
1498        open_time: TimestampMs::new(open_time),
1499        close_time: TimestampMs::new(close_time),
1500        closed: close_time < now_timestamp_ms(),
1501    })
1502}
1503
1504fn parse_i64_value(value: &serde_json::Value, label: &str) -> Result<i64> {
1505    match value {
1506        serde_json::Value::Number(number) => number.as_i64().ok_or_else(|| {
1507            MarketError::new(
1508                ErrorKind::DecodeError,
1509                format!("invalid numeric value for binance {label}"),
1510            )
1511            .with_venue(Venue::Binance, Product::LinearUsdt)
1512        }),
1513        serde_json::Value::String(raw) => raw.parse::<i64>().map_err(|error| {
1514            MarketError::new(
1515                ErrorKind::DecodeError,
1516                format!("invalid i64 '{raw}' for binance {label}: {error}"),
1517            )
1518            .with_venue(Venue::Binance, Product::LinearUsdt)
1519        }),
1520        other => Err(MarketError::new(
1521            ErrorKind::DecodeError,
1522            format!("unsupported binance {label} representation: {other}"),
1523        )
1524        .with_venue(Venue::Binance, Product::LinearUsdt)),
1525    }
1526}
1527
1528fn parse_str_value<'a>(value: &'a serde_json::Value, label: &str) -> Result<&'a str> {
1529    match value {
1530        serde_json::Value::String(raw) => Ok(raw),
1531        other => Err(MarketError::new(
1532            ErrorKind::DecodeError,
1533            format!("unsupported binance {label} representation: {other}"),
1534        )
1535        .with_venue(Venue::Binance, Product::LinearUsdt)),
1536    }
1537}
1538
1539fn now_timestamp_ms() -> i64 {
1540    SystemTime::now()
1541        .duration_since(UNIX_EPOCH)
1542        .map(|duration| duration.as_millis().min(i128::from(i64::MAX) as u128) as i64)
1543        .unwrap_or(0)
1544}
1545
1546#[cfg(test)]
1547mod tests {
1548    use super::BinanceLinearFuturesAdapter;
1549    use bat_markets_core::{
1550        FetchOhlcvRequest, FetchTradesRequest, InstrumentId, OrderStatus, TimestampMs, VenueAdapter,
1551    };
1552
1553    const USER_TRADES: &str = include_str!(concat!(
1554        env!("CARGO_MANIFEST_DIR"),
1555        "/../../fixtures/binance/user_trades.json"
1556    ));
1557    const ORDER_HISTORY: &str = include_str!(concat!(
1558        env!("CARGO_MANIFEST_DIR"),
1559        "/../../fixtures/binance/order_history.json"
1560    ));
1561
1562    #[test]
1563    fn parse_binance_execution_history_snapshot() {
1564        let adapter = BinanceLinearFuturesAdapter::new();
1565        let executions = adapter
1566            .parse_executions_snapshot(USER_TRADES)
1567            .expect("binance user trades fixture should parse");
1568        assert_eq!(executions.len(), 1);
1569        assert_eq!(executions[0].execution_id.to_string(), "880001");
1570    }
1571
1572    #[test]
1573    fn parse_binance_order_history_snapshot() {
1574        let adapter = BinanceLinearFuturesAdapter::new();
1575        let orders = adapter
1576            .parse_order_history_snapshot(ORDER_HISTORY, TimestampMs::new(1))
1577            .expect("binance order history fixture should parse");
1578        assert_eq!(orders.len(), 1);
1579        assert_eq!(orders[0].status, OrderStatus::Filled);
1580    }
1581
1582    #[test]
1583    fn parse_binance_rest_ticker_snapshot() {
1584        let adapter = BinanceLinearFuturesAdapter::new();
1585        let ticker = adapter
1586            .parse_ticker_snapshot(
1587                r#"{
1588                    "symbol":"BTCUSDT",
1589                    "lastPrice":"70100.50",
1590                    "volume":"1234.567",
1591                    "quoteVolume":"86500000.12",
1592                    "closeTime":1710000000000
1593                }"#,
1594                &InstrumentId::from("BTC/USDT:USDT"),
1595            )
1596            .expect("binance rest ticker should parse");
1597        assert_eq!(ticker.last_price.to_string(), "70100.50");
1598    }
1599
1600    #[test]
1601    fn parse_binance_rest_trades_snapshot() {
1602        let adapter = BinanceLinearFuturesAdapter::new();
1603        let trades = adapter
1604            .parse_trades_snapshot(
1605                r#"[{"a":1,"p":"70100.10","q":"0.500","T":1710000000001,"m":true}]"#,
1606                &FetchTradesRequest::new(InstrumentId::from("BTC/USDT:USDT"), Some(1)),
1607            )
1608            .expect("binance rest trades should parse");
1609        assert_eq!(trades.len(), 1);
1610        assert_eq!(trades[0].price.to_string(), "70100.10");
1611    }
1612
1613    #[test]
1614    fn parse_binance_rest_book_top_snapshot() {
1615        let adapter = BinanceLinearFuturesAdapter::new();
1616        let book_top = adapter
1617            .parse_book_top_snapshot(
1618                r#"{
1619                    "symbol":"BTCUSDT",
1620                    "bidPrice":"70100.90",
1621                    "bidQty":"1.250",
1622                    "askPrice":"70101.10",
1623                    "askQty":"0.900",
1624                    "time":1710000000200
1625                }"#,
1626                &InstrumentId::from("BTC/USDT:USDT"),
1627            )
1628            .expect("binance rest book top should parse");
1629        assert_eq!(book_top.bid.price.to_string(), "70100.90");
1630        assert_eq!(book_top.ask.price.to_string(), "70101.10");
1631    }
1632
1633    #[test]
1634    fn parse_binance_ticker_drops_unrepresentable_turnover_instead_of_failing() {
1635        let adapter = BinanceLinearFuturesAdapter::new();
1636        let events = adapter
1637            .parse_public(
1638                r#"{
1639                    "e":"24hrTicker",
1640                    "E":1710000000000,
1641                    "s":"BTCUSDT",
1642                    "c":"64000.10",
1643                    "v":"12345.678",
1644                    "q":"100000000000000000000.00"
1645                }"#,
1646            )
1647            .expect("binance ticker with large quote turnover should still parse");
1648
1649        let ticker = match &events[0] {
1650            bat_markets_core::PublicLaneEvent::Ticker(ticker) => ticker,
1651            other => panic!("expected ticker event, got {other:?}"),
1652        };
1653        assert!(ticker.turnover_24h.is_none());
1654    }
1655
1656    #[test]
1657    fn parse_binance_private_order_update_without_order_create_time() {
1658        let adapter = BinanceLinearFuturesAdapter::new();
1659        let events = adapter
1660            .parse_private(
1661                r#"{
1662                    "e":"ORDER_TRADE_UPDATE",
1663                    "o":{
1664                        "s":"BTCUSDT",
1665                        "c":"codex-demo",
1666                        "S":"BUY",
1667                        "o":"LIMIT",
1668                        "f":"GTC",
1669                        "q":"0.002",
1670                        "p":"64000.10",
1671                        "ap":"0",
1672                        "x":"CANCELED",
1673                        "X":"CANCELED",
1674                        "i":123456,
1675                        "l":"0",
1676                        "z":"0",
1677                        "L":"0",
1678                        "n":null,
1679                        "N":null,
1680                        "T":1710000001234,
1681                        "t":null,
1682                        "R":false
1683                    }
1684                }"#,
1685            )
1686            .expect("private order update without O should parse");
1687
1688        let order = match &events[0] {
1689            bat_markets_core::PrivateLaneEvent::Order(order) => order,
1690            other => panic!("expected order event, got {other:?}"),
1691        };
1692        assert_eq!(order.created_at, TimestampMs::new(1710000001234));
1693        assert_eq!(order.updated_at, TimestampMs::new(1710000001234));
1694    }
1695
1696    #[test]
1697    fn parse_binance_private_trade_lite_execution() {
1698        let adapter = BinanceLinearFuturesAdapter::new();
1699        let events = adapter
1700            .parse_private(
1701                r#"{
1702                    "e":"TRADE_LITE",
1703                    "E":1776795392416,
1704                    "T":1776795392415,
1705                    "s":"BTCUSDT",
1706                    "q":"0.001",
1707                    "p":"0.000000",
1708                    "m":false,
1709                    "c":"bx-open-1776795391188",
1710                    "S":"BUY",
1711                    "L":"70050.10",
1712                    "l":"0.001",
1713                    "t":3317622935,
1714                    "i":96593497380
1715                }"#,
1716            )
1717            .expect("trade lite should parse");
1718
1719        assert_eq!(events.len(), 1);
1720        let bat_markets_core::PrivateLaneEvent::Execution(execution) = &events[0] else {
1721            panic!("expected execution event from TRADE_LITE");
1722        };
1723        assert_eq!(execution.instrument_id.as_ref(), "BTC/USDT:USDT");
1724        assert_eq!(
1725            execution.client_order_id.as_ref().map(ToString::to_string),
1726            Some("bx-open-1776795391188".to_owned())
1727        );
1728        assert_eq!(execution.quantity.value().to_string(), "0.001");
1729        assert_eq!(execution.price.value().to_string(), "70050.10");
1730        assert_eq!(
1731            execution.liquidity,
1732            Some(bat_markets_core::Liquidity::Taker)
1733        );
1734    }
1735
1736    #[test]
1737    fn parse_binance_account_snapshot_tolerates_missing_optional_position_fields() {
1738        let adapter = BinanceLinearFuturesAdapter::new();
1739        let (account, positions) = adapter
1740            .parse_account_snapshot(
1741                r#"{
1742                    "totalWalletBalance":"5000.0",
1743                    "availableBalance":"5000.0",
1744                    "totalUnrealizedProfit":"0.0",
1745                    "assets":[
1746                        {
1747                            "asset":"USDT",
1748                            "walletBalance":"5000.0",
1749                            "availableBalance":"5000.0"
1750                        }
1751                    ],
1752                    "positions":[
1753                        {
1754                            "symbol":"BTCUSDT",
1755                            "positionAmt":"0.0",
1756                            "positionSide":"BOTH"
1757                        },
1758                        {
1759                            "symbol":"BTCUSDT",
1760                            "positionAmt":"0.001",
1761                            "unrealizedProfit":"0.0",
1762                            "isolatedMargin":"0",
1763                            "isolatedWallet":"0",
1764                            "positionSide":"BOTH"
1765                        }
1766                    ]
1767                }"#,
1768                TimestampMs::new(42),
1769            )
1770            .expect("account snapshot with sparse position fields should still parse");
1771
1772        assert_eq!(account.balances.len(), 1);
1773        assert_eq!(positions.len(), 1);
1774        assert!(positions[0].entry_price.is_none());
1775        assert!(positions[0].leverage.is_none());
1776        assert_eq!(
1777            positions[0].margin_mode,
1778            bat_markets_core::MarginMode::Cross
1779        );
1780    }
1781
1782    #[test]
1783    fn parse_binance_ohlcv_snapshot() {
1784        let adapter = BinanceLinearFuturesAdapter::new();
1785        let klines = adapter
1786            .parse_ohlcv_snapshot(
1787                r#"[
1788                    [1710000000000,"64000.1","64100.0","63950.0","64050.0","12.345","1710000059999","0","0","0","0","0"],
1789                    [1710000060000,"64050.0","64150.0","64000.0","64100.0","23.456","1710000119999","0","0","0","0","0"]
1790                ]"#,
1791                &FetchOhlcvRequest::for_instrument(
1792                    InstrumentId::from("BTC/USDT:USDT"),
1793                    "1m",
1794                    None,
1795                    None,
1796                    Some(2),
1797                ),
1798            )
1799            .expect("binance klines snapshot should parse");
1800
1801        assert_eq!(klines.len(), 2);
1802        assert_eq!(klines[0].interval.as_ref(), "1m");
1803        assert_eq!(klines[0].open.to_string(), "64000.10");
1804        assert_eq!(klines[1].close.to_string(), "64100.00");
1805    }
1806}