Skip to main content

bat_markets_bybit/
lib.rs

1//! Bybit linear futures adapter.
2
3pub mod native;
4
5use std::{
6    collections::BTreeMap,
7    sync::Arc,
8    time::{SystemTime, UNIX_EPOCH},
9};
10
11use parking_lot::RwLock;
12use rust_decimal::Decimal;
13use serde::Deserialize;
14
15use bat_markets_core::{
16    AccountSnapshot, AggressorSide, AssetCode, Balance, BatMarketsConfig, CapabilitySet,
17    CommandOperation, CommandReceipt, CommandStatus, ErrorKind, Execution, FastBookTop, FastKline,
18    FastLiquidation, FastMarkPrice, FastOrderBookDelta, FastTicker, FastTrade, FetchOhlcvRequest,
19    FetchTradesRequest, FundingRate, InstrumentCatalog, InstrumentId, InstrumentSpec,
20    InstrumentStatus, InstrumentSupport, Kline, KlineInterval, Leverage, MarginMode, MarketError,
21    MarketType, Notional, OpenInterest, Order, OrderId, OrderStatus, OrderType, Position,
22    PositionDirection, PositionId, PositionMode, Price, PrivateLaneEvent, Product, PublicLaneEvent,
23    Quantity, Rate, RequestId, Result, Side, Ticker, TimeInForce, TimestampMs, TradeId, Venue,
24    VenueAdapter,
25};
26
27/// Bybit account context discovered from authenticated endpoints.
28#[derive(Clone, Debug, PartialEq, Eq)]
29pub struct BybitAccountContext {
30    pub wallet_account_type: Box<str>,
31    pub margin_mode: Option<MarginMode>,
32}
33
34/// Bybit linear futures adapter with a handwritten, fixture-backed contract.
35#[derive(Clone, Debug)]
36pub struct BybitLinearFuturesAdapter {
37    config: BatMarketsConfig,
38    capabilities: CapabilitySet,
39    lane_set: bat_markets_core::LaneSet,
40    instruments: Arc<RwLock<InstrumentCatalog>>,
41    ticker_cache: Arc<RwLock<BTreeMap<String, native::TickerData>>>,
42}
43
44impl Default for BybitLinearFuturesAdapter {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50impl BybitLinearFuturesAdapter {
51    #[must_use]
52    pub fn new() -> Self {
53        Self::with_config(BatMarketsConfig::new(Venue::Bybit, Product::LinearUsdt))
54    }
55
56    #[must_use]
57    pub fn with_config(config: BatMarketsConfig) -> Self {
58        Self {
59            config,
60            capabilities: CapabilitySet::linear_futures_defaults(),
61            lane_set: bat_markets_core::LaneSet::linear_futures_defaults(),
62            instruments: Arc::new(RwLock::new(InstrumentCatalog::new([
63                btc_spec(),
64                eth_spec(),
65            ]))),
66            ticker_cache: Arc::new(RwLock::new(BTreeMap::new())),
67        }
68    }
69
70    pub fn replace_instruments(&self, instruments: Vec<InstrumentSpec>) {
71        self.instruments.write().replace(instruments);
72    }
73
74    pub fn parse_native_public(&self, payload: &str) -> Result<native::PublicEnvelope> {
75        serde_json::from_str(payload).map_err(|error| {
76            MarketError::new(
77                ErrorKind::DecodeError,
78                format!("failed to parse bybit public payload: {error}"),
79            )
80            .with_venue(Venue::Bybit, Product::LinearUsdt)
81            .with_operation("bybit.parse_native_public")
82        })
83    }
84
85    pub fn parse_native_private(&self, payload: &str) -> Result<native::PrivateEnvelope> {
86        serde_json::from_str(payload).map_err(|error| {
87            MarketError::new(
88                ErrorKind::DecodeError,
89                format!("failed to parse bybit private payload: {error}"),
90            )
91            .with_venue(Venue::Bybit, Product::LinearUsdt)
92            .with_operation("bybit.parse_native_private")
93        })
94    }
95
96    fn merged_ticker_data(&self, envelope: &native::PublicEnvelope) -> Result<native::TickerData> {
97        let patch = serde_json::from_value::<native::TickerPatch>(envelope.data.clone())
98            .map_err(decode_error)?;
99        let symbol = patch
100            .symbol
101            .clone()
102            .or_else(|| envelope.topic.strip_prefix("tickers.").map(str::to_owned))
103            .ok_or_else(|| {
104                MarketError::new(
105                    ErrorKind::DecodeError,
106                    format!(
107                        "failed to derive bybit ticker symbol from topic '{}'",
108                        envelope.topic
109                    ),
110                )
111            })?;
112
113        let mut cache = self.ticker_cache.write();
114        let entry = cache
115            .entry(symbol.clone())
116            .or_insert_with(|| native::TickerData {
117                symbol: symbol.clone(),
118                last_price: String::new(),
119                mark_price: String::new(),
120                index_price: String::new(),
121                open_interest: String::new(),
122                funding_rate: String::new(),
123                volume_24h: String::new(),
124                turnover_24h: String::new(),
125                bid1_price: None,
126                bid1_size: None,
127                ask1_price: None,
128                ask1_size: None,
129            });
130
131        merge_string_field(&mut entry.last_price, patch.last_price);
132        merge_string_field(&mut entry.mark_price, patch.mark_price);
133        merge_string_field(&mut entry.index_price, patch.index_price);
134        merge_string_field(&mut entry.open_interest, patch.open_interest);
135        merge_string_field(&mut entry.funding_rate, patch.funding_rate);
136        merge_string_field(&mut entry.volume_24h, patch.volume_24h);
137        merge_string_field(&mut entry.turnover_24h, patch.turnover_24h);
138        merge_optional_string_field(&mut entry.bid1_price, patch.bid1_price);
139        merge_optional_string_field(&mut entry.bid1_size, patch.bid1_size);
140        merge_optional_string_field(&mut entry.ask1_price, patch.ask1_price);
141        merge_optional_string_field(&mut entry.ask1_size, patch.ask1_size);
142
143        ensure_non_empty(&entry.last_price, "lastPrice")?;
144
145        Ok(entry.clone())
146    }
147
148    pub fn parse_server_time(&self, payload: &str) -> Result<TimestampMs> {
149        let response =
150            serde_json::from_str::<native::ServerTimeResponse>(payload).map_err(decode_error)?;
151        if response.ret_code != 0 {
152            return Err(exchange_reject(response.ret_code, &response.ret_msg));
153        }
154        let nanos = response
155            .result
156            .time_nano
157            .parse::<i128>()
158            .map_err(|error| decode_string_error("bybit server time nano", error))?;
159        Ok(TimestampMs::new((nanos / 1_000_000_i128) as i64))
160    }
161
162    pub fn parse_metadata_snapshot(&self, payload: &str) -> Result<Vec<InstrumentSpec>> {
163        let response = serde_json::from_str::<native::InstrumentsInfoResponse>(payload)
164            .map_err(decode_error)?;
165        if response.ret_code != 0 {
166            return Err(exchange_reject(response.ret_code, &response.ret_msg));
167        }
168
169        let mut instruments = Vec::new();
170        for instrument in response.result.list {
171            if instrument.contract_type != "LinearPerpetual" {
172                continue;
173            }
174            let tick_size = parse_decimal(&instrument.price_filter.tick_size)?;
175            let qty_step = parse_decimal(&instrument.lot_size_filter.qty_step)?;
176            let min_qty = parse_decimal(&instrument.lot_size_filter.min_order_qty)?;
177            let min_notional = parse_decimal(&instrument.lot_size_filter.min_notional_value)?;
178            let price_scale = decimal_scale(tick_size);
179            let qty_scale = decimal_scale(qty_step);
180            let quote_scale = price_scale
181                .saturating_add(qty_scale)
182                .max(decimal_scale(min_notional));
183
184            instruments.push(InstrumentSpec {
185                venue: Venue::Bybit,
186                product: Product::LinearUsdt,
187                market_type: MarketType::LinearPerpetual,
188                instrument_id: InstrumentId::from(canonical_symbol(
189                    &instrument.base_coin,
190                    &instrument.quote_coin,
191                    &instrument.settle_coin,
192                )),
193                canonical_symbol: canonical_symbol(
194                    &instrument.base_coin,
195                    &instrument.quote_coin,
196                    &instrument.settle_coin,
197                )
198                .into(),
199                native_symbol: instrument.symbol.into(),
200                base: AssetCode::from(instrument.base_coin),
201                quote: AssetCode::from(instrument.quote_coin),
202                settle: AssetCode::from(instrument.settle_coin),
203                contract_size: Quantity::new(Decimal::ONE),
204                tick_size: Price::new(tick_size),
205                step_size: Quantity::new(qty_step),
206                min_qty: Quantity::new(min_qty),
207                min_notional: Notional::new(min_notional),
208                price_scale,
209                qty_scale,
210                quote_scale,
211                max_leverage: Some(Leverage::new(parse_decimal(
212                    &instrument.leverage_filter.max_leverage,
213                )?)),
214                support: InstrumentSupport {
215                    public_streams: true,
216                    private_trading: true,
217                    leverage_set: true,
218                    margin_mode_set: true,
219                    funding_rate: true,
220                    open_interest: true,
221                },
222                status: parse_instrument_status(&instrument.status),
223            });
224        }
225
226        Ok(instruments)
227    }
228
229    pub fn parse_account_context(&self, payload: &str) -> Result<BybitAccountContext> {
230        let response =
231            serde_json::from_str::<native::AccountInfoResponse>(payload).map_err(decode_error)?;
232        if response.ret_code != 0 {
233            return Err(exchange_reject(response.ret_code, &response.ret_msg));
234        }
235
236        Ok(BybitAccountContext {
237            wallet_account_type: if response.result.unified_margin_status.unwrap_or(0) > 0 {
238                "UNIFIED".into()
239            } else {
240                "CONTRACT".into()
241            },
242            margin_mode: response
243                .result
244                .margin_mode
245                .as_deref()
246                .and_then(parse_margin_mode_name),
247        })
248    }
249
250    pub fn parse_account_snapshot(
251        &self,
252        payload: &str,
253        observed_at: TimestampMs,
254    ) -> Result<AccountSnapshot> {
255        let response =
256            serde_json::from_str::<native::WalletBalanceResponse>(payload).map_err(decode_error)?;
257        if response.ret_code != 0 {
258            return Err(exchange_reject(response.ret_code, &response.ret_msg));
259        }
260
261        let account = response.result.list.into_iter().next().ok_or_else(|| {
262            MarketError::new(
263                ErrorKind::DecodeError,
264                "missing bybit wallet balance account",
265            )
266        })?;
267        let balances = account
268            .coin
269            .into_iter()
270            .map(|coin| {
271                Ok(Balance {
272                    asset: AssetCode::from(coin.coin),
273                    wallet_balance: parse_decimal(&coin.wallet_balance)?.into(),
274                    available_balance: parse_decimal_or_zero_on_empty(&coin.available_to_withdraw)?
275                        .into(),
276                    updated_at: observed_at,
277                })
278            })
279            .collect::<Result<Vec<_>>>()?;
280
281        Ok(AccountSnapshot {
282            balances,
283            summary: Some(bat_markets_core::AccountSummary {
284                total_wallet_balance: parse_decimal(&account.total_wallet_balance)?.into(),
285                total_available_balance: parse_decimal_or_zero_on_empty(
286                    &account.total_available_balance,
287                )?
288                .into(),
289                total_unrealized_pnl: parse_optional_decimal(account.total_perp_upl.as_deref())?
290                    .unwrap_or(Decimal::ZERO)
291                    .into(),
292                updated_at: observed_at,
293            }),
294        })
295    }
296
297    pub fn parse_positions_snapshot(
298        &self,
299        payload: &str,
300        observed_at: TimestampMs,
301    ) -> Result<Vec<Position>> {
302        let response =
303            serde_json::from_str::<native::PositionListResponse>(payload).map_err(decode_error)?;
304        if response.ret_code != 0 {
305            return Err(exchange_reject(response.ret_code, &response.ret_msg));
306        }
307
308        response
309            .result
310            .list
311            .into_iter()
312            .filter(|position| position.size != "0")
313            .map(|position| self.position_from_snapshot(position, observed_at))
314            .collect()
315    }
316
317    pub fn parse_open_orders_snapshot(
318        &self,
319        payload: &str,
320        observed_at: TimestampMs,
321    ) -> Result<Vec<Order>> {
322        let response =
323            serde_json::from_str::<native::OrderListResponse>(payload).map_err(decode_error)?;
324        if response.ret_code != 0 {
325            return Err(exchange_reject(response.ret_code, &response.ret_msg));
326        }
327
328        response
329            .result
330            .list
331            .into_iter()
332            .map(|order| self.order_from_snapshot(order, observed_at))
333            .collect()
334    }
335
336    pub fn parse_order_snapshot(&self, payload: &str, observed_at: TimestampMs) -> Result<Order> {
337        let mut orders = self.parse_open_orders_snapshot(payload, observed_at)?;
338        orders.pop().ok_or_else(|| {
339            MarketError::new(
340                ErrorKind::DecodeError,
341                "missing bybit order snapshot entry in response",
342            )
343        })
344    }
345
346    pub fn parse_order_history_snapshot(
347        &self,
348        payload: &str,
349        observed_at: TimestampMs,
350    ) -> Result<Vec<Order>> {
351        self.parse_open_orders_snapshot(payload, observed_at)
352    }
353
354    pub fn parse_executions_snapshot(&self, payload: &str) -> Result<Vec<Execution>> {
355        let response =
356            serde_json::from_str::<native::ExecutionListResponse>(payload).map_err(decode_error)?;
357        if response.ret_code != 0 {
358            return Err(exchange_reject(response.ret_code, &response.ret_msg));
359        }
360
361        response
362            .result
363            .list
364            .into_iter()
365            .map(|execution| self.execution_from_snapshot(execution))
366            .collect()
367    }
368
369    pub fn parse_ticker_snapshot(
370        &self,
371        payload: &str,
372        instrument_id: &InstrumentId,
373    ) -> Result<Ticker> {
374        let response =
375            serde_json::from_str::<native::MarketTickersResponse>(payload).map_err(decode_error)?;
376        if response.ret_code != 0 {
377            return Err(exchange_reject(response.ret_code, &response.ret_msg));
378        }
379        let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
380            MarketError::new(
381                ErrorKind::Unsupported,
382                format!("unsupported bybit instrument '{}'", instrument_id),
383            )
384            .with_venue(Venue::Bybit, Product::LinearUsdt)
385        })?;
386        let data = response.result.list.into_iter().next().ok_or_else(|| {
387            MarketError::new(
388                ErrorKind::DecodeError,
389                "missing bybit ticker entry in market/tickers response",
390            )
391        })?;
392
393        Ok(FastTicker {
394            instrument_id: spec.instrument_id.clone(),
395            last_price: Price::new(parse_decimal(&data.last_price)?).quantize(spec.price_scale)?,
396            mark_price: Some(
397                Price::new(parse_decimal(&data.mark_price)?).quantize(spec.price_scale)?,
398            ),
399            index_price: Some(
400                Price::new(parse_decimal(&data.index_price)?).quantize(spec.price_scale)?,
401            ),
402            volume_24h: Some(
403                Quantity::new(parse_decimal(&data.volume_24h)?).quantize(spec.qty_scale)?,
404            ),
405            turnover_24h: Some(
406                Notional::new(parse_decimal(&data.turnover_24h)?).quantize(spec.quote_scale)?,
407            ),
408            event_time: TimestampMs::new(now_timestamp_ms()),
409        }
410        .to_unified(&spec))
411    }
412
413    pub fn parse_trades_snapshot(
414        &self,
415        payload: &str,
416        request: &FetchTradesRequest,
417    ) -> Result<Vec<bat_markets_core::TradeTick>> {
418        let response = serde_json::from_str::<native::RecentPublicTradesResponse>(payload)
419            .map_err(decode_error)?;
420        if response.ret_code != 0 {
421            return Err(exchange_reject(response.ret_code, &response.ret_msg));
422        }
423        let spec = self
424            .resolve_instrument(&request.instrument_id)
425            .ok_or_else(|| {
426                MarketError::new(
427                    ErrorKind::Unsupported,
428                    format!("unsupported bybit instrument '{}'", request.instrument_id),
429                )
430                .with_venue(Venue::Bybit, Product::LinearUsdt)
431            })?;
432
433        response
434            .result
435            .list
436            .into_iter()
437            .map(|trade| {
438                let trade_time = trade
439                    .time
440                    .parse::<i64>()
441                    .map_err(|error| decode_string_error("bybit trade time", error))?;
442                Ok(FastTrade {
443                    instrument_id: spec.instrument_id.clone(),
444                    trade_id: TradeId::from(trade.exec_id),
445                    price: Price::new(parse_decimal(&trade.price)?).quantize(spec.price_scale)?,
446                    quantity: Quantity::new(parse_decimal(&trade.size)?)
447                        .quantize(spec.qty_scale)?,
448                    aggressor_side: parse_aggressor(&trade.side)?,
449                    event_time: TimestampMs::new(trade_time),
450                }
451                .to_unified(&spec))
452            })
453            .collect()
454    }
455
456    pub fn parse_book_top_snapshot(
457        &self,
458        payload: &str,
459        instrument_id: &InstrumentId,
460    ) -> Result<bat_markets_core::BookTop> {
461        let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
462            MarketError::new(
463                ErrorKind::Unsupported,
464                format!("unsupported bybit instrument '{}'", instrument_id),
465            )
466            .with_venue(Venue::Bybit, Product::LinearUsdt)
467        })?;
468        let response =
469            serde_json::from_str::<native::OrderBookResponse>(payload).map_err(decode_error)?;
470        if response.ret_code != 0 {
471            return Err(exchange_reject(response.ret_code, &response.ret_msg));
472        }
473        let bid =
474            response.result.bids.first().ok_or_else(|| {
475                MarketError::new(ErrorKind::DecodeError, "missing bybit best bid")
476            })?;
477        let ask =
478            response.result.asks.first().ok_or_else(|| {
479                MarketError::new(ErrorKind::DecodeError, "missing bybit best ask")
480            })?;
481
482        Ok(FastBookTop {
483            instrument_id: spec.instrument_id.clone(),
484            bid_price: Price::new(parse_decimal(&bid[0])?).quantize(spec.price_scale)?,
485            bid_quantity: Quantity::new(parse_decimal(&bid[1])?).quantize(spec.qty_scale)?,
486            ask_price: Price::new(parse_decimal(&ask[0])?).quantize(spec.price_scale)?,
487            ask_quantity: Quantity::new(parse_decimal(&ask[1])?).quantize(spec.qty_scale)?,
488            event_time: TimestampMs::new(
489                response
490                    .result
491                    .cts
492                    .or(response.result.ts)
493                    .unwrap_or_else(now_timestamp_ms),
494            ),
495        }
496        .to_unified(&spec))
497    }
498
499    pub fn parse_ohlcv_snapshot(
500        &self,
501        payload: &str,
502        request: &FetchOhlcvRequest,
503    ) -> Result<Vec<Kline>> {
504        #[derive(Clone, Debug, Deserialize)]
505        struct KlineListResponse {
506            #[serde(rename = "retCode")]
507            ret_code: i64,
508            #[serde(rename = "retMsg")]
509            ret_msg: String,
510            result: KlineListResult,
511        }
512
513        #[derive(Clone, Debug, Deserialize)]
514        struct KlineListResult {
515            list: Vec<[String; 7]>,
516        }
517
518        let interval = KlineInterval::parse(request.interval.as_ref()).ok_or_else(|| {
519            MarketError::new(
520                ErrorKind::Unsupported,
521                format!("unsupported bybit OHLCV interval '{}'", request.interval),
522            )
523            .with_venue(Venue::Bybit, Product::LinearUsdt)
524        })?;
525        let instrument_id = request.single_instrument_id()?;
526        let spec = self.resolve_instrument(instrument_id).ok_or_else(|| {
527            MarketError::new(
528                ErrorKind::Unsupported,
529                format!("unsupported bybit instrument '{}'", instrument_id),
530            )
531            .with_venue(Venue::Bybit, Product::LinearUsdt)
532        })?;
533        let response = serde_json::from_str::<KlineListResponse>(payload).map_err(decode_error)?;
534        if response.ret_code != 0 {
535            return Err(exchange_reject(response.ret_code, &response.ret_msg));
536        }
537
538        let mut klines = response
539            .result
540            .list
541            .into_iter()
542            .map(|row| {
543                let open_time = row[0]
544                    .parse::<i64>()
545                    .map_err(|error| decode_string_error("bybit kline startTime", error))?;
546                let close_time = interval.close_time_ms(open_time).ok_or_else(|| {
547                    MarketError::new(
548                        ErrorKind::DecodeError,
549                        format!(
550                            "failed to derive bybit kline close time for interval '{}'",
551                            Box::<str>::from(interval)
552                        ),
553                    )
554                    .with_venue(Venue::Bybit, Product::LinearUsdt)
555                })?;
556                Ok(Kline {
557                    instrument_id: spec.instrument_id.clone(),
558                    interval: Box::<str>::from(interval),
559                    open: spec.price_from_fast(
560                        Price::new(parse_decimal(&row[1])?).quantize(spec.price_scale)?,
561                    ),
562                    high: spec.price_from_fast(
563                        Price::new(parse_decimal(&row[2])?).quantize(spec.price_scale)?,
564                    ),
565                    low: spec.price_from_fast(
566                        Price::new(parse_decimal(&row[3])?).quantize(spec.price_scale)?,
567                    ),
568                    close: spec.price_from_fast(
569                        Price::new(parse_decimal(&row[4])?).quantize(spec.price_scale)?,
570                    ),
571                    volume: spec.quantity_from_fast(
572                        Quantity::new(parse_decimal(&row[5])?).quantize(spec.qty_scale)?,
573                    ),
574                    open_time: TimestampMs::new(open_time),
575                    close_time: TimestampMs::new(close_time),
576                    closed: close_time < now_timestamp_ms(),
577                })
578            })
579            .collect::<Result<Vec<_>>>()?;
580        klines.sort_by_key(|kline| kline.open_time.value());
581        Ok(klines)
582    }
583
584    fn position_from_snapshot(
585        &self,
586        position: native::PositionData,
587        observed_at: TimestampMs,
588    ) -> Result<Position> {
589        let spec = self.require_native_symbol(&position.symbol)?;
590        let side = parse_side(&position.side)?;
591        Ok(Position {
592            position_id: PositionId::from(format!(
593                "bybit:{}:{}",
594                position.symbol, position.position_idx
595            )),
596            instrument_id: spec.instrument_id.clone(),
597            direction: match side {
598                Side::Buy => PositionDirection::Long,
599                Side::Sell => PositionDirection::Short,
600            },
601            size: Quantity::new(parse_decimal(&position.size)?),
602            entry_price: parse_optional_decimal_str(&position.entry_price)?.map(Price::new),
603            mark_price: None,
604            unrealized_pnl: parse_optional_decimal_str(&position.unrealised_pnl)?.map(Into::into),
605            leverage: parse_optional_decimal(position.leverage.as_deref())?.map(Leverage::new),
606            margin_mode: parse_trade_mode(position.trade_mode),
607            position_mode: parse_position_mode(position.position_idx),
608            updated_at: observed_at,
609        })
610    }
611
612    fn order_from_snapshot(
613        &self,
614        order: native::OrderData,
615        observed_at: TimestampMs,
616    ) -> Result<Order> {
617        let spec = self.require_native_symbol(&order.symbol)?;
618        let created_at = if order.created_time > 0 {
619            TimestampMs::new(order.created_time)
620        } else {
621            observed_at
622        };
623        let updated_at = if order.updated_time > 0 {
624            TimestampMs::new(order.updated_time)
625        } else {
626            observed_at
627        };
628        Ok(Order {
629            order_id: OrderId::from(order.order_id),
630            client_order_id: order.order_link_id.map(Into::into),
631            instrument_id: spec.instrument_id.clone(),
632            side: parse_side(&order.side)?,
633            order_type: parse_order_type(&order.order_type)?,
634            time_in_force: Some(parse_time_in_force(&order.time_in_force)?),
635            status: parse_order_status(&order.order_status)?,
636            price: parse_optional_decimal_str(&order.price)?.map(Price::new),
637            quantity: Quantity::new(parse_decimal(&order.quantity)?),
638            filled_quantity: Quantity::new(parse_decimal(&order.cumulative_exec_qty)?),
639            average_fill_price: parse_optional_decimal(order.average_price.as_deref())?
640                .map(Price::new),
641            reduce_only: order.reduce_only,
642            post_only: matches!(order.time_in_force.as_str(), "PostOnly"),
643            created_at,
644            updated_at,
645            venue_status: Some(order.order_status.into()),
646        })
647    }
648
649    fn execution_from_snapshot(&self, execution: native::ExecutionData) -> Result<Execution> {
650        let spec = self.require_native_symbol(&execution.symbol)?;
651        Ok(Execution {
652            execution_id: TradeId::from(execution.exec_id),
653            order_id: OrderId::from(execution.order_id),
654            client_order_id: execution.order_link_id.map(Into::into),
655            instrument_id: spec.instrument_id.clone(),
656            side: parse_side(&execution.side)?,
657            quantity: Quantity::new(parse_decimal(&execution.exec_qty)?),
658            price: Price::new(parse_decimal(&execution.exec_price)?),
659            fee: Some(parse_decimal(&execution.exec_fee)?.into()),
660            fee_asset: execution.fee_currency.map(AssetCode::from),
661            liquidity: None,
662            executed_at: TimestampMs::new(execution.exec_time),
663        })
664    }
665
666    fn require_native_symbol(&self, native_symbol: &str) -> Result<InstrumentSpec> {
667        self.resolve_native_symbol(native_symbol).ok_or_else(|| {
668            MarketError::new(
669                ErrorKind::Unsupported,
670                format!("unsupported bybit symbol '{native_symbol}'"),
671            )
672            .with_venue(Venue::Bybit, Product::LinearUsdt)
673        })
674    }
675}
676
677impl VenueAdapter for BybitLinearFuturesAdapter {
678    fn venue(&self) -> Venue {
679        Venue::Bybit
680    }
681
682    fn product(&self) -> Product {
683        Product::LinearUsdt
684    }
685
686    fn config(&self) -> &BatMarketsConfig {
687        &self.config
688    }
689
690    fn capabilities(&self) -> CapabilitySet {
691        self.capabilities
692    }
693
694    fn lane_set(&self) -> bat_markets_core::LaneSet {
695        self.lane_set
696    }
697
698    fn instrument_specs(&self) -> Vec<InstrumentSpec> {
699        self.instruments.read().all()
700    }
701
702    fn resolve_instrument(&self, instrument_id: &InstrumentId) -> Option<InstrumentSpec> {
703        self.instruments.read().get(instrument_id)
704    }
705
706    fn resolve_native_symbol(&self, native_symbol: &str) -> Option<InstrumentSpec> {
707        self.instruments.read().by_native_symbol(native_symbol)
708    }
709
710    fn parse_public(&self, payload: &str) -> Result<Vec<PublicLaneEvent>> {
711        let envelope = self.parse_native_public(payload)?;
712        if envelope.topic.starts_with("tickers.") {
713            let data = self.merged_ticker_data(&envelope)?;
714            let spec = self.require_native_symbol(&data.symbol)?;
715            let mut events = Vec::with_capacity(4);
716            events.push(PublicLaneEvent::Ticker(FastTicker {
717                instrument_id: spec.instrument_id.clone(),
718                last_price: Price::new(parse_decimal(&data.last_price)?)
719                    .quantize(spec.price_scale)?,
720                mark_price: parse_optional_decimal_str(&data.mark_price)?
721                    .map(|value| Price::new(value).quantize(spec.price_scale))
722                    .transpose()?,
723                index_price: parse_optional_decimal_str(&data.index_price)?
724                    .map(|value| Price::new(value).quantize(spec.price_scale))
725                    .transpose()?,
726                volume_24h: parse_optional_decimal_str(&data.volume_24h)?
727                    .map(|value| Quantity::new(value).quantize(spec.qty_scale))
728                    .transpose()?,
729                turnover_24h: parse_optional_decimal_str(&data.turnover_24h)?
730                    .map(|value| Notional::new(value).quantize(spec.quote_scale))
731                    .transpose()?,
732                event_time: TimestampMs::new(envelope.ts),
733            }));
734            if let Some(mark_price) = parse_optional_decimal_str(&data.mark_price)? {
735                events.push(PublicLaneEvent::MarkPrice(FastMarkPrice {
736                    instrument_id: spec.instrument_id.clone(),
737                    price: Price::new(mark_price).quantize(spec.price_scale)?,
738                    funding_rate: parse_optional_decimal_str(&data.funding_rate)?.map(Rate::new),
739                    event_time: TimestampMs::new(envelope.ts),
740                }));
741            }
742            if let Some(funding_rate) = parse_optional_decimal_str(&data.funding_rate)? {
743                events.push(PublicLaneEvent::FundingRate(FundingRate {
744                    instrument_id: spec.instrument_id.clone(),
745                    value: Rate::new(funding_rate),
746                    mark_price: parse_optional_decimal_str(&data.mark_price)?.map(Price::new),
747                    event_time: TimestampMs::new(envelope.ts),
748                }));
749            }
750            if let Some(open_interest) = parse_optional_decimal_str(&data.open_interest)? {
751                events.push(PublicLaneEvent::OpenInterest(OpenInterest {
752                    instrument_id: spec.instrument_id.clone(),
753                    value: Quantity::new(open_interest),
754                    event_time: TimestampMs::new(envelope.ts),
755                }));
756            }
757            return Ok(events);
758        }
759
760        if envelope.topic.starts_with("publicTrade.") {
761            let data: Vec<native::PublicTradeData> =
762                serde_json::from_value(envelope.data).map_err(decode_error)?;
763            let mut events = Vec::with_capacity(data.len());
764            for trade in data {
765                let spec = self.require_native_symbol(&trade.symbol)?;
766                events.push(PublicLaneEvent::Trade(FastTrade {
767                    instrument_id: spec.instrument_id.clone(),
768                    trade_id: TradeId::from(trade.trade_id),
769                    price: Price::new(parse_decimal(&trade.price)?).quantize(spec.price_scale)?,
770                    quantity: Quantity::new(parse_decimal(&trade.quantity)?)
771                        .quantize(spec.qty_scale)?,
772                    aggressor_side: parse_aggressor(&trade.side)?,
773                    event_time: TimestampMs::new(trade.trade_time),
774                }));
775            }
776            return Ok(events);
777        }
778
779        if envelope.topic.starts_with("orderbook.") {
780            let data: native::OrderBookData =
781                serde_json::from_value(envelope.data).map_err(decode_error)?;
782            let spec = self.require_native_symbol(&data.symbol)?;
783            let bid = data.bids.first().ok_or_else(|| {
784                MarketError::new(ErrorKind::DecodeError, "missing bybit best bid")
785            })?;
786            let ask = data.asks.first().ok_or_else(|| {
787                MarketError::new(ErrorKind::DecodeError, "missing bybit best ask")
788            })?;
789            return Ok(vec![
790                PublicLaneEvent::BookTop(FastBookTop {
791                    instrument_id: spec.instrument_id.clone(),
792                    bid_price: Price::new(parse_decimal(&bid[0])?).quantize(spec.price_scale)?,
793                    bid_quantity: Quantity::new(parse_decimal(&bid[1])?)
794                        .quantize(spec.qty_scale)?,
795                    ask_price: Price::new(parse_decimal(&ask[0])?).quantize(spec.price_scale)?,
796                    ask_quantity: Quantity::new(parse_decimal(&ask[1])?)
797                        .quantize(spec.qty_scale)?,
798                    event_time: TimestampMs::new(envelope.ts),
799                }),
800                PublicLaneEvent::OrderBookDelta(FastOrderBookDelta {
801                    instrument_id: spec.instrument_id.clone(),
802                    bids: data
803                        .bids
804                        .iter()
805                        .map(|level| {
806                            Ok((
807                                Price::new(parse_decimal(&level[0])?).quantize(spec.price_scale)?,
808                                Quantity::new(parse_decimal(&level[1])?)
809                                    .quantize(spec.qty_scale)?,
810                            ))
811                        })
812                        .collect::<Result<Vec<_>>>()?,
813                    asks: data
814                        .asks
815                        .iter()
816                        .map(|level| {
817                            Ok((
818                                Price::new(parse_decimal(&level[0])?).quantize(spec.price_scale)?,
819                                Quantity::new(parse_decimal(&level[1])?)
820                                    .quantize(spec.qty_scale)?,
821                            ))
822                        })
823                        .collect::<Result<Vec<_>>>()?,
824                    event_time: TimestampMs::new(envelope.ts),
825                }),
826            ]);
827        }
828
829        if envelope.topic.starts_with("allLiquidation.") {
830            let data: Vec<native::AllLiquidationData> =
831                serde_json::from_value(envelope.data).map_err(decode_error)?;
832            let mut events = Vec::with_capacity(data.len());
833            for liquidation in data {
834                let spec = self.require_native_symbol(&liquidation.symbol)?;
835                events.push(PublicLaneEvent::Liquidation(FastLiquidation {
836                    instrument_id: spec.instrument_id.clone(),
837                    side: parse_side(&liquidation.side)?,
838                    price: Price::new(parse_decimal(&liquidation.price)?)
839                        .quantize(spec.price_scale)?,
840                    quantity: Quantity::new(parse_decimal(&liquidation.quantity)?)
841                        .quantize(spec.qty_scale)?,
842                    event_time: TimestampMs::new(liquidation.updated_time),
843                }));
844            }
845            return Ok(events);
846        }
847
848        if envelope.topic.starts_with("kline.") {
849            let topic_symbol = envelope.topic.rsplit('.').next().ok_or_else(|| {
850                MarketError::new(
851                    ErrorKind::DecodeError,
852                    format!(
853                        "failed to derive bybit kline symbol from topic '{}'",
854                        envelope.topic
855                    ),
856                )
857                .with_venue(Venue::Bybit, Product::LinearUsdt)
858            })?;
859            let data: Vec<native::KlineData> =
860                serde_json::from_value(envelope.data).map_err(decode_error)?;
861            let mut events = Vec::with_capacity(data.len());
862            for kline in data {
863                let symbol = kline.symbol.as_deref().unwrap_or(topic_symbol);
864                let spec = self.require_native_symbol(symbol)?;
865                events.push(PublicLaneEvent::Kline(FastKline {
866                    instrument_id: spec.instrument_id.clone(),
867                    interval: kline.interval.into(),
868                    open: Price::new(parse_decimal(&kline.open)?).quantize(spec.price_scale)?,
869                    high: Price::new(parse_decimal(&kline.high)?).quantize(spec.price_scale)?,
870                    low: Price::new(parse_decimal(&kline.low)?).quantize(spec.price_scale)?,
871                    close: Price::new(parse_decimal(&kline.close)?).quantize(spec.price_scale)?,
872                    volume: Quantity::new(parse_decimal(&kline.volume)?)
873                        .quantize(spec.qty_scale)?,
874                    open_time: TimestampMs::new(kline.start),
875                    close_time: TimestampMs::new(kline.end),
876                    closed: kline.confirm,
877                }));
878            }
879            return Ok(events);
880        }
881
882        Err(MarketError::new(
883            ErrorKind::Unsupported,
884            format!("unsupported bybit public topic '{}'", envelope.topic),
885        ))
886    }
887
888    fn parse_private(&self, payload: &str) -> Result<Vec<PrivateLaneEvent>> {
889        let envelope = self.parse_native_private(payload)?;
890        if envelope.topic == "wallet" {
891            let data: Vec<native::WalletData> =
892                serde_json::from_value(envelope.data).map_err(decode_error)?;
893            let mut events = Vec::new();
894            for wallet in data {
895                for coin in wallet.coins {
896                    events.push(PrivateLaneEvent::Balance(Balance {
897                        asset: AssetCode::from(coin.coin),
898                        wallet_balance: parse_decimal(&coin.wallet_balance)?.into(),
899                        available_balance: parse_decimal_or_zero_on_empty(
900                            &coin.available_to_withdraw,
901                        )?
902                        .into(),
903                        updated_at: TimestampMs::new(envelope.creation_time),
904                    }));
905                }
906            }
907            return Ok(events);
908        }
909
910        if envelope.topic == "position" {
911            let data: Vec<native::PositionData> =
912                serde_json::from_value(envelope.data).map_err(decode_error)?;
913            let mut events = Vec::new();
914            for position in data {
915                if position.size == "0" {
916                    continue;
917                }
918                let spec = self.require_native_symbol(&position.symbol)?;
919                let side = parse_side(&position.side)?;
920                events.push(PrivateLaneEvent::Position(Position {
921                    position_id: PositionId::from(format!(
922                        "bybit:{}:{}",
923                        position.symbol, position.position_idx
924                    )),
925                    instrument_id: spec.instrument_id.clone(),
926                    direction: match side {
927                        Side::Buy => PositionDirection::Long,
928                        Side::Sell => PositionDirection::Short,
929                    },
930                    size: Quantity::new(parse_decimal(&position.size)?),
931                    entry_price: parse_optional_decimal_str(&position.entry_price)?.map(Price::new),
932                    mark_price: None,
933                    unrealized_pnl: parse_optional_decimal_str(&position.unrealised_pnl)?
934                        .map(Into::into),
935                    leverage: parse_optional_decimal(position.leverage.as_deref())?
936                        .map(Leverage::new),
937                    margin_mode: parse_trade_mode(position.trade_mode),
938                    position_mode: parse_position_mode(position.position_idx),
939                    updated_at: TimestampMs::new(envelope.creation_time),
940                }));
941            }
942            return Ok(events);
943        }
944
945        if envelope.topic == "order" {
946            let data: Vec<native::OrderData> =
947                serde_json::from_value(envelope.data).map_err(decode_error)?;
948            let mut events = Vec::new();
949            for order in data {
950                events.push(PrivateLaneEvent::Order(self.order_from_snapshot(
951                    order,
952                    TimestampMs::new(envelope.creation_time),
953                )?));
954            }
955            return Ok(events);
956        }
957
958        if envelope.topic == "execution" {
959            let data: Vec<native::ExecutionData> =
960                serde_json::from_value(envelope.data).map_err(decode_error)?;
961            let mut events = Vec::new();
962            for execution in data {
963                let spec = self.require_native_symbol(&execution.symbol)?;
964                events.push(PrivateLaneEvent::Execution(Execution {
965                    execution_id: TradeId::from(execution.exec_id),
966                    order_id: OrderId::from(execution.order_id),
967                    client_order_id: execution.order_link_id.map(Into::into),
968                    instrument_id: spec.instrument_id.clone(),
969                    side: parse_side(&execution.side)?,
970                    quantity: Quantity::new(parse_decimal(&execution.exec_qty)?),
971                    price: Price::new(parse_decimal(&execution.exec_price)?),
972                    fee: Some(parse_decimal(&execution.exec_fee)?.into()),
973                    fee_asset: execution.fee_currency.map(AssetCode::from),
974                    liquidity: None,
975                    executed_at: TimestampMs::new(execution.exec_time),
976                }));
977            }
978            return Ok(events);
979        }
980
981        Err(MarketError::new(
982            ErrorKind::Unsupported,
983            format!("unsupported bybit private topic '{}'", envelope.topic),
984        ))
985    }
986
987    fn classify_command(
988        &self,
989        operation: CommandOperation,
990        payload: Option<&str>,
991        request_id: Option<RequestId>,
992    ) -> Result<CommandReceipt> {
993        let Some(payload) = payload else {
994            return Ok(CommandReceipt {
995                operation,
996                status: CommandStatus::UnknownExecution,
997                venue: Venue::Bybit,
998                product: Product::LinearUsdt,
999                instrument_id: None,
1000                order_id: None,
1001                client_order_id: None,
1002                request_id,
1003                message: Some("command outcome requires reconcile".into()),
1004                native_code: None,
1005                retriable: true,
1006            });
1007        };
1008
1009        let value = serde_json::from_str::<serde_json::Value>(payload).map_err(|error| {
1010            MarketError::new(
1011                ErrorKind::DecodeError,
1012                format!("failed to classify bybit command response: {error}"),
1013            )
1014        })?;
1015        let response = native::RetCodeResponse {
1016            ret_code: value
1017                .get("retCode")
1018                .and_then(|value| {
1019                    value
1020                        .as_i64()
1021                        .or_else(|| value.as_str().and_then(|raw| raw.parse::<i64>().ok()))
1022                })
1023                .unwrap_or_default(),
1024            ret_msg: value
1025                .get("retMsg")
1026                .and_then(serde_json::Value::as_str)
1027                .unwrap_or_default()
1028                .to_owned(),
1029            result: value
1030                .get("result")
1031                .cloned()
1032                .or_else(|| value.get("data").cloned()),
1033        };
1034
1035        if response.ret_code != 0 {
1036            return Ok(CommandReceipt {
1037                operation,
1038                status: CommandStatus::Rejected,
1039                venue: Venue::Bybit,
1040                product: Product::LinearUsdt,
1041                instrument_id: None,
1042                order_id: None,
1043                client_order_id: None,
1044                request_id,
1045                message: Some(response.ret_msg.into()),
1046                native_code: Some(response.ret_code.to_string().into()),
1047                retriable: false,
1048            });
1049        }
1050
1051        let result = response.result.unwrap_or_default();
1052
1053        let (instrument_id, order_id, client_order_id) = match operation {
1054            CommandOperation::CreateOrder
1055            | CommandOperation::AmendOrder
1056            | CommandOperation::CancelOrder
1057            | CommandOperation::ClosePosition
1058            | CommandOperation::GetOrder => {
1059                let order = serde_json::from_value::<native::OrderResult>(result)
1060                    .unwrap_or_else(|_| native::OrderResult::default());
1061                let instrument_id = order
1062                    .symbol
1063                    .as_deref()
1064                    .and_then(|symbol| self.resolve_native_symbol(symbol))
1065                    .map(|spec| spec.instrument_id.clone());
1066                (
1067                    instrument_id,
1068                    order.order_id.map(OrderId::from),
1069                    order.order_link_id.map(Into::into),
1070                )
1071            }
1072            _ => (None, None, None),
1073        };
1074
1075        Ok(CommandReceipt {
1076            operation,
1077            status: CommandStatus::Accepted,
1078            venue: Venue::Bybit,
1079            product: Product::LinearUsdt,
1080            instrument_id,
1081            order_id,
1082            client_order_id,
1083            request_id,
1084            message: Some(response.ret_msg.into()),
1085            native_code: Some(response.ret_code.to_string().into()),
1086            retriable: false,
1087        })
1088    }
1089}
1090
1091fn btc_spec() -> InstrumentSpec {
1092    instrument_spec(("BTC", "USDT", "USDT"), "BTCUSDT", 2, 3, 5, 100)
1093}
1094
1095fn eth_spec() -> InstrumentSpec {
1096    instrument_spec(("ETH", "USDT", "USDT"), "ETHUSDT", 2, 3, 5, 50)
1097}
1098
1099fn instrument_spec(
1100    assets: (&str, &str, &str),
1101    native_symbol: &str,
1102    price_scale: u32,
1103    qty_scale: u32,
1104    quote_scale: u32,
1105    max_leverage: i64,
1106) -> InstrumentSpec {
1107    let (base, quote, settle) = assets;
1108    InstrumentSpec {
1109        venue: Venue::Bybit,
1110        product: Product::LinearUsdt,
1111        market_type: MarketType::LinearPerpetual,
1112        instrument_id: InstrumentId::from(canonical_symbol(base, quote, settle)),
1113        canonical_symbol: canonical_symbol(base, quote, settle).into(),
1114        native_symbol: native_symbol.into(),
1115        base: AssetCode::from(base),
1116        quote: AssetCode::from(quote),
1117        settle: AssetCode::from(settle),
1118        contract_size: Quantity::new(Decimal::ONE),
1119        tick_size: Price::new(Decimal::new(1, price_scale)),
1120        step_size: Quantity::new(Decimal::new(1, qty_scale)),
1121        min_qty: Quantity::new(Decimal::new(1, qty_scale)),
1122        min_notional: Notional::new(Decimal::new(5, quote_scale)),
1123        price_scale,
1124        qty_scale,
1125        quote_scale,
1126        max_leverage: Some(Leverage::new(Decimal::new(max_leverage, 0))),
1127        support: InstrumentSupport {
1128            public_streams: true,
1129            private_trading: true,
1130            leverage_set: true,
1131            margin_mode_set: true,
1132            funding_rate: true,
1133            open_interest: true,
1134        },
1135        status: InstrumentStatus::Active,
1136    }
1137}
1138
/// Formats the canonical symbol as `BASE/QUOTE:SETTLE`, e.g. `BTC/USDT:USDT`.
fn canonical_symbol(base: &str, quote: &str, settle: &str) -> String {
    [base, "/", quote, ":", settle].concat()
}
1142
1143fn parse_decimal(raw: &str) -> Result<Decimal> {
1144    raw.parse::<Decimal>().map_err(|error| {
1145        MarketError::new(
1146            ErrorKind::DecodeError,
1147            format!("invalid decimal '{raw}': {error}"),
1148        )
1149        .with_venue(Venue::Bybit, Product::LinearUsdt)
1150    })
1151}
1152
1153fn parse_optional_decimal(raw: Option<&str>) -> Result<Option<Decimal>> {
1154    raw.map(str::trim)
1155        .filter(|raw| !raw.is_empty())
1156        .map(parse_decimal)
1157        .transpose()
1158}
1159
1160fn parse_decimal_or_zero_on_empty(raw: &str) -> Result<Decimal> {
1161    Ok(parse_optional_decimal_str(raw)?.unwrap_or(Decimal::ZERO))
1162}
1163
1164fn parse_side(raw: &str) -> Result<Side> {
1165    match raw {
1166        "Buy" => Ok(Side::Buy),
1167        "Sell" => Ok(Side::Sell),
1168        other => Err(MarketError::new(
1169            ErrorKind::DecodeError,
1170            format!("unsupported bybit side '{other}'"),
1171        )),
1172    }
1173}
1174
1175fn parse_aggressor(raw: &str) -> Result<AggressorSide> {
1176    match raw {
1177        "Buy" => Ok(AggressorSide::Buyer),
1178        "Sell" => Ok(AggressorSide::Seller),
1179        other => Err(MarketError::new(
1180            ErrorKind::DecodeError,
1181            format!("unsupported bybit trade side '{other}'"),
1182        )),
1183    }
1184}
1185
1186fn parse_order_type(raw: &str) -> Result<OrderType> {
1187    match raw {
1188        "Market" => Ok(OrderType::Market),
1189        "Limit" => Ok(OrderType::Limit),
1190        "Stop" => Ok(OrderType::StopLimit),
1191        "StopMarket" => Ok(OrderType::StopMarket),
1192        "TakeProfit" => Ok(OrderType::TakeProfitLimit),
1193        "TakeProfitMarket" => Ok(OrderType::TakeProfitMarket),
1194        other => Err(MarketError::new(
1195            ErrorKind::DecodeError,
1196            format!("unsupported bybit order type '{other}'"),
1197        )),
1198    }
1199}
1200
1201fn parse_time_in_force(raw: &str) -> Result<TimeInForce> {
1202    match raw {
1203        "GTC" => Ok(TimeInForce::Gtc),
1204        "IOC" => Ok(TimeInForce::Ioc),
1205        "FOK" => Ok(TimeInForce::Fok),
1206        "PostOnly" => Ok(TimeInForce::PostOnly),
1207        other => Err(MarketError::new(
1208            ErrorKind::DecodeError,
1209            format!("unsupported bybit time in force '{other}'"),
1210        )),
1211    }
1212}
1213
1214fn parse_order_status(raw: &str) -> Result<OrderStatus> {
1215    match raw {
1216        "New" => Ok(OrderStatus::New),
1217        "PartiallyFilled" => Ok(OrderStatus::PartiallyFilled),
1218        "Filled" => Ok(OrderStatus::Filled),
1219        "Cancelled" | "Canceled" => Ok(OrderStatus::Canceled),
1220        "Rejected" | "Deactivated" => Ok(OrderStatus::Rejected),
1221        other => Err(MarketError::new(
1222            ErrorKind::DecodeError,
1223            format!("unsupported bybit order status '{other}'"),
1224        )),
1225    }
1226}
1227
1228fn parse_trade_mode(mode: u8) -> MarginMode {
1229    if mode == 1 {
1230        MarginMode::Isolated
1231    } else {
1232        MarginMode::Cross
1233    }
1234}
1235
1236fn parse_margin_mode_name(raw: &str) -> Option<MarginMode> {
1237    match raw {
1238        "ISOLATED_MARGIN" => Some(MarginMode::Isolated),
1239        "REGULAR_MARGIN" | "CROSS_MARGIN" => Some(MarginMode::Cross),
1240        _ => None,
1241    }
1242}
1243
1244fn parse_position_mode(index: u8) -> PositionMode {
1245    if index == 0 {
1246        PositionMode::OneWay
1247    } else {
1248        PositionMode::Hedge
1249    }
1250}
1251
1252fn parse_instrument_status(raw: &str) -> InstrumentStatus {
1253    match raw {
1254        "Trading" => InstrumentStatus::Active,
1255        "Settling" | "Closed" | "PreLaunch" => InstrumentStatus::Halted,
1256        "Deliverying" => InstrumentStatus::Settled,
1257        _ => InstrumentStatus::Halted,
1258    }
1259}
1260
/// Overwrites `target` with `value` when a non-empty replacement is supplied.
///
/// `None` and empty strings leave `target` untouched.
fn merge_string_field(target: &mut String, value: Option<String>) {
    match value {
        Some(replacement) if !replacement.is_empty() => *target = replacement,
        _ => {}
    }
}
1268
/// Sets `target` to `Some(value)` when a non-empty replacement is supplied.
///
/// `None` and empty strings leave `target` untouched; an existing value is never
/// cleared.
fn merge_optional_string_field(target: &mut Option<String>, value: Option<String>) {
    match value {
        Some(replacement) if !replacement.is_empty() => *target = Some(replacement),
        _ => {}
    }
}
1276
1277fn ensure_non_empty(value: &str, field: &str) -> Result<()> {
1278    if value.is_empty() {
1279        return Err(MarketError::new(
1280            ErrorKind::DecodeError,
1281            format!("missing bybit ticker field '{field}' after snapshot/delta merge"),
1282        ));
1283    }
1284    Ok(())
1285}
1286
1287fn parse_optional_decimal_str(raw: &str) -> Result<Option<Decimal>> {
1288    let raw = raw.trim();
1289    if raw.is_empty() {
1290        return Ok(None);
1291    }
1292    parse_decimal(raw).map(Some)
1293}
1294
1295fn decimal_scale(value: Decimal) -> u32 {
1296    value.normalize().scale()
1297}
1298
1299fn decode_error(error: serde_json::Error) -> MarketError {
1300    MarketError::new(
1301        ErrorKind::DecodeError,
1302        format!("failed to decode bybit topic payload: {error}"),
1303    )
1304    .with_venue(Venue::Bybit, Product::LinearUsdt)
1305}
1306
1307fn decode_string_error(label: &str, error: impl std::fmt::Display) -> MarketError {
1308    MarketError::new(
1309        ErrorKind::DecodeError,
1310        format!("failed to decode {label}: {error}"),
1311    )
1312    .with_venue(Venue::Bybit, Product::LinearUsdt)
1313}
1314
1315fn exchange_reject(code: i64, message: &str) -> MarketError {
1316    MarketError::new(ErrorKind::ExchangeReject, message)
1317        .with_venue(Venue::Bybit, Product::LinearUsdt)
1318        .with_native_code(code.to_string())
1319}
1320
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// The value is clamped to `i64::MAX`, and `0` is returned when the system clock
/// reads earlier than the epoch.
fn now_timestamp_ms() -> i64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => {
            let millis = elapsed.as_millis();
            let ceiling = i64::MAX as u128;
            if millis > ceiling {
                i64::MAX
            } else {
                millis as i64
            }
        }
        Err(_) => 0,
    }
}
1327
1328#[cfg(test)]
1329mod tests {
1330    use super::BybitLinearFuturesAdapter;
1331    use bat_markets_core::{
1332        FetchOhlcvRequest, FetchTradesRequest, InstrumentId, OrderStatus, PublicLaneEvent,
1333        TimestampMs, VenueAdapter,
1334    };
1335
1336    const EXECUTION_HISTORY: &str = include_str!(concat!(
1337        env!("CARGO_MANIFEST_DIR"),
1338        "/../../fixtures/bybit/execution_history.json"
1339    ));
1340    const ORDER_HISTORY: &str = include_str!(concat!(
1341        env!("CARGO_MANIFEST_DIR"),
1342        "/../../fixtures/bybit/order_history.json"
1343    ));
1344
1345    #[test]
1346    fn parse_bybit_execution_history_snapshot() {
1347        let adapter = BybitLinearFuturesAdapter::new();
1348        let executions = adapter
1349            .parse_executions_snapshot(EXECUTION_HISTORY)
1350            .expect("bybit execution history fixture should parse");
1351        assert_eq!(executions.len(), 1);
1352        assert_eq!(executions[0].execution_id.to_string(), "bybit-exec-1");
1353    }
1354
1355    #[test]
1356    fn parse_bybit_order_history_snapshot() {
1357        let adapter = BybitLinearFuturesAdapter::new();
1358        let orders = adapter
1359            .parse_order_history_snapshot(ORDER_HISTORY, TimestampMs::new(1))
1360            .expect("bybit order history fixture should parse");
1361        assert_eq!(orders.len(), 1);
1362        assert_eq!(orders[0].status, OrderStatus::Canceled);
1363    }
1364
1365    #[test]
1366    fn parse_bybit_rest_ticker_snapshot() {
1367        let adapter = BybitLinearFuturesAdapter::new();
1368        let ticker = adapter
1369            .parse_ticker_snapshot(
1370                r#"{
1371                    "retCode":0,
1372                    "retMsg":"OK",
1373                    "result":{"list":[{"symbol":"BTCUSDT","lastPrice":"70110.0","markPrice":"70108.5","indexPrice":"70105.0","openInterest":"30000.500","fundingRate":"0.000120","volume24h":"5432.100","turnover24h":"381000000.55","bid1Price":"70109.5","bid1Size":"2.500","ask1Price":"70110.5","ask1Size":"1.700"}]}
1374                }"#,
1375                &InstrumentId::from("BTC/USDT:USDT"),
1376            )
1377            .expect("bybit rest ticker should parse");
1378        assert_eq!(ticker.last_price.to_string(), "70110.00");
1379    }
1380
1381    #[test]
1382    fn parse_bybit_rest_trades_snapshot() {
1383        let adapter = BybitLinearFuturesAdapter::new();
1384        let trades = adapter
1385            .parse_trades_snapshot(
1386                r#"{
1387                    "retCode":0,
1388                    "retMsg":"OK",
1389                    "result":{"list":[{"execId":"abc123","symbol":"BTCUSDT","price":"70109.9","size":"0.250","side":"Buy","time":"1710000000001"}]}
1390                }"#,
1391                &FetchTradesRequest::new(InstrumentId::from("BTC/USDT:USDT"), Some(1)),
1392            )
1393            .expect("bybit rest trades should parse");
1394        assert_eq!(trades.len(), 1);
1395        assert_eq!(trades[0].price.to_string(), "70109.90");
1396    }
1397
1398    #[test]
1399    fn parse_bybit_rest_book_top_snapshot() {
1400        let adapter = BybitLinearFuturesAdapter::new();
1401        let book_top = adapter
1402            .parse_book_top_snapshot(
1403                r#"{
1404                    "retCode":0,
1405                    "retMsg":"OK",
1406                    "result":{"s":"BTCUSDT","b":[["70110.0","2.500"]],"a":[["70110.5","1.700"]],"ts":1710000100200,"u":10,"seq":100,"cts":1710000100195}
1407                }"#,
1408                &InstrumentId::from("BTC/USDT:USDT"),
1409            )
1410            .expect("bybit rest book top should parse");
1411        assert_eq!(book_top.bid.price.to_string(), "70110.00");
1412        assert_eq!(book_top.ask.price.to_string(), "70110.50");
1413    }
1414
1415    #[test]
1416    fn parse_bybit_ohlcv_snapshot_sorts_ascending() {
1417        let adapter = BybitLinearFuturesAdapter::new();
1418        let klines = adapter
1419            .parse_ohlcv_snapshot(
1420                r#"{
1421                    "retCode":0,
1422                    "retMsg":"OK",
1423                    "result":{
1424                        "list":[
1425                            ["1710000060000","64050.0","64150.0","64000.0","64100.0","23.456","0"],
1426                            ["1710000000000","64000.1","64100.0","63950.0","64050.0","12.345","0"]
1427                        ]
1428                    }
1429                }"#,
1430                &FetchOhlcvRequest::for_instrument(
1431                    InstrumentId::from("BTC/USDT:USDT"),
1432                    "1m",
1433                    None,
1434                    None,
1435                    Some(2),
1436                ),
1437            )
1438            .expect("bybit klines snapshot should parse");
1439
1440        assert_eq!(klines.len(), 2);
1441        assert_eq!(klines[0].open_time, TimestampMs::new(1710000000000));
1442        assert_eq!(klines[1].close.to_string(), "64100.00");
1443    }
1444
1445    #[test]
1446    fn parse_bybit_public_kline_without_symbol_uses_topic_suffix() {
1447        let adapter = BybitLinearFuturesAdapter::new();
1448        let events = adapter
1449            .parse_public(
1450                r#"{
1451                    "topic":"kline.1.BTCUSDT",
1452                    "type":"snapshot",
1453                    "ts":1710000005000,
1454                    "data":[
1455                        {
1456                            "start":1710000000000,
1457                            "end":1710000059999,
1458                            "interval":"1",
1459                            "open":"64000.0",
1460                            "close":"64010.0",
1461                            "high":"64020.0",
1462                            "low":"63990.0",
1463                            "volume":"12.0",
1464                            "confirm":false
1465                        }
1466                    ]
1467                }"#,
1468            )
1469            .expect("kline payload without symbol should still parse");
1470
1471        assert_eq!(events.len(), 1);
1472        let PublicLaneEvent::Kline(kline) = &events[0] else {
1473            panic!("expected kline event");
1474        };
1475        assert_eq!(kline.instrument_id, InstrumentId::from("BTC/USDT:USDT"));
1476        assert_eq!(kline.interval.as_ref(), "1");
1477    }
1478
1479    #[test]
1480    fn parse_bybit_ticker_delta_merges_cached_snapshot() {
1481        let adapter = BybitLinearFuturesAdapter::new();
1482        adapter
1483            .parse_public(
1484                r#"{
1485                    "topic":"tickers.BTCUSDT",
1486                    "type":"snapshot",
1487                    "ts":1710000100000,
1488                    "data":{"symbol":"BTCUSDT","lastPrice":"70110.0","markPrice":"70108.5","indexPrice":"70105.0","openInterest":"30000.500","fundingRate":"0.000120","volume24h":"5432.100","turnover24h":"381000000.55"}
1489                }"#,
1490            )
1491            .expect("snapshot should parse");
1492
1493        let events = adapter
1494            .parse_public(
1495                r#"{
1496                    "topic":"tickers.BTCUSDT",
1497                    "type":"delta",
1498                    "ts":1710000100100,
1499                    "data":{"symbol":"BTCUSDT","lastPrice":"70111.0","fundingRate":""}
1500                }"#,
1501            )
1502            .expect("delta should merge with cached snapshot");
1503
1504        let ticker = match &events[0] {
1505            PublicLaneEvent::Ticker(ticker) => ticker,
1506            other => panic!("expected ticker event, got {other:?}"),
1507        };
1508        let spec = adapter
1509            .resolve_instrument(&InstrumentId::from("BTC/USDT:USDT"))
1510            .expect("btc instrument should resolve");
1511        let ticker = ticker.to_unified(&spec);
1512        assert_eq!(ticker.last_price.to_string(), "70111.00");
1513        assert_eq!(
1514            ticker
1515                .mark_price
1516                .expect("mark price should stay cached")
1517                .to_string(),
1518            "70108.50"
1519        );
1520    }
1521
1522    #[test]
1523    fn parse_bybit_wallet_snapshot_tolerates_empty_available_balances() {
1524        let adapter = BybitLinearFuturesAdapter::new();
1525        let snapshot = adapter
1526            .parse_account_snapshot(
1527                r#"{
1528                    "retCode":0,
1529                    "retMsg":"OK",
1530                    "result":{
1531                        "list":[
1532                            {
1533                                "accountType":"UNIFIED",
1534                                "totalWalletBalance":"125.5",
1535                                "totalAvailableBalance":"",
1536                                "totalPerpUPL":"",
1537                                "coin":[
1538                                    {"coin":"USDT","walletBalance":"125.5","availableToWithdraw":""}
1539                                ]
1540                            }
1541                        ]
1542                    }
1543                }"#,
1544                TimestampMs::new(1710000000000),
1545            )
1546            .expect("wallet snapshot with empty optional balances should parse");
1547
1548        assert_eq!(snapshot.balances.len(), 1);
1549        assert_eq!(snapshot.balances[0].available_balance.to_string(), "0");
1550        let summary = snapshot.summary.expect("summary should be present");
1551        assert_eq!(summary.total_available_balance.to_string(), "0");
1552        assert_eq!(summary.total_unrealized_pnl.to_string(), "0");
1553    }
1554
1555    #[test]
1556    fn parse_bybit_position_snapshot_tolerates_empty_optional_numeric_fields() {
1557        let adapter = BybitLinearFuturesAdapter::new();
1558        let positions = adapter
1559            .parse_positions_snapshot(
1560                r#"{
1561                    "retCode":0,
1562                    "retMsg":"OK",
1563                    "result":{
1564                        "list":[
1565                            {
1566                                "symbol":"BTCUSDT",
1567                                "side":"Buy",
1568                                "size":"0.010",
1569                                "entryPrice":"",
1570                                "unrealisedPnl":"",
1571                                "tradeMode":0,
1572                                "positionIdx":1,
1573                                "leverage":""
1574                            }
1575                        ]
1576                    }
1577                }"#,
1578                TimestampMs::new(1710000000000),
1579            )
1580            .expect("position snapshot with empty optional decimals should parse");
1581
1582        assert_eq!(positions.len(), 1);
1583        assert!(positions[0].entry_price.is_none());
1584        assert!(positions[0].unrealized_pnl.is_none());
1585        assert!(positions[0].leverage.is_none());
1586    }
1587
1588    #[test]
1589    fn parse_bybit_metadata_snapshot_keeps_linear_perpetual_and_skips_dated_futures() {
1590        let adapter = BybitLinearFuturesAdapter::new();
1591        let instruments = adapter
1592            .parse_metadata_snapshot(
1593                r#"{
1594                    "retCode":0,
1595                    "retMsg":"OK",
1596                    "result":{
1597                        "list":[
1598                            {
1599                                "symbol":"BTCUSDT",
1600                                "contractType":"LinearPerpetual",
1601                                "status":"Trading",
1602                                "baseCoin":"BTC",
1603                                "quoteCoin":"USDT",
1604                                "settleCoin":"USDT",
1605                                "priceScale":"2",
1606                                "priceFilter":{"tickSize":"0.10"},
1607                                "lotSizeFilter":{"qtyStep":"0.001","minOrderQty":"0.001","minNotionalValue":"5"},
1608                                "leverageFilter":{"maxLeverage":"100.00"}
1609                            },
1610                            {
1611                                "symbol":"BTCUSDT-29MAY26",
1612                                "contractType":"LinearFutures",
1613                                "status":"Trading",
1614                                "baseCoin":"BTC",
1615                                "quoteCoin":"USDT",
1616                                "settleCoin":"USDT",
1617                                "priceScale":"2",
1618                                "priceFilter":{"tickSize":"0.10"},
1619                                "lotSizeFilter":{"qtyStep":"0.001","minOrderQty":"0.001","minNotionalValue":"5"},
1620                                "leverageFilter":{"maxLeverage":"50.00"}
1621                            }
1622                        ]
1623                    }
1624                }"#,
1625            )
1626            .expect("metadata snapshot should parse");
1627
1628        assert_eq!(instruments.len(), 1);
1629        assert_eq!(instruments[0].native_symbol.as_ref(), "BTCUSDT");
1630        assert_eq!(
1631            instruments[0].instrument_id,
1632            InstrumentId::from("BTC/USDT:USDT")
1633        );
1634    }
1635}