1pub mod native;
4
5use std::{
6 collections::BTreeMap,
7 sync::Arc,
8 time::{SystemTime, UNIX_EPOCH},
9};
10
11use parking_lot::RwLock;
12use rust_decimal::Decimal;
13use serde::Deserialize;
14
15use bat_markets_core::{
16 AccountSnapshot, AggressorSide, AssetCode, Balance, BatMarketsConfig, CapabilitySet,
17 CommandOperation, CommandReceipt, CommandStatus, ErrorKind, Execution, FastBookTop, FastKline,
18 FastLiquidation, FastMarkPrice, FastOrderBookDelta, FastTicker, FastTrade, FetchOhlcvRequest,
19 FetchTradesRequest, FundingRate, InstrumentCatalog, InstrumentId, InstrumentSpec,
20 InstrumentStatus, InstrumentSupport, Kline, KlineInterval, Leverage, MarginMode, MarketError,
21 MarketType, Notional, OpenInterest, Order, OrderId, OrderStatus, OrderType, Position,
22 PositionDirection, PositionId, PositionMode, Price, PrivateLaneEvent, Product, PublicLaneEvent,
23 Quantity, Rate, RequestId, Result, Side, Ticker, TimeInForce, TimestampMs, TradeId, Venue,
24 VenueAdapter,
25};
26
/// Account-level context decoded from a Bybit account-info response.
///
/// Produced by `BybitLinearFuturesAdapter::parse_account_context`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BybitAccountContext {
    /// Wallet account type label; `parse_account_context` sets it to either
    /// `"UNIFIED"` or `"CONTRACT"`.
    pub wallet_account_type: Box<str>,
    /// Margin mode taken from the response, or `None` when the field is absent
    /// or `parse_margin_mode_name` yields nothing for it.
    pub margin_mode: Option<MarginMode>,
}
33
/// Venue adapter for Bybit linear USDT futures.
///
/// Cloning is cheap for the shared state: the instrument catalog and the
/// ticker cache live behind `Arc<RwLock<..>>`, so clones observe the same data.
#[derive(Clone, Debug)]
pub struct BybitLinearFuturesAdapter {
    /// Configuration the adapter was constructed with.
    config: BatMarketsConfig,
    /// Capability set reported through `VenueAdapter::capabilities`.
    capabilities: CapabilitySet,
    /// Lane set reported through `VenueAdapter::lane_set`.
    lane_set: bat_markets_core::LaneSet,
    /// Known instruments; swapped wholesale by `replace_instruments`.
    instruments: Arc<RwLock<InstrumentCatalog>>,
    /// Last merged ticker state per symbol, used by `merged_ticker_data` to
    /// fold successive ticker patches into one complete record.
    ticker_cache: Arc<RwLock<BTreeMap<String, native::TickerData>>>,
}
43
44impl Default for BybitLinearFuturesAdapter {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
50impl BybitLinearFuturesAdapter {
51 #[must_use]
52 pub fn new() -> Self {
53 Self::with_config(BatMarketsConfig::new(Venue::Bybit, Product::LinearUsdt))
54 }
55
56 #[must_use]
57 pub fn with_config(config: BatMarketsConfig) -> Self {
58 Self {
59 config,
60 capabilities: CapabilitySet::linear_futures_defaults(),
61 lane_set: bat_markets_core::LaneSet::linear_futures_defaults(),
62 instruments: Arc::new(RwLock::new(InstrumentCatalog::new([
63 btc_spec(),
64 eth_spec(),
65 ]))),
66 ticker_cache: Arc::new(RwLock::new(BTreeMap::new())),
67 }
68 }
69
70 pub fn replace_instruments(&self, instruments: Vec<InstrumentSpec>) {
71 self.instruments.write().replace(instruments);
72 }
73
74 pub fn parse_native_public(&self, payload: &str) -> Result<native::PublicEnvelope> {
75 serde_json::from_str(payload).map_err(|error| {
76 MarketError::new(
77 ErrorKind::DecodeError,
78 format!("failed to parse bybit public payload: {error}"),
79 )
80 .with_venue(Venue::Bybit, Product::LinearUsdt)
81 .with_operation("bybit.parse_native_public")
82 })
83 }
84
85 pub fn parse_native_private(&self, payload: &str) -> Result<native::PrivateEnvelope> {
86 serde_json::from_str(payload).map_err(|error| {
87 MarketError::new(
88 ErrorKind::DecodeError,
89 format!("failed to parse bybit private payload: {error}"),
90 )
91 .with_venue(Venue::Bybit, Product::LinearUsdt)
92 .with_operation("bybit.parse_native_private")
93 })
94 }
95
96 fn merged_ticker_data(&self, envelope: &native::PublicEnvelope) -> Result<native::TickerData> {
97 let patch = serde_json::from_value::<native::TickerPatch>(envelope.data.clone())
98 .map_err(decode_error)?;
99 let symbol = patch
100 .symbol
101 .clone()
102 .or_else(|| envelope.topic.strip_prefix("tickers.").map(str::to_owned))
103 .ok_or_else(|| {
104 MarketError::new(
105 ErrorKind::DecodeError,
106 format!(
107 "failed to derive bybit ticker symbol from topic '{}'",
108 envelope.topic
109 ),
110 )
111 })?;
112
113 let mut cache = self.ticker_cache.write();
114 let entry = cache
115 .entry(symbol.clone())
116 .or_insert_with(|| native::TickerData {
117 symbol: symbol.clone(),
118 last_price: String::new(),
119 mark_price: String::new(),
120 index_price: String::new(),
121 open_interest: String::new(),
122 funding_rate: String::new(),
123 volume_24h: String::new(),
124 turnover_24h: String::new(),
125 bid1_price: None,
126 bid1_size: None,
127 ask1_price: None,
128 ask1_size: None,
129 });
130
131 merge_string_field(&mut entry.last_price, patch.last_price);
132 merge_string_field(&mut entry.mark_price, patch.mark_price);
133 merge_string_field(&mut entry.index_price, patch.index_price);
134 merge_string_field(&mut entry.open_interest, patch.open_interest);
135 merge_string_field(&mut entry.funding_rate, patch.funding_rate);
136 merge_string_field(&mut entry.volume_24h, patch.volume_24h);
137 merge_string_field(&mut entry.turnover_24h, patch.turnover_24h);
138 merge_optional_string_field(&mut entry.bid1_price, patch.bid1_price);
139 merge_optional_string_field(&mut entry.bid1_size, patch.bid1_size);
140 merge_optional_string_field(&mut entry.ask1_price, patch.ask1_price);
141 merge_optional_string_field(&mut entry.ask1_size, patch.ask1_size);
142
143 ensure_non_empty(&entry.last_price, "lastPrice")?;
144
145 Ok(entry.clone())
146 }
147
148 pub fn parse_server_time(&self, payload: &str) -> Result<TimestampMs> {
149 let response =
150 serde_json::from_str::<native::ServerTimeResponse>(payload).map_err(decode_error)?;
151 if response.ret_code != 0 {
152 return Err(exchange_reject(response.ret_code, &response.ret_msg));
153 }
154 let nanos = response
155 .result
156 .time_nano
157 .parse::<i128>()
158 .map_err(|error| decode_string_error("bybit server time nano", error))?;
159 Ok(TimestampMs::new((nanos / 1_000_000_i128) as i64))
160 }
161
162 pub fn parse_metadata_snapshot(&self, payload: &str) -> Result<Vec<InstrumentSpec>> {
163 let response = serde_json::from_str::<native::InstrumentsInfoResponse>(payload)
164 .map_err(decode_error)?;
165 if response.ret_code != 0 {
166 return Err(exchange_reject(response.ret_code, &response.ret_msg));
167 }
168
169 let mut instruments = Vec::new();
170 for instrument in response.result.list {
171 if instrument.contract_type != "LinearPerpetual" {
172 continue;
173 }
174 let tick_size = parse_decimal(&instrument.price_filter.tick_size)?;
175 let qty_step = parse_decimal(&instrument.lot_size_filter.qty_step)?;
176 let min_qty = parse_decimal(&instrument.lot_size_filter.min_order_qty)?;
177 let min_notional = parse_decimal(&instrument.lot_size_filter.min_notional_value)?;
178 let price_scale = decimal_scale(tick_size);
179 let qty_scale = decimal_scale(qty_step);
180 let quote_scale = price_scale
181 .saturating_add(qty_scale)
182 .max(decimal_scale(min_notional));
183
184 instruments.push(InstrumentSpec {
185 venue: Venue::Bybit,
186 product: Product::LinearUsdt,
187 market_type: MarketType::LinearPerpetual,
188 instrument_id: InstrumentId::from(canonical_symbol(
189 &instrument.base_coin,
190 &instrument.quote_coin,
191 &instrument.settle_coin,
192 )),
193 canonical_symbol: canonical_symbol(
194 &instrument.base_coin,
195 &instrument.quote_coin,
196 &instrument.settle_coin,
197 )
198 .into(),
199 native_symbol: instrument.symbol.into(),
200 base: AssetCode::from(instrument.base_coin),
201 quote: AssetCode::from(instrument.quote_coin),
202 settle: AssetCode::from(instrument.settle_coin),
203 contract_size: Quantity::new(Decimal::ONE),
204 tick_size: Price::new(tick_size),
205 step_size: Quantity::new(qty_step),
206 min_qty: Quantity::new(min_qty),
207 min_notional: Notional::new(min_notional),
208 price_scale,
209 qty_scale,
210 quote_scale,
211 max_leverage: Some(Leverage::new(parse_decimal(
212 &instrument.leverage_filter.max_leverage,
213 )?)),
214 support: InstrumentSupport {
215 public_streams: true,
216 private_trading: true,
217 leverage_set: true,
218 margin_mode_set: true,
219 funding_rate: true,
220 open_interest: true,
221 },
222 status: parse_instrument_status(&instrument.status),
223 });
224 }
225
226 Ok(instruments)
227 }
228
229 pub fn parse_account_context(&self, payload: &str) -> Result<BybitAccountContext> {
230 let response =
231 serde_json::from_str::<native::AccountInfoResponse>(payload).map_err(decode_error)?;
232 if response.ret_code != 0 {
233 return Err(exchange_reject(response.ret_code, &response.ret_msg));
234 }
235
236 Ok(BybitAccountContext {
237 wallet_account_type: if response.result.unified_margin_status.unwrap_or(0) > 0 {
238 "UNIFIED".into()
239 } else {
240 "CONTRACT".into()
241 },
242 margin_mode: response
243 .result
244 .margin_mode
245 .as_deref()
246 .and_then(parse_margin_mode_name),
247 })
248 }
249
250 pub fn parse_account_snapshot(
251 &self,
252 payload: &str,
253 observed_at: TimestampMs,
254 ) -> Result<AccountSnapshot> {
255 let response =
256 serde_json::from_str::<native::WalletBalanceResponse>(payload).map_err(decode_error)?;
257 if response.ret_code != 0 {
258 return Err(exchange_reject(response.ret_code, &response.ret_msg));
259 }
260
261 let account = response.result.list.into_iter().next().ok_or_else(|| {
262 MarketError::new(
263 ErrorKind::DecodeError,
264 "missing bybit wallet balance account",
265 )
266 })?;
267 let balances = account
268 .coin
269 .into_iter()
270 .map(|coin| {
271 Ok(Balance {
272 asset: AssetCode::from(coin.coin),
273 wallet_balance: parse_decimal(&coin.wallet_balance)?.into(),
274 available_balance: parse_decimal_or_zero_on_empty(&coin.available_to_withdraw)?
275 .into(),
276 updated_at: observed_at,
277 })
278 })
279 .collect::<Result<Vec<_>>>()?;
280
281 Ok(AccountSnapshot {
282 balances,
283 summary: Some(bat_markets_core::AccountSummary {
284 total_wallet_balance: parse_decimal(&account.total_wallet_balance)?.into(),
285 total_available_balance: parse_decimal_or_zero_on_empty(
286 &account.total_available_balance,
287 )?
288 .into(),
289 total_unrealized_pnl: parse_optional_decimal(account.total_perp_upl.as_deref())?
290 .unwrap_or(Decimal::ZERO)
291 .into(),
292 updated_at: observed_at,
293 }),
294 })
295 }
296
297 pub fn parse_positions_snapshot(
298 &self,
299 payload: &str,
300 observed_at: TimestampMs,
301 ) -> Result<Vec<Position>> {
302 let response =
303 serde_json::from_str::<native::PositionListResponse>(payload).map_err(decode_error)?;
304 if response.ret_code != 0 {
305 return Err(exchange_reject(response.ret_code, &response.ret_msg));
306 }
307
308 response
309 .result
310 .list
311 .into_iter()
312 .filter(|position| position.size != "0")
313 .map(|position| self.position_from_snapshot(position, observed_at))
314 .collect()
315 }
316
317 pub fn parse_open_orders_snapshot(
318 &self,
319 payload: &str,
320 observed_at: TimestampMs,
321 ) -> Result<Vec<Order>> {
322 let response =
323 serde_json::from_str::<native::OrderListResponse>(payload).map_err(decode_error)?;
324 if response.ret_code != 0 {
325 return Err(exchange_reject(response.ret_code, &response.ret_msg));
326 }
327
328 response
329 .result
330 .list
331 .into_iter()
332 .map(|order| self.order_from_snapshot(order, observed_at))
333 .collect()
334 }
335
336 pub fn parse_order_snapshot(&self, payload: &str, observed_at: TimestampMs) -> Result<Order> {
337 let mut orders = self.parse_open_orders_snapshot(payload, observed_at)?;
338 orders.pop().ok_or_else(|| {
339 MarketError::new(
340 ErrorKind::DecodeError,
341 "missing bybit order snapshot entry in response",
342 )
343 })
344 }
345
346 pub fn parse_order_history_snapshot(
347 &self,
348 payload: &str,
349 observed_at: TimestampMs,
350 ) -> Result<Vec<Order>> {
351 self.parse_open_orders_snapshot(payload, observed_at)
352 }
353
354 pub fn parse_executions_snapshot(&self, payload: &str) -> Result<Vec<Execution>> {
355 let response =
356 serde_json::from_str::<native::ExecutionListResponse>(payload).map_err(decode_error)?;
357 if response.ret_code != 0 {
358 return Err(exchange_reject(response.ret_code, &response.ret_msg));
359 }
360
361 response
362 .result
363 .list
364 .into_iter()
365 .map(|execution| self.execution_from_snapshot(execution))
366 .collect()
367 }
368
369 pub fn parse_ticker_snapshot(
370 &self,
371 payload: &str,
372 instrument_id: &InstrumentId,
373 ) -> Result<Ticker> {
374 let response =
375 serde_json::from_str::<native::MarketTickersResponse>(payload).map_err(decode_error)?;
376 if response.ret_code != 0 {
377 return Err(exchange_reject(response.ret_code, &response.ret_msg));
378 }
379 let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
380 MarketError::new(
381 ErrorKind::Unsupported,
382 format!("unsupported bybit instrument '{}'", instrument_id),
383 )
384 .with_venue(Venue::Bybit, Product::LinearUsdt)
385 })?;
386 let data = response.result.list.into_iter().next().ok_or_else(|| {
387 MarketError::new(
388 ErrorKind::DecodeError,
389 "missing bybit ticker entry in market/tickers response",
390 )
391 })?;
392
393 Ok(FastTicker {
394 instrument_id: spec.instrument_id.clone(),
395 last_price: Price::new(parse_decimal(&data.last_price)?).quantize(spec.price_scale)?,
396 mark_price: Some(
397 Price::new(parse_decimal(&data.mark_price)?).quantize(spec.price_scale)?,
398 ),
399 index_price: Some(
400 Price::new(parse_decimal(&data.index_price)?).quantize(spec.price_scale)?,
401 ),
402 volume_24h: Some(
403 Quantity::new(parse_decimal(&data.volume_24h)?).quantize(spec.qty_scale)?,
404 ),
405 turnover_24h: Some(
406 Notional::new(parse_decimal(&data.turnover_24h)?).quantize(spec.quote_scale)?,
407 ),
408 event_time: TimestampMs::new(now_timestamp_ms()),
409 }
410 .to_unified(&spec))
411 }
412
413 pub fn parse_trades_snapshot(
414 &self,
415 payload: &str,
416 request: &FetchTradesRequest,
417 ) -> Result<Vec<bat_markets_core::TradeTick>> {
418 let response = serde_json::from_str::<native::RecentPublicTradesResponse>(payload)
419 .map_err(decode_error)?;
420 if response.ret_code != 0 {
421 return Err(exchange_reject(response.ret_code, &response.ret_msg));
422 }
423 let spec = self
424 .resolve_instrument(&request.instrument_id)
425 .ok_or_else(|| {
426 MarketError::new(
427 ErrorKind::Unsupported,
428 format!("unsupported bybit instrument '{}'", request.instrument_id),
429 )
430 .with_venue(Venue::Bybit, Product::LinearUsdt)
431 })?;
432
433 response
434 .result
435 .list
436 .into_iter()
437 .map(|trade| {
438 let trade_time = trade
439 .time
440 .parse::<i64>()
441 .map_err(|error| decode_string_error("bybit trade time", error))?;
442 Ok(FastTrade {
443 instrument_id: spec.instrument_id.clone(),
444 trade_id: TradeId::from(trade.exec_id),
445 price: Price::new(parse_decimal(&trade.price)?).quantize(spec.price_scale)?,
446 quantity: Quantity::new(parse_decimal(&trade.size)?)
447 .quantize(spec.qty_scale)?,
448 aggressor_side: parse_aggressor(&trade.side)?,
449 event_time: TimestampMs::new(trade_time),
450 }
451 .to_unified(&spec))
452 })
453 .collect()
454 }
455
456 pub fn parse_book_top_snapshot(
457 &self,
458 payload: &str,
459 instrument_id: &InstrumentId,
460 ) -> Result<bat_markets_core::BookTop> {
461 let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
462 MarketError::new(
463 ErrorKind::Unsupported,
464 format!("unsupported bybit instrument '{}'", instrument_id),
465 )
466 .with_venue(Venue::Bybit, Product::LinearUsdt)
467 })?;
468 let response =
469 serde_json::from_str::<native::OrderBookResponse>(payload).map_err(decode_error)?;
470 if response.ret_code != 0 {
471 return Err(exchange_reject(response.ret_code, &response.ret_msg));
472 }
473 let bid =
474 response.result.bids.first().ok_or_else(|| {
475 MarketError::new(ErrorKind::DecodeError, "missing bybit best bid")
476 })?;
477 let ask =
478 response.result.asks.first().ok_or_else(|| {
479 MarketError::new(ErrorKind::DecodeError, "missing bybit best ask")
480 })?;
481
482 Ok(FastBookTop {
483 instrument_id: spec.instrument_id.clone(),
484 bid_price: Price::new(parse_decimal(&bid[0])?).quantize(spec.price_scale)?,
485 bid_quantity: Quantity::new(parse_decimal(&bid[1])?).quantize(spec.qty_scale)?,
486 ask_price: Price::new(parse_decimal(&ask[0])?).quantize(spec.price_scale)?,
487 ask_quantity: Quantity::new(parse_decimal(&ask[1])?).quantize(spec.qty_scale)?,
488 event_time: TimestampMs::new(
489 response
490 .result
491 .cts
492 .or(response.result.ts)
493 .unwrap_or_else(now_timestamp_ms),
494 ),
495 }
496 .to_unified(&spec))
497 }
498
499 pub fn parse_ohlcv_snapshot(
500 &self,
501 payload: &str,
502 request: &FetchOhlcvRequest,
503 ) -> Result<Vec<Kline>> {
504 #[derive(Clone, Debug, Deserialize)]
505 struct KlineListResponse {
506 #[serde(rename = "retCode")]
507 ret_code: i64,
508 #[serde(rename = "retMsg")]
509 ret_msg: String,
510 result: KlineListResult,
511 }
512
513 #[derive(Clone, Debug, Deserialize)]
514 struct KlineListResult {
515 list: Vec<[String; 7]>,
516 }
517
518 let interval = KlineInterval::parse(request.interval.as_ref()).ok_or_else(|| {
519 MarketError::new(
520 ErrorKind::Unsupported,
521 format!("unsupported bybit OHLCV interval '{}'", request.interval),
522 )
523 .with_venue(Venue::Bybit, Product::LinearUsdt)
524 })?;
525 let instrument_id = request.single_instrument_id()?;
526 let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
527 MarketError::new(
528 ErrorKind::Unsupported,
529 format!("unsupported bybit instrument '{}'", instrument_id),
530 )
531 .with_venue(Venue::Bybit, Product::LinearUsdt)
532 })?;
533 let response = serde_json::from_str::<KlineListResponse>(payload).map_err(decode_error)?;
534 if response.ret_code != 0 {
535 return Err(exchange_reject(response.ret_code, &response.ret_msg));
536 }
537
538 let mut klines = response
539 .result
540 .list
541 .into_iter()
542 .map(|row| {
543 let open_time = row[0]
544 .parse::<i64>()
545 .map_err(|error| decode_string_error("bybit kline startTime", error))?;
546 let close_time = interval.close_time_ms(open_time).ok_or_else(|| {
547 MarketError::new(
548 ErrorKind::DecodeError,
549 format!(
550 "failed to derive bybit kline close time for interval '{}'",
551 Box::<str>::from(interval)
552 ),
553 )
554 .with_venue(Venue::Bybit, Product::LinearUsdt)
555 })?;
556 Ok(Kline {
557 instrument_id: spec.instrument_id.clone(),
558 interval: Box::<str>::from(interval),
559 open: spec.price_from_fast(
560 Price::new(parse_decimal(&row[1])?).quantize(spec.price_scale)?,
561 ),
562 high: spec.price_from_fast(
563 Price::new(parse_decimal(&row[2])?).quantize(spec.price_scale)?,
564 ),
565 low: spec.price_from_fast(
566 Price::new(parse_decimal(&row[3])?).quantize(spec.price_scale)?,
567 ),
568 close: spec.price_from_fast(
569 Price::new(parse_decimal(&row[4])?).quantize(spec.price_scale)?,
570 ),
571 volume: spec.quantity_from_fast(
572 Quantity::new(parse_decimal(&row[5])?).quantize(spec.qty_scale)?,
573 ),
574 open_time: TimestampMs::new(open_time),
575 close_time: TimestampMs::new(close_time),
576 closed: close_time < now_timestamp_ms(),
577 })
578 })
579 .collect::<Result<Vec<_>>>()?;
580 klines.sort_by_key(|kline| kline.open_time.value());
581 Ok(klines)
582 }
583
584 fn position_from_snapshot(
585 &self,
586 position: native::PositionData,
587 observed_at: TimestampMs,
588 ) -> Result<Position> {
589 let spec = self.require_native_symbol(&position.symbol)?;
590 let side = parse_side(&position.side)?;
591 Ok(Position {
592 position_id: PositionId::from(format!(
593 "bybit:{}:{}",
594 position.symbol, position.position_idx
595 )),
596 instrument_id: spec.instrument_id.clone(),
597 direction: match side {
598 Side::Buy => PositionDirection::Long,
599 Side::Sell => PositionDirection::Short,
600 },
601 size: Quantity::new(parse_decimal(&position.size)?),
602 entry_price: parse_optional_decimal_str(&position.entry_price)?.map(Price::new),
603 mark_price: None,
604 unrealized_pnl: parse_optional_decimal_str(&position.unrealised_pnl)?.map(Into::into),
605 leverage: parse_optional_decimal(position.leverage.as_deref())?.map(Leverage::new),
606 margin_mode: parse_trade_mode(position.trade_mode),
607 position_mode: parse_position_mode(position.position_idx),
608 updated_at: observed_at,
609 })
610 }
611
612 fn order_from_snapshot(
613 &self,
614 order: native::OrderData,
615 observed_at: TimestampMs,
616 ) -> Result<Order> {
617 let spec = self.require_native_symbol(&order.symbol)?;
618 let created_at = if order.created_time > 0 {
619 TimestampMs::new(order.created_time)
620 } else {
621 observed_at
622 };
623 let updated_at = if order.updated_time > 0 {
624 TimestampMs::new(order.updated_time)
625 } else {
626 observed_at
627 };
628 Ok(Order {
629 order_id: OrderId::from(order.order_id),
630 client_order_id: order.order_link_id.map(Into::into),
631 instrument_id: spec.instrument_id.clone(),
632 side: parse_side(&order.side)?,
633 order_type: parse_order_type(&order.order_type)?,
634 time_in_force: Some(parse_time_in_force(&order.time_in_force)?),
635 status: parse_order_status(&order.order_status)?,
636 price: parse_optional_decimal_str(&order.price)?.map(Price::new),
637 quantity: Quantity::new(parse_decimal(&order.quantity)?),
638 filled_quantity: Quantity::new(parse_decimal(&order.cumulative_exec_qty)?),
639 average_fill_price: parse_optional_decimal(order.average_price.as_deref())?
640 .map(Price::new),
641 reduce_only: order.reduce_only,
642 post_only: matches!(order.time_in_force.as_str(), "PostOnly"),
643 created_at,
644 updated_at,
645 venue_status: Some(order.order_status.into()),
646 })
647 }
648
649 fn execution_from_snapshot(&self, execution: native::ExecutionData) -> Result<Execution> {
650 let spec = self.require_native_symbol(&execution.symbol)?;
651 Ok(Execution {
652 execution_id: TradeId::from(execution.exec_id),
653 order_id: OrderId::from(execution.order_id),
654 client_order_id: execution.order_link_id.map(Into::into),
655 instrument_id: spec.instrument_id.clone(),
656 side: parse_side(&execution.side)?,
657 quantity: Quantity::new(parse_decimal(&execution.exec_qty)?),
658 price: Price::new(parse_decimal(&execution.exec_price)?),
659 fee: Some(parse_decimal(&execution.exec_fee)?.into()),
660 fee_asset: execution.fee_currency.map(AssetCode::from),
661 liquidity: None,
662 executed_at: TimestampMs::new(execution.exec_time),
663 })
664 }
665
666 fn require_native_symbol(&self, native_symbol: &str) -> Result<InstrumentSpec> {
667 self.resolve_native_symbol(native_symbol).ok_or_else(|| {
668 MarketError::new(
669 ErrorKind::Unsupported,
670 format!("unsupported bybit symbol '{native_symbol}'"),
671 )
672 .with_venue(Venue::Bybit, Product::LinearUsdt)
673 })
674 }
675}
676
677impl VenueAdapter for BybitLinearFuturesAdapter {
/// This adapter always reports `Venue::Bybit`.
fn venue(&self) -> Venue {
    Venue::Bybit
}
681
/// This adapter always reports `Product::LinearUsdt`.
fn product(&self) -> Product {
    Product::LinearUsdt
}
685
/// Borrows the configuration the adapter was constructed with.
fn config(&self) -> &BatMarketsConfig {
    &self.config
}
689
/// Returns a copy of the capability set chosen at construction time.
fn capabilities(&self) -> CapabilitySet {
    self.capabilities
}
693
/// Returns a copy of the lane set chosen at construction time.
fn lane_set(&self) -> bat_markets_core::LaneSet {
    self.lane_set
}
697
698 fn instrument_specs(&self) -> Vec<InstrumentSpec> {
699 self.instruments.read().all()
700 }
701
702 fn resolve_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentSpec> {
703 self.instruments.read().get(instrument_id)
704 }
705
706 fn resolve_native_symbol(&self, native_symbol: &str) -> Option<InstrumentSpec> {
707 self.instruments.read().by_native_symbol(native_symbol)
708 }
709
/// Decodes one public payload into fast-lane events, dispatching on the
/// envelope topic prefix.
///
/// Handled prefixes: `tickers.`, `publicTrade.`, `orderbook.`,
/// `allLiquidation.` and `kline.`. Prices and quantities are quantized to the
/// scales of the instrument resolved from the native symbol.
///
/// # Errors
/// * `DecodeError` when the envelope or its data cannot be decoded, or a
///   required value is missing or unparsable.
/// * `Unsupported` for an unknown native symbol or an unhandled topic.
fn parse_public(&self, payload: &str) -> Result<Vec<PublicLaneEvent>> {
    let envelope = self.parse_native_public(payload)?;
    // Tickers: merge the patch into the cached record, then emit a ticker
    // event plus mark-price / funding-rate / open-interest events for whichever
    // of those fields parse to a value.
    if envelope.topic.starts_with("tickers.") {
        let data = self.merged_ticker_data(&envelope)?;
        let spec = self.require_native_symbol(&data.symbol)?;
        // At most four events are produced by this branch.
        let mut events = Vec::with_capacity(4);
        events.push(PublicLaneEvent::Ticker(FastTicker {
            instrument_id: spec.instrument_id.clone(),
            last_price: Price::new(parse_decimal(&data.last_price)?)
                .quantize(spec.price_scale)?,
            mark_price: parse_optional_decimal_str(&data.mark_price)?
                .map(|value| Price::new(value).quantize(spec.price_scale))
                .transpose()?,
            index_price: parse_optional_decimal_str(&data.index_price)?
                .map(|value| Price::new(value).quantize(spec.price_scale))
                .transpose()?,
            volume_24h: parse_optional_decimal_str(&data.volume_24h)?
                .map(|value| Quantity::new(value).quantize(spec.qty_scale))
                .transpose()?,
            turnover_24h: parse_optional_decimal_str(&data.turnover_24h)?
                .map(|value| Notional::new(value).quantize(spec.quote_scale))
                .transpose()?,
            event_time: TimestampMs::new(envelope.ts),
        }));
        if let Some(mark_price) = parse_optional_decimal_str(&data.mark_price)? {
            events.push(PublicLaneEvent::MarkPrice(FastMarkPrice {
                instrument_id: spec.instrument_id.clone(),
                price: Price::new(mark_price).quantize(spec.price_scale)?,
                funding_rate: parse_optional_decimal_str(&data.funding_rate)?.map(Rate::new),
                event_time: TimestampMs::new(envelope.ts),
            }));
        }
        if let Some(funding_rate) = parse_optional_decimal_str(&data.funding_rate)? {
            events.push(PublicLaneEvent::FundingRate(FundingRate {
                instrument_id: spec.instrument_id.clone(),
                value: Rate::new(funding_rate),
                // Unlike the MarkPrice event above, this mark price is not quantized.
                mark_price: parse_optional_decimal_str(&data.mark_price)?.map(Price::new),
                event_time: TimestampMs::new(envelope.ts),
            }));
        }
        if let Some(open_interest) = parse_optional_decimal_str(&data.open_interest)? {
            events.push(PublicLaneEvent::OpenInterest(OpenInterest {
                instrument_id: spec.instrument_id.clone(),
                value: Quantity::new(open_interest),
                event_time: TimestampMs::new(envelope.ts),
            }));
        }
        return Ok(events);
    }

    // Public trades: one Trade event per entry, timed by the trade itself.
    if envelope.topic.starts_with("publicTrade.") {
        let data: Vec<native::PublicTradeData> =
            serde_json::from_value(envelope.data).map_err(decode_error)?;
        let mut events = Vec::with_capacity(data.len());
        for trade in data {
            let spec = self.require_native_symbol(&trade.symbol)?;
            events.push(PublicLaneEvent::Trade(FastTrade {
                instrument_id: spec.instrument_id.clone(),
                trade_id: TradeId::from(trade.trade_id),
                price: Price::new(parse_decimal(&trade.price)?).quantize(spec.price_scale)?,
                quantity: Quantity::new(parse_decimal(&trade.quantity)?)
                    .quantize(spec.qty_scale)?,
                aggressor_side: parse_aggressor(&trade.side)?,
                event_time: TimestampMs::new(trade.trade_time),
            }));
        }
        return Ok(events);
    }

    // Order book: emit a BookTop built from the first bid/ask level, followed
    // by an OrderBookDelta carrying every level in the message.
    if envelope.topic.starts_with("orderbook.") {
        let data: native::OrderBookData =
            serde_json::from_value(envelope.data).map_err(decode_error)?;
        let spec = self.require_native_symbol(&data.symbol)?;
        // NOTE(review): a message with an empty bid or ask side is rejected
        // here — confirm that one-sided updates never arrive on this topic.
        let bid = data.bids.first().ok_or_else(|| {
            MarketError::new(ErrorKind::DecodeError, "missing bybit best bid")
        })?;
        let ask = data.asks.first().ok_or_else(|| {
            MarketError::new(ErrorKind::DecodeError, "missing bybit best ask")
        })?;
        return Ok(vec![
            PublicLaneEvent::BookTop(FastBookTop {
                instrument_id: spec.instrument_id.clone(),
                bid_price: Price::new(parse_decimal(&bid[0])?).quantize(spec.price_scale)?,
                bid_quantity: Quantity::new(parse_decimal(&bid[1])?)
                    .quantize(spec.qty_scale)?,
                ask_price: Price::new(parse_decimal(&ask[0])?).quantize(spec.price_scale)?,
                ask_quantity: Quantity::new(parse_decimal(&ask[1])?)
                    .quantize(spec.qty_scale)?,
                event_time: TimestampMs::new(envelope.ts),
            }),
            PublicLaneEvent::OrderBookDelta(FastOrderBookDelta {
                instrument_id: spec.instrument_id.clone(),
                // Each level is indexed as [price, quantity].
                bids: data
                    .bids
                    .iter()
                    .map(|level| {
                        Ok((
                            Price::new(parse_decimal(&level[0])?).quantize(spec.price_scale)?,
                            Quantity::new(parse_decimal(&level[1])?)
                                .quantize(spec.qty_scale)?,
                        ))
                    })
                    .collect::<Result<Vec<_>>>()?,
                asks: data
                    .asks
                    .iter()
                    .map(|level| {
                        Ok((
                            Price::new(parse_decimal(&level[0])?).quantize(spec.price_scale)?,
                            Quantity::new(parse_decimal(&level[1])?)
                                .quantize(spec.qty_scale)?,
                        ))
                    })
                    .collect::<Result<Vec<_>>>()?,
                event_time: TimestampMs::new(envelope.ts),
            }),
        ]);
    }

    // Liquidations: one Liquidation event per entry, timed by the entry's
    // own `updated_time`.
    if envelope.topic.starts_with("allLiquidation.") {
        let data: Vec<native::AllLiquidationData> =
            serde_json::from_value(envelope.data).map_err(decode_error)?;
        let mut events = Vec::with_capacity(data.len());
        for liquidation in data {
            let spec = self.require_native_symbol(&liquidation.symbol)?;
            events.push(PublicLaneEvent::Liquidation(FastLiquidation {
                instrument_id: spec.instrument_id.clone(),
                side: parse_side(&liquidation.side)?,
                price: Price::new(parse_decimal(&liquidation.price)?)
                    .quantize(spec.price_scale)?,
                quantity: Quantity::new(parse_decimal(&liquidation.quantity)?)
                    .quantize(spec.qty_scale)?,
                event_time: TimestampMs::new(liquidation.updated_time),
            }));
        }
        return Ok(events);
    }

    // Klines: the symbol is taken from the entry when present, otherwise from
    // the last dot-separated segment of the topic.
    if envelope.topic.starts_with("kline.") {
        let topic_symbol = envelope.topic.rsplit('.').next().ok_or_else(|| {
            MarketError::new(
                ErrorKind::DecodeError,
                format!(
                    "failed to derive bybit kline symbol from topic '{}'",
                    envelope.topic
                ),
            )
            .with_venue(Venue::Bybit, Product::LinearUsdt)
        })?;
        let data: Vec<native::KlineData> =
            serde_json::from_value(envelope.data).map_err(decode_error)?;
        let mut events = Vec::with_capacity(data.len());
        for kline in data {
            let symbol = kline.symbol.as_deref().unwrap_or(topic_symbol);
            let spec = self.require_native_symbol(symbol)?;
            events.push(PublicLaneEvent::Kline(FastKline {
                instrument_id: spec.instrument_id.clone(),
                interval: kline.interval.into(),
                open: Price::new(parse_decimal(&kline.open)?).quantize(spec.price_scale)?,
                high: Price::new(parse_decimal(&kline.high)?).quantize(spec.price_scale)?,
                low: Price::new(parse_decimal(&kline.low)?).quantize(spec.price_scale)?,
                close: Price::new(parse_decimal(&kline.close)?).quantize(spec.price_scale)?,
                volume: Quantity::new(parse_decimal(&kline.volume)?)
                    .quantize(spec.qty_scale)?,
                open_time: TimestampMs::new(kline.start),
                close_time: TimestampMs::new(kline.end),
                closed: kline.confirm,
            }));
        }
        return Ok(events);
    }

    // Any other topic is not handled by this adapter.
    Err(MarketError::new(
        ErrorKind::Unsupported,
        format!("unsupported bybit public topic '{}'", envelope.topic),
    ))
}
887
888 fn parse_private(&self, payload: &str) -> Result<Vec<PrivateLaneEvent>> {
889 let envelope = self.parse_native_private(payload)?;
890 if envelope.topic == "wallet" {
891 let data: Vec<native::WalletData> =
892 serde_json::from_value(envelope.data).map_err(decode_error)?;
893 let mut events = Vec::new();
894 for wallet in data {
895 for coin in wallet.coins {
896 events.push(PrivateLaneEvent::Balance(Balance {
897 asset: AssetCode::from(coin.coin),
898 wallet_balance: parse_decimal(&coin.wallet_balance)?.into(),
899 available_balance: parse_decimal_or_zero_on_empty(
900 &coin.available_to_withdraw,
901 )?
902 .into(),
903 updated_at: TimestampMs::new(envelope.creation_time),
904 }));
905 }
906 }
907 return Ok(events);
908 }
909
910 if envelope.topic == "position" {
911 let data: Vec<native::PositionData> =
912 serde_json::from_value(envelope.data).map_err(decode_error)?;
913 let mut events = Vec::new();
914 for position in data {
915 if position.size == "0" {
916 continue;
917 }
918 let spec = self.require_native_symbol(&position.symbol)?;
919 let side = parse_side(&position.side)?;
920 events.push(PrivateLaneEvent::Position(Position {
921 position_id: PositionId::from(format!(
922 "bybit:{}:{}",
923 position.symbol, position.position_idx
924 )),
925 instrument_id: spec.instrument_id.clone(),
926 direction: match side {
927 Side::Buy => PositionDirection::Long,
928 Side::Sell => PositionDirection::Short,
929 },
930 size: Quantity::new(parse_decimal(&position.size)?),
931 entry_price: parse_optional_decimal_str(&position.entry_price)?.map(Price::new),
932 mark_price: None,
933 unrealized_pnl: parse_optional_decimal_str(&position.unrealised_pnl)?
934 .map(Into::into),
935 leverage: parse_optional_decimal(position.leverage.as_deref())?
936 .map(Leverage::new),
937 margin_mode: parse_trade_mode(position.trade_mode),
938 position_mode: parse_position_mode(position.position_idx),
939 updated_at: TimestampMs::new(envelope.creation_time),
940 }));
941 }
942 return Ok(events);
943 }
944
945 if envelope.topic == "order" {
946 let data: Vec<native::OrderData> =
947 serde_json::from_value(envelope.data).map_err(decode_error)?;
948 let mut events = Vec::new();
949 for order in data {
950 events.push(PrivateLaneEvent::Order(self.order_from_snapshot(
951 order,
952 TimestampMs::new(envelope.creation_time),
953 )?));
954 }
955 return Ok(events);
956 }
957
958 if envelope.topic == "execution" {
959 let data: Vec<native::ExecutionData> =
960 serde_json::from_value(envelope.data).map_err(decode_error)?;
961 let mut events = Vec::new();
962 for execution in data {
963 let spec = self.require_native_symbol(&execution.symbol)?;
964 events.push(PrivateLaneEvent::Execution(Execution {
965 execution_id: TradeId::from(execution.exec_id),
966 order_id: OrderId::from(execution.order_id),
967 client_order_id: execution.order_link_id.map(Into::into),
968 instrument_id: spec.instrument_id.clone(),
969 side: parse_side(&execution.side)?,
970 quantity: Quantity::new(parse_decimal(&execution.exec_qty)?),
971 price: Price::new(parse_decimal(&execution.exec_price)?),
972 fee: Some(parse_decimal(&execution.exec_fee)?.into()),
973 fee_asset: execution.fee_currency.map(AssetCode::from),
974 liquidity: None,
975 executed_at: TimestampMs::new(execution.exec_time),
976 }));
977 }
978 return Ok(events);
979 }
980
981 Err(MarketError::new(
982 ErrorKind::Unsupported,
983 format!("unsupported bybit private topic '{}'", envelope.topic),
984 ))
985 }
986
987 fn classify_command(
988 &self,
989 operation: CommandOperation,
990 payload: Option<&str>,
991 request_id: Option<RequestId>,
992 ) -> Result<CommandReceipt> {
993 let Some(payload) = payload else {
994 return Ok(CommandReceipt {
995 operation,
996 status: CommandStatus::UnknownExecution,
997 venue: Venue::Bybit,
998 product: Product::LinearUsdt,
999 instrument_id: None,
1000 order_id: None,
1001 client_order_id: None,
1002 request_id,
1003 message: Some("command outcome requires reconcile".into()),
1004 native_code: None,
1005 retriable: true,
1006 });
1007 };
1008
1009 let value = serde_json::from_str::<serde_json::Value>(payload).map_err(|error| {
1010 MarketError::new(
1011 ErrorKind::DecodeError,
1012 format!("failed to classify bybit command response: {error}"),
1013 )
1014 })?;
1015 let response = native::RetCodeResponse {
1016 ret_code: value
1017 .get("retCode")
1018 .and_then(|value| {
1019 value
1020 .as_i64()
1021 .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1022 })
1023 .unwrap_or_default(),
1024 ret_msg: value
1025 .get("retMsg")
1026 .and_then(serde_json::Value::as_str)
1027 .unwrap_or_default()
1028 .to_owned(),
1029 result: value
1030 .get("result")
1031 .cloned()
1032 .or_else(|| value.get("data").cloned()),
1033 };
1034
1035 if response.ret_code != 0 {
1036 return Ok(CommandReceipt {
1037 operation,
1038 status: CommandStatus::Rejected,
1039 venue: Venue::Bybit,
1040 product: Product::LinearUsdt,
1041 instrument_id: None,
1042 order_id: None,
1043 client_order_id: None,
1044 request_id,
1045 message: Some(response.ret_msg.into()),
1046 native_code: Some(response.ret_code.to_string().into()),
1047 retriable: false,
1048 });
1049 }
1050
1051 let result = response.result.unwrap_or_default();
1052
1053 let (instrument_id, order_id, client_order_id) = match operation {
1054 CommandOperation::CreateOrder
1055 | CommandOperation::AmendOrder
1056 | CommandOperation::CancelOrder
1057 | CommandOperation::ClosePosition
1058 | CommandOperation::GetOrder => {
1059 let order = serde_json::from_value::<native::OrderResult>(result)
1060 .unwrap_or_else(|_| native::OrderResult::default());
1061 let instrument_id = order
1062 .symbol
1063 .as_deref()
1064 .and_then(|symbol| self.resolve_native_symbol(symbol))
1065 .map(|spec| spec.instrument_id.clone());
1066 (
1067 instrument_id,
1068 order.order_id.map(OrderId::from),
1069 order.order_link_id.map(Into::into),
1070 )
1071 }
1072 _ => (None, None, None),
1073 };
1074
1075 Ok(CommandReceipt {
1076 operation,
1077 status: CommandStatus::Accepted,
1078 venue: Venue::Bybit,
1079 product: Product::LinearUsdt,
1080 instrument_id,
1081 order_id,
1082 client_order_id,
1083 request_id,
1084 message: Some(response.ret_msg.into()),
1085 native_code: Some(response.ret_code.to_string().into()),
1086 retriable: false,
1087 })
1088 }
1089}
1090
1091fn btc_spec() -> InstrumentSpec {
1092 instrument_spec(("BTC", "USDT", "USDT"), "BTCUSDT", 2, 3, 5, 100)
1093}
1094
1095fn eth_spec() -> InstrumentSpec {
1096 instrument_spec(("ETH", "USDT", "USDT"), "ETHUSDT", 2, 3, 5, 50)
1097}
1098
1099fn instrument_spec(
1100 assets: (&str, &str, &str),
1101 native_symbol: &str,
1102 price_scale: u32,
1103 qty_scale: u32,
1104 quote_scale: u32,
1105 max_leverage: i64,
1106) -> InstrumentSpec {
1107 let (base, quote, settle) = assets;
1108 InstrumentSpec {
1109 venue: Venue::Bybit,
1110 product: Product::LinearUsdt,
1111 market_type: MarketType::LinearPerpetual,
1112 instrument_id: InstrumentId::from(canonical_symbol(base, quote, settle)),
1113 canonical_symbol: canonical_symbol(base, quote, settle).into(),
1114 native_symbol: native_symbol.into(),
1115 base: AssetCode::from(base),
1116 quote: AssetCode::from(quote),
1117 settle: AssetCode::from(settle),
1118 contract_size: Quantity::new(Decimal::ONE),
1119 tick_size: Price::new(Decimal::new(1, price_scale)),
1120 step_size: Quantity::new(Decimal::new(1, qty_scale)),
1121 min_qty: Quantity::new(Decimal::new(1, qty_scale)),
1122 min_notional: Notional::new(Decimal::new(5, quote_scale)),
1123 price_scale,
1124 qty_scale,
1125 quote_scale,
1126 max_leverage: Some(Leverage::new(Decimal::new(max_leverage, 0))),
1127 support: InstrumentSupport {
1128 public_streams: true,
1129 private_trading: true,
1130 leverage_set: true,
1131 margin_mode_set: true,
1132 funding_rate: true,
1133 open_interest: true,
1134 },
1135 status: InstrumentStatus::Active,
1136 }
1137}
1138
/// Joins the three asset codes into the canonical `BASE/QUOTE:SETTLE` form.
fn canonical_symbol(base: &str, quote: &str, settle: &str) -> String {
    // Two extra bytes for the '/' and ':' separators.
    let mut symbol = String::with_capacity(base.len() + quote.len() + settle.len() + 2);
    symbol.push_str(base);
    symbol.push('/');
    symbol.push_str(quote);
    symbol.push(':');
    symbol.push_str(settle);
    symbol
}
1142
1143fn parse_decimal(raw: &str) -> Result<Decimal> {
1144 raw.parse::<Decimal>().map_err(|error| {
1145 MarketError::new(
1146 ErrorKind::DecodeError,
1147 format!("invalid decimal '{raw}': {error}"),
1148 )
1149 .with_venue(Venue::Bybit, Product::LinearUsdt)
1150 })
1151}
1152
1153fn parse_optional_decimal(raw: Option<&str>) -> Result<Option<Decimal>> {
1154 raw.map(str::trim)
1155 .filter(|raw| !raw.is_empty())
1156 .map(parse_decimal)
1157 .transpose()
1158}
1159
1160fn parse_decimal_or_zero_on_empty(raw: &str) -> Result<Decimal> {
1161 Ok(parse_optional_decimal_str(raw)?.unwrap_or(Decimal::ZERO))
1162}
1163
1164fn parse_side(raw: &str) -> Result<Side> {
1165 match raw {
1166 "Buy" => Ok(Side::Buy),
1167 "Sell" => Ok(Side::Sell),
1168 other => Err(MarketError::new(
1169 ErrorKind::DecodeError,
1170 format!("unsupported bybit side '{other}'"),
1171 )),
1172 }
1173}
1174
1175fn parse_aggressor(raw: &str) -> Result<AggressorSide> {
1176 match raw {
1177 "Buy" => Ok(AggressorSide::Buyer),
1178 "Sell" => Ok(AggressorSide::Seller),
1179 other => Err(MarketError::new(
1180 ErrorKind::DecodeError,
1181 format!("unsupported bybit trade side '{other}'"),
1182 )),
1183 }
1184}
1185
1186fn parse_order_type(raw: &str) -> Result<OrderType> {
1187 match raw {
1188 "Market" => Ok(OrderType::Market),
1189 "Limit" => Ok(OrderType::Limit),
1190 "Stop" => Ok(OrderType::StopLimit),
1191 "StopMarket" => Ok(OrderType::StopMarket),
1192 "TakeProfit" => Ok(OrderType::TakeProfitLimit),
1193 "TakeProfitMarket" => Ok(OrderType::TakeProfitMarket),
1194 other => Err(MarketError::new(
1195 ErrorKind::DecodeError,
1196 format!("unsupported bybit order type '{other}'"),
1197 )),
1198 }
1199}
1200
1201fn parse_time_in_force(raw: &str) -> Result<TimeInForce> {
1202 match raw {
1203 "GTC" => Ok(TimeInForce::Gtc),
1204 "IOC" => Ok(TimeInForce::Ioc),
1205 "FOK" => Ok(TimeInForce::Fok),
1206 "PostOnly" => Ok(TimeInForce::PostOnly),
1207 other => Err(MarketError::new(
1208 ErrorKind::DecodeError,
1209 format!("unsupported bybit time in force '{other}'"),
1210 )),
1211 }
1212}
1213
1214fn parse_order_status(raw: &str) -> Result<OrderStatus> {
1215 match raw {
1216 "New" => Ok(OrderStatus::New),
1217 "PartiallyFilled" => Ok(OrderStatus::PartiallyFilled),
1218 "Filled" => Ok(OrderStatus::Filled),
1219 "Cancelled" | "Canceled" => Ok(OrderStatus::Canceled),
1220 "Rejected" | "Deactivated" => Ok(OrderStatus::Rejected),
1221 other => Err(MarketError::new(
1222 ErrorKind::DecodeError,
1223 format!("unsupported bybit order status '{other}'"),
1224 )),
1225 }
1226}
1227
1228fn parse_trade_mode(mode: u8) -> MarginMode {
1229 if mode == 1 {
1230 MarginMode::Isolated
1231 } else {
1232 MarginMode::Cross
1233 }
1234}
1235
1236fn parse_margin_mode_name(raw: &str) -> Option<MarginMode> {
1237 match raw {
1238 "ISOLATED_MARGIN" => Some(MarginMode::Isolated),
1239 "REGULAR_MARGIN" | "CROSS_MARGIN" => Some(MarginMode::Cross),
1240 _ => None,
1241 }
1242}
1243
1244fn parse_position_mode(index: u8) -> PositionMode {
1245 if index == 0 {
1246 PositionMode::OneWay
1247 } else {
1248 PositionMode::Hedge
1249 }
1250}
1251
1252fn parse_instrument_status(raw: &str) -> InstrumentStatus {
1253 match raw {
1254 "Trading" => InstrumentStatus::Active,
1255 "Settling" | "Closed" | "PreLaunch" => InstrumentStatus::Halted,
1256 "Deliverying" => InstrumentStatus::Settled,
1257 _ => InstrumentStatus::Halted,
1258 }
1259}
1260
/// Overwrites `target` with `value` only when a non-empty value is supplied;
/// `None` and empty strings leave `target` untouched.
fn merge_string_field(target: &mut String, value: Option<String>) {
    match value {
        Some(incoming) if !incoming.is_empty() => *target = incoming,
        _ => {}
    }
}
1268
/// Sets `target` to `Some(value)` only when a non-empty value is supplied;
/// `None` and empty strings leave `target` as it was.
fn merge_optional_string_field(target: &mut Option<String>, value: Option<String>) {
    if let Some(incoming) = value.filter(|candidate| !candidate.is_empty()) {
        *target = Some(incoming);
    }
}
1276
1277fn ensure_non_empty(value: &str, field: &str) -> Result<()> {
1278 if value.is_empty() {
1279 return Err(MarketError::new(
1280 ErrorKind::DecodeError,
1281 format!("missing bybit ticker field '{field}' after snapshot/delta merge"),
1282 ));
1283 }
1284 Ok(())
1285}
1286
1287fn parse_optional_decimal_str(raw: &str) -> Result<Option<Decimal>> {
1288 let raw = raw.trim();
1289 if raw.is_empty() {
1290 return Ok(None);
1291 }
1292 parse_decimal(raw).map(Some)
1293}
1294
1295fn decimal_scale(value: Decimal) -> u32 {
1296 value.normalize().scale()
1297}
1298
1299fn decode_error(error: serde_json::Error) -> MarketError {
1300 MarketError::new(
1301 ErrorKind::DecodeError,
1302 format!("failed to decode bybit topic payload: {error}"),
1303 )
1304 .with_venue(Venue::Bybit, Product::LinearUsdt)
1305}
1306
1307fn decode_string_error(label: &str, error: impl std::fmt::Display) -> MarketError {
1308 MarketError::new(
1309 ErrorKind::DecodeError,
1310 format!("failed to decode {label}: {error}"),
1311 )
1312 .with_venue(Venue::Bybit, Product::LinearUsdt)
1313}
1314
1315fn exchange_reject(code: i64, message: &str) -> MarketError {
1316 MarketError::new(ErrorKind::ExchangeReject, message)
1317 .with_venue(Venue::Bybit, Product::LinearUsdt)
1318 .with_native_code(code.to_string())
1319}
1320
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// The value is clamped to `i64::MAX`; a system clock set before the epoch
/// yields `0`.
fn now_timestamp_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => {
            let ceiling = i64::MAX as u128;
            let millis = elapsed.as_millis();
            if millis > ceiling {
                i64::MAX
            } else {
                millis as i64
            }
        }
        Err(_) => 0,
    }
}
1327
// Unit tests for the Bybit linear-futures adapter's snapshot and stream
// parsers. Each test builds a default adapter and feeds it either a fixture
// file or an inline JSON payload.
#[cfg(test)]
mod tests {
    use super::BybitLinearFuturesAdapter;
    use bat_markets_core::{
        FetchOhlcvRequest, FetchTradesRequest, InstrumentId, OrderStatus, PublicLaneEvent,
        TimestampMs, VenueAdapter,
    };

