1pub mod native;
4
5use std::{
6 sync::Arc,
7 time::{SystemTime, UNIX_EPOCH},
8};
9
10use parking_lot::RwLock;
11use rust_decimal::Decimal;
12
13use bat_markets_core::{
14 AccountSnapshot, AggressorSide, AssetCode, Balance, BatMarketsConfig, CapabilitySet,
15 CommandOperation, CommandReceipt, CommandStatus, ErrorKind, Execution, FastBookTop, FastKline,
16 FastMarkPrice, FastOrderBookDelta, FastTicker, FastTrade, FetchOhlcvRequest,
17 FetchTradesRequest, FundingRate, InstrumentCatalog, InstrumentId, InstrumentSpec,
18 InstrumentStatus, InstrumentSupport, Kline, KlineInterval, Leverage, Liquidity, MarginMode,
19 MarketError, MarketType, Notional, OpenInterest, Order, OrderId, OrderStatus, OrderType,
20 Position, PositionDirection, PositionId, PositionMode, Price, PrivateLaneEvent, Product,
21 PublicLaneEvent, Quantity, Rate, RequestId, Result, Side, Ticker, TimeInForce, TimestampMs,
22 TradeId, Venue, VenueAdapter,
23};
24
25#[derive(Clone, Debug)]
27pub struct BinanceLinearFuturesAdapter {
28 config: BatMarketsConfig,
29 capabilities: CapabilitySet,
30 lane_set: bat_markets_core::LaneSet,
31 instruments: Arc<RwLock<InstrumentCatalog>>,
32}
33
34impl Default for BinanceLinearFuturesAdapter {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl BinanceLinearFuturesAdapter {
41 #[must_use]
42 pub fn new() -> Self {
43 Self::with_config(BatMarketsConfig::new(Venue::Binance, Product::LinearUsdt))
44 }
45
46 #[must_use]
47 pub fn with_config(config: BatMarketsConfig) -> Self {
48 Self {
49 config,
50 capabilities: CapabilitySet::linear_futures_defaults(),
51 lane_set: bat_markets_core::LaneSet::linear_futures_defaults(),
52 instruments: Arc::new(RwLock::new(InstrumentCatalog::new([
53 btc_spec(),
54 eth_spec(),
55 ]))),
56 }
57 }
58
59 pub fn replace_instruments(&self, instruments: Vec<InstrumentSpec>) {
60 self.instruments.write().replace(instruments);
61 }
62
63 pub fn parse_native_public(&self, payload: &str) -> Result<native::PublicMessage> {
64 serde_json::from_str(payload).map_err(|error| {
65 MarketError::new(
66 ErrorKind::DecodeError,
67 format!("failed to parse binance public payload: {error}"),
68 )
69 .with_venue(Venue::Binance, Product::LinearUsdt)
70 .with_operation("binance.parse_native_public")
71 })
72 }
73
74 pub fn parse_native_private(&self, payload: &str) -> Result<native::PrivateMessage> {
75 serde_json::from_str(payload).map_err(|error| {
76 MarketError::new(
77 ErrorKind::DecodeError,
78 format!("failed to parse binance private payload: {error}"),
79 )
80 .with_venue(Venue::Binance, Product::LinearUsdt)
81 .with_operation("binance.parse_native_private")
82 })
83 }
84
85 pub fn parse_server_time(&self, payload: &str) -> Result<TimestampMs> {
86 let response =
87 serde_json::from_str::<native::ServerTimeResponse>(payload).map_err(|error| {
88 MarketError::new(
89 ErrorKind::DecodeError,
90 format!("failed to parse binance server-time response: {error}"),
91 )
92 })?;
93 Ok(TimestampMs::new(response.server_time))
94 }
95
96 pub fn parse_metadata_snapshot(&self, payload: &str) -> Result<Vec<InstrumentSpec>> {
97 let response =
98 serde_json::from_str::<native::ExchangeInfoResponse>(payload).map_err(|error| {
99 MarketError::new(
100 ErrorKind::DecodeError,
101 format!("failed to parse binance exchangeInfo response: {error}"),
102 )
103 .with_venue(Venue::Binance, Product::LinearUsdt)
104 .with_operation("binance.parse_metadata_snapshot")
105 })?;
106
107 let mut instruments = Vec::new();
108 for symbol in response.symbols {
109 if symbol.contract_type != "PERPETUAL" || symbol.quote_asset != "USDT" {
110 continue;
111 }
112
113 let tick_size = require_filter_decimal(&symbol.filters, "PRICE_FILTER", |filter| {
114 filter.tick_size.as_deref()
115 })?;
116 let step_size = require_filter_decimal(&symbol.filters, "LOT_SIZE", |filter| {
117 filter.step_size.as_deref()
118 })?;
119 let min_qty = require_filter_decimal(&symbol.filters, "LOT_SIZE", |filter| {
120 filter.min_qty.as_deref()
121 })?;
122 let min_notional = require_filter_decimal(&symbol.filters, "MIN_NOTIONAL", |filter| {
123 filter.notional.as_deref()
124 })?;
125
126 let price_scale = decimal_scale(tick_size);
127 let qty_scale = decimal_scale(step_size);
128 let quote_scale = symbol
129 .quote_precision
130 .max(price_scale.saturating_add(qty_scale))
131 .max(decimal_scale(min_notional));
132
133 instruments.push(InstrumentSpec {
134 venue: Venue::Binance,
135 product: Product::LinearUsdt,
136 market_type: MarketType::LinearPerpetual,
137 instrument_id: InstrumentId::from(canonical_symbol(
138 &symbol.base_asset,
139 &symbol.quote_asset,
140 &symbol.margin_asset,
141 )),
142 canonical_symbol: canonical_symbol(
143 &symbol.base_asset,
144 &symbol.quote_asset,
145 &symbol.margin_asset,
146 )
147 .into(),
148 native_symbol: symbol.symbol.into(),
149 base: AssetCode::from(symbol.base_asset),
150 quote: AssetCode::from(symbol.quote_asset),
151 settle: AssetCode::from(symbol.margin_asset),
152 contract_size: Quantity::new(Decimal::ONE),
153 tick_size: Price::new(tick_size),
154 step_size: Quantity::new(step_size),
155 min_qty: Quantity::new(min_qty),
156 min_notional: Notional::new(min_notional),
157 price_scale,
158 qty_scale,
159 quote_scale,
160 max_leverage: None,
161 support: InstrumentSupport {
162 public_streams: true,
163 private_trading: true,
164 leverage_set: true,
165 margin_mode_set: true,
166 funding_rate: true,
167 open_interest: true,
168 },
169 status: parse_instrument_status(&symbol.status),
170 });
171 }
172
173 Ok(instruments)
174 }
175
176 pub fn parse_account_snapshot(
177 &self,
178 payload: &str,
179 observed_at: TimestampMs,
180 ) -> Result<(AccountSnapshot, Vec<Position>)> {
181 let response =
182 serde_json::from_str::<native::AccountInfoResponse>(payload).map_err(|error| {
183 MarketError::new(
184 ErrorKind::DecodeError,
185 format!("failed to parse binance account snapshot: {error}"),
186 )
187 .with_venue(Venue::Binance, Product::LinearUsdt)
188 .with_operation("binance.parse_account_snapshot")
189 })?;
190
191 let balances = response
192 .assets
193 .into_iter()
194 .map(|asset| {
195 Ok(Balance {
196 asset: AssetCode::from(asset.asset),
197 wallet_balance: balance_amount(&asset.wallet_balance)?,
198 available_balance: balance_amount(&asset.available_balance)?,
199 updated_at: observed_at,
200 })
201 })
202 .collect::<Result<Vec<_>>>()?;
203
204 let positions = response
205 .positions
206 .into_iter()
207 .filter_map(|position| match parse_decimal(&position.position_amount) {
208 Ok(size) if size.is_zero() => None,
209 Ok(size) => Some(self.position_from_account_snapshot(position, observed_at, size)),
210 Err(error) => Some(Err(error)),
211 })
212 .collect::<Result<Vec<_>>>()?;
213
214 let account = AccountSnapshot {
215 balances,
216 summary: Some(bat_markets_core::AccountSummary {
217 total_wallet_balance: balance_amount(&response.total_wallet_balance)?,
218 total_available_balance: balance_amount(&response.available_balance)?,
219 total_unrealized_pnl: balance_amount(&response.total_unrealized_profit)?,
220 updated_at: observed_at,
221 }),
222 };
223
224 Ok((account, positions))
225 }
226
227 pub fn parse_open_orders_snapshot(
228 &self,
229 payload: &str,
230 observed_at: TimestampMs,
231 ) -> Result<Vec<Order>> {
232 let snapshots =
233 serde_json::from_str::<Vec<native::OrderSnapshot>>(payload).map_err(|error| {
234 MarketError::new(
235 ErrorKind::DecodeError,
236 format!("failed to parse binance open-orders snapshot: {error}"),
237 )
238 .with_venue(Venue::Binance, Product::LinearUsdt)
239 .with_operation("binance.parse_open_orders_snapshot")
240 })?;
241
242 snapshots
243 .into_iter()
244 .map(|snapshot| self.order_from_snapshot(snapshot, observed_at))
245 .collect()
246 }
247
248 pub fn parse_open_algo_orders_snapshot(
249 &self,
250 payload: &str,
251 _observed_at: TimestampMs,
252 ) -> Result<Vec<Order>> {
253 let snapshots =
254 serde_json::from_str::<Vec<native::AlgoOrderSnapshot>>(payload).map_err(|error| {
255 MarketError::new(
256 ErrorKind::DecodeError,
257 format!("failed to parse binance open-algo-orders snapshot: {error}"),
258 )
259 .with_venue(Venue::Binance, Product::LinearUsdt)
260 .with_operation("binance.parse_open_algo_orders_snapshot")
261 })?;
262
263 snapshots
264 .into_iter()
265 .map(|snapshot| self.algo_order_from_snapshot(snapshot))
266 .collect()
267 }
268
269 pub fn parse_order_snapshot(&self, payload: &str, observed_at: TimestampMs) -> Result<Order> {
270 if let Ok(snapshot) = serde_json::from_str::<native::OrderSnapshot>(payload) {
271 return self.order_from_snapshot(snapshot, observed_at);
272 }
273 let snapshot =
274 serde_json::from_str::<native::AlgoOrderSnapshot>(payload).map_err(|error| {
275 MarketError::new(
276 ErrorKind::DecodeError,
277 format!("failed to parse binance order snapshot: {error}"),
278 )
279 .with_venue(Venue::Binance, Product::LinearUsdt)
280 .with_operation("binance.parse_order_snapshot")
281 })?;
282 self.algo_order_from_snapshot(snapshot)
283 }
284
285 pub fn parse_order_history_snapshot(
286 &self,
287 payload: &str,
288 observed_at: TimestampMs,
289 ) -> Result<Vec<Order>> {
290 self.parse_open_orders_snapshot(payload, observed_at)
291 }
292
293 pub fn parse_executions_snapshot(&self, payload: &str) -> Result<Vec<Execution>> {
294 let snapshots =
295 serde_json::from_str::<Vec<native::UserTradeSnapshot>>(payload).map_err(|error| {
296 MarketError::new(
297 ErrorKind::DecodeError,
298 format!("failed to parse binance user-trades snapshot: {error}"),
299 )
300 .with_venue(Venue::Binance, Product::LinearUsdt)
301 .with_operation("binance.parse_executions_snapshot")
302 })?;
303
304 snapshots
305 .into_iter()
306 .map(|snapshot| self.execution_from_snapshot(snapshot))
307 .collect()
308 }
309
310 pub fn parse_ticker_snapshot(
311 &self,
312 payload: &str,
313 instrument_id: &InstrumentId,
314 ) -> Result<Ticker> {
315 let snapshot =
316 serde_json::from_str::<native::TickerSnapshot>(payload).map_err(|error| {
317 MarketError::new(
318 ErrorKind::DecodeError,
319 format!("failed to parse binance ticker snapshot: {error}"),
320 )
321 .with_venue(Venue::Binance, Product::LinearUsdt)
322 .with_operation("binance.parse_ticker_snapshot")
323 })?;
324 let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
325 MarketError::new(
326 ErrorKind::Unsupported,
327 format!("unsupported binance instrument '{}'", instrument_id),
328 )
329 .with_venue(Venue::Binance, Product::LinearUsdt)
330 })?;
331
332 Ok(FastTicker {
333 instrument_id: spec.instrument_id.clone(),
334 last_price: Price::new(parse_decimal(&snapshot.last_price)?)
335 .quantize(spec.price_scale)?,
336 mark_price: None,
337 index_price: None,
338 volume_24h: Some(
339 Quantity::new(parse_decimal(&snapshot.volume)?).quantize(spec.qty_scale)?,
340 ),
341 turnover_24h: quantize_optional_notional(
342 parse_decimal(&snapshot.quote_volume)?,
343 spec.quote_scale,
344 ),
345 event_time: TimestampMs::new(snapshot.close_time),
346 }
347 .to_unified(&spec))
348 }
349
350 pub fn parse_trades_snapshot(
351 &self,
352 payload: &str,
353 request: &FetchTradesRequest,
354 ) -> Result<Vec<bat_markets_core::TradeTick>> {
355 let spec = self
356 .resolve_instrument(&request.instrument_id)
357 .ok_or_else(|| {
358 MarketError::new(
359 ErrorKind::Unsupported,
360 format!("unsupported binance instrument '{}'", request.instrument_id),
361 )
362 .with_venue(Venue::Binance, Product::LinearUsdt)
363 })?;
364 let snapshots =
365 serde_json::from_str::<Vec<native::AggTradeSnapshot>>(payload).map_err(|error| {
366 MarketError::new(
367 ErrorKind::DecodeError,
368 format!("failed to parse binance trades snapshot: {error}"),
369 )
370 .with_venue(Venue::Binance, Product::LinearUsdt)
371 .with_operation("binance.parse_trades_snapshot")
372 })?;
373
374 snapshots
375 .into_iter()
376 .map(|snapshot| {
377 Ok(FastTrade {
378 instrument_id: spec.instrument_id.clone(),
379 trade_id: TradeId::from(snapshot.agg_trade_id.to_string()),
380 price: Price::new(parse_decimal(&snapshot.price)?)
381 .quantize(spec.price_scale)?,
382 quantity: Quantity::new(parse_decimal(&snapshot.quantity)?)
383 .quantize(spec.qty_scale)?,
384 aggressor_side: if snapshot.is_buyer_maker {
385 AggressorSide::Seller
386 } else {
387 AggressorSide::Buyer
388 },
389 event_time: TimestampMs::new(snapshot.trade_time),
390 }
391 .to_unified(&spec))
392 })
393 .collect()
394 }
395
396 pub fn parse_book_top_snapshot(
397 &self,
398 payload: &str,
399 instrument_id: &InstrumentId,
400 ) -> Result<bat_markets_core::BookTop> {
401 let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
402 MarketError::new(
403 ErrorKind::Unsupported,
404 format!("unsupported binance instrument '{}'", instrument_id),
405 )
406 .with_venue(Venue::Binance, Product::LinearUsdt)
407 })?;
408 let snapshot =
409 serde_json::from_str::<native::BookTickerSnapshot>(payload).map_err(|error| {
410 MarketError::new(
411 ErrorKind::DecodeError,
412 format!("failed to parse binance book-ticker snapshot: {error}"),
413 )
414 .with_venue(Venue::Binance, Product::LinearUsdt)
415 .with_operation("binance.parse_book_top_snapshot")
416 })?;
417
418 Ok(FastBookTop {
419 instrument_id: spec.instrument_id.clone(),
420 bid_price: Price::new(parse_decimal(&snapshot.bid_price)?)
421 .quantize(spec.price_scale)?,
422 bid_quantity: Quantity::new(parse_decimal(&snapshot.bid_qty)?)
423 .quantize(spec.qty_scale)?,
424 ask_price: Price::new(parse_decimal(&snapshot.ask_price)?)
425 .quantize(spec.price_scale)?,
426 ask_quantity: Quantity::new(parse_decimal(&snapshot.ask_qty)?)
427 .quantize(spec.qty_scale)?,
428 event_time: TimestampMs::new(snapshot.time),
429 }
430 .to_unified(&spec))
431 }
432
433 pub fn parse_ohlcv_snapshot(
434 &self,
435 payload: &str,
436 request: &FetchOhlcvRequest,
437 ) -> Result<Vec<Kline>> {
438 let interval = KlineInterval::parse(request.interval.as_ref()).ok_or_else(|| {
439 MarketError::new(
440 ErrorKind::Unsupported,
441 format!("unsupported binance OHLCV interval '{}'", request.interval),
442 )
443 .with_venue(Venue::Binance, Product::LinearUsdt)
444 })?;
445 let instrument_id = request.single_instrument_id()?;
446 let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
447 MarketError::new(
448 ErrorKind::Unsupported,
449 format!("unsupported binance instrument '{}'", instrument_id),
450 )
451 .with_venue(Venue::Binance, Product::LinearUsdt)
452 })?;
453 let rows =
454 serde_json::from_str::<Vec<Vec<serde_json::Value>>>(payload).map_err(|error| {
455 MarketError::new(
456 ErrorKind::DecodeError,
457 format!("failed to parse binance klines snapshot: {error}"),
458 )
459 .with_venue(Venue::Binance, Product::LinearUsdt)
460 .with_operation("binance.parse_ohlcv_snapshot")
461 })?;
462
463 let mut klines = rows
464 .into_iter()
465 .map(|row| parse_binance_kline_row(&spec, interval, row))
466 .collect::<Result<Vec<_>>>()?;
467 klines.sort_by_key(|kline| kline.open_time.value());
468 Ok(klines)
469 }
470
471 fn position_from_account_snapshot(
472 &self,
473 position: native::AccountPositionSnapshot,
474 observed_at: TimestampMs,
475 size: Decimal,
476 ) -> Result<Position> {
477 let spec = self.require_native_symbol(&position.symbol)?;
478 Ok(Position {
479 position_id: PositionId::from(format!(
480 "binance:{}:{}",
481 position.symbol, position.position_side
482 )),
483 instrument_id: spec.instrument_id.clone(),
484 direction: decimal_direction(size),
485 size: Quantity::new(size.abs()),
486 entry_price: parse_optional_decimal(position.entry_price.as_deref())?.map(Price::new),
487 mark_price: None,
488 unrealized_pnl: position
489 .unrealized_profit
490 .as_deref()
491 .map(balance_amount)
492 .transpose()?,
493 leverage: parse_optional_decimal(position.leverage.as_deref())?.map(Leverage::new),
494 margin_mode: parse_margin_mode_snapshot(
495 position.margin_type.as_deref(),
496 position.isolated,
497 position.isolated_margin.as_deref(),
498 position.isolated_wallet.as_deref(),
499 )?,
500 position_mode: parse_position_mode(&position.position_side),
501 updated_at: observed_at,
502 })
503 }
504
505 fn order_from_snapshot(
506 &self,
507 snapshot: native::OrderSnapshot,
508 observed_at: TimestampMs,
509 ) -> Result<Order> {
510 let spec = self.require_native_symbol(&snapshot.symbol)?;
511 let average_fill_price = if snapshot.average_price == "0" {
512 None
513 } else {
514 Some(Price::new(parse_decimal(&snapshot.average_price)?))
515 };
516 Ok(Order {
517 order_id: OrderId::from(snapshot.order_id.to_string()),
518 client_order_id: Some(snapshot.client_order_id.into()),
519 instrument_id: spec.instrument_id.clone(),
520 side: parse_side(&snapshot.side)?,
521 order_type: parse_order_type(&snapshot.order_type)?,
522 time_in_force: Some(parse_time_in_force(&snapshot.time_in_force)?),
523 status: parse_order_status(&snapshot.status)?,
524 price: Some(Price::new(parse_decimal(&snapshot.price)?)),
525 quantity: Quantity::new(parse_decimal(&snapshot.original_quantity)?),
526 filled_quantity: Quantity::new(parse_decimal(&snapshot.executed_quantity)?),
527 average_fill_price,
528 reduce_only: snapshot.reduce_only,
529 post_only: matches!(snapshot.time_in_force.as_str(), "GTX"),
530 created_at: snapshot
531 .created_time
532 .map(TimestampMs::new)
533 .unwrap_or(observed_at),
534 updated_at: TimestampMs::new(snapshot.update_time),
535 venue_status: Some(snapshot.status.into()),
536 })
537 }
538
539 fn algo_order_from_snapshot(&self, snapshot: native::AlgoOrderSnapshot) -> Result<Order> {
540 let spec = self.require_native_symbol(&snapshot.symbol)?;
541 Ok(Order {
542 order_id: binance_algo_order_id(snapshot.algo_id),
543 client_order_id: Some(snapshot.client_algo_id.into()),
544 instrument_id: spec.instrument_id.clone(),
545 side: parse_side(&snapshot.side)?,
546 order_type: parse_order_type(&snapshot.order_type)?,
547 time_in_force: parse_optional_time_in_force(snapshot.time_in_force.as_deref())?,
548 status: parse_algo_order_status(&snapshot.algo_status, Decimal::ZERO),
549 price: parse_optional_price_or_empty(snapshot.price.as_deref())?,
550 quantity: Quantity::new(parse_decimal(&snapshot.quantity)?),
551 filled_quantity: Quantity::new(Decimal::ZERO),
552 average_fill_price: parse_optional_price_or_empty(snapshot.actual_price.as_deref())?,
553 reduce_only: snapshot.reduce_only.unwrap_or(false),
554 post_only: matches!(snapshot.time_in_force.as_deref(), Some("GTX")),
555 created_at: TimestampMs::new(snapshot.create_time),
556 updated_at: TimestampMs::new(snapshot.update_time),
557 venue_status: Some(snapshot.algo_status.into()),
558 })
559 }
560
561 fn algo_order_from_update_event(&self, event: native::AlgoOrderUpdateEvent) -> Result<Order> {
562 let spec = self.require_native_symbol(&event.order.symbol)?;
563 let filled_quantity =
564 parse_optional_decimal_or_empty(event.order.executed_quantity.as_deref())?
565 .unwrap_or(Decimal::ZERO);
566 let created_at = event
567 .order
568 .trigger_time
569 .filter(|time| *time > 0)
570 .unwrap_or(event.transaction_time);
571 Ok(Order {
572 order_id: binance_algo_order_id(event.order.algo_id),
573 client_order_id: Some(event.order.client_algo_id.into()),
574 instrument_id: spec.instrument_id.clone(),
575 side: parse_side(&event.order.side)?,
576 order_type: parse_order_type(&event.order.order_type)?,
577 time_in_force: parse_optional_time_in_force(event.order.time_in_force.as_deref())?,
578 status: parse_algo_order_status(&event.order.algo_status, filled_quantity),
579 price: parse_optional_price_or_empty(event.order.price.as_deref())?,
580 quantity: Quantity::new(parse_decimal(&event.order.quantity)?),
581 filled_quantity: Quantity::new(filled_quantity),
582 average_fill_price: parse_optional_price_or_empty(
583 event.order.average_price.as_deref(),
584 )?,
585 reduce_only: event.order.reduce_only,
586 post_only: matches!(event.order.time_in_force.as_deref(), Some("GTX")),
587 created_at: TimestampMs::new(created_at),
588 updated_at: TimestampMs::new(event.event_time.max(event.transaction_time)),
589 venue_status: Some(event.order.algo_status.into()),
590 })
591 }
592
593 fn execution_from_snapshot(&self, snapshot: native::UserTradeSnapshot) -> Result<Execution> {
594 let spec = self.require_native_symbol(&snapshot.symbol)?;
595 Ok(Execution {
596 execution_id: TradeId::from(snapshot.id.to_string()),
597 order_id: OrderId::from(snapshot.order_id.to_string()),
598 client_order_id: None,
599 instrument_id: spec.instrument_id.clone(),
600 side: parse_side(&snapshot.side)?,
601 quantity: Quantity::new(parse_decimal(&snapshot.qty)?),
602 price: Price::new(parse_decimal(&snapshot.price)?),
603 fee: Some(balance_amount(&snapshot.commission)?),
604 fee_asset: Some(AssetCode::from(snapshot.commission_asset)),
605 liquidity: Some(if snapshot.maker {
606 bat_markets_core::Liquidity::Maker
607 } else {
608 bat_markets_core::Liquidity::Taker
609 }),
610 executed_at: TimestampMs::new(snapshot.time),
611 })
612 }
613
614 fn require_native_symbol(&self, native_symbol: &str) -> Result<InstrumentSpec> {
615 self.resolve_native_symbol(native_symbol).ok_or_else(|| {
616 MarketError::new(
617 ErrorKind::Unsupported,
618 format!("unsupported binance symbol '{native_symbol}'"),
619 )
620 .with_venue(Venue::Binance, Product::LinearUsdt)
621 })
622 }
623}
624
625impl VenueAdapter for BinanceLinearFuturesAdapter {
626 fn venue(&self) -> Venue {
627 Venue::Binance
628 }
629
630 fn product(&self) -> Product {
631 Product::LinearUsdt
632 }
633
634 fn config(&self) -> &BatMarketsConfig {
635 &self.config
636 }
637
638 fn capabilities(&self) -> CapabilitySet {
639 self.capabilities
640 }
641
642 fn lane_set(&self) -> bat_markets_core::LaneSet {
643 self.lane_set
644 }
645
646 fn instrument_specs(&self) -> Vec<InstrumentSpec> {
647 self.instruments.read().all()
648 }
649
650 fn resolve_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentSpec> {
651 self.instruments.read().get(instrument_id)
652 }
653
654 fn resolve_native_symbol(&self, native_symbol: &str) -> Option<InstrumentSpec> {
655 self.instruments.read().by_native_symbol(native_symbol)
656 }
657
658 fn parse_public(&self, payload: &str) -> Result<Vec<PublicLaneEvent>> {
659 if let Ok(snapshot) = serde_json::from_str::<native::OpenInterestSnapshot>(payload) {
660 let spec = self.require_native_symbol(&snapshot.symbol)?;
661 let value = Quantity::new(parse_decimal(&snapshot.open_interest)?);
662 return Ok(vec![PublicLaneEvent::OpenInterest(OpenInterest {
663 instrument_id: spec.instrument_id.clone(),
664 value,
665 event_time: TimestampMs::new(snapshot.time),
666 })]);
667 }
668
669 let message = self.parse_native_public(payload)?;
670 match message {
671 native::PublicMessage::Ticker(event) => {
672 let spec = self.require_native_symbol(&event.symbol)?;
673 Ok(vec![PublicLaneEvent::Ticker(FastTicker {
674 instrument_id: spec.instrument_id.clone(),
675 last_price: Price::new(parse_decimal(&event.last_price)?)
676 .quantize(spec.price_scale)?,
677 mark_price: None,
678 index_price: None,
679 volume_24h: Some(
680 Quantity::new(parse_decimal(&event.volume_24h)?)
681 .quantize(spec.qty_scale)?,
682 ),
683 turnover_24h: quantize_optional_notional(
684 parse_decimal(&event.quote_volume_24h)?,
685 spec.quote_scale,
686 ),
687 event_time: TimestampMs::new(event.event_time),
688 })])
689 }
690 native::PublicMessage::AggTrade(event) => {
691 let spec = self.require_native_symbol(&event.symbol)?;
692 Ok(vec![PublicLaneEvent::Trade(FastTrade {
693 instrument_id: spec.instrument_id.clone(),
694 trade_id: TradeId::from(event.agg_trade_id.to_string()),
695 price: Price::new(parse_decimal(&event.price)?).quantize(spec.price_scale)?,
696 quantity: Quantity::new(parse_decimal(&event.quantity)?)
697 .quantize(spec.qty_scale)?,
698 aggressor_side: if event.is_buyer_maker {
699 AggressorSide::Seller
700 } else {
701 AggressorSide::Buyer
702 },
703 event_time: TimestampMs::new(event.trade_time),
704 })])
705 }
706 native::PublicMessage::BookTicker(event) => {
707 let spec = self.require_native_symbol(&event.symbol)?;
708 Ok(vec![PublicLaneEvent::BookTop(FastBookTop {
709 instrument_id: spec.instrument_id.clone(),
710 bid_price: Price::new(parse_decimal(&event.best_bid_price)?)
711 .quantize(spec.price_scale)?,
712 bid_quantity: Quantity::new(parse_decimal(&event.best_bid_qty)?)
713 .quantize(spec.qty_scale)?,
714 ask_price: Price::new(parse_decimal(&event.best_ask_price)?)
715 .quantize(spec.price_scale)?,
716 ask_quantity: Quantity::new(parse_decimal(&event.best_ask_qty)?)
717 .quantize(spec.qty_scale)?,
718 event_time: TimestampMs::new(event.transaction_time),
719 })])
720 }
721 native::PublicMessage::Depth(event) => {
722 let spec = self.require_native_symbol(&event.symbol)?;
723 let best_bid = event.bids.first().ok_or_else(|| {
724 MarketError::new(ErrorKind::DecodeError, "missing binance best bid")
725 })?;
726 let best_ask = event.asks.first().ok_or_else(|| {
727 MarketError::new(ErrorKind::DecodeError, "missing binance best ask")
728 })?;
729 Ok(vec![
730 PublicLaneEvent::BookTop(FastBookTop {
731 instrument_id: spec.instrument_id.clone(),
732 bid_price: Price::new(parse_decimal(&best_bid[0])?)
733 .quantize(spec.price_scale)?,
734 bid_quantity: Quantity::new(parse_decimal(&best_bid[1])?)
735 .quantize(spec.qty_scale)?,
736 ask_price: Price::new(parse_decimal(&best_ask[0])?)
737 .quantize(spec.price_scale)?,
738 ask_quantity: Quantity::new(parse_decimal(&best_ask[1])?)
739 .quantize(spec.qty_scale)?,
740 event_time: TimestampMs::new(event.event_time),
741 }),
742 PublicLaneEvent::OrderBookDelta(FastOrderBookDelta {
743 instrument_id: spec.instrument_id.clone(),
744 bids: event
745 .bids
746 .iter()
747 .map(|level| {
748 Ok((
749 Price::new(parse_decimal(&level[0])?)
750 .quantize(spec.price_scale)?,
751 Quantity::new(parse_decimal(&level[1])?)
752 .quantize(spec.qty_scale)?,
753 ))
754 })
755 .collect::<Result<Vec<_>>>()?,
756 asks: event
757 .asks
758 .iter()
759 .map(|level| {
760 Ok((
761 Price::new(parse_decimal(&level[0])?)
762 .quantize(spec.price_scale)?,
763 Quantity::new(parse_decimal(&level[1])?)
764 .quantize(spec.qty_scale)?,
765 ))
766 })
767 .collect::<Result<Vec<_>>>()?,
768 event_time: TimestampMs::new(event.event_time),
769 }),
770 ])
771 }
772 native::PublicMessage::Kline(event) => {
773 let spec = self.require_native_symbol(&event.symbol)?;
774 Ok(vec![PublicLaneEvent::Kline(FastKline {
775 instrument_id: spec.instrument_id.clone(),
776 interval: event.kline.interval.into(),
777 open: Price::new(parse_decimal(&event.kline.open)?)
778 .quantize(spec.price_scale)?,
779 high: Price::new(parse_decimal(&event.kline.high)?)
780 .quantize(spec.price_scale)?,
781 low: Price::new(parse_decimal(&event.kline.low)?).quantize(spec.price_scale)?,
782 close: Price::new(parse_decimal(&event.kline.close)?)
783 .quantize(spec.price_scale)?,
784 volume: Quantity::new(parse_decimal(&event.kline.volume)?)
785 .quantize(spec.qty_scale)?,
786 open_time: TimestampMs::new(event.kline.open_time),
787 close_time: TimestampMs::new(event.kline.close_time),
788 closed: event.kline.closed,
789 })])
790 }
791 native::PublicMessage::MarkPrice(event) => {
792 let spec = self.require_native_symbol(&event.symbol)?;
793 Ok(vec![
794 PublicLaneEvent::MarkPrice(FastMarkPrice {
795 instrument_id: spec.instrument_id.clone(),
796 price: Price::new(parse_decimal(&event.mark_price)?)
797 .quantize(spec.price_scale)?,
798 funding_rate: Some(Rate::new(parse_decimal(&event.funding_rate)?)),
799 event_time: TimestampMs::new(event.event_time),
800 }),
801 PublicLaneEvent::FundingRate(FundingRate {
802 instrument_id: spec.instrument_id.clone(),
803 value: Rate::new(parse_decimal(&event.funding_rate)?),
804 mark_price: Some(Price::new(parse_decimal(&event.mark_price)?)),
805 event_time: TimestampMs::new(event.event_time),
806 }),
807 ])
808 }
809 native::PublicMessage::ForceOrder(event) => {
810 let spec = self.require_native_symbol(&event.order.symbol)?;
811 let quantity_raw = if event.order.cumulative_filled_qty.is_empty() {
812 &event.order.quantity
813 } else {
814 &event.order.cumulative_filled_qty
815 };
816 let price_raw = if event.order.average_price == "0" {
817 &event.order.price
818 } else {
819 &event.order.average_price
820 };
821 Ok(vec![PublicLaneEvent::Liquidation(
822 bat_markets_core::FastLiquidation {
823 instrument_id: spec.instrument_id.clone(),
824 side: parse_side(&event.order.side)?,
825 price: Price::new(parse_decimal(price_raw)?).quantize(spec.price_scale)?,
826 quantity: Quantity::new(parse_decimal(quantity_raw)?)
827 .quantize(spec.qty_scale)?,
828 event_time: TimestampMs::new(event.order.trade_time.max(event.event_time)),
829 },
830 )])
831 }
832 }
833 }
834
835 fn parse_private(&self, payload: &str) -> Result<Vec<PrivateLaneEvent>> {
836 let message = self.parse_native_private(payload)?;
837 match message {
838 native::PrivateMessage::AccountUpdate(event) => {
839 let mut events = Vec::new();
840
841 for balance in event.account.balances {
842 events.push(PrivateLaneEvent::Balance(Balance {
843 asset: AssetCode::from(balance.asset),
844 wallet_balance: balance_amount(&balance.wallet_balance)?,
845 available_balance: balance_amount(&balance.cross_wallet_balance)?,
846 updated_at: TimestampMs::new(event.transaction_time),
847 }));
848 }
849
850 for position in event.account.positions {
851 let spec = self.require_native_symbol(&position.symbol)?;
852 let size = parse_decimal(&position.position_amount)?;
853 events.push(PrivateLaneEvent::Position(Position {
854 position_id: PositionId::from(format!(
855 "binance:{}:{}",
856 position.symbol, position.position_side
857 )),
858 instrument_id: spec.instrument_id.clone(),
859 direction: decimal_direction(size),
860 size: Quantity::new(size.abs()),
861 entry_price: parse_optional_decimal(position.entry_price.as_deref())?
862 .map(Price::new),
863 mark_price: None,
864 unrealized_pnl: Some(balance_amount(&position.unrealized_pnl)?),
865 leverage: None,
866 margin_mode: parse_margin_mode(&position.margin_type)?,
867 position_mode: parse_position_mode(&position.position_side),
868 updated_at: TimestampMs::new(event.event_time),
869 }));
870 }
871
872 Ok(events)
873 }
874 native::PrivateMessage::OrderTradeUpdate(event) => {
875 let spec = self.require_native_symbol(&event.order.symbol)?;
876 let mut events = Vec::new();
877 let order_id = OrderId::from(event.order.order_id.to_string());
878 let client_order_id = Some(event.order.client_order_id.clone().into());
879 let average_fill_price =
880 parse_optional_decimal(event.order.average_price.as_deref())?.map(Price::new);
881 let updated_at = TimestampMs::new(event.order.trade_time);
882
883 events.push(PrivateLaneEvent::Order(Order {
884 order_id: order_id.clone(),
885 client_order_id: client_order_id.clone(),
886 instrument_id: spec.instrument_id.clone(),
887 side: parse_side(&event.order.side)?,
888 order_type: parse_order_type(&event.order.order_type)?,
889 time_in_force: Some(parse_time_in_force(&event.order.time_in_force)?),
890 status: parse_order_status(&event.order.order_status)?,
891 price: Some(Price::new(parse_decimal(&event.order.price)?)),
892 quantity: Quantity::new(parse_decimal(&event.order.original_quantity)?),
893 filled_quantity: Quantity::new(parse_decimal(
894 &event.order.cumulative_filled_qty,
895 )?),
896 average_fill_price,
897 reduce_only: event.order.reduce_only,
898 post_only: false,
899 created_at: TimestampMs::new(
900 event
901 .order
902 .order_trade_time
903 .unwrap_or(event.order.trade_time),
904 ),
905 updated_at,
906 venue_status: Some(event.order.execution_type.into()),
907 }));
908
909 if parse_decimal(&event.order.last_filled_qty)? > Decimal::ZERO {
910 events.push(PrivateLaneEvent::Execution(Execution {
911 execution_id: TradeId::from(
912 event
913 .order
914 .trade_id
915 .unwrap_or(event.order.trade_time)
916 .to_string(),
917 ),
918 order_id,
919 client_order_id,
920 instrument_id: spec.instrument_id.clone(),
921 side: parse_side(&event.order.side)?,
922 quantity: Quantity::new(parse_decimal(&event.order.last_filled_qty)?),
923 price: Price::new(parse_decimal(&event.order.last_filled_price)?),
924 fee: parse_optional_decimal(event.order.commission.as_deref())?
925 .map(Into::into),
926 fee_asset: event.order.commission_asset.map(AssetCode::from),
927 liquidity: None,
928 executed_at: updated_at,
929 }));
930 }
931
932 Ok(events)
933 }
934 native::PrivateMessage::TradeLite(event) => {
935 let spec = self.require_native_symbol(&event.symbol)?;
936 Ok(vec![PrivateLaneEvent::Execution(Execution {
937 execution_id: TradeId::from(event.trade_id.to_string()),
938 order_id: OrderId::from(event.order_id.to_string()),
939 client_order_id: Some(event.client_order_id.into()),
940 instrument_id: spec.instrument_id.clone(),
941 side: parse_side(&event.side)?,
942 quantity: Quantity::new(parse_decimal(&event.last_filled_qty)?),
943 price: Price::new(parse_decimal(&event.last_filled_price)?),
944 fee: None,
945 fee_asset: None,
946 liquidity: Some(if event.is_maker {
947 Liquidity::Maker
948 } else {
949 Liquidity::Taker
950 }),
951 executed_at: TimestampMs::new(event.trade_time.max(event.event_time)),
952 })])
953 }
954 native::PrivateMessage::AlgoUpdate(event) => Ok(vec![PrivateLaneEvent::Order(
955 self.algo_order_from_update_event(*event)?,
956 )]),
957 }
958 }
959
960 fn classify_command(
961 &self,
962 operation: CommandOperation,
963 payload: Option<&str>,
964 request_id: Option<RequestId>,
965 ) -> Result<CommandReceipt> {
966 let Some(payload) = payload else {
967 return Ok(CommandReceipt {
968 operation,
969 status: CommandStatus::UnknownExecution,
970 venue: Venue::Binance,
971 product: Product::LinearUsdt,
972 instrument_id: None,
973 order_id: None,
974 client_order_id: None,
975 request_id,
976 message: Some("command outcome requires reconcile".into()),
977 native_code: None,
978 retriable: true,
979 });
980 };
981
982 if let Ok(value) = serde_json::from_str::<serde_json::Value>(payload)
983 && let Some(error) = value.get("error").cloned()
984 && let Ok(error) = serde_json::from_value::<native::ErrorResponse>(error)
985 {
986 return Ok(CommandReceipt {
987 operation,
988 status: CommandStatus::Rejected,
989 venue: Venue::Binance,
990 product: Product::LinearUsdt,
991 instrument_id: None,
992 order_id: None,
993 client_order_id: None,
994 request_id,
995 message: Some(error.message.into()),
996 native_code: Some(error.code.to_string().into()),
997 retriable: false,
998 });
999 }
1000
1001 if let Ok(error) = serde_json::from_str::<native::ErrorResponse>(payload) {
1002 return Ok(CommandReceipt {
1003 operation,
1004 status: CommandStatus::Rejected,
1005 venue: Venue::Binance,
1006 product: Product::LinearUsdt,
1007 instrument_id: None,
1008 order_id: None,
1009 client_order_id: None,
1010 request_id,
1011 message: Some(error.message.into()),
1012 native_code: Some(error.code.to_string().into()),
1013 retriable: false,
1014 });
1015 }
1016
1017 match operation {
1018 CommandOperation::CreateOrder
1019 | CommandOperation::AmendOrder
1020 | CommandOperation::CancelOrder
1021 | CommandOperation::ClosePosition
1022 | CommandOperation::GetOrder => {
1023 let response = parse_binance_command_identity(payload).map_err(|error| {
1024 MarketError::new(
1025 ErrorKind::DecodeError,
1026 format!("failed to classify binance order response: {error}"),
1027 )
1028 .with_venue(Venue::Binance, Product::LinearUsdt)
1029 .with_operation("binance.classify_command")
1030 })?;
1031 let (symbol, order_id, client_order_id) = match response {
1032 BinanceAcceptedCommand::Order(response) => (
1033 Some(response.symbol),
1034 Some(OrderId::from(response.order_id.to_string())),
1035 Some(response.client_order_id.into()),
1036 ),
1037 BinanceAcceptedCommand::AlgoOrder(response) => (
1038 Some(response.symbol),
1039 Some(binance_algo_order_id(response.algo_id)),
1040 Some(response.client_algo_id.into()),
1041 ),
1042 BinanceAcceptedCommand::CancelAlgo(response) => (
1043 None,
1044 Some(binance_algo_order_id(response.algo_id)),
1045 Some(response.client_algo_id.into()),
1046 ),
1047 };
1048 let instrument_id = match symbol {
1049 Some(symbol) => {
1050 Some(self.require_native_symbol(&symbol)?.instrument_id.clone())
1051 }
1052 None => None,
1053 };
1054 Ok(CommandReceipt {
1055 operation,
1056 status: CommandStatus::Accepted,
1057 venue: Venue::Binance,
1058 product: Product::LinearUsdt,
1059 instrument_id,
1060 order_id,
1061 client_order_id,
1062 request_id,
1063 message: Some("accepted".into()),
1064 native_code: None,
1065 retriable: false,
1066 })
1067 }
1068 CommandOperation::CreateOrders
1069 | CommandOperation::AmendOrders
1070 | CommandOperation::CancelOrders
1071 | CommandOperation::CancelAllOrders
1072 | CommandOperation::ValidateOrder
1073 | CommandOperation::SetPositionMode => Ok(CommandReceipt {
1074 operation,
1075 status: CommandStatus::Accepted,
1076 venue: Venue::Binance,
1077 product: Product::LinearUsdt,
1078 instrument_id: None,
1079 order_id: None,
1080 client_order_id: None,
1081 request_id,
1082 message: Some("accepted".into()),
1083 native_code: None,
1084 retriable: false,
1085 }),
1086 CommandOperation::SetLeverage => {
1087 let response = serde_json::from_str::<native::SetLeverageResponse>(payload)
1088 .map_err(|error| {
1089 MarketError::new(
1090 ErrorKind::DecodeError,
1091 format!("failed to classify binance leverage response: {error}"),
1092 )
1093 })?;
1094 let spec = self.require_native_symbol(&response.symbol)?;
1095 Ok(CommandReceipt {
1096 operation,
1097 status: CommandStatus::Accepted,
1098 venue: Venue::Binance,
1099 product: Product::LinearUsdt,
1100 instrument_id: Some(spec.instrument_id.clone()),
1101 order_id: None,
1102 client_order_id: None,
1103 request_id,
1104 message: Some(format!("leverage set to {}", response.leverage).into()),
1105 native_code: None,
1106 retriable: false,
1107 })
1108 }
1109 CommandOperation::SetMarginMode => {
1110 let response =
1111 serde_json::from_str::<native::SuccessResponse>(payload).map_err(|error| {
1112 MarketError::new(
1113 ErrorKind::DecodeError,
1114 format!("failed to classify binance margin-mode response: {error}"),
1115 )
1116 })?;
1117 Ok(CommandReceipt {
1118 operation,
1119 status: CommandStatus::Accepted,
1120 venue: Venue::Binance,
1121 product: Product::LinearUsdt,
1122 instrument_id: None,
1123 order_id: None,
1124 client_order_id: None,
1125 request_id,
1126 message: Some(response.message.into()),
1127 native_code: response.code.map(|value| value.to_string().into()),
1128 retriable: false,
1129 })
1130 }
1131 }
1132 }
1133}
1134
1135enum BinanceAcceptedCommand {
1136 Order(native::OrderResponse),
1137 AlgoOrder(Box<native::AlgoOrderSnapshot>),
1138 CancelAlgo(native::CancelAlgoOrderResponse),
1139}
1140
1141fn parse_binance_command_identity(
1142 payload: &str,
1143) -> std::result::Result<BinanceAcceptedCommand, serde_json::Error> {
1144 if let Ok(response) = parse_binance_order_response(payload) {
1145 return Ok(BinanceAcceptedCommand::Order(response));
1146 }
1147 if let Ok(response) = parse_binance_algo_order_response(payload) {
1148 return Ok(BinanceAcceptedCommand::AlgoOrder(Box::new(response)));
1149 }
1150 parse_binance_cancel_algo_order_response(payload).map(BinanceAcceptedCommand::CancelAlgo)
1151}
1152
1153fn parse_binance_order_response(
1154 payload: &str,
1155) -> std::result::Result<native::OrderResponse, serde_json::Error> {
1156 if let Ok(value) = serde_json::from_str::<serde_json::Value>(payload)
1157 && let Some(result) = value.get("result").cloned()
1158 {
1159 return serde_json::from_value(result);
1160 }
1161 serde_json::from_str(payload)
1162}
1163
1164fn parse_binance_algo_order_response(
1165 payload: &str,
1166) -> std::result::Result<native::AlgoOrderSnapshot, serde_json::Error> {
1167 serde_json::from_str(payload)
1168}
1169
1170fn parse_binance_cancel_algo_order_response(
1171 payload: &str,
1172) -> std::result::Result<native::CancelAlgoOrderResponse, serde_json::Error> {
1173 serde_json::from_str(payload)
1174}
1175
1176fn btc_spec() -> InstrumentSpec {
1177 instrument_spec(("BTC", "USDT", "USDT"), "BTCUSDT", 2, 3, 5, Some(125))
1178}
1179
1180fn eth_spec() -> InstrumentSpec {
1181 instrument_spec(("ETH", "USDT", "USDT"), "ETHUSDT", 2, 3, 5, Some(100))
1182}
1183
1184fn instrument_spec(
1185 assets: (&str, &str, &str),
1186 native_symbol: &str,
1187 price_scale: u32,
1188 qty_scale: u32,
1189 quote_scale: u32,
1190 max_leverage: Option<i64>,
1191) -> InstrumentSpec {
1192 let (base, quote, settle) = assets;
1193 InstrumentSpec {
1194 venue: Venue::Binance,
1195 product: Product::LinearUsdt,
1196 market_type: MarketType::LinearPerpetual,
1197 instrument_id: InstrumentId::from(canonical_symbol(base, quote, settle)),
1198 canonical_symbol: canonical_symbol(base, quote, settle).into(),
1199 native_symbol: native_symbol.into(),
1200 base: AssetCode::from(base),
1201 quote: AssetCode::from(quote),
1202 settle: AssetCode::from(settle),
1203 contract_size: Quantity::new(Decimal::ONE),
1204 tick_size: Price::new(Decimal::new(1, price_scale)),
1205 step_size: Quantity::new(Decimal::new(1, qty_scale)),
1206 min_qty: Quantity::new(Decimal::new(1, qty_scale)),
1207 min_notional: Notional::new(Decimal::new(5, quote_scale)),
1208 price_scale,
1209 qty_scale,
1210 quote_scale,
1211 max_leverage: max_leverage.map(|value| Leverage::new(Decimal::new(value, 0))),
1212 support: InstrumentSupport {
1213 public_streams: true,
1214 private_trading: true,
1215 leverage_set: true,
1216 margin_mode_set: true,
1217 funding_rate: true,
1218 open_interest: true,
1219 },
1220 status: InstrumentStatus::Active,
1221 }
1222}
1223
1224fn canonical_symbol(base: &str, quote: &str, settle: &str) -> String {
1225 format!("{base}/{quote}:{settle}")
1226}
1227
1228fn parse_decimal(raw: &str) -> Result<Decimal> {
1229 raw.parse::<Decimal>().map_err(|error| {
1230 MarketError::new(
1231 ErrorKind::DecodeError,
1232 format!("invalid decimal '{raw}': {error}"),
1233 )
1234 .with_venue(Venue::Binance, Product::LinearUsdt)
1235 })
1236}
1237
1238fn parse_optional_decimal(raw: Option<&str>) -> Result<Option<Decimal>> {
1239 raw.map(parse_decimal).transpose()
1240}
1241
1242fn parse_optional_decimal_or_empty(raw: Option<&str>) -> Result<Option<Decimal>> {
1243 raw.filter(|value| !value.is_empty())
1244 .map(parse_decimal)
1245 .transpose()
1246}
1247
1248fn parse_optional_price_or_empty(raw: Option<&str>) -> Result<Option<Price>> {
1249 parse_optional_decimal_or_empty(raw).map(|value| {
1250 value.and_then(|price| {
1251 if price.is_zero() {
1252 None
1253 } else {
1254 Some(Price::new(price))
1255 }
1256 })
1257 })
1258}
1259
1260fn parse_side(raw: &str) -> Result<Side> {
1261 match raw {
1262 "BUY" => Ok(Side::Buy),
1263 "SELL" => Ok(Side::Sell),
1264 other => Err(MarketError::new(
1265 ErrorKind::DecodeError,
1266 format!("unsupported binance side '{other}'"),
1267 )),
1268 }
1269}
1270
1271fn parse_order_type(raw: &str) -> Result<OrderType> {
1272 match raw {
1273 "MARKET" => Ok(OrderType::Market),
1274 "LIMIT" => Ok(OrderType::Limit),
1275 "STOP_MARKET" => Ok(OrderType::StopMarket),
1276 "STOP" => Ok(OrderType::StopLimit),
1277 "TAKE_PROFIT_MARKET" => Ok(OrderType::TakeProfitMarket),
1278 "TAKE_PROFIT" => Ok(OrderType::TakeProfitLimit),
1279 other => Err(MarketError::new(
1280 ErrorKind::DecodeError,
1281 format!("unsupported binance order type '{other}'"),
1282 )),
1283 }
1284}
1285
1286fn parse_time_in_force(raw: &str) -> Result<TimeInForce> {
1287 match raw {
1288 "GTC" => Ok(TimeInForce::Gtc),
1289 "IOC" => Ok(TimeInForce::Ioc),
1290 "FOK" => Ok(TimeInForce::Fok),
1291 "GTX" => Ok(TimeInForce::PostOnly),
1292 other => Err(MarketError::new(
1293 ErrorKind::DecodeError,
1294 format!("unsupported binance time in force '{other}'"),
1295 )),
1296 }
1297}
1298
1299fn parse_optional_time_in_force(raw: Option<&str>) -> Result<Option<TimeInForce>> {
1300 raw.filter(|value| !value.is_empty())
1301 .map(parse_time_in_force)
1302 .transpose()
1303}
1304
1305fn parse_order_status(raw: &str) -> Result<OrderStatus> {
1306 match raw {
1307 "NEW" => Ok(OrderStatus::New),
1308 "PARTIALLY_FILLED" => Ok(OrderStatus::PartiallyFilled),
1309 "FILLED" => Ok(OrderStatus::Filled),
1310 "CANCELED" => Ok(OrderStatus::Canceled),
1311 "REJECTED" => Ok(OrderStatus::Rejected),
1312 "EXPIRED" => Ok(OrderStatus::Expired),
1313 "PENDING_CANCEL" => Ok(OrderStatus::PendingCancel),
1314 other => Err(MarketError::new(
1315 ErrorKind::DecodeError,
1316 format!("unsupported binance order status '{other}'"),
1317 )),
1318 }
1319}
1320
1321fn parse_algo_order_status(raw: &str, filled_quantity: Decimal) -> OrderStatus {
1322 match raw {
1323 "NEW" => OrderStatus::New,
1324 "CANCELED" => OrderStatus::Canceled,
1325 "TRIGGERING" | "TRIGGERED" => {
1326 if filled_quantity > Decimal::ZERO {
1327 OrderStatus::PartiallyFilled
1328 } else {
1329 OrderStatus::New
1330 }
1331 }
1332 "FINISHED" => {
1333 if filled_quantity > Decimal::ZERO {
1334 OrderStatus::Filled
1335 } else {
1336 OrderStatus::Canceled
1337 }
1338 }
1339 "REJECTED" => OrderStatus::Rejected,
1340 "EXPIRED" => OrderStatus::Expired,
1341 _ => OrderStatus::New,
1342 }
1343}
1344
1345fn binance_algo_order_id(algo_id: i64) -> OrderId {
1346 OrderId::from(format!("binance-algo:{algo_id}"))
1347}
1348
1349fn parse_margin_mode(raw: &str) -> Result<MarginMode> {
1350 match raw {
1351 "isolated" | "ISOLATED" => Ok(MarginMode::Isolated),
1352 "cross" | "crossed" | "CROSSED" => Ok(MarginMode::Cross),
1353 other => Err(MarketError::new(
1354 ErrorKind::DecodeError,
1355 format!("unsupported binance margin type '{other}'"),
1356 )),
1357 }
1358}
1359
1360fn parse_margin_mode_snapshot(
1361 raw: Option<&str>,
1362 isolated: Option<bool>,
1363 isolated_margin: Option<&str>,
1364 isolated_wallet: Option<&str>,
1365) -> Result<MarginMode> {
1366 if let Some(raw) = raw {
1367 return parse_margin_mode(raw);
1368 }
1369
1370 if let Some(isolated) = isolated {
1371 return Ok(if isolated {
1372 MarginMode::Isolated
1373 } else {
1374 MarginMode::Cross
1375 });
1376 }
1377
1378 let isolated_margin = parse_optional_decimal(isolated_margin)?;
1379 let isolated_wallet = parse_optional_decimal(isolated_wallet)?;
1380 if isolated_margin.is_some() || isolated_wallet.is_some() {
1381 return Ok(
1382 if isolated_margin.unwrap_or_default().is_zero()
1383 && isolated_wallet.unwrap_or_default().is_zero()
1384 {
1385 MarginMode::Cross
1386 } else {
1387 MarginMode::Isolated
1388 },
1389 );
1390 }
1391
1392 Err(MarketError::new(
1393 ErrorKind::DecodeError,
1394 "missing binance margin mode in account snapshot",
1395 ))
1396}
1397
1398fn parse_position_mode(raw: &str) -> PositionMode {
1399 match raw {
1400 "LONG" | "SHORT" => PositionMode::Hedge,
1401 _ => PositionMode::OneWay,
1402 }
1403}
1404
1405fn parse_instrument_status(raw: &str) -> InstrumentStatus {
1406 match raw {
1407 "TRADING" => InstrumentStatus::Active,
1408 "SETTLING" | "CLOSE" | "PENDING_TRADING" => InstrumentStatus::Halted,
1409 _ => InstrumentStatus::Halted,
1410 }
1411}
1412
1413fn decimal_direction(value: Decimal) -> PositionDirection {
1414 if value > Decimal::ZERO {
1415 PositionDirection::Long
1416 } else if value < Decimal::ZERO {
1417 PositionDirection::Short
1418 } else {
1419 PositionDirection::Flat
1420 }
1421}
1422
1423fn balance_amount(raw: &str) -> Result<bat_markets_core::Amount> {
1424 parse_decimal(raw).map(Into::into)
1425}
1426
1427fn quantize_optional_notional(
1428 value: Decimal,
1429 scale: u32,
1430) -> Option<bat_markets_core::FastNotional> {
1431 Notional::new(value).quantize(scale).ok()
1432}
1433
1434fn decimal_scale(value: Decimal) -> u32 {
1435 value.normalize().scale()
1436}
1437
1438fn require_filter_decimal(
1439 filters: &[native::ExchangeFilter],
1440 filter_type: &str,
1441 select: impl Fn(&native::ExchangeFilter) -> Option<&str>,
1442) -> Result<Decimal> {
1443 let raw = filters
1444 .iter()
1445 .find(|filter| filter.filter_type == filter_type)
1446 .and_then(select)
1447 .ok_or_else(|| {
1448 MarketError::new(
1449 ErrorKind::DecodeError,
1450 format!("missing binance {filter_type} filter"),
1451 )
1452 })?;
1453 parse_decimal(raw)
1454}
1455
1456fn parse_binance_kline_row(
1457 spec: &InstrumentSpec,
1458 interval: KlineInterval,
1459 row: Vec<serde_json::Value>,
1460) -> Result<Kline> {
1461 if row.len() < 7 {
1462 return Err(MarketError::new(
1463 ErrorKind::DecodeError,
1464 format!(
1465 "binance kline row has {} fields, expected at least 7",
1466 row.len()
1467 ),
1468 )
1469 .with_venue(Venue::Binance, Product::LinearUsdt));
1470 }
1471
1472 let open_time = parse_i64_value(&row[0], "open_time")?;
1473 let close_time = parse_i64_value(&row[6], "close_time")?;
1474
1475 Ok(Kline {
1476 instrument_id: spec.instrument_id.clone(),
1477 interval: Box::<str>::from(interval),
1478 open: spec.price_from_fast(
1479 Price::new(parse_decimal(parse_str_value(&row[1], "open")?)?)
1480 .quantize(spec.price_scale)?,
1481 ),
1482 high: spec.price_from_fast(
1483 Price::new(parse_decimal(parse_str_value(&row[2], "high")?)?)
1484 .quantize(spec.price_scale)?,
1485 ),
1486 low: spec.price_from_fast(
1487 Price::new(parse_decimal(parse_str_value(&row[3], "low")?)?)
1488 .quantize(spec.price_scale)?,
1489 ),
1490 close: spec.price_from_fast(
1491 Price::new(parse_decimal(parse_str_value(&row[4], "close")?)?)
1492 .quantize(spec.price_scale)?,
1493 ),
1494 volume: spec.quantity_from_fast(
1495 Quantity::new(parse_decimal(parse_str_value(&row[5], "volume")?)?)
1496 .quantize(spec.qty_scale)?,
1497 ),
1498 open_time: TimestampMs::new(open_time),
1499 close_time: TimestampMs::new(close_time),
1500 closed: close_time < now_timestamp_ms(),
1501 })
1502}
1503
1504fn parse_i64_value(value: &serde_json::Value, label: &str) -> Result<i64> {
1505 match value {
1506 serde_json::Value::Number(number) => number.as_i64().ok_or_else(|| {
1507 MarketError::new(
1508 ErrorKind::DecodeError,
1509 format!("invalid numeric value for binance {label}"),
1510 )
1511 .with_venue(Venue::Binance, Product::LinearUsdt)
1512 }),
1513 serde_json::Value::String(raw) => raw.parse::<i64>().map_err(|error| {
1514 MarketError::new(
1515 ErrorKind::DecodeError,
1516 format!("invalid i64 '{raw}' for binance {label}: {error}"),
1517 )
1518 .with_venue(Venue::Binance, Product::LinearUsdt)
1519 }),
1520 other => Err(MarketError::new(
1521 ErrorKind::DecodeError,
1522 format!("unsupported binance {label} representation: {other}"),
1523 )
1524 .with_venue(Venue::Binance, Product::LinearUsdt)),
1525 }
1526}
1527
1528fn parse_str_value<'a>(value: &'a serde_json::Value, label: &str) -> Result<&'a str> {
1529 match value {
1530 serde_json::Value::String(raw) => Ok(raw),
1531 other => Err(MarketError::new(
1532 ErrorKind::DecodeError,
1533 format!("unsupported binance {label} representation: {other}"),
1534 )
1535 .with_venue(Venue::Binance, Product::LinearUsdt)),
1536 }
1537}
1538
1539fn now_timestamp_ms() -> i64 {
1540 SystemTime::now()
1541 .duration_since(UNIX_EPOCH)
1542 .map(|duration| duration.as_millis().min(i128::from(i64::MAX) as u128) as i64)
1543 .unwrap_or(0)
1544}
1545
1546#[cfg(test)]
1547mod tests {
1548 use super::BinanceLinearFuturesAdapter;
1549 use bat_markets_core::{
1550 FetchOhlcvRequest, FetchTradesRequest, InstrumentId, OrderStatus, TimestampMs, VenueAdapter,
1551 };
1552
1553 const USER_TRADES: &str = include_str!(concat!(
1554 env!("CARGO_MANIFEST_DIR"),
1555 "/../../fixtures/binance/user_trades.json"
1556 ));
1557 const ORDER_HISTORY: &str = include_str!(concat!(
1558 env!("CARGO_MANIFEST_DIR"),
1559 "/../../fixtures/binance/order_history.json"
1560 ));
1561
1562 #[test]
1563 fn parse_binance_execution_history_snapshot() {
1564 let adapter = BinanceLinearFuturesAdapter::new();
1565 let executions = adapter
1566 .parse_executions_snapshot(USER_TRADES)
1567 .expect("binance user trades fixture should parse");
1568 assert_eq!(executions.len(), 1);
1569 assert_eq!(executions[0].execution_id.to_string(), "880001");
1570 }
1571
1572 #[test]
1573 fn parse_binance_order_history_snapshot() {
1574 let adapter = BinanceLinearFuturesAdapter::new();
1575 let orders = adapter
1576 .parse_order_history_snapshot(ORDER_HISTORY, TimestampMs::new(1))
1577 .expect("binance order history fixture should parse");
1578 assert_eq!(orders.len(), 1);
1579 assert_eq!(orders[0].status, OrderStatus::Filled);
1580 }
1581
1582 #[test]
1583 fn parse_binance_rest_ticker_snapshot() {
1584 let adapter = BinanceLinearFuturesAdapter::new();
1585 let ticker = adapter
1586 .parse_ticker_snapshot(
1587 r#"{
1588 "symbol":"BTCUSDT",
1589 "lastPrice":"70100.50",
1590 "volume":"1234.567",
1591 "quoteVolume":"86500000.12",
1592 "closeTime":1710000000000
1593 }"#,
1594 &InstrumentId::from("BTC/USDT:USDT"),
1595 )
1596 .expect("binance rest ticker should parse");
1597 assert_eq!(ticker.last_price.to_string(), "70100.50");
1598 }
1599
1600 #[test]
1601 fn parse_binance_rest_trades_snapshot() {
1602 let adapter = BinanceLinearFuturesAdapter::new();
1603 let trades = adapter
1604 .parse_trades_snapshot(
1605 r#"[{"a":1,"p":"70100.10","q":"0.500","T":1710000000001,"m":true}]"#,
1606 &FetchTradesRequest::new(InstrumentId::from("BTC/USDT:USDT"), Some(1)),
1607 )
1608 .expect("binance rest trades should parse");
1609 assert_eq!(trades.len(), 1);
1610 assert_eq!(trades[0].price.to_string(), "70100.10");
1611 }
1612
1613 #[test]
1614 fn parse_binance_rest_book_top_snapshot() {
1615 let adapter = BinanceLinearFuturesAdapter::new();
1616 let book_top = adapter
1617 .parse_book_top_snapshot(
1618 r#"{
1619 "symbol":"BTCUSDT",
1620 "bidPrice":"70100.90",
1621 "bidQty":"1.250",
1622 "askPrice":"70101.10",
1623 "askQty":"0.900",
1624 "time":1710000000200
1625 }"#,
1626 &InstrumentId::from("BTC/USDT:USDT"),
1627 )
1628 .expect("binance rest book top should parse");
1629 assert_eq!(book_top.bid.price.to_string(), "70100.90");
1630 assert_eq!(book_top.ask.price.to_string(), "70101.10");
1631 }
1632
1633 #[test]
1634 fn parse_binance_ticker_drops_unrepresentable_turnover_instead_of_failing() {
1635 let adapter = BinanceLinearFuturesAdapter::new();
1636 let events = adapter
1637 .parse_public(
1638 r#"{
1639 "e":"24hrTicker",
1640 "E":1710000000000,
1641 "s":"BTCUSDT",
1642 "c":"64000.10",
1643 "v":"12345.678",
1644 "q":"100000000000000000000.00"
1645 }"#,
1646 )
1647 .expect("binance ticker with large quote turnover should still parse");
1648
1649 let ticker = match &events[0] {
1650 bat_markets_core::PublicLaneEvent::Ticker(ticker) => ticker,
1651 other => panic!("expected ticker event, got {other:?}"),
1652 };
1653 assert!(ticker.turnover_24h.is_none());
1654 }
1655
1656 #[test]
1657 fn parse_binance_private_order_update_without_order_create_time() {
1658 let adapter = BinanceLinearFuturesAdapter::new();
1659 let events = adapter
1660 .parse_private(
1661 r#"{
1662 "e":"ORDER_TRADE_UPDATE",
1663 "o":{
1664 "s":"BTCUSDT",
1665 "c":"codex-demo",
1666 "S":"BUY",
1667 "o":"LIMIT",
1668 "f":"GTC",
1669 "q":"0.002",
1670 "p":"64000.10",
1671 "ap":"0",
1672 "x":"CANCELED",
1673 "X":"CANCELED",
1674 "i":123456,
1675 "l":"0",
1676 "z":"0",
1677 "L":"0",
1678 "n":null,
1679 "N":null,
1680 "T":1710000001234,
1681 "t":null,
1682 "R":false
1683 }
1684 }"#,
1685 )
1686 .expect("private order update without O should parse");
1687
1688 let order = match &events[0] {
1689 bat_markets_core::PrivateLaneEvent::Order(order) => order,
1690 other => panic!("expected order event, got {other:?}"),
1691 };
1692 assert_eq!(order.created_at, TimestampMs::new(1710000001234));
1693 assert_eq!(order.updated_at, TimestampMs::new(1710000001234));
1694 }
1695
1696 #[test]
1697 fn parse_binance_private_trade_lite_execution() {
1698 let adapter = BinanceLinearFuturesAdapter::new();
1699 let events = adapter
1700 .parse_private(
1701 r#"{
1702 "e":"TRADE_LITE",
1703 "E":1776795392416,
1704 "T":1776795392415,
1705 "s":"BTCUSDT",
1706 "q":"0.001",
1707 "p":"0.000000",
1708 "m":false,
1709 "c":"bx-open-1776795391188",
1710 "S":"BUY",
1711 "L":"70050.10",
1712 "l":"0.001",
1713 "t":3317622935,
1714 "i":96593497380
1715 }"#,
1716 )
1717 .expect("trade lite should parse");
1718
1719 assert_eq!(events.len(), 1);
1720 let bat_markets_core::PrivateLaneEvent::Execution(execution) = &events[0] else {
1721 panic!("expected execution event from TRADE_LITE");
1722 };
1723 assert_eq!(execution.instrument_id.as_ref(), "BTC/USDT:USDT");
1724 assert_eq!(
1725 execution.client_order_id.as_ref().map(ToString::to_string),
1726 Some("bx-open-1776795391188".to_owned())
1727 );
1728 assert_eq!(execution.quantity.value().to_string(), "0.001");
1729 assert_eq!(execution.price.value().to_string(), "70050.10");
1730 assert_eq!(
1731 execution.liquidity,
1732 Some(bat_markets_core::Liquidity::Taker)
1733 );
1734 }
1735
1736 #[test]
1737 fn parse_binance_account_snapshot_tolerates_missing_optional_position_fields() {
1738 let adapter = BinanceLinearFuturesAdapter::new();
1739 let (account, positions) = adapter
1740 .parse_account_snapshot(
1741 r#"{
1742 "totalWalletBalance":"5000.0",
1743 "availableBalance":"5000.0",
1744 "totalUnrealizedProfit":"0.0",
1745 "assets":[
1746 {
1747 "asset":"USDT",
1748 "walletBalance":"5000.0",
1749 "availableBalance":"5000.0"
1750 }
1751 ],
1752 "positions":[
1753 {
1754 "symbol":"BTCUSDT",
1755 "positionAmt":"0.0",
1756 "positionSide":"BOTH"
1757 },
1758 {
1759 "symbol":"BTCUSDT",
1760 "positionAmt":"0.001",
1761 "unrealizedProfit":"0.0",
1762 "isolatedMargin":"0",
1763 "isolatedWallet":"0",
1764 "positionSide":"BOTH"
1765 }
1766 ]
1767 }"#,
1768 TimestampMs::new(42),
1769 )
1770 .expect("account snapshot with sparse position fields should still parse");
1771
1772 assert_eq!(account.balances.len(), 1);
1773 assert_eq!(positions.len(), 1);
1774 assert!(positions[0].entry_price.is_none());
1775 assert!(positions[0].leverage.is_none());
1776 assert_eq!(
1777 positions[0].margin_mode,
1778 bat_markets_core::MarginMode::Cross
1779 );
1780 }
1781
1782 #[test]
1783 fn parse_binance_ohlcv_snapshot() {
1784 let adapter = BinanceLinearFuturesAdapter::new();
1785 let klines = adapter
1786 .parse_ohlcv_snapshot(
1787 r#"[
1788 [1710000000000,"64000.1","64100.0","63950.0","64050.0","12.345","1710000059999","0","0","0","0","0"],
1789 [1710000060000,"64050.0","64150.0","64000.0","64100.0","23.456","1710000119999","0","0","0","0","0"]
1790 ]"#,
1791 &FetchOhlcvRequest::for_instrument(
1792 InstrumentId::from("BTC/USDT:USDT"),
1793 "1m",
1794 None,
1795 None,
1796 Some(2),
1797 ),
1798 )
1799 .expect("binance klines snapshot should parse");
1800
1801 assert_eq!(klines.len(), 2);
1802 assert_eq!(klines[0].interval.as_ref(), "1m");
1803 assert_eq!(klines[0].open.to_string(), "64000.10");
1804 assert_eq!(klines[1].close.to_string(), "64100.00");
1805 }
1806}