use std::{collections::HashMap, fmt::Debug, num::NonZeroU32, sync::Arc};
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use nautilus_core::{
consts::NAUTILUS_USER_AGENT, datetime::SECONDS_IN_DAY, hex, nanos::UnixNanos, time::AtomicTime,
};
use nautilus_model::{
data::{Bar, BarType, TradeTick},
enums::{AggregationSource, BarAggregation, OrderSide, OrderType, TimeInForce},
events::AccountState,
identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
instruments::{Instrument, any::InstrumentAny},
reports::{FillReport, OrderStatusReport},
types::{Price, Quantity},
};
use nautilus_network::{
http::{HttpClient, HttpResponse, Method},
ratelimiter::quota::Quota,
};
use serde::Serialize;
use ustr::Ustr;
use super::{
error::{BinanceSpotHttpError, BinanceSpotHttpResult},
models::{
AvgPrice, BatchCancelResult, BatchOrderResult, BinanceAccountInfo, BinanceAccountTrade,
BinanceCancelOrderResponse, BinanceDepth, BinanceKlines, BinanceNewOrderResponse,
BinanceOrderResponse, BinanceTrades, BookTicker, ListenKeyResponse, Ticker24hr,
TickerPrice, TradeFee,
},
parse,
query::{
AccountInfoParams, AccountTradesParams, AllOrdersParams, AvgPriceParams, BatchCancelItem,
BatchOrderItem, CancelOpenOrdersParams, CancelOrderParams, CancelReplaceOrderParams,
DepthParams, KlinesParams, ListenKeyParams, NewOrderParams, OpenOrdersParams,
QueryOrderParams, TickerParams, TradeFeeParams, TradesParams,
},
};
use crate::{
common::{
consts::{
BINANCE_API_KEY_HEADER, BINANCE_NAUTILUS_SPOT_BROKER_ID, BINANCE_SPOT_RATE_LIMITS,
BinanceRateLimitQuota,
},
credential::SigningCredential,
encoder::{decode_broker_id, encode_broker_id},
enums::{
BinanceEnvironment, BinanceProductType, BinanceRateLimitInterval, BinanceRateLimitType,
BinanceSide, BinanceTimeInForce,
},
models::BinanceErrorResponse,
parse::{
get_currency, parse_fill_report_sbe, parse_klines_to_bars,
parse_new_order_response_sbe, parse_order_status_report_sbe, parse_spot_instrument_sbe,
parse_spot_trades_sbe,
},
urls::get_http_base_url,
},
spot::{
enums::{
BinanceCancelReplaceMode, BinanceOrderResponseType, BinanceSpotOrderType,
order_type_to_binance_spot, time_in_force_to_binance_spot,
},
sbe::spot::{
ReadBuf, SBE_SCHEMA_ID, SBE_SCHEMA_VERSION,
error_response_codec::{self, ErrorResponseDecoder},
message_header_codec::MessageHeaderDecoder,
},
},
};
pub const SBE_SCHEMA_HEADER: &str = "3:3";
use crate::common::consts::BINANCE_SPOT_API_PATH as SPOT_API_PATH;
const BINANCE_GLOBAL_RATE_KEY: &str = "binance:spot:global";
const BINANCE_ORDERS_RATE_KEY: &str = "binance:spot:orders";
struct RateLimitConfig {
default_quota: Option<Quota>,
keyed_quotas: Vec<(String, Quota)>,
order_keys: Vec<String>,
}
#[derive(Debug, Clone)]
pub struct BinanceRawSpotHttpClient {
client: HttpClient,
base_url: String,
credential: Option<SigningCredential>,
recv_window: Option<u64>,
order_rate_keys: Vec<String>,
}
impl BinanceRawSpotHttpClient {
pub fn new(
environment: BinanceEnvironment,
api_key: Option<String>,
api_secret: Option<String>,
base_url_override: Option<String>,
recv_window: Option<u64>,
timeout_secs: Option<u64>,
proxy_url: Option<String>,
) -> BinanceSpotHttpResult<Self> {
let RateLimitConfig {
default_quota,
keyed_quotas,
order_keys,
} = Self::rate_limit_config();
let credential = match (api_key, api_secret) {
(Some(key), Some(secret)) => Some(SigningCredential::new(key, secret)),
(None, None) => None,
_ => return Err(BinanceSpotHttpError::MissingCredentials),
};
let base_url = base_url_override.unwrap_or_else(|| {
get_http_base_url(BinanceProductType::Spot, environment).to_string()
});
let headers = Self::default_headers(&credential);
let client = HttpClient::new(
headers,
vec![BINANCE_API_KEY_HEADER.to_string()],
keyed_quotas,
default_quota,
timeout_secs,
proxy_url,
)?;
Ok(Self {
client,
base_url,
credential,
recv_window,
order_rate_keys: order_keys,
})
}
#[must_use]
pub const fn schema_id() -> u16 {
SBE_SCHEMA_ID
}
#[must_use]
pub const fn schema_version() -> u16 {
SBE_SCHEMA_VERSION
}
pub async fn get<P>(&self, path: &str, params: Option<&P>) -> BinanceSpotHttpResult<Vec<u8>>
where
P: Serialize + ?Sized,
{
self.request(Method::GET, path, params, false, false).await
}
pub async fn get_signed<P>(
&self,
path: &str,
params: Option<&P>,
) -> BinanceSpotHttpResult<Vec<u8>>
where
P: Serialize + ?Sized,
{
self.request(Method::GET, path, params, true, false).await
}
pub async fn post_signed<P>(
&self,
path: &str,
params: Option<&P>,
) -> BinanceSpotHttpResult<Vec<u8>>
where
P: Serialize + ?Sized,
{
self.request(Method::POST, path, params, true, true).await
}
pub async fn delete_signed<P>(
&self,
path: &str,
params: Option<&P>,
) -> BinanceSpotHttpResult<Vec<u8>>
where
P: Serialize + ?Sized,
{
self.request(Method::DELETE, path, params, true, true).await
}
async fn request<P>(
&self,
method: Method,
path: &str,
params: Option<&P>,
signed: bool,
use_order_quota: bool,
) -> BinanceSpotHttpResult<Vec<u8>>
where
P: Serialize + ?Sized,
{
let mut query = params
.map(serde_urlencoded::to_string)
.transpose()
.map_err(|e| BinanceSpotHttpError::ValidationError(e.to_string()))?
.unwrap_or_default();
let mut headers = HashMap::new();
if signed {
let cred = self
.credential
.as_ref()
.ok_or(BinanceSpotHttpError::MissingCredentials)?;
if !query.is_empty() {
query.push('&');
}
let timestamp = Utc::now().timestamp_millis();
query.push_str(&format!("timestamp={timestamp}"));
if let Some(recv_window) = self.recv_window {
query.push_str(&format!("&recvWindow={recv_window}"));
}
let signature = cred.sign(&query);
query.push_str(&format!("&signature={signature}"));
headers.insert(
BINANCE_API_KEY_HEADER.to_string(),
cred.api_key().to_string(),
);
}
let url = self.build_url(path, &query);
let keys = self.rate_limit_keys(use_order_quota);
let response = self
.client
.request(
method,
url,
None::<&HashMap<String, Vec<String>>>,
Some(headers),
None,
None,
Some(keys),
)
.await?;
if !response.status.is_success() {
return self.parse_error_response(&response);
}
Ok(response.body.to_vec())
}
fn build_url(&self, path: &str, query: &str) -> String {
let normalized_path = if path.starts_with('/') {
path.to_string()
} else {
format!("/{path}")
};
let mut url = format!("{}{}{}", self.base_url, SPOT_API_PATH, normalized_path);
if !query.is_empty() {
url.push('?');
url.push_str(query);
}
url
}
fn rate_limit_keys(&self, use_orders: bool) -> Vec<String> {
if use_orders {
let mut keys = Vec::with_capacity(1 + self.order_rate_keys.len());
keys.push(BINANCE_GLOBAL_RATE_KEY.to_string());
keys.extend(self.order_rate_keys.iter().cloned());
keys
} else {
vec![BINANCE_GLOBAL_RATE_KEY.to_string()]
}
}
fn parse_error_response<T>(&self, response: &HttpResponse) -> BinanceSpotHttpResult<T> {
let status = response.status.as_u16();
let body = &response.body;
if let Ok(body_str) = std::str::from_utf8(body)
&& let Ok(err) = serde_json::from_str::<BinanceErrorResponse>(body_str)
{
return Err(BinanceSpotHttpError::BinanceError {
code: err.code,
message: err.msg,
});
}
if let Some((code, message)) = Self::try_decode_sbe_error(body) {
return Err(BinanceSpotHttpError::BinanceError {
code: code.into(),
message,
});
}
Err(BinanceSpotHttpError::UnexpectedStatus {
status,
body: hex::encode(body),
})
}
fn try_decode_sbe_error(body: &[u8]) -> Option<(i16, String)> {
const HEADER_LEN: usize = 8;
if body.len() < HEADER_LEN + error_response_codec::SBE_BLOCK_LENGTH as usize {
return None;
}
let buf = ReadBuf::new(body);
let header = MessageHeaderDecoder::default().wrap(buf, 0);
if header.template_id() != error_response_codec::SBE_TEMPLATE_ID {
return None;
}
let mut decoder = ErrorResponseDecoder::default().header(header, 0);
let code = decoder.code();
let msg_coords = decoder.msg_decoder();
let msg_bytes = decoder.msg_slice(msg_coords);
let message = String::from_utf8_lossy(msg_bytes).into_owned();
Some((code, message))
}
fn default_headers(credential: &Option<SigningCredential>) -> HashMap<String, String> {
let mut headers = HashMap::new();
headers.insert("User-Agent".to_string(), NAUTILUS_USER_AGENT.to_string());
headers.insert("Accept".to_string(), "application/sbe".to_string());
headers.insert("X-MBX-SBE".to_string(), SBE_SCHEMA_HEADER.to_string());
if let Some(cred) = credential {
headers.insert(
BINANCE_API_KEY_HEADER.to_string(),
cred.api_key().to_string(),
);
}
headers
}
fn rate_limit_config() -> RateLimitConfig {
let quotas = BINANCE_SPOT_RATE_LIMITS;
let mut keyed = Vec::new();
let mut order_keys = Vec::new();
let mut default = None;
for quota in quotas {
if let Some(q) = Self::quota_from(quota) {
match quota.rate_limit_type {
BinanceRateLimitType::RequestWeight if default.is_none() => {
default = Some(q);
}
BinanceRateLimitType::Orders => {
let key = format!("{}:{:?}", BINANCE_ORDERS_RATE_KEY, quota.interval);
order_keys.push(key.clone());
keyed.push((key, q));
}
_ => {}
}
}
}
let default_quota = default.unwrap_or_else(|| {
Quota::per_second(NonZeroU32::new(10).expect("non-zero")).expect("valid constant")
});
keyed.push((BINANCE_GLOBAL_RATE_KEY.to_string(), default_quota));
RateLimitConfig {
default_quota: Some(default_quota),
keyed_quotas: keyed,
order_keys,
}
}
fn quota_from(quota: &BinanceRateLimitQuota) -> Option<Quota> {
let burst = NonZeroU32::new(quota.limit)?;
match quota.interval {
BinanceRateLimitInterval::Second => Quota::per_second(burst),
BinanceRateLimitInterval::Minute => Some(Quota::per_minute(burst)),
BinanceRateLimitInterval::Day => {
Quota::with_period(std::time::Duration::from_secs(SECONDS_IN_DAY))
.map(|q| q.allow_burst(burst))
}
BinanceRateLimitInterval::Unknown => None,
}
}
pub async fn ping(&self) -> BinanceSpotHttpResult<()> {
let bytes = self.get("ping", None::<&()>).await?;
parse::decode_ping(&bytes)?;
Ok(())
}
pub async fn server_time(&self) -> BinanceSpotHttpResult<i64> {
let bytes = self.get("time", None::<&()>).await?;
let timestamp = parse::decode_server_time(&bytes)?;
Ok(timestamp)
}
pub async fn exchange_info(
&self,
) -> BinanceSpotHttpResult<super::models::BinanceExchangeInfoSbe> {
let bytes = self.get("exchangeInfo", None::<&()>).await?;
let info = parse::decode_exchange_info(&bytes)?;
Ok(info)
}
pub async fn depth(&self, params: &DepthParams) -> BinanceSpotHttpResult<BinanceDepth> {
let bytes = self.get("depth", Some(params)).await?;
let depth = parse::decode_depth(&bytes)?;
Ok(depth)
}
pub async fn trades(
&self,
symbol: &str,
limit: Option<u32>,
) -> BinanceSpotHttpResult<BinanceTrades> {
let params = TradesParams {
symbol: symbol.to_string(),
limit,
};
let bytes = self.get("trades", Some(¶ms)).await?;
let trades = parse::decode_trades(&bytes)?;
Ok(trades)
}
pub async fn klines(
&self,
symbol: &str,
interval: &str,
start_time: Option<i64>,
end_time: Option<i64>,
limit: Option<u32>,
) -> BinanceSpotHttpResult<BinanceKlines> {
let params = KlinesParams {
symbol: symbol.to_string(),
interval: interval.to_string(),
start_time,
end_time,
time_zone: None,
limit,
};
let bytes = self.get("klines", Some(¶ms)).await?;
let klines = parse::decode_klines(&bytes)?;
Ok(klines)
}
async fn get_json<P>(&self, path: &str, params: Option<&P>) -> BinanceSpotHttpResult<Vec<u8>>
where
P: Serialize + ?Sized,
{
let query = params
.map(serde_urlencoded::to_string)
.transpose()
.map_err(|e| BinanceSpotHttpError::ValidationError(e.to_string()))?
.unwrap_or_default();
let url = self.build_url(path, &query);
let keys = vec![BINANCE_GLOBAL_RATE_KEY.to_string()];
let response = self
.client
.request(
Method::GET,
url,
None::<&HashMap<String, Vec<String>>>,
None,
None,
None,
Some(keys),
)
.await?;
if !response.status.is_success() {
return self.parse_error_response(&response);
}
Ok(response.body.to_vec())
}
pub async fn ticker_24hr(
&self,
symbol: Option<&str>,
) -> BinanceSpotHttpResult<Vec<Ticker24hr>> {
let params = symbol.map(TickerParams::for_symbol);
let bytes = self.get_json("ticker/24hr", params.as_ref()).await?;
if symbol.is_some() {
let ticker: Ticker24hr = serde_json::from_slice(&bytes)
.map_err(|e| BinanceSpotHttpError::JsonError(e.to_string()))?;
Ok(vec![ticker])
} else {
let tickers: Vec<Ticker24hr> = serde_json::from_slice(&bytes)
.map_err(|e| BinanceSpotHttpError::JsonError(e.to_string()))?;
Ok(tickers)
}
}
pub async fn ticker_price(
&self,
symbol: Option<&str>,
) -> BinanceSpotHttpResult<Vec<TickerPrice>> {
let params = symbol.map(TickerParams::for_symbol);
let bytes = self.get_json("ticker/price", params.as_ref()).await?;
if symbol.is_some() {
let ticker: TickerPrice = serde_json::from_slice(&bytes)
.map_err(|e| BinanceSpotHttpError::JsonError(e.to_string()))?;
Ok(vec![ticker])
} else {
let tickers: Vec<TickerPrice> = serde_json::from_slice(&bytes)
.map_err(|e| BinanceSpotHttpError::JsonError(e.to_string()))?;
Ok(tickers)
}
}
pub async fn ticker_book(
&self,
symbol: Option<&str>,
) -> BinanceSpotHttpResult<Vec<BookTicker>> {
let params = symbol.map(TickerParams::for_symbol);
let bytes = self.get_json("ticker/bookTicker", params.as_ref()).await?;
if symbol.is_some() {
let ticker: BookTicker = serde_json::from_slice(&bytes)
.map_err(|e| BinanceSpotHttpError::JsonError(e.to_string()))?;
Ok(vec![ticker])
} else {
let tickers: Vec<BookTicker> = serde_json::from_slice(&bytes)
.map_err(|e| BinanceSpotHttpError::JsonError(e.to_string()))?;
Ok(tickers)
}
}
pub async fn avg_price(&self, symbol: &str) -> BinanceSpotHttpResult<AvgPrice> {
let params = AvgPriceParams::new(symbol);
let bytes = self.get_json("avgPrice", Some(¶ms)).await?;
let avg_price: AvgPrice = serde_json::from_slice(&bytes)
.map_err(|e| BinanceSpotHttpError::JsonError(e.to_string()))?;
Ok(avg_price)
}
pub async fn get_trade_fee(
&self,
symbol: Option<&str>,
) -> BinanceSpotHttpResult<Vec<TradeFee>> {
let params = symbol.map(TradeFeeParams::for_symbol);
let bytes = self
.get_signed_sapi("asset/tradeFee", params.as_ref())
.await?;
let fees: Vec<TradeFee> = serde_json::from_slice(&bytes)
.map_err(|e| BinanceSpotHttpError::JsonError(e.to_string()))?;
Ok(fees)
}
async fn get_signed_sapi<P>(
&self,
path: &str,
params: Option<&P>,
) -> BinanceSpotHttpResult<Vec<u8>>
where
P: Serialize + ?Sized,
{
let cred = self
.credential
.as_ref()
.ok_or(BinanceSpotHttpError::MissingCredentials)?;
let mut query = params
.map(serde_urlencoded::to_string)
.transpose()
.map_err(|e| BinanceSpotHttpError::ValidationError(e.to_string()))?
.unwrap_or_default();
if !query.is_empty() {
query.push('&');
}
let timestamp = Utc::now().timestamp_millis();
query.push_str(&format!("timestamp={timestamp}"));
if let Some(recv_window) = self.recv_window {
query.push_str(&format!("&recvWindow={recv_window}"));
}
let signature = cred.sign(&query);
query.push_str(&format!("&signature={signature}"));
let normalized_path = if path.starts_with('/') {
path.to_string()
} else {
format!("/{path}")
};
let mut url = format!("{}/sapi/v1{}", self.base_url, normalized_path);
if !query.is_empty() {
url.push('?');
url.push_str(&query);
}
let mut headers = HashMap::new();
headers.insert(
BINANCE_API_KEY_HEADER.to_string(),
cred.api_key().to_string(),
);
let keys = vec![BINANCE_GLOBAL_RATE_KEY.to_string()];
let response = self
.client
.request(
Method::GET,
url,
None::<&HashMap<String, Vec<String>>>,
Some(headers),
None,
None,
Some(keys),
)
.await?;
if !response.status.is_success() {
return self.parse_error_response(&response);
}
Ok(response.body.to_vec())
}
fn percent_encode(input: &str) -> String {
let mut result = String::with_capacity(input.len() * 3);
for byte in input.bytes() {
match byte {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
result.push(byte as char);
}
_ => {
result.push('%');
result.push_str(&format!("{byte:02X}"));
}
}
}
result
}
pub async fn batch_submit_orders(
&self,
orders: &[BatchOrderItem],
) -> BinanceSpotHttpResult<Vec<BatchOrderResult>> {
if orders.is_empty() {
return Ok(Vec::new());
}
if orders.len() > 5 {
return Err(BinanceSpotHttpError::ValidationError(
"Batch order limit is 5 orders maximum".to_string(),
));
}
let batch_json = serde_json::to_string(orders)
.map_err(|e| BinanceSpotHttpError::ValidationError(e.to_string()))?;
let bytes = self
.batch_request(Method::POST, "batchOrders", &batch_json)
.await?;
let results: Vec<BatchOrderResult> = serde_json::from_slice(&bytes)
.map_err(|e| BinanceSpotHttpError::JsonError(e.to_string()))?;
Ok(results)
}
pub async fn batch_cancel_orders(
&self,
cancels: &[BatchCancelItem],
) -> BinanceSpotHttpResult<Vec<BatchCancelResult>> {
if cancels.is_empty() {
return Ok(Vec::new());
}
if cancels.len() > 5 {
return Err(BinanceSpotHttpError::ValidationError(
"Batch cancel limit is 5 orders maximum".to_string(),
));
}
let batch_json = serde_json::to_string(cancels)
.map_err(|e| BinanceSpotHttpError::ValidationError(e.to_string()))?;
let bytes = self
.batch_request(Method::DELETE, "batchOrders", &batch_json)
.await?;
let results: Vec<BatchCancelResult> = serde_json::from_slice(&bytes)
.map_err(|e| BinanceSpotHttpError::JsonError(e.to_string()))?;
Ok(results)
}
async fn batch_request(
&self,
method: Method,
path: &str,
batch_json: &str,
) -> BinanceSpotHttpResult<Vec<u8>> {
let cred = self
.credential
.as_ref()
.ok_or(BinanceSpotHttpError::MissingCredentials)?;
let encoded_batch = Self::percent_encode(batch_json);
let timestamp = Utc::now().timestamp_millis();
let mut query = format!("batchOrders={encoded_batch}×tamp={timestamp}");
if let Some(recv_window) = self.recv_window {
query.push_str(&format!("&recvWindow={recv_window}"));
}
let signature = cred.sign(&query);
query.push_str(&format!("&signature={signature}"));
let url = self.build_url(path, &query);
let mut headers = HashMap::new();
headers.insert(
BINANCE_API_KEY_HEADER.to_string(),
cred.api_key().to_string(),
);
let keys = self.rate_limit_keys(true);
let response = self
.client
.request(
method,
url,
None::<&HashMap<String, Vec<String>>>,
Some(headers),
None,
None,
Some(keys),
)
.await?;
if !response.status.is_success() {
return self.parse_error_response(&response);
}
Ok(response.body.to_vec())
}
pub async fn account(
&self,
params: &AccountInfoParams,
) -> BinanceSpotHttpResult<BinanceAccountInfo> {
let bytes = self.get_signed("account", Some(params)).await?;
let response = parse::decode_account(&bytes)?;
Ok(response)
}
pub async fn account_trades(
&self,
symbol: &str,
order_id: Option<i64>,
start_time: Option<i64>,
end_time: Option<i64>,
limit: Option<u32>,
) -> BinanceSpotHttpResult<Vec<BinanceAccountTrade>> {
let params = AccountTradesParams {
symbol: symbol.to_string(),
order_id,
start_time,
end_time,
from_id: None,
limit,
};
let bytes = self.get_signed("myTrades", Some(¶ms)).await?;
let response = parse::decode_account_trades(&bytes)?;
Ok(response)
}
pub async fn query_order(
&self,
symbol: &str,
order_id: Option<i64>,
client_order_id: Option<&str>,
) -> BinanceSpotHttpResult<BinanceOrderResponse> {
let params = QueryOrderParams {
symbol: symbol.to_string(),
order_id,
orig_client_order_id: client_order_id.map(|s| s.to_string()),
};
let bytes = self.get_signed("order", Some(¶ms)).await?;
let response = parse::decode_order(&bytes)?;
Ok(response)
}
pub async fn open_orders(
&self,
symbol: Option<&str>,
) -> BinanceSpotHttpResult<Vec<BinanceOrderResponse>> {
let params = OpenOrdersParams {
symbol: symbol.map(|s| s.to_string()),
};
let bytes = self.get_signed("openOrders", Some(¶ms)).await?;
let response = parse::decode_orders(&bytes)?;
Ok(response)
}
pub async fn all_orders(
&self,
symbol: &str,
start_time: Option<i64>,
end_time: Option<i64>,
limit: Option<u32>,
) -> BinanceSpotHttpResult<Vec<BinanceOrderResponse>> {
let params = AllOrdersParams {
symbol: symbol.to_string(),
order_id: None,
start_time,
end_time,
limit,
};
let bytes = self.get_signed("allOrders", Some(¶ms)).await?;
let response = parse::decode_orders(&bytes)?;
Ok(response)
}
async fn post_order<P>(&self, path: &str, params: Option<&P>) -> BinanceSpotHttpResult<Vec<u8>>
where
P: Serialize + ?Sized,
{
self.post_signed(path, params).await
}
async fn delete_order<P>(
&self,
path: &str,
params: Option<&P>,
) -> BinanceSpotHttpResult<Vec<u8>>
where
P: Serialize + ?Sized,
{
self.delete_signed(path, params).await
}
#[allow(clippy::too_many_arguments)]
pub async fn new_order(
&self,
symbol: &str,
side: BinanceSide,
order_type: BinanceSpotOrderType,
time_in_force: Option<BinanceTimeInForce>,
quantity: Option<&str>,
price: Option<&str>,
client_order_id: Option<&str>,
stop_price: Option<&str>,
) -> BinanceSpotHttpResult<BinanceNewOrderResponse> {
let params = NewOrderParams {
symbol: symbol.to_string(),
side,
order_type,
time_in_force,
quantity: quantity.map(|s| s.to_string()),
quote_order_qty: None,
price: price.map(|s| s.to_string()),
new_client_order_id: client_order_id.map(|s| s.to_string()),
stop_price: stop_price.map(|s| s.to_string()),
trailing_delta: None,
iceberg_qty: None,
new_order_resp_type: Some(BinanceOrderResponseType::Full),
self_trade_prevention_mode: None,
strategy_id: None,
strategy_type: None,
};
let bytes = self.post_order("order", Some(¶ms)).await?;
let response = parse::decode_new_order_full(&bytes)?;
Ok(response)
}
#[allow(clippy::too_many_arguments)]
pub async fn new_order_full(
&self,
symbol: &str,
side: BinanceSide,
order_type: BinanceSpotOrderType,
time_in_force: Option<BinanceTimeInForce>,
quantity: Option<&str>,
quote_order_qty: Option<&str>,
price: Option<&str>,
client_order_id: Option<&str>,
stop_price: Option<&str>,
iceberg_qty: Option<&str>,
) -> BinanceSpotHttpResult<BinanceNewOrderResponse> {
let params = NewOrderParams {
symbol: symbol.to_string(),
side,
order_type,
time_in_force,
quantity: quantity.map(|s| s.to_string()),
quote_order_qty: quote_order_qty.map(|s| s.to_string()),
price: price.map(|s| s.to_string()),
new_client_order_id: client_order_id.map(|s| s.to_string()),
stop_price: stop_price.map(|s| s.to_string()),
trailing_delta: None,
iceberg_qty: iceberg_qty.map(|s| s.to_string()),
new_order_resp_type: Some(BinanceOrderResponseType::Full),
self_trade_prevention_mode: None,
strategy_id: None,
strategy_type: None,
};
let bytes = self.post_order("order", Some(¶ms)).await?;
let response = parse::decode_new_order_full(&bytes)?;
Ok(response)
}
#[allow(clippy::too_many_arguments)]
pub async fn cancel_replace_order(
&self,
symbol: &str,
side: BinanceSide,
order_type: BinanceSpotOrderType,
time_in_force: Option<BinanceTimeInForce>,
quantity: Option<&str>,
price: Option<&str>,
cancel_order_id: Option<i64>,
cancel_client_order_id: Option<&str>,
new_client_order_id: Option<&str>,
) -> BinanceSpotHttpResult<BinanceNewOrderResponse> {
let params = CancelReplaceOrderParams {
symbol: symbol.to_string(),
side,
order_type,
cancel_replace_mode: BinanceCancelReplaceMode::StopOnFailure,
time_in_force,
quantity: quantity.map(|s| s.to_string()),
quote_order_qty: None,
price: price.map(|s| s.to_string()),
cancel_order_id,
cancel_orig_client_order_id: cancel_client_order_id.map(|s| s.to_string()),
new_client_order_id: new_client_order_id.map(|s| s.to_string()),
stop_price: None,
trailing_delta: None,
iceberg_qty: None,
new_order_resp_type: Some(BinanceOrderResponseType::Full),
self_trade_prevention_mode: None,
};
let bytes = self
.post_order("order/cancelReplace", Some(¶ms))
.await?;
let response = parse::decode_new_order_full(&bytes)?;
Ok(response)
}
pub async fn cancel_order(
&self,
symbol: &str,
order_id: Option<i64>,
client_order_id: Option<&str>,
) -> BinanceSpotHttpResult<BinanceCancelOrderResponse> {
let params = match (order_id, client_order_id) {
(Some(id), _) => CancelOrderParams::by_order_id(symbol, id),
(None, Some(id)) => CancelOrderParams::by_client_order_id(symbol, id.to_string()),
(None, None) => {
return Err(BinanceSpotHttpError::ValidationError(
"Either order_id or client_order_id must be provided".to_string(),
));
}
};
let bytes = self.delete_order("order", Some(¶ms)).await?;
let response = parse::decode_cancel_order(&bytes)?;
Ok(response)
}
pub async fn cancel_open_orders(
&self,
symbol: &str,
) -> BinanceSpotHttpResult<Vec<BinanceCancelOrderResponse>> {
let params = CancelOpenOrdersParams::new(symbol.to_string());
let bytes = self.delete_order("openOrders", Some(¶ms)).await?;
let response = parse::decode_cancel_open_orders(&bytes)?;
Ok(response)
}
async fn request_with_api_key<P>(
&self,
method: Method,
path: &str,
params: Option<&P>,
) -> BinanceSpotHttpResult<Vec<u8>>
where
P: Serialize + ?Sized,
{
let cred = self
.credential
.as_ref()
.ok_or(BinanceSpotHttpError::MissingCredentials)?;
let query = params
.map(serde_urlencoded::to_string)
.transpose()
.map_err(|e| BinanceSpotHttpError::ValidationError(e.to_string()))?
.unwrap_or_default();
let url = self.build_url(path, &query);
let mut headers = HashMap::new();
headers.insert(
BINANCE_API_KEY_HEADER.to_string(),
cred.api_key().to_string(),
);
let keys = vec![BINANCE_GLOBAL_RATE_KEY.to_string()];
let response = self
.client
.request(
method,
url,
None::<&HashMap<String, Vec<String>>>,
Some(headers),
None,
None,
Some(keys),
)
.await?;
if !response.status.is_success() {
return self.parse_error_response(&response);
}
Ok(response.body.to_vec())
}
pub async fn create_listen_key(&self) -> BinanceSpotHttpResult<ListenKeyResponse> {
let bytes = self
.request_with_api_key(Method::POST, "userDataStream", None::<&()>)
.await?;
let response: ListenKeyResponse = serde_json::from_slice(&bytes)
.map_err(|e| BinanceSpotHttpError::JsonError(e.to_string()))?;
Ok(response)
}
pub async fn extend_listen_key(&self, listen_key: &str) -> BinanceSpotHttpResult<()> {
let params = ListenKeyParams::new(listen_key);
self.request_with_api_key(Method::PUT, "userDataStream", Some(¶ms))
.await?;
Ok(())
}
pub async fn close_listen_key(&self, listen_key: &str) -> BinanceSpotHttpResult<()> {
let params = ListenKeyParams::new(listen_key);
self.request_with_api_key(Method::DELETE, "userDataStream", Some(¶ms))
.await?;
Ok(())
}
}
pub struct BinanceSpotHttpClient {
inner: Arc<BinanceRawSpotHttpClient>,
clock: &'static AtomicTime,
instruments_cache: Arc<DashMap<Ustr, InstrumentAny>>,
}
impl Clone for BinanceSpotHttpClient {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
clock: self.clock,
instruments_cache: self.instruments_cache.clone(),
}
}
}
impl Debug for BinanceSpotHttpClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct(stringify!(BinanceSpotHttpClient))
.field("inner", &self.inner)
.field("instruments_cached", &self.instruments_cache.len())
.finish()
}
}
impl BinanceSpotHttpClient {
#[allow(clippy::too_many_arguments)]
pub fn new(
environment: BinanceEnvironment,
clock: &'static AtomicTime,
api_key: Option<String>,
api_secret: Option<String>,
base_url_override: Option<String>,
recv_window: Option<u64>,
timeout_secs: Option<u64>,
proxy_url: Option<String>,
) -> BinanceSpotHttpResult<Self> {
let inner = BinanceRawSpotHttpClient::new(
environment,
api_key,
api_secret,
base_url_override,
recv_window,
timeout_secs,
proxy_url,
)?;
Ok(Self {
inner: Arc::new(inner),
clock,
instruments_cache: Arc::new(DashMap::new()),
})
}
#[must_use]
pub fn inner(&self) -> &BinanceRawSpotHttpClient {
&self.inner
}
#[must_use]
pub const fn schema_id() -> u16 {
SBE_SCHEMA_ID
}
#[must_use]
pub const fn schema_version() -> u16 {
SBE_SCHEMA_VERSION
}
fn generate_ts_init(&self) -> UnixNanos {
self.clock.get_time_ns()
}
fn instrument_from_cache(&self, symbol: Ustr) -> anyhow::Result<InstrumentAny> {
self.instruments_cache
.get(&symbol)
.map(|entry| entry.value().clone())
.ok_or_else(|| anyhow::anyhow!("Instrument {symbol} not in cache"))
}
pub fn cache_instruments(&self, instruments: Vec<InstrumentAny>) {
for inst in instruments {
self.instruments_cache
.insert(inst.raw_symbol().inner(), inst);
}
}
pub fn cache_instrument(&self, instrument: InstrumentAny) {
self.instruments_cache
.insert(instrument.raw_symbol().inner(), instrument);
}
#[must_use]
pub fn get_instrument(&self, symbol: &Ustr) -> Option<InstrumentAny> {
self.instruments_cache
.get(symbol)
.map(|entry| entry.value().clone())
}
pub async fn ping(&self) -> BinanceSpotHttpResult<()> {
self.inner.ping().await
}
pub async fn server_time(&self) -> BinanceSpotHttpResult<i64> {
self.inner.server_time().await
}
pub async fn exchange_info(
&self,
) -> BinanceSpotHttpResult<super::models::BinanceExchangeInfoSbe> {
self.inner.exchange_info().await
}
pub async fn request_instruments(&self) -> BinanceSpotHttpResult<Vec<InstrumentAny>> {
let info = self.exchange_info().await?;
let ts_init = self.generate_ts_init();
let mut instruments = Vec::with_capacity(info.symbols.len());
for symbol in &info.symbols {
match parse_spot_instrument_sbe(symbol, ts_init, ts_init) {
Ok(instrument) => instruments.push(instrument),
Err(e) => {
log::debug!(
"Skipping symbol during instrument parsing: symbol={}, error={e}",
symbol.symbol
);
}
}
}
self.cache_instruments(instruments.clone());
log::info!("Loaded spot instruments: count={}", instruments.len());
Ok(instruments)
}
pub async fn request_trades(
&self,
instrument_id: InstrumentId,
limit: Option<u32>,
) -> anyhow::Result<Vec<TradeTick>> {
let symbol = instrument_id.symbol.inner();
let instrument = self.instrument_from_cache(symbol)?;
let ts_init = self.generate_ts_init();
let trades = self
.inner
.trades(symbol.as_str(), limit)
.await
.map_err(|e| anyhow::anyhow!(e))?;
parse_spot_trades_sbe(&trades, &instrument, ts_init)
}
pub async fn request_bars(
&self,
bar_type: BarType,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
limit: Option<u32>,
) -> anyhow::Result<Vec<Bar>> {
anyhow::ensure!(
bar_type.aggregation_source() == AggregationSource::External,
"Only EXTERNAL aggregation is supported"
);
let spec = bar_type.spec();
let step = spec.step.get();
let interval = match spec.aggregation {
BarAggregation::Second => {
anyhow::bail!("Binance Spot does not support second-level kline intervals")
}
BarAggregation::Minute => format!("{step}m"),
BarAggregation::Hour => format!("{step}h"),
BarAggregation::Day => format!("{step}d"),
BarAggregation::Week => format!("{step}w"),
BarAggregation::Month => format!("{step}M"),
a => anyhow::bail!("Binance does not support {a:?} aggregation"),
};
let symbol = bar_type.instrument_id().symbol;
let instrument = self.instrument_from_cache(symbol.inner())?;
let ts_init = self.generate_ts_init();
let klines = self
.inner
.klines(
symbol.as_str(),
&interval,
start.map(|dt| dt.timestamp_millis()),
end.map(|dt| dt.timestamp_millis()),
limit,
)
.await
.map_err(|e| anyhow::anyhow!(e))?;
parse_klines_to_bars(&klines, bar_type, &instrument, ts_init)
}
pub async fn request_account_state(
&self,
account_id: AccountId,
) -> anyhow::Result<AccountState> {
let ts_init = self.clock.get_time_ns();
let params = AccountInfoParams::default();
let account_info = self.inner.account(¶ms).await?;
Ok(account_info.to_account_state(account_id, ts_init))
}
pub async fn request_order_status_report(
&self,
account_id: AccountId,
instrument_id: InstrumentId,
venue_order_id: Option<VenueOrderId>,
client_order_id: Option<ClientOrderId>,
) -> anyhow::Result<OrderStatusReport> {
anyhow::ensure!(
venue_order_id.is_some() || client_order_id.is_some(),
"Either venue_order_id or client_order_id must be provided"
);
let symbol = instrument_id.symbol.inner();
let instrument = self.instrument_from_cache(symbol)?;
let ts_init = self.generate_ts_init();
let order_id = venue_order_id
.map(|id| id.inner().parse::<i64>())
.transpose()
.map_err(|_| anyhow::anyhow!("Invalid venue order ID"))?;
let client_id_str =
client_order_id.map(|id| encode_broker_id(&id, BINANCE_NAUTILUS_SPOT_BROKER_ID));
let order = self
.inner
.query_order(symbol.as_str(), order_id, client_id_str.as_deref())
.await
.map_err(|e| anyhow::anyhow!(e))?;
parse_order_status_report_sbe(
&order,
account_id,
&instrument,
BINANCE_NAUTILUS_SPOT_BROKER_ID,
ts_init,
)
}
#[allow(clippy::too_many_arguments)]
pub async fn request_order_status_reports(
&self,
account_id: AccountId,
instrument_id: Option<InstrumentId>,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
open_only: bool,
limit: Option<u32>,
) -> anyhow::Result<Vec<OrderStatusReport>> {
let ts_init = self.generate_ts_init();
let symbol = instrument_id.map(|id| id.symbol.to_string());
let orders = if open_only {
self.inner
.open_orders(symbol.as_deref())
.await
.map_err(|e| anyhow::anyhow!(e))?
} else {
let symbol = symbol
.ok_or_else(|| anyhow::anyhow!("instrument_id is required when open_only=false"))?;
self.inner
.all_orders(
&symbol,
start.map(|dt| dt.timestamp_millis()),
end.map(|dt| dt.timestamp_millis()),
limit,
)
.await
.map_err(|e| anyhow::anyhow!(e))?
};
orders
.iter()
.map(|order| {
let symbol = Ustr::from(&order.symbol);
let instrument = self.instrument_from_cache(symbol)?;
parse_order_status_report_sbe(
order,
account_id,
&instrument,
BINANCE_NAUTILUS_SPOT_BROKER_ID,
ts_init,
)
})
.collect()
}
#[allow(clippy::too_many_arguments)]
pub async fn request_fill_reports(
&self,
account_id: AccountId,
instrument_id: InstrumentId,
venue_order_id: Option<VenueOrderId>,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
limit: Option<u32>,
) -> anyhow::Result<Vec<FillReport>> {
let ts_init = self.generate_ts_init();
let symbol = instrument_id.symbol.inner();
let order_id = venue_order_id
.map(|id| id.inner().parse::<i64>())
.transpose()
.map_err(|_| anyhow::anyhow!("Invalid venue order ID"))?;
let trades = self
.inner
.account_trades(
symbol.as_str(),
order_id,
start.map(|dt| dt.timestamp_millis()),
end.map(|dt| dt.timestamp_millis()),
limit,
)
.await
.map_err(|e| anyhow::anyhow!(e))?;
trades
.iter()
.map(|trade| {
let symbol = Ustr::from(&trade.symbol);
let instrument = self.instrument_from_cache(symbol)?;
let commission_currency = get_currency(&trade.commission_asset);
parse_fill_report_sbe(trade, account_id, &instrument, commission_currency, ts_init)
})
.collect()
}
#[allow(clippy::too_many_arguments)]
pub async fn submit_order(
&self,
account_id: AccountId,
instrument_id: InstrumentId,
client_order_id: ClientOrderId,
order_side: OrderSide,
order_type: OrderType,
quantity: Quantity,
time_in_force: TimeInForce,
price: Option<Price>,
trigger_price: Option<Price>,
post_only: bool,
quote_quantity: bool,
display_qty: Option<Quantity>,
) -> anyhow::Result<OrderStatusReport> {
let symbol = instrument_id.symbol.inner();
let instrument = self.instrument_from_cache(symbol)?;
let ts_init = self.generate_ts_init();
let binance_side = BinanceSide::try_from(order_side)?;
let binance_order_type = order_type_to_binance_spot(order_type, post_only)?;
let requires_trigger = matches!(
order_type,
OrderType::StopMarket
| OrderType::StopLimit
| OrderType::MarketIfTouched
| OrderType::LimitIfTouched
);
if requires_trigger && trigger_price.is_none() {
anyhow::bail!("Conditional orders require a trigger price");
}
let requires_price = matches!(
binance_order_type,
BinanceSpotOrderType::Limit
| BinanceSpotOrderType::StopLossLimit
| BinanceSpotOrderType::TakeProfitLimit
| BinanceSpotOrderType::LimitMaker
);
if requires_price && price.is_none() {
anyhow::bail!("{binance_order_type:?} orders require a price");
}
let supports_tif = matches!(
binance_order_type,
BinanceSpotOrderType::Limit
| BinanceSpotOrderType::StopLossLimit
| BinanceSpotOrderType::TakeProfitLimit
);
let binance_tif = if supports_tif {
Some(time_in_force_to_binance_spot(time_in_force)?)
} else {
None
};
let qty_str = quantity.to_string();
let price_str = price.map(|p| p.to_string());
let stop_price_str = trigger_price.map(|p| p.to_string());
let iceberg_qty_str = display_qty.map(|q| q.to_string());
let client_id_str = encode_broker_id(&client_order_id, BINANCE_NAUTILUS_SPOT_BROKER_ID);
if quote_quantity && binance_order_type != BinanceSpotOrderType::Market {
anyhow::bail!("quoteOrderQty is only supported for MARKET orders");
}
let (base_qty, quote_qty) = if quote_quantity {
(None, Some(qty_str.as_str()))
} else {
(Some(qty_str.as_str()), None)
};
let response = self
.inner
.new_order_full(
symbol.as_str(),
binance_side,
binance_order_type,
binance_tif,
base_qty,
quote_qty,
price_str.as_deref(),
Some(&client_id_str),
stop_price_str.as_deref(),
iceberg_qty_str.as_deref(),
)
.await?;
parse_new_order_response_sbe(
&response,
account_id,
&instrument,
BINANCE_NAUTILUS_SPOT_BROKER_ID,
ts_init,
)
}
pub async fn submit_order_list(
&self,
orders: &[BatchOrderItem],
) -> BinanceSpotHttpResult<Vec<BatchOrderResult>> {
self.inner.batch_submit_orders(orders).await
}
#[allow(clippy::too_many_arguments)]
pub async fn modify_order(
&self,
account_id: AccountId,
instrument_id: InstrumentId,
venue_order_id: VenueOrderId,
client_order_id: ClientOrderId,
order_side: OrderSide,
order_type: OrderType,
quantity: Quantity,
time_in_force: TimeInForce,
price: Option<Price>,
) -> anyhow::Result<OrderStatusReport> {
let symbol = instrument_id.symbol.inner();
let instrument = self.instrument_from_cache(symbol)?;
let ts_init = self.generate_ts_init();
let binance_side = BinanceSide::try_from(order_side)?;
let binance_order_type = order_type_to_binance_spot(order_type, false)?;
let binance_tif = time_in_force_to_binance_spot(time_in_force)?;
let cancel_order_id: i64 = venue_order_id
.inner()
.parse()
.map_err(|_| anyhow::anyhow!("Invalid venue order ID: {venue_order_id}"))?;
let qty_str = quantity.to_string();
let price_str = price.map(|p| p.to_string());
let client_id_str = encode_broker_id(&client_order_id, BINANCE_NAUTILUS_SPOT_BROKER_ID);
let response = self
.inner
.cancel_replace_order(
symbol.as_str(),
binance_side,
binance_order_type,
Some(binance_tif),
Some(&qty_str),
price_str.as_deref(),
Some(cancel_order_id),
None,
Some(&client_id_str),
)
.await
.map_err(|e| anyhow::anyhow!(e))?;
parse_new_order_response_sbe(
&response,
account_id,
&instrument,
BINANCE_NAUTILUS_SPOT_BROKER_ID,
ts_init,
)
}
pub async fn cancel_order(
&self,
instrument_id: InstrumentId,
venue_order_id: Option<VenueOrderId>,
client_order_id: Option<ClientOrderId>,
) -> anyhow::Result<VenueOrderId> {
let symbol = instrument_id.symbol.inner();
let order_id = venue_order_id
.map(|id| id.inner().parse::<i64>())
.transpose()
.map_err(|_| anyhow::anyhow!("Invalid venue order ID"))?;
let client_id_str =
client_order_id.map(|id| encode_broker_id(&id, BINANCE_NAUTILUS_SPOT_BROKER_ID));
let response = self
.inner
.cancel_order(symbol.as_str(), order_id, client_id_str.as_deref())
.await
.map_err(|e| anyhow::anyhow!(e))?;
Ok(VenueOrderId::new(response.order_id.to_string()))
}
pub async fn batch_cancel_orders(
&self,
cancels: &[BatchCancelItem],
) -> BinanceSpotHttpResult<Vec<BatchCancelResult>> {
self.inner.batch_cancel_orders(cancels).await
}
pub async fn cancel_all_orders(
&self,
instrument_id: InstrumentId,
) -> anyhow::Result<Vec<(VenueOrderId, ClientOrderId)>> {
let symbol = instrument_id.symbol.inner();
let responses = self
.inner
.cancel_open_orders(symbol.as_str())
.await
.map_err(|e| anyhow::anyhow!(e))?;
Ok(responses
.into_iter()
.map(|r| {
(
VenueOrderId::new(r.order_id.to_string()),
ClientOrderId::new(decode_broker_id(
&r.orig_client_order_id,
BINANCE_NAUTILUS_SPOT_BROKER_ID,
)),
)
})
.collect())
}
}
#[cfg(test)]
mod tests {
use rstest::rstest;
use super::*;
#[rstest]
fn test_schema_constants() {
assert_eq!(BinanceRawSpotHttpClient::schema_id(), 3);
assert_eq!(BinanceRawSpotHttpClient::schema_version(), 3);
assert_eq!(BinanceSpotHttpClient::schema_id(), 3);
assert_eq!(BinanceSpotHttpClient::schema_version(), 3);
}
#[rstest]
fn test_sbe_schema_header() {
assert_eq!(SBE_SCHEMA_HEADER, "3:3");
}
#[rstest]
fn test_default_headers_include_sbe() {
let headers = BinanceRawSpotHttpClient::default_headers(&None);
assert_eq!(headers.get("Accept"), Some(&"application/sbe".to_string()));
assert_eq!(headers.get("X-MBX-SBE"), Some(&"3:3".to_string()));
}
#[rstest]
fn test_rate_limit_config() {
let config = BinanceRawSpotHttpClient::rate_limit_config();
assert!(config.default_quota.is_some());
assert_eq!(config.order_keys.len(), 2);
}
#[rstest]
fn test_quota_from_unknown_interval_returns_none() {
let quota = BinanceRateLimitQuota {
rate_limit_type: BinanceRateLimitType::Orders,
interval: BinanceRateLimitInterval::Unknown,
interval_num: 1,
limit: 10,
};
assert!(BinanceRawSpotHttpClient::quota_from("a).is_none());
}
}