    // Execution-history fixture, resolved relative to the crate manifest directory.
    const EXECUTION_HISTORY: &str = include_str!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../fixtures/bybit/execution_history.json"
    ));
    // Order-history fixture, resolved relative to the crate manifest directory.
    const ORDER_HISTORY: &str = include_str!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/../../fixtures/bybit/order_history.json"
    ));

    // The execution-history fixture yields exactly one execution with the expected id.
    #[test]
    fn parse_bybit_execution_history_snapshot() {
        let adapter = BybitLinearFuturesAdapter::new();
        let executions = adapter
            .parse_executions_snapshot(EXECUTION_HISTORY)
            .expect("bybit execution history fixture should parse");
        assert_eq!(executions.len(), 1);
        assert_eq!(executions[0].execution_id.to_string(), "bybit-exec-1");
    }

    // The order-history fixture yields exactly one order, in the canceled state.
    #[test]
    fn parse_bybit_order_history_snapshot() {
        let adapter = BybitLinearFuturesAdapter::new();
        let orders = adapter
            .parse_order_history_snapshot(ORDER_HISTORY, TimestampMs::new(1))
            .expect("bybit order history fixture should parse");
        assert_eq!(orders.len(), 1);
        assert_eq!(orders[0].status, OrderStatus::Canceled);
    }

    // A REST ticker payload parses, and the last price is rendered with two decimals.
    #[test]
    fn parse_bybit_rest_ticker_snapshot() {
        let adapter = BybitLinearFuturesAdapter::new();
        let ticker = adapter
            .parse_ticker_snapshot(
                r#"{
                    "retCode":0,
                    "retMsg":"OK",
                    "result":{"list":[{"symbol":"BTCUSDT","lastPrice":"70110.0","markPrice":"70108.5","indexPrice":"70105.0","openInterest":"30000.500","fundingRate":"0.000120","volume24h":"5432.100","turnover24h":"381000000.55","bid1Price":"70109.5","bid1Size":"2.500","ask1Price":"70110.5","ask1Size":"1.700"}]}
                }"#,
                &InstrumentId::from("BTC/USDT:USDT"),
            )
            .expect("bybit rest ticker should parse");
        assert_eq!(ticker.last_price.to_string(), "70110.00");
    }

    // A REST recent-trades payload with one trade parses into one trade.
    #[test]
    fn parse_bybit_rest_trades_snapshot() {
        let adapter = BybitLinearFuturesAdapter::new();
        let trades = adapter
            .parse_trades_snapshot(
                r#"{
                    "retCode":0,
                    "retMsg":"OK",
                    "result":{"list":[{"execId":"abc123","symbol":"BTCUSDT","price":"70109.9","size":"0.250","side":"Buy","time":"1710000000001"}]}
                }"#,
                &FetchTradesRequest::new(InstrumentId::from("BTC/USDT:USDT"), Some(1)),
            )
            .expect("bybit rest trades should parse");
        assert_eq!(trades.len(), 1);
        assert_eq!(trades[0].price.to_string(), "70109.90");
    }

    // A REST order-book payload yields the best bid and ask prices.
    #[test]
    fn parse_bybit_rest_book_top_snapshot() {
        let adapter = BybitLinearFuturesAdapter::new();
        let book_top = adapter
            .parse_book_top_snapshot(
                r#"{
                    "retCode":0,
                    "retMsg":"OK",
                    "result":{"s":"BTCUSDT","b":[["70110.0","2.500"]],"a":[["70110.5","1.700"]],"ts":1710000100200,"u":10,"seq":100,"cts":1710000100195}
                }"#,
                &InstrumentId::from("BTC/USDT:USDT"),
            )
            .expect("bybit rest book top should parse");
        assert_eq!(book_top.bid.price.to_string(), "70110.00");
        assert_eq!(book_top.ask.price.to_string(), "70110.50");
    }

    // Klines supplied newest-first come back ordered by ascending open time.
    #[test]
    fn parse_bybit_ohlcv_snapshot_sorts_ascending() {
        let adapter = BybitLinearFuturesAdapter::new();
        let klines = adapter
            .parse_ohlcv_snapshot(
                r#"{
                    "retCode":0,
                    "retMsg":"OK",
                    "result":{
                        "list":[
                            ["1710000060000","64050.0","64150.0","64000.0","64100.0","23.456","0"],
                            ["1710000000000","64000.1","64100.0","63950.0","64050.0","12.345","0"]
                        ]
                    }
                }"#,
                &FetchOhlcvRequest::for_instrument(
                    InstrumentId::from("BTC/USDT:USDT"),
                    "1m",
                    None,
                    None,
                    Some(2),
                ),
            )
            .expect("bybit klines snapshot should parse");

        assert_eq!(klines.len(), 2);
        assert_eq!(klines[0].open_time, TimestampMs::new(1710000000000));
        assert_eq!(klines[1].close.to_string(), "64100.00");
    }

    // The kline payload carries no symbol field; the instrument is still
    // resolved, as the test name indicates, from the topic suffix.
    #[test]
    fn parse_bybit_public_kline_without_symbol_uses_topic_suffix() {
        let adapter = BybitLinearFuturesAdapter::new();
        let events = adapter
            .parse_public(
                r#"{
                    "topic":"kline.1.BTCUSDT",
                    "type":"snapshot",
                    "ts":1710000005000,
                    "data":[
                        {
                            "start":1710000000000,
                            "end":1710000059999,
                            "interval":"1",
                            "open":"64000.0",
                            "close":"64010.0",
                            "high":"64020.0",
                            "low":"63990.0",
                            "volume":"12.0",
                            "confirm":false
                        }
                    ]
                }"#,
            )
            .expect("kline payload without symbol should still parse");

        assert_eq!(events.len(), 1);
        let PublicLaneEvent::Kline(kline) = &events[0] else {
            panic!("expected kline event");
        };
        assert_eq!(kline.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
        assert_eq!(kline.interval.as_ref(), "1");
    }

    // A ticker delta that only changes the last price (and sends an empty
    // funding rate) is merged with the earlier snapshot: the last price
    // updates while the mark price from the snapshot is retained.
    #[test]
    fn parse_bybit_ticker_delta_merges_cached_snapshot() {
        let adapter = BybitLinearFuturesAdapter::new();
        adapter
            .parse_public(
                r#"{
                    "topic":"tickers.BTCUSDT",
                    "type":"snapshot",
                    "ts":1710000100000,
                    "data":{"symbol":"BTCUSDT","lastPrice":"70110.0","markPrice":"70108.5","indexPrice":"70105.0","openInterest":"30000.500","fundingRate":"0.000120","volume24h":"5432.100","turnover24h":"381000000.55"}
                }"#,
            )
            .expect("snapshot should parse");

        let events = adapter
            .parse_public(
                r#"{
                    "topic":"tickers.BTCUSDT",
                    "type":"delta",
                    "ts":1710000100100,
                    "data":{"symbol":"BTCUSDT","lastPrice":"70111.0","fundingRate":""}
                }"#,
            )
            .expect("delta should merge with cached snapshot");

        let ticker = match &events[0] {
            PublicLaneEvent::Ticker(ticker) => ticker,
            other => panic!("expected ticker event, got {other:?}"),
        };
        let spec = adapter
            .resolve_instrument(&InstrumentId::from("BTC/USDT:USDT"))
            .expect("btc instrument should resolve");
        let ticker = ticker.to_unified(&spec);
        assert_eq!(ticker.last_price.to_string(), "70111.00");
        assert_eq!(
            ticker
                .mark_price
                .expect("mark price should stay cached")
                .to_string(),
            "70108.50"
        );
    }

    // Empty strings in the wallet's available-balance and PnL fields are
    // reported as zero rather than failing the parse.
    #[test]
    fn parse_bybit_wallet_snapshot_tolerates_empty_available_balances() {
        let adapter = BybitLinearFuturesAdapter::new();
        let snapshot = adapter
            .parse_account_snapshot(
                r#"{
                    "retCode":0,
                    "retMsg":"OK",
                    "result":{
                        "list":[
                            {
                                "accountType":"UNIFIED",
                                "totalWalletBalance":"125.5",
                                "totalAvailableBalance":"",
                                "totalPerpUPL":"",
                                "coin":[
                                    {"coin":"USDT","walletBalance":"125.5","availableToWithdraw":""}
                                ]
                            }
                        ]
                    }
                }"#,
                TimestampMs::new(1710000000000),
            )
            .expect("wallet snapshot with empty optional balances should parse");

        assert_eq!(snapshot.balances.len(), 1);
        assert_eq!(snapshot.balances[0].available_balance.to_string(), "0");
        let summary = snapshot.summary.expect("summary should be present");
        assert_eq!(summary.total_available_balance.to_string(), "0");
        assert_eq!(summary.total_unrealized_pnl.to_string(), "0");
    }

    // Empty strings for entry price, unrealised PnL and leverage become `None`.
    #[test]
    fn parse_bybit_position_snapshot_tolerates_empty_optional_numeric_fields() {
        let adapter = BybitLinearFuturesAdapter::new();
        let positions = adapter
            .parse_positions_snapshot(
                r#"{
                    "retCode":0,
                    "retMsg":"OK",
                    "result":{
                        "list":[
                            {
                                "symbol":"BTCUSDT",
                                "side":"Buy",
                                "size":"0.010",
                                "entryPrice":"",
                                "unrealisedPnl":"",
                                "tradeMode":0,
                                "positionIdx":1,
                                "leverage":""
                            }
                        ]
                    }
                }"#,
                TimestampMs::new(1710000000000),
            )
            .expect("position snapshot with empty optional decimals should parse");

        assert_eq!(positions.len(), 1);
        assert!(positions[0].entry_price.is_none());
        assert!(positions[0].unrealized_pnl.is_none());
        assert!(positions[0].leverage.is_none());
    }

    // Of a perpetual and a dated-futures entry, only the perpetual is kept.
    #[test]
    fn parse_bybit_metadata_snapshot_keeps_linear_perpetual_and_skips_dated_futures() {
        let adapter = BybitLinearFuturesAdapter::new();
        let instruments = adapter
            .parse_metadata_snapshot(
                r#"{
                    "retCode":0,
                    "retMsg":"OK",
                    "result":{
                        "list":[
                            {
                                "symbol":"BTCUSDT",
                                "contractType":"LinearPerpetual",
                                "status":"Trading",
                                "baseCoin":"BTC",
                                "quoteCoin":"USDT",
                                "settleCoin":"USDT",
                                "priceScale":"2",
                                "priceFilter":{"tickSize":"0.10"},
                                "lotSizeFilter":{"qtyStep":"0.001","minOrderQty":"0.001","minNotionalValue":"5"},
                                "leverageFilter":{"maxLeverage":"100.00"}
                            },
                            {
                                "symbol":"BTCUSDT-29MAY26",
                                "contractType":"LinearFutures",
                                "status":"Trading",
                                "baseCoin":"BTC",
                                "quoteCoin":"USDT",
                                "settleCoin":"USDT",
                                "priceScale":"2",
                                "priceFilter":{"tickSize":"0.10"},
                                "lotSizeFilter":{"qtyStep":"0.001","minOrderQty":"0.001","minNotionalValue":"5"},
                                "leverageFilter":{"maxLeverage":"50.00"}
                            }
                        ]
                    }
                }"#,
            )
            .expect("metadata snapshot should parse");

        assert_eq!(instruments.len(), 1);
        assert_eq!(instruments[0].native_symbol.as_ref(), "BTCUSDT");
        assert_eq!(
            instruments[0].instrument_id,
            InstrumentId::from("BTC/USDT:USDT")
        );
    }
}