1use anyhow::{Context, Result};
2use hmac::{Hmac, Mac};
3use sha2::Sha256;
4use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
5use std::time::Instant;
6
7use crate::error::AppError;
8use crate::model::candle::Candle;
9use crate::model::order::OrderSide;
10
11use super::types::{
12 AccountInfo, BinanceAllOrder, BinanceFuturesAccountInfo, BinanceFuturesAllOrder,
13 BinanceFuturesOrderResponse, BinanceFuturesPositionRisk, BinanceFuturesUserTrade,
14 BinanceListenKeyResponse, BinanceMyTrade, BinanceOrderResponse, ServerTimeResponse,
15};
16
/// Order-size constraints for a single symbol, extracted from an `exchangeInfo` payload.
#[derive(Clone, Copy, Debug)]
pub struct SymbolOrderRules {
    /// `minQty` value of the selected lot-size filter.
    pub min_qty: f64,
    /// `maxQty` value of the selected lot-size filter.
    pub max_qty: f64,
    /// `stepSize` value of the selected lot-size filter.
    pub step_size: f64,
    /// Minimum order notional when the symbol's filters provide one that parses;
    /// `None` otherwise.
    pub min_notional: Option<f64>,
}
24
25pub struct BinanceRestClient {
26 http: reqwest::Client,
27 base_url: String,
28 futures_base_url: String,
29 api_key: String,
30 secret_key: String,
31 futures_api_key: String,
32 futures_secret_key: String,
33 recv_window: u64,
34 time_offset_ms: AtomicI64,
35 request_count: AtomicU64,
37 window_start: std::sync::Mutex<Instant>,
38}
39
40impl BinanceRestClient {
41 pub fn new(
42 base_url: &str,
43 futures_base_url: &str,
44 api_key: &str,
45 secret_key: &str,
46 futures_api_key: &str,
47 futures_secret_key: &str,
48 recv_window: u64,
49 ) -> Self {
50 Self {
51 http: reqwest::Client::new(),
52 base_url: base_url.to_string(),
53 futures_base_url: futures_base_url.to_string(),
54 api_key: api_key.to_string(),
55 secret_key: secret_key.to_string(),
56 futures_api_key: futures_api_key.to_string(),
57 futures_secret_key: futures_secret_key.to_string(),
58 recv_window,
59 time_offset_ms: AtomicI64::new(0),
60 request_count: AtomicU64::new(0),
61 window_start: std::sync::Mutex::new(Instant::now()),
62 }
63 }
64
65 fn sign_with_secret(&self, query: &str, secret_key: &str) -> String {
66 let offset = self.time_offset_ms.load(Ordering::Relaxed);
67 let timestamp = chrono::Utc::now().timestamp_millis() + offset;
68 let full_query = if query.is_empty() {
69 format!("recvWindow={}×tamp={}", self.recv_window, timestamp)
70 } else {
71 format!(
72 "{}&recvWindow={}×tamp={}",
73 query, self.recv_window, timestamp
74 )
75 };
76 let mut mac =
77 Hmac::<Sha256>::new_from_slice(secret_key.as_bytes()).expect("HMAC key error");
78 mac.update(full_query.as_bytes());
79 let signature = hex::encode(mac.finalize().into_bytes());
80 format!("{}&signature={}", full_query, signature)
81 }
82
83 fn sign(&self, query: &str) -> String {
84 self.sign_with_secret(query, &self.secret_key)
85 }
86
87 fn sign_futures(&self, query: &str) -> String {
88 self.sign_with_secret(query, &self.futures_secret_key)
89 }
90
91 async fn sync_time_offset(&self) -> Result<()> {
92 let server_ms = self.server_time().await? as i64;
93 let local_ms = chrono::Utc::now().timestamp_millis();
94 let offset = server_ms - local_ms;
95 self.time_offset_ms.store(offset, Ordering::Relaxed);
96 tracing::warn!(
97 offset_ms = offset,
98 "Synchronized Binance server time offset"
99 );
100 Ok(())
101 }
102
103 fn parse_binance_api_error(body: &str) -> Option<super::types::BinanceApiErrorResponse> {
104 serde_json::from_str::<super::types::BinanceApiErrorResponse>(body).ok()
105 }
106
107 fn check_rate_limit(&self) {
108 let mut start = match self.window_start.lock() {
109 Ok(guard) => guard,
110 Err(poisoned) => {
111 tracing::error!("rate-limit mutex poisoned; continuing with recovered state");
112 poisoned.into_inner()
113 }
114 };
115 if start.elapsed().as_secs() >= 60 {
116 *start = Instant::now();
117 self.request_count.store(0, Ordering::Relaxed);
118 }
119 let count = self.request_count.fetch_add(1, Ordering::Relaxed);
120 if count > 960 {
121 tracing::warn!(count, "Approaching rate limit (80% of 1200/min)");
122 }
123 }
124
125 pub async fn ping(&self) -> Result<()> {
126 let url = format!("{}/api/v3/ping", self.base_url);
127 self.http
128 .get(&url)
129 .send()
130 .await
131 .context("ping failed")?
132 .error_for_status()
133 .context("ping returned error status")?;
134 Ok(())
135 }
136
137 pub async fn server_time(&self) -> Result<u64> {
138 let url = format!("{}/api/v3/time", self.base_url);
139 let resp: ServerTimeResponse = self
140 .http
141 .get(&url)
142 .send()
143 .await
144 .context("server_time failed")?
145 .json()
146 .await?;
147 Ok(resp.server_time)
148 }
149
150 pub async fn get_account(&self) -> Result<AccountInfo> {
151 self.check_rate_limit();
152
153 let signed = self.sign("");
154 let url = format!("{}/api/v3/account?{}", self.base_url, signed);
155
156 let resp = self
157 .http
158 .get(&url)
159 .header("X-MBX-APIKEY", &self.api_key)
160 .send()
161 .await
162 .context("get_account HTTP failed")?;
163
164 if !resp.status().is_success() {
165 let body = resp.text().await.unwrap_or_default();
166 if let Ok(err) = serde_json::from_str::<super::types::BinanceApiErrorResponse>(&body) {
167 return Err(AppError::BinanceApi {
168 code: err.code,
169 msg: err.msg,
170 }
171 .into());
172 }
173 return Err(anyhow::anyhow!("Account request failed: {}", body));
174 }
175
176 Ok(resp.json().await?)
177 }
178
179 pub async fn get_futures_account(&self) -> Result<BinanceFuturesAccountInfo> {
180 self.check_rate_limit();
181
182 let signed = self.sign_futures("");
183 let url = format!("{}/fapi/v2/account?{}", self.futures_base_url, signed);
184
185 let resp = self
186 .http
187 .get(&url)
188 .header("X-MBX-APIKEY", &self.futures_api_key)
189 .send()
190 .await
191 .context("get_futures_account HTTP failed")?;
192
193 if !resp.status().is_success() {
194 let body = resp.text().await.unwrap_or_default();
195 if let Ok(err) = serde_json::from_str::<super::types::BinanceApiErrorResponse>(&body) {
196 return Err(AppError::BinanceApi {
197 code: err.code,
198 msg: err.msg,
199 }
200 .into());
201 }
202 return Err(anyhow::anyhow!("Futures account request failed: {}", body));
203 }
204
205 Ok(resp.json().await?)
206 }
207
208 pub async fn get_futures_position_risk(&self) -> Result<Vec<BinanceFuturesPositionRisk>> {
209 self.check_rate_limit();
210 for attempt in 0..=1 {
211 let signed = self.sign_futures("");
212 let url = format!("{}/fapi/v2/positionRisk?{}", self.futures_base_url, signed);
213 let resp = self
214 .http
215 .get(&url)
216 .header("X-MBX-APIKEY", &self.futures_api_key)
217 .send()
218 .await
219 .context("get_futures_position_risk HTTP failed")?;
220 if resp.status().is_success() {
221 return Ok(resp.json().await?);
222 }
223 let body = resp.text().await.unwrap_or_default();
224 if let Some(err) = Self::parse_binance_api_error(&body) {
225 if err.code == -1021 && attempt == 0 {
226 tracing::warn!(
227 "futures positionRisk got -1021; syncing server time and retrying once"
228 );
229 self.sync_time_offset().await?;
230 continue;
231 }
232 return Err(AppError::BinanceApi {
233 code: err.code,
234 msg: err.msg,
235 }
236 .into());
237 }
238 return Err(anyhow::anyhow!(
239 "Futures positionRisk request failed: {}",
240 body
241 ));
242 }
243 Err(anyhow::anyhow!(
244 "Futures positionRisk request failed after retry"
245 ))
246 }
247
248 pub async fn start_futures_user_data_stream(&self) -> Result<String> {
249 self.check_rate_limit();
250 let url = format!("{}/fapi/v1/listenKey", self.futures_base_url);
251 let resp = self
252 .http
253 .post(&url)
254 .header("X-MBX-APIKEY", &self.futures_api_key)
255 .send()
256 .await
257 .context("start_futures_user_data_stream HTTP failed")?;
258 if !resp.status().is_success() {
259 let body = resp.text().await.unwrap_or_default();
260 if let Some(err) = Self::parse_binance_api_error(&body) {
261 return Err(AppError::BinanceApi {
262 code: err.code,
263 msg: err.msg,
264 }
265 .into());
266 }
267 return Err(anyhow::anyhow!(
268 "Futures listenKey create request failed: {}",
269 body
270 ));
271 }
272 let payload: BinanceListenKeyResponse = resp.json().await?;
273 Ok(payload.listen_key)
274 }
275
276 pub async fn keepalive_futures_user_data_stream(&self, listen_key: &str) -> Result<()> {
277 self.check_rate_limit();
278 let url = format!("{}/fapi/v1/listenKey", self.futures_base_url);
279 let resp = self
280 .http
281 .put(&url)
282 .header("X-MBX-APIKEY", &self.futures_api_key)
283 .query(&[("listenKey", listen_key)])
284 .send()
285 .await
286 .context("keepalive_futures_user_data_stream HTTP failed")?;
287 if !resp.status().is_success() {
288 let body = resp.text().await.unwrap_or_default();
289 if let Some(err) = Self::parse_binance_api_error(&body) {
290 return Err(AppError::BinanceApi {
291 code: err.code,
292 msg: err.msg,
293 }
294 .into());
295 }
296 return Err(anyhow::anyhow!(
297 "Futures listenKey keepalive request failed: {}",
298 body
299 ));
300 }
301 Ok(())
302 }
303
304 pub async fn close_futures_user_data_stream(&self, listen_key: &str) -> Result<()> {
305 self.check_rate_limit();
306 let url = format!("{}/fapi/v1/listenKey", self.futures_base_url);
307 let resp = self
308 .http
309 .delete(&url)
310 .header("X-MBX-APIKEY", &self.futures_api_key)
311 .query(&[("listenKey", listen_key)])
312 .send()
313 .await
314 .context("close_futures_user_data_stream HTTP failed")?;
315 if !resp.status().is_success() {
316 let body = resp.text().await.unwrap_or_default();
317 if let Some(err) = Self::parse_binance_api_error(&body) {
318 return Err(AppError::BinanceApi {
319 code: err.code,
320 msg: err.msg,
321 }
322 .into());
323 }
324 return Err(anyhow::anyhow!(
325 "Futures listenKey close request failed: {}",
326 body
327 ));
328 }
329 Ok(())
330 }
331
332 pub async fn start_spot_user_data_stream(&self) -> Result<String> {
333 self.check_rate_limit();
334 let url = format!("{}/api/v3/userDataStream", self.base_url);
335 let resp = self
336 .http
337 .post(&url)
338 .header("X-MBX-APIKEY", &self.api_key)
339 .send()
340 .await
341 .context("start_spot_user_data_stream HTTP failed")?;
342 if !resp.status().is_success() {
343 let body = resp.text().await.unwrap_or_default();
344 if let Some(err) = Self::parse_binance_api_error(&body) {
345 return Err(AppError::BinanceApi {
346 code: err.code,
347 msg: err.msg,
348 }
349 .into());
350 }
351 return Err(anyhow::anyhow!(
352 "Spot listenKey create request failed: {}",
353 body
354 ));
355 }
356 let payload: BinanceListenKeyResponse = resp.json().await?;
357 Ok(payload.listen_key)
358 }
359
360 pub async fn keepalive_spot_user_data_stream(&self, listen_key: &str) -> Result<()> {
361 self.check_rate_limit();
362 let url = format!("{}/api/v3/userDataStream", self.base_url);
363 let resp = self
364 .http
365 .put(&url)
366 .header("X-MBX-APIKEY", &self.api_key)
367 .query(&[("listenKey", listen_key)])
368 .send()
369 .await
370 .context("keepalive_spot_user_data_stream HTTP failed")?;
371 if !resp.status().is_success() {
372 let body = resp.text().await.unwrap_or_default();
373 if let Some(err) = Self::parse_binance_api_error(&body) {
374 return Err(AppError::BinanceApi {
375 code: err.code,
376 msg: err.msg,
377 }
378 .into());
379 }
380 return Err(anyhow::anyhow!(
381 "Spot listenKey keepalive request failed: {}",
382 body
383 ));
384 }
385 Ok(())
386 }
387
388 pub async fn close_spot_user_data_stream(&self, listen_key: &str) -> Result<()> {
389 self.check_rate_limit();
390 let url = format!("{}/api/v3/userDataStream", self.base_url);
391 let resp = self
392 .http
393 .delete(&url)
394 .header("X-MBX-APIKEY", &self.api_key)
395 .query(&[("listenKey", listen_key)])
396 .send()
397 .await
398 .context("close_spot_user_data_stream HTTP failed")?;
399 if !resp.status().is_success() {
400 let body = resp.text().await.unwrap_or_default();
401 if let Some(err) = Self::parse_binance_api_error(&body) {
402 return Err(AppError::BinanceApi {
403 code: err.code,
404 msg: err.msg,
405 }
406 .into());
407 }
408 return Err(anyhow::anyhow!(
409 "Spot listenKey close request failed: {}",
410 body
411 ));
412 }
413 Ok(())
414 }
415
416 pub async fn place_market_order(
417 &self,
418 symbol: &str,
419 side: OrderSide,
420 quantity: f64,
421 client_order_id: &str,
422 ) -> Result<BinanceOrderResponse> {
423 self.check_rate_limit();
424
425 let query = format!(
426 "symbol={}&side={}&type=MARKET&quantity={:.5}&newClientOrderId={}&newOrderRespType=FULL",
427 symbol,
428 side.as_binance_str(),
429 quantity,
430 client_order_id,
431 );
432 let signed = self.sign(&query);
433 let url = format!("{}/api/v3/order?{}", self.base_url, signed);
434
435 tracing::info!(
436 symbol,
437 side = %side,
438 quantity,
439 client_order_id,
440 "Placing market order"
441 );
442
443 let resp = self
444 .http
445 .post(&url)
446 .header("X-MBX-APIKEY", &self.api_key)
447 .send()
448 .await
449 .context("place_market_order HTTP failed")?;
450
451 if !resp.status().is_success() {
452 let body = resp.text().await.unwrap_or_default();
453 if let Ok(err) = serde_json::from_str::<super::types::BinanceApiErrorResponse>(&body) {
454 return Err(AppError::BinanceApi {
455 code: err.code,
456 msg: err.msg,
457 }
458 .into());
459 }
460 return Err(anyhow::anyhow!("Order request failed: {}", body));
461 }
462
463 let order: BinanceOrderResponse = resp.json().await?;
464 tracing::info!(
465 order_id = order.order_id,
466 status = %order.status,
467 client_order_id = %order.client_order_id,
468 "Order response received"
469 );
470 Ok(order)
471 }
472
473 pub async fn place_futures_market_order(
474 &self,
475 symbol: &str,
476 side: OrderSide,
477 quantity: f64,
478 client_order_id: &str,
479 ) -> Result<BinanceOrderResponse> {
480 self.check_rate_limit();
481
482 let query = format!(
483 "symbol={}&side={}&type=MARKET&quantity={:.5}&newClientOrderId={}&newOrderRespType=RESULT",
484 symbol,
485 side.as_binance_str(),
486 quantity,
487 client_order_id,
488 );
489 let signed = self.sign_futures(&query);
490 let url = format!("{}/fapi/v1/order?{}", self.futures_base_url, signed);
491
492 tracing::info!(
493 symbol,
494 side = %side,
495 quantity,
496 client_order_id,
497 "Placing futures market order"
498 );
499
500 let resp = self
501 .http
502 .post(&url)
503 .header("X-MBX-APIKEY", &self.futures_api_key)
504 .send()
505 .await
506 .context("place_futures_market_order HTTP failed")?;
507
508 if !resp.status().is_success() {
509 let body = resp.text().await.unwrap_or_default();
510 if let Ok(err) = serde_json::from_str::<super::types::BinanceApiErrorResponse>(&body) {
511 return Err(AppError::BinanceApi {
512 code: err.code,
513 msg: err.msg,
514 }
515 .into());
516 }
517 return Err(anyhow::anyhow!("Futures order request failed: {}", body));
518 }
519
520 let fut: BinanceFuturesOrderResponse = resp.json().await?;
521 let avg = if fut.avg_price > 0.0 {
522 fut.avg_price
523 } else if fut.price > 0.0 {
524 fut.price
525 } else {
526 0.0
527 };
528 let fills = if fut.executed_qty > 0.0 && avg > 0.0 {
529 vec![super::types::BinanceFill {
530 price: avg,
531 qty: fut.executed_qty,
532 commission: 0.0,
533 commission_asset: "USDT".to_string(),
534 }]
535 } else {
536 Vec::new()
537 };
538
539 Ok(BinanceOrderResponse {
540 symbol: fut.symbol,
541 order_id: fut.order_id,
542 client_order_id: fut.client_order_id,
543 price: if fut.price > 0.0 { fut.price } else { avg },
544 orig_qty: fut.orig_qty,
545 executed_qty: fut.executed_qty,
546 status: fut.status,
547 r#type: fut.r#type,
548 side: fut.side,
549 fills,
550 })
551 }
552
553 pub async fn place_futures_stop_market_order(
554 &self,
555 symbol: &str,
556 side: OrderSide,
557 quantity: f64,
558 stop_price: f64,
559 client_order_id: &str,
560 ) -> Result<BinanceOrderResponse> {
561 self.check_rate_limit();
562
563 let query = format!(
564 "symbol={}&side={}&type=STOP_MARKET&quantity={:.5}&stopPrice={:.5}&reduceOnly=true&newClientOrderId={}&newOrderRespType=RESULT",
565 symbol,
566 side.as_binance_str(),
567 quantity,
568 stop_price,
569 client_order_id,
570 );
571 let signed = self.sign_futures(&query);
572 let url = format!("{}/fapi/v1/order?{}", self.futures_base_url, signed);
573
574 tracing::info!(
575 symbol,
576 side = %side,
577 quantity,
578 stop_price,
579 client_order_id,
580 "Placing futures stop-market order"
581 );
582
583 let resp = self
584 .http
585 .post(&url)
586 .header("X-MBX-APIKEY", &self.futures_api_key)
587 .send()
588 .await
589 .context("place_futures_stop_market_order HTTP failed")?;
590
591 if !resp.status().is_success() {
592 let body = resp.text().await.unwrap_or_default();
593 if let Ok(err) = serde_json::from_str::<super::types::BinanceApiErrorResponse>(&body) {
594 return Err(AppError::BinanceApi {
595 code: err.code,
596 msg: err.msg,
597 }
598 .into());
599 }
600 return Err(anyhow::anyhow!(
601 "Futures stop order request failed: {}",
602 body
603 ));
604 }
605
606 let fut: BinanceFuturesOrderResponse = resp.json().await?;
607 let avg = if fut.avg_price > 0.0 {
608 fut.avg_price
609 } else if fut.price > 0.0 {
610 fut.price
611 } else {
612 0.0
613 };
614 let fills = if fut.executed_qty > 0.0 && avg > 0.0 {
615 vec![super::types::BinanceFill {
616 price: avg,
617 qty: fut.executed_qty,
618 commission: 0.0,
619 commission_asset: "USDT".to_string(),
620 }]
621 } else {
622 Vec::new()
623 };
624
625 Ok(BinanceOrderResponse {
626 symbol: fut.symbol,
627 order_id: fut.order_id,
628 client_order_id: fut.client_order_id,
629 price: if fut.price > 0.0 { fut.price } else { avg },
630 orig_qty: fut.orig_qty,
631 executed_qty: fut.executed_qty,
632 status: fut.status,
633 r#type: fut.r#type,
634 side: fut.side,
635 fills,
636 })
637 }
638
639 pub async fn get_spot_symbol_order_rules(&self, symbol: &str) -> Result<SymbolOrderRules> {
640 let url = format!("{}/api/v3/exchangeInfo?symbol={}", self.base_url, symbol);
641 let payload: serde_json::Value = self
642 .http
643 .get(&url)
644 .send()
645 .await
646 .context("get_spot_symbol_order_rules HTTP failed")?
647 .error_for_status()
648 .context("get_spot_symbol_order_rules returned error status")?
649 .json()
650 .await
651 .context("get_spot_symbol_order_rules JSON parse failed")?;
652 parse_symbol_order_rules_from_exchange_info(&payload, symbol, true)
653 }
654
655 pub async fn get_futures_symbol_order_rules(&self, symbol: &str) -> Result<SymbolOrderRules> {
656 let url = format!(
657 "{}/fapi/v1/exchangeInfo?symbol={}",
658 self.futures_base_url, symbol
659 );
660 let payload: serde_json::Value = self
661 .http
662 .get(&url)
663 .send()
664 .await
665 .context("get_futures_symbol_order_rules HTTP failed")?
666 .error_for_status()
667 .context("get_futures_symbol_order_rules returned error status")?
668 .json()
669 .await
670 .context("get_futures_symbol_order_rules JSON parse failed")?;
671 parse_symbol_order_rules_from_exchange_info(&payload, symbol, false)
672 }
673
674 pub async fn get_klines(
677 &self,
678 symbol: &str,
679 interval: &str,
680 limit: usize,
681 ) -> Result<Vec<Candle>> {
682 self.get_klines_for_market(symbol, interval, limit, false)
683 .await
684 }
685
686 pub async fn get_klines_for_market(
687 &self,
688 symbol: &str,
689 interval: &str,
690 limit: usize,
691 is_futures: bool,
692 ) -> Result<Vec<Candle>> {
693 self.check_rate_limit();
694
695 let url = if is_futures {
696 format!(
697 "{}/fapi/v1/klines?symbol={}&interval={}&limit={}",
698 self.futures_base_url, symbol, interval, limit,
699 )
700 } else {
701 format!(
702 "{}/api/v3/klines?symbol={}&interval={}&limit={}",
703 self.base_url, symbol, interval, limit,
704 )
705 };
706
707 let resp: Vec<Vec<serde_json::Value>> = self
708 .http
709 .get(&url)
710 .send()
711 .await
712 .context("get_klines HTTP failed")?
713 .error_for_status()
714 .context("get_klines returned error status")?
715 .json()
716 .await
717 .context("get_klines JSON parse failed")?;
718
719 let candles: Vec<Candle> = resp
720 .iter()
721 .filter_map(|kline| {
722 let open_time = kline.get(0)?.as_u64()?;
723 let open = kline.get(1)?.as_str()?.parse::<f64>().ok()?;
724 let high = kline.get(2)?.as_str()?.parse::<f64>().ok()?;
725 let low = kline.get(3)?.as_str()?.parse::<f64>().ok()?;
726 let close = kline.get(4)?.as_str()?.parse::<f64>().ok()?;
727 let close_time = kline
729 .get(6)?
730 .as_u64()
731 .map(|v| v.saturating_add(1))
732 .unwrap_or(open_time.saturating_add(60_000));
733 Some(Candle {
734 open,
735 high,
736 low,
737 close,
738 open_time,
739 close_time,
740 })
741 })
742 .collect();
743
744 Ok(candles)
745 }
746
747 pub async fn cancel_order(
748 &self,
749 symbol: &str,
750 client_order_id: &str,
751 ) -> Result<BinanceOrderResponse> {
752 self.check_rate_limit();
753
754 let query = format!("symbol={}&origClientOrderId={}", symbol, client_order_id);
755 let signed = self.sign(&query);
756 let url = format!("{}/api/v3/order?{}", self.base_url, signed);
757
758 tracing::info!(symbol, client_order_id, "Cancelling order");
759
760 let resp = self
761 .http
762 .delete(&url)
763 .header("X-MBX-APIKEY", &self.api_key)
764 .send()
765 .await
766 .context("cancel_order HTTP failed")?;
767
768 if !resp.status().is_success() {
769 let body = resp.text().await.unwrap_or_default();
770 if let Ok(err) = serde_json::from_str::<super::types::BinanceApiErrorResponse>(&body) {
771 return Err(AppError::BinanceApi {
772 code: err.code,
773 msg: err.msg,
774 }
775 .into());
776 }
777 return Err(anyhow::anyhow!("Cancel request failed: {}", body));
778 }
779
780 Ok(resp.json().await?)
781 }
782
783 async fn get_all_orders_page(
785 &self,
786 symbol: &str,
787 limit: usize,
788 from_order_id: Option<u64>,
789 ) -> Result<Vec<BinanceAllOrder>> {
790 self.check_rate_limit();
791
792 let limit = limit.clamp(1, 1000);
793 let query = match from_order_id {
794 Some(order_id) => format!("symbol={}&limit={}&orderId={}", symbol, limit, order_id),
795 None => format!("symbol={}&limit={}", symbol, limit),
796 };
797 for attempt in 0..=1 {
798 let signed = self.sign(&query);
799 let url = format!("{}/api/v3/allOrders?{}", self.base_url, signed);
800
801 let resp = self
802 .http
803 .get(&url)
804 .header("X-MBX-APIKEY", &self.api_key)
805 .send()
806 .await
807 .context("get_all_orders HTTP failed")?;
808
809 if resp.status().is_success() {
810 return Ok(resp.json().await?);
811 }
812
813 let body = resp.text().await.unwrap_or_default();
814 if let Some(err) = Self::parse_binance_api_error(&body) {
815 if err.code == -1021 && attempt == 0 {
816 tracing::warn!("allOrders got -1021; syncing server time and retrying once");
817 self.sync_time_offset().await?;
818 continue;
819 }
820 return Err(AppError::BinanceApi {
821 code: err.code,
822 msg: err.msg,
823 }
824 .into());
825 }
826 return Err(anyhow::anyhow!("All orders request failed: {}", body));
827 }
828
829 Err(anyhow::anyhow!("All orders request failed after retry"))
830 }
831
832 pub async fn get_all_orders(&self, symbol: &str, limit: usize) -> Result<Vec<BinanceAllOrder>> {
835 self.get_all_orders_page(symbol, limit, None).await
836 }
837
838 async fn get_futures_all_orders_page(
839 &self,
840 symbol: &str,
841 limit: usize,
842 from_order_id: Option<u64>,
843 ) -> Result<Vec<BinanceAllOrder>> {
844 self.check_rate_limit();
845 let limit = limit.clamp(1, 1000);
846 let query = match from_order_id {
847 Some(order_id) => format!("symbol={}&limit={}&orderId={}", symbol, limit, order_id),
848 None => format!("symbol={}&limit={}", symbol, limit),
849 };
850 let signed = self.sign_futures(&query);
851 let url = format!("{}/fapi/v1/allOrders?{}", self.futures_base_url, signed);
852 let resp = self
853 .http
854 .get(&url)
855 .header("X-MBX-APIKEY", &self.futures_api_key)
856 .send()
857 .await
858 .context("get_futures_all_orders HTTP failed")?;
859 if !resp.status().is_success() {
860 let body = resp.text().await.unwrap_or_default();
861 if let Some(err) = Self::parse_binance_api_error(&body) {
862 return Err(AppError::BinanceApi {
863 code: err.code,
864 msg: err.msg,
865 }
866 .into());
867 }
868 return Err(anyhow::anyhow!(
869 "Futures allOrders request failed: {}",
870 body
871 ));
872 }
873 let rows: Vec<BinanceFuturesAllOrder> = resp.json().await?;
874 Ok(rows
875 .into_iter()
876 .map(|o| {
877 let cumm_quote = if o.cum_quote > 0.0 {
878 o.cum_quote
879 } else {
880 o.avg_price * o.executed_qty
881 };
882 BinanceAllOrder {
883 symbol: o.symbol,
884 order_id: o.order_id,
885 client_order_id: o.client_order_id,
886 price: o.price,
887 orig_qty: o.orig_qty,
888 executed_qty: o.executed_qty,
889 cummulative_quote_qty: cumm_quote,
890 status: o.status,
891 r#type: o.r#type,
892 side: o.side,
893 time: o.time,
894 update_time: o.update_time,
895 }
896 })
897 .collect())
898 }
899
900 pub async fn get_futures_all_orders(
901 &self,
902 symbol: &str,
903 limit: usize,
904 ) -> Result<Vec<BinanceAllOrder>> {
905 self.get_futures_all_orders_page(symbol, limit, None).await
906 }
907
908 async fn get_my_trades_page(
909 &self,
910 symbol: &str,
911 limit: usize,
912 from_id: Option<u64>,
913 ) -> Result<Vec<BinanceMyTrade>> {
914 self.check_rate_limit();
915
916 let limit = limit.clamp(1, 1000);
917 let query = match from_id {
918 Some(v) => format!("symbol={}&limit={}&fromId={}", symbol, limit, v),
919 None => format!("symbol={}&limit={}", symbol, limit),
920 };
921 for attempt in 0..=1 {
922 let signed = self.sign(&query);
923 let url = format!("{}/api/v3/myTrades?{}", self.base_url, signed);
924
925 let resp = self
926 .http
927 .get(&url)
928 .header("X-MBX-APIKEY", &self.api_key)
929 .send()
930 .await
931 .context("get_my_trades HTTP failed")?;
932
933 if resp.status().is_success() {
934 return Ok(resp.json().await?);
935 }
936
937 let body = resp.text().await.unwrap_or_default();
938 if let Some(err) = Self::parse_binance_api_error(&body) {
939 if err.code == -1021 && attempt == 0 {
940 tracing::warn!("myTrades got -1021; syncing server time and retrying once");
941 self.sync_time_offset().await?;
942 continue;
943 }
944 return Err(AppError::BinanceApi {
945 code: err.code,
946 msg: err.msg,
947 }
948 .into());
949 }
950 return Err(anyhow::anyhow!("My trades request failed: {}", body));
951 }
952
953 Err(anyhow::anyhow!("My trades request failed after retry"))
954 }
955
956 pub async fn get_my_trades(&self, symbol: &str, limit: usize) -> Result<Vec<BinanceMyTrade>> {
958 self.get_my_trades_page(symbol, limit, None).await
959 }
960
961 pub async fn get_my_trades_history(
963 &self,
964 symbol: &str,
965 max_total: usize,
966 ) -> Result<Vec<BinanceMyTrade>> {
967 let page_size = 1000usize;
968 let target = max_total.max(1);
969 let mut out = Vec::new();
970 let mut cursor: u64 = 0;
971
972 loop {
973 let page = self
974 .get_my_trades_page(
975 symbol,
976 page_size.min(target.saturating_sub(out.len())),
977 Some(cursor),
978 )
979 .await?;
980 if page.is_empty() {
981 break;
982 }
983 let fetched = page.len();
984 let mut max_trade_id = cursor;
985 for t in page {
986 max_trade_id = max_trade_id.max(t.id);
987 out.push(t);
988 if out.len() >= target {
989 break;
990 }
991 }
992 if out.len() >= target || fetched < page_size {
993 break;
994 }
995 let next = max_trade_id.saturating_add(1);
996 if next <= cursor {
997 break;
998 }
999 cursor = next;
1000 }
1001
1002 Ok(out)
1003 }
1004
1005 pub async fn get_my_trades_since(
1007 &self,
1008 symbol: &str,
1009 from_id: u64,
1010 max_pages: usize,
1011 ) -> Result<Vec<BinanceMyTrade>> {
1012 let page_size = 1000usize;
1013 let mut out = Vec::new();
1014 let mut cursor = from_id;
1015 let mut pages = 0usize;
1016
1017 while pages < max_pages.max(1) {
1018 let page = self
1019 .get_my_trades_page(symbol, page_size, Some(cursor))
1020 .await?;
1021 if page.is_empty() {
1022 break;
1023 }
1024 pages += 1;
1025 let fetched = page.len();
1026 let mut max_trade_id = cursor;
1027 for t in page {
1028 max_trade_id = max_trade_id.max(t.id);
1029 out.push(t);
1030 }
1031 if fetched < page_size {
1032 break;
1033 }
1034 let next = max_trade_id.saturating_add(1);
1035 if next <= cursor {
1036 break;
1037 }
1038 cursor = next;
1039 }
1040
1041 Ok(out)
1042 }
1043
1044 async fn get_futures_my_trades_page(
1045 &self,
1046 symbol: &str,
1047 limit: usize,
1048 from_id: Option<u64>,
1049 ) -> Result<Vec<BinanceMyTrade>> {
1050 self.check_rate_limit();
1051 let limit = limit.clamp(1, 1000);
1052 let query = match from_id {
1053 Some(v) => format!("symbol={}&limit={}&fromId={}", symbol, limit, v),
1054 None => format!("symbol={}&limit={}", symbol, limit),
1055 };
1056 let signed = self.sign_futures(&query);
1057 let url = format!("{}/fapi/v1/userTrades?{}", self.futures_base_url, signed);
1058 let resp = self
1059 .http
1060 .get(&url)
1061 .header("X-MBX-APIKEY", &self.futures_api_key)
1062 .send()
1063 .await
1064 .context("get_futures_my_trades HTTP failed")?;
1065 if !resp.status().is_success() {
1066 let body = resp.text().await.unwrap_or_default();
1067 if let Some(err) = Self::parse_binance_api_error(&body) {
1068 return Err(AppError::BinanceApi {
1069 code: err.code,
1070 msg: err.msg,
1071 }
1072 .into());
1073 }
1074 return Err(anyhow::anyhow!("Futures myTrades request failed: {}", body));
1075 }
1076 let rows: Vec<BinanceFuturesUserTrade> = resp.json().await?;
1077 Ok(rows
1078 .into_iter()
1079 .map(|t| BinanceMyTrade {
1080 symbol: t.symbol,
1081 id: t.id,
1082 order_id: t.order_id,
1083 price: t.price,
1084 qty: t.qty,
1085 commission: t.commission,
1086 commission_asset: t.commission_asset,
1087 time: t.time,
1088 is_buyer: t.buyer,
1089 is_maker: t.maker,
1090 realized_pnl: t.realized_pnl,
1091 })
1092 .collect())
1093 }
1094
1095 pub async fn get_futures_my_trades_history(
1096 &self,
1097 symbol: &str,
1098 max_total: usize,
1099 ) -> Result<Vec<BinanceMyTrade>> {
1100 let page_size = 1000usize;
1101 let target = max_total.max(1);
1102 let mut out = Vec::new();
1103 let mut cursor: u64 = 0;
1104 loop {
1105 let page = self
1106 .get_futures_my_trades_page(
1107 symbol,
1108 page_size.min(target.saturating_sub(out.len())),
1109 Some(cursor),
1110 )
1111 .await?;
1112 if page.is_empty() {
1113 break;
1114 }
1115 let fetched = page.len();
1116 let mut max_trade_id = cursor;
1117 for t in page {
1118 max_trade_id = max_trade_id.max(t.id);
1119 out.push(t);
1120 if out.len() >= target {
1121 break;
1122 }
1123 }
1124 if out.len() >= target || fetched < page_size {
1125 break;
1126 }
1127 let next = max_trade_id.saturating_add(1);
1128 if next <= cursor {
1129 break;
1130 }
1131 cursor = next;
1132 }
1133 Ok(out)
1134 }
1135}
1136
1137fn parse_symbol_order_rules_from_exchange_info(
1138 payload: &serde_json::Value,
1139 symbol: &str,
1140 prefer_market_lot_size: bool,
1141) -> Result<SymbolOrderRules> {
1142 let symbols = payload
1143 .get("symbols")
1144 .and_then(|v| v.as_array())
1145 .context("exchangeInfo missing symbols")?;
1146 let symbol_row = symbols
1147 .iter()
1148 .find(|row| row.get("symbol").and_then(|v| v.as_str()) == Some(symbol))
1149 .with_context(|| format!("exchangeInfo symbol not found: {}", symbol))?;
1150 let filters = symbol_row
1151 .get("filters")
1152 .and_then(|v| v.as_array())
1153 .context("exchangeInfo symbol missing filters")?;
1154
1155 let primary_type = if prefer_market_lot_size {
1156 "MARKET_LOT_SIZE"
1157 } else {
1158 "LOT_SIZE"
1159 };
1160 let fallback_type = if prefer_market_lot_size {
1161 "LOT_SIZE"
1162 } else {
1163 "MARKET_LOT_SIZE"
1164 };
1165 let parsed = find_filter(filters, primary_type)
1166 .and_then(parse_lot_filter_values)
1167 .or_else(|| find_filter(filters, fallback_type).and_then(parse_lot_filter_values))
1168 .context("exchangeInfo missing valid LOT_SIZE/MARKET_LOT_SIZE")?;
1169 let (min_qty, max_qty, step_size) = parsed;
1170
1171 let min_notional = find_filter(filters, "MIN_NOTIONAL")
1172 .and_then(|f| f.get("notional").or_else(|| f.get("minNotional")))
1173 .and_then(|v| v.as_str())
1174 .and_then(|s| s.parse::<f64>().ok());
1175
1176 Ok(SymbolOrderRules {
1177 min_qty,
1178 max_qty,
1179 step_size,
1180 min_notional,
1181 })
1182}
1183
1184fn find_filter<'a>(
1185 filters: &'a [serde_json::Value],
1186 filter_type: &str,
1187) -> Option<&'a serde_json::Value> {
1188 filters
1189 .iter()
1190 .find(|f| f.get("filterType").and_then(|v| v.as_str()) == Some(filter_type))
1191}
1192
1193fn parse_lot_filter_values(filter: &serde_json::Value) -> Option<(f64, f64, f64)> {
1194 let min_qty = json_str_to_f64(filter, "minQty").ok()?;
1195 let max_qty = json_str_to_f64(filter, "maxQty").ok()?;
1196 let step_size = json_str_to_f64(filter, "stepSize").ok()?;
1197 if step_size <= 0.0 {
1198 return None;
1199 }
1200 Some((min_qty, max_qty, step_size))
1201}
1202
1203fn json_str_to_f64(row: &serde_json::Value, key: &str) -> Result<f64> {
1204 let s = row
1205 .get(key)
1206 .and_then(|v| v.as_str())
1207 .with_context(|| format!("missing field {}", key))?;
1208 s.parse::<f64>()
1209 .with_context(|| format!("invalid {} value {}", key, s))
1210}
1211
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Signing appends `recvWindow`, `timestamp` and a 64-character hex
    /// `signature` to the caller's query string.
    #[test]
    fn hmac_signing_produces_hex_signature() {
        let client = BinanceRestClient::new(
            "https://testnet.binance.vision",
            "https://testnet.binancefuture.com",
            "test_key",
            "test_secret",
            "test_fut_key",
            "test_fut_secret",
            5000,
        );
        let signed = client.sign("symbol=BTCUSDT&side=BUY");
        assert!(signed.contains("symbol=BTCUSDT&side=BUY"));
        assert!(signed.contains("recvWindow=5000"));
        assert!(signed.contains("timestamp="));
        assert!(signed.contains("&signature="));

        let sig = signed.split("&signature=").nth(1).unwrap();
        // HMAC-SHA256 yields 32 bytes, i.e. 64 hex characters.
        assert_eq!(sig.len(), 64);
        assert!(sig.chars().all(|c| c.is_ascii_hexdigit()));
    }

    /// Checks the HMAC-SHA256 primitive against a fixed secret/query/signature
    /// triple.
    #[test]
    fn hmac_known_vector() {
        let secret = "NhqPtmdSJYdKjVHjA7PZj4Mge3R5YNiP1e3UZjInClVN65XAbvqqM6A7H5fATj0j";
        // The separator before `timestamp` must be a literal `&`; any other
        // byte sequence changes the digest and the vector no longer matches.
        let query = "symbol=LTCBTC&side=BUY&type=LIMIT&timeInForce=GTC&quantity=1&price=0.1&recvWindow=5000&timestamp=1499827319559";

        let mut mac = Hmac::<Sha256>::new_from_slice(secret.as_bytes()).unwrap();
        mac.update(query.as_bytes());
        let signature = hex::encode(mac.finalize().into_bytes());

        assert_eq!(
            signature,
            "c8db56825ae71d6d79447849e617115f4a920fa2acdcab2b053c4b2838bd6b71"
        );
    }

    /// A panic while `window_start` is locked poisons the mutex; the rate
    /// limiter must still be callable afterwards.
    #[test]
    fn check_rate_limit_does_not_panic_on_poisoned_mutex() {
        let client = BinanceRestClient::new(
            "https://testnet.binance.vision",
            "https://testnet.binancefuture.com",
            "test_key",
            "test_secret",
            "test_fut_key",
            "test_fut_secret",
            5000,
        );

        // Deliberately poison the mutex by panicking while holding the guard.
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _guard = client.window_start.lock().unwrap();
            panic!("poison window_start mutex");
        }));

        let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            client.check_rate_limit();
        }));
        assert!(
            result.is_ok(),
            "check_rate_limit should recover from poison"
        );
    }

    /// With `prefer_market_lot_size = true`, `MARKET_LOT_SIZE` wins over
    /// `LOT_SIZE`, and `minNotional` is picked up from `MIN_NOTIONAL`.
    #[test]
    fn parse_symbol_rules_prefers_market_lot_size_for_spot() {
        let payload = json!({
            "symbols": [{
                "symbol": "BTCUSDT",
                "filters": [
                    {"filterType":"LOT_SIZE","minQty":"0.00100000","maxQty":"100.00000000","stepSize":"0.00100000"},
                    {"filterType":"MARKET_LOT_SIZE","minQty":"0.00001000","maxQty":"50.00000000","stepSize":"0.00001000"},
                    {"filterType":"MIN_NOTIONAL","minNotional":"5.00000000"}
                ]
            }]
        });
        let rules = parse_symbol_order_rules_from_exchange_info(&payload, "BTCUSDT", true).unwrap();
        assert!((rules.step_size - 0.00001).abs() < 1e-12);
        assert!((rules.min_qty - 0.00001).abs() < 1e-12);
        assert_eq!(rules.min_notional, Some(5.0));
    }

    /// With `prefer_market_lot_size = false`, `LOT_SIZE` wins over
    /// `MARKET_LOT_SIZE`.
    #[test]
    fn parse_symbol_rules_uses_lot_size_for_futures() {
        let payload = json!({
            "symbols": [{
                "symbol": "ETHUSDT",
                "filters": [
                    {"filterType":"LOT_SIZE","minQty":"0.001","maxQty":"10000","stepSize":"0.001"},
                    {"filterType":"MARKET_LOT_SIZE","minQty":"0.01","maxQty":"1000","stepSize":"0.01"}
                ]
            }]
        });
        let rules =
            parse_symbol_order_rules_from_exchange_info(&payload, "ETHUSDT", false).unwrap();
        assert!((rules.step_size - 0.001).abs() < 1e-12);
        assert!((rules.min_qty - 0.001).abs() < 1e-12);
    }

    /// A zero `stepSize` invalidates the preferred filter, so the parser falls
    /// back to the other lot-size filter.
    #[test]
    fn parse_symbol_rules_fallback_when_market_lot_size_is_invalid() {
        let payload = json!({
            "symbols": [{
                "symbol": "BTCUSDT",
                "filters": [
                    {"filterType":"LOT_SIZE","minQty":"0.00001000","maxQty":"50.00000000","stepSize":"0.00001000"},
                    {"filterType":"MARKET_LOT_SIZE","minQty":"0.00001000","maxQty":"50.00000000","stepSize":"0.00000000"}
                ]
            }]
        });
        let rules = parse_symbol_order_rules_from_exchange_info(&payload, "BTCUSDT", true).unwrap();
        assert!((rules.step_size - 0.00001).abs() < 1e-12);
    }
}