1use std::{any::Any, num::NonZeroU32, str::FromStr, sync::Arc, time::Duration};
7
8use async_trait::async_trait;
9use chrono::Utc;
10use hmac::{Hmac, Mac};
11use reqwest::{Client, Method};
12use rust_decimal::Decimal;
13use serde::{de::DeserializeOwned, Deserialize};
14use serde_json::Value;
15use sha2::Sha256;
16use tesser_broker::{
17 register_connector_factory, BrokerError, BrokerErrorKind, BrokerInfo, BrokerResult,
18 ConnectorFactory, ConnectorStream, ConnectorStreamConfig, ExecutionClient, MarketStream, Quota,
19 RateLimiter, RateLimiterError,
20};
21use tesser_core::{
22 AccountBalance, AssetId, Candle, ExchangeId, Fill, Instrument, InstrumentKind, Order,
23 OrderBook, OrderRequest, OrderStatus, OrderType, OrderUpdateRequest, Position, Quantity, Side,
24 Symbol, TimeInForce,
25};
26use tracing::{trace, warn};
27
28pub mod ws;
29
30pub use ws::{BybitMarketStream, BybitSubscription, PublicChannel};
31
/// HMAC keyed with SHA-256; used to sign authenticated requests.
type HmacSha256 = Hmac<Sha256>;
/// Seven days expressed in milliseconds; the widest execution-history window
/// that `list_executions_since` will request.
const EXECUTION_MAX_WINDOW_MS: i64 = 7 * 24 * 60 * 60 * 1000;
34
/// API key pair used to authenticate signed requests.
///
/// `Debug` is not derived on this type, so the secret cannot be printed
/// through `{:?}` formatting.
#[derive(Clone)]
pub struct BybitCredentials {
    /// Sent in the `X-BAPI-API-KEY` header and included in the signed payload.
    pub api_key: String,
    /// Key material for the HMAC-SHA256 request signature.
    pub api_secret: String,
}
41
/// Connection settings for [`BybitClient`].
pub struct BybitConfig {
    /// REST base URL; request paths are appended to it.
    pub base_url: String,
    /// Value sent as the `category` request parameter (e.g. `"linear"`).
    pub category: String,
    /// Value sent as `X-BAPI-RECV-WINDOW` and included in the signed payload.
    pub recv_window: u64,
    /// Explicit WebSocket URL; when `None` one is derived from `base_url`.
    pub ws_url: Option<String>,
    /// Quota for the limiter applied to unauthenticated requests, if any.
    pub public_quota: Option<Quota>,
    /// Quota for the limiter applied to signed requests (keyed by API key), if any.
    pub private_quota: Option<Quota>,
    /// NOTE(review): not read by any code visible in this file — confirm usage.
    pub settle_coin: Option<String>,
}
52
53impl Default for BybitConfig {
54 fn default() -> Self {
55 Self {
56 base_url: "https://api-testnet.bybit.com".into(),
57 category: "linear".into(),
58 recv_window: 5_000,
59 ws_url: None,
60 public_quota: None,
61 private_quota: None,
62 settle_coin: Some("USDT".into()),
63 }
64 }
65}
66
/// REST client for Bybit's v5 API.
pub struct BybitClient {
    /// Shared HTTP client used for every request.
    http: Client,
    /// Endpoint, category and signing settings.
    config: BybitConfig,
    /// API key pair; signed requests fail without it.
    credentials: Option<BybitCredentials>,
    /// Static broker description returned by `info()`.
    info: BrokerInfo,
    /// Limiter awaited before public requests, when configured.
    public_limiter: Option<RateLimiter>,
    /// Limiter awaited (per API key) before signed requests, when configured.
    private_limiter: Option<RateLimiter>,
    /// Exchange identifier used when building symbols, assets and balances.
    exchange: ExchangeId,
}
77
/// A fill returned by `list_executions_since`, paired with its execution id.
#[derive(Clone, Debug)]
pub struct BybitExecution {
    /// The fill built from the execution record.
    pub fill: Fill,
    /// Execution id as reported in the record's `execId` field.
    pub exec_id: Option<String>,
}
84
85impl BybitClient {
86 pub fn new(
88 config: BybitConfig,
89 credentials: Option<BybitCredentials>,
90 exchange: ExchangeId,
91 ) -> Self {
92 let http = Client::builder()
93 .connect_timeout(Duration::from_secs(5))
94 .timeout(Duration::from_secs(10))
95 .build()
96 .expect("failed to create reqwest client");
97 let public_limiter = config.public_quota.map(RateLimiter::direct);
98 let private_limiter = config.private_quota.map(RateLimiter::keyed);
99 Self {
100 info: BrokerInfo {
101 name: "bybit".into(),
102 markets: vec![config.category.clone()],
103 supports_testnet: config.base_url.contains("testnet"),
104 },
105 http,
106 config,
107 credentials,
108 public_limiter,
109 private_limiter,
110 exchange,
111 }
112 }
113
114 pub fn testnet(credentials: Option<BybitCredentials>) -> Self {
116 Self::new(
117 BybitConfig::default(),
118 credentials,
119 ExchangeId::from("bybit_linear"),
120 )
121 }
122
123 pub fn get_credentials(&self) -> Option<BybitCredentials> {
124 self.credentials.clone()
125 }
126
/// Returns the exchange identifier this client was created with.
pub fn exchange(&self) -> ExchangeId {
    self.exchange
}
130
131 pub fn get_ws_url(&self) -> String {
132 self.config
133 .ws_url
134 .clone()
135 .unwrap_or_else(|| self.config.base_url.replace("https://api", "wss://stream"))
136 }
137
/// Returns the code string of `symbol` as used in request parameters.
fn symbol_code(symbol: Symbol) -> &'static str {
    symbol.code()
}
141
/// Builds a [`Symbol`] for this client's exchange from a code in a response.
fn parse_symbol(&self, value: &str) -> Symbol {
    Symbol::from_code(self.exchange, value)
}
145
/// Builds an [`AssetId`] for this client's exchange from a code in a response.
fn parse_asset(&self, value: &str) -> AssetId {
    AssetId::from_code(self.exchange, value)
}
149
150 async fn throttle_public(&self) -> BrokerResult<()> {
151 if let Some(limiter) = &self.public_limiter {
152 limiter
153 .until_ready()
154 .await
155 .map_err(Self::rate_limiter_error)?;
156 }
157 Ok(())
158 }
159
160 async fn throttle_private(&self) -> BrokerResult<()> {
161 if let (Some(limiter), Some(creds)) = (&self.private_limiter, &self.credentials) {
162 limiter
163 .until_key_ready(&creds.api_key)
164 .await
165 .map_err(Self::rate_limiter_error)?;
166 }
167 Ok(())
168 }
169
170 pub async fn server_time(&self) -> BrokerResult<u128> {
172 let resp: ApiResponse<ServerTimeResult> = self.public_get("/v5/market/time").await?;
173 self.ensure_success(&resp)?;
174 resp.result
175 .time_nano
176 .parse::<u128>()
177 .map_err(|err| BrokerError::Serialization(err.to_string()))
178 }
179
180 fn url(&self, path: &str) -> String {
181 format!("{}/{}", self.config.base_url, path.trim_start_matches('/'))
182 }
183
184 fn ensure_success<T>(&self, resp: &ApiResponse<T>) -> BrokerResult<()> {
185 if resp.ret_code == 0 {
186 Ok(())
187 } else {
188 Err(BrokerError::Exchange(format!(
189 "{} (code {})",
190 resp.ret_msg, resp.ret_code
191 )))
192 }
193 }
194
195 async fn public_get<T>(&self, path: &str) -> BrokerResult<ApiResponse<T>>
196 where
197 T: DeserializeOwned,
198 {
199 self.throttle_public().await?;
200 let url = self.url(path);
201 self.http
202 .get(url)
203 .send()
204 .await
205 .map_err(|err| BrokerError::Transport(err.to_string()))?
206 .json::<ApiResponse<T>>()
207 .await
208 .map_err(|err| BrokerError::Serialization(err.to_string()))
209 }
210
211 fn creds(&self) -> BrokerResult<&BybitCredentials> {
212 self.credentials
213 .as_ref()
214 .ok_or_else(|| BrokerError::Authentication("missing Bybit credentials".into()))
215 }
216
217 fn rate_limiter_error(err: RateLimiterError) -> BrokerError {
218 BrokerError::Other(format!("rate limited: {err}"))
219 }
220
221 async fn signed_request<T>(
222 &self,
223 method: Method,
224 path: &str,
225 body: Value,
226 query: Option<Vec<(String, String)>>,
227 ) -> BrokerResult<ApiResponse<T>>
228 where
229 T: DeserializeOwned,
230 {
231 let creds = self.creds()?;
232 self.throttle_private().await?;
233 let timestamp = Utc::now().timestamp_millis();
234 let query_string = query
235 .as_ref()
236 .map(|pairs| serde_urlencoded::to_string(pairs).unwrap_or_default())
237 .unwrap_or_default();
238 let payload = if method == Method::GET {
239 format!(
240 "{timestamp}{}{}{}",
241 creds.api_key, self.config.recv_window, query_string
242 )
243 } else {
244 let body_string = body.to_string();
245 format!(
246 "{timestamp}{}{}{}",
247 creds.api_key, self.config.recv_window, body_string
248 )
249 };
250 let mut mac = HmacSha256::new_from_slice(creds.api_secret.as_bytes())
251 .map_err(|err| BrokerError::Other(format!("failed to create signing key: {err}")))?;
252 mac.update(payload.as_bytes());
253 let signature = hex::encode(mac.finalize().into_bytes());
254 let url = if query_string.is_empty() {
255 self.url(path)
256 } else {
257 format!("{}?{}", self.url(path), query_string)
258 };
259 trace!(
260 method = %method,
261 path = %path,
262 url = %url,
263 query = %query_string,
264 body = ?if method == Method::GET {
265 serde_json::Value::Null
266 } else {
267 body.clone()
268 },
269 "sending Bybit signed request"
270 );
271 let mut request = self.http.request(method.clone(), url);
272 request = request
273 .header("X-BAPI-API-KEY", &creds.api_key)
274 .header("X-BAPI-TIMESTAMP", timestamp.to_string())
275 .header("X-BAPI-SIGN", signature)
276 .header("X-BAPI-RECV-WINDOW", self.config.recv_window.to_string())
277 .header("Content-Type", "application/json");
278 if method != Method::GET {
279 request = request.json(&body);
280 }
281 let response = request
282 .send()
283 .await
284 .map_err(|err| BrokerError::Transport(err.to_string()))?;
285 let status = response.status();
286 let text = response
287 .text()
288 .await
289 .map_err(|err| BrokerError::Serialization(err.to_string()))?;
290 trace!(
291 path = %path,
292 status = %status,
293 body = %preview_json(&text),
294 "received response from Bybit"
295 );
296 let parsed = serde_json::from_str::<ApiResponse<T>>(&text).map_err(|err| {
297 BrokerError::Serialization(format!(
298 "failed to decode Bybit response: {err}; body={}",
299 preview_json(&text)
300 ))
301 })?;
302 self.ensure_success(&parsed)?;
303 Ok(parsed)
304 }
305
306 fn map_time_in_force(tif: Option<TimeInForce>) -> &'static str {
307 match tif.unwrap_or(TimeInForce::GoodTilCanceled) {
308 TimeInForce::GoodTilCanceled => "GTC",
309 TimeInForce::ImmediateOrCancel => "IOC",
310 TimeInForce::FillOrKill => "FOK",
311 }
312 }
313
314 fn map_side(side: Side) -> &'static str {
315 match side {
316 Side::Buy => "Buy",
317 Side::Sell => "Sell",
318 }
319 }
320
321 fn map_order_type(order_type: tesser_core::OrderType) -> &'static str {
322 match order_type {
323 tesser_core::OrderType::Market => "Market",
324 tesser_core::OrderType::Limit => "Limit",
325 tesser_core::OrderType::StopMarket => "Market",
326 }
327 }
328
329 fn qty_string(qty: Quantity) -> String {
330 qty.normalize().to_string()
331 }
332
333 pub(crate) fn map_order_status(status: &str) -> OrderStatus {
334 match status {
335 "New" | "Created" | "PendingNew" | "Untriggered" => OrderStatus::PendingNew,
336 "Accepted" | "Active" | "Triggered" => OrderStatus::Accepted,
337 "Rejected" => OrderStatus::Rejected,
338 "PartiallyFilled" | "PartiallyFilledCanceled" => OrderStatus::PartiallyFilled,
339 "Filled" => OrderStatus::Filled,
340 "Cancelled" | "Canceled" | "Deactivated" => OrderStatus::Canceled,
341 other => {
342 warn!(status = other, "unhandled Bybit order status");
343 OrderStatus::PendingNew
344 }
345 }
346 }
347
348 pub async fn list_executions_since(
351 &self,
352 since: chrono::DateTime<chrono::Utc>,
353 ) -> BrokerResult<Vec<BybitExecution>> {
354 let mut out: Vec<BybitExecution> = Vec::new();
355 let mut chunk_start = since.timestamp_millis();
356 let end_ms = chrono::Utc::now().timestamp_millis();
357 let min_start = end_ms.saturating_sub(EXECUTION_MAX_WINDOW_MS);
358 if chunk_start < min_start {
359 warn!(
360 requested_start = chunk_start,
361 clamped_start = min_start,
362 "Bybit execution history limited to 7 days; clamping startTime"
363 );
364 chunk_start = min_start;
365 }
366
367 while chunk_start < end_ms {
368 let chunk_end = (chunk_start + EXECUTION_MAX_WINDOW_MS - 1).min(end_ms);
369 let mut cursor: Option<String> = None;
370
371 loop {
372 let mut query = vec![
373 ("category".to_string(), self.config.category.clone()),
374 ("limit".to_string(), "100".to_string()),
375 ("startTime".to_string(), chunk_start.to_string()),
376 ("endTime".to_string(), chunk_end.to_string()),
377 ];
378 if let Some(ref cur) = cursor {
379 query.push(("cursor".to_string(), cur.clone()));
380 }
381 let resp: ApiResponse<ExecutionListResult> = self
382 .signed_request(Method::GET, "/v5/execution/list", Value::Null, Some(query))
383 .await?;
384
385 for item in resp.result.list.into_iter() {
386 if item.exec_qty.is_empty() {
388 continue;
389 }
390 let exec_qty: Decimal = item.exec_qty.parse().unwrap_or(Decimal::ZERO);
391 if exec_qty.is_zero() {
392 continue;
393 }
394 let price: Decimal = item.exec_price.parse().unwrap_or(Decimal::ZERO);
395 let fee: Option<Decimal> = if item.exec_fee.is_empty() {
396 None
397 } else {
398 item.exec_fee.parse::<Decimal>().ok()
399 };
400 let ts = millis_to_datetime(&item.exec_time);
401 let side = match item.side.as_str() {
402 "Buy" => Side::Buy,
403 _ => Side::Sell,
404 };
405 let fee_asset = item
406 .fee_currency
407 .as_deref()
408 .filter(|code| !code.is_empty())
409 .map(|code| self.parse_asset(code));
410 out.push(BybitExecution {
411 fill: Fill {
412 order_id: item.order_id,
413 symbol: self.parse_symbol(&item.symbol),
414 side,
415 fill_price: price,
416 fill_quantity: exec_qty,
417 fee,
418 fee_asset,
419 timestamp: ts,
420 },
421 exec_id: Some(item.exec_id.clone()),
422 });
423 }
424
425 if let Some(c) = resp.result.next_page_cursor {
426 if c.is_empty() {
427 break;
428 }
429 cursor = Some(c);
430 continue;
431 }
432
433 break;
434 }
435
436 chunk_start = chunk_end.saturating_add(1);
437 }
438
439 Ok(out)
440 }
441}
442
443#[async_trait]
444impl ExecutionClient for BybitClient {
/// Returns a copy of the broker description built in `BybitClient::new`.
fn info(&self) -> BrokerInfo {
    self.info.clone()
}
448
449 async fn place_order(&self, request: OrderRequest) -> BrokerResult<Order> {
450 let mut payload = serde_json::json!({
451 "category": self.config.category,
452 "symbol": Self::symbol_code(request.symbol),
453 "side": Self::map_side(request.side),
454 "qty": Self::qty_string(request.quantity),
455 "timeInForce": Self::map_time_in_force(request.time_in_force),
456 "price": request.price,
457 "orderLinkId": request.client_order_id,
458 });
459
460 match request.order_type {
461 tesser_core::OrderType::Market | tesser_core::OrderType::Limit => {
462 payload["orderType"] = serde_json::json!(Self::map_order_type(request.order_type));
463 }
464 tesser_core::OrderType::StopMarket => {
465 let trigger_price = request.trigger_price.ok_or_else(|| {
466 BrokerError::InvalidRequest("StopMarket order requires a trigger_price".into())
467 })?;
468 payload["orderType"] = serde_json::json!("Market");
469 payload["triggerPrice"] = serde_json::json!(format!("{}", trigger_price));
470 }
471 }
472 let resp: ApiResponse<CreateOrderResult> = self
473 .signed_request(Method::POST, "/v5/order/create", payload, None)
474 .await?;
475 Ok(Order {
476 id: resp.result.order_id,
477 request,
478 status: OrderStatus::PendingNew,
479 filled_quantity: Decimal::ZERO,
480 avg_fill_price: None,
481 created_at: Utc::now(),
482 updated_at: Utc::now(),
483 })
484 }
485
486 async fn cancel_order(
487 &self,
488 order_id: tesser_core::OrderId,
489 symbol: Symbol,
490 ) -> BrokerResult<()> {
491 let payload = serde_json::json!({
492 "category": self.config.category,
493 "symbol": Self::symbol_code(symbol),
494 "orderId": order_id,
495 });
496 self.signed_request::<serde_json::Value>(Method::POST, "/v5/order/cancel", payload, None)
497 .await?;
498 Ok(())
499 }
500
501 async fn amend_order(&self, request: OrderUpdateRequest) -> BrokerResult<Order> {
502 let mut payload = serde_json::json!({
503 "category": self.config.category,
504 "symbol": Self::symbol_code(request.symbol),
505 "orderId": request.order_id,
506 });
507 if let Some(price) = request.new_price {
508 payload["price"] = serde_json::json!(price);
509 }
510 if let Some(quantity) = request.new_quantity {
511 payload["qty"] = serde_json::json!(Self::qty_string(quantity));
512 }
513 if payload.get("price").is_none() && payload.get("qty").is_none() {
514 return Err(BrokerError::InvalidRequest(
515 "amend requires price or quantity".into(),
516 ));
517 }
518 let resp: ApiResponse<CreateOrderResult> = self
519 .signed_request(Method::POST, "/v5/order/amend", payload, None)
520 .await?;
521 if let Ok(open_orders) = self.list_open_orders(request.symbol).await {
522 if let Some(order) = open_orders
523 .into_iter()
524 .find(|order| order.id == resp.result.order_id)
525 {
526 return Ok(order);
527 }
528 }
529 Ok(Order {
530 id: resp.result.order_id,
531 request: OrderRequest {
532 symbol: request.symbol,
533 side: request.side,
534 order_type: OrderType::Limit,
535 quantity: request.new_quantity.unwrap_or(Decimal::ZERO),
536 price: request.new_price,
537 trigger_price: None,
538 time_in_force: None,
539 client_order_id: None,
540 take_profit: None,
541 stop_loss: None,
542 display_quantity: None,
543 },
544 status: OrderStatus::PendingNew,
545 filled_quantity: Decimal::ZERO,
546 avg_fill_price: None,
547 created_at: Utc::now(),
548 updated_at: Utc::now(),
549 })
550 }
551
552 async fn list_open_orders(&self, symbol: Symbol) -> BrokerResult<Vec<Order>> {
553 let query = vec![
554 ("category".to_string(), self.config.category.clone()),
555 ("symbol".to_string(), Self::symbol_code(symbol).to_string()),
556 ("openOnly".to_string(), "0".into()),
557 ];
558 let resp: ApiResponse<OpenOrdersResult> = self
559 .signed_request(Method::GET, "/v5/order/realtime", Value::Null, Some(query))
560 .await?;
561 let orders = resp
562 .result
563 .list
564 .into_iter()
565 .map(|item| {
566 let symbol = self.parse_symbol(&item.symbol);
567 Order {
568 id: item.order_id,
569 request: OrderRequest {
570 symbol,
571 side: if item.side == "Buy" {
572 Side::Buy
573 } else {
574 Side::Sell
575 },
576 order_type: if item.trigger_price.is_some() {
577 tesser_core::OrderType::StopMarket
578 } else if item.order_type == "Market" {
579 tesser_core::OrderType::Market
580 } else {
581 tesser_core::OrderType::Limit
582 },
583 quantity: item.qty.parse().unwrap_or(Decimal::ZERO),
584 price: item.price.parse::<Decimal>().ok(),
585 trigger_price: item
586 .trigger_price
587 .as_deref()
588 .and_then(|value| value.parse::<Decimal>().ok()),
589 time_in_force: None,
590 client_order_id: Some(item.order_link_id),
591 take_profit: None,
592 stop_loss: None,
593 display_quantity: None,
594 },
595 status: Self::map_order_status(&item.order_status),
596 filled_quantity: item.cum_exec_qty.parse().unwrap_or(Decimal::ZERO),
597 avg_fill_price: item.avg_price.parse::<Decimal>().ok(),
598 created_at: millis_to_datetime(&item.created_time),
599 updated_at: millis_to_datetime(&item.updated_time),
600 }
601 })
602 .collect();
603 Ok(orders)
604 }
605
606 async fn account_balances(&self) -> BrokerResult<Vec<AccountBalance>> {
607 let query = vec![("accountType".to_string(), "UNIFIED".into())];
608 let resp: ApiResponse<WalletBalanceResult> = self
609 .signed_request(
610 Method::GET,
611 "/v5/account/wallet-balance",
612 Value::Null,
613 Some(query),
614 )
615 .await?;
616 let mut balances = Vec::new();
617 for account in resp.result.list {
618 for coin in account.coin {
619 let total = coin.wallet_balance.parse().unwrap_or(Decimal::ZERO);
620 let available = coin
621 .available_to_withdraw
622 .as_deref()
623 .unwrap_or("0")
624 .parse()
625 .unwrap_or(Decimal::ZERO);
626 balances.push(AccountBalance {
627 exchange: self.exchange,
628 asset: self.parse_asset(&coin.coin),
629 total,
630 available,
631 updated_at: Utc::now(),
632 });
633 }
634 }
635 Ok(balances)
636 }
637
638 async fn positions(&self, symbols: Option<&Vec<Symbol>>) -> BrokerResult<Vec<Position>> {
639 if self.config.category == "linear" && symbols.is_none() {
640 return Err(BrokerError::InvalidRequest(
641 "Bybit linear positions require symbols".into(),
642 ));
643 }
644
645 let mut positions = Vec::new();
646 for symbol in symbols.unwrap_or(&vec![]) {
647 let query = vec![
648 ("category".to_string(), self.config.category.clone()),
649 ("symbol".to_string(), Self::symbol_code(*symbol).to_string()),
650 ];
651 let resp: ApiResponse<PositionListResult> = self
652 .signed_request(Method::GET, "/v5/position/list", Value::Null, Some(query))
653 .await?;
654 for item in resp.result.list {
655 let quantity: Decimal = item.size.parse().map_err(|err| {
656 BrokerError::from_display(err, BrokerErrorKind::Serialization)
657 })?;
658 if quantity.is_zero() {
659 continue;
660 }
661 let entry_price = item.avg_price.parse().ok();
662 positions.push(Position {
663 symbol: self.parse_symbol(&item.symbol),
664 side: match item.side.as_str() {
665 "Buy" => Some(Side::Buy),
666 "Sell" => Some(Side::Sell),
667 _ => None,
668 },
669 quantity,
670 entry_price,
671 unrealized_pnl: item.unrealised_pnl.parse().unwrap_or(Decimal::ZERO),
672 updated_at: millis_to_datetime(&item.updated_time),
673 });
674 }
675 }
676 Ok(positions)
677 }
678
679 async fn list_instruments(&self, category: &str) -> BrokerResult<Vec<Instrument>> {
680 let path = format!("/v5/market/instruments-info?category={}", category);
681 let resp: ApiResponse<InstrumentInfoResult> = self.public_get(&path).await?;
682 self.ensure_success(&resp)?;
683 let mut instruments = Vec::new();
684 for item in resp.result.list {
685 let tick_size = item.price_filter.tick_size.parse().unwrap_or(Decimal::ZERO);
686 let lot_size = item
687 .lot_size_filter
688 .qty_step
689 .as_deref()
690 .and_then(|value| value.parse().ok())
691 .unwrap_or(Decimal::ZERO);
692 let settlement = item
693 .settle_coin
694 .clone()
695 .unwrap_or_else(|| item.quote_coin.clone());
696 instruments.push(Instrument {
697 symbol: self.parse_symbol(&item.symbol),
698 base: self.parse_asset(&item.base_coin),
699 quote: self.parse_asset(&item.quote_coin),
700 kind: map_instrument_kind(item.contract_type.as_deref(), category),
701 settlement_currency: self.parse_asset(&settlement),
702 tick_size,
703 lot_size,
704 });
705 }
706 Ok(instruments)
707 }
708
/// Exposes the client as `&dyn Any` so callers can downcast to `BybitClient`.
fn as_any(&self) -> &dyn Any {
    self
}
712}
713
714fn map_instrument_kind(contract_type: Option<&str>, category: &str) -> InstrumentKind {
715 match contract_type {
716 Some("InversePerpetual") => InstrumentKind::InversePerpetual,
717 Some("LinearPerpetual") => InstrumentKind::LinearPerpetual,
718 _ => match category {
719 "inverse" => InstrumentKind::InversePerpetual,
720 "spot" => InstrumentKind::Spot,
721 _ => InstrumentKind::LinearPerpetual,
722 },
723 }
724}
725
/// Returns `body` unchanged when it has at most 2048 characters; otherwise
/// the first 2048 characters followed by `… <N chars total>`.
fn preview_json(body: &str) -> String {
    const MAX_CHARS: usize = 2048;
    // The byte offset of character number MAX_CHARS (zero-based) exists only
    // when the body is longer than the limit.
    match body.char_indices().nth(MAX_CHARS) {
        None => body.to_owned(),
        Some((cut, _)) => {
            let total = body.chars().count();
            format!("{}… <{} chars total>", &body[..cut], total)
        }
    }
}
734
735pub(crate) fn millis_to_datetime(value: &str) -> chrono::DateTime<Utc> {
736 value
737 .parse::<i64>()
738 .ok()
739 .and_then(chrono::DateTime::<Utc>::from_timestamp_millis)
740 .unwrap_or_else(Utc::now)
741}
742
743#[derive(Deserialize)]
744struct ApiResponse<T> {
745 #[serde(rename = "retCode")]
746 ret_code: i64,
747 #[serde(rename = "retMsg")]
748 ret_msg: String,
749 result: T,
750}
751
752#[derive(Deserialize)]
753struct ServerTimeResult {
754 #[serde(rename = "timeNano")]
755 time_nano: String,
756}
757
758#[derive(Deserialize)]
759struct CreateOrderResult {
760 #[serde(rename = "orderId")]
761 order_id: String,
762}
763
/// Payload of `/v5/market/instruments-info`.
#[derive(Deserialize)]
struct InstrumentInfoResult {
    /// Instruments returned for the requested category.
    list: Vec<InstrumentInfoItem>,
}
768
769#[derive(Deserialize)]
770struct InstrumentInfoItem {
771 symbol: String,
772 #[serde(rename = "baseCoin")]
773 base_coin: String,
774 #[serde(rename = "quoteCoin")]
775 quote_coin: String,
776 #[serde(rename = "settleCoin")]
777 settle_coin: Option<String>,
778 #[serde(rename = "contractType")]
779 contract_type: Option<String>,
780 #[serde(rename = "priceFilter")]
781 price_filter: InstrumentPriceFilter,
782 #[serde(rename = "lotSizeFilter")]
783 lot_size_filter: InstrumentLotFilter,
784}
785
786#[derive(Deserialize)]
787struct InstrumentPriceFilter {
788 #[serde(rename = "tickSize")]
789 tick_size: String,
790}
791
792#[derive(Deserialize)]
793struct InstrumentLotFilter {
794 #[serde(rename = "qtyStep")]
795 qty_step: Option<String>,
796}
797
/// Payload of `/v5/order/realtime`.
#[derive(Deserialize)]
struct OpenOrdersResult {
    /// Orders returned for the query.
    list: Vec<OrderItem>,
}
802
803#[derive(Deserialize)]
804struct OrderItem {
805 #[serde(rename = "orderId")]
806 order_id: String,
807 #[serde(rename = "orderLinkId")]
808 order_link_id: String,
809 symbol: String,
810 price: String,
811 qty: String,
812 side: String,
813 #[serde(rename = "orderStatus")]
814 order_status: String,
815 #[serde(rename = "orderType")]
816 order_type: String,
817 #[serde(rename = "triggerPrice")]
818 trigger_price: Option<String>,
819 #[serde(rename = "cumExecQty")]
820 cum_exec_qty: String,
821 #[serde(rename = "avgPrice")]
822 avg_price: String,
823 #[serde(rename = "createdTime")]
824 created_time: String,
825 #[serde(rename = "updatedTime")]
826 updated_time: String,
827}
828
/// Payload of `/v5/account/wallet-balance`.
#[derive(Deserialize)]
struct WalletBalanceResult {
    /// Accounts returned for the query.
    list: Vec<AccountEntry>,
}
833
/// One account in the wallet-balance payload.
#[derive(Deserialize)]
struct AccountEntry {
    /// Per-coin balances of the account.
    coin: Vec<CoinBalance>,
}
838
839#[derive(Deserialize)]
840struct CoinBalance {
841 coin: String,
842 #[serde(rename = "walletBalance")]
843 wallet_balance: String,
844 #[serde(rename = "availableToWithdraw")]
845 available_to_withdraw: Option<String>,
846}
847
/// Payload of `/v5/position/list`.
#[derive(Deserialize)]
struct PositionListResult {
    /// Positions returned for the query.
    list: Vec<PositionItem>,
}
852
853#[derive(Deserialize)]
854struct PositionItem {
855 symbol: String,
856 side: String,
857 size: String,
858 #[serde(rename = "avgPrice")]
859 avg_price: String,
860 #[serde(rename = "unrealisedPnl")]
861 unrealised_pnl: String,
862 #[serde(rename = "updatedTime")]
863 updated_time: String,
864}
865
866#[derive(Deserialize)]
867struct ExecutionListResult {
868 #[serde(rename = "nextPageCursor")]
869 next_page_cursor: Option<String>,
870 #[allow(dead_code)]
871 category: String,
872 list: Vec<ExecutionListItem>,
873}
874
875#[derive(Deserialize)]
876struct ExecutionListItem {
877 #[serde(rename = "execId")]
878 exec_id: String,
879 symbol: String,
880 #[serde(rename = "orderId")]
881 order_id: String,
882 side: String,
883 #[serde(rename = "execPrice")]
884 exec_price: String,
885 #[serde(rename = "execQty")]
886 exec_qty: String,
887 #[serde(rename = "execFee")]
888 exec_fee: String,
889 #[serde(rename = "feeCurrency")]
890 fee_currency: Option<String>,
891 #[serde(rename = "execTime")]
892 exec_time: String,
893}
894
/// Connector configuration as deserialized from the factory's JSON config.
/// Every field has a default, so an empty object is accepted.
#[derive(Clone, Debug, Deserialize)]
struct BybitConnectorConfig {
    /// REST base URL; defaults to `default_rest_url()`.
    #[serde(default = "default_rest_url")]
    rest_url: String,
    /// WebSocket URL; defaults to `default_ws_url()`.
    #[serde(default = "default_ws_url")]
    ws_url: String,
    /// Explicit exchange name; when unset or blank one is derived from `category`.
    #[serde(default)]
    exchange: Option<String>,
    /// Product category; defaults to `default_category()`.
    #[serde(default = "default_category")]
    category: String,
    /// API key; blank means "no credentials".
    #[serde(default)]
    api_key: String,
    /// API secret; blank means "no credentials".
    #[serde(default)]
    api_secret: String,
    /// Receive window passed to the client; defaults to `default_recv_window()`.
    #[serde(default = "default_recv_window")]
    recv_window: u64,
    /// Settle coin passed to the client; defaults to `default_settle_coin()`.
    #[serde(default = "default_settle_coin")]
    settle_coin: Option<String>,
    /// Tier that selects the private request rate when `private_rps` is unset.
    #[serde(default = "default_rate_limit_tier")]
    rate_limit_tier: RateLimitTier,
    /// Explicit private requests-per-second; overrides the tier.
    #[serde(default)]
    private_rps: Option<u32>,
    /// Explicit public requests-per-second; overrides the built-in default.
    #[serde(default)]
    public_rps: Option<u32>,
}
920
/// Default REST endpoint used when the config omits `rest_url`.
fn default_rest_url() -> String {
    String::from("https://api.bybit.com")
}
924
/// Default WebSocket endpoint used when the config omits `ws_url`.
fn default_ws_url() -> String {
    String::from("wss://stream.bybit.com")
}
928
/// Default product category used when the config omits `category`.
fn default_category() -> String {
    String::from("linear")
}
932
933fn resolve_exchange_id(cfg: &BybitConnectorConfig) -> ExchangeId {
934 let default_name = format!("bybit_{}", cfg.category.trim().to_ascii_lowercase());
935 let name = cfg
936 .exchange
937 .as_deref()
938 .map(str::trim)
939 .filter(|value| !value.is_empty())
940 .map(ToOwned::to_owned)
941 .unwrap_or(default_name);
942 ExchangeId::from(name.as_str())
943}
944
/// Default receive window used when the config omits `recv_window`.
fn default_recv_window() -> u64 {
    const DEFAULT_RECV_WINDOW: u64 = 5_000;
    DEFAULT_RECV_WINDOW
}
948
/// Default settle coin used when the config omits `settle_coin`.
fn default_settle_coin() -> Option<String> {
    Some(String::from("USDT"))
}
952
/// Default tier used when the config omits `rate_limit_tier`.
fn default_rate_limit_tier() -> RateLimitTier {
    RateLimitTier::Standard
}
956
/// Account tier selecting the private request rate; deserialized from the
/// snake_case names `standard`, `pro` and `vip`.
#[derive(Clone, Copy, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
enum RateLimitTier {
    Standard,
    Pro,
    Vip,
}
964
965impl RateLimitTier {
966 fn private_rps(self) -> u32 {
967 match self {
968 RateLimitTier::Standard => 10,
969 RateLimitTier::Pro => 20,
970 RateLimitTier::Vip => 50,
971 }
972 }
973}
974
/// Connector factory registered under the name `bybit`.
#[derive(Default)]
pub struct BybitFactory;
977
978impl BybitFactory {
979 fn parse_config(&self, value: &Value) -> BrokerResult<BybitConnectorConfig> {
980 serde_json::from_value(value.clone()).map_err(|err| {
981 BrokerError::InvalidRequest(format!("invalid bybit connector config: {err}"))
982 })
983 }
984
985 fn credentials(cfg: &BybitConnectorConfig) -> Option<BybitCredentials> {
986 if cfg.api_key.trim().is_empty() || cfg.api_secret.trim().is_empty() {
987 None
988 } else {
989 Some(BybitCredentials {
990 api_key: cfg.api_key.clone(),
991 api_secret: cfg.api_secret.clone(),
992 })
993 }
994 }
995}
996
/// Order-book depth used when the stream metadata has no `orderbook_depth`.
const BYBIT_DEFAULT_DEPTH: usize = 50;
/// Public requests-per-second used when the config has no `public_rps`.
const BYBIT_PUBLIC_DEFAULT_RPS: u32 = 50;
999
/// Registers [`BybitFactory`] with the broker's connector registry.
pub fn register_factory() {
    register_connector_factory(Arc::new(BybitFactory));
}
1003
1004fn rate_limits_from_config(cfg: &BybitConnectorConfig) -> (Option<Quota>, Option<Quota>) {
1005 let private_rps = cfg
1006 .private_rps
1007 .unwrap_or_else(|| cfg.rate_limit_tier.private_rps());
1008 let public_rps = cfg.public_rps.unwrap_or(BYBIT_PUBLIC_DEFAULT_RPS);
1009 (quota_from_rps(public_rps), quota_from_rps(private_rps))
1010}
1011
1012fn quota_from_rps(rps: u32) -> Option<Quota> {
1013 NonZeroU32::new(rps).map(Quota::per_second)
1014}
1015
1016#[async_trait]
1017impl ConnectorFactory for BybitFactory {
1018 fn name(&self) -> &str {
1019 "bybit"
1020 }
1021
1022 async fn create_execution_client(
1023 &self,
1024 config: &Value,
1025 ) -> BrokerResult<Arc<dyn ExecutionClient>> {
1026 let cfg = self.parse_config(config)?;
1027 let exchange = resolve_exchange_id(&cfg);
1028 let (public_quota, private_quota) = rate_limits_from_config(&cfg);
1029 let bybit_cfg = BybitConfig {
1030 base_url: cfg.rest_url.clone(),
1031 category: cfg.category.clone(),
1032 recv_window: cfg.recv_window,
1033 ws_url: Some(cfg.ws_url.clone()),
1034 public_quota,
1035 private_quota,
1036 settle_coin: cfg.settle_coin.clone(),
1037 };
1038 Ok(Arc::new(BybitClient::new(
1039 bybit_cfg,
1040 Self::credentials(&cfg),
1041 exchange,
1042 )))
1043 }
1044
1045 async fn create_market_stream(
1046 &self,
1047 config: &Value,
1048 stream_config: ConnectorStreamConfig,
1049 ) -> BrokerResult<Box<dyn ConnectorStream>> {
1050 let cfg = self.parse_config(config)?;
1051 let exchange = resolve_exchange_id(&cfg);
1052 let channel = PublicChannel::from_str(&cfg.category)
1053 .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
1054 let stream = BybitMarketStream::connect_public(
1055 &cfg.ws_url,
1056 channel,
1057 stream_config.connection_status,
1058 exchange,
1059 )
1060 .await?;
1061 Ok(Box::new(BybitConnectorStream::new(
1062 stream,
1063 stream_config
1064 .metadata
1065 .get("orderbook_depth")
1066 .and_then(|v| v.as_u64())
1067 .map(|v| v as usize)
1068 .unwrap_or(BYBIT_DEFAULT_DEPTH),
1069 )))
1070 }
1071}
1072
/// Adapter exposing a [`BybitMarketStream`] through the `ConnectorStream` trait.
struct BybitConnectorStream {
    /// Underlying market stream.
    inner: BybitMarketStream,
    /// Requested order-book depth (clamped when subscribing).
    depth: usize,
}
1077
impl BybitConnectorStream {
    /// Wraps `inner`, remembering the order-book `depth` to subscribe with.
    fn new(inner: BybitMarketStream, depth: usize) -> Self {
        Self { inner, depth }
    }
}
1083
1084#[async_trait]
1085impl ConnectorStream for BybitConnectorStream {
1086 async fn subscribe(
1087 &mut self,
1088 symbols: &[String],
1089 interval: tesser_core::Interval,
1090 ) -> BrokerResult<()> {
1091 for symbol in symbols {
1092 self.inner
1093 .subscribe(BybitSubscription::Trades {
1094 symbol: symbol.clone(),
1095 })
1096 .await?;
1097 self.inner
1098 .subscribe(BybitSubscription::Kline {
1099 symbol: symbol.clone(),
1100 interval,
1101 })
1102 .await?;
1103 self.inner
1104 .subscribe(BybitSubscription::OrderBook {
1105 symbol: symbol.clone(),
1106 depth: self.depth.clamp(1, 200),
1107 })
1108 .await?;
1109 }
1110 Ok(())
1111 }
1112
1113 async fn next_tick(&mut self) -> BrokerResult<Option<tesser_core::Tick>> {
1114 self.inner.next_tick().await
1115 }
1116
1117 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
1118 self.inner.next_candle().await
1119 }
1120
1121 async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
1122 self.inner.next_order_book().await
1123 }
1124}
1125
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a signing payload in the same `timestamp + key + recv_window +
    /// body` layout as `signed_request` and checks the digest length.
    ///
    /// NOTE(review): despite the name, this only asserts that the hex digest
    /// is 64 characters long; it does not compare against a known signature.
    #[test]
    fn signature_matches_docs_example() {
        let creds = BybitCredentials {
            api_key: "XXXXXXXXXX".into(),
            api_secret: "sec".repeat(10),
        };
        let payload = format!(
            "{}{}{}{}",
            1_658_385_579_423i64, creds.api_key, 5_000, r#"{"category": "option"}"#
        );
        let mut mac = HmacSha256::new_from_slice(creds.api_secret.as_bytes()).expect("init mac");
        mac.update(payload.as_bytes());
        let signature = hex::encode(mac.finalize().into_bytes());
        assert_eq!(
            signature.len(),
            64,
            "signature should be 256-bit hex encoded"
        );
    }
}