1use async_trait::async_trait;
2use binance_sdk::{
3 common::{config::ConfigurationRestApi, errors::ConnectorError, models::RestApiResponse},
4 derivatives_trading_usds_futures::{
5 self as binance_futures,
6 rest_api::{
7 self, NewOrderParams, NewOrderPriceMatchEnum, NewOrderSideEnum, NewOrderTimeInForceEnum,
8 },
9 websocket_streams,
10 },
11};
12use chrono::{DateTime, Utc};
13use rust_decimal::Decimal;
14use serde::Deserialize;
15use serde_json::Value;
16use std::{any::Any, collections::HashMap, num::NonZeroU32, sync::Arc, time::Duration};
17use tesser_broker::{
18 register_connector_factory, BrokerError, BrokerInfo, BrokerResult, ConnectorFactory,
19 ConnectorStream, ConnectorStreamConfig, ExecutionClient, MarketStream, Quota, RateLimiter,
20 RateLimiterError,
21};
22use tesser_core::{
23 AccountBalance, AssetId, Candle, ExchangeId, Fill, Instrument, InstrumentKind, Order,
24 OrderBook, OrderRequest, OrderStatus, OrderType, OrderUpdateRequest, Position, Side, Symbol,
25 TimeInForce,
26};
27use tokio::time::sleep;
28use uuid::Uuid;
29
30pub mod ws;
31
32pub use ws::{
33 extract_order_update, BinanceMarketStream, BinanceSubscription, BinanceUserDataStream,
34 UserDataStreamEventsResponse,
35};
36
/// Per-minute request-weight budget used when the configuration does not supply one.
const BINANCE_DEFAULT_WEIGHT_LIMIT: u32 = 1_200;
/// Used-weight / limit ratio at or above which `backoff_if_needed` inserts a short pause.
const WEIGHT_BACKOFF_RATIO: f32 = 0.9;
/// Units charged to the local limiter before placing or amending an order.
const ORDER_WEIGHT: u32 = 1;
/// Units charged to the local limiter before cancelling an order.
const CANCEL_WEIGHT: u32 = 1;
/// Units charged to the local limiter before query-style calls (open orders, balances,
/// positions, instruments) and before user-data-stream start/keepalive.
const QUERY_WEIGHT: u32 = 5;
42
/// API key/secret pair that `BinanceClient::new` hands to the REST configuration builder.
#[derive(Clone)]
pub struct BinanceCredentials {
    /// Value passed to the builder's `api_key` setter.
    pub api_key: String,
    /// Value passed to the builder's `api_secret` setter.
    pub api_secret: String,
}
48
/// Connection settings for a [`BinanceClient`].
pub struct BinanceConfig {
    /// Base path used for the REST API configuration.
    pub rest_url: String,
    /// WebSocket endpoint exposed through `BinanceClient::ws_url`.
    pub ws_url: String,
    /// Value sent as `recv_window` on the requests that carry one.
    pub recv_window: u64,
    /// Per-minute weight budget; `0` disables both the local limiter and header-based backoff.
    pub weight_limit_per_minute: u32,
}
55
56impl Default for BinanceConfig {
57 fn default() -> Self {
58 Self {
59 rest_url: "https://fapi.binance.com".to_string(),
60 ws_url: "wss://fstream.binance.com/stream".to_string(),
61 recv_window: 5_000,
62 weight_limit_per_minute: BINANCE_DEFAULT_WEIGHT_LIMIT,
63 }
64 }
65}
66
67impl BinanceConfig {
68 pub fn testnet() -> Self {
69 Self {
70 rest_url: "https://testnet.binancefuture.com".to_string(),
71 ws_url: "wss://stream.binancefuture.com/stream".to_string(),
72 ..Self::default()
73 }
74 }
75}
76
/// REST client for Binance futures that implements [`ExecutionClient`].
pub struct BinanceClient {
    /// Shared handle to the SDK REST API, built from `config.rest_url` and the credentials.
    rest: Arc<rest_api::RestApi>,
    /// Broker metadata returned by `info()`.
    info: BrokerInfo,
    /// Credentials supplied at construction, if any.
    credentials: Option<BinanceCredentials>,
    /// Connection settings supplied at construction.
    config: BinanceConfig,
    /// Local weight limiter; `None` when `weight_limit_per_minute` is zero.
    weight_limiter: Option<RateLimiter>,
    /// Exchange identifier attached to the symbols and assets this client produces.
    exchange: ExchangeId,
}
85
86impl BinanceClient {
87 pub fn new(
88 config: BinanceConfig,
89 credentials: Option<BinanceCredentials>,
90 exchange: ExchangeId,
91 ) -> Self {
92 let mut builder = ConfigurationRestApi::builder();
93 builder = builder.base_path(config.rest_url.clone());
94 if let Some(creds) = credentials.as_ref() {
95 builder = builder
96 .api_key(creds.api_key.clone())
97 .api_secret(creds.api_secret.clone());
98 }
99 let rest_cfg = builder
100 .build()
101 .expect("failed to build Binance REST configuration");
102 let rest = binance_futures::DerivativesTradingUsdsFuturesRestApi::from_config(rest_cfg);
103 let supports_testnet = config.rest_url.contains("testnet");
104 let weight_limiter = NonZeroU32::new(config.weight_limit_per_minute)
105 .map(Quota::per_minute)
106 .map(RateLimiter::direct);
107 Self {
108 rest: Arc::new(rest),
109 info: BrokerInfo {
110 name: "binance".into(),
111 markets: vec!["usd_perp".into()],
112 supports_testnet,
113 },
114 credentials,
115 config,
116 weight_limiter,
117 exchange,
118 }
119 }
120
121 pub fn credentials(&self) -> Option<BinanceCredentials> {
122 self.credentials.clone()
123 }
124
125 pub fn ws_url(&self) -> &str {
126 &self.config.ws_url
127 }
128
129 pub fn rest_url(&self) -> &str {
130 &self.config.rest_url
131 }
132
133 pub fn recv_window(&self) -> u64 {
134 self.config.recv_window
135 }
136
137 pub fn rest(&self) -> Arc<rest_api::RestApi> {
138 Arc::clone(&self.rest)
139 }
140
141 pub fn exchange(&self) -> ExchangeId {
142 self.exchange
143 }
144
145 async fn throttle_weight(&self, units: u32) -> BrokerResult<()> {
146 if units == 0 {
147 return Ok(());
148 }
149 if let Some(limiter) = &self.weight_limiter {
150 if let Some(nonzero) = NonZeroU32::new(units) {
151 limiter
152 .until_units_ready(nonzero)
153 .await
154 .map_err(Self::rate_limiter_error)?;
155 }
156 }
157 Ok(())
158 }
159
160 async fn handle_weight_headers(&self, headers: &HashMap<String, String>) {
161 if self.config.weight_limit_per_minute == 0 {
162 return;
163 }
164 if let Some(value) = headers.get("x-mbx-used-weight-1m") {
165 if let Ok(used) = value.parse::<u32>() {
166 self.backoff_if_needed(used).await;
167 }
168 }
169 }
170
171 async fn backoff_if_needed(&self, used: u32) {
172 let limit = self.config.weight_limit_per_minute;
173 if limit == 0 {
174 return;
175 }
176 if used >= limit {
177 sleep(Duration::from_secs(1)).await;
178 } else if (used as f32) / (limit as f32) >= WEIGHT_BACKOFF_RATIO {
179 sleep(Duration::from_millis(200)).await;
180 }
181 }
182
183 async fn parse_response<T>(&self, result: anyhow::Result<RestApiResponse<T>>) -> BrokerResult<T>
184 where
185 T: Send + 'static,
186 {
187 let response = result.map_err(|err| BrokerError::Transport(err.to_string()))?;
188 self.handle_weight_headers(&response.headers).await;
189 response.data().await.map_err(map_connector_error)
190 }
191
192 fn rate_limiter_error(err: RateLimiterError) -> BrokerError {
193 BrokerError::Other(format!("rate limited: {err}"))
194 }
195
196 pub async fn start_user_stream(&self) -> BrokerResult<String> {
197 self.throttle_weight(QUERY_WEIGHT).await?;
198 let response = self
199 .parse_response(self.rest.start_user_data_stream().await)
200 .await?;
201 response
202 .listen_key
203 .ok_or_else(|| BrokerError::Other("missing listenKey".into()))
204 }
205
206 pub async fn keepalive_user_stream(&self) -> BrokerResult<String> {
207 self.throttle_weight(QUERY_WEIGHT).await?;
208 let response = self
209 .parse_response(self.rest.keepalive_user_data_stream().await)
210 .await?;
211 response
212 .listen_key
213 .ok_or_else(|| BrokerError::Other("missing listenKey".into()))
214 }
215}
216
217#[async_trait]
218impl ExecutionClient for BinanceClient {
219 fn info(&self) -> BrokerInfo {
220 self.info.clone()
221 }
222
223 async fn place_order(&self, request: OrderRequest) -> BrokerResult<Order> {
224 self.throttle_weight(ORDER_WEIGHT).await?;
225 let side = map_side(request.side);
226 let order_type = map_order_type(request.order_type);
227 let symbol_code = request.symbol.code().to_string();
228 let mut builder = NewOrderParams::builder(symbol_code, side, order_type);
229 let mut client_request = request.clone();
230 if let Some(price) = request.price {
231 builder = builder.price(Some(price));
232 }
233 builder = builder.quantity(Some(request.quantity));
234 if let Some(trigger) = request.trigger_price {
235 builder = builder.stop_price(Some(trigger));
236 }
237 let tif = match request
238 .time_in_force
239 .or_else(|| default_time_in_force(request.order_type))
240 {
241 Some(TimeInForce::GoodTilCanceled) => Some(NewOrderTimeInForceEnum::Gtc),
242 Some(TimeInForce::ImmediateOrCancel) => Some(NewOrderTimeInForceEnum::Ioc),
243 Some(TimeInForce::FillOrKill) => Some(NewOrderTimeInForceEnum::Fok),
244 None => None,
245 };
246 if let Some(value) = tif {
247 builder = builder.time_in_force(Some(value));
248 }
249 let client_id = client_request
250 .client_order_id
251 .clone()
252 .unwrap_or_else(|| format!("tesser-{}", Uuid::new_v4()));
253 client_request.client_order_id = Some(client_id.clone());
254 builder = builder
255 .new_client_order_id(Some(client_id))
256 .price_match(Some(NewOrderPriceMatchEnum::None))
257 .recv_window(Some(self.config.recv_window as i64));
258
259 let params = builder
260 .build()
261 .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
262 let response = self
263 .parse_response(self.rest.new_order(params).await)
264 .await?;
265 Ok(build_order_from_response(response, client_request))
266 }
267
268 async fn cancel_order(
269 &self,
270 order_id: tesser_core::OrderId,
271 symbol: Symbol,
272 ) -> BrokerResult<()> {
273 self.throttle_weight(CANCEL_WEIGHT).await?;
274 let mut builder = rest_api::CancelOrderParams::builder(symbol.code().to_string())
275 .recv_window(Some(self.config.recv_window as i64));
276 if let Ok(id) = order_id.parse::<i64>() {
277 builder = builder.order_id(Some(id));
278 } else {
279 builder = builder.orig_client_order_id(Some(order_id));
280 }
281 let params = builder
282 .build()
283 .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
284 self.parse_response(self.rest.cancel_order(params).await)
285 .await?;
286 Ok(())
287 }
288
289 async fn amend_order(&self, request: OrderUpdateRequest) -> BrokerResult<Order> {
290 self.throttle_weight(ORDER_WEIGHT).await?;
291 let new_price = request.new_price.ok_or_else(|| {
292 BrokerError::InvalidRequest("amend requires new price for Binance".into())
293 })?;
294 let new_qty = request.new_quantity.ok_or_else(|| {
295 BrokerError::InvalidRequest("amend requires new quantity for Binance".into())
296 })?;
297 let mut builder = rest_api::ModifyOrderParams::builder(
298 request.symbol.code().to_string(),
299 map_modify_side(request.side),
300 new_qty,
301 new_price,
302 )
303 .recv_window(Some(self.config.recv_window as i64));
304 if let Ok(id) = request.order_id.parse::<i64>() {
305 builder = builder.order_id(Some(id));
306 } else {
307 builder = builder.orig_client_order_id(Some(request.order_id.clone()));
308 }
309 let params = builder
310 .build()
311 .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
312 let response = self
313 .parse_response(self.rest.modify_order(params).await)
314 .await?;
315 Ok(build_order_from_modify_response(response, &request))
316 }
317
318 async fn list_open_orders(&self, symbol: Symbol) -> BrokerResult<Vec<Order>> {
319 self.throttle_weight(QUERY_WEIGHT).await?;
320 let params = rest_api::CurrentAllOpenOrdersParams::builder()
321 .symbol(Some(symbol.code().to_string()))
322 .recv_window(Some(self.config.recv_window as i64))
323 .build()
324 .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
325 let raw = self
326 .parse_response(self.rest.current_all_open_orders(params).await)
327 .await?;
328 let mut orders = Vec::new();
329 for entry in raw {
330 if let Some(order) = order_from_open_order(self.exchange, &entry) {
331 orders.push(order);
332 }
333 }
334 Ok(orders)
335 }
336
337 async fn account_balances(&self) -> BrokerResult<Vec<AccountBalance>> {
338 self.throttle_weight(QUERY_WEIGHT).await?;
339 let params = rest_api::FuturesAccountBalanceV3Params::builder()
340 .recv_window(Some(self.config.recv_window as i64))
341 .build()
342 .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
343 let balances = self
344 .parse_response(self.rest.futures_account_balance_v3(params).await)
345 .await?;
346 Ok(balances
347 .into_iter()
348 .filter_map(|entry| balance_from_entry(self.exchange, &entry))
349 .collect())
350 }
351
352 async fn positions(&self, _symbols: Option<&Vec<Symbol>>) -> BrokerResult<Vec<Position>> {
353 self.throttle_weight(QUERY_WEIGHT).await?;
354 let params = rest_api::PositionInformationV2Params::builder()
355 .recv_window(Some(self.config.recv_window as i64))
356 .build()
357 .map_err(|err| BrokerError::InvalidRequest(err.to_string()))?;
358 let positions = self
359 .parse_response(self.rest.position_information_v2(params).await)
360 .await?;
361 Ok(positions
362 .into_iter()
363 .filter_map(|entry| position_from_entry(self.exchange, &entry))
364 .collect())
365 }
366
367 async fn list_instruments(&self, _category: &str) -> BrokerResult<Vec<Instrument>> {
368 self.throttle_weight(QUERY_WEIGHT).await?;
369 let info = self
370 .parse_response(self.rest.exchange_information().await)
371 .await?;
372 let mut instruments = Vec::new();
373 for symbol in info.symbols.unwrap_or_default() {
374 if let Some(instr) = instrument_from_symbol(self.exchange, &symbol) {
375 instruments.push(instr);
376 }
377 }
378 Ok(instruments)
379 }
380
381 fn as_any(&self) -> &dyn Any {
382 self
383 }
384}
385
386fn build_order_from_response(response: rest_api::NewOrderResponse, request: OrderRequest) -> Order {
387 let status = response
388 .status
389 .as_deref()
390 .map(map_order_status)
391 .unwrap_or(OrderStatus::PendingNew);
392 Order {
393 id: response
394 .order_id
395 .map(|id| id.to_string())
396 .unwrap_or_else(|| Utc::now().timestamp_millis().to_string()),
397 request,
398 status,
399 filled_quantity: parse_decimal_opt(response.executed_qty.as_deref())
400 .unwrap_or(Decimal::ZERO),
401 avg_fill_price: parse_decimal_opt(response.avg_price.as_deref()),
402 created_at: timestamp_from_ms(response.update_time),
403 updated_at: timestamp_from_ms(response.update_time),
404 }
405}
406
407fn build_order_from_modify_response(
408 response: rest_api::ModifyOrderResponse,
409 update: &OrderUpdateRequest,
410) -> Order {
411 let symbol = response
412 .symbol
413 .clone()
414 .map(|code| Symbol::from_code(update.symbol.exchange, code))
415 .unwrap_or(update.symbol);
416 let order_type = response
417 .r#type
418 .as_deref()
419 .map(map_order_type_from_str)
420 .unwrap_or(OrderType::Limit);
421 let quantity = parse_decimal_opt(response.orig_qty.as_deref())
422 .or(update.new_quantity)
423 .unwrap_or(Decimal::ZERO);
424 let price = parse_decimal_opt(response.price.as_deref()).or(update.new_price);
425 let request = OrderRequest {
426 symbol,
427 side: response
428 .side
429 .as_deref()
430 .map(map_order_side)
431 .unwrap_or(update.side),
432 order_type,
433 quantity,
434 price,
435 trigger_price: parse_decimal_opt(response.stop_price.as_deref()),
436 time_in_force: response.time_in_force.as_deref().and_then(map_tif_from_str),
437 client_order_id: response.client_order_id.clone(),
438 take_profit: None,
439 stop_loss: None,
440 display_quantity: None,
441 };
442 Order {
443 id: response
444 .order_id
445 .map(|id| id.to_string())
446 .unwrap_or_else(|| update.order_id.clone()),
447 request,
448 status: response
449 .status
450 .as_deref()
451 .map(map_order_status)
452 .unwrap_or(OrderStatus::PendingNew),
453 filled_quantity: parse_decimal_opt(response.executed_qty.as_deref())
454 .unwrap_or(Decimal::ZERO),
455 avg_fill_price: parse_decimal_opt(response.avg_price.as_deref()),
456 created_at: timestamp_from_ms(response.update_time),
457 updated_at: timestamp_from_ms(response.update_time),
458 }
459}
460
461fn order_from_open_order(
462 exchange: ExchangeId,
463 entry: &rest_api::AllOrdersResponseInner,
464) -> Option<Order> {
465 let symbol = entry
466 .symbol
467 .as_deref()
468 .map(|code| Symbol::from_code(exchange, code))?;
469 let request = OrderRequest {
470 symbol,
471 side: entry
472 .side
473 .as_deref()
474 .map(map_order_side)
475 .unwrap_or(Side::Buy),
476 order_type: entry
477 .r#type
478 .as_deref()
479 .map(map_order_type_from_str)
480 .unwrap_or(OrderType::Limit),
481 quantity: parse_decimal_opt(entry.orig_qty.as_deref())?,
482 price: parse_decimal_opt(entry.price.as_deref()),
483 trigger_price: parse_decimal_opt(entry.stop_price.as_deref()),
484 time_in_force: entry.time_in_force.as_deref().and_then(map_tif_from_str),
485 client_order_id: entry.client_order_id.clone(),
486 take_profit: None,
487 stop_loss: None,
488 display_quantity: None,
489 };
490 Some(Order {
491 id: entry
492 .order_id
493 .map(|id| id.to_string())
494 .or_else(|| entry.client_order_id.clone())?,
495 request,
496 status: entry
497 .status
498 .as_deref()
499 .map(map_order_status)
500 .unwrap_or(OrderStatus::PendingNew),
501 filled_quantity: parse_decimal_opt(entry.executed_qty.as_deref()).unwrap_or(Decimal::ZERO),
502 avg_fill_price: parse_decimal_opt(entry.avg_price.as_deref()),
503 created_at: timestamp_from_ms(entry.time),
504 updated_at: timestamp_from_ms(entry.update_time),
505 })
506}
507
508fn balance_from_entry(
509 exchange: ExchangeId,
510 entry: &rest_api::FuturesAccountBalanceV2ResponseInner,
511) -> Option<AccountBalance> {
512 let asset = entry
513 .asset
514 .as_deref()
515 .map(|code| AssetId::from_code(exchange, code))?;
516 let total = parse_decimal_opt(entry.balance.as_deref())?;
517 let available = parse_decimal_opt(entry.available_balance.as_deref()).unwrap_or(total);
518 Some(AccountBalance {
519 exchange,
520 asset,
521 total,
522 available,
523 updated_at: timestamp_from_ms(entry.update_time),
524 })
525}
526
527fn position_from_entry(
528 exchange: ExchangeId,
529 entry: &rest_api::PositionInformationV2ResponseInner,
530) -> Option<Position> {
531 let qty = parse_decimal_opt(entry.position_amt.as_deref())?;
532 if qty.is_zero() {
533 return None;
534 }
535 let side = if qty.is_sign_positive() {
536 Some(Side::Buy)
537 } else {
538 Some(Side::Sell)
539 };
540 let symbol = entry
541 .symbol
542 .as_deref()
543 .map(|code| Symbol::from_code(exchange, code))?;
544 Some(Position {
545 symbol,
546 side,
547 quantity: qty.abs(),
548 entry_price: parse_decimal_opt(entry.entry_price.as_deref()),
549 unrealized_pnl: parse_decimal_opt(entry.un_realized_profit.as_deref())
550 .unwrap_or(Decimal::ZERO),
551 updated_at: timestamp_from_ms(entry.update_time),
552 })
553}
554
555fn instrument_from_symbol(
556 exchange: ExchangeId,
557 symbol: &rest_api::ExchangeInformationResponseSymbolsInner,
558) -> Option<Instrument> {
559 let symbol_name = symbol
560 .symbol
561 .as_deref()
562 .map(|code| Symbol::from_code(exchange, code))?;
563 let base = symbol
564 .base_asset
565 .as_deref()
566 .map(|code| AssetId::from_code(exchange, code))?;
567 let quote = symbol
568 .quote_asset
569 .as_deref()
570 .map(|code| AssetId::from_code(exchange, code))?;
571 let settlement = symbol
572 .margin_asset
573 .as_deref()
574 .map(|code| AssetId::from_code(exchange, code))
575 .unwrap_or(quote);
576 let mut tick_size = Decimal::ONE;
577 let mut lot_size = Decimal::ONE;
578 if let Some(filters) = &symbol.filters {
579 for filter in filters {
580 match filter.filter_type.as_deref() {
581 Some("PRICE_FILTER") => {
582 if let Some(tick) = parse_decimal_opt(filter.tick_size.as_deref()) {
583 tick_size = tick;
584 }
585 }
586 Some("LOT_SIZE") => {
587 if let Some(step) = parse_decimal_opt(filter.step_size.as_deref()) {
588 lot_size = step;
589 }
590 }
591 _ => {}
592 }
593 }
594 }
595
596 Some(Instrument {
597 symbol: symbol_name,
598 base,
599 quote,
600 kind: InstrumentKind::LinearPerpetual,
601 settlement_currency: settlement,
602 tick_size,
603 lot_size,
604 })
605}
606
607fn map_order_status(value: &str) -> OrderStatus {
608 match value {
609 "NEW" => OrderStatus::Accepted,
610 "PARTIALLY_FILLED" => OrderStatus::PartiallyFilled,
611 "FILLED" => OrderStatus::Filled,
612 "CANCELED" | "EXPIRED" => OrderStatus::Canceled,
613 "REJECTED" => OrderStatus::Rejected,
614 _ => OrderStatus::PendingNew,
615 }
616}
617
618fn map_order_side(value: &str) -> Side {
619 match value {
620 "SELL" => Side::Sell,
621 _ => Side::Buy,
622 }
623}
624
625fn map_side(value: Side) -> NewOrderSideEnum {
626 match value {
627 Side::Buy => NewOrderSideEnum::Buy,
628 Side::Sell => NewOrderSideEnum::Sell,
629 }
630}
631
632fn map_modify_side(value: Side) -> rest_api::ModifyOrderSideEnum {
633 match value {
634 Side::Buy => rest_api::ModifyOrderSideEnum::Buy,
635 Side::Sell => rest_api::ModifyOrderSideEnum::Sell,
636 }
637}
638
639fn map_order_type(order_type: OrderType) -> String {
640 match order_type {
641 OrderType::Market => "MARKET".into(),
642 OrderType::Limit => "LIMIT".into(),
643 OrderType::StopMarket => "STOP_MARKET".into(),
644 }
645}
646
647fn map_order_type_from_str(value: &str) -> OrderType {
648 match value {
649 "MARKET" => OrderType::Market,
650 "STOP_MARKET" | "TAKE_PROFIT_MARKET" => OrderType::StopMarket,
651 _ => OrderType::Limit,
652 }
653}
654
655fn map_tif_from_str(value: &str) -> Option<TimeInForce> {
656 match value {
657 "GTC" => Some(TimeInForce::GoodTilCanceled),
658 "IOC" => Some(TimeInForce::ImmediateOrCancel),
659 "FOK" => Some(TimeInForce::FillOrKill),
660 _ => None,
661 }
662}
663
664fn default_time_in_force(order_type: OrderType) -> Option<TimeInForce> {
665 match order_type {
666 OrderType::Limit => Some(TimeInForce::GoodTilCanceled),
667 _ => None,
668 }
669}
670
671pub fn parse_decimal_opt(value: Option<&str>) -> Option<Decimal> {
672 value.and_then(|v| v.parse::<Decimal>().ok())
673}
674
675pub fn timestamp_from_ms(value: Option<i64>) -> DateTime<Utc> {
676 value
677 .and_then(DateTime::<Utc>::from_timestamp_millis)
678 .unwrap_or_else(Utc::now)
679}
680
681pub fn fill_from_update(
682 exchange: ExchangeId,
683 order: &websocket_streams::OrderTradeUpdateO,
684) -> Option<Fill> {
685 let last_qty = parse_decimal_opt(order.l.as_deref())?;
686 if last_qty.is_zero() {
687 return None;
688 }
689 let price = parse_decimal_opt(order.l_uppercase.as_deref())
690 .or_else(|| parse_decimal_opt(order.p.as_deref()))?;
691 let side = order
692 .s_uppercase
693 .as_deref()
694 .map(map_order_side)
695 .unwrap_or(Side::Buy);
696 let symbol = order
697 .s
698 .as_deref()
699 .map(|code| Symbol::from_code(exchange, code))
700 .unwrap_or(Symbol::unspecified());
701 let fee_currency = order
702 .n_uppercase
703 .as_deref()
704 .filter(|code| !code.is_empty())
705 .map(|code| AssetId::from_code(exchange, code));
706 Some(Fill {
707 order_id: order.i.map(|id| id.to_string()).unwrap_or_default(),
708 symbol,
709 side,
710 fill_price: price,
711 fill_quantity: last_qty,
712 fee: parse_decimal_opt(order.n.as_deref()),
713 fee_asset: fee_currency,
714 timestamp: timestamp_from_ms(order.t_uppercase),
715 })
716}
717
718pub fn order_from_update(
719 exchange: ExchangeId,
720 order: &websocket_streams::OrderTradeUpdateO,
721) -> Option<Order> {
722 let symbol = order
723 .s
724 .as_deref()
725 .map(|code| Symbol::from_code(exchange, code))?;
726 let request = OrderRequest {
727 symbol,
728 side: order
729 .s_uppercase
730 .as_deref()
731 .map(map_order_side)
732 .unwrap_or(Side::Buy),
733 order_type: order
734 .o
735 .as_deref()
736 .map(map_order_type_from_str)
737 .unwrap_or(OrderType::Limit),
738 quantity: parse_decimal_opt(order.q.as_deref()).unwrap_or(Decimal::ZERO),
739 price: parse_decimal_opt(order.p.as_deref()),
740 trigger_price: parse_decimal_opt(order.sp.as_deref()),
741 time_in_force: order.f.as_deref().and_then(map_tif_from_str),
742 client_order_id: order.c.clone(),
743 take_profit: None,
744 stop_loss: None,
745 display_quantity: None,
746 };
747 Some(Order {
748 id: order.i.map(|v| v.to_string()).unwrap_or_default(),
749 request,
750 status: order
751 .x_uppercase
752 .as_deref()
753 .map(map_order_status)
754 .unwrap_or(OrderStatus::PendingNew),
755 filled_quantity: parse_decimal_opt(order.z.as_deref()).unwrap_or(Decimal::ZERO),
756 avg_fill_price: parse_decimal_opt(order.ap.as_deref()),
757 created_at: timestamp_from_ms(order.t_uppercase),
758 updated_at: timestamp_from_ms(order.t_uppercase),
759 })
760}
761
/// Connector settings deserialized from the JSON value given to [`BinanceFactory`].
///
/// Every field is optional in the input and falls back to the default named in its attribute.
#[derive(Clone, Debug, Deserialize)]
struct BinanceConnectorConfig {
    /// REST base URL.
    #[serde(default = "default_rest_url")]
    rest_url: String,
    /// WebSocket URL.
    #[serde(default = "default_ws_url")]
    ws_url: String,
    /// Name converted into the `ExchangeId` for clients and streams.
    #[serde(default = "default_exchange_name")]
    exchange: String,
    /// API key; empty when omitted.
    #[serde(default)]
    api_key: String,
    /// API secret; empty when omitted.
    #[serde(default)]
    api_secret: String,
    /// Receive window copied into `BinanceConfig::recv_window`.
    #[serde(default = "default_recv_window")]
    recv_window: u64,
    /// Weight budget copied into `BinanceConfig::weight_limit_per_minute`.
    #[serde(default = "default_weight_limit_per_minute")]
    weight_limit_per_minute: u32,
}
779
/// Serde default for `rest_url`.
fn default_rest_url() -> String {
    String::from("https://fapi.binance.com")
}
783
/// Serde default for `ws_url`.
fn default_ws_url() -> String {
    String::from("wss://fstream.binance.com/stream")
}
787
/// Serde default for `exchange`.
fn default_exchange_name() -> String {
    String::from("binance_perp")
}
791
/// Serde default for `recv_window`.
fn default_recv_window() -> u64 {
    5_000_u64
}
795
/// Serde default for `weight_limit_per_minute`; returns `BINANCE_DEFAULT_WEIGHT_LIMIT`.
fn default_weight_limit_per_minute() -> u32 {
    BINANCE_DEFAULT_WEIGHT_LIMIT
}
799
/// Connector factory registered under the name `"binance"`; builds execution clients and
/// market streams from a JSON configuration value.
#[derive(Default)]
pub struct BinanceFactory;
802
803impl BinanceFactory {
804 fn parse_config(&self, value: &Value) -> BrokerResult<BinanceConnectorConfig> {
805 serde_json::from_value(value.clone()).map_err(|err| {
806 BrokerError::InvalidRequest(format!("invalid binance connector config: {err}"))
807 })
808 }
809
810 fn credentials(cfg: &BinanceConnectorConfig) -> Option<BinanceCredentials> {
811 if cfg.api_key.trim().is_empty() || cfg.api_secret.trim().is_empty() {
812 None
813 } else {
814 Some(BinanceCredentials {
815 api_key: cfg.api_key.clone(),
816 api_secret: cfg.api_secret.clone(),
817 })
818 }
819 }
820}
821
/// Order-book depth used when the stream metadata has no usable `orderbook_depth` entry.
const BINANCE_DEFAULT_DEPTH: usize = 50;
823
824pub fn register_factory() {
825 register_connector_factory(Arc::new(BinanceFactory));
826}
827
828#[async_trait]
829impl ConnectorFactory for BinanceFactory {
830 fn name(&self) -> &str {
831 "binance"
832 }
833
834 async fn create_execution_client(
835 &self,
836 config: &Value,
837 ) -> BrokerResult<Arc<dyn ExecutionClient>> {
838 let cfg = self.parse_config(config)?;
839 let exchange = ExchangeId::from(cfg.exchange.as_str());
840 let binance_cfg = BinanceConfig {
841 rest_url: cfg.rest_url.clone(),
842 ws_url: cfg.ws_url.clone(),
843 recv_window: cfg.recv_window,
844 weight_limit_per_minute: cfg.weight_limit_per_minute,
845 };
846 Ok(Arc::new(BinanceClient::new(
847 binance_cfg,
848 Self::credentials(&cfg),
849 exchange,
850 )))
851 }
852
853 async fn create_market_stream(
854 &self,
855 config: &Value,
856 stream_config: ConnectorStreamConfig,
857 ) -> BrokerResult<Box<dyn ConnectorStream>> {
858 let cfg = self.parse_config(config)?;
859 let exchange = ExchangeId::from(cfg.exchange.as_str());
860 let stream = BinanceMarketStream::connect(
861 &cfg.ws_url,
862 &cfg.rest_url,
863 stream_config.connection_status,
864 exchange,
865 )
866 .await?;
867 Ok(Box::new(BinanceConnectorStream::new(
868 stream,
869 stream_config
870 .metadata
871 .get("orderbook_depth")
872 .and_then(|v| v.as_u64())
873 .map(|v| v as usize)
874 .unwrap_or(BINANCE_DEFAULT_DEPTH),
875 )))
876 }
877}
878
/// Adapter exposing a [`BinanceMarketStream`] through the [`ConnectorStream`] trait.
struct BinanceConnectorStream {
    /// Underlying market stream that performs subscriptions and yields events.
    inner: BinanceMarketStream,
    /// Depth requested for every order-book subscription.
    depth: usize,
}
883
884impl BinanceConnectorStream {
885 fn new(inner: BinanceMarketStream, depth: usize) -> Self {
886 Self { inner, depth }
887 }
888}
889
890#[async_trait]
891impl ConnectorStream for BinanceConnectorStream {
892 async fn subscribe(
893 &mut self,
894 symbols: &[String],
895 interval: tesser_core::Interval,
896 ) -> BrokerResult<()> {
897 for symbol in symbols {
898 self.inner
899 .subscribe(BinanceSubscription::Trades {
900 symbol: symbol.clone(),
901 })
902 .await?;
903 self.inner
904 .subscribe(BinanceSubscription::Kline {
905 symbol: symbol.clone(),
906 interval,
907 })
908 .await?;
909 self.inner
910 .subscribe(BinanceSubscription::OrderBook {
911 symbol: symbol.clone(),
912 depth: self.depth,
913 })
914 .await?;
915 }
916 Ok(())
917 }
918
919 async fn next_tick(&mut self) -> BrokerResult<Option<tesser_core::Tick>> {
920 self.inner.next_tick().await
921 }
922
923 async fn next_candle(&mut self) -> BrokerResult<Option<Candle>> {
924 self.inner.next_candle().await
925 }
926
927 async fn next_order_book(&mut self) -> BrokerResult<Option<OrderBook>> {
928 self.inner.next_order_book().await
929 }
930}
931
932fn map_connector_error(err: ConnectorError) -> BrokerError {
933 match err {
934 ConnectorError::UnauthorizedError(msg) => BrokerError::Authentication(msg),
935 ConnectorError::ForbiddenError(msg)
936 | ConnectorError::TooManyRequestsError(msg)
937 | ConnectorError::RateLimitBanError(msg)
938 | ConnectorError::ServerError { msg, .. } => BrokerError::Exchange(msg),
939 ConnectorError::NetworkError(msg) => BrokerError::Transport(msg),
940 ConnectorError::NotFoundError(msg) | ConnectorError::BadRequestError(msg) => {
941 BrokerError::InvalidRequest(msg)
942 }
943 ConnectorError::ConnectorClientError(msg) => BrokerError::Other(msg),
944 }
945}