Skip to main content

tesser_bybit/
lib.rs

1//! Bybit REST connector targeting the public v5 API.
2//!
3//! Authentication details follow the rules described in
4//! `bybit-api-docs/docs/v5/guide.mdx`.
5
6use std::{any::Any, num::NonZeroU32, str::FromStr, sync::Arc, time::Duration};
7
8use async_trait::async_trait;
9use chrono::Utc;
10use hmac::{Hmac, Mac};
11use reqwest::{Client, Method};
12use rust_decimal::Decimal;
13use serde::{de::DeserializeOwned, Deserialize};
14use serde_json::Value;
15use sha2::Sha256;
16use tesser_broker::{
17    register_connector_factory, BrokerError, BrokerErrorKind, BrokerInfo, BrokerResult,
18    ConnectorFactory, ConnectorStream, ConnectorStreamConfig, ExecutionClient, MarketStream, Quota,
19    RateLimiter, RateLimiterError,
20};
21use tesser_core::{
22    AccountBalance, AssetId, Candle, ExchangeId, Fill, Instrument, InstrumentKind, Order,
23    OrderBook, OrderRequest, OrderStatus, OrderType, OrderUpdateRequest, Position, Quantity, Side,
24    Symbol, TimeInForce,
25};
26use tracing::{trace, warn};
27
28pub mod ws;
29
30pub use ws::{BybitMarketStream, BybitSubscription, PublicChannel};
31
32type HmacSha256 = Hmac<Sha256>;
33const EXECUTION_MAX_WINDOW_MS: i64 = 7 * 24 * 60 * 60 * 1000;
34
35/// API credentials required for private REST endpoints.
36#[derive(Clone)]
37pub struct BybitCredentials {
38    pub api_key: String,
39    pub api_secret: String,
40}
41
42/// Configuration for the Bybit REST client.
43pub struct BybitConfig {
44    pub base_url: String,
45    pub category: String,
46    pub recv_window: u64,
47    pub ws_url: Option<String>,
48    pub public_quota: Option<Quota>,
49    pub private_quota: Option<Quota>,
50    pub settle_coin: Option<String>,
51}
52
53impl Default for BybitConfig {
54    fn default() -> Self {
55        Self {
56            base_url: "https://api-testnet.bybit.com".into(),
57            category: "linear".into(),
58            recv_window: 5_000,
59            ws_url: None,
60            public_quota: None,
61            private_quota: None,
62            settle_coin: Some("USDT".into()),
63        }
64    }
65}
66
67/// A thin wrapper over the Bybit v5 REST API.
68pub struct BybitClient {
69    http: Client,
70    config: BybitConfig,
71    credentials: Option<BybitCredentials>,
72    info: BrokerInfo,
73    public_limiter: Option<RateLimiter>,
74    private_limiter: Option<RateLimiter>,
75    exchange: ExchangeId,
76}
77
78/// Execution record enriched with the originating `exec_id`.
79#[derive(Clone, Debug)]
80pub struct BybitExecution {
81    pub fill: Fill,
82    pub exec_id: Option<String>,
83}
84
85impl BybitClient {
86    /// Build a new client optionally configured with credentials.
87    pub fn new(
88        config: BybitConfig,
89        credentials: Option<BybitCredentials>,
90        exchange: ExchangeId,
91    ) -> Self {
92        let http = Client::builder()
93            .connect_timeout(Duration::from_secs(5))
94            .timeout(Duration::from_secs(10))
95            .build()
96            .expect("failed to create reqwest client");
97        let public_limiter = config.public_quota.map(RateLimiter::direct);
98        let private_limiter = config.private_quota.map(RateLimiter::keyed);
99        Self {
100            info: BrokerInfo {
101                name: "bybit".into(),
102                markets: vec![config.category.clone()],
103                supports_testnet: config.base_url.contains("testnet"),
104            },
105            http,
106            config,
107            credentials,
108            public_limiter,
109            private_limiter,
110            exchange,
111        }
112    }
113
114    /// Convenience helper for the Bybit testnet.
115    pub fn testnet(credentials: Option<BybitCredentials>) -> Self {
116        Self::new(
117            BybitConfig::default(),
118            credentials,
119            ExchangeId::from("bybit_linear"),
120        )
121    }
122
123    pub fn get_credentials(&self) -> Option<BybitCredentials> {
124        self.credentials.clone()
125    }
126
127    pub fn exchange(&self) -> ExchangeId {
128        self.exchange
129    }
130
131    pub fn get_ws_url(&self) -> String {
132        self.config
133            .ws_url
134            .clone()
135            .unwrap_or_else(|| self.config.base_url.replace("https://api", "wss://stream"))
136    }
137
138    fn symbol_code(symbol: Symbol) -> &'static str {
139        symbol.code()
140    }
141
142    fn parse_symbol(&self, value: &str) -> Symbol {
143        Symbol::from_code(self.exchange, value)
144    }
145
146    fn parse_asset(&self, value: &str) -> AssetId {
147        AssetId::from_code(self.exchange, value)
148    }
149
150    async fn throttle_public(&self) -> BrokerResult<()> {
151        if let Some(limiter) = &self.public_limiter {
152            limiter
153                .until_ready()
154                .await
155                .map_err(Self::rate_limiter_error)?;
156        }
157        Ok(())
158    }
159
160    async fn throttle_private(&self) -> BrokerResult<()> {
161        if let (Some(limiter), Some(creds)) = (&self.private_limiter, &self.credentials) {
162            limiter
163                .until_key_ready(&creds.api_key)
164                .await
165                .map_err(Self::rate_limiter_error)?;
166        }
167        Ok(())
168    }
169
170    /// Fetch Bybit server time (docs/v5/market/time.mdx).
171    pub async fn server_time(&self) -> BrokerResult<u128> {
172        let resp: ApiResponse<ServerTimeResult> = self.public_get("/v5/market/time").await?;
173        self.ensure_success(&resp)?;
174        resp.result
175            .time_nano
176            .parse::<u128>()
177            .map_err(|err| BrokerError::Serialization(err.to_string()))
178    }
179
180    fn url(&self, path: &str) -> String {
181        format!("{}/{}", self.config.base_url, path.trim_start_matches('/'))
182    }
183
184    fn ensure_success<T>(&self, resp: &ApiResponse<T>) -> BrokerResult<()> {
185        if resp.ret_code == 0 {
186            Ok(())
187        } else {
188            Err(BrokerError::Exchange(format!(
189                "{} (code {})",
190                resp.ret_msg, resp.ret_code
191            )))
192        }
193    }
194
195    async fn public_get<T>(&self, path: &str) -> BrokerResult<ApiResponse<T>>
196    where
197        T: DeserializeOwned,
198    {
199        self.throttle_public().await?;
200        let url = self.url(path);
201        self.http
202            .get(url)
203            .send()
204            .await
205            .map_err(|err| BrokerError::Transport(err.to_string()))?
206            .json::<ApiResponse<T>>()
207            .await
208            .map_err(|err| BrokerError::Serialization(err.to_string()))
209    }
210
211    fn creds(&self) -> BrokerResult<&BybitCredentials> {
212        self.credentials
213            .as_ref()
214            .ok_or_else(|| BrokerError::Authentication("missing Bybit credentials".into()))
215    }
216
217    fn rate_limiter_error(err: RateLimiterError) -> BrokerError {
218        BrokerError::Other(format!("rate limited: {err}"))
219    }
220
221    async fn signed_request<T>(
222        &self,
223        method: Method,
224        path: &str,
225        body: Value,
226        query: Option<Vec<(String, String)>>,
227    ) -> BrokerResult<ApiResponse<T>>
228    where
229        T: DeserializeOwned,
230    {
231        let creds = self.creds()?;
232        self.throttle_private().await?;
233        let timestamp = Utc::now().timestamp_millis();
234        let query_string = query
235            .as_ref()
236            .map(|pairs| serde_urlencoded::to_string(pairs).unwrap_or_default())
237            .unwrap_or_default();
238        let payload = if method == Method::GET {
239            format!(
240                "{timestamp}{}{}{}",
241                creds.api_key, self.config.recv_window, query_string
242            )
243        } else {
244            let body_string = body.to_string();
245            format!(
246                "{timestamp}{}{}{}",
247                creds.api_key, self.config.recv_window, body_string
248            )
249        };
250        let mut mac = HmacSha256::new_from_slice(creds.api_secret.as_bytes())
251            .map_err(|err| BrokerError::Other(format!("failed to create signing key: {err}")))?;
252        mac.update(payload.as_bytes());
253        let signature = hex::encode(mac.finalize().into_bytes());
254        let url = if query_string.is_empty() {
255            self.url(path)
256        } else {
257            format!("{}?{}", self.url(path), query_string)
258        };
259        trace!(
260            method = %method,
261            path = %path,
262            url = %url,
263            query = %query_string,
264            body = ?if method == Method::GET {
265                serde_json::Value::Null
266            } else {
267                body.clone()
268            },
269            "sending Bybit signed request"
270        );
271        let mut request = self.http.request(method.clone(), url);
272        request = request
273            .header("X-BAPI-API-KEY", &creds.api_key)
274            .header("X-BAPI-TIMESTAMP", timestamp.to_string())
275            .header("X-BAPI-SIGN", signature)
276            .header("X-BAPI-RECV-WINDOW", self.config.recv_window.to_string())
277            .header("Content-Type", "application/json");
278        if method != Method::GET {
279            request = request.json(&body);
280        }
281        let response = request
282            .send()
283            .await
284            .map_err(|err| BrokerError::Transport(err.to_string()))?;
285        let status = response.status();
286        let text = response
287            .text()
288            .await
289            .map_err(|err| BrokerError::Serialization(err.to_string()))?;
290        trace!(
291            path = %path,
292            status = %status,
293            body = %preview_json(&text),
294            "received response from Bybit"
295        );
296        let parsed = serde_json::from_str::<ApiResponse<T>>(&text).map_err(|err| {
297            BrokerError::Serialization(format!(
298                "failed to decode Bybit response: {err}; body={}",
299                preview_json(&text)
300            ))
301        })?;
302        self.ensure_success(&parsed)?;
303        Ok(parsed)
304    }
305
306    fn map_time_in_force(tif: Option<TimeInForce>) -> &'static str {
307        match tif.unwrap_or(TimeInForce::GoodTilCanceled) {
308            TimeInForce::GoodTilCanceled => "GTC",
309            TimeInForce::ImmediateOrCancel => "IOC",
310            TimeInForce::FillOrKill => "FOK",
311        }
312    }
313
314    fn map_side(side: Side) -> &'static str {
315        match side {
316            Side::Buy => "Buy",
317            Side::Sell => "Sell",
318        }
319    }
320
321    fn map_order_type(order_type: tesser_core::OrderType) -> &'static str {
322        match order_type {
323            tesser_core::OrderType::Market => "Market",
324            tesser_core::OrderType::Limit => "Limit",
325            tesser_core::OrderType::StopMarket => "Market",
326        }
327    }
328
329    fn qty_string(qty: Quantity) -> String {
330        qty.normalize().to_string()
331    }
332
333    pub(crate) fn map_order_status(status: &str) -> OrderStatus {
334        match status {
335            "New" | "Created" | "PendingNew" | "Untriggered" => OrderStatus::PendingNew,
336            "Accepted" | "Active" | "Triggered" => OrderStatus::Accepted,
337            "Rejected" => OrderStatus::Rejected,
338            "PartiallyFilled" | "PartiallyFilledCanceled" => OrderStatus::PartiallyFilled,
339            "Filled" => OrderStatus::Filled,
340            "Cancelled" | "Canceled" | "Deactivated" => OrderStatus::Canceled,
341            other => {
342                warn!(status = other, "unhandled Bybit order status");
343                OrderStatus::PendingNew
344            }
345        }
346    }
347
348    /// Fetch executions (fills) since the provided timestamp (inclusive), paginated.
349    /// Maps the response into framework-native `Fill` records.
350    pub async fn list_executions_since(
351        &self,
352        since: chrono::DateTime<chrono::Utc>,
353    ) -> BrokerResult<Vec<BybitExecution>> {
354        let mut out: Vec<BybitExecution> = Vec::new();
355        let mut chunk_start = since.timestamp_millis();
356        let end_ms = chrono::Utc::now().timestamp_millis();
357        let min_start = end_ms.saturating_sub(EXECUTION_MAX_WINDOW_MS);
358        if chunk_start < min_start {
359            warn!(
360                requested_start = chunk_start,
361                clamped_start = min_start,
362                "Bybit execution history limited to 7 days; clamping startTime"
363            );
364            chunk_start = min_start;
365        }
366
367        while chunk_start < end_ms {
368            let chunk_end = (chunk_start + EXECUTION_MAX_WINDOW_MS - 1).min(end_ms);
369            let mut cursor: Option<String> = None;
370
371            loop {
372                let mut query = vec![
373                    ("category".to_string(), self.config.category.clone()),
374                    ("limit".to_string(), "100".to_string()),
375                    ("startTime".to_string(), chunk_start.to_string()),
376                    ("endTime".to_string(), chunk_end.to_string()),
377                ];
378                if let Some(ref cur) = cursor {
379                    query.push(("cursor".to_string(), cur.clone()));
380                }
381                let resp: ApiResponse<ExecutionListResult> = self
382                    .signed_request(Method::GET, "/v5/execution/list", Value::Null, Some(query))
383                    .await?;
384
385                for item in resp.result.list.into_iter() {
386                    // Filter only entries with non-zero quantity
387                    if item.exec_qty.is_empty() {
388                        continue;
389                    }
390                    let exec_qty: Decimal = item.exec_qty.parse().unwrap_or(Decimal::ZERO);
391                    if exec_qty.is_zero() {
392                        continue;
393                    }
394                    let price: Decimal = item.exec_price.parse().unwrap_or(Decimal::ZERO);
395                    let fee: Option<Decimal> = if item.exec_fee.is_empty() {
396                        None
397                    } else {
398                        item.exec_fee.parse::<Decimal>().ok()
399                    };
400                    let ts = millis_to_datetime(&item.exec_time);
401                    let side = match item.side.as_str() {
402                        "Buy" => Side::Buy,
403                        _ => Side::Sell,
404                    };
405                    let fee_asset = item
406                        .fee_currency
407                        .as_deref()
408                        .filter(|code| !code.is_empty())
409                        .map(|code| self.parse_asset(code));
410                    out.push(BybitExecution {
411                        fill: Fill {
412                            order_id: item.order_id,
413                            symbol: self.parse_symbol(&item.symbol),
414                            side,
415                            fill_price: price,
416                            fill_quantity: exec_qty,
417                            fee,
418                            fee_asset,
419                            timestamp: ts,
420                        },
421                        exec_id: Some(item.exec_id.clone()),
422                    });
423                }
424
425                if let Some(c) = resp.result.next_page_cursor {
426                    if c.is_empty() {
427                        break;
428                    }
429                    cursor = Some(c);
430                    continue;
431                }
432
433                break;
434            }
435
436            chunk_start = chunk_end.saturating_add(1);
437        }
438
439        Ok(out)
440    }
441}
442
443#[async_trait]
444impl ExecutionClient for BybitClient {
445    fn info(&self) -> BrokerInfo {
446        self.info.clone()
447    }
448
449    async fn place_order(&self, request: OrderRequest) -> BrokerResult<Order> {
450        let mut payload = serde_json::json!({
451            "category": self.config.category,
452            "symbol": Self::symbol_code(request.symbol),
453            "side": Self::map_side(request.side),
454            "qty": Self::qty_string(request.quantity),
455            "timeInForce": Self::map_time_in_force(request.time_in_force),
456            "price": request.price,
457            "orderLinkId": request.client_order_id,
458        });
459
460        match request.order_type {
461            tesser_core::OrderType::Market | tesser_core::OrderType::Limit => {
462                payload["orderType"] = serde_json::json!(Self::map_order_type(request.order_type));
463            }
464            tesser_core::OrderType::StopMarket => {
465                let trigger_price = request.trigger_price.ok_or_else(|| {
466                    BrokerError::InvalidRequest("StopMarket order requires a trigger_price".into())
467                })?;
468                payload["orderType"] = serde_json::json!("Market");
469                payload["triggerPrice"] = serde_json::json!(format!("{}", trigger_price));
470            }
471        }
472        let resp: ApiResponse<CreateOrderResult> = self
473            .signed_request(Method::POST, "/v5/order/create", payload, None)
474            .await?;
475        Ok(Order {
476            id: resp.result.order_id,
477            request,
478            status: OrderStatus::PendingNew,
479            filled_quantity: Decimal::ZERO,
480            avg_fill_price: None,
481            created_at: Utc::now(),
482            updated_at: Utc::now(),
483        })
484    }
485
486    async fn cancel_order(
487        &self,
488        order_id: tesser_core::OrderId,
489        symbol: Symbol,
490    ) -> BrokerResult<()> {
491        let payload = serde_json::json!({
492            "category": self.config.category,
493            "symbol": Self::symbol_code(symbol),
494            "orderId": order_id,
495        });
496        self.signed_request::<serde_json::Value>(Method::POST, "/v5/order/cancel", payload, None)
497            .await?;
498        Ok(())
499    }
500
501    async fn amend_order(&self, request: OrderUpdateRequest) -> BrokerResult<Order> {
502        let mut payload = serde_json::json!({
503            "category": self.config.category,
504            "symbol": Self::symbol_code(request.symbol),
505            "orderId": request.order_id,
506        });
507        if let Some(price) = request.new_price {
508            payload["price"] = serde_json::json!(price);
509        }
510        if let Some(quantity) = request.new_quantity {
511            payload["qty"] = serde_json::json!(Self::qty_string(quantity));
512        }
513        if payload.get("price").is_none() && payload.get("qty").is_none() {
514            return Err(BrokerError::InvalidRequest(
515                "amend requires price or quantity".into(),
516            ));
517        }
518        let resp: ApiResponse<CreateOrderResult> = self
519            .signed_request(Method::POST, "/v5/order/amend", payload, None)
520            .await?;
521        if let Ok(open_orders) = self.list_open_orders(request.symbol).await {
522            if let Some(order) = open_orders
523                .into_iter()
524                .find(|order| order.id == resp.result.order_id)
525            {
526                return Ok(order);
527            }
528        }
529        Ok(Order {
530            id: resp.result.order_id,
531            request: OrderRequest {
532                symbol: request.symbol,
533                side: request.side,
534                order_type: OrderType::Limit,
535                quantity: request.new_quantity.unwrap_or(Decimal::ZERO),
536                price: request.new_price,
537                trigger_price: None,
538                time_in_force: None,
539                client_order_id: None,
540                take_profit: None,
541                stop_loss: None,
542                display_quantity: None,
543            },
544            status: OrderStatus::PendingNew,
545            filled_quantity: Decimal::ZERO,
546            avg_fill_price: None,
547            created_at: Utc::now(),
548            updated_at: Utc::now(),
549        })
550    }
551
552    async fn list_open_orders(&self, symbol: Symbol) -> BrokerResult<Vec<Order>> {
553        let query = vec![
554            ("category".to_string(), self.config.category.clone()),
555            ("symbol".to_string(), Self::symbol_code(symbol).to_string()),
556            ("openOnly".to_string(), "0".into()),
557        ];
558        let resp: ApiResponse<OpenOrdersResult> = self
559            .signed_request(Method::GET, "/v5/order/realtime", Value::Null, Some(query))
560            .await?;
561        let orders = resp
562            .result
563            .list
564            .into_iter()
565            .map(|item| {
566                let symbol = self.parse_symbol(&item.symbol);
567                Order {
568                    id: item.order_id,
569                    request: OrderRequest {
570                        symbol,
571                        side: if item.side == "Buy" {
572                            Side::Buy
573                        } else {
574                            Side::Sell
575                        },
576                        order_type: if item.trigger_price.is_some() {
577                            tesser_core::OrderType::StopMarket
578                        } else if item.order_type == "Market" {
579                            tesser_core::OrderType::Market
580                        } else {
581                            tesser_core::OrderType::Limit
582                        },
583                        quantity: item.qty.parse().unwrap_or(Decimal::ZERO),
584                        price: item.price.parse::<Decimal>().ok(),
585                        trigger_price: item
586                            .trigger_price
587                            .as_deref()
588                            .and_then(|value| value.parse::<Decimal>().ok()),
589                        time_in_force: None,
590                        client_order_id: Some(item.order_link_id),
591                        take_profit: None,
592                        stop_loss: None,
593                        display_quantity: None,
594                    },
595                    status: Self::map_order_status(&item.order_status),
596                    filled_quantity: item.cum_exec_qty.parse().unwrap_or(Decimal::ZERO),
597                    avg_fill_price: item.avg_price.parse::<Decimal>().ok(),
598                    created_at: millis_to_datetime(&item.created_time),
599                    updated_at: millis_to_datetime(&item.updated_time),
600                }
601            })
602            .collect();
603        Ok(orders)
604    }
605
606    async fn account_balances(&self) -> BrokerResult<Vec<AccountBalance>> {
607        let query = vec![("accountType".to_string(), "UNIFIED".into())];
608        let resp: ApiResponse<WalletBalanceResult> = self
609            .signed_request(
610                Method::GET,
611                "/v5/account/wallet-balance",
612                Value::Null,
613                Some(query),
614            )
615            .await?;
616        let mut balances = Vec::new();
617        for account in resp.result.list {
618            for coin in account.coin {
619                let total = coin.wallet_balance.parse().unwrap_or(Decimal::ZERO);
620                let available = coin
621                    .available_to_withdraw
622                    .as_deref()
623                    .unwrap_or("0")
624                    .parse()
625                    .unwrap_or(Decimal::ZERO);
626                balances.push(AccountBalance {
627                    exchange: self.exchange,
628                    asset: self.parse_asset(&coin.coin),
629                    total,
630                    available,
631                    updated_at: Utc::now(),
632                });
633            }
634        }
635        Ok(balances)
636    }
637
638    async fn positions(&self, symbols: Option<&Vec<Symbol>>) -> BrokerResult<Vec<Position>> {
639        if self.config.category == "linear" && symbols.is_none() {
640            return Err(BrokerError::InvalidRequest(
641                "Bybit linear positions require symbols".into(),
642            ));
643        }
644
645        let mut positions = Vec::new();
646        for symbol in symbols.unwrap_or(&vec![]) {
647            let query = vec![
648                ("category".to_string(), self.config.category.clone()),
649                ("symbol".to_string(), Self::symbol_code(*symbol).to_string()),
650            ];
651            let resp: ApiResponse<PositionListResult> = self
652                .signed_request(Method::GET, "/v5/position/list", Value::Null, Some(query))
653                .await?;
654            for item in resp.result.list {
655                let quantity: Decimal = item.size.parse().map_err(|err| {
656                    BrokerError::from_display(err, BrokerErrorKind::Serialization)
657                })?;
658                if quantity.is_zero() {
659                    continue;
660                }
661                let entry_price = item.avg_price.parse().ok();
662                positions.push(Position {
663                    symbol: self.parse_symbol(&item.symbol),
664                    side: match item.side.as_str() {
665                        "Buy" => Some(Side::Buy),
666                        "Sell" => Some(Side::Sell),
667                        _ => None,
668                    },
669                    quantity,
670                    entry_price,
671                    unrealized_pnl: item.unrealised_pnl.parse().unwrap_or(Decimal::ZERO),
672                    updated_at: millis_to_datetime(&item.updated_time),
673                });
674            }
675        }
676        Ok(positions)
677    }
678
679    async fn list_instruments(&self, category: &str) -> BrokerResult<Vec<Instrument>> {
680        let path = format!("/v5/market/instruments-info?category={}", category);
681        let resp: ApiResponse<InstrumentInfoResult> = self.public_get(&path).await?;
682        self.ensure_success(&resp)?;
683        let mut instruments = Vec::new();
684        for item in resp.result.list {
685            let tick_size = item.price_filter.tick_size.parse().unwrap_or(Decimal::ZERO);
686            let lot_size = item
687                .lot_size_filter
688                .qty_step
689                .as_deref()
690                .and_then(|value| value.parse().ok())
691                .unwrap_or(Decimal::ZERO);
692            let settlement = item
693                .settle_coin
694                .clone()
695                .unwrap_or_else(|| item.quote_coin.clone());
696            instruments.push(Instrument {
697                symbol: self.parse_symbol(&item.symbol),
698                base: self.parse_asset(&item.base_coin),
699                quote: self.parse_asset(&item.quote_coin),
700                kind: map_instrument_kind(item.contract_type.as_deref(), category),
701                settlement_currency: self.parse_asset(&settlement),
702                tick_size,
703                lot_size,
704            });
705        }
706        Ok(instruments)
707    }
708
709    fn as_any(&self) -> &dyn Any {
710        self
711    }
712}
713
714fn map_instrument_kind(contract_type: Option<&str>, category: &str) -> InstrumentKind {
715    match contract_type {
716        Some("InversePerpetual") => InstrumentKind::InversePerpetual,
717        Some("LinearPerpetual") => InstrumentKind::LinearPerpetual,
718        _ => match category {
719            "inverse" => InstrumentKind::InversePerpetual,
720            "spot" => InstrumentKind::Spot,
721            _ => InstrumentKind::LinearPerpetual,
722        },
723    }
724}
725
726fn preview_json(body: &str) -> String {
727    const MAX_CHARS: usize = 2048;
728    if body.chars().count() <= MAX_CHARS {
729        return body.to_string();
730    }
731    let truncated: String = body.chars().take(MAX_CHARS).collect();
732    format!("{truncated}… <{} chars total>", body.chars().count())
733}
734
735pub(crate) fn millis_to_datetime(value: &str) -> chrono::DateTime<Utc> {
736    value
737        .parse::<i64>()
738        .ok()
739        .and_then(chrono::DateTime::<Utc>::from_timestamp_millis)
740        .unwrap_or_else(Utc::now)
741}
742
743#[derive(Deserialize)]
744struct ApiResponse<T> {
745    #[serde(rename = "retCode")]
746    ret_code: i64,
747    #[serde(rename = "retMsg")]
748    ret_msg: String,
749    result: T,
750}
751
752#[derive(Deserialize)]
753struct ServerTimeResult {
754    #[serde(rename = "timeNano")]
755    time_nano: String,
756}
757
758#[derive(Deserialize)]
759struct CreateOrderResult {
760    #[serde(rename = "orderId")]
761    order_id: String,
762}
763
764#[derive(Deserialize)]
765struct InstrumentInfoResult {
766    list: Vec<InstrumentInfoItem>,
767}
768
769#[derive(Deserialize)]
770struct InstrumentInfoItem {
771    symbol: String,
772    #[serde(rename = "baseCoin")]
773    base_coin: String,
774    #[serde(rename = "quoteCoin")]
775    quote_coin: String,
776    #[serde(rename = "settleCoin")]
777    settle_coin: Option<String>,
778    #[serde(rename = "contractType")]
779    contract_type: Option<String>,
780    #[serde(rename = "priceFilter")]
781    price_filter: InstrumentPriceFilter,
782    #[serde(rename = "lotSizeFilter")]
783    lot_size_filter: InstrumentLotFilter,
784}
785
786#[derive(Deserialize)]
787struct InstrumentPriceFilter {
788    #[serde(rename = "tickSize")]
789    tick_size: String,
790}
791
792#[derive(Deserialize)]
793struct InstrumentLotFilter {
794    #[serde(rename = "qtyStep")]
795    qty_step: Option<String>,
796}
797
798#[derive(Deserialize)]
799struct OpenOrdersResult {
800    list: Vec<OrderItem>,
801}
802
803#[derive(Deserialize)]
804struct OrderItem {
805    #[serde(rename = "orderId")]
806    order_id: String,
807    #[serde(rename = "orderLinkId")]
808    order_link_id: String,
809    symbol: String,
810    price: String,
811    qty: String,
812    side: String,
813    #[serde(rename = "orderStatus")]
814    order_status: String,
815    #[serde(rename = "orderType")]
816    order_type: String,
817    #[serde(rename = "triggerPrice")]
818    trigger_price: Option<String>,
819    #[serde(rename = "cumExecQty")]
820    cum_exec_qty: String,
821    #[serde(rename = "avgPrice")]
822    avg_price: String,
823    #[serde(rename = "createdTime")]
824    created_time: String,
825    #[serde(rename = "updatedTime")]
826    updated_time: String,
827}
828
829#[derive(Deserialize)]
830struct WalletBalanceResult {
831    list: Vec<AccountEntry>,
832}
833
834#[derive(Deserialize)]
835struct AccountEntry {
836    coin: Vec<CoinBalance>,
837}
838
839#[derive(Deserialize)]
840struct CoinBalance {
841    coin: String,
842    #[serde(rename = "walletBalance")]
843    wallet_balance: String,
844    #[serde(rename = "availableToWithdraw")]
845    available_to_withdraw: Option<String>,
846}
847
848#[derive(Deserialize)]
849struct PositionListResult {
850    list: Vec<PositionItem>,
851}
852
853#[derive(Deserialize)]
854struct PositionItem {
855    symbol: String,
856    side: String,
857    size: String,
858    #[serde(rename = "avgPrice")]
859    avg_price: String,
860    #[serde(rename = "unrealisedPnl")]
861    unrealised_pnl: String,
862    #[serde(rename = "updatedTime")]
863    updated_time: String,
864}
865
866#[derive(Deserialize)]
867struct ExecutionListResult {
868    #[serde(rename = "nextPageCursor")]
869    next_page_cursor: Option<String>,
870    #[allow(dead_code)]
871    category: String,
872    list: Vec<ExecutionListItem>,
873}
874
875#[derive(Deserialize)]
876struct ExecutionListItem {
877    #[serde(rename = "execId")]
878    exec_id: String,
879    symbol: String,
880    #[serde(rename = "orderId")]
881    order_id: String,
882    side: String,
883    #[serde(rename = "execPrice")]
884    exec_price: String,
885    #[serde(rename = "execQty")]
886    exec_qty: String,
887    #[serde(rename = "execFee")]
888    exec_fee: String,
889    #[serde(rename = "feeCurrency")]
890    fee_currency: Option<String>,
891    #[serde(rename = "execTime")]
892    exec_time: String,
893}
894
895#[derive(Clone, Debug, Deserialize)]
896struct BybitConnectorConfig {
897    #[serde(default = "default_rest_url")]
898    rest_url: String,
899    #[serde(default = "default_ws_url")]
900    ws_url: String,
901    #[serde(default)]
902    exchange: Option<String>,
903    #[serde(default = "default_category")]
904    category: String,
905    #[serde(default)]
906    api_key: String,
907    #[serde(default)]
908    api_secret: String,
909    #[serde(default = "default_recv_window")]
910    recv_window: u64,
911    #[serde(default = "default_settle_coin")]
912    settle_coin: Option<String>,
913    #[serde(default = "default_rate_limit_tier")]
914    rate_limit_tier: RateLimitTier,
915    #[serde(default)]
916    private_rps: Option<u32>,
917    #[serde(default)]
918    public_rps: Option<u32>,
919}
920
921fn default_rest_url() -> String {
922    "https://api.bybit.com".into()
923}
924
925fn default_ws_url() -> String {
926    "wss://stream.bybit.com".into()
927}
928
929fn default_category() -> String {
930    "linear".into()
931}
932
933fn resolve_exchange_id(cfg: &BybitConnectorConfig) -> ExchangeId {
934    let default_name = format!("bybit_{}", cfg.category.trim().to_ascii_lowercase());
935    let name = cfg
936        .exchange
937        .as_deref()
938        .map(str::trim)
939        .filter(|value| !value.is_empty())
940        .map(ToOwned::to_owned)
941        .unwrap_or(default_name);
942    ExchangeId::from(name.as_str())
943}
944
945fn default_recv_window() -> u64 {
946    5_000
947}
948
949fn default_settle_coin() -> Option<String> {
950    Some("USDT".into())
951}
952
953fn default_rate_limit_tier() -> RateLimitTier {
954    RateLimitTier::Standard
955}
956
957#[derive(Clone, Copy, Debug, Deserialize)]
958#[serde(rename_all = "snake_case")]
959enum RateLimitTier {
960    Standard,
961    Pro,
962    Vip,
963}
964
965impl RateLimitTier {
966    fn private_rps(self) -> u32 {
967        match self {
968            RateLimitTier::Standard => 10,
969            RateLimitTier::Pro => 20,
970            RateLimitTier::Vip => 50,
971        }
972    }
973}
974
975#[derive(Default)]
976pub struct BybitFactory;
977
978impl BybitFactory {
979    fn parse_config(&self, value: &Value) -> BrokerResult<BybitConnectorConfig> {
980        serde_json::from_value(value.clone()).map_err(|err| {
981            BrokerError::InvalidRequest(format!("invalid bybit connector config: {err}"))
982        })
983    }
984
985    fn credentials(cfg: &BybitConnectorConfig) -> Option<BybitCredentials> {
986        if cfg.api_key.trim().is_empty() || cfg.api_secret.trim().is_empty() {
987            None
988        } else {
989            Some(BybitCredentials {
990                api_key: cfg.api_key.clone(),
991                api_secret: cfg.api_secret.clone(),
992            })
993        }
994    }
995}
996
997const BYBIT_DEFAULT_DEPTH: usize = 50;
998const BYBIT_PUBLIC_DEFAULT_RPS: u32 = 50;
999
1000pub fn register_factory() {
1001    register_connector_factory(Arc::new(BybitFactory));
1002}
1003
1004fn rate_limits_from_config(cfg: &BybitConnectorConfig) -> (Option<Quota>, Option<Quota>) {
1005    let private_rps = cfg
1006        .private_rps
1007        .unwrap_or_else(|| cfg.rate_limit_tier.private_rps());
1008    let public_rps = cfg.public_rps.unwrap_or(BYBIT_PUBLIC_DEFAULT_RPS);
1009    (quota_from_rps(public_rps), quota_from_rps(private_rps))
1010}
1011
1012fn quota_from_rps(rps: u32) -> Option<Quota> {
1013    NonZeroU32::new(rps).map(Quota::per_second)
1014}
1015
1016#[async_trait]
1017impl ConnectorFactory for BybitFactory {
1018    fn name(&self) -> &str {
1019        "bybit"
1020    }
1021
1022    async fn create_execution_client(
1023        &self,
1024        config: &Value,
1025    ) -> BrokerResult<Arc<dyn ExecutionClient>> {
1026        let cfg = self.parse_config(config)?;
1027        let exchange = resolve_exchange_id(&cfg);
1028        let (public_quota, private_quota) = rate_limits_from_config(&cfg);
1029        let bybit_cfg = BybitConfig {
1030            base_url: cfg.rest_url.clone(),
1031            category: cfg.category.clone(),
1032            recv_window: cfg.recv_window,
1033            ws_url: Some(cfg.ws_url.clone()),
1034            public_quota,
1035            private_quota,
1036            settle_coin: cfg.settle_coin.clone(),
1037        };
1038        Ok(Arc::new(BybitClient::new(
1039            bybit_cfg,
1040            Self::credentials(&cfg),
1041            exchange,
1042        )))
1043    }
1044
1045    async fn create_market_stream(
1046        &self,
1047        config: &Value,
1048        stream_config: ConnectorStreamConfig,
1049    ) -> BrokerResult<Box<dyn ConnectorStream>> {
1050        let cfg = self.parse_config(config)?;
1051        let exchange = resolve_exchange_id(&cfg);
1052        let channel = PublicChannel::from_str(&cfg.category)
1053            .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
1054        let stream = BybitMarketStream::connect_public(
1055            &cfg.ws_url,
1056            channel,
1057            stream_config.connection_status,
1058            exchange,
1059        )
1060        .await?;
1061        Ok(Box::new(BybitConnectorStream::new(
1062            stream,
1063            stream_config
1064                .metadata
1065                .get("orderbook_depth")
1066                .and_then(|v| v.as_u64())
1067                .map(|v| v as usize)
1068                .unwrap_or(BYBIT_DEFAULT_DEPTH),
1069        )))
1070    }
1071}
1072
1073struct BybitConnectorStream {
1074    inner: BybitMarketStream,
1075    depth: usize,
1076}
1077
1078impl BybitConnectorStream {
1079    fn new(inner: BybitMarketStream, depth: usize) -> Self {
1080        Self { inner, depth }
1081    }
1082}
1083
1084#[async_trait]
1085impl ConnectorStream for BybitConnectorStream {
1086    async fn subscribe(
1087        &mut self,
1088        symbols: &[String],
1089        interval: tesser_core::Interval,
1090    ) -> BrokerResult<()> {
1091        for symbol in symbols {
1092            self.inner
1093                .subscribe(BybitSubscription::Trades {
1094                    symbol: symbol.clone(),
1095                })
1096                .await?;
1097            self.inner
1098                .subscribe(BybitSubscription::Kline {
1099                    symbol: symbol.clone(),
1100                    interval,
1101                })
1102                .await?;
1103            self.inner
1104                .subscribe(BybitSubscription::OrderBook {
1105                    symbol: symbol.clone(),
1106                    depth: self.depth.clamp(1, 200),
1107                })
1108                .await?;
1109        }
1110        Ok(())
1111    }
1112
1113    async fn next_tick(&mut self) -> BrokerResult<Option<tesser_core::Tick>> {
1114        self.inner.next_tick().await
1115    }
1116
1117    async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
1118        self.inner.next_candle().await
1119    }
1120
1121    async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
1122        self.inner.next_order_book().await
1123    }
1124}
1125
1126#[cfg(test)]
1127mod tests {
1128    use super::*;
1129
1130    #[test]
1131    fn signature_matches_docs_example() {
1132        let creds = BybitCredentials {
1133            api_key: "XXXXXXXXXX".into(),
1134            api_secret: "sec".repeat(10),
1135        };
1136        let payload = format!(
1137            "{}{}{}{}",
1138            1_658_385_579_423i64, creds.api_key, 5_000, r#"{"category": "option"}"#
1139        );
1140        let mut mac = HmacSha256::new_from_slice(creds.api_secret.as_bytes()).expect("init mac");
1141        mac.update(payload.as_bytes());
1142        let signature = hex::encode(mac.finalize().into_bytes());
1143        assert_eq!(
1144            signature.len(),
1145            64,
1146            "signature should be 256-bit hex encoded"
1147        );
1148    }
1149